欧美阿v视频在线大全_亚洲欧美中文日韩V在线观看_www性欧美日韩欧美91_亚洲欧美日韩久久精品

主頁 > 知識庫 > python實現MySQL指定表增量同步數據到clickhouse的腳本

python實現MySQL指定表增量同步數據到clickhouse的腳本

熱門標簽:舉辦過冬奧會的城市地圖標注 螳螂科技外呼系統怎么用 400電話申請資格 電銷機器人系統廠家鄭州 正安縣地圖標注app 阿里電話機器人對話 地圖地圖標注有嘆號 遼寧智能外呼系統需要多少錢 qt百度地圖標注

python實現MySQL指定表增量同步數據到clickhouse,腳本如下:

#!/usr/bin/env python3
# _*_ coding:utf8 _*_
 
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,)
import clickhouse_driver
import configparser
import os
 
configfile='repl.ini'
########## 配置文件repl.ini 操作 ##################
def create_configfile(configfile,log_file,log_pos):
  config = configparser.ConfigParser()
 
  if not os.path.exists(configfile):
    config['replinfo'] = {'log_file':log_file,'log_pos':str(log_pos)}
 
    with open(configfile,'w+') as f:
      config.write(f)
 
### repl.ini 寫操作 ##################
def write_config(configfile,log_file,log_pos):
  config = configparser.ConfigParser()
  config.read(configfile)
 
  config.set('replinfo','log_file',log_file)
  config.set('replinfo','log_pos',str(log_pos))
 
  if os.path.exists(configfile):
    with open(configfile,'w+') as f:
      config.write(f)
  else:
    create_configfile(configfile)
 
### 配置文件repl.ini 讀操作 ##################
def read_config(configfile):
  config = configparser.ConfigParser()
  config.read(configfile)
  # print(config['replinfo']['log_file'])
  # print(config['replinfo']['log_pos'])
  return (config['replinfo']['log_file'],int(config['replinfo']['log_pos']))
 
############# clickhouse 操作 ##################
def ops_clickhouse(db,table,sql):
  column_type_dic={}
  try:
    client = clickhouse_driver.Client(host='127.0.0.1',\

                     port=9000,\

                     user='default',\

                     password='clickhouse')
    # sql="select name,type from system.columns where database='{0}' and table='{1}'".format(db,table)
    client.execute(sql)
 
  except Exception as error:
    message = "獲取clickhouse里面的字段類型錯誤. %s" % (error)
    # logger.error(message)
    print(message)
    exit(1)
 
MYSQL_SETTINGS = {'host':'127.0.0.1','port':13306,'user':'root','passwd':'Root@0101'}
only_events=(DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent)
def main():
  ## 每次重啟時,讀取上次同步的log_file,log_pos
  (log_file,log_pos) = read_config(configfile)
  # print(log_file+'|'+ str(log_pos))
  print('-----------------------------------------------------------------------------')
  stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, resume_stream=True, blocking=True, \

                server_id=10,
                 only_tables='t_repl', only_schemas='test', \

                log_file=log_file,log_pos=log_pos, \

                only_events=only_events, \

                fail_on_table_metadata_unavailable=True, slave_heartbeat=10)
 
  try:
    for binlogevent in stream:
      for row in binlogevent.rows:
        ## delete操作
        if isinstance(binlogevent, DeleteRowsEvent):
          info = dict(row["values"].items())
          # print("DELETE FROM `%s`.`%s` WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
          # print("ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
          sql="ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key])
 
        ## update 操作
        elif isinstance(binlogevent, UpdateRowsEvent):
          info_before = dict(row["before_values"].items())
          info_after = dict(row["after_values"].items())
          # info_set = str(info_after).replace(":","=").replace("{","").replace("}","")
          info_set = str(info_after).replace(":", "=").replace("{", "").replace("}", "").replace("'","")
          # print("UPDATE `%s`.`%s` SET %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  ) )
          # print("ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  ) )
          sql = "ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key]  )
 
        ## insert 操作
        elif isinstance(binlogevent, WriteRowsEvent):
          info = dict(row["values"].items())
          # print("INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) ) )
          sql = "INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) )
        ops_clickhouse('test', 't_repl',sql )
 
        # 當前log_file,log_pos寫入配置文件
        write_config(configfile, stream.log_file, stream.log_pos)
 
  except Exception as e:
    print(e)
  finally:
    stream.close()
 
if __name__ == "__main__":
  main()
 
 
 
