重新定義Agent開發:agno如何用3.75KiB內存實現多模態工具調用? 原創
在構建AI智能體(Agent)系統時,工具(Tools)是連接AI模型與外部世界的橋梁。agno作為新一代輕量級多模態Agent框架,以其blazing fast agents with a minimal memory footprint的特性,正在重新定義Agent開發的標準。本文將深入探討agno的工具系統,從架構設計到實戰應用,幫助您全面掌握這個強大的框架。
1. 工具系統概述
1.1 工具的作用和意義
在agno框架中,Tools are functions that helps Agno Agents to interact with the external world。工具讓Agent變得真正"智能化",使其能夠:
- 擴展能力邊界:通過工具,Agent可以搜索網絡、查詢數據庫、調用API、發送郵件等
- 實現實時交互:連接外部系統,獲取最新信息和動態數據
- 執行具體操作:從被動的對話轉變為主動的任務執行者
- 多模態處理:Agno is built from the ground up to work seamlessly with various media types,支持文本、圖像、音頻和視頻的原生處理
1.2 agno工具架構設計
agno的工具系統采用了極簡而高效的架構設計:
# 基礎架構示例
from agno.agent import Agent
from agno.tools import tool, Toolkit
# 工具可以是簡單的函數
@tool
def my_tool(param: str) -> str:
"""工具描述"""
returnf"處理結果: {param}"
# 也可以是復雜的工具包
class MyToolkit(Toolkit):
def __init__(self):
super().__init__(
name="my_toolkit",
tools=[self.tool1, self.tool2]
)性能優勢:
- Agent creation in Agno clocks in at about 2μs per agent, which is ~10,000x faster than LangGraph
- Agno agents use just ~3.75 KiB of memory on average—~50x less than LangGraph agents
1.3 工具調用流程
agno的工具調用流程遵循以下步驟:
- 意圖識別:Agent分析用戶輸入,確定需要調用的工具
- 參數提取:從上下文中提取工具所需的參數
- 并行執行:Agno Agents can execute multiple tools concurrently,支持異步并發執行
- 結果處理:收集工具返回結果,整合到響應中
- 錯誤處理:優雅處理工具執行中的異常
2. 內置工具詳解
agno提供了80+ pre-built toolkits,覆蓋了大多數常見場景。讓我們深入了解幾個核心工具包:
2.1 DuckDuckGoTools:網絡搜索
DuckDuckGo工具包提供隱私優先的網絡搜索能力:
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.duckduckgo import DuckDuckGoTools
agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[DuckDuckGoTools()],
show_tool_calls=True, # 顯示工具調用過程
markdown=True
)
# 使用示例
agent.print_response("最新的AI技術發展趨勢是什么?", stream=True)2.2 YFinanceTools:金融數據
專為金融分析設計的工具包,提供實時股票數據:
from agno.tools.yfinance import YFinanceTools
finance_agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[
YFinanceTools(
stock_price=True, # 股票價格
analyst_recommendations=True, # 分析師建議
company_info=True, # 公司信息
company_news=True # 公司新聞
)
],
instructions=["使用表格展示數據", "突出關鍵指標"],
markdown=True
)
# 獲取NVIDIA的詳細分析報告
finance_agent.print_response("分析NVDA的投資價值", stream=True)2.3 GoogleSearchTools:谷歌搜索
提供更全面的搜索結果,適合深度研究:
from agno.tools.google_search import GoogleSearchTools
research_agent = Agent(
tools=[GoogleSearchTools()],
instructions=["總是包含信息來源", "驗證信息的時效性"]
)2.4 其他常用工具介紹
agno還提供了豐富的工具生態系統:
- GitHub Tools:代碼倉庫搜索和管理
- Gmail Tools:郵件發送和接收
- Slack Tools:團隊協作消息
- SQL Tools:數據庫查詢操作
- File System Tools:文件系統操作
- Newspaper Tools:新聞內容提取
- Pandas Tools:數據分析處理
3. 自定義工具開發
雖然內置工具豐富,但in most cases, you will write your own tools。讓我們深入了解如何開發自定義工具。
3.1 Tool基類結構
使用裝飾器創建簡單工具:
from agno.tools import tool
from agno.utils.log import logger
@tool(
show_result=True, # 顯示工具執行結果
stop_after_tool_call=True# 調用后停止執行
)
def custom_calculator(expression: str) -> str:
"""
計算數學表達式
Args:
expression: 要計算的數學表達式
Returns:
計算結果
"""
try:
result = eval(expression)
logger.info(f"計算完成: {expression} = {result}")
returnf"計算結果:{result}"
except Exception as e:
logger.error(f"計算失敗: {e}")
returnf"錯誤:無法計算 {expression}"3.2 工具開發規范
創建復雜的工具包需要繼承Toolkit類:
from typing import List
from agno.tools import Toolkit
from agno.utils.log import logger
import subprocess
class ShellTools(Toolkit):
"""系統命令行工具包"""
def __init__(self, allowed_commands: List[str] = None, **kwargs):
self.allowed_commands = allowed_commands or ["ls", "pwd", "echo"]
super().__init__(
name="shell_tools",
tools=[self.run_command, self.check_process],
**kwargs
)
def run_command(self, command: str, args: str = "") -> str:
"""
執行系統命令
Args:
command: 要執行的命令
args: 命令參數
Returns:
命令輸出結果
"""
if command notin self.allowed_commands:
returnf"錯誤:命令 {command} 不在允許列表中"
try:
full_command = f"{command} {args}".strip()
logger.info(f"執行命令: {full_command}")
result = subprocess.run(
full_command.split(),
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
returnf"命令執行失敗: {result.stderr}"
return result.stdout
except subprocess.TimeoutExpired:
return"錯誤:命令執行超時"
except Exception as e:
logger.error(f"命令執行異常: {e}")
returnf"錯誤:{str(e)}"
def check_process(self, process_name: str) -> str:
"""檢查進程狀態"""
try:
result = subprocess.run(
["ps", "aux"],
capture_output=True,
text=True
)
processes = [
line for line in result.stdout.split('\n')
if process_name in line
]
if processes:
returnf"找到 {len(processes)} 個匹配的進程:\n" + "\n".join(processes[:5])
else:
returnf"未找到名為 {process_name} 的進程"
except Exception as e:
returnf"檢查進程失敗: {str(e)}"3.3 參數驗證和錯誤處理
良好的參數驗證和錯誤處理是工具可靠性的關鍵:
from typing import Optional
from pydantic import BaseModel, Field, validator
import requests
class APIRequest(BaseModel):
"""API請求參數模型"""
url: str = Field(..., description="API端點URL")
method: str = Field("GET", description="HTTP方法")
headers: Optional[dict] = Field(None, description="請求頭")
timeout: int = Field(30, ge=1, le=300, description="超時時間(秒)")
@validator('method')
def validate_method(cls, v):
allowed_methods = ['GET', 'POST', 'PUT', 'DELETE']
if v.upper() notin allowed_methods:
raise ValueError(f"方法必須是 {allowed_methods} 之一")
return v.upper()
@validator('url')
def validate_url(cls, v):
ifnot v.startswith(('http://', 'https://')):
raise ValueError("URL必須以http://或https://開頭")
return v
@tool
def api_caller(request_data: dict) -> str:
"""
調用外部API
Args:
request_data: API請求參數
Returns:
API響應結果
"""
try:
# 參數驗證
req = APIRequest(**request_data)
# 執行請求
response = requests.request(
method=req.method,
url=req.url,
headers=req.headers,
timeout=req.timeout
)
# 檢查響應狀態
response.raise_for_status()
# 返回結果
returnf"狀態碼: {response.status_code}\n內容: {response.text[:500]}"
except requests.exceptions.Timeout:
return"錯誤:請求超時"
except requests.exceptions.ConnectionError:
return"錯誤:無法連接到服務器"
except requests.exceptions.HTTPError as e:
returnf"HTTP錯誤: {e}"
except Exception as e:
logger.error(f"API調用失敗: {e}")
returnf"錯誤:{str(e)}"3.4 異步工具實現
agno完全支持異步工具,這對于I/O密集型操作特別重要:
import asyncio
from agno.tools import tool
from agno.utils.log import logger
@tool
asyncdef async_data_fetcher(sources: List[str]) -> str:
"""
異步獲取多個數據源
Args:
sources: 數據源URL列表
Returns:
合并的數據結果
"""
asyncdef fetch_single(source: str):
logger.info(f"開始獲取: {source}")
try:
# 模擬異步HTTP請求
await asyncio.sleep(1) # 實際應用中使用aiohttp
returnf"數據來自 {source}"
except Exception as e:
logger.error(f"獲取失敗 {source}: {e}")
returnNone
# 并發獲取所有數據源
tasks = [fetch_single(source) for source in sources]
results = await asyncio.gather(*tasks)
# 過濾并合并結果
valid_results = [r for r in results if r isnotNone]
if valid_results:
return"\n".join(valid_results)
else:
return"錯誤:無法獲取任何數據"
# 異步工具在Agent中的使用
asyncdef main():
agent = Agent(
model=OpenAIChat(id="gpt-4o-mini"),
tools=[async_data_fetcher],
show_tool_calls=True
)
# 異步執行
response = await agent.arun(
"從以下源獲取數據: api.example1.com, api.example2.com"
)
print(response.content)
# 運行異步主函數
asyncio.run(main())4. 工具組合模式
在復雜場景中,單個工具往往不夠用。agno支持多種工具組合模式來構建強大的Agent系統。
4.1 串行工具鏈
串行執行工具,每個工具的輸出作為下一個工具的輸入:
class DataPipeline(Toolkit):
"""數據處理管道工具包"""
def __init__(self):
super().__init__(
name="data_pipeline",
tools=[self.fetch_data, self.process_data, self.generate_report]
)
self.intermediate_data = None
def fetch_data(self, source: str) -> str:
"""步驟1: 獲取原始數據"""
logger.info(f"獲取數據從: {source}")
# 模擬數據獲取
self.intermediate_data = {"source": source, "raw_data": [1, 2, 3, 4, 5]}
return"數據獲取成功"
def process_data(self, operation: str) -> str:
"""步驟2: 處理數據"""
ifnot self.intermediate_data:
return"錯誤:請先獲取數據"
logger.info(f"處理數據: {operation}")
if operation == "sum":
result = sum(self.intermediate_data["raw_data"])
elif operation == "average":
data = self.intermediate_data["raw_data"]
result = sum(data) / len(data)
else:
returnf"不支持的操作: {operation}"
self.intermediate_data["processed"] = result
returnf"處理完成: {result}"
def generate_report(self) -> str:
"""步驟3: 生成報告"""
ifnot self.intermediate_data or"processed"notin self.intermediate_data:
return"錯誤:沒有處理后的數據"
report = f"""
## 數據分析報告
- 數據源: {self.intermediate_data['source']}
- 原始數據: {self.intermediate_data['raw_data']}
- 處理結果: {self.intermediate_data['processed']}
- 生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
"""
return report
# 使用串行工具鏈
pipeline_agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[DataPipeline()],
instructions=["按順序執行數據處理步驟"],
show_tool_calls=True
)4.2 并行工具執行
Agno Agents can execute multiple tools concurrently, allowing you to process function calls that the model makes efficiently:
import asyncio
from typing import List, Dict
class ParallelSearchTools(Toolkit):
"""并行搜索工具包"""
def __init__(self):
super().__init__(
name="parallel_search",
tools=[self.multi_source_search]
)
asyncdef search_single_source(self, source: str, query: str) -> Dict:
"""搜索單個數據源"""
logger.info(f"搜索 {source}: {query}")
# 模擬不同延遲的搜索
if source == "database":
await asyncio.sleep(0.5)
return {"source": source, "results": ["DB結果1", "DB結果2"]}
elif source == "api":
await asyncio.sleep(1.0)
return {"source": source, "results": ["API結果1", "API結果2"]}
elif source == "cache":
await asyncio.sleep(0.1)
return {"source": source, "results": ["緩存結果1", "緩存結果2"]}
else:
await asyncio.sleep(2.0)
return {"source": source, "results": ["其他結果"]}
asyncdef multi_source_search(self, query: str, sources: List[str]) -> str:
"""
并行搜索多個數據源
Args:
query: 搜索查詢
sources: 數據源列表
Returns:
合并的搜索結果
"""
start_time = asyncio.get_event_loop().time()
# 創建并發任務
tasks = [
self.search_single_source(source, query)
for source in sources
]
# 并行執行所有搜索
results = await asyncio.gather(*tasks, return_exceptions=True)
execution_time = asyncio.get_event_loop().time() - start_time
# 處理結果
successful_results = []
failed_sources = []
for i, result in enumerate(results):
if isinstance(result, Exception):
failed_sources.append(sources[i])
logger.error(f"搜索失敗 {sources[i]}: {result}")
else:
successful_results.append(result)
# 格式化輸出
output = f"并行搜索完成(耗時: {execution_time:.2f}秒)\n\n"
for result in successful_results:
output += f"### {result['source']}:\n"
output += "\n".join(f"- {r}"for r in result['results'])
output += "\n\n"
if failed_sources:
output += f"失敗的源: {', '.join(failed_sources)}\n"
return output4.3 條件工具選擇
基于條件動態選擇要執行的工具:
class ConditionalTools(Toolkit):
"""條件工具選擇器"""
def __init__(self):
super().__init__(
name="conditional_tools",
tools=[self.smart_router]
)
# 注冊可用工具
self.tool_registry = {
"text": self.process_text,
"number": self.process_number,
"date": self.process_date,
"json": self.process_json
}
def detect_data_type(self, data: str) -> str:
"""檢測數據類型"""
import json
from datetime import datetime
# 嘗試解析為數字
try:
float(data)
return"number"
except ValueError:
pass
# 嘗試解析為日期
try:
datetime.fromisoformat(data)
return"date"
except:
pass
# 嘗試解析為JSON
try:
json.loads(data)
return"json"
except:
pass
# 默認為文本
return"text"
def smart_router(self, data: str, preferred_tool: str = None) -> str:
"""
智能路由到合適的處理工具
Args:
data: 要處理的數據
preferred_tool: 優先使用的工具
Returns:
處理結果
"""
# 如果指定了工具且存在,優先使用
if preferred_tool and preferred_tool in self.tool_registry:
tool = self.tool_registry[preferred_tool]
logger.info(f"使用指定工具: {preferred_tool}")
else:
# 自動檢測并選擇工具
data_type = self.detect_data_type(data)
tool = self.tool_registry.get(data_type)
logger.info(f"自動選擇工具: {data_type}")
if tool:
return tool(data)
else:
returnf"無法處理數據類型: {data}"
def process_text(self, data: str) -> str:
"""處理文本數據"""
word_count = len(data.split())
char_count = len(data)
returnf"文本分析:{word_count}個詞,{char_count}個字符"
def process_number(self, data: str) -> str:
"""處理數字數據"""
num = float(data)
returnf"數字分析:值={num}, 平方={num**2}, 平方根={num**0.5:.2f}"
def process_date(self, data: str) -> str:
"""處理日期數據"""
from datetime import datetime
dt = datetime.fromisoformat(data)
returnf"日期分析:{dt.strftime('%Y年%m月%d日 %H:%M:%S')}"
def process_json(self, data: str) -> str:
"""處理JSON數據"""
import json
obj = json.loads(data)
returnf"JSON分析:{len(obj)}個鍵,類型={type(obj).__name__}"4.4 工具結果聚合
多個工具的結果需要智能聚合:
class AggregationTools(Toolkit):
"""結果聚合工具包"""
def __init__(self):
super().__init__(
name="aggregation_tools",
tools=[self.aggregate_results]
)
def aggregate_results(
self,
results: List[Dict],
strategy: str = "merge"
) -> str:
"""
聚合多個工具的執行結果
Args:
results: 工具執行結果列表
strategy: 聚合策略 (merge, summarize, vote, weighted)
Returns:
聚合后的結果
"""
ifnot results:
return"沒有結果需要聚合"
if strategy == "merge":
# 簡單合并所有結果
merged = {}
for result in results:
if isinstance(result, dict):
merged.update(result)
else:
merged[f"result_{len(merged)}"] = result
return json.dumps(merged, ensure_ascii=False, indent=2)
elif strategy == "summarize":
# 生成摘要
summary = "## 結果摘要\n\n"
for i, result in enumerate(results, 1):
summary += f"{i}. {self._summarize_single(result)}\n"
return summary
elif strategy == "vote":
# 投票機制(適用于分類結果)
votes = {}
for result in results:
key = str(result.get("decision", result))
votes[key] = votes.get(key, 0) + 1
winner = max(votes.items(), key=lambda x: x[1])
returnf"投票結果:{winner[0]}({winner[1]}票)"
elif strategy == "weighted":
# 加權平均(適用于數值結果)
if all(isinstance(r.get("value"), (int, float)) for r in results):
total_weight = sum(r.get("weight", 1) for r in results)
weighted_sum = sum(
r["value"] * r.get("weight", 1)
for r in results
)
returnf"加權平均值:{weighted_sum / total_weight:.2f}"
else:
return"加權策略需要數值類型的結果"
else:
returnf"未知的聚合策略: {strategy}"
def _summarize_single(self, result: any) -> str:
"""生成單個結果的摘要"""
if isinstance(result, dict):
returnf"{result.get('source', '未知源')}: {result.get('value', result)}"
else:
return str(result)[:100]5. 調試和監控
工具系統的調試和監控對于生產環境至關重要。agno提供了Monitor agent sessions and performance in real-time on agno.com的能力。
5.1 show_tool_calls參數使用
??show_tool_calls??是調試的利器:
# 開發階段:顯示詳細的工具調用信息
dev_agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[DuckDuckGoTools(), YFinanceTools()],
show_tool_calls=True, # 顯示工具調用細節
markdown=True
)
# 生產環境:隱藏工具調用細節
prod_agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[DuckDuckGoTools(), YFinanceTools()],
show_tool_calls=False, # 隱藏內部細節
markdown=True
)
# 條件性顯示
debug_mode = os.getenv("DEBUG", "false").lower() == "true"
agent = Agent(
model=OpenAIChat(id="gpt-4o"),
tools=[CustomTools()],
show_tool_calls=debug_mode
)5.2 工具調用日志
實現完善的日志系統:
import logging
from datetime import datetime
from typing import Any, Dict
from agno.utils.log import logger
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('agent_tools.log'),
logging.StreamHandler()
]
)
class LoggedToolkit(Toolkit):
"""帶日志記錄的工具包基類"""
def __init__(self, name: str, **kwargs):
super().__init__(name=name, **kwargs)
self.call_history = []
def log_tool_call(
self,
tool_name: str,
args: Dict[str, Any],
result: Any,
duration: float,
error: Exception = None
):
"""記錄工具調用"""
call_record = {
"timestamp": datetime.now().isoformat(),
"tool": tool_name,
"args": args,
"result": str(result)[:200] if result elseNone,
"duration": duration,
"error": str(error) if error elseNone
}
self.call_history.append(call_record)
# 記錄到日志文件
if error:
logger.error(f"工具調用失敗: {tool_name}", extra=call_record)
else:
logger.info(f"工具調用成功: {tool_name}", extra=call_record)
# 可選:發送到監控系統
self._send_to_monitoring(call_record)
def _send_to_monitoring(self, record: Dict):
"""發送到外部監控系統"""
# 這里可以集成Prometheus、Grafana、DataDog等
pass
def get_call_statistics(self) -> Dict:
"""獲取調用統計信息"""
ifnot self.call_history:
return {"total_calls": 0}
total_calls = len(self.call_history)
successful_calls = sum(
1for call in self.call_history
if call["error"] isNone
)
avg_duration = sum(
call["duration"] for call in self.call_history
) / total_calls
return {
"total_calls": total_calls,
"successful_calls": successful_calls,
"failure_rate": (total_calls - successful_calls) / total_calls,
"average_duration": avg_duration,
"last_call": self.call_history[-1]["timestamp"]
}5.3 性能分析
監控和優化工具性能:
import time
import tracemalloc
from functools import wraps
from typing import Callable
def performance_monitor(func: Callable) -> Callable:
"""性能監控裝飾器"""
@wraps(func)
def wrapper(*args, **kwargs):
# 開始內存追蹤
tracemalloc.start()
start_memory = tracemalloc.get_traced_memory()[0]
# 記錄開始時間
start_time = time.perf_counter()
try:
# 執行函數
result = func(*args, **kwargs)
# 計算性能指標
end_time = time.perf_counter()
end_memory = tracemalloc.get_traced_memory()[0]
duration = end_time - start_time
memory_used = (end_memory - start_memory) / 1024 / 1024# MB
# 記錄性能數據
logger.info(
f"性能分析 - {func.__name__}: "
f"耗時={duration:.3f}秒, "
f"內存={memory_used:.2f}MB"
)
# 性能預警
if duration > 5.0:
logger.warning(f"{func.__name__} 執行時間過長: {duration:.3f}秒")
if memory_used > 100:
logger.warning(f"{func.__name__} 內存使用過高: {memory_used:.2f}MB")
return result
finally:
tracemalloc.stop()
return wrapper
# 使用性能監控
class PerformanceAwareTools(Toolkit):
def __init__(self):
super().__init__(
name="performance_tools",
tools=[self.heavy_computation]
)
@performance_monitor
def heavy_computation(self, n: int) -> str:
"""模擬計算密集型任務"""
result = sum(i ** 2for i in range(n))
returnf"計算結果: {result}"
# 批量性能測試
def benchmark_tools(agent: Agent, test_cases: List[str], iterations: int = 10):
"""工具性能基準測試"""
results = []
for test_case in test_cases:
times = []
for _ in range(iterations):
start = time.perf_counter()
agent.run(test_case)
times.append(time.perf_counter() - start)
avg_time = sum(times) / len(times)
min_time = min(times)
max_time = max(times)
results.append({
"test_case": test_case,
"avg_time": avg_time,
"min_time": min_time,
"max_time": max_time
})
return results5.4 常見問題排查
工具系統的常見問題及解決方案:
class ToolDebugger:
"""工具調試助手"""
@staticmethod
def diagnose_tool_issue(error: Exception, context: Dict) -> str:
"""診斷工具問題"""
diagnosis = f"## 工具問題診斷\n\n"
diagnosis += f"錯誤類型: {type(error).__name__}\n"
diagnosis += f"錯誤信息: {str(error)}\n\n"
# 常見問題模式匹配
if isinstance(error, TimeoutError):
diagnosis += "### 診斷:超時問題\n"
diagnosis += "- 檢查網絡連接\n"
diagnosis += "- 增加超時時間設置\n"
diagnosis += "- 考慮使用異步工具\n"
elif isinstance(error, ValueError):
diagnosis += "### 診斷:參數驗證失敗\n"
diagnosis += f"- 檢查輸入參數: {context.get('args')}\n"
diagnosis += "- 驗證參數類型和格式\n"
diagnosis += "- 查看工具文檔字符串\n"
elif isinstance(error, ImportError):
diagnosis += "### 診斷:依賴缺失\n"
diagnosis += "- 安裝缺失的包\n"
diagnosis += f"- 運行: pip install {str(error).split()[-1]}\n"
elif"rate limit"in str(error).lower():
diagnosis += "### 診斷:速率限制\n"
diagnosis += "- 實現請求限流\n"
diagnosis += "- 使用緩存減少請求\n"
diagnosis += "- 考慮批量處理\n"
else:
diagnosis += "### 通用建議\n"
diagnosis += "- 檢查工具實現代碼\n"
diagnosis += "- 添加更多錯誤處理\n"
diagnosis += "- 啟用詳細日志\n"
return diagnosis
@staticmethod
def validate_tool_setup(toolkit: Toolkit) -> Dict:
"""驗證工具設置"""
validation_results = {
"valid": True,
"issues": [],
"warnings": []
}
# 檢查工具是否正確注冊
ifnot hasattr(toolkit, 'tools') ornot toolkit.tools:
validation_results["valid"] = False
validation_results["issues"].append("工具列表為空")
# 檢查每個工具
for tool in getattr(toolkit, 'tools', []):
# 檢查是否有文檔字符串
ifnot tool.__doc__:
validation_results["warnings"].append(
f"{tool.__name__} 缺少文檔字符串"
)
# 檢查是否有類型注解
ifnot hasattr(tool, '__annotations__'):
validation_results["warnings"].append(
f"{tool.__name__} 缺少類型注解"
)
return validation_results
# 使用調試工具
debugger = ToolDebugger()
try:
# 工具調用
result = some_tool.execute()
except Exception as e:
# 自動診斷問題
diagnosis = debugger.diagnose_tool_issue(
e,
context={"args": {"param": "value"}}
)
print(diagnosis)最佳實踐建議
基于agno工具系統的設計理念和實踐經驗,這里總結一些最佳實踐:
1. 工具設計原則
- 單一職責:每個工具只做一件事,但要做好
- 明確的接口:清晰的參數和返回值定義
- 充分的文檔:詳細的docstring和類型注解
- 優雅的降級:處理失敗情況,提供有用的錯誤信息
2. 性能優化
- 利用并發
- 實現緩存:避免重復的計算或API調用
- 批量處理:合并多個小請求為一個大請求
- 資源管理:正確釋放連接、文件句柄等資源
3. 安全考慮
- 輸入驗證:嚴格驗證所有外部輸入
- 權限控制:限制敏感操作的訪問
- 審計日志:記錄所有工具調用
- 沙箱執行:在受限環境中運行不信任的代碼
總結
agno的工具系統展現了現代AI Agent框架的設計理念:簡單、高效、可擴展。通過本文的深入探討,我們了解了:
- 架構優勢:agno以其極致的性能和最小的內存占用,為大規模Agent部署提供了堅實基礎
- 豐富生態:80+內置工具包覆蓋了大部分應用場景
- 靈活擴展:簡潔的自定義工具開發接口讓擴展變得輕松
- 組合模式:串行、并行、條件選擇等模式滿足復雜業務需求
- 生產就緒:完善的調試、監控和性能分析工具鏈
agno正在重新定義Agent開發的標準。無論您是構建簡單的聊天機器人,還是復雜的多Agent協作系統,agno的工具系統都能提供強大而靈活的支持。
本文轉載自???AI 博物院?? 作者:longyunfeigu

















