2

我已经使用 GraphDSL.create() 配置了一个 RunnableGraph。我还指定了一个 ClosedShape 并连接了所有出口/入口。当我尝试执行程序时,出现以下运行时异常:

requirement failed: The inlets [] and outlets [] must correspond to the inlets [filter.in] and outlets [out]

知道我没有正确连接入口和出口的地方吗?

这是图形代码:

val g = RunnableGraph.fromGraph(GraphDSL.create() {
  implicit builder =>
    import GraphDSL.Implicits._

    // Source
    val A: Outlet[String] = builder.add(Source.fromIterator(() => flightDelayLines)).out

    // Flows
    val B: FlowShape[String, FlightEvent] = builder.add(csvToFlightEvent)
    val C: FlowShape[FlightEvent, DelayRecord] = builder.add(flightEventToDelayRecord)
    val D: UniformFanOutShape[DelayRecord, DelayRecord] = builder.add(Broadcast[DelayRecord](2))
    val F: FlowShape[DelayRecord, (Int, Int)] = builder.add(countByCarrier)

    // Sinks
    val E: Inlet[Any] = builder.add(Sink.ignore).in
    val G: Inlet[Any] = builder.add(Sink.ignore).in


    // Graph
    A ~> B ~> flightEventToDelayRecord ~> D ~> E
                                          D ~> F ~> G

    ClosedShape
}).run()
4

1 回答 1

1

我解决了我自己的问题。这是一个非常简单的疏忽。C我没有使用我添加到构建器中的那个,而是使用flightEventToDelayRecord图中的函数。解决方案是改为C在图中使用。

// Graph
A ~> B ~> C ~> D ~> E
               D ~> F ~> G

这让我意识到将大图分解成小图是多么重要。运行时异常不会查明根本原因(例如,“C 未使用”),因此如果使用较小的图形,调试这些运行时异常可能会更容易。希望这可以帮助其他陷入困境的人。

于 2016-02-29T01:51:01.880 回答