0

我已经为此苦苦挣扎了两天,我完全陷入困境,所以我想知道是否有人可以帮助我。

我似乎遇到了一个我无法弄清楚的 SSL 错误。

我正在我的笔记本中创建一个 dask 集群:

def config_coiled_cluster(env_name="riox"):
    software_env_name = env_name

    coiled.create_software_environment(
        name=software_env_name,
        #container="mrmaksimize/prefect-coiled-env:latest",
        pip=[
            "dask[complete]", 
            "rioxarray", 
            "pyproj", 
            "geopandas", 
            "pandas", 
            "s3fs",
            "jupyter-server-proxy"
        ],
        backend_options={"region": "us-east-1"})

    # Create a cluster configuration named "my-cluster-config"
    coiled.create_cluster_configuration(
        name=f"{software_env_name}-dev",
        scheduler_cpu=4,
        scheduler_memory="8 GiB",
        worker_cpu=4,
        worker_memory="8 GiB",
        software=f"mrmaksimize/{software_env_name}",
    )
    
    

    return True

config_coiled_cluster("riox")
    cluster = coiled.Cluster(
        configuration=f"riox-dev",
        n_workers=4
    )

    client = Client(cluster)

一切似乎都运行良好。

然后我将点的 CSV 加载到 Dask DF(来自 S3)中,这也很有效。集群接收命令,我可以看到它正在处理。

之后,我加载了一些栅格(也来自 S3,但它们是开放的,所以我只使用 HTTPS 链接)并保留它们。这很好用。

最后,我对点数据集运行了一个 map_partitions:

def get_values_from_rasters(chunk_df, partition_info=None): 
    print(partition_info)
    #If divisions are not known (for instance if the index is not sorted) then you will get None as the division.
    ind_x = xr.DataArray(chunk_df.x.values, dims="unique_index")
    ind_y = xr.DataArray(chunk_df.y.values, dims="unique_index")
    
    # TODO - this should be parallel somehow
    for rkey in loaded_rasters:
        print(f"Extracting from {rkey}")
        rds = loaded_rasters[rkey]
        fill_val = rds.attrs['_FillValue']
        result = rds.sel(band=1, x = ind_x, y = ind_y, method='nearest').values
        chunk_df[f"{rkey}_hrs"] = result
        chunk_df[chunk_df[f"{rkey}_hrs"] == fill_val] = None
    return chunk_df

# Build output meta
meta = {'VALUE': float, 'CBS_ID': int, 'CBS_Ward': int, 'x': float, 'y': float}
for rkey in loaded_rasters:
    meta[f"{rkey}_hrs"] = float


# Promise the calculation
points = points.map_partitions(get_values_from_rasters, 
                                 meta=meta)
points.head()

在这里我收到一个 TypeError:

TypeError: TLS expects a `ssl_context` argument of type ssl.SSLContext (perhaps check your TLS configuration?)  Instead got None

在仪表板上 - 我可以看到命令已收到并且仍在处理中,即使笔记本上弹出错误。所以我最好的猜测是它与工作人员和调度程序之间盘绕的 TLS 通信有关。

这不会发生在本地集群上,之前也不会发生在盘绕上。我唯一能想到的是,我遇到了使用限制并输入了我的信用卡,这样我就可以继续使用线圈了。

我已经尝试了我能想到的一切,但我没有想法,而且我不确定从哪里开始调试。欢迎任何想法 - 会尝试任何东西!

4

0 回答 0