0

我们正在寻找在 warp10 中摄取数据的最佳方式。我们在一个主要使用 Kafka 的微服务架构上。两种解决方案:

正如这里所描述的,我们现在使用 Ingress 解决方案,基于 x 秒的数据聚合,并调用 Ingress API 来发送每个数据包的数据。(而不是每次我们需要插入一些东西时调用 API)。

几天来,我们正在试验 Kafka 插件。我们成功地设置了插件并创建了一个 .mc2 负责使用来自给定主题的数据,然后将它们插入UPDATE到 warp10 中。

问题:

  1. 使用 Kafka 插件,应用与我们使用 Ingress 端点时应用的缓冲机制相同的缓冲机制会更好吗?或者,warp10 Kafka 插件中是否有任何特定实现允许在主题中使用每条消息并调用每个消息的UPDATE函数?
  2. 今天,由于这两种解决方案都在发挥作用,我们正试图找出差异,以便在数据摄取期间获得最佳性能结果。如果可能的话,不必应用任何缓冲机制,因为我们试图尽可能地实时。

MC2 文件:

    {                                                                                                                                                                                                                                                                             
    'topics' [ 'our_topic_name' ] // List of Kafka topics to subscribe to                                                                                                                                                                                                    
    'parallelism' 1                // Number of threads to start for processing the incoming messages. Each thread will handle a certain number of partitions.                                                                                                                  
    'config' { // Map of Kafka consumer parameters                                                                                                                                                                                                                              
        'bootstrap.servers' 'kafka-headless:9092'                                                                                                                                                                                                                                  
        'group.id' 'senx-consumer'                                                                                                                                                                                                                                                 
        'enable.auto.commit' 'true'                                                                                                                                                                                                                                                
    }                                                                                                                                                                                                                                                                          
    'macro' <%                                                                                                                                                                                                                                                                  
        // macro executed each time a kafka record is consumed                                                                                                                                                                                                                          
        /*                                                                                                                                                                                                                                                                        
            // received record format :                                                                                                                                                                                                                                           
            {                                                                                                                                                                                                                                                                     
                'timestamp' 123        // The record timestamp                                                                                                                                                                                                                    
                'timestampType' 'type' // The type of timestamp, can be one of 'NoTimestampType', 'CreateTime', 'LogAppendTime'                                                                                                                                                   
                'topic' 'topic_name'   // Name of the topic which received the message                                                                                                                                                                                            
                'offset' 123           // Offset of the message in 'topic'                                                                                                                                                                                                        
                'partition' 123        // Id of the partition which received the message                                                                                                                                                                                          
                'key' ...              // Byte array of the message key                                                                                                                                                                                                           
                'value' ...            // Byte array of the message value                                                                                                                                                                                                         
                'headers' { }          // Map of message headers                                                                                                                                                                                                                  
            }                                                                                                                                                                                                                                                                     
        */                                                                                                                                                                                                                                                                        
        "recordArray" STORE                                                                                                                                                                                                                                                       
                                                                                                                                                                                                                                                                                    
        "preprod.write" "token" STORE                                                                                                                                                                                                                                                 

        // macro can be called on timeout with an empty entry map
        $recordArray SIZE 0 !=
        <%
            $recordArray 'value' GET // kafka record value is retrieved in bytes
            'UTF-8' BYTES-> // convert bytes to string (WARP10 INGRESS format)
            JSON->
            "value" STORE
            "Records received through Kafka" LOGMSG
            $value LOGMSG
            $value
            <%
                DROP
                PARSE
                // PARSE outputs a gtsList, including only one gts
                0 GET
                // GTS rename is required to use UPDATE function
                "gts" STORE
                $gts $gts NAME RENAME
            %>
            LMAP

            // Store GTS in Warp10
            $token
            UPDATE                                                                                                                                                                                                                                                                    
        %>                                                                                                                                                                                                                                                                       
        IFT                                                                                                                                                                                                                                                                      
    %> // end macro                                                                                                                                                                                                                                                            
    'timeout' 10000 // Polling timeout (in ms), if no message is received within this delay, the macro will be called with an empty map as input                                                                                                                 
}
4

1 回答 1

0

如果您想在 Warp 10 中缓存某些内容以避免每秒进行大量更新,您可以使用SHM(共享内存)。这是您需要激活的内置扩展。

激活后,将其与 SHMSTORE 和 SHMLOAD 一起使用,以在两次 WarpScript 执行之间将对象保存在 RAM 中。

在您的示例中,您可以将所有传入的 GTS 推送到一个列表或 GTS 列表中,+!用于将元素附加到现有列表中。

缓存中所有 GTS 的 MERGE(按名称 + 标签)和数据库中的 UPDATE 然后可以在运行器中完成(不要忘记使用 MUTEX)

不要忘记总运营成本:

  • 如果您不重复类名和标签,并且如果您收集每个 gts 的行,则可以针对摄取速度优化入口格式。见这里
  • PARSE 反序列化来自 Warp 10 入口格式的数据。
  • UPDATE 将数据序列化为 Warp 10 优化的入口格式(并将其推送到更新端点)。
  • 更新端点再次反序列化。

如果您的输入数据远离最佳入口格式,则执行这些反序列化/序列化/反序列化操作是有意义的。如果您想对数据进行 RANGECOMPACT 以节省磁盘空间或进行任何预处理,这也是有意义的。

于 2021-07-28T13:28:37.870 回答