8

我有一个分布式/联合数据库,其结构如下:

  1. 数据库分布在三个地理位置(“节点”)
  2. 多个数据库集群在每个节点上
  3. 关系数据库是 PostgreSQL、MySQL、Oracle 和 MS SQL Server 的混合;非关系数据库是 MongoDB 或 Cassandra
  4. 每个节点内和跨节点联合的松散耦合是通过 RabbitMQ 实现的,每个节点运行一个 RabbitMQ 代理

我正在为跨节点联合的作业(即非节点本地的作业)实现一个只读的节点间聚合作业系统。这些作业只执行“获取”查询——它们不修改数据库。(如果作业的结果打算进入一个或多个数据库,那么这是由一个单独的作业完成的,该作业不是我试图优化的节点间作业系统的一部分。)我的目标是最小化这些作业所需的网络带宽(首先最小化节点间/WAN 带宽,然后最小化节点内/LAN 带宽);我假设每个 WAN 链接的成本是统一的,每个 LAN 链接的成本是统一的。这些工作对时间不是特别敏感。我在一个节点内而不是在节点之间执行一些 CPU 负载平衡。

相对于集群或特定数据库本地的数据库写入量,聚合作业通过 WAN/LAN 传输的数据量很小,因此跨联合完全分布数据库是不切实际的。

我用于最小化网络带宽的基本算法是:

  1. 给定一个作业,该作业运行在整个联邦中的一组数据上,管理器节点向每个其他节点发送一条消息,其中包含相关的数据库查询。
  2. 每个节点运行它的一组查询,用 gzip 压缩它们,缓存它们,并将它们的压缩大小发送到管理器节点。
  3. 管理器移动到包含多个数据的节点(具体来说,移动到集群内具有最多数据并且具有空闲核心的机器);它从其他两个节点和集群中的其他机器请求其余数据,然后运行该作业。

在可能的情况下,作业使用分而治之的方法来最大限度地减少所需的数据共置量。例如,如果作业需要计算联盟中所有销售数据的总和,则每个节点在本地计算其销售总和,然后在管理节点聚合(而不是将所有未处理的销售数据复制到管理节点) . 但是,有时(例如在位于不同节点的两个表之间执行连接时)需要数据共置。

我做优化的第一件事是聚合作业,并在十分钟的时期运行聚合作业(机器都在运行 NTP,所以我可以合理地确定“每十分钟”在每个节点上意味着相同的事情)。目标是让两个作业能够共享相同的数据,从而降低传输数据的总体成本。

  1. 给定两个查询同一个表的作业,我生成每个作业的结果集,然后取两个结果集的交集。
  2. 如果两个作业都安排在同一个节点上运行,则网络传输成本计算为两个结果集的总和减去两个结果集的交集。
  3. 这两个结果集存储到 PostgreSQL 临时表(在关系数据的情况下)或临时 Cassandra columnfamilies / MongoDB 集合(在 nosql 数据的情况下)在选择运行作业的节点上;然后针对组合的结果集执行原始查询,并将数据传递给各个作业。(此步骤仅在组合结果集上执行;单个结果集数据只是简单地交付到其工作中,而无需首先存储在临时表/列族/集合中。)

