||
- # -*- coding: utf-8 -*-
- """
- 关卡配置管理器
- 负责从Excel读取关卡配置数据,并智能合并到现有JSON文件中
- 只修改或新增策划表中配置的字段,不会完全覆盖JSON文件
- """
- import pandas as pd
- import json
- import os
- from datetime import datetime
- from pathlib import Path
- import re
- # 是否启用大小写不敏感解析(由启动脚本设置 CFG_CASE_INSENSITIVE=1)
- CASE_INSENSITIVE = str(os.environ.get('CFG_CASE_INSENSITIVE', '0')).strip().lower() in ('1', 'true', 'yes', 'y')
- class LevelConfigManager:
- def __init__(self, excel_path=None, levels_dir=None):
- """
- 初始化关卡配置管理器
-
- Args:
- excel_path: Excel文件路径
- levels_dir: 关卡JSON文件目录
- """
- self.excel_path = excel_path
- self.levels_dir = levels_dir or "d:/CocosGame/Pong/assets/data/levels"
- self.backup_dir = os.path.join(os.path.dirname(self.levels_dir), "backups", "levels")
-
- # 确保备份目录存在
- os.makedirs(self.backup_dir, exist_ok=True)
-
- # 配置数据存储
- self.basic_config = None
- self.weapon_config = None
- self.wave_config = None
- self.enemy_config = None
- self.energy_exp_config = None
- self.energy_upgrade_config = None
- # 大小写不敏感的辅助方法
- def _normalize_key(self, s):
- if s is None:
- return ""
- return str(s).strip().lower()
- # 统一关卡ID的规范形式,例如 level12 -> Level12
- def _canonical_level_id(self, s):
- if s is None:
- return ""
- v = str(s).strip()
- m = re.match(r"^\s*level\s*(\d+)\s*$", v, flags=re.IGNORECASE)
- if m:
- return f"Level{m.group(1)}"
- return v
- def _get_sheet(self, all_sheets, candidate_names):
- if not CASE_INSENSITIVE:
- for name in candidate_names:
- if name in all_sheets:
- return all_sheets[name], name
- return None, None
- lower_map = {self._normalize_key(k): k for k in all_sheets.keys()}
- for name in candidate_names:
- key = self._normalize_key(name)
- if key in lower_map:
- resolved = lower_map[key]
- return all_sheets[resolved], resolved
- return None, None
- def _get_row_value(self, row, possible_names, default=None):
- index_map = {self._normalize_key(col): col for col in row.index}
- for name in possible_names:
- norm = self._normalize_key(name)
- if norm in index_map:
- val = row[index_map[norm]]
- return val if pd.notna(val) else default
- return default
- def _to_int(self, v, default=None):
- try:
- if v is None or (isinstance(v, float) and pd.isna(v)):
- return default
- return int(float(str(v).strip()))
- except Exception:
- return default
- def _to_float(self, v, default=None):
- try:
- if v is None or (isinstance(v, float) and pd.isna(v)):
- return default
- return float(str(v).strip())
- except Exception:
- return default
- def read_excel_data(self, excel_path=None):
- """
- 读取Excel文件中的所有工作表数据
-
- Args:
- excel_path: Excel文件路径,如果不提供则使用初始化时的路径
-
- Returns:
- bool: 是否成功读取数据
- """
- if excel_path:
- self.excel_path = excel_path
-
- if not self.excel_path or not os.path.exists(self.excel_path):
- print(f"错误: Excel文件不存在: {self.excel_path}")
- return False
-
- try:
- # 读取所有工作表
- all_sheets = pd.read_excel(self.excel_path, sheet_name=None)
-
- # 解析各个工作表
- self._parse_basic_config(all_sheets)
- self._parse_weapon_config(all_sheets)
- self._parse_wave_config(all_sheets)
- self._parse_enemy_config(all_sheets)
- self._parse_energy_exp_config(all_sheets)
- self._parse_energy_upgrade_config(all_sheets)
-
- print(f"成功读取Excel数据: {self.excel_path}")
- return True
-
- except Exception as e:
- print(f"读取Excel文件失败: {e}")
- return False
- def _parse_basic_config(self, all_sheets):
- """
- 解析关卡基础配置工作表
- """
- sheet_aliases = ['关卡基础配置', '关卡配置', 'Level Config', 'levels', 'level config']
- sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
- if sheet is not None:
- self.basic_config = sheet
- print(f"找到关卡基础配置工作表: {resolved}")
- return
- print("警告: 未找到关卡基础配置工作表")
- def _parse_weapon_config(self, all_sheets):
- """
- 解析关卡武器配置工作表
- """
- sheet_aliases = ['关卡武器配置', '武器配置', 'Level Weapon Config', 'level_weapons', 'level weapons']
- sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
- if sheet is not None:
- self.weapon_config = sheet
- print(f"找到关卡武器配置工作表: {resolved}")
- return
- print("警告: 未找到关卡武器配置工作表")
- def _parse_wave_config(self, all_sheets):
- """
- 解析关卡波次配置工作表
- """
- sheet_aliases = ['关卡波次配置', '波次配置', 'Wave Config', 'waves', 'wave config']
- sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
- if sheet is not None:
- self.wave_config = sheet
- print(f"找到关卡波次配置工作表: {resolved}")
- return
- print("警告: 未找到关卡波次配置工作表")
- def _parse_enemy_config(self, all_sheets):
- """
- 解析敌人详细配置工作表
- """
- sheet_aliases = ['敌人详细配置', '敌人配置', 'Enemy Detail Config', 'enemy_details', 'enemy details']
- sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
- if sheet is not None:
- self.enemy_config = sheet
- print(f"找到敌人详细配置工作表: {resolved}")
- return
- print("警告: 未找到敌人详细配置工作表")
- def _parse_energy_exp_config(self, all_sheets):
- """
- 解析能量条经验值配置工作表
- """
- sheet_aliases = ['能量经验配置', '能量经验', 'Energy Exp Config', 'energy_exp', 'energy exp']
- sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
- if sheet is not None:
- self.energy_exp_config = sheet
- print(f"找到能量条经验值配置工作表: {resolved}")
- return
- print("警告: 未找到能量条经验值配置工作表")
- def _parse_energy_upgrade_config(self, all_sheets):
- """
- 解析能量条升级配置工作表
- """
- sheet_aliases = ['能量最大值升级', '能量升级', 'Energy Upgrade Config', 'energy_upgrade', 'energy upgrade']
- sheet, resolved = self._get_sheet(all_sheets, sheet_aliases)
- if sheet is not None:
- self.energy_upgrade_config = sheet
- print(f"找到能量条升级配置工作表: {resolved}")
- return
- print("警告: 未找到能量条升级配置工作表")
- def get_all_level_ids(self):
- """
- 获取所有关卡ID(支持大小写不敏感与英文别名)
-
- Returns:
- set: 所有关卡ID的集合(规范化形式,如 Level10)
- """
- level_ids = set()
- id_aliases = ['关卡ID', '关卡Id', '关卡id', 'LevelID', 'Level ID', 'level_id', 'level id', 'id']
-
- def add_id(val):
- if val is not None:
- level_ids.add(self._canonical_level_id(val))
-
- # 从基础配置中获取关卡ID
- if self.basic_config is not None:
- for _, row in self.basic_config.iterrows():
- add_id(self._get_row_value(row, id_aliases))
-
- # 从武器配置中获取关卡ID
- if self.weapon_config is not None:
- for _, row in self.weapon_config.iterrows():
- add_id(self._get_row_value(row, id_aliases))
-
- # 从波次配置中获取关卡ID
- if self.wave_config is not None:
- for _, row in self.wave_config.iterrows():
- add_id(self._get_row_value(row, id_aliases))
-
- return level_ids
-
- def merge_configurations(self):
- """
- 合并Excel配置到现有JSON文件
-
- Returns:
- dict: 合并结果统计
- """
- if not any([self.basic_config is not None, self.weapon_config is not None,
- self.wave_config is not None, self.enemy_config is not None]):
- print("错误: 没有可用的配置数据")
- return {"success": False, "message": "没有可用的配置数据"}
-
- level_ids = self.get_all_level_ids()
- if not level_ids:
- print("错误: 没有找到任何关卡ID")
- return {"success": False, "message": "没有找到任何关卡ID"}
-
- results = {
- "success": True,
- "processed_levels": [],
- "created_levels": [],
- "updated_levels": [],
- "errors": []
- }
-
- for level_id in level_ids:
- try:
- result = self._update_level_config(level_id)
- results["processed_levels"].append(level_id)
-
- if result["created"]:
- results["created_levels"].append(level_id)
- else:
- results["updated_levels"].append(level_id)
-
- except Exception as e:
- error_msg = f"处理关卡 {level_id} 时出错: {str(e)}"
- print(error_msg)
- results["errors"].append(error_msg)
-
- return results
-
- def _update_level_config(self, level_id):
- """
- 更新单个关卡的配置
- """
- canonical_id = self._canonical_level_id(level_id)
- cmp_id_lower = canonical_id.strip().lower()
-
- # 选择规范化的文件路径,并修正历史上小写文件名
- json_path = os.path.join(self.levels_dir, f"{canonical_id}.json")
- lower_path = os.path.join(self.levels_dir, f"{canonical_id.lower()}.json")
- if not os.path.exists(json_path) and os.path.exists(lower_path):
- try:
- os.replace(lower_path, json_path)
- print(f"已规范化文件名: {lower_path} -> {json_path}")
- except Exception as e:
- print(f"文件名规范化失败: {e}")
-
- # 读取现有配置或创建新配置
- if os.path.exists(json_path):
- with open(json_path, 'r', encoding='utf-8') as f:
- existing_config = json.load(f)
- created = False
- else:
- existing_config = {
- "levelId": canonical_id,
- "name": "",
- "scene": "grassland",
- "description": "",
- "backgroundImage": "images/LevelBackground/BG1",
- "availableWeapons": [],
- "coinReward": 100,
- "diamondReward": 0,
- "initialCoins": 50,
- "timeLimit": 300,
- "difficulty": "normal",
- "healthMultiplier": 1.0,
- "levelSettings": {
- "energyMax": 5
- },
- "waves": []
- }
- created = True
-
- # 保证levelId写入规范化形式
- existing_config['levelId'] = canonical_id
-
- # 备份现有文件
- if not created:
- self._backup_level_file(json_path, canonical_id)
-
- # 公共列别名
- id_aliases = ['关卡ID', '关卡Id', '关卡id', 'LevelID', 'Level ID', 'level_id', 'level id', 'id']
-
- # 更新基础配置
- if self.basic_config is not None:
- basic_row = None
- for _, row in self.basic_config.iterrows():
- rid = self._get_row_value(row, id_aliases)
- if rid is not None and str(rid).strip().lower() == cmp_id_lower:
- basic_row = row
- break
- if basic_row is not None:
- name = self._get_row_value(basic_row, ['关卡名称','关卡名','Level Name','name'])
- scene = self._get_row_value(basic_row, ['场景','Scene','scene'])
- desc = self._get_row_value(basic_row, ['描述','Description','desc'])
- bg = self._get_row_value(basic_row, ['关卡背景图路径','背景图','背景','Background Image','backgroundImage'])
- coins = self._get_row_value(basic_row, ['钞票奖励','Coin Reward','coins','coinReward'])
- diamonds = self._get_row_value(basic_row, ['钻石奖励','Diamond Reward','diamonds','diamondReward'])
- health_mul = self._get_row_value(basic_row, ['生命倍数','Health Multiplier','healthMultiplier'])
- difficulty = self._get_row_value(basic_row, ['难度','Difficulty','difficulty'])
- initial = self._get_row_value(basic_row, ['初始金币','Initial Coins','initialCoins'])
- energy_max = self._get_row_value(basic_row, ['能量最大值','Energy Max','energyMax'])
-
- if name is not None:
- existing_config['name'] = str(name)
- if scene is not None:
- existing_config['scene'] = str(scene)
- if desc is not None:
- existing_config['description'] = str(desc)
- if bg is not None:
- existing_config['backgroundImage'] = str(bg)
- if coins is not None:
- existing_config['coinReward'] = self._to_int(coins, existing_config.get('coinReward', 100))
- if diamonds is not None:
- existing_config['diamondReward'] = self._to_int(diamonds, existing_config.get('diamondReward', 0))
- if health_mul is not None:
- existing_config['healthMultiplier'] = self._to_float(health_mul, existing_config.get('healthMultiplier', 1.0))
- if difficulty is not None:
- existing_config['difficulty'] = str(difficulty)
- if initial is not None:
- existing_config['initialCoins'] = self._to_int(initial, existing_config.get('initialCoins', 50))
- if energy_max is not None:
- existing_config.setdefault('levelSettings', {})['energyMax'] = self._to_int(energy_max, existing_config.get('levelSettings', {}).get('energyMax', 5))
-
- if 'levelSettings' not in existing_config:
- existing_config['levelSettings'] = {}
-
- # 能量条升级配置
- if self.energy_upgrade_config is not None:
- energy_upgrade_row = None
- for _, row in self.energy_upgrade_config.iterrows():
- rid = self._get_row_value(row, id_aliases)
- if rid is not None and str(rid).strip().lower() == cmp_id_lower:
- energy_upgrade_row = row
- break
- if energy_upgrade_row is not None:
- if 'levelSettings' not in existing_config:
- existing_config['levelSettings'] = {}
- energy_max_values = []
- for i in range(1, 21):
- cn = f'第{i}次升级后最大值'
- en1 = f'Energy Max After Upgrade {i}'
- en2 = f'EnergyMaxUpgrade{i}'
- en3 = f'EnergyMax{i}'
- val = self._get_row_value(energy_upgrade_row, [cn, en1, en2, en3])
- if val is not None:
- energy_max_values.append(self._to_int(val, None))
- else:
- base_max = 6
- energy_max_values.append(base_max + (i - 1))
- existing_config['levelSettings']['energyMaxUpgrades'] = energy_max_values
-
- # 更新武器配置
- if self.weapon_config is not None:
- available_weapons = []
- for _, weapon_row in self.weapon_config.iterrows():
- rid = self._get_row_value(weapon_row, id_aliases)
- if rid is None or str(rid).strip().lower() != cmp_id_lower:
- continue
- weapons_str = self._get_row_value(weapon_row, ['可用武器','Available Weapons','weapons'])
- if weapons_str:
- weapons = re.split(r'[、,,;;]', str(weapons_str))
- available_weapons.extend([w.strip() for w in weapons if w.strip()])
- if available_weapons:
- existing_config['availableWeapons'] = available_weapons
-
- # 更新波次和敌人配置
- if self.wave_config is not None and self.enemy_config is not None:
- waves_data = []
- # collect rows for this level
- level_waves = []
- for _, wave_row in self.wave_config.iterrows():
- rid = self._get_row_value(wave_row, id_aliases)
- if rid is not None and str(rid).strip().lower() == cmp_id_lower:
- level_waves.append(wave_row)
- for wave_row in level_waves:
- wave_id_val = self._get_row_value(wave_row, ['波次ID','波次','Wave ID','waveId','wave id'])
- wave_id = self._to_int(wave_id_val, 1)
-
- wave_enemies = []
- wave_enemy_data = []
- for _, enemy_row in self.enemy_config.iterrows():
- rid = self._get_row_value(enemy_row, id_aliases)
- wid = self._get_row_value(enemy_row, ['波次ID','波次','Wave ID','waveId','wave id'])
- if rid is not None and str(rid).strip().lower() == cmp_id_lower and self._to_int(wid, None) == wave_id:
- wave_enemy_data.append(enemy_row)
-
- wave_health_multiplier = 1.0
- for enemy_row in wave_enemy_data:
- enemy_health_multiplier = self._to_float(
- self._get_row_value(enemy_row, ['血量系数','Health Multiplier','healthMultiplier']), 1.0
- )
- wave_health_multiplier = max(wave_health_multiplier, enemy_health_multiplier)
-
- enemy_type = self._get_row_value(enemy_row, ['敌人类型','Enemy Type','enemyType'])
- count = self._to_int(self._get_row_value(enemy_row, ['数量','Count','count']), 1)
- spawn_interval = self._to_float(self._get_row_value(enemy_row, ['生成间隔','Spawn Interval','spawnInterval']), 2.0)
- spawn_delay = self._to_float(self._get_row_value(enemy_row, ['生成延迟','Spawn Delay','spawnDelay']), 0.0)
- characteristics = self._get_row_value(enemy_row, ['特征描述','Characteristics','characteristics'])
-
- enemy_data = {
- 'enemyType': str(enemy_type) if enemy_type is not None else 'normal_zombie',
- 'count': count,
- 'spawnInterval': spawn_interval,
- 'spawnDelay': spawn_delay,
- 'characteristics': str(characteristics) if characteristics is not None else '',
- 'healthMultiplier': enemy_health_multiplier
- }
- wave_enemies.append(enemy_data)
-
- wave_data = {
- 'waveId': wave_id,
- 'healthMultiplier': wave_health_multiplier,
- 'enemies': wave_enemies
- }
- waves_data.append(wave_data)
-
- if waves_data:
- waves_data.sort(key=lambda x: x['waveId'])
- existing_config['waves'] = waves_data
-
- # 保存更新后的配置
- with open(json_path, 'w', encoding='utf-8') as f:
- json.dump(existing_config, f, ensure_ascii=False, indent=2)
-
- print(f"{'创建' if created else '更新'}关卡配置: {canonical_id}")
- return {"created": created, "updated": not created}
-
- def _backup_level_file(self, json_path, level_id):
- """
- 备份关卡JSON文件
-
- Args:
- json_path: JSON文件路径
- level_id: 关卡ID
- """
- try:
- timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
- backup_filename = f"{level_id}_{timestamp}.json"
- backup_path = os.path.join(self.backup_dir, backup_filename)
-
- with open(json_path, 'r', encoding='utf-8') as src:
- with open(backup_path, 'w', encoding='utf-8') as dst:
- dst.write(src.read())
-
- print(f"备份关卡文件: {backup_path}")
-
- except Exception as e:
- print(f"备份关卡文件失败: {e}")
-
- def import_from_excel(self, excel_path=None, levels_dir=None):
- """
- 从Excel导入关卡配置的完整流程
-
- Args:
- excel_path: Excel文件路径
- levels_dir: 关卡JSON文件目录
-
- Returns:
- dict: 导入结果
- """
- if excel_path:
- self.excel_path = excel_path
- if levels_dir:
- self.levels_dir = levels_dir
-
- print("开始导入关卡配置...")
-
- # 读取Excel数据
- if not self.read_excel_data():
- return {"success": False, "message": "读取Excel数据失败"}
-
- # 合并配置
- results = self.merge_configurations()
-
- if results["success"]:
- print(f"关卡配置导入完成:")
- print(f" 处理关卡数: {len(results['processed_levels'])}")
- print(f" 新建关卡数: {len(results['created_levels'])}")
- print(f" 更新关卡数: {len(results['updated_levels'])}")
- if results["errors"]:
- print(f" 错误数: {len(results['errors'])}")
- for error in results["errors"]:
- print(f" {error}")
-
- return results
- # 使用示例
- if __name__ == "__main__":
- # 创建关卡配置管理器
- manager = LevelConfigManager(
- excel_path="d:/CocosGame/Pong/assets/data/excel/关卡配置/关卡配置表.xlsx",
- levels_dir="d:/CocosGame/Pong/assets/data/levels"
- )
-
- # 导入配置
- result = manager.import_from_excel()
-
- if result["success"]:
- print("关卡配置导入成功!")
- else:
- print(f"关卡配置导入失败: {result['message']}")
|