我有两个来源,一个是 Kafka 来源,一个是自定义来源,我需要制作一个睡眠自定义来源一小时,但我遇到了中断。
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.hulu.hiveIngestion.HiveAddPartitionThread.run(HiveAddPartitionThread.java:48)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
代码:
<kafka_Source>.union(<custom_source>)
public class custom_source implements SourceFunction<String> {
public void run(SourceContext<String> ctx) {
while(true)
{
Thread.sleep(1000);
ctx.collect("string");
}
}
}
如何在 Kafka 源将继续其流时制作睡眠自定义源。为什么我得到线程中断异常?