
HBase architecture

HBase is a NoSQL (Not Only SQL) database, more precisely a non-relational, distributed, column-oriented database. It was inspired by Google's BigTable and is developed by the Apache foundation. It runs on top of HDFS and can use the MapReduce framework. All the major Hadoop distributions provide HBase as a default package.

When should we use it?

This kind of database is meant to be used when you have millions or billions of rows in your tables. You cannot simply move from an RDBMS (Relational DataBase Management System) to HBase by changing the JDBC driver: you have to completely redesign your application, because the schema is not fixed and the number of columns can differ from one row to another, as the sketch below illustrates.
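
As a minimal sketch of this flexibility, here is what writing two rows with different column sets looks like with the HBase Java client of that era. The table name "users" and the column family "info" are hypothetical, and the table is assumed to already exist.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SparseRows {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Assumes a table "users" with a column family "info" already exists.
        HTable table = new HTable(conf, "users");

        // Row 1 stores three columns...
        Put p1 = new Put(Bytes.toBytes("user-1"));
        p1.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Alice"));
        p1.add(Bytes.toBytes("info"), Bytes.toBytes("email"), Bytes.toBytes("alice@example.com"));
        p1.add(Bytes.toBytes("info"), Bytes.toBytes("city"), Bytes.toBytes("Brussels"));
        table.put(p1);

        // ...while row 2 stores only one: rows do not share a fixed set of columns.
        Put p2 = new Put(Bytes.toBytes("user-2"));
        p2.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("Bob"));
        table.put(p2);

        table.close();
    }
}
```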

HBase architecture

It is based on a master/slave architecture and is made of two kinds of components: the HMaster is the master node and the HRegionServers are the slave nodes.

The HMaster is responsible for all the administrative operations. The data are stored in tables and each table is stored in one or more HRegions. If a table becomes too big, it is split into several HRegions. Each region is stored on an HRegionServer, so a table can be spread across the whole cluster.

The HMaster is also responsible for assigning the regions to the HRegionServers and for controlling the load: it ensures that the HRegions are not all stored on the same HRegionServers.

As you can see in the next figure, an HRegionServer stores the HRegions, and to do that it relies on several components.

HBase architecture
  • HLog: it saves the necessary data about each region hosted on this HRegionServer, namely all the edits; this write-ahead log can be replayed to recover the data after a failure.
  • MemStore: an in-memory store for the data being written. When enough data has accumulated, it is written to HDFS. The reason is that HDFS only lets you write a file, or a block of a file, once: to apply an edit you would have to rewrite the complete block, and this takes time. That is why the MemStore buffers the writes in memory, to improve performance.
  • StoreFile: the HDFS files where the data are saved when the MemStore is full. The management of these files has performance implications that you need to be aware of; it is explained in detail in the next paragraph.

Flush and compactions

When we insert or modify a row in the database, it is first written in memory, into the MemStore. When the MemStore reaches a specific size, the data are flushed to a new HDFS file, a StoreFile. When an HBase region has 3 StoreFiles and a full MemStore, it performs what we call a compaction: it creates one bigger StoreFile out of the content of the 3 StoreFiles and the MemStore.
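
These thresholds are configurable. As an illustrative sketch, the property keys below are the standard ones with their usual defaults; note that in practice they are set in hbase-site.xml on the region servers, not in client code.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactionTuning {
    public static void main(String[] args) {
        // In practice these properties live in hbase-site.xml on the servers;
        // they are shown here only to illustrate the keys and typical values.
        Configuration conf = HBaseConfiguration.create();
        // Flush the MemStore to a new StoreFile once it reaches 128 MB.
        conf.setLong("hbase.hregion.memstore.flush.size", 128L * 1024 * 1024);
        // Trigger a compaction when a store has at least 3 StoreFiles.
        conf.setInt("hbase.hstore.compactionThreshold", 3);
        // Never merge more than 10 StoreFiles in one compaction.
        conf.setInt("hbase.hstore.compaction.max", 10);
    }
}
```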

 

HBase compactions

The “HBase compactions” figure is a timeline of HBase flushing and compaction. As you can see, the next compaction will happen when there are 4 StoreFiles, because the selection of the files to compact follows this algorithm (a sketch of the selection logic is given after the list):

  1. Go through all the StoreFiles from the oldest to the youngest.
  2. If there are more than 3 StoreFiles left, and the current StoreFile is 20% larger than the sum of all the younger StoreFiles, and it is larger than the MemStore flush size, then move to the next younger StoreFile and repeat step 2.
  3. If one of the conditions in step 2 does not hold, the StoreFiles from the current one to the youngest one are the ones that will be merged together. If there are fewer files than the compaction threshold, no merge is performed. There is also a limit which prevents more than 10 StoreFiles from being merged in one compaction.

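Here is a minimal Java sketch of that selection logic. It is an illustration only, not HBase's internal code: the method and constant names are hypothetical, sizes are in bytes, and the list is assumed to be ordered from oldest to youngest.

```java
import java.util.List;

public class CompactionSelection {

    static final int COMPACTION_THRESHOLD = 3;      // minimum number of files to merge
    static final int MAX_FILES_PER_COMPACTION = 10; // upper bound on merged files
    static final double RATIO = 1.2;                // the "20% larger" rule from step 2

    // Returns the index of the oldest StoreFile to include in the compaction,
    // or -1 if no compaction should run.
    static int selectFilesToCompact(List<Long> fileSizes, long memstoreFlushSize) {
        int start = 0;
        // Steps 1 and 2: walk from the oldest file towards the youngest.
        while (fileSizes.size() - start > COMPACTION_THRESHOLD) {
            long current = fileSizes.get(start);
            long sumYounger = 0;
            for (int i = start + 1; i < fileSizes.size(); i++) {
                sumYounger += fileSizes.get(i);
            }
            // Skip the current file if it is too big to be worth rewriting.
            if (current > RATIO * sumYounger && current > memstoreFlushSize) {
                start++;
            } else {
                break;
            }
        }
        // Step 3: merge the files from the current one to the youngest one.
        int selected = fileSizes.size() - start;
        if (selected < COMPACTION_THRESHOLD) {
            return -1; // fewer files than the threshold: no merge
        }
        if (selected > MAX_FILES_PER_COMPACTION) {
            start = fileSizes.size() - MAX_FILES_PER_COMPACTION;
        }
        return start;
    }
}
```
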
The goal of compaction is to limit the number of StoreFiles when the data grow really big. Limiting the number of StoreFiles reduces the number of entries in the HBase master table and in the metadata table of the NameNode.

Flume

Flume is a distributed system that makes it possible to collect, aggregate and move large amounts of log data from many different sources to a centralized data store. It is part of the Hadoop ecosystem and a project of the Apache foundation.

Flume architecture

The source is a component, running in a Java Virtual Machine (JVM), that listens for events. An event can be produced by a web server, a write into a file (a log), an application, the reception of a packet on an Ethernet port, etc. When such an event is received, the source stores it into one or more channels.

The channel can be compared to a FIFO (First In First Out) queue. A channel can be of 3 types:

  1. MemoryChannel: all the events are stored in memory (RAM) to improve performance.
  2. FileChannel: all the events are stored on disk to prevent any loss in case of a failure of the host.
  3. JDBCChannel: the events are stored in a database.

As we can see in the “Flume architecture” figure, the sink consumes the events stored in the channels, taking several events at one time when possible, and sends them to the final destination. The destination can be HDFS, a database or anything else that you put in place. There are several types of sinks, for example (a toy model of the whole flow is sketched after this list):

  1. HDFSSink: writes the events to HDFS.
  2. HBaseSink: writes the events to the HBase NoSQL database.
  3. AvroSink: serializes the events with Avro, typically to forward them to another Flume agent.
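
To make the source → channel → sink flow concrete, here is a toy model in Java. This is not Flume's actual API, just a conceptual sketch in which a bounded FIFO queue plays the role of a MemoryChannel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class FlumeModel {
    public static void main(String[] args) throws InterruptedException {
        // The "channel": a bounded FIFO queue, comparable to a MemoryChannel.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(1000);

        // The "source": stores each received event into the channel.
        for (int i = 0; i < 12; i++) {
            channel.put("log line " + i);
        }

        // The "sink": consumes several events at one time (a batch)
        // and sends them to the final destination (here, standard output).
        List<String> batch = new ArrayList<>();
        while (channel.drainTo(batch, 5) > 0) {
            System.out.println("shipping a batch of " + batch.size() + " events: " + batch);
            batch.clear();
        }
    }
}
```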


YARN

YARN (Yet Another Resource Negotiator) is the new version of MapReduce, also called MapReduce v2. YARN has been developed to address several limitations of the original MapReduce, in particular in terms of managing the hardware resources for an application: with the first version it is impossible to launch two applications/jobs at the same time; you first have to wait for the first application to finish before launching the second. With YARN we can now execute two applications/jobs at the same time on the same cluster. It provides the resource management tools to allocate and manage the hardware resources of each node for a specific application.

YARN architecture

YARN splits the responsibilities of the JobTracker into separate entities: job scheduling and task progress monitoring.

As you can see in the “YARN architecture” figure, there is a resource manager that manages the use of the resources across the cluster, and an application master that manages a running application on the cluster. This architecture also defines the container: it represents a specific quantity of memory that can be used to execute an application. The main idea for better managing the resources of the cluster is that the application master negotiates a number of containers with the resource manager. The containers are under the supervision of the node manager, a daemon on each node that ensures that no container uses more resources than it is authorized to.
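
A hedged sketch of this negotiation with the YARN client API: an application master registers itself, then asks the resource manager for containers. The resource values are arbitrary examples.

```java
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ContainerNegotiation {
    public static void main(String[] args) throws Exception {
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(new YarnConfiguration());
        rmClient.start();

        // Register this application master with the resource manager.
        rmClient.registerApplicationMaster("", 0, "");

        // Negotiate 3 containers of 1024 MB and 1 virtual core each.
        Resource capability = Resource.newInstance(1024, 1);
        Priority priority = Priority.newInstance(0);
        for (int i = 0; i < 3; i++) {
            rmClient.addContainerRequest(new ContainerRequest(capability, null, null, priority));
        }
        // The allocated containers are then launched on the nodes, where the
        // node manager daemon enforces each container's resource limits.
    }
}
```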

MapReduce

Concept

One of the main concepts that makes big data work in practice is that the data have to be processed where they are stored; we call that “data locality”. So the processing of the data is done in parallel on all the DataNodes that hold the data we want to treat.

MapReduce is a programming model in which an algorithm implements 2 parts: the mapping part and the reducing part.

The mapping is a treatment that takes the input data and maps it to <key, value> pairs. The reducing takes all the <key, value> pairs with the same key and processes them together.

How does it work?

A real-life example

To explain how it works, let’s take an example. Imagine that you want to count the number of instances of each word in a book of 100 pages in one hour. How are you going to do that?

  1. You are going to ask 26 of your colleagues to come and help you.
  2. You give each colleague a page of the book and ask him to copy each word onto a new page, one word per page.
  3. When a colleague finishes a page you give him another one. If a colleague becomes sick, you take back his page and give it to another colleague.
  4. When all the pages have been processed, you put 26 boxes on a table, one for each letter of the alphabet, and you ask your colleagues to put all the words that begin with the same letter in the same box.
  5. You ask everyone to take a box and to sort all the pages in it alphabetically.
  6. When the sort is done, you ask your colleagues to count the number of pages that have the same word and to give you the results.

After these 6 steps, if everyone has respected all the rules, you have the number of instances of each word in less than one hour.

Example in Hadoop with the MapReduce framework

In the next figure, we see the process of a word count distributed over many hosts.

First of all, the input text file is split row by row and each row is “mapped” by a host. The mapping step produces many <key, value> pairs; in this case the key is the word and the value is “1”, because it is one occurrence of the word.

MapReduce workflow

Then, all the key-value pairs produced by all the hosts are sorted by key. This is the most complicated part, but fortunately it is done by the MapReduce library.

Finally, all the nodes perform a “reduce” task: they count all the key-value pairs that have the same key. The final result that you get is the number of instances of each word in the file.
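
These three steps map directly to the classic Hadoop word count, sketched below with the MapReduce Java API (the input and output paths are passed as arguments):

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map: for each word of the input row, emit the pair <word, 1>.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reduce: sum all the values received for the same key (the same word).
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```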

JobTracker & TaskTracker

In a MapReduce environment, we have 2 kinds of nodes: the JobTracker and the TaskTrackers. The first is the node that schedules all the map and reduce tasks, which are then given to the TaskTrackers. In the next figure, there is an example of the interaction between the two when a client asks to execute a MapReduce program.

JobTracker and TaskTracker interaction

The Big Data problem

Overview

Today, the quantity of data produced worldwide is increasing staggeringly. In 2008, the data produced amounted to 0.5 zettabytes, which represents 500 million terabytes. In 2011, there were 2.5 zettabytes, and Cisco estimates that in 2020 there will be more than 35 zettabytes of data. Moreover, the explosion of the data is accompanied by a diversification of the types of data, with unstructured data like text files, music, video, etc.

All these data have to be stored and managed, but they mainly need specific infrastructure and software to be processed effectively. The final goal is to produce a useful result that the company will use to guide its business.

Big data touches all the sectors: marketing, in order to have a better knowledge of the customer; media and advertising, to recommend the best content to the right user; scientific research, to find correlations between millions of treatments; etc. All the sectors are mainly confronted with the same 5 challenges. According to a presentation by Infochimps: 80% had a problem finding talent, 76% had difficulties finding the right tools for their business, 75% did not have enough time, 73% had issues understanding the different platforms and 72% needed to be more educated on the topic.

Every Big Data solution is composed of 3 main parts: the infrastructure, the middle software that enables us to manage and process all the data in a distributed manner, and the analytic software that runs on top of the middle software. The analytic software runs several kinds of algorithms, such as machine learning and data mining, to produce a result that is interesting for the business.

There are several software solutions for a big data environment. Different kinds of technologies exist: NoSQL, MPP (Massively Parallel Processing) and Hadoop. There are different Hadoop distributions that are completely packaged, like a Linux distribution. The best known are MapR, Cloudera and Hortonworks. These distributions are preconfigured and their installation is more or less automated.

The real challenge

As we can see in Figure 1, the growth of the data is not linear, it is exponential, and since 2000 we have seen an explosion of unstructured data coming, for example, from social networks. Unstructured data can be text, video files, voice, etc.

Figure 1: Growth of the data

The real issue is that if we want to keep the same performance as the data grow, the infrastructure needs to grow proportionally much more: the relation between the size of the data and the infrastructure needed is exponential. This introduces a big cost, which is the reason why different architectures have been developed to make the performance scale linearly with the size of the infrastructure.

Figure 2: Scalability

As we can see in Figure 2, the relation between the size of the data and the performance is not linear for a relational database. One of the main challenges of big data is to make this curve more linear, in order to reduce the cost of the infrastructure and to get better performance.

 


HDFS

The Hadoop Distributed File System (HDFS) is the software that plays the role of the file system. It is shared between all the nodes of the cluster and makes it possible to store huge quantities of data: in theory there is no limit, and in practice many companies have a cluster with several petabytes. The main principle is that every file is split into several parts of 128 MB, the parts are sent to different nodes, and each part is replicated.

Architecture

HDFS is based on a master/slave architecture. It has several components that make the machinery work well: the NameNode (the master), the DataNodes (the slaves), the secondary NameNode and the balancer node. Figure 1 shows us the interaction between some of them.

Figure 1: HDFS architecture

DataNode

This role is played by a host that stores the data on its local hard disk drives. All the disks have to be configured in JBOD mode, which means that the OS (Operating System) sees all the disks separately, without any RAID configuration. The reason is that we do not need replication of the data on the same host: the replication is done across the different DataNodes (hosts). When a file has to be stored, it is split into several blocks of 128 MB and the blocks are distributed among the DataNodes.

It can be interesting to change the size of the blocks to improve the performance of the cluster, as the sketch below shows. There is no limit on the number of instances of this role.
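
A client can, for example, pick a bigger block size when creating a file. A minimal sketch with the Java FileSystem API, where the path and the sizes are arbitrary examples:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CustomBlockSize {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());

        // create(path, overwrite, bufferSize, replication, blockSize):
        // here a 256 MB block size and 3 replicas instead of the defaults.
        FSDataOutputStream out = fs.create(new Path("/data/big-file.bin"),
                true, 4096, (short) 3, 256L * 1024 * 1024);
        out.writeUTF("hello HDFS");
        out.close();
        fs.close();
    }
}
```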

NameNode

This role is unique and played by one host, which is the “scheduler” of the architecture and has several responsibilities, such as:

  • To manage the namespace.
  • To maintain the filesystem tree.
  • To maintain all the metadata for all the files and the location of each block of each file.
  • To ensure that each block is replicated on different nodes.

When a client wants to read a file or process any data, it first asks the NameNode for the metadata, to know which DataNode it has to ask for the data.
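
From the client's point of view, this lookup is transparent. A minimal sketch with the Java FileSystem API (the path is an arbitrary example): opening the file contacts the NameNode for the block locations, and the actual bytes are then streamed from the DataNodes.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadFromHdfs {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // open() asks the NameNode which DataNodes hold which blocks,
        // then reads the blocks directly from those DataNodes.
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fs.open(new Path("/data/sample.txt"))));
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
        reader.close();
        fs.close();
    }
}
```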

Balancer Node

Its job is relatively simple: it looks at the repartition of the data across the cluster and orders data moves when the repartition is not good. This often happens after a disk failure or a node failure.

High availability

In this architecture, if the NameNode fails, the whole distributed file system is completely blocked! That is why it is possible to add a second NameNode that stays in standby and takes over if the active NameNode has a problem.

All the Hadoop distributions allow you to have only one active NameNode at a time, except one! MapR gives you the possibility to have several active NameNodes because they have developed their own distributed file system. It is completely compatible with the open-source HDFS, but you have to buy the MapR distribution.