Build long-running MCP servers on Amazon Bedrock AgentCore with Strands Agents

AI agents are rapidly evolving from mere chat interfaces into sophisticated autonomous workers that handle complex, time-intensive tasks. As organizations deploy agents to train machine learning (ML) models, process large datasets, and run extended simulations, the Model Context Protocol (MCP) has emerged as a standard for agent-server integrations. But a critical challenge remains: these operations can take minutes or hours to complete, far exceeding typical session timeframes. By using Amazon Bedrock AgentCore and Strands Agents to implement persistent state management, you can enable seamless, cross-session task execution in production environments. Imagine your AI agent initiating a multi-hour data processing job, your user closing their laptop, and the system seamlessly retrieving completed results when the user returns days later—with full visibility into task progress, outcomes, and errors. This capability transforms AI agents from conversational assistants into reliable autonomous workers that can handle enterprise-scale operations. Without these architectural patterns, you’ll encounter timeout errors, inefficient resource utilization, and potential data loss when connections terminate unexpectedly.
In this post, we provide you with a comprehensive approach to achieve this. First, we introduce a context message strategy that maintains continuous communication between servers and clients during extended operations. Next, we develop an asynchronous task management framework that allows your AI agents to initiate long-running processes without blocking other operations. Finally, we demonstrate how to bring these strategies together with Amazon Bedrock AgentCore and Strands Agents to build production-ready AI agents that can handle complex, time-intensive operations reliably.
Common approaches to handle long-running tasks
When designing MCP servers for long-running tasks, you might face a fundamental architectural decision: should the server maintain an active connection and provide real-time updates, or should it decouple task execution from the initial request? This choice leads to two distinct approaches: context messaging and async task management.
Using context messaging
The context messaging approach maintains continuous communication between the MCP server and client throughout task execution. This is achieved by using MCP’s built-in context object to send periodic notifications to the client. This approach is optimal for scenarios where tasks are typically completed within 10–15 minutes and network connectivity remains stable. The context messaging approach offers these advantages:

Straightforward server and client implementation
No additional polling logic required
Minimal overhead

Using async task management
The async task management approach separates task initiation from execution and result retrieval. When the MCP tool is invoked, it immediately returns a task initiation message and continues executing the task in the background. This approach excels in demanding enterprise scenarios where tasks might run for hours, users need flexibility to disconnect and reconnect, and system reliability is paramount. The async task management approach provides these benefits:

True fire-and-forget operation
Safe client disconnection while tasks continue processing
Data loss prevention through persistent storage
Support for long-running operations (hours)
Resilience against network interruptions
Asynchronous workflows

Context messaging
Let’s begin by exploring the context messaging approach, which provides a straightforward solution for handling moderately long operations while maintaining active connections. This approach builds directly on existing capabilities of MCP and requires minimal additional infrastructure, making it an excellent starting point for extending your agent’s processing time limits.

Imagine you’ve built an MCP server for an AI agent that helps data scientists train ML models. When a user asks the agent to train a complex model, the underlying process might take 10–15 minutes—far beyond the typical 30-second to 2-minute HTTP timeout limit in most environments. Without a proper strategy, the connection would drop, the operation would fail, and the user would be left frustrated. In a Streamable HTTP transport for MCP client implementation, these timeout constraints are particularly limiting: when task execution exceeds the timeout limit, the connection aborts and the agent’s workflow is interrupted. This is where context messaging comes in.

Context messaging uses the built-in context object of MCP to send periodic signals from the server to the MCP client, effectively keeping the connection alive throughout longer operations. Think of it as sending “heartbeat” messages that help prevent the connection from timing out. The following diagram illustrates the workflow when implementing the context messaging approach.

Figure 1: Illustration of workflow in context messaging approach

Here is a code example to implement the context messaging:

from mcp.server.fastmcp import Context, FastMCP
import asyncio

mcp = FastMCP(host="0.0.0.0", stateless_http=True)

@mcp.tool()
async def model_training(model_name: str, epochs: int, ctx: Context) -> str:
    """Execute a task with progress updates."""

    for i in range(epochs):
        # Simulate long-running training work
        progress = (i + 1) / epochs
        await asyncio.sleep(5)
        await ctx.report_progress(
            progress=progress,
            total=1.0,
            message=f"Step {i + 1}/{epochs}",
        )

    return f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle. The model training score is 0.87, validation score is 0.82."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

The key element here is the Context parameter in the tool definition. When you include a parameter with the Context type annotation, FastMCP automatically injects this object, giving you access to methods such as ctx.info() and ctx.report_progress(). These methods send messages to the connected client without terminating tool execution.
The report_progress() calls within the training loop serve as those critical heartbeat messages, making sure the MCP connection remains active throughout the extended processing period.
For many real-world scenarios, exact progress can’t be easily quantified—such as when processing unpredictable datasets or making external API calls. In these cases, you can implement a time-based heartbeat system:

from mcp.server.fastmcp import Context, FastMCP
import time
import asyncio

mcp = FastMCP(host="0.0.0.0", stateless_http=True)

@mcp.tool()
async def model_training(model_name: str, epochs: int, ctx: Context) -> str:
    """Execute a task with progress updates."""
    done_event = asyncio.Event()
    start_time = time.time()

    async def timer():
        while not done_event.is_set():
            elapsed = time.time() - start_time
            await ctx.info(f"Processing ...: {elapsed:.1f} seconds elapsed")
            await asyncio.sleep(5)  # Check every 5 seconds
        return

    timer_task = asyncio.create_task(timer())

    ## main task#####################################
    for i in range(epochs):
        # Simulate long-running training work
        progress = (i + 1) / epochs
        await asyncio.sleep(5)
    #################################################

    # Signal the timer to stop and clean up
    done_event.set()
    await timer_task

    total_time = time.time() - start_time
    print(f"⏱ Total processing time: {total_time:.2f} seconds")

    return f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle. The model training score is 0.87, validation score is 0.82."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

This pattern creates an asynchronous timer that runs alongside your main task, sending regular status updates every few seconds. Using asyncio.Event() for coordination facilitates clean shutdown of the timer when the main work is completed.
When to use context messaging
Context messaging works best when:

Tasks take 1–15 minutes to complete*
Network connections are generally stable
The client session can remain active throughout the operation
You need real-time progress updates during processing
Tasks have predictable, finite execution times with clear termination conditions

*Note: “15 minutes” is based on the maximum duration Amazon Bedrock AgentCore allows for synchronous requests. More details about Amazon Bedrock AgentCore service quotas can be found at Quotas for Amazon Bedrock AgentCore. If the infrastructure hosting the agent doesn’t implement hard time limits, be extremely cautious when using this approach for tasks that might hang or run indefinitely. Without proper safeguards, a stuck task could keep a connection open indefinitely, leading to resource depletion, unresponsive processes, and potentially system-wide stability issues.
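When no hard limit is enforced by the infrastructure, one defensive option is to wrap the long-running work in an explicit deadline. The following is a minimal sketch (not part of the original server code; `flaky_training_step` is a hypothetical stand-in for work that might hang) using asyncio.wait_for to cancel a stuck task rather than let it hold the connection open:

```python
import asyncio

async def flaky_training_step() -> str:
    # Stand-in for a step that might hang far longer than expected
    await asyncio.sleep(60)
    return "done"

async def run_with_deadline(max_seconds: float) -> str:
    try:
        # Cancel the work if it exceeds the deadline instead of
        # keeping the connection open indefinitely
        return await asyncio.wait_for(flaky_training_step(), timeout=max_seconds)
    except asyncio.TimeoutError:
        return "aborted: exceeded deadline"

result = asyncio.run(run_with_deadline(0.1))
print(result)  # aborted: exceeded deadline
```

Inside an MCP tool, the same wait_for pattern can guard the task loop so a hung operation fails fast with a clear error instead of exhausting resources.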
Here are some important limitations to consider:

Continuous connection required – The client session must remain active throughout the entire operation. If the user closes their browser or the network drops, the work is lost.
Resource consumption – Keeping connections open consumes server and client resources, potentially increasing costs for long-running operations.
Network dependency – Network instability can still interrupt the process, requiring a full restart.
Ultimate timeout limits – Most infrastructures have hard timeout limits that can’t be circumvented with heartbeat messages.

Therefore, for truly long-running operations that might take hours or for scenarios where users need to disconnect and reconnect later, you’ll need the more robust asynchronous task management approach.
Async task management
Unlike the context messaging approach where clients must maintain continuous connections, the async task management pattern follows a “fire and forget” model:

Task initiation – Client makes a request to start a task and immediately receives a task ID
Background processing – Server executes the work asynchronously, with no client connection required
Status checking – Client can reconnect at any time to check progress using the task ID
Result retrieval – When the task is completed, results remain available for retrieval whenever the client reconnects

The following figure illustrates the workflow in the asynchronous task management approach.

Figure 2: Illustration of workflow in asynchronous task management approach

This pattern mirrors how you interact with batch processing systems in enterprise environments—submit a job, disconnect, and check back later when convenient. Here’s a practical implementation that demonstrates these principles:

from mcp.server.fastmcp import Context, FastMCP
import asyncio
import uuid
from typing import Dict, Any

mcp = FastMCP(host="0.0.0.0", stateless_http=True)

# task storage
tasks: Dict[str, Dict[str, Any]] = {}

async def _execute_model_training(
        task_id: str,
        model_name: str,
        epochs: int
    ):
    """Background task execution."""
    tasks[task_id]["status"] = "running"

    for i in range(epochs):
        tasks[task_id]["progress"] = (i + 1) / epochs
        await asyncio.sleep(2)

    tasks[task_id]["result"] = f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle. The model training score is 0.87, validation score is 0.82."

    tasks[task_id]["status"] = "completed"

@mcp.tool()
def model_training(
    model_name: str,
    epochs: int = 10
    ) -> str:
    """Start model training task."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = {
        "status": "started",
        "progress": 0.0,
        "task_type": "model_training"
    }
    asyncio.create_task(_execute_model_training(task_id, model_name, epochs))
    return f"Model Training task has been initiated with task ID: {task_id}. Please check back later to monitor completion status and retrieve results."

@mcp.tool()
def check_task_status(task_id: str) -> Dict[str, Any]:
    """Check the status of a running task."""
    if task_id not in tasks:
        return {"error": "task not found"}

    task = tasks[task_id]
    return {
        "task_id": task_id,
        "status": task["status"],
        "progress": task["progress"],
        "task_type": task.get("task_type", "unknown")
    }

@mcp.tool()
def get_task_results(task_id: str) -> Dict[str, Any]:
    """Get results from a completed task."""
    if task_id not in tasks:
        return {"error": "task not found"}

    task = tasks[task_id]
    if task["status"] != "completed":
        return {"error": f"task not completed. Current status: {task['status']}"}

    return {
        "task_id": task_id,
        "status": task["status"],
        "result": task["result"]
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

This implementation creates a task management system with three distinct MCP tools:

model_training() – The entry point that initiates a new task. Rather than performing the work directly, it:

Generates a unique task identifier using a universally unique identifier (UUID)
Creates an initial task record in the storage dictionary
Launches the actual processing as a background task using asyncio.create_task()
Returns immediately with the task ID, allowing the client to disconnect

check_task_status() – Allows clients to monitor progress at their convenience by:

Looking up the task by ID in the storage dictionary
Returning current status and progress information
Providing appropriate error handling for missing tasks

get_task_results() – Retrieves completed results when ready by:

Verifying the task exists and is completed
Returning the results stored during background processing
Providing clear error messages when results aren’t ready

The actual work happens in the private _execute_model_training() function, which runs independently in the background after the initial client request is completed. It updates the task’s status and progress in the shared storage as it progresses, making this information available for subsequent status checks.
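From the client side, the check-back workflow these three tools imply is a simple polling loop. The following is an illustrative sketch (our own, not from the post’s code; `fake_check` is a hypothetical stand-in for a real check_task_status tool call over MCP):

```python
import time
from typing import Callable, Dict, Any

def poll_until_complete(check_status: Callable[[], Dict[str, Any]],
                        interval: float = 0.01,
                        max_attempts: int = 100) -> Dict[str, Any]:
    # Repeatedly ask for status until the task completes or we give up
    for _ in range(max_attempts):
        status = check_status()
        if status.get("status") == "completed":
            return status
        time.sleep(interval)
    return {"status": "timeout"}

# Fake in-process task standing in for the check_task_status MCP tool
fake_task = {"status": "running", "checks": 0}

def fake_check() -> Dict[str, Any]:
    fake_task["checks"] += 1
    if fake_task["checks"] >= 3:
        fake_task["status"] = "completed"
    return dict(fake_task)

final = poll_until_complete(fake_check)
print(final["status"])  # completed
```

In a real client, `check_status` would invoke the MCP tool with the stored task ID, and the interval would typically be seconds or minutes rather than milliseconds.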
Limitations to consider
Although the async task management approach helps solve connectivity issues, it introduces its own set of limitations:

User experience friction – The approach requires users to manually check task status, remember task IDs across sessions, and explicitly request results, increasing interaction complexity.
Volatile memory storage – Using in-memory storage (as in our example) means the tasks and results are lost if the server restarts, making the solution unsuitable for production without persistent storage.
Serverless environment constraints – In ephemeral serverless environments, instances are automatically terminated after periods of inactivity, causing the in-memory task state to be permanently lost. This creates a paradoxical situation where the solution designed to handle long-running operations becomes vulnerable to the exact duration it aims to support. Unless users maintain regular check-ins to help prevent session time limits, both tasks and results could vanish.

Moving toward a robust solution
To address these critical limitations, you need to include external persistence that survives both server restarts and instance terminations. This is where integration with dedicated storage services becomes essential. By using external agent memory storage systems, you can fundamentally change where and how task information is maintained. Instead of relying on the MCP server’s volatile memory, this approach uses persistent external agent memory storage services that remain available regardless of server state.
The key innovation in this enhanced approach is that when the MCP server runs a long-running task, it writes the interim or final results directly into external memory storage, such as Amazon Bedrock AgentCore Memory that the agent can access, as illustrated in the following figure. This helps create resilience against two types of runtime failures:

The instance running the MCP server can be terminated due to inactivity after task completion
The instance hosting the agent itself can be recycled in ephemeral serverless environments

Figure 3. MCP integration with external memory

With external memory storage, when users return to interact with the agent—whether minutes, hours, or days later—the agent can retrieve the completed task results from persistent storage. This approach minimizes runtime dependencies: even if both the MCP server and agent instances are terminated, the task results remain safely preserved and accessible when needed.
The next section will explore how to implement this robust solution using Amazon Bedrock AgentCore Runtime as a serverless hosting environment, AgentCore Memory for persistent agent memory storage, and the Strands Agents framework to orchestrate these components into a cohesive system that maintains task state across session boundaries.
Amazon Bedrock AgentCore and Strands Agents implementation
Before diving into the implementation details, it’s important to understand the deployment options available for MCP servers on Amazon Bedrock AgentCore. There are two primary approaches: Amazon Bedrock AgentCore Gateway and AgentCore Runtime. AgentCore Gateway has a 5-minute timeout for invocations, making it unsuitable for hosting MCP servers that provide tools requiring extended response times or long-running operations. AgentCore Runtime offers significantly more flexibility, with a 15-minute timeout for synchronous requests, an adjustable maximum session duration for asynchronous processes (the default is 8 hours), and a configurable idle session timeout.

Although you could host an MCP server in a traditional serverful environment for unlimited execution time, AgentCore Runtime provides an optimal balance for most production scenarios. You gain serverless benefits such as automatic scaling, pay-per-use pricing, and no infrastructure management, while the adjustable maximum session duration covers most real-world long-running tasks—from data processing and model training to report generation and complex simulations. You can use this approach to build sophisticated AI agents without the operational overhead of managing servers, reserving serverful deployments for the rare cases that genuinely require multiday executions. For more information about AgentCore Runtime and AgentCore Gateway service quotas, refer to Quotas for Amazon Bedrock AgentCore.
Next, we walk through the implementation, which is illustrated in the following diagram. This implementation consists of two interconnected components: the MCP server that executes long-running tasks and writes results to AgentCore Memory, and the agent that manages the conversation flow and retrieves those results when needed. This architecture creates a seamless experience where users can disconnect during lengthy processes and return later to find their results waiting for them.

MCP server implementation
Let’s examine how our MCP server implementation uses AgentCore Memory to achieve persistence:

from mcp.server.fastmcp import Context, FastMCP
import asyncio
import uuid
from typing import Dict, Any
import json
from bedrock_agentcore.memory import MemoryClient

mcp = FastMCP(host="0.0.0.0", stateless_http=True)
agentcore_memory_client = MemoryClient()

async def _execute_model_training(
        model_name: str,
        epochs: int,
        session_id: str,
        actor_id: str,
        memory_id: str
    ):
    """Background task execution."""

    for i in range(epochs):
        await asyncio.sleep(2)

    try:
        response = agentcore_memory_client.create_event(
            memory_id=memory_id,
            actor_id=actor_id,
            session_id=session_id,
            messages=[
                (
                    json.dumps({
                        "message": {
                            "role": "user",
                            "content": [
                                {
                                    "text": f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle. The model training score is 0.87, validation score is 0.82."
                                }
                            ]
                        },
                        "message_id": 0
                    }),
                    'USER'
                )
            ]
        )
        print(response)
    except Exception as e:
        print(f"Memory save error: {e}")

    return

@mcp.tool()
def model_training(
        model_name: str,
        epochs: int,
        ctx: Context
    ) -> str:
    """Start model training task."""

    print(ctx.request_context.request.headers)
    mcp_session_id = ctx.request_context.request.headers.get("mcp-session-id", "")
    temp_id_list = mcp_session_id.split("@@@")
    session_id = temp_id_list[0]
    memory_id = temp_id_list[1]
    actor_id = temp_id_list[2]

    asyncio.create_task(_execute_model_training(
            model_name,
            epochs,
            session_id,
            actor_id,
            memory_id
        )
    )
    return f"Model {model_name} training task has been initiated. Total training epochs are {epochs}. The results will be updated once the training is completed."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

The implementation relies on two key components that enable persistence and session management.

The agentcore_memory_client.create_event() method serves as the bridge between tool execution and persistent memory storage. When a background task is completed, this method saves the results directly to the agent’s memory in AgentCore Memory using the specified memory ID, actor ID, and session ID. Unlike traditional approaches where results might be stored temporarily or require manual retrieval, this integration enables task outcomes to become permanent parts of the agent’s conversational memory. The agent can then reference these results in future interactions, creating a continuous knowledge-building experience across multiple sessions.
The second crucial component involves extracting session context through ctx.request_context.request.headers.get(“mcp-session-id”, “”). The “Mcp-Session-Id” is part of standard MCP protocol. You can use this header to pass a composite identifier containing three essential pieces of information in a delimited format: session_id@@@memory_id@@@actor_id. This approach allows our implementation to retrieve the necessary context identifiers from a single header value. Headers are used instead of environment variables by necessity—these identifiers change dynamically with each conversation, whereas environment variables remain static from container startup. This design choice is particularly important in multi-tenant scenarios where a single MCP server simultaneously handles requests from multiple users, each with their own distinct session context.
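Because all three identifiers travel in one delimited value, small helper functions keep the packing and unpacking symmetric on both sides. This is our own convenience sketch (the `@@@` delimiter is the one used in this post; the helpers are not part of any SDK):

```python
DELIMITER = "@@@"

def build_mcp_session_header(session_id: str, memory_id: str, actor_id: str) -> str:
    # Pack the three identifiers into a single Mcp-Session-Id value
    return DELIMITER.join([session_id, memory_id, actor_id])

def parse_mcp_session_header(header: str) -> tuple:
    # Unpack the composite value, failing loudly on a malformed header
    parts = header.split(DELIMITER)
    if len(parts) != 3:
        raise ValueError(f"Malformed Mcp-Session-Id header: {header!r}")
    session_id, memory_id, actor_id = parts
    return session_id, memory_id, actor_id

header = build_mcp_session_header("sess-123", "mem-456", "actor-789")
print(parse_mcp_session_header(header))  # ('sess-123', 'mem-456', 'actor-789')
```

Validating the header on the server side (as the parse helper does) also gives a clearer error than an IndexError when a client forgets to set the composite value.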

Another important aspect in this example involves proper message formatting when storing events. Each message saved to AgentCore Memory requires two components: the content and a role identifier. These two components need to be formatted in a way that the agent framework can recognize. Here is an example for the Strands Agents framework:

messages=[
    (
        json.dumps({
            "message": {
                "role": "user",
                "content": [
                    {
                        "text": <message to the memory>
                    }
                ]
            },
            "message_id": 0
        }),
        'USER'
    )
]

The content is an inner JSON object (serialized with json.dumps()) that contains the message details, including role, text content, and message ID. The outer role identifier (USER in this example) helps AgentCore Memory categorize the message source.
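A small helper (hypothetical, simply following the format above) can centralize this nesting so tool code doesn’t repeat the json.dumps boilerplate; here the outer role is derived from the inner role by uppercasing, which matches the user/USER pairing in this example:

```python
import json
from typing import Tuple

def build_memory_event_message(text: str, role: str = "user",
                               message_id: int = 0) -> Tuple[str, str]:
    # Inner payload: the Strands-style message envelope, serialized to JSON
    payload = {
        "message": {
            "role": role,
            "content": [{"text": text}],
        },
        "message_id": message_id,
    }
    # Outer role identifier tells AgentCore Memory how to categorize the event
    return (json.dumps(payload), role.upper())

serialized, outer_role = build_memory_event_message("Training completed.")
print(outer_role)  # USER
print(json.loads(serialized)["message"]["content"][0]["text"])  # Training completed.
```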
Strands Agents implementation
Integrating Amazon Bedrock AgentCore Memory with Strands Agents is remarkably straightforward using the AgentCoreMemorySessionManager class from the Bedrock AgentCore SDK. As shown in the following code example, implementation requires minimal configuration—create an AgentCoreMemoryConfig with your session identifiers, initialize the session manager with this config, and pass it directly to your agent constructor. The session manager transparently handles the memory operations behind the scenes, maintaining conversation history and context across interactions while organizing memories using the combination of session_id, memory_id, and actor_id. For more information, refer to AgentCore Memory Session Manager.

from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager

@app.entrypoint
async def strands_agent_main(payload, context):

    session_id = context.session_id
    if not session_id:
        session_id = str(uuid.uuid4())
    print(f"Session ID: {session_id}")

    memory_id = payload.get("memory_id")
    if not memory_id:
        memory_id = ""
    print(f"Memory ID: {memory_id}")

    actor_id = payload.get("actor_id")
    if not actor_id:
        actor_id = "default"

    agentcore_memory_config = AgentCoreMemoryConfig(
        memory_id=memory_id,
        session_id=session_id,
        actor_id=actor_id
    )

    session_manager = AgentCoreMemorySessionManager(
        agentcore_memory_config=agentcore_memory_config
    )

    user_input = payload.get("prompt")

    headers = {
        "authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json",
        "Mcp-Session-Id": session_id + "@@@" + memory_id + "@@@" + actor_id
    }

    # Connect to the MCP server using Streamable HTTP transport
    streamable_http_mcp_client = MCPClient(
        lambda: streamablehttp_client(
                mcp_url,
                headers,
                timeout=30
            )
        )

    with streamable_http_mcp_client:
        # Get the tools from the MCP server
        tools = streamable_http_mcp_client.list_tools_sync()

        # Create an agent with these tools
        agent = Agent(
            tools=tools,
            callback_handler=call_back_handler,
            session_manager=session_manager
        )

The session context management is particularly elegant here. The agent receives session identifiers through the payload and context parameters supplied by AgentCore Runtime. These identifiers form a crucial contextual bridge that connects user interactions across multiple sessions. The session_id can be extracted from the context object (generating a new one if needed), and the memory_id and actor_id can be retrieved from the payload. These identifiers are then packaged into a custom HTTP header (Mcp-Session-Id) that’s passed to the MCP server during connection establishment.
To maintain this persistent experience across multiple interactions, clients must consistently provide the same identifiers when invoking the agent:

# invoke agentcore through boto3
boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps(
            {
                "prompt": user_input,
                "actor_id": actor_id,
                "memory_id": memory_id
            }
        ),
    runtimeSessionId=session_id,
)

By consistently providing the same memory_id, actor_id, and runtimeSessionId across invocations, users can create a continuous conversational experience where task results persist independently of session boundaries. When a user returns days later, the agent can automatically retrieve both conversation history and the task results that were completed during their absence.
This architecture represents a significant advancement in AI agent capabilities—transforming long-running operations from fragile, connection-dependent processes into robust, persistent tasks that continue working regardless of connection state. The result is a system that can deliver truly asynchronous AI assistance, where complex work continues in the background and results are seamlessly integrated whenever the user returns to the conversation.
Conclusion
In this post, we’ve explored practical ways to help AI agents handle tasks that take minutes or even hours to complete. Whether using the more straightforward approach of keeping connections alive or the more advanced method of injecting task results into the agent’s memory, these techniques enable your AI agent to tackle valuable, complex work without frustrating time limits or lost results.
We invite you to try these approaches in your own AI agent projects. Start with context messaging for moderate tasks, then move to async management as your needs grow. The solutions we’ve shared can be quickly adapted to your specific needs, helping you build AI that delivers results reliably—even when users disconnect and return days later. What long-running tasks could your AI assistants handle better with these techniques?
To learn more, see the Amazon Bedrock AgentCore documentation and explore our sample notebook.

About the Authors
Haochen Xie is a Senior Data Scientist at AWS Generative AI Innovation Center. He is an ordinary person.
Flora Wang is an Applied Scientist at AWS Generative AI Innovation Center, where she works with customers to architect and implement scalable Generative AI solutions that address their unique business challenges. She specializes in model customization techniques and agent-based AI systems, helping organizations harness the full potential of generative AI technology.
Yuan Tian is an Applied Scientist at the AWS Generative AI Innovation Center, where he works with customers across diverse industries—including healthcare, life sciences, finance, and energy—to architect and implement generative AI solutions such as agentic systems. He brings a unique interdisciplinary perspective, combining expertise in machine learning with computational biology.
Hari Prasanna Das is an Applied Scientist at the AWS Generative AI Innovation Center, where he works with AWS customers across different verticals to expedite their use of Generative AI. Hari holds a PhD in Electrical Engineering and Computer Sciences from the University of California, Berkeley. His research interests include Generative AI, Deep Learning, Computer Vision, and Data-Efficient Machine Learning.

How to Build an Atomic-Agents RAG Pipeline with Typed Schemas, Dynamic …

In this tutorial, we build an advanced, end-to-end learning pipeline around Atomic-Agents by wiring together typed agent interfaces, structured prompting, and a compact retrieval layer that grounds outputs in real project documentation. Also, we demonstrate how to plan retrieval, retrieve relevant context, inject it dynamically into an answering agent, and run an interactive loop that turns the setup into a reusable research assistant for any new Atomic Agents question. Check out the FULL CODES here.

import os, sys, textwrap, time, json, re
from typing import List, Optional, Dict, Tuple
from dataclasses import dataclass
import subprocess

subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                       "atomic-agents", "instructor", "openai", "pydantic",
                       "requests", "beautifulsoup4", "scikit-learn"])

from getpass import getpass
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()

MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")

from pydantic import Field
from openai import OpenAI
import instructor
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator, ChatHistory, BaseDynamicContextProvider
import requests
from bs4 import BeautifulSoup

We install all required packages, import the core Atomic-Agents primitives, and set up Colab-compatible dependencies in one place. We securely capture the OpenAI API key from the keyboard and store it in the environment so downstream code never hardcodes secrets. We also lock in a default model name while keeping it configurable via an environment variable.

def fetch_url_text(url: str, timeout: int = 20) -> str:
    r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    for tag in soup(["script", "style", "nav", "header", "footer", "noscript"]):
        tag.decompose()
    text = soup.get_text("\n")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    return text

def chunk_text(text: str, max_chars: int = 1400, overlap: int = 200) -> List[str]:
    if not text:
        return []
    chunks = []
    i = 0
    while i < len(text):
        chunk = text[i:i + max_chars].strip()
        if chunk:
            chunks.append(chunk)
        i += max_chars - overlap
    return chunks

def clamp(s: str, n: int = 800) -> str:
    s = (s or "").strip()
    return s if len(s) <= n else s[:n].rstrip() + "..."

We fetch web pages from the Atomic Agents repo and docs, then clean them into plain text so retrieval becomes reliable. We chunk long documents into overlapping segments, preserving context while keeping each chunk small enough for ranking and citation. We also add a small helper to clamp long snippets so our injected context stays readable.

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

@dataclass
class Snippet:
    doc_id: str
    url: str
    chunk_id: int
    text: str
    score: float

class MiniCorpusRetriever:
    def __init__(self, docs: Dict[str, Tuple[str, str]]):
        self.items: List[Tuple[str, str, int, str]] = []
        for doc_id, (url, raw) in docs.items():
            for idx, ch in enumerate(chunk_text(raw)):
                self.items.append((doc_id, url, idx, ch))
        if not self.items:
            raise RuntimeError("No documents were fetched; cannot build TF-IDF index.")
        self.vectorizer = TfidfVectorizer(stop_words="english", max_features=50000)
        self.matrix = self.vectorizer.fit_transform([it[3] for it in self.items])

    def search(self, query: str, k: int = 6) -> List[Snippet]:
        qv = self.vectorizer.transform([query])
        sims = cosine_similarity(qv, self.matrix).ravel()
        top = sims.argsort()[::-1][:k]
        out = []
        for j in top:
            doc_id, url, chunk_id, txt = self.items[j]
            out.append(Snippet(doc_id=doc_id, url=url, chunk_id=chunk_id, text=txt, score=float(sims[j])))
        return out

class RetrievedContextProvider(BaseDynamicContextProvider):
    def __init__(self, title: str, snippets: List[Snippet]):
        super().__init__(title=title)
        self.snippets = snippets

    def get_info(self) -> str:
        blocks = []
        for s in self.snippets:
            blocks.append(
                f"[{s.doc_id}#{s.chunk_id}] (score={s.score:.3f}) {s.url}\n{clamp(s.text, 900)}"
            )
        return "\n\n".join(blocks)

We build a mini retrieval system using TF-IDF and cosine similarity over the chunked documentation corpus. We wrap each retrieved chunk in a structured Snippet object to track doc IDs, chunk IDs, and citation scores. We then inject top-ranked chunks into the agent’s runtime via a dynamic context provider, keeping the answering agent grounded.

class PlanInput(BaseIOSchema):
    """Input schema for the planner agent: describes the user's task and how many retrieval queries to draft."""
    task: str = Field(...)
    num_queries: int = Field(4)

class PlanOutput(BaseIOSchema):
    """Output schema from the planner agent: retrieval queries, coverage checklist, and safety checks."""
    queries: List[str]
    must_cover: List[str]
    safety_checks: List[str]

class AnswerInput(BaseIOSchema):
    """Input schema for the answering agent: user question plus style constraints."""
    question: str
    style: str = "concise but advanced"

class AnswerOutput(BaseIOSchema):
    """Output schema for the answering agent: grounded answer, next steps, and which citations were used."""
    answer: str
    next_steps: List[str]
    used_citations: List[str]

client = instructor.from_openai(OpenAI(api_key=os.environ["OPENAI_API_KEY"]))

planner_prompt = SystemPromptGenerator(
    background=[
        "You are a rigorous research planner for a small RAG system.",
        "You propose retrieval queries that are diverse (lexical + semantic) and designed to find authoritative info.",
        "You do NOT answer the task; you only plan retrieval."
    ],
    steps=[
        "Read the task.",
        "Propose diverse retrieval queries (not too long).",
        "List must-cover aspects and safety checks."
    ],
    output_instructions=[
        "Return strictly the PlanOutput schema.",
        "Queries must be directly usable as search strings.",
        "Must-cover should be 4-8 bullets."
    ]
)

planner = AtomicAgent[PlanInput, PlanOutput](
    config=AgentConfig(
        client=client,
        model=MODEL,
        system_prompt_generator=planner_prompt,
        history=ChatHistory(),
    )
)

answerer_prompt = SystemPromptGenerator(
    background=[
        "You are an expert technical tutor for Atomic Agents (atomic-agents).",
        "You are given retrieved context snippets with IDs like [doc#chunk].",
        "You must ground claims in the provided snippets and cite them inline."
    ],
    steps=[
        "Read the question and the provided context.",
        "Synthesize an accurate answer using only supported facts.",
        "Cite claims inline using the provided snippet IDs."
    ],
    output_instructions=[
        "Use inline citations like [readme#12] or [docs_home#3].",
        "If the context does not support something, say so briefly and suggest what to retrieve next.",
        "Return strictly the AnswerOutput schema."
    ]
)

answerer = AtomicAgent[AnswerInput, AnswerOutput](
    config=AgentConfig(
        client=client,
        model=MODEL,
        system_prompt_generator=answerer_prompt,
        history=ChatHistory(),
    )
)

We define strict-typed schemas for planner and answerer inputs and outputs, and include docstrings to satisfy Atomic Agents’ schema requirements. We create an Instructor-wrapped OpenAI client and configure two Atomic Agents with explicit system prompts and chat history. We enforce structured outputs so the planner produces queries and the answerer produces a cited response with clear next steps.

SOURCES = {
    "readme": "https://github.com/BrainBlend-AI/atomic-agents",
    "docs_home": "https://brainblend-ai.github.io/atomic-agents/",
    "examples_index": "https://brainblend-ai.github.io/atomic-agents/examples/index.html",
}

raw_docs: Dict[str, Tuple[str, str]] = {}
for doc_id, url in SOURCES.items():
    try:
        raw_docs[doc_id] = (url, fetch_url_text(url))
    except Exception:
        raw_docs[doc_id] = (url, "")

non_empty = [d for d in raw_docs.values() if d[1].strip()]
if not non_empty:
    raise RuntimeError("All source fetches failed or were empty. Check network access in Colab and retry.")

retriever = MiniCorpusRetriever(raw_docs)

def run_atomic_rag(question: str, k: int = 7, verbose: bool = True) -> AnswerOutput:
    t0 = time.time()
    plan = planner.run(PlanInput(task=question, num_queries=4))
    all_snips: List[Snippet] = []
    for q in plan.queries:
        all_snips.extend(retriever.search(q, k=max(2, k // 2)))
    # Deduplicate chunks, keeping the best score per (doc, chunk) pair
    best: Dict[Tuple[str, int], Snippet] = {}
    for s in all_snips:
        key = (s.doc_id, s.chunk_id)
        if (key not in best) or (s.score > best[key].score):
            best[key] = s
    snips = sorted(best.values(), key=lambda x: x.score, reverse=True)[:k]
    ctx = RetrievedContextProvider(title="Retrieved Atomic Agents Context", snippets=snips)
    answerer.register_context_provider("retrieved_context", ctx)
    out = answerer.run(AnswerInput(question=question, style="concise, advanced, practical"))
    if verbose:
        print(out.answer)
    return out

demo_q = "Teach me Atomic Agents at an advanced level: explain the core building blocks and show how to chain agents with typed schemas and dynamic context."
run_atomic_rag(demo_q, k=7, verbose=True)

while True:
    user_q = input("\nYour question> ").strip()
    if not user_q or user_q.lower() in {"exit", "quit"}:
        break
    run_atomic_rag(user_q, k=7, verbose=True)

We fetch a small set of authoritative Atomic Agents sources and build a local retrieval index from them. We implement a full pipeline function that plans queries, retrieves relevant context, injects it, and produces a grounded final answer. We finish by running a demo query and launching an interactive loop so we can keep asking questions and getting cited answers.

In conclusion, we completed the Atomic-Agents workflow in Colab, cleanly separating planning, retrieval, and answering while enforcing strong typing throughout. We kept the system grounded by injecting only the highest-signal documentation chunks as dynamic context, and we enforced a citation discipline that makes outputs auditable. From here, we can scale this pattern by adding more sources, swapping in stronger retrievers or rerankers, introducing tool-use agents, and turning the pipeline into a production-grade research assistant that remains both fast and trustworthy.

The post How to Build an Atomic-Agents RAG Pipeline with Typed Schemas, Dynamic Context Injection, and Agent Chaining appeared first on MarkTechPost.

NVIDIA Researchers Introduce KVTC Transform Coding Pipeline to Compress Key-Value Caches by 20x for Efficient LLM Serving

Serving Large Language Models (LLMs) at scale is a massive engineering challenge because of Key-Value (KV) cache management. As models grow in size and reasoning capability, the KV cache footprint increases and becomes a major bottleneck for throughput and latency. For modern Transformers, this cache can occupy multiple gigabytes.

NVIDIA researchers have introduced KVTC (KV Cache Transform Coding). This lightweight transform coder compresses KV caches for compact on-GPU and off-GPU storage. It achieves up to 20x compression while maintaining reasoning and long-context accuracy. For specific use cases, it can reach 40x or higher.

https://arxiv.org/pdf/2511.01815

The Memory Dilemma in LLM Inference

In production, inference frameworks treat local KV caches like databases. Strategies like prefix sharing promote the reuse of caches to speed up responses. However, stale caches consume scarce GPU memory. Developers currently face a difficult choice:

Keep the cache: Occupies memory needed for other users.

Discard the cache: Incurs the high cost of recomputation.

Offload the cache: Moves data to CPU DRAM or SSDs, leading to transfer overheads.

KVTC largely mitigates this dilemma by lowering the cost of on-chip retention and reducing the bandwidth required for offloading.


How the KVTC Pipeline Works

The method is inspired by classical media compression. It applies a learned orthonormal transform, followed by adaptive quantization and entropy coding.

1. Feature Decorrelation (PCA)

Different attention heads often show similar patterns and a high degree of correlation. KVTC uses Principal Component Analysis (PCA) to linearly decorrelate features. Unlike other methods that calculate a separate decomposition for every prompt, KVTC computes the PCA basis matrix V once on a calibration dataset. This matrix is then reused for all future caches at inference time.
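As a rough illustration of this one-time calibration step, the sketch below fits a shared orthonormal basis with plain NumPy and reuses it for new caches. The shapes, function names, and the flattening of per-token KV features into rows are assumptions for illustration, not the paper's exact procedure.

```python
import numpy as np

def fit_pca_basis(calib_kv: np.ndarray):
    """Fit an orthonormal PCA basis V once on calibration KV vectors
    (rows = tokens, columns = flattened per-token KV features)."""
    mean = calib_kv.mean(axis=0)
    centered = calib_kv - mean
    # SVD of the centered calibration data yields the principal directions
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return mean, vt.T  # V holds principal components as columns

def decorrelate(kv: np.ndarray, mean: np.ndarray, v: np.ndarray) -> np.ndarray:
    # At inference time, project new cache entries onto the fixed basis
    return (kv - mean) @ v

# Calibrate once, then reuse the same basis for every future cache
rng = np.random.default_rng(0)
calib = rng.normal(size=(256, 64))
mean, v = fit_pca_basis(calib)
coeffs = decorrelate(rng.normal(size=(10, 64)), mean, v)
```

Because the basis is computed once offline, the per-request cost at inference time is a single matrix multiply rather than a fresh decomposition.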

2. Adaptive Quantization

The system exploits the PCA ordering to allocate a fixed bit budget across coordinates. High-variance components receive more bits, while others receive fewer. KVTC uses a dynamic programming (DP) algorithm to find the optimal bit allocation that minimizes reconstruction error. Crucially, the DP often assigns 0 bits to trailing principal components, allowing for early dimensionality reduction and faster performance.
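The bit-allocation idea can be sketched with a toy dynamic program over coordinates and remaining budget. The cost model `v * 4**(-b)` (quantization error shrinking with each extra bit) and the budget handling below are illustrative assumptions, not the paper's exact formulation.

```python
def allocate_bits(variances, total_bits, max_bits=8):
    """Toy DP bit allocation: minimize sum_i v_i * 4**(-b_i)
    subject to sum_i b_i == total_bits and 0 <= b_i <= max_bits."""
    INF = float("inf")
    # best[u] = minimal error using exactly u bits over coordinates so far
    best = [0.0] + [INF] * total_bits
    choice = []
    for v in variances:
        new = [INF] * (total_bits + 1)
        pick = [0] * (total_bits + 1)
        for used in range(total_bits + 1):
            if best[used] == INF:
                continue
            for b in range(min(max_bits, total_bits - used) + 1):
                cost = best[used] + v * 4.0 ** (-b)
                if cost < new[used + b]:
                    new[used + b] = cost
                    pick[used + b] = b
        best = new
        choice.append(pick)
    # Backtrack from the full budget to recover per-coordinate bits
    bits, used = [], total_bits
    for pick in reversed(choice):
        b = pick[used]
        bits.append(b)
        used -= b
    bits.reverse()
    return bits

# High-variance leading components receive more bits; trailing ones may get 0
bits = allocate_bits([16.0, 4.0, 1.0, 0.25], total_bits=8)
```

With a decreasing variance profile, the DP naturally assigns the most bits to the first components and can zero out the tail, which is the behavior the article describes.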

3. Entropy Coding

The quantized symbols are packed and compressed using the DEFLATE algorithm. To maintain speed, KVTC leverages the nvCOMP library, which enables parallel compression and decompression directly on the GPU.
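A minimal CPU sketch of this stage uses Python's zlib as a stand-in for nvCOMP's GPU DEFLATE; the skewed symbol distribution here is simulated, but it mirrors why entropy coding pays off on quantized coefficients.

```python
import zlib
import numpy as np

# Quantized symbols from the earlier stages are small integers with a
# skewed distribution, which DEFLATE exploits well
rng = np.random.default_rng(1)
symbols = rng.geometric(p=0.5, size=10_000).astype(np.uint8)

packed = symbols.tobytes()
compressed = zlib.compress(packed, level=6)  # CPU stand-in for GPU DEFLATE
restored = np.frombuffer(zlib.decompress(compressed), dtype=np.uint8)

ratio = len(packed) / len(compressed)
```

The round trip is lossless, so any accuracy impact comes from the quantization stage, not from entropy coding.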

Protecting Critical Tokens

Not all tokens are compressed equally. KVTC avoids compressing two specific types of tokens because they contribute disproportionately to attention accuracy:

Attention Sinks: The 4 oldest tokens in the sequence.

Sliding Window: The 128 most recent tokens.

Ablation studies show that compressing these specific tokens can significantly lower or even collapse accuracy at high compression ratios.
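A small helper makes the protected ranges concrete; the partitioning logic below is a sketch of the described policy (4 sink tokens, 128-token sliding window), not NVIDIA's implementation.

```python
def split_for_compression(seq_len, sink=4, window=128):
    """Partition token indices into (protected head, compressible middle,
    protected tail): keep the oldest `sink` tokens and the most recent
    `window` tokens uncompressed, compress everything in between."""
    head_end = min(sink, seq_len)
    tail_start = max(head_end, seq_len - window)
    protected_head = range(0, head_end)
    compressible = range(head_end, tail_start)
    protected_tail = range(tail_start, seq_len)
    return protected_head, compressible, protected_tail

head, mid, tail = split_for_compression(4096)
```

For short sequences the compressible middle collapses to empty, so nothing is compressed until the context outgrows the protected regions.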

Benchmarks and Efficiency

The research team tested KVTC with models like Llama-3.1, Mistral-NeMo, and R1-Qwen-2.5.

Accuracy: At 16x compression (roughly 20x after DEFLATE), the model consistently maintains results within 1 score point of vanilla models.

TTFT Reduction: For an 8K context length, KVTC can reduce Time-To-First-Token (TTFT) by up to 8x compared to full recomputation.

Speed: Calibration is fast; for a 12B model, it can be completed within 10 minutes on an NVIDIA H100 GPU.

Storage Overhead: The extra data stored per model is small, representing only 2.4% of model parameters for Llama-3.3-70B.

KVTC is a practical building block for memory-efficient LLM serving. It does not modify model weights and is directly compatible with other token eviction methods.


Key Takeaways

High Compression with Low Accuracy Loss: KVTC achieves a standard 20x compression ratio while maintaining results within 1 score point of vanilla (uncompressed) models across most reasoning and long-context benchmarks.

Transform Coding Pipeline: The method utilizes a pipeline inspired by classical media compression, combining PCA-based feature decorrelation, adaptive quantization via dynamic programming, and lossless entropy coding (DEFLATE).

Critical Token Protection: To maintain model performance, KVTC avoids compressing the 4 oldest ‘attention sink’ tokens and a ‘sliding window’ of the 128 most recent tokens.

Operational Efficiency: The system is ‘tuning-free,’ requiring only a brief initial calibration (under 10 minutes for a 12B model) that leaves model parameters unchanged and adds minimal storage overhead—only 2.4% for a 70B model.

Significant Latency Reduction: By reducing the volume of data stored and transferred, KVTC can reduce Time-To-First-Token (TTFT) by up to 8x compared to the full recomputation of KV caches for long contexts.

The post NVIDIA Researchers Introduce KVTC Transform Coding Pipeline to Compress Key-Value Caches by 20x for Efficient LLM Serving appeared first on MarkTechPost.

NVIDIA Nemotron 3 Nano 30B MoE model is now available in Amazon SageMaker JumpStart

Today we’re excited to announce that the NVIDIA Nemotron 3 Nano 30B model with 3B active parameters is now generally available in the Amazon SageMaker JumpStart model catalog. You can accelerate innovation and deliver tangible business value with Nemotron 3 Nano on Amazon Web Services (AWS) without having to manage model deployment complexities. You can power your generative AI applications with Nemotron using the managed deployment capabilities offered by SageMaker JumpStart.
Nemotron 3 Nano is a small hybrid mixture of experts (MoE) language model that delivers high compute efficiency and accuracy, so developers can drive highly skilled agentic tasks at scale. The model is fully open with open weights, datasets, and recipes, so developers can seamlessly customize, optimize, and deploy the model on their infrastructure to help meet their privacy and security requirements. Nemotron 3 Nano excels in coding and reasoning, and leads on benchmarks such as SWE Bench Verified, GPQA Diamond, AIME 2025, Arena Hard v2, and IFBench.
About Nemotron 3 Nano 30B
Nemotron 3 Nano is differentiated from other models by its architecture and accuracy, boasting strong performance in a variety of highly technical skills:

Architecture:

MoE with hybrid Transformer-Mamba architecture
Supports token budget for providing optimal accuracy with minimum reasoning token generation

Accuracy:

Leading accuracy on coding, scientific reasoning, math, and instruction following
Leads on benchmarks such as LiveCodeBench, GPQA Diamond, AIME 2025, BFCL, and IFBench (compared to other open language models under 30B)

Usability:

30B parameter model with 3 billion active parameters
Has a context window of up to 1 million tokens
Text-based foundation model, using text for both inputs and outputs

Prerequisites
To get started with Nemotron 3 Nano in Amazon SageMaker JumpStart, you must have a provisioned Amazon SageMaker Studio domain.
Get started with NVIDIA Nemotron 3 Nano 30B in SageMaker JumpStart
To test the Nemotron 3 Nano model in SageMaker JumpStart, open SageMaker Studio and choose Models in the navigation pane. Search for NVIDIA in the search bar and choose NVIDIA Nemotron 3 Nano 30B as the model.

On the model details page, choose Deploy and follow the prompts to deploy the model.
After the model is deployed to a SageMaker AI endpoint, you can test it. You can access the model using the following AWS Command Line Interface (AWS CLI) code examples. You can use nvidia/nemotron-3-nano as the model ID.

cat > input.json << EOF
{
  "model": "${MODEL_ID}",
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "What is NVIDIA? Answer in 2-3 sentences."
    }
  ],
  "max_tokens": 512,
  "temperature": 0.2,
  "stream": false,
  "chat_template_kwargs": {"enable_thinking": false}
}
EOF

Set stream to true for streaming responses, and set enable_thinking to true to enable reasoning mode.

aws sagemaker-runtime invoke-endpoint \
  --endpoint-name ${ENDPOINT_NAME} \
  --region ${AWS_REGION} \
  --content-type 'application/json' \
  --body fileb://input.json \
  response.json

Alternatively, you can access the model using SageMaker SDK and Boto3 code. The following Python code examples show how to send a text message to the NVIDIA Nemotron 3 Nano 30B using the SageMaker SDK. For additional code examples, refer to the NVIDIA GitHub repo.

import json
import boto3

runtime_client = boto3.client('sagemaker-runtime', region_name=region)
payload = {
    "messages": [
        {"role": "user", "content": prompt}
    ],
    "max_tokens": 1000
}

try:
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=json.dumps(payload)
    )

    response_body = response['Body'].read().decode('utf-8')
    raw_response = json.loads(response_body)

    # Parse the response using a custom parser
    parsed = parse_response(raw_response)

except Exception as e:
    raise Exception(
        f"Failed to invoke endpoint '{endpoint_name}': {str(e)}. "
        f"Check that the endpoint is InService and you have least-privileged IAM permissions assigned."
    )

Now available
NVIDIA Nemotron 3 Nano is now available fully managed in SageMaker JumpStart. Refer to the model package for AWS Region availability. To learn more, check out the Nemotron Nano model page, the NVIDIA GitHub sample notebook for Nemotron 3 Nano 30B, and the Amazon SageMaker JumpStart pricing page.
Try the Nemotron 3 Nano model in Amazon SageMaker JumpStart today and send feedback to AWS re:Post for SageMaker JumpStart or through your usual AWS Support contacts.

About the authors
Dan Ferguson is a Solutions Architect at AWS, based in New York, USA. As a machine learning services expert, Dan works to support customers on their journey to integrating ML workflows efficiently, effectively, and sustainably.
Pooja Karadgi leads product and strategic partnerships for Amazon SageMaker JumpStart, the machine learning and generative AI hub within SageMaker. She is dedicated to accelerating customer AI adoption by simplifying foundation model discovery and deployment, enabling customers to build production-ready generative AI applications across the entire model lifecycle – from onboarding and customization to deployment.
Benjamin Crabtree is a Senior Software Engineer on the Amazon SageMaker AI team, specializing in delivering the “last mile” experience to customers. He is passionate about democratizing the latest artificial intelligence breakthroughs by offering easy to use capabilities. Also, Ben is highly experienced in building machine learning infrastructure at scale.
Timothy Ma is a Principal Specialist in generative AI at AWS, where he collaborates with customers to design and deploy cutting-edge machine learning solutions. He also leads go-to-market strategies for generative AI services, helping organizations harness the potential of advanced AI technologies.
Abdullahi Olaoye is a Senior AI Solutions Architect at NVIDIA, specializing in integrating NVIDIA AI libraries, frameworks, and products with cloud AI services and open-source tools to optimize AI model deployment, inference, and generative AI workflows. He collaborates with AWS to enhance AI workload performance and drive adoption of NVIDIA-powered AI and generative AI solutions.
Nirmal Kumar Juluru is a product marketing manager at NVIDIA driving the adoption of AI software, models, and APIs in the NVIDIA NGC Catalog and NVIDIA AI Foundation models and endpoints. He previously worked as a software developer. Nirmal holds an MBA from Carnegie Mellon University and a bachelors in computer science from BITS Pilani.
Vivian Chen is a Deep Learning Solutions Architect at NVIDIA, where she helps teams bridge the gap between complex AI research and real-world performance. Specializing in inference optimization and cloud-integrated AI solutions, Vivian focuses on turning the heavy lifting of machine learning into fast, scalable applications. She is passionate about helping clients navigate NVIDIA’s accelerated computing stack to ensure their models don’t just work in the lab, but thrive in production.

Mastering Amazon Bedrock throttling and service availability: A compre …

In production generative AI applications, requests fail from time to time, and the most common failures are 429 ThrottlingException and 503 ServiceUnavailableException errors. In a business application, these errors can originate from multiple layers of the architecture.
Most of these errors are retriable, but retries delay responses and degrade the user experience. In interactive AI applications, delays can disrupt a conversation's natural flow, reduce user interest, and ultimately hinder the widespread adoption of AI-powered solutions.
One of the most common challenges is many users converging on a single model at the same time in widely used applications. Mastering these errors means the difference between a resilient application and frustrated users.
This post shows you how to implement robust error handling strategies that can help improve application reliability and user experience when using Amazon Bedrock. We dive deep into strategies for keeping your application performant in the face of these errors. Whether your application is fairly new or mature, this post gives you practical guidelines for operating through them.
Prerequisites

AWS account with Amazon Bedrock access
Python 3.x and boto3 installed
Basic understanding of AWS services
IAM Permissions: Ensure you have the following minimum permissions:

bedrock:InvokeModel or bedrock:InvokeModelWithResponseStream for your specific models
cloudwatch:PutMetricData, cloudwatch:PutMetricAlarm for monitoring
sns:Publish if using SNS notifications
Follow the principle of least privilege – grant only the permissions needed for your use case

Example IAM policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "bedrock:InvokeModel"
      ],
      "Resource": "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-*"
    }
  ]
}
Note: This walkthrough uses AWS services that may incur charges, including Amazon CloudWatch for monitoring and Amazon SNS for notifications. See AWS pricing pages for details.
Quick Reference: 503 vs 429 Errors
The following table compares these two error types:

Aspect | 503 ServiceUnavailable | 429 ThrottlingException
Primary Cause | Temporary service capacity issues, server failures | Exceeded account quotas (RPM/TPM)
Quota Related | Not quota-related | Directly quota-related
Resolution Time | Transient, refreshes faster | Requires waiting for quota refresh
Retry Strategy | Immediate retry with exponential backoff | Must sync with 60-second quota cycle
User Action | Wait and retry, consider alternatives | Optimize request patterns, increase quotas
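The distinction in the table can be encoded as a small dispatch helper that a retry layer consults before deciding how to wait. The policy fields below are illustrative names, not an AWS API; real handlers should inspect botocore's ClientError response metadata.

```python
def classify_bedrock_error(status_code, error_code):
    """Map the two error families from the comparison table to a retry policy.
    (Simplified sketch; field names like align_to_quota_window are invented.)"""
    if status_code == 429 or error_code == "ThrottlingException":
        # Quota-related: schedule retries around the 60-second quota window
        return {"retry": True, "align_to_quota_window": True, "base_delay": 60}
    if status_code == 503 or error_code == "ServiceUnavailableException":
        # Transient capacity issue: retry promptly with exponential backoff
        return {"retry": True, "align_to_quota_window": False, "base_delay": 1}
    # Anything else (validation errors, access denied) should not be retried
    return {"retry": False}
```

Keeping this mapping in one place means the same policy applies consistently across every call site.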

Deep dive into 429 ThrottlingException
A 429 ThrottlingException means Amazon Bedrock is deliberately rejecting some of your requests to keep overall usage within the quotas you have configured or that are assigned by default. In practice, you will most often see three flavors of throttling: rate-based, token-based, and model-specific.
1. Rate-Based Throttling (RPM – Requests Per Minute)
Error Message:
ThrottlingException: Too many requests, please wait before trying again.
Or:
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the InvokeModel operation: Too many requests, please wait before trying again
What this actually indicates
Rate-based throttling is triggered when the total number of Bedrock requests per minute to a given model and Region crosses the RPM quota for your account. The key detail is that this limit is enforced across all callers, not per individual application or microservice.
Imagine a shared queue at a coffee shop: it does not matter which team is standing in line; the barista can only serve a fixed number of drinks per minute. As soon as more people join the queue than the barista can handle, some customers are told to wait or come back later. That “come back later” message is your 429.
Multi-application spike scenario
Suppose you have three production applications, all calling the same Bedrock model in the same Region:

App A normally peaks around 50 requests per minute.
App B also peaks around 50 rpm.
App C usually runs at about 50 rpm during its own peak.

Ops has requested a quota of 150 RPM for this model, which seems reasonable since 50 + 50 + 50 = 150 and historical dashboards show that each app stays around its expected peak.
However, in reality your traffic is not perfectly flat. Maybe during a flash sale or a marketing campaign, App A briefly spikes to 60 rpm while B and C stay at 50. The combined total for that minute becomes 160 rpm, which is above your 150 rpm quota, and some requests start failing with ThrottlingException.
You can also get into trouble when the three apps shift upward at the same time over longer periods. Imagine a new pattern where peak traffic looks like this:

App A: 75 rpm
App B: 50 rpm
App C: 50 rpm

Your new true peak is 175 rpm even though the original quota was sized for 150. In this situation, you will see 429 errors regularly during those peak windows, even if average daily traffic still looks “fine.”
Mitigation strategies
For rate-based throttling, the mitigation has two sides: client behavior and quota management.
On the client side:

Implement request rate limiting to cap how many calls per second or per minute each application can send. APIs, SDK wrappers, or sidecars like API gateways can enforce per-app budgets so one noisy client does not starve others.
Use exponential backoff with jitter on 429 errors so that retries can become gradually less frequent and are de-synchronized across instances.
Align retry windows with the quota refresh period: because RPM is enforced per 60-second window, retries that happen several seconds into the next minute are more likely to succeed.

On the quota side:

Analyze CloudWatch metrics for each application to determine true peak RPM rather than relying on averages.
Sum those peaks across the apps for the same model/Region, add a safety margin, and request an RPM increase through AWS Service Quotas if needed.

In the previous example, if App A peaks at 75 rpm and B and C peak at 50 rpm, you should plan for at least 175 rpm and realistically target something like 200 rpm to provide room for growth and unexpected bursts.
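The quota-sizing arithmetic above can be captured in a small helper; the `required_rpm_quota` name and the 15 percent safety margin are illustrative choices, not AWS guidance.

```python
import math

def required_rpm_quota(app_peaks_rpm, safety_margin=0.15):
    """Size an RPM quota from observed per-app peaks: sum the true peaks
    for the same model/Region and add headroom for growth and bursts."""
    combined_peak = sum(app_peaks_rpm)
    return math.ceil(combined_peak * (1 + safety_margin))

# The worked example above: App A peaking at 75 rpm, B and C at 50 rpm each
quota = required_rpm_quota([75, 50, 50])
```

Feed this with per-app peak values pulled from CloudWatch rather than averages, since averages hide exactly the spikes that trigger throttling.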
2. Token-Based Throttling (TPM – Tokens Per Minute)
Error message:
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the InvokeModel operation: Too many tokens, please wait before trying again.
Why token limits matter
Even if your request count is modest, a single large prompt or a model that produces long outputs can consume thousands of tokens at once. Token-based throttling occurs when the sum of input and output tokens processed per minute exceeds your account’s TPM quota for that model.
For example, an application that sends 10 requests per minute with 15,000 input tokens and 5,000 output tokens each is consuming roughly 200,000 tokens per minute, which may cross TPM thresholds far sooner than an application that sends 200 tiny prompts per minute.
What this looks like in practice
You may notice that your application runs smoothly under normal workloads, but suddenly starts failing when users paste large documents, upload long transcripts, or run bulk summarization jobs. These are symptoms that token throughput, not request frequency, is the bottleneck.
How to respond
To mitigate token-based throttling:

Monitor token usage by tracking InputTokenCount and OutputTokenCount metrics and logs for your Bedrock invocations.
Implement a token-aware rate limiter that maintains a sliding 60-second window of tokens consumed and only issues a new request if there is enough budget left.
Break large tasks into smaller, sequential chunks so you spread token consumption over multiple minutes instead of exhausting the entire budget in one spike.
Use streaming responses when appropriate; streaming often gives you more control over when to stop generation so you do not produce unnecessarily long outputs.

For consistently high-volume, token-intensive workloads, you should also evaluate requesting higher TPM quotas or using models with larger context windows and better throughput characteristics.
3. Model-Specific Throttling
Error message:
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the InvokeModel operation: Model anthropic.claude-haiku-4-5-20251001-v1:0 is currently overloaded. Please try again later.
What is happening behind the scenes
Model-specific throttling indicates that a particular model endpoint is experiencing heavy demand and is temporarily limiting additional traffic to keep latency and stability under control. In this case, your own quotas might not be the limiting factor; instead, the shared infrastructure for that model is temporarily saturated.
How to respond
One of the most effective approaches here is to design for graceful degradation rather than treating this as a hard failure.

Implement model fallback: define a priority list of compatible models (for example, Sonnet → Haiku) and automatically route traffic to a secondary model if the primary is overloaded.
Combine fallback with cross-Region inference so you can use the same model family in a nearby Region if one Region is temporarily constrained.
Expose fallback behavior in your observability stack so you can know when your system is running in “degraded but functional” mode instead of silently masking problems.

Implementing robust retry and rate limiting
Once you understand the types of throttling, the next step is to encode that knowledge into reusable client-side components.
Exponential backoff with jitter
Here’s a robust retry implementation that uses exponential backoff with jitter. This pattern is essential for handling throttling gracefully:
import time
import random
from botocore.exceptions import ClientError

def bedrock_request_with_retry(bedrock_client, operation, **kwargs):
    """Secure retry implementation with sanitized logging."""
    max_retries = 5
    base_delay = 1
    max_delay = 60

    for attempt in range(max_retries):
        try:
            if operation == 'invoke_model':
                return bedrock_client.invoke_model(**kwargs)
            elif operation == 'converse':
                return bedrock_client.converse(**kwargs)
        except ClientError as e:
            # Security: Log error codes but not request/response bodies,
            # which may contain sensitive customer data
            if e.response['Error']['Code'] == 'ThrottlingException':
                if attempt == max_retries - 1:
                    raise

                # Exponential backoff with jitter
                delay = min(base_delay * (2 ** attempt), max_delay)
                jitter = random.uniform(0, delay * 0.1)
                time.sleep(delay + jitter)
                continue
            else:
                raise
This pattern avoids hammering the service immediately after a throttling event and helps prevent many instances from retrying at the same exact moment.
Token-Aware Rate Limiting
For token-based throttling, the following class maintains a sliding window of token usage and gives your caller a simple yes/no answer on whether it is safe to issue another request:
import time
from collections import deque

class TokenAwareRateLimiter:
    def __init__(self, tpm_limit):
        self.tpm_limit = tpm_limit
        self.token_usage = deque()

    def can_make_request(self, estimated_tokens):
        now = time.time()
        # Remove tokens older than 1 minute
        while self.token_usage and self.token_usage[0][0] < now - 60:
            self.token_usage.popleft()

        current_usage = sum(tokens for _, tokens in self.token_usage)
        return current_usage + estimated_tokens <= self.tpm_limit

    def record_usage(self, tokens_used):
        self.token_usage.append((time.time(), tokens_used))
In practice, you would estimate tokens before sending the request, call can_make_request, and only proceed when it returns True, then call record_usage after receiving the response.
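Putting that flow together, the following minimal sketch repeats the class so it runs standalone; the 4-characters-per-token heuristic and the commented-out converse call are illustrative assumptions, not a Bedrock-provided API:

```python
import time
from collections import deque

class TokenAwareRateLimiter:
    # Same class as above, repeated so this sketch is self-contained.
    def __init__(self, tpm_limit):
        self.tpm_limit = tpm_limit
        self.token_usage = deque()

    def can_make_request(self, estimated_tokens):
        now = time.time()
        while self.token_usage and self.token_usage[0][0] < now - 60:
            self.token_usage.popleft()
        current_usage = sum(tokens for _, tokens in self.token_usage)
        return current_usage + estimated_tokens <= self.tpm_limit

    def record_usage(self, tokens_used):
        self.token_usage.append((time.time(), tokens_used))

def estimate_tokens(prompt: str) -> int:
    # Rough heuristic: about 4 characters per token for English text.
    return max(1, len(prompt) // 4)

limiter = TokenAwareRateLimiter(tpm_limit=10_000)
prompt = "Summarize the quarterly report in three bullet points."
estimated = estimate_tokens(prompt)

# Wait until the sliding window has room before sending the request
while not limiter.can_make_request(estimated):
    time.sleep(1)

# response = bedrock_client.converse(...)  # issue the real request here
limiter.record_usage(estimated)  # ideally record actual usage from the response
```

Recording the actual token counts reported in the response (rather than the estimate) keeps the window accurate over time.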
Understanding 503 ServiceUnavailableException
A 503 ServiceUnavailableException tells you that Amazon Bedrock is temporarily unable to process your request, often due to capacity pressure, networking issues, or exhausted connection pools. Unlike 429, this is not about your quota; it is about the health or availability of the underlying service at that moment.
Connection Pool Exhaustion
What it looks like:
botocore.errorfactory.ServiceUnavailableException: An error occurred (ServiceUnavailableException) when calling the ConverseStream operation (reached max retries: 4): Too many connections, please wait before trying again.
In many real-world scenarios this error is caused not by Bedrock itself, but by how your client is configured:

By default, the boto3 HTTP connection pool size is relatively small (for example, 10 connections), which can be quickly exhausted by highly concurrent workloads.
Creating a new client for every request instead of reusing a single client per process or container can multiply the number of open connections unnecessarily.

To help fix this, share a single Bedrock client instance and increase the connection pool size:
import boto3
from botocore.config import Config

# Security best practice: never hardcode credentials.
# boto3 automatically uses credentials from:
# 1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
# 2. IAM role (recommended for EC2, Lambda, ECS)
# 3. AWS credentials file (~/.aws/credentials)
# 4. IAM roles for service accounts (recommended for EKS)

# Configure a larger connection pool for parallel execution
config = Config(
    max_pool_connections=50,  # Increase from the default of 10
    retries={'max_attempts': 3}
)
bedrock_client = boto3.client('bedrock-runtime', config=config)
This configuration allows more parallel requests through a single, well-tuned client instead of hitting client-side limits.
Temporary Service Resource Issues
What it looks like:
botocore.errorfactory.ServiceUnavailableException: An error occurred (ServiceUnavailableException) when calling the InvokeModel operation: Service temporarily unavailable, please try again.
In this case, the Bedrock service is signaling a transient capacity or infrastructure issue, often affecting on-demand models during demand spikes. Here you should treat the error as a temporary outage and focus on retrying smartly and failing over gracefully:

Use exponential backoff retries, similar to your 429 handling, but with parameters tuned for slower recovery.
Consider using cross-Region inference or different service tiers to help get more predictable capacity envelopes for your most critical workloads.
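One way to express the "slower recovery" tuning is to separate the delay schedule from the retry loop. The parameter values below (longer base delay and higher cap than the 429 handler above) are illustrative assumptions to tune against your own recovery observations, not Bedrock recommendations:

```python
import random

def backoff_delays(max_retries=6, base_delay=5, max_delay=120):
    """Yield sleep durations for 503 retries: exponential backoff with jitter,
    using a longer base delay and cap than typical 429 handling."""
    for attempt in range(max_retries):
        delay = min(base_delay * (2 ** attempt), max_delay)
        # Add up to 10% jitter so many clients do not retry in lockstep
        yield delay + random.uniform(0, delay * 0.1)
```

A caller would iterate over the generator, sleeping for each yielded delay between attempts and raising after the schedule is exhausted.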

Advanced resilience strategies
When you operate mission-critical systems, simple retries are not enough; you also want to avoid making a bad situation worse.
Circuit Breaker Pattern
The circuit breaker pattern helps prevent your application from continuously calling a service that is already failing. Instead, it quickly flips into an “open” state after repeated failures, blocking new requests for a cooling-off period.

CLOSED (Normal): Requests flow normally.
OPEN (Failing): After repeated failures, new requests are rejected immediately, helping reduce pressure on the service and conserve client resources.
HALF_OPEN (Testing): After a timeout, a small number of trial requests are allowed; if they succeed, the circuit closes again.

Why This Matters for Bedrock
When Bedrock returns 503 errors due to capacity issues, continuing to hammer the service with requests only makes things worse. The circuit breaker pattern helps:

Reduce load on the struggling service, helping it recover faster
Fail fast instead of wasting time on requests that will likely fail
Provide automatic recovery by periodically testing if the service is healthy again
Improve user experience by returning errors quickly rather than timing out

The following code implements this:
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
circuit_breaker = CircuitBreaker()

def make_bedrock_request():
    return circuit_breaker.call(bedrock_client.invoke_model, **request_params)
Cross-Region Failover Strategy with CRIS
Amazon Bedrock cross-Region inference (CRIS) helps add another layer of resilience by giving you a managed way to route traffic across Regions.

Global CRIS profiles: Send traffic across AWS commercial Regions, typically offering the best combination of throughput and cost (often around 10% savings).
Geographic CRIS profiles: Confine traffic to specific geographies (for example, US-only, EU-only, or APAC-only) to help satisfy strict data residency or regulatory requirements.

For applications without data residency requirements, global CRIS offers enhanced performance, reliability, and cost efficiency.
From an architecture standpoint:

For non-regulated workloads, using a global profile can significantly improve availability and absorb regional spikes.
For regulated workloads, configure geographic profiles that align with your compliance boundaries, and document those decisions in your governance artifacts.

Bedrock automatically encrypts data in transit using TLS and does not store customer prompts or outputs by default; combine this with CloudTrail logging for compliance posture.
Monitoring and Observability for 429 and 503 Errors
You cannot manage what you cannot see, so robust monitoring is critical when working with quota-driven errors and service availability. Setting up comprehensive Amazon CloudWatch monitoring is essential for proactive error management and maintaining application reliability.
Note: CloudWatch custom metrics, alarms, and dashboards incur charges based on usage. Review CloudWatch pricing for details.
Essential CloudWatch Metrics
Monitor these CloudWatch metrics:

Invocations: Successful model invocations
InvocationClientErrors: 4xx errors including throttling
InvocationServerErrors: 5xx errors including service unavailability
InvocationThrottles: 429 throttling errors
InvocationLatency: Response times
InputTokenCount/OutputTokenCount: Token usage for TPM monitoring

For better insight, create dashboards that:

Separate 429 and 503 into different widgets so you can see whether a spike is quota-related or service-side.
Break down metrics by ModelId and Region to find the specific models or Regions that are problematic.
Show side-by-side comparisons of current traffic vs previous weeks to spot emerging trends before they become incidents.

Critical Alarms
Do not wait until users notice failures before you act. Configure CloudWatch alarms with Amazon SNS notifications based on thresholds such as:
For 429 Errors:

A high number of throttling events in a 5-minute window.
Consecutive periods with non-zero throttle counts, indicating sustained pressure.
Quota utilization above a chosen threshold (for example, 80% of RPM/TPM).

For 503 Errors:

Service success rate falling below your SLO (for example, 95% over 10 minutes).
Sudden spikes in 503 counts correlated with specific Regions or models.
Signs of connection pool saturation on client metrics.

Alarm Configuration Best Practices

Use Amazon Simple Notification Service (Amazon SNS) topics to route alerts to your team’s communication channels (Slack, PagerDuty, email)
Set up different severity levels: Critical (immediate action), Warning (investigate soon), Info (trending issues)
Configure alarm actions to trigger automated responses where appropriate
Include detailed alarm descriptions with troubleshooting steps and runbook links
Test your alarms regularly to make sure notifications are working correctly
Do not include sensitive customer data in alarm messages
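As one concrete example of the 429 thresholds above, the following sketch builds the keyword arguments for CloudWatch's put_metric_alarm. The alarm name, the threshold of 25 throttles per 5-minute window, and the TreatMissingData choice are illustrative assumptions to tune for your own traffic:

```python
def throttle_alarm_definition(topic_arn: str, model_id: str) -> dict:
    """Build kwargs for an InvocationThrottles spike alarm.

    Pass the result to a boto3 CloudWatch client, for example:
    boto3.client("cloudwatch").put_metric_alarm(**throttle_alarm_definition(...))
    """
    return {
        "AlarmName": f"bedrock-throttles-{model_id}",
        "AlarmDescription": "Sustained Bedrock throttling; see throttling runbook",
        "Namespace": "AWS/Bedrock",
        "MetricName": "InvocationThrottles",
        "Dimensions": [{"Name": "ModelId", "Value": model_id}],
        "Statistic": "Sum",
        "Period": 300,             # 5-minute window
        "EvaluationPeriods": 1,
        "Threshold": 25,           # illustrative: 25 throttles in 5 minutes
        "ComparisonOperator": "GreaterThanThreshold",
        "AlarmActions": [topic_arn],        # route to your SNS topic
        "TreatMissingData": "notBreaching", # quiet periods are not failures
    }
```

Keeping the definition as a plain dict makes it easy to version-control alarm settings alongside the runbook links they reference.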

Log Analysis Queries
CloudWatch Logs Insights queries help you move from “we see errors” to “we understand patterns.” Examples include:
Find 429 error patterns:
fields @timestamp, @message
| filter @message like /ThrottlingException/
| stats count() by bin(5m)
| sort @timestamp desc
Analyze 503 error correlation with request volume:
fields @timestamp, @message
| filter @message like /ServiceUnavailableException/
| stats count() as error_count by bin(1m)
| sort @timestamp desc
Wrapping Up: Building Resilient Applications
We’ve covered a lot of ground in this post, so let’s bring it all together. Successfully handling Bedrock errors requires:

Understand root causes: Distinguish quota limits (429) from capacity issues (503)
Implement appropriate retries: Use exponential backoff with different parameters for each error type
Design for scale: Use connection pooling, circuit breakers, and cross-Region failover
Monitor proactively: Set up comprehensive CloudWatch monitoring and alerting
Plan for growth: Request quota increases and implement fallback strategies

Conclusion
Handling 429 ThrottlingException and 503 ServiceUnavailableException errors effectively is a crucial part of running production-grade generative AI workloads on Amazon Bedrock. By combining quota-aware design, intelligent retries, client-side resilience patterns, cross-Region strategies, and strong observability, you can keep your applications responsive even under unpredictable load.
As a next step, identify your most critical Bedrock workloads, enable the retry and rate-limiting patterns described here, and build dashboards and alarms that expose your real peaks rather than just averages. Over time, use real traffic data to refine quotas, fallback models, and regional deployments so your AI systems can remain both powerful and dependable as they scale.
For teams looking to accelerate incident resolution, consider enabling AWS DevOps Agent—an AI-powered agent that investigates Bedrock errors by correlating CloudWatch metrics, logs, and alarms just like an experienced DevOps engineer would. It learns your resource relationships, works with your observability tools and runbooks, and can significantly reduce mean time to resolution (MTTR) for 429 and 503 errors by automatically identifying root causes and suggesting remediation steps.
Learn More

Amazon Bedrock Documentation
Amazon Bedrock Quotas
Cross-Region Inference
Cross-Region Inference Security
SNS Security
AWS Logging Best Practices
AWS Bedrock Security Best Practices
AWS IAM Best Practices – Least Privilege

About the Authors

Farzin Bagheri
Farzin Bagheri is a Principal Technical Account Manager at AWS, where he supports strategic customers in achieving the highest levels of cloud operational maturity. Farzin joined AWS in 2013, and his focus in the recent years has been on identifying common patterns in cloud operation challenges and developing innovative solutions and strategies that help both AWS and its customers navigate complex technical landscapes.

Abel Laura
Abel Laura is a Technical Operations Manager with AWS Support, where he leads customer-centric teams focused on emerging generative AI products. With over a decade of leadership experience, he partners with technical support specialists to transform complex challenges into innovative, technology-driven solutions for customers. His passion lies in helping organizations harness the power of emerging AI technologies to drive meaningful business outcomes. In his free time, Abel enjoys spending time with his family and mentoring aspiring tech leaders.

Arun KM
Arun is a Principal Technical Account Manager at AWS, where he supports strategic customers in building production-ready generative AI applications with operational excellence. His focus in recent years has been on Amazon Bedrock, helping customers troubleshoot complex error patterns, customize open-source models, optimize model performance, and develop resilient AI architectures that can maximize return on investment and scale reliably in production environments.

Aswath Ram A Srinivasan
Aswath Ram A Srinivasan is a Sr. Cloud Support Engineer at AWS. With a strong background in ML, he has three years of experience building AI applications and specializes in hardware inference optimizations for LLM models. As a Subject Matter Expert, he tackles complex scenarios and use cases, helping customers unblock challenges and accelerate their path to production-ready solutions using Amazon Bedrock, Amazon SageMaker, and other AWS services. In his free time, Aswath enjoys photography and researching Machine Learning and Generative AI.

Swann provides Generative AI to millions of IoT Devices using Amazon B …

If you’re managing Internet of Things (IoT) devices at scale, alert fatigue is probably undermining your system’s effectiveness. This post shows you how to implement intelligent notification filtering using Amazon Bedrock and its gen-AI capabilities. You’ll learn model selection strategies, cost optimization techniques, and architectural patterns for deploying gen-AI at IoT scale, based on Swann Communications deployment across millions of devices.
Smart home security customers now expect systems that can tell the difference between a delivery person and a potential intruder—not just detect motion. Customers were overwhelmed by a flood of daily notifications, many of them false positives triggered by events that were irrelevant to them, such as passing cars and pets moving around. Users became frustrated with constant false alerts and started ignoring notifications entirely, including real security threats.
As a pioneer in do-it-yourself (DIY) security solutions, Swann Communications has built a global network of more than 11.74 million connected devices, serving homeowners and businesses across multiple continents. Swann partnered with Amazon Web Services (AWS) to develop a multi-model generative AI notification system to evolve their notification system from a basic, reactive alert mechanism into an intelligent, context-aware security assistant.
Business challenges driving the solution
Before implementing the new solution, Swann faced several critical challenges that required a fundamentally different approach to security notifications.
Swann’s previous system had basic detection that could only identify human or pet events without contextual awareness—treating a delivery person the same as a potential intruder—while offering no customization options for users to define what constituted a meaningful alert for their unique security needs. These technical constraints, compounded by scalability challenges in managing notifications cost-efficiently across tens of millions of devices, made it clear that incremental improvements wouldn’t suffice—Swann needed a fundamentally smarter approach.
Approximately 20 daily notifications per camera—most of them irrelevant—caused customers to miss critical security events, with many users disabling notifications within the first few months. This significantly reduced system effectiveness, demonstrating the need for intelligent filtering that delivered only meaningful alerts. Rather than managing multiple vendors and custom integrations, Swann used different AWS cloud services that work together. By using AWS integrated services, Swann’s engineering team could concentrate on creating new security features.
Why AWS and Amazon Bedrock were selected
When evaluating AI partners, Swann prioritized enterprise-grade capabilities that could reliably scale. AWS stood out for several key reasons:
Enterprise-grade AI capabilities
Swann chose AWS for its comprehensive, integrated approach to deploying generative AI at scale. Amazon Bedrock, a fully managed service, provided access to multiple foundation models through a single API, handling GPU provisioning, model deployment, and scaling automatically, so that Swann could test and compare different model families (such as Claude and Nova) without infrastructure changes while optimizing for either speed or accuracy based on each scenario, such as high-volume routine screening, threat verification requiring detailed analysis, time-sensitive alerts, and complex behavioral assessment. With approximately 275 million monthly inferences, the AWS pay-per-use pricing model, and the ability to use cost-effective models such as Nova Lite for routine analysis resulted in cost optimization. AWS services delivered low-latency inference across North America, Europe, and Asia-Pacific while providing data residency compliance and high availability for mission-essential security applications.
The AWS environment used by Swann included AWS IoT Core for device connectivity, Amazon Simple Storage Service (Amazon S3) for scalable storage of video feeds, and AWS Lambda to run code in response to events without managing servers, scaling from zero to thousands of executions and charging only for compute time used. Amazon Cognito manages user authentication and authorization with secure sign-in, multi-factor authentication, social identity integration, and temporary AWS credentials. Amazon Simple Queue Service (Amazon SQS) manages message queuing, buffering requests during traffic spikes and helping to ensure reliable processing even when thousands of cameras trigger simultaneously.
By using these capabilities to remove the effort of managing multiple vendors and custom integrations, Swann could focus on innovation rather than infrastructure. This cloud-centered integration accelerated time-to-market by 2 months while reducing operational overhead, and enabled the cost-effective deployment of sophisticated AI capabilities across millions of devices.
Scalability and performance requirements
Swann’s solution needed to handle millions of concurrent devices (more than 11.74 million cameras generating frames 24/7), variable workload patterns with peak activity during evening hours and weekends, real-time processing to provide sub-second latency for critical security events, global distribution with consistent performance across multiple geographic regions, and cost predictability through transparent pricing that scales linearly with usage. Swann found that Amazon Bedrock and AWS services gave them the best of both worlds: a global network that could handle their massive scale, plus smart cost controls that let them pick exactly the right model for each situation.
Solution architecture overview and implementation
Swann’s dynamic notifications system uses Amazon Bedrock, strategically using four foundation models (Nova Lite, Nova Pro, Claude Haiku, and Claude Sonnet) across two key features to balance performance, cost, and accuracy. This architecture, shown in the following figure, demonstrates how AWS services can be combined to create a scalable, intelligent video analysis solution using generative AI capabilities while optimizing for both performance and cost:

Edge device integration: Smart cameras and doorbells connect through the AWS IoT Device Gateway, providing real-time video feeds for analysis.
Data pipeline: Video content flows through Amazon EventBridge, Amazon S3, and Amazon SQS for reliable storage and message queuing.
Intelligent frame processing: Amazon Elastic Compute Cloud (Amazon EC2) instances (G3 and G4 family) use computer vision libraries to segment videos into frames and handle frame selection and filtering to optimize processing efficiency. G3 and G4 instances are GPU-powered virtual servers designed for parallel processing workloads such as video analysis and AI inference. Unlike traditional CPUs that process tasks sequentially, GPUs contain thousands of cores that can analyze multiple video frames simultaneously. This means that Swann can process frames from thousands of cameras concurrently without latency bottlenecks, providing near real-time security monitoring.
Serverless processing: Lambda functions invoke Amazon Bedrock and implement model selection logic based on use case requirements.
Tiered model strategy: A cost-effective approach using multiple models with varying capabilities. Amazon Nova Lite for speed and cost efficiency in routine high-volume screening, Nova Pro for balanced performance in threat verification, Claude Haiku for ultra-low latency in time-critical alerts, and Claude Sonnet for advanced reasoning in complex behavioral analysis requiring nuanced reasoning.
Dynamic notifications: The custom notification service delivers real-time alerts to mobile applications based on detection results.

Best practices for generative AI implementation
The following best practices can help organizations optimize cost, performance, and accuracy when implementing similar generative AI solutions at scale:

Understanding RPM and token limits: Requests per minute (RPM) limits define the number of API calls allowed per minute, requiring applications to implement queuing or retry logic to handle high-volume workloads. Tokens are the basic units AI models use to process text and images with costs calculated per thousand tokens, making concise prompts essential for reducing expenses at scale.
Business logic optimization: Swann reduced API calls by 88% (from 17,000 to 2,000 RPM) by implementing intelligent pre-filtering (motion detection, zone-based analysis, and duplicate frame elimination) before invoking AI models.
Prompt engineering and token optimization: Swann achieved 88% token reduction (from 150 to 18 tokens per request) through three key strategies:

Optimizing image resolution to reduce input tokens while preserving visual quality.
Deploying a custom pre-filtering model on GPU based EC2 instances to eliminate 65% of false detections (swaying branches, passing cars) before reaching Amazon Bedrock.
Engineering ultra-concise prompts with structured response formats that replaced verbose natural language with machine-parseable key-value pairs (for example, threat:LOW|type:person|action:delivery). Swann’s customer surveys revealed that these optimizations not only reduced latency and cost but also improved threat detection accuracy from 89% to 95%.

Prompt versioning, optimization, and testing: Swann versioned prompts with performance metadata (accuracy, cost, and latency) and A/B tested on 5–10% of traffic before rollout. Swann also uses Amazon Bedrock prompt optimization.
Model selection and tiered strategy: Swann selected models based on activity type.

Nova Lite (87% of requests): Handles fast screening of routine activity, such as passing cars, pets, and delivery personnel. Its low cost, high throughput, and sub-millisecond latency make it essential for high-volume, real-time analysis where speed and efficiency matter more than precision.
Nova Pro (8% of requests): Escalates from Nova Lite when potential threats require verification with higher accuracy. Distinguishes delivery personnel from intruders and identifies suspicious behavior patterns.
Claude Haiku (2% of requests): Powers the Notify Me When feature for immediate notification of user-defined criteria. Provides ultra-low latency for time-sensitive custom alerts.
Claude Sonnet (3% of requests): Handles complex edge cases requiring sophisticated reasoning. Analyzes multi-person interactions, ambiguous scenarios, and provides nuanced behavioral assessment.
Results: This intelligent routing achieves 95% overall accuracy while reducing costs by 99.7% compared to using Claude Sonnet for all requests, from a projected $2.1 million to approximately $6,000 monthly. The key insight was that matching model capabilities to task complexity enables cost-effective generative AI deployment at scale, with business logic pre-filtering and tiered model selection delivering far greater savings than model choice alone.

Model distillation strategy: Swann taught smaller, faster AI models to mimic the intelligence of larger ones, creating lightweight versions that are almost as capable but run faster and cost less. For new features, Swann is exploring Nova model distillation techniques, which allow knowledge transfer from larger, advanced models to smaller, efficient ones and help optimize model performance for particular use cases without requiring extensive labeled training data.
Implement comprehensive monitoring: Use Amazon CloudWatch to track critical performance metrics including latency percentiles—p50 (median response time), p95 (95th percentile, capturing worst-case for most users), and p99 (99th percentile, identifying outliers and system stress)—alongside token consumption, cost per inference, accuracy rates, and throttling events. These percentile metrics are crucial because average latency can mask performance issues; for example, a 200 ms average might hide that 5% of requests take more than 2 seconds, directly impacting customer experience.
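The tiered routing idea above can be sketched as a simple dispatch function. The activity categories and model names here are simplified illustrations based on the percentages described, not Swann's production logic or actual Bedrock model identifiers:

```python
def select_model(activity: str) -> str:
    """Route an event to a model tier by task complexity."""
    routine = {"passing_car", "pet", "delivery"}
    if activity in routine:
        return "nova-lite"      # ~87%: fast, cheap screening of routine events
    if activity == "custom_alert":
        return "claude-haiku"   # ~2%: ultra-low-latency user-defined alerts
    if activity == "possible_threat":
        return "nova-pro"       # ~8%: higher-accuracy threat verification
    return "claude-sonnet"      # ~3%: complex or ambiguous scenarios
```

In a real deployment, the routing signal would come from the GPU pre-filtering stage rather than a string label, and the function would return a concrete Bedrock model ID.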

Conclusion
After implementing Amazon Bedrock, Swann saw immediate improvements—customers received fewer but more relevant alerts. Alert volume dropped 25% while notification relevance increased 89%, and customer satisfaction increased by 3%. The system scales across 11.74 million devices with sub-300 ms p95 latency, demonstrating that sophisticated generative AI capabilities can be deployed cost-effectively in consumer IoT products. Dynamic notifications (shown in the following image) deliver context-aware security alerts.

The Notify Me When feature (shown in the following video) demonstrates intelligent customization. Users define what matters to them using natural language, such as “notify me if a dog enters the backyard” or “notify me if a child is near the swimming pool,” enabling truly personalized security monitoring.

Next steps
Organizations considering generative AI at scale should start with a clear, measurable business problem and pilot with a subset of devices before full deployment, optimizing for cost from day one through intelligent business logic and tiered model selection. Invest in comprehensive monitoring to enable continuous optimization and design architecture for graceful degradation to verify reliability even during service disruptions. Focus on prompt engineering and token optimization early to help deliver performance and cost improvements. Use managed services like Amazon Bedrock to handle infrastructure complexity and build flexible architecture that supports future model improvements and evolving AI capabilities.
Explore additional resources

Get Started with Amazon Bedrock
Amazon Bedrock Nova Models
Amazon Bedrock pricing
Prompt engineering concepts
Submit a model distillation job in Amazon Bedrock

About the authors
Aman Sharma is an Enterprise Solutions Architect at AWS, where he works with enterprise retail and supply chain customers across ANZ. He has more than 21 years of experience in consulting, architecting, and solution design, and is passionate about democratizing AI and ML and helping customers design data and ML strategies. Outside of work, he enjoys exploring nature and wildlife photography.
Surjit Reghunathan is the Chief Technology Officer at Swann Communications, where he leads technology innovation and strategic direction for the company’s global IoT security platform. With expertise in scaling connected device solutions, Surjit drives the integration of AI and machine learning capabilities across Swann’s product portfolio. Outside of work, he enjoys long motorcycle rides and playing guitar.
Suraj Padinjarute is a Technical Account Manager at AWS, helping retail and supply chain customers maximize the value of their cloud investments. With over 20 years of IT experience in database administration, application support, and cloud transformation, he is passionate about enabling customers on their cloud journey. Outside of work, Suraj enjoys long-distance cycling and exploring the outdoors.

Google AI Introduces Natively Adaptive Interfaces (NAI): An Agentic Mu …

Google Research is proposing a new way to build accessible software with Natively Adaptive Interfaces (NAI), an agentic framework where a multimodal AI agent becomes the primary user interface and adapts the application in real time to each user’s abilities and context.

Instead of shipping a fixed UI and adding accessibility as a separate layer, NAI pushes accessibility into the core architecture. The agent observes, reasons, and then modifies the interface itself, moving from one-size-fits-all design to context-informed decisions.

What Natively Adaptive Interfaces (NAI) Change in the Stack?

NAI starts from a simple premise: if an interface is mediated by a multimodal agent, accessibility can be handled by that agent instead of by static menus and settings.

Key properties include:

The multimodal AI agent is the primary UI surface. It can see text, images, and layouts, listen to speech, and output text, speech, or other modalities.

Accessibility is integrated into this agent from the beginning, not bolted on later. The agent is responsible for adapting navigation, content density, and presentation style to each user.

The design process is explicitly user-centered, with people with disabilities treated as edge users who define requirements for everyone, not as an afterthought.

The framework targets what the Google team calls the ‘accessibility gap’: the lag between adding new product features and making them usable for people with disabilities. Embedding agents into the interface is meant to reduce this gap by letting the system adapt without waiting for custom add-ons.

Agent Architecture: Orchestrator and Specialized Tools

Under NAI, the UI is backed by a multi-agent system. The core pattern is:

An Orchestrator agent maintains shared context about the user, the task, and the app state.

Specialized sub-agents implement focused capabilities, such as summarization or settings adaptation.

A set of configuration patterns defines how to detect user intent, add relevant context, adjust settings, and correct flawed queries.

For example, in NAI case studies around accessible video, the Google team outlines core agent capabilities such as:

Understand user intent.

Refine queries and manage context across turns.

Engineer prompts and tool calls in a consistent way.

From a systems point of view, this replaces static navigation trees with dynamic, agent-driven modules. The ‘navigation model’ is effectively a policy over which sub-agent to run, with what context, and how to render its result back into the UI.
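The Orchestrator-plus-sub-agents pattern can be sketched in a few lines. This is an illustrative sketch only; the class and function names (`Orchestrator`, `summarize`, `adapt_settings`) are assumptions, not Google's actual API.

```python
# Minimal sketch of the Orchestrator / sub-agent pattern described above.
# All names here are illustrative, not part of any Google framework.

class Orchestrator:
    """Routes user intents to specialized sub-agents and keeps shared context."""

    def __init__(self):
        self.context = {}      # shared user/task/app state
        self.sub_agents = {}   # intent name -> callable sub-agent

    def register(self, intent, agent):
        self.sub_agents[intent] = agent

    def handle(self, intent, payload):
        agent = self.sub_agents.get(intent)
        if agent is None:
            return {"error": f"no sub-agent for intent '{intent}'"}
        result = agent(payload, self.context)
        self.context["last_intent"] = intent   # context carries across turns
        return result

def summarize(payload, context):
    text = payload["text"]
    return {"summary": (text[:40] + "...") if len(text) > 40 else text}

def adapt_settings(payload, context):
    context.update(payload)    # e.g. {"font_scale": 1.5}
    return {"applied": payload}

orch = Orchestrator()
orch.register("summarize", summarize)
orch.register("settings", adapt_settings)

print(orch.handle("settings", {"font_scale": 1.5}))
print(orch.handle("summarize", {"text": "short text"}))
```

The "navigation model" here is literally a policy over which callable to run with what shared context, which is the claim the paragraph above makes.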

Multimodal Gemini and RAG for Video and Environments

NAI is explicitly built on multimodal models like Gemini and Gemma that can process voice, text, and images in a single context.

In the case of accessible video, Google describes a 2-stage pipeline:

Offline indexing

The system generates dense visual and semantic descriptors over the video timeline.

These descriptors are stored in an index keyed by time and content.

Online retrieval-augmented generation (RAG)

At playback time, when a user asks a question such as “What is the character wearing right now?”, the system retrieves relevant descriptors.

A multimodal model conditions on these descriptors plus the question to generate a concise, descriptive answer.

This design supports interactive queries during playback, not just pre-recorded audio description tracks. The same pattern generalizes to physical navigation scenarios where the agent needs to reason over a sequence of observations and user queries.
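The two-stage pipeline above can be sketched without any model calls: descriptors are stored in a time-keyed index offline, and playback-time questions retrieve the descriptors near the current timestamp. The function names and the descriptor format are assumptions for illustration.

```python
import bisect

# Hypothetical sketch of the offline-index / online-retrieval split described above.
# In the real system a multimodal model would condition on the retrieved
# descriptors plus the user's question; here we only show the retrieval step.

def build_index(descriptors):
    """descriptors: list of (t_seconds, text); returns a time-sorted index."""
    return sorted(descriptors)

def retrieve(index, t, window=5.0):
    """Return all descriptor texts within `window` seconds of playback time t."""
    times = [t_i for t_i, _ in index]
    lo = bisect.bisect_left(times, t - window)
    hi = bisect.bisect_right(times, t + window)
    return [text for _, text in index[lo:hi]]

index = build_index([
    (10.0, "A character in a red coat enters the room."),
    (12.5, "Close-up of the red coat's brass buttons."),
    (40.0, "Exterior shot of a rainy street."),
])

# At playback time t=13s, a question like "What is the character wearing right
# now?" would be grounded in these retrieved descriptors:
print(retrieve(index, 13.0))
```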

Concrete NAI Prototypes

Google’s NAI research work is grounded in several deployed or piloted prototypes built with partner organizations such as RIT/NTID, The Arc of the United States, RNID, and Team Gleason.

StreetReaderAI

Built for blind and low-vision users navigating urban environments.

Combines an AI Describer that processes camera and geospatial data with an AI Chat interface for natural language queries.

Maintains a temporal model of the environment, which allows queries like ‘Where was that bus stop?’ and replies such as ‘It is behind you, about 12 meters away.’
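A temporal model of the environment like the one StreetReaderAI is described as keeping can be sketched as a bounded log of observed landmarks, queried relative to the user's current pose. The class name, coordinate convention (meters, heading 0° = +x), and phrasing are all assumptions, not Google's code.

```python
import math
from collections import deque

# Illustrative sketch of a temporal environment model: remembered landmark
# positions, answered relative to the user's current position and heading.

class EnvironmentModel:
    def __init__(self, maxlen=100):
        self.landmarks = deque(maxlen=maxlen)   # (label, x_meters, y_meters)

    def observe(self, label, x, y):
        self.landmarks.append((label, x, y))

    def locate(self, label, user_x, user_y, heading_deg):
        for name, x, y in reversed(self.landmarks):   # most recent sighting first
            if name == label:
                dx, dy = x - user_x, y - user_y
                dist = math.hypot(dx, dy)
                bearing = math.degrees(math.atan2(dy, dx))
                rel = (bearing - heading_deg + 180) % 360 - 180  # signed angle to target
                side = "ahead" if abs(rel) < 90 else "behind you"
                return f"{name} is {side}, about {dist:.0f} meters away."
        return f"I haven't seen a {label}."

env = EnvironmentModel()
env.observe("bus stop", x=0.0, y=0.0)
# The user has since walked 12 m east and is still facing east (heading 0 deg):
print(env.locate("bus stop", user_x=12.0, user_y=0.0, heading_deg=0.0))
```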

Multimodal Agent Video Player (MAVP)

Focused on online video accessibility.

Uses the Gemini-based RAG pipeline above to provide adaptive audio descriptions.

Lets users control descriptive density, interrupt playback with questions, and receive answers grounded in indexed visual content.

Grammar Laboratory

A bilingual (American Sign Language and English) learning platform created by RIT/NTID with support from Google.org and Google.

Uses Gemini to generate individualized multiple-choice questions.

Presents content through ASL video, English captions, spoken narration, and transcripts, adapting modality and difficulty to each learner.

Design process and curb-cut effects

The NAI documentation describes a structured process: investigate, build and refine, then iterate based on feedback. In one case study on video accessibility, the team:

Defined target users across a spectrum from fully blind to sighted.

Ran co-design and user test sessions with about 20 participants.

Went through more than 40 iterations informed by 45 feedback sessions.

The resulting interfaces are expected to produce a curb-cut effect. Features built for users with disabilities – such as better navigation, voice interactions, and adaptive summarization – often improve usability for a much wider population, including non-disabled users who face time pressure, cognitive load, or environmental constraints.

Key Takeaways

Agent is the UI, not an add-on: Natively Adaptive Interfaces (NAI) treat a multimodal AI agent as the primary interaction layer, so accessibility is handled by the agent directly in the core UI, not as a separate overlay or post-hoc feature.

Orchestrator + sub-agents architecture: NAI uses a central Orchestrator that maintains shared context and routes work to specialized sub-agents (for example, summarization or settings adaptation), turning static navigation trees into dynamic, agent-driven modules.

Multimodal Gemini + RAG for adaptive experiences: Prototypes such as the Multimodal Agent Video Player build dense visual indexes and use retrieval-augmented generation with Gemini to support interactive, grounded Q&A during video playback and other rich media scenarios.

Real systems: StreetReaderAI, MAVP, Grammar Laboratory: NAI is instantiated in concrete tools: StreetReaderAI for navigation, MAVP for video accessibility, and Grammar Laboratory for ASL/English learning, all powered by multimodal agents.

Accessibility as a core design constraint: The framework encodes accessibility into configuration patterns (detect intent, add context, adjust settings) and leverages the curb-cut effect, where solving for disabled users improves robustness and usability for the broader user base.

Check out the Technical details here.
The post Google AI Introduces Natively Adaptive Interfaces (NAI): An Agentic Multimodal Accessibility Framework Built on Gemini for Adaptive UI Design appeared first on MarkTechPost.

How to Design Complex Deep Learning Tensor Pipelines Using Einops with …

In this tutorial, we walk through advanced usage of Einops to express complex tensor transformations in a clear, readable, and mathematically precise way. We demonstrate how rearrange, reduce, repeat, einsum, and pack/unpack let us reshape, aggregate, and combine tensors without relying on error-prone manual dimension handling. We focus on real deep-learning patterns, such as vision patchification, multi-head attention, and multimodal token mixing, and show how einops serves as a compact tensor manipulation language that integrates naturally with PyTorch. Check out the FULL CODES here.

import sys, subprocess, textwrap, math, time

def pip_install(pkg: str):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

pip_install("einops")
pip_install("torch")

import torch
import torch.nn as nn
import torch.nn.functional as F

from einops import rearrange, reduce, repeat, einsum, pack, unpack
from einops.layers.torch import Rearrange, Reduce

torch.manual_seed(0)
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

def section(title: str):
    print("\n" + "=" * 90)
    print(title)
    print("=" * 90)

def show_shape(name, x):
    print(f"{name:>18} shape = {tuple(x.shape)} dtype={x.dtype} device={x.device}")
We set up the execution environment and ensure all required dependencies are installed dynamically. We initialize PyTorch, einops, and utility helpers that standardize device selection and shape inspection. We also establish reusable printing utilities that help us track tensor shapes throughout the tutorial.

section("1) rearrange")
x = torch.randn(2, 3, 4, 5, device=device)
show_shape("x", x)

x_bhwc = rearrange(x, "b c h w -> b h w c")
show_shape("x_bhwc", x_bhwc)

x_split = rearrange(x, "b (g cg) h w -> b g cg h w", g=3)
show_shape("x_split", x_split)

x_tokens = rearrange(x, "b c h w -> b (h w) c")
show_shape("x_tokens", x_tokens)

y = torch.randn(2, 7, 11, 13, 17, device=device)
y2 = rearrange(y, "b ... c -> b c ...")
show_shape("y", y)
show_shape("y2", y2)

try:
    _ = rearrange(torch.randn(2, 10, device=device), "b (h w) -> b h w", h=3)
except Exception as e:
    print("Expected error (shape mismatch):", type(e).__name__, "-", str(e)[:140])

We demonstrate how we use rearrange to express complex reshaping and axis-reordering operations in a readable, declarative way. We show how to split, merge, and permute dimensions while preserving semantic clarity. We also intentionally trigger a shape error to illustrate how Einops enforces shape safety at runtime.

section("2) reduce")
imgs = torch.randn(8, 3, 64, 64, device=device)
show_shape("imgs", imgs)

gap = reduce(imgs, "b c h w -> b c", "mean")
show_shape("gap", gap)

pooled = reduce(imgs, "b c (h ph) (w pw) -> b c h w", "mean", ph=2, pw=2)
show_shape("pooled", pooled)

chmax = reduce(imgs, "b c h w -> b c", "max")
show_shape("chmax", chmax)

section("3) repeat")
vec = torch.randn(5, device=device)
show_shape("vec", vec)

vec_batched = repeat(vec, "d -> b d", b=4)
show_shape("vec_batched", vec_batched)

q = torch.randn(2, 32, device=device)
q_heads = repeat(q, "b d -> b heads d", heads=8)
show_shape("q_heads", q_heads)

We apply reduce and repeat to perform pooling, aggregation, and broadcasting operations without manual dimension handling. We compute global and local reductions directly within the transformation expression. We also show how repeating tensors across new dimensions simplifies batch and multi-head constructions.

section("4) patchify")
B, C, H, W = 4, 3, 32, 32
P = 8
img = torch.randn(B, C, H, W, device=device)
show_shape("img", img)

patches = rearrange(img, "b c (h p1) (w p2) -> b (h w) (p1 p2 c)", p1=P, p2=P)
show_shape("patches", patches)

img_rec = rearrange(
    patches,
    "b (h w) (p1 p2 c) -> b c (h p1) (w p2)",
    h=H // P,
    w=W // P,
    p1=P,
    p2=P,
    c=C,
)
show_shape("img_rec", img_rec)

max_err = (img - img_rec).abs().max().item()
print("Reconstruction max abs error:", max_err)
assert max_err < 1e-6

section("5) attention")
B, T, D = 2, 64, 256
Hh = 8
Dh = D // Hh
x = torch.randn(B, T, D, device=device)
show_shape("x", x)

proj = nn.Linear(D, 3 * D, bias=False).to(device)
qkv = proj(x)
show_shape("qkv", qkv)

q, k, v = rearrange(qkv, "b t (three heads dh) -> three b heads t dh", three=3, heads=Hh, dh=Dh)
show_shape("q", q)
show_shape("k", k)
show_shape("v", v)

scale = Dh ** -0.5
attn_logits = einsum(q, k, "b h t dh, b h s dh -> b h t s") * scale
show_shape("attn_logits", attn_logits)

attn = attn_logits.softmax(dim=-1)
show_shape("attn", attn)

out = einsum(attn, v, "b h t s, b h s dh -> b h t dh")
show_shape("out (per-head)", out)

out_merged = rearrange(out, "b h t dh -> b t (h dh)")
show_shape("out_merged", out_merged)

We implement vision and attention mechanisms that are commonly found in modern deep learning models. We convert images into patch sequences and reconstruct them to verify reversibility and correctness. We then reshape projected tensors into a multi-head attention format and compute attention using einops.einsum for clarity and correctness.

section("6) pack unpack")
B, Cemb = 2, 128

class_token = torch.randn(B, 1, Cemb, device=device)
image_tokens = torch.randn(B, 196, Cemb, device=device)
text_tokens = torch.randn(B, 32, Cemb, device=device)
show_shape("class_token", class_token)
show_shape("image_tokens", image_tokens)
show_shape("text_tokens", text_tokens)

packed, ps = pack([class_token, image_tokens, text_tokens], "b * c")
show_shape("packed", packed)
print("packed_shapes (ps):", ps)

mixer = nn.Sequential(
    nn.LayerNorm(Cemb),
    nn.Linear(Cemb, 4 * Cemb),
    nn.GELU(),
    nn.Linear(4 * Cemb, Cemb),
).to(device)

mixed = mixer(packed)
show_shape("mixed", mixed)

class_out, image_out, text_out = unpack(mixed, ps, "b * c")
show_shape("class_out", class_out)
show_shape("image_out", image_out)
show_shape("text_out", text_out)
assert class_out.shape == class_token.shape
assert image_out.shape == image_tokens.shape
assert text_out.shape == text_tokens.shape

section("7) layers")
class PatchEmbed(nn.Module):
    def __init__(self, in_channels=3, emb_dim=192, patch=8):
        super().__init__()
        self.patch = patch
        self.to_patches = Rearrange("b c (h p1) (w p2) -> b (h w) (p1 p2 c)", p1=patch, p2=patch)
        self.proj = nn.Linear(in_channels * patch * patch, emb_dim)

    def forward(self, x):
        x = self.to_patches(x)
        return self.proj(x)

class SimpleVisionHead(nn.Module):
    def __init__(self, emb_dim=192, num_classes=10):
        super().__init__()
        self.pool = Reduce("b t c -> b c", reduction="mean")
        self.classifier = nn.Linear(emb_dim, num_classes)

    def forward(self, tokens):
        x = self.pool(tokens)
        return self.classifier(x)

patch_embed = PatchEmbed(in_channels=3, emb_dim=192, patch=8).to(device)
head = SimpleVisionHead(emb_dim=192, num_classes=10).to(device)

imgs = torch.randn(4, 3, 32, 32, device=device)
tokens = patch_embed(imgs)
logits = head(tokens)
show_shape("tokens", tokens)
show_shape("logits", logits)

section("8) practical")
x = torch.randn(2, 32, 16, 16, device=device)
g = 8
xg = rearrange(x, "b (g cg) h w -> (b g) cg h w", g=g)
show_shape("x", x)
show_shape("xg", xg)

mean = reduce(xg, "bg cg h w -> bg 1 1 1", "mean")
var = reduce((xg - mean) ** 2, "bg cg h w -> bg 1 1 1", "mean")
xg_norm = (xg - mean) / torch.sqrt(var + 1e-5)
x_norm = rearrange(xg_norm, "(b g) cg h w -> b (g cg) h w", b=2, g=g)
show_shape("x_norm", x_norm)

z = torch.randn(3, 64, 20, 30, device=device)
z_flat = rearrange(z, "b c h w -> b c (h w)")
z_unflat = rearrange(z_flat, "b c (h w) -> b c h w", h=20, w=30)
assert (z - z_unflat).abs().max().item() < 1e-6
show_shape("z_flat", z_flat)

section("9) views")
a = torch.randn(2, 3, 4, 5, device=device)
b = rearrange(a, "b c h w -> b h w c")
print("a.is_contiguous():", a.is_contiguous())
print("b.is_contiguous():", b.is_contiguous())
print("b._base is a:", getattr(b, "_base", None) is a)

section("Done! You now have reusable einops patterns for vision, attention, and multimodal token packing")

We demonstrate reversible token packing and unpacking for multimodal and transformer-style workflows. We integrate Einops layers directly into PyTorch modules to build clean, composable model components. We conclude by applying practical tensor grouping and normalization patterns that reinforce how einops simplifies real-world model engineering.

In conclusion, we established Einops as a practical and expressive foundation for modern deep-learning code. We showed that complex operations like attention reshaping, reversible token packing, and spatial pooling can be written in a way that is both safer and more readable than traditional tensor operations. With these patterns, we reduced cognitive overhead and minimized shape bugs. We wrote models that are easier to extend, debug, and reason about while remaining fully compatible with high-performance PyTorch workflows.

Check out the FULL CODES here.
The post How to Design Complex Deep Learning Tensor Pipelines Using Einops with Vision, Attention, and Multimodal Examples appeared first on MarkTechPost.

Alibaba Open-Sources Zvec: An Embedded Vector Database Bringing SQLite …

Alibaba Tongyi Lab research team released ‘Zvec’, an open source, in-process vector database that targets edge and on-device retrieval workloads. It is positioned as ‘the SQLite of vector databases’ because it runs as a library inside your application and does not require any external service or daemon. It is designed for retrieval augmented generation (RAG), semantic search, and agent workloads that must run locally on laptops, mobile devices, or other constrained hardware and edge devices.

The core idea is simple. Many applications now need vector search and metadata filtering but do not want to run a separate vector database service. Traditional server style systems are heavy for desktop tools, mobile apps, or command line utilities. An embedded engine that behaves like SQLite but for embeddings fits this gap.

https://zvec.org/en/blog/introduction/

Why embedded vector search matters for RAG?

RAG and semantic search pipelines need more than a bare index. They need vectors, scalar fields, full CRUD, and safe persistence. Local knowledge bases change as files, notes, and project states change.

Index libraries such as Faiss provide approximate nearest neighbor search but do not handle scalar storage, crash recovery, or hybrid queries. You end up building your own storage and consistency layer. Embedded extensions such as DuckDB-VSS add vector search to DuckDB but expose fewer index and quantization options and weaker resource control for edge scenarios. Service based systems such as Milvus or managed vector clouds require network calls and separate deployment, which is often overkill for on-device tools.

Zvec claims to fit in specifically for these local scenarios. It gives you a vector-native engine with persistence, resource governance, and RAG oriented features, packaged as a lightweight library.

Core architecture: in-process and vector-native

Zvec is implemented as an embedded library. You install it with pip install zvec and open collections directly in your Python process. There is no external server or RPC layer. You define schemas, insert documents, and run queries through the Python API.

The engine is built on Proxima, Alibaba Group’s high performance, production grade, battle tested vector search engine. Zvec wraps Proxima with a simpler API and embedded runtime. The project is released under the Apache 2.0 license.

Current support covers Python 3.10 to 3.12 on Linux x86_64, Linux ARM64, and macOS ARM64.

The design goals are explicit:

Embedded execution in process

Vector native indexing and storage

Production ready persistence and crash safety

This makes it suitable for edge devices, desktop applications, and zero-ops deployments.

Developer workflow: from install to semantic search

The quickstart documentation shows a short path from install to query.

Install the package: pip install zvec

Define a CollectionSchema with one or more vector fields and optional scalar fields.

Call create_and_open to create or open the collection on disk.

Insert Doc objects that contain an ID, vectors, and scalar attributes.

Build an index and run a VectorQuery to retrieve nearest neighbors.

pip install zvec

Example:

import zvec

# Define collection schema
schema = zvec.CollectionSchema(
    name="example",
    vectors=zvec.VectorSchema("embedding", zvec.DataType.VECTOR_FP32, 4),
)

# Create collection
collection = zvec.create_and_open(path="./zvec_example", schema=schema)

# Insert documents
collection.insert([
    zvec.Doc(id="doc_1", vectors={"embedding": [0.1, 0.2, 0.3, 0.4]}),
    zvec.Doc(id="doc_2", vectors={"embedding": [0.2, 0.3, 0.4, 0.1]}),
])

# Search by vector similarity
results = collection.query(
    zvec.VectorQuery("embedding", vector=[0.4, 0.3, 0.3, 0.1]),
    topk=10,
)

# Results: list of {'id': str, 'score': float, ...}, sorted by relevance
print(results)

Results come back as dictionaries that include IDs and similarity scores. This is enough to build a local semantic search or RAG retrieval layer on top of any embedding model.

Performance: VectorDBBench and 8,000+ QPS

Zvec is optimized for high throughput and low latency on CPUs. It uses multithreading, cache friendly memory layouts, SIMD instructions, and CPU prefetching.

In VectorDBBench on the Cohere 10M dataset, with comparable hardware and matched recall, Zvec reports more than 8,000 QPS. This is more than 2× the previous leaderboard #1, ZillizCloud, while also substantially reducing index build time in the same setup.

https://zvec.org/en/blog/introduction/

These metrics show that an embedded library can reach cloud level performance for high volume similarity search, as long as the workload resembles the benchmark conditions.

RAG capabilities: CRUD, hybrid search, fusion, reranking

The feature set is tuned for RAG and agentic retrieval.

Zvec supports:

Full CRUD on documents so the local knowledge base can change over time.

Schema evolution to adjust index strategies and fields.

Multi vector retrieval for queries that combine several embedding channels.

A built in reranker that supports weighted fusion and Reciprocal Rank Fusion.

Scalar vector hybrid search that pushes scalar filters into the index execution path, with optional inverted indexes for scalar attributes.

This allows you to build on device assistants that mix semantic retrieval, filters such as user, time, or type, and multiple embedding models, all within one embedded engine.
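Of the fusion methods listed, Reciprocal Rank Fusion (RRF) is a standard, self-contained formula: each document scores the sum of `1 / (k + rank)` across the ranked lists it appears in. The sketch below is the general algorithm, not Zvec's implementation, and the document IDs are made up.

```python
# Reciprocal Rank Fusion sketch: fuse several ranked lists of doc IDs.
# score(d) = sum over lists of 1 / (k + rank_of_d), with 1-based ranks.

def rrf(rankings, k=60):
    scores = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

dense  = ["doc_2", "doc_1", "doc_3"]   # e.g. vector-similarity order
sparse = ["doc_1", "doc_3", "doc_2"]   # e.g. scalar/keyword-filter order
print(rrf([dense, sparse]))            # → ['doc_1', 'doc_2', 'doc_3']
```

Documents ranked consistently high across channels win even when no single channel ranks them first, which is why RRF pairs well with the multi vector retrieval described above.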

Key Takeaways

Zvec is an embedded, in-process vector database positioned as the ‘SQLite of vector databases’ for on-device and edge RAG workloads.

It is built on Proxima, Alibaba’s high performance, production grade, battle tested vector search engine, and is released under Apache 2.0 with Python support on Linux x86_64, Linux ARM64, and macOS ARM64.

Zvec delivers >8,000 QPS on VectorDBBench with the Cohere 10M dataset, achieving more than 2× the previous leaderboard #1 (ZillizCloud) while also reducing index build time.

The engine provides explicit resource governance via 64 MB streaming writes, optional mmap mode, experimental memory_limit_mb, and configurable concurrency, optimize_threads, and query_threads for CPU control.

Zvec is RAG ready with full CRUD, schema evolution, multi vector retrieval, built in reranking (weighted fusion and RRF), and scalar vector hybrid search with optional inverted indexes, plus an ecosystem roadmap targeting LangChain, LlamaIndex, DuckDB, PostgreSQL, and real device deployments.

Check out the Technical details and Repo.
The post Alibaba Open-Sources Zvec: An Embedded Vector Database Bringing SQLite-like Simplicity and High-Performance On-Device RAG to Edge Applications appeared first on MarkTechPost.

How Amazon uses Amazon Nova models to automate operational readiness t …

Amazon is a global ecommerce and technology company that operates a vast network of fulfillment centers to store, process, and ship products to customers worldwide. The Amazon Global Engineering Services (GES) team is responsible for facilitating operational readiness across the company’s rapidly expanding network of fulfillment centers. When launching new fulfillment centers, Amazon must verify that each facility is properly equipped and ready for operations. This process is called operational readiness testing (ORT) and typically requires 2,000 hours of manual effort per facility to verify over 200,000 components across 10,500 workstations. Using Amazon Nova models, we’ve developed an automated solution that significantly reduces verification time while improving accuracy.
In this post, we discuss how Amazon Nova in Amazon Bedrock can be used to implement an AI-powered image recognition solution that automates the detection and validation of module components, significantly reducing manual verification efforts and improving accuracy.
Understanding the ORT Process
ORT is a comprehensive verification process that makes sure the components are properly installed before our fulfillment center is ready for launch. The bill of materials (BOM) serves as the master checklist, detailing every component that should be present in each module of the facility. Each component or item in the fulfillment center is assigned a unique identification number (UIN) that serves as its distinct identifier. These components are essential for accurate tracking, verification, and inventory management throughout the ORT process and beyond. In this post we will refer to UINs and components interchangeably.
The ORT workflow has five components:

Testing plan: Testers receive a testing plan, which includes a BOM that details the exact components and quantities required
Walk through: Testers walk through the fulfillment center and stop at each module to review the setup against the BOM. A module is a physical workstation or operational area
Verify: They verify proper installation and configuration of each UIN
Test: They perform functional testing (for example, power and connectivity) on each component
Document: They document results for each UIN and move to next module

Finding the Right Approach
We evaluated multiple approaches to address the ORT automation challenge, with a focus on using image recognition capabilities from foundation models (FMs). Key factors in the decision-making process include:
Image Detection Capability: We selected Amazon Nova Pro for image detection after testing multiple AI models including Anthropic Claude Sonnet, Amazon Nova Pro, Amazon Nova Lite and Meta AI Segment Anything Model (SAM). Nova Pro met the criteria for production implementation.
Amazon Nova Pro Features:
Object Detection Capabilities

Purpose-built for object detection
Provides precise bounding box coordinates
Consistent detection results with bounding boxes

Image Processing

Built-in image resizing to a fixed aspect ratio
No manual resizing needed

Performance

Higher Request per Minute (RPM) quota on Amazon Bedrock
Higher Tokens per Minute (TPM) throughput
Cost-effective for large-scale detection

Serverless Architecture: We used AWS Lambda and Amazon Bedrock to maintain a cost-effective, scalable solution that didn’t require complex infrastructure management or model hosting.
Additional contextual understanding: To improve detection and reduce false positives, we used Anthropic Claude Sonnet 4.0 to generate text descriptions for each UIN and create detection parameters.
Solution Overview
The Intelligent Operational Readiness (IORA) solution includes several key services and is depicted in the architecture diagram that follows:

API Gateway: Amazon API Gateway handles user requests and routes to the appropriate Lambda functions
Synchronous Image Processing: Amazon Bedrock Nova Pro analyzes images with 2-5 second response times
Progress Tracking: The system tracks UIN detection progress (% UINs detected per module)
Data Storage: Amazon Simple Storage Service (S3) is used to store module images, UIN reference pictures, and results. Amazon DynamoDB is used for storing structured verification data
Compute: AWS Lambda is used for image analysis and data operations
Model inference: Amazon Bedrock is used for real-time inference for object detection as well as batch inference for description generation
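The progress metric in the list above (% UINs detected per module) reduces to a comparison of the BOM's required quantities against detected quantities. This is a hedged sketch; the dictionary shapes and the function name `module_progress` are assumptions, not the production code.

```python
# Sketch of per-module progress tracking: percent of required UINs detected,
# derived from the BOM and the detection results. Data shapes are illustrative.

def module_progress(bom, detections):
    """bom: {uin: required_qty}; detections: {uin: detected_qty}."""
    required = sum(bom.values())
    # Cap each UIN's credit at its required quantity so over-detection
    # (duplicates or false positives) never inflates the percentage.
    found = sum(min(detections.get(uin, 0), qty) for uin, qty in bom.items())
    return 100.0 * found / required if required else 100.0

bom = {"UIN-001": 2, "UIN-002": 1, "UIN-003": 1}
detections = {"UIN-001": 2, "UIN-003": 1}
print(f"{module_progress(bom, detections):.0f}% UINs detected")   # → 75% UINs detected
```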

Description Generation Pipeline
The description generation pipeline is the first of several key systems that work together to automate the ORT process. It creates a standardized knowledge base for component identification and runs as a batch process when new modules are introduced. Images taken at the fulfillment center have different lighting conditions and camera angles, which can impact the model's ability to consistently detect the right component. By using high-quality reference images, we can generate standardized descriptions for each UIN. We then generate detection rules using the BOM, which lists the required UINs in each module, their associated quantities, and specifications. This process makes sure that each UIN has a standardized description and appropriate detection rules, creating a robust foundation for the subsequent detection and evaluation processes.
The workflow is as follows:

Admin uploads UIN images and BOM data
Lambda function triggers two parallel processes:

Path A: UIN description generation

Process each UIN’s reference images through Claude Sonnet 4.0
Generate detailed UIN descriptions
Consolidate multiple descriptions into one description per UIN
Store consolidated descriptions in DynamoDB

Path B: Detection rule creation

Combine UIN descriptions with BOM data
Generate module-specific detection rules
Create false positive detection patterns
Store rules in DynamoDB

# UIN Description Generation Process
def generate_uin_descriptions(uin_images, bedrock_client):
    """
    Generate enhanced UIN descriptions using Claude Sonnet
    """
    for uin_id, image_set in uin_images.items():
        # First pass: Generate initial descriptions from multiple angles
        initial_descriptions = []
        for image in image_set:
            response = bedrock_client.invoke_model(
                modelId='anthropic.claude-4-sonnet-20240229-v1:0',
                body=json.dumps({
                    'messages': [
                        {
                            'role': 'user',
                            'content': [
                                {'type': 'image', 'source': {'type': 'base64', 'data': image}},
                                {'type': 'text', 'text': 'Describe this UIN component in detail, including physical characteristics, typical installation context, and identifying features.'}
                            ]
                        }
                    ]
                })
            )
            initial_descriptions.append(response['content'][0]['text'])

        # Second pass: Consolidate and enrich descriptions
        consolidated_description = consolidate_descriptions(initial_descriptions, bedrock_client)

        # Store in DynamoDB for quick retrieval
        store_uin_description(uin_id, consolidated_description)

False positive detection patterns
To improve output consistency, we optimized the prompt by adding rules for common false positives. This helps filter out objects that are not relevant for detection. For instance, triangle signs should have a gate number and an arrow, and generic warning signs should not be detected.

3:
  generic_object: "Any triangular sign or warning marker"
  confused_with: "SIGN.GATE.TRIANGLE"
  distinguishing_features:
    0: "Gate number text in black at top (e.g., 'GATE 2350')"
    1: "Red downward-pointing arrow at bottom"
    2: "Red border with white background"
    3: "Black mounting system with suspension hardware"
  trap_description: "Generic triangle sign ≠ SIGN.GATE.TRIANGLE without gate number and red arrow"
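Rules of this shape lend themselves to a simple post-filter over the model's raw detections: a candidate only counts as the guarded UIN if its text and features match the distinguishing criteria. The sketch below is hypothetical; the field names (`ocr_text`, `features`) and rule encoding are assumptions, not the production schema.

```python
import re

# Hypothetical post-filter applying false-positive rules like the one above.
# A triangle detection only counts as SIGN.GATE.TRIANGLE if its detected text
# and visual features satisfy the rule's distinguishing features.

RULES = {
    "SIGN.GATE.TRIANGLE": {
        "required_text": re.compile(r"GATE\s*\d+"),
        "required_features": {"red_arrow", "red_border"},
    },
}

def filter_detections(detections):
    kept = []
    for det in detections:
        rule = RULES.get(det["uin"])
        if rule is None:
            kept.append(det)        # no false-positive rule for this UIN
            continue
        if rule["required_text"].search(det.get("ocr_text", "")) and \
           rule["required_features"] <= set(det.get("features", [])):
            kept.append(det)        # passes the distinguishing-feature checks
        # otherwise: dropped as a generic look-alike (false positive)
    return kept

dets = [
    {"uin": "SIGN.GATE.TRIANGLE", "ocr_text": "GATE 2350",
     "features": ["red_arrow", "red_border"]},
    {"uin": "SIGN.GATE.TRIANGLE", "ocr_text": "CAUTION",   # generic warning sign
     "features": ["red_border"]},
]
print(filter_detections(dets))   # only the first detection survives
```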

UIN Detection Evaluation Pipeline
This pipeline handles real-time component verification. We input the images taken by the tester, module-specific detection rules, and the UIN descriptions to Nova Pro using Amazon Bedrock. The outputs are the detected UINs with bounding boxes, along with installation status, defect identification, and confidence scores.

# UIN Detection Configuration
detection_config = {
    'model_selection': 'nova-pro',  # or 'claude-sonnet'
    'module_config': module_id,
    'prompt_engineering': {
        'system_prompt': system_prompt_template,
        'agent_prompt': agent_prompt_template
    },
    'data_sources': {
        's3_images_path': f's3://amzn-s3-demo-bucket/images/{module_id}/',
        'descriptions_table': 'uin-descriptions',
        'ground_truth_path': f's3://amzn-s3-demo-bucket/ground-truth/{module_id}/'
    }
}

The Lambda function processes each module image using the selected configuration:

def detect_uins_in_module(image_data, module_bom, uin_descriptions):
    """
    Detect UINs in module images using Nova Pro
    """
    # Retrieve relevant UIN descriptions for the module
    relevant_descriptions = get_descriptions_for_module(module_bom, uin_descriptions)

    # Construct detection prompt with descriptions
    detection_prompt = f"""
    Analyze this module image to detect the following components:
    {format_uin_descriptions(relevant_descriptions)}
    For each UIN, provide:
    - Detection status (True/False)
    - Bounding box coordinates if detected
    - Confidence score
    - Installation status verification
    - Any visible defects
    """

    # Process with Amazon Bedrock Nova Pro
    response = bedrock_client.invoke_model(
        modelId='amazon.nova-pro-v1:0',
        body=json.dumps({
            'messages': [
                {
                    'role': 'user',
                    'content': [
                        {'type': 'image', 'source': {'type': 'base64', 'data': image_data}},
                        {'type': 'text', 'text': detection_prompt}
                    ]
                }
            ]
        })
    )
    return parse_detection_results(response)
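The `parse_detection_results` helper referenced above is not shown. A hedged sketch of what such a parser might look like follows; the response structure (an `output.message.content` wrapper) and the JSON fields the prompt elicits (`uin`, `detected`, `bbox`, `confidence`) are assumptions, not the actual implementation.

```python
import json

# Hypothetical sketch of parse_detection_results, assuming the prompt elicits a
# JSON array of per-UIN results inside the model's text output.

def parse_detection_results(response_body):
    """response_body: already-decoded dict from the model response."""
    text = response_body["output"]["message"]["content"][0]["text"]
    results = json.loads(text)
    return [
        {
            "uin": r["uin"],
            "detected": bool(r.get("detected", False)),
            "bbox": r.get("bbox"),                       # [x1, y1, x2, y2] or None
            "confidence": float(r.get("confidence", 0.0)),
        }
        for r in results
    ]

# Example with a mocked response body:
sample = {"output": {"message": {"content": [{"text": json.dumps([
    {"uin": "UIN-001", "detected": True, "bbox": [10, 20, 110, 220], "confidence": 0.93},
    {"uin": "UIN-002", "detected": False},
])}]}}}
print(parse_detection_results(sample))
```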

End-to-End Application Pipeline
The application brings everything together and provides testers in the fulfillment center with a production-ready user interface. It also provides comprehensive analysis including precise UIN identification, bounding box coordinates, installation status verification, and defect detection with confidence scoring.
The workflow, which is reflected in the UI, is as follows:

A tester securely uploads the images to Amazon S3 from the frontend—either by taking a photo or uploading it manually. Images are automatically encrypted at rest in S3 using AWS Key Management Service (AWS KMS).
This triggers the verification, which calls the API endpoint for UIN verification. API calls between services use AWS Identity and Access Management (IAM) role-based authentication.
A Lambda function retrieves the images from S3.
Amazon Nova Pro detects required UINs from each image.
The results of the UIN detection are stored in DynamoDB with encryption enabled.
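The last step writes detection results to DynamoDB. Because DynamoDB rejects Python floats, confidence scores must be converted to `Decimal` before writing; the following is a hedged sketch of an item builder, where the key names (`module_id`, `image_key`) are illustrative rather than taken from the actual table schema:

```python
from datetime import datetime, timezone
from decimal import Decimal

def build_detection_item(module_id, image_key, detections):
    """Shape one image's detection results as a DynamoDB item.

    DynamoDB does not accept Python floats, so confidence scores are
    stored as Decimal; module_id/image_key form an illustrative
    partition/sort key pair.
    """
    return {
        "module_id": module_id,
        "image_key": image_key,
        "updated_at": datetime.now(timezone.utc).isoformat(),
        "detections": [
            {
                "uin": d["uin"],
                "detected": d["detected"],
                "confidence": Decimal(str(d.get("confidence", 0))),
                "defects": d.get("defects", []),
            }
            for d in detections
        ],
    }
```

A Lambda function would pass an item like this to a boto3 `Table.put_item` call; encryption at rest is a property of the table itself, not of individual items.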

The following figure shows the UI after an image has been uploaded and processed. The information includes the UIN name, a description, when it was last updated, and so on.

The following image is of a dashboard in the UI that the user can use to review the results and manually override any inputs if necessary.
Results and learnings
After building the prototype, we tested the solution in multiple fulfillment centers using Amazon Kindle tablets. We achieved 92% precision on a representative set of test modules with 2–5 seconds latency per image. Compared to manual operational readiness testing, IORA reduces the total testing time by 60%. Amazon Nova Pro was also able to identify missing labels from the ground truth data, which gave us an opportunity to improve the quality of the dataset.

“The precision results directly translate to time savings – 40% coverage equals 40% time reduction for our field teams. When the solution detects a UIN, our fulfillment center teams can confidently focus only on finding missing components.”
– Wayne Jones, Sr Program Manager, Amazon General Engineering Services

Key learnings:

Amazon Nova Pro excels at visual recognition tasks when provided with rich contextual descriptions, and is more accurate than standalone image comparison.
Ground truth data quality significantly impacts model performance. The solution identified missing labels in the original dataset, helping improve the human-labeled data.
Modules with fewer than 20 UINs performed best, and we saw performance degradation for modules with 40 or more UINs. Hierarchical processing is needed for modules with over 40 components.
The serverless architecture using Lambda and Amazon Bedrock provides cost-effective scalability without infrastructure complexity.

Conclusion
This post demonstrates how to use Amazon Nova and Anthropic Claude Sonnet in Amazon Bedrock to build an automated image recognition solution for operational readiness testing. We showed you how to:

Process and analyze images at scale using Amazon Nova models
Generate and enrich component descriptions to improve detection accuracy
Build a reliable pipeline for real-time component verification
Store and manage results efficiently using managed storage services

This approach can be adapted for similar use cases that require automated visual inspection and verification across various industries including manufacturing, logistics, and quality assurance. Moving forward, we plan to enhance the system’s capabilities, conduct pilot implementations, and explore broader applications across Amazon operations.
For more information about Amazon Nova and other foundation models in Amazon Bedrock, visit the Amazon Bedrock documentation page.

About the Authors
Bishesh Adhikari is a Senior ML Prototyping Architect at AWS with over a decade of experience in software engineering and AI/ML. Specializing in generative AI, LLMs, NLP, CV, and GeoSpatial ML, he collaborates with AWS customers to build solutions for challenging problems through co-development. His expertise accelerates customers’ journey from concept to production, tackling complex use cases across various industries. In his free time, he enjoys hiking, traveling, and spending time with family and friends.
Hin Yee Liu is a Senior GenAI Engagement Manager at AWS. She leads AI prototyping engagements on complex technical challenges, working closely with customers to deliver production-ready solutions leveraging Generative AI, AI/ML, Big Data, and Serverless technologies through agile methodologies. Outside of work, she enjoys pottery, travelling, and trying out new restaurants around London.
Akhil Anand is a Program Manager at Amazon, passionate about using technology and data to solve critical business problems and drive innovation. He focuses on using data as a core foundation and AI as a powerful layer to accelerate business growth. Akhil collaborates closely with tech and business teams at Amazon to translate ideas into scalable solutions, facilitating a strong user-first approach and rapid product development. Outside of work, Akhil enjoys continuous learning, collaborating with friends to build new solutions, and watching Formula 1.
Zakaria Fanna is a Senior AI Prototyping Engineer at Amazon with over 15 years of experience across diverse IT domains, including Networking, DevOps, Automation, and AI/ML. He specializes in rapidly developing Minimum Viable Products (MVPs) for internal users. Zakaria enjoys tackling challenging technical problems and helping customers scale their solutions by leveraging cutting-edge technologies. In his free time, Zakaria enjoys continuous learning, sports, and cherishes time spent with his children and family.
Elad Dwek is a Senior AI Business Developer at Amazon, working within Global Engineering, Maintenance, and Sustainability. He partners with stakeholders on the business and tech sides to identify opportunities where AI can address business challenges or completely transform processes, driving innovation from prototyping to production. With a background in construction and physical engineering, he focuses on change management, technology adoption, and building scalable, transferable solutions that deliver continuous improvement across industries. Outside of work, he enjoys traveling around the world with his family.
Palash Choudhury is a Software Development Engineer at AWS Corporate FP&A with over 10 years of experience across frontend, backend, and DevOps technologies. He specializes in developing scalable solutions for corporate financial allocation challenges and actively leverages AI/ML technologies to automate workflows and solve complex business problems. Passionate about innovation, Palash enjoys experimenting with emerging technologies to transform traditional business processes.

Iberdrola enhances IT operations using Amazon Bedrock AgentCore

Iberdrola, one of the world’s largest utility companies, has embraced cutting-edge AI technology to revolutionize its IT operations in ServiceNow. By using different agentic architectures, Iberdrola has transformed the way thousands of change requests and incident tickets are managed, streamlining processes and enhancing productivity across departments.
Through its partnership with AWS, Iberdrola implemented those agents in a groundbreaking solution using Amazon Bedrock AgentCore, targeting three key areas: optimizing change request validation in the draft phase, enriching incident management with contextual intelligence, and simplifying change model selection using conversational AI. These innovations reduce bottlenecks, help teams accelerate ticket resolution, and deliver consistent and high-quality data handling throughout the organization.
Amazon Bedrock AgentCore helps Iberdrola deploy production-ready AI agents seamlessly. With serverless compute capabilities, robust security, and integrated observability, the platform helps Iberdrola scale solutions across departments while adhering to enterprise-grade reliability and compliance standards.
Challenges with change and incident management
Iberdrola has simplified the multi-phase process of change management using AI-powered validation. A group of orchestrated agents make sure requests align with intended modifications while formatting and verifying mandatory fields in real time. This optimized approach avoids manual resubmissions and drastically reduces processing times, helping teams focus on driving impactful outcomes.
Using a swarm of agents to perform contextual enrichment, Iberdrola’s networking department now processes incidents faster and with greater precision. This enrichment lets technicians access configuration item details, review related historical incidents, and categorize tickets by environment and alert types, enhancing response times and enabling teams to swiftly address critical issues.
Solution overview
Iberdrola establishes its agentic AI practice through a layered architecture that separates operational concerns while enabling seamless integration across IT workflows. ServiceNow serves as the primary input source, and a MicroGateway provides intelligent routing to direct requests to relevant agents. A dedicated data layer maintains enterprise information, processing raw ServiceNow data through extract, transform, and load (ETL) pipelines for agent consumption.

The architecture comprises three layers:

Agentic AI resources – This layer encompasses all agent deployments, Model Context Protocol (MCP) servers for standardized data access, authentication mechanisms, and memory objects that maintain contextual information. The design enables domain-specific agent development while sharing common infrastructure services.
Inference layer – A streamlined abstraction provides large language model (LLM) inference capabilities from the organization’s portfolio of integrated models. This layer provides consistent model access patterns while supporting experimentation without requiring agent modifications.
Data layer – A comprehensive information foundation contains operational data, analytical datasets, and transactional records. This layer enriches agent capabilities by providing access to historical patterns, real-time operational status, and contextual information necessary for intelligent decision-making.

This design enables three distinct use cases that address different operational challenges:

Enhanced change management validation – The first implementation supports the draft phase of Iberdrola’s change management process through a deterministic agentic workflow. Multiple specialized agents work in sequence to validate change model appropriateness and verify that mandatory fields contain correctly formatted information. When validation errors are detected, the system provides clear feedback to requesters before allowing progression to subsequent phases.
Intelligent incident enrichment – The incident management solution demonstrates multi-agent orchestration for Iberdrola’s Networking department. A master agent receives each incident and selectively engages specialized agents for tagging, contextual enrichment, similarity detection, and change impact analysis. This adaptive approach assists technicians by categorizing incidents, identifying related historical cases, and extracting configuration item details.
Conversational change model assistant – The third use case addresses the complexity of selecting appropriate change models through a conversational AI assistant. The agent collects information about technology families, change objectives, and deployment environments to recommend suitable change models. The system provides clickable recommendations that open pre-filled change forms, streamlining the change request process.

The conceptual architecture translates into a production-ready implementation through Amazon Bedrock AgentCore, which provides managed primitives for building and deploying enterprise AI agents. The serverless approach of Amazon Bedrock AgentCore enables Iberdrola to focus on agent logic rather than infrastructure management while providing scalability and operational reliability.

Amazon Bedrock AgentCore components
AgentCore Runtime serves as the foundation for agent deployment, accepting containerized agents built with any framework—in Iberdrola’s case, LangGraph—and deploying them through Amazon Elastic Container Registry (Amazon ECR) repositories. AgentCore Runtime maintains serverless characteristics, scaling based on request volume while providing session isolation. Each agent session can run up to 8 hours for complex workflows. Logs and metrics generated by AgentCore Runtime are automatically captured by AgentCore Observability. In addition, Iberdrola has configured explicit logging to their self-hosted Langfuse instance for centralized monitoring.
AgentCore Memory provides contextual continuity across agent interactions by maintaining memory objects per agent session. Using the memory object, agents can store and retrieve session state, conversation history, and intermediate processing results. This capability is essential for Iberdrola’s multi-step workflows where agents must maintain context across validation phases or incident enrichment processes.
AgentCore Gateway simplifies tool integration by acting as an MCP server that “MCPifies” external tools and services. Rather than requiring custom integration code for each data source, AgentCore Gateway provides standardized interfaces that agents can consume consistently. This approach is particularly valuable for Iberdrola’s ServiceNow endpoint connections.
AgentCore Identity manages both inbound and outbound authentication flows, integrating with Entra ID through OAuth 2.0 protocols. For inbound requests, AgentCore Identity validates bearer tokens and authorizes access to underlying resources. For outbound operations, it handles token acquisition and manages secure communication with downstream tools.
AgentCore Observability captures telemetry data from agents using OpenTelemetry standards and surfaces this information through Amazon CloudWatch. This integration provides comprehensive monitoring of operational metrics without requiring additional instrumentation.
Technical implementation
The implementation uses LiteLLM as a proxy layer for consistent access to Amazon Nova and Anthropic Claude models through Amazon Bedrock and various other models. This abstraction enables agents to interact with different model variants using standardized API calls while Amazon Bedrock Guardrails provides safety controls for model outputs.
The architecture addresses Iberdrola’s enterprise security requirements through a virtual private cloud (VPC) configuration within AgentCore Runtime, so agents can securely access internal resources while maintaining network isolation. VPC endpoints provide secure communication with internal data sources without exposing traffic to the public internet.
Users initiate requests through ServiceNow, which communicates through a REST API to the MicroGateway that routes requests to appropriate use case agents. The data architecture implements a hybrid approach combining real-time operational access with enriched analytical datasets. Raw ServiceNow data flows through ETL processes into Amazon Simple Storage Service (Amazon S3) storage, then into Amazon Relational Database Service (Amazon RDS) databases enhanced with pgvector extensions for semantic search.
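The MicroGateway's core job, mapping each incoming ServiceNow record to the agent that owns it, can be reduced to a routing table. A minimal sketch follows; the record types and agent names are illustrative stand-ins, not Iberdrola's actual identifiers:

```python
# Illustrative routing table: ServiceNow record types to use case agents.
ROUTES = {
    "change_request_draft": "change-validation-agent",
    "incident": "incident-enrichment-agent",
    "change_model_question": "change-model-assistant",
}

def route_request(record_type: str) -> str:
    """Return the agent responsible for a given ServiceNow record type."""
    agent = ROUTES.get(record_type)
    if agent is None:
        raise ValueError(f"No agent registered for record type {record_type!r}")
    return agent
```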
The logs and metrics generated by the agents deployed in AgentCore Runtime can be monitored using AgentCore Observability. In addition, Iberdrola uses self-hosted Langfuse on Amazon Elastic Kubernetes Service (Amazon EKS) for a holistic view of spans and traces generated by the LLMs and the agents.
Use case details
In this section, we discuss the implementation of two use cases mentioned earlier: enhanced change management and intelligent incident management.
Enhanced change management
The first use case demonstrates an agentic workflow that supports the draft phase of Iberdrola’s change management process through sequential agent execution within a single AgentCore Runtime. The workflow processes change requests through four specialized agents—Rule Extractor, Content Validator, AIM Model Analyst, and Phase Transition—with each agent receiving context from the previous step.
The implementation consists of the following key components:

Single runtime context flow – Agents operate within one AgentCore Runtime instance, maintaining seamless context and session state across the entire validation pipeline
LangGraph orchestration – Agents are defined as a graph structure, enabling visual workflow representation, conditional branching based on validation results, and comprehensive audit trails
Vector-enhanced validation – Pgvector-enabled PostgreSQL supports semantic similarity searches, enabling the AIM Model Analyst agent to match change models based on technical descriptions rather than keyword matching
Consistent processing – Change requests follow identical validation steps, meeting compliance requirements and quality standards
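The semantic matching that vector-enhanced validation relies on can be illustrated with plain cosine similarity, which is the same computation pgvector performs inside PostgreSQL. In this sketch, the change model names and vectors are stand-ins for real embedding output:

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two equal-length embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def best_change_model(request_vec, model_vecs):
    """Return the change model whose embedding is closest to the request.

    model_vecs maps change model names to their (illustrative) embeddings.
    """
    return max(model_vecs, key=lambda name: cosine_similarity(request_vec, model_vecs[name]))
```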

Intelligent incident management
The second use case demonstrates intelligent multi-agent orchestration for incident management, where a Smart Solver Agent analyzes incoming incidents and selectively engages specialized agents based on contextual needs. This implementation adapts processing steps to each incident’s unique characteristics, optimizing resource utilization while providing comprehensive enrichment when needed.
The implementation consists of the following key components:

Intelligent orchestration – The Smart Solver Agent analyzes incident content and determines which specialized agents to invoke based on missing context and potential value-add
Specialized agent engagement – Five specialized agents (Tag Classifier, Incident Similarity, Incident Associator, Change Associator, Context Retriever) are available to provide enrichment based on the detail and complexity of the incident
Adaptive processing – The system adjusts enrichment activities based on incident complexity—simple incidents might only require tagging, whereas complex issues receive full contextual analysis
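The adaptive pattern above, engaging specialized agents only when they add value, can be sketched with simple rules. The real Smart Solver Agent makes this decision with an LLM; the field names and thresholds below are purely illustrative:

```python
def select_agents(incident: dict) -> list[str]:
    """Pick which specialized agents to engage for one incident.

    Rule-based stand-in for the LLM-driven Smart Solver decision:
    simple incidents get tagging only; richer context triggers more agents.
    """
    agents = []
    if not incident.get("tags"):
        agents.append("tag-classifier")
    # High-priority incidents (1 = highest) get full contextual analysis
    if incident.get("priority", 5) <= 2:
        agents += ["incident-similarity", "context-retriever"]
    if incident.get("related_change"):
        agents.append("change-associator")
    return agents
```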

Lessons learned
The implementation of AI agents at Iberdrola demonstrates how the managed primitives of Amazon Bedrock AgentCore significantly accelerate enterprise AI deployment. Amazon Bedrock AgentCore minimized the infrastructure complexity typically required for agentic AI, helping teams focus on agent logic while achieving scalable and secure cloud resources.

“At Iberdrola, we’re extending our production AI platform with a new agentic capability powered by Amazon Bedrock AgentCore,” says Iñigo Gutierrez, AI Global Expert Engineer at Iberdrola. “By using a managed serverless runtime with built-in identity, memory, and observability, we can ship LangGraph-based agents that plan, call tools through MCP-style gateways, and operate securely inside our VPC. This feature moves us from point automations to reusable, production-grade agents—reducing engineering cognitive load and accelerating safe delivery across IT operations.”
Key success factors
The solution offers the following key benefits:

Purpose-built runtime – AgentCore Runtime provides a fully managed, quick-start environment to host AI agents with complete session isolation. Additionally, out-of-the-box streaming, MCP, and A2A support in AgentCore Runtime alleviates the need to develop custom solutions for these protocols.
Managed infrastructure – The serverless compute runtimes, identity, and memory services of Amazon Bedrock AgentCore minimize custom development overhead for enterprise-grade capabilities.
Enterprise security – VPC support and comprehensive tagging align with stringent IT requirements, accelerating development without compromising security standards.
Open and framework-agnostic – Amazon Bedrock AgentCore fits well with development guidelines because you can choose the development framework, such as LangGraph, by adding a simple decorator. Furthermore, it has no restrictions on using third-party or open-source solutions like Langfuse.
Scalable tool discovery – AgentCore Gateway automatically indexes tools and provides serverless semantic search, scaling from tens to hundreds of targets in a fully managed way.

Future roadmap
Iberdrola is considering the following future enhancements to the solution:

Agent catalog – Improve governance and discovery of agents seamlessly integrated into the Amazon Bedrock AgentCore ecosystem
New supported protocols and standards – Evolve Iberdrola’s agent development to use new protocols supported (such as A2A) by AgentCore Runtime and other managed services
Managed orchestration and real-time flow monitoring – Build platform-provided dashboards that automatically manage and monitor complex interactions between multiple AI agents, tools, or workflows

Conclusion
Iberdrola’s innovative implementation showcases its leadership and vision in using advanced AI technologies to transform its operational workflows. By adopting Amazon Bedrock AgentCore, Iberdrola has demonstrated how organizations can deploy production-ready AI agents with remarkable efficiency while meeting robust enterprise security and scalability standards. Through its strategic use of Amazon Bedrock AgentCore managed primitives, Iberdrola has realized substantial productivity gains and unparalleled improvements in data quality across its change and incident management processes. This successful transformation underscores Iberdrola’s commitment to excellence in using intelligent solutions to solve complex operational challenges. It also highlights the unique value proposition of Amazon Bedrock AgentCore: industry-first serverless compute for AI agents, integrated enterprise-grade security, and adaptable deployment patterns that accommodate diverse processing requirements. The platform’s ability to streamline infrastructure complexity while supporting specialized workflows makes it an ideal foundation for enterprise AI initiatives.
Organizations looking to implement AI agents in production environments can draw inspiration from Iberdrola’s architectural patterns and its effective execution of AI-driven solutions. Iberdrola’s success serves as a blueprint for accelerating deployments and achieving operational excellence with an Amazon Bedrock AgentCore managed approach, which reduces time-to-value and supports the scale and reliability demanded by enterprise AI systems.

About the authors
Talha Chattha is a Sr. Agentic AI Specialist SA at AWS, based in Stockholm. With 10+ years of experience working with AI, Talha now helps establish practices to ease the path to production for Agentic AI workloads. Talha is an expert in AgentCore and supports customers across entire EMEA. He holds passion about meta-agents, async patterns, advanced hierarchical solutions and optimized context engineering for agents. When not shaping the future of AI, he explores the scenic European landscapes and delicious cuisines. Connect with Talha at LinkedIn.
Unai Bermejo is a Global Expert AI Engineer at Iberdrola. With 10 years of experience in applied AI, AI research, and software engineering, Unai now helps Iberdrola establish best practices and frameworks in AI and agentic initiatives, aligned with corporate platforms and business needs. He acts as a technical bridge between AI technology, Cloud engineering teams, and business developers, driving the adoption of scalable, responsible, and high‑impact AI solutions across the organization.
Xabier Muruaga is the Global Head of AI and Data at Iberdrola. With over 15 years of experience in AI/ML and data‑driven architectures, he leads the company’s strategy and governance for secure, cloud‑native, and production‑ready AI platforms. His background across architecture, digital transformation, and energy technologies enables him to drive responsible, high‑impact AI and agentic initiatives across the organization.
Iñigo Gutierrez is a Global Cloud AI Engineer at Iberdrola with five years of experience in Cloud architecture, platform engineering, and AI enablement. Based in Bilbao, he is responsible for the design, evolution, and governance of the company’s corporate Cloud platforms, ensuring they provide a secure and scalable foundation for AI and digital transformation initiatives. Iñigo acts as a technical enabler between Cloud engineering teams, AI projects, and business units, promoting standardized practices, operational excellence, and the adoption of responsible, high‑impact AI solutions across the organization.

Building real-time voice assistants with Amazon Nova Sonic compared to …

Voice AI agents are reshaping how we interact with technology. From customer service and healthcare assistance to home automation and personal productivity, these intelligent virtual assistants are rapidly gaining popularity across industries. Their natural language capabilities, constant availability, and increasing sophistication make them valuable tools for businesses seeking efficiency and individuals desiring seamless digital experiences.
Amazon Nova Sonic delivers real-time, human-like voice conversations through the bidirectional streaming interface. It understands different speaking styles and generates expressive responses that adapt to both the words spoken and the way they are spoken. The model supports multiple languages and offers both masculine and feminine voices, making it ideal for customer support, marketing calls, voice assistants, and educational applications.
Unlike newer architectures such as Amazon Nova Sonic, which combines speech understanding and generation into a single end-to-end model, classic AI voice chat systems use cascading architectures with sequential processing. These systems break voice AI processing into separate components, each handling one stage of the pipeline:

Voice activity detection (VAD): A pre-processing VAD is required to detect when the user pauses or stops speaking.
Speech-to-text (STT): The user’s spoken words are converted into a written text format by an automatic speech recognition (ASR) model.
Large language model (LLM) processing: The transcribed text is then fed to a LLM or dialogue manager, which analyzes the input and generates a relevant textual response based on the conversation’s context.
Text-to-speech (TTS): The AI’s text-based reply is then converted back into natural-sounding spoken audio by a TTS model, which is then played to the user.

The following diagram illustrates the conceptual flow of how users interact with Nova Sonic for real-time voice conversations compared to a cascading voice assistant solution.

The core challenges of cascading architecture
While a cascading architecture offers benefits such as modular design, specialized components, and debuggability, cumulative latency and reduced interactivity are its drawbacks.
The cascade effect
Consider a voice assistant handling a simple weather query. In cascading pipelines, each processing step introduces latency and potential errors. Customer implementations showed how initial misinterpretations can compound through the pipeline, often resulting in irrelevant responses. This cascading effect complicated troubleshooting and negatively impacted overall user experience.
Time is everything
Real conversations require natural timing. Sequential processing can create noticeable delays in response times. These interruptions in conversational flow can lead to user friction.
The integration challenge
Voice AI demands more than just speech processing—it requires natural interaction patterns. Customer feedback highlighted how orchestrating multiple components made it difficult to handle dynamic conversation elements like interruptions or rapid exchanges. Engineering resources often focused more on pipeline management.
Resource reality
Cascading architectures require independent computing resources, monitoring, and maintenance for each component. This architectural complexity impacts both development velocity and operational efficiency. Scaling challenges intensify as conversation volumes increase, affecting system reliability and cost optimization.
Impact on voice assistant development
These insights drove key architectural decisions in Nova Sonic development, addressing the fundamental need for unified speech-to-speech processing that enables natural, responsive voice experiences without the complexity of multi-component management.
Comparing the two approaches
To compare the speech-to-speech and cascaded approaches to building voice AI agents, consider the following dimensions:

Latency
Speech-to-speech (Nova Sonic): Optimized latency performance. We evaluate the latency of the Nova Sonic model using the Time to First Audio (TTFA) metric, measured at 1.09 seconds: the elapsed time from the completion of a user's spoken query until the first byte of response audio is received. See the technical report and model card.
Cascaded models: Potential added latency and error propagation. Cascaded pipelines chain multiple models across speech recognition, language understanding, and voice generation, so per-stage latency accumulates and errors can propagate between stages. Modern asynchronous orchestration frameworks like Pipecat and LiveKit can minimize latency, and streaming components and text-to-speech fillers help maintain natural conversational flow and reduce delays.

Architecture and development complexity
Speech-to-speech (Nova Sonic): Simplified architecture. Nova Sonic combines speech-to-text, natural language understanding, and text-to-speech in one model with built-in tool use and barge-in detection, providing an event-driven architecture for key input and output events and a bidirectional streaming API for a simplified developer experience.
Cascaded models: Potentially complex architecture. Developers need to select best-in-class models for each stage of the pipeline while orchestrating additional components such as asynchronous pipelines for delegated agents and tool use, text-to-speech fillers, and voice activity detection (VAD).

Model selection and customization
Speech-to-speech (Nova Sonic): Less control over individual components. Amazon Nova Sonic allows customization of voices, built-in tool use, and integrations with Amazon Bedrock Knowledge Bases and Amazon Bedrock AgentCore. However, it offers less granular control over individual model components than fully modular cascaded systems.
Cascaded models: Granular control over each step. Cascaded models allow individual tuning, replacement, and optimization of each model component, such as STT, language understanding, and TTS, independently. This includes models from Amazon Bedrock Marketplace, Amazon SageMaker AI, and fine-tuned models. This modularity enables flexible model selection, making it ideal for complex or specialized capabilities that require tailored performance.

Cost structure
Speech-to-speech (Nova Sonic): Simplified cost structure through an integrated approach. Amazon Nova Sonic is priced on a token-based consumption model.
Cascaded models: Potentially complex costs across multiple components. Cascaded models consist of multiple components whose costs need to be estimated individually, which is especially important at scale and high volumes.

Language and accent support
Speech-to-speech (Nova Sonic): Languages supported by Nova Sonic.
Cascaded models: Potentially broader language support through specialized models, including the ability to switch languages mid-conversation.

Region availability
Speech-to-speech (Nova Sonic): Regions supported by Nova Sonic.
Cascaded models: Potentially broader Region support because of the broad selection of models and the ability to self-host models on Amazon Elastic Kubernetes Service (Amazon EKS) or Amazon SageMaker.

The two approaches also have some shared traits.

Telephony and transport options
Both cascaded and speech-to-speech approaches support a variety of telephony and transport protocols such as WebRTC and WebSocket, enabling real-time, low-latency audio streaming over the web and phone networks. These protocols facilitate seamless, bidirectional audio exchange crucial for natural conversational experiences, allowing voice AI systems to integrate easily with existing communication infrastructures while maintaining responsiveness and audio quality.

Evaluations, observability, and testing
Both cascaded and speech-to-speech voice AI approaches can be systematically evaluated, observed, and tested for reliable comparison. Investing in a voice AI evaluation and observability system is recommended to gain confidence in production accuracy and performance. Such a system should be capable of tracing the entire input-to-output pipeline, capturing metrics and conversation data end-to-end to comprehensively assess quality, latency, and conversational robustness over time.

Developer frameworks
Both cascaded and speech-to-speech approaches are well supported by leading open-source voice AI frameworks like Pipecat and LiveKit. These frameworks provide modular, flexible pipelines and real-time processing capabilities that developers can use to build, customize, and orchestrate voice AI models efficiently across different components and interaction styles.

When to use each approach
The following diagram shows a practical framework to guide your architecture decision:

Use speech-to-speech when:

Simplicity of implementation is important
The use case fits within Nova Sonic’s capabilities
You’re looking for a real-time chat experience that feels human-like and delivers low latency

Use cascaded models when:

Customization of individual components is required
You need to use specialized models from the Amazon Bedrock Marketplace, Amazon SageMaker AI, or fine-tuned models for your specific domain
You need support for languages or accents not covered by Nova Sonic
The use case requires specialized processing at specific stages

Conclusion
In this post, you learned how Amazon Nova Sonic is designed to solve some of the challenges faced by cascaded approaches, simplify building voice AI agents, and provide natural conversational capabilities. We also provided guidance on when to choose each approach to help you make informed decisions for your voice AI projects. If you're looking to enhance your cascaded voice system, you now have the basics of how to migrate to Nova Sonic so you can offer seamless, real-time conversational experiences with a simplified architecture.
To learn more, see Amazon Nova Sonic and contact your account team to explore how you can accelerate your voice AI initiatives.
Resources

Amazon Nova Sonic Technical Report and Model Card
Amazon Nova Sonic User Guide
Amazon Nova Sonic and Amazon Bedrock AgentCore
Amazon Nova Sonic Telephony Integration Guide
Amazon Nova Sonic and Pipecat
Amazon Nova Sonic and LiveKit

About the authors
Daniel Wirjo is a Solutions Architect at AWS, focused on AI and SaaS startups. As a former startup CTO, he enjoys collaborating with founders and engineering leaders to drive growth and innovation on AWS. Outside of work, Daniel enjoys taking walks with a coffee in hand, appreciating nature, and learning new ideas.
Ravi Thakur is a Sr Solutions Architect at AWS based in Charlotte, NC. He has cross‑industry experience across retail, financial services, healthcare, and energy & utilities, and specializes in solving complex business challenges using well‑architected cloud patterns. His expertise spans microservices, cloud‑native architectures, and generative AI. Outside of work, Ravi enjoys motorcycle rides and family getaways.
Lana Zhang is a Senior Specialist Solutions Architect for Generative AI at AWS within the Worldwide Specialist Organization. She specializes in AI/ML, with a focus on use cases such as AI voice assistants and multimodal understanding. She works closely with customers across diverse industries, including media and entertainment, gaming, sports, advertising, financial services, and healthcare, to help them transform their business solutions through AI.

Microsoft AI Proposes OrbitalBrain: Enabling Distributed Machine Learn …

Earth observation (EO) constellations capture huge volumes of high-resolution imagery every day, but most of it never reaches the ground in time for model training. Downlink bandwidth is the main bottleneck. Images can sit on orbit for days while ground models train on partial and delayed data.

Microsoft Researchers introduced ‘OrbitalBrain’ framework as a different approach. Instead of using satellites only as sensors that relay data to Earth, it turns a nanosatellite constellation into a distributed training system. Models are trained, aggregated, and updated directly in space, using onboard compute, inter-satellite links, and predictive scheduling of power and bandwidth.

OrbitalBrain: A Distributed Framework For Training ML Models in Space

The BentPipe Bottleneck

Most commercial constellations use the BentPipe model. Satellites collect images, store them locally, and dump them to ground stations whenever they pass overhead.

The research team evaluates a Planet-like constellation with 207 satellites and 12 ground stations. At maximum imaging rate, the system captures 363,563 images per day. With 300 MB per image and realistic downlink constraints, only 42,384 images can be transmitted in that period, around 11.7% of what was captured. Even if images are compressed to 100 MB, only 111,737 images, about 30.7%, reach the ground within 24 hours.
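As a quick sanity check, the reported fractions follow directly from the image counts above:

```python
# Reproduce the downlink-bottleneck arithmetic for the Planet-like
# constellation; all counts come from the evaluation described above.
def fraction_downlinked(captured: int, transmitted: int) -> float:
    """Share of one day's captured images that actually reach the ground."""
    return transmitted / captured

captured = 363_563  # images captured per day at maximum imaging rate
print(f"300 MB images: {fraction_downlinked(captured, 42_384):.1%}")   # ~11.7%
print(f"100 MB images: {fraction_downlinked(captured, 111_737):.1%}")  # ~30.7%
```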

Limited onboard storage adds another constraint. Old images must be deleted to make room for new ones, which means many potentially useful samples are never available for ground-based training.

Why Conventional Federated Learning is not Enough

Federated learning (FL) seems like an obvious fit for satellites. Each satellite could train locally and send model updates to a ground server for aggregation. The research team evaluates several FL baselines adapted to this setting:

AsyncFL

SyncFL

FedBuff

FedSpace

However, these methods assume more stable communication and more flexible power than satellites can provide. When the research team simulates realistic orbital dynamics, intermittent ground contact, limited power, and non-i.i.d. data across satellites, these baselines show unstable convergence and large accuracy drops, in the range of 10%–40% compared to idealized conditions.

The time-to-accuracy curves flatten and oscillate, especially when satellites are isolated from ground stations for long periods. Many local updates become stale before they can be aggregated.

OrbitalBrain: Constellation-Centric Training in Space

OrbitalBrain starts from 3 observations:

Constellations are usually operated by a single commercial entity, so raw data can be shared across satellites.

Orbits, ground station visibility, and solar power are predictable from orbital elements and power models.

Inter-satellite links (ISLs) and onboard accelerators are now practical on nano-satellites.

The framework exposes 3 actions for each satellite in a scheduling window:

Local Compute (LC): train the local model on stored images.

Model Aggregation (MA): exchange and aggregate model parameters over ISLs.

Data Transfer (DT): exchange raw images between satellites to reduce data skew.

A controller running in the cloud, reachable via ground stations, computes a predictive schedule for each satellite. The schedule decides which action to prioritize in each future window, based on forecasts of energy, storage, orbital visibility, and link opportunities.
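A highly simplified sketch of that per-window decision follows. The utility values, thresholds, and field names here are invented for illustration; the paper's actual utility functions and forecasting models are far richer.

```python
# Toy greedy scheduler: pick the action (LC, MA, DT) with the highest
# estimated utility for one window, given forecasts for that satellite.
# All weights and thresholds below are hypothetical.
from dataclasses import dataclass

@dataclass
class WindowForecast:
    energy_wh: float        # predicted battery budget for this window
    isl_available: bool     # inter-satellite link reachable?
    stored_images: int      # images on local storage
    model_staleness: float  # windows since the model was last aggregated

def choose_action(f: WindowForecast) -> str:
    utilities = {
        # Local training is useful when there is data and enough energy.
        "LC": min(f.stored_images, 100) / 100 * (f.energy_wh >= 5.0),
        # Aggregation grows more valuable as the model goes stale,
        # but it requires an ISL contact.
        "MA": f.model_staleness * 0.2 * f.isl_available,
        # Raw-data transfer needs both an ISL and spare energy.
        "DT": 0.3 * f.isl_available * (f.energy_wh >= 8.0),
    }
    return max(utilities, key=utilities.get)

print(choose_action(WindowForecast(10.0, False, 500, 4)))  # no ISL -> "LC"
print(choose_action(WindowForecast(10.0, True, 10, 6)))    # stale + ISL -> "MA"
```

In the real system this choice is computed ahead of time in the cloud and uplinked as a schedule during ground-station passes.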

Core Components: Profiler, MA, DT, Executor

Guided performance profiler

Model aggregation over ISLs

Data transferrer for label rebalancing

Executor

Experimental setup

OrbitalBrain is implemented in Python on top of the CosmicBeats orbital simulator and the FLUTE federated learning framework. Onboard compute is modeled as an NVIDIA-Jetson-Orin-Nano-4GB GPU, with power and communication parameters calibrated from public satellite and radio specifications.

The research team simulates 24-hour traces for 2 real constellations:

Planet: 207 satellites with 12 ground stations.

Spire: 117 satellites.

They evaluate 2 EO classification tasks:

fMoW: around 360k RGB images, 62 classes, DenseNet-161 with the last 5 layers trainable.

So2Sat: around 400k multispectral images, 17 classes, ResNet-50 with the last 5 layers trainable.

Results: faster time-to-accuracy and higher accuracy

OrbitalBrain is compared with BentPipe, AsyncFL, SyncFL, FedBuff, and FedSpace under full physical constraints.

For fMoW, after 24 hours:

Planet: OrbitalBrain reaches 52.8% top-1 accuracy.

Spire: OrbitalBrain reaches 59.2% top-1 accuracy.

For So2Sat:

Planet: 47.9% top-1 accuracy.

Spire: 47.1% top-1 accuracy.

These results improve over the best baseline by 5.5%–49.5%, depending on dataset and constellation.

In terms of time-to-accuracy, OrbitalBrain achieves a 1.52×–12.4× speedup compared to state-of-the-art ground-based or federated learning approaches. The gains come from letting satellites that cannot currently reach a ground station keep contributing by aggregating over ISLs, and from rebalancing data distributions via DT.

Ablation studies show that disabling MA or DT significantly degrades both convergence speed and final accuracy. Additional experiments indicate that OrbitalBrain remains robust when cloud cover hides part of the imagery, when only a subset of satellites participate, and when image sizes and resolutions vary.

Implications for satellite AI workloads

OrbitalBrain demonstrates that model training can move into space and that satellite constellations can act as distributed ML systems, not just data sources. By coordinating local training, model aggregation, and data transfer under strict bandwidth, power, and storage constraints, the framework enables fresher models for tasks like forest fire detection, flood monitoring, and climate analytics, without waiting days for data to reach terrestrial data centers.

Key Takeaways

BentPipe downlink is the core bottleneck: Planet-like EO constellations can only downlink about 11.7% of captured 300 MB images per day, and about 30.7% even with 100 MB compression, which severely limits ground-based model training.

Standard federated learning fails under real satellite constraints: AsyncFL, SyncFL, FedBuff, and FedSpace degrade by 10%–40% in accuracy when realistic orbital dynamics, intermittent links, power limits, and non-i.i.d. data are applied, leading to unstable convergence.

OrbitalBrain co-schedules compute, aggregation, and data transfer in orbit: A cloud controller uses forecasts of orbit, power, storage, and link opportunities to select Local Compute, Model Aggregation via ISLs, or Data Transfer per satellite, maximizing a utility function per action.

Label rebalancing and model staleness are handled explicitly: A guided profiler tracks model staleness and loss to define compute utility, while the data transferrer uses Jensen–Shannon divergence on label histograms to drive raw-image exchanges that reduce non-i.i.d. effects.

OrbitalBrain delivers higher accuracy and up to 12.4× faster time-to-accuracy: In simulations on Planet and Spire constellations with fMoW and So2Sat, OrbitalBrain improves final accuracy by 5.5%–49.5% over BentPipe and FL baselines and achieves 1.52×–12.4× speedups in time-to-accuracy.
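The Jensen–Shannon signal used for label rebalancing can be sketched in a few lines; the helper names are illustrative, not taken from the OrbitalBrain code.

```python
# Jensen-Shannon divergence (base 2) between two satellites' label-count
# histograms: 0 means identical class mixes, values near 1 mean strong skew.
import math

def _kl(p, q):
    # Kullback-Leibler divergence over matching bins; pi > 0 guarantees qi > 0
    # here because q is always a mixture that includes p.
    return sum(pi * math.log2(pi / qi) for pi, qi in zip(p, q) if pi > 0)

def js_divergence(hist_a, hist_b):
    pa = [c / sum(hist_a) for c in hist_a]
    pb = [c / sum(hist_b) for c in hist_b]
    m = [(x + y) / 2 for x, y in zip(pa, pb)]  # mixture distribution
    return 0.5 * _kl(pa, m) + 0.5 * _kl(pb, m)

# Satellite A sees mostly class 0, satellite B mostly class 2:
skewed = js_divergence([90, 5, 5], [5, 5, 90])
balanced = js_divergence([34, 33, 33], [33, 34, 33])
print(f"skewed pair: {skewed:.3f}, balanced pair: {balanced:.4f}")
```

A pair of satellites with high divergence is a natural candidate for a raw-image exchange (DT) that reduces non-i.i.d. effects.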

Check out the Paper. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
The post Microsoft AI Proposes OrbitalBrain: Enabling Distributed Machine Learning in Space with Inter-Satellite Links and Constellation-Aware Resource Optimization Strategies appeared first on MarkTechPost.

Meet OAT: The New Action Tokenizer Bringing LLM-Style Scaling and Flex …

Robots are entering their GPT-3 era. For years, researchers have tried to train robots using the same autoregressive (AR) models that power large language models (LLMs). If a model can predict the next word in a sentence, it should be able to predict the next move for a robotic arm. However, a technical wall has blocked this progress: continuous robot movements are difficult to turn into discrete tokens.

A team of researchers from Harvard University and Stanford University have released a new framework called Ordered Action Tokenization (OAT) to bridge this gap.

https://arxiv.org/pdf/2602.04215

The Messy Reality of Robot Actions

Tokenization turns complex data into a sequence of discrete numbers (tokens). For robots, these actions are continuous signals like joint angles. Previous strategies had fatal flaws:

Binning: Discretizes every action dimension into a ‘bin.’ While simple, it creates massive token sequences that make training and inference slow.

FAST (Frequency-space Action Sequence Tokenization): Uses math to compress movements into frequency coefficients. It is fast but often produces ‘undecodable’ sequences where small errors cause the robot to halt or move unpredictably.

Learned Latent Tokenizers: These use a learned ‘dictionary’ of movements. They are safe but lack a specific order, meaning the model treats early and late tokens as equally important.

https://arxiv.org/pdf/2602.04215

The Three Golden Rules of OAT

The research team identified 3 essential properties—desiderata—for a functional robot tokenizer:

High Compression (P.1): Token sequences must be short to keep models efficient.

Total Decodability (P.2): The decoder must be a total function, ensuring every possible token sequence maps to a valid movement.

Causal Ordering (P.3): Tokens must have a left-to-right structure where early tokens capture global motion and later tokens refine details.

The Secret Sauce: Nested Dropout and Registers

OAT uses a transformer encoder with register tokens to summarize action chunks. To force the model to learn ‘important’ things first, the research team used an approach called Nested Dropout.
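A minimal numpy sketch of the nested dropout idea, assuming the register tokens for one action chunk are stored as a (K, d) array (shapes and the surrounding training loop are illustrative): during training a random prefix length is sampled and every later token is zeroed, so earlier tokens are forced to carry the coarsest, most useful information.

```python
import numpy as np

def nested_dropout(registers: np.ndarray, rng: np.random.Generator) -> np.ndarray:
    """registers: (K, d) register-token embeddings for one action chunk.
    Keeps a random prefix of tokens and zeroes out the rest."""
    K = registers.shape[0]
    k = rng.integers(1, K + 1)   # sample prefix length k in [1, K]
    out = registers.copy()
    out[k:] = 0.0                # drop (zero) every token after the prefix
    return out

rng = np.random.default_rng(0)
regs = np.ones((8, 4))           # 8 register tokens, 4 dims each
masked = nested_dropout(regs, rng)
# Only a contiguous prefix of tokens survives; the suffix is exactly zero.
```

Because any suffix can be dropped at train time, the decoder learns to produce a valid (if coarse) reconstruction from any prefix, which is what later enables anytime inference.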

https://arxiv.org/pdf/2602.04215

Breaking the Benchmarks

The research team tested OAT across 20+ tasks in 4 major simulation benchmarks. OAT consistently outperformed the industry-standard Diffusion Policy (DP) and previous tokenizers.

Performance Results

Benchmark    OAT Success Rate    DP Success Rate    Bin Token Count    OAT Token Count
LIBERO       56.3%               36.6%              224                8
RoboMimic    73.1%               67.1%              224                8
MetaWorld    24.4%               19.3%              128                8
RoboCasa     54.6%               54.0%              384                8

‘Anytime’ Inference: Speed vs. Precision

The most practical benefit of OAT is prefix-based detokenization. Since the tokens are ordered by importance, you can stop the model early.

Coarse Actions: Decoding just 1 or 2 tokens gives the robot a general direction quickly, which is useful for low-latency tasks.

Fine Actions: Generating all 8 tokens provides the high-precision details needed for complex insertions.

This allows for a smooth trade-off between computation cost and action fidelity that previous fixed-length tokenizers could not offer.
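Prefix-based detokenization can be illustrated with a toy linear decoder (entirely hypothetical; OAT's real detokenizer is a learned network): because earlier positions carry coarse, high-magnitude structure and later positions carry decaying detail, decoding only the first k tokens already yields a usable approximate action.

```python
import numpy as np

def decode_prefix(tokens: np.ndarray, basis: np.ndarray, k: int) -> np.ndarray:
    """Reconstruct an action from the first k tokens.
    tokens: (K,) coefficients; basis: (K, action_dim) coarse-to-fine directions."""
    return tokens[:k] @ basis[:k]

rng = np.random.default_rng(1)
K, dim = 8, 7
# Later basis rows are scaled down, mimicking coarse-to-fine ordering.
basis = rng.normal(size=(K, dim)) * (0.5 ** np.arange(K))[:, None]
tokens = rng.normal(size=K)

full = decode_prefix(tokens, basis, K)     # high-precision 8-token decode
coarse = decode_prefix(tokens, basis, 2)   # cheap 2-token approximation
print("approximation error:", np.linalg.norm(full - coarse))
```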

Key Takeaways

Solving the Tokenization Gap: OAT addresses a fundamental limitation in applying autoregressive models to robotics by introducing a learned tokenizer that simultaneously achieves high compression, total decodability, and causal ordering.

Ordered Representation via Nested Dropout: By utilizing nested dropout during training, OAT forces the model to prioritize global, coarse motion patterns in early tokens while reserving later tokens for fine-grained refinements.

Total Decodability and Reliability: Unlike prior frequency-domain methods like FAST, OAT ensures the detokenizer is a total function, meaning every possible token sequence generates a valid action chunk, preventing runtime execution failures.

Flexible ‘Anytime’ Inference: The ordered structure enables prefix-based decoding, allowing robots to execute coarse actions from just one or two tokens to save computation or full eight-token sequences for high-precision tasks.

Superior Performance Across Benchmarks: Autoregressive policies equipped with OAT consistently outperform diffusion-based baselines and other tokenization schemes, achieving a 52.3% aggregate success rate and superior results in real-world ‘Pick & Place’ and ‘Stack Cups’ tasks.

Check out the Paper, Repo and Project Page.
The post Meet OAT: The New Action Tokenizer Bringing LLM-Style Scaling and Flexible, Anytime Inference to the Robotics World appeared first on MarkTechPost.

A Coding Implementation to Establish Rigorous Prompt Versioning and Re …

In this tutorial, we show how we treat prompts as first-class, versioned artifacts and apply rigorous regression testing to large language model behavior using MLflow. We design an evaluation pipeline that logs prompt versions, prompt diffs, model outputs, and multiple quality metrics in a fully reproducible manner. By combining classical text metrics with semantic similarity and automated regression flags, we demonstrate how we can systematically detect performance drift caused by seemingly small prompt changes. Throughout the tutorial, we focus on building a workflow that mirrors real software engineering practices, applied to prompt engineering and LLM evaluation. Check out the FULL CODES here.

!pip -q install -U "openai>=1.0.0" mlflow rouge-score nltk sentence-transformers scikit-learn pandas

import os, json, time, difflib, re
from typing import List, Dict, Any, Tuple

import mlflow
import pandas as pd
import numpy as np

from openai import OpenAI
from rouge_score import rouge_scorer
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)

if not os.getenv("OPENAI_API_KEY"):
    try:
        from google.colab import userdata  # type: ignore
        k = userdata.get("OPENAI_API_KEY")
        if k:
            os.environ["OPENAI_API_KEY"] = k
    except Exception:
        pass

if not os.getenv("OPENAI_API_KEY"):
    import getpass
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OPENAI_API_KEY (input hidden): ").strip()

assert os.getenv("OPENAI_API_KEY"), "OPENAI_API_KEY is required."

We set up the execution environment by installing all required dependencies and importing the core libraries used throughout the tutorial. We securely load the OpenAI API key at runtime, ensuring credentials are never hard-coded in the notebook. We also initialize essential NLP resources to ensure the evaluation pipeline runs reliably across different environments.

MODEL = "gpt-4o-mini"
TEMPERATURE = 0.2
MAX_OUTPUT_TOKENS = 250

ABS_SEM_SIM_MIN = 0.78
DELTA_SEM_SIM_MAX_DROP = 0.05
DELTA_ROUGE_L_MAX_DROP = 0.08
DELTA_BLEU_MAX_DROP = 0.10

mlflow.set_tracking_uri("file:/content/mlruns")
mlflow.set_experiment("prompt_versioning_llm_regression")

client = OpenAI()
embedder = SentenceTransformer("all-MiniLM-L6-v2")

EVAL_SET = [
    {
        "id": "q1",
        "input": "Summarize in one sentence: MLflow tracks experiments, runs, parameters, metrics, and artifacts.",
        "reference": "MLflow helps track machine learning experiments by logging runs with parameters, metrics, and artifacts."
    },
    {
        "id": "q2",
        "input": "Rewrite professionally: 'this model is kinda slow but it works ok.'",
        "reference": "The model is somewhat slow, but it performs reliably."
    },
    {
        "id": "q3",
        "input": "Extract key fields as JSON: 'Order 5531 by Alice costs $42.50 and ships to Toronto.'",
        "reference": '{"order_id":"5531","customer":"Alice","amount_usd":42.50,"city":"Toronto"}'
    },
    {
        "id": "q4",
        "input": "Answer briefly: What is prompt regression testing?",
        "reference": "Prompt regression testing checks whether prompt changes degrade model outputs compared to a baseline."
    },
]

PROMPTS = [
    {
        "version": "v1_baseline",
        "prompt": (
            "You are a precise assistant.\n"
            "Follow the user request carefully.\n"
            "If asked for JSON, output valid JSON only.\n"
            "User: {user_input}"
        )
    },
    {
        "version": "v2_formatting",
        "prompt": (
            "You are a helpful, structured assistant.\n"
            "Respond clearly and concisely.\n"
            "Prefer clean formatting.\n"
            "User request: {user_input}"
        )
    },
    {
        "version": "v3_guardrailed",
        "prompt": (
            "You are a rigorous assistant.\n"
            "Rules:\n"
            "1) If user asks for JSON, output ONLY valid minified JSON.\n"
            "2) Otherwise, keep the answer short and factual.\n"
            "User: {user_input}"
        )
    },
]

We define all experimental configurations, including model parameters, regression thresholds, and MLflow tracking settings. We construct the evaluation dataset and explicitly declare multiple prompt versions to compare and test for regressions. By centralizing these definitions, we ensure that prompt changes and evaluation logic remain controlled and reproducible.

def call_llm(formatted_prompt: str) -> str:
    resp = client.responses.create(
        model=MODEL,
        input=formatted_prompt,
        temperature=TEMPERATURE,
        max_output_tokens=MAX_OUTPUT_TOKENS,
    )
    out = getattr(resp, "output_text", None)
    if out:
        return out.strip()
    try:
        texts = []
        for item in resp.output:
            if getattr(item, "type", "") == "message":
                for c in item.content:
                    if getattr(c, "type", "") in ("output_text", "text"):
                        texts.append(getattr(c, "text", ""))
        return "\n".join(texts).strip()
    except Exception:
        return ""

smooth = SmoothingFunction().method3
rouge = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=True)

def safe_tokenize(s: str) -> List[str]:
    s = (s or "").strip().lower()
    if not s:
        return []
    try:
        return nltk.word_tokenize(s)
    except LookupError:
        return re.findall(r"\b\w+\b", s)

def bleu_score(ref: str, hyp: str) -> float:
    r = safe_tokenize(ref)
    h = safe_tokenize(hyp)
    if len(h) == 0 or len(r) == 0:
        return 0.0
    return float(sentence_bleu([r], h, smoothing_function=smooth))

def rougeL_f1(ref: str, hyp: str) -> float:
    scores = rouge.score(ref or "", hyp or "")
    return float(scores["rougeL"].fmeasure)

def semantic_sim(ref: str, hyp: str) -> float:
    embs = embedder.encode([ref or "", hyp or ""], normalize_embeddings=True)
    return float(cosine_similarity([embs[0]], [embs[1]])[0][0])

We implement the core LLM invocation and evaluation metrics used to assess prompt quality. We compute BLEU, ROUGE-L, and semantic similarity scores to capture both surface-level and semantic differences in model outputs. It allows us to evaluate prompt changes from multiple complementary perspectives rather than relying on a single metric.

def evaluate_prompt(prompt_template: str) -> Tuple[pd.DataFrame, Dict[str, float], str]:
    rows = []
    for ex in EVAL_SET:
        p = prompt_template.format(user_input=ex["input"])
        y = call_llm(p)
        ref = ex["reference"]

        rows.append({
            "id": ex["id"],
            "input": ex["input"],
            "reference": ref,
            "output": y,
            "bleu": bleu_score(ref, y),
            "rougeL_f1": rougeL_f1(ref, y),
            "semantic_sim": semantic_sim(ref, y),
        })

    df = pd.DataFrame(rows)
    agg = {
        "bleu_mean": float(df["bleu"].mean()),
        "rougeL_f1_mean": float(df["rougeL_f1"].mean()),
        "semantic_sim_mean": float(df["semantic_sim"].mean()),
    }
    outputs_jsonl = "\n".join(json.dumps(r, ensure_ascii=False) for r in rows)
    return df, agg, outputs_jsonl

def log_text_artifact(text: str, artifact_path: str):
    mlflow.log_text(text, artifact_path)

def prompt_diff(old: str, new: str) -> str:
    a = old.splitlines(keepends=True)
    b = new.splitlines(keepends=True)
    return "".join(difflib.unified_diff(a, b, fromfile="previous_prompt", tofile="current_prompt"))

def compute_regression_flags(baseline: Dict[str, float], current: Dict[str, float]) -> Dict[str, Any]:
    d_sem = baseline["semantic_sim_mean"] - current["semantic_sim_mean"]
    d_rouge = baseline["rougeL_f1_mean"] - current["rougeL_f1_mean"]
    d_bleu = baseline["bleu_mean"] - current["bleu_mean"]

    flags = {
        "abs_semantic_fail": current["semantic_sim_mean"] < ABS_SEM_SIM_MIN,
        "drop_semantic_fail": d_sem > DELTA_SEM_SIM_MAX_DROP,
        "drop_rouge_fail": d_rouge > DELTA_ROUGE_L_MAX_DROP,
        "drop_bleu_fail": d_bleu > DELTA_BLEU_MAX_DROP,
        "delta_semantic": float(d_sem),
        "delta_rougeL": float(d_rouge),
        "delta_bleu": float(d_bleu),
    }
    flags["regression"] = any([flags["abs_semantic_fail"], flags["drop_semantic_fail"], flags["drop_rouge_fail"], flags["drop_bleu_fail"]])
    return flags

We build the evaluation and regression logic that runs each prompt against the evaluation set and aggregates results. We log prompt artifacts, prompt diffs, and evaluation outputs to MLflow, ensuring every experiment remains auditable. We also compute regression flags that automatically identify whether a prompt version degrades performance relative to the baseline. Check out the FULL CODES here.

print("Running prompt versioning + regression testing with MLflow...")
print(f"Tracking URI: {mlflow.get_tracking_uri()}")
print(f"Experiment: {mlflow.get_experiment_by_name('prompt_versioning_llm_regression').name}")

run_summary = []
baseline_metrics = None
baseline_prompt = None
baseline_df = None
baseline_metrics_name = None

with mlflow.start_run(run_name=f"prompt_regression_suite_{int(time.time())}") as parent_run:
    mlflow.set_tag("task", "prompt_versioning_regression_testing")
    mlflow.log_param("model", MODEL)
    mlflow.log_param("temperature", TEMPERATURE)
    mlflow.log_param("max_output_tokens", MAX_OUTPUT_TOKENS)
    mlflow.log_param("eval_set_size", len(EVAL_SET))

    for pv in PROMPTS:
        ver = pv["version"]
        prompt_t = pv["prompt"]

        with mlflow.start_run(run_name=ver, nested=True) as child_run:
            mlflow.log_param("prompt_version", ver)
            log_text_artifact(prompt_t, f"prompts/{ver}.txt")

            if baseline_prompt is not None and baseline_metrics_name is not None:
                diff = prompt_diff(baseline_prompt, prompt_t)
                log_text_artifact(diff, f"prompt_diffs/{baseline_metrics_name}_to_{ver}.diff")
            else:
                log_text_artifact("BASELINE_PROMPT (no diff)", f"prompt_diffs/{ver}.diff")

            df, agg, outputs_jsonl = evaluate_prompt(prompt_t)

            mlflow.log_dict(agg, f"metrics/{ver}_agg.json")
            log_text_artifact(outputs_jsonl, f"outputs/{ver}_outputs.jsonl")

            mlflow.log_metric("bleu_mean", agg["bleu_mean"])
            mlflow.log_metric("rougeL_f1_mean", agg["rougeL_f1_mean"])
            mlflow.log_metric("semantic_sim_mean", agg["semantic_sim_mean"])

            if baseline_metrics is None:
                baseline_metrics = agg
                baseline_prompt = prompt_t
                baseline_df = df
                baseline_metrics_name = ver
                flags = {"regression": False, "delta_bleu": 0.0, "delta_rougeL": 0.0, "delta_semantic": 0.0}
                mlflow.set_tag("regression", "false")
            else:
                flags = compute_regression_flags(baseline_metrics, agg)
                mlflow.log_metric("delta_bleu", flags["delta_bleu"])
                mlflow.log_metric("delta_rougeL", flags["delta_rougeL"])
                mlflow.log_metric("delta_semantic", flags["delta_semantic"])
                mlflow.set_tag("regression", str(flags["regression"]).lower())
                for k in ["abs_semantic_fail", "drop_semantic_fail", "drop_rouge_fail", "drop_bleu_fail"]:
                    mlflow.set_tag(k, str(flags[k]).lower())

            run_summary.append({
                "prompt_version": ver,
                "bleu_mean": agg["bleu_mean"],
                "rougeL_f1_mean": agg["rougeL_f1_mean"],
                "semantic_sim_mean": agg["semantic_sim_mean"],
                "delta_bleu_vs_baseline": float(flags.get("delta_bleu", 0.0)),
                "delta_rougeL_vs_baseline": float(flags.get("delta_rougeL", 0.0)),
                "delta_semantic_vs_baseline": float(flags.get("delta_semantic", 0.0)),
                "regression_flag": bool(flags["regression"]),
                "mlflow_run_id": child_run.info.run_id,
            })

summary_df = pd.DataFrame(run_summary).sort_values("prompt_version")
print("\n=== Aggregated Results (higher is better) ===")
display(summary_df)

regressed = summary_df[summary_df["regression_flag"] == True]
if len(regressed) > 0:
    print("\nRegressions detected:")
    display(regressed[["prompt_version", "delta_bleu_vs_baseline", "delta_rougeL_vs_baseline", "delta_semantic_vs_baseline", "mlflow_run_id"]])
else:
    print("\nNo regressions detected under current thresholds.")

if len(regressed) > 0 and baseline_df is not None:
    worst_ver = regressed.sort_values("delta_semantic_vs_baseline", ascending=False).iloc[0]["prompt_version"]
    worst_prompt = next(p["prompt"] for p in PROMPTS if p["version"] == worst_ver)
    worst_df, _, _ = evaluate_prompt(worst_prompt)

    merged = baseline_df[["id", "output", "bleu", "rougeL_f1", "semantic_sim"]].merge(
        worst_df[["id", "output", "bleu", "rougeL_f1", "semantic_sim"]],
        on="id",
        suffixes=("_baseline", f"_{worst_ver}")
    )
    merged["delta_semantic"] = merged["semantic_sim_baseline"] - merged[f"semantic_sim_{worst_ver}"]
    merged["delta_rougeL"] = merged["rougeL_f1_baseline"] - merged[f"rougeL_f1_{worst_ver}"]
    merged["delta_bleu"] = merged["bleu_baseline"] - merged[f"bleu_{worst_ver}"]
    print(f"\n=== Per-example deltas: baseline vs {worst_ver} (positive delta = worse) ===")
    display(
        merged[["id", "delta_semantic", "delta_rougeL", "delta_bleu", "output_baseline", f"output_{worst_ver}"]]
        .sort_values("delta_semantic", ascending=False)
    )

print("\nOpen MLflow UI (optional) by running:")
print("!mlflow ui --backend-store-uri file:/content/mlruns --host 0.0.0.0 --port 5000")

We orchestrate the full prompt regression testing workflow using nested MLflow runs. We compare each prompt version against the baseline, log metric deltas, and record regression outcomes in a structured summary table. This completes a repeatable, engineering-grade pipeline for prompt versioning and regression testing that we can extend to larger datasets and real-world applications.

In conclusion, we established a practical, research-oriented framework for prompt versioning and regression testing that enables us to evaluate LLM behavior with discipline and transparency. We showed how MLflow enables us to track prompt evolution, compare outputs across versions, and automatically flag regressions based on well-defined thresholds. This approach helps us move away from ad hoc prompt tuning and toward measurable, repeatable experimentation. By adopting this workflow, we ensured that prompt updates improve model behavior intentionally rather than introducing hidden performance regressions.

Check out the FULL CODES here.
The post A Coding Implementation to Establish Rigorous Prompt Versioning and Regression Testing Workflows for Large Language Models using MLflow appeared first on MarkTechPost.