0

我正在尝试使用 Python 中的 Apache Flink Stateful Function 构建一个项目,但我似乎无法让它工作。我将问题缩小到似乎当我通过我的 protobuf 模式将请求发送到我的有状态函数时,序列化程序无法将我的消息序列化到我期望的类中。这是我正在尝试做的事情:

import json
from statefun import StatefulFunctions, RequestReplyHandler
from jobs.session_event_pb2 import Event

functions = StatefulFunctions()


@functions.bind("namespace/funcname")
def funcname(context, session: Event):
    print("hello world")


handler = RequestReplyHandler(functions)

if __name__ == '__main__':
    inputFile = open("my_file.json", "r")
    for line in inputFile:
        data = json.loads(line).get('properties')
        if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
            request = Event()
            request.prop1 = data["prop1"]
            request.prop2 = data["prop2"]
            request = request.SerializeToString()
            handler(request)

这是我的 Protobuf 架构:

syntax = "proto3";

package mypackage;

message Event {
    string prop1 = 1;
    string prop2 = 2;
}

我在这里做错了什么?

4

1 回答 1

0

这是因为 RequestReply 处理程序不接受直接的 protobuf 消息。Flink 运行时发送一个名为的类型ToFunction并接收一个类型的响应FromFunction。此有效负载包含您的调用者消息以及持久值和其他元信息。

如果您不能直接调用函数,例如在测试中,我会鼓励您这样做并且根本不使用处理程序。

于 2020-08-18T14:56:05.303 回答