翼度科技»论坛 编程开发 mysql 查看内容

MySQL MHA信息的收集【Filebeat+logstash+MySQL】

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
一.项目背景

随着集团MHA集群的日渐增长,MHA管理平台话越来越迫切。而MHA平台的建设第一步就是将这些成百上千套的MHA集群信息收集起来,便于查询和管理。
MHA主要信息如下:
(1)基础配置信息;
(2)运行状态信息;
(3)启动及FailOver的log信息。
集团目前数据库的管理平台是在Archery的基础上打造,所以,需要将此功能嵌入到既有平台上。通过Archery系统进行查询展示。
二.架构


 简单来说,通过 Filebeat + Logstash + MySQL 架构 来收集保存各个集群的配置信息、启动及FailOver的log信息 和运行状态信息。
运行状态信息是通过一个小程序获取的,这个小程序每分钟执行一次,会把执行结果输出到文件中。当然这个文件是被failbeat监控的。
三.实现

3.1 获取MHA状态的脚本

文件为mha_checkstatus.py
  1. #!/usr/bin/python
  2. # -*- coding: UTF-8 -*-
  3. import os
  4. import io
  5. import re
  6. import ConfigParser
  7. Path='/etc/mha'
  8. #fout=open('输出文件名','w')
  9. for Name in os.listdir(Path) :
  10.   Pathname= os.path.join(Path,Name)
  11. ## print(Pathname)
  12. ## print(Name)
  13.   config =ConfigParser.ConfigParser()
  14.   try:
  15.     config.read(Pathname)
  16.     server_item = config.sections()
  17.     server1_host = ''  ##MHA cnf 配置文件中的节点1
  18.     server2_host = ''  ##MHA cnf 配置文件中的节点2
  19.     server3_host = ''  ##MHA cnf 配置文件中的节点3
  20.     mha_cnf_remark = ''
  21.     if 'server1' in server_item:
  22.       server1_host = config.get('server1','hostname')
  23.     else:
  24.        mha_cnf_remark = mha_cnf_remark + 'Server1未配置;'
  25.     if 'server2' in server_item:
  26.       server2_host = config.get('server2','hostname')
  27.     else:
  28.       mha_cnf_remark = mha_cnf_remark + 'Server2未配置;'
  29.     if 'server3' in server_item:
  30.       server3_host = config.get('server3','hostname')
  31.       ##print(mha_cnf_remark)
  32.   except Exception as e:
  33.     print(e)
  34.   mha_status_result =''
  35.   ###20190330
  36.   Name = Name.replace(".cnf", "")
  37.   ###集群一主一从
  38.   if server1_host <> '' and server2_host <> '' and server3_host == '':
  39.     cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
  40.     with os.popen(cmd_mha_status) as mha_status:
  41.       mha_status_result = mha_status.read()
  42.       if 'running(0:PING_OK)' in mha_status_result:
  43.         print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
  44.         print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
  45.       if 'stopped(2:NOT_RUNNING)' in mha_status_result:
  46.         print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
  47.         print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)
  48.   ####集群一主两从
  49.   if server1_host <> '' and server2_host <> '' and server3_host <> '':
  50.     cmd_mha_status ='/???/???/bin/masterha_check_status --conf='+Pathname
  51.     with os.popen(cmd_mha_status) as mha_status:
  52.       mha_status_result = mha_status.read()
  53.       if 'running(0:PING_OK)' in mha_status_result:
  54.         print(Name+':::'+Pathname+':::0:::'+server1_host+':::'+mha_status_result)
  55.         print(Name+':::'+Pathname+':::0:::'+server2_host+':::'+mha_status_result)
  56.         print(Name+':::'+Pathname+':::0:::'+server3_host+':::'+mha_status_result)
  57.       if 'stopped(2:NOT_RUNNING)' in mha_status_result:
  58.         print(Name+':::'+Pathname+':::None:::'+server1_host+':::'+mha_status_result)
  59.         print(Name+':::'+Pathname+':::None:::'+server2_host+':::'+mha_status_result)
  60.         print(Name+':::'+Pathname+':::None:::'+server3_host+':::'+mha_status_result)
复制代码
 概况说明,就是到存放MHA配置的文件夹,根据每个集群的配置文档,去逐一执行下masterha_check_status,把结果格式化,输出到指定的文件中。这个就是每个集群的状态数据。通过filebeat实时汇报上去。
触发的方式可以是crontab,每分钟执行一次。再本案中是输出到 /???/checkmhastatus/masterha_check_status.log 中。
形式类似如下:
  1. */1 * * * * python /???/????/mha_checkstatus.py >>   /???/????/masterha_check_status.log
复制代码
3.2 表的设计及脚本

3.2.1 运行状态表 dbmha_status
  1. CREATE TABLE `dbmha_status` (
  2.   `id` int NOT NULL AUTO_INCREMENT,
  3.   `host` varchar(100) NOT NULL,
  4.   `clustername` varchar(200) NOT NULL,
  5.   `logpath` varchar(500) NOT NULL,
  6.   `confpath` varchar(500) NOT NULL,
  7.   `mhstatus` varchar(100) NOT NULL,
  8.   `serverip` varchar(100) NOT NULL,
  9.   `info` varchar(2000) NOT NULL,
  10.   `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  11.   PRIMARY KEY (`id`),
  12.   KEY `idx_createtime` (`create_time`)
  13. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码
3.2.2 mha log 信息表 dbmha_log
  1. CREATE TABLE `dbmha_log` (
  2.   `id` int NOT NULL AUTO_INCREMENT,
  3.   `host` varchar(100) NOT NULL,
  4.   `clustername` varchar(200) NOT NULL,
  5.   `filename` varchar(200) NOT NULL,
  6.   `logpath` varchar(500) NOT NULL,
  7.   `message` longtext NOT NULL,
  8.   `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  9.   PRIMARY KEY (`id`)
  10. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码
3.2.3 MHA 基础配置表 dbmha_conf_info
  1. CREATE TABLE `dbmha_conf_info` (
  2.   `id` int NOT NULL AUTO_INCREMENT,
  3.   `host` varchar(100) NOT NULL,
  4.   `clustername` varchar(200) NOT NULL DEFAULT '',
  5.   `confpath` varchar(500) NOT NULL DEFAULT '',
  6.   `manager_log` varchar(500) NOT NULL DEFAULT '',
  7.   `manager_workdir` varchar(500) NOT NULL DEFAULT '',
  8.   `master_binlog_dir` varchar(500) NOT NULL DEFAULT '',
  9.   `failover_script` varchar(500) NOT NULL DEFAULT '',
  10.   `online_change_script` varchar(500) NOT NULL DEFAULT '',
  11.   `password` varchar(128) NOT NULL DEFAULT '',
  12.   `ping_interval` varchar(100) NOT NULL DEFAULT '',
  13.   `remote_workdir` varchar(100) NOT NULL DEFAULT '',
  14.   `repl_password` varchar(128) NOT NULL DEFAULT '',
  15.   `repl_user` varchar(20) NOT NULL DEFAULT '',
  16.   `ssh_user` varchar(20) NOT NULL DEFAULT '',
  17.   `user` varchar(20) NOT NULL DEFAULT '',
  18.   `serverip1` varchar(100) NOT NULL DEFAULT '',
  19.   `port1` varchar(10) NOT NULL DEFAULT '',
  20.   `candidate_master1` varchar(5) NOT NULL DEFAULT '',
  21.   `check_repl_delay1` varchar(20) NOT NULL DEFAULT '',
  22.   `serverip2` varchar(100) NOT NULL DEFAULT '',
  23.   `port2` varchar(10) NOT NULL DEFAULT '',
  24.   `candidate_master2` varchar(5) NOT NULL DEFAULT '',
  25.   `check_repl_delay2` varchar(20) NOT NULL DEFAULT '',
  26.   `serverip3` varchar(100) NOT NULL DEFAULT '',
  27.   `port3` varchar(10) NOT NULL DEFAULT '',
  28.   `candidate_master3` varchar(5) NOT NULL DEFAULT '',
  29.   `check_repl_delay3` varchar(20) NOT NULL DEFAULT '',
  30.   `info` longtext NOT NULL,
  31.   `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '收集时间',
  32.   PRIMARY KEY (`id`),
  33.   KEY `idx_createtime` (`create_time`)
  34. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
复制代码
3.3 filbeat 中关于读取文件的配置
  1. ..............
  2. - type: log
  3.   paths:
  4.     - /???/????/masterha_check_status.log
  5.   fields:
  6.     log_type: mha-status
  7.     db_host: 111.111.XXX.1XX    ###这个IP为mha Mnaager所在serverip
  8. - type: log
  9.   paths:
  10.     - /???/mhaconf/*.cnf
  11.   fields:
  12.     log_type: mha-cnf
  13.     db_host: 111.111.XXX.XXX
  14.   multiline.type: pattern
  15.   multiline.pattern: '^\[server [[:space:]] default'
  16.   multiline.negate: true
  17.   multiline.match: after
  18. - type: log
  19.   paths:
  20.     - /???/????/mha/*/*.log
  21.   fields:
  22.     log_type: mysql-mha
  23.     db_host: 111.111.XXX.XXX
  24. ................
复制代码
3.4 Logstash 的配置文件
  1. # Sample Logstash configuration for creating a simple
  2. # Beats -> Logstash -> Elasticsearch pipeline.
  3. input {
  4.   beats {
  5.     port => 5044
  6.   }
  7. }
  8. filter {
  9.     if [fields][log_type] == "mysql-mha" {
  10.         grok {
  11.             match => ["message", "(?m)^%{TIMESTAMP_ISO8601:timestamp} %{BASE10NUM} \[%{WORD:error_level}\] %{GREEDYDATA:error_msg}$"]
  12.         }
  13.         grok {
  14.             match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
  15.         }
  16.         grok {
  17.             match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
  18.         }
  19.         date {
  20.             match=> ["timestamp", "ISO8601"]
  21.             remove_field => ["timestamp"]
  22.         }
  23.         mutate {
  24.         copy => { "[log][file][path]" => "logpath"
  25.                  "[fields][db_host]" => "manager_ip" }
  26.         }
  27.         mutate {
  28.             remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
  29.         }
  30.     }
  31.     if [fields][log_type] == "mha-cnf" {
  32.         mutate {
  33.         split => ["message","server"]
  34.         add_field => {"message1" => "%{[message][1]}"}
  35.         add_field => {"messages1" => "%{[message][2]}"}
  36.         add_field => {"messages2" => "%{[message][3]}"}
  37.         add_field => {"messages3" => "%{[message][4]}"}
  38.         add_field => {"dft_password" => "*********"}
  39.         add_field => {"dft_repl_password" => "*********"}
  40.         }
  41.         kv {
  42.              source => "message1"
  43.              field_split => "\n"
  44.              include_keys => ["manager_log", "manager_workdir", "master_binlog_dir", "master_ip_failover_script", "master_ip_online_change_script", "ping_interval", "remote_workdir", "repl_user", "ssh_user", "user" ]
  45.              prefix => "dft_"
  46.              remove_char_value => "<>\[\],"
  47.         }
  48.         kv {
  49.              source => "messages1"
  50.              field_split => "\n"
  51.              include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
  52.              prefix => "s1_"
  53.         }
  54.         kv {
  55.              source => "messages2"
  56.              field_split => "\n"
  57.              default_keys => [ "s2_candidate_master", "",
  58.                          "s2_check_repl_delay", "",
  59.                          "s2_hostname","",
  60.                           "s2_port",""
  61.                           ]
  62.              include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
  63.              prefix => "s2_"
  64.         }
  65.         kv {
  66.              source => "messages3"
  67.              field_split => "\n"
  68.              default_keys => [ "s3_candidate_master", "",
  69.                          "s3_check_repl_delay", "",
  70.                          "s3_hostname","",
  71.                           "s3_port",""
  72.                           ]
  73.              include_keys => ["candidate_master", "check_repl_delay", "hostname", "port" ]
  74.              prefix => "s3_"
  75.         }
  76.         grok {
  77.             match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?<product>.*)(\\|\/).*"}
  78.             match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
  79.         }
  80.         mutate {
  81.              copy => { "[fields][db_host]" => "manager_ip" }
  82.              copy => { "[log][file][path]" => "conf_path" }
  83.              gsub => [
  84.                       "message", "需要加密的***密***码", "*********",
  85.                       "message", "需要加密的其他字符", "*********"
  86.                       ]
  87.         }
  88.         date {
  89.             match=> ["timestamp", "ISO8601"]
  90.             remove_field => ["timestamp"]
  91.         }
  92.         mutate {
  93.             remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
  94.         }
  95.     }
  96.     if [fields][log_type] == "mha-status" {
  97.        mutate {
  98.         split => ["message",":::"]
  99.         add_field => {"cluster_name" => "%{[message][0]}"}
  100.         add_field => {"conf_path" => "%{[message][1]}"}
  101.         add_field => {"masterha_check_status" => "%{[message][2]}"}
  102.         add_field => {"server" => "%{[message][3]}"}
  103.         add_field => {"info" => "%{[message][4]}"}
  104.          }
  105.         grok {
  106.             match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?<filename>.*)"}
  107.         }
  108.         mutate {
  109.              copy => { "[fields][db_host]" => "manager_ip" }
  110.         }
  111.         date {
  112.             match=> ["timestamp", "ISO8601"]
  113.             remove_field => ["timestamp"]
  114.         }
  115.         mutate {
  116.             remove_field => ["@version", "beat", "input", "offset", "prospector", "source", "tags"]
  117.         }
  118.     }
  119. }
  120. output {
  121.     if [fields][log_type] == "mysql-mha" {
  122.       jdbc {
  123.            driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
  124.            driver_class => "com.mysql.jdbc.Driver"
  125.            connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
  126.            statement => ["INSERT INTO dbmha_log (host,clustername,filename,logpath, message) VALUES(?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{filename}","%{logpath}","%{message}"]
  127.        }
  128.     }
  129.     if [fields][log_type] == "mha-status" {
  130.       jdbc {
  131.            driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
  132.            driver_class => "com.mysql.jdbc.Driver"
  133.            connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
  134.            statement => ["INSERT INTO dbmha_status (host,clustername,logpath,confpath,mhstatus,serverip,info) VALUES(?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{cluster_name}","%{filename}","%{conf_path}","%{masterha_check_status}","%{server}","%{info}"]
  135.        }
  136.    }
  137.     if [fields][log_type] == "mha-cnf" {
  138.       jdbc {
  139.            driver_jar_path => "/???/???/logstash-7.6.0/vendor/jar/jdbc/mysql-connector-java-5.1.47.jar"
  140.            driver_class => "com.mysql.jdbc.Driver"
  141.            connection_string => "jdbc:mysql://120.120.XXX.XXX:3306/archery?user=ts23_dbhacluster&password=???????"
  142.            statement => ["INSERT INTO dbmha_conf_info (host,clustername,confpath,manager_log,manager_workdir,master_binlog_dir,failover_script,online_change_script,password,ping_interval,remote_workdir,repl_password,repl_user,ssh_user,user,serverip1,port1,candidate_master1,check_repl_delay1,serverip2,port2,candidate_master2,check_repl_delay2,serverip3,port3,candidate_master3,check_repl_delay3,info) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)","%{manager_ip}","%{product}","%{conf_path}","%{dft_manager_log}","%{dft_manager_workdir}","%{dft_master_binlog_dir}","%{dft_master_ip_failover_script}","%{dft_master_ip_online_change_script}","%{dft_password}","%{dft_ping_interval}","%{dft_remote_workdir}","%{dft_repl_password}","%{dft_repl_user}","%{dft_ssh_user}","%{dft_user}","%{s1_hostname}","%{s1_port}","%{s1_candidate_master}","%{s1_check_repl_delay}","%{s2_hostname}","%{s2_port}","%{s2_candidate_master}","%{s2_check_repl_delay}","%{s3_hostname}","%{s3_port}","%{s3_candidate_master}","%{s3_check_repl_delay}","%{message}"]
  143.        }
  144.    }
  145. }
复制代码
 这个配置还是相对复杂难懂的。这个文件配置了对三种文件的读取,我们就看读取mha配置文件的部分【[fields][log_type] == "mha-cnf"】,我们挑其中的几个点说下,更多的内容可参照logstash官网--https://www.elastic.co/guide/en/logstash/current/filter-plugins.html
首先,我们是 “server” 关键字,把文件中的配置信息,分割成不同的部分。
接着,因为配置文件的格式是 key=value的样式,所以需要借助 kv{},其中的参数说下:field_split---定义字段间的分隔符;include_keys--定义只读去规定的特定key;prefix---格式化字段名字,加个前缀名字,主要是用来区分server 1 部分和 server2、、、之间的分别。
 通过【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/)(?.*)(\\|\/).*"}】,获取product字段,我们是通过mha的配置文件的名字来定义集群的名字,即规范了mha配置文件的名字的命名来自于集群的名字,反推得知了配置文件的名字,就知道了集群的名字。【match => { "[log][file][path]" => ".*(\\|\/).*(\\|\/).*(\\|\/)(?.*)"}】这个地方的filename包含了文件的后缀。
四.平台前端

我们是把此项目嵌入到既有的Archery平台中,增加了3个查询界面,界面的实现,在此就不具体展开了。
需要注意的是,界面需要支持模糊查询,例如支持MHA Manager Server IP查询(方便查询各个Manager节点上有多少集群);支持集群名字的模糊查询;支持节点serverIP的模糊查询。
五.补充说明

Q.1 为什么用MySQL存储信息,ELK是更成熟的架构啊?
是的,用elasticsearch来存储这种文本信息更常见。我们用MySQL替代elasticsearch基于以下考虑:(1)我们既有的管理平台使用的是MySQL,把他们保存到MySQL 便于集成;(2)这些数据,不仅仅是Log,还有些是基础数据,放到MySQL便于相互管理、聚合展示(3)这是数据量并不大,例如mha.log,只有在启动或者failover时才有变化,conf信息也是很少的,所以,从数据量也一点考虑,也不需要保存到MySQL。
Q.2 Logstash 可以把数据写入到MySql中吗?
是可以的。主要是logstash-output-jdbc、logstash-codec-plain插件的安装。
如果是离线的环境下安装,可以参考 《logstash 离线安装logstash-output-jdbc》
https://blog.csdn.net/sxw1065430201/article/details/123663108
Q.3 MHA log 文件夹中 原有一个 .health ,里面是MHA每分钟的健康性报告,那为什么还要自己写Python程序获取呢?
因为.healthy 的内容不是换行符结尾,而filebeat是以换行符来判断的(https://www.elastic.co/guide/en/beats/filebeat/7.4/newline-character-required-eof.html 有详细说明)。
简单来说,filebeat读取不了。
Q.4 MHA 健康性检查的原理
具体的原理可以参考此文章的分析说明--《mha检测mysql状况方式》
https://blog.csdn.net/weixin_35411438/article/details/113455263
Q.5 历史数据的删除
mha log 信息表 dbmha_log
MHA 基础配置表 dbmha_conf_info
以上两张表基本上很少变化,量不大,其数据无需定期删除。
运行状态表 dbmha_status,此表每个集群每分钟(具体crontab定义)都会有新数据插入,数据量增长较大,应设置定时任务,定期删除历史数据,例如删除7天前的数据。
 

来源:https://www.cnblogs.com/xuliuzai/p/17320183.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具