Commit fed2e71e authored by alinazh
Browse files

Add builder for multi-agent system based on predefined universal agent core

parent e0bd80b3
Showing with 461 additions and 1 deletion
+461 -1
"""Example of building a multi-agent system with a scenario agent using GraphBuilder"""
import os
from typing import Annotated, Optional
import pubchempy as pcp
import rdkit.Chem as Chem
import requests
from ChemCoScientist.agents.agents_prompts import worker_prompt
from langchain.tools.render import render_text_description
from langchain_core.runnables.config import RunnableConfig
from langchain_core.tools import tool
from langgraph.graph import END
from langgraph.prebuilt import create_react_agent
from langgraph.types import Command
from rdkit.Chem.Descriptors import CalcMolDescriptors
from protollm.agents.builder import GraphBuilder
from protollm.connectors import create_llm_connector
@tool
def calc_prop_tool(
    smiles: Annotated[str, "The SMILES of a molecule"],
    property: Annotated[str, "The property to predict."],
):
    """Use this to predict molecular property.
    Can calculate refractive index and freezing point
    Do not call this tool more than once.
    Do not call another tool if this returns results."""
    # The docstring above is what the `@tool` decorator exposes to the LLM as
    # the tool description, so its wording is left untouched.
    #
    # Placeholder implementation: `smiles` is not inspected and the same
    # constant is reported for whatever property name was requested.
    placeholder_value = 44.09
    return f"Successfully calculated:\n\n{property}\n\nStdout: {placeholder_value}"
@tool
def name2smiles(
    mol: Annotated[str, "Name of a molecule"],
):
    """Use this to convert molecule name to smiles format. Only use for organic molecules"""
    # The docstring above is the tool description shown to the LLM; keep it short.
    #
    # Returns the SMILES string of the first PubChem hit for `mol`, or a
    # human-readable failure message (tools report errors as text so the agent
    # can react to them instead of crashing).
    max_attempts = 3
    last_error = None
    for _ in range(max_attempts):
        try:
            compounds = pcp.get_compounds(mol, "name")
        except Exception as e:
            # Lookup failed (network, service error, ...): remember the error
            # and try again instead of giving up on the first failure.
            last_error = e
            continue
        if not compounds:
            # The query itself worked but nothing matched: retrying cannot help.
            return "I've couldn't obtain smiles, the name is wrong"
        return compounds[0].canonical_smiles
    # Every attempt raised; report the last error seen.
    return f"Failed to execute. Error: {repr(last_error)}"
@tool
def smiles2name(smiles: Annotated[str, "SMILES of a molecule"]):
    """Use this to convert SMILES to IUPAC name of given molecule"""
    # The docstring above is the tool description shown to the LLM.
    #
    # Returns the IUPAC name reported by PubChem for `smiles`, or a
    # human-readable failure message.
    #
    # The SMILES is sent as a URL argument rather than embedded in the URL
    # path: SMILES routinely contain characters that are special in a path
    # ('#', '/', '\\', '+', '[' ...), and `requests` percent-encodes query
    # parameters for us.
    url = "https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/smiles/property/IUPACName/JSON"
    max_attempts = 3
    failure = "I've couldn't get iupac name"
    for _ in range(max_attempts):
        try:
            # A timeout keeps a stalled connection from blocking the agent forever.
            response = requests.get(url, params={"smiles": smiles}, timeout=10)
        except requests.RequestException as e:
            # Transport-level problem: remember it and retry.
            failure = f"Failed to execute. Error: {repr(e)}"
            continue
        if response.status_code == 200:
            try:
                data = response.json()
                return data["PropertyTable"]["Properties"][0]["IUPACName"]
            except (ValueError, KeyError, IndexError, TypeError) as e:
                # Malformed payload or no IUPAC name in the record.
                return f"Failed to execute. Error: {repr(e)}"
        failure = "I've couldn't get iupac name"
        if response.status_code < 500:
            # Client-side rejection (e.g. unknown/invalid SMILES): retrying
            # the identical request will not change the outcome.
            break
    return failure
