4

假设我定义了一些流,例如:

Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions

以及一些处理逻辑,例如:

StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWiriter(ConfigurationForType1())) // class that extends ForeachWriter[T] and save data into external RDBMS table
.start();

StreamingQuery secondQuery = messagesOfType2
.writeStream()
.foreach(new CustomForEachWiriter(ConfigurationForType2())) // class that extends ForeachWriter[T] and save data into external RDBMS table (may be even another database than before)
.start();

正如我上面提到的,CustomForEachWriter负责消息处理和存储到 RDBMS 中。在最简单的方法中,我可以在CustomForEachWriter.

但我并不是很喜欢这种方法,所以我想知道如何引入连接池。通常是 Connection 并且DataSource不实现Serializable接口,所以我不能简单地传递从驱动程序上的连接池中检索到的 Connection(TaskNotSerialized将引发异常)。它还看起来当新数据到达时CustomEachWriter创建了新对象。

您能否向我解释在这种情况下结构化流媒体的正确方法是什么?

谢谢

亲切的问候

4

0 回答 0