2

我在使用 Storm 2.1.0 将 Storm 工作人员/拓扑日志转换为与 ELK 兼容的格式 (JSON) 时遇到了一些困难。

这些是我当前的工作人员日志配置:

cluster.xml

<Configuration monitorInterval="60" shutdownHook="disable" packages="ch.qos.logback.core">
    <Properties>
        <property name="logstash">+ %msg +%n</property> <!-- only for 'bug' demonstration purposes -->
    </Properties>
    <Appenders>
        <Console name="CONSOLE">
            <PatternLayout>
                <Pattern>${logstash}</Pattern>
            </PatternLayout>
        </Console>
    </Appenders>
    <Loggers>
        <Root level="INFO">
            <AppenderRef ref="CONSOLE" />
        </Root>
    </Loggers>
</Configuration>

worker.xml

<Configuration monitorInterval="60" shutdownHook="disable" packages="ch.qos.logback.core">
        <Properties>
            <property name="logstash">{"@timestamp":"%d{yyyy-MM-ddTHH:mm:ss.SSS}", "logger": "%logger", "message":"%msg","thread_name":"%t","level":"%level"}%n</property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE">
            <PatternLayout>
                    <Pattern>${logstash}</Pattern>
            </PatternLayout>
        </Console>
    </Appenders>
    <Loggers>
        <Logger name="org.com.package" level="DEBUG" additivity="false">
            <AppenderRef ref="CONSOLE"/>
        </Logger>
        <Root level="INFO">
            <AppenderRef ref="CONSOLE"/>
        </Root>
    </Loggers>
</Configuration>

根据我的这种配置,我希望日志消息的格式类似于以下内容(为清楚起见,添加了换行符:

{
  "@timestamp": "2020-02-11 11:32:40,748",
  "logger": "org.com.package.aggregation.SomeAggregation",
  "message": "joinStateStream: values: [33333333-4e30-49a6-8e1c-f7817633bb34, 7c777777-a622-4ae4-a504-2490db47cafe, 2020-02-11]",
  "thread_name": "Thread-23-b-25-joinState-groupedQ-revQ-executor[22, 22]",
  "level": "DEBUG"
}

但是,似乎消息正在被主管进程甚至工作人员本身“包装”。我得到的日志消息如下所示:

(为可读性添加了换行符/空格)

+ Worker Process 273c05df-f087-43ca-a59a-e281bae98ab1:  
{  
    "@timestamp":"2020-02-11 11:32:40,748",
    "logger": "STDERR",
    "message":
        "{
            "@timestamp":"2020-02-11 11:32:40,748",
            "logger": "org.com.package.aggregation.SomeAggregation",
            "message":"joinStateStream: values: [33333333-4e30-49a6-8e1c-f7817633bb34, 7c777777-a622-4ae4-a504-2490db47cafe, 2020-02-11]",
            "thread_name":"Thread-23-b-25-joinState-groupedQ-revQ-executor[22, 22]",
            "level":"DEBUG"
        }",  
    "thread_name":"Thread-2",
    "level":"INFO"} +

这里似乎发生了几件事(向内):

  1. 主管似乎包装了所有工作人员消息并在它们前面加上Worker Process <worker-id>:. 可以通过+包装整个消息的 来注意到这一点。
  2. 工人似乎以某种方式包装了自己的日志消息。该message部分日志还包含另一条日志消息。

至于我的问题:

  1. 我可以禁用此主管日志前缀吗?查看 中的源代码org.apache.storm.daemon.supervisor.BasicContainer#launch,这似乎是硬编码的。我无法想象正确的拓扑部署会导致添加到每个日志消息中的硬编码前缀。

  2. 这个工人消息包装怎么会发生?正如我所看到的,“wrappee”是我的实际拓扑消息(我希望解析),而“wrapper”则完全是另外一回事(使用记录器STDERR?为什么要记录到STDERR?使用级别INFO???)

基本上,我想在拓扑执行期间简单地记录一些消息并控制这些消息的格式。我怎样才能用 Storm 可靠地做到这一点?

4

0 回答 0