1

我最近将 Plc4x Kafka Connect 插件从版本 0.5.0 更新到 0.8.0,方法是从 Plc4x 的Github 页面克隆源代码并使用 Maven 构建它,完全按照自述文件中的说明。构建源代码后,我收到了一个 uber JAR,其中包含 Kafka Connect 插件的所有必需库和依赖项。然后,我创建一个连接器配置文件,该文件类似于 Plc4x 的Github 页面中的示例:

{
 "name":"plc-source-test",
  "config": {
   "connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector",
   "tasks.max":"1", 
   "file":"test.sink.txt", 
   "topics":"connect-test" 
  }
}

然后我将配置推送到 REST 接口:

curl -X POST -H "Content-Type: application/json" --data config.json http://localhost:8083/connectors

REST 接口现在响应:

{"error_code":500,"message":null}

这就是我卡住的地方。我相信这个错误与 config.json 文件中的以下行有关:

"connector.class":"org.apache.plc4x.kafka.Plc4xSourceConnector"

因为当我使用不同的连接器类时,例如:

"connector.class":"FileStreamSinkConnector"

一切正常,我可以成功地将我的连接器配置推送到 REST 接口。Plc4x 0.5.0 版本也没有出现这个问题。我已经解压了包含所有依赖项的 uber JAR,并验证了 Plc4xSourceConnector 类确实存在。我不知道我做错了什么,因为我正在按照他们的 Github 页面中列出的步骤来构建和配置所有内容。有没有其他人遇到过这个问题?

4

1 回答 1

0

我设法找到了我的问题的解决方案。我使用的连接器配置文件是为 PLC4x v.0.4.0 设计的,并且在 v.0.8.0 发布之前已经更新。我在 PLC4X Github存储库中找到了一个示例配置文件并将其用作布局。从他们的网站上,我发现该领域:

"sources.machineX.connectionString"

必须以特殊方式格式化。我将连接器配置文件更新为:

{"name": "plc-source-test",
 "config": {
  "connector.class": "org.apache.plc4x.kafka.Plc4xSourceConnector",
  "default-topic": "test-topic",
  "tasks.max": "1",
  "sources": "machineA",
  "sources.machineA.connectionString": "s7:<PLC_IP>?remote-rack=0&remote-slot=0",
  "sources.machineA.jobReferences": "jobA",
  "sources.machineA.jobReferences.jobA": "job-topic",
  "jobs": "jobA",
  "jobs.jobA.interval": "500",
  "jobs.jobA.fields": "fieldA",
  "jobs.jobA.fields.fieldA": "%DB1.DBD1:REAL"
 }

一切正常!

使用 Plc4xSourceConnector 时,必须在连接器配置中指定所需的键:值:

"default-topic" //Required. The default name for the Kafka topic

"tasks.max" //Not quite sure what this does, but it is in the example config so lets use it

"sources" //Required. It must be a comma separated list of your source

"sources.machineA.connectionString" //Required. The connection string to the machine/PLC that you want to talk to

"sources.machineA.jobReferences" //Required. A comma separated list of all the jobs that you wish to create for this machine

"sources.machineA.jobReferences.jobA" //Required. The Kafka topic name for data produced by jobA

"jobs" //Required. A comma separated list of all jobs you wish to create

"jobs.jobA.interval" //Not sure if this is required. Determines the polling rate of your created job

"jobs.jobA.fields" //Required. A comma separated list of the fields belonging to this job. A field is a machine/PLC register containing a value

"jobs.jobA.fields.fieldA" //Required. The address to the resource on your machine/PLC

发布错误的连接器配置时显示的错误消息非常模糊,通常只是一些 NullPointerException。所以我花了一些时间分析PLC4X的源代码,尤其是这个类,弄清楚需要哪些字段。

于 2020-07-06T10:36:20.983 回答