Moonshot AI Releases Kimi K2.5: An Open Source Visual Agentic Intelligence Model with Native Swarm Execution

Moonshot AI has released Kimi K2.5 as an open source visual agentic intelligence model. It combines a large Mixture of Experts language backbone, a native vision encoder, and a parallel multi agent system called Agent Swarm. The model targets coding, multimodal reasoning, and deep web research with strong benchmark results on agentic, vision, and coding suites.

Model Architecture and Training

Kimi K2.5 is a Mixture of Experts model with 1T total parameters and about 32B activated parameters per token. The network has 61 layers. It uses 384 experts, with 8 experts selected per token plus 1 shared expert. The attention hidden size is 7168 and there are 64 attention heads.

The model uses MLA attention and the SwiGLU activation function. The tokenizer vocabulary size is 160K. The maximum context length during training and inference is 256K tokens. This supports long tool traces, long documents, and multi step research workflows.
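To make the routing concrete, here is a generic top-k Mixture of Experts routing sketch in Python. It illustrates the general technique only, not Kimi K2.5's actual routing code; the hidden size and expert count are taken from the figures above, and the shared expert is noted in a comment.

# Illustrative top-k MoE routing with a shared expert (a generic sketch, not
# Kimi K2.5's actual implementation). With 384 routed experts and top-8
# selection plus 1 shared expert, each token activates only a small slice
# of the total parameters.
import numpy as np

def route_token(hidden: np.ndarray, router_w: np.ndarray, top_k: int = 8):
    """Return indices and normalized gate weights of the experts chosen for one token."""
    logits = hidden @ router_w                      # shape: [num_experts]
    top_idx = np.argsort(logits)[-top_k:]           # highest-scoring experts
    gates = np.exp(logits[top_idx] - logits[top_idx].max())
    gates /= gates.sum()                            # softmax over the selected experts
    return top_idx, gates

rng = np.random.default_rng(0)
hidden_size, num_experts = 7168, 384
h = rng.standard_normal(hidden_size)
w = rng.standard_normal((hidden_size, num_experts))

experts, weights = route_token(h, w)
print("routed experts:", experts)   # 8 of 384; a shared expert would always be added
print("gate weight sum:", weights.sum())  # ~1.0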

Vision is handled by a MoonViT encoder with about 400M parameters. Visual tokens are trained together with text tokens in a single multimodal backbone. Kimi K2.5 is obtained by continual pretraining on about 15T tokens of mixed vision and text data on top of Kimi K2 Base. This native multimodal training is important because the model learns joint structure over images, documents, and language from the start.

The released checkpoints support standard inference stacks such as vLLM, SGLang, and KTransformers with transformers version 4.57.1 or newer. Quantized INT4 variants are available, reusing the method from Kimi K2 Thinking. This allows deployment on commodity GPUs with lower memory budgets.
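As a minimal serving sketch, the snippet below shows how such a checkpoint would typically be loaded with vLLM's offline inference API. The repository name, tensor parallel degree, and context length here are illustrative assumptions, not values confirmed by Moonshot; consult the model card for the real identifiers and hardware requirements.

# Minimal vLLM offline-inference sketch (model ID, tensor-parallel degree,
# and context length are assumptions, not confirmed by Moonshot).
from vllm import LLM, SamplingParams

llm = LLM(
    model="moonshotai/Kimi-K2.5",   # hypothetical repository name
    trust_remote_code=True,          # custom MLA/MoE modules typically require this
    tensor_parallel_size=8,          # adjust to your GPU count
    max_model_len=262144,            # 256K-token context claimed by the release
)

params = SamplingParams(temperature=0.6, max_tokens=1024)
outputs = llm.generate(["Summarize the trade-offs of MoE routing in one paragraph."], params)
print(outputs[0].outputs[0].text)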

Coding and Multimodal Capabilities

Kimi K2.5 is positioned as a strong open source coding model, especially when code generation depends on visual context. The model can read UI mockups, design screenshots, or even videos, then emit structured frontend code with layout, styling, and interaction logic.

Moonshot shows examples where the model reads a puzzle image, reasons about the shortest path, and then writes code that produces a visualized solution. This demonstrates cross modal reasoning, where the model combines image understanding, algorithmic planning, and code synthesis in a single flow.

Because K2.5 has a 256K context window, it can keep long specification histories in context. A practical workflow for developers is to mix design assets, product docs, and existing code in one prompt. The model can then refactor or extend the codebase while keeping visual constraints aligned with the original design.
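A hedged sketch of that workflow with an OpenAI-compatible client is shown below. The endpoint URL, model identifier, and image URL are placeholders, not documented Kimi values; only the message structure (text plus image content blocks) follows the standard OpenAI-style vision format.

# Sketch of mixing a design screenshot with a written spec in one request,
# via an OpenAI-compatible client. Endpoint, model name, and image URL are
# placeholders; check the provider's documentation for real values.
from openai import OpenAI

client = OpenAI(base_url="https://api.example.com/v1", api_key="YOUR_KEY")

response = client.chat.completions.create(
    model="kimi-k2.5",  # hypothetical model identifier
    messages=[{
        "role": "user",
        "content": [
            {"type": "text",
             "text": "Implement this screen as a React component. Keep the spacing "
                     "and color palette from the mockup; props are listed in the spec below.\n\n"
                     "SPEC:\n- title: string\n- onSubmit: callback"},
            {"type": "image_url",
             "image_url": {"url": "https://example.com/mockup.png"}},
        ],
    }],
)
print(response.choices[0].message.content)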

Source: https://www.kimi.com/blog/kimi-k2-5.html

Agent Swarm and Parallel Agent Reinforcement Learning

A key feature of Kimi K2.5 is Agent Swarm, a multi agent system trained with Parallel Agent Reinforcement Learning (PARL). In this setup, an orchestrator agent decomposes a complex goal into many subtasks and then spins up domain specific sub agents to work on them in parallel.

The Kimi team reports that K2.5 can manage up to 100 sub agents within a task and supports up to 1,500 coordinated steps or tool calls in one run. This parallelism gives about 4.5 times faster completion compared with a single agent pipeline on wide search tasks.

PARL introduces a metric called Critical Steps. The system rewards policies that reduce the number of serial steps needed to solve the task. This discourages naive sequential planning and pushes the agent to split work into parallel branches while still maintaining consistency.
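Moonshot has not published the PARL reward in detail, but the intuition can be sketched. The snippet below computes the serial depth (longest chain of dependent steps) of a made-up task plan, which is the kind of quantity a Critical Steps style reward would penalize: two plans can contain the same number of tool calls, yet the one with the shallower dependency graph finishes in fewer serial rounds.

# Illustrative only: not Moonshot's reward implementation.
from functools import lru_cache

# subtask -> list of subtasks it depends on (a hypothetical research plan)
deps = {
    "collect_sources_a": [],
    "collect_sources_b": [],
    "collect_sources_c": [],
    "summarize_a": ["collect_sources_a"],
    "summarize_b": ["collect_sources_b"],
    "summarize_c": ["collect_sources_c"],
    "merge_table": ["summarize_a", "summarize_b", "summarize_c"],
}

@lru_cache(maxsize=None)
def serial_depth(task: str) -> int:
    """Length of the longest chain of dependent steps ending at `task`."""
    parents = deps[task]
    return 1 + (max(serial_depth(p) for p in parents) if parents else 0)

total_steps = len(deps)
critical_steps = max(serial_depth(t) for t in deps)
print(f"total steps: {total_steps}, critical (serial) steps: {critical_steps}")
# A purely sequential agent would need 7 rounds; this parallel plan needs only 3.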

One example by the Kimi team is a research workflow where the system needs to discover many niche creators. The orchestrator uses Agent Swarm to spawn a large number of researcher agents. Each agent explores different regions of the web, and the system merges results into a structured table.


Benchmark Performance

On agentic benchmarks, Kimi K2.5 reports strong numbers. On HLE Full with tools the score is 50.2. On BrowseComp with context management the score is 74.9. In Agent Swarm mode the BrowseComp score increases further to 78.4 and WideSearch metrics also improve. The Kimi team compares these values with GPT 5.2, Claude 4.5, Gemini 3 Pro, and DeepSeek V3, and K2.5 shows the highest scores among the listed models on these specific agentic suites.

On vision and video benchmarks K2.5 also reports high scores. MMMU Pro is 78.5 and VideoMMMU is 86.6. The model performs well on OmniDocBench, OCRBench, WorldVQA, and other document and scene understanding tasks. These results indicate that the MoonViT encoder and long context training are effective for real world multimodal problems, such as reading complex documents and reasoning over videos.


For coding benchmarks, the Kimi team lists SWE Bench Verified at 76.8, SWE Bench Pro at 50.7, SWE Bench Multilingual at 73.0, Terminal Bench 2.0 at 50.8, and LiveCodeBench v6 at 85.0. These numbers place K2.5 among the strongest open source coding models currently reported on these tasks.

On long context language benchmarks, K2.5 reaches 61.0 on LongBench V2 and 70.0 on AA LCR under standard evaluation settings. For reasoning benchmarks it achieves high scores on AIME 2025, HMMT 2025 February, GPQA Diamond, and MMLU Pro when used in thinking mode.

Key Takeaways

Mixture of Experts at trillion scale: Kimi K2.5 uses a Mixture of Experts architecture with 1T total parameters and about 32B active parameters per token, 61 layers, 384 experts, and 256K context length, optimized for long multimodal and tool heavy workflows.

Native multimodal training with MoonViT: The model integrates a MoonViT vision encoder of about 400M parameters and is trained on about 15T mixed vision and text tokens, so images, documents, and language are handled in a single unified backbone.

Parallel Agent Swarm with PARL: Agent Swarm, trained with Parallel Agent Reinforcement Learning, can coordinate up to 100 sub agents and about 1,500 tool calls per task, giving around 4.5 times faster execution versus a single agent on wide research tasks.

Strong benchmark results in coding, vision, and agents: K2.5 reports 76.8 on SWE Bench Verified, 78.5 on MMMU Pro, 86.6 on VideoMMMU, 50.2 on HLE Full with tools, and 74.9 on BrowseComp, matching or exceeding listed closed models on several agentic and multimodal suites.

Check out the technical details and model weights.

DSGym Offers a Reusable Container Based Substrate for Building and Benchmarking Data Science Agents

Data science agents should inspect datasets, design workflows, run code, and return verifiable answers, not just autocomplete Pandas code. DSGym, introduced by researchers from Stanford University, Together AI, Duke University, and Harvard University, is a framework that evaluates and trains such agents across more than 1,000 data science challenges with expert curated ground truth and a consistent post training pipeline.

https://arxiv.org/pdf/2601.16344

Why do existing benchmarks fall short?

The research team first probes existing benchmarks that claim to test data aware agents. When data files are hidden, models still retain high accuracy. On QRData the average drop is 40.5 percent, on DAEval it is 86.8 percent, and on DiscoveryBench it is 44.4 percent. Many questions are solvable using priors and pattern matching on the text alone instead of genuine data analysis, and the team also finds annotation errors and inconsistent numerical tolerances.

Task, Agent, and Environment

DSGym standardizes evaluation into three objects, Task, Agent, and Environment. Tasks are either Data Analysis or Data Prediction. Data Analysis tasks provide one or more files along with a natural language question that must be answered through code. Data Prediction tasks provide train and test splits along with an explicit metric and require the agent to build a modeling pipeline and output predictions.

Each task is packed into a Task Object that holds the data files, query prompt, scoring function, and metadata. Agents interact through a CodeAct style loop. At each turn, the agent writes a reasoning block that describes its plan, a code block that runs inside the environment, and an answer block when it is ready to commit. The Environment is implemented as a manager and worker cluster of Docker containers, where each worker mounts data as read only volumes, exposes a writable workspace, and ships with domain specific Python libraries.
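The paper's exact interfaces are not reproduced in this article, so the following is a minimal sketch of how a Task Object and one CodeAct-style episode could fit together. Field names, the agent_step callable, and the execute sandbox function are assumptions for illustration, not DSGym's real API.

# Minimal sketch of DSGym's Task/Agent/Environment split (names are assumed).
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class TaskObject:
    task_id: str
    query: str                         # natural-language question or prediction goal
    data_files: List[str]              # mounted read-only inside the worker container
    scorer: Callable[[str], float]     # compares the committed answer to ground truth
    metadata: Dict = field(default_factory=dict)


@dataclass
class AgentTurn:
    reasoning: str    # plan written by the agent
    code: str         # executed inside the containerized environment
    answer: str = ""  # non-empty only when the agent commits a final answer


def run_episode(task: TaskObject, agent_step, execute, max_turns: int = 10) -> float:
    """Drive a CodeAct-style loop: reason, run code, observe, eventually answer."""
    observations: List[str] = [task.query]
    for _ in range(max_turns):
        turn: AgentTurn = agent_step(observations)    # model call (not shown here)
        if turn.answer:                               # agent commits an answer
            return task.scorer(turn.answer)
        observations.append(execute(turn.code))       # stdout/stderr from the sandbox
    return 0.0                                        # ran out of turns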

DSGym Tasks, DSBio, and DSPredict

On top of this runtime, DSGym Tasks aggregates and refines existing datasets and adds new ones. The research team cleans QRData, DAEval, DABStep, MLEBench Lite, and others by dropping unscorable items and applying a shortcut filter that removes questions solved easily by multiple models without data access.
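A hedged sketch of that shortcut filter is shown below; the threshold, model list, and the answer_without_data callable are illustrative placeholders rather than the paper's exact settings.

# Sketch: drop questions that several models answer correctly WITHOUT the data
# files, since such items measure priors rather than genuine data analysis.
from typing import Callable, Dict, List


def shortcut_filter(
    questions: List[Dict],
    answer_without_data: Callable[[str, Dict], str],  # (model_name, question) -> answer
    models: List[str],
    max_blind_solvers: int = 1,
) -> List[Dict]:
    kept = []
    for q in questions:
        blind_correct = sum(
            1 for m in models
            if answer_without_data(m, q).strip() == str(q["ground_truth"]).strip()
        )
        if blind_correct <= max_blind_solvers:   # data access is actually needed
            kept.append(q)
    return kept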

To cover scientific discovery, they introduce DSBio, a suite of 90 bioinformatics tasks derived from peer reviewed papers and open source datasets. Tasks cover single cell analysis, spatial and multi-omics, and human genetics, with deterministic numerical or categorical answers supported by expert reference notebooks.

DSPredict targets modeling on real Kaggle competitions. A crawler collects recent competitions that accept CSV submissions and satisfy size and clarity rules. After preprocessing, the suite is split into DSPredict Easy with 38 playground style and introductory competitions, and DSPredict Hard with 54 high complexity challenges. In total, DSGym Tasks includes 972 data analysis tasks and 114 prediction tasks.

What current agents can and cannot do

The evaluation covers closed source models such as GPT-5.1, GPT-5, and GPT-4o, open weights models such as Qwen3-Coder-480B, Qwen3-235B-Instruct, and GPT-OSS-120B, and smaller models such as Qwen2.5-7B-Instruct and Qwen3-4B-Instruct. All are run with the same CodeAct agent, temperature 0, and tools disabled.

On cleaned general analysis benchmarks, such as QRData Verified, DAEval Verified, and the easier split of DABStep, top models reach between 60 percent and 90 percent exact match accuracy. On DABStep Hard, accuracy drops for every model, which shows that multi step quantitative reasoning over financial tables is still brittle.

DSBio exposes a more severe weakness. Kimi-K2-Instruct achieves the best overall accuracy of 43.33 percent. For all models, between 85 and 96 percent of inspected failures on DSBio are domain grounding errors, including misuse of specialized libraries and incorrect biological interpretations, rather than basic coding mistakes.

On MLEBench Lite and DSPredict Easy, most frontier models achieve high Valid Submission Rates above 80 percent. On DSPredict Hard, valid submissions rarely exceed 70 percent and medal rates on Kaggle leaderboards are near 0 percent. This pattern supports the research team's observation of a simplicity bias, where agents stop after a baseline solution instead of exploring more competitive models and hyperparameters.

DSGym as a data factory and training ground

The same environment can also synthesize training data. Starting from a subset of QRData and DABStep, the research team asks agents to explore datasets, propose questions, solve them with code, and record trajectories, which yields 3,700 synthetic queries. A judge model filters these down to 2,000 high quality query plus trajectory pairs, called DSGym-SFT. Fine-tuning a 4B Qwen3 based model on DSGym-SFT produces an agent that reaches performance competitive with GPT-4o on standardized analysis benchmarks despite having far fewer parameters.


Key Takeaways

DSGym provides a unified Task, Agent, and Environment framework, with containerized execution and a CodeAct style loop, to evaluate data science agents on real code based workflows instead of static prompts.

The benchmark suite, DSGym-Tasks, consolidates and cleans prior datasets and adds DSBio and DSPredict, reaching 972 data analysis tasks and 114 prediction tasks across domains such as finance, bioinformatics, and earth science.

Shortcut analysis on existing benchmarks shows that removing data access only moderately reduces accuracy in many cases, which confirms that prior evaluations often measure pattern matching on text rather than genuine data analysis.

Frontier models achieve strong performance on cleaned general analysis tasks and on easier prediction tasks, but they perform poorly on DSBio and DSPredict-Hard, where most errors come from domain grounding issues and conservative, under tuned modeling pipelines.

The DSGym-SFT dataset, built from 2,000 filtered synthetic trajectories, enables a 4B Qwen3 based agent to approach GPT-4o level accuracy on several analysis benchmarks, which shows that execution grounded supervision on structured tasks is an effective way to improve data science agents.

Check out the Paper and Repo.

How Tree-KG Enables Hierarchical Knowledge Graphs for Contextual Navigation and Explainable Multi-Hop Reasoning Beyond Traditional RAG

In this tutorial, we implement Tree-KG, an advanced hierarchical knowledge graph system that goes beyond traditional retrieval-augmented generation by combining semantic embeddings with explicit graph structure. We show how we can organize knowledge in a tree-like hierarchy that mirrors how humans learn, from broad domains to fine-grained concepts, and then reason across this structure using controlled multi-hop exploration. By building the graph from scratch, enriching nodes with embeddings, and designing a reasoning agent that navigates ancestors, descendants, and related concepts, we demonstrate how we can achieve contextual navigation and explainable reasoning rather than flat, chunk-based retrieval. Check out the FULL CODES here.

!pip install networkx matplotlib anthropic sentence-transformers scikit-learn numpy

import networkx as nx
import matplotlib.pyplot as plt
from typing import List, Dict, Tuple, Optional, Set
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
from collections import defaultdict, deque
import json

We install and import all the core libraries required to build and reason over the Tree-KG system. We set up tools for graph construction and visualization, semantic embedding and similarity search, and efficient data handling for traversal and scoring. Check out the FULL CODES here.

class TreeKnowledgeGraph:
    """
    Hierarchical Knowledge Graph that mimics human learning patterns.
    Supports multi-hop reasoning and contextual navigation.
    """

    def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2'):
        self.graph = nx.DiGraph()
        self.embedder = SentenceTransformer(embedding_model)
        self.node_embeddings = {}
        self.node_metadata = {}

    def add_node(self,
                 node_id: str,
                 content: str,
                 node_type: str = 'concept',
                 metadata: Optional[Dict] = None):
        """Add a node with semantic embedding and metadata."""

        embedding = self.embedder.encode(content, convert_to_tensor=False)

        self.graph.add_node(node_id,
                            content=content,
                            node_type=node_type,
                            metadata=metadata or {})

        self.node_embeddings[node_id] = embedding
        self.node_metadata[node_id] = {
            'content': content,
            'type': node_type,
            'metadata': metadata or {}
        }

    def add_edge(self,
                 parent: str,
                 child: str,
                 relationship: str = 'contains',
                 weight: float = 1.0):
        """Add hierarchical or associative edge between nodes."""
        self.graph.add_edge(parent, child,
                            relationship=relationship,
                            weight=weight)

    def get_ancestors(self, node_id: str, max_depth: int = 5) -> List[str]:
        """Get all ancestor nodes (hierarchical context)."""
        ancestors = []
        current = node_id
        depth = 0

        while depth < max_depth:
            predecessors = list(self.graph.predecessors(current))
            if not predecessors:
                break
            current = predecessors[0]
            ancestors.append(current)
            depth += 1

        return ancestors

    def get_descendants(self, node_id: str, max_depth: int = 2) -> List[str]:
        """Get all descendant nodes."""
        descendants = []
        queue = deque([(node_id, 0)])
        visited = {node_id}

        while queue:
            current, depth = queue.popleft()
            if depth >= max_depth:
                continue

            for child in self.graph.successors(current):
                if child not in visited:
                    visited.add(child)
                    descendants.append(child)
                    queue.append((child, depth + 1))

        return descendants

    def semantic_search(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """Find most semantically similar nodes to query."""
        query_embedding = self.embedder.encode(query, convert_to_tensor=False)

        similarities = []
        for node_id, embedding in self.node_embeddings.items():
            sim = cosine_similarity(
                query_embedding.reshape(1, -1),
                embedding.reshape(1, -1)
            )[0][0]
            similarities.append((node_id, float(sim)))

        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def get_subgraph_context(self, node_id: str, depth: int = 2) -> Dict:
        """Get rich contextual information around a node."""
        context = {
            'node': self.node_metadata.get(node_id, {}),
            'ancestors': [],
            'descendants': [],
            'siblings': [],
            'related': []
        }

        ancestors = self.get_ancestors(node_id)
        context['ancestors'] = [
            self.node_metadata.get(a, {}) for a in ancestors
        ]

        descendants = self.get_descendants(node_id, depth)
        context['descendants'] = [
            self.node_metadata.get(d, {}) for d in descendants
        ]

        parents = list(self.graph.predecessors(node_id))
        if parents:
            siblings = list(self.graph.successors(parents[0]))
            siblings = [s for s in siblings if s != node_id]
            context['siblings'] = [
                self.node_metadata.get(s, {}) for s in siblings
            ]

        return context

We define the core TreeKnowledgeGraph class that structures knowledge as a directed hierarchy enriched with semantic embeddings. We store both graph relationships and dense representations to navigate concepts structurally while also performing similarity-based retrieval. Check out the FULL CODES here.

class MultiHopReasoningAgent:
    """
    Agent that performs intelligent multi-hop reasoning across the knowledge graph.
    """

    def __init__(self, kg: TreeKnowledgeGraph):
        self.kg = kg
        self.reasoning_history = []

    def reason(self,
               query: str,
               max_hops: int = 3,
               exploration_width: int = 3) -> Dict:
        """
        Perform multi-hop reasoning to answer a query.

        Strategy:
        1. Find initial relevant nodes (semantic search)
        2. Explore graph context around these nodes
        3. Perform breadth-first exploration with relevance scoring
        4. Aggregate information from multiple hops
        """

        reasoning_trace = {
            'query': query,
            'hops': [],
            'final_context': {},
            'reasoning_path': []
        }

        initial_nodes = self.kg.semantic_search(query, top_k=exploration_width)
        reasoning_trace['hops'].append({
            'hop_number': 0,
            'action': 'semantic_search',
            'nodes_found': initial_nodes
        })

        visited = set()
        current_frontier = [node_id for node_id, _ in initial_nodes]
        all_relevant_nodes = set(current_frontier)

        for hop in range(1, max_hops + 1):
            next_frontier = []
            hop_info = {
                'hop_number': hop,
                'explored_nodes': [],
                'new_discoveries': []
            }

            for node_id in current_frontier:
                if node_id in visited:
                    continue

                visited.add(node_id)

                context = self.kg.get_subgraph_context(node_id, depth=1)

                connected_nodes = []
                for ancestor in context['ancestors']:
                    if 'content' in ancestor:
                        connected_nodes.append(ancestor)

                for descendant in context['descendants']:
                    if 'content' in descendant:
                        connected_nodes.append(descendant)

                for sibling in context['siblings']:
                    if 'content' in sibling:
                        connected_nodes.append(sibling)

                relevant_connections = self._score_relevance(
                    query, connected_nodes, top_k=exploration_width
                )

                hop_info['explored_nodes'].append({
                    'node_id': node_id,
                    'content': self.kg.node_metadata[node_id]['content'][:100],
                    'connections_found': len(relevant_connections)
                })

                for conn_content, score in relevant_connections:
                    for nid, meta in self.kg.node_metadata.items():
                        if meta['content'] == conn_content and nid not in visited:
                            next_frontier.append(nid)
                            all_relevant_nodes.add(nid)
                            hop_info['new_discoveries'].append({
                                'node_id': nid,
                                'relevance_score': score
                            })
                            break

            reasoning_trace['hops'].append(hop_info)
            current_frontier = next_frontier

            if not current_frontier:
                break

        final_context = self._aggregate_context(query, all_relevant_nodes)
        reasoning_trace['final_context'] = final_context
        reasoning_trace['reasoning_path'] = list(all_relevant_nodes)

        self.reasoning_history.append(reasoning_trace)
        return reasoning_trace

    def _score_relevance(self,
                         query: str,
                         candidates: List[Dict],
                         top_k: int = 3) -> List[Tuple[str, float]]:
        """Score candidate nodes by relevance to query."""
        if not candidates:
            return []

        query_embedding = self.kg.embedder.encode(query)

        scores = []
        for candidate in candidates:
            content = candidate.get('content', '')
            if not content:
                continue

            candidate_embedding = self.kg.embedder.encode(content)
            similarity = cosine_similarity(
                query_embedding.reshape(1, -1),
                candidate_embedding.reshape(1, -1)
            )[0][0]
            scores.append((content, float(similarity)))

        scores.sort(key=lambda x: x[1], reverse=True)
        return scores[:top_k]

    def _aggregate_context(self, query: str, node_ids: Set[str]) -> Dict:
        """Aggregate and rank information from all discovered nodes."""

        aggregated = {
            'total_nodes': len(node_ids),
            'hierarchical_paths': [],
            'key_concepts': [],
            'synthesized_answer': []
        }

        for node_id in node_ids:
            ancestors = self.kg.get_ancestors(node_id)
            if ancestors:
                path = ancestors[::-1] + [node_id]
                path_contents = [
                    self.kg.node_metadata[n]['content']
                    for n in path if n in self.kg.node_metadata
                ]
                aggregated['hierarchical_paths'].append(path_contents)

        for node_id in node_ids:
            meta = self.kg.node_metadata.get(node_id, {})
            aggregated['key_concepts'].append({
                'id': node_id,
                'content': meta.get('content', ''),
                'type': meta.get('type', 'unknown')
            })

        for node_id in node_ids:
            content = self.kg.node_metadata.get(node_id, {}).get('content', '')
            if content:
                aggregated['synthesized_answer'].append(content)

        return aggregated

    def explain_reasoning(self, trace: Dict) -> str:
        """Generate human-readable explanation of reasoning process."""

        explanation = [f"Query: {trace['query']}\n"]
        explanation.append(f"Total hops performed: {len(trace['hops']) - 1}\n")
        explanation.append(f"Total relevant nodes discovered: {len(trace['reasoning_path'])}\n\n")

        for hop_info in trace['hops']:
            hop_num = hop_info['hop_number']
            explanation.append(f"--- Hop {hop_num} ---")

            if hop_num == 0:
                explanation.append("Action: Initial semantic search")
                explanation.append(f"Found {len(hop_info['nodes_found'])} candidate nodes")
                for node_id, score in hop_info['nodes_found'][:3]:
                    explanation.append(f"  - {node_id} (relevance: {score:.3f})")
            else:
                explanation.append(f"Explored {len(hop_info['explored_nodes'])} nodes")
                explanation.append(f"Discovered {len(hop_info['new_discoveries'])} new relevant nodes")

            explanation.append("")

        explanation.append("\n--- Final Aggregated Context ---")
        context = trace['final_context']
        explanation.append(f"Total concepts integrated: {context['total_nodes']}")
        explanation.append(f"Hierarchical paths found: {len(context['hierarchical_paths'])}")

        return "\n".join(explanation)

We implement a multi-hop reasoning agent that actively navigates the knowledge graph instead of passively retrieving nodes. We start from semantically relevant concepts, expand through ancestors, descendants, and siblings, and iteratively score connections to guide exploration across hops. By aggregating hierarchical paths and synthesizing content, we produce both an explainable reasoning trace and a coherent, context-rich answer. Check out the FULL CODES here.

def build_software_development_kb() -> TreeKnowledgeGraph:
    """Build a comprehensive software development knowledge graph."""

    kg = TreeKnowledgeGraph()

    kg.add_node('root', 'Software Development and Computer Science', 'domain')

    kg.add_node('programming',
                'Programming encompasses writing, testing, and maintaining code to create software applications',
                'domain')
    kg.add_node('architecture',
                'Software Architecture involves designing the high-level structure and components of software systems',
                'domain')
    # Note: the original 'devops' description was lost in extraction; this text is a reconstruction.
    kg.add_node('devops',
                'DevOps combines development and operations practices, covering automation, CI/CD pipelines, and infrastructure management',
                'domain')

    kg.add_edge('root', 'programming', 'contains')
    kg.add_edge('root', 'architecture', 'contains')
    kg.add_edge('root', 'devops', 'contains')

    # Note: the original 'python' and 'rust' descriptions were lost in extraction; these are reconstructions.
    kg.add_node('python',
                'Python is a versatile high-level language known for readability and a rich ecosystem for data science, web development, and automation',
                'language')
    kg.add_node('javascript',
                'JavaScript is a dynamic language primarily used for web development, enabling interactive client-side and server-side applications',
                'language')
    kg.add_node('rust',
                'Rust is a systems programming language focused on memory safety and performance without a garbage collector',
                'language')

    kg.add_edge('programming', 'python', 'includes')
    kg.add_edge('programming', 'javascript', 'includes')
    kg.add_edge('programming', 'rust', 'includes')

    kg.add_node('python_basics',
                'Python basics include variables, data types, control flow, functions, and object-oriented programming fundamentals',
                'concept')
    kg.add_node('python_performance',
                'Python Performance optimization involves techniques like profiling, caching, using C extensions, and leveraging async programming',
                'concept')
    kg.add_node('python_data',
                'Python for Data Science uses libraries like NumPy, Pandas, and Scikit-learn for data manipulation, analysis, and machine learning',
                'concept')

    kg.add_edge('python', 'python_basics', 'contains')
    kg.add_edge('python', 'python_performance', 'contains')
    kg.add_edge('python', 'python_data', 'contains')

    kg.add_node('async_io',
                'Asynchronous IO in Python allows non-blocking operations using async/await syntax with asyncio library for concurrent tasks',
                'technique')
    kg.add_node('multiprocessing',
                'Python Multiprocessing uses separate processes to bypass GIL, enabling true parallel execution for CPU-bound tasks',
                'technique')
    kg.add_node('cython',
                'Cython compiles Python to C for significant performance gains, especially in numerical computations and tight loops',
                'tool')
    kg.add_node('profiling',
                'Python Profiling identifies performance bottlenecks using tools like cProfile, line_profiler, and memory_profiler',
                'technique')

    kg.add_edge('python_performance', 'async_io', 'contains')
    kg.add_edge('python_performance', 'multiprocessing', 'contains')
    kg.add_edge('python_performance', 'cython', 'contains')
    kg.add_edge('python_performance', 'profiling', 'contains')

    kg.add_node('event_loop',
                'Event Loop is the core of asyncio that manages and schedules asynchronous tasks, handling callbacks and coroutines',
                'concept')
    kg.add_node('coroutines',
                'Coroutines are special functions defined with async def that can pause execution with await, enabling cooperative multitasking',
                'concept')
    kg.add_node('asyncio_patterns',
                'AsyncIO patterns include gather for concurrent execution, create_task for background tasks, and queues for producer-consumer',
                'pattern')

    kg.add_edge('async_io', 'event_loop', 'contains')
    kg.add_edge('async_io', 'coroutines', 'contains')
    kg.add_edge('async_io', 'asyncio_patterns', 'contains')

    kg.add_node('microservices',
                'Microservices architecture decomposes applications into small, independent services that communicate via APIs',
                'pattern')
    kg.add_edge('architecture', 'microservices', 'contains')
    kg.add_edge('async_io', 'microservices', 'related_to')

    kg.add_node('containers',
                'Containers package applications with dependencies into isolated units, ensuring consistency across environments',
                'technology')
    kg.add_edge('devops', 'containers', 'contains')
    kg.add_edge('microservices', 'containers', 'deployed_with')

    kg.add_node('numpy_optimization',
                'NumPy optimization uses vectorization and broadcasting to avoid Python loops, leveraging optimized C and Fortran libraries',
                'technique')
    kg.add_edge('python_data', 'numpy_optimization', 'contains')
    kg.add_edge('python_performance', 'numpy_optimization', 'related_to')

    return kg

We construct a rich, hierarchical software development knowledge base that progresses from high-level domains down to concrete techniques and tools. We explicitly encode parent–child and cross-domain relationships so that concepts such as Python performance, async I/O, and microservices are structurally connected rather than isolated. This setup allows us to simulate how knowledge is learned and revisited across layers, enabling meaningful multi-hop reasoning over real-world software topics. Check out the FULL CODES here.

def visualize_knowledge_graph(kg: TreeKnowledgeGraph,
                              highlight_nodes: Optional[List[str]] = None):
    """Visualize the knowledge graph structure."""

    plt.figure(figsize=(16, 12))

    pos = nx.spring_layout(kg.graph, k=2, iterations=50, seed=42)

    node_colors = []
    for node in kg.graph.nodes():
        if highlight_nodes and node in highlight_nodes:
            node_colors.append('yellow')
        else:
            node_type = kg.graph.nodes[node].get('node_type', 'concept')
            color_map = {
                'domain': 'lightblue',
                'language': 'lightgreen',
                'concept': 'lightcoral',
                'technique': 'lightyellow',
                'tool': 'lightpink',
                'pattern': 'lavender',
                'technology': 'peachpuff'
            }
            node_colors.append(color_map.get(node_type, 'lightgray'))

    nx.draw_networkx_nodes(kg.graph, pos,
                           node_color=node_colors,
                           node_size=2000,
                           alpha=0.9)

    nx.draw_networkx_edges(kg.graph, pos,
                           edge_color='gray',
                           arrows=True,
                           arrowsize=20,
                           alpha=0.6,
                           width=2)

    nx.draw_networkx_labels(kg.graph, pos,
                            font_size=8,
                            font_weight='bold')

    plt.title("Tree-KG: Hierarchical Knowledge Graph", fontsize=16, fontweight='bold')
    plt.axis('off')
    plt.tight_layout()
    plt.show()


def run_demo():
    """Run complete demonstration of Tree-KG system."""

    print("=" * 80)
    print("Tree-KG: Hierarchical Knowledge Graph Demo")
    print("=" * 80)
    print()

    print("Building knowledge graph...")
    kg = build_software_development_kb()
    print(f"✓ Created graph with {kg.graph.number_of_nodes()} nodes and {kg.graph.number_of_edges()} edges\n")

    print("Visualizing knowledge graph...")
    visualize_knowledge_graph(kg)

    agent = MultiHopReasoningAgent(kg)

    queries = [
        "How can I improve Python performance for IO-bound tasks?",
        "What are the best practices for async programming?",
        "How does microservices architecture relate to Python?"
    ]

    for i, query in enumerate(queries, 1):
        print(f"\n{'=' * 80}")
        print(f"QUERY {i}: {query}")
        print('=' * 80)

        trace = agent.reason(query, max_hops=3, exploration_width=3)

        explanation = agent.explain_reasoning(trace)
        print(explanation)

        print("\n--- Sample Hierarchical Paths ---")
        for j, path in enumerate(trace['final_context']['hierarchical_paths'][:3], 1):
            print(f"\nPath {j}:")
            for k, concept in enumerate(path):
                indent = "  " * k
                print(f"{indent}→ {concept[:80]}...")

        print("\n--- Synthesized Context ---")
        answer_parts = trace['final_context']['synthesized_answer'][:5]
        for part in answer_parts:
            print(f"• {part[:150]}...")

        print()

    print("\nVisualizing reasoning path for last query...")
    last_trace = agent.reasoning_history[-1]
    visualize_knowledge_graph(kg, highlight_nodes=last_trace['reasoning_path'])

    print("\n" + "=" * 80)
    print("Demo complete!")
    print("=" * 80)

We visualize the hierarchical structure of the knowledge graph using color and layout to distinguish domains, concepts, techniques, and tools, and optionally highlight the reasoning path. We then run an end-to-end demo in which we build the graph, execute multi-hop reasoning on realistic queries, and print both the reasoning trace and the synthesized context. It allows us to observe how the agent navigates the graph, surfaces hierarchical paths, and explains its conclusions in a transparent and interpretable manner. Check out the FULL CODES here.

class AdvancedTreeKG(TreeKnowledgeGraph):
    """Extended Tree-KG with advanced features."""

    def __init__(self, embedding_model: str = 'all-MiniLM-L6-v2'):
        super().__init__(embedding_model)
        self.node_importance = {}

    def compute_node_importance(self):
        """Compute importance scores using PageRank-like algorithm."""
        if self.graph.number_of_nodes() == 0:
            return

        pagerank = nx.pagerank(self.graph)
        betweenness = nx.betweenness_centrality(self.graph)

        for node in self.graph.nodes():
            self.node_importance[node] = {
                'pagerank': pagerank.get(node, 0),
                'betweenness': betweenness.get(node, 0),
                'combined': pagerank.get(node, 0) * 0.7 + betweenness.get(node, 0) * 0.3
            }

    def find_shortest_path_with_context(self,
                                        source: str,
                                        target: str) -> Dict:
        """Find shortest path and extract all context along the way."""
        try:
            path = nx.shortest_path(self.graph, source, target)

            context = {
                'path': path,
                'path_length': len(path) - 1,
                'nodes_detail': []
            }

            for node in path:
                detail = {
                    'id': node,
                    'content': self.node_metadata.get(node, {}).get('content', ''),
                    'importance': self.node_importance.get(node, {}).get('combined', 0)
                }
                context['nodes_detail'].append(detail)

            return context
        except nx.NetworkXNoPath:
            return {'path': [], 'error': 'No path exists'}
We extend the base Tree-KG with graph-level intelligence by computing node importance using centrality measures. We combine PageRank and betweenness scores to identify concepts that play a structurally critical role in connecting knowledge across the graph. It also allows us to retrieve shortest paths enriched with contextual and importance information, enabling more informed and explainable reasoning between any two concepts. Check out the FULL CODES here.

if __name__ == "__main__":
    run_demo()

    print("\n\n" + "=" * 80)
    print("ADVANCED FEATURES DEMO")
    print("=" * 80)

    print("\nBuilding advanced Tree-KG...")
    base_kg = build_software_development_kb()

    # Wrap the base graph in an AdvancedTreeKG so we can compute importance scores
    adv_kg_new = AdvancedTreeKG()
    adv_kg_new.graph = base_kg.graph
    adv_kg_new.node_embeddings = base_kg.node_embeddings
    adv_kg_new.node_metadata = base_kg.node_metadata

    print("Computing node importance scores...")
    adv_kg_new.compute_node_importance()

    print("\nTop 5 most important nodes:")
    sorted_nodes = sorted(
        adv_kg_new.node_importance.items(),
        key=lambda x: x[1]['combined'],
        reverse=True
    )[:5]

    for node, scores in sorted_nodes:
        content = adv_kg_new.node_metadata[node]['content'][:60]
        print(f"  {node}: {content}...")
        print(f"    Combined score: {scores['combined']:.4f}")

    print("\n✓ Tree-KG Tutorial Complete!")
    print("\nKey Takeaways:")
    print("1. Tree-KG enables contextual navigation vs simple chunk retrieval")
    print("2. Multi-hop reasoning discovers relevant information across graph structure")
    print("3. Hierarchical organization mirrors human learning patterns")
    print("4. Semantic search + graph traversal = powerful RAG alternative")
We execute the full Tree-KG demo and then showcase the advanced features to close the loop on the system’s capabilities. We compute node importance scores to surface the most influential concepts in the graph and inspect how structural centrality aligns with semantic relevance. 

In conclusion, we demonstrated how Tree-KG enables richer understanding by unifying semantic search, hierarchical context, and multi-hop reasoning within a single framework. We showed that, instead of merely retrieving isolated text fragments, we can traverse meaningful knowledge paths, aggregate insights across levels, and produce explanations that reflect how conclusions are formed. By extending the system with importance scoring and path-aware context extraction, we illustrated how Tree-KG can serve as a strong foundation for building intelligent agents, research assistants, or domain-specific reasoning systems that demand structure, transparency, and depth beyond conventional RAG approaches.

Check out the FULL CODES here.

Build reliable Agentic AI solution with Amazon Bedrock: Learn from Pushpay

This post was co-written with Saurabh Gupta and Todd Colby from Pushpay.
Pushpay is a market-leading digital giving and engagement platform designed to help churches and faith-based organizations drive community engagement, manage donations, and strengthen generosity fundraising processes efficiently. Pushpay’s church management system provides church administrators and ministry leaders with insight-driven reporting, donor development dashboards, and automation of financial workflows.
Using the power of generative AI, Pushpay developed an innovative agentic AI search feature built for the unique needs of ministries. The approach uses natural language processing so ministry staff can ask questions in plain English and generate real-time, actionable insights from their community data. The AI search feature addresses a critical challenge faced by ministry leaders: the need for quick access to community insights without requiring technical expertise. For example, ministry leaders can enter “show me people who are members in a group, but haven’t given this year” or “show me people who are not engaged in my church,” and use the results to take meaningful action to better support individuals in their community. Most community leaders are time-constrained and lack technical backgrounds; they can use this solution to obtain meaningful data about their congregations in seconds using natural language queries.
By empowering ministry staff with faster access to community insights, the AI search feature supports Pushpay’s mission to encourage generosity and connection between churches and their community members. Early adoption users report that this solution has shortened their time to insights from minutes to seconds. To achieve this result, the Pushpay team built the feature using agentic AI capabilities on Amazon Web Services (AWS) while implementing robust quality assurance measures and establishing a rapid iterative feedback loop for continuous improvements.
In this post, we walk you through Pushpay’s journey in building this solution and explore how Pushpay used Amazon Bedrock to create a custom generative AI evaluation framework for continuous quality assurance and establishing rapid iteration feedback loops on AWS.
Solution overview: AI-powered search architecture
The solution consists of several key components that work together to deliver an enhanced search experience. The following figure shows the solution architecture diagram and the overall workflow.

Figure 1: AI Search Solution Architecture

User interface layer: The solution begins with Pushpay users submitting natural language queries through the existing Pushpay application interface. By using natural language queries, church ministry staff can obtain data insights using AI capabilities without learning new tools or interfaces.
AI search agent: At the heart of the system lies the AI search agent, which consists of two key components:

System prompt: Contains the large language model (LLM) role definitions, instructions, and application descriptions that guide the agent’s behavior.
Dynamic prompt constructor (DPC): Automatically constructs additional customized system prompts based on user-specific information, such as church context, sample queries, and the application filter inventory. The DPC also uses semantic search to select only the relevant filters from hundreds of available application filters, improving response accuracy and user experience.

Amazon Bedrock advanced features: The solution uses the following managed Amazon Bedrock capabilities:

Prompt caching: Reduces latency and costs by caching frequently used system prompts.
LLM processing: Uses Claude Sonnet 4.5 to process prompts and generate JSON output required by the application to display the desired query results as insights to users.

Evaluation system: The evaluation system implements a closed-loop improvement solution where user interactions are instrumented, captured and evaluated offline. The evaluation results feed into a dashboard for product and engineering teams to analyze and drive iterative improvements to the AI search agent. During this process, the data science team collects a golden dataset and continuously curates this dataset based on the actual user queries coupled with validated responses.

The challenges of initial solution without evaluation
To create the AI search feature, Pushpay developed the first iteration of the AI search agent. The solution implements a single agent configured with a carefully tuned system prompt that includes the system role, instructions, and a description of how the user interface works, with detailed explanations of each filter tool and its sub-settings. The system prompt is cached using Amazon Bedrock prompt caching to reduce token cost and latency. The agent uses the system prompt to invoke an Amazon Bedrock LLM, which generates the JSON document that Pushpay's application uses to apply filters and present query results to users.
However, this first iteration quickly revealed some limitations. While it demonstrated a 60-70% success rate on basic business queries, the team reached an accuracy plateau, and evaluating the agent was a manual, tedious process. Tuning the system prompt beyond this accuracy threshold proved challenging given the diverse spectrum of user queries and the application's coverage of more than 100 distinct configurable filters. These issues were critical blockers on the team's path to production.
Figure 2: AI Search First Solution
Improving the solution by adding a custom generative AI evaluation framework
To address the challenges of measuring and improving agent accuracy, the team implemented a generative AI evaluation framework integrated into the existing architecture, shown in the following figure. This framework consists of four key components that work together to provide comprehensive performance insights and enable data-driven improvements.

Figure 3: Introducing the GenAI Evaluation Framework

The golden dataset: A curated golden dataset containing over 300 representative queries, each paired with its corresponding expected output, forms the foundation of automated evaluation. The product and data science teams carefully developed and validated this dataset to achieve comprehensive coverage of real-world use cases and edge cases. Additionally, there is a continuous curation process of adding representative actual user queries with validated results.
The evaluator: The evaluator component processes user input queries and compares the agent-generated output against the golden dataset using the LLM-as-a-judge pattern (a minimal sketch follows this list). This approach generates core accuracy metrics while capturing detailed logs and performance data, such as latency, for further analysis and debugging.
Domain category: Domain categories are developed using a combination of generative AI domain summarization and human-defined regular expressions to effectively categorize user queries. The evaluator determines the domain category for each query, enabling nuanced, category-based evaluation as an additional dimension of evaluation metrics.
Generative AI evaluation dashboard: The dashboard serves as the mission control for Pushpay’s product and engineering teams, displaying domain category-level metrics to assess performance and latency and guide decisions. It shifts the team from single aggregate scores to nuanced, domain-based performance insights.
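As a reference point, the sketch below shows an LLM-as-a-judge comparison against a golden answer using the Amazon Bedrock Converse API. The model ID, rubric wording, and output schema are illustrative choices, not Pushpay's actual prompts or parsing logic.

# Hedged sketch of an LLM-as-a-judge evaluator with the Bedrock Converse API.
import json
import boto3

bedrock = boto3.client("bedrock-runtime")

def judge(query: str, agent_output: str, golden_output: str) -> dict:
    rubric = (
        "You are grading a search agent. Compare CANDIDATE to GOLDEN for the QUERY. "
        'Return JSON: {"correct": true or false, "reason": "..."}.\n\n'
        f"QUERY: {query}\nGOLDEN: {golden_output}\nCANDIDATE: {agent_output}"
    )
    response = bedrock.converse(
        modelId="anthropic.claude-sonnet-4-5-20250929-v1:0",  # example ID; verify availability in your Region
        messages=[{"role": "user", "content": [{"text": rubric}]}],
        inferenceConfig={"maxTokens": 300, "temperature": 0},
    )
    return json.loads(response["output"]["message"]["content"][0]["text"])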

The accuracy dashboard: Pinpointing weaknesses by domain
Because user queries are categorized into domain categories, the dashboard incorporates statistical confidence visualization, using a 95% Wilson score interval to display accuracy metrics and query volumes at each domain level. By using categories, the team can pinpoint the AI agent's weaknesses by domain. In the following example, the "activity" domain shows significantly lower accuracy than other categories.
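For readers who want to reproduce the confidence bands, a minimal Wilson interval helper looks like the following; the per-domain counts in the example are made up.

# 95% Wilson score interval: keeps per-domain accuracy honest when a domain has few queries.
import math

def wilson_interval(correct: int, total: int, z: float = 1.96) -> tuple[float, float]:
    if total == 0:
        return (0.0, 0.0)
    p = correct / total
    denom = 1 + z**2 / total
    center = (p + z**2 / (2 * total)) / denom
    margin = (z * math.sqrt(p * (1 - p) / total + z**2 / (4 * total**2))) / denom
    return (center - margin, center + margin)

# A sparse "activity" domain with 18/30 correct yields a wide interval,
# while a well-covered domain with 180/200 correct yields a tight one.
print(wilson_interval(18, 30))    # roughly (0.42, 0.75)
print(wilson_interval(180, 200))  # roughly (0.85, 0.93)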

Figure 4: Pinpointing Agent Weaknesses by Domain
Additionally, a performance dashboard, shown in the following figure, visualizes latency indicators at the domain category level, including latency distributions from p50 to p90 percentiles. In the following example, the activity domain exhibits notably higher latency than others.

Figure 5: Identifying Latency Bottlenecks by Domain
Strategic rollout through domain-level insights
Domain-based metrics revealed varying performance levels across semantic domains, providing crucial insights into agent effectiveness. Pushpay used this granular visibility to make strategic feature rollout decisions. By temporarily suppressing underperforming categories, such as activity queries, while they were being optimized, the system achieved 95% overall accuracy. With this approach, users experienced only the highest-performing features while the team refined the others to production standards.

Figure 6: Achieving 95% Accuracy with Domain-Level Feature Rollout
Strategic prioritization: Focusing on high-impact domains
To prioritize improvements systematically, Pushpay employed a 2×2 matrix framework plotting topics against two dimensions (shown in the following figure): business priority (vertical axis) and current performance or feasibility (horizontal axis). This visualization placed topics with both high business value and strong existing performance in the top-right quadrant. The team focused on these areas first because they required the least effort to lift accuracy from already good levels to an exceptional 95% for the business-focused topics.
The implementation followed an iterative cycle: after each round of enhancements, the team re-analyzed the results to identify the next set of high-potential topics. This systematic, cyclical approach enabled continuous optimization while maintaining focus on business-critical areas.

Figure 7: Strategic Prioritization Framework for Domain Category Optimization
Dynamic prompt construction
The insights gained from the evaluation framework led to an architectural enhancement: the introduction of a dynamic prompt constructor. This component enabled rapid iterative improvements by allowing fine-grained control over which domain categories the agent could address. The structured field inventory, previously embedded in the system prompt, was transformed into a dynamic element that uses semantic search to construct contextually relevant prompts for each user query. This approach tailors the prompt filter inventory based on three key contextual dimensions: query content, user persona, and tenant-specific requirements. The result is a more precise and efficient system that generates highly relevant responses while maintaining the flexibility needed for continuous optimization.
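A simplified sketch of that idea is shown below. It uses an open source embedding model and a made-up filter inventory rather than Pushpay's implementation; the filter names, descriptions, and top-k value are illustrative.

# Hedged sketch of a dynamic prompt constructor: embed the filter inventory once,
# then select only the filters semantically relevant to the current query and persona.
from sentence_transformers import SentenceTransformer, util

embedder = SentenceTransformer("all-MiniLM-L6-v2")

filter_inventory = {
    "giving_last_gift_date": "Filter people by the date of their most recent gift",
    "group_membership": "Filter people by membership in a serving or small group",
    "attendance_recent": "Filter people by check-in or attendance in a date range",
    "age_range": "Filter people by age bracket",
}

filter_names = list(filter_inventory.keys())
filter_texts = list(filter_inventory.values())
filter_embeddings = embedder.encode(filter_texts, convert_to_tensor=True)

def build_filter_section(query: str, persona: str, top_k: int = 2) -> str:
    """Return the system-prompt section listing only the most relevant filters."""
    context_emb = embedder.encode(f"{persona}: {query}", convert_to_tensor=True)
    scores = util.cos_sim(context_emb, filter_embeddings)[0]
    top_idx = scores.argsort(descending=True)[:top_k]
    lines = [f"- {filter_names[int(i)]}: {filter_texts[int(i)]}" for i in top_idx]
    return "Available filters for this request:\n" + "\n".join(lines)

print(build_filter_section("people in a group who haven't given this year", "ministry leader"))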
Business impact
The generative AI evaluation framework became the cornerstone of Pushpay’s AI feature development, delivering measurable value across three dimensions:

User experience: The AI search feature reduced time-to-insight from approximately 120 seconds (experienced users manually navigating complex UX) to under 4 seconds, more than a 15-fold acceleration that directly helps enhance ministry leaders' productivity and decision-making speed. This feature democratized data insights, so that users of different technical levels can access meaningful intelligence without requiring specialized expertise.
Development velocity: The scientific evaluation approach transformed optimization cycles. Rather than debating prompt modifications, the team now validates changes and measures domain-specific impacts within minutes, replacing prolonged deliberations with data-driven iteration.
Production readiness: Improvements from 60–70% accuracy to more than 95% accuracy using high-performance domains provided the quantitative confidence required for customer-facing deployment, while the framework’s architecture enables continuous refinement across other domain categories.

Key takeaways for your AI agent journey
The following are key takeaways from Pushpay’s experience that you can use in your own AI agent journey.
1/ Build with production in mind from day one
Building agentic AI systems is straightforward, but scaling them to production is challenging. Developers should adopt a scaling mindset during the proof-of-concept phase, not after. Implementing robust tracing and evaluation frameworks early provides a clear pathway from experimentation to production. With this method, teams can identify and address accuracy issues systematically before they become blockers.
2/ Take advantage of the advanced features of Amazon Bedrock
Amazon Bedrock prompt caching significantly reduces token costs and latency by caching frequently used system prompts. For agents with large, stable system prompts, this feature is essential for production-grade performance.
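A minimal sketch with the Amazon Bedrock Converse API is shown below; the model ID is an example, and you should confirm prompt caching support for your chosen model and Region before relying on the cache point behavior.

# Prompt-caching sketch: the large, stable system prompt is followed by a cache
# point so subsequent calls can reuse it. Model ID is an example only.
import boto3

bedrock = boto3.client("bedrock-runtime")

LARGE_SYSTEM_PROMPT = "You are the AI search agent. <detailed role, instructions, filter inventory...>"

def ask(user_query: str) -> str:
    response = bedrock.converse(
        modelId="anthropic.claude-sonnet-4-5-20250929-v1:0",
        system=[
            {"text": LARGE_SYSTEM_PROMPT},
            {"cachePoint": {"type": "default"}},   # content above this marker is cached
        ],
        messages=[{"role": "user", "content": [{"text": user_query}]}],
        inferenceConfig={"maxTokens": 1024, "temperature": 0},
    )
    return response["output"]["message"]["content"][0]["text"]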
3/ Think beyond aggregate metrics
Aggregate accuracy scores can sometimes mask critical performance variations. By evaluating agent performance at the domain category level, Pushpay uncovered weaknesses beyond what a single accuracy metric can capture. This granular approach enables targeted optimization and informed rollout decisions, making sure users only experience high-performing features while others are refined.
4/ Data security and responsible AI
When developing agentic AI systems, consider information protection and LLM security considerations from the outset, following the AWS Shared Responsibility Model, because security requirements fundamentally impact the architectural design. Pushpay’s customers are churches and faith-based organizations who are stewards of sensitive information—including pastoral care conversations, financial giving patterns, family struggles, prayer requests and more. In this implementation example, Pushpay set a clear approach to incorporating AI ethically within its product ecosystem, maintaining strict security standards to ensure church data and personally identifiable information (PII) remains within its secure partnership ecosystem. Data is shared only with secure and appropriate data protections applied and is never used to train external models. To learn more about Pushpay’s standards for incorporating AI within their products, visit the Pushpay Knowledge Center for a more in-depth review of company standards.
Conclusion: Your Path to Production-Ready AI Agents
Pushpay’s journey from a 60–70% accuracy prototype to a 95% accurate production-ready AI agent demonstrates that building reliable agentic AI systems requires more than just sophisticated prompts—it demands a scientific, data-driven approach to evaluation and optimization. The key breakthrough wasn’t in the AI technology itself, but in implementing a comprehensive evaluation framework built on strong observability foundation that provided granular visibility into agent performance across different domains. This systematic approach enabled rapid iteration, strategic rollout decisions, and continuous improvement.
Ready to build your own production-ready AI agent?

Explore Amazon Bedrock: Begin building your agent with Amazon Bedrock
Implement LLM-as-a-judge: Create your own evaluation system using the patterns described in LLM-as-a-judge on Amazon Bedrock Model Evaluation
Build your golden dataset: Start curating representative queries and expected outputs for your specific use case

About the authors
Roger Wang is a Senior Solution Architect at AWS. He is a seasoned architect with over 20 years of experience in the software industry. He helps New Zealand and global software and SaaS companies use cutting-edge technology at AWS to solve complex business challenges. Roger is passionate about bridging the gap between business drivers and technological capabilities and thrives on facilitating conversations that drive impactful results.
Melanie Li, PhD, is a Senior Generative AI Specialist Solutions Architect at AWS based in Sydney, Australia, where her focus is on working with customers to build solutions leveraging state-of-the-art AI and machine learning tools. She has been actively involved in multiple Generative AI initiatives across APJ, harnessing the power of Large Language Models (LLMs). Prior to joining AWS, Dr. Li held data science roles in the financial and retail industries.
Frank Huang, PhD, is a Senior Analytics Specialist Solutions Architect at AWS based in Auckland, New Zealand. He focuses on helping customers deliver advanced analytics and AI/ML solutions. Throughout his career, Frank has worked across a variety of industries such as financial services, Web3, hospitality, media and entertainment, and telecommunications. Frank is eager to use his deep expertise in cloud architecture, AIOps, and end-to-end solution delivery to help customers achieve tangible business outcomes with the power of data and AI.
Saurabh Gupta is a data science and AI professional at Pushpay based in Auckland, New Zealand, where he focuses on implementing practical AI solutions and statistical modeling. He has extensive experience in machine learning, data science, and Python for data science applications, with specialized experience training in database agents and AI implementation. Prior to his current role, he gained experience in telecom, retail and financial services, developing expertise in marketing analytics and customer retention programs. He has a Master’s in Statistics from University of Auckland and a Master’s in Business Administration from the Indian Institute of Management, Calcutta.
Todd Colby is a Senior Software Engineer at Pushpay based in Seattle. His expertise is focused on evolving complex legacy applications with AI, and translating user needs into structured, high-accuracy solutions. He leverages AI to increase delivery velocity and produce cutting edge metrics and business decision tools.

Build an intelligent contract management solution with Amazon Quick Suite

Organizations managing hundreds of contracts annually face significant inefficiencies, with fragmented systems and complex workflows that require teams to spend hours on contract review cycles. This solution addresses these challenges through multi-agent collaboration—specialized AI agents that can work simultaneously on different aspects of contract analysis, reducing cycle times while maintaining accuracy and oversight.
This guide demonstrates how to build an intelligent contract management solution using Amazon Quick Suite as your primary contract management solution, augmented with Amazon Bedrock AgentCore for advanced multi-agent capabilities.
Why Quick Suite augmented with Amazon Bedrock AgentCore
Quick Suite serves as your agentic workspace, providing a unified interface for chat, research, business intelligence, and automation. Quick Suite helps you seamlessly transition from getting answers to taking action, while also automating tasks from routine daily activities to complex business processes such as contract processing and analysis.
By using Amazon Bedrock AgentCore with Quick Suite, you can encapsulate business logic in highly capable AI agents more securely at scale. AgentCore services work with many frameworks including Strands Agents, in addition to foundation models in or outside of Amazon Bedrock.
Solution overview
This solution demonstrates an intelligent contract management system using Quick Suite as the user interface and knowledge base, with Amazon Bedrock AgentCore providing multi-agent collaboration functionality. The system uses specialized agents to analyze contracts, assess risks, evaluate compliance, and provide structured insights through a streamlined architecture, shown in the following figure.

Architecture components
The components of the solution architecture include:

Quick Suite components:

Spaces for contract management workflows
Chat agents for conversational contract interactions
Knowledge bases for integrating legal documents stored in Amazon S3
Topics for integrating structured contract data
Actions for connecting to custom agents developed with Amazon Bedrock AgentCore
Flows for recurring semi-manual document review processes
Automate for daily and monthly contract automation tasks

Multi-agent system powered by AgentCore:

Contract collaboration agent: Central orchestrator coordinating workflow
Legal agent: Analyzes legal terms and extracts key obligations
Risk agent: Assesses financial and operational risks
Compliance agent: Evaluates regulatory compliance

Supporting infrastructure:

Amazon API Gateway and AWS Lambda for managing API requests
Amazon Simple Storage Service (Amazon S3) for document storage
Amazon Redshift for structured data

Contract management workflow
The solution implements a streamlined contract management workflow that significantly reduces processing time while improving accuracy. The system processes contracts through coordinated AI agents, typically completing analysis within minutes compared to days of manual review.

Agent type | Primary function | Key outputs
Contract collaboration agent | Central orchestrator and workflow manager | Document routing decisions and consolidated results
Legal agent | Legal term analysis and obligation extraction | Party details, key terms, obligations, and risk flags
Risk agent | Financial and operational risk assessment | Risk scores, exposure metrics, and negotiation recommendations
Compliance agent | Regulatory compliance evaluation | Compliance status, regulatory flags, and remediation suggestions

Let’s explore an example of processing a sample service agreement contract. The workflow consists of the following steps (a minimal orchestration sketch follows the list):

The contract collaboration agent identifies the document as requiring legal, risk, and compliance analysis.
The legal agent extracts parties, payment terms, and obligations.
The risk agent identifies financial exposure and negotiation leverage points.
The compliance agent evaluates regulatory requirements and flags potential issues.
The contract collaboration agent consolidates findings into a comprehensive report.
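To make the fan-out and consolidation pattern concrete, the following is a minimal, illustrative Python sketch of how a collaboration agent could route a document to specialist agents and merge their findings. The function names and placeholder outputs are hypothetical stand-ins for calls to the agents deployed with Amazon Bedrock AgentCore; they do not come from the repository.

# Illustrative sketch only; the three specialist functions stand in for calls
# to the deployed AgentCore agents and return hard-coded placeholder findings.
def analyze_legal(contract_text: str) -> dict:
    return {"parties": ["Acme Corp", "Example LLC"], "obligations": ["30-day payment terms"]}

def analyze_risk(contract_text: str) -> dict:
    return {"risk_level": "Medium", "exposure": "Uncapped liability clause"}

def analyze_compliance(contract_text: str) -> dict:
    return {"status": "Review required", "flags": ["Missing data-protection addendum"]}

def analyze_contract(contract_text: str) -> dict:
    # The collaboration agent routes the document to each specialist,
    # then consolidates their findings into a single report.
    findings = {
        "legal": analyze_legal(contract_text),
        "risk": analyze_risk(contract_text),
        "compliance": analyze_compliance(contract_text),
    }
    return {"document": "service agreement", "findings": findings}

print(analyze_contract("Sample service agreement text ..."))

In the deployed solution, the same routing and consolidation is performed by the contract collaboration agent rather than by local function calls.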

Prerequisites
Before setting up Quick Suite, make sure you have:

An AWS account with administrative permissions
Access to supported AWS Regions where Quick Suite is available
Appropriate AWS Identity and Access Management (IAM) roles and policies for Quick Suite service access

Setup part 1: Set up Quick Suite
In the following steps we set up the Quick Suite components.
Enable Quick Suite
Your AWS administrator can enable Quick Suite by:

Signing in to the AWS Management Console
Navigating to Quick Suite from the console
Subscribing to Quick Suite service for your organization
Configuring identity and access management as needed

After Quick Suite is enabled, navigate to the Amazon Quick Suite web interface and sign in with your credentials.
Create the contract management space
In Quick Suite, create a new space called Contract Management to organize your contract-related workflows and resources. You can then use the assistant on the right to ask queries about the resources in the space. The following figure shows the initial space.

Set up a knowledge base for unstructured data (Amazon S3)
Follow these steps:

Navigate to Knowledge bases: In the Integrations section, select Knowledge bases.
Add Amazon S3 integration:

Select Amazon S3 as your data source.
Configure the S3 bucket that will store your contract documents (see the sketch after this list).
After the knowledge base is created, add it to the Contract Management space.
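If you prefer to prepare the bucket from code rather than the console, a minimal boto3 sketch like the following can create the contract document bucket and upload a first document. The bucket name, Region, and file name are placeholders; substitute your own values.

import boto3

# Placeholder Region; bucket names must be globally unique.
s3 = boto3.client("s3", region_name="us-east-1")
bucket = "my-contract-documents-bucket"

s3.create_bucket(Bucket=bucket)

# Upload a sample contract so the knowledge base has something to index.
s3.upload_file("sample-service-agreement.pdf", bucket, "contracts/sample-service-agreement.pdf")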

Set up a knowledge base for structured data (Amazon Redshift)
Follow these steps:

Add dataset: In the Datasets section, configure your contract data warehouse (Amazon Redshift) for structured contract data. Follow these instructions in Creating a dataset from a database and wait until your dataset is configured.
Add data topics: In the Topics section, integrate structured contract data sources such as:

Contract databases
Vendor information systems
Compliance tracking systems

For adding topics in Quick Suite, see Adding datasets to a topic in Amazon Quick Sight.

Add topics to your space: Add the relevant topics to your Contract Management space.

Setup part 2: Deploy Amazon Bedrock AgentCore
Amazon Bedrock AgentCore provides enterprise-grade infrastructure for deploying AI agents with session isolation, where each session runs with isolated CPU, memory, and filesystem resources. This creates separation between user sessions, helping to safeguard stateful agent reasoning processes.

You can find the required code in this GitHub repository. Go to the subfolder legal-contract-solution/deployment.
The solution includes a comprehensive deploy_agents.py script that handles the complete deployment of the AI agents to AWS using cloud-based builds. These instructions require Python >= 3.10.

pip3 install -r requirements.txt
python3 deploy_agents.py

What the deployment script does
The deployment process is fully automated and handles:

Dependency management:

Automatically installs bedrock-agentcore-starter-toolkit if needed
Verifies the required Python packages are available

AWS infrastructure setup:

Creates IAM roles with the necessary permissions for agent execution
Sets up Amazon Elastic Container Registry (Amazon ECR) repository for container images
Configures Amazon CloudWatch logging for monitoring

Agent deployment:

Deploys four specialized agents
Uses AWS CodeBuild for cloud-based ARM64 container builds
No local Docker required—the builds happen in AWS infrastructure

Configuration management:

Automatically configures agent communication protocols
Sets up security boundaries between agents
Establishes monitoring and observability

After the agents are deployed, you can see them in the Amazon Bedrock AgentCore console, as shown in the following figure.

Setup part 3: Integrate Amazon Bedrock AgentCore with Quick Suite
Quick Suite can connect to enterprise solutions and agents through actions integrations, making tools available to chat agents and automation workflows.
Deploy API Gateway and Lambda 
Go to the subfolder legal-contract-solution/deployment and run the following command: python3 deploy_quicksuite_integration.py
This will provision Amazon Cognito with a user pool to control access to the API Gateway endpoint. The Quick Suite configuration references the OAuth details for this user pool. After a successful deployment, two files are generated for your Quick Suite integration:

quicksuite_integration_config.json – Complete configuration
quicksuite_openapi_schema.json – OpenAPI schema for Quick Suite import

Set up actions integration in Quick Suite
In the Actions section, prepare the integration points that will connect to your agents deployed by AgentCore:

Get the OpenAPI specification file quicksuite_openapi_schema.json from the working folder.
In the Integrations/Actions section, go to OpenAPI Specification. Create a new OpenAPI integration by uploading the quicksuite_openapi_schema.json file, and enter the following Name and Description for the provided agents. Set the endpoint URL using the information from the quicksuite_integration_config.json file.

Name: Legal Contract Analyzer
Description: Analyze a legal contract using AI agents for clause extraction, risk assessment, and compliance checking

Set up chat agent definition details
In the Chat agents section, set up the following agent and enter the following details:

Name: Legal Contract AI Analyzer
Description:

An AI-powered system that analyzes legal contracts and performs comprehensive risk
assessments using advanced machine learning capabilities to identify potential issues,
compliance gaps, and contractual risks.

Agent identity:

You are an expert legal contract analysis AI system powered by advanced GenAI
capabilities. Your purpose is to provide comprehensive contract review and risk
assessment services.

Persona instructions:

Use the legal contract analyzer when possible. Always categorize risks by
severity (High, Medium, Low). Highlight non-standard clauses, missing provisions,
and potential compliance issues. Provide specific recommendations for contract improvements.
When analyzing liability clauses, pay special attention to indemnification, limitation of
liability, and force majeure provisions. Flag any unusual termination conditions or intellectual
property concerns.

Communication style: Professional, precise, and analytical with clear legal terminology.
Response format: 

Provide structured analysis with clear risk categorization, severity levels, and actionable
recommendations. Use bullet points for key findings and numbered lists for prioritized recommendations.

Length: 

Comprehensive analysis covering all critical aspects while maintaining clarity and focus on actionable insights.

Welcome message: 

Welcome to the Legal Contract AI Analyzer. Upload contracts for intelligent analysis and risk assessment.

Suggested prompts: 

Analyze this contract for potential legal risks and compliance issues
Review the liability clauses in this agreement for red flags
Assess the termination conditions and notice requirements in this contract

Test your contract management solution
Now that you’ve deployed the infrastructure and configured Quick Suite, you can test the contract management solution by selecting the Contract Management space. You can use the agent interface to ask questions about the knowledge base and instruct agents to review the documents. Your space will look like the following figure:
Clean up
There are infrastructure costs associated with the deployed solution. When you no longer need it in your AWS account, go to the subfolder legal-contract-solution/deployment and run the following command to clean up: python3 cleanup.py
Conclusion
The combination of Amazon Quick Suite and Amazon Bedrock AgentCore offers procurement and legal teams immediate operational benefits while positioning them for future AI advancements. You can use Amazon Bedrock multi-agent collaboration to build and manage multiple specialized agents that work together to address increasingly complex business workflows. By implementing this intelligent contract management solution, you can transform your organization’s procurement processes, reduce contract cycle times, and enable your teams to focus on strategic decision-making rather than administrative tasks. Because of the solution’s extensible architecture, you can start with core contract management functions and gradually expand to address more complex use cases as your organization’s needs evolve. Whether you’re looking to streamline routine contract reviews or implement comprehensive procurement transformation, the intelligent contract management solution provides a powerful foundation for achieving your business objectives. To learn more about Amazon Quick Suite and Amazon Bedrock AgentCore, see:

Amazon Quick Suite
Amazon Bedrock AgentCore

About the authors
Oliver Steffmann is a Principal Solutions Architect at AWS based in New York and is passionate about GenAI and public blockchain use cases. He has over 20 years of experience working with financial institutions and helps his customers get their cloud transformation off the ground. Outside of work he enjoys spending time with his family and training for the next Ironman.
David Dai is an Enterprise Solutions Architect at AWS based in New York. He works with customers across various industries, helping them design and implement cloud solutions that drive business value. David is passionate about cloud architecture and enjoys guiding organizations through their digital transformation journeys. Outside of work, he values spending quality time with family and exploring the latest technologies.
Krishna Pramod is a Senior Solutions Architect at AWS. He works as a trusted advisor for customers, guiding them through innovation with modern technologies and development of well-architected applications in the AWS cloud. Outside of work, Krishna enjoys reading, music and exploring new destinations.
Malhar Mane is an Enterprise Solutions Architect at AWS based in Seattle, where he serves as a trusted advisor to enterprise customers across diverse industries. With a deep passion for Generative AI and storage solutions, Malhar specializes in guiding organizations through their cloud transformation journeys and helping them harness the power of generative AI to optimize business operations and drive innovation. Malhar holds a Bachelor’s degree in Computer Science from the University of California, Irvine. In his free time, Malhar enjoys hiking and exploring national parks.
Praveen Panati is a Senior Solutions Architect at Amazon Web Services. He is passionate about cloud computing and works with AWS enterprise customers to architect, build, and scale cloud-based applications to achieve their business goals. Praveen’s area of expertise includes cloud computing, big data, streaming analytics, and software engineering.
Sesan Komaiya is a Solutions Architect at Amazon Web Services. He works with a variety of customers, helping them with cloud adoption, cost optimization, and emerging technologies. Sesan has over 15 years’ experience in enterprise IT and has been at AWS for 5 years. In his free time, Sesan enjoys watching various sporting activities like soccer, tennis, and motorsport. He has 2 kids who also keep him busy at home.

How a Haystack-Powered Multi-Agent System Detects Incidents, Investiga …

In this tutorial, we design an implementation that demonstrates how Haystack enables building advanced, agentic AI systems that go far beyond toy examples while remaining fully runnable. We focus on a cohesive, end-to-end setup that highlights orchestration, stateful decision-making, tool execution, and structured control flow, demonstrating how complex agent behavior can be cleanly expressed. We deliberately keep everything in a single executable snippet to emphasize reproducibility and to make it easy to experiment, extend, and stress-test the system in realistic scenarios. Check out the FULL CODES here.

import os, json, math, random, textwrap
from datetime import datetime, timedelta

# Install missing data dependencies on the fly (Colab-friendly).
try:
    import pandas as pd
except Exception:
    os.system("pip -q install pandas")
    import pandas as pd

try:
    import numpy as np
except Exception:
    os.system("pip -q install numpy")
    import numpy as np

try:
    import duckdb
except Exception:
    os.system("pip -q install duckdb")
    import duckdb

os.system("pip -q install haystack-ai openai")

from haystack.components.agents import Agent
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.tools import tool
from haystack.components.agents.state import State
from haystack.components.agents.state.state_utils import merge_lists
from haystack.tools import ComponentTool

from getpass import getpass

if not os.getenv("OPENAI_API_KEY"):
    key = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()
    if key:
        os.environ["OPENAI_API_KEY"] = key

if not os.getenv("OPENAI_API_KEY"):
    raise RuntimeError("OPENAI_API_KEY missing. Set it in the environment or paste when prompted.")

We install and import all required libraries, ensuring that Haystack, OpenAI, and data tooling are available, and securely load the OpenAI API key at runtime. We configure the environment to gracefully handle missing dependencies and prompt for credentials without hardcoding sensitive information. We prepare the foundation for an agent-driven workflow by initializing core Haystack components, tools, and state utilities in a Colab-ready setup. Check out the FULL CODES here.

random.seed(7)
np.random.seed(7)

now = datetime.utcnow()
start = now - timedelta(hours=24)

services = ["api-gateway", "payments", "auth", "db-proxy", "worker", "web"]
regions = ["eu-central-1", "eu-west-1", "us-east-1"]
levels = ["INFO", "WARN", "ERROR"]
error_kinds = [
    "UpstreamTimeout",
    "DBConnPoolExhausted",
    "JWTSignatureInvalid",
    "RateLimitExceeded",
    "DeadlockDetected",
    "CacheMissStorm",
    "OOMKilled",
    "TLSHandshakeFailure",
]

def synth_metrics(n=1440):
    # One row of synthetic metrics per minute over the last 24 hours.
    ts = [start + timedelta(minutes=i) for i in range(n)]
    base_rps = 220 + 40*np.sin(np.linspace(0, 8*math.pi, n)) + np.random.normal(0, 10, n)
    base_p95 = 180 + 30*np.sin(np.linspace(0, 6*math.pi, n) + 0.5) + np.random.normal(0, 8, n)
    base_err = np.clip(np.random.normal(0.006, 0.002, n), 0.0, 0.05)
    # Inject an incident window with rising latency and error rate and falling throughput.
    incident_t0 = int(n*0.62)
    incident_t1 = incident_t0 + int(n*0.10)
    base_p95[incident_t0:incident_t1] += np.linspace(120, 520, incident_t1-incident_t0)
    base_err[incident_t0:incident_t1] += np.linspace(0.01, 0.07, incident_t1-incident_t0)
    base_rps[incident_t0:incident_t1] -= np.linspace(5, 80, incident_t1-incident_t0)
    df = pd.DataFrame({
        "ts": ts,
        "rps": np.clip(base_rps, 5, None),
        "p95_ms": np.clip(base_p95, 10, None),
        "error_rate": np.clip(base_err, 0.0, 0.2),
    })
    return df, (ts[incident_t0], ts[incident_t1])

metrics_df, (incident_begin, incident_end) = synth_metrics()

We seed randomness and generate a realistic 24-hour stream of synthetic service metrics with periodic behavior and noise. We deliberately introduce an incident window during which latency and error rates spike while request throughput degrades. We return both the metrics DataFrame and precise incident boundaries to support downstream detection and agent reasoning. Check out the FULL CODES here.

def synth_logs(n=9000):
    rows = []
    for _ in range(n):
        t = start + timedelta(seconds=random.randint(0, 24*3600-1))
        svc = random.choice(services)
        reg = random.choice(regions)
        lvl = random.choices(levels, weights=[0.86, 0.10, 0.04])[0]
        kind = None
        msg = "ok"
        latency = max(5, int(np.random.normal(120, 55)))
        # Errors and latency intensify for a few services inside the incident window.
        if incident_begin <= t <= incident_end and svc in ["api-gateway", "payments", "db-proxy"]:
            if random.random() < 0.24:
                lvl = random.choices(["WARN", "ERROR"], weights=[0.55, 0.45])[0]
                kind = random.choices(
                    ["UpstreamTimeout", "DBConnPoolExhausted", "DeadlockDetected", "CacheMissStorm"],
                    weights=[0.40, 0.28, 0.10, 0.22]
                )[0]
                latency += random.randint(300, 1200)
                msg = f"{kind}: request failed"
        if lvl == "ERROR" and kind is None and random.random() < 0.45:
            kind = random.choice(error_kinds)
            msg = f"{kind}: unexpected failure"
            latency += random.randint(80, 700)
        trace = f"tr_{random.randint(10**7,10**8-1)}"
        user = f"u_{random.randint(1,20000)}"
        endpoint = random.choice(["/pay", "/auth", "/refund", "/status", "/checkout", "/profile", "/ledger"])
        rows.append({
            "ts": t,
            "service": svc,
            "region": reg,
            "level": lvl,
            "error_kind": kind or "",
            "endpoint": endpoint,
            "latency_ms": latency,
            "trace_id": trace,
            "user_id": user,
            "message": msg
        })
    df = pd.DataFrame(rows).sort_values("ts").reset_index(drop=True)
    return df

logs_df = synth_logs()

metrics_path = "/content/metrics.csv"
logs_path = "/content/logs.csv"
metrics_df.to_csv(metrics_path, index=False)
logs_df.to_csv(logs_path, index=False)

con = duckdb.connect(database=":memory:")
con.execute("CREATE TABLE metrics AS SELECT * FROM read_csv_auto(?, HEADER=TRUE)", [metrics_path])
con.execute("CREATE TABLE logs AS SELECT * FROM read_csv_auto(?, HEADER=TRUE)", [logs_path])

We synthesize high-volume, time-distributed logs with realistic service, region, severity, and error patterns that intensify during the incident window. We persist both metrics and logs to CSV and load them into an in-memory DuckDB database for fast analytical queries. We prepare a unified, queryable observability dataset that supports correlation between latency, errors, and log-level signals. Check out the FULL CODES here.

def zscore_anomalies(series, window=60, z=3.0):
    # Rolling z-score: flag points that deviate strongly from the trailing window.
    x = series.astype(float).values
    out = np.zeros_like(x, dtype=bool)
    for i in range(len(x)):
        lo = max(0, i-window)
        hi = i
        if hi - lo < max(10, window//4):
            continue
        mu = float(np.mean(x[lo:hi]))
        sd = float(np.std(x[lo:hi])) + 1e-9
        out[i] = abs((x[i]-mu)/sd) >= z
    return out

@tool
def load_inputs(metrics_csv_path: str, logs_csv_path: str) -> dict:
    m = pd.read_csv(metrics_csv_path, parse_dates=["ts"])
    l = pd.read_csv(logs_csv_path, parse_dates=["ts"])
    return {
        "metrics_summary": {
            "rows": int(len(m)),
            "start": str(m["ts"].min()),
            "end": str(m["ts"].max()),
            "cols": list(m.columns)
        },
        "logs_summary": {
            "rows": int(len(l)),
            "start": str(l["ts"].min()),
            "end": str(l["ts"].max()),
            "cols": list(l.columns),
            "services": sorted(l["service"].unique().tolist()),
            "regions": sorted(l["region"].unique().tolist())
        }
    }

@tool
def detect_incident_window(metric: str, z_threshold: float = 3.2, min_span_minutes: int = 10) -> dict:
    if metric not in ["rps", "p95_ms", "error_rate"]:
        return {"error": "metric must be one of: rps, p95_ms, error_rate"}
    df = metrics_df.copy().sort_values("ts")
    flags = zscore_anomalies(df[metric], window=75, z=float(z_threshold))
    df["flag"] = flags
    idx = np.where(df["flag"].values)[0]
    if len(idx) == 0:
        return {"found": False}
    # Group consecutive flagged minutes into contiguous anomaly spans.
    groups = []
    cur = [idx[0]]
    for i in idx[1:]:
        if i == cur[-1] + 1:
            cur.append(i)
        else:
            groups.append(cur)
            cur = [i]
    groups.append(cur)
    spans = []
    for g in groups:
        t0 = df.loc[g[0], "ts"]
        t1 = df.loc[g[-1], "ts"]
        span = (t1 - t0).total_seconds() / 60.0
        if span >= float(min_span_minutes):
            spans.append((span, t0, t1, int(len(g))))
    spans.sort(key=lambda x: (-x[0], -x[3]))
    if not spans:
        best = max(groups, key=len)
        t0 = df.loc[best[0], "ts"]
        t1 = df.loc[best[-1], "ts"]
        return {"found": True, "metric": metric, "start": str(t0), "end": str(t1), "points": int(len(best)), "note": "short anomaly span; consider lowering min_span_minutes"}
    best = spans[0]
    return {"found": True, "metric": metric, "start": str(best[1]), "end": str(best[2]), "minutes": float(best[0]), "points": int(best[3])}

We implement a rolling z-score detector to flag statistically significant deviations in key metrics over time. We expose tools that load observability inputs and summarize their structure to ground the agent’s reasoning. We detect and rank contiguous anomaly windows, returning the most meaningful incident span with clear temporal boundaries. Check out the FULL CODES here.
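As a quick, optional sanity check that is not part of the original tutorial flow, you can call the detector directly on the synthetic metrics to confirm that the injected incident is flagged:

# Illustrative check: how many minutes does the rolling z-score flag on p95 latency?
flags = zscore_anomalies(metrics_df["p95_ms"], window=75, z=3.2)
print(int(flags.sum()), "anomalous minutes flagged out of", len(flags))
print("Injected incident window:", incident_begin, "->", incident_end)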

@tool
def sql_investigate(query: str) -> dict:
    # Run an ad-hoc SQL query against the in-memory DuckDB tables.
    try:
        df = con.execute(query).df()
        head = df.head(30)
        return {
            "rows": int(len(df)),
            "columns": list(df.columns),
            "preview": head.to_dict(orient="records")
        }
    except Exception as e:
        return {"error": str(e)}

@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
    ws = pd.to_datetime(window_start_iso)
    we = pd.to_datetime(window_end_iso)
    df = logs_df[(logs_df["ts"] >= ws) & (logs_df["ts"] <= we)].copy()
    if df.empty:
        return {"rows": 0, "top_error_kinds": [], "top_services": [], "top_endpoints": []}
    df["error_kind_norm"] = df["error_kind"].fillna("").replace("", "NONE")
    err = df[df["level"].isin(["WARN", "ERROR"])].copy()
    top_err = err["error_kind_norm"].value_counts().head(int(top_k)).to_dict()
    top_svc = err["service"].value_counts().head(int(top_k)).to_dict()
    top_ep = err["endpoint"].value_counts().head(int(top_k)).to_dict()
    by_region = err.groupby("region").size().sort_values(ascending=False).head(int(top_k)).to_dict()
    p95_latency = float(np.percentile(df["latency_ms"].values, 95))
    return {
        "rows": int(len(df)),
        "warn_error_rows": int(len(err)),
        "p95_latency_ms": p95_latency,
        "top_error_kinds": top_err,
        "top_services": top_svc,
        "top_endpoints": top_ep,
        "error_by_region": by_region
    }

@tool
def propose_mitigations(hypothesis: str) -> dict:
    # Map keywords in the hypothesis to a small catalog of candidate mitigations.
    h = hypothesis.lower()
    mitigations = []
    if "conn" in h or "pool" in h or "db" in h:
        mitigations += [
            {"action": "Increase DB connection pool size (bounded) and add backpressure at db-proxy", "owner": "Platform", "eta_days": 3},
            {"action": "Add circuit breaker + adaptive timeouts between api-gateway and db-proxy", "owner": "Backend", "eta_days": 5},
            {"action": "Tune query hotspots; add indexes for top offending endpoints", "owner": "Data/DBA", "eta_days": 7},
        ]
    if "timeout" in h or "upstream" in h:
        mitigations += [
            {"action": "Implement hedged requests for idempotent calls (carefully) and tighten retry budgets", "owner": "Backend", "eta_days": 6},
            {"action": "Add upstream SLO-aware load shedding at api-gateway", "owner": "Platform", "eta_days": 7},
        ]
    if "cache" in h:
        mitigations += [
            {"action": "Add request coalescing and negative caching to prevent cache-miss storms", "owner": "Backend", "eta_days": 6},
            {"action": "Prewarm cache for top endpoints during deploys", "owner": "SRE", "eta_days": 4},
        ]
    if not mitigations:
        mitigations += [
            {"action": "Add targeted dashboards and alerts for the suspected bottleneck metric", "owner": "SRE", "eta_days": 3},
            {"action": "Run controlled load test to reproduce and validate the hypothesis", "owner": "Perf Eng", "eta_days": 5},
        ]
    mitigations = mitigations[:10]
    return {"hypothesis": hypothesis, "mitigations": mitigations}

@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
    try:
        facts = json.loads(key_facts_json)
    except Exception:
        facts = {"note": "key_facts_json was not valid JSON"}
    try:
        mits = json.loads(mitigations_json)
    except Exception:
        mits = {"note": "mitigations_json was not valid JSON"}
    doc = {
        "title": title,
        "date_utc": datetime.utcnow().strftime("%Y-%m-%d"),
        "incident_window_utc": {"start": window_start_iso, "end": window_end_iso},
        "customer_impact": customer_impact,
        "suspected_root_cause": suspected_root_cause,
        "detection": {
            "how_detected": "Automated anomaly detection + error-rate spike triage",
            "gaps": ["Add earlier saturation alerting", "Improve symptom-to-cause correlation dashboards"]
        },
        "timeline": [
            {"t": window_start_iso, "event": "Symptoms begin (latency/error anomalies)"},
            {"t": "T+10m", "event": "On-call begins triage; identifies top services/endpoints"},
            {"t": "T+25m", "event": "Mitigation actions initiated (throttling/backpressure)"},
            {"t": window_end_iso, "event": "Customer impact ends; metrics stabilize"},
        ],
        "key_facts": facts,
        "corrective_actions": mits.get("mitigations", mits),
        "followups": [
            {"area": "Reliability", "task": "Add saturation signals + budget-based retries", "priority": "P1"},
            {"area": "Observability", "task": "Add golden signals per service/endpoint", "priority": "P1"},
            {"area": "Performance", "task": "Reproduce with load test and validate fix", "priority": "P2"},
        ],
        "appendix": {"notes": "Generated by a Haystack multi-agent workflow (non-RAG)."}
    }
    return {"postmortem_json": doc}

llm = OpenAIChatGenerator(model="gpt-4o-mini")

state_schema = {
    "metrics_csv_path": {"type": str},
    "logs_csv_path": {"type": str},
    "metrics_summary": {"type": dict},
    "logs_summary": {"type": dict},
    "incident_window": {"type": dict},
    "investigation_notes": {"type": list, "handler": merge_lists},
    "hypothesis": {"type": str},
    "key_facts": {"type": dict},
    "mitigation_plan": {"type": dict},
    "postmortem": {"type": dict},
}

profiler_prompt = """You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
- Prefer calling tools over guessing.
- Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
- Hypothesis must be falsifiable and mention at least one specific service and mechanism.
"""

writer_prompt = """You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
- Call tools only if needed.
- Keep 'suspected_root_cause' specific and not generic.
- Ensure corrective actions have owners and eta_days.
"""

coordinator_prompt = """You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
- A short executive summary (max 10 lines)
- The postmortem JSON
- A compact runbook checklist (bulleted)
"""

profiler_agent = Agent(
    chat_generator=llm,
    tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
    system_prompt=profiler_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

writer_agent = Agent(
    chat_generator=llm,
    tools=[draft_postmortem],
    system_prompt=writer_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

# Wrap the specialist agents as tools so the coordinator can delegate to them.
profiler_tool = ComponentTool(
    component=profiler_agent,
    name="profiler_specialist",
    description="Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).",
    outputs_to_string={"source": "last_message"}
)

writer_tool = ComponentTool(
    component=writer_agent,
    name="postmortem_writer_specialist",
    description="Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.",
    outputs_to_string={"source": "last_message"}
)

coordinator_agent = Agent(
    chat_generator=llm,
    tools=[
        load_inputs,
        detect_incident_window,
        sql_investigate,
        log_pattern_scan,
        propose_mitigations,
        profiler_tool,
        writer_tool,
        draft_postmortem
    ],
    system_prompt=coordinator_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

We define a suite of investigative, synthesis, and documentation tools that let agents query data, extract patterns, and propose concrete mitigations. We orchestrate specialist profiler and writer agents under a coordinator that drives an end-to-end, non-RAG incident workflow. We configure prompts, state schemas, and tool bridges so the system produces falsifiable hypotheses, actionable plans, and a structured postmortem. Check out the FULL CODES here.

profiler_agent.warm_up()
writer_agent.warm_up()
coordinator_agent.warm_up()

initial_state = {
    "metrics_csv_path": metrics_path,
    "logs_csv_path": logs_path,
    "investigation_notes": []
}

task = """
We have an incident in the last 24h. Investigate using the provided CSVs.
Constraints:
- Do not use RAG or any document retriever/store.
- Use tools + SQL to ground conclusions.
- Produce a realistic postmortem JSON and a runbook checklist.
"""

result = coordinator_agent.run(
    messages=[ChatMessage.from_user(task)],
    state=State(schema=state_schema, data=initial_state)
)

last = result["last_message"].text if "last_message" in result else result["messages"][-1].text
print(last)

We warm up all agents to ensure tools, prompts, and state transitions are fully initialized before execution. We define the investigation task and initial state, then delegate end-to-end incident handling to the coordinator agent. We execute the workflow and surface the final executive summary, postmortem JSON, and runbook output.

In conclusion, we showed how Haystack supports sophisticated agentic patterns that scale in complexity without becoming fragile or hard to reason about. We demonstrated that, even within a notebook, we can express rich agent logic, maintain explicit state, and coordinate multiple components in a controlled and extensible way. By structuring the system this way, we placed ourselves in a strong position to iterate on more advanced behaviors, evaluate agent decisions, and evolve the tutorial into production-grade agentic workflows.

Check out the FULL CODES here.
The post How a Haystack-Powered Multi-Agent System Detects Incidents, Investigates Metrics and Logs, and Produces Production-Grade Incident Reviews End-to-End appeared first on MarkTechPost.

NVIDIA Revolutionizes Climate Tech with ‘Earth-2’: The World’s F …

For decades, predicting the weather has been the exclusive domain of massive government supercomputers running complex physics-based equations. NVIDIA has shattered that barrier with the release of the Earth-2 family of open models and tools for AI weather and climate prediction, making them accessible to virtually anyone, from tech startups to national meteorological agencies.

In a move that democratizes climate science, NVIDIA unveiled 3 groundbreaking new models powered by novel architectures: Atlas, StormScope, and HealDA. These tools promise to accelerate forecasting speeds by orders of magnitude while delivering accuracy that rivals or exceeds traditional methods.

The Democratization of Weather Intelligence

Historically, running a high-fidelity weather model required infrastructure that only a few countries could afford. NVIDIA’s Earth-2 changes the calculus by offering an ‘open stack’, a collection of pretrained models, inference libraries, and customization recipes available on platforms like GitHub and Hugging Face.

Mike Pritchard, Director of Climate Simulation at NVIDIA, emphasized that NVIDIA is not becoming a weather service provider. Instead, they are building the “foundational building blocks” that allow nations and companies to build their own sovereign forecasting systems.

“Sovereignty matters. Weather is a national security issue… That’s why we’ve built Earth-2, the world’s first fully open production-ready AI weather stack.” – Mike Pritchard, NVIDIA

Meet the New Heavyweights: Atlas, StormScope, and HealDA

The announcement introduces 3 specific models that address different stages of the forecasting pipeline, from processing messy data to predicting storms weeks in advance.

1. Earth-2 Medium Range (Powered by Atlas)

Targeting the 15-day forecast window, this model uses a new architecture called Atlas. It predicts over 70 weather variables, including wind, humidity, and pressure, at high accuracy.

Performance: On standard industry benchmarks, Atlas has been shown to outperform GenCast, the current leading open model, across the vast majority of variables.

The Shift: It represents a return to “simple, scalable Transformer architectures,” moving away from niche, hand-tailored AI designs.

Read the research paper here.

2. Earth-2 Nowcasting (Powered by StormScope)

This is a game-changer for immediate disaster response. Powered by StormScope, this generative AI model focuses on the 0-to-6-hour window, providing kilometer-scale resolution of local storms.

Why it matters: It is the first AI model to outperform traditional physics-based methods for short-term precipitation forecasting.

Speed: It generates hazardous weather predictions in minutes, giving emergency responders critical time to act.

Sovereignty: Because it trains directly on geostationary satellite imagery rather than region-specific physics outputs, it can be deployed by any nation with good satellite coverage.

Read the research paper.

3. Earth-2 Global Data Assimilation (Powered by HealDA)

Often the unsung hero of forecasting, “data assimilation” is the process of combining messy satellite and balloon data into a coherent snapshot of the atmosphere to start a forecast.

The Breakthrough: Traditional assimilation consumes nearly 50% of supercomputing cycles. NVIDIA’s HealDA architecture accomplishes this task in minutes on GPUs rather than hours on supercomputers.

Result: When combined with the Medium Range model, it produces the most skillful predictions ever seen from an entirely AI-based pipeline.

Read the research paper

Real-World Impact: From Solar Power to Hurricane Risk

The Earth-2 stack is already in use by major global players, proving that AI weather forecasting is ready for commercial and operational prime time.

Renewable Energy: TotalEnergies and GCL (a major solar material producer) are using Earth-2 to predict solar and wind variability. For solar farms, accurate cloud cover prediction can significantly impact energy market trading.

Israel Meteorological Service: Using the CorrDiff model (part of the Earth-2 family), they have achieved a 90% reduction in compute time while generating high-resolution forecasts up to eight times daily.

Insurance & Risk: AXA and S&P Global Energy are leveraging the speed of Earth-2 to run thousands of “counterfactual” scenarios. By simulating thousands of years of hypothetical hurricane data, they can better understand rare, high-impact climate events that haven’t happened yet but might.

Daily Operations: Brightband, an AI weather tool provider, is already integrating Earth-2 Medium Range to issue daily global forecasts.

The Bottom Line

NVIDIA Earth-2 is not just a technical upgrade; it is a structural shift in how humans interact with the climate. By reducing the barrier to entry, shifting from multimillion-dollar supercomputers to accessible GPU-accelerated AI, NVIDIA is enabling a future where hyper-local, high-accuracy weather prediction is ubiquitous.

As extreme weather events become more frequent, tools like StormScope and Atlas will likely become essential infrastructure for governments and industries worldwide.

Earth-2 Medium Range and Nowcasting are available on GitHub, Hugging Face, and NVIDIA Earth2Studio. Earth-2 Global Data Assimilation is expected to be released later this year.

To learn more about getting started with these models, developers can visit the NVIDIA Earth-2 technical blog and the research papers for Earth-2 Medium Range [Read the research paper], Earth-2 Nowcasting [Read the research paper], and Earth-2 Global Data Assimilation [Read the research paper].
The post NVIDIA Revolutionizes Climate Tech with ‘Earth-2’: The World’s First Fully Open Accelerated AI Weather Stack appeared first on MarkTechPost.

What is Clawdbot? How a Local First Agent Stack Turns Chats into Real …

Clawdbot is an open source personal AI assistant that you run on your own hardware. It connects large language models from providers such as Anthropic and OpenAI to real tools such as messaging apps, files, shell, browser and smart home devices, while keeping the orchestration layer under your control.

The interesting part is not that Clawdbot chats. It is that the project ships a concrete architecture for local first agents, and a typed workflow engine called Lobster that turns model calls into deterministic pipelines.

Architecture: Gateway, Nodes and Skills

At the center of Clawdbot is the Gateway process. The Gateway exposes a WebSocket control plane on ws://127.0.0.1:18789 and a local HTTP interface for the control UI and web chat.

Your messages from WhatsApp, Telegram, Signal, Slack, Discord, iMessage and other channels are delivered to the Gateway. The Gateway decides which agent should handle the message, which tools it may call, and which model provider to use. It then sends the reply back over the same channel.

The runtime is split into a few core concepts:

Gateway: Routing, model calls, tool invocation, sessions, presence and scheduling.

Nodes: Processes that give Clawdbot access to local resources such as file system, browser automation, microphone, camera or platform specific APIs on macOS, Windows, Linux, iOS and Android.

Channels: Integrations for chat systems like WhatsApp, Telegram, Discord, Slack, Signal, Microsoft Teams, Matrix, Zalo and more. These are configured as channel backends that attach to the Gateway.

Skills and plugins: Tools that the agent can call, described in a standard SKILL.md format and distributed through ClawdHub.

This separation lets you run the Gateway on a five dollar virtual server or a spare machine at home, while keeping heavy model compute on remote APIs or local model backends when needed.

Skills and the SKILL.md standard

Clawdbot uses an open skills format described in SKILL.md. A skill is defined in Markdown with a small header and an ordered procedure. For example, a deployment skill might specify steps such as checking git status, running tests and deploying only after success.


name: deploy-production
description: Deploy the current branch to production. Use only after tests pass.
disable-model-invocation: true

1. Check git status ensuring clean working directory.
2. Run `npm test`
3. If tests pass, run `npm run deploy`

The Gateway reads these definitions and exposes them to agents as tools with explicit capabilities and safety constraints. Skills are published to ClawdHub and can be installed or composed into larger workflows.

This means that operational runbooks can move from ad-hoc wiki pages into machine executable skills, while still being auditable as text.

Lobster: Typed Workflow Runtime for Agents

Lobster is the workflow runtime that powers Local Lobster and many advanced Clawdbot automations. It is described as a typed workflow shell that lets Clawdbot run multi step tool sequences as a single deterministic operation with explicit approval gates.

Instead of having the model call many tools in a loop, Lobster moves orchestration into a small domain specific runtime:

Pipelines are defined as JSON or YAML, or as a compact shell like pipeline string.

Steps exchange typed JSON data, not unstructured text.

The runtime enforces timeouts, output limits and sandbox policies.

Workflows can pause on side effects and resume later with a resumeToken.

A simple inbox triage workflow looks like this:

name: inbox-triage
steps:
  - id: collect
    command: inbox list --json
  - id: categorize
    command: inbox categorize --json
    stdin: $collect.stdout
  - id: approve
    command: inbox apply --approve
    stdin: $categorize.stdout
    approval: required
  - id: execute
    command: inbox apply --execute
    stdin: $categorize.stdout
    condition: $approve.approved

Clawdbot treats this file as a skill. When you ask it to clean your inbox, it calls one Lobster pipeline instead of improvising many tool calls. The model decides when to run the pipeline and with which parameters, but the pipeline itself stays deterministic and auditable.

Local Lobster is the reference agent that uses Lobster to drive local workflows and is described in coverage as an open source agent that redefines personal AI by pairing local first workflows with proactive behavior.

Proactive local first behavior

A key reason Clawdbot is trending and visible on X and in developer communities is that it behaves like an operator, not just a chat window.

Because the Gateway can run scheduled jobs and track state across sessions, common patterns include:

Daily briefings that summarize calendars, tasks and important mail.

Periodic recaps such as weekly shipped work summaries.

Monitors that watch for conditions, then message you first on your preferred channel.

File and repository automations that run locally but are triggered by natural language.

All of this runs with routing and tool policy on your machine or server. Model calls still go to providers like Anthropic, OpenAI, Google, xAI or local backends, but the assistant brain, memory and integrations are under your control.

Installation and developer workflow

The project provides a one line installer that fetches a script from clawd.bot and bootstraps Node, the Gateway and core components. For more control, you can install via npm or clone the TypeScript repository and build with pnpm.

Typical steps:

curl -fsSL https://clawd.bot/install.sh | bash

# or

npm i -g clawdbot
clawdbot onboard

After onboarding you connect a channel such as Telegram or WhatsApp, choose a model provider and enable skills. From there you can write your own SKILL.md files, build Lobster workflows and expose them through chat, web chat or the macOS companion application.

Some Examples

Just ask @clawdbot to build and deploy a website with a chat message https://t.co/I5bQDCK2Ne pic.twitter.com/EOa1GlPxJe— Peter Yang (@petergyang) January 25, 2026

Just had Clawdbot set up Ollama with a local model. Now it handles website summaries and simple tasks locally instead of burning API credits. Blown away that an AI just installed another AI to save me money. pic.twitter.com/RRvXQAgBfX— Max (@talkaboutdesign) January 25, 2026

Clawdbot is controlling LMStudio remotely from telegram, downloading Qwen, which it will then use to power some of my tasks with Clawdbot. pic.twitter.com/ll2adg19Za— Matthew Berman (@MatthewBerman) January 25, 2026

Clawdbot now takes an idea, manages codex and claude, debates them on reviews autonomously, and lets me know when it’s done. Amazing. A whole feature deployed while I’m out on a walk. pic.twitter.com/ws3UDQG2S0— Aaron Ng (@localghost) January 25, 2026

The post What is Clawdbot? How a Local First Agent Stack Turns Chats into Real Automations appeared first on MarkTechPost.

Build a serverless AI Gateway architecture with AWS AppSync Events

AWS AppSync Events can help you create more secure, scalable WebSocket APIs. In addition to broadcasting real-time events to millions of WebSocket subscribers, it supports a crucial user experience requirement of your AI Gateway: low-latency propagation of events from your chosen generative AI models to individual users.
In this post, we discuss how to use AppSync Events as the foundation of a capable, serverless, AI gateway architecture. We explore how it integrates with AWS services for comprehensive coverage of the capabilities offered in AI gateway architectures. Finally, we get you started on your journey with sample code you can launch in your account and begin building.
Overview of AI Gateway
AI Gateway is an architectural middleware pattern that helps enhance the availability, security, and observability of large language models (LLMs). It supports the interests of several different personas. For example, users want low latency and delightful experiences. Developers want flexible and extensible architectures. Security staff need governance to protect information and availability. System engineers need monitoring and observability solutions that help them support the user experience. Product managers need information about how well their products perform with users. Budget managers need cost controls. The needs of these different people across your organization are important considerations for hosting generative AI applications.
Solution overview
The solution we share in this post offers the following capabilities:

Identity – Authenticate and authorize users from the built-in user directory, from your enterprise directory, and from consumer identity providers like Amazon, Google, and Facebook
APIs – Provide users and applications low-latency access to your generative AI applications
Authorization – Determine what resources your users have access to in your application
Rate limiting and metering – Mitigate bot traffic, block access, and manage model consumption to control cost
Diverse model access – Offer access to leading foundation models (FMs), agents, and safeguards to keep users safe
Logging – Observe, troubleshoot, and analyze application behavior
Analytics – Extract value from your logs to build, discover, and share meaningful insights
Monitoring – Track key datapoints that help staff react quickly to events
Caching – Reduce costs by detecting common queries to your models and returning predetermined responses

In the following sections, we dive into the core architecture and explore how you can build these capabilities into the solution.
Identity and APIs
The following diagram illustrates an architecture using the AppSync Events API to provide an interface between an AI assistant application and LLMs through Amazon Bedrock using AWS Lambda.

The workflow consists of the following steps (a simplified handler sketch follows the list):

The client application retrieves the user identity and authorization to access APIs using Amazon Cognito.
The client application subscribes to the AppSync Events channel, from which it will receive events like streaming responses from the LLMs in Amazon Bedrock.
The SubscribeHandler Lambda function attached to the Outbound Messages namespace verifies that this user is authorized to access the channel.
The client application publishes a message to the Inbound Message channel, such as a question posed to the LLM.
The ChatHandler Lambda function receives the message and verifies the user is authorized to publish messages on that channel.
The ChatHandler function calls the Amazon Bedrock ConverseStream API and waits for the response stream from the Converse API to emit response events.
The ChatHandler function relays the response messages from the Converse API to the Outbound Message channel for the current user, which passes the events to the WebSocket on which the client application is waiting for messages.
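To make steps 6 and 7 concrete, the following is a simplified Python sketch of the streaming relay inside a ChatHandler-style function. The converse_stream call follows the Amazon Bedrock Runtime API, while publish_to_outbound_channel is a hypothetical helper standing in for however your function publishes events back to the user's Outbound Messages channel; it is not part of the sample code.

import boto3

bedrock_runtime = boto3.client("bedrock-runtime")

def publish_to_outbound_channel(sub: str, payload: dict) -> None:
    # Hypothetical helper: publish the payload to the user's Outbound-Messages/{sub} channel.
    print(f"Outbound-Messages/{sub}:", payload)

def relay_chat(sub: str, user_message: str, model_id: str) -> None:
    response = bedrock_runtime.converse_stream(
        modelId=model_id,
        messages=[{"role": "user", "content": [{"text": user_message}]}],
    )
    for event in response["stream"]:
        # Relay text deltas to the user's private outbound channel as they arrive.
        if "contentBlockDelta" in event:
            text = event["contentBlockDelta"]["delta"].get("text", "")
            if text:
                publish_to_outbound_channel(sub, {"type": "delta", "text": text})
        # The final metadata event carries the token usage used later for metering.
        elif "metadata" in event:
            usage = event["metadata"].get("usage", {})
            publish_to_outbound_channel(sub, {"type": "done", "usage": usage})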

AppSync Events namespaces and channels are the building blocks of the communications architecture in your AI Gateway. In the example, namespaces are used to attach different behaviors to our inbound and outbound messages, and each namespace can have its own publish and subscribe integrations. Moreover, each namespace is divided into channels. Our channel structure gives each user a private inbound and outbound channel, serving as one-to-one communication with the server side:

Inbound-Messages / ${sub}
Outbound-Messages / ${sub}

The subject, or sub attribute, arrives in our Lambda functions as context from Amazon Cognito. It is an unchangeable, unique user identifier within each user pool. This makes it useful for segments of our channel names and is especially useful for authorization.
Authorization
Identity is established using Amazon Cognito, but we still need to implement authorization. One-to-one communication between a user and an AI assistant in our example should be private—we don’t want users with the knowledge of another user’s sub attribute to be able to subscribe to or publish to another user’s inbound or outbound channel.
This is why we use sub in our naming scheme for channels. This enables the Lambda functions attached to the namespaces as data sources to verify that a user is authorized to publish and subscribe.
The following code sample is our SubscribeHandler Lambda function:

def lambda_handler(event, context):
    """
    Lambda function that checks if the first channel segment matches the user's sub.
    Returns None if it matches or an error message otherwise.
    """

    # Extract segments and sub from the event
    segments = event.get("info", {}).get("channel", {}).get("segments")
    sub = event.get("identity", {}).get("sub", None)

    # Check if segments exist and the first segment matches the user's sub
    if not segments:
        logger.error("No segments found in event")
        return "No segments found in channel path"

    if sub != segments[1]:
        logger.warning(
            f"Unauthorized: Sub '{sub}' did not match path segment '{segments[1]}'"
        )
        return "Unauthorized"

    logger.info(f"Sub '{sub}' matched path segment '{segments[1]}'")

    return None

The function workflow consists of the following steps:

The name of the channel arrives in the event.
The user’s subject field, sub, is part of the context.
If the channel name and user identity don’t match, it doesn’t authorize the subscription and returns an error message.
Returning None indicates no errors and that the subscription is authorized.

The ChatHandler Lambda function uses the same logic to make sure users are only authorized to publish to their own inbound channel. The channel arrives in the event and the context carries the user identity.
Although our example is simple, it demonstrates how you can implement complex authorization rules using a Lambda function to authorize access to channels in AppSync Events. We have covered access control to an individual’s inbound and outbound channels. Many business models around access to LLMs involve controlling how many tokens an individual is allowed to use within some period of time. We discuss this capability in the following section.
Rate limiting and metering
Understanding and controlling the number of tokens consumed by users of an AI Gateway is important to many customers. Input and output tokens are the primary pricing mechanism for text-based LLMs in Amazon Bedrock. In our example, we use the Amazon Bedrock Converse API to access LLMs. The Converse API provides a consistent interface that works with the models that support messages. You can write code one time and use it with different models.
Part of the consistent interface is the stream metadata event. This event is emitted at the end of each stream and provides the number of tokens consumed by the stream. The following is an example JSON structure:

{
    "metadata": {
        "usage": {
            "inputTokens": 1062,
            "outputTokens": 512,
            "totalTokens": 1574
        },
        "metrics": {
            "latencyMs": 4133
        }
    }
}

We have input tokens, output tokens, total tokens, and a latency metric. To create a control with this data, we first consider the types of limits we want to implement. One approach is a monthly token limit that resets every month—a static window. Another is a daily limit based on a rolling window on 10-minute intervals. When a user exceeds their monthly limit, they must wait until the next month. After a user exceeds their daily rolling window limit, they must wait 10 minutes for more tokens to become available.
We need a way to keep atomic counters to track the token consumption, with fast real-time access to the counters with the user’s sub, and to delete old counters as they become irrelevant.
Amazon DynamoDB is a serverless, fully managed, distributed NoSQL database with single-digit millisecond performance at many scales. With DynamoDB, we can keep atomic counters, provide access to the counters keyed by the sub, and roll off old data using its time to live feature. The following diagram shows a subset of our architecture from earlier in this post that now includes a DynamoDB table to track token usage.

We can use a single DynamoDB table with the following partition and sort keys:

Partition key – user_id (String), the unique identifier for the user
Sort key – period_id (String), a composite key that identifies the time period

The user_id will receive the sub attribute from the JWT provided by Amazon Cognito. The period_id will have strings that sort lexicographically that indicate which time period the counter is for as well as the timeframe. The following are some example sort keys:

10min:2025-08-05:16:40
10min:2025-08-05:16:50
monthly:2025-08

10min or monthly indicate the type of counter. The timestamp is set to the last 10-minute window (for example, (minute // 10) * 10).
With each record, we keep the following attributes:

input_tokens – Counter for input tokens used in this 10-minute window
output_tokens – Counter for output tokens used in this 10-minute window
timestamp – Unix timestamp when the record was created or last updated
ttl – Time to live value (Unix timestamp), set to 24 hours from creation

The two token columns are incremented with the DynamoDB atomic ADD operation with each metadata event from the Amazon Bedrock Converse API. The ttl and timestamp columns are updated to indicate when the record is automatically removed from the table.
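As a sketch of that update, the following shows how the 10-minute period_id can be derived and how the counters can be incremented atomically with boto3. The table name is a placeholder, and the key and attribute names follow the schema described above; this is a minimal illustration rather than the module shipped with the sample code.

import time
import boto3
from datetime import datetime, timezone

dynamodb = boto3.client("dynamodb")
TABLE_NAME = "token-usage"  # placeholder table name

def record_10min_usage(user_id: str, input_tokens: int, output_tokens: int) -> None:
    # Floor the current time to the enclosing 10-minute window, e.g. 10min:2025-08-05:16:40.
    now = datetime.now(timezone.utc)
    window_minute = (now.minute // 10) * 10
    period_id = f"10min:{now:%Y-%m-%d}:{now.hour:02d}:{window_minute:02d}"
    dynamodb.update_item(
        TableName=TABLE_NAME,
        Key={"user_id": {"S": user_id}, "period_id": {"S": period_id}},
        # ADD increments the counters atomically; SET refreshes timestamp and ttl.
        UpdateExpression="ADD input_tokens :it, output_tokens :ot SET #ts = :now, #ttl = :ttl",
        ExpressionAttributeNames={"#ts": "timestamp", "#ttl": "ttl"},
        ExpressionAttributeValues={
            ":it": {"N": str(input_tokens)},
            ":ot": {"N": str(output_tokens)},
            ":now": {"N": str(int(time.time()))},
            ":ttl": {"N": str(int(time.time()) + 24 * 3600)},
        },
    )

A similar update against the monthly:YYYY-MM sort key maintains the static monthly counter.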
When a user sends a message, we check whether they have exceeded their daily or monthly limits.
To calculate daily usage, the meter.py module completes the following steps:

Calculates the start and end keys for the 24-hour window.
Queries records with the partition key user_id and sort key between the start and end keys.
Sums up the input_tokens and output_tokens values from the matching records.
Compares the sums against the daily limits.

See the following example code:

KeyConditionExpression: "user_id = :uid AND period_id BETWEEN :start AND :end"
ExpressionAttributeValues: {
    ":uid": {"S": "user123"},
    ":start": {"S": "10min:2025-08-04:15:30"},
    ":end": {"S": "10min:2025-08-05:15:30"}
}

This range query takes advantage of the naturally sorted keys to efficiently retrieve only the records from the last 24 hours, without filtering in the application code. The monthly usage calculation on the static window is much simpler. To check monthly usage, the system completes the following steps:

Gets the specific record with the partition key user_id and sort key monthly:YYYY-MM for the current month.
Compares the input_tokens and output_tokens values against the monthly limits.

See the following code:

Key: {
    "user_id": {"S": "user123"},
    "period_id": {"S": "monthly:2025-08"}
}

With an additional Python module and DynamoDB, we have a metering and rate limiting solution that works for both static and rolling windows.
Diverse model access
Our sample code uses the Amazon Bedrock Converse API. Not every model is included in the sample code, but many models are included for you to rapidly explore possibilities. The innovation in this area doesn’t stop at models on AWS. There are numerous ways to develop generative AI solutions at every level of abstraction. You can build on top of the layer that best suits your use case.
Swami Sivasubramanian recently wrote on how AWS is enabling customers to deliver production-ready AI agents at scale. He discusses Strands Agents, an open source AI agents SDK, as well as Amazon Bedrock AgentCore, a comprehensive set of enterprise-grade services that help developers quickly and more securely deploy and operate AI agents at scale using a framework and model, hosted on Amazon Bedrock or elsewhere.
To learn more about architectures for AI agents, refer to Strands Agents SDK: A technical deep dive into agent architectures and observability. The post discusses the Strands Agents SDK and its core features, how it integrates with AWS environments for more secure, scalable deployments, and how it provides rich observability for production use. It also provides practical use cases and a step-by-step example.
Logging
Many of our AI Gateway stakeholders are interested in logs. Developers want to understand how their applications function. System engineers need to understand operational concerns like tracking availability and capacity planning. Business owners want analytics and trends so that they can make better decisions.
With Amazon CloudWatch Logs, you can centralize the logs from your different systems, applications, and AWS services that you use in a single, highly scalable service. You can then seamlessly view them, search them for specific error codes or patterns, filter them based on specific fields, or archive them securely for future analysis. CloudWatch Logs makes it possible to see your logs, regardless of their source, as a single and consistent flow of events ordered by time.
In the sample AI Gateway architecture, CloudWatch Logs is integrated at multiple levels to provide comprehensive visibility. The following architecture diagram depicts the integration points between AppSync Events, Lambda, and CloudWatch Logs in the sample application.

AppSync Events API logging
Our AppSync Events API is configured with ERROR-level logging to capture API-level issues. This configuration helps identify issues with API requests, authentication failures, and other critical API-level problems. The logging configuration is applied during infrastructure deployment:

this.api = new appsync.EventApi(this, "Api", {
    // ... other configuration ...
    logConfig: {
        excludeVerboseContent: true,
        fieldLogLevel: appsync.AppSyncFieldLogLevel.ERROR,
        retention: logs.RetentionDays.ONE_WEEK,
    },
});

This provides visibility into API operations.
Lambda function structured logging
The Lambda functions use AWS Lambda Powertools for structured logging. The ChatHandler Lambda function implements a MessageTracker class that provides context for each conversation:

from aws_lambda_powertools import Logger

logger = Logger(service="eventhandlers")

class MessageTracker:
    """
    Tracks message state during processing to provide enhanced logging.
    Handles event type detection and processing internally.
    """

    def __init__(self, user_id, conversation_id, user_message, model_id):
        self.user_id = user_id
        self.conversation_id = conversation_id
        self.user_message = user_message
        self.assistant_response = ""
        self.input_tokens = 0
        self.output_tokens = 0
        self.model_id = model_id
        # ...

Key information logged includes:

User identifiers
Conversation identifiers for request tracing
Model identifiers to track which AI models are being used
Token consumption metrics (input and output counts)
Message previews
Detailed timestamps for time-series analysis

Each Lambda function sets a correlation ID for request tracing, making it straightforward to follow a single request through the system:

# Set correlation ID for request tracing
logger.set_correlation_id(context.aws_request_id)

Operational insights
CloudWatch Logs Insights enables SQL-like queries across log data, helping you perform the following actions:

Track token usage patterns by model or user
Monitor response times and identify performance bottlenecks
Detect error patterns and troubleshoot issues
Create custom metrics and alarms based on log data
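
The following is a minimal sketch of running such a query programmatically with boto3; the log group name and query text are assumptions for illustration, not taken from the sample application.

import time

import boto3

logs = boto3.client("logs")

query = """
fields @timestamp, model_id, input_tokens, output_tokens
| filter ispresent(model_id)
| stats sum(input_tokens) as total_input, sum(output_tokens) as total_output by model_id
"""

# Assumed log group name for illustration.
start = logs.start_query(
    logGroupName="/aws/lambda/ChatHandler",
    startTime=int(time.time()) - 3600,  # last hour
    endTime=int(time.time()),
    queryString=query,
)

# Poll until the query finishes, then print each result row as a dict.
response = logs.get_query_results(queryId=start["queryId"])
while response["status"] in ("Scheduled", "Running"):
    time.sleep(1)
    response = logs.get_query_results(queryId=start["queryId"])

for row in response["results"]:
    print({field["field"]: field["value"] for field in row})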

By implementing comprehensive logging throughout the sample AI Gateway architecture, we provide the visibility needed for effective troubleshooting, performance optimization, and operational monitoring. This logging infrastructure serves as the foundation for both operational monitoring and the analytics capabilities we discuss in the following section.
Analytics
CloudWatch Logs provides operational visibility, but for extracting business intelligence from logs, AWS offers many analytics services. With our sample AI Gateway architecture, you can use those services to transform data from your AI Gateway without requiring dedicated infrastructure or complex data pipelines.
The following architecture diagram shows the flow of data between the Lambda function, Amazon Data Firehose, Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, and Amazon Athena.

The key components include:

Data Firehose – The ChatHandler Lambda function streams structured log data to a Firehose delivery stream at the end of each completed user response. Data Firehose provides a fully managed service that automatically scales with your data throughput, alleviating the need to provision or manage infrastructure. The following code shows the API call that integrates the ChatHandler Lambda function with the delivery stream:

# From messages.py
firehose_stream = os.environ.get("FIREHOSE_DELIVERY_STREAM")
if firehose_stream:
    try:
        firehose.put_record(
            DeliveryStreamName=firehose_stream,
            Record={"Data": json.dumps(log_data) + "\n"},
        )
        logger.debug(f"Successfully sent data to Firehose stream: {firehose_stream}")
    except Exception as e:
        logger.error(f"Failed to send data to Firehose: {str(e)}")

Amazon S3 with Parquet format – Firehose automatically converts the JSON log data to columnar Parquet format before storing it in Amazon S3. Parquet improves query performance and reduces storage costs compared to raw JSON logs. The data is partitioned by year, month, and day, enabling efficient querying of specific time ranges while minimizing the amount of data scanned during queries.
AWS Glue Data Catalog – An AWS Glue database and table are created in the AWS Cloud Development Kit (AWS CDK) application to define the schema for our analytics data, including user_id, conversation_id, model_id, token counts, and timestamps. Table partitions are added as new S3 objects are stored by Data Firehose.
Athena for SQL-based analysis – With the table in the Data Catalog, business analysts can use familiar SQL through Athena to extract insights. Athena is serverless and priced per query based on the amount of data scanned, making it a cost-effective solution for one-time analysis without requiring database infrastructure. The following is an example query:

-- Example: Token usage by model
SELECT
    model_id,
    SUM(input_tokens) as total_input_tokens,
    SUM(output_tokens) as total_output_tokens,
    COUNT(*) as conversation_count
FROM firehose_database.firehose_table
WHERE year='2025' AND month='08'
GROUP BY model_id
ORDER BY total_output_tokens DESC;

This serverless analytics pipeline transforms the events flowing through AppSync Events into structured, queryable tables with minimal operational overhead. The pay-as-you-go pricing model of these services facilitates cost-efficiency, and their managed nature alleviates the need for infrastructure provisioning and maintenance. Furthermore, with your data cataloged in AWS Glue, you can use the full suite of analytics and machine learning services on AWS, such as Amazon QuickSight and Amazon SageMaker Unified Studio, with your data.
Monitoring
AppSync Events and Lambda functions send metrics to CloudWatch so you can monitor performance, troubleshoot issues, and optimize your AWS AppSync API operations effectively. For an AI Gateway, you might need more information in your monitoring system to track important metrics such as token consumption from your models.
The sample application includes a call to CloudWatch metrics to record the token consumption and LLM latency at the end of each conversation turn so operators have visibility into this data in real time. This enables metrics to be included in dashboards and alerts. Moreover, the metric data includes the LLM model identifier as a dimension so you can track token consumption and latency by model. Metrics are just one component of what we can learn about our application at runtime with CloudWatch. Because our log messages are formatted as JSON, we can perform analytics on our log data for monitoring using CloudWatch Logs Insights. The following architecture diagram illustrates the logs and metrics made available by AppSync Events and Lambda through CloudWatch and CloudWatch Logs Insights.
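The following is a minimal sketch of emitting such metrics with boto3; the namespace, metric names, and dimension name are assumptions for illustration, not the exact names used in the sample application.

import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_llm_metrics(model_id, input_tokens, output_tokens, latency_ms):
    # The model identifier is attached as a dimension so usage and latency can be sliced by model.
    dimensions = [{"Name": "ModelId", "Value": model_id}]
    cloudwatch.put_metric_data(
        Namespace="AIGateway",  # assumed namespace
        MetricData=[
            {"MetricName": "InputTokens", "Dimensions": dimensions, "Value": input_tokens, "Unit": "Count"},
            {"MetricName": "OutputTokens", "Dimensions": dimensions, "Value": output_tokens, "Unit": "Count"},
            {"MetricName": "LLMLatency", "Dimensions": dimensions, "Value": latency_ms, "Unit": "Milliseconds"},
        ],
    )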

For example, the following query against the sample application’s log groups shows us the users with the most conversations within a given time window:

fields @timestamp, @message
| filter @message like "Message complete"
| stats count_distinct(conversation_id) as conversation_count by user_id
| sort conversation_count desc
| limit 10

@timestamp and @message are standard CloudWatch Logs Insights fields for Lambda logs. On line 3, we compute the number of unique conversation identifiers for each user. Thanks to the JSON formatting of the messages, we don't need to provide parsing instructions to read these fields. The Message complete log message is found in packages/eventhandlers/eventhandlers/messages.py in the sample application.
The following query example shows the number of unique users using the system for a given window:

fields @timestamp, @message
| filter @message like "Message complete"
| stats count_distinct(user_id) as unique_users by bin(5m)

Again, we filter for Message complete, compute unique statistics on the user_id field from our JSON messages, and then emit the data as a time series with 5-minute intervals with the bin function.
Caching (prepared responses)
Many AI Gateways provide a cache mechanism for assistant messages. This is appropriate when large numbers of users ask exactly the same questions and need exactly the same answers, which can yield considerable cost savings for a busy application. A good candidate for caching might be a question about the weather: for "Is it going to rain in NYC today?", everyone should see the same response. A bad candidate is one where users might ask the same thing but receive private information in return, such as "How many vacation hours do I have right now?" Take care to use this idea safely in your area of work. A basic cache implementation is included in the sample to help you get started with this mechanism. Caches in conversational AI require careful handling to make sure information doesn't leak between users, and given the amount of context an LLM can use to tailor a response, they should be used judiciously.
The following architecture diagram shows the use of DynamoDB as a storage mechanism for prepared responses in the sample application.

The sample application computes a hash on the user message to query a DynamoDB table with stored messages. If there is a message available for a hash key, the application returns the text to the user, the custom metrics record a cache hit in CloudWatch, and an event is passed back to AppSync Events to notify the application the response is complete. This encapsulates the cache behavior completely within the event structure the application understands.
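The following is a minimal sketch of that lookup with boto3; the table name, key attribute, and response attribute are assumptions for illustration rather than the exact names from the sample application.

import hashlib

import boto3

dynamodb = boto3.client("dynamodb")

def get_cached_response(user_message, table_name="prepared_responses"):
    # Normalize and hash the message so identical questions map to the same cache key.
    message_hash = hashlib.sha256(user_message.strip().lower().encode("utf-8")).hexdigest()
    result = dynamodb.get_item(
        TableName=table_name,  # assumed table name
        Key={"message_hash": {"S": message_hash}},
    )
    item = result.get("Item")
    if item:
        # Cache hit: the caller records the metric and emits the completion event to AppSync Events.
        return item["response_text"]["S"]
    return None  # Cache miss: fall through to the model call.
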
Install the sample application
Refer to the README file on GitHub for instructions to install the sample application. Both install and uninstall are driven by a single command to deploy or un-deploy the AWS CDK application.
Sample pricing
The following list estimates the monthly cost of the sample application with light usage in a development environment. Actual costs will vary based on how you use the services for your use case.

AWS Glue – Objects stored: no additional cost for the first 1 million objects stored, then $1.00 per 100,000 objects per month. Sample usage: less than 1 million objects. Pricing: https://aws.amazon.com/glue/pricing/
Amazon DynamoDB – Write request units: $0.625 per million; read request units: $0.125 per million. Sample usage: $2.00. Pricing: https://aws.amazon.com/dynamodb/pricing/on-demand/
Amazon S3 (Standard) – Storage: $0.023 per GB for the first 50 TB per month. Sample usage: $2.00. Pricing: https://aws.amazon.com/s3/pricing/
AppSync Events – Event API operations: $1.00 per million; connection minutes: $0.08 per million. Sample usage: $1.00. Pricing: https://aws.amazon.com/appsync/pricing/
Amazon Data Firehose – Ingestion: $0.029 per GB for the first 500 TB per month; format conversion: $0.018 per GB. Sample usage: $1.00. Pricing: https://aws.amazon.com/firehose/pricing/
AWS Lambda – Requests: $0.20 per 1 million; duration: $0.0000000067 per GB-second. Sample usage: $1.00. Pricing: https://aws.amazon.com/lambda/pricing/
Amazon Bedrock – Input tokens: $3.00 per 1 million (Anthropic Claude 4 Sonnet); output tokens: $15.00 per 1 million. Sample usage: $20.00 to $40.00. Pricing: https://aws.amazon.com/bedrock/pricing/
AWS WAF – Base: $5.00 per month; rules: $1.00 per rule per month; requests: $0.60 per 1 million. Sample usage: $8.00. Pricing: https://aws.amazon.com/waf/pricing/
Amazon Cognito – Monthly active users: no additional cost for the first 10,000 users. Sample usage: $0.00. Pricing: https://aws.amazon.com/cognito/pricing/
Amazon CloudFront – Requests and data transfer out: see pricing page. Sample usage: $0.00. Pricing: https://aws.amazon.com/cloudfront/pricing/
Amazon CloudWatch – Logs and metrics: see pricing page. Sample usage: $0.00. Pricing: https://aws.amazon.com/cloudwatch/pricing/


With light development use, the monthly cost of the sample application is expected to be approximately $35 to $55.
Sample UI
The following screenshots showcase the sample UI. It provides a conversation window on the right and a navigation bar on the left. The UI features the following key components:

A Token Usage section is displayed and updated with each turn of the conversation
The New Chat option clears the messages from the chat interface so the user can start a new session
The model selector dropdown menu shows the available models

The following screenshot shows the chat interface of the sample application.

The following screenshot shows the model selection menu.

Conclusion
As the AI landscape evolves, you need an infrastructure that adapts as quickly as the models themselves. By centering your architecture around AppSync Events and the serverless patterns we’ve covered—including Amazon Cognito based identity authentication, DynamoDB powered metering, CloudWatch observability, and Athena analytics—you can build a foundation that grows with your needs. The sample application presented in this post gives you a starting point that demonstrates real-world patterns, helping developers explore AI integration, architects design enterprise solutions, and technical leaders evaluate approaches.
The complete source code and deployment instructions are available in the GitHub repo. To get started, deploy the sample application and explore the nine architectures in action. You can customize the authorization logic to match your organization’s requirements and extend the model selection to include your preferred models on Amazon Bedrock. Share your implementation insights with your organization, and leave your feedback and questions in the comments.

About the authors

Archie Cowan is a Senior Prototype Developer on the AWS Industries Prototyping and Cloud Engineering team. He joined AWS in 2022 and has developed software for companies in the Automotive, Energy, Technology, and Life Sciences industries. Before AWS, he led the architecture team at ITHAKA, where he made contributions to the search engine on jstor.org and increased production deployment velocity from 12 to 10,000 releases per year over the course of his tenure there. You can find more of his writing on topics such as coding with AI at fnjoin.com and x.com/archiecowan.

How Totogi automated change request processing with Totogi BSS Magic a …

This post is cowritten by Nikhil Mathugar, Marc Breslow and Sudhanshu Sinha from Totogi.
This blog post describes how Totogi automates change request processing. Totogi is an AI company focused on helping telecom (telco) companies innovate, accelerate growth, and adopt AI at scale. BSS Magic, Totogi's flagship product, connects and models telco business operations, overlaying legacy systems with an AI layer. With BSS Magic, telcos can extend, customize, and modernize their systems without vendor dependencies or lengthy implementations. By partnering with the AWS Generative AI Innovation Center and using the rapid innovation capabilities of Amazon Bedrock, we accelerated the development of BSS Magic, helping Totogi's customers innovate faster and gain more control over their tech stack.
In this post, we explore the challenges associated with traditional business support systems (BSS) and the innovative solutions provided by Totogi BSS Magic. We introduce the intricacies of telco ontologies and the multi-agent framework that powers automated change request processing. Additionally, we outline the orchestration of AI agents and the benefits of this approach for telecom operators and beyond.
Challenges with BSS
BSS stacks are notoriously difficult to manage. A typical stack consists of hundreds of different applications from various vendors. These applications are difficult to integrate, either restricting telcos to the vendor's ecosystem or requiring them to invest in costly customizations. Such customizations are slow and resource-intensive because of their reliance on specialized engineering talent.
Each change request necessitates a thorough analysis of potential impacts across interconnected modules, consuming significant time and effort. Even small updates can involve multiple rounds of coding, testing, and reconfiguration to achieve stability. For telecom operators, where system reliability is critical, these safeguards are non-negotiable, but they come at a steep price. This process is further complicated by the scarcity of engineers with the necessary expertise, driving up costs and elongating timelines. As a result, development cycles for new features or services often take months to complete, leaving operators struggling to meet the demands of a fast-moving market.
Initiatives like TM Forum’s Open Digital Architecture (ODA) aim to solve this, yet most vendors are slow to adopt such open standards. This dynamic amplifies technical debt and inflates operational expenses.
BSS Magic solution overview
Totogi BSS Magic reduces the complexity using AI-generated interoperability, which helps simplify integrations, customizations, and application development. BSS Magic has two key aspects:

A telco ontology that understands the semantic meanings of data structures and the relationships between them, linking disparate data into a coherent network of knowledge.
A multi-agent framework for fully automated change requests (CRs), which reduces CR processing time from 7 days to a few hours.

Telco ontology: The key to interoperability
Ontologies serve as semantic blueprints that detail concepts, relationships, and domain knowledge. In telecom, this means translating the BSS landscape into a clear, reusable, and interoperable ecosystem. Totogi’s telco ontology facilitates a deep understanding of data interaction and seamless integration across any vendor or system. By adopting FAIR principles (Findability, Accessibility, Interoperability, and Reusability), the ontology-driven architecture turns static, siloed data into dynamic, interconnected knowledge assets—unlocking trapped data and accelerating innovation. An overview diagram of the ontology is provided in the following figure.

Multi-agent framework for automated change request processing
AI agents are advanced software applications trained to perform specific tasks autonomously. Totogi’s BSS Magic AI agents have extensive domain knowledge and use this understanding to manage complex data interactions across multiple vendor systems. These agents automatically generate and test telco-grade code, replacing traditional integrations and customizations with intelligent, AI generated applications. At its core, BSS Magic uses a multi-agent AI approach with feedback loops to automate the entire software development pipeline. Each agent is designed to fulfill a specific role in the development pipeline:

Business analysis agent translates unstructured requirements into formal business specifications.
Technical architect agent takes these business specs and defines technical architectures, APIs, and dependencies.
Developer agent generates high-quality, deployable code, complete with modular designs and optimizations.
QA agent validates the code for adherence to best practices, improving quality and security. It provides feedback which is used by the developer agent to update the code.
Tester agent generates robust unit test cases, streamlining validation and deployment. The result of the test cases is used by the developer agent to improve the code.

An overview of the system is provided in the following figure.

This integrated pipeline reduces the time to complete a change request from 7 days to a few hours, with minimal human intervention. The prerequisites for implementing the system include an AWS account with access to Amazon Bedrock, AWS Step Functions, AWS Lambda, and configured AWS credentials. The AI agents are implemented using Anthropic Claude large language models (LLMs) through Amazon Bedrock. State management and workflow coordination are handled by Step Functions for reliable progression through each stage. The AWS infrastructure provides the enterprise-grade reliability, security, and scalability essential for telco-grade solutions.
To build the framework, Totogi collaborated with the AWS Generative AI Innovation Center (GenAIIC). GenAIIC offered access to AI expertise, industry-leading talent, and a rigorous iterative process to optimize the AI agents and code-generation workflows. It also provided guidance on prompt engineering, Retrieval Augmented Generation (RAG), model selection, automated code review, feedback loops, robust performance metrics for evaluating AI-generated outputs, and so on. The collaboration helped establish methods for maintaining reliability while scaling automation across the platform. The solution orchestrates multiple specialized AI agents to handle the complete software development lifecycle, from requirements analysis to test execution. The details of the AI agents are given in the following sections.
Multi-agent orchestration layer
The orchestration layer coordinates specialized AI agents through a combination of Step Functions and Lambda functions. Each agent maintains context through RAG and few-shot prompting techniques to generate accurate domain-specific outputs. The system manages agent communication and state transitions while maintaining a comprehensive audit trail of decisions and actions.
Business analysis generation
The Business Analyst agent uses Claude’s natural language understanding capabilities to process statement of work (SOW) documents and acceptance criteria. It extracts key requirements using custom prompt templates optimized for telecom BSS domain knowledge. The agent generates structured specifications for downstream processing while maintaining traceability between business requirements and technical implementations.
Technical architecture generation
The Technical Architect agent transforms business requirements into concrete AWS service configurations and architectural patterns. It generates comprehensive API specifications and data models and incorporates AWS Well-Architected principles. The agent validates architectural decisions against established patterns and best practices, producing infrastructure-as-code templates for automated deployment.
Code generation pipeline
The Developer agent converts technical specifications into implementation code using Claude’s advanced code generation capabilities. It produces robust, production-ready code that includes proper error handling and logging mechanisms. The pipeline incorporates feedback from validation steps to iteratively improve code quality and maintain consistency with AWS best practices.
Automated quality assurance
The QA agent is built using Claude to perform comprehensive code analysis and validation. It evaluates code quality and identifies potential performance issues. The system maintains continuous feedback loops with the development stage, facilitating rapid iteration and improvement of generated code based on quality metrics and best practices adherence. The QA process consists of carefully crafted prompts.
QA code analysis prompt:

"You are a senior QA backend engineer analyzing Python code for serverless applications.
Your task is to:
Compare requirements against implemented code
Identify missing features
Suggest improvements in code quality and efficiency
Provide actionable feedback
Focus on overall implementation versus minor details
Consider serverless best practices"

This prompt helps the QA agent perform thorough code analysis, evaluate quality metrics, and maintain continuous feedback loops with development stages.
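The post does not show the invocation code itself, but because the agents run on Anthropic Claude models through Amazon Bedrock, a QA review call could look roughly like the following sketch. The model ID, function name, and message layout are assumptions for illustration.

import boto3

bedrock = boto3.client("bedrock-runtime")

QA_SYSTEM_PROMPT = (
    "You are a senior QA backend engineer analyzing Python code for serverless applications. "
    "Compare requirements against implemented code, identify missing features, suggest improvements "
    "in code quality and efficiency, and provide actionable feedback."
)

def run_qa_review(requirements, generated_code, model_id="anthropic.claude-3-5-sonnet-20240620-v1:0"):
    # model_id is an assumed example; use the Claude model configured for your account.
    response = bedrock.converse(
        modelId=model_id,
        system=[{"text": QA_SYSTEM_PROMPT}],
        messages=[{
            "role": "user",
            "content": [{"text": f"Requirements:\n{requirements}\n\nImplemented code:\n{generated_code}"}],
        }],
        inferenceConfig={"maxTokens": 2048, "temperature": 0.2},
    )
    # The QA feedback text is returned to the orchestration layer and fed back to the Developer agent.
    return response["output"]["message"]["content"][0]["text"]
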
Test automation framework
The Tester agent creates comprehensive test suites that verify both functional and non-functional requirements. It uses Claude to understand test contexts and generate appropriate test scenarios. The framework manages test refinement through evaluation cycles, achieving complete coverage of business requirements while maintaining test code quality and reliability. The testing framework uses a multi-stage prompt approach.
Initial test structure prompt:

"As a senior QA engineer, create a pytest-based test structure including:
Detailed test suite organization
Resource configurations
Test approach and methodology
Required imports and dependencies"

Test implementation prompt:

"Generate complete pytest implementation including:
Unit tests for each function
Integration tests for API endpoints
AWS service mocking
Edge case coverage
Error scenario handling"

Test results analysis prompt:

"Evaluate test outputs and coverage reports to:
Verify test completion status
Track test results and outcomes
Measure coverage metrics
Provide actionable feedback"

This structured approach leads to comprehensive test coverage while maintaining high quality standards. The framework currently achieves 76% code coverage and successfully validates both functional and non-functional requirements.
The Tester agent provides a feedback loop to the Developer agent to improve the code.
Conclusion
The integration of Totogi BSS Magic with Amazon Bedrock presents a comprehensive solution for modern telecom operators. Some takeaways for you to consider:

End-to-end automation: BSS Magic automates the entire development lifecycle—from idea to deployment. AI agents handle everything from requirements, architecture, and code generation to testing and validation.
Results: The agentic framework significantly boosted efficiency, reducing change request processing from seven days to a few hours. The automated testing framework achieved 76% code coverage, consistently delivering high-quality telecom-grade code.
Unique value for telecom operators: By using Totogi BSS Magic, telecom operators can accelerate time-to-market and reduce operational costs. BSS Magic uses autonomous AI, independently managing complex tasks so telecom operators can concentrate on strategic innovation. The solution is supported by Amazon Bedrock, which offers scalable AI models and infrastructure, high-level security and reliability critical for telecom.
Impact to other industries: While BSS Magic is geared towards the telecom industry, the multi-agent framework can be repurposed for general software development across other industries.
Future work: Future enhancements will focus on expanding the model’s domain knowledge in telecom and other domains. Another possible extension is to integrate an AI model to predict potential issues in change requests based on historical data, thereby preemptively addressing common pitfalls.

Any feedback and questions are welcome in the comments below. Contact us to engage AWS Generative AI Innovation Center or to learn more.

About the authors
Nikhil Mathugar is a Presales Full Stack Engineer at Totogi, where he designs and implements scalable AWS-based proofs-of-concept across Python and modern JavaScript frameworks. He has over a decade of experience in architecting and maintaining large-scale systems—including web applications, multi-region streaming infrastructures and high-throughput automation pipelines. Building on that foundation, he’s deeply invested in AI—specializing in generative AI, agentic workflows and integrating large-language models to evolve Totogi’s BSS Magic platform.
Marc Breslow is Field CTO of Totogi, where he is utilizing AI to revolutionize the telecommunications industry. A veteran of Accenture, Lehman Brothers, and Citibank, Marc has a proven track record of building scalable, high-performance systems. At Totogi, he leads the development of AI-powered solutions that drive tangible results for telcos: reducing churn, increasing Average Revenue Per user (ARPU), and streamlining business processes. Marc is responsible for customer proof points demonstrating these capabilities. When not engaging with customers, Marc leads teams building Totogi’s BSS Magic technology, generating applications and improving efficiency using AI agents and workflows.
Sudhanshu Sinha is Chief Technology Officer and a founding team member at Totogi, where he works alongside Acting CEO Danielle Rios to drive the telecom industry’s shift to AI-native software. As the key strategist behind BSS Magic, he shaped its architecture, go-to-market, and early adoption—translating AI-native principles into measurable value for operators. He also helped define Totogi’s Telco Ontology, enabling interoperability and automation across complex BSS landscapes. With over two decades in telecommunications, Sudhanshu blends deep technical insight with commercial acumen to make AI-driven transformation practical and profitable for telcos worldwide.
Parth Patwa is a Data Scientist at the AWS Generative AI Innovation Center, where he works on customer projects using Generative AI and LLMs. He has an MS from University of California Los Angeles. He has published papers in top-tier ML and NLP venues, and has over 1000 citations.
Mofijul Islam is an Applied Scientist II and Tech Lead at the AWS Generative AI Innovation Center, where he helps customers tackle customer-centric research and business challenges using generative AI, large language models (LLM), multi-agent learning, code generation, and multimodal learning. He holds a PhD in machine learning from the University of Virginia, where his work focused on multimodal machine learning, multilingual NLP, and multitask learning. His research has been published in top-tier conferences like NeurIPS, ICLR, AISTATS, and AAAI, as well as IEEE and ACM Transactions.
Andrew Ang is a Senior ML Engineer with the AWS Generative AI Innovation Center, where he helps customers ideate and implement generative AI proof of concept projects. Outside of work, he enjoys playing squash and watching competitive cooking shows.
Shinan Zhang is an Applied Science Manager at the AWS Generative AI Innovation Center. With over a decade of experience in ML and NLP, he has worked with large organizations from diverse industries to solve business problems with innovative AI solutions, and bridge the gap between research and industry applications.

StepFun AI Introduce Step-DeepResearch: A Cost-Effective Deep Research …

StepFun has introduced Step-DeepResearch, a 32B parameter end to end deep research agent that aims to turn web search into actual research workflows with long horizon reasoning, tool use and structured reporting. The model is built on Qwen2.5-32B-Base and is trained to act as a single agent that plans, explores sources, verifies evidence and writes reports with citations, while keeping inference cost low.

From Search to Deep Research

Most existing web agents are tuned for multi-hop question-answering benchmarks. They try to match ground truth answers for short questions. This is closer to targeted retrieval than to real research. Deep research tasks are different. They involve latent intent recognition, long horizon decision making, multi-turn tool use, structured reasoning and cross-source verification under uncertainty.

Step-DeepResearch reframes this as sequential decision making over a compact set of atomic capabilities. The research team defines 4 atomic capabilities: planning and task decomposition, deep information seeking, reflection and verification, and professional report generation. Instead of orchestrating many external agents, the system internalizes this loop into a single model that decides the next action at each step.

Data Synthesis around Atomic Capabilities

To teach these atomic capabilities, the research team builds separate data pipelines for each skill. For planning, they start from high quality technical reports, survey papers and financial analysis documents. They reverse-engineer realistic research plans and task trees from titles, abstracts and structure, then generate trajectories that follow these plans. This exposes the model to long horizon project structures, not only short question templates.

For deep information seeking, they construct graph based queries over knowledge graphs such as Wikidata5m and CN-DBpedia. They sample subgraphs, expand them using search, and synthesize questions that require multi hop reasoning across entities and documents. A separate pipeline uses a Wiki style hyperlink index to force cross document retrieval and combination of evidence. Easy questions that a strong model can already solve with a simple ReAct style strategy are filtered out, so training focuses on hard search problems.

Reflection and verification data is generated through self-correction loops and multi-agent teacher traces. Teacher agents extract claims, plan checks, verify facts, replan if inconsistencies appear, and only then write reports. The resulting trajectories are cleaned and used as supervision for a single student agent. Report generation is trained in 2 phases: mid training for domain style and depth using query-report pairs, then supervised fine-tuning with strict formatting and plan consistency constraints.

Progressive Training on Qwen2.5-32B-Base

The training pipeline has 3 stages: agentic mid-training, supervised fine-tuning and reinforcement learning. In mid-training stage 1, the team injects atomic capabilities without tools, using context lengths up to 32k tokens. The data covers active reading, synthetic reasoning traces, summarization and reflection. The research team shows steady gains on SimpleQA, TriviaQA and FRAMES as training scales up to about 150B tokens, with the largest gains on FRAMES, which stresses structured reasoning.

In stage-2, the context extends to 128k tokens and explicit tool calls are introduced. The model learns tasks such as URL based question-answering, deep web search, long document summarization and long dialogue reasoning. This stage aligns the model with real research scenarios where search, browsing and analysis must be mixed in one trajectory.

During supervised fine-tuning, the 4 atomic capabilities are composed into full deep search and deep research traces. Data cleaning keeps trajectories that are correct and short in terms of steps and tool calls. The pipeline injects controlled tool errors followed by correction to improve robustness, and enforces citation formats so that reports stay grounded in the retrieved sources.

Reinforcement learning then optimizes the agent in a real tool environment. The research team builds tasks and checklists through reverse synthesis, and trains a checklist style Rubrics Judge to score reports along fine grained dimensions. The reward design converts ternary rubric labels into asymmetric binary rewards that capture both positive targets and violations. The policy is trained with PPO and a learned critic, using generalized advantage estimation with near zero discount so that long trajectories are not truncated.

Single Agent ReAct Architecture and Search Stack

At inference time, Step-DeepResearch runs as a single ReAct style agent that alternates thinking, tool calls and observations until it decides to output a report. The tool set includes batch web search, a todo manager, shell commands and file operations. Execution runs in a sandbox with terminal persistence through tmux. A perception oriented browser reduces redundant page captures by using perceptual hash distance. Tools for document parsing, audio transcription and image analysis support multimodal inputs.

Information acquisition uses 2 related resources. The StepFun team states that its Search API is grounded in more than 20M high quality papers and 600 premium indices. The research team then describes a curated authority indexing strategy that isolates more than 600 trusted domains, including government, academic and institutional sites. Retrieval operates at paragraph level and uses authority aware ranking so that high trust domains are preferred when relevance is similar.

The file tools support patch based editing, so the agent can update only modified sections of a report. A summary aware storage scheme writes full tool outputs to local files and injects only compact summaries into the context. This acts as external memory and avoids context overflow for long projects.
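
The exact mechanism is not shown in the article, but the idea can be illustrated with a rough sketch in which the full tool output is written to a local file that serves as external memory and only a compact summary is injected into the context. The file layout and truncation rule below are assumptions.

import hashlib
from pathlib import Path

WORKDIR = Path("tool_outputs")
WORKDIR.mkdir(exist_ok=True)

def store_and_summarize(tool_name, output, max_chars=500):
    # Persist the full output so the agent can re-read it later with its file tools.
    key = hashlib.sha1(output.encode("utf-8")).hexdigest()[:12]
    path = WORKDIR / f"{tool_name}_{key}.txt"
    path.write_text(output, encoding="utf-8")

    # Only a short preview plus a pointer to the file enters the model context.
    summary = output[:max_chars] + ("..." if len(output) > max_chars else "")
    return f"[{tool_name} output saved to {path}; showing first {max_chars} characters]\n{summary}"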

Evaluation, Cost and Access

To measure deep research behavior, the team introduces ADR-Bench, a Chinese benchmark with 110 open ended tasks across 9 domains. 70 tasks cover general domains such as education, science and engineering, and social life, evaluated by expert side-by-side comparison. 40 tasks in finance and law are scored with explicit rubrics that follow atomicity and verifiability constraints.

On Scale AI Research Rubrics, Step-DeepResearch reaches 61.42 percent rubric compliance, which is comparable to OpenAI-DeepResearch and Gemini-DeepResearch, and clearly ahead of multiple open and proprietary baselines. On ADR-Bench, expert-based Elo ratings show that the 32B model outperforms larger open models such as MiniMax-M2, GLM-4.6 and DeepSeek-V3.2, and is competitive with systems like Kimi-Researcher and MiniMax-Agent-Pro.

Key Takeaways

Single agent, atomic capability design: Step-DeepResearch is a 32B parameter single agent built on Qwen2.5-32B-Base. Instead of relying on many external agents, it internalizes 4 atomic capabilities: planning, deep information seeking, reflection and verification, and professional report generation.

Targeted data synthesis for each skill: The research team builds separate data pipelines for planning, deep information seeking, reflection and report writing, using reverse-engineered plans from real reports, graph-based queries over Wikidata5m and CN-DBpedia, multi-agent teacher traces and strict report formatting data.

Three stage training with long context and RL: Training uses mid training, supervised fine-tuning and reinforcement learning, with mid training up to 150B tokens at 32k and then 128k context, SFT composes full deep research trajectories, and PPO based RL with a Rubrics Judge optimizes reports against fine grained checklists.

ReAct architecture with curated search and external memory: At inference time the model runs a ReAct loop that calls tools for batch web search, todo, shell and file operations, uses a Search API grounded in more than 20M papers and 600 premium indices along with 600+ trusted domains, and relies on patch editing and summary aware storage to act as external memory.

Competitive quality with lower cost: On Scale AI Research Rubrics the model reaches 61.42 percent rubric compliance and is competitive with OpenAI-DeepResearch and Gemini-DeepResearch; on ADR-Bench it achieves a 67.1 percent win or tie rate against strong baselines.


A Coding Implementation to Automating LLM Quality Assurance with DeepE …

We initiate this tutorial by configuring a high-performance evaluation environment, specifically focused on integrating the DeepEval framework to bring unit-testing rigor to our LLM applications. By bridging the gap between raw retrieval and final generation, we implement a system that treats model outputs as testable code and uses LLM-as-a-judge metrics to quantify performance. We move beyond manual inspection by building a structured pipeline in which every query, retrieved context, and generated response is validated against rigorous academic-standard metrics. Check out the FULL CODES here.

import sys, os, textwrap, json, math, re
from getpass import getpass

print(" Hardening environment (prevents common Colab/py3.12 numpy corruption)...")

!pip -q uninstall -y numpy || true
!pip -q install --no-cache-dir --force-reinstall "numpy==1.26.4"

!pip -q install -U deepeval openai scikit-learn pandas tqdm

print(" Packages installed.")

import numpy as np
import pandas as pd
from tqdm.auto import tqdm

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from deepeval import evaluate
from deepeval.test_case import LLMTestCase, LLMTestCaseParams
from deepeval.metrics import (
    AnswerRelevancyMetric,
    FaithfulnessMetric,
    ContextualRelevancyMetric,
    ContextualPrecisionMetric,
    ContextualRecallMetric,
    GEval,
)

print(" Imports loaded successfully.")

OPENAI_API_KEY = getpass(" Enter OPENAI_API_KEY (leave empty to run without OpenAI): ").strip()
openai_enabled = bool(OPENAI_API_KEY)

if openai_enabled:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
print(f" OpenAI enabled: {openai_enabled}")

We initialize our environment by stabilizing core dependencies and installing the deepeval framework to ensure a robust testing pipeline. Next, we import specialized metrics like Faithfulness and Contextual Recall while configuring our API credentials to enable automated, high-fidelity evaluation of our LLM responses. Check out the FULL CODES here.

DOCS = [
    {
        "id": "doc_01",
        "title": "DeepEval Overview",
        "text": (
            "DeepEval is an open-source LLM evaluation framework for unit testing LLM apps. "
            "It supports LLM-as-a-judge metrics, custom metrics like G-Eval, and RAG metrics "
            "such as contextual precision and faithfulness."
        ),
    },
    {
        "id": "doc_02",
        "title": "RAG Evaluation: Why Faithfulness Matters",
        "text": (
            "Faithfulness checks whether the answer is supported by retrieved context. "
            "In RAG, hallucinations occur when the model states claims not grounded in context."
        ),
    },
    {
        "id": "doc_03",
        "title": "Contextual Precision",
        "text": (
            "Contextual precision evaluates how well retrieved chunks are ranked by relevance "
            "to a query. High precision means relevant chunks appear earlier in the ranked list."
        ),
    },
    {
        "id": "doc_04",
        "title": "Contextual Recall",
        "text": (
            "Contextual recall measures whether the retriever returns enough relevant context "
            "to answer the query. Low recall means key information was missed in retrieval."
        ),
    },
    {
        "id": "doc_05",
        "title": "Answer Relevancy",
        "text": (
            "Answer relevancy measures whether the generated answer addresses the user's query. "
            "Even grounded answers can be irrelevant if they don't respond to the question."
        ),
    },
    {
        "id": "doc_06",
        "title": "G-Eval (GEval) Custom Rubrics",
        "text": (
            "G-Eval lets you define evaluation criteria in natural language. "
            "It uses an LLM judge to score outputs against your rubric (e.g., correctness, tone, policy)."
        ),
    },
    {
        "id": "doc_07",
        "title": "What a DeepEval Test Case Contains",
        "text": (
            "A test case typically includes input (query), actual_output (model answer), "
            "expected_output (gold answer), and retrieval_context (ranked retrieved passages) for RAG."
        ),
    },
    {
        "id": "doc_08",
        "title": "Common Pitfall: Missing expected_output",
        "text": (
            "Some RAG metrics require expected_output in addition to input and retrieval_context. "
            "If expected_output is None, evaluation fails for metrics like contextual precision/recall."
        ),
    },
]

EVAL_QUERIES = [
    {
        "query": "What is DeepEval used for?",
        "expected": "DeepEval is used to evaluate and unit test LLM applications using metrics like LLM-as-a-judge, G-Eval, and RAG metrics.",
    },
    {
        "query": "What does faithfulness measure in a RAG system?",
        "expected": "Faithfulness measures whether the generated answer is supported by the retrieved context and avoids hallucinations not grounded in that context.",
    },
    {
        "query": "What does contextual precision mean?",
        "expected": "Contextual precision evaluates whether relevant retrieved chunks are ranked higher than irrelevant ones for a given query.",
    },
    {
        "query": "What does contextual recall mean in retrieval?",
        "expected": "Contextual recall measures whether the retriever returns enough relevant context to answer the query, capturing key missing information issues.",
    },
    {
        "query": "Why might an answer be relevant but still low quality in RAG?",
        "expected": "An answer can address the question (relevant) but still be low quality if it is not grounded in retrieved context or misses important details.",
    },
]

We define a structured knowledge base consisting of documentation snippets that serve as our ground-truth context for the RAG system. We also establish a set of evaluation queries and corresponding expected outputs to create a “gold dataset,” enabling us to assess how accurately our model retrieves information and generates grounded responses. Check out the FULL CODES here.

class TfidfRetriever:
    def __init__(self, docs):
        self.docs = docs
        self.texts = [f"{d['title']}\n{d['text']}" for d in docs]
        self.vectorizer = TfidfVectorizer(stop_words="english", ngram_range=(1, 2))
        self.matrix = self.vectorizer.fit_transform(self.texts)

    def retrieve(self, query, k=4):
        qv = self.vectorizer.transform([query])
        sims = cosine_similarity(qv, self.matrix).flatten()
        top_idx = np.argsort(-sims)[:k]
        results = []
        for i in top_idx:
            results.append(
                {
                    "id": self.docs[i]["id"],
                    "score": float(sims[i]),
                    "text": self.texts[i],
                }
            )
        return results

retriever = TfidfRetriever(DOCS)

We implement a custom TF-IDF Retriever class that transforms our documentation into a searchable vector space using bigram-aware TF-IDF vectorization. This allows us to perform cosine similarity searches against the knowledge base, ensuring we can programmatically fetch the top-k most relevant text chunks for any given query. Check out the FULL CODES here.

def extractive_baseline_answer(query, retrieved_contexts):
    """
    Offline fallback: we create a short answer by extracting the most relevant sentences.
    This keeps the notebook runnable even without OpenAI.
    """
    joined = "\n".join(retrieved_contexts)
    sents = re.split(r"(?<=[.!?])\s+", joined)
    keywords = [w.lower() for w in re.findall(r"[a-zA-Z]{4,}", query)]
    scored = []
    for s in sents:
        s_l = s.lower()
        score = sum(1 for k in keywords if k in s_l)
        if len(s.strip()) > 20:
            scored.append((score, s.strip()))
    scored.sort(key=lambda x: (-x[0], -len(x[1])))
    best = [s for sc, s in scored[:3] if sc > 0]
    if not best:
        best = [s.strip() for s in sents[:2] if len(s.strip()) > 20]
    ans = " ".join(best).strip()
    if not ans:
        ans = "I could not find enough context to answer confidently."
    return ans

def openai_answer(query, retrieved_contexts, model="gpt-4.1-mini"):
    """
    Simple RAG prompt for demonstration. DeepEval metrics can still evaluate even if
    your generation prompt differs; the key is we store retrieval_context separately.
    """
    from openai import OpenAI
    client = OpenAI()

    context_block = "\n\n".join([f"[CTX {i+1}]\n{c}" for i, c in enumerate(retrieved_contexts)])
    prompt = f"""You are a concise technical assistant.
Use ONLY the provided context to answer the query. If the answer is not in context, say you don't know.

Query:
{query}

Context:
{context_block}

Answer:"""
    resp = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,
    )
    return resp.choices[0].message.content.strip()

def rag_answer(query, retrieved_contexts):
    if openai_enabled:
        try:
            return openai_answer(query, retrieved_contexts)
        except Exception as e:
            print(f" OpenAI generation failed, falling back to extractive baseline. Error: {e}")
            return extractive_baseline_answer(query, retrieved_contexts)
    else:
        return extractive_baseline_answer(query, retrieved_contexts)

We implement a hybrid answering mechanism that prioritizes high-fidelity generation via OpenAI while maintaining a keyword-based extractive baseline as a reliable fallback. By isolating the retrieval context from the final generation, we ensure our DeepEval test cases remain consistent regardless of whether the answer is synthesized by an LLM or extracted programmatically. Check out the FULL CODES here.

print("\n Running RAG to create test cases...")

test_cases = []
K = 4

for item in tqdm(EVAL_QUERIES):
    q = item["query"]
    expected = item["expected"]

    retrieved = retriever.retrieve(q, k=K)
    retrieval_context = [r["text"] for r in retrieved]

    actual = rag_answer(q, retrieval_context)

    tc = LLMTestCase(
        input=q,
        actual_output=actual,
        expected_output=expected,
        retrieval_context=retrieval_context,
    )
    test_cases.append(tc)

print(f" Built {len(test_cases)} LLMTestCase objects.")

print("\n Metrics configured.")

metrics = [
    AnswerRelevancyMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    FaithfulnessMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    ContextualRelevancyMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    ContextualPrecisionMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),
    ContextualRecallMetric(threshold=0.5, model="gpt-4.1", include_reason=True, async_mode=True),

    GEval(
        name="RAG Correctness Rubric (GEval)",
        criteria=(
            "Score the answer for correctness and usefulness. "
            "The answer must directly address the query, must not invent facts not supported by context, "
            "and should be concise but complete."
        ),
        evaluation_params=[
            LLMTestCaseParams.INPUT,
            LLMTestCaseParams.ACTUAL_OUTPUT,
            LLMTestCaseParams.EXPECTED_OUTPUT,
            LLMTestCaseParams.RETRIEVAL_CONTEXT,
        ],
        model="gpt-4.1",
        threshold=0.5,
        async_mode=True,
    ),
]

if not openai_enabled:
    print("\n You did NOT provide an OpenAI API key.")
    print("DeepEval's LLM-as-a-judge metrics (AnswerRelevancy/Faithfulness/Contextual* and GEval) require an LLM judge.")
    print("Re-run this cell and provide OPENAI_API_KEY to run DeepEval metrics.")
    print("\n However, your RAG pipeline + test case construction succeeded end-to-end.")
    rows = []
    for i, tc in enumerate(test_cases):
        rows.append({
            "id": i,
            "query": tc.input,
            "actual_output": tc.actual_output[:220] + ("..." if len(tc.actual_output) > 220 else ""),
            "expected_output": tc.expected_output[:220] + ("..." if len(tc.expected_output) > 220 else ""),
            "contexts": len(tc.retrieval_context or []),
        })
    display(pd.DataFrame(rows))
    raise SystemExit("Stopped before evaluation (no OpenAI key).")

We execute the RAG pipeline to generate LLMTestCase objects by pairing our retrieved context with model-generated answers and ground-truth expectations. We then configure a comprehensive suite of DeepEval metrics, including G-Eval and specialized RAG indicators, to evaluate the system’s performance using an LLM-as-a-judge approach. Check out the FULL CODES here.

print("\n Running DeepEval evaluate(...) ...")

results = evaluate(test_cases=test_cases, metrics=metrics)

summary_rows = []
for idx, tc in enumerate(test_cases):
    row = {
        "case_id": idx,
        "query": tc.input,
        "actual_output": tc.actual_output[:200] + ("..." if len(tc.actual_output) > 200 else ""),
    }
    for m in metrics:
        row[m.__class__.__name__ if hasattr(m, "__class__") else str(m)] = None

    summary_rows.append(row)

def try_extract_case_metrics(results_obj):
    extracted = []
    candidates = []
    for attr in ["test_results", "results", "evaluations"]:
        if hasattr(results_obj, attr):
            candidates = getattr(results_obj, attr)
            break
    if not candidates and isinstance(results_obj, list):
        candidates = results_obj

    for case_i, case_result in enumerate(candidates or []):
        item = {"case_id": case_i}
        metrics_list = None
        for attr in ["metrics_data", "metrics", "metric_results"]:
            if hasattr(case_result, attr):
                metrics_list = getattr(case_result, attr)
                break
        if isinstance(metrics_list, dict):
            for k, v in metrics_list.items():
                item[f"{k}_score"] = getattr(v, "score", None) if v is not None else None
                item[f"{k}_reason"] = getattr(v, "reason", None) if v is not None else None
        else:
            for mr in metrics_list or []:
                name = getattr(mr, "name", None) or getattr(getattr(mr, "metric", None), "name", None)
                if not name:
                    name = mr.__class__.__name__
                item[f"{name}_score"] = getattr(mr, "score", None)
                item[f"{name}_reason"] = getattr(mr, "reason", None)
        extracted.append(item)
    return extracted

case_metrics = try_extract_case_metrics(results)

df_base = pd.DataFrame([{
    "case_id": i,
    "query": tc.input,
    "actual_output": tc.actual_output,
    "expected_output": tc.expected_output,
} for i, tc in enumerate(test_cases)])

df_metrics = pd.DataFrame(case_metrics) if case_metrics else pd.DataFrame([])
df = df_base.merge(df_metrics, on="case_id", how="left")

score_cols = [c for c in df.columns if c.endswith("_score")]
compact = df[["case_id", "query"] + score_cols].copy()

print("\n Compact score table:")
display(compact)

print("\n Full details (includes reasons):")
display(df)

print("\n Done. Tip: if contextual precision/recall are low, improve retriever ranking/coverage; if faithfulness is low, tighten generation to only use context.")

We finalize the workflow by executing the evaluate function, which triggers the LLM-as-a-judge process to score each test case against our defined metrics. We then aggregate these scores and their corresponding qualitative reasoning into a centralized DataFrame, providing a granular view of where the RAG pipeline excels or requires further optimization in retrieval and generation.

At last, we conclude by running our comprehensive evaluation suite, in which DeepEval transforms complex linguistic outputs into actionable data using metrics such as Faithfulness, Contextual Precision, and the G-Eval rubric. This systematic approach allows us to diagnose “silent failures” in retrieval and hallucinations in generation with surgical precision, providing the reasoning necessary to justify architectural changes. With these results, we move forward from experimental prototyping to a production-ready RAG system backed by a verifiable, metric-driven safety net.


How Machine Learning and Semantic Embeddings Reorder CVE Vulnerabiliti …

In this tutorial, we build an AI-assisted vulnerability scanner that goes beyond static CVSS scoring and instead learns to prioritize vulnerabilities using semantic understanding and machine learning. We treat vulnerability descriptions as rich linguistic artifacts, embed them using modern sentence transformers, and combine these representations with structural metadata to produce a data-driven priority score. Also, we demonstrate how security teams can shift from rule-based triage to adaptive, explainable, ML-driven risk assessment. Check out the FULL CODES here.

print("Installing required packages...")
import subprocess
import sys

packages = [
    'sentence-transformers',
    'scikit-learn',
    'pandas',
    'numpy',
    'matplotlib',
    'seaborn',
    'requests'
]

for package in packages:
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', package])

import requests
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json
import re
from collections import Counter
import warnings
warnings.filterwarnings('ignore')

from sentence_transformers import SentenceTransformer
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, mean_squared_error

import matplotlib.pyplot as plt
import seaborn as sns

print("✓ All packages installed successfully!\n")

We install and load all the required NLP, machine learning, and visualization libraries for the end-to-end pipeline, ensuring the runtime is fully self-contained and ready to execute in Colab or similar notebook environments. This establishes a reproducible foundation for the scanner. Check out the FULL CODES here.

class CVEDataFetcher:
    def __init__(self):
        self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"

    def fetch_recent_cves(self, days=30, max_results=100):
        print(f"Fetching CVEs from last {days} days...")

        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        params = {
            'pubStartDate': start_date.strftime('%Y-%m-%dT00:00:00.000'),
            'pubEndDate': end_date.strftime('%Y-%m-%dT23:59:59.999'),
            'resultsPerPage': min(max_results, 2000)
        }

        try:
            response = requests.get(self.base_url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            cves = []
            for item in data.get('vulnerabilities', [])[:max_results]:
                cve = item.get('cve', {})
                cve_id = cve.get('id', 'Unknown')

                descriptions = cve.get('descriptions', [])
                description = next((d['value'] for d in descriptions if d['lang'] == 'en'), 'No description')

                metrics = cve.get('metrics', {})
                cvss_v3 = metrics.get('cvssMetricV31', [{}])[0].get('cvssData', {})
                cvss_v2 = metrics.get('cvssMetricV2', [{}])[0].get('cvssData', {})

                base_score = cvss_v3.get('baseScore') or cvss_v2.get('baseScore') or 0.0
                severity = cvss_v3.get('baseSeverity') or 'UNKNOWN'

                published = cve.get('published', '')
                references = cve.get('references', [])

                cves.append({
                    'cve_id': cve_id,
                    'description': description,
                    'cvss_score': float(base_score),
                    'severity': severity,
                    'published': published,
                    'reference_count': len(references),
                    'attack_vector': cvss_v3.get('attackVector', 'UNKNOWN'),
                    'attack_complexity': cvss_v3.get('attackComplexity', 'UNKNOWN'),
                    'privileges_required': cvss_v3.get('privilegesRequired', 'UNKNOWN'),
                    'user_interaction': cvss_v3.get('userInteraction', 'UNKNOWN')
                })

            print(f"✓ Fetched {len(cves)} CVEs\n")
            return pd.DataFrame(cves)

        except Exception as e:
            print(f"Error fetching CVEs: {e}")
            return self._generate_sample_data(max_results)

    def _generate_sample_data(self, n=50):
        print("Using sample CVE data for demonstration...\n")

        sample_descriptions = [
            "A buffer overflow vulnerability in the network driver allows remote code execution",
            "SQL injection vulnerability in web application login form enables unauthorized access",
            "Cross-site scripting (XSS) vulnerability in user input validation",
            "Authentication bypass in admin panel due to weak session management",
            "Remote code execution via deserialization of untrusted data",
            "Path traversal vulnerability allows reading arbitrary files",
            "Privilege escalation through improper input validation",
            "Denial of service through resource exhaustion in API endpoint",
            "Information disclosure via error messages exposing sensitive data",
            "Memory corruption vulnerability in image processing library",
            "Command injection in file upload functionality",
            "Integer overflow leading to heap buffer overflow",
            "Use-after-free vulnerability in memory management",
            "Race condition in multi-threaded application",
            "Cryptographic weakness in password storage mechanism"
        ]

        severities = ['LOW', 'MEDIUM', 'HIGH', 'CRITICAL']
        attack_vectors = ['NETWORK', 'ADJACENT', 'LOCAL', 'PHYSICAL']
        complexities = ['LOW', 'HIGH']

        data = []
        for i in range(n):
            severity = np.random.choice(severities, p=[0.1, 0.3, 0.4, 0.2])
            score_ranges = {'LOW': (0.1, 3.9), 'MEDIUM': (4.0, 6.9), 'HIGH': (7.0, 8.9), 'CRITICAL': (9.0, 10.0)}

            data.append({
                'cve_id': f'CVE-2024-{10000 + i}',
                'description': np.random.choice(sample_descriptions),
                'cvss_score': np.random.uniform(*score_ranges[severity]),
                'severity': severity,
                'published': (datetime.now() - timedelta(days=np.random.randint(1, 30))).isoformat(),
                'reference_count': np.random.randint(1, 10),
                'attack_vector': np.random.choice(attack_vectors),
                'attack_complexity': np.random.choice(complexities),
                'privileges_required': np.random.choice(['NONE', 'LOW', 'HIGH']),
                'user_interaction': np.random.choice(['NONE', 'REQUIRED'])
            })

        return pd.DataFrame(data)

We implement a robust CVE ingestion component that pulls recent vulnerabilities directly from the NVD API. We normalize raw CVE records into structured features while gracefully falling back to synthetic data when API access fails. This keeps the tutorial runnable while reflecting real-world challenges in data ingestion. Check out the FULL CODES here.
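When pulling larger windows of CVEs, the public NVD endpoint rate-limits unauthenticated clients. The sketch below is not part of the original class: it assumes the apiKey header documented for the NVD 2.0 API and a hypothetical NVD_API_KEY environment variable, so double-check both against the current NVD documentation before relying on them.

import os
import requests

# Minimal sketch: query the NVD 2.0 API with an API key for higher rate limits.
NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"

def fetch_with_key(params, api_key=os.getenv("NVD_API_KEY")):
    headers = {"apiKey": api_key} if api_key else {}
    response = requests.get(NVD_URL, params=params, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()

# Example call with the same kind of parameters CVEDataFetcher builds internally.
data = fetch_with_key({"resultsPerPage": 20})
print(len(data.get("vulnerabilities", [])), "CVEs returned")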

class VulnerabilityFeatureExtractor:
    def __init__(self):
        print("Loading sentence transformer model...")
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✓ Model loaded\n")

        self.critical_keywords = {
            'execution': ['remote code execution', 'rce', 'execute', 'arbitrary code'],
            'injection': ['sql injection', 'command injection', 'code injection'],
            'authentication': ['bypass', 'authentication', 'authorization'],
            'overflow': ['buffer overflow', 'heap overflow', 'stack overflow'],
            'exposure': ['information disclosure', 'data leak', 'exposure'],
        }

    def extract_semantic_features(self, descriptions):
        print("Generating semantic embeddings...")
        embeddings = self.model.encode(descriptions, show_progress_bar=True)
        return embeddings

    def extract_keyword_features(self, df):
        print("Extracting keyword features...")

        for category, keywords in self.critical_keywords.items():
            df[f'has_{category}'] = df['description'].apply(
                lambda x: any(kw in x.lower() for kw in keywords)
            ).astype(int)

        df['desc_length'] = df['description'].apply(len)
        df['word_count'] = df['description'].apply(lambda x: len(x.split()))

        return df

    def encode_categorical_features(self, df):
        print("Encoding categorical features...")

        categorical_cols = ['attack_vector', 'attack_complexity', 'privileges_required', 'user_interaction']

        for col in categorical_cols:
            dummies = pd.get_dummies(df[col], prefix=col)
            df = pd.concat([df, dummies], axis=1)

        return df

We transform unstructured vulnerability descriptions into dense semantic embeddings using a sentence-transformer model. We also extract keyword-based risk indicators and textual statistics that capture exploit intent and complexity. Together, these features bridge linguistic context with quantitative ML inputs. Check out the FULL CODES here.
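As a quick sanity check on the embedding space, you can compare a few descriptions directly; semantically related vulnerabilities should score noticeably higher than unrelated ones. This is a minimal sketch that is not part of the pipeline above, and the three example descriptions are made up for illustration.

from sentence_transformers import SentenceTransformer, util

# Same model name as the extractor above.
model = SentenceTransformer('all-MiniLM-L6-v2')

a = "SQL injection in the login form allows reading arbitrary database rows"
b = "Improper input sanitization in authentication enables database extraction"
c = "Denial of service caused by unbounded memory growth in the log parser"

emb = model.encode([a, b, c])
print("sim(a, b):", float(util.cos_sim(emb[0], emb[1])))  # expected: relatively high
print("sim(a, c):", float(util.cos_sim(emb[0], emb[2])))  # expected: lower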

class VulnerabilityPrioritizer:
    def __init__(self):
        self.severity_classifier = RandomForestClassifier(n_estimators=100, random_state=42)
        self.score_predictor = GradientBoostingRegressor(n_estimators=100, random_state=42)
        self.scaler = StandardScaler()
        self.feature_cols = None

    def prepare_features(self, df, embeddings):
        numeric_features = ['reference_count', 'desc_length', 'word_count']
        keyword_features = [col for col in df.columns if col.startswith('has_')]
        categorical_features = [col for col in df.columns if any(col.startswith(prefix) for prefix in ['attack_vector_', 'attack_complexity_', 'privileges_required_', 'user_interaction_'])]
        self.feature_cols = numeric_features + keyword_features + categorical_features
        X_structured = df[self.feature_cols].values
        X_embeddings = embeddings
        X_combined = np.hstack([X_structured, X_embeddings])
        return X_combined

    def train_models(self, X, y_severity, y_score):
        print("\nTraining ML models...")
        X_scaled = self.scaler.fit_transform(X)
        X_train, X_test, y_sev_train, y_sev_test, y_score_train, y_score_test = train_test_split(
            X_scaled, y_severity, y_score, test_size=0.2, random_state=42
        )
        self.severity_classifier.fit(X_train, y_sev_train)
        sev_pred = self.severity_classifier.predict(X_test)
        self.score_predictor.fit(X_train, y_score_train)
        score_pred = self.score_predictor.predict(X_test)
        print("\n--- Severity Classification Report ---")
        print(classification_report(y_sev_test, sev_pred))
        print("\n--- CVSS Score Prediction ---")
        print(f"RMSE: {np.sqrt(mean_squared_error(y_score_test, score_pred)):.2f}")
        return X_scaled

    def predict_priority(self, X):
        X_scaled = self.scaler.transform(X)
        severity_pred = self.severity_classifier.predict_proba(X_scaled)
        score_pred = self.score_predictor.predict(X_scaled)
        severity_weight = severity_pred[:, -1] * 0.4
        score_weight = (score_pred / 10.0) * 0.6
        priority_score = severity_weight + score_weight
        return priority_score, severity_pred, score_pred

    def get_feature_importance(self):
        importance = self.score_predictor.feature_importances_
        n_structured = len(self.feature_cols)
        structured_importance = importance[:n_structured]
        embedding_importance = importance[n_structured:]
        feature_imp_df = pd.DataFrame({
            'feature': self.feature_cols,
            'importance': structured_importance
        }).sort_values('importance', ascending=False)
        return feature_imp_df, embedding_importance.mean()

We train supervised models to predict both vulnerability severity classes and CVSS-like scores from learned features. We combine structured metadata with embeddings to create a hybrid feature space and derive a composite priority score. This is where the scanner learns how to rank vulnerabilities beyond static heuristics. Check out the FULL CODES here.
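The composite score itself is simple arithmetic over the two model outputs: 40% of the probability mass assigned to the most severe class plus 60% of the predicted CVSS score rescaled to [0, 1]. A minimal worked example, using made-up model outputs rather than real predictions, looks like this:

import numpy as np

# Hypothetical model outputs for three CVEs, for illustration only.
p_highest_class = np.array([0.80, 0.25, 0.05])   # severity_classifier.predict_proba(...)[:, -1]
predicted_cvss = np.array([9.1, 6.2, 3.0])       # score_predictor.predict(...)

priority = 0.4 * p_highest_class + 0.6 * (predicted_cvss / 10.0)
print(priority)  # -> approximately [0.866, 0.472, 0.200]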

class VulnerabilityAnalyzer:
    def __init__(self, n_clusters=5):
        self.n_clusters = n_clusters
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42)

    def cluster_vulnerabilities(self, embeddings):
        print(f"\nClustering vulnerabilities into {self.n_clusters} groups...")
        clusters = self.kmeans.fit_predict(embeddings)
        return clusters

    def analyze_clusters(self, df, clusters):
        df['cluster'] = clusters
        print("\n--- Cluster Analysis ---")
        for i in range(self.n_clusters):
            cluster_df = df[df['cluster'] == i]
            print(f"\nCluster {i} ({len(cluster_df)} vulnerabilities):")
            print(f"  Avg CVSS Score: {cluster_df['cvss_score'].mean():.2f}")
            print(f"  Severity Distribution: {cluster_df['severity'].value_counts().to_dict()}")
            print("  Top keywords: ", end="")
            all_words = ' '.join(cluster_df['description'].values).lower()
            words = re.findall(r'\b[a-z]{4,}\b', all_words)
            common = Counter(words).most_common(5)
            print(', '.join([w for w, _ in common]))
        return df

We cluster vulnerabilities based on embedding similarity to uncover recurring exploit patterns. We analyze each cluster to understand dominant attack themes, severity distributions, and common exploit terminology. This helps surface systemic risks rather than isolated issues. Check out the FULL CODES here.
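The choice of five clusters is a fixed assumption in the class above. One standard alternative, not part of the original code, is to sweep a few values of k and keep the one with the best silhouette score:

from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score

# Minimal sketch: 'embeddings' is the array produced by extract_semantic_features.
def pick_n_clusters(embeddings, candidates=(3, 4, 5, 6, 7, 8)):
    scores = {}
    for k in candidates:
        labels = KMeans(n_clusters=k, random_state=42, n_init=10).fit_predict(embeddings)
        scores[k] = silhouette_score(embeddings, labels)
    best_k = max(scores, key=scores.get)
    return best_k, scores

# Usage: analyzer = VulnerabilityAnalyzer(n_clusters=pick_n_clusters(embeddings)[0])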

def visualize_results(df, priority_scores, feature_importance):
    fig, axes = plt.subplots(2, 3, figsize=(18, 10))
    fig.suptitle('Vulnerability Scanner - ML Analysis Dashboard', fontsize=16, fontweight='bold')
    axes[0, 0].hist(priority_scores, bins=30, color='crimson', alpha=0.7, edgecolor='black')
    axes[0, 0].set_xlabel('Priority Score')
    axes[0, 0].set_ylabel('Frequency')
    axes[0, 0].set_title('Priority Score Distribution')
    axes[0, 0].axvline(np.percentile(priority_scores, 75), color='orange', linestyle='--', label='75th percentile')
    axes[0, 0].legend()
    axes[0, 1].scatter(df['cvss_score'], priority_scores, alpha=0.6, c=priority_scores, cmap='RdYlGn_r', s=50)
    axes[0, 1].set_xlabel('CVSS Score')
    axes[0, 1].set_ylabel('ML Priority Score')
    axes[0, 1].set_title('CVSS vs ML Priority')
    axes[0, 1].plot([0, 10], [0, 1], 'k--', alpha=0.3)
    severity_counts = df['severity'].value_counts()
    colors = {'CRITICAL': 'darkred', 'HIGH': 'red', 'MEDIUM': 'orange', 'LOW': 'yellow'}
    axes[0, 2].bar(severity_counts.index, severity_counts.values, color=[colors.get(s, 'gray') for s in severity_counts.index])
    axes[0, 2].set_xlabel('Severity')
    axes[0, 2].set_ylabel('Count')
    axes[0, 2].set_title('Severity Distribution')
    axes[0, 2].tick_params(axis='x', rotation=45)
    top_features = feature_importance.head(10)
    axes[1, 0].barh(top_features['feature'], top_features['importance'], color='steelblue')
    axes[1, 0].set_xlabel('Importance')
    axes[1, 0].set_title('Top 10 Feature Importance')
    axes[1, 0].invert_yaxis()
    if 'cluster' in df.columns:
        cluster_counts = df['cluster'].value_counts().sort_index()
        axes[1, 1].bar(cluster_counts.index, cluster_counts.values, color='teal', alpha=0.7)
        axes[1, 1].set_xlabel('Cluster')
        axes[1, 1].set_ylabel('Count')
        axes[1, 1].set_title('Vulnerability Clusters')
    attack_vector_counts = df['attack_vector'].value_counts()
    axes[1, 2].pie(attack_vector_counts.values, labels=attack_vector_counts.index, autopct='%1.1f%%', startangle=90)
    axes[1, 2].set_title('Attack Vector Distribution')
    plt.tight_layout()
    plt.show()

def main():
    print("=" * 70)
    print("AI-ASSISTED VULNERABILITY SCANNER WITH ML PRIORITIZATION")
    print("=" * 70)
    print()
    fetcher = CVEDataFetcher()
    df = fetcher.fetch_recent_cves(days=30, max_results=50)
    print("Dataset Overview:")
    print(f"  Total CVEs: {len(df)}")
    print(f"  Date Range: {df['published'].min()[:10]} to {df['published'].max()[:10]}")
    print(f"  Severity Breakdown: {df['severity'].value_counts().to_dict()}")
    print()
    feature_extractor = VulnerabilityFeatureExtractor()
    embeddings = feature_extractor.extract_semantic_features(df['description'].tolist())
    df = feature_extractor.extract_keyword_features(df)
    df = feature_extractor.encode_categorical_features(df)
    prioritizer = VulnerabilityPrioritizer()
    X = prioritizer.prepare_features(df, embeddings)
    severity_map = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2, 'CRITICAL': 3, 'UNKNOWN': 1}
    y_severity = df['severity'].map(severity_map).values
    y_score = df['cvss_score'].values
    X_scaled = prioritizer.train_models(X, y_severity, y_score)
    priority_scores, severity_probs, score_preds = prioritizer.predict_priority(X)
    df['ml_priority_score'] = priority_scores
    df['predicted_score'] = score_preds
    analyzer = VulnerabilityAnalyzer(n_clusters=5)
    clusters = analyzer.cluster_vulnerabilities(embeddings)
    df = analyzer.analyze_clusters(df, clusters)
    feature_imp, emb_imp = prioritizer.get_feature_importance()
    print("\n--- Feature Importance ---")
    print(feature_imp.head(10))
    print(f"\nAverage embedding importance: {emb_imp:.4f}")
    print("\n" + "=" * 70)
    print("TOP 10 PRIORITY VULNERABILITIES")
    print("=" * 70)
    top_vulns = df.nlargest(10, 'ml_priority_score')[['cve_id', 'cvss_score', 'ml_priority_score', 'severity', 'description']]
    for idx, row in top_vulns.iterrows():
        print(f"\n{row['cve_id']} [Priority: {row['ml_priority_score']:.3f}]")
        print(f"  CVSS: {row['cvss_score']:.1f} | Severity: {row['severity']}")
        print(f"  {row['description'][:100]}...")
    print("\n\nGenerating visualizations...")
    visualize_results(df, priority_scores, feature_imp)
    print("\n" + "=" * 70)
    print("ANALYSIS COMPLETE")
    print("=" * 70)
    print("\nResults summary:")
    print(f"  High Priority (>0.7): {(priority_scores > 0.7).sum()} vulnerabilities")
    print(f"  Medium Priority (0.4-0.7): {((priority_scores >= 0.4) & (priority_scores <= 0.7)).sum()}")
    print(f"  Low Priority (<0.4): {(priority_scores < 0.4).sum()}")
    return df, prioritizer, analyzer

if __name__ == "__main__":
    results_df, prioritizer, analyzer = main()
    print("\n✓ All analyses completed successfully!")
    print("\nYou can now:")
    print("  - Access results via 'results_df' DataFrame")
    print("  - Use 'prioritizer' to predict new vulnerabilities")
    print("  - Explore 'analyzer' for clustering insights")

We generate an interactive analysis dashboard that visualizes priority distributions, feature importance, clusters, and attack vectors. We execute the complete pipeline, rank the highest-priority vulnerabilities, and summarize actionable insights. This turns raw model outputs into decision-ready intelligence.

In conclusion, we demonstrated how vulnerability management can evolve from static scoring to intelligent prioritization using machine learning and semantic analysis. By combining embeddings, metadata, clustering, and explainability, we created a system that better reflects real-world exploit risk and operational urgency. This lays the groundwork for adaptive security pipelines where prioritization improves continuously as new vulnerability data emerges.

Check out the FULL CODES here. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and subscribe to our Newsletter. You can also join us on Telegram.
The post How Machine Learning and Semantic Embeddings Reorder CVE Vulnerabilities Beyond Raw CVSS Scores appeared first on MarkTechPost.

GitHub Releases Copilot-SDK to Embed Its Agentic Runtime in Any App

GitHub has opened up the internal agent runtime that powers GitHub Copilot CLI and exposed it as a programmable SDK. The GitHub Copilot-SDK, now in technical preview, lets you embed the same agentic execution loop into any application so the agent can plan, invoke tools, edit files, and run commands as part of your own workflows.

What the GitHub Copilot SDK provides

The GitHub Copilot-SDK is a multi platform SDK for integrating the GitHub Copilot Agent into applications and services. It gives programmatic access to the execution loop that already powers GitHub Copilot CLI. Instead of building your own planner and tool loop for each project, you attach your logic to this existing runtime and treat it as an execution platform.

The GitHub Copilot-SDK exposes the same production tested runtime used by Copilot CLI, with support for multi model operation, multi step planning, tools, Model Context Protocol (MCP) integration, authentication, and streaming. This gives you the same agent behavior that Copilot uses in the terminal, but callable from your own code.

Agentic execution loop as a runtime primitive

The core abstraction is the agentic execution loop. In Copilot CLI and in the SDK, interactions are not isolated prompts. The agent maintains state across turns, chooses plans, calls tools, executes commands, reads results, and repeats these steps until it reaches the goal that you provided.

The GitHub team describes the usual problems when you implement this loop yourself. You need to manage context across multiple turns, orchestrate external tools and commands, route calls across models, integrate MCP servers, and think through permissions. The SDK takes on that plumbing, so as a developer you concentrate on defining domain specific tools, describing tasks, and constraining what the agent can do.

Supported languages and core API

The Copilot-SDK is available in 4 languages in this technical preview:

Node.js and TypeScript, through the package @github/copilot-cli-sdk

Python, through the package copilot

Go, through the module github.com/github/copilot-cli-sdk-go

.NET, through the package GitHub.Copilot.SDK

All SDKs expose a consistent API surface. According to the changelog, every language binding supports multi-turn conversations with session history, custom tool execution, and programmatic control over client and session life cycles.

Tools, MCP servers, and integration with existing systems

A main feature of the Copilot agent is tool execution. Through the SDK you can register custom tools that the model can call during a conversation. The Copilot-CLI already exposes custom tool definitions and full MCP server integration, and the SDK reuses that capability.

MCP gives a standard protocol for agents to connect to external systems such as internal APIs, document stores, or operations tools. When you integrate an MCP server, the Copilot agent can discover and call its operations in a structured way with consistent metadata rather than ad hoc prompt engineering.

The pattern is straightforward. You define a tool with a clear schema and effect, you expose it through the SDK, and the Copilot planner decides when and how to call it as part of the multi step plan.
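As an illustration of that pattern, the sketch below shows roughly what a tool definition and handler could look like. It is a minimal sketch only: the SDK is in technical preview, and the class, method, and parameter names here are assumptions for illustration, not the documented API of the copilot package.

# Hypothetical sketch of the tool-registration pattern described above.
# Class and method names are illustrative assumptions, not the published API.
from dataclasses import dataclass

@dataclass
class ToolDefinition:
    name: str
    description: str
    parameters: dict  # JSON-schema-style description of the inputs

def lookup_order(order_id: str) -> dict:
    # Domain-specific effect the agent can call; replace with a real API call.
    return {"order_id": order_id, "status": "shipped"}

order_tool = ToolDefinition(
    name="lookup_order",
    description="Look up the fulfillment status of an order by its ID.",
    parameters={"type": "object", "properties": {"order_id": {"type": "string"}}},
)

# Conceptually, the SDK client is configured with the tool and its handler,
# and the Copilot planner decides when to call it during a session, e.g.:
# client.register_tool(order_tool, handler=lookup_order)   # assumed method name
# session = client.create_session()                        # assumed method name
# session.send("Where is order 1234?")                     # assumed method name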

Authentication, subscriptions, and streaming

The SDK integrates with GitHub authentication and Copilot subscriptions. You can either use an existing GitHub Copilot subscription or bring your own key when configuring the SDK. This is important when you embed the agent in enterprise environments where identity and access control are already standardized around GitHub.

Streaming is part of the contract. Copilot-CLI already supports real time streaming in the terminal, and the SDK exposes streaming so that applications can receive responses incrementally. This allows you to build user interfaces that update progressively as the agent reasons and executes, without waiting for a full completion.

Relationship to GitHub Copilot-CLI

The SDK is not a separate agent implementation. It is a layer on top of the existing Copilot CLI execution loop. Think of it as a way to reuse the planning, tool use, and multi turn execution behavior of the CLI in any environment.

Copilot-CLI itself continues to evolve. Recent updates add persistent memory, infinite sessions, and context compaction, support for explore and plan workflows with model selection per step, custom agents and agent skills, full MCP support, and asynchronous task delegation. The SDK benefits from this work, because it exposes that same behavior through language specific libraries.

Key Takeaways

GitHub Copilot-SDK exposes the same agentic execution loop that powers GitHub Copilot CLI, so applications can call a production tested planner that runs multi step workflows with tools and commands.

The SDK is available for Node.js, Python, Go, and .NET, and each language binding provides a similar abstraction around clients and sessions that manage multi turn conversations and tool use.

Developers define domain specific tools and Model Context Protocol servers, then register them through the SDK, and the Copilot agent decides when and how to call them as part of the plan.

The runtime integrates with GitHub authentication and Copilot subscriptions, supports multiple AI models such as GPT based backends, and exposes real time streaming so applications can render partial responses incrementally.

Check out the GitHub Page. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and subscribe to our Newsletter. You can also join us on Telegram.
The post GitHub Releases Copilot-SDK to Embed Its Agentic Runtime in Any App appeared first on MarkTechPost.

How an AI Agent Chooses What to Do Under Tokens, Latency, and Tool-Call Budget Constraints

In this tutorial, we build a cost-aware planning agent that deliberately balances output quality against real-world constraints such as token usage, latency, and tool-call budgets. We design the agent to generate multiple candidate actions, estimate their expected costs and benefits, and then select an execution plan that maximizes value while staying within strict budgets. With this, we demonstrate how agentic systems can move beyond “always use the LLM” behavior and instead reason explicitly about trade-offs, efficiency, and resource awareness, which is critical for deploying agents reliably in constrained environments. Check out the FULL CODES here.

import os, time, math, json, random
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple, Any
from getpass import getpass

USE_OPENAI = True

if USE_OPENAI:
    if not os.getenv("OPENAI_API_KEY"):
        os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (hidden): ").strip()
    try:
        from openai import OpenAI
        client = OpenAI()
    except Exception as e:
        print("OpenAI SDK import failed. Falling back to offline mode.\nError:", e)
        USE_OPENAI = False

We set up the execution environment and securely load the OpenAI API key at runtime without hardcoding it. We also initialize the client so the agent gracefully falls back to offline mode if the API is unavailable. Check out the FULL CODES here.

def approx_tokens(text: str) -> int:
    return max(1, math.ceil(len(text) / 4))

@dataclass
class Budget:
    max_tokens: int
    max_latency_ms: int
    max_tool_calls: int

@dataclass
class Spend:
    tokens: int = 0
    latency_ms: int = 0
    tool_calls: int = 0

    def within(self, b: Budget) -> bool:
        return (self.tokens <= b.max_tokens and
                self.latency_ms <= b.max_latency_ms and
                self.tool_calls <= b.max_tool_calls)

    def add(self, other: "Spend") -> "Spend":
        return Spend(
            tokens=self.tokens + other.tokens,
            latency_ms=self.latency_ms + other.latency_ms,
            tool_calls=self.tool_calls + other.tool_calls
        )

We define the core budgeting abstractions that enable the agent to reason explicitly about costs. We model token usage, latency, and tool calls as first-class quantities and provide utility methods to accumulate and validate spend. This gives us a clean foundation for enforcing constraints throughout planning and execution. Check out the FULL CODES here.
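A quick usage example, with arbitrary numbers, shows how these pieces interact before any planning happens:

# Arbitrary values, just to exercise the dataclasses defined above.
budget = Budget(max_tokens=1000, max_latency_ms=2000, max_tool_calls=1)

spend = Spend()                                          # nothing spent yet
spend = spend.add(Spend(tokens=600, latency_ms=900, tool_calls=1))
print(spend.within(budget))                              # True: still inside every limit

spend = spend.add(Spend(tokens=500, latency_ms=300, tool_calls=0))
print(spend.within(budget))                              # False: token budget exceeded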

@dataclass
class StepOption:
    name: str
    description: str
    est_spend: Spend
    est_value: float
    executor: str
    payload: Dict[str, Any] = field(default_factory=dict)

@dataclass
class PlanCandidate:
    steps: List[StepOption]
    spend: Spend
    value: float
    rationale: str = ""

def llm_text(prompt: str, *, model: str = "gpt-5", effort: str = "low") -> str:
    if not USE_OPENAI:
        return ""
    t0 = time.time()
    resp = client.responses.create(
        model=model,
        reasoning={"effort": effort},
        input=prompt,
    )
    _ = (time.time() - t0)
    return resp.output_text or ""

We introduce the data structures that represent individual action choices and full plan candidates. We also define a lightweight LLM wrapper that standardizes how text is generated and measured. This separation allows the planner to reason about actions abstractly without being tightly coupled to execution details. Check out the FULL CODES here.

def generate_step_options(task: str) -> List[StepOption]:
    base = [
        StepOption(
            name="Clarify deliverables (local)",
            description="Extract deliverable checklist + acceptance criteria from the task.",
            est_spend=Spend(tokens=60, latency_ms=20, tool_calls=0),
            est_value=6.0,
            executor="local",
        ),
        StepOption(
            name="Outline plan (LLM)",
            description="Create a structured outline with sections, constraints, and assumptions.",
            est_spend=Spend(tokens=600, latency_ms=1200, tool_calls=1),
            est_value=10.0,
            executor="llm",
            payload={"prompt_kind": "outline"}
        ),
        StepOption(
            name="Outline plan (local)",
            description="Create a rough outline using templates (no LLM).",
            est_spend=Spend(tokens=120, latency_ms=40, tool_calls=0),
            est_value=5.5,
            executor="local",
        ),
        StepOption(
            name="Risk register (LLM)",
            description="Generate risks, mitigations, owners, and severity.",
            est_spend=Spend(tokens=700, latency_ms=1400, tool_calls=1),
            est_value=9.0,
            executor="llm",
            payload={"prompt_kind": "risks"}
        ),
        StepOption(
            name="Risk register (local)",
            description="Generate a standard risk register from a reusable template.",
            est_spend=Spend(tokens=160, latency_ms=60, tool_calls=0),
            est_value=5.0,
            executor="local",
        ),
        StepOption(
            name="Timeline (LLM)",
            description="Draft a realistic milestone timeline with dependencies.",
            est_spend=Spend(tokens=650, latency_ms=1300, tool_calls=1),
            est_value=8.5,
            executor="llm",
            payload={"prompt_kind": "timeline"}
        ),
        StepOption(
            name="Timeline (local)",
            description="Draft a simple timeline from a generic milestone template.",
            est_spend=Spend(tokens=150, latency_ms=60, tool_calls=0),
            est_value=4.8,
            executor="local",
        ),
        StepOption(
            name="Quality pass (LLM)",
            description="Rewrite for clarity, consistency, and formatting.",
            est_spend=Spend(tokens=900, latency_ms=1600, tool_calls=1),
            est_value=8.0,
            executor="llm",
            payload={"prompt_kind": "polish"}
        ),
        StepOption(
            name="Quality pass (local)",
            description="Light formatting + consistency checks without LLM.",
            est_spend=Spend(tokens=120, latency_ms=50, tool_calls=0),
            est_value=3.5,
            executor="local",
        ),
    ]

    if USE_OPENAI:
        meta_prompt = f"""
You are a planning assistant. For the task below, propose 3-5 OPTIONAL extra steps that improve quality,
like checks, validations, or stakeholder tailoring. Keep each step short.

TASK:
{task}

Return JSON list with fields: name, description, est_value(1-10).
"""
        txt = llm_text(meta_prompt, model="gpt-5", effort="low")
        try:
            items = json.loads(txt.strip())
            for it in items[:5]:
                base.append(
                    StepOption(
                        name=str(it.get("name", "Extra step (local)"))[:60],
                        description=str(it.get("description", ""))[:200],
                        est_spend=Spend(tokens=120, latency_ms=60, tool_calls=0),
                        est_value=float(it.get("est_value", 5.0)),
                        executor="local",
                    )
                )
        except Exception:
            pass

    return base

We focus on generating a diverse set of candidate steps, including both LLM-based and local alternatives with different cost–quality trade-offs. We optionally use the model itself to suggest additional low-cost improvements while still controlling their impact on the budget. By doing so, we enrich the action space without losing efficiency. Check out the FULL CODES here.

def plan_under_budget(
    options: List[StepOption],
    budget: Budget,
    *,
    max_steps: int = 6,
    beam_width: int = 12,
    diversity_penalty: float = 0.2
) -> PlanCandidate:
    def redundancy_cost(chosen: List[StepOption], new: StepOption) -> float:
        key_new = new.name.split("(")[0].strip().lower()
        overlap = 0
        for s in chosen:
            key_s = s.name.split("(")[0].strip().lower()
            if key_s == key_new:
                overlap += 1
        return overlap * diversity_penalty

    beams: List[PlanCandidate] = [PlanCandidate(steps=[], spend=Spend(), value=0.0, rationale="")]

    for _ in range(max_steps):
        expanded: List[PlanCandidate] = []
        for cand in beams:
            for opt in options:
                if opt in cand.steps:
                    continue
                new_spend = cand.spend.add(opt.est_spend)
                if not new_spend.within(budget):
                    continue
                new_value = cand.value + opt.est_value - redundancy_cost(cand.steps, opt)
                expanded.append(
                    PlanCandidate(
                        steps=cand.steps + [opt],
                        spend=new_spend,
                        value=new_value,
                        rationale=cand.rationale
                    )
                )
        if not expanded:
            break
        expanded.sort(key=lambda c: c.value, reverse=True)
        beams = expanded[:beam_width]

    best = max(beams, key=lambda c: c.value)
    return best

We implement the budget-constrained planning logic that searches for the highest-value combination of steps under strict limits. We apply a beam-style search with redundancy penalties to avoid wasteful action overlap. This is where the agent truly becomes cost-aware by optimizing value subject to constraints. Check out the FULL CODES here.
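To see the constraint in action, you can hand the planner a deliberately tight budget; with only one tool call allowed, it should favor the local variants over their LLM counterparts. This is a small usage sketch reusing the functions defined above, and the memo-drafting task string is a made-up example.

# Small usage sketch reusing generate_step_options and plan_under_budget from above.
tight_budget = Budget(max_tokens=800, max_latency_ms=1500, max_tool_calls=1)

options = generate_step_options("Draft a short internal memo about migrating to a new CI system.")
plan = plan_under_budget(options, tight_budget, max_steps=5, beam_width=10)

for step in plan.steps:
    print(f"{step.name:<28} est_tokens={step.est_spend.tokens:<5} est_value={step.est_value}")
print("Estimated total spend:", plan.spend)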

def run_local_step(task: str, step: StepOption, working: Dict[str, Any]) -> str:
    name = step.name.lower()
    if "clarify deliverables" in name:
        return (
            "Deliverables checklist:\n"
            "- Executive summary\n- Scope & assumptions\n- Workplan + milestones\n"
            "- Risk register (risk, impact, likelihood, mitigation, owner)\n"
            "- Next steps + data needed\n"
        )
    if "outline plan" in name:
        return (
            "Outline:\n1) Context & objective\n2) Scope\n3) Approach\n4) Timeline\n5) Risks\n6) Next steps\n"
        )
    if "risk register" in name:
        return (
            "Risk register (template):\n"
            "1) Data access delays | High | Mitigation: agree data list + owners\n"
            "2) Stakeholder alignment | Med | Mitigation: weekly review\n"
            "3) Tooling constraints | Med | Mitigation: phased rollout\n"
        )
    if "timeline" in name:
        return (
            "Timeline (template):\n"
            "Week 1: discovery + requirements\nWeek 2: prototype + feedback\n"
            "Week 3: pilot + metrics\nWeek 4: rollout + handover\n"
        )
    if "quality pass" in name:
        draft = working.get("draft", "")
        return "Light quality pass done (headings normalized, bullets aligned).\n" + draft
    return f"Completed: {step.name}\n"

def run_llm_step(task: str, step: StepOption, working: Dict[str, Any]) -> str:
    kind = step.payload.get("prompt_kind", "generic")
    context = working.get("draft", "")
    prompts = {
        "outline": f"Create a crisp, structured outline for the task below.\nTASK:\n{task}\nReturn a numbered outline.",
        "risks": f"Create a risk register for the task below. Include: Risk | Impact | Likelihood | Mitigation | Owner.\nTASK:\n{task}",
        "timeline": f"Create a realistic milestone timeline with dependencies for the task below.\nTASK:\n{task}",
        "polish": f"Rewrite and polish the following draft for clarity and consistency.\nDRAFT:\n{context}",
        "generic": f"Help with this step: {step.description}\nTASK:\n{task}\nCURRENT:\n{context}",
    }
    return llm_text(prompts.get(kind, prompts["generic"]), model="gpt-5", effort="low")

def execute_plan(task: str, plan: PlanCandidate) -> Tuple[str, Spend]:
    working = {"draft": ""}
    actual = Spend()

    for i, step in enumerate(plan.steps, 1):
        t0 = time.time()
        if step.executor == "llm" and USE_OPENAI:
            out = run_llm_step(task, step, working)
            tool_calls = 1
        else:
            out = run_local_step(task, step, working)
            tool_calls = 0

        dt_ms = int((time.time() - t0) * 1000)
        tok = approx_tokens(out)

        actual = actual.add(Spend(tokens=tok, latency_ms=dt_ms, tool_calls=tool_calls))
        working["draft"] += f"\n\n### Step {i}: {step.name}\n{out}\n"

    return working["draft"].strip(), actual

TASK = "Draft a 1-page project proposal for a logistics dashboard + fleet optimization pilot, including scope, timeline, and risks."
BUDGET = Budget(
    max_tokens=2200,
    max_latency_ms=3500,
    max_tool_calls=2
)

options = generate_step_options(TASK)
best_plan = plan_under_budget(options, BUDGET, max_steps=6, beam_width=14)

print("=== SELECTED PLAN (budget-aware) ===")
for s in best_plan.steps:
    print(f"- {s.name} | est_spend={s.est_spend} | est_value={s.est_value}")
print("\nEstimated spend:", best_plan.spend)
print("Budget:", BUDGET)

print("\n=== EXECUTING PLAN ===")
draft, actual = execute_plan(TASK, best_plan)

print("\n=== OUTPUT DRAFT ===\n")
print(draft[:6000])

print("\n=== ACTUAL SPEND (approx) ===")
print(actual)
print("\nWithin budget?", actual.within(BUDGET))

We execute the selected plan and track actual resource usage step by step. We dynamically choose between local and LLM execution paths and aggregate the final output into a coherent draft. By comparing estimated and actual spend, we demonstrate how planning assumptions can be validated and refined in practice.
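Since estimated and actual spend share the same Spend dataclass, closing that loop can be as simple as printing the per-dimension gap. This is a small follow-up sketch, not part of the original script, reusing best_plan and actual from above.

# Compare the planner's estimate against what execution actually cost.
est = best_plan.spend
print("token gap:    ", actual.tokens - est.tokens)
print("latency gap:  ", actual.latency_ms - est.latency_ms, "ms")
print("tool-call gap:", actual.tool_calls - est.tool_calls)
# Persistent positive gaps suggest the per-step estimates in generate_step_options
# are too optimistic and should be raised.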

In conclusion, we demonstrated how a cost-aware planning agent can reason about its resource consumption and adapt its behavior in real time. We executed only the steps that fit within predefined budgets and tracked actual spend to validate the planning assumptions, closing the loop between estimation and execution. Also, we highlighted how agentic AI systems can become more practical, controllable, and scalable by treating cost, latency, and tool usage as first-class decision variables rather than afterthoughts.

Check out the FULL CODES here. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and subscribe to our Newsletter. You can also join us on Telegram.
The post How an AI Agent Chooses What to Do Under Tokens, Latency, and Tool-Call Budget Constraints? appeared first on MarkTechPost.