0

我正在使用 WSO2 Streaming Integrator Tooling 来测试这个脚本:

@App:name('UntitledETLTaskFlow')
@App:description('Description of the plan')

@source(type='file',file.uri = "file:/C:/Users/admin/Documents/test/productions.csv",mode = "TEXT.FULL",tailing = "false",header.present = "false",
    @map(type='csv'))
define stream input_stream (name string,amount double);

@sink(type = 'log',
    @map(type = 'passThrough'))
define stream LogStream (name string,amount double);

@info(name='query1')
from input_stream
select name,amount 
insert  into LogStream;

日志是:

UntitledETLTaskFlow.siddhi -  Started Successfully!
[2022-02-12_02-27-02_029] INFO {io.siddhi.core.stream.output.sink.LogSink} - UntitledETLTaskFlow : LogStream : [Event{timestamp=1644643622029, data=[Almond cookie, 90.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 95.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 100.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 150.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 155.0], isExpired=false}] 

我的问题是,当我用新数据覆盖 csv 文件时,日志什么也不做。

我该如何解决这个问题?换句话说,如何在每次修改或覆盖时自动轮询整个文件(而不是按行)?

谢谢。

4

0 回答 0