'''
BinLogStreamReader()參數
ctl_connection_settings:集群保存模式信息的連接設置
resume_stream:從位置或binlog的最新事件或舊的可用事件開始
log_file:設置復制開始日志文件
log_pos:設置復制開始日志pos(resume_stream應該為true)
auto_position:使用master_auto_position gtid設置位置
blocking:在流上讀取被阻止
only_events:允許的事件數組
ignored_events:被忽略的事件數組
only_tables:包含要觀看的表的數組(僅適用于binlog_format ROW)
ignored_tables:包含要跳過的表的數組
only_schemas:包含要觀看的模式的數組
ignored_schemas:包含要跳過的模式的數組
freeze_schema:如果為true,則不支持ALTER TABLE。速度更快。
skip_to_timestamp:在達到指定的時間戳之前忽略所有事件。
report_slave:在SHOW SLAVE HOSTS中報告奴隸。
slave_uuid:在SHOW SLAVE HOSTS中報告slave_uuid。
fail_on_table_metadata_unavailable:如果我們無法獲取有關row_events的表信息,應該引發異常
slave_heartbeat:(秒)主站應主動發送心跳連接。這也減少了復制恢復時GTID復制的流量(在許多事件在binlog中跳過的情況下)。請參閱mysql文檔中的MASTER_HEARTBEAT_PERIOD以了解語義
'''

知識點擴展:

MySQL備份-增量同步

mysql增量同步主要使用binlog文件進行同步,binlog文件主要記錄的是數據庫更新操作相關的內容。

1. 備份數據的意義

針對不同業務,7*24小時提供服務和數據的重要性不同。
數據庫數據是比較核心的數據,對企業的經營至關重要,數據庫備份顯得尤為重要。

2. 備份數據庫

MySQL數據庫自帶的備份命令 `mysqldump`,基本使用方法:
語法:`mysqldump -u username -p password dbname > filename.sql`

執行備份命令

`mysqldump -uroot -pmysqladmin db_test > /opt/mysql_bak.sql`

查看備份內容

`grep -v "#|\*|--|^$" /opt/mysql_bak.sql`

到此這篇關于python實現MySQL指定表增量同步數據到clickhouse的腳本的文章就介紹到這了,更多相關python實現MySQL增量同步數據內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
  • 基于python的mysql復制工具詳解
  • 由Python編寫的MySQL管理工具代碼實例
  • python實現讀取excel寫入mysql的小工具詳解
  • Python操作MySQL數據庫的簡單步驟分享
  • Python爬蟲爬取全球疫情數據并存儲到mysql數據庫的步驟
  • Python爬取騰訊疫情實時數據并存儲到mysql數據庫的示例代碼
  • 解決python mysql insert語句的問題
  • python 在mysql中插入null空值的操作
  • 用python開發一款操作MySQL的小工具

標簽:昭通 合肥 興安盟 淘寶好評回訪 濟源 信陽 阜新 隨州

