Wednesday, March 14, 2012

Hadoop, HBase, and Xceivers

Some of the configuration properties found in Hadoop have a direct effect on clients, such as HBase. One of those properties is called "dfs.datanode.max.xcievers", and belongs to the HDFS subproject. It defines the number of server-side threads and - to some extent - sockets used for data connections. Setting this number too low can cause problems as you grow or increase the utilization of your cluster. This post will help you understand what happens between the client and server, and how to determine a reasonable number for this property.

The Problem

Since HBase is storing everything it needs inside HDFS, the hard upper boundary imposed by the "dfs.datanode.max.xcievers" configuration property can result in too few resources being available to HBase, manifesting itself as IOExceptions on either side of the connection. Here is an example from the HBase mailing list [1], where the following messages were initially logged on the RegionServer side: 

2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Exception in createBlockOutputStream java.io.IOException: Could not read from stream
2008-11-11 19:55:52,451 INFO org.apache.hadoop.dfs.DFSClient: Abandoning block blk_-5467014108758633036_595771
2008-11-11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient: DataStreamer Exception: java.io.IOException: Unable to create new block.
2008-11-11 19:55:58,455 WARN org.apache.hadoop.dfs.DFSClient: Error Recovery for block blk_-5467014108758633036_595771 bad datanode[0]
2008-11-11 19:55:58,482 FATAL org.apache.hadoop.hbase.regionserver.Flusher: Replay of hlog required. Forcing server shutdown

Correlating this with the Hadoop DataNode logs revealed the following entry:

ERROR org.apache.hadoop.dfs.DataNode: DatanodeRegistration(10.10.10.53:50010,storageID=DS-1570581820-10.10.10.53-50010-1224117842339,infoPort=50075, ipcPort=50020):DataXceiver: java.io.IOException: xceiverCount 258 exceeds the limit of concurrent xcievers 256 

In this example, the low value of "dfs.datanode.max.xcievers" for the DataNodes caused the entire RegionServer to shut down. This is a really bad situation. Unfortunately, there is no hard-and-fast rule that explains how to compute the required limit. It is commonly advised to raise the number from the default of 256 to something like 4096 (see [1], [2], [3], [4], and [5] for reference). This is done by adding this property to the hdfs-site.xml file of all DataNodes (note that it is misspelled):

  <property>
    <name>dfs.datanode.max.xcievers</name>
    <value>4096</value>
  </property>

Note: You will need to restart your DataNodes after making this change to the configuration file.
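
For example, on a plain Apache Hadoop 1.x style install this could be done with the daemon scripts on each DataNode - the exact script location and invocation depend on your distribution, so treat this as a sketch:

$ bin/hadoop-daemon.sh stop datanode
$ bin/hadoop-daemon.sh start datanode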

This should help with the above problem, but you still might want to know more about how this all plays together, and what HBase is doing with these resources. We will discuss this in the remainder of this post. But before we do, we need to be clear about why you cannot simply set this number very high, say 64K, and be done with it. There is a reason for an upper boundary, and it is twofold: first, threads need their own stack, which means they occupy memory. For current servers this means 1 MB per thread [6] by default. In other words, if you use up all of the 4096 DataXceiver threads, you need around 4 GB of memory to accommodate them. This cuts into the space the machine has left for memstores and block caches, as well as all the other moving parts of the JVMs. In the worst case you might run into an OutOfMemoryError, and the process is toast. You want to set this property to a reasonably high number, but not too high either.

Second, with this many threads active, you will also see your CPU becoming increasingly loaded. There will be many context switches to handle all the concurrent work, which takes away resources from the real work. As with the concerns about memory, you do not want the number of threads to grow boundlessly, but to have a reasonable upper boundary - and that is what "dfs.datanode.max.xcievers" is for.
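
As an aside on the memory point: the per-thread stack size is a JVM setting (-Xss) that can be adjusted, for example via the DataNode's JVM options in hadoop-env.sh. The value below is purely illustrative, not a recommendation:

  export HADOOP_DATANODE_OPTS="-Xss1m $HADOOP_DATANODE_OPTS"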

Hadoop File System Details

From the client side, the HDFS library provides the abstraction called Path. This class represents a file in a file system supported by Hadoop, which is represented by the FileSystem class. There are a few concrete implementations of the abstract FileSystem class, one of which is the DistributedFileSystem, representing HDFS. This class in turn wraps the actual DFSClient class that handles all interactions with the remote servers, i.e. the NameNode and the many DataNodes.

When a client, such as HBase, opens a file, it does so by, for example, calling the open() or create() methods of the FileSystem class, here shown in their most simplistic forms:

  public FSDataInputStream open(Path f) throws IOException
  public FSDataOutputStream create(Path f) throws IOException

The returned stream instance is what needs a server-side socket and thread, which are used to read and write blocks of data. They form part of the contract to exchange data between the client and server. Note that there are other, RPC-based protocols in use between the various machines, but for the purpose of this discussion they can be ignored.

The stream instance returned wraps a specialized DFSOutputStream or DFSInputStream class, which handles all of the interaction with the NameNode to figure out where the copies of the blocks reside, and the data communication per block, per DataNode.
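
To make the client side more tangible, here is a minimal, self-contained sketch of that usage - the path is made up, and it assumes a reachable HDFS instance configured via the usual *-site.xml files on the classpath:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class HdfsStreamExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Resolves to the DistributedFileSystem when the default file system
      // points to HDFS; it wraps the DFSClient doing the actual work.
      FileSystem fs = FileSystem.get(conf);
      Path path = new Path("/testfile"); // made-up path

      // The write path: the returned stream wraps a DFSOutputStream.
      FSDataOutputStream out = fs.create(path);
      out.write(1);
      out.close(); // finalizes the block, releasing the server-side thread

      // The read path: the returned stream wraps a DFSInputStream, which
      // connects to a DataNode lazily on the first read() of a block.
      FSDataInputStream in = fs.open(path);
      System.out.println(in.read());
      in.close(); // releases the connection, and with it the DataXceiver thread
    }
  }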

On the server side, the DataNode wraps an instance of DataXceiverServer, which is the actual class that reads the above configuration key and also throws the above exception when the limit is exceeded.

When the DataNode starts, it creates a thread group and starts the mentioned DataXceiverServer instance like so:

  this.threadGroup = new ThreadGroup("dataXceiverServer");
  this.dataXceiverServer = new Daemon(threadGroup,
      new DataXceiverServer(ss, conf, this));
  this.threadGroup.setDaemon(true); // auto destroy when empty 

Note that the DataXceiverServer thread is already taking up one spot in the thread group. The DataNode also has this internal method to retrieve the number of currently active threads in this group:

  /** Number of concurrent xceivers per node. */
  int getXceiverCount() {
    return threadGroup == null ? 0 : threadGroup.activeCount();
  }

Reading and writing blocks, as initiated by the client, causes a connection to be made, which the DataXceiverServer thread wraps into a DataXceiver instance. During this handoff, a thread is created and registered in the above thread group. So for every active read and write operation a new thread is tracked on the server side. If the count of threads in the group exceeds the configured maximum, the said exception is thrown and recorded in the DataNode's logs:

  if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
    throw new IOException("xceiverCount " + curXceiverCount
                          + " exceeds the limit of concurrent xcievers "
                          + dataXceiverServer.maxXceiverCount);
  }
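
The maxXceiverCount used here is simply the value of the misspelled configuration key, with the 256 mentioned earlier as its fallback. If you want to double-check what your own configuration resolves to, a few lines of client code will do - a sketch, assuming the relevant hdfs-site.xml is on the classpath:

  import org.apache.hadoop.conf.Configuration;

  public class XceiverLimitCheck {
    public static void main(String[] args) {
      Configuration conf = new Configuration();
      conf.addResource("hdfs-site.xml"); // pick up the HDFS overrides
      // 256 mirrors the DataNode's built-in default for this key
      System.out.println("dfs.datanode.max.xcievers = "
          + conf.getInt("dfs.datanode.max.xcievers", 256));
    }
  }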

Implications for Clients

Now the question is: how do the client's reads and writes relate to the server-side threads? Before we go into the details though, let's use the debug information that the DataXceiver class logs when it is created and closed

  LOG.debug("Number of active connections is: " + datanode.getXceiverCount());
  ...
  LOG.debug(datanode.dnRegistration + ":Number of active connections is: " 
    + datanode.getXceiverCount());

and monitor what is logged on the DataNode during a start of HBase. For simplicity's sake this is done on a pseudo-distributed setup with a single DataNode and RegionServer instance. The following shows the top of the RegionServer's status page.

The important part is in the "Metrics" section, where it says "storefiles=22". So, assuming that HBase has at least that many files to handle, plus some extra files for the write-ahead log, we should see the above log message state that we have at least 22 "active connections". Let's start HBase and check the DataNode and RegionServer log files:

Command Line:

$ bin/start-hbase.sh
...

DataNode Log:

2012-03-05 13:01:35,309 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1
2012-03-05 13:01:35,315 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2
12/03/05 13:01:35 INFO regionserver.MemStoreFlusher: globalMemStoreLimit=396.7m, globalMemStoreLimitLowMark=347.1m, maxHeap=991.7m
12/03/05 13:01:39 INFO http.HttpServer: Port returned by webServer.getConnectors()[0].getLocalPort() before open() is -1. Opening the listener on 60030
2012-03-05 13:01:40,003 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1
12/03/05 13:01:40 INFO regionserver.HRegionServer: Received request to open region: -ROOT-,,0.70236052
2012-03-05 13:01:40,882 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,884 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,888 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,902 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,907 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,909 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,911 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,915 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,917 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 13:01:40,917 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,919 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 13:01:40 INFO regionserver.HRegion: Onlined -ROOT-,,0.70236052; next sequenceid=63083
2012-03-05 13:01:40,982 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,983 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,985 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:40,987 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:40,987 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 13:01:40,989 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: .META.,,1.1028785192
2012-03-05 13:01:41,026 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,027 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,028 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,029 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,035 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,037 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,038 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,040 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,044 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,047 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
2012-03-05 13:01:41,048 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,051 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined .META.,,1.1028785192; next sequenceid=63082
2012-03-05 13:01:41,109 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 3
2012-03-05 13:01:41,114 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 13:01:41,117 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 5
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open 16 region(s)
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1240813553,1330944811370.12c95922805e2cb5274396a723a94fa8.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1481880893,1330944827566.fb3cc692757825e24295042fd42ff07c.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1602709751,1330944827566.dbd84a9c2a2e450799b10e7408e3e12e.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1723694337,1330944838296.cb9e191f0d8c0d8b64c192a52e35a6f0.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1844378668,1330944838296.577cc1efe165859be1341a9e1c566b12.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user2084845901,1330944848231.24413155fef16ebb00213b8072bc266b.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user273134820,1330944848768.86d2a0254822edc967c0f27fe53e7734.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user394249140,1330944848768.df37ded27f9ba0f9ed76b2eb05db6c4e.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user63650623,1330944849739.a3b9f64e6abee00a2f39c33efa7ae8a2.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.
12/03/05 13:01:41 INFO regionserver.HRegionServer: Received request to open region: usertable,user878854551,1330944850808.9b8667b6d3c0baaaea491527cd781357.
2012-03-05 13:01:41,246 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,248 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,249 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,256 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 10
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,257 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,259 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,260 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,264 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,283 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1120311784,1330944810191.90d287473fe223f0ddc137020efda25d.; next sequenceid=62917
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,,1330944810191.62a312d67981c86c42b6bc02e6ec7e3f.; next sequenceid=62916
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1240813553,1330944811370.12c95922805e2cb5274396a723a94fa8.; next sequenceid=62918
2012-03-05 13:01:41,360 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,361 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,366 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,366 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,366 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,368 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,369 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,370 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,370 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 13:01:41,372 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 9
2012-03-05 13:01:41,375 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,377 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1481880893,1330944827566.fb3cc692757825e24295042fd42ff07c.; next sequenceid=62919
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1602709751,1330944827566.dbd84a9c2a2e450799b10e7408e3e12e.; next sequenceid=62920
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1361265841,1330944811370.80663fcf291e3ce00080599964f406ba.; next sequenceid=62919
2012-03-05 13:01:41,474 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,491 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,495 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,508 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,512 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,513 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,514 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,521 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,533 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1844378668,1330944838296.577cc1efe165859be1341a9e1c566b12.; next sequenceid=62918
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1723694337,1330944838296.cb9e191f0d8c0d8b64c192a52e35a6f0.; next sequenceid=62917
2012-03-05 13:01:41,540 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,542 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,548 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user1964968041,1330944848231.dd89596e9129e1caa7e07f8a491c9734.; next sequenceid=62920
2012-03-05 13:01:41,618 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,621 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,636 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,649 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,650 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,654 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,662 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,665 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user2084845901,1330944848231.24413155fef16ebb00213b8072bc266b.; next sequenceid=62921
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user273134820,1330944848768.86d2a0254822edc967c0f27fe53e7734.; next sequenceid=62924
2012-03-05 13:01:41,673 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,677 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,679 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,687 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user394249140,1330944848768.df37ded27f9ba0f9ed76b2eb05db6c4e.; next sequenceid=62925
2012-03-05 13:01:41,790 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,800 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,806 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,815 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,819 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,821 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,824 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,825 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 13:01:41,827 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 13:01:41,829 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user515290649,1330944849739.d23924dc9e9d5891f332c337977af83d.; next sequenceid=62926
2012-03-05 13:01:41,832 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,838 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user63650623,1330944849739.a3b9f64e6abee00a2f39c33efa7ae8a2.; next sequenceid=62927
2012-03-05 13:01:41,891 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,893 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
2012-03-05 13:01:41,894 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 13:01:41,896 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 7
12/03/05 13:01:41 INFO regionserver.HRegion: Onlined usertable,user878854551,1330944850808.9b8667b6d3c0baaaea491527cd781357.; next sequenceid=62930
2012-03-05 14:01:39,514 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 5
2012-03-05 14:01:39,711 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 4
2012-03-05 22:48:41,945 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4
12/03/05 22:48:41 INFO regionserver.HRegion: Onlined usertable,user757669512,1330944850808.cd0d6f16d8ae9cf0c9277f5d6c6c6b9f.; next sequenceid=62929
2012-03-05 22:48:41,963 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 4

You can see how the regions are opened one after the other, but what you also might notice is that the number of active connections never climbs to 22 - it barely even reaches 10. Why is that? To understand this better, we have to see how files in HDFS map to server-side DataXceiver instances - and the actual threads they represent.

Hadoop Deep Dive

The aforementioned DFSInputStream and DFSOutputStream are really facades around the usual stream concepts. They wrap the client-server communication into these standard Java interfaces, while internally routing the traffic to a selected DataNode - which is the one that holds a copy of the current block. As a client reads a file in HDFS, the client library classes switch transparently from block to block, and therefore from DataNode to DataNode, so they have to open and close connections as needed.

The DFSInputStream has an instance of a DFSClient.BlockReader class that opens the connection to the DataNode. The stream instance calls blockSeekTo() for every call to read(), which takes care of opening the connection if there is none already. Once a block is completely read, the connection is closed. Closing the stream has the same effect, of course.

The DFSOutputStream has a similar helper class, the DataStreamer. It tracks the connection to the server, which is initiated by the nextBlockOutputStream() method. It has further internal classes that help with writing the block data out, which we omit here for the sake of brevity.

Both writing and reading blocks require a thread to hold the socket and intermediate data on the server side, wrapped in the DataXceiver instance. Depending on what your client is doing, you will see the number of connections fluctuate around the number of currently accessed files in HDFS.

Back to the HBase riddle above: the reason you do not see up to 22 (and more) connections during the start is that while the regions open, the only required data is the HFile's info block. This block is read to gain vital details about each file, but then closed again. This means that the server-side resource is released in quick succession. The remaining four connections are harder to determine. You can use JStack to dump all threads on the DataNode, which in this example shows this entry:

"DataXceiver for client /127.0.0.1:64281 [sending block blk_5532741233443227208_4201]" daemon prio=5 tid=7fb96481d000 nid=0x1178b4000 runnable [1178b3000]
   java.lang.Thread.State: RUNNABLE
   ...

"DataXceiver for client /127.0.0.1:64172 [receiving block blk_-2005512129579433420_4199 client=DFSClient_hb_rs_10.0.0.29,60020,1330984111693_1330984118810]" daemon prio=5 tid=7fb966109000 nid=0x1169cb000 runnable [1169ca000]
   java.lang.Thread.State: RUNNABLE
   ...

These are the only DataXceiver entries (in this example), so the count in the thread group is a bit misleading. Recall that the DataXceiverServer daemon thread already accounts for one extra entry, which combined with the two above accounts for three active connections - which in fact means three active threads. The reason the log states four instead is that it logs the count from an active thread that is about to finish. So, shortly after the count of four is logged, it is actually one less, i.e. three, and hence matches our head count of active threads.

Also note that the internal helper classes, such as the PacketResponder, occupy another thread in the group while being active. The JStack output indicates that fact, listing the thread as such:

"PacketResponder 0 for Block blk_-2005512129579433420_4199" daemon prio=5 tid=7fb96384d000 nid=0x116ace000 in Object.wait() [116acd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
     at java.lang.Object.wait(Native Method)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.lastDataNodeRun(BlockReceiver.java:779)
     - locked <7bc79c030> (a org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
     at java.lang.Thread.run(Thread.java:680)

This thread is currently in the TIMED_WAITING state and is not considered active. That is why the count emitted by the DataXceiver log statements does not include these kinds of threads. If they become active because the client is sending data, the active thread count will go up again. Another thing to note is that this thread does not need a separate connection, or socket, between the client and the server. The PacketResponder is just a thread on the server side that receives block data and streams it to the next DataNode in the write pipeline.

The Hadoop fsck command also has an option to report what files are currently open for writing:

$ hadoop fsck /hbase -openforwrite
FSCK started by larsgeorge from /10.0.0.29 for path /hbase at Mon Mar 05 22:59:47 CET 2012
....../hbase/.logs/10.0.0.29,60020,1330984111693/10.0.0.29%3A60020.1330984118842 0 bytes, 1 block(s), OPENFORWRITE: ......................................Status: HEALTHY
 Total size:     2088783626 B
 Total dirs:     54
 Total files:    45
 ...

This does not immediately relate to an occupied server-side thread, as these are allocated by block ID. But you can glean from it that there is one block open for writing. The Hadoop command has additional options to print out the actual files and the block IDs they are comprised of:

$ hadoop fsck /hbase -files -blocks
FSCK started by larsgeorge from /10.0.0.29 for path /hbase at Tue Mar 06 10:39:50 CET 2012
...
/hbase/.META./1028785192/.tmp <dir>
/hbase/.META./1028785192/info <dir>
/hbase/.META./1028785192/info/4027596949915293355 36517 bytes, 1 block(s):  OK
0. blk_5532741233443227208_4201 len=36517 repl=1
...
Status: HEALTHY
 Total size:     2088788703 B
 Total dirs:     54
 Total files:     45 (Files currently being written: 1)
 Total blocks (validated):     64 (avg. block size 32637323 B) (Total open file blocks (not validated): 1)
 Minimally replicated blocks:     64 (100.0 %)
 ...

This gives you two things. First, the summary states that there is one open file block at the time the command ran - matching the count reported by the "-openforwrite" option above. Second, the list of blocks next to each file lets you match the thread name to the file that contains the block being accessed. In this example the block with the ID "blk_5532741233443227208_4201" is sent from the server to the client, here a RegionServer. This block belongs to the HBase .META. table, as shown by the output of the Hadoop fsck command. The combination of JStack and fsck can serve as a poor man's replacement for lsof (a tool on the Linux command line to "list open files").
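
In practice that combination boils down to a couple of commands - the PID lookup via jps is just one way to find the DataNode process, so adapt as needed:

$ DN_PID=$(jps | grep DataNode | awk '{print $1}')
$ jstack $DN_PID | grep "DataXceiver for client"
$ hadoop fsck /hbase -files -blocks | grep blk_5532741233443227208_4201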

JStack also reports that there is a DataXceiver thread, with an accompanying PacketResponder, for block ID "blk_-2005512129579433420_4199", but this ID is missing from the list of blocks reported by fsck. This is because the block is not yet finished and therefore not available to readers. In other words, Hadoop fsck only reports on complete (or synced [7][8], for Hadoop versions that support this feature) blocks.


Practical Example

We can verify this using the HBase JRuby shell. For this exercise we should stop HBase, which will close out all open files, remove all active DataXceiver threads from the JStack output, and reduce the number of active connections as reported by the DataNode's debug logs to one - the server thread, as you know by now.

Write Data

Let's start with the process of writing data. Open the HBase shell and in another terminal check the content of the file system:

$ hadoop dfs -ls /
Found 3 items
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-04 14:13 /Users
drwxr-xr-x   - larsgeorge supergroup          0 2011-11-15 11:17 /Volumes
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-05 11:41 /hbase

Now in the HBase shell, we enter these commands, while at the same time checking the output of the DataNode logs:

$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.4-cdh3u2, r, Thu Oct 13 20:32:26 PDT 2011

hbase(main):001:0> import org.apache.hadoop.hdfs.DFSClient
=> Java::OrgApacheHadoopHdfs::DFSClient
hbase(main):002:0> import org.apache.hadoop.conf.Configuration
=> Java::OrgApacheHadoopConf::Configuration
hbase(main):003:0> dfs = DFSClient.new(Configuration.new)
=> #<Java::OrgApacheHadoopHdfs::DFSClient:0x552c937c>
hbase(main):004:0> s = dfs.create('/testfile', false)
12/03/06 11:54:51 DEBUG hdfs.DFSClient: /testfile: masked=rwxr-xr-x
12/03/06 11:54:51 DEBUG hdfs.DFSClient: computePacketChunkSize: src=/testfile, chunkSize=516, chunksPerPacket=127, packetSize=65557
=> #<#<Class:0x1521bfefb>:0x3abb1bc4>

It means we have created an output stream to a file in HDFS. So far this file only appears in the HDFS ls command, but not in the JStack threads, nor in the logs as an increased active connections count.

$ hadoop dfs -ls /
Found 4 items
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-04 14:13 /Users
drwxr-xr-x   - larsgeorge supergroup          0 2011-11-15 11:17 /Volumes
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-05 11:41 /hbase
-rw-r--r--   1 larsgeorge supergroup          0 2012-03-06 11:54 /testfile

The file size is zero bytes, as expected. The operation we have just performed is a pure metadata operation, involving only the NameNode. We have not written anything yet, so no DataNode is involved. We start to write into the stream like so:

hbase(main):005:0> s.write(1)
hbase(main):006:0> s.write(1)
hbase(main):007:0> s.write(1)

Again, nothing changes: no block is being generated, because the data is buffered on the client side. We have the choice to close or sync the data to HDFS next to set the wheels in motion. Using close would quickly write the block and then close everything down - too quick for us to observe the thread creation; only the logs would show the increase and decrease of the thread count. We rather use sync to flush out the few bytes we have written, but keep the block open for writing:

hbase(main):008:0> s.sync   
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DFSClient writeChunk allocating new packet seqno=0, src=/testfile, packetSize=65557, chunksPerPacket=127, bytesCurBlock=0
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DFSClient flush() : saveOffset 0 bytesCurBlock 3 lastFlushOffset 0
12/03/06 12:04:04 DEBUG hdfs.DFSClient: Allocating new block
12/03/06 12:04:04 DEBUG hdfs.DFSClient: pipeline = 127.0.0.1:50010
12/03/06 12:04:04 DEBUG hdfs.DFSClient: Connecting to 127.0.0.1:50010
12/03/06 12:04:04 DEBUG hdfs.DFSClient: Send buf size 131072
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DataStreamer block blk_1029135823215149276_4209 wrote packet seqno:0 size:32 offsetInBlock:0 lastPacketInBlock:false
12/03/06 12:04:04 DEBUG hdfs.DFSClient: DFSClient Replies for seqno 0 are SUCCESS

Note: The logging level of HBase is set to DEBUG, which causes the above log statements to be printed on the console.

The logs show an increase by one of the number of active connections, and JStack lists the two threads involved in the writing process:

DataNode Log:

2012-03-06 12:04:04,457 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1

JStack Output:

"PacketResponder 0 for Block blk_1029135823215149276_4209" daemon prio=5 tid=7fb96395e800 nid=0x116ace000 in Object.wait() [116acd000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
     at java.lang.Object.wait(Native Method)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.lastDataNodeRun(BlockReceiver.java:779)
     - locked <7bcf01050> (a org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder)
     at org.apache.hadoop.hdfs.server.datanode.BlockReceiver$PacketResponder.run(BlockReceiver.java:870)
     at java.lang.Thread.run(Thread.java:680)

...

"DataXceiver for client /127.0.0.1:50556 [receiving block blk_1029135823215149276_4209 client=DFSClient_-1401479495]" daemon prio=5 tid=7fb9660c7000 nid=0x1169cb000 runnable [1169ca000]
   java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
     at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:136)
     ...

Closing the stream will finalize the block, close all open threads that were needed (two in this example), and log the decreased active connection count: 

HBase Shell:

hbase(main):011:0> s.close
12/03/06 12:08:36 DEBUG hdfs.DFSClient: DFSClient writeChunk allocating new packet seqno=1, src=/testfile, packetSize=65557, chunksPerPacket=127, bytesCurBlock=0
12/03/06 12:08:36 DEBUG hdfs.DFSClient: DataStreamer block blk_1029135823215149276_4209 wrote packet seqno:1 size:32 offsetInBlock:0 lastPacketInBlock:true
12/03/06 12:08:36 DEBUG hdfs.DFSClient: DFSClient Replies for seqno 1 are SUCCESS
12/03/06 12:08:36 DEBUG hdfs.DFSClient: Closing old block blk_1029135823215149276_4209

DataNode Log:

2012-03-06 12:08:36,551 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2

The interesting part about the log statements is that they are printed before the thread is started, and before it is ended, meaning they will show one less, and one too many, respectively. Also recall that while the block is being written to, it is accounted for in Hadoop fsck's "-blocks" or "-openforwrite" output.

Read Data

Pretty much the same overall goes for reading data:

hbase(main):012:0> r = dfs.open('/testfile')        
=> #<Java::OrgApacheHadoopHdfs::DFSClient::DFSInputStream:0xde81d48>

When you create the input stream, nothing happens on the server side, as the client has yet to indicate what part of the file it wants to read. If you start to read, the client is routed to the proper server using the blockSeekTo() call internally:

DataNode Log:

2012-03-06 12:18:02,055 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1

HBase Shell:

hbase(main):013:0> r.read
12/03/06 12:18:02 DEBUG fs.FSInputChecker: DFSClient readChunk got seqno 0 offsetInBlock 0 lastPacketInBlock true packetLen 11
=> 1

DataNode Log:

2012-03-06 12:18:02,110 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2

Since an entire buffer's worth of data - configured with "io.file.buffer.size", and set to 4096 bytes (4 KB) by default - is read, and our file was very small (3 bytes), it was read in one go and the server-side thread was released right away. If we were to read a larger file, a connection would remain open for reading chunk after chunk of the entire block. We can pick a large file from the HBase directory, open the input stream, and start reading byte by byte:

Command Line:

$ hadoop dfs -lsr /hbase
drwxr-xr-x   - larsgeorge supergroup          0 2012-03-03 12:33 /hbase/-ROOT-
...
-rw-r--r--   1 larsgeorge supergroup  111310855 2012-03-05 11:54 /hbase/usertable/12c95922805e2cb5274396a723a94fa8/family/1454340804524549239
...

HBase Shell:

hbase(main):015:0> r2 = dfs.open('/hbase/usertable/12c95922805e2cb5274396a723a94fa8/family/1454340804524549239')
=> #<Java::OrgApacheHadoopHdfs::DFSClient::DFSInputStream:0x92524b0>
hbase(main):016:0> r2.read                                                                                     
12/03/06 12:26:12 DEBUG fs.FSInputChecker: DFSClient readChunk got seqno 0 offsetInBlock 0 lastPacketInBlock false packetLen 66052
=> 68

DataNode Log:

2012-03-06 12:26:12,294 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 1

HBase Shell:

hbase(main):017:0> r2.read
=> 65
hbase(main):018:0> r2.read
=> 84

JStack Output:

"DataXceiver for client /127.0.0.1:52504 [sending block blk_7972699930188289769_4166]" daemon prio=5 tid=7fb965961800 nid=0x116670000 runnable [11666f000]
   java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
     ...

The JStack output shows how the thread is kept running on the server to serve more data as requested by the client. If you close the stream, the resource is freed subsequently, just as expected:

HBase Shell:

hbase(main):019:0> r2.close                                                                                     

DataNode Log:

2012-03-06 12:31:16,659 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 2

So after the thread really exits (should be in the nano- or millisecond range) the count will go back to the minimum of one - which is the DataXceiverServer thread, yes you are correct. :)


Back to HBase

Opening all the regions does not need as many resources on the server as you would have expected. If you scan the entire HBase table though, you force HBase to read all of the blocks in all HFiles:

HBase Shell:

hbase(main):003:0> scan 'usertable'
...
1000000 row(s) in 1460.3120 seconds

DataNode Log:

2012-03-05 14:42:20,580 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 6
2012-03-05 14:43:23,293 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 14:43:23,299 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 14:44:00,255 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 14:44:53,566 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 8
2012-03-05 14:44:53,567 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 7
2012-03-05 14:45:33,562 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 8
2012-03-05 14:46:25,074 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 14:46:25,075 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 10
2012-03-05 14:47:07,854 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 14:47:58,244 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 10
2012-03-05 14:47:58,244 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 9
2012-03-05 14:48:30,010 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 10
2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 11
2012-03-05 14:49:24,332 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 10
2012-03-05 14:49:59,987 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 11
2012-03-05 14:51:12,603 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 12
2012-03-05 14:51:12,605 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 11
2012-03-05 14:51:46,473 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 12
2012-03-05 14:52:37,052 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 13
2012-03-05 14:52:37,053 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 12
2012-03-05 14:53:20,047 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 13
2012-03-05 14:54:11,859 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 14
2012-03-05 14:54:11,860 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 15
2012-03-05 14:54:43,615 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 14
2012-03-05 14:55:36,214 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 15
2012-03-05 14:55:36,215 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 14
2012-03-05 14:56:10,440 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 15
2012-03-05 14:56:59,419 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 16
2012-03-05 14:56:59,420 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 15
2012-03-05 14:57:31,722 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 16
2012-03-05 14:58:24,909 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 17
2012-03-05 14:58:24,910 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 16
2012-03-05 14:58:57,186 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 17
2012-03-05 14:59:47,294 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 18
2012-03-05 14:59:47,294 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 17
2012-03-05 15:00:23,101 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 18
2012-03-05 15:01:14,853 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 19
2012-03-05 15:01:14,854 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 18
2012-03-05 15:01:47,388 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 19
2012-03-05 15:02:39,900 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 20
2012-03-05 15:02:39,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 19
2012-03-05 15:03:18,794 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 20
2012-03-05 15:04:17,688 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 21
2012-03-05 15:04:17,689 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 22
2012-03-05 15:04:54,545 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 21
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(127.0.0.1:50010, storageID=DS-1423642448-10.0.0.64-50010-1321352233772, infoPort=50075, ipcPort=50020):Number of active connections is: 22
2012-03-05 15:05:55,901 DEBUG org.apache.hadoop.hdfs.server.datanode.DataNode: Number of active connections is: 21

The number of active connections reaches the elusive 22 now. Note that this count already includes the server thread, so we are still a little short of what we could consider the theoretical maximum - based on the number of files HBase has to handle.

What does that all mean?

So, how many "xcievers (sic)" do you need? Given that you only use HBase, you could simply monitor the above "storefiles" metric (which you can also get through Ganglia or JMX) and add a few percent for intermediate and write-ahead log files. This should work for systems in motion. However, if you were to determine that number on an idle, fully compacted system and assume it is the maximum, you might find it to be too low once you start adding more store files during regular memstore flushes, i.e. as soon as you start adding data to the HBase tables. The same applies if you also use MapReduce on that same cluster, Flume log aggregation, and so on. You will need to account for those extra files and, more importantly, for the blocks that are open for reading and writing. 

Note again that the examples in this post use a single DataNode, something you will not have on a real cluster. To that end, you will have to divide the total number of store files (as per the HBase metric) by the number of DataNodes you have. If you have, for example, a store file count of 1000 and your cluster has 10 DataNodes, that averages out to 100 store files per DataNode, so you should be OK with the default of 256 xceiver threads per DataNode.

The worst case would be the number of all active readers and writers, i.e. those that are currently sending or receiving data. But since this is hard to determine ahead of time, you might want to consider building in a decent reserve. Also, since the writing process needs an extra - although shorter-lived - thread (for the PacketResponder), you have to account for that as well. So a reasonable, but rather simplistic formula could be:
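
For example, with the terms below being nothing more than descriptive placeholders:

  xceivers per DataNode = ((active writers * 2) + active readers) / number of DataNodes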

This formula takes into account that you need about two threads for an active writer and another for an active reader. This is then summed up and divided by the number of DataNodes, since you have to specify the "dfs.datanode.max.xcievers" per DataNode.

If you loop back to the HBase RegionServer screenshot above, you will see that there were 22 store files. These are immutable and will only be read, in other words they occupy one thread each. For all memstores that are flushed to disk you need two threads - but only until they are fully written. The files are then finalized and closed for good, cleaning up any threads in the process. So these come and go based on your flush frequency. The same goes for compactions: they will read N files and write them into a single new one, then finalize the new file. As for the write-ahead logs, these will occupy a thread once you have started to add data to any table. There is one log file per server, and since an active writer occupies two threads, these files account for at most twice as many active threads as you have RegionServers.

For a pure HBase setup (HBase plus its own HDFS, with no other users), we can estimate the number of needed DataXceivers with the following formula:
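
One way to sketch it - counting one thread per store file being read, two threads per active flush or compaction (each writes a new file), and two threads per write-ahead log, with the terms being placeholders only - is:

  xceivers per DataNode = (read store files + (active flushes * 2) + (active compactions * 2) + (region servers * 2)) / number of DataNodes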

Since you will be hard pressed to determine the number of actively read store files, flushes, and so on, it might be better to estimate the theoretical maximum instead. This maximum value takes into account that you can only have a single flush and compaction active per region at any time, and that the maximum number of active logs matches the number of RegionServers, leading us to this formula:
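
As a sketch - with every store file potentially being read, at most one flush and one compaction (two writer threads each) per region, and one write-ahead log (two threads) per RegionServer, the terms again being placeholders:

  maximum xceivers per DataNode = (store files + (regions * 4) + (region servers * 2)) / number of DataNodes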

Obviously, the number of store files will increase over time, and typically the number of regions as well. The same goes for the number of servers, so keep in mind that you will need to adjust this number over time. In practice, you can add a buffer of, for example, 20% - as shown in the final formula below - so that you are not forced to change the value too often. 

On the other hand, if you keep the number of regions per server fixed[9], and instead split regions manually while adding new servers as you grow, you should be able to keep this configuration property stable for each server.

Final Advice & TL;DR

Here is the final formula you want to use:
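
As a sketch, this is simply the theoretical maximum from above padded by 20%, with the terms once more being placeholders for your own metrics:

  dfs.datanode.max.xcievers = 1.2 * (store files + (regions * 4) + (region servers * 2)) / number of DataNodes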

It computes the maximum number of threads needed, based on your current HBase vitals (number of store files, regions, and RegionServers). It also adds a fudge factor of 20% to give you room for growth. Keep an eye on the numbers on a regular basis and adjust the value as needed. You might want to use Nagios with appropriate checks to warn you when any of these vitals changes by more than a certain percentage.

Note: Please make sure you also adjust the number of file handles your process is allowed to use accordingly[10]. This affects the number of sockets you can use, and if that number is too low (the default is often 1024), you will run into connection issues first. 
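
For example, on Linux you would check the current limit and then raise it for the users running the DataNode and RegionServer processes - the user names and the value of 32768 below are only placeholders for your own setup:

$ ulimit -n    # shows the current limit (often 1024) for the logged-in user

# /etc/security/limits.conf
hdfs   -   nofile   32768
hbase  -   nofile   32768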

Finally, the engineering devil on one of your shoulders should already have started to snicker about how horribly non-Erlang-y this is, and how you should use an event driven approach, possibly using Akka with Scala[11] - if you want to stay within the JVM world. Bear in mind though that the clever developers in the community share the same thoughts and have already started to discuss various approaches[12][13]. 

Links:

Tuesday, October 26, 2010

Hadoop on EC2 - A Primer

As more and more companies discover the power of Hadoop and how it solves complex analytical problems, there seems to be a growing interest in quickly prototyping new solutions - possibly on short-lived or "throw away" cluster setups. Amazon's EC2 provides an ideal platform for such prototyping and there are a lot of great resources on how this can be done. I would like to mention "Tracking Trends with Hadoop and Hive on EC2" on the Cloudera Blog by Pete Skomoroch and "Running Hadoop MapReduce on Amazon EC2 and Amazon S3" by Tom White. They give you full examples of how to process data stored on S3 using EC2 servers. Overall there seems to be a common need to quickly get insight into what a Hadoop and Hive based cluster can add in terms of business value. In this post I would like to take a step back from those full-featured examples and show how you can use Amazon's services to set up a Hadoop cluster, focusing on the more "nitty gritty" details that are harder to find answers for.

Starting a Cluster

Let's jump in head first and solve the problem of actually launching a cluster. You have heard that Hadoop is shipped with EC2 support, but how do you actually start up a Hadoop cluster on EC2? You do have a couple of choices and, as Tom's article above explains, you could start all the instances in the cluster by hand. But why would you want to do that if there are scripts available that do all the work for you? And to complicate matters, how do you select the AMI (the Amazon Machine Image) that has the Hadoop version you need or want? Does it have Hive installed for your subsequent analysis of the collected data? Just running a check to count the available public Hadoop images returns 41!

$ ec2-describe-images -a | grep hadoop | wc -l
41

That gets daunting very quickly. Sure, you can roll your own - but that implies even more manual labor that you would probably rather spend on productive work. But there is help available...

By far one of the most popular ways to install Hadoop today is using Cloudera's Distribution for Hadoop - also known as CDH. It packs all the tools you usually need into easy to install packages and pre-configures everything for typical workloads. Sweet! And since it also offers each "HStack" tool as a separate installable package, you can decide what you need and install additional applications as you need them. We will make use of that feature below, as well as of other advanced configuration options.

There are not one but at least three script packages available to start a Hadoop cluster. The following table lists the most prominent ones:

Name               | Vendor        | Language | Fixed Packages | Notes
Hadoop EC2 Scripts | Apache Hadoop | Bash     | Yes            | Requires special Hadoop AMIs.
CDH Cloud Scripts  | Cloudera      | Python   | No             | Fixed to use CDH packages.
Whirr              | Apache Whirr  | Python   | No             | Not yet on same level feature wise compared to CDH Cloud Scripts. Can run plain Apache Hadoop images as well as CDH. Supports multiple cloud vendors.

They are ordered by their availability date, so the first available was the Bash based "Hadoop EC2 Scripts" contribution package. It is part of the Apache Hadoop tarball and can start selected AMIs with Hadoop preinstalled on them. While you may be able to customize the init script to install additional packages, you are bound to whatever version of Hadoop the AMI provides. This limitation is overcome by the CDH Cloud Scripts and Apache Whirr, the latter being the successor to the CDH scripts. All three EC2 script packages were created by Cloudera's own Tom White, so you may notice similarities between them. In general you could say that each extends on the former, applying what has been learned during their usage in the real world. Also, Python has the advantage of running on Windows, Unix, or Linux, whereas the Bash scripts are not a good fit for "some of these" (*cough*), but that seems obvious.

For the remainder of this post we will focus on the CDH Cloud Scripts as they are the current status quo when it comes to starting Hadoop clusters on EC2. But please keep an eye on Whirr, as it will supersede the CDH Cloud Scripts sooner or later - and it is being added to the CDH releases as well (it is in CDH3B3 now!).

I have mentioned the various AMIs above and that (at the time of this post) there are at least 41 of them available providing support for Hadoop in one way or another. But why would you have to create your own images, or switch to other ones, as newer versions of Hadoop are released in the future? Wouldn't it make more sense to have a base AMI that somehow magically bootstraps the Hadoop version you need onto the cluster as you materialize it? You may have guessed it by now: that is exactly what the Cloudera AMIs are doing! All of these scripts use a mechanism called Instance Data which allows them to "hand in" configuration details to the AMI instances as they start. While the Hadoop EC2 Scripts only use this for limited configuration (the rest being up to you - we will see an example of how that is done below), the CDH and Whirr scripts employ this feature to bootstrap everything, including Hadoop. The instance data is a script called hadoop-ec2-init-remote.sh which is compressed and provided to the server as it starts. The trick is that the Cloudera AMIs have a mechanism to execute this script before starting the Hadoop daemons:

root@ip-10-194-222-3:~# ls -la /etc/init.d/{hadoop,ec2}*
-rwxr-xr-x 1 root root 1395 2009-04-18 21:36 /etc/init.d/ec2-get-credentials
-rwxr-xr-x 1 root root  286 2009-04-18 21:36 /etc/init.d/ec2-killall-nash-hotplug
-rwxr-xr-x 1 root root  125 2009-04-18 21:36 /etc/init.d/ec2-mkdir-tmp
-rwxr--r-- 1 root root 1945 2009-06-23 14:37 /etc/init.d/ec2-run-user-data
-rw-r--r-- 1 root root  709 2009-04-18 21:36 /etc/init.d/ec2-ssh-host-key-gen
-rwxr-xr-x 1 root root 4280 2010-03-22 06:19 /etc/init.d/hadoop-0.20-datanode
-rwxr-xr-x 1 root root 4296 2010-03-22 06:19 /etc/init.d/hadoop-0.20-jobtracker
-rwxr-xr-x 1 root root 4437 2010-03-22 06:19 /etc/init.d/hadoop-0.20-namenode
-rwxr-xr-x 1 root root 4352 2010-03-22 06:19 /etc/init.d/hadoop-0.20-secondarynamenode
-rwxr-xr-x 1 root root 4304 2010-03-22 06:19 /etc/init.d/hadoop-0.20-tasktracker

and

root@ip-10-194-222-3:~# ls -la /etc/rc2.d/*{hadoop,ec2}*
lrwxrwxrwx 1 root root 32 2010-09-13 12:32 /etc/rc2.d/S20hadoop-0.20-jobtracker -> ../init.d/hadoop-0.20-jobtracker
lrwxrwxrwx 1 root root 30 2010-09-13 12:32 /etc/rc2.d/S20hadoop-0.20-namenode -> ../init.d/hadoop-0.20-namenode
lrwxrwxrwx 1 root root 39 2010-09-13 12:32 /etc/rc2.d/S20hadoop-0.20-secondarynamenode -> ../init.d/hadoop-0.20-secondarynamenode
lrwxrwxrwx 1 root root 29 2009-06-23 14:58 /etc/rc2.d/S70ec2-get-credentials -> ../init.d/ec2-get-credentials
lrwxrwxrwx 1 root root 27 2009-06-23 14:58 /etc/rc2.d/S71ec2-run-user-data -> ../init.d/ec2-run-user-data

work their magic to get the user data (which is one part of the "Instance Data") and optionally decompress it before executing the script handed in. The only other requirement is that the AMI must have Java installed as well. As we look into further pieces of the puzzle we will get back to this init script. For now let it suffice to say that it does the bootstrapping of our instances and installs whatever we need dynamically during the start of the cluster.

Note: I am using the Ubuntu AMIs for all examples and code snippets in this post.

All about the options

First you need to install the CDH Cloud Scripts, which is rather straightforward. For example, first install the Cloudera CDH tarball:

$ wget http://archive.cloudera.com/cdh/2/hadoop-0.20.1+169.89.tar.gz
$ tar -zxvf hadoop-0.20.1+169.89.tar.gz
$ export PATH=$PATH:~/hadoop-0.20.1+169.89/src/contrib/cloud/src/py

Then install the required Python libraries; we assume you already have Python installed in this example:

$ sudo apt-get install python-setuptools
$ sudo easy_install "simplejson==2.0.9"
$ sudo easy_install "boto==1.8d"

Now you are able to run the CDH Cloud Scripts - but to be really useful they need to be configured first. Cloudera has a document that explains the details. Obviously, while using those scripts a few more ideas come up and get added subsequently. Have a look at this example .hadoop-cloud directory:

$ ls -lA .hadoop-cloud/
total 40
-rw-r--r--  1 lars lars   489 2010-09-13 09:52 clusters-c1.medium.cfg
-rw-r--r--  1 lars lars   358 2010-09-10 06:13 clusters-c1.xlarge.cfg
lrwxrwxrwx  1 lars lars    22 2010-08-22 06:04 clusters.cfg -> clusters-c1.medium.cfg
-rw-r--r--  1 lars lars 17601 2010-09-13 10:19 hadoop-ec2-init-remote-cdh2.sh
drwxr-xr-x  2 lars lars  4096 2010-08-15 14:14 lars-test-cluster/

You can see that it has multiple clusters.cfg configuration files that differ only in their settings for the image_id (the AMI to be used) and the instance_type. Here is one of those files:

$ cat .hadoop-cloud/clusters-c1.medium.cfg
[lars-test-cluster]
image_id=ami-ed59bf84
instance_type=c1.medium
key_name=root
availability_zone=us-east-1a
private_key=~/.ec2/root.pem
ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
user_data_file=http://archive.cloudera.com/cloud/ec2/cdh2/hadoop-ec2-init-remote.sh
user_packages=lynx s3cmd
env=AWS_ACCESS_KEY_ID=<your-access-key>
    AWS_SECRET_ACCESS_KEY=<your-secret-key>

All you have to do now is switch the symbolic link to run either cluster setup. Obviously, another option would be to use the command line options which hadoop-ec2 offers. Execute $ hadoop-ec2 launch-cluster --help to see what is available. You can override the values from the current clusters.cfg or even select a completely different configuration directory. Personally I like the symlink approach as it allows me to keep the settings for each cluster instance together in a separate configuration file - but as usual, the choice is yours. You could also save each hadoop-ec2 call in a small Bash script along with all command line options in it, as sketched below.
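
For example, a minimal wrapper - assuming the "launch-cluster <cluster> <num-slaves>" form from the help output, with the cluster name, slave count, and overrides being placeholders for your own values - could look like this:

$ cat launch-lars-test-cluster.sh
#!/bin/bash
# Start the test cluster with 5 slaves, overriding the instance type on the command line
hadoop-ec2 launch-cluster --instance-type c1.medium lars-test-cluster 5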

Back to the .hadoop-cloud directory above. There is another file hadoop-ec2-init-remote-cdh2.sh (see below) and a directory called lars-test-cluster, which is created and maintained by the CDH Cloud Scripts. It contains a local hadoop-site.xml with your current AWS credentials (assuming you have them set in your .profile as per the documentation) that you can use to access S3 from your local Hadoop scripts.

For the sake of completeness, here is the other cluster configuration file:

$ cat .hadoop-cloud/clusters-c1.xlarge.cfg
[lars-test-cluster]
image_id=ami-8759bfee
instance_type=c1.xlarge
key_name=root
availability_zone=us-east-1a
private_key=~/.ec2/root.pem
ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
user_data_file=http://archive.cloudera.com/cloud/ec2/cdh2/hadoop-ec2-init-remote.sh
user_packages=lynx s3cmd

env=AWS_ACCESS_KEY_ID=<your-access-key>
    AWS_SECRET_ACCESS_KEY=<your-secret-key>

The user_data_file is where the version of Hadoop - and here even the version of Cloudera's Distribution for Hadoop - is chosen. You can replace the link with

user_data_file=http://archive.cloudera.com/cloud/ec2/cdh3/hadoop-ec2-init-remote.sh

to use the newer CDH3, currently in beta.

Also note that the AMIs are currently only available in the us-east-x zones and not in any of the others.

To conclude the setup, here is a list of the possible configuration options:

Option            | CLI                      | Description
cloud_provider    | --cloud-provider         | The cloud provider, e.g. 'ec2' for Amazon EC2.
auto_shutdown     | --auto-shutdown          | The time in minutes after launch when an instance will be automatically shut down.
image_id          | --image-id               | The ID of the image to launch.
instance_type     | -t, --instance-type      | The type of instance to be launched. One of m1.small, m1.large, m1.xlarge, c1.medium, or c1.xlarge.
key_name          | -k, --key-name           | The key pair to use when launching instances. (Amazon EC2 only.)
availability_zone | -z, --availability-zone  | The availability zone to run the instances in.
private_key       |                          | Used with update-slaves-file command. The file is copied to all EC2 servers.
ssh_options       | --ssh-options            | SSH options to use.
user_data_file    | -f, --user-data-file     | The URL of the file containing user data to be made available to instances.
user_packages     | -p, --user-packages      | A space-separated list of packages to install on instances on start up.
env               | -e, --env                | An environment variable to pass to instances. (May be specified multiple times.)
                  | --client-cidr            | The CIDR of the client, which is used to allow access through the firewall to the master node. (May be specified multiple times.)
                  | --security-group         | Additional security groups within which the instances should be run. (Amazon EC2 only.) (May be specified multiple times.)

Custom initialization

Now you can configure and start a cluster on EC2. Sooner or later though you will face more challenging issues. One that hits home early on is compression. You are encouraged to use compression in Hadoop as it saves you storage but also bandwidth, since less data needs to be transferred over the wire. See this and this post for "subtle" hints. Cool, so let's switch on compression - must be easy, right? Well, not exactly. For starters, choosing the appropriate codec is not trivial. A very popular one is LZO, as described in the posts above, because it has many advantages in combination with Hadoop's MapReduce. The problem is that it is GPL licensed and therefore not shipped with Hadoop. You actually have to compile it yourself to be able to install it subsequently. How this is done is described here. You need to follow those steps and compile an installable package for every AMI type you want to use later. For example, log into the master of your EC2 Hadoop cluster and execute the following commands:

$ hadoop-ec2 login <your-cluster-name>
# cd ~
# apt-get install subversion devscripts ant git-core liblzo2-dev
# git clone http://github.com/toddlipcon/hadoop-lzo-packager.git
# cd hadoop-lzo-packager/
# SKIP_RPM=1 ./run.sh
# cd build/deb/
# dpkg -i toddlipcon-hadoop-lzo_20100913142659.20100913142512.6ddda26-1_i386.deb 

Note: Since I am running Ubuntu AMIs I used the SKIP_RPM=1 flag to skip RedHat package generation.

Copy the final .deb file to a safe location, naming it hadoop-lzo_i386.deb or hadoop-lzo_amd64.deb, using scp for example. Obviously, do the same for the yum packages if you prefer the Fedora AMIs.
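
A hypothetical copy from the cluster master back to your workstation - the host name and the renaming step being placeholders - could look like this:

$ scp root@<your-master-public-dns>:~/hadoop-lzo-packager/build/deb/toddlipcon-hadoop-lzo_*.deb .
$ mv toddlipcon-hadoop-lzo_*.deb hadoop-lzo_i386.deb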

The next step is to figure out how to install the packages we just built during the bootstrap process described above. This is where the user_data_file comes back into play. Rather than copying the .deb packages to each node by hand, we save them on S3, using a tool like s3cmd. For example:

$ s3cmd put hadoop-lzo_i386.deb s3://dpkg/

Now we can switch from the default init script to our own. Use wget to download the default file:

$ wget http://archive.cloudera.com/cloud/ec2/cdh2/hadoop-ec2-init-remote.sh
$ mv hadoop-ec2-init-remote.sh .hadoop-cloud/hadoop-ec2-init-remote-cdh2.sh

In the clusters.cfg we need to replace the link with our local file, like so:

user_data_file=file:///home/lars/.hadoop-cloud/hadoop-ec2-init-remote-cdh2.sh

Note that we cannot use ~/.hadoop-cloud/... as the filename because the Python code does not resolve the Bash file path syntax.

The local init script can now be adjusted as needed. Here we are adding functions to set up s3cmd and then install the LZO packages on server startup:

...
  fi
  service $HADOOP-$daemon start
}

function install_s3cmd() {
  install_packages s3cmd # needed for LZO package on S3
  cat > /tmp/.s3cfg << EOF
[default]
access_key = $AWS_ACCESS_KEY_ID
acl_public = False
bucket_location = US
cloudfront_host = cloudfront.amazonaws.com
cloudfront_resource = /2008-06-30/distribution
default_mime_type = binary/octet-stream
delete_removed = False
dry_run = False
encoding = UTF-8
encrypt = False
force = False
get_continue = False
gpg_command = /usr/bin/gpg
gpg_decrypt = %(gpg_command)s -d --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_encrypt = %(gpg_command)s -c --verbose --no-use-agent --batch --yes --passphrase-fd %(passphrase_fd)s -o %(output_file)s %(input_file)s
gpg_passphrase =
guess_mime_type = True
host_base = s3.amazonaws.com
host_bucket = %(bucket)s.s3.amazonaws.com
human_readable_sizes = False
list_md5 = False
preserve_attrs = True
progress_meter = True
proxy_host =
proxy_port = 0
recursive = False
recv_chunk = 4096
secret_key = $AWS_SECRET_ACCESS_KEY
send_chunk = 4096
simpledb_host = sdb.amazonaws.com
skip_existing = False
urlencoding_mode = normal
use_https = False
verbosity = WARNING
EOF
}

function install_hadoop_lzo() {
  # Ask the EC2 instance metadata service which instance type this node runs on
  INSTANCE_TYPE=`wget -q -O - http://169.254.169.254/latest/meta-data/instance-type`

  # 64-bit instance types get the amd64 package, everything else the i386 build
  case $INSTANCE_TYPE in
  m1.large|m1.xlarge|m2.xlarge|m2.2xlarge|m2.4xlarge|c1.xlarge|cc1.4xlarge)
    HADOOP_LZO="hadoop-lzo_amd64"
    ;;
  *)
    HADOOP_LZO="hadoop-lzo_i386"
    ;;
  esac

  if which dpkg &> /dev/null; then
    HADOOP_LZO_FN=${HADOOP_LZO}.deb
    s3cmd -c /tmp/.s3cfg get --force s3://dpkg/$HADOOP_LZO_FN /tmp/$HADOOP_LZO_FN
    dpkg -i /tmp/$HADOOP_LZO_FN
  elif which rpm &> /dev/null; then
    # todo
    echo "do it yum style..."
  fi
}

register_auto_shutdown
update_repo
install_user_packages
install_s3cmd
install_hadoop
install_hadoop_lzo
configure_hadoop

By the way, once a cluster is up you can verify what the user data script did (or even "is doing", if you log in promptly enough) by checking the /var/log/messages file on, for example, the Hadoop master node:

$ hadoop-ec2 login lars-test-cluster
# cat /var/log/messages 
...
Sep 14 12:10:55 ip-10-242-18-80 user-data: + install_hadoop_lzo
Sep 14 12:10:55 ip-10-242-18-80 user-data: ++ wget -q -O - http://169.254.169.254/latest/meta-data/instance-type
Sep 14 12:10:55 ip-10-242-18-80 user-data: + INSTANCE_TYPE=c1.medium
Sep 14 12:10:55 ip-10-242-18-80 user-data: + case $INSTANCE_TYPE in
Sep 14 12:10:55 ip-10-242-18-80 user-data: + HADOOP_LZO=hadoop-lzo_i386
Sep 14 12:10:55 ip-10-242-18-80 user-data: + which dpkg
Sep 14 12:10:55 ip-10-242-18-80 user-data: + HADOOP_LZO_FN=hadoop-lzo_i386.deb
Sep 14 12:10:55 ip-10-242-18-80 user-data: + s3cmd -c /tmp/.s3cfg get --force s3://dpkg/hadoop-lzo_i386.deb /tmp/hadoop-lzo_i386.deb
Sep 14 12:10:55 ip-10-242-18-80 user-data: Object s3://dpkg/hadoop-lzo_i386.deb saved as '/tmp/hadoop-lzo_i386.deb' (65810 bytes in 0.0 seconds, 5.06 MB/s)
Sep 14 12:10:56 ip-10-242-18-80 user-data: + dpkg -i /tmp/hadoop-lzo_i386.deb
Sep 14 12:10:56 ip-10-242-18-80 user-data: Selecting previously deselected package toddlipcon-hadoop-lzo.
Sep 14 12:10:56 ip-10-242-18-80 user-data: (Reading database ... 24935 files and directories currently installed.)
Sep 14 12:10:56 ip-10-242-18-80 user-data: Unpacking toddlipcon-hadoop-lzo (from /tmp/hadoop-lzo_i386.deb) ...
Sep 14 12:10:56 ip-10-242-18-80 user-data: Setting up toddlipcon-hadoop-lzo (20100913142659.20100913142512.6ddda26-1) ...
...

Note: A quick tip in case you edit the init script yourself and add configuration data that is written to a file using cat (see cat > /tmp/.s3cfg << EOF above): make sure that the final "EOF" has NO trailing whitespace, or the script will fail miserably. I had "EOF " (note the trailing space) as opposed to "EOF" and it took me a while to find that! The script would fail to run with an "unexpected end of file" error.

A comment on EMR (or Elastic MapReduce), Amazon's latest offering with regard to Hadoop support: it is a wrapper around launching a cluster on your behalf and executing MapReduce jobs or Hive queries etc. While this will help many to be up and running with "cloud based" MapReduce work, it also has a few drawbacks: for starters, you have to work with what you are given in regards to Hadoop versioning. You have to rely on Amazon to keep it current, and any "special" version you would like to try may not work at all. Furthermore, you have no option to install LZO as described above, i.e. the whole bootstrap process is automated and not accessible to you for modifications. And finally, you pay for it on top of the standard EC2 rates, so it comes at a premium.

Provision data

We touched on S3 already above, but let me get back to it for a moment. Small files like the installation packages are no issue at all obviously. What is a problem though is when you have to deal with huge files larger than the 5GB maximum file size S3 allows. You have two choices here: either split the files into smaller ones or use an IO layer that does that task for you. That feature is built right into Hadoop itself. This is of course documented, but let me add a few notes that may help you understand the implications a little bit better. First, here is a table comparing the different tools you can use:

Tool Name | Supported | Description
s3cmd     | s3        | Supports access to S3 as provided by the AWS APIs and also the S3 Management Console over the web.
hadoop    | s3, s3n   | Supports raw or direct S3 access as well as a specialized Hadoop filesystem on S3.

The thing that is not obvious initially is that "s3cmd get s3://..." is not the same as "hadoop fs -get s3://...". When you use a standard tool that implements the S3 API, like s3cmd, you use s3://<bucket-name>/... as the object/file URI. In Hadoop terms that is referred to as "raw" or "native" S3. If you want to use Hadoop to access a file on S3 in that mode, then the URI is s3n://<bucket-name>/... - note the "s3n" URI scheme. In contrast, if you use the "s3" scheme with Hadoop, it employs a special filesystem mode that stores the large files in smaller binary files on S3, completely transparently to the user. For example:

$ hadoop fs -put verylargefile.log s3://my-bucket/logs/20100916/
$ s3cmd ls s3://my-bucket/
                       DIR   s3://my-bucket//
2010-09-16 07:44  33554432   s3://my-bucket/block_-1289596344515350280
2010-09-16 07:45  33554432   s3://my-bucket/block_-15869508987376965
2010-09-16 07:46  33554432   s3://my-bucket/block_-172539355612092125
2010-09-16 07:45  33554432   s3://my-bucket/block_-1894993863630732603
2010-09-16 07:43  33554432   s3://my-bucket/block_-2049322783060796466
2010-09-16 07:51  33554432   s3://my-bucket/block_-2070316024499434597
2010-09-16 07:43  33554432   s3://my-bucket/block_-2107321687364706212
2010-09-16 07:46  33554432   s3://my-bucket/block_-2117877727016155804
...

The following table provides a comparison between the various access modes and their file size limitations:

Type   | Mode          | Limit     | Example
S3 API | native        | 5GB       | s3cmd get s3://my-bucket/my-s3-dir/my-file.name
Hadoop | native        | 5GB       | hadoop fs -get s3n://my-bucket/my-s3-dir/my-file.name
Hadoop | binary blocks | unlimited | hadoop fs -get s3://my-bucket/my-hadoop-path/my-file.name

You may now ask yourself which one to use. If you will never deal with very large files, it may not matter. But if you do, then you need to decide whether to use Hadoop's binary filesystem or to chop files yourself to fit into 5GB. Also keep in mind that once you upload files using Hadoop's binary filesystem, you can NOT go back to the native tools, because the files stored in your S3 bucket are named (seemingly) randomly and the content is spread across many of those smaller files, as can be seen in the example above. There is no direct way to parse these files yourself outside of Hadoop.

One final note on S3 and provisioning data: it seems to make more sense to copy data from S3 into HDFS before running a job, not just because of the improved IO performance (keyword here: data locality!) but also in terms of stability. I have seen jobs fail that read directly from S3 but succeed happily when reading from HDFS. And copying data from S3 to EC2 is free, so you may want to try your luck with either option and see what works best for your use-case.
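
One way to do that copy - the bucket and paths being placeholders - is Hadoop's own distcp together with the s3n scheme explained above:

$ hadoop distcp s3n://my-bucket/logs/20100916 /user/lars/logs/20100916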

ETL and Processing

The last part of a usual workflow is to process the data we now have nicely compressed and splittable on S3 or HDFS. Ideally this would be Hive queries, if the data is already in a "Hive ready" format. Often though the data comes from legacy sources and needs to be processed before it can be queried. This process is usually referred to as Extract, Transform, Load, or "ETL" for short. It can be comprised of various steps executing dedicated applications or scripts pruning and transforming raw input files. I will leave this for another post though, as it points to the same problem we addressed above: you have many tools you could use and have to decide which suits you best. There is Kettle or Spring Batch, and also the new kid on the block, Oozie. Some combine both steps, while Oozie for example concentrates on the workflow aspect.

This is particularly interesting as we could use Oozie to spin up our EC2 clusters as well as run the ETL job (which could be a Kettle job, for example), followed by the Hive queries. Add Sqoop and you have a tool to read from and write to legacy databases in the process. But as I said, I will leave this whole topic for a follow-up post. I do believe it is important to understand and document the full process of running Hadoop in the "cloud", because only then do you have the framework to run the full business process on Amazon's Web Services (or any other cloud computing provider).

Conclusion

With Whirr being released, it seems like the above may become somewhat obsolete soon. I will look into Whirr more and update the post to show you how the same can be achieved with it. My preliminary investigation shows though that you face the same issues - or say "advanced challenges", to be fair, as Whirr is not at fault here. Maybe one day we will have an Apache licensed alternative to LZO available and installing a suitable compression codec will be much easier. For now this is not the case.

Another topic we have not touched upon is local storage in EC2. Usually you have an attached volume that is destroyed once the instance is shut down. To get around this restriction you can create snapshots and mount them as Elastic Block Storage (or EBS) volumes, which are persisted across server restarts. They are also supposedly faster than the default volumes. This is yet another interesting topic I am planning to post about, especially since write performance in EC2 is really, really bad - and that may affect the above ETL process in unexpected ways. On the other hand, you get persistence and the ability to start and stop a cluster while retaining the data it had stored. The CDH Cloud Scripts have full support for EBS, while Whirr is said to not have that working yet (although WHIRR-3 seems to say it is implemented).

Let me know if you are interested in a particular topic regarding this post that I may not have touched upon. I am curious to hear what you are doing with Hadoop on EC2, so please drop me a note.