当您谈论在 Spark 中使用数据框时,从广义上讲,您可以做以下三件事之一:a)生成数据框 b)转换数据框 c)使用数据框
在结构化流式传输中,流式传输 DataFrame 是使用DataSource生成的。通常,您使用公开的 sparkSession.readStream方法创建源。此方法返回一个 DataStreamReader,它有几种从各种输入中读取的方法。所有这些都返回一个DataFrame。它在内部创建一个数据源。Spark 允许您实现自己的 DataSource,但他们建议不要这样做,因为从 2.2 开始,该接口被认为是实验性的
您主要使用 map 或 reduce 或 spark SQL 来转换数据帧。有不同风格的map(map、mapPartition、mapParititionWithIndex)等。它们基本上都是取一行并返回一行。Spark 在内部执行并行化对 map 方法的调用的工作。它对数据进行分区,将其分布在集群上的 executor 上,并在 executor 中调用您的 map 方法。您无需担心并行性。它建在引擎盖下。mapParitions 不是“顺序的”。是的,一个分区中的行是按顺序执行的,但多个分区是并行执行的。您可以通过对数据框进行分区来轻松控制并行度。您有 5 个分区,您将有 5 个并行运行的进程。你有200个,
请注意,没有什么可以阻止您使用管理转换内部状态的外部系统。但是,您的转换应该是幂等的。给定一组输入,它们应该总是产生相同的输出,并且随着时间的推移使系统处于相同的状态。如果您在转换中与外部系统交谈,这可能会很困难。结构化流式传输至少提供一次保证。这意味着同一行可能会被转换多次。因此,如果您正在执行向银行账户充值之类的操作,您可能会发现您在某些账户中添加了两次相同金额的钱。
数据由接收器消耗。通常,您通过在 Dataframe 上调用 format 方法然后调用 start 添加接收器。StructuredStreaming 有一些内置的接收器(除了一个)或多或少没用。您可以创建自定义接收器,但同样不建议这样做,因为接口是实验性的。唯一有用的接收器是你要实现的。它被称为 ForEachSink。Spark 会为每个接收器调用您的与分区中的所有行。您可以对行做任何您想做的事情,包括将其写入 Hbase。请注意,由于结构化流的至少一次性质,同一行可能会多次馈送到您的 ForEachSink。您应该以幂等的方式实现它。此外,如果您有多个接收器,数据将并行写入接收器。您无法控制接收器的调用顺序。可能会发生一个接收器正在从一个微批次获取数据,而另一个接收器仍在处理前一个微批次的数据。本质上,Sinks 最终是一致的,而不是立即一致的。
通常,构建代码的最简洁方法是避免在转换中进入外部系统。您的转换应该纯粹转换数据框中的数据。如果您想要来自 HBase 的数据,请将其放入数据帧中,将其与您的流数据帧连接,然后对其进行转换。这是因为当您使用外部系统时,它变得难以扩展。您希望通过增加数据帧的分区和添加节点来扩展转换。但是,与外部系统通信的节点过多会增加外部系统的负载并导致瓶颈,将转换与数据检索分开允许您独立扩展它们。
但!!!!这里有大但是......
1)当您谈论结构化流时,没有办法实现可以根据输入中的数据有选择地从 HBase 获取数据的 Source。您必须在 map(-like) 方法中执行此操作。因此,IMO,如果 Hbase 中的数据发生更改或有很多您不想保留在内存中的数据,那么您所拥有的一切都很好。如果您在 HBase 中的数据很小且不变,那么最好将其读入批处理数据帧,缓存它,然后将其与您的流数据帧连接。Spark 会将所有数据加载到它自己的内存/磁盘存储中,并保留在那里。如果您的数据很小并且变化非常频繁,最好在数据帧中读取它,不要缓存它并将其与流数据帧连接。每次运行微批处理时,Spark 都会从 HBase 加载数据。
2) 无法命令执行 2 个单独的接收器。因此,如果您的要求要求您写入数据库,然后写入 Kafka,并且您希望保证在将行提交到数据库后写入 Kafka 中的行,那么唯一的方法是 a)在 For each Sink 中进行两次写入。b)以类似地图的功能写入一个系统,并在每个接收器中写入另一个系统
不幸的是,如果您有一个要求,需要您从流式源中读取数据,将其与批处理源中的数据连接,对其进行转换,将其写入数据库,调用 API,从 API 中获取结果并将结果写入API 到 Kafka,并且这些操作必须按确切的顺序完成,那么您可以做到这一点的唯一方法是在转换组件中实现接收器逻辑。您必须确保将逻辑单独保存在单独的映射函数中,以便以最佳方式并行化它们。
此外,没有好的方法可以知道您的应用程序何时完全处理了微批处理,尤其是在您有多个接收器的情况下