1

我正在使用 Flink StateFun 3.1.0,并且想从嵌入式 Java 函数调用远程 Python 函数。在 StateFun 2.2 中,我可以通过我的(通过 Protobuf 生成的)Java 类的实例创建一个 ProtoBuf Any Any.pack(msg),这很有效。现在我明白了

Caused by: java.lang.ClassCastException: class com.google.protobuf.Any cannot be cast to class org.apache.flink.statefun.sdk.reqreply.generated.TypedValue (com.google.protobuf.Any and org.apache.flink.statefun.sdk.reqreply.generated.TypedValue are in unnamed module of loader 'app')

嵌入式 Java 函数、远程 Java 函数和远程 Python 函数之间调用的例子很多,但是我没有找到从嵌入式 Java 调用远程 Python 的例子。

在远程 Java 示例中,使用Context具有send(Message)方法的函数调用该函数,我认为该方法也可用于调用远程 Python 函数,但我的嵌入式 Java 函数传递了不支持此方法的不同类型的上下文。

4

1 回答 1

2

背景

事实上,在 3.x 版本中,一个明确的目标是使远程功能更容易使用并且更符合人体工程学,您可以在此处阅读更多关于这样做的动机。

为了帮助实现这一目标,我们从 SDK 表面移除了 Protobuf(我们仍在内部使用它),并提出了消息和类型抽象。

由于嵌入式 SDK 被认为更像是高级用户 SDK,因此您仍然可以使用 Protobuf 与远程功能进行通信。虽然这也是我们希望简化的事情,但存在一些技术问题。

类型值

这是一个在内部使用的 Protobuf 消息,每当远程函数调用另一个远程函数时。

message TypedValue {
    string typename = 1;
    // has_value is set to differentiate a zero length value bytes explicitly set,
    // or a non existing value.
    bool has_value = 2;
    bytes value = 3;
}
  • typename字段是由用户定义的 / 形式的字符串。例如:“com.kkrugler.types/MyEvent”。StateFun 将传递该字符串 as-us,并将其视为不透明类型标记,由远程 SDK 端的用户解释。

  • has_value在这种情况下,需要始终设置为 true。(发送没有有效负载的消息被认为是错误的。

  • values - 是typename类型的序列化、不透明值。

嵌入式功能

需要调用远程函数的嵌入式函数需要使用TypedValue实例调用它。远程 SDK 知道如何将TypedValue转换为Message类。

替换任何

这是使用 TypedValue 包装 Protobuf 消息的有用方法:

 public static <M extends Message> TypedValue pack(M message) {
    return TypedValue.newBuilder()
        .setTypename("type.googleapis.com/" + message.getDescriptorForType().getFullName())
        .setHasValue(true)
        .setValue(message.toByteString())
        .build();
  }

然后,像这样简单地使用它:

MyProtobufEvent myProtobufEvent = MyProtobufEvent.newBuilder() ... build();

context.send(.., pack(myProtobufEvent));

在远程 SDK 端(假设 Python,因为您已经提到过):

from statefun import make_protobuf_type

MyProtobufEventType = make_protobuf_type(MyProtobufEvent)

...
myProtobufEvent = message.as_type(MyProtobufEventType)
于 2021-12-15T11:20:21.113 回答