1

我在 Debian slim 上使用 Flink 1.7.2,并以 Kubernetes 作为资源管理器。作业部署后可以正常运行大约一个小时,然后开始失败,并出现以下错误:

java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:596)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:37)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction$$anonfun$process$2.apply(SessionProcessingFunction.scala:33)
    at scala.collection.immutable.Stream.foreach(Stream.scala:594)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:33)
    at com.myorg.bi.web.sessionization.windowing.SessionProcessingFunction.process(SessionProcessingFunction.scala:13)
    at org.apache.flink.streaming.api.scala.function.util.ScalaProcessWindowFunctionWrapper.process(ScalaProcessWindowFunctionWrapper.scala:63)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
    at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
    ... 7 more
Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:536)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:293)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    ... 26 more

我的配置是

KPL:
    rateLimit = 100
    recordTtl = "100000"
    requestTimeout = "120000"
    threadPoolSize = "64"
    connectTimeout = 60000
Taskmanager:
    taskmanager.heap.size: 8000m
    taskmanager.numberOfTaskSlots: 4
    parallelism.default: 8

Kinesis 分片 = 120。

知道可能是什么原因造成的吗?

更新:这是 SessionProcessingFunction 所做的事情:

/**
  * Window function that converts the raw events of one window into [[SessionizedEvent]]s.
  *
  * Two values are kept in state obtained from the runtime context and carried across
  * invocations: the session parameters computed for the last processed event, and the most
  * recent channel for which `shouldPropagate` returned true.
  */
class SessionProcessingFunction extends ProcessWindowFunction[RawEvent, SessionizedEvent, String, TimeWindow] {
  private var previousSessionParamsState: ValueState[SessionParameters] = _
  private var previousChannelState: ValueState[ChannelParameters] = _


  /**
    * Registers the two value states with the runtime context.
    *
    * @param parameters configuration handed in by the framework; not read here
    */
  override def open(parameters: Configuration): Unit = {
    previousSessionParamsState = getRuntimeContext.getState(
      new ValueStateDescriptor[SessionParameters]("previousSessionParams", classOf[SessionParameters]))
    previousChannelState = getRuntimeContext.getState(
      new ValueStateDescriptor[ChannelParameters]("previousChannel", classOf[ChannelParameters]))
  }

  /**
    * Emits one [[SessionizedEvent]] per input event, in ascending `derivedTstamp` order.
    *
    * Session parameters are threaded from event to event, starting from the stored value, and
    * the final value is written back to state once all events have been emitted.
    *
    * @param key      key of the window being evaluated; not read here
    * @param context  supplies the window start and the current processing time for each output
    * @param elements raw events contained in the window
    * @param out      collector receiving the sessionized events
    */
  override def process(key: String, context: Context, elements: Iterable[RawEvent], out: Collector[SessionizedEvent]): Unit = {
    val carriedParams = previousSessionParamsState.value()
    // Read once per invocation: channel updates written by assignChannel while iterating are
    // not re-read, so every event is compared against this same snapshot.
    val channelSnapshot = previousChannelState.value()

    val orderedEvents = elements.toSeq.sortBy(_.derivedTstamp)

    val finalParams = orderedEvents.zipWithIndex.foldLeft(carriedParams) {
      case (paramsSoFar, (event, position)) =>
        // Only the earliest event of the invocation is flagged as "session timed out".
        val isFirstEvent = position == 0
        val nextParams: SessionParameters = SessionParameters(event, isFirstEvent).update(paramsSoFar)
        val enrichedEvent = assignChannel(event, channelSnapshot)
        out.collect(
          SessionizedEvent(enrichedEvent, nextParams.sessionId, context.window.getStart, context.currentProcessingTime))
        nextParams
    }

    previousSessionParamsState.update(finalParams)
  }

  /**
    * Returns the event with its channel propagated when the event's channel says it should
    * propagate relative to `previousChannel`; otherwise returns the event unchanged.
    *
    * When propagation happens, the event's channel is also stored as the new previous channel.
    *
    * @param event           event to enrich
    * @param previousChannel channel the event's own channel is compared against
    */
  private def assignChannel(event: RawEvent, previousChannel: ChannelParameters) = {
    val candidate: ChannelParameters = ChannelParameters(event.channel.orNull, event)

    if (candidate.shouldPropagate(previousChannel)) {
      val propagated: RawEvent = candidate.propagate(event)
      previousChannelState.update(candidate)
      propagated
    } else {
      event
    }
  }

}
4

0 回答 0