11

我正在开发一个 python 应用程序,它只是将 csv 文件转换为 hive/athena 兼容的 parquet 格式,我正在使用 fastparquet 和 pandas 库来执行此操作。csv 文件中有时间戳值2018-12-21 23:45:00,需要将其写入timestampparquet 文件中的类型。下面是我正在运行的代码,

columnNames = ["contentid","processed_time","access_time"]

dtypes = {'contentid': 'str'}

dateCols = ['access_time', 'processed_time']

s3 = boto3.client('s3')

obj = s3.get_object(Bucket=bucketname, Key=keyname)

df = pd.read_csv(io.BytesIO(obj['Body'].read()), compression='gzip', header=0, sep=',', quotechar='"', names = columnNames, error_bad_lines=False, dtype=dtypes, parse_dates=dateCols)

s3filesys = s3fs.S3FileSystem()

myopen = s3filesys.open

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)

代码运行成功,下面是熊猫创建的数据框

contentid                 object
processed_time            datetime64[ns]
access_time               datetime64[ns]

+50942-11-30 14:00:00.000最后,当我在 Hive 和 athena中查询 parquet 文件时,时间戳值是2018-12-21 23:45:00

非常感谢任何帮助

4

5 回答 5

5

我知道这个问题很老,但它仍然是相关的。

如前所述,Athena 仅支持 int96 作为时间戳。使用 fastparquet 可以为 Athena 生成正确格式的 parquet 文件。重要的部分是 times='int96' 因为这告诉 fastparquet 将 pandas 日期时间转换为 int96 时间戳。

from fastparquet import write
import pandas as pd

def write_parquet():
  df = pd.read_csv('some.csv')
  write('/tmp/outfile.parquet', df, compression='GZIP', times='int96')
于 2020-04-17T12:07:38.123 回答
2

你可以试试:

dataframe.to_parquet(file_path, compression=None, engine='pyarrow', allow_truncated_timestamps=True, use_deprecated_int96_timestamps=True)
于 2020-09-22T07:14:20.873 回答
2

我通过这种方式解决了这个问题。

用to_datetime方法转换 df 系列

接下来使用.dt 访问器选择 datetime64[ns] 的日期部分

例子:

df.field = pd.to_datetime(df.field)
df.field = df.field.dt.date

之后,雅典娜将识别数据

于 2020-06-09T23:33:32.997 回答
0

问题似乎出在雅典娜身上,它似乎只支持 int96,当您在 pandas 中创建时间戳时,它是 int64

我的包含字符串日期的数据框列是“sdate”我首先转换为时间戳

# add a new column w/ timestamp
df["ndate"] = pandas.to_datetime["sdate"]
# convert the timestamp to microseconds
df["ndate"] = pandas.to_datetime(["ndate"], unit='us')

# Then I convert my dataframe to pyarrow
table = pyarrow.Table.from_pandas(df, preserve_index=False)

# After that when writing to parquet add the coerce_timestamps and 
# use_deprecated_int96_timstamps. (Also writing to S3 directly)
OUTBUCKET="my_s3_bucket"

pyarrow.parquet.write_to_dataset(table, root_path='s3://{0}/logs'.format(OUTBUCKET), partition_cols=['date'], filesystem=s3, coerce_timestamps='us', use_deprecated_int96_timestamps=True)

于 2019-05-24T21:56:33.307 回答
-1

我遇到了同样的问题,经过大量研究,现在已经解决了。

当你这样做时

write('outfile.snappy.parquet', df, compression='SNAPPY', open_with=myopen,file_scheme='hive',partition_on=PARTITION_KEYS)

它在幕后使用 fastparquet,它使用与 Athena 兼容的不同的 DateTime 编码。

解决方案是:卸载 fastparquet 并安装 pyarrow

  • pip 卸载 fastparquet
  • 点安装 pyarrow

再次运行您的代码。这次应该可以了。:)

于 2019-05-07T14:42:40.013 回答