5

我需要知道如何通过事件发布者从 Enteprise Integrator 发布统计信息到流处理器。

我在我的 EI 上有以下事件发布者的实现

<?xml version="1.0" encoding="UTF-8"?>
<eventPublisher name="MessageFlowStatisticsPublisher"
  statistics="enable" trace="enable" xmlns="http://wso2.org/carbon/eventpublisher">
  <from streamName="org.wso2.esb.analytics.stream.FlowEntry" version="1.0.0"/>
  <mapping customMapping="disable" type="wso2event"/>
  <to eventAdapterType="wso2event">
    <property name="username">admin</property>
    <property name="protocol">thrift</property>
    <property name="publishingMode">non-blocking</property>
    <property name="publishTimeout">0</property>
    <property name="receiverURL">tcp://xxx:7611</property>
    <property encrypted="true" name="password">xxx</property>
  </to>
</eventPublisher>

在流处理器上,我有简单的 siddhi 应用程序用于接收数据并将它们打印到日志中,如下所示

@App:name("FlowEntryApp")
@App:description("Plan of flow entry")

@source(type='wso2event', @map(type = 'wso2event'))
define stream FlowEntry(compressed bool, tenantId int, messageId string, flowData string);

@sink(type='log', prefix='My flowEntry:')
define stream TestOutputFlowEntry(messageId string, flowData string);

@info(name='FlowEntryOutput')
from FlowEntry
select messageId, flowData
group by messageId
insert into TestOutputFlowEntry;

此外,我已将用于发布统计信息的所有配置设置为我的代理服务的“启用统计信息”和“启用跟踪”。当我调用我的服务时,eventPublisher 将 wso2event 发送到 SP,这工作正常。但在 SP 方面,SP 处理错误“没有 StreamDefinition for streamId org.wso2.esb.analytics.stream.FlowEntry:1.0.0 存在于缓存中”

我知道,这个问题出在 siddhi 应用程序中,我定义流“FlowEntry”而不是“org.wso2.esb.analytics.stream.FlowEntry”,但 siddhi 语言不支持像 '.' 这样的字符。在流名称中。

所以我尝试在 EI 站点上更改流名称,仅将 eventPublisher 中的 streamName 更改为“FlowEntry”,我还更改了 eventstream 文件夹内 json 文件中的 streamName,但现在当我调用我的服务时,EI 不会向 SP 发送任何事件。

有人知道如何将 org.wso2.esb.analytics.stream.FlowEntry 流发布到 SP,然后由 siddhi 处理吗?

4

1 回答 1

4

可以使用源注释中的 wso2.stream.id 元素覆盖流名称。

@source(type='wso2event', wso2.stream.id='org.wso2.esb.analytics.stream.FlowEntry', @map(type = 'wso2event'))<br>
define stream FlowEntry(compressed bool, tenantId int, messageId string, flowData string);

通过使用上述源定义,'FlowEntry' 仍然可以在 Siddhi 应用程序内部使用,而在 thrift 服务器中流 id 将定义为 'org.wso2.esb.analytics.stream.FlowEntry:1.0.0'。

此致。

于 2018-02-07T16:29:20.223 回答