Перейти к основному содержимому

Чтобы получить чистый Markdown этой страницы, добавьте .md к этому URL. Полный индекс документации см. на https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. на https://docs.nvidia.com/dynamo/llms-full.txt.

Написание унифицированного бэкенда на Python

Написание унифицированного бэкенда на Python

Новое — унифицированный бэкенд Dynamo. В этом руководстве описана новая инфраструктура унифицированного бэкенда в dynamo.common.backend: общий ABC LLMEngine, который уже реализуют vLLM, SGLang, TRT-LLM и пример движка, и к которому любой пользовательский Python-движок может подключиться тем же способом. Версию того же контракта для Rust см. в написании унифицированного бэкенда на Rust. Более старый низкоуровневый путь Python worker (register_model + serve_endpoint), который по-прежнему остается правильным выбором для возможностей, еще не покрытых унифицированным бэкендом, описан в написании Python workers.

Beta — активно разрабатывается. Поверхность унифицированного бэкенда находится в beta-качестве и может меняться между релизами без обратной совместимости. См. раздел пробелы в возможностях ниже, чтобы понять, что сегодня покрывает унифицированный путь по сравнению с существующими неунифицированными путями бэкендов.

Это руководство пошагово показывает, как создать Python-бэкенд для inference engine, который подключается к распределенному runtime Dynamo через dynamo.common.backend. "Унифицированный бэкенд" — это Python entry point, который реализует общий ABC LLMEngine и позволяет framework владеть жизненным циклом runtime (обработка сигналов, регистрация модели, корректное завершение, мониторинг отмены), а ваш код отвечает только за inference.

Ваш бэкенд живет в собственном пакете и не обязан быть частью репозитория dynamo. Он зависит от ai-dynamo из PyPI (или от git-исходников) и импортирует dynamo.common.backend. Шаги ниже предполагают, что вы начинаете новый пакет в собственном репозитории.

Эталонный пример — sample engine в sample_engine.py: полная запускаемая реализация короче 120 строк. Читайте ее параллельно с этим руководством.

Где что искать:

  • Это руководство — пошаговое объяснение для тех, кто создает новый бэкенд с нуля.
  • Docstrings ABC LLMEngine — авторитетный контракт по каждому методу.
  • README пакета — встроенный в дерево справочник: определения полей GenerateRequest / GenerateChunk, cookbook отмены по движкам (vLLM / SGLang / TRT-LLM), полная таблица DynamoException, индекс файлов и матрица пробелов в возможностях по движкам.

Пробелы в возможностях

Унифицированный бэкенд находится в beta. Ниже приведена общая сводка контракта: что получает каждый движок на унифицированном пути, а также пробелы, применимые ко всем трем движкам. Особенности отдельных движков (vLLM sleep/wake, SGLang diffusion, пользовательские logits processors в TRT-LLM и т. д.) находятся в README пакета.

Поддерживается сегодня

Жизненный цикл и runtime:

  • Агрегированный token-in-token-out inference
  • Disaggregated serving (agg / prefill / decode) — KV transfer использует NIXL во всех трех движках; SGLang обменивается bootstrap-адресом уровня Dynamo (host/port/room), vLLM и TRT-LLM используют внутреннее рукопожатие движка
  • Регистрация модели с discovery и типами endpoint
  • Отмена запросов через abort() + context.is_stopped()
  • Корректное завершение с обработкой сигналов
  • Хук drain() для работы перед cleanup (например, для in-flight NIXL transfers)
  • Обертывание цепочки ошибок DynamoException
  • Нормализация finish reason (обрабатывается Rust-слоем)

