Friday, 11 July 2014

Hadoop-HDFS Code Analysis-1

I had a hard time understanding the HDFS code, not only because it is large and complex but also because of the lack of documentation available.

1. Create File System
FileSystem is an abstract class that extends Hadoop's Configured class.
Like a normal file system, it defines all the file operations.
Every file system that Hadoop uses indirectly extends the FileSystem class.
Which file system is invoked depends on the Hadoop configuration file.
Suppose we specify hdfs://localhost:9000; this tells Hadoop to use DistributedFileSystem.
The default mapping comes from core-default.xml.
file:/// invokes LocalFileSystem.

2. Client invokes Server
Assume mkdirs() calls DistributedFileSystem's mkdirs:

public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, true);
}

mkdirs() calls mkdirsInternal(), which in turn uses the variable dfs, an object of DFSClient.
Apache's description of DFSClient:

DFSClient can connect to a Hadoop Filesystem and perform basic file tasks. It uses the ClientProtocol to communicate with a NameNode daemon, and connects directly to DataNodes to read / write block data. Hadoop DFS users should obtain an instance of DistributedFileSystem, which uses DFSClient to handle filesystem tasks.

The initialize() method is called.

All operations on DistributedFileSystem are delegated to DFSClient.

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {

DFSClient method calls are forwarded to the NameNode through its associated variable rpcNamenode.
ClientProtocol provides all the methods through which calls from client to server can be invoked.
All method calls on rpcNamenode come down to the Invoker's invoke() method.
This way the calls are transferred to server-side method calls.
Eventually they reach the Server class.
The server has 3 major components:
Listener, Responder, Handler.
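Hadoop builds rpcNamenode as a dynamic proxy, so every ClientProtocol call funnels through a single invoke() method. A minimal Python sketch of that proxy idea (the class and method names here are illustrative, not Hadoop's actual classes):

```python
class Invoker:
    """Funnels every method call into one invoke(), like Hadoop's RPC Invoker."""
    def __init__(self, server):
        self.server = server  # stands in for the NameNode-side implementation

    def __getattr__(self, method_name):
        # Any attribute access becomes a deferred remote call.
        def remote_call(*args):
            return self.invoke(method_name, args)
        return remote_call

    def invoke(self, method_name, args):
        # In real Hadoop this serializes the call and ships it over the wire;
        # here we just dispatch to the server object directly.
        return getattr(self.server, method_name)(*args)

class FakeNameNode:
    def mkdirs(self, path):
        return "created " + path

proxy = Invoker(FakeNameNode())
print(proxy.mkdirs("/user/data"))  # every call goes through invoke()
```

The point of the pattern is that the client code never knows it is making a network call; it just calls methods on what looks like a ClientProtocol object.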

Thursday, 3 July 2014

Setting up a Hadoop Development Environment (Hadoop Snapshot Version 3)

If you want to play around with the Hadoop code, you need to set up a Hadoop development environment.
Here are the steps that need to be followed.

Some software needed for Hadoop installation:
1) JDK
2) Maven
3) Protocol Buffers
4) Apache Ant, Apache Ivy and Findbugs

1) Install Jdk

a) sudo apt-get install openjdk-7-jdk
b) set JAVA_HOME in .bashrc file
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH

2) Install Maven on your Ubuntu Machine

a) Download apache-maven-3.2.2-bin.tar.gz using wget
b) Once the tar is downloaded you need to untar it using
tar -xzf apache-maven-3.2.2-bin.tar.gz -C /usr/local
c) cd /usr/local && ln -s apache-maven-3.2.2 maven
d) add the following lines to the .bashrc file
export MAVEN_HOME=/usr/local/maven
export PATH=$MAVEN_HOME/bin:$PATH

3) Install Protocol Buffer
a) Unpack the archive: tar zxvf protobuf-2.5.tar.gz
b) cd protobuf-2.5
c) ./configure
d) make
e) sudo make install

4) Install Apache Ant, Apache Ivy and Findbugs
Unpack them, set their home variables and add them to the PATH in the .bashrc file.

5) Clone the Hadoop codebase
git clone git:// hadoop

6) Go to the hadoop directory and build:
mvn clean install -DskipTests -Pdist

That's it, your Hadoop is ready for development.

Next task: run examples on Hadoop.

7) Go to the hadoop-dist folder for the current Hadoop release

8) Edit .bashrc
vi /home/hduser/.bashrc
HADOOP_HOME should be the folder which has bin in it, so in our case /usr/local/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT

Add the following lines:
export HADOOP_HOME=/usr/local/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

9) Now update the XML configs
a) cd /usr/local/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT/
b) vi ./etc/hadoop/
c) add the following:
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
d) test by typing ./bin/hadoop version
My output:

Hadoop 3.0.0-SNAPSHOT
Subversion git:// -r 2b58a0b0b7928c5de2f3bbee04606790fa8345d6
Compiled by hduser on 2014-07-04T00:29Z
Compiled with protoc 2.5.0
From source with checksum 98bf118b8fd146ceb73158fc38fe1e9

This command was run using /home/hduser/projects/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT/share/hadoop/common/hadoop-common-3.0.0-SNAPSHOT.jar

e) Configure the Hadoop environment

vi ./etc/hadoop/core-site.xml
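The XML to paste here did not survive in this post; a typical core-site.xml entry (an assumption on my part, matching the hdfs://localhost:9000 URI mentioned in the earlier post) would be:

```xml
<!-- between the <configuration> tags -->
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
</property>
```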

f) configure yarn-site.xml
Paste the following between the <configuration> tags:

<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

g) cp ./etc/hadoop/mapred-site.xml.template ./etc/hadoop/mapred-site.xml
h) Add the following lines between the <configuration> tags of mapred-site.xml
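The lines themselves are missing from this post; the commonly used mapred-site.xml entry (an assumption, not recovered from the original) that tells MapReduce to run on YARN is:

```xml
<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>
```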


i) Now make the directories for the namenode and datanode
mkdir -p /tmp/namenode
mkdir -p /tmp/datanode

j)update hdfs-site.xml
vi ./etc/hadoop/hdfs-site.xml
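The XML to add here is also missing; given the directories created in step i, a typical single-node hdfs-site.xml (an assumption on my part) would be:

```xml
<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>
<property>
    <name>dfs.namenode.name.dir</name>
    <value>/tmp/namenode</value>
</property>
<property>
    <name>dfs.datanode.data.dir</name>
    <value>/tmp/datanode</value>
</property>
```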


k) ./bin/hdfs namenode -format
Then start the daemons (start-dfs.sh and start-yarn.sh from sbin). Running jps should give something like:

32454 NameNode
760 ResourceManager
32706 DataNode
558 SecondaryNameNode
2516 Jps
1044 NodeManager

n) ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 5

That's it!
You are now ready to run examples and write or modify code.

This is the way I installed Hadoop.
Check out these tutorials, really awesome ones for Hadoop installation.

Wednesday, 4 June 2014

Consensus Protocols : Overview

Consensus Problem: mutually agreeing on a particular value.

Examples :  
Committing a transaction
Electing a Leader

Synchronous System :
Bounded Message Delay
No timeouts

Asynchronous : 
Message delay finite but unbounded
Consensus not always possible

Correctness of a Consensus Protocol:
Agreement: all nodes agree on the same value.
Validity: the value agreed on is valid, i.e. it was proposed by one of the participant nodes.
Termination: all nodes eventually terminate.

Agreement can be between a majority of nodes.
Validity rules out trivial protocols: a database commit protocol that always says Abort is not valid.
Termination bounds the messages exchanged so that the protocol is useful.

2PC: Two-Phase Commit:

1) Coordinator: contacts all nodes, suggests a value, gathers responses (in the form of a vote).
2) If everyone agrees, it contacts all nodes to commit; otherwise it contacts them to abort.

IMP: The consensus here is not on what the value should be, but rather on whether to accept or reject the proposed value.

Real-world examples:
Transferring money from one account to another
Moving a file between servers

To check whether a commit happened, see the log.

Phase 1: Coordinator Request
The coordinator sends a REQUEST message to all nodes.
E.g.: delete a file from a directory.

The nodes receive the request and execute the transaction locally.
Each writes the request, the result, and VOTE_COMMIT or VOTE_ABORT to its local log,
then sends its vote to the coordinator.

Phase 2: If the coordinator receives any VOTE_ABORT it decides abort, otherwise commit.
It writes GLOBAL_ABORT/GLOBAL_COMMIT to its log and sends GLOBAL_ABORT/GLOBAL_COMMIT to the participant nodes.
Number of messages: 3n, where n is the number of nodes.
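The two phases can be sketched as a toy simulation (all names here are illustrative; a real 2PC also writes logs and handles timeouts):

```python
VOTE_COMMIT, VOTE_ABORT = "VOTE_COMMIT", "VOTE_ABORT"
GLOBAL_COMMIT, GLOBAL_ABORT = "GLOBAL_COMMIT", "GLOBAL_ABORT"

def two_phase_commit(nodes, request):
    # Phase 1: coordinator sends REQUEST to every node, gathers votes.
    votes = [node(request) for node in nodes]
    # Phase 2: commit only if every participant voted commit.
    decision = GLOBAL_COMMIT if all(v == VOTE_COMMIT for v in votes) else GLOBAL_ABORT
    return decision  # sent to every participant node

willing = lambda req: VOTE_COMMIT
unwilling = lambda req: VOTE_ABORT

print(two_phase_commit([willing, willing], "delete file"))   # GLOBAL_COMMIT
print(two_phase_commit([willing, unwilling], "delete file")) # GLOBAL_ABORT
```

A single abort vote is enough to abort the whole transaction, which is exactly why a blocked coordinator blocks everyone.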

Failure models:
1) Fail-stop: nodes crash and never recover.
2) Fail-recover: nodes crash and recover after some time.
3) Byzantine: nodes deviate from the protocol specification.
Example: an army of soldiers, where the coordinator is a general. They must agree on one plan or fail. Some soldiers are traitors:
All loyal soldiers should agree.
A small number of traitors should not be able to trick the loyal soldiers.

Case 1: The coordinator crashes before any messages are exchanged. (No worries.)
Case 2: Some proposal messages are sent, but not all.
The nodes that received a proposal start 2PC; if the coordinator does not recover in time, those nodes keep on waiting.

Inspired by a must-read article on consensus.

2PC suffers from Single Point of Failure.

Thursday, 29 May 2014

Understanding Evaluation Metrics

Confusion Matrix is a tabular layout that can be used to measure the performance of an algorithm. Each column represents predicted values and each row represents actual values.
Let's take the example of testing a person for cancer.
If the person has cancer, the test should say positive, and if not, negative.
There can be 4 outcomes if we consider the test as an experiment.

1) The patients were suffering from cancer and our test recognized them as suffering from cancer: these are called TRUE POSITIVE.
2) The patients were suffering from cancer but the test showed negative: these are called FALSE NEGATIVE.
3) Patients who were healthy but were diagnosed as suffering from cancer: FALSE POSITIVE.
4) Patients who were not suffering from cancer and the test said the same: TRUE NEGATIVE.

Precision (Positive Predictive Value): the proportion of patients the test showed as having cancer who actually had cancer.

\mathit{PPV} = \mathit{TP} / (\mathit{TP} + \mathit{FP})
Recall (True Positive Rate): the proportion of patients that actually had cancer who were diagnosed by the test as having cancer:
\mathit{TPR} = \mathit{TP} / P = \mathit{TP} / (\mathit{TP}+\mathit{FN})
Precision and recall are typically inversely related.

Accuracy:
Ratio of correctly classified instances to total instances.
In layman's terms: the number of times the test was right.
\mathit{ACC} = (\mathit{TP} + \mathit{TN}) / (P + N)

where P and N are the total numbers of actual positives and actual negatives.

F1: a measure that combines the precision and recall rates by computing the harmonic mean between them. The F-measure does not take true negatives into account.

\mathit{F1} = 2 \mathit{TP} / (2 \mathit{TP} + \mathit{FP} + \mathit{FN})
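Plugging the formulas above into code makes the relationships easy to check (the counts here are made up for illustration):

```python
def metrics(tp, fp, fn, tn):
    precision = tp / (tp + fp)                  # PPV
    recall = tp / (tp + fn)                     # TPR
    accuracy = (tp + tn) / (tp + fp + fn + tn)  # correct / total
    f1 = 2 * tp / (2 * tp + fp + fn)            # harmonic mean of P and R
    return precision, recall, accuracy, f1

p, r, acc, f1 = metrics(tp=80, fp=20, fn=20, tn=80)
print(p, r, acc, f1)  # 0.8 0.8 0.8 0.8
```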

ROC Curve
The Receiver Operating Characteristic (ROC) curve is a plot that shows the change in performance of a binary classifier as its decision threshold changes. It plots the fraction of true positives out of total actual positives (the TRUE POSITIVE RATE) against the fraction of false positives out of total actual negatives (the FALSE POSITIVE RATE). Both values range between 0 and 1.


Saturday, 3 May 2014

Simplifying the map reduce framework

Google MapReduce

What is MapReduce?
MapReduce is a model to process large amounts of data in parallel. It lets the user handle the computation aspects while it hides the messy details of parallelization, fault tolerance, data distribution and load balancing.

What's the programming model for MapReduce?
Input <key, value> pairs -> output <key, value> pairs.
Two functions: Map and Reduce.
Map takes an input pair and produces intermediate key/value pairs.
All intermediate values associated with the same intermediate key k are grouped and passed to the Reduce function.

Reduce function: merges these intermediate values, producing a smaller set of values.

What are some examples?
Word Count: count all occurrences of words in a set of documents.
Map function -> emits each word plus an associated count.
Reduce function -> sums together all the counts emitted for a word.
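The word-count flow can be sketched end to end in a few lines (a toy in-memory version, not the distributed framework):

```python
from collections import defaultdict

def map_fn(document):
    # Emit <word, 1> for every word, like the WordCount mapper.
    return [(word, 1) for word in document.split()]

def reduce_fn(word, counts):
    # Sum all counts emitted for one word.
    return word, sum(counts)

def map_reduce(documents):
    groups = defaultdict(list)            # shuffle: group by intermediate key
    for doc in documents:
        for key, value in map_fn(doc):
            groups[key].append(value)
    return dict(reduce_fn(k, v) for k, v in groups.items())

print(map_reduce(["the cat", "the dog"]))  # {'the': 2, 'cat': 1, 'dog': 1}
```

Swapping in different map_fn/reduce_fn pairs gives the other examples below (grep, URL counts, and so on).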

Distributed Grep:
Map: emits a line if it matches a specific pattern.
Reduce: identity function, copies the data to the output.

Count of URL Access Frequency:
Map: processes logs of web page requests and outputs <URL, 1>
Reduce: adds all values for the same URL, emitting <URL, total count>

Reverse Web-Link Graph:
Map: outputs <target, source> pairs for each link to a target URL found on a source page.
Reduce: concatenates all source URLs associated with a given target.

Distributed Sort
Machine Learning.

What are the steps involved in MapReduce?
1. Split the input file into M pieces of 16 MB or 64 MB. Start many copies of the program on the cluster machines.
2. Follows a master/slave model: 1 master and many workers. The master assigns idle workers a task from the M map and R reduce tasks.
3. A map worker reads its input split, generates key/value pairs from it, and passes them to the Map function.
4. Map produces intermediate <key, value> pairs, which are buffered in memory.
5. Buffered pairs are written to local disk periodically. The local disk is partitioned into R regions by the partitioning function.
6. The locations of these regions are passed back to the master, who forwards them to the reduce workers.
7. When a reduce worker receives the notification of a location, it uses RPC to read the buffered data from the map worker's disk, then sorts the data by intermediate key for grouping.
8. The reduce worker iterates over the sorted intermediate keys, and for each unique key it passes the <key, value> pairs to the Reduce function.
9. The output is written to files.
Master:
Stores the state (idle, in-progress or completed) and identity of each worker machine.

Why a Combiner function?
A mapper function (e.g. WordCount) produces output in the form <'the', 1>, with a word and its count. This output is then sent over the network to the reduce tasks and clubbed together to produce as many output files as the number of reduce functions.
The Combiner function does partial merging of the data before it is sent over the network. The Combiner runs on each machine that runs a Mapper function.
The Combiner significantly speeds up the MapReduce operation by minimizing the number of <k, v> pairs sent over the network, thereby saving bandwidth. Combiner code is usually similar to the Reducer.
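The saving is easy to see in a sketch: pre-aggregating on the mapper's machine shrinks what crosses the network (toy code, illustrative only):

```python
from collections import Counter

def mapper_output(words):
    return [(w, 1) for w in words]  # one pair per word occurrence

def combine(pairs):
    # Partial merge on the mapper's machine, before any network transfer.
    merged = Counter()
    for word, count in pairs:
        merged[word] += count
    return list(merged.items())

pairs = mapper_output(["the", "the", "the", "cat"])
print(len(pairs))           # 4 pairs without a combiner
print(len(combine(pairs)))  # 2 pairs sent over the network with one
```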

Saturday, 19 April 2014

Extracting Movie Information from IMDB

A Python script to access movie information from IMDB.
You can look at the script in my Git repository.
Goal : To extract Movie Information from IMDB.
Process:
1) For data scraping the best Python library is Beautiful Soup.
2) Since I use Python 3, Mechanize is not available, so the work is a little more difficult.
3) The "urllib2 module not found" error is resolved by this import:
    import urllib.request as urllib2

4) BeautifulSoup extracts information from the HTML code.
5) Search for a particular title on IMDB.
6) Most of the time the first result is the one we are searching for.
7) Use web scraping to extract the movie information.
8) Rating, star cast, critic rating and all the other information is extracted.

rating = soup1.findAll('span',{'itemprop':'ratingValue'})[0].string

extracts the rating from the IMDB page. Here itemprop is a defined attribute, and we are extracting the first element whose itemprop attribute has the value "ratingValue".
Beautiful Soup API calls useful for scraping:
.string: returns the value of a tag.
.text: returns the text associated with h1..h4 and div tags.
Other useful ways: extracting links using
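The same itemprop extraction can be sketched with only the standard library (the real script uses BeautifulSoup; the HTML snippet below is made up in the style of IMDB's old markup):

```python
from html.parser import HTMLParser

class RatingParser(HTMLParser):
    """Grabs the text of the first <span itemprop="ratingValue">."""
    def __init__(self):
        super().__init__()
        self.in_rating = False
        self.rating = None

    def handle_starttag(self, tag, attrs):
        # attrs is a list of (name, value) pairs for the tag.
        if tag == "span" and ("itemprop", "ratingValue") in attrs and self.rating is None:
            self.in_rating = True

    def handle_data(self, data):
        if self.in_rating:
            self.rating = data.strip()
            self.in_rating = False

html = '<div><span itemprop="ratingValue">8.8</span></div>'
parser = RatingParser()
parser.feed(html)
print(parser.rating)  # 8.8
```

BeautifulSoup does the same walk for you; findAll('span', {'itemprop': 'ratingValue'}) is just a more convenient way to express this search.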


Wednesday, 26 March 2014

Python and NLP

I recently worked on a project titled "Recommending Similar defects on Apache Hadoop" .Its a recommendation system that predicts similar defects and then predicts the effort estimate for each defect.
1) Extract XML/Excel data from Apache Hadoop Issue Tracker.
2)Convert the extracted data into CSV for persistent storage.
3)Extract required Column

Python code:

import csv
import re

def col_selector(table, column_key):
    return [row[column_key] for row in table]

with open("Data/next.csv","r") as csvfile:
    reader = csv.DictReader(csvfile, delimiter=",")
    table = [row for row in reader]
    foo_col = col_selector(table, "Summary")
    bar_col = col_selector(table, "Description")

The above example extracts two columns from the Apache Hadoop issue tracker CSV file. Your program must include the Python library called csv.

4) From these columns we will generate a set of words specific to Hadoop.
We will apply various NLP steps to generate words from the summary and description.

5) There are 5 steps in this Natural Language Processing pipeline:
1. Tokenizing
2. Stemming
3. Stop Word Removal
4. Vector Space Representation
5. Similarity Measures

Step 1: Tokenizing:
The tokenization process involves breaking a stream of characters of text up into words or phrases, symbols or other meaningful elements called tokens. Before indexing, we filter out all common English stopwords. I obtained a list of around 800 stopwords online.
K. Bounge. Stop Word List.
The list contained articles, pronouns, verbs etc. I filtered out all those words from our extracted text. After reviewing the list, we felt a stopword list for a Hadoop database has to be built separately, as numbers and symbols also have to be filtered out.

Step 2: Stemming
Stemming is used to identify a ground form for each word in the text. Some words that carry the same information can be used in different grammatical ways, depending on how the creator of the report wrote them down. This phase removes affixes and other components from each token that resulted from tokenization, so that only the stem of each word remains. For stemming, we used a Python library called PorterStemmer. We passed it the stream of extracted words. Words like caller, called and calling, whose stem is call, were filtered, and only 1 word, call, was kept in the final list. I filtered around 1200 words this way.
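The Porter algorithm itself is involved; a naive suffix-stripping sketch shows the idea of collapsing caller/called/calling onto one stem (this is not the real PorterStemmer, just an illustration):

```python
def naive_stem(word):
    # Strip a few common suffixes, longest first (illustration only;
    # Porter's algorithm has many more rules and conditions).
    for suffix in ("ing", "er", "ed"):
        if word.endswith(suffix) and len(word) > len(suffix) + 2:
            return word[: -len(suffix)]
    return word

words = ["caller", "called", "calling"]
print({naive_stem(w) for w in words})  # {'call'}
```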

Step 3: Stop Word Removal
Synonyms are removed and replaced by 1 common word. I used WordNet from NLTK to perform this.
Second phase: spell checking: the list is compared with a list of misspelled words.

Step 4: Vector Space Representation
After the first 3 steps I had around 5500 words. These words were used to identify tags. Each defect with its tags was then represented in a vector space model, using the general method used by scikit-learn.

Step 5: Similarity Measure
Calculated the cosine similarity between each pair of defect vectors.
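Cosine similarity between two defect vectors can be computed directly (the term-count vectors below are toy examples; the project used scikit's vectorizer):

```python
import math

def cosine_similarity(a, b):
    # a, b: term -> count dictionaries for two defects
    terms = set(a) | set(b)
    dot = sum(a.get(t, 0) * b.get(t, 0) for t in terms)
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b)

d1 = {"namenode": 2, "crash": 1}
d2 = {"namenode": 1, "crash": 1, "restart": 1}
print(round(cosine_similarity(d1, d2), 3))  # 0.775
```

A value near 1 means the two defects share most of their tags; near 0 means they share almost none.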