0

我正在寻找PySparks DataFrameReaderDataFrameWriter的日志记录功能。读取或写入数据帧时,应跟踪操作中涉及的、 、filespartitions的数量,并在之后返回可用。records/rowsrawbytes

我已经检查了官方Apache Spark文档并用谷歌搜索,但现在似乎没有实现这样的功能PySpark。这些是我想出的想法:

  • dbutils使用in Databricks、云提供商库(boto3用于 AWS S3 或azure-storage-blobAzure)或Python内置os包检查相同的目录(用于读/写)
  • 与StatusTracker交互SparkContext并使用类似的东西来监控作业/阶段进度并通过(-> 未选中)返回作业信息jobId
  • Spark以某种调试/试运行模式运行并从记录器中提取信息(-> 未选中)

但我更多的是寻找任何易于使用的实现,例如虚构option("statistics", "true")设置:

df, read_statistics = spark.read.option("statistics", "true").csv("inputfile.csv")

write_statistics = df.write.option("statistics", "true").csv("outputfile.csv")

提前感谢您对社区的替代实施或未来发布计划的任何见解!

解决方案(部分):

基于收集和聚合事件的易于使用的解决方案(对于bytes和的数量)来自pyspark-spy包:recordsSparkContextSparkListener

from pyspark_spy import ContextSparkListener, register_listener

listener = ContextSparkListener()
register_listener(spark_context, listener)

with listener as events:
    df = spark.read.csv("inputfile.csv")
    spark.write.csv("outputfile.csv")

print("Read statistics: ", listener.stage_input_metrics_aggregate())
print("Write statistics: ", listener.stage_output_metrics_aggregate())

>>> Read statistics:  InputMetrics(bytesRead=140129708, recordsRead=271502)
>>> Write statistics:  OutputMetrics(bytesWritten=136828555, recordsWritten=265106)
4

0 回答 0