Elasticsearch用于搜索、分析数据。

Kibana用于展示数据。

Beats用于收集数据。

Logstash用于集中、转换和存储数据。

日志的主要处理流程为下图:

ELK%20Stack(Elasticsearch%E3%80%81Kibana%E3%80%81Beats%20%E5%92%8C%20Logstash)%E6%90%AD%E5%BB%BA%208aaac5c86012479a8765cbad898592c3/Untitled.png

使用Docker搭建ELK

新建配置文件

在elastic/config目录下,创建ElasticSearch的配置文件:

# es01.yml
cluster.name: "docker-cluster"
network.host: 0.0.0.0

同目录下创建kibnana的配置文件:

# kib01.yml
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
xpack.encryptedSavedObjects.encryptionKey: b014b39f62fd1d9a0b1dac766c0e51f5
xpack.reporting.encryptionKey: 9e069cbc6b68796799f96f057ce6c5f5
xpack.security.encryptionKey: 1e0cf9eb23cbfd00d8ba113cb5327bb5

在elastic/config/logstash目录下,创建logstash的配置文件:

# logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "https://es01:9200" ]
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: TsfYfefVTuDfwDg3IITK
xpack.monitoring.elasticsearch.ssl.certificate_authority: /usr/share/elasticsearch/config/certificates/ca/ca.crt

在同目录下,创建logstash管理pipeline的配置文件:

# pipelines.yml

# This file is where you define your pipelines. You can define multiple.
# # For more information on multiple pipelines, see the documentation:
# #   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
#
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline"

在elastic/config/logstash/pipeline目录下,创建logstash收集器配置文件:

# logstash.conf
input {
    tcp {
        mode => "server"
        host => "0.0.0.0"
        port => 4560
        codec => json_lines
    }
}
output {
    elasticsearch {
        hosts => ["https://es01:9200"]
        index => "springboot-test-%{+YYYY.MM.dd}"
        user => elastic
        password => TsfYfefVTuDfwDg3IITK
        ssl_certificate_verification => false #关闭ssl
    }
    stdout{codec => rubydebug}
}

准备创建docker-compose文件

在elastic/config/elasticsearch/certificaes目录下创建instances.yml文件:

# instances.yml
instances:
  - name: es01
    dns:
      - es01
      - localhost
    ip:
      - 127.0.0.1

  - name: 'kib01'
    dns:
      - kib01
      - localhost

  - name: 'logstash'
    dns:
      - logstash
      - localhost
    ip:
      - 127.0.0.1

在elastic目录下,创建认证的文件:

# create-certs.yml
version: '2.2'

services:
  create_certs:
    image: docker.elastic.co/elasticsearch/elasticsearch:${VERSION}
    container_name: create_certs
    command: >
      bash -c '
        yum install -y -q -e 0 unzip;
        if [[ ! -f /certs/bundle.zip ]]; then
          bin/elasticsearch-certutil cert --silent --pem --in config/certificates/instances.yml -out /certs/bundle.zip;
          unzip /certs/bundle.zip -d /certs;
        fi;
        chown -R 1000:0 /certs
      '      
    working_dir: /usr/share/elasticsearch
    volumes:
      - ./certs:/certs
      - ./config/elasticsearch/certificates:/usr/share/elasticsearch/config/certificates
    networks:
      - elastic

volumes:
  certs:
    driver: local

networks:
  elastic:
    driver: bridge

在同目录下,创建环境文件,方便统一管理docker版本:

# .env
COMPOSE_PROJECT_NAME=es
CERTS_DIR=/usr/share/elasticsearch/config/certificates   # 这个为docker容器里面的位置
VERSION=7.13.4

同目录下,创建docker-compose文件:

# elastic-docker-tls.yml
version: '2.2'

services:
  es01:
    image: docker.elastic.co/elasticsearch/elasticsearch:${VERSION}
    container_name: es01
    environment:
      - cluster.name=es-docker
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.license.self_generated.type=trial 
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=true 
      - xpack.security.http.ssl.key=$CERTS_DIR/es01/es01.key
      - xpack.security.http.ssl.certificate_authorities=$CERTS_DIR/ca/ca.crt
      - xpack.security.http.ssl.certificate=$CERTS_DIR/es01/es01.crt
      - xpack.security.transport.ssl.enabled=true 
      - xpack.security.transport.ssl.verification_mode=certificate 
      - xpack.security.transport.ssl.certificate_authorities=$CERTS_DIR/ca/ca.crt
      - xpack.security.transport.ssl.certificate=$CERTS_DIR/es01/es01.crt
      - xpack.security.transport.ssl.key=$CERTS_DIR/es01/es01.key
      - TZ=Asia/Shanghai
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data01:/usr/share/elasticsearch/data
      - ./certs:$CERTS_DIR
      - ./config/es01.yml:/usr/share/elasticsearch/config/elasticsearch.yml
    ports:
      - 9200:9200
    networks:
      - elastic

    healthcheck:
      test: curl --cacert $CERTS_DIR/ca/ca.crt -s https://localhost:9200 >/dev/null; if [[ $$? == 52 ]]; then echo 0; else echo 1; fi
      interval: 30s
      timeout: 10s
      retries: 5

  kib01:
    image: docker.elastic.co/kibana/kibana:${VERSION}
    container_name: kib01
    depends_on: {"es01": {"condition": "service_healthy"}}
    ports:
      - 5601:5601
    environment:
      SERVERNAME: localhost
      ELASTICSEARCH_URL: https://es01:9200
      ELASTICSEARCH_HOSTS: https://es01:9200
      ELASTICSEARCH_USERNAME: kibana_system
      ELASTICSEARCH_PASSWORD: 1Y51Fvgcq9Tv5oG1MWSr
      ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES: $CERTS_DIR/ca/ca.crt
      SERVER_SSL_ENABLED: "true"
      SERVER_SSL_KEY: $CERTS_DIR/kib01/kib01.key
      SERVER_SSL_CERTIFICATE: $CERTS_DIR/kib01/kib01.crt
      I18N_LOCALE: zh-CN
      TZ: Asia/Shanghai
    volumes:
      - ./certs:$CERTS_DIR
      - ./config/kib01.yml:/usr/share/kibana/config/kibana.yml
    networks:
      - elastic
  logstash:
    image: docker.elastic.co/logstash/logstash:${VERSION}
    container_name: logstash
    depends_on: {"es01": {"condition": "service_healthy"}}
    volumes:
      - ./config/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml
      - ./config/logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml
      - ./config/logstash/pipeline:/usr/share/logstash/pipeline
      - ./certs:$CERTS_DIR
    ports:
      - 4560:4560
      - 4561:4561
    environment:
      SERVERNAME: localhost
      ELASTICSEARCH_URL: https://es01:9200
      ELASTICSEARCH_HOSTS: https://es01:9200
      ELASTICSEARCH_USERNAME: logstash_system
      ELASTICSEARCH_PASSWORD: psvTdxGicq3R0evjCLHv
      ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES: $CERTS_DIR/ca/ca.crt
      SERVER_SSL_ENABLED: "true"
      SERVER_SSL_KEY: $CERTS_DIR/logstash/logstash.key
      SERVER_SSL_CERTIFICATE: $CERTS_DIR/logstash/logstash.crt
      I18N_LOCALE: zh-CN
      TZ: Asia/Shanghai
    networks:
      - elastic

volumes:
  data01:
    driver: local
  certs:
    driver: local

networks:
  elastic:
    driver: bridge