Наблюдаемость:

  • Health-check canary через health_check_payload() (плюс переопределения DYN_HEALTH_CHECK_PAYLOAD / --health-check-payload)
  • Prometheus bridge с vendor-префиксами (vllm: / sglang: / trtllm_ / lmcache:) через register_prometheus()
  • Framework-owned lifecycle gauges (cleanup_time_seconds, drain_time_seconds, model_load_time_seconds) — всегда включены
  • Per-rank gauges dynamo_component_* + сигнал router kv_used_blocks через component_metrics_dp_ranks() + attach_snapshot_publisher() + push ComponentSnapshot
  • Публикация KV events через kv_event_sources(), возвращающий ZmqSource или PushSource
  • KV-aware routing (с учетом DP-rank) через dp_rank.forced_dp_rank / validate_global_dp_rank + EngineConfig.data_parallel_{size, start_rank}
  • Фасад OpenTelemetry tracing — telemetry.current_span / start_span плюс распространение W3C trace headers через telemetry.engine_trace_kwargs(context)

Обработка запросов:

  • Guided decoding — подключен на стороне запроса по движкам с покрытием, специфичным для каждого движка. vLLM (StructuredOutputsParams) и TRT-LLM (GuidedDecodingParams) покрывают JSON schema / regex / grammar / choice; SGLang (_get_guided_decoding_params) покрывает только JSON schema, а regex / grammar / choice сегодня молча отбрасываются (см. SGLang-specific gaps в README пакета)
  • Генерация structural tags через WorkerConfig.structural_tag_{mode, scope, schema} и serialize_structural_tag
  • Пользовательские Jinja chat templates через WorkerConfig.custom_jinja_template (frontend применяет, а backend объявляет через регистрацию модели)
  • Конфигурация parser для tools / reasoning (tool_call_parser, reasoning_parser, exclude_tools_when_tool_choice_none)

Пока не реализовано на унифицированном пути (общее для всех движков)

ВозможностьЧего не хватает
Logprob response wireLegacy handlers извлекают logprobs в response chunks (vLLM _extract_logprobs, SGLang _extract_logprobs в decode_handler, TRT-LLM _extract_logprobs в handler_base); унифицированные циклы generate() не заполняют log_probs / top_logprobs / cum_log_probs в GenerateChunk. build_sampling_params в vLLM по-прежнему передает output_options.logprobs в движок на унифицированном пути, поэтому движок их вычисляет, но значения отбрасываются до попадания в chunk. Унифицированные generate() в SGLang и TRT-LLM вообще не читают output_options.logprobs.
Text-in-text-out modeУнифицированный путь жестко задает ModelInput.Tokens; нет пути tokenization или chat templating на стороне движка
MultimodalImages / video / embeddings, NIXL embedding transfer, отдельные encode workers, роль disaggregation ENCODE
DiffusionImage (FLUX), video (Wan2.1), LLM diffusion (DLLM) workers; нет diffusion engine, MediaOutput или media scheduling на унифицированном пути
LoRA adaptersДинамические load / unload / list, публикация ModelDeploymentCard, per-adapter serialization locks, per-request adapter threading на prefill
Engine routesProfile start/stop, sleep / wake / quiesce, обновления весов (disk / tensor / distributed / IPC), очистка KV blocks, сброс prefix cache
Snapshot / checkpointСохранение/восстановление состояния движка на базе CRIU + identity reload

Если вам сегодня нужна одна из этих возможностей, оставьте такую нагрузку на существующем per-engine entry point (dynamo.<backend>.main), пока унифицированный путь не догонит его.

Что вы создаете

Бэкенд состоит из двух частей:

  1. Класс движка, наследующий LLMEngine: владеет моделью, принимает предварительно обработанные token requests и стримит output chunks.
  2. Entry point main.py: трехстрочный shim, который передает класс движка в run() из dynamo.common.backend.run; он управляет жизненным циклом.

Пакет dynamo.common.backend берет на себя все остальное: обработку сигналов, настройку distributed runtime, регистрацию модели с discovery, serving loop, корректное завершение, мониторинг отмены и обертывание цепочки ошибок. (Сама state machine жизненного цикла живет в Rust; dynamo.common.backend.Worker — тонкий Python shim поверх нее.)

