I am accessing Azure Blob Storage from a Flink pipeline.
According to the Flink documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/), there are two ways to do this:
1) `fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>`
I implemented this approach, but our organization's security policy does not allow hard-coding access keys, so this approach does not help.
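2) `fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider`
**a)** We already use this approach to store the checkpoints and savepoints of running jobs in one Azure Blob Storage account, say storage1. In other words, this approach (this key/value combination) is already taken.
**b)** Now we need to store CSV/text/XML files in a different Blob Storage account, for example storage2.
To access this storage account I need to provide its access key, and the key has to be supplied through configuration in the same way as described in point a).
For this I created an application-specific class whose internal logic is identical to `EnvironmentVariableKeyProvider`, except for the name of the environment variable it reads: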
package com.myapp;

import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProvider;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException;

// Same logic as Flink's EnvironmentVariableKeyProvider, but reads a different environment variable.
public class MyAppEnvironmentVariableKeyProvider implements KeyProvider {

    public static final String AZURE_STORAGE_KEY_ENV_VARIABLE = "AZURE_STORAGE_KEY_MYAPP";

    @Override
    public String getStorageAccountKey(final String accountName, final Configuration configuration)
            throws KeyProviderException {
        // Previously hard-coded for testing:
        // String azureStorageKey = "abcdefghijk";
        String azureStorageKey = System.getenv(AZURE_STORAGE_KEY_ENV_VARIABLE);
        if (azureStorageKey != null) {
            return azureStorageKey;
        } else {
            throw new KeyProviderException(
                    "Unable to retrieve Azure storage key from environment. \""
                            + AZURE_STORAGE_KEY_ENV_VARIABLE
                            + "\" not set.");
        }
    }
}
I declared the configuration in `deployment.yml` as follows:
flinkConfiguration:
  fs.azure.account.keyprovider.storage1.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
  fs.azure.account.keyprovider.storage2.blob.core.windows.net: >-
    com.myapp.MyAppEnvironmentVariableKeyProvider
  # many other configuration entries exist here but are not relevant to this problem
kubernetes:
  pods:
    affinity: null
    annotations:
      prometheus.io/port: '9249'
      prometheus.io/scrape: 'true'
    envVars:
      - name: AZURE_STORAGE_KEY
        valueFrom:
          secretKeyRef:
            key: azure.accesskey
            name: my-storage-secret
      - name: AZURE_STORAGE_KEY_MYAPP
        value: >-
          abcdefgh
        valueFrom: null
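To rule out the environment side separately from class loading, a minimal check can confirm that both variables declared under `envVars` actually reach the TaskManager JVM. This is a hypothetical sketch: `EnvCheck` and `missing` are illustrative names, not part of Flink, and it prints only variable names, never key values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class EnvCheck {

    /** Returns the names from {@code required} that are absent or blank in {@code env}. */
    static List<String> missing(Map<String, String> env, String... required) {
        List<String> result = new ArrayList<>();
        for (String name : required) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Only the variable names are reported; the secret values are never printed.
        List<String> absent = missing(System.getenv(), "AZURE_STORAGE_KEY", "AZURE_STORAGE_KEY_MYAPP");
        System.out.println(absent.isEmpty() ? "all key variables present" : "missing: " + absent);
    }
}
```

Passing the map in as a parameter (rather than calling `System.getenv()` inside `missing`) keeps the helper testable with a fake environment.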
Now, when my application tries to access storage2, which is configured through the `fs.azure.account.keyprovider.storage2.blob.core.windows.net` property, I get the following error:
org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1086) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:538) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1358) ~[?:?]
at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.createInitializedAzureFS(AbstractAzureFSFactory.java:88) ~[?:?]
at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:505) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:406) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
It seems that Flink cannot load the user-defined class.
Is there a way to make Flink load this user-defined `MyAppEnvironmentVariableKeyProvider` class?
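As far as I can tell from the hadoop-azure sources, `AzureNativeFileSystemStore` reads the value of `fs.azure.account.keyprovider.<account>`, resolves it by name through the Hadoop `Configuration`'s class loader, instantiates it reflectively, and rewraps any failure as `KeyProviderException: Unable to load key provider class.` (the nested cause is cut off in the trace above). The trace also shows the filesystem being created via `PluginFileSystemFactory`, i.e. the Azure filesystem runs as a Flink plugin with its own class loader, so one plausible but unconfirmed explanation is that a class packaged only in the job jar is not visible to that loader. The plain-Java sketch below only approximates that lookup; `KeyProviderLoadCheck`, `canLoad` and `keyProviderProperty` are hypothetical names, not Flink or Hadoop APIs.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical diagnostic, not Flink or Hadoop code: resolves the key provider
 * class configured for one storage account against a specific ClassLoader.
 */
class KeyProviderLoadCheck {

    /** Per-account property name, following the pattern used in flinkConfiguration. */
    static String keyProviderProperty(String accountName) {
        return "fs.azure.account.keyprovider." + accountName + ".blob.core.windows.net";
    }

    /**
     * Returns true if the class configured for {@code accountName} can be found
     * through {@code loader} and instantiated via an accessible no-arg constructor.
     */
    static boolean canLoad(Map<String, String> conf, String accountName, ClassLoader loader) {
        String className = conf.get(keyProviderProperty(accountName));
        if (className == null) {
            return false; // nothing configured for this account
        }
        try {
            Class<?> clazz = Class.forName(className.trim(), true, loader);
            clazz.getDeclaredConstructor().newInstance();
            return true;
        } catch (ReflectiveOperationException | LinkageError e) {
            // ClassNotFoundException: the class is not visible to this loader.
            // Other failures: found, but could not be linked or instantiated.
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put(keyProviderProperty("storage1"), "java.util.ArrayList"); // stand-in for a visible class
        conf.put(keyProviderProperty("storage2"), "com.myapp.MyAppEnvironmentVariableKeyProvider");

        ClassLoader loader = KeyProviderLoadCheck.class.getClassLoader();
        System.out.println("storage1 loadable: " + canLoad(conf, "storage1", loader));
        System.out.println("storage2 loadable: " + canLoad(conf, "storage2", loader));
    }
}
```

Here `java.util.ArrayList` is only a stand-in for a class the loader can see. Passing a loader with no access to the application classpath, such as `new URLClassLoader(new URL[0], null)`, makes even a class of the running application unresolvable, which is the kind of isolation the sketch is meant to illustrate.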