这会改善网络带宽,但我想知道是否有框架/库/算法可以改善这一点。我考虑的一个选项是在节点缓存结果集,并在确定网络带宽时考虑这些缓存的结果集(即,除了当前的预调度协同定位作业集之外,尝试跨作业重用结果集,例如在一个 10 分钟的时期运行的作业可以使用前 10 分钟结果集中的缓存结果集),但除非作业使用完全相同的结果集(即,除非它们使用相同的 where 子句),否则我不知道一般情况 -将填补结果集中空白的目的算法(例如,如果结果集使用子句“where N > 3”并且不同的作业需要带有子句“where N > 0”的结果集 那么我可以使用什么算法来确定我需要将原始结果集与带有子句“其中 N > 0 AND N <= 3”的结果集结合起来 - 我可以尝试编写自己的算法来执行此操作,但结果将是一个错误无用的混乱。我还需要确定缓存数据何时过时 - 最简单的方法是将缓存数据的时间戳与源表上的最后修改时间戳进行比较,如果时间戳已更改,则替换所有数据,但理想情况下我希望能够仅更新已随每行或每块时间戳更改的值。) - 我可以尝试编写自己的算法来做到这一点,但结果将是一个错误的无用混乱。我还需要确定缓存数据何时过时 - 最简单的方法是将缓存数据的时间戳与源表上的最后修改时间戳进行比较,如果时间戳已更改,则替换所有数据,但理想情况下我希望能够仅更新已随每行或每块时间戳更改的值。) - 我可以尝试编写自己的算法来做到这一点,但结果将是一个错误的无用混乱。我还需要确定缓存数据何时过时 - 最简单的方法是将缓存数据的时间戳与源表上的最后修改时间戳进行比较,如果时间戳已更改,则替换所有数据,但理想情况下我希望能够仅更新已随每行或每块时间戳更改的值。

4

1 回答 1

4

我已经开始实施我对这个问题的解决方案。

为了简化节点内缓存并简化 CPU 负载平衡,我在每个数据库集群(“Cassandra 节点”)上使用 Cassandra 数据库来运行聚合作业(以前我是手动聚合本地数据库结果集) - 我将单个 Cassandra 数据库用于关系、Cassandra 和 MongoDB 数据(缺点是一些关系查询在 Cassandra 上运行速度较慢,但​​这可以通过单个统一聚合数据库更容易的事实来弥补维护而不是单独的关系和非关系聚合数据库)。我也不再在十分钟内聚合作业,因为缓存使该算法变得不必要。

