我们正在寻找在 warp10 中摄取数据的最佳方式。我们在一个主要使用 Kafka 的微服务架构上。两种解决方案:
- 使用此处定义的Ingress端点: https ://www.warp10.io/content/03_Documentation/03_Interacting_with_Warp_10/03_Ingesting_data/01_Ingress (这是我们现在使用的解决方案)
- 使用此处定义的warp10 Kafka 插件: https ://blog.senx.io/introducing-the-warp-10-kafka-plugin/
正如这里所描述的,我们现在使用 Ingress 解决方案,基于 x 秒的数据聚合,并调用 Ingress API 来发送每个数据包的数据。(而不是每次我们需要插入一些东西时调用 API)。
几天来,我们正在试验 Kafka 插件。我们成功地设置了插件并创建了一个 .mc2 负责使用来自给定主题的数据,然后将它们插入UPDATE
到 warp10 中。
问题:
- 使用 Kafka 插件,应用与我们使用 Ingress 端点时应用的缓冲机制相同的缓冲机制会更好吗?或者,warp10 Kafka 插件中是否有任何特定实现允许在主题中使用每条消息并调用每个消息的
UPDATE
函数? - 今天,由于这两种解决方案都在发挥作用,我们正试图找出差异,以便在数据摄取期间获得最佳性能结果。如果可能的话,不必应用任何缓冲机制,因为我们试图尽可能地实时。
MC2 文件:
{
'topics' [ 'our_topic_name' ] // List of Kafka topics to subscribe to
'parallelism' 1 // Number of threads to start for processing the incoming messages. Each thread will handle a certain number of partitions.
'config' { // Map of Kafka consumer parameters
'bootstrap.servers' 'kafka-headless:9092'
'group.id' 'senx-consumer'
'enable.auto.commit' 'true'
}
'macro' <%
// macro executed each time a kafka record is consumed
/*
// received record format :
{
'timestamp' 123 // The record timestamp
'timestampType' 'type' // The type of timestamp, can be one of 'NoTimestampType', 'CreateTime', 'LogAppendTime'
'topic' 'topic_name' // Name of the topic which received the message
'offset' 123 // Offset of the message in 'topic'
'partition' 123 // Id of the partition which received the message
'key' ... // Byte array of the message key
'value' ... // Byte array of the message value
'headers' { } // Map of message headers
}
*/
"recordArray" STORE
"preprod.write" "token" STORE
// macro can be called on timeout with an empty entry map
$recordArray SIZE 0 !=
<%
$recordArray 'value' GET // kafka record value is retrieved in bytes
'UTF-8' BYTES-> // convert bytes to string (WARP10 INGRESS format)
JSON->
"value" STORE
"Records received through Kafka" LOGMSG
$value LOGMSG
$value
<%
DROP
PARSE
// PARSE outputs a gtsList, including only one gts
0 GET
// GTS rename is required to use UPDATE function
"gts" STORE
$gts $gts NAME RENAME
%>
LMAP
// Store GTS in Warp10
$token
UPDATE
%>
IFT
%> // end macro
'timeout' 10000 // Polling timeout (in ms), if no message is received within this delay, the macro will be called with an empty map as input
}