@tool
def smiles2prop(
    smiles: Annotated[str, "SMILES of a molecule"], iupac: Optional[str] = None
):
    """Use this to calculate all available properties of given molecule. Only use for organic molecules
    params:
    smiles: str, smiles of a molecule,
    iupac: optional, default is None, iupac of molecule"""
    # The docstring above is the tool description shown to the LLM.
    #
    # Returns the descriptor mapping produced by `CalcMolDescriptors`, or a
    # human-readable failure message.
    try:
        if iupac:
            # When a name is supplied, the SMILES resolved from PubChem (if
            # any) takes precedence over the `smiles` argument.
            compounds = pcp.get_compounds(iupac, "name")
            if compounds:
                smiles = compounds[0].canonical_smiles
        mol = Chem.MolFromSmiles(smiles)
        if mol is None:
            # RDKit signals an unparsable SMILES by returning None; say so
            # explicitly instead of failing inside the descriptor code.
            return f"Failed to execute. Error: could not parse SMILES {smiles!r}"
        return CalcMolDescriptors(mol)
    except Exception as e:
        # `Exception` (not `BaseException`) so Ctrl-C / SystemExit still propagate.
        return f"Failed to execute. Error: {repr(e)}"
@tool
def generate_molecule(
    params: Annotated[str, "Description of target molecule"], config: RunnableConfig
):
    """Use this to generate a molecule with given description. Returns smiles. Only use for organic molecules"""
    # The docstring above is the tool description shown to the LLM.
    #
    # Asks the configured chat model for a SMILES string matching the free-text
    # description in `params` (few-shot prompt) and returns the model's raw
    # answer, or a human-readable failure message.
    try:
        configurable = config["configurable"]
        # The graph configuration used in this file stores the chat model
        # under "llm"; "model" is still accepted for older configurations.
        llm = configurable.get("llm")
        if llm is None:
            llm = configurable.get("model")
        if llm is None:
            return "Failed to execute. Error: no LLM found in config['configurable'] (expected key 'llm')"
        # The continuation lines of the literal below intentionally start at
        # column 0: everything after a backslash-newline, including leading
        # whitespace, becomes part of the prompt text.
        prompt = (
            "Generate smiles of molecule with given description. Answer only with smiles, nothing more: \
Question: The molecule is a nitrogen mustard drug indicated for use in the treatment of chronic lymphocytic leukemia (CLL) and indolent B-cell non-Hodgkin lymphoma (NHL) that has progressed during or within six months of treatment with rituximab or a rituximab-containing regimen. Bendamustine is a bifunctional mechlorethamine derivative capable of forming electrophilic alkyl groups that covalently bond to other molecules. Through this function as an alkylating agent, bendamustine causes intra- and inter-strand crosslinks between DNA bases resulting in cell death. It is active against both active and quiescent cells, although the exact mechanism of action is unknown. \
Answer: CN1C(CCCC(=O)O)=NC2=CC(N(CCCl)CCCl)=CC=C21 \
Question: The molecule is a mannosylinositol phosphorylceramide compound having a tetracosanoyl group amide-linked to a C20 phytosphingosine base, with hydroxylation at C-2 and C-3 of the C24 very-long-chain fatty acid. It is functionally related to an Ins-1-P-Cer(t20:0/2,3-OH-24:0).\
Answer: CCCCCCCCCCCCCCCCCCCCCC(O)C(O)C(=O)N[C@@H](COP(=O)(O)O[C@@H]1[C@H](O)[C@H](O)[C@@H](O)[C@H](O)[C@H]1OC1O[C@H](CO)[C@@H](O)[C@H](O)[C@@H]1O)[C@H](O)C(O)CCCCCCCCCCCCCCCC \
Question: "
            + params
            + "\n Answer: "
        )
        res = llm.invoke(prompt)
        return res.content
    except Exception as e:
        # `Exception` (not `BaseException`) so Ctrl-C / SystemExit still propagate.
        return f"Failed to execute. Error: {repr(e)}"
def chemist_node(state, config: dict):
    """
    Executes the first step of the current plan with a ReAct-based chemistry agent.

    Parameters
    ----------
    state : dict | TypedDict
        The current execution state; ``state["plan"]`` must be a non-empty
        list of step descriptions.
    config : dict
        Configuration dictionary; the chat model is read from
        ``config["configurable"]["llm"]``.

    Returns
    -------
    Command
        On success, routes to ``"replan_node"`` and appends the executed task
        with the agent's final message to ``past_steps`` and the full message
        list to ``nodes_calls``. If every attempt fails, routes to ``END``
        with an apology stored in ``response``.

    Notes
    -----
    - ``chem_tools`` is looked up as a module-level name; in this file it is
      only assigned inside the ``__main__`` section.
    - Failed attempts are retried up to three times with a ``1.2 ** attempt``
      second pause between attempts.
    """
    # Imported here because the back-off below needs it and this module does
    # not import `time` at the top.
    import time

    llm = config["configurable"]["llm"]
    chem_agent = create_react_agent(
        llm, chem_tools, state_modifier=worker_prompt + "admet = qed"
    )
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing: {task}."""
    # Expose the current state through the shared config (set once; it does
    # not change between attempts).
    config["configurable"]["state"] = state
    max_retries = 3
    for attempt in range(max_retries):
        try:
            agent_response = chem_agent.invoke({"messages": [("user", task_formatted)]})
            messages = agent_response["messages"]
            return Command(
                goto="replan_node",
                update={
                    "past_steps": [(task, messages[-1].content)],
                    "nodes_calls": [("chemist_node", messages)],
                },
            )
        except Exception as e:
            print(
                f"Chemist failed with error: {str(e)}. Retrying... ({attempt+1}/{max_retries})"
            )
            # No point in waiting after the last attempt.
            if attempt + 1 < max_retries:
                time.sleep(1.2**attempt)
    return Command(
        goto=END,
        update={
            "response": "I can't answer to your question right now( Perhaps there is something else that I can help? -><-"
        },
    )
if __name__ == "__main__":
    # Placeholder credentials: replace with a real key before running.
    os.environ["OPENAI_API_KEY"] = "YOUR_API_KEY"
    model = create_llm_connector(
        "https://api.vsegpt.ru/v1;meta-llama/llama-3.1-70b-instruct"
    )

    # `chem_tools` has to live at module level: `chemist_node` reads it as a global.
    chem_tools = [name2smiles, smiles2name, smiles2prop, generate_molecule]
    tools_rendered = chem_tools_rendered = render_text_description(chem_tools)

    # Settings shared by every node of the graph.
    configurable = {
        "llm": model,
        "max_retries": 1,
        "scenario_agents": ["chemist_node"],
        "scenario_agent_funcs": {"chemist_node": chemist_node},
        "tools_for_agents": {
            "chemist_node": [chem_tools_rendered],
        },
        "tools_descp": tools_rendered,
    }
    conf = {"recursion_limit": 50, "configurable": configurable}
    graph = GraphBuilder(conf)

    # Demo questions, run one after another in this order.
    queries = (
        "What is the name of the molecule with the SMILES 'CCO'?",
        "What can you do?",
        "Определи IUPAC для молекулы CCO",
        "Сгенерируй какую-нибудь полезную молекулу для здоровья.",
    )
    res_1, res_2, res_3, res_4 = [
        graph.run({"input": query}, debug=True) for query in queries
    ]
......@@ -6,6 +6,7 @@ from protollm.agents.agent_utils.parsers import (
replanner_parser,
supervisor_parser,
translator_parser,
chat_parser
)
......@@ -165,3 +166,22 @@ System_response: {system_response};
intermediate_thoughts: {intermediate_thoughts};
"""
)
# Prompt for the chat agent: the model either answers a simple question itself
# ("response") or hands the request over to the planner ("next").
# The template text is sent to the LLM verbatim, so it is kept exactly as is.
_CHAT_TEMPLATE = """For the given objective, check whether it is simple enough to answer yourself. \
If you can answer without any help and tools and the question is simple inquery, then write your answer. If you can't do that, call next worker: planner
If the question is related to running models or checking for presence, training, inference - call planer!
You should't answer to a several-sentenced questions. You can only chat with user on a simle topics
Your objective is this:
{input}
Your output should match this JSON format, don't add any intros
{{
"action": {{
"next" | "response" : str | str
}}
}}
"""
# NOTE(review): the template has no `{format_instructions}` placeholder, so the
# value bound below is never rendered into the prompt - confirm this is intended.
chat_prompt = ChatPromptTemplate.from_template(_CHAT_TEMPLATE).partial(
    format_instructions=chat_parser.get_format_instructions()
)
\ No newline at end of file
......@@ -6,7 +6,9 @@ from protollm.agents.agent_utils.pydantic_models import (
Translation,
Worker,
)
from protollm.agents.agent_utils.pydantic_models import Chat
# One output parser per structured-output schema: each turns raw LLM text into
# an instance of the pydantic model passed as `pydantic_object`.
# Parser for the `Chat` schema (imported just above).
chat_parser = PydanticOutputParser(pydantic_object=Chat)
# Parser for the `Plan` schema.
planner_parser = PydanticOutputParser(pydantic_object=Plan)
# Parser for the `Worker` schema.
supervisor_parser = PydanticOutputParser(pydantic_object=Worker)
# Parser for the `Act` schema.
replanner_parser = PydanticOutputParser(pydantic_object=Act)
......
from typing_extensions import TypedDict
from typing import Annotated, List, Tuple
import operator
class PlanExecute(TypedDict):
    """Shared state passed between the nodes of the plan-and-execute graph.

    Fields annotated with ``operator.add`` are meant to be accumulated
    (new values are appended to the existing list) rather than overwritten;
    presumably LangGraph uses the annotation as the reducer - TODO confirm.
    """

    # Original user message.
    input: str
    # Ordered list of step descriptions; nodes execute the first entry.
    plan: List[str]
    # Accumulated (task, result) tuples of the steps executed so far.
    past_steps: Annotated[List[Tuple], operator.add]
    # Name of the node the supervisor routes to next.
    next: str
    # Final answer; a non-empty value ends planning and triggers the output path.
    response: str
    # Presumably a visualization payload for the answer; reset to None by the chat node - TODO confirm.
    visualization: str
    # Language of the user message ("English" means no translation is used).
    language: str
    # Translated version of the input, used when the language is not English.
    translation: str
    # Presumably results produced by an AutoML agent; not used in the visible code - TODO confirm.
    automl_results: str
    # Accumulated (node name, messages) tuples recording each agent call.
    nodes_calls: Annotated[List[Tuple], operator.add]
