我一直在尝试扩展网络字数,以便能够根据某些关键字过滤行
我正在使用火花 1.6.2
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR")
counts.pprint()
ssc.start()
ssc.awaitTermination()
我已经尝试了所有的变化,
我几乎总是收到错误,我无法应用类似的功能
在 TransformedDStream 上 pprint/show/take/collect
. 我在 Dstream 行上使用了带有 foreachRDD 的转换,并带有一个函数来检查使用本机 python 字符串方法,这也失败了(实际上,如果我在程序中的任何地方使用 print,spark-submit 就会出现 - 没有报告错误。
我想要的是能够过滤传入的 Dstreams 关键字如“错误”| “警告”等并将其输出到标准输出或标准错误。