3

我按照本文档中解释的说明将 Apache Kafka 连接到 Eclipse Ditto。

https://www.eclipse.org/ditto/connectivity-protocol-bindings-kafka2.html

我不确定以下内容。

1) ["ditto:outbound-auth-subject", "..."] 在授权上下文下。

2)“地址”:“主题/关键”

请让我知道他们!先感谢您。!

编辑:

请找到我用来连接 Ditto 和 Kafka 的命令

curl -X POST -i -u devops:foobar -H 'Content-Type: application/json' -d '{
"targetActorSelection": "/system/sharding/connection",
"headers": {
    "aggregate": false
},
"piggybackCommand": {
    "type": "connectivity.commands:createConnection",
    "connection": {
        "id": "MyKafkaConnection1",
        "connectionType": "kafka",
        "connectionStatus": "open",
        "uri": "tcp://radsah:password@localhost:9092",
        "specificConfig": {
        "bootstrapServers": "10.196.2.218:9092",
        "saslMechanism": "plain"
        },
        "failoverEnabled": true,
        "targets": [
              {
             "address": "digital-twins",
              "topics": [
               "_/_/things/twin/events",
               "_/_/things/live/messages"
                            ],
            "authorizationContext": ["ditto:outbound-auth-subject"]
        }],
        "mappingContext": {
            "mappingEngine": "JavaScript",
            "options": {
                "incomingScript": "function mapToDittoProtocolMsg(\n    headers,\n    textPayload,\n    bytePayload,\n    contentType\n) {\n\n    if (contentType !== \"application/json\") {\n        return null;\n    }\n\n    var jsonData = JSON.parse(textPayload);\n    var temperature = jsonData.temp;\n    var humidity = jsonData.hum;\n    \n    var path;\n    var value;\n    if (temperature != null && humidity != null) {\n        path = \"/features\";\n        value = {\n                temperature: {\n                    properties: {\n                        value: temperature\n                    }\n                },\n                humidity: {\n                    properties: {\n                        value: humidity\n                    }\n                }\n            };\n    } else if (temperature != null) {\n        path = \"/features/temperature/properties/value\";\n        value = temperature;\n    } else if (humidity != null) {\n        path = \"/features/humidity/properties/value\";\n        value = humidity;\n    }\n    \n    if (!path || !value) {\n        return null;\n    }\n\n    return Ditto.buildDittoProtocolMsg(\n        \"org.eclipse.ditto\",\n        headers[\"device_id\"],\n        \"things\",\n        \"twin\",\n        \"commands\",\n        \"modify\",\n        path,\n        headers,\n        value\n    );\n}"
            }
        }
    }
}
}' http://localhost:8080/devops/piggyback/connectivity?timeout=8000

我已经使用 Hono 注册了一个设备,并将数据发送到 Ditto。同上成功接收数据。但我想将收到的数据发送到 Kafka。

Kafka 和 Ditto 之间成功建立连接。但我没有在卡夫卡消费者“数字双胞胎”中收到。我错过了什么吗?

使用 Policy 命令编辑:

curl -X PUT 'http://localhost:8080/api/2/policies/org.eclipse.ditto:5100' -u 'ditto:ditto' -H 'Content-Type: application/json' -d '{
"entries": {
    "owner": {
        "subjects": {
            "nginx:ditto": {
                "type": "nginx basic auth user"
            }
        },
        "resources": {
            "thing:/": {
                "grant": [
                    "READ","WRITE"
                ],
                "revoke": []
            },
            "policy:/": {
                "grant": [
                    "READ","WRITE"
                ],
                "revoke": []
            },
            "message:/": {
                "grant": [
                    "READ","WRITE"
                ],
                "revoke": []
            }
        }
    }
}
}
4

1 回答 1

4

关于授权上下文,您可以查看我们的连接文档中的授权部分。它必须包含在您的事物的策略或 ACL 中定义的主题。

例如:

Thing "foo:bar" 的策略已定义主题 "somePrefix:someValue" 的整个事物的读取访问权限。

{
  "policyId": "foo:bar",
  "entries": {
    ... //Maybe more entries
    "MyKafkaConnection": {
      "subjects": {
        "somePrefix:someValue": {
          "type": "my description for this subject"
        }
      },
      "resources": {
        "thing:/": {
          "grant": [
            "READ"
          ],
          "revoke": []
        },
        "message:/": {
          "grant": [
            "READ"
          ],
          "revoke": []
        }
      }
    }
  }
}

在您所指的示例中,与“foo:bar”相关的事件将通过您在地址字段中指定的主题的 kafka 连接发布。

于 2019-07-23T11:45:20.997 回答