11

这是一个用 scala 编写的 spark 流程序。它每 1 秒计算来自套接字的单词数。结果将是字数,例如,从时间 0 到 1 的字数,然后从时间 1 到 2 的字数。但我想知道是否有某种方法可以改变这个程序,以便我们可以累积字数?即从时间 0 到现在的字数。

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))

// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
4

2 回答 2

11

您可以StateDStream为此使用 a 。sparks examples 中有一个有状态字数的例子

object StatefulNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)

      val previousCount = state.getOrElse(0)

      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a Dstream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

它的工作方式是你Seq[T]为每个批次获得一个,然后你更新一个Option[T]它就像一个累加器。它是一个原因Option是因为在第一批它将None保持这种状态,除非它被更新。在此示例中,计数是一个 int,如果您正在处理大量数据,您甚至可能需要一个LongBigInt

于 2014-07-16T03:48:24.473 回答
-1

我有一个非常简单的答案,它只有几行代码。你可以发现这是大多数的火花书籍。请记住,我使用了 localhost 和端口 9999。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
counts = lines.flatMap(lambda line: line.split(" "))\
                     .map(lambda word: (word, 1))\
                     .reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()

并停止你可以使用一个简单的

ssc.stop()

这是一个非常基本的代码,但此代码有助于基本了解 spark 流,更具体地说是 Dstream。

在终端(Mac 终端)类型中向 localhost 提供输入

nc -l 9999

所以它会听你在那之后输入的所有内容,并且这些单词会被计算在内

希望这会有所帮助。

于 2020-08-09T16:23:03.403 回答