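I am trying to write code for AWS Lambda that converts CSV files to Parquet. I can do this with PyArrow, but the library is too large (about 200 MB uncompressed), so I cannot include it in the Lambda deployment package. Instead, I am trying to write the Parquet file directly to an S3 bucket through a BytesIO buffer.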

Below is my Lambda function code:

import json
import boto3
import pandas as pd
from io import BytesIO


def lambda_handler():

    s3 = boto3.client('s3')
    response = s3.list_objects_v2(
        Bucket = 'mybucket',
        Prefix = 'subfolder/'
    )
    files = get_object_keys(response)
    for file in files:
        obj = s3.get_object(Bucket='mybucket', Key=file)
        df = pd.read_csv(obj['Body'], sep='|')


        buf = BytesIO()
        df.to_parquet(buf, engine='fastparquet', index=False, compression='snappy')
        buf.seek(0)
        key = f"output/{file.split('/')[1].split('.')[0]}.parquet"
        s3.put_object(Bucket='mybucket', Body=buf.getvalue(), Key=key)

def get_object_keys(response):

    files = []
    for content in response['Contents']:
        if content['Key'].endswith('.csv'):
            files.append(content['Key'])
    return files

lambda_handler()

When I use 'fastparquet' as the engine in dataframe.to_parquet(), I get the following error:

Traceback (most recent call last):
  File ".\lambda_function.py", line 77, in <module>
    lambda_handler()
  File ".\lambda_function.py", line 64, in lambda_handler
    df.to_parquet(buf, engine='fastparquet', index=False, compression='snappy')
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\util\_decorators.py", line 214, in wrapper
    return func(*args, **kwargs)
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\core\frame.py", line 2116, in to_parquet  
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\parquet.py", line 264, in to_parquet
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\pandas\io\parquet.py", line 185, in write
    **kwargs,
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\writer.py", line 880, in write       
    compression, open_with, has_nulls, append)
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\writer.py", line 734, in write_simple
    with open_with(fn, mode) as f:
  File "C:\Users\tansingh\AppData\Local\Programs\Python\Python37\lib\site-packages\fastparquet\util.py", line 42, in default_open   
    return open(f, mode)
TypeError: expected str, bytes or os.PathLike object, not _io.BytesIO

Does anyone know how to fix this?
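The last frames of the traceback show fastparquet's default_open calling open(f, mode) on whatever it was given, so the failure seems to come from handing a BytesIO object to the built-in open(). A minimal sketch that reproduces the same class of error with only the standard library (no pandas or fastparquet involved, so it illustrates the cause rather than the library's behavior):

```python
from io import BytesIO

buf = BytesIO()
error = None
try:
    # The built-in open() expects a filesystem path (or a file descriptor),
    # not an in-memory file-like object such as BytesIO.
    open(buf, "wb")
except TypeError as exc:
    error = exc
    print(type(exc).__name__, exc)
```

The exact wording of the message can differ between Python versions, but the exception type is TypeError in each case.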

1 Answer

I solved this error by using pyarrow as the writing engine.

Sample code:

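import io
import boto3

buffer = io.BytesIO()
df.to_parquet(buffer, engine="pyarrow", index=False)
s3_resource = boto3.resource("s3")
# "bucketname" and "path_withfilename" are placeholders for your bucket and object key
s3_resource.Object("bucketname", "path_withfilename").put(Body=buffer.getvalue())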

To read Parquet files in Python 3.6 I used fastparquet, but for writing, the pyarrow engine seems to work.
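If pyarrow is too large for the deployment package, as the question describes, one possible workaround is to let fastparquet write to a real file path and then read the bytes back for upload. The sketch below is an assumption-laden illustration, not something from the original answer: bytes_via_tempfile is a hypothetical helper name, and it relies on the temporary directory being writable (on Lambda this is normally /tmp, which has limited space). The helper itself uses only the standard library; the pandas and boto3 calls appear in the commented usage.

```python
import os
import tempfile
from typing import Callable


def bytes_via_tempfile(write_to_path: Callable[[str], None],
                       suffix: str = ".parquet") -> bytes:
    """Let a path-only writer produce a file, then return its contents as bytes.

    Hypothetical helper: write_to_path receives a filesystem path and must
    write the output there (for example via df.to_parquet).
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "out" + suffix)
        write_to_path(path)
        with open(path, "rb") as f:
            return f.read()
    # The temporary directory and the file are removed on exit.


# Possible usage inside the loop from the question (not executed here):
# data = bytes_via_tempfile(
#     lambda p: df.to_parquet(p, engine="fastparquet", index=False,
#                             compression="snappy")
# )
# s3.put_object(Bucket="mybucket", Key=key, Body=data)
```

Passing the writer as a callback keeps the helper independent of the Parquet engine, so the same function could be reused if the engine changes later.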

Answered 2021-08-09T21:23:57.873