Friday, 11 July 2014

Hadoop-HDFS Code Analysis-1

I had a hard time understanding the HDFS code, not only because it is big and complex but also because of the lack of available documentation.

1. Create File System
FileSystem is an abstract class extending Hadoop's Configured class.
Like a normal file system, it defines all the file operations.
Every file system that Hadoop uses indirectly extends the FileSystem class.
Which file system gets invoked depends on the Hadoop configuration file: specifying hdfs://localhost:9000 tells Hadoop to use DistributedFileSystem, while file:/// invokes LocalFileSystem. The default scheme-to-class mapping comes from core-default.xml.
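
To make the dispatch concrete, here is a minimal Python sketch of the idea (an illustration only, not Hadoop's actual code; the classes are stand-ins):

from urllib.parse import urlparse

# Hypothetical stand-ins for Hadoop's FileSystem subclasses.
class LocalFileSystem:
    pass

class DistributedFileSystem:
    pass

# core-default.xml plays a role similar to this table: it maps a
# URI scheme to the FileSystem implementation that handles it.
SCHEME_TO_FS = {
    "file": LocalFileSystem,
    "hdfs": DistributedFileSystem,
}

def get_file_system(uri):
    # Pick a file-system class based on the URI scheme.
    scheme = urlparse(uri).scheme or "file"
    return SCHEME_TO_FS[scheme]()

fs = get_file_system("hdfs://localhost:9000")
print(type(fs).__name__)  # DistributedFileSystem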

2. Client invokes Server
Assume a client calls mkdirs; this invokes DistributedFileSystem's mkdirs:

public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, true);
}

mkdirs() calls mkdirsInternal(), which in turn uses the variable dfs, an object of DFSClient.
Apache's description of DFSClient:

DFSClient can connect to a Hadoop Filesystem and perform basic file tasks. It uses the ClientProtocol to communicate with a NameNode daemon, and connects directly to DataNodes to read / write block data. Hadoop DFS users should obtain an instance of DistributedFileSystem, which uses DFSClient to handle filesystem tasks.

The initialize() method is called, and all operations on DistributedFileSystem are handed off to DFSClient:

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {

Each DFSClient method call is forwarded to the NameNode via the associated variable rpcNamenode.
ClientProtocol provides all the methods through which calls from client to server can be invoked.
Every method call on rpcNamenode comes down to the Invoker's invoke() method.
This is how client-side calls are turned into server-side method calls, eventually reaching the Server class.
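
In Hadoop this is done with a Java dynamic proxy; as a rough Python analogue of the pattern (an illustration, not Hadoop's code), every method called on the client-side stub funnels into a single invoke:

class RPCProxy:
    # Client-side stub: every method call funnels into invoke(),
    # which is where the call would be marshalled and shipped to
    # the server (the NameNode in HDFS's case).
    def __init__(self, transport):
        self._transport = transport

    def __getattr__(self, method_name):
        def invoke(*args):
            # In Hadoop this is the Invoker's invoke(): marshal the
            # method name and arguments, send them over the wire,
            # and unmarshal the server's reply.
            return self._transport.call(method_name, args)
        return invoke

class FakeTransport:
    def call(self, method, args):
        print("sending RPC:", method, args)
        return True

namenode = RPCProxy(FakeTransport())
namenode.mkdirs("/user/foo")  # prints the marshalled call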
The server has 3 major components:
Listener, Responder, and Handler.


Thursday, 3 July 2014

Setting up a Hadoop Development Environment (Hadoop Snapshot Version 3)

If you want to play around with the Hadoop code, you need to set up a Hadoop development environment.
These are the steps to follow.

Software needed for the Hadoop build:
1) Maven
2) Ant
3) Protocol Buffers
4) JDK
5) Findbugs
6) Apache Ivy

1) Install the JDK

a) sudo apt-get install openjdk-7-jdk
b) set JAVA_HOME in your .bashrc file:
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH

2) Install Maven on your Ubuntu Machine

a) sudo wget http://apache.parentingamerica.com/maven/maven-3/3.2.2/binaries/apache-maven-3.2.2-bin.tar.gz
b) Once the tar is downloaded, untar it with:
sudo tar -xzf apache-maven-3.2.2-bin.tar.gz -C /usr/local
c) cd /usr/local && sudo ln -s apache-maven-3.2.2 maven
d) add the following lines to your .bashrc file:
export MAVEN_HOME=/usr/local/maven
export PATH=$PATH:$MAVEN_HOME/bin

3) Install Protocol Buffers
a) Download and unpack protobuf-2.5.0.tar.gz:
tar zxvf protobuf-2.5.0.tar.gz
b) cd protobuf-2.5.0
c) ./configure
d) make
e) sudo make install
f) sudo ldconfig (refresh the linker cache so protoc can find libprotobuf)

4) Install Apache Ant, Apache Ivy, and Findbugs
Unpack each of them, set the corresponding HOME variables, and add their bin directories to PATH in .bashrc.

5) Clone the Hadoop codebase:
git clone git://git.apache.org/hadoop-common.git hadoop

6) Go to the hadoop directory and build:
mvn clean install -DskipTests -Pdist

That's it: your Hadoop is ready for development.

Next task: run examples on Hadoop.

7) Go to the hadoop-dist folder, which holds the freshly built Hadoop distribution.

8) Edit .bashrc:
vi /home/hduser/.bashrc
HADOOP_HOME should be the folder that contains bin, in our case /usr/local/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT.

Add the following lines:
export HADOOP_HOME=/usr/local/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME

9) Now update the XML config files.
a) cd /usr/local/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT/
b) Update hadoop-env.sh:
vi ./etc/hadoop/hadoop-env.sh
c) add the following:
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
d) Test by typing ./bin/hadoop version. My output:
 Hadoop 3.0.0-SNAPSHOT
Subversion git://git.apache.org/hadoop-common.git -r 2b58a0b0b7928c5de2f3bbee04606790fa8345d6
Compiled by hduser on 2014-07-04T00:29Z
Compiled with protoc 2.5.0
From source with checksum 98bf118b8fd146ceb73158fc38fe1e9
This command was run using /home/hduser/projects/hadoop/hadoop-dist/target/hadoop-3.0.0-SNAPSHOT/share/hadoop/common/hadoop-common-3.0.0-SNAPSHOT.jar
e) Configure core-site.xml:

vi ./etc/hadoop/core-site.xml

Paste the following between the <configuration> tags:

<property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
</property>

f) Configure yarn-site.xml. Paste the following between the <configuration> tags:

<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
<property>
    <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

g) cp ./etc/hadoop/mapred-site.xml.template ./etc/hadoop/mapred-site.xml
h) Add the following between the <configuration> tags of mapred-site.xml:

<property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
</property>

i) Now make the directories for the namenode and datanode (they must match the paths configured in hdfs-site.xml below):
mkdir -p /home/hduser/mydata/hdfs/namenode
mkdir -p /home/hduser/mydata/hdfs/datanode

j) Update hdfs-site.xml and add the following between the <configuration> tags:
vi ./etc/hadoop/hdfs-site.xml

<property>    
        <name>dfs.replication</name>    
        <value>1</value>  
</property>  
<property>    
        <name>dfs.namenode.name.dir</name>    
        <value>file:/home/hduser/mydata/hdfs/namenode</value>  
</property>  
<property>    
        <name>dfs.datanode.data.dir</name>    
        <value>file:/home/hduser/mydata/hdfs/datanode</value>  
</property>

k) Format the namenode: ./bin/hdfs namenode -format
l) Start the daemons: ./sbin/start-all.sh
m) Run jps; it should show something like:

32454 NameNode
760 ResourceManager
32706 DataNode
558 SecondaryNameNode
2516 Jps
1044 NodeManager


n) ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-3.0.0-SNAPSHOT.jar pi 2 5

That's it!
You are now ready to run examples and write or modify code.

This is the way I installed Hadoop. Check out these tutorials, really awesome ones for Hadoop installation:
JAVE TUTE
SRCCODES
CONFLATED
VICHARGRAVE
PRAVIN
INTELLIJ







Wednesday, 4 June 2014

Consensus Protocols: Overview

Consensus problem: mutually agreeing on a particular value.

Here I consider consensus both for transaction commit and for normal operations.


Synchronous system:
Bounded message delay, so timeouts can reliably detect failures.

Asynchronous system:
Message delay is finite but unbounded.
Consensus is not always possible (the FLP result).

Correctness of a consensus protocol:
Agreement: all nodes agree on a value.
Validity: the value decided is valid, i.e., it was proposed by one of the participant nodes.

Non-triviality*: there exists an accessible configuration in which the decision is to commit.
Integrity*: each RM decides at most once.

Termination: all nodes eventually terminate.
* For transaction commit.

Agreement can be required only among a majority of nodes.
A database commit protocol that always says abort is not valid; this is what non-triviality rules out.
Termination bounds the messages exchanged so that the protocol is actually useful.

2PC: Two-Phase Commit

1) Coordinator: contact all nodes, suggest a value, gather their responses (in the form of votes).
2) If everyone agrees, contact all nodes to commit; otherwise contact them to abort.

Important: the consensus here is not on the value proposed, but on whether to accept or reject that value.

Real-world examples:
Transferring money from one account to another.
Moving a file between servers.

To check whether the commit happened, look at the log.

Phase 1: Coordinator request
The coordinator sends a REQUEST message to all nodes.
E.g.: delete a file from a directory.

Nodes receive the request and execute the transaction locally.
Each writes the request, its result, and VOTE_COMMIT or VOTE_ABORT to its local log,
then sends its vote to the coordinator.

If the coordinator receives any VOTE_ABORT it writes GLOBAL_ABORT to its global log, otherwise GLOBAL_COMMIT, and sends that decision to the participant nodes.
Number of messages: 3n, where n is the number of nodes (n to propose the value, n votes, n commit/abort messages).
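
As a toy illustration of those two phases, here is a minimal Python sketch of the coordinator's side (the participant objects and their vote/decide methods are hypothetical stand-ins, and the log list stands in for stable storage):

def two_phase_commit(coordinator_log, participants, txn):
    # Phase 1: send the proposal to all n nodes and gather their
    # votes (n REQUEST messages out, n votes back).
    votes = [p.vote(txn) for p in participants]

    # Decide, and write the decision durably *before* announcing
    # it, so a recovered coordinator still knows the outcome.
    decision = ("GLOBAL_COMMIT"
                if all(v == "VOTE_COMMIT" for v in votes)
                else "GLOBAL_ABORT")
    coordinator_log.append((txn, decision))

    # Phase 2: announce the decision (n more messages, 3n total).
    for p in participants:
        p.decide(txn, decision)
    return decision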

Failures:
1) Fail-stop: nodes crash and never recover.
2) Fail-recover: nodes crash and recover after some time.
3) Byzantine: nodes deviate from the protocol specification.
Example: an army of soldiers whose coordinator is a general; they must agree on one plan or fail. Some soldiers are traitors:
All loyal soldiers should agree,
and a small number of traitors should not be able to trick the loyal soldiers.

Case 1: the coordinator crashes before any messages are exchanged. No worries, because the protocol never started.
Case 2: some proposal messages were sent, but not all.

Some nodes have received a proposal and started 2PC, while others haven't received any proposal. If the coordinator does not recover in time, the nodes that voted on a proposal keep waiting for an outcome that never arrives.
The protocol is blocked.

The fix is to let another node become coordinator. When a timeout occurs, that node can finish the task the original coordinator started.
It contacts all nodes to find out how they voted,
so every node has to keep its decision on persistent storage.

If another participating node fails before the recovery node has finished, the outcome cannot be recovered: the recovery node cannot distinguish between all nodes having voted commit and all having voted abort.
The coordinator is a participant too, and here it has failed.

The coordinator logs the result in persistent storage.
Advantage: low message complexity.

Link: 
http://the-paper-trail.org/blog/consensus-protocols-two-phase-commit/


2PC Transaction Commit:

The TM (Transaction Manager) has the following states: init, preparing, committed, aborted.
Let the transaction coordinator be TMi.

Phase 1: Obtaining a decision
Initiate the TM.
The TM asks all participants to prepare to commit the transaction:
it adds a <prepare> record to its recovery log and saves it to stable storage,
then sends prepare to all sites at which the transaction T executed.

After receiving the message, each RM (Resource Manager) decides whether it can commit the transaction.
If it can, it adds a <ready> record to its log,
flushes all records to stable storage,
and sends a ready message to the TM.
Otherwise it adds <no> to its log and sends an abort message to the TM.

Phase 2: Recording the decision
T is committed only if the TM receives ready messages from all participants; otherwise it is aborted.
The TM adds the decision, <commit> or <abort>, to its log.
The TM sends a message to each participant informing it of the decision.
Each RM does the same on receiving the message (stores it in its log).

2PC suffers from a single point of failure: the coordinator.


Failure of an RM in 2PC transaction commit:
An RM participating in the transaction fails, later recovers, and examines its log:
if the log says <commit>, no action is needed;
if it says <abort>, no action is needed;
if it says <ready>, the RM must consult the TM for the fate of T: if the decision was commit, it redoes the log and writes commit,
otherwise it writes abort.

If the log contains no record, the RM failed before responding to prepare; the TM aborts the transaction, and the RM must undo T.

Failure of the TM:
If the TM fails before completion, the RMs decide the fate of T.
If any RM's log contains <commit>, then T is committed; if any contains <abort>, it is aborted.
If some RM's log does not contain <ready>, no commit decision can have been taken, so T can be aborted.
If every RM has <ready>, the sites must wait for the TM to recover: the blocking problem.

Total messages: 1 to initiate the TM, 2n in phase 1, and n in phase 2, so 3n+1 in total,
with 4 message delays
and n+1 writes to stable storage (n by the RMs and 1 by the TM).


_____________________________________________________

3PC
The blocking problem of 2PC is removed by an extra phase.
Idea: break 2PC's second phase into two phases.
First, a prepare-to-commit phase:
the coordinator, once it has received yes votes from everyone, sends prepare-to-commit to all nodes; the nodes record it and reply that the prepare-to-commit was received.
The purpose is to communicate the result of the vote to every node so that the state can be recovered even if the coordinator fails.

Last phase: if the coordinator receives prepare-to-commit acknowledgements from all replicas, it commits. If delivery is not confirmed and the system is designed to tolerate f failures, the coordinator can go ahead once it receives f+1 acknowledgements.
If the coordinator crashes at any time, a recovery node can take over the transaction and query the state from the remaining nodes.
If any other node crashes, its state can be read from the other nodes.
Problems:
If the network splits into two partitions, the two sides can reach different results.
3PC works for fail-stop, not for fail-recover.

Example: the coordinator fails before it has received prepare-to-commit replies from the nodes, and a new coordinator takes over. The recovery coordinator interrogates the nodes to learn the decision and completes it. At the same time the original coordinator recovers, realizes it has not received replies from all nodes, times them out, and sends abort to all. These aborts interleave with the recovery coordinator's commit messages, leaving the system in an inconsistent state.
_____________________________________________________________________

PAXOS
One node acts as the proposer and is responsible for initiating the protocol.
The other nodes are acceptors.
Acceptors accept or reject proposals;
once a majority has accepted, the protocol can terminate.
The proposer sends a prepare request to the acceptors; each acceptor responds, and once enough acceptors agree, the proposer sends a commit (accept) request to them.

Paxos adds 2 main improvements over 2PC:
1) An ordering that determines which proposal can be accepted.
2) A proposal is considered accepted once a majority has accepted it.

Every proposal is tagged with a unique sequence number, which is used to decide the ordering of proposals.
When a proposal arrives, an acceptor checks the highest proposal number it has seen so far. If the new proposal's number is greater, the acceptor replies with a promise not to accept any proposal numbered lower than the one received.
All proposers use unique sequence numbers.
If two proposals are each agreed on by a majority, at least one acceptor is common to both majorities.
So when a new proposal is made, the new majority is guaranteed to include either an acceptor that saw both previous proposals or two acceptors that saw one each.

Once it receives promises from a majority, the proposer can go ahead and ask the acceptors to commit to a value.

Tricky case with two proposers: the first proposal is accepted by a bare majority, and one of those acceptors fails before replying to the prepare-to-commit, so the majority is lost.
A new proposer now proposes, wins a majority, and commits.
The failed acceptor then wakes up and sends its final accept message for the original proposal, so the first proposer commits too. Correctness is violated.
Solution: both proposers must propose the same value. If a proposer learns, via the promise replies, of a value already accepted under a higher-ordered proposal, it adopts that value.
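
To make the promise mechanism concrete, here is a minimal Python sketch of a single-decree Paxos acceptor (an illustration only; in a real implementation promised_n, accepted_n, and accepted_value must live in stable storage):

class Acceptor:
    def __init__(self):
        self.promised_n = -1       # highest proposal number promised
        self.accepted_n = -1       # highest proposal number accepted
        self.accepted_value = None

    def prepare(self, n):
        # Phase 1b: promise to ignore proposals numbered below n, and
        # report any value already accepted so the proposer can
        # re-propose that value instead of its own.
        if n > self.promised_n:
            self.promised_n = n
            return ("promise", self.accepted_n, self.accepted_value)
        return ("reject",)

    def accept(self, n, value):
        # Phase 2b: accept unless a higher-numbered promise was made.
        if n >= self.promised_n:
            self.promised_n = n
            self.accepted_n = n
            self.accepted_value = value
            return ("accepted",)
        return ("reject",)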


Failures:
If f failures are to be tolerated, 2f+1 acceptors are needed.
If the proposer fails, another can take over.
If the original recovers and sees a sequence number higher than its own,
it can agree to that proposal.
Issue: if two proposers keep re-proposing, each numbering one higher than the last in every round, the protocol never reaches consensus (livelock).

Acceptors need to record the highest proposal seen in stable storage;
if that storage is lost, the acceptor cannot take part again (it would risk Byzantine behaviour).

Efficiency:
1st phase: send f+1 messages and receive f+1 replies.
Repeated over the 4 phases: 4f+4 messages,
with a delay of 4 messages until the protocol completes.
As can be seen, 2f+1 nodes are needed to reach consensus despite the failure of f.

Paxos Commit Protocol:
The transaction initiator contacts the TM.
The TM sends prepare to the RMs.
Each RM sends a phase 2a message to the acceptors.
The acceptors send phase 2b messages to the TM.
The TM sends commit to the RMs.
Total messages: 2f(n+1) + 3n + 2,
with 5 message delays
and n + 2f + 1 writes to stable storage.

Analysis:
1 message from the initiator to the TM,
n prepare messages from the TM to the RMs,
n commit messages from the TM to the RMs,
n(2f+1) phase 2a messages, since each RM sends to every acceptor,
and 2f+1 phase 2b messages.

Total: 2f(n+1) + 3n + 2.

Optimizations:
The transaction is initiated by one of the RMs, so one prepare message between that RM and the TM can be avoided.
If the TM is on the same node as an acceptor, one phase 2b message can be avoided.

If each acceptor is on the same node as an RM, and the TM is also on an RM,
f+1 of the phase 2a and f+1 of the phase 2b messages can be avoided.

Links:
PaperTrail
Paxos Commit 
Paxos Simple 
 
 
  

Thursday, 29 May 2014

Understanding Evaluation Metrics



A confusion matrix is a tabular layout that can be used to measure the performance of an algorithm. Each column represents a predicted value and each row represents an actual value.
Let's take the example of testing a person for cancer.
If the person has cancer, the test should say positive, and if not, negative.
There are 4 possible outcomes if we consider the test as an experiment:

1) The patient was suffering from cancer and the test recognized it: TRUE POSITIVE.
2) The patient was suffering from cancer but the test showed negative: FALSE NEGATIVE.
3) The patient was healthy but was diagnosed as suffering from cancer: FALSE POSITIVE.
4) The patient was not suffering from cancer and the test said the same: TRUE NEGATIVE.

Precision (Positive Predictive Value): the proportion of patients the test flagged as having cancer who actually had cancer.

PPV = TP / (TP + FP)

Recall (True Positive Rate): the proportion of patients who actually had cancer that the test diagnosed as having cancer.

TPR = TP / P = TP / (TP + FN)
 
Precision and recall are often inversely related: tuning a classifier to raise one typically lowers the other.

Accuracy:
The ratio of correctly classified instances to total instances.
In layman's terms: the fraction of the time the test was right.

ACC = (TP + TN) / (P + N)

where P and N are the total numbers of actual positives and actual negatives.


F1: a measure that combines the precision and recall rates by computing the harmonic mean between them. Note that the F-measure does not take true negatives into account.

F1 = 2TP / (2TP + FP + FN)
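
All four formulas reduce to a few lines of Python; the counts below are made up for the cancer-test example:

def metrics(tp, fp, fn, tn):
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    accuracy = (tp + tn) / (tp + fp + fn + tn)
    f1 = 2 * tp / (2 * tp + fp + fn)  # note: TN never appears
    return precision, recall, accuracy, f1

# Hypothetical test results: 80 true positives, 10 false positives,
# 20 false negatives, 890 true negatives.
p, r, acc, f1 = metrics(tp=80, fp=10, fn=20, tn=890)
print(p, r, acc, f1)
# 0.888...  0.8  0.97  0.842...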


ROC Curve
The Receiver Operating Characteristic (ROC) curve is a plot showing how the performance of a binary classifier changes as its threshold changes. It plots the true positive rate (true positives divided by total actual positives) against the false positive rate (false positives divided by total actual negatives). Both values range between 0 and 1.


 

Saturday, 3 May 2014

Simplifying the MapReduce framework

Google MapReduce

What is MapReduce?
MapReduce is a model for processing large amounts of data in parallel. It lets the user focus on the computation and hides the messy details of parallelization, fault tolerance, data distribution, and load balancing.

What's the programming model for MapReduce?
Input <key, value> pairs -> output <key, value> pairs,
via two functions: Map and Reduce.
Map takes an input pair and produces intermediate key/value pairs.
The framework groups all intermediate values associated with the same intermediate key k and passes them to the Reduce function.

Reduce function: merges these intermediate values, producing a smaller set of values.
 

What are some examples?
Word count: count all occurrences of each word in a set of documents.
Map function -> emits each word plus an associated count.
Reduce function -> sums together all counts emitted for a word.

Distributed grep:
Map: emits a line if it matches a specific pattern.
Reduce: the identity function; it just copies the data to the output.

Count of URL access frequency:
Map: processes logs of web page requests and outputs <URL, 1>.
Reduce: adds all values for the same URL, giving <URL, total count>.

Reverse web-link graph:
Map: outputs <target, source> pairs, linking each target URL to the source page on which it was found.
Reduce: concatenates all source URLs associated with a target.

Other examples: distributed sort, machine learning.
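
As a concrete illustration of the model (a plain-Python sketch, not Hadoop's actual API), here is word count with explicit map, shuffle/sort, and reduce stages:

from itertools import groupby
from operator import itemgetter

def map_fn(doc_name, text):
    # Map: emit <word, 1> for every word in the document.
    for word in text.split():
        yield (word, 1)

def reduce_fn(word, counts):
    # Reduce: sum all counts emitted for the same word.
    return (word, sum(counts))

docs = {"d1": "the cat sat", "d2": "the cat ran"}
intermediate = sorted(
    (kv for name, text in docs.items() for kv in map_fn(name, text)),
    key=itemgetter(0))                     # shuffle/sort by key
result = [reduce_fn(k, (v for _, v in g))
          for k, g in groupby(intermediate, key=itemgetter(0))]
print(result)  # [('cat', 2), ('ran', 1), ('sat', 1), ('the', 2)]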

What are the steps involved in MapReduce?
1. Split the input file into M pieces of 16-64 MB, and start many copies of the program on the cluster machines.
2. It follows a master/worker model: one master, many workers. The master assigns idle workers a task from the M map and R reduce tasks.
3. A map worker reads its input split, parses key/value pairs from it, and passes each pair to the Map function.
4. Map produces intermediate <key, value> pairs, which are buffered in memory.
5. The buffered pairs are periodically written to local disk, partitioned into R regions by the partitioning function.
6. The locations of these regions are passed back to the master, who forwards them to the reduce workers.
7. When a reduce worker is notified of a location, it uses RPC to read the buffered data from the map worker's local disk, then sorts the data by intermediate key so that identical keys are grouped together.
8. The reduce worker iterates over the sorted intermediate data and, for each unique key, passes the key and its values to the Reduce function.
9. The output is written to output files.

Master:
Stores the state (idle, in-progress, or completed) and the identity of each worker machine.






Why a combiner function?
A map function (e.g., WordCount) produces output of the form <'the', 1>, a word and its count. This output is sent over the network to the reduce tasks and merged to produce as many output files as there are reducers.
The combiner function does partial merging of this data before it is sent over the network. It runs on each machine that runs a map task.
The combiner significantly speeds up a MapReduce operation by minimizing the number of <k, v> pairs sent over the network, thereby saving bandwidth. Combiner code is usually similar to the reducer's.
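
A small sketch of what the combiner buys for word count: each mapper's pairs are merged locally, so one pair per distinct word crosses the network instead of one pair per occurrence:

from collections import Counter

def map_with_combiner(text):
    # Without a combiner the mapper emits one <word, 1> pair per
    # occurrence; the combiner collapses them to one pair per
    # distinct word before anything is sent over the network.
    raw_pairs = [(w, 1) for w in text.split()]
    combined = Counter(w for w, _ in raw_pairs)
    return list(combined.items())

pairs = map_with_combiner("the cat saw the dog and the bird")
print(pairs)  # ('the', 3) appears once instead of three times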

Saturday, 19 April 2014

Extracting Movie Information from IMDB

A Python script to access movie information from IMDB.
You can look at the script in my Git repository:
GIT
Goal: to extract movie information from IMDB.
Process:
1) For data scraping, the best Python library is Beautiful Soup.
2) Since I use Python 3, Mechanize is not available, so the work is a little more difficult.
3) A "urllib2 module not found" error is resolved by this code:
try:
    import urllib.request as urllib2  # Python 3
except ImportError:
    import urllib2                    # Python 2

4) BeautifulSoup extracts information from the HTML code.
5) Search for a particular title on IMDB.
6) Most of the time the first result is the one we are searching for.
7) Use web scraping to extract the movie information.
8) Rating, star cast, critic rating, and all the other information is extracted.

rating = soup1.findAll('span', {'itemprop': 'ratingValue'})[0].string

extracts the rating from the IMDB page. Here itemprop is an HTML attribute, and we take the first <span> element whose itemprop attribute is 'ratingValue'.
BeautifulSoup APIs useful for scraping:
.string: returns the element's value.
.text: returns the text associated with tags such as h1..h4 and div.
Another useful pattern extracts a link:

mainlink = soup.findAll('td', {'class': 'result_text'})[0].a['href']

Wednesday, 26 March 2014

Python and NLP

I recently worked on a project titled "Recommending Similar Defects on Apache Hadoop". It's a recommendation system that finds similar defects and then predicts an effort estimate for each defect.
Steps:
1) Extract XML/Excel data from the Apache Hadoop issue tracker:
https://issues.apache.org/jira/browse/HADOOP
2) Convert the extracted data into CSV for persistent storage.
3) Extract the required columns.


Python code:

import csv

def col_selector(table, column_key):
    # Pull one named column out of a list of row dicts.
    return [row[column_key] for row in table]

with open("Data/next.csv", "r") as csvfile:
    reader = csv.DictReader(csvfile, delimiter=",")
    table = [row for row in reader]          # whole CSV as row dicts
    foo_col = col_selector(table, "Summary")
    bar_col = col_selector(table, "Description")

The above example extracts two columns from the Apache Hadoop issue tracker CSV file. Your program must import Python's standard csv module:
http://docs.python.org/2/library/csv.html

4) From these columns we will generate a set of words specific to Hadoop,
applying various NLP techniques to the summary and description.

5) There are 5 steps in this natural language processing pipeline:
1. Tokenizing
2. Stemming
3. Stop Word Removal
4. Vector Space Representation
5. Similarity Measures

Step 1: Tokenizing
Tokenization involves breaking a stream of text characters up into words, phrases, symbols, or other meaningful elements called tokens. Before indexing, we filter out all common English stopwords. I obtained a list of around 800 stopwords online:
K. Bounge. Stop Word List.
https://sites.google.com/site/kevinbouge/stopwords-lists
The list contained articles, pronouns, verbs, etc. I filtered all those words out of the extracted text. After reviewing the list, we felt a stopword list for a Hadoop database has to be built separately, as numbers and symbols also need to be filtered out.
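
A minimal sketch of that filtering step (the tiny stopword set here is a stand-in for the full 800-word list):

# `stopwords` would be loaded from a file such as the Bouge list;
# tokens come from splitting the Summary/Description text.
stopwords = {"the", "a", "is", "of", "and", "on"}

def clean_tokens(text):
    tokens = text.lower().split()
    # drop stopwords, and drop bare numbers and symbols too
    return [t for t in tokens
            if t not in stopwords and t.isalpha()]

print(clean_tokens("The job tracker is restarting on node 42"))
# ['job', 'tracker', 'restarting', 'node']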


Step 2: Stemming
Stemming is used to try to identify a ground form for each word in the text. Words that carry the same information can be used in different grammatical forms, depending on how the creator of the report wrote them down. This phase removes affixes and other components from each token that resulted from tokenization, so that only the stem of each word remains. For stemming, I used the PorterStemmer from the Python NLTK library and passed it the stream of extracted words. Words like caller, called, and calling, whose stem is call, were filtered so that only the one word, call, was kept in the final list. I filtered around 1200 words this way.
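
For example, with NLTK's PorterStemmer (assuming nltk is installed):

from nltk.stem import PorterStemmer

stemmer = PorterStemmer()
for w in ["calls", "called", "calling"]:
    print(w, "->", stemmer.stem(w))
# calls -> call
# called -> call
# calling -> call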


Step 3: Synonym Removal and Spell Checking
Synonyms were removed and replaced by one common word; I used NLTK's WordNet interface for this.
Second phase, spell checking: the word list was compared against a list of commonly misspelled words.

Step 4: Vector Space Representation
After the first 3 steps I had around 5500 words. These words were used to identify tags. Each defect, with its tags, was then represented in a vector space model, using the standard method provided by scikit-learn.

Step 5: Similarity Measure
I calculated the cosine similarity between each pair of defect vectors.
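
A minimal sketch of steps 4 and 5 with scikit-learn (the defect summaries below are made up for illustration):

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Hypothetical defect summaries standing in for the real tracker data.
defects = [
    "namenode crashes on restart after upgrade",
    "namenode fails to restart after metadata upgrade",
    "mapreduce job hangs when reducer count is zero",
]

vectors = TfidfVectorizer().fit_transform(defects)  # vector space model
sims = cosine_similarity(vectors[0], vectors)       # compare defect 0 to all
print(sims)  # defect 1 scores much higher than defect 2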