谢谢所有回答我问题的人。尤其是 Ben Lim。
"@version" => "1",
"@timestamp" => "2014-02-20T11:07:28.125Z",
"type" => "syslog",
"host" => "ymkim-SD550",
"path" => "/var/log/syslog",
"ts" => "Feb 20 21:07:27",
"user" => "ymkim",
"func" => "REG",
"8192" => 16,
"8193" => 32,
"8194" => 17,
"8195" => 109
从$ logger REG,2000,4,10,20,11,6d
input {
file {
path => "/var/log/syslog"
type => "syslog"
filter {
grok {
match => ["message", "%{SYSLOGTIMESTAMP:ts} %{SYSLOGHOST:hostname} %{WORD:user}: %{WORD:func},%{WORD:address},%{NUMBER:regNumber},%{GREEDYDATA:regValue}"]
if [func] == "REG" {
modbus_csv {
start_address => "address"
num_register => "regNumber"
source => "regValue"
remove_field => ["regValue", "hostname", "message",
"address", "regNumber"]
output {
stdout { debug => true }
elasticsearch { }
并修改了 csv 过滤器,命名为 modbus_csv.rb。
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "csv"
# CSV filter. Takes an event field containing CSV data, parses it,
# and stores it as individual fields (can optionally specify the names).
class LogStash::Filters::MODBUS_CSV < LogStash::Filters::Base
config_name "modbus_csv"
milestone 2
# The CSV data in the value of the source field will be expanded into a
# datastructure.
config :source, :validate => :string, :default => "message"
# Define a list of column names (in the order they appear in the CSV,
# as if it were a header line). If this is not specified or there
# are not enough columns specified, the default column name is "columnX"
# (where X is the field number, starting from 1).
config :columns, :validate => :array, :default => []
config :start_address, :validate => :string, :default => "0"
config :num_register, :validate => :string, :default => "0"
# Define the column separator value. If this is not specified the default
# is a comma ','.
# Optional.
config :separator, :validate => :string, :default => ","
# Define the character used to quote CSV fields. If this is not specified
# the default is a double quote '"'.
# Optional.
config :quote_char, :validate => :string, :default => '"'
# Define target for placing the data.
# Defaults to writing to the root of the event.
config :target, :validate => :string
def register
# Nothing to do here
end # def register
def filter(event)
return unless filter?(event)
@logger.debug("Running modbus_csv filter", :event => event)
matches = 0
for i in 0..(event[@num_register].hex)
@columns[i] = event[@start_address].hex + i
if event[@source]
if event[@source].is_a?(String)
event[@source] = [event[@source]]
if event[@source].length > 1
@logger.warn("modbus_csv filter only works on fields of length 1",
:source => @source, :value => event[@source],
:event => event)
raw = event[@source].first
values = CSV.parse_line(raw, :col_sep => @separator, :quote_char => @quote_char)
if @target.nil?
# Default is to write to the root of the event.
dest = event
dest = event[@target] ||= {}
values.each_index do |i|
field_name = @columns[i].to_s || "column#{i+1}"
dest[field_name] = values[i].hex
rescue => e
event.tag "_modbus_csvparsefailure"
@logger.warn("Trouble parsing modbus_csv", :source => @source, :raw => raw,
:exception => e)
end # begin
end # if event
@logger.debug("Event after modbus_csv filter", :event => event)
end # def filter
end # class LogStash::Filters::Csv
最后,我得到了我想要的图表。(*func = REG (13) 4096 mean per 10m | (13 hits))