我正在寻找PySparks DataFrameReader和DataFrameWriter的日志记录功能。读取或写入数据帧时,应跟踪操作中涉及的、 、files等partitions的数量,并在之后返回可用。records/rowsrawbytes
我已经检查了官方Apache Spark文档并用谷歌搜索,但现在似乎没有实现这样的功能PySpark。这些是我想出的想法:
dbutils使用inDatabricks、云提供商库(boto3用于 AWS S3 或azure-storage-blobAzure)或Python内置os包检查相同的目录(用于读/写)- 与StatusTracker交互
SparkContext并使用类似的东西来监控作业/阶段进度并通过(-> 未选中)返回作业信息jobId Spark以某种调试/试运行模式运行并从记录器中提取信息(-> 未选中)
但我更多的是寻找任何易于使用的实现,例如虚构option("statistics", "true")设置:
df, read_statistics = spark.read.option("statistics", "true").csv("inputfile.csv")
write_statistics = df.write.option("statistics", "true").csv("outputfile.csv")
提前感谢您对社区的替代实施或未来发布计划的任何见解!
解决方案(部分):
基于收集和聚合事件的易于使用的解决方案(对于bytes和的数量)来自pyspark-spy包:recordsSparkContextSparkListener
from pyspark_spy import ContextSparkListener, register_listener
listener = ContextSparkListener()
register_listener(spark_context, listener)
with listener as events:
df = spark.read.csv("inputfile.csv")
spark.write.csv("outputfile.csv")
print("Read statistics: ", listener.stage_input_metrics_aggregate())
print("Write statistics: ", listener.stage_output_metrics_aggregate())
>>> Read statistics: InputMetrics(bytesRead=140129708, recordsRead=271502)
>>> Write statistics: OutputMetrics(bytesWritten=136828555, recordsWritten=265106)