lgostash Pipeline

本文总阅读量

1.Logstash Pipeline

pipeline在logstash6.0后加入,有了pipeline这个机制后,我们可以只通过配置pipeline来控制logstash的input, filter, output

1.1.logstash所在位置

目前很多都是使用beats收集各个项目的数据,并经过类似kafka等缓冲层,再由logstash接受处理数据再发到elastic(如下)
beats -> [缓冲] ->logstash -> elasticsearch

1.2为何需要Pipeling

  • 条件地狱
    在没有出现pipeline的时候,只能在input,filter,output配置多个if,else非常麻烦.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    input {
    beats { port => 2333 tag => apache }
    beats { port => 2233 tag => so1n }
    }
    filter {
    if "apache" in [tags] {
    dissect { ... }
    } else if "so1n" in [tags] {
    grok { ... }
    }
    }
    # elasticsearch需要分别定义index等导致需要拆分
    output {
    if "apache" in [tags] {
    elasticsearch { ... }
    } else if "so1n" in [tags] {
    elasticsearch { ... }
    }
    }
  • 缺乏拥塞隔离
    在logstash在同一段事件只能处理一件事情,效率比较低,当apache拥塞时,tag为so1n的数据无法被处理,会导致tag为 so1n的输入端压力也增加

在pipeline没出现前可以通过在同一台机器上运行多个 Logstash 实例来解决,然后可以独立地管理这些实例。但是即使这样的解决方案也会产生其他问题,如:需要管理多个实例,每个 Logstash 的实例就会多出一个JVM,需要监视每个 Logstash 实例,而pipeline的出现就可以解决了这些问题

2.Pipeline使用场景

2.1.Distributor Pattern 分发者模式

在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# /etc/logstash/pipelines.yml
- pipeline.id: main
config.string: |
input { beats { port => 2333 } }
output {
if [type] == apache {
pipeline { send_to => apache }
} else if [type] == system {
pipeline { send_to => syslog }
} else {
pipeline { send_to => so1n }
}
}
- pipeline.id: apache-processing
config.string: |
input { pipeline { address => apache } }
filter {
# ...
}
output {
elasticsearch { hosts => [es] }
}
- pipeline.id: syslog-processing
config.string: |
input { pipeline { address => syslog } }
filter {
# ...
}
output {
pipeline { send_to => es }
}
- pipeline.id: so1n-processing
config.string: |
input { pipeline { address => so1n } }
output {
pipeline { send_to => es }
}
- pipeline.id: es
config.string: |
input { pipeline {address => es } }
output {
elasticsearch { ... }
}

2.2.Output Isolator Pattern 输出隔离模式

Logstash 的一个 pipeline 可以配置多个 output,但是一旦某一个 output 出问题,会导致另一个 output 也无法处理新数据。而通过输出隔离模式可以完美解决这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
# /etc/logstash/pipelines.yml
- pipeline.id: main
config.string: |
input { beats { port => 2333 } }
output { pipeline { send_to => [es1, es2] } }
- pipeline.id: es1
config.string: |
input { pipeline { address => es1 } }
output { elasticsearch { } }
- pipeline.id: es2
config.string: |
input { pipeline { address => es2 } }
output { elasticsearch { } }

通过输出到两个独立的 pipeline,解除es1和es2的相互的影响,比如 es1 出问题的时候,es2 依然可以正常处理数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性.

此外,如果把上面的输出改为filter,那么就可以把同一分输入数据克隆成两份,并分发给不通的filter进行处理输出.

2.3Collector Pattern 收集者模式

这个模式很简单,就是把不同地方的数据收集过来,并一起解析输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# /etc/logstash/pipelines.yml
- pipeline.id: beats
config.string: |
input { beats { port => 2333 } }
output { pipeline { send_to => [so1n] } }
- pipeline.id: kafka
config.string: |
input { kafka { ... } }
output { pipeline { send_to => [so1n] } }
- pipeline.id: so1n
config.string: |
input { pipeline { address => so1n } }
filter {
# ...
}
output { elasticsearch { } }

3.如何更好的搭配使用

