| | Queue + Poll | WebSocket | Serverless endpoint |
|---|---|---|---|
| How it works | POST /prompt → poll /history | ws:// live progress events | Managed wrapper (RunPod, fal.ai) |
| Latency model | Async - client polls every N seconds | Real-time - progress as it happens | Async or sync depending on provider |
| Best for | Backend batch jobs, queued pipelines | UI progress bars, real-time products | Zero-infra API access |
| Timeout risk | Low - polling retries on network failure | High - a dropped connection loses the progress stream (the job keeps running; fall back to /history) | Low - provider handles retry |
| Complexity | Low - plain HTTP | Medium - WebSocket state management | Low - REST only |
ComfyUI was not designed as a production API. It exposes a WebSocket interface for real-time progress and an HTTP endpoint for submitting workflows. Wrapping this into a reliable production service requires understanding the queue model, how to poll for results, and how to handle failures. This guide documents the actual API - derived from ComfyUI's server.py source (github.com/comfyanonymous/ComfyUI/blob/master/server.py).
The Endpoints You Actually Need
If you are calling a self-hosted ComfyUI instance, think in two layers: ComfyUI itself handles image generation, and your production API layer handles authentication, queuing, retries, and storage. If you are calling a managed provider, keep the provider boundary in mind because the HTTP shape may differ.
- POST /prompt - submit a full workflow JSON. Return fields include prompt_id, number, and node_errors. Treat number as internal ordering metadata, not as a durable queue position.
- GET /history/{prompt_id} - poll for completion and read the outputs object once the job finishes.
- GET /view - download a generated file by filename, subfolder, and type.
- GET /queue - inspect queue_running and queue_pending. This is the backlog signal.
- GET /system_stats - check RAM, VRAM, and device metadata for health checks.
- GET /object_info - discover node schemas, required inputs, and available node types.
Important: ComfyUI does not expose a simple {"prompt":"a cat"} text API. You submit the full node graph and update the nodes you care about before sending it.
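If you want to confirm a node's exact input schema before hardcoding it, /object_info is the place to look. A small sketch - the per-class path /object_info/{class} is assumed from ComfyUI's server routes; if your build lacks it, fetch the full /object_info dump and index into it:
import httpx  # pip install httpx

COMFYUI_URL = "http://localhost:8188"

def get_node_schema(class_type: str) -> dict:
    """Fetch one node's schema. Assumes the per-class route
    /object_info/{class}; fall back to GET /object_info if it is missing."""
    resp = httpx.get(f"{COMFYUI_URL}/object_info/{class_type}")
    resp.raise_for_status()
    return resp.json()[class_type]

# Usage: list KSampler's required inputs and their type specs
schema = get_node_schema("KSampler")
for name, spec in schema["input"]["required"].items():
    print(name, spec)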
Quickstart: Smoke Test the API
Before building anything around ComfyUI, verify the API answers on a fresh host:
curl -fsS http://localhost:8188/system_stats
curl -fsS http://localhost:8188/queue
curl -fsS http://localhost:8188/object_info
curl -fsS -X POST http://localhost:8188/prompt \
-H 'Content-Type: application/json' \
-d @workflow_api.json
If the POST returns a prompt_id and the queue endpoint updates, your ComfyUI instance is reachable. If /system_stats works but /prompt fails, the workflow JSON is invalid or a required node input is missing.
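When /prompt rejects a workflow, the response body usually explains why. A quick diagnostic sketch - the error/node_errors shape shown here matches observed ComfyUI validation responses, but treat the exact fields as version-dependent:
import json
import httpx

COMFYUI_URL = "http://localhost:8188"

with open("workflow_api.json") as f:
    workflow = json.load(f)

resp = httpx.post(f"{COMFYUI_URL}/prompt", json={"prompt": workflow})
if resp.status_code != 200:
    body = resp.json()
    # Assumed shape: top-level "error" plus "node_errors" keyed by node id
    print("Rejected:", body.get("error"))
    for node_id, err in body.get("node_errors", {}).items():
        print(f"  node {node_id}: {err}")
else:
    print("Queued:", resp.json()["prompt_id"])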
Exporting a Workflow as API JSON
In ComfyUI's web UI, use "Save (API Format)" to export the workflow in the format the API expects (keyboard shortcut Ctrl+Shift+S; in older frontends, enable the Dev Mode toggle in Settings first so the button appears). This format differs from the default save format - it flattens the node graph into a plain object keyed by node ID strings, with no UI layout data.
A minimal text-to-image workflow (load checkpoint → encode text → sample → decode → save) in API format:
{
"1": {
"class_type": "CheckpointLoaderSimple",
"_meta": { "title": "Load Checkpoint" },
"inputs": {
"ckpt_name": "sd_xl_base_1.0.safetensors"
}
},
"2": {
"class_type": "CLIPTextEncode",
"_meta": { "title": "Positive Prompt" },
"inputs": {
"clip": ["1", 1],
"text": "a photograph of an astronaut riding a horse"
}
},
"3": {
"class_type": "CLIPTextEncode",
"_meta": { "title": "Negative Prompt" },
"inputs": {
"clip": ["1", 1],
"text": ""
}
},
"4": {
"class_type": "EmptyLatentImage",
"_meta": { "title": "Empty Latent Image" },
"inputs": {
"width": 1024,
"height": 1024,
"batch_size": 1
}
},
"5": {
"class_type": "VAEDecode",
"_meta": { "title": "VAE Decode" },
"inputs": {
"samples": ["6", 0],
"vae": ["1", 2]
}
},
"6": {
"class_type": "KSampler",
"_meta": { "title": "KSampler" },
"inputs": {
"model": ["1", 0],
"positive": ["2", 0],
"negative": ["3", 0],
"latent_image": ["4", 0],
"seed": 42,
"steps": 20,
"cfg": 7.0,
"sampler_name": "euler",
"scheduler": "normal",
"denoise": 1.0
}
},
"7": {
"class_type": "SaveImage",
"_meta": { "title": "Save Image" },
"inputs": {
"images": ["5", 0],
"filename_prefix": "output"
}
}
}
Node inputs can be either literal values (strings, numbers) or references to other nodes' outputs, written as a two-element array: ["node_id", output_index]. For example, "model": ["1", 0] means "take output index 0 from node 1" - the model tensor output from CheckpointLoaderSimple. Output indices are documented in each node's class definition.
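Since a dangling reference only surfaces at submission time, a client-side sanity check can save a round trip. A minimal sketch that verifies every reference points at a node that actually exists in the workflow dict:
def validate_references(workflow: dict) -> list[str]:
    """Return a list of inputs that reference nonexistent nodes."""
    problems = []
    for node_id, node in workflow.items():
        for input_name, value in node.get("inputs", {}).items():
            # A node reference is a two-element ["node_id", output_index] list
            if isinstance(value, list) and len(value) == 2 and isinstance(value[0], str):
                ref_id, _output_index = value
                if ref_id not in workflow:
                    problems.append(
                        f"node {node_id} input '{input_name}' references missing node {ref_id}"
                    )
    return problems

# Usage
import json
with open("workflow_api.json") as f:
    wf = json.load(f)
for problem in validate_references(wf):
    print(problem)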
Pattern 1: Queue and Poll
The simplest and most robust production pattern. Submit the workflow, receive a prompt_id, then poll GET /history/{prompt_id} until the job appears in the response. No persistent connection required - works across load balancers and process restarts.
import uuid
import time
import httpx # pip install httpx (or use requests)
COMFYUI_URL = "http://localhost:8188"
def run_workflow(workflow: dict) -> dict:
"""Submit a workflow and block until complete. Returns the history entry."""
client_id = str(uuid.uuid4())
# Submit the workflow
resp = httpx.post(f"{COMFYUI_URL}/prompt", json={
"prompt": workflow,
"client_id": client_id,
})
resp.raise_for_status()
data = resp.json()
if data.get("node_errors"):
raise ValueError(f"Workflow has node errors: {data['node_errors']}")
prompt_id = data["prompt_id"]
print(f"Submitted: prompt_id={prompt_id}, queue_position={data['number']}")
# Poll until complete (history endpoint returns {} while pending)
while True:
history = httpx.get(f"{COMFYUI_URL}/history/{prompt_id}").json()
if prompt_id in history:
return history[prompt_id]
time.sleep(0.5)
def get_output_image(filename: str, subfolder: str = "") -> bytes:
"""Fetch a generated image by filename. Returns raw bytes."""
resp = httpx.get(f"{COMFYUI_URL}/view", params={
"filename": filename,
"type": "output",
"subfolder": subfolder,
})
resp.raise_for_status()
return resp.content
def extract_images(history_entry: dict) -> list[dict]:
"""Pull image references from a completed history entry."""
images = []
for node_id, node_output in history_entry.get("outputs", {}).items():
for img in node_output.get("images", []):
images.append(img)
return images
# Usage
if __name__ == "__main__":
import json
with open("workflow_api.json") as f:
workflow = json.load(f)
result = run_workflow(workflow)
for img in extract_images(result):
image_bytes = get_output_image(img["filename"], img.get("subfolder", ""))
with open(img["filename"], "wb") as f:
f.write(image_bytes)
print(f"Saved: {img['filename']}")Node.js Quickstart
If your integration is in TypeScript or Node.js, the request flow is identical: submit the workflow, poll /history, then fetch bytes from /view.
const COMFYUI_URL = 'http://localhost:8188'
async function submitWorkflow(workflow: unknown) {
const response = await fetch(`${COMFYUI_URL}/prompt`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
prompt: workflow,
client_id: crypto.randomUUID(),
}),
})
if (!response.ok) throw new Error(`Prompt submit failed: ${response.status}`)
return response.json()
}
async function waitForResult(promptId: string) {
for (;;) {
const response = await fetch(`${COMFYUI_URL}/history/${promptId}`)
const history = await response.json()
if (history[promptId]) return history[promptId]
await new Promise((resolve) => setTimeout(resolve, 500))
}
}
Pattern 2: WebSocket for Real-Time Progress
For UI applications or server-side progress tracking, connect via WebSocket before submitting the workflow. The WebSocket stream delivers node-level execution events, step-level progress updates, and image previews mid-generation.
import uuid
import json
import httpx
import websocket # pip install websocket-client
COMFYUI_URL = "http://localhost:8188"
COMFYUI_WS = "ws://localhost:8188"
def run_with_progress(workflow: dict) -> list[dict]:
"""Submit a workflow and stream progress. Returns list of output image refs."""
client_id = str(uuid.uuid4())
ws = websocket.WebSocket()
ws.connect(f"{COMFYUI_WS}/ws?clientId={client_id}")
# Submit after connecting (so we don't miss any events)
resp = httpx.post(f"{COMFYUI_URL}/prompt", json={
"prompt": workflow,
"client_id": client_id,
})
resp.raise_for_status()
prompt_id = resp.json()["prompt_id"]
print(f"Submitted: {prompt_id}")
output_images = []
# Listen for events until execution is complete
while True:
raw = ws.recv()
if isinstance(raw, bytes):
# Binary messages are image previews - skip or handle separately
continue
msg = json.loads(raw)
msg_type = msg.get("type")
data = msg.get("data", {})
if msg_type == "status":
queue_remaining = data.get("status", {}).get("exec_info", {}).get("queue_remaining", "?")
print(f"Queue remaining: {queue_remaining}")
elif msg_type == "executing":
if data.get("prompt_id") != prompt_id:
continue # event for a different job
if data.get("node") is None:
# node=None signals that this prompt finished executing
print("Execution complete.")
break
print(f"Executing node: {data['node']}")
elif msg_type == "progress":
if data.get("prompt_id") != prompt_id:
continue
print(f" Step {data['value']}/{data['max']}")
elif msg_type == "executed":
if data.get("prompt_id") != prompt_id:
continue
images = data.get("output", {}).get("images", [])
output_images.extend(images)
for img in images:
print(f" Output ready: {img['filename']}")
ws.close()
return output_images
Parameterizing Prompts
To use the same workflow template with different prompts, load the exported API JSON and modify the target node inputs before submission. The reliable approach is to identify nodes by their class_type or by the _meta.title field set in the UI.
import json
import copy
def load_workflow(path: str) -> dict:
with open(path) as f:
return json.load(f)
def set_prompt(
workflow: dict,
positive_prompt: str,
negative_prompt: str,
seed: int | None = None,
) -> dict:
"""Return a new workflow with updated prompts and optional seed."""
wf = copy.deepcopy(workflow) # never mutate the original template
for node_id, node in wf.items():
class_type = node.get("class_type", "")
title = node.get("_meta", {}).get("title", "").lower()
if class_type == "CLIPTextEncode":
if "neg" in title or "negative" in title:
node["inputs"]["text"] = negative_prompt
else:
node["inputs"]["text"] = positive_prompt
if seed is not None and class_type == "KSampler":
node["inputs"]["seed"] = seed
return wf
def set_checkpoint(workflow: dict, checkpoint_name: str) -> dict:
"""Swap the checkpoint in a workflow."""
wf = copy.deepcopy(workflow)
for node in wf.values():
if node.get("class_type") == "CheckpointLoaderSimple":
node["inputs"]["ckpt_name"] = checkpoint_name
return wf
# Usage
if __name__ == "__main__":
import random
from comfyui_client import run_workflow, get_output_image, extract_images  # the Pattern 1 helpers, saved as comfyui_client.py
template = load_workflow("workflow_api.json")
workflow = set_prompt(
template,
positive_prompt="a high-resolution photograph of a red sports car, studio lighting",
negative_prompt="blurry, low quality, watermark",
seed=random.randint(0, 2**32 - 1),
)
result = run_workflow(workflow)
for img in extract_images(result):
data = get_output_image(img["filename"])
with open(f"out_{img['filename']}", "wb") as f:
f.write(data)
Authentication: Securing Your ComfyUI Endpoint
ComfyUI has no built-in authentication. When you start it with --listen, the API is publicly reachable by anyone who can reach that port. In production, never expose port 8188 directly - put a reverse proxy in front of it.
Nginx with X-API-Key
The cleanest pattern: bind ComfyUI to localhost only, then let Nginx handle TLS termination and API key validation. Clients send X-API-Key in every request; Nginx rejects anything that does not match.
server {
listen 443 ssl;
server_name api.yoursite.com;
ssl_certificate /etc/letsencrypt/live/api.yoursite.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/api.yoursite.com/privkey.pem;
location / {
# Reject requests without a valid API key
if ($http_x_api_key != "your-secret-key-here") {
return 401 "Unauthorized";
}
proxy_pass http://127.0.0.1:8188;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
}
}
Start ComfyUI with --listen 127.0.0.1 (not 0.0.0.0) so it binds to loopback only. The Docker equivalent is -p 127.0.0.1:8188:8188 - without the 127.0.0.1 prefix, Docker bypasses your host firewall entirely.
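From the client side, every request then carries the key. A sketch against the proxied endpoint - the hostname and environment variable are placeholders:
import os
import httpx

API_URL = "https://api.yoursite.com"  # the Nginx proxy, never ComfyUI directly
API_KEY = os.environ["COMFYUI_API_KEY"]  # placeholder env var - never hardcode keys

# Attach the header once; it rides along on every request
client = httpx.Client(headers={"X-API-Key": API_KEY})

resp = client.get(f"{API_URL}/system_stats")
resp.raise_for_status()  # a 401 here means the key does not match Nginx's
print(resp.json())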
Error Handling and Retries
The POST /prompt response includes a node_errors field. If any node in your workflow has an invalid input, the execution is rejected before it queues and the errors are described per node_id. Check this field immediately - a non-empty object means the job will never run.
import time, requests
def queue_with_retry(prompt: dict, api_url: str, api_key: str, max_retries=8):
headers = {"X-API-Key": api_key, "Content-Type": "application/json"}
# Submit
resp = requests.post(f"{api_url}/prompt",
json={"prompt": prompt}, headers=headers)
resp.raise_for_status()
data = resp.json()
# Reject if workflow has validation errors
if data.get("node_errors"):
raise ValueError(f"Workflow errors: {data['node_errors']}")
prompt_id = data["prompt_id"]
# Poll with exponential backoff
for attempt in range(max_retries):
time.sleep(2 ** attempt)  # 1s, 2s, 4s, ... roughly four minutes total at max_retries=8
h = requests.get(f"{api_url}/history/{prompt_id}", headers=headers).json()
if prompt_id in h:
return h[prompt_id] # job complete
raise TimeoutError(f"Job {prompt_id} did not complete after {max_retries} polls")
Over the WebSocket, listen for the execution_error message type - it fires when a node crashes mid-execution and includes the exception traceback. If you see it, the history entry for that prompt_id will show an empty outputs dict and an error status instead of results.
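In the Pattern 2 receive loop, that means one more branch. A sketch of the handler - field names like node_id, exception_message, and traceback match ComfyUI's execution_error payload, but treat the exact shape as version-dependent:
def handle_execution_error(msg: dict, prompt_id: str) -> None:
    """Extra branch for the Pattern 2 loop: call when msg["type"] == "execution_error"."""
    data = msg.get("data", {})
    if data.get("prompt_id") != prompt_id:
        return  # error belongs to a different job
    # Assumed payload fields; log the raw message if a key is missing
    print(f"Node {data.get('node_id')} ({data.get('node_type')}) failed:")
    print(data.get("exception_message", "<no message>"))
    for line in data.get("traceback", []):
        print(line, end="")
    raise RuntimeError(f"Workflow failed at node {data.get('node_id')}")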
Retrieving Output Images
When a job completes, the history response contains an outputs dict keyed by node_id. Each SaveImage node produces a list of { filename, subfolder, type } objects. Fetch the actual bytes with GET /view passing those three fields as query parameters.
def download_outputs(prompt_id: str, api_url: str, api_key: str) -> list[bytes]:
headers = {"X-API-Key": api_key}
history = requests.get(f"{api_url}/history/{prompt_id}", headers=headers).json()
images = []
for node_output in history[prompt_id]["outputs"].values():
for img in node_output.get("images", []):
params = {
"filename": img["filename"],
"subfolder": img["subfolder"],
"type": img["type"],
}
r = requests.get(f"{api_url}/view", params=params, headers=headers)
r.raise_for_status()
images.append(r.content) # raw JPEG or PNG bytes
return images
Don't serve ComfyUI's output/ directory directly - it accumulates files from all jobs and has no per-tenant isolation. The right pattern: download bytes via /view immediately after completion, upload to object storage (S3, R2, GCS) with a job-scoped key, return the signed URL to your client, then let a cleanup job wipe output/ on a schedule.
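A sketch of that hand-off with boto3 - the bucket name is a placeholder, and R2 or GCS work the same way through their S3-compatible endpoints:
import boto3  # pip install boto3

s3 = boto3.client("s3")
BUCKET = "my-generated-images"  # placeholder bucket name

def store_output(image_bytes: bytes, job_id: str, filename: str) -> str:
    """Upload one output under a job-scoped key and return a signed URL."""
    key = f"jobs/{job_id}/{filename}"
    s3.put_object(Bucket=BUCKET, Key=key, Body=image_bytes, ContentType="image/png")
    # Signed URL expires in 1 hour - tune ExpiresIn to your client's needs
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": BUCKET, "Key": key},
        ExpiresIn=3600,
    )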
Production Architecture: One GPU Per Worker
ComfyUI loads models into VRAM and keeps them resident. Concurrent executions on the same instance do not parallelize - they queue. The production pattern that actually scales: one ComfyUI instance per GPU, fronted by a job queue.
- Your API server receives the request, pushes a job to Redis or SQS, returns a job_id immediately.
- Worker pool: each worker owns one GPU, runs one ComfyUI instance on localhost, polls the queue. No shared state between workers.
- Output storage: worker downloads from /view, uploads to S3, writes the signed URL back to the job record.
- Status endpoint: your API server reads job status from the queue/DB. Clients poll or receive a webhook - they never talk to ComfyUI directly.
This architecture lets you scale horizontally by adding GPUs, roll workers independently, and avoid the authentication problem entirely - ComfyUI never faces the public internet. For cold start latency benchmarks across GPU providers, see the GPU Cold Start Benchmarks article.
What ComfyUI's API Cannot Do (Yet)
- No request authentication built in - all requests to /prompt are accepted without credentials (see the Docker guide for nginx auth)
- No per-request rate limiting - a client can flood the queue with no throttling
- No webhook callbacks - you must actively poll GET /history or maintain a persistent WebSocket connection to know when a job finishes
- No typed input validation - a malformed workflow JSON (wrong node class, missing required input, bad reference) causes a silent failure or a hard crash depending on the node implementation
- No horizontal scaling built in - one ComfyUI process handles one inference job at a time; additional jobs queue internally
- No built-in job persistence - if ComfyUI crashes or is restarted, the queue is lost; submitted jobs that have not started must be resubmitted
Running Multiple Jobs in Parallel
ComfyUI processes one workflow at a time on a single GPU. Additional submissions queue up internally - there is no parallel execution within a single instance. To scale throughput for production, run multiple ComfyUI instances on separate GPUs (or separate machines) and route incoming requests with a load balancer.
The GET /queue endpoint returns the current state of the queue: { "queue_running": [...], "queue_pending": [...] }. The length of queue_pending is the backlog. A simple routing strategy: submit to the instance with the shortest queue_pending. This gives you horizontal scaling with minimal infrastructure - no distributed queue needed if your workloads are GPU-bound and roughly equal in duration.
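A sketch of that routing decision - the worker hostnames are placeholders:
import httpx

INSTANCES = ["http://gpu-0:8188", "http://gpu-1:8188"]  # placeholder hosts

def pick_instance() -> str:
    """Return the instance with the shortest pending queue."""
    def backlog(url: str) -> int:
        try:
            q = httpx.get(f"{url}/queue", timeout=2.0).json()
            return len(q.get("queue_pending", []))
        except httpx.HTTPError:
            return 10**9  # unreachable worker: treat as infinitely backed up
    return min(INSTANCES, key=backlog)

# Usage: submit to the least-loaded instance
target = pick_instance()
# httpx.post(f"{target}/prompt", json={"prompt": workflow, "client_id": client_id})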
For more reliable production deployments, add a proper job queue layer (Redis + BullMQ, Celery, or a managed queue service) that survives ComfyUI restarts and retries failed jobs automatically.
Persistent Job Queue: Surviving ComfyUI Restarts
ComfyUI's internal queue is in-memory. If the process crashes or is restarted, all pending jobs are lost. For production, add a persistent queue layer in front of ComfyUI - your API server pushes jobs there, workers pull them and call ComfyUI, and the queue survives restarts automatically.
The minimal stack: Redis + BullMQ (Node.js) or Celery + Redis (Python). Below is the production pattern with BullMQ:
import { Queue, Worker } from 'bullmq'
import Redis from 'ioredis'
const connection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379')
// --- API side: enqueue and return job_id immediately ---
const imageQueue = new Queue('image-generation', { connection })
export async function enqueueJob(workflow: unknown) {
const job = await imageQueue.add('generate', { workflow }, {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },
})
return job.id // return this to the caller for status polling
}
// --- Worker side: one worker per GPU, calls local ComfyUI ---
const COMFYUI_URL = process.env.COMFYUI_URL || 'http://localhost:8188'
const worker = new Worker('image-generation', async (job) => {
const { workflow } = job.data
const res = await fetch(`${COMFYUI_URL}/prompt`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ prompt: workflow, client_id: job.id }),
})
const { prompt_id } = await res.json()
// Poll /history until complete (max 240s)
for (let i = 0; i < 120; i++) {
await new Promise(r => setTimeout(r, 2000))
const hist = await fetch(`${COMFYUI_URL}/history/${prompt_id}`).then(r => r.json())
if (hist[prompt_id]) {
return await uploadOutputsToS3(hist[prompt_id].outputs, job.id)
}
}
throw new Error('Timeout: job did not complete in 240s')
}, {
connection,
concurrency: 1, // one job at a time per GPU worker
})
worker.on('failed', (job, err) => {
console.error(JSON.stringify({ job_id: job?.id, error: err.message }))
})
Key properties: attempts: 3 retries failed jobs with exponential backoff, concurrency: 1 ensures one ComfyUI job at a time per worker, and Redis persists queue state across restarts - if a worker crashes mid-job, BullMQ re-queues it automatically after the lock expires. Note that uploadOutputsToS3 is your own storage helper, not a BullMQ or ComfyUI API - see the /view download pattern above.
Observability: What to Monitor in Production
ComfyUI emits no metrics by default. You need to instrument the layer around it. These are the signals that matter:
Queue Metrics
- Queue depth (queue_pending length): alert if > 5 for more than 60s - means workers are backed up or dead
- Job wait time (enqueued → worker picks up): p95 > 30s means you need another GPU worker
- Job duration (worker picks up → result ready): track p50 and p95 per model; SDXL 30-step baseline is ~8s on an A100
- Error rate: failed / total jobs per 5-minute window; alert if > 5%
GPU and System Metrics
- VRAM utilization: alert if > 90% - you are about to OOM
- GPU utilization %: consistent 0% while queue is non-empty = worker is stuck or model failed to load
- Host RAM: ComfyUI can OOM the host during model swaps if RAM is undersized for your checkpoint size
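Both the queue and GPU signal groups can be scraped from ComfyUI's own endpoints, so a sidecar poller covers them without touching the ComfyUI process. A sketch - the vram_total and vram_free field names are taken from current /system_stats device entries; verify against your version:
import time
import httpx

COMFYUI_URL = "http://localhost:8188"

def poll_metrics() -> dict:
    """Scrape queue depth and VRAM usage from ComfyUI's endpoints."""
    queue = httpx.get(f"{COMFYUI_URL}/queue").json()
    stats = httpx.get(f"{COMFYUI_URL}/system_stats").json()
    device = stats["devices"][0]  # first GPU; field names assumed below
    vram_used = device["vram_total"] - device["vram_free"]
    return {
        "queue_pending": len(queue.get("queue_pending", [])),
        "queue_running": len(queue.get("queue_running", [])),
        "vram_used_pct": round(100 * vram_used / device["vram_total"], 1),
    }

# Emit one line every 15s; ship it through the same pipeline as your job logs
while True:
    print(poll_metrics(), flush=True)
    time.sleep(15)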
Structured Logging
Emit one JSON line per job phase. Ship to Loki, Datadog, or any log aggregator. This covers all queue metrics without a separate Prometheus stack:
import json, time
def log_job(prompt_id: str, status: str, duration_ms: int, model: str, error: str | None = None):
"""status: queued | running | done | failed"""
print(json.dumps({
"ts": time.time(),
"prompt_id": prompt_id,
"status": status,
"duration_ms": duration_ms,
"model": model,
"error": error,
}), flush=True)
Minimum Viable Alert Rules
- queue_pending > 5 for 2 minutes → worker is down or GPU OOM - page immediately
- error_rate > 10% over 5 minutes → check ComfyUI logs for a bad workflow or missing model
- job_duration p95 > 3× baseline → GPU throttling or node graph changed unexpectedly
- VRAM > 95% sustained → trigger graceful drain before the process crashes
For a single GPU worker, Grafana + Prometheus is overkill at the start. Structured JSON logs with one alert per signal covers all of the above with minimal infra.