0

看似简单的问题,却找不到答案。

问题:我创建了一个函数,我将把它传递给 map(),它接受一个字段并从中创建三个字段。我希望 map() 的输出给我一个新的 RDD,包括来自输入 RDD 和新/输出 RDD 的字段。我该怎么做呢?

我是否需要将我的数据的键添加到函数的输出中,以便我可以将更多的输出 RDD 加入到我的原始 RDD 中?这是正确/最佳做法吗?

def extract_fund_code_from_iv_id(holding):
    # Must include key of data for later joining
    iv_id = Row(iv_id_fund_code=holding.iv_id[:2], iv_id_last_code=holding.iv_id[-2:])
    return iv_id

更基本的是,我似乎无法将两个 Row 组合起来。

row1 = Row(name="joe", age="35")
row2 = Row(state="MA")
print row1, row2

这不会像我想要的那样返回 new Row() 。

谢谢

4

1 回答 1

7

我真的建议使用UserDefinedFunction.

假设您想从DataFrameint_col类型的列中提取许多特征。假设这些功能只是所述列内容。intdfmodulo 3modulo 2

我们将导入UserDefinedFunction函数的数据类型。

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

然后我们将实现我们的特征提取功能:

def modulo_three(col):
    return int(col) % 3

def modulo_two(col):
    return int(col) % 2

并将它们变成udfs:

mod3 = udf(modulo_three, IntegerType())
mod2 = udf(modulo_two, IntegerType())

现在我们将计算所有额外的列并给它们起好听的名字(通过alias):

new_columns = [
    mod3(df['int_col']).alias('mod3'),
    mod2(df['int_col']).alias('mod2'),
]

最后,我们选择这些列以及之前已经存在的所有列:

new_df = df.select(*df.columns+new_columns)

new_df现在将有两个额外的列mod3mod2.

于 2015-04-15T15:36:53.867 回答