import os
from protollm.agents.agent_utils.states import PlanExecute
from langgraph.graph import END, START, StateGraph
from protollm.agents.universal_agents import (in_translator_node, plan_node,
re_translator_node, replan_node,
summary_node, supervisor_node,
chat_node)
from protollm.connectors import create_llm_connector
class GraphBuilder:
    """Builds a graph based on the basic structure of universal agents.

    Need to add your own scenario agents via 'conf'.

    Args:
        conf (dict): Configuration dictionary with the following structure:
            - recursion_limit (int): Maximum recursion depth for processing.
            - configurable (dict): Configurations for the agents and tools.
                - llm: BaseChatModel
                - max_retries (int): Number of retries for failed tasks.
                - scenario_agents (list): List of scenario agent names.
                - scenario_agent_funcs (dict): Mapping of agent names to their
                  function (link on ready agent-node).
                - tools_for_agents (dict): Description of tools available for
                  each agent.
                - tools_descp: Rendered descriptions of tools.

    Example:
        conf = {
            "recursion_limit": 50,
            "configurable": {
                "llm": model,
                "max_retries": 1,
                "scenario_agents": ["chemist_node"],
                "scenario_agent_funcs": {"chemist_node": chemist_node},
                "tools_for_agents": {
                    "chemist_node": [chem_tools_rendered]
                },
                "tools_descp": tools_rendered,
            }
        }
    """

    def __init__(self, conf: dict):
        """Store the configuration and compile the graph right away."""
        self.conf = conf
        self.app = self._build()

    def _should_end_chat(self, state) -> str:
        """
        Decide where to go after the chat node.

        Parameters
        ----------
        state : dict | TypedDict
            The current execution state, expected to contain "response".

        Returns
        -------
        str
            "retranslator" if a non-empty response exists, otherwise "planner".
        """
        if state.get("response"):
            return "retranslator"
        return "planner"

    def _should_end(self, state) -> str:
        """
        Decide where to go after the replanning node.

        Parameters
        ----------
        state : PlanExecute
            The current execution state, potentially containing a generated
            response.

        Returns
        -------
        str
            "summary" if a non-empty response is available, otherwise
            "supervisor".
        """
        if state.get("response"):
            return "summary"
        return "supervisor"

    def _routing_function_supervisor(self, state):
        """Determines the next agent after Supervisor (the value of ``state["next"]``)."""
        return state["next"]

    def _build(self):
        """Build graph based on a non-dynamic agent skeleton.

        The fixed skeleton is
        START -> intranslator -> chat -> (retranslator | planner),
        planner -> supervisor -> <scenario agent> -> replan_node ->
        (supervisor | summary), summary -> retranslator -> END.
        Scenario agents come from ``conf["configurable"]["scenario_agent_funcs"]``.
        """
        workflow = StateGraph(PlanExecute)

        # Universal (scenario-independent) nodes.
        workflow.add_node("intranslator", in_translator_node)
        workflow.add_node("retranslator", re_translator_node)
        workflow.add_node("chat", chat_node)
        workflow.add_node("planner", plan_node)
        workflow.add_node("supervisor", supervisor_node)
        workflow.add_node("replan_node", replan_node)
        workflow.add_node("summary", summary_node)

        # User-supplied scenario agents; each one reports back to the replanner.
        for agent_name, node in self.conf["configurable"]["scenario_agent_funcs"].items():
            workflow.add_node(agent_name, node)
            workflow.add_edge(agent_name, "replan_node")

        workflow.add_edge(START, "intranslator")
        workflow.add_edge("intranslator", "chat")
        workflow.add_conditional_edges(
            "chat",
            self._should_end_chat,
            ["planner", "retranslator"],
        )
        workflow.add_edge("planner", "supervisor")
        workflow.add_conditional_edges(
            "replan_node",
            self._should_end,
            ["supervisor", "summary"],
        )
        workflow.add_edge("summary", "retranslator")
        # The supervisor names the next node itself via state["next"].
        workflow.add_conditional_edges("supervisor", self._routing_function_supervisor)
        workflow.add_edge("retranslator", END)
        return workflow.compile()

    def run(self, inputs: dict, debug: bool = False):
        """Start streaming the input through the graph.

        Every streamed update is printed as it arrives, followed by the final
        answer taken from the "response" entry of the last update.

        Parameters
        ----------
        inputs : dict
            Initial state, e.g. ``{"input": "question"}``.
        debug : bool
            Forwarded to the compiled graph's ``stream`` call.

        Returns
        -------
        The final answer: the ``content`` attribute of the last update's
        "response" if it has one, the response itself otherwise, or ``None``
        when the stream produced no update with a "response" entry.
        """
        last_update = None
        for event in self.app.stream(inputs, config=self.conf, debug=debug):
            for node_name, update in event.items():
                if node_name != "__end__":
                    print("===AGENT===")
                    print(node_name)
                    print("===========")
                    print(update)
                last_update = update

        # The stream may be empty, or the last update may carry no response;
        # neither case should crash the caller.
        try:
            response = last_update["response"]
        except (KeyError, TypeError):
            response = None
        # Message objects carry their text in `.content`; plain strings do not.
        answer = getattr(response, "content", response)
        print("\n\nFINALLY ANSWER: ", answer)
        return answer
