Monday, May 11, 2009

HBase MapReduce 101 - Part I

In this and the following posts I would like to take the opportunity to go into detail about the MapReduce process as provided by Hadoop, but more importantly, how it applies to HBase.

MapReduce


MapReduce as a process was designed to solve the problem of processing data in excess of terabytes in a scalable way. There should be a way to design such a system so that its performance increases linearly with the number of physical machines added - and that is what MapReduce strives to do. It follows a divide-and-conquer approach: it splits the data located on a distributed file system so that the available servers (or rather CPUs, or, in more modern terms, cores) can access these pieces and process them as fast as they can. The problem with this approach is that you have to consolidate the data at the end. Again, MapReduce has this built right in.

The above simplified image of the MapReduce process shows you how the data is processed. The first thing that happens is the split, which is responsible for dividing the input data into reasonably sized chunks that are then processed by one server at a time. This splitting has to be done in a somewhat smart way to make the best use of the available servers and the infrastructure in general. In this example the data may be a very large log file that is divided into equally sized pieces on line boundaries. That is fine for, say, Apache log files. Input data may also be binary, though, in which case you may have to write your own getSplits() method - but more on that below.

Classes


The above image also shows you the classes that are involved in the Hadoop implementation of MapReduce. Let's look at them and also at the specific implementations that HBase provides on top of those.

InputFormat


The first class to deal with is the InputFormat class. It is responsible for two things: it does the actual splitting of the input data, and it returns a RecordReader instance that defines the classes of the key and value objects and provides a next() method that is used to iterate over each input record.
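For reference, the contract looks roughly like this in the (pre-0.20) org.apache.hadoop.mapred API. This is a from-memory sketch of the two interfaces, so consult the Javadoc of your Hadoop version for the authoritative signatures:

    // Sketch of the two interfaces mentioned above (old "mapred" API).
    public interface InputFormat<K, V> {
      // Splits the input into chunks, each processed by one map task.
      InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
      // Returns the reader that iterates over the records of one split.
      RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
          Reporter reporter) throws IOException;
    }

    public interface RecordReader<K, V> {
      boolean next(K key, V value) throws IOException; // advance to the next record
      K createKey();     // defines (and instantiates) the key class
      V createValue();   // defines (and instantiates) the value class
      long getPos() throws IOException;
      float getProgress() throws IOException;
      void close() throws IOException;
    }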

As far as HBase is concerned, there is a special implementation called TableInputFormatBase as well as its subclass TableInputFormat. The former implements the majority of the functionality but remains abstract; the subclass is a lightweight concrete version of it and is used by many of the supplied sample and real MapReduce classes.

But most importantly, these classes implement the full turn-key solution for scanning an HBase table. You provide the name of the table to scan and the columns you want to process during the Map phase, and they split the table into proper pieces for you and hand them over to the subsequent classes. There are quite a few tweaks, which we will address below and in the following installments of this series.
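As a minimal sketch of what that turn-key setup looks like - the class and helper names are those of the org.apache.hadoop.hbase.mapred package current at the time of writing, the table name "mytable" and the column "contents:" are placeholders, and the exact signature of initTableMapJob() may vary with your HBase release:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.RowResult;
    import org.apache.hadoop.hbase.mapred.IdentityTableMap;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.NullOutputFormat;

    public class ScanTableDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(new HBaseConfiguration(), ScanTableDriver.class);
        job.setJobName("scan mytable");
        // Sets TableInputFormat, the table to scan, the column list and the
        // mapper (plus its output key/value classes) in one call.
        TableMapReduceUtil.initTableMapJob("mytable", "contents:",
            IdentityTableMap.class, ImmutableBytesWritable.class,
            RowResult.class, job);
        // No reduce stage and no real output yet - the output side is
        // covered further down in the post.
        job.setNumReduceTasks(0);
        job.setOutputFormat(NullOutputFormat.class);
        JobClient.runJob(job);
      }
    }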

For now let's look at the other classes involved.

Mapper


The Mapper class(es) make up the next stage of the MapReduce process and give it one of its namesakes. In this step each record read by the RecordReader is processed using the map() method. Something also somewhat visible in the first figure above is that the Mapper may read one type of key/value pair but emit another. This is handy for converting the raw data into something more useful for further processing.

Again, looking at HBase's extensions to this, you will find a TableMap class that is specific to iterating over an HBase table. One specific implementation is the IdentityTableMap, which is also a good example of how to add your own functionality to the supplied classes. The TableMap class itself does not implement anything; it only adds the signatures that define what the actual key/value pair classes are. The IdentityTableMap simply passes the records on to the next stage of processing.
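To make that concrete, here is a sketch of a custom TableMap that reads the scanned rows but emits a different value type, in this case a BatchUpdate destined for an output table. Types and class names follow the old org.apache.hadoop.hbase.mapred API of the time, and the column "data:copy" is just a placeholder:

    import java.io.IOException;
    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.io.RowResult;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Reads (row key, RowResult) pairs from the scan but emits
    // (row key, BatchUpdate) pairs - a different value type than it consumed.
    public class ToBatchUpdateTableMap extends MapReduceBase
        implements TableMap<ImmutableBytesWritable, BatchUpdate> {

      public void map(ImmutableBytesWritable key, RowResult row,
          OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
          Reporter reporter) throws IOException {
        reporter.incrCounter("ToBatchUpdateTableMap", "rows", 1);
        BatchUpdate update = new BatchUpdate(key.get());
        // Placeholder transformation: store the row key under "data:copy".
        // A real job would derive the new cells from the scanned RowResult.
        update.put("data:copy", key.get());
        output.collect(key, update);
      }
    }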

Reducer


The Reduce stage and class layout are very similar to the Mapper ones explained above. This time we get the output of a Mapper class and process it after the data has been shuffled and sorted. In the implicit shuffle between the Mapper and Reducer stages, the intermediate data is copied from the various Map servers to the Reduce servers, and the sort merges the shuffled (copied) data so that the Reducer sees the intermediate data as a nicely sorted set where each unique key (and that is something I will get back to later) is associated with all of the values it was found with.
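To illustrate what the Reducer sees after the shuffle and sort, here is a minimal, generic reducer in the old org.apache.hadoop.mapred API that simply sums all the values delivered for each unique key. Nothing HBase-specific yet - the HBase counterpart, TableReduce, follows the same pattern with table-oriented types:

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    // After the shuffle and sort, reduce() is called once per unique key with
    // an iterator over every value the mappers emitted for that key.
    public class SumReducer extends MapReduceBase
        implements Reducer<Text, LongWritable, Text, LongWritable> {

      public void reduce(Text key, Iterator<LongWritable> values,
          OutputCollector<Text, LongWritable> output, Reporter reporter)
          throws IOException {
        long sum = 0;
        while (values.hasNext()) {
          sum += values.next().get();
        }
        output.collect(key, new LongWritable(sum));
      }
    }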

OutputFormat


The final stage is the OutputFormat class, and its job is to persist the data in various locations. There are specific implementations that allow output to files, or to HBase tables in the case of the TableOutputFormat, which uses a RecordWriter to write the data into the specified HBase output table.

It is important to note the cardinality as well. While there are many Mappers handing records to many Reducers, there is only one OutputFormat, which subsequently takes each output record from its Reducer. It is the final class handling the key/value pairs and writes them to their final destination, be it a file or a table.

The name of the output table is specified when the job is created. Otherwise the TableOutputFormat does not add much complexity. One rather significant thing it does do is set the table's auto-flush to "false" and handle the buffer flushing implicitly, which helps a lot in speeding up the import of large data sets.
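A sketch of the output side, again using the helper from the org.apache.hadoop.hbase.mapred package of the time. The table name "target_table" is a placeholder, and the exact helper signature may differ between releases:

    import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.mapred.JobConf;

    public class OutputSetup {
      // Wires TableOutputFormat into the job: the helper sets the output
      // format, stores the table name in the job configuration (the
      // TableOutputFormat.OUTPUT_TABLE key) and registers the reducer -
      // here the supplied IdentityTableReduce, which just forwards updates.
      static void configureTableOutput(JobConf job) {
        TableMapReduceUtil.initTableReduceJob("target_table",
            IdentityTableReduce.class, job);
      }
    }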

To Map or Reduce or Not Map or Reduce


We are now at a crucial point: deciding how to process tables stored in HBase. From the above it seems that we simply use a TableInputFormat to feed the data through a TableMap and TableReduce and eventually persist it with a TableOutputFormat. But this may not be what you want when you deal with HBase. The question is whether there is a better way to handle the process, given certain architectural features HBase provides. Depending on your data source and target, there are a few different scenarios to think of.

If you want to import a large set of data into an HBase table, you can read the data using a Mapper, aggregate it on a per-key basis using a Reducer, and finally write it into the HBase table. This involves the whole MapReduce stack, including the shuffle and sort using intermediate files. But what if you know that the data, for example, has a unique key? Why go through the extra step of copying and sorting when there is only ever exactly one key/value pair per key? At this point you may ask yourself: wouldn't it be better if I could skip that whole Reduce stage? The answer is yes, you can! And you should, as you can harvest the pure computational power of all CPUs to crunch the data and write it at top I/O speed to its final target.
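Skipping the Reduce stage is a one-liner in the job setup: with zero reduce tasks the map output goes straight to the configured OutputFormat, so there is no shuffle, no sort and no intermediate copy. A fragment, given a JobConf set up as in the sketches above:

    // No reducers: map output records are handed directly to the OutputFormat.
    job.setNumReduceTasks(0);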

As you can see from the matrix, there are quite a few scenarios where you can decide whether you want Map only, or both Map and Reduce. But when it comes to handling HBase tables as sources and targets, there are a few exceptions to this rule, and I have highlighted them accordingly.

Same or Different Tables

This is an important distinction. The bottom line is: when you read a table in the Map stage, you should consider not writing back to that very same table in the same process. On the one hand it could hinder the proper distribution of regions across the servers (open scanners block region splits), and on the other hand you may or may not see the new data as you scan. But when you read from one table and write to another, you can do that in a single stage. So for two different tables you can write your table updates directly in TableMap.map() (as sketched below), while with the same table you must write them in TableReduce.reduce() - or in its TableOutputFormat (or, even better, simply use that class as is and you are done). The reason is that the Map stage completely reads the table and then passes the data on in intermediate files to the Reduce stage. In turn this means that the Reducer reads from the distributed file system (DFS) and writes into the now idle HBase table. And all is well.
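Here is a sketch of the two-table case: reading from "source_table" and writing into a different "target_table" in a single, map-only stage. The table names and column are placeholders, the mapper is the ToBatchUpdateTableMap sketched in the Mapper section above, and the helper and constant names again follow the old org.apache.hadoop.hbase.mapred API:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.BatchUpdate;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapred.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapred.TableOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    public class CopyTableDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(new HBaseConfiguration(), CopyTableDriver.class);
        job.setJobName("copy source_table to target_table");
        // Scan one table... (ToBatchUpdateTableMap is the sketch from above)
        TableMapReduceUtil.initTableMapJob("source_table", "data:",
            ToBatchUpdateTableMap.class, ImmutableBytesWritable.class,
            BatchUpdate.class, job);
        // ...and write into a different one, directly from the map() calls.
        job.setOutputFormat(TableOutputFormat.class);
        job.set(TableOutputFormat.OUTPUT_TABLE, "target_table");
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(BatchUpdate.class);
        job.setNumReduceTasks(0);  // no Reduce stage needed for two tables
        JobClient.runJob(job);
      }
    }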

All of the above are simply recommendations based on what is currently available with HBase. There is of course no reason not to scan and modify a table in the same process. But to avoid certain non-deterministic issues, I personally would not recommend this - especially if you are new to HBase. This could also be taken into consideration when designing your HBase tables: maybe you separate distinct data into two tables or into separate column families so you can scan one table while changing the other.

Key Distribution

Another specific requirement for an effective import is to have random keys as they are read. While this will be difficult if you scan a table in the Map phase, as the keys are sorted, you may be able to make use of it when reading from a raw data file. Instead of leaving the key as the offset into the file, as created by the TextInputFormat for example, you could simply replace the rather useless offset with a random key. This helps ensure that the data is spread more evenly across all servers. The HRegionServers in particular will be very thankful, as they each host a set of regions, and random keys make for an even load on those regions.
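A sketch of such a mapper for the raw-file case, using plain Hadoop classes from the old mapred API. In a real import you would more likely derive the row key from the record itself; the random key here just illustrates spreading the load:

    import java.io.IOException;
    import java.util.Random;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    // Replaces the file offset handed in by TextInputFormat with a random key
    // so that subsequent writes spread across all regions instead of hitting
    // them in sorted order.
    public class RandomKeyMap extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

      private final Random random = new Random();

      public void map(LongWritable offset, Text line,
          OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        String key = Long.toHexString(random.nextLong());
        output.collect(new Text(key), line);  // the offset is discarded
      }
    }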

Of course this depends on how the data is written to the raw files, or how the real row keys are computed, but it is still a very valuable thing to keep in mind.

In the next post I will show you how to import data from a raw data file into an HBase table and how to eventually process the data in the HBase table. We will address questions like how many mappers and/or reducers are needed, and how import and processing performance can be improved.

Until then, have a good day!

4 comments:

  1. Great post, Lars!

    It's also important to note one of the benefits of Hadoop MapReduce over other distributed processing systems is the computation is moved to where the data lives. In many other systems, you end up reading all the input data remotely meaning it all must be sent across the network.

    Hadoop clusters typically have DataNodes and TaskTrackers co-located on each node. It does an excellent job at ensuring each map task runs on the same node as the block it is reading, meaning you don't have to send the input data across the network. The copy/shuffle/sort does imply moving things across the network, so this is partially why there is a major benefit if you only have a Map task.

    Specifically with HBase, Map tasks are encouraged to run on the same node as the RegionServer hosting the specific region it is scanning. However, it cannot be guaranteed that an HBase region's HDFS blocks will all be on that same node (though in many cases a majority of them are). This is something we are hoping to have more control over in the future. In any case, you will save at least one copy if not two. If you are processing billions of lines of log files to look for errors (needle-in-a-haystack), the savings that comes from having node-local data as input to your Map is enormous.

    ReplyDelete
  2. I'm looking forward to seeing the tutorial on "how to import data from a raw data file into an HBase table".

    I figured that one needs to write a MR job to generate HBase files and then move them into proper places with loadtable.rb

    I have not found any end-to-end examples of it. The one over here
    http://wiki.apache.org/hadoop/Hbase/MapReduce
    does use the HBase API as far as I can tell. But the whole idea is to "...harvest the pure computational power of all CPUs to crunch the data and write it at top I/O speed to its final target"

    My particular humble task is much simpler - I don't have a file to import from; all I want is to fill HBase with test data, so I generate keys and values on the fly. I assume I don't need the "map" part then?

    ReplyDelete
  3. This is great.
    I would love to see the custom split details for more optimization, if you have them.

    ReplyDelete
  4. Hi,

    I have a few questions.

    Q1: In RDBMS we have multiple DB schemas/Oracle user instances. Similarly, can we have multiple DB schemas in HBase? If yes, can we have multiple schemas on one Hadoop-HBase cluster?

    If multiple schemas are possible, how can we define them? Using configuration or programmatically?

    Q2: Can we have the same column family name in multiple tables? If yes, does it impact performance if we have the same column family name in multiple tables?

    Q3: Sequential keys improve read performance and random keys improve write performance. Which way should one go?

    Q4: What are the best practices to improve Hadoop+HBase performance?

    Q5: When one program is deleting a table while another program is accessing a row of that table, what would be the impact? Can we have some sort of lock while reading or while deleting a table?

    Q6: As everything in the application is in byte form, what would happen if the HBase DB and the application are using different character sets? Can we sync both to a particular character set by configuration or programmatically?


    Regards,
    Rashmi

    ReplyDelete