跳转至

搭建日志分析平台

为增加 ELK 集群的运行效率,一般建议在 k8s 集群外使用物理机部署 ELK 集群。

Logstash 部署

Logstash 就是一个 web 容器,它主要是用来做日志的搜集、分析和过滤,Logstash 支持大量的数据获取方式。

1. 下载
wget https://artifacts.elastic.co/downloads/logstash/logstash-8.15.1-linux-x86_64.tar.gz

tar -xf logstash-8.15.1-linux-x86_64.tar.gz -C /opt/ --transform 's/^logstash-8.15.1/logstash/'

logstash nginx-配置

vim nginx.conf 核心配置

input {
  kafka {
  #kafka集群配置只需要在地址后面添加一个逗号分隔
    bootstrap_servers => "10.10.0.2:9092,10.10.0.4:9092,10.10.0.5:9092"
    auto_offset_reset => "earliest" # 消费策略 latest (默认) earliest none
    consumer_threads => 3
    topics => ["nginx-log-topic"]
    codec => "json"
    group_id => "logstash-group"
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.1.99:9200"]
    user => "elastic"
    password => "datarc"
    index => "test-kafka-%{+YYYY.MM.dd}"
  }
}

启动

/opt/logstash/bin/logstash -f /opt/logstash/config/nginx.conf

logstash 配置

/etc/logstash/logstash.yml 核心配置

path.data: /var/lib/logstash
path.logs: /var/log/logstash
path.config: "/etc/logstash/conf.d/*.conf"
node.name: logstash
api.http.host: 192.168.3.41
api.http.port: 9600-9700
config.reload.automatic: true  ##定期检查配置是否更改并重新加载管道,默认为false
config.reload.interval: 10s #logstash间隔多久检查一次配置中的更改,默认为3秒

/etc/logstash/conf.d/logstash-to-elastic.conf 核心配置

input {
  kafka {
  #kafka集群配置只需要在地址后面添加一个逗号分隔
    bootstrap_servers => "192.168.3.40:9092"
    group_id => "logstash-group"
    topics => ["nginx-log-topic"]
    consumer_threads => 3
    auto_offset_reset => "earliest" # 消费策略 latest (默认) earliest none
    codec => "json"
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.1.99:9200"]
    user => "elastic"
    password => "datarc"
    index => "test-kafka-%{+YYYY.MM.dd}"
  }
}

logstash 服务配置

logstash 进程不用预先启动,使用时启动。

验证 logstash 可用性

1. 标准输入及标准输出验证
/usr/share/logstash/bin/logstash -e 'input {stdin{} } output {stdout {} }'
...
...

输入 abc 字符,查看其输出以 json格式 输出 abc 内容

{
          "host" => "etcd-2",
      "@version" => "1",
       "message" => "abc",
    "@timestamp" => 2023-07-18T07:48:18.480Z
}
2. 后续使用 logstash 输入内容到 elasticsearch 验证
/usr/share/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch { hosts => ["192.168.1.99:9200"] user => "elastic" password => "datarc" index => "sasas-logstash-%{+YYYY.MM.dd}" } }'
# 会通过 kibana 页面中的索引展示,但需要在 kibana 页面中添加索引

kafka 的 auto.offset.reset 详解

auto.offset.reset有以下三个可选值

  • latest (默认)
  • earliest
  • none

三者相同点:

对于同一个消费者组,若已有提交的 offset,则从提交的 offset 开始接着消费

只要这个消费者组消费过了,不管 auto.offset.reset 指定成什么值,效果都一样,每次启动都是已有的最新的 offset 开始接着往后消费

不同的点:

latest(默认):对于同一个消费者组,若没有提交过 offset,则只消费消费者连接 topic 后,新产生的数据

如果这个 topic 有历史消息,现在新启动了一个消费者组,且 auto.offset.reset=latest ,此时已存在的历史消息无法消费到,那保持消费者组运行,如果此时 topic 有新消息进来了,这时新消息才会被消费到。而一旦有消费,则必然会提交 offset 这时候如果该消费者组意外下线了,topic 仍然有消息进来,接着该消费者组在后面恢复上线了,它仍然可以从下线时的 offset 处开始接着消费,此时走的就是共同定义

earliest:对于同一个消费者组,若没有提交过 offset ,则从头开始消费

如果这个topic有历史消息存在,现在新启动了一个消费者组,且auto.offset.reset=earliest,那将会从头开始消费,这就是与latest不同之处。 一旦该消费者组消费过 topic 后,此时就有该消费者组的offset了,这种情况下即使指定了 auto.offset.reset=earliest ,再重新启动该消费者组,效果是与 latest 一样的,也就是此时走的是共同的定义

none:对于同一个消费者组,若没有提交过 offset ,会抛异常

一般生产环境基本用不到该参数

总结

如果topic已经有历史消息了,又需要消费这些历史消息,则必须要指定一个从未消费过的消费者组,同时指定auto.offset.reset为earliest,才可以消费到历史数据,之后就有提交offset。有了offset,无论是earliest还是latest,效果都是一样的了。 如果topic没有历史消息,或者不需要处理历史消息,那按照默认latest即可。

参考: https://www.cnblogs.com/convict/p/16701154.html#1-%E5%8F%96%E5%80%BC%E5%8F%8A%E5%AE%9A%E4%B9%89