0

几天前我刚开始使用 Databricks,我试图从 S3/bucket_name/../raw 获取一些日志文件并对其进行处理以检查日志是否包含我想要使用的某些行并将这些行保存到另一个文件夹称为'S3/bucket_name/../processed/

这是我到目前为止所尝试的。

RAW_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + RAW_FILE_PATH
PROCESSED_LOG_PATH = "dbfs:/mnt/" + AWS_BUCKET_NAME + PROCESSED_FILE_PATH

raw_file_list = dbutils.fs.ls(RAW_LOG_PATH)
processed_file_list = dbutils.fs.ls(PROCESSED_LOG_PATH)    
processed_file_names = [file.name for file in processed_file_list]

#Filter log records that contains 'country:France'and save it to my_records 
for file in raw_file_list:
  if file.name not in processed_file_names:
    my_records = []
    my_entries = sc.textFile(file.path)
    lines = my_entries.collect()
    for line in lines:
      if 'country:France' in line: 
        my_records.append(line)
    new_file = PROCESSED_LOG_PATH + file.name
    if len(mobile_recs_logs) > 0:
      dbutils.fs.put(str(new_file), str(my_records))
      print('File processed:', new_file, len(my_records))
    else:
      print ('No records:', file.name, len(my_records))

我能够提取我想要的行并将新文件输出到 S3 上的新处理文件夹中。但是,当我尝试访问该文件并输出结果时,我遇到了一些错误

# Checking the output after filter
FILE_NAME = '2016-10-27.log.gz'
check_new_file = PROCESSED_LOG_PATH + FILE_NAME

new_entries = sc.textFile(check_new_file)
logs = new_entries.take(1)
  print (logs)

错误信息:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 257.0 failed 4 times, most recent failure: Lost task 0.3 in stage 257.0 (TID 275, ip-x-x-x-x.ap-southeast-1.compute.internal): java.io.IOException: incorrect header check

我猜这个问题是由输出格式保存回 S3 引起的。有没有办法从日志文件中处理和提取我需要的行,将这些行保存到另一个文件中以存储在 S3 上,并且仍然可以在 Databricks 上的 S3 上使用新存储的文件?

4

1 回答 1

1

在这种特定情况下,我认为问题在于您的输入文件是 gzip 压缩的,并且该文件的解压缩由 自动处理sc.textFile,但是当您将文件保存回/processed文件夹时,新文件没有被压缩。由于新文件的名称以 结尾.gz,Spark 尝试解压该文件,但解压失败。

为避免此问题,请考虑压缩您编写的输出或重命名输出文件以省略.gz扩展名。

于 2016-10-28T20:01:53.310 回答