Agentic Server Primer: Llama.cpp MCP Lesson 7: Process Manager (Part 1)

Agentic Server Primer: Llama.cpp MCP Lesson 7: Process Manager (Part 1)

Agentic Server Primer: Llama.cpp MCP Lesson 7: Process Manager (Part 1)
Give a robot a fish..

Here is where it gets very interesting.  A process manager or harness can distribute jobs, but first we want to give your LLM its own 'notepad.' Effectively, a simple hierarchy would be:

  • Processes are masters (One per file .json)
  • Tasks can exist inside them.
  • Jobs / code snippets can exist inside each task.
  • We want to make a front-end manager for this in Part 2 - right now it will be a functional back-end!

Here is your code! Which has been through multiple refactors / audits and should be ready.

import re
import json
import os
from datetime import datetime
from fastmcp import FastMCP
from fastmcp.tools import tool          # ← Required import
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
import uvicorn

# Build the FastMCP server object that the tools below are registered on.
# The instructions text is split across two literals only for line length;
# the concatenated string is unchanged.
mcp = FastMCP(
    name="Process Manager",
    instructions=(
        "Provides a process manager for tracking tasks and their associate "
        "jobs along with their associate code blocks"
    ),
)

class TaskManager:
    """JSON-file-backed notepad of processes, tasks, jobs and code snippets.

    Each process is stored as one ``<name>.json`` file inside ``storage_dir``.
    At most one process is active (held in memory) at a time and every
    ``task_*`` tool operates on that active process, saving it after each
    change. Every tool returns a JSON string with a boolean ``success`` field
    and, on failure, an ``error`` message; exceptions are caught and reported
    that way rather than raised.

    NOTE(review): the tool methods are intentionally left without return
    annotations and ``process_save`` keeps ``name: str = None`` so that the
    signatures registered with FastMCP stay exactly as they were — confirm
    the effect on generated tool schemas before annotating them.
    """

    # Characters outside this whitelist are replaced with "_" when building a
    # file name; that also removes path separators.
    _UNSAFE_CHARS = re.compile(r'[^a-zA-Z0-9_.-]')
    _SUFFIX = '.json'

    def __init__(self, storage_dir: str = "processes"):
        """Create the storage directory if needed and start with no active process."""
        self.storage_dir = storage_dir
        os.makedirs(self.storage_dir, exist_ok=True)
        self.current_process = None          # Full process dict (metadata + tasks)
        self.current_process_file = None     # Name the active process is saved under

    # ── Internal helpers ────────────────────────────────────────────────────

    @staticmethod
    def _error(message: str, **extra) -> str:
        """Return the standard failure payload as a JSON string."""
        return json.dumps({"success": False, "error": message, **extra})

    def _sanitize_name(self, name: str) -> str:
        """Sanitize user-supplied name to prevent path traversal and invalid filenames."""
        if not name or not str(name).strip():
            name = "unnamed_process"
        return self._UNSAFE_CHARS.sub('_', str(name).strip())[:150]

    def _get_filepath(self, name: str) -> str:
        """Return full path to the sanitized JSON file."""
        safe_name = self._sanitize_name(name)
        if not safe_name.endswith(self._SUFFIX):
            safe_name += self._SUFFIX
        return os.path.join(self.storage_dir, safe_name)

    def _save_current(self) -> None:
        """Persist the current process to disk (no-op when nothing is active).

        The data is written to a sibling ``.tmp`` file and then moved over the
        real file with ``os.replace``, so a failure part-way through the write
        cannot leave a truncated process file behind.
        """
        if not (self.current_process and self.current_process_file):
            return
        filepath = self._get_filepath(self.current_process_file)
        tmp_path = filepath + ".tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(self.current_process, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, filepath)
        finally:
            # Only still present when the write or the replace failed.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

    def _load_process(self, name: str) -> bool:
        """Load a process from disk and update internal state.

        Returns False (leaving the active process untouched) when the file is
        missing, unreadable, not valid JSON, or not a JSON object whose
        ``tasks`` entry is an object. A missing ``tasks`` entry is created.
        """
        filepath = self._get_filepath(name)
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (OSError, ValueError):  # ValueError covers JSON and decoding errors
            return False
        if not isinstance(data, dict):
            return False
        if not isinstance(data.setdefault("tasks", {}), dict):
            return False
        self.current_process = data
        self.current_process_file = name
        return True

    def _get_task(self, task_name: str):
        """Return the named task dict of the active process, or None if unavailable."""
        if not self.current_process:
            return None
        return self.current_process.get("tasks", {}).get(task_name)

    @staticmethod
    def _matching_keys(entries: dict, needle: str) -> list:
        """Return keys whose text contains ``needle`` or whose entry name equals it."""
        return [key for key, data in entries.items()
                if needle in key or needle == data.get("name")]

    def _add_entry(self, task: dict, bucket: str, entry: dict) -> None:
        """Store ``entry`` in ``task[bucket]`` under ``<name>_<timestamp>`` and save.

        One timestamp is used for both the key and ``created_at`` so the two
        always agree; a numeric suffix is appended if that key is already taken
        so an existing entry is never overwritten.
        """
        stamp = datetime.now().isoformat()
        entry["created_at"] = stamp
        store = task.setdefault(bucket, {})
        key = f"{entry['name']}_{stamp}"
        attempt = 1
        while key in store:
            attempt += 1
            key = f"{entry['name']}_{stamp}_{attempt}"
        store[key] = entry
        self._save_current()

    # ── Process tools ───────────────────────────────────────────────────────

    @tool()
    def process_new(self, name: str, description: str):
        """Creates a new process with the given name and description and makes it active.

        Fails if the name is empty or a process with that name already exists.
        """
        try:
            name = str(name).strip()
            if not name:
                return self._error("Process name cannot be empty.")
            if os.path.exists(self._get_filepath(name)):
                return self._error(f"Process '{name}' already exists.")

            self.current_process = {
                "process_name": name,
                "description": description,
                "created_at": datetime.now().isoformat(),
                "tasks": {}
            }
            self.current_process_file = name
            self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Process '{name}' created successfully.",
                "process": self.current_process
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def process_load(self, name: str):
        """Loads an existing process by name and makes it the active process.

        Fails if the name is empty, no such process exists, or its file is not
        a valid process file.
        """
        try:
            name = str(name).strip()
            if not name:
                return self._error("Process name cannot be empty.")
            if not os.path.exists(self._get_filepath(name)):
                return self._error(f"Process '{name}' not found.")
            if not self._load_process(name):
                # The file is there but unusable; say so instead of "not found".
                return self._error(
                    f"Process '{name}' exists but could not be read as a valid process file.")

            return json.dumps({
                "success": True,
                "message": f"Process '{name}' loaded successfully.",
                "process_name": self.current_process.get("process_name"),
                "task_count": len(self.current_process.get("tasks", {})),
                "process_data": self.current_process
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def process_list_all(self):
        """Lists all saved processes (sorted by name)."""
        try:
            # Strip only the trailing suffix; str.replace would also remove a
            # '.json' occurring in the middle of a name.
            names = sorted(
                f[:-len(self._SUFFIX)]
                for f in os.listdir(self.storage_dir)
                if f.endswith(self._SUFFIX)
            )
            process_list = [{"process": n} for n in names]
            return json.dumps({
                "success": True,
                "process_list": process_list,
                "total": len(process_list)
            })
        except Exception as e:
            return self._error(str(e), process_list=[])

    @tool()
    def process_save(self, name: str = None):
        """Saves the current process, optionally under a new name.

        Omit ``name`` (or pass an empty string) to save under the current name.
        When a new name is given the process is renamed and saved to the file
        for that name; the file saved under the previous name is left on disk.
        Fails if another process already uses the new name.
        """
        try:
            if not self.current_process:
                return self._error("No active process to save. Create or load a process first.")

            # None, "" and whitespace-only all mean "keep the current name".
            new_name = str(name).strip() if name else ""
            if new_name:
                new_path = self._get_filepath(new_name)
                old_path = (self._get_filepath(self.current_process_file)
                            if self.current_process_file else None)
                # Never silently overwrite a different process's file.
                if new_path != old_path and os.path.exists(new_path):
                    return self._error(f"Process '{new_name}' already exists.")

                previous = (self.current_process.get("process_name"),
                            self.current_process_file)
                self.current_process["process_name"] = new_name
                self.current_process_file = new_name
                try:
                    self._save_current()
                except Exception:
                    # Keep memory consistent with disk if the write failed.
                    self.current_process["process_name"], self.current_process_file = previous
                    raise
            else:
                self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Process saved successfully as '{self.current_process_file}'.",
                "process_name": self.current_process.get("process_name")
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def process_delete(self, name: str):
        """Deletes an entire process and its associated file from storage.
        If the deleted process is currently loaded, the in-memory state is cleared."""
        try:
            name = str(name).strip()
            if not name:
                return self._error("Process name cannot be empty.")

            filepath = self._get_filepath(name)
            if not os.path.exists(filepath):
                return self._error(f"Process '{name}' not found.")

            os.remove(filepath)

            # Compare resolved file paths so that e.g. "foo" and "foo.json",
            # which map to the same file, are recognised as the same process.
            if (self.current_process_file and
                    self._get_filepath(self.current_process_file) == filepath):
                self.current_process = None
                self.current_process_file = None

            return json.dumps({
                "success": True,
                "message": f"Process '{name}' deleted successfully.",
                "deleted_process": name
            })
        except Exception as e:
            return self._error(str(e))

    # ── Task tools ──────────────────────────────────────────────────────────

    @tool()
    def task_new(self, task_name: str, task_description: str):
        """Creates a new task within the current process.

        Fails if no process is active, the name is empty, or the task exists.
        """
        try:
            if not self.current_process:
                return self._error("No active process. Create or load one first.")
            if not str(task_name).strip():
                return self._error("Task name cannot be empty.")
            tasks = self.current_process.setdefault("tasks", {})
            if task_name in tasks:
                return self._error(f"Task '{task_name}' already exists.")

            tasks[task_name] = {
                "description": task_description,
                "created_at": datetime.now().isoformat(),
                "jobs": {},
                "code": {}
            }
            self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Task '{task_name}' created successfully.",
                "task": tasks[task_name]
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_list(self):
        """Lists all tasks with summary information (description, creation time, job and code counts)."""
        try:
            if not self.current_process:
                return self._error("No active process.")

            tasks_summary = {
                t_name: {
                    "description": t_data.get("description"),
                    "created_at": t_data.get("created_at"),
                    "job_count": len(t_data.get("jobs", {})),
                    "code_count": len(t_data.get("code", {}))
                }
                for t_name, t_data in self.current_process.get("tasks", {}).items()
            }

            return json.dumps({
                "success": True,
                "task_count": len(tasks_summary),
                "tasks": tasks_summary
            }, indent=2)
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_details_get(self, task_name: str):
        """Retrieves complete details of a specific task, including all its jobs and code snippets."""
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            jobs = task_data.get("jobs", {})
            codes = task_data.get("code", {})
            return json.dumps({
                "success": True,
                "task_name": task_name,
                "task_description": task_data.get("description"),
                "created_at": task_data.get("created_at"),
                "jobs": list(jobs.values()),
                "codes": list(codes.values()),
                "job_count": len(jobs),
                "code_count": len(codes)
            }, indent=2)
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_update_description(self, task_name: str, new_description: str):
        """Updates the description of an existing task."""
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            old_description = task_data.get("description")
            task_data["description"] = new_description
            self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Task '{task_name}' description updated successfully.",
                "task_name": task_name,
                "old_description": old_description,
                "new_description": new_description
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_name_change(self, old_task_name: str, new_task_name: str):
        """Renames an existing task.

        Fails if the old task does not exist, the new name is empty, or the
        new name is already taken.
        """
        try:
            if not self.current_process:
                return self._error("No active process.")
            tasks = self.current_process.get("tasks", {})
            if old_task_name not in tasks:
                return self._error(f"Task '{old_task_name}' does not exist.")
            if not str(new_task_name).strip():
                return self._error("Task name cannot be empty.")
            if new_task_name in tasks:
                return self._error(f"Task '{new_task_name}' already exists.")

            tasks[new_task_name] = tasks.pop(old_task_name)
            self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Task renamed from '{old_task_name}' to '{new_task_name}' successfully.",
                "old_name": old_task_name,
                "new_name": new_task_name
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_delete(self, task_name: str):
        """Deletes a task (with its jobs and code snippets) from the current process."""
        try:
            if self._get_task(task_name) is None:
                return self._error(f"Task '{task_name}' does not exist.")

            del self.current_process["tasks"][task_name]
            self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Task '{task_name}' deleted successfully."
            })
        except Exception as e:
            return self._error(str(e))

    # ── Job tools ───────────────────────────────────────────────────────────

    @tool()
    def task_job_add(self, task_name: str, job_name: str, job_description: str):
        """Adds a job to the specified task."""
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            self._add_entry(task_data, "jobs",
                            {"name": job_name, "description": job_description})

            return json.dumps({
                "success": True,
                "message": f"Job '{job_name}' added to task '{task_name}' successfully."
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_job_get(self, task_name: str, job_name: str):
        """Retrieves job(s) matching the given job_name (partial match)."""
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            jobs = task_data.get("jobs", {})
            matching = [jobs[k] for k in self._matching_keys(jobs, job_name)]

            return json.dumps({
                "success": True,
                "task_name": task_name,
                "jobs": matching
            }, indent=2)
        except Exception as e:
            return self._error(str(e))

    # ── Code tools ──────────────────────────────────────────────────────────

    @tool()
    def task_code_add(self, task_name: str, code_name: str, code_content: str):
        """Adds a code snippet to the specified task."""
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            self._add_entry(task_data, "code",
                            {"name": code_name, "content": code_content})

            return json.dumps({
                "success": True,
                "message": f"Code '{code_name}' added to task '{task_name}' successfully."
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_code_get(self, task_name: str, code_name: str):
        """Retrieves code snippet(s) matching the given code_name (partial match)."""
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            codes = task_data.get("code", {})
            matching = [codes[k] for k in self._matching_keys(codes, code_name)]

            return json.dumps({
                "success": True,
                "task_name": task_name,
                "codes": matching
            }, indent=2)
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_code_delete(self, task_name: str, code_name: str):
        """Deletes code snippet(s) matching the given code_name (partial match).

        An empty code_name is rejected, because it would match every snippet.
        """
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")
            if not str(code_name).strip():
                # "" is a substring of every key, so it would delete everything.
                return self._error("Code name cannot be empty.")

            codes = task_data.get("code", {})
            keys_to_delete = self._matching_keys(codes, code_name)

            if not keys_to_delete:
                no_match = f"No code matching '{code_name}' found in task '{task_name}'."
                # "message" is kept alongside "error" for existing consumers.
                return self._error(no_match, message=no_match,
                                   task_name=task_name, code_name=code_name)

            deleted = [codes.pop(k) for k in keys_to_delete]
            self._save_current()

            return json.dumps({
                "success": True,
                "message": f"Deleted {len(deleted)} code snippet(s) matching '{code_name}'.",
                "task_name": task_name,
                "deleted_count": len(deleted),
                "deleted_codes": deleted
            })
        except Exception as e:
            return self._error(str(e))

    @tool()
    def task_jobs_list(self):
        """Lists all tasks (identical to task_list for compatibility)."""
        return self.task_list()

    @tool()
    def task_code_update(self, task_name: str, code_name: str, new_content: str):
        """Updates (by creating a new version of) a code snippet.

        Earlier versions stored under the same code_name are kept.
        """
        try:
            task_data = self._get_task(task_name)
            if task_data is None:
                return self._error(f"Task '{task_name}' does not exist.")

            self._add_entry(task_data, "code",
                            {"name": code_name, "content": new_content})

            return json.dumps({
                "success": True,
                "message": f"Code '{code_name}' updated successfully in task '{task_name}'.",
                "new_version_created": True
            })
        except Exception as e:
            return self._error(str(e))


# Registration remains unchanged (place after class definition)
# task_manager = TaskManager()
# mcp.add_tool(task_manager.process_new)
# ... (add all other methods as before)



# ── Create the manager and register its bound methods as tools ──────────────
# Registration goes through the instance so each tool is bound to
# task_manager; the tuple keeps the original registration order.
task_manager = TaskManager()
for _tool_name in (
    "process_new",
    "process_load",
    "process_list_all",
    "process_save",
    "process_delete",
    "task_new",
    "task_update_description",
    "task_delete",
    "task_name_change",
    "task_job_add",
    "task_job_get",
    "task_details_get",
    "task_list",
    "task_code_add",
    "task_code_get",
    "task_code_delete",
    "task_jobs_list",
    "task_code_update",
):
    mcp.add_tool(getattr(task_manager, _tool_name))
del _tool_name  # leave no loop variable behind in the module namespace

# ── Server Startup with CORS (required for llama.cpp frontend) ───────────────
# ── Server Startup with CORS (required for llama.cpp frontend) ───────────────
if __name__ == "__main__":
    middleware = [
        Middleware(
            CORSMiddleware,
            allow_origins=["*"],  # Restrict in production
            # Must be a real boolean; the string "True" only worked by being truthy.
            allow_credentials=True,
            # DELETE is included because MCP's streamable HTTP transport uses
            # it to end a session.
            allow_methods=["GET", "POST", "DELETE", "OPTIONS"],
            allow_headers=["*"],
            # Browsers do not treat "*" as a wildcard on credentialed requests,
            # so the MCP session header is also listed by name.
            expose_headers=["*", "Mcp-Session-Id"],
        )
    ]

    # ASGI app serving the MCP endpoint at /mcp with the CORS middleware applied.
    app = mcp.http_app(
        path="/mcp",
        middleware=middleware
    )

    # Listen on all interfaces so the server is reachable from outside a container.
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=5008,
        log_level="info"
    )

Once it is stood up it will listen on port 5008.  You can add it to your Llama server as in:

Once added it will look as:

Testing.  Of particular interest: testing cycles are now not really done by you - they are done by your LLM.  Some important points:

  • Each LLM may like its agentic tool structured differently; the best way is to simply paste the MCP code into the LLM and ask it to audit the code and/or the docstrings.

Naturally, when this is done you will need to dockerize it and let it live happily on port 5008; a simple walkthrough follows.

Dockerfile, requirements.txt, and docker-compose.yml

The following files have been prepared specifically for the provided TaskManager + FastMCP application. They follow industry best practices for security, reproducibility, and data persistence.

Save the entire Python code you supplied as app.py in your project root directory.


1. requirements.txt

fastmcp
starlette
uvicorn[standard]

This installs:

  • fastmcp (the core MCP framework)
  • starlette (explicitly for Middleware and CORSMiddleware)
  • uvicorn[standard] (ASGI server with recommended extras for production)

2. Dockerfile

# syntax=docker/dockerfile:1.4

# ── Stage 1: install Python dependencies ─────────────────────────────────────
FROM python:3.12-slim AS builder

# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1

WORKDIR /app

# Install build dependencies (compiler for any package without a prebuilt wheel)
RUN apt-get update && apt-get install -y --no-install-recommends \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy and install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip && \
    pip install --no-cache-dir -r requirements.txt

# ── Stage 2: runtime image (no compiler, non-root) ───────────────────────────
FROM python:3.12-slim

# Create non-root user for security
RUN useradd --create-home --shell /bin/false appuser

WORKDIR /app

# Copy installed packages from builder
COPY --from=builder /usr/local/lib/python3.12/site-packages /usr/local/lib/python3.12/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy application code
COPY app.py .

# Create persistent storage directory and set permissions
RUN mkdir -p /app/processes && \
    chown -R appuser:appuser /app

# Switch to non-root user
USER appuser

EXPOSE 5008

# Health check: succeeds when the server accepts a TCP connection on port 5008.
# This stage does not install curl and app.py registers no dedicated health
# route, so the probe uses the Python interpreter already in the image.
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
    CMD ["python", "-c", "import socket; socket.create_connection(('127.0.0.1', 5008), timeout=3).close()"]

CMD ["python", "app.py"]

3. docker-compose.yml

# The top-level `version` key is obsolete in the Compose Specification and
# only produces a warning with `docker compose`, so it is omitted.

services:
  process-manager:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: process-manager
    restart: unless-stopped
    ports:
      # host:container - the MCP server listens on 5008 inside the container
      - "5008:5008"
    volumes:
      # Persistent storage for process JSON files
      - process-data:/app/processes
    environment:
      - PYTHONUNBUFFERED=1
    # Optional: limit resources in production
    # deploy:
    #   resources:
    #     limits:
    #       cpus: '1.0'
    #       memory: 512M

volumes:
  # Named volume so the saved processes survive container re-creation
  process-data:
    driver: local
    name: process-manager-data

Deployment Instructions

  1. Place the three files and app.py in the same directory.
  2. Build and start the service:
docker compose up -d --build
  3. Verify the server is running:
docker compose logs -f process-manager

The MCP endpoint will be available at http://localhost:5008/mcp.

Key Benefits of This Setup

  • Secure (non-root user, minimal base image)
  • Persistent data (the processes/ directory survives container restarts)
  • Fast rebuilds (multi-stage build)
  • Production-ready (healthcheck, restart policy, resource isolation)

If you require additional features (e.g., .dockerignore, environment variables, HTTPS with Caddy/Traefik, or multi-container setup), please provide further details.

"We need a front end for this process manager write the task and go through each task writing all the code."

How far do you think it will get?

Linux Rocks Every Day