Saturday, 4 October 2014

Data Analysis Yahoo Finance Data

What the script does:
=> Downloads historical stock prices for a company from the Yahoo Finance website.
=> Calculates the 10-day moving average.
=> Generates a graph of the data.

a) Necessary Imports:
1. Import pandas: pandas is a data analysis library for Python. It has tools to read and write data between in-memory data structures and different file formats, and an efficient DataFrame object for data manipulation with good indexing support.
2. Import datetime: Provides classes to manipulate date and time objects, and an API to convert between different date and time formats.
3. Import matplotlib: Python 2D plotting library that is simple to use and generates graphs, plots, etc. with a few lines of code.
4. Import NumPy: Scientific computing package for Python with an N-dimensional array object and linear algebra functions.
5. Import urllib: URL handling module for Python.
import pandas as pd
from datetime import timedelta
import datetime as dt
from pandas import Series, DataFrame
import matplotlib.pyplot as plt
import matplotlib as mpl
import urllib.request
import numpy as numpy
from datetime import datetime
from matplotlib.pyplot import *
import matplotlib.dates as mdates

b) Create a class called Stock
The constructor (__init__) should accept as parameters the company symbol, lookback period, window size, and the end date.
def __init__(self,symbol,lookback_period,window_size, 
Suppose your lookback period is 100; then you need prices for 100 days.
To get the start date, subtract the lookback period from the end date.
For the subtraction to work, both values need to be in the same date format.
In this example I have used timedelta. A timedelta gives you the start date by stepping back a given number of days from a given date. For example, if your end date is today and you want stock prices for the last 100 days, subtract a timedelta of 100 days from today.

start = end - timedelta(days=lookback_period)
c) Convert date into required format.
start_date = start.isoformat() 
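Steps b) and c) can be tried in isolation. The sketch below is only an illustration: the helper name lookback_start and the sample end date are assumptions, not part of the original script.

```python
from datetime import date, timedelta

def lookback_start(end, lookback_period):
    """Return the date that lies `lookback_period` days before `end`."""
    return end - timedelta(days=lookback_period)

# Sample end date chosen for illustration only.
end = date(2014, 10, 4)
start = lookback_start(end, 100)

# isoformat() renders the date as YYYY-MM-DD.
start_date = start.isoformat()
print(start_date)  # 2014-06-26
```

Keeping both values as date objects until the last moment means the subtraction is done by the library rather than by string manipulation.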
d) Build the required URL for the data.
url = "{0}".format(symbol)
url += "&a={0}&b={1}&c={2}".format(start_month,start_day,start_year)
url += "&d={0}&e={1}&f={2}".format(end_month,end_day,end_year)
e) Parse data
df = pd.read_csv(urllib.request.urlopen(url))
#get the adj close from csv
saved_column = df['Adj Close']
#get the matching date
y_data = df['Date']
f) Get the moving average
def movingaverage(self, interval, window_size):
    window = numpy.ones(int(window_size)) / float(window_size)
    return numpy.convolve(interval, window, 'same')

A moving average smooths price fluctuations by filtering out short-term noise. It computes the average of each successive subset (window) of the full data set.
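A small standalone sketch may help show what the convolution does. It assumes a free function instead of the class method, and the sample prices are made up for illustration.

```python
import numpy as np

def moving_average(values, window_size, mode="same"):
    """Average each run of `window_size` values using a uniform window."""
    window = np.ones(int(window_size)) / float(window_size)
    return np.convolve(values, window, mode)

prices = [1.0, 2.0, 3.0, 4.0, 5.0]  # made-up sample data

# 'same' keeps the input length; the first and last values are averaged
# against implicit zeros, so they are pulled towards zero.
same = moving_average(prices, 3)

# 'valid' keeps only the windows that fit entirely inside the data.
valid = moving_average(prices, 3, "valid")

print(same)
print(valid)
```

With 'same' the output lines up one-to-one with the dates, which is convenient for plotting, but the edge values are less trustworthy than the interior ones.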

Moving Average Wiki 

g) Generate the graph
 x_data = x_points[0:70]
#get the moving average
y_av = self.movingaverage(saved_column,window_size)
#generate graph
figure("Plot of stocks")
x = [dt.datetime.strptime(d,"%Y-%m-%d").date() for d in x_data]
ylabel("adjusted close")


Tuesday, 23 September 2014

Hadoop a little deeper

Map Reduce :  Follows Master -  Slave Model 
Master :
Job Tracker

Slave :
Task Tracker

Dynamo / Cassandra => Peer to Peer

The client is neither master nor slave. It:
=> submits Map Reduce tasks
=> describes how the data is to be processed
=> retrieves data.

Hadoop Components  :

NameNode : 
=> Files are not stored on the NameNode; it holds only the filesystem metadata that maps files to blocks.
=> The metadata also contains information such as disk space, last access time, and permissions.
=> The NameNode is rack aware: it knows which rack each DataNode is on.
=> The NameNode coordinates access to the DataNodes.
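The idea that the NameNode holds only metadata can be illustrated with a toy model. This is not Hadoop code: the class ToyNameNode, its tiny block size, and the round-robin placement are all assumptions made for the sketch (real HDFS placement is rack aware).

```python
from dataclasses import dataclass, field

@dataclass
class ToyNameNode:
    """Holds metadata only: file -> block ids, block id -> DataNode names."""
    block_size: int = 4
    replication: int = 2
    datanodes: list = field(default_factory=lambda: ["dn1", "dn2", "dn3"])
    file_blocks: dict = field(default_factory=dict)
    block_locations: dict = field(default_factory=dict)

    def add_file(self, path, size):
        n_blocks = -(-size // self.block_size)  # ceiling division
        ids = []
        for i in range(n_blocks):
            block_id = f"{path}#blk{i}"
            first = len(self.block_locations)
            # Simple round-robin placement, purely illustrative.
            self.block_locations[block_id] = [
                self.datanodes[(first + k) % len(self.datanodes)]
                for k in range(self.replication)
            ]
            ids.append(block_id)
        self.file_blocks[path] = ids

    def locate(self, path):
        """Tell a client which DataNodes hold each block of a file."""
        return [(b, self.block_locations[b]) for b in self.file_blocks[path]]

nn = ToyNameNode()
nn.add_file("/a.txt", 10)   # 10 units of data, 4 per block -> 3 blocks
for block, nodes in nn.locate("/a.txt"):
    print(block, nodes)
```

The file contents never pass through the toy NameNode; a client would use the returned locations to talk to the DataNodes directly.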

Data Node:
=> Manages data.
=> Sends heartbeat messages to the NameNode to signal that it is alive.
=> Communicates with other DataNodes to replicate, move, and copy data.
=> Stores data as blocks.

Job Tracker:
=> Manages jobs and resources in Hadoop.
=> Client applications submit Map Reduce requests to the Job Tracker.
=> Schedules client jobs and allocates tasks to Task Trackers.

Task Tracker: 
=> Slave process deployed on each machine.
=> Follows the instructions of the Job Tracker and runs Map Reduce tasks.
=> Handles movement of data between the map and reduce phases.

Secondary NameNode:
=> The name is a misnomer; it is not a standby NameNode.
=> It does housekeeping tasks in HDFS.
=> The NameNode keeps all filesystem metadata in RAM and appends every change to an edit log, but it does not merge that log into its on-disk image while running. The Secondary NameNode contacts the NameNode periodically (every hour by default), pulls the image and the edit log, and merges them into a new file called a checkpoint.
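The checkpoint merge can be sketched as replaying a log of edits onto a saved image. This is a toy model only: the function names, the tuple format of the edits, and the dict used as an image are assumptions and do not mirror the real on-disk formats.

```python
def apply_edit(image, edit):
    """Apply one logged change to an in-memory image (a plain dict)."""
    op, path = edit[0], edit[1]
    if op == "create":
        image[path] = edit[2]
    elif op == "delete":
        image.pop(path, None)
    return image

def checkpoint(fsimage, edit_log):
    """Merge the edit log into a copy of the image.

    Returns the new image and an empty log to continue with.
    """
    new_image = dict(fsimage)
    for edit in edit_log:
        apply_edit(new_image, edit)
    return new_image, []

fsimage = {"/a": 1}
edit_log = [("create", "/b", 2), ("delete", "/a")]
new_image, new_log = checkpoint(fsimage, edit_log)
print(new_image, new_log)  # {'/b': 2} []
```

Merging regularly keeps the log short, so a restart has fewer edits to replay.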


Hadoop 1.**   => Hadoop 2.**:

1. Limited horizontal scaling
2. Single point of failure for the NameNode
3. Impossible to run non-Map Reduce tools because of the tight coupling of JobTracker + MR
4. Does not support multitenancy
5. Job Tracker overburdened because it does too much work.

1. Horizontal Scaling

All metadata is stored in the RAM of the NameNode.
RAM size is limited, so you cannot scale it beyond a certain point.
This becomes a bottleneck after about 4000 nodes.

2. Single Point of Failure
No backup node takes over if the NameNode fails.

3. Impossible to run non-Map Reduce tools because of the tight coupling of JobTracker + MR
Only Map Reduce processing is possible.
Realtime analytics and MPI are difficult, and there is no graph processing.
You cannot run these on data inside HDFS; you have to move the data out of HDFS first.
Only batch processing is available in 1.**.

4. Multitenancy :
Only one type of job can run at a time; running different kinds of jobs from different applications on the same cluster is not possible.

Extra components in Hadoop 2.**
1. Resource Manager
2. NameNode High Availability
3. YARN: Yet Another Resource Negotiator.

Instead of a single NameNode there are multiple NameNodes. They are independent of each other, and each one manages its own namespace.

Both the JobTracker and the TaskTracker were removed in Hadoop 2.
The Job Tracker's duties, resource management and job scheduling, were split into the separate components listed below.

New Components in Hadoop 2.

1. Resource Manager :
=> Scheduler that allocates cluster resources to the various running applications.
=> Schedules tasks based on application containers.

2. Application Manager
=> Launches tasks in containers.
=> Restarts the Application Master container on failure.

3. Node Manager:
=> Runs on each node.
=> Follows the orders of the Resource Manager.
=> Responsible for maintaining containers.
=> Manages the resources of a single node.

Other Features:

Name Node HA:
Automatic failover and recovery for the NameNode master service.
2 NameNodes: active and passive; when one fails the other takes control.

Point-in-time recovery for backup.

Federation :
Generic block storage layer.

Wednesday, 10 September 2014

A* (A Star) Algorithm

A* is a pathfinding / graph traversal algorithm, commonly used in game programming, that finds the shortest path to a destination using a best-first search approach. It can be seen as an extension of Dijkstra's algorithm: it reduces the number of nodes examined while still guaranteeing optimality, provided the heuristic never overestimates the remaining distance. A* uses the heuristic to determine the most promising node to explore next.

Consider a grid with each square numbered. As in the above figure, the cross in Sq. 14 represents the start point and the cross in Sq. 35 represents the destination. The blue bars mark blocked squares.
Each square is considered a node.
Some terminology used with A*
Node Data
H Value : Heuristic value
G Value : Movement cost
F Value : G + H
Parent : Pointer back to the previous node.

Open List : Nodes still to be visited
Closed List : Nodes already visited.

Task : Shortest Path from 14 to 35.
H Value : Estimated distance of a node from the destination node, in our case from Sq. 35.
Precompute all H values.

If the H value is zero for every node, A* behaves like Dijkstra's algorithm. For Node 17 the H value is 3, as you can go from 17 to 23 to 29 and then to 35.
For Node 25 the H value is 5, as you go from 25 to 26 to 27 to 28 to 29 and then to 35. Blocked nodes are ignored when precomputing H values, so these paths may pass through them.
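The H values quoted above match a Manhattan distance (row difference plus column difference). The sketch below assumes a grid that is 6 squares wide and numbered from 1, which is inferred from moves such as 17 to 23 to 29 to 35; the figure itself is not reproduced here, and the function name is hypothetical.

```python
def manhattan(square, goal, cols=6):
    """H value: rows apart plus columns apart, ignoring blocked squares."""
    r1, c1 = divmod(square - 1, cols)   # 1-based square number -> (row, col)
    r2, c2 = divmod(goal - 1, cols)
    return abs(r1 - r2) + abs(c1 - c2)

print(manhattan(17, 35))  # 3
print(manhattan(25, 35))  # 5
```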

The G value is the movement cost from the start node to another node.
Use a movement cost of 10 for horizontal and vertical moves and 14 for diagonal moves.
A diagonal move is longer than a horizontal one (Pythagorean theorem: 10 times the square root of 2 is about 14).
So to move from 14 to 8 the cost is 10, and from 14 to 9 it is 14.
Each square has a parent.
Starting from Sq. 14, parents are assigned using the open list and the closed list.
Right now no node is on the closed or open list.

All neighboring nodes are parented to 14.
Closed List : 14
Open List : 7,8,9,13,15,19,20,21
Calculate the G cost for all of them: for Node 14 it is 0, for Node 8 it is 10, and for Node 9 it is 14.

The numbers in black represent the H value of each node, i.e. the distance from that node to Node 35.
The numbers in blue represent the G value; Node 8 has G value 10.
The F value is the sum of the G value and the H value, written in red for each node. (For Node 8 it is 17 => 10+7.)
We need to pick the node with the smallest F value.
In our case that is square 15, as its F value of 15 is the minimum. Node 20 also has F value 15 and could be selected instead.
Take Node 15 and put it on the closed list.
Now repeat the process for Node 15.
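The selection step can be checked with a few lines of code. As before, the 6-wide grid numbered from 1 is an assumption inferred from the moves in the text, and the helper names are hypothetical.

```python
COLS = 6                      # assumed grid width
STRAIGHT, DIAGONAL = 10, 14   # movement costs from the text

def position(square):
    """Convert a 1-based square number to (row, col)."""
    return divmod(square - 1, COLS)

def h_value(square, goal):
    (r1, c1), (r2, c2) = position(square), position(goal)
    return abs(r1 - r2) + abs(c1 - c2)

def step_cost(a, b):
    """Cost of moving between two adjacent squares."""
    (r1, c1), (r2, c2) = position(a), position(b)
    return DIAGONAL if r1 != r2 and c1 != c2 else STRAIGHT

start, goal = 14, 35
open_list = [7, 8, 9, 13, 15, 19, 20, 21]

# F = G + H, where G is the cost of the single step from the start.
f_values = {n: step_cost(start, n) + h_value(n, goal) for n in open_list}
best = min(f_values.values())
candidates = [n for n in open_list if f_values[n] == best]

print(f_values[8], candidates)  # 17 [15, 20]
```

Both 15 and 20 tie on the smallest F value, which is why either may be expanded next.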

Now calculate all values for Node 15.
Closed List : 14, 15
Open List : 7,8,9,13,19,20,21,10
Node 14 is already on the closed list, so we do nothing for it.
Consider Node 8: we check whether it is faster to reach 8 directly from 14 or by going through 15.
Going through Node 15 we see:
The G value of Node 15 is 10, and the diagonal step from 15 to 8 costs 14.
So the total movement cost through 15 is 10+14, i.e. 24.
If this total is greater than the current G value of the node (10 for Node 8), don't change anything.
Otherwise, if the G value through Node 15 is less than the current G value of Node 8, re-parent Node 8 to Node 15.

If (Current node's G value + Step cost < Neighbor's existing G value) {
    Neighbor's G value = Current node's G value + Step cost.
    Neighbor's parent = Current node.
}

This process continues until you are next to the destination node.

Closed : 14,15,20
Open : 7,8,9,13,19,21,10,22,25,27
Keep going until a neighbor is the destination; then parent the destination to the current node and you are done.
Trace the path back from destination to source:
35 to 29 to 22 to 15 to 14.
Make sure you change the parent whenever you find a new G value smaller than the previously computed one.

The green arrows denote the final parent of each node. Red is the path from source to destination.

Shortest Path : 14 ->15->22->29->35
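The whole procedure can be sketched in a short function. This is one possible implementation under stated assumptions, not the code behind the figure: the grid is taken to be 6 by 6 and numbered from 1, the blocked squares are not known from the text and must be passed in, and a heap stands in for the open list. It does not forbid cutting corners diagonally past a blocked square.

```python
import heapq

STRAIGHT, DIAGONAL = 10, 14  # movement costs from the walkthrough

def a_star(cols, rows, start, goal, blocked=frozenset()):
    """Return the list of squares from start to goal, or None if unreachable."""
    def pos(n):                       # 1-based square number -> (row, col)
        return divmod(n - 1, cols)

    def h(n):                         # Manhattan distance in squares
        (r1, c1), (r2, c2) = pos(n), pos(goal)
        return abs(r1 - r2) + abs(c1 - c2)

    g = {start: 0}
    parent = {start: None}
    open_heap = [(h(start), start)]   # entries are (F value, square)
    closed = set()
    while open_heap:
        _, current = heapq.heappop(open_heap)
        if current in closed:
            continue                  # stale entry left over from a re-parent
        if current == goal:
            path = []
            while current is not None:  # trace parents back to the start
                path.append(current)
                current = parent[current]
            return path[::-1]
        closed.add(current)
        r, c = pos(current)
        for dr in (-1, 0, 1):
            for dc in (-1, 0, 1):
                nr, nc = r + dr, c + dc
                if (dr, dc) == (0, 0) or not (0 <= nr < rows and 0 <= nc < cols):
                    continue
                neighbor = nr * cols + nc + 1
                if neighbor in blocked or neighbor in closed:
                    continue
                new_g = g[current] + (DIAGONAL if dr and dc else STRAIGHT)
                if new_g < g.get(neighbor, float("inf")):
                    g[neighbor] = new_g           # cheaper route found:
                    parent[neighbor] = current    # re-parent the neighbor
                    heapq.heappush(open_heap, (new_g + h(neighbor), neighbor))
    return None

print(a_star(6, 6, 14, 35))  # [14, 21, 28, 35] when nothing is blocked
```

With no blocked squares the route is simply three diagonal steps. Passing the blocked squares from the figure as the blocked argument is what would steer the search onto a route such as the one above.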


Useful links for the topic 

Stanford Amits Page

Friday, 11 July 2014

Hadoop-HDFS Code Analysis-1

I had a hard time understanding the HDFS code, not only because it is big and complex but also because of the lack of available documentation.

1. Create File System
FileSystem is an abstract class extending Hadoop's Configured class.
Like a normal file system, it defines all the file functions.
Every file system that Hadoop uses extends the FileSystem class, directly or indirectly.
Which file system is invoked depends on the Hadoop configuration file.
Suppose we specify hdfs://localhost:9000; this tells Hadoop to use DistributedFileSystem.
This is configured through core-default.xml.
file:/// invokes LocalFileSystem.
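The selection by URI scheme can be pictured with a small lookup. This is a Python illustration of the idea only, not Hadoop's implementation: the dictionary and the function filesystem_for are hypothetical, and in Hadoop the mapping comes from configuration.

```python
from urllib.parse import urlparse

# Illustrative mapping only; Hadoop reads this from its configuration.
SCHEME_TO_FILESYSTEM = {
    "hdfs": "DistributedFileSystem",
    "file": "LocalFileSystem",
}

def filesystem_for(uri):
    """Pick the file system class name from the scheme of a URI."""
    scheme = urlparse(uri).scheme
    try:
        return SCHEME_TO_FILESYSTEM[scheme]
    except KeyError:
        raise ValueError(f"No FileSystem registered for scheme {scheme!r}")

print(filesystem_for("hdfs://localhost:9000"))  # DistributedFileSystem
print(filesystem_for("file:///"))               # LocalFileSystem
```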

2. Client invokes Server
Assume the client calls mkdirs; this invokes DistributedFileSystem's mkdirs:

public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    return mkdirsInternal(f, permission, true);
}

mkdirs() calls mkdirsInternal(), which in turn uses the variable dfs, an object of DFSClient.
Apache's description of the class:

DFSClient can connect to a Hadoop Filesystem and perform basic file tasks. It uses the ClientProtocol to communicate with a NameNode daemon, and connects directly to DataNodes to read / write block data. Hadoop DFS users should obtain an instance of DistributedFileSystem, which uses DFSClient to handle filesystem tasks.

The initialize method is called.

All operations on DistributedFileSystem are delegated to DFSClient.

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats)
    throws IOException {

DFSClient method calls are forwarded to the variable namenode and its associated variable rpcNamenode.
ClientProtocol provides all the methods through which the client can invoke methods on the server.
All method calls on rpcNamenode come down to the Invoker's invoke() method.
This way the calls are transferred to server-side method calls.
Eventually they reach the Server class.
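The pattern of funnelling every protocol method through a single invoke() can be sketched in Python. This is only an analogy for the proxy idea: the classes ToyNameNodeServer, Invoker, and RpcProxy are hypothetical, and no network transport or Hadoop API is involved.

```python
class ToyNameNodeServer:
    """Stands in for the server side; keeps created directories in a set."""
    def __init__(self):
        self.dirs = set()

    def mkdirs(self, path):
        self.dirs.add(path)
        return True

class Invoker:
    """Single choke point: every proxied call arrives here by name."""
    def __init__(self, server):
        self.server = server
        self.calls = []

    def invoke(self, method_name, *args):
        self.calls.append(method_name)   # a real RPC layer would serialize here
        return getattr(self.server, method_name)(*args)

class RpcProxy:
    """Looks like the protocol to the client, forwards everything to invoke()."""
    def __init__(self, invoker):
        self._invoker = invoker

    def __getattr__(self, name):
        def remote_call(*args):
            return self._invoker.invoke(name, *args)
        return remote_call

server = ToyNameNodeServer()
invoker = Invoker(server)
rpc_namenode = RpcProxy(invoker)

print(rpc_namenode.mkdirs("/tmp/x"))  # True
print(invoker.calls)                  # ['mkdirs']
```

The client code only ever sees ordinary method calls; the proxy is the one place where a call is turned into a name plus arguments.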
The 3 major components of the server are:
Listener, Responder, Handler,