1

我正在尝试使用数据流 Apache Beam(python)将流中的事件写入 Big Query 表,但时间戳格式存在问题。

我有一个带有纪元时间戳(int)值的事件(json),我想将它插入到带有时间戳列的 BIG QUERY 表中。

最好的方法是什么?我可以在不解析每个事件的情况下做到这一点吗?我可以声明收到的时间戳的格式吗?

例如 :

event= {'ts' : 1630494181342 ,'user' : 'anat'}

进入表:

{ts: timestamp , user: string}
4

1 回答 1

1

Apache Beam 对通过管道的所有元素应用转换。因此,定义的管道步骤将基于每个元素将纪元时间纪元时间转换为日期时间格式,然后再将其提取到 BigQuery 中。

以下 Apache Beam 管道代码将以秒为单位的纪元时间转换为日期时间,即。时间戳

示例代码:

import apache_beam as beam
 
class GetTimestamp(beam.DoFn):
 def process(self, mytime, timestamp=beam.DoFn.TimestampParam):
   yield '{}'.format(timestamp.to_utc_datetime())
 
with beam.Pipeline() as pipeline:
 plant_timestamps = (
     pipeline
     | 'My Time' >> beam.Create([
         {'ts': 1633013727, 'user': 'anant'},
         {'ts': 1590969600, 'user':'samwilliam'},   
     ])
     | 'With timestamps' >> beam.Map(
         lambda mytime: beam.window.TimestampedValue(mytime, mytime['ts']))
     | 'Get timestamp' >> beam.ParDo(GetTimestamp())
     | beam.Map(print)
 )
于 2021-09-30T16:39:27.360 回答