I am trying to build a jar to run on the snappy-job shell with streaming. The aggregation works perfectly over windows, but I also need a table that keeps one value per key. I created the jar file based on the example from GitHub, and now I am stuck on the put into SQL command.
My aggregation code:
val resultStream: SchemaDStream = snsc.registerCQ(
  "select publisher, cast(sum(bid) as int) as bidCount from " +
  "AggrStream window (duration 1 seconds, slide 1 seconds) group by publisher")
val conf = new ConnectionConfBuilder(snsc.snappySession).build()

resultStream.foreachDataFrame(df => {
  df.write.insertInto("windowsAgg")
  println("Data received in streaming window")
  df.show()

  println("Updating table updateTable")
  val conn = ConnectionUtil.getConnection(conf)
  val result = df.collect()
  val stmt = conn.prepareStatement(
    "put into updateTable (publisher, bidCount) values " +
    "(?,?+(nvl((select bidCount from updateTable where publisher = ?),0)))")

  result.foreach(row => {
    println("row" + row)
    val publisher = row.getString(0)
    println("publisher " + publisher)
    val bidCount = row.getInt(1)
    println("bidcount : " + bidCount)

    stmt.setString(1, publisher)
    stmt.setInt(2, bidCount)
    stmt.setString(3, publisher)
    println("Prepared Statement after bind variables set: " + stmt.toString())
    stmt.addBatch()
  })
  stmt.executeBatch()
  conn.close()
})
snsc.start()
snsc.awaitTermination()
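For reference, this is the accumulation I expect the put into statement to perform, simulated in plain Scala (the in-memory map is only a stand-in for updateTable; no SnappyData involved):

```scala
// Simulates the intended semantics of:
//   put into updateTable (publisher, bidCount)
//   values (?, ? + (nvl((select bidCount from updateTable where publisher = ?), 0)))
object PutIntoSim {
  // Upsert: add the incoming bidCount to the stored value, defaulting to 0
  // when the publisher is not yet in the table (the nvl(..., 0) part).
  def putInto(table: Map[String, Int], publisher: String, bidCount: Int): Map[String, Int] = {
    val current = table.getOrElse(publisher, 0)
    table + (publisher -> (bidCount + current))
  }

  def main(args: Array[String]): Unit = {
    var table = Map("publisher333" -> 10)
    table = putInto(table, "publisher333", 11)
    // Expected: 10 + 11 = 21, not an overwrite to 11
    println(table("publisher333")) // prints 21
  }
}
```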
I need to update-or-insert into the table updateTable, and on update the current value must be added to the value coming from the stream. Here is what I see when the code runs:
select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |10
Then I sent a message to Kafka:
1488487984048,publisher333,adv1,web1,geo1,11,c1
and selected from updateTable again:
select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |11
The bidCount value is overwritten instead of added. But when I execute the put into command from the snappy-sql shell, it works perfectly:
put into updateTable (publisher, bidcount) values ('publisher333',4+
(nvl((select bidCount from updateTable where publisher =
'publisher333'),0)));
1 row inserted/updated/deleted
snappy> select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |15
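As a side note, one thing I am not sure about (an assumption, not confirmed): if one micro-batch contains several rows for the same publisher, each put into in the batch might read the same pre-batch bidCount, losing increments. Collapsing duplicate publishers before binding the prepared statement would rule that out; a minimal sketch of that pre-aggregation step (the `collapse` helper is hypothetical, not SnappyData API):

```scala
object PreAggregate {
  // Collapse duplicate publishers in a micro-batch so each key is bound
  // into the prepared statement exactly once, with its bid counts summed.
  def collapse(rows: Seq[(String, Int)]): Map[String, Int] =
    rows.groupBy(_._1).map { case (publisher, group) =>
      publisher -> group.map(_._2).sum
    }

  def main(args: Array[String]): Unit = {
    val batch = Seq(("publisher333", 4), ("publisher333", 7), ("pub2", 1))
    // One entry per publisher: publisher333 -> 11, pub2 -> 1
    println(collapse(batch))
  }
}
```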
Can you help me with this case? Or maybe someone has another solution for inserting or updating values with SnappyData?
Thanks in advance.