Wednesday, January 28, 2009

Changing HBase Tables in Code

One thing I did early on was to store the HBase table descriptions in an external XML file, much like a database schema. For example:
<table>
<name>documents</name>
<table_name>docs</table_name>
<description>Stores the actual documents.</description>
<column_family>
<name>contents</name>
<description>Holds the actual raw data.</description>
<!-- Default: 3 -->
<max_versions></max_versions>
<!-- Default: DEFAULT_COMPRESSION_TYPE -->
<compression_type></compression_type>
<!-- Default: false -->
<in_memory></in_memory>
<!-- Default: false -->
<block_cache_enabled/>
<!-- Default: -1 (forever) -->
<time_to_live/>
<!-- Default: 2147483647 -->
<max_value_length></max_value_length>
<!-- Default: DEFAULT_BLOOM_FILTER_DESCRIPTOR -->
<bloom_filter></bloom_filter>
</column_family>
<column_family>
<name>mimetype</name>
<description>Holds the MIME type of the data.</description>
<!-- Default: 3 -->
<max_versions></max_versions>
<!-- Default: DEFAULT_COMPRESSION_TYPE -->
<compression_type></compression_type>
<!-- Default: false -->
<in_memory></in_memory>
<!-- Default: false -->
<block_cache_enabled/>
<!-- Default: -1 (forever) -->
<time_to_live/>
<!-- Default: 2147483647 -->
<max_value_length></max_value_length>
<!-- Default: DEFAULT_BLOOM_FILTER_DESCRIPTOR -->
<bloom_filter></bloom_filter>
</column_family>
</table>

While this adds extra work to maintain the schemas, it does give a central place where all the metadata about the HBase tables is stored.

In the code, I added functionality that reads these XML files into internal classes representing each table. For example:
/**
 * HBase-independent description of one table, as read from the external
 * XML schema, for use by calling classes.
 */
public class TableSchema {

  private String name = null;
  private String tableName = null;
  private String description = null;
  private HashMap<String, ColumnDefinition> columnsByName =
      new HashMap<String, ColumnDefinition>();

  /** @return The logical name of the schema entry. */
  public String getName() {
    return name;
  }

  /** @param newName The logical name of the schema entry. */
  public void setName(String newName) {
    name = newName;
  }

  /** @return The free-text description of the table. */
  public String getDescription() {
    return description;
  }

  /** @param newDescription The free-text description of the table. */
  public void setDescription(String newDescription) {
    description = newDescription;
  }

  /** @return The name of the table as it is stored in HBase. */
  public String getTableName() {
    return tableName;
  }

  /** @param newTableName The name of the table as it is stored in HBase. */
  public void setTableName(String newTableName) {
    tableName = newTableName;
  }

  /**
   * Registers a column, keyed by its name. A column with the same name
   * replaces the previously registered one.
   *
   * @param column The column to add.
   */
  public void addColumn(ColumnDefinition column) {
    String key = column.getName();
    columnsByName.put(key, column);
  }

  /** @return A live view of all registered columns. */
  public Collection<ColumnDefinition> getColumns() {
    return columnsByName.values();
  }

  /**
   * Looks up a column by name.
   *
   * @param columnName The name the column was registered under.
   * @return The column, or <code>null</code> if none is registered.
   */
  public ColumnDefinition getColumnDefinition(String columnName) {
    return columnsByName.get(columnName);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder("name -> ");
    sb.append(name);
    sb.append("\n description -> ").append(description);
    sb.append("\n tableName -> ").append(tableName);
    sb.append("\n columns -> ").append(columnsByName);
    return sb.toString();
  }

} // TableSchema

In addition, I added a function that converts these instances into the descriptor classes HBase understands, plus a generic helper that looks up a table descriptor by name:
/**
 * Builds the native HBase table descriptor for an XML-based schema, adding
 * one column family per column definition.
 *
 * @param schema The schema of the table to convert.
 * @return The HBase descriptor holding the table name and all families.
 */
