
I have a question related to the ngsi2cosmos data flow. Persisting the information received by Orion into the public Cosmos instance works fine, except that the destination folder and file names are both "null".

A simple test goes as follows (a rough sketch of the calls is shown after the list):

  • I create a brand new NGSIEntity and add these headers: Fiware-Service: myservice & Fiware-ServicePath: /my
  • I add a new subscription with Cygnus as the reference endpoint.
  • I send an update to the previously created NGSIEntity.
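
For reference, this is a minimal sketch of those three steps against Orion's NGSIv1 API; the Orion/Cygnus hosts, the entity id/type and the attribute name are placeholders, not necessarily the real ones:

# Sketch of the three test steps against Orion's NGSIv1 API.
# Hosts, entity id/type and attribute names are placeholders.
import json
import requests

ORION = "http://orion-host:1026"    # assumed Orion endpoint
CYGNUS = "http://cygnus-host:5050"  # assumed Cygnus notification endpoint

HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json",
    "Fiware-Service": "myservice",
    "Fiware-ServicePath": "/my",
}

# 1) Create the entity (APPEND through updateContext)
create_payload = {
    "contextElements": [{
        "type": "Room",
        "isPattern": "false",
        "id": "Room.1",
        "attributes": [{"name": "temperature", "type": "float", "value": "23"}],
    }],
    "updateAction": "APPEND",
}
requests.post(ORION + "/v1/updateContext", headers=HEADERS, data=json.dumps(create_payload))

# 2) Subscribe, pointing the reference to Cygnus' /notify endpoint
subscribe_payload = {
    "entities": [{"type": "Room", "isPattern": "false", "id": "Room.1"}],
    "attributes": ["temperature"],
    "reference": CYGNUS + "/notify",
    "duration": "P1M",
    "notifyConditions": [{"type": "ONCHANGE", "condValues": ["temperature"]}],
    "throttling": "PT1S",
}
requests.post(ORION + "/v1/subscribeContext", headers=HEADERS, data=json.dumps(subscribe_payload))

# 3) Send an update so Orion notifies Cygnus
update_payload = {
    "contextElements": [{
        "type": "Room",
        "isPattern": "false",
        "id": "Room.1",
        "attributes": [{"name": "temperature", "type": "float", "value": "24"}],
    }],
    "updateAction": "UPDATE",
}
requests.post(ORION + "/v1/updateContext", headers=HEADERS, data=json.dumps(update_payload))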

When I check my user space in Cosmos, I see that the following path has been created: /user/myuser/myservice/null/null.txt
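
(For reference, the user space can be inspected through the HttpFS REST API on the cosmos_host/cosmos_port configured below; this is only a sketch, and the user.name pseudo-authentication parameter is an assumption about the global instance.)

# Sketch: list the service folder and read the persisted file through HttpFS (WebHDFS REST API).
# The user.name pseudo-authentication parameter is an assumption; adapt it to your Cosmos credentials.
import requests

HTTPFS = "http://130.206.80.46:14000/webhdfs/v1"
USER = "myuser"

# List /user/myuser/myservice -- shows the unexpected "null" folder
r = requests.get(HTTPFS + "/user/" + USER + "/myservice",
                 params={"op": "LISTSTATUS", "user.name": USER})
print(r.json())

# Read the persisted file -- one JSON line per notified update
r = requests.get(HTTPFS + "/user/" + USER + "/myservice/null/null.txt",
                 params={"op": "OPEN", "user.name": USER})
print(r.text)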

The file content is fine; every update from Orion is correctly appended to it. The problem is the folder and file names, which I cannot get to work properly. Shouldn't the entityId and entityType be used to name the folder and the file?

Component versions:

  • Orion version: contextBroker-0.19.0-1.x86_64
  • Cygnus version: cygnus-0.5-91.g3eb100e.x86_64
  • Cosmos: global instance

Cygnus configuration file:

cygnusagent.sources = http-source
cygnusagent.sinks = hdfs-sink 
cygnusagent.channels = hdfs-channel

#=============================================
# source configuration
# channel name where to write the notification events
cygnusagent.sources.http-source.channels = hdfs-channel
# source class, must not be changed
cygnusagent.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
cygnusagent.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed
cygnusagent.sources.http-source.handler = es.tid.fiware.fiwareconnectors.cygnus.handlers.OrionRestHandler
# URL target
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default organization (organization semantic depend on the persistence sink)
cygnusagent.sources.http-source.handler.default_organization = org42
# Number of channel re-injection retries before a Flume event is definitely discarded
cygnusagent.sources.http-source.handler.events_ttl = 10
# Management interface port (FIXME: temporal location for this parameter)
cygnusagent.sources.http-source.handler.management_port = 8081
# Source interceptors, do not change
cygnusagent.sources.http-source.interceptors = ts
# Timestamp interceptor, do not change
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# Destination extractor interceptor, do not change
cygnusagent.sources.http-source.interceptors.de.type = es.tid.fiware.fiwreconnectors.cygnus.interceptors.DestinationExtractor$Builder
# Matching table for the destination extractor interceptor, do not change
cygnusagent.sources.http-source.interceptors.de.matching_table = matching_table.conf

# ============================================
# OrionHDFSSink configuration
# channel name from where to read notification events
cygnusagent.sinks.hdfs-sink.channel = hdfs-channel
# sink class, must not be changed
cygnusagent.sinks.hdfs-sink.type = es.tid.fiware.fiwareconnectors.cygnus.sinks.OrionHDFSSink
# Comma-separated list of FQDN/IP address regarding the Cosmos Namenode endpoints
cygnusagent.sinks.hdfs-sink.cosmos_host = 130.206.80.46
# port of the Cosmos service listening for persistence operations; 14000 for httpfs, 50070 for webhdfs and free choice for inifinty
cygnusagent.sinks.hdfs-sink.cosmos_port = 14000
# default username allowed to write in HDFS
cygnusagent.sinks.hdfs-sink.cosmos_default_username = myuser
# default password for the default username
cygnusagent.sinks.hdfs-sink.cosmos_default_password = mypassword
# HDFS backend type (webhdfs, httpfs or infinity)
cygnusagent.sinks.hdfs-sink.hdfs_api = httpfs
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# prefix for the database and table names, empty if no prefix is desired
cygnusagent.sinks.hdfs-sink.naming_prefix =
# Hive FQDN/IP address of the Hive server
cygnusagent.sinks.hdfs-sink.hive_host = 130.206.80.46
# Hive port for Hive external table provisioning
cygnusagent.sinks.hdfs-sink.hive_port = 10000

#=============================================
# hdfs-channel configuration
# channel type (must not be changed)
cygnusagent.channels.hdfs-channel.type = memory
# capacity of the channel
cygnusagent.channels.hdfs-channel.capacity = 1000
# amount of bytes that can be sent per transaction
cygnusagent.channels.hdfs-channel.transactionCapacity = 100

1 Answer


I think you should configure Cygnus' matching_table to define the path and the file name.

That file sits in the same path as the Cygnus agent conf file.

You can follow this example:

# integer id|comma-separated fields|regex to be applied to the fields concatenation|destination|dataset
#
# The available "dictionary" of fields is:
#  - entityId
#  - entityType
#  - servicePath

1|entityId,entityType|Room\.(\d*)Room|numeric_rooms|rooms
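
As far as I understand the interceptor, it concatenates the listed fields (for the rule above, an entityId Room.12 and an entityType Room give Room.12Room), applies the regex, and on a match uses the destination and dataset columns to build the folder and file names instead of null. A purely hypothetical rule for your own entity (replace the id/type concatenation and the names with your real ones) would look like:

2|entityId,entityType|myentitymytype|mydestination|mydataset
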
answered 2015-07-08T12:25:10.763