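I have a lot of SiLK flow data that I would like to do some data mining on. It looks like the destination IP column matches the source IP column of a row further down in the data. How can I merge each source id row with its destination id row in R? I have prepared some simplified network flow data for you: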

id    sip    dip    notes
1     20     30     20 is talking to 30
2     20     31     20 is talking to 31
3     20     32     20 is talking to 32
4     30     20     30 is responding to 20
5     31     20     31 is responding to 20
6     32     20     32 is responding to 20
7     20     32     20 is talking to 32 again
8     20     30     20 is talking to 30 again
9     32     20     32 is responding to 20 again
10    20     31     20 is talking to 31 again
11    31     20     31 is responding to 20 again
12    30     20     30 is responding to 20 again
13    21     30     21 is talking to 30
14    30     21     30 is responding to 21

I would like to merge the rows so that they look like this:

id_S    sip_S    dip_S    notes_S                      id_D    sip_D    dip_D    notes_D
1       20       30       20 is talking to 30          4       30       20       30 is responding to 20
2       20       31       20 is talking to 31          5       31       20       31 is responding to 20
3       20       32       20 is talking to 32          6       32       20       32 is responding to 20
7       20       32       20 is talking to 32 again    9       32       20       32 is responding to 20 again
8       20       30       20 is talking to 30 again    12      30       20       30 is responding to 20 again
10      20       31       20 is talking to 31 again    11      31       20       31 is responding to 20 again
13      21       30       21 is talking to 30          14      30       21       30 is responding to 21

I have over a million rows of data. Doing this in SQL Express takes days and a lot of disk space:

WITH flowtest_merged AS(
SELECT
    s.id AS id_S,
    s.sip AS sip_S,
    s.dip AS dip_S,
    s.notes AS notes_S,
    d.id AS id_D,
    d.sip AS sip_D,
    d.dip AS dip_D,
    d.notes AS notes_D,
    ROW_NUMBER() OVER(PARTITION BY s.id ORDER BY d.id) AS RN
FROM
    flowtest AS s INNER JOIN
    flowtest AS d ON
    s.dip = d.sip AND /* The source id is talking to the destination id */
    s.sip = d.dip AND /* The destination id is responding to the source id */
    s.id < d.id AND /* The source id is the initiator of the exchange */
    s.sip < 30 /* shorthand for "I'm selecting the internal ip range here" */
)
SELECT
    id_S,
    sip_S,
    dip_S,
    notes_S,
    id_D,
    sip_D,
    dip_D,
    notes_D
FROM flowtest_merged
WHERE (RN = 1)

The problem is that I don't know how to do the ROW_NUMBER() OVER(PARTITION BY s.id ORDER BY d.id) part in R. So, if I rebuild the example data frame in R:

> flowtest <- data.frame(
+     "id" = c(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14),
+     "sip" = c(20, 20, 20, 30, 31, 32, 20, 20, 32, 20, 31, 30, 21, 30),
+     "dip" = c(30, 31, 32, 20, 20, 20, 32, 30, 20, 31, 20, 20, 30, 21),
+     "notes" = c(
+         "20 is talking to 30",
+         "20 is talking to 31",
+         "20 is talking to 32",
+         "30 is responding to 20",
+         "31 is responding to 20",
+         "32 is responding to 20",
+         "20 is talking to 32 again",
+         "20 is talking to 30 again",
+         "32 is responding to 20 again",
+         "20 is talking to 31 again",
+         "31 is responding to 20 again",
+         "30 is responding to 20 again",
+         "21 is talking to 30",
+         "30 is responding to 21"),
+     stringsAsFactors = FALSE)

so that it looks the same as the SQL data:

> flowtest
   id sip dip                        notes
1   1  20  30          20 is talking to 30
2   2  20  31          20 is talking to 31
3   3  20  32          20 is talking to 32
4   4  30  20       30 is responding to 20
5   5  31  20       31 is responding to 20
6   6  32  20       32 is responding to 20
7   7  20  32    20 is talking to 32 again
8   8  20  30    20 is talking to 30 again
9   9  32  20 32 is responding to 20 again
10 10  20  31    20 is talking to 31 again
11 11  31  20 31 is responding to 20 again
12 12  30  20 30 is responding to 20 again
13 13  21  30          21 is talking to 30
14 14  30  21       30 is responding to 21

and then make a feeble attempt at a merge:

> flowtest_merged <- merge(
+     flowtest[,setdiff(colnames(flowtest), "dip")],
+     flowtest[,setdiff(colnames(flowtest), "sip")],
+     by.x = "sip",
+     by.y = "dip",
+     all = FALSE,
+     suffixes = c("_S", "_D"))

I get many, many rows (and the wrong columns):

> flowtest_merged
   sip id_S                      notes_S id_D                      notes_D
1   20    1          20 is talking to 30    5       31 is responding to 20
2   20    1          20 is talking to 30    6       32 is responding to 20
3   20    1          20 is talking to 30   11 31 is responding to 20 again
4   20    1          20 is talking to 30    4       30 is responding to 20
5   20    1          20 is talking to 30    9 32 is responding to 20 again
6   20    1          20 is talking to 30   12 30 is responding to 20 again
7   20    2          20 is talking to 31    5       31 is responding to 20
8   20    2          20 is talking to 31    6       32 is responding to 20
9   20    2          20 is talking to 31   11 31 is responding to 20 again
10  20    2          20 is talking to 31    4       30 is responding to 20
11  20    2          20 is talking to 31    9 32 is responding to 20 again
12  20    2          20 is talking to 31   12 30 is responding to 20 again
13  20    3          20 is talking to 32    5       31 is responding to 20
14  20    3          20 is talking to 32    6       32 is responding to 20
15  20    3          20 is talking to 32   11 31 is responding to 20 again
16  20    3          20 is talking to 32    4       30 is responding to 20
17  20    3          20 is talking to 32    9 32 is responding to 20 again
18  20    3          20 is talking to 32   12 30 is responding to 20 again
19  20    8    20 is talking to 30 again    5       31 is responding to 20
20  20    8    20 is talking to 30 again    6       32 is responding to 20
21  20    8    20 is talking to 30 again   11 31 is responding to 20 again
22  20    8    20 is talking to 30 again    4       30 is responding to 20
23  20    8    20 is talking to 30 again    9 32 is responding to 20 again
24  20    8    20 is talking to 30 again   12 30 is responding to 20 again
25  20   10    20 is talking to 31 again    5       31 is responding to 20
26  20   10    20 is talking to 31 again    6       32 is responding to 20
27  20   10    20 is talking to 31 again   11 31 is responding to 20 again
28  20   10    20 is talking to 31 again    4       30 is responding to 20
29  20   10    20 is talking to 31 again    9 32 is responding to 20 again
30  20   10    20 is talking to 31 again   12 30 is responding to 20 again
31  20    7    20 is talking to 32 again    5       31 is responding to 20
32  20    7    20 is talking to 32 again    6       32 is responding to 20
33  20    7    20 is talking to 32 again   11 31 is responding to 20 again
34  20    7    20 is talking to 32 again    4       30 is responding to 20
35  20    7    20 is talking to 32 again    9 32 is responding to 20 again
36  20    7    20 is talking to 32 again   12 30 is responding to 20 again
37  21   13          21 is talking to 30   14       30 is responding to 21
38  30    4       30 is responding to 20    1          20 is talking to 30
39  30    4       30 is responding to 20    8    20 is talking to 30 again
40  30    4       30 is responding to 20   13          21 is talking to 30
41  30   14       30 is responding to 21    1          20 is talking to 30
42  30   14       30 is responding to 21    8    20 is talking to 30 again
43  30   14       30 is responding to 21   13          21 is talking to 30
44  30   12 30 is responding to 20 again    1          20 is talking to 30
45  30   12 30 is responding to 20 again    8    20 is talking to 30 again
46  30   12 30 is responding to 20 again   13          21 is talking to 30
47  31    5       31 is responding to 20    2          20 is talking to 31
48  31    5       31 is responding to 20   10    20 is talking to 31 again
49  31   11 31 is responding to 20 again    2          20 is talking to 31
50  31   11 31 is responding to 20 again   10    20 is talking to 31 again
51  32    9 32 is responding to 20 again    3          20 is talking to 32
52  32    9 32 is responding to 20 again    7    20 is talking to 32 again
53  32    6       32 is responding to 20    3          20 is talking to 32
54  32    6       32 is responding to 20    7    20 is talking to 32 again
>

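In other words, I am not merging one row with exactly one other row the way I want to. How can I merge each source id row with its destination id row?

Thanks

Dave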

Edit: Here is the first matching pair:

UID|SIP|DIP|PROTOCOL|SPORT|DPORT|PACKETS|BYTES|FLAGS|STIME|DURATION|ETIME|SENSOR|FLOWTYPE|ICMP_TYPE|ICMP_CODE|APPLICATION|INPUT|OUTPUT|TIMEOUT|CONTINUATION|INIT_FLAGS|SESSION_FLAGS|BLACKLIST|WHITELIST|NORMALIZED_DOMAIN|COUNTRY
720109425873|3232248427|3232248333|17|57554|53|1|70|0|2013-01-01 00:00:15.046|0|2013-01-01 00:00:15.046|THERMOPYLAE|6|||0|0|0|0|0|0|0|N|Y|erath.mechesrx.net|NULL
...
720107126014|3232248333|3232248427|17|53|57868|2|238|0|2013-01-01 00:02:15.827|0|2013-01-01 00:02:15.827|THERMOPYLAE|6|||0|0|0|0|0|0|0|N|Y|NULL|NULL

2 Answers

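library(data.table)
# Split the data set into a "talking" part and a "responding" part.
# This will take a few seconds for several million entries.
# (flowtest is the data frame from the question.)
a <- data.table(flowtest[grepl("talking", flowtest$notes), ], key = c("sip", "dip"))
b <- data.table(flowtest[grepl("responding", flowtest$notes), ], key = c("dip", "sip"))
# Create a second id that numbers the occurrences of each (sip, dip) couple.
a[, id2 := seq_len(.N), by = key(a)]
b[, id2 := seq_len(.N), by = key(b)]

# Swap the column names in b so that its keys line up with a, then merge.
setnames(b, c("sip", "dip"), c("dip", "sip"))
merge(a, b, by = c("sip", "dip", "id2"), all = TRUE)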

#    sip dip id2 id.x                   notes.x id.y                      notes.y
# 1:  20  30   1    1       20 is talking to 30    4       30 is responding to 20
# 2:  20  30   2    8 20 is talking to 30 again   12 30 is responding to 20 again
# 3:  20  31   1    2       20 is talking to 31    5       31 is responding to 20
# 4:  20  31   2   10 20 is talking to 31 again   11 31 is responding to 20 again
# 5:  20  32   1    3       20 is talking to 32    6       32 is responding to 20
# 6:  20  32   2    7 20 is talking to 32 again    9 32 is responding to 20 again
# 7:  21  30   1   13       21 is talking to 30   14       30 is responding to 21

I am not sure how you would want to handle the case where one partner talks twice and the other one does not respond.
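
To make the unanswered case concrete, here is a minimal sketch in base R of the same occurrence-counter idea. Two things in it are assumptions and not part of the original data: a hypothetical row with id 15, in which 20 talks to 30 a third time and gets no reply, and the use of sip < 30 (the question's shorthand for the internal range) instead of the notes text to tell initiators from responders. With all.x = TRUE the unanswered row is kept with NA on the response side; an inner merge would drop it.

```r
# Example data from the question plus one hypothetical unanswered row (id 15).
flowtest <- data.frame(
  id  = 1:15,
  sip = c(20, 20, 20, 30, 31, 32, 20, 20, 32, 20, 31, 30, 21, 30, 20),
  dip = c(30, 31, 32, 20, 20, 20, 32, 30, 20, 31, 20, 20, 30, 21, 30)
)

# Assumption: sip < 30 marks the internal (initiating) side, as in the SQL.
talk <- flowtest[flowtest$sip < 30, ]
resp <- flowtest[flowtest$sip >= 30, ]

# Occurrence counter per (sip, dip) couple; rows are already in id order.
talk$id2 <- ave(talk$id, talk$sip, talk$dip, FUN = seq_along)
resp$id2 <- ave(resp$id, resp$sip, resp$dip, FUN = seq_along)

# Rename so that both sides share the key names: a response's sip is the
# request's dip and vice versa.
names(talk)[names(talk) == "id"] <- "id_S"
names(resp) <- c("id_D", "dip", "sip", "id2")

# Left join: every request is kept, unanswered ones get NA in id_D.
paired <- merge(talk, resp, by = c("sip", "dip", "id2"), all.x = TRUE)
paired <- paired[order(paired$id_S), ]
print(paired)
```

Because the counter pairs the n-th request with the n-th response of each couple, a response missing from the middle of a sequence would shift every later pairing for that couple, so this only behaves well when unanswered requests come last.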

answered 2013-06-01T17:32:41.413

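I see two reasons why you might be getting too many matching rows:

  1. You match on sip/dip only, whereas the criterion should be (sip, dip)/(dip, sip). Use by.x=c('sip', 'dip') and the corresponding by.y=c('dip', 'sip').

  2. "Talking" rows also match "responding again" rows, and "talking again" rows also match "responding" rows. This one is slightly harder to solve. Let me start by introducing arrange(dataframe, ...) from plyr, which sorts data frames elegantly.

arrange your data so that related communication between the same peers is adjacent, and assign IDs in that order.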

library(plyr)
flowtest_arranged <- arrange(flowtest, pmin(sip, dip), pmax(sip, dip), id)
flowtest_arranged$nid <- seq_along(flowtest_arranged$id)
flowtest_arranged$nid.lag <- flowtest_arranged$nid - 1

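Now, if we assume that your data is well-behaved, so that no unrelated communication occurs between a request and its response, we can use the new nid variable as an additional merge criterion: by.x=c('sip', 'dip', 'nid.lag'), by.y=c('dip', 'sip', 'nid'). Here is the result (R 3.0.1):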

merge(flowtest_arranged, flowtest_arranged, by.x=c('sip', 'dip', 'nid.lag'),
      by.y=c('dip', 'sip', 'nid'))

   sip dip nid.lag id.x                      notes.x nid id.y
1   20  30       2    8    20 is talking to 30 again   3    4
2   20  31       6   10    20 is talking to 31 again   7    5
3   20  32      10    7    20 is talking to 32 again  11    6
4   30  20       1    4       30 is responding to 20   2    1
5   30  20       3   12 30 is responding to 20 again   4    8
6   30  21      13   14       30 is responding to 21  14   13
7   31  20       5    5       31 is responding to 20   6    2
8   31  20       7   11 31 is responding to 20 again   8   10
9   32  20      11    9 32 is responding to 20 again  12    7
10  32  20       9    6       32 is responding to 20  10    3
                     notes.y nid.lag
1     30 is responding to 20       1
2     31 is responding to 20       5
3     32 is responding to 20       9
4        20 is talking to 30       0
5  20 is talking to 30 again       2
6        21 is talking to 30      12
7        20 is talking to 31       4
8  20 is talking to 31 again       6
9  20 is talking to 32 again      10
10       20 is talking to 32       8
Warning message:
In merge.data.frame(flowtest_arranged, flowtest_arranged, by.x = c("sip",  :
  column name ‘nid.lag’ is duplicated in the result
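
The merge above returns every adjacent pair in both directions, so a response followed by the next request (for example id.x 8 with id.y 4) shows up alongside the genuine request/response pairs. One possible way to keep only the latter is sketched here in base R, with order() standing in for plyr's arrange() and a filter on the question's sip < 30 shorthand for the internal range. The suffixes _S and _D and the filter itself are assumptions made for illustration.

```r
# Example data from the question (notes column omitted for brevity).
flowtest <- data.frame(
  id  = 1:14,
  sip = c(20, 20, 20, 30, 31, 32, 20, 20, 32, 20, 31, 30, 21, 30),
  dip = c(30, 31, 32, 20, 20, 20, 32, 30, 20, 31, 20, 20, 30, 21)
)

# Base R counterpart of arrange(flowtest, pmin(sip, dip), pmax(sip, dip), id).
o <- order(pmin(flowtest$sip, flowtest$dip),
           pmax(flowtest$sip, flowtest$dip),
           flowtest$id)
arranged <- flowtest[o, ]
arranged$nid <- seq_len(nrow(arranged))
arranged$nid.lag <- arranged$nid - 1L

# x is the later row of each adjacent pair, y the row directly before it.
m <- merge(arranged[, c("id", "sip", "dip", "nid.lag")],
           arranged[, c("id", "sip", "dip", "nid")],
           by.x = c("sip", "dip", "nid.lag"),
           by.y = c("dip", "sip", "nid"),
           suffixes = c("_D", "_S"))

# Assumption: the initiator lives in the internal range (address < 30).
# In m, dip is the address the later row replies to, so keep dip < 30.
pairs <- m[m$dip < 30, c("id_S", "id_D")]
pairs <- pairs[order(pairs$id_S), ]
print(pairs)
```

Selecting only the needed columns before merging also avoids the duplicated nid.lag column that triggers the warning.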
answered 2013-06-01T17:25:04.297
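
For comparison, the ROW_NUMBER() OVER(PARTITION BY s.id ORDER BY d.id) step that the question asks about can be imitated in base R by sorting the joined rows and numbering them within each group with ave(). The following is only a sketch of that translation on the example data, with RN as the same helper column name the SQL uses. It builds the same intermediate join as the SQL, so it may well be just as expensive on a million rows.

```r
# Example data from the question (notes column omitted for brevity).
flowtest <- data.frame(
  id  = 1:14,
  sip = c(20, 20, 20, 30, 31, 32, 20, 20, 32, 20, 31, 30, 21, 30),
  dip = c(30, 31, 32, 20, 20, 20, 32, 30, 20, 31, 20, 20, 30, 21)
)

# Two aliased copies of the table, like "flowtest AS s" and "flowtest AS d".
s <- setNames(flowtest, paste0(names(flowtest), "_S"))
d <- setNames(flowtest, paste0(names(flowtest), "_D"))

# INNER JOIN ON s.dip = d.sip AND s.sip = d.dip
j <- merge(s, d, by.x = c("sip_S", "dip_S"), by.y = c("dip_D", "sip_D"))

# AND s.id < d.id AND s.sip < 30
j <- j[j$id_S < j$id_D & j$sip_S < 30, ]

# ROW_NUMBER() OVER(PARTITION BY s.id ORDER BY d.id):
# sort by (id_S, id_D), then count rows within each id_S group.
j <- j[order(j$id_S, j$id_D), ]
j$RN <- ave(j$id_D, j$id_S, FUN = seq_along)

# WHERE RN = 1 keeps the earliest later response for each request.
first_match <- j[j$RN == 1, c("id_S", "sip_S", "dip_S", "id_D")]
print(first_match)
```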