Wednesday, June 12, 2013

Bayesian Model

The prior is a subjective distribution over the hypothesis space, encoding what we believe before seeing the data.

The posterior is the likelihood times the prior, normalized.

In general, when we have enough data, the posterior becomes peaked on a single concept, namely the MAP estimate. The MAP estimate has drawbacks: it gives no measure of uncertainty; plugging it in can result in overfitting; the mode can be an untypical point of the posterior; and it is not invariant to reparameterization.

Bayesian models with more parameters do not necessarily have higher marginal likelihood; this is the Bayesian Occam's razor: a more complex model spreads its probability mass over many possible datasets, so it assigns less mass to the dataset actually observed.
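
A minimal sketch (plain Python; the coin-bias hypothesis space and the 8-heads/2-tails data are made up for illustration) of "posterior = likelihood times prior, normalized" and of reading off the MAP estimate:

# Minimal sketch: posterior = likelihood * prior, normalized (hypothetical example).
# Hypothesis space: three candidate coin biases; data: 8 heads out of 10 flips.
from math import comb

thetas = [0.3, 0.5, 0.7]          # hypothesis space
prior  = [1/3, 1/3, 1/3]          # subjective prior over hypotheses
N1, N0 = 8, 2                     # observed heads / tails

likelihood = [comb(N1 + N0, N1) * t**N1 * (1 - t)**N0 for t in thetas]
unnorm     = [l * p for l, p in zip(likelihood, prior)]
posterior  = [u / sum(unnorm) for u in unnorm]

map_theta = thetas[posterior.index(max(posterior))]
print(posterior, map_theta)       # posterior peaks at theta = 0.7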

Beta - Binomial model

Given X ~ Bin(theta),
Prior theta ~ Beta(a, b),
Posterior theta | D ~ Beta(N1+a, N0+b), where the data D contain N1 successes and N0 failures (N = N1+N0).

The MLE: theta = N1/(N1+N0)
The posterior mean: theta = (N1+a)/(N1+N0+a+b)
The mode (MAP): theta = (N1+a-1)/(N1+N0+a+b-2)
For large N, the posterior variance is approximately theta(1-theta)/(N1+N0), with theta the MLE.

Specifically, if a=b=1 (a uniform prior), the posterior mean is theta = (N1+1)/(N1+N0+2), which is add-one (Laplace) smoothing in practice.

Updating the posterior sequentially, one observation at a time, gives the same result as updating it in a single batch. This makes Bayesian inference particularly well-suited to online learning.
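
A small sketch of these estimates in plain Python (the counts N1, N0 and hyperparameters a, b are made up for illustration), including a check that sequential updating gives the same posterior as one batch update:

# Beta-Binomial sketch with made-up counts and hyperparameters.
N1, N0 = 7, 3          # observed successes / failures
a, b   = 2, 2          # Beta prior hyperparameters

mle        = N1 / (N1 + N0)
post_mean  = (N1 + a) / (N1 + N0 + a + b)
post_mode  = (N1 + a - 1) / (N1 + N0 + a + b - 2)        # MAP
approx_var = mle * (1 - mle) / (N1 + N0)                 # large-N approximation

# Sequential updating: absorbing the data one observation at a time
# yields the same Beta(a + N1, b + N0) posterior as a single batch update.
a_seq, b_seq = a, b
for x in [1]*N1 + [0]*N0:
    a_seq, b_seq = a_seq + x, b_seq + (1 - x)
assert (a_seq, b_seq) == (a + N1, b + N0)

print(mle, post_mean, post_mode, approx_var)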

Dirichlet - Multinomial model

Given X ~ Multinomial(theta1, ..., thetaK),
Prior (theta1, ..., thetaK) ~ Dirichlet(alpha1, ..., alphaK),
Posterior (theta1, ..., thetaK) | D ~ Dirichlet(N1+alpha1, ..., NK+alphaK), where Nk is the count of category k and N = N1+...+NK.

The MLE: thetak = Nk/N
The mode (MAP): thetak = (Nk+alphak-1)/(N + alpha1+...+alphaK - K)
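
The same pattern for the Dirichlet-Multinomial, sketched in plain Python with made-up counts and hyperparameters:

# Dirichlet-Multinomial sketch with made-up counts and hyperparameters.
N     = [3, 5, 2]          # observed counts N1..NK
alpha = [1, 1, 1]          # Dirichlet hyperparameters (alpha_k = 1: add-one smoothing)
K     = len(N)
Ntot  = sum(N)

posterior_params = [n + a for n, a in zip(N, alpha)]      # Dirichlet(N_k + alpha_k)
mle  = [n / Ntot for n in N]
mapk = [(n + a - 1) / (Ntot + sum(alpha) - K) for n, a in zip(N, alpha)]

print(posterior_params, mle, mapk)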

Gaussian - Gaussian-Wishart model
Let Z0 be the normalizer of the prior.
The precision matrix has a Wishart prior (with a Gaussian prior on the mean, conditional on the precision).
Let ZN be the normalizer of the posterior.

Naive Bayes Classifiers

Assume the features are conditionally independent given the class label; the class-conditional density then factorizes into a product of per-feature densities, and the resulting model is the naive Bayes classifier (see the sketch after the list below).

In the case of real-valued features, assume a Gaussian distribution.
In the case of binary features, assume a Bernoulli distribution.
In the case of categorical features, assume a Multinoulli (categorical) distribution.
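
A minimal Gaussian naive Bayes sketch in plain Python (toy data and the fit/predict helpers are made up for illustration): it fits a per-class prior plus per-feature means and variances, and classifies by the class prior times the product of per-feature Gaussian densities.

import math

# Toy training data: (features, label) pairs, made up for illustration.
data = [([1.0, 2.1], 0), ([0.9, 1.9], 0), ([3.0, 4.2], 1), ([3.2, 3.8], 1)]

def fit(data):
    """Per-class prior plus per-feature mean/variance (conditional independence)."""
    model = {}
    for c in set(y for _, y in data):
        xs = [x for x, y in data if y == c]
        means = [sum(col) / len(col) for col in zip(*xs)]
        vars_ = [sum((v - m) ** 2 for v in col) / len(col) + 1e-6
                 for col, m in zip(zip(*xs), means)]
        model[c] = (len(xs) / len(data), means, vars_)
    return model

def predict(model, x):
    def log_gauss(v, m, s2):
        return -0.5 * math.log(2 * math.pi * s2) - (v - m) ** 2 / (2 * s2)
    # log class prior + sum of per-feature log densities (naive Bayes assumption)
    scores = {c: math.log(p) + sum(log_gauss(v, m, s2)
                                   for v, m, s2 in zip(x, means, vars_))
              for c, (p, means, vars_) in model.items()}
    return max(scores, key=scores.get)

model = fit(data)
print(predict(model, [1.1, 2.0]))   # -> 0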

Priors

Uniform priors
Robust priors
Mixtures of conjugate priors
Hierarchical Bayes
Empirical Bayes

Bayes Estimators for common loss functions

MAP estimate minimizes 0-1 loss

Posterior median minimizes L1 (absolute) loss

Posterior mean minimizes L2 (squared) loss (checked numerically in the sketch below)

Reject option for classification
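
A quick numerical check in plain Python (the "posterior samples" are made up) that the posterior median minimizes expected absolute (L1) loss and the posterior mean minimizes expected squared (L2) loss:

# Made-up "posterior samples"; compare candidate point estimates under L1 and L2 loss.
samples = [0.1, 0.2, 0.2, 0.4, 0.9]

mean   = sum(samples) / len(samples)
median = sorted(samples)[len(samples) // 2]

def l1(a): return sum(abs(s - a) for s in samples) / len(samples)
def l2(a): return sum((s - a) ** 2 for s in samples) / len(samples)

# The median gives the smaller average absolute error,
# the mean gives the smaller average squared error.
assert l1(median) <= l1(mean)
assert l2(mean) <= l2(median)
print(mean, median, l1(median), l2(mean))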


Friday, June 7, 2013

Try Out Hadoop Streaming with Shell awk

-- Remove the whole directory
hadoop fs -rmr /user/*****/tempoutput


-- Run the streaming job with an inline awk mapper on the allocated cluster. The results (output files) are written to the DFS directory tempoutput.

hadoop jar $HADOOP/hadoop-streaming.jar -Dmapred.job.queue.name=unfunded -mapper "awk '{if(length(\$0) > 50){print \$0}}'" -reducer NONE -input linux.words -output tempoutput



-- Wrap with shell script: mapper

$ cat mymapper1.sh

#!/bin/sh
awk '{if(length($0) > 50){print $0}}'

-- Run the streaming job, shipping the wrapper script as the mapper
yarn jar $HADOOP/hadoop-streaming.jar \
-Dmapred.job.queue.name=unfunded \
-mapper mymapper1.sh \
-reducer NONE \
-input linux.words \
-output tempoutput \
-file mymapper1.sh

-- Wrap with shell script: mapper & reducer
$ cat mymapper.sh
#!/bin/sh
awk '{
if(length($0) > 4) {print substr($0, 1, 4)" "$0}
if(length($0) > 5) {print substr($0, 1, 5)" "$0}
if(length($0) > 6) {print substr($0, 1, 6)" "$0}
}'

$ cat myreducer.sh
#!/bin/sh
awk '{
curkey=$1;
curvalue=$2;
if(prevkey == curkey){
  count+=1;
  mylist=mylist","curvalue;
}
else{
  # when the key changes, print the previous group if it was small, then reset
  if(count < 3) {print prevkey" "mylist;}
  count=0;
  mylist=curvalue;
}
prevkey=curkey;
}'

hadoop jar $HADOOP/hadoop-streaming.jar \
-Dmapred.job.queue.name=*********** \
-input linux.words \
-output tempoutput2 \
-mapper mymapper.sh \
-reducer myreducer.sh \
-file mymapper.sh \
-file myreducer.sh
 

Try Out MapReduce

--Check running queue
hadoop queue -showacls
--Run the wordcount program
hadoop jar $HADOOP_HOME/hadoop-examples.jar wordcount -Dmapred.job.queue.name=unfunded /data/vespanews/20070128 temp/newsout
 
-- Copy the results from dfs to the gateway directory.
hadoop fs -copyToLocal 'temp/newsout/*' newsout
  
--Checking results
head -10 part-r-00000
  
--Kill the job by using jobid
hadoop job -kill job_201204200957_110493

Try Out Hive

--$PATH

echo $PATH
export PATH=$PATH:/home/y/bin
export HIVE_HOME=/home/y/libexec/hive
tar -xzvf 2008.tar.gz

hive
-- Creates a new database and stores related data at <hdfs-path>.
hive> CREATE DATABASE ling LOCATION '/user/***/hive';
hive> SHOW DATABASES;

-- Use the database that was just created.
hive> USE ***;

-- Prepare a table;
hive> CREATE TABLE test (a string, b string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/user/***/mydata/test';

CREATE TABLE flight_data(
year INT,
month INT,
day INT,
day_of_week INT,
dep_time INT,
crs_dep_time INT,
arr_time INT,
crs_arr_time INT,
unique_carrier STRING,
flight_num INT,
tail_num STRING,
actual_elapsed_time INT,
crs_elapsed_time INT,
air_time INT,
arr_delay INT,
dep_delay INT,
origin STRING,
dest STRING,
distance INT,
taxi_in INT,
taxi_out INT,
cancelled INT,
cancellation_code STRING,
diverted INT,
carrier_delay STRING,
weather_delay STRING,
nas_delay STRING,
security_delay STRING,
late_aircraft_delay STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';
 

CREATE TABLE airports(
name STRING,
country STRING,
area_code INT,
code STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ',';

-- Load data;
hive> LOAD DATA LOCAL INPATH 'data.00000' INTO TABLE test;
hive> LOAD DATA LOCAL INPATH '2008.csv' OVERWRITE INTO TABLE flight_data;
hive> LOAD DATA LOCAL INPATH 'airports.csv' OVERWRITE INTO TABLE airports;

hive> SHOW TABLES;
-- Execute a query to retrieve the data.
hive> SELECT * FROM test;
hive> set mapred.job.queue.name=unfunded;
hive> SELECT avg(arr_delay) FROM flight_data WHERE month=1 AND origin='SFO';
hive> SELECT * FROM airports LIMIT 10;
hive> CREATE TABLE results AS SELECT name, AVG(arr_delay)
FROM flight_data f
INNER JOIN airports a
ON (f.origin=a.code)
WHERE month=1
GROUP BY name;

 hive> SELECT * FROM results LIMIT 10;

-- Clean up the table and database.
hive> DROP TABLE test;
hive> DROP DATABASE ling;


hive --hiveconf mapreduce.map.speculative=true --hiveconf mapreduce.reduce.speculative=true --hiveconf mapreduce.job.acl-view-job="*";

set mapred.reduce.tasks = 500;
set mapreduce.job.queuename=adhoc;
set mapreduce.input.fileinputformat.split.minsize = 2048000000;
set mapreduce.input.fileinputformat.split.maxsize = 2048000000;

use ling;
grant all on database ling to user XXX; 



Try Out Pig



--Local machine(local mode)

--creates copy of linux.words in local fs


cp /usr/share/dict/linux.words linux.words

-- places you in Grunt shell (Grunt is Pig's interactive shell)
pig -x local

--enter Pig latin statements from Grunt shell
grunt> A = load 'linux.words' using PigStorage() as (word:chararray);
grunt> B = filter A by SIZE(word) > 19;
grunt> dump B;
grunt> quit;

--place Pig latin statements in script
/* myscript.pig */
A = load 'linux.words' using PigStorage() as (word:chararray);
B = filter A by SIZE(word) > 19;
dump B;

-- runs the script
pig -x local myscript.pig


-- Check which running queue/cluster you should use
mapred queue -showacls

-- Gateway machine (MapReduce mode). Change the queue name (myqueue) to one you can use, e.g. unfunded.
pig -Dmapred.job.queue.name=unfunded myscript.pig
pig -Dmapred.job.queue.name=unfunded myscript2.pig
pig -Dmapred.job.queue.name=unfunded Pig_ABF.pig

Try Out HDFS

Usage: hadoop [--config confdir] COMMAND
where COMMAND is one of:
  namenode -format     format the DFS filesystem
  secondarynamenode    run the DFS secondary namenode
  namenode             run the DFS namenode
  datanode             run a DFS datanode
  dfsadmin             run a DFS admin client
  fsck                 run a DFS filesystem checking utility
  fs                   run a generic filesystem user client
  balancer             run a cluster balancing utility
  jobtracker           run the MapReduce job Tracker node
  pipes                run a Pipes job
  tasktracker          run a MapReduce task Tracker node
  job                  manipulate MapReduce jobs
  version              print the version
  jar <jar>            run a jar file
  distcp <srcurl> <desturl>   copy file or directories recursively
  archive -archiveName NAME <src>* <dest>   create a hadoop archive
  daemonlog            get/set the log level for each daemon
 or
  CLASSNAME            run the class named CLASSNAME
Most commands print help when invoked w/o parameters.

-- Check the status of Kerberos tickets
klist

-- invoke kinit to get a ticket
kinit li@COM

-- sudo user login to get a ticket
sudo -s -h -u d_pbp
/usr/kerberos/bin/kinit -k -t /homes/dfsload/dfsload.prod.headless.keytab dfsload@COM

-- Drop a ticket
kdestroy

Hadoop HDFS Commands

--View the hadoop commands
hadoop

--View the version of hadoop
hadoop version

--View FS shell commands
hadoop fs

-- List directory
hadoop fs -ls /user/li

-- List content of HDFS files:
hadoop fs -cat wordcount/output/*
hadoop fs -cat file.bz2 | bunzip2
hadoop fs -cat dir/*.bz2 | bzcat | cut -d ^A -f 125,126 | cat -v
hadoop fs -test -e dir/*.bz2 | tail -1

-- Move files or directory
hadoop fs -mv /user/li/target /user/li/dest

--Copy file or directory
hadoop fs -cp file1 file2 file3 /user/li/dest/
hadoop fs -cp /user/li/file1.txt .
hadoop fs -cp /HRBlock/hrblock.data.reduced /HRBlock/part-r-00000

-- Create HDFS directory
hadoop fs -mkdir /user/li/input

-- Upload from the gateway host to the HDFS home directory
hadoop fs -copyFromLocal test-data/ch1/file1.txt /user/li
hadoop fs -put test-data/ch1/file1.txt /user/li

-- Download HDFS Files
hadoop fs -get /user/li/myfile .
hadoop fs -copyToLocal hdfs://dilithiumred-nn1.red.ygrid.com:8020/projects/prod/user/20130327/SIDEBID/user/part-00998.bz2 .

-- Delete a directory/file, and remove recursively (like rm -rf)
hadoop fs -rm /user/li/temp
hadoop fs -rmr /user/li/temp

-- Change permissions by chown, chgrp, chmod
hadoop fs -chgrp -R users /user/li
hadoop fs -chmod 755 /user/li

-- Viewing Data from HDFS
hadoop fs -text /user/li/tmp5/000000_0.deflate | tr '\001' ',' | head -1
hadoop fs -cat path_to_data/*.bz2 | bzcat | cut -d ^A -f 125,126  | cat -v
This selects columns 125 and 126 from Ctrl-A separated data.
To create the ^A, type:  CTRL-v CTRL-a
Inside screen, type:  CTRL-v CTRL-a a

--Transfer data between clusters
hadoop distcp -Dmapred.job.queue.name=adhoc -Ddfs.umaskmode=002 -i -m 40 -update webhdfs://ygrid.yahoo.com/user/li/search_20131211 hdfs://ygrid.yahoo.com/user/li/search_20131211

--Kill a job
mapred job -list |tail -n+3 |awk {'print $1" "$4'} |grep 'li'
mapred job -kill job_1399615563645_524816

--View job logs
mapred job -logs job_1374774840603_3324664
mapred job -logs job_1387925060187_4840299

-- Show available queues
mapred queue -showacls
mapred queue -list
mapred queue -info apg_dailymedium_p5
The meanings are percentages or fractions:
Capacity = % of the grid’s total capacity used by this queue under normal usage. If you add up the capacities for all queues you will get 100%.
MaximumCapacity = Maximum fraction of the grid’s total capacity this queue can use. In this example, the p5 queue normally uses 5%, but it’s allowed to go as high as 40% of the total grid capacity.
CurrentCapacity = Current usage relative to Capacity. In this example, p5 is using 144% of its Capacity, or 7.2% of the total grid capacity.

-- Check running job list/Show number of jobs per queue for each user
mapred job -list
mapred job -list |tail -n+3 |awk {'print $5" "$4'} |sort |uniq -c

-- Another way to list jobs, both running and completed:
mapred queue -info apg_d**_p3 -showJobs

-- Check Gateway Quota
quota -u apoqa

-- Check HDFS quotas, i.e. get a count of objects
hadoop fs -count -q /projects/DSP
hadoop fs -count -q /user/li

-- Displays aggregate length of files contained in the directory.
hadoop fs -dus /user/li
hadoop fs -du hdfs://.com:8020/projects

-- Check the running processes
jps

-- Test whether a path exists / is zero length / is a directory
hadoop fs -test -e /user/li/
hadoop fs -test -z /user/li/
hadoop fs -test -d /user/li/

-- Check group members
/gridtools/generic/bin/showmembers -n GROUPNAME
/gridtools/generic/bin/showmembers -n awrgroup
showmembers --netgroup cp_pnp_c_sudoers --type user --format comma

-- Launch R
echo USER=***
export INSTALL_ROOT=/homes/$USER/custom_root
/homes/$USER/custom_root/bin/R

-- Launch Pig
pig -Dmapred.job.queue.name=***  \
-Dmapreduce.reduce.memory.mb=3072 \
-Dmapreduce.map.memory.mb=3072 \
-Dmapreduce.map.java.opts="-Xmx2048M" \
-Dmapreduce.map.speculative=true \
-Dmapreduce.job.acl-view-job=* \
-Dmapreduce.task.timeout=1800000 \
-Dmapreduce.reduce.speculative=true \
-Dmapreduce.output.fileoutputformat.compress=true \
-param PARALLEL_ORDER=512  \
***_MB3.pig

-- Hadoop Streaming

hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming.jar \
        -input /ngrams \
        -output /output-streaming \
        -mapper mapper.py \
        -combiner reducer.py \
        -reducer reducer.py \
        -jobconf stream.num.map.output.key.fields=3 \
        -jobconf stream.num.reduce.output.key.fields=3 \
        -jobconf mapred.reduce.tasks=10 \
        -file mapper.py \
        -file reducer.py