How to Build an Advanced AI Agent with Summarized Short-Term and Vector-Based Long-Term Memory

In this tutorial, we walk you through building an advanced AI agent that not only chats but also remembers. We start from scratch and demonstrate how to combine a lightweight LLM, FAISS vector search, and a summarization mechanism to create both short-term and long-term memory. By pairing embeddings with auto-distilled facts, we craft an agent that adapts to our instructions, recalls important details in future conversations, and intelligently compresses context so the interaction remains smooth and efficient.

!pip -q install transformers accelerate bitsandbytes sentence-transformers faiss-cpu

import os, json, time, uuid, math, re
from datetime import datetime
import torch, faiss
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline, BitsAndBytesConfig
from sentence_transformers import SentenceTransformer
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

We begin by installing the essential libraries and importing all the required modules for our agent. We set up the environment to determine whether we are using a GPU or a CPU, allowing us to run the model efficiently.

def load_llm(model_name="TinyLlama/TinyLlama-1.1B-Chat-v1.0"):
    try:
        if DEVICE == "cuda":
            bnb = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_quant_type="nf4")
            tok = AutoTokenizer.from_pretrained(model_name, use_fast=True)
            mdl = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb, device_map="auto")
            # device_map="auto" already places the quantized model, so no device argument is passed to the pipeline
            return pipeline("text-generation", model=mdl, tokenizer=tok, do_sample=True)
        else:
            tok = AutoTokenizer.from_pretrained(model_name, use_fast=True)
            mdl = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, low_cpu_mem_usage=True)
            return pipeline("text-generation", model=mdl, tokenizer=tok, device=-1, do_sample=True)
    except Exception as e:
        raise RuntimeError(f"Failed to load LLM: {e}")

We define a function to load our language model. We set it up so that if a GPU is available, we use 4-bit quantization for efficiency; otherwise, we fall back to the CPU with optimized settings. This ensures we can generate text smoothly regardless of the hardware we are running on.
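
Before wiring this into the agent, it helps to smoke-test the pipeline on its own. A minimal check, assuming the cells above have already run (the prompt is only illustrative):

# Quick sanity check of the generation pipeline
llm = load_llm()
out = llm("Q: What is 2 + 2?\nA:", max_new_tokens=16)[0]["generated_text"]
print(out)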

class VectorMemory:
    def __init__(self, path="/content/agent_memory.json", dim=384):
        self.path = path; self.dim = dim; self.items = []
        self.embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2", device=DEVICE)
        self.index = faiss.IndexFlatIP(dim)
        if os.path.exists(path):
            data = json.load(open(path))
            self.items = data.get("items", [])
            if self.items:
                X = torch.tensor([x["emb"] for x in self.items], dtype=torch.float32).numpy()
                self.index.add(X)
    def _emb(self, text):
        v = self.embedder.encode([text], normalize_embeddings=True)[0]
        return v.tolist()
    def add(self, text, meta=None):
        e = self._emb(text); self.index.add(torch.tensor([e]).numpy())
        rec = {"id": str(uuid.uuid4()), "text": text, "meta": meta or {}, "emb": e}
        self.items.append(rec); self._save(); return rec["id"]
    def search(self, query, k=5, thresh=0.25):
        if len(self.items) == 0: return []
        q = self.embedder.encode([query], normalize_embeddings=True)
        D, I = self.index.search(q, min(k, len(self.items)))
        out = []
        for d, i in zip(D[0], I[0]):
            if i == -1: continue
            if d >= thresh: out.append((d, self.items[i]))
        return out
    def _save(self):
        slim = [{k: v for k, v in it.items()} for it in self.items]
        json.dump({"items": slim}, open(self.path, "w"), indent=2)

We create a VectorMemory class that gives our agent long-term memory. We store past interactions as embeddings using MiniLM and index them with FAISS, allowing us to search and recall relevant information later. Each memory is saved to disk, enabling the agent to retain its memory across sessions.
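
To see the memory store working in isolation, here is a short, optional usage sketch; the file path and the stored sentence are made up for illustration:

# Standalone check of the vector store: add one memory, then retrieve it
mem = VectorMemory(path="/content/demo_memory.json")
mem.add("User prefers concise, bullet-point answers.", {"source": "demo"})
for score, item in mem.search("How should answers be formatted?", k=3):
    print(f"{score:.2f}  {item['text']}")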

def now_iso(): return datetime.now().isoformat(timespec="seconds")
def clamp(txt, n=1600): return txt if len(txt) <= n else txt[:n] + " ..."
def strip_json(s):
    m = re.search(r"{.*}", s, flags=re.S)
    return m.group(0) if m else None

SYS_GUIDE = (
    "You are a helpful, concise assistant with memory. Use provided MEMORY when relevant. "
    "Prefer facts from MEMORY over guesses. Answer directly; keep code blocks tight. If unsure, say so."
)

SUMMARIZE_PROMPT = lambda convo: f"Summarize the conversation below in 4-6 bullet points focusing on stable facts and tasks:\n\n{convo}\n\nSummary:"
DISTILL_PROMPT = lambda user: (
    f"""Decide if the USER text contains durable info worth long-term memory (preferences, identity, projects, deadlines, facts).
Return compact JSON only: {{"save": true/false, "memory": "one-sentence memory"}}.
USER: {user}""")

class MemoryAgent:
    def __init__(self):
        self.llm = load_llm()
        self.mem = VectorMemory()
        self.turns = []
        self.summary = ""
        self.max_turns = 10
    def _gen(self, prompt, max_new_tokens=256, temp=0.7):
        out = self.llm(prompt, max_new_tokens=max_new_tokens, temperature=temp, top_p=0.95, num_return_sequences=1, pad_token_id=self.llm.tokenizer.eos_token_id)[0]["generated_text"]
        return out[len(prompt):].strip() if out.startswith(prompt) else out.strip()
    def _chat_prompt(self, user, memory_context):
        convo = "\n".join([f"{r.upper()}: {t}" for r, t in self.turns[-8:]])
        sys = f"System: {SYS_GUIDE}\nTime: {now_iso()}\n\n"
        mem = f"MEMORY (relevant excerpts):\n{memory_context}\n\n" if memory_context else ""
        summ = f"CONTEXT SUMMARY:\n{self.summary}\n\n" if self.summary else ""
        return sys + mem + summ + convo + f"\nUSER: {user}\nASSISTANT:"
    def _distill_and_store(self, user):
        try:
            raw = self._gen(DISTILL_PROMPT(user), max_new_tokens=120, temp=0.1)
            js = strip_json(raw)
            if js:
                obj = json.loads(js)
                if obj.get("save") and obj.get("memory"):
                    self.mem.add(obj["memory"], {"ts": now_iso(), "source": "distilled"})
                    return True, obj["memory"]
        except Exception:
            pass
        if re.search(r"\b(my name is|call me|I like|deadline|due|email|phone|working on|prefer|timezone|birthday|goal|exam)\b", user, flags=re.I):
            m = f"User said: {clamp(user, 120)}"
            self.mem.add(m, {"ts": now_iso(), "source": "heuristic"})
            return True, m
        return False, ""
    def _maybe_summarize(self):
        if len(self.turns) > self.max_turns:
            convo = "\n".join([f"{r}: {t}" for r, t in self.turns])
            s = self._gen(SUMMARIZE_PROMPT(clamp(convo, 3500)), max_new_tokens=180, temp=0.2)
            self.summary = s; self.turns = self.turns[-4:]
    def recall(self, query, k=5):
        hits = self.mem.search(query, k=k)
        return "\n".join([f"- ({d:.2f}) {h['text']} [meta={h['meta']}]" for d, h in hits])
    def ask(self, user):
        self.turns.append(("user", user))
        saved, memline = self._distill_and_store(user)
        mem_ctx = self.recall(user, k=6)
        prompt = self._chat_prompt(user, mem_ctx)
        reply = self._gen(prompt)
        self.turns.append(("assistant", reply))
        self._maybe_summarize()
        status = f"memory_saved: {saved}; " + (f"note: {memline}" if saved else "note: -")
        print(f"\nUSER: {user}\nASSISTANT: {reply}\n{status}")
        return reply

We bring everything together into the MemoryAgent class. We design the agent to generate responses with context, distill important facts into long-term memory, and periodically summarize conversations to manage short-term context. With this setup, we create an assistant that remembers, recalls, and adapts to our interactions with it.

agent = MemoryAgent()

print("Agent ready. Try these:\n")
agent.ask("Hi! My name is Nicolaus, I prefer being called Nik. I'm preparing for UPSC in 2027.")
agent.ask("Also, I work at Visa in analytics and love concise answers.")
agent.ask("What's my exam year and how should you address me next time?")
agent.ask("Reminder: I like agentic RAG tutorials with single-file Colab code.")
agent.ask("Given my prefs, suggest a study focus for this week in one paragraph.")

We instantiate our MemoryAgent and immediately exercise it with a few messages to seed long-term memories and verify recall. We confirm it remembers our preferred name and exam year, adapts replies to our concise style, and uses past preferences (agentic RAG, single-file Colab) to tailor study guidance in the present.
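
If we want to inspect what actually landed in long-term memory after these exchanges, we can query the store directly with the recall helper defined above; the query wording is just an example:

# Peek at the distilled long-term memories
print(agent.recall("What do we know about the user?", k=5))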

In conclusion, we see how powerful it is when we give our AI Agent the ability to remember. We now have an agent that stores key details, recalls them when relevant, and summarizes conversations to stay efficient. This approach keeps our interactions contextual and evolving, making the agent feel more personal and intelligent with each exchange. With this foundation, we are ready to extend memory further, explore richer schemas, and experiment with more advanced memory-augmented agent designs.


Train and deploy models on Amazon SageMaker HyperPod using the new Hyp …

Training and deploying large AI models requires advanced distributed computing capabilities, but managing these distributed systems shouldn’t be complex for data scientists and machine learning (ML) practitioners. The newly released command line interface (CLI) and software development kit (SDK) for Amazon SageMaker HyperPod simplify how you can use the service’s distributed training and inference capabilities.
The SageMaker HyperPod CLI provides data scientists with an intuitive command-line experience, abstracting away the underlying complexity of distributed systems. Built on top of the SageMaker HyperPod SDK, the CLI offers straightforward commands for common workflows like launching training or fine-tuning jobs, deploying inference endpoints, and monitoring cluster performance. This makes it ideal for quick experimentation and iteration.
For more advanced use cases requiring fine-grained control, the SageMaker HyperPod SDK enables programmatic access to customize your ML workflows. Developers can use the SDK’s Python interface to precisely configure training and deployment parameters while maintaining the simplicity of working with familiar Python objects.
In this post, we demonstrate how to use both the CLI and SDK to train and deploy large language models (LLMs) on SageMaker HyperPod. We walk through practical examples of distributed training using Fully Sharded Data Parallel (FSDP) and model deployment for inference, showcasing how these tools streamline the development of production-ready generative AI applications.
Prerequisites
To follow the examples in this post, you must have the following prerequisites:

An AWS account with access to SageMaker HyperPod, Amazon Simple Storage Service (Amazon S3) and Amazon FSx for Lustre.
A local environment (either your local machine or a cloud-based compute environment) from which to run the SageMaker HyperPod CLI commands, configured as follows:

Operating system based on Linux or MacOS.
Python 3.8, 3.9, 3.10 or 3.11 installed.
The AWS Command Line Interface (AWS CLI) configured with the appropriate credentials to use the aforementioned services.

A SageMaker HyperPod cluster orchestrated through Amazon Elastic Kubernetes Service (Amazon EKS) running with an instance group configured with 8 ml.g5.8xlarge instances. For more information on how to create and configure a new SageMaker HyperPod cluster, refer to Creating a SageMaker HyperPod cluster with Amazon EKS orchestration.
An FSx for Lustre persistent volume claim (PVC) to store checkpoints. This can be created either at cluster creation time or separately.

Because the use cases that we demonstrate are about training and deploying LLMs with the SageMaker HyperPod CLI and SDK, you must also install the following Kubernetes operators in the cluster:

HyperPod training operator – For installation instructions, see Installing the training operator.
HyperPod inference operator – For installation instructions, see Setting up your HyperPod clusters for model deployment and the corresponding notebook.

Install the SageMaker HyperPod CLI
First, you must install the latest version of the SageMaker HyperPod CLI and SDK (the examples in this post are based on version 3.1.0). From the local environment, run the following command (you can also install in a Python virtual environment):

# Install the HyperPod CLI and SDK
pip install sagemaker-hyperpod

This command sets up the tools needed to interact with SageMaker HyperPod clusters. For an existing installation, make sure you have the latest version of the package installed (sagemaker-hyperpod>=3.1.0) to be able to use the relevant set of features. To verify if the CLI is installed correctly, you can run the hyp command and check the outputs:

# Check if the HyperPod CLI is correctly installed
hyp

The output will be similar to the following, and includes instructions on how to use the CLI:

Usage: hyp [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  create               Create endpoints or pytorch jobs.
  delete               Delete endpoints or pytorch jobs.
  describe             Describe endpoints or pytorch jobs.
  get-cluster-context  Get context related to the current set cluster.
  get-logs             Get pod logs for endpoints or pytorch jobs.
  get-monitoring       Get monitoring configurations for Hyperpod cluster.
  get-operator-logs    Get operator logs for endpoints.
  invoke               Invoke model endpoints.
  list                 List endpoints or pytorch jobs.
  list-cluster         List SageMaker Hyperpod Clusters with metadata.
  list-pods            List pods for endpoints or pytorch jobs.
  set-cluster-context  Connect to a HyperPod EKS cluster.

For more information on CLI usage and the available commands and respective parameters, refer to the CLI reference documentation.
Set the cluster context
The SageMaker HyperPod CLI and SDK use the Kubernetes API to interact with the cluster. Therefore, make sure the underlying Kubernetes Python client is configured to execute API calls against your cluster by setting the cluster context.
Use the CLI to list the clusters available in your AWS account:

# List all HyperPod clusters in your AWS account
hyp list-cluster
[
    {
        "Cluster": "ml-cluster",
        "Instances": [
            {
                "InstanceType": "ml.g5.8xlarge",
                "TotalNodes": 8,
                "AcceleratorDevicesAvailable": 8,
                "NodeHealthStatus=Schedulable": 8,
                "DeepHealthCheckStatus=Passed": "N/A"
            },
            {
                "InstanceType": "ml.m5.12xlarge",
                "TotalNodes": 1,
                "AcceleratorDevicesAvailable": "N/A",
                "NodeHealthStatus=Schedulable": 1,
                "DeepHealthCheckStatus=Passed": "N/A"
            }
        ]
    }
]

Set the cluster context specifying the cluster name as input (in our case, we use ml-cluster as <cluster_name>):

# Set the cluster context for subsequent commands
hyp set-cluster-context --cluster-name <cluster_name>

Train models with the SageMaker HyperPod CLI and SDK
The SageMaker HyperPod CLI provides a straightforward way to submit PyTorch model training and fine-tuning jobs to a SageMaker HyperPod cluster. In the following example, we schedule a Meta Llama 3.1 8B model training job with FSDP.
The CLI executes training using the HyperPodPyTorchJob Kubernetes custom resource, which is implemented by the HyperPod training operator that must be installed in the cluster, as discussed in the prerequisites section.
First, clone the awsome-distributed-training repository and create the Docker image that you will use for the training job:

cd ~
git clone https://github.com/aws-samples/awsome-distributed-training/
cd awsome-distributed-training/3.test_cases/pytorch/FSDP

Then, log in to the Amazon Elastic Container Registry (Amazon ECR) to pull the base image and build the new container:

export AWS_REGION=$(aws ec2 describe-availability-zones --output text --query 'AvailabilityZones[0].[RegionName]')
export ACCOUNT=$(aws sts get-caller-identity --query Account --output text)
export REGISTRY=${ACCOUNT}.dkr.ecr.${AWS_REGION}.amazonaws.com/
docker build -f Dockerfile -t ${REGISTRY}fsdp:pytorch2.7.1 .

The Dockerfile in the awsome-distributed-training repository referenced in the preceding code already contains the HyperPod elastic agent, which orchestrates lifecycles of training workers on each container and communicates with the HyperPod training operator. If you’re using a different Dockerfile, install the HyperPod elastic agent following the instructions in HyperPod elastic agent.
Next, create a new registry for your training image if needed and push the built image to it:

# Create registry if needed
REGISTRY_COUNT=$(aws ecr describe-repositories | grep "fsdp" | wc -l)
if [ "$REGISTRY_COUNT" -eq 0 ]; then
    aws ecr create-repository --repository-name fsdp
fi

# Login to registry
echo "Logging in to $REGISTRY ..."
aws ecr get-login-password | docker login --username AWS --password-stdin $REGISTRY

# Push image to registry
docker image push ${REGISTRY}fsdp:pytorch2.7.1

After you have successfully created the Docker image, you can submit the training job using the SageMaker HyperPod CLI.
Internally, the SageMaker HyperPod CLI will use the Kubernetes Python client to build a HyperPodPyTorchJob custom resource and then create it on the Kubernetes cluster.
You can modify the CLI command for other Meta Llama configurations by exchanging the --args entries for the desired arguments and values; examples can be found in the Kubernetes manifests in the awsome-distributed-training repository.
In the given configuration, the training job will write checkpoints to /fsx/checkpoints on the FSx for Lustre PVC.

hyp create hyp-pytorch-job \
    --job-name fsdp-llama3-1-8b \
    --image ${REGISTRY}fsdp:pytorch2.7.1 \
    --command '[
        hyperpodrun,
        --tee=3,
        --log_dir=/tmp/hyperpod,
        --nproc_per_node=1,
        --nnodes=8,
        /fsdp/train.py
    ]' \
    --args '[
        --max_context_width=8192,
        --num_key_value_heads=8,
        --intermediate_size=14336,
        --hidden_width=4096,
        --num_layers=32,
        --num_heads=32,
        --model_type=llama_v3,
        --tokenizer=hf-internal-testing/llama-tokenizer,
        --checkpoint_freq=50,
        --validation_freq=25,
        --max_steps=50,
        --checkpoint_dir=/fsx/checkpoints,
        --dataset=allenai/c4,
        --dataset_config_name=en,
        --resume_from_checkpoint=/fsx/checkpoints,
        --train_batch_size=1,
        --val_batch_size=1,
        --sharding_strategy=full,
        --offload_activations=1
    ]' \
    --environment '{"PYTORCH_CUDA_ALLOC_CONF": "max_split_size_mb:32"}' \
    --pull-policy "IfNotPresent" \
    --instance-type ml.g5.8xlarge \
    --node-count 8 \
    --tasks-per-node 1 \
    --deep-health-check-passed-nodes-only false \
    --max-retry 3 \
    --volume name=shmem,type=hostPath,mount_path=/dev/shm,path=/dev/shm,read_only=false \
    --volume name=fsx,type=pvc,mount_path=/fsx,claim_name=fsx-claim,read_only=false

The hyp create hyp-pytorch-job command supports additional arguments, which can be discovered by running the following:

hyp create hyp-pytorch-job --help

The preceding example code contains the following relevant arguments:

--command and --args offer flexibility in setting the command to be executed in the container. The command executed is hyperpodrun, implemented by the HyperPod elastic agent that is installed in the training container. The HyperPod elastic agent extends PyTorch's ElasticAgent and manages the communication of the various workers with the HyperPod training operator. For more information, refer to HyperPod elastic agent.
--environment defines environment variables and customizes the training execution.
--max-retry indicates the maximum number of restarts at the process level that will be attempted by the HyperPod training operator. For more information, refer to Using the training operator to run jobs.
--volume is used to map persistent or ephemeral volumes to the container.

If successful, the command will output the following:

Using version: 1.0
2025-08-12 10:03:03,270 - sagemaker.hyperpod.training.hyperpod_pytorch_job - INFO - Successfully submitted HyperPodPytorchJob 'fsdp-llama3-1-8b'!

You can observe the status of the training job through the CLI. Running hyp list hyp-pytorch-job will show the status first as Created and then as Running after the containers have been started:

NAME                          NAMESPACE           STATUS         AGE            
——————————————————————————–
fsdp-llama3-1-8b              default             Running        6m        

To list the pods that are created by this training job, run the following command:

hyp list-pods hyp-pytorch-job –job-name fsdp-llama3-1-8b
Pods for job: fsdp-llama3-1-8b

POD NAME                                          NAMESPACE          
———————————————————————-
fsdp-llama3-1-8b-pod-0                            default            
fsdp-llama3-1-8b-pod-1                            default             
fsdp-llama3-1-8b-pod-2                            default         
fsdp-llama3-1-8b-pod-3                            default         
fsdp-llama3-1-8b-pod-4                            default         
fsdp-llama3-1-8b-pod-5                            default         
fsdp-llama3-1-8b-pod-6                            default        
fsdp-llama3-1-8b-pod-7                            default          

You can observe the logs of one of the training pods that get spawned by running the following command:

hyp get-logs hyp-pytorch-job --pod-name fsdp-llama3-1-8b \
    --job-name fsdp-llama3-1-8b

2025-08-12T14:59:25.069208138Z [HyperPodElasticAgent] 2025-08-12 14:59:25,069 [INFO] [rank0-restart0] /usr/local/lib/python3.10/dist-packages/torch/distributed/elastic/agent/server/api.py:685: [default] Starting worker group
2025-08-12T14:59:25.069301320Z [HyperPodElasticAgent] 2025-08-12 14:59:25,069 [INFO] [rank0-restart0] /usr/local/lib/python3.10/dist-packages/hyperpod_elastic_agent/hyperpod_elastic_agent.py:221: Starting workers with worker spec worker_group.spec=WorkerSpec(role='default', local_world_size=1, rdzv_handler=<hyperpod_elastic_agent.rendezvous.hyperpod_rendezvous_backend.HyperPodRendezvousBackend object at 0x7f0970a4dc30>, fn=None, entrypoint='/usr/bin/python3', args=('-u', '/fsdp/train.py', '--max_context_width=8192', '--num_key_value_heads=8', '--intermediate_size=14336', '--hidden_width=4096', '--num_layers=32', '--num_heads=32', '--model_type=llama_v3', '--tokenizer=hf-internal-testing/llama-tokenizer', '--checkpoint_freq=50', '--validation_freq=50', '--max_steps=100', '--checkpoint_dir=/fsx/checkpoints', '--dataset=allenai/c4', '--dataset_config_name=en', '--resume_from_checkpoint=/fsx/checkpoints', '--train_batch_size=1', '--val_batch_size=1', '--sharding_strategy=full', '--offload_activations=1'), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)...
2025-08-12T14:59:30.264195963Z [default0]:2025-08-12 14:59:29,968 [INFO] __main__: Creating Model
2025-08-12T15:00:51.203541576Z [default0]:2025-08-12 15:00:50,781 [INFO] __main__: Created model with total parameters: 7392727040 (7.39 B)
2025-08-12T15:01:18.139531830Z [default0]:2025-08-12 15:01:18 I [checkpoint.py:79] Loading checkpoint from /fsx/checkpoints/llama_v3-24steps ...
2025-08-12T15:01:18.833252603Z [default0]:2025-08-12 15:01:18,081 [INFO] __main__: Wrapped model with FSDP
2025-08-12T15:01:18.833290793Z [default0]:2025-08-12 15:01:18,093 [INFO] __main__: Created optimizer

We elaborate on more advanced debugging and observability features at the end of this section.
Alternatively, if you prefer a programmatic experience and more advanced customization options, you can submit the training job using the SageMaker HyperPod Python SDK. For more information, refer to the SDK reference documentation. The following code will yield the equivalent training job submission to the preceding CLI example:

import os
from sagemaker.hyperpod.training import HyperPodPytorchJob
from sagemaker.hyperpod.training import ReplicaSpec, Template, VolumeMounts, Spec, Containers, Resources, RunPolicy, Volumes, HostPath, PersistentVolumeClaim
from sagemaker.hyperpod.common.config import Metadata

REGISTRY = os.environ['REGISTRY']

# Define job specifications
nproc_per_node = "1"  # Number of processes per node
replica_specs = [
    ReplicaSpec(
        name="pod",  # Replica name
        replicas=8,
        template=Template(
            spec=Spec(
                containers=[
                    Containers(
                        # Container name
                        name="fsdp-training-container",

                        # Training image
                        image=f"{REGISTRY}fsdp:pytorch2.7.1",

                        # Volume mounts
                        volume_mounts=[
                            VolumeMounts(
                                name="fsx",
                                mount_path="/fsx"
                            ),
                            VolumeMounts(
                                name="shmem",
                                mount_path="/dev/shm"
                            )
                        ],
                        env=[
                            {"name": "PYTORCH_CUDA_ALLOC_CONF", "value": "max_split_size_mb:32"},
                        ],

                        # Image pull policy
                        image_pull_policy="IfNotPresent",
                        resources=Resources(
                            requests={"nvidia.com/gpu": "1"},
                            limits={"nvidia.com/gpu": "1"},
                        ),
                        # Command to run
                        command=[
                            "hyperpodrun",
                            "--tee=3",
                            "--log_dir=/tmp/hyperpod",
                            "--nproc_per_node=1",
                            "--nnodes=8",
                            "/fsdp/train.py"
                        ],
                        # Script arguments
                        args=[
                            '--max_context_width=8192',
                            '--num_key_value_heads=8',
                            '--intermediate_size=14336',
                            '--hidden_width=4096',
                            '--num_layers=32',
                            '--num_heads=32',
                            '--model_type=llama_v3',
                            '--tokenizer=hf-internal-testing/llama-tokenizer',
                            '--checkpoint_freq=2',
                            '--validation_freq=25',
                            '--max_steps=50',
                            '--checkpoint_dir=/fsx/checkpoints',
                            '--dataset=allenai/c4',
                            '--dataset_config_name=en',
                            '--resume_from_checkpoint=/fsx/checkpoints',
                            '--train_batch_size=1',
                            '--val_batch_size=1',
                            '--sharding_strategy=full',
                            '--offload_activations=1'
                        ]
                    )
                ],
                volumes=[
                    Volumes(
                        name="fsx",
                        persistent_volume_claim=PersistentVolumeClaim(
                            claim_name="fsx-claim",
                            read_only=False
                        ),
                    ),
                    Volumes(
                        name="shmem",
                        host_path=HostPath(path="/dev/shm"),
                    )
                ],
                node_selector={
                    "node.kubernetes.io/instance-type": "ml.g5.8xlarge",
                },
            )
        ),
    )
]
run_policy = RunPolicy(clean_pod_policy="None", job_max_retry_count=3)
# Create and start the PyTorch job
pytorch_job = HyperPodPytorchJob(
    # Job name
    metadata=Metadata(
        name="fsdp-llama3-1-8b",
        namespace="default",
    ),
    # Processes per node
    nproc_per_node=nproc_per_node,
    # Replica specifications
    replica_specs=replica_specs,
)
# Launch the job
pytorch_job.create()

Debugging training jobs
In addition to monitoring the training pod logs as described earlier, there are several other useful ways of debugging training jobs:

You can submit training jobs with an additional --debug True flag, which will print the Kubernetes YAML to the console when the job starts so it can be inspected by users.
You can view a list of current training jobs by running hyp list hyp-pytorch-job.
You can view the status and corresponding events of the job by running hyp describe hyp-pytorch-job --job-name fsdp-llama3-1-8b.
If the HyperPod observability stack is deployed to the cluster, run hyp get-monitoring --grafana and hyp get-monitoring --prometheus to get the Grafana dashboard and Prometheus workspace URLs, respectively, to view cluster and job metrics.
To monitor GPU utilization or view directory contents, it can be useful to execute commands or open an interactive shell into the pods. You can run commands in a pod by running, for example, kubectl exec -it <pod-name> -- nvtop to run nvtop for visibility into GPU utilization. You can open an interactive shell by running kubectl exec -it <pod-name> -- /bin/bash.
The logs of the HyperPod training operator controller pod can have valuable information about scheduling. To view them, run kubectl get pods -n aws-hyperpod | grep hp-training-controller-manager to find the controller pod name and run kubectl logs -n aws-hyperpod <controller-pod-name> to view the corresponding logs.

Deploy models with the SageMaker HyperPod CLI and SDK
The SageMaker HyperPod CLI provides commands to quickly deploy models to your SageMaker HyperPod cluster for inference. You can deploy both foundation models (FMs) available on Amazon SageMaker JumpStart as well as custom models with artifacts that are stored on Amazon S3 or FSx for Lustre file systems.
This functionality automatically deploys the chosen model to the SageMaker HyperPod cluster through Kubernetes custom resources, which are implemented by the HyperPod inference operator that must be installed in the cluster, as discussed in the prerequisites section. Optionally, the deployment can also create a SageMaker inference endpoint and an Application Load Balancer (ALB), which you can invoke directly over HTTPS using a generated TLS certificate.
Deploy SageMaker JumpStart models
You can deploy an FM that is available on SageMaker JumpStart with the following command:

hyp create hyp-jumpstart-endpoint \
  --model-id deepseek-llm-r1-distill-qwen-1-5b \
  --instance-type ml.g5.8xlarge \
  --endpoint-name deepseek-distill-qwen-endpoint-cli \
  --tls-certificate-output-s3-uri s3://<certificate-bucket>/ \
  --namespace default

The preceding code includes the following parameters:

--model-id is the model ID in the SageMaker JumpStart model hub. In this example, we deploy a DeepSeek R1-distilled version of Qwen 1.5B, which is available on SageMaker JumpStart.
--instance-type is the target instance type in your SageMaker HyperPod cluster where you want to deploy the model. This instance type must be supported by the chosen model.
--endpoint-name is the name that the SageMaker inference endpoint will have. This name must be unique. SageMaker inference endpoint creation is optional.
--tls-certificate-output-s3-uri is the S3 bucket location where the TLS certificate for the ALB will be stored. This can be used to directly invoke the model through HTTPS. You can use S3 buckets that are accessible by the HyperPod inference operator IAM role.
--namespace is the Kubernetes namespace the model will be deployed to. The default value is set to default.

The CLI supports more advanced deployment configurations, including auto scaling, through additional parameters, which can be viewed by running the following command:

hyp create hyp-jumpstart-endpoint --help

If successful, the command will output the following:

Creating JumpStart model and sagemaker endpoint. Endpoint name: deepseek-distill-qwen-endpoint-cli.
 The process may take a few minutes…

After a few minutes, both the ALB and the SageMaker inference endpoint will be available, which can be observed through the CLI. Running hyp list hyp-jumpstart-endpoint will show the status first as DeploymentInProgress and then as DeploymentComplete when the endpoint is ready to be used:

| name                               | namespace   | labels   | status             |
|------------------------------------|-------------|----------|--------------------|
| deepseek-distill-qwen-endpoint-cli | default     |          | DeploymentComplete |

To get additional visibility into the deployment pod, run the following commands to find the pod name and view the corresponding logs:

hyp list-pods hyp-jumpstart-endpoint --namespace <namespace>
hyp get-logs hyp-jumpstart-endpoint --namespace <namespace> --pod-name <model-pod-name>

The output will look similar to the following:

2025-08-12T15:53:14.042031963Z WARN  PyProcess W-195-model-stderr: Capturing CUDA graph shapes: 100%|██████████| 35/35 [00:18<00:00,  1.63it/s]
2025-08-12T15:53:14.042257357Z WARN  PyProcess W-195-model-stderr: Capturing CUDA graph shapes: 100%|██████████| 35/35 [00:18<00:00,  1.94it/s]
2025-08-12T15:53:14.042297298Z INFO  PyProcess W-195-model-stdout: INFO 08-12 15:53:14 llm_engine.py:436] init engine (profile, create kv cache, warmup model) took 26.18 seconds
2025-08-12T15:53:15.215357997Z INFO  PyProcess Model [model] initialized.
2025-08-12T15:53:15.219205375Z INFO  WorkerThread Starting worker thread WT-0001 for model model (M-0001, READY) on device gpu(0)
2025-08-12T15:53:15.221591827Z INFO  ModelServer Initialize BOTH server with: EpollServerSocketChannel.
2025-08-12T15:53:15.231404670Z INFO  ModelServer BOTH API bind to: http://0.0.0.0:8080

You can invoke the SageMaker inference endpoint you created through the CLI by running the following command:

hyp invoke hyp-jumpstart-endpoint \
    --endpoint-name deepseek-distill-qwen-endpoint-cli \
    --body '{"inputs":"What is the capital of USA?"}'

You will get an output similar to the following:

{"generated_text": " What is the capital of France? What is the capital of Japan? What is the capital of China? What is the capital of Germany? What is"}

Alternatively, if you prefer a programmatic experience and advanced customization options, you can use the SageMaker HyperPod Python SDK. The following code will yield the equivalent deployment to the preceding CLI example:

from sagemaker.hyperpod.inference.config.hp_jumpstart_endpoint_config import Model, Server, SageMakerEndpoint, TlsConfig
from sagemaker.hyperpod.inference.hp_jumpstart_endpoint import HPJumpStartEndpoint

model=Model(
    model_id='deepseek-llm-r1-distill-qwen-1-5b',
)

server=Server(
    instance_type='ml.g5.8xlarge',
)

endpoint_name=SageMakerEndpoint(name='deepseek-distill-qwen-endpoint-cli')

tls_config=TlsConfig(tls_certificate_output_s3_uri='s3://<certificate-bucket>')

js_endpoint=HPJumpStartEndpoint(
    model=model,
    server=server,
    sage_maker_endpoint=endpoint_name,
    tls_config=tls_config,
    namespace="default"
)

js_endpoint.create()
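
Because the deployment also registers a SageMaker inference endpoint, you can call it with the standard SageMaker Runtime API instead of the hyp CLI. The following is a minimal sketch using boto3, assuming the endpoint name and payload from the preceding example:

import json
import boto3

# Invoke the SageMaker endpoint created by the HyperPod inference operator
runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint(
    EndpointName="deepseek-distill-qwen-endpoint-cli",
    ContentType="application/json",
    Body=json.dumps({"inputs": "What is the capital of USA?"}),
)
print(response["Body"].read().decode("utf-8"))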

Deploy custom models
You can also use the CLI to deploy custom models with model artifacts stored on either Amazon S3 or FSx for Lustre. This is useful for models that have been fine-tuned on custom data. You must provide the storage location of the model artifacts as well as a container image for inference that is compatible with the model artifacts and SageMaker inference endpoints. In the following example, we deploy a TinyLlama 1.1B model from Amazon S3 using the DJL Large Model Inference container image.
In preparation, download the model artifacts locally and push them to an S3 bucket:

# Install huggingface-hub if not present on your machine
pip install huggingface-hub

# Download model
hf download TinyLlama/TinyLlama-1.1B-Chat-v1.0 --local-dir ./tinyllama-1.1b-chat

# Upload to S3
aws s3 cp ./tinyllama-1.1b-chat s3://<model-bucket>/models/tinyllama-1.1b-chat/ --recursive

Now you can deploy the model with the following command:

hyp create hyp-custom-endpoint \
    --endpoint-name my-custom-tinyllama-endpoint \
    --model-name tinyllama \
    --model-source-type s3 \
    --model-location models/tinyllama-1.1b-chat/ \
    --s3-bucket-name <model-bucket> \
    --s3-region <model-bucket-region> \
    --instance-type ml.g5.8xlarge \
    --image-uri 763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.33.0-lmi15.0.0-cu128 \
    --container-port 8080 \
    --model-volume-mount-name modelmount \
    --tls-certificate-output-s3-uri s3://<certificate-bucket>/ \
    --namespace default

The preceding code contains the following key parameters:

--model-name is the name of the model that will be created in SageMaker
--model-source-type specifies either fsx or s3 for the location of the model artifacts
--model-location specifies the prefix or folder where the model artifacts are located
--s3-bucket-name and --s3-region specify the S3 bucket name and AWS Region, respectively
--instance-type, --endpoint-name, --namespace, and --tls-certificate behave the same as for the deployment of SageMaker JumpStart models

Similar to SageMaker JumpStart model deployment, the CLI supports more advanced deployment configurations, including auto scaling, through additional parameters, which you can view by running the following command:

hyp create hyp-custom-endpoint --help

If successful, the command will output the following:

Creating sagemaker model and endpoint. Endpoint name: my-custom-tinyllama-endpoint.
 The process may take a few minutes…

After a few minutes, both the ALB and the SageMaker inference endpoint will be available, which you can observe through the CLI. Running hyp list hyp-custom-endpoint will show the status first as DeploymentInProgress and then as DeploymentComplete when the endpoint is ready to be used:

| name                         | namespace   | labels   | status             |
|------------------------------|-------------|----------|--------------------|
| my-custom-tinyllama-endpoint | default     |          | DeploymentComplete |

To get additional visibility into the deployment pod, run the following commands to find the pod name and view the corresponding logs:

hyp list-pods hyp-custom-endpoint --namespace <namespace>
hyp get-logs hyp-custom-endpoint --namespace <namespace> --pod-name <model-pod-name>

The output will look similar to the following:

INFO  PyProcess W-196-model-stdout: INFO 08-12 16:00:36 [monitor.py:33] torch.compile takes 29.18 s in total
INFO  PyProcess W-196-model-stdout: INFO 08-12 16:00:37 [kv_cache_utils.py:634] GPU KV cache size: 809,792 tokens
INFO  PyProcess W-196-model-stdout: INFO 08-12 16:00:37 [kv_cache_utils.py:637] Maximum concurrency for 2,048 tokens per request: 395.41x
INFO  PyProcess W-196-model-stdout: INFO 08-12 16:00:59 [gpu_model_runner.py:1626] Graph capturing finished in 22 secs, took 0.37 GiB
INFO  PyProcess W-196-model-stdout: INFO 08-12 16:00:59 [core.py:163] init engine (profile, create kv cache, warmup model) took 59.39 seconds
INFO  PyProcess W-196-model-stdout: INFO 08-12 16:00:59 [core_client.py:435] Core engine process 0 ready.
INFO  PyProcess Model [model] initialized.
INFO  WorkerThread Starting worker thread WT-0001 for model model (M-0001, READY) on device gpu(0)
INFO  ModelServer Initialize BOTH server with: EpollServerSocketChannel.
INFO  ModelServer BOTH API bind to: http://0.0.0.0:8080

You can invoke the SageMaker inference endpoint you created through the CLI by running the following command:

hyp invoke hyp-custom-endpoint \
    --endpoint-name my-custom-tinyllama-endpoint \
    --body '{"inputs":"What is the capital of USA?"}'

You will get an output similar to the following:

{"generated_text": " What is the capital of France? What is the capital of Japan? What is the capital of China? What is the capital of Germany? What is"}

Alternatively, you can deploy using the SageMaker HyperPod Python SDK. The following code will yield the equivalent deployment to the preceding CLI example:

from sagemaker.hyperpod.inference.config.hp_endpoint_config import S3Storage, ModelSourceConfig, TlsConfig, EnvironmentVariables, ModelInvocationPort, ModelVolumeMount, Resources, Worker
from sagemaker.hyperpod.inference.hp_endpoint import HPEndpoint

model_source_config = ModelSourceConfig(
    model_source_type='s3',
    model_location="models/tinyllama-1.1b-chat/",
    s3_storage=S3Storage(
        bucket_name='<model-bucket>',
        region='<model-bucket-region>',
    ),
)

worker = Worker(
    image='763104351884.dkr.ecr.us-west-2.amazonaws.com/djl-inference:0.33.0-lmi15.0.0-cu128',
    model_volume_mount=ModelVolumeMount(
        name='modelmount',
    ),
    model_invocation_port=ModelInvocationPort(container_port=8080),
    resources=Resources(
        requests={"cpu": "30000m", "nvidia.com/gpu": 1, "memory": "100Gi"},
        limits={"nvidia.com/gpu": 1}
    ),
)

tls_config = TlsConfig(tls_certificate_output_s3_uri='s3://<certificate-bucket>/')

custom_endpoint = HPEndpoint(
    endpoint_name='my-custom-tinyllama-endpoint',
    instance_type='ml.g5.8xlarge',
    model_name='tinyllama',
    tls_config=tls_config,
    model_source_config=model_source_config,
    worker=worker,
)

custom_endpoint.create()

Debugging inference deployments
In addition to the monitoring of the inference pod logs, there are several other useful ways of debugging inference deployments:

You can access the HyperPod inference operator controller logs through the SageMaker HyperPod CLI. Run hyp get-operator-logs <hyp-custom-endpoint/hyp-jumpstart-endpoint> --since-hours 0.5 to access the operator logs for custom and SageMaker JumpStart deployments, respectively.
You can view a list of inference deployments by running hyp list <hyp-custom-endpoint/hyp-jumpstart-endpoint>.
You can view the status and corresponding events of deployments by running hyp describe <hyp-custom-endpoint/hyp-jumpstart-endpoint> --name <deployment-name> for custom and SageMaker JumpStart deployments, respectively.
If the HyperPod observability stack is deployed to the cluster, run hyp get-monitoring --grafana and hyp get-monitoring --prometheus to get the Grafana dashboard and Prometheus workspace URLs, respectively, to view inference metrics as well.
To monitor GPU utilization or view directory contents, it can be useful to execute commands or open an interactive shell into the pods. You can run commands in a pod by running, for example, kubectl exec -it <pod-name> -- nvtop to run nvtop for visibility into GPU utilization. You can open an interactive shell by running kubectl exec -it <pod-name> -- /bin/bash.

For more information on the inference deployment features in SageMaker HyperPod, see Amazon SageMaker HyperPod launches model deployments to accelerate the generative AI model development lifecycle and Deploying models on Amazon SageMaker HyperPod.
Clean up
To delete the training job from the corresponding example, use the following CLI command:

hyp delete hyp-pytorch-job --job-name fsdp-llama3-1-8b

To delete the model deployments from the inference example, use the following CLI commands for SageMaker JumpStart and custom model deployments, respectively:

hyp delete hyp-jumpstart-endpoint --name deepseek-distill-qwen-endpoint-cli
hyp delete hyp-custom-endpoint --name my-custom-tinyllama-endpoint

To avoid incurring ongoing costs for the instances running in your cluster, you can scale down or delete the instances.
Conclusion
The new SageMaker HyperPod CLI and SDK can significantly streamline the process of training and deploying large-scale AI models. Through the examples in this post, we’ve demonstrated how these tools provide the following benefits:

Simplified workflows – The CLI offers straightforward commands for common tasks like distributed training and model deployment, making the powerful capabilities of SageMaker HyperPod accessible to data scientists without requiring deep infrastructure knowledge.
Flexible development options – Although the CLI handles common scenarios, the SDK enables fine-grained control and customization for more complex requirements, so developers can programmatically configure every aspect of their distributed ML workloads.
Comprehensive observability – Both interfaces provide robust monitoring and debugging capabilities through system logs and integration with the SageMaker HyperPod observability stack, helping quickly identify and resolve issues during development.
Production-ready deployment – The tools support end-to-end workflows from experimentation to production, including features like automatic TLS certificate generation for secure model endpoints and integration with SageMaker inference endpoints.

Getting started with these tools is as simple as installing the sagemaker-hyperpod package. The SageMaker HyperPod CLI and SDK provide the right level of abstraction for both data scientists looking to quickly experiment with distributed training and ML engineers building production systems.
For more information about SageMaker HyperPod and these development tools, refer to the SageMaker HyperPod CLI and SDK documentation or explore the example notebooks.

About the authors
Giuseppe Angelo Porcelli is a Principal Machine Learning Specialist Solutions Architect for Amazon Web Services. With several years of software engineering and an ML background, he works with customers of any size to understand their business and technical needs and design AI and ML solutions that make the best use of the AWS Cloud and the Amazon Machine Learning stack. He has worked on projects in different domains, including MLOps, computer vision, and NLP, involving a broad set of AWS services. In his free time, Giuseppe enjoys playing football.
Shweta Singh is a Senior Product Manager in the Amazon SageMaker Machine Learning platform team at AWS, leading the SageMaker Python SDK. She has worked in several product roles in Amazon for over 5 years. She has a Bachelor of Science degree in Computer Engineering and a Masters of Science in Financial Engineering, both from New York University.
Nicolas Jourdan is a Specialist Solutions Architect at AWS, where he helps customers unlock the full potential of AI and ML in the cloud. He holds a PhD in Engineering from TU Darmstadt in Germany, where his research focused on the reliability, concept drift detection, and MLOps of industrial ML applications. Nicolas has extensive hands-on experience across industries, including autonomous driving, drones, and manufacturing, having worked in roles ranging from research scientist to engineering manager. He has contributed to award-winning research, holds patents in object detection and anomaly detection, and is passionate about applying cutting-edge AI to solve complex real-world problems.

Build a serverless Amazon Bedrock batch job orchestration workflow usi …

As organizations increasingly adopt foundation models (FMs) for their artificial intelligence and machine learning (AI/ML) workloads, managing large-scale inference operations efficiently becomes crucial. Amazon Bedrock supports two general types of large-scale inference patterns: real-time inference and batch inference for use cases that involve processing massive datasets where immediate results aren’t necessary.
Amazon Bedrock batch inference is a cost-effective solution that offers a 50% discount compared to on-demand processing, making it ideal for high-volume, time-insensitive workloads. However, implementing batch inference at scale comes with its own set of challenges, including managing input formatting and job quotas, orchestrating concurrent executions, and handling postprocessing tasks. Developers need a robust framework to streamline these operations.
In this post, we introduce a flexible and scalable solution that simplifies the batch inference workflow. This solution provides a highly scalable approach to managing your FM batch inference needs, such as generating embeddings for millions of documents or running custom evaluation or completion tasks with large datasets.
Solution overview
The following diagram details a broad overview of the automated workflow, which includes three main phases: preprocessing of input datasets (for example, prompt formatting), execution of batch inference jobs in parallel, and postprocessing to parse the model outputs.

This solution provides a flexible and scalable framework to simplify batch orchestration. Given a simple configuration input, the Step Functions state machine deployed in this AWS Cloud Development Kit (AWS CDK) stack handles preprocessing the dataset, launching parallel batch jobs, and postprocessing the output.
In our specific use case, we use 2.2 million rows of data from the open source dataset SimpleCoT. The SimpleCoT dataset on Hugging Face is a collection of diverse task-oriented examples designed to demonstrate and train chain-of-thought (CoT) reasoning in language models. This dataset encompasses a wide range of problem types, including reading comprehension, mathematical reasoning, logical deduction, and natural language processing (NLP) tasks. The dataset is structured with each entry containing a task description, question, the correct answer, and a detailed explanation of the reasoning process.
The following diagram illustrates the solution architecture.

The Amazon Bedrock batch orchestration pattern uses scalable and serverless components to cover the key architectural considerations specific to batch processing workflows:

File format and storage – Job inputs must be structured as JSONL files stored in an Amazon Simple Storage Service (Amazon S3) bucket, with each line representing a single input record that matches the API request structure for that FM or provider. For example, Anthropic's Claude models have a different JSON structure compared to Amazon Titan Text Embeddings V2. There are also quotas to consider: at the time of writing, a minimum of 1,000 and maximum of 50,000 records per batch. You can request a quota increase using Service Quotas based on your use case requirements. An example input record is sketched after this list.
Step Functions state machine – Orchestration of the asynchronous, long-running jobs requires a robust control flow system. Our architecture uses Step Functions to coordinate the overall process, with Amazon DynamoDB maintaining the inventory of individual jobs and their states. Again, there are important quota considerations: for example, the maximum sum of in-progress and submitted batch inference jobs using a base model for Amazon Titan Text Embeddings V2 is currently 20 per AWS Region. Using Map workflow states, Step Functions can help maximize throughput by controlling job submission and monitoring completion status.
Postprocessing – Finally, you will likely want to perform some light postprocessing on the batch outputs (also JSONL files in Amazon S3) to parse the responses and join the output back to the original input. For example, when generating text embeddings, you must have a mechanism to map output vectors back to their source text. These configurable AWS Lambda functions are triggered as part of the Step Functions workflow after batch results arrive in Amazon S3.
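
To make the file format consideration concrete, the following sketch writes a single JSONL record in the shape that Amazon Bedrock batch inference expects: a recordId plus a modelInput body that matches the target model's API. The record content shown here is illustrative, and other providers (for example, Amazon Titan Text Embeddings V2) use a different modelInput structure:

import json

# One batch inference input record: recordId plus a model-specific request body
record = {
    "recordId": "row-000001",
    "modelInput": {
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 512,
        "messages": [
            {"role": "user", "content": [{"type": "text", "text": "Summarize chain-of-thought reasoning in one sentence."}]}
        ],
    },
}

with open("batch_input.jsonl", "a") as f:
    f.write(json.dumps(record) + "\n")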

In the following sections, we walk through the steps to deploy the AWS CDK stack to your AWS environment.
Prerequisites
Complete the following prerequisite steps:

Install Node.js and npm.
Install the AWS CDK:

npm install -g aws-cdk

Clone the GitHub repository into your local development environment:

git clone https://github.com/aws-samples/amazon-bedrock-samples
cd poc-to-prod/bedrock-batch-orchestrator

Deploy the solution
Install the required packages with the following code:

npm i

Check the prompt_templates.py file and add a new prompt template to prompt_id_to_template for your desired use case.
prompt_id_to_template is a dict where the key is the prompt_id (allowing you to associate a given job with a particular prompt). Formatting keys in the prompt string template must also exist in your input file. For example, consider the following prompt template:

You are an AI assistant tasked with providing accurate and justified answers to users’ questions.

You will be given a task, and you should respond with a chain-of-thought surrounded by <thinking> tags, then a final answer in <answer> tags.

For example, given the following task:

<task>
You are given an original reference as well as a system generated reference. Your task is to judge the naturaleness of the system generated reference. If the utterance could have been produced by a native speaker output 1, else output 0. System Reference: may i ask near where? Original Reference: where do you need a hotel near?.
</task>

<thinking>
The utterance “may i ask near where?” is not natural.
This utterance does not make sense grammatically.
Thus we output 0.
</thinking>

<answer>0</answer>

Your turn. Please respond to the following task:

<task>
{source}
</task>

You must make sure your input dataset has a column for each formatting key (for example, source in the preceding example code).
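
As an illustration of the expected structure, the following sketch shows what prompt_id_to_template might look like; the template wording here is shortened and the joke_about_topic entry is hypothetical (the real templates live in prompt_templates.py):

# Sketch of the prompt_id_to_template mapping in prompt_templates.py.
# Each formatting key ({source}, {topic}) must exist as a column in the input dataset.
prompt_id_to_template = {
    "question_answering": (
        "You are an AI assistant tasked with providing accurate and justified answers.\n\n"
        "<task>\n{source}\n</task>"
    ),
    "joke_about_topic": "Tell me a short, family-friendly joke about {topic}.",
}

# A job's prompt_id selects the template; each dataset row fills the formatting keys.
print(prompt_id_to_template["question_answering"].format(source="What is 2 + 2?"))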
Prompt templates are not used for embedding model-based jobs.
Deploy the AWS CDK stack with the following code:

npm run cdk deploy

Take note of the AWS CloudFormation outputs denoting the names of the bucket and Step Functions workflow:

✅ BedrockBatchOrchestratorStack

✨ Deployment time: 23.16s

Outputs:
BedrockBatchOrchestratorStack.bucketName = batch-inference-bucket-<YOUR_ACCOUNT_ID>
BedrockBatchOrchestratorStack.stepFunctionName = bedrockBatchOrchestratorSfnE5E2B976-4yznxekguxxm
Stack ARN:
arn:aws:cloudformation:us-east-1:<YOUR_ACCOUNT_ID>:stack/BedrockBatchOrchestratorStack/0787ba80-b0cb-11ef-a481-0affd4b49c99

✨ Total time: 26.74s

Job input structure
As your input dataset, you can either use a Hugging Face dataset ID or point directly to a dataset in Amazon S3 (CSV or Parquet formats are supported at the time of writing). The source of the input dataset and the type of model (text generation or embedding) dictate the structure of the Step Functions input.
Hugging Face dataset
For a Hugging Face dataset, reference a dataset ID (for example, w601sxs/simpleCoT) and split (for example, train), and your dataset will be pulled directly from Hugging Face Hub.

The question_answering prompt template in prompt_templates.py has a formatting key called source to match the name of the appropriate column in the referenced dataset (see the preceding example). We use this prompt to generate the rationale and answer for each of the 2.2 million rows in the dataset. See the following code:

{
    "job_name_prefix": "full-cot-job",
    "model_id": "us.anthropic.claude-3-5-haiku-20241022-v1:0",
    "prompt_id": "question_answering",
    "dataset_id": "w601sxs/simpleCoT",
    "split": "train",
    "max_records_per_job": 50000
}

We also have optional keys for max_num_jobs (to limit the total number of jobs, which is useful for testing on a smaller scale) and max_records_per_batch.
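
If you prefer to launch executions from code rather than the Step Functions console, the following is a hedged sketch using boto3; the state machine ARN placeholder and the max_num_jobs value are illustrative:

import json
import boto3

# Start the batch orchestration state machine with the same input shown above,
# plus the optional max_num_jobs cap for a smaller test run
sfn = boto3.client("stepfunctions")
sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:<YOUR_ACCOUNT_ID>:stateMachine:<stepFunctionName>",
    input=json.dumps({
        "job_name_prefix": "cot-test-job",
        "model_id": "us.anthropic.claude-3-5-haiku-20241022-v1:0",
        "prompt_id": "question_answering",
        "dataset_id": "w601sxs/simpleCoT",
        "split": "train",
        "max_records_per_job": 50000,
        "max_num_jobs": 2,
    }),
)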
Amazon S3 dataset
Upload an input CSV or Parquet file to the S3 bucket and copy the S3 URI. For example:

aws s3 cp topics.csv s3://batch-inference-bucket-<YOUR_ACCOUNT_ID>/inputs/jokes/topics.csv
Open your Step Functions state machine on the Step Functions console and submit an input with the following structure. You must supply an s3_uri for S3 datasets.
For example, for Anthropic models with an Amazon S3 input, use the following code:

{
    "s3_uri": "s3://batch-inference-bucket-<YOUR_ACCOUNT_ID>/inputs/jokes/topics.csv",
    "job_name_prefix": "test-joke-job1",
    "model_id": "anthropic.claude-3-haiku-20240307-v1:0",
    "prompt_id": "joke_about_topic"
}

The prompt_id of joke_about_topic maps to a prompt template in prompt_templates.py, which has a formatting key for topic, which must be one of the columns in the input CSV file.
Generate batch embeddings
To generate embeddings with a model like Amazon Titan Text Embeddings V2, you don’t need to provide a prompt_id, but you do need to make sure your input CSV file has a column called input_text with the text you want to embed. For example:

{
  "s3_uri": "s3://batch-inference-bucket-<YOUR_ACCOUNT_ID>/inputs/embeddings/embedding_input.csv",
  "job_name_prefix": "test-embeddings-job1",
  "model_id": "amazon.titan-embed-text-v2:0",
  "prompt_id": null
}

Step Functions workflow
The following diagram shows an example of a successful Step Functions workflow execution.

When a Step Functions state machine is initiated, it completes the following steps:

Preprocess input datasets to prepare batch job inputs for your particular model ID and prompt template. The BaseProcessor abstract class can quickly be extended for other model providers, such as Meta Llama 3 or Amazon Nova.
Orchestrate batch jobs in an event-driven fashion. We maintain an internal inventory of jobs in a DynamoDB table and keep it updated when Amazon Bedrock emits events related to job status changes. These updates are then transmitted back to the step function using the Wait for Task Token Callback integration pattern. Using an SFN Map state, we make sure the configured maximum number of concurrent jobs is maintained until all records have been processed.
Run concurrent postprocessing of batch outputs to perform some light parsing and merge model responses back to the original input data using the recordId field as a join key (see the sketch after this list). The structure of the output data depends on the kind of model you use.
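
As a rough illustration of that join step (not the repository's actual postprocessing code), merging parsed batch outputs back onto the input rows could look like this with pandas:

import pandas as pd

# Hypothetical inputs: the original dataset rows and the parsed batch-inference outputs.
inputs = pd.DataFrame(
    {"recordId": ["0", "1", "2"], "topic": ["databases", "gardening", "coffee"]}
)
outputs = pd.DataFrame(
    {"recordId": ["0", "1", "2"], "response": ["joke A", "joke B", "joke C"]}
)

# Join the model responses back to the original rows on recordId.
merged = inputs.merge(outputs, on="recordId", how="left")
merged.to_parquet("merged_output.parquet", index=False)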

Monitor your state machine as it runs the jobs. The maximum number of concurrent jobs is controlled by an AWS CDK context variable in cdk.json (key: maxConcurrentJobs). The paths to your resulting Parquet files will be aggregated in the outputs from the execution.
The output Parquet files will contain the same columns as your input file alongside the generated responses.
For text generation models, the output string will be in a new column called response, as shown in the following screenshot of a sample output.

For embedding models, the output (list of floats) will be in a new column called embedding, as shown in the following screenshot.

There are no guaranteed SLAs for the Batch Inference API. Runtimes will vary based on the demand of the desired model at the time of your request. For example, to process the 2.2 million records in the SimpleCoT dataset, execution was spread across 45 individual processing jobs, with a maximum of 20 concurrent jobs at a given time. In our experiment with Anthropic’s Claude 3.5 Haiku in the us-east-1 Region, each individual job execution took an average of 9 hours, for a total end-to-end processing time of about 27 hours.
Clean up
To avoid incurring additional costs, you can clean up the stack’s resources by running cdk destroy.
Conclusion
In this post, we outlined a serverless architecture for performing large-scale batch processing using Amazon Bedrock batch inference. We explored using the solution for various use cases, including large-scale data labeling and embedding generation. You can also generate a large amount of synthetic data from a teacher model to train a student model as part of a model distillation process.
The solution is publicly available in the GitHub repo. We can’t wait to see how you put this architecture to work for your use cases.

About the authors
Swagat Kulkarni is a Senior Solutions Architect at AWS and an active Generative AI practitioner. He is passionate about helping customers solve real-world challenges using cloud-native services and machine learning. With a strong background in driving digital transformation across diverse industries, Swagat has delivered impactful solutions that enable innovation and scale. Outside of work, he enjoys traveling, reading, and cooking.
Evan Diewald is a Data & Machine Learning Engineer with AWS Professional Services, where he helps AWS customers develop and deploy ML solutions in a variety of industry verticals. Prior to joining AWS, he received an M.S. from Carnegie Mellon University, where he conducted research at the intersection of advanced manufacturing and AI. Outside of work, he enjoys mountain biking and rock climbing.
Shreyas Subramanian is a Principal Data Scientist who helps customers solve their business challenges with generative AI and deep learning, using AWS services like Amazon Bedrock and AgentCore. Dr. Subramanian contributes to cutting-edge research in deep learning, agentic AI, foundation models, and optimization techniques, with several books, papers, and patents to his name. In his current role at Amazon, Dr. Subramanian works with various science leaders and research teams within and outside Amazon, helping to guide customers to best leverage state-of-the-art algorithms and techniques to solve business-critical problems. Outside AWS, Dr. Subramanian is an expert reviewer for AI papers and funding through organizations such as NeurIPS, ICML, ICLR, NASA, and NSF.

Natural language-based database analytics with Amazon Nova

In this post, we explore how natural language database analytics can revolutionize the way organizations interact with their structured data through the power of large language model (LLM) agents. Natural language interfaces to databases have long been a goal in data management. Agents enhance database analytics by breaking down complex queries into explicit, verifiable reasoning steps and enabling self-correction through validation loops that can catch errors, analyze failures, and refine queries until they accurately match user intent and schema requirements. We demonstrate how this modern approach enables intuitive, conversation-like interactions with complex database systems while maintaining precision and reliability.
To achieve optimal performance with minimal trade-offs, we use the Amazon Nova family of foundation models (FMs): Amazon Nova Pro, Amazon Nova Lite, and Amazon Nova Micro. These FMs encode vast amounts of world knowledge, facilitating nuanced reasoning and contextual understanding essential for complex data analysis. Our solution uses the ReAct (reasoning and acting) pattern, implemented through LangGraph’s flexible architecture. This approach combines the strengths of Amazon Nova LLMs for natural language understanding with explicit reasoning steps and actions.
Challenges of natural language-based database analytics
Many customers undergoing generative AI transformation share a common realization: their vast data stores hold untapped potential for automated analysis and natural language querying. This insight leads them to explore SQL-based solutions, where queries can range from simple SELECT and WHERE clauses to complex, multipage statements involving sophisticated aggregations and functions.
At its core, successful analysis depends on identifying and retrieving the correct dataset. This foundational step enables all downstream activities, including visualization, further analysis, and exploration. Translating a user’s intent—whether stated or implicit—into a performant, precise, and valid SQL query is a significant challenge.
Our solution excels in generating context and metadata-aware queries capable of retrieving precise datasets and performing intricate analyses. To fully harness the capabilities of Agents and Amazon Nova FMs, a user-friendly interface is crucial. We’ve developed an intuitive interface where users are guided through their analysis journey with human-in-the-loop (HITL) capabilities, allowing for input, approvals, and modifications at critical decision points.
Solution overview
The solution architecture consists of three core components: UI, generative AI, and data. The agent in this solution serves as the central coordinator, combining critical capabilities such as question understanding, decision-making, workflow orchestration, intelligent routing, and generating comprehensive natural language responses. It enhances questions by improving text quality, standardizing terminology, and maintaining conversational context, helping users extend their analysis through a series of related queries while preserving precise analytical intent. The agent’s intelligent routing capabilities mean that the correct tools are invoked for each user question, enabling cohesive end-to-end query processing. Furthermore, it processes tabular and visual data and uses the complete context to generate comprehensive summaries that explain findings and highlight key insights. As an added benefit, the agent can suggest relevant follow-up questions and related topics, helping users explore their data more deeply and discover unexpected insights. The following tools are connected to the agent:

Text2SQL – When the orchestrator determines data retrieval is needed, it uses the Text2SQL tool, which uses a comprehensive knowledge base that includes metadata, table schemas, example queries with their results, and detailed data dictionaries. Using this rich context, the tool transforms natural language questions into precise SQL queries.
SQLExecutor – This tool directly connects to the structured data store and executes the SQL queries generated by the agent using the Text2SQL tool. The tool executes the generated SQL against a structured database endpoint such as Amazon Athena, Amazon Redshift, or Snowflake.
Text2Python – When a visual representation of data is needed for the analysis, either by user request or orchestrator decision, the Text2Python tool transforms analytical results into compelling visualizations. This agent processes both the user’s query and the data table retrieved by the Text2SQL tool to generate appropriate Python scripts. Using industry-standard visualization libraries, these scripts execute locally to create diagrams, graphs, or charts that best represent the data. Like its SQL counterpart, this agent includes self-remediation capabilities. When execution errors occur, it uses the error feedback and context to regenerate the Python script, providing reliable visualization delivery.
PythonExecutor – The PythonExecutor takes the generated Python scripts and executes them locally. This allows for the creation of high-quality data visualizations using industry-standard libraries.

The agent then evaluates whether the returned dataset fully answers the user’s question. If the results are insufficient, it automatically regenerates and executes a more refined query to retrieve optimal data. A key feature is the agent’s self-remediation capability. When execution errors occur, the agent uses these errors and the full context to regenerate a corrected SQL query. This self-healing approach provides robust and reliable query processing, even in complex scenarios. The agent processes the inputs—the rewritten question, analysis results, and context—to create a natural language summary and responds to the user, including tabular results with reasoning, visualizations with explanations, and a summary with key insights.
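
Conceptually, the self-remediation loop is a bounded retry with error feedback. The following minimal sketch assumes two placeholder helpers, generate_sql and run_query, standing in for the Text2SQL and SQLExecutor tools; it is an illustration of the pattern, not the solution's actual code:

def self_correcting_query(question, generate_sql, run_query, max_attempts=3):
    """Generate SQL for a question, retrying with error feedback on failure.

    generate_sql(prompt) -> str and run_query(sql) -> rows are assumed helpers;
    in the real solution they would wrap the Text2SQL and SQLExecutor tools.
    """
    prompt = f"Write a SQL query that answers: {question}"
    last_error = None
    for attempt in range(max_attempts):
        sql = generate_sql(prompt)
        try:
            return sql, run_query(sql)
        except Exception as err:  # execution failed: feed the error back to the model
            last_error = err
            prompt = (
                f"The query below failed with error: {err}\n"
                f"Failed query: {sql}\n"
                f"Rewrite it so it correctly answers: {question}"
            )
    raise RuntimeError(f"Could not produce a valid query after {max_attempts} attempts: {last_error}")
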
The workflow is illustrated in the following diagram.

The following shows an example of the conversation flow between the user and the agent:

User_A: what are the number of claims by staff name?
Chatbot: The following are the top 10….
User_A: Visualize it
Chatbot: Visualize as a bar chart? [Rewritten Question]
User_A: Confirmed
Chatbot: <IMAGE.PNG>

The agent maintains context through conversations, which means users only need to provide minimal follow-up inputs. It automatically reconstructs abbreviated questions using previous context for confirmation. Additionally, after each question-answer exchange, the agent suggests relevant exploratory follow-up questions. The agent enforces consistent terminology, following industry standards, customer guidelines, and brand requirements. It standardizes abbreviations in both inputs and outputs by expanding them to their full forms. For example, “GDPR” is always expanded to General Data Protection Regulation in user input and agent responses. The agent improves text quality by maintaining clarity, correcting grammar, and refining sentence structure. All content is processed to provide professional, readable output. The solution uses the following AWS services and resources:

Amazon Athena – Athena is used as the structured database for storing and querying the data
Amazon Bedrock – The core of this solution is built on Amazon Bedrock, which enables the integration of generative AI agents and Amazon Nova
AWS Glue – We use AWS Glue to prepare and load the dataset into Athena
Amazon Nova – The state-of-the-art FM from Amazon is a key component, providing natural language understanding and generation capabilities
Amazon SageMaker – We use SageMaker to create a notebook instance for running the code and experiments

Prerequisites
You need the following prerequisites to get started implementing the solution in this post:

An AWS account
Familiarity with FMs and Amazon Bedrock
Model access enabled in Amazon Bedrock for Amazon Nova models: Amazon Nova Pro, Amazon Nova Lite, and Amazon Nova Micro

Set up a SageMaker notebook instance
Follow these steps to create a SageMaker notebook instance:

On the SageMaker console, choose Notebook instances in the navigation pane.
Choose Create notebook instance.
For Notebook instance name, enter a name (for example, text2sql-nova-nb).
For Notebook Instance type, choose ml.t3.medium.
On the notebook details page, choose the IAM role ARN, then add sagemaker.amazonaws.com and glue.amazonaws.com on the Trust relationships tab.

After the notebook instance has started, clone the GitHub repository.

Download and prepare the database
Follow these steps to download and prepare the database:

Download the Spider dataset. For this walkthrough, we use the insurance and claims database.
Unzip the dataset into the /data folder.
Create an Amazon Simple Storage Service (Amazon S3) bucket (for instructions, see Creating a general purpose bucket).
In the /src folder, use db_setup.ipynb to load the database into Athena.

Run the Streamlit application
To start the Streamlit application, use the following command:

streamlit run app.py

The following screenshot shows the interface.

This demo uses Streamlit for illustration purposes only. For production deployments, review Streamlit’s security configuration and deployment architecture to make sure it aligns with your organization’s security requirements and best practices.
Evaluations
We evaluated the performance of Amazon Nova on the Spider text-to-SQL dataset, a widely used benchmark for complex cross-domain semantic parsing and text-to-SQL tasks. The evaluation provided insights into the Amazon Nova capabilities compared to other state-of-the-art approaches. The Spider dataset contains 10,181 questions and 5,693 unique complex SQL queries on 200 databases with multiple tables covering 138 different domains. The evaluation was conducted in a zero-shot setting, where the models weren’t fine-tuned on examples from the dataset, to assess their general text-to-SQL translation abilities. We used the following evaluation metrics:

Execution accuracy – The execution accuracy metric evaluates whether the predicted SQL query produces the same result as the ground truth SQL query when executed against the database. The execution accuracy provides a practical assessment of the model’s performance because it measures the end-to-end capability to translate natural language questions into executable SQL queries. The top-performing models demonstrate strong execution accuracy on the Spider dataset, with the Amazon Nova model showing particularly competitive performance. Compared to other state-of-the-art models, Amazon Nova achieves similar or slightly higher accuracy across the various query complexity levels. For the most challenging queries, Amazon Nova outperforms the other leading models, showcasing its ability to handle complex natural language-to-SQL translations. A simplified sketch of this check appears after this list.
Latency – In addition to accuracy, the speed and responsiveness of the text-to-SQL translation is an important consideration. Here, the Amazon Nova model stands out, demonstrating significantly faster processing times compared to other top-performing models. For a representative set of database queries, Amazon Nova was able to generate the SQL and retrieve the results notably quicker (a latency improvement of about 60%) than the competition. This latency improvement could translate to enhanced user experience and productivity, meaning that business users can use natural language interfaces to interact with databases more seamlessly.
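
As a simplified illustration of the execution accuracy check (the official Spider evaluation harness is more involved), the core idea is to compare the result sets of the predicted and gold queries:

import sqlite3
from collections import Counter

def execution_match(db_path, predicted_sql, gold_sql):
    """Return True if both queries yield the same multiset of rows on the database."""
    with sqlite3.connect(db_path) as conn:
        try:
            pred_rows = conn.execute(predicted_sql).fetchall()
        except sqlite3.Error:
            return False  # a query that fails to execute scores 0
        gold_rows = conn.execute(gold_sql).fetchall()
    # Order-insensitive comparison of the returned rows.
    return Counter(pred_rows) == Counter(gold_rows)

def execution_accuracy(examples):
    """Execution accuracy over a set of examples = fraction of matching result sets."""
    matches = [execution_match(e["db"], e["pred"], e["gold"]) for e in examples]
    return sum(matches) / len(matches) if matches else 0.0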

Overall, the evaluation results highlight the strengths of Amazon Nova in both accuracy and efficiency for text-to-SQL translation tasks. Its competitive performance, low latency, and advantages on the most complex queries make it a compelling option for organizations looking to democratize data access and analysis through natural language interfaces.
Clean up
To avoid continuing charges because of resources created as part of this walkthrough, perform the following cleanup steps:

Disable Amazon Bedrock model access for Amazon Nova Pro.
Delete project-specific AWS Glue Data Catalog databases and associated tables.
Remove the Amazon S3 content:

Empty the S3 bucket.
Delete the S3 bucket.

Delete the SageMaker notebook instance.

Conclusion
The Generative AI Innovation Center (GenAIIC) at AWS has developed this natural language-based database analytics solution that uses the strengths of state-of-the-art Amazon Nova FMs, along with explicit reasoning steps and actions, as implemented through the ReAct pattern in LangGraph’s flexible architecture. This solution is built using Amazon Bedrock, which enables intuitive, conversation-like interactions with complex database systems. You can seamlessly translate your natural language queries into accurate SQL statements and generate insightful data visualizations. The evaluation results demonstrate the solution’s competitive performance, making it a compelling option for organizations looking to democratize data access and analysis. Furthermore, the GenAIIC provides you with access to a team of experts to help identify valuable use cases and implement practical generative AI solutions tailored to your specific needs, enhancing the potential of this technology.
For more information, refer to Amazon Nova Foundation Models and Amazon Bedrock. If you’re interested in the collaboration with GenAIIC, you can find more information at AWS Generative AI Innovation Center.

Rahul Ghosh is an Applied Scientist at the AWS Generative AI Innovation Center, where he works with AWS customers across different verticals to expedite their use of Generative AI. Rahul holds a PhD in Computer Science from the University of Minnesota.
Gaurav Rele is a Senior Data Scientist at the AWS Generative AI Innovation Center, where he works with AWS customers across different verticals to accelerate their use of generative AI and AWS cloud services to solve their business challenges.
Amaran Asokkumar is a Deep Learning Architect at AWS, specializing in infrastructure, automation, and AI. He leads the design of generative AI-enabled solutions across industry segments. Amaran is passionate about all things AI and helping customers accelerate their generative AI exploration and transformation efforts.
Long Chen is a Sr. Applied Scientist at the AWS Generative AI Innovation Center. He holds a PhD in Applied Physics from the University of Michigan. With more than a decade of experience in research and development, he works on innovative solutions in various domains using generative AI and other machine learning techniques, facilitating the success of AWS customers. His interests include generative models, multimodal systems, and graph learning.
Jae Oh Woo is a Senior Applied Scientist at the AWS Generative AI Innovation Center, where he specializes in developing custom solutions and model customization for a diverse range of use cases. He has a strong passion for interdisciplinary research that connects theoretical foundations with practical applications in the rapidly evolving field of generative AI. Prior to joining Amazon, Jae Oh was a Simons Postdoctoral Fellow at the University of Texas at Austin. He holds a Ph.D. in Applied Mathematics from Yale University.
Sungmin Hong is a Senior Applied Scientist at the AWS Generative AI Innovation Center, where he helps expedite the variety of use cases of AWS customers. Before joining Amazon, Sungmin was a postdoctoral research fellow at Harvard Medical School. He holds a PhD in Computer Science from New York University. Outside of work, he prides himself on keeping his indoor plants alive for over 3 years.
Vidya Sagar Ravipati is a Science Manager at the AWS Generative AI Innovation Center, where he uses his vast experience in large-scale distributed systems and his passion for machine learning to help AWS customers across different industry verticals accelerate their AI and cloud adoption.

Meet Elysia: A New Open-Source Python Framework Redefining Agentic RAG …

If you’ve ever tried to build an agentic RAG system that actually works well, you know the pain. You feed it some documents, cross your fingers, and hope it doesn’t hallucinate when someone asks it a simple question. Most of the time, you get back irrelevant chunks of text that barely answer what was asked.

Elysia is trying to fix this mess, and honestly, their approach is quite creative. Built by the folks at Weaviate, this open-source Python framework doesn’t just throw more AI at the problem – it completely rethinks how AI agents should work with your data.

Note: Python 3.12 required

What’s Actually Wrong with Most RAG Systems

Here’s the thing that drives everyone crazy: traditional RAG systems are basically blind. They take your question, convert it to vectors, find some “similar” text, and hope for the best. It’s like asking someone to find you a good restaurant while they’re wearing a blindfold – they might get lucky, but probably not.

Most systems also dump every possible tool on the AI at once, which is like giving a toddler access to your entire toolbox and expecting them to build a bookshelf.

Elysia’s Three Pillars:

1) Decision Trees

Instead of giving AI agents every tool at once, Elysia guides them through structured decision nodes. Think of it like a flowchart that actually makes sense. Each step has context about what happened before and what options come next.

The really cool part? The system shows you exactly which path the agent took and why, so when something goes wrong, you can actually debug it instead of just shrugging and trying again.

When the AI realizes it can’t do something (like searching for car prices in a makeup database), it doesn’t just keep trying forever. It sets an “impossible flag” and moves on, which sounds obvious but apparently needed to be invented.
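
Elysia’s own implementation isn’t shown here, but the idea is easy to picture: a decision node checks what the data source can actually answer and flags the request as impossible instead of retrying forever. A generic, hypothetical sketch:

def decide(query_fields, collection_schema):
    """Toy decision node: mark a request impossible if it needs fields the data lacks."""
    missing = [f for f in query_fields if f not in collection_schema]
    if missing:
        # Instead of looping on doomed searches, flag the request and explain why.
        return {"impossible": True, "reason": f"Collection has no fields: {missing}"}
    return {"impossible": False, "next_node": "search"}

# Example: asking for car prices against a makeup-product collection.
print(decide(["car_price"], {"product_name", "brand", "shade", "price"}))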

2) Smart Data Source Display

Remember when every AI just spat out paragraphs of text? Elysia actually looks at your data and figures out how to show it properly. Got e-commerce products? You get product cards. GitHub issues? You get ticket layouts. Spreadsheet data? You get actual tables.

The system examines your data structure first – the fields, the types, the relationships – then picks whichever of its seven display formats makes sense.

3) Data Expertise

This might be the biggest difference. Before Elysia searches anything, it analyzes your database to understand what’s actually in there. It can summarize, generate metadata, and choose display types. It looks at:

What kinds of fields you have

What the data ranges look like

How different pieces relate to each other

What would make sense to search for

How does it Work?

Learning from Feedback

Elysia remembers when users say “yes, this was helpful” and uses those examples to improve future responses. But it does this smartly – your feedback doesn’t mess up other people’s results, and it helps the system get better at answering your specific types of questions.

This means you can use smaller, cheaper models that still give good results because they’re learning from actual success cases.

Chunking That Makes Sense

Most RAG systems chunk all your documents upfront, which uses tons of storage and often creates weird breaks. Elysia chunks documents only when needed. It searches full documents first, then if a document looks relevant but is too long, it breaks it down on the fly.

This saves storage space and actually works better because the chunking decisions are informed by what the user is actually looking for.
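
The general pattern, sketched generically below rather than with Elysia’s actual API, is to search whole documents first and split only the hits that are relevant but too long; search_docs, chunk, and rerank are assumed placeholder helpers:

def retrieve_with_lazy_chunking(query, search_docs, chunk, rerank, max_len=4000):
    """Search full documents, then chunk long hits on the fly.

    search_docs, chunk, and rerank are assumed helpers standing in for a real
    retrieval stack; only the control flow matters here.
    """
    passages = []
    for doc in search_docs(query):
        if len(doc.text) <= max_len:
            passages.append(doc.text)  # short documents are used as-is
        else:
            # Chunk only now, guided by the query, instead of pre-chunking everything.
            chunks = chunk(doc.text, size=max_len // 4)
            passages.extend(rerank(query, chunks)[:3])
    return passages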

Model Routing

Different tasks need different models. Simple questions don’t need GPT-4, and complex analysis doesn’t work well with tiny models. Elysia automatically routes tasks to the right model based on complexity, which saves money and improves speed.

https://weaviate.io/blog/elysia-agentic-rag

Getting Started

The setup is quite simple:

Copy CodeCopiedUse a different Browserpip install elysia-ai
elysia start

That’s it. You get both a web interface and the Python framework.

For developers who want to customize things:

Copy CodeCopiedUse a different Browserfrom elysia import tool, Tree

tree = Tree()

@tool(tree=tree)
async def add(x: int, y: int) -> int:
    return x + y

tree("What is the sum of 9009 and 6006?")

If you have Weaviate data, it’s even simpler:

Copy CodeCopiedUse a different Browserimport elysia

tree = elysia.Tree()
response, objects = tree(
    "What are the 10 most expensive items in the Ecommerce collection?",
    collection_names=["Ecommerce"],
)

Real-World Example: Glowe’s Chatbot

The Glowe skincare chatbot platform uses Elysia to handle complex product recommendations. Users can ask things like “What products work well with retinol but won’t irritate sensitive skin?” and get intelligent responses that consider ingredient interactions, user preferences, and product availability.

This isn’t just keyword matching – it’s understanding context and relationships between ingredients, user history, and product characteristics in ways that would be really hard to code manually.

Summary

Elysia represents Weaviate’s attempt to move beyond traditional ask-retrieve-generate RAG patterns by combining decision-tree agents, adaptive data presentation, and learning from user feedback. Rather than just generating text responses, it analyzes data structure beforehand and selects appropriate display formats while maintaining transparency in its decision-making process. As Weaviate’s planned replacement for their Verba RAG system, it offers a foundation for building more sophisticated AI applications that understand both what users are asking and how to present answers effectively, though whether this translates to meaningfully better real-world performance remains to be seen since it is still in beta.

Check out the TECHNICAL DETAILS and GITHUB PAGE. Feel free to check out our GitHub Page for Tutorials, Codes and Notebooks. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter.
The post Meet Elysia: A New Open-Source Python Framework Redefining Agentic RAG Systems with Decision Trees and Smarter Data Handling appeared first on MarkTechPost.

Implementing OAuth 2.1 for MCP Servers with Scalekit: A Step-by-Step C …

In this tutorial, we’ll explore how to implement OAuth 2.1 for MCP servers step by step. To keep things practical, we’ll build a simple finance sentiment analysis server and secure it using Scalekit, a tool that makes setting up OAuth both faster and easier.

With Scalekit, all we need to do is expose a metadata endpoint URL for MCP clients to discover the server and add authorization middleware for secure token-based authentication. Scalekit handles all the complex OAuth 2.1 flows behind the scenes, so you don’t need to manually implement or manage token generation, refresh, or validation. Once this setup is complete, your MCP server is ready to handle authenticated requests seamlessly. Check out the FULL CODES here.

Setting up dependencies

Alpha Vantage API

To fetch stock news sentiment, we’ll use the Alpha Vantage API. To get a free API key:

Visit the Alpha Vantage platform using this link

Enter your email and the required details.

You’ll receive your API key—copy it and store it securely, as you’ll need it to authenticate your requests.

Node JS

To run the MCP Inspector for testing our application, we need Node.js installed.

Download the latest version of Node.js from nodejs.org

Run the installer.

Keep the default settings and complete the installation.

Python Dependencies

Copy CodeCopiedUse a different Browserpip install fastapi fastmcp mcp scalekit-sdk-python

Scalekit

To start using Scalekit, follow these steps:

Create Your Scalekit Account

Go to scalekit.com and sign up.

Scalekit offers a free tier, so you don’t need to worry about billing.

Once signed in, click “Activate Full-Stack Auth.”

Set Up Permissions

Open the Authorization panel.

Under the Permissions section, click “Add Permission.”

Use the following values:

Permission Name: news:read

Description: Use Alpha Vantage to get Stock Sentiment

Permissions in Scalekit are used to define and manage scopes that control what features or resources your application can access. For example, the news:read permission allows your MCP server to access stock sentiment data from Alpha Vantage, while other permissions could be created to gate additional features or APIs within your application.

Add Your MCP Server

Go to the MCP Servers section and click “Add MCP Server.”

Fill in the required fields:

Server Name: Any name you prefer.

Resource Identifier: A unique identifier for your MCP server. This value is included in the aud claim of access tokens, helping the server validate requests.

For local testing, set it as:

Copy CodeCopiedUse a different Browserhttp://localhost:10000/mcp/

When using FastMCP, the /mcp path is automatically added to the endpoint. Make sure to include the trailing slash at the end to avoid configuration issues. Check out the FULL CODES here.

Set the scope to the permission you just created: news:read

Once the server is created, Scalekit will generate your resource metadata. Be sure to note down the MCP Server Identifier (found next to the server name, e.g., res_88056357768398086), as you’ll need it later.

Resource Metadata Example

Your metadata will look similar to this (but unique for your account):

Metadata Endpoint URL:

Copy CodeCopiedUse a different Browser/.well-known/oauth-protected-resource/mcp

Resource Metadata JSON:

Copy CodeCopiedUse a different Browser{
  "authorization_servers": [
    "https://zapp.scalekit.dev/resources/res_88056357768398086"
  ],
  "bearer_methods_supported": ["header"],
  "resource": "http://localhost:10000/mcp/",
  "resource_documentation": "http://localhost:10000/mcp/docs",
  "scopes_supported": ["news:read"]
}

Get API Credentials

Go to Settings → API Credentials.

Copy your Client ID and Environment URL.

Click Generate New Secret to create your Secret Key.

Store these values securely — we’ll need them later for configuration.

.env

We will now create a .env file with the following variables

Copy CodeCopiedUse a different BrowserALPHA_VANTAGE_API_KEY=<YOUR_ALPHA_VANTAGE_API_KEY>
METADATA_JSON_RESPONSE=<YOUR_METADATA_JSON_RESPONSE>

SCALEKIT_ENVIRONMENT_URL=<YOUR_SCALEKIT_ENVIRONMENT_URL>
SCALEKIT_CLIENT_ID=<YOUR_SCALEKIT_CLIENT_ID>
SCALEKIT_CLIENT_SECRET=<YOUR_SCALEKIT_CLIENT_SECRET>
SCALEKIT_RESOURCE_METADATA_URL=<YOUR_SCALEKIT_RESOURCE_METADATA_URL>
SCALEKIT_AUTHORIZATION_SERVERS=<YOUR_SCALEKIT_AUTHORIZATION_SERVERS>
SCALEKIT_AUDIENCE_NAME=<YOUR_SCALEKIT_AUDIENCE_NAME>
SCALEKIT_RESOURCE_NAME=<YOUR_SCALEKIT_RESOURCE_NAME>
SCALEKIT_RESOURCE_DOCS_URL=<YOUR_SCALEKIT_RESOURCE_DOCS_URL>

ALPHA_VANTAGE_API_KEY

Your personal API key from Alpha Vantage, used to fetch stock sentiment data.

METADATA_JSON_RESPONSE

The JSON response generated by Scalekit when you configure your MCP server.

It contains details like authorization servers, supported scopes, and documentation URLs.

SCALEKIT_ENVIRONMENT_URL

The environment URL under the Settings section.

SCALEKIT_CLIENT_ID

The client ID mentioned under the Settings section.

SCALEKIT_CLIENT_SECRET

The secret key you generate under Settings → API Credentials.

SCALEKIT_RESOURCE_METADATA_URL

The URL exposed by your MCP server for metadata requests.

Example:

Copy CodeCopiedUse a different Browserhttp://localhost:10000/.well-known/oauth-protected-resource/mcp

SCALEKIT_AUTHORIZATION_SERVERS

The URL pointing to the MCP Server Identifier issued by Scalekit.

Example:

Copy CodeCopiedUse a different Browserhttps://<your-subdomain>.scalekit.dev/resources/res_***************

You can find the subdomain from the resource metadata JSON

SCALEKIT_AUDIENCE_NAME

The audience (aud) claim used in access tokens to validate requests. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserhttp://localhost:10000/mcp/

SCALEKIT_RESOURCE_NAME

The resource name for your MCP server. In most cases, this is the same as SCALEKIT_AUDIENCE_NAME. Check out the FULL CODES here.

SCALEKIT_RESOURCE_DOCS_URL

The URL where your MCP server’s documentation is hosted.

Example:

Copy CodeCopiedUse a different Browserhttp://localhost:10000/mcp/docs

Configuration File (config.py) 

We will first create a config file to load all the environment variables which will be used later. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserimport os
from dotenv import load_dotenv

load_dotenv()

class Settings():
    ALPHA_VANTAGE_API_KEY = os.environ.get('ALPHA_VANTAGE_API_KEY')
    METADATA_JSON_RESPONSE = os.environ.get('METADATA_JSON_RESPONSE')
    SCALEKIT_ENVIRONMENT_URL = os.environ.get('SCALEKIT_ENVIRONMENT_URL')
    SCALEKIT_CLIENT_ID = os.environ.get('SCALEKIT_CLIENT_ID')
    SCALEKIT_CLIENT_SECRET = os.environ.get('SCALEKIT_CLIENT_SECRET')
    SCALEKIT_RESOURCE_METADATA_URL = os.environ.get('SCALEKIT_RESOURCE_METADATA_URL')
    SCALEKIT_AUTHORIZATION_SERVERS = os.environ.get('SCALEKIT_AUTHORIZATION_SERVERS')
    SCALEKIT_AUDIENCE_NAME = os.environ.get('SCALEKIT_AUDIENCE_NAME')
    SCALEKIT_RESOURCE_NAME = os.environ.get('SCALEKIT_RESOURCE_NAME')
    SCALEKIT_RESOURCE_DOCS_URL = os.environ.get('SCALEKIT_RESOURCE_DOCS_URL')
    PORT = 10000

settings = Settings()

Stock Sentiment Logic (finance.py)

This code block fetches real-time news sentiment data for a given stock ticker using the Alpha Vantage API. It retrieves the top three recent articles, summarizing their title, summary, source, and publication time for quick insights. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserfrom mcp.server.fastmcp import FastMCP
from typing import Any
import os
import httpx
from typing import Dict, List
from config import settings

# Create an MCP server
mcp = FastMCP("finance-news")

BASE_URL = "https://www.alphavantage.co/query"

async def call_alpha_vantage(endpoint: str, params: dict[str, Any]) -> dict[str, Any] | None:
    """Generic async caller to Alpha Vantage."""
    params["apikey"] = settings.ALPHA_VANTAGE_API_KEY
    params["function"] = endpoint
    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(BASE_URL, params=params, timeout=30.0)
            response.raise_for_status()
            return response.json()
        except Exception:
            return None

@mcp.tool()
async def get_news_sentiment(ticker: str) -> str:
    """Get news sentiment data for a stock ticker.

    Args:
        ticker: Stock ticker symbol (e.g., MSFT, AAPL)
    """
    data = await call_alpha_vantage("NEWS_SENTIMENT", {"tickers": ticker.upper()})
    if not data or "feed" not in data:
        return "Couldn't retrieve news sentiment."

    articles = data["feed"][:3]
    result = []
    for item in articles:
        result.append(f"""
{item['title']}
Summary: {item['summary']}
Source: {item['source']} | Published: {item['time_published']}
""")
    return "\n---\n".join(result)

Authorization Middleware

This middleware acts as an authorization layer for your MCP server, ensuring that only authenticated requests are processed. It uses the ScaleKit client to validate access tokens on every incoming request. When a request comes in, the middleware first checks if the path is public, such as metadata endpoints under /.well-known/. 

If the request isn’t for a public path, it looks for an Authorization header with a valid Bearer token. The token is then validated using ScaleKit. If the token is missing, invalid, or expired, the middleware immediately responds with a 401 Unauthorized error and a structured error message. Check out the FULL CODES here.

If the token is valid, the request is passed along to the next layer of the application. Additionally, logging is integrated throughout the process to capture key events, making it easier to debug and audit authentication flows.

Finally, this middleware will be imported and added to the server file to protect all secure endpoints. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserimport json
import logging
from fastapi import HTTPException, Request
from fastapi.security import HTTPBearer
from fastapi.responses import JSONResponse
from scalekit import ScalekitClient
from starlette.middleware.base import BaseHTTPMiddleware

from config import settings

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Security scheme for Bearer token
security = HTTPBearer()

# Initialize ScaleKit client
scalekit_client = ScalekitClient(
    settings.SCALEKIT_ENVIRONMENT_URL,
    settings.SCALEKIT_CLIENT_ID,
    settings.SCALEKIT_CLIENT_SECRET
)

# Authentication middleware
class AuthMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        if request.url.path.startswith("/.well-known/"):
            return await call_next(request)

        try:
            auth_header = request.headers.get("Authorization")
            if not auth_header or not auth_header.startswith("Bearer "):
                raise HTTPException(status_code=401, detail="Missing or invalid authorization header")

            token = auth_header.split(" ")[1]

            request_body = await request.body()

            # Parse JSON from bytes
            try:
                request_data = json.loads(request_body.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                request_data = {}

            try:
                scalekit_client.validate_access_token(token)

            except Exception as e:
                raise HTTPException(status_code=401, detail="Token validation failed")

        except HTTPException as e:
            return JSONResponse(
                status_code=e.status_code,
                content={"error": "unauthorized" if e.status_code == 401 else "forbidden", "error_description": e.detail},
                headers={
                    "WWW-Authenticate": f'Bearer realm="OAuth", resource_metadata="{settings.SCALEKIT_RESOURCE_METADATA_URL}"'
                }
            )

        return await call_next(request)

MCP Server (server.py)

This script sets up a FastAPI application integrated with an MCP server for stock news sentiment analysis. It begins by importing the necessary libraries, including FastAPI, CORS middleware, and a custom authentication middleware. Check out the FULL CODES here.

The application lifecycle is managed through a combined lifespan context using an asynchronous context manager, ensuring that the finance_news_server.session_manager, which is essentially the stock sentiment logic we created, runs smoothly during the app’s runtime. CORS middleware is configured to allow cross-origin requests, which is useful during development but should be restricted in production environments.

A new endpoint, /.well-known/oauth-protected-resource/mcp, is added to serve metadata for OAuth 2.1 protected resource discovery. This endpoint provides important details such as supported authorization servers, bearer token methods, resource name, documentation URL, and supported scopes — in this case, mcp:tools:news:read.

The MCP server is created using the finance_news_server.streamable_http_app() function and mounted at the root path /, making the core MCP functionalities accessible through the main app. Authentication is enforced by adding the AuthMiddleware to the app before the MCP server is mounted.

Finally, the main() function runs the application using uvicorn, with logging enabled at the debug level, binding the server to localhost on the configured port. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserimport contextlib
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import json
from auth import AuthMiddleware
from config import settings
from finance import mcp as finance_news_server

# Create a combined lifespan to manage the MCP session manager
@contextlib.asynccontextmanager
async def lifespan(app: FastAPI):
    async with finance_news_server.session_manager.run():
        yield

app = FastAPI(lifespan=lifespan)

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, specify your actual origins
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["*"],
)

# MCP well-known endpoint
@app.get("/.well-known/oauth-protected-resource/mcp")
async def oauth_protected_resource_metadata():
    """
    OAuth 2.0 Protected Resource Metadata endpoint for MCP client discovery.
    Required by the MCP specification for authorization server discovery.
    """

    return {
        "authorization_servers": [settings.SCALEKIT_AUTHORIZATION_SERVERS],
        "bearer_methods_supported": ["header"],
        "resource": settings.SCALEKIT_RESOURCE_NAME,
        "resource_documentation": settings.SCALEKIT_RESOURCE_DOCS_URL,
        "scopes_supported": [
            "mcp:tools:news:read"
        ],
    }

# Create and mount the MCP server with authentication
mcp_server = finance_news_server.streamable_http_app()
app.add_middleware(AuthMiddleware)
app.mount("/", mcp_server)

def main():
    """Main entry point for the MCP server."""
    uvicorn.run(app, host="localhost", port=settings.PORT, log_level="debug")

if __name__ == "__main__":
    main()

Running the Server

To run the server, execute python server.py, which will start the application on localhost:10000. To test the setup, open another terminal and run:

Copy CodeCopiedUse a different Browsernpx @modelcontextprotocol/inspector

Once the MCP Inspector is running, enter http://localhost:10000/mcp as the server URL. If you attempt to connect without providing valid credentials, you will encounter the following error:

Connection Error: Check if your MCP Server is running and if the proxy token is correctly configured.

Now, provide the Bearer token using the secret ID you generated in Scalekit. Once entered, you will be successfully authenticated and can start making tool calls.
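
To sanity-check the middleware outside the Inspector, you can hit the server with and without the token. The following sketch assumes the server is running locally and that TOKEN holds a valid access token issued by Scalekit:

import httpx

BASE = "http://localhost:10000"
TOKEN = "<YOUR_ACCESS_TOKEN>"  # placeholder: paste the token issued by Scalekit

# Public metadata endpoint: no authentication required.
print(httpx.get(f"{BASE}/.well-known/oauth-protected-resource/mcp").json())

# Protected MCP endpoint without a token: the middleware should answer 401.
print(httpx.post(f"{BASE}/mcp/", json={}).status_code)

# Same request with a Bearer token: the middleware validates it before the
# MCP layer handles the (still minimal) request body.
r = httpx.post(
    f"{BASE}/mcp/",
    json={},
    headers={"Authorization": f"Bearer {TOKEN}"},
)
print(r.status_code)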

Check out the FULL CODES here. Feel free to check out our GitHub Page for Tutorials, Codes and Notebooks. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter.
The post Implementing OAuth 2.1 for MCP Servers with Scalekit: A Step-by-Step Coding Tutorial appeared first on MarkTechPost.

15 Most Relevant Operating Principles for Enterprise AI (2025)

Enterprise AI is moving from isolated pilots to production-grade, agent-centric systems. The principles below distill the most widely posted requirements and trends in large-scale deployments, based solely on documented industry sources.

1) Distributed agentic architectures

Modern deployments increasingly rely on cooperating AI agents that share tasks instead of a single monolithic model.

2) Open interoperability protocols are indispensable

Standards such as the Model Context Protocol (MCP) allow heterogeneous models and tools to exchange context securely, much like TCP/IP did for networks.

3) Composable building blocks accelerate delivery

Vendors and in-house teams now ship reusable “lego-style” agents and micro-services that snap into existing stacks, helping enterprises avoid one-off solutions.

4) Context-aware orchestration replaces hard-coded workflows

Agent frameworks route work dynamically based on real-time signals rather than fixed rules, enabling processes to adapt to changing business conditions.

5) Agent networks outperform rigid hierarchies

Industry reports describe mesh-like topologies where peer agents negotiate next steps, which improves resilience when any single service fails.

6) AgentOps emerges as the new operational discipline

Teams monitor, version and troubleshoot agent interactions the way DevOps teams manage code and services today.

7) Data accessibility and quality remain the primary scaling bottlenecks

Surveys show that poor, siloed data is responsible for a large share of enterprise AI project failures.

8) Traceability and audit logs are non-negotiable

Enterprise governance frameworks now insist on end-to-end logging of prompts, agent decisions and outputs to satisfy internal and external audits.

9) Compliance drives reasoning constraints

Regulated sectors (finance, healthcare, government) must demonstrate that agent outputs follow applicable laws and policy rules, not just accuracy metrics.

10) Reliable AI depends on trustworthy data pipelines

Bias mitigation, lineage tracking and validation checks on training and inference data are cited as prerequisites for dependable outcomes.

11) Horizontal orchestration delivers the greatest business value

Cross-department agent workflows (e.g., sales, supply chain, and finance) unlock compound efficiencies that siloed vertical agents cannot achieve.

12) Governance now extends beyond data to agent behaviour

Boards and risk officers increasingly oversee how autonomous agents reason, act and recover from errors, not just what data they consume.

13) Edge and hybrid deployments protect sovereignty and latency-sensitive workloads

Nearly half of large firms cite hybrid cloud–edge setups as critical to meet data-residency and real-time requirements.

14) Smaller, specialized models dominate production use-cases

Enterprises gravitate to domain-tuned or distilled models that are cheaper to run and easier to govern than frontier-scale LLMs.

15) The orchestration layer is the competitive battleground

Differentiation is shifting from raw model size to the reliability, security and adaptability of an enterprise’s agent-orchestration fabric.

By grounding architecture, operations and governance in these evidence-based principles, enterprises can scale AI systems that are resilient, compliant and aligned with real business objectives.

Sources:

https://www.weforum.org/stories/2025/07/enterprise-ai-tipping-point-what-comes-next/

https://www.deloitte.com/us/en/what-we-do/capabilities/applied-artificial-intelligence/content/state-of-generative-ai-in-enterprise.html

https://www.linkedin.com/posts/armand-ruiz_the-operating-principles-of-enterprise-ai-activity-7368236477421375489-ug0R

https://arya.ai/blog/principles-guiding-the-future-of-enterprise-ai

https://appian.com/blog/2025/building-safe-effective-enterprise-ai-systems

https://www.superannotate.com/blog/enterprise-ai-overview

https://shellypalmer.com/2025/05/enterprise-ai-governance-manifesto-the-2025-strategic-framework-for-fortune-500-success/

9 Key AI Governance Frameworks in 2025

Building Scalable AI Solutions: Best Practices for Enterprises in 2025

https://intelisys.com/enterprise-ai-in-2025-a-guide-for-implementation/

Beyond Rules: Agentic AI Orchestration and the Dawn of Emergent Intelligence

https://www.anthropic.com/news/model-context-protocol

https://www.tcs.com/insights/blogs/interoperable-collaborative-ai-ecosystems

https://kore.ai/the-future-of-enterprise-ai-why-you-need-to-start-thinking-about-agent-networks-today/

https://dysnix.com/blog/what-is-agentops

Enterprise AI Governance: Ensuring Trust and Compliance

The post 15 Most Relevant Operating Principles for Enterprise AI (2025) appeared first on MarkTechPost.

Step-by-Step Guide to AI Agent Development Using Microsoft Agent-Light …

In this tutorial, we walk through setting up an advanced AI Agent using Microsoft’s Agent-Lightning framework. We are running everything directly inside Google Colab, which means we can experiment with both the server and client components in one place. By defining a small QA agent, connecting it to a local Agent-Lightning server, and then training it with multiple system prompts, we can observe how the framework supports resource updates, task queuing, and automated evaluation. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browser!pip -q install agentlightning openai nest_asyncio python-dotenv > /dev/null
import os, threading, time, asyncio, nest_asyncio, random
from getpass import getpass
from agentlightning.litagent import LitAgent
from agentlightning.trainer import Trainer
from agentlightning.server import AgentLightningServer
from agentlightning.types import PromptTemplate
import openai
if not os.getenv("OPENAI_API_KEY"):
    try:
        os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (leave blank if using a local/proxy base): ") or ""
    except Exception:
        pass
MODEL = os.getenv("MODEL", "gpt-4o-mini")

We begin by installing the required libraries & importing all the core modules we need for Agent-Lightning. We also set up our OpenAI API key securely and defined the model we will use for the tutorial. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserclass QAAgent(LitAgent):
    def training_rollout(self, task, rollout_id, resources):
        """Given a task {'prompt':…, 'answer':…}, ask LLM using the server-provided system prompt and return a reward in [0,1]."""
        sys_prompt = resources["system_prompt"].template
        user = task["prompt"]; gold = task.get("answer", "").strip().lower()
        try:
            r = openai.chat.completions.create(
                model=MODEL,
                messages=[{"role": "system", "content": sys_prompt},
                          {"role": "user", "content": user}],
                temperature=0.2,
            )
            pred = r.choices[0].message.content.strip()
        except Exception as e:
            pred = f"[error]{e}"
        def score(pred, gold):
            P = pred.lower()
            base = 1.0 if gold and gold in P else 0.0
            gt = set(gold.split()); pr = set(P.split())
            inter = len(gt & pr); denom = (len(gt) + len(pr)) or 1
            overlap = 2 * inter / denom
            brevity = 0.2 if base == 1.0 and len(P.split()) <= 8 else 0.0
            return max(0.0, min(1.0, 0.7 * base + 0.25 * overlap + brevity))
        return float(score(pred, gold))

We define a simple QAAgent by extending LitAgent, where we handle each training rollout by sending the user’s prompt to the LLM, collecting the response, and scoring it against the gold answer. We design the reward function to verify correctness, token overlap, and brevity, enabling the agent to learn and produce concise and accurate outputs. Check out the FULL CODES here.

Copy CodeCopiedUse a different BrowserTASKS = [
    {"prompt": "Capital of France?", "answer": "Paris"},
    {"prompt": "Who wrote Pride and Prejudice?", "answer": "Jane Austen"},
    {"prompt": "2+2 = ?", "answer": "4"},
]
PROMPTS = [
    "You are a terse expert. Answer with only the final fact, no sentences.",
    "You are a helpful, knowledgeable AI. Prefer concise, correct answers.",
    "Answer as a rigorous evaluator; return only the canonical fact.",
    "Be a friendly tutor. Give the one-word answer if obvious.",
]
nest_asyncio.apply()
HOST, PORT = "127.0.0.1", 9997

We define a tiny benchmark with three QA tasks and curate multiple candidate system prompts to optimize. We then apply nest_asyncio and set our local server host and port, allowing us to run the Agent-Lightning server and clients within a single Colab runtime. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserasync def run_server_and_search():
    server = AgentLightningServer(host=HOST, port=PORT)
    await server.start()
    print("Server started")
    await asyncio.sleep(1.5)
    results = []
    for sp in PROMPTS:
        await server.update_resources({"system_prompt": PromptTemplate(template=sp, engine="f-string")})
        scores = []
        for t in TASKS:
            tid = await server.queue_task(sample=t, mode="train")
            rollout = await server.poll_completed_rollout(tid, timeout=40)  # waits for a worker
            if rollout is None:
                print("Timeout waiting for rollout; continuing…")
                continue
            scores.append(float(getattr(rollout, "final_reward", 0.0)))
        avg = sum(scores) / len(scores) if scores else 0.0
        print(f"Prompt avg: {avg:.3f} | {sp}")
        results.append((sp, avg))
    best = max(results, key=lambda x: x[1]) if results else ("<none>", 0)
    print("\nBEST PROMPT:", best[0], " | score:", f"{best[1]:.3f}")
    await server.stop()

We start the Agent-Lightning server and iterate through our candidate system prompts, updating the shared system_prompt before queuing each training task. We then poll for completed rollouts, compute average rewards per prompt, report the best-performing prompt, and gracefully stop the server. Check out the FULL CODES here.

Copy CodeCopiedUse a different Browserdef run_client_in_thread():
    agent = QAAgent()
    trainer = Trainer(n_workers=2)
    trainer.fit(agent, backend=f"http://{HOST}:{PORT}")

client_thr = threading.Thread(target=run_client_in_thread, daemon=True)
client_thr.start()
asyncio.run(run_server_and_search())

We launch the client in a separate thread with two parallel workers, allowing it to process tasks sent by the server. At the same time, we run the server loop, which evaluates different prompts, collects rollout results, and reports the best system prompt based on average reward.

In conclusion, we see how Agent-Lightning enables us to create a flexible agent pipeline with only a few lines of code. We start a server, run parallel client workers, evaluate different system prompts, and automatically measure performance, all within a single Colab environment. This demonstrates how the framework streamlines the process of building, testing, and optimizing AI agents in a structured manner.

Check out the FULL CODES here. Feel free to check out our GitHub Page for Tutorials, Codes and Notebooks. Also, feel free to follow us on Twitter and don’t forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter.
The post Step-by-Step Guide to AI Agent Development Using Microsoft Agent-Lightning appeared first on MarkTechPost.

NVIDIA AI Team Introduces Jetson Thor: The Ultimate Platform for Physi …

Last week, the NVIDIA robotics team released Jetson Thor, which includes the Jetson AGX Thor Developer Kit and the Jetson T5000 module, marking a significant milestone for real-world AI robotics development. Engineered as a supercomputer for physical AI, Jetson Thor brings generative reasoning and multimodal sensor processing to power inference and decision-making at the edge.

Architectural Highlights

Compute Performance

Jetson Thor delivers up to 2,070 FP4 teraflops (TFLOPS) of AI compute via its Blackwell‑based GPU—a leap of 7.5× over the previous Jetson Orin platform. This performance arrives in a 130‑watt power envelope, with configurable operation down to 40 W, balancing high throughput with energy efficiency—approximately 3.5× better than Orin.

Compute Architecture

At its core, Jetson Thor integrates a 2560‑core Blackwell GPU equipped with 96 fifth‑generation Tensor Cores and supports Multi‑Instance GPU (MIG), enabling flexible partitioning of GPU resources for parallel workloads. Complementing this is a 14‑core Arm® Neoverse‑V3AE CPU, with 1 MB L2 per core and 16 MB shared L3 cache.

Memory and I/O

The platform includes 128 GB LPDDR5X memory on a 256‑bit bus at 273 GB/s bandwidth. Storage features include a 1 TB NVMe M.2 slot, along with HDMI, DisplayPort, multiple USB, Gigabit Ethernet, CAN headers, and QSFP28 for up to four 25 GbE lanes—crucial for real-time sensor fusion.

Introducing NVIDIA Jetson Thor, the Ultimate Platform for Physical AI

Software Ecosystem for Physical AI

Jetson Thor supports a comprehensive NVIDIA software stack tailored for robotics and physical AI:

Isaac (GR00T) for generative reasoning and humanoid control.

Metropolis for vision AI.

Holoscan for real-time, low-latency sensor processing and sensor-over-Ethernet (Holoscan Sensor Bridge).

These components allow one system-on-module to execute multimodal AI workflows—vision, language, actuation—without offloading or combining multiple chips.


Defining ‘Physical AI’ and Its Significance

Generative Reasoning & Multimodal Processing

Physical AI combines perception, reasoning, and action planning. Jetson Thor enables robots to “simulate possible sequences, anticipate consequences, and generate both high-level plans and low-level motion policies,” delivering adaptability akin to human reasoning. By supporting real-time inference over language and visual inputs, it transforms robots from simple automata into generalist agents.

Applications

Robots can better navigate unpredictable environments, manipulate objects, or follow complex instructions without reteaching. Use cases span manufacturing, logistics, healthcare, agriculture, and more.

Developer Access and Pricing

Jetson AGX Thor Developer Kit: priced at $3,499, now generally available.

Jetson T5000 production modules: available through NVIDIA’s partners, with unit pricing around $2,999 for orders of 1,000.

Pre-orders suggest wider availability soon, catering to both research and commercial robotics ecosystems.

Conclusion

NVIDIA Jetson Thor represents a pivotal shift in robotics compute, embedding server-grade multimodal inference and reasoning capabilities within a single power-bounded module. Its combination of 2,070 FP4 TFLOPS, high-efficiency design, expansive I/O, and a robust software stack positions it as a foundational platform for the next generation of physical AI systems. With early adoption among prominent robotics developers and ready availability, Jetson Thor brings the vision of adaptable, real-world AI agents closer to reality.

Check out the FULL TECHNICAL DETAILS.

Understanding OAuth 2.1 for MCP (Model Context Protocol) Servers: Discovery, Authorization, and Access Phases

OAuth 2.1 is the officially mandated authorization standard in the Model Context Protocol (MCP) specifications. According to the official documentation, authorization servers must implement OAuth 2.1 with proper security measures for both confidential and public clients.

MCP provides authorization at the transport level, allowing clients to securely access restricted servers on behalf of resource owners. OAuth 2.1 was chosen as the framework for MCP because it offers a modern, secure, and standardized approach to managing authorization.

How the Authorization Flow Works

The MCP authorization flow is designed to ensure secure and controlled access to protected servers. It happens in three main phases:

Discovery Phase

When an MCP client tries to connect to a protected server, the server responds with a 401 Unauthorized status along with a WWW-Authenticate header that points to its authorization server. The client then uses the metadata provided by the authorization server to discover its capabilities and understand how to proceed with authentication.
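
As a rough illustration, here is a minimal Python sketch of the discovery handshake. The MCP server URL, the authorization server URL, and the JSON-RPC method are placeholder assumptions, and the well-known metadata path follows standard OAuth Authorization Server Metadata (RFC 8414) conventions; in practice the client derives the real metadata location from the WWW-Authenticate header it receives.

import requests

MCP_SERVER = "https://mcp.example.com/mcp"     # hypothetical protected MCP server

# 1. An unauthenticated request is rejected with 401 plus a WWW-Authenticate header.
resp = requests.post(MCP_SERVER, json={"jsonrpc": "2.0", "method": "ping", "id": 1})
print(resp.status_code)                        # expect 401
print(resp.headers.get("WWW-Authenticate"))    # points the client at the authorization server

# 2. The client fetches the authorization server metadata to learn its endpoints.
AUTH_SERVER = "https://auth.example.com"       # hypothetical, parsed from the header above
meta = requests.get(f"{AUTH_SERVER}/.well-known/oauth-authorization-server").json()
print(meta["authorization_endpoint"], meta["token_endpoint"], meta.get("registration_endpoint"))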

Authorization Phase

Once the client understands how the server handles authorization, it begins the registration and authorization process.

If Dynamic Client Registration is supported, the client can automatically register itself with the authorization server without needing manual setup. During this step, the client provides basic details like its name, type, redirect URLs, and desired scopes. In response, the authorization server issues client credentials — typically a client_id and client_secret — which the client will use in subsequent requests. This process makes onboarding new clients faster and more scalable, especially in large or automated environments.
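
The registration call itself is a single HTTP POST in the style of OAuth Dynamic Client Registration (RFC 7591). The sketch below is illustrative rather than MCP-specific code: the registration endpoint URL, client name, redirect URI, and scope are all placeholder values.

import requests

REGISTRATION_ENDPOINT = "https://auth.example.com/register"   # hypothetical, from the metadata

payload = {
    "client_name": "my-mcp-client",                            # placeholder client details
    "redirect_uris": ["http://localhost:8080/callback"],
    "grant_types": ["authorization_code"],
    "token_endpoint_auth_method": "client_secret_basic",
    "scope": "mcp:tools:weather",
}
reg = requests.post(REGISTRATION_ENDPOINT, json=payload).json()
client_id, client_secret = reg["client_id"], reg.get("client_secret")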

After registration, the client starts the appropriate OAuth flow:

Authorization Code flow – Used when acting on behalf of a human user.

Client Credentials flow – Used for secure machine-to-machine communication.

In the Authorization Code flow, the user is asked to grant consent. Once approved, the authorization server issues an access token with the appropriate scopes for the client to use.
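
For the machine-to-machine case, the token request is a form-encoded POST to the authorization server's token endpoint. The sketch below shows a Client Credentials request with placeholder endpoint and credentials; the Authorization Code flow additionally involves the user consent redirect and the PKCE parameters discussed later.

import requests

TOKEN_ENDPOINT = "https://auth.example.com/token"               # hypothetical
client_id, client_secret = "my-client-id", "my-client-secret"   # issued at registration

# Client Credentials flow: no user involved, the client authenticates as itself.
tok = requests.post(
    TOKEN_ENDPOINT,
    data={"grant_type": "client_credentials", "scope": "mcp:tools:weather"},
    auth=(client_id, client_secret),
).json()
access_token = tok["access_token"]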

Access Phase

With the access token in hand, the client sends it along with its requests to the MCP server. The server validates the token, checks the scopes, and only then processes the request and returns the response. Every interaction during this process is logged for auditing and compliance, ensuring security and traceability.
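
In code, the access phase amounts to attaching the token as a Bearer header on the client side and, on the server side, validating the token and its scopes before answering. The following sketch uses placeholder values for the server URL and token; the scope-check helper is a simplified illustration of what an MCP server would do after fully validating the token (signature, expiry, audience).

import requests

MCP_SERVER = "https://mcp.example.com/mcp"   # hypothetical, as above
access_token = "eyJ..."                       # placeholder token from the authorization phase

# Client side: every MCP request carries the bearer token.
resp = requests.post(
    MCP_SERVER,
    json={"jsonrpc": "2.0", "method": "tools/list", "id": 2},
    headers={"Authorization": f"Bearer {access_token}"},
)

# Server side (simplified): after validating the token itself, the server still
# has to confirm that it grants the scope this particular request needs.
def has_scope(token_claims: dict, required: str) -> bool:
    return required in token_claims.get("scope", "").split()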

Source: https://modelcontextprotocol.io/specification/draft/basic/authorization

Key Security Enhancements in MCP OAuth 2.1

The MCP authorization specification includes several important security upgrades to make the process safer and more reliable:

Mandatory PKCE

All MCP clients must use PKCE (Proof Key for Code Exchange) as defined in OAuth 2.1. PKCE adds a layer of protection by creating a secret “verifier-challenge” pair, ensuring that only the original client that started the request can exchange the authorization code for tokens. This prevents attacks like code interception or injection.
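
The verifier-challenge pair is straightforward to generate. The sketch below follows the S256 method from RFC 7636, which is what OAuth 2.1 expects; only standard-library modules are used.

import base64, hashlib, secrets

# code_verifier: a high-entropy random string kept secret by the client (RFC 7636).
code_verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()

# code_challenge: SHA-256 of the verifier, base64url-encoded without padding ("S256").
digest = hashlib.sha256(code_verifier.encode()).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()

# The authorization request sends code_challenge (with code_challenge_method=S256);
# the later token exchange must present the matching code_verifier or it is rejected.
print(code_verifier, code_challenge)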

Strict Redirect URI Validation

Clients have to pre-register their exact redirect URIs with the authorization server. When authorization happens, the server checks for an exact match. This stops attackers from redirecting tokens to unauthorized locations.
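
Exact matching means exact string comparison, as the small sketch below illustrates with a placeholder set of registered URIs.

REGISTERED_REDIRECT_URIS = {"http://localhost:8080/callback"}   # placeholder registration data

def redirect_uri_allowed(requested: str) -> bool:
    # Exact string comparison only: no wildcards, prefixes, or extra paths/query strings.
    return requested in REGISTERED_REDIRECT_URIS

print(redirect_uri_allowed("http://localhost:8080/callback"))        # True
print(redirect_uri_allowed("http://localhost:8080/callback/other"))  # False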

Short-Lived Tokens

Authorization servers are encouraged to issue short-lived access tokens. If a token is accidentally exposed or stolen, its short lifespan reduces the risk of misuse.

Granular Scope Model

MCP OAuth 2.1 allows fine-grained permissions using scopes, so clients only get access to what they need; a small scope-matching sketch follows the examples below. Examples include:

mcp:tools:weather – Access to weather tools only.

mcp:resources:customer-data:read – Read-only access to customer data.

mcp:exec:workflows:* – Permission to run any workflow.
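
A server-side scope check can be as simple as matching the scopes granted in the token against the scope a request requires. The sketch below is illustrative only: it uses shell-style wildcard matching to cover patterns like mcp:exec:workflows:*, and real servers define their own matching rules.

from fnmatch import fnmatch

def scope_satisfied(granted_scopes, required_scope):
    # Treat each granted scope as a pattern; "*" covers any trailing segment.
    return any(fnmatch(required_scope, granted) for granted in granted_scopes)

granted = ["mcp:tools:weather", "mcp:exec:workflows:*"]
print(scope_satisfied(granted, "mcp:exec:workflows:nightly-report"))   # True
print(scope_satisfied(granted, "mcp:resources:customer-data:read"))    # False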

Dynamic Client Registration

MCP clients and servers can support automatic client registration. This lets new clients get their credentials (like client IDs) without manual setup, making it faster and easier to onboard new AI agents securely.

How to Implement OAuth 2.1 for MCP Servers

In the next section of the article, we will dive deep into how to implement OAuth 2.1 for MCP servers. We will create a simple finance sentiment analysis server and implement authorization using Scalekit, which simplifies the entire process.