建议从官方拉取docker文件,但是官方网址比较慢,也可以改为从其他镜像仓库下载,例如把 [docker.elastic.co/kibana/kibana:${VERSION}](http://docker.elastic.co/kibana/kibana:${VERSION}) 官方地址前缀去掉,变成 kibana:${VERSION},其他images也类似。

配置认证并启动ELK

首先创建认证:

$ docker-compose -f create-certs.yml run --rm create_certs

启动所有服务:

$ docker-compose -f elastic-docker-tls.yml up -d

生成用户密码:

$ docker exec es01 /bin/bash -c "bin/elasticsearch-setup-passwords \
auto --batch --url https://es01:9200"

将生成的密码记录下来,将kibana_system的密码复制到elastic-docker-tls.yml文件中的kib01的ELASTICSEARCH_PASSWORD,将logstash_system的密码复制到elastic-docker-tls.yml文件中的logstash的ELASTICSEARCH_PASSWORD。

停止docker-compose:

$ docker-compose -f elastic-docker-tls.yml stop

再启动docker-compose即可。

启动成功后,用命令 docker network inspect es_elastic能看到应用都连接到了同一个网络。

检查logstash是否正常启动,通过命令 docker logs logstash查看日志是否有error。

Spring集成ELK

将Spring产生的日志发送到logstash,logstash对数据进行过滤后,再发送到elasticsearch,再通过kibnana查看数据。

添加依赖

Spring使用的是logback,原本是想使用log4j,但没能集成成功。

添加logstash依赖(核心),lombok和spring-boot-starter-web:

<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
    <version>1.18.20</version>
</dependency>

<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>6.6</version>
</dependency>

集成logstash的详细使用可以参考:

GitHub - logstash/logstash-logback-encoder: Logback JSON encoder and appenders

项目配置

配置日志配置文件logback.xml,配置文件放在src/main/resource下:

<?xml version="1.0" encoding="UTF-8"?>
<configuration  scan="true" scanPeriod="60 seconds" debug="false">
    <include resource="org/springframework/boot/logging/logback/base.xml" />
    <contextName>logback</contextName>

    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" />
    </appender>

    <!--输出到控制台-->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="logstash"/>
        <appender-ref ref="console" />
    </root>
</configuration>

<destination>127.0.0.1:4560</destination>logstash ip和暴露的端口,logback就是通过这个地址把日志发送给logstash。

并在application.properties文件配置日志输出级别和日志配置文件位置:

# application.properties
logging.level.root=info
logging.config=classpath:logback.xml

打印日志的代码:

// TestController.java
@RestController
@Slf4j
public class TestController {

    @GetMapping(value = "/log4j2")
    public String testLog(){
        try {
            log.info("Hello 这是 info message. 信息");
            log.error("Hello 这是 error message. 报警");
            log.warn("Hello 这是 warn message. 警告");
            log.debug("Hello 这是 debug message. 调试");
            List<String> list = new ArrayList<>();
            System.out.println(list.get(2));
        } catch (Exception e) {
            log.error("testLog", e);
        }
        return "";
    }
}

logstash配置

在logstash.conf配置收集器,其实前面已经配置过了。更改了配置之后需要重启logstash:

# logstash.conf
input {
    tcp {
        mode => "server"
        host => "0.0.0.0"  #不限定输入的ip地址
        port => 4560  #监听的端口
        codec => json_lines #解析器
    }
}
output {
    elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "springboot-test-%{+YYYY.MM.dd}"
        user => elastic
        password => TsfYfefVTuDfwDg3IITK
        ssl_certificate_verification => false
    }
    stdout{codec => rubydebug}
}

查看日志

可以在logstash中看到接收到的日志。

接下来,使用账号elastic登陆kibana,页面地址为http://ip:5601

在索引管理能看到传入的数据。

ELK%20Stack(Elasticsearch%E3%80%81Kibana%E3%80%81Beats%20%E5%92%8C%20Logstash)%E6%90%AD%E5%BB%BA%208aaac5c86012479a8765cbad898592c3/Untitled%201.png

然后去到索引模式,创建索引模式

ELK%20Stack(Elasticsearch%E3%80%81Kibana%E3%80%81Beats%20%E5%92%8C%20Logstash)%E6%90%AD%E5%BB%BA%208aaac5c86012479a8765cbad898592c3/Untitled%202.png

索引名字同logstash.conf中output中的index。时间筛选字段名称处选择 @timestamp,方便我们后边按照时间段筛选数据

然后去到Discover下,选择对应的索引模式及日志时间,便能看到数据

ELK%20Stack(Elasticsearch%E3%80%81Kibana%E3%80%81Beats%20%E5%92%8C%20Logstash)%E6%90%AD%E5%BB%BA%208aaac5c86012479a8765cbad898592c3/Untitled%203.png

关于logstash的ssl配置可以参考:

Error unable to find valid certification path

Collect Logstash monitoring data using legacy collectors | Logstash Reference [7.14] | Elastic

从项目传入自定义索引

上面的配置只能用于一个项目,但是不可能一个ELK只有一个项目,所以为了让ELK接入多个项目,项目传入自定义的索引。

修改logback.xml文件,增加了标签:

<!-- logback.xml -->

