
I have the following issue: I need to find all combinations of the values in column B for each id in column A and return the result as a DataFrame.

For example, given the input DataFrame below:

        A     B       
0       5    10       
1       1    20      
2       1    15       
3       3    50       
4       5    14       
5       1    30       
6       1    15       
7       3    33       

I need to get the following output DataFrame (it is for GraphX/GraphFrames):

        src dist      A
0       10   14       5
1       50   33       3
2       20   15       1
3       30   15       1
4       20   30       1

The only solution I have come up with so far is:

df_result = df.drop_duplicates().\
               map(lambda (A,B):(A,[B])).\
               reduceByKey(lambda p, q: p + q).\
               map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))

print df_result.take(3)

output: [(1, [(20,15),(30,20),(30,15)]),(5,[(10,14)]),(3,[(50,33)])]

And here I'm stuck :( How do I turn this back into the DataFrame that I need? One idea was to use parallelize:

import spark_sc

edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))

spark_sc is a separate file of mine, spark_sc.py:

def init():
    global sc
    global sqlContext

    sc = SparkContext(conf=conf,
                  appName="blablabla",
                  pyFiles=['my_file_with_code.py'])

    sqlContext = SQLContext(sc)

but my code fails with:

AttributeError: 'module' object has no attribute 'sc'

If I use spark_sc.sc outside of map(), it works.

Any idea what I am missing in the last step? Is it possible to use parallelize() here at all, or do I need a completely different solution? Thanks!


1 Answer


You definitely need another solution, which could be as simple as:

from pyspark.sql.functions import greatest, least, col

df.alias("x").join(df.alias("y"), ["A"]).select(
    least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()

where:

df.alias("x").join(df.alias("y"), ["A"])

joins table with itself by A,

least("x.B", "y.B").alias("src")

and

greatest("x.B", "y.B").alias("dst")

choose the value with the lower id as the source and the one with the higher id as the destination. Finally:

where(col("src") != col("dst"))

drops self loops.
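To see what each step of the join contributes, here is a plain-Python sketch of the same logic run on the sample data from the question. It is only a model for checking the expected result on a small input, not Spark code; the function name edges_by_self_join is made up for illustration.

```python
# Plain-Python model of the self-join above (no Spark involved).
# Rows are (A, B) tuples copied from the question's input DataFrame.
rows = [(5, 10), (1, 20), (1, 15), (3, 50), (5, 14), (1, 30), (1, 15), (3, 33)]


def edges_by_self_join(rows):
    """Hypothetical helper: mimics join on A, least/greatest, where, distinct."""
    edges = set()                              # the set plays the role of distinct()
    for a_x, b_x in rows:                      # df.alias("x")
        for a_y, b_y in rows:                  # df.alias("y")
            if a_x != a_y:                     # join condition: same A
                continue
            src, dst = min(b_x, b_y), max(b_x, b_y)   # least(...) / greatest(...)
            if src != dst:                     # where(src != dst): drop self loops
                edges.add((src, dst, a_x))
    return sorted(edges)


edges = edges_by_self_join(rows)
for edge in edges:
    print(edge)
```

This yields the same five edges as the output requested in the question, except that each pair is ordered with the smaller B value first, for example (15, 20, 1) rather than (20, 15, 1).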

In general, it is not possible to use SparkContext inside an action or a transformation, because the context exists only on the driver and not on the workers that run your lambdas (not that it would make any sense to do this in your case).
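If you would rather keep the RDD approach from the question, a possible alternative to the nested parallelize() is to flatten the pairs with flatMap, which needs no SparkContext on the workers. The sketch below is an assumption on my side, not tested against your setup: edges_for_key is a hypothetical helper, and the Spark pipeline is shown only as a comment that assumes df has columns A and B.

```python
import itertools


def edges_for_key(kv):
    """Turn one (A, iterable of B values) pair into a list of (src, dst, A) rows."""
    a, b_values = kv
    unique = sorted(set(b_values))             # dedupe, like drop_duplicates()
    return [(src, dst, a) for src, dst in itertools.combinations(unique, 2)]


# Assumed Spark usage (not executed here; requires an active SparkSession/SQLContext):
# edges = (df.rdd
#            .map(lambda row: (row.A, row.B))
#            .groupByKey()
#            .flatMap(edges_for_key)           # one output row per pair
#            .toDF(["src", "dst", "A"]))

# Local check of the helper on the values for A == 1 from the question.
print(edges_for_key((1, [20, 15, 30, 15])))
```

Because flatMap emits one row per pair, the result is already a flat RDD of tuples that toDF can convert, so there is nothing to parallelize inside the transformation.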

answered 2016-10-19T10:27:16.030