2

我为 kapacitor 编写了一个简单的 udf,它基本上是在更改其中一个字段值,应该非常简单明了。

当我运行它时,我收到如下错误:

ts=2018-06-07T10:56:52.957Z lvl=error msg="node failed" service=kapacitor task_master=main task=anom_generator node=anomalyGen2 err="read error: unexpected EOF"
ts=2018-06-07T10:56:52.957Z lvl=error msg="failed to stop task with out error" service=kapacitor task_master=main task=anom_generator err="anomalyGen2: read error: unexpected EOF"
ts=2018-06-07T10:56:52.957Z lvl=error msg="task finished with error" service=task_store err="anomalyGen2: read error: unexpected EOF" task=anom_generator

这是我的刻度脚本:

dbrp "telegraf-dude"."autogen"

var data = batch
        |query(''' SELECT * FROM "telegraf-dude"."autogen"."cpu" ''')
        .period(5m)
        .every(10s)
        .groupBy(*)
data
    @anomalyGen()
        .field('usage_idle')
        .action('ADD')
        .value(1.5)
    |influxDBOut()
        .create()
        .database('test_anom')
        .retentionPolicy('autogen')
        .measurement('cpu_copy_anom')

和我的 udf.py:

from kapacitor.udf.agent import Agent, Handler, Server
from kapacitor.udf import udf_pb2
from datetime import datetime
import sys
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s:%(name)s: %(message)s')
logger = logging.getLogger()

class AnomalyGenerator(Handler):
    def __init__(self, agent):
        logger.info("************** Hello __INIT__")
        self._field = ''
        self._action = ''
        self._value = ''
        self._points = []
        self._agent = agent
        logger.info("************** ByeBye __INIT__")

    ## defines the udf input and output type BATCH or STREAM        
    def info(self):
        logger.info("************** Hello INFO")
        response = udf_pb2.Response()
        response.info.wants = udf_pb2.BATCH
        response.info.provides = udf_pb2.BATCH
        response.info.options['field'].valueTypes.append(udf_pb2.STRING)
        response.info.options['action'].valueTypes.append(udf_pb2.STRING)
        response.info.options['value'].valueTypes.append(udf_pb2.DOUBLE)
        logger.info("************** ByeBye INFO")
        return response

    def init(self, init_req):
        logger.info("************** Hello INIT")
        for opt in init_req.options:
            if opt.name == 'field':
                self._field = opt.values[0].stringValue
                logger.info("************** FIELD field value = " + str(self._field))
            if opt.name == 'action':
                self._action = opt.values[0].stringValue
                logger.info("************** ACTION field value: = " + str(self._action))
            if opt.name == 'value':
                self._value = opt.values[0].doubleValue
                logger.info("************** VALUE field value: = " + str(self._value))

        logger.info("************** Middle INIT")

        success = True
        msg = ''
        if self._field == '':
            success = False
            msg = 'must provide field name !'
        if self._action == '':
            success = False
            msg = 'must provide action type - ADD/SUBTRACT/MULTIPLY !'
        if self._value == '':
            success = False
            msg = 'must provide action value of double - 1.5 / 2.0 etc.'

        logger.info("************** INIT - Starting Response")
        response = udf_pb2.Response()
        response.init.success = success
        response.init.error = msg
        logger.info("************** ByeBye INIT")
        return response

    def begin_batch(self, begin_req):
        logger.info("************** Hello BEGIN BATCH")
        response = udf_pb2.Response()
        response.begin.CopyFrom(begin_req)
        self._begin_response = response
        logger.info("************** ByeBye BEGIN BATCH")

    def point(self, point):
        logger.info("************** Hello POINT")
        #logger.info("Self field = " + str(self._field))
        #logger.info("Self Action = " + str(self._action))
        #logger.info("Self value = " + str(self._value))
        origionalFieldValue = point.fieldsDouble[self._field]
        #logger.info("origional value from field:   " + str(origionalFieldValue))
        ## Make the actual modyfication to the value
        newFieldValue = 321.123
        response = udf_pb2.Response()
        response.point.CopyFrom(point)
        response.point.fieldsDouble[self._field] = newFieldValue
        #logger.info("new value from field:  " + str(response.point.fieldsDouble[self._field]))
        logger.info("************** ByeBye POINT")
        self._agent.write_response(response)

    def end_batch(self, end_req):
        logger.info("************** Hello END BATCH")
        ## Create beginBatch response with count of points
        #logger.info("************** LENGTH OF POINTS: " + str(len(self._points)))
        #self._begin_response.begin.size = len(self._points)
        #self._agent.write_response(self._begin_response)
        ## Itterate points for another sorting
        #response = udf_pb2.Response()
        #for p in points:
        #   response.point.CopyFrom(p)
        #   self._agent.write_response(response)     
        ## send an identical end batch back to kapacitor
        #response.end.CopyFrom(end_req)
        #self._agent.write_response(response)
        logger.info("************** ByeBye END BATCH")
        pass

    def snapshot(self):
        logger.info("************** Hello SNAPSHOT")
        response = udf_pb2.Response()
        response.snapshot.snapshot = b''
        logger.info("************** ByeBye SNAPSHOT")
        return response

    def restore(self, restore_req):
        logger.info("************** Hello RESTORE")
        response = udf_pb2.Response()
        response.restore.success = False
        response.restore.error = 'not implemented'
        logger.info("************** ByeBye RESTORE")
        return response

if __name__ == '__main__':
  logger.info("************** Hello MAIN")
  agent = Agent()
  handler = AnomalyGenerator(agent)
  agent.handler = handler

  logger.info("Starting AnomalyGenerator UDF")
  agent.start()
  agent.wait()
  logger.info("Finished running AnomalyGenerator UDF")
  logger.info("************** ByeBye MAIN")

和 kapacitor 任务 show 命令输出:

[root@lab5 home]# docker exec kapa kapacitor show anom_generator
ID: anom_generator
Error: anomalyGen2: read error: unexpected EOF
Template:
Type: batch
Status: enabled
Executing: false
Created: 03 Jun 18 18:57 UTC
Modified: 07 Jun 18 10:56 UTC
LastEnabled: 07 Jun 18 10:56 UTC
Databases Retention Policies: ["telegraf-dude"."autogen"]
TICKscript:
dbrp "telegraf-dude"."autogen"

var data = batch
    |query(''' SELECT * FROM "telegraf-dude"."autogen"."cpu" ''')
        .period(5m)
        .every(10s)
        .groupBy(*)

data
    @anomalyGen()
        .field('usage_idle')
        .action('ADD')
        .value(1.5)
    |influxDBOut()
        .create()
        .database('test_anom')
        .retentionPolicy('autogen')
        .measurement('cpu_copy_anom')

DOT:
digraph anom_generator {
query1 -> anomalyGen2;
anomalyGen2 -> influxdb_out3;
}

有任何想法吗 ???在网上找不到太多有用的信息

谢谢

4

0 回答 0