1

我将 Pyspark 1.2.1 与 Hive 一起使用。(升级不会立即发生)。

我遇到的问题是,当我从 Hive 表中选择并添加索引时,pyspark 将 long 值更改为 ints,因此我最终得到了一个临时表,其中包含 Long 类型的列,但值类型为 Integer。(见下面的代码)。

我的问题是:我怎样才能(a)在不将 long 更改为 int 的情况下执行索引的合并(参见代码);或 (b) 以其他方式添加索引以避免该问题;或(c)随机化表列而不需要加入?

我要解决的根本问题是我想随机化配置单元表中某些列的顺序,并将其写入新表。这是为了使数据不再可识别个人身份。我这样做是通过向原始表和随机列添加递增索引,然后加入该索引来实现的。

该表如下所示:

primary | longcolumn | randomisecolumn

代码是:

hc = HiveContext(sc)
orig = hc.sql('select * from mytable')
widx = orig.zipWithIndex().map(merge_index_on_row)
sql_context.applySchema(widx, add_index_schema(orig.schema()))
        .registerTempTable('sani_first')

# At this point sani_first has a column longcolumn with type long,
# but (many of) the values are ints

def merge_index_on_row((row, idx), idx_name=INDEX_COL):
    """
    Row is a SchemaRDD row object; idx is an integer;
    schema is the schema for row with an added index col at the end
    returns a version of row applying schema and holding the index in the new row
    """
    as_dict = row.asDict()
    as_dict[idx_name] = idx
    return Row(**as_dict)

def add_index_schema(schema):
    """
    Take a schema, add a column for an index, return the new schema
    """
    return StructType(sorted(schema.fields + [StructField(INDEX_COL, IntegerType(), False)],key=lambda x:x.name))

在没有更好的解决方案的情况下,我将在 python 代码中强制受影响的列长类型。这……不太好。

4

0 回答 0