2

我正在集成 orion(1.5.0)-cygnus(1.5.0)-cartodb (使用 carto.com 端点)。执行的步骤是:

  1. 在 orion 上创建实体
  2. 在 orion 上创建 v2/subscriptions,以便在属性更改时通知 cygnus,使用 Orion NGSIv2 订阅通知 NGSIv1 中的 cygnus
  3. Cygnus 使用通道 cartodb-channel ckan-channel 来将数据存储在 ckan 和 cartodb 上:

    cygnus-ngsi.sinks = cartodb-sink ckan-sink
    cygnus-ngsi.channels = cartodb-channel ckan-channel
    

在 ckan 的情况下,该过程结束正常。

在 cartodb 的情况下,该进程返回 RESPONSE /notify 200 但没有数据存储在 cartodb 上,该进程似乎在 cartodb-sink 处理上停止。

这里的日志:

time=2016-11-11T14:28:55.188Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=intercept | msg=com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor[158] : Event put in the channel, id=1766793715
time=2016-11-11T14:28:55.188Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[512] : Event got from the channel (id=1766793715, headers=
{notified-entities=testparking_parking, timestamp=1478874535186, fiware-correlator=2cddd09c-a81b-11e6-ba15-fa163e00324f, grouped-servicepaths=/car, fiware-service=parking, fiware-servicepath=/car, transaction-id=f0966198-d439-4351-9bd0-ecabc3c2b762, grouped-entities=testparking_parking}
, bodyLength=454)
time=2016-11-11T14:28:55.189Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[543] : Batch completed, persisting it
time=2016-11-11T14:28:55.189Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=persistBatch | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[285] : [cartodb-sink] Processing sub-batch regarding the parking_/car_testparking_parking destination
time=2016-11-11T14:28:55.190Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=aggregate | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator[456] : [cartodb-sink] Processing context element (id=testparking, type=parking)
time=2016-11-11T14:28:55.190Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=aggregate | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator[478] : [cartodb-sink] Processing context attribute (name=full, type=number)
time=2016-11-11T14:28:55.190Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[557] : [java.util.ArrayList.rangeCheck(Unknown Source), java.util.ArrayList.get(Unknown Source), com.telefonica.iot.cygnus.sinks.NGSICartoDBSink$CartoDBAggregator.getRows(NGSICartoDBSink.java:358), com.telefonica.iot.cygnus.sinks.NGSICartoDBSink.persistRawAggregation(NGSICartoDBSink.java:499), com.telefonica.iot.cygnus.sinks.NGSICartoDBSink.persistBatch(NGSICartoDBSink.java:310), com.telefonica.iot.cygnus.sinks.NGSISink.processNewBatches(NGSISink.java:544), com.telefonica.iot.cygnus.sinks.NGSISink.process(NGSISink.java:330), org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68), org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147), java.lang.Thread.run(Unknown Source)]
time=2016-11-11T14:28:55.190Z | lvl=WARN | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[575] : Index: 0, Size: 0
time=2016-11-11T14:28:55.191Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=debug | msg=org.mortbay.log.Slf4jLog[40] : RESPONSE /notify 200
time=2016-11-11T14:28:55.191Z | lvl=DEBUG | corr=2cddd09c-a81b-11e6-ba15-fa163e00324f | trans=f0966198-d439-4351-9bd0-ecabc3c2b762 | srv=parking | subsrv=/car | comp=cygnus-ngsi | op=debug | msg=org.mortbay.log.Slf4jLog[40] : EOF
4

1 回答 1

0

您的实体中没有与地理位置相关的属性。Carto 是一个地理定位数据库,因此,必须发送以下任何选项的属性才能使插入有意义:

  • geo:point,在这种情况下,geolocated 属性大约是一个点。
  • geo:json,尽管 GeoJson 可以描述任何几何图形,从简单的点到复杂的多边形,它必须表示单个点。
  • 位置元数据,在这种情况下,地理定位属性大约是一个点。

编辑1:

我将添加一个完整的示例:

天鹅座配置:

cygnus-ngsi.sources = http-source
cygnus-ngsi.sinks = raw-sink rawsnapshot-sink distance-sink
cygnus-ngsi.channels = raw-channel rawsnapshot-channel distance-channel

