我使用 StreamSets 作为摄取工具,将记录从 Oracle 数据库提取到 Kafka 主题。现在,我想通过 StreamSets 本身来使用它,并且还想计算 Kafka 主题中的记录数。
我怎样才能做到这一点。请帮助
我使用 StreamSets 作为摄取工具,将记录从 Oracle 数据库提取到 Kafka 主题。现在,我想通过 StreamSets 本身来使用它,并且还想计算 Kafka 主题中的记录数。
我怎样才能做到这一点。请帮助
您可以使用 StreamSets Data Collector 的history
REST API 来检索具有每个阶段的记录计数的数据。例如,这里是给定管道的最后一次运行的计数器。我正在使用出色的jq工具在命令行解析 JSON。
$ curl -s -u admin:admin -H 'X-Requested-By:sdc' http://localhost:18630/rest/v1/pipeline/RedshiftStreamingwithKinesisFirehose537add73-bb16-4358-a26a-a51576dea32b/history | jq -r .[0].metrics | jq .counters
{
"pipeline.batchCount.counter": {
"count": 1029
},
"pipeline.batchErrorMessages.counter": {
"count": 0
},
"pipeline.batchErrorRecords.counter": {
"count": 0
},
"pipeline.batchInputRecords.counter": {
"count": 648226
},
"pipeline.batchOutputRecords.counter": {
"count": 648226
},
"stage.ExpressionEvaluator_01.errorRecords.counter": {
"count": 0
},
"stage.ExpressionEvaluator_01.inputRecords.counter": {
"count": 648226
},
"stage.ExpressionEvaluator_01.outputRecords.counter": {
"count": 648226
},
"stage.ExpressionEvaluator_01.stageErrors.counter": {
"count": 0
},
"stage.ExpressionEvaluator_01:ExpressionEvaluator_01OutputLane15561338960790.outputRecords.counter": {
"count": 648226
},
"stage.FieldOrder_01.errorRecords.counter": {
"count": 0
},
"stage.FieldOrder_01.inputRecords.counter": {
"count": 648226
},
"stage.FieldOrder_01.outputRecords.counter": {
"count": 648226
},
"stage.FieldOrder_01.stageErrors.counter": {
"count": 0
},
"stage.FieldOrder_01:FieldOrder_01OutputLane15561351879260.outputRecords.counter": {
"count": 648226
},
"stage.FieldTypeConverter_01.errorRecords.counter": {
"count": 0
},
"stage.FieldTypeConverter_01.inputRecords.counter": {
"count": 648226
},
"stage.FieldTypeConverter_01.outputRecords.counter": {
"count": 648226
},
"stage.FieldTypeConverter_01.stageErrors.counter": {
"count": 0
},
"stage.FieldTypeConverter_01:FieldTypeConverter_01OutputLane15560499048280.outputRecords.counter": {
"count": 648226
},
"stage.KinesisFirehose_01.errorRecords.counter": {
"count": 0
},
"stage.KinesisFirehose_01.inputRecords.counter": {
"count": 648226
},
"stage.KinesisFirehose_01.outputRecords.counter": {
"count": 648226
},
"stage.KinesisFirehose_01.stageErrors.counter": {
"count": 0
},
"stage.MySQLBinaryLog_01.errorRecords.counter": {
"count": 0
},
"stage.MySQLBinaryLog_01.inputRecords.counter": {
"count": 0
},
"stage.MySQLBinaryLog_01.outputRecords.counter": {
"count": 648226
},
"stage.MySQLBinaryLog_01.stageErrors.counter": {
"count": 0
},
"stage.MySQLBinaryLog_01:MySQLBinaryLog_01OutputLane15561313696850.outputRecords.counter": {
"count": 648226
},
"stage.StreamSelector_01.errorRecords.counter": {
"count": 0
},
"stage.StreamSelector_01.inputRecords.counter": {
"count": 648226
},
"stage.StreamSelector_01.outputRecords.counter": {
"count": 648226
},
"stage.StreamSelector_01.stageErrors.counter": {
"count": 0
},
"stage.StreamSelector_01:StreamSelector_01OutputLane1556133811620.outputRecords.counter": {
"count": 0
},
"stage.StreamSelector_01:StreamSelector_01OutputLane1556133816638.outputRecords.counter": {
"count": 648226
},
"stage.Trash_01.errorRecords.counter": {
"count": 0
},
"stage.Trash_01.inputRecords.counter": {
"count": 0
},
"stage.Trash_01.outputRecords.counter": {
"count": 0
},
"stage.Trash_01.stageErrors.counter": {
"count": 0
}
}