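I have Spark code that reads .mf4 files and writes them out as .txt files. I am running this code in Databricks.
Can anyone suggest a fix? The problem persists even after increasing the executor memory of the Databricks cluster.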
%pip install asammdf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from asammdf import MDF
import io
import os
import sys
from pyspark.sql.functions import col
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark = SparkSession \
    .builder \
    .appName("mdf4") \
    .getOrCreate()
sc = spark.sparkContext
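def decodeBinary(val):
    # Wrap the raw file bytes in a file-like object so asammdf can parse them
    file_stream = io.BytesIO(val)
    mdf = MDF(file_stream)
    # whereis() returns the (group index, channel index) pairs for the channel;
    # channel_name is the module-level variable set below, before any Spark job runs
    location = mdf.whereis(channel_name)
    return location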
print("1")
input_hdfs_path_to_mdf4 = "dbfs:/FileStore/inputmfd4/"
channel_name = "test_1"
local_or_hdfs_output_path = "dbfs:/FileStore/outputmfd4/opp4.txt"
print("2")
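raw_binary = sc.binaryFiles(input_hdfs_path_to_mdf4)  # one (path, file bytes) pair per file in the directory
print("3")
decoded_binary = raw_binary.map(lambda r: r[1]).map(decodeBinary)  # keep only the bytes, then parse each file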
print("4")
decoded_binary.saveAsTextFile(local_or_hdfs_output_path)
print("5")
print(decoded_binary)
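A hedged sketch of one guard for this pipeline: `sc.binaryFiles` returns a `(path, bytes)` pair for every file under the input directory, so any non-MDF file placed there is passed straight to `decodeBinary`. A filter predicate like the one below could drop such records before parsing. The names `has_mdf_header` and `is_mdf_record` are hypothetical, and the header check assumes the ASAM MDF convention that a file begins with an 8-byte identifier of `MDF` (or `UnFinMF` for an unfinalized file) padded with spaces.

```python
# Hypothetical helpers, not part of asammdf or PySpark.
# Assumption: an MDF file starts with an 8-byte ID "MDF" or "UnFinMF",
# padded with spaces.
MDF_IDENTIFIERS = (b"MDF", b"UnFinMF")


def has_mdf_header(data: bytes) -> bool:
    """Return True if the first 8 bytes look like an ASAM MDF identifier."""
    return data[:8].strip() in MDF_IDENTIFIERS


def is_mdf_record(record) -> bool:
    """Filter predicate for the (path, bytes) pairs from sc.binaryFiles."""
    path, data = record
    # Require both the .mf4 extension (case-insensitive) and a valid header
    return path.lower().endswith(".mf4") and has_mdf_header(data)
```

For example: `decoded_binary = raw_binary.filter(is_mdf_record).map(lambda r: r[1]).map(decodeBinary)`. Note that this filter still runs after Spark has loaded each whole file as a single byte string, so it only avoids the parse error; it does not reduce memory use for large files.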
I am running this code in Databricks with a 5 GB .mf4 file as input. Small files work without any problem, but when I use this 5 GB .mf4 file I get:
Caused by: org.apache.spark.api.python.PythonException: asammdf.blocks.utils.MdfException: <_io.BytesIO object at 0x7efed84482c0> is not a valid ASAM MDF file: magic header is b'\xff\xd8\xff\xe1H\xe6Ex' from command-2705692180399242 line 24 Full traceback below
Traceback (most recent call last)
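The first bytes reported in the error, `\xff\xd8\xff`, are the JPEG file signature, so at least one file read from the input directory starts like an image rather than an MF4 recording. Below is a sketch of a pre-flight check that reports the size and a guessed type for each file while reading only its first 8 bytes. It assumes the directory is reachable through the Databricks `/dbfs` FUSE mount (so `dbfs:/FileStore/inputmfd4/` becomes `/dbfs/FileStore/inputmfd4/`) and reuses the same assumed MDF identifier convention; `classify_header` and `scan_inputs` are hypothetical names.

```python
import os

# Assumption: MDF files start with "MDF" or "UnFinMF" padded to 8 bytes.
# JPEG files start with the bytes FF D8 FF.
MDF_IDS = (b"MDF", b"UnFinMF")
JPEG_SIGNATURE = b"\xff\xd8\xff"


def classify_header(header: bytes) -> str:
    """Guess a file type from its first bytes (hypothetical helper)."""
    if header[:8].strip() in MDF_IDS:
        return "mdf"
    if header.startswith(JPEG_SIGNATURE):
        return "jpeg"
    return "unknown"


def scan_inputs(directory: str) -> list:
    """Return (name, size_in_bytes, guessed_type) for each regular file.

    Only the first 8 bytes of each file are read, so even a 5 GB file
    is not loaded into memory.
    """
    rows = []
    for name in sorted(os.listdir(directory)):
        path = os.path.join(directory, name)
        if not os.path.isfile(path):
            continue  # skip subdirectories
        with open(path, "rb") as f:
            header = f.read(8)
        rows.append((name, os.path.getsize(path), classify_header(header)))
    return rows
```

For example, running `for row in scan_inputs("/dbfs/FileStore/inputmfd4/"): print(row)` in a notebook cell would list any file whose header is not an MDF identifier.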