
Building a System Based on AI Agents with LangGraph: Adding Persistence and Streaming (Step by Step)


In our previous tutorial, we built an AI agent that answers queries by searching the web. However, when building agents for longer-running tasks, two critical concepts come into play: persistence and streaming. Persistence lets you save the state of an agent at any given point, so you can resume from that state in future interactions. This is crucial for long-running applications. Streaming, on the other hand, lets you emit real-time signals about what the agent is doing at any moment, providing transparency and control over its actions. In this tutorial, we will enhance our agent by adding these powerful features.

Agent Setup

Let’s start by recreating our agent. We will load the required environment variables, install and import the necessary libraries, set up the Tavily search tool, define the agent’s state, and finally build the agent.

pip install langgraph==0.2.53 langgraph-checkpoint==2.0.6 langgraph-sdk==0.1.36 langchain-groq langchain-community langgraph-checkpoint-sqlite==2.0.1
import os
os.environ["TAVILY_API_KEY"] = ""
os.environ["GROQ_API_KEY"] = ""

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_groq import ChatGroq
from langchain_community.tools.tavily_search import TavilySearchResults

tool = TavilySearchResults(max_results=2)

class AgentState(TypedDict):
    messages: Annotated[list[AnyMessage], operator.add]
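To see why `operator.add` is attached as the reducer, note that LangGraph merges each node's returned update into the running state using that reducer, so a list-valued messages key is concatenated rather than overwritten. A minimal stdlib-only sketch (plain strings stand in for real message objects):

```python
import operator

# LangGraph applies the reducer from Annotated[...] when merging a node's
# partial update into the overall state. With operator.add on a list,
# each node's new messages are appended to the history, not overwritten.
state_messages = ["What is the weather in Texas?"]   # existing state
node_update = ["AI: calling the search tool..."]     # a node's return value

merged = operator.add(state_messages, node_update)
print(merged)
# ['What is the weather in Texas?', 'AI: calling the search tool...']
```

This append-only behavior is what lets every node see the full conversation so far.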

class Agent:
    def __init__(self, model, tools, system=""):
        self.system = system
        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile()
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

    def call_openai(self, state: AgentState):
        messages = state['messages']
        if self.system:
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        result = state['messages'][-1]
        return len(result.tool_calls) > 0

    def take_action(self, state: AgentState):
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            result = self.tools[t['name']].invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}
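The conditional edge's routing logic can be exercised in isolation with a toy stand-in for the model's reply (`FakeMessage` is hypothetical; real `AIMessage` objects expose a `tool_calls` list with the same meaning):

```python
# Hypothetical stand-in for an LLM response message; real AIMessage
# objects carry a tool_calls list that plays the same role.
class FakeMessage:
    def __init__(self, tool_calls):
        self.tool_calls = tool_calls

def exists_action(state):
    # Same check as Agent.exists_action: route to "action" if the last
    # message requests at least one tool call, otherwise end the graph.
    return len(state["messages"][-1].tool_calls) > 0

print(exists_action({"messages": [FakeMessage([{"name": "tavily_search"}])]}))  # True
print(exists_action({"messages": [FakeMessage([])]}))                           # False
```

A message that requests a tool call routes to the "action" node; a plain answer routes to END and the run finishes.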

Adding Persistence

To add persistence, we will use LangGraph’s checkpointer feature. A checkpointer saves the agent’s state after and between every node. For this tutorial, we will use SqliteSaver, a simple checkpointer that leverages SQLite, a built-in database. While we will use an in-memory database for simplicity, you can easily connect it to an external database or use other checkpointers such as Redis or Postgres for more robust persistence.

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
sqlite_conn = sqlite3.connect("checkpoints.sqlite", check_same_thread=False)
memory = SqliteSaver(sqlite_conn)
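The choice of a file-backed connection (rather than ":memory:") is what makes checkpoints survive a restart. The point can be illustrated with plain sqlite3 and a made-up table (this is not LangGraph's actual checkpoint schema, which it manages internally):

```python
import os
import sqlite3
import tempfile

# Hypothetical table purely to illustrate durability.
path = os.path.join(tempfile.mkdtemp(), "checkpoints_demo.sqlite")

conn = sqlite3.connect(path)
conn.execute("CREATE TABLE checkpoints (thread_id TEXT, state TEXT)")
conn.execute("INSERT INTO checkpoints VALUES ('1', 'saved agent state')")
conn.commit()
conn.close()

# A fresh connection (e.g. after a process restart) still sees the row,
# which is exactly what lets an agent resume a conversation later.
conn = sqlite3.connect(path)
print(conn.execute("SELECT state FROM checkpoints").fetchone()[0])  # saved agent state
conn.close()
```

An in-memory database, by contrast, disappears as soon as its connection closes, so it only persists state within a single process.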

Next, we will modify our agent to accept a checkpointer:

class Agent:
    def __init__(self, model, tools, checkpointer, system=""):
        # Everything else remains the same as before
        self.graph = graph.compile(checkpointer=checkpointer)
    # Everything else after this remains the same

Now we can create our agent with persistence enabled:

prompt = """You are a smart research assistant. Use the search engine to look up information. 
You are allowed to make multiple calls (either together or in sequence). 
Only look up information when you are sure of what you want. 
If you need to look up some information before asking a follow-up question, you are allowed to do that!
"""
model = ChatGroq(model="Llama-3.3-70b-Specdec")
bot = Agent(model, [tool], system=prompt, checkpointer=memory)

Adding Streaming

Streaming is essential for real-time updates. There are two types of streaming we will focus on:

1. Streaming messages: emitting intermediate messages such as AI decisions and tool results.

2. Streaming tokens: emitting individual tokens of the LLM’s response.

Let’s start by streaming messages. We will create a human message and use the stream method to observe the agent’s actions in real time.

messages = [HumanMessage(content="What is the weather in Texas?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v['messages'])

Final output: The current weather in Texas is sunny with a temperature of 19.4°C (66.9°F) and a wind speed of 4.3 mph (6.8 kph) …

When you run this, you will see a stream of results. First, an AI message instructs the agent to call Tavily, followed by a tool message with the search results, and finally an AI message that answers the question.

Understanding Thread IDs

The thread_id is a crucial part of the thread configuration. It allows the agent to maintain separate conversations with different users or contexts. By assigning a unique thread_id to each conversation, the agent can keep track of multiple interactions simultaneously without mixing them up.
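A toy model of what the thread_id buys us (a plain dict of per-thread histories, not LangGraph's internals):

```python
from collections import defaultdict

# Each thread_id keys its own message history, so concurrent
# conversations never leak into one another.
histories = defaultdict(list)

def run_turn(thread_id, user_msg):
    histories[thread_id].append(user_msg)
    return list(histories[thread_id])  # the context this thread would see

run_turn("1", "What is the weather in Texas?")
run_turn("1", "What about in LA?")
context = run_turn("2", "Which one is warmer?")
print(len(context))  # 1 -- thread "2" sees none of thread "1"'s history
```

With a checkpointer, LangGraph does the equivalent bookkeeping for us, keyed by the thread_id we pass in the config.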

For example, let’s continue the conversation by asking “What about in LA?” using the same thread_id:

messages = [HumanMessage(content="What about in LA?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Final output: The current weather in Los Angeles is sunny with a temperature of 17.2°C (63.0°F) and a wind speed of 2.2 mph (3.6 kph) …

The agent infers that we are asking about the weather, thanks to persistence. To verify, we ask, “Which one is warmer?”:

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Final output: Texas is warmer than Los Angeles. The current temperature in Texas is 19.4°C (66.9°F), while the current temperature in Los Angeles is 17.2°C (63.0°F).

The agent correctly compares the weather in Texas and LA. To test whether persistence keeps conversations separate, let’s ask the same question with a different thread_id:

messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in bot.graph.stream({"messages": messages}, thread):
    for v in event.values():
        print(v)

Output: I need more information to answer that question. Can you provide more context or specify which two things you are comparing?

This time, the agent is confused because it has no access to the previous conversation’s history.

Streaming Tokens

To stream tokens, we will use the astream_events method, which is asynchronous. We will also switch to an async checkpointer.

from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver

async with AsyncSqliteSaver.from_conn_string(":memory:") as checkpointer:
    abot = Agent(model, [tool], system=prompt, checkpointer=checkpointer)
    messages = [HumanMessage(content="What is the weather in SF?")]
    thread = {"configurable": {"thread_id": "4"}}
    async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                # Empty content in the context of OpenAI means
                # that the model is asking for a tool to be invoked.
                # So we only print non-empty content
                print(content, end="|")

This will stream tokens in real time, giving you a live view of the agent’s thought process.
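The pattern reduces to a stdlib-only sketch: an async generator stands in for astream_events, and the consumer prints each token as it arrives instead of waiting for the full reply:

```python
import asyncio

# Toy async token source standing in for the LLM's streamed response.
async def fake_token_stream():
    for token in ["The", " weather", " in", " SF", " is", " sunny."]:
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield token

async def consume():
    pieces = []
    async for token in fake_token_stream():
        print(token, end="|")  # show each token the moment it arrives
        pieces.append(token)
    return "".join(pieces)

full = asyncio.run(consume())
print()
print(full)  # The weather in SF is sunny.
```

The real astream_events loop works the same way, except the events also carry tool calls and node transitions alongside the token chunks.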

Conclusion

By adding persistence and streaming, we have significantly improved our AI agent’s capabilities. Persistence allows the agent to maintain context across interactions, while streaming provides real-time insight into its actions. These features are essential for building production-ready applications, especially those involving multiple users or human-in-the-loop interactions.

In the next tutorial, we will dive into human-in-the-loop interactions, where persistence plays a crucial role in enabling seamless collaboration between humans and AI agents. Stay tuned!

References:

  1. DeepLearning.AI: https://learn.deeplearning.ai/courses/ai-agents-in-langgraph



Vineet Kumar is a consulting intern at Marktechpost. He is currently pursuing his BS at the Indian Institute of Technology (IIT), Kanpur. He is a Machine Learning enthusiast and is passionate about research and the latest advancements in Deep Learning, Computer Vision, and related fields.
