0

编辑:我在https://stackoverflow.com/a/60235242/3236516描述了我们的解决方案

我有一个 java 对象。它是扩展抽象类的许多子类之一的实例。我想修改它的一种方法,以便在调用原始方法之前运行一些额外的代码。我的目标在概念上与 AspectJ 中的切入点相同。

如果我创建原始对象的一些修改版本而不是改变原始对象,那很好。如果解决方案涉及字节码操作,也可以。

前期工作

我考虑过通过 JavaAssist 创建代理。麻烦的是,ProxyFactory 的 create 方法要求我提前知道构造函数的输入类型。我不。我可以在不通过 Objenesis 调用构造函数的情况下创建我的对象,但是生成的代理对象对于构造函数设置的任何值都将具有空值。这意味着只要直接引用构造函数设置的值,我生成的对象的行为就会与原始对象不同。

语境

我们通过 AWS Kinesis Data Analytics 使用 Flink 来转换一些流数据。我们希望在所有StreamOperator 的open() 方法的开头包含一些通用代码,而无需修改每个运算符。一个用例是确保自定义指标代理在操作员运行的每个实例上运行。

4

4 回答 4

1

使用 Byte Buddy,您可以创建一个包装器或一个 Java 代理,它们都可以实现此目标。如果您在调用包装类的构造函数时遇到困难,那么使用 Byte Buddy 也会出现同样的问题,因为任何库都绑定到 JVM 给出的约束。

要创建 Java 代理,请使用AgentBuilder. 然后,您可以使用该type步骤指定要拦截的所有类型,例如实现某个接口或扩展类的所有类型。对于transform,Byte Buddy 提供了一个方法装饰 API 称为Advice,它允许您添加额外的代码,例如:

class MyAdvice {
  @Advice.OnMethodEnter
  static void enter() { System.out.println("Hello"); }
}

经过

builder = builder.visit(Advice.to(MyAdvice.class).on(named("foo")));

例如,您可以在您指定的类型的所有名为“foo”的方法的开头打印 hello world。您可以在包的包文档中java.instrument找到有关 Java 代理的更多信息。

于 2020-01-23T07:26:24.853 回答
0

特定于 Flink 的解决方案可能是实现您正在使用的 Flink 运算符的自定义版本。我不相信这会带你到一个好地方。只是分享这个想法以防万一。

没有太多关于如何实现自定义操作符的文档,但是有一个关于这个主题的 Flink Forward talk

于 2020-01-23T14:12:50.173 回答
0

首先,我会在 AWS 上提交功能请求以支持您的用例。那将是最干净的解决方案。

其次,我会避免寻找任何方法来覆盖open(). 由于您处于一个没有太多控制权的环境中,我想方法要么根本不起作用,要么很脆弱,并且随着环境的更新而中断。

我会在相应的 UDF 方法中进行延迟初始化,当然也会在一些常见的实用程序方法中考虑到这一点。

private Counter counter;

@Override
public Integer map(String value) {
    if (counter == null) {
        RuntimeContext ctx = getRuntimeContext();
        counter = ctx.getMetricGroup().counter("outputs");
    }
    counter.inc();
    return Integer.parseInt(value);
}
于 2020-01-28T10:50:37.210 回答
0

原始提问者的回答:我们通过为 StreamExecutionEnvironment 创建一个 ByteBuddy 代理解决了这个问题,该代理拦截了对 getStreamGraph 的调用并将每个节点的 jobVertexClass 重铸(使用反射)到一个扩展原始类类型的类,但包含我们的自定义逻辑。因为不同的类需要不同的参数,所以我们使用 Objenesis 来实例化代理而不调用构造函数。为了解决通常在构造函数中设置的私有字段为空的问题,我们使用反射来改变所有私有字段的可见性,然后将每个字段值从原始对象复制到代理对象。

我们没有追求 Rafael Winterhalter 提出的代理解决方案,因为它需要能够在每个工作实例上运行代理设置代码,这类似于希望在每个工作机器上启动指标代理的原始问题。虽然我在最初的问题中没有说明这一点,但创建代理对象的代码发生在 Flink 作业管理机器上,而不是工作机器上。

于 2020-02-15T01:18:05.833 回答