
I have three incoming streams from Kafka. I parse the streams, which are received as JSON, extract them into appropriate case classes, and form DStreams of the following schema:

case class Class1(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String)

case class Class2(crt_object_id: String,
                  hangup_cause: String)

case class Class3(crt_object_id: String,
                  text: String)

I want to join these three DStreams on their common column, i.e. crt_object_id. The desired DStream should be of the following form:

case class Merged(incident_id: String,
                  crt_object_id: String,
                  source: String,
                  order_number: String,
                  hangup_cause: String,
                  text: String)

Please tell me a way to do this. I am new to both Spark and Scala.


1 Answer


The Spark Streaming documentation gives you the signature of the join method:

join(otherStream, [numTasks])

When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
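To make that contract concrete, here is a minimal, self-contained sketch using queueStream as a toy source; the names left and right, the app name, and the sample data are invented for this illustration and are not part of the original code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import scala.collection.mutable

val conf = new SparkConf().setMaster("local[2]").setAppName("join-demo")
val ssc  = new StreamingContext(conf, Seconds(1))

// Two toy keyed streams that share the key "id-1".
val left: DStream[(String, String)] = ssc.queueStream(
  mutable.Queue(ssc.sparkContext.parallelize(Seq("id-1" -> "foo"))))
val right: DStream[(String, Int)] = ssc.queueStream(
  mutable.Queue(ssc.sparkContext.parallelize(Seq("id-1" -> 42))))

// Each batch yields (K, (V, W)) pairs: here ("id-1", ("foo", 42)).
val joined: DStream[(String, (String, Int))] = left.join(right)
joined.print()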

Note that you need DStreams of key-value pairs rather than DStreams of case classes. Therefore, you have to extract the field you want to join on from each case class, join the streams, and pack the elements of the resulting stream back into the appropriate case class.

import org.apache.spark.streaming.dstream.DStream

case class Class1(incident_id: String, crt_object_id: String,
                  source: String, order_number: String)
case class Class2(crt_object_id: String, hangup_cause: String)
case class Class3(crt_object_id: String, text: String)
case class Merged(incident_id: String, crt_object_id: String,
                  source: String, order_number: String,
                  hangup_cause: String, text: String)

// The three parsed streams from Kafka (construction elided in the question).
val stream1: DStream[Class1] = ...
val stream2: DStream[Class2] = ...
val stream3: DStream[Class3] = ...

// Key each stream by the join column, crt_object_id.
val transformedStream1: DStream[(String, Class1)] = stream1.map {
    c1 => (c1.crt_object_id, c1)
}
val transformedStream2: DStream[(String, Class2)] = stream2.map {
    c2 => (c2.crt_object_id, c2)
}
val transformedStream3: DStream[(String, Class3)] = stream3.map {
    c3 => (c3.crt_object_id, c3)
}

// Two successive joins; note the nested tuple structure of the result.
val joined: DStream[(String, ((Class1, Class2), Class3))] =
    transformedStream1.join(transformedStream2).join(transformedStream3)

// Unpack the nested tuples and rebuild a flat Merged record.
val merged: DStream[Merged] = joined.map {
    case (crt_object_id, ((c1, c2), c3)) =>
        Merged(c1.incident_id, crt_object_id, c1.source,
               c1.order_number, c2.hangup_cause, c3.text)
}
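One caveat: join is an inner join, matching only keys that appear in the corresponding batch of every stream, so a crt_object_id missing from any one stream produces no Merged record for that batch. If you would rather keep every Class1 record, a sketch using leftOuterJoin could look like the following; the empty-string defaults are an assumption for this example, not something from the question:

// Keep every Class1 record, filling in defaults when the matching
// Class2/Class3 record is absent from the batch.
val joinedLeft: DStream[(String, ((Class1, Option[Class2]), Option[Class3]))] =
    transformedStream1.leftOuterJoin(transformedStream2)
                      .leftOuterJoin(transformedStream3)

val mergedLeft: DStream[Merged] = joinedLeft.map {
    case (crt_object_id, ((c1, c2Opt), c3Opt)) =>
        Merged(c1.incident_id, crt_object_id, c1.source, c1.order_number,
               c2Opt.map(_.hangup_cause).getOrElse(""),
               c3Opt.map(_.text).getOrElse(""))
}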