1

我让 pmacct 每小时将网络流量汇总到 postgres 数据库中。我需要编写一个脚本/查询以将不同格式的数据移动到 mysql 数据库中。我想使用 SQL 进行尽可能多的数据处理,因为这个数据集将快速增长。

我运行了一个 perl 脚本来添加一个额外的字段 (agent_id) 来跟踪数据所在的区域(本地/国家/国际),它将显示为 0、1 或 2。

我从中提取此数据的表架构中的相关字段是:

ip_src, ip_dst, agent_id, bytes, stamp_updated, processed

我要插入数据的架构是:

ip, local_down_mb, nat_down_mb, int_down_mb, local_up_mb, nat_up_mb, int_up_mb, timestamp

由于我只是在寻找源或目标是我的范围之一的流量,因此我目前有一个查询,它以我想要的方式从 postgres 数据库中获取上传数据:

SELECT DISTINCT ip_src, agent_id, SUM(bytes), stamp_updated FROM acct
WHERE ip_src <<= '192.168.0.0/22'
   OR ip_src <<= '10.1.2.0/24'
   OR ip_src <<= '1.2.3.4/32'
GROUP BY ip_src, agent_id, stamp_updated
ORDER BY ip_src, agent_id, stamp_updated

该查询的示例输出是:

   ip_src     | agent_id |    sum    |    stamp_updated    
--------------+----------+-----------+---------------------
10.1.2.134    |        2 |      3192 | 2012-09-13 21:20:01
10.1.2.134    |        2 |      3192 | 2012-09-13 22:20:01
10.1.2.134    |        2 |      3192 | 2012-09-13 23:20:01
10.2.3.252    |        2 |       448 | 2012-09-11 06:00:01
10.2.3.252    |        2 |       448 | 2012-09-11 07:20:01
10.2.3.252    |        2 |       448 | 2012-09-11 08:20:01
10.2.3.252    |        2 |      8112 | 2012-09-11 09:20:01

在这个阶段,我知道我可以对 ip_dst 运行相同的查询,然后在将数据以新格式重新插入 mysql 时进行一些手动过程,以确保 ip 源和目标匹配时间戳,然后使用 agent_id 和我插入的 ip 源或 ip 目的地的组合来了解它是入站还是出站,以及流量是本地、国家还是国际。

然而,我想要的是一个可以为我完成所有这些的查询。我的 SQL 知识的限制是几个月前已经阅读了 W3C 网站教程,这使我能够像上面那样编写查询,但不会更进一步。

据我所知,我需要一些帮助是在两组结果之间编写一个连接,一组用于 ip_src,另一组用于 ip_dst,然后做一些魔术来使用流量方向的信息与agent_id 具有与 mysql 数据库的模式匹配的输出。

是否有人可以(非常友好地)编写他们认为可能适用于完成此任务的查询,或者至少将我指向相关文档并让我抢先了解我可能需要使用哪些功能来完成这项工作?

4

1 回答 1

3
  • 为了解决您对如何将 ip_src 粘合到 ip_dst 搜索的主要问题,您希望在两个查询上使用 FULL OUTER JOIN 来处理 IP 在给定时间戳中仅具有一个方向的流量的情况。如果您的数据加载器可以保证匹配数据,您可以使用 INNER JOIN,但为什么要冒险呢?
  • 由于您希望将 agent_id 转换为目标架构中的 3 个单独的列,因此我展示了一种在聚合函数内部使用条件的方法。
  • 我假设根据列名将最终输出中的字节数转换为四舍五入的兆字节。

      SELECT down.ip,
             ceil(down.lb/1048576) AS local_down_mb,
             ceil(down.nb/1048576) AS nat_down_mb,
             ceil(down.ib/1048576) AS int_down_mb,
             ceil(up.lb/1048576) AS local_up_mb,
             ceil(up.nb/1048576) AS nat_up_mb,
             ceil(up.ib/1048576) AS int_up_mb,
             down.timestamp
        FROM (SELECT ip_src AS ip,
                     SUM(CASE WHEN agent_id=0 THEN bytes ELSE 0 END) AS lb,
                     SUM(CASE WHEN agent_id=1 THEN bytes ELSE 0 END) AS nb,
                     SUM(CASE WHEN agent_id=2 THEN bytes ELSE 0 END) AS ib,
                     stamp_updated AS timestamp
                FROM acct
               WHERE ip_src <<= '192.168.0.0/22'
                  OR ip_src <<= '10.1.2.0/24'
                  OR ip_src <<= '1.2.3.4/32'
            GROUP BY ip,timestamp) down
        FULL OUTER JOIN
             (SELECT ip_dst AS ip,
                     SUM(CASE WHEN agent_id=0 THEN bytes ELSE 0 END) AS lb,
                     SUM(CASE WHEN agent_id=1 THEN bytes ELSE 0 END) AS nb,
                     SUM(CASE WHEN agent_id=2 THEN bytes ELSE 0 END) AS ib,
                     stamp_updated AS timestamp
                FROM acct
               WHERE ip_dst <<= '192.168.0.0/22'
                  OR ip_dst <<= '10.1.2.0/24'
                  OR ip_dst <<= '1.2.3.4/32'
            GROUP BY ip,timestamp) up
       USING (ip,timestamp)
    ORDER BY ip,timestamp;
    
于 2012-10-14T02:52:51.667 回答