from_args → start() → generate() / abort() → drain() → cleanup()
| | | | |
parse argv, start engine, serve requests pre-cleanup release
return return (concurrent) drain resources
engine metadata

Предварительные требования

  • Python 3.11 или новее. dynamo использует typing.Required, доступный с 3.11.
  • NATS и etcd, доступные для end-to-end запусков. В репозитории dynamo deploy/docker-compose.yml поднимает оба сервиса одной командой, если они еще не запущены.
  • uv или pip для установки зависимостей.
  • Знакомство с async Python (asyncio, async generators) и argparse.

Шаг 1: Создайте пакет

my-backend/
├── pyproject.toml
└── src/
└── my_backend/
├── __init__.py
├── engine.py
└── main.py

Минимальный pyproject.toml:

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "my-backend"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
# ai-dynamo bundles dynamo.common.backend. Pin to the release whose
# LLMEngine contract you tested against — the surface is still beta
# and may change between releases.
"ai-dynamo>=1.2.0",
]

[project.optional-dependencies]
dev = ["pytest>=8", "pytest-asyncio>=0.23"]

[project.scripts]
my-backend = "my_backend.main:main"

Для bleeding-edge зависимости от дерева исходников dynamo установите runtime wheel из клона:

git clone https://github.com/ai-dynamo/dynamo.git
pip install maturin
cd dynamo/lib/bindings/python && maturin build --release --out /tmp/wheels
pip install /tmp/wheels/*.whl # ai-dynamo-runtime
pip install /path/to/dynamo # ai-dynamo (components/ tree)

Для сборки wheel нужен Rust toolchain, а также clang, cmake, protobuf-compiler и libssl-dev.

Шаг 2: Унаследуйтесь от LLMEngine

В src/my_backend/engine.py объявите класс, который наследует LLMEngine и владеет всем состоянием, нужным вашему движку. Создание объекта должно быть дешевым и без побочных эффектов: тяжелая работа выполняется в start().

# src/my_backend/engine.py
from __future__ import annotations

import argparse
import asyncio
from collections.abc import AsyncGenerator

from dynamo._core import Context
from dynamo.common.backend import (
EngineConfig,
GenerateChunk,
GenerateRequest,
LLMEngine,
WorkerConfig,
)


class MyBackend(LLMEngine):
def __init__(self, model_name: str, max_tokens: int = 16):
self.model_name = model_name
self.max_tokens = max_tokens
# Heavy state (engine handles, schedulers, KV allocators) is
# left None here and initialized in start().
self._engine = None

GenerateRequest и GenerateChunk — это TypedDict, описывающие общую форму; поля см. в шаге 4.

Шаг 3: Реализуйте from_args

from_args — это classmethod-фабрика, которая разбирает CLI args и возвращает (engine, WorkerConfig). Движок создается, но не запускается.

@classmethod
async def from_args(
cls, argv: list[str] | None = None
) -> tuple[MyBackend, WorkerConfig]:
parser = argparse.ArgumentParser(prog="my-backend")
parser.add_argument("--model-name", default="my-model")
parser.add_argument("--max-tokens", type=int, default=16)
# Runtime / discovery flags — every unified backend needs these.
parser.add_argument("--namespace", default="dynamo")
parser.add_argument("--component", default="backend")
parser.add_argument("--endpoint", default="generate")
parser.add_argument("--endpoint-types", default="chat,completions")
parser.add_argument("--discovery-backend", default="etcd")
parser.add_argument("--request-plane", default="tcp")
parser.add_argument("--event-plane", default=None)
args = parser.parse_args(argv)

engine = cls(model_name=args.model_name, max_tokens=args.max_tokens)
worker_config = WorkerConfig(
namespace=args.namespace,
component=args.component,
endpoint=args.endpoint,
model_name=args.model_name,
served_model_name=args.model_name,
endpoint_types=args.endpoint_types,
discovery_backend=args.discovery_backend,
request_plane=args.request_plane,
event_plane=args.event_plane,
)
return engine, worker_config

from_args является async, чтобы соответствовать ABC; из него можно делать await, если ваш разбор CLI читает конфигурацию из файла или обращается к API. Большинству бэкендов это не нужно.

Для бэкендов, у которых уже есть объект конфигурации формы DynamoRuntimeConfig (например, производных от существующих конфигураций vLLM, SGLang или TRT-LLM), предпочтительнее использовать helper WorkerConfig.from_runtime_config(runtime_cfg, model_name=...): он одной строкой забирает из конфигурации общие поля discovery / request-plane / parser.

Шаг 4: Реализуйте методы LLMEngine

ABC содержит три обязательных метода (start, generate, cleanup) и два метода с no-op реализациями по умолчанию (abort, drain).

start()

Запустите движок и верните metadata EngineConfig. После возврата из этого метода generate() ДОЛЖЕН быть готов к конкурентным вызовам.

async def start(self, worker_id: int) -> EngineConfig:
# ... load weights, build scheduler, warm up CUDA, etc.
# Heavy: may take minutes. Emit logger.info checkpoints so
# operators see progress (Worker logs around start() but not
# inside it).
self._engine = await heavy_init(self.model_name)

return EngineConfig(
model=self.model_name,
served_model_name=self.model_name,
context_length=8192,
kv_cache_block_size=16, # None if no block-structured KV
total_kv_blocks=1024,
max_num_seqs=64,
max_num_batched_tokens=8192,
)

worker_id — непрозрачный идентификатор конкретного worker; большинство движков его игнорирует. Бэкенды, которым нужен стабильный ключ в масштабе кластера (например, snowflake disagg_machine_id в TRT-LLM), должны выводить его из worker_id, а не хешировать host/pid и не просить операторов задавать CLI override.

Все поля EngineConfig, кроме model, являются необязательными. None означает "не объявлять"; KV-aware routing откатывается к round-robin, если KV поля не заданы.

generate()

Async generator, который выдает dict GenerateChunk для одного запроса. Вызывается конкурентно для нескольких in-flight запросов.

Контракт (форма chunk определяется TypedDict GenerateChunk; справочник по полям см. в типам запросов / ответов в README пакета):

  • Каждый chunk несет token_ids и index (используйте 0 для единственного choice).
  • Финальный chunk дополнительно несет finish_reason и completion_usage.
  • Монитор отмены framework вызывает engine.abort(context), когда клиент отключается или отменяет запрос; ваш цикл также должен опрашивать context.is_stopped() между yields и корректно выходить с chunk finish_reason="cancelled".
async def generate(
self, request: GenerateRequest, context: Context
) -> AsyncGenerator[GenerateChunk, None]:
prompt_tokens = list(request.get("token_ids", []))
prompt_len = len(prompt_tokens)

stop_conditions = request.get("stop_conditions") or {}
max_new = stop_conditions.get("max_tokens") or self.max_tokens

def _usage(completion_tokens: int) -> dict[str, int]:
return {
"prompt_tokens": prompt_len,
"completion_tokens": completion_tokens,
"total_tokens": prompt_len + completion_tokens,
}

for i in range(max_new):
if context.is_stopped():
yield {
"token_ids": [],
"index": 0,
"finish_reason": "cancelled",
"completion_usage": _usage(i),
}
return

token_id = await self._next_token(prompt_tokens)

chunk: GenerateChunk = {"token_ids": [token_id], "index": 0}
if i == max_new - 1:
chunk["finish_reason"] = "length"
chunk["completion_usage"] = _usage(max_new)
yield chunk

Нормализация finish reason ("abort""cancelled" и т. д.) выполняется Rust-слоем: отдавайте то, что нативно использует ваш движок.

abort(context) — необязательно

Вызывается framework только когда клиент отключается или запрос отменяется. НЕ вызывается при silent stream drops. Переопределите его, чтобы освобождать ресурсы на стороне движка (KV slots, scheduler entries, remote schedulers):

async def abort(self, context: Context) -> None:
request_id = context.id()
await self._engine.cancel(request_id)

Для cleanup, который должен выполняться на каждом пути drop, включая silent drops, используйте try/finally или context manager внутри generate, а не abort. Sample engine не переопределяет abort, потому что у него нет состояния на стороне движка, которое нужно освобождать; значение по умолчанию — no-op.

drain() — необязательно

Выполняется один раз перед shutdown, после discovery unregister и сна на grace-period, пока NATS/etcd еще живы. Используйте его для draining на стороне бэкенда, который должен завершиться до teardown транспорта (например, in-flight NIXL KV transfers на prefill workers). По умолчанию это no-op.

cleanup()

Два реальных требования, оба закреплены Rust-side conformance kit:

  • Null-safe при частичном сбое start(). Если start() падает на середине, поля, которые вы выделяете постепенно, могут все еще быть None. cleanup() должен защищать каждый ресурс (if self._engine is not None: …), чтобы вызов после сбоя не падал на наполовину инициализированном состоянии.
  • Идемпотентность. Второй вызов после успешного первого должен чисто вернуться без повторного входа в teardown.

Rust Worker управляет обоими случаями: он вызывает cleanup() после успешного возврата start() во время shutdown, а conformance kit (run_conformance) дополнительно вызывает cleanup() на никогда не запускавшемся движке и два раза подряд, проваливая ваши тесты с CleanupWithoutStartFailed / SecondCleanupFailed, если любой из инвариантов нарушен. Паттерн guarded single-shot ниже покрывает оба:

async def cleanup(self) -> None:
if self._engine is not None:
await self._engine.shutdown()
self._engine = None

Шаг 5: Напишите main.py

Три строки.

# src/my_backend/main.py
from dynamo.common.backend.run import run
from .engine import MyBackend


def main() -> None:
run(MyBackend)


if __name__ == "__main__":
main()

run устанавливает обработчики сигналов, строит distributed runtime, вызывает engine.start(worker_id) с идентификатором, выделенным runtime, регистрирует модель в discovery, обслуживает endpoint и запускает orchestrator корректного shutdown по SIGTERM/SIGINT.

Сопоставьте это с записью [project.scripts] из pyproject.toml в шаге 1, чтобы my-backend ... работал как консольная команда.

Шаг 6: Ошибки и логирование

Ошибки: framework оборачивает ошибки не типа DynamoException, поднятые из generate() (или методов жизненного цикла), как Unknown. Для типизированной отчетности об ошибках напрямую поднимайте subclass DynamoException из dynamo.llm.exceptions: он проходит через Rust bridge без изменений.

from dynamo.llm.exceptions import InvalidArgument

async def generate(self, request, context):
if not request.get("token_ids"):
raise InvalidArgument("empty prompt")
...

README пакета содержит полную таблицу типов исключений и указывает, какая фаза жизненного цикла какой тип поднимает. Ошибки инициализации движка должны поднимать EngineShutdown из start(). Cleanup обычно не должен поднимать исключения: логируйте и подавляйте, если subsystem дает сбой.

Логирование: держите уровни согласованными между унифицированными бэкендами, чтобы операторы видели одинаковую поверхность независимо от выбранного движка:

  • logger.info — milestones жизненного цикла (инициализация движка завершена, serving started, engine shutdown).
  • logger.debug — per-request events (request abort, cancellation).
  • logger.warning — recoverable problems (empty outputs, unexpected finish reasons).
  • logger.error — только unrecoverable failures.

Framework также настраивает dynamo.runtime.logging за вас; просто вызовите logger = logging.getLogger(__name__) в начале модуля и используйте его.

Шаг 7: Протестируйте движок

Установите dev extras (pytest, pytest-asyncio), объявленные в шаге 1:

pip install -e ".[dev]"

У sample engine есть unit-test набор unit-тестов, который можно скопировать как отправную точку. Форма полезного теста:

import pytest

from my_backend import MyBackend


class _StubContext:
def __init__(self, stopped: bool = False) -> None:
self._stopped = stopped

def is_stopped(self) -> bool:
return self._stopped

def stop(self) -> None:
self._stopped = True


@pytest.mark.asyncio
async def test_generate_emits_terminal_chunk():
engine = MyBackend(model_name="m", max_tokens=3)
await engine.start(worker_id=0)
try:
chunks = [
chunk
async for chunk in engine.generate(
{"token_ids": [1, 2, 3]}, _StubContext()
)
]
assert chunks[-1]["finish_reason"] in ("stop", "length")
assert chunks[-1]["completion_usage"]["completion_tokens"] == 3
finally:
await engine.cleanup()


@pytest.mark.asyncio
async def test_generate_observes_cancellation():
engine = MyBackend(model_name="m", max_tokens=1000)
await engine.start(worker_id=0)
try:
ctx = _StubContext()
collected = []
async for chunk in engine.generate({"token_ids": [1]}, ctx):
collected.append(chunk)
if len(collected) >= 2:
ctx.stop()
assert collected[-1]["finish_reason"] == "cancelled"
finally:
await engine.cleanup()

Покройте happy path, отмену и любые backend-specific edge cases (stop tokens, max-tokens cap, empty prompt). Трех-пяти focused tests достаточно: framework уже закрепляет state machine жизненного цикла и контракт отмены Rust-side тестами в lib/backend-common.

Шаг 8: Запустите локально

Нужно поднять три движущиеся части: NATS + etcd (discovery и event/request planes), Dynamo frontend (HTTP → backend discovery) и ваш backend.

pip install -e .

# Ensure NATS + etcd are reachable (NATS_SERVER, ETCD_ENDPOINTS).
# --model-name must be a valid HuggingFace repo (or local path); the
# framework fetches the tokenizer + chat template from it on startup.
# Pick a small public repo for smoke tests.
my-backend --model-name Qwen/Qwen3-0.6B --namespace dynamo

# In another shell, start the Dynamo frontend:
python -m dynamo.frontend --http-port 8000

Затем отправьте запрос:

curl http://localhost:8000/v1/chat/completions \
-H 'Content-Type: application/json' \
-d '{
"model": "Qwen/Qwen3-0.6B",
"messages": [{"role": "user", "content": "hello"}],
"max_tokens": 32
}'

Успешный ответ содержит непустой choices[0].message.content и finish_reason со значением stop или length. jq -e '.choices[0].finish_reason' — хороший one-liner для CI smoke test.

Если ваш backend выглядит молчащим, задайте DYN_LOG=info (или DYN_LOG=debug,dynamo=debug для более точного scoping) перед запуском: framework настраивает tracing из DYN_LOG.

Справочник: sample engine

sample_engine.py — канонический минимальный reference. Запустите его как есть:

python -m dynamo.common.backend.sample_main --model-name test-model

Он генерирует вращающиеся token IDs без ML-зависимостей, поэтому полезен как замена для AIPerf / end-to-end pipeline smoke tests. Используйте эти паттерны:

  • from_args разбирает CLI args и возвращает (engine, WorkerConfig) без awaits.
  • start() возвращает EngineConfig, KV-поля которого являются иллюстративными и не несут нагрузки (нет настоящего KV cache).
  • generate() опрашивает context.is_stopped() между yields и выдает terminal cancelled при наблюдении отмены.
  • cleanup() — no-op, потому что движок не держит ресурсов.

Контрольный список

Перед shipping:

  • LLMEngine унаследован; from_args возвращает (engine, WorkerConfig).
  • start() возвращает EngineConfig как минимум с непустым model.
  • generate() опрашивает context.is_stopped() между yields и выдает terminal "cancelled" при наблюдении отмены.
  • Финальный chunk содержит finish_reason и completion_usage.
  • Typed subclasses DynamoException используются для отчетности об ошибках там, где важна категория.
  • cleanup() освобождает все ресурсы движка.
  • Уровни логирования соответствуют стандартам из шага 6.

См. также