-
Notifications
You must be signed in to change notification settings - Fork 6.5k
Description
Bug Description
- When using ReActAgent, I get an "unhashable type" error. I am using the LlamaIndex ReAct agent to complete an assignment, and the error occurs when I run the agent with Cohere as the LLM.
- If I add event streaming, the call waits indefinitely.
- If I add a `__hash__` attribute to ReActAgent, I get: 'async for' requires an object with `__aiter__` method, got generator
Version
0.14.6
Steps to Reproduce
Please see sample assignment code
`"""
Competitive Analysis Agent
This script implements an AI-powered Competitive Analysis Agent that utilizes Agentic RAG
(Retrieval-Augmented Generation) to analyze competitor information, respond to inquiries,
and deliver practical insights.
The agent uses Cohere's models for embeddings and generation, along with LlamaIndex for
efficient data indexing and retrieval. It follows the ReAct framework to combine reasoning
with actions for handling complex queries.
"""
import os
import pandas as pd
import logging
from pathlib import Path
from dotenv import load_dotenv
import cohere
from llama_index.core import VectorStoreIndex, Document, StorageContext, load_index_from_storage
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.tools import QueryEngineTool, FunctionTool
from llama_index.core.agent.workflow import ReActAgent
from llama_index.core.workflow import Context, StopEvent
from llama_index.embeddings.cohere import CohereEmbedding
from llama_index.llms.cohere import Cohere
from typing import Any, cast
# Runtime monkeypatch: identity-based hash.
# The workflow runtime looks the agent up in a weak-key mapping, which requires
# the agent instance to be hashable; without this the lookup fails with
# "TypeError: unhashable type: 'ReActAgent'". The attribute must be the dunder
# ``__hash__`` -- a plain ``hash`` attribute is never consulted by ``hash()``.
# NOTE(review): a newer llama-index-workflows release may make this workaround
# unnecessary -- confirm before keeping it long term.
ReActAgent.__hash__ = cast(Any, lambda self: id(self))  # type: ignore

# Set up logging: every record goes both to a log file and to the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("competitive_analysis.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Load environment variables (COHERE_API_KEY is expected in the environment or .env)
load_dotenv()
COHERE_API_KEY = os.getenv("COHERE_API_KEY")

# Initialize Cohere client
co = cohere.Client(COHERE_API_KEY)

# Initialize Cohere model for ReAct agent
llm = Cohere(api_key=COHERE_API_KEY, model="command-a-03-2025")

# Initialize Cohere embedding model for document embedding
embed_model = CohereEmbedding(
    cohere_api_key=COHERE_API_KEY,
    model_name="embed-english-v3.0",
    input_type="search_document",
)

# Define index storage path
INDEX_STORAGE_DIR = "index_storage"
class CompetitiveAnalysisAgent:
    """
    AI-powered Competitive Analysis Agent built on the ReAct framework.

    The agent answers questions about competitors by combining a vector-index
    query engine with a column-oriented analysis tool, both backed by
    ``competitor_data.csv``. It relies on the module-level ``llm``,
    ``embed_model``, ``logger`` and ``INDEX_STORAGE_DIR`` objects.
    """

    # Maps each supported ``aspect`` keyword to the CSV column that holds it.
    _ASPECT_COLUMNS = {
        'marketing': 'Marketing Strategy',
        'financial': 'Financial Summary',
        'strengths': 'Strengths',
        'weaknesses': 'Weaknesses',
        'products': 'Product Description',
    }

    def __init__(self):
        """Initialize the Competitive Analysis Agent with necessary components."""
        self.query_history = []  # (query, response) tuples, oldest first
        self.index = self._create_or_load_index()
        self.query_engine = self.index.as_query_engine(llm=llm, similarity_top_k=3)

        # Create tools for the agent
        self.competitor_tool = QueryEngineTool.from_defaults(
            query_engine=self.query_engine,
            name="competitor_data_tool",
            description="Provides information about competitors including product descriptions, marketing strategies, financial summaries, strengths, and weaknesses.",
        )
        logger.info("Competitive tool initialized successfully")

        self.analysis_tool = FunctionTool.from_defaults(
            fn=self.analyze_competitors,
            name="competitor_analysis_tool",
            description="Analyzes and compares competitors based on specific criteria.",
        )
        logger.info("Analysis tool initialized successfully")

        # Create ReActAgent with tools.
        # streaming=False asks the agent to use the LLM's non-streaming chat
        # call; the streaming path fails with the Cohere integration
        # ("'async for' requires an object with __aiter__ method, got generator").
        # NOTE(review): confirm the installed llama-index version accepts the
        # ``streaming`` argument.
        self.agent = ReActAgent(
            tools=[self.competitor_tool, self.analysis_tool],
            llm=llm,
            streaming=False,
        )
        self.ctx = Context(workflow=self.agent)
        logger.info(msg="Competitive Analysis Agent initialized successfully")

    def _create_or_load_index(self):
        """
        Create or load the vector index for competitor data.

        If ``INDEX_STORAGE_DIR`` exists the index is loaded from it; otherwise
        it is built from ``competitor_data.csv`` and persisted there.

        Returns:
            The loaded or newly built index.

        Raises:
            Exception: Any error raised while building or persisting a new
                index is logged and re-raised.
        """
        if Path(INDEX_STORAGE_DIR).exists():
            logger.info("Loading existing index from disk...")
            storage_context = StorageContext.from_defaults(persist_dir=INDEX_STORAGE_DIR)
            return load_index_from_storage(storage_context=storage_context, embed_model=embed_model)

        logger.info("Creating new index for competitor data...")
        try:
            # Load and preprocess competitor data
            competitors_df = pd.read_csv("competitor_data.csv")
            logger.info(f"Loaded competitor data with {len(competitors_df)} entries")

            # One Document per competitor, holding a text rendering of its row
            documents = [
                Document(
                    text=(
                        f"Competitor: {row['Competitor Name']}\n"
                        f"Product Description: {row['Product Description']}\n"
                        f"Marketing Strategy: {row['Marketing Strategy']}\n"
                        f"Financial Summary: {row['Financial Summary']}\n"
                        f"Strengths: {row['Strengths']}\n"
                        f"Weaknesses: {row['Weaknesses']}\n"
                    )
                )
                for _, row in competitors_df.iterrows()
            ]

            # Create index with sentence splitter for better chunking
            splitter = SentenceSplitter(chunk_size=512, chunk_overlap=50)
            index = VectorStoreIndex.from_documents(
                documents,
                embed_model=embed_model,
                transformations=[splitter],
            )

            # Persist index to disk
            os.makedirs(INDEX_STORAGE_DIR, exist_ok=True)
            index.storage_context.persist(persist_dir=INDEX_STORAGE_DIR)
            logger.info("Index created and saved to disk")
        except Exception as e:
            logger.error(f"Error creating index: {e}")
            raise
        return index

    def analyze_competitors(self, aspect: str, competitors: str = "") -> str:
        """
        Analyze competitors based on specific aspects.

        Args:
            aspect: The aspect to analyze: 'marketing', 'financial',
                'strengths', 'weaknesses' or 'products' (case-insensitive).
            competitors: Comma-separated list of competitor names (optional).
                When empty, every competitor in the CSV is included.

        Returns:
            str: One "<name>: <value>" line per matching competitor, a message
            listing the valid aspects when ``aspect`` is not recognized, or an
            error message if the data could not be read.
        """
        column = self._ASPECT_COLUMNS.get(aspect.strip().lower())
        if column is None:
            return "Aspect not recognized. Available aspects: marketing, financial, strengths, weaknesses, products"

        try:
            df = pd.read_csv("competitor_data.csv")
            if competitors:
                competitor_list = [c.strip() for c in competitors.split(',')]
                df = df[df['Competitor Name'].isin(competitor_list)]
            return "\n".join(
                f"{name}: {value}"
                for name, value in zip(df['Competitor Name'], df[column])
            )
        except Exception as e:
            # Errors are returned as text so the calling agent receives a
            # tool result instead of an exception.
            logger.error(f"Error in competitor analysis: {e}")
            return f"Error analyzing competitors: {str(e)}"

    def reason_and_act(self, query: str) -> str:
        """
        Process a query using ReAct agent with Cohere.

        Runs the agent on ``query``; if that raises, falls back to querying
        the index directly.

        Args:
            query: The user's question.

        Returns:
            str: The agent's answer, the fallback query-engine answer, or an
            error message when both fail. Only successful agent answers are
            appended to ``query_history``.
        """
        logger.info(f"Processing query: {query}")
        import asyncio

        async def run_agent():
            # agent.run() is called inside the coroutine so that it executes
            # under the event loop started by asyncio.run() below.
            handler = self.agent.run(query, ctx=self.ctx)
            print("\n waiting for stream events")
            # Event streaming is currently disabled:
            # async for event in handler.stream_events(True):
            #     if isinstance(event, StopEvent):
            #         print(f"Workflow completed with result: {event.result}")
            #     else:
            #         print(f"Received event: {event}")
            return await handler

        try:
            # Use the ReAct agent to process the query
            print(f"\nProcessing query with Cohere ReAct agent: {query}")
            response = asyncio.run(run_agent())

            # Save to history and return
            self.query_history.append((query, str(response)))
            logger.info("Returning the response")
            return str(response)
        except Exception as e:
            # logger.exception records the traceback through the configured
            # logging handlers.
            logger.exception(f"Error processing query: {str(e)}")

            # Fallback to direct query engine if agent fails
            try:
                print(f"\nFalling back to direct query engine: {query}")
                response = self.query_engine.query(query)
                return str(response)
            except Exception as e2:
                return f"Error processing query: {str(e)}\nFallback error: {str(e2)}"

    def get_query_history(self, limit: int = 5) -> list:
        """
        Get recent query history.

        Args:
            limit: Maximum number of recent queries to return. Zero or a
                negative value yields an empty list.

        Returns:
            list: List of (query, response) tuples, oldest first.
        """
        # Guard explicitly: a slice of [-0:] would return the whole history.
        if limit <= 0:
            return []
        return self.query_history[-limit:]
def main():
    """
    Main function to run the Competitive Analysis Agent.

    Runs an interactive prompt loop: 'exit' quits, 'history' prints recent
    queries, and any other non-empty input is sent to the agent. Ctrl-C or
    end-of-input at the prompt also ends the session cleanly.
    """
    print("=" * 80)
    print("Welcome to the AI-Powered Competitive Analysis Agent!")
    print("This agent can analyze competitor information and provide insights.")
    print("Type 'exit' to quit, 'history' to view recent queries.")
    print("=" * 80)
    try:
        # Initialize the agent
        agent = CompetitiveAnalysisAgent()
        while True:
            try:
                user_query = input("\nEnter your query: ").strip()
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or Ctrl-C at the prompt is a normal way to quit,
                # not an application error.
                print("\nThank you for using the Competitive Analysis Agent. Goodbye!")
                break

            command = user_query.lower()
            if command == 'exit':
                print("Thank you for using the Competitive Analysis Agent. Goodbye!")
                break

            if command == 'history':
                history = agent.get_query_history()
                if history:
                    print("\nRecent Queries:")
                    for i, (q, r) in enumerate(history, 1):
                        print(f"{i}. Query: {q}")
                        # Long responses are truncated to their first 100 characters
                        print(f" Response: {r[:100]}..." if len(r) > 100 else f" Response: {r}")
                        print()
                else:
                    print("No query history yet.")
                continue

            if not user_query:
                print("Please enter a valid query.")
                continue

            print("\nProcessing your query...")
            response = agent.reason_and_act(user_query)
            print(f"\nResponse: {response}")
    except Exception as e:
        # Top-level boundary: report the failure instead of crashing silently.
        logger.error(f"Application error: {e}")
        print(f"An error occurred: {e}")
        import traceback
        traceback.print_exc()
# Run the interactive loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()
`
Libraries used
#llama-index==0.11.17
llama-index==0.14.6
#llama-index-llms-cohere>=0.3.0,<0.4.0
llama-index-llms-cohere==0.6.1
#llama-index-embeddings-cohere>=0.2.0,<0.3.0
llama-index-embeddings-cohere==0.6.1
cohere>=5.0.0
pandas==2.1.3
numpy==1.26.2
python-dotenv==1.0.0
Python Version : 3.12.3
Relevant Logs/Tracebacks
1. Unhashable type error logs
2025-10-30 11:15:24,397 - __main__ - ERROR - Error processing query: unhashable type: 'ReActAgent'
Traceback (most recent call last):
File "/root/learning/ml/edureka/week5/wk5assignment/competitive_analysis_agent.py", line 228, in reason_and_act
response = asyncio.run(run_agent())
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/competitive_analysis_agent.py", line 224, in run_agent
response = await handler
^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/broker.py", line 143, in _run_workflow
registered = workflow_registry.get_registered_workflow(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/workflow_registry.py", line 43, in get_registered_workflow
plugin_map = self.workflows.get(workflow)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/weakref.py", line 452, in get
return self.data.get(ref(key),default)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unhashable type: 'ReActAgent'
2. __aiter__ error logs
2025-10-30 11:16:00,332 - __main__ - ERROR - Error processing query: 'async for' requires an object with __aiter__ method, got generator
Traceback (most recent call last):
File "/root/learning/ml/edureka/week5/wk5assignment/competitive_analysis_agent.py", line 228, in reason_and_act
response = asyncio.run(run_agent())
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/asyncio/runners.py", line 194, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.12/asyncio/base_events.py", line 687, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/competitive_analysis_agent.py", line 224, in run_agent
response = await handler
^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/broker.py", line 157, in _run_workflow
workflow_result = await registered.workflow_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/control_loop.py", line 301, in control_loop
return await runner.run(start_event=start_event)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/control_loop.py", line 274, in run
result = await self.process_command(command)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/control_loop.py", line 201, in process_command
raise command.exception
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/workflows/runtime/types/step_function.py", line 120, in wrapper
result = await partial_func()
^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/llama_index_instrumentation/dispatcher.py", line 386, in async_wrapper
result = await func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/llama_index/core/agent/workflow/base_agent.py", line 390, in run_agent_step
agent_output = await self.take_step(
^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/llama_index/core/agent/workflow/react_agent.py", line 161, in take_step
last_chat_response = await self._get_streaming_response(ctx, input_chat)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/llama_index/core/agent/workflow/react_agent.py", line 103, in _get_streaming_response
async for last_chat_response in response:
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/llama_index/core/llms/callbacks.py", line 89, in wrapped_gen
async for x in f_return_val:
File "/root/learning/ml/edureka/week5/wk5assignment/myenvwk5/lib/python3.12/site-packages/llama_index/llms/cohere/base.py", line 514, in gen
async for r in response:
TypeError: 'async for' requires an object with __aiter__ method, got generator