I started using Databricks a few days ago. I am trying to pick up some log files from S3/bucket_name/../raw, process them to check whether they contain certain lines I want to keep, and save those lines to another folder called S3/bucket_name/../processed/.
Here is what I have tried so far.
RAW_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + RAW_FILE_PATH
PROCESSED_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + PROCESSED_FILE_PATH
raw_file_list = dbutils.fs.ls(RAW_LOG_PATH)
processed_file_list = dbutils.fs.ls(PROCESSED_LOG_PATH)
processed_file_names = [file.name for file in processed_file_list]
# Filter log records that contain 'country:France' and save them to my_records
for file in raw_file_list:
    # Skip files that have already been processed
    if file.name not in processed_file_names:
        my_records = []
        my_entries = sc.textFile(file.path)
        lines = my_entries.collect()
        for line in lines:
            if 'country:France' in line:
                my_records.append(line)
        new_file = PROCESSED_LOG_PATH + file.name
        if len(my_records) > 0:
            # Write the matching lines (as the string form of the list) to the processed folder
            dbutils.fs.put(str(new_file), str(my_records))
            print('File processed:', new_file, len(my_records))
        else:
            print('No records:', file.name, len(my_records))
I am able to extract the lines I want and write the new files out to the processed folder on S3. However, when I try to read one of those files back and print the result, I run into an error.
# Check the output after filtering
FILE_NAME = '2016-10-27.log.gz'
check_new_file = PROCESSED_LOG_PATH + FILE_NAME
new_entries = sc.textFile(check_new_file)
logs = new_entries.take(1)
print(logs)
Error message:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 257.0 failed 4 times, most recent failure: Lost task 0.3 in stage 257.0 (TID 275, ip-x-x-x-x.ap-southeast-1.compute.internal): java.io.IOException: incorrect header check
My guess is that the problem comes from the format of the output saved back to S3. Is there a way to process the log files, extract the lines I need, save those lines to another file on S3, and still be able to read the newly stored file back from S3 in Databricks?
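In case it helps to show what I mean, below is a rough sketch of the round trip I am trying to get working, written with the RDD filter/saveAsTextFile API instead of dbutils.fs.put. The input and output names (2016-10-27.log.gz, 2016-10-27_filtered) are just examples, not my real layout.

# Sketch only: same mount paths as above, example file names
raw_rdd = sc.textFile(RAW_LOG_PATH + "2016-10-27.log.gz")

# Keep only the lines I care about
filtered = raw_rdd.filter(lambda line: 'country:France' in line)

# saveAsTextFile writes a directory of part files rather than a single file
filtered.saveAsTextFile(PROCESSED_LOG_PATH + "2016-10-27_filtered")

# Read the saved output back; sc.textFile accepts the directory of part files
reloaded = sc.textFile(PROCESSED_LOG_PATH + "2016-10-27_filtered")
print(reloaded.take(1))

I am not sure whether this is the right approach for my case either, since I would prefer one output file per input file, so any pointers are welcome.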