ELK Stack(Elasticsearch、Kibana、Beats 和 Logstash)搭建日志收集系统
文章目录
Elasticsearch用于搜索、分析数据。
Kibana用于展示数据。
Beats用于收集数据。
Logstash用于集中、转换和存储数据。
日志的主要处理流程为下图:
使用Docker搭建ELK
新建配置文件
在elastic/config目录下,创建ElasticSearch的配置文件:
# es01.yml
cluster.name: "docker-cluster"
network.host: 0.0.0.0
同目录下创建kibnana的配置文件:
# kib01.yml
server.host: "0"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true
xpack.encryptedSavedObjects.encryptionKey: b014b39f62fd1d9a0b1dac766c0e51f5
xpack.reporting.encryptionKey: 9e069cbc6b68796799f96f057ce6c5f5
xpack.security.encryptionKey: 1e0cf9eb23cbfd00d8ba113cb5327bb5
在elastic/config/logstash目录下,创建logstash的配置文件:
# logstash.yml
http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "https://es01:9200" ]
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: TsfYfefVTuDfwDg3IITK
xpack.monitoring.elasticsearch.ssl.certificate_authority: /usr/share/elasticsearch/config/certificates/ca/ca.crt
在同目录下,创建logstash管理pipeline的配置文件:
# pipelines.yml
# This file is where you define your pipelines. You can define multiple.
# # For more information on multiple pipelines, see the documentation:
# # https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
#
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline"
在elastic/config/logstash/pipeline目录下,创建logstash收集器配置文件:
# logstash.conf
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
}
}
output {
elasticsearch {
hosts => ["https://es01:9200"]
index => "springboot-test-%{+YYYY.MM.dd}"
user => elastic
password => TsfYfefVTuDfwDg3IITK
ssl_certificate_verification => false #关闭ssl
}
stdout{codec => rubydebug}
}
准备创建docker-compose文件
在elastic/config/elasticsearch/certificaes目录下创建instances.yml文件:
# instances.yml
instances:
- name: es01
dns:
- es01
- localhost
ip:
- 127.0.0.1
- name: 'kib01'
dns:
- kib01
- localhost
- name: 'logstash'
dns:
- logstash
- localhost
ip:
- 127.0.0.1
在elastic目录下,创建认证的文件:
# create-certs.yml
version: '2.2'
services:
create_certs:
image: docker.elastic.co/elasticsearch/elasticsearch:${VERSION}
container_name: create_certs
command: >
bash -c '
yum install -y -q -e 0 unzip;
if [[ ! -f /certs/bundle.zip ]]; then
bin/elasticsearch-certutil cert --silent --pem --in config/certificates/instances.yml -out /certs/bundle.zip;
unzip /certs/bundle.zip -d /certs;
fi;
chown -R 1000:0 /certs
'
working_dir: /usr/share/elasticsearch
volumes:
- ./certs:/certs
- ./config/elasticsearch/certificates:/usr/share/elasticsearch/config/certificates
networks:
- elastic
volumes:
certs:
driver: local
networks:
elastic:
driver: bridge
在同目录下,创建环境文件,方便统一管理docker版本:
# .env
COMPOSE_PROJECT_NAME=es
CERTS_DIR=/usr/share/elasticsearch/config/certificates # 这个为docker容器里面的位置
VERSION=7.13.4
同目录下,创建docker-compose文件:
# elastic-docker-tls.yml
version: '2.2'
services:
es01:
image: docker.elastic.co/elasticsearch/elasticsearch:${VERSION}
container_name: es01
environment:
- cluster.name=es-docker
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- xpack.license.self_generated.type=trial
- xpack.security.enabled=true
- xpack.security.http.ssl.enabled=true
- xpack.security.http.ssl.key=$CERTS_DIR/es01/es01.key
- xpack.security.http.ssl.certificate_authorities=$CERTS_DIR/ca/ca.crt
- xpack.security.http.ssl.certificate=$CERTS_DIR/es01/es01.crt
- xpack.security.transport.ssl.enabled=true
- xpack.security.transport.ssl.verification_mode=certificate
- xpack.security.transport.ssl.certificate_authorities=$CERTS_DIR/ca/ca.crt
- xpack.security.transport.ssl.certificate=$CERTS_DIR/es01/es01.crt
- xpack.security.transport.ssl.key=$CERTS_DIR/es01/es01.key
- TZ=Asia/Shanghai
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data01:/usr/share/elasticsearch/data
- ./certs:$CERTS_DIR
- ./config/es01.yml:/usr/share/elasticsearch/config/elasticsearch.yml
ports:
- 9200:9200
networks:
- elastic
healthcheck:
test: curl --cacert $CERTS_DIR/ca/ca.crt -s https://localhost:9200 >/dev/null; if [[ $$? == 52 ]]; then echo 0; else echo 1; fi
interval: 30s
timeout: 10s
retries: 5
kib01:
image: docker.elastic.co/kibana/kibana:${VERSION}
container_name: kib01
depends_on: {"es01": {"condition": "service_healthy"}}
ports:
- 5601:5601
environment:
SERVERNAME: localhost
ELASTICSEARCH_URL: https://es01:9200
ELASTICSEARCH_HOSTS: https://es01:9200
ELASTICSEARCH_USERNAME: kibana_system
ELASTICSEARCH_PASSWORD: 1Y51Fvgcq9Tv5oG1MWSr
ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES: $CERTS_DIR/ca/ca.crt
SERVER_SSL_ENABLED: "true"
SERVER_SSL_KEY: $CERTS_DIR/kib01/kib01.key
SERVER_SSL_CERTIFICATE: $CERTS_DIR/kib01/kib01.crt
I18N_LOCALE: zh-CN
TZ: Asia/Shanghai
volumes:
- ./certs:$CERTS_DIR
- ./config/kib01.yml:/usr/share/kibana/config/kibana.yml
networks:
- elastic
logstash:
image: docker.elastic.co/logstash/logstash:${VERSION}
container_name: logstash
depends_on: {"es01": {"condition": "service_healthy"}}
volumes:
- ./config/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml
- ./config/logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml
- ./config/logstash/pipeline:/usr/share/logstash/pipeline
- ./certs:$CERTS_DIR
ports:
- 4560:4560
- 4561:4561
environment:
SERVERNAME: localhost
ELASTICSEARCH_URL: https://es01:9200
ELASTICSEARCH_HOSTS: https://es01:9200
ELASTICSEARCH_USERNAME: logstash_system
ELASTICSEARCH_PASSWORD: psvTdxGicq3R0evjCLHv
ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES: $CERTS_DIR/ca/ca.crt
SERVER_SSL_ENABLED: "true"
SERVER_SSL_KEY: $CERTS_DIR/logstash/logstash.key
SERVER_SSL_CERTIFICATE: $CERTS_DIR/logstash/logstash.crt
I18N_LOCALE: zh-CN
TZ: Asia/Shanghai
networks:
- elastic
volumes:
data01:
driver: local
certs:
driver: local
networks:
elastic:
driver: bridge
建议从官方拉取docker文件,但是官方网址比较慢,也可以改为从其他镜像仓库下载,例如把 [docker.elastic.co/kibana/kibana:${VERSION}](http://docker.elastic.co/kibana/kibana:${VERSION})
官方地址前缀去掉,变成 kibana:${VERSION}
,其他images也类似。
配置认证并启动ELK
首先创建认证:
$ docker-compose -f create-certs.yml run --rm create_certs
启动所有服务:
$ docker-compose -f elastic-docker-tls.yml up -d
生成用户密码:
$ docker exec es01 /bin/bash -c "bin/elasticsearch-setup-passwords \
auto --batch --url https://es01:9200"
将生成的密码记录下来,将kibana_system的密码复制到elastic-docker-tls.yml文件中的kib01的ELASTICSEARCH_PASSWORD,将logstash_system的密码复制到elastic-docker-tls.yml文件中的logstash的ELASTICSEARCH_PASSWORD。
停止docker-compose:
$ docker-compose -f elastic-docker-tls.yml stop
再启动docker-compose即可。
启动成功后,用命令 docker network inspect es_elastic
能看到应用都连接到了同一个网络。
检查logstash是否正常启动,通过命令 docker logs logstash
查看日志是否有error。
Spring集成ELK
将Spring产生的日志发送到logstash,logstash对数据进行过滤后,再发送到elasticsearch,再通过kibnana查看数据。
添加依赖
Spring使用的是logback,原本是想使用log4j,但没能集成成功。
添加logstash依赖(核心),lombok和spring-boot-starter-web:
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>6.6</version>
</dependency>
集成logstash的详细使用可以参考:
GitHub - logstash/logstash-logback-encoder: Logback JSON encoder and appenders
项目配置
配置日志配置文件logback.xml,配置文件放在src/main/resource下:
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<include resource="org/springframework/boot/logging/logback/base.xml" />
<contextName>logback</contextName>
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:4560</destination>
<encoder class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<!--输出到控制台-->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="logstash"/>
<appender-ref ref="console" />
</root>
</configuration>
<destination>127.0.0.1:4560</destination>
logstash ip和暴露的端口,logback就是通过这个地址把日志发送给logstash。
并在application.properties文件配置日志输出级别和日志配置文件位置:
# application.properties
logging.level.root=info
logging.config=classpath:logback.xml
打印日志的代码:
// TestController.java
@RestController
@Slf4j
public class TestController {
@GetMapping(value = "/log4j2")
public String testLog(){
try {
log.info("Hello 这是 info message. 信息");
log.error("Hello 这是 error message. 报警");
log.warn("Hello 这是 warn message. 警告");
log.debug("Hello 这是 debug message. 调试");
List<String> list = new ArrayList<>();
System.out.println(list.get(2));
} catch (Exception e) {
log.error("testLog", e);
}
return "";
}
}
logstash配置
在logstash.conf配置收集器,其实前面已经配置过了。更改了配置之后需要重启logstash:
# logstash.conf
input {
tcp {
mode => "server"
host => "0.0.0.0" #不限定输入的ip地址
port => 4560 #监听的端口
codec => json_lines #解析器
}
}
output {
elasticsearch {
hosts => ["https://es01:9200"]
index => "springboot-test-%{+YYYY.MM.dd}"
user => elastic
password => TsfYfefVTuDfwDg3IITK
ssl_certificate_verification => false
}
stdout{codec => rubydebug}
}
查看日志
可以在logstash中看到接收到的日志。
接下来,使用账号elastic登陆kibana,页面地址为http://ip:5601
在索引管理能看到传入的数据。
然后去到索引模式,创建索引模式
索引名字同logstash.conf中output中的index。时间筛选字段名称处选择 @timestamp,方便我们后边按照时间段筛选数据
然后去到Discover下,选择对应的索引模式及日志时间,便能看到数据
关于logstash的ssl配置可以参考:
Error unable to find valid certification path
Collect Logstash monitoring data using legacy collectors | Logstash Reference [7.14] | Elastic
从项目传入自定义索引
上面的配置只能用于一个项目,但是不可能一个ELK只有一个项目,所以为了让ELK接入多个项目,项目传入自定义的索引。
修改logback.xml文件,增加了
<!-- logback.xml -->
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<include resource="org/springframework/boot/logging/logback/base.xml" />
<contextName>logback</contextName>
<appender name="logstash" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<destination>127.0.0.1:4560</destination>
<encoder class="net.logstash.logback.encoder.LogstashEncoder" >
<customFields>{"appName": "spring-test2"}</customFields>
</encoder>
</appender>
<!--输出到控制台-->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<root level="info">
<appender-ref ref="logstash"/>
<appender-ref ref="console" />
</root>
</configuration>
修改logstash.conf
# logstash.conf
input {
tcp {
mode => "server"
host => "0.0.0.0"
port => 4560
codec => json_lines
}
}
output {
elasticsearch {
hosts => ["https://es01:9200"]
index => "%{[appName]}-%{+YYYY.MM.dd}"
user => elastic
password => TsfYfefVTuDfwDg3IITK
ssl_certificate_verification => false
}
stdout{codec => rubydebug}
}
重启logstash便能看到传入的索引。
使用Filebeats收集日志
使用filebeats收集应用的日志文件 *.log
,并把日志发送到logstash。
Filebeats的docker安装
$ docker run -d \
--name=filebeat \
--user=root \
--volume="$(pwd)/filebeat.docker.yml:/usr/share/filebeat/filebeat.yml:ro" \
--volume="/var/lib/docker/containers:/var/lib/docker/containers:ro" \
--volume="/var/run/docker.sock:/var/run/docker.sock:ro" \
--volume="/var/log:/logs:ro" \
docker.elastic.co/beats/filebeat:7.14.0 filebeat
filebeat.docker.yml文件的配置:
# filebeat.docker.yml
filebeat.inputs:
- type: log #日志类型
enable: true
paths: # 日志位置,所以在创建docker时需要挂载日志目录
- /logs/info.log
fields: # 使用 fields 模块添加字段
address: 192.168.1.3 # address 为字段名称,意义为当前服务器的地址
fields_under_root: true # 将新增的字段放在顶级,收集后字段名称显示 host_ip。如果设置为 false,则放在子集,收集后显示为 fields.address
encoding: UTF-8 #日志编码,如果中文乱码,可以试试GB2312
output.logstash: # 指定输出插件
hosts: ["192.168.1.0:4561"] # logstash的位置
index: 'test' # 索引名字
关于 output.logstash
的 index
,该值会分配给 metadata.beat
键,所以可以在logstash收集器的配置文件使用 %{[@metadata][beat]}
获取该值。
配置logstash收集器的配置文件:
# logstash-filebeats.conf
input {
beats {
port => 4561
}
}
output {
elasticsearch {
hosts => ["https://es01:9200"]
index => "%{[@metadata][beat]}"
user => elastic
password => TsfYfefVTuDfwDg3IITK
ssl_certificate_verification => false
}
stdout{codec => rubydebug}
}
filebeats配置文件的详细使用可以参考:
filebeat.reference.yml | Filebeat Reference [7.15] | Elastic
logstash配置文件的使用可以参考:
Configure the Logstash output | Filebeat Reference [7.15] | Elastic
因为使用了两份配置文件,所以需要重新配置pipelines.yml:
# pipelines.yml
- pipeline.id: main
path.config: "/usr/share/logstash/pipeline/logstash.conf"
- pipeline.id: beats
path.config: "/usr/share/logstash/pipeline/logstash-filebeats.conf"
自定义收集的日志内容
因为默认收集到日志内容包括了时间、日志类型等众多信息,但我们往往只需要看到程序真正输出的内容,这时候可以使用logstash的grok插件对日志内容进行过滤。
grok插件
grok插件用于过滤切分文本。
基本语法为: %{SYNTAX:SEMANTIC}
SYNTAX
为配置配置类型,可以用内置类型(可以在https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns找到),也可以自定义类型;
SEMANTIC
为匹配文本所赋予的变量名,能在kibana所看到。
例子:
%{NUMBER:phone}
例如要匹配:
112.123.1.10 [info] - success
匹配方式为
%{IP:ip} \[%{LOGLEVEL:level}\] - %{WORD:text}
[
和 ]
分别需要进行转义 \[
和 \]
。其实语法跟正则表达式差不多。
推荐使用https://grokdebug.herokuapp.com/进行调试。
所以此时pipeline配置为:
filter {
grok {
match => {"message"=>"%{IP:ip} \[%{LOGLEVEL:level}\] - %{WORD:text}"}
}
}
grok插件也支持自定义类型
单独把自定义类型放在一个文件(文件名随便)进行管理,可以用正则表达式,也可以用内置类型
USERNAME [a-zA-Z0-9._-]+
TIMESTAMP %{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{TIME}
在patterns_dir参数下指明自定义类型存放文件所在的目录
filter {
grok {
patterns_dir => "/usr/share/logstash/pipeline/patterns"
match => {"message"=>"%{IP:ip} \[%{LOGLEVEL:level}\] - %{WORD:text}"}
overwrite => ["message"]
}
}
用 overwrite 参数来重写默认的 message 字段
匹配单行日志
知道了基本的grok语法后,应用了我们的项目中。例如项目正常的info日志为:
2021-08-11 10:58:10 - [INFO] [org.springframework.amqp.rabbit.connection.CachingConnectionFactory:] Attempting to connect to: [xxx]
所以对应的匹配方式为:
%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] %{WORD:message}
如果字符过长,过多使用 *
,或者匹配类型 GREEDYDATA
和 DATA
,可能会导致logstash超时。暂时没有解决办法,只能是尽量具体化匹配。
匹配多行日志
例如java堆栈。
例子:
2021-08-18 18:10:46 - [ERROR] [com.xx.framework.web.handle.ExceptionHandle:] unexpected error /user/detail: Required request body is missing: public com.xx.msg throws java.lang.Exception
org.springframework.http.converter.HttpMessageNotReadableException: Required request body is missing: public com.xx.msg throws java.lang.Exception
at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.readWithMessageConverters(RequestResponseBodyMethodProcessor.java:161) ~[spring-webmvc-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at org.springframework.web.servlet.mvc.method.annotation.RequestResponseBodyMethodProcessor.resolveArgument(RequestResponseBodyMethodProcessor.java:131) ~[spring-webmvc-5.2.3.RELEASE.jar!/:5.2.3.RELEASE]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
多行日志应该在filebeat中进行预处理,再发送到logstash。因为在logstash中处理多行会造成流混乱和数据损坏。
先在filebeat的配置文件中配置多行:
multiline.type: pattern
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}' # 匹配以日期开头
multiline.negate: true
multiline.match: after
这个配置的意思是将不符合 pattern
条件的行追加到前面符合 pattern
条件的行的后面去。
pattern
为是匹配模式; negate
为是否否认匹配模式; match
为组合匹配的行的方式,取决于 negate
。详细使用可见于:
Manage multiline messages | Filebeat Reference [7.14] | Elastic
接着配置logstash。由于从filebeat传入的日志中存在多行,所以需要添加 (?m)
,而且存在换行符,但是普通的 DATA
和 GREEDYDATA
分别为 .*?
和 .*
,不能匹配换行符,所以需要使用 (?<message>(.|\r|\n)*)
来匹配换行符。
总的logstash配置为:
//logstash-filebeats.conf
input {
beats {
port => 4561
}
}
filter {
grok {
match => {"message"=>"(?m)%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] (?<message>(.|\r|\n)*)"}
overwrite => ["message"]
}
}
output {
elasticsearch {
hosts => ["https://es01:9200"]
index => "%{[@metadata][beat]}"
user => elastic
password => WjVINkg1zt2DJXAqjWdu
ssl_certificate_verification => false
}
stdout{codec => rubydebug}
}
filebeat日志重复
因为filebeat传输保证的是”at least once”,而不是”exactly once”。因为filebeat只有收到logstash的ack了才会认为数据发送成功,否则就会重传。
为了避免重复,有三种办法在filebeat中定义document的唯一id。具体办法可以看官方文档:
Deduplicate data | Filebeat Reference [7.15] | Elastic
翻译版:
这里选择在filebeats配置文件中添加add_id处理器,为每个字段生成唯一的id。
processors:
- add_id: ~
注:如果程序写日志时把同一段日志写进了两个日志文件(例如把error日志同时写到了error文件和info文件),filebeat会为其分别添加不同的id,这也会造成重复数据。
filebeat收集多个日志文件并添加到不同的索引
在filebeat.yml中配置两个日志文件的地址,并在 fields
字段下添加字段 address
和 category
标识不同的日志文件, fields
下的字段可以在logstash.conf中直接使用,例如: if [category]{}
和 "%{[category]}-%{[address]}"
。不在filebeat配置index,而是在logstash进行处理。
# filebeat.yml
fields:
address: 192.168.1.0
category: test
在logstash接受这两个字段,并将其配置为index:
# logstash.conf
output {
elasticsearch {
hosts => ["https://es01:9200"]
index => "%{[category]}-%{[address]}"
user => elastic
password => WjVINkg1zt2DJXAqjWdu
ssl_certificate_verification => false
document_id => "%{[@metadata][_id]}"
}
所以在kibana能看到index
将日志产生的时间替换@timestamp
在logstash.conf中将从日志中匹配的timestamp字段代替@timestamp字段
# logstash.conf
filter {
grok {
match => {"message"=>"(?m)%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] (?<message>(.|\r|\n)*)"}
overwrite => ["message"]
}
date {
# 匹配上文过滤出来的timestamp字段,timestamp格式为yyyy-MM-dd HH:mm:ss,这里指的是日志文件中的日志格式
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
# 将上面匹配的timestamp的值存储到@timestamp
target => "@timestamp"
# 设置时间格式化的时区
timezone => "Asia/Shanghai"
}
mutate {
# 去除timestamp键
remove_field => [ "timestamp" ]
}
}
但是这个方法有问题,会导致logstash和elasticsearch中的看到的日志时间不对,暂时没找到解决的办法。
参考
第四和第五问:
关于date:
Date filter plugin | Logstash Reference [7.x] | Elastic
关于event.set:
Event API | Logstash Reference [7.x] | Elastic
踩坑:
ELK开发日记(2) - 超坑爹的Filebeat 7.2.0时区漂移 (UTC+16) 解决方案
容器目录下的日志time字段总是比正常时间晚8小时 · Issue #209 · rootsongjc/kubernetes-handbook
配置文件
所以目前最终的配置文件如下;
filebeat.yml
filebeat.inputs:
- type: log
enable: true
paths:
- /logs/test.log
fields: # 使用 fields 模块添加字段
address: 192.168.1.3 # address 为字段名称
category: test
fields_under_root: true # 将新增的字段放在顶级,收集后字段名称显示 address。如果设置为 false,则放在子集,收集后显示为 fields.address
encoding: UTF-8
multiline.type: pattern
multiline.pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
multiline.negate: true
multiline.match: after
processors:
- add_id: ~
output.logstash:
enable: true
hosts: ["192.168.1.0:4561"]
logstash.conf
input {
beats {
port => 4561
}
}
filter {
grok {
match => {"message"=>"(?m)%{TIMESTAMP_ISO8601:timestamp} - \[%{LOGLEVEL:level}\] \[%{JAVACLASS:position}\:\] (?<message>(.|\r|\n)*)"}
overwrite => ["message"]
}
date {
match => ["timestamp", "yyyy-MM-dd HH:mm:ss"]
target => "@timestamp"
timezone => "Asia/Shanghai"
}
mutate {
remove_field => [ "timestamp" ]
}
}
output {
if [@metadata][_id] {
elasticsearch {
hosts => ["https://es01:9200"]
index => "%{[category]}-%{[address]}"
user => elastic
password => WjVINkg1zt2DJXAqjWdu
ssl_certificate_verification => false
document_id => "%{[@metadata][_id]}"
}
}else {
elasticsearch {
hosts => ["https://es01:9200"]
index => "%{[@metadata][beat]}"
user => elastic
password => WjVINkg1zt2DJXAqjWdu
ssl_certificate_verification => false
}
}
stdout{codec => rubydebug}
}
发送告警日志到钉
创建一个连接器
方法选择POST,URL填入钉钉的webhook地址,添加标头Content-Type:application/json。
钉钉的webhook的申请方式:
首先新建一个群聊,如果需要新建一个人的群聊,可以在电脑端,发起群聊,选择同学群。
然后添加群聊机器人,选择自定义即可。
这里选择了自定义关键词elastic。然后把webhook地址贴到elastic即可。
测试
在正文填入:
{"msgtype": "text","text": {"content":"elastic发来信息"}}
点击测试
钉钉便能收到信息。
发送的正文内容可以看钉钉的文档
正文参数可以参考elastic的文档
Create and manage rules | Kibana Guide [master] | Elastic
参考
Running the Elastic Stack on Docker | Getting Started [7.15] | Elastic
Spring Boot 使用 Log4j2 & Logback 输出日志到 EKL_哎_小羊的博客-CSDN博客
Grok filter plugin | Logstash Reference [7.15] | Elastic
ELK系列(七)、Filebeat+Logstash采集多个日志文件并写入不同的ES索引中_王义凯 的博客-CSDN博客
文章作者 梧桐碎梦
上次更新 2021-10-11 23:59:00