0

我还没有找到太多关于将udfs与包一起使用的教程。

假设我有以下数据集:

UID : distance_from_something : timestamp
100:100:0
100:101:1
100:102:2
200:200:0
200:202:3
200:204:6
300:300:0
300:303:5

现在我想计算每个 UID 的速度

data = LOAD 'testfile' USING PigStorage(':') AS (
    uid:long,
    distance:int,
    time_raw:long);

SPLIT data INTO
    good_data IF (
        (uid > 0L)),
    bad_data OTHERWISE;

REGISTER '$UDFPATH//calculateVelocity.py' USING jython AS vcalc;

grouped_data = GROUP good_data BY (long)$0;
data = FOREACH grouped_data GENERATE vcalc.calculate(good_data);
flat_data = FOREACH data GENERATE FLATTEN($0);

这是做这种事情的好方法吗,例如,如果我希望输出看起来像:

100:100:0:1
100:101:1:1
100:102:2:1
200:200:0:0.666...
200:202:3:0.666...
200:204:6:0.666...
300:300:0:0.6
300:303:5:0.6

在这种情况下,使用非线性插值计算速度的最佳方法是什么?

这是我当前的占位符:

def compared_to_previous(bag, index):
    dx = float(bag[index][1] - bag[index - 1][1])
    dt = float(bag[index][-1] - bag[index - 1][-1])/1000
    return dx/dt

def compared_to_next(bag, index):
    return compared_to_previous(bag, index+1)

def calculate(inBag):
    outBag = []

    index = 0
    tuples = len(inBag)
    for t in inBag:
        row = list(t)
        if not index:
            row.append(compared_to_next(inBag, index))
        elif index == tuples - 1:
            row.append(compared_to_previous(inBag, index))
        else:
            v = compared_to_previous(inBag, index)
            v += compared_to_next(inBag, index)
            row.append(v/2)
        outBag.append(tuple(row))

    return outBag
4

1 回答 1

1

我将由你来实际实现速度的计算——首先,不清楚你将如何处理可变速度,而且这种实现不是猪的问题。但是将这些数据放入 UDF 非常容易。

您不想传递good_data给 UDF——它指的是关系,而不是字段。您需要将每个 UID 的所有记录收集在一起,然后将该集合传递给知道如何处理它们的 UDF:

data =
    FOREACH (GROUP good_data BY uid)
    GENERATE
        group,
        FLATTEN(vcalc.calculate(good_data.(distance, time_raw)));

UDF 的输入是一组成对的形式(距离、time_raw),输出应该是一袋形式的三元组(距离、time_raw、速度)。

于 2013-09-24T14:18:50.930 回答