How a Haystack-Powered Multi-Agent System Detects Incidents, Investigates Metrics and Logs, and Produces Production-Grade Incident Reviews End-to-End

In this tutorial, we build an implementation that demonstrates how Haystack enables advanced, agentic AI systems that go far beyond toy examples while remaining fully runnable. We focus on a cohesive, end-to-end setup that highlights orchestration, stateful decision-making, tool execution, and structured control flow, showing how complex agent behavior can be cleanly expressed. We deliberately keep everything in a single executable snippet to emphasize reproducibility and to make it easy to experiment, extend, and stress-test the system in realistic scenarios.

import os, json, math, random, textwrap
from datetime import datetime, timedelta

try:
    import pandas as pd
except Exception:
    os.system("pip -q install pandas")
    import pandas as pd

try:
    import numpy as np
except Exception:
    os.system("pip -q install numpy")
    import numpy as np

try:
    import duckdb
except Exception:
    os.system("pip -q install duckdb")
    import duckdb

os.system("pip -q install haystack-ai openai")

from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import tool
from haystack.components.agents.state import State
from haystack.components.agents.state.state_utils import merge_lists
from haystack.tools import ComponentTool

from getpass import getpass

if not os.getenv("OPENAI_API_KEY"):
    key = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()
    if key:
        os.environ["OPENAI_API_KEY"] = key

if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY missing. Set it in the environment or paste when prompted.")

We install and import all required libraries, ensuring that Haystack, OpenAI, and data tooling are available, and securely load the OpenAI API key at runtime. We configure the environment to gracefully handle missing dependencies and prompt for credentials without hardcoding sensitive information. We prepare the foundation for an agent-driven workflow by initializing core Haystack components, tools, and state utilities in a Colab-ready setup.

random.seed(7)
np.random.seed(7)

now = datetime.utcnow()
start = now - timedelta(hours=24)

services = ["api-gateway", "payments", "auth", "db-proxy", "worker", "web"]
regions = ["eu-central-1", "eu-west-1", "us-east-1"]
levels = ["INFO", "WARN", "ERROR"]
error_kinds = [
    "UpstreamTimeout",
    "DBConnPoolExhausted",
    "JWTSignatureInvalid",
    "RateLimitExceeded",
    "DeadlockDetected",
    "CacheMissStorm",
    "OOMKilled",
    "TLSHandshakeFailure",
]

def synth_metrics(n=1440):
    ts = [start + timedelta(minutes=i) for i in range(n)]
    base_rps = 220 + 40*np.sin(np.linspace(0, 8*math.pi, n)) + np.random.normal(0, 10, n)
    base_p95 = 180 + 30*np.sin(np.linspace(0, 6*math.pi, n) + 0.5) + np.random.normal(0, 8, n)
    base_err = np.clip(np.random.normal(0.006, 0.002, n), 0.0, 0.05)
    incident_t0 = int(n*0.62)
    incident_t1 = incident_t0 + int(n*0.10)
    base_p95[incident_t0:incident_t1] += np.linspace(120, 520, incident_t1-incident_t0)
    base_err[incident_t0:incident_t1] += np.linspace(0.01, 0.07, incident_t1-incident_t0)
    base_rps[incident_t0:incident_t1] -= np.linspace(5, 80, incident_t1-incident_t0)
    df = pd.DataFrame({
        "ts": ts,
        "rps": np.clip(base_rps, 5, None),
        "p95_ms": np.clip(base_p95, 10, None),
        "error_rate": np.clip(base_err, 0.0, 0.2),
    })
    return df, (ts[incident_t0], ts[incident_t1])

metrics_df, (incident_begin, incident_end) = synth_metrics()

We seed randomness and generate a realistic 24-hour stream of synthetic service metrics with periodic behavior and noise. We deliberately introduce an incident window during which latency and error rates spike while request throughput degrades. We return both the metrics DataFrame and precise incident boundaries to support downstream detection and agent reasoning.

def synth_logs(n=9000):
    rows = []
    for _ in range(n):
        t = start + timedelta(seconds=random.randint(0, 24*3600-1))
        svc = random.choice(services)
        reg = random.choice(regions)
        lvl = random.choices(levels, weights=[0.86, 0.10, 0.04])[0]
        kind = None
        msg = "ok"
        latency = max(5, int(np.random.normal(120, 55)))
        if incident_begin <= t <= incident_end and svc in ["api-gateway", "payments", "db-proxy"]:
            if random.random() < 0.24:
                lvl = random.choices(["WARN","ERROR"], weights=[0.55,0.45])[0]
                kind = random.choices(
                    ["UpstreamTimeout","DBConnPoolExhausted","DeadlockDetected","CacheMissStorm"],
                    weights=[0.40,0.28,0.10,0.22]
                )[0]
                latency += random.randint(300, 1200)
                msg = f"{kind}: request failed"
        if lvl == "ERROR" and kind is None and random.random() < 0.45:
            kind = random.choice(error_kinds)
            msg = f"{kind}: unexpected failure"
            latency += random.randint(80, 700)
        trace = f"tr_{random.randint(10**7,10**8-1)}"
        user = f"u_{random.randint(1,20000)}"
        endpoint = random.choice(["/pay","/auth","/refund","/status","/checkout","/profile","/ledger"])
        rows.append({
            "ts": t,
            "service": svc,
            "region": reg,
            "level": lvl,
            "error_kind": kind or "",
            "endpoint": endpoint,
            "latency_ms": latency,
            "trace_id": trace,
            "user_id": user,
            "message": msg
        })
    df = pd.DataFrame(rows).sort_values("ts").reset_index(drop=True)
    return df

logs_df = synth_logs()

metrics_path = "/content/metrics.csv"
logs_path = "/content/logs.csv"
metrics_df.to_csv(metrics_path, index=False)
logs_df.to_csv(logs_path, index=False)

con = duckdb.connect(database=":memory:")
con.execute("CREATE TABLE metrics AS SELECT * FROM read_csv_auto(?, HEADER=TRUE)", [metrics_path])
con.execute("CREATE TABLE logs AS SELECT * FROM read_csv_auto(?, HEADER=TRUE)", [logs_path])

We synthesize high-volume, time-distributed logs with realistic service, region, severity, and error patterns that intensify during the incident window. We persist both metrics and logs to CSV and load them into an in-memory DuckDB database for fast analytical queries. We prepare a unified, queryable observability dataset that supports correlation between latency, errors, and log-level signals.

def zscore_anomalies(series, window=60, z=3.0):
    x = series.astype(float).values
    out = np.zeros_like(x, dtype=bool)
    for i in range(len(x)):
        lo = max(0, i-window)
        hi = i
        if hi - lo < max(10, window//4):
            continue
        mu = float(np.mean(x[lo:hi]))
        sd = float(np.std(x[lo:hi])) + 1e-9
        out[i] = abs((x[i]-mu)/sd) >= z
    return out

@tool
def load_inputs(metrics_csv_path: str, logs_csv_path: str) -> dict:
    m = pd.read_csv(metrics_csv_path, parse_dates=["ts"])
    l = pd.read_csv(logs_csv_path, parse_dates=["ts"])
    return {
        "metrics_summary": {
            "rows": int(len(m)),
            "start": str(m["ts"].min()),
            "end": str(m["ts"].max()),
            "cols": list(m.columns)
        },
        "logs_summary": {
            "rows": int(len(l)),
            "start": str(l["ts"].min()),
            "end": str(l["ts"].max()),
            "cols": list(l.columns),
            "services": sorted(l["service"].unique().tolist()),
            "regions": sorted(l["region"].unique().tolist())
        }
    }

@tool
def detect_incident_window(metric: str, z_threshold: float = 3.2, min_span_minutes: int = 10) -> dict:
    if metric not in ["rps", "p95_ms", "error_rate"]:
        return {"error": "metric must be one of: rps, p95_ms, error_rate"}
    df = metrics_df.copy().sort_values("ts")
    flags = zscore_anomalies(df[metric], window=75, z=float(z_threshold))
    df["flag"] = flags
    idx = np.where(df["flag"].values)[0]
    if len(idx) == 0:
        return {"found": False}
    groups = []
    cur = [idx[0]]
    for i in idx[1:]:
        if i == cur[-1] + 1:
            cur.append(i)
        else:
            groups.append(cur)
            cur = [i]
    groups.append(cur)
    spans = []
    for g in groups:
        t0 = df.loc[g[0], "ts"]
        t1 = df.loc[g[-1], "ts"]
        span = (t1 - t0).total_seconds() / 60.0
        if span >= float(min_span_minutes):
            spans.append((span, t0, t1, int(len(g))))
    spans.sort(key=lambda x: (-x[0], -x[3]))
    if not spans:
        best = max(groups, key=len)
        t0 = df.loc[best[0], "ts"]
        t1 = df.loc[best[-1], "ts"]
        return {"found": True, "metric": metric, "start": str(t0), "end": str(t1), "points": int(len(best)), "note": "short anomaly span; consider lowering min_span_minutes"}
    best = spans[0]
    return {"found": True, "metric": metric, "start": str(best[1]), "end": str(best[2]), "minutes": float(best[0]), "points": int(best[3])}

We implement a rolling z-score detector to flag statistically significant deviations in key metrics over time. We expose tools that load observability inputs and summarize their structure to ground the agent's reasoning. We detect and rank contiguous anomaly windows, returning the most meaningful incident span with clear temporal boundaries.

@tool
def sql_investigate(query: str) -> dict:
    try:
        df = con.execute(query).df()
        head = df.head(30)
        return {
            "rows": int(len(df)),
            "columns": list(df.columns),
            "preview": head.to_dict(orient="records")
        }
    except Exception as e:
        return {"error": str(e)}

@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
    ws = pd.to_datetime(window_start_iso)
    we = pd.to_datetime(window_end_iso)
    df = logs_df[(logs_df["ts"] >= ws) & (logs_df["ts"] <= we)].copy()
    if df.empty:
        return {"rows": 0, "top_error_kinds": [], "top_services": [], "top_endpoints": []}
    df["error_kind_norm"] = df["error_kind"].fillna("").replace("", "NONE")
    err = df[df["level"].isin(["WARN", "ERROR"])].copy()
    top_err = err["error_kind_norm"].value_counts().head(int(top_k)).to_dict()
    top_svc = err["service"].value_counts().head(int(top_k)).to_dict()
    top_ep = err["endpoint"].value_counts().head(int(top_k)).to_dict()
    by_region = err.groupby("region").size().sort_values(ascending=False).head(int(top_k)).to_dict()
    p95_latency = float(np.percentile(df["latency_ms"].values, 95))
    return {
        "rows": int(len(df)),
        "warn_error_rows": int(len(err)),
        "p95_latency_ms": p95_latency,
        "top_error_kinds": top_err,
        "top_services": top_svc,
        "top_endpoints": top_ep,
        "error_by_region": by_region
    }

@tool
def propose_mitigations(hypothesis: str) -> dict:
    h = hypothesis.lower()
    mitigations = []
    if "conn" in h or "pool" in h or "db" in h:
        mitigations += [
            {"action": "Increase DB connection pool size (bounded) and add backpressure at db-proxy", "owner": "Platform", "eta_days": 3},
            {"action": "Add circuit breaker + adaptive timeouts between api-gateway and db-proxy", "owner": "Backend", "eta_days": 5},
            {"action": "Tune query hotspots; add indexes for top offending endpoints", "owner": "Data/DBA", "eta_days": 7},
        ]
    if "timeout" in h or "upstream" in h:
        mitigations += [
            {"action": "Implement hedged requests for idempotent calls (carefully) and tighten retry budgets", "owner": "Backend", "eta_days": 6},
            {"action": "Add upstream SLO-aware load shedding at api-gateway", "owner": "Platform", "eta_days": 7},
        ]
    if "cache" in h:
        mitigations += [
            {"action": "Add request coalescing and negative caching to prevent cache-miss storms", "owner": "Backend", "eta_days": 6},
            {"action": "Prewarm cache for top endpoints during deploys", "owner": "SRE", "eta_days": 4},
        ]
    if not mitigations:
        mitigations += [
            {"action": "Add targeted dashboards and alerts for the suspected bottleneck metric", "owner": "SRE", "eta_days": 3},
            {"action": "Run controlled load test to reproduce and validate the hypothesis", "owner": "Perf Eng", "eta_days": 5},
        ]
    mitigations = mitigations[:10]
    return {"hypothesis": hypothesis, "mitigations": mitigations}

@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
    try:
        facts = json.loads(key_facts_json)
    except Exception:
        facts = {"note": "key_facts_json was not valid JSON"}
    try:
        mits = json.loads(mitigations_json)
    except Exception:
        mits = {"note": "mitigations_json was not valid JSON"}
    doc = {
        "title": title,
        "date_utc": datetime.utcnow().strftime("%Y-%m-%d"),
        "incident_window_utc": {"start": window_start_iso, "end": window_end_iso},
        "customer_impact": customer_impact,
        "suspected_root_cause": suspected_root_cause,
        "detection": {
            "how_detected": "Automated anomaly detection + error-rate spike triage",
            "gaps": ["Add earlier saturation alerting", "Improve symptom-to-cause correlation dashboards"]
        },
        "timeline": [
            {"t": window_start_iso, "event": "Symptoms begin (latency/error anomalies)"},
            {"t": "T+10m", "event": "On-call begins triage; identifies top services/endpoints"},
            {"t": "T+25m", "event": "Mitigation actions initiated (throttling/backpressure)"},
            {"t": window_end_iso, "event": "Customer impact ends; metrics stabilize"},
        ],
        "key_facts": facts,
        "corrective_actions": mits.get("mitigations", mits),
        "followups": [
            {"area": "Reliability", "task": "Add saturation signals + budget-based retries", "priority": "P1"},
            {"area": "Observability", "task": "Add golden signals per service/endpoint", "priority": "P1"},
            {"area": "Performance", "task": "Reproduce with load test and validate fix", "priority": "P2"},
        ],
        "appendix": {"notes": "Generated by a Haystack multi-agent workflow (non-RAG)."}
    }
    return {"postmortem_json": doc}

llm = OpenAIChatGenerator(model="gpt-4o-mini")

state_schema = {
    "metrics_csv_path": {"type": str},
    "logs_csv_path": {"type": str},
    "metrics_summary": {"type": dict},
    "logs_summary": {"type": dict},
    "incident_window": {"type": dict},
    "investigation_notes": {"type": list, "handler": merge_lists},
    "hypothesis": {"type": str},
    "key_facts": {"type": dict},
    "mitigation_plan": {"type": dict},
    "postmortem": {"type": dict},
}

profiler_prompt = """You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
- Prefer calling tools over guessing.
- Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
- Hypothesis must be falsifiable and mention at least one specific service and mechanism.
"""

writer_prompt = """You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
- Call tools only if needed.
- Keep 'suspected_root_cause' specific and not generic.
- Ensure corrective actions have owners and eta_days.
"""

coordinator_prompt = """You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
- A short executive summary (max 10 lines)
- The postmortem JSON
- A compact runbook checklist (bulleted)
"""

profiler_agent = Agent(
    chat_generator=llm,
    tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
    system_prompt=profiler_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

writer_agent = Agent(
    chat_generator=llm,
    tools=[draft_postmortem],
    system_prompt=writer_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

profiler_tool = ComponentTool(
    component=profiler_agent,
    name="profiler_specialist",
    description="Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).",
    outputs_to_string={"source": "last_message"}
)

writer_tool = ComponentTool(
    component=writer_agent,
    name="postmortem_writer_specialist",
    description="Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.",
    outputs_to_string={"source": "last_message"}
)

coordinator_agent = Agent(
    chat_generator=llm,
    tools=[
        load_inputs,
        detect_incident_window,
        sql_investigate,
        log_pattern_scan,
        propose_mitigations,
        profiler_tool,
        writer_tool,
        draft_postmortem
    ],
    system_prompt=coordinator_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

We define a suite of investigative, synthesis, and documentation tools that let agents query data, extract patterns, and propose concrete mitigations. We orchestrate specialist profiler and writer agents under a coordinator that drives an end-to-end, non-RAG incident workflow. We configure prompts, state schemas, and tool bridges so the system produces falsifiable hypotheses, actionable plans, and a structured postmortem.

profiler_agent.warm_up()
writer_agent.warm_up()
coordinator_agent.warm_up()

initial_state = {
    "metrics_csv_path": metrics_path,
    "logs_csv_path": logs_path,
    "investigation_notes": []
}

task = """
We have an incident in the last 24h. Investigate using the provided CSVs.
Constraints:
- Do not use RAG or any document retriever/store.
- Use tools + SQL to ground conclusions.
- Produce a realistic postmortem JSON and a runbook checklist.
"""

result = coordinator_agent.run(
    messages=[ChatMessage.from_user(task)],
    state=State(schema=state_schema, data=initial_state)
)

last = result["last_message"].text if "last_message" in result else result["messages"][-1].text
print(last)

We warm up all agents to ensure tools, prompts, and state transitions are fully initialized before execution. We define the investigation task and initial state, then delegate end-to-end incident handling to the coordinator agent. We execute the workflow and surface the final executive summary, postmortem JSON, and runbook output.
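
Since result["messages"] already holds the full ChatMessage history (as used above), we can also walk it to audit how the coordinator reached its answer. The following is a minimal sketch; it assumes each ChatMessage exposes role and text attributes, which can vary slightly across haystack-ai versions:

# Optional: inspect the conversation trace that produced the final answer.
# Assumes result["messages"] holds ChatMessage objects with .role and .text.
for i, m in enumerate(result.get("messages", [])):
    preview = (m.text or "").replace("\n", " ")[:120]
    print(f"{i:02d} {m.role}: {preview}")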

In conclusion, we showed how Haystack supports sophisticated agentic patterns that scale in complexity without becoming fragile or hard to reason about. We demonstrated that, even within a notebook, we can express rich agent logic, maintain explicit state, and coordinate multiple components in a controlled and extensible way. By structuring the system this way, we placed ourselves in a strong position to iterate on more advanced behaviors, evaluate agent decisions, and evolve the tutorial into production-grade agentic workflows.


NVIDIA Revolutionizes Climate Tech with ‘Earth-2’: The World’s First Fully Open Accelerated AI Weather Stack

For decades, predicting the weather has been the exclusive domain of massive government supercomputers running complex physics-based equations. NVIDIA has shattered that barrier with the release of Earth-2, a family of open models and tools for AI weather and climate prediction that is accessible to virtually anyone, from tech startups to national meteorological agencies.

In a move that democratizes climate science, NVIDIA unveiled 3 groundbreaking new models powered by novel architectures: Atlas, StormScope, and HealDA. These tools promise to accelerate forecasting speeds by orders of magnitude while delivering accuracy that rivals or exceeds traditional methods.

The Democratization of Weather Intelligence

Historically, running a high-fidelity weather model required infrastructure that only a few countries could afford. NVIDIA’s Earth-2 changes the calculus by offering an ‘open stack’, a collection of pretrained models, inference libraries, and customization recipes available on platforms like GitHub and Hugging Face.

Mike Pritchard, Director of Climate Simulation at NVIDIA, emphasized that NVIDIA is not becoming a weather service provider. Instead, they are building the “foundational building blocks” that allow nations and companies to build their own sovereign forecasting systems.

“Sovereignty matters. Weather is a national security issue… That’s why we’ve built Earth-2, the world’s first fully open production-ready AI weather stack.” – Mike Pritchard, NVIDIA

Meet the New Heavyweights: Atlas, StormScope, and HealDA

The announcement introduces 3 specific models that address different stages of the forecasting pipeline, from processing messy data to predicting storms weeks in advance.

1. Earth-2 Medium Range (Powered by Atlas)

Targeting the 15-day forecast window, this model uses a new architecture called Atlas. It predicts over 70 weather variables, including wind, humidity, and pressure, at high accuracy.

Performance: On standard industry benchmarks, Atlas has been shown to outperform GenCast, the current leading open model, across the vast majority of variables.

The Shift: It represents a return to “simple, scalable Transformer architectures,” moving away from niche, hand-tailored AI designs.

Read the research paper here.

2. Earth-2 Nowcasting (Powered by StormScope)

This is a game-changer for immediate disaster response. Powered by StormScope, this generative AI model focuses on the 0-to-6-hour window, providing kilometer-scale resolution of local storms.

Why it matters: It is the first AI model to outperform traditional physics-based methods for short-term precipitation forecasting.

Speed: It generates hazardous weather predictions in minutes, giving emergency responders critical time to act.

Sovereignty: Because it trains directly on geostationary satellite imagery rather than region-specific physics outputs, it can be deployed by any nation with good satellite coverage.

Read the research paper.

3. Earth-2 Global Data Assimilation (Powered by HealDA)

Often the unsung hero of forecasting, “data assimilation” is the process of combining messy satellite and balloon data into a coherent snapshot of the atmosphere to start a forecast.

The Breakthrough: Traditional assimilation consumes nearly 50% of supercomputing cycles. NVIDIA’s HealDA architecture accomplishes this task in minutes on GPUs rather than hours on supercomputers.

Result: When combined with the Medium Range model, it produces the most skillful predictions ever seen from an entirely AI-based pipeline.

Read the research paper

Real-World Impact: From Solar Power to Hurricane Risk

The Earth-2 stack is already in use by major global players, proving that AI weather forecasting is ready for commercial and operational prime time.

Renewable Energy: TotalEnergies and GCL (a major solar material producer) are using Earth-2 to predict solar and wind variability. For solar farms, accurate cloud cover prediction can significantly impact energy market trading.

Israel Meteorological Service: Using the CorrDiff model (part of the Earth-2 family), they have achieved a 90% reduction in compute time while generating high-resolution forecasts up to eight times daily.

Insurance & Risk: AXA and S&P Global Energy are leveraging the speed of Earth-2 to run thousands of “counterfactual” scenarios. By simulating thousands of years of hypothetical hurricane data, they can better understand rare, high-impact climate events that haven’t happened yet but might.

Daily Operations: Brightband, an AI weather tool provider, is already integrating Earth-2 Medium Range to issue daily global forecasts.

The Bottom Line

NVIDIA Earth-2 is not just a technical upgrade; it is a structural shift in how humans interact with the climate. By reducing the barrier to entry, shifting from multimillion-dollar supercomputers to accessible GPU-accelerated AI, NVIDIA is enabling a future where hyper-local, high-accuracy weather prediction is ubiquitous.

As extreme weather events become more frequent, tools like StormScope and Atlas will likely become essential infrastructure for governments and industries worldwide.

Earth-2 Medium Range and Nowcasting are available on GitHub, Hugging Face, and NVIDIA Earth2Studio. Earth-2 Global Data Assimilation is expected to be released later this year.

To learn more about getting started with these models, developers can visit the NVIDIA Earth-2 technical blog. Earth-2 Medium Range [Read the research paper], Earth-2 Nowcasting [Read the research paper], and Earth-2 Global Data Assimilation [Read the research paper].

What is Clawdbot? How a Local First Agent Stack Turns Chats into Real Automations

Clawdbot is an open source personal AI assistant that you run on your own hardware. It connects large language models from providers such as Anthropic and OpenAI to real tools such as messaging apps, files, shell, browser and smart home devices, while keeping the orchestration layer under your control.

The interesting part is not that Clawdbot chats. It is that the project ships a concrete architecture for local first agents, and a typed workflow engine called Lobster that turns model calls into deterministic pipelines.

Architecture: Gateway, Nodes and Skills

At the center of Clawdbot is the Gateway process. The Gateway exposes a WebSocket control plane on ws://127.0.0.1:18789 and a local HTTP interface for the control UI and web chat.

Your messages from WhatsApp, Telegram, Signal, Slack, Discord, iMessage and other channels are delivered to the Gateway. The Gateway decides which agent should handle the message, which tools it may call, and which model provider to use. It then sends the reply back over the same channel.

The runtime is split into a few core concepts:

Gateway: Routing, model calls, tool invocation, sessions, presence and scheduling.

Nodes: Processes that give Clawdbot access to local resources such as file system, browser automation, microphone, camera or platform specific APIs on macOS, Windows, Linux, iOS and Android.

Channels: Integrations for chat systems like WhatsApp, Telegram, Discord, Slack, Signal, Microsoft Teams, Matrix, Zalo and more. These are configured as channel backends that attach to the Gateway.

Skills and plugins: Tools that the agent can call, described in a standard SKILL.md format and distributed through ClawdHub.

This separation lets you run the Gateway on a five dollar virtual server or a spare machine at home, while keeping heavy model compute on remote APIs or local model backends when needed.

Skills and the SKILL.md standard

Clawdbot uses an open skills format described in SKILL.md. A skill is defined in Markdown with a small header and an ordered procedure. For example, a deployment skill might specify steps such as checking git status, running tests and deploying only after success.


name: deploy-production
description: Deploy the current branch to production. Use only after tests pass.
disable-model-invocation: true

1. Check git status ensuring clean working directory.
2. Run `npm test`
3. If tests pass, run `npm run deploy`

The Gateway reads these definitions and exposes them to agents as tools with explicit capabilities and safety constraints. Skills are published to ClawdHub and can be installed or composed into larger workflows.

This means that operational runbooks can move from ad-hoc wiki pages into machine executable skills, while still being auditable as text.
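
For illustration only, the following minimal Python sketch (not part of Clawdbot) reads the header fields of a SKILL.md file like the one above; the field names follow the example, and everything else is assumed:

from pathlib import Path

def read_skill_header(path: str) -> dict:
    """Collect the simple 'key: value' header lines from a SKILL.md file (sketch)."""
    header = {}
    for line in Path(path).read_text().splitlines():
        line = line.strip()
        if not line:
            continue
        if line[0].isdigit():  # the numbered procedure steps start here
            break
        if ":" in line:
            key, value = line.split(":", 1)
            header[key.strip()] = value.strip()
    return header

# e.g. {'name': 'deploy-production', 'description': '...', 'disable-model-invocation': 'true'}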

Lobster: Typed Workflow Runtime for Agents

Lobster is the workflow runtime that powers Local Lobster and many advanced Clawdbot automations. It is described as a typed workflow shell that lets Clawdbot run multi step tool sequences as a single deterministic operation with explicit approval gates.

Instead of having the model call many tools in a loop, Lobster moves orchestration into a small domain specific runtime:

Pipelines are defined as JSON or YAML, or as a compact shell like pipeline string.

Steps exchange typed JSON data, not unstructured text.

The runtime enforces timeouts, output limits and sandbox policies.

Workflows can pause on side effects and resume later with a resumeToken.

A simple inbox triage workflow looks like this:

name: inbox-triage
steps:
  - id: collect
    command: inbox list --json
  - id: categorize
    command: inbox categorize --json
    stdin: $collect.stdout
  - id: approve
    command: inbox apply --approve
    stdin: $categorize.stdout
    approval: required
  - id: execute
    command: inbox apply --execute
    stdin: $categorize.stdout
    condition: $approve.approved

Clawdbot treats this file as a skill. When you ask it to clean your inbox, it calls one Lobster pipeline instead of improvising many tool calls. The model decides when to run the pipeline and with which parameters, but the pipeline itself stays deterministic and auditable.

Local Lobster is the reference agent that uses Lobster to drive local workflows and is described in coverage as an open source agent that redefines personal AI by pairing local first workflows with proactive behavior.

Proactive local first behavior

A key reason Clawdbot is trending and visible on X and in developer communities is that it behaves like an operator, not just a chat window.

Because the Gateway can run scheduled jobs and track state across sessions, common patterns include:

Daily briefings that summarize calendars, tasks and important mail.

Periodic recaps such as weekly shipped work summaries.

Monitors that watch for conditions, then message you first on your preferred channel.

File and repository automations that run locally but are triggered by natural language.

All of this runs with routing and tool policy on your machine or server. Model calls still go to providers like Anthropic, OpenAI, Google, xAI or local backends, but the assistant brain, memory and integrations are under your control.

Installation and developer workflow

The project provides a one line installer that fetches a script from clawd.bot and bootstraps Node, the Gateway and core components. For more control, you can install via npm or clone the TypeScript repository and build with pnpm.

Typical steps:

curl -fsSL https://clawd.bot/install.sh | bash

# or

npm i -g clawdbot
clawdbot onboard

After onboarding you connect a channel such as Telegram or WhatsApp, choose a model provider and enable skills. From there you can write your own SKILL.md files, build Lobster workflows and expose them through chat, web chat or the macOS companion application.

Some Examples

Just ask @clawdbot to build and deploy a website with a chat message https://t.co/I5bQDCK2Ne pic.twitter.com/EOa1GlPxJe— Peter Yang (@petergyang) January 25, 2026

Just had Clawdbot set up Ollama with a local model. Now it handles website summaries and simple tasks locally instead of burning API credits.Blown away that an AI just installed another AI to save me money. pic.twitter.com/RRvXQAgBfX— Max (@talkaboutdesign) January 25, 2026

Clawdbot is controlling LMStudio remotely from telegram, downloading Qwen, which it will then use to power some of my tasks with Clawdbot. pic.twitter.com/ll2adg19Za— Matthew Berman (@MatthewBerman) January 25, 2026

Clawdbot now takes an idea, manages codex and claude, debates them on reviews autonomously, and lets me know when it’s done. Amazing. A whole feature deployed while I’m out on a walk. pic.twitter.com/ws3UDQG2S0— Aaron Ng (@localghost) January 25, 2026


Build a serverless AI Gateway architecture with AWS AppSync Events

AWS AppSync Events can help you create more secure, scalable Websocket APIs. In addition to broadcasting real-time events to millions of Websocket subscribers, it supports a crucial user experience requirement of your AI Gateway: low-latency propagation of events from your chosen generative AI models to individual users.
In this post, we discuss how to use AppSync Events as the foundation of a capable, serverless, AI gateway architecture. We explore how it integrates with AWS services for comprehensive coverage of the capabilities offered in AI gateway architectures. Finally, we get you started on your journey with sample code you can launch in your account and begin building.
Overview of AI Gateway
AI Gateway is an architectural middleware pattern that helps enhance the availability, security, and observability of large language models (LLMs). It supports the interests of several different personas. For example, users want low latency and delightful experiences. Developers want flexible and extensible architectures. Security staff need governance to protect information and availability. System engineers need monitoring and observability solutions that help them support the user experience. Product managers need information about how well their products perform with users. Budget managers need cost controls. The needs of these different people across your organization are important considerations for hosting generative AI applications.
Solution overview
The solution we share in this post offers the following capabilities:

Identity – Authenticate and authorize users from the built-in user directory, from your enterprise directory, and from consumer identity providers like Amazon, Google, and Facebook
APIs – Provide users and applications low-latency access to your generative AI applications
Authorization – Determine what resources your users have access to in your application
Rate limiting and metering – Mitigate bot traffic, block access, and meter model consumption to control cost
Diverse model access – Offer access to leading foundation models (FMs), agents, and safeguards to keep users safe
Logging – Observe, troubleshoot, and analyze application behavior
Analytics – Extract value from your logs to build, discover, and share meaningful insights
Monitoring – Track key datapoints that help staff react quickly to events
Caching – Reduce costs by detecting common queries to your models and returning predetermined responses

In the following sections, we dive into the core architecture and explore how you can build these capabilities into the solution.
Identity and APIs
The following diagram illustrates an architecture using the AppSync Events API to provide an interface between an AI assistant application and LLMs through Amazon Bedrock using AWS Lambda.

The workflow consists of the following steps:

The client application retrieves the user identity and authorization to access APIs using Amazon Cognito.
The client application subscribes to the AppSync Events channel, from which it will receive events like streaming responses from the LLMs in Amazon Bedrock.
The SubscribeHandler Lambda function attached to the Outbound Messages namespace verifies that this user is authorized to access the channel.
The client application publishes a message to the Inbound Message channel, such as a question posed to the LLM.
The ChatHandler Lambda function receives the message and verifies the user is authorized to publish messages on that channel.
The ChatHandler function calls the Amazon Bedrock ConverseStream API and waits for the response stream from the Converse API to emit response events.
The ChatHandler function relays the response messages from the Converse API to the Outbound Message channel for the current user, which passes the events to the WebSocket on which the client application is waiting for messages.

AppSync Events namespaces and channels are the building blocks of your communications architecture in your AI Gateway. In the example, namespaces are used to attach different behaviors to our inbound and outbound messages, and each namespace can have its own publish and subscribe integrations. Moreover, each namespace is divided into channels. Our channel structure gives each user a private inbound and outbound channel, serving as one-to-one communication with the server side:

Inbound-Messages / ${sub}
Outbound-Messages / ${sub}

The subject, or sub attribute, arrives in our Lambda functions as context from Amazon Cognito. It is an unchangeable, unique user identifier within each user pool. This makes it useful for segments of our channel names and is especially useful for authorization.
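As a concrete illustration, a client can derive its private channel names from the sub claim in the Amazon Cognito ID token. The following is a minimal sketch; the channel prefixes mirror the naming scheme above, and the token handling is simplified (no signature verification):

import base64
import json

def private_channels(id_token: str) -> dict:
    """Derive per-user channel paths from the Cognito ID token's sub claim (sketch)."""
    payload_b64 = id_token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)          # restore base64 padding
    claims = json.loads(base64.urlsafe_b64decode(payload_b64))
    sub = claims["sub"]                                    # immutable per-user identifier
    return {
        "inbound": f"Inbound-Messages/{sub}",              # client publishes questions here
        "outbound": f"Outbound-Messages/{sub}",            # client subscribes for streamed replies
    }
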
Authorization
Identity is established using Amazon Cognito, but we still need to implement authorization. One-to-one communication between a user and an AI assistant in our example should be private—we don’t want users with the knowledge of another user’s sub attribute to be able to subscribe to or publish to another user’s inbound or outbound channel.
This is why we use sub in our naming scheme for channels. This enables the Lambda functions attached to the namespaces as data sources to verify that a user is authorized to publish and subscribe.
The following code sample is our SubscribeHandler Lambda function:

def lambda_handler(event, context):
    """
    Lambda function that checks if the first channel segment matches the user's sub.
    Returns None if it matches or an error message otherwise.
    """

    # Extract segments and sub from the event
    segments = event.get("info", {}).get("channel", {}).get("segments")
    sub = event.get("identity", {}).get("sub", None)

    # Check if segments exist and the first segment matches the user's sub
    if not segments:
        logger.error("No segments found in event")
        return "No segments found in channel path"

    if sub != segments[1]:
        logger.warning(
            f"Unauthorized: Sub '{sub}' did not match path segment '{segments[1]}'"
        )
        return "Unauthorized"

    logger.info(f"Sub '{sub}' matched path segment '{segments[1]}'")

    return None

The function workflow consists of the following steps:

The name of the channel arrives in the event.
The user’s subject field, sub, is part of the context.
If the channel name and user identity don’t match, it doesn’t authorize the subscription and returns an error message.
Returning None indicates no errors and that the subscription is authorized.

The ChatHandler Lambda function uses the same logic to make sure users are only authorized to publish to their own inbound channel. The channel arrives in the event and the context carries the user identity.
Although our example is simple, it demonstrates how you can implement complex authorization rules using a Lambda function to authorize access to channels in AppSync Events. We have covered access control to an individual's inbound and outbound channels. Many business models around access to LLMs involve controlling how many tokens an individual is allowed to use within some period of time. We discuss this capability in the following section.
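For completeness, the publish-side check can mirror the subscribe handler. The following sketch assumes the publish event exposes the same info.channel.segments and identity.sub fields used above; the exact event shape in your handler may differ:

def authorize_publish(event):
    """Return None when the caller may publish to this channel, else an error string (sketch)."""
    segments = event.get("info", {}).get("channel", {}).get("segments") or []
    sub = event.get("identity", {}).get("sub")
    # The channel is Inbound-Messages/<sub>, so the user segment must match the caller's sub.
    if len(segments) < 2 or sub != segments[1]:
        return "Unauthorized"
    return None
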
Rate limiting and metering
Understanding and controlling the number of tokens consumed by users of an AI Gateway is important to many customers. Input and output tokens are the primary pricing mechanism for text-based LLMs in Amazon Bedrock. In our example, we use the Amazon Bedrock Converse API to access LLMs. The Converse API provides a consistent interface that works with the models that support messages. You can write code one time and use it with different models.
Part of the consistent interface is the stream metadata event. This event is emitted at the end of each stream and provides the number of tokens consumed by the stream. The following is an example JSON structure:

{
    "metadata": {
        "usage": {
            "inputTokens": 1062,
            "outputTokens": 512,
            "totalTokens": 1574
        },
        "metrics": {
            "latencyMs": 4133
        }
    }
}

We have input tokens, output tokens, total tokens, and a latency metric. To create a control with this data, we first consider the types of limits we want to implement. One approach is a monthly token limit that resets every month—a static window. Another is a daily limit based on a rolling window on 10-minute intervals. When a user exceeds their monthly limit, they must wait until the next month. After a user exceeds their daily rolling window limit, they must wait 10 minutes for more tokens to become available.
We need a way to keep atomic counters to track the token consumption, with fast real-time access to the counters with the user’s sub, and to delete old counters as they become irrelevant.
Amazon DynamoDB is a serverless, fully managed, distributed NoSQL database with single-digit millisecond performance at many scales. With DynamoDB, we can keep atomic counters, provide access to the counters keyed by the sub, and roll off old data using its time to live feature. The following diagram shows a subset of our architecture from earlier in this post that now includes a DynamoDB table to track token usage.

We can use a single DynamoDB table with the following partition and sort keys:

Partition key – user_id (String), the unique identifier for the user
Sort key – period_id (String), a composite key that identifies the time period

The user_id will receive the sub attribute from the JWT provided by Amazon Cognito. The period_id will contain lexicographically sortable strings that indicate which time period the counter is for as well as the timeframe. The following are some example sort keys:

10min:2025-08-05:16:40
10min:2025-08-05:16:50
monthly:2025-08

10min or monthly indicate the type of counter. The timestamp is set to the last 10-minute window (for example, (minute // 10) * 10).
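To make the key scheme concrete, the following minimal sketch derives both period identifiers for a given timestamp; the helper name is ours and not necessarily what meter.py uses:

from datetime import datetime, timezone

def period_ids(now: datetime) -> dict:
    """Build the rolling 10-minute and static monthly sort keys for a timestamp (sketch)."""
    ten_min = (now.minute // 10) * 10                      # snap to the last 10-minute boundary
    return {
        "10min": f"10min:{now:%Y-%m-%d}:{now.hour:02d}:{ten_min:02d}",
        "monthly": f"monthly:{now:%Y-%m}",
    }

print(period_ids(datetime(2025, 8, 5, 16, 43, tzinfo=timezone.utc)))
# {'10min': '10min:2025-08-05:16:40', 'monthly': 'monthly:2025-08'}
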
With each record, we keep the following attributes:

input_tokens – Counter for input tokens used in this 10-minute window
output_tokens – Counter for output tokens used in this 10-minute window
timestamp – Unix timestamp when the record was created or last updated
ttl – Time to live value (Unix timestamp), set to 24 hours from creation

The two token columns are incremented with the DynamoDB atomic ADD operation with each metadata event from the Amazon Bedrock Converse API. The ttl and timestamp columns are updated to indicate when the record is automatically removed from the table.
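A minimal boto3 sketch of that atomic update might look like the following; the table name is assumed, and the exact call in the sample application may differ:

import time
import boto3

dynamodb = boto3.client("dynamodb")

def record_usage(user_id: str, period_id: str, input_tokens: int, output_tokens: int):
    """Atomically add token counts for one usage window and refresh timestamp/ttl (sketch)."""
    now = int(time.time())
    dynamodb.update_item(
        TableName="token_usage",                      # assumed table name
        Key={"user_id": {"S": user_id}, "period_id": {"S": period_id}},
        UpdateExpression="ADD input_tokens :in, output_tokens :out SET #ts = :now, #ttl = :ttl",
        ExpressionAttributeNames={"#ts": "timestamp", "#ttl": "ttl"},
        ExpressionAttributeValues={
            ":in": {"N": str(input_tokens)},
            ":out": {"N": str(output_tokens)},
            ":now": {"N": str(now)},
            ":ttl": {"N": str(now + 24 * 3600)},      # expire the record after 24 hours
        },
    )
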
When a user sends a message, we check whether they have exceeded their daily or monthly limits.
To calculate daily usage, the meter.py module completes the following steps:

Calculates the start and end keys for the 24-hour window.
Queries records with the partition key user_id and sort key between the start and end keys.
Sums up the input_tokens and output_tokens values from the matching records.
Compares the sums against the daily limits.

See the following example code:

KeyConditionExpression: "user_id = :uid AND period_id BETWEEN :start AND :end"
ExpressionAttributeValues: {
    ":uid": {"S": "user123"},
    ":start": {"S": "10min:2025-08-04:15:30"},
    ":end": {"S": "10min:2025-08-05:15:30"}
}

This range query takes advantage of the naturally sorted keys to efficiently retrieve only the records from the last 24 hours, without filtering in the application code. The monthly usage calculation on the static window is much simpler. To check monthly usage, the system completes the following steps:

Gets the specific record with the partition key user_id and sort key monthly:YYYY-MM for the current month.
Compares the input_tokens and output_tokens values against the monthly limits.

See the following code:

Key: {
    "user_id": {"S": "user123"},
    "period_id": {"S": "monthly:2025-08"}
}

With an additional Python module and DynamoDB, we have a metering and rate limiting solution that works for both static and rolling windows.
Diverse model access
Our sample code uses the Amazon Bedrock Converse API. Not every model is included in the sample code, but many models are included for you to rapidly explore possibilities. The innovation in this area doesn't stop at models on AWS. There are numerous ways to develop generative AI solutions at every level of abstraction. You can build on top of the layer that best suits your use case.
Swami Sivasubramanian recently wrote on how AWS is enabling customers to deliver production-ready AI agents at scale. He discusses Strands Agents, an open source AI agents SDK, as well as Amazon Bedrock AgentCore, a comprehensive set of enterprise-grade services that help developers quickly and more securely deploy and operate AI agents at scale using a framework and model, hosted on Amazon Bedrock or elsewhere.
To learn more about architectures for AI agents, refer to Strands Agents SDK: A technical deep dive into agent architectures and observability. The post discusses the Strands Agents SDK and its core features, how it integrates with AWS environments for more secure, scalable deployments, and how it provides rich observability for production use. It also provides practical use cases and a step-by-step example.
Logging
Many of our AI Gateway stakeholders are interested in logs. Developers want to understand how their applications function. System engineers need to understand operational concerns like tracking availability and capacity planning. Business owners want analytics and trends so that they can make better decisions.
With Amazon CloudWatch Logs, you can centralize the logs from your different systems, applications, and AWS services that you use in a single, highly scalable service. You can then seamlessly view them, search them for specific error codes or patterns, filter them based on specific fields, or archive them securely for future analysis. CloudWatch Logs makes it possible to see your logs, regardless of their source, as a single and consistent flow of events ordered by time.
In the sample AI Gateway architecture, CloudWatch Logs is integrated at multiple levels to provide comprehensive visibility. The following architecture diagram depicts the integration points between AppSync Events, Lambda, and CloudWatch Logs in the sample application.

AppSync Events API logging
Our AppSync Events API is configured with ERROR-level logging to capture API-level issues. This configuration helps identify issues with API requests, authentication failures, and other critical API-level problems. The logging configuration is applied during the infrastructure deployment:

this.api = new appsync.EventApi(this, "Api", {
    // ... other configuration ...
    logConfig: {
        excludeVerboseContent: true,
        fieldLogLevel: appsync.AppSyncFieldLogLevel.ERROR,
        retention: logs.RetentionDays.ONE_WEEK,
    },
});

This provides visibility into API operations.
Lambda function structured logging
The Lambda functions use AWS Lambda Powertools for structured logging. The ChatHandler Lambda function implements a MessageTracker class that provides context for each conversation:

logger = Logger(service="eventhandlers")

class MessageTracker:
    """
    Tracks message state during processing to provide enhanced logging.
    Handles event type detection and processing internally.
    """

    def __init__(self, user_id, conversation_id, user_message, model_id):
        self.user_id = user_id
        self.conversation_id = conversation_id
        self.user_message = user_message
        self.assistant_response = ""
        self.input_tokens = 0
        self.output_tokens = 0
        self.model_id = model_id
        # ...

Key information logged includes:

User identifiers
Conversation identifiers for request tracing
Model identifiers to track which AI models are being used
Token consumption metrics (input and output counts)
Message previews
Detailed timestamps for time-series analysis

Each Lambda function sets a correlation ID for request tracing, making it straightforward to follow a single request through the system:

# Set correlation ID for request tracing
logger.set_correlation_id(context.aws_request_id)

Operational insights
CloudWatch Logs Insights enables SQL-like queries across log data, helping you perform the following actions:

Track token usage patterns by model or user
Monitor response times and identify performance bottlenecks
Detect error patterns and troubleshoot issues
Create custom metrics and alarms based on log data

By implementing comprehensive logging throughout the sample AI Gateway architecture, we provide the visibility needed for effective troubleshooting, performance optimization, and operational monitoring. This logging infrastructure serves as the foundation for both operational monitoring and the analytics capabilities we discuss in the following section.
Analytics
CloudWatch Logs provides operational visibility, but for extracting business intelligence from logs, AWS offers many analytics services. With our sample AI Gateway architecture, you can use those services to transform data from your AI Gateway without requiring dedicated infrastructure or complex data pipelines.
The following architecture diagram shows the flow of data between the Lambda function, Amazon Data Firehose, Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, and Amazon Athena.

The key components include:

Data Firehose – The ChatHandler Lambda function streams structured log data to a Firehose delivery stream at the end of each completed user response. Data Firehose provides a fully managed service that automatically scales with your data throughput, alleviating the need to provision or manage infrastructure. The following code illustrates the API call that integrates the ChatHandler Lambda function with the delivery stream:

# From messages.py
firehose_stream = os.environ.get("FIREHOSE_DELIVERY_STREAM")
if firehose_stream:
    try:
        firehose.put_record(
            DeliveryStreamName=firehose_stream,
            Record={"Data": json.dumps(log_data) + "\n"},
        )
        logger.debug(f"Successfully sent data to Firehose stream: {firehose_stream}")
    except Exception as e:
        logger.error(f"Failed to send data to Firehose: {str(e)}")

Amazon S3 with Parquet format – Firehose automatically converts the JSON log data to columnar Parquet format before storing it in Amazon S3. Parquet improves query performance and reduces storage costs compared to raw JSON logs. The data is partitioned by year, month, and day, enabling efficient querying of specific time ranges while minimizing the amount of data scanned during queries.
AWS Glue Data Catalog – An AWS Glue database and table are created in the AWS Cloud Development Kit (AWS CDK) application to define the schema for our analytics data, including user_id, conversation_id, model_id, token counts, and timestamps. Table partitions are added as new S3 objects are stored by Data Firehose.
Athena for SQL-based analysis – With the table in the Data Catalog, business analysts can use familiar SQL through Athena to extract insights. Athena is serverless and priced per query based on the amount of data scanned, making it a cost-effective solution for one-time analysis without requiring database infrastructure. The following is an example query:

-- Example: Token usage by model
SELECT
    model_id,
    SUM(input_tokens) as total_input_tokens,
    SUM(output_tokens) as total_output_tokens,
    COUNT(*) as conversation_count
FROM firehose_database.firehose_table
WHERE year='2025' AND month='08'
GROUP BY model_id
ORDER BY total_output_tokens DESC;

This serverless analytics pipeline transforms the events flowing through AppSync Events into structured, queryable tables with minimal operational overhead. The pay-as-you-go pricing model of these services facilitates cost-efficiency, and their managed nature alleviates the need for infrastructure provisioning and maintenance. Furthermore, with your data cataloged in AWS Glue, you can use the full suite of analytics and machine learning services on AWS such as Amazon Quick Sight and Amazon SageMaker Unified Studio with your data.
Monitoring
AppSync Events and Lambda functions send metrics to CloudWatch so you can monitor performance, troubleshoot issues, and optimize your AWS AppSync API operations effectively. For an AI Gateway, you might need more information in your monitoring system to track important metrics such as token consumption from your models.
The sample application includes a call to CloudWatch metrics to record the token consumption and LLM latency at the end of each conversation turn so operators have visibility into this data in real time. This enables metrics to be included in dashboards and alerts. Moreover, the metric data includes the LLM model identifier as a dimension so you can track token consumption and latency by model. Metrics are just one component of what we can learn about our application at runtime with CloudWatch. Because our log messages are formatted as JSON, we can perform analytics on our log data for monitoring using CloudWatch Logs Insights. The following architecture diagram illustrates the logs and metrics made available by AppSync Events and Lambda through CloudWatch and CloudWatch Logs Insights.
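A sketch of emitting those metrics from the ChatHandler might look like the following; the namespace, metric names, and dimension name are illustrative, not necessarily the ones used in the sample application:

import boto3

cloudwatch = boto3.client("cloudwatch")

def emit_llm_metrics(model_id: str, input_tokens: int, output_tokens: int, latency_ms: float):
    """Publish per-model token and latency metrics at the end of a conversation turn (sketch)."""
    dimensions = [{"Name": "ModelId", "Value": model_id}]
    cloudwatch.put_metric_data(
        Namespace="AIGateway",                          # assumed namespace
        MetricData=[
            {"MetricName": "InputTokens", "Dimensions": dimensions, "Value": input_tokens, "Unit": "Count"},
            {"MetricName": "OutputTokens", "Dimensions": dimensions, "Value": output_tokens, "Unit": "Count"},
            {"MetricName": "LLMLatency", "Dimensions": dimensions, "Value": latency_ms, "Unit": "Milliseconds"},
        ],
    )
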

For example, the following query against the sample application’s log groups shows us the users with the most conversations within a given time window:

fields @timestamp, @message
| filter @message like "Message complete"
| stats count_distinct(conversation_id) as conversation_count by user_id
| sort conversation_count desc
| limit 10

@timestamp and @message are standard fields for Lambda logs. On line 3, we compute the number of unique conversation identifiers for each user. Thanks to the JSON formatting of the messages, we don’t need to provide parsing instructions to read these fields. The Message complete log message is found in packages/eventhandlers/eventhandlers/messages.py in the sample application.
The following query example shows the number of unique users using the system for a given window:

fields @timestamp, @message
| filter @message like "Message complete"
| stats count_distinct(user_id) by bin(5m) as unique_users

Again, we filter for Message complete, compute unique statistics on the user_id field from our JSON messages, and then emit the data as a time series with 5-minute intervals with the bin function.
Caching (prepared responses)
Many AI Gateways provide a cache mechanism for assistant messages. This is appropriate when large numbers of users ask exactly the same questions and need exactly the same answers, which can yield considerable cost savings for a busy application. A good candidate for caching is a question about the weather: for “Is it going to rain in NYC today?”, everyone should see the same response. A bad candidate is one where users ask the same thing but receive private information in return, such as “How many vacation hours do I have right now?” Take care to use this idea safely in your area of work. A basic cache implementation is included in the sample to help you get started with this mechanism. Caches in conversational AI require careful handling to make sure information doesn't leak between users, and given the amount of context an LLM can use to tailor a response, they should be used judiciously.
The following architecture diagram shows the use of DynamoDB as a storage mechanism for prepared responses in the sample application.

The sample application computes a hash on the user message to query a DynamoDB table with stored messages. If there is a message available for a hash key, the application returns the text to the user, the custom metrics record a cache hit in CloudWatch, and an event is passed back to AppSync Events to notify the application the response is complete. This encapsulates the cache behavior completely within the event structure the application understands.
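A minimal sketch of that lookup follows; the table name, key attribute, and hash normalization are assumptions rather than the sample's exact implementation:

import hashlib
import boto3

dynamodb = boto3.client("dynamodb")

def cached_response(user_message: str):
    """Return a prepared response for an identical question, or None on a cache miss (sketch)."""
    # Normalize lightly so trivially different spacings of the same text hash identically.
    digest = hashlib.sha256(user_message.strip().lower().encode("utf-8")).hexdigest()
    item = dynamodb.get_item(
        TableName="prepared_responses",                 # assumed table name
        Key={"message_hash": {"S": digest}},
    ).get("Item")
    return item["response_text"]["S"] if item else None
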
Install the sample application
Refer to the README file on GitHub for instructions to install the sample application. Both install and uninstall are driven by a single command to deploy or un-deploy the AWS CDK application.
Sample pricing
The following table estimates monthly costs of the sample application with light usage in a development environment. Actual cost will vary by how you use the services for your use case.

Service | Unit | Price/Unit | Sample Usage | Links
AWS Glue | Objects Stored | No additional cost for first million objects stored, $1.00/100,000 above 1 million per month | Less than 1 million | https://aws.amazon.com/glue/pricing/
Amazon DynamoDB | Write Request Units | $0.625 per million write request units | $2.00 | https://aws.amazon.com/dynamodb/pricing/on-demand/
Amazon DynamoDB | Read Request Units | $0.125 per million read request units | |
Amazon S3 (Standard) | First 50 TB / Month | $0.023 per GB | $2.00 | https://aws.amazon.com/s3/pricing/
AppSync Events | Event API Operations | $1.00 per million API operations | $1.00 | https://aws.amazon.com/appsync/pricing/
AppSync Events | Connection Minutes | $0.08 per million connection minutes | |
Amazon Data Firehose | TB / month | $0.029 first 500 TB / month | $1.00 | https://aws.amazon.com/firehose/pricing/
Amazon Data Firehose | Format Conversion per GB | $0.018 / GB | |
AWS Lambda | Requests | $0.20 / 1M requests | $1.00 | https://aws.amazon.com/lambda/pricing/
AWS Lambda | Duration | $0.0000000067 / GB-seconds / month | |
Amazon Bedrock | Input Tokens | $3.00 / 1M input tokens (Anthropic Claude 4 Sonnet) | $20.00-$40.00 | https://aws.amazon.com/bedrock/pricing/
Amazon Bedrock | Output Tokens | $15.00 / 1M output tokens (Anthropic Claude 4 Sonnet) | |
AWS WAF | Base | $5.00 / month | $8.00 | https://aws.amazon.com/waf/pricing/
AWS WAF | Rules | $1.00 / rule / month | |
AWS WAF | Requests | $0.60 / 1M requests | |
Amazon Cognito | Monthly Active Users | First 10,000 users, no additional cost | $0.00 | https://aws.amazon.com/cognito/pricing/
Amazon CloudFront | Requests, Data Transfer Out | See pricing | $0.00 | https://aws.amazon.com/cloudfront/pricing/
Amazon CloudWatch | Logs, Metrics | See pricing | $0.00 | https://aws.amazon.com/cloudwatch/pricing/

The monthly cost of the sample application, assuming light development use, is expected to be between $35–55 per month.
Sample UI
The following screenshots showcase the sample UI. It provides a conversation window on the right and a navigation bar on the left. The UI features the following key components:

A Token Usage section is displayed and updated with each turn of the conversation
The New Chat option clears the messages from the chat interface so the user can start a new session
The model selector dropdown menu shows the available models

The following screenshot shows the chat interface of the sample application.

The following screenshot shows the model selection menu.

Conclusion
As the AI landscape evolves, you need infrastructure that adapts as quickly as the models themselves. By centering your architecture on AppSync Events and the serverless patterns we've covered, including Amazon Cognito-based identity authentication, DynamoDB-powered metering, CloudWatch observability, and Athena analytics, you can build a foundation that grows with your needs. The sample application presented in this post gives you a starting point that demonstrates real-world patterns, helping developers explore AI integration, architects design enterprise solutions, and technical leaders evaluate approaches.
The complete source code and deployment instructions are available in the GitHub repo. To get started, deploy the sample application and explore the nine architectures in action. You can customize the authorization logic to match your organization’s requirements and extend the model selection to include your preferred models on Amazon Bedrock. Share your implementation insights with your organization, and leave your feedback and questions in the comments.

About the authors

Archie Cowan is a Senior Prototype Developer on the AWS Industries Prototyping and Cloud Engineering team. He joined AWS in 2022 and has developed software for companies in the Automotive, Energy, Technology, and Life Sciences industries. Before AWS, he led the architecture team at ITHAKA, where he contributed to the search engine on jstor.org and helped increase production deployment velocity from 12 to 10,000 releases per year over the course of his tenure. You can find more of his writing on topics such as coding with AI at fnjoin.com and x.com/archiecowan.

How Totogi automated change request processing with Totogi BSS Magic a …

This post is cowritten by Nikhil Mathugar, Marc Breslow and Sudhanshu Sinha from Totogi.
This blog post describes how Totogi automates change request processing. Totogi is an AI company focused on helping telecom (telco) companies innovate, accelerate growth, and adopt AI at scale. BSS Magic, Totogi's flagship product, connects and models telco business operations, overlaying legacy systems with an AI layer. With BSS Magic, telcos can extend, customize, and modernize their systems without vendor dependencies or lengthy implementations. By partnering with the AWS Generative AI Innovation Center and using the rapid innovation capabilities of Amazon Bedrock, we accelerated the development of BSS Magic, helping Totogi's customers innovate faster and gain more control over their tech stack.
In this post, we explore the challenges associated with the traditional business support system (BSS) and the innovative solutions provided by Totogi BSS Magic. We introduce the intricacies of telco ontologies and the multi-agent framework that powers automated change request processing. Additionally, we outline the orchestration of AI agents and the benefits of this approach for telecom operators and beyond.
Challenges with BSS
Business support systems are notoriously difficult to manage. A typical BSS stack consists of hundreds of applications from various vendors that are difficult to integrate, either restricting telcos to a vendor's ecosystem or requiring them to invest in costly customizations. Such customizations are slow and resource-intensive because they rely on specialized engineering talent.
Each change request necessitates a thorough analysis of potential impacts across interconnected modules, consuming significant time and effort. Even small updates can involve multiple rounds of coding, testing, and reconfiguration to achieve stability. For telecom operators, where system reliability is critical, these safeguards are non-negotiable, but they come at a steep price. This process is further complicated by the scarcity of engineers with the necessary expertise, driving up costs and elongating timelines. As a result, development cycles for new features or services often take months to complete, leaving operators struggling to meet the demands of a fast-moving market.
Initiatives like TM Forum’s Open Digital Architecture (ODA) aim to solve this, yet most vendors are slow to adopt such open standards. This dynamic amplifies technical debt and inflates operational expenses.
BSS Magic solution overview
Totogi BSS Magic reduces the complexity using AI-generated interoperability, which helps simplify integrations, customizations, and application development. BSS Magic has two key aspects:

A telco ontology that understands the semantic meanings of data structures and the relationships between them, linking disparate data into a coherent network of knowledge.
A multi-agent framework for fully automated change request (CR) processing, which reduces CR turnaround time from 7 days to a few hours.

Telco ontology: The key to interoperability
Ontologies serve as semantic blueprints that detail concepts, relationships, and domain knowledge. In telecom, this means translating the BSS landscape into a clear, reusable, and interoperable ecosystem. Totogi’s telco ontology facilitates a deep understanding of data interaction and seamless integration across any vendor or system. By adopting FAIR principles (Findability, Accessibility, Interoperability, and Reusability), the ontology-driven architecture turns static, siloed data into dynamic, interconnected knowledge assets—unlocking trapped data and accelerating innovation. An overview diagram of the ontology is provided in the following figure.

Multi-agent framework for automated change request processing
AI agents are advanced software applications trained to perform specific tasks autonomously. Totogi’s BSS Magic AI agents have extensive domain knowledge and use this understanding to manage complex data interactions across multiple vendor systems. These agents automatically generate and test telco-grade code, replacing traditional integrations and customizations with intelligent, AI generated applications. At its core, BSS Magic uses a multi-agent AI approach with feedback loops to automate the entire software development pipeline. Each agent is designed to fulfill a specific role in the development pipeline:

Business analysis agent translates unstructured requirements into formal business specifications.
Technical architect agent takes these business specs and defines technical architectures, APIs, and dependencies.
Developer agent generates high-quality, deployable code, complete with modular designs and optimizations.
QA agent validates the code for adherence to best practices, improving quality and security. It provides feedback which is used by the developer agent to update the code.
Tester agent generates robust unit test cases, streamlining validation and deployment. The result of the test cases is used by the developer agent to improve the code.

An overview of the system is provided in the following figure.

This integrated pipeline reduces the time to complete a change request from 7 days to a few hours, with minimal human intervention. The prerequisites for implementing the system include an AWS account with access to Amazon Bedrock, AWS Step Functions, and AWS Lambda, along with configured AWS credentials. The AI agents are implemented using Anthropic Claude large language models (LLMs) through Amazon Bedrock. State management and workflow coordination are handled by Step Functions for reliable progression through each stage. The AWS infrastructure provides the enterprise-grade reliability, security, and scalability essential for telco-grade solutions.
To build the framework, Totogi collaborated with the AWS Generative AI Innovation Center (GenAIIC). GenAIIC offered access to AI expertise, industry-leading talent, and a rigorous iterative process to optimize the AI agents and code-generation workflows. It also provided guidance on prompt engineering, Retrieval Augmented Generation (RAG), model selection, automated code review, feedback loops, robust performance metrics for evaluating AI-generated outputs, and so on. The collaboration helped establish methods for maintaining reliability while scaling automation across the platform. The solution orchestrates multiple specialized AI agents to handle the complete software development lifecycle, from requirements analysis to test execution. The details of the AI agents are given in the following sections.
Multi-agent orchestration layer
The orchestration layer coordinates specialized AI agents through a combination of Step Functions and Lambda functions. Each agent maintains context through RAG and few-shot prompting techniques to generate accurate domain-specific outputs. The system manages agent communication and state transitions while maintaining a comprehensive audit trail of decisions and actions.
Business analysis generation
The Business Analyst agent uses Claude’s natural language understanding capabilities to process statement of work (SOW) documents and acceptance criteria. It extracts key requirements using custom prompt templates optimized for telecom BSS domain knowledge. The agent generates structured specifications for downstream processing while maintaining traceability between business requirements and technical implementations.
Technical architecture generation
The Technical Architect agent transforms business requirements into concrete AWS service configurations and architectural patterns. It generates comprehensive API specifications and data models and incorporates AWS Well-Architected principles. The agent validates architectural decisions against established patterns and best practices, producing infrastructure-as-code templates for automated deployment.
Code generation pipeline
The Developer agent converts technical specifications into implementation code using Claude’s advanced code generation capabilities. It produces robust, production-ready code that includes proper error handling and logging mechanisms. The pipeline incorporates feedback from validation steps to iteratively improve code quality and maintain consistency with AWS best practices.
Automated quality assurance
The QA agent is built using Claude to perform comprehensive code analysis and validation. It evaluates code quality and identifies potential performance issues. The system maintains continuous feedback loops with the development stage, facilitating rapid iteration and improvement of generated code based on quality metrics and best practices adherence. The QA process consists of carefully crafted prompts.
QA code analysis prompt:

“You are a senior QA backend engineer analyzing Python code for serverless applications.
Your task is to:
Compare requirements against implemented code
Identify missing features
Suggest improvements in code quality and efficiency
 Provide actionable feedback
Focus on overall implementation versus minor details
Consider serverless best practices”

This prompt helps the QA agent perform thorough code analysis, evaluate quality metrics, and maintain continuous feedback loops with development stages.
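To make the mechanics concrete, the sketch below shows one way such a review prompt could be sent to a Claude model on Amazon Bedrock using the Converse API. It is illustrative only; the model ID, prompt wiring, and helper name are assumptions, not Totogi's actual implementation.
import boto3

bedrock = boto3.client("bedrock-runtime")

QA_SYSTEM_PROMPT = (
    "You are a senior QA backend engineer analyzing Python code for serverless applications. "
    "Compare requirements against implemented code, identify missing features, suggest "
    "improvements in code quality and efficiency, and provide actionable feedback focused "
    "on overall implementation and serverless best practices."
)

def review_code(requirements: str, code: str) -> str:
    # Hypothetical helper: one QA pass over generated code.
    response = bedrock.converse(
        modelId="anthropic.claude-3-5-sonnet-20240620-v1:0",  # assumed model ID
        system=[{"text": QA_SYSTEM_PROMPT}],
        messages=[{
            "role": "user",
            "content": [{"text": f"Requirements:\n{requirements}\n\nCode:\n{code}"}],
        }],
        inferenceConfig={"temperature": 0.2, "maxTokens": 2000},
    )
    # The returned feedback would be routed back to the Developer agent
    # in the next iteration of the loop.
    return response["output"]["message"]["content"][0]["text"]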
Test automation framework
The Tester agent creates comprehensive test suites that verify both functional and non-functional requirements. It uses Claude to understand test contexts and generate appropriate test scenarios. The framework manages test refinement through evaluation cycles, achieving complete coverage of business requirements while maintaining test code quality and reliability. The testing framework uses a multi-stage prompt approach.
Initial test structure prompt:

“As a senior QA engineer, create a pytest-based test structure including:
Detailed test suite organization
Resource configurations
Test approach and methodology
Required imports and dependencies”

Test implementation prompt:

“Generate complete pytest implementation including:
Unit tests for each function
Integration tests for API endpoints
AWS service mocking
Edge case coverage
Error scenario handling”

Test results analysis prompt:

“Evaluate test outputs and coverage reports to:
Verify test completion status
Track test results and outcomes
Measure coverage metrics
Provide actionable feedback”

This structured approach leads to comprehensive test coverage while maintaining high quality standards. The framework currently achieves 76% code coverage and successfully validates both functional and non-functional requirements.
The Tester agent provides a feedback loop to the Developer agent to improve the code.
Conclusion
The integration of Totogi BSS Magic with Amazon Bedrock presents a comprehensive solution for modern telecom operators. Some takeaways for you to consider:

End-to-end automation: BSS Magic automates the entire development lifecycle—from idea to deployment. AI agents handle everything from requirements, architecture, and code generation to testing and validation.
Results: The agentic framework significantly boosted efficiency, reducing change request processing from seven days to a few hours. The automated testing framework achieved 76% code coverage, consistently delivering high-quality telecom-grade code.
Unique value for telecom operators: By using Totogi BSS Magic, telecom operators can accelerate time-to-market and reduce operational costs. BSS Magic uses autonomous AI, independently managing complex tasks so telecom operators can concentrate on strategic innovation. The solution is supported by Amazon Bedrock, which offers scalable AI models and infrastructure with the security and reliability critical for telecom.
Impact to other industries: While BSS Magic is geared towards the telecom industry, the multi-agent framework can be repurposed for general software development across other industries.
Future work: Future enhancements will focus on expanding the model’s domain knowledge in telecom and other domains. Another possible extension is to integrate an AI model to predict potential issues in change requests based on historical data, thereby preemptively addressing common pitfalls.

Any feedback and questions are welcome in the comments below. Contact us to engage the AWS Generative AI Innovation Center or to learn more.

About the authors
Nikhil Mathugar is a Presales Full Stack Engineer at Totogi, where he designs and implements scalable AWS-based proofs-of-concept across Python and modern JavaScript frameworks. He has over a decade of experience in architecting and maintaining large-scale systems—including web applications, multi-region streaming infrastructures and high-throughput automation pipelines. Building on that foundation, he’s deeply invested in AI—specializing in generative AI, agentic workflows and integrating large-language models to evolve Totogi’s BSS Magic platform.
Marc Breslow is Field CTO of Totogi, where he is utilizing AI to revolutionize the telecommunications industry. A veteran of Accenture, Lehman Brothers, and Citibank, Marc has a proven track record of building scalable, high-performance systems. At Totogi, he leads the development of AI-powered solutions that drive tangible results for telcos: reducing churn, increasing Average Revenue Per user (ARPU), and streamlining business processes. Marc is responsible for customer proof points demonstrating these capabilities. When not engaging with customers, Marc leads teams building Totogi’s BSS Magic technology, generating applications and improving efficiency using AI agents and workflows.
Sudhanshu Sinha is Chief Technology Officer and a founding team member at Totogi, where he works alongside Acting CEO Danielle Rios to drive the telecom industry’s shift to AI-native software. As the key strategist behind BSS Magic, he shaped its architecture, go-to-market, and early adoption—translating AI-native principles into measurable value for operators. He also helped define Totogi’s Telco Ontology, enabling interoperability and automation across complex BSS landscapes. With over two decades in telecommunications, Sudhanshu blends deep technical insight with commercial acumen to make AI-driven transformation practical and profitable for telcos worldwide.
Parth Patwa is a Data Scientist at the AWS Generative AI Innovation Center, where he works on customer projects using Generative AI and LLMs. He has an MS from University of California Los Angeles. He has published papers in top-tier ML and NLP venues, and has over 1000 citations.
Mofijul Islam is an Applied Scientist II and Tech Lead at the AWS Generative AI Innovation Center, where he helps customers tackle customer-centric research and business challenges using generative AI, large language models (LLM), multi-agent learning, code generation, and multimodal learning. He holds a PhD in machine learning from the University of Virginia, where his work focused on multimodal machine learning, multilingual NLP, and multitask learning. His research has been published in top-tier conferences like NeurIPS, ICLR, AISTATS, and AAAI, as well as IEEE and ACM Transactions.
Andrew Ang is a Senior ML Engineer with the AWS Generative AI Innovation Center, where he helps customers ideate and implement generative AI proof of concept projects. Outside of work, he enjoys playing squash and watching competitive cooking shows.
Shinan Zhang is an Applied Science Manager at the AWS Generative AI Innovation Center. With over a decade of experience in ML and NLP, he has worked with large organizations from diverse industries to solve business problems with innovative AI solutions, and bridge the gap between research and industry applications.

StepFun AI Introduce Step-DeepResearch: A Cost-Effective Deep Research Agent Model Built Around Atomic Capabilities

StepFun has introduced Step-DeepResearch, a 32B-parameter, end-to-end deep research agent that aims to turn web search into actual research workflows with long-horizon reasoning, tool use, and structured reporting. The model is built on Qwen2.5-32B-Base and is trained to act as a single agent that plans, explores sources, verifies evidence, and writes reports with citations, while keeping inference cost low.

From Search to Deep Research

Most existing web agents are tuned for multi-hop question-answering benchmarks. They try to match ground-truth answers for short questions, which is closer to targeted retrieval than to real research. Deep research tasks are different. They involve latent intent recognition, long-horizon decision making, multi-turn tool use, structured reasoning, and cross-source verification under uncertainty.

Step-DeepResearch reframes this as sequential decision making over a compact set of atomic capabilities. The research team defines 4 atomic capabilities: planning and task decomposition, deep information seeking, reflection and verification, and professional report generation. Instead of orchestrating many external agents, the system internalizes this loop into a single model that decides the next action at each step.

Data Synthesis around Atomic Capabilities

To teach these atomic capabilities, the research team builds separate data pipelines for each skill. For planning, they start from high quality technical reports, survey papers and financial analysis documents. They reverse-engineer realistic research plans and task trees from titles, abstracts and structure, then generate trajectories that follow these plans. This exposes the model to long horizon project structures, not only short question templates.

For deep information seeking, they construct graph based queries over knowledge graphs such as Wikidata5m and CN-DBpedia. They sample subgraphs, expand them using search, and synthesize questions that require multi hop reasoning across entities and documents. A separate pipeline uses a Wiki style hyperlink index to force cross document retrieval and combination of evidence. Easy questions that a strong model can already solve with a simple ReAct style strategy are filtered out, so training focuses on hard search problems.

Reflection and verification data is generated through self-correction loops and multi-agent teacher traces. Teacher agents extract claims, plan checks, verify facts, replan if inconsistencies appear, and only then write reports. The resulting trajectories are cleaned and used as supervision for a single student agent. Report generation is trained in 2 phases: mid-training for domain style and depth using query-report pairs, then supervised fine-tuning with strict formatting and plan-consistency constraints.

Progressive Training on Qwen2.5-32B-Base

The training pipeline has 3 stages: agentic mid-training, supervised fine-tuning, and reinforcement learning. In mid-training stage 1, the team injects atomic capabilities without tools, using context lengths up to 32k tokens. The data covers active reading, synthetic reasoning traces, summarization, and reflection. The research team shows steady gains on SimpleQA, TriviaQA, and FRAMES as training scales up to about 150B tokens, with the largest gains on FRAMES, which stresses structured reasoning.

In stage-2, the context extends to 128k tokens and explicit tool calls are introduced. The model learns tasks such as URL based question-answering, deep web search, long document summarization and long dialogue reasoning. This stage aligns the model with real research scenarios where search, browsing and analysis must be mixed in one trajectory.

During supervised fine-tuning, the 4 atomic capabilities are composed into full deep search and deep research traces. Data cleaning keeps trajectories that are correct and short in terms of steps and tool calls. The pipeline injects controlled tool errors followed by correction to improve robustness, and enforces citation formats so that reports stay grounded in the retrieved sources.

Reinforcement learning then optimizes the agent in a real tool environment. The research team builds tasks and checklists through reverse synthesis, and trains a checklist style Rubrics Judge to score reports along fine grained dimensions. The reward design converts ternary rubric labels into asymmetric binary rewards that capture both positive targets and violations. The policy is trained with PPO and a learned critic, using generalized advantage estimation with near zero discount so that long trajectories are not truncated.

Single Agent ReAct Architecture and Search Stack

At inference time, Step-DeepResearch runs as a single ReAct style agent that alternates thinking, tool calls and observations until it decides to output a report. The tool set includes batch web search, a todo manager, shell commands and file operations. Execution runs in a sandbox with terminal persistence through tmux. A perception oriented browser reduces redundant page captures by using perceptual hash distance. Tools for document parsing, audio transcription and image analysis support multimodal inputs.
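As a rough, generic sketch of what such a single-agent ReAct loop looks like (not StepFun's code; the chat client, tool names, and stop condition are placeholders):

def react_loop(task: str, llm, tools: dict, max_steps: int = 50) -> str:
    # Generic ReAct loop: think, call a tool, observe, repeat until a final report.
    history = [{"role": "user", "content": task}]
    for _ in range(max_steps):
        # The model emits either a tool call or a final answer (assumed interface).
        step = llm(history)  # assumed to return {"thought", "tool", "args", "final"}
        history.append({"role": "assistant", "content": step["thought"]})
        if step.get("final"):
            return step["final"]  # finished: structured report with citations
        # e.g. batch web search, todo manager, shell commands, file operations
        observation = tools[step["tool"]](**step["args"])
        history.append({"role": "tool", "content": observation})
    return "Step budget exhausted without a final report."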

Information acquisition uses 2 related resources. The StepFun team states that its Search API is grounded in more than 20M high-quality papers and 600 premium indices. The research team also describes a curated authority-indexing strategy that isolates more than 600 trusted domains, including government, academic, and institutional sites. Retrieval operates at the paragraph level and uses authority-aware ranking so that high-trust domains are preferred when relevance is similar.

The file tools support patch based editing, so the agent can update only modified sections of a report. A summary aware storage scheme writes full tool outputs to local files and injects only compact summaries into the context. This acts as external memory and avoids context overflow for long projects.
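A minimal sketch of that summary-aware storage pattern might look like the following; the directory layout and summarizer are placeholders, not the actual Step-DeepResearch implementation.

import hashlib
from pathlib import Path

MEMORY_DIR = Path("tool_outputs")
MEMORY_DIR.mkdir(exist_ok=True)

def store_and_summarize(tool_name: str, output: str, summarize, max_chars: int = 500) -> str:
    # Persist the full tool output to a local file keyed by a content hash.
    digest = hashlib.sha256(output.encode("utf-8")).hexdigest()[:12]
    path = MEMORY_DIR / f"{tool_name}_{digest}.txt"
    path.write_text(output, encoding="utf-8")

    # Only a compact summary plus a pointer to the file goes back into the
    # agent's context window, acting as external memory for long projects.
    summary = summarize(output)[:max_chars]
    return f"[{tool_name} output saved to {path}] {summary}"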

Evaluation, Cost and Access

To measure deep research behavior, the team introduces ADR-Bench, a Chinese benchmark with 110 open-ended tasks across 9 domains. 70 tasks cover general domains such as education, science and engineering, and social life, evaluated by expert side-by-side comparison. 40 tasks in finance and law are scored with explicit rubrics that follow atomicity and verifiability constraints.

On Scale AI Research Rubrics, Step-DeepResearch reaches 61.42 percent rubric compliance, which is comparable to OpenAI-DeepResearch and Gemini-DeepResearch, and clearly ahead of multiple open and proprietary baselines. On ADR-Bench, expert-based Elo ratings show that the 32B model outperforms larger open-models such as MiniMax-M2, GLM-4.6 and DeepSeek-V3.2, and is competitive with systems like Kimi-Researcher and MiniMax-Agent-Pro.

Key Takeaways

Single agent, atomic capability design: Step-DeepResearch is a 32B-parameter single agent built on Qwen2.5-32B-Base. It internalizes 4 atomic capabilities (planning, deep information seeking, reflection and verification, and professional report generation) instead of relying on many external agents.

Targeted data synthesis for each skill: The research team builds separate data pipelines for planning, deep information seeking, reflection and report writing, using reverse-engineered plans from real reports, graph-based queries over Wikidata5m and CN-DBpedia, multi-agent teacher traces and strict report formatting data.

Three stage training with long context and RL: Training uses mid training, supervised fine-tuning and reinforcement learning, with mid training up to 150B tokens at 32k and then 128k context, SFT composes full deep research trajectories, and PPO based RL with a Rubrics Judge optimizes reports against fine grained checklists.

ReAct architecture with curated search and external memory: At inference time the model runs a ReAct loop that calls tools for batch web search, todo management, shell commands, and file operations. It uses a Search API grounded in more than 20M papers and 600 premium indices along with 600+ trusted domains, and relies on patch editing and summary-aware storage as external memory.

Competitive quality with lower cost: On Scale AI Research Rubrics the model reaches 61.42 percent rubric compliance and is competitive with OpenAI-DeepResearch and Gemini-DeepResearch. On ADR-Bench it achieves a 67.1 percent win-or-tie rate against strong baselines.

Check out the Paper and Repo.
The post StepFun AI Introduce Step-DeepResearch: A Cost-Effective Deep Research Agent Model Built Around Atomic Capabilities appeared first on MarkTechPost.

A Coding Implementation to Automating LLM Quality Assurance with DeepEval, Custom Retrievers, and LLM-as-a-Judge Metrics

We initiate this tutorial by configuring a high-performance evaluation environment, specifically focused on integrating the DeepEval framework to bring unit-testing rigor to our LLM applications. By bridging the gap between raw retrieval and final generation, we implement a system that treats model outputs as testable code and uses LLM-as-a-judge metrics to quantify performance. We move beyond manual inspection by building a structured pipeline in which every query, retrieved context, and generated response is validated against rigorous academic-standard metrics. Check out the FULL CODES here.

import sys, os, textwrap, json, math, re
from getpass import getpass

print(" Hardening environment (prevents common Colab/py3.12 numpy corruption)...")

!pip -q uninstall -y numpy || true
!pip -q install --no-cache-dir --force-reinstall "numpy==1.26.4"

!pip -q install -U deepeval openai scikit-learn pandas tqdm

print(" Packages installed.")

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from deepeval import evaluate
from deepeval.test_case import LLMTestCase, LLMTestCaseParams
from deepeval.metrics import (
    AnswerRelevancyMetric,
    FaithfulnessMetric,
    ContextualRelevancyMetric,
    ContextualPrecisionMetric,
    ContextualRecallMetric,
    GEval,
)

print(" Imports loaded successfully.")

OPENAI_API_KEY = getpass(" Enter OPENAI_API_KEY (leave empty to run without OpenAI): ").strip()
openai_enabled = bool(OPENAI_API_KEY)

if openai_enabled:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
print(f" OpenAI enabled: {openai_enabled}")

We initialize our environment by stabilizing core dependencies and installing the deepeval framework to ensure a robust testing pipeline. Next, we import specialized metrics like Faithfulness and Contextual Recall while configuring our API credentials to enable automated, high-fidelity evaluation of our LLM responses. Check out the FULL CODES here.

DOCS = [
    {
        "id": "doc_01",
        "title": "DeepEval Overview",
        "text": (
            "DeepEval is an open-source LLM evaluation framework for unit testing LLM apps. "
            "It supports LLM-as-a-judge metrics, custom metrics like G-Eval, and RAG metrics "
            "such as contextual precision and faithfulness."
        ),
    },
    {
        "id": "doc_02",
        "title": "RAG Evaluation: Why Faithfulness Matters",
        "text": (
            "Faithfulness checks whether the answer is supported by retrieved context. "
            "In RAG, hallucinations occur when the model states claims not grounded in context."
        ),
    },
    {
        "id": "doc_03",
        "title": "Contextual Precision",
        "text": (
            "Contextual precision evaluates how well retrieved chunks are ranked by relevance "
            "to a query. High precision means relevant chunks appear earlier in the ranked list."
        ),
    },
    {
        "id": "doc_04",
        "title": "Contextual Recall",
        "text": (
            "Contextual recall measures whether the retriever returns enough relevant context "
            "to answer the query. Low recall means key information was missed in retrieval."
        ),
    },
    {
        "id": "doc_05",
        "title": "Answer Relevancy",
        "text": (
            "Answer relevancy measures whether the generated answer addresses the user's query. "
            "Even grounded answers can be irrelevant if they don't respond to the question."
        ),
    },
    {
        "id": "doc_06",
        "title": "G-Eval (GEval) Custom Rubrics",
        "text": (
            "G-Eval lets you define evaluation criteria in natural language. "
            "It uses an LLM judge to score outputs against your rubric (e.g., correctness, tone, policy)."
        ),
    },
    {
        "id": "doc_07",
        "title": "What a DeepEval Test Case Contains",
        "text": (
            "A test case typically includes input (query), actual_output (model answer), "
            "expected_output (gold answer), and retrieval_context (ranked retrieved passages) for RAG."
        ),
    },
    {
        "id": "doc_08",
        "title": "Common Pitfall: Missing expected_output",
        "text": (
            "Some RAG metrics require expected_output in addition to input and retrieval_context. "
            "If expected_output is None, evaluation fails for metrics like contextual precision/recall."
        ),
    },
]

EVAL_QUERIES = [
    {
        "query": "What is DeepEval used for?",
        "expected": "DeepEval is used to evaluate and unit test LLM applications using metrics like LLM-as-a-judge, G-Eval, and RAG metrics.",
    },
    {
        "query": "What does faithfulness measure in a RAG system?",
        "expected": "Faithfulness measures whether the generated answer is supported by the retrieved context and avoids hallucinations not grounded in that context.",
    },
    {
        "query": "What does contextual precision mean?",
        "expected": "Contextual precision evaluates whether relevant retrieved chunks are ranked higher than irrelevant ones for a given query.",
    },
    {
        "query": "What does contextual recall mean in retrieval?",
        "expected": "Contextual recall measures whether the retriever returns enough relevant context to answer the query, capturing key missing information issues.",
    },
    {
        "query": "Why might an answer be relevant but still low quality in RAG?",
        "expected": "An answer can address the question (relevant) but still be low quality if it is not grounded in retrieved context or misses important details.",
    },
]

We define a structured knowledge base consisting of documentation snippets that serve as our ground-truth context for the RAG system. We also establish a set of evaluation queries and corresponding expected outputs to create a “gold dataset,” enabling us to assess how accurately our model retrieves information and generates grounded responses. Check out the FULL CODES here.

class TfidfRetriever:
    def __init__(self, docs):
        self.docs = docs
        self.texts = [f"{d['title']}\n{d['text']}" for d in docs]
        self.vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2))
        self.matrix = self.vectorizer.fit_transform(self.texts)

    def retrieve(self, query, k=4):
        qv = self.vectorizer.transform([query])
        sims = cosine_similarity(qv, self.matrix).flatten()
        top_idx = np.argsort(-sims)[:k]
        results = []
        for i in top_idx:
            results.append(
                {
                    "id": self.docs[i]["id"],
                    "score": float(sims[i]),
                    "text": self.texts[i],
                }
            )
        return results

retriever = TfidfRetriever(DOCS)

We implement a custom TF-IDF Retriever class that transforms our documentation into a searchable vector space using bigram-aware TF-IDF vectorization. This allows us to perform cosine similarity searches against the knowledge base, ensuring we can programmatically fetch the top-k most relevant text chunks for any given query. Check out the FULL CODES here.

def extractive_baseline_answer(query, retrieved_contexts):
    """
    Offline fallback: we create a short answer by extracting the most relevant sentences.
    This keeps the notebook runnable even without OpenAI.
    """
    joined = "\n".join(retrieved_contexts)
    sents = re.split(r"(?<=[.!?])\s+", joined)
    keywords = [w.lower() for w in re.findall(r"[a-zA-Z]{4,}", query)]
    scored = []
    for s in sents:
        s_l = s.lower()
        score = sum(1 for k in keywords if k in s_l)
        if len(s.strip()) > 20:
            scored.append((score, s.strip()))
    scored.sort(key=lambda x: (-x[0], -len(x[1])))
    best = [s for sc, s in scored[:3] if sc > 0]
    if not best:
        best = [s.strip() for s in sents[:2] if len(s.strip()) > 20]
    ans = " ".join(best).strip()
    if not ans:
        ans = "I could not find enough context to answer confidently."
    return ans

def openai_answer(query, retrieved_contexts, model="gpt-4.1-mini"):
    """
    Simple RAG prompt for demonstration. DeepEval metrics can still evaluate even if
    your generation prompt differs; the key is we store retrieval_context separately.
    """
    from openai import OpenAI
    client = OpenAI()

    context_block = "\n\n".join([f"[CTX {i+1}]\n{c}" for i, c in enumerate(retrieved_contexts)])
    prompt = f"""You are a concise technical assistant.
Use ONLY the provided context to answer the query. If the answer is not in context, say you don't know.

Query:
{query}

Context:
{context_block}

Answer:"""
    resp = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
    )
    return resp.choices[0].message.content.strip()

def rag_answer(query, retrieved_contexts):
    if openai_enabled:
        try:
            return openai_answer(query, retrieved_contexts)
        except Exception as e:
            print(f" OpenAI generation failed, falling back to extractive baseline. Error: {e}")
            return extractive_baseline_answer(query, retrieved_contexts)
    else:
        return extractive_baseline_answer(query, retrieved_contexts)

We implement a hybrid answering mechanism that prioritizes high-fidelity generation via OpenAI while maintaining a keyword-based extractive baseline as a reliable fallback. By isolating the retrieval context from the final generation, we ensure our DeepEval test cases remain consistent regardless of whether the answer is synthesized by an LLM or extracted programmatically. Check out the FULL CODES here.

print("\n Running RAG to create test cases...")

test_cases = []
K = 4

for item in tqdm(EVAL_QUERIES):
    q = item["query"]
    expected = item["expected"]

    retrieved = retriever.retrieve(q, k=K)
    retrieval_context = [r["text"] for r in retrieved]

    actual = rag_answer(q, retrieval_context)

    tc = LLMTestCase(
        input=q,
        actual_output=actual,
        expected_output=expected,
        retrieval_context=retrieval_context,
    )
    test_cases.append(tc)

print(f" Built {len(test_cases)} LLMTestCase objects.")

print("\n Metrics configured.")

metrics = [
    AnswerRelevancyMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    FaithfulnessMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    ContextualRelevancyMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    ContextualPrecisionMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    ContextualRecallMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),

    GEval(
        name="RAG Correctness Rubric (GEval)",
        criteria=(
            "Score the answer for correctness and usefulness. "
            "The answer must directly address the query, must not invent facts not supported by context, "
            "and should be concise but complete."
        ),
        evaluation_params=[
            LLMTestCaseParams.INPUT,
            LLMTestCaseParams.ACTUAL_OUTPUT,
            LLMTestCaseParams.EXPECTED_OUTPUT,
            LLMTestCaseParams.RETRIEVAL_CONTEXT,
        ],
        model="gpt-4.1",
        threshold=0.5,
        async_mode=True,
    ),
]

if not openai_enabled:
    print("\n You did NOT provide an OpenAI API key.")
    print("DeepEval's LLM-as-a-judge metrics (AnswerRelevancy/Faithfulness/Contextual* and GEval) require an LLM judge.")
    print("Re-run this cell and provide OPENAI_API_KEY to run DeepEval metrics.")
    print("\n However, your RAG pipeline + test case construction succeeded end-to-end.")
    rows = []
    for i, tc in enumerate(test_cases):
        rows.append({
            "id": i,
            "query": tc.input,
            "actual_output": tc.actual_output[:220] + ("..." if len(tc.actual_output) > 220 else ""),
            "expected_output": tc.expected_output[:220] + ("..." if len(tc.expected_output) > 220 else ""),
            "contexts": len(tc.retrieval_context or []),
        })
    display(pd.DataFrame(rows))
    raise SystemExit("Stopped before evaluation (no OpenAI key).")

We execute the RAG pipeline to generate LLMTestCase objects by pairing our retrieved context with model-generated answers and ground-truth expectations. We then configure a comprehensive suite of DeepEval metrics, including G-Eval and specialized RAG indicators, to evaluate the system’s performance using an LLM-as-a-judge approach. Check out the FULL CODES here.

print("\n Running DeepEval evaluate(...) ...")

results = evaluate(test_cases=test_cases, metrics=metrics)

summary_rows = []
for idx, tc in enumerate(test_cases):
    row = {
        "case_id": idx,
        "query": tc.input,
        "actual_output": tc.actual_output[:200] + ("..." if len(tc.actual_output) > 200 else ""),
    }
    for m in metrics:
        row[m.__class__.__name__ if hasattr(m, "__class__") else str(m)] = None

    summary_rows.append(row)

def try_extract_case_metrics(results_obj):
    extracted = []
    candidates = []
    for attr in ["test_results", "results", "evaluations"]:
        if hasattr(results_obj, attr):
            candidates = getattr(results_obj, attr)
            break
    if not candidates and isinstance(results_obj, list):
        candidates = results_obj

    for case_i, case_result in enumerate(candidates or []):
        item = {"case_id": case_i}
        metrics_list = None
        for attr in ["metrics_data", "metrics", "metric_results"]:
            if hasattr(case_result, attr):
                metrics_list = getattr(case_result, attr)
                break
        if isinstance(metrics_list, dict):
            for k, v in metrics_list.items():
                item[f"{k}_score"] = getattr(v, "score", None) if v is not None else None
                item[f"{k}_reason"] = getattr(v, "reason", None) if v is not None else None
        else:
            for mr in metrics_list or []:
                name = getattr(mr, "name", None) or getattr(getattr(mr, "metric", None), "name", None)
                if not name:
                    name = mr.__class__.__name__
                item[f"{name}_score"] = getattr(mr, "score", None)
                item[f"{name}_reason"] = getattr(mr, "reason", None)
        extracted.append(item)
    return extracted

case_metrics = try_extract_case_metrics(results)

df_base = pd.DataFrame([{
    "case_id": i,
    "query": tc.input,
    "actual_output": tc.actual_output,
    "expected_output": tc.expected_output,
} for i, tc in enumerate(test_cases)])

df_metrics = pd.DataFrame(case_metrics) if case_metrics else pd.DataFrame([])
df = df_base.merge(df_metrics, on="case_id", how="left")

score_cols = [c for c in df.columns if c.endswith("_score")]
compact = df[["case_id", "query"] + score_cols].copy()

print("\n Compact score table:")
display(compact)

print("\n Full details (includes reasons):")
display(df)

print("\n Done. Tip: if contextual precision/recall are low, improve retriever ranking/coverage; if faithfulness is low, tighten generation to only use context.")

We finalize the workflow by executing the evaluate function, which triggers the LLM-as-a-judge process to score each test case against our defined metrics. We then aggregate these scores and their corresponding qualitative reasoning into a centralized DataFrame, providing a granular view of where the RAG pipeline excels or requires further optimization in retrieval and generation.

At last, we conclude by running our comprehensive evaluation suite, in which DeepEval transforms complex linguistic outputs into actionable data using metrics such as Faithfulness, Contextual Precision, and the G-Eval rubric. This systematic approach allows us to diagnose “silent failures” in retrieval and hallucinations in generation with surgical precision, providing the reasoning necessary to justify architectural changes. With these results, we move forward from experimental prototyping to a production-ready RAG system backed by a verifiable, metric-driven safety net.

Check out the FULL CODES here.
The post A Coding Implementation to Automating LLM Quality Assurance with DeepEval, Custom Retrievers, and LLM-as-a-Judge Metrics appeared first on MarkTechPost.

How Machine Learning and Semantic Embeddings Reorder CVE Vulnerabilities Beyond Raw CVSS Scores

In this tutorial, we build an AI-assisted vulnerability scanner that goes beyond static CVSS scoring and instead learns to prioritize vulnerabilities using semantic understanding and machine learning. We treat vulnerability descriptions as rich linguistic artifacts, embed them using modern sentence transformers, and combine these representations with structural metadata to produce a data-driven priority score. Also, we demonstrate how security teams can shift from rule-based triage to adaptive, explainable, ML-driven risk assessment. Check out the FULL CODES here.

print("Installing required packages...")
import subprocess
import sys

packages = [
    'sentence-transformers',
    'scikit-learn',
    'pandas',
    'numpy',
    'matplotlib',
    'seaborn',
    'requests'
]

for package in packages:
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', package])

import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import re
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

from sentence_transformers import SentenceTransformer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, mean_squared_error

import matplotlib.pyplot as plt
import seaborn as sns

print("✓ All packages installed successfully!\n")

We install and load all required NLP, machine learning, and visualization libraries for the end-to-end pipeline. We ensure the runtime is fully self-contained and ready to execute in Colab or similar notebook environments. It establishes a reproducible foundation for the scanner. Check out the FULL CODES here.

class CVEDataFetcher:
    def __init__(self):
        self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def fetch_recent_cves(self, days=30, max_results=100):
        print(f"Fetching CVEs from last {days} days...")

        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        params = {
            'pubStartDate': start_date.strftime('%Y-%m-%dT00:00:00.000'),
            'pubEndDate': end_date.strftime('%Y-%m-%dT23:59:59.999'),
            'resultsPerPage': min(max_results, 2000)
        }

        try:
            response = requests.get(self.base_url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            cves = []
            for item in data.get('vulnerabilities', [])[:max_results]:
                cve = item.get('cve', {})
                cve_id = cve.get('id', 'Unknown')

                descriptions = cve.get('descriptions', [])
                description = next((d['value'] for d in descriptions if d['lang'] == 'en'), 'No description')

                metrics = cve.get('metrics', {})
                cvss_v3 = metrics.get('cvssMetricV31', [{}])[0].get('cvssData', {})
                cvss_v2 = metrics.get('cvssMetricV2', [{}])[0].get('cvssData', {})

                base_score = cvss_v3.get('baseScore') or cvss_v2.get('baseScore') or 0.0
                severity = cvss_v3.get('baseSeverity') or 'UNKNOWN'

                published = cve.get('published', '')
                references = cve.get('references', [])

                cves.append({
                    'cve_id': cve_id,
                    'description': description,
                    'cvss_score': float(base_score),
                    'severity': severity,
                    'published': published,
                    'reference_count': len(references),
                    'attack_vector': cvss_v3.get('attackVector', 'UNKNOWN'),
                    'attack_complexity': cvss_v3.get('attackComplexity', 'UNKNOWN'),
                    'privileges_required': cvss_v3.get('privilegesRequired', 'UNKNOWN'),
                    'user_interaction': cvss_v3.get('userInteraction', 'UNKNOWN')
                })

            print(f"✓ Fetched {len(cves)} CVEs\n")
            return pd.DataFrame(cves)

        except Exception as e:
            print(f"Error fetching CVEs: {e}")
            return self._generate_sample_data(max_results)

    def _generate_sample_data(self, n=50):
        print("Using sample CVE data for demonstration...\n")

        sample_descriptions = [
            "A buffer overflow vulnerability in the network driver allows remote code execution",
            "SQL injection vulnerability in web application login form enables unauthorized access",
            "Cross-site scripting (XSS) vulnerability in user input validation",
            "Authentication bypass in admin panel due to weak session management",
            "Remote code execution via deserialization of untrusted data",
            "Path traversal vulnerability allows reading arbitrary files",
            "Privilege escalation through improper input validation",
            "Denial of service through resource exhaustion in API endpoint",
            "Information disclosure via error messages exposing sensitive data",
            "Memory corruption vulnerability in image processing library",
            "Command injection in file upload functionality",
            "Integer overflow leading to heap buffer overflow",
            "Use-after-free vulnerability in memory management",
            "Race condition in multi-threaded application",
            "Cryptographic weakness in password storage mechanism"
        ]

        severities = ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL']
        attack_vectors = ['NETWORK', 'ADJACENT', 'LOCAL', 'PHYSICAL']
        complexities = ['LOW', 'HIGH']

        data = []
        for i in range(n):
            severity = np.random.choice(severities, p=[0.1, 0.3, 0.4, 0.2])
            score_ranges = {'LOW': (0.1, 3.9), 'MEDIUM': (4.0, 6.9), 'HIGH': (7.0, 8.9), 'CRITICAL': (9.0, 10.0)}

            data.append({
                'cve_id': f'CVE-2024-{10000+i}',
                'description': np.random.choice(sample_descriptions),
                'cvss_score': np.random.uniform(*score_ranges[severity]),
                'severity': severity,
                'published': (datetime.now() - timedelta(days=np.random.randint(1, 30))).isoformat(),
                'reference_count': np.random.randint(1, 10),
                'attack_vector': np.random.choice(attack_vectors),
                'attack_complexity': np.random.choice(complexities),
                'privileges_required': np.random.choice(['NONE', 'LOW', 'HIGH']),
                'user_interaction': np.random.choice(['NONE', 'REQUIRED'])
            })

        return pd.DataFrame(data)

We implement a robust CVE ingestion component that pulls recent vulnerabilities directly from the NVD API. We normalize raw CVE records into structured features while gracefully falling back to synthetic data when API access fails. It allows the tutorial to remain runnable while reflecting real-world challenges in data ingestion. Check out the FULL CODES here.

class VulnerabilityFeatureExtractor:
    def __init__(self):
        print("Loading sentence transformer model...")
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✓ Model loaded\n")

        self.critical_keywords = {
            'execution': ['remote code execution', 'rce', 'execute', 'arbitrary code'],
            'injection': ['sql injection', 'command injection', 'code injection'],
            'authentication': ['bypass', 'authentication', 'authorization'],
            'overflow': ['buffer overflow', 'heap overflow', 'stack overflow'],
            'exposure': ['information disclosure', 'data leak', 'exposure'],
        }

    def extract_semantic_features(self, descriptions):
        print("Generating semantic embeddings...")
        embeddings = self.model.encode(descriptions, show_progress_bar=True)
        return embeddings

    def extract_keyword_features(self, df):
        print("Extracting keyword features...")

        for category, keywords in self.critical_keywords.items():
            df[f'has_{category}'] = df['description'].apply(
                lambda x: any(kw in x.lower() for kw in keywords)
            ).astype(int)

        df['desc_length'] = df['description'].apply(len)
        df['word_count'] = df['description'].apply(lambda x: len(x.split()))

        return df

    def encode_categorical_features(self, df):
        print("Encoding categorical features...")

        categorical_cols = ['attack_vector', 'attack_complexity', 'privileges_required', 'user_interaction']

        for col in categorical_cols:
            dummies = pd.get_dummies(df[col], prefix=col)
            df = pd.concat([df, dummies], axis=1)

        return df

We transform unstructured vulnerability descriptions into dense semantic embeddings using a sentence-transformer model. We also extract keyword-based risk indicators and textual statistics that capture exploit intent and complexity. Together, these features bridge linguistic context with quantitative ML inputs. Check out the FULL CODES here.

class VulnerabilityPrioritizer:
    def __init__(self):
        self.severity_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
        self.score_predictor = GradientBoostingRegressor(n_estimators=100, random_state=42)
        self.scaler = StandardScaler()
        self.feature_cols = None

    def prepare_features(self, df, embeddings):
        numeric_features = ['reference_count', 'desc_length', 'word_count']
        keyword_features = [col for col in df.columns if col.startswith('has_')]
        categorical_features = [col for col in df.columns if any(col.startswith(prefix) for prefix in ['attack_vector_', 'attack_complexity_', 'privileges_required_', 'user_interaction_'])]
        self.feature_cols = numeric_features + keyword_features + categorical_features
        X_structured = df[self.feature_cols].values
        X_embeddings = embeddings
        X_combined = np.hstack([X_structured, X_embeddings])
        return X_combined

    def train_models(self, X, y_severity, y_score):
        print("\nTraining ML models...")
        X_scaled = self.scaler.fit_transform(X)
        X_train, X_test, y_sev_train, y_sev_test, y_score_train, y_score_test = train_test_split(
            X_scaled, y_severity, y_score, test_size=0.2, random_state=42
        )
        self.severity_classifier.fit(X_train, y_sev_train)
        sev_pred = self.severity_classifier.predict(X_test)
        self.score_predictor.fit(X_train, y_score_train)
        score_pred = self.score_predictor.predict(X_test)
        print("\n--- Severity Classification Report ---")
        print(classification_report(y_sev_test, sev_pred))
        print(f"\n--- CVSS Score Prediction ---")
        print(f"RMSE: {np.sqrt(mean_squared_error(y_score_test, score_pred)):.2f}")
        return X_scaled

    def predict_priority(self, X):
        X_scaled = self.scaler.transform(X)
        severity_pred = self.severity_classifier.predict_proba(X_scaled)
        score_pred = self.score_predictor.predict(X_scaled)
        severity_weight = severity_pred[:, -1] * 0.4
        score_weight = (score_pred / 10.0) * 0.6
        priority_score = severity_weight + score_weight
        return priority_score, severity_pred, score_pred

    def get_feature_importance(self):
        importance = self.score_predictor.feature_importances_
        n_structured = len(self.feature_cols)
        structured_importance = importance[:n_structured]
        embedding_importance = importance[n_structured:]
        feature_imp_df = pd.DataFrame({
            'feature': self.feature_cols,
            'importance': structured_importance
        }).sort_values('importance', ascending=False)
        return feature_imp_df, embedding_importance.mean()

We train supervised models to predict both vulnerability severity classes and CVSS-like scores from learned features. We combine structured metadata with embeddings to create a hybrid feature space and derive a composite priority score. This is where the scanner learns how to rank vulnerabilities beyond static heuristics. Check out the FULL CODES here.

class VulnerabilityAnalyzer:
    def __init__(self, n_clusters=5):
        self.n_clusters = n_clusters
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42)

    def cluster_vulnerabilities(self, embeddings):
        print(f"\nClustering vulnerabilities into {self.n_clusters} groups...")
        clusters = self.kmeans.fit_predict(embeddings)
        return clusters

    def analyze_clusters(self, df, clusters):
        df['cluster'] = clusters
        print("\n--- Cluster Analysis ---")
        for i in range(self.n_clusters):
            cluster_df = df[df['cluster'] == i]
            print(f"\nCluster {i} ({len(cluster_df)} vulnerabilities):")
            print(f"  Avg CVSS Score: {cluster_df['cvss_score'].mean():.2f}")
            print(f"  Severity Distribution: {cluster_df['severity'].value_counts().to_dict()}")
            print(f"  Top keywords: ", end="")
            all_words = ' '.join(cluster_df['description'].values).lower()
            words = re.findall(r'\b[a-z]{4,}\b', all_words)
            common = Counter(words).most_common(5)
            print(', '.join([w for w, _ in common]))
        return df

We cluster vulnerabilities based on embedding similarity to uncover recurring exploit patterns. We analyze each cluster to understand dominant attack themes, severity distributions, and common exploit terminology. It helps surface systemic risks rather than isolated issues. Check out the FULL CODES here.

def visualize_results(df, priority_scores, feature_importance):
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    fig.suptitle('Vulnerability Scanner - ML Analysis Dashboard', fontsize=16, fontweight='bold')
    axes[0, 0].hist(priority_scores, bins=30, color='crimson', alpha=0.7, edgecolor='black')
    axes[0, 0].set_xlabel('Priority Score')
    axes[0, 0].set_ylabel('Frequency')
    axes[0, 0].set_title('Priority Score Distribution')
    axes[0, 0].axvline(np.percentile(priority_scores, 75), color='orange', linestyle='--', label='75th percentile')
    axes[0, 0].legend()
    axes[0, 1].scatter(df['cvss_score'], priority_scores, alpha=0.6, c=priority_scores, cmap='RdYlGn_r', s=50)
    axes[0, 1].set_xlabel('CVSS Score')
    axes[0, 1].set_ylabel('ML Priority Score')
    axes[0, 1].set_title('CVSS vs ML Priority')
    axes[0, 1].plot([0, 10], [0, 1], 'k--', alpha=0.3)
    severity_counts = df['severity'].value_counts()
    colors = {'CRITICAL': 'darkred', 'HIGH': 'red', 'MEDIUM': 'orange', 'LOW': 'yellow'}
    axes[0, 2].bar(severity_counts.index, severity_counts.values, color=[colors.get(s, 'gray') for s in severity_counts.index])
    axes[0, 2].set_xlabel('Severity')
    axes[0, 2].set_ylabel('Count')
    axes[0, 2].set_title('Severity Distribution')
    axes[0, 2].tick_params(axis='x', rotation=45)
    top_features = feature_importance.head(10)
    axes[1, 0].barh(top_features['feature'], top_features['importance'], color='steelblue')
    axes[1, 0].set_xlabel('Importance')
    axes[1, 0].set_title('Top 10 Feature Importance')
    axes[1, 0].invert_yaxis()
    if 'cluster' in df.columns:
        cluster_counts = df['cluster'].value_counts().sort_index()
        axes[1, 1].bar(cluster_counts.index, cluster_counts.values, color='teal', alpha=0.7)
        axes[1, 1].set_xlabel('Cluster')
        axes[1, 1].set_ylabel('Count')
        axes[1, 1].set_title('Vulnerability Clusters')
    attack_vector_counts = df['attack_vector'].value_counts()
    axes[1, 2].pie(attack_vector_counts.values, labels=attack_vector_counts.index, autopct='%1.1f%%', startangle=90)
    axes[1, 2].set_title('Attack Vector Distribution')
    plt.tight_layout()
    plt.show()

def main():
    print("=" * 70)
    print("AI-ASSISTED VULNERABILITY SCANNER WITH ML PRIORITIZATION")
    print("=" * 70)
    print()
    fetcher = CVEDataFetcher()
    df = fetcher.fetch_recent_cves(days=30, max_results=50)
    print(f"Dataset Overview:")
    print(f"  Total CVEs: {len(df)}")
    print(f"  Date Range: {df['published'].min()[:10]} to {df['published'].max()[:10]}")
    print(f"  Severity Breakdown: {df['severity'].value_counts().to_dict()}")
    print()
    feature_extractor = VulnerabilityFeatureExtractor()
    embeddings = feature_extractor.extract_semantic_features(df['description'].tolist())
    df = feature_extractor.extract_keyword_features(df)
    df = feature_extractor.encode_categorical_features(df)
    prioritizer = VulnerabilityPrioritizer()
    X = prioritizer.prepare_features(df, embeddings)
    severity_map = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2, 'CRITICAL': 3, 'UNKNOWN': 1}
    y_severity = df['severity'].map(severity_map).values
    y_score = df['cvss_score'].values
    X_scaled = prioritizer.train_models(X, y_severity, y_score)
    priority_scores, severity_probs, score_preds = prioritizer.predict_priority(X)
    df['ml_priority_score'] = priority_scores
    df['predicted_score'] = score_preds
    analyzer = VulnerabilityAnalyzer(n_clusters=5)
    clusters = analyzer.cluster_vulnerabilities(embeddings)
    df = analyzer.analyze_clusters(df, clusters)
    feature_imp, emb_imp = prioritizer.get_feature_importance()
    print(f"\n--- Feature Importance ---")
    print(feature_imp.head(10))
    print(f"\nAverage embedding importance: {emb_imp:.4f}")
    print("\n" + "=" * 70)
    print("TOP 10 PRIORITY VULNERABILITIES")
    print("=" * 70)
    top_vulns = df.nlargest(10, 'ml_priority_score')[['cve_id', 'cvss_score', 'ml_priority_score', 'severity', 'description']]
    for idx, row in top_vulns.iterrows():
        print(f"\n{row['cve_id']} [Priority: {row['ml_priority_score']:.3f}]")
        print(f"  CVSS: {row['cvss_score']:.1f} | Severity: {row['severity']}")
        print(f"  {row['description'][:100]}...")
    print("\n\nGenerating visualizations...")
    visualize_results(df, priority_scores, feature_imp)
    print("\n" + "=" * 70)
    print("ANALYSIS COMPLETE")
    print("=" * 70)
    print(f"\nResults summary:")
    print(f"  High Priority (>0.7): {(priority_scores > 0.7).sum()} vulnerabilities")
    print(f"  Medium Priority (0.4-0.7): {((priority_scores >= 0.4) & (priority_scores <= 0.7)).sum()}")
    print(f"  Low Priority (<0.4): {(priority_scores < 0.4).sum()}")
    return df, prioritizer, analyzer

if __name__ == "__main__":
    results_df, prioritizer, analyzer = main()
    print("\n✓ All analyses completed successfully!")
    print("\nYou can now:")
    print("  - Access results via 'results_df' DataFrame")
    print("  - Use 'prioritizer' to predict new vulnerabilities")
    print("  - Explore 'analyzer' for clustering insights")

We generate an interactive analysis dashboard that visualizes priority distributions, feature importance, clusters, and attack vectors. We execute the complete pipeline, rank the highest-priority vulnerabilities, and summarize actionable insights. It turns raw model outputs into decision-ready intelligence.

In conclusion, we showed how vulnerability management can evolve from static scoring to intelligent prioritization using machine learning and semantic analysis. By combining embeddings, metadata, clustering, and explainability, we created a system that better reflects real-world exploit risk and operational urgency. It lays the groundwork for adaptive security pipelines where prioritization improves continuously as new vulnerability data emerges.

Check out the FULL CODES here.
The post How Machine Learning and Semantic Embeddings Reorder CVE Vulnerabilities Beyond Raw CVSS Scores appeared first on MarkTechPost.

GitHub Releases Copilot-SDK to Embed Its Agentic Runtime in Any App

GitHub has opened up the internal agent runtime that powers GitHub Copilot CLI and exposed it as a programmable SDK. The GitHub Copilot-SDK, now in technical preview, lets you embed the same agentic execution loop into any application so the agent can plan, invoke tools, edit files, and run commands as part of your own workflows.

What the GitHub Copilot SDK provides

The GitHub Copilot-SDK is a multi platform SDK for integrating the GitHub Copilot Agent into applications and services. It gives programmatic access to the execution loop that already powers GitHub Copilot CLI. Instead of building your own planner and tool loop for each project, you attach your logic to this existing runtime and treat it as an execution platform.

The GitHub Copilot-SDK exposes the same production tested runtime used by Copilot CLI, with support for multi model operation, multi step planning, tools, Model Context Protocol (MCP) integration, authentication, and streaming. This gives you the same agent behavior that Copilot uses in the terminal, but callable from your own code.

Agentic execution loop as a runtime primitive

The core abstraction is the agentic execution loop. In Copilot CLI and in the SDK, interactions are not isolated prompts. The agent maintains state across turns, chooses plans, calls tools, executes commands, reads results, and repeats these steps until it reaches the goal that you provided.

The GitHub team describes the usual problems when you implement this loop yourself. You need to manage context across multiple turns, orchestrate external tools and commands, route calls across models, integrate MCP servers, and think through permissions. With the SDK handling that loop, you as a developer concentrate on defining domain specific tools, describing tasks, and constraining what the agent can do.

Supported languages and core API

The Copilot-SDK is available in 4 languages in this technical preview:

Node.js and TypeScript, through the package @github/copilot-cli-sdk

Python, through the package copilot

Go, through the module github.com/github/copilot-cli-sdk-go

.NET, through the package GitHub.Copilot.SDK

All SDKs expose a consistent API surface. According to the changelog, every language binding supports multi-turn conversations with session history, custom tool execution, and programmatic control over client and session life cycles.

Tools, MCP servers, and integration with existing systems

A main feature of the Copilot agent is tool execution. Through the SDK you can register custom tools that the model can call during a conversation. The Copilot-CLI already exposes custom tool definitions and full MCP server integration, and the SDK reuses that capability.

MCP gives a standard protocol for agents to connect to external systems such as internal APIs, document stores, or operations tools. When you integrate an MCP server, the Copilot agent can discover and call its operations in a structured way with consistent metadata rather than ad hoc prompt engineering.

The pattern is straightforward. You define a tool with a clear schema and effect, you expose it through the SDK, and the Copilot planner decides when and how to call it as part of the multi step plan.
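
To make the shape of that pattern concrete, here is a minimal Python sketch of a domain specific tool definition. The schema-plus-handler structure is the point; the commented-out register_tool call at the end is a hypothetical placeholder rather than the published copilot package API, so check the SDK reference for the real registration method.

import json

def lookup_order_status(order_id: str) -> str:
    """Domain-specific tool handler: would normally call an internal API (stubbed here)."""
    return json.dumps({"order_id": order_id, "status": "shipped", "eta_days": 2})

# A tool is described to the planner with a clear name, description, and JSON schema,
# so the agent knows when it applies and how to call it.
order_status_tool = {
    "name": "lookup_order_status",
    "description": "Look up the fulfillment status of a customer order by its id.",
    "parameters": {
        "type": "object",
        "properties": {"order_id": {"type": "string", "description": "Internal order id"}},
        "required": ["order_id"],
    },
}

# Hypothetical registration call -- the real SDK method name may differ:
# session.register_tool(order_status_tool, handler=lookup_order_status)

if __name__ == "__main__":
    # Local smoke test of the handler itself.
    print(lookup_order_status("ORD-1234"))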

Authentication, subscriptions, and streaming

The SDK integrates with GitHub authentication and Copilot subscriptions. You can either use an existing GitHub Copilot subscription or bring your own key when configuring the SDK. This is important when you embed the agent in enterprise environments where identity and access control are already standardized around GitHub.

Streaming is part of the contract. Copilot-CLI already supports real time streaming in the terminal, and the SDK exposes streaming so that applications can receive responses incrementally. This allows you to build user interfaces that update progressively as the agent reasons and executes, without waiting for a full completion.

Relationship to GitHub Copilot-CLI

The SDK is not a separate agent implementation. It is a layer on top of the existing Copilot CLI execution loop, and a way to reuse the planning, tool use, and multi turn execution behavior of the CLI in any environment.

Copilot-CLI itself continues to evolve. Recent updates add persistent memory, infinite sessions, and context compaction, support for explore and plan workflows with model selection per step, custom agents and agent skills, full MCP support, and asynchronous task delegation. The SDK benefits from this work, because it exposes that same behavior through language specific libraries.

Key Takeaways

GitHub Copilot-SDK exposes the same agentic execution loop that powers GitHub Copilot CLI, so applications can call a production tested planner that runs multi step workflows with tools and commands.

The SDK is available for Node.js, Python, Go, and .NET, and each language binding provides a similar abstraction around clients and sessions that manage multi turn conversations and tool use.

Developers define domain specific tools and Model Context Protocol servers, then register them through the SDK, and the Copilot agent decides when and how to call them as part of the plan.

The runtime integrates with GitHub authentication and Copilot subscriptions, supports multiple AI models such as GPT based backends, and exposes real time streaming so applications can render partial responses incrementally.

Check out the GitHub Page.
The post GitHub Releases Copilot-SDK to Embed Its Agentic Runtime in Any App appeared first on MarkTechPost.

How an AI Agent Chooses What to Do Under Tokens, Latency, and Tool-Cal …

In this tutorial, we build a cost-aware planning agent that deliberately balances output quality against real-world constraints such as token usage, latency, and tool-call budgets. We design the agent to generate multiple candidate actions, estimate their expected costs and benefits, and then select an execution plan that maximizes value while staying within strict budgets. With this, we demonstrate how agentic systems can move beyond “always use the LLM” behavior and instead reason explicitly about trade-offs, efficiency, and resource awareness, which is critical for deploying agents reliably in constrained environments. Check out the FULL CODES here.

import os, time, math, json, random
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Any
from getpass import getpass

USE_OPENAI = True

if USE_OPENAI:
    if not os.getenv("OPENAI_API_KEY"):
        os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (hidden): ").strip()
    try:
        from openai import OpenAI
        client = OpenAI()
    except Exception as e:
        print("OpenAI SDK import failed. Falling back to offline mode.\nError:", e)
        USE_OPENAI = False

We set up the execution environment and securely load the OpenAI API key at runtime without hardcoding it. We also initialize the client so the agent gracefully falls back to offline mode if the API is unavailable. Check out the FULL CODES here.

def approx_tokens(text: str) -> int:
    return max(1, math.ceil(len(text) / 4))

@dataclass
class Budget:
    max_tokens: int
    max_latency_ms: int
    max_tool_calls: int

@dataclass
class Spend:
    tokens: int = 0
    latency_ms: int = 0
    tool_calls: int = 0

    def within(self, b: Budget) -> bool:
        return (self.tokens <= b.max_tokens and
                self.latency_ms <= b.max_latency_ms and
                self.tool_calls <= b.max_tool_calls)

    def add(self, other: "Spend") -> "Spend":
        return Spend(
            tokens=self.tokens + other.tokens,
            latency_ms=self.latency_ms + other.latency_ms,
            tool_calls=self.tool_calls + other.tool_calls
        )

We define the core budgeting abstractions that enable the agent to reason explicitly about costs. We model token usage, latency, and tool calls as first-class quantities and provide utility methods to accumulate and validate spend. It gives us a clean foundation for enforcing constraints throughout planning and execution. Check out the FULL CODES here.

@dataclass
class StepOption:
    name: str
    description: str
    est_spend: Spend
    est_value: float
    executor: str
    payload: Dict[str, Any] = field(default_factory=dict)

@dataclass
class PlanCandidate:
    steps: List[StepOption]
    spend: Spend
    value: float
    rationale: str = ""

def llm_text(prompt: str, *, model: str = "gpt-5", effort: str = "low") -> str:
    if not USE_OPENAI:
        return ""
    t0 = time.time()
    resp = client.responses.create(
        model=model,
        reasoning={"effort": effort},
        input=prompt,
    )
    _ = (time.time() - t0)
    return resp.output_text or ""

We introduce the data structures that represent individual action choices and full plan candidates. We also define a lightweight LLM wrapper that standardizes how text is generated and measured. This separation allows the planner to reason about actions abstractly without being tightly coupled to execution details. Check out the FULL CODES here.

def generate_step_options(task: str) -> List[StepOption]:
    base = [
        StepOption(
            name="Clarify deliverables (local)",
            description="Extract deliverable checklist + acceptance criteria from the task.",
            est_spend=Spend(tokens=60, latency_ms=20, tool_calls=0),
            est_value=6.0,
            executor="local",
        ),
        StepOption(
            name="Outline plan (LLM)",
            description="Create a structured outline with sections, constraints, and assumptions.",
            est_spend=Spend(tokens=600, latency_ms=1200, tool_calls=1),
            est_value=10.0,
            executor="llm",
            payload={"prompt_kind": "outline"}
        ),
        StepOption(
            name="Outline plan (local)",
            description="Create a rough outline using templates (no LLM).",
            est_spend=Spend(tokens=120, latency_ms=40, tool_calls=0),
            est_value=5.5,
            executor="local",
        ),
        StepOption(
            name="Risk register (LLM)",
            description="Generate risks, mitigations, owners, and severity.",
            est_spend=Spend(tokens=700, latency_ms=1400, tool_calls=1),
            est_value=9.0,
            executor="llm",
            payload={"prompt_kind": "risks"}
        ),
        StepOption(
            name="Risk register (local)",
            description="Generate a standard risk register from a reusable template.",
            est_spend=Spend(tokens=160, latency_ms=60, tool_calls=0),
            est_value=5.0,
            executor="local",
        ),
        StepOption(
            name="Timeline (LLM)",
            description="Draft a realistic milestone timeline with dependencies.",
            est_spend=Spend(tokens=650, latency_ms=1300, tool_calls=1),
            est_value=8.5,
            executor="llm",
            payload={"prompt_kind": "timeline"}
        ),
        StepOption(
            name="Timeline (local)",
            description="Draft a simple timeline from a generic milestone template.",
            est_spend=Spend(tokens=150, latency_ms=60, tool_calls=0),
            est_value=4.8,
            executor="local",
        ),
        StepOption(
            name="Quality pass (LLM)",
            description="Rewrite for clarity, consistency, and formatting.",
            est_spend=Spend(tokens=900, latency_ms=1600, tool_calls=1),
            est_value=8.0,
            executor="llm",
            payload={"prompt_kind": "polish"}
        ),
        StepOption(
            name="Quality pass (local)",
            description="Light formatting + consistency checks without LLM.",
            est_spend=Spend(tokens=120, latency_ms=50, tool_calls=0),
            est_value=3.5,
            executor="local",
        ),
    ]

    if USE_OPENAI:
        meta_prompt = f"""
You are a planning assistant. For the task below, propose 3-5 OPTIONAL extra steps that improve quality,
like checks, validations, or stakeholder tailoring. Keep each step short.

TASK:
{task}

Return JSON list with fields: name, description, est_value(1-10).
"""
        txt = llm_text(meta_prompt, model="gpt-5", effort="low")
        try:
            items = json.loads(txt.strip())
            for it in items[:5]:
                base.append(
                    StepOption(
                        name=str(it.get("name", "Extra step (local)"))[:60],
                        description=str(it.get("description", ""))[:200],
                        est_spend=Spend(tokens=120, latency_ms=60, tool_calls=0),
                        est_value=float(it.get("est_value", 5.0)),
                        executor="local",
                    )
                )
        except Exception:
            pass

    return base

We focus on generating a diverse set of candidate steps, including both LLM-based and local alternatives with different cost–quality trade-offs. We optionally use the model itself to suggest additional low-cost improvements while still controlling their impact on the budget. By doing so, we enrich the action space without losing efficiency. Check out the FULL CODES here.

def plan_under_budget(
    options: List[StepOption],
    budget: Budget,
    *,
    max_steps: int = 6,
    beam_width: int = 12,
    diversity_penalty: float = 0.2
) -> PlanCandidate:
    def redundancy_cost(chosen: List[StepOption], new: StepOption) -> float:
        key_new = new.name.split("(")[0].strip().lower()
        overlap = 0
        for s in chosen:
            key_s = s.name.split("(")[0].strip().lower()
            if key_s == key_new:
                overlap += 1
        return overlap * diversity_penalty

    beams: List[PlanCandidate] = [PlanCandidate(steps=[], spend=Spend(), value=0.0, rationale="")]

    for _ in range(max_steps):
        expanded: List[PlanCandidate] = []
        for cand in beams:
            for opt in options:
                if opt in cand.steps:
                    continue
                new_spend = cand.spend.add(opt.est_spend)
                if not new_spend.within(budget):
                    continue
                new_value = cand.value + opt.est_value - redundancy_cost(cand.steps, opt)
                expanded.append(
                    PlanCandidate(
                        steps=cand.steps + [opt],
                        spend=new_spend,
                        value=new_value,
                        rationale=cand.rationale
                    )
                )
        if not expanded:
            break
        expanded.sort(key=lambda c: c.value, reverse=True)
        beams = expanded[:beam_width]

    best = max(beams, key=lambda c: c.value)
    return best

We implement the budget-constrained planning logic that searches for the highest-value combination of steps under strict limits. We apply a beam-style search with redundancy penalties to avoid wasteful action overlap. This is where the agent truly becomes cost-aware by optimizing value subject to constraints. Check out the FULL CODES here.

def run_local_step(task: str, step: StepOption, working: Dict[str, Any]) -> str:
    name = step.name.lower()
    if "clarify deliverables" in name:
        return (
            "Deliverables checklist:\n"
            "- Executive summary\n- Scope & assumptions\n- Workplan + milestones\n"
            "- Risk register (risk, impact, likelihood, mitigation, owner)\n"
            "- Next steps + data needed\n"
        )
    if "outline plan" in name:
        return (
            "Outline:\n1) Context & objective\n2) Scope\n3) Approach\n4) Timeline\n5) Risks\n6) Next steps\n"
        )
    if "risk register" in name:
        return (
            "Risk register (template):\n"
            "1) Data access delays | High | Mitigation: agree data list + owners\n"
            "2) Stakeholder alignment | Med | Mitigation: weekly review\n"
            "3) Tooling constraints | Med | Mitigation: phased rollout\n"
        )
    if "timeline" in name:
        return (
            "Timeline (template):\n"
            "Week 1: discovery + requirements\nWeek 2: prototype + feedback\n"
            "Week 3: pilot + metrics\nWeek 4: rollout + handover\n"
        )
    if "quality pass" in name:
        draft = working.get("draft", "")
        return "Light quality pass done (headings normalized, bullets aligned).\n" + draft
    return f"Completed: {step.name}\n"

def run_llm_step(task: str, step: StepOption, working: Dict[str, Any]) -> str:
    kind = step.payload.get("prompt_kind", "generic")
    context = working.get("draft", "")
    prompts = {
        "outline": f"Create a crisp, structured outline for the task below.\nTASK:\n{task}\nReturn a numbered outline.",
        "risks": f"Create a risk register for the task below. Include: Risk | Impact | Likelihood | Mitigation | Owner.\nTASK:\n{task}",
        "timeline": f"Create a realistic milestone timeline with dependencies for the task below.\nTASK:\n{task}",
        "polish": f"Rewrite and polish the following draft for clarity and consistency.\nDRAFT:\n{context}",
        "generic": f"Help with this step: {step.description}\nTASK:\n{task}\nCURRENT:\n{context}",
    }
    return llm_text(prompts.get(kind, prompts["generic"]), model="gpt-5", effort="low")

def execute_plan(task: str, plan: PlanCandidate) -> Tuple[str, Spend]:
    working = {"draft": ""}
    actual = Spend()

    for i, step in enumerate(plan.steps, 1):
        t0 = time.time()
        if step.executor == "llm" and USE_OPENAI:
            out = run_llm_step(task, step, working)
            tool_calls = 1
        else:
            out = run_local_step(task, step, working)
            tool_calls = 0

        dt_ms = int((time.time() - t0) * 1000)
        tok = approx_tokens(out)

        actual = actual.add(Spend(tokens=tok, latency_ms=dt_ms, tool_calls=tool_calls))
        working["draft"] += f"\n\n### Step {i}: {step.name}\n{out}\n"

    return working["draft"].strip(), actual

TASK = "Draft a 1-page project proposal for a logistics dashboard + fleet optimization pilot, including scope, timeline, and risks."
BUDGET = Budget(
    max_tokens=2200,
    max_latency_ms=3500,
    max_tool_calls=2
)

options = generate_step_options(TASK)
best_plan = plan_under_budget(options, BUDGET, max_steps=6, beam_width=14)

print("=== SELECTED PLAN (budget-aware) ===")
for s in best_plan.steps:
    print(f"- {s.name} | est_spend={s.est_spend} | est_value={s.est_value}")
print("\nEstimated spend:", best_plan.spend)
print("Budget:", BUDGET)

print("\n=== EXECUTING PLAN ===")
draft, actual = execute_plan(TASK, best_plan)

print("\n=== OUTPUT DRAFT ===\n")
print(draft[:6000])

print("\n=== ACTUAL SPEND (approx) ===")
print(actual)
print("\nWithin budget?", actual.within(BUDGET))

We execute the selected plan and track actual resource usage step by step. We dynamically choose between local and LLM execution paths and aggregate the final output into a coherent draft. By comparing estimated and actual spend, we demonstrate how planning assumptions can be validated and refined in practice.

In conclusion, we demonstrated how a cost-aware planning agent can reason about its resource consumption and adapt its behavior in real time. We executed only the steps that fit within predefined budgets and tracked actual spend to validate the planning assumptions, closing the loop between estimation and execution. Also, we highlighted how agentic AI systems can become more practical, controllable, and scalable by treating cost, latency, and tool usage as first-class decision variables rather than afterthoughts.

Check out the FULL CODES here.
The post How an AI Agent Chooses What to Do Under Tokens, Latency, and Tool-Call Budget Constraints? appeared first on MarkTechPost.

Qwen Researchers Release Qwen3-TTS: an Open Multilingual TTS Suite wit …

Alibaba Cloud’s Qwen team has open-sourced Qwen3-TTS, a family of multilingual text-to-speech models that target three core tasks in one stack: voice cloning, voice design, and high quality speech generation.

https://arxiv.org/pdf/2601.15621v1

Model family and capabilities

Qwen3-TTS uses a 12Hz speech tokenizer and 2 language model sizes, 0.6B and 1.7B, packaged into 3 main tasks. The open release exposes 5 models, Qwen3-TTS-12Hz-0.6B-Base and Qwen3-TTS-12Hz-1.7B-Base for voice cloning and generic TTS, Qwen3-TTS-12Hz-0.6B-CustomVoice and Qwen3-TTS-12Hz-1.7B-CustomVoice for promptable preset speakers, and Qwen3-TTS-12Hz-1.7B-VoiceDesign for free form voice creation from natural language descriptions, along with the Qwen3-TTS-Tokenizer-12Hz codec.

All models support 10 languages, Chinese, English, Japanese, Korean, German, French, Russian, Portuguese, Spanish, and Italian. CustomVoice variants ship with 9 curated timbres, such as Vivian, a bright young Chinese female voice, Ryan, a dynamic English male voice, and Ono_Anna, a playful Japanese female voice, each with a short description that encodes timbre and speaking style.

The VoiceDesign model maps text instructions directly to new voices, for example ‘speak in a nervous teenage male voice with rising intonation’ and can then be combined with the Base model by first generating a short reference clip and reusing it via create_voice_clone_prompt.


Architecture, tokenizer, and streaming path

Qwen3-TTS is a dual track language model, one track predicts discrete acoustic tokens from text, the other handles alignment and control signals. The system is trained on more than 5 million hours of multilingual speech in 3 pre training stages that move from general mapping, to high quality data, to long context support up to 32,768 tokens.

A key component is the Qwen3-TTS-Tokenizer-12Hz codec. It operates at 12.5 frames per second, about 80 ms per token, and uses 16 quantizers with a 2048 entry codebook. On LibriSpeech test clean it reaches PESQ wideband 3.21, STOI 0.96, and UTMOS 4.16, outperforming SpeechTokenizer, XCodec, Mimi, FireredTTS 2 and other recent semantic tokenizers, while using a similar or lower frame rate.

The tokenizer is implemented as a pure left context streaming decoder, so it can emit waveforms as soon as enough tokens are available. With 4 tokens per packet, each streaming packet carries 320 ms of audio. The non-DiT, BigVGAN-free decoder design reduces decode cost and simplifies batching.
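
These timing figures follow directly from the frame rate, as the small sanity check below shows (pure arithmetic, no model required).

# Back-of-the-envelope check of the Qwen3-TTS-Tokenizer-12Hz timing figures.
FRAMES_PER_SECOND = 12.5          # codec frame rate reported for the 12Hz tokenizer
TOKENS_PER_PACKET = 4             # streaming packet size used in the release

ms_per_token = 1000 / FRAMES_PER_SECOND
ms_per_packet = ms_per_token * TOKENS_PER_PACKET
print(f"{ms_per_token:.0f} ms of audio per token")      # 80 ms
print(f"{ms_per_packet:.0f} ms of audio per packet")    # 320 ms

def codec_frames_for(seconds: float) -> int:
    """Approximate number of acoustic frames (per quantizer) for a clip of this length."""
    return round(seconds * FRAMES_PER_SECOND)

print(codec_frames_for(10.0), "frames for a 10 s clip")  # 125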

On the language model side, the research team reports end to end streaming measurements on a single vLLM backend with torch.compile and CUDA Graph optimizations. For Qwen3-TTS-12Hz-0.6B-Base and Qwen3-TTS-12Hz-1.7B-Base at concurrency 1, the first packet latency is around 97 ms and 101 ms, with real time factors of 0.288 and 0.313 respectively. Even at concurrency 6, first packet latency stays around 299 ms and 333 ms.


Alignment and control

Post training uses a staged alignment pipeline. First, Direct Preference Optimization aligns generated speech with human preferences on multilingual data. Then GSPO with rule based rewards improves stability and prosody. A final speaker fine tuning stage on the Base model yields target speaker variants while preserving the core capabilities of the general model.

Instruction following is implemented in a ChatML style format, where text instructions about style, emotion or tempo are prepended to the input. This same interface powers VoiceDesign, CustomVoice style prompts, and fine grained edits for cloned speakers.

Benchmarks, zero shot cloning, and multilingual speech

On the Seed-TTS test set, Qwen3-TTS is evaluated as a zero-shot voice cloning system. The Qwen3-TTS-12Hz-1.7B-Base model reaches a Word Error Rate of 0.77 on test-zh and 1.24 on test-en. The research team highlights the 1.24 WER on test-en as state of the art among the compared systems, while the Chinese WER is close to, but not lower than, the best CosyVoice 3 score.


On a multilingual TTS test set covering 10 languages, Qwen3-TTS achieves the lowest WER in 6 languages, Chinese, English, Italian, French, Korean, and Russian, and competitive performance on the remaining 4 languages, while also obtaining the highest speaker similarity in all 10 languages compared to MiniMax-Speech and ElevenLabs Multilingual v2.

Cross-lingual evaluations show that Qwen3-TTS-12Hz-1.7B-Base reduces mixed error rate for several language pairs, such as zh-to-ko, where the error drops from 14.4 for CosyVoice3 to 4.82, about a 66 percent relative reduction.
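
That relative reduction follows directly from the two reported mixed error rates:

# Relative error reduction for the zh-to-ko pair reported above.
cosyvoice3_mer = 14.4
qwen3_tts_mer = 4.82
relative_reduction = (cosyvoice3_mer - qwen3_tts_mer) / cosyvoice3_mer
print(f"{relative_reduction:.1%}")  # ~66.5%, i.e. about a 66 percent relative reduction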

On InstructTTSEval, the Qwen3TTS-12Hz-1.7B-VD VoiceDesign model sets new state of the art scores among open source models on Description-Speech Consistency and Response Precision in both Chinese and English, and is competitive with commercial systems like Hume and Gemini on several metrics.

Key Takeaways

Full open source multilingual TTS stack: Qwen3-TTS is an Apache 2.0 licensed suite that covers 3 tasks in one stack, high quality TTS, 3 second voice cloning, and instruction based voice design across 10 languages using the 12Hz tokenizer family.

Efficient discrete codec and real time streaming: The Qwen3-TTS-Tokenizer-12Hz uses 16 codebooks at 12.5 frames per second, reaches strong PESQ, STOI and UTMOS scores, and supports packetized streaming with about 320 ms of audio per packet and sub 120 ms first packet latency for the 0.6B and 1.7B models in the reported setup.

Task specific model variants: The release offers Base models for cloning and generic TTS, CustomVoice models with 9 predefined speakers and style prompts, and a VoiceDesign model that generates new voices directly from natural language descriptions which can then be reused by the Base model.

Strong alignment and multilingual quality: A multi stage alignment pipeline with DPO, GSPO and speaker fine tuning gives Qwen3-TTS low word error rates and high speaker similarity, with lowest WER in 6 of 10 languages and the best speaker similarity in all 10 languages among the evaluated systems, and state of the art zero shot English cloning on Seed TTS.

Check out the Model Weights, Repo and Playground.
The post Qwen Researchers Release Qwen3-TTS: an Open Multilingual TTS Suite with Real-Time Latency and Fine-Grained Voice Control appeared first on MarkTechPost.

Build AI agents with Amazon Bedrock AgentCore using AWS CloudFormation

Agentic-AI has become essential for deploying production-ready AI applications, yet many developers struggle with the complexity of manually configuring agent infrastructure across multiple environments. Infrastructure as code (IaC) facilitates consistent, secure, and scalable infrastructure that autonomous AI systems require. It minimizes manual configuration errors through automated resource management and declarative templates, reducing deployment time from hours to minutes while facilitating infrastructure consistency across the environments to help prevent unpredictable agent behavior. It provides version control and rollback capabilities for quick recovery from issues, essential for maintaining agentic system availability, and enables automated scaling and resource optimization through parameterized templates that adapt from lightweight development to production-grade deployments. For agentic applications operating with minimal human intervention, the reliability of IaC, automated validation of security standards, and seamless integration into DevOps workflows are essential for robust autonomous operations.
To streamline resource deployment and management, Amazon Bedrock AgentCore services are now supported by various IaC frameworks, including the AWS Cloud Development Kit (AWS CDK), Terraform, and AWS CloudFormation templates. This integration brings the power of IaC directly to AgentCore so developers can provision, configure, and manage their AI agent infrastructure. In this post, we use CloudFormation templates to build an end-to-end application for a weather activity planner. Examples of using CDK and Terraform can be found in the GitHub Sample Library.
Building an activity planner agent based on weather
The sample creates a weather activity planner, demonstrating a practical application that processes real-time weather data to provide personalized activity recommendations based on a location of interest. The application consists of multiple integrated components:

Real-time weather data collection – The application retrieves current weather conditions from authoritative meteorological sources such as weather.gov, gathering essential data points including temperature readings, precipitation probability forecasts, wind speed measurements, and other relevant atmospheric conditions that influence outdoor activity suitability.
Weather analysis engine – The application processes raw meteorological data through customized logic to evaluate how suitable a day is for an outdoor activity, based on multiple weather factors:

Temperature comfort scoring – Activities receive reduced suitability scores when temperatures drop below 50°F
Precipitation risk assessment – Rain probabilities exceeding 30% trigger adjustments to outdoor activity recommendations
Wind condition impact evaluation – Wind speeds above 15 mph affect overall comfort and safety ratings for various activities

Personalized recommendation system – The application processes weather analysis results with user preferences and location-based awareness to generate tailored activity suggestions.

The following diagram shows this flow.
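
A minimal Python sketch of the scoring logic described above follows; the 50°F, 30% precipitation, and 15 mph wind thresholds come from the description, while the penalty weights and the 0-to-1 score scale are illustrative assumptions rather than the code shipped in the template.

def activity_suitability(temp_f: float, rain_prob_pct: float, wind_mph: float) -> float:
    """Return a 0-1 suitability score for outdoor activity (weights are assumptions)."""
    score = 1.0
    if temp_f < 50:          # temperature comfort threshold from the article
        score -= 0.3
    if rain_prob_pct > 30:   # precipitation risk threshold
        score -= 0.4
    if wind_mph > 15:        # wind comfort and safety threshold
        score -= 0.2
    return max(0.0, score)

# Example: a cool, dry, calm day scores better than a warm but rainy, windy one.
print(activity_suitability(temp_f=48, rain_prob_pct=10, wind_mph=8))    # 0.7
print(activity_suitability(temp_f=72, rain_prob_pct=55, wind_mph=20))   # 0.4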

Now let’s look at how this can be implemented using AgentCore services:

AgentCore Browser – For automated browsing of weather data from sources such as weather.gov
AgentCore Code Interpreter – For executing Python code that processes weather data, performs calculations, and implements the scoring algorithms
AgentCore Runtime – For hosting an agent that orchestrates the application flow, managing data processing pipelines, and coordinating between different components
AgentCore Memory – For storing the user preferences as long term memory

The following diagram shows this architecture.

Deploying the CloudFormation template

Download the End-to-End-Weather-Agent.yaml CloudFormation template from GitHub to your local machine
Open CloudFormation from AWS Console
Click Create stack → With new resources (standard)
Choose template source (upload file) and select your template
Enter stack name and change any required parameters if needed
Review configuration and acknowledge IAM capabilities
Click Submit and monitor deployment progress on the Events tab

Here are the visual steps for CloudFormation template deployment
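
If you prefer to script the same deployment instead of clicking through the console, a minimal boto3 sketch might look like the following; the stack name, local template filename, and parameter key are assumptions for illustration, so substitute the values from your own template.

import boto3

cfn = boto3.client("cloudformation")

# Read the downloaded template from a local path (assumed filename).
with open("End-to-End-Weather-Agent.yaml") as f:
    template_body = f.read()

cfn.create_stack(
    StackName="weather-activity-planner",      # assumed stack name
    TemplateBody=template_body,
    Capabilities=["CAPABILITY_NAMED_IAM"],     # acknowledge IAM resource creation
    Parameters=[
        # Parameter key shown for illustration; use the keys defined in the template.
        {"ParameterKey": "Environment", "ParameterValue": "dev"},
    ],
)

# Block until the stack finishes creating, mirroring the Events tab monitoring step.
cfn.get_waiter("stack_create_complete").wait(StackName="weather-activity-planner")
print("Stack deployed")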

Running and testing the application

Adding observability and monitoring
AgentCore Observability provides key advantages. It offers quality and trust through detailed workflow visualizations and real-time performance monitoring. You can gain accelerated time-to-market by using Amazon CloudWatch powered dashboards that reduce manual data integration from multiple sources, making it possible to take corrective actions based on actionable insights. Integration flexibility with OpenTelemetry-compatible format supports existing tools such as CloudWatch, DataDog, Arize Phoenix, LangSmith, and LangFuse.
The service provides end-to-end traceability across frameworks and foundation models (FMs), captures critical metrics such as token usage and tool selection patterns, and supports both automatic instrumentation for AgentCore Runtime hosted agents and configurable monitoring for agents deployed on other services. This comprehensive observability approach helps organizations achieve faster development cycles, more reliable agent behavior, and improved operational visibility while building trustworthy AI agents at scale.
The following screenshot shows metrics in the AgentCore Runtime UI.

Customizing for your use case
The weather activity planner AWS CloudFormation template is designed with modular components that can be seamlessly adapted for various applications. For instance, you can customize the AgentCore Browser tool to collect information from different web applications (such as financial websites for investment guidance, social media feeds for sentiment monitoring, or ecommerce sites for price tracking), modify the AgentCore Code Interpreter algorithms to process your specific business logic (such as predictive modeling for sales forecasting, risk assessment for insurance, or quality control for manufacturing), adjust the AgentCore Memory component to store relevant user preferences or business context (such as customer profiles, inventory levels, or project requirements), and reconfigure the Strands Agents tasks to orchestrate workflows specific to your domain (such as supply chain optimization, customer service automation, or compliance monitoring).
Best practices for deployments
We recommend the following practices for your deployments:

Modular component architecture – Design AWS CloudFormation templates with separate sections for each AWS service.
Parameterized template design – Use AWS CloudFormation parameters for the configurable elements to facilitate reusable templates across environments. For example, this can help associate the same base container with multiple agent deployments, help point to two different build configurations, or parameterize the LLM of choice for powering your agents.
AWS Identity and Access Management (IAM) security and least privilege – Implement fine-grained IAM roles for each AgentCore component with specific resource Amazon Resource Names (ARNs). Refer to our documentation on AgentCore security considerations.
Comprehensive monitoring and observability – Enable CloudWatch logging, custom metrics, AWS X-Ray distributed tracing, and alerts across the components.
Version control and continuous integration and continuous delivery (CI/CD) integration – Maintain templates in GitHub with automated validation, comprehensive testing, and AWS CloudFormation StackSets for consistent multi-Region deployments.

You can find a more comprehensive set of best practices at CloudFormation best practices.
Clean up resources
To avoid incurring future charges, delete the resources used in this solution:

On the Amazon S3 console, manually delete the contents inside the bucket you created for template deployment and then delete the bucket.
On the CloudFormation console, choose Stacks in the navigation pane, select the main stack, and choose Delete.

Conclusion
In this post, we introduced an automated solution for deploying AgentCore services using AWS CloudFormation. These preconfigured templates enable rapid deployment of powerful agentic AI systems without the complexity of manual component setup. This automated approach helps save time and facilitates consistent and reproducible deployments so you can focus on building agentic AI workflows that drive business growth.
Try out some more examples from our Infrastructure as Code sample repositories :

Terraform
CloudFormation
CDK

About the authors
Chintan Patel is a Senior Solution Architect at AWS with extensive experience in solution design and development. He helps organizations across diverse industries to modernize their infrastructure, demystify Generative AI technologies, and optimize their cloud investments. Outside of work, he enjoys spending time with his kids, playing pickleball, and experimenting with AI tools.
Shreyas Subramanian is a Principal Data Scientist and helps customers by using Generative AI and deep learning to solve their business challenges using AWS services like Amazon Bedrock and AgentCore. Dr. Subramanian contributes to cutting-edge research in deep learning, Agentic AI, foundation models and optimization techniques with several books, papers and patents to his name. In his current role at Amazon, Dr. Subramanian works with various science leaders and research teams within and outside Amazon, helping to guide customers to best leverage state-of-the-art algorithms and techniques to solve business critical problems. Outside AWS, Dr. Subramanian is an expert reviewer for AI papers and funding via organizations like NeurIPS, ICML, ICLR, NASA, and NSF.
Kosti Vasilakakis is a Principal PM at AWS on the Agentic AI team, where he has led the design and development of several Bedrock AgentCore services from the ground up, including Runtime. He previously worked on Amazon SageMaker since its early days, launching AI/ML capabilities now used by thousands of companies worldwide. Earlier in his career, Kosti was a data scientist. Outside of work, he builds personal productivity automations, plays tennis, and explores the wilderness with his family.

How the Amazon.com Catalog Team built self-learning generative AI at s …

The Amazon.com Catalog is the foundation of every customer’s shopping experience—the definitive source of product information with attributes that power search, recommendations, and discovery. When a seller lists a new product, the catalog system must extract structured attributes—dimensions, materials, compatibility, and technical specifications—while generating content such as titles that match how customers search. A title isn’t a simple enumeration like color or size; it must balance seller intent, customer search behavior, and discoverability. This complexity, multiplied by millions of daily submissions, makes catalog enrichment an ideal proving ground for self-learning AI.
In this post, we demonstrate how the Amazon Catalog Team built a self-learning system that continuously improves accuracy while reducing costs at scale using Amazon Bedrock.
The challenge
In generative AI deployment environments, improving model performance calls for constant attention. Because models process millions of products, they inevitably encounter edge cases, evolving terminology, and domain-specific patterns where accuracy may degrade. The traditional approach—applied scientists analyzing failures, updating prompts, testing changes, and redeploying—works but is resource-intensive and struggles to keep pace with real-world volume and variety. The challenge isn’t whether we can improve these systems, but how to make improvement scalable and automatic rather than dependent on manual intervention. At Amazon Catalog, we faced this challenge head-on. The tradeoffs seemed impossible: large models would deliver accuracy but wouldn’t scale efficiently to our volume, while smaller models struggled with the complex, ambiguous cases where sellers needed the most help.
Solution overview
Our breakthrough came from an unconventional experiment. Instead of choosing a single model, we deployed multiple smaller models to process the same products. When these models agreed on an attribute extraction, we could trust the result. But when they disagreed—whether from genuine ambiguity, missing context, or one model making an error—we discovered something profound. These disagreements weren’t always errors, but they were almost always indicators of complexity. This led us to design a self-learning system that reimagines how generative AI scales. Multiple smaller models process routine cases through consensus, invoking larger models only when disagreements occur. The larger model is implemented as a supervisor agent with access to specialized tools for deeper investigation and analysis. But the supervisor doesn’t just resolve disputes; it generates reusable learnings stored in a dynamic knowledge base that helps prevent entire classes of future disagreements. We invoke more powerful models only when the system detects high learning value at inference time, while correcting the output. The result is a self-learning system where costs decrease and quality increases—because the system learns to handle edge cases that previously triggered supervisor calls. Error rates fell continuously, not through retraining but through accumulated learnings from resolved disagreements injected into smaller model prompts. The following figure shows the architecture of this self-learning system.

In the self-learning architecture, product data flows through generator-evaluator workers, with disagreements routed to a supervisor for investigation. Post-inference, the system also captures feedback signals from sellers (such as listing updates and appeals) and customers (such as returns and negative reviews). Learnings from these sources are stored in a hierarchical knowledge base and injected back into worker prompts, creating a continuous improvement loop.
The following describes a simplified reference architecture that demonstrates how this self-learning pattern can be implemented using AWS services. While our production system has additional complexity, this example illustrates the core components and data flows.
This system can be built with Amazon Bedrock, which provides the essential infrastructure for multi-model architectures. The ability of Amazon Bedrock to access diverse foundation models enables teams to deploy smaller, efficient models like Amazon Nova Lite as workers and more capable models like Anthropic Claude Sonnet as supervisors—optimizing both cost and performance. For even greater cost efficiency at scale, teams can also deploy open source small models on Amazon Elastic Compute Cloud (Amazon EC2) GPU instances, providing full control over worker model selection and batch throughput optimization. For productionizing a supervisor agent with its specialized tools and dynamic knowledge base, Bedrock AgentCore provides the runtime scalability, memory management, and observability needed to deploy self-learning systems reliably at scale.

Our supervisor agent integrates with Amazon’s extensive Selection and Catalog Systems. The above diagram is a simplified view showing the key features of the agent and some of the AWS services that make it possible. Product data flows through generator-evaluator workers (Amazon EC2 and Amazon Bedrock Runtime), with agreements stored directly and disagreements routed to a supervisor agent (Bedrock AgentCore). The learning aggregator and memory manager utilize Amazon DynamoDB for the knowledge base, with learnings injected back into worker prompts. Human review (Amazon Simple Queue Service (Amazon SQS)) and observability (Amazon CloudWatch) complete the architecture. Production implementations will likely require additional components for scale, reliability, and integration with existing systems.
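
A minimal sketch of the knowledge-base write and read path in the simplified reference architecture might look like the following; the table name, key schema, and learning fields are assumptions for illustration, not the production schema.

import time
import boto3
from boto3.dynamodb.conditions import Key

# Assumed DynamoDB table with a composite key: category (partition) + learning_id (sort).
table = boto3.resource("dynamodb").Table("catalog-learnings")

def store_learning(category: str, learning_id: str, text: str, source: str) -> None:
    """Persist a supervisor-generated learning so workers can retrieve it by category."""
    table.put_item(Item={
        "category": category,          # product category used later for prompt injection
        "learning_id": learning_id,
        "learning_text": text,
        "source": source,              # worker disagreement, seller appeal, customer return
        "created_at": int(time.time()),
    })

def learnings_for(category: str) -> list:
    """Fetch the learnings that get injected into worker prompts for this category."""
    resp = table.query(KeyConditionExpression=Key("category").eq(category))
    return [item["learning_text"] for item in resp["Items"]]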
But how did we arrive at this architecture? The key insight came from an unexpected place.
The insight: Turning disagreements into opportunities
Our perspective shifted during a debugging session. When multiple smaller models (such as Nova Lite) disagreed on product attributes—interpreting the same specification differently based on how they understood technical terminology—we initially saw this as a failure. But the data told a different story: products where our smaller models disagreed correlated with cases requiring more manual review and clarification. When models disagreed, those were precisely the products that needed additional investigation. The disagreements were surfacing learning opportunities, but we couldn’t have engineers and scientists deep-dive on every case. The supervisor agent does this automatically at scale. And crucially, the goal isn’t just to determine which model was right—it’s to extract learnings that help prevent similar disagreements in the future. This is the key to efficient scaling. Disagreements don’t just come from AI workers at inference time. Post-inference, sellers express disagreement through listing updates and appeals—signals that our original extraction might have missed important context. Customers disagree through returns and negative reviews, often indicating that product information didn’t match expectations. These post-inference human signals feed into the same learning pipeline, with the supervisor investigating patterns and generating learnings that help prevent similar issues across future products. We found a sweet spot: attributes with moderate AI worker disagreement rates yielded the richest learnings—high enough to surface meaningful patterns, low enough to indicate solvable ambiguity. When disagreement rates are too low, they typically reflect noise or fundamental model limitations rather than learnable patterns—for those, we consider using more capable workers. When disagreement rates are too high, it signals that worker models or prompts aren’t yet mature enough, triggering excessive supervisor calls that undermine the efficiency gains of the architecture. These thresholds will vary by task and domain; the key is identifying your own sweet spot where disagreements represent genuine complexity worth investigating, rather than fundamental gaps in worker capability or random noise.
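
One way to operationalize that sweet spot is a small routing policy keyed on the observed disagreement rate; the 2 percent and 20 percent bands below are illustrative assumptions, since the right thresholds vary by task and domain, but the three regimes mirror the description above.

def routing_action(disagreement_rate: float,
                   low: float = 0.02, high: float = 0.20) -> str:
    """Map an attribute's worker disagreement rate to an operational response.
    The low/high thresholds are illustrative assumptions, not production values."""
    if disagreement_rate < low:
        # Mostly noise or hard model limitations: consider more capable workers.
        return "upgrade-workers-or-accept"
    if disagreement_rate > high:
        # Workers or prompts are not mature enough: too many supervisor calls.
        return "refine-worker-prompts"
    # Sweet spot: disagreements signal solvable ambiguity worth supervisor investigation.
    return "route-to-supervisor"

for rate in (0.005, 0.08, 0.35):
    print(f"{rate:.1%} -> {routing_action(rate)}")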
Deep dive: How it works
At the heart of our system are multiple lightweight worker models operating in parallel—some as generators extracting attributes, others as evaluators assessing those extractions. These workers can be implemented in a non-agentic way with fixed inputs, making them batch-friendly and scalable. The generator-evaluator pattern creates productive tension, conceptually similar to the productive tension in generative adversarial networks (GANs), though our approach operates at inference time through prompting rather than training. We explicitly prompt evaluators to be critical, instructing them to scrutinize extractions for ambiguities, missing context, or potential misinterpretations. This adversarial dynamic surfaces disagreements that represent genuine complexity rather than letting ambiguous cases pass through undetected. When the generator and evaluator agree, we have high confidence in the result and process it at minimal computational cost. This consensus path handles most product attributes. When they disagree, we’ve identified a case worth investigating—triggering the supervisor to resolve the dispute and extract reusable learnings.
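
In plain Python, the consensus-or-escalate control flow looks roughly like the sketch below. The model calls are stubbed with placeholder functions because the real workers and supervisor run on Amazon Bedrock and Amazon EC2; only the routing logic is the point here.

from typing import Callable, Dict

def route_extraction(product: Dict,
                     generate: Callable[[Dict], Dict],
                     evaluate: Callable[[Dict, Dict], bool],
                     supervise: Callable[[Dict, Dict], Dict]) -> Dict:
    """Generator proposes attributes, evaluator critiques them, supervisor resolves disputes."""
    proposal = generate(product)                 # lightweight worker (e.g. a small model)
    agreed = evaluate(product, proposal)         # critical evaluator worker
    if agreed:
        return {"attributes": proposal, "path": "consensus"}
    # Disagreement: escalate to the more capable supervisor, which also emits a learning.
    resolution = supervise(product, proposal)
    return {"attributes": resolution["attributes"],
            "learning": resolution.get("learning"),
            "path": "supervisor"}

# Stub workers so the control flow is runnable without any model backend.
gen = lambda p: {"material": "steel"}
eva = lambda p, a: "steel" in p.get("description", "").lower()
sup = lambda p, a: {"attributes": {"material": "unknown"},
                    "learning": "Do not infer material from brand name alone."}

print(route_extraction({"description": "Steel water bottle"}, gen, eva, sup))
print(route_extraction({"description": "Insulated water bottle"}, gen, eva, sup))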
Our architecture treats disagreement as a universal learning signal. At inference time, worker-to-worker disagreements catch ambiguity. Post-inference, seller feedback catches misalignments with intent and customer feedback catches misalignments with expectations. The three channels feed the supervisor, which extracts learnings that improve accuracy across the board. When workers disagree, we invoke a supervisor agent—a more capable model that resolves the dispute and investigates why it occurred. The supervisor determines what context or reasoning the workers lacked, and these insights become reusable learnings for future cases. For example, when workers disagreed about usage classification for a product based on certain technical terms, the supervisor investigated and clarified that those terms alone were insufficient—visual context and other indicators needed to be considered together. The supervisor generated a learning about how to properly weight different signals for that product category. This learning immediately updated our knowledge base, and when injected into worker prompts for similar products, helped prevent future disagreements across thousands of items. While the workers could theoretically be the same model as the supervisor, using smaller models is crucial for efficiency at scale. The architectural advantage emerges from this asymmetry: lightweight workers handle routine cases through consensus, while the more capable supervisor is invoked only when disagreements surface high-value learning opportunities. As the system accumulates learnings and disagreement rates drop, supervisor calls naturally decline—efficiency gains are baked directly into the architecture. This worker-supervisor heterogeneity also enables richer investigation. Because supervisors are invoked selectively, they can afford to pull in additional signals—customer reviews, return reasons, seller history—that would be impractical to retrieve for every product but provide crucial context when resolving complex disagreements. When these signals yield generalizable insights about how customers want product information presented—which attributes to highlight, what terminology resonates, how to frame specifications—the resulting learnings benefit future inferences across similar products without retrieving those resource-intensive signals again. Over time, this creates a feedback loop: better product information leads to fewer returns and negative reviews, which in turn reflects improved customer satisfaction.
The knowledge base: Making learnings scalable
The supervisor investigates disagreements at the individual product level. With millions of items to process, we need a scalable way to transform these product-specific insights into reusable learnings. Our aggregation strategy adapts to context: high-volume patterns get synthesized into broader learnings, while unique or critical cases are preserved individually. We use a hierarchical structure where a large language model (LLM)-based memory manager navigates the knowledge tree to place each learning. Starting from the root, it traverses categories and subcategories, deciding at each level whether to continue down an existing path, create a new branch, merge with existing knowledge, or replace outdated information. This dynamic organization allows the knowledge base to evolve with emerging patterns while maintaining logical structure. During inference, workers receive relevant learnings in their prompts based on product category, automatically incorporating domain knowledge from past disagreements. The knowledge base also introduces traceability—when an extraction seems incorrect, we can pinpoint exactly which learning influenced it. This shifts auditing from an unscalable task to a practical one: instead of reviewing a sample of millions of outputs—where human effort grows proportionally with scale—teams can audit the knowledge base itself, which remains relatively fixed in size regardless of inference volume. Domain experts can directly contribute by adding or refining entries, no retraining required. A single well-crafted learning can immediately improve accuracy across thousands of products. The knowledge base bridges human expertise and AI capability, where automated learnings and human insights work together.
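
A toy version of the hierarchical knowledge base can make the placement and injection steps concrete. In production an LLM-based memory manager decides whether to descend, branch, merge, or replace; the sketch below hard-codes a simple descend-and-append policy, and the category names are assumptions.

# Toy hierarchical knowledge base: nested dicts of categories ending in learning lists.
knowledge_base = {
    "electronics": {
        "cables": {"_learnings": ["Connector type alone does not determine compatibility."]},
    },
    "home": {"_learnings": []},
}

def add_learning(tree: dict, path: list, learning: str) -> None:
    """Walk (or create) the category path and append the learning at the leaf."""
    node = tree
    for category in path:
        node = node.setdefault(category, {})
    node.setdefault("_learnings", []).append(learning)

def learnings_along(tree: dict, path: list) -> list:
    """Collect learnings from the root down to the leaf for prompt injection."""
    node, collected = tree, list(tree.get("_learnings", []))
    for category in path:
        node = node.get(category, {})
        collected += node.get("_learnings", [])
    return collected

add_learning(knowledge_base, ["electronics", "cables"],
             "Weight visual context with spec terms when classifying usage.")
print(learnings_along(knowledge_base, ["electronics", "cables"]))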
Lessons learned and best practices
When this self-learning architecture works best:

High-volume inference where input diversity drives compounded learning
Quality-critical applications where consensus provides natural quality assurance
Evolving domains with new patterns and terminology constantly emerging

It’s less suitable for low-volume scenarios (insufficient disagreements for learning) or use cases with fixed, unchanging rules.
Critical success factors:

Defining disagreements: With a generator-evaluator pair, disagreement occurs when the evaluator flags the extraction as needing improvement. With multiple workers, scale thresholds accordingly. The key is maintaining productive tension between workers. If disagreement rates fall outside the productive range (too low or too high), consider more capable workers or refined prompts.
Tracking learning effectiveness: Disagreement rates must decrease over time—this is your primary health metric. If rates stay flat, check knowledge retrieval, prompt injection, or evaluator criticality.
Knowledge organization: Structure learnings hierarchically and keep them actionable. Abstract guidance doesn’t help; specific, concrete learnings directly improve future inferences.

Common pitfalls

Focusing on cost over intelligence: Cost reduction is a byproduct, not the goal
Rubber-stamp evaluators: Evaluators that simply approve generator outputs won’t surface meaningful disagreements—prompt them to actively challenge and critique extractions
Poor learning extraction: Supervisors must identify generalizable patterns, not just fix individual cases
Knowledge rot: Without organization, learnings become unsearchable and unusable

The key insight: treat declining disagreement rates as your north star metric—they show the system is truly learning.
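
Tracking that north-star metric needs little more than a rolling counter; the window size and the simulated 5 percent rate below are assumptions used only to illustrate the health check.

from collections import deque

class DisagreementTracker:
    """Rolling disagreement rate over the last N extractions (window size is an assumption)."""
    def __init__(self, window: int = 10_000):
        self.events = deque(maxlen=window)   # True = workers disagreed, False = consensus

    def record(self, disagreed: bool) -> None:
        self.events.append(disagreed)

    def rate(self) -> float:
        return sum(self.events) / len(self.events) if self.events else 0.0

tracker = DisagreementTracker(window=1000)
for i in range(1000):
    tracker.record(i % 20 == 0)              # simulate a 5% disagreement rate
print(f"rolling disagreement rate: {tracker.rate():.1%}")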
Deployment strategies: Two approaches

Learn-then-deploy: Start with basic prompts and let the system learn aggressively in a pre-production environment. Domain experts then audit the knowledge base—not individual outputs—to make sure learned patterns align with desired outcomes. When approved, deploy with validated learnings. This is ideal for new use cases where you don’t yet know what good looks like—disagreements help discover the right patterns, and knowledge base auditing lets you shape them before production.
Deploy-and-learn: Start with refined prompts and good initial quality, then continuously improve through ongoing learning in production. This works best for well-understood use cases where you can define quality upfront but still want to capture domain-specific nuances over time.

Both approaches use the same architecture—the choice depends on whether you’re exploring new territory or optimizing familiar ground.
Conclusion
What started as an experiment in catalog enrichment revealed a fundamental truth: AI systems don’t have to be frozen in time. By embracing disagreements as learning signals rather than failures, we’ve built an architecture that accumulates domain knowledge through actual usage. We watched the system evolve from generic understanding to domain-specific expertise. It learned industry-specific terminology. It discovered contextual rules that vary across categories. It adapted to requirements no pre-trained model would encounter—all without retraining, through learnings stored in a knowledge base and injected back into worker prompts. For teams operationalizing similar architectures, Amazon Bedrock AgentCore offers purpose-built capabilities:

AgentCore Runtime  handles quick consensus decisions for routine cases while supporting extended reasoning when supervisors investigate complex disagreements
AgentCore Observability provides visibility into which learnings drive impact, helping teams refine knowledge propagation and maintain reliability at scale

The implications extend beyond catalog management. High-volume AI applications could benefit from this process—and the ability of Amazon Bedrock to access diverse models makes this architecture straightforward to implement. The key insight is this: we’ve shifted from asking “which model should we use?” to “how can we build systems that learn our specific patterns?” Whether you learn-then-deploy for new use cases or deploy-and-learn for established ones, the implementation is straightforward: start with workers suited to your task, choose a supervisor, and let disagreements drive learning. With the right architecture, every inference can become an opportunity to capture domain knowledge. That’s not just scaling—that’s building institutional knowledge into your AI systems.
Acknowledgement
This work wouldn’t have been possible without the contributions and support from Ankur Datta (Senior Principal Applied Scientist – leader of science in Everyday Essentials Stores), Zhu Cheng (Applied Scientist), Xuan Tang (Software Engineer), and Mohammad Ghasemi (Applied Scientist). We sincerely appreciate their contributions to the designs and implementations, the numerous fruitful brainstorming sessions, and all the insightful ideas and suggestions.

About the authors
Tarik Arici is a Principal Scientist at Amazon Selection and Catalog Systems (ASCS), where he pioneers self-learning generative AI systems design for catalog quality enhancement at scale. His work focuses on building AI systems that automatically accumulate domain knowledge through production usage—learning from customer reviews and returns, seller feedback, and model disagreements to improve quality while reducing costs. Tarik holds a PhD in Electrical and Computer Engineering from Georgia Institute of Technology.
Sameer Thombare is a Senior Product Manager at Amazon with over a decade of experience in Product Management, Category/P&L Management across diverse industries, including heavy engineering, telecommunications, finance, and eCommerce. Sameer is passionate about developing continuously improving closed-loop systems and leads strategic initiatives within Amazon Selection and Catalog Systems (ASCS) to build a sophisticated self-learning closed-loop system that synthesize signals from customers, sellers, and supply chain operations to optimize outcomes. Sameer holds an MBA from the Indian Institute of Management Bangalore and an engineering degree from Mumbai University.
Amin Banitalebi received his PhD in the Digital Media at the University of British Columbia (UBC), Canada, in 2014. Since then, he has taken various applied science roles spanning over areas in computer vision, natural language processing, recommendation systems, classical machine learning, and generative AI. Amin has co-authored over 90 publications and patents. He is currently an Applied Science Manager in Amazon Everyday Essentials.
Puneet Sahni is a Senior Principal Engineer at Amazon Selection and Catalog Systems (ASCS), where he has spent over 8 years improving the completeness, consistency, and correctness of catalog data. He specializes in catalog data modeling and its application to enhancing Selling Partner and customer experiences, while using ML/DL and LLM-based enrichment to drive improvements in catalog data quality.
Erdinc Basci joined Amazon in 2015 and brings over 23 years of technology industry experience. At Amazon, he has led the evolution of Catalog system architectures—including ingestion pipelines, prioritized processing, and traffic shaping—as well as catalog data architecture improvements such as segmented offers, product specifications for manufacture-on-demand products, and catalog data experimentation. Erdinc has championed a hands-on performance engineering culture across Amazon services unlocking $1B+ annualized cost savings and 20%+ latency wins across core Stores services. He is currently focused on improving generative AI application performance and GPU efficiency across Amazon. Erdinc holds a BS in Computer Science from Bilkent University, Turkey, and an MBA from Seattle University, US.
Mey Meenakshisundaram is a Director in Amazon Selection and Catalog Systems, where he leads innovative GenAI solutions to establish Amazon’s worldwide catalog as the best-in-class source for product information. His team pioneers advanced machine learning techniques, including multi-agent systems and large language models, to automatically enrich product attributes and improve catalog quality at scale. High-quality product information in the catalog is critical for delighting customers in finding the right products, empowering selling partners to list their products effectively, and enabling Amazon operations to reduce manual effort.

Microsoft Releases VibeVoice-ASR: A Unified Speech-to-Text Model Designed to Handle 60-Minute Long-Form Audio in a Single Pass

Microsoft has released VibeVoice-ASR as part of the VibeVoice family of open source frontier voice AI models. VibeVoice-ASR is described as a unified speech-to-text model that can handle 60-minute long-form audio in a single pass and output structured transcriptions that encode Who, When, and What, with support for Customized Hotwords.

VibeVoice sits in a single repository that hosts Text-to-Speech, real time TTS, and Automatic Speech Recognition models under an MIT license. VibeVoice uses continuous speech tokenizers that run at 7.5 Hz and a next-token diffusion framework where a Large Language Model reasons over text and dialogue and a diffusion head generates acoustic detail. This framework is mainly documented for TTS, but it defines the overall design context in which VibeVoice-ASR lives.

https://huggingface.co/microsoft/VibeVoice-ASR

Long form ASR with a single global context

Unlike conventional ASR (Automatic Speech Recognition) systems that first cut audio into short segments and then run diarization and alignment as separate components, VibeVoice-ASR is designed to accept up to 60 minutes of continuous audio input within a 64K token length budget. The model keeps one global representation of the full session. This means the model can maintain speaker identity and topic context across the entire hour instead of resetting every few seconds.

60-minute Single-Pass Processing

The first key feature is single-pass processing of long recordings. Many conventional ASR systems process long audio by cutting it into short segments, which can lose global context. VibeVoice-ASR instead takes up to 60 minutes of continuous audio within a 64K token window, so it can maintain consistent speaker tracking and semantic context across the entire recording.

This is important for tasks like meeting transcription, lectures, and long support calls. A single pass over the complete sequence simplifies the pipeline. There is no need to implement custom logic to merge partial hypotheses or repair speaker labels at boundaries between audio chunks.

Customized Hotwords for domain accuracy

Customized Hotwords are the second key feature. Users can provide hotwords such as product names, organization names, technical terms, or background context. The model uses these hotwords to guide the recognition process.

This allows you to bias decoding toward the correct spelling and pronunciation of domain-specific terms without retraining the model. For example, a developer can pass internal project names or customer-specific terms at inference time. This is useful when deploying the same base model across several products that share similar acoustic conditions but very different vocabularies.

Microsoft also ships a finetuning-asr directory with LoRA-based fine-tuning scripts for VibeVoice-ASR. Together, hotwords and LoRA fine-tuning give a path for both lightweight adaptation and deeper domain specialization.

Rich Transcription, diarization, and timing

The third feature is Rich Transcription with Who, When, and What. The model jointly performs ASR, diarization, and timestamping, and returns a structured output that indicates who said what and when.

The model card reports three evaluation figures: DER, cpWER, and tcpWER.

https://huggingface.co/microsoft/VibeVoice-ASR

DER is Diarization Error Rate; it measures how well the model assigns speech segments to the correct speaker.

cpWER and tcpWER are word error rate metrics computed for multi-speaker conversational transcripts.

These graphs summarize how well the model performs on multi-speaker, long-form data, which is the primary target setting for this ASR system.

The structured output format is well suited for downstream processing like speaker-specific summarization, action item extraction, or analytics dashboards. Since segments, speakers, and timestamps already come from a single model, downstream code can treat the transcript as a time-aligned event log.
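
As an illustration of that event-log view, the short sketch below groups an assumed structured transcript into per-speaker text and filters for action items. The segment schema (speaker, start, end, text) is an assumption for illustration, not the documented VibeVoice-ASR output format.

from collections import defaultdict

# Hypothetical structured output: a list of (who, when, what) segments.
# The field names below are assumptions for illustration only.
segments = [
    {"speaker": "SPEAKER_0", "start": 0.0,  "end": 7.4,  "text": "Welcome everyone, let's start the review."},
    {"speaker": "SPEAKER_1", "start": 7.4,  "end": 15.2, "text": "Thanks. First item is the Q3 latency report."},
    {"speaker": "SPEAKER_0", "start": 15.2, "end": 21.9, "text": "Action item: share the dashboard by Friday."},
]

def per_speaker_text(segments):
    """Collect each speaker's utterances in time order."""
    by_speaker = defaultdict(list)
    for seg in sorted(segments, key=lambda s: s["start"]):
        by_speaker[seg["speaker"]].append(seg["text"])
    return {spk: " ".join(texts) for spk, texts in by_speaker.items()}

def action_items(segments, keyword="action item"):
    """Naive keyword filter over the time-aligned event log."""
    return [(s["speaker"], s["start"], s["text"])
            for s in segments if keyword in s["text"].lower()]

print(per_speaker_text(segments))
print(action_items(segments))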

Key Takeaways

VibeVoice-ASR is a unified speech-to-text model that handles 60-minute long-form audio in a single pass within a 64K token context.

The model jointly performs ASR, diarization, and timestamping, so it outputs structured transcripts that encode Who, When, and What in a single inference step.

Customized Hotwords let users inject domain-specific terms such as product names or technical jargon to improve recognition accuracy without retraining the model.

Evaluation with DER, cpWER, and tcpWER focuses on multi-speaker conversational scenarios, which aligns the model with meetings, lectures, and long calls.

VibeVoice-ASR is released in the VibeVoice open source stack under an MIT license with official weights, fine-tuning scripts, and an online Playground for experimentation.

Check out the Model Weights, Repo and Playground.

How PDI built an enterprise-grade RAG system for AI applications with …

PDI Technologies is a global leader in the convenience retail and petroleum wholesale industries. They help businesses around the globe increase efficiency and profitability by securely connecting their data and operations. With 40 years of experience, PDI Technologies assists customers in all aspects of their business, from understanding consumer behavior to simplifying technology ecosystems across the supply chain.
Enterprises face a significant challenge of making their knowledge bases accessible, searchable, and usable by AI systems. Internal teams at PDI Technologies were struggling with information scattered across disparate systems including websites, Confluence pages, SharePoint sites, and various other data sources. To address this, PDI Technologies built PDI Intelligence Query (PDIQ), an AI assistant that gives employees access to company knowledge through an easy-to-use chat interface. This solution is powered by a custom Retrieval Augmented Generation (RAG) system, built on Amazon Web Services (AWS) using serverless technologies. Building PDIQ required addressing the following key challenges:

Automatically extracting content from diverse sources with different authentication requirements
Selecting, applying, and interchanging the most suitable large language model (LLM) for diverse processing requirements
Processing and indexing content for semantic search and contextual retrieval
Creating a knowledge foundation that enables accurate, relevant AI responses
Continuously refreshing information through scheduled crawling
Supporting enterprise-specific context in AI interactions

In this post, we walk through the PDIQ process flow and architecture, focusing on the implementation details and the business outcomes it has helped PDI achieve.
Solution architecture
In this section, we explore PDIQ’s comprehensive end-to-end design. We examine the data ingestion pipeline from initial processing through storage to user search capabilities, as well as the zero-trust security framework that protects key user personas throughout their platform interactions. The architecture consists of these elements:

Scheduler – Amazon EventBridge maintains and executes the crawler scheduler.
Crawlers – AWS Lambda invokes crawlers that are executed as tasks by Amazon Elastic Container Service (Amazon ECS).
Amazon DynamoDB – Persists crawler configurations and other metadata such as Amazon Simple Storage Service (Amazon S3) image location and captions.
Amazon S3 – All source documents are stored in Amazon S3. Amazon S3 events trigger the downstream flow for every object that is created or deleted.
Amazon Simple Notification Service (Amazon SNS) – Receives notification from Amazon S3 events.
Amazon Simple Queue Service (Amazon SQS) – Subscribed to Amazon SNS to hold the incoming requests in a queue.
AWS Lambda – Handles the business logic for chunking, summarizing, and generating vector embeddings.
Amazon Bedrock – Provides API access to foundation models (FMs) used by PDIQ:

Amazon Nova Lite to generate image caption
Amazon Nova Micro to generate document summary
Amazon Titan Text Embeddings V2 to generate vector embeddings
Amazon Nova Pro to generate responses to user inquiries

Amazon Aurora PostgreSQL-Compatible Edition – Stores vector embeddings.

The following diagram is the solution architecture.
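
To make the scheduler-to-crawler chain more concrete, the following is a minimal sketch of how an EventBridge schedule could invoke a Lambda function that launches a crawler as an ECS task. The rule name, function ARN, cluster, task definition, and network settings are illustrative assumptions, not PDIQ's actual configuration.

import boto3

events = boto3.client("events")

# Illustrative schedule: run the Confluence crawler every 6 hours.
events.put_rule(
    Name="pdiq-confluence-crawl",        # hypothetical rule name
    ScheduleExpression="rate(6 hours)",
    State="ENABLED",
)
events.put_targets(
    Rule="pdiq-confluence-crawl",
    Targets=[{
        "Id": "crawler-launcher",
        # Placeholder Lambda ARN; granting EventBridge permission to invoke it is omitted here.
        "Arn": "arn:aws:lambda:eu-central-1:123456789012:function:pdiq-crawler-launcher",
        "Input": '{"crawler_id": "confluence-docs"}',
    }],
)

# Inside the launcher Lambda function: start the crawler container as an ECS task.
def handler(event, context):
    ecs = boto3.client("ecs")
    ecs.run_task(
        cluster="pdiq-crawlers",                     # hypothetical ECS cluster
        taskDefinition="pdiq-confluence-crawler:1",  # hypothetical task definition
        launchType="FARGATE",
        networkConfiguration={"awsvpcConfiguration": {
            "subnets": ["subnet-0123456789abcdef0"],  # placeholder subnet
            "assignPublicIp": "DISABLED",
        }},
        overrides={"containerOverrides": [{
            "name": "crawler",
            "environment": [{"name": "CRAWLER_ID", "value": event["crawler_id"]}],
        }]},
    )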

Next, we review how PDIQ implements a zero-trust security model with role-based access control for two key personas:

Administrators configure knowledge bases and crawlers through Amazon Cognito user groups integrated with enterprise single sign-on. Crawler credentials are encrypted at rest using AWS Key Management Service (AWS KMS) and only accessible within isolated execution environments.
End users access knowledge bases based on group permissions validated at the application layer. Users can belong to multiple groups (such as human resources or compliance) and switch contexts to query role-appropriate datasets.

Process flow
In this section, we review the end-to-end process flow. We break it down by sections to dive deeper into each step and explain the functionality.

Crawlers
Crawlers are configured by administrators to collect data from the variety of sources that PDI relies on. Crawlers hydrate the data into the knowledge base so that this information can be retrieved by end users. PDIQ currently supports the following crawler configurations:

Web crawler – By using Puppeteer for headless browser automation, the crawler converts HTML web pages to markdown format using turndown. By following the embedded links on the website, the crawler can capture full context and relationships between pages. Additionally, the crawler downloads assets such as PDFs and images while preserving the original reference and offers users configuration options such as rate limiting.
Confluence crawler – This crawler uses the Confluence REST API with authenticated access to extract page content, attachments, and embedded images. It preserves page hierarchy and relationships and handles special Confluence elements such as info boxes, notes, and more.
Azure DevOps crawler – PDI uses Azure DevOps to manage its code base, track commits, and maintain project documentation in a centralized repository. PDIQ uses the Azure DevOps REST API with OAuth or personal access token (PAT) authentication to extract this information. The Azure DevOps crawler preserves project hierarchy, sprint relationships, and backlog structure, and it also maps work item relationships (such as parent/child or linked items), thereby providing a complete view of the dataset.
SharePoint crawler – It uses Microsoft Graph API with OAuth authentication to extract document libraries, lists, pages, and file content. The crawler processes MS Office documents (Word, Excel, PowerPoint) into searchable text and maintains document version history and permission metadata.

By building separate crawler configurations, PDIQ makes it easy to extend the platform with additional crawlers on demand. It also gives administrators the flexibility to configure settings for their respective crawlers (such as frequency, depth, or rate limits).
The following figure shows the PDIQ UI to configure the knowledge base.

The following figure shows the PDIQ UI to configure your crawler (such as Confluence).

The following figure shows the PDIQ UI to schedule crawlers.
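
Beyond the UI shown in these figures, a rough sketch of what one of these crawlers could look like is below: it pages through the Confluence Cloud REST API and writes each page body to Amazon S3 for downstream processing. The site URL, bucket, and key layout are assumptions for illustration; PDIQ's actual crawler additionally handles attachments, hierarchy, and rate limiting as described above.

import boto3
import requests

S3_BUCKET = "pdiq-raw-content"                       # hypothetical bucket
BASE_URL = "https://example.atlassian.net/wiki"      # hypothetical Confluence site

def crawl_confluence_space(space_key, auth, limit=50):
    """Page through a Confluence space and store raw page bodies in S3."""
    s3 = boto3.client("s3")
    start = 0
    while True:
        resp = requests.get(
            f"{BASE_URL}/rest/api/content",
            params={"spaceKey": space_key, "expand": "body.storage,version",
                    "start": start, "limit": limit},
            auth=auth,      # e.g. (email, api_token)
            timeout=30,
        )
        resp.raise_for_status()
        results = resp.json().get("results", [])
        for page in results:
            s3.put_object(
                Bucket=S3_BUCKET,
                Key=f"kb/{space_key}/confluence/{page['id']}.html",
                Body=page["body"]["storage"]["value"].encode("utf-8"),
                Metadata={"source": "confluence",
                          "version": str(page["version"]["number"])},
            )
        if len(results) < limit:
            break
        start += limit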

Handling images
Crawled data is stored in Amazon S3 with proper metadata tags. If the source is in HTML format, the task converts the content into markdown (.md) files. For these markdown files, an additional optimization step replaces each image in the document with its Amazon S3 reference location. Key benefits of this approach include:

PDI can use S3 object keys to uniquely reference each image, optimizing the synchronization process to detect changes in source data
Storage is optimized by replacing images with captions, avoiding the need to store duplicate images
The content of the images becomes searchable and relatable to the text content in the document
Original images can be seamlessly injected when rendering a response to a user inquiry

The following is a sample markdown file where images are replaced with the S3 file location:

![image-20230113-074652](https:// amzn-s3-demo-bucket.s3.amazonaws.com/kb/123/file/attachments/12133171243_image-20230113-074652.png)

Document processing
This is the most critical step of the process. Its key objective is to generate vector embeddings that can be used for similarity matching and effective retrieval based on user inquiries. The process runs in several stages: image captioning, document chunking, summary generation, and embedding generation. To caption images, PDIQ scans the markdown files for image tags <image> and generates a caption for each image that explains its content. The caption is injected back into the markdown file, next to the <image> tag, so the image content becomes part of the searchable text and enables richer, more accurate context retrieval during search and analysis. The approach also saves costs: to avoid unnecessary LLM inference calls for the exact same image, PDIQ stores image metadata (file location and generated caption) in Amazon DynamoDB, enabling reuse of previously generated captions instead of repeated caption generation calls to the LLM.
The following is an example of an image caption prompt:

You are a professional image captioning assistant. Your task is to provide clear, factual, and objective descriptions of images. Focus on describing visible elements, objects, and scenes in a neutral and appropriate manner.

The following is a snippet of markdown file that contains the image tag, LLM-generated caption, and the corresponding S3 file location:

![image-20230818-114454: The image displays a security tip notification on a computer screen. The notification is titled “Security tip” and advises the user to use generated passwords to keep their accounts safe. The suggested password, “2m5oFX#g&tLRMhN3,” is shown in a green box. Below the suggested password, there is a section labeled “Very Strong,” indicating the strength of the password. The password length is set to 16 characters, and it includes lowercase letters, uppercase letters, numbers, and symbols. There is also a “Dismiss” button to close the notification. Below the password section, there is a link to “See password history.” The bottom of the image shows navigation icons for “Vault,” “Generator,” “Alerts,” and “Account.” The “Generator” icon is highlighted in red.]
(https:// amzn-s3-demo-bucket.s3.amazonaws.com/kb/ABC/file/attachments/12133171243_image-20230818-114454.png)
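
The following is a minimal sketch of the caption-with-cache step described above. It assumes a DynamoDB table keyed on the image's S3 location and uses the Bedrock Converse API; the table name, model ID, and prompt wording are illustrative rather than PDIQ's actual values.

import boto3

dynamodb = boto3.resource("dynamodb")
bedrock = boto3.client("bedrock-runtime")
caption_table = dynamodb.Table("pdiq-image-captions")    # hypothetical table keyed on s3_key

CAPTION_PROMPT = ("You are a professional image captioning assistant. "
                  "Describe the visible elements of the image clearly and factually.")

def caption_image(s3_key, image_bytes, model_id="amazon.nova-lite-v1:0"):
    """Return a cached caption when the image was seen before; otherwise call the model once."""
    cached = caption_table.get_item(Key={"s3_key": s3_key}).get("Item")
    if cached:
        return cached["caption"]
    response = bedrock.converse(
        modelId=model_id,    # illustrative model ID
        messages=[{
            "role": "user",
            "content": [
                {"text": CAPTION_PROMPT},
                {"image": {"format": "png", "source": {"bytes": image_bytes}}},
            ],
        }],
    )
    caption = response["output"]["message"]["content"][0]["text"]
    caption_table.put_item(Item={"s3_key": s3_key, "caption": caption})
    return caption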

Now that markdown files are injected with image captions, the next step is to break the original document into chunks that fit into the context window of the embeddings model. PDIQ uses Amazon Titan Text Embeddings V2 model to generate vectors and stores them in Aurora PostgreSQL-Compatible Serverless. Based on internal accuracy testing and chunking best practices from AWS, PDIQ performs chunking as follows:

70% of the tokens for content
10% overlap between chunks
20% for summary tokens

Using the document chunking logic from the previous step, the document is converted into vector embeddings. The process includes:

Calculate chunk parameters – Determine the size and total number of chunks required for the document based on the 70% calculation.
Generate document summary – Use Amazon Nova Lite to create a summary of the entire document, constrained by the 20% token allocation. This summary is reused across all chunks to provide consistent context.
Chunk and prepend summary – Split the document into overlapping chunks (10% overlap), with the summary prepended to each chunk.
Generate embeddings – Use Amazon Titan Text Embeddings V2 to generate vector embeddings for each chunk (summary plus content), which is then stored in the vector store.

By designing a customized approach that places a summary section atop every chunk, PDIQ ensures that when a particular chunk is matched by similarity search, the LLM has access to the summary of the entire document and not only the chunk that matched. This approach enriches the end-user experience, increasing the accuracy approval rate from 60% to 79%.
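
Under stated assumptions (tokens approximated by a simple word count, and illustrative Bedrock model IDs), the chunk-and-embed flow described above might look roughly like the following sketch:

import json
import boto3

bedrock = boto3.client("bedrock-runtime")

MAX_TOKENS = 8192                                   # approximate embedding-model budget
CONTENT_SHARE, OVERLAP_SHARE, SUMMARY_SHARE = 0.7, 0.1, 0.2

def summarize(document):
    """One document-level summary, reused as a prefix for every chunk."""
    resp = bedrock.converse(
        modelId="amazon.nova-lite-v1:0",            # illustrative model ID
        messages=[{"role": "user",
                   "content": [{"text": f"Summarize this document concisely:\n\n{document}"}]}],
        inferenceConfig={"maxTokens": int(MAX_TOKENS * SUMMARY_SHARE)},
    )
    return resp["output"]["message"]["content"][0]["text"]

def chunk(document):
    """Approximate tokens with words: 70% content per chunk, 10% overlap between chunks."""
    words = document.split()
    size = int(MAX_TOKENS * CONTENT_SHARE)
    step = size - int(MAX_TOKENS * OVERLAP_SHARE)
    for i in range(0, len(words), step):
        yield " ".join(words[i:i + size])

def embed(text):
    """Titan Text Embeddings V2 via InvokeModel; returns a float vector."""
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": text, "dimensions": 1024, "normalize": True}),
    )
    return json.loads(resp["body"].read())["embedding"]

def process(document):
    summary = summarize(document)
    for piece in chunk(document):
        text = summary + "\n\n" + piece             # summary prepended to every chunk
        yield text, embed(text)                     # (original text, vector) for the vector store
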
The following is an example of a summarization prompt:

You are a specialized document summarization assistant with expertise in business and technical content.

Your task is to create concise, information-rich summaries that:
Preserve all quantifiable data (numbers, percentages, metrics, dates, financial figures)
Highlight key business terminology and domain-specific concepts
Extract important entities (people, organizations, products, locations)
Identify critical relationships between concepts
Maintain factual accuracy without adding interpretations
Focus on extracting information that would be most valuable for:
Answering specific business questions
Supporting data-driven decision making
Enabling precise information retrieval in a RAG system
The summary should be comprehensive yet concise, prioritizing specific facts over general descriptions.
Include any tables, lists, or structured data in a format that preserves their relationships.
Ensure all technical terms, acronyms, and specialized vocabulary are preserved exactly as written.

The following is an example of summary text, available on each chunk:

### Summary: PLC User Creation Process and Password Reset
**Document Overview:**
This document provides instructions for creating new users and resetting passwords
**Key Instructions:**

{Shortened for Blog illustration}

This summary captures the essential steps, requirements, and entities involved in the PLC user creation and password reset process using Jenkins.

Chunk 1 has a summary at the top followed by details from the source:

{Summary Text from above}
This summary captures the essential steps, requirements, and entities involved in the PLC user creation and password reset process using Jenkins.

title: 2. PLC User Creation Process and Password Reset

![image-20230818-114454: The image displays a security tip notification on a computer screen. The notification is titled “Security tip” and advises the user to use generated passwords to keep their accounts safe. The suggested password, “2m5oFX#g&tLRMhN3,” is shown in a green box. Below the suggested password, there is a section labeled “Very Strong,” indicating the strength of the password. The password length is set to 16 characters, and it includes lowercase letters, uppercase letters, numbers, and symbols. There is also a “Dismiss” button to close the notification. Below the password section, there is a link to “See password history.” The bottom of the image shows navigation icons for “Vault,” “Generator,” “Alerts,” and “Account.” The “Generator” icon is highlighted in red.](https:// amzn-s3-demo-bucket.s3.amazonaws.com/kb/123/file/attachments/12133171243_image-20230818-114454.png)

Chunk 2 has a summary at the top, followed by continuation of details from the source:

{Summary Text from above}
This summary captures the essential steps, requirements, and entities involved in the PLC user creation and password reset process using Jenkins.

Maintains a menu with options such as

![image-20230904-061307: – The generated text has been blocked by our content filters.](https:// amzn-s3-demo-bucket.s3.amazonaws.com/kb/123/file/attachments/12133171243_image-20230904-061307.png)

PDIQ scans each document chunk and generates vector embeddings. This data is stored in an Aurora PostgreSQL database with key attributes, including a unique knowledge base ID, the corresponding embeddings attribute, the original text (summary plus chunk plus image caption), and a JSON binary object with metadata fields for extensibility. To keep the knowledge base in sync, PDI implements the following steps, sketched in the snippet after the list:

Add – These are net new source objects that should be ingested. PDIQ implements the document processing flow described previously.
Update – If PDIQ determines the same object is present, it compares the hash key value from the source with the hash value from the JSON object.
Delete – If PDIQ determines that a specific source document no longer exists, it triggers a delete operation on the S3 bucket (s3:ObjectRemoved:*), which results in a cleanup job, deleting the records corresponding to the key value in the Aurora table.
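
The following is a minimal sketch, not PDI's actual implementation, of how that add/update/skip decision could look when a content hash is kept in the metadata JSON column; the table and column names (kb_chunks, content_hash, s3_key) are illustrative assumptions.

import hashlib
import psycopg2  # conn below is a psycopg2 connection to the Aurora PostgreSQL vector store

def sync_decision(conn, kb_id, s3_key, content):
    """Decide whether a crawled object is new, changed, or unchanged."""
    content_hash = hashlib.sha256(content.encode("utf-8")).hexdigest()
    with conn.cursor() as cur:
        cur.execute(
            "SELECT metadata->>'content_hash' FROM kb_chunks "   # hypothetical table and columns
            "WHERE kb_id = %s AND metadata->>'s3_key' = %s LIMIT 1",
            (kb_id, s3_key),
        )
        row = cur.fetchone()
    if row is None:
        return "add"        # net-new object: run the full document processing flow
    if row[0] != content_hash:
        return "update"     # source changed: delete old chunks, then re-chunk and re-embed
    return "skip"           # unchanged: nothing to do

def delete_document(conn, kb_id, s3_key):
    """Cleanup triggered by s3:ObjectRemoved:* events."""
    with conn.cursor() as cur:
        cur.execute(
            "DELETE FROM kb_chunks WHERE kb_id = %s AND metadata->>'s3_key' = %s",
            (kb_id, s3_key),
        )
    conn.commit()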

PDI retrieves the most relevant documents and uses Amazon Nova Pro to generate a response by following these key steps, sketched in the snippet after the list:

Using similarity search, retrieves the most relevant document chunks, which include summary, chunk data, image caption, and image link.
For the matching chunk, retrieve the entire document.
LLM then replaces the image link with the actual image from Amazon S3.
LLM generates a response based on the data retrieved and the preconfigured system prompt.
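
The following is a condensed sketch of those retrieval-and-generation steps, assuming a pgvector column for the embeddings and the same illustrative table name as before; the model IDs are shown for illustration and are not pulled from PDIQ's configuration.

import json
import boto3

bedrock = boto3.client("bedrock-runtime")

def embed_query(question):
    """Embed the user question with Titan Text Embeddings V2."""
    resp = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": question, "dimensions": 1024, "normalize": True}),
    )
    return json.loads(resp["body"].read())["embedding"]

def answer(conn, kb_id, question, system_prompt, k=5):
    """Retrieve the top-k chunks by cosine distance, then generate a grounded response."""
    vector_literal = "[" + ",".join(str(x) for x in embed_query(question)) + "]"
    with conn.cursor() as cur:     # conn: psycopg2 connection to the Aurora PostgreSQL vector store
        cur.execute(
            "SELECT chunk_text FROM kb_chunks "                   # hypothetical table and columns
            "WHERE kb_id = %s ORDER BY embedding <=> %s::vector LIMIT %s",
            (kb_id, vector_literal, k),
        )
        context = "\n\n---\n\n".join(row[0] for row in cur.fetchall())
    resp = bedrock.converse(
        modelId="amazon.nova-pro-v1:0",                           # illustrative model ID
        system=[{"text": system_prompt}],
        messages=[{"role": "user",
                   "content": [{"text": f"Context:\n{context}\n\nQuestion: {question}"}]}],
    )
    return resp["output"]["message"]["content"][0]["text"]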

The following is a snippet of system prompt:

Support assistant specializing in PDI’s Logistics(PLC) platform, helping staff research and resolve support cases in Salesforce. You will assist with finding solutions, summarizing case information, and recommending appropriate next steps for resolution.

Professional, clear, technical when needed while maintaining accessible language.

Resolution Process:
Response Format template:
Handle Confidential Information:

Outcomes and next steps
By building this customized RAG solution on AWS, PDI realized the following benefits:

Flexible configuration options allow data ingestion at consumer-preferred frequencies.
Scalable design enables future ingestion from additional source systems through easily configurable crawlers.
Supports crawler configuration using multiple authentication methods, including username and password, secret key-value pairs, and API keys.
Customizable metadata fields enable advanced filtering and improve query performance.
Dynamic token management helps PDI intelligently balance tokens between content and summaries, enhancing user responses.
Consolidates diverse source data formats into a unified layout for streamlined storage and retrieval.

PDIQ provides key business outcomes that include:

Improved efficiency and resolution rates – The tool empowers PDI support teams to resolve customer queries significantly faster, often automating routine issues and providing immediate, precise responses. This has led to shorter customer wait times for case resolution and more productive agents.
High customer satisfaction and loyalty – By delivering accurate, relevant, and personalized answers grounded in live documentation and company knowledge, PDIQ increased customer satisfaction scores (CSAT), net promoter scores (NPS), and overall loyalty. Customers feel heard and supported, strengthening PDI brand relationships.
Cost reduction – PDIQ handles the bulk of repetitive queries, allowing limited support staff to focus on expert-level cases, which improves productivity and morale. Additionally, PDIQ is built on serverless architecture, which automatically scales while minimizing operational overhead and cost.
Business flexibility – A single platform can serve different business units, who can curate the content by configuring their respective data sources.
Incremental value – Each new content source adds measurable value without system redesign.

PDI continues to enhance the application with several planned improvements in the pipeline, including:

Build additional crawler configuration for new data sources (for example, GitHub).
Build agentic implementation for PDIQ to be integrated into larger complex business processes.
Enhanced document understanding with table extraction and structure preservation.
Multilingual support for global operations.
Improved relevance ranking with hybrid retrieval techniques.
Ability to invoke PDIQ based on events (for example, source commits).

Conclusion
The PDIQ service has transformed how users access and use enterprise knowledge at PDI Technologies. By using AWS serverless services, PDIQ can automatically scale with demand, reduce operational overhead, and optimize costs. The solution’s unique approach to document processing, including dynamic token management and the custom image captioning system, represents significant technical innovation in enterprise RAG systems. The architecture successfully balances performance, cost, and scalability while maintaining security and authentication requirements. As PDI Technologies continues to expand PDIQ’s capabilities, they’re excited to see how this architecture can adapt to new sources, formats, and use cases.

About the authors
Samit Kumbhani is an Amazon Web Services (AWS) Senior Solutions Architect in the New York City area with over 18 years of experience. He currently partners with independent software vendors (ISVs) to build highly scalable, innovative, and secure cloud solutions. Outside of work, Samit enjoys playing cricket, traveling, and biking.
Jhorlin De Armas is an Architect II at PDI Technologies, where he leads the design of AI-driven platforms on Amazon Web Services (AWS). Since joining PDI in 2024, he has architected a compositional AI service that enables configurable assistants, agents, knowledge bases, and guardrails using Amazon Bedrock, Aurora Serverless, AWS Lambda, and DynamoDB. With over 18 years of experience building enterprise software, Jhorlin specializes in cloud-centered architectures, serverless platforms, and AI/ML solutions.
David Mbonu is a Sr. Solutions Architect at Amazon Web Services (AWS), helping horizontal business application ISV customers build and deploy transformational solutions on AWS. David has over 27 years of experience in enterprise solutions architecture and system engineering across software, FinTech, and public cloud companies. His recent interests include AI/ML, data strategy, observability, resiliency, and security. David and his family reside in Sugar Hill, GA.