Architecture#

SGLang-Omni is a multi-stage pipeline framework for omni models β€” models with multimodal inputs (text, audio, images, video) and multimodal outputs. It decomposes model inference into specialized stages that can be independently scaled and deployed across heterogeneous hardware.

Design Principles#

  • Multi-stage decomposition: Omni models contain components (encoders, decoders, thinkers) with different compute profiles. SGLang-Omni lets each stage run independently with its own resources.

  • Async-first: The entire pipeline is built on Python asyncio for concurrent request handling and non-blocking data transfer.

  • Pluggable data relay: Stages communicate via a relay abstraction that supports shared memory, NCCL, Nixl (RDMA), and Mooncake backends β€” from single-machine to multi-node deployments.

  • Configuration-driven: Pipelines are declared as config objects, compiled into runtimes, and launched by a runner. No boilerplate wiring code needed.

System Overview#

        graph TB
    subgraph API["API Layer"]
        OAI["OpenAI-Compatible Server<br/>(FastAPI + Uvicorn)"]
        Client["Client"]
    end

    subgraph Orchestration["Pipeline Orchestration"]
        Coord["Coordinator"]
        S1["Stage 1"]
        S2["Stage 2"]
        SN["Stage N"]
    end

    subgraph Compute["Compute Layer"]
        W1["Workers + Executors"]
        W2["Workers + Executors"]
        WN["Workers + Executors"]
        E1["Engine<br/>(Scheduler + ModelRunner)"]
        E2["Engine<br/>(Scheduler + ModelRunner)"]
    end

    subgraph Transport["Data Transport"]
        ZMQ["ZMQ Control Plane<br/>(PUSH/PULL/SUB)"]
        Relay["Relay Data Plane<br/>(SHM / NCCL / Nixl / Mooncake)"]
    end

    OAI --> Client
    Client --> Coord
    Coord -- "submit / complete" --> S1
    S1 -- "data ready" --> S2
    S2 -- "data ready" --> SN
    S1 --- W1
    S2 --- W2
    SN --- WN
    W1 --- E1
    W2 --- E2
    S1 & S2 & SN --- ZMQ
    S1 & S2 & SN --- Relay
    

Module Structure#

sglang_omni/
β”œβ”€β”€ config/         # Pipeline definition, validation, and compilation
β”œβ”€β”€ pipeline/       # Coordinator, stages, workers, control plane
β”œβ”€β”€ engines/        # Compute backends (OmniEngine, schedulers)
β”œβ”€β”€ executors/      # Worker-facing interfaces bridging stages to engines
β”œβ”€β”€ models/         # Model-specific implementations (e.g., Qwen3-Omni)
β”œβ”€β”€ preprocessing/  # Multimodal input processing (audio, image, video, text)
β”œβ”€β”€ relay/          # Inter-stage data transfer backends
β”œβ”€β”€ proto/          # Request/message types and serialization
β”œβ”€β”€ client/         # High-level client wrapping the coordinator
└── serve/          # OpenAI-compatible REST API adapter

Core Components#

Pipeline Config and Compilation#

The config module (sglang_omni/config/) provides a declarative way to define pipelines. A PipelineConfig specifies:

  • Stages: Each StageConfig names a stage, references an executor factory function, and defines a routing function (get_next) that determines where results go.

  • Input handling: Stages can use DirectInput (pass-through) or AggregatedInput (fan-in from multiple upstream stages with a merge function).

  • Relay configuration: Backend type, device, buffer sizes, and flow control credits.

  • Endpoints: ZMQ endpoint allocation (IPC for single-machine, TCP for multi-node).

The compile_pipeline() function transforms the declarative config into runtime objects:

  1. Validates stage names, routing, and aggregation sources.

  2. Optionally applies stage fusion β€” merging consecutive stages into a single worker to avoid relay overhead.

  3. Allocates ZMQ endpoints.

  4. Instantiates executors from factory functions.

  5. Returns a (Coordinator, [Stage, ...]) tuple.

The PipelineRunner manages the lifecycle β€” starting the coordinator and all stages, then waiting for completion or errors.

Coordinator#

