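I have implemented an UnboundedSource for Apache Beam that needs to communicate with a server requiring 256-bit encryption. It works when I run the job locally, because I installed the unlimited strength policy files [1] into my local $JAVA_HOME/jre/lib/security directory.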
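My problem is using this UnboundedSource in a Dataflow job running on Google Cloud Platform [2].
Is it possible to configure a Dataflow job to use the unlimited strength policy files, or is that not supported on GCP?
When I run the job on GCP (without the unlimited strength policy files), it fails with a handshake error: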
exception: "java.util.concurrent.ExecutionException: java.net.ConnectException: Received fatal alert: handshake_failure
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at com.urbanairship.connect.client.StreamConnection.connect(StreamConnection.java:212)
at com.urbanairship.connect.client.StreamConnection.begin(StreamConnection.java:145)
at com.urbanairship.connect.client.StreamConnection.read(StreamConnection.java:122)
at com.urbanairship.connect.client.StreamConsumeTask.transitionToReading(StreamConsumeTask.java:138)
at com.urbanairship.connect.client.StreamConsumeTask.stream(StreamConsumeTask.java:100)
at com.urbanairship.connect.client.StreamConsumeTask.run(StreamConsumeTask.java:83)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Received fatal alert: handshake_failure
at com.ning.http.client.providers.netty.request.NettyConnectListener.onFutureFailure(NettyConnectListener.java:133)
at com.ning.http.client.providers.netty.request.NettyConnectListener.access$200(NettyConnectListener.java:37)
at com.ning.http.client.providers.netty.request.NettyConnectListener$1.operationComplete(NettyConnectListener.java:104)
at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:409)
at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:395)
at org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:362)
at org.jboss.netty.handler.ssl.SslHandler.setHandshakeFailure(SslHandler.java:1460)
at org.jboss.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1314)
at org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:852)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
... 3 more
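To confirm that the restricted policy is really what the workers are running with (rather than some other TLS mismatch), a check along these lines could be logged from inside the job. This is only a sketch: the class name CryptoPolicyCheck and its helper methods are made up for illustration; Cipher.getMaxAllowedKeyLength is the standard JCE call, which reports 128 for AES under the restricted policy and Integer.MAX_VALUE under the unlimited one.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

// Hypothetical helper: reports which JCE policy the current JVM is using.
public final class CryptoPolicyCheck {
    private CryptoPolicyCheck() {}

    // Maximum AES key length (in bits) permitted by the installed policy files.
    public static int maxAesKeyLength() throws NoSuchAlgorithmException {
        return Cipher.getMaxAllowedKeyLength("AES");
    }

    // Turns the key length into a short message suitable for a log line.
    public static String describe(int maxBits) {
        return maxBits >= 256
                ? "unlimited (256-bit capable) policy, max AES bits: " + maxBits
                : "restricted policy, max AES bits: " + maxBits;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe(maxAesKeyLength()));
    }
}
```

Calling describe(maxAesKeyLength()) from the reader's start-up code and writing the result to the worker log would show whether the Dataflow JVM differs from my local one.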
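As suggested in another question, I tried using reflection to override the security policy and allow unlimited-length keys, but that did not work. One of the comments there mentions that the reflection hack does not work on Java 8 (I believe because the variable was changed to final).
Running this block in my job did not help: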
final Class<?> jceSecurity = Class.forName("javax.crypto.JceSecurity");
final Class<?> cryptoPermissions = Class.forName("javax.crypto.CryptoPermissions");
final Class<?> cryptoAllPermission = Class.forName("javax.crypto.CryptoAllPermission");
final Field isRestrictedField = jceSecurity.getDeclaredField("isRestricted");
isRestrictedField.setAccessible(true);
final Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(isRestrictedField, isRestrictedField.getModifiers() & ~Modifier.FINAL);
isRestrictedField.set(null, false);
final Field defaultPolicyField = jceSecurity.getDeclaredField("defaultPolicy");
defaultPolicyField.setAccessible(true);
final PermissionCollection defaultPolicy = (PermissionCollection) defaultPolicyField.get(null);
final Field perms = cryptoPermissions.getDeclaredField("perms");
perms.setAccessible(true);
((Map<?, ?>) perms.get(defaultPolicy)).clear();
final Field instance = cryptoAllPermission.getDeclaredField("INSTANCE");
instance.setAccessible(true);
defaultPolicy.add((Permission) instance.get(null));
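An alternative to the reflection approach that may be worth trying, assuming the worker JRE is Java 8u151 or newer: those releases read a crypto.policy security property, which can be set programmatically as long as it happens before any JCE class is first used. Whether the Dataflow workers run such a JRE is an assumption I have not verified, and the class name UnlimitedCryptoPolicy below is hypothetical.

```java
import java.security.Security;
import javax.crypto.Cipher;

// Hypothetical helper: asks the JVM for the unlimited JCE policy via the
// "crypto.policy" security property (honored by Java 8u151 and later).
public final class UnlimitedCryptoPolicy {
    private UnlimitedCryptoPolicy() {}

    // Must run before the JCE framework is initialized, for example from a
    // static initializer of the class that opens the TLS connection.
    // On older JREs the property is stored but has no effect.
    public static void request() {
        Security.setProperty("crypto.policy", "unlimited");
    }

    public static void main(String[] args) throws Exception {
        request();
        System.out.println("crypto.policy = " + Security.getProperty("crypto.policy"));
        System.out.println("Max AES key length: " + Cipher.getMaxAllowedKeyLength("AES"));
    }
}
```

If the maximum AES key length printed on a worker is still 128 after calling request(), the property is either not supported by that JRE or was set too late.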
[1] http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html
[2] https://cloud.google.com/dataflow