
I get a lot of value out of node.js and love the stream processing model. I mostly use it for stream processing jobs such as data enrichment and ETL.

For enrichment, I might have a record like this...

{ "ip":"123.45.789.01", "productId": 12345 }

I want to enrich it by adding the product details:

{ "ip":"123.45.789.01", "productId": 12345, "description" : "Coca-Cola 12Pk", "price":4.00 }

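The description data and the price data each come from a different stream. What is the best way to handle this kind of dependency in Highland?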

H = require('highland')
fs = require('fs')

descriptionStream = H(['[{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}]'])
  .flatMap(JSON.parse)

priceStream = H(['[{"productId":1,"price":4.00},{"productId":2,"price":1.25}]'])
  .flatMap(JSON.parse)

#  the file is a 10G file with a json record on each line
activityStream = H(fs.createReadStream('8-11-all.json',{flags:'r',encoding:'utf8'}))
  .splitBy("\n")
  .take(100000) # just take 100k for testing
  .filter((line)-> line.trim().length > 0) # to prevent barfing on empty lines
  .doto((v)->
    # here i want to add the description from the descriptionStream
    # and i want to add the price from the price stream.
    # in order to do that, i need to make the execution of this
    # stream dependent on the completion of the first two and
    # availability of that data.  this is easy with declarative
    # programming but less intuitive with functional programming
  )
  .toArray((results)->
    # dump my results here
  )

Any ideas?


2 Answers


If you are using highland.js, you can use .map and supply a function that modifies each item.

For example:

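var _ = require('highland');

// Wrap the record in a stream and add a field to each item as it passes through.
var stream = _([{ "ip":"123.45.789.01", "productId": 12345 }]).map(function (x) {
   x.productName = 'Coca-Cola 12 Pack';
   return x;
});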
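
The function handed to .map usually has to look the product up rather than hard-code it. Here is a minimal sketch of such a mapper, assuming the product details are already in memory as an object keyed by productId. The names `catalog` and `makeEnricher` are made up for illustration, and `Array.prototype.map` stands in for the stream's .map so the snippet runs without any dependencies.

```javascript
// Hypothetical lookup table keyed by productId. In the question this data
// would come from the description and price streams.
const catalog = {
  12345: { description: 'Coca-Cola 12Pk', price: 4.0 },
};

// Returns a mapper that copies the record and adds the matching catalog fields.
// Records with an unknown productId are copied through without extra fields.
function makeEnricher(lookup) {
  return function enrich(record) {
    const details = lookup[record.productId];
    return details ? { ...record, ...details } : { ...record };
  };
}

const enrich = makeEnricher(catalog);

// Array.prototype.map stands in for the stream's .map here.
const enriched = [
  { ip: '123.45.789.01', productId: 12345 },
  { ip: '123.45.789.02', productId: 999 },
].map(enrich);

console.log(JSON.stringify(enriched));
```

Returning a copy instead of mutating the input keeps the mapper safe to reuse if the same record objects are read elsewhere.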
answered 2015-08-19T21:10:02.010

Here's a stab at it. Is this the right approach?

H = require('highland')

# these values would come from some api/file
descriptionStream = H([{"productId":1,"description":"Coca-Cola 12Pk"},{"productId":2,"description":"Coca-Cola 20oz Bottle"}])
  .reduce({}, (memo,v)->
    memo[v.productId] = v;
    return memo
  )

# these values would come from some api/file
priceStream = H([{"productId":1,"price":4.00},{"productId":2,"price":1.25}])
  .reduce({}, (memo,v)->
    memo[v.productId] = v;
    return memo
  )

H([descriptionStream, priceStream])
  .series()
  .toArray((dependencies)->
    [descriptionIndex, priceIndex] = dependencies

    # these values would come from an api/file
    H([{productId:1},{productId:2}])
      .doto((v)-> v.description = descriptionIndex[v.productId].description)
      .doto((v)-> v.price = priceIndex[v.productId].price)
      .each((v)->
        console.log(JSON.stringify(v))
      )
  )

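This gives me the correct results, but I'm not sure whether it is an elegant way to handle stream dependencies. I also assume that if you needed the price or description stream more than once, you would fork them.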

{"productId":1,"description":"Coca-Cola 12Pk","price":4}
{"productId":2,"description":"Coca-Cola 20oz Bottle","price":1.25}
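
One possible variation, sketched in plain JavaScript rather than Highland: fold both reference data sets into a single index so the enrichment step does one lookup per record and does not throw on a productId that has no entry. `buildProductIndex` and `enrichActivity` are hypothetical helper names, and the arrays stand in for the fully collected reference streams.

```javascript
// Sample reference records copied from the answer above.
const descriptions = [
  { productId: 1, description: 'Coca-Cola 12Pk' },
  { productId: 2, description: 'Coca-Cola 20oz Bottle' },
];
const prices = [
  { productId: 1, price: 4.0 },
  { productId: 2, price: 1.25 },
];

// Hypothetical helper: merge any number of record lists into one Map
// keyed by productId, combining the fields of records that share a key.
function buildProductIndex(...sources) {
  const index = new Map();
  for (const records of sources) {
    for (const record of records) {
      const existing = index.get(record.productId) || {};
      index.set(record.productId, { ...existing, ...record });
    }
  }
  return index;
}

const productIndex = buildProductIndex(descriptions, prices);

// Copies the activity record and adds whatever the index knows about it.
// An unknown productId yields the record unchanged instead of a TypeError.
function enrichActivity(activity) {
  return { ...activity, ...(productIndex.get(activity.productId) || {}) };
}

const results = [{ productId: 1 }, { productId: 2 }, { productId: 3 }].map(enrichActivity);
results.forEach((r) => console.log(JSON.stringify(r)));
```

With the two doto steps in the answer, a record whose productId is missing from either index would fail on the property access, so the fallback to an empty object is the main behavioral difference here.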
answered 2015-08-20T15:57:20.740