构建可复用的Agent Skill库:模块化设计

模块化Skill库架构图

构建可复用的Agent Skill库是提升AI系统开发效率的关键。一个好的Skill库不仅提供丰富的预置能力,还应该具备良好的扩展性,让开发者能够轻松创建、组合和定制Skill。这需要从模块化设计、接口规范、插件机制和依赖管理等多个维度进行系统化设计。

本文将深入探讨构建高质量Skill库的核心方法论,通过具体的代码示例展示如何实现模块化的Skill架构。

模块化设计原则

模块化是Skill库设计的基石。一个模块化的Skill库应该像积木一样,允许开发者根据需要组合不同的能力。

单一职责与接口隔离

每个Skill应该只负责一个明确的功能领域。当一个Skill试图做太多事情时,它不仅难以维护,还会降低复用性。例如,一个”代码审查”Skill不应该同时处理”代码生成”,即使两者都与代码相关。

接口隔离原则要求Skill的接口应该精简且专注。一个Skill不应该强迫使用者依赖它们不需要的方法或配置。

from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List
from dataclasses import dataclass

@dataclass
class SkillInput:
    """Skill输入标准格式"""
    data: Any
    context: Dict[str, Any] = None
    parameters: Dict[str, Any] = None

@dataclass
class SkillOutput:
    """Skill输出标准格式"""
    result: Any
    metadata: Dict[str, Any] = None
    execution_time: float = 0.0
    confidence: float = 1.0

class Skill(ABC):
    """Skill抽象基类"""
    
    @property
    @abstractmethod
    def name(self) -> str:
        """Skill名称"""
        pass
    
    @property
    @abstractmethod
    def version(self) -> str:
        """Skill版本"""
        pass
    
    @property
    @abstractmethod
    def description(self) -> str:
        """Skill描述"""
        pass
    
    @abstractmethod
    def execute(self, input_data: SkillInput) -> SkillOutput:
        """执行Skill"""
        pass
    
    @abstractmethod
    def validate_input(self, input_data: SkillInput) -> bool:
        """验证输入数据"""
        pass
    
    def get_schema(self) -> Dict[str, Any]:
        """获取Skill的输入输出Schema"""
        return {
            'input': self._get_input_schema(),
            'output': self._get_output_schema(),
            'parameters': self._get_parameter_schema()
        }
    
    def _get_input_schema(self) -> Dict[str, Any]:
        return {'type': 'any'}
    
    def _get_output_schema(self) -> Dict[str, Any]:
        return {'type': 'any'}
    
    def _get_parameter_schema(self) -> Dict[str, Any]:
        return {}

class TextProcessingSkill(Skill):
    """文本处理Skill示例"""
    
    @property
    def name(self) -> str:
        return "text_processor"
    
    @property
    def version(self) -> str:
        return "1.0.0"
    
    @property
    def description(self) -> str:
        return "提供基础的文本处理功能,包括清洗、分词和标准化"
    
    def validate_input(self, input_data: SkillInput) -> bool:
        return isinstance(input_data.data, str)
    
    def execute(self, input_data: SkillInput) -> SkillOutput:
        start_time = time.time()
        
        text = input_data.data
        parameters = input_data.parameters or {}
        
        # 根据参数执行不同的处理
        if parameters.get('clean', True):
            text = self._clean_text(text)
        
        if parameters.get('tokenize', False):
            text = self._tokenize(text)
        
        if parameters.get('normalize', True):
            text = self._normalize(text)
        
        execution_time = time.time() - start_time
        
        return SkillOutput(
            result=text,
            metadata={'operations': list(parameters.keys())},
            execution_time=execution_time
        )
    
    def _clean_text(self, text: str) -> str:
        """清洗文本"""
        import re
        # 去除多余空格
        text = re.sub(r'\s+', ' ', text)
        # 去除特殊字符
        text = re.sub(r'[^\w\s\u4e00-\u9fff]', '', text)
        return text.strip()
    
    def _tokenize(self, text: str) -> List[str]:
        """分词"""
        return text.split()
    
    def _normalize(self, text: str) -> str:
        """标准化"""
        return text.lower()

这个设计展示了模块化的核心原则:通过抽象基类定义标准接口,每个具体的Skill只实现自己负责的功能。SkillInputSkillOutput提供了统一的数据交换格式,使得不同的Skill可以无缝协作。

插件系统与动态加载

一个优秀的Skill库应该支持插件化扩展。开发者应该能够在不修改核心代码的情况下,添加新的Skill或替换现有的Skill。

插件架构设计

import importlib
import pkgutil
from typing import Type, Callable
import inspect

class PluginManager:
    """插件管理器"""
    
    def __init__(self):
        self._plugins: Dict[str, Type[Skill]] = {}
        self._hooks: Dict[str, List[Callable]] = {}
        self._metadata: Dict[str, Dict] = {}
    
    def register(self, skill_class: Type[Skill], metadata: Dict = None):
        """注册Skill插件"""
        instance = skill_class()
        name = instance.name
        
        if name in self._plugins:
            raise ValueError(f"Skill '{name}' is already registered")
        
        self._plugins[name] = skill_class
        self._metadata[name] = metadata or {}
        
        # 触发注册钩子
        self._trigger_hook('on_register', name, skill_class)
    
    def unregister(self, name: str):
        """注销Skill插件"""
        if name not in self._plugins:
            raise ValueError(f"Skill '{name}' is not registered")
        
        del self._plugins[name]
        del self._metadata[name]
        
        # 触发注销钩子
        self._trigger_hook('on_unregister', name)
    
    def load_from_module(self, module_path: str):
        """从模块加载插件"""
        try:
            module = importlib.import_module(module_path)
            
            # 查找模块中所有Skill子类
            for name, obj in inspect.getmembers(module):
                if (inspect.isclass(obj) and 
                    issubclass(obj, Skill) and 
                    obj is not Skill and
                    not inspect.isabstract(obj)):
                    self.register(obj)
                    
        except ImportError as e:
            raise ImportError(f"Failed to load module {module_path}: {e}")
    
    def load_from_directory(self, directory: str, package_prefix: str = ""):
        """从目录加载所有插件"""
        import os
        import sys
        
        if directory not in sys.path:
            sys.path.insert(0, directory)
        
        for _, name, ispkg in pkgutil.iter_modules([directory]):
            if not ispkg:
                module_name = f"{package_prefix}.{name}" if package_prefix else name
                try:
                    self.load_from_module(module_name)
                except Exception as e:
                    print(f"Failed to load plugin from {module_name}: {e}")
    
    def get(self, name: str) -> Type[Skill]:
        """获取Skill类"""
        if name not in self._plugins:
            raise KeyError(f"Skill '{name}' not found")
        return self._plugins[name]
    
    def create_instance(self, name: str, config: Dict = None) -> Skill:
        """创建Skill实例"""
        skill_class = self.get(name)
        instance = skill_class()
        
        # 如果Skill支持配置注入
        if config and hasattr(instance, 'configure'):
            instance.configure(config)
        
        return instance
    
    def list_skills(self) -> List[Dict]:
        """列出所有已注册的Skill"""
        return [
            {
                'name': name,
                'version': self._plugins[name]().version,
                'description': self._plugins[name]().description,
                'metadata': self._metadata.get(name, {})
            }
            for name in self._plugins.keys()
        ]
    
    def add_hook(self, event: str, callback: Callable):
        """添加事件钩子"""
        if event not in self._hooks:
            self._hooks[event] = []
        self._hooks[event].append(callback)
    
    def _trigger_hook(self, event: str, *args, **kwargs):
        """触发事件钩子"""
        for callback in self._hooks.get(event, []):
            try:
                callback(*args, **kwargs)
            except Exception as e:
                print(f"Hook error: {e}")

# 使用示例
plugin_manager = PluginManager()

# 注册内置Skill
plugin_manager.register(TextProcessingSkill)

# 从模块加载第三方Skill
plugin_manager.load_from_module("custom_skills.analysis")

# 列出所有可用Skill
available_skills = plugin_manager.list_skills()
print(f"Available skills: {available_skills}")

# 创建Skill实例并执行
skill = plugin_manager.create_instance("text_processor")
result = skill.execute(SkillInput(data="Hello World!"))

这个插件系统实现了完整的动态加载机制。它支持从模块和目录加载插件,提供了注册、注销、查询和实例化等完整功能。钩子机制允许在插件生命周期中插入自定义逻辑。

注册发现机制

