Questo tutorial ti guida nella creazione di un loop agentico in stile ReAct che utilizza l' API Gemini per il ragionamento e Temporal per la durabilità. Il codice sorgente completo per questo tutorial è disponibile su GitHub.
L'agente può chiamare strumenti, ad esempio cercare avvisi meteo o geolocalizzare un indirizzo IP, e continuerà a eseguire il loop finché non avrà informazioni sufficienti per rispondere.
Ciò che lo distingue da una tipica demo di agenti è la durabilità. Ogni chiamata LLM, ogni chiamata di strumenti e ogni passaggio del loop agentico viene mantenuto da Temporal. Se il processo si arresta in modo anomalo, la rete si interrompe o un'API va in timeout, Temporal riprova automaticamente e riprende dall'ultimo passaggio completato. Non viene persa la cronologia delle conversazioni e le chiamate di strumenti non vengono ripetute in modo errato.
Architettura
L'architettura è composta da tre parti:
- Workflow:il loop agentico che orchestra la logica di esecuzione.
- Attività:singole unità di lavoro (chiamate LLM, chiamate di strumenti) che Temporal rende durabili.
- Worker:il processo che esegue i workflow e le attività.
In questo esempio, inserirai tutti e tre questi elementi in un unico file (durable_agent_worker.py). In un'implementazione reale, li separeresti per consentire vari vantaggi di deployment e scalabilità. Inserirai il codice che fornisce un prompt all'agente in un secondo file (start_workflow.py).
Prerequisiti
Per completare questa guida, avrai bisogno di:
- Una chiave API Gemini. Puoi crearne una senza costi in Google AI Studio.
- Python versione 3.10 o successive.
- La CLI Temporal per l'esecuzione di un server di sviluppo locale.
Configurazione
Prima di iniziare, assicurati di avere un server di sviluppo Temporal in esecuzione localmente:
temporal server start-devPoi, installa le dipendenze richieste:
pip install temporalio google-genai httpx pydantic python-dotenvCrea un file .env nella directory del progetto con la tua chiave API Gemini. Puoi
ottenere una chiave API da
Google AI Studio.
echo "GOOGLE_API_KEY=your-api-key-here" > .envImplementazione
Il resto di questo tutorial illustra il file durable_agent_worker.py dall'inizio alla fine, creando l'agente passo dopo passo. Crea il file e segui le istruzioni.
Importazioni e configurazione del sandbox
Inizia con le importazioni che devono essere definite in anticipo. Il blocco workflow.unsafe.imports_passed_through() indica al sandbox del workflow di Temporal di consentire il passaggio di determinati moduli senza restrizioni. Questo è necessario perché diverse librerie (in particolare httpx, che è una sottoclasse di urllib.request.Request) utilizzano pattern che altrimenti il sandbox bloccherebbe.
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic_core # noqa: F401
import annotated_types # noqa: F401
import httpx
from pydantic import BaseModel, Field
from google import genai
from google.genai import types
Istruzioni di sistema
Poi, definisci la personalità dell'agente. Le istruzioni di sistema indicano al modello come comportarsi. Questo agente è istruito a rispondere in haiku quando non sono necessari strumenti.
SYSTEM_INSTRUCTIONS = """
You are a helpful agent that can use tools to help the user.
You will be given an input from the user and a list of tools to use.
You may or may not need to use the tools to satisfy the user ask.
If no tools are needed, respond in haikus.
"""
Definizioni degli strumenti
Ora definisci gli strumenti che l'agente può utilizzare. Ogni strumento è una funzione asincrona con una stringa di documentazione descrittiva. Gli strumenti che accettano parametri utilizzano un modello Pydantic come singolo argomento. Questa è una best practice di Temporal che mantiene stabili le firme delle attività man mano che aggiungi campi facoltativi nel tempo.
import json
NWS_API_BASE = "https://api.weather.gov"
USER_AGENT = "weather-app/1.0"
class GetWeatherAlertsRequest(BaseModel):
"""Request model for getting weather alerts."""
state: str = Field(description="Two-letter US state code (e.g. CA, NY)")
async def get_weather_alerts(request: GetWeatherAlertsRequest) -> str:
"""Get weather alerts for a US state.
Args:
request: The request object containing:
- state: Two-letter US state code (e.g. CA, NY)
"""
headers = {"User-Agent": USER_AGENT, "Accept": "application/geo+json"}
url = f"{NWS_API_BASE}/alerts/active/area/{request.state}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers, timeout=5.0)
response.raise_for_status()
return json.dumps(response.json())
Poi, definisci gli strumenti per la geolocalizzazione degli indirizzi IP:
class GetLocationRequest(BaseModel):
"""Request model for getting location info from an IP address."""
ipaddress: str = Field(description="An IP address")
async def get_ip_address() -> str:
"""Get the public IP address of the current machine."""
async with httpx.AsyncClient() as client:
response = await client.get("https://icanhazip.com")
response.raise_for_status()
return response.text.strip()
async def get_location_info(request: GetLocationRequest) -> str:
"""Get the location information for an IP address including city, state, and country.
Args:
request: The request object containing:
- ipaddress: An IP address to look up
"""
async with httpx.AsyncClient() as client:
response = await client.get(f"http://ip-api.com/json/{request.ipaddress}")
response.raise_for_status()
result = response.json()
return f"{result['city']}, {result['regionName']}, {result['country']}"
Registro degli strumenti
Poi, crea un registro che mappa i nomi degli strumenti alle funzioni di gestione. La funzione
get_tools() genera oggetti FunctionDeclaration compatibili con Gemini
dai callable utilizzando FunctionDeclaration.from_callable_with_api_option().
from typing import Any, Awaitable, Callable
ToolHandler = Callable[..., Awaitable[Any]]
def get_handler(tool_name: str) -> ToolHandler:
"""Get the handler function for a given tool name."""
if tool_name == "get_location_info":
return get_location_info
if tool_name == "get_ip_address":
return get_ip_address
if tool_name == "get_weather_alerts":
return get_weather_alerts
raise ValueError(f"Unknown tool name: {tool_name}")
def get_tools() -> types.Tool:
"""Get the Tool object containing all available function declarations.
Uses FunctionDeclaration.from_callable_with_api_option() from the Google GenAI SDK
to generate tool definitions from the handler functions.
"""
return types.Tool(
function_declarations=[
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_weather_alerts, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_location_info, api_option="GEMINI_API"
),
types.FunctionDeclaration.from_callable_with_api_option(
callable=get_ip_address, api_option="GEMINI_API"
),
]
)
Attività LLM
Ora definisci l'attività che chiama l'API Gemini. Le classi di dati GeminiChatRequest e GeminiChatResponse definiscono il contratto.
Disabiliterai la chiamata automatica di funzioni in modo che la chiamata LLM e la chiamata di strumenti vengano gestite come attività separate, aumentando la durabilità dell'agente. Disabiliterai anche i tentativi integrati dell'SDK (attempts=1) poiché Temporal gestisce i tentativi in modo duraturo.
import os
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GeminiChatRequest:
"""Request parameters for a Gemini chat completion."""
model: str
system_instruction: str
contents: list[types.Content]
tools: list[types.Tool]
@dataclass
class GeminiChatResponse:
"""Response from a Gemini chat completion."""
text: str | None
function_calls: list[dict[str, Any]]
raw_parts: list[types.Part]
@activity.defn
async def generate_content(request: GeminiChatRequest) -> GeminiChatResponse:
"""Execute a Gemini chat completion with tool support."""
api_key = os.environ.get("GOOGLE_API_KEY")
if not api_key:
raise ValueError("GOOGLE_API_KEY environment variable is not set")
client = genai.Client(
api_key=api_key,
http_options=types.HttpOptions(
retry_options=types.HttpRetryOptions(attempts=1),
),
)
config = types.GenerateContentConfig(
system_instruction=request.system_instruction,
tools=request.tools,
automatic_function_calling=types.AutomaticFunctionCallingConfig(disable=True),
)
response = await client.aio.models.generate_content(
model=request.model,
contents=request.contents,
config=config,
)
function_calls = []
raw_parts = []
text_parts = []
if response.candidates and response.candidates[0].content:
for part in response.candidates[0].content.parts:
raw_parts.append(part)
if part.function_call:
function_calls.append(
{
"name": part.function_call.name,
"args": dict(part.function_call.args) if part.function_call.args else {},
}
)
elif part.text:
text_parts.append(part.text)
text = "".join(text_parts) if text_parts and not function_calls else None
return GeminiChatResponse(
text=text,
function_calls=function_calls,
raw_parts=raw_parts,
)
Attività di strumenti dinamici
Poi, definisci l'attività che esegue gli strumenti. Questa utilizza la funzionalità di attività dinamica di Temporal: il gestore di strumenti (un callable) viene ottenuto dal registro degli strumenti tramite la funzione get_handler. In questo modo è possibile definire agenti diversi semplicemente fornendo un insieme diverso di strumenti e istruzioni di sistema; il workflow che implementa il loop agentico non richiede modifiche.
L'attività esamina la firma del gestore per determinare come passare gli argomenti. Se il gestore prevede un modello Pydantic, gestisce il formato di output
nidificato prodotto da Gemini (ad esempio, {"request": {"state": "CA"}} anziché
un formato piatto {"state": "CA"}).
import inspect
from collections.abc import Sequence
from temporalio.common import RawValue
@activity.defn(dynamic=True)
async def dynamic_tool_activity(args: Sequence[RawValue]) -> dict:
"""Execute a tool dynamically based on the activity name."""
tool_name = activity.info().activity_type
tool_args = activity.payload_converter().from_payload(args[0].payload, dict)
activity.logger.info(f"Running dynamic tool '{tool_name}' with args: {tool_args}")
handler = get_handler(tool_name)
if not inspect.iscoroutinefunction(handler):
raise TypeError("Tool handler must be async (awaitable).")
sig = inspect.signature(handler)
params = list(sig.parameters.values())
if len(params) == 0:
result = await handler()
else:
param = params[0]
param_name = param.name
ann = param.annotation
if isinstance(ann, type) and issubclass(ann, BaseModel):
nested_args = tool_args.get(param_name, tool_args)
result = await handler(ann(**nested_args))
else:
result = await handler(**tool_args)
activity.logger.info(f"Tool '{tool_name}' result: {result}")
return result
Il workflow del loop agentico
Ora hai tutti gli elementi per completare la creazione dell'agente. La classe AgentWorkflow implementa un workflow contenente il loop dell'agente. All'interno di questo loop, l'LLM viene chiamato tramite l'attività (rendendolo duraturo), l'output viene esaminato e, se l'LLM ha scelto uno strumento, viene chiamato tramite dynamic_tool_activity.
In questo semplice agente in stile ReAct, una volta che l'LLM sceglie di non utilizzare uno strumento, il loop viene considerato completato e viene restituito il risultato finale dell'LLM.
from datetime import timedelta
@workflow.defn
class AgentWorkflow:
"""Agentic loop workflow that uses Gemini for LLM calls and executes tools."""
@workflow.run
async def run(self, input: str) -> str:
contents: list[types.Content] = [
types.Content(role="user", parts=[types.Part.from_text(text=input)])
]
tools = [get_tools()]
while True:
result = await workflow.execute_activity(
generate_content,
GeminiChatRequest(
model="gemini-3-flash-preview",
system_instruction=SYSTEM_INSTRUCTIONS,
contents=contents,
tools=tools,
),
start_to_close_timeout=timedelta(seconds=60),
)
if result.function_calls:
# Sending the complete raw_parts here ensures Gemini 3 thought
# signatures are propagated correctly.
contents.append(types.Content(role="model", parts=result.raw_parts))
for function_call in result.function_calls:
tool_result = await self._handle_function_call(function_call)
contents.append(
types.Content(
role="user",
parts=[
types.Part.from_function_response(
name=function_call["name"],
response={"result": tool_result},
)
],
)
)
else:
return result.text
# Leave this in place. You will un-comment it during a durability
# test later on.
# await asyncio.sleep(10)
async def _handle_function_call(self, function_call: dict) -> str:
"""Execute a tool via dynamic activity and return the result."""
tool_name = function_call["name"]
tool_args = function_call.get("args", {})
result = await workflow.execute_activity(
tool_name,
tool_args,
start_to_close_timeout=timedelta(seconds=30),
)
return result
Il loop agentico è completamente duraturo. Se il worker dell'agente si arresta in modo anomalo dopo diverse iterazioni del loop, Temporal riprenderà esattamente da dove si era interrotto senza dover richiamare le chiamate LLM o le chiamate di strumenti già eseguite.
Avvio del worker
Infine, collega tutto. Sebbene il codice implementi la logica di business necessaria in modo da sembrare in esecuzione in un unico processo, l'utilizzo di Temporal lo rende un sistema basato su eventi (in particolare, basato su eventi) in cui la comunicazione tra il workflow e le attività avviene tramite la messaggistica fornita da Temporal.
Il worker Temporal si connette al servizio Temporal e funge da pianificatore per le attività di workflow e attività. Il worker registra il workflow e entrambe le attività, quindi inizia ad ascoltare le attività.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dotenv import load_dotenv
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker
async def main():
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
client = await Client.connect(
**config,
data_converter=pydantic_data_converter,
)
worker = Worker(
client,
task_queue="gemini-agent-python-task-queue",
workflows=[
AgentWorkflow,
],
activities=[
generate_content,
dynamic_tool_activity,
],
activity_executor=ThreadPoolExecutor(max_workers=10),
)
await worker.run()
if __name__ == "__main__":
load_dotenv()
asyncio.run(main())
Lo script client
Crea lo script client (start_workflow.py). Invia una query e attende il risultato. Tieni presente che si connette alla stessa coda di attività a cui fa riferimento il worker dell'agente: lo script start_workflow invia un'attività di workflow con il prompt dell'utente a questa coda di attività, avviando l'esecuzione dell'agente.
import asyncio
import sys
import uuid
from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter
async def main():
client = await Client.connect(
"localhost:7233",
data_converter=pydantic_data_converter,
)
query = sys.argv[1] if len(sys.argv) > 1 else "Tell me about recursion"
result = await client.execute_workflow(
"AgentWorkflow",
query,
id=f"gemini-agent-id-{uuid.uuid4()}",
task_queue="gemini-agent-python-task-queue",
)
print(f"\nResult:\n{result}")
if __name__ == "__main__":
asyncio.run(main())
Esegui l'agente
Se non l'hai ancora fatto, avvia il server di sviluppo Temporal:
temporal server start-devIn una nuova finestra del terminale, avvia il worker dell'agente:
python -m durable_agent_workerIn una terza finestra del terminale, invia una query all'agente:
python -m start_workflow "are there any weather alerts for where I am?"Nota l'output nel terminale di durable_agent_worker che mostra le azioni che si verificano in ogni iterazione del loop agentico. L'LLM è in grado di soddisfare la richiesta dell'utente chiamando una serie di strumenti a sua disposizione. Puoi visualizzare i passaggi eseguiti tramite l'interfaccia utente di Temporal all'indirizzo http://localhost:8233/namespaces/default/workflows.
Prova alcuni prompt diversi per vedere l'agente ragionare e chiamare gli strumenti:
python -m start_workflow "are there any weather alerts for New York?"python -m start_workflow "where am I?"python -m start_workflow "what is my ip address?"python -m start_workflow "tell me a joke"
L'ultimo prompt non richiede strumenti, quindi l'agente risponde in un haiku basato su SYSTEM_INSTRUCTIONS.
Testa la durabilità (facoltativo)
La creazione su Temporal garantisce che l'agente sopravviva senza problemi agli errori. Puoi testarlo utilizzando due esperimenti distinti.
Simulazione di un'interruzione della rete
In questo test, disabiliterai temporaneamente la connessione a internet del computer, invierai un workflow, osserverai Temporal riprovare automaticamente e poi ripristinerai la rete per vederla recuperare.
- Scollega la macchina da internet (ad esempio, disattiva il Wi-Fi).
Invia un workflow:
python -m start_workflow "tell me a joke"Controlla l'interfaccia utente di Temporal (
http://localhost:8233). Vedrai che l'attività LLM non riesce e che Temporal gestisce automaticamente i tentativi in background.Riconnettiti a internet.
Il successivo tentativo automatico raggiungerà correttamente l'API Gemini e il terminale stamperà il risultato finale.
Superare un arresto anomalo del worker
In questo test, interrompi il worker a metà dell'esecuzione e lo riavvii. Temporal riproduce la cronologia del workflow (origine eventi) e riprende dall'ultima attività completata: le chiamate LLM e le chiamate di strumenti già completate non vengono ripetute.
- Per darti il tempo di interrompere il worker, apri
durable_agent_worker.pye rimuovi temporaneamente il commento daawait asyncio.sleep(10)all'interno del looprundiAgentWorkflow. Riavvia il worker:
python -m durable_agent_workerInvia una query che attivi diversi strumenti:
python -m start_workflow "are there any weather alerts where I am?"Interrompi il processo worker in qualsiasi momento prima del completamento (
Ctrl-Cnel terminale worker o utilizzandokill %1se è in esecuzione in background).Riavvia il worker:
python -m durable_agent_worker
Temporal riproduce la cronologia del workflow. Le chiamate LLM e le chiamate di strumenti già completate non vengono eseguite di nuovo: i risultati vengono riprodotti immediatamente dalla cronologia (il log eventi). Il workflow viene completato correttamente.