1

我们正在使用 Aerospike 运行一个小 POC,以了解我们是否可以运行 LUA 脚本来做一些事情。

在这种情况下,我们使用了航班示例: https ://github.com/aerospike/flights-analytics

我在飞行时间上创建了一个新索引,以便按它进行搜索。

该脚本运行所有记录并查找航班的最后到达时间。为简单起见,我们只插入了飞往布法罗的航班。

    local function aggregatCityToMax(result, record)

      city = string.upper(record['DEST_CITY_NAME'])
      flightTime = record['ARR_TIME']


    if result[city] == nil then

           info("CITY: |%s|      |        DATE: %d        |        MAX: null" , city, flightTime)
           result[city] = flightTime

    else

            info("CITY: |%s|      |        DATE: %d        |        MAX: %d" , city, flightTime, 
        result[city])

         if result[city] < flightTime then
           info("new MAX %s", flightTime)
           result[city] = flightTime
         end
    end

   return result

end

local function reduce_values(a, b)
   return map.merge(a, b, mergeFunction)
end


local function mergeFunction(a, b)

   info("merging:  %s VS %s ", a, b)

   if a < b then
       return b
   end

   return a
end

function mapMax(stream)
 return stream :  aggregate(map(), aggregatCityToMax) : reduce(reduce_values)
end

日志显示奇怪的结果: 1. 我没有得到最大值。2.好像每10条记录,最大值被重置为null。

日志:

城市: |水牛城| | 日期:1253 | 最大:空城市:|BUFFALO| | 日期:1221 | 最大:1253 城市:|布法罗| | 日期:1600 | 最大:1253 城市:|布法罗| | 日期:1203 | 最大:1600 城市:|水牛城| | 日期:1424 | 最大:1600 城市:|水牛城| | 日期:2141 | 最大:1600 城市:|水牛城| | 日期:1821 | 最大:2141 城市:|布法罗| | 日期:1221 | 最大:2141 城市:|布法罗| | 日期:1424 | 最大:2141 城市:|布法罗| | 日期:1550 | 最大:2141 城市:|布法罗| | 日期:1703 | 最大:

城市: |水牛城| | 日期:2312 | MAX:1703 城市:|布法罗| | 日期:2251 | 最大:2312 城市:|布法罗| | 日期:19 | 最大:2312 城市:|布法罗| | 日期:1030 | 最大:2312 城市:|布法罗| | 日期:1257 | 最大:2312 城市:|布法罗| | 日期:803 | 最大:2312 城市:|布法罗| | 日期:19 | 最大:2312 城市:|布法罗| | 日期:1502 | 最大:2312 城市:|布法罗| | 日期:2319 | 最大:2312 城市:|布法罗| | 日期:1735 | 最大: 城市: |水牛城| | 日期:1221 | 最大:1735 城市:|布法罗| | 日期:1258 | 最大:1735 城市:|布法罗| | 日期:2125 | 最大:1735 城市:|布法罗| | 日期:2251 | 最大:2125 城市:|布法罗| | 日期:1104 | 最大:2251 城市:|布法罗| | 日期:2053 | 最大:2251 城市:|布法罗| | 日期:1340 | 最大:2251 城市:|布法罗| | 日期:2312 | 最大:2251 城市:|布法罗| | 日期:2226 | 最大:2312 城市:|布法罗| | 日期:2053 | 最大: 城市: |水牛城| | 日期:1637 | 最大:2053 城市:|布法罗| | 日期:1030 | 最大:2053 城市:|布法罗| | 日期:1618 | 最大:2053 城市:|布法罗| | 日期:1510 | 最大:2053 城市:|布法罗| | 日期:1510 | 最大:2053 城市:|布法罗| | 日期:2346 | 最大:2053 城市:|布法罗| | 日期:2343 | 最大:2346 城市:|布法罗| | 日期:1600 | 最大:2346 城市:|布法罗| | 日期:1550 | 最大:2346 城市:|布法罗| | 日期:1949 | 最大: 城市: |水牛城| | 日期:1104 | MAX:1949 城市:|布法罗| | 日期:2045 | MAX:1949 城市:|布法罗| | 日期:2213 | 最大:2045

我做错什么了吗?我错过了什么吗?

谢谢,

伊多布

4

1 回答 1

2

Aerospike 的聚合本质上更像是流式传输。即,它会不断推出部分结果,以免出现停滞。在客户端发生的 reduce 将完成合并所有部分结果的最终工作。与 hadoop map-reduce 相比,这是一个不同的模型,其中 reduce/final 将等待所有本地 reduce 完全完成,然后再启动自身。Aerospike 的流模型有一个优点。

您在聚合函数中有一个打印语句。一旦部分结果被推出,种子图在下一批工作时将开始为空。你的逻辑没有错。最后的结果应该没问题。您是否看到最终结果有任何问题?

于 2015-05-29T07:22:55.690 回答