How to Build a Self-Organizing Agent Memory System for Long-Term AI Reasoning

In this tutorial, we build a self-organizing memory system for an agent that goes beyond storing raw conversation history and instead structures interactions into persistent, meaningful knowledge units. We design the system so that reasoning and memory management are clearly separated, allowing a dedicated component to extract, compress, and organize information. At the same time, the main agent focuses on responding to the user. We use structured storage with SQLite, scene-based grouping, and summary consolidation, and we show how an agent can maintain useful context over long horizons without relying on opaque vector-only retrieval.

import sqlite3
import json
import re
from datetime import datetime
from typing import List, Dict
from getpass import getpass
from openai import OpenAI

OPENAI_API_KEY = getpass("Enter your OpenAI API key: ").strip()
client = OpenAI(api_key=OPENAI_API_KEY)

def llm(prompt, temperature=0.1, max_tokens=500):
    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
        max_tokens=max_tokens
    ).choices[0].message.content.strip()

We set up the core runtime by importing all required libraries and securely collecting the API key at execution time. We initialize the language model client and define a single helper function that standardizes all model calls. We ensure that every downstream component relies on this shared interface for consistent generation behavior.

class MemoryDB:
    def __init__(self):
        self.db = sqlite3.connect(":memory:")
        self.db.row_factory = sqlite3.Row
        self._init_schema()

    def _init_schema(self):
        self.db.execute("""
            CREATE TABLE mem_cells (
                id INTEGER PRIMARY KEY,
                scene TEXT,
                cell_type TEXT,
                salience REAL,
                content TEXT,
                created_at TEXT
            )
        """)

        self.db.execute("""
            CREATE TABLE mem_scenes (
                scene TEXT PRIMARY KEY,
                summary TEXT,
                updated_at TEXT
            )
        """)

        self.db.execute("""
            CREATE VIRTUAL TABLE mem_cells_fts
            USING fts5(content, scene, cell_type)
        """)

    def insert_cell(self, cell):
        self.db.execute(
            "INSERT INTO mem_cells VALUES(NULL,?,?,?,?,?)",
            (
                cell["scene"],
                cell["cell_type"],
                cell["salience"],
                json.dumps(cell["content"]),
                datetime.utcnow().isoformat()
            )
        )
        self.db.execute(
            "INSERT INTO mem_cells_fts VALUES(?,?,?)",
            (
                json.dumps(cell["content"]),
                cell["scene"],
                cell["cell_type"]
            )
        )
        self.db.commit()

We define a structured memory database that persists information across interactions. We create tables for atomic memory units, higher-level scenes, and a full-text search index to enable symbolic retrieval. We also implement the logic to insert new memory entries in a normalized and queryable form.
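To make the expected cell shape concrete, here is a minimal usage sketch; the field values below are illustrative placeholders rather than output from the tutorial's run, and the throwaway MemoryDB instance is created only for this check.

# Minimal usage sketch: insert one hand-written memory cell and read it back.
demo_db = MemoryDB()
demo_db.insert_cell({
    "scene": "project_alpha",         # grouping key for related memories
    "cell_type": "decision",          # one of: fact, plan, preference, decision, task, risk
    "salience": 0.8,                  # importance score in [0, 1]
    "content": "Team chose SQLite for structured memory storage."
})
print([dict(r) for r in demo_db.db.execute("SELECT scene, cell_type, salience FROM mem_cells")])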

# These methods continue the MemoryDB class defined above.

    def get_scene(self, scene):
        return self.db.execute(
            "SELECT * FROM mem_scenes WHERE scene=?", (scene,)
        ).fetchone()

    def upsert_scene(self, scene, summary):
        self.db.execute("""
            INSERT INTO mem_scenes VALUES(?,?,?)
            ON CONFLICT(scene) DO UPDATE SET
                summary=excluded.summary,
                updated_at=excluded.updated_at
        """, (scene, summary, datetime.utcnow().isoformat()))
        self.db.commit()

    def retrieve_scene_context(self, query, limit=6):
        tokens = re.findall(r"[a-zA-Z0-9]+", query)
        if not tokens:
            return []

        fts_query = " OR ".join(tokens)

        rows = self.db.execute("""
            SELECT scene, content FROM mem_cells_fts
            WHERE mem_cells_fts MATCH ?
            LIMIT ?
        """, (fts_query, limit)).fetchall()

        if not rows:
            rows = self.db.execute("""
                SELECT scene, content FROM mem_cells
                ORDER BY salience DESC
                LIMIT ?
            """, (limit,)).fetchall()

        return rows

    def retrieve_scene_summary(self, scene):
        row = self.get_scene(scene)
        return row["summary"] if row else ""

We focus on memory retrieval and scene maintenance logic. We implement safe full-text search by sanitizing user queries and adding a fallback strategy when no lexical matches are found. We also expose helper methods to fetch consolidated scene summaries for long-horizon context building.

class MemoryManager:
    def __init__(self, db: MemoryDB):
        self.db = db

    def extract_cells(self, user, assistant) -> List[Dict]:
        prompt = f"""
Convert this interaction into structured memory cells.

Return JSON array with objects containing:
- scene
- cell_type (fact, plan, preference, decision, task, risk)
- salience (0-1)
- content (compressed, factual)

User: {user}
Assistant: {assistant}
"""
        raw = llm(prompt)
        raw = re.sub(r"```json|```", "", raw)

        try:
            cells = json.loads(raw)
            return cells if isinstance(cells, list) else []
        except Exception:
            return []

    def consolidate_scene(self, scene):
        rows = self.db.db.execute(
            "SELECT content FROM mem_cells WHERE scene=? ORDER BY salience DESC",
            (scene,)
        ).fetchall()

        if not rows:
            return

        cells = [json.loads(r["content"]) for r in rows]

        prompt = f"""
Summarize this memory scene in under 100 words.
Keep it stable and reusable for future reasoning.

Cells:
{cells}
"""
        summary = llm(prompt, temperature=0.05)
        self.db.upsert_scene(scene, summary)

    def update(self, user, assistant):
        cells = self.extract_cells(user, assistant)

        for cell in cells:
            self.db.insert_cell(cell)

        for scene in set(c["scene"] for c in cells):
            self.consolidate_scene(scene)

We implement the dedicated memory management component responsible for structuring experience. We extract compact memory representations from interactions, store them, and periodically consolidate them into stable scene summaries. We ensure that memory evolves incrementally without interfering with the agent’s response flow.

class WorkerAgent:
    def __init__(self, db: MemoryDB, mem_manager: MemoryManager):
        self.db = db
        self.mem_manager = mem_manager

    def answer(self, user_input):
        recalled = self.db.retrieve_scene_context(user_input)
        scenes = set(r["scene"] for r in recalled)

        summaries = "\n".join(
            f"[{scene}]\n{self.db.retrieve_scene_summary(scene)}"
            for scene in scenes
        )

        prompt = f"""
You are an intelligent agent with long-term memory.

Relevant memory:
{summaries}

User: {user_input}
"""
        assistant_reply = llm(prompt)
        self.mem_manager.update(user_input, assistant_reply)
        return assistant_reply

db = MemoryDB()
memory_manager = MemoryManager(db)
agent = WorkerAgent(db, memory_manager)

print(agent.answer("We are building an agent that remembers projects long term."))
print(agent.answer("It should organize conversations into topics automatically."))
print(agent.answer("This memory system should support future reasoning."))

for row in db.db.execute("SELECT * FROM mem_scenes"):
    print(dict(row))

We define the worker agent that performs reasoning while remaining memory-aware. We retrieve relevant scenes, assemble contextual summaries, and generate responses grounded in long-term knowledge. We then close the loop by passing the interaction back to the memory manager so the system continuously improves over time.

In this tutorial, we demonstrated how an agent can actively curate its own memory and turn past interactions into stable, reusable knowledge rather than ephemeral chat logs. We enabled memory to evolve through consolidation and selective recall, which supports more consistent and grounded reasoning across sessions. This approach provides a practical foundation for building long-lived agentic systems, and it can be naturally extended with mechanisms for forgetting, richer relational memory, or graph-based orchestration as the system grows in complexity.
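As one concrete direction for the forgetting mechanism mentioned above, here is a minimal sketch (not part of the original tutorial) that decays salience by age and prunes low-salience cells from the mem_cells table defined earlier; the decay rate and threshold are arbitrary illustrative values.

from datetime import datetime

def forget(db: MemoryDB, decay_per_day: float = 0.05, threshold: float = 0.2):
    # Decay salience by age in days and delete cells that fall below the threshold.
    now = datetime.utcnow()
    rows = db.db.execute("SELECT id, salience, created_at FROM mem_cells").fetchall()
    for row in rows:
        age_days = (now - datetime.fromisoformat(row["created_at"])).days
        new_salience = row["salience"] - decay_per_day * age_days
        if new_salience < threshold:
            db.db.execute("DELETE FROM mem_cells WHERE id=?", (row["id"],))
        else:
            db.db.execute("UPDATE mem_cells SET salience=? WHERE id=?", (new_salience, row["id"]))
    db.db.commit()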

Check out the Full Codes.

Google AI Introduces the WebMCP to Enable Direct and Structured Website Interactions for New AI Agents

Google is officially turning Chrome into a playground for AI agents. For years, AI ‘browsers’ have relied on a messy process: taking screenshots of websites, running them through vision models, and guessing where to click. This method is slow, breaks easily, and consumes massive amounts of compute.

Google has introduced a better way: the Web Model Context Protocol (WebMCP). Announced alongside the Early Preview Program (EPP), this protocol allows websites to communicate directly to AI models. Instead of the AI ‘guessing’ how to use a site, the site tells the AI exactly what tools are available.

The End of Screen Scraping

Current AI agents treat the web like a picture. They ‘look’ at the UI and try to find the ‘Submit’ button. If the button moves 5 pixels, the agent might fail.

WebMCP replaces this guesswork with structured data. It turns a website into a set of capabilities. For developers, this means you no longer have to worry about an AI breaking your frontend. You simply define what the AI can do, and Chrome handles the communication.

How WebMCP Works: 2 Integration Paths

AI Devs can choose between 2 ways to make a site ‘agent-ready.’

1. The Declarative Approach (HTML)

This is the simplest method for web developers. You can expose a website’s functions by adding new attributes to your standard HTML.

Attributes: Use toolname and tooldescription inside your <form> tags.

The Benefit: Chrome automatically reads these tags and creates a schema for the AI. If you have a ‘Book Flight’ form, the AI sees it as a structured tool with specific inputs.

Event Handling: When an AI fills the form, it triggers a SubmitEvent.agentInvoked. This allows your backend to know a machine—not a human—is making the request.

2. The Imperative Approach (JavaScript)

For complex apps, the Imperative API provides deeper control. This allows for multi-step workflows that a simple form cannot handle.

The Method: Use navigator.modelContext.registerTool().

The Logic: You define a tool name, a description, and a JSON schema for inputs.

Real-time Execution: When the AI agent wants to ‘Add to Cart,’ it calls your registered JavaScript function. This happens within the user’s current session, meaning the AI doesn’t need to re-login or bypass security headers.

Why the Early Preview Program (EPP) Matters

Google is not releasing this to everyone at once. They are using the Early Preview Program (EPP) to gather data from 1st-movers. Developers who join the EPP get early access to Chrome 146 features.

This is a critical phase for data scientists. By testing in the EPP, you can see how different Large Language Models (LLMs) interpret your tool descriptions. If a description is too vague, the model might hallucinate. The EPP allows engineers to fine-tune these descriptions before the protocol becomes a global standard.

Performance and Efficiency

The technical shift here is massive. Moving from vision-based browsing to WebMCP-based interaction offers 3 key improvements:

Lower Latency: No more waiting for screenshots to upload and be processed by a vision model.

Higher Accuracy: Models interact with structured JSON data, which reduces errors to nearly 0%.

Reduced Costs: Sending text-based schemas is much cheaper than sending high-resolution images to an LLM.

The Technical Stack: navigator.modelContext

For AI devs, the core aspect of this update lives in the new modelContext object. Here is the breakdown of the 4 primary methods:

Method | Purpose
registerTool() | Makes a function visible to the AI agent.
unregisterTool() | Removes a function from the AI's reach.
provideContext() | Sends extra metadata (like user preferences) to the agent.
clearContext() | Wipes the shared data to ensure privacy.

Security First

A common concern for software engineers is security. WebMCP is designed as a ‘permission-first’ protocol. The AI agent cannot execute a tool without the browser acting as a mediator. In many cases, Chrome will prompt the user to ‘Allow AI to book this flight?’ before the final action is taken. This keeps the user in control while allowing the agent to do the heavy lifting.

Key Takeaways

Standardizing the ‘Agentic Web’: The Web Model Context Protocol (WebMCP) is a new standard that allows AI agents to interact with websites as structured toolkits rather than just ‘looking’ at pixels. This replaces slow, error-prone screen scraping with direct, reliable communication.

Dual Integration Paths: Developers can make sites ‘AI-ready’ via two methods: a Declarative API (using simple HTML attributes like toolname in forms) or an Imperative API (using JavaScript’s navigator.modelContext.registerTool() for complex, multi-step workflows).

Massive Efficiency Gains: By using structured JSON schemas instead of vision-based processing (screenshots), WebMCP leads to a 67% reduction in computational overhead and pushes task accuracy to approximately 98%.

Built-in Security and Privacy: The protocol is ‘permission-first.’ The browser acts as a secure proxy, requiring user confirmation before an AI agent can execute sensitive tools. It also includes methods like clearContext() to wipe shared session data.

Early Access via EPP: The Early Preview Program (EPP) allows software engineers and data scientists to test these features in Chrome 146.

Check out the Technical details.

Exa AI Introduces Exa Instant: A Sub-200ms Neural Search Engine Designed to Eliminate Bottlenecks for Real-Time Agentic Workflows

In the world of Large Language Models (LLMs), speed is the only feature that matters once accuracy is solved. For a human, waiting 1 second for a search result is fine. For an AI agent performing 10 sequential searches to solve a complex task, a 1-second delay per search creates a 10-second lag. This latency kills the user experience.

Exa, the search engine startup formerly known as Metaphor, just released Exa Instant. It is a search model designed to provide the world’s web data to AI agents in under 200ms. For software engineers and data scientists building Retrieval-Augmented Generation (RAG) pipelines, this removes the biggest bottleneck in agentic workflows.

https://exa.ai/blog/exa-instant

Why Latency is the Enemy of RAG

When you build a RAG application, your system follows a loop: the user asks a question, your system searches the web for context, and the LLM processes that context. If the search step takes 700ms to 1000ms, the total ‘time to first token’ becomes sluggish.

Exa Instant delivers results with a latency between 100ms and 200ms. In tests conducted from the us-west-1 (Northern California) region, the network latency was roughly 50ms. This speed allows agents to perform multiple searches in a single ‘thought’ process without the user feeling a delay.

No More ‘Wrapping’ Google

Most search APIs available today are ‘wrappers.’ They send a query to a traditional search engine like Google or Bing, scrape the results, and send them back to you. This adds layers of overhead.

Exa Instant is different. It is built on a proprietary, end-to-end neural search and retrieval stack. Instead of matching keywords, Exa uses embeddings and transformers to understand the meaning of a query. This neural approach ensures the results are relevant to the AI’s intent, not just the specific words used. By owning the entire stack from the crawler to the inference engine, Exa can optimize for speed in ways that ‘wrapper’ APIs cannot.

Benchmarking the Speed

The Exa team benchmarked Exa Instant against other popular options like Tavily Ultra Fast and Brave. To ensure the tests were fair and avoided ‘cached’ results, the team used the SealQA query dataset. They also added random words generated by GPT-5 to each query to force the engine to perform a fresh search every time.

The results showed that Exa Instant is up to 15x faster than competitors. While Exa offers other models like Exa Fast and Exa Auto for higher-quality reasoning, Exa Instant is the clear choice for real-time applications where every millisecond counts.

Pricing and Developer Integration

The transition to Exa Instant is simple. The API is accessible through the dashboard.exa.ai platform.

Cost: Exa Instant is priced at $5 per 1,000 requests.

Capacity: It searches the same massive index of the web as Exa’s more powerful models.

Accuracy: While designed for speed, it maintains high relevance. For specialized entity searches, Exa’s Websets product remains the gold standard, proving to be 20x more correct than Google for complex queries.

The API returns clean content ready for LLMs, removing the need for developers to write custom scraping or HTML cleaning code.
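For orientation, the sketch below shows roughly what a request to Exa's search API looks like from Python over plain HTTP. The endpoint path, field names, and especially the commented-out model selector are assumptions based on Exa's public documentation style rather than details from this announcement, so check dashboard.exa.ai for the exact request shape.

import requests

EXA_API_KEY = "YOUR_EXA_API_KEY"  # issued via dashboard.exa.ai

resp = requests.post(
    "https://api.exa.ai/search",                  # assumed REST endpoint
    headers={"x-api-key": EXA_API_KEY, "Content-Type": "application/json"},
    json={
        "query": "latest research on agentic RAG pipelines",
        "numResults": 5,                           # assumed field name
        "contents": {"text": True},                # request cleaned page text (assumed)
        # "model": "exa-instant",                  # hypothetical selector for the Instant model
    },
    timeout=10,
)
for result in resp.json().get("results", []):
    print(result.get("title"), result.get("url"))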

Key Takeaways

Sub-200ms Latency for Real-Time Agents: Exa Instant is optimized for ‘agentic’ workflows where speed is a bottleneck. By delivering results in under 200ms (and network latency as low as 50ms), it allows AI agents to perform multi-step reasoning and parallel searches without the lag associated with traditional search engines.

Proprietary Neural Stack vs. ‘Wrappers‘: Unlike many search APIs that simply ‘wrap’ Google or Bing (adding 700ms+ of overhead), Exa Instant is built on a proprietary, end-to-end neural search engine. It uses a custom transformer-based architecture to index and retrieve web data, offering up to 15x faster performance than existing alternatives like Tavily or Brave.

Cost-Efficient Scaling: The model is designed to make search a ‘primitive’ rather than an expensive luxury. It is priced at $5 per 1,000 requests, allowing developers to integrate real-time web lookups at every step of an agent’s thought process without breaking the budget.

Semantic Intent over Keywords: Exa Instant leverages embeddings to prioritize the ‘meaning’ of a query rather than exact word matches. This is particularly effective for RAG (Retrieval-Augmented Generation) applications, where finding ‘link-worthy’ content that fits an LLM’s context is more valuable than simple keyword hits.

Optimized for LLM Consumption: The API provides more than just URLs; it offers clean, parsed HTML, Markdown, and token-efficient highlights. This reduces the need for custom scraping scripts and minimizes the number of tokens the LLM needs to process, further speeding up the entire pipeline.

Check out the Technical details.

[In-Depth Guide] The Complete CTGAN + SDV Pipeline for High-Fidelity Synthetic Data

In this tutorial, we build a complete, production-grade synthetic data pipeline using CTGAN and the SDV ecosystem. We start from raw mixed-type tabular data and progressively move toward constrained generation, conditional sampling, statistical validation, and downstream utility testing. Rather than stopping at sample generation, we focus on understanding how well synthetic data preserves structure, distributions, and predictive signal. This tutorial demonstrates how CTGAN can be used responsibly and rigorously in real-world data science workflows.

!pip -q install "ctgan" "sdv" "sdmetrics" "scikit-learn" "pandas" "numpy" "matplotlib"

import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

import ctgan, sdv, sdmetrics
from ctgan import load_demo, CTGAN

from sdv.metadata import SingleTableMetadata
from sdv.single_table import CTGANSynthesizer

from sdv.cag import Inequality, FixedCombinations
from sdv.sampling import Condition

from sdmetrics.reports.single_table import DiagnosticReport, QualityReport

from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

import matplotlib.pyplot as plt

print("Versions:")
print("ctgan:", ctgan.__version__)
print("sdv:", sdv.__version__)
print("sdmetrics:", sdmetrics.__version__)

We set up the environment by installing all required libraries and importing the full dependency stack. We explicitly load CTGAN, SDV, SDMetrics, and downstream ML tooling to ensure compatibility across the pipeline. We also surface library versions to make the experiment reproducible and debuggable.

real = load_demo().copy()
real.columns = [c.strip().replace(" ", "_") for c in real.columns]

target_col = "income"
real[target_col] = real[target_col].astype(str)

categorical_cols = real.select_dtypes(include=["object"]).columns.tolist()
numerical_cols = [c for c in real.columns if c not in categorical_cols]

print("Rows:", len(real), "Cols:", len(real.columns))
print("Categorical:", len(categorical_cols), "Numerical:", len(numerical_cols))
display(real.head())

ctgan_model = CTGAN(
    epochs=30,
    batch_size=500,
    verbose=True
)
ctgan_model.fit(real, discrete_columns=categorical_cols)
synthetic_ctgan = ctgan_model.sample(5000)
print("Standalone CTGAN sample:")
display(synthetic_ctgan.head())

We load the CTGAN Adult demo dataset and perform minimal normalization on column names and data types. We explicitly identify categorical and numerical columns, which is critical for both CTGAN training and evaluation. We then train a baseline standalone CTGAN model and generate synthetic samples for comparison.

metadata = SingleTableMetadata()
metadata.detect_from_dataframe(data=real)
metadata.update_column(column_name=target_col, sdtype="categorical")

constraints = []

if len(numerical_cols) >= 2:
    col_lo, col_hi = numerical_cols[0], numerical_cols[1]
    constraints.append(Inequality(low_column_name=col_lo, high_column_name=col_hi))
    print(f"Added Inequality constraint: {col_hi} > {col_lo}")

if len(categorical_cols) >= 2:
    c1, c2 = categorical_cols[0], categorical_cols[1]
    constraints.append(FixedCombinations(column_names=[c1, c2]))
    print(f"Added FixedCombinations constraint on: [{c1}, {c2}]")

synth = CTGANSynthesizer(
    metadata=metadata,
    epochs=30,
    batch_size=500
)

if constraints:
    synth.add_constraints(constraints)

synth.fit(real)

synthetic_sdv = synth.sample(num_rows=5000)
print("SDV CTGANSynthesizer sample:")
display(synthetic_sdv.head())

We construct a formal metadata object and attach explicit semantic types to the dataset. We introduce structural constraints using SDV’s constraint graph system, enforcing numeric inequalities and validity of categorical combinations. We then train a CTGAN-based SDV synthesizer that respects these constraints during generation.
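As a quick sanity check (not part of the original code), you can verify on the generated sample that the declared constraints actually hold; this reuses the col_lo, col_hi, c1, and c2 variables defined above and assumes both constraints were added.

# Verify the Inequality constraint: the high column should not fall below the low column.
violations = (synthetic_sdv[col_hi] < synthetic_sdv[col_lo]).sum()
print("Inequality violations:", violations)

# Verify FixedCombinations: every (c1, c2) pair in the synthetic data should exist in the real data.
real_pairs = set(map(tuple, real[[c1, c2]].drop_duplicates().values))
synth_pairs = set(map(tuple, synthetic_sdv[[c1, c2]].drop_duplicates().values))
print("Unseen category pairs:", len(synth_pairs - real_pairs))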

loss_df = synth.get_loss_values()
display(loss_df.tail())

x_candidates = ["epoch", "step", "steps", "iteration", "iter", "batch", "update"]
xcol = next((c for c in x_candidates if c in loss_df.columns), None)

g_candidates = ["generator_loss", "gen_loss", "g_loss"]
d_candidates = ["discriminator_loss", "disc_loss", "d_loss"]
gcol = next((c for c in g_candidates if c in loss_df.columns), None)
dcol = next((c for c in d_candidates if c in loss_df.columns), None)

plt.figure(figsize=(10, 4))

if xcol is None:
    x = np.arange(len(loss_df))
else:
    x = loss_df[xcol].to_numpy()

if gcol is not None:
    plt.plot(x, loss_df[gcol].to_numpy(), label=gcol)
if dcol is not None:
    plt.plot(x, loss_df[dcol].to_numpy(), label=dcol)

plt.xlabel(xcol if xcol is not None else "index")
plt.ylabel("loss")
plt.legend()
plt.title("CTGAN training losses (SDV wrapper)")
plt.show()

cond_col = categorical_cols[0]
common_value = real[cond_col].value_counts().index[0]
conditions = [Condition({cond_col: common_value}, num_rows=2000)]

synthetic_cond = synth.sample_from_conditions(
    conditions=conditions,
    max_tries_per_batch=200,
    batch_size=5000
)

print("Conditional sampling requested:", 2000, "got:", len(synthetic_cond))
print("Conditional sample distribution (top 5):")
print(synthetic_cond[cond_col].value_counts().head(5))
display(synthetic_cond.head())

We extract and visualize the dynamics of generator and discriminator losses using a version-robust plotting strategy. We perform conditional sampling to generate data under specific attribute constraints and verify that the conditions are satisfied. This demonstrates how CTGAN behaves under guided generation scenarios.

metadata_dict = metadata.to_dict()

diagnostic = DiagnosticReport()
diagnostic.generate(real_data=real, synthetic_data=synthetic_sdv, metadata=metadata_dict, verbose=True)
print("Diagnostic score:", diagnostic.get_score())

quality = QualityReport()
quality.generate(real_data=real, synthetic_data=synthetic_sdv, metadata=metadata_dict, verbose=True)
print("Quality score:", quality.get_score())

def show_report_details(report, title):
    print(f"\n===== {title} details =====")
    props = report.get_properties()
    for p in props:
        print(f"\n--- {p} ---")
        details = report.get_details(property_name=p)
        try:
            display(details.head(10))
        except Exception:
            display(details)

show_report_details(diagnostic, "DiagnosticReport")
show_report_details(quality, "QualityReport")

train_real, test_real = train_test_split(
    real, test_size=0.25, random_state=42, stratify=real[target_col]
)

def make_pipeline(cat_cols, num_cols):
    pre = ColumnTransformer(
        transformers=[
            ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols),
            ("num", "passthrough", num_cols),
        ],
        remainder="drop"
    )
    clf = LogisticRegression(max_iter=200)
    return Pipeline([("pre", pre), ("clf", clf)])

pipe_syn = make_pipeline(categorical_cols, numerical_cols)
pipe_syn.fit(synthetic_sdv.drop(columns=[target_col]), synthetic_sdv[target_col])

proba_syn = pipe_syn.predict_proba(test_real.drop(columns=[target_col]))[:, 1]
y_true = (test_real[target_col].astype(str).str.contains(">")).astype(int)
auc_syn = roc_auc_score(y_true, proba_syn)
print("Synthetic-train -> Real-test AUC:", auc_syn)

pipe_real = make_pipeline(categorical_cols, numerical_cols)
pipe_real.fit(train_real.drop(columns=[target_col]), train_real[target_col])

proba_real = pipe_real.predict_proba(test_real.drop(columns=[target_col]))[:, 1]
auc_real = roc_auc_score(y_true, proba_real)
print("Real-train -> Real-test AUC:", auc_real)

model_path = "ctgan_sdv_synth.pkl"
synth.save(model_path)
print("Saved synthesizer to:", model_path)

from sdv.utils import load_synthesizer
synth_loaded = load_synthesizer(model_path)

synthetic_loaded = synth_loaded.sample(1000)
print("Loaded synthesizer sample:")
display(synthetic_loaded.head())

We evaluate synthetic data using SDMetrics diagnostic and quality reports and a property-level inspection. We validate downstream usefulness by training a classifier on synthetic data and testing it on real data. Finally, we serialize the trained synthesizer and confirm that it can be reloaded and sampled reliably.

In conclusion, we demonstrated that synthetic data generation with CTGAN becomes significantly more powerful when paired with metadata, constraints, and rigorous evaluation. By validating both statistical similarity and downstream task performance, we ensured that the synthetic data is not only realistic but also useful. This pipeline serves as a strong foundation for privacy-preserving analytics, data sharing, and simulation workflows. With careful configuration and evaluation, CTGAN can be safely deployed in real-world data science systems.

Check out the Full Codes here.

Kyutai Releases Hibiki-Zero: A 3B Parameter Simultaneous Speech-to-Speech Translation Model Using GRPO Reinforcement Learning Without Any Word-Level Aligned Data

Kyutai has released Hibiki-Zero, a new model for simultaneous speech-to-speech translation (S2ST) and speech-to-text translation (S2TT). The system translates source speech into a target language in real-time. It handles non-monotonic word dependencies during the process. Unlike previous models, Hibiki-Zero does not require word-level aligned data for training. This eliminates a major bottleneck in scaling AI translation to more languages.

Traditional approaches rely on supervised training with word-level alignments. These alignments are difficult to collect at scale. Developers usually depend on synthetic alignments and language-specific heuristics. Hibiki-Zero removes this complexity by using a novel reinforcement learning (RL) strategy to optimize latency.

https://kyutai.org/blog/2026-02-12-hibiki-zero

A Multistream Architecture

Hibiki-Zero is a decoder-only model. It uses a multistream architecture to model sequences of tokens jointly. The model handles 3 specific streams:

Source Stream: Audio tokens from the input speech.

Target Stream: Generated audio tokens for the translated speech.

Inner Monologue: A stream of padded text tokens that match the target audio.

The system uses the Mimi neural audio codec. Mimi is a causal and streaming codec that encodes waveforms into discrete tokens. It operates at a framerate of 12.5 Hz. The model uses an RQ-Transformer to model these audio streams.

The architectural specs include:

Total Parameters: 3B.

Temporal Transformer: 28 layers with a latent dimension of 2048.

Depth Transformer: 6 layers per codebook with a latent dimension of 1024.

Context Window: 4min.

Audio Codebooks: 16 levels for high-quality speech.
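The published specs make the sequence lengths easy to sanity-check; the short computation below is an illustrative aside (not from the announcement) that converts the 4-minute context window and the 12.5 Hz Mimi framerate into token counts.

# Rough back-of-the-envelope numbers implied by the specs above.
framerate_hz = 12.5          # Mimi codec frames per second
context_minutes = 4
codebooks = 16               # audio codebook levels per frame

frames = framerate_hz * context_minutes * 60
print("temporal steps in context:", int(frames))             # 3000
print("audio tokens per stream:", int(frames * codebooks))   # 48000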

Training Without Human Interpretation Data

Hibiki-Zero is trained in 2 main stages:

Coarse Alignment Training: The model first trains on sentence-level aligned data. This data ensures that the ith sentence in the target is a translation of the ith sentence in the source. The research team uses a technique to insert artificial silence in the target speech to delay its content relative to the source.

Reinforcement Learning (RL): The model uses Group Relative Policy Optimization (GRPO) to refine its policy. This stage reduces translation latency while preserving quality.

The RL process uses process rewards based only on the BLEU score. It computes intermediate rewards at multiple points during translation. A hyperparameter α balances the trade-off between speed and accuracy. A lower α reduces latency but may slightly decrease quality.
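For readers unfamiliar with GRPO, the core idea is that each sampled output in a group is scored and its reward is normalized against the group statistics rather than against a learned value function. The standard group-relative advantage from the original GRPO formulation (shown here for context; the announcement does not spell out Hibiki-Zero's exact objective) is:

\[ A_i = \frac{r_i - \operatorname{mean}(r_1, \dots, r_G)}{\operatorname{std}(r_1, \dots, r_G)} \]

where r_1, ..., r_G are the rewards of the G sampled candidates in a group.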

Scaling to Italian in Record Time

The researchers demonstrated how easily Hibiki-Zero adapts to new languages. They added Italian as an input language using less than 1000h of speech data.

They performed supervised fine-tuning followed by the GRPO process.

The model reached a quality and latency trade-off similar to Meta’s Seamless model.

It surpassed Seamless in speaker similarity by over 30 points.

Performance and Results

Hibiki-Zero achieves state-of-the-art results across 5 X-to-English tasks. It was tested on the Audio-NTREX-4L long-form benchmark, which includes 15h of speech per TTS system.

Metric | Hibiki-Zero (French) | Seamless (French)
ASR-BLEU (↑) | 28.7 | 23.9
Speaker Similarity (↑) | 61.3 | 44.4
Average Lag, LAAL (↓) | 2.3 | 6.2

In short-form tasks (Europarl-ST), Hibiki-Zero reached an ASR-BLEU of 34.6 with a lag of 2.8 seconds. Human raters also scored the model significantly higher than baselines for speech naturalness and voice transfer.

https://kyutai.org/blog/2026-02-12-hibiki-zero

Key Takeaways

Zero Aligned Data Requirement: Hibiki-Zero eliminates the need for expensive, hand-crafted word-level alignments between source and target speech, which were previously the biggest bottleneck in scaling simultaneous translation to new languages.

GRPO-Driven Latency Optimization: The model uses Group Relative Policy Optimization (GRPO) and a simple reward system based only on BLEU scores to automatically learn an efficient translation policy, balancing high translation quality with low latency.

Coarse-to-Fine Training Strategy: The training pipeline starts with sentence-level aligned data to teach the model base translation at high latency, followed by a reinforcement learning phase that “teaches” the model when to speak and when to listen.

Superior Voice and Naturalness: In benchmarking against previous state-of-the-art systems like Seamless, Hibiki-Zero achieved a 30-point lead in speaker similarity and significantly higher scores in speech naturalness and audio quality across five language tasks.

Rapid New Language Adaptation: The architecture is highly portable; researchers demonstrated that Hibiki-Zero could be adapted to a new input language (Italian) with less than 1,000 hours of speech data while maintaining its original performance on other languages.

Check out the Paper, Technical details, Repo and Samples.

Customize AI agent browsing with proxies, profiles, and extensions in Amazon Bedrock AgentCore Browser

AI agents that browse the web need more than basic page navigation. Our customers tell us they need agents that maintain session state across interactions, route traffic through corporate proxy infrastructure, and run with custom browser configurations. AgentCore Browser provides a secure, isolated browser environment for your agents to interact with web applications. Until now, each AgentCore Browser session started from a blank slate with default settings and direct internet access, limiting what agents could accomplish in real-world enterprise environments.
Today, we are announcing three new capabilities that address these requirements: proxy configuration, browser profiles, and browser extensions. Together, these features give you fine-grained control over how your AI agents interact with the web.
These three capabilities give you control over how AgentCore Browser sessions connect to the internet, what state they retain, and how they behave. Proxy configuration lets you route browser traffic through your own proxy servers, providing IP stability and integration with corporate network infrastructure. Browser profiles persist cookies and local storage across sessions, so agents can resume authenticated workflows without repeating login flows. Browser extensions load Chrome extensions into sessions to customize browser behavior for your use case. This post will walk through each capability with configuration examples and practical use cases to help you get started.
How persistent browser profiles keep AI Agents running smoothly
Customers building agents for e-commerce testing, authenticated workflows, and multi-step user journeys need browser sessions that remember state. Without persistent profiles, agents are required to re-authenticate and rebuild context at the start of every session, adding latency and fragility to automated workflows. Browser profiles solve this by saving and restoring cookies and local storage between sessions, so an agent that logged into a portal yesterday can pick up where it left off today.
IP stability is another common requirement. Healthcare and financial portals validate sessions based on source IP address, and rotating AWS IP addresses cause frequent re-authentication cycles that break long-running workflows. Proxy support lets you route traffic through servers with stable egress IPs, maintaining session continuity and meeting IP allowlisting requirements. Organizations that route traffic through corporate proxies need to extend this practice to AI agents for browser sessions. Proxy configuration enables access to internal webpages and resources that require proxy-based connectivity.
Browser extensions allow custom configurations such as ad blocking, authentication helpers, or other browser-level customization. When combined with proxy logging, these capabilities help provide access control and audit evidence that may support compliance programs such as FedRAMP, HITRUST, and PCI.
Feature 1: Proxy configuration
AgentCore Browser now supports routing browser traffic through your own external proxy servers. When you create a browser session with proxy configuration, AgentCore configures the browser to route HTTP and HTTPS traffic through your specified proxy servers.
How it works
You call StartBrowserSession with a proxyConfiguration specifying your proxy server. If using authentication, AgentCore retrieves proxy credentials from AWS Secrets Manager. The browser session starts with your proxy configuration applied, and browser traffic routes through your proxy server based on your domain routing rules.
Getting started with proxies
Complete these prerequisites before proceeding.
Step 1: Create a credentials secret (if your proxy requires authentication)

import boto3
import json

client = boto3.client('secretsmanager')
client.create_secret(
    Name='my-proxy-credentials',
    SecretString=json.dumps({
        'username': '<your-username>',
        'password': '<your-password>'
    })
)

Step 2: Create a browser session with proxy configuration 

session_client = boto3.client('bedrock-agentcore', region_name='<region>')

response = session_client.start_browser_session(
    browserIdentifier="aws.browser.v1",
    name="my-proxy-session",
    proxyConfiguration={
        "proxies": [{
            "externalProxy": {
                "server": "<your-proxy-hostname>",
                "port": 8080,
                "credentials": {
                    "basicAuth": {
                        "secretArn": "arn:aws:secretsmanager:<region>:<account-id>:secret:<secret-name>"
                    }
                }
            }
        }]
    }
)
print(f"Session ID: {response['sessionId']}")

The credentials field is optional for proxies without authentication.
Domain-based routing
Use domainPatterns to route specific domains through designated proxies, and bypass.domainPatterns for domains that should connect directly:

proxyConfiguration={
    "proxies": [
        {
            "externalProxy": {
                "server": "corp-proxy.example.com",
                "port": 8080,
                "domainPatterns": [".company.com", ".internal.corp"]
            }
        },
        {
            "externalProxy": {
                "server": "general-proxy.example.com",
                "port": 8080
            }
        }
    ],
    "bypass": {
        "domainPatterns": [".amazonaws.com"]
    }
}

With this configuration, requests to *.company.com and *.internal.corp route through the corporate proxy, requests to *.amazonaws.com bypass all proxies, and everything else routes through the general proxy. These values are only examples: a domain that matches bypass.domainPatterns connects directly, while a domain that matches a proxy's domainPatterns routes through that proxy, with the first match winning based on array order.
Routing precedence
When AgentCore Browser processes an outbound request, it walks through three tiers of routing rules to decide where to send the traffic. It first checks the bypass list. If the destination domain matches a bypass.domainPatterns entry, the request connects directly to the internet without using any proxy. If the domain does not match a bypass rule, AgentCore checks each proxy’s domainPatterns in order and routes the request through the first proxy whose pattern matches. If no proxy pattern matches either, the request falls through to the default proxy, which is the proxy entry that has no domainPatterns defined.
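To make the precedence concrete, here is a minimal sketch of the decision logic as described above. It is my own illustration of the documented rules, not AgentCore's actual implementation, and the behavior when nothing matches and no default proxy exists is an assumption.

def resolve_route(domain: str, proxy_config: dict) -> str:
    """Return 'direct', a matching proxy server, or the default proxy per the documented precedence."""
    def matches(domain, pattern):
        # A pattern like ".company.com" matches any subdomain of company.com.
        return domain.endswith(pattern)

    # Tier 1: bypass list wins; matching domains connect directly.
    for pattern in proxy_config.get("bypass", {}).get("domainPatterns", []):
        if matches(domain, pattern):
            return "direct"

    default_proxy = None
    # Tier 2: the first proxy whose domainPatterns match handles the request.
    for proxy in proxy_config.get("proxies", []):
        ext = proxy["externalProxy"]
        patterns = ext.get("domainPatterns")
        if patterns is None:
            default_proxy = default_proxy or ext["server"]  # candidate default (no patterns)
            continue
        if any(matches(domain, p) for p in patterns):
            return ext["server"]

    # Tier 3: fall through to the default proxy; direct connection if none is defined (assumption).
    return default_proxy or "direct"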
Test the new proxy feature with this code example.
Feature 2: Browser profiles
Browser profiles let you persist and reuse session data across multiple browser sessions, including cookies and local storage. An agent that authenticates with a web portal in one session can restore that state in a later session without logging in again. This is useful for authenticated workflows where re-login adds latency, e-commerce testing where shopping carts and form data need to survive between sessions, and multi-step user journeys that span multiple browser invocations.
The profile lifecycle has four stages. You start by calling create_browser_profile() to create a named profile. At the end of a session, you call save_browser_session_profile() to capture the current cookies and local storage into that profile. When you start a new session, you pass the profile identifier in the profileConfiguration parameter of start_browser_session(), which restores the saved state into the new browser. When you no longer need the profile, you call delete_browser_profile() to clean it up.
The following example shows an agent that adds items to a shopping cart in one session and verifies they persist in a subsequent session.
Complete these prerequisites before proceeding.

import boto3

control_client = boto3.client('bedrock-agentcore-control', region_name='<region>')  # replace with your region
session_client = boto3.client('bedrock-agentcore', region_name='<region>')          # replace with your region

# Create a browser profile
profile = control_client.create_browser_profile(name="ecommerce_profile")
profile_id = profile['profileId']

# Session 1: Add items to cart
session1 = session_client.start_browser_session(
    browserIdentifier="aws.browser.v1",
    name="shopping-session-1"
)
# ... agent navigates and adds items to cart ...

# Save session state to profile
session_client.save_browser_session_profile(
    sessionId=session1['sessionId'],
    browserIdentifier="aws.browser.v1",
    profileIdentifier=profile_id
)
session_client.stop_browser_session(sessionId=session1['sessionId'], browserIdentifier="aws.browser.v1")

# Session 2: Resume with saved profile
session2 = session_client.start_browser_session(
    browserIdentifier="aws.browser.v1",
    name="shopping-session-2",
    profileConfiguration={"profileIdentifier": profile_id}
)
# Cart items from Session 1 are now available
Test the new profile feature with this code example.
Feature 3: Browser extensions
Browser extensions let you load Chrome extensions into AgentCore Browser sessions to customize how the browser behaves. You package extensions as ZIP files, upload them to Amazon Simple Storage Service (Amazon S3), and reference them when starting a browser session. This provides access to functionality available through the Chrome extension API, from proxy routing and ad blocking to authentication helpers and content modification. For example, you can inject authentication tokens for internal applications, remove ads, and track scripts that interfere with agent navigation, or modify page content to improve how agents interact with a site.
Your extension should follow the standard Chromium extension format and adhere to Chromium extension guidelines.
Complete these prerequisites before proceeding.

Upload the extension to Amazon S3:

# Upload extension to S3
import boto3

s3 = boto3.client('s3')
s3.upload_file(
    'my-extension.zip',
    'amzn-s3-demo-bucket-extensions',
    'extensions/my-extension.zip'
)

Then, start a session with the extension, pointing to the Amazon S3 bucket where you’ve uploaded the zip file:

import boto3

region = "<region>"  # replace with your region
client = boto3.client('bedrock-agentcore', region_name=region)

response = client.start_browser_session(
    browserIdentifier="aws.browser.v1",
    name="my-session-with-extensions",
    sessionTimeoutSeconds=1800,
    viewPort={
        'height': 1080,
        'width': 1920
    },
    extensions=[
        {
            "location": {
                "s3": {
                    "bucket": "amzn-s3-demo-bucket-extensions",
                    "prefix": "extensions/my-extension.zip"
                }
            }
        },
        {
            "location": {
                "s3": {
                    "bucket": "amzn-s3-demo-bucket-extensions",
                    "prefix": "extensions/another-extension.zip",
                    "versionId": "abc123"  # Optional - for versioned S3 buckets
                }
            }
        }
    ]
)

print(f"Session ID: {response['sessionId']}")
print(f"Status: {response['status']}")
print(f"Automation Stream: {response['streams']['automationStream']['streamEndpoint']}")

Test the new extensions feature with this code example.
Conclusion
Proxy configuration, browser profiles, and browser extensions give AgentCore Browser the proxy routing, session persistence, and extensibility controls that customers need to deploy AI agents that browse the web in production. You can route traffic through your corporate proxy infrastructure, maintain session continuity across interactions, and customize browser behavior with extensions, all while keeping credentials secure in AWS Secrets Manager. You can carry e-commerce context and information across sessions, build your own extension and test it in a secure environment before release, and have the browser connect into your network through proxies.
To get started, see the tutorials in the Amazon Bedrock AgentCore samples repository and the Amazon Bedrock AgentCore Browser documentation.  For more information about pricing, visit Amazon Bedrock AgentCore Pricing. 

About the Authors

Joshua Samuel
Joshua Samuel is a Senior AI/ML Specialist Solutions Architect at AWS who accelerates enterprise transformation through AI/ML, and generative AI solutions, based in Melbourne, Australia. A passionate disrupter, he specializes in agentic AI and coding techniques – Anything that makes builders faster and happier. Outside work, he tinkers with home automation and AI coding projects, and enjoys life with his wife, kids and dog.

Evandro Franco
Evandro Franco is a Sr. Data Scientist working on Amazon Web Services. He is part of the Global GTM team that helps AWS customers overcome business challenges related to AI/ML on top of AWS, mainly on Amazon Bedrock AgentCore and Strands Agents. He has more than 18 years of experience working with technology, from software development, infrastructure, serverless, to machine learning. In his free time, Evandro enjoys playing with his son, mainly building some funny Lego bricks.

Kosti Vasilakakis
Kosti Vasilakakis is a Principal PM at AWS on the Agentic AI team, where he has led the design and development of several Bedrock AgentCore services from the ground up, including Runtime, Browser, Code Interpreter, and Identity. He previously worked on Amazon SageMaker since its early days, launching AI/ML capabilities now used by thousands of companies worldwide. Earlier in his career, Kosti was a data scientist. Outside of work, he builds personal productivity automations, plays tennis, and enjoys life with his wife and kids.

Yan Marim
Yan Marim is a Sr. GenAI Specialist Solutions Architect at Amazon Web Services, based in Brazil. As part of the LATAM Specialist team, he guides customers through their generative AI adoption journey, focusing on Amazon Bedrock and agentic AI solutions. In his free time, Yan enjoys spending quality time with his wife and dog, and watching soccer games.

Kevin Orellana
Kevin Orellana is a Software Development Engineer at Amazon Web Services on the Bedrock AgentCore team, based in Seattle. He builds and operates core infrastructure powering agentic AI capabilities, including Browser, Code Interpreter, and Runtime. Earlier in his career, Kevin worked on the Bedrock inference team hosting frontier models. In his free time, he enjoys hiking with his Goldendoodle, experimenting with multi-agent simulations, and working toward building a personal AI assistant that speaks English, Spanish, and Mandarin.

OpenAI Releases a Research Preview of GPT-5.3-Codex-Spark: A 15x Faster AI Coding Model Delivering Over 1000 Tokens Per Second on Cerebras Hardware

OpenAI just launched a new research preview called GPT-5.3 Codex-Spark. This model is built for 1 thing: extreme speed. While the standard GPT-5.3 Codex focuses on deep reasoning, Spark is designed for near-instant response times. It is the result of a deep hardware-software integration between OpenAI and Cerebras.

The results are game-changing. Spark is 15x faster than the flagship GPT-5.3 Codex. It consistently delivers over 1000 tokens per second. This speed effectively removes the delay between a developer’s thought and the model’s code output.

The Hardware: Wafer-Scale Engineering

The massive performance jump is powered by the Cerebras Wafer-Scale Engine 3 (WSE-3). Traditional AI models run on clusters of small GPUs. These GPUs must communicate with each other over cables, which creates a ‘bottleneck.’ This bottleneck slows down the speed of the model.

The WSE-3 is different. It is a single, giant chip the size of a whole silicon wafer. Because the entire model lives on 1 piece of silicon, there are no cables to slow it down. This architecture provides:

Massive on-chip memory.

Ultra-high bandwidth.

Low-latency compute.

By using the Cerebras CS-3 system, OpenAI can run inference at speeds that traditional GPU clusters cannot reach.

Software Optimizations and Low Latency

Speed is not just about the chip. OpenAI re-engineered the way the model communicates with your computer. They moved away from traditional request methods and introduced a persistent WebSocket connection.

This change leads to several technical improvements:

Round-Trip Time (RTT): Client-server overhead is reduced by 80%.

Time-to-First-Token (TTFT): This is improved by 50%, meaning the code starts appearing almost the moment you hit enter.

Per-Token Overhead: Internal processing time per token is cut by 30%.

These optimizations allow for ‘Real-Time Steering.’ You can interrupt the model while it is typing and redirect its logic without waiting for the full block to finish.
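To put the throughput figures in perspective, here is a quick illustrative calculation using the 1,000 tokens/second figure above and the roughly 70 tokens/second flagship rate quoted in the spec table later in this article.

# Time to stream a 500-token code completion at each model's quoted throughput.
tokens = 500
spark_tps = 1000      # Spark: over 1,000 tokens per second
flagship_tps = 70     # flagship GPT-5.3 Codex: ~70 tokens per second (per the spec table)

print(f"Spark:    {tokens / spark_tps:.2f} s")     # ~0.50 s
print(f"Flagship: {tokens / flagship_tps:.2f} s")  # ~7.14 s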

The Trade-offs: Speed vs. Reasoning

GPT-5.3 Codex-Spark is optimized for throughput, not deep complexity. It is a ‘smaller’ model than the flagship GPT-5.3 Codex. Because of this, it has lower reasoning depth.

https://openai.com/index/introducing-gpt-5-3-codex-spark/


Devs should be aware of these performance differences:

Benchmarks: Spark scores lower on SWE-Bench Pro and Terminal-Bench 2.0 compared to the flagship model. It may struggle with very complex, multi-file architecture changes.

Security: Under OpenAI’s Preparedness Framework, the flagship GPT-5.3 Codex is rated as ‘High’ capability for cybersecurity. Spark does not meet this high threshold. It should not be used for sensitive security logic or autonomous authentication tasks.

Quick Specs and Access

Spark is available now for ChatGPT Pro users and developers. You can access it through the following tools:

Codex App: Use the model picker to select ‘Spark.’

VS Code Extension: Integrated directly into the composer.

CLI: Access it via the command codex --model gpt-5.3-codex-spark.

Feature | GPT-5.3 Codex-Spark | GPT-5.3 Codex (Flagship)
Tokens per Second | 1000+ | ~70
Context Window | 128k | 128k
Hardware | Cerebras WSE-3 | NVIDIA GPU Clusters
Best For | Fast Iteration | Deep Reasoning / Security

Key Takeaways

Great Speed: Spark is 15x faster than the flagship GPT-5.3 Codex, delivering an unprecedented throughput of over 1,000 tokens per second to enable near-instant code generation.

Custom Silicon Infrastructure: This is OpenAI’s first model to run on Cerebras Wafer-Scale Engine 3 (WSE-3) hardware rather than traditional NVIDIA GPUs, using ‘wafer-scale’ memory to eliminate data bottlenecks.

Drastic Latency Reduction: The integration of a persistent WebSocket connection reduces client-server round-trip overhead by 80% and improves the time-to-first-token by 50%.

Real-Time Steering: Designed for ‘micro-iterations,’ the model’s speed allows developers to interrupt and redirect logic in real-time, shifting the workflow from batch-processing to live pair-programming.

Targeted Capability Trade-offs: While faster, Spark has lower reasoning depth than the flagship model and does not meet the ‘High capability’ threshold for cybersecurity in OpenAI’s Preparedness Framework, making it unsuitable for sensitive auth or security tasks.

Check out the Technical details here.

How to Build a Matryoshka-Optimized Sentence Embedding Model for Ultra-Fast Retrieval with 64-Dimension Truncation

In this tutorial, we fine-tune a Sentence-Transformers embedding model using Matryoshka Representation Learning so that the earliest dimensions of the vector carry the most useful semantic signal. We train with MatryoshkaLoss on triplet data and then validate the key promise of MRL by benchmarking retrieval quality after truncating embeddings to 64, 128, and 256 dimensions. At the end, we save the tuned model and demonstrate how to load it with a small truncate_dim setting for fast and memory-efficient vector search. Check out the FULL CODES here.

!pip -q install -U sentence-transformers datasets accelerate

import math
import random
import numpy as np
import torch

from datasets import load_dataset
from torch.utils.data import DataLoader

from sentence_transformers import SentenceTransformer, InputExample
from sentence_transformers import losses
from sentence_transformers.util import cos_sim

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(42)

We install the required libraries and import all the necessary modules for training and evaluation. We set a deterministic seed, so our sampling and training behavior stay consistent across runs. We also ensure PyTorch and CUDA RNGs are aligned when a GPU is available. Check out the FULL CODES here.

@torch.no_grad()
def retrieval_metrics_mrr_recall_at_k(
    model,
    queries,
    corpus,
    qrels,
    dims_list=(64, 128, 256, None),
    k=10,
    batch_size=64,
):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model.to(device)

    qids = list(queries.keys())
    docids = list(corpus.keys())

    q_texts = [queries[qid] for qid in qids]
    d_texts = [corpus[did] for did in docids]

    q_emb = model.encode(q_texts, batch_size=batch_size, convert_to_tensor=True, normalize_embeddings=True)
    d_emb = model.encode(d_texts, batch_size=batch_size, convert_to_tensor=True, normalize_embeddings=True)

    results = {}

    for dim in dims_list:
        if dim is None:
            qe = q_emb
            de = d_emb
            dim_name = "full"
        else:
            qe = q_emb[:, :dim]
            de = d_emb[:, :dim]
            dim_name = str(dim)
            qe = torch.nn.functional.normalize(qe, p=2, dim=1)
            de = torch.nn.functional.normalize(de, p=2, dim=1)

        sims = cos_sim(qe, de)

        mrr_total = 0.0
        recall_total = 0.0

        for i, qid in enumerate(qids):
            rel = qrels.get(qid, set())
            if not rel:
                continue

            topk = torch.topk(sims[i], k=min(k, sims.shape[1]), largest=True).indices.tolist()
            topk_docids = [docids[j] for j in topk]

            recall_total += 1.0 if any(d in rel for d in topk_docids) else 0.0

            rr = 0.0
            for rank, d in enumerate(topk_docids, start=1):
                if d in rel:
                    rr = 1.0 / rank
                    break
            mrr_total += rr

        denom = max(1, len(qids))
        results[dim_name] = {f"MRR@{k}": mrr_total / denom, f"Recall@{k}": recall_total / denom}

    return results

def pretty_print(results, title):
    print("\n" + "=" * 80)
    print(title)
    print("=" * 80)
    for dim, metrics in results.items():
        print(f"dim={dim:>4} | " + " | ".join([f"{k}={v:.4f}" for k, v in metrics.items()]))

We implement a lightweight retrieval evaluator that encodes queries and documents, computes cosine similarity, and reports MRR@10 and Recall@10. We re-normalize embeddings after truncation so smaller prefixes remain comparable in cosine space. We also add a compact printer to make before/after comparisons easy to read. Check out the FULL CODES here.

DATASET_ID = "sentence-transformers/msmarco-co-condenser-margin-mse-sym-mnrl-mean-v1"
SUBSET = "triplet-hard"
SPLIT = "train"

TRAIN_SAMPLES = 4000
EVAL_QUERIES = 300

stream = load_dataset(DATASET_ID, SUBSET, split=SPLIT, streaming=True)

train_examples = []
eval_queries = {}
eval_corpus = {}
eval_qrels = {}

doc_id_counter = 0
qid_counter = 0

for row in stream:
    q = (row.get("query") or "").strip()
    pos = (row.get("positive") or "").strip()
    neg = (row.get("negative") or "").strip()

    if not q or not pos or not neg:
        continue

    train_examples.append(InputExample(texts=[q, pos, neg]))

    if len(eval_queries) < EVAL_QUERIES:
        qid = f"q{qid_counter}"
        qid_counter += 1

        pos_id = f"d{doc_id_counter}"; doc_id_counter += 1
        neg_id = f"d{doc_id_counter}"; doc_id_counter += 1

        eval_queries[qid] = q
        eval_corpus[pos_id] = pos
        eval_corpus[neg_id] = neg
        eval_qrels[qid] = {pos_id}

    if len(train_examples) >= TRAIN_SAMPLES and len(eval_queries) >= EVAL_QUERIES:
        break

print(len(train_examples), len(eval_queries), len(eval_corpus))

We stream a mined MS MARCO triplet dataset and build both a training set (queries, positives, negatives) and a tiny IR benchmark set. We map each query to a relevant positive document and include a negative document to make retrieval meaningful. We stop early to keep the run Colab-friendly while still large enough to show truncation effects.

MODEL_ID = "BAAI/bge-base-en-v1.5"

device = "cuda" if torch.cuda.is_available() else "cpu"
model = SentenceTransformer(MODEL_ID, device=device)
full_dim = model.get_sentence_embedding_dimension()

baseline = retrieval_metrics_mrr_recall_at_k(
    model,
    queries=eval_queries,
    corpus=eval_corpus,
    qrels=eval_qrels,
    dims_list=(64, 128, 256, None),
    k=10,
)
pretty_print(baseline, "BEFORE")

We load a strong base embedding model and record its full embedding dimension. We run the baseline evaluation across 64/128/256/full dimensions to see how truncation behaves before any training. We print the results so we can later compare whether MRL improves the early-dimension quality.

batch_size = 16
epochs = 1
warmup_steps = 100

train_loader = DataLoader(train_examples, batch_size=batch_size, shuffle=True, drop_last=True)

base_loss = losses.MultipleNegativesRankingLoss(model=model)

mrl_dims = [full_dim, 512, 256, 128, 64] if full_dim >= 768 else [full_dim, 256, 128, 64]
mrl_loss = losses.MatryoshkaLoss(
    model=model,
    loss=base_loss,
    matryoshka_dims=mrl_dims
)

model.fit(
    train_objectives=[(train_loader, mrl_loss)],
    epochs=epochs,
    warmup_steps=warmup_steps,
    show_progress_bar=True,
)

after = retrieval_metrics_mrr_recall_at_k(
    model,
    queries=eval_queries,
    corpus=eval_corpus,
    qrels=eval_qrels,
    dims_list=(64, 128, 256, None),
    k=10,
)
pretty_print(after, "AFTER")

out_dir = "mrl-msmarco-demo"
model.save(out_dir)

m64 = SentenceTransformer(out_dir, truncate_dim=64)
emb = m64.encode(
    ["what is the liberal arts?", "liberal arts covers humanities and sciences"],
    normalize_embeddings=True
)
print(emb.shape)

We create a MultipleNegativesRankingLoss and wrap it with MatryoshkaLoss using a descending list of target prefix dimensions. We fine-tune the model on the triplets, then re-run the same truncation benchmark to measure how much retrieval quality the early dimensions now retain. We then save the model and reload it with truncate_dim=64 to confirm practical usage for compact retrieval.

In conclusion, we successfully trained a Matryoshka-optimized embedding model that maintains strong retrieval performance even when we truncate vectors to small prefix dimensions, such as 64. We verified the effect by comparing baseline versus post-training retrieval metrics across multiple truncation sizes and the full embedding. With the saved model and the truncate_dim loading pattern, we now have a clean workflow for building smaller, faster vector indexes while keeping the option to rerank with full-dimensional embeddings.
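As a rough illustration of that two-stage workflow, the following sketch (with placeholder documents, query, and shortlist size) retrieves candidates using 64-dimensional prefixes and reranks the shortlist with the full-dimensional embeddings.

import numpy as np
from sentence_transformers import SentenceTransformer

model = SentenceTransformer("mrl-msmarco-demo")  # the fine-tuned model saved above

corpus = ["doc one ...", "doc two ...", "doc three ..."]  # placeholder documents
query = "example query"

# Stage 1: cheap candidate retrieval with 64-dimensional prefixes
doc_full = model.encode(corpus, normalize_embeddings=True)
q_full = model.encode([query], normalize_embeddings=True)[0]

def truncate_and_norm(x, dim=64):
    # Keep the first `dim` values and re-normalize so cosine scores stay comparable
    x = x[..., :dim]
    return x / np.linalg.norm(x, axis=-1, keepdims=True)

doc_64, q_64 = truncate_and_norm(doc_full), truncate_and_norm(q_full)
candidates = np.argsort(-doc_64 @ q_64)[:2]  # shortlist size is arbitrary here

# Stage 2: rerank the shortlist with full-dimensional embeddings
reranked = sorted(candidates, key=lambda i: -float(doc_full[i] @ q_full))
print(reranked)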


Is This AGI? Google’s Gemini 3 Deep Think Shatters Humanity’s Last …

Google announced a major update to Gemini 3 Deep Think today. This update is specifically built to accelerate modern science, research, and engineering. This seems to be more than just another model release. It represents a pivot toward a ‘reasoning mode’ that uses internal verification to solve problems that previously required human expert intervention.

The updated model is hitting benchmarks that redefine the frontier of intelligence. By focusing on test-time compute—the ability of a model to ‘think’ longer before generating a response—Google is moving beyond simple pattern matching.

https://blog.google/innovation-and-ai/models-and-research/gemini-models/gemini-3-deep-think/

Redefining AGI with 84.6% on ARC-AGI-2

The ARC-AGI benchmark is designed as a stringent test of general intelligence. Unlike traditional benchmarks that reward memorization, ARC-AGI measures a model’s ability to learn new skills and generalize to novel tasks it has never seen. The Google team reported that Gemini 3 Deep Think achieved 84.6% on ARC-AGI-2, a result verified by the ARC Prize Foundation.

A score of 84.6% is a massive leap for the industry. To put this in perspective, humans average about 60% on these visual reasoning puzzles, while previous AI models often struggled to break 20%. This means the model is no longer just predicting the most likely next word. It is developing a flexible internal representation of logic. This capability is critical for R&D environments where engineers deal with messy, incomplete, or novel data that does not exist in a training set.

Passing ‘Humanity’s Last Exam’

Google also set a new standard on Humanity’s Last Exam (HLE), scoring 48.4% (without tools). HLE is a benchmark of thousands of questions written by subject matter experts to sit at the frontier of human knowledge and remain nearly impossible for current AI. These questions span specialized academic topics where data is scarce and logic is dense.

Achieving 48.4% without external search tools is a landmark for reasoning models. This performance indicates that Gemini 3 Deep Think can handle high-level conceptual planning. It can work through multi-step logical chains in fields like advanced law, philosophy, and mathematics without drifting into ‘hallucinations.’ It proves that the model’s internal verification systems are working effectively to prune incorrect reasoning paths.

Competitive Coding: The 3455 Elo Milestone

The most tangible update is in competitive programming. Gemini 3 Deep Think now holds a 3455 Elo score on Codeforces. In the coding world, a 3455 Elo puts the model in the ‘Legendary Grandmaster’ tier, a level reached by only a tiny fraction of human programmers globally.

This score reflects strong algorithmic rigor. The model can handle complex data structures, optimize for time complexity, and solve problems that require careful memory management. It serves as an elite pair programmer and is particularly useful for ‘agentic coding’, where the AI takes a high-level goal and executes a complex, multi-file solution autonomously. In internal testing, the Google team noted that Gemini 3 Pro showed 35% higher accuracy in resolving software engineering challenges than previous versions.

Advancing Science: Physics, Chemistry, and Math

Google’s update is specifically tuned for scientific discovery. Gemini 3 Deep Think achieved gold medal-level results on the written sections of the 2025 International Physics Olympiad and the 2025 International Chemistry Olympiad. It also reached gold-medal level performance on the International Math Olympiad 2025.

Beyond these student-level competitions, the model is performing at a professional research level. It scored 50.5% on the CMT-Benchmark, which tests proficiency in advanced theoretical physics. For researchers and data scientists in biotech or material science, this means the model can assist in interpreting experimental data or modeling physical systems.

Practical Engineering and 3D Modeling

The model’s reasoning isn’t just abstract; it has practical engineering utility. A new capability highlighted by the Google team is the model’s ability to turn a sketch into a 3D-printable object. Deep Think can analyze a 2D drawing, model the complex 3D shapes through code, and generate a final file for a 3D printer.

This reflects the model’s ‘agentic’ nature. It can bridge the gap between a visual idea and a physical product by using code as a tool. For engineers, this reduces the friction between design and prototyping. It also excels at solving complex optimization problems, such as designing recipes for growing thin films in specialized chemical processes.

Key Takeaways

Breakthrough Abstract Reasoning: The model achieved 84.6% on ARC-AGI-2 (verified by the ARC Prize Foundation), proving it can learn novel tasks and generalize logic rather than relying on memorized training data.

Elite Coding Performance: With a 3455 Elo score on Codeforces, Gemini 3 Deep Think performs at the ‘Legendary Grandmaster’ level, outperforming the vast majority of human competitive programmers in algorithmic complexity and system architecture.

New Standard for Expert Logic: It scored 48.4% on Humanity’s Last Exam (without tools), demonstrating the ability to resolve high-level, multi-step logical chains that were previously considered ‘too human’ for AI to solve.

Scientific Olympiad Success: The model achieved gold medal-level results on the written sections of the 2025 International Physics and Chemistry Olympiads, showcasing its capacity for professional-grade research and complex physical modeling.

Scaled Inference-Time Compute: Unlike traditional LLMs, this ‘Deep Think’ mode utilizes test-time compute to internally verify and self-correct its logic before answering, significantly reducing technical hallucinations.


AI meets HR: Transforming talent acquisition with Amazon Bedrock

Organizations face significant challenges in making their recruitment processes more efficient while maintaining fair hiring practices. By using AI to transform their recruitment and talent acquisition processes, organizations can overcome these challenges. AWS offers a suite of AI services that can be used to significantly enhance the efficiency, effectiveness, and fairness of hiring practices. With AWS AI services, specifically Amazon Bedrock, you can build an efficient and scalable recruitment system that streamlines hiring processes, helping human reviewers focus on the interview and assessment of candidates.
In this post, we show how to create an AI-powered recruitment system using Amazon Bedrock, Amazon Bedrock Knowledge Bases, AWS Lambda, and other AWS services to enhance job description creation, candidate communication, and interview preparation while maintaining human oversight.
The AI-powered recruitment lifecycle
The recruitment process presents numerous opportunities for AI enhancement through specialized agents, each powered by Amazon Bedrock and connected to dedicated Amazon Bedrock knowledge bases. Let’s explore how these agents work together across key stages of the recruitment lifecycle.
Job description creation and optimization
Creating inclusive and attractive job descriptions is crucial for attracting diverse talent pools. The Job Description Creation and Optimization Agent uses advanced language models available in Amazon Bedrock and connects to an Amazon Bedrock knowledge base containing your organization’s historical job descriptions and inclusion guidelines.
Deploy the Job Description Agent with a secure Amazon Virtual Private Cloud (Amazon VPC) configuration and AWS Identity and Access Management (IAM) roles. The agent references your knowledge base to optimize job postings while maintaining compliance with organizational standards and inclusive language requirements.
Candidate communication management
The Candidate Communication Agent manages candidate interactions through the following components:

Lambda functions that trigger communications based on workflow stages
Amazon Simple Notification Service (Amazon SNS) for secure email and text delivery
Integration with approval workflows for regulated communications
Automated status updates based on candidate progression

Configure the Communication Agent with proper VPC endpoints and encryption for all data in transit and at rest. Use Amazon CloudWatch monitoring to track communication effectiveness and response rates.
Interview preparation and feedback
The Interview Prep Agent supports the interview process by:

Accessing a knowledge base containing interview questions, SOPs, and best practices
Generating contextual interview materials based on role requirements
Analyzing interviewer feedback and notes using Amazon Bedrock to identify key sentiments and consistent themes across evaluations
Maintaining compliance with interview standards stored in the knowledge base

Although the agent provides interview structure and guidance, interviewers maintain full control over the conversation and evaluation process.
Solution overview
The architecture brings together the recruitment agents and AWS services into a comprehensive recruitment system that enhances and streamlines the hiring process. The following diagram shows how three specialized AI agents work together to manage different aspects of the recruitment process, from job posting creation through summarizing interview feedback. Each agent uses Amazon Bedrock and connects to dedicated Amazon Bedrock knowledge bases while maintaining security and compliance requirements.

The solution consists of three main components working together to improve the recruitment process:

Job Description Creation and Optimization Agent – The Job Description Creation and Optimization Agent uses the AI capabilities of Amazon Bedrock to create and refine job postings, connecting directly to an Amazon Bedrock knowledge base that contains example descriptions and best practices for inclusive language.
Candidate Communication Agent – For candidate communications, the dedicated agent streamlines interactions through an automated system. It uses Lambda functions to manage communication workflows and Amazon SNS for reliable message delivery. The agent maintains direct connections with candidates while making sure communications follow approved templates and procedures.
Interview Prep Agent – The Interview Prep Agent serves as a comprehensive resource for interviewers, providing guidance on interview formats and questions while helping structure, summarize, and analyze feedback. It maintains access to a detailed knowledge base of interview standards and uses the natural language processing capabilities of Amazon Bedrock to analyze interview feedback patterns and themes, helping maintain consistent evaluation practices across hiring teams.

Prerequisites
Before implementing this AI-powered recruitment system, make sure you have the following:

AWS account and access:

An AWS account with administrator access
Access to Amazon Bedrock foundation models (FMs)
Permissions to create and manage IAM roles and policies

AWS services required:

Amazon API Gateway
Amazon Bedrock with access to FMs
Amazon Bedrock Knowledge Bases
Amazon CloudWatch
AWS Key Management Service (AWS KMS)
AWS Lambda
Amazon SNS
Amazon Simple Storage Service (Amazon S3) for knowledge base storage
Amazon VPC

Technical requirements:

Basic knowledge of Python 3.9 or later (for Lambda functions)
Network access to configure VPC endpoints

Security and compliance:

Understanding of AWS security best practices
SSL/TLS certificates for secure communications
Compliance approval from your organization’s security team

In the following sections, we examine the key components that make up our AI-powered recruitment system. Each piece plays a crucial role in creating a secure, scalable, and effective solution. We start with the infrastructure definition and work our way through the deployment, knowledge base integration, core AI agents, and testing tools.
Infrastructure as code
The following AWS CloudFormation template defines the complete AWS infrastructure, including VPC configuration, security groups, Lambda functions, API Gateway, and knowledge bases. It facilitates secure, scalable deployment with proper IAM roles and encryption.

AWSTemplateFormatVersion: '2010-09-09'
Description: 'AI-Powered Recruitment System with Security and Knowledge Bases'

Parameters:
  Environment:
    Type: String
    Default: dev
    AllowedValues: [dev, prod]

Resources:
  # KMS Key for encryption
  RecruitmentKMSKey:
    Type: AWS::KMS::Key
    Properties:
      Description: "Encryption key for recruitment system"
      KeyPolicy:
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
            Action: 'kms:*'
            Resource: '*'

  RecruitmentKMSAlias:
    Type: AWS::KMS::Alias
    Properties:
      AliasName: !Sub 'alias/recruitment-${Environment}'
      TargetKeyId: !Ref RecruitmentKMSKey

  # VPC Configuration
  RecruitmentVPC:
    Type: AWS::EC2::VPC
    Properties:
      CidrBlock: 10.0.0.0/16
      EnableDnsHostnames: true
      EnableDnsSupport: true
      Tags:
        - Key: Name
          Value: !Sub 'recruitment-vpc-${Environment}'

  PrivateSubnet:
    Type: AWS::EC2::Subnet
    Properties:
      VpcId: !Ref RecruitmentVPC
      CidrBlock: 10.0.1.0/24
      AvailabilityZone: !Select [0, !GetAZs '']

  PrivateSubnetRouteTable:
    Type: AWS::EC2::RouteTable
    Properties:
      VpcId: !Ref RecruitmentVPC
      Tags:
        - Key: Name
          Value: !Sub 'recruitment-private-rt-${Environment}'

  PrivateSubnetRouteTableAssociation:
    Type: AWS::EC2::SubnetRouteTableAssociation
    Properties:
      SubnetId: !Ref PrivateSubnet
      RouteTableId: !Ref PrivateSubnetRouteTable

  # Example interface endpoints
  VPCEBedrockRuntime:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref RecruitmentVPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.bedrock-runtime'
      VpcEndpointType: Interface
      SubnetIds: [ !Ref PrivateSubnet ]
      SecurityGroupIds: [ !Ref LambdaSecurityGroup ]

  VPCEBedrockAgent:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref RecruitmentVPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.bedrock-agent'
      VpcEndpointType: Interface
      SubnetIds: [ !Ref PrivateSubnet ]
      SecurityGroupIds: [ !Ref LambdaSecurityGroup ]

  VPCESNS:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref RecruitmentVPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.sns'
      VpcEndpointType: Interface
      SubnetIds: [ !Ref PrivateSubnet ]
      SecurityGroupIds: [ !Ref LambdaSecurityGroup ]

  # Gateway endpoint for S3 (and DynamoDB if you add it later)
  VPCES3:
    Type: AWS::EC2::VPCEndpoint
    Properties:
      VpcId: !Ref RecruitmentVPC
      ServiceName: !Sub 'com.amazonaws.${AWS::Region}.s3'
      VpcEndpointType: Gateway
      RouteTableIds:
        - !Ref PrivateSubnetRouteTable
  # Security Group
  LambdaSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Properties:
      GroupDescription: Security group for recruitment AWS Lambda functions
      VpcId: !Ref RecruitmentVPC
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0

  # KnowledgeBase IAM role
  KnowledgeBaseRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal: { Service: bedrock.amazonaws.com }
            Action: sts:AssumeRole
      Policies:
        - PolicyName: BedrockKBAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:Retrieve
                  - bedrock:RetrieveAndGenerate
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                Resource: '*'   # scope to your KB bucket(s) in real deployments

  JobDescriptionKnowledgeBase:
    Type: AWS::Bedrock::KnowledgeBase
    Properties:
      Name: !Sub 'job-descriptions-${Environment}'
      RoleArn: !GetAtt KnowledgeBaseRole.Arn
      KnowledgeBaseConfiguration:
        Type: VECTOR
        VectorKnowledgeBaseConfiguration:
          EmbeddingModelArn: !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/amazon.titan-embed-text-v1'
      StorageConfiguration:
        Type: S3
        S3Configuration:
          BucketArn: !Sub 'arn:aws:s3:::your-kb-bucket-${Environment}-${AWS::AccountId}-${AWS::Region}'
          BucketOwnerAccountId: !Ref AWS::AccountId

  InterviewKnowledgeBase:
    Type: AWS::Bedrock::KnowledgeBase
    Properties:
      Name: !Sub 'interview-standards-${Environment}'
      RoleArn: !GetAtt KnowledgeBaseRole.Arn
      KnowledgeBaseConfiguration:
        Type: VECTOR
        VectorKnowledgeBaseConfiguration:
          EmbeddingModelArn: !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/amazon.titan-embed-text-v2:0'
      StorageConfiguration:
        Type: S3
        S3Configuration:
          BucketArn: !Sub 'arn:aws:s3:::your-kb-bucket-${Environment}-${AWS::AccountId}-${AWS::Region}'
          BucketOwnerAccountId: !Ref AWS::AccountId

  # CloudTrail for audit logging
  RecruitmentCloudTrail:
    Type: AWS::CloudTrail::Trail
    Properties:
      TrailName: !Sub 'recruitment-audit-${Environment}'
      S3BucketName: !Ref AuditLogsBucket
      IncludeGlobalServiceEvents: true
      IsMultiRegionTrail: true
      EnableLogFileValidation: true
      KMSKeyId: !Ref RecruitmentKMSKey

  AuditLogsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub 'recruitment-audit-logs-${Environment}-${AWS::AccountId}-${AWS::Region}'
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: aws:kms
              KMSMasterKeyID: !Ref RecruitmentKMSKey

  # IAM Role for AWS Lambda functions
  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: BedrockAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - bedrock:InvokeModel
                  - bedrock:Retrieve
                Resource: '*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: !Ref CommunicationTopic
              - Effect: Allow
                Action:
                  - kms:Decrypt
                  - kms:GenerateDataKey
                Resource: !GetAtt RecruitmentKMSKey.Arn
              - Effect: Allow
                Action:
                  - aoss:APIAccessAll
                Resource: '*'

  # SNS Topic for notifications
  CommunicationTopic:
    Type: AWS::SNS::Topic
    Properties:
      TopicName: !Sub 'recruitment-notifications-${Environment}'

  # AWS Lambda Functions
  JobDescriptionFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub 'recruitment-job-description-${Environment}'
      Runtime: python3.11
      Handler: job_description_agent.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          # Code will be deployed separately
          def lambda_handler(event, context):
              return {'statusCode': 200, 'body': 'Placeholder'}
      Timeout: 60

  CommunicationFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub 'recruitment-communication-${Environment}'
      Runtime: python3.11
      Handler: communication_agent.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {'statusCode': 200, 'body': 'Placeholder'}
      Timeout: 60
      Environment:
        Variables:
          SNS_TOPIC_ARN: !Ref CommunicationTopic
          KMS_KEY_ID: !Ref RecruitmentKMSKey
      VpcConfig:
        SecurityGroupIds:
          - !Ref LambdaSecurityGroup
        SubnetIds:
          - !Ref PrivateSubnet

  InterviewFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub 'recruitment-interview-${Environment}'
      Runtime: python3.11
      Handler: interview_agent.lambda_handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          def lambda_handler(event, context):
              return {'statusCode': 200, 'body': 'Placeholder'}
      Timeout: 60

  # API Gateway
  RecruitmentAPI:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: !Sub 'recruitment-api-${Environment}'
      Description: 'API for AI-Powered Recruitment System'

  # API Gateway Resources and Methods
  JobDescriptionResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId: !Ref RecruitmentAPI
      ParentId: !GetAtt RecruitmentAPI.RootResourceId
      PathPart: job-description

  JobDescriptionMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      RestApiId: !Ref RecruitmentAPI
      ResourceId: !Ref JobDescriptionResource
      HttpMethod: POST
      AuthorizationType: NONE
      Integration:
        Type: AWS_PROXY
        IntegrationHttpMethod: POST
        Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${JobDescriptionFunction.Arn}/invocations'

  CommunicationResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId: !Ref RecruitmentAPI
      ParentId: !GetAtt RecruitmentAPI.RootResourceId
      PathPart: communication

  CommunicationMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      RestApiId: !Ref RecruitmentAPI
      ResourceId: !Ref CommunicationResource
      HttpMethod: POST
      AuthorizationType: NONE
      Integration:
        Type: AWS_PROXY
        IntegrationHttpMethod: POST
        Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${CommunicationFunction.Arn}/invocations'

  InterviewResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId: !Ref RecruitmentAPI
      ParentId: !GetAtt RecruitmentAPI.RootResourceId
      PathPart: interview

  InterviewMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      RestApiId: !Ref RecruitmentAPI
      ResourceId: !Ref InterviewResource
      HttpMethod: POST
      AuthorizationType: NONE
      Integration:
        Type: AWS_PROXY
        IntegrationHttpMethod: POST
        Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${InterviewFunction.Arn}/invocations'

  # Lambda Permissions
  JobDescriptionPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref JobDescriptionFunction
      Action: lambda:InvokeFunction
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${RecruitmentAPI}/*/POST/job-description'

  CommunicationPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref CommunicationFunction
      Action: lambda:InvokeFunction
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${RecruitmentAPI}/*/POST/communication'

  InterviewPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref InterviewFunction
      Action: lambda:InvokeFunction
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${RecruitmentAPI}/*/POST/interview'

  # API Deployment
  APIDeployment:
    Type: AWS::ApiGateway::Deployment
    DependsOn:
      - JobDescriptionMethod
      - CommunicationMethod
      - InterviewMethod
      - JobDescriptionPermission
      - CommunicationPermission
      - InterviewPermission
    Properties:
      RestApiId: !Ref RecruitmentAPI
      StageName: !Ref Environment

Outputs:
  APIEndpoint:
    Description: 'API Gateway endpoint URL'
    Value: !Sub 'https://${RecruitmentAPI}.execute-api.${AWS::Region}.amazonaws.com/${Environment}'

  SNSTopicArn:
    Description: 'SNS Topic ARN for notifications'
    Value: !Ref CommunicationTopic

Deployment automation
The following automation script handles deployment of the recruitment system infrastructure and Lambda functions. It manages CloudFormation stack creation and updates as well as Lambda function code deployment, keeping rollouts streamlined and consistent.

#!/usr/bin/env python3
"""
Deployment script for Basic Recruitment System
"""

import boto3
import zipfile
import os
import json
from pathlib import Path

class BasicRecruitmentDeployment:
    def __init__(self, region='us-east-1'):
        self.region = region
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.cf_client = boto3.client('cloudformation', region_name=region)

    def create_lambda_zip(self, function_name):
        """Create deployment zip for Lambda function"""
        zip_path = f"/tmp/{function_name}.zip"

        with zipfile.ZipFile(zip_path, 'w') as zip_file:
            zip_file.write(f"lambda_functions/{function_name}.py", f"{function_name}.py")

        return zip_path

    def update_lambda_function(self, function_name, environment='dev'):
        """Update Lambda function code"""
        zip_path = self.create_lambda_zip(function_name)

        try:
            with open(zip_path, 'rb') as zip_file:
                response = self.lambda_client.update_function_code(
                    FunctionName=f'recruitment-{function_name.replace("_agent", "")}-{environment}',
                    ZipFile=zip_file.read()
                )
            print(f"Updated {function_name}: {response['LastModified']}")
            return response
        except Exception as e:
            print(f"Error updating {function_name}: {e}")
            return None
        finally:
            os.remove(zip_path)

    def deploy_infrastructure(self, environment='dev'):
        """Deploy CloudFormation stack"""
        stack_name = f'recruitment-system-{environment}'

        with open('infrastructure/cloudformation.yaml', 'r') as template_file:
            template_body = template_file.read()

        try:
            response = self.cf_client.create_stack(
                StackName=stack_name,
                TemplateBody=template_body,
                Parameters=[
                    {'ParameterKey': 'Environment', 'ParameterValue': environment}
                ],
                Capabilities=['CAPABILITY_IAM']
            )
            print(f"Created stack: {stack_name}")
            return response
        except self.cf_client.exceptions.AlreadyExistsException:
            response = self.cf_client.update_stack(
                StackName=stack_name,
                TemplateBody=template_body,
                Parameters=[
                    {'ParameterKey': 'Environment', 'ParameterValue': environment}
                ],
                Capabilities=['CAPABILITY_IAM']
            )
            print(f"Updated stack: {stack_name}")
            return response
        except Exception as e:
            print(f"Error with stack: {e}")
            return None

    def deploy_all(self, environment='dev'):
        """Deploy complete system"""
        print(f"Deploying recruitment system to {environment}")

        # Deploy infrastructure
        self.deploy_infrastructure(environment)

        # Wait for stack to be ready (simplified)
        print("Waiting for infrastructure...")

        # Update AWS Lambda functions
        functions = [
            'job_description_agent',
            'communication_agent',
            'interview_agent'
        ]

        for func in functions:
            self.update_lambda_function(func, environment)

        print("Deployment complete!")

def main():
    deployment = BasicRecruitmentDeployment()

    print("Basic Recruitment System Deployment")
    print("1. Deploys CloudFormation stack with AWS Lambda functions and API Gateway")
    print("2. Updates Lambda function code")
    print("3. Sets up SNS for notifications")

    # Example deployment
    # deployment.deploy_all('dev')

if __name__ == "__main__":
    main()

Knowledge base integration
The central knowledge base manager interfaces with Amazon Bedrock knowledge base collections to provide best practices, templates, and standards to the recruitment agents. It enables AI agents to make informed decisions based on organizational knowledge.

import boto3
import json

class KnowledgeBaseManager:
    def __init__(self):
        self.bedrock_runtime = boto3.client('bedrock-runtime')
        self.bedrock_agent_runtime = boto3.client('bedrock-agent-runtime')

    def query_knowledge_base(self, kb_id: str, query: str):
        try:
            response = self.bedrock_agent_runtime.retrieve(
                knowledgeBaseId=kb_id,
                retrievalQuery={'text': query}
                # optionally add retrievalConfiguration={...}
            )
            return [r['content']['text'] for r in response.get('retrievalResults', [])]
        except Exception as e:
            return [f"Knowledge Base query failed: {str(e)}"]

# Knowledge base IDs (to be created via CloudFormation)
KNOWLEDGE_BASES = {
    'job_descriptions': 'JOB_DESC_KB_ID',
    'interview_standards': 'INTERVIEW_KB_ID',
    'communication_templates': 'COMM_KB_ID'
}

To improve Retrieval Augmented Generation (RAG) quality, start by tuning your Amazon Bedrock knowledge bases. Adjust chunk sizes and overlap for your documents, experiment with different embedding models, and enable reranking to promote the most relevant passages. For each agent, you can also choose different foundation models. For example, use a fast model such as Anthropic’s Claude 3 Haiku for high-volume job description and communication tasks, and a more capable model such as Anthropic’s Claude 3 Sonnet or another reasoning-optimized model for the Interview Prep Agent, where deeper analysis is required. Capture these experiments as part of your continuous improvement process so you can standardize on the best-performing configurations.
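One way to capture such choices in code is a per-agent settings map, sketched below. The mapping, model IDs, and knowledge base IDs are illustrative placeholders rather than part of the reference solution; retrievalConfiguration with numberOfResults is the standard parameter for tuning how many chunks each query returns.

import boto3

# Hypothetical per-agent configuration; swap in your own model and knowledge base IDs
AGENT_CONFIG = {
    "job_description": {"model_id": "anthropic.claude-3-haiku-20240307-v1:0", "kb_id": "JOB_DESC_KB_ID", "top_k": 3},
    "communication":   {"model_id": "anthropic.claude-3-haiku-20240307-v1:0", "kb_id": "COMM_KB_ID", "top_k": 3},
    "interview_prep":  {"model_id": "anthropic.claude-3-sonnet-20240229-v1:0", "kb_id": "INTERVIEW_KB_ID", "top_k": 8},
}

agent_runtime = boto3.client("bedrock-agent-runtime")

def retrieve_for_agent(agent_name: str, query: str):
    cfg = AGENT_CONFIG[agent_name]
    # numberOfResults controls how many chunks each agent pulls from its knowledge base
    response = agent_runtime.retrieve(
        knowledgeBaseId=cfg["kb_id"],
        retrievalQuery={"text": query},
        retrievalConfiguration={"vectorSearchConfiguration": {"numberOfResults": cfg["top_k"]}},
    )
    return [r["content"]["text"] for r in response.get("retrievalResults", [])]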
The core AI agents
The system uses three specialized AI agents. Their integration is handled through API Gateway and Lambda, with each agent exposed through its own endpoint.
Job Description Agent
This agent is the first step in the recruitment pipeline. It uses Amazon Bedrock to create inclusive and effective job descriptions by combining requirements with best practices from the knowledge base.

import json
import boto3
from datetime import datetime
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from knowledge_bases import KnowledgeBaseManager, KNOWLEDGE_BASES

bedrock = boto3.client('bedrock-runtime')
kb_manager = KnowledgeBaseManager()

def lambda_handler(event, context):
    """Job Description Agent Lambda function"""

    body = json.loads(event.get('body', '{}'))

    role_title = body.get('role_title', '')
    requirements = body.get('requirements', [])
    company_info = body.get('company_info', {})

    # Query knowledge base for best practices
    kb_context = kb_manager.query_knowledge_base(
        KNOWLEDGE_BASES['job_descriptions'],
        f"inclusive job description examples for {role_title}"
    )

    prompt = f"""Create an inclusive job description for: {role_title}

Requirements: {', '.join(requirements)}
Company: {company_info.get('name', 'Our Company')}
Culture: {company_info.get('culture', 'collaborative')}
Remote: {company_info.get('remote', False)}

Best practices from knowledge base:
{' '.join(kb_context[:2])}

Include: role summary, key responsibilities, qualifications, benefits.
Ensure inclusive language and avoid unnecessary barriers."""

    try:
        response = bedrock.invoke_model(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 2000,
                "messages": [{"role": "user", "content": prompt}]
            })
        )

        result = json.loads(response['body'].read())

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'job_description': result['content'][0]['text'],
                'role_title': role_title,
                'timestamp': datetime.utcnow().isoformat()
            })
        }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Communication Agent
This agent manages candidate communications throughout the recruitment process. It integrates with Amazon SNS for notifications and provides professional, consistent messaging using approved templates.

import json
import boto3
from datetime import datetime

bedrock = boto3.client('bedrock-runtime')
sns = boto3.client('sns')

def lambda_handler(event, context):
    """Communication Agent Lambda function"""

    body = json.loads(event.get('body', '{}'))

    message_type = body.get('message_type', '')
    candidate_info = body.get('candidate_info', {})
    stage = body.get('stage', '')

    prompt = f"""Generate {message_type} for candidate {candidate_info.get('name', 'Candidate')}
at {stage} stage.

Message should be:
- Professional and empathetic
- Clear about next steps
- Appropriate for the stage
- Include timeline if relevant

Types: application_received, interview_invitation, rejection, offer"""

    try:
        response = bedrock.invoke_model(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 1000,
                "messages": [{"role": "user", "content": prompt}]
            })
        )

        result = json.loads(response['body'].read())
        communication = result['content'][0]['text']

        # Send notification via SNS if topic ARN provided
        topic_arn = body.get('sns_topic_arn')
        if topic_arn:
            sns.publish(
                TopicArn=topic_arn,
                Message=communication,
                Subject=f"Recruitment Update - {message_type}"
            )

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'communication': communication,
                'type': message_type,
                'stage': stage,
                'timestamp': datetime.utcnow().isoformat()
            })
        }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Interview Prep Agent
This agent prepares tailored interview materials and questions based on the role and candidate background. It helps maintain consistent interview standards while adapting to specific positions.

import json
import boto3
from datetime import datetime

bedrock = boto3.client('bedrock-runtime')

def lambda_handler(event, context):
    """Interview Prep Agent Lambda function"""

    body = json.loads(event.get('body', '{}'))

    role_info = body.get('role_info', {})
    candidate_background = body.get('candidate_background', {})

    prompt = f"""Prepare interview for:
Role: {role_info.get('title', 'Position')}
Level: {role_info.get('level', 'Mid-level')}
Key Skills: {role_info.get('key_skills', [])}

Candidate Background:
Experience: {candidate_background.get('experience', 'Not specified')}
Skills: {candidate_background.get('skills', [])}

Generate:
1. 5-7 technical questions
2. 3-4 behavioral questions
3. Evaluation criteria
4. Red flags to watch for"""

    try:
        response = bedrock.invoke_model(
            modelId="anthropic.claude-3-haiku-20240307-v1:0",
            body=json.dumps({
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 2000,
                "messages": [{"role": "user", "content": prompt}]
            })
        )

        result = json.loads(response['body'].read())

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps({
                'interview_prep': result['content'][0]['text'],
                'role': role_info.get('title'),
                'timestamp': datetime.utcnow().isoformat()
            })
        }

    except Exception as e:
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Testing and verification
The following test client demonstrates interaction with the recruitment system API. It provides example usage of major functions and helps verify system functionality.

#!/usr/bin/env python3
"""
Test client for Basic Recruitment System API
"""

import requests
import json

class RecruitmentClient:
    def __init__(self, api_endpoint):
        self.api_endpoint = api_endpoint.rstrip('/')

    def create_job_description(self, role_title, requirements, company_info):
        """Test job description creation"""
        url = f"{self.api_endpoint}/job-description"
        payload = {
            "role_title": role_title,
            "requirements": requirements,
            "company_info": company_info
        }

        response = requests.post(url, json=payload)
        return response.json()

    def send_communication(self, message_type, candidate_info, stage):
        """Test communication sending"""
        url = f"{self.api_endpoint}/communication"
        payload = {
            "message_type": message_type,
            "candidate_info": candidate_info,
            "stage": stage
        }

        response = requests.post(url, json=payload)
        return response.json()

    def prepare_interview(self, role_info, candidate_background):
        """Test interview preparation"""
        url = f"{self.api_endpoint}/interview"
        payload = {
            "role_info": role_info,
            "candidate_background": candidate_background
        }

        response = requests.post(url, json=payload)
        return response.json()

def main():
    # Replace with your actual API endpoint
    api_endpoint = "https://your-api-id.execute-api.us-east-1.amazonaws.com/dev"
    client = RecruitmentClient(api_endpoint)

    print("Testing Basic Recruitment System")

    # Test job description
    print("\n1. Testing Job Description Creation:")
    job_result = client.create_job_description(
        role_title="Senior Software Engineer",
        requirements=["5+ years Python", "AWS experience", "Team leadership"],
        company_info={"name": "TechCorp", "culture": "collaborative", "remote": True}
    )
    print(json.dumps(job_result, indent=2))

    # Test communication
    print("\n2. Testing Communication:")
    comm_result = client.send_communication(
        message_type="interview_invitation",
        candidate_info={"name": "Jane Smith", "email": "jane@example.com"},
        stage="initial_interview"
    )
    print(json.dumps(comm_result, indent=2))

    # Test interview prep
    print("\n3. Testing Interview Preparation:")
    interview_result = client.prepare_interview(
        role_info={
            "title": "Senior Software Engineer",
            "level": "Senior",
            "key_skills": ["Python", "AWS", "Leadership"]
        },
        candidate_background={
            "experience": "8 years software development",
            "skills": ["Python", "AWS", "Team Lead"]
        }
    )
    print(json.dumps(interview_result, indent=2))

if __name__ == "__main__":
    main()

During testing, track both qualitative and quantitative results. For example, measure recruiter satisfaction with generated job descriptions, response rates to candidate communications, and interviewers’ feedback on the usefulness of prep materials. Use these metrics to refine prompts, knowledge base contents, and model choices over time.
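One lightweight way to capture these signals is to publish them as CloudWatch custom metrics; the namespace and metric names in this sketch are illustrative, not part of the reference architecture.

import boto3

cloudwatch = boto3.client("cloudwatch")

def record_feedback(metric_name: str, value: float, stage: str):
    # Publish a custom metric, e.g. a 1-5 recruiter satisfaction score or a response-rate percentage
    cloudwatch.put_metric_data(
        Namespace="RecruitmentSystem",  # illustrative namespace
        MetricData=[{
            "MetricName": metric_name,
            "Dimensions": [{"Name": "Stage", "Value": stage}],
            "Value": value,
        }],
    )

record_feedback("JobDescriptionSatisfaction", 4.0, "job_description")
record_feedback("CandidateResponseRate", 62.5, "communication")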
Clean up
To avoid ongoing charges when you’re done testing or if you want to tear down this solution, follow these steps in order:

Delete Lambda resources:

Delete all functions created for the agents.
Remove associated CloudWatch log groups.

Delete API Gateway endpoints:

Delete the API configurations.
Remove any custom domain names.

Delete the vector store collections backing the knowledge bases (for example, Amazon OpenSearch Serverless):

Delete all collections.
Remove any custom access and network policies.
Wait for collections to be fully deleted before continuing to the next steps.

Delete SNS topics:

Delete all topics created for communications.
Remove any subscriptions.

Delete VPC resources:

Remove VPC endpoints.
Delete security groups.
Delete the VPC if it was created specifically for this solution.

Clean up IAM resources:

Delete IAM roles created for the solution.
Remove any associated policies.
Delete service-linked roles if no longer needed.

Delete KMS keys:

Schedule key deletion for unused KMS keys (keep keys if they’re used by other applications).

Delete CloudWatch resources:

Delete dashboards.
Delete alarms.
Delete any custom metrics.

Clean up S3 buckets:

Empty buckets used for knowledge bases.
Delete the buckets.

Delete the Amazon Bedrock knowledge base.

After cleanup, take these steps to verify all charges are stopped:

Check your AWS bill for the next billing cycle
Verify all services have been properly terminated
Contact AWS Support if you notice any unexpected charges

Document the resources you’ve created and use this list as a checklist during cleanup to make sure you don’t miss any components that could continue to generate charges.
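Because the resources in this post are created by a single CloudFormation stack, much of the teardown can be scripted. A minimal sketch follows; the stack name assumes the dev environment used by the deployment script, and the bucket name is a placeholder you should replace with your own.

import boto3

# S3 buckets must be emptied before CloudFormation can delete them; bucket name is a placeholder
s3 = boto3.resource("s3")
bucket = s3.Bucket("recruitment-audit-logs-dev-123456789012-us-east-1")
bucket.objects.all().delete()

# Deleting the stack removes the Lambda functions, API Gateway, IAM roles, SNS topic, and VPC resources it created
cf = boto3.client("cloudformation")
cf.delete_stack(StackName="recruitment-system-dev")
cf.get_waiter("stack_delete_complete").wait(StackName="recruitment-system-dev")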
Implementing AI in recruitment: Best practices
To successfully implement AI in recruitment while maintaining ethical standards and human oversight, consider these essential practices.
Security, compliance, and infrastructure
The security implementation should follow a comprehensive approach to protect all aspects of the recruitment system. The solution deploys within a properly configured VPC with carefully defined security groups. All data, whether at rest or in transit, should be protected through AWS KMS encryption, and IAM roles are implemented following strict least privilege principles. The system maintains complete visibility through CloudWatch monitoring and audit logging, with secure API Gateway endpoints managing external communications. To protect sensitive information, implement data tokenization for personally identifiable information (PII) and maintain strict data retention policies. Regular privacy impact assessments and documented incident response procedures support ongoing security compliance.
Consider implementing Amazon Bedrock Guardrails to gain granular control over AI model outputs, helping you enforce consistent safety and compliance standards across your AI applications. By implementing rule-based filters and boundaries, teams can prevent inappropriate content, maintain professional communication standards, and make sure responses align with their organization’s policies. You can configure guardrails at multiple levels, from individual agents to organization-wide implementations, with customizable controls for content filtering, topic restrictions, and response parameters. This systematic approach helps organizations mitigate risks while using AI capabilities, particularly in regulated industries or customer-facing applications where maintaining appropriate, unbiased, and safe interactions is crucial.
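As a sketch of how a guardrail attaches at invocation time, the snippet below passes a guardrail identifier and version to the model call; the guardrail ID, version, and prompt are placeholders you would create and configure separately, not values from this post.

import json
import boto3

bedrock = boto3.client("bedrock-runtime")

# Placeholder identifiers for a guardrail configured with PII filters and topic restrictions
GUARDRAIL_ID = "your-guardrail-id"
GUARDRAIL_VERSION = "1"

response = bedrock.invoke_model(
    modelId="anthropic.claude-3-haiku-20240307-v1:0",
    guardrailIdentifier=GUARDRAIL_ID,
    guardrailVersion=GUARDRAIL_VERSION,
    body=json.dumps({
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 500,
        "messages": [{"role": "user", "content": "Draft a rejection email for a candidate."}],
    }),
)
print(json.loads(response["body"].read())["content"][0]["text"])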
Knowledge base architecture and management
The knowledge base architecture should follow a hub-and-spoke model centered around a core repository of organizational knowledge. This central hub maintains essential information including company values, policies, and requirements, along with shared reference data used across the agents. Version control and backup procedures maintain data integrity and availability.
Surrounding this central hub, specialized knowledge bases serve each agent’s unique needs. The Job Description Agent accesses writing guidelines and inclusion requirements. The Communication Agent draws from approved message templates and workflow definitions, and the Interview Prep Agent uses comprehensive question banks and evaluation criteria.
System integration and workflows
Successful system operation relies on robust integration practices and clearly defined workflows. Error handling and retry mechanisms facilitate reliable operation, and clear handoff points between agents maintain process integrity. The system should maintain detailed documentation of dependencies and data flows, with circuit breakers protecting against cascade failures. Regular testing through automated frameworks and end-to-end workflow validation supports consistent performance and reliability.
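A minimal sketch of the retry-with-backoff and circuit-breaker ideas in plain Python follows; the thresholds, backoff values, and the wrapped call are illustrative rather than part of the reference architecture.

import time

class CircuitBreaker:
    """Trips open after repeated failures so a struggling downstream agent is not hammered."""
    def __init__(self, max_failures=3, reset_after=60.0):
        self.max_failures, self.reset_after = max_failures, reset_after
        self.failures, self.opened_at = 0, None

    def call(self, fn, *args, retries=2, backoff=1.0, **kwargs):
        if self.opened_at and time.time() - self.opened_at < self.reset_after:
            raise RuntimeError("circuit open; skipping call")
        last_exc = None
        for attempt in range(retries + 1):
            try:
                result = fn(*args, **kwargs)
                self.failures, self.opened_at = 0, None  # success resets the breaker
                return result
            except Exception as exc:
                last_exc = exc
                self.failures += 1
                if self.failures >= self.max_failures:
                    self.opened_at = time.time()  # trip the breaker
                    break
                time.sleep(backoff * (2 ** attempt))  # exponential backoff between retries
        raise last_exc

breaker = CircuitBreaker()
# Example: breaker.call(kb_manager.query_knowledge_base, kb_id, "inclusive job description examples")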
Human oversight and governance
The AI-powered recruitment system should prioritize human oversight and governance to promote ethical and fair practices. Establish mandatory review checkpoints throughout the process where human recruiters assess AI recommendations and make final decisions. To handle exceptional cases, create clear escalation paths that allow for human intervention when needed. Sensitive actions, such as final candidate selections or offer approvals, should be subject to multi-level human approval workflows.
To maintain high standards, continuously monitor decision quality and accuracy, comparing AI recommendations with human decisions to identify areas for improvement. The team should undergo regular training programs to stay updated on the system’s capabilities and limitations, making sure they can effectively oversee and complement the AI’s work. Document clear override procedures so recruiters can adjust or override AI decisions when necessary. Regular compliance training for team members reinforces the commitment to ethical AI use in recruitment.
Performance and cost management
To optimize system efficiency and manage costs effectively, implement a multi-faceted approach. Automatic scaling for Lambda functions makes sure the system can handle varying workloads without unnecessary resource allocation. For predictable workloads, use AWS Savings Plans to reduce costs without sacrificing performance. You can estimate the solution costs using the AWS Pricing Calculator, which helps plan for services like Amazon Bedrock, Lambda, and Amazon Bedrock Knowledge Bases.
Comprehensive CloudWatch dashboards provide real-time visibility into system performance, facilitating quick identification and addressing of issues. Establish performance baselines and regularly monitor against these to detect deviations or areas for improvement. Cost allocation tags help track expenses across different departments or projects, enabling more accurate budgeting and resource allocation.
To avoid unexpected costs, configure budget alerts that notify the team when spending approaches predefined thresholds. Regular capacity planning reviews make sure the infrastructure keeps pace with organizational growth and changing recruitment needs.
Continuous improvement framework
Commitment to excellence should be reflected in a continuous improvement framework. Conduct regular metric reviews and gather stakeholder feedback to identify areas for enhancement. A/B testing of new features or process changes allows for data-driven decisions about improvements. Maintain a comprehensive system of documentation, capturing lessons learned from each iteration or challenge encountered. This knowledge informs ongoing training data updates, making sure AI models remain current and effective. The improvement cycle should include regular system optimization, where algorithms are fine-tuned, knowledge bases updated, and workflows refined based on performance data and user feedback. Closely analyze performance trends over time, allowing proactive addressing of potential issues and capitalization on successful strategies. Stakeholder satisfaction should be a key metric in the improvement framework. Regularly gather feedback from recruiters, hiring managers, and candidates to verify if the AI-powered system meets the needs of all parties involved in the recruitment process.
Solution evolution and agent orchestration
As AI implementations mature and organizations develop multiple specialized agents, the need for sophisticated orchestration becomes critical. Amazon Bedrock AgentCore provides the foundation for managing this evolution, facilitating seamless coordination and communication between agents while maintaining centralized control. This orchestration layer streamlines the management of complex workflows, optimizes resource allocation, and supports efficient task routing based on agent capabilities. By implementing Amazon Bedrock AgentCore as part of your solution architecture, organizations can scale their AI operations smoothly, maintain governance standards, and support increasingly complex use cases that require collaboration between multiple specialized agents. This systematic approach to agent orchestration helps future-proof your AI infrastructure while maximizing the value of your agent-based solutions.
Conclusion
AWS AI services offer specific capabilities that can be used to transform recruitment and talent acquisition processes. By using these services and maintaining a strong focus on human oversight, organizations can create more efficient, fair, and effective hiring practices. The goal of AI in recruitment is not to replace human decision-making, but to augment and support it, helping HR professionals focus on the most valuable aspects of their roles: building relationships, assessing cultural fit, and making nuanced decisions that impact people’s careers and organizational success. As you embark on your AI-powered recruitment journey, start small, focus on tangible improvements, and keep the candidate and employee experience at the forefront of your efforts. With the right approach, AI can help you build a more diverse, skilled, and engaged workforce, driving your organization’s success in the long term.
For more information about AI-powered solutions on AWS, refer to the following resources:

Amazon Bedrock blog posts
Responsible AI

About the Authors
Dola Adesanya is a Customer Solutions Manager at Amazon Web Services (AWS), where she leads high-impact programs across customer success, cloud transformation, and AI-driven system delivery. With a unique blend of business strategy and organizational psychology expertise, she specializes in turning complex challenges into actionable solutions. Dola brings extensive experience in scaling programs and delivering measurable business outcomes.
Ron Hayman leads Customer Solutions for US Enterprise and Software Internet & Foundation Models at Amazon Web Services (AWS). His organization helps customers migrate infrastructure, modernize applications, and implement generative AI solutions. Over his 20-year career as a global technology executive, Ron has built and scaled cloud, security, and customer success teams. He combines deep technical expertise with a proven track record of developing leaders, organizing teams, and delivering customer outcomes.
Achilles Figueiredo is a Senior Solutions Architect at Amazon Web Services (AWS), where he designs and implements enterprise-scale cloud architectures. As a trusted technical advisor, he helps organizations navigate complex digital transformations while implementing innovative cloud solutions. He actively contributes to AWS’s technical advancement through AI, Security, and Resilience initiatives and serves as a key resource for both strategic planning and hands-on implementation guidance.
Sai Jeedigunta is a Sr. Customer Solutions Manager at AWS. He is passionate about partnering with executives and cross-functional teams in driving cloud transformation initiatives and helping them realize the benefits of cloud. He has over 20 years of experience in leading IT infrastructure engagements for fortune enterprises.

Build long-running MCP servers on Amazon Bedrock AgentCore with Strand …

AI agents are rapidly evolving from mere chat interfaces into sophisticated autonomous workers that handle complex, time-intensive tasks. As organizations deploy agents to train machine learning (ML) models, process large datasets, and run extended simulations, the Model Context Protocol (MCP) has emerged as a standard for agent-server integrations. But a critical challenge remains: these operations can take minutes or hours to complete, far exceeding typical session timeframes. By using Amazon Bedrock AgentCore and Strands Agents to implement persistent state management, you can enable seamless, cross-session task execution in production environments. Imagine your AI agent initiating a multi-hour data processing job, your user closing their laptop, and the system seamlessly retrieving completed results when the user returns days later—with full visibility into task progress, outcomes, and errors. This capability transforms AI agents from conversational assistants into reliable autonomous workers that can handle enterprise-scale operations. Without these architectural patterns, you’ll encounter timeout errors, inefficient resource utilization, and potential data loss when connections terminate unexpectedly.
In this post, we provide you with a comprehensive approach to achieve this. First, we introduce a context message strategy that maintains continuous communication between servers and clients during extended operations. Next, we develop an asynchronous task management framework that allows your AI agents to initiate long-running processes without blocking other operations. Finally, we demonstrate how to bring these strategies together with Amazon Bedrock AgentCore and Strands Agents to build production-ready AI agents that can handle complex, time-intensive operations reliably.
Common approaches to handle long-running tasks
When designing MCP servers for long-running tasks, you might face a fundamental architectural decision: should the server maintain an active connection and provide real-time updates, or should it decouple task execution from the initial request? This choice leads to two distinct approaches: context messaging and async task management.
Using context messaging
The context messaging approach maintains continuous communication between the MCP server and client throughout task execution. This is achieved by using MCP’s built-in context object to send periodic notifications to the client. This approach is optimal for scenarios where tasks are typically completed within 10–15 minutes and network connectivity remains stable. The context messaging approach offers these advantages:

Straightforward server-side implementation
No additional polling logic required
Straightforward client implementation
Minimal overhead

Using async task management
The async task management approach separates task initiation from execution and result retrieval. After executing the MCP tool, the tool immediately returns a task initiation message while executing the task in the background. This approach excels in demanding enterprise scenarios where tasks might run for hours, users need flexibility to disconnect and reconnect, and system reliability is paramount. The async task management approach provides these benefits:

True fire-and-forget operation
Safe client disconnection while tasks continue processing
Data loss prevention through persistent storage
Support for long-running operations (hours)
Resilience against network interruptions
Asynchronous workflows

Context messaging
Let’s begin by exploring the context messaging approach, which provides a straightforward solution for handling moderately long operations while maintaining active connections. This approach builds directly on existing capabilities of MCP and requires minimal additional infrastructure, making it an excellent starting point for extending your agent’s processing time limits. Imagine you’ve built an MCP server for an AI agent that helps data scientists train ML models. When a user asks the agent to train a complex model, the underlying process might take 10–15 minutes—far beyond the typical 30-second to 2-minute HTTP timeout limit in most environments. Without a proper strategy, the connection would drop, the operation would fail, and the user would be left frustrated. In a Streamable HTTP transport for MCP client implementation, these timeout constraints are particularly limiting. When task execution exceeds the timeout limit, the connection aborts and the agent’s workflow interrupts. This is where context messaging comes in. The following diagram illustrates the workflow when implementing the context messaging approach. Context messaging uses the built-in context object of MCP to send periodic signals from the server to the MCP client, effectively keeping the connection alive throughout longer operations. Think of it as sending “heartbeat” messages that help prevent the connection from timing out.

Figure 1: Illustration of workflow in context messaging approach

Here is a code example to implement the context messaging:

from mcp.server.fastmcp import Context, FastMCP
import asyncio

mcp = FastMCP(host="0.0.0.0", stateless_http=True)

@mcp.tool()
async def model_training(model_name: str, epochs: int, ctx: Context) -> str:
    """Execute a task with progress updates."""

    for i in range(epochs):
        # Simulate long-running training work
        progress = (i + 1) / epochs
        await asyncio.sleep(5)
        await ctx.report_progress(
            progress=progress,
            total=1.0,
            message=f"Step {i + 1}/{epochs}",
        )

    return f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle . The model training score is 0.87, validation score is 0.82."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

The key element here is the Context parameter in the tool definition. When you include a parameter with the Context type annotation, FastMCP automatically injects this object, giving you access to methods such as ctx.info() and ctx.report_progress(). These methods send messages to the connected client without terminating tool execution.
The report_progress() calls within the training loop serve as those critical heartbeat messages, making sure the MCP connection remains active throughout the extended processing period.
For many real-world scenarios, exact progress can’t be easily quantified—such as when processing unpredictable datasets or making external API calls. In these cases, you can implement a time-based heartbeat system:

from mcp.server.fastmcp import Context, FastMCP
import time
import asyncio

mcp = FastMCP(host="0.0.0.0", stateless_http=True)

@mcp.tool()
async def model_training(model_name: str, epochs: int, ctx: Context) -> str:
    """Execute a task with progress updates."""
    done_event = asyncio.Event()
    start_time = time.time()

    async def timer():
        while not done_event.is_set():
            elapsed = time.time() - start_time
            await ctx.info(f"Processing ......: {elapsed:.1f} seconds elapsed")
            await asyncio.sleep(5)  # Check every 5 seconds
        return

    timer_task = asyncio.create_task(timer())

    ## main task#####################################
    for i in range(epochs):
        # Simulate long-running training work
        progress = (i + 1) / epochs
        await asyncio.sleep(5)
    #################################################

    # Signal the timer to stop and clean up
    done_event.set()
    await timer_task

    total_time = time.time() - start_time
    print(f"⏱ Total processing time: {total_time:.2f} seconds")

    return f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle . The model training score is 0.87, validation score is 0.82."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

This pattern creates an asynchronous timer that runs alongside your main task, sending regular status updates every few seconds. Using asyncio.Event() for coordination facilitates clean shutdown of the timer when the main work is completed.
When to use context messaging
Context messaging works best when:

Tasks take 1–15 minutes to complete*
Network connections are generally stable
The client session can remain active throughout the operation
You need real-time progress updates during processing
Tasks have predictable, finite execution times with clear termination conditions

*Note: "15 minutes" is based on the maximum duration Amazon Bedrock AgentCore allows for synchronous requests. More details about Bedrock AgentCore service quotas can be found at Quotas for Amazon Bedrock AgentCore. If the infrastructure hosting the agent doesn't implement hard time limits, be extremely cautious when using this approach for tasks that might hang or run indefinitely. Without proper safeguards, a stuck task could hold an open connection indefinitely, leading to resource depletion, unresponsive processes, and potentially system-wide stability issues.
Here are some important limitations to consider:

Continuous connection required – The client session must remain active throughout the entire operation. If the user closes their browser or the network drops, the work is lost.
Resource consumption – Keeping connections open consumes server and client resources, potentially increasing costs for long-running operations.
Network dependency – Network instability can still interrupt the process, requiring a full restart.
Ultimate timeout limits – Most infrastructures have hard timeout limits that can’t be circumvented with heartbeat messages.

Therefore, for truly long-running operations that might take hours or for scenarios where users need to disconnect and reconnect later, you’ll need the more robust asynchronous task management approach.
Async task management
Unlike the context messaging approach where clients must maintain continuous connections, the async task management pattern follows a “fire and forget” model:

Task initiation – Client makes a request to start a task and immediately receives a task ID
Background processing – Server executes the work asynchronously, with no client connection required
Status checking – Client can reconnect at any time to check progress using the task ID
Result retrieval – When tasks are completed, results remain available for retrieval whenever the client reconnects

The following figure illustrates the workflow in the asynchronous task management approach.

Figure 2: Illustration of workflow in asynchronous task management approach

This pattern mirrors how you interact with batch processing systems in enterprise environments—submit a job, disconnect, and check back later when convenient. Here’s a practical implementation that demonstrates these principles:

from mcp.server.fastmcp import Context, FastMCP
import asyncio
import uuid
from typing import Dict, Any

mcp = FastMCP(host="0.0.0.0", stateless_http=True)

# task storage
tasks: Dict[str, Dict[str, Any]] = {}

async def _execute_model_training(
        task_id: str,
        model_name: str,
        epochs: int
    ):
    """Background task execution."""
    tasks[task_id]["status"] = "running"

    for i in range(epochs):
        tasks[task_id]["progress"] = (i + 1) / epochs
        await asyncio.sleep(2)

    tasks[task_id]["result"] = f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle . The model training score is 0.87, validation score is 0.82."

    tasks[task_id]["status"] = "completed"

@mcp.tool()
def model_training(
    model_name: str,
    epochs: int = 10
    ) -> str:
    """Start model training task."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = {
        "status": "started",
        "progress": 0.0,
        "task_type": "model_training"
    }
    asyncio.create_task(_execute_model_training(task_id, model_name, epochs))
    return f"Model Training task has been initiated with task ID: {task_id}. Please check back later to monitor completion status and retrieve results."

@mcp.tool()
def check_task_status(task_id: str) -> Dict[str, Any]:
    """Check the status of a running task."""
    if task_id not in tasks:
        return {"error": "task not found"}

    task = tasks[task_id]
    return {
        "task_id": task_id,
        "status": task["status"],
        "progress": task["progress"],
        "task_type": task.get("task_type", "unknown")
    }

@mcp.tool()
def get_task_results(task_id: str) -> Dict[str, Any]:
    """Get results from a completed task."""
    if task_id not in tasks:
        return {"error": "task not found"}

    task = tasks[task_id]
    if task["status"] != "completed":
        return {"error": f"task not completed. Current status: {task['status']}"}

    return {
        "task_id": task_id,
        "status": task["status"],
        "result": task["result"]
    }

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

This implementation creates a task management system with three distinct MCP tools:

model_training() – The entry point that initiates a new task. Rather than performing the work directly, it:

Generates a unique task identifier using Universally Unique Identifier (UUID)
Creates an initial task record in the storage dictionary
Launches the actual processing as a background task using asyncio.create_task()
Returns immediately with the task ID, allowing the client to disconnect

check_task_status() – Allows clients to monitor progress at their convenience by:

Looking up the task by ID in the storage dictionary
Returning current status and progress information
Providing appropriate error handling for missing tasks

get_task_results() – Retrieves completed results when ready by:

Verifying the task exists and is completed
Returning the results stored during background processing
Providing clear error messages when results aren’t ready

The actual work happens in the private _execute_model_training() function, which runs independently in the background after the initial client request is completed. It updates the task’s status and progress in the shared storage as it progresses, making this information available for subsequent status checks.
Limitations to consider
Although the async task management approach helps solve connectivity issues, it introduces its own set of limitations:

User experience friction – The approach requires users to manually check task status, remember task IDs across sessions, and explicitly request results, increasing interaction complexity.
Volatile memory storage – Using in-memory storage (as in our example) means the tasks and results are lost if the server restarts, making the solution unsuitable for production without persistent storage.
Serverless environment constraints – In ephemeral serverless environments, instances are automatically terminated after periods of inactivity, causing the in-memory task state to be permanently lost. This creates a paradoxical situation where the solution designed to handle long-running operations becomes vulnerable to the exact durations it aims to support. Unless users check in regularly to keep the session from timing out, both tasks and results could vanish.

Moving toward a robust solution
To address these critical limitations, you need to include external persistence that survives both server restarts and instance terminations. This is where integration with dedicated storage services becomes essential. By using external agent memory storage systems, you can fundamentally change where and how task information is maintained. Instead of relying on the MCP server’s volatile memory, this approach uses persistent external agent memory storage services that remain available regardless of server state.
The key innovation in this enhanced approach is that when the MCP server runs a long-running task, it writes the interim or final results directly into external memory storage, such as Amazon Bedrock AgentCore Memory that the agent can access, as illustrated in the following figure. This helps create resilience against two types of runtime failures:

The instance running the MCP server can be terminated due to inactivity after task completion
The instance hosting the agent itself can be recycled in ephemeral serverless environments

Figure 3. MCP integration with external memory

With external memory storage, when users return to interact with the agent—whether minutes, hours, or days later—the agent can retrieve the completed task results from persistent storage. This approach minimizes runtime dependencies: even if both the MCP server and agent instances are terminated, the task results remain safely preserved and accessible when needed.
The next section will explore how to implement this robust solution using Amazon Bedrock AgentCore Runtime as a serverless hosting environment, AgentCore Memory for persistent agent memory storage, and the Strands Agents framework to orchestrate these components into a cohesive system that maintains task state across session boundaries.
Amazon Bedrock AgentCore and Strands Agents implementation
Before diving into the implementation details, it's important to understand the deployment options available for MCP servers on Amazon Bedrock AgentCore. There are two primary approaches: Amazon Bedrock AgentCore Gateway and AgentCore Runtime. AgentCore Gateway has a 5-minute timeout for invocations, making it unsuitable for hosting MCP servers that provide tools requiring extended response times or long-running operations. AgentCore Runtime offers significantly more flexibility, with a 15-minute request timeout (for synchronous requests), an adjustable maximum session duration (for asynchronous processes; the default is 8 hours), and an adjustable idle session timeout. Although you could host an MCP server in a traditional serverful environment for unlimited execution time, AgentCore Runtime provides an optimal balance for most production scenarios. You gain serverless benefits such as automatic scaling, pay-per-use pricing, and no infrastructure management, while the adjustable maximum session duration covers most real-world long-running tasks, from data processing and model training to report generation and complex simulations. You can use this approach to build sophisticated AI agents without the operational overhead of managing servers, reserving serverful deployments for the rare cases that genuinely require multiday executions. For more information about AgentCore Runtime and AgentCore Gateway service quotas, refer to Quotas for Amazon Bedrock AgentCore.
Next, we walk through the implementation, which is illustrated in the following diagram. This implementation consists of two interconnected components: the MCP server that executes long-running tasks and writes results to AgentCore Memory, and the agent that manages the conversation flow and retrieves those results when needed. This architecture creates a seamless experience where users can disconnect during lengthy processes and return later to find their results waiting for them.

MCP server implementation
Let’s examine how our MCP server implementation uses AgentCore Memory to achieve persistence:

from mcp.server.fastmcp import Context, FastMCP
import asyncio
import uuid
from typing import Dict, Any
import json
from bedrock_agentcore.memory import MemoryClient

mcp = FastMCP(host="0.0.0.0", stateless_http=True)
agentcore_memory_client = MemoryClient()

async def _execute_model_training(
        model_name: str,
        epochs: int,
        session_id: str,
        actor_id: str,
        memory_id: str
    ):
    """Background task execution."""

    for i in range(epochs):
        await asyncio.sleep(2)

    try:
        response = agentcore_memory_client.create_event(
            memory_id=memory_id,
            actor_id=actor_id,
            session_id=session_id,
            messages=[
                (
                    json.dumps({
                        "message": {
                            "role": "user",
                            "content": [
                                {
                                    "text": f"{model_name} training completed. The model artifact is stored in s3://templocation/model.pickle . The model training score is 0.87, validation score is 0.82."
                                }
                            ]
                        },
                        "message_id": 0
                    }),
                    'USER'
                )
            ]
        )
        print(response)
    except Exception as e:
        print(f"Memory save error: {e}")

    return

@mcp.tool()
def model_training(
        model_name: str,
        epochs: int,
        ctx: Context
    ) -> str:
    """Start model training task."""

    print(ctx.request_context.request.headers)
    mcp_session_id = ctx.request_context.request.headers.get("mcp-session-id", "")
    temp_id_list = mcp_session_id.split("@@@")
    session_id = temp_id_list[0]
    memory_id = temp_id_list[1]
    actor_id = temp_id_list[2]

    asyncio.create_task(_execute_model_training(
            model_name,
            epochs,
            session_id,
            actor_id,
            memory_id
        )
    )
    return f"Model {model_name} training task has been initiated. Total training epochs are {epochs}. The results will be updated once the training is completed."

if __name__ == "__main__":
    mcp.run(transport="streamable-http")

The implementation relies on two key components that enable persistence and session management.

The agentcore_memory_client.create_event() method serves as the bridge between tool execution and persistent memory storage. When a background task is completed, this method saves the results directly to the agent’s memory in AgentCore Memory using the specified memory ID, actor ID, and session ID. Unlike traditional approaches where results might be stored temporarily or require manual retrieval, this integration enables task outcomes to become permanent parts of the agent’s conversational memory. The agent can then reference these results in future interactions, creating a continuous knowledge-building experience across multiple sessions.
The second crucial component involves extracting session context through ctx.request_context.request.headers.get(“mcp-session-id”, “”). The “Mcp-Session-Id” is part of standard MCP protocol. You can use this header to pass a composite identifier containing three essential pieces of information in a delimited format: session_id@@@memory_id@@@actor_id. This approach allows our implementation to retrieve the necessary context identifiers from a single header value. Headers are used instead of environment variables by necessity—these identifiers change dynamically with each conversation, whereas environment variables remain static from container startup. This design choice is particularly important in multi-tenant scenarios where a single MCP server simultaneously handles requests from multiple users, each with their own distinct session context.
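
To make the delimited format concrete, here is a minimal sketch (not from the original post) of hypothetical helper functions that pack and unpack the composite identifier; the @@@ delimiter follows the convention described above:

def pack_mcp_session_id(session_id: str, memory_id: str, actor_id: str) -> str:
    # Combine the three identifiers into a single Mcp-Session-Id header value
    return "@@@".join([session_id, memory_id, actor_id])

def unpack_mcp_session_id(header_value: str) -> tuple:
    # Recover session_id, memory_id, and actor_id on the MCP server side
    session_id, memory_id, actor_id = header_value.split("@@@")
    return session_id, memory_id, actor_id

# Example: pack_mcp_session_id("sess-123", "mem-456", "actor-789")
# yields "sess-123@@@mem-456@@@actor-789", which the server splits back apart.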

Another important aspect in this example involves proper message formatting when storing events. Each message saved to AgentCore Memory requires two components: the content and a role identifier. These two components need to be formatted in a way that the agent framework can recognize. Here is an example for the Strands Agents framework:

messages=[
    (
        json.dumps({
            "message": {
                "role": "user",
                "content": [
                    {
                        "text": <message to the memory>
                    }
                ]
            },
            "message_id": 0
        }),
        'USER'
    )
]

The content is an inner JSON object (serialized with json.dumps()) that contains the message details, including role, text content, and message ID. The outer role identifier (USER in this example) helps AgentCore Memory categorize the message source.
Strands Agents implementation
Integrating Amazon Bedrock AgentCore Memory with Strands Agents is remarkably straightforward using the AgentCoreMemorySessionManager class from the Bedrock AgentCore SDK. As shown in the following code example, implementation requires minimal configuration—create an AgentCoreMemoryConfig with your session identifiers, initialize the session manager with this config, and pass it directly to your agent constructor. The session manager transparently handles the memory operations behind the scenes, maintaining conversation history and context across interactions while organizing memories using the combination of session_id, memory_id, and actor_id. For more information, refer to AgentCore Memory Session Manager.

from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager

@app.entrypoint
async def strands_agent_main(payload, context):

    session_id = context.session_id
    if not session_id:
        session_id = str(uuid.uuid4())
    print(f"Session ID: {session_id}")

    memory_id = payload.get("memory_id")
    if not memory_id:
        memory_id = ""
    print(f"Memory ID: {memory_id}")

    actor_id = payload.get("actor_id")
    if not actor_id:
        actor_id = "default"

    agentcore_memory_config = AgentCoreMemoryConfig(
        memory_id=memory_id,
        session_id=session_id,
        actor_id=actor_id
    )

    session_manager = AgentCoreMemorySessionManager(
        agentcore_memory_config=agentcore_memory_config
    )

    user_input = payload.get("prompt")

    headers = {
        "authorization": f"Bearer {bearer_token}",
        "Content-Type": "application/json",
        "Mcp-Session-Id": session_id + "@@@" + memory_id + "@@@" + actor_id
    }

    # Connect to the MCP server using Streamable HTTP transport
    streamable_http_mcp_client = MCPClient(
        lambda: streamablehttp_client(
                mcp_url,
                headers,
                timeout=30
            )
        )

    with streamable_http_mcp_client:
        # Get the tools from the MCP server
        tools = streamable_http_mcp_client.list_tools_sync()

        # Create an agent with these tools
        agent = Agent(
            tools=tools,
            callback_handler=call_back_handler,
            session_manager=session_manager
        )

The session context management is particularly elegant here. The agent receives session identifiers through the payload and context parameters supplied by AgentCore Runtime. These identifiers form a crucial contextual bridge that connects user interactions across multiple sessions. The session_id can be extracted from the context object (generating a new one if needed), and the memory_id and actor_id can be retrieved from the payload. These identifiers are then packaged into a custom HTTP header (Mcp-Session-Id) that’s passed to the MCP server during connection establishment.
To maintain this persistent experience across multiple interactions, clients must consistently provide the same identifiers when invoking the agent:

# invoke agentcore through boto3
boto3_response = agentcore_client.invoke_agent_runtime(
    agentRuntimeArn=agent_arn,
    qualifier="DEFAULT",
    payload=json.dumps(
            {
                "prompt": user_input,
                "actor_id": actor_id,
                "memory_id": memory_id
            }
        ),
    runtimeSessionId=session_id,
)

By consistently providing the same memory_id, actor_id, and runtimeSessionId across invocations, users can create a continuous conversational experience where task results persist independently of session boundaries. When a user returns days later, the agent can automatically retrieve both conversation history and the task results that were completed during their absence.
This architecture represents a significant advancement in AI agent capabilities—transforming long-running operations from fragile, connection-dependent processes into robust, persistent tasks that continue working regardless of connection state. The result is a system that can deliver truly asynchronous AI assistance, where complex work continues in the background and results are seamlessly integrated whenever the user returns to the conversation.
Conclusion
In this post, we've explored practical ways to help AI agents handle tasks that take minutes or even hours to complete. Whether you use the more straightforward approach of keeping connections alive or the more advanced method of injecting task results into the agent's memory, these techniques enable your AI agent to tackle valuable, complex work without frustrating time limits or lost results.
We invite you to try these approaches in your own AI agent projects. Start with context messaging for moderate tasks, then move to async management as your needs grow. The solutions we’ve shared can be quickly adapted to your specific needs, helping you build AI that delivers results reliably—even when users disconnect and return days later. What long-running tasks could your AI assistants handle better with these techniques?
To learn more, see the Amazon Bedrock AgentCore documentation and explore our sample notebook.

About the Authors
Haochen Xie is a Senior Data Scientist at AWS Generative AI Innovation Center. He is an ordinary person.
Flora Wang is an Applied Scientist at AWS Generative AI Innovation Center, where she works with customers to architect and implement scalable Generative AI solutions that address their unique business challenges. She specializes in model customization techniques and agent-based AI systems, helping organizations harness the full potential of generative AI technology.
Yuan Tian is an Applied Scientist at the AWS Generative AI Innovation Center, where he works with customers across diverse industries—including healthcare, life sciences, finance, and energy—to architect and implement generative AI solutions such as agentic systems. He brings a unique interdisciplinary perspective, combining expertise in machine learning with computational biology.
Hari Prasanna Das is an Applied Scientist at the AWS Generative AI Innovation Center, where he works with AWS customers across different verticals to expedite their use of Generative AI. Hari holds a PhD in Electrical Engineering and Computer Sciences from the University of California, Berkeley. His research interests include Generative AI, Deep Learning, Computer Vision, and Data-Efficient Machine Learning.

How to Build an Atomic-Agents RAG Pipeline with Typed Schemas, Dynamic …

In this tutorial, we build an advanced, end-to-end learning pipeline around Atomic-Agents by wiring together typed agent interfaces, structured prompting, and a compact retrieval layer that grounds outputs in real project documentation. Also, we demonstrate how to plan retrieval, retrieve relevant context, inject it dynamically into an answering agent, and run an interactive loop that turns the setup into a reusable research assistant for any new Atomic Agents question. Check out the FULL CODES here.

import os, sys, textwrap, time, json, re
from typing import List, Optional, Dict, Tuple
from dataclasses import dataclass
import subprocess
subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                       "atomic-agents", "instructor", "openai", "pydantic",
                       "requests", "beautifulsoup4", "scikit-learn"])
from getpass import getpass
if not os.environ.get("OPENAI_API_KEY"):
    os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()
MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
from pydantic import Field
from openai import OpenAI
import instructor
from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
from atomic_agents.context import SystemPromptGenerator, ChatHistory, BaseDynamicContextProvider
import requests
from bs4 import BeautifulSoup

We install all required packages, import the core Atomic-Agents primitives, and set up Colab-compatible dependencies in one place. We securely capture the OpenAI API key from the keyboard and store it in the environment so downstream code never hardcodes secrets. We also lock in a default model name while keeping it configurable via an environment variable.

def fetch_url_text(url: str, timeout: int = 20) -> str:
    r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    for tag in soup(["script", "style", "nav", "header", "footer", "noscript"]):
        tag.decompose()
    text = soup.get_text("\n")
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text).strip()
    return text

def chunk_text(text: str, max_chars: int = 1400, overlap: int = 200) -> List[str]:
    if not text:
        return []
    chunks = []
    i = 0
    while i < len(text):
        chunk = text[i:i+max_chars].strip()
        if chunk:
            chunks.append(chunk)
        i += max_chars - overlap
    return chunks

def clamp(s: str, n: int = 800) -> str:
    s = (s or "").strip()
    return s if len(s) <= n else s[:n].rstrip() + "..."

We fetch web pages from the Atomic Agents repo and docs, then clean them into plain text so retrieval becomes reliable. We chunk long documents into overlapping segments, preserving context while keeping each chunk small enough for ranking and citation. We also add a small helper to clamp long snippets so our injected context stays readable.

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

@dataclass
class Snippet:
    doc_id: str
    url: str
    chunk_id: int
    text: str
    score: float

class MiniCorpusRetriever:
    def __init__(self, docs: Dict[str, Tuple[str, str]]):
        self.items: List[Tuple[str, str, int, str]] = []
        for doc_id, (url, raw) in docs.items():
            for idx, ch in enumerate(chunk_text(raw)):
                self.items.append((doc_id, url, idx, ch))
        if not self.items:
            raise RuntimeError("No documents were fetched; cannot build TF-IDF index.")
        self.vectorizer = TfidfVectorizer(stop_words="english", max_features=50000)
        self.matrix = self.vectorizer.fit_transform([it[3] for it in self.items])

    def search(self, query: str, k: int = 6) -> List[Snippet]:
        qv = self.vectorizer.transform([query])
        sims = cosine_similarity(qv, self.matrix).ravel()
        top = sims.argsort()[::-1][:k]
        out = []
        for j in top:
            doc_id, url, chunk_id, txt = self.items[j]
            out.append(Snippet(doc_id=doc_id, url=url, chunk_id=chunk_id, text=txt, score=float(sims[j])))
        return out

class RetrievedContextProvider(BaseDynamicContextProvider):
    def __init__(self, title: str, snippets: List[Snippet]):
        super().__init__(title=title)
        self.snippets = snippets

    def get_info(self) -> str:
        blocks = []
        for s in self.snippets:
            blocks.append(
                f"[{s.doc_id}#{s.chunk_id}] (score={s.score:.3f}) {s.url}\n{clamp(s.text, 900)}"
            )
        return "\n\n".join(blocks)

We build a mini retrieval system using TF-IDF and cosine similarity over the chunked documentation corpus. We wrap each retrieved chunk in a structured Snippet object to track doc IDs, chunk IDs, and citation scores. We then inject top-ranked chunks into the agent’s runtime via a dynamic context provider, keeping the answering agent grounded. Check out the FULL CODES here.

class PlanInput(BaseIOSchema):
    """Input schema for the planner agent: describes the user's task and how many retrieval queries to draft."""
    task: str = Field(...)
    num_queries: int = Field(4)

class PlanOutput(BaseIOSchema):
    """Output schema from the planner agent: retrieval queries, coverage checklist, and safety checks."""
    queries: List[str]
    must_cover: List[str]
    safety_checks: List[str]

class AnswerInput(BaseIOSchema):
    """Input schema for the answering agent: user question plus style constraints."""
    question: str
    style: str = "concise but advanced"

class AnswerOutput(BaseIOSchema):
    """Output schema for the answering agent: grounded answer, next steps, and which citations were used."""
    answer: str
    next_steps: List[str]
    used_citations: List[str]

client = instructor.from_openai(OpenAI(api_key=os.environ["OPENAI_API_KEY"]))

planner_prompt = SystemPromptGenerator(
    background=[
        "You are a rigorous research planner for a small RAG system.",
        "You propose retrieval queries that are diverse (lexical + semantic) and designed to find authoritative info.",
        "You do NOT answer the task; you only plan retrieval."
    ],
    steps=[
        "Read the task.",
        "Propose diverse retrieval queries (not too long).",
        "List must-cover aspects and safety checks."
    ],
    output_instructions=[
        "Return strictly the PlanOutput schema.",
        "Queries must be directly usable as search strings.",
        "Must-cover should be 4–8 bullets."
    ]
)

planner = AtomicAgent[PlanInput, PlanOutput](
    config=AgentConfig(
        client=client,
        model=MODEL,
        system_prompt_generator=planner_prompt,
        history=ChatHistory(),
    )
)

answerer_prompt = SystemPromptGenerator(
    background=[
        "You are an expert technical tutor for Atomic Agents (atomic-agents).",
        "You are given retrieved context snippets with IDs like [doc#chunk].",
        "You must ground claims in the provided snippets and cite them inline."
    ],
    steps=[
        "Read the question and the provided context.",
        "Synthesize an accurate answer using only supported facts.",
        "Cite claims inline using the provided snippet IDs."
    ],
    output_instructions=[
        "Use inline citations like [readme#12] or [docs_home#3].",
        "If the context does not support something, say so briefly and suggest what to retrieve next.",
        "Return strictly the AnswerOutput schema."
    ]
)

answerer = AtomicAgent[AnswerInput, AnswerOutput](
    config=AgentConfig(
        client=client,
        model=MODEL,
        system_prompt_generator=answerer_prompt,
        history=ChatHistory(),
    )
)

We define strict-typed schemas for planner and answerer inputs and outputs, and include docstrings to satisfy Atomic Agents’ schema requirements. We create an Instructor-wrapped OpenAI client and configure two Atomic Agents with explicit system prompts and chat history. We enforce structured outputs so the planner produces queries and the answerer produces a cited response with clear next steps.

SOURCES = {
    "readme": "https://github.com/BrainBlend-AI/atomic-agents",
    "docs_home": "https://brainblend-ai.github.io/atomic-agents/",
    "examples_index": "https://brainblend-ai.github.io/atomic-agents/examples/index.html",
}

raw_docs: Dict[str, Tuple[str, str]] = {}
for doc_id, url in SOURCES.items():
    try:
        raw_docs[doc_id] = (url, fetch_url_text(url))
    except Exception:
        raw_docs[doc_id] = (url, "")

non_empty = [d for d in raw_docs.values() if d[1].strip()]
if not non_empty:
    raise RuntimeError("All source fetches failed or were empty. Check network access in Colab and retry.")

retriever = MiniCorpusRetriever(raw_docs)

def run_atomic_rag(question: str, k: int = 7, verbose: bool = True) -> AnswerOutput:
    t0 = time.time()
    plan = planner.run(PlanInput(task=question, num_queries=4))
    all_snips: List[Snippet] = []
    for q in plan.queries:
        all_snips.extend(retriever.search(q, k=max(2, k // 2)))
    best: Dict[Tuple[str, int], Snippet] = {}
    for s in all_snips:
        key = (s.doc_id, s.chunk_id)
        if (key not in best) or (s.score > best[key].score):
            best[key] = s
    snips = sorted(best.values(), key=lambda x: x.score, reverse=True)[:k]
    ctx = RetrievedContextProvider(title="Retrieved Atomic Agents Context", snippets=snips)
    answerer.register_context_provider("retrieved_context", ctx)
    out = answerer.run(AnswerInput(question=question, style="concise, advanced, practical"))
    if verbose:
        print(out.answer)
    return out

demo_q = "Teach me Atomic Agents at an advanced level: explain the core building blocks and show how to chain agents with typed schemas and dynamic context."
run_atomic_rag(demo_q, k=7, verbose=True)

while True:
    user_q = input("\nYour question> ").strip()
    if not user_q or user_q.lower() in {"exit", "quit"}:
        break
    run_atomic_rag(user_q, k=7, verbose=True)

We fetch a small set of authoritative Atomic Agents sources and build a local retrieval index from them. We implement a full pipeline function that plans queries, retrieves relevant context, injects it, and produces a grounded final answer. We finish by running a demo query and launching an interactive loop so we can keep asking questions and getting cited answers.

In conclusion, we completed the Atomic-Agents workflow in Colab, cleanly separating planning, retrieval, and answering while ensuring strong typing. We kept the system grounded by injecting only the highest-signal documentation chunks as dynamic context, and we enforced a citation discipline that makes outputs auditable. From here, we can scale this pattern by adding more sources, swapping in stronger retrievers or rerankers, introducing tool-use agents, and turning the pipeline into a production-grade research assistant that remains both fast and trustworthy.

Check out the FULL CODES here.
The post How to Build an Atomic-Agents RAG Pipeline with Typed Schemas, Dynamic Context Injection, and Agent Chaining appeared first on MarkTechPost.

NVIDIA Researchers Introduce KVTC Transform Coding Pipeline to Compres …

Serving Large Language Models (LLMs) at scale is a massive engineering challenge because of Key-Value (KV) cache management. As models grow in size and reasoning capability, the KV cache footprint increases and becomes a major bottleneck for throughput and latency. For modern Transformers, this cache can occupy multiple gigabytes.

NVIDIA researchers have introduced KVTC (KV Cache Transform Coding). This lightweight transform coder compresses KV caches for compact on-GPU and off-GPU storage. It achieves up to 20x compression while maintaining reasoning and long-context accuracy. For specific use cases, it can reach 40x or higher.

https://arxiv.org/pdf/2511.01815

The Memory Dilemma in LLM Inference

In production, inference frameworks treat local KV caches like databases. Strategies like prefix sharing promote the reuse of caches to speed up responses. However, stale caches consume scarce GPU memory. Developers currently face a difficult choice:

Keep the cache: Occupies memory needed for other users.

Discard the cache: Incurs the high cost of recomputation.

Offload the cache: Moves data to CPU DRAM or SSDs, leading to transfer overheads.

KVTC largely mitigates this dilemma by lowering the cost of on-chip retention and reducing the bandwidth required for offloading.

How Does the KVTC Pipeline Work?

The method is inspired by classical media compression. It applies a learned orthonormal transform, followed by adaptive quantization and entropy coding.

1. Feature Decorrelation (PCA)

Different attention heads often show similar patterns and a high degree of correlation. KVTC uses Principal Component Analysis (PCA) to linearly decorrelate features. Unlike other methods that calculate a separate decomposition for every prompt, KVTC computes the PCA basis matrix V once on a calibration dataset. This matrix is then reused for all future caches at inference time.
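
As a rough illustration of this calibrate-once, reuse-everywhere idea, here is a toy NumPy sketch with assumed shapes; it is not the paper's implementation:

import numpy as np

def fit_kv_pca_basis(calib_kv: np.ndarray) -> np.ndarray:
    # calib_kv: (num_tokens, feature_dim) KV features gathered from a calibration set
    centered = calib_kv - calib_kv.mean(axis=0, keepdims=True)
    # SVD yields an orthonormal basis ordered by explained variance
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return vt.T  # basis V, computed once and reused for every cache at inference

def decorrelate(kv: np.ndarray, basis: np.ndarray) -> np.ndarray:
    # Project a new KV cache onto the shared basis before quantization
    return kv @ basis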

2. Adaptive Quantization

The system exploits the PCA ordering to allocate a fixed bit budget across coordinates. High-variance components receive more bits, while others receive fewer. KVTC uses a dynamic programming (DP) algorithm to find the optimal bit allocation that minimizes reconstruction error. Crucially, the DP often assigns 0 bits to trailing principal components, allowing for early dimensionality reduction and faster performance.
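
Continuing the toy example, the sketch below uses a simplified log-variance bit allocation rule; the paper uses a dynamic programming search to minimize reconstruction error, so treat this as an approximation of the idea rather than the actual algorithm:

import numpy as np

def allocate_bits(coeff_variances: np.ndarray, avg_bits: float, max_bits: int = 8) -> np.ndarray:
    # Classic rate-allocation rule: higher-variance components receive more bits
    log_var = np.log2(np.maximum(coeff_variances, 1e-12))
    bits = avg_bits + 0.5 * (log_var - log_var.mean())
    # Clipping to 0 drops trailing low-variance components, enabling early dimensionality reduction
    return np.clip(np.round(bits), 0, max_bits).astype(int)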

3. Entropy Coding

The quantized symbols are packed and compressed using the DEFLATE algorithm. To maintain speed, KVTC leverages the nvCOMP library, which enables parallel compression and decompression directly on the GPU.
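
As a CPU-side stand-in for this last stage, Python's zlib (which implements DEFLATE) conveys the idea; the actual system uses nvCOMP so that compression and decompression stay on the GPU, and the uint8 packing below assumes the quantized symbols fit in 8 bits:

import zlib
import numpy as np

def entropy_encode(quantized_symbols: np.ndarray) -> bytes:
    # Pack the small integer symbols into bytes, then DEFLATE-compress them
    return zlib.compress(quantized_symbols.astype(np.uint8).tobytes(), level=6)

def entropy_decode(blob: bytes, shape: tuple) -> np.ndarray:
    # Reverse the packing on retrieval, before de-quantization and the inverse transform
    return np.frombuffer(zlib.decompress(blob), dtype=np.uint8).reshape(shape)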

Protecting Critical Tokens

Not all tokens are compressed equally. KVTC avoids compressing two specific types of tokens because they contribute disproportionately to attention accuracy:

Attention Sinks: The 4 oldest tokens in the sequence.

Sliding Window: The 128 most recent tokens.

Ablation studies show that compressing these specific tokens can significantly lower or even collapse accuracy at high compression ratios.

Benchmarks and Efficiency

The research team tested KVTC with models like Llama-3.1, Mistral-NeMo, and R1-Qwen-2.5.

Accuracy: At 16x compression (roughly 20x after DEFLATE), the model consistently maintains results within 1 score point of vanilla models.

TTFT Reduction: For an 8K context length, KVTC can reduce Time-To-First-Token (TTFT) by up to 8x compared to full recomputation.

Speed: Calibration is fast; for a 12B model, it can be completed within 10 minutes on an NVIDIA H100 GPU.

Storage Overhead: The extra data stored per model is small, representing only 2.4% of model parameters for Llama-3.3-70B.

KVTC is a practical building block for memory-efficient LLM serving. It does not modify model weights and is directly compatible with other token eviction methods.

Key Takeaways

High Compression with Low Accuracy Loss: KVTC achieves a standard 20x compression ratio while maintaining results within 1 score point of vanilla (uncompressed) models across most reasoning and long-context benchmarks.

Transform Coding Pipeline: The method utilizes a pipeline inspired by classical media compression, combining PCA-based feature decorrelation, adaptive quantization via dynamic programming, and lossless entropy coding (DEFLATE).

Critical Token Protection: To maintain model performance, KVTC avoids compressing the 4 oldest ‘attention sink’ tokens and a ‘sliding window’ of the 128 most recent tokens.

Operational Efficiency: The system is ‘tuning-free,’ requiring only a brief initial calibration (under 10 minutes for a 12B model) that leaves model parameters unchanged and adds minimal storage overhead—only 2.4% for a 70B model.

Significant Latency Reduction: By reducing the volume of data stored and transferred, KVTC can reduce Time-To-First-Token (TTFT) by up to 8x compared to the full recomputation of KV caches for long contexts.

Check out the Paper here.
The post NVIDIA Researchers Introduce KVTC Transform Coding Pipeline to Compress Key-Value Caches by 20x for Efficient LLM Serving appeared first on MarkTechPost.

Mastering Amazon Bedrock throttling and service availability: A compre …

In production generative AI applications, errors surface from time to time, and the most common are requests failing with 429 ThrottlingException and 503 ServiceUnavailableException errors. In a business application, these errors can originate from multiple layers of the application architecture.
Most of these errors are retriable, but retries still hurt the user experience because calls to the application are delayed. Delays in responding can disrupt a conversation's natural flow, reduce user interest, and ultimately hinder the widespread adoption of AI-powered solutions in interactive applications.
One of the most common challenges is many users and applications hitting a single model at the same time. Mastering these errors means the difference between a resilient application and frustrated users.
This post shows you how to implement robust error handling strategies that can help improve application reliability and user experience when using Amazon Bedrock. We'll dive deep into strategies for keeping application performance steady in the face of these errors. Whether you run a fairly new application or a mature AI application, you will find practical guidelines here for operating through these errors.
Prerequisites

AWS account with Amazon Bedrock access
Python 3.x and boto3 installed
Basic understanding of AWS services
IAM Permissions: Ensure you have the following minimum permissions:

bedrock:InvokeModel or bedrock:InvokeModelWithResponseStream for your specific models
cloudwatch:PutMetricData, cloudwatch:PutMetricAlarm for monitoring
sns:Publish if using SNS notifications
Follow the principle of least privilege – grant only the permissions needed for your use case

Example IAM policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "bedrock:InvokeModel"
            ],
            "Resource": "arn:aws:bedrock:us-east-1:123456789012:model/anthropic.claude-*"
        }
    ]
}
Note: This walkthrough uses AWS services that may incur charges, including Amazon CloudWatch for monitoring and Amazon SNS for notifications. See AWS pricing pages for details.
Quick Reference: 503 vs 429 Errors
The following comparison summarizes the two error types:

Primary cause – 503 ServiceUnavailable: temporary service capacity issues or server failures. 429 ThrottlingException: exceeded account quotas (RPM/TPM).
Quota related – 503: not quota related. 429: directly quota related.
Resolution time – 503: transient, refreshes faster. 429: requires waiting for the quota refresh.
Retry strategy – 503: immediate retry with exponential backoff. 429: must sync with the 60-second quota cycle.
User action – 503: wait and retry, consider alternatives. 429: optimize request patterns, increase quotas.

Deep dive into 429 ThrottlingException
A 429 ThrottlingException means Amazon Bedrock is deliberately rejecting some of your requests to keep overall usage within the quotas you have configured or that are assigned by default. In practice, you will most often see three flavors of throttling: rate-based, token-based, and model-specific.
1. Rate-Based Throttling (RPM – Requests Per Minute)
Error Message:
ThrottlingException: Too many requests, please wait before trying again.
Or:
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the InvokeModel operation: Too many requests, please wait before trying again
What this actually indicates
Rate-based throttling is triggered when the total number of Bedrock requests per minute to a given model and Region crosses the RPM quota for your account. The key detail is that this limit is enforced across all callers, not just per individual application or microservice.
Imagine a shared queue at a coffee shop: it does not matter which team is standing in line; the barista can only serve a fixed number of drinks per minute. As soon as more people join the queue than the barista can handle, some customers are told to wait or come back later. That “come back later” message is your 429.
Multi-application spike scenario
Suppose you have three production applications, all calling the same Bedrock model in the same Region:

App A normally peaks around 50 requests per minute.
App B also peaks around 50 rpm.
App C usually runs at about 50 rpm during its own peak.

Ops has requested a quota of 150 RPM for this model, which seems reasonable since 50 + 50 + 50 = 150 and historical dashboards show that each app stays around its expected peak.
However, in reality your traffic is not perfectly flat. Maybe during a flash sale or a marketing campaign, App A briefly spikes to 60 rpm while B and C stay at 50. The combined total for that minute becomes 160 rpm, which is above your 150 rpm quota, and some requests start failing with ThrottlingException.
You can also get into trouble when the three apps shift upward at the same time over longer periods. Imagine a new pattern where peak traffic looks like this:

App A: 75 rpm
App B: 50 rpm
App C: 50 rpm

Your new true peak is 175 rpm even though the original quota was sized for 150. In this situation, you will see 429 errors regularly during those peak windows, even if average daily traffic still looks “fine.”
Mitigation strategies
For rate-based throttling, the mitigation has two sides: client behavior and quota management.
On the client side:

Implement request rate limiting to cap how many calls per second or per minute each application can send. APIs, SDK wrappers, or sidecars like API gateways can enforce per-app budgets so one noisy client does not starve others.
Use exponential backoff with jitter on 429 errors so that retries can become gradually less frequent and are de-synchronized across instances.
Align retry windows with the quota refresh period: because RPM is enforced per 60-second window, retries that happen several seconds into the next minute are more likely to succeed.

On the quota side:

Analyze CloudWatch metrics for each application to determine true peak RPM rather than relying on averages.
Sum those peaks across the apps for the same model/Region, add a safety margin, and request an RPM increase through AWS Service Quotas if needed.

In the previous example, if App A peaks at 75 rpm and B and C peak at 50 rpm, you should plan for at least 175 rpm and realistically target something like 200 rpm to provide room for growth and unexpected bursts.
2. Token-Based Throttling (TPM – Tokens Per Minute)
Error message:
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the InvokeModel operation: Too many tokens, please wait before trying again.
Why token limits matter
Even if your request count is modest, a single large prompt or a model that produces long outputs can consume thousands of tokens at once. Token-based throttling occurs when the sum of input and output tokens processed per minute exceeds your account’s TPM quota for that model.
For example, an application that sends 10 requests per minute with 15,000 input tokens and 5,000 output tokens each is consuming roughly 200,000 tokens per minute, which may cross TPM thresholds far sooner than an application that sends 200 tiny prompts per minute.
What this looks like in practice
You may notice that your application runs smoothly under normal workloads, but suddenly starts failing when users paste large documents, upload long transcripts, or run bulk summarization jobs. These are symptoms that token throughput, not request frequency, is the bottleneck.
How to respond
To mitigate token-based throttling:

Monitor token usage by tracking InputTokenCount and OutputTokenCount metrics and logs for your Bedrock invocations.
Implement a token-aware rate limiter that maintains a sliding 60-second window of tokens consumed and only issues a new request if there is enough budget left.
Break large tasks into smaller, sequential chunks so you spread token consumption over multiple minutes instead of exhausting the entire budget in one spike.
Use streaming responses when appropriate; streaming often gives you more control over when to stop generation so you do not produce unnecessarily long outputs.

For consistently high-volume, token-intensive workloads, you should also evaluate requesting higher TPM quotas or using models with larger context windows and better throughput characteristics.
3. Model-Specific Throttling
Error message:
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the InvokeModel operation: Model anthropic.claude-haiku-4-5-20251001-v1:0 is currently overloaded. Please try again later.
What is happening behind the scenes
Model-specific throttling indicates that a particular model endpoint is experiencing heavy demand and is temporarily limiting additional traffic to keep latency and stability under control. In this case, your own quotas might not be the limiting factor; instead, the shared infrastructure for that model is temporarily saturated.
How to respond
One of the most effective approaches here is to design for graceful degradation rather than treating this as a hard failure.

Implement model fallback: define a priority list of compatible models (for example, Sonnet → Haiku) and automatically route traffic to a secondary model if the primary is overloaded (a minimal sketch follows this list).
Combine fallback with cross-Region inference so you can use the same model family in a nearby Region if one Region is temporarily constrained.
Expose fallback behavior in your observability stack so you can know when your system is running in “degraded but functional” mode instead of silently masking problems.
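
The following sketch shows one way such a fallback chain could look; the model IDs and the set of retriable error codes are assumptions for illustration, not a prescribed configuration:
from botocore.exceptions import ClientError

# Hypothetical priority list; substitute the model IDs your account actually uses
FALLBACK_MODELS = [
    "anthropic.claude-3-5-sonnet-20240620-v1:0",  # primary (example ID)
    "anthropic.claude-3-haiku-20240307-v1:0",     # secondary (example ID)
]

def converse_with_fallback(bedrock_client, messages):
    last_error = None
    for model_id in FALLBACK_MODELS:
        try:
            return bedrock_client.converse(modelId=model_id, messages=messages)
        except ClientError as e:
            code = e.response["Error"]["Code"]
            if code in ("ThrottlingException", "ServiceUnavailableException"):
                last_error = e  # record the failure and try the next model in the list
                continue
            raise
    raise last_error  # every model in the chain was unavailable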

Implementing robust retry and rate limiting
Once you understand the types of throttling, the next step is to encode that knowledge into reusable client-side components.
Exponential backoff with jitter
Here’s a robust retry implementation that uses exponential backoff with jitter. This pattern is essential for handling throttling gracefully:
import time
import random
from botocore.exceptions import ClientError

def bedrock_request_with_retry(bedrock_client, operation, **kwargs):
    """Secure retry implementation with sanitized logging."""
    max_retries = 5
    base_delay = 1
    max_delay = 60

    for attempt in range(max_retries):
        try:
            if operation == 'invoke_model':
                return bedrock_client.invoke_model(**kwargs)
            elif operation == 'converse':
                return bedrock_client.converse(**kwargs)
        except ClientError as e:
            # Security: Log error codes but not request/response bodies
            # which may contain sensitive customer data
            if e.response['Error']['Code'] == 'ThrottlingException':
                if attempt == max_retries - 1:
                    raise

                # Exponential backoff with jitter
                delay = min(base_delay * (2 ** attempt), max_delay)
                jitter = random.uniform(0, delay * 0.1)
                time.sleep(delay + jitter)
                continue
            else:
                raise
This pattern avoids hammering the service immediately after a throttling event and helps prevent many instances from retrying at the same exact moment.
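As a usage sketch (the model ID and request body below are illustrative assumptions, not values from the original post), you would wrap each invocation like this:
import json
import boto3

bedrock_client = boto3.client("bedrock-runtime")

# Hypothetical Anthropic-style request body; adjust it to the model you actually call
body = json.dumps({
    "anthropic_version": "bedrock-2023-05-31",
    "max_tokens": 256,
    "messages": [{"role": "user", "content": "Summarize our return policy."}],
})

response = bedrock_request_with_retry(
    bedrock_client,
    "invoke_model",
    modelId="anthropic.claude-3-haiku-20240307-v1:0",  # example model ID
    body=body,
)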
Token-Aware Rate Limiting
For token-based throttling, the following class maintains a sliding window of token usage and gives your caller a simple yes/no answer on whether it is safe to issue another request:
import time
from collections import deque

class TokenAwareRateLimiter:
    def __init__(self, tpm_limit):
        self.tpm_limit = tpm_limit
        self.token_usage = deque()

    def can_make_request(self, estimated_tokens):
        now = time.time()
        # Remove tokens older than 1 minute
        while self.token_usage and self.token_usage[0][0] < now - 60:
            self.token_usage.popleft()

        current_usage = sum(tokens for _, tokens in self.token_usage)
        return current_usage + estimated_tokens <= self.tpm_limit

    def record_usage(self, tokens_used):
        self.token_usage.append((time.time(), tokens_used))
In practice, you would estimate tokens before sending the request, call can_make_request, and only proceed when it returns True, then call record_usage after receiving the response.
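A calling loop built on that contract might look like the following sketch; the four-characters-per-token estimate is a rough heuristic rather than an exact tokenizer, and the TPM limit should be set to your account's actual quota:
import time

limiter = TokenAwareRateLimiter(tpm_limit=200_000)  # replace with your actual TPM quota

def estimate_tokens(text):
    return max(1, len(text) // 4)  # rough heuristic: ~4 characters per token

def send_with_budget(bedrock_client, model_id, prompt):
    estimated = estimate_tokens(prompt) + 500  # input estimate plus expected output tokens
    while not limiter.can_make_request(estimated):
        time.sleep(1)  # wait for older usage to fall out of the 60-second window

    response = bedrock_client.converse(
        modelId=model_id,
        messages=[{"role": "user", "content": [{"text": prompt}]}],
    )
    usage = response["usage"]
    limiter.record_usage(usage["inputTokens"] + usage["outputTokens"])
    return response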
Understanding 503 ServiceUnavailableException
A 503 ServiceUnavailableException tells you that Amazon Bedrock is temporarily unable to process your request, often due to capacity pressure, networking issues, or exhausted connection pools. Unlike 429, this is not about your quota; it is about the health or availability of the underlying service at that moment.
Connection Pool Exhaustion
What it looks like:
botocore.errorfactory.ServiceUnavailableException: An error occurred (ServiceUnavailableException) when calling the ConverseStream operation (reached max retries: 4): Too many connections, please wait before trying again.
In many real-world scenarios this error is caused not by Bedrock itself, but by how your client is configured:

By default, the boto3 HTTP connection pool size is relatively small (for example, 10 connections), which can be quickly exhausted by highly concurrent workloads.
Creating a new client for every request instead of reusing a single client per process or container can multiply the number of open connections unnecessarily.

To help fix this, share a single Bedrock client instance and increase the connection pool size:
import boto3
from botocore.config import Config

# Security best practice: never hardcode credentials.
# boto3 automatically uses credentials from:
# 1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
# 2. IAM role (recommended for EC2, Lambda, ECS)
# 3. AWS credentials file (~/.aws/credentials)
# 4. IAM roles for service accounts (recommended for EKS)

# Configure a larger connection pool for parallel execution
config = Config(
    max_pool_connections=50,  # increase from the default of 10
    retries={'max_attempts': 3}
)
bedrock_client = boto3.client('bedrock-runtime', config=config)
This configuration allows more parallel requests through a single, well-tuned client instead of hitting client-side limits.
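To illustrate why the shared, tuned client matters, the sketch below fans out work through that single client with a thread pool sized below the connection pool limit; the prompts and model_id are placeholders:
from concurrent.futures import ThreadPoolExecutor

prompts = [f"Summarize ticket {i}" for i in range(100)]  # illustrative workload

def invoke(prompt):
    # Reuse the single bedrock_client configured above; do not create a client per call
    return bedrock_client.converse(
        modelId=model_id,  # your model ID or inference profile
        messages=[{"role": "user", "content": [{"text": prompt}]}],
    )

# Keep max_workers at or below max_pool_connections to avoid exhausting the pool
with ThreadPoolExecutor(max_workers=25) as pool:
    results = list(pool.map(invoke, prompts))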
Temporary Service Resource Issues
What it looks like:
botocore.errorfactory.ServiceUnavailableException: An error occurred (ServiceUnavailableException) when calling the InvokeModel operation: Service temporarily unavailable, please try again.
In this case, the Bedrock service is signaling a transient capacity or infrastructure issue, often affecting on-demand models during demand spikes. Here you should treat the error as a temporary outage and focus on retrying smartly and failing over gracefully:

Use exponential backoff retries, similar to your 429 handling, but with parameters tuned for slower recovery, as sketched after this list.
Consider using cross-Region inference or different service tiers to help get more predictable capacity envelopes for your most critical workloads.
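A minimal sketch of the slower-recovery variant follows; the base delay, cap, and retry count are illustrative starting points to tune against your own traffic:
import time
import random
from botocore.exceptions import ClientError

def converse_with_503_retry(bedrock_client, **kwargs):
    """Retry on 503s with a wider backoff window than the 429 handler (illustrative parameters)."""
    max_retries = 4
    base_delay = 5    # start slower than the 1-second base used for throttling
    max_delay = 120   # allow more time for capacity to recover

    for attempt in range(max_retries):
        try:
            return bedrock_client.converse(**kwargs)
        except ClientError as e:
            if e.response["Error"]["Code"] != "ServiceUnavailableException":
                raise
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            time.sleep(delay + random.uniform(0, delay * 0.1))  # jitter to avoid synchronized retries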

Advanced resilience strategies
When you operate mission-critical systems, simple retries are not enough; you also want to avoid making a bad situation worse.
Circuit Breaker Pattern
The circuit breaker pattern helps prevent your application from continuously calling a service that is already failing. Instead, it quickly flips into an “open” state after repeated failures, blocking new requests for a cooling-off period.

CLOSED (Normal): Requests flow normally.
OPEN (Failing): After repeated failures, new requests are rejected immediately, helping reduce pressure on the service and conserve client resources.
HALF_OPEN (Testing): After a timeout, a small number of trial requests are allowed; if they succeed, the circuit closes again.

Why This Matters for Bedrock
When Bedrock returns 503 errors due to capacity issues, continuing to hammer the service with requests only makes things worse. The circuit breaker pattern helps:

Reduce load on the struggling service, helping it recover faster
Fail fast instead of wasting time on requests that will likely fail
Provide automatic recovery by periodically testing if the service is healthy again
Improve user experience by returning errors quickly rather than timing out

The following code implements this:
import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

# Usage
circuit_breaker = CircuitBreaker()

def make_bedrock_request():
    return circuit_breaker.call(bedrock_client.invoke_model, **request_params)
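In a real call path you would typically catch the open-circuit error and degrade gracefully instead of surfacing it to users; fallback_answer below is a placeholder for whatever degradation strategy you choose, such as a secondary model, a cache, or a canned response:
def answer_user(prompt):
    request_params = {
        "modelId": model_id,  # your primary model ID or inference profile
        "messages": [{"role": "user", "content": [{"text": prompt}]}],
    }
    try:
        return circuit_breaker.call(bedrock_client.converse, **request_params)
    except Exception as e:
        if "Circuit breaker is OPEN" in str(e):
            return fallback_answer(prompt)  # placeholder degradation path
        raise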
Cross-Region Failover Strategy with CRIS
Amazon Bedrock cross-Region inference (CRIS) helps add another layer of resilience by giving you a managed way to route traffic across Regions.

Global CRIS Profiles: Route traffic across supported AWS commercial Regions, typically offering the best combination of throughput and cost (often around 10% savings).
Geographic CRIS Profiles: Confine traffic to specific geographies (for example, US-only, EU-only, or APAC-only) to help satisfy strict data residency or regulatory requirements.

For applications without data residency requirements, global CRIS offers enhanced performance, reliability, and cost efficiency.
From an architecture standpoint:

For non-regulated workloads, using a global profile can significantly improve availability and absorb regional spikes, as shown in the sketch after this list.
For regulated workloads, configure geographic profiles that align with your compliance boundaries, and document those decisions in your governance artifacts.
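From the application's point of view, adopting CRIS is largely a matter of pointing your existing calls at an inference profile ID instead of a single-Region model ID. The profile ID below is a placeholder; look up the exact global or geographic profile IDs available to your account in the Amazon Bedrock console or documentation:
# Placeholder inference profile ID; replace with a global or geographic CRIS profile
# enabled in your account (check the Bedrock console for the exact IDs)
INFERENCE_PROFILE_ID = "us.anthropic.claude-haiku-placeholder"

response = bedrock_client.converse(
    modelId=INFERENCE_PROFILE_ID,  # the modelId parameter accepts an inference profile ID
    messages=[{"role": "user", "content": [{"text": "Classify this support ticket."}]}],
)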

Amazon Bedrock automatically encrypts data in transit using TLS and does not store customer prompts or outputs by default; combine this with AWS CloudTrail logging to strengthen your compliance posture.
Monitoring and Observability for 429 and 503 Errors
You cannot manage what you cannot see, so robust monitoring is essential when working with quota-driven errors and service availability. Setting up comprehensive Amazon CloudWatch monitoring is essential for proactive error management and maintaining application reliability.
Note: CloudWatch custom metrics, alarms, and dashboards incur charges based on usage. Review CloudWatch pricing for details.
Essential CloudWatch Metrics
Monitor these CloudWatch metrics:

Invocations: Successful model invocations
InvocationClientErrors: 4xx errors including throttling
InvocationServerErrors: 5xx errors including service unavailability
InvocationThrottles: 429 throttling errors
InvocationLatency: Response times
InputTokenCount/OutputTokenCount: Token usage for TPM monitoring

For better insight, create dashboards that:

Separate 429 and 503 into different widgets so you can see whether a spike is quota-related or service-side.
Break down metrics by ModelId and Region to find the specific models or Regions that are problematic.
Show side-by-side comparisons of current traffic vs previous weeks to spot emerging trends before they become incidents.

Critical Alarms
Do not wait until users notice failures before you act. Configure CloudWatch alarms with Amazon SNS notifications based on thresholds such as the following (a minimal alarm sketch follows these lists):
For 429 Errors:

A high number of throttling events in a 5-minute window.
Consecutive periods with non-zero throttle counts, indicating sustained pressure.
Quota utilization above a chosen threshold (for example, 80% of RPM/TPM).

For 503 Errors:

Service success rate falling below your SLO (for example, under 95% over 10 minutes).
Sudden spikes in 503 counts correlated with specific Regions or models.
Signs of connection pool saturation in client-side metrics.
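As one concrete example, the boto3 sketch below creates an alarm on the InvocationThrottles metric in the AWS/Bedrock namespace; the threshold, model ID, and SNS topic ARN are placeholders to adapt to your own traffic profile:
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="bedrock-throttling-spike",
    Namespace="AWS/Bedrock",
    MetricName="InvocationThrottles",
    Dimensions=[{"Name": "ModelId", "Value": "your-model-id"}],  # placeholder model ID
    Statistic="Sum",
    Period=300,                   # 5-minute window
    EvaluationPeriods=1,
    Threshold=10,                 # placeholder: tune to your traffic profile
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="notBreaching",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:bedrock-alerts"],  # placeholder SNS topic ARN
)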

Alarm Configuration Best Practices

Use Amazon Simple Notification Service (Amazon SNS) topics to route alerts to your team’s communication channels (Slack, PagerDuty, email)
Set up different severity levels: Critical (immediate action), Warning (investigate soon), Info (trending issues)
Configure alarm actions to trigger automated responses where appropriate
Include detailed alarm descriptions with troubleshooting steps and runbook links
Test your alarms regularly to make sure notifications are working correctly
Do not include sensitive customer data in alarm messages

Log Analysis Queries
CloudWatch Logs Insights queries help you move from “we see errors” to “we understand patterns.” Examples include:
Find 429 error patterns:
fields @timestamp, @message
| filter @message like /ThrottlingException/
| stats count() by bin(5m)
| sort @timestamp desc
Track 503 error counts over time (to correlate with your request volume metrics):
fields @timestamp, @message
| filter @message like /ServiceUnavailableException/
| stats count() as error_count by bin(1m)
| sort @timestamp desc
Wrapping Up: Building Resilient Applications
We’ve covered a lot of ground in this post, so let’s bring it all together. Successfully handling Bedrock errors requires:

Understand root causes: Distinguish quota limits (429) from capacity issues (503)
Implement appropriate retries: Use exponential backoff with different parameters for each error type
Design for scale: Use connection pooling, circuit breakers, and Cross-Region failover
Monitor proactively: Set up comprehensive CloudWatch monitoring and alerting
Plan for growth: Request quota increases and implement fallback strategies

Conclusion
Handling 429 ThrottlingException and 503 ServiceUnavailableException errors effectively is a crucial part of running production-grade generative AI workloads on Amazon Bedrock. By combining quota-aware design, intelligent retries, client-side resilience patterns, cross-Region strategies, and strong observability, you can keep your applications responsive even under unpredictable load.
As a next step, identify your most critical Bedrock workloads, enable the retry and rate-limiting patterns described here, and build dashboards and alarms that expose your real peaks rather than just averages. Over time, use real traffic data to refine quotas, fallback models, and regional deployments so your AI systems can remain both powerful and dependable as they scale.
For teams looking to accelerate incident resolution, consider enabling AWS DevOps Agent—an AI-powered agent that investigates Bedrock errors by correlating CloudWatch metrics, logs, and alarms just like an experienced DevOps engineer would. It learns your resource relationships, works with your observability tools and runbooks, and can significantly reduce mean time to resolution (MTTR) for 429 and 503 errors by automatically identifying root causes and suggesting remediation steps.
Learn More

Amazon Bedrock Documentation
Amazon Bedrock Quotas
Cross-Region Inference
Cross-Region Inference Security
SNS Security
AWS Logging Best Practices
AWS Bedrock Security Best Practices
AWS IAM Best Practices – Least Privilege

About the Authors

Farzin Bagheri
Farzin Bagheri is a Principal Technical Account Manager at AWS, where he supports strategic customers in achieving the highest levels of cloud operational maturity. Farzin joined AWS in 2013, and his focus in the recent years has been on identifying common patterns in cloud operation challenges and developing innovative solutions and strategies that help both AWS and its customers navigate complex technical landscapes.

Abel Laura
Abel Laura is a Technical Operations Manager with AWS Support, where he leads customer-centric teams focused on emerging generative AI products. With over a decade of leadership experience, he partners with technical support specialists to transform complex challenges into innovative, technology-driven solutions for customers. His passion lies in helping organizations harness the power of emerging AI technologies to drive meaningful business outcomes. In his free time, Abel enjoys spending time with his family and mentoring aspiring tech leaders.

Arun KM
Arun is a Principal Technical Account Manager at AWS, where he supports strategic customers in building production-ready generative AI applications with operational excellence. His focus in recent years has been on Amazon Bedrock, helping customers troubleshoot complex error patterns, customize open-source models, optimize model performance, and develop resilient AI architectures that can maximize return on investment and scale reliably in production environments.

Aswath Ram A Srinivasan
Aswath Ram A Srinivasan is a Sr. Cloud Support Engineer at AWS. With a strong background in ML, he has three years of experience building AI applications and specializes in hardware inference optimizations for LLMs. As a Subject Matter Expert, he tackles complex scenarios and use cases, helping customers unblock challenges and accelerate their path to production-ready solutions using Amazon Bedrock, Amazon SageMaker, and other AWS services. In his free time, Aswath enjoys photography and researching Machine Learning and Generative AI.

NVIDIA Nemotron 3 Nano 30B MoE model is now available in Amazon SageMa …

Today we're excited to announce that the NVIDIA Nemotron 3 Nano 30B model, with 3B active parameters, is now generally available in the Amazon SageMaker JumpStart model catalog. You can accelerate innovation and deliver tangible business value with Nemotron 3 Nano on Amazon Web Services (AWS) without having to manage model deployment complexities, and you can power your generative AI applications with Nemotron capabilities using the managed deployment offered by SageMaker JumpStart.
Nemotron 3 Nano is a small hybrid mixture of experts (MoE) language model that offers the highest compute efficiency and accuracy for developers driving highly skilled agentic tasks at scale. The model is fully open, with open weights, datasets, and recipes, so developers can seamlessly customize, optimize, and deploy it on their own infrastructure to help meet their privacy and security requirements. Nemotron 3 Nano excels in coding and reasoning, and leads on benchmarks such as SWE Bench Verified, GPQA Diamond, AIME 2025, Arena Hard v2, and IFBench.
About Nemotron 3 Nano 30B
Nemotron 3 Nano is differentiated from other models by its architecture and accuracy, boasting strong performance in a variety of highly technical skills:

Architecture:

MoE with a hybrid Transformer-Mamba architecture
Supports a token budget for providing optimal accuracy with minimal reasoning token generation

Accuracy:

Leading accuracy on coding, scientific reasoning, math, and instruction following
Leads on benchmarks such as LiveCodeBench, GPQA Diamond, AIME 2025, BFCL, and IFBench (compared to other open language models under 30B)

Usability:

30B parameter model with 3 billion active parameters
Has a context window of up to 1 million tokens
Text-based foundation model, using text for both inputs and outputs

Prerequisites
To get started with Nemotron 3 Nano in Amazon SageMaker JumpStart, you must have a provisioned Amazon SageMaker Studio domain.
Get started with NVIDIA Nemotron 3 Nano 30B in SageMaker JumpStart
To test the Nemotron 3 Nano model in SageMaker JumpStart, open SageMaker Studio and choose Models in the navigation pane.  Search for NVIDIA in the search bar and choose NVIDIA Nemotron 3 Nano 30B as the model.

On the model details page, choose Deploy and follow the prompts to deploy the model.
After the model is deployed to a SageMaker AI endpoint, you can test it. The following AWS Command Line Interface (AWS CLI) example shows how to invoke the endpoint, using nvidia/nemotron-3-nano as the model ID.

# Set these for your deployment; nvidia/nemotron-3-nano is the model ID
MODEL_ID="nvidia/nemotron-3-nano"
ENDPOINT_NAME="<your-endpoint-name>"
AWS_REGION="<your-region>"

# "stream": false disables streaming; set enable_thinking to false for non-reasoning mode
cat > input.json << EOF
{
  "model": "${MODEL_ID}",
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "What is NVIDIA? Answer in 2-3 sentences."
    }
  ],
  "max_tokens": 512,
  "temperature": 0.2,
  "stream": false,
  "chat_template_kwargs": {"enable_thinking": false}
}
EOF

aws sagemaker-runtime invoke-endpoint \
  --endpoint-name "${ENDPOINT_NAME}" \
  --region "${AWS_REGION}" \
  --content-type 'application/json' \
  --body fileb://input.json \
  response.json

Alternatively, you can access the model programmatically with Boto3. The following Python example shows how to send a text prompt to the NVIDIA Nemotron 3 Nano 30B endpoint. For additional code examples, refer to the NVIDIA GitHub repo.

import json
import boto3

# Placeholders: set these for your deployment
region = "<your-region>"
endpoint_name = "<your-endpoint-name>"
prompt = "What is NVIDIA? Answer in 2-3 sentences."

runtime_client = boto3.client('sagemaker-runtime', region_name=region)

payload = {
    "messages": [
        {"role": "user", "content": prompt}
    ],
    "max_tokens": 1000
}

try:
    response = runtime_client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='application/json',
        Body=json.dumps(payload)
    )

    response_body = response['Body'].read().decode('utf-8')
    raw_response = json.loads(response_body)

    # Parse raw_response with your own response parser as needed
    print(raw_response)

except Exception as e:
    raise Exception(
        f"Failed to invoke endpoint '{endpoint_name}': {str(e)}. "
        f"Check that the endpoint is InService and you have least-privileged IAM permissions assigned."
    )

Now available
NVIDIA Nemotron 3 Nano is now available fully managed in SageMaker JumpStart. Refer to the model package for AWS Region availability. To learn more, check out the Nemotron Nano model page, the NVIDIA GitHub sample notebook for Nemotron 3 Nano 30B, and the Amazon SageMaker JumpStart pricing page.
Try the Nemotron 3 Nano model in Amazon SageMaker JumpStart today and send feedback to AWS re:Post for SageMaker JumpStart  or through your usual AWS Support contacts.

About the authors
Dan Ferguson is a Solutions Architect at AWS, based in New York, USA. As a machine learning services expert, Dan works to support customers on their journey to integrating ML workflows efficiently, effectively, and sustainably.
Pooja Karadgi leads product and strategic partnerships for Amazon SageMaker JumpStart, the machine learning and generative AI hub within SageMaker. She is dedicated to accelerating customer AI adoption by simplifying foundation model discovery and deployment, enabling customers to build production-ready generative AI applications across the entire model lifecycle – from onboarding and customization to deployment.
Benjamin Crabtree is a Senior Software Engineer on the Amazon SageMaker AI team, specializing in delivering the “last mile” experience to customers. He is passionate about democratizing the latest artificial intelligence breakthroughs by offering easy to use capabilities. Also, Ben is highly experienced in building machine learning infrastructure at scale.
Timothy Ma is a Principal Specialist in generative AI at AWS, where he collaborates with customers to design and deploy cutting-edge machine learning solutions. He also leads go-to-market strategies for generative AI services, helping organizations harness the potential of advanced AI technologies.
Abdullahi Olaoye is a Senior AI Solutions Architect at NVIDIA, specializing in integrating NVIDIA AI libraries, frameworks, and products with cloud AI services and open-source tools to optimize AI model deployment, inference, and generative AI workflows. He collaborates with AWS to enhance AI workload performance and drive adoption of NVIDIA-powered AI and generative AI solutions.
Nirmal Kumar Juluru is a product marketing manager at NVIDIA driving the adoption of AI software, models, and APIs in the NVIDIA NGC Catalog and NVIDIA AI Foundation models and endpoints. He previously worked as a software developer. Nirmal holds an MBA from Carnegie Mellon University and a bachelors in computer science from BITS Pilani.
Vivian Chen is a Deep Learning Solutions Architect at NVIDIA, where she helps teams bridge the gap between complex AI research and real-world performance. Specializing in inference optimization and cloud-integrated AI solutions, Vivian focuses on turning the heavy lifting of machine learning into fast, scalable applications. She is passionate about helping clients navigate NVIDIA’s accelerated computing stack to ensure their models don’t just work in the lab, but thrive in production.