Tuesday, May 26, 2009

HBase Schema Manager

As already mentioned in one of my previous posts, HBase at times makes it difficult to maintain or even create a new table structure. Imagine you have a running cluster and quite an elaborate table setup. Now you want to create a backup cluster for load balancing and general tasks like reporting. How do you get all the values from one system into the other?

While there are various examples that help you back up the data and eventually restore it, how do you "clone" the table schemas?

Or imagine you have an existing system like the one we talked about above and you simply want to change a few things around. With an RDBMS you can save the required steps in a DDL statement and execute it on the server - or the backup server etc. But with HBase there is no DDL, nor even the possibility of executing pre-built scripts against a running cluster.

What I described in my previous post was a way to store the table schemas in an XML configuration file and run that against a cluster. The code handles adding new tables and, more importantly, the addition, removal and modification of column families for any named table.

I have put this all into a separate Java application that may be useful to you. You can get it from my GitHub repository. It is really simple to use: you create an XML based configuration file, for example:
<?xml version="1.0" encoding="UTF-8"?>
<configurations>
  <configuration>
    <name>foobar</name>
    <description>Configuration for the FooBar HBase cluster.</description>
    <hbase_master>foo.bar.com:60000</hbase_master>
    <schema>
      <table>
        <name>test</name>
        <description>Test table.</description>
        <column_family>
          <name>sample</name>
          <description>Sample column.</description>
          <!-- Default: 3 -->
          <max_versions>1</max_versions>
          <!-- Default: false -->
          <in_memory>false</in_memory>
          <!-- Default: false -->
          <block_cache_enabled>false</block_cache_enabled>
          <!-- Default: -1 (forever) -->
          <time_to_live>-1</time_to_live>
          <!-- Default: 2147483647 -->
          <max_value_length>2147483647</max_value_length>
        </column_family>
      </table>
    </schema>
  </configuration>
</configurations>

Then all you have to do is run the application like so:

java -jar hbase-manager-1.0.jar schema.xml

The "schema.xml" is the above XML configuration saved on your local machine. The output shows the steps performed:
$ java -jar hbase-manager-1.0.jar schema.xml
creating table test...
table created

You can also specify more options on the command line:
usage: HbaseManager [<options>] <schema-xml-filename> [<config-name>]
-l,--list lists all tables but performs no further action.
-n,--nocreate do not create non-existent tables.
-v,--verbose print verbose output.

If you use the "verbose" option you get more details:
$ java -jar hbase-manager-1.0.jar -v schema.xml
schema filename: schema.xml
configuration used: null
using config number: default
table schemas read from config:
[name -> test
description -> Test table.
columns -> {sample=name -> sample
description -> Sample column.
maxVersions -> 1
compressionType -> NONE
inMemory -> false
blockCacheEnabled -> false
maxValueLength -> 2147483647
timeToLive -> -1
bloomFilter -> false}]
hbase.master -> foo.bar.com:60000
authoritative -> true
name -> test
tableExists -> true
changing table test...
no changes detected!

Finally you can use the "list" option to check initial connectivity and the successful changes:
$ java -jar hbase-manager-1.0.jar -l schema.xml
tables found: 1

A few notes: First and most importantly, if you change a large table, i.e. one with thousands of regions, this process can take quite a long time. This is caused by the enableTable() call having to scan the complete .META. table to assign the regions to their respective region servers. There is possibly room for improvement in my little application to handle this better - suggestions welcome!

Also, I do not have Bloom Filter settings implemented, as this is still changing from 0.19 to 0.20. Once it has been finalized I will add support for it.

If you do not specify a configuration name then the first one is used. Having more than one configuration allows you to have multiple clusters defined in one schema file and by specifying the name you can execute only a specific one when you need to.
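To sketch what that could look like (the tag names here are illustrative assumptions, not taken verbatim from the tool - check the GitHub repository for the authoritative layout), a schema file with two configurations might be structured like this:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<configurations>
  <configuration>
    <name>production</name>
    <description>Production cluster.</description>
    <hbase_master>prod.foo.bar.com:60000</hbase_master>
    <schema>
      <!-- table definitions as shown earlier -->
    </schema>
  </configuration>
  <configuration>
    <name>backup</name>
    <description>Backup cluster for reporting.</description>
    <hbase_master>backup.foo.bar.com:60000</hbase_master>
    <schema>
      <!-- the same table definitions, applied to the backup cluster -->
    </schema>
  </configuration>
</configurations>
```

With a file like this, "java -jar hbase-manager-1.0.jar schema.xml backup" would apply the schema only to the backup cluster, while omitting the name would pick the first configuration.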

Monday, May 11, 2009

HBase MapReduce 101 - Part I

In this and the following posts I would like to take the opportunity to go into detail about the MapReduce process as provided by Hadoop but more importantly how it applies to HBase.


MapReduce as a process was designed to solve the problem of processing data in excess of terabytes in a scalable way. There should be a way to design such a system so that it increases in performance linearly with the number of physical machines added. That is what MapReduce strives to do. It follows a divide-and-conquer approach, splitting the data located on a distributed file system so that the available servers (or rather CPUs, or, in more modern terms, "cores") can access these pieces and process them as fast as they can. The problem with this approach is that you will have to consolidate the data at the end. Again, MapReduce has this built right in.

The above simplified image of the MapReduce process shows you how the data is processed. The first thing that happens is the split, which is responsible for dividing the input data into reasonably sized chunks that are then processed by one server at a time. This splitting has to be done in a somewhat smart way to make the best use of the available servers and the infrastructure in general. In this example the data may be a very large log file that is divided into equally sized pieces on line boundaries. This is OK for, say, Apache log files. Input data may also be binary, though, in which case you may have to write your own getSplits() method - but more on that below.
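To make the line-boundary idea concrete, here is a deliberately simplified standalone sketch (plain Java, not the actual Hadoop getSplits() implementation) of dividing input into roughly equal chunks that always end on a newline, so no record is cut in half:

```java
import java.util.ArrayList;
import java.util.List;

public class LineBoundarySplitter {

  /** Splits text into chunks of roughly chunkSize characters, always ending on a newline. */
  public static List<String> split(String data, int chunkSize) {
    List<String> chunks = new ArrayList<String>();
    int start = 0;
    while (start < data.length()) {
      int end = Math.min(start + chunkSize, data.length());
      // extend the chunk to the next line boundary so no record is cut in half
      while (end < data.length() && data.charAt(end - 1) != '\n') {
        end++;
      }
      chunks.add(data.substring(start, end));
      start = end;
    }
    return chunks;
  }

  public static void main(String[] args) {
    String log = "line one\nline two\nline three\nline four\n";
    for (String chunk : split(log, 10)) {
      System.out.println("[" + chunk.replace("\n", "\\n") + "]");
    }
  }
}
```

The real framework works on byte offsets in DFS blocks rather than in-memory strings, but the boundary-adjustment logic is the same in spirit.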


The above image also shows you the classes that are involved in the Hadoop implementation of MapReduce. Let's look at them and also at the specific implementations that HBase provides on top of those.


The first class to deal with is the InputFormat class. It is responsible for two things: it does the actual splitting of the input data, and it returns a RecordReader instance that defines the classes of the key and value objects and provides a next() method that is used to iterate over each input record.
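To make that contract tangible, here is a deliberately simplified stand-in (my own miniature types, not the actual Hadoop interfaces) showing the typed-key/value, next()-driven iteration pattern a RecordReader follows:

```java
import java.util.Arrays;
import java.util.Iterator;

public class RecordReaderSketch {

  /** Simplified analog of Hadoop's RecordReader: typed key/value iteration. */
  interface RecordReader<K, V> {
    // fills the one-element holder arrays, returns false when the split is exhausted
    boolean next(K[] key, V[] value);
  }

  /** Reads lines from one "split", keying each record by its character offset. */
  static RecordReader<Long, String> lineReader(final String split) {
    final Iterator<String> lines = Arrays.asList(split.split("\n")).iterator();
    return new RecordReader<Long, String>() {
      long offset = 0;
      public boolean next(Long[] key, String[] value) {
        if (!lines.hasNext()) return false;
        String line = lines.next();
        key[0] = offset;
        value[0] = line;
        offset += line.length() + 1; // advance past the line and its newline
        return true;
      }
    };
  }

  public static void main(String[] args) {
    RecordReader<Long, String> reader = lineReader("first\nsecond");
    Long[] k = new Long[1];
    String[] v = new String[1];
    while (reader.next(k, v)) {
      System.out.println(k[0] + " -> " + v[0]);
    }
  }
}
```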

As far as HBase is concerned there is a special implementation called TableInputFormatBase as well as its subclass TableInputFormat. The former implements the majority of the functionality but remains abstract. The subclass is a lightweight concrete version that is used by many supplied sample and real MapReduce classes.

But most importantly these classes implement the full turn-key solution to scan a HBase table. You can provide the name of the table to scan and the columns you want to process during the Map phase. The input format splits the table into proper pieces for you and hands them over to the subsequent classes. There are quite a few tweaks which we will address below and in the following installments of this series.

For now let's look at the other classes involved.


The Mapper class(es) handle the next stage of the MapReduce process and give it one of its namesakes. In this step each record read using the RecordReader is processed using the map() method. What is also somewhat visible in the first figure above is that the Mapper reads a specific type of key/value pair but may emit another type. This is handy for converting the raw data into something more useful for further processing.
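As a standalone sketch of that conversion (plain Java, not the actual Hadoop Mapper API; the log format is my assumption): a map step might take a raw Apache-style log line keyed by file offset and emit a different, more useful pair, such as the client IP and a count of one:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class MapSketch {

  /** Turns an offset/log-line pair into an (ip, 1) pair for later aggregation. */
  public static Map.Entry<String, Integer> map(long offset, String logLine) {
    // first whitespace-separated field of a common-log-format line is the client IP
    String ip = logLine.split(" ")[0];
    return new SimpleEntry<String, Integer>(ip, 1);
  }

  public static void main(String[] args) {
    Map.Entry<String, Integer> out =
        map(0L, "192.168.0.1 - - [11/May/2009:10:00:00] \"GET / HTTP/1.1\" 200 512");
    System.out.println(out.getKey() + " -> " + out.getValue());
  }
}
```

Note how the input types (long offset, String line) differ from the output types (String key, Integer value) - exactly the type conversion the figure shows.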

Again, looking at HBase's extensions to this, you will find a TableMap class that is specific to iterating over a HBase table. One specific implementation is the IdentityTableMap, which is also a good example of how to add your own functionality to the supplied classes. The TableMap class itself does not implement anything but only adds the signatures of what the actual key/value pair classes are. The IdentityTableMap simply passes the records on to the next stage of the processing.


The Reduce stage and class layout is very similar to the Mapper one explained above. This time we get the output of a Mapper class and process it after the data has been shuffled and sorted. In the implicit shuffle between the Mapper and Reducer stages, the intermediate data is copied from the different Map servers to the Reduce servers, and the sort combines the shuffled (copied) data so that the Reducer sees it as a nicely sorted set where each unique key (and that is something I will get back to later) is associated with all of the possible values it was found with.
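A standalone sketch of that step (again plain Java, not the Hadoop Reducer API): after the shuffle and sort, each reduce call sees one unique key together with an iterator over all the values collected for it, which it can aggregate:

```java
import java.util.Arrays;
import java.util.Iterator;

public class ReduceSketch {

  /** Sums all values seen for one key, as a word-count style reducer would. */
  public static int reduce(String key, Iterator<Integer> values) {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next();
    }
    return sum;
  }

  public static void main(String[] args) {
    // the framework hands over: one key plus every count that was shuffled to it
    int total = reduce("192.168.0.1", Arrays.asList(1, 1, 1).iterator());
    System.out.println("192.168.0.1 -> " + total);
  }
}
```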


The final stage is the OutputFormat class, whose job it is to persist the data in various locations. There are specific implementations that allow output to files, or to HBase tables in the case of the TableOutputFormat. The latter uses a RecordWriter to write the data into the specific HBase output table.

It is important to note the cardinality as well. While there are many Mappers handing records to many Reducers, there is only one OutputFormat, which takes each output record from its Reducer in turn. It is the final class handling the key/value pairs and writes them to their final destination, be it a file or a table.

The name of the output table is specified when the job is created. Otherwise the TableOutputFormat does not add much more complexity. One rather significant thing it does is set the table's auto flush to "false" and handle the buffer flushing implicitly. This helps a lot in speeding up the import of large data sets.

To Map or Reduce or Not Map or Reduce

We are now at a crucial point: deciding how to process tables stored in HBase. From the above it seems that we simply use a TableInputFormat to feed the data through a TableMap and TableReduce and eventually persist it with a TableOutputFormat. But this may not be what you want when you deal with HBase. The question is whether there is a better way to handle the process given certain specific architectural features HBase provides. Depending on your data source and target there are a few different scenarios we could think of.

If you want to import a large set of data into a HBase table you can read the data using a Mapper, aggregate it on a per-key basis using a Reducer and finally write it into a HBase table. This involves the whole MapReduce stack including the shuffle and sort using intermediate files. But what if you know that the data, for example, has a unique key? Why go through the extra step of copying and sorting when there is always exactly one key/value pair per key? At this point you can ask yourself, wouldn't it be better if I could skip the whole reduce stage? The answer is: yes, you can! And you should, as you can harness the pure computational power of all CPUs to crunch the data and write it at top IO speed to its final target.

As you can see from the matrix, there are quite a few scenarios where you can decide if you want Map only or both, Map and Reduce. But when it comes to handling HBase tables as sources and targets there are a few exceptions to this rule and I highlighted them accordingly.

Same or different Tables

This is an important distinction. The bottom line is, when you read a table in the Map stage you should consider not writing back to that very same table in the same process. On the one hand it could hinder the proper distribution of regions across the servers (open scanners block region splits), and on the other hand you may or may not see the new data as you scan. But when you read from one table and write to another, you can do that in a single stage. So for two different tables you can write your table updates directly in TableMap.map(), while with the same table you must write the same code in TableReduce.reduce() - or in its TableOutputFormat (or, even better, simply use that class as is and you are done). The reason is that the Map stage completely reads the table and then passes the data on in intermediate files to the Reduce stage. In turn this means that the Reducer reads from the distributed file system (DFS) and writes into the now idle HBase table. And all is well.

All of the above are simply recommendations based on what is currently available with HBase. There is of course no reason not to scan and modify a table in the same process. But to avoid certain non-deterministic issues I personally would not recommend this - especially if you are new to HBase. Maybe this could also be taken into consideration when designing your HBase tables: maybe you separate distinct data into two tables or separate column families so you can scan one table while changing the other.

Key Distribution

Another specific requirement for an effective import is to have random keys as the data is read. While this will be difficult if you scan a table in the Map phase, as keys are sorted, you may be able to make use of this when reading from a raw data file. Instead of keeping the key the file offset, as created by the TextInputFormat for example, you could simply replace the rather useless offset with a random key. This will guarantee that the data is spread more evenly across all servers. Especially the HRegionServers will be very thankful, as they each host a set of regions and random keys make for a random load on these regions.
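One common way to do this (a sketch; using an MD5 hash of the record is my choice here, nothing prescribed by HBase) is to derive the row key from the record content itself, which yields evenly distributed keys even for sequentially read input:

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class RandomKeySketch {

  /** Derives a well-distributed row key by hashing the raw record. */
  public static String rowKey(String rawRecord) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      byte[] digest = md5.digest(rawRecord.getBytes());
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new RuntimeException(e); // MD5 is always available in the JDK
    }
  }

  public static void main(String[] args) {
    // two adjacent log lines end up far apart in the sorted key space
    System.out.println(rowKey("192.168.0.1 - - [11/May/2009:10:00:00] GET /"));
    System.out.println(rowKey("192.168.0.1 - - [11/May/2009:10:00:01] GET /"));
  }
}
```

A hash-based key is also deterministic, so re-running a failed import produces the same keys; a truly random key (e.g. a UUID) would not have that property.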

Of course this depends on how the data is written to the raw files or how the real row keys are computed, but still a very valuable thing to keep in mind.

In the next post I will show you how to import data from a raw data file into a HBase table and how to eventually process the data in the HBase table. We will address questions like how many mappers and/or reducers are needed and how to improve import and processing performance.

Until then, have a good day!

Sunday, May 3, 2009

European HBase Ambassador

I am on a mission! The mission is to spread the word on HBase. There are only a few choices when it comes to large scale data storage solutions. What I am referring to is not your vertical scale, big-iron relational database system. The time is now for a paradigm shift.

There are various key/value or more structured stores that strive to achieve the same - but getting proper information is often a big challenge. Either there are no proper examples, or no use-cases for that matter. Even worse are performance details, the usual rap being "it all depends!" - of course it does. Without real examples it is difficult to determine whether a system that seems suitable design-wise will also hold up to the task at hand. How many servers are needed? How should the hardware stack be organized?

Here is where I feel I can help. I have been using HBase for over a year now (I started prototyping in late 2007), in production with three clusters spread over more than 50 servers. And I pretty much set them up from plugging in the hardware to designing and running the cluster and the system on top of it. While this is in itself nothing special, I feel that I gained a lot of experience with HBase. I also feel that it could be very helpful to others who are thinking about HBase and how it may help them.

So I hereby declare myself to be a European HBase Ambassador. Mind you, this is no official title. The purpose is to help further the adoption of HBase in research and/or commercial projects. So what does this "position" entail? I offer this:

For the cost of travel (and if necessary accommodation) I will present any aspect of HBase in production to whoever is interested.
Yes, I will come to you if you ask, no matter where in Europe (or beyond). I will present on the internals of HBase, its API and so on to developers or higher concepts to architects. Or to management on a white paper level. You want to know about HBase and how it can help you? Let me know and I will show you.

Why me? Besides what I mentioned above, I have over 13 years of experience in software engineering and am responsible as the CTO at WorldLingo, which for example is the sole provider for all machine translations in Microsoft Office for Windows and MacOS - given the text is longer than a few words, because otherwise the internal word dictionaries take preference. I write and speak German (native) and English fluently. Last but not least I have regular contact with the core developers of HBase and am a contributor myself - as much as time allows.

So there you have it. This is my offer. I hope you see value in it and take me up on it! I surely am looking forward to meeting you.