<?xml version="1.0" encoding="UTF-8"?>
<configuration  scan="true" scanPeriod="60 seconds" debug="false">
    <include resource="org/springframework/boot/logging/logback/base.xml" />
    <contextName>logback</contextName>

    <appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
        <destination>127.0.0.1:4560</destination>
        <encoder class="net.logstash.logback.encoder.LogstashEncoder" >
        <customFields>{"appName": "spring-test2"}</customFields>
        </encoder>
    </appender>

    <!--输出到控制台-->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="logstash"/>
        <appender-ref ref="console" />
    </root>
</configuration>

修改logstash.conf

# logstash.conf
input {
    tcp {
        mode => "server"
        host => "0.0.0.0"
        port => 4560
        codec => json_lines
    }
}
output {
    elasticsearch {
        hosts => ["https://es01:9200"]
        index => "%{[appName]}-%{+YYYY.MM.dd}"
        user => elastic
        password => TsfYfefVTuDfwDg3IITK
        ssl_certificate_verification => false
    }
    stdout{codec => rubydebug}
}

重启logstash便能看到传入的索引。

使用Filebeats收集日志

使用filebeats收集应用的日志文件 *.log,并把日志发送到logstash。

Filebeats的docker安装

$ docker run -d \
  --name=filebeat \
  --user=root \
  --volume="$(pwd)/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml:ro" \
  --volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
  --volume="/var/run/docker.sock:/var/run/docker.sock:ro" \
  --volume="/var/log:/logs:ro" \
  docker.elastic.co/beats/filebeat:7.14.0 filebeat 
 

filebeat.docker.yml文件的配置:

# filebeat.docker.yml
filebeat.inputs:
- type: log  #日志类型
  enable: true
  paths: # 日志位置,所以在创建docker时需要挂载日志目录
    - /logs/info.log
  fields:                     # 使用 fields 模块添加字段
    address: 192.168.1.3     # address 为字段名称,意义为当前服务器的地址
  fields_under_root: true     # 将新增的字段放在顶级,收集后字段名称显示 host_ip。如果设置为 false,则放在子集,收集后显示为 fields.address
  encoding: UTF-8 #日志编码,如果中文乱码,可以试试GB2312

output.logstash:  # 指定输出插件
  hosts: ["192.168.1.0:4561"]   # logstash的位置
  index: 'test' # 索引名字

关于 output.logstashindex,该值会分配给 metadata.beat键,所以可以在logstash收集器的配置文件使用 %{[@metadata][beat]}获取该值。

配置logstash收集器的配置文件:

# logstash-filebeats.conf
input {
    beats {
        port => 4561
    }
}
output {
    elasticsearch {
        hosts => ["https://es01:9200"]
        index => "%{[@metadata][beat]}"  
        user => elastic
        password => TsfYfefVTuDfwDg3IITK
        ssl_certificate_verification => false
    }
    stdout{codec => rubydebug}
}

filebeats配置文件的详细使用可以参考:

filebeat.reference.yml | Filebeat Reference [7.15] | Elastic

logstash配置文件的使用可以参考:

Configure the Logstash output | Filebeat Reference [7.15] | Elastic

因为使用了两份配置文件,所以需要重新配置pipelines.yml:

# pipelines.yml
- pipeline.id: main
  path.config: "/usr/share/logstash/pipeline/logstash.conf"

- pipeline.id: beats
  path.config: "/usr/share/logstash/pipeline/logstash-filebeats.conf"

自定义收集的日志内容

因为默认收集到日志内容包括了时间、日志类型等众多信息,但我们往往只需要看到程序真正输出的内容,这时候可以使用logstash的grok插件对日志内容进行过滤。

grok插件

grok插件用于过滤切分文本。

基本语法为: %{SYNTAX:SEMANTIC}

SYNTAX为配置配置类型,可以用内置类型(可以在https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns找到),也可以自定义类型;

SEMANTIC为匹配文本所赋予的变量名,能在kibana所看到。

例子:

%{NUMBER:phone}

例如要匹配:

112.123.1.10 [info] - success

匹配方式为

%{IP:ip} \[%{LOGLEVEL:level}\] - %{WORD:text}

[]分别需要进行转义 \[\]。其实语法跟正则表达式差不多。

推荐使用https://grokdebug.herokuapp.com/进行调试。

所以此时pipeline配置为:

filter {
    grok {
        match => {"message"=>"%{IP:ip} \[%{LOGLEVEL:level}\] - %{WORD:text}"}
    }
}

grok插件也支持自定义类型

单独把自定义类型放在一个文件(文件名随便)进行管理,可以用正则表达式,也可以用内置类型