The Coordinator (sglang_omni/pipeline/coordinator.py) is the central request manager. It:

  • Registers stages and their ZMQ control endpoints.

  • Tracks every request through its lifecycle: PENDING β†’ RUNNING β†’ COMPLETED / FAILED / ABORTED.

  • Submits requests to the entry stage.

  • Receives completion messages and resolves per-request futures.

  • Broadcasts abort signals to all stages via ZMQ PUB/SUB.

  • Supports both blocking (submit()) and streaming (stream()) request patterns.

Stage#

A Stage (sglang_omni/pipeline/stage/runtime.py) is a processing unit that:

  • Receives incoming work via the ZMQ control plane.

  • Applies an input handler β€” either passing data directly or waiting for and merging inputs from multiple upstream stages.

  • Routes work to its worker pool using round-robin with sticky affinity (subsequent requests from the same source go to the same worker for cache locality).

  • Manages a relay instance for downstream data transfer.

  • Forwards completed results to the next stage(s) based on the routing function.

Worker#

Each Worker (sglang_omni/pipeline/worker/runtime.py) is a stateless request processor within a stage. It:

  • Dequeues work items from its stage.

  • Loads inputs β€” either inline or by fetching tensors from the relay using metadata.

  • Delegates execution to an Executor.

  • Dispatches results back to the stage for routing to downstream stages.

  • Handles many concurrent in-flight requests via asyncio tasks.

Control Plane#

The control plane (sglang_omni/pipeline/control_plane.py) uses ZMQ sockets for inter-stage messaging:

Socket

Direction

Purpose

PULL

Receive

Incoming work submissions

PUSH

Send

Forward work to next stages

SUB

Receive

Abort broadcasts from coordinator

Message types include SubmitMessage, DataReadyMessage, CompleteMessage, AbortMessage, StreamMessage, and ShutdownMessage, all serialized with msgpack.

Executors#

Executors (sglang_omni/executors/) bridge stages to compute backends:

  • PreprocessingExecutor: Wraps pure functions (e.g., tokenization, input normalization) as an executor interface.

  • EngineExecutor: Adapts an Engine (with batching and scheduling) to the executor interface, using request/result builders for payload transformation.

  • FusedExecutor: Chains multiple executors sequentially when stages are fused during compilation.

Engines#

The engines module (sglang_omni/engines/) provides compute backends:

  • OmniEngine: The main engine combining a Scheduler and ModelRunner. Its loop is: schedule() β†’ cache check β†’ execute() β†’ cache update β†’ update().

  • Scheduler: Model-agnostic request lifecycle manager. Manages states (WAITING β†’ RUNNING β†’ FINISHED), delegates to pluggable components: BatchPlanner, ResourceManager, IterationController.

  • ModelRunner: Stateless executor that consumes SchedulerOutput and calls the model forward pass.

Relay#

The relay module (sglang_omni/relay/) provides high-performance inter-stage data transfer. See Relay Design for full details.

Backend

Transport

Scope

Use Case

SHM

Shared memory

Single machine

Low-overhead, any GPU config

NCCL

GPU collectives

Multi-GPU

Synchronized GPU-GPU transfer

Nixl

RDMA

Multi-node

High-bandwidth cluster deployments

Mooncake

Cloud-optimized

Multi-node

Cloud environments

All backends implement the same Relay interface with put_async() / get_async() and are selected via a registry pattern.

Request Lifecycle#

        sequenceDiagram
    participant C as Client
    participant Co as Coordinator
    participant S1 as Stage 1
    participant W1 as Worker 1
    participant R as Relay
    participant S2 as Stage 2
    participant W2 as Worker 2

    C->>Co: submit(OmniRequest)
    Co->>S1: SubmitMessage (ZMQ)
    S1->>W1: route work
    W1->>W1: execute via Executor
    W1->>R: put_async(tensor)
    R-->>W1: metadata
    W1->>S2: DataReadyMessage(metadata) (ZMQ)
    S2->>W2: route work
    W2->>R: get_async(metadata)
    R-->>W2: tensor
    W2->>W2: execute via Executor
    W2->>Co: CompleteMessage (ZMQ)
    Co-->>C: result
    
  1. The Client submits an OmniRequest to the Coordinator.

  2. The Coordinator sends a SubmitMessage to the entry stage via ZMQ.

  3. The stage’s input handler processes the request and the router assigns it to a worker.

  4. The Worker executes the request through its Executor.

  5. If there is a downstream stage, the worker writes output tensors to the Relay and sends a DataReadyMessage with relay metadata to the next stage.

  6. The next stage’s worker fetches the tensor from the relay and continues processing.

  7. The final stage sends a CompleteMessage back to the Coordinator.

  8. The Coordinator resolves the client’s future with the result.

