0

这是一个后续问题

Dstream 上的 Pyspark 过滤器操作

为了计算一天、一小时内收到了多少错误消息/警告消息 - 一个人是如何设计这项工作的。

我试过的:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


    def counts():
            counter += 1
            print(counter.value)

    if __name__ == "__main__":

            if len(sys.argv) != 3:
                    print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
            exit(-1)


            sc = SparkContext(appName="PythonStreamingNetworkWordCount")
            ssc = StreamingContext(sc, 5)
            counter = sc.accumulator(0)

            lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
            errors = lines.filter(lambda l: "error" in l.lower())
            errors.foreachRDD(lambda e : e.foreach(counts))
            errors.pprint()

            ssc.start()
            ssc.awaitTermination()

然而,这有多个问题,首先打印不起作用(不输出到标准输出,我已经阅读过它,我可以在这里使用的最好的东西是日志记录)。我可以将该函数的输出保存到一个文本文件并改为尾部该文件吗?

我不确定为什么程序刚刚出现,没有任何错误/转储可以进一步研究(spark 1.6.2)

如何保存状态?我正在尝试按服务器和严重性聚合日志,另一个用例是通过查找某些关键字来计算处理了多少事务

我想尝试的伪代码:

foreachRDD(Dstream):
     if RDD.contains("keyword1 | keyword2 | keyword3"):
     dictionary[keyword] = dictionary.get(keyword,0) + 1 //add the keyword if not present and increase the counter
     print dictionary //or send this dictionary to else where

发送或打印字典的最后一部分需要切换出火花流上下文 - 有人可以解释一下这个概念吗?

4

1 回答 1

0

打印不起作用

我建议阅读 Spark 文档的设计模式部分。我认为您想要的大致是这样的:

def _process(iter):
    for item in iter:
        print item

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.foreachRDD(lambda e : e.foreachPartition(_process))

这将使您的呼叫print开始工作(尽管值得注意的是 print 语句将在工作人员而不是驱动程序上执行,因此如果您在集群上运行此代码,您将只能在工作人员日志中看到它)。

但是,它不会解决您的第二个问题:

如何保存状态?

为此,请查看updateStateByKey相关示例

于 2017-02-13T22:13:56.047 回答