python - PySpark intersection by key and cogroup performance
I'm new to Spark and have the following problem:
In a previous job I created two large files in which each line holds a pair of ids and a corresponding value, e.g.
(id1, id2, value)
The first thing I do is read the files from HDFS and map them to tuples like:
((id1, id2), value)
def line_to_tuple(line):
    x = eval(line)
    return ((x[0], x[1]), x[2])

rdd0 = sc.textFile('hdfs:///user/xxx/file1').map(lambda x: line_to_tuple(x))
rdd1 = sc.textFile('hdfs:///user/xxx/file2').map(lambda x: line_to_tuple(x))
Q1: Is there a more efficient way to do this? Can I store the data in a way such that I don't have to eval each line every time I read the files in?
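One possible direction, sketched here as an assumption rather than as the poster's solution: parse with ast.literal_eval (which only accepts literals) instead of eval, and persist the parsed tuples once in a binary format such as pickle files so that later jobs can reload them without re-parsing text. The paths below are hypothetical.

import ast

def line_to_tuple(line):
    # literal_eval parses the "(id1, id2, value)" literal without executing arbitrary code
    x = ast.literal_eval(line)
    return ((x[0], x[1]), x[2])

rdd0 = sc.textFile('hdfs:///user/xxx/file1').map(line_to_tuple)

# Write the parsed tuples once, then reload them in later jobs without
# any per-line text parsing (path is hypothetical):
rdd0.saveAsPickleFile('hdfs:///user/xxx/file1_parsed')
rdd0_reloaded = sc.pickleFile('hdfs:///user/xxx/file1_parsed')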
In the next step I use cogroup, filter out the empty results and finally write the results to HDFS:

rdd0.cogroup(rdd1) \
    .filter(lambda x: x[1][0] and x[1][1]) \
    .map(lambda x: (x[0], list(x[1][0])[0], list(x[1][1])[0])) \
    .saveAsTextFile('hdfs:///user/xxx/cogrouped_values')
The result of the job looks like this:
((id1, id2), value_file1, value_file2)
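A minimal sketch of an alternative, under the assumption that every (id1, id2) key occurs at most once per file: keeping only the keys present in both files is an inner join, which join() expresses directly instead of cogroup plus filter (the output path is hypothetical):

# Equivalent formulation, assuming unique keys per file:
joined = (rdd0.join(rdd1)                                # ((id1, id2), (value_file1, value_file2))
              .map(lambda x: (x[0], x[1][0], x[1][1])))  # ((id1, id2), value_file1, value_file2)
joined.saveAsTextFile('hdfs:///user/xxx/joined_values')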
Q2: The code works, but it eats up a lot of resources, and I've been asking myself whether there is a way to improve it further given the following assumptions:
- the files are sorted by id1 and id2, and id1 is smaller than id2
- rdd0 << rdd1 (one of the files is much smaller than the other; see the sketch after this list)
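Under the stated assumption that rdd0 is much smaller than rdd1, one hedged sketch (an illustration, not the poster's code) is a map-side join against a broadcast lookup table, which avoids shuffling the large RDD. This only helps if rdd0 actually fits in driver and executor memory; the output path and helper name are hypothetical.

# Collect the small RDD into a dict and broadcast it to every executor.
small_map = sc.broadcast(dict(rdd0.collect()))

def match_with_small(kv):
    key, value_file2 = kv
    value_file1 = small_map.value.get(key)
    if value_file1 is not None:
        # emit the same shape as the cogroup-based job
        yield (key, value_file1, value_file2)

rdd1.flatMap(match_with_small) \
    .saveAsTextFile('hdfs:///user/xxx/broadcast_joined_values')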