# -*- coding: utf-8 -*-
"""
Level configuration manager.

Reads level configuration from an Excel workbook and merges it into the
existing per-level JSON files. Only the fields configured in the design
spreadsheet are modified or added; existing JSON files are never replaced
wholesale.
"""

import json
import os
import re
from datetime import datetime
from pathlib import Path

import pandas as pd

# Whether sheet names are resolved case-insensitively (the launcher script
# sets CFG_CASE_INSENSITIVE=1 to enable it).
CASE_INSENSITIVE = str(os.environ.get('CFG_CASE_INSENSITIVE', '0')).strip().lower() in ('1', 'true', 'yes', 'y')

# Matches "level12", "Level 12", " LEVEL12 " ... and captures the number.
_LEVEL_ID_RE = re.compile(r"^\s*level\s*(\d+)\s*$", flags=re.IGNORECASE)

# Separators accepted between weapon names: ideographic comma, ASCII comma
# and semicolon, plus fullwidth comma (U+FF0C) and fullwidth semicolon
# (U+FF1B), which are what a Chinese input method normally produces.
_WEAPON_SEPARATORS = re.compile("[、,;\uff0c\uff1b]")

# Column-name aliases shared by several sheets.
_ID_ALIASES = ['关卡ID', '关卡Id', '关卡id', 'LevelID', 'Level ID', 'level_id', 'level id', 'id']
_WAVE_ID_ALIASES = ['波次ID', '波次', 'Wave ID', 'waveId', 'wave id']


