
Add compression to an existing HBase table?

Hello Everyone,

Recently, I had to add compression to an HBase table that already had millions of rows. Why add compression in HBase? Firstly, to reduce the space needed in HDFS, and secondly, to reduce the IOPS needed: if the block stored in HDFS is 4 times smaller, you need 4 times fewer IOPS to read the same data. On the other hand, you are going to increase CPU consumption to compress/decompress the data, so keep in mind that it can have a relatively big impact on the CPU load.

Here’s a very short How to. Maybe one day you will need the info! 🙂

Firstly, you need to disable the table in the HBase shell; in my case the table is "events":

disable 'events'

Then, you can do an "alter table". You can choose which column families you want to compress, and you can also choose between different compression algorithms (SNAPPY, LZO, GZ or LZ4).

I chose SNAPPY because it is the one that uses the least CPU.

alter 'events', {NAME => 'metadata', COMPRESSION => 'SNAPPY'}, {NAME => 'data', COMPRESSION => 'SNAPPY'}

Once it's done, you can re-enable the table:

enable 'events'

Now, if you look at the size of the table in HDFS, you are not going to see any difference yet. You need to wait for a major compaction; with a default Hortonworks installation it runs every 7 days. You can trigger a major compaction manually with the following command in the HBase shell:

major_compact 'events'

Wait a few minutes, depending on the size of the table and the CPU resources available on your cluster.

That's it, you added Snappy compression to your table. 🙂 In my case, it reduced the size by a factor of 4.
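
If you prefer to script these steps instead of typing them in the shell, here is a minimal sketch of the same operations using the HBase 1.x Java client. The table name is the "events" table from this example; everything else is illustrative and should be adapted to your cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression;

public class EnableSnappyOnEvents {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         Admin admin = connection.getAdmin()) {
      TableName table = TableName.valueOf("events");

      // Same as "disable 'events'" in the shell.
      admin.disableTable(table);

      // Fetch the current descriptor so the other column family settings are
      // preserved, then switch every family to SNAPPY (same as the shell "alter").
      HTableDescriptor descriptor = admin.getTableDescriptor(table);
      for (HColumnDescriptor family : descriptor.getColumnFamilies()) {
        family.setCompressionType(Compression.Algorithm.SNAPPY);
      }
      admin.modifyTable(table, descriptor);

      // Same as "enable 'events'" and "major_compact 'events'".
      admin.enableTable(table);
      admin.majorCompact(table);
    }
  }
}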

 

Hadoop: HDFS Optimizations

The default configuration of a Hadoop distribution is not optimal for every application. In Hadoop, you can configure a lot of different settings that modify the behavior of the cluster. All these settings have a default value given by Hadoop, but the distributions sometimes change these defaults to get good performance regardless of the application running on your cluster.

In this article I will explain some of the settings that you can customize to improve the performance of HDFS, and also explain the impact of each of these settings.

Before going further, I recommend that you read the following article to clearly understand this post.

Heap size of the NameNode and DataNode

The NameNode and the DataNode are applications that run in a JVM. For each JVM you have to define the maximum memory that can be allocated to the application. To determine the heap size of the NameNode, you need to know the maximum number of data blocks that your cluster will store; the common rule is 1 GB per million data blocks. For the DataNode, it depends on your workload and especially on the amount of data that can be sent at the same time to the same DataNode: the heap acts as a buffer when the throughput of the disk is not enough. Cloudera and Hortonworks let you change these settings directly from the graphical interface. For MapR you need to change them in the following file: /opt/mapr/conf/warden.conf.

Size of the blocks

When a file is stored in HDFS, it is split into blocks of a specific size. This size can have a big impact on performance if you have to read a file several times. For example, say you have a block size of 128 MB and you want to read a 10 GB file from a client outside the cluster. The file has been split into about 80 blocks spread across the cluster, so to read it, HDFS has to fetch about 80 blocks from different locations in the cluster to rebuild the file. It also means there are about 80 entries for this file in the metadata table on the NameNode.

If you change the HDFS block size to 256 MB, you halve the number of entries in the metadata table and also halve the number of connections needed to get all the blocks.

Why not set the block size to 10GB?

Because it means that when you read the file from an external client, you will only have one connection. If the file is split across more servers, it can be rebuilt faster because the transfers are done in parallel. That is why, when you set the block size, you need to know the average size of your files: large enough that the client does not open too many connections at the same time, but small enough to get a file back as fast as possible.

Moreover, you need to remember that in this environment the data is processed where it is located. So with a block size of 10 GB, the file would sit on at most 3 servers with a block replication factor of 3. The data would then be processed on only three servers, or it would have to be sent across the network to the other servers to be processed, and you would lose a lot of time.

Currently, the common size is 128 MB or 256 MB. To change this setting, edit the following property in the "$HADOOP_HOME/conf/hdfs-site.xml" file. Note that the value is expressed in bytes (134217728 = 128 MB) and that the property is called dfs.blocksize in recent Hadoop versions.

<property>
<name>dfs.block.size</name>
<value>134217728</value>
</property>
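
The block size can also be chosen per file at write time instead of cluster-wide. Here is a minimal sketch with the HDFS Java API; the path is hypothetical and the rest uses standard FileSystem calls.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteWithCustomBlockSize {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    long blockSize = 256L * 1024 * 1024;            // 256 MB, expressed in bytes
    short replication = fs.getDefaultReplication(); // keep the cluster default
    int bufferSize = 4096;

    // The block size is fixed when the file is created and only applies to this file.
    Path path = new Path("/data/big-input.csv");    // hypothetical path
    try (FSDataOutputStream out =
             fs.create(path, true, bufferSize, replication, blockSize)) {
      out.writeBytes("example content\n");
    }
  }
}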
 

Block replication

I think this is the setting that has the biggest impact on write performance in HDFS. If the block replication factor is higher than 1, then as soon as a block of the file has been written, replication starts and the block is read again to be sent to the other servers. A hard disk cannot read and write at the same time, so you lose write throughput.

As you can see in the DFSIO benchmark, the write throughput can be multiplied by two or three depending on the distribution.

The default value for dfs.replication is 3, but you can change it with the following setting in the "$HADOOP_HOME/conf/hdfs-site.xml" file. This setting applies to all new files unless a different replication factor is specified when the file is created.

<property>
<name>dfs.replication</name>
<value>3</value>
</property>
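
The replication factor of a file that already exists in HDFS can also be changed afterwards, either with "hadoop fs -setrep" or from code. A minimal sketch with the HDFS Java API (the path is again hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ChangeReplication {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Ask the NameNode to keep only 2 replicas of this file from now on;
    // extra replicas are removed asynchronously in the background.
    Path path = new Path("/data/big-input.csv");   // hypothetical path
    boolean accepted = fs.setReplication(path, (short) 2);
    System.out.println("Replication change accepted: " + accepted);
  }
}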

 

Compression of the data

If you have a huge quantity of data to write and read, you can compress it to save space on disk and to write/read faster, but this involves using more CPU. Compression can also help decrease network bandwidth utilization.

There are different compression codecs that Hadoop can use; they are listed in Table 2: Compression codecs.

Codec name      Java class
DefaultCodec    org.apache.hadoop.io.compress.DefaultCodec
GzipCodec       org.apache.hadoop.io.compress.GzipCodec
BZip2Codec      org.apache.hadoop.io.compress.BZip2Codec
SnappyCodec     org.apache.hadoop.io.compress.SnappyCodec
LzoCodec        org.apache.hadoop.io.compress.LzoCodec

To configure the compression you need to change some settings in the file “$HADOOP_HOME/conf/mapred-site.xml”.

  • Enable compression of the job output:
<property>
<name>mapred.output.compress</name>
<value>true</value>
</property>
 
  • Specify the compression codec:
 <property>
<name>mapred.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
</property>

You can change the value to any of the Java classes listed in Table 2: Compression codecs.

 

  • Change the compression type (NONE, RECORD or BLOCK; this applies to SequenceFile outputs):
 <property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>

 

After any change to the mapred-site.xml file, you need to restart the MapReduce service for the changes to take effect.
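
If you only want compression for a single job rather than cluster-wide, the same three settings can be applied from the driver code instead of mapred-site.xml. A minimal sketch with the org.apache.hadoop.mapreduce API; the job name and output path are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class CompressedJobDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "compressed output example");

    // Per-job equivalent of mapred.output.compress=true and
    // mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec.
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

    // Per-job equivalent of mapred.output.compression.type=BLOCK
    // (only meaningful when the job writes SequenceFiles).
    SequenceFileOutputFormat.setOutputCompressionType(
        job, SequenceFile.CompressionType.BLOCK);

    FileOutputFormat.setOutputPath(job, new Path("/tmp/compressed-output"));
    // ... set the mapper, reducer and input path, then job.waitForCompletion(true);
  }
}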

 

Balancing the data blocks

 

When a file is saved in HDFS, it is split into multiple data blocks. If you have 6 nodes but the data blocks are distributed across only 3 of them, then when you process the data, only 3 nodes will have it in their local storage and the others will need to fetch it across the network, which costs time. You can balance the blocks across the nodes, but also across the local disks, to increase the throughput.

From my experience, all the distributions spread the data blocks automatically when they are created. If you want to launch a rebalancing of the data blocks, run the following command on the NameNode:

hadoop balancer -threshold 0.2

The threshold option defines how far (in percent) a DataNode's disk utilization is allowed to deviate from the average utilization of the cluster.

There is a setting in HDFS that lets you limit the bandwidth that can be used during the rebalancing. You can configure it in "$HADOOP_HOME/conf/hdfs-site.xml" with the following property, expressed in bytes per second (10485760 = 10 MB/s); it is called dfs.datanode.balance.bandwidthPerSec in recent versions.

<property>
<name>dfs.balance.bandwidthPerSec</name>
<value>10485760</value>
</property>
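
If you do not want to restart HDFS, the balancer bandwidth can also be changed at runtime with "hdfs dfsadmin -setBalancerBandwidth <bytes per second>". A minimal sketch of the same call from Java, assuming your Hadoop version exposes DistributedFileSystem.setBalancerBandwidth:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SetBalancerBandwidth {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);

    // Tell every DataNode to cap balancing traffic at 10 MB/s (value in bytes/s).
    // This only changes the running value; the hdfs-site.xml setting still
    // applies after the next restart.
    if (fs instanceof DistributedFileSystem) {
      ((DistributedFileSystem) fs).setBalancerBandwidth(10L * 1024 * 1024);
    }
  }
}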

MapReduce

Concept

One of the main concepts that makes big data work in practice is that the data is processed where it is stored; we call that "data locality". The processing of the data is therefore done in parallel on all the DataNodes that hold the data we want to process.

MapReduce is a programming model in which a program implements two parts: the map part and the reduce part.

The map part takes the input data and transforms it into <key, value> pairs. The reduce part takes all the <key, value> pairs with the same key and processes them together.

How does it work?

A real-life example

To explain how it works, let's take an example. Imagine that you want to count the number of occurrences of each word in a 100-page book, in one hour. How are you going to do that?

  1. You ask 26 of your colleagues to come and help you.
  2. You give each colleague a page of the book and ask them to copy each word onto its own sheet of paper, one word per sheet.
  3. When a colleague finishes a page, you give them another one. If a colleague falls sick, you take back their page and give it to someone else.
  4. When all the pages have been processed, you put 26 boxes on a table. Each box represents a letter of the alphabet, and you ask all your colleagues to put the words that begin with the same letter in the same box.
  5. You ask everyone to take a box and sort the sheets in it alphabetically.
  6. When the sorting is done, you ask your colleagues to count the sheets that carry the same word and to give you the results.

After these 6 steps, if everyone has followed the rules, you have the number of occurrences of each word in less than one hour.

Example in Hadoop with the MapReduce framework

The next figure shows the process of a word count distributed across many hosts.

First of all, the input text file is split row by row, and each row is "mapped" by a host. The map step produces many <key, value> pairs; in this case the key is the word and the value is "1", because it is one occurrence of that word.

MapReduce workflow

Next, all the key/value pairs produced by all the hosts are sorted by key. This is the most complicated part, but fortunately it is handled by the MapReduce library.

Finally, all the nodes perform a "reduce" task: they count all the key/value pairs that have the same key. The final result is the number of occurrences of each word in the file.
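
To make this workflow concrete, here is the classic word count written against the org.apache.hadoop.mapreduce API. It is the standard Hadoop example rather than code from this article, but it implements exactly the map and reduce steps described above.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  // Map: one input line -> one <word, 1> pair per word.
  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer tokens = new StringTokenizer(value.toString());
      while (tokens.hasMoreTokens()) {
        word.set(tokens.nextToken());
        context.write(word, ONE);
      }
    }
  }

  // Reduce: all <word, 1> pairs with the same word -> <word, total count>.
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable value : values) {
        sum += value.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}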

JobTracker & TaskTracker

In a MapReduce environment, we have 2 kinds of nodes: the JobTracker and the TaskTracker. The former is the node that schedules all the map and reduce tasks and hands them to the TaskTrackers. The next figure shows an example of the interaction between the two when a client asks to execute a MapReduce program.

Interaction between the JobTracker and the TaskTracker