ELK的安装部署与使用 |
您所在的位置:网站首页 › elk中文手册 › ELK的安装部署与使用 |
ELK的安装与使用
安装部署
部署环境:Elasticsearch-7.17.3 Logstash-7.17.3 Kibana-7.17.3 一、安装部署Elasticsearch 解压目录,进入conf目录下编辑elasticsearch.yml文件,输入以下内容并保存 network.host: 127.0.0.1 http.port: 9200 # 跨域配置 http.cors.enabled: true http.cors.allow-origin: "*"3.进入bin目录,点击elasticsearch.bat文件启动项目 二、安装部署Logstash 文件配置在conf目录下新建logstash.conf input { stdin { } jdbc { # 配置数据库信息 jdbc_connection_string => "jdbc:mysql://localhost:3306/student?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai" jdbc_driver_class => "com.mysql.cj.jdbc.Driver" jdbc_user => "root" jdbc_password => "root" jdbc_paging_enabled => "true" jdbc_page_size => "50000" jdbc_default_timezone => "Asia/Shanghai" # mysql驱动所在位置 jdbc_driver_library => "D:\ELK\mysql-connector-java-8.0.11.jar" #sql执行语句 statement => "SELECT * FROM user" # 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" # 是否将 sql 中 column 名称转小写 lowercase_column_names => false } } output { elasticsearch { hosts => ["127.0.0.1:9200"] index => "goods" # 文档_id,%{goods_id}意思是取查询出来的goods_id的值,并将其映射到shop的_id字段中 # 文档_id,%{goodsId}如果是别名,意思是取查询出来的goodsId的值,并将其映射到shop的_id字段中 document_id => "%{goodsId}" } stdout { codec => json_lines } } 遇到问题与解决①JAVA_HOME环境找不到,不推荐使用JAVA_HOME变量名 系统找不到指定的路径。could not find java; set JAVA_HOME or ensurejava is in PATH D: ELK logstash-?.17.3 bin>解决方法: 新增JDK环境变量:命名为 LS_JAVA_HOME 将jdk路径复制进去,注意!!!JDK版本必须1.8以上 ②报以下错误 解决办法: 在bin目录下新建文件:logstash.conf 设置logstash.conf的值,或指定路径下的conf文件 input { stdin{ } } output { stdout{ } }启动logstash,进入bin目录下的cmd,输入以下命令 logstash -f logstash.conf 安装部署Kibana进入config目录将kibana.yml文件修改 server.port: 5601 server.host: "0.0.0.0" elasticsearch.hosts: ["http://localhost:9200"] kibana.index: ".kibana" SpringBoot集成Logstash收集日志 1、导入依赖在pom.xml文件导入以下依赖 net.logstash.logback logstash-logback-encoder 5.3 2、配置文件新建"logback-spring.xml"文件,并编写内容如下: localhost:4560 3、配置application.yml文件进入application.yml文件配置日志 logging: level: root: info config: classpath:logback-spring.xml 4、配置logstash下的logstash.conf文件 input{ tcp{ mode => "server" host => "127.0.0.1" port => 4560 codec => json_lines } } output{ elasticsearch{ action => "index" hosts => "localhost:9200" index => "springboot_log-%{+YYYY.MM.dd}" } }以上配置完启动可运行成功 5、配置Logstash日志保存到数据库Linux配置这里就不提了,网上很多。这里讲的是windows系统下的配置,并以MySQL数据库为例。 使用的是jdbc,由于Logstash默认不带jdbc,所以需要下载logstash-output-jdbc,在线下载方式:进入logstash的bin目录下输入以下命令: logstash-plugin install --no-verify logstash-output-jdbc若安装失败,则去百度找一个压缩包下载(小编是在百度网盘上面下载的) 下一步是采用不联网的安装方式:进入logstash的bin目录下输入以下命令: logstash-plugin install file://文件路径+压缩包全名,以'/'分隔若还是安装失败,则需要更改配置文件 在logstash根目录下的Gemfile文件修改以下配置 source "https://gems.ruby-china.com"重新进入logstash的bin目录下输入以下命令 logstash-plugin install file://文件路径+压缩包全名,以'/'分隔网上下载mysql驱动,譬如mysql-connector-java-8.0.11.jar 下载后在logstash下的vendor目录下新建jar目录,再进入jar目录新建jdbc目录 将mysql驱动包放到jdbc目录下,全局路径如下:logstash-7.17.3目录为安装模块 logstash-7.17.3\vendor\jar\jdbc\mysql-connector-java-8.0.11.jar譬如springboot下的日志内容为 log.info(“|”+“version内容”+“|”+“taskId内容”+“|”+“taskType内容”+“|”+“data内容”+“|”+“time内容”) 重新进入logstash.conf文件,编辑以下内容 input{ tcp{ mode => "server" host => "127.0.0.1" port => 4560 codec => json_lines } } filter{ mutate{ split => ["message","|"] } if[message][2]{ mutate{ add_field =>{ "version" => "%{[message][2]}" } } } if[message][3]{ mutate{ add_field =>{ "taskId" => "%{[message][3]}" } } } if[message][4]{ mutate{ add_field =>{ "taskType" => "%{[message][4]}" } } } if[message][5]{ mutate{ add_field =>{ "data" => "%{[message][5]}" } } } if[message][6]{ mutate{ add_field =>{ "time" => "%{[message][6]}" } } } mutate{ convert => { "version" => "string" "taskId" => "string" "taskType" => "string" "data" => "string" "time" => "string" } } } output{ driver_class => "com.mysql.cj.jdbc.Driver" connection_string => "jdbc:mysql://127.0.0.1:3306/student?user=用户名&password=密码&serverTimezone=GMT%2B8&characterEncoding=utf8&useSSL=false&autoReconnect=true" statement => ["insert into 表名 (version,taskId,taskType,data,time) VALUES (?,?,?,?,?)","version","taskId","taskType","data","time"] } 6、区分业务日志和错误日志存储add_tag 属性,在input添加,用于区分输出 具体内容就不详细介绍了,看下面小编写的内容 input { tcp { mode => "server" host => "127.0.0.1" port => 4560 codec => json_lines } } filter{ mutate{ split => ["message","|"] } if[message][1] == "business_log"{ mutate{ add_tag =>["business_log"] } } if[message][1]== "error_log"{ mutate{ add_tag => ["error_log"] } } if[message][2]=="ElasticSearch"{ mutate{ add_tag => ["ElasticSearch"] } } if[message][2]=="MYSQL"{ mutate{ add_tag => ["MySQL"] } } if[message][2]=="PgSQL"{ mutate{ add_tag => ["PgSQL"] } } if[message][2]=="ElasticSearchMySQL"{ mutate{ add_tag => ["ElasticSearchAndMySQL"] } } if[message][2]=="ElasticSearchPgSQL"{ mutate{ add_tag => ["ElasticSearchAndPgSQL"] } } if [level] == "INFO"{ if[message][3]{ mutate{ add_field =>{ "version" => "%{[message][3]}" } } } if[message][4]{ mutate{ add_field =>{ "taskId" => "%{[message][4]}" } } } if[message][5]{ mutate{ add_field =>{ "taskType" => "%{[message][5]}" } } } if[message][6]{ mutate{ add_field =>{ "data" => "%{[message][6]}" } } } if[message][7]{ mutate{ add_field =>{ "time" => "%{[message][7]}" } } } mutate{ convert => { "version" => "string" "taskId" => "string" "taskType" => "string" "data" => "string" "time" => "string" } } } } output { if [level] == "INFO"{ if "business_log" in [tags]{ if "ElasticSearch" in [tags]{ elasticsearch { action => "index" hosts => "localhost:9200" index => "business_log-%{+YYYY.MM.dd}" } } else if "MySQL" in [tags]{ jdbc{ driver_class => "com.mysql.cj.jdbc.Driver" connection_string => "jdbc:mysql://127.0.0.1:3306/student?user=root&password=root&serverTimezone=GMT%2B8&characterEncoding=utf8&useSSL=false&autoReconnect=true" statement => ["insert into business_log (version,taskId,taskType,data,time) VALUES (?,?,?,?,?)","version","taskId","taskType","data","time"] } } } else if "error_log" in [tags]{ if "ElasticSearch" in [tags]{ elasticsearch { action => "index" hosts => "localhost:9200" index => "business_error_log-%{+YYYY.MM.dd}" } } } } else if [level] == "ERROR"{ elasticsearch { action => "index" hosts => "localhost:9200" index => "sys_error_log-%{+YYYY.MM.dd}" } } } 配置30天删除日志记录需求;每天都记录日志,但是考虑到日志文件存储占用内容,所以将30天前的日志文件删除掉 通过以上的配置已经配置好每天都会创建新的日志文件 思考: ①使用Kibana配置删除策略,对日志单独配置处理 ②配置创建日志同时配置日志的删除策略 http://127.0.0.1:5601 命令进入Kibana 若需要自定义策略,可以创建策略,如以下操作 进入索引管理,添加策略 以上步骤后,实现了指定的索引选择对应是策略 配置自动创建策略
配置内容如下 { "index": { "lifecycle": { "name": "30-days-default", "rollover_alias": "" }, "routing": { "allocation": { "include": { "_tier_preference": "data_content" } } }, "number_of_shards": "1", "auto_expand_replicas": "0-1", "number_of_replicas": "1" } }配置完点击"Next" 选择Dynamic templates,根据自定义动态添加字段 然后点击"Next",逐个点击下一步,最后创建成功。。成功实现了以上的功能 若报错内容如下: illegal_argument_exception: setting [index.lifecycle.rollover_alias] for index [sys_error_log-2023.05.16] is empty or not defined更改 策略配置信息 问题解决 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |