28

我正在将记录写入 Kinesis Firehose 流,该流最终由 Amazon Kinesis Firehose 写入 S3 文件。

我的记录对象看起来像

ItemPurchase {
    String personId,
    String itemId
}

写入 S3 的数据如下所示:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

没有逗号分隔。

没有 Json 数组中的起始括号

[

没有结束括号,就像在 Json 数组中一样

]

我想读取此数据以获取 ItemPurchase 对象的列表。

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

读取这些数据的正确方法是什么?

4

13 回答 13

21

令人难以置信的是,Amazon Firehose 以这种方式将 JSON 消息转储到 S3,并且不允许您设置分隔符或任何东西。

最终,我发现解决问题的技巧是使用 JSON raw_decode 方法处理文本文件

这将允许您读取一堆连接的 JSON 记录,它们之间没有任何分隔符。

Python代码:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:

    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1
于 2018-03-21T22:39:51.643 回答
6

我也遇到了同样的问题,我是这样解决的。

  1. 将“}{”替换为“}\n{”
  2. 用“\n”分割的行。

    input_json_rdd.map(lambda x : re.sub("}{", "}\n{", x, flags=re.UNICODE))
                  .flatMap(lambda line: line.split("\n"))
    

一个嵌套的 json 对象有几个“}”,所以用“}”分割行并不能解决问题。

于 2017-02-15T22:03:32.443 回答
3

我有同样的问题。

如果AWS允许我们设置分隔符会更好,但我们可以自己做。

在我的用例中,我一直在收听推文流,一旦收到一条新推文,我立即将其放到Firehose.

当然,这会导致无法解析的 1 行文件。

因此,为了解决这个问题,我将推文的 JSON 与\n. 反过来,这让我可以使用一些可以在读取流内容时输出行的包,并轻松解析文件。

希望这对您有所帮助。

于 2016-07-15T22:35:20.817 回答
3

我认为解决这个问题的最佳方法是首先创建一个格式正确的 json 文件,其中包含良好分离的 json 对象。就我而言,我在被推入消防软管的事件中添加了“,”。然后在 s3 中保存一个文件后,所有文件都将包含由一些分隔符(在我们的例子中是逗号)分隔的 json 对象。必须添加的另一件事是文件开头和结尾的“[”和“]”。然后你有一个包含多个 json 对象的正确 json 文件。现在可以解析它们了。

于 2018-01-29T15:47:47.793 回答
3

如果 firehose 的输入源是 Analytics 应用程序,则此不带分隔符的串联 JSON是此处引用的已知问题。你应该有一个像这里一样的 lambda 函数,它在多行中输出 JSON 对象。

于 2018-11-26T19:35:58.740 回答
2

我使用转换 Lambda 在每条记录的末尾添加换行符

def lambda_handler(event, context):
    output = []

    for record in event['records']:

        # Decode from base64 (Firehose records are base64 encoded)
        payload = base64.b64decode(record['data'])

        # Read json as utf-8    
        json_string = payload.decode("utf-8")

        # Add a line break
        output_json_with_line_break = json_string + "\n"

        # Encode the data
        encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
        encoded_string = str(encoded_bytes, 'utf-8')

        # Create a deep copy of the record and append to output with transformed data
        output_record = copy.deepcopy(record)
        output_record['data'] = encoded_string
        output_record['result'] = 'Ok'

        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
于 2020-01-30T02:55:24.527 回答
2

使用这个简单的 Python 代码。

input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''

data_str = "[{}]".format(input_str.replace("}{","},{"))
data_json = json.loads(data_str)

然后(如果需要)转换为 Pandas。

import pandas as pd   
df = pd.DataFrame().from_records(data_json)
print(df)

这是结果

itemId personId
0  i-111    p-111
1  i-222    p-222
2  i-333    p-333
于 2019-05-17T10:19:41.583 回答
1

您可以通过计算括号找到每个有效的 JSON。假设文件以{这个 python 片段开头应该可以工作:

import json

def read_block(stream):
    open_brackets = 0
    block = ''
    while True:
        c = stream.read(1)
        if not c:
            break

        if c == '{':
            open_brackets += 1
        elif c == '}':
            open_brackets -= 1

        block += c

        if open_brackets == 0:
            yield block
            block = ''


if __name__ == "__main__":
    c = 0
    with open('firehose_json_blob', 'r') as f:
        for block in read_block(f):
            record = json.loads(block)
            print(record)
于 2018-08-15T14:45:14.000 回答
0

这个问题可以通过一个 JSON 解析器来解决,该解析器一次使用一个来自流的对象。raw_decodeJSONDecoder的方法只公开了这样一个解析器,但我已经编写了一个库,它可以直接使用单线执行此操作。

from firehose_sipper import sip

for entry in sip(bucket=..., key=...):
    do_something_with(entry)

我在这篇博文中添加了更多细节

于 2022-02-14T07:32:43.113 回答
0

使用 JavaScript 正则表达式。

JSON.parse(`[${item.replace(/}\s*{/g, '},{')}]`);
于 2022-01-11T04:27:50.093 回答
0

您可以使用以下脚本。

如果流数据大小不超过您设置的缓冲区大小,则 s3 的每个文件都有一对括号([])和逗号。

import base64

print('Loading function')


def lambda_handler(event, context):
    output = []

    for record in event['records']:
        print(record['recordId'])
        payload = base64.b64decode(record['data']).decode('utf-8')+',\n'

        # Do custom processing on the payload here

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(payload.encode('utf-8'))
        }
        output.append(output_record)

    last = len(event['records'])-1
    print('Successfully processed {} records.'.format(len(event['records'])))
    
    start = '['+base64.b64decode(output[0]['data']).decode('utf-8')
    end = base64.b64decode(output[last]['data']).decode('utf-8')+']'
    
    output[0]['data'] = base64.b64encode(start.encode('utf-8'))
    output[last]['data'] = base64.b64encode(end.encode('utf-8'))
    return {'records': output}

于 2021-12-01T05:49:07.293 回答
0

在 Spark 中,我们遇到了同样的问题。我们正在使用以下内容:

from pyspark.sql.functions import *

@udf
def concatenated_json_to_array(text):
  final = "["
  separator = ""
  
  for part in text.split("}{"):
    final += separator + part
    separator = "}{" if re.search(r':\s*"([^"]|(\\"))*$', final) else "},{"
      
  return final + "]"


def read_concatenated_json(path, schema):
  return (spark.read
          .option("lineSep", None)
          .text(path)
          .withColumn("value", concatenated_json_to_array("value"))
          .withColumn("value", from_json("value", schema))
          .withColumn("value", explode("value"))
          .select("value.*"))  

它的工作原理如下:

  1. 将数据读取为每个文件一个字符串(无分隔符!)
  2. 使用 UDF 引入 JSON 数组并通过引入逗号来拆分 JSON 对象。注意:小心不要破坏}{其中的任何字符串!
  3. 将带有模式的 JSON 解析为 DataFrame 字段。
  4. 将数组分解为单独的行
  5. 将值对象展开为列。

像这样使用它:

from pyspark.sql.types import *

schema = ArrayType(
  StructType([
    StructField("type", StringType(), True),
    StructField("value", StructType([
      StructField("id", IntegerType(), True),
      StructField("joke", StringType(), True),
      StructField("categories", ArrayType(StringType()), True)  
    ]), True)
  ])
)

path = '/mnt/my_bucket_name/messages/*/*/*/*/'
df = read_concatenated_json(path, schema)

我在这里写了更多细节和注意事项:Parsing JSON data from S3 (Kinesis) with Spark。不要只用 分割}{,因为它会弄乱你的字符串数据!例如:{ "line": "a\"r}{t" }

于 2021-11-23T09:13:29.313 回答
0

如果有办法改变数据的写入方式,请用一行分隔所有记录。这样您就可以简单地逐行读取数据。如果没有,那么只需构建一个以“}”为分隔符的扫描仪对象并使用扫描仪进行读取。这样就可以了。

于 2016-05-19T08:41:37.793 回答