--install
python
yinst
install ypython27 --nosudo
chmod
+x wc_map.py
./hello.py
-- Part I:
Map/Reduce on Local Machine
cat wc_map.py
#!/usr/local/bin/python
"""Streaming map step: count occurrences of one target word per input line.

Usage: wc_map.py WORD < data
For every stdin line that contains WORD as a tab-separated token, emit
"WORD<TAB>count" on stdout; lines with a zero count produce no output.
"""
import sys


def count_in_line(target, line):
    """Return how many tab-separated tokens in *line* equal *target*."""
    # rstrip() (not strip()) mirrors the original: only trailing
    # whitespace/newline is removed before splitting.
    return sum(1 for token in line.rstrip().split("\t") if token == target)


if __name__ == "__main__":
    what = sys.argv[1]
    for ln in sys.stdin:
        ct = count_in_line(what, ln)
        # Suppress zero counts, as the original `if ct:` did.
        if ct:
            print("%s\t%d" % (what, ct))
cat wc_reduce.py
#!/usr/local/bin/python
"""Streaming reduce step: sum the per-line counts produced by wc_map.py.

Reads "word<TAB>count" lines from stdin and prints the grand total.
"""
import sys


def total_count(lines):
    """Sum the integer second tab-separated field of each line in *lines*."""
    # int() on field [1] matches the original; a missing or non-numeric
    # field raises, exactly as the original loop did.
    return sum(int(line.strip().split("\t")[1]) for line in lines)


if __name__ == "__main__":
    print(total_count(sys.stdin))
cat
data1 | python wc_map.py dog
cat
data1 | python wc_map.py dog > intermediate
cat
intermediate | python wc_reduce.py
cat
data1 | python wc_map.py dog | python wc_reduce.py
Part II:
Map/Reduce on Hadoop Cluster
# Submit the word-count job via Hadoop Streaming.
# Note: generic options (-D...) must come before the streaming options.
hadoop jar $HADOOP_PREFIX/share/hadoop/tools/lib/hadoop-streaming.jar \
    -Dmapred.job.queue.name=unfunded \
    -mapper "python wc_map.py dog " \
    -reducer "python wc_reduce.py" \
    -input data1 \
    -output whatever \
    -file wc_map.py \
    -file wc_reduce.py \
    -jobconf mapred.map.tasks=2 \
    -jobconf mapred.reduce.tasks=1
For example, cat mapper.py
"""Streaming mapper: emit "<word><TAB>1" for each whitespace-separated word.

The tab-delimited output is the (unsorted) input for reducer.py.

Test locally:
    echo "foo foo quux labs foo bar quux" | python mapper.py
"""
import sys


def map_line(line):
    """Return the (word, 1) pairs for one input line.

    Leading/trailing whitespace is stripped and the line is split on any
    whitespace, matching the original loop.
    """
    return [(word, 1) for word in line.strip().split()]


if __name__ == "__main__":
    for raw in sys.stdin:
        for word, count in map_line(raw):
            # The trivial word count is 1; what we print here is the
            # input for the reduce step.
            print("%s\t%s" % (word, count))
For example, cat reducer.py
#!/usr/bin/env python
"""Streaming reducer: sum counts per word from sorted "word<TAB>count" lines.

This only works because Hadoop sorts map output by key before it reaches
the reducer, so equal words arrive on consecutive lines.

Test locally:
    echo "foo foo quux labs foo bar quux" | python mapper.py | sort -k1,1 | python reducer.py
"""
import sys


def reduce_sorted(lines):
    """Yield (word, total) for each run of consecutive equal words.

    Lines whose count field is not an integer are silently skipped,
    as in the original; blank lines are skipped instead of crashing.
    """
    current_word = None
    current_count = 0
    for line in lines:
        line = line.strip()
        if not line:
            # Robustness fix: a blank line used to raise ValueError
            # on the unpack below.
            continue
        word, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            # count was not a number, so silently ignore this line
            continue
        if current_word == word:
            current_count += count
        else:
            # Bug fix: guard on `is not None` rather than truthiness so
            # an empty-string key is not dropped.
            if current_word is not None:
                yield (current_word, current_count)
            current_word = word
            current_count = count
    # Flush the last group. Bug fix: the original's `current_word == word`
    # check printed "None\t0" on completely empty input.
    if current_word is not None:
        yield (current_word, current_count)


if __name__ == "__main__":
    for word, total in reduce_sorted(sys.stdin):
        print('%s\t%s' % (word, total))
No comments:
Post a Comment