1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
| #!/usr/bin/env python3 from huggingface_hub import HfApi import sys import os import shutil import time import glob from datetime import datetime import tempfile
def manage_backups(api, repo_id, max_files=10): """管理备份文件数量,删除旧的备份""" try: files = api.list_repo_files(repo_id=repo_id, repo_type="dataset") backup_files = [f for f in files if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')] backup_files.sort() if len(backup_files) >= max_files: files_to_delete = backup_files[:(len(backup_files) - max_files + 1)] for file_to_delete in files_to_delete: try: api.delete_file(path_in_repo=file_to_delete, repo_id=repo_id, repo_type="dataset") print(f'已删除旧备份: {file_to_delete}') except Exception as e: print(f'删除 {file_to_delete} 时出错: {str(e)}') except Exception as e: print(f'管理备份时出错: {str(e)}')
def upload_backup(file_path, file_name, token, repo_id): """上传备份文件到HuggingFace""" api = HfApi(token=token) try: api.upload_file( path_or_fileobj=file_path, path_in_repo=file_name, repo_id=repo_id, repo_type="dataset" ) print(f"成功上传 {file_name}") manage_backups(api, repo_id) except Exception as e: print(f"文件上传出错: {str(e)}")
def download_latest_backup(token, repo_id, data_dir): """下载最新的备份文件并恢复""" try: # 确保目标目录存在并有正确权限 os.makedirs(data_dir, exist_ok=True) os.chmod(data_dir, 0o755) api = HfApi(token=token) files = api.list_repo_files(repo_id=repo_id, repo_type="dataset") backup_files = [f for f in files if f.startswith('navidrome_backup_') and f.endswith('.tar.gz')] if not backup_files: print("未找到备份文件") return False latest_backup = sorted(backup_files)[-1] # 使用临时目录下载文件 with tempfile.TemporaryDirectory() as temp_dir: filepath = api.hf_hub_download( repo_id=repo_id, filename=latest_backup, repo_type="dataset", local_dir=temp_dir ) if filepath and os.path.exists(filepath): # 解压备份文件 import tarfile with tarfile.open(filepath, "r:gz") as tar: tar.extractall(path=data_dir) print(f"成功从 {latest_backup} 恢复备份到 {data_dir}") return True return False except Exception as e: print(f"下载备份时出错: {str(e)}") return False
def create_backup(data_dir, exclude_dirs=None): """创建备份文件,排除指定目录""" if exclude_dirs is None: exclude_dirs = [] timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") backup_filename = f"navidrome_backup_{timestamp}.tar.gz" backup_path = f"/tmp/{backup_filename}" try: import tarfile with tarfile.open(backup_path, "w:gz") as tar: for item in os.listdir(data_dir): item_path = os.path.join(data_dir, item) # 排除指定目录 if os.path.isdir(item_path) and item in exclude_dirs: continue tar.add(item_path, arcname=item) print(f"备份文件创建成功: {backup_path}") return backup_path, backup_filename except Exception as e: print(f"创建备份时出错: {str(e)}") return None, None
if __name__ == "__main__": action = sys.argv[1] token = sys.argv[2] repo_id = sys.argv[3] data_dir = sys.argv[4] if action == "upload": # 排除音乐目录和缓存,只备份数据和配置 exclude_dirs = ["cache", "music"] backup_path, backup_filename = create_backup(data_dir, exclude_dirs) if backup_path: upload_backup(backup_path, backup_filename, token, repo_id) # 清理临时文件 os.remove(backup_path) elif action == "download": download_latest_backup(token, repo_id, data_dir)
|