Browse Source

Update

master
Harish Karumuthil 2 months ago
parent
commit
42fa5a0426
  1. 47
      home/.local/Apps/daily-utils/bin/mysql-backup.py

47
home/.local/Apps/daily-utils/bin/mysql-backup.py

@ -7,45 +7,63 @@ import gzip
import shutil
import argparse
import sys
import fnmatch
def get_args():
parser = argparse.ArgumentParser(description="High-performance MySQL multi-database backup tool.")
parser.add_argument("-H", "--host", default="localhost", help="MySQL Host (default: localhost)")
parser.add_argument("-P", "--port", default="3306", help="MySQL Port (default: 3306)")
parser.add_argument("-H", "--host", default="localhost", help="MySQL Host")
parser.add_argument("-P", "--port", default="3306", help="MySQL Port")
parser.add_argument("-u", "--user", required=True, help="MySQL Username")
parser.add_argument("-p", "--password", required=True, help="MySQL Password")
parser.add_argument("-o", "--output", default="backups", help="Output directory (default: backups)")
parser.add_argument("-o", "--output", default="backups", help="Output directory")
# New argument for exclusion patterns
parser.add_argument("-x", "--exclude", action="append", default=[],
help="Pattern to exclude (e.g., 'test_*' or 'tmp_'). Can be used multiple times.")
return parser.parse_args()
def should_exclude(db_name, patterns):
"""Checks if a database name matches any provided shell-style patterns."""
system_dbs = ('information_schema', 'performance_schema', 'sys', 'mysql', 'Database')
if db_name in system_dbs:
return True
for pattern in patterns:
if fnmatch.fnmatch(db_name, pattern):
return True
return False
def run_backup():
args = get_args()
# 1. Setup Environment
if not os.path.exists(args.output):
os.makedirs(args.output)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
exclude_dbs = ('information_schema', 'performance_schema', 'sys', 'mysql', 'Database')
# Common connection flags
conn_flags = ['-h', args.host, '-P', args.port, '-u', args.user, f'-p{args.password}']
try:
# 2. Fetch Database List
# 1. Fetch Database List
print(f"Connecting to {args.host}...")
proc = subprocess.run(
['mysql'] + conn_flags + ['-e', 'SHOW DATABASES;', '-N', '-s'],
capture_output=True, text=True, check=True
)
databases = [db for db in proc.stdout.split() if db not in exclude_dbs]
raw_databases = proc.stdout.split()
# 2. Filter databases based on patterns
databases = [db for db in raw_databases if not should_exclude(db, args.exclude)]
if not databases:
print("No databases found matching the criteria.")
return
print(f"Starting backup for {len(databases)} databases...")
# 3. Process Each Database
for db in databases:
print(f"Backing up: {db}...", end=" ", flush=True)
print(f" -> {db}...", end=" ", flush=True)
filename = f"{db}_{timestamp}.sql.gz"
filepath = os.path.join(args.output, filename)
# Fast-load dump flags
dump_cmd = [
'mysqldump'
] + conn_flags + [
@ -57,7 +75,6 @@ def run_backup():
db
]
# Stream from mysqldump to Gzip
with subprocess.Popen(dump_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as dump_proc:
with gzip.open(filepath, 'wb', compresslevel=6) as f_out:
shutil.copyfileobj(dump_proc.stdout, f_out)
@ -66,9 +83,9 @@ def run_backup():
if dump_proc.returncode != 0:
error_msg = dump_proc.stderr.read().decode()
print(f"\nERROR on {db}: {error_msg}")
print(f"\nFAILED: {error_msg}")
else:
print(f"Done -> {filename}")
print(f"OK")
except subprocess.CalledProcessError as e:
print(f"\nConnection Error: {e.stderr}")

Loading…
Cancel
Save