
I have an Airflow 2.1.0 cluster with 1 master and 2 workers. I am trying to set up a custom S3 XCom backend to transfer DataFrames between tasks. I placed s3_xcom_backend.py in 3 different paths on all 3 nodes: the Airflow home directory /data/airflow/, the ubuntu home directory ~/, and the package directory of the Airflow conda environment, where the default xcom.py file lives: /data/anaconda3/envs/airflow/lib/python3.8/site-packages/
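
For reference, my s3_xcom_backend.py follows the usual pattern of subclassing BaseXCom and overriding serialize_value / deserialize_value to push DataFrames to S3. A minimal sketch of that pattern is below; the bucket name, key prefix, and CSV round-trip are illustrative, not my exact code:

    import uuid
    import pandas as pd
    from airflow.models.xcom import BaseXCom
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    class S3XComBackend(BaseXCom):
        PREFIX = "xcom_s3://"           # marker stored in the metadata DB (illustrative)
        BUCKET_NAME = "my-xcom-bucket"  # illustrative bucket name

        @staticmethod
        def serialize_value(value):
            # Only DataFrames go to S3; everything else falls back to the default JSON XCom.
            if isinstance(value, pd.DataFrame):
                hook = S3Hook()
                key = f"data_{uuid.uuid4()}.csv"
                filename = f"/tmp/{key}"
                value.to_csv(filename, index=False)
                hook.load_file(filename=filename, key=key,
                               bucket_name=S3XComBackend.BUCKET_NAME, replace=True)
                value = S3XComBackend.PREFIX + key
            return BaseXCom.serialize_value(value)

        @staticmethod
        def deserialize_value(result):
            result = BaseXCom.deserialize_value(result)
            # Anything tagged with the prefix is fetched back from S3 and read as a DataFrame.
            if isinstance(result, str) and result.startswith(S3XComBackend.PREFIX):
                hook = S3Hook()
                key = result.replace(S3XComBackend.PREFIX, "")
                filename = hook.download_file(key=key,
                                              bucket_name=S3XComBackend.BUCKET_NAME,
                                              local_path="/tmp")
                result = pd.read_csv(filename)
            return result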

Then I added the line export AIRFLOW__CORE__XCOM_BACKEND=s3_xcom_backend.S3XComBackend to .bashrc and the line AIRFLOW__CORE__XCOM_BACKEND=s3_xcom_backend.S3XComBackend to the airflow.env file. I also changed xcom_backend = s3_xcom_backend.S3XComBackend in airflow.cfg. I made these configuration changes on all 3 nodes.
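
To sanity-check which backend Airflow actually resolves on a node, I believe something like this can be run inside the Airflow conda environment (resolve_xcom_backend is, as far as I know, the helper Airflow itself uses to load the class named by xcom_backend; if it prints the default BaseXCom, the setting or the module path is not being picked up):

    from airflow.configuration import conf
    from airflow.models.xcom import resolve_xcom_backend

    # What Airflow reads for the setting (the AIRFLOW__CORE__XCOM_BACKEND env var
    # overrides the value in airflow.cfg).
    print(conf.get("core", "xcom_backend"))

    # The class Airflow will actually use for XCom; it should print S3XComBackend,
    # not the built-in BaseXCom, once s3_xcom_backend is importable on the node.
    print(resolve_xcom_backend())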

Finally, I restarted the webserver and scheduler services in order. But the S3 XCom backend does not take effect. When I run taskflow_etl_dag, which simply transfers a DataFrame between tasks, I get an error showing that my S3 backend is not active and that Airflow is still using the default XCom backend.

How can I enable the custom S3 backend? Any help is appreciated.

[2022-01-26 23:32:42,429] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: taskflow_etl_pandas.extract 2022-01-26T20:32:40.375286+00:00 [queued]>
[2022-01-26 23:32:42,448] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: taskflow_etl_pandas.extract 2022-01-26T20:32:40.375286+00:00 [queued]>
[2022-01-26 23:32:42,448] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2022-01-26 23:32:42,448] {taskinstance.py:1068} INFO - Starting attempt 1 of 1
[2022-01-26 23:32:42,449] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2022-01-26 23:32:42,458] {taskinstance.py:1087} INFO - Executing <Task(_PythonDecoratedOperator): extract> on 2022-01-26T20:32:40.375286+00:00
[2022-01-26 23:32:42,462] {standard_task_runner.py:52} INFO - Started process 2628493 to run task
[2022-01-26 23:32:42,467] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'taskflow_etl_pandas', 'extract', '2022-01-26T20:32:40.375286+00:00', '--job-id', '253', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/test_df_transfer_between_tasks.py', '--cfg-path', '/tmp/tmpdovbaijc', '--error-file', '/tmp/tmphvjzfu9r']
[2022-01-26 23:32:42,468] {standard_task_runner.py:77} INFO - Job 253: Subtask extract
[2022-01-26 23:32:42,538] {logging_mixin.py:104} INFO - Running <TaskInstance: taskflow_etl_pandas.extract 2022-01-26T20:32:40.375286+00:00 [running]> on host DTEKAIRFLOWWP1.fw.dteknoloji.com.tr
[2022-01-26 23:32:42,625] {taskinstance.py:1280} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=taskflow_etl_pandas
AIRFLOW_CTX_TASK_ID=extract
AIRFLOW_CTX_EXECUTION_DATE=2022-01-26T20:32:40.375286+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-01-26T20:32:40.375286+00:00
[2022-01-26 23:32:42,632] {python.py:151} INFO - Done. Returned value was: ['a', 'b']
[2022-01-26 23:32:42,633] {xcom.py:228} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your airflow config.
[2022-01-26 23:32:42,633] {taskinstance.py:1481} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1137, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1311, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1344, in _execute_task
    self.xcom_push(key=XCOM_RETURN_KEY, value=result)
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1919, in xcom_push
    XCom.set(
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/utils/session.py", line 67, in wrapper
    return func(*args, **kwargs)
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/models/xcom.py", line 79, in set
    value = XCom.serialize_value(value)
  File "/data/anaconda3/envs/airflow/lib/python3.8/site-packages/airflow/models/xcom.py", line 226, in serialize_value
    return json.dumps(value).encode('UTF-8')
  File "/data/anaconda3/envs/airflow/lib/python3.8/json/__init__.py", line 231, in dumps
    return _default_encoder.encode(obj)
  File "/data/anaconda3/envs/airflow/lib/python3.8/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/data/anaconda3/envs/airflow/lib/python3.8/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/data/anaconda3/envs/airflow/lib/python3.8/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type DataFrame is not JSON serializable
[2022-01-26 23:32:42,635] {taskinstance.py:1524} INFO - Marking task as FAILED. dag_id=taskflow_etl_pandas, task_id=extract, execution_date=20220126T203240, start_date=20220126T203242, end_date=20220126T203242
[2022-01-26 23:32:42,677] {local_task_job.py:151} INFO - Task exited with return code 1