cygnus-ngsi.sources.http-source.channels = raw-channel rawsnapshot-channel distance-channel
cygnus-ngsi.sources.http-source.type = org.apache.flume.source.http.HTTPSource
cygnus-ngsi.sources.http-source.port = 5050
cygnus-ngsi.sources.http-source.handler = com.telefonica.iot.cygnus.handlers.NGSIRestHandler
cygnus-ngsi.sources.http-source.handler.notification_target = /notify
cygnus-ngsi.sources.http-source.handler.default_service = default
cygnus-ngsi.sources.http-source.handler.default_service_path = /
cygnus-ngsi.sources.http-source.interceptors = ts gi
cygnus-ngsi.sources.http-source.interceptors.ts.type = timestamp
cygnus-ngsi.sources.http-source.interceptors.gi.type = com.telefonica.iot.cygnus.interceptors.NGSIGroupingInterceptor$Builder
cygnus-ngsi.sources.http-source.interceptors.gi.grouping_rules_conf_file = /path/to/grouping_rules.conf
cygnus-ngsi.sources.http-source.interceptors.gi.enable_new_encoding = false

cygnus-ngsi.sinks.raw-sink.channel = raw-channel
cygnus-ngsi.sinks.raw-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnus-ngsi.sinks.raw-sink.enable_grouping = false
cygnus-ngsi.sinks.raw-sink.keys_conf_file = /path/to/cartodb_keys.conf
cygnus-ngsi.sinks.raw-sink.flip_coordinates = false
cygnus-ngsi.sinks.raw-sink.enable_raw = true
cygnus-ngsi.sinks.raw-sink.enable_distance = false
cygnus-ngsi.sinks.raw-sink.enable_raw_snapshot = false
cygnus-ngsi.sinks.raw-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.raw-sink.batch_size = 50
cygnus-ngsi.sinks.raw-sink.batch_timeout = 5
cygnus-ngsi.sinks.raw-sink.batch_ttl = 0

cygnus-ngsi.sinks.rawsnapshot-sink.channel = rawsnapshot-channel
cygnus-ngsi.sinks.rawsnapshot-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnus-ngsi.sinks.rawsnapshot-sink.enable_grouping = false
cygnus-ngsi.sinks.rawsnapshot-sink.keys_conf_file = /path/to/cartodb_keys.conf
cygnus-ngsi.sinks.rawsnapshot-sink.flip_coordinates = false
cygnus-ngsi.sinks.rawsnapshot-sink.enable_raw = false
cygnus-ngsi.sinks.rawsnapshot-sink.enable_distance = false
cygnus-ngsi.sinks.rawsnapshot-sink.enable_raw_snapshot = true
cygnus-ngsi.sinks.rawsnapshot-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.rawsnapshot-sink.batch_size = 50
cygnus-ngsi.sinks.rawsnapshot-sink.batch_timeout = 5
cygnus-ngsi.sinks.rawsnapshot-sink.batch_ttl = 0

cygnus-ngsi.sinks.distance-sink.channel = distance-channel
cygnus-ngsi.sinks.distance-sink.type = com.telefonica.iot.cygnus.sinks.NGSICartoDBSink
cygnus-ngsi.sinks.distance-sink.enable_grouping = false
cygnus-ngsi.sinks.distance-sink.keys_conf_file = /path/to/cartodb_keys.conf
cygnus-ngsi.sinks.distance-sink.flip_coordinates = false
cygnus-ngsi.sinks.distance-sink.enable_raw = false
cygnus-ngsi.sinks.distance-sink.enable_distance = true
cygnus-ngsi.sinks.distance-sink.enable_raw_snapshot = false
cygnus-ngsi.sinks.distance-sink.data_model = dm-by-entity
cygnus-ngsi.sinks.distance-sink.batch_size = 50
cygnus-ngsi.sinks.distance-sink.batch_timeout = 5
cygnus-ngsi.sinks.distance-sink.batch_ttl = 0

cygnus-ngsi.channels.raw-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel
cygnus-ngsi.channels.raw-channel.capacity = 10000
cygnus-ngsi.channels.raw-channel.transactionCapacity = 1000

cygnus-ngsi.channels.rawsnapshot-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel
cygnus-ngsi.channels.rawsnapshot-channel.capacity = 10000
cygnus-ngsi.channels.rawsnapshot-channel.transactionCapacity = 1000

cygnus-ngsi.channels.distance-channel.type = com.telefonica.iot.cygnus.channels.CygnusMemoryChannel
cygnus-ngsi.channels.distance-channel.capacity = 10000
cygnus-ngsi.channels.distance-channel.transactionCapacity = 1000