class LevelConfigManager:
    """Import level settings from Excel sheets into per-level JSON files."""

    def __init__(self, excel_path=None, levels_dir=None):
        """
        Initialise the manager.

        Args:
            excel_path: Path of the Excel workbook.
            levels_dir: Directory that holds the level JSON files.
        """
        self.excel_path = excel_path
        self.levels_dir = levels_dir or "d:/CocosGame/Pong/assets/data/levels"
        self.backup_dir = os.path.join(os.path.dirname(self.levels_dir), "backups", "levels")

        # Make sure the backup directory exists.
        os.makedirs(self.backup_dir, exist_ok=True)

        # Parsed sheets (pandas DataFrames), None until found in a workbook.
        self.basic_config = None
        self.weapon_config = None
        self.wave_config = None
        self.enemy_config = None
        self.energy_exp_config = None
        self.energy_upgrade_config = None

    # ------------------------------------------------------------------
    # Small helpers
    # ------------------------------------------------------------------
    def _normalize_key(self, s):
        """Return *s* stripped and lower-cased ("" for None)."""
        if s is None:
            return ""
        return str(s).strip().lower()

    def _canonical_level_id(self, s):
        """Return the canonical form of a level ID, e.g. ``level12`` -> ``Level12``.

        Values that do not look like ``level<number>`` are returned stripped
        but otherwise unchanged; None becomes "".
        """
        if s is None:
            return ""
        v = str(s).strip()
        m = _LEVEL_ID_RE.match(v)
        if m:
            return f"Level{m.group(1)}"
        return v

    def _matches_level(self, raw_id, cmp_id_lower):
        """Return True when a sheet cell *raw_id* refers to the given level.

        The cell is canonicalised first so that the comparison agrees with
        get_all_level_ids (e.g. "level 3" matches "Level3").
        """
        return raw_id is not None and self._canonical_level_id(raw_id).lower() == cmp_id_lower

    def _iter_level_rows(self, sheet, cmp_id_lower):
        """Yield the rows of *sheet* whose ID column refers to the level."""
        for _, row in sheet.iterrows():
            if self._matches_level(self._get_row_value(row, _ID_ALIASES), cmp_id_lower):
                yield row

    def _find_level_row(self, sheet, cmp_id_lower):
        """Return the first row of *sheet* for the level, or None."""
        return next(self._iter_level_rows(sheet, cmp_id_lower), None)

    def _get_sheet(self, all_sheets, candidate_names):
        """Look up a sheet by any of *candidate_names*.

        Returns:
            tuple: (sheet, resolved_name), or (None, None) when not found.
        """
        if not CASE_INSENSITIVE:
            for name in candidate_names:
                if name in all_sheets:
                    return all_sheets[name], name
            return None, None

        lower_map = {self._normalize_key(k): k for k in all_sheets.keys()}
        for name in candidate_names:
            key = self._normalize_key(name)
            if key in lower_map:
                resolved = lower_map[key]
                return all_sheets[resolved], resolved
        return None, None

    def _get_row_value(self, row, possible_names, default=None):
        """Return the value of the first column in *row* matching an alias.

        Column names are compared stripped and lower-cased regardless of
        CASE_INSENSITIVE. A NaN/missing cell yields *default*.
        """
        index_map = {self._normalize_key(col): col for col in row.index}
        for name in possible_names:
            norm = self._normalize_key(name)
            if norm in index_map:
                val = row[index_map[norm]]
                return val if pd.notna(val) else default
        return default

    def _to_int(self, v, default=None):
        """Convert *v* to int (via float, so "3.0" works); *default* on failure."""
        try:
            if v is None or (isinstance(v, float) and pd.isna(v)):
                return default
            return int(float(str(v).strip()))
        except (TypeError, ValueError, OverflowError):
            return default

    def _to_float(self, v, default=None):
        """Convert *v* to float; *default* on failure."""
        try:
            if v is None or (isinstance(v, float) and pd.isna(v)):
                return default
            return float(str(v).strip())
        except (TypeError, ValueError, OverflowError):
            return default

    # ------------------------------------------------------------------
    # Reading the workbook
    # ------------------------------------------------------------------
    def read_excel_data(self, excel_path=None):
        """
        Read every worksheet of the Excel workbook.

        Args:
            excel_path: Workbook path; falls back to the path given at
                construction time.

        Returns:
            bool: True when the workbook was read successfully.
        """
        if excel_path:
            self.excel_path = excel_path

        if not self.excel_path or not os.path.exists(self.excel_path):
            print(f"错误: Excel文件不存在: {self.excel_path}")
            return False

        try:
            # sheet_name=None loads all sheets as {name: DataFrame}.
            all_sheets = pd.read_excel(self.excel_path, sheet_name=None)

            self._parse_basic_config(all_sheets)
            self._parse_weapon_config(all_sheets)
            self._parse_wave_config(all_sheets)
            self._parse_enemy_config(all_sheets)
            self._parse_energy_exp_config(all_sheets)
            self._parse_energy_upgrade_config(all_sheets)

            print(f"成功读取Excel数据: {self.excel_path}")
            return True
        except Exception as e:
            # Top-level boundary: report any read/parse failure to the caller.
            print(f"读取Excel文件失败: {e}")
            return False

    def _load_sheet(self, all_sheets, attr, sheet_aliases, label):
        """Store the first sheet matching *sheet_aliases* in ``self.<attr>``.

        *label* is the sheet's display name used in the log messages. When no
        sheet matches, the attribute is left untouched and a warning printed.
        """
        sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
        if sheet is not None:
            setattr(self, attr, sheet)
            print(f"找到{label}工作表: {resolved}")
            return
        print(f"警告: 未找到{label}工作表")

    def _parse_basic_config(self, all_sheets):
        """Locate the basic level configuration sheet."""
        self._load_sheet(
            all_sheets, 'basic_config',
            ['关卡基础配置', '关卡配置', 'Level Config', 'levels', 'level config'],
            '关卡基础配置',
        )

    def _parse_weapon_config(self, all_sheets):
        """Locate the per-level weapon configuration sheet."""
        self._load_sheet(
            all_sheets, 'weapon_config',
            ['关卡武器配置', '武器配置', 'Level Weapon Config', 'level_weapons', 'level weapons'],
            '关卡武器配置',
        )

    def _parse_wave_config(self, all_sheets):
        """Locate the wave configuration sheet."""
        self._load_sheet(
            all_sheets, 'wave_config',
            ['关卡波次配置', '波次配置', 'Wave Config', 'waves', 'wave config'],
            '关卡波次配置',
        )

    def _parse_enemy_config(self, all_sheets):
        """Locate the detailed enemy configuration sheet."""
        self._load_sheet(
            all_sheets, 'enemy_config',
            ['敌人详细配置', '敌人配置', 'Enemy Detail Config', 'enemy_details', 'enemy details'],
            '敌人详细配置',
        )

    def _parse_energy_exp_config(self, all_sheets):
        """Locate the energy-bar experience configuration sheet."""
        self._load_sheet(
            all_sheets, 'energy_exp_config',
            ['能量经验配置', '能量经验', 'Energy Exp Config', 'energy_exp', 'energy exp'],
            '能量条经验值配置',
        )

    def _parse_energy_upgrade_config(self, all_sheets):
        """Locate the energy-bar upgrade configuration sheet."""
        self._load_sheet(
            all_sheets, 'energy_upgrade_config',
            ['能量最大值升级', '能量升级', 'Energy Upgrade Config', 'energy_upgrade', 'energy upgrade'],
            '能量条升级配置',
        )

    # ------------------------------------------------------------------
    # Merging
    # ------------------------------------------------------------------
    def get_all_level_ids(self):
        """
        Collect every level ID mentioned in the basic, weapon and wave sheets.

        Returns:
            set: Level IDs in canonical form (e.g. ``Level10``). Blank ID
            cells are ignored.
        """
        level_ids = set()
        for sheet in (self.basic_config, self.weapon_config, self.wave_config):
            if sheet is None:
                continue
            for _, row in sheet.iterrows():
                val = self._get_row_value(row, _ID_ALIASES)
                if val is None:
                    continue
                canonical = self._canonical_level_id(val)
                # A whitespace-only cell would otherwise create ".json".
                if canonical:
                    level_ids.add(canonical)
        return level_ids

    def merge_configurations(self):
        """
        Merge the loaded Excel configuration into the level JSON files.

        Returns:
            dict: On failure ``{"success": False, "message": ...}``; otherwise
            a summary with the keys ``success``, ``processed_levels``,
            ``created_levels``, ``updated_levels`` and ``errors``.
        """
        if not any([self.basic_config is not None,
                    self.weapon_config is not None,
                    self.wave_config is not None,
                    self.enemy_config is not None]):
            print("错误: 没有可用的配置数据")
            return {"success": False, "message": "没有可用的配置数据"}

        level_ids = self.get_all_level_ids()
        if not level_ids:
            print("错误: 没有找到任何关卡ID")
            return {"success": False, "message": "没有找到任何关卡ID"}

        results = {
            "success": True,
            "processed_levels": [],
            "created_levels": [],
            "updated_levels": [],
            "errors": []
        }

        # Sorted so that processing order and the result lists are stable
        # from run to run (set order is not).
        for level_id in sorted(level_ids):
            try:
                result = self._update_level_config(level_id)
                results["processed_levels"].append(level_id)
                if result["created"]:
                    results["created_levels"].append(level_id)
                else:
                    results["updated_levels"].append(level_id)
            except Exception as e:
                # One broken level must not abort the whole import.
                error_msg = f"处理关卡 {level_id} 时出错: {str(e)}"
                print(error_msg)
                results["errors"].append(error_msg)

        return results

    def _update_level_config(self, level_id):
        """
        Update (or create) the JSON file of a single level.

        Returns:
            dict: ``{"created": bool, "updated": bool}``.
        """
        canonical_id = self._canonical_level_id(level_id)
        cmp_id_lower = canonical_id.strip().lower()

        # Use the canonical file name and fix legacy lower-case file names.
        json_path = os.path.join(self.levels_dir, f"{canonical_id}.json")
        lower_path = os.path.join(self.levels_dir, f"{canonical_id.lower()}.json")
        if not os.path.exists(json_path) and os.path.exists(lower_path):
            try:
                os.replace(lower_path, json_path)
                print(f"已规范化文件名: {lower_path} -> {json_path}")
            except Exception as e:
                print(f"文件名规范化失败: {e}")

        # Load the existing configuration or start from the default template.
        if os.path.exists(json_path):
            with open(json_path, 'r', encoding='utf-8') as f:
                existing_config = json.load(f)
            created = False
        else:
            existing_config = self._new_level_config(canonical_id)
            created = True

        # Always store the canonical form of the level ID.
        existing_config['levelId'] = canonical_id

        # Back up the file before it is overwritten.
        if not created:
            self._backup_level_file(json_path, canonical_id)

        self._apply_basic_config(existing_config, cmp_id_lower)
        self._apply_energy_upgrades(existing_config, cmp_id_lower)
        self._apply_weapons(existing_config, cmp_id_lower)

        # Waves need both the wave sheet and the enemy sheet.
        if self.wave_config is not None and self.enemy_config is not None:
            waves_data = self._build_waves(cmp_id_lower)
            if waves_data:
                existing_config['waves'] = waves_data

        # A brand-new levels directory may not exist yet.
        os.makedirs(self.levels_dir, exist_ok=True)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(existing_config, f, ensure_ascii=False, indent=2)

        print(f"{'创建' if created else '更新'}关卡配置: {canonical_id}")
        return {"created": created, "updated": not created}

    def _new_level_config(self, canonical_id):
        """Return the default configuration used for a level with no JSON yet."""
        return {
            "levelId": canonical_id,
            "name": "",
            "scene": "grassland",
            "description": "",
            "backgroundImage": "images/LevelBackground/BG1",
            "availableWeapons": [],
            "coinReward": 100,
            "diamondReward": 0,
            "initialCoins": 50,
            "timeLimit": 300,
            "difficulty": "normal",
            "healthMultiplier": 1.0,
            "levelSettings": {
                "energyMax": 5
            },
            "waves": []
        }

    def _apply_basic_config(self, config, cmp_id_lower):
        """Copy the fields present in the basic sheet's row into *config*."""
        if self.basic_config is None:
            return
        basic_row = self._find_level_row(self.basic_config, cmp_id_lower)
        if basic_row is None:
            return

        get = self._get_row_value

        # Plain text fields: (JSON key, column aliases).
        text_fields = [
            ('name', ['关卡名称', '关卡名', 'Level Name', 'name']),
            ('scene', ['场景', 'Scene', 'scene']),
            ('description', ['描述', 'Description', 'desc']),
            ('backgroundImage', ['关卡背景图路径', '背景图', '背景', 'Background Image', 'backgroundImage']),
            ('difficulty', ['难度', 'Difficulty', 'difficulty']),
        ]
        for key, aliases in text_fields:
            val = get(basic_row, aliases)
            if val is not None:
                config[key] = str(val)

        # Numeric fields: (JSON key, aliases, converter, fallback when the
        # JSON has no previous value). An unparsable cell keeps the old value.
        numeric_fields = [
            ('coinReward', ['钞票奖励', 'Coin Reward', 'coins', 'coinReward'], self._to_int, 100),
            ('diamondReward', ['钻石奖励', 'Diamond Reward', 'diamonds', 'diamondReward'], self._to_int, 0),
            ('healthMultiplier', ['生命倍数', 'Health Multiplier', 'healthMultiplier'], self._to_float, 1.0),
            ('initialCoins', ['初始金币', 'Initial Coins', 'initialCoins'], self._to_int, 50),
        ]
        for key, aliases, convert, fallback in numeric_fields:
            val = get(basic_row, aliases)
            if val is not None:
                config[key] = convert(val, config.get(key, fallback))

        energy_max = get(basic_row, ['能量最大值', 'Energy Max', 'energyMax'])
        if energy_max is not None:
            settings = config.setdefault('levelSettings', {})
            settings['energyMax'] = self._to_int(energy_max, settings.get('energyMax', 5))

        if 'levelSettings' not in config:
            config['levelSettings'] = {}

    def _apply_energy_upgrades(self, config, cmp_id_lower):
        """Write the 20-step ``energyMaxUpgrades`` list from the upgrade sheet."""
        if self.energy_upgrade_config is None:
            return
        upgrade_row = self._find_level_row(self.energy_upgrade_config, cmp_id_lower)
        if upgrade_row is None:
            return

        base_max = 6
        energy_max_values = []
        for i in range(1, 21):
            # Default for a missing *or unparsable* cell, so the list never
            # contains None.
            fallback = base_max + (i - 1)
            val = self._get_row_value(upgrade_row, [
                f'第{i}次升级后最大值',
                f'Energy Max After Upgrade {i}',
                f'EnergyMaxUpgrade{i}',
                f'EnergyMax{i}',
            ])
            energy_max_values.append(fallback if val is None else self._to_int(val, fallback))

        config.setdefault('levelSettings', {})['energyMaxUpgrades'] = energy_max_values

    def _apply_weapons(self, config, cmp_id_lower):
        """Replace ``availableWeapons`` with the weapons listed for the level."""
        if self.weapon_config is None:
            return
        available_weapons = []
        for weapon_row in self._iter_level_rows(self.weapon_config, cmp_id_lower):
            weapons_str = self._get_row_value(weapon_row, ['可用武器', 'Available Weapons', 'weapons'])
            if weapons_str:
                parts = _WEAPON_SEPARATORS.split(str(weapons_str))
                available_weapons.extend(w.strip() for w in parts if w.strip())
        # An empty result keeps whatever the JSON already had.
        if available_weapons:
            config['availableWeapons'] = available_weapons

    def _build_waves(self, cmp_id_lower):
        """Build the level's wave list, sorted by wave ID.

        Requires ``self.wave_config`` and ``self.enemy_config`` to be set.
        """
        # Group this level's enemy rows by wave once instead of rescanning
        # the whole enemy sheet for every wave. Rows with an unparsable wave
        # ID land under None and therefore never match a wave.
        enemies_by_wave = {}
        for enemy_row in self._iter_level_rows(self.enemy_config, cmp_id_lower):
            wid = self._to_int(self._get_row_value(enemy_row, _WAVE_ID_ALIASES), None)
            enemies_by_wave.setdefault(wid, []).append(enemy_row)

        get = self._get_row_value
        waves_data = []
        for wave_row in self._iter_level_rows(self.wave_config, cmp_id_lower):
            wave_id = self._to_int(get(wave_row, _WAVE_ID_ALIASES), 1)

            wave_enemies = []
            # The wave multiplier is the largest enemy multiplier, at least 1.0.
            wave_health_multiplier = 1.0
            for enemy_row in enemies_by_wave.get(wave_id, []):
                enemy_health_multiplier = self._to_float(
                    get(enemy_row, ['血量系数', 'Health Multiplier', 'healthMultiplier']),
                    1.0
                )
                wave_health_multiplier = max(wave_health_multiplier, enemy_health_multiplier)

                enemy_type = get(enemy_row, ['敌人类型', 'Enemy Type', 'enemyType'])
                characteristics = get(enemy_row, ['特征描述', 'Characteristics', 'characteristics'])
                wave_enemies.append({
                    'enemyType': str(enemy_type) if enemy_type is not None else 'normal_zombie',
                    'count': self._to_int(get(enemy_row, ['数量', 'Count', 'count']), 1),
                    'spawnInterval': self._to_float(
                        get(enemy_row, ['生成间隔', 'Spawn Interval', 'spawnInterval']), 2.0),
                    'spawnDelay': self._to_float(
                        get(enemy_row, ['生成延迟', 'Spawn Delay', 'spawnDelay']), 0.0),
                    'characteristics': str(characteristics) if characteristics is not None else '',
                    'healthMultiplier': enemy_health_multiplier
                })

            waves_data.append({
                'waveId': wave_id,
                'healthMultiplier': wave_health_multiplier,
                'enemies': wave_enemies
            })

        waves_data.sort(key=lambda x: x['waveId'])
        return waves_data

    def _backup_level_file(self, json_path, level_id):
        """
        Copy a level JSON file into the backup directory (best effort).

        Args:
            json_path: Path of the JSON file to back up.
            level_id: Level ID, used in the backup file name.
        """
        try:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_filename = f"{level_id}_{timestamp}.json"
            backup_path = os.path.join(self.backup_dir, backup_filename)

            with open(json_path, 'r', encoding='utf-8') as src:
                with open(backup_path, 'w', encoding='utf-8') as dst:
                    dst.write(src.read())

            print(f"备份关卡文件: {backup_path}")
        except (OSError, UnicodeError) as e:
            # A failed backup is reported but does not stop the update.
            print(f"备份关卡文件失败: {e}")

    def import_from_excel(self, excel_path=None, levels_dir=None):
        """
        Run the complete import: read the workbook, then merge into JSON.

        Args:
            excel_path: Path of the Excel workbook.
            levels_dir: Directory that holds the level JSON files.
                NOTE(review): ``backup_dir`` is not re-derived when this is
                overridden here; backups keep going to the directory chosen
                at construction time.

        Returns:
            dict: Import result (see merge_configurations).
        """
        if excel_path:
            self.excel_path = excel_path
        if levels_dir:
            self.levels_dir = levels_dir

        print("开始导入关卡配置...")

        if not self.read_excel_data():
            return {"success": False, "message": "读取Excel数据失败"}

        results = self.merge_configurations()

        if results["success"]:
            print("关卡配置导入完成:")
            print(f" 处理关卡数: {len(results['processed_levels'])}")
            print(f" 新建关卡数: {len(results['created_levels'])}")
            print(f" 更新关卡数: {len(results['updated_levels'])}")
            if results["errors"]:
                print(f" 错误数: {len(results['errors'])}")
                for error in results["errors"]:
                    print(f" {error}")

        return results


# Usage example
if __name__ == "__main__":
    manager = LevelConfigManager(
        excel_path="d:/CocosGame/Pong/assets/data/excel/关卡配置/关卡配置表.xlsx",
        levels_dir="d:/CocosGame/Pong/assets/data/levels"
    )

    result = manager.import_from_excel()
    if result["success"]:
        print("关卡配置导入成功!")
    else:
        print(f"关卡配置导入失败: {result['message']}")