ELK的安装部署与使用

您所在的位置:网站首页 elk中文手册 ELK的安装部署与使用

ELK的安装部署与使用

#ELK的安装部署与使用| 来源: 网络整理| 查看: 265

ELK的安装与使用 安装部署

部署环境:Elasticsearch-7.17.3 Logstash-7.17.3 Kibana-7.17.3

一、安装部署Elasticsearch 解压目录,进入conf目录下编辑elasticsearch.yml文件,输入以下内容并保存 network.host: 127.0.0.1 http.port: 9200 # 跨域配置 http.cors.enabled: true http.cors.allow-origin: "*"

3.进入bin目录,点击elasticsearch.bat文件启动项目

二、安装部署Logstash 文件配置

在conf目录下新建logstash.conf

input { stdin { } jdbc { # 配置数据库信息 jdbc_connection_string => "jdbc:mysql://localhost:3306/student?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_user => "root" jdbc_password => "root" jdbc_paging_enabled => "true" jdbc_page_size => "50000" jdbc_default_timezone => "Asia/Shanghai" # mysql驱动所在位置 jdbc_driver_library => "D:\ELK\mysql-connector-java-8.0.11.jar" #sql执行语句 statement => "SELECT * FROM user" # 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" # 是否将 sql 中 column 名称转小写 lowercase_column_names => false } } output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "goods" # 文档_id,%{goods_id}意思是取查询出来的goods_id的值,并将其映射到shop的_id字段中 # 文档_id,%{goodsId}如果是别名,意思是取查询出来的goodsId的值,并将其映射到shop的_id字段中 document_id => "%{goodsId}" } stdout { codec => json_lines } } 遇到问题与解决

①JAVA_HOME环境找不到,不推荐使用JAVA_HOME变量名

系统找不到指定的路径。could not find java; set JAVA_HOME or ensurejava is in PATH D: ELK logstash-?.17.3 bin>

解决方法:

新增JDK环境变量:命名为 LS_JAVA_HOME

将jdk路径复制进去,注意!!!JDK版本必须1.8以上

②报以下错误

在这里插入图片描述

解决办法:

在bin目录下新建文件:logstash.conf

设置logstash.conf的值,或指定路径下的conf文件

input { stdin{ } } output { stdout{ } }

启动logstash,进入bin目录下的cmd,输入以下命令

logstash -f logstash.conf 安装部署Kibana

进入config目录将kibana.yml文件修改

server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://localhost:9200"] kibana.index: ".kibana" SpringBoot集成Logstash收集日志 1、导入依赖

在pom.xml文件导入以下依赖

net.logstash.logback logstash-logback-encoder 5.3 2、配置文件

新建"logback-spring.xml"文件,并编写内容如下:

localhost:4560 3、配置application.yml文件

进入application.yml文件配置日志

logging: level: root: info config: classpath:logback-spring.xml 4、配置logstash下的logstash.conf文件 input{ tcp{ mode => "server" host => "127.0.0.1" port => 4560 codec => json_lines } } output{ elasticsearch{ action => "index" hosts => "localhost:9200" index => "springboot_log-%{+YYYY.MM.dd}" } }

以上配置完启动可运行成功

5、配置Logstash日志保存到数据库

Linux配置这里就不提了,网上很多。这里讲的是windows系统下的配置,并以MySQL数据库为例。

使用的是jdbc,由于Logstash默认不带jdbc,所以需要下载logstash-output-jdbc,在线下载方式:进入logstash的bin目录下输入以下命令:

logstash-plugin install --no-verify logstash-output-jdbc

若安装失败,则去百度找一个压缩包下载(小编是在百度网盘上面下载的)

下一步是采用不联网的安装方式:进入logstash的bin目录下输入以下命令:

logstash-plugin install file://文件路径+压缩包全名,以'/'分隔

若还是安装失败,则需要更改配置文件

在logstash根目录下的Gemfile文件修改以下配置

source "https://gems.ruby-china.com"

重新进入logstash的bin目录下输入以下命令

logstash-plugin install file://文件路径+压缩包全名,以'/'分隔

网上下载mysql驱动,譬如mysql-connector-java-8.0.11.jar

下载后在logstash下的vendor目录下新建jar目录,再进入jar目录新建jdbc目录

将mysql驱动包放到jdbc目录下,全局路径如下:logstash-7.17.3目录为安装模块

logstash-7.17.3\vendor\jar\jdbc\mysql-connector-java-8.0.11.jar

譬如springboot下的日志内容为

log.info(“|”+“version内容”+“|”+“taskId内容”+“|”+“taskType内容”+“|”+“data内容”+“|”+“time内容”)

重新进入logstash.conf文件,编辑以下内容

input{ tcp{ mode => "server" host => "127.0.0.1" port => 4560 codec => json_lines } } filter{ mutate{ split => ["message","|"] } if[message][2]{ mutate{ add_field =>{ "version" => "%{[message][2]}" } } } if[message][3]{ mutate{ add_field =>{ "taskId" => "%{[message][3]}" } } } if[message][4]{ mutate{ add_field =>{ "taskType" => "%{[message][4]}" } } } if[message][5]{ mutate{ add_field =>{ "data" => "%{[message][5]}" } } } if[message][6]{ mutate{ add_field =>{ "time" => "%{[message][6]}" } } } mutate{ convert => { "version" => "string" "taskId" => "string" "taskType" => "string" "data" => "string" "time" => "string" } } } output{ driver_class => "com.mysql.cj.jdbc.Driver" connection_string => "jdbc:mysql://127.0.0.1:3306/student?user=用户名&password=密码&serverTimezone=GMT%2B8&characterEncoding=utf8&useSSL=false&autoReconnect=true" statement => ["insert into 表名 (version,taskId,taskType,data,time) VALUES (?,?,?,?,?)","version","taskId","taskType","data","time"] } 6、区分业务日志和错误日志存储

add_tag 属性,在input添加,用于区分输出

具体内容就不详细介绍了,看下面小编写的内容

input { tcp { mode => "server" host => "127.0.0.1" port => 4560 codec => json_lines } } filter{ mutate{ split => ["message","|"] } if[message][1] == "business_log"{ mutate{ add_tag =>["business_log"] } } if[message][1]== "error_log"{ mutate{ add_tag => ["error_log"] } } if[message][2]=="ElasticSearch"{ mutate{ add_tag => ["ElasticSearch"] } } if[message][2]=="MYSQL"{ mutate{ add_tag => ["MySQL"] } } if[message][2]=="PgSQL"{ mutate{ add_tag => ["PgSQL"] } } if[message][2]=="ElasticSearchMySQL"{ mutate{ add_tag => ["ElasticSearchAndMySQL"] } } if[message][2]=="ElasticSearchPgSQL"{ mutate{ add_tag => ["ElasticSearchAndPgSQL"] } } if [level] == "INFO"{ if[message][3]{ mutate{ add_field =>{ "version" => "%{[message][3]}" } } } if[message][4]{ mutate{ add_field =>{ "taskId" => "%{[message][4]}" } } } if[message][5]{ mutate{ add_field =>{ "taskType" => "%{[message][5]}" } } } if[message][6]{ mutate{ add_field =>{ "data" => "%{[message][6]}" } } } if[message][7]{ mutate{ add_field =>{ "time" => "%{[message][7]}" } } } mutate{ convert => { "version" => "string" "taskId" => "string" "taskType" => "string" "data" => "string" "time" => "string" } } } } output { if [level] == "INFO"{ if "business_log" in [tags]{ if "ElasticSearch" in [tags]{ elasticsearch { action => "index" hosts => "localhost:9200" index => "business_log-%{+YYYY.MM.dd}" } } else if "MySQL" in [tags]{ jdbc{ driver_class => "com.mysql.cj.jdbc.Driver" connection_string => "jdbc:mysql://127.0.0.1:3306/student?user=root&password=root&serverTimezone=GMT%2B8&characterEncoding=utf8&useSSL=false&autoReconnect=true" statement => ["insert into business_log (version,taskId,taskType,data,time) VALUES (?,?,?,?,?)","version","taskId","taskType","data","time"] } } } else if "error_log" in [tags]{ if "ElasticSearch" in [tags]{ elasticsearch { action => "index" hosts => "localhost:9200" index => "business_error_log-%{+YYYY.MM.dd}" } } } } else if [level] == "ERROR"{ elasticsearch { action => "index" hosts => "localhost:9200" index => "sys_error_log-%{+YYYY.MM.dd}" } } } 配置30天删除日志记录

需求;每天都记录日志,但是考虑到日志文件存储占用内容,所以将30天前的日志文件删除掉

通过以上的配置已经配置好每天都会创建新的日志文件

思考:

①使用Kibana配置删除策略,对日志单独配置处理

②配置创建日志同时配置日志的删除策略

http://127.0.0.1:5601 命令进入Kibana

在这里插入图片描述

手动选择策略

若需要自定义策略,可以创建策略,如以下操作 在这里插入图片描述

进入索引管理,添加策略 在这里插入图片描述 在这里插入图片描述

以上步骤后,实现了指定的索引选择对应是策略

配置自动创建策略

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

配置内容如下

{ "index": { "lifecycle": { "name": "30-days-default", "rollover_alias": "" }, "routing": { "allocation": { "include": { "_tier_preference": "data_content" } } }, "number_of_shards": "1", "auto_expand_replicas": "0-1", "number_of_replicas": "1" } }

配置完点击"Next"

选择Dynamic templates,根据自定义动态添加字段

在这里插入图片描述

然后点击"Next",逐个点击下一步,最后创建成功。。成功实现了以上的功能

若报错内容如下:

illegal_argument_exception: setting [index.lifecycle.rollover_alias] for index [sys_error_log-2023.05.16] is empty or not defined

更改 策略配置信息 在这里插入图片描述 在这里插入图片描述在这里插入图片描述

问题解决



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3