通知脚本(这只是为了模拟 Orion 的真实通知):

#!/bin/sh

URL=$1

if [ "$2" != "" ]
then
   SERVICE=$2
else
   SERVICE=default
fi

if [ "$3" != "" ]
then
   SERVICE_PATH=$3
else
   SERVICE_PATH=/
fi

curl $URL -v -s -S --header 'Content-Type: application/json; charset=utf-8' --header 'Accept: application/json' --header 'User-Agent: orion/0.10.0' --header "Fiware-Service: $SERVICE" --header "Fiware-ServicePath: $SERVICE_PATH" -d @- <<EOF
{
  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",
  "originator" : "localhost",
  "contextResponses" : [
    {
      "contextElement" : {
        "attributes" : [
          {
            "name" : "speed",
            "type" : "float",
            "value" : "$6"
          },
          {
            "name" : "the_geom",
            "type" : "geometry",
            "value" : "$4, $5",
            "metadatas": [
              {
                "name": "location",
                "type": "string",
                "value": "WGS84"
              }
            ]
          }
        ],
        "type" : "car",
        "isPattern" : "false",
        "id" : "car1"
      },
      "statusCode" : {
        "code" : "200",
        "reasonPhrase" : "OK"
      }
    }
  ]
}
EOF

通知发送:

$ ./notification.sh http://localhost:5050/notify mycartoid /traffic 40.361 -3.4099 78
*   Trying ::1...
* Connected to localhost (::1) port 5050 (#0)
> POST /notify HTTP/1.1
> Host: localhost:5050
> Content-Type: application/json; charset=utf-8
> Accept: application/json
> User-Agent: orion/0.10.0
> Fiware-Service: mycartoid
> Fiware-ServicePath: /traffic
> Content-Length: 741
> 
* upload completely sent off: 741 out of 741 bytes
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
< Server: Jetty(6.1.26)
< 
* Connection #0 to host localhost left intact

天鹅座日志:

time=2016-11-29T08:49:10.033UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[282] : [NGSIRestHandler] Starting internal transaction (ebbc3df9-b8e0-4ff1-9980-75c69ca272c9)
time=2016-11-29T08:49:10.034UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=getEvents | msg=com.telefonica.iot.cygnus.handlers.NGSIRestHandler[299] : [NGSIRestHandler] Received data ({  "subscriptionId" : "51c0ac9ed714fb3b37d7d5a8",  "originator" : "localhost",  "contextResponses" : [    {      "contextElement" : {        "attributes" : [          {            "name" : "speed",            "type" : "float",            "value" : "78"          },          {            "name" : "the_geom",            "type" : "geometry",            "value" : "40.361, -3.4099",            "metadatas": [              {                "name": "location",                "type": "string",                "value": "WGS84"              }            ]          }        ],        "type" : "car",        "isPattern" : "false",        "id" : "car1"      },      "statusCode" : {        "code" : "200",        "reasonPhrase" : "OK"      }    }  ]})
time=2016-11-29T08:49:14.110UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=rawUpdateEvent | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[748] : [rawsnapshot-sink] Updating data at NGSICartoDBSink. Schema (mycartoid), Table (x002ftrafficxffffrawsnapshot), Sets (speed='78',speed_md='[]',the_geom=ST_SetSRID(ST_MakePoint(40.361,-3.4099), 4326)), Where (fiwareServicePath='/traffic' AND entityId='car1' AND entityType='car')
time=2016-11-29T08:49:14.111UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=persistRawAggregation | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[553] : [raw-sink] Persisting data at NGSICartoDBSink. Schema (mycartoid), Table (x002ftrafficxffffcar1xffffcar), Data (('2016-11-29T08:49:10.99Z','/traffic','car1','car',ST_SetSRID(ST_MakePoint(40.361,-3.4099), 4326),'78','[]'))
time=2016-11-29T08:49:15.243UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[514] : Finishing internal transaction (ebbc3df9-b8e0-4ff1-9980-75c69ca272c9)
time=2016-11-29T08:49:15.243UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=persistDistanceEvent | msg=com.telefonica.iot.cygnus.sinks.NGSICartoDBSink[678] : [distance-sink] Persisting data at NGSICartoDBSink. Schema (mycartoid), Table (x002ftrafficxffffcar1xffffcarxffffdistance), Data ((1480409350099,'/traffic','car1','car',(SELECT point FROM geom),(SELECT stage_distance FROM calcs),(SELECT stage_time FROM calcs),(SELECT stage_speed FROM speed),(SELECT sum_dist FROM inserts),(SELECT sum_time FROM inserts),(SELECT sum_speed FROM inserts),(SELECT sum2_dist FROM inserts),(SELECT sum2_time FROM inserts),(SELECT sum2_speed FROM inserts),(SELECT max_distance FROM inserts),(SELECT min_distance FROM inserts),(SELECT max_time FROM inserts),(SELECT min_time FROM inserts),(SELECT max_speed FROM inserts),(SELECT min_speed FROM inserts),(SELECT num_samples FROM inserts)))
time=2016-11-29T08:49:15.243UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[514] : Finishing internal transaction (ebbc3df9-b8e0-4ff1-9980-75c69ca272c9)
time=2016-11-29T08:49:15.475UTC | lvl=INFO | corr=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | trans=ebbc3df9-b8e0-4ff1-9980-75c69ca272c9 | srv=mycartoid | subsrv=/traffic | comp=cygnus-ngsi | op=processNewBatches | msg=com.telefonica.iot.cygnus.sinks.NGSISink[514] : Finishing internal transaction (ebbc3df9-b8e0-4ff1-9980-75c69ca272c9)

查询 Carto(原始历史距离历史):

$ curl -G "https://mycartoid.cartodb.com/api/v2/sql?api_key=xxx" --data-urlencode "q=SELECT * FROM 4wheels_car1_car"
{"rows":[
   {
      "cartodb_id":1,
      "the_geom":"0101000020E61000006891ED7C3F350BC0D7A3703D0A374440",
      "the_geom_webmercator":"0101000020110F000060EF4D5A961B17C1A59940B...",
      "recvtime":"2016-11-28T08:29:35.44Z”,"fiwareservicepath":"/4wheels",
      "entityid":"car1”,"entitytype":"car”,"speed":112.9,"speed_md":"[]"
   },
   {
      "cartodb_id":2,
      "the_geom":"0101000020E61000009EEFA7C64B370BC03108AC1C5A344440",
      "the_geom_webmercator":"0101000020110F0000A58776A1531D17C1CBEB3D11...",
      "recvtime":"2016-11-28T08:30:35.977Z","fiwareservicepath":”/4wheels",
      "entityid":"car1","entitytype":"car","speed":95,"speed_md":"[]"
   },
   {
      ...
   }]
}


$ curl -G "https://mycartoid.cartodb.com/api/v2/sql?api_key=xxx" --data-urlencode "q=SELECT * FROM 4wheels_car1_car_distance"
{"rows":[
   {
      "cartodb_id":1,
      "the_geom":"0101000020E610000080B74082E2470BC05839B4C8762E4440",
      "the_geom_webmercator":"0101000020110F0000F9F1D8616A2B17C18CA42D61...",
      "recvtimems":1480342611051,"fiwareservicepath":”/4wheels",
      "entityid":"car1","entitytype":"car","stagedistance":0,
      "stagetime":0,"stagespeed":0,"sumdistance":0,"sumtime":0,
      "sumspeed":0,"sum2distance":0,"sum2time":0,"sum2speed":0,
      "maxdistance":1.4e-45,"mindistance":3.4028235e+38,"maxtime":1.4e-45,
      "mintime":3.4028235e+38,"maxspeed":1.4e-45,"minspeed":3.4028235e+38,
      "numsamples":1
   },
   {
      "cartodb_id":2,
      "the_geom":"0101000020E61000000F0BB5A679470BC091ED7C3F352E4440",
      "the_geom_webmercator":"0101000020110F000051A09D53112B17C17D410E55...",
      "recvtimems":1480342698000,"fiwareservicepath":”/4wheels",
      "entityid":"car1","entitytype":"car","stagedistance":222.732003988,
      "stagetime":86949,"stagespeed":0.0025616396276898,
      "sumdistance":222.732003988,"sumtime":86949,
      "sumspeed":0.0025616396276898,"sum2distance":49609.5456005104,
      "sum2time":7560128601,"sum2speed":0.00000656199758215071,
      "maxdistance":222.732003988,"mindistance":222.732003988,
      "maxtime":86949,"mintime":86949,"maxspeed":0.0025616396276898,
      "minspeed":0.0025616396276898,"numsamples":2
   },
   {
      ...
   }]
}
于 2016-11-16T13:16:41.103 回答