Friday, November 11, 2016

Read JSON in Pig

-- Pig script: load JSON records, project a fixed set of keys from each
-- record, and keep only the rows whose country is 'us'. The aggregation
-- pipelines further down are all commented out.
--
-- Modify to your path
-- Jars registered below: the three Elephant Bird jars (core, pig,
-- hadoop-compat), json-simple, and piggybank.
REGISTER /home/*/pig/elephant-bird-core-4.6rc4.jar;
REGISTER /home/*/pig/elephant-bird-pig-4.6rc4.jar;
REGISTER /home/*/pig/elephant-bird-hadoop-compat-4.6rc4.jar;
REGISTER /home/*/share/pig/lib/json-simple-1.1.jar;
REGISTER /home/*/share/pig/lib/piggybank.jar;

-- Short alias for piggybank's UnixToISO function. No statement in this
-- script calls it.
DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();

-- Load everything under /20140905/ with Elephant Bird's JsonLoader.
-- NOTE(review): '-nestedLoad' presumably makes nested JSON objects load as
-- nested maps -- confirm against the Elephant Bird documentation.
data = LOAD '/20140905/*' USING com.twitter.elephantbird.pig.load.JsonLoader('-nestedLoad');
-- Earlier two-column smoke test (project two keys, dump 10 rows), disabled.
--basefeed = FOREACH data GENERATE $0#'email_type', $0#'identity';
--data1 = limit basefeed 10;
--dump data1;

-- Project eight keys out of the first field ($0) of each record using the
-- map-lookup operator (#), giving every column the same name as its key.
basefeed = FOREACH data GENERATE
$0#'email_type' as email_type,
$0#'identity' as identity,
$0#'domain_name' as domain_name,
$0#'domain_category' as domain_category,
$0#'purchase_category' as purchase_category,
$0#'country' as country,
$0#'email_id' as email_id,
$0#'ts' as ts;

-- Keep only the rows whose country value equals 'us'.
basefeed1 = filter basefeed by (country == 'us');
-- NOTE(review): the disabled STORE targets below use 20140903 while the
-- input path above is /20140905/* -- confirm which date is intended before
-- re-enabling any of them.
--STORE basefeed1 INTO '20140903';


-- Disabled: total row count of basefeed1.
--by_basefeed1 = group basefeed1 all;
--by_basefeed2 = FOREACH by_basefeed1 GENERATE group, COUNT($1) as count;
--STORE by_basefeed2 INTO '20140903/by_basefeed2';


-- Disabled: number of distinct email_id values in basefeed1.
--0 count distinct email_id;
--by_email = FOREACH basefeed1 generate email_id;
--by_email1 = distinct by_email;
--by_email2 = group by_email1 all;
--by_email3 = FOREACH by_email2 GENERATE group, COUNT($1) as user_count;
--STORE by_email3 INTO '20140903/by_email3';


-- Disabled: number of distinct identity values in basefeed1.
--1 count distinct identity;
---by_identity = FOREACH basefeed1 generate identity;
---by_identity1 = distinct by_identity;
---by_identity2 = group by_identity1 all;
---by_identity3 = FOREACH by_identity2 GENERATE group, COUNT($1) as user_count;
---STORE by_identity3 INTO '20140903/by_identity';

import json
#path=os.path.abspath()
#file = os.path.join(path, 'output.txt')
#@outputSchema("record:bag{t:(cat2:chararray)}")
#def parse_list(line):
# NOTE(review): the two commented-out lines above are kept from the original
# scratch script; presumably this code was a first draft of a Pig UDF --
# confirm before enabling them.
def collect_mail_types(line1):
    """Build the ``{"mailtype": [...]}`` dictionary for one JSON-encoded line.

    The line is expected to decode to a JSON object whose values are
    themselves objects. The keys of every inner object are appended, in
    iteration order, to the "mailtype" list. Each step is echoed to stdout
    exactly as the original script did.

    Args:
        line1: One line of text holding a JSON document.

    Returns:
        The ``{"mailtype": [...]}`` dictionary. Its list is empty when the
        line is blank, decodes to JSON ``null``, or holds no inner objects.

    Raises:
        ValueError: If a non-blank line is not valid JSON.
    """
    category_dictionary = {"mailtype": []}
    if not line1.strip():
        # A blank line (e.g. a trailing newline at end of file) is not JSON.
        return category_dictionary
    line = json.loads(line1)
    if line is None:
        print('null')
        # Stop here: the original fell through and crashed on None.keys().
        return category_dictionary
    for element in line:
        print(element)
        items = line[element]
        print(items)
        # Only JSON objects have keys to collect; other values are skipped.
        if isinstance(items, dict):
            for item in items:
                category_dictionary["mailtype"].append(item)
                print(category_dictionary)
        print('\n')
    return category_dictionary


if __name__ == "__main__":
    # The with-block closes the input file even if a line fails to parse.
    with open("long example.txt", 'r') as f:
        for line1 in f:
            print(line1)
            collect_mail_types(line1)

import json
f = open("long example.txt", 'r')

Python function
#@outputSchema("record:bag{t:(cat2:chararray)}")
# NOTE(review): the decorator above is left commented out, as in the original;
# presumably Pig supplies it when this file is registered as a Jython UDF --
# confirm before enabling it.
def parse_list(line):
    """Collect every 'itemShipped_name' value found in one JSON-encoded line.

    The line is expected to decode to a JSON object whose values are objects.
    Each value inside those inner objects is inspected: when it is an object
    holding 'itemShipped_name', that name is collected; when it is a list,
    the same is done for every object in the list.

    Args:
        line: One line of text holding a JSON document.

    Returns:
        A list of the collected names, in encounter order. The list is empty
        when the line is blank, decodes to JSON ``null``, or holds no match.

    Raises:
        ValueError: If a non-blank line is not valid JSON.
    """
    outbag = []
    if not line.strip():
        # A blank line (e.g. a trailing newline at end of file) is not JSON.
        return outbag
    record = json.loads(line)
    if record is None:
        print('null')
        # Stop here: the original fell through and crashed on None.keys().
        return outbag
    for element in record:
        print(element)
        items = record[element]
        if not isinstance(items, dict):
            # Only JSON objects have keys to walk.
            continue
        print(list(items.keys()))
        for item in items:
            item2 = items[item]
            # Treat a single object like a one-element list so both shapes
            # share the same lookup below.
            if isinstance(item2, dict):
                candidates = [item2]
            elif isinstance(item2, list):
                candidates = item2
            else:
                continue
            for candidate in candidates:
                # List elements may be plain strings or numbers; skip those.
                if isinstance(candidate, dict) and 'itemShipped_name' in candidate:
                    outbag.append(candidate['itemShipped_name'])
    return outbag


if __name__ == "__main__":
    # `f` is the file handle opened just above this block; close it once all
    # lines are consumed, even if a line fails to parse.
    try:
        for line1 in f:
            print(line1)
            print(parse_list(line1))
    finally:
        f.close()

No comments:

Post a Comment

Blog Archive