0

我需要在谷歌数据流中读取二进制文件,我只需要读取文件并将每 64 字节解析为一条记录,并在数据流中每 64 字节二进制文件的每个字节中应用一些逻辑。

我在 spark 尝试过同样的事情,代码 smape 如下:

 def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("RecordSplit")
      .master("local[*]")
      .getOrCreate()

    val df = spark.sparkContext.binaryRecords("< binary-file-path>", 64)

    val Table = df.map(rec => {
      val c1= (convertHexToString(rec(0)))
      val c2= convertBinaryToInt16(rec, 48)
      val c3= rec(59)
      val c4= convertHexToString(rec(50)) match {
        case str =>
          if (str.startsWith("c"))
            2020 + str.substring(1).toInt
          else if (str.startsWith("b"))
            2010 + str.substring(1).toInt
          else if (str.startsWith("b"))
            2000 + str.substring(1).toInt
        case _ => 1920
      }
4

1 回答 1

2

我会推荐以下内容:

  • 如果你不限于 python/scala,OffsetBasedSource(FileBasedSource 是一个子类)可以满足你的需求,因为它使用偏移量来定义开始和结束位置。

  • TikaIO可以处理元数据,但它可以根据文档读取二进制数据。

  • 示例数据流意见分析包含要从任意字节位置读取的信息。

  • 还有其他文档可以创建自定义读取实现。您可能需要考虑查看这些Beam 示例,以获取有关如何实现自定义源的指导,例如这个python 示例

另一种方法是在管道外部(内存中)制作 64 字节的数组,然后从内存中创建 PCollection,请记住文档建议将其用于单元测试。

于 2018-11-10T01:54:13.163 回答