lgostash Pipeline
1.Logstash Pipeline
pipeline在logstash6.0后加入,有了pipeline这个机制后,我们可以只通过配置pipeline来控制logstash的input, filter, output
1.1.logstash所在位置
目前很多都是使用beats收集各个项目的数据,并经过类似kafka等缓冲层,再由logstash接受处理数据再发到elastic(如下)
beats -> [缓冲] ->logstash -> elasticsearch
1.2为何需要Pipeling
- 条件地狱
在没有出现pipeline的时候,只能在input,filter,output配置多个if,else非常麻烦.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19input {
beats { port => 2333 tag => apache }
beats { port => 2233 tag => so1n }
}
filter {
if "apache" in [tags] {
dissect { ... }
} else if "so1n" in [tags] {
grok { ... }
}
}
# elasticsearch需要分别定义index等导致需要拆分
output {
if "apache" in [tags] {
elasticsearch { ... }
} else if "so1n" in [tags] {
elasticsearch { ... }
}
} - 缺乏拥塞隔离
在logstash在同一段事件只能处理一件事情,效率比较低,当apache拥塞时,tag为so1n的数据无法被处理,会导致tag为 so1n的输入端压力也增加
在pipeline没出现前可以通过在同一台机器上运行多个 Logstash 实例来解决,然后可以独立地管理这些实例。但是即使这样的解决方案也会产生其他问题,如:需要管理多个实例,每个 Logstash 的实例就会多出一个JVM,需要监视每个 Logstash 实例,而pipeline的出现就可以解决了这些问题
2.Pipeline使用场景
2.1.Distributor Pattern 分发者模式
在一个 pipeline 处理输入,然后根据不同的数据类型再分发到对应的 Pipeline 去处理。这种模式的好处在于统一输入端口,隔离不同类型的处理配置文件,减少由于配置文件混合在一起带来的维护成本。
1 |
|
2.2.Output Isolator Pattern 输出隔离模式
Logstash 的一个 pipeline 可以配置多个 output,但是一旦某一个 output 出问题,会导致另一个 output 也无法处理新数据。而通过输出隔离模式可以完美解决这个问题。
1 |
|
通过输出到两个独立的 pipeline,解除es1和es2的相互的影响,比如 es1 出问题的时候,es2 依然可以正常处理数据,而且两个 pipeline 可以配置独立的队列来保障数据的完备性.
此外,如果把上面的输出改为filter,那么就可以把同一分输入数据克隆成两份,并分发给不通的filter进行处理输出.
2.3Collector Pattern 收集者模式
这个模式很简单,就是把不同地方的数据收集过来,并一起解析输出
1 |
|
3.如何更好的搭配使用
上面多种模式其实都很简单,官方定义了这些模式也就是跟大家说 可以灵活的编写配置文件,通过pipeline可以更好的解耦和复用,如何使用还是看实际的使用场景
目前很多人用了beats -> [缓冲] ->logstash -> elasticsearch这种结构,那我们可以利用beats中的metadate特性来创建es的index,让所有的输出都可以输出到同一个es,其他的只要编辑filter就可以了,如果 公司里大部分业务数据命名是差不多的,可能只有一点点不太一样,那也可以合并到一个filter,不必去写更多的filter
1 |
|
4.queue 让logstash优雅关闭
默认情况下logstash会将在处在pipeline各个stage的event buffer在内存中, 这种buffer方式,在logstash意外down掉的情况下会丢失数据,因此引入了Persisted queue这个用来持久化数据,防止logstash down掉时数据丢失.
4.1queue位置
queue的位置示意: input → queue → filter + output
当event从input写到queue成功后, 会发送ack给给数据源,会把数据写入本地文件, 当对应的filter和output都处理完event后,该event才会被标记为acknowleged/ACKed,并把对应数据从本地文件删除, 比如, 当grok完后,并发送个ES收到成功响应后, event才会被标记为成功处理.如果logstash意外down机, 那些未被标记为ACKed的事件会被重新处理.
4.2 queue参数
- queue.type: persisted persisted为持久化数据,默认是写入到内存
- path.queue: /usr/share/logstash/data 队列存储路径;如果队列类型为persisted时生效
- queue.page_capacity: 250mb 队列为持久化,单个队列大小
- queue.max_events: 0 当启用持久化队列时,队列中未读事件的最大数量,0为不限制
- **queue.max_bytes: 1024mb **队列最大容量
- **queue.checkpoint.writes: 1024 **在启用持久队列时强制执行检查点之前的最大数量的写入事件,0为不限制
- **queue.checkpoint.interval: 1000 **当启用持久队列时,在头页面上强制一个检查点的时间间隔
- queue.drain: 指示logstash在关闭之前是否需要将queue中的数据处理. 一般queue不大且堆积的event数量不大的情况下可以开启, 不然耗时时间会太长
5.pipeline吞吐量优化
1 |
|
pipeline的配置没有queue那么多,同时上面两个线程数的基本可以不用配置,只关注后两个pipeline.batch*相关的参数即可.这两个参数只要其中一个满足就调用es的api接口发送数据到es.
根据自己的实时性去调节两个数字的大小,如果发现es写入有瓶颈,可以调大两个参数.
其中发现是否写入瓶颈可以如下操作:
- 查看logstash是否有retrying failed action with response code: 429字样的日志
- 查看es, 可以在kibanan的控制台输入
GET _nodes/stats/thread_pool?pretty
,看node -> thread_pool -> bulk 查看threads和active,active越接近threads则证明越接近写入瓶颈
- 本文作者:So1n
- 本文链接:http://so1n.me/2020/02/10/logstash%20Pipeline/index.html
- 版权声明:本博客所有文章均采用 BY-NC-SA 许可协议,转载请注明出处!