python监控MySQL表字段新增或者删除的变化

您所在的位置:网站首页 监听数据库更新 python监控MySQL表字段新增或者删除的变化

python监控MySQL表字段新增或者删除的变化

2024-01-01 00:22| 来源: 网络整理| 查看: 265

python监控MySQL表字段新增或者删除的变化

摘要: 生产系统利用OGG部署了一套MySQL 同步数据到Oracle的环境。自部署以来频繁的出现中断的情况。而中断的原因该系统还不稳定,还在开发新功能,系统维护的同事不定期的新增加表字段,导致同步停止。

需要一个告警功能,及时把新增加字段的表发送到企业微信告警,及时手工更新目标库的表字段信息(OGG不支持异构环境的DDL操作)。

脚本内容如下: 更新告警地址:

ding_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key="

C://scripts//table_list.file文件内容格式如下:

schema1.tabl1 schema1.tabl2

告警效果如下: 在这里插入图片描述

#!/usr/bin/python3 # -*- coding: utf-8 -*- # @Time : 2021-12 # @Author : meishidong import pymysql import pickle import os.path,sys import urllib.request import urllib.parse import json ## 配置文件,要监控的表 tablist_file = 'C://scripts//table_list.file' ## 程序获取到的表和字段信息持久化的文件,不需修改 tabcol_dic_file = sys.path[0] + '/tabcol_dic.file' ## 告警微信 ding_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=" def get_tablelist(): tablist_dic = {} f = open(tablist_file, 'r') tablist = f.read().splitlines() for t in tablist: schema = t.split('.')[0] tabname = t.split('.')[1] if schema in tablist_dic: tablist_dic[schema].append("'" + tabname + "'") else: tablist_dic[schema]=["'" + tabname + "'"] wherelist = [] for (k,v) in tablist_dic.items(): wherelist.append("(table_schema='%s' and table_name in (%s))" % (k,','.join(v))) wherestr = ' or '.join(wherelist) wherestr = "(" + wherestr + ")" return wherestr def get_tabcol_before(): dic_bef={} if os.path.isfile(tabcol_dic_file): with open(tabcol_dic_file, "rb") as f: dic_bef = pickle.load(f) return dic_bef return dic_bef def get_tabcol_current(): tabcol_dic = {} ## 目标库的信息 db = pymysql.connect(host='localhost', user='root', password='ddd', database='information_schema') cursor = db.cursor() wherestr = get_tablelist() sql = "select concat(table_schema,'.',table_name) tabname,column_name from information_schema.columns \ where %s" % (wherestr) cursor.execute(sql) results = cursor.fetchall() for row in results: if row[0] in tabcol_dic: tabcol_dic[row[0]].append(row[1]) else: tabcol_dic[row[0]]=[row[1]] db.close() with open(tabcol_dic_file, "wb") as f: pickle.dump(tabcol_dic, f) return tabcol_dic def get_col_change(): add_col_dic={} del_col_dic={} tabcol_before = get_tabcol_before() tabcol_current = get_tabcol_current() for (tab,cols) in tabcol_before.items(): cols_bef = cols cols_cur = tabcol_current[tab] add_cols = set(cols_cur) - set(cols_bef) del_cols = set(cols_bef) - set(cols_cur) if add_cols: add_col_dic[tab] = list(add_cols) if del_cols: del_col_dic[tab] = list(del_cols) return add_col_dic,del_col_dic def send_msg_by_ding(message): header = { "Content-Type": "application/json" } data = { "msgtype": "text", "text": {"content": message} } send_data = json.dumps(data).encode('utf-8') req = urllib.request.Request(ding_url, data=send_data, headers=header, method='POST') ret = urllib.request.urlopen(req) if ret.status != 200: print("send message error!") def main(): ret = get_col_change() add_dic = ret[0] del_dic = ret[1] message = "ITSM.jepaas表字段变更告警:\n" if add_dic: for (tab,cols) in add_dic.items(): cols.sort() message = message + tab + " 新增加字段: " + ', '.join(cols) + "\n" if del_dic: for (tab,cols) in del_dic.items(): cols.sort() message = message + tab + " 新删除字段: " + ', '.join(cols) + "\n" if add_dic or del_dic: send_msg_by_ding(message) if __name__ == '__main__': main()

备注:脚本参考来自 https://blog.csdn.net/sdmei/article/details/121981629



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3