0

我有一个文本格式的推文流(TwitterStream)和每条推文的情感流(SentimentStream)。SentimentStream 订阅 TwitterStream,进行情绪分析并发布一条带有结果和 TwitterStream 序列号的新消息。

我正在尝试加入这两个流,其中 SentimentStream.seq 等于推文的序列号。我遇到的问题是我无法从 TwitterStream 获得序列号的“句柄”。

我一直在尝试找到一种方法来获取事件“元数据”,这可能会对事件的位置/序列号提供一些见解。

@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')

@sink(type = 'log', 
    @map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);

@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);

@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster", 
    @map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);

-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;

任何帮助将不胜感激。

4

1 回答 1

1

我们通过https://github.com/siddhi-io/siddhi-io-nats/issues/25创建了一个功能改进请求来提供这种支持。

于 2019-10-28T12:02:28.727 回答