Thursday, July 18, 2013

Hadoop Streaming with Python



-- Install Python 2.7 (ypython27) without sudo
yinst install ypython27 -nosudo

chmod +x wc_map.py

./hello.py

-- Part I: Map/Reduce on the Local Machine
cat wc_map.py
#!/usr/local/bin/python
"""Hadoop Streaming mapper: count one target word per input line.

Usage:  cat data1 | python wc_map.py WORD

Each stdin line is split on tab characters. For every line in which WORD
occurs as a whole field, one record "WORD<TAB>count" is written to stdout;
lines that do not contain WORD produce no output. Written to run on both
Python 2.7 and Python 3.
"""
import sys


def count_field(line, what):
    """Return how many tab-separated fields of *line* are exactly *what*."""
    # rstrip() drops the newline (and any trailing whitespace) first so the
    # last field compares cleanly against *what*.
    return line.rstrip().split("\t").count(what)


def map_lines(lines, what):
    """Yield a "what<TAB>count" record for each line that contains *what*."""
    for line in lines:
        ct = count_field(line, what)
        if ct:
            yield "%s\t%d" % (what, ct)


def main(argv=None):
    """Run the mapper over stdin and return a process exit status.

    argv defaults to sys.argv; argv[1] is the word to count.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) < 2:
        # Fail with a usage message instead of an IndexError traceback.
        sys.stderr.write("usage: wc_map.py WORD\n")
        return 2
    for record in map_lines(sys.stdin, argv[1]):
        print(record)
    return 0


if __name__ == "__main__":
    sys.exit(main())

cat wc_reduce.py
#!/usr/local/bin/python
"""Hadoop Streaming reducer: total the counts produced by wc_map.py.

Usage:  cat intermediate | python wc_reduce.py

Each non-blank stdin line is expected to be "key<TAB>count". The counts
are summed and the single grand total is printed (the key itself is not
echoed). Written to run on both Python 2.7 and Python 3.
"""
import sys


def total_count(lines):
    """Return the sum of the second tab-separated field over *lines*.

    Blank lines (e.g. a trailing empty line in a hand-made input file)
    are skipped. Any other line without an integer second field still
    raises IndexError/ValueError, so corrupt input is not hidden.
    """
    total = 0
    for line in lines:
        stripped = line.strip()
        if not stripped:
            continue  # blank line: nothing to add
        total += int(stripped.split("\t")[1])
    return total


def main():
    """Read mapper output from stdin and print the grand total."""
    print(total_count(sys.stdin))


if __name__ == "__main__":
    main()

cat data1 | python wc_map.py dog

cat data1 | python wc_map.py dog > intermediate

cat intermediate | python wc_reduce.py

cat data1 | python wc_map.py dog | python wc_reduce.py

-- Part II: Map/Reduce on a Hadoop Cluster
# Generic options (-D, -files) must come before the streaming options.
# -D replaces the deprecated -jobconf, and -files (comma-separated)
# replaces the deprecated -file; mapred.map.tasks is only a hint.
# The -output directory must not already exist.
hadoop jar "$HADOOP_PREFIX"/share/hadoop/tools/lib/hadoop-streaming.jar \
  -D mapred.job.queue.name=unfunded \
  -D mapred.map.tasks=2 \
  -D mapred.reduce.tasks=1 \
  -files wc_map.py,wc_reduce.py \
  -mapper "python wc_map.py dog" \
  -reducer "python wc_reduce.py" \
  -input data1 \
  -output whatever


A more general example, counting every word rather than one target word: cat mapper.py
#!/usr/bin/env python
"""Word-count mapper for Hadoop Streaming.

Reads text from stdin and writes one tab-delimited "word<TAB>1" record per
whitespace-separated word; the reduce step (after Hadoop sorts the map
output by key) adds the 1s up. To try it on a local file instead of a
pipe, redirect the file into stdin: ``python mapper.py < linux.words``.
Written to run on both Python 2.7 and Python 3.
"""
import sys


def map_words(lines):
    """Yield a "word<TAB>1" record for every word in *lines*."""
    for line in lines:
        # split() with no argument already ignores leading/trailing
        # whitespace (including the newline), so no strip() is needed.
        for word in line.split():
            yield '%s\t%s' % (word, 1)


def main():
    """Stream stdin through map_words() to stdout."""
    for record in map_words(sys.stdin):
        print(record)


if __name__ == "__main__":
    main()

# testing: echo "foo foo quux labs foo bar quux" | python mapper.py


The matching reducer for mapper.py: cat reducer.py
#!/usr/bin/env python
"""Word-count reducer for Hadoop Streaming.

Reads "word<TAB>count" records from stdin and prints "word<TAB>total" for
each run of identical consecutive words. This relies on the input being
sorted by word: Hadoop sorts map output by key before the reduce step, and
locally you pipe through ``sort -k1,1``. Lines without a tab, or whose
count is not an integer, are silently skipped. Written to run on both
Python 2.7 and Python 3.
"""
from itertools import groupby
from operator import itemgetter
import sys


def parse_records(lines):
    """Yield (word, count) pairs from mapper output, skipping bad lines."""
    for line in lines:
        word, sep, count = line.strip().partition('\t')
        if not sep:
            continue  # no tab: not a mapper record
        try:
            count = int(count)
        except ValueError:
            continue  # count was not a number: ignore/discard this line
        yield word, count


def reduce_counts(pairs):
    """Yield (word, total) for each run of consecutive equal words.

    Only consecutive pairs are merged, so *pairs* must be sorted (or at
    least grouped) by word for each word to be reported exactly once.
    Empty input yields nothing.
    """
    for word, group in groupby(pairs, key=itemgetter(0)):
        yield word, sum(count for _, count in group)


def main():
    """Stream stdin through the reducer and print one line per word."""
    for word, total in reduce_counts(parse_records(sys.stdin)):
        print('%s\t%s' % (word, total))


if __name__ == "__main__":
    main()

# testing:
# echo "foo foo quux labs foo bar quux" | python mapper.py | sort -k1,1 | python reducer.py

No comments:

Post a Comment