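I'm trying to fetch data from Teradata and then write it to, and read it back from, Parquet files using Prefect tasks. When I run the SQL fetch on its own (as a plain function, not a task), the code works, as shown below: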

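from prefect import Flow, task

# Plain function (not a task): called inside the `with Flow(...)` block, it runs
# immediately while the flow is being built, and its return value is passed to
# write_data as a constant.
def fetch_data(host,db_name,user,password,query):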
    'logic'

@task(name="Write dask dataframe into GPFS parquet file")
def write_data(dask_dataframe,file_name):
    'logic'

@task(name="Read data from GPFS parquet file into dask dataframe")
def read_data(file_name):
    'logic'

with Flow("Teradata Example") as flow:
    result = fetch_data(host,db_name,user,password,query)
    write_data(dask_dataframe=result,file_name=file_name)
    read_data(file_name=file_name)

flow.run()

But when the same fetcher code runs as a task, it fails:

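# Now a Prefect task: it runs only when flow.run() executes, and its result is
# handed to write_data at run time.
@task(name="Fetch sql query data from teradata data source into dask dataframe")
def fetch_data(host,db_name,user,password,query):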
    'logic'

@task(name="Write dask dataframe into GPFS parquet file")
def write_data(dask_dataframe,file_name):
    'logic'

@task(name="Read data from GPFS parquet file into dask dataframe")
def read_data(file_name):
    'logic'

with Flow("Teradata Example") as flow:
    result = fetch_data(host,db_name,user,password,query)
    write_data(dask_dataframe=result,file_name=file_name)
    read_data(file_name=file_name)

flow.run()

Here is the Teradata fetch code I added:

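import logging
from typing import Any

import pandas as pd
from dask import delayed
from dask.dataframe import from_delayed
from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs

def get_partitions(num_partitions):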
    list_range =[]
    initial_start=0
    for i in range(num_partitions):
        amp_range = 3240//num_partitions
        start = (i*amp_range+1)*initial_start
        end   = (i+1)*amp_range
        list_range.append((start,end))
        initial_start = 1
    return list_range

@delayed
def load(query,start,end,connString):
    return pd.read_sql(query.format(start, end),connString)

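class TeradataFetch(Task):
    def __init__(self, num_partitions, query=None, **kwargs):
        # The constructor body was not shown in the question; run() relies on
        # self.num_partitions, and defaults_from_attrs("query") reads self.query.
        self.num_partitions = num_partitions
        self.query = query
        super().__init__(**kwargs)

    # Only list attributes that run() accepts as keyword arguments.
    @defaults_from_attrs("query")
    def run(
        self,
        query: str = None,
    ) -> Any:
        try:
            # One lazy pd.read_sql call per partition range, combined into a
            # single dask DataFrame with one partition per range.
            # Note: connString is not defined anywhere in the snippet as posted.
            results = from_delayed([load(query,start, end,connString) for start,end in get_partitions(self.num_partitions)])
            logging.debug("Fetch Results: %s", results)
            return results

        except Exception as e:
            raise e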
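A quick standalone check of the `get_partitions` helper, assuming the query template uses its two `{}` placeholders as inclusive bounds (e.g. `BETWEEN {} AND {}`; the actual query isn't shown). It shows the ranges produced and that, when 3240 isn't evenly divisible by `num_partitions`, the last range stops short of 3240. `get_partitions_full` is a hypothetical variant, not part of the original code, that gives the remainder to the last partition.

```python
def get_partitions(num_partitions):
    # Copy of the helper from the question, same behavior.
    list_range = []
    initial_start = 0
    for i in range(num_partitions):
        amp_range = 3240 // num_partitions
        start = (i * amp_range + 1) * initial_start
        end = (i + 1) * amp_range
        list_range.append((start, end))
        initial_start = 1
    return list_range


def get_partitions_full(num_partitions, total=3240):
    # Hypothetical variant: same boundaries, except the last range always ends
    # at `total`, so values past num_partitions * (total // num_partitions)
    # are not dropped.
    size = total // num_partitions
    ranges = []
    for i in range(num_partitions):
        start = 0 if i == 0 else i * size + 1
        end = total if i == num_partitions - 1 else (i + 1) * size
        ranges.append((start, end))
    return ranges


print(get_partitions(4))           # [(0, 810), (811, 1620), (1621, 2430), (2431, 3240)]
print(get_partitions(7)[-1])       # (2773, 3234): 3235..3240 are never requested
print(get_partitions_full(7)[-1])  # (2773, 3240)
```

For evenly divisible counts such as 4 the two functions agree; they differ only in how the leftover values are handled.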

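The `load`/`from_delayed` pattern builds one lazy `pd.read_sql` call per partition range, and `from_delayed` turns each resulting pandas DataFrame into one partition of a dask DataFrame. The sketch below shows only the fan-out-and-concatenate logic, without Teradata, dask, or pandas: an in-memory SQLite table (`t` with column `amp`, both hypothetical) stands in for the Teradata source, and the partitions run sequentially instead of as dask tasks.

```python
import sqlite3


def load_partition(conn, query, start, end):
    # Same idea as `load`: fill the two {} placeholders with one partition's
    # bounds and run the query. Formatting SQL with str.format is only
    # tolerable here because start/end are integers we generate ourselves.
    return conn.execute(query.format(start, end)).fetchall()


def fetch_all(conn, query, ranges):
    # Sequential stand-in for from_delayed([load(...) for ...]): one query per
    # range, partial results concatenated in range order.
    rows = []
    for start, end in ranges:
        rows.extend(load_partition(conn, query, start, end))
    return rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (amp INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(21)])
query = "SELECT amp FROM t WHERE amp BETWEEN {} AND {} ORDER BY amp"
rows = fetch_all(conn, query, [(0, 10), (11, 20)])
print(len(rows))  # 21
```

With inclusive bounds, adjacent ranges like `(0, 10)` and `(11, 20)` cover every value exactly once, which is what the partition helper is trying to guarantee.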

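Prefect 1's `defaults_from_attrs` fills keyword arguments of `run` from task attributes of the same name, so every name it lists should be a keyword parameter of `run` (and an attribute set in `__init__`). The toy decorator below, `fill_defaults_from_attrs`, is a simplified stand-in written for illustration, not Prefect's source; it mimics that behavior so the effect of a mismatch can be seen without Prefect installed. `MatchedFetch` and `MismatchedFetch` are hypothetical classes.

```python
import functools


def fill_defaults_from_attrs(*attr_names):
    # Simplified stand-in: for each listed name the caller did not pass (or
    # passed as None), copy the instance attribute of the same name into the
    # keyword arguments before calling the wrapped method.
    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            for name in attr_names:
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return method(self, *args, **kwargs)
        return wrapper
    return decorator


class MatchedFetch:
    def __init__(self, query=None, fetch="all"):
        self.query = query
        self.fetch = fetch

    @fill_defaults_from_attrs("query")  # listed names match run()'s parameters
    def run(self, query=None):
        return query


class MismatchedFetch(MatchedFetch):
    @fill_defaults_from_attrs("query", "fetch")  # "fetch" is not a run() parameter
    def run(self, query=None):
        return query


print(MatchedFetch(query="SELECT 1").run())  # SELECT 1
try:
    MismatchedFetch(query="SELECT 1").run()
except TypeError as exc:
    print("TypeError:", exc)
```

The mismatched version fails before the query ever runs, because the injected `fetch` keyword is not accepted by `run`.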
Can anyone suggest a fix or help here?
