0

在过去的几个月里,我一直在生产中使用以下代码,

@task
def sql_run_procs():
    """This is a delete and update..."""
    # Get our logger
    logger = prefect.utilities.logging.get_logger()  # type: ignore

    conn = connect_db(prefect.config.kv.p.prod_db_constring, logger)  ## wrapper around create_engine()

    with conn.connect() as con:
        try:
            r = con.execute(
                f"EXECUTE fs.spETL_MyProc '{prefect.config.kv.p.staging_db_name}'"
            ).fetchall()
            for q in r[0]:
                if q == 1:
                    logger.info(f"Query {q} has failed")
                    raise signals.FAIL()
        except :
            raise SQLAlchemyError("Error in SQL Script")

所以像任何优秀的编码器一样,我将代码复制并粘贴到另一个脚本中

@task
def sql_run_procs():
    """This is a clean, truncate and insert"""
    # Get our logger
    logger = prefect.utilities.logging.get_logger()  # type: ignore

    conn = connect_db(prefect.config.kv.p.prod_db_constring, logger)

    with conn.connect() as con:
        try:
            r = con.execute(
                f"EXECUTE forms.spETL_MyOtherProc '{prefect.config.kv.p.staging_db_name}'"
            ).fetchall()
            for q in r[0]:
                if q == 1:
                    logger.info(f"Query {q} has failed")
                    raise signals.FAIL()
        except :
            raise SQLAlchemyError("Error in SQL Script")

并得到以下错误:

AttributeError:“NoneType”对象没有属性“fetchall”

除了存储过程的名称之外,唯一的区别是它们位于不同的 Prefect 项目中。我已经在这个网站和其他网站上搜索了可能的解决方案,但没有成功。我知道它可能正盯着我的脸,但一个半小时后……你知道的。提前致谢。

4

0 回答 0