我已经使用 PostgreSQL 数据库配置了 Airflow,并且正在创建多个 DAG:
def subdag(parent_dag_name, child_dag_name, currentDate, batchId, category,
           subCategory, yearMonth, utilityType, REALTIME_HOME, args):
    """Build a sub-DAG with one report task and one status task per site.

    The site list is fetched with ``getSiteListforProcessing`` every time
    this function is called, i.e. while the DAG is being constructed. For
    each site a ``BashOperator`` renders and runs ``BS_template``; a
    ``PythonOperator`` calling ``update_status`` is set downstream of it.

    Args:
        parent_dag_name: First half of the sub-DAG id.
        child_dag_name: Second half of the sub-DAG id; also the prefix of
            every task id created here.
        currentDate: Exposed to the bash template as ``params.date``.
        batchId: Passed to ``updateJobStatusLog`` by the status task.
        category: Passed to ``getSiteListforProcessing``.
        subCategory: Passed to ``getSiteListforProcessing``.
        yearMonth: Passed to ``getSiteListforProcessing`` and
            ``updateJobStatusLog``.
        utilityType: Passed to ``getSiteListforProcessing`` and
            ``updateJobStatusLog``; also exposed to the bash template as
            ``params.utilityType``.
        REALTIME_HOME: Exposed to the bash template as
            ``params.realtime_home`` (the current template does not
            reference it).
        args: ``default_args`` for the DAG and for each ``BashOperator``.

    Returns:
        The populated ``DAG`` object, with id
        ``"<parent_dag_name>.<child_dag_name>"``.
    """
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@once",
    )

    # Get the list of sites to run BS reports for.
    site_list = getSiteListforProcessing(category, subCategory, utilityType, yearMonth)
    print(site_list)

    def update_status(siteId, **kwargs):
        """Write the job-status record for ``siteId`` after its report task."""
        # NOTE(review): the printed tuple ends in 'N' while the value actually
        # passed to updateJobStatusLog is 'P' -- confirm which one is intended.
        print('N', siteId, batchId, yearMonth, utilityType, 'N')
        updateJobStatusLog('N', siteId, batchId, yearMonth, utilityType, 'P')

    def error_status(siteId, **kwargs):
        """Print the status tuple for ``siteId``; writes nothing to the log."""
        # NOTE(review): this callback is not attached to any operator in this
        # function -- presumably meant as a failure hook; confirm or remove.
        print('N', siteId, batchId, yearMonth, utilityType, 'N')

    # The template body is kept flush-left so the rendered command text is
    # exactly what is written here, with no added leading whitespace.
    BS_template = """
echo "{{ params.date }}"
java -cp xx.jar com.xGenerator {{params.siteId}} {{params.utilityType}} {{params.date}}
"""

    # Task ids are numbered from 1, in site_list order.
    for task_number, site_id in enumerate(site_list, 1):
        report_task = BashOperator(
            task_id='%s-task-%s' % (child_dag_name, task_number),
            bash_command=BS_template,
            params={
                'date': currentDate,
                'realtime_home': REALTIME_HOME,
                'siteId': site_id,
                "utilityType": utilityType,
            },
            default_args=args,
            dag=dag_subdag,
        )
        status_task = PythonOperator(
            task_id='%s-updatetask-%s' % (child_dag_name, task_number),
            dag=dag_subdag,
            python_callable=update_status,
            op_kwargs={'siteId': site_id},
        )
        # The status update runs only after the site's report task.
        status_task.set_upstream(report_task)

    return dag_subdag
它能够创建动态任务,但无论动态任务的数量是多少,最后一个任务总是失败,并记录错误:“使用 sqlite 时不能使用超过 1 个线程。将 max_threads 设置为 1”(原文:"Cannot use more than 1 thread when using sqlite. Setting max_threads to 1")。例如:如果创建了 4 个任务,则只有 3 个运行;如果创建了 2 个任务,则只有 1 个运行。