Experiencing Hadoop and Big Data

Preface

We decided to experiment with Hadoop for one of our customers who relies on Business Intelligence for dealing with multi-terabyte data. By "we" I mean the Business Intelligence practice and the UNIX engineering team. Our goal was to have a framework that finishes a certain operation for our customer, one that usually takes 50+ hours, in less time, but my own goal was to experiment with Hadoop and to have an automated framework to rapidly deploy nodes and play with a Single System Image cluster.

Explanations of Hadoop, MapReduce, Hadoop Streaming, and clusters can be found elsewhere; use Google and The Hadoop Blog.

Technologies

  • Citrix XenServer 6.2: We rely internally on the open source XenServer 6.2 for virtualization
  • Spacewalk: For system management, provisioning, and software repositories (it is the open source counterpart of Red Hat Network Satellite)
  • Hortonworks Data Platform: A commercially supported, open source Hadoop distribution (Hadoop 2.2)
  • Red Hat Enterprise Linux: Operating system of choice (RHEL 6.4)

Introduction

In the beginning, I thought Hadoop would solve all our problems; even running a simple wc -l on a multi-gigabyte file would give me near-light-speed results. Well, it doesn't, not at all. But here is the story. We use XenServer to virtualize our environment, so I wanted a solution that could integrate with what we already have without changing anything. We decided to build a multi-node cluster of 6 slave nodes and 1 manager, each with the following specs: 2 VCPUs, 2 GB RAM, an 8 GB partition for root, and a 20 GB partition for Hadoop data. It's lame, but it's a start into the mysterious world of Big Data.

But first we wanted to build software repositories, a Kickstart server, and a way to control all nodes without the trouble of logging into each machine to run commands. So we used Spacewalk, the open source counterpart of Red Hat Network Satellite: we installed it, built base channels for the RHEL 6.4 64-bit sources, and created sub-channels, configuration channels to distribute SSH keys, Kickstart profiles, etc. All of this worked well with minimal effort, since we have enough experience to get things done (maybe I will write about it elsewhere). No PXE was involved and we had no DHCP server of our own, so we had to use the company's DHCP server, which luckily supports Dynamic DNS.

The experience

We created a template on XenServer for Red Hat 6 installation, so when we create a new node all we have to do is point the VM at the appropriate Kickstart location on Spacewalk and enjoy watching the unattended installation; within a few minutes the machine is online with all the packages needed for Spacewalk and Hadoop installed.

At first, we created a cluster of 1 Master and 6 Data nodes, each with 2 VCPUs, 2 GB RAM, an 8 GB partition for root, and a 20 GB partition for Hadoop data. We started playing around with Hadoop Streaming, since we are no Java guys; we have tons of scripts written in Perl and Bash, plus a load of UNIX commands, that could serve to test the concept. The first thing was to run the wc -l command on a 2 GB file and compare the results. Silly, very silly, and a bad, unrealistic thing to do; please don't try it. But for the record, wc -l in the command shell finished in around 5 seconds, while the MapReduce wc -l finished in about 10 minutes!

I wanted to push the limits, to use it on real data and get real results, so here is the flow of our experiments:

The experiment

Analyzing an Apache log file to find the most visited pages.

The Data

A 95 MB access_log file for one of our servers.

The code

Mapper

#!/bin/bash
# Takes a line:
# 10.0.0.207 - - [09/Nov/2013:12:08:13 +0200] "GET /somewhere/itworx/housekeeping.php?v=ASDF436345 HTTP/1.0" 200 -
# And returns the key and a count of 1, separated by a tab:
# /somewhere/itworx/housekeeping.php     1
#
export PATH=$PATH:/bin
while read -r line ; do
        # Extract the requested path (GET/POST) and strip any query string
        url=$(echo "$line" | sed -e 's/.*\(GET\|POST\) \([^ ]*\).*/\2/' | sed -e 's/\([^\?]*\).*/\1/')
        printf '%s\t1\n' "$url"
done
exit 0

Reducer

#!/bin/bash
export PATH=$PATH:/bin
# Input arrives sorted by key, one "url<TAB>1" pair per line.
prvurl=""
cnt=0
while read -r line ; do
        thisurl=$(echo "$line" | awk '{ print $1 }')
        if [ "$prvurl" = "$thisurl" ] ; then
                cnt=$(( cnt + 1 ))
        else
                # New key: print the previous key (if any), then reset the count
                if [ -n "$prvurl" ] ; then
                        printf '%s\t%s\n' "$prvurl" "$cnt"
                fi
                cnt=1
                prvurl="$thisurl"
        fi
done
# Flush the last key
if [ -n "$prvurl" ] ; then
        printf '%s\t%s\n' "$prvurl" "$cnt"
fi
exit 0

Attempt 1:

Running locally to set a baseline time:

$ time ( cat access_log | ./most_visited_page-map.sh | sort | ./most_visited_page-reduce.sh )
real 128m8.484s
user 12m39.959s
sys 92m9.671s

So running the above scripts locally on the 95 MB file takes 2 hours and 8 minutes. Good!

Attempt 2:

Running the same scripts on Hadoop for the same file:

$ time hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.2.0.2.0.6.0-76.jar \
-files /tmp/most_visited_page-map.sh,/tmp/most_visited_page-reduce.sh \
-mapper "most_visited_page-map.sh" -reducer "most_visited_page-reduce.sh" \
-input /user/te/access_log -output /user/te/access_log_analysis
 
...
 
real 156m50.245s
user 0m21.877s
sys 0m5.520s

Note: -files ships script files from the local machine (the machine that submits the job) to the Hadoop nodes; they end up in the job's working directory.

To monitor the job:

$ mapred job -status job_1383865655809_0020

So it takes 2 hours and 36 minutes, about 29 minutes more than running it on the command line.
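
The reducer output is still one "url count" line per URL rather than a ranked list, so getting the actual most visited pages takes one more sort, this time by count. A small post-processing step like the following does it (the output directory is the one given to -output above):

$ hdfs dfs -cat /user/te/access_log_analysis/part-* | sort -k2,2nr | head -10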

Preliminary conclusions

  • The Hadoop block size is 128 MB, so the 95 MB file occupies only one block; by default that means 1 mapper and 1 reducer (a single-process pipeline), so we should have expected the same performance plus Hadoop's overhead (see the check below)
  • Hadoop is not for small data; we need much, much more data
  • Hadoop is written in Java, and Java is memory hungry; we need more memory for the VMs
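
To sanity-check the single-block reasoning, HDFS itself can report how many blocks a file occupies; a quick check against the same input path used above:

$ hdfs fsck /user/te/access_log -files -blocks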

So we did the following:

  • Added 2 GB of RAM to each VM
  • Added 2 physical machines with the exact same specs, so we now have 1 Master and 8 Data nodes
  • The input file is now 3.3 GB

Attempt 3:

It never actually ran, but by simple naive math, running the same scripts locally on the new data file would take 69 hours (almost 3 days).

Attempt 4:

Running on Hadoop with the default number of mappers and reducers.

  • Job ID: job_1384084708777_0034
  • Input file length: 3.3 GB
  • # of blocks: 26
  • Block size: 128 MB
  • # of mappers: 26 (default)
  • # of reducers: 1 (default)
  • Number of records processed in file: 35,083,092
  • Total time: Still running
  • Peak memory usage (Ambari portal): 169%

Attempt 5:

Pushing the limits by setting the number of mappers and reducers (a sketch of the invocation follows the list).

  • Job ID:
  • Input file length: 3.3 GB
  • # of blocks: 26
  • Block size: 128 MB
  • # of mappers: 71
  • # of reducers: 11
  • Number of records processed in file: 35,083,092
  • Total time: All mappers and reducers finished within 6 hours, except one reducer, which finished in 18 hours because it alone processed 26,000,000+ records
  • Peak memory usage (Ambari portal):
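
For the record, here is a sketch of the kind of streaming invocation that pushes those counts up; it is not necessarily the exact command we used, and the input/output paths are illustrative. The reducer count can be set directly with -D mapreduce.job.reduces, while the mapper count can only be nudged indirectly, e.g. by lowering the maximum split size (a 50 MB cap on a 3.3 GB file yields on the order of 70 map tasks, in line with the 71 above):

$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming-2.2.0.2.0.6.0-76.jar \
-D mapreduce.job.reduces=11 \
-D mapreduce.input.fileinputformat.split.maxsize=50000000 \
-files /tmp/most_visited_page-map.sh,/tmp/most_visited_page-reduce.sh \
-mapper "most_visited_page-map.sh" -reducer "most_visited_page-reduce.sh" \
-input /user/te/access_log_3g -output /user/te/access_log_3g_analysis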

Attempt 6:

In progress…

The experiments are still running …

Running SunOS 4.1.4 on QEMU on my laptop

I had a successful attempt at installing and running SunOS 4.1.4 on my laptop using QEMU's SPARC processor emulation mode.

Here is the command I started with:

$ bin/qemu-system-sparc -M SS-20 -m 512 -cpu "TI SuperSparc 60" -serial telnet:0.0.0.0:3001,server \
-bios bios/my-sun/ss20_v2.25_rom -nographic -hda dsks/test-sun414-root-dsk.raw -cdrom iso/sunos414.iso

Where:

  • hda = dsks/test-sun414-root-dsk.raw
  • cdrom = iso/sunos414.iso

OpenBoot PROM output:

ok devalias
...
disk3 /iommu/sbus/espdma@f,400000/esp@f,800000/sd@3,0
disk2 /iommu/sbus/espdma@f,400000/esp@f,800000/sd@2,0
disk1 /iommu/sbus/espdma@f,400000/esp@f,800000/sd@1,0
disk0 /iommu/sbus/espdma@f,400000/esp@f,800000/sd@0,0

And here is how QEMU sees block devices:

(qemu) info block
scsi0-hd0: removable=0 io-status=ok file=dsks/test-sun414-root-dsk.raw ro=0 drv=raw encrypted=0 bps=0 bps_rd=0 bps_wr=0 iops=0 iops_rd=0 iops_wr=0
scsi0-cd2: removable=1 locked=0 tray-open=0 io-status=ok file=iso/sunos414.iso ro=1 drv=raw encrypted=0 bps=0 bps_rd=0 bps_wr=0 iops=0 iops_rd=0 iops_wr=0
floppy0: removable=1 locked=0 tray-open=0 [not inserted]
sd0: removable=1 locked=0 tray-open=0 [not inserted]

Here is how the OpenBoot PROM sees block devices:

ok probe-scsi-all
/iommu@f,e0000000/sbus@f,e0001000/espdma@f,400000/esp@f,800000
Target 0
Unit 0 Disk QEMU QEMU HARDDISK 1.2.
Unit 1 Disk QEMU QEMU HARDDISK 1.2.
Unit 2 Disk QEMU QEMU HARDDISK 1.2.
Unit 3 Disk QEMU QEMU HARDDISK 1.2.
Unit 4 Disk QEMU QEMU HARDDISK 1.2.
Unit 5 Disk QEMU QEMU HARDDISK 1.2.
Unit 6 Disk QEMU QEMU HARDDISK 1.2.
Unit 7 Disk QEMU QEMU HARDDISK 1.2.
Target 2
Unit 0 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 1 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 2 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 3 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 4 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 5 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 6 Removable Read Only device QEMU QEMU CD-ROM 1.2.
Unit 7 Removable Read Only device QEMU QEMU CD-ROM 1.2.

I hit an error from the installation media. I don't remember the exact error since it was long ago, but I assume it came from the MAKEDEV script failing to create device files in the RAM-disk installation image. The solution was:

setenv sbus-probe-list f
reset

boot disk2:d (CDROM)

[ MAKEDEV trick ]

boot disk0:b -sw
 
boot disk0:a
...

Then fake out MAKEDEV:

# cd /dev
# mv MAKEDEV MAKEDEV.orig       (keep the original aside)
# ln -s /bin/true MAKEDEV       (make MAKEDEV a no-op)
# ln -s sd1a sr0                (point the CD-ROM nodes at sd1, where the ISO is attached)
# ln -s rsd1a rsr0

Here is the dmesg from the running system:

Sep 12 18:20
SuperSPARC: PAC ENABLED
SunOS Release 4.1.4 (GENERIC) #2: Fri Oct 14 11:09:47 PDT 1994
Copyright (c) 1983-1993, Sun Microsystems, Inc.
cpu = SUNW,SPARCstation-20
mod0 = TI,TMS390Z50 (mid = 8)
mem = 523840K (0x1ff90000)
avail mem = 510951424
cpu0 at Mbus 0x8 0x240000
entering uniprocessor mode
Ethernet address = 0:0:0:0:0:0
espdma0 at SBus slot f 0x400000
esp0 at SBus slot f 0x800000 pri 4 (onboard)
sd1 at esp0 target 1 lun 0
sd1:
sd3: non-CCS device found at target 0 lun 0 on esp0
sd3 at esp0 target 0 lun 0
sd3:
ledma0 at SBus slot f 0x400010
le0 at SBus slot f 0xc00000 pri 6 (onboard)
SUNW,bpp0 at SBus slot f 0x4800000 pri 3 (sbus level 2)
zs0 at obio 0x100000 pri 12 (onboard)
zs1 at obio 0x0 pri 12 (onboard)
fdc: no RQM - stat 0xc0
fdc: no RQM - stat 0xc0
SUNW,fdtwo0 at obio 0x700000 pri 11 (onboard)
root on sd3a fstype 4.2
swap on sd3b fstype spec size 98800K
dump on sd3b fstype spec size 98788K
le0: AUI Ethernet

And finally, here is the actual command used for the installation:

$ bin/qemu-system-sparc -m 512 -serial telnet:0.0.0.0:3001,server -M SS-20 -cpu "TI SuperSparc 60" \
-bios bios/my-sun/ss20_v2.25_rom -nographic -net user -hda dsks/test-sun414-root-dsk.raw -hdb iso/sunos414.iso

Karma score (contribution analysis system)

ChangeLog:

17 October, 2013: Initial draft (Tamer)

Introduction:

Karma score is a contribution-analysis scoring system that helps identify whether a user is an active contributor to a community by giving a score to certain activities. For example, when a user logs in to the community portal, the system awards them a certain score for logging in (say, 20 points); when a user reviews a comment and rates it, they are awarded a higher score; and the same goes for posting reviews or comments, with a higher score still.

These activities are then recorded in a database table (or any other persistent store), so that later, when a function requires a user's Karma score, it can easily find it in that table.

At any time, the Karma score is the sum of all the individual scores awarded to the user by the different subsystems and categories of the community portal.

How it works:

Create a database table with the following fields (a sketch follows the list):

  • User ID: ID representing the user in the system
  • Date and time: Date and time the user was awarded the Karma
  • Action description: Type of action performed by the user, for example login, comment review, posting a comment, etc.
  • Karma award: Score awarded for this action (a numeric value)
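
A minimal sketch of such a table, using sqlite3 purely for illustration; the table and column names are my own, not part of any existing system:

sqlite3 karma.db <<'SQL'
CREATE TABLE IF NOT EXISTS karma_log (
    user_id      INTEGER NOT NULL,   -- User ID: the user in the system
    awarded_at   TEXT    NOT NULL,   -- Date and time the Karma was awarded
    action_descr TEXT    NOT NULL,   -- Action description: login, comment review, ...
    karma        INTEGER NOT NULL    -- Karma award: numeric score for this action
);
SQL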

Assume there is a subroutine called add_karma(user_id, action_descr, karma) which, as the parameter names suggest, takes three values and inserts them into the Karma table.

So, for example, at user login the function gets called from the login code as:

add_karma(1001, "Login action", 20);

This adds a record to the Karma table for the user (ID 1001), awarding them 20 points for logging in.

So at any point in time, whenever a subsystem wants a user's Karma score, it calls another subroutine, get_karma_score(), which sums all the records in the Karma table and returns the total as the user's Karma score.
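
A minimal sketch of both subroutines, written as shell functions over the illustrative sqlite3 table above; the real portal would implement them in whatever language and database it already uses:

# Hypothetical shell versions of add_karma() and get_karma_score()
add_karma() {           # add_karma USER_ID "ACTION DESCRIPTION" KARMA
        sqlite3 karma.db "INSERT INTO karma_log (user_id, awarded_at, action_descr, karma)
                          VALUES ($1, datetime('now'), '$2', $3);"
}

get_karma_score() {     # get_karma_score USER_ID
        sqlite3 karma.db "SELECT COALESCE(SUM(karma), 0) FROM karma_log WHERE user_id = $1;"
}

add_karma 1001 "Login action" 20    # the example call from the text
get_karma_score 1001                # prints 20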

Two or more time thresholds should be defined by the system: one for expiring old records for all users, and the other(s) for decreasing action scores over time, for example, decreasing the login score from 20 to 10 after 10 days.

A nightly job should run on the table to honour those thresholds and recalculate Karma scores where needed.
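
A sketch of that nightly job against the same illustrative table; the 10-day decay of the login score from 20 to 10 is the example given above, while the 90-day expiry window is an arbitrary assumption:

# Hypothetical nightly cron job: expire old records, then decay old login scores
sqlite3 karma.db "DELETE FROM karma_log
                  WHERE awarded_at < datetime('now', '-90 days');"
sqlite3 karma.db "UPDATE karma_log SET karma = 10
                  WHERE action_descr = 'Login action' AND karma = 20
                    AND awarded_at < datetime('now', '-10 days');"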

Karma decays over time:

The Karma score should be influenced by when a user performed the actions they were awarded for; for example, a user who posted a review within the past 10 days should have a higher Karma score than a user who posted last year. A nightly job should crawl the Karma log table and expire actions falling beyond a certain time threshold; it may also decrease the scores of actions that have passed a certain age. That way, a user who logged in today has a higher Karma score than a user who logged in 3 weeks ago.

This reflects whether a user is active and heavily contributing to the community or just a visitor.

Usage:

Karma score is a contribution weight: it reflects how active a user is and how much they contribute to the community. The higher the score, the more the user has contributed.

Users with a higher Karma score are designated as active users; this is an evaluation of the user's own activities. Combined with other factors such as a Credibility score (how others evaluate my contributions) or a spam/report counter, it gives more insight into how legitimate their future contributions are likely to be.