在分布式或多团队的场景中,Skill的注册和发现变得尤为重要。一个中心化的注册表可以帮助团队共享和发现可用的Skill。

Skill注册中心

from datetime import datetime, timedelta
import json

class SkillRegistry:
    """Skill注册中心"""
    
    def __init__(self, storage_backend=None):
        self._skills: Dict[str, Dict] = {}
        self._dependencies: Dict[str, List[str]] = {}
        self.storage = storage_backend
        self._load_from_storage()
    
    def register(self, skill_info: Dict):
        """注册Skill"""
        name = skill_info['name']
        version = skill_info['version']
        
        skill_record = {
            'name': name,
            'version': version,
            'description': skill_info.get('description', ''),
            'author': skill_info.get('author', ''),
            'tags': skill_info.get('tags', []),
            'schema': skill_info.get('schema', {}),
            'dependencies': skill_info.get('dependencies', []),
            'entry_point': skill_info.get('entry_point', ''),
            'registered_at': datetime.now().isoformat(),
            'last_updated': datetime.now().isoformat(),
            'usage_count': 0,
            'rating': 0.0,
            'status': 'active'
        }
        
        # 检查版本兼容性
        if name in self._skills:
            existing = self._skills[name]
            if not self._is_version_compatible(existing['version'], version):
                raise ValueError(
                    f"Version {version} is not compatible with existing {existing['version']}"
                )
        
        self._skills[name] = skill_record
        self._dependencies[name] = skill_info.get('dependencies', [])
        
        self._save_to_storage()
        
        return skill_record
    
    def discover(self, tags: List[str] = None, 
                min_rating: float = None,
                author: str = None) -> List[Dict]:
        """发现Skill"""
        results = []
        
        for name, info in self._skills.items():
            if info['status'] != 'active':
                continue
            
            # 标签过滤
            if tags and not any(tag in info['tags'] for tag in tags):
                continue
            
            # 评分过滤
            if min_rating is not None and info['rating'] < min_rating:
                continue
            
            # 作者过滤
            if author and info['author'] != author:
                continue
            
            results.append(info)
        
        # 按评分和使用次数排序
        results.sort(key=lambda x: (x['rating'], x['usage_count']), reverse=True)
        
        return results
    
    def get_skill_info(self, name: str, version: str = None) -> Dict:
        """获取Skill详细信息"""
        if name not in self._skills:
            raise KeyError(f"Skill '{name}' not found")
        
        info = self._skills[name]
        
        # 如果指定了版本,检查是否匹配
        if version and info['version'] != version:
            raise ValueError(f"Version mismatch: expected {version}, found {info['version']}")
        
        return info
    
    def resolve_dependencies(self, skill_name: str) -> List[str]:
        """解析Skill的依赖链"""
        resolved = []
        visited = set()
        
        def visit(name):
            if name in visited:
                return
            visited.add(name)
            
            for dep in self._dependencies.get(name, []):
                visit(dep)
            
            resolved.append(name)
        
        visit(skill_name)
        return resolved
    
    def update_usage_stats(self, name: str, success: bool, execution_time: float):
        """更新使用统计"""
        if name in self._skills:
            self._skills[name]['usage_count'] += 1
            
            # 更新平均执行时间
            current_avg = self._skills[name].get('avg_execution_time', 0)
            count = self._skills[name]['usage_count']
            self._skills[name]['avg_execution_time'] = (
                (current_avg * (count - 1) + execution_time) / count
            )
            
            self._save_to_storage()
    
    def _is_version_compatible(self, old_version: str, new_version: str) -> bool:
        """检查版本兼容性"""
        old_parts = old_version.split('.')
        new_parts = new_version.split('.')
        
        # 主版本号相同则兼容
        return old_parts[0] == new_parts[0]
    
    def _load_from_storage(self):
        """从存储加载数据"""
        if self.storage:
            data = self.storage.load()
            self._skills = data.get('skills', {})
            self._dependencies = data.get('dependencies', {})
    
    def _save_to_storage(self):
        """保存到存储"""
        if self.storage:
            self.storage.save({
                'skills': self._skills,
                'dependencies': self._dependencies,
                'last_updated': datetime.now().isoformat()
            })

