How to Design Transactional Agentic AI Systems with LangGraph Using Tw …

In this tutorial, we implement an agentic AI pattern using LangGraph that treats reasoning and action as a transactional workflow rather than a single-shot decision. We model a two-phase commit system in which an agent stages changes in a sandbox, validates strict invariants, pauses for human approval via graph interrupts, and only then commits or rolls back. With this, we demonstrate how agentic systems can be designed with safety, auditability, and controllability at their core, moving beyond reactive chat agents toward structured, governance-aware AI workflows that run in Google Colab using OpenAI models. Check out the Full Codes here.

!pip -q install -U langgraph langchain-openai

import os, json, uuid, copy, math, re, operator
from typing import Any, Dict, List, Optional
from typing_extensions import TypedDict, Annotated

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage, AnyMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import interrupt, Command

def _set_env_openai():
    # Prefer an existing environment variable, then Colab user data, then a hidden prompt.
    if os.environ.get("OPENAI_API_KEY"):
        return
    try:
        from google.colab import userdata
        k = userdata.get("OPENAI_API_KEY")
        if k:
            os.environ["OPENAI_API_KEY"] = k
            return
    except Exception:
        pass
    import getpass
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Enter OPENAI_API_KEY: ")

_set_env_openai()

MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
llm = ChatOpenAI(model=MODEL, temperature=0)

We set up the execution environment by installing LangGraph and initializing the OpenAI model. We load the API key from the environment, Colab user data, or a hidden prompt rather than hard-coding it, and we set the temperature to 0 to reduce sampling variance, which keeps downstream agent behavior as repeatable as the API allows.

SAMPLE_LEDGER = [
    {"txn_id": "T001", "name": "Asha", "email": "ASHA@Example.com", "amount": "1,250.50", "date": "12/01/2025", "note": "Membership renewal"},
    {"txn_id": "T002", "name": "Ravi", "email": "ravi@example.com", "amount": "-500", "date": "2025-12-02", "note": "Chargeback?"},
    {"txn_id": "T003", "name": "Sara", "email": "sara@example.com", "amount": "700", "date": "02-12-2025", "note": "Late fee waived"},
    {"txn_id": "T003", "name": "Sara", "email": "sara@example.com", "amount": "700", "date": "02-12-2025", "note": "Duplicate row"},
    {"txn_id": "T004", "name": "Lee", "email": "lee@example.com", "amount": "NaN", "date": "2025/12/03", "note": "Bad amount"},
]

ALLOWED_OPS = {"replace", "remove", "add"}

def _parse_amount(x):
    """Return the amount as a finite float, or None if it cannot be parsed."""
    if isinstance(x, (int, float)):
        v = float(x)
    elif isinstance(x, str):
        try:
            v = float(x.replace(",", ""))
        except ValueError:
            return None
    else:
        return None
    # float("NaN") parses without error, so non-finite values are rejected explicitly.
    return v if math.isfinite(v) else None

def _iso_date(d):
    """Normalize a date string to YYYY-MM-DD; non-ISO inputs are read as day-month-year."""
    if not isinstance(d, str):
        return None
    d = d.replace("/", "-")
    p = d.split("-")
    if len(p) == 3 and len(p[0]) == 4:
        return d
    if len(p) == 3 and len(p[2]) == 4:
        return f"{p[2]}-{p[1]}-{p[0]}"
    return None

def profile_ledger(rows):
    """Collect indices of rows with unparseable amounts or repeated txn_id values."""
    seen, anomalies = {}, []
    for i, r in enumerate(rows):
        if _parse_amount(r.get("amount")) is None:
            anomalies.append(i)
        if r.get("txn_id") in seen:
            anomalies.append(i)
        seen[r.get("txn_id")] = i
    return {"rows": len(rows), "anomalies": anomalies}

def apply_patch(rows, patch):
    """Apply a patch to a deep copy of rows; idx values refer to the original row order."""
    out = copy.deepcopy(rows)
    # Field edits run first, while indices still match the original rows.
    for op in patch:
        if op["op"] in {"add", "replace"}:
            out[op["idx"]][op["field"]] = op["value"]
    # Removals run last, highest index first, so one pop does not shift the next.
    for op in sorted([p for p in patch if p["op"] == "remove"], key=lambda x: x["idx"], reverse=True):
        out.pop(op["idx"])
    return out

def validate(rows):
    """Invariants: every amount parses to a finite float and every date can be normalized."""
    issues = []
    for i, r in enumerate(rows):
        if _parse_amount(r.get("amount")) is None:
            issues.append(i)
        if _iso_date(r.get("date")) is None:
            issues.append(i)
    return {"ok": len(issues) == 0, "issues": issues}

We define the sample ledger along with the patching, normalization, and validation logic. Patches are applied to a deep copy of the rows, so the original ledger stays untouched and the agent can inspect a staged result before anything is committed.
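
To make the stage-then-commit idea concrete without LangGraph or an LLM, here is a minimal sketch in plain Python. The helper names (`stage`, `check`, `two_phase`) and the two-row ledger are hypothetical and exist only for illustration; the invariant is reduced to "every amount parses as a float".

```python
import copy

def stage(rows, patch):
    """Phase 1: apply field edits to a deep copy; the caller's rows stay untouched."""
    sandbox = copy.deepcopy(rows)
    for op in patch:
        sandbox[op["idx"]][op["field"]] = op["value"]
    return sandbox

def check(rows):
    """Invariant for this sketch: return indices of rows whose amount does not parse."""
    bad = []
    for i, r in enumerate(rows):
        try:
            float(str(r["amount"]).replace(",", ""))
        except ValueError:
            bad.append(i)
    return bad

def two_phase(rows, patch, approved):
    """Phase 2: commit the sandbox only if it validates and a human approved it."""
    sandbox = stage(rows, patch)
    if check(sandbox) or not approved:
        return rows, "ROLLED BACK"   # discard the sandbox, keep the original
    return sandbox, "COMMITTED"

ledger = [{"txn_id": "T1", "amount": "1,250.50"}, {"txn_id": "T2", "amount": "oops"}]
fix = [{"idx": 1, "field": "amount", "value": "700"}]

committed, status = two_phase(ledger, fix, approved=True)
rejected, status_rejected = two_phase(ledger, fix, approved=False)
unpatched, status_unpatched = two_phase(ledger, [], approved=True)
print(status, status_rejected, status_unpatched)
```

Because the patch only ever touches the sandbox, a rollback amounts to discarding the copy; the original `ledger` is returned unchanged whenever validation fails or approval is withheld.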

class TxnState(TypedDict):
    messages: Annotated[List[AnyMessage], add_messages]
    raw_rows: List[Dict[str, Any]]
    sandbox_rows: List[Dict[str, Any]]
    patch: List[Dict[str, Any]]
    validation: Dict[str, Any]
    approved: Optional[bool]

def node_profile(state):
    p = profile_ledger(state["raw_rows"])
    return {"messages": [AIMessage(content=json.dumps(p))]}

def node_patch(state):
    # The schema spelled out in the prompt mirrors the keys apply_patch reads.
    sys_msg = SystemMessage(content=(
        "Return only a JSON patch list fixing amounts, dates, emails, duplicates. "
        'Each item is {"op": "replace" | "remove" | "add", "idx": <row index>, '
        '"field": <field name>, "value": <new value>}; omit field and value for "remove".'
    ))
    usr = HumanMessage(content=json.dumps(state["raw_rows"]))
    r = llm.invoke([sys_msg, usr])
    # Extract the outermost [...] span from the reply; the brackets must be escaped.
    m = re.search(r"\[.*\]", r.content, re.S)
    patch = json.loads(m.group()) if m else []
    return {"patch": patch, "messages": [AIMessage(content=json.dumps(patch))]}

def node_apply(state):
    return {"sandbox_rows": apply_patch(state["raw_rows"], state["patch"])}

def node_validate(state):
    v = validate(state["sandbox_rows"])
    return {"validation": v, "messages": [AIMessage(content=json.dumps(v))]}

def node_approve(state):
    # interrupt() pauses the graph here; the value passed on resume becomes `decision`.
    decision = interrupt({"validation": state["validation"]})
    return {"approved": decision == "approve"}

def node_commit(state):
    return {"messages": [AIMessage(content="COMMITTED")]}

def node_rollback(state):
    return {"messages": [AIMessage(content="ROLLED BACK")]}

We model the agent’s internal state and define each node in the LangGraph workflow. We express agent behavior as discrete, inspectable steps that transform state while preserving message history.

builder = StateGraph(TxnState)

builder.add_node("profile", node_profile)
builder.add_node("patch", node_patch)
builder.add_node("apply", node_apply)
builder.add_node("validate", node_validate)
builder.add_node("approve", node_approve)
builder.add_node("commit", node_commit)
builder.add_node("rollback", node_rollback)

builder.add_edge(START, "profile")
builder.add_edge("profile", "patch")
builder.add_edge("patch", "apply")
builder.add_edge("apply", "validate")

# Validation gate: only a clean sandbox reaches the human approval step.
builder.add_conditional_edges(
    "validate",
    lambda s: "approve" if s["validation"]["ok"] else "rollback",
    {"approve": "approve", "rollback": "rollback"}
)

# Approval gate: commit only on an explicit "approve" decision.
builder.add_conditional_edges(
    "approve",
    lambda s: "commit" if s["approved"] else "rollback",
    {"commit": "commit", "rollback": "rollback"}
)

builder.add_edge("commit", END)
builder.add_edge("rollback", END)

# A checkpointer is required so the graph can pause at interrupt() and resume later.
app = builder.compile(checkpointer=InMemorySaver())

We construct the LangGraph state machine and explicitly encode the control flow between profiling, patching, validation, approval, and finalization. We use conditional edges to enforce governance rules rather than rely on implicit model decisions.

def run():
    state = {
        "messages": [],
        "raw_rows": SAMPLE_LEDGER,
        "sandbox_rows": [],
        "patch": [],
        "validation": {},
        "approved": None,
    }

    cfg = {"configurable": {"thread_id": "txn-demo"}}
    out = app.invoke(state, config=cfg)

    if "__interrupt__" in out:
        # Interrupt objects are not JSON-serializable, so print their payloads instead.
        payloads = [i.value for i in out["__interrupt__"]]
        print(json.dumps(payloads, indent=2))
        decision = input("approve / reject: ").strip()
        # Resuming with the same thread_id continues from the paused approve node.
        out = app.invoke(Command(resume=decision), config=cfg)

    print(out["messages"][-1].content)

run()

We run the transactional agent and handle human-in-the-loop approval through graph interrupts. We resume execution deterministically, demonstrating how agentic workflows can pause, accept external input, and safely conclude with either a commit or rollback.

In conclusion, we showed how LangGraph enables us to build agents that reason over states, enforce validation gates, and collaborate with humans at precisely defined control points. We treated the agent not as an oracle, but as a transaction coordinator that can stage, inspect, and reverse its own actions while maintaining a full audit trail. This approach highlights how agentic AI can be applied to real-world systems that require trust, compliance, and recoverability, and it provides a practical foundation for building production-grade autonomous workflows that remain safe, transparent, and human-supervised.

Check out the Full Codes here. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
The post How to Design Transactional Agentic AI Systems with LangGraph Using Two-Phase Commit, Human Interrupts, and Safe Rollbacks appeared first on MarkTechPost.

How Cloudflare’s tokio-quiche Makes QUIC and HTTP/3 a First Class Ci …

Cloudflare has open sourced tokio-quiche, an asynchronous QUIC and HTTP/3 Rust library that wraps its battle tested quiche implementation with the Tokio runtime. The library has been refined inside production systems such as Apple iCloud Private Relay, next generation Oxy based proxies and WARP’s MASQUE client, where it handles millions of HTTP/3 requests per second with low latency and high throughput. tokio-quiche targets Rust teams that want QUIC and HTTP/3 without writing their own UDP and event loop integration code.

From quiche to tokio-quiche

quiche is Cloudflare’s open source QUIC and HTTP/3 implementation written in Rust and designed as a low level, sans-io library. It implements the QUIC transport state machine, including connection establishment, flow control and stream multiplexing, while making no assumptions about how applications perform IO. To use quiche directly, integrators must open UDP sockets, send and receive datagrams, manage timers and feed all packet data into quiche in the correct order. This design gives flexibility, but it makes integration error prone and time consuming.

tokio-quiche packages this integration work into a reusable crate. It combines the sans-io QUIC and HTTP/3 implementation from quiche with the Tokio async runtime, and exposes an API that already manages UDP sockets, packet routing and calls into the quiche state machine.

Actor based architecture on Tokio

Internally, tokio-quiche uses an actor model on top of Tokio. Actors are small tasks with local state that communicate through message passing over channels, which aligns well with sans-io protocol implementations that own internal state and operate on message like buffers.

The primary actor is the IO loop actor, which moves packets between quiche and the UDP socket. One of the key message types is an Incoming struct that describes received UDP packets. Async integration follows a fixed pattern: the IO loop awaits new messages, translates them into inputs for quiche, advances the QUIC state machine, then translates outputs into outbound packets that are written back to the socket.

For each UDP socket, tokio-quiche spawns two important tasks. InboundPacketRouter owns the receiving half of the socket and routes inbound datagrams by destination connection ID to per connection channels. IoWorker is the per connection IO loop and drives a single quiche Connection, interleaving calls to quiche with calls to application specific logic implemented through ApplicationOverQuic. This design encapsulates connection state inside each actor and keeps QUIC processing isolated from higher level protocol code.
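
The router-plus-worker split described above can be illustrated with a small asyncio analogy in Python. This is not tokio-quiche's Rust API: `router`, `worker` and the tuple-based "datagrams" are hypothetical stand-ins, and no real sockets or QUIC processing are involved. The sketch only shows the message-passing shape, with one task that owns the inbound side and one task per connection ID that owns that connection's state.

```python
import asyncio

async def worker(conn_id, inbox, log):
    """Per-connection loop: owns its own state and only sees its own datagrams."""
    received = 0
    while True:
        payload = await inbox.get()
        if payload is None:              # shutdown signal from the router
            break
        received += 1
        log.append((conn_id, received, payload))

async def router(datagrams, log):
    """Owns the inbound side and forwards each datagram to a per-connection queue."""
    inboxes, tasks = {}, []
    for conn_id, payload in datagrams:
        if conn_id not in inboxes:       # first packet for this ID: spawn a worker
            inboxes[conn_id] = asyncio.Queue()
            tasks.append(asyncio.create_task(worker(conn_id, inboxes[conn_id], log)))
        await inboxes[conn_id].put(payload)
    for q in inboxes.values():           # tell every worker to stop
        await q.put(None)
    await asyncio.gather(*tasks)

log = []
asyncio.run(router([("a", b"p1"), ("b", b"p2"), ("a", b"p3")], log))
print(sorted(log))
```

Each worker's counter is private to its task, which mirrors the point in the text that connection state stays encapsulated inside the actor. In an environment that already runs an event loop, such as a notebook, `await router(...)` would replace `asyncio.run(...)`.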

ApplicationOverQuic and H3Driver

QUIC is a transport protocol and can carry multiple application protocols. HTTP/3, DNS over QUIC and Media over QUIC are examples covered by IETF specifications. To avoid coupling tokio-quiche to a single protocol, the Cloudflare team exposes an ApplicationOverQuic trait. The trait abstracts over quiche methods and the underlying IO, and presents higher level events and hooks to the application that implements the protocol. For example, the HTTP/3 debug and test client h3i uses a non HTTP/3 implementation of ApplicationOverQuic.

On top of this trait, tokio-quiche ships a dedicated HTTP/3 focused implementation named H3Driver. H3Driver connects quiche’s HTTP/3 module to the IO loop actor and converts raw HTTP/3 events into higher level events with asynchronous body streams that are convenient for application code. H3Driver is generic and exposes ServerH3Driver and ClientH3Driver variants that add server side and client side behavior on top of the core driver. These components provide the building blocks for HTTP/3 servers and clients that share implementation patterns with Cloudflare’s internal infrastructure.

Production usage and roadmap

tokio-quiche was used inside Cloudflare for several years before its public release. It powers Proxy B in Apple iCloud Private Relay, Oxy based HTTP/3 servers and the WARP MASQUE client, as well as the async version of h3i. In the WARP client, MASQUE based tunnels built on tokio-quiche replace the earlier WireGuard based tunnels. These systems run at Cloudflare edge scale and demonstrate that the integration can sustain millions of HTTP/3 requests per second in production.

Cloudflare positions tokio-quiche as a foundation rather than a complete HTTP/3 framework. The library exposes low level protocol capabilities and example client and server event loops, and leaves room for higher level projects to implement opinionated HTTP servers, DNS over QUIC clients, MASQUE based VPNs and other QUIC applications on top. By releasing the crate, Cloudflare aims to lower the barrier for Rust teams to adopt QUIC, HTTP/3 and MASQUE, and to align external integrations with the same transport stack used in its edge services.

Key Takeaways

tokio-quiche = quiche + Tokio: tokio-quiche is an async Rust library that integrates Cloudflare’s sans-io QUIC and HTTP/3 implementation, quiche, with the Tokio runtime, so developers do not need to hand write UDP and event loop plumbing.

Actor based architecture for QUIC connections: The library uses an actor model on Tokio, with an InboundPacketRouter that routes UDP datagrams by connection ID and an IoWorker that drives a single quiche Connection per task, keeping transport state isolated and composable.

ApplicationOverQuic abstraction: Protocol logic is separated through the ApplicationOverQuic trait, which abstracts over quiche and I/O details so different QUIC based protocols such as HTTP/3, DNS over QUIC or custom protocols can be implemented on top of the same transport core.

HTTP/3 via H3Driver, ServerH3Driver and ClientH3Driver: tokio-quiche ships H3Driver plus ServerH3Driver and ClientH3Driver variants that bridge quiche’s HTTP/3 module to async Rust code, exposing HTTP/3 streams and bodies in a way that fits typical Tokio based services.

The post How Cloudflare’s tokio-quiche Makes QUIC and HTTP/3 a First Class Citizen in Rust Backends appeared first on MarkTechPost.

Tencent Released Tencent HY-Motion 1.0: A Billion-Parameter Text-to-Mo …

Tencent Hunyuan’s 3D Digital Human team has released HY-Motion 1.0, an open weight text-to-3D human motion generation family that scales Diffusion Transformer based Flow Matching to 1B parameters in the motion domain. The models turn natural language prompts plus an expected duration into 3D human motion clips on a unified SMPL-H skeleton and are available on GitHub and Hugging Face with code, checkpoints and a Gradio interface for local use.

https://arxiv.org/pdf/2512.23464

What HY-Motion 1.0 provides for developers

HY-Motion 1.0 is a series of text-to-3D human motion generation models built on a Diffusion Transformer (DiT) and trained with a Flow Matching objective. The series includes 2 variants: HY-Motion-1.0 with 1.0B parameters as the standard model and HY-Motion-1.0-Lite with 0.46B parameters as a lightweight option.

Both models generate skeleton based 3D character animations from simple text prompts. The output is a motion sequence on an SMPL-H skeleton that can be integrated into 3D animation or game pipelines, for example for digital humans, cinematics and interactive characters. The release includes inference scripts, a batch oriented CLI and a Gradio web app, and supports macOS, Windows and Linux.

Data engine and taxonomy

The training data comes from 3 sources: in-the-wild human motion videos, motion capture data and 3D animation assets for game production. The research team starts from 12M high quality video clips from HunyuanVideo, runs shot boundary detection to split scenes and a human detector to keep clips with people, then applies the GVHMR algorithm to reconstruct SMPL-X motion tracks. Motion capture sessions and 3D animation libraries contribute about 500 hours of additional motion sequences.

All data is retargeted onto a unified SMPL-H skeleton through mesh fitting and retargeting tools. A multi stage filter removes duplicate clips, abnormal poses, outliers in joint velocity, anomalous displacements, long static segments and artifacts such as foot sliding. Motions are then canonicalized, resampled to 30 fps and segmented into clips shorter than 12 seconds with a fixed world frame, Y axis up and the character facing the positive Z axis. The final corpus contains over 3,000 hours of motion, of which 400 hours are high quality 3D motion with verified captions.

On top of this, the research team defines a 3 level taxonomy. At the top level there are 6 classes: Locomotion; Sports and Athletics; Fitness and Outdoor Activities; Daily Activities; Social Interactions and Leisure; and Game Character Actions. These expand into more than 200 fine grained motion categories at the leaves, which cover both simple atomic actions and concurrent or sequential motion combinations.

Motion representation and HY-Motion DiT

HY-Motion 1.0 uses the SMPL-H skeleton with 22 body joints without hands. Each frame is a 201 dimensional vector that concatenates global root translation in 3D space, global body orientation in a continuous 6D rotation representation, 21 local joint rotations in 6D form and 22 local joint positions in 3D coordinates. Velocities and foot contact labels are removed because they slowed training and did not help final quality. This representation is compatible with animation workflows and close to the DART model representation.
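
The 201 dimensions follow directly from the four components listed above. The short check below adds them up and derives slice boundaries for each part. The component order follows the sentence above, but the exact memory layout is an assumption made for illustration, as are the dictionary keys.

```python
# Per-frame feature widths, taken from the description above.
layout = {
    "root_translation": 3,        # global root translation (x, y, z)
    "root_orientation": 6,        # global body orientation, continuous 6D rotation
    "joint_rotations": 21 * 6,    # 21 local joint rotations, 6D each
    "joint_positions": 22 * 3,    # 22 local joint positions, 3D each
}

frame_dim = sum(layout.values())

# Hypothetical slice boundaries if the parts are concatenated in the order above.
offsets, start = {}, 0
for name, width in layout.items():
    offsets[name] = (start, start + width)
    start += width

print(frame_dim)
print(offsets)
```

The sum is 3 + 6 + 126 + 66 = 201, matching the stated frame size.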

The core network is a hybrid HY-Motion DiT. It first applies dual stream blocks that process motion latents and text tokens separately. In these blocks, each modality has its own QKV projections and MLP, and a joint attention module allows motion tokens to query semantic features from text tokens while keeping modality specific structure. The network then switches to single stream blocks that concatenate motion and text tokens into one sequence and process them with parallel spatial and channel attention modules to perform deeper multimodal fusion.

For text conditioning, the system uses a dual encoder scheme. Qwen3 8B provides token level embeddings, while a CLIP-L model provides global text features. A Bidirectional Token Refiner fixes the causal attention bias of the LLM for non autoregressive generation. These signals feed the DiT through adaptive layer normalization conditioning. Attention is asymmetric: motion tokens can attend to all text tokens, but text tokens do not attend back to motion, which prevents noisy motion states from corrupting the language representation. Temporal attention inside the motion branch uses a narrow sliding window of 121 frames, which focuses capacity on local kinematics while keeping cost manageable for long clips. Full Rotary Position Embedding is applied after concatenating text and motion tokens to encode relative positions across the whole sequence.
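
The asymmetric and windowed attention pattern can be sketched as a boolean mask. This is an illustration under stated assumptions, not the released implementation: text tokens are placed before motion tokens, text attends to all text, and the 121 frame window is taken to be centered on the query frame (60 frames on each side). `build_mask` is a hypothetical helper.

```python
def build_mask(n_text, n_motion, window=121):
    """mask[q][k] is True when query token q may attend to key token k."""
    half = window // 2                     # assumption: window centered on the query
    n = n_text + n_motion
    mask = [[False] * n for _ in range(n)]
    for q in range(n):
        for k in range(n):
            q_is_text, k_is_text = q < n_text, k < n_text
            if k_is_text:
                mask[q][k] = True          # text keys are visible to text and motion
            elif not q_is_text:
                mask[q][k] = abs(q - k) <= half   # motion-to-motion: local window only
            # text query with motion key stays False (no attention back to motion)
    return mask

mask = build_mask(n_text=2, n_motion=200)
visible_frames = sum(mask[102][2:])        # motion keys seen by a mid-clip frame
print(visible_frames)
```

For a frame far enough from both clip boundaries, the query sees exactly 121 motion frames plus every text token, while no text row has a True entry in any motion column.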

Flow Matching, prompt rewriting and training

HY-Motion 1.0 uses Flow Matching instead of standard denoising diffusion. The model learns a velocity field along a continuous path that interpolates between Gaussian noise and real motion data. During training, the objective is a mean squared error between predicted and ground truth velocities along this path. During inference, the learned ordinary differential equation is integrated from noise to a clean trajectory, which gives stable training for long sequences and fits the DiT architecture.
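
A toy version of this objective and sampler fits in a few lines of plain Python. The sketch assumes a straight-line path x_t = (1 - t) x_0 + t x_1 between noise x_0 and data x_1, for which the target velocity is x_1 - x_0; the text only says the path interpolates between the two, so the linear choice is an assumption. A hand-written `oracle` velocity field for a single data point stands in for the trained network, and all function names are hypothetical.

```python
import random

def interpolate(x0, x1, t):
    """Point on the assumed straight path between noise x0 and data x1."""
    return [(1 - t) * a + t * b for a, b in zip(x0, x1)]

def fm_loss(predict, x0, x1, t):
    """Mean squared error between predicted and target velocity at time t."""
    xt = interpolate(x0, x1, t)
    v_pred = predict(xt, t)
    v_true = [b - a for a, b in zip(x0, x1)]    # d/dt of the straight path
    return sum((p - q) ** 2 for p, q in zip(v_pred, v_true)) / len(v_true)

def euler_sample(velocity, x0, steps=50):
    """Integrate dx/dt = velocity(x, t) from t=0 (noise) to t=1 with Euler steps."""
    x, dt = list(x0), 1.0 / steps
    for k in range(steps):
        v = velocity(x, k * dt)
        x = [xi + dt * vi for xi, vi in zip(x, v)]
    return x

data = [0.5, -1.0, 2.0]                         # stand-in for one motion feature vector

def oracle(x, t):
    """Exact velocity field when the data distribution is the single point `data`."""
    return [(d - xi) / (1 - t) for d, xi in zip(data, x)]

random.seed(0)
noise = [random.gauss(0, 1) for _ in data]
loss_at_oracle = fm_loss(oracle, noise, data, t=0.3)
sample = euler_sample(oracle, noise)
print(loss_at_oracle, sample)
```

With the oracle field, the loss is zero up to floating point error and the integrated trajectory lands on the data point. A trained network replaces `oracle` in practice and is fitted by minimizing `fm_loss` over random noise samples, data samples and times.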

A separate Duration Prediction and Prompt Rewrite module improves instruction following. It uses Qwen3 30B A3B as the base model and is trained on synthetic user style prompts generated from motion captions with a VLM and LLM pipeline, for example Gemini 2.5 Pro. This module predicts a suitable motion duration and rewrites informal prompts into normalized text that is easier for the DiT to follow. It is trained first with supervised fine tuning and then refined with Group Relative Policy Optimization, using Qwen3 235B A22B as a reward model that scores semantic consistency and duration plausibility.

Training follows a 3 stage curriculum. Stage 1 performs large scale pretraining on the full 3,000 hour dataset to learn a broad motion prior and basic text motion alignment. Stage 2 fine tunes on the 400 hour high quality set to sharpen motion detail and improve semantic correctness with a smaller learning rate. Stage 3 applies reinforcement learning, first Direct Preference Optimization using 9,228 curated human preference pairs sampled from about 40,000 generated pairs, then Flow GRPO with a composite reward. The reward combines a semantic score from a Text Motion Retrieval model and a physics score that penalizes artifacts like foot sliding and root drift, under a KL regularization term to stay close to the supervised model.

Benchmarks, scaling behavior and limitations

For evaluation, the team builds a test set of over 2,000 prompts that span the 6 taxonomy categories and include simple, concurrent and sequential actions. Human raters score instruction following and motion quality on a scale from 1 to 5. HY-Motion 1.0 reaches an average instruction following score of 3.24 and an SSAE score of 78.6 percent. Baseline text-to-motion systems such as DART, LoM, GoToZero and MoMask achieve scores between 2.17 and 2.31 with SSAE between 42.7 percent and 58.0 percent. For motion quality, HY-Motion 1.0 reaches 3.43 on average versus 3.11 for the best baseline.

Scaling experiments compare four DiT configurations: 0.05B, 0.46B, 0.46B trained only on the 400 hour set, and 1B parameters. Instruction following improves steadily with model size, with the 1B model reaching an average of 3.34. Motion quality saturates around the 0.46B scale, where the 0.46B and 1B models reach similar averages between 3.26 and 3.34. Comparing the 0.46B model trained on 3,000 hours with the 0.46B model trained only on 400 hours shows that larger data volume is key for instruction alignment, while high quality curation mainly improves realism.

Key Takeaways

Billion scale DiT Flow Matching for motion: HY-Motion 1.0 is the first Diffusion Transformer based Flow Matching model scaled to the 1B parameter level specifically for text to 3D human motion, targeting high fidelity instruction following across diverse actions.

Large scale, curated motion corpus: The model is pretrained on over 3,000 hours of reconstructed, mocap and animation motion data and fine tuned on a 400 hour high quality subset, all retargeted to a unified SMPL-H skeleton and organized into more than 200 motion categories.

Hybrid DiT architecture with strong text conditioning: HY-Motion 1.0 uses a hybrid dual stream and single stream DiT with asymmetric attention, narrow band temporal attention and dual text encoders, Qwen3 8B and CLIP-L, to fuse token level and global semantics into motion trajectories.

RL aligned prompt rewrite and training pipeline: A dedicated Qwen3 30B based module predicts motion duration and rewrites user prompts, and the DiT is further aligned with Direct Preference Optimization and Flow GRPO using semantic and physics rewards, which improves realism and instruction following beyond supervised training.

The post Tencent Released Tencent HY-Motion 1.0: A Billion-Parameter Text-to-Motion Model Built on the Diffusion Transformer (DiT) Architecture and Flow Matching appeared first on MarkTechPost.

A Coding Implementation of an OpenAI-Assisted Privacy-Preserving Feder …

In this tutorial, we demonstrate how we simulate a privacy-preserving fraud detection system using Federated Learning without relying on heavyweight frameworks or complex infrastructure. We build a clean, CPU-friendly setup that mimics ten independent banks, each training a local fraud-detection model on its own highly imbalanced transaction data. We coordinate these local updates through a simple FedAvg aggregation loop, allowing us to improve a global model while ensuring that no raw transaction data ever leaves a client. Alongside this, we integrate OpenAI to support post-training analysis and risk-oriented reporting, demonstrating how federated learning outputs can be translated into decision-ready insights. Check out the Full Codes here.

!pip -q install torch scikit-learn numpy openai

import time, random, json, os, getpass
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, average_precision_score, accuracy_score
from openai import OpenAI

# Fix all random seeds so the simulation is repeatable.
SEED = 7
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

DEVICE = torch.device("cpu")
print("Device:", DEVICE)

We set up the execution environment and import all required libraries for data generation, modeling, evaluation, and reporting. We also fix random seeds and the device configuration to ensure our federated simulation remains deterministic and reproducible on CPU.

X, y = make_classification(
    n_samples=60000,
    n_features=30,
    n_informative=18,
    n_redundant=8,
    weights=[0.985, 0.015],   # roughly 1.5% positive (fraud) class
    class_sep=1.5,
    flip_y=0.01,
    random_state=SEED
)

X = X.astype(np.float32)
y = y.astype(np.int64)

X_train_full, X_test, y_train_full, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=SEED
)

server_scaler = StandardScaler()
X_train_full_s = server_scaler.fit_transform(X_train_full).astype(np.float32)
X_test_s = server_scaler.transform(X_test).astype(np.float32)

test_loader = DataLoader(
    TensorDataset(torch.from_numpy(X_test_s), torch.from_numpy(y_test)),
    batch_size=1024,
    shuffle=False
)

We generate a highly imbalanced, credit-card-like fraud dataset and split it into training and test sets. We standardize the server-side data and prepare a global test loader that allows us to consistently evaluate the aggregated model after each federated round.

def dirichlet_partition(y, n_clients=10, alpha=0.35):
    """Split sample indices across clients with per-class Dirichlet proportions."""
    classes = np.unique(y)
    idx_by_class = [np.where(y == c)[0] for c in classes]
    client_idxs = [[] for _ in range(n_clients)]
    for idxs in idx_by_class:
        np.random.shuffle(idxs)
        props = np.random.dirichlet(alpha * np.ones(n_clients))
        cuts = (np.cumsum(props) * len(idxs)).astype(int)
        cuts[-1] = len(idxs)  # guard against float rounding dropping the last samples
        prev = 0
        for cid, cut in enumerate(cuts):
            client_idxs[cid].extend(idxs[prev:cut].tolist())
            prev = cut
    return [np.array(ci, dtype=np.int64) for ci in client_idxs]

NUM_CLIENTS = 10
client_idxs = dirichlet_partition(y_train_full, NUM_CLIENTS, 0.35)

def make_client_split(X, y, idxs):
    Xi, yi = X[idxs], y[idxs]
    # If a client received only one class, borrow a few samples of the other class.
    if len(np.unique(yi)) < 2:
        other = np.where(y == (1 - yi[0]))[0]
        add = np.random.choice(other, size=min(10, len(other)), replace=False)
        Xi = np.concatenate([Xi, X[add]])
        yi = np.concatenate([yi, y[add]])
    # train_test_split returns (Xtr, Xva, ytr, yva); reorder to match make_client_loaders.
    Xtr, Xva, ytr, yva = train_test_split(Xi, yi, test_size=0.15, stratify=yi, random_state=SEED)
    return Xtr, ytr, Xva, yva

client_data = [make_client_split(X_train_full, y_train_full, client_idxs[c]) for c in range(NUM_CLIENTS)]

def make_client_loaders(Xtr, ytr, Xva, yva):
    # Each client fits its own scaler on its local training data only.
    sc = StandardScaler()
    Xtr_s = sc.fit_transform(Xtr).astype(np.float32)
    Xva_s = sc.transform(Xva).astype(np.float32)
    tr = DataLoader(TensorDataset(torch.from_numpy(Xtr_s), torch.from_numpy(ytr)), batch_size=512, shuffle=True)
    va = DataLoader(TensorDataset(torch.from_numpy(Xva_s), torch.from_numpy(yva)), batch_size=512)
    return tr, va

client_loaders = [make_client_loaders(*cd) for cd in client_data]

We simulate realistic non-IID behavior by partitioning the training data across ten clients using a Dirichlet distribution. We then create independent client-level train and validation loaders, ensuring that each simulated bank operates on its own locally scaled data.
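
To see how the concentration parameter shapes the split, the sketch below draws client shares for one class at two values of alpha using only the standard library. It samples a symmetric Dirichlet through normalized Gamma draws, which is a standard construction; the `dirichlet` helper and the comparison value alpha=100 are illustrative additions, not part of the tutorial code.

```python
import random

def dirichlet(alpha, n, rng):
    """Sample one point from a symmetric Dirichlet via normalized Gamma draws."""
    draws = [rng.gammavariate(alpha, 1.0) for _ in range(n)]
    total = sum(draws)
    return [d / total for d in draws]

rng = random.Random(7)
skewed = dirichlet(0.35, 10, rng)   # the tutorial's alpha: shares tend to be very uneven
even = dirichlet(100.0, 10, rng)    # large alpha: shares tend to cluster near 1/10

print("alpha=0.35:", [round(s, 3) for s in skewed])
print("alpha=100 :", [round(s, 3) for s in even])
```

Small alpha values concentrate most of a class on a few clients, which is what makes the simulated banks heterogeneous; larger values approach an even split.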

class FraudNet(nn.Module):
    def __init__(self, in_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 64),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(32, 1)
        )

    def forward(self, x):
        # One logit per sample; the sigmoid is applied in the loss and in evaluate().
        return self.net(x).squeeze(-1)

def get_weights(model):
    return [p.detach().cpu().numpy() for p in model.state_dict().values()]

def set_weights(model, weights):
    keys = list(model.state_dict().keys())
    model.load_state_dict({k: torch.tensor(w) for k, w in zip(keys, weights)}, strict=True)

@torch.no_grad()
def evaluate(model, loader):
    model.eval()
    bce = nn.BCEWithLogitsLoss()
    ys, ps, losses = [], [], []
    for xb, yb in loader:
        logits = model(xb)
        losses.append(bce(logits, yb.float()).item())
        ys.append(yb.numpy())
        ps.append(torch.sigmoid(logits).numpy())
    y_true = np.concatenate(ys)
    y_prob = np.concatenate(ps)
    return {
        "loss": float(np.mean(losses)),
        "auc": roc_auc_score(y_true, y_prob),
        "ap": average_precision_score(y_true, y_prob),
        "acc": accuracy_score(y_true, (y_prob >= 0.5).astype(int))
    }

def train_local(model, loader, lr):
    # One pass over the client's local data per federated round.
    opt = torch.optim.Adam(model.parameters(), lr=lr)
    bce = nn.BCEWithLogitsLoss()
    model.train()
    for xb, yb in loader:
        opt.zero_grad()
        loss = bce(model(xb), yb.float())
        loss.backward()
        opt.step()

We define the neural network used for fraud detection along with utility functions for training, evaluation, and weight exchange. We implement lightweight local optimization and metric computation to keep client-side updates efficient and easy to reason about.

def fedavg(weights, sizes):
    # Average each parameter tensor across clients, weighted by local dataset size.
    total = sum(sizes)
    return [
        sum(w[i] * (s / total) for w, s in zip(weights, sizes))
        for i in range(len(weights[0]))
    ]

ROUNDS = 10
LR = 5e-4

global_model = FraudNet(X_train_full.shape[1])
global_weights = get_weights(global_model)

for r in range(1, ROUNDS + 1):
    client_weights, client_sizes = [], []
    for cid in range(NUM_CLIENTS):
        # Every client starts the round from the current global weights.
        local = FraudNet(X_train_full.shape[1])
        set_weights(local, global_weights)
        train_local(local, client_loaders[cid][0], LR)
        client_weights.append(get_weights(local))
        client_sizes.append(len(client_loaders[cid][0].dataset))
    global_weights = fedavg(client_weights, client_sizes)
    set_weights(global_model, global_weights)
    metrics = evaluate(global_model, test_loader)
    print(f"Round {r}: {metrics}")

We orchestrate the federated learning process by iteratively training local client models and aggregating their parameters using FedAvg. We evaluate the global model after each round to monitor convergence and understand how collective learning improves fraud detection performance.
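
A two-client example with made-up numbers shows what the size-weighted average does. Plain floats stand in for the NumPy arrays that the tutorial exchanges, and the weights and dataset sizes are illustrative only.

```python
def fedavg(weights, sizes):
    """Average each parameter across clients, weighted by local dataset size."""
    total = sum(sizes)
    n_params = len(weights[0])
    return [
        sum(w[i] * (s / total) for w, s in zip(weights, sizes))
        for i in range(n_params)
    ]

client_weights = [[1.0, 0.0], [3.0, 4.0]]   # two clients, two parameters each
client_sizes = [300, 100]                   # client 0 holds 75% of the samples

global_weights = fedavg(client_weights, client_sizes)
print(global_weights)
```

The first parameter becomes 0.75 * 1.0 + 0.25 * 3.0 = 1.5 and the second 0.75 * 0.0 + 0.25 * 4.0 = 1.0, so the larger client pulls the global model toward its own update.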

OPENAI_API_KEY = getpass.getpass("Enter OPENAI_API_KEY (input hidden): ").strip()

if OPENAI_API_KEY:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
client = OpenAI()  # reads OPENAI_API_KEY from the environment

summary = {
    "rounds": ROUNDS,
    "num_clients": NUM_CLIENTS,
    "final_metrics": metrics,
    "client_sizes": [len(client_loaders[c][0].dataset) for c in range(NUM_CLIENTS)],
    # Index 1 of each client tuple holds the training labels.
    "client_fraud_rates": [float(client_data[c][1].mean()) for c in range(NUM_CLIENTS)]
}

prompt = (
    "Write a concise internal fraud-risk report.\n"
    "Include executive summary, metric interpretation, risks, and next steps.\n\n"
    + json.dumps(summary, indent=2)
)

resp = client.responses.create(model="gpt-5.2", input=prompt)
print(resp.output_text)

We transform the technical results into a concise analytical report using an external language model. We securely accept the API key via keyboard input and generate decision-oriented insights that summarize performance, risks, and recommended next steps.

In conclusion, we showed how to implement federated learning from first principles in a Colab notebook while keeping the setup stable, interpretable, and realistic. We observed how extreme data heterogeneity across clients influences convergence and why careful aggregation and evaluation are critical in fraud-detection settings. We also extended the workflow by generating an automated risk-team report, demonstrating how analytical results can be translated into decision-ready insights. Finally, we presented a practical blueprint for experimenting with federated fraud models that emphasizes privacy awareness, simplicity, and real-world relevance.

The post A Coding Implementation of an OpenAI-Assisted Privacy-Preserving Federated Fraud Detection System from Scratch Using Lightweight PyTorch Simulations appeared first on MarkTechPost.

Alibaba Tongyi Lab Releases MAI-UI: A Foundation GUI Agent Family that …

Alibaba Tongyi Lab has released MAI-UI, a family of foundation GUI agents. It natively integrates MCP tool use, agent-user interaction, device–cloud collaboration, and online RL, establishing state-of-the-art results in general GUI grounding and mobile GUI navigation, surpassing Gemini-2.5-Pro, Seed1.8, and UI-Tars-2 on AndroidWorld. The system targets three specific gaps that early GUI agents often ignore: native agent-user interaction, MCP tool integration, and a device-cloud collaboration architecture that keeps privacy-sensitive work on device while still using large cloud models when needed.

https://arxiv.org/pdf/2512.22047

What is MAI-UI?

MAI-UI is a family of multimodal GUI agents built on Qwen3 VL, with model sizes 2B, 8B, 32B and 235B A22B. These models take natural language instructions and rendered UI screenshots as input, then output structured actions for a live Android environment.

The action space covers standard operations such as clicking elements, swiping, entering text and pressing system buttons. On top of that, MAI-UI introduces explicit actions for answering user questions, asking the user for clarification when the goal is ambiguous, and invoking external tools through MCP tool calls. This makes the agent capable of mixing GUI steps, direct language responses and API level operations in a single trajectory.

From a modeling perspective, MAI-UI unifies three components: a self-evolving navigation data pipeline that includes user interaction and MCP cases, an online RL framework that scales to hundreds of parallel Android instances and long contexts, and a native device-cloud collaboration system that routes execution based on task state and privacy constraints.


GUI grounding with instruction reasoning

A core requirement for any GUI agent is grounding: mapping free-form language like ‘open monthly billing settings’ to the correct on-screen control. MAI-UI adopts a UI grounding strategy inspired by the earlier UI-Ins work on multi-perspective instruction descriptions.

For each UI element, the training pipeline does not rely on a single caption. Instead, it generates several views of the same element, for example appearance, function, spatial location and user intent. These multiple instructions are treated as reasoning evidence for the model, which must select a point inside the correct bounding box. This reduces the impact of flawed or underspecified instructions, an issue that UI-Ins quantified in existing datasets.

Ground truth boxes are collected from a mix of curated GUI datasets and large scale exploration of virtualized operating systems in containerized environments. Accessibility trees or OCR based parsers are used to align textual metadata with pixel locations. The training objective combines supervised fine tuning with a simple reinforcement signal that rewards correct point in box predictions and valid output format.
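The reinforcement signal described above can be pictured as a small reward function. The sketch below is an assumption-laden illustration: the `x,y` output format, the `format_weight` split, and the function names are hypothetical and are not taken from the MAI-UI paper.

```python
from typing import Optional, Tuple

Box = Tuple[float, float, float, float]  # (x_min, y_min, x_max, y_max) in pixels


def parse_point(output: str) -> Optional[Tuple[float, float]]:
    """Parse a prediction of the assumed form 'x,y'; return None if it is malformed."""
    parts = output.strip().split(",")
    if len(parts) != 2:
        return None
    try:
        return float(parts[0]), float(parts[1])
    except ValueError:
        return None


def grounding_reward(output: str, box: Box, format_weight: float = 0.1) -> float:
    """Small reward for a parseable point, plus the remainder if it falls inside the box."""
    point = parse_point(output)
    if point is None:
        return 0.0  # invalid output format earns nothing
    x, y = point
    x_min, y_min, x_max, y_max = box
    inside = x_min <= x <= x_max and y_min <= y <= y_max
    return format_weight + ((1.0 - format_weight) if inside else 0.0)


target_box = (100.0, 30.0, 200.0, 60.0)
for prediction in ("120, 45", "500,500", "click the button"):
    print(prediction, "->", round(grounding_reward(prediction, target_box), 2))
```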

On public GUI grounding benchmarks, the resulting MAI-UI models reach 73.5 percent accuracy on ScreenSpot Pro with adaptive zoom in, 91.3 percent on MMBench GUI L2, 70.9 percent on OSWorld G and 49.2 percent on UI Vision. These numbers surpass Gemini 3 Pro and Seed1.8 on ScreenSpot Pro, and significantly outperform earlier open models on UI Vision.


Self evolving navigation data and MobileWorld

Navigation is harder than grounding because the agent must maintain context across many steps, possibly across applications, while interacting with the user and tools. To build robust navigation behavior, Tongyi Lab uses a self evolving data pipeline.

Seed tasks come from app manuals, hand designed scenarios and filtered public data. Parameters such as dates, limits and filter values are perturbed to expand coverage, and object level substitutions are applied while staying within the same use case. Multiple agents, together with human annotators, execute these tasks in Android environments to produce trajectories. A judge model then evaluates these trajectories, keeps the longest correct prefixes and filters out low quality segments. The next supervised training round uses the union of fresh human traces and high quality model rollouts, so the data distribution gradually follows the current policy.
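The "keep the longest correct prefix" step can be sketched in a few lines. Here the judge is a stand-in callable and each step carries a precomputed verdict; in the pipeline described above, that verdict would come from a judge model, whose interface is not specified in this article.

```python
from typing import Callable, List, Sequence, Tuple

Step = Tuple[str, bool]  # (action name, hypothetical judge verdict)


def longest_correct_prefix(steps: Sequence[Step], judge: Callable[[Step], bool]) -> List[Step]:
    """Keep steps up to, but not including, the first one the judge rejects."""
    prefix: List[Step] = []
    for step in steps:
        if not judge(step):
            break
        prefix.append(step)
    return prefix


# Hypothetical trajectory: the third action is judged wrong, so later steps are dropped too.
trajectory = [
    ("open_app", True),
    ("tap_search", True),
    ("type_wrong_query", False),
    ("tap_result", True),
]
kept = longest_correct_prefix(trajectory, judge=lambda step: step[1])
print([name for name, _ in kept])
```

Cutting at the first error, rather than filtering individual bad steps, keeps every retained step consistent with the state the agent was actually in.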

MAI-UI is evaluated on MobileWorld, a benchmark from the same team that includes 201 tasks across 20 applications. MobileWorld explicitly mixes three categories: pure GUI tasks, agent-user interaction tasks that require natural language back and forth with the user, and MCP-augmented tasks that require tool calls.

On MobileWorld, MAI-UI reaches 41.7 percent overall success, a gain of about 20.8 points over the strongest end-to-end GUI baselines, and is competitive with agentic frameworks that use larger proprietary planners such as Gemini 3 Pro.

Online RL in containerized Android environments

Static data is not enough for robustness in dynamic mobile apps. MAI-UI therefore uses an online RL framework where the agent interacts directly with containerized Android Virtual Devices. The environment stack packs rooted AVD images and backend services into Docker containers, exposes standard reset and step operations over a service layer and supports more than 35 self hosted apps from e commerce, social, productivity and enterprise categories.

The RL setup uses an asynchronous on policy method, GRPO, implemented on top of verl. It combines tensor, pipeline and context parallelism, similar to Megatron style training, so that the model can learn from trajectories with up to 50 steps and very long token sequences. Rewards come from rule based verifiers or model judges that detect task completion, along with penalties for obvious looping behaviors. Only recent successful trajectories are kept in task specific buffers to stabilize learning.
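GRPO scores each rollout relative to the other rollouts sampled for the same task instead of using a learned value baseline. The sketch below shows one common formulation of that group-relative advantage (reward minus group mean, divided by group standard deviation). The exact variant used for MAI-UI, including its asynchronous handling and trajectory buffers, is not reproduced here.

```python
from statistics import mean, pstdev
from typing import List


def group_relative_advantages(rewards: List[float], eps: float = 1e-6) -> List[float]:
    """Normalize each rollout's reward against the other rollouts of the same task."""
    mu = mean(rewards)
    sigma = pstdev(rewards)
    return [(r - mu) / (sigma + eps) for r in rewards]


# Hypothetical group of four rollouts for one task: 1.0 = task completed, 0.0 = failed.
rollout_rewards = [1.0, 0.0, 0.0, 1.0]
advantages = group_relative_advantages(rollout_rewards)
print([round(a, 3) for a in advantages])
```

When every rollout in a group gets the same reward, all advantages are zero and the group contributes no gradient signal.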

Scaling this RL environment matters in practice. The research team shows that increasing the number of parallel GUI environments from 32 to 512 yields about 5.2 percentage points improvement on navigation success, and increasing the allowed environment steps from 15 to 50 adds about 4.3 points.

On the AndroidWorld benchmark, which evaluates online navigation in a standard Android app suite, the largest MAI-UI variant reaches 76.7 percent success, surpassing UI-Tars-2, Gemini 2.5 Pro and Seed1.8.

Key Takeaways

Unified GUI agent family for mobile: MAI-UI is a Qwen3 VL based family of GUI agents from 2B to 235B A22B, designed specifically for real world mobile deployment with native agent user interaction, MCP tool calls and device cloud routing, rather than only static benchmarks.

State-of-the-art GUI grounding and navigation: The models reach 73.5 percent on ScreenSpot Pro, 91.3 percent on MMBench GUI L2, 70.9 percent on OSWorld G and 49.2 percent on UI Vision, and set a new 76.7 percent SOTA on AndroidWorld mobile navigation, surpassing UI-Tars-2, Gemini 2.5 Pro and Seed1.8.

Realistic MobileWorld performance with interaction and tools: On the MobileWorld benchmark with 201 tasks across 20 apps, MAI-UI 235B A22B reaches 41.7 percent overall success, with 39.7 percent on pure GUI tasks, 51.1 percent on agent-user interaction tasks and 37.5 percent on MCP-augmented tasks, beating the best end-to-end GUI baseline, Doubao 1.5 UI TARS, at 20.9 percent.

Scalable online RL in containerized Android: MAI-UI uses an online GRPO-based RL framework over containerized Android environments, where scaling from 32 to 512 parallel environments gives about 5.2 points in navigation success and increasing the environment step budget from 15 to 50 gives another 4.3 points.

The post Alibaba Tongyi Lab Releases MAI-UI: A Foundation GUI Agent Family that Surpasses Gemini 2.5 Pro, Seed1.8 and UI-Tars-2 on AndroidWorld appeared first on MarkTechPost.

Meet LLMRouter: An Intelligent Routing System designed to Optimize LL …

LLMRouter is an open source routing library from the U Lab at the University of Illinois Urbana-Champaign that treats model selection as a first-class system problem. It sits between applications and a pool of LLMs and chooses a model for each query based on task complexity, quality targets, and cost, all exposed through a unified Python API and CLI. The project ships with more than 16 routing models, a data generation pipeline over 11 benchmarks, and a plugin system for custom routers.

Router families and supported models

LLMRouter organizes routing algorithms into four families: Single-Round Routers, Multi-Round Routers, Personalized Routers, and Agentic Routers. Single-round routers include knnrouter, svmrouter, mlprouter, mfrouter, elorouter, routerdc, automix, hybrid_llm, graphrouter, causallm_router, and the baselines smallest_llm and largest_llm. These models implement strategies such as k-nearest neighbors, support vector machines, multilayer perceptrons, matrix factorization, Elo rating, dual contrastive learning, automatic model mixing, and graph-based routing.
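As an illustration of the simplest of these strategies, the sketch below routes a query by looking up its k nearest historical queries and picking the model with the highest summed score among them. It is a generic toy, not LLMRouter's knnrouter implementation; the embeddings, model names, and scores are made up.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple

Record = Tuple[List[float], str, float]  # (query embedding, model name, observed score)


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def knn_route(query_emb: List[float], history: List[Record], k: int = 3) -> str:
    """Pick the model with the highest summed score among the k most similar past queries."""
    neighbors = sorted(history, key=lambda rec: cosine(query_emb, rec[0]), reverse=True)[:k]
    totals: Dict[str, float] = defaultdict(float)
    for _, model, score in neighbors:
        totals[model] += score
    return max(totals, key=lambda m: totals[m])


# Hypothetical routing history with 2-D embeddings.
history = [
    ([1.0, 0.0], "small-model", 0.90),
    ([0.9, 0.1], "small-model", 0.80),
    ([0.0, 1.0], "large-model", 0.95),
    ([0.1, 0.9], "large-model", 0.90),
    ([0.8, 0.2], "large-model", 0.40),
]
print(knn_route([1.0, 0.05], history))
print(knn_route([0.05, 1.0], history))
```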

Multi round routing is exposed through router_r1, a pre trained instance of Router R1 integrated into LLMRouter. Router R1 formulates multi LLM routing and aggregation as a sequential decision process where the router itself is an LLM that alternates between internal reasoning steps and external model calls. It is trained with reinforcement learning using a rule based reward that balances format, outcome, and cost. In LLMRouter, router_r1 is available as an extra installation target with pinned dependencies tested on vllm==0.6.3 and torch==2.4.0.

Personalized routing is handled by gmtrouter, described as a graph based personalized router with user preference learning. GMTRouter represents multi turn user LLM interactions as a heterogeneous graph over users, queries, responses, and models. It runs a message passing architecture over this graph to infer user specific routing preferences from few shot interaction data, and experiments show accuracy and AUC gains over non personalized baselines.

Agentic routers in LLMRouter extend routing to multi step reasoning workflows. knnmultiroundrouter uses k nearest neighbor reasoning over multi turn traces and is intended for complex tasks. llmmultiroundrouter exposes an LLM based agentic router that performs multi step routing without its own training loop. These agentic routers share the same configuration and data formats as the other router families and can be swapped through a single CLI flag.

Data generation pipeline for routing datasets

LLMRouter ships with a full data generation pipeline that turns standard benchmarks and LLM outputs into routing datasets. The pipeline supports 11 benchmarks: Natural QA, Trivia QA, MMLU, GPQA, MBPP, HumanEval, GSM8K, CommonsenseQA, MATH, OpenBookQA, and ARC Challenge. It runs in three explicit stages. First, data_generation.py extracts queries and ground truth labels and creates train and test JSONL splits. Second, generate_llm_embeddings.py builds embeddings for candidate LLMs from metadata. Third, api_calling_evaluation.py calls LLM APIs, evaluates responses, and fuses scores with embeddings into routing records.

The pipeline outputs query files, LLM embedding JSON, query embedding tensors, and routing data JSONL files. A routing entry includes fields such as task_name, query, ground_truth, metric, model_name, response, performance, embedding_id, and token_num. Configuration is handled entirely through YAML, so engineers point the scripts to new datasets and candidate model lists without modifying code.
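For orientation, a single routing record with the fields listed above might look like the following when written as one JSONL line. The field names come from the description above; every value, and the type chosen for each field, is a hypothetical placeholder rather than output from the actual pipeline.

```python
import json

# Field names follow the article; all values and value types are illustrative assumptions.
routing_record = {
    "task_name": "gsm8k",
    "query": "A train travels 60 km in 1.5 hours. What is its average speed?",
    "ground_truth": "40",
    "metric": "exact_match",
    "model_name": "example-small-model",
    "response": "The average speed is 40 km/h.",
    "performance": 1.0,
    "embedding_id": 0,
    "token_num": 27,
}

jsonl_line = json.dumps(routing_record)  # one record per line in a JSONL file
parsed = json.loads(jsonl_line)
print(sorted(parsed))
```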

Chat interface and plugin system

For interactive use, llmrouter chat launches a Gradio based chat frontend over any router and configuration. The server can bind to a custom host and port and can expose a public sharing link. Query modes control how routing sees context. current_only uses only the latest user message, full_context concatenates the dialogue history, and retrieval augments the query with the top k similar historical queries. The UI visualizes model choices in real time and is driven by the same router configuration used for batch inference.
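The three query modes differ only in what text the router gets to see. The sketch below mimics that behavior on a list of user messages; the function names are hypothetical, and token overlap stands in for whatever similarity measure the real retrieval mode uses.

```python
from typing import List


def token_overlap(a: str, b: str) -> float:
    """Jaccard overlap of lowercase whitespace tokens; a crude stand-in for embedding similarity."""
    ta, tb = set(a.lower().split()), set(b.lower().split())
    return len(ta & tb) / len(ta | tb) if ta | tb else 0.0


def build_routing_query(user_messages: List[str], mode: str, top_k: int = 2) -> str:
    """Return the text a router would see for the latest message under each query mode."""
    latest, earlier = user_messages[-1], user_messages[:-1]
    if mode == "current_only":
        return latest
    if mode == "full_context":
        return "\n".join(user_messages)
    if mode == "retrieval":
        similar = sorted(earlier, key=lambda m: token_overlap(latest, m), reverse=True)[:top_k]
        return "\n".join(similar + [latest])
    raise ValueError(f"unknown query mode: {mode}")


messages = [
    "How do I sort a list in Python?",
    "What is the capital of France?",
    "How do I reverse a list in Python?",
]
for mode in ("current_only", "full_context", "retrieval"):
    print(mode, "->", repr(build_routing_query(messages, mode, top_k=1)))
```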

LLMRouter also provides a plugin system for custom routers. New routers live under custom_routers, subclass MetaRouter, and implement route_single and route_batch. Configuration files under that directory define data paths, hyperparameters, and optional default API endpoints. Plugin discovery scans the project custom_routers folder, a ~/.llmrouter/plugins directory, and any extra paths in the LLMROUTER_PLUGINS environment variable. Example custom routers include randomrouter, which selects a model at random, and thresholdrouter, which is a trainable router that estimates query difficulty.
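The shape of a custom router can be sketched as follows. Because the real MetaRouter signature is not shown in this article, the base class below is a local stand-in with only the two methods named above; the constructor arguments, model names, and the length-based rule are all hypothetical.

```python
from abc import ABC, abstractmethod
from typing import List


class MetaRouter(ABC):
    """Stand-in for LLMRouter's MetaRouter; the real base class may take other arguments."""

    @abstractmethod
    def route_single(self, query: str) -> str:
        ...

    @abstractmethod
    def route_batch(self, queries: List[str]) -> List[str]:
        ...


class LengthRouter(MetaRouter):
    """Toy router: longer queries go to the larger model (model names are hypothetical)."""

    def __init__(self, max_short_words: int = 12) -> None:
        self.max_short_words = max_short_words

    def route_single(self, query: str) -> str:
        return "small-model" if len(query.split()) <= self.max_short_words else "large-model"

    def route_batch(self, queries: List[str]) -> List[str]:
        return [self.route_single(q) for q in queries]


router = LengthRouter(max_short_words=5)
choices = router.route_batch([
    "What is 2 + 2?",
    "Prove that the sum of two even integers is always even.",
])
print(choices)
```

A real plugin would additionally need the configuration file and discovery path described above, which this sketch leaves out.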

Key Takeaways

Routing as a first class abstraction: LLMRouter is an open source routing layer from UIUC that sits between applications and heterogeneous LLM pools and centralizes model selection as a cost and quality aware prediction task rather than ad hoc scripts.

Four router families covering 16 plus algorithms: The library standardizes more than 16 routers into four families, single round, multi round, personalized, and agentic, including knnrouter, graphrouter, routerdc, router_r1, and gmtrouter, all exposed through a unified config and CLI.

Multi round RL routing via Router R1: router_r1 integrates the Router R1 framework, where an LLM router interleaves internal “think” steps with external “route” calls and is trained with a rule based reward that combines format, outcome, and cost to optimize performance cost trade offs.

Graph based personalization with GMTRouter: gmtrouter models users, queries, responses and LLMs as nodes in a heterogeneous graph and uses message passing to learn user specific routing preferences from few shot histories, achieving up to around 21% accuracy gains and substantial AUC improvements over strong baselines.

End to end pipeline and extensibility: LLMRouter provides a benchmark driven data pipeline, CLI for training and inference, a Gradio chat UI, centralized API key handling, and a plugin system based on MetaRouter that allows teams to register custom routers while reusing the same routing datasets and infrastructure.

The post Meet LLMRouter: An Intelligent Routing System designed to Optimize LLM Inference by Dynamically Selecting the most Suitable Model for Each Query appeared first on MarkTechPost.

How to Build Contract-First Agentic Decision Systems with PydanticAI f …

In this tutorial, we demonstrate how to design a contract-first agentic decision system using PydanticAI, treating structured schemas as non-negotiable governance contracts rather than optional output formats. We show how we define a strict decision model that encodes policy compliance, risk assessment, confidence calibration, and actionable next steps directly into the agent’s output schema. By combining Pydantic validators with PydanticAI’s retry and self-correction mechanisms, we ensure that the agent cannot produce logically inconsistent or non-compliant decisions. Throughout the workflow, we focus on building an enterprise-grade decision agent that reasons under constraints, making it suitable for real-world risk, compliance, and governance scenarios rather than toy prompt-based demos.

!pip -q install -U pydantic-ai pydantic openai nest_asyncio

import os
import time
import asyncio
import getpass
from dataclasses import dataclass
from typing import List, Literal

import nest_asyncio
nest_asyncio.apply()

from pydantic import BaseModel, Field, field_validator
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider

OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    try:
        from google.colab import userdata
        OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")
    except Exception:
        OPENAI_API_KEY = None
if not OPENAI_API_KEY:
    OPENAI_API_KEY = getpass.getpass("Enter OPENAI_API_KEY: ").strip()

We set up the execution environment by installing the required libraries and configuring asynchronous execution for Google Colab. We securely load the OpenAI API key and ensure the runtime is ready to handle async agent calls. This establishes a stable foundation for running the contract-first agent without environment-related issues.

class RiskItem(BaseModel):
    risk: str = Field(..., min_length=8)
    severity: Literal["low", "medium", "high"]
    mitigation: str = Field(..., min_length=12)

class DecisionOutput(BaseModel):
    decision: Literal["approve", "approve_with_conditions", "reject"]
    confidence: float = Field(..., ge=0.0, le=1.0)
    rationale: str = Field(..., min_length=80)
    identified_risks: List[RiskItem] = Field(..., min_length=2)
    compliance_passed: bool
    # validate_default=True so the conditions check also runs when the field is omitted
    conditions: List[str] = Field(default_factory=list, validate_default=True)
    next_steps: List[str] = Field(..., min_length=3)
    timestamp_unix: int = Field(default_factory=lambda: int(time.time()))

    # Pydantic validates fields in definition order and info.data only holds
    # fields validated so far, so each cross-field check is attached to the
    # later of the two fields it compares.
    @field_validator("identified_risks")
    @classmethod
    def confidence_vs_risk(cls, v, info):
        confidence = info.data.get("confidence")
        if confidence is not None and confidence > 0.70 and any(r.severity == "high" for r in v):
            raise ValueError("confidence too high given high-severity risks")
        return v

    @field_validator("compliance_passed")
    @classmethod
    def reject_if_non_compliant(cls, v, info):
        if v is False and info.data.get("decision") not in (None, "reject"):
            raise ValueError("non-compliant decisions must be reject")
        return v

    @field_validator("conditions")
    @classmethod
    def conditions_required_for_conditional_approval(cls, v, info):
        d = info.data.get("decision")
        if d == "approve_with_conditions" and (not v or len(v) < 2):
            raise ValueError("approve_with_conditions requires at least 2 conditions")
        if d == "approve" and v:
            raise ValueError("approve must not include conditions")
        return v

We define the core decision contract using strict Pydantic models that precisely describe a valid decision. We encode logical constraints such as confidence–risk alignment, compliance-driven rejection, and conditional approvals directly into the schema. This ensures that any agent output must satisfy business logic, not just syntactic structure.
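To see the three cross-field rules in isolation, we can restate them as plain predicates over a dictionary. This is a dependency-free sketch for illustration only: `check_contract` is a hypothetical helper, not part of the tutorial's code, and it mirrors the thresholds used in the schema (confidence above 0.70, at least two conditions).

```python
from typing import Any, Dict, List


def check_contract(d: Dict[str, Any]) -> List[str]:
    """Return the list of violated cross-field rules (empty list means the decision is consistent)."""
    errors: List[str] = []
    conditions = d.get("conditions", [])
    has_high_risk = any(r["severity"] == "high" for r in d["identified_risks"])
    if has_high_risk and d["confidence"] > 0.70:
        errors.append("confidence too high given high-severity risks")
    if d["compliance_passed"] is False and d["decision"] != "reject":
        errors.append("non-compliant decisions must be reject")
    if d["decision"] == "approve_with_conditions" and len(conditions) < 2:
        errors.append("approve_with_conditions requires at least 2 conditions")
    if d["decision"] == "approve" and conditions:
        errors.append("approve must not include conditions")
    return errors


risks = [{"severity": "high"}, {"severity": "medium"}]
consistent = {"decision": "reject", "confidence": 0.6, "identified_risks": risks,
              "compliance_passed": False, "conditions": []}
inconsistent = {"decision": "approve", "confidence": 0.9, "identified_risks": risks,
                "compliance_passed": False, "conditions": ["add audit logging"]}
print(check_contract(consistent))
print(check_contract(inconsistent))
```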

@dataclass
class DecisionContext:
    company_policy: str
    risk_threshold: float = 0.6

model = OpenAIChatModel(
    "gpt-5",
    provider=OpenAIProvider(api_key=OPENAI_API_KEY),
)

agent = Agent(
    model=model,
    deps_type=DecisionContext,
    output_type=DecisionOutput,
    system_prompt="""
You are a corporate decision analysis agent.
You must evaluate risk, compliance, and uncertainty.
All outputs must strictly satisfy the DecisionOutput schema.
"""
)

We inject enterprise context through a typed dependency object and initialize the OpenAI-backed PydanticAI agent. We configure the agent to produce only structured decision outputs that conform to the predefined contract. This step formalizes the separation between business context and model reasoning.

from pydantic_ai import ModelRetry

# Raising ModelRetry inside an output validator asks the agent to try again,
# passing the message back to the model as feedback.
@agent.output_validator
def ensure_risk_quality(result: DecisionOutput) -> DecisionOutput:
    if len(result.identified_risks) < 2:
        raise ModelRetry("minimum two risks required")
    if not any(r.severity in ("medium", "high") for r in result.identified_risks):
        raise ModelRetry("at least one medium or high risk required")
    return result

@agent.output_validator
def enforce_policy_controls(result: DecisionOutput) -> DecisionOutput:
    policy = CURRENT_DEPS.company_policy.lower()
    # Join with spaces so words from adjacent fields do not run together
    text = " ".join(
        [result.rationale, *result.next_steps, *result.conditions]
    ).lower()
    if result.compliance_passed:
        if not any(k in text for k in ["encryption", "audit", "logging", "access control", "key management"]):
            raise ModelRetry("missing concrete security controls")
    return result

We add output validators that act as governance checkpoints after the model generates a response. We force the agent to identify meaningful risks and to explicitly reference concrete security controls when claiming compliance. If these constraints are violated, we trigger automatic retries to enforce self-correction.

async def run_decision():
    global CURRENT_DEPS
    CURRENT_DEPS = DecisionContext(
        company_policy=(
            "No deployment of systems handling personal data or transaction metadata "
            "without encryption, audit logging, and least-privilege access control."
        )
    )

    prompt = """
Decision request:
Deploy an AI-powered customer analytics dashboard using a third-party cloud vendor.
The system processes user behavior and transaction metadata.
Audit logging is not implemented and customer-managed keys are uncertain.
"""

    result = await agent.run(prompt, deps=CURRENT_DEPS)
    return result.output

decision = asyncio.run(run_decision())

from pprint import pprint
pprint(decision.model_dump())

We run the agent on a realistic decision request and capture the validated structured output. We demonstrate how the agent evaluates risk, policy compliance, and confidence before producing a final decision. This completes the end-to-end contract-first decision workflow in a production-style setup.

In conclusion, we demonstrate how to move from free-form LLM outputs to governed, reliable decision systems using PydanticAI. We show that by enforcing hard contracts at the schema level, we can automatically align decisions with policy requirements, risk severity, and confidence realism without manual prompt tuning. This approach allows us to build agents that fail safely, self-correct when constraints are violated, and produce auditable, structured outputs that downstream systems can trust. Ultimately, we demonstrate that contract-first agent design enables us to deploy agentic AI as a dependable decision layer within production and enterprise environments.

The post How to Build Contract-First Agentic Decision Systems with PydanticAI for Risk-Aware, Policy-Compliant Enterprise AI appeared first on MarkTechPost.

Migrate MLflow tracking servers to Amazon SageMaker AI with serverless …

Operating a self-managed MLflow tracking server comes with administrative overhead, including server maintenance and resource scaling. As teams scale their ML experimentation, efficiently managing resources during peak usage and idle periods is a challenge. Organizations running MLflow on Amazon EC2 or on-premises can optimize costs and engineering resources by using Amazon SageMaker AI with serverless MLflow.
This post shows you how to migrate your self-managed MLflow tracking server to an MLflow App, a serverless tracking server on SageMaker AI that automatically scales resources based on demand while removing server patching and storage management tasks at no cost. Learn how to use the MLflow Export Import tool to transfer your experiments, runs, models, and other MLflow resources, including instructions to validate your migration’s success.
While this post focuses on migrating from self-managed MLflow tracking servers to SageMaker with MLflow, the MLflow Export Import tool offers broader utility. You can apply the same approach to migrate existing SageMaker managed MLflow tracking servers to the new serverless MLflow capability on SageMaker. The tool also helps with version upgrades and establishing backup routines for disaster recovery.
Step-by-step guide: Tracking server migration to SageMaker with MLflow
The following guide provides step-by-step instructions for migrating an existing MLflow tracking server to SageMaker with MLflow. The migration process consists of three main phases: exporting your MLflow artifacts to intermediate storage, configuring an MLflow App, and importing your artifacts. You can choose to execute the migration process from an EC2 instance, your personal computer, or a SageMaker notebook. Whichever environment you select must maintain connectivity to both your source tracking server and your target tracking server. MLflow Export Import supports exports from both self-managed tracking servers and Amazon SageMaker MLflow tracking servers (from MLflow v2.16 onwards) to Amazon SageMaker Serverless MLflow.

Figure 1: Migration process with MLflow Export Import tool

Prerequisites
To follow along with this post, make sure you have the following prerequisites:

An AWS account—if you don’t have one, sign up as a new customer.
Connectivity to both source and target tracking servers (see documentation for self-managed MLflow and MLflow on Amazon SageMaker AI)
AWS Identity and Access Management (IAM) permissions to create a SageMaker MLflow App (see Set up IAM permissions for MLflow)
An execution environment (EC2, local machine, or SageMaker notebook) with Python 3.10+ installed and adequate storage and compute resources for your tracking server’s data size
Execution environment configured with IAM permissions for Serverless MLflow (see SageMaker MLflow IAM requirements)

Step 1: Verify MLflow version compatibility
Before starting the migration, remember that not all MLflow features may be supported in the migration process. The MLflow Export Import tool supports different objects based on your MLflow version. To prepare for a successful migration:

Verify the current MLflow version of your existing MLflow tracking server:

mlflow --version

Review the latest supported MLflow version in the Amazon SageMaker MLflow documentation. If you’re running an older MLflow version in a self-managed environment, we recommend upgrading to the latest version supported by Amazon SageMaker MLflow before proceeding with the migration:

pip install --upgrade mlflow=={supported_version}

For an up-to-date list of MLflow resources that can be transferred using MLflow Export Import, please refer to the MLflow Export Import documentation.

Step 2: Create a new MLflow App
To prepare your target environment, you first need to create a new SageMaker Serverless MLflow App.

After you’ve set up SageMaker AI (see also Guide to getting set up with Amazon SageMaker AI), you can access Amazon SageMaker Studio and, in the MLflow section, create a new MLflow App (if it wasn’t automatically created during the initial domain setup). Follow the instructions outlined in the SageMaker documentation.
Once your managed MLflow App has been created, it should appear in your SageMaker Studio console. Keep in mind that the creation process can take up to 5 minutes.

Figure 2: MLflow App in SageMaker Studio Console

Alternatively, you can view it by executing the following AWS Command Line Interface (CLI) command:

aws sagemaker list-mlflow-tracking-servers

Copy the Amazon Resource Name (ARN) of your tracking server to a document; you need it in Step 6.
Choose Open MLflow, which leads you to an empty MLflow dashboard. In the next steps, we import our experiments and related artifacts from our self-managed MLflow tracking server here.

Figure 3: MLflow user interface, landing page

Step 3: Install MLflow and the SageMaker MLflow plugin
To prepare your execution environment for the migration, you need to establish connectivity to your existing MLflow servers (see prerequisites) and install and configure the necessary MLflow packages and plugins.

Before you can start with the migration, you need to establish connectivity and authenticate to the environment hosting your existing self-managed MLflow tracking server (e.g., a virtual machine).
Once you have access to your tracking server, you need to install MLflow and the SageMaker MLflow plugin in your execution environment. The plugin handles the connection establishment and authentication to your MLflow App. Execute the following command (see also the documentation):

pip install mlflow sagemaker-mlflow

Step 4: Install the MLflow Export Import tool
Before you can export your MLflow resources, you need to install the MLflow Export Import tool.

Familiarize yourself with the MLflow Export Import tool and its capabilities by visiting its GitHub page. In the following steps, we make use of its bulk tools (namely export-all and import-all), which allow you to create a copy of your tracking server with its experiments and related artifacts. This approach maintains the referential integrity between objects. If you want to migrate only selected experiments or change the name of existing experiments, you can use the single tools instead. Please review the MLflow Export Import documentation for more information on supported objects and limitations.
Install the MLflow Export Import tool in your environment, by executing the following command:

pip install git+https://github.com/mlflow/mlflow-export-import/#egg=mlflow-export-import

Step 5: Export MLflow resources to a directory
Now that your environment is configured, we can begin the actual migration process by exporting your MLflow resources from your source environment.

After you’ve installed the MLflow Export Import tool, you can create a target directory in your execution environment as a destination target for the resources, which you extract in the next step.
Inspect your existing experiments and the associated MLflow resources you want to export. In the following example, we want to export the currently stored objects (for example, experiments and registered models).

Figure 4: Experiments stored in MLflow

Start the migration by setting the Uniform Resource Identifier (URI) of your tracking server as an environment variable, then run the bulk export tool with the parameters of your existing MLflow tracking server and a target directory (see also the documentation):

# Set the tracking URI to your self-managed MLflow server
export MLFLOW_TRACKING_URI=http://localhost:8080

# Start export
export-all --output-dir mlflow-export

Wait until the export has finished to inspect the output directory (in the preceding case: mlflow-export).

Step 6: Import MLflow resources to your MLflow App
During import, user-defined attributes are retained, but system-generated tags (e.g., creation_date) are not preserved by MLflow Export Import. To preserve original system attributes, add the --import-source-tags option to the import command shown in the following example. This saves them as tags with the mlflow_exim prefix. For more information, see MLflow Export Import – Governance and Lineage. Be aware of additional limitations detailed here: Import Limitations.
The following procedure transfers your exported MLflow resources into your new MLflow App. Start the import by configuring the URI for your MLflow App. You can use the ARN that you saved in Step 1 for this. The previously installed SageMaker MLflow plugin automatically translates the ARN into a valid URI and creates an authenticated request to AWS (remember to configure your AWS credentials as environment variables so the plugin can pick them up).

# Set the tracking URI to your MLflow App ARN
export MLFLOW_TRACKING_URI=arn:aws:sagemaker:<region>:<account-id>:mlflow-app/app-<app-id>

# Start import
import-all --input-dir mlflow-export
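
A malformed ARN typically only surfaces as an error once the import is running, so it can help to sanity-check the value first. The following is a minimal Python sketch, not an official AWS validation rule: the helper name `looks_like_mlflow_app_arn` and the regular expression are assumptions derived only from the placeholder pattern shown above (the `aws` partition and an alphanumeric app ID are assumed).

```python
import re

# Hypothetical pattern derived from the placeholder ARN shown above:
#   arn:aws:sagemaker:<region>:<account-id>:mlflow-app/app-<app-id>
# Assumes the standard "aws" partition and an alphanumeric app ID.
_MLFLOW_APP_ARN = re.compile(
    r"^arn:aws:sagemaker:"
    r"(?P<region>[a-z0-9-]+):"
    r"(?P<account_id>\d{12}):"
    r"mlflow-app/app-(?P<app_id>[A-Za-z0-9]+)$"
)


def looks_like_mlflow_app_arn(value: str) -> bool:
    """Return True if the value has the general shape of an MLflow App ARN."""
    return _MLFLOW_APP_ARN.match(value.strip()) is not None


# A local tracking URI or an unfilled placeholder is rejected.
print(looks_like_mlflow_app_arn("http://localhost:8080"))
print(looks_like_mlflow_app_arn("arn:aws:sagemaker:us-east-1:123456789012:mlflow-app/app-ABC123"))
```

A check like this only catches typos and unfilled placeholders; it does not confirm that the app exists or that your credentials can reach it.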

Step 7: Validate your migration results
To confirm your migration was successful, verify that your MLflow resources were transferred correctly:

After the import-all script has migrated your experiments, runs, and other objects to the new tracking server, open the dashboard of your serverless MLflow App (which you opened in Step 2) and verify that:

Exported MLflow resources are present with their original names and metadata
Run histories are complete with the metrics and parameters
Model artifacts are accessible and downloadable
Tags and notes are preserved

Figure 5: MLflow user interface, landing page after migration

You can verify programmatic access by starting a new SageMaker notebook and running the following code:

import mlflow

# Set the tracking URI to your MLflow App ARN
mlflow.set_tracking_uri('arn:aws:sagemaker:<region>:<account-id>:mlflow-app/app-<app-id>')

# List all experiments
experiments = mlflow.search_experiments()
for exp in experiments:
    print(f"Experiment Name: {exp.name}")
    # Get all runs for this experiment
    runs = mlflow.search_runs(exp.experiment_id)
    print(f"Number of runs: {len(runs)}")
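
To go one step beyond listing experiments, the counts from the source server and the MLflow App can be compared side by side. The following sketch assumes you have already collected a mapping of experiment name to run count from each side; the function name `diff_inventories` and the sample experiment names are hypothetical and used only for illustration.

```python
from typing import Dict, List


def diff_inventories(source: Dict[str, int], target: Dict[str, int]) -> List[str]:
    """Compare {experiment name: run count} maps and describe any mismatches."""
    problems = []
    for name, expected in sorted(source.items()):
        if name not in target:
            problems.append(f"missing experiment: {name}")
        elif target[name] != expected:
            problems.append(
                f"run count differs for {name}: source={expected}, target={target[name]}"
            )
    return problems


# Each map could be built with the calls used in the snippet above, for example
#   {e.name: len(mlflow.search_runs([e.experiment_id])) for e in mlflow.search_experiments()}
# run once against the source server and once against the MLflow App.
source_inventory = {"churn-model": 12, "fraud-detection": 4}  # illustrative values
target_inventory = {"churn-model": 12, "fraud-detection": 3}  # illustrative values
print(diff_inventories(source_inventory, target_inventory))
```

An empty list means every source experiment exists in the target with the same number of runs; it does not check metrics, parameters, or artifacts.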

Considerations
When planning your MLflow migration, verify your execution environment (whether EC2, local machine, or SageMaker notebooks) has sufficient storage and computing resources to handle your source tracking server’s data volume. While the migration can run in various environments, performance may vary based on network connectivity and available resources. For large-scale migrations, consider breaking down the process into smaller batches (for example, individual experiments).
Cleanup
A SageMaker managed MLflow tracking server will incur costs until you delete or stop it. Billing for tracking servers is based on the duration the servers have been running, the size selected, and the amount of data logged to the tracking servers. You can stop tracking servers when they’re not in use to save costs, or you can delete them using the API or the SageMaker Studio UI. For more details on pricing, refer to Amazon SageMaker pricing.
Conclusion
In this post, we demonstrated how to migrate a self-managed MLflow tracking server to SageMaker with MLflow using the open source MLflow Export Import tool. The migration to a serverless MLflow App on Amazon SageMaker AI reduces the operational overhead associated with maintaining MLflow infrastructure while providing seamless integration with the comprehensive AI/ML services in SageMaker AI.
To get started with your own migration, follow the preceding step-by-step guide and consult the referenced documentation for additional details. You can find code samples and examples in our AWS Samples GitHub repository. For more information about Amazon SageMaker AI capabilities and other MLOps features, visit the Amazon SageMaker AI documentation.

About the authors
Rahul Easwar is a Senior Product Manager at AWS, leading managed MLflow and Partner AI Apps within the SageMaker AIOps team. With over 20 years of experience spanning startups to enterprise technology, he leverages his entrepreneurial background and MBA from Chicago Booth to build scalable ML platforms that simplify AI adoption for organizations worldwide. Connect with Rahul on LinkedIn to learn more about his work in ML platforms and enterprise AI solutions.
Roland Odorfer is a Solutions Architect at AWS, based in Berlin, Germany. He works with German industry and manufacturing customers, helping them architect secure and scalable solutions. Roland is interested in distributed systems and security. He enjoys helping customers use the cloud to solve complex challenges.
Anurag Gajam is a Software Development Engineer with the Amazon SageMaker MLflow team at AWS. His technical interests span AI/ML infrastructure and distributed systems, where he is a recognized MLflow contributor who enhanced the mlflow-export-import tool by adding support for additional MLflow objects to enable seamless migration between SageMaker MLflow services. He specializes in solving complex problems and building reliable software that powers AI workloads at scale. In his free time, he enjoys playing badminton and going for hikes.

Build an AI-powered website assistant with Amazon Bedrock

Businesses face a growing challenge: customers need answers fast, but support teams are overwhelmed. Support documentation like product manuals and knowledge base articles typically require users to search through hundreds of pages, and support agents often run 20–30 customer queries per day to locate specific information.
This post demonstrates how to solve this challenge by building an AI-powered website assistant using Amazon Bedrock and Amazon Bedrock Knowledge Bases. This solution is designed to benefit both internal teams and external customers, and can offer the following benefits:

Instant, relevant answers for customers, alleviating the need to search through documentation
A powerful knowledge retrieval system for support agents, reducing resolution time
Round-the-clock automated support

Solution overview
The solution uses Retrieval-Augmented Generation (RAG) to retrieve relevant information from a knowledge base and return it to the user based on their access. It consists of the following key components:

Amazon Bedrock Knowledge Bases – Content from the company’s website is crawled and stored in the knowledge base. Documents from an Amazon Simple Storage Service (Amazon S3) bucket, including manuals and troubleshooting guides, are also indexed and stored in the knowledge base. With Amazon Bedrock Knowledge Bases, you can configure multiple data sources and use the filter configurations to differentiate between internal and external information. This helps protect internal data through advanced security controls.
Amazon Bedrock managed LLMs – A large language model (LLM) from Amazon Bedrock generates AI-powered responses to user questions.
Scalable serverless architecture – The solution uses Amazon Elastic Container Service (Amazon ECS) to host the UI, and an AWS Lambda function to handle the user requests.
Automated CI/CD deployment – The solution uses the AWS Cloud Development Kit (AWS CDK) to handle continuous integration and delivery (CI/CD) deployment.

The following diagram illustrates the architecture of this solution.

The workflow consists of the following steps:

Amazon Bedrock Knowledge Bases processes documents uploaded to Amazon S3 by chunking them and generating embeddings. Additionally, the Amazon Bedrock web crawler accesses selected websites to extract and ingest their contents.
The web application runs as an ECS application. Internal and external users use browsers to access the application through Elastic Load Balancing (ELB). Users log in to the application using their login credentials registered in an Amazon Cognito user pool.
When a user submits a question, the application invokes a Lambda function, which uses the Amazon Bedrock APIs to retrieve the relevant information from the knowledge base. It also supplies the relevant data source IDs to Amazon Bedrock based on user type (external or internal) so the knowledge base retrieves only the information available to that user type.
The Lambda function then invokes the Amazon Nova Lite LLM to generate responses. The LLM augments the information from the knowledge base to generate a response to the user query, which is returned from the Lambda function and displayed to the user.
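
The per-user filtering in step 3 can be sketched as a function that builds the retrieval request. This is an illustrative assumption about how the Lambda function might be written, not the code from the solution repository: the source IDs, the function names, and the metadata key `x-amz-bedrock-kb-data-source-id` are placeholders or assumptions that you should verify against the Amazon Bedrock Knowledge Bases documentation.

```python
from typing import Any, Dict, List

# Hypothetical data source IDs; the real values come from the knowledge base.
EXTERNAL_SOURCE_ID = "EXTERNALSRC"
INTERNAL_SOURCE_ID = "INTERNALSRC"


def allowed_sources(user_type: str) -> List[str]:
    """Internal users see both sources; everyone else sees only the external one."""
    if user_type == "internal":
        return [EXTERNAL_SOURCE_ID, INTERNAL_SOURCE_ID]
    return [EXTERNAL_SOURCE_ID]


def build_retrieve_request(kb_id: str, question: str, user_type: str) -> Dict[str, Any]:
    """Build keyword arguments for a knowledge base retrieval call."""
    return {
        "knowledgeBaseId": kb_id,
        "retrievalQuery": {"text": question},
        "retrievalConfiguration": {
            "vectorSearchConfiguration": {
                "numberOfResults": 5,  # arbitrary choice for this sketch
                "filter": {
                    "in": {
                        # Assumed name of the built-in data source metadata key.
                        "key": "x-amz-bedrock-kb-data-source-id",
                        "value": allowed_sources(user_type),
                    }
                },
            }
        },
    }


request = build_retrieve_request("KB123", "How do I create a bucket?", "external")
print(request["retrievalConfiguration"]["vectorSearchConfiguration"]["filter"])
```

In a Lambda handler, a dictionary like this could be unpacked into the `retrieve` operation of a boto3 `bedrock-agent-runtime` client; that call is deliberately left out here so the sketch runs without AWS credentials. Defaulting unknown user types to the external source keeps the filter restrictive when the user type is missing or unexpected.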

In the following sections, we demonstrate how to crawl and configure the external website as a knowledge base, and also upload internal documentation.
Prerequisites
You must have the following in place to deploy the solution in this post:

An AWS account.
Model access in Amazon Bedrock for Amazon Titan and Amazon Nova Lite. Use the same AWS Region for model access as the Region where you deploy the solution.
An S3 bucket in the same Region to store internal data.

Create knowledge base and ingest website data
The first step is to build a knowledge base to ingest data from a website and operational documents from an S3 bucket. Complete the following steps to create your knowledge base:

On the Amazon Bedrock console, choose Knowledge Bases under Builder tools in the navigation pane.
On the Create dropdown menu, choose Knowledge Base with vector store.

For Knowledge Base name, enter a name.
For Choose a data source, select Web Crawler.
Choose Next.

For Data source name, enter a name for your data source.
For Source URLs, enter the target website HTML page to crawl. For example, we use https://docs.aws.amazon.com/AmazonS3/latest/userguide/GetStartedWithS3.html.
For Website domain range, select Default as the crawling scope. You can also configure it to host only domains or subdomains if you want to restrict the crawling to a specific domain or subdomain.
For URL regex filter, you can configure the URL patterns to include or exclude specific URLs. For this example, we leave this setting blank.

For Chunking strategy, you can configure the content parsing options to customize the data chunking strategy. For this example, we leave it as Default chunking.
Choose Next.

Choose the Amazon Titan Text Embeddings V2 model, then choose Apply.

For Vector store type, select Amazon OpenSearch Serverless, then choose Next.

Review the configurations and choose Create Knowledge Base.

You have now created a knowledge base with the data source configured as the website link you provided.

On the knowledge base details page, select your new data source and choose Sync to crawl the website and ingest the data.

Configure Amazon S3 data source
Complete the following steps to configure documents from your S3 bucket as an internal data source:

On the knowledge base details page, choose Add in the Data source section.

Specify the data source as Amazon S3.
Choose your S3 bucket.
Leave the parsing strategy as the default setting.
Choose Next.
Review the configurations and choose Add data source.
In the Data source section of the knowledge base details page, select your new data source and choose Sync to index the data from the documents in the S3 bucket.

Upload internal document
For this example, we upload a document in the new S3 bucket data source. The following screenshot shows an example of our document.

Complete the following steps to upload the document:

On the Amazon S3 console, choose Buckets in the navigation pane.
Select the bucket you created and choose Upload to upload the document.

On the Amazon Bedrock console, go to the knowledge base you created.
Choose the internal data source you created and choose Sync to sync the uploaded document with the vector store.

Note the knowledge base ID and the data source IDs for the external and internal data sources. You use this information in the next step when deploying the solution infrastructure.
Deploy solution infrastructure
To deploy the solution infrastructure using the AWS CDK, complete the following steps:

Download the code from the code repository.
Go to the iac directory inside the downloaded project:

cd ./customer-support-ai/iac

Open the parameters.json file and update the knowledge base and data source IDs with the values captured in the previous section:

"external_source_id": "Set this to value from Amazon Bedrock Knowledge Base datasource",
"internal_source_id": "Set this to value from Amazon Bedrock Knowledge Base datasource",
"knowledge_base_id": "Set this to value from Amazon Bedrock Knowledge Base",

Follow the deployment instructions defined in the customer-support-ai/README.md file to set up the solution infrastructure.

When the deployment is complete, you can find the Application Load Balancer (ALB) URL and demo user details in the script execution output.

You can also open the Amazon EC2 console and choose Load Balancers in the navigation pane to view the ALB.

On the ALB details page, copy the DNS name. You can use it to access the UI to try out the solution.

Submit questions
Let’s explore an example of Amazon S3 service support. This solution supports different classes of users to help resolve their queries while using Amazon Bedrock Knowledge Bases to manage specific data sources (such as website content, documentation, and support tickets) with built-in filtering controls that separate internal operational documents from publicly accessible information. For example, internal users can access both company-specific operational guides and public documentation, whereas external users are limited to publicly available content only.
Open the DNS URL in the browser. Enter the external user credentials and choose Login.

After you’re successfully authenticated, you will be redirected to the home page.

Choose Support AI Assistant in the navigation pane to ask questions related to Amazon S3. The assistant can provide relevant responses based on the information available in the Getting started with Amazon S3 guide. However, if an external user asks a question that is related to information available only for internal users, the AI assistant will not provide the internal information to the user and will respond only with information available for external users.

Log out and log in again as an internal user, and ask the same queries. The internal user can access the relevant information available in the internal documents.

Clean up
If you decide to stop using this solution, complete the following steps to remove its associated resources:

Go to the iac directory inside the project code and run one of the following commands from the terminal:

To run a cleanup script, use the following command:

cd iac
./cleanup.sh

To perform this operation manually, use the following command:

cd iac
cdk destroy --all

On the Amazon Bedrock console, choose Knowledge Bases under Builder tools in the navigation pane.
Choose the knowledge base you created, then choose Delete.
Enter delete and choose Delete to confirm.
On the OpenSearch Service console, choose Collections under Serverless in the navigation pane.
Choose the collection created during infrastructure provisioning, then choose Delete.
Enter confirm and choose Delete to confirm.

Conclusion
This post demonstrated how to create an AI-powered website assistant to retrieve information quickly by constructing a knowledge base through web crawling and uploading documents. You can use the same approach to develop other generative AI prototypes and applications.
If you’re interested in the fundamentals of generative AI and how to work with FMs, including advanced prompting techniques, check out the hands-on course Generative AI with LLMs. This on-demand, 3-week course is for data scientists and engineers who want to learn how to build generative AI applications with LLMs. It’s a good foundation to start building with Amazon Bedrock. Sign up to learn more about Amazon Bedrock.

About the authors
Shashank Jain is a Cloud Application Architect at Amazon Web Services (AWS), specializing in generative AI solutions, cloud-native application architecture, and sustainability. He works with customers to design and implement secure, scalable AI-powered applications using serverless technologies, modern DevSecOps practices, Infrastructure as Code, and event-driven architectures that deliver measurable business value.
Jeff Li is a Senior Cloud Application Architect with the Professional Services team at AWS. He is passionate about diving deep with customers to create solutions and modernize applications that support business innovations. In his spare time, he enjoys playing tennis, listening to music, and reading.
Ranjith Kurumbaru Kandiyil is a Data and AI/ML Architect at Amazon Web Services (AWS) based in Toronto. He specializes in collaborating with customers to architect and implement cutting-edge AI/ML solutions. His current focus lies in leveraging state-of-the-art artificial intelligence technologies to solve complex business challenges.

Liquid AI’s LFM2-2.6B-Exp Uses Pure Reinforcement Learning RL And Dy …

Liquid AI has introduced LFM2-2.6B-Exp, an experimental checkpoint of its LFM2-2.6B language model that is trained with pure reinforcement learning on top of the existing LFM2 stack. The goal is simple: improve instruction following, knowledge tasks, and math for a small 3B class model that still targets on-device and edge deployment.

Where LFM2-2.6B-Exp Fits in the LFM2 Family

LFM2 is the second generation of Liquid Foundation Models. It is designed for efficient deployment on phones, laptops, and other edge devices. Liquid AI describes LFM2 as a hybrid model that combines short range LIV convolution blocks with grouped query attention blocks, controlled by multiplicative gates.

The family includes 4 dense sizes, LFM2-350M, LFM2-700M, LFM2-1.2B, and LFM2-2.6B. All share a context length of 32,768 tokens, a vocabulary size of 65,536, and bfloat16 precision. The 2.6B model uses 30 layers, with 22 convolution layers and 8 attention layers. Each size is trained on a 10 trillion token budget.

LFM2-2.6B is already positioned as a high efficiency model. It reaches 82.41 percent on GSM8K and 79.56 percent on IFEval. This places it ahead of several 3B class models such as Llama 3.2 3B Instruct, Gemma 3 4B it, and SmolLM3 3B on these benchmarks.

LFM2-2.6B-Exp keeps this architecture. It reuses the same tokenization, context window, and hardware profile. The checkpoint focuses only on changing behavior through a reinforcement learning stage.

https://huggingface.co/LiquidAI/LFM2-2.6B-Exp

Pure RL on Top of a Pretrained, Aligned Base

This checkpoint is built on LFM2-2.6B using pure reinforcement learning. It is specifically trained on instruction following, knowledge, and math.

The underlying LFM2 training stack combines several stages. It includes very large scale supervised fine tuning on a mix of downstream tasks and general domains, custom Direct Preference Optimization with length normalization, iterative model merging, and reinforcement learning with verifiable rewards.

But what exactly does ‘pure reinforcement learning’ mean? LFM2-2.6B-Exp starts from the existing LFM2-2.6B checkpoint and then goes through a sequential RL training schedule. It begins with instruction following, then extends RL training to knowledge oriented prompts, math, and a small amount of tool use, without an additional SFT warm up or distillation step in that final phase.

The important point is that LFM2-2.6B-Exp does not change the base architecture or pre training. It changes the policy through an RL stage that uses verifiable rewards, on a targeted set of domains, on top of a model that is already supervised and preference aligned.

Benchmark Signal, Especially On IFBench

The Liquid AI team highlights IFBench as the main headline metric. IFBench is an instruction following benchmark that checks how reliably a model follows complex, constrained instructions. On this benchmark, LFM2-2.6B-Exp surpasses DeepSeek R1-0528, which is reported as 263 times larger in parameter count.

LFM2 models provide strong performance across a standard set of benchmarks such as MMLU, GPQA, IFEval, GSM8K, and related suites. The 2.6B base model already competes well in the 3B segment. The RL checkpoint then pushes instruction following and math further, while staying in the same 3B parameter budget.

Architecture and Capabilities that Matter

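Liquid AI’s general LFM2 description lists 10 double gated short range LIV convolution blocks and 6 grouped query attention blocks, arranged in a hybrid stack; the 2.6B model scales this layout to the 22 convolution layers and 8 attention layers noted earlier. This design reduces KV cache cost and keeps inference fast on consumer GPUs and NPUs.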

The pre training mixture uses roughly 75 percent English, 20 percent multilingual data, and 5 percent code. The supported languages include English, Arabic, Chinese, French, German, Japanese, Korean, and Spanish.

LFM2 models expose a ChatML like template and native tool use tokens. Tools are described as JSON between dedicated tool list markers. The model then emits Python like calls between tool call markers and reads tool responses between tool response markers. This structure makes the model suitable as the agent core for tool calling stacks without custom prompt engineering.
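
The tool calling flow described above can be illustrated with a small helper that wraps a JSON tool list in markers and parses a Python like call out of the model output. This is a sketch under stated assumptions: the marker strings, the function names, and the sample tool are hypothetical, and the exact special tokens should be taken from the model’s chat template rather than from this example.

```python
import ast
import json
from typing import Any, Dict, List, Tuple

# Marker strings are assumptions for illustration; check the model's chat
# template for the exact special tokens.
TOOL_LIST = ("<|tool_list_start|>", "<|tool_list_end|>")
TOOL_CALL = ("<|tool_call_start|>", "<|tool_call_end|>")


def render_tool_list(tools: List[Dict[str, Any]]) -> str:
    """Serialize tool descriptions as JSON between the tool list markers."""
    return f"{TOOL_LIST[0]}{json.dumps(tools)}{TOOL_LIST[1]}"


def parse_tool_calls(text: str) -> List[Tuple[str, Dict[str, Any]]]:
    """Extract (function name, keyword arguments) pairs from the first call span."""
    start, end = TOOL_CALL
    begin = text.find(start)
    if begin == -1:
        return []
    finish = text.find(end, begin)
    if finish == -1:
        return []
    payload = text[begin + len(start):finish].strip()
    tree = ast.parse(payload, mode="eval").body
    nodes = tree.elts if isinstance(tree, ast.List) else [tree]
    calls = []
    for node in nodes:
        # Only plain calls with literal keyword arguments are accepted.
        if not isinstance(node, ast.Call) or not isinstance(node.func, ast.Name) or node.args:
            raise ValueError("expected a call of the form name(key=value, ...)")
        kwargs = {kw.arg: ast.literal_eval(kw.value) for kw in node.keywords}
        calls.append((node.func.id, kwargs))
    return calls


output = 'Checking. <|tool_call_start|>[get_weather(city="Berlin", days=2)]<|tool_call_end|>'
print(parse_tool_calls(output))
```

Parsing with `ast` and `ast.literal_eval` reads the call as data and never executes model output, which is the safer default when the arguments come from a generated string.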

LFM2-2.6B, and by extension LFM2-2.6B-Exp, is also the only model in the family that enables dynamic hybrid reasoning through special think tokens for complex or multilingual inputs. That capability remains available because the RL checkpoint does not change tokenization or architecture.

Key Takeaways

LFM2-2.6B-Exp is an experimental checkpoint of LFM2-2.6B that adds a pure reinforcement learning stage on top of a pretrained, supervised and preference aligned base, targeted at instruction following, knowledge tasks, and math.

The LFM2-2.6B backbone uses a hybrid architecture that combines double gated short range LIV convolution blocks and grouped query attention blocks, with 30 layers, 22 convolution layers and 8 attention layers, 32,768 token context length, and a 10 trillion token training budget at 2.6B parameters.

LFM2-2.6B already achieves strong benchmark scores in the 3B class, around 82.41 percent on GSM8K and 79.56 percent on IFEval, and the LFM2-2.6B-Exp RL checkpoint further improves instruction following and math performance without changing the architecture or memory profile.

Liquid AI reports that on IFBench, an instruction following benchmark, LFM2-2.6B-Exp surpasses DeepSeek R1-0528 even though the latter has many more parameters, which shows a strong performance per parameter for constrained deployment settings.

LFM2-2.6B-Exp is released on Hugging Face with open weights under the LFM Open License v1.0 and is supported through Transformers, vLLM, llama.cpp GGUF quantizations, and ONNXRuntime, making it suitable for agentic systems, structured data extraction, retrieval augmented generation, and on device assistants where a compact 3B model is required.

The post Liquid AI’s LFM2-2.6B-Exp Uses Pure Reinforcement Learning RL And Dynamic Hybrid Reasoning To Tighten Small Model Behavior appeared first on MarkTechPost.

NVIDIA AI Researchers Release NitroGen: An Open Vision Action Foundati …

The NVIDIA AI research team has released NitroGen, an open vision action foundation model for generalist gaming agents that learns to play commercial games directly from pixels and gamepad actions using internet video at scale. NitroGen is trained on 40,000 hours of gameplay across more than 1,000 games and comes with an open dataset, a universal simulator, and a pre trained policy.

https://nitrogen.minedojo.org/assets/documents/nitrogen.pdf

Internet scale video action dataset

The NitroGen pipeline starts from publicly available gameplay videos that include input overlays, for example gamepad visualizations that streamers place in a corner of the screen. The research team collects 71,000 hours of raw video with such overlays, then applies quality filtering based on action density, which leaves 55% of the data, about 40,000 hours, spanning more than 1,000 games.

The curated dataset contains 38,739 videos from 818 creators. The distribution covers a wide range of titles. There are 846 games with more than 1 hour of data, 91 games with more than 100 hours, and 15 games with more than 1,000 hours each. Action RPGs account for 34.9 percent of the hours, platformers for 18.4 percent, and action adventure titles for 9.2 percent, with the rest spread across sports, roguelike, racing and other genres.

Action extraction from controller overlays

To recover frame level actions from raw streams, NitroGen uses a three stage action extraction pipeline. First, a template matching module localizes the controller overlay using about 300 controller templates. For each video, the system samples 25 frames and matches SIFT and XFeat features between frames and templates, then estimates an affine transform when at least 20 inliers support a match. This yields a crop of the controller region for all frames.

Second, a SegFormer based hybrid classification segmentation model parses the controller crops. The model takes two consecutive frames concatenated spatially and outputs joystick locations on an 11 by 11 grid plus binary button states. It is trained on 8 million synthetic images rendered with different controller templates, opacities, sizes and compression settings, using AdamW with learning rate 0.0001, weight decay 0.1, and batch size 256.

Third, the pipeline refines joystick positions and filters low activity segments. Joystick coordinates are normalized to the range from −1.0 to 1.0 using the 99th percentile of absolute x and y values to reduce outliers. Chunks where fewer than 50 percent of timesteps have non zero actions are removed, which avoids over predicting the null action during policy training.
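
The refinement step can be sketched in plain Python. This is a simplified illustration rather than the NitroGen implementation: it assumes the percentile is taken jointly over absolute x and y values, uses a nearest-rank percentile, clips values that exceed the scale, and treats only joystick readings as actions (the described filter also counts button presses). All function names are hypothetical.

```python
import math
from typing import List, Sequence, Tuple

Action = Tuple[float, float]  # (x, y) joystick reading for one timestep


def percentile(values: Sequence[float], q: float) -> float:
    """Nearest-rank percentile for q in (0, 100]."""
    ordered = sorted(values)
    rank = min(len(ordered), max(1, math.ceil(q / 100.0 * len(ordered))))
    return ordered[rank - 1]


def normalize_joystick(samples: Sequence[Action], q: float = 99.0) -> List[Action]:
    """Scale readings by the q-th percentile of |x| and |y|, then clip to [-1, 1]."""
    scale = percentile([abs(v) for xy in samples for v in xy], q)
    if scale == 0.0:
        return [(0.0, 0.0) for _ in samples]

    def clip(v: float) -> float:
        return max(-1.0, min(1.0, v / scale))

    return [(clip(x), clip(y)) for x, y in samples]


def keep_chunk(samples: Sequence[Action], min_active: float = 0.5) -> bool:
    """Keep a chunk only if enough timesteps carry a non zero action."""
    if not samples:
        return False
    active = sum(1 for x, y in samples if x != 0.0 or y != 0.0)
    return active / len(samples) >= min_active


chunk = [(0.0, 0.0), (2.0, -1.0), (4.0, 0.0), (100.0, 0.0)]
print(normalize_joystick(chunk, q=75.0), keep_chunk(chunk))
```

Scaling by a high percentile instead of the maximum means a single spurious reading, such as the 100.0 above, is clipped rather than compressing every other value toward zero.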

A separate benchmark with ground truth controller logs shows that joystick predictions reach an average R² of 0.84 and button frame accuracy reaches 0.96 across major controller families such as Xbox and PlayStation. This validates that automatic annotations are accurate enough for large scale behavior cloning.

Universal simulator and multi game benchmark

NitroGen includes a universal simulator that wraps commercial Windows games in a Gymnasium compatible interface. The wrapper intercepts the game engine system clock to control simulation time and supports frame by frame interaction without modifying game code, for any title that uses the system clock for physics and interactions.

Observations in this benchmark are single RGB frames. Actions are defined as a unified controller space with a 16 dimensional binary vector for gamepad buttons, four d pad buttons, four face buttons, two shoulders, two triggers, two joystick thumb buttons, start and back, plus a 4 dimensional continuous vector for joystick positions, left and right x,y. This unified layout allows direct transfer of one policy across many games.
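
The unified controller space described above can be written down as a small data structure. The button names and their order below are assumptions for illustration; the source only specifies the groups (four d pad, four face, two shoulders, two triggers, two thumb buttons, start and back) and the four continuous joystick values.

```python
from dataclasses import dataclass, field
from typing import List

# Button order is hypothetical; only the group sizes come from the description.
BUTTON_NAMES = [
    "dpad_up", "dpad_down", "dpad_left", "dpad_right",
    "face_a", "face_b", "face_x", "face_y",
    "shoulder_left", "shoulder_right",
    "trigger_left", "trigger_right",
    "thumb_left", "thumb_right",
    "start", "back",
]


@dataclass
class GamepadAction:
    """One timestep: 16 binary buttons plus 4 continuous joystick values."""
    buttons: List[int] = field(default_factory=lambda: [0] * len(BUTTON_NAMES))
    # Joystick layout: left x, left y, right x, right y.
    sticks: List[float] = field(default_factory=lambda: [0.0] * 4)

    def press(self, name: str) -> "GamepadAction":
        self.buttons[BUTTON_NAMES.index(name)] = 1
        return self

    def to_vector(self) -> List[float]:
        """Flatten into a 20 dimensional vector: buttons first, then sticks."""
        if len(self.buttons) != 16 or len(self.sticks) != 4:
            raise ValueError("expected 16 buttons and 4 joystick values")
        return [float(b) for b in self.buttons] + list(self.sticks)


action = GamepadAction().press("face_a")
action.sticks[0] = 0.5  # push the left stick halfway to the right
print(len(action.to_vector()))
```

Because every game shares this layout, a policy only ever has to emit vectors of one fixed shape, which is what makes transfer across titles straightforward.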

The evaluation suite covers 10 commercial games and 30 tasks. There are 5 two dimensional games, three side scrollers and two top down roguelikes, and 5 three dimensional games, two open world games, two combat focused action RPGs and one sports title. Tasks fall into 11 combat tasks, 10 navigation tasks, and 9 game specific tasks with custom objectives.

NitroGen model architecture

The NitroGen foundation policy follows the GR00T N1 architecture pattern for embodied agents. It discards the language and state encoders, and keeps a vision encoder plus a single action head. Input is one RGB frame at 256 by 256 resolution. A SigLIP 2 vision transformer encodes this frame into 256 image tokens.

A diffusion transformer, DiT, generates 16 step chunks of future actions. During training, noisy action chunks are embedded by a multilayer perceptron into action tokens, processed by a stack of DiT blocks with self attention and cross attention to visual tokens, then decoded back into continuous action vectors. The training objective is conditional flow matching with 16 denoising steps over each 16 action chunk.
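
At inference time, a flow matching policy turns noise into an action chunk by integrating a learned velocity field over a fixed number of steps. The toy sketch below shows that integration with 16 Euler steps. It is not NitroGen code: the learned DiT is replaced by a closed form velocity toward a known target, and the time convention (noise at t = 0, actions at t = 1, straight line path) is an assumption.

```python
import random
from typing import Callable, List

Vector = List[float]


def euler_sample(velocity: Callable[[Vector, float], Vector],
                 noise: Vector, steps: int = 16) -> Vector:
    """Integrate dx/dt = velocity(x, t) from t = 0 to t = 1 with Euler steps."""
    x = list(noise)
    dt = 1.0 / steps
    for k in range(steps):
        t = k * dt
        v = velocity(x, t)
        x = [xi + dt * vi for xi, vi in zip(x, v)]
    return x


def make_toy_velocity(target: Vector) -> Callable[[Vector, float], Vector]:
    """Stand-in for the learned model.

    On the straight path x_t = (1 - t) * noise + t * target, the velocity
    target - noise can be rewritten as (target - x_t) / (1 - t).
    """
    def velocity(x: Vector, t: float) -> Vector:
        return [(ti - xi) / (1.0 - t) for ti, xi in zip(target, x)]
    return velocity


random.seed(0)
target_action = [0.5, -0.25, 1.0, 0.0]  # illustrative joystick values
noise = [random.gauss(0.0, 1.0) for _ in target_action]
print(euler_sample(make_toy_velocity(target_action), noise))
```

In a trained policy, the velocity function would be the DiT conditioned on the image tokens, and training would regress its output toward the difference between the clean action chunk and the sampled noise.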

The released checkpoint has 4.93 × 10^8 parameters. The model card describes the output as a 21 by 16 tensor, where 17 dimensions correspond to binary button states and 4 dimensions store two two dimensional joystick vectors, over 16 future timesteps. This representation is consistent with the unified action space, up to reshaping of the joystick components.

Training outcomes and transfer gains

NitroGen is trained purely with large scale behavior cloning on the internet video dataset. There is no reinforcement learning and no reward design in the base model. Image augmentations include random brightness, contrast, saturation, hue, small rotations, and random crops. Training uses AdamW with weight decay 0.001, a warmup stable decay learning rate schedule with constant phase at 0.0001, and an exponential moving average of weights with decay 0.9999.

After pre training on the full dataset, NitroGen 500M already achieves non trivial task completion rates in zero shot evaluation across all games in the benchmark. Average completion rates stay in the range from about 45 percent to 60 percent across combat, navigation and game specific tasks, and across two dimensional and three dimensional games, despite the noise in internet supervision.

For transfer to unseen games, the research team holds out a title, pre trains on the remaining data, and then fine tunes on the held out game under a fixed data and compute budget. On an isometric roguelike, fine tuning from NitroGen gives an average relative improvement of about 10 percent compared with training from scratch. On a three dimensional action RPG, the average gain is about 25 percent, and for some combat tasks in the low data regime, 30 hours, the relative improvement reaches 52 percent.

Key Takeaways

NitroGen is a generalist vision action foundation model for games: It maps 256×256 RGB frames directly to standardized gamepad actions and is trained with pure behavior cloning on internet gameplay, without any reinforcement learning.

The dataset is large scale and automatically labeled from controller overlays: NitroGen uses 40,000 hours of filtered gameplay from 38,739 videos across more than 1,000 games, where frame level actions are extracted from visual controller overlays using a SegFormer based parsing pipeline.

Unified controller action space enables cross game transfer: Actions are represented in a shared space of about 20 dimensions per timestep, including binary gamepad buttons and continuous joystick vectors, which allows a single policy to be deployed across many commercial Windows games using a universal Gymnasium style simulator.

Diffusion transformer policy with conditional flow matching: The 4.93 × 10^8 parameter model uses a SigLIP 2 vision encoder plus a DiT based action head trained with conditional flow matching on 16 step action chunks, achieving robust control from noisy web scale data.

Pretraining on NitroGen improves downstream game performance: When fine tuned on held out titles under the same data and compute budget, NitroGen based initialization yields consistent relative gains, around 10 percent to 25 percent on average and up to 52 percent in low data combat tasks, compared to training from scratch.

The post NVIDIA AI Researchers Release NitroGen: An Open Vision Action Foundation Model For Generalist Gaming Agents appeared first on MarkTechPost.

How to Build Production-Grade Agentic Workflows with GraphBit Using De …

In this tutorial, we build an end-to-end, production-style agentic workflow using GraphBit that demonstrates how graph-structured execution, tool calling, and optional LLM-driven agents can coexist in a single system. We start by initializing and inspecting the GraphBit runtime, then define a realistic customer-support ticket domain with typed data structures and deterministic, offline-executable tools. We show how these tools can be composed into a reliable, rule-based pipeline for classification, routing, and response drafting, and then elevate that same logic into a validated GraphBit workflow in which agent nodes orchestrate tool usage via a directed graph. Throughout the tutorial, we keep the system running in offline mode while enabling seamless promotion to online execution by simply providing an LLM configuration, illustrating how GraphBit supports the gradual adoption of agentic intelligence without sacrificing reproducibility or operational control.

!pip -q install graphbit rich pydantic numpy

import os
import time
import json
import random
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
import numpy as np
from rich import print as rprint
from rich.panel import Panel
from rich.table import Table

We begin by installing all required dependencies and importing the core Python, numerical, and visualization libraries needed for the tutorial. We set up the runtime environment so the notebook remains self-contained and reproducible on Google Colab.

from graphbit import init, shutdown, configure_runtime, get_system_info, health_check, version
from graphbit import Workflow, Node, Executor, LlmConfig
from graphbit import tool, ToolExecutor, ExecutorConfig
from graphbit import get_tool_registry, clear_tools

configure_runtime(worker_threads=4, max_blocking_threads=8, thread_stack_size_mb=2)
init(log_level="warn", enable_tracing=False, debug=False)

info = get_system_info()
health = health_check()

sys_table = Table(title="System Info / Health")
sys_table.add_column("Key", style="bold")
sys_table.add_column("Value")
for k in ["version", "python_binding_version", "cpu_count", "runtime_worker_threads", "runtime_initialized", "build_target", "build_profile"]:
    sys_table.add_row(k, str(info.get(k)))
sys_table.add_row("graphbit_version()", str(version()))
sys_table.add_row("overall_healthy", str(health.get("overall_healthy")))
rprint(sys_table)

We initialize the GraphBit runtime and explicitly configure its execution parameters to control threading and resource usage. We then query system metadata and perform a health check to verify that the runtime is correctly initialized.

@dataclass
class Ticket:
    ticket_id: str
    user_id: str
    text: str
    created_at: float

def make_tickets(n: int = 10) -> List[Ticket]:
    seeds = [
        "My card payment failed twice, what should I do?",
        "I want to cancel my subscription immediately.",
        "Your app crashes when I open the dashboard.",
        "Please update my email address on the account.",
        "Refund not received after 7 days.",
        "My delivery is delayed and tracking is stuck.",
        "I suspect fraudulent activity on my account.",
        "How can I change my billing cycle date?",
        "The website is very slow and times out.",
        "I forgot my password and cannot login.",
        "Chargeback process details please.",
        "Need invoice for last month's payment."
    ]
    random.shuffle(seeds)
    out = []
    for i in range(n):
        out.append(
            Ticket(
                ticket_id=f"T-{1000+i}",
                user_id=f"U-{random.randint(100,999)}",
                text=seeds[i % len(seeds)],
                created_at=time.time() - random.randint(0, 7 * 24 * 3600),
            )
        )
    return out

tickets = make_tickets(10)
rprint(Panel.fit("\n".join([f"- {t.ticket_id}: {t.text}" for t in tickets]), title="Sample Tickets"))

We define a typed data model for support tickets and generate a synthetic dataset that simulates realistic customer issues. We construct tickets with timestamps and identifiers to mirror production inputs. Because the generator is not seeded, the exact tickets vary from run to run. This dataset serves as the shared input across both offline and agent-driven pipelines.

clear_tools()

@tool(_description="Classify a support ticket into a coarse category.")
def classify_ticket(text: str) -> Dict[str, Any]:
    t = text.lower()
    if "fraud" in t or "fraudulent" in t:
        return {"category": "fraud", "priority": "p0"}
    if "cancel" in t:
        return {"category": "cancellation", "priority": "p1"}
    if "refund" in t or "chargeback" in t:
        return {"category": "refunds", "priority": "p1"}
    if "password" in t or "login" in t:
        return {"category": "account_access", "priority": "p2"}
    if "crash" in t or "slow" in t or "timeout" in t:
        return {"category": "bug", "priority": "p2"}
    if "payment" in t or "billing" in t or "invoice" in t:
        return {"category": "billing", "priority": "p2"}
    if "delivery" in t or "tracking" in t:
        return {"category": "delivery", "priority": "p3"}
    return {"category": "general", "priority": "p3"}

@tool(_description="Route a ticket to a queue (returns queue id and SLA hours).")
def route_ticket(category: str, priority: str) -> Dict[str, Any]:
    queue_map = {
        "fraud": ("risk_ops", 2),
        "cancellation": ("retention", 8),
        "refunds": ("payments_ops", 12),
        "account_access": ("identity", 12),
        "bug": ("engineering_support", 24),
        "billing": ("billing_support", 24),
        "delivery": ("logistics_support", 48),
        "general": ("support_general", 48),
    }
    q, sla = queue_map.get(category, ("support_general", 48))
    if priority == "p0":
        sla = min(sla, 2)
    elif priority == "p1":
        sla = min(sla, 8)
    return {"queue": q, "sla_hours": sla}

@tool(_description="Generate a playbook response based on category + priority.")
def draft_response(category: str, priority: str, ticket_text: str) -> Dict[str, Any]:
    templates = {
        "fraud": "We've temporarily secured your account. Please confirm last 3 transactions and reset credentials.",
        "cancellation": "We can help cancel your subscription. Please confirm your plan and the effective date you want.",
        "refunds": "We're checking the refund status. Please share the order/payment reference and date.",
        "account_access": "Let's get you back in. Please use the password reset link; if blocked, we'll verify identity.",
        "bug": "Thanks for reporting. Please share device/browser + a screenshot; we'll attempt reproduction.",
        "billing": "We can help with billing. Please confirm the last 4 digits and the invoice period you need.",
        "delivery": "We're checking shipment status. Please share your tracking ID and delivery address PIN/ZIP.",
        "general": "Thanks for reaching out."
    }
    base = templates.get(category, templates["general"])
    tone = "urgent" if priority == "p0" else ("fast" if priority == "p1" else "standard")
    return {
        "tone": tone,
        "message": f"{base}\n\nContext we received: '{ticket_text}'",
        "next_steps": ["request_missing_info", "log_case", "route_to_queue"]
    }

registry = get_tool_registry()
tools_list = registry.list_tools() if hasattr(registry, "list_tools") else []
rprint(Panel.fit(f"Registered tools: {tools_list}", title="Tool Registry"))

We register deterministic business tools for ticket classification, routing, and response drafting using GraphBit’s tool interface. We encode domain logic directly into these tools so they can be executed without any LLM dependency. This establishes a reliable, testable foundation for later agent orchestration.
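Because the classifier is a chain of keyword checks, the first matching rule wins, so rule order is part of the business logic. The sketch below restates the same keywords as an ordered rule table in plain Python, without GraphBit, to make that precedence easy to test. The names `RULES` and `classify` are our own and are not part of the tutorial's code.

```python
from typing import Dict, List, Tuple

# Ordered rules: earlier entries take precedence over later ones.
RULES: List[Tuple[Tuple[str, ...], str, str]] = [
    (("fraud",), "fraud", "p0"),
    (("cancel",), "cancellation", "p1"),
    (("refund", "chargeback"), "refunds", "p1"),
    (("password", "login"), "account_access", "p2"),
    (("crash", "slow", "timeout"), "bug", "p2"),
    (("payment", "billing", "invoice"), "billing", "p2"),
    (("delivery", "tracking"), "delivery", "p3"),
]


def classify(text: str) -> Dict[str, str]:
    """Return the category and priority of the first rule whose keyword appears."""
    lowered = text.lower()
    for keywords, category, priority in RULES:
        if any(keyword in lowered for keyword in keywords):
            return {"category": category, "priority": priority}
    return {"category": "general", "priority": "p3"}


examples = [
    "My card payment failed twice, what should I do?",
    "Refund not received after 7 days.",
    "The payment page is slow.",  # matches both "slow" and "payment"
    "Please update my email address on the account.",
]
for example in examples:
    print(example, "->", classify(example))
```

The third example matches two rules; it lands in the bug queue only because the bug keywords are checked before the billing keywords. Reordering the table would silently change routing, which is why this kind of logic benefits from explicit tests.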

tool_exec_cfg = ExecutorConfig(
    max_execution_time_ms=10_000,
    max_tool_calls=50,
    continue_on_error=False,
    store_results=True,
    enable_logging=False
)
# Pass the config if this ToolExecutor accepts it; otherwise fall back to defaults.
# Trying the call is safer than inspecting __init__.__code__, which is not
# available on classes implemented in native extensions.
try:
    tool_executor = ToolExecutor(config=tool_exec_cfg)
except TypeError:
    tool_executor = ToolExecutor()

def offline_triage(ticket: Ticket) -> Dict[str, Any]:
    # Chain the three deterministic tools: classify -> route -> draft.
    c = classify_ticket(ticket.text)
    rt = route_ticket(c["category"], c["priority"])
    dr = draft_response(c["category"], c["priority"], ticket.text)
    return {
        "ticket_id": ticket.ticket_id,
        "user_id": ticket.user_id,
        "category": c["category"],
        "priority": c["priority"],
        "queue": rt["queue"],
        "sla_hours": rt["sla_hours"],
        "draft": dr["message"],
        "tone": dr["tone"],
        "steps": [
            ("classify_ticket", c),
            ("route_ticket", rt),
            ("draft_response", dr),
        ]
    }

offline_results = [offline_triage(t) for t in tickets]

res_table = Table(title="Offline Pipeline Results")
res_table.add_column("Ticket", style="bold")
res_table.add_column("Category")
res_table.add_column("Priority")
res_table.add_column("Queue")
res_table.add_column("SLA (h)")
for r in offline_results:
    res_table.add_row(r["ticket_id"], r["category"], r["priority"], r["queue"], str(r["sla_hours"]))
rprint(res_table)

# Aggregate the priority distribution and SLA statistics.
prio_counts: Dict[str, int] = {}
sla_vals: List[int] = []
for r in offline_results:
    prio_counts[r["priority"]] = prio_counts.get(r["priority"], 0) + 1
    sla_vals.append(int(r["sla_hours"]))

metrics = {
    "offline_mode": True,
    "tickets": len(offline_results),
    "priority_distribution": prio_counts,
    "sla_mean": float(np.mean(sla_vals)) if sla_vals else None,
    "sla_p95": float(np.percentile(sla_vals, 95)) if sla_vals else None,
}

rprint(Panel.fit(json.dumps(metrics, indent=2), title="Offline Metrics"))

We compose the registered tools into an offline execution pipeline and apply it across all tickets to produce structured triage results. We aggregate outputs into tables and compute priority and SLA metrics to evaluate system behavior. This demonstrates how GraphBit-based logic can be validated deterministically before introducing agents.
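With only ten tickets, the p95 SLA figure is easy to misread, so it helps to see how it is computed. The sketch below is a pure-Python percentile using linear interpolation between order statistics, which is our understanding of NumPy's default method. The helper name `percentile_linear` and the sample SLA values are illustrative assumptions, not output from the tutorial.

```python
import math
from typing import Sequence


def percentile_linear(values: Sequence[float], q: float) -> float:
    """q-th percentile (0..100) with linear interpolation between neighbors."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 <= q <= 100:
        raise ValueError("q must be between 0 and 100")
    ordered = sorted(values)
    position = (len(ordered) - 1) * q / 100.0  # fractional index into the sorted data
    lower = math.floor(position)
    upper = math.ceil(position)
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction


# Hypothetical SLA hours for ten tickets.
sla_hours = [2, 8, 12, 12, 24, 24, 24, 48, 48, 48]
sla_mean = sum(sla_hours) / len(sla_hours)
sla_p95 = percentile_linear(sla_hours, 95)
print(sla_mean, sla_p95)
```

For a sample this small, the 95th percentile falls between the two largest values, so it mostly reports the slowest queue rather than a stable tail estimate.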

SYSTEM_POLICY = "You are a reliable support ops agent. Return STRICT JSON only."

workflow = Workflow("Ticket Triage Workflow (GraphBit)")

summarizer = Node.agent(
    name="Summarizer",
    agent_id="summarizer",
    system_prompt=SYSTEM_POLICY,
    prompt='Summarize this ticket in 1-2 lines. Return JSON: {"summary":"..."}\nTicket: {input}',
    temperature=0.2,
    max_tokens=200
)

router_agent = Node.agent(
    name="RouterAgent",
    agent_id="router",
    system_prompt=SYSTEM_POLICY,
    prompt=(
        "You MUST use tools.\n"
        "Call classify_ticket(text), route_ticket(category, priority), draft_response(category, priority, ticket_text).\n"
        "Return JSON with fields: category, priority, queue, sla_hours, message.\n"
        "Ticket: {input}"
    ),
    tools=[classify_ticket, route_ticket, draft_response],
    temperature=0.1,
    max_tokens=700
)

formatter = Node.agent(
    name="FinalFormatter",
    agent_id="final_formatter",
    system_prompt=SYSTEM_POLICY,
    prompt=(
        "Validate the JSON and output STRICT JSON only:\n"
        '{"ticket_id":"...","category":"...","priority":"...","queue":"...","sla_hours":0,"customer_message":"..."}\n'
        "Input: {input}"
    ),
    temperature=0.0,
    max_tokens=500
)

sid = workflow.add_node(summarizer)
rid = workflow.add_node(router_agent)
fid = workflow.add_node(formatter)

workflow.connect(sid, rid)
workflow.connect(rid, fid)
workflow.validate()

rprint(Panel.fit("Workflow validated: Summarizer -> RouterAgent -> FinalFormatter", title="Workflow Graph"))

We construct a directed GraphBit workflow composed of multiple agent nodes with clearly defined responsibilities and strict JSON contracts. We connect these nodes into a validated execution graph that mirrors the earlier offline logic at an agent level.
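To illustrate what validating an execution graph can involve, here is a minimal sketch of one common check: confirming that every edge references a known node and that the graph has no cycles, using Kahn's topological sort. This is not GraphBit's implementation, and the tutorial does not describe what `workflow.validate()` checks internally; the function name `topological_order` is our own.

```python
from collections import deque
from typing import Dict, List, Tuple


def topological_order(nodes: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Return an execution order for a DAG, or raise ValueError if invalid."""
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    successors: Dict[str, List[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        if src not in indegree or dst not in indegree:
            raise ValueError(f"edge references unknown node: {src} -> {dst}")
        successors[src].append(dst)
        indegree[dst] += 1

    # Start from nodes with no incoming edges and peel the graph layer by layer.
    ready = deque(n for n in nodes if indegree[n] == 0)
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in successors[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(nodes):
        raise ValueError("graph contains a cycle")
    return order


order = topological_order(
    ["Summarizer", "RouterAgent", "FinalFormatter"],
    [("Summarizer", "RouterAgent"), ("RouterAgent", "FinalFormatter")],
)
print(order)
```

For the three-node chain used in this tutorial the order is simply the chain itself, but the same check catches accidental back-edges once a workflow grows branches.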

def pick_llm_config() -> Optional[Any]:
    # Use the first provider whose API key is present in the environment.
    if os.getenv("OPENAI_API_KEY"):
        return LlmConfig.openai(os.getenv("OPENAI_API_KEY"), "gpt-4o-mini")
    if os.getenv("ANTHROPIC_API_KEY"):
        return LlmConfig.anthropic(os.getenv("ANTHROPIC_API_KEY"), "claude-sonnet-4-20250514")
    if os.getenv("DEEPSEEK_API_KEY"):
        return LlmConfig.deepseek(os.getenv("DEEPSEEK_API_KEY"), "deepseek-chat")
    if os.getenv("MISTRALAI_API_KEY"):
        return LlmConfig.mistralai(os.getenv("MISTRALAI_API_KEY"), "mistral-large-latest")
    return None

def run_agent_flow_once(ticket_text: str) -> Dict[str, Any]:
    llm_cfg = pick_llm_config()
    if llm_cfg is None:
        # No provider key: stay offline and report how to enable execution.
        return {
            "mode": "offline",
            "note": "Set OPENAI_API_KEY / ANTHROPIC_API_KEY / DEEPSEEK_API_KEY / MISTRALAI_API_KEY to enable execution.",
            "input": ticket_text
        }
    # Try the extended constructor first; fall back if this version rejects the keywords.
    # (Inspecting __init__.__code__ is not available on natively implemented classes.)
    try:
        executor = Executor(llm_cfg, lightweight_mode=True, timeout_seconds=90, debug=False)
    except TypeError:
        executor = Executor(llm_cfg)
    if hasattr(executor, "configure"):
        executor.configure(timeout_seconds=90, max_retries=2, enable_metrics=True, debug=False)
    wf = Workflow("Single Ticket Run")
    s = Node.agent(
        name="Summarizer",
        agent_id="summarizer",
        system_prompt=SYSTEM_POLICY,
        prompt=f'Summarize this ticket in 1-2 lines. Return JSON: {{"summary":"..."}}\nTicket: {ticket_text}',
        temperature=0.2,
        max_tokens=200
    )
    r = Node.agent(
        name="RouterAgent",
        agent_id="router",
        system_prompt=SYSTEM_POLICY,
        prompt=(
            "You MUST use tools.\n"
            "Call classify_ticket(text), route_ticket(category, priority), draft_response(category, priority, ticket_text).\n"
            "Return JSON with fields: category, priority, queue, sla_hours, message.\n"
            f"Ticket: {ticket_text}"
        ),
        tools=[classify_ticket, route_ticket, draft_response],
        temperature=0.1,
        max_tokens=700
    )
    f = Node.agent(
        name="FinalFormatter",
        agent_id="final_formatter",
        system_prompt=SYSTEM_POLICY,
        prompt=(
            "Validate the JSON and output STRICT JSON only:\n"
            '{"ticket_id":"...","category":"...","priority":"...","queue":"...","sla_hours":0,"customer_message":"..."}\n'
            "Input: {input}"
        ),
        temperature=0.0,
        max_tokens=500
    )
    sid = wf.add_node(s)
    rid = wf.add_node(r)
    fid = wf.add_node(f)
    wf.connect(sid, rid)
    wf.connect(rid, fid)
    wf.validate()
    t0 = time.time()
    result = executor.execute(wf)
    dt_ms = int((time.time() - t0) * 1000)
    out = {"mode": "online", "execution_time_ms": dt_ms, "success": bool(result.is_success()) if hasattr(result, "is_success") else None}
    if hasattr(result, "get_all_variables"):
        out["variables"] = result.get_all_variables()
    else:
        out["raw"] = str(result)[:3000]
    return out

sample = tickets[0]
agent_run = run_agent_flow_once(sample.text)
# default=str keeps the dump from failing if the result holds non-JSON-serializable values.
rprint(Panel.fit(json.dumps(agent_run, indent=2, default=str)[:3000], title="Agent Workflow Run"))

rprint(Panel.fit("Done", title="Complete"))

We add optional LLM configuration and execution logic that enables the same workflow to run autonomously when a provider key is available. We execute the workflow on a single ticket and capture execution status and outputs. This final step illustrates how the system seamlessly transitions from offline determinism to fully agentic execution.

In conclusion, we implemented a complete GraphBit workflow spanning runtime configuration, tool registration, offline deterministic execution, metric aggregation, and optional agent-based orchestration with external LLM providers. We demonstrated how the same business logic can be executed both manually via tools and automatically via agent nodes connected in a validated graph, highlighting GraphBit’s strength as an execution substrate rather than just an LLM wrapper. We showed that complex agentic systems can be designed to fail gracefully, run without external dependencies, and still scale to fully autonomous workflows when LLMs are enabled.

The post How to Build Production-Grade Agentic Workflows with GraphBit Using Deterministic Tools, Validated Execution Graphs, and Optional LLM Orchestration appeared first on MarkTechPost.

A Coding Implementation on Building Self-Organizing Zettelkasten Knowl …

In this tutorial, we build a “Zettelkasten” memory system for agentic AI, a “living” architecture that organizes information much like the human brain. We move beyond standard retrieval methods to construct a dynamic knowledge graph where an agent autonomously decomposes inputs into atomic facts, links them semantically, and even “sleeps” to consolidate memories into higher-order insights. Using Google’s Gemini, we implement a solution that handles real-world API constraints such as rate limits, so that our agent not only stores data but also tracks the evolving context of our projects.

!pip install -q -U google-generativeai networkx pyvis scikit-learn numpy

import os
import json
import uuid
import time
import getpass
import random
import networkx as nx
import numpy as np
import google.generativeai as genai
from dataclasses import dataclass, field
from typing import List
from sklearn.metrics.pairwise import cosine_similarity
from IPython.display import display, HTML
from pyvis.network import Network
from google.api_core import exceptions

def retry_with_backoff(func, *args, **kwargs):
    max_retries = 5
    base_delay = 5

    for attempt in range(max_retries):
        try:
            return func(*args, **kwargs)
        except exceptions.ResourceExhausted:
            wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f" Quota limit hit. Cooling down for {wait_time:.1f}s...")
            time.sleep(wait_time)
        except Exception as e:
            if "429" in str(e):
                wait_time = base_delay * (2 ** attempt) + random.uniform(0, 1)
                print(f" Quota limit hit (HTTP 429). Cooling down for {wait_time:.1f}s...")
                time.sleep(wait_time)
            else:
                print(f" Unexpected Error: {e}")
                return None
    print(" Max retries reached.")
    return None

print("Enter your Google AI Studio API Key (Input will be hidden):")
API_KEY = getpass.getpass()

genai.configure(api_key=API_KEY)
MODEL_NAME = "gemini-2.5-flash"
EMBEDDING_MODEL = "models/text-embedding-004"

print(f" API Key configured. Using model: {MODEL_NAME}")

We begin by importing essential libraries for graph management and AI model interaction, while also securing our API key input. Crucially, we define a robust retry_with_backoff function that automatically handles rate limit errors, ensuring our agent gracefully pauses and recovers when the API quota is exceeded during heavy processing.
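It is worth checking how long this retry policy can block a single call. The sketch below reproduces the wait-time formula from retry_with_backoff (a base delay of 5 seconds, doubled on each attempt, plus up to one second of random jitter) as a standalone helper; the name `backoff_delays` and the injectable `jitter` parameter are our own additions for testability.

```python
import random
from typing import Callable, List


def backoff_delays(
    max_retries: int = 5,
    base_delay: float = 5.0,
    jitter: Callable[[], float] = lambda: random.uniform(0, 1),
) -> List[float]:
    """Wait time before each retry: base_delay * 2**attempt plus jitter."""
    return [base_delay * (2 ** attempt) + jitter() for attempt in range(max_retries)]


# Disable jitter to see the deterministic part of the schedule.
schedule = backoff_delays(jitter=lambda: 0.0)
print(schedule)       # [5.0, 10.0, 20.0, 40.0, 80.0]
print(sum(schedule))  # 155.0
```

With jitter included, a call that keeps hitting the quota waits between 155 and 160 seconds in total before the function gives up and returns None, so callers should be prepared for a None result after a long pause.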

@dataclass
class MemoryNode:
    id: str
    content: str
    type: str
    embedding: List[float] = field(default_factory=list)
    timestamp: int = 0

class RobustZettelkasten:
    def __init__(self):
        self.graph = nx.Graph()
        self.model = genai.GenerativeModel(MODEL_NAME)
        self.step_counter = 0

    def _get_embedding(self, text):
        result = retry_with_backoff(
            genai.embed_content,
            model=EMBEDDING_MODEL,
            content=text
        )
        return result['embedding'] if result else [0.0] * 768

We define the fundamental MemoryNode structure to hold our content, types, and vector embeddings in an organized data class. We then initialize the main RobustZettelkasten class, establishing the network graph and configuring the Gemini embedding model that serves as the backbone of our semantic search capabilities.

    # Methods of RobustZettelkasten (continued); keep them indented inside the class body.
    def _atomize_input(self, text):
        prompt = f"""
        Break the following text into independent atomic facts.
        Output JSON: {{ "facts": ["fact1", "fact2"] }}
        Text: "{text}"
        """
        response = retry_with_backoff(
            self.model.generate_content,
            prompt,
            generation_config={"response_mime_type": "application/json"}
        )
        try:
            return json.loads(response.text).get("facts", []) if response else [text]
        except Exception:
            # Fall back to treating the whole input as a single fact.
            return [text]

    def _find_similar_nodes(self, embedding, top_k=3, threshold=0.45):
        # Keep only nodes that carry an embedding so similarity indices line up with `nodes`.
        nodes = [n for n in self.graph.nodes(data=True) if len(n[1]['data'].embedding) > 0]
        if not nodes:
            return []

        embeddings = [n[1]['data'].embedding for n in nodes]
        sims = cosine_similarity([embedding], embeddings)[0]
        sorted_indices = np.argsort(sims)[::-1]

        results = []
        for idx in sorted_indices[:top_k]:
            if sims[idx] > threshold:
                results.append((nodes[idx][0], sims[idx]))
        return results

    def add_memory(self, user_input):
        self.step_counter += 1
        print(f'\n [Step {self.step_counter}] Processing: "{user_input}"')

        facts = self._atomize_input(user_input)

        for fact in facts:
            print(f" -> Atom: {fact}")
            emb = self._get_embedding(fact)
            # Look up candidates before inserting, so the new node cannot match itself.
            candidates = self._find_similar_nodes(emb)

            node_id = str(uuid.uuid4())[:6]
            node = MemoryNode(id=node_id, content=fact, type='fact', embedding=emb, timestamp=self.step_counter)
            self.graph.add_node(node_id, data=node, title=fact, label=fact[:15] + "...")

            if candidates:
                context_str = "\n".join([f"ID {c[0]}: {self.graph.nodes[c[0]]['data'].content}" for c in candidates])
                prompt = f"""
                I am adding: "{fact}"
                Existing Memory:
                {context_str}

                Are any of these directly related? If yes, provide the relationship label.
                JSON: {{ "links": [{{ "target_id": "ID", "rel": "label" }}] }}
                """
                response = retry_with_backoff(
                    self.model.generate_content,
                    prompt,
                    generation_config={"response_mime_type": "application/json"}
                )

                if response:
                    try:
                        links = json.loads(response.text).get("links", [])
                        for link in links:
                            if self.graph.has_node(link['target_id']):
                                self.graph.add_edge(node_id, link['target_id'], label=link['rel'])
                                print(f" Linked to {link['target_id']} ({link['rel']})")
                    except Exception:
                        # Ignore malformed link suggestions from the model.
                        pass

            time.sleep(1)

We construct an ingestion pipeline that decomposes complex user inputs into atomic facts to prevent information loss. We immediately embed these facts and use our agent to identify and create semantic links to existing nodes, effectively building a knowledge graph in real time that mimics associative memory.
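The candidate search at the heart of this pipeline is a thresholded top-k lookup by cosine similarity. Here is a minimal dependency-free sketch of that idea, using the same defaults (top_k=3, threshold=0.45) on toy two-dimensional vectors. The helper names and the toy store are hypothetical, and the tutorial itself uses scikit-learn's cosine_similarity rather than this hand-written version.

```python
import math
from typing import Dict, List, Sequence, Tuple


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; defined as 0.0 when either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_similar(
    query: Sequence[float],
    store: Dict[str, Sequence[float]],
    top_k: int = 3,
    threshold: float = 0.45,
) -> List[Tuple[str, float]]:
    """Return up to top_k (node_id, score) pairs whose score exceeds threshold."""
    scored = [(node_id, cosine(query, emb)) for node_id, emb in store.items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [(node_id, score) for node_id, score in scored[:top_k] if score > threshold]


# Toy embeddings; "failed" mimics the all-zero fallback vector used on API errors.
store = {
    "a": [1.0, 0.0],
    "b": [0.8, 0.6],
    "c": [0.0, 1.0],
    "failed": [0.0, 0.0],
}
hits = top_k_similar([1.0, 0.0], store)
print(hits)
```

Note that the threshold is applied after the top-k cut, so fewer than k results come back when the best matches are weak. A node whose embedding call failed scores zero against everything and is never proposed as a link candidate.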

    # Methods of RobustZettelkasten (continued); keep them indented inside the class body.
    def consolidate_memory(self):
        print("\n [Consolidation Phase] Reflecting...")
        # Hubs with at least two links are candidates for a higher-order insight.
        high_degree_nodes = [n for n, d in self.graph.degree() if d >= 2]
        processed_clusters = set()

        for main_node in high_degree_nodes:
            neighbors = list(self.graph.neighbors(main_node))
            cluster_ids = tuple(sorted([main_node] + neighbors))

            # Skip clusters that were already summarized via another hub.
            if cluster_ids in processed_clusters:
                continue
            processed_clusters.add(cluster_ids)

            cluster_content = [self.graph.nodes[n]['data'].content for n in cluster_ids]

            prompt = f"""
            Generate a single high-level insight summary from these facts.
            Facts: {json.dumps(cluster_content)}
            JSON: {{ "insight": "Your insight here" }}
            """
            response = retry_with_backoff(
                self.model.generate_content,
                prompt,
                generation_config={"response_mime_type": "application/json"}
            )

            if response:
                try:
                    insight_text = json.loads(response.text).get("insight")
                    if insight_text:
                        insight_id = f"INSIGHT-{uuid.uuid4().hex[:4]}"
                        print(f" Insight: {insight_text}")
                        emb = self._get_embedding(insight_text)

                        insight_node = MemoryNode(id=insight_id, content=insight_text, type='insight', embedding=emb)
                        self.graph.add_node(insight_id, data=insight_node, title=f"INSIGHT: {insight_text}", label="INSIGHT", color="#ff7f7f")
                        self.graph.add_edge(insight_id, main_node, label="abstracted_from")
                except Exception:
                    continue
            time.sleep(1)

    def answer_query(self, query):
        print(f'\n Querying: "{query}"')
        emb = self._get_embedding(query)
        candidates = self._find_similar_nodes(emb, top_k=2)

        if not candidates:
            print("No relevant memory found.")
            return

        relevant_context = set()
        for node_id, score in candidates:
            node_content = self.graph.nodes[node_id]['data'].content
            relevant_context.add(f"- {node_content} (Direct Match)")
            # Expand each direct match to its immediate neighbors.
            for n1 in self.graph.neighbors(node_id):
                rel = self.graph[node_id][n1].get('label', 'related')
                content = self.graph.nodes[n1]['data'].content
                relevant_context.add(f"  - linked via '{rel}' to: {content}")

        context_text = "\n".join(relevant_context)
        prompt = f"""
        Answer based ONLY on context.
        Question: {query}
        Context:
        {context_text}
        """
        response = retry_with_backoff(self.model.generate_content, prompt)
        if response:
            print(f" Agent Answer:\n{response.text}")

We implement the cognitive functions of our agent, enabling it to “sleep” and consolidate dense memory clusters into higher-order insights. We also define the query logic, which retrieves the closest matching nodes and expands each one to its directly linked neighbors, so the agent can answer questions using connected context rather than isolated facts.
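The cluster selection used during consolidation can be shown without any LLM or graph library. The sketch below applies the same rule (take every node with at least two links, group it with its neighbors, and skip groups already seen) to a plain adjacency dictionary. The function name `find_clusters` and the toy graph are our own illustrative assumptions.

```python
from typing import Dict, List, Set, Tuple


def find_clusters(adjacency: Dict[str, Set[str]], min_degree: int = 2) -> List[Tuple[str, ...]]:
    """Return deduplicated (hub + neighbors) clusters for well-connected nodes."""
    seen: Set[Tuple[str, ...]] = set()
    clusters: List[Tuple[str, ...]] = []
    for node in sorted(adjacency):
        neighbors = adjacency[node]
        if len(neighbors) < min_degree:
            continue
        # A sorted tuple makes the cluster hashable and order-independent.
        cluster = tuple(sorted({node, *neighbors}))
        if cluster in seen:
            continue
        seen.add(cluster)
        clusters.append(cluster)
    return clusters


# Toy undirected graph loosely modeled on the project-timeline scenario.
graph = {
    "react": {"perf", "svelte", "team"},
    "perf": {"react", "svelte"},
    "svelte": {"react", "perf"},
    "team": {"react"},
    "python": set(),
}
clusters = find_clusters(graph)
print(clusters)
```

Deduplication here only removes identical node sets. Overlapping clusters, such as a triangle and the larger group that contains it, are still summarized separately, which means consolidation can issue several LLM calls over largely the same facts.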

    # Method of RobustZettelkasten (continued); keep it indented inside the class body.
    def show_graph(self):
        try:
            net = Network(notebook=True, cdn_resources='remote', height="500px", width="100%", bgcolor='#222222', font_color='white')
            for n, data in self.graph.nodes(data=True):
                color = "#97c2fc" if data['data'].type == 'fact' else "#ff7f7f"
                net.add_node(n, label=data.get('label', ''), title=data['data'].content, color=color)
            for u, v, data in self.graph.edges(data=True):
                net.add_edge(u, v, label=data.get('label', ''))
            net.show("memory_graph.html")
            display(HTML("memory_graph.html"))
        except Exception as e:
            print(f"Graph visualization error: {e}")

brain = RobustZettelkasten()

events = [
    "The project 'Apollo' aims to build a dashboard for tracking solar panel efficiency.",
    "We chose React for the frontend because the team knows it well.",
    "The backend must be Python to support the data science libraries.",
    "Client called. They are unhappy with React performance on low-end devices.",
    "We are switching the frontend to Svelte for better performance."
]

print("--- PHASE 1: INGESTION ---")
for event in events:
    brain.add_memory(event)
    time.sleep(2)

print("--- PHASE 2: CONSOLIDATION ---")
brain.consolidate_memory()

print("--- PHASE 3: RETRIEVAL ---")
brain.answer_query("What is the current frontend technology for Apollo and why?")

print("--- PHASE 4: VISUALIZATION ---")
brain.show_graph()

We wrap up by adding a visualization method that generates an interactive HTML graph of our agent’s memory, allowing us to inspect the nodes and edges. Finally, we execute a test scenario involving a project timeline to verify that our system correctly links concepts, generates insights, and retrieves the right context.

In conclusion, we now have a fully functional “Living Memory” prototype that goes beyond simple database storage. By enabling our agent to actively link related concepts and reflect on its experiences during a “consolidation” phase, we address the problem of fragmented context in long-running AI interactions. This system suggests that capable agents need not only processing power but also a structured, evolving memory, paving the way for more capable, personalized autonomous agents.

The post A Coding Implementation on Building Self-Organizing Zettelkasten Knowledge Graphs and Sleep-Consolidation Mechanisms appeared first on MarkTechPost.

From Gemma 3 270M to FunctionGemma, How Google AI Built a Compact Func …

Google has released FunctionGemma, a specialized version of the Gemma 3 270M model that is trained specifically for function calling and designed to run as an edge agent that maps natural language to executable API actions.

What is FunctionGemma?

FunctionGemma is a 270M parameter text only transformer based on Gemma 3 270M. It keeps the same architecture as Gemma 3 and is released as an open model under the Gemma license, but the training objective and chat format are dedicated to function calling rather than free form dialogue.

The model is intended to be fine tuned for specific function calling tasks. It is not positioned as a general chat assistant. The primary design goal is to translate user instructions and tool definitions into structured function calls, then optionally summarize tool responses for the user.

From an interface perspective, FunctionGemma is presented as a standard causal language model. Inputs and outputs are text sequences, with a context of 32K tokens per request that is shared between input and output, so the output budget of up to 32K tokens shrinks as the input grows.

Architecture and training data

The model uses the Gemma 3 transformer architecture and the same 270M parameter scale as Gemma 3 270M. The training and runtime stack reuse the research and infrastructure used for Gemini, including JAX and ML Pathways on large TPU clusters.

FunctionGemma uses Gemma’s 256K vocabulary, which is optimized for JSON structures and multilingual text. This improves token efficiency for function schemas and tool responses and reduces sequence length for edge deployments where latency and memory are tight.

The model is trained on 6T tokens, with a knowledge cutoff in August 2024. The dataset focuses on two main categories:

public tool and API definitions

tool use interactions that include prompts, function calls, function responses and natural language follow up messages that summarize outputs or request clarification

This training signal teaches both syntax (which function to call and how to format arguments) and intent (when to call a function and when to ask for more information).

Conversation format and control tokens

FunctionGemma does not use a free form chat format. It expects a strict conversation template that separates roles and tool related regions. Conversation turns are wrapped with <start_of_turn>role … <end_of_turn> where roles are typically developer, user or model.

Within those turns, FunctionGemma relies on a fixed set of control token pairs:

<start_function_declaration> and <end_function_declaration> for tool definitions

<start_function_call> and <end_function_call> for the model’s tool calls

<start_function_response> and <end_function_response> for serialized tool outputs

These markers let the model distinguish natural language text from function schemas and from execution results. The Hugging Face apply_chat_template API and the official Gemma templates generate this structure automatically for messages and tool lists.
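On the application side, these markers make it straightforward to pull tool calls out of raw model text. The sketch below extracts everything between the <start_function_call> and <end_function_call> tokens with a regular expression. The source does not describe the payload syntax inside the markers, so the code treats each payload as an opaque string; the sample text and the name `extract_function_calls` are hypothetical.

```python
import re
from typing import List

# Non-greedy match so that consecutive calls are captured separately.
CALL_PATTERN = re.compile(
    r"<start_function_call>(.*?)<end_function_call>", re.DOTALL
)


def extract_function_calls(model_output: str) -> List[str]:
    """Return the raw payload of every function-call region, in order."""
    return [payload.strip() for payload in CALL_PATTERN.findall(model_output)]


# Hypothetical model output; the payloads are placeholders, not real call syntax.
sample_output = (
    "<start_of_turn>model\n"
    "<start_function_call>PAYLOAD_1<end_function_call>"
    "<start_function_call>PAYLOAD_2<end_function_call>"
    "<end_of_turn>"
)
calls = extract_function_calls(sample_output)
print(calls)
```

In practice the official chat template and tokenizer utilities should handle this structure, so a hand-written parser like this is mainly useful for logging and debugging raw generations.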

Fine tuning and Mobile Actions performance

Out of the box, FunctionGemma is already trained for generic tool use. However, the official Mobile Actions guide and the model card emphasize that small models reach production level reliability only after task specific fine tuning.

The Mobile Actions demo uses a dataset where each example exposes a small set of tools for Android system operations, for example creating a contact, setting a calendar event, controlling the flashlight, and viewing a map. FunctionGemma learns to map utterances such as ‘Create a calendar event for lunch tomorrow’ or ‘Turn on the flashlight’ to those tools with structured arguments.

On the Mobile Actions evaluation, the base FunctionGemma model reaches 58 percent accuracy on a held out test set. After fine tuning with the public cookbook recipe, accuracy increases to 85 percent.

Edge agents and reference demos

The main deployment target for FunctionGemma is edge agents that run locally on phones, laptops and small accelerators such as NVIDIA Jetson Nano. The small parameter count, 0.3B, and support for quantization allow inference with low memory and low latency on consumer hardware.

Google ships several reference experiences through the Google AI Edge Gallery:

Mobile Actions shows a fully offline assistant style agent for device control using FunctionGemma fine tuned on the Mobile Actions dataset and deployed on device.

Tiny Garden is a voice controlled game where the model decomposes commands such as “Plant sunflowers in the top row and water them” into domain specific functions like plant_seed and water_plots with explicit grid coordinates.

FunctionGemma Physics Playground runs entirely in the browser using Transformers.js and lets users solve physics puzzles via natural language instructions that the model converts into simulation actions.

These demos validate that a 270M parameter function caller can support multi step logic on device without server calls, given appropriate fine tuning and tool interfaces.

Key Takeaways

FunctionGemma is a 270M parameter, text only variant of Gemma 3 that is trained specifically for function calling, not for open ended chat, and is released as an open model under the Gemma terms of use.

The model keeps the Gemma 3 transformer architecture and 256k token vocabulary, supports 32k tokens per request shared between input and output, and is trained on 6T tokens.

FunctionGemma uses a strict chat template with <start_of_turn>role … <end_of_turn> and dedicated control tokens for function declarations, function calls and function responses, which is required for reliable tool use in production systems.

On the Mobile Actions benchmark, accuracy improves from 58 percent for the base model to 85 percent after task specific fine tuning, showing that small function callers need domain data more than prompt engineering.

The 270M scale and quantization support let FunctionGemma run on phones, laptops and Jetson class devices, and the model is already integrated into ecosystems such as Hugging Face, Vertex AI, LM Studio and edge demos like Mobile Actions, Tiny Garden and the Physics Playground.
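
The turn structure named in the third takeaway can be sketched as a small formatter. This only illustrates the <start_of_turn>role … <end_of_turn> delimiters mentioned above; the newline placement, the role names used in the example, and the helper name format_turns are assumptions, and the dedicated function-calling control tokens are left out because their exact spelling is not given here. In practice, the tokenizer's own chat template should be treated as the source of truth.

```python
from typing import List, Tuple

def format_turns(turns: List[Tuple[str, str]], open_role: str = "model") -> str:
    """Wrap (role, text) pairs in turn delimiters and open a turn for the reply."""
    parts = [f"<start_of_turn>{role}\n{text}<end_of_turn>\n" for role, text in turns]
    # Leave the final turn open so the model continues from here.
    parts.append(f"<start_of_turn>{open_role}\n")
    return "".join(parts)

prompt = format_turns([("user", "Turn on the flashlight")])
print(prompt)
```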

The post From Gemma 3 270M to FunctionGemma, How Google AI Built a Compact Function Calling Specialist for Edge Workloads appeared first on MarkTechPost.

A Coding Guide to Build an Autonomous Multi-Agent Logistics System wit …

In this tutorial, we build an advanced, fully autonomous logistics simulation in which multiple smart delivery trucks operate within a dynamic city-wide road network. We design the system so that each truck behaves as an agent capable of bidding on delivery orders, planning optimal routes, managing battery levels, seeking charging stations, and maximizing profit through self-interested decision-making. Through each code snippet, we explore how agentic behaviors emerge from simple rules, how competition shapes order allocation, and how a graph-based world enables realistic movement, routing, and resource constraints.

import networkx as nx
import matplotlib.pyplot as plt
import random
import time
from IPython.display import clear_output
from dataclasses import dataclass, field
from typing import List, Dict, Optional

# Global simulation parameters
NUM_NODES = 30
CONNECTION_RADIUS = 0.25
NUM_AGENTS = 5
STARTING_BALANCE = 1000
FUEL_PRICE = 2.0
PAYOUT_MULTIPLIER = 5.0
BATTERY_CAPACITY = 100
CRITICAL_BATTERY = 25

@dataclass
class Order:
    id: str
    target_node: int
    weight_kg: int
    payout: float
    status: str = "pending"  # becomes "assigned", then "completed"

class AgenticTruck:
    def __init__(self, agent_id, start_node, graph, capacity=100):
        self.id = agent_id
        self.current_node = start_node
        self.graph = graph
        self.battery = BATTERY_CAPACITY
        self.balance = STARTING_BALANCE
        self.capacity = capacity
        # One of "IDLE", "MOVING", "TO_CHARGER", "CHARGING"
        self.state = "IDLE"
        self.path: List[int] = []
        self.current_order: Optional[Order] = None
        self.target_node: int = start_node

We set up the core building blocks of the simulation: imports, global parameters, and the basic data structures. We also define the AgenticTruck class and initialize its key attributes, including position, battery, balance, and operating state. This lays the foundation on which all later agent behaviors build.

    def get_path_cost(self, start, end):
        # Return (weighted length, node path), or (inf, []) if no route exists.
        try:
            length = nx.shortest_path_length(self.graph, start, end, weight='weight')
            path = nx.shortest_path(self.graph, start, end, weight='weight')
            return length, path
        except nx.NetworkXNoPath:
            return float('inf'), []

    def find_nearest_charger(self):
        # Scan every charger node and keep the one with the shortest route.
        chargers = [n for n, attr in self.graph.nodes(data=True) if attr.get('type') == 'charger']
        best_charger = None
        min_dist = float('inf')
        best_path = []
        for charger in chargers:
            dist, path = self.get_path_cost(self.current_node, charger)
            if dist < min_dist:
                min_dist = dist
                best_charger = charger
                best_path = path
        return best_charger, best_path

    def calculate_bid(self, order):
        # A bid of inf means the truck declines the order.
        if order.weight_kg > self.capacity:
            return float('inf')
        if self.state != "IDLE" or self.battery < CRITICAL_BATTERY:
            return float('inf')
        dist_to_target, _ = self.get_path_cost(self.current_node, order.target_node)
        fuel_cost = dist_to_target * FUEL_PRICE
        expected_profit = order.payout - fuel_cost
        if expected_profit < 10:
            return float('inf')
        # The bid is the travel distance, so the closest eligible truck wins.
        return dist_to_target

    def assign_order(self, order):
        self.current_order = order
        self.state = "MOVING"
        self.target_node = order.target_node
        _, self.path = self.get_path_cost(self.current_node, self.target_node)
        if self.path:
            self.path.pop(0)  # drop the current node from the route

    def go_charge(self):
        charger_node, path = self.find_nearest_charger()
        if charger_node is not None:
            self.state = "TO_CHARGER"
            self.target_node = charger_node
            self.path = path
            if self.path:
                self.path.pop(0)  # drop the current node from the route

We implement the decision-making logic for the trucks. We calculate shortest paths, find the nearest charging station, and evaluate whether an order is feasible and profitable before bidding on it. We also prepare the truck to accept assignments or proactively seek charging when needed.
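
As a worked example of the bidding rule, the sketch below restates the same checks as a standalone function so the arithmetic can be followed without a graph. The function name bid_for and its parameters are illustrative; the constants and the profit threshold of 10 mirror the values in the tutorial code, and the distance is passed in directly rather than computed with NetworkX.

```python
import math

FUEL_PRICE = 2.0          # same value as the tutorial's constant
CRITICAL_BATTERY = 25     # same value as the tutorial's constant
MIN_PROFIT = 10           # the threshold hard-coded in calculate_bid

def bid_for(payout, weight_kg, distance, capacity=100, battery=100, idle=True):
    """Return the bid (the travel distance) or math.inf to decline the order."""
    if weight_kg > capacity:
        return math.inf                      # too heavy for this truck
    if not idle or battery < CRITICAL_BATTERY:
        return math.inf                      # busy or low on charge
    expected_profit = payout - distance * FUEL_PRICE
    if expected_profit < MIN_PROFIT:
        return math.inf                      # not worth the fuel
    return distance

# payout 60, distance 12.5: fuel cost 25.0, profit 35.0, so the truck bids 12.5
print(bid_for(payout=60, weight_kg=40, distance=12.5))
# payout 60, distance 26.0: fuel cost 52.0, profit 8.0, so the truck declines
print(bid_for(payout=60, weight_kg=40, distance=26.0))
```

Because the bid is the distance rather than the expected profit, the auction favors the closest eligible truck, not the one that would earn the most from the order.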

    def step(self):
        # An idle truck with low battery heads for the nearest charger.
        if self.state == "IDLE" and self.battery < CRITICAL_BATTERY:
            self.go_charge()

        # Charging takes the whole tick: gain 10 battery, pay 5.
        if self.state == "CHARGING":
            self.battery += 10
            self.balance -= 5
            if self.battery >= 100:
                self.battery = 100
                self.state = "IDLE"
            return

        # Advance one edge along the planned route.
        if self.path:
            next_node = self.path[0]
            edge_data = self.graph.get_edge_data(self.current_node, next_node)
            distance = edge_data['weight']
            self.current_node = next_node
            self.path.pop(0)
            self.battery -= (distance * 2)
            self.balance -= (distance * FUEL_PRICE)

        # Checked on every tick, so a truck whose route is already empty
        # (for example, it starts on its target node) still finishes
        # instead of staying in MOVING or TO_CHARGER forever.
        if not self.path:
            if self.state == "MOVING":
                self.balance += self.current_order.payout
                self.current_order.status = "completed"
                self.current_order = None
                self.state = "IDLE"
            elif self.state == "TO_CHARGER":
                self.state = "CHARGING"

We manage the step-by-step actions of each truck as the simulation runs. We handle battery recharging, the fuel cost and battery drain of movement, and order completion. We ensure that agents transition cleanly between the idle, moving, and charging states.
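
The per-tick bookkeeping can be checked by hand. The sketch below restates the movement and charging rules from step as two small pure functions. The names move_cost and ticks_to_full are illustrative; the rates (2 battery units per unit of distance, 10 battery units gained and 5 currency units paid per charging tick) are taken from the tutorial code, and ticks_to_full assumes the truck arrives below full charge.

```python
import math

FUEL_PRICE = 2.0
BATTERY_CAPACITY = 100
BATTERY_PER_DISTANCE = 2   # battery drained per unit of edge weight
CHARGE_PER_TICK = 10       # battery gained per charging tick
CHARGE_FEE_PER_TICK = 5    # balance deducted per charging tick

def move_cost(distance):
    """Return (battery drained, money spent) for traversing one edge."""
    return distance * BATTERY_PER_DISTANCE, distance * FUEL_PRICE

def ticks_to_full(battery):
    """Return (charging ticks, total fee) needed to reach full capacity."""
    missing = max(0, BATTERY_CAPACITY - battery)
    ticks = math.ceil(missing / CHARGE_PER_TICK)
    return ticks, ticks * CHARGE_FEE_PER_TICK

print(move_cost(2.5))       # edge of weight 2.5: 5.0 battery, 5.0 currency
print(ticks_to_full(22))    # 78 units missing: 8 ticks, fee of 40
```

Since a truck only looks for a charger once its battery drops below 25, a full recharge costs roughly 8 ticks and 40 currency units, which is time the truck cannot spend bidding.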

class Simulation:
    def __init__(self):
        self.setup_graph()
        self.setup_agents()
        self.orders = []
        self.order_count = 0

    def setup_graph(self):
        # Nodes get random 2D positions; nodes within CONNECTION_RADIUS are linked.
        self.G = nx.random_geometric_graph(NUM_NODES, CONNECTION_RADIUS)
        for (u, v) in self.G.edges():
            self.G.edges[u, v]['weight'] = random.uniform(1.0, 3.0)
        # Roughly 15% of nodes become chargers; the rest are houses.
        for i in self.G.nodes():
            r = random.random()
            if r < 0.15:
                self.G.nodes[i]['type'] = 'charger'
                self.G.nodes[i]['color'] = 'red'
            else:
                self.G.nodes[i]['type'] = 'house'
                self.G.nodes[i]['color'] = '#A0CBE2'

    def setup_agents(self):
        self.agents = []
        for i in range(NUM_AGENTS):
            start_node = random.randint(0, NUM_NODES-1)
            cap = random.choice([50, 100, 200])
            self.agents.append(AgenticTruck(i, start_node, self.G, capacity=cap))

    def generate_order(self):
        target = random.randint(0, NUM_NODES-1)
        weight = random.randint(10, 120)
        payout = random.randint(50, 200)
        order = Order(id=f"ORD-{self.order_count}", target_node=target, weight_kg=weight, payout=payout)
        self.orders.append(order)
        self.order_count += 1
        return order

    def run_market(self):
        # Each pending order goes to the truck with the lowest finite bid.
        for order in self.orders:
            if order.status == "pending":
                bids = {agent: agent.calculate_bid(order) for agent in self.agents}
                valid_bids = {k: v for k, v in bids.items() if v != float('inf')}
                if valid_bids:
                    winner = min(valid_bids, key=valid_bids.get)
                    winner.assign_order(order)
                    order.status = "assigned"

We create the simulated world and orchestrate agent interactions. We generate the graph-based city, spawn trucks with varying capacities, and produce new delivery orders. We also implement a simple market in which each pending order goes to the eligible truck with the lowest bid, where the bid is the travel distance to the target and infeasible or unprofitable orders are declined.
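
The winner selection in run_market can be isolated into a few lines. The sketch below is a standalone restatement under simplifying assumptions: bidders are identified by plain strings rather than truck objects, and the helper name pick_winner is hypothetical.

```python
import math
from typing import Dict, Optional

def pick_winner(bids: Dict[str, float]) -> Optional[str]:
    """Return the bidder with the lowest finite bid, or None if all declined."""
    valid = {name: bid for name, bid in bids.items() if bid != math.inf}
    if not valid:
        return None
    # On ties, min keeps the bidder that appears first in the dict.
    return min(valid, key=valid.get)

# Hypothetical bids (travel distances) from three trucks for one order.
print(pick_winner({"A0": 7.2, "A1": math.inf, "A2": 4.9}))
print(pick_winner({"A0": math.inf, "A1": math.inf}))
```

In the tutorial's market, a truck that wins an order switches to the MOVING state immediately, so it declines every later order in the same pass and each truck carries at most one order at a time. Orders that nobody bids on stay pending and are offered again on the next step.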

    def step(self):
        # New orders arrive with probability 0.3 per tick.
        if random.random() < 0.3:
            self.generate_order()
        self.run_market()
        for agent in self.agents:
            agent.step()

    def visualize(self, step_num):
        clear_output(wait=True)
        plt.figure(figsize=(10, 8))
        # random_geometric_graph stores each node's coordinates under 'pos'.
        pos = nx.get_node_attributes(self.G, 'pos')
        node_colors = [self.G.nodes[n]['color'] for n in self.G.nodes()]
        nx.draw(self.G, pos, node_color=node_colors, with_labels=True, node_size=300, edge_color='gray', alpha=0.6)

        # Draw each truck with a small jitter so co-located trucks stay visible.
        for agent in self.agents:
            x, y = pos[agent.current_node]
            jitter_x = x + random.uniform(-0.02, 0.02)
            jitter_y = y + random.uniform(-0.02, 0.02)
            color = 'green' if agent.state == "IDLE" else ('orange' if agent.state == "MOVING" else 'red')
            plt.plot(jitter_x, jitter_y, marker='s', markersize=12, color=color, markeredgecolor='black')
            plt.text(jitter_x, jitter_y+0.03, f"A{agent.id}\n${int(agent.balance)}\n{int(agent.battery)}%",
                     fontsize=8, ha='center', fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=1))

        # Mark the targets of all open orders.
        for order in self.orders:
            if order.status in ["assigned", "pending"]:
                ox, oy = pos[order.target_node]
                plt.plot(ox, oy, marker='*', markersize=15, color='gold', markeredgecolor='black')

        plt.title(f"Graph-Based Logistics Swarm | Step: {step_num}\nRed Nodes = Chargers | Gold Stars = Orders", fontsize=14)
        plt.show()

print("Initializing Advanced Simulation...")
sim = Simulation()

for t in range(60):
    sim.step()
    sim.visualize(t)
    time.sleep(0.5)

print("Simulation Finished.")

We step through the full simulation loop and visualize the logistics swarm in real time. We update agent states, draw the network, display active orders, and animate each truck’s movement. By running this loop, we observe the emergent coordination and competition that define our multi-agent logistics ecosystem.
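
To put numbers on what the animation shows, one option is a small summary helper called once per step. The sketch below is an assumption-laden illustration rather than part of the tutorial: summarize is a hypothetical helper, and the stand-in agents only mimic the state, balance, and battery attributes of AgenticTruck.

```python
from collections import Counter
from types import SimpleNamespace

def summarize(agents):
    """Return aggregate stats for objects exposing state, balance and battery."""
    return {
        "states": dict(Counter(a.state for a in agents)),
        "total_balance": sum(a.balance for a in agents),
        "min_battery": min(a.battery for a in agents),
    }

# Stand-in agents for illustration; in the tutorial this would be sim.agents.
fleet = [
    SimpleNamespace(state="IDLE", balance=1040.0, battery=80),
    SimpleNamespace(state="MOVING", balance=990.5, battery=35),
    SimpleNamespace(state="CHARGING", balance=975.0, battery=60),
]
print(summarize(fleet))
```

Logging this dictionary at each step gives a simple time series of fleet profit and state occupancy that can be compared across parameter settings such as FUEL_PRICE or CRITICAL_BATTERY.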

In conclusion, we saw how the individual components (graph generation, autonomous routing, battery management, auctions, and visualization) come together to form a living, evolving system of agentic trucks. We watched agents bid for workloads, compete for profitable orders, and respond to environmental pressures such as distance, fuel costs, and charging needs. Running the simulation reveals emergent dynamics that resemble real-world fleet behavior and provides a sandbox for experimenting with logistics intelligence.

The post A Coding Guide to Build an Autonomous Multi-Agent Logistics System with Route Planning, Dynamic Auctions, and Real-Time Visualization Using Graph-Based Simulation appeared first on MarkTechPost.