HBase Internal Architecture
HBase Architecture
One of the more hidden aspects of HBase is how data is actually stored. While the majority of users may never need to bother with it, you will have to get up to speed when you want to understand what the various advanced configuration options at your disposal mean. "How can I tune HBase to my needs?" and other similar questions are certainly interesting once you get over the (at times steep) learning curve of setting up a basic system. Another reason to want to know more is if, for whatever reason, disaster strikes and you have to recover an HBase installation.
In my own efforts to get to know the respective classes that handle the various files, I started to sketch a picture in my head illustrating the storage architecture of HBase. But while the ingenious and blessed committers of HBase easily navigate back and forth through that maze, I find it much more difficult to keep a coherent image. So I decided to put that sketch to paper. Here it is.
Please note that this is not a UML or call graph but a merged picture of classes and the files they handle. It is by no means complete, though it focuses on the topic of this post. I will discuss the details below and also look at the configuration options and how they affect the low-level storage files.
So what does my sketch of the HBase innards really say? You can see that HBase handles basically two kinds of file types. One is used for the write-ahead log and the other for the actual data storage. The files are primarily handled by the HRegionServers. But in certain scenarios even the HMaster will have to perform low-level file operations. You may also notice that the actual files are in fact divided up into smaller blocks when stored within the Hadoop Distributed Filesystem (HDFS). This is also one of the areas where you can configure the system to handle larger or smaller data better. More on that later.
The general flow is that a new client contacts the Zookeeper quorum (a separate cluster of Zookeeper nodes) first to find a particular row key. It does so by retrieving the server name (i.e. host name) that hosts the -ROOT- region from Zookeeper. With that information it can query that server to get the server that hosts the .META. table. Both of these details are cached and only looked up once. Lastly it can query the .META. server and retrieve the server that has the row the client is looking for.
Once it has been told where the row resides, i.e. in what region, it caches this information as well and contacts the HRegionServer hosting that region directly. So over time the client has a pretty complete picture of where to get rows from without needing to query the .META. server again.
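To make the lookup flow concrete, here is a minimal sketch of a client read using the HBase 0.20-era Java API; the table, row, family, and qualifier names are made up for illustration. All of the -ROOT- and .META. lookups and the caching described above happen transparently inside HTable.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ClientLookupSketch {
    public static void main(String[] args) throws Exception {
        // The client only names the table; finding the right region and
        // HRegionServer is handled (and cached) behind the scenes.
        HTable table = new HTable(new HBaseConfiguration(), "docs");
        Result result = table.get(new Get(Bytes.toBytes("docA")));
        byte[] value = result.getValue(Bytes.toBytes("mimetype"), Bytes.toBytes(""));
        System.out.println(Bytes.toString(value));
    }
}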
Note: The HMaster is responsible for assigning the regions to each HRegionServer when you start HBase. This also includes the "special" -ROOT- and .META. tables.
Next, when the HRegionServer opens a region, it creates a corresponding HRegion object. When the HRegion is "opened" it sets up a Store instance for each HColumnFamily of every table, as defined by the user beforehand. Each of the Store instances can in turn have one or more StoreFile instances, which are lightweight wrappers around the actual storage file called HFile. An HRegion also has a MemStore and an HLog instance. We will now have a look at how they work together, but also where there are exceptions to the rule.
Put
So how is data written to the actual storage? The client issues an HTable.put(Put) request to the HRegionServer, which hands the details to the matching HRegion instance. The first step is to decide if the data should first be written to the "Write-Ahead-Log" (WAL), represented by the HLog class. The decision is based on the flag set by the client using the Put.writeToWAL(boolean) method. The WAL is a standard Hadoop SequenceFile (although there is an ongoing discussion about whether it should be changed to a file format more suitable for HBase) and it stores HLogKeys. These keys contain a sequential number as well as the actual data and are used to replay not-yet-persisted data after a server crash.
Once the data is written (or not) to the WAL, it is placed in the MemStore. At the same time it is checked whether the MemStore is full, and in that case a flush to disk is requested. When the request is served by a separate thread in the HRegionServer, it writes the data to an HFile located in HDFS. It also saves the last written sequence number so the system knows what was persisted so far. Let's have a look at the files now.
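To make the write path concrete, here is a hedged sketch of a single put using the same 0.20-era client API; the names are again made up. The commented-out line shows the WAL opt-out described above.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutSketch {
    public static void main(String[] args) throws Exception {
        HTable table = new HTable(new HBaseConfiguration(), "docs");
        Put put = new Put(Bytes.toBytes("docA"));
        put.add(Bytes.toBytes("mimetype"), Bytes.toBytes(""), Bytes.toBytes("text/xml"));
        // put.writeToWAL(false); // skip the WAL: faster, but edits may be lost on a crash
        table.put(put); // goes to the WAL (by default), then the MemStore
    }
}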
Files
HBase has a configurable root directory in HDFS, with /hbase as the default. You can simply use the hadoop dfs command line tool to look at the various files HBase stores.
$ hadoop dfs -lsr /hbase/docs
...
drwxr-xr-x - hadoop supergroup 0 2009-09-28 14:22 /hbase/.logs
drwxr-xr-x - hadoop supergroup 0 2009-10-15 14:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891
-rw-r--r-- 3 hadoop supergroup 14980 2009-10-14 01:32 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255509179458
-rw-r--r-- 3 hadoop supergroup 1773 2009-10-14 02:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255512781014
-rw-r--r-- 3 hadoop supergroup 37902 2009-10-14 03:33 /hbase/.logs/srv1.foo.bar,60020,1254172960891/hlog.dat.1255516382506
...
-rw-r--r-- 3 hadoop supergroup 137648437 2009-09-28 14:20 /hbase/docs/1905740638/oldlogfile.log
...
drwxr-xr-x - hadoop supergroup 0 2009-09-27 18:03 /hbase/docs/999041123
-rw-r--r-- 3 hadoop supergroup 2323 2009-09-01 23:16 /hbase/docs/999041123/.regioninfo
drwxr-xr-x - hadoop supergroup 0 2009-10-13 01:36 /hbase/docs/999041123/cache
-rw-r--r-- 3 hadoop supergroup 91540404 2009-10-13 01:36 /hbase/docs/999041123/cache/5151973105100598304
drwxr-xr-x - hadoop supergroup 0 2009-09-27 18:03 /hbase/docs/999041123/contents
-rw-r--r-- 3 hadoop supergroup 333470401 2009-09-27 18:02 /hbase/docs/999041123/contents/4397485149704042145
drwxr-xr-x - hadoop supergroup 0 2009-09-04 01:16 /hbase/docs/999041123/language
-rw-r--r-- 3 hadoop supergroup 39499 2009-09-04 01:16 /hbase/docs/999041123/language/8466543386566168248
drwxr-xr-x - hadoop supergroup 0 2009-09-04 01:16 /hbase/docs/999041123/mimetype
-rw-r--r-- 3 hadoop supergroup 134729 2009-09-04 01:16 /hbase/docs/999041123/mimetype/786163868456226374
drwxr-xr-x - hadoop supergroup 0 2009-10-08 22:45 /hbase/docs/999882558
-rw-r--r-- 3 hadoop supergroup 2867 2009-10-08 22:45 /hbase/docs/999882558/.regioninfo
drwxr-xr-x - hadoop supergroup 0 2009-10-09 23:01 /hbase/docs/999882558/cache
-rw-r--r-- 3 hadoop supergroup 45473255 2009-10-09 23:01 /hbase/docs/999882558/cache/974303626218211126
drwxr-xr-x - hadoop supergroup 0 2009-10-12 00:37 /hbase/docs/999882558/contents
-rw-r--r-- 3 hadoop supergroup 467410053 2009-10-12 00:36 /hbase/docs/999882558/contents/2507607731379043001
drwxr-xr-x - hadoop supergroup 0 2009-10-09 23:02 /hbase/docs/999882558/language
-rw-r--r-- 3 hadoop supergroup 541 2009-10-09 23:02 /hbase/docs/999882558/language/5662037059920609304
drwxr-xr-x - hadoop supergroup 0 2009-10-09 23:02 /hbase/docs/999882558/mimetype
-rw-r--r-- 3 hadoop supergroup 84447 2009-10-09 23:02 /hbase/docs/999882558/mimetype/2642281535820134018
drwxr-xr-x - hadoop supergroup 0 2009-10-14 10:58 /hbase/docs/compaction.dir
The first set of files are the log files handled by the HLog instances, which are created in a directory called .logs underneath the HBase root directory. Then there is another subdirectory for each HRegionServer and then a log for each HRegion.
Next there is a file called oldlogfile.log, which you may not even see on your cluster. These files are created by one of the exceptions I mentioned earlier as far as file access is concerned. They are a result of so-called "log splits". When the HMaster starts and finds a log file that is not handled by an HRegionServer anymore, it splits the log, copying the HLogKeys to the new regions they should be in. It places them directly in the region's directory in a file named oldlogfile.log. Now when the respective HRegion is instantiated, it reads these files, inserts the contained data into its local MemStore, and starts a flush to persist the data right away and delete the file.
Note: Sometimes you may see left-over oldlogfile.log.old files (yes, there is another .old at the end), which are caused by the HMaster repeatedly trying to split the log and finding there was already another split log in place. At that point you have to consult the HRegionServer or HMaster logs to see what is going on and whether you can remove those files. I found at times that they were empty and therefore could safely be removed.
The next set of files are the actual regions. Each region name is encoded using a Jenkins hash function and a directory is created for it. The reason for hashing the region name is that it may contain characters that cannot be used in a path name in DFS. The Jenkins hash always returns legal characters, as simple as that. So you get the following path structure:
/hbase/<tablename>/<encoded-regionname>/<column-family>/<filename>
In the root of the region directory there is also a .regioninfo file holding metadata about the region. This will be used in the future by an HBase fsck utility (see HBASE-7) to be able to rebuild a broken .META. table. A first use of the region info can be seen in HBASE-1867.
In each column-family directory you can see the actual data files, which I explain in detail in the following section.
Something that I have not shown above are split regions with their initial daughter reference files. When a data file within a region grows larger than the configured hbase.hregion.max.filesize, the region is split in two. This is done very quickly initially because the system simply creates two reference files in the new regions that are now supposed to host each half. The name of the reference file is an ID with the hashed name of the referenced region as a postfix, e.g. 1278437856009925445.3323223323. The reference files only hold little information: the key the original region was split at and whether it is the top or bottom reference. Of note is that these references are then used by the HalfHFileReader class (which I also omitted from the big picture above as it is only used temporarily) to read the original region's data files. Only upon a compaction are the original files rewritten into separate files in the new region's directory. This also removes the small reference files as well as the original data file in the original region.
And this also concludes the file dump here; the last thing you see is a compaction.dir directory in each table directory. It is used when splitting or compacting regions, as noted above. It is usually empty and serves as a scratch area to stage the new data files before swapping them into place.
HFile
So we are now at a very low level of HBase's architecture. HFiles (kudos to Ryan Rawson) are the actual storage files, specifically created to serve one purpose: store HBase's data fast and efficiently. They are apparently based on Hadoop's TFile (see HADOOP-3315) and mimic the SSTable format used in Google's BigTable architecture. The previous use of Hadoop's MapFiles in HBase proved not to be good enough performance-wise. So what do the files look like?
HFile Internal Structure
The files have a variable length; the only fixed blocks are the FileInfo and Trailer blocks. As the picture shows, it is the Trailer that has the pointers to the other blocks, and it is written at the end of persisting the data to the file, finalizing the now immutable data store. The Index blocks record the offsets of the Data and Meta blocks. Both the Data and the Meta blocks are actually optional. But you would most likely always find data in a data store file.
How is the block size configured? It is driven solely by the HColumnDescriptor which in turn is specified at table creation time by the user or defaults to reasonable standard values. Here is an example as shown in the master web based interface:
{NAME => 'docs', FAMILIES => [{NAME => 'cache', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}, {NAME => 'contents', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'false'}, ...
The default is "64KB" (or 65536 bytes). Here is what the HFile JavaDoc explains:
"Minimum block size. We recommend a setting of minimum block size between 8KB to 1MB for general usage. Larger block size is preferred if files are primarily for sequential access. However, it would lead to inefficient random access (because there are more data to decompress). Smaller blocks are good for random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in Compression codec, the smallest possible block size would be around 20KB-30KB."
"Minimum block size. We recommend a setting of minimum block size between 8KB to 1MB for general usage. Larger block size is preferred if files are primarily for sequential access. However, it would lead to inefficient random access (because there are more data to decompress). Smaller blocks are good for random access, but require more memory to hold the block index, and may be slower to create (because we must flush the compressor stream at the conclusion of each data block, which leads to an FS I/O flush). Further, due to the internal caching in Compression codec, the smallest possible block size would be around 20KB-30KB."
So each block with its prefixed "magic" header contains either plain or compressed data. What that looks like we will see in the next section.
One thing you may notice is that the default block size for files in DFS is 64MB, which is 1024 times the HFile default block size. So the HBase storage file blocks do not match the Hadoop blocks. Therefore you have to think about both parameters separately and find the sweet spot in terms of performance for your particular setup.
One option in the HBase configuration you may see is hfile.min.blocksize.size. It seems to be used only during migration from earlier versions of HBase (which had no block file format) and when directly creating HFiles during bulk imports, for example.
So far so good, but how can you see if an HFile is OK or what data it contains? There is an app for that!
The HFile.main() method provides the tools to dump a data file:
$ hbase org.apache.hadoop.hbase.io.hfile.HFile
usage: HFile [-f <arg>] [-v] [-r <arg>] [-a] [-p] [-m] [-k]
 -a,--checkfamily    Enable family check
 -f,--file <arg>     File to scan. Pass full-path; e.g.
                     hdfs://a:9000/hbase/.META./12/34
 -k,--checkrow       Enable row order check; looks for out-of-order keys
 -m,--printmeta      Print meta data of file
 -p,--printkv        Print key/value pairs
 -r,--region <arg>   Region to scan. Pass region name; e.g. '.META.,,1'
 -v,--verbose        Verbose output; emits file and meta data delimiters
Here is an example of what the output will look like (shortened here):
$ hbase org.apache.hadoop.hbase.io.hfile.HFile -v -p -m -f \
  hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018

Scanning -> hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018
...
K: \x00\x04docA\x08mimetype\x00\x00\x01\x23y\x60\xE7\xB5\x04 V: text\x2Fxml
K: \x00\x04docB\x08mimetype\x00\x00\x01\x23x\x8C\x1C\x5E\x04 V: text\x2Fxml
K: \x00\x04docC\x08mimetype\x00\x00\x01\x23xz\xC08\x04 V: text\x2Fxml
K: \x00\x04docD\x08mimetype\x00\x00\x01\x23y\x1EK\x15\x04 V: text\x2Fxml
K: \x00\x04docE\x08mimetype\x00\x00\x01\x23x\xF3\x23n\x04 V: text\x2Fxml
Scanned kv count -> 1554

Block index size as per heapsize: 296
reader=hdfs://srv1.foo.bar:9000/hbase/docs/999882558/mimetype/2642281535820134018, \
compression=none, inMemory=false, \
firstKey=US6683275_20040127/mimetype:/1251853756871/Put, \
lastKey=US6684814_20040203/mimetype:/1251864683374/Put, \
avgKeyLen=37, avgValueLen=8, \
entries=1554, length=84447
fileinfoOffset=84055, dataIndexOffset=84277, dataIndexCount=2, metaIndexOffset=0, \
metaIndexCount=0, totalBytes=84055, entryCount=1554, version=1
Fileinfo:
MAJOR_COMPACTION_KEY = \xFF
MAX_SEQ_ID_KEY = 32041891
hfile.AVG_KEY_LEN = \x00\x00\x00\x25
hfile.AVG_VALUE_LEN = \x00\x00\x00\x08
hfile.COMPARATOR = org.apache.hadoop.hbase.KeyValue\x24KeyComparator
hfile.LASTKEY = \x00\x12US6684814_20040203\x08mimetype\x00\x00\x01\x23x\xF3\x23n\x04
The first part is the actual data stored as KeyValue pairs, explained in detail in the next section. The second part dumps the internal HFile.Reader properties as well as the Trailer block details and finally the FileInfo block values. This is a great way to check if a data file is still healthy.
KeyValues
In essence each KeyValue in the HFile is simply a low-level byte array that allows for "zero-copy" access to the data, even with lazy or custom parsing if necessary. How are the instances arranged?
Key-Value Store
The structure starts with two fixed-length numbers indicating the size of the key and the value part. With that info you can offset into the array to, for example, get direct access to the value, ignoring the key - if you know what you are doing. Otherwise you can get the required information from the key part. Once parsed into a KeyValue object, you have getters to access the details.
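The following is a hedged sketch of that layout, mirroring (but not reproducing) the actual KeyValue code: two fixed 4-byte length fields lead the buffer, so simple offset math finds the value without copying.

import java.nio.ByteBuffer;

public class KeyValueLayoutSketch {
    public static void main(String[] args) {
        byte[] key = "docA/mimetype".getBytes();
        byte[] value = "text/xml".getBytes();

        // serialize: [key length][value length][key bytes][value bytes]
        ByteBuffer buf = ByteBuffer.allocate(8 + key.length + value.length);
        buf.putInt(key.length).putInt(value.length).put(key).put(value);

        // deserialize: offset into the array for direct access to the value
        byte[] raw = buf.array();
        int keyLen = ByteBuffer.wrap(raw).getInt();
        int valLen = ByteBuffer.wrap(raw, 4, 4).getInt();
        int valueOffset = 8 + keyLen; // skip both length fields plus the key
        System.out.println(new String(raw, valueOffset, valLen));
    }
}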
Note: One thing to watch out for is the difference between KeyValue.getKey() and KeyValue.getRow(). I think for me the confusion arose from referring to "row keys" as the primary key to get a row out of HBase. That would be the latter of the two methods, i.e. KeyValue.getRow(). The former simply returns the complete byte array part representing the raw "key" as colored and labeled in the diagram.
This concludes my analysis of the HBase storage architecture. I hope it provides a starting point for your own efforts to dig into the grimy details. Have fun!
Update: Slightly updated with more links to JIRA issues. Also added Zookeeper to be more precise about the current mechanisms to look up a region.
Update 2: Added details about region references.
Update 3: Added more details about region lookup as requested.
Write Ahead Log (WAL)
The WAL is the lifeline that is needed when disaster strikes. Similar to the binary log in MySQL, it records all changes to the data. This is important in case something happens to the primary storage. So if the server crashes, it can effectively replay that log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails, the whole operation must be considered a failure.
Internal Architecture of the Write Ahead Log (WAL) in HBase
Let"s look at the high level view of how this is done in HBase. First the client initiates an action that modifies data. This is currently a call to put(Put), delete(Delete) andincrementColumnValue() (abbreviated as "incr" here at times). Each of these modifications is wrapped into a Key-Value object instance and sent over the wire using RPC calls. The calls are (ideally batched) to the HRegionServer that serves the affected regions. Once it arrives the payload, the said KeyValue, is routed to the HRegion that is responsible for the affected row. The data is written to the WAL and then put into the MemStrore of the actual Store that holds the record. And that also pretty much describes the write-path of HBase.
Eventually when the MemStore gets to a certain size or after a specific time the data is asynchronously persisted to the file system. In between that timeframe data is stored volatile in memory. And if the HRegionServer hosting that memory crashes the data is lost... but for the existence of what is the topic of this post, the WAL!
We now take a look at the various classes or "wheels" working the magic of the WAL. First up is one of the main classes of this contraption.
HLog
The class which implements the WAL is called HLog. What you may have read in my previous post, and what is also illustrated above, is that there is only one instance of the HLog class per HRegionServer. When an HRegion is instantiated, the single HLog is passed on as a parameter to the constructor of HRegion.
Central to HLog's functionality is the append() method, which internally eventually calls doWrite(). It is what is called when the above mentioned modification methods are invoked... or is it? One thing to note here is that for performance reasons there is an option for put(), delete(), and incrementColumnValue() to be called with an extra parameter: setWriteToWAL(boolean). If you invoke this method with false while setting up, for example, a Put instance, then the writing to the WAL is forfeited! That is also why the downward arrow in the big picture above is drawn with a dashed line to indicate the optional step. By default you certainly want the WAL, no doubt about that. But say you run a large bulk import MapReduce job that you can rerun at any time. You gain extra performance but need to take extra care that no data is lost during the import. The choice is yours.
Another important feature of the HLog is keeping track of the changes. This is done by using a "sequence number". It uses an AtomicLong internally to be thread-safe and starts either at zero, or at the last known number persisted to the file system. So as the region is opening its storage files, it reads the highest sequence number, which is stored as a meta field in each HFile, and sets the HLog sequence number to that value if it is higher than what has been recorded before. So at the end of opening all storage files, the HLog is initialized to reflect where persisting has ended and where to continue. You will see in a minute where this is used.
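Here is a hedged sketch of that bookkeeping, not the actual HLog code: an AtomicLong that is bumped up to the highest sequence number found while opening storage files and then incremented for every new edit.

import java.util.concurrent.atomic.AtomicLong;

public class SequenceNumberSketch {
    private final AtomicLong logSeqNum = new AtomicLong(0);

    // called while opening each storage file of a region; keeps the
    // highest sequence number seen so far
    public void updateFromStoreFile(long maxSeqIdInFile) {
        long current;
        while ((current = logSeqNum.get()) < maxSeqIdInFile) {
            if (logSeqNum.compareAndSet(current, maxSeqIdInFile)) {
                break;
            }
        }
    }

    // called for every edit appended to the log
    public long nextSeqNum() {
        return logSeqNum.incrementAndGet();
    }
}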
The image to the right shows three different regions. Each of them covers a different row key range. As mentioned above, each of these regions shares the same single instance of HLog. What that means in this context is that the data, as it arrives at each region, is written to the WAL in an unpredictable order. We will address this further below.
Finally the HLog has the facilities to recover and split a log left by a crashed HRegionServer. These are invoked by the HMaster before regions are deployed again.
HLog Store in WAL
HLogKey
Currently the WAL is using a Hadoop SequenceFile, which stores records as sets of key/values. For the WAL, the value is simply the KeyValue sent from the client. The key is represented by an HLogKey instance. As noted earlier in this series, the KeyValue only represents the row, column family, qualifier, timestamp, and value, as well as the "Key Type". Last time I did not address that field since there was no context. Now we have one, because the Key Type is what identifies what the KeyValue represents, a "put" or a "delete" (where there are a few more variations of the latter to express what is to be deleted: value, column family, or a specific column).
What we are missing, though, is where the KeyValue belongs, i.e. the region and the table name. That is stored in the HLogKey. Also stored is the above sequence number. With each record that number is incremented to be able to keep a sequential order of edits. Finally it records the "Write Time", a time stamp recording when the edit was written to the log.
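As a hedged sketch, the fields an HLogKey carries per the description above look roughly like this; it mirrors, but is not, the actual class:

public class HLogKeySketch {
    byte[] regionName; // which region the KeyValue belongs to
    byte[] tableName;  // which table that region is part of
    long logSeqNum;    // incremented per record to keep edits in order
    long writeTime;    // "Write Time": when the edit was written to the log
}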
LogFlusher
As mentioned above, as data arrives at an HRegionServer in the form of KeyValue instances, it is written (optionally) to the WAL. And as mentioned as well, it is then written to a SequenceFile. While this seems trivial, it is not. One of the base classes in Java IO is the Stream. Especially streams writing to a file system are often buffered to improve performance, as the OS is much faster writing data in batches, or blocks. If you wrote records separately, IO throughput would be really bad. But in the context of the WAL this causes a gap where data is supposedly written to disk but in reality is in limbo. To mitigate the issue, the underlying stream needs to be flushed on a regular basis. This functionality is provided by the LogFlusher class and thread. It simply calls HLog.optionalSync(), which checks if the hbase.regionserver.optionallogflushinterval, set to 10 seconds by default, has been exceeded, and if so invokes HLog.sync(). The other place invoking the sync method is HLog.doWrite(). Once it has written the current edit to the stream, it checks if the hbase.regionserver.flushlogentries parameter, set to 100 by default, has been exceeded and calls sync as well.
Sync itself invokes HLog.Writer.sync() and is implemented in SequenceFileLogWriter. For now we assume it flushes the stream to disk and all is well. The fact that in reality this is all a bit more complicated is discussed below.
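To illustrate the two knobs mentioned above, here is a hedged sketch that merely reads them with their documented defaults; HBase consumes these settings internally, so this is for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LogFlushConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new HBaseConfiguration();
        // flush the WAL stream at least every 10 seconds (default)
        long interval = conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
        // additionally sync after every 100 edits (default)
        int entries = conf.getInt("hbase.regionserver.flushlogentries", 100);
        System.out.println(interval + " ms / " + entries + " entries");
    }
}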
LogRoller
Obviously it makes sense to have some size restrictions on the logs written. Also we want to make sure a log is persisted on a regular basis. This is done by the LogRoller class and thread. It is controlled by the hbase.regionserver.logroll.period parameter in the $HBASE_HOME/conf/hbase-site.xml file. By default this is set to 1 hour. So every 60 minutes the log is closed and a new one started. Over time we gather that way a bunch of log files that need to be maintained as well. The HLog.rollWriter() method, which is called by the LogRoller to do the above rolling of the current log file, takes care of that as well by calling HLog.cleanOldLogs() subsequently. It checks what the highest sequence number written to a storage file is, because up to that number all edits are persisted. It then checks if there is a log left that has only edits less than that number. If that is the case, it deletes said logs and leaves just those that are still needed.
The other parameters controlling the log rolling are hbase.regionserver.hlog.blocksize and hbase.regionserver.logroll.multiplier, which are set by default to rotate logs when they reach 95% of the block size of the SequenceFile, typically 64MB. So the logs are switched out either when they are considered full or when a certain amount of time has passed, whichever comes first.
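Here is a hedged sketch of that roll-trigger arithmetic, reading the parameters with their documented defaults; the actual decision is made inside the LogRoller thread.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class LogRollConfigSketch {
    public static void main(String[] args) {
        Configuration conf = new HBaseConfiguration();
        long period = conf.getLong("hbase.regionserver.logroll.period", 60 * 60 * 1000);
        long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", 64 * 1024 * 1024);
        float multiplier = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
        long rollSize = (long) (blockSize * multiplier); // roll at ~95% of a block
        System.out.println("roll every " + period + " ms or at " + rollSize + " bytes");
    }
}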
Replay
Once an HRegionServer starts and is opening the regions it hosts, it checks if there are some left-over log files and applies those all the way down in Store.doReconstructionLog(). Replaying a log is simply done by reading the log and adding the contained edits to the current MemStore. At the end, an explicit flush of the MemStore (note, this is not the flush of the log!) helps write those changes out to disk.
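A hedged sketch of that replay loop follows; it is not the actual Store.doReconstructionLog() code, the addToMemStore() helper is a hypothetical stand-in, and the HLogKey import path varies across HBase versions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.HLogKey;
import org.apache.hadoop.io.SequenceFile;

public class LogReplaySketch {
    static void replay(FileSystem fs, Path oldLog, Configuration conf,
                       long maxSeqIdPersisted) throws Exception {
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, oldLog, conf);
        HLogKey key = new HLogKey();
        KeyValue edit = new KeyValue();
        while (reader.next(key, edit)) {
            // edits already persisted in an HFile can be skipped
            if (key.getLogSeqNum() <= maxSeqIdPersisted) continue;
            addToMemStore(edit);
        }
        reader.close();
        // an explicit MemStore flush then persists the recovered edits
    }

    static void addToMemStore(KeyValue edit) { /* hypothetical stand-in */ }
}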
The old logs usually come from a previous region server crash. When the HMaster is started, or detects that a region server has crashed, it splits the log files belonging to that server into separate files and stores those in the region directories on the file system they belong to. After that, the above mechanism takes care of replaying the logs. One thing to note is that regions from a crashed server can only be redeployed if the logs have been split and copied. Splitting itself is done in HLog.splitLog(). The old log is read into memory in the main thread (i.e. single-threaded) and then, using a pool of threads, written to all region directories, one thread for each region.
Issues
As mentioned above, all edits are written to one HLog per HRegionServer. You may ask why that is the case. Why not write all edits for a specific region into its own log file? Let's quote the BigTable paper once more:
"If we kept the commit log for each tablet in a separate log file, a very large number of files would be written concurrently in GFS. Depending on the underlying file system implementation on each GFS server, these writes could cause a large number of disk seeks to write to the different physical log files."
HBase followed that principle for pretty much the same reasons. As explained above, you end up with many files since logs are rolled and kept until they are safe to be deleted. If you did this for every region separately, it would not scale well - or at least be an itch that sooner or later causes pain.
So far that seems to be no issue. But again, it causes problems when things go wrong. As long as you have applied all edits in time and persisted the data safely, all is well. But if you have to split the log because of a server crash, then you need to divide it into suitable pieces, as described above in the "replay" paragraph. But as you have seen above as well, all edits are intermingled in the log and there is no index of what is stored at all. For that reason the HMaster cannot redeploy any region from a crashed server until it has split the logs for that very server. And that can be quite a number if the server was behind applying the edits.
Another problem is data safety. You want to be able to rely on the system to save all your data, no matter what newfangled algorithms are employed behind the scenes. As far as HBase and the log are concerned, you can turn down the log flush times to as low as you want - you are still dependent on the underlying file system as mentioned above; the stream used to store the data is flushed, but is it written to disk yet? We are talking about fsync-style issues. Now for HBase we are most likely talking about Hadoop's HDFS as the file system being persisted to.
Up to this point it should be abundantly clear that the log is what keeps data safe. For that reason a log could be kept open for up to an hour (or more, if configured so). As data arrives, a new key/value pair is written to the SequenceFile and occasionally flushed to disk. But that is not how Hadoop was set out to work. It was meant to provide an API that allows you to open a file, write data into it (preferably a lot), and close it right away, leaving an immutable file for everyone else to read many times. Only after a file is closed is it visible and readable to others. If a process dies while writing the data, the file is pretty much considered lost. What is required is a feature that allows reading the log up to the point where the crashed server has written it (or as close as possible).
While append for HDFS in general is useful, it is not used in HBase; the hflush() is. What it does is write out everything to disk as the log is written. In case of a server crash we can safely read that "dirty" file up to the last edits. The append in Hadoop 0.19.0 was so badly suited that a hadoop fsck / would report the DFS being corrupt because of the open log files HBase kept.
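As a hedged illustration of the hflush() idea on Hadoop 0.21+ (the path is made up, and HBase's own log writer adds far more bookkeeping):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HflushSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out = fs.create(new Path("/tmp/wal-demo"));
        out.write("edit-1".getBytes());
        // push the bytes to the datanodes so a reader can see them even
        // though the file is still open
        out.hflush();
        out.close();
    }
}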
Bottom line is, without Hadoop 0.21.0 you can very well face data loss. With Hadoop 0.21.0 you have a state-of-the-art system.
Planned Improvements
SequenceFile Replacement
One of the central building blocks around the WAL is the actual storage file format. The SequenceFile used has quite a few shortcomings that need to be addressed. One, for example, is suboptimal performance, as all writing in SequenceFile is synchronized, as documented in HBASE-2105.
As with HFile replacing MapFile, it makes sense to think about a complete replacement. A first step was taken to make the HBase classes independent of the underlying file format: HBASE-2956 made the class implementing the log configurable.
Another idea is to change to a different serialization altogether. HBASE-2055 proposes such a format using Hadoop's Avro as the low level system. Avro is also slated to be the new RPC format for Hadoop, which does help as more people are familiar with it.
Append/Sync
Even with hflush() we have a problem: calling it too often may cause the system to slow down. Previous tests using the older syncFs() call showed that calling it for every record slows down the system considerably. One step to help is to implement a "Group Commit", done in HBASE-1939. It flushes out records in batches. In addition, HBASE-1944 adds the notion of a "deferred log flush" as a parameter of a Column Family. If set to true, it leaves the syncing of changes to the log to the newly added LogSyncer class and thread. Finally, HBASE-2014 sets flushlogentries to 1 and optionallogflushinterval to 1000 msecs. The .META. is always synced for every change; user tables can be configured as needed.
Distributed Log Splitting
As remarked, splitting the log is an issue when regions need to be redeployed. One idea is to keep a list of regions with edits in Zookeeper. That way at least all "clean" regions can be deployed instantly. Only those with edits then need to wait until the logs are split.