我正在尝试将 cygnus 与 CKAN 一起使用,但是当属性为 JSON 类型时,CKAN 中没有保留任何数据。首先,我向 Orion 发送信息:
Accept: application/json
X-AUTH-TOKEN: <mytoken>
Fiware-Service: PapelClubDemo
Fiware-ServicePath: /events/leonliterario
{
"contextElements": [
{
"type": "Events",
"isPattern": "false",
"id": "thisweek",
"attributes": [
{
"name": "schedule",
"type": "json",
"value": [{"title": "Presentación Viva Mi Gente","date": "2015-11-30","location": "Salón de actos del ICAL","url": "http://www.papel.club"}]
}
]
}
],
"updateAction": "APPEND"
}
我在 Cygnus 中订阅了这个实体,这是我在 Cygnus 日志中收到的信息:
01 Dec 2015 19:05:13,701 INFO [891993589@qtp-1988714671-0] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:232) - Received data ({"subscriptionId" : "565dd3497b72b7c7092d5a29", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "type" : "Events","isPattern" : "false", "id" : "thisweek004", "attributes" : [ { "name" : "schedule", "type" : "json", "value" : [ { "title" : "Presentación Viva MiGente", "date" : "2015-11-30", "location" : "Salón de actos del ICAL", "url" : "http://www.papel.club" }, { "title" : "Presentación Viva Mi Gente2","date" : "2015-11-30", "location" : "Salón de actos del ICAL", "url" : "http://www.papel.club" } ] } ] }, "statusCode" : { "code" : "200","reasonPhrase" : "OK" } } ]}) 01 Dec 2015 19:05:13,702 INFO [891993589@qtp-1988714671-0](com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:255) - Event put in the channel (id=2134043204, ttl=10) 01 Dec 2015 19:05:16,842 INFO[SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionCKANSink.persistOne:207) - [ckan-sink] Persisting data atOrionCKANSink (orgName=papelclubdemo, pkgName=papelclubdemo_events_leonliterario, resName=thisweek004_events,data=1448989513702,2015-12-01T17:05:13.702Z,thisweek004,Events,schedule,json,[{"title":"Presentación Viva MiGente","date":"2015-11-30","location":"Salón de actos del ICAL","url":"http://www.papel.club"},{"title":"Presentación Viva MiGente2","date":"2015-11-30","location":"Salón de actos del ICAL","url":"http://www.papel.club"}],[])
01 Dec 2015 19:05:19,479 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:224) - Runtime error (Don't know how to treat response code 503)
01 Dec 2015 19:05:19,480 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:233) - Finishing transaction (1448984542-601-0000000018)
这是我的天鹅座代理配置:
# Flume handler that will parse the notifications, must not be changed
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
# URL target
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default service (service semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service = def_serv
# Default service path (service path semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service_path = def_servpath
# Number of channel re-injection retries before a Flume event is definitely discarded (-1 means infinite retries)
cygnusagent.sources.http-source.handler.events_ttl = 10
# Source interceptors, do not change
cygnusagent.sources.http-source.interceptors = ts gi
# TimestampInterceptor, do not change
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# GroupinInterceptor, do not change
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
# Grouping rules for the GroupingInterceptor, put the right absolute path to the file if necessary
# See the doc/design/interceptors document for more details
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf
# ============================================
# OrionCKANSink configuration
# channel name from where to read notification events
cygnusagent.sinks.ckan-sink.channel = ckan-channel
# sink class, must not be changed
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
# the CKAN API key to use
#cygnusagent.sinks.ckan-sink.api_key = <mykey>
# the FQDN/IP address for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_host = demo.ckan.org
# the port for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_port = 80
# Orion URL used to compose the resource URL with the convenience operation URL to query it
cygnusagent.sinks.ckan-sink.orion_url = http://127.0.0.1:1026
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.ckan-sink.attr_persistence = row
# enable SSL for secure Http transportation; 'true' or 'false'
cygnusagent.sinks.ckan-sink.ssl = false
当 cygnus 将数据持久化到 demo.ckan.org 时,会正确创建组织、数据集和资源,但不会加载数据。