搭建日志分析平台 ¶
为增加 ELK 集群的运行效率,一般建议在 k8s 集群外使用物理机部署 ELK 集群。
Logstash 部署 ¶
Logstash 就是一个 web 容器,它主要是用来做日志的搜集、分析和过滤,Logstash 支持大量的数据获取方式。
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.15.1-linux-x86_64.tar.gz
tar -xf logstash-8.15.1-linux-x86_64.tar.gz -C /opt/ --transform 's/^logstash-8.15.1/logstash/'
logstash nginx-配置 ¶
vim nginx.conf 核心配置
input {
kafka {
#kafka集群配置只需要在地址后面添加一个逗号分隔
bootstrap_servers => "10.10.0.2:9092,10.10.0.4:9092,10.10.0.5:9092"
auto_offset_reset => "earliest" # 消费策略 latest (默认) earliest none
consumer_threads => 3
topics => ["nginx-log-topic"]
codec => "json"
group_id => "logstash-group"
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.99:9200"]
user => "elastic"
password => "datarc"
index => "test-kafka-%{+YYYY.MM.dd}"
}
}
启动 ¶
/opt/logstash/bin/logstash -f /opt/logstash/config/nginx.conf
logstash 配置 ¶
/etc/logstash/logstash.yml 核心配置
path.data: /var/lib/logstash
path.logs: /var/log/logstash
path.config: "/etc/logstash/conf.d/*.conf"
node.name: logstash
api.http.host: 192.168.3.41
api.http.port: 9600-9700
config.reload.automatic: true ##定期检查配置是否更改并重新加载管道,默认为false
config.reload.interval: 10s #logstash间隔多久检查一次配置中的更改,默认为3秒
/etc/logstash/conf.d/logstash-to-elastic.conf 核心配置
input {
kafka {
#kafka集群配置只需要在地址后面添加一个逗号分隔
bootstrap_servers => "192.168.3.40:9092"
group_id => "logstash-group"
topics => ["nginx-log-topic"]
consumer_threads => 3
auto_offset_reset => "earliest" # 消费策略 latest (默认) earliest none
codec => "json"
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.99:9200"]
user => "elastic"
password => "datarc"
index => "test-kafka-%{+YYYY.MM.dd}"
}
}
logstash 服务配置 ¶
logstash 进程不用预先启动,使用时启动。
验证 logstash 可用性 ¶
/usr/share/logstash/bin/logstash -e 'input {stdin{} } output {stdout {} }'
...
...
输入 abc 字符,查看其输出以 json格式 输出 abc 内容
{
"host" => "etcd-2",
"@version" => "1",
"message" => "abc",
"@timestamp" => 2023-07-18T07:48:18.480Z
}
/usr/share/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.99:9200"] user => "elastic" password => "datarc" index => "sasas-logstash-%{+YYYY.MM.dd}" } }'
# 会通过 kibana 页面中的索引展示,但需要在 kibana 页面中添加索引
kafka 的 auto.offset.reset 详解 ¶
auto.offset.reset有以下三个可选值
- latest (默认)
- earliest
- none
三者相同点: ¶
对于同一个消费者组,若已有提交的 offset,则从提交的 offset 开始接着消费
只要这个消费者组消费过了,不管 auto.offset.reset 指定成什么值,效果都一样,每次启动都是已有的最新的 offset 开始接着往后消费
不同的点: ¶
latest(默认):对于同一个消费者组,若没有提交过 offset,则只消费消费者连接 topic 后,新产生的数据
如果这个 topic 有历史消息,现在新启动了一个消费者组,且 auto.offset.reset=latest ,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时 topic 有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交 offset 这时候如果该消费者组意外下线了,topic 仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的 offset 处开始接着消费,此时走的就是共同定义
earliest:对于同一个消费者组,若没有提交过 offset ,则从头开始消费
如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。 一旦该消费者组消费过 topic 后,此时就有该消费者组的offset了,这种情况下即使指定了 auto.offset.reset=earliest ,再重新启动该消费者组,效果是与 latest 一样的,也就是此时走的是共同的定义
none:对于同一个消费者组,若没有提交过 offset ,会抛异常
一般生产环境基本用不到该参数
总结 ¶
如果topic已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定auto.offset.reset为earliest,才可以消费到历史数据,之后就有提交offset。有了offset,无论是earliest还是latest,效果都是一样的了。 如果topic没有历史消息,或者不需要处理历史消息,那按照默认latest即可。
参考: https://www.cnblogs.com/convict/p/16701154.html#1-%E5%8F%96%E5%80%BC%E5%8F%8A%E5%AE%9A%E4%B9%89