2

我正在尝试将 cygnus 与 CKAN 一起使用,但是当属性为 JSON 类型时,CKAN 中没有保留任何数据。首先,我向 Orion 发送信息:

Accept: application/json
X-AUTH-TOKEN: <mytoken>
Fiware-Service: PapelClubDemo
Fiware-ServicePath: /events/leonliterario
{
    "contextElements": [
        {
            "type": "Events",
            "isPattern": "false",
            "id": "thisweek",
            "attributes": [
                {
                    "name": "schedule",
                    "type": "json",
                    "value": [{"title": "Presentación Viva Mi Gente","date": "2015-11-30","location": "Salón de actos del ICAL","url": "http://www.papel.club"}]
                }
            ]
        }
    ],
    "updateAction": "APPEND"
}

我在 Cygnus 中订阅了这个实体,这是我在 Cygnus 日志中收到的信息:

01 Dec 2015 19:05:13,701 INFO [891993589@qtp-1988714671-0] (com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:232) - Received data ({"subscriptionId" : "565dd3497b72b7c7092d5a29", "originator" : "localhost", "contextResponses" : [ { "contextElement" : { "type" : "Events","isPattern" : "false", "id" : "thisweek004", "attributes" : [ { "name" : "schedule", "type" : "json", "value" : [ { "title" : "Presentación Viva MiGente", "date" : "2015-11-30", "location" : "Salón de actos del ICAL", "url" : "http://www.papel.club" }, { "title" : "Presentación Viva Mi Gente2","date" : "2015-11-30", "location" : "Salón de actos del ICAL", "url" : "http://www.papel.club" } ] } ] }, "statusCode" : { "code" : "200","reasonPhrase" : "OK" } } ]}) 01 Dec 2015 19:05:13,702 INFO [891993589@qtp-1988714671-0](com.telefonica.iot.cygnus.handlers.OrionRestHandler.getEvents:255) - Event put in the channel (id=2134043204, ttl=10) 01 Dec 2015 19:05:16,842 INFO[SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionCKANSink.persistOne:207) - [ckan-sink] Persisting data atOrionCKANSink (orgName=papelclubdemo, pkgName=papelclubdemo_events_leonliterario, resName=thisweek004_events,data=1448989513702,2015-12-01T17:05:13.702Z,thisweek004,Events,schedule,json,[{"title":"Presentación Viva MiGente","date":"2015-11-30","location":"Salón de actos del ICAL","url":"http://www.papel.club"},{"title":"Presentación Viva MiGente2","date":"2015-11-30","location":"Salón de actos del ICAL","url":"http://www.papel.club"}],[])
01 Dec 2015 19:05:19,479 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:224) - Runtime error (Don't know how to treat response code 503)
01 Dec 2015 19:05:19,480 INFO [SinkRunner-PollingRunner-DefaultSinkProcessor] (com.telefonica.iot.cygnus.sinks.OrionSink.process:233) - Finishing transaction (1448984542-601-0000000018)

这是我的天鹅座代理配置:

# Flume handler that will parse the notifications, must not be changed
cygnusagent.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.OrionRestHandler
# URL target
cygnusagent.sources.http-source.handler.notification_target = /notify
# Default service (service semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service = def_serv
# Default service path (service path semantic depends on the persistence sink)
cygnusagent.sources.http-source.handler.default_service_path = def_servpath
# Number of channel re-injection retries before a Flume event is definitely discarded (-1 means infinite retries)
cygnusagent.sources.http-source.handler.events_ttl = 10
# Source interceptors, do not change
cygnusagent.sources.http-source.interceptors = ts gi
# TimestampInterceptor, do not change
cygnusagent.sources.http-source.interceptors.ts.type = timestamp
# GroupinInterceptor, do not change
cygnusagent.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.GroupingInterceptor$Builder
# Grouping rules for the GroupingInterceptor, put the right absolute path to the file if necessary
# See the doc/design/interceptors document for more details
cygnusagent.sources.http-source.interceptors.gi.grouping_rules_conf_file = /usr/cygnus/conf/grouping_rules.conf

# ============================================
# OrionCKANSink configuration
# channel name from where to read notification events
cygnusagent.sinks.ckan-sink.channel = ckan-channel
# sink class, must not be changed
cygnusagent.sinks.ckan-sink.type = com.telefonica.iot.cygnus.sinks.OrionCKANSink
# the CKAN API key to use
#cygnusagent.sinks.ckan-sink.api_key = <mykey>
# the FQDN/IP address for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_host = demo.ckan.org
# the port for the CKAN API endpoint
cygnusagent.sinks.ckan-sink.ckan_port = 80
# Orion URL used to compose the resource URL with the convenience operation URL to query it
cygnusagent.sinks.ckan-sink.orion_url = http://127.0.0.1:1026
# how the attributes are stored, either per row either per column (row, column)
cygnusagent.sinks.ckan-sink.attr_persistence = row
# enable SSL for secure Http transportation; 'true' or 'false'
cygnusagent.sinks.ckan-sink.ssl = false

当 cygnus 将数据持久化到 demo.ckan.org 时,会正确创建组织、数据集和资源,但不会加载数据。

4

1 回答 1

0

这是因为自动创建资源及其关联数据存储的row模式,其中所有字段都具有“文本”类型。OrionCKANSink为什么?因为 orion 发送的类型不是真正的有语义的类型,而只是对用户认​​为的属性类型是什么的描述。我的意思是,Orion 类型可以是“浮点数”,但也可以是“精度为 4 的浮点数”。因此,无法获得数据的真实类型(无需花费大量时间进行启发式尝试推断真实数据类型)。因此,行模式具有自动创建资源(和数据存储)的优势,但约束是所有通知的数据必须是字符串。

如果您在 CKAN 数据存储中需要真正的 Json 类型,那么 Cygnus 的推荐模式就是其中column一种。

您可以在此处查看有关不同模式的更详细的讨论

于 2015-12-10T10:11:01.670 回答