我正在尝试使用 Python 中的 Apache Flink Stateful Function 构建一个项目,但我似乎无法让它工作。我将问题缩小到似乎当我通过我的 protobuf 模式将请求发送到我的有状态函数时,序列化程序无法将我的消息序列化到我期望的类中。这是我正在尝试做的事情:
import json
from statefun import StatefulFunctions, RequestReplyHandler
from jobs.session_event_pb2 import Event
functions = StatefulFunctions()
@functions.bind("namespace/funcname")
def funcname(context, session: Event):
print("hello world")
handler = RequestReplyHandler(functions)
if __name__ == '__main__':
inputFile = open("my_file.json", "r")
for line in inputFile:
data = json.loads(line).get('properties')
if data is not None and data.get('prop1') is not None and data.get('prop2') is not None:
request = Event()
request.prop1 = data["prop1"]
request.prop2 = data["prop2"]
request = request.SerializeToString()
handler(request)
这是我的 Protobuf 架构:
syntax = "proto3";
package mypackage;
message Event {
string prop1 = 1;
string prop2 = 2;
}
我在这里做错了什么?