private HTableDescriptor convertSchemaToDescriptor(TableSchema schema) {
  HTableDescriptor descriptor = new HTableDescriptor(schema.getTableName());
  for (ColumnDefinition column : schema.getColumns()) {
    // family names are passed with the trailing divider, see getColumnName()
    byte[] family = Bytes.toBytes(column.getColumnName());
    HColumnDescriptor familyDescriptor = new HColumnDescriptor(family,
        column.getMaxVersions(),
        column.getCompressionType(),
        column.isInMemory(),
        column.isBlockCacheEnabled(),
        column.getMaxValueLength(),
        column.getTimeToLive(),
        column.isBloomFilter());
    descriptor.addFamily(familyDescriptor);
  }
  return descriptor;
} // convertSchemaToDescriptor

/**
 * Looks up the descriptor of a deployed table by scanning all tables that
 * HBase lists.
 *
 * @param name The table name to match exactly.
 * @return The first matching descriptor, or <code>null</code> if none matches.
 * @throws IOException When the communication to HBase fails.
 */
private HTableDescriptor getHBaseTable(String name) throws IOException {
  for (HTableDescriptor candidate : _hbaseAdmin.listTables()) {
    if (candidate.getNameAsString().equals(name)) {
      return candidate;
    }
  }
  return null;
} // getHBaseTable

Now I can ask for a table, and if it does not exist I can create it. But what if it already exists? Then I have to check whether the table schema differs from the table that is deployed. If they are the same, fine: simply load the table. If they differ, you have to compare the column definitions and add, change, or remove the columns that differ. Here is an approach:
/**
 * Returns a HBase table. The table is either opened, created or updated.
 * <p>
 * If the table exists and <code>create</code> is <code>true</code>, the
 * deployed column families are compared with the schema by family name.
 * Families only in the schema are added, families only in HBase are
 * deleted, and families in both but with different settings are modified.
 * The table is disabled while the changes are applied and is re-enabled
 * afterwards, even if applying a change fails.
 * NOTE(review): deleting a family presumably drops its stored data -- confirm
 * this is acceptable before relying on automatic updates.
 * <p>
 * If the table does not exist and <code>create</code> is <code>true</code>,
 * it is created from the schema. With <code>create</code> set to
 * <code>false</code> nothing is changed in HBase.
 *
 * @param schema The external schema describing the table.
 * @param create True means create the table if non existent, or update its
 *   column families if they differ from the schema.
 * @return The internal table container. Its table and description are
 *   <code>null</code> if the table does not exist and <code>create</code>
 *   is <code>false</code>.
 * @throws IOException When the table creation or update fails.
 */
private TableContainer createTable(TableSchema schema, boolean create)
throws IOException {
  TableContainer res = new TableContainer();
  res.setSchema(schema);
  String tableName = schema.getTableName();
  HTableDescriptor desc = null;
  if (_hbaseAdmin.tableExists(tableName)) {
    desc = getHBaseTable(tableName);
    // only check for changes if we are allowed to and the table was found
    if (create && desc != null) {
      HTableDescriptor d = convertSchemaToDescriptor(schema);
      // Compute differences by family *name*. Comparing whole descriptors
      // (e.g. with removeAll) would also report a modified family as both
      // added and deleted, because its old and new descriptors differ.
      List<HColumnDescriptor> modCols = new ArrayList<HColumnDescriptor>();
      List<HColumnDescriptor> delCols = new ArrayList<HColumnDescriptor>();
      for (HColumnDescriptor cd : desc.getFamilies()) {
        HColumnDescriptor cd2 = d.getFamily(cd.getName());
        if (cd2 == null) {
          delCols.add(cd);
        } else if (!cd.equals(cd2)) {
          modCols.add(cd2);
        }
      }
      List<HColumnDescriptor> addCols = new ArrayList<HColumnDescriptor>();
      for (HColumnDescriptor cd : d.getFamilies()) {
        if (desc.getFamily(cd.getName()) == null) {
          addCols.add(cd);
        }
      }
      // check if we had a column that was changed, added or deleted
      if (!modCols.isEmpty() || !addCols.isEmpty() || !delCols.isEmpty()) {
        // yes, then disable table and apply changes
        _hbaseAdmin.disableTable(tableName);
        try {
          for (HColumnDescriptor col : modCols) {
            _hbaseAdmin.modifyColumn(tableName, col.getNameAsString(), col);
          }
          for (HColumnDescriptor col : addCols) {
            _hbaseAdmin.addColumn(tableName, col);
          }
          for (HColumnDescriptor col : delCols) {
            _hbaseAdmin.deleteColumn(tableName, col.getNameAsString() + ":");
          }
        } finally {
          // never leave the table disabled, even if a change failed
          _hbaseAdmin.enableTable(tableName);
        }
        // reload details so the container reflects the updated table
        desc = getHBaseTable(tableName);
      }
    }
  } else if (create) {
    desc = convertSchemaToDescriptor(schema);
    _hbaseAdmin.createTable(desc);
  }
  res.setDescription(desc);
  HTable table = null;
  if (desc != null) {
    table = new HTable(_hbaseConfig, desc.getName());
  }
  res.setTable(table);
  return res;
} // createTable

That's it, I guess. Please note that this is my attempt at solving it, and I am not sure yet whether it works. I will test it as soon as I can and update this post accordingly. But I thought I would throw it out there anyway: who knows, maybe it helps someone, or someone can help me. :)

Oh, for completeness' sake, here are the class I created to hold the returned table details and the column definition class used above:
/**
 * Holder for one table's details: the HBase table handle, its table
 * descriptor, and the external schema it belongs to. Any of them may be
 * <code>null</code> until set.
 */
class TableContainer {

  private TableSchema schema;
  private HTableDescriptor description;
  private HTable table;

  /** @return The HBase table handle, or <code>null</code> if not set. */
  public HTable getTable() {
    return table;
  }

  /** @param newTable The HBase table handle to hold. */
  public void setTable(HTable newTable) {
    table = newTable;
  }

  /** @return The HBase table descriptor, or <code>null</code> if not set. */
  public HTableDescriptor getDescription() {
    return description;
  }

  /** @param newDescription The HBase table descriptor to hold. */
  public void setDescription(HTableDescriptor newDescription) {
    description = newDescription;
  }

  /** @return The external schema, or <code>null</code> if not set. */
  public TableSchema getSchema() {
    return schema;
  }

  /** @param newSchema The external schema to hold. */
  public void setSchema(TableSchema newSchema) {
    schema = newSchema;
  }

  @Override
  public String toString() {
    return new StringBuilder("table -> ").append(table)
        .append(", description -> ").append(description)
        .append(", schema -> ").append(schema)
        .toString();
  }

} // TableContainer

/**
 * Describes a column (family) and its features. All settings start out with
 * the corresponding <code>HColumnDescriptor</code> defaults.
 */
public class ColumnDefinition {

  /** The divider between the column family name and a label. */
  public static final String DIV_COLUMN_LABEL = ":";
  /** Default values for HBase. */
  private static final int DEF_MAX_VERSIONS = HColumnDescriptor.DEFAULT_VERSIONS;
  /** Default values for HBase. */
  private static final CompressionType DEF_COMPRESSION_TYPE = HColumnDescriptor.DEFAULT_COMPRESSION;
  /** Default values for HBase. */
  private static final boolean DEF_IN_MEMORY = HColumnDescriptor.DEFAULT_IN_MEMORY;
  /** Default values for HBase. */
  private static final boolean DEF_BLOCKCACHE_ENABLED = HColumnDescriptor.DEFAULT_BLOCKCACHE;
  /** Default values for HBase. */
  private static final int DEF_MAX_VALUE_LENGTH = HColumnDescriptor.DEFAULT_LENGTH;
  /** Default values for HBase. */
  private static final int DEF_TIME_TO_LIVE = HColumnDescriptor.DEFAULT_TTL;
  /** Default values for HBase. */
  private static final boolean DEF_BLOOM_FILTER = HColumnDescriptor.DEFAULT_BLOOMFILTER;

  private String name;
  private String tableName;
  private String description;
  private int maxVersions = DEF_MAX_VERSIONS;
  private CompressionType compressionType = DEF_COMPRESSION_TYPE;
  private boolean inMemory = DEF_IN_MEMORY;
  private boolean blockCacheEnabled = DEF_BLOCKCACHE_ENABLED;
  private int maxValueLength = DEF_MAX_VALUE_LENGTH;
  private int timeToLive = DEF_TIME_TO_LIVE;
  private boolean bloomFilter = DEF_BLOOM_FILTER;

  /**
   * Returns the column family name terminated by {@link #DIV_COLUMN_LABEL},
   * appending the divider only if the configured name lacks it.
   *
   * @return The family name ending with the divider.
   * @throws NullPointerException If no name has been set.
   */
  public String getColumnName() {
    return name.endsWith(DIV_COLUMN_LABEL) ? name : name + DIV_COLUMN_LABEL;
  }

  /** @return The name as configured, without any normalization. */
  public String getName() {
    return name;
  }

  /** @param name The column (family) name. */
  public void setName(String name) {
    this.name = name;
  }

  /** @return The name of the table this column belongs to. */
  public String getTableName() {
    return tableName;
  }

  /** @param tableName The name of the table this column belongs to. */
  public void setTableName(String tableName) {
    this.tableName = tableName;
  }

  /** @return The free-text description of the column. */
  public String getDescription() {
    return description;
  }

  /** @param description The free-text description of the column. */
  public void setDescription(String description) {
    this.description = description;
  }

  /** @return The maximum number of versions to keep. */
  public int getMaxVersions() {
    return maxVersions;
  }

  /** @param maxVersions The maximum number of versions to keep. */
  public void setMaxVersions(int maxVersions) {
    this.maxVersions = maxVersions;
  }

  /** @return The compression type of the column family. */
  public CompressionType getCompressionType() {
    return compressionType;
  }

  /** @param compressionType The compression type of the column family. */
  public void setCompressionType(CompressionType compressionType) {
    this.compressionType = compressionType;
  }

  /** @return Whether the column family is flagged as in-memory. */
  public boolean isInMemory() {
    return inMemory;
  }

  /** @param inMemory Whether the column family is flagged as in-memory. */
  public void setInMemory(boolean inMemory) {
    this.inMemory = inMemory;
  }

  /**
   * @return Returns the blockCacheEnabled.
   */
  public boolean isBlockCacheEnabled() {
    return blockCacheEnabled;
  }

  /**
   * @param blockCacheEnabled The blockCacheEnabled to set.
   */
  public void setBlockCacheEnabled(boolean blockCacheEnabled) {
    this.blockCacheEnabled = blockCacheEnabled;
  }

  /**
   * @return Returns the timeToLive.
   */
  public int getTimeToLive() {
    return timeToLive;
  }

  /**
   * @param timeToLive The timeToLive to set.
   */
  public void setTimeToLive(int timeToLive) {
    this.timeToLive = timeToLive;
  }

  /**
   * @return Returns the bloomFilter.
   */
  public boolean isBloomFilter() {
    return bloomFilter;
  }

  /**
   * @param bloomFilter The bloomFilter to set.
   */
  public void setBloomFilter(boolean bloomFilter) {
    this.bloomFilter = bloomFilter;
  }

  /** @return The maximum value length of the column family. */
  public int getMaxValueLength() {
    return maxValueLength;
  }

  /** @param maxValueLength The maximum value length of the column family. */
  public void setMaxValueLength(int maxValueLength) {
    this.maxValueLength = maxValueLength;
  }

  @Override
  public String toString() {
    return "name -> " + name +
      "\n tableName -> " + tableName +
      "\n description -> " + description +
      "\n maxVersions -> " + maxVersions +
      "\n compressionType -> " + compressionType +
      "\n inMemory -> " + inMemory +
      "\n blockCacheEnabled -> " + blockCacheEnabled +
      "\n maxValueLength -> " + maxValueLength +
      "\n timeToLive -> " + timeToLive +
      "\n bloomFilter -> " + bloomFilter;
  } // toString

} // ColumnDefinition

Not much to it obviously, but hey.

Update: I fixed the code to handle added and removed columns properly. The previous version would only handle changed columns.

1 comment: