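Apache Beam applies a transform to every element that flows through the pipeline. The pipeline step defined here therefore converts the epoch time to a datetime format element by element, before the data is loaded into BigQuery.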
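Because the conversion runs once per element, it can also be written as a plain Python function and applied with `beam.Map`. The following is a minimal sketch that assumes each element is a dict whose `ts` field holds epoch seconds, and that the target BigQuery column is a `TIMESTAMP`. The function name `to_bq_row`, the output field `event_time`, and the collection name `records` are hypothetical.

```python
from datetime import datetime, timezone


def to_bq_row(element):
    """Hypothetical per-element transform: epoch seconds -> UTC timestamp string.

    Assumes `element` is a dict with a numeric `ts` field in seconds.
    """
    row = dict(element)  # copy so the input element is not mutated
    dt = datetime.fromtimestamp(element['ts'], tz=timezone.utc)
    # 'YYYY-MM-DD HH:MM:SS' without a zone is read by BigQuery as UTC for a
    # TIMESTAMP column (assumed column type; adjust if the target differs).
    row['event_time'] = dt.strftime('%Y-%m-%d %H:%M:%S')
    return row


# In a pipeline this would be applied element by element, e.g. (hypothetical):
#   rows = records | 'Epoch to timestamp' >> beam.Map(to_bq_row)
print(to_bq_row({'ts': 1633013727, 'user': 'anant'}))
# prints {'ts': 1633013727, 'user': 'anant', 'event_time': '2021-09-30 14:55:27'}
```

This approach leaves the element's event-time timestamp unchanged and only adds a column. That is often all a BigQuery load needs.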
The following Apache Beam pipeline code converts an epoch time in seconds to a datetime, i.e. a timestamp.
Sample code:
import apache_beam as beam


class GetTimestamp(beam.DoFn):
    # Beam injects each element's event-time timestamp via TimestampParam.
    def process(self, mytime, timestamp=beam.DoFn.TimestampParam):
        # Timestamp.to_utc_datetime() turns it into a UTC datetime.
        yield '{}'.format(timestamp.to_utc_datetime())


with beam.Pipeline() as pipeline:
    user_timestamps = (
        pipeline
        | 'My Time' >> beam.Create([
            {'ts': 1633013727, 'user': 'anant'},
            {'ts': 1590969600, 'user': 'samwilliam'},
        ])
        # Attach the epoch seconds in 'ts' as each element's event timestamp.
        | 'With timestamps' >> beam.Map(
            lambda mytime: beam.window.TimestampedValue(mytime, mytime['ts']))
        | 'Get timestamp' >> beam.ParDo(GetTimestamp())
        | beam.Map(print)
    )
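To check what the pipeline above should print without running Beam, you can reproduce the same conversion with the standard library. This sketch assumes `to_utc_datetime()` is called with its default arguments, which return a naive UTC datetime, so `str()` yields `YYYY-MM-DD HH:MM:SS`. The list name `expected` is illustrative only.

```python
from datetime import datetime, timezone

records = [
    {'ts': 1633013727, 'user': 'anant'},
    {'ts': 1590969600, 'user': 'samwilliam'},
]

expected = []
for record in records:
    # Epoch seconds -> aware UTC datetime, then drop tzinfo to mirror the
    # naive UTC datetime assumed to come from Timestamp.to_utc_datetime().
    utc_dt = datetime.fromtimestamp(record['ts'], tz=timezone.utc).replace(tzinfo=None)
    expected.append('{}'.format(utc_dt))

for line in expected:
    print(line)
# prints:
# 2021-09-30 14:55:27
# 2020-06-01 00:00:00
```

The Beam pipeline should print the same two values. Beam does not guarantee element order, so the lines may appear in either order.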