Logstash: Introduction, Installation, Configuration, Pipelines, and Plugins

I. Introduction

Official documentation: https://www.elastic.co/guide/en/logstash/current/getting-started-with-logstash.html

  • Logstash is an open-source data collection engine with real-time pipelining capabilities.
  • Logstash can dynamically unify data from disparate sources and normalize it into destinations of your choice.
  • Logstash is a powerful data processing tool: it handles data transport, format processing, and formatted output, and offers a rich plugin ecosystem. It is commonly used for log processing.
  • Logstash is resource-hungry, consuming a lot of CPU and memory at runtime. It also has no built-in message-queue buffering by default, so there is a risk of data loss.
  • Logstash is written in Ruby and runs on the Java Virtual Machine; it collects, parses, and forwards data streams.
  • Logstash collects, processes, and outputs logs using pipelines.

Event: Logstash converts each record in the data stream into an event at the input stage; at the output stage, events are converted into the target data format.

  • Inputs: obtain events from data sources. Each input runs in its own thread, reads data from its source, and writes it to a queue.
  • Filters: filter and modify events.
  • Outputs: send events to other systems.

Logstash processes events in a pipeline; the rough flow is as follows.

(figure: Logstash workflow)

Inputs and outputs support codecs (coder & decoder). Before version 1.3.0, Logstash only accepted plain-text input and relied on filters to process it; now different types of data can be decoded during input. The current data processing flow is therefore:


Arrows indicate the direction of data flow. There can be multiple inputs. The queue in the middle distributes data to different pipelines; each pipeline consists of a batcher, filters, and outputs. The batcher pulls data from the queue in batches (configurable).

The life of an event in Logstash

  1. Start with some input data, for example a web.log file in which every line is one record. The file input reads records from the file, and a codec (here the json codec) turns each record into a Logstash event.
  2. The event flows through the queue into one of the pipeline worker threads, where it is first held in the batcher. Once the batcher reaches its dispatch condition (a time limit or a batch-size limit), it sends the batch to the filters; after filtering, events move on to the outputs, which write them to the configured destinations.
  3. After output, an ACK covering the processed events is returned to the queue, which marks those events as done.


Queue types

  • In Memory: fixed size, held in memory; it cannot survive a process crash or machine failure, so data may be lost.
  • Persistent Queue: survives process crashes and guarantees no data loss, with at-least-once delivery; it acts as a buffer and can stand in for a message queue such as Kafka.
  • Dead Letter Queue: holds events that Logstash could not process, for example because of data-type errors.

Persistent Queue (PQ) processing flow

  1. A record enters the PQ from an input; the PQ backs the data up on disk and then acknowledges receipt to the input.
  2. The record moves from the PQ to the filters/outputs, which return an ACK to the PQ once the event has been processed.
  3. On receiving the ACK, the PQ deletes the backup data from disk.
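
A minimal logstash.yml sketch enabling the persistent queue; the path and size values are illustrative, and the setting names come from the configuration table in section IV:

queue.type: persisted                  # default is "memory"
path.queue: /var/lib/logstash/queue    # defaults to path.data/queue
queue.max_bytes: 1gb                   # cap on-disk queue size (default 1024mb)
queue.checkpoint.acks: 1024            # force a checkpoint after this many ACKed events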

II. Installation

1. Install the Java environment

On some Linux systems you must set the JAVA_HOME environment variable; otherwise Logstash fails to detect JAVA_HOME during installation, reports an error, and the service will not start. If the JDK directory is under /opt, create a symlink in /usr/bin/ pointing to the java binary under JAVA_HOME/bin.
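
A sketch of those steps, assuming the JDK is unpacked at /opt/jdk (adjust the path to your installation):

export JAVA_HOME=/opt/jdk                          # assumed JDK location
echo 'export JAVA_HOME=/opt/jdk' >> /etc/profile   # persist for login shells
ln -s "$JAVA_HOME/bin/java" /usr/bin/java          # symlink so Logstash can find java
java -version                                      # verify the runtime is visible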

2. Install Logstash

YUM/RPM — save the following repository definition as a file under /etc/yum.repos.d/ (e.g. logstash.repo), then install with yum:

[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

yum install -y logstash-7.2.0

Alternatively, download the RPM manually from the official download page (https://www.elastic.co/downloads/logstash) and install it locally:

yum localinstall -y logstash-7*.rpm

File locations when installed from the RPM package

Type Description Default Location Setting
home Home directory of the Logstash installation. /usr/share/logstash
bin Binary scripts including logstash to start Logstash and logstash-plugin to install plugins /usr/share/logstash/bin
settings Configuration files, including logstash.yml, jvm.options, and startup.options /etc/logstash path.settings
conf Logstash pipeline configuration files /etc/logstash/conf.d/*.conf See /etc/logstash/pipelines.yml
logs Log files /var/log/logstash path.logs
plugins Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. /usr/share/logstash/plugins path.plugins
data Data files used by logstash and its plugins for any persistence needs. /var/lib/logstash path.data

Binary archive

File locations in the binary archive

Type Description Default Location Setting
home Home directory of the Logstash installation. {extract.path} (the directory created by unpacking the archive)
bin Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins {extract.path}/bin
settings Configuration files, including logstash.yml and jvm.options {extract.path}/config path.settings
logs Log files {extract.path}/logs path.logs
plugins Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. {extract.path}/plugins path.plugins
data Data files used by logstash and its plugins for any persistence needs. {extract.path}/data path.data

3. Start Logstash

Start Logstash as a service or from the command line

systemctl start logstash 
#this launches a background Java process named org.jruby.Main; check it with jps -l
jps -l

Or start it from the binary executable

/usr/share/logstash/bin/logstash -f logstash.conf --config.reload.automatic

#-f  path to the pipeline config file
#--config.reload.automatic  detect config changes and reload automatically; this flag has no effect when -e is used
#--config.reload.interval <seconds>  how often to check the config file for changes. If Logstash was started without automatic reloading, add the flag when restarting the process.
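
A minimal logstash.conf sketch to use with -f; the log path and added field are illustrative:

input {
  file {
    path => "/var/log/app/*.log"     # illustrative path; the file input tails these files
    start_position => "beginning"    # read existing content on the first run
  }
}
filter {
  mutate { add_field => { "env" => "dev" } }   # illustrative enrichment
}
output {
  stdout { codec => rubydebug }      # print parsed events while testing
}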

4. Verify the installation

/usr/share/logstash/bin/logstash -e 'input { stdin { } } output { stdout {} }'
#-e: supply the pipeline configuration directly on the command line
#this pipeline reads from standard input and writes to standard output

stdin > hello world
stdout> 2013-11-21T01:22:14.405+0000 0.0.0.0 hello world

#Logstash adds a timestamp and an IP address to each message
#press Ctrl+D to exit Logstash

5. Command-line flags

Flag Description Default
-r, --config.reload.automatic Monitor configuration changes and reload whenever the config is changed. NOTE: use SIGHUP to manually reload the config false
-n, --node.name NAME Specify the name of this logstash instance; if no value is given it will default to the current hostname. Current hostname
-f, --path.config CONFIG_PATH Load the logstash config from a specific file or directory. If a directory is given, all files in that directory will be concatenated in lexicographical order and then parsed as a single config file. You can also specify wildcards (globs) and any matched files will be loaded in the order described above.
-e, --config.string CONFIG_STRING Use the given string as the configuration data. Same syntax as the config file. If no input is specified, then the following is used as the default input: "input { stdin { type => stdin } }" and if no output is specified, then the following is used as the default output: "output { stdout { codec => rubydebug } }" If you wish to use both defaults, please use the empty string for the '-e' flag. nil
--log.level LEVEL Set the log level for logstash. Possible values are: fatal, error, warn, info, debug, trace info
-l, --path.logs PATH Write logstash internal logs to the given file. Without this flag, logstash will emit logs to standard output. /usr/share/logstash/logs
-t, --config.test_and_exit Check configuration for valid syntax and then exit. false
--config.reload.interval RELOAD_INTERVAL How frequently to poll the configuration location for changes, in seconds 3s
--http.host HTTP_HOST Web API binding host 127.0.0.1
--http.port HTTP_PORT Web API http port 9600..9700
--log.format FORMAT Specify if Logstash should write its own logs in JSON form (one event per line) or in plain text (using Ruby's Object#inspect) plain
--path.settings SETTINGS_DIR Directory containing the logstash.yml file. This can also be set through the LS_SETTINGS_DIR environment variable /usr/share/logstash/config
-p, --path.plugins PATH A path of where to find plugins. This flag can be given multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: 'PATH/logstash/TYPE/NAME.rb' where TYPE is 'inputs', 'filters', 'outputs' or 'codecs' and NAME is the name of the plugin. []
--path.data PATH This should point to a writable directory. Logstash will use this directory whenever it needs to store data. Plugins will also have access to this path. /usr/share/logstash/data
-u, --pipeline.batch.delay DELAY_IN_MS When creating pipeline batches, how long to wait while polling for the next event. 50
--pipeline.id ID Sets the ID of the pipeline. main
-b, --pipeline.batch.size SIZE Size of batches the pipeline is to work in. 125
-V, --version Emit the version of logstash and its friends, then exit.
-M, --modules.variable MODULES_VARIABLE Load variables for module template. Multiple instances of '-M' or '--modules.variable' are supported. Ignored if '--modules' flag is not used. Should be in the format of '-M "MODULE_NAME.var.PLUGIN_TYPE.PLUGIN_NAME.VARIABLE_NAME=VALUE"' as in '-M "example.var.filter.mutate.fieldname=fieldvalue"'
--modules MODULES Load Logstash modules. Modules can be defined using multiple instances '--modules module1 --modules module2', or comma-separated syntax '--modules=module1,module2' Cannot be used in conjunction with '-e' or '-f' Use of '--modules' will override modules declared in the 'logstash.yml' file.
--setup Load index template into Elasticsearch, and saved searches, index-pattern, visualizations, and dashboards into Kibana when running modules. false
-w, --pipeline.workers COUNT Sets the number of pipeline workers to run. Number of the host's CPU cores
--config.debug Print the compiled config ruby code out as a debug log (you must also have --log.level=debug enabled). WARNING: This will include any 'password' options passed to plugin configs as plaintext, and may result in plaintext passwords appearing in your logs! false
--pipeline.unsafe_shutdown Force logstash to exit during shutdown even if there are still inflight events in memory. By default, logstash will refuse to quit until all received events have been pushed to the outputs. false
--java-execution Use the Java execution engine. true
-i, --interactive SHELL Drop to shell instead of running as normal. Valid shells are "irb" and "pry"
--verbose Set the log level to info.

III. Docker image

docker pull docker.elastic.co/logstash/logstash:7.4.0

docker pull logstash:7.4.0

File locations in the image

Type Description Default Location Setting
home Home directory of the Logstash installation. /usr/share/logstash
bin Binary scripts, including logstash to start Logstash and logstash-plugin to install plugins /usr/share/logstash/bin
settings Configuration files, including logstash.yml and jvm.options /usr/share/logstash/config path.settings
conf Logstash pipeline configuration files /usr/share/logstash/pipeline path.config
plugins Local, non Ruby-Gem plugin files. Each plugin is contained in a subdirectory. Recommended for development only. /usr/share/logstash/plugins path.plugins
data Data files used by logstash and its plugins for any persistence needs. /usr/share/logstash/data path.data

Note: containers based on this image write their logs directly to the console; the logs cannot be written straight to a log file.

  • The Docker image is built from the .tar.gz binary archive.

  • Mount your pipeline files under /usr/share/logstash/pipeline/ and start the container:

    docker run --rm -it \
    -v "$PWD/test.conf":/usr/share/logstash/pipeline/test.conf \
    docker.elastic.co/logstash/logstash:7.4.0
    
  • Default pipeline file: /usr/share/logstash/pipeline/logstash.conf

    input {
      beats {
        port => 5044
      }
    }
    output {
      stdout {
        codec => rubydebug
      }
    }
    

    In other words, if you start the container without mounting any pipeline files, Logstash runs a minimal pipeline: Beats input ---> stdout output.

  • Logstash can also be configured through environment variables:

    docker run --rm -it -e PIPELINE_WORKERS=2 docker.elastic.co/logstash/logstash:7.4.0

    For example, the following environment variables map to these Logstash settings:

Environment Variable Logstash Setting
PIPELINE_WORKERS pipeline.workers
LOG_LEVEL log.level
XPACK_MONITORING_ENABLED xpack.monitoring.enabled
  • Default settings baked into the Logstash Docker image:
http.host 0.0.0.0
xpack.monitoring.elasticsearch.hosts http://elasticsearch:9200

IV. Configuration

Settings in the Logstash configuration file are written in YAML syntax, for example:

pipeline:
  batch:
    size: 125
    delay: 50

The flat (dotted) form also works:

pipeline.batch.size: 125
pipeline.batch.delay: 50

Setting values can reference system-level environment variables; the ${VAR:default} form supplies a fallback when the variable is unset:

pipeline.batch.size: ${BATCH_SIZE}
pipeline.batch.delay: ${BATCH_DELAY:50}
node.name: "node_${LS_NODE_NAME}"
path.queue: "/tmp/${QUEUE_DIR:queue}"
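
For instance, with the settings above, the values could be supplied like this before starting Logstash (values are illustrative):

export BATCH_SIZE=250     # pipeline.batch.size resolves to 250
export LS_NODE_NAME=ls01  # node.name resolves to "node_ls01"
# BATCH_DELAY left unset -> pipeline.batch.delay falls back to the default 50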

When several custom module variables need to be set, the recommended format is:

modules:
  - name: MODULE_NAME1
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE
    var.PLUGIN_TYPE2.PLUGIN_NAME2.KEY1: VALUE
    var.PLUGIN_TYPE3.PLUGIN_NAME3.KEY1: VALUE
  - name: MODULE_NAME2
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY1: VALUE
    var.PLUGIN_TYPE1.PLUGIN_NAME1.KEY2: VALUE

Common Logstash settings

Setting Description Default value
node.name A descriptive name for the node. Machine’s hostname
path.data The directory that Logstash and its plugins use for any persistent needs. LOGSTASH_HOME/data
pipeline.id The ID of the pipeline. main
pipeline.java_execution Use the Java execution engine. true
pipeline.workers The number of workers that will, in parallel, execute the filter and output stages of the pipeline. If you find that events are backing up, or that the CPU is not saturated, consider increasing this number to better utilize machine processing power. Number of the host’s CPU cores
pipeline.batch.size The maximum number of events an individual worker thread will collect from inputs before attempting to execute its filters and outputs. Larger batch sizes are generally more efficient, but come at the cost of increased memory overhead. You may need to increase JVM heap space in the jvm.options config file. See Logstash Configuration Files for more info. 125
pipeline.batch.delay When creating pipeline event batches, how long in milliseconds to wait for each event before dispatching an undersized batch to pipeline workers. 50
pipeline.unsafe_shutdown When set to true, forces Logstash to exit during shutdown even if there are still inflight events in memory. By default, Logstash will refuse to quit until all received events have been pushed to the outputs. Enabling this option can lead to data loss during shutdown. false
pipeline.plugin_classloaders (Beta) Load Java plugins in independent classloaders to isolate their dependencies. false
path.config The path to the Logstash config for the main pipeline. If you specify a directory or wildcard, config files are read from the directory in alphabetical order. Platform-specific. See Logstash Directory Layout.
config.string A string that contains the pipeline configuration to use for the main pipeline. Use the same syntax as the config file. None
config.test_and_exit When set to true, checks that the configuration is valid and then exits. Note that grok patterns are not checked for correctness with this setting. Logstash can read multiple config files from a directory. If you combine this setting with log.level: debug, Logstash will log the combined config file, annotating each config block with the source file it came from. false
config.reload.automatic When set to true, periodically checks if the configuration has changed and reloads the configuration whenever it is changed. This can also be triggered manually through the SIGHUP signal. false
config.reload.interval How often in seconds Logstash checks the config files for changes. 3s
config.debug When set to true, shows the fully compiled configuration as a debug log message. You must also set log.level: debug. WARNING: The log message will include any password options passed to plugin configs as plaintext, and may result in plaintext passwords appearing in your logs! false
config.support_escapes When set to true, quoted strings will process the following escape sequences: \n becomes a literal newline (ASCII 10). \r becomes a literal carriage return (ASCII 13). \t becomes a literal tab (ASCII 9). \\ becomes a literal backslash \. \" becomes a literal double quotation mark. \' becomes a literal quotation mark. false
modules When configured, modules must be in the nested YAML structure described above this table. None
queue.type The internal queuing model to use for event buffering. Specify memory for legacy in-memory based queuing, or persisted for disk-based ACKed queueing (persistent queues). memory
path.queue The directory path where the data files will be stored when persistent queues are enabled (queue.type: persisted). path.data/queue
queue.page_capacity The size of the page data files used when persistent queues are enabled (queue.type: persisted). The queue data consists of append-only data files separated into pages. 64mb
queue.max_events The maximum number of unread events in the queue when persistent queues are enabled (queue.type: persisted). 0 (unlimited)
queue.max_bytes The total capacity of the queue in number of bytes. Make sure the capacity of your disk drive is greater than the value you specify here. If both queue.max_events and queue.max_bytes are specified, Logstash uses whichever criteria is reached first. 1024mb (1g)
queue.checkpoint.acks The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted). Specify queue.checkpoint.acks: 0 to set this value to unlimited. 1024
queue.checkpoint.writes The maximum number of written events before forcing a checkpoint when persistent queues are enabled (queue.type: persisted). Specify queue.checkpoint.writes: 0 to set this value to unlimited. 1024
queue.checkpoint.retry When enabled, Logstash will retry once per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. false
queue.drain When enabled, Logstash waits until the persistent queue is drained before shutting down. false
dead_letter_queue.enable Flag to instruct Logstash to enable the DLQ feature supported by plugins. false
dead_letter_queue.max_bytes The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. 1024mb
path.dead_letter_queue The directory path where the data files will be stored for the dead-letter queue. path.data/dead_letter_queue
http.host The bind address for the metrics REST endpoint. "127.0.0.1"
http.port The bind port for the metrics REST endpoint. 9600
log.level The log level for Logstash. Valid values: fatal, error, warn, info, debug, trace. info
log.format The log format. Set to json to log in JSON format, or plain to use Object#.inspect. plain
path.logs The directory where Logstash will write its log to. LOGSTASH_HOME/logs
path.plugins Where to find custom plugins. You can specify this setting multiple times to include multiple paths. Plugins are expected to be in a specific directory hierarchy: PATH/logstash/TYPE/NAME.rb where TYPE is inputs, filters, outputs, or codecs, and NAME is the name of the plugin. Platform-specific. See Logstash Directory Layout.
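
A compact logstash.yml sketch combining several of these settings (values are illustrative):

node.name: ls-node-1
pipeline.workers: 4
pipeline.batch.size: 250
config.reload.automatic: true
config.reload.interval: 10s
log.level: info
http.host: "0.0.0.0"    # expose the metrics REST endpoint beyond localhost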

V. Pipeline

1. Configuration structure

A Logstash pipeline file consists of three sections:

input {
  some_input_plugin {
      # plugin settings
  }
}
filter {
  some_filter_plugin {
      # plugin settings
  }
}
output {
  some_output_plugin {
      # plugin settings
  }
}

Note:

  1. If several processing rules are defined in the filter section, they are applied in order (see the sketch below); be aware, however, that some plugins are not thread-safe.

  2. If the same plugin appears twice in the filter section, the two instances are not guaranteed to execute in exact order, so the official docs also recommend not repeating a plugin inside filter.
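
A sketch of an order-dependent filter chain: grok must extract the timestamp field before date can parse it (the pattern and field names are illustrative):

filter {
  # 1) grok extracts structured fields from the raw line
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:ts} %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
  }
  # 2) date can only run after grok has created the "ts" field
  date {
    match => [ "ts", "ISO8601" ]
  }
}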

2. Conditionals

Official documentation: https://www.elastic.co/guide/en/logstash/6.7/event-dependent-configuration.html#conditionals

Sometimes you only want to filter or output events under certain conditions. For that, use conditionals to decide which events the filters and outputs handle; for example, in an ELK stack you might add a type field whose value depends on different conditions, to make later aggregation easier. Conditionals support if, else if, and else, and may be nested.

Conditional syntax

if EXPRESSION {
  ...
} else if EXPRESSION {
  ...
} else {
  ...
}

Operators

  • Comparison

    • Equality: ==, !=, <, >, <=, >=
    • Regex: =~ (matches pattern), !~ (does not match pattern)
    • Inclusion: in, not in
  • Boolean

    • and, or, nand, xor
  • Unary

    • ! (negation)
    • () (compound expression), !() (negate the result of a compound expression)

Examples


filter {
  if [foo] in [foobar] {
    mutate { add_tag => "field in field" }
  }
  if [foo] in "foo" {
    mutate { add_tag => "field in string" }
  }
  if "hello" in [greeting] {
    mutate { add_tag => "string in field" }
  }
  if [foo] in ["hello", "world", "foo"] {
    mutate { add_tag => "field in list" }
  }
  if [missing] in [alsomissing] {
    mutate { add_tag => "shouldnotexist" }
  }
  if !("foo" in ["hello", "world"]) {
    mutate { add_tag => "shouldexist" }
  }
  if [message] =~ /\w+\s+\/\w+(\/learner\/course\/)/ {
    mutate {
      add_field => { "learner_type" => "course" }
    }
  }

  mutate { add_field => { "show" => "This data will be in the output" } }
  mutate { add_field => { "[@metadata][test]" => "Hello" } }
  mutate { add_field => { "[@metadata][no_show]" => "This data will not be in the output" } }
}

output {
  if "_grokparsefailure" not in [tags] {
    elasticsearch { ... }
  }
  if [@metadata][test] == "Hello" {
    stdout { codec => rubydebug }
  }
  if [loglevel] == "ERROR" and [deployment] == "production" {
    pagerduty {
    ...
    }
  }
}

Note:

An expression like if [foo] in "String" fails when the field value cannot be converted to a string, so it is best to guard it with a field-existence check first:

if [foo] {
  mutate {
    add_field => { "bar" => "%{foo}" }
  }
}

3. Referencing event fields

  • Reference a field directly with [field]; for nested fields, chain brackets: [outer][inner]

    {
        "a": "1",
        "b": "2",
        "c": {
            "c1": "3"
        }
    }
    ----------Referencing event fields in a pipeline--------------
    if [b] =~ "2" {
        ..........
    }
    if [c][c1] == "3" {
        ...........
    }
    
  • Inside strings, reference fields sprintf-style with %{} (a usage sketch follows the example below)

    {
        "a": "1",
        "b": "2",
        "c": {
            "c1": "3"
        }
    }
    ----------Referencing event fields in a pipeline--------------
    add_field => {
      "test" => "test: %{b}"
    }
    add_field => {
      "test" => "test: %{[c][c1]}"
    }
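
sprintf references are just as common in output settings, for example to build an Elasticsearch index name per event; the host and index naming below are illustrative:

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    # index name built from an event field plus the event's date
    index => "logs-%{[c][c1]}-%{+YYYY.MM.dd}"
  }
}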
    

VI. Input plugins

Plugin overview

Plugin Description Github repository
azure_event_hubs Receives events from Azure Event Hubs azure_event_hubs
beats Receives events from the Elastic Beats framework logstash-input-beats
cloudwatch Pulls events from the Amazon Web Services CloudWatch API logstash-input-cloudwatch
couchdb_changes Streams events from CouchDB’s _changes URI logstash-input-couchdb_changes
dead_letter_queue read events from Logstash’s dead letter queue logstash-input-dead_letter_queue
elasticsearch Reads query results from an Elasticsearch cluster logstash-input-elasticsearch
exec Captures the output of a shell command as an event logstash-input-exec
file Streams events from files logstash-input-file
ganglia Reads Ganglia packets over UDP logstash-input-ganglia
gelf Reads GELF-format messages from Graylog2 as events logstash-input-gelf
generator Generates random log events for test purposes logstash-input-generator
github Reads events from a GitHub webhook logstash-input-github
google_cloud_storage Extract events from files in a Google Cloud Storage bucket logstash-input-google_cloud_storage
google_pubsub Consume events from a Google Cloud PubSub service logstash-input-google_pubsub
graphite Reads metrics from the graphite tool logstash-input-graphite
heartbeat Generates heartbeat events for testing logstash-input-heartbeat
http Receives events over HTTP or HTTPS logstash-input-http
http_poller Decodes the output of an HTTP API into events logstash-input-http_poller
imap Reads mail from an IMAP server logstash-input-imap
irc Reads events from an IRC server logstash-input-irc
java_generator Generates synthetic log events core plugin
java_stdin Reads events from standard input core plugin
jdbc Creates events from JDBC data logstash-input-jdbc
jms Reads events from a Jms Broker logstash-input-jms
jmx Retrieves metrics from remote Java applications over JMX logstash-input-jmx
kafka Reads events from a Kafka topic logstash-input-kafka
kinesis Receives events through an AWS Kinesis stream logstash-input-kinesis
log4j Reads events over a TCP socket from a Log4j SocketAppender object logstash-input-log4j
lumberjack Receives events using the Lumberjack protocol logstash-input-lumberjack
meetup Captures the output of command line tools as an event logstash-input-meetup
pipe Streams events from a long-running command pipe logstash-input-pipe
puppet_facter Receives facts from a Puppet server logstash-input-puppet_facter
rabbitmq Pulls events from a RabbitMQ exchange logstash-input-rabbitmq
redis Reads events from a Redis instance logstash-input-redis
relp Receives RELP events over a TCP socket logstash-input-relp
rss Captures the output of command line tools as an event logstash-input-rss
s3 Streams events from files in a S3 bucket logstash-input-s3
salesforce Creates events based on a Salesforce SOQL query logstash-input-salesforce
snmp Polls network devices using Simple Network Management Protocol (SNMP) logstash-input-snmp
snmptrap Creates events based on SNMP trap messages logstash-input-snmptrap
sqlite Creates events based on rows in an SQLite database logstash-input-sqlite
sqs Pulls events from an Amazon Web Services Simple Queue Service queue logstash-input-sqs
stdin Reads events from standard input logstash-input-stdin
stomp Creates events received with the STOMP protocol logstash-input-stomp
syslog Reads syslog messages as events logstash-input-syslog
tcp Reads events from a TCP socket logstash-input-tcp
twitter Reads events from the Twitter Streaming API logstash-input-twitter
udp Reads events over UDP logstash-input-udp
unix Reads events over a UNIX socket logstash-input-unix
varnishlog Reads from the varnish cache shared memory log logstash-input-varnishlog
websocket Reads events from a websocket logstash-input-websocket
wmi Creates events based on the results of a WMI query logstash-input-wmi
xmpp Receives events over the XMPP/Jabber protocol logstash-input-xmpp

Options common to all input plugins

Setting Type Required Default Description
add_field hash No {} Add a field to the event.
codec codec No plain The codec used to decode incoming data; decoding during input is a convenient alternative to a separate filter in your Logstash pipeline.
enable_metric boolean No true Enable or disable metric logging for this plugin instance; by default all available metrics are recorded, but you can disable metric collection for a specific plugin.
id string No Add a unique ID to the plugin configuration. If none is given, Logstash generates one; setting it explicitly is strongly recommended, and is especially useful when you run two or more plugins of the same type. For example, with two log4j inputs, a named ID helps you tell them apart when using the monitoring APIs. input { kafka { id => "my_plugin_id" }}
tags array No Add any number of tags to the event; this helps with later processing.
type string No Add a type field to all events handled by this input. The type is mainly used to drive filters; it is stored as part of the event, so you can also search by it in Kibana. If an event already carries a type (for example when a shipper sends it to an indexer), a new input will not overwrite it: the type set at the sender stays with the event for its whole life, even when it is forwarded to another Logstash server.
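
A sketch combining these common options on a tcp input; the port and field values are illustrative:

input {
  tcp {
    port => 5000                          # illustrative port
    id   => "tcp_app_logs"                # explicit ID, visible in the monitoring API
    type => "applog"                      # stored on the event, searchable in Kibana
    tags => ["tcp", "raw"]                # handy for routing in filter/output blocks
    add_field => { "received_by" => "logstash" }
    codec => json                         # decode JSON at input time, no filter needed
  }
}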

VII. Filter plugins

Plugin overview

Plugin Description Github repository
aggregate Aggregates information from several events originating with a single task logstash-filter-aggregate
alter Performs general alterations to fields that the mutate filter does not handle logstash-filter-alter
bytes Parses string representations of computer storage sizes, such as "123 MB" or "5.6gb", into their numeric value in bytes logstash-filter-bytes
cidr Checks IP addresses against a list of network blocks logstash-filter-cidr
cipher Applies or removes a cipher to an event logstash-filter-cipher
clone Duplicates events logstash-filter-clone
csv Parses comma-separated value data into individual fields logstash-filter-csv
date Parses dates from fields to use as the Logstash timestamp for an event logstash-filter-date
de_dot Computationally expensive filter that removes dots from a field name logstash-filter-de_dot
dissect Extracts unstructured event data into fields using delimiters logstash-filter-dissect
dns Performs a standard or reverse DNS lookup logstash-filter-dns
drop Drops all events logstash-filter-drop
elapsed Calculates the elapsed time between a pair of events logstash-filter-elapsed
elasticsearch Copies fields from previous log events in Elasticsearch to current events logstash-filter-elasticsearch
environment Stores environment variables as metadata sub-fields logstash-filter-environment
extractnumbers Extracts numbers from a string logstash-filter-extractnumbers
fingerprint Fingerprints fields by replacing values with a consistent hash logstash-filter-fingerprint
geoip Adds geographical information about an IP address logstash-filter-geoip
grok Parses unstructured event data into fields logstash-filter-grok
http Provides integration with external web services/REST APIs logstash-filter-http
i18n Removes special characters from a field logstash-filter-i18n
java_uuid Generates a UUID and adds it to each processed event core plugin
jdbc_static Enriches events with data pre-loaded from a remote database logstash-filter-jdbc_static
jdbc_streaming Enrich events with your database data logstash-filter-jdbc_streaming
json Parses JSON events logstash-filter-json
json_encode Serializes a field to JSON logstash-filter-json_encode
kv Parses key-value pairs logstash-filter-kv
memcached Provides integration with external data in Memcached logstash-filter-memcached
metricize Takes complex events containing a number of metrics and splits these up into multiple events, each holding a single metric logstash-filter-metricize
metrics Aggregates metrics logstash-filter-metrics
mutate Performs mutations on fields logstash-filter-mutate
prune Prunes event data based on a list of fields to blacklist or whitelist logstash-filter-prune
range Checks that specified fields stay within given size or length limits logstash-filter-range
ruby Executes arbitrary Ruby code logstash-filter-ruby
sleep Sleeps for a specified time span logstash-filter-sleep
split Splits multi-line messages into distinct events logstash-filter-split
syslog_pri Parses the PRI (priority) field of a syslog message logstash-filter-syslog_pri
threats_classifier Enriches security logs with information about the attacker’s intent logstash-filter-threats_classifier
throttle Throttles the number of events logstash-filter-throttle
tld Replaces the contents of the default message field with whatever you specify in the configuration logstash-filter-tld
translate Replaces field contents based on a hash or YAML file logstash-filter-translate
truncate Truncates fields longer than a given length logstash-filter-truncate
urldecode Decodes URL-encoded fields logstash-filter-urldecode
useragent Parses user agent strings into fields logstash-filter-useragent
uuid Adds a UUID to events logstash-filter-uuid
xml Parses XML into fields logstash-filter-xml

Options common to all filter plugins

Setting Input type Required
add_field hash No
add_tag array No
enable_metric boolean No
id string No
periodic_flush boolean No
remove_field array No
remove_tag array No
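
A sketch of these common options on a mutate filter; the tag and field names are illustrative:

filter {
  mutate {
    id           => "normalize_event"     # explicit ID for the monitoring API
    add_tag      => ["normalized"]
    add_field    => { "pipeline_stage" => "filtered" }
    remove_field => ["port"]              # drop a field we do not need downstream
  }
}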

VIII. Output plugins

Plugin overview

Plugin Description Github repository
boundary Sends annotations to Boundary based on Logstash events logstash-output-boundary
circonus Sends annotations to Circonus based on Logstash events logstash-output-circonus
cloudwatch Aggregates and sends metric data to AWS CloudWatch logstash-output-cloudwatch
csv Writes events to disk in a delimited format logstash-output-csv
datadog Sends events to DataDogHQ based on Logstash events logstash-output-datadog
datadog_metrics Sends metrics to DataDogHQ based on Logstash events logstash-output-datadog_metrics
elastic_app_search Sends events to the Elastic App Search solution logstash-output-elastic_app_search
elasticsearch Stores logs in Elasticsearch logstash-output-elasticsearch
email Sends email to a specified address when output is received logstash-output-email
exec Runs a command for a matching event logstash-output-exec
file Writes events to files on disk logstash-output-file
ganglia Writes metrics to Ganglia’s gmond logstash-output-ganglia
gelf Generates GELF formatted output for Graylog2 logstash-output-gelf
google_bigquery Writes events to Google BigQuery logstash-output-google_bigquery
google_cloud_storage Uploads log events to Google Cloud Storage logstash-output-google_cloud_storage
google_pubsub Uploads log events to Google Cloud Pubsub logstash-output-google_pubsub
graphite Writes metrics to Graphite logstash-output-graphite
graphtastic Sends metric data on Windows logstash-output-graphtastic
http Sends events to a generic HTTP or HTTPS endpoint logstash-output-http
influxdb Writes metrics to InfluxDB logstash-output-influxdb
irc Writes events to IRC logstash-output-irc
java_sink Discards any events received core plugin
java_stdout Prints events to the STDOUT of the shell core plugin
juggernaut Pushes messages to the Juggernaut websockets server logstash-output-juggernaut
kafka Writes events to a Kafka topic logstash-output-kafka
librato Sends metrics, annotations, and alerts to Librato based on Logstash events logstash-output-librato
loggly Ships logs to Loggly logstash-output-loggly
lumberjack Sends events using the lumberjack protocol logstash-output-lumberjack
metriccatcher Writes metrics to MetricCatcher logstash-output-metriccatcher
mongodb Writes events to MongoDB logstash-output-mongodb
nagios Sends passive check results to Nagios logstash-output-nagios
nagios_nsca Sends passive check results to Nagios using the NSCA protocol logstash-output-nagios_nsca
opentsdb Writes metrics to OpenTSDB logstash-output-opentsdb
pagerduty Sends notifications based on preconfigured services and escalation policies logstash-output-pagerduty
pipe Pipes events to another program’s standard input logstash-output-pipe
rabbitmq Pushes events to a RabbitMQ exchange logstash-output-rabbitmq
redis Sends events to a Redis queue using the RPUSH command logstash-output-redis
redmine Creates tickets using the Redmine API logstash-output-redmine
riak Writes events to the Riak distributed key/value store logstash-output-riak
riemann Sends metrics to Riemann logstash-output-riemann
s3 Sends Logstash events to the Amazon Simple Storage Service logstash-output-s3
sns Sends events to Amazon’s Simple Notification Service logstash-output-sns
solr_http Stores and indexes logs in Solr logstash-output-solr_http
sqs Pushes events to an Amazon Web Services Simple Queue Service queue logstash-output-sqs
statsd Sends metrics using the statsd network daemon logstash-output-statsd
stdout Prints events to the standard output logstash-output-stdout
stomp Writes events using the STOMP protocol logstash-output-stomp
syslog Sends events to a syslog server logstash-output-syslog
tcp Writes events over a TCP socket logstash-output-tcp
timber Sends events to the Timber.io logging service logstash-output-timber
udp Sends events over UDP logstash-output-udp
webhdfs Sends Logstash events to HDFS using the webhdfs REST API logstash-output-webhdfs
websocket Publishes messages to a websocket logstash-output-websocket
xmpp Posts events over XMPP logstash-output-xmpp
zabbix Sends events to a Zabbix server logstash-output-zabbix

Options common to all output plugins

Setting Input type Required
codec codec No
enable_metric boolean No
id string No
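
A sketch of the common options on outputs; the host and index name are illustrative:

output {
  elasticsearch {
    id    => "es_main"                    # explicit ID for the monitoring API
    hosts => ["http://localhost:9200"]    # illustrative address
    index => "app-%{+YYYY.MM.dd}"
  }
  stdout {
    codec => rubydebug                    # pretty-print events while debugging
  }
}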

IX. Codec plugins

Plugin overview

Plugin Description Github repository
avro Reads serialized Avro records as Logstash events logstash-codec-avro
cef Reads the ArcSight Common Event Format (CEF). logstash-codec-cef
cloudfront Reads AWS CloudFront reports logstash-codec-cloudfront
cloudtrail Reads AWS CloudTrail log files logstash-codec-cloudtrail
collectd Reads events from the collectd binary protocol using UDP. logstash-codec-collectd
dots Sends 1 dot per event to stdout for performance tracking logstash-codec-dots
edn Reads EDN format data logstash-codec-edn
edn_lines Reads newline-delimited EDN format data logstash-codec-edn_lines
es_bulk Reads the Elasticsearch bulk format into separate events, along with metadata logstash-codec-es_bulk
fluent Reads the fluentd msgpack schema logstash-codec-fluent
graphite Reads graphite formatted lines logstash-codec-graphite
gzip_lines Reads gzip encoded content logstash-codec-gzip_lines
jdots Renders each processed event as a dot core plugin
java_line Encodes and decodes line-oriented text data core plugin
java_plain Processes text data with no delimiters between events core plugin
json Reads JSON formatted content, creating one event per element in a JSON array logstash-codec-json
json_lines Reads newline-delimited JSON logstash-codec-json_lines
line Reads line-oriented text data logstash-codec-line
msgpack Reads MessagePack encoded content logstash-codec-msgpack
multiline Merges multiline messages into a single event logstash-codec-multiline
netflow Reads Netflow v5 and Netflow v9 data logstash-codec-netflow
nmap Reads Nmap data in XML format logstash-codec-nmap
plain Reads plaintext with no delimiting between events logstash-codec-plain
protobuf Reads protobuf messages and converts to Logstash Events logstash-codec-protobuf
rubydebug Applies the Ruby Awesome Print library to Logstash events logstash-codec-rubydebug
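
Codecs plug directly into inputs and outputs. A sketch using the multiline codec to merge indented continuation lines (e.g. Java stack traces) into one event; the path and pattern are illustrative:

input {
  file {
    path => "/var/log/app/app.log"        # illustrative path
    codec => multiline {
      pattern => "^\s"                    # lines starting with whitespace...
      what    => "previous"               # ...are appended to the previous event
    }
  }
}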

X. Plugin management

Logstash plugins are written in Ruby. Since the early 1.5.0+ releases, the plugin modules and the core have been maintained separately, and plugins are managed with the RubyGems package manager; a Logstash plugin is therefore essentially a self-contained RubyGem.

RubyGems (gems for short) is a packaging system for Ruby components. It provides a standard format for distributing Ruby programs and libraries, along with a tool for managing package installation.

Plugin names follow the format logstash-{input/output/filter}-<name>. Example: the date plugin under filter is logstash-filter-date.

1. Installing a plugin

#example: installing the dissect plugin
/usr/share/logstash/bin/logstash-plugin install logstash-filter-dissect
#flags:
--path.plugins  specify the plugin installation path

2. Listing installed plugins

/usr/share/logstash/bin/logstash-plugin list
#flags:
--verbose  also show plugin versions
--group NAME  list all plugins in a group (input, filter, codec, output), e.g. --group filter for all filter plugins

3. Updating plugins

#update a single plugin
/usr/share/logstash/bin/logstash-plugin update <plugin-name>
#update all plugins
/usr/share/logstash/bin/logstash-plugin update

4. Removing a plugin

/usr/share/logstash/bin/logstash-plugin remove <plugin-name>

5. Setting a proxy for the plugin manager

export HTTP_PROXY=http://127.0.0.1:3128

6. Changing the plugin repository

The default Logstash plugin repository is https://rubygems.org.

To use an alternative or private mirror of the plugin repository, edit /usr/share/logstash/Gemfile and change source "https://rubygems.org" to source "https://my.private.repository".

XI. Miscellaneous

1. The index template for the elasticsearch output

PUT _template/logstash
{
    "order" : 2,
    "version" : 60001,
    "index_patterns" : [
      "*"
    ],
    "settings" : {
      "index" : {
        "number_of_replicas" : "1",
        "number_of_shards" : "2",
        "refresh_interval" : "60s"
      }
    },
    "mappings" : {
      "dynamic_templates" : [
        {
          "message_field" : {
            "path_match" : "message",
            "mapping" : {
              "norms" : false,
              "type" : "text"
            },
            "match_mapping_type" : "string"
          }
        },
        {
          "string_fields" : {
            "mapping" : {
              "norms" : false,
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "ignore_above" : 256,
                  "type" : "keyword"
                }
              }
            },
            "match_mapping_type" : "string",
            "match" : "*"
          }
        }
      ],
      "properties" : {
        "@timestamp" : {
          "type" : "date"
        },
        "geoip" : {
          "dynamic" : true,
          "properties" : {
            "ip" : {
              "type" : "ip"
            },
            "latitude" : {
              "type" : "half_float"
            },
            "location" : {
              "type" : "geo_point"
            },
            "longitude" : {
              "type" : "half_float"
            }
          }
        },
        "@version" : {
          "type" : "keyword"
        }
      }
    },
    "aliases" : { }
}

XII. Monitoring

1. Viewing pipeline runtime statistics

curl -XGET 'http://<logstash-host>:9600/_node/stats/pipelines/<pipeline-id>?pretty'
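
Other endpoints of the same monitoring API that are often useful (localhost is illustrative):

# overall node stats: JVM, process, event counts, all pipelines
curl -XGET 'http://localhost:9600/_node/stats?pretty'
# hot threads, useful when a pipeline appears stalled
curl -XGET 'http://localhost:9600/_node/hot_threads?pretty'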