......@@ -21,14 +21,18 @@ from protollm.agents.agent_prompts import (
summary_prompt,
translate_prompt,
worker_prompt,
chat_prompt
)
from protollm.agents.agent_utils.parsers import (
planner_parser,
replanner_parser,
supervisor_parser,
translator_parser,
chat_parser
)
from protollm.agents.agent_utils.pydantic_models import Response
from langgraph.types import Command
from langgraph.graph import END
def in_translator_node(state: dict, config: dict) -> Union[Dict, Command]:
......@@ -460,4 +464,62 @@ def summary_node(
update={
"response": "I can't answer your question right now. Maybe I can assist with something else?"
},
)
\ No newline at end of file
)
def chat_node(state, config: dict):
    """
    Processes user input through a chat agent and returns an appropriate response
    or next action. This agent decides whether it can handle the user query itself.
    If yes, responds with the {"response": agent_answer}.
    Otherwise, calls main agentic system.

    Parameters
    ----------
    state : dict | TypedDict
        The current execution state, containing "input" (the user message) and
        "translation" when "language" is set to something other than English.
    config : dict
        Configuration dictionary containing a "configurable" sub-dictionary with
        the LLM model under the key "llm".

    Returns
    -------
    dict | Command
        If the agent answers directly, returns
        {"response": message, "visualization": None}.
        If the agent delegates, returns {"next": action, "visualization": None}.
        If every attempt fails, returns a Command that routes to the planner
        with an empty response.

    Notes
    -----
    - If the user's language is not English, the translated text is used instead
      of the raw input.
    - Errors raised while calling the agent are printed and the call is retried
      (currently a single attempt) with a `1.2 ** attempt` second pause between
      attempts.
    """
    llm = config["configurable"]["llm"]
    chat_agent = chat_prompt | llm | chat_parser
    user_text = (
        state["input"]
        if state.get('language', 'English') == 'English'
        else state['translation']
    )
    max_retries = 1
    for attempt in range(max_retries):
        try:
            output = chat_agent.invoke(user_text)
            # Return the update explicitly: falling out of the loop would hit
            # the failure fallback below and throw the answer away.
            if isinstance(output.action, Response):
                return {"response": output.action.response, "visualization": None}
            return {"next": output.action.next, "visualization": None}
        except Exception as e:  # Handle OpenAI API errors
            print(f"Chat failed with error: {str(e)}. Retrying... ({attempt+1}/{max_retries})")
            # No point in waiting after the last attempt.
            if attempt + 1 < max_retries:
                time.sleep(1.2 ** attempt)
    # Every attempt failed: let the full planning pipeline handle the request.
    return Command(
        goto='planner',
        update={"response": None}
    )
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment