我正在尝试将 Cygnus 集成到 CartoDB,但是当 Cygnus 收到 Orion 通知时,它不会将信息存储在 CartoDB 上。
Cygnus 的调试日志如下:
time=2016-12-19T14:37:13.657Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=intercept | msg=com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor[127] : [gi] Event put in the channel, id=1724500127
time=2016-12-19T14:37:13.658Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=debug | msg=org.mortbay.log.Slf4jLog[40] : RESPONSE /notify 200
time=2016-12-19T14:37:13.659Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=debug | msg=org.mortbay.log.Slf4jLog[40] : EOF
time=2016-12-19T14:37:15.165Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[509] : Batch completed, persisting it
time=2016-12-19T14:37:15.166Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=persistBatch | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[333] : [cartodb-sink] Processing sub-batch regarding the default_/_waste1_wastectr destination
time=2016-12-19T14:37:15.166Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=aggregate | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator[508] : [cartodb-sink] Processing context element (id=waste1, type=wastectr)
time=2016-12-19T14:37:15.166Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=aggregate | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator[530] : [cartodb-sink] Processing context attribute (name=category, type=StructuredValue)
time=2016-12-19T14:37:15.167Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=aggregate | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator[530] : [cartodb-sink] Processing context attribute (name=status, type=Text)
time=2016-12-19T14:37:15.171Z | lvl=DEBUG | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[523] : [java.util.ArrayList.rangeCheck(Unknown Source), java.util.ArrayList.get(Unknown Source), com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator.getRows(NGSICartoDBSink.java:410), com.telefonica.iot.cygnus.sinks.NGSICartoDBSink.persistRawAggregation(NGSICartoDBSink.java:552), com.telefonica.iot.cygnus.sinks.NGSICartoDBSink.persistBatch(NGSICartoDBSink.java:362), com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:510), com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:327), org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68), org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147), java.lang.Thread.run(Unknown Source)]
time=2016-12-19T14:37:15.171Z | lvl=WARN | corr=68c76dfc-c5f8-11e6-9346-fa163e00324f | trans=e2827b21-972d-4692-b25a-e1b252961491 | srv=default | subsrv=/ | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[541] : Index: 0, Size: 0
time=2016-12-19T14:37:16.090Z | lvl=DEBUG | corr= | trans= | srv= | subsrv= | comp=cygnus-ngsi | op=run | msg=org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable[126] : Checking file:/usr/cygnus/conf/agent_ngsi_1.conf for changes
agent_ngsi_1.conf 的配置如下:
接下来的三个字段设置 Cygnus 使用的源、接收器和通道
cygnus-ngsi.sinks = cartodb-sink
cygnus-ngsi.channels = cartodb-channel
源配置
# channel name where to write the notification events
#cygnus-ngsi.sources.http-source.channels = hdfs-channel mysql-channel ckan-channel mongo-channel sth-channel kafka-channel dynamodb-channel postgresql-channel
cygnus-ngsi.sources.http-source.channels = cartodb-channel
# source class, must not be changed
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
# listening port the Flume source will use for receiving incoming notifications
cygnus-ngsi.sources.http-source.port = 5050
# Flume handler that will parse the notifications, must not be changed
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
# URL target
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
# default service (service semantic depends on the persistence sink)
cygnus-ngsi.sources.http-source.handler.default_service = default
# default service path (service path semantic depends on the persistence sink)
cygnus-ngsi.sources.http-source.handler.default_service_path = /
# source interceptors, do not change
cygnus-ngsi.sources.http-source.interceptors = ts gi
# TimestampInterceptor, do not change
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
# GroupingInterceptor, do not change
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
# Grouping rules for the GroupingInterceptor, put the right absolute path to the file if necessary
# see the doc/design/interceptors document for more details
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
NGSICartoDBSink 配置
# sink class, must not be changed
cygnus-ngsi.sinks.cartodb-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
# channel name from where to read notification events
cygnus-ngsi.sinks.cartodb-sink.channel = cartodb-channel
# true if the grouping feature is enabled for this sink, false otherwise
cygnus-ngsi.sinks.cartodb-sink.enable_grouping = false
# true if name mappings are enabled for this sink, false otherwise
cygnus-ngsi.sinks.cartodb-sink.enable_name_mappings = false
# true if lower case is to be forced in all the element names, false otherwise
cygnus-ngsi.sinks.cartodb-sink.enable_lowercase = false
# select the data_model: dm-by-service-path or dm-by-entity
cygnus-ngsi.sinks.cartodb-sink.data_model = dm-by-entity
# absolute path to the CartoDB file containing the mapping between FIWARE service/CartoDB usernames and CartoDB API Keys
cygnus-ngsi.sinks.cartodb-sink.keys_conf_file = /usr/cygnus/conf/cartodb_keys.conf
# if true the latitude and longitude values are exchanged, false otherwise
#cygnus-ngsi.sinks.cartodb-sink.swap_coordinates = true
# if true, a raw based storage is done, false otherwise
cygnus-ngsi.sinks.cartodb-sink.enable_raw = true
# if true, a distance based storage is done, false otherwise
cygnus-ngsi.sinks.cartodb-sink.enable_distance = false
# number of notifications to be included within a processing batch
#cygnus-ngsi.sinks.cartodb-sink.batch_size = 100
# timeout for batch accumulation
#cygnus-ngsi.sinks.cartodb-sink.batch_timeout = 30
# number of retries upon persistence error
#cygnus-ngsi.sinks.cartodb-sink.batch_ttl = 10
# maximum number of connections allowed for the Http-based CartoDB backend
#cygnus-ngsi.sinks.cartodb-sink.backend.max_conns = 500
# maximum number of connections per route allowed for the Http-based CartoDB backend
#cygnus-ngsi.sinks.cartodb-sink.backend.max_conns_per_route = 100