class FileStorage:
    """文件存储后端"""
    
    def __init__(self, filepath: str):
        self.filepath = filepath
    
    def load(self) -> Dict:
        try:
            with open(self.filepath, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {}
    
    def save(self, data: Dict):
        with open(self.filepath, 'w') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

注册中心提供了完整的Skill生命周期管理。它不仅存储Skill的基本信息,还跟踪依赖关系、使用统计和评分。这使得团队可以基于实际使用情况来发现和选择Skill。

依赖注入与配置管理

依赖注入是解耦Skill实现与外部依赖的有效手段。通过依赖注入,Skill可以在不修改代码的情况下适应不同的运行环境。

依赖注入容器

class Container:
    """依赖注入容器"""
    
    def __init__(self):
        self._registrations: Dict[str, Any] = {}
        self._singletons: Dict[str, Any] = {}
        self._factories: Dict[str, Callable] = {}
    
    def register_instance(self, interface: str, instance: Any):
        """注册单例实例"""
        self._singletons[interface] = instance
    
    def register_factory(self, interface: str, factory: Callable, 
                        scope: str = 'transient'):
        """注册工厂方法"""
        self._factories[interface] = {
            'factory': factory,
            'scope': scope
        }
    
    def register_class(self, interface: str, cls: Type, 
                      scope: str = 'transient'):
        """注册类"""
        def factory():
            # 自动解析构造函数参数
            sig = inspect.signature(cls.__init__)
            params = {}
            for name, param in sig.parameters.items():
                if name == 'self':
                    continue
                if param.default == inspect.Parameter.empty:
                    # 需要解析依赖
                    params[name] = self.resolve(name)
                else:
                    params[name] = param.default
            
            return cls(**params)
        
        self._factories[interface] = {
            'factory': factory,
            'scope': scope
        }
    
    def resolve(self, interface: str) -> Any:
        """解析依赖"""
        # 先检查单例
        if interface in self._singletons:
            return self._singletons[interface]
        
        # 再检查工厂
        if interface in self._factories:
            factory_info = self._factories[interface]
            
            if factory_info['scope'] == 'singleton':
                if interface not in self._singletons:
                    self._singletons[interface] = factory_info['factory']()
                return self._singletons[interface]
            else:
                return factory_info['factory']()
        
        raise KeyError(f"No registration found for '{interface}'")
    
    def build_provider(self) -> 'ServiceProvider':
        """构建服务提供者"""
        return ServiceProvider(self)

class ServiceProvider:
    """服务提供者"""
    
    def __init__(self, container: Container):
        self._container = container
    
    def get_service(self, interface: str) -> Any:
        return self._container.resolve(interface)

# Skill配置注入示例
class ConfigurableSkill(Skill):
    """支持配置注入的Skill基类"""
    
    def __init__(self, config: Dict = None):
        self.config = config or {}
        self._validate_config()
    
    def configure(self, config: Dict):
        """配置Skill"""
        self.config.update(config)
        self._validate_config()
    
    def _validate_config(self):
        """验证配置"""
        schema = self.get_schema().get('parameters', {})
        
        for key, value in self.config.items():
            if key in schema:
                param_schema = schema[key]
                if 'type' in param_schema:
                    expected_type = param_schema['type']
                    if expected_type == 'string' and not isinstance(value, str):
                        raise ValueError(f"Config '{key}' should be string")
                    elif expected_type == 'number' and not isinstance(value, (int, float)):
                        raise ValueError(f"Config '{key}' should be number")
                    elif expected_type == 'boolean' and not isinstance(value, bool):
                        raise ValueError(f"Config '{key}' should be boolean")

# 使用依赖注入
container = Container()

# 注册服务
container.register_instance('llm_client', OpenAIClient())
container.register_instance('vector_store', ChromaDBStore())

# 注册Skill
container.register_class('text_analyzer', TextAnalysisSkill, scope='singleton')

# 构建服务提供者
provider = container.build_provider()

# 解析Skill并执行
skill = provider.get_service('text_analyzer')
result = skill.execute(SkillInput(data="Analyze this text"))

依赖注入容器实现了完整的依赖解析机制。它支持单例和瞬态两种生命周期,能够自动解析构造函数参数。这使得Skill可以声明自己需要的依赖,而不需要关心这些依赖如何创建。

组合模式与Skill编排

复杂的任务往往需要多个Skill协作完成。组合模式允许将多个Skill组合成一个新的Skill,实现更高层次的抽象。

Skill组合实现

class CompositeSkill(Skill):
    """组合Skill:将多个Skill组合成一个工作流"""
    
    def __init__(self, name: str, description: str = ""):
        self._name = name
        self._description = description
        self._skills: List[Dict] = []  # 包含Skill和转换函数
        self._execution_mode = 'sequential'  # sequential, parallel, conditional
    
    @property
    def name(self) -> str:
        return self._name
    
    @property
    def version(self) -> str:
        return "1.0.0"
    
    @property
    def description(self) -> str:
        return self._description
    
    def add_skill(self, skill: Skill, 
                 input_transform: Callable = None,
                 output_transform: Callable = None,
                 condition: Callable = None):
        """添加子Skill"""
        self._skills.append({
            'skill': skill,
            'input_transform': input_transform,
            'output_transform': output_transform,
            'condition': condition
        })
    
    def execute(self, input_data: SkillInput) -> SkillOutput:
        """执行组合Skill"""
        if self._execution_mode == 'sequential':
            return self._execute_sequential(input_data)
        elif self._execution_mode == 'parallel':
            return self._execute_parallel(input_data)
        elif self._execution_mode == 'conditional':
            return self._execute_conditional(input_data)
        else:
            raise ValueError(f"Unknown execution mode: {self._execution_mode}")
    
    def _execute_sequential(self, input_data: SkillInput) -> SkillOutput:
        """顺序执行"""
        current_data = input_data
        total_time = 0.0
        all_results = []
        
        for skill_config in self._skills:
            skill = skill_config['skill']
            
            # 条件判断
            if skill_config['condition'] and not skill_config['condition'](current_data):
                continue
            
            # 输入转换
            if skill_config['input_transform']:
                current_data = skill_config['input_transform'](current_data)
            
            # 执行Skill
            result = skill.execute(current_data)
            total_time += result.execution_time
            all_results.append(result)
            
            # 输出转换
            if skill_config['output_transform']:
                result = skill_config['output_transform'](result)
            
            # 更新当前数据为下一次输入
            current_data = SkillInput(
                data=result.result,
                context={**current_data.context, 'previous_result': result.result}
            )
        
        # 合并结果
        final_result = self._merge_results(all_results)
        
        return SkillOutput(
            result=final_result,
            metadata={'executed_skills': len(all_results)},
            execution_time=total_time
        )
    
    def _execute_parallel(self, input_data: SkillInput) -> SkillOutput:
        """并行执行"""
        from concurrent.futures import ThreadPoolExecutor, as_completed
        
        results = []
        total_time = 0.0
        
        with ThreadPoolExecutor(max_workers=len(self._skills)) as executor:
            futures = {}
            
            for skill_config in self._skills:
                skill = skill_config['skill']
                
                # 输入转换
                skill_input = input_data
                if skill_config['input_transform']:
                    skill_input = skill_config['input_transform'](input_data)
                
                future = executor.submit(skill.execute, skill_input)
                futures[future] = skill_config
            
            for future in as_completed(futures):
                skill_config = futures[future]
                try:
                    result = future.result()
                    
                    if skill_config['output_transform']:
                        result = skill_config['output_transform'](result)
                    
                    results.append(result)
                    total_time += result.execution_time
                except Exception as e:
                    results.append(SkillOutput(
                        result=None,
                        metadata={'error': str(e)}
                    ))
        
        final_result = self._merge_results(results)
        
        return SkillOutput(
            result=final_result,
            metadata={'executed_skills': len(results)},
            execution_time=total_time
        )
    
    def _execute_conditional(self, input_data: SkillInput) -> SkillOutput:
        """条件执行"""
        for skill_config in self._skills:
            if skill_config['condition'] and skill_config['condition'](input_data):
                return skill_config['skill'].execute(input_data)
        
        # 如果没有条件匹配,执行第一个Skill
        if self._skills:
            return self._skills[0]['skill'].execute(input_data)
        
        raise ValueError("No skill to execute")
    
    def _merge_results(self, results: List[SkillOutput]) -> Any:
        """合并多个结果"""
        # 默认合并策略:返回列表
        return [r.result for r in results]
    
    def validate_input(self, input_data: SkillInput) -> bool:
        """验证输入"""
        # 验证第一个Skill的输入
        if self._skills:
            return self._skills[0]['skill'].validate_input(input_data)
        return True

# 创建组合Skill示例
def create_code_review_skill() -> CompositeSkill:
    """创建代码审查组合Skill"""
    review_skill = CompositeSkill(
        name="code_reviewer",
        description="综合代码审查Skill,包含语法检查、安全扫描和逻辑分析"
    )
    
    # 添加语法检查
    syntax_skill = SyntaxCheckSkill()
    review_skill.add_skill(syntax_skill)
    
    # 添加安全扫描(依赖语法检查结果)
    security_skill = SecurityScanSkill()
    review_skill.add_skill(
        security_skill,
        input_transform=lambda x: SkillInput(
            data=x.context.get('previous_result'),
            context=x.context
        )
    )
    
    # 添加逻辑分析(并行执行)
    logic_skill = LogicAnalysisSkill()
    review_skill.add_skill(logic_skill)
    
    review_skill._execution_mode = 'sequential'
    
    return review_skill

组合Skill实现了强大的工作流编排能力。它支持顺序执行、并行执行和条件执行三种模式,可以灵活地组合多个Skill完成复杂任务。

实际案例分析:数据分析Skill库

让我们通过一个数据分析Skill库的案例来展示模块化设计的实际应用。

库结构

data_analysis_skills/
├── __init__.py
├── base.py              # 基础类和接口
├── data_loading/        # 数据加载Skill
│   ├── __init__.py
│   ├── csv_loader.py
│   ├── json_loader.py
│   └── sql_loader.py
├── preprocessing/       # 预处理Skill
│   ├── __init__.py
│   ├── cleaner.py
│   ├── normalizer.py
│   └── encoder.py
├── analysis/           # 分析Skill
│   ├── __init__.py
│   ├── statistics.py
│   ├── correlation.py
│   └── clustering.py
├── visualization/      # 可视化Skill
│   ├── __init__.py
│   ├── chart.py
│   └── report.py
└── pipeline.py         # 流水线组合

使用示例

from data_analysis_skills import SkillLibrary

# 创建Skill库实例
library = SkillLibrary()

# 加载数据
data = library.load_data('sales.csv', format='csv')

# 创建分析流水线
pipeline = library.create_pipeline([
    {'skill': 'data_cleaner', 'config': {'remove_nulls': True}},
    {'skill': 'normalizer', 'config': {'method': 'z-score'}},
    {'skill': 'correlation_analyzer'},
    {'skill': 'chart_generator', 'config': {'type': 'heatmap'}}
])

# 执行流水线
result = pipeline.execute(data)

# 生成报告
report = library.generate_report(result, format='html')

这个案例展示了模块化设计的优势:每个功能领域都有独立的模块,用户可以根据需要组合使用。通过流水线机制,复杂的分析任务可以被分解为简单的步骤。

总结与最佳实践

构建可复用的Agent Skill库是一项系统工程。以下是关键的最佳实践:

定义清晰的接口契约。 Skill的输入输出格式应该标准化,使得不同的Skill可以无缝协作。接口应该稳定且向后兼容。

实现插件化架构。 通过插件系统,允许第三方开发者扩展库的功能。插件应该能够动态加载和卸载,不影响核心系统的稳定性。

建立注册发现机制。 中心化的注册表帮助用户发现和选择Skill。注册信息应该包含足够的使用统计和评分,帮助用户做出选择。

使用依赖注入。 通过依赖注入解耦Skill与外部依赖,提高可测试性和可配置性。Skill应该声明自己需要的依赖,而不是直接创建它们。

支持组合模式。 复杂的任务应该通过组合简单的Skill来完成。组合Skill应该支持顺序、并行和条件执行等多种模式。

保持向后兼容。 Skill库的升级不应该破坏现有用户的使用。Breaking Change应该提前通知,并提供迁移工具。

提供完整文档。 每个Skill都应该有清晰的使用文档,包括功能说明、参数说明、返回值说明和示例代码。

建立测试体系。 Skill库应该有完善的测试覆盖,包括单元测试、集成测试和端到端测试。测试用例应该覆盖常见的使用场景和边界情况。

一个好的Skill库应该像乐高积木一样:每个积木都是独立的,但组合起来可以创造无限可能。模块化的设计让Skill库能够持续演进,不断积累新的能力,同时保持核心架构的稳定。