上面多种模式其实都很简单,官方定义了这些模式也就是跟大家说 可以灵活的编写配置文件,通过pipeline可以更好的解耦和复用,如何使用还是看实际的使用场景
目前很多人用了beats -> [缓冲] ->logstash -> elasticsearch这种结构,那我们可以利用beats中的metadate特性来创建es的index,让所有的输出都可以输出到同一个es,其他的只要编辑filter就可以了,如果 公司里大部分业务数据命名是差不多的,可能只有一点点不太一样,那也可以合并到一个filter,不必去写更多的filter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
- pipeline.id: beats-server
config.string: |
input { beats { port => 5044 } }
output {
if [fields][custom_field] == "so1n1" {
pipeline { send_to => so1n }
} else if [fields][custom_field] == "so1n2" {
pipeline { send_to => so1n }
} else if [fields][custom_field] == "other" {
pipeline { send_to => other }
}
}

- pipeline.id : so1n
config.string: |
input { pipeline { address => so1n } }
filter {
json {
source => "json_dumps"
}
geoip {
source => "clientip"
}
}
output { pipeline { send_to => es }}

- pipeline.id : other
config.string: |
input { pipeline { address => other } }
filter {
# ....
}
output { pipeline { send_to => es }}

- pipeline.id: es
config.string: |
input { pipeline { address => es }}
output {
elasticsearch {
hosts => [....]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}

4.queue 让logstash优雅关闭

默认情况下logstash会将在处在pipeline各个stage的event buffer在内存中, 这种buffer方式,在logstash意外down掉的情况下会丢失数据,因此引入了Persisted queue这个用来持久化数据,防止logstash down掉时数据丢失.

4.1queue位置

queue的位置示意: input → queue → filter + output

当event从input写到queue成功后, 会发送ack给给数据源,会把数据写入本地文件, 当对应的filter和output都处理完event后,该event才会被标记为acknowleged/ACKed,并把对应数据从本地文件删除, 比如, 当grok完后,并发送个ES收到成功响应后, event才会被标记为成功处理.如果logstash意外down机, 那些未被标记为ACKed的事件会被重新处理.

4.2 queue参数

  • queue.type: persisted persisted为持久化数据,默认是写入到内存
  • path.queue: /usr/share/logstash/data 队列存储路径;如果队列类型为persisted时生效
  • queue.page_capacity: 250mb 队列为持久化,单个队列大小
  • queue.max_events: 0 当启用持久化队列时,队列中未读事件的最大数量,0为不限制
  • **queue.max_bytes: 1024mb **队列最大容量
  • **queue.checkpoint.writes: 1024 **在启用持久队列时强制执行检查点之前的最大数量的写入事件,0为不限制
  • **queue.checkpoint.interval: 1000 **当启用持久队列时,在头页面上强制一个检查点的时间间隔
  • queue.drain: 指示logstash在关闭之前是否需要将queue中的数据处理. 一般queue不大且堆积的event数量不大的情况下可以开启, 不然耗时时间会太长

5.pipeline吞吐量优化

1
2
3
4
pipeline.workers: 8        # pipeline线程数,官方建议是等于CPU内核数,命令行参数为-w
pipeline.output.workers: 8 # 实际output时的线程数
pipeline.batch.size: 3000 # 默认为125,当存入的事件数大于该值时则发送数据
pipeline.batch.delay: 5 # 每隔一段时间,发送数据

pipeline的配置没有queue那么多,同时上面两个线程数的基本可以不用配置,只关注后两个pipeline.batch*相关的参数即可.这两个参数只要其中一个满足就调用es的api接口发送数据到es.
根据自己的实时性去调节两个数字的大小,如果发现es写入有瓶颈,可以调大两个参数.
其中发现是否写入瓶颈可以如下操作:

  • 查看logstash是否有retrying failed action with response code: 429字样的日志
  • 查看es, 可以在kibanan的控制台输入GET _nodes/stats/thread_pool?pretty,看node -> thread_pool -> bulk 查看threads和active,active越接近threads则证明越接近写入瓶颈
查看评论