
I am accessing Azure Blob Storage from a Flink pipeline.

According to the Flink documentation (https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/azure/), there are two ways to do this.

1) fs.azure.account.key.<account_name>.blob.core.windows.net: <azure_storage_key>

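I implemented this approach, but under our organization's security policy, hard-coding the access key is not recommended, so this approach does not help.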

2) fs.azure.account.keyprovider.<account_name>.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider

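**a)** We already use this approach to store the checkpoints and savepoints of running jobs on one Azure Blob Storage account, say storage1. In other words, this approach (this key/value pair) is already in use.

**b)** Now we need to store csv/text/xml files on a different blob storage account, say storage2.

To access this second storage account I need to supply its access key, and the key has to be supplied through configuration in the same way as described in point a.

To do this, I created an application-specific class whose internal logic is the same as EnvironmentVariableKeyProvider, except for the name of the environment variable.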

import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProvider;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException;



public class MyAppEnvironmentVariableKeyProvider implements KeyProvider {

    // Name of the environment variable that holds the access key.
    public static final String AZURE_STORAGE_KEY_ENV_VARIABLE = "AZURE_STORAGE_KEY_MYAPP";

    @Override
    public String getStorageAccountKey(final String s, final Configuration configuration)
            throws KeyProviderException {
        // Read the key from the environment instead of hard-coding it.
        String azureStorageKey = System.getenv(AZURE_STORAGE_KEY_ENV_VARIABLE);

        if (azureStorageKey != null) {
            return azureStorageKey;
        } else {
            throw new KeyProviderException(
                    "Unable to retrieve Azure storage key from environment. \""
                            + AZURE_STORAGE_KEY_ENV_VARIABLE
                            + "\" not set.");
        }
    }
}
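
The lookup logic of the class above can be exercised without Flink or Hadoop on the classpath. The following is only a minimal sketch under stated assumptions, not the class used in the job: `EnvKeyLookup` and its injectable `Function<String, String>` are hypothetical names introduced for illustration, and a plain `IllegalStateException` stands in for `KeyProviderException`.

```java
import java.util.function.Function;

/**
 * Hypothetical helper (not part of Flink or Hadoop) that mirrors the
 * environment-variable lookup in MyAppEnvironmentVariableKeyProvider.
 */
final class EnvKeyLookup {

    private final String variableName;
    private final Function<String, String> environment;

    /** Reads from the real process environment. */
    EnvKeyLookup(String variableName) {
        this(variableName, System::getenv);
    }

    /** Reads from an injected lookup, which makes the logic testable. */
    EnvKeyLookup(String variableName, Function<String, String> environment) {
        this.variableName = variableName;
        this.environment = environment;
    }

    /** Returns the key, or throws if the variable is not set. */
    String storageKey() {
        String key = environment.apply(variableName);
        if (key == null) {
            // Assumption: IllegalStateException replaces KeyProviderException here.
            throw new IllegalStateException(
                    "Unable to retrieve Azure storage key from environment. \""
                            + variableName
                            + "\" not set.");
        }
        return key;
    }
}
```

In a pod where the variable is set, `new EnvKeyLookup("AZURE_STORAGE_KEY_MYAPP").storageKey()` would return its value. This only checks the lookup itself, not whether Flink can load the provider class.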

I declared the following configuration in deployment.yml:

      flinkConfiguration:
        fs.azure.account.keyprovider.storage1.blob.core.windows.net: org.apache.flink.fs.azurefs.EnvironmentVariableKeyProvider
        fs.azure.account.keyprovider.storage2.blob.core.windows.net: com.myapp.MyAppEnvironmentVariableKeyProvider
        # many other configuration entries exist here but are not needed for this problem statement
      kubernetes:
        pods:
          affinity: null
          annotations:
            prometheus.io/port: '9249'
            prometheus.io/scrape: 'true'
          envVars:
            - name: AZURE_STORAGE_KEY
              valueFrom:
                secretKeyRef:
                  key: azure.accesskey
                  name: my-storage-secret
            - name: AZURE_STORAGE_KEY_MYAPP
              value: >-
                abcdefgh
              valueFrom: null

Now, when my application tries to use the fs.azure.account.keyprovider.storage2.blob.core.windows.net property, it gives me the following error:

org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureException: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1086) ~[?:?]
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:538) ~[?:?]
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1358) ~[?:?]
        at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.createInitializedAzureFS(AbstractAzureFSFactory.java:88) ~[?:?]
        at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79) ~[?:?]
        at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:505) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:406) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269) ~[flink-dist_2.12-1.13.1-stream1.jar:1.13.1-stream1]
Caused by: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.KeyProviderException: Unable to load key provider class.

It seems that Flink is unable to load the user-defined class.

Is there a way to make Flink load this user-defined MyAppEnvironmentVariableKeyProvider class?
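
The stack trace passes through PluginFileSystemFactory, which suggests the Azure file system is loaded as a Flink plugin. Flink documents that each plugin is loaded through its own isolated class loader, so one plausible explanation (an assumption, not confirmed in this post) is that a class shipped only in the job JAR is not visible to the code that resolves the key provider. A small diagnostic sketch for checking which class loader can see the class; `ClassVisibilityCheck` is a hypothetical helper, not a Flink API:

```java
/**
 * Hypothetical diagnostic (not a Flink API): reports whether a class name
 * can be resolved through a given class loader.
 */
final class ClassVisibilityCheck {

    private ClassVisibilityCheck() {}

    /** Returns true if the loader (or one of its parents) can find the class. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only locate the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "com.myapp.MyAppEnvironmentVariableKeyProvider";
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println(
                name + " visible to context class loader: " + isVisible(name, context));
    }
}
```

Calling `isVisible` from inside the job with the thread context class loader would show whether the class is reachable from user code. It does not by itself prove what the plugin's class loader sees, so a `true` result there together with the error above would be consistent with the isolation hypothesis rather than a confirmation of it.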
