我在制作需要 STREAM 并提供 BATCH 的 UDF 时遇到了麻烦。
这边走:
def info(self):
response = udf_pb2.Response()
response.info.wants = udf_pb2.STREAM
response.info.provides = udf_pb2.BATCH
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
return response
有人有示例代码吗?我在网上搜索(论坛、文档),但所有示例都是针对 BATCH-BACH、STREAM-STREAM 或 BATCH-STREAM。
我在示例中看到,在编写对 Kapacitor 的响应时,在“end_batch(self,end_req)”方法中,有必要“传达”BATCH 已经结束,在一个示例中,这是这样制作的:
def end_batch(self, end_req):
# Send begin batch with count of outliers
self._begin_response.begin.size = len(self._batch)
self._agent.write_response(self._begin_response)
response = udf_pb2.Response()
...
# Send an identical end batch back to Kapacitor
# HERE
response.end.CopyFrom(end_req)
self._agent.write_response(response)
为了发送 BATCH,我必须从“point(self,point)”方法发送它,但无法访问 end_req 对象并且不知道如何创建一个。
提前致谢!再见!