Data Flow Patterns#

Sequential#

        graph LR
    A["Stage A"] -->|relay| B["Stage B"] -->|relay| C["Stage C"]
    

The simplest pattern: each stage passes its output to the next.

Fan-Out#

        graph LR
    A["Preprocessing"] -->|relay| B["Image Encoder"]
    A -->|relay| C["Audio Encoder"]
    

A routing function directs the output to multiple downstream stages in parallel. Used when different modalities require separate encoders.

Fan-In (Aggregation)#

        graph LR
    B["Image Encoder"] -->|relay| D["Aggregate"]
    C["Audio Encoder"] -->|relay| D
    

An aggregated input handler waits for all upstream sources, then merges them with a custom function before passing to the worker.

Qwen3-Omni Pipeline Example#

A concrete example of these patterns combined:

        graph TD
    PP["Preprocessing<br/>(tokenize, template)"]
    IE["Image Encoder"]
    AE["Audio Encoder"]
    AGG["Aggregate<br/>(merge latents + tokens)"]
    TH["Thinker<br/>(autoregressive decoder)"]
    DEC["Decode<br/>(audio synthesis)"]

    PP -->|"has images"| IE
    PP -->|"has audio"| AE
    IE --> AGG
    AE --> AGG
    AGG --> TH
    TH --> DEC
    

Preprocessing fans out to active encoders based on input modalities. The aggregate stage fans in, merging encoder outputs with text tokens. The thinker runs autoregressive decoding, and the decode stage synthesizes audio output.

Configuration Example#

from sglang_omni.config import (
    ExecutorConfig, PipelineConfig, PipelineRunner,
    StageConfig, compile_pipeline,
)

config = PipelineConfig(
    name="my_pipeline",
    entry_stage="preprocess",
    stages=[
        StageConfig(
            name="preprocess",
            executor=ExecutorConfig(
                factory="my_project.executors.create_preprocess",
                args={},
            ),
            get_next="my_project.routing.preprocess_next",
        ),
        StageConfig(
            name="thinker",
            executor=ExecutorConfig(
                factory="my_project.executors.create_thinker",
                args={"model_path": "Qwen/Qwen3-Omni"},
            ),
            get_next="my_project.routing.end",
        ),
    ],
)

coordinator, stages = compile_pipeline(config)
runner = PipelineRunner(coordinator, stages)

Communication Layers#

Layer

Technology

Purpose

Control plane

ZMQ (PUSH/PULL/SUB)

Stage-to-stage work submission, abort broadcasts

Data plane

SHM / NCCL / Nixl / Mooncake

Tensor transfer between stages

Request tracking

In-memory dict + asyncio futures

Coordinator request lifecycle

Worker scheduling

asyncio + queues

Concurrent request dispatch within stages

External API

FastAPI + Uvicorn

OpenAI-compatible HTTP endpoints

Key Design Decisions#

Why ZMQ for the control plane? ZMQ provides lightweight, brokerless messaging with flexible socket patterns (PUSH/PULL for work distribution, PUB/SUB for broadcasts). It avoids the overhead of a message broker while supporting both IPC and TCP transports.

Why separate control and data planes? Control messages (submit, complete, abort) are small and latency-sensitive. Tensor data is large and throughput-sensitive. Separating them allows each to use the optimal transport β€” ZMQ for control, RDMA/NCCL for data.

Why configuration-driven pipelines? Declarative configs make pipelines reproducible, versionable, and easy to modify. The compilation step handles all wiring, endpoint allocation, and optimization (like stage fusion) automatically.

Why sticky worker affinity? When a request returns to the same stage (e.g., for iterative decoding), routing it to the same worker preserves KV cache locality, avoiding redundant recomputation.