100行代碼搞定多智能體?這個極簡AI框架PocketFlow有點東西 原創
現在各種工作流框架太多了,看不過來,也沒有什么精力去學習。最近無意中刷到一個微型框架:PocketFlow,這個框架非常小巧,看了下只有 100 行左右的代碼,很容易看懂。我非常喜歡,寫個教程介紹一下。
對比其他框架
抽象 | 應用特定包裝器 | 供應商特定包裝器 | 代碼行數 | 大小 | |
LangChain | Agent, Chain | 很多 (例如,QA, 摘要) | 很多 (例如,OpenAI, Pinecone等) | 405K | +166MB |
CrewAI | Agent, Chain | 很多 (例如,FileReadTool, SerperDevTool) | 很多 (例如,OpenAI, Anthropic, Pinecone等) | 18K | +173MB |
SmolAgent | Agent | 一些 (例如,CodeAgent, VisitWebTool) | 一些 (例如,DuckDuckGo, Hugging Face等) | 8K | +198MB |
LangGraph | Agent, Graph | 一些 (例如,語義搜索) | 一些 (例如,PostgresStore, SqliteSaver等) | 37K | +51MB |
AutoGen | Agent | 一些 (例如,Tool Agent, Chat Agent) | 很多 [可選] (例如,OpenAI, Pinecone等) | 7K (僅核心) | +26MB (僅核心) |
PocketFlow | Graph | 無 | 無 | 100 | +56KB |

通過對比可以看到,PocketFlow 沒有很多融合的功能,只抽象出Graph,就能完成常見的RAG和Agent相關的功能了。
應用場景
PocketFlow 設計了多種應用場景的示例,從基礎的聊天機器人到復雜的多智能體系統。以下是一些基礎示例:
- 聊天:基礎聊天機器人,帶有對話歷史
- 結構化輸出:通過提示從簡歷中提取結構化數據
- 工作流:寫作工作流,包括大綱、內容編寫和樣式應用
- 智能體:可以搜索網絡并回答問題的研究智能體
- RAG:簡單的檢索增強生成過程
- 批處理:將 Markdown 內容翻譯成多種語言的批處理器
更復雜的應用包括:多智能體通信、監督流程、并行執行、思維鏈推理、短期和長期記憶聊天機器人等。
多智能體實戰
現在我們用PocketFlow實現一個AI版"你畫我猜"——Taboo游戲
為了直觀地展示PocketFlow的威力,我們來看一個用它實現的多智能體協作案例:Taboo(禁忌語)游戲。
游戲規則:
* 提示者 (Hinter): 知道一個目標詞和幾個"禁忌詞"。它的任務是給出提示,引導猜詞者猜出目標詞,但提示中不能包含任何禁忌詞。
* 猜詞者 (Guesser): 根據提示者的提示,猜出目標詞。
在這個案例中,兩個LLM將分別扮演提示者和猜詞者,它們需要通過不斷的異步通信來協作完成游戲。
PocketFlow使用??AsyncNode???來定義異步任務。我們的兩個智能體??AsyncHinter???和??AsyncGuesser??都繼承自它。
提示者 ??AsyncHinter??
class AsyncHinter(AsyncNode):
asyncdef prep_async(self, shared):
# 1. 從隊列中等待猜詞者的消息
guess = await shared["hinter_queue"].get()
if guess == "GAME_OVER":
returnNone
# 2. 準備LLM的輸入
return shared["target_word"], shared["forbidden_words"], shared.get("past_guesses", [])
asyncdef exec_async(self, inputs):
# ... (調用LLM生成提示)
target, forbidden, past_guesses = inputs
prompt = f"Generate hint for '{target}'\nForbidden words: {forbidden}"
if past_guesses:
prompt += f"\nPrevious wrong guesses: {past_guesses}\nMake hint more specific."
hint = call_llm(prompt)
print(f"\nHinter: Here's your hint - {hint}")
return hint
asyncdef post_async(self, shared, prep_res, exec_res):
if exec_res isNone:
return"end"
# 3. 將生成的提示放入猜詞者的隊列
await shared["guesser_queue"].put(exec_res)
return"continue"# 返回Action,驅動流程繼續猜詞者 ??AsyncGuesser??
class AsyncGuesser(AsyncNode):
asyncdef prep_async(self, shared):
# 1. 從隊列中等待提示者的提示
hint = await shared["guesser_queue"].get()
return hint, shared.get("past_guesses", [])
asyncdef exec_async(self, inputs):
# ... (調用LLM生成猜測)
hint, past_guesses = inputs
prompt = f"Given hint: {hint}, past wrong guesses: {past_guesses}, make a new guess."
guess = call_llm(prompt)
print(f"Guesser: I guess it's - {guess}")
return guess
asyncdef post_async(self, shared, prep_res, exec_res):
# 2. 檢查答案
if exec_res.lower() == shared["target_word"].lower():
print("Game Over - Correct guess!")
await shared["hinter_queue"].put("GAME_OVER")
return"end"
# 3. 如果猜錯,將錯誤答案發回給提示者,以便其給出更好的提示
shared.setdefault("past_guesses", []).append(exec_res)
await shared["hinter_queue"].put(exec_res)
return"continue"兩個智能體通過共享存儲??shared???中的兩個??asyncio.Queue???(??hinter_queue???和??guesser_queue??)進行異步通信,一個用于接收信息,一個用于發送信息,實現了完美的解耦。
async def main():
# ... (初始化shared, 包括target_word, forbidden_words, 和兩個queue)
# 創建節點和流
hinter = AsyncHinter()
guesser = AsyncGuesser()
hinter_flow = AsyncFlow(start=hinter)
guesser_flow = AsyncFlow(start=guesser)
# 定義循環:當post返回"continue"時,節點會再次執行自己
hinter - "continue" >> hinter
guesser - "continue" >> guesser
# 使用asyncio.gather并發運行兩個智能體流
await asyncio.gather(
hinter_flow.run_async(shared),
guesser_flow.run_async(shared)
)每個智能體都被包裝在一個獨立的??AsyncFlow???中,并通過??"continue"??這個Action實現自我循環,不斷地接收、處理、發送消息。
核心理念
PocketFlow 把 LLM 工作流抽象為:
+-----------+
Shared | |
Store <--| Node |<-- Params(僅 Batch 用)
+-----------+
|
Action
v
+-----------+
| Node |
+-----------+- Node:執行 prep → exec → post 三段式。
- Action:post() 返回字符串,決定流向哪一個 successor。
- Flow:負責“根據 Action 走圖”的調度器。
- Shared Store:跨 Node 的全局數據約定。
核心源碼剖析 (pocketflow/__init__.py)
PocketFlow的強大之處在于其簡約的核心抽象。讓我們深入其僅有100行的源碼,逐一拆解其精妙設計。
BaseNode
??BaseNode??是所有節點的基石,它定義了節點最核心的兩個屬性和兩個方法:
* ??self.successors???: 一個字典,形態為??{'action_name': next_node}???。這是PocketFlow流程控制的脈搏。??post???方法返回的??action??字符串就是在這個字典里查找下一個要執行的節點。
* ??self.params??: 另一個字典,用于接收外部傳入的、節點級別的參數,在批處理場景(Batch)中尤其重要。
* ??next(self, node, actinotallow="default")???: 這個方法負責填充??successors???字典。??node_a.next(node_b, "some_action")???就相當于??node_a.successors["some_action"] = node_b??。
* ??_run(self, shared)???: 這是節點的生命周期方法,它嚴格按照??prep -> _exec -> post???的順序執行,并將??prep???的結果傳遞給??_exec???,再將兩者的結果傳遞給??post??。
class BaseNode:
def __init__(self): self.params,self.successors={},{}
def set_params(self,params): self.params=params
def next(self,node,actinotallow="default"):
if action in self.successors: warnings.warn(f"Overwriting successor for action '{action}'")
self.successors[action]=node; return node
def prep(self,shared): pass
def exec(self,prep_res): pass
def post(self,shared,prep_res,exec_res): pass
def _exec(self,prep_res): return self.exec(prep_res)
def _run(self,shared): p=self.prep(shared); e=self._exec(p); return self.post(shared,p,e)
def run(self,shared):
if self.successors: warnings.warn("Node won't run successors. Use Flow.")
return self._run(shared)DSL: >> 和 - 語法糖
PocketFlow中最令人驚艷的莫過于其定義流程的方式,如 ??node_a - "action" >> node_b??。這其實是巧妙地利用了Python的魔法方法實現的:
* ??__sub__(self, action)???: 當我們寫??node_a - "action"???時,Python會調用??node_a???的??__sub__???方法。這個方法并不做減法,而是返回一個臨時的??_ConditionalTransition???對象,這個對象保存了??node_a???和??"action"??。
* ??__rshift__(self, other)???: 當我們寫??... >> node_b???時,Python會調用??__rshift__??方法。
* 如果直接是??node_a >> node_b???,??node_a???的??__rshift__???被調用,它等價于??node_a.next(node_b, "default")??。
* 如果是??_ConditionalTransition(...) >> node_b???,那么臨時對象的??__rshift__???被調用,它執行的是??source_node.next(node_b, "saved_action")??。
class BaseNode:
......
def __rshift__(self,other): return self.next(other)
def __sub__(self,action):
if isinstance(action,str): return _ConditionalTransition(self,action)
raise TypeError("Action must be a string")
class _ConditionalTransition:
def __init__(self,src,action): self.src,self.actinotallow=src,action
def __rshift__(self,tgt): return self.src.next(tgt,self.action)通過這短短幾行代碼,PocketFlow就創造出了一種極具表現力的領域特定語言(DSL),讓流程定義變得像寫詩一樣自然。
Node
??Node???類在??BaseNode??的基礎上,增加了至關重要的容錯機制。
class Node(BaseNode):
def __init__(self,max_retries=1,wait=0): super().__init__(); self.max_retries,self.wait=max_retries,wait
def exec_fallback(self,prep_res,exc): raise exc
def _exec(self,prep_res):
for self.cur_retry in range(self.max_retries):
try: return self.exec(prep_res)
except Exception as e:
if self.cur_retry==self.max_retries-1: return self.exec_fallback(prep_res,e)
if self.wait>0: time.sleep(self.wait)這里的??_exec???方法覆蓋了??BaseNode???的版本。它不再是簡單地調用??self.exec???,而是用一個??for???循環包裹了??try...except??塊。
* 循環: ??max_retries??參數決定了循環次數。
* ??try???: 嘗試執行開發者定義的??self.exec(prep_res)??。如果成功,直接返回結果。
* ??except??: 如果捕獲到任何異常,它會檢查是否是最后一次重試。
* 如果是,則調用??exec_fallback??方法,讓開發者有機會進行優雅的失敗處理(默認是直接拋出異常)。
* 如果不是,則根據??wait??參數等待一段時間后,進入下一次循環重試。
Flow
??Flow???是整個工作流的驅動引擎,其核心是??_orch??(orchestrate,編排)方法。
class Flow(BaseNode):
def __init__(self,start=None): super().__init__(); self.start_node=start
# ...
def get_next_node(self,curr,action):
nxt=curr.successors.get(action or"default")
ifnot nxt and curr.successors: warnings.warn(f"Flow ends: '{action}' not found in {list(curr.successors)}")
return nxt
def _orch(self,shared,params=None):
curr,p,last_action =copy.copy(self.start_node),(params or {**self.params}),None
while curr:
curr.set_params(p)
last_actinotallow=curr._run(shared)
curr=copy.copy(self.get_next_node(curr,last_action))
return last_action??_orch??的邏輯非常清晰:
- 初始化:?
?curr???指針指向??start_node??。 - ?
?while??循環: 只要??curr???不為??None??,循環就繼續。 - 執行: 調用?
?curr._run(shared)???來執行當前節點的??prep->exec->post???生命周期,并將其??post???方法的返回值存為??last_action??。 - 尋路: 調用?
?get_next_node(curr, last_action)???,在當前節點的??successors???字典中尋找??last_action??對應的新節點。 - 前進: 將?
?curr???指針更新為找到的新節點。如果找不到,??get_next_node???返回??None??,循環將在下一次檢查時終止。 - ?
?copy.copy()??的使用確保了每個節點的實例在flow的單次運行中是獨立的,避免了狀態污染。
在我看來,PocketFlow的源碼展現了"少即是多"的原則。它沒有試圖成為一個包羅萬象的巨型框架,而是專注于提供一套最核心、最靈活的構建塊,將其他的一切(如工具調用、API封裝)都交由開發者在節點內部自由實現。
總結
說實話,PocketFlow目前的易用性還有待提升,不如許多框架那樣開箱即用。但正是這種精簡設計賦予了它更大的靈活性,開發者可以根據自己的需求進行DIY(詳細實現方案可參考官方Cookbook:https://github.com/The-Pocket/PocketFlow/tree/main/cookbook)
本文轉載自??AI 博物院?? 作者:longyunfeigu

