节点中的每台机器都引用一个名为 Cassandra_Cache_[MachineID] 的 Cassandra 列族,用于存储它已发送到 Cassandra 节点的 key_ids 和 column_ids。Cassandra_Cache 列族由 Table 列、Primary_Key 列、Column_ID 列、Last_Modified_Timestamp 列、Last_Used_Timestamp 列和由 Table|Primary_Key|Column_ID 组成的复合键组成。Last_Modified_Timestamp 列表示来自源数据库的数据的 last_modified 时间戳,Last_Used_Timestamp 列表示聚合作业最后使用/读取数据的时间戳。当 Cassandra 节点向机器请求数据时,机器计算结果集,然后获取结果集和其 Cassandra_Cache 中的 table|key|columns 的集合差异,并且与 Cassandra_Cache 中的行具有相同的 Last_Modified_Timestamp(如果时间戳不匹配,则缓存数据已过时并与新的 Last_Modified_Timestamp 一起更新)。然后,本地机器将设置的差异发送到 Cassandra 节点,并使用设置的差异更新其 Cassandra_Cache,并更新用于组成结果集的每个缓存数据的 Last_Used_Timestamp。(为每个 table|key|column 维护单独的时间戳的更简单的替代方法是为每个 table|key 维护一个时间戳,但这不太精确,并且 table|key|column 时间戳不是过于复杂。

Cassandra 节点使用从节点内部接收到的新数据以及从其他节点接收到的数据更新其结果集。Cassandra 节点还维护一个列族,该列族存储与每台计算机的 Cassandra_Cache 中相同的数据(Last_Modified_Timestamp 除外,它仅在本地计算机上用于确定数据何时过时),以及指示数据是否来自的源 ID from inside of the node or from another node - id 区分不同的节点,但不区分本地节点内的不同机器。(另一种选择是使用统一的 Cassandra_Cache,而不是每台机器使用一个 Cassandra_Cache 加上节点的另一个 Cassandra_Cache,但我认为增加的复杂性不值得节省空间。)

每个 Cassandra 节点还维护一个 Federated_Cassandra_Cache,其中包含已从本地节点发送到其他两个节点之一的 {Database, Table, Primary_Key, Column_ID, Last_Used_Timestamp} 元组。

当一个作业通过管道时,每个 Cassandra 节点都会使用本地结果集更新其节点内缓存,并完成可以在本地执行的子作业(例如,在一个作业中,对多个节点之间的数据求和,每个节点对其进行求和)节点内数据,以最小化需要在节点间联合中共存的数据量)- 如果子作业仅使用节点内数据,则可以在本地执行。然后,管理器节点确定在哪个节点上执行其余作业:每个 Cassandra 节点可以通过获取其结果集的集差和已缓存的结果集子集,在本地计算将其结果集发送到另一个节点的成本。到它的 Federated_Cassandra_Cache,并且管理器节点最小化成本方程 ["从 NodeX 传输结果集的成本" + "

我正在为每个 Cassandra 节点使用 LRU 驱逐策略;我最初使用的是最旧优先驱逐策略,因为它实现起来更简单,并且需要更少的写入 Last_Used_Timestamp 列(每次数据更新一次,而不是每次读取数据一次),但结果证明 LRU 策略的实施并不过分复杂且 Last_Used_Timestamp 写入不会造成瓶颈。当 Cassandra 节点达到 20% 的可用空间时,它会驱逐数据,直到达到 30% 的可用空间,因此每次驱逐的大小约为可用总空间的 10%。节点维护两个时间戳:最后一个被驱逐的节点内数据的时间戳,以及最后一个被驱逐的节点间/联合数据的时间戳;由于节点间通信的延迟相对于节点内通信的延迟增加,驱逐策略的目标是让 75% 的缓存数据是节点间数据,25% 的缓存数据是节点内数据,这可以通过让每个驱逐的 25% 是节点间数据来快速近似,并且每次驱逐的 75% 是节点内数据。驱逐工作如下:

while(evicted_local_data_size < 7.5% of total space available) {
    evict local data with Last_Modified_Timestamp < 
        (last_evicted_local_timestamp += 1 hour)
    update evicted_local_data_size with evicted data
}

while(evicted_federated_data_size < 2.5% of total space available) {
    evict federated data with Last_Modified_Timestamp < 
        (last_evicted_federated_timestamp += 1 hour)
    update evicted_federated_data_size with evicted data
}

在从节点内的机器和其他节点收到驱逐确认之前,不会永久删除被驱逐的数据。

Cassandra 节点然后向其节点内的机器发送通知,指示新的 last_evicted_local_timestamp 是什么。本地机器更新他们的 Cassandra_Caches 以反映新的时间戳,并在完成时向 Cassandra 节点发送通知;当 Cassandra 节点收到来自所有本地机器的通知时,它会永久删除被驱逐的本地数据。Cassandra 节点还使用新的 last_evicted_federated_timestamp 向远程节点发送通知;其他节点更新他们的 Federated_Cassandra_Caches 以反映新的时间戳,并且 Cassandra 节点在收到来自每个节点的通知时永久删除被驱逐的联合数据(Cassandra 节点跟踪一条数据来自哪个节点,因此,在收到来自 NodeX 的驱逐确认后,节点可以在收到来自 NodeY 的驱逐确认之前永久删除被驱逐的 NodeX 数据)。在所有机器/节点都发送通知之前,如果 Cassandra 节点从尚未驱逐其旧数据的机器/节点接收到结果集,则它会在其查询中使用缓存的驱逐数据。例如,Cassandra 节点有一个已驱逐的本地 Table|Primary_Key|Column_ID 数据,同时本地机器(尚未处理驱逐请求)在其结果集中没有包含 Table|Primary_Key|Column_ID 数据,因为它认为Cassandra 节点的缓存中已经有数据;Cassandra 节点从本地机器接收结果集,

于 2013-06-06T03:42:25.143 回答