Experiencing Hadoop and Big Data

Preface

We decided to experiment with Hadoop for one of our customers who rely on Business Intelligence for dealing with multi-tera byte data. By we I mean Business Intelligence practice and the UNIX engineering team. Our goal was to have a frame work to finish a certain operation for our customer that usually take 50+ hours in less time, but my goal was to emperiment Hadoop and have an automated framework to rapidly deploy nodes and play with a Single System Image cluster.

Explanation for Hadoop, MapReduce, Hadoop Streaming, cluster can be found elsewhere, use Google andThe Hadoop Blog.

Technologies

  • Citrix XenServer 6.2: We rely internally on the open source XenServer 6.2 for virtualization
  • Spacewalk: For system management, provisioning, and software repositories (It’s the open source counterpart of Red Hat Network Satellite
  • Hortonworks Data Platform: Open source Hadoop distribution, which is commercially supported Hadoop 2.2
  • Red Hat Enterprise Linux: Operating system of choice (RHEL 6.4)

Introduction

In the beginning, I though Hadoop would solve all our problems even running a simple wc -l on a multi-gigabyte file would give me near to light speed results. Well, it isn’t, it isn’t at all. But here is the story, we use XenServer to virtualize our environment, so I wanted a solution that can integrate with what we have without changing anything, we decided to build a multi-node cluster with 6 slave nodes and 1 manager with the following specs: 2 VCPUs, 2 GB RAM, 1 8 GB partition for root, and 20 GB partition for Hadoop data. It’s lame but it’s a start to get into the mysterious world of Big Data.

But first we wanted to build repositories of software, Kickstart server, a way to control all nodes without having the trouble to log into each machine to run commands, so we used Spacewalk, the Red Hat Network Satellite open source counterpart, so we installed Spacewalk, built base channels for RHEl 6.4 64-bit sources and created sub channels, configuration channels to distribute SSH keys, Kickstart profiles, etc. All is working good with minimum efforts since we have proper experience to get things done (may be I will write about it somewhere else). No PXE involved, no DHCP of our own so we have to used the company’s DHCP server which lucily support Dynamic DNS.

The experience

We create template on XenServer for Red Hat 6 installation, so when we create a new node all we have to do is to point that VM to the appropriate Kickstart location on Spacewalk and just enjoy seeing unattended installation in few minutes until the machine is online with all needed packages installed for Spacewalk and Hadoop.

At first, we created a cluster of 1 Master and 6 Data nodes, each with 2 VCPUs, 2 GB RAM, 1 8 GB partition for root, and 20 GB partition for Hadoop data. We started to play around with Hadoop streaming since we are no Java guys, we have tons of scripts written of Perl, Bash and a load of UNIX commands that can serve as testing the concept. And the first thing was to use wc -l command on a 2 GB file and compare the results, silly, very silly! And a bad and unrealistic thing to do. Please don’t try it. But for the records, wc -l on command shell finished in around 5 seconds, MapReduce wc -l finished in about 10 minutes!

I wanted to push the limits, to use on real data and get real results, so here is the flow of our experiments:

The experiment

Analyzing Apache Log file and find the most visited pages.

The Data

A 95 MB access_log file for one of our servers.

The code

Mapper

#!/bin/bash
# Takes a line:
# 10.0.0.207 - - [09/Nov/2013:12:08:13 +0200] "GET /somewhere/itworx/housekeeping.php?v=ASDF436345 HTTP/1.0" 200 -
# And return:
# /somewhere/itworx/housekeeping.php     1
#
export PATH=$PATH:/bin
while read line ; do
        url=$(echo $line | sed -e 's/.*\(GET\|POST\) \([^ ]*\).*/\2/' | sed -e 's/\([^\?]*\).*/\1/')
        echo "$url      1"
done
exit 0

Reducer

#!/bin/bash
export PATH=$PATH:/bin
# url   1
prvurl=""
cnt=0
while read line ; do
        thisurl=$(echo $line | awk '{ print $1 }' )
        if [ "$prvurl" = "$thisurl" ] ; then
                cnt=$(( cnt+1 ))
        else
                # if i have previous key, i should print
                # New key, reset count, assign new key to prev key
                if [ ! -z "$prvurl" ] ; then
                        echo "$prvurl   $cnt"
                fi
                cnt=1
                prvurl="$thisurl"
        fi
done
if [ "$prvurl" = "$thisurl" ] ; then
        echo "$prvurl   $cnt"
fi
exit 0

Attempt 1

Running locally to set a baseline time:

$ time ( cat access_log | ./most_visited_page-map.sh | sort | ./most_visited_page-reduce.sh )
real 128m8.484s
user 12m39.959s
sys 92m9.671s

So running the above scripts on 1 95 GB file takes 2 hours and 8 minutes. Good!

Attempt 2:

Running the same scripts on Hadoop for the same file:

$ time hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.2.0.2.0.6.0-76.jar \
-files /tmp/most_visited_page-map.sh,/tmp/most_visited_page-reduce.sh \
-mapper "most_visited_page-map.sh" -reducer "most_visited_page-reduce.sh" \
-input /user/te/access_log -output /user/te/access_log_analysis
 
...
 
real 156m50.245s
user 0m21.877s
sys 0m5.520s

Note: -files will ship script files from local machine (the machine that submits the job) to hadoop nodes, they are shipped to the working directory of the job.

To monitor the job:

$ mapred job -status job_1383865655809_0020

So it takes 2 hours, 36 minutes, 22 minutes more than running on command line.

Preliminary conclusions

  • Hadoop block size is 128 MB, the file we have will occupy only one block, so by default we will have 1 mapper, and 1 reducer (single process pipe), so we should have expected same perforamce plus Hadoop overhead
  • Hadoop is not for small data, we need to have much much more data
  • Hadoop is written in Java, java is a memory hungry bitch, we need more memory for the VMs.

So we did the following:

  • Added 2 GB for each VM
  • Added 2 physical machines with exact same specs, so we now have 1 Master and 8 Data nodes
  • Input file now is 3.3 GB

Attempt 3:

It really didn’t happen, but simple naive math, running the same scripts on the new data file would take 69 hours (almost 3 days).

Attempt 4:

Running on Hadoop with default number of mappers and reducers.

  • Job ID: job_1384084708777_0034
  • Input file length: 3.3G
  • # of blocks: 26
  • Block size: 128
  • # of mappers: 26 (default)
  • # of reducers: 1 (default)
  • Number of records processed in file: 35,083,092
  • Total time: Still running
  • Peak memory usage (Ambari portal): 169%

Attempt 5:

Pushing the limits, setting number of mappers and reducers.

  • Job ID:
  • Input file length: 3.3G
  • # of blocks: 26
  • Block size: 128
  • # of mappers: 71
  • # of reducers: 11
  • Number of records processed in file: 35,083,092
  • Total time: All mappers reducers finished within 6 hours, except 1 reducer which finished in 18 hours because it processed 26,000,000+ records alone
  • Peak memory usage (Ambari portal):

Attempt 6:

In progress…

The experiments are still running …