
Add compression to an existing HBase table?

Hello Everyone,

Recently, I had to add compression to an HBase table that already had millions of rows. Why add compression in HBase? Firstly, to reduce the space needed in HDFS; secondly, to reduce the IOPS needed: if the block saved in HDFS is 4 times smaller, you need 4 times fewer IOPS to read the same data. On the other hand, you are going to increase CPU consumption to compress/decompress the data, so keep in mind that it can have a relatively big impact on the CPU load.

Here’s a very short how-to. Maybe one day you will need the info! 🙂

Firstly, you need to disable the table in the HBase shell; in my case the table is “events”:

disable 'events'

Then, you can run an “alter” on the table. You can choose which column families you want to compress, and you can also choose between different compression algorithms (SNAPPY, LZO, GZ or LZ4).

I chose SNAPPY because it is one of the algorithms that uses the least CPU.

alter 'events', {NAME=>'metadata', COMPRESSION=>'SNAPPY'}, {NAME=>'data', COMPRESSION=>'SNAPPY'}


Once it’s done, you can re-enable the table:

enable 'events'
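
You can check that the new setting was applied with the describe command in the HBase shell (the table and column family names below are the ones from my example):

describe 'events'

In the output, the column family descriptors should now show COMPRESSION => 'SNAPPY'.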

Now, if you look at the size of the table in HDFS, you will not see any difference yet. You need to wait for a major compaction; with the default Hortonworks installation it runs every 7 days. You can launch a major compaction manually with the following command in the HBase shell:

major_compact 'events'

Wait a few minutes, depending on the size of the table and the CPU resources available on your cluster.

That’s it, you have added Snappy compression to your table. 🙂 In my case, it reduced the size by a factor of 4.

 

Hadoop: HDFS Optimizations

The default configuration of a Hadoop distribution is not optimal for every application. In Hadoop, you can tune a lot of different settings that change the behaviour of the cluster. All these settings have a default value given by Hadoop, but the different distributions sometimes change them to get good performance regardless of the application running on your cluster.

In this article I will explain some of the settings that you can customize to improve the performance of HDFS, and also explain the impact of each of these settings.

Before reading this article, I recommend you read the following article first to clearly understand this post.

Heap Size of the Namenode and Datanode

 

The NameNode and the DataNode are applications that run in a JVM. For each JVM you have to define the maximum memory that can be allocated to the application. To determine the heap size of the NameNode, you need to know the maximum number of data blocks that your cluster will store; the common rule is 1 GB of heap per million data blocks. For the DataNode, it depends on your workload, and especially on the amount of data that can be sent at the same time to the same DataNode: the heap plays the role of a buffer if the throughput of the disks is not sufficient. Cloudera and Hortonworks give you the possibility to change these settings directly from the graphical interface. For MapR you need to change the settings in the following file: /opt/mapr/conf/warden.conf.
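
On a plain Apache Hadoop installation, one common way to set these heap sizes is through the JVM options in hadoop-env.sh. This is only a sketch with illustrative values; size the heaps according to the rule above for your own cluster:

# in $HADOOP_HOME/conf/hadoop-env.sh
# NameNode heap: ~4 GB for roughly 4 million data blocks (example value)
export HADOOP_NAMENODE_OPTS="-Xmx4g $HADOOP_NAMENODE_OPTS"
# DataNode heap: example value, adjust to your write workload
export HADOOP_DATANODE_OPTS="-Xmx1g $HADOOP_DATANODE_OPTS"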

 

Size of the blocks

When a file is stored in HDFS, it is split into blocks of a specific size. This size can have a big impact on performance if you have to read a file several times. For example, suppose you have a block size of 128MB and you want to read a 10GB file from a client outside the cluster. This file has been split into 79 blocks saved across the cluster, so to read it, HDFS needs to fetch 79 blocks from different locations in the cluster to rebuild the file. It also means that there are 79 entries in the metadata table of the NameNode.

If you change the block size for HDFS to 256MB, you are going to divide by 2 the number of entries in your metadata table and also divide by 2 the number of necessary connections to get all the blocks.

Why not set the block size to 10GB?

Because it means that when you want to read the file from an external client, you will only have one connection. If the file is split across more servers, it can be rebuilt faster because the transfers can be done in parallel. That is why, when you set the block size, you need to know the average size of your files: the blocks should be large enough that the client does not open too many connections at the same time, but small enough that the file is spread across enough servers to be read back as fast as possible.

Moreover, you need to remember that in this environment the data is processed where it is located. So if you have a block size of 10GB, the file will sit on at most 3 servers if you have a block replication factor of 3. The data would then be processed on only three servers, or you would need to send it across the network to the other servers to be processed, and you would lose a lot of time.

In practice, the common block size is 128MB or 256MB. To change it, set the following property in the “$HADOOP_HOME/conf/hdfs-site.xml” file (the value is expressed in bytes, here 128MB):

<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
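
To see how an existing file is actually split into blocks, or to store a single file with a non-default block size, you can use the standard HDFS commands. The paths below are just placeholders:

# list the blocks and their locations for a given file
hdfs fsck /user/demo/bigfile.csv -files -blocks -locations

# upload one file with a 256MB block size without changing the cluster default
hdfs dfs -D dfs.blocksize=268435456 -put bigfile.csv /user/demo/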
 

Block replication

I think this is the setting that has the biggest impact on write performance in HDFS. If the block replication factor is higher than 1, then as soon as a block of the file has been written, the replication begins and the block is read again to be sent to the other servers. A hard disk cannot read and write at the same time, so you lose write throughput.

As you can see in the DFSIO benchmark, the write throughput can be multiplied by two or three depending on the distribution.

The default value of dfs.replication in Hadoop is 3, but you can change it with the following setting in the “$HADOOP_HOME/conf/hdfs-site.xml” file. This setting applies to all new files unless a different value is specified when the file is created.

<property>
<name>dfs.replication</name>
<value>3</value>
</property>
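
Note that changing dfs.replication only affects new files. For files already in HDFS you can change the replication factor with the standard setrep command; the path below is just an example:

# set the replication factor of an existing directory to 2 and wait for it to complete
hdfs dfs -setrep -w 2 /user/demo/archive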

 

Compression of the data

If you have a huge quantity of data to write/read, you can compress it to save space on disk and to write/read faster, at the cost of more CPU. Compression also helps decrease the network bandwidth utilization.

There are different compression codecs that Hadoop can use; they are listed in Table 2: Compression codecs.

Codec name      Java class
DefaultCodec    org.apache.hadoop.io.compress.DefaultCodec
GzipCodec       org.apache.hadoop.io.compress.GzipCodec
BZip2Codec      org.apache.hadoop.io.compress.BZip2Codec
SnappyCodec     org.apache.hadoop.io.compress.SnappyCodec
LzoCodec        org.apache.hadoop.io.compress.LzoCodec

To configure the compression you need to change some settings in the file “$HADOOP_HOME/conf/mapred-site.xml”.

  • Enable the output compression of a map:
<property>
<name>mapred.output.compress</name>
<value>true</value>
</property>
 
  • Specify the compression codec:
 <property>
<name>mapred.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

 You can change the value to one of the entries in Table 2: Compression codecs.

 

  • Change the compression type:
 <property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>

 

After any change to the mapred-site.xml file, you need to restart the MapReduce service for the changes to take effect.
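
As a side note, the property names above are the classic MRv1 names. If you run a more recent Hadoop 2 / YARN cluster, the renamed equivalents look like this; a sketch assuming the same Gzip codec as above:

<!-- MRv2 equivalents of the compression properties shown above -->
<property>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>
<property>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>BLOCK</value>
</property>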

 

Balancing the data blocks

 

When a file is saved in HDFS, it is split into multiple data blocks. If you have 6 nodes but the data blocks are distributed across only 3 of them, then when you process the data only 3 nodes will have it in their local storage and the others will need to fetch it over the network, which costs time. You can balance the blocks across the nodes, but also across the local disks, to increase the throughput.

From my experience, all the distributions automatically balance the data blocks when they are created. If you want to launch a rebalancing of the data blocks, run the following command on the NameNode:

hadoop balancer -threshold 0.2

The threshold option defines the maximum allowed difference, in percentage of disk capacity, between the utilization of each DataNode and the average utilization of the cluster.

There is a setting in HDFS that allows you to limit the bandwidth that can be used during the rebalancing. You can configure it in “$HADOOP_HOME/conf/hdfs-site.xml” with the following property (the value is in bytes per second, here 10 MB/s):

<property>
<name>dfs.balance.bandwidthPerSec</name>
<value>10485760</value>
</property>
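
If you do not want to edit the configuration and restart, the balancer bandwidth can also be changed at runtime with dfsadmin (the value is again in bytes per second; 50 MB/s here is only an example):

# temporarily raise the balancer bandwidth to 50 MB/s on all DataNodes
hdfs dfsadmin -setBalancerBandwidth 52428800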

HBase architecture

HBase is a NoSQL (Not Only SQL) database, more precisely a non-relational, distributed, column-oriented database. It was inspired by Google’s BigTable and is developed by the Apache foundation. It works on top of HDFS and can use the MapReduce framework. All the major Hadoop distributions provide HBase as a default package.

When should we use it?

This kind of database should be used when you have millions or billions of rows. You cannot simply move from an RDBMS (Relational DataBase Management System) to HBase by swapping the JDBC driver: you have to completely redesign your application, because the schema is not fixed and the number of columns can differ from one row to another.

HBase architecture

It is based on a master/slave architecture and is made of two components: the HMaster is the master node and the HRegionServers are the slave nodes.

The HMaster is responsible for all the administrative operations. The data is stored in a table and this table is stored in an HRegion. If the table becomes too big, it is split into several HRegions. A region is stored on an HRegionServer, so the table can be spread all across the cluster.

The HMaster is also responsible for the allocation of the regions to the HRegionServers and for balancing the load: it ensures that all the HRegions are not stored on the same HRegionServer.

As you can see in the next figure, an HRegionServer stores the HRegions and, to do that, it relies on several components.

Figure: HBase architecture
  • HLog: it stores all the necessary data about each region hosted on this HRegionServer and all the edits made to them.
  • MemStore: it is an in-memory store for the incoming data. When enough data has accumulated, it is written to HDFS. The reason is that HDFS only lets you write a file or block once: to apply edits you would have to rewrite the complete block, which takes time. That is why the MemStore buffers the data in memory to improve performance.
  • StoreFile: these are the HDFS files where the data is saved when the MemStore is full. Their management introduces performance considerations that you need to be aware of; this is explained in detail in the next paragraph.

Flush and compactions

When we insert or modify a row in the database, it is first written in memory, into the memstore. When the memstore reaches a specific size, the data is flushed to a new HDFS file, a storefile. When the HBase region has 3 storefiles and a full memstore, it performs what we call a compaction: it creates one bigger storefile with the content of the 3 storefiles and the memstore.
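
For testing, you can trigger these operations manually from the HBase shell (flush, compact and major_compact are standard shell commands; 'events' is just the table from the earlier example):

flush 'events'          # write the content of the memstores to new storefiles
compact 'events'        # trigger a minor compaction of the storefiles
major_compact 'events'  # merge all storefiles of each region into one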

 

Figure: HBase compactions

The HBase compactions figure is a timeline of HBase flushes and compactions. As you can see, the next compaction happens when there are 4 storefiles, because the compaction procedure follows this algorithm:

  1. Go through the storefiles from the oldest to the youngest.
  2. If there are more than 3 storefiles left and the current storefile is 20% larger than the sum of all the younger storefiles, and if it is larger than the memstore flush size, then move on to the next younger storefile and repeat step 2.
  3. If one of the conditions in step 2 is not met, the storefiles from the current one to the youngest are the ones that will be merged together. If there are fewer storefiles than the compaction threshold, no merge is performed. There is also a limit that prevents more than 10 storefiles from being merged in one compaction.

The goal of compaction is to limit the number of storefiles as the data grows to a really big size. Limiting the number of storefiles also reduces the number of entries in the HBase meta table and in the metadata table of the NameNode.
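
The thresholds mentioned above are configurable in hbase-site.xml. Below is only a sketch with the standard HBase property names and illustrative values; check the defaults of your HBase version before changing them:

<property>
<!-- minimum number of storefiles in a store before a compaction is considered -->
<name>hbase.hstore.compactionThreshold</name>
<value>3</value>
</property>
<property>
<!-- maximum number of storefiles merged in a single compaction -->
<name>hbase.hstore.compaction.max</name>
<value>10</value>
</property>
<property>
<!-- memstore size (in bytes) that triggers a flush, here 128 MB -->
<name>hbase.hregion.memstore.flush.size</name>
<value>134217728</value>
</property>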

Flume

Flume is a distributed system that makes it possible to collect, aggregate and move large amounts of log data from many different sources to a centralized data store. It is part of the Hadoop ecosystem and a project of the Apache foundation.

Figure: Flume architecture

The source runs in a Java Virtual Machine (JVM), the Flume agent, and listens for events. An event can be produced by a web server, a write into a file (log), an application, the reception of a packet on an Ethernet port, etc. When such an event is received, the source stores it into one or more channels.

The channel can be compared to a FIFO (First In First Out) queue. There are 3 types of channel:

  1. MemoryChannel: all the events are stored in memory (RAM) to improve performance.
  2. FileChannel: all the events are stored on disk to prevent any loss in case of failure of the host.
  3. JDBCChannel: the events are stored in a database.

As we can see in the “Flume architecture” figure, the sink consumes the events stored in the channels, takes several events at a time when possible, and sends them to the final destination. The destination can be HDFS, a database or anything else that you put in place. There are several types of sink, for example (a minimal agent configuration is sketched after this list):

  1. HDFSSink: writes the events into HDFS.
  2. HBaseSink: writes the events into the HBase NoSQL database.
  3. AvroSink: forwards the events to another Flume agent over Avro RPC, which is used, for example, to chain agents together.
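
To make this concrete, here is a minimal, hypothetical agent configuration (a properties file); the agent, source, channel and sink names are arbitrary and the HDFS path is a placeholder:

# one source, one memory channel, one HDFS sink
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sink1

# source: listen for raw text events on a TCP port
agent1.sources.src1.type = netcat
agent1.sources.src1.bind = localhost
agent1.sources.src1.port = 44444
agent1.sources.src1.channels = ch1

# channel: keep events in memory (fast, but lost if the host fails)
agent1.channels.ch1.type = memory

# sink: write the events into HDFS
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://namenode:8020/flume/events
agent1.sinks.sink1.channel = ch1

You would then start the agent with something like: flume-ng agent --name agent1 --conf conf --conf-file agent1.conf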

 

 

YARN

YARN (Yet Another Resource Negotiator) is the new version of MapReduce, also called MapReduce v2. YARN was developed to address several limitations of the original MapReduce, in particular in managing the hardware resources for applications: with the first version it was very hard to run two applications/jobs at the same time, you essentially had to wait for the first application to finish before launching the second. With YARN we can now execute several applications/jobs at the same time on the same cluster. It provides the resource management tools to allocate and manage the hardware resources of each node for a specific application.

Figure: YARN architecture

YARN splits the responsibilities of the JobTracker into separate entities: job scheduling and task progress monitoring.

As you can see in the “YARN architecture” figure, there is a ResourceManager that manages the use of the resources across the cluster and an ApplicationMaster that manages a running application on the cluster. This architecture also defines the container: it represents a specific quantity of memory that can be used to execute an application. The main idea behind better resource management on the cluster is that the ApplicationMaster negotiates a number of containers with the ResourceManager. The containers are under the supervision of the NodeManager, a daemon on each node that ensures that no container uses more resources than it is authorized to.
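
The memory available for containers on each node, and the minimum/maximum size of a single container, are controlled by properties in yarn-site.xml. This is only a sketch with illustrative values:

<property>
<!-- total memory on this node that can be allocated to containers -->
<name>yarn.nodemanager.resource.memory-mb</name>
<value>8192</value>
</property>
<property>
<!-- smallest container the ResourceManager will allocate -->
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>1024</value>
</property>
<property>
<!-- largest container the ResourceManager will allocate -->
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4096</value>
</property>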

MapReduce

Concept

One of the main concepts that makes big data work in practice is that the data has to be processed where it is stored; we call that “data locality”. So the processing of the data is done in parallel on all the DataNodes that hold the data we want to process.

A MapReduce algorithm is a programming model that implements 2 parts: the mapping part and the reducing part.

The map phase takes the input data and transforms it into <key, value> pairs. The reduce phase takes all the <key, value> pairs that share the same key and processes them together.

How does it work?

A real-life example

To explain how it works, let’s take an example. Imagine that you want to count the number of occurrences of each word in a 100-page book, in one hour. How are you going to do that?

  1. You are going to ask 26 of your colleagues to come and help you.
  2. To each colleague, you give a page of the book and ask him to put each word on another page, one word per page.
  3. When a colleague finishes a page you give him another one. If a colleague becomes sick, you take back his page and give it to another colleague.
  4. When all pages have been done, you put 26 boxes on a table. Each box represents a letter of the alphabet, and you ask all your colleagues to put all the words that begin with the same letter in the same box.
  5. You ask everyone to take a box and to sort alphabetically all the pages in the boxes.
  6. When the sort is done, you ask your colleagues to count the number of pages that have the same word and to give you the results.

After these 6 steps, if everyone has respected all the rules, you have the number of instances of each word in less than one hour.

Example in Hadoop with the MapReduce framework

In the next figure, we see the process of a word count distributed across many hosts.

First of all, the input text file is split row by row, and each row is “mapped” by a host. The mapping step produces many <key, value> pairs; in this case the key is the word and the value is “1”, because it is one occurrence of the word.

Figure: MapReduce workflow

Then, all the key/value pairs produced by all the hosts are sorted and grouped by key. This is the most complicated part, but fortunately it is handled by the MapReduce framework.

Finally, the nodes perform the “reduce” tasks: they count all the key/value pairs that share the same key. The final result you get is the number of occurrences of each word in the file.
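
You can try exactly this word count on a cluster with the example jar that ships with Hadoop; the jar location and the HDFS paths below depend on your installation and are only an example:

# put a text file into HDFS
hdfs dfs -put book.txt /user/demo/input/

# run the built-in word count example (map, sort/shuffle and reduce as described above)
hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount /user/demo/input /user/demo/output

# read the result: one line per word with its number of occurrences
hdfs dfs -cat /user/demo/output/part-r-00000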

JobTracker & TaskTracker

In a MapReduce environment, we have 2 kinds of nodes: the JobTracker and the TaskTracker. The JobTracker is the node that schedules all the map and reduce tasks, which are then executed by the TaskTrackers. The next figure shows an example of the interaction between the two when a client asks to execute a MapReduce program.

Figure: JobTracker and TaskTracker interaction