9

我已经用 postgresql 数据库设置了气流,我正在创建多个 dag

def subdag(parent_dag_name, child_dag_name,currentDate,batchId,category,subCategory,yearMonth,utilityType,REALTIME_HOME, args):
dag_subdag = DAG(
    dag_id='%s.%s' % (parent_dag_name, child_dag_name),
    default_args=args,
    schedule_interval="@once",
)
# get site list to run bs reports
site_list = getSiteListforProcessing(category,subCategory,utilityType,yearMonth);
print (site_list)
def update_status(siteId,**kwargs):
    createdDate=getCurrentTimestamp();
    print ('N',siteId,batchId,yearMonth,utilityType,'N')
    updateJobStatusLog('N',siteId,batchId,yearMonth,utilityType,'P')

def error_status(siteId,**kwargs):
    createdDate=getCurrentTimestamp();
    print ('N',siteId,batchId,yearMonth,utilityType,'N')

BS_template = """
echo "{{ params.date }}"
java -cp xx.jar com.xGenerator {{params.siteId}} {{params.utilityType}} {{params.date}}
"""

for index,siteid in enumerate(site_list):
    t1 = BashOperator(
            task_id='%s-task-%s' % (child_dag_name, index + 1),
            bash_command=BS_template,
            params={'date':  currentDate, 'realtime_home': REALTIME_HOME,'siteId': siteid, "utilityType":utilityType},
            default_args=args,
            dag=dag_subdag)
    t2 = PythonOperator(
            task_id='%s-updatetask-%s' % (child_dag_name, index + 1),
            dag=dag_subdag,
            python_callable=update_status,
            op_kwargs={'siteId':siteid})

    t2.set_upstream(t1)
return dag_subdag

它创建动态任务,但在所有数量的动态任务上,它总是使最后一个失败并记录错误:“使用 sqlite 时不能使用超过 1 个线程。将 max_threads 设置为 1”例如:如果创建 4 个任务,则运行 3 次,并且如果创建了 2 个任务,则运行 1 次。

4

0 回答 0