Tuesday, January 27, 2009

How to use HBase with Hadoop


I have been using Hadoop and HBase for a while now. The "raw" directory dump on the right may give you a rough idea. ;)

With each new release I worked through the latest iteration of the supplied helper classes for scanning an HBase table from within a Map/Reduce job. What I did not find was a description of how to use these classes. That has improved thanks to the relentless work of the volunteers and of Michael Stack, who put it all together. Without him and the rest of the team HBase would be nowhere. Anyhow, here is my spin on this issue, including the places where I am still not sure how to handle things:

The Hadoop Tool class is the launcher application. Its purpose is to read the command line parameters and then set up a JobConf instance that holds the job details: which classes read the input, which classes are the mapper and reducer, what the key and value classes are for each of these, and so on. The command line parameters usually specify how many Map and Reduce tasks should run on the cluster, what table to process, what columns, and so on.

Once the Tool has set up the JobConf, it runs the job on the cluster. With HBase there are special Map/Reduce classes that serve as helpers or starting points for processing HBase tables. They are in the HBase path under the "mapred" package. Their names are TableInputFormat, TableReduce, TableMap, IdentityTableMap and so on.

After the job has started, the first thing that happens is the preparation for the Map phase, which is done in the InputFormat class. It serves as the filter that reads the raw data and passes it into the Map phase as key/value pairs. For an HBase job this is done in the supplied TableInputFormat class. It splits the table you are scanning into chunks that can be handed to the Map instances. By the way, you can only scan one single table at a time, but you can process any columns you want out of that table.

In HBase a table is physically divided into many regions, which are in turn served by different RegionServers. The splitting maps each split to exactly one region of the table you are scanning. Because of that you may end up with a couple of thousand splits for a single table. For example, I have more than 6000 regions for one table.

It is recommended by the HBase team to match the number of Map instances to the number of splits (aka table regions). Instead of always having to check the number first, for example in the HBase UI, I have opted to automate the computation of the number of splits to use. I simply ask the table how many "start keys" it knows of. This should equal the number of regions, as each region has its own start key:
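/**
 * Computes the number of regions of the given table.
 *
 * @param tableName The name of the table to check.
 * @return The total number of regions of the table, or -1 if no name was given.
 * @throws IOException When the table is not created.
 */
private int getNumberOfRegions(String tableName) throws IOException {
  // sanity check
  if (tableName == null) return -1;
  // hbaseConfig is a field of the enclosing class (not shown)
  HTable table = new HTable(hbaseConfig, tableName);
  // every region has its own start key, so counting them counts the regions
  byte[][] startKeys = table.getStartKeys();
  return startKeys.length;
}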

Each split is one region, and each region has a start and end key that bound the rows to be processed. As each split is read by a Map instance, an HBase Scanner is created to scan the rows within the split's key range.
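The pairing of splits and regions can be sketched in plain Java. This is only a minimal model under stated assumptions, not the actual TableInputFormat code: the class RegionSplits and its methods are hypothetical names, keys are shown as Strings rather than byte arrays for readability, and it assumes the start keys arrive sorted, with the first region starting at the empty key and the last region ending at the empty key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: models how one split per region falls out of the
// sorted list of region start keys. Not part of HBase.
final class RegionSplits {

    // A split covers the half-open key range [startKey, endKey).
    // An empty endKey means "until the end of the table".
    static final class Split {
        final String startKey;
        final String endKey;

        Split(String startKey, String endKey) {
            this.startKey = startKey;
            this.endKey = endKey;
        }

        @Override
        public String toString() {
            return "[" + startKey + ", " + endKey + ")";
        }
    }

    // Builds one split per start key; each region ends where the next begins.
    static List<Split> fromStartKeys(List<String> startKeys) {
        List<Split> splits = new ArrayList<Split>();
        for (int i = 0; i < startKeys.size(); i++) {
            boolean last = (i == startKeys.size() - 1);
            String endKey = last ? "" : startKeys.get(i + 1);
            splits.add(new Split(startKeys.get(i), endKey));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Assumed example: a table with three regions.
        List<String> startKeys = Arrays.asList("", "row-3000", "row-6000");
        for (Split split : fromStartKeys(startKeys)) {
            System.out.println(split);
        }
    }
}
```

The number of splits always equals the number of start keys, which is why counting start keys is enough to size the Map phase.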

Each of these rows is handed to the TableMap class, or rather to a class that implements this interface. You can use the supplied IdentityTableMap, since often you are simply passing the rows on in the Map step. As per the Map/Reduce process, the rows then get sorted during the next phase and are eventually passed on to the TableReduce class.

So what you have now is the row and all the columns that were listed, usually as a parameter to the Tool class implementation. For example, if you named "contents:,mimetype:" then those two column families are handed to you, and in this special case with all labels! If you had specified "contents:en,mimetype:en" then you would have gotten exactly those two columns with that particular label. So leaving out the label acts as a wildcard for all labels (because the HBase Scanner class implements it that way).
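The wildcard rule for column names can be illustrated with a small stand-alone sketch. It is an assumption-laden model, not the HBase Scanner implementation: the class ColumnSpec and its methods are hypothetical, and it only reproduces the behaviour described above, where "family:" selects every label in the family and "family:label" selects exactly one column.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models the "family:label" matching rule described
// above. It is not the HBase Scanner implementation.
final class ColumnSpec {

    private final List<String> patterns = new ArrayList<String>();

    // Parses a comma-separated list such as "contents:,mimetype:en".
    ColumnSpec(String commaSeparated) {
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(trimmed);
            }
        }
    }

    // Returns true when the full column name ("family:label") is selected.
    boolean matches(String column) {
        for (String pattern : patterns) {
            if (pattern.endsWith(":")) {
                // Family only: wildcard over all labels in that family.
                if (column.startsWith(pattern)) {
                    return true;
                }
            } else if (column.equals(pattern)) {
                // Family plus label: exact match only.
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ColumnSpec wildcard = new ColumnSpec("contents:,mimetype:");
        ColumnSpec exact = new ColumnSpec("contents:en,mimetype:en");
        System.out.println(wildcard.matches("contents:de")); // true
        System.out.println(exact.matches("contents:de"));    // false
        System.out.println(exact.matches("contents:en"));    // true
    }
}
```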

In the Reduce phase you perform whatever work you have to and then pass the result on to the TableOutputFormat class. There you write out what you need and you are done.

During the process you can call upon counters to count anything you like. That is what I do to count how many documents I have in total and how many for each respective target language etc. At the end of the job run I read the counters and store the values back into HBase or MemCacheDB as meta data.
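The bookkeeping behind such counters can be modelled in a few lines of plain Java. This is only a stand-in so that the idea runs without a cluster: the class DocumentCounters and the language codes are hypothetical, and a real job would increment Hadoop's own counters instead of a local map.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for job counters: one total plus one count per
// target language. A real job would use Hadoop's counters instead.
final class DocumentCounters {

    private long total = 0;
    private final Map<String, Long> perLanguage = new TreeMap<String, Long>();

    // Records one document for the given target language.
    void count(String language) {
        total++;
        Long current = perLanguage.get(language);
        perLanguage.put(language, current == null ? 1L : current + 1L);
    }

    // Total number of documents seen so far.
    long total() {
        return total;
    }

    // Number of documents seen for one language, 0 if none.
    long forLanguage(String language) {
        Long value = perLanguage.get(language);
        return value == null ? 0L : value;
    }

    public static void main(String[] args) {
        DocumentCounters counters = new DocumentCounters();
        for (String language : new String[] {"en", "de", "en"}) {
            counters.count(language);
        }
        // After the job run these values could be stored as meta data.
        System.out.println("total=" + counters.total());
        System.out.println("en=" + counters.forLanguage("en"));
    }
}
```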

Now the question you may have is, where do I do what work? In the Map or Reduce phase? And for the latter, why do I need a formatter?

I had the same questions. My understanding is that HBase is a special Hadoop Map/Reduce case. This is because keys in HBase are unique, so doing a map first and then sorting the keys so that they can be reduced is not necessary. In fact I have one job that uses only a Map phase, set up like so:
job.setNumReduceTasks(0);
job.setInputFormat(MyTextInputFormat.class);
job.setMapperClass(MyMapper.class);
job.setOutputFormat(NullOutputFormat.class);

So it is a decision you have to make on your own: do I need a particular phase or not? In the job above I am not scanning HBase tables but rather reading a text file stored in the Hadoop DFS, where each line is an update instruction for an existing document stored in HBase. There is no need to do the sorting or reducing.

As far as HBase scans are concerned, you may want to keep the Input/Split/Map/Sort/Reduce/Output phases. That is also why the excellent HBase team supplied base classes matching that concept. Usually the IdentityTableMap class is used to pass on the rows and columns, and all the work is done in the Reduce phase.

That leaves one question: why have both a TableReduce and a TableOutputFormat class? The reason is that in the Reduce you output what needs to be "saved", but not how. You can therefore run two very similar jobs that differ only in how they save the data, simply by replacing the output format class.
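The separation between deciding what to save and deciding how to save it can be shown with a small model. Everything here is hypothetical and uses no Hadoop or HBase classes: RecordSink plays the role of the output format, reduce plays the role of the reducer, and the two sinks stand for two jobs that differ only in their output side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the Reduce / OutputFormat separation. The names
// RecordSink, ListSink, CountingSink and reduce are illustrative only.
final class OutputSeparation {

    // Plays the role of the output format: decides how a record is saved.
    interface RecordSink {
        void write(String rowKey, String value);
    }

    // One possible sink: keeps records in memory as "key=value" lines.
    static final class ListSink implements RecordSink {
        final List<String> lines = new ArrayList<String>();

        public void write(String rowKey, String value) {
            lines.add(rowKey + "=" + value);
        }
    }

    // Another sink: only counts the records and stores nothing.
    static final class CountingSink implements RecordSink {
        int count = 0;

        public void write(String rowKey, String value) {
            count++;
        }
    }

    // Plays the role of the reducer: decides what is saved, not how.
    static void reduce(String rowKey, List<String> values, RecordSink sink) {
        StringBuilder joined = new StringBuilder();
        for (String value : values) {
            if (joined.length() > 0) {
                joined.append('|');
            }
            joined.append(value);
        }
        sink.write(rowKey, joined.toString());
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("a", "b");

        // Same reduce logic, two different ways of saving the result.
        ListSink listSink = new ListSink();
        reduce("row1", values, listSink);
        System.out.println(listSink.lines);

        CountingSink countingSink = new CountingSink();
        reduce("row1", values, countingSink);
        System.out.println(countingSink.count);
    }
}
```

Swapping the sink changes where the data ends up without touching the reduce logic, which is the same trade the TableReduce and TableOutputFormat pair offers.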

Again, I have cases where I do not output but save back to HBase. I could easily write the records back into HBase in the reduce step, so why pass them on first? I think this is in some cases just common sense or being a "good citizen". I still have code where I am torn as to where to process the final output. Sometimes I lean this way, sometimes the other.

Other notes:
1) With HBase 0.19.0 there is now a central helper class called TableMapReduceUtil which helps set up jobs like so:
job.setMapperClass(MyMap.class);
TableMapReduceUtil.initTableReduceJob(TABLE_NAME, MyReduce.class, job);
...

It helps you set up all the required details for Map and/or Reduce jobs based on the supplied helper classes.

2) Writing back to the same HBase table is OK when you do it in the Reduce phase, because all scanning has concluded in the Map phase beforehand. All rows and columns are saved to an intermediate Hadoop SequenceFile internally, so when you process these and write back to HBase there is no longer a scanner from the same job open and reading the table.

Otherwise it is OK to write to a different HBase table even during the Map phase.

Hope that helps a little.

1 comment:

  1. I have created a table in HBase with 12 columns in each row, and each column has 8 qualifiers. When I try to read a complete row, it returns the correct value for 1:1 in row 1 but returns null for 1:2.
    It reads all the columns from 2 to 10 correctly.
    Please help me solve this problem.
    I am using this code for reading. It is inside a for loop that runs from 1 to 10:


    train[0][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("1"))));
    train[1][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("2"))));
    train[2][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("3"))));
    train[3][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("4"))));
    train[4][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("5"))));
    train[5][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("6"))));
    train[6][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("7"))));
    train[7][i] = Double.parseDouble(Bytes.toString(r.getValue(Bytes.toBytes(Integer.toString(i)), Bytes.toBytes("8"))));
