这是一个后续问题
为了计算一天、一小时内收到了多少错误消息/警告消息 - 一个人是如何设计这项工作的。
我试过的:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def counts():
counter += 1
print(counter.value)
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 5)
counter = sc.accumulator(0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.foreachRDD(lambda e : e.foreach(counts))
errors.pprint()
ssc.start()
ssc.awaitTermination()
然而,这有多个问题,首先打印不起作用(不输出到标准输出,我已经阅读过它,我可以在这里使用的最好的东西是日志记录)。我可以将该函数的输出保存到一个文本文件并改为尾部该文件吗?
我不确定为什么程序刚刚出现,没有任何错误/转储可以进一步研究(spark 1.6.2)
如何保存状态?我正在尝试按服务器和严重性聚合日志,另一个用例是通过查找某些关键字来计算处理了多少事务
我想尝试的伪代码:
foreachRDD(Dstream):
if RDD.contains("keyword1 | keyword2 | keyword3"):
dictionary[keyword] = dictionary.get(keyword,0) + 1 //add the keyword if not present and increase the counter
print dictionary //or send this dictionary to else where
发送或打印字典的最后一部分需要切换出火花流上下文 - 有人可以解释一下这个概念吗?