巨人網絡通訊聲明:本文標題《python實現MySQL指定表增量同步數據到clickhouse的腳本》,本文關鍵詞  python,實現,MySQL,指定,表,;如發現本文內容存在版權問題,煩請提供相關信息告之我們,我們將及時溝通與處理。本站內容系統采集于網絡,涉及言論、版權與本站無關。
  • 相關文章
  • 下面列出與本文章《python實現MySQL指定表增量同步數據到clickhouse的腳本》相關的同類信息!
  • 本頁收集關于python實現MySQL指定表增量同步數據到clickhouse的腳本的相關信息資訊供網民參考!
  • 推薦文章
    欧美阿v视频在线大全_亚洲欧美中文日韩V在线观看_www性欧美日韩欧美91_亚洲欧美日韩久久精品
  • <rt id="w000q"><acronym id="w000q"></acronym></rt>
  • <abbr id="w000q"></abbr>
    <rt id="w000q"></rt>
    亚洲小说欧美激情另类| 欧美高清性xxxx| 午夜国产福利一区二区| 久久久精品国产99久久精品芒果| 免费在线看一区| 国产一线在线观看| 欧美精品精品一区| 五月天精品一区二区三区| 无码人妻一区二区三区在线| 欧美日韩激情一区二区| 亚洲午夜激情av| 免费黄色a级片| 欧美另类久久久品| 天天色综合成人网| 无码一区二区精品| 日韩免费在线观看| 精久久久久久久久久久| 岛国片在线免费观看| 日本一区二区三区在线不卡| 国产成人精品午夜视频免费 | 91丝袜国产在线播放| 91高清视频在线| 亚洲国产综合色| 日韩精品一区二区三区高清免费| 欧美一区二区免费观在线| 青娱乐精品视频在线| 娇妻被老王脔到高潮失禁视频| 久久男人中文字幕资源站| 国产一区二区三区在线观看免费| 亚洲AV成人无码网站天堂久久| 国产精品乱码一区二区三区软件| av电影在线不卡| 欧美精品一二三四| 麻豆精品视频在线观看视频| 国产一级淫片久久久片a级| 国产精品福利影院| 在线观看你懂的视频| 欧美一级欧美三级在线观看| 狂野欧美性猛交blacked| 午夜精品久久久久99蜜桃最新版 | 亚洲成人第一页| 在线免费观看麻豆| 国产精品全国免费观看高清| 91污在线观看| 欧美电影精品一区二区 | 欧美va亚洲va在线观看蝴蝶网| 另类综合日韩欧美亚洲| 欧美特黄一级片| 亚洲综合视频在线| www.色天使| 国产精品三级av| 污污免费在线观看| 久久精品欧美一区二区三区不卡| jlzzjlzz国产精品久久| 欧美一二区视频| 国产精品自产自拍| 欧美日韩一区国产| 另类小说视频一区二区| 色综合久久天天| 日韩中文字幕一区二区三区| 91麻豆精品久久毛片一级| 一区二区三区高清| 亚洲图片另类小说| 夜夜精品浪潮av一区二区三区| 日本一级免费视频| 一区二区三区中文字幕精品精品 | 337p日本欧洲亚洲大胆精品| 成人a免费在线看| 日韩一级高清毛片| 成人动漫一区二区三区| 日韩一区二区电影网| 成人的网站免费观看| 欧美不卡在线视频| 99国产精品一区| 欧美精品一区二区在线播放| 91猫先生在线| 国产日韩欧美a| 熟妇人妻久久中文字幕| 国产精品国产三级国产aⅴ无密码| 800av在线播放| 亚洲天堂网中文字| 久久视频一区二区三区| 午夜久久电影网| 超碰手机在线观看| 国产综合色产在线精品| 欧美肥大bbwbbw高潮| 99精品视频在线免费观看| 337p粉嫩大胆色噜噜噜噜亚洲| 在线中文字日产幕| 国产精品久久久久天堂| 国产精品天天干| 亚洲gay无套男同| 色999日韩国产欧美一区二区| 国产制服丝袜一区| 欧美一级淫片007| gogo亚洲国模私拍人体| 国产精品国产三级国产普通话三级 | 成年人看片网站| 国产精品短视频| 精品人妻中文无码av在线| 图片区小说区国产精品视频| 欧美综合视频在线观看| 国产成人精品亚洲日本在线桃色| 精品国产乱码久久| 黄色国产在线观看| 视频一区二区三区中文字幕| 欧美亚州韩日在线看免费版国语版| 国产在线观看一区二区| 精品sm在线观看| 自拍视频一区二区| 午夜av一区二区| 欧美男人的天堂一二区| 91在线视频官网| 国产精品高潮久久久久无| 中日韩一级黄色片| 国产麻豆视频一区| 久久亚洲精华国产精华液| 色噜噜日韩精品欧美一区二区| 天天综合色天天| 5566中文字幕一区二区电影| 黄色国产在线视频| 五月天网站亚洲| 欧美一级日韩不卡播放免费| 五月天激情小说| 日韩av中文字幕一区二区三区| 制服丝袜激情欧洲亚洲| 黄色免费视频网站| 香蕉影视欧美成人| 91精品国产综合久久久久久| a视频免费观看| 日本强好片久久久久久aaa| 日韩一区二区在线看片| 巨胸大乳www视频免费观看| 久草精品在线观看| 久久久久国产精品厨房| 精品人体无码一区二区三区| 国产乱理伦片在线观看夜一区| 国产亚洲综合av| 国产第一页浮力| 99视频精品全部免费在线| 亚洲免费色视频| 精品视频在线看| 成人免费毛片日本片视频| 另类调教123区 | 国产肉体xxxx裸体784大胆| 看电影不卡的网站| 国产亚洲精久久久久久| 天天看片中文字幕| 色哟哟免费视频| 日韩国产精品久久久久久亚洲| 精品国产sm最大网站免费看| 国产一区在线观看免费| 92精品国产成人观看免费| 午夜成人免费视频| 久久你懂得1024| 色综合色狠狠天天综合色| 能看毛片的网站| 美美哒免费高清在线观看视频一区二区| 亚洲精品一线二线三线无人区| 日本 欧美 国产| 中文字幕 欧美 日韩| 欧美a一区二区| 国产女同性恋一区二区| 欧美性xxxxx极品少妇| 毛片网站免费观看| 成人性视频网站| 亚洲成人免费电影| 久久久久久综合| 色视频一区二区| jlzzjizz在线播放观看| 狠狠狠色丁香婷婷综合久久五月| 亚洲日韩欧美一区二区在线| 制服丝袜av成人在线看| 久久成人小视频| 涩视频在线观看| 国产精品18久久久久久久久| 一区二区在线观看免费| 精品动漫一区二区三区在线观看| 久艹在线观看视频| 日韩综合第一页| 风流少妇一区二区| 午夜影院在线观看欧美| 国产女人18毛片水真多成人如厕 | 国产成人亚洲综合色影视| 亚洲综合一区在线| 久久新电视剧免费观看| 欧美日韩高清一区二区| 亚洲精品91在线| 少妇伦子伦精品无吗| 国产在线一区观看| 亚洲午夜在线电影| 欧美激情一区二区三区蜜桃视频| 欧美欧美午夜aⅴ在线观看| 日韩av手机在线免费观看| 国产精品一区二区人妻喷水| 成人免费看片app下载| 蜜臀av国产精品久久久久| 亚洲人吸女人奶水| www国产成人免费观看视频 深夜成人网| 欧美亚洲一区二区三区四区|