假设我定义了一些流,例如:
Dataset<Row> allMessages = df.select(.....)
Dataset<Row> messagesOfType1 = df.select() //some unique conditions
Dataset<Row> messagesOfType2 = df.select() //some other unique conditions
以及一些处理逻辑,例如:
StreamingQuery firstQuery = messagesOfType1
.writeStream()
.foreach(new CustomForEachWiriter(ConfigurationForType1())) // class that extends ForeachWriter[T] and save data into external RDBMS table
.start();
StreamingQuery secondQuery = messagesOfType2
.writeStream()
.foreach(new CustomForEachWiriter(ConfigurationForType2())) // class that extends ForeachWriter[T] and save data into external RDBMS table (may be even another database than before)
.start();
正如我上面提到的,CustomForEachWriter
负责消息处理和存储到 RDBMS 中。在最简单的方法中,我可以在CustomForEachWriter
.
但我并不是很喜欢这种方法,所以我想知道如何引入连接池。通常是 Connection 并且DataSource
不实现Serializable
接口,所以我不能简单地传递从驱动程序上的连接池中检索到的 Connection(TaskNotSerialized
将引发异常)。它还看起来当新数据到达时CustomEachWriter
创建了新对象。
您能否向我解释在这种情况下结构化流媒体的正确方法是什么?
谢谢
亲切的问候