我是 WSO2 Siddhi 的新手,我希望你能帮助我。我一直在尝试修复它,但我不能。
我正在尝试“通过监控面板监控流处理器”,并查看图表,为此我将我的 siddhi 代码保存在:
C:\...\wso2\server\deployment\siddhi-files 和执行 WSO2\wso2si-4.0.0\bin> 时。\ 服务器
我正在运行 siddhi 规则,但出现以下错误:
错误 {io.siddhi.core.stream.StreamJunction} -在消耗流“OutputShareSomeKnowledgeStream”中的事件后“SubmitSolutionPRUEBA”出错,MarkIn 连续调用而没有在 io.siddhi.SiddhiApps.SubmitSolutionPRUEBA.Siddhi.Tables.reward_basicTable.updateOrInsert 中调用 markOut。延迟。因此,删除事件 'StreamEvent{ timestamp=1626790380632, beforeWindowData=null, onAfterWind], outputData=[Forum, 2017-May-10 09:51:48, 0f2f5191-5515-402d-bba2-73aa280285d3, Forum1, 4, 12] , type=CURRENT, next=null}' java.lang.IllegalStateException: MarkIn 在 io.siddhi.SiddhiApps.SubmitSolutionPRUEBA.Siddhi.Tables.reward_basicTable.updateOrInsert.latency 中没有调用 markOut 就连续调用
at org.wso2.carbon.si.metrics.core.LatencyMetric.markIn(LatencyMetric.java:62)
at io.siddhi.core.table.Table.updateOrAddEvents(Table.java:498)
at io.siddhi.core.query.output.callback.UpdateOrInsertTableCallback.send(UpdateOrInsertTableCallback.java:78)
at io.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:104)
at io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:44)
at io.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:182)
at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:84)
at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:115)
at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:179)
at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:496)
at io.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
at io.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:104)
at io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:44)
at io.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
at io.siddhi.core.query.processor.filter.FilterProcessor.process(FilterProcessor.java:58)
at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:182)
at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:84)
at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:145)
at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:227)
at io.siddhi.core.stream.StreamJunction.access$000(StreamJunction.java:64)
at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:514)
at io.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:39)
at io.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:55)
at io.siddhi.core.stream.input.InputHandler.send(InputHandler.java:91)
at io.siddhi.core.stream.input.source.PassThroughSourceHandler.sendEvents(PassThroughSourceHandler.java:40)
at io.siddhi.core.stream.input.source.InputEventHandler.sendEvents(InputEventHandler.java:111)
at io.siddhi.extension.map.text.sourcemapper.TextSourceMapper.onEventHandler(TextSourceMapper.java:307)
at io.siddhi.extension.map.text.sourcemapper.TextSourceMapper.mapAndProcess(TextSourceMapper.java:254)
at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:200)
at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:144)
at io.siddhi.extension.io.file.processors.FileProcessor.receive(FileProcessor.java:176)
at org.wso2.transport.file.connector.server.FileConsumer$EventListener.fileUpdated(FileConsumer.java:328)
at org.wso2.transport.file.connector.server.FileConsumer$EventListener.access$100(FileConsumer.java:302)
at org.wso2.transport.file.connector.server.FileConsumer.readLines(FileConsumer.java:272)
at org.wso2.transport.file.connector.server.FileConsumer.processFile(FileConsumer.java:231)
at org.wso2.transport.file.connector.server.FileConsumer.consume(FileConsumer.java:146)
at org.wso2.transport.file.connector.server.FileServerConnector.poll(FileServerConnector.java:73)
at org.wso2.carbon.connector.framework.server.polling.PollingJob.execute(PollingJob.java:51)
at org.quartz.core.JobRunShell.run(JobRunShell.java:202)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573)
我在 WSO2 Siddhi 中的代码是:
- 输入数据
@app:statistics(reporter = 'jdbc')
define stream InputDataStream (activityType string, weekdayTS string, monthTS string, dayTS string, yearTS string, hourTS string, cestTS string, studentId string, activityTypeId string, resto string);
- 出口
define stream OutputShareSomeKnowledgeStream (activityType string, timeStamp string, studentId string, activityTypeId string, totalTopics int, totalAnswers int);
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/PruebaDB_1?&serverTimezone=UTC&autoReconnect=true&useSSL=false", username="root", password="root", jdbc.driver.name="com.mysql.cj.jdbc.Driver")
@primaryKey('studentId', 'activityTypeId')
define table OutputShareSomeKnowledgeTable (activityType string, timeStamp string, studentId string, activityTypeId string, totalTopics int, totalAnswers int);
- 桌子
@store(type='rdbms', jdbc.url="jdbc:mysql://localhost:3306/MEdit4CEPDB_1?&serverTimezone=UTC&autoReconnect=true&useSSL=false", username="root", password="root", jdbc.driver.name="com.mysql.cj.jdbc.Driver")
@primaryKey('playerId','activityTypeId')
define table reward_basicTable (rewardName string, description string, count bool, type string, complexEventName string, timeStamp string, activityTypeId string, playerId string, value int);
-- 3. 查询 --
@info(name='ShareSomeKnowledge')
from InputDataStream[activityType == 'Forum' and
(convert(str:split(resto, ',', 0), 'int') + convert(str:split(resto, '([\s\,]+)', 1),'int')) >= 5] -- if totalTopics + totalAnswers
select activityType,
str:concat(yearTS, '-', monthTS, '-', dayTS, ' ', hourTS) as timeStamp,
studentId,
activityTypeId,
convert(str:split(resto, ',', 0), 'int') as totalTopics,
convert(str:split(resto, '([\s\,]+)', 1), 'int') as totalAnswers
insert into OutputShareSomeKnowledgeStream;
@info(name='ShareSomeKnowledge_Basic') -- Guardo puntuacion
from OutputShareSomeKnowledgeStream
select 'Point' as rewardName,
'' as description,
true as count,
'FixAction' as type,
'ShareSomeKnowledge' as complexEventName,
timeStamp,
activityTypeId,
studentId as playerId,
10 as value
update or insert into reward_basicTable
on reward_basicTable.playerId==playerId and reward_basicTable.activityTypeId==activityTypeId;
-- I keep in table BD
from OutputShareSomeKnowledgeStream
select *
update or insert into OutputShareSomeKnowledgeTable
on OutputShareSomeKnowledgeTable.studentId==studentId and OutputShareSomeKnowledgeTable.activityTypeId==activityTypeId;
我的输入文件包含数据:
Forum,Sun May 07 06:07:37 CEST 2017,0f2f5191-5515-402d-bba2-73aa280285d3,Forum1,3,0
Forum,Mon May 08 04:59:25 CEST 2017,0f2f5191-5515-402d-bba2-73aa280285d3,Forum1,3,1
Forum,Wed May 10 03:14:10 CEST 2017,0f2f5191-5515-402d-bba2-73aa280285d3,Forum1,4,1
Forum,Wed May 10 03:14:10 CEST 2017,0f2f5191-5515-402d-bba2-73aa280285d3,Forum1,4,2
Forum,Wed May 10 03:14:10 CEST 2017,0f2f5191-5515-402d-bba2-73aa280285d3,Forum1,4,3
Forum,Wed May 10 09:51:48 CEST 2017,0f2f5191-5515-402d-bba2-73aa280285d3,Forum1,4,12
该程序在测试时运行良好,但在部署时执行它以获得延迟图,已用内存......监控,文件的第一行输入数据将其保存在表格中,但其余部分没有,它给出了上面暴露的错误,我没有得到图表。
我该如何解决这个错误?我需要查看跟踪图表。谢谢