USERNAME [a-zA-Z0-9._-]+
TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}

在patterns_dir参数下指明自定义类型存放文件所在的目录

filter {
    grok {
				patterns_dir => "/usr/share/logstash/pipeline/patterns"
        match => {"message"=>"%{IP:ip} \[%{LOGLEVEL:level}\] - %{WORD:text}"}
				overwrite => ["message"]
    }
}

用 overwrite 参数来重写默认的 message 字段

匹配单行日志

知道了基本的grok语法后,应用了我们的项目中。例如项目正常的info日志为:

2021-08-11 10:58:10 - [INFO] [org.springframework.amqp.rabbit.connection.CachingConnectionFactory:] Attempting to connect to: [xxx]

所以对应的匹配方式为:

%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] %{WORD:message}

如果字符过长,过多使用 *,或者匹配类型 GREEDYDATADATA,可能会导致logstash超时。暂时没有解决办法,只能是尽量具体化匹配。

匹配多行日志

例如java堆栈。

例子:

2021-08-18 18:10:46 - [ERROR] [com.xx.framework.web.handle.ExceptionHandle:] unexpected error /user/detail: Required request body is missing: public com.xx.msg throws java.lang.Exception
org.springframework.http.converter.HttpMessageNotReadableException: Required request body is missing: public com.xx.msg throws java.lang.Exception
        at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.readWithMessageConverters(RequestResponseBodyMethodProcessor.java:161) ~[spring-webmvc-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.resolveArgument(RequestResponseBodyMethodProcessor.java:131) ~[spring-webmvc-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]

多行日志应该在filebeat中进行预处理,再发送到logstash。因为在logstash中处理多行会造成流混乱和数据损坏。

先在filebeat的配置文件中配置多行:

multiline.type: pattern
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'   # 匹配以日期开头
multiline.negate: true
multiline.match: after

这个配置的意思是将不符合 pattern条件的行追加到前面符合 pattern条件的行的后面去。

pattern为是匹配模式; negate为是否否认匹配模式; match为组合匹配的行的方式,取决于 negate。详细使用可见于:

Manage multiline messages | Filebeat Reference [7.14] | Elastic

接着配置logstash。由于从filebeat传入的日志中存在多行,所以需要添加 (?m),而且存在换行符,但是普通的 DATAGREEDYDATA分别为 .*?.*,不能匹配换行符,所以需要使用 (?<message>(.|\r|\n)*)来匹配换行符。

总的logstash配置为:

//logstash-filebeats.conf
input {
    beats {
        port => 4561
    }
}
filter {
    grok {
        match => {"message"=>"(?m)%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] (?<message>(.|\r|\n)*)"}
        overwrite => ["message"]
    }

}
output {
    elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "%{[@metadata][beat]}"
        user => elastic
        password => WjVINkg1zt2DJXAqjWdu
        ssl_certificate_verification => false
    }
    stdout{codec => rubydebug}
}

filebeat日志重复

因为filebeat传输保证的是”at least once”,而不是”exactly once”。因为filebeat只有收到logstash的ack了才会认为数据发送成功,否则就会重传。

为了避免重复,有三种办法在filebeat中定义document的唯一id。具体办法可以看官方文档:

Deduplicate data | Filebeat Reference [7.15] | Elastic

翻译版:

掘金

这里选择在filebeats配置文件中添加add_id处理器,为每个字段生成唯一的id。

processors:
  - add_id: ~

注:如果程序写日志时把同一段日志写进了两个日志文件(例如把error日志同时写到了error文件和info文件),filebeat会为其分别添加不同的id,这也会造成重复数据。

filebeat收集多个日志文件并添加到不同的索引

在filebeat.yml中配置两个日志文件的地址,并在 fields字段下添加字段 addresscategory标识不同的日志文件, fields下的字段可以在logstash.conf中直接使用,例如: if [category]{}"%{[category]}-%{[address]}"。不在filebeat配置index,而是在logstash进行处理。

# filebeat.yml
fields:                     
    address: 192.168.1.0    
    category: test

在logstash接受这两个字段,并将其配置为index:

# logstash.conf
output {
  elasticsearch { 
      hosts => ["https://es01:9200"]
      index => "%{[category]}-%{[address]}"
      user => elastic
      password => WjVINkg1zt2DJXAqjWdu
      ssl_certificate_verification => false
      document_id => "%{[@metadata][_id]}"
  }

所以在kibana能看到index

将日志产生的时间替换@timestamp

在logstash.conf中将从日志中匹配的timestamp字段代替@timestamp字段

# logstash.conf
filter {
    grok {
        match => {"message"=>"(?m)%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] (?<message>(.|\r|\n)*)"}
        overwrite => ["message"]
    }
    date {
				# 匹配上文过滤出来的timestamp字段,timestamp格式为yyyy-MM-dd HH:mm:ss,这里指的是日志文件中的日志格式
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
				# 将上面匹配的timestamp的值存储到@timestamp
        target => "@timestamp"
				# 设置时间格式化的时区
        timezone => "Asia/Shanghai"
    }

    mutate {
			# 去除timestamp键
      remove_field => [ "timestamp" ]
    }

}

但是这个方法有问题,会导致logstash和elasticsearch中的看到的日志时间不对,暂时没找到解决的办法。

参考

第四和第五问:

Elasticsearch 100问(1-30)

关于date:

Date filter plugin | Logstash Reference [7.x] | Elastic

关于event.set:

Event API | Logstash Reference [7.x] | Elastic

踩坑:

ELK开发日记(2) - 超坑爹的Filebeat 7.2.0时区漂移 (UTC+16) 解决方案

容器目录下的日志time字段总是比正常时间晚8小时 · Issue #209 · rootsongjc/kubernetes-handbook

ELK中Kibana组件的时间误差

配置文件

所以目前最终的配置文件如下;

filebeat.yml

filebeat.inputs:
- type: log
  enable: true
  paths: 
    - /logs/test.log
  fields:                     # 使用 fields 模块添加字段
    address: 192.168.1.3     # address 为字段名称
    category: test
  fields_under_root: true     # 将新增的字段放在顶级,收集后字段名称显示 address。如果设置为 false,则放在子集,收集后显示为 fields.address
  encoding: UTF-8 
  multiline.type: pattern
  multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
  multiline.negate: true
  multiline.match: after

processors:
  - add_id: ~

output.logstash:
  enable: true
  hosts: ["192.168.1.0:4561"]

logstash.conf

input {
    beats {
        port => 4561
    }
}
filter {
    grok {
        match => {"message"=>"(?m)%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] (?<message>(.|\r|\n)*)"}
        overwrite => ["message"]
    }
    date {
        match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
        target => "@timestamp"
        timezone => "Asia/Shanghai"
    }

    mutate {
      remove_field => [ "timestamp" ]
    }

}
output {
  if [@metadata][_id] {
    elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "%{[category]}-%{[address]}"
        user => elastic
        password => WjVINkg1zt2DJXAqjWdu
        ssl_certificate_verification => false
        document_id => "%{[@metadata][_id]}"
    }
  }else {
    elasticsearch { 
        hosts => ["https://es01:9200"]
        index => "%{[@metadata][beat]}"
        user => elastic
        password => WjVINkg1zt2DJXAqjWdu
        ssl_certificate_verification => false
    }

  }
    stdout{codec => rubydebug}
}

发送告警日志到钉

Untitled

创建一个连接器

方法选择POST,URL填入钉钉的webhook地址,添加标头Content-Type:application/json。

Untitled

钉钉的webhook的申请方式:

首先新建一个群聊,如果需要新建一个人的群聊,可以在电脑端,发起群聊,选择同学群。

然后添加群聊机器人,选择自定义即可。

Untitled

这里选择了自定义关键词elastic。然后把webhook地址贴到elastic即可。

测试

在正文填入:

{"msgtype": "text","text": {"content":"elastic发来信息"}}

Untitled

点击测试

钉钉便能收到信息。

Untitled

发送的正文内容可以看钉钉的文档

自定义机器人接入 - 钉钉开放平台

正文参数可以参考elastic的文档

Create and manage rules | Kibana Guide [master] | Elastic

Untitled

参考

Running the Elastic Stack on Docker | Getting Started [7.15] | Elastic

Spring Boot 使用 Log4j2 & Logback 输出日志到 EKL_哎_小羊的博客-CSDN博客

Logstash 最佳实践

Grok filter plugin | Logstash Reference [7.15] | Elastic

ELK系列(七)、Filebeat+Logstash采集多个日志文件并写入不同的ES索引中_王义凯 的博客-CSDN博客