我有一个文本格式的推文流(TwitterStream)和每条推文的情感流(SentimentStream)。SentimentStream 订阅 TwitterStream,进行情绪分析并发布一条带有结果和 TwitterStream 序列号的新消息。
我正在尝试加入这两个流,其中 SentimentStream.seq 等于推文的序列号。我遇到的问题是我无法从 TwitterStream 获得序列号的“句柄”。
我一直在尝试找到一种方法来获取事件“元数据”,这可能会对事件的位置/序列号提供一些见解。
@App:name('SentimentJoin')
@App:description('Joins the RAW Tweets with the sentient for that tweet')
@sink(type = 'log',
@map(type = 'json'))
define stream AggregateStream (tweet string, sentiment string);
@source(type = 'nats', destination = "tweet-sentiment", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'json'))
define stream SentimentStream (twitter_handle string, lib string, seq int, value string, confidence double);
@source(type = 'nats', destination = "iPhone", bootstrap.servers = "nats://0.0.0.0:4222", cluster.id = "test-cluster",
@map(type = 'text', fail.on.missing.attribute = 'true', regex.A='(.|\n)*', @attributes(tweet = 'A')))
define stream TwitterStream (tweet string);
-- https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Joins
@info(name = 'JoinOnSequenceNumber')
from every S=SentimentStream, T=TwitterStream(S.seq)
select T.tweet as tweet, S.value as sentiment
insert into AggregateStream;
任何帮助将不胜感激。