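I am trying to build a jar to run with the snappy-job shell with streaming. I have an aggregation, and it works perfectly over windows. But I need a table that holds a single value per key. I built the jar file based on an example from GitHub, and now I have a problem with the put into SQL command.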

My aggregation code:

val resultStream: SchemaDStream = snsc.registerCQ("select publisher, cast(sum(bid)as int) as bidCount from " +
  "AggrStream window (duration 1 seconds, slide 1 seconds) group by publisher")

val conf = new ConnectionConfBuilder(snsc.snappySession).build()

resultStream.foreachDataFrame(df => {
  // Append the raw window aggregate to the history table.
  df.write.insertInto("windowsAgg")

  println("Data received in streaming window")
  df.show()

  println("Updating table updateTable")
  val conn = ConnectionUtil.getConnection(conf)
  val result = df.collect()

  // Intended as an upsert that adds the new bidCount to the stored one.
  val stmt = conn.prepareStatement("put into updateTable (publisher, bidCount) values  " +
    "(?,?+(nvl((select bidCount from updateTable where publisher = ?),0)))")

  result.foreach(row => {
    println("row" + row)
    val publisher = row.getString(0)
    println("publisher " + publisher)
    val bidCount = row.getInt(1)
    println("bidcount : " + bidCount)

    stmt.setString(1, publisher)
    stmt.setInt(2, bidCount)
    stmt.setString(3, publisher)

    println("Prepared Statement after bind variables set: " + stmt.toString())

    stmt.addBatch()
  })
  stmt.executeBatch()
  conn.close()
})

snsc.start()
snsc.awaitTermination()
}

I need to update or insert into the table updateTable, but on update the current value must be added to the value coming from the stream.
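To make the intended behavior concrete, here is a minimal sketch of the "insert, or add to the existing value" semantics, modeled with a plain in-memory Map. It does not use any SnappyData API; the names UpsertAddSketch and upsertAdd are hypothetical and only serve as illustration.

```scala
// Hypothetical in-memory model of the desired upsert semantics.
// This is NOT SnappyData code; it only shows the expected result.
object UpsertAddSketch {
  // Merge one batch of (publisher, bidCount) rows into the table state:
  // a new key is inserted, an existing key gets the batch value added.
  def upsertAdd(table: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] =
    batch.foldLeft(table) { case (acc, (publisher, bid)) =>
      acc.updated(publisher, acc.getOrElse(publisher, 0) + bid)
    }

  def main(args: Array[String]): Unit = {
    val before = Map("publisher333" -> 10)
    val after = upsertAdd(before, Seq(("publisher333", 11), ("publisher444", 5)))
    println(after("publisher333")) // 21
    println(after("publisher444")) // 5
  }
}
```

In other words, with 10 already stored for publisher333 and a window result of 11, the row should end up with 21.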

Here is what I see when I execute the code:

select * from updateTable;
PUBLISHER                       |BIDCOUNT   
--------------------------------------------
publisher333                    |10  

Then I sent a message to Kafka:

1488487984048,publisher333,adv1,web1,geo1,11,c1 

And selected from updateTable again:

select * from updateTable;
PUBLISHER                       |BIDCOUNT   
--------------------------------------------
publisher333                    |11  

The bidCount value is overwritten instead of being added to. But when I execute the put into command from the snappy-sql shell, it works perfectly:

put into updateTable (publisher, bidcount) values ('publisher333',4+
(nvl((select bidCount from updateTable where publisher = 
'publisher333'),0)));
1 row inserted/updated/deleted
snappy> select * from updateTable;
PUBLISHER                       |BIDCOUNT   
--------------------------------------------
publisher333                    |15   

Can you help me with this case? Or maybe someone has another solution for inserting or updating a value with SnappyData?

Thank you in advance.

1 Answer

In the streaming case, the bidCount value is read from the tomi_update table, but in the snappy-sql case it is read from updateTable. Is this intentional? Perhaps you meant to use updateTable in both cases?

Answered 2017-10-24T15:48:36.087