Чтобы получить чистый Markdown этой страницы, добавьте .md к этому URL. Полный индекс документации см. на https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. на https://docs.nvidia.com/dynamo/llms-full.txt.
Написание унифицированного бэкенда на Python
Написание унифицированного бэкенда на Python
Новое — унифицированный бэкенд Dynamo. В этом руководстве описана новая инфраструктура унифицированного бэкенда в
dynamo.common.backend: общий ABCLLMEngine, который уже реализуют vLLM, SGLang, TRT-LLM и пример движка, и к которому любой пользовательский Python-движок может подключиться тем же способом. Версию того же контракта для Rust см. в написании унифицированного бэкенда на Rust. Более старый низкоуровневый путь Python worker (register_model+serve_endpoint), который по-прежнему остается правильным выбором для возможностей, еще не покрытых унифицированным бэкендом, описан в написании Python workers.Beta — активно разрабатывается. Поверхность унифицированного бэкенда находится в beta-качестве и может меняться между релизами без обратной совместимости. См. раздел пробелы в возможностях ниже, чтобы понять, что сегодня покрывает унифицированный путь по сравнению с существующими неунифицированными путями бэкендов.
Это руководство пошагово показывает, как создать Python-бэкенд для inference
engine, который подключается к распределенному runtime Dynamo через
dynamo.common.backend. "Унифицированный бэкенд" — это Python entry point,
который реализует общий ABC LLMEngine и позволяет framework владеть жизненным
циклом runtime (обработка сигналов, регистрация модели, корректное завершение,
мониторинг отмены), а ваш код отвечает только за inference.
Ваш бэкенд живет в собственном пакете и не обязан быть частью репозитория
dynamo. Он зависит от ai-dynamo из PyPI (или от git-исходников) и импортирует
dynamo.common.backend. Шаги ниже предполагают, что вы начинаете новый пакет в
собственном репозитории.
Эталонный пример — sample engine в
sample_engine.py:
полная запускаемая реализация короче 120 строк. Читайте ее параллельно с этим
руководством.
Где что искать:
- Это руководство — пошаговое объяснение для тех, кто создает новый бэкенд с нуля.
- Docstrings ABC
LLMEngine— авторитетный контракт по каждому методу. - README пакета
— встроенный в дерево справочник: определения полей
GenerateRequest/GenerateChunk, cookbook отмены по движкам (vLLM / SGLang / TRT-LLM), полная таблицаDynamoException, индекс файлов и матрица пробелов в возможностях по движкам.
Пробелы в возможностях
Унифицированный бэкенд находится в beta. Ниже приведена общая сводка контракта: что получает каждый движок на унифицированном пути, а также пробелы, применимые ко всем трем движкам. Особенности отдельных движков (vLLM sleep/wake, SGLang diffusion, пользовательские logits processors в TRT-LLM и т. д.) находятся в README пакета.
Поддерживается сегодня
Жизненный цикл и runtime:
- Агрегированный token-in-token-out inference
- Disaggregated serving (
agg/prefill/decode) — KV transfer использует NIXL во всех трех движках; SGLang обменивается bootstrap-адресом уровня Dynamo (host/port/room), vLLM и TRT-LLM используют внутреннее рукопожатие движка - Регистрация модели с discovery и типами endpoint
- Отмена запросов через
abort()+context.is_stopped() - Корректное завершение с обработкой сигналов
- Хук
drain()для работы перед cleanup (например, для in-flight NIXL transfers) - Обертывание цепочки ошибок
DynamoException - Нормализация finish reason (обрабатывается Rust-слоем)
Наблюдаемость:
- Health-check canary через
health_check_payload()(плюс переопределенияDYN_HEALTH_CHECK_PAYLOAD/--health-check-payload) - Prometheus bridge с vendor-префиксами (
vllm:/sglang:/trtllm_/lmcache:) черезregister_prometheus() - Framework-owned lifecycle gauges (
cleanup_time_seconds,drain_time_seconds,model_load_time_seconds) — всегда включены - Per-rank gauges
dynamo_component_*+ сигнал routerkv_used_blocksчерезcomponent_metrics_dp_ranks()+attach_snapshot_publisher()+ pushComponentSnapshot - Публикация KV events через
kv_event_sources(), возвращающийZmqSourceилиPushSource - KV-aware routing (с учетом DP-rank) через
dp_rank.forced_dp_rank/validate_global_dp_rank+EngineConfig.data_parallel_{size, start_rank} - Фасад OpenTelemetry tracing —
telemetry.current_span/start_spanплюс распространение W3C trace headers черезtelemetry.engine_trace_kwargs(context)
Обработка запросов:
- Guided decoding — подключен на стороне запроса по движкам с покрытием,
специфичным для каждого движка. vLLM (
StructuredOutputsParams) и TRT-LLM (GuidedDecodingParams) покрывают JSON schema / regex / grammar / choice; SGLang (_get_guided_decoding_params) покрывает только JSON schema, а regex / grammar / choice сегодня молча отбрасываются (см. SGLang-specific gaps в README пакета) - Генерация structural tags через
WorkerConfig.structural_tag_{mode, scope, schema}иserialize_structural_tag - Пользовательские Jinja chat templates через
WorkerConfig.custom_jinja_template(frontend применяет, а backend объявляет через регистрацию модели) - Конфигурация parser для tools / reasoning (
tool_call_parser,reasoning_parser,exclude_tools_when_tool_choice_none)
Пока не реализовано на унифицированном пути (общее для всех движков)
| Возможность | Чего не хватает |
|---|---|
| Logprob response wire | Legacy handlers извлекают logprobs в response chunks (vLLM _extract_logprobs, SGLang _extract_logprobs в decode_handler, TRT-LLM _extract_logprobs в handler_base); унифицированные циклы generate() не заполняют log_probs / top_logprobs / cum_log_probs в GenerateChunk. build_sampling_params в vLLM по-прежнему передает output_options.logprobs в движок на унифицированном пути, поэтому движок их вычисляет, но значения отбрасываются до попадания в chunk. Унифицированные generate() в SGLang и TRT-LLM вообще не читают output_options.logprobs. |
| Text-in-text-out mode | Унифицированный путь жестко задает ModelInput.Tokens; нет пути tokenization или chat templating на стороне движка |
| Multimodal | Images / video / embeddings, NIXL embedding transfer, отдельные encode workers, роль disaggregation ENCODE |
| Diffusion | Image (FLUX), video (Wan2.1), LLM diffusion (DLLM) workers; нет diffusion engine, MediaOutput или media scheduling на унифицированном пути |
| LoRA adapters | Динамические load / unload / list, публикация ModelDeploymentCard, per-adapter serialization locks, per-request adapter threading на prefill |
| Engine routes | Profile start/stop, sleep / wake / quiesce, обновления весов (disk / tensor / distributed / IPC), очистка KV blocks, сброс prefix cache |
| Snapshot / checkpoint | Сохранение/восстановление состояния движка на базе CRIU + identity reload |
Если вам сегодня нужна одна из этих возможностей, оставьте такую нагрузку на
существующем per-engine entry point (dynamo.<backend>.main), пока
унифицированный путь не догонит его.
Что вы создаете
Бэкенд состоит из двух частей:
- Класс движка, наследующий
LLMEngine: владеет моделью, принимает предварительно обработанные token requests и стримит output chunks. - Entry point
main.py: трехстрочный shim, который передает класс движка вrun()изdynamo.common.backend.run; он управляет жизненным циклом.
Пакет dynamo.common.backend берет на себя все остальное: обработку сигналов,
настройку distributed runtime, регистрацию модели с discovery, serving loop,
корректное завершение, мониторинг отмены и обертывание цепочки ошибок. (Сама
state machine жизненного цикла живет в Rust; dynamo.common.backend.Worker —
тонкий Python shim поверх нее.)
from_args → start() → generate() / abort() → drain() → cleanup()
| | | | |
parse argv, start engine, serve requests pre-cleanup release
return return (concurrent) drain resources
engine metadata
Предварительные требования
- Python 3.11 или новее.
dynamoиспользуетtyping.Required, доступный с 3.11. - NATS и etcd, доступные для end-to-end запусков. В репозитории dynamo
deploy/docker-compose.ymlподнимает оба сервиса одной командой, если они еще не запущены. uvилиpipдля установки зависимостей.- Знакомство с
asyncPython (asyncio, async generators) иargparse.
Шаг 1: Создайте пакет
my-backend/
├── pyproject.toml
└── src/
└── my_backend/
├── __init__.py
├── engine.py
└── main.py
Минимальный pyproject.toml:
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "my-backend"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
# ai-dynamo bundles dynamo.common.backend. Pin to the release whose
# LLMEngine contract you tested against — the surface is still beta
# and may change between releases.
"ai-dynamo>=1.2.0",
]
[project.optional-dependencies]
dev = ["pytest>=8", "pytest-asyncio>=0.23"]
[project.scripts]
my-backend = "my_backend.main:main"
Для bleeding-edge зависимости от дерева исходников dynamo установите runtime wheel из клона:
git clone https://github.com/ai-dynamo/dynamo.git
pip install maturin
cd dynamo/lib/bindings/python && maturin build --release --out /tmp/wheels
pip install /tmp/wheels/*.whl # ai-dynamo-runtime
pip install /path/to/dynamo # ai-dynamo (components/ tree)
Для сборки wheel нужен Rust toolchain, а также clang, cmake,
protobuf-compiler и libssl-dev.
Шаг 2: Унаследуйтесь от LLMEngine
В src/my_backend/engine.py объявите класс, который наследует LLMEngine и
владеет всем состоянием, нужным вашему движку. Создание объекта должно быть
дешевым и без побочных эффектов: тяжелая работа выполняется в start().
# src/my_backend/engine.py
from __future__ import annotations
import argparse
import asyncio
from collections.abc import AsyncGenerator
from dynamo._core import Context
from dynamo.common.backend import (
EngineConfig,
GenerateChunk,
GenerateRequest,
LLMEngine,
WorkerConfig,
)
class MyBackend(LLMEngine):
def __init__(self, model_name: str, max_tokens: int = 16):
self.model_name = model_name
self.max_tokens = max_tokens
# Heavy state (engine handles, schedulers, KV allocators) is
# left None here and initialized in start().
self._engine = None
GenerateRequest и GenerateChunk — это TypedDict, описывающие общую форму;
поля см. в шаге 4.
Шаг 3: Реализуйте from_args
from_args — это classmethod-фабрика, которая разбирает CLI args и возвращает
(engine, WorkerConfig). Движок создается, но не запускается.
@classmethod
async def from_args(
cls, argv: list[str] | None = None
) -> tuple[MyBackend, WorkerConfig]:
parser = argparse.ArgumentParser(prog="my-backend")
parser.add_argument("--model-name", default="my-model")
parser.add_argument("--max-tokens", type=int, default=16)
# Runtime / discovery flags — every unified backend needs these.
parser.add_argument("--namespace", default="dynamo")
parser.add_argument("--component", default="backend")
parser.add_argument("--endpoint", default="generate")
parser.add_argument("--endpoint-types", default="chat,completions")
parser.add_argument("--discovery-backend", default="etcd")
parser.add_argument("--request-plane", default="tcp")
parser.add_argument("--event-plane", default=None)
args = parser.parse_args(argv)
engine = cls(model_name=args.model_name, max_tokens=args.max_tokens)
worker_config = WorkerConfig(
namespace=args.namespace,
component=args.component,
endpoint=args.endpoint,
model_name=args.model_name,
served_model_name=args.model_name,
endpoint_types=args.endpoint_types,
discovery_backend=args.discovery_backend,
request_plane=args.request_plane,
event_plane=args.event_plane,
)
return engine, worker_config
from_args является async, чтобы соответствовать ABC; из него можно делать
await, если ваш разбор CLI читает конфигурацию из файла или обращается к API.
Большинству бэкендов это не нужно.
Для бэкендов, у которых уже есть объект конфигурации формы
DynamoRuntimeConfig (например, производных от существующих конфигураций vLLM,
SGLang или TRT-LLM), предпочтительнее использовать helper
WorkerConfig.from_runtime_config(runtime_cfg, model_name=...): он одной
строкой забирает из конфигурации общие поля discovery / request-plane / parser.
Шаг 4: Реализуйте методы LLMEngine
ABC содержит три обязательных метода (start, generate, cleanup) и два
метода с no-op реализациями по умолчанию (abort, drain).
start()
Запустите движок и верните metadata EngineConfig. После возврата из этого
метода generate() ДОЛЖЕН быть готов к конкурентным вызовам.
async def start(self, worker_id: int) -> EngineConfig:
# ... load weights, build scheduler, warm up CUDA, etc.
# Heavy: may take minutes. Emit logger.info checkpoints so
# operators see progress (Worker logs around start() but not
# inside it).
self._engine = await heavy_init(self.model_name)
return EngineConfig(
model=self.model_name,
served_model_name=self.model_name,
context_length=8192,
kv_cache_block_size=16, # None if no block-structured KV
total_kv_blocks=1024,
max_num_seqs=64,
max_num_batched_tokens=8192,
)
worker_id — непрозрачный идентификатор конкретного worker; большинство
движков его игнорирует. Бэкенды, которым нужен стабильный ключ в масштабе
кластера (например, snowflake disagg_machine_id в TRT-LLM), должны выводить
его из worker_id, а не хешировать host/pid и не просить операторов задавать
CLI override.
Все поля EngineConfig, кроме model, являются необязательными. None
означает "не объявлять"; KV-aware routing откатывается к round-robin, если KV
поля не заданы.
generate()
Async generator, который выдает dict GenerateChunk для одного запроса.
Вызывается конкурентно для нескольких in-flight запросов.
Контракт (форма chunk определяется TypedDict GenerateChunk; справочник
по полям см. в
типам запросов / ответов
в README пакета):
- Каждый chunk несет
token_idsиindex(используйте0для единственного choice). - Финальный chunk дополнительно несет
finish_reasonиcompletion_usage. - Монитор отмены framework вызывает
engine.abort(context), когда клиент отключается или отменяет запрос; ваш цикл также должен опрашиватьcontext.is_stopped()между yields и корректно выходить с chunkfinish_reason="cancelled".
async def generate(
self, request: GenerateRequest, context: Context
) -> AsyncGenerator[GenerateChunk, None]:
prompt_tokens = list(request.get("token_ids", []))
prompt_len = len(prompt_tokens)
stop_conditions = request.get("stop_conditions") or {}
max_new = stop_conditions.get("max_tokens") or self.max_tokens
def _usage(completion_tokens: int) -> dict[str, int]:
return {
"prompt_tokens": prompt_len,
"completion_tokens": completion_tokens,
"total_tokens": prompt_len + completion_tokens,
}
for i in range(max_new):
if context.is_stopped():
yield {
"token_ids": [],
"index": 0,
"finish_reason": "cancelled",
"completion_usage": _usage(i),
}
return
token_id = await self._next_token(prompt_tokens)
chunk: GenerateChunk = {"token_ids": [token_id], "index": 0}
if i == max_new - 1:
chunk["finish_reason"] = "length"
chunk["completion_usage"] = _usage(max_new)
yield chunk
Нормализация finish reason ("abort" → "cancelled" и т. д.) выполняется
Rust-слоем: отдавайте то, что нативно использует ваш движок.
abort(context) — необязательно
Вызывается framework только когда клиент отключается или запрос отменяется. НЕ вызывается при silent stream drops. Переопределите его, чтобы освобождать ресурсы на стороне движка (KV slots, scheduler entries, remote schedulers):
async def abort(self, context: Context) -> None:
request_id = context.id()
await self._engine.cancel(request_id)
Для cleanup, который должен выполняться на каждом пути drop, включая silent
drops, используйте try/finally или context manager внутри generate, а не
abort. Sample engine не переопределяет abort, потому что у него нет
состояния на стороне движка, которое нужно освобождать; значение по умолчанию —
no-op.
drain() — необязательно
Выполняется один раз перед shutdown, после discovery unregister и сна на grace-period, пока NATS/etcd еще живы. Используйте его для draining на стороне бэкенда, который должен завершиться до teardown транспорта (например, in-flight NIXL KV transfers на prefill workers). По умолчанию это no-op.
cleanup()
Два реальных требования, оба закреплены Rust-side conformance kit:
- Null-safe при частичном сбое
start(). Еслиstart()падает на середине, поля, которые вы выделяете постепенно, могут все еще бытьNone.cleanup()должен защищать каждый ресурс (if self._engine is not None: …), чтобы вызов после сбоя не падал на наполовину инициализированном состоянии. - Идемпотентность. Второй вызов после успешного первого должен чисто вернуться без повторного входа в teardown.
Rust Worker управляет обоими случаями: он вызывает cleanup() после успешного
возврата start() во время shutdown, а conformance kit (run_conformance)
дополнительно вызывает cleanup() на никогда не запускавшемся движке и два раза
подряд, проваливая ваши тесты с CleanupWithoutStartFailed /
SecondCleanupFailed, если любой из инвариантов нарушен. Паттерн guarded
single-shot ниже покрывает оба:
async def cleanup(self) -> None:
if self._engine is not None:
await self._engine.shutdown()
self._engine = None
Шаг 5: Напишите main.py
Три строки.
# src/my_backend/main.py
from dynamo.common.backend.run import run
from .engine import MyBackend
def main() -> None:
run(MyBackend)
if __name__ == "__main__":
main()
run устанавливает обработчики сигналов, строит distributed runtime, вызывает
engine.start(worker_id) с идентификатором, выделенным runtime, регистрирует
модель в discovery, обслуживает endpoint и запускает orchestrator корректного
shutdown по SIGTERM/SIGINT.
Сопоставьте это с записью [project.scripts] из pyproject.toml в шаге 1,
чтобы my-backend ... работал как консольная команда.
Шаг 6: Ошибки и логирование
Ошибки: framework оборачивает ошибки не типа DynamoException, поднятые из
generate() (или методов жизненного цикла), как Unknown. Для типизированной
отчетности об ошибках напрямую поднимайте subclass DynamoException из
dynamo.llm.exceptions:
он проходит через Rust bridge без изменений.
from dynamo.llm.exceptions import InvalidArgument
async def generate(self, request, context):
if not request.get("token_ids"):
raise InvalidArgument("empty prompt")
...
README пакета содержит полную таблицу типов исключений и указывает, какая фаза
жизненного цикла какой тип поднимает. Ошибки инициализации движка должны
поднимать EngineShutdown из start(). Cleanup обычно не должен поднимать
исключения: логируйте и подавляйте, если subsystem дает сбой.
Логирование: держите уровни согласованными между унифицированными бэкендами, чтобы операторы видели одинаковую поверхность независимо от выбранного движка:
logger.info— milestones жизненного цикла (инициализация движка завершена, serving started, engine shutdown).logger.debug— per-request events (request abort, cancellation).logger.warning— recoverable problems (empty outputs, unexpected finish reasons).logger.error— только unrecoverable failures.
Framework также настраивает dynamo.runtime.logging за вас; просто вызовите
logger = logging.getLogger(__name__) в начале модуля и используйте его.
Шаг 7: Протестируйте движок
Установите dev extras (pytest, pytest-asyncio), объявленные в шаге 1:
pip install -e ".[dev]"
У sample engine есть unit-test набор unit-тестов, который можно скопировать как отправную точку. Форма полезного теста:
import pytest
from my_backend import MyBackend
class _StubContext:
def __init__(self, stopped: bool = False) -> None:
self._stopped = stopped
def is_stopped(self) -> bool:
return self._stopped
def stop(self) -> None:
self._stopped = True
@pytest.mark.asyncio
async def test_generate_emits_terminal_chunk():
engine = MyBackend(model_name="m", max_tokens=3)
await engine.start(worker_id=0)
try:
chunks = [
chunk
async for chunk in engine.generate(
{"token_ids": [1, 2, 3]}, _StubContext()
)
]
assert chunks[-1]["finish_reason"] in ("stop", "length")
assert chunks[-1]["completion_usage"]["completion_tokens"] == 3
finally:
await engine.cleanup()
@pytest.mark.asyncio
async def test_generate_observes_cancellation():
engine = MyBackend(model_name="m", max_tokens=1000)
await engine.start(worker_id=0)
try:
ctx = _StubContext()
collected = []
async for chunk in engine.generate({"token_ids": [1]}, ctx):
collected.append(chunk)
if len(collected) >= 2:
ctx.stop()
assert collected[-1]["finish_reason"] == "cancelled"
finally:
await engine.cleanup()
Покройте happy path, отмену и любые backend-specific edge cases (stop tokens,
max-tokens cap, empty prompt). Трех-пяти focused tests достаточно: framework уже
закрепляет state machine жизненного цикла и контракт отмены Rust-side тестами в
lib/backend-common.
Шаг 8: Запустите локально
Нужно поднять три движущиеся части: NATS + etcd (discovery и event/request planes), Dynamo frontend (HTTP → backend discovery) и ваш backend.
pip install -e .
# Ensure NATS + etcd are reachable (NATS_SERVER, ETCD_ENDPOINTS).
# --model-name must be a valid HuggingFace repo (or local path); the
# framework fetches the tokenizer + chat template from it on startup.
# Pick a small public repo for smoke tests.
my-backend --model-name Qwen/Qwen3-0.6B --namespace dynamo
# In another shell, start the Dynamo frontend:
python -m dynamo.frontend --http-port 8000
Затем отправьте запрос:
curl http://localhost:8000/v1/chat/completions \
-H 'Content-Type: application/json' \
-d '{
"model": "Qwen/Qwen3-0.6B",
"messages": [{"role": "user", "content": "hello"}],
"max_tokens": 32
}'
Успешный ответ содержит непустой choices[0].message.content и finish_reason
со значением stop или length. jq -e '.choices[0].finish_reason' — хороший
one-liner для CI smoke test.
Если ваш backend выглядит молчащим, задайте DYN_LOG=info (или
DYN_LOG=debug,dynamo=debug для более точного scoping) перед запуском:
framework настраивает tracing из DYN_LOG.
Справочник: sample engine
sample_engine.py
— канонический минимальный reference. Запустите его как есть:
python -m dynamo.common.backend.sample_main --model-name test-model
Он генерирует вращающиеся token IDs без ML-зависимостей, поэтому полезен как замена для AIPerf / end-to-end pipeline smoke tests. Используйте эти паттерны:
from_argsразбирает CLI args и возвращает(engine, WorkerConfig)без awaits.start()возвращаетEngineConfig, KV-поля которого являются иллюстративными и не несут нагрузки (нет настоящего KV cache).generate()опрашиваетcontext.is_stopped()между yields и выдает terminalcancelledпри наблюдении отмены.cleanup()— no-op, потому что движок не держит ресурсов.
Контрольный список
Перед shipping:
-
LLMEngineунаследован;from_argsвозвращает(engine, WorkerConfig). -
start()возвращаетEngineConfigкак минимум с непустымmodel. -
generate()опрашиваетcontext.is_stopped()между yields и выдает terminal"cancelled"при наблюдении отмены. - Финальный chunk содержит
finish_reasonиcompletion_usage. - Typed subclasses
DynamoExceptionиспользуются для отчетности об ошибках там, где важна категория. -
cleanup()освобождает все ресурсы движка. - Уровни логирования соответствуют стандартам из шага 6.
См. также
- ABC
LLMEngine— авторитетный контракт. - README пакета — пробелы в возможностях, модель ошибок, контракт request/response.
- Sample engine — пример user guide.
- Написание унифицированного бэкенда на Rust — Rust-аналог, тот же контракт, более низкий уровень.