Перейти к основному содержимому

Для чистого Markdown-контента этой страницы добавьте .md к этому URL. Полный индекс документации см. в https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. в https://docs.nvidia.com/dynamo/llms-full.txt.

Написание Python-воркеров в Dynamo

Написание Python-воркеров в Dynamo

Путь низкоуровневого Python-воркера. Это руководство описывает точку входа @dynamo_worker() + register_model() + endpoint.serve_endpoint(). Для новых движков лучше использовать унифицированный Python backend (или унифицированный Rust backend) Dynamo — в этом случае фреймворк берёт на себя управление жизненным циклом, обработку сигналов, мониторинг отмены и регистрацию моделей, а также предоставляет поддержку для метрик Prometheus, публикации событий KV, маршрутизации с учётом KV, трассировки OpenTelemetry, canary-проверок health-check, guided decoding и пользовательских шаблонов Jinja для чата. Оставайтесь на этом пути для нагрузок, которым нужны multimodal, LoRA, извлечение logprob, engine routes (sleep/wake, profiling, weight updates), text-in-text-out, snapshot/CRIU или diffusion — эти возможности унифицированный backend пока не покрывает. Смотрите пробелы в возможностях унифицированного пути для актуальной матрицы.

Это руководство объясняет, как создать собственный Python-воркер в Dynamo.

Библиотека Python dynamo позволяет создать собственный движок и подключить его к Dynamo.

Файл Python должен делать три вещи:

  1. Декорировать функцию, чтобы получить runtime
  2. Зарегистрироваться в сети
  3. Присоединить обработчик запросов
from dynamo.llm import ModelInput, ModelType, register_model
from dynamo.runtime import DistributedRuntime, dynamo_worker

# 1. Decorate a function to get the runtime
#
@dynamo_worker()
async def worker(runtime: DistributedRuntime):

# 2. Register ourselves on the network
#
endpoint = runtime.endpoint("namespace.component.endpoint")
model_path = "Qwen/Qwen3-0.6B" # or "/data/models/Qwen3-0.6B"
model_input = ModelInput.Tokens # or ModelInput.Text if engine handles pre-processing
model_type = ModelType.Chat # or ModelType.Chat | ModelType.Completions if model can be deployed on chat and completions endpoints
# Optional last param to register_model is model_name. If not present derives it from model_path
await register_model(model_input, model_type, endpoint, model_path)

# Initialize your engine here
# engine = ...

# 3. Attach request handler
#
await endpoint.serve_endpoint(RequestHandler(engine).generate)

class RequestHandler:

def __init__(self, engine):
...

async def generate(self, request):
# Call the engine
# yield result dict
...

if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())

model_path может быть:

  • ID репозитория HuggingFace, при необходимости с префиксом hf://. Он будет загружен и кэширован локально.
  • Путём к checkout репозитория HuggingFace - любой папке, содержащей файлы safetensor, а также config.json, tokenizer.json и tokenizer_config.json.

model_input может быть:

  • ModelInput.Tokens. Ваш движок ожидает предварительно обработанный ввод (ID токенов). Dynamo обрабатывает токенизацию и pre-processing.
  • ModelInput.Text. Ваш движок ожидает необработанный текстовый ввод и сам выполняет токенизацию и pre-processing.

model_type может быть:

  • ModelType.Chat. Ваш метод generate получает request и должен возвращать словарь ответа типа OpenAI Chat Completion.
  • ModelType.Completions. Ваш метод generate получает request и должен возвращать словарь ответа старого типа Completions.

register_model также может принимать следующие kwargs:

  • model_name: Имя, под которым вызывается модель. Имя модели в входящих HTTP-запросах должно совпадать с ним. По умолчанию используется имя репозитория Hugging Face или имя папки.
  • context_length: Максимальная длина модели в токенах. По умолчанию используется максимальное значение, заданное в модели. Указывайте это только если нужно уменьшить выделение KV cache, чтобы поместиться в VRAM.
  • kv_cache_block_size: Размер KV-блока для движка в токенах. По умолчанию 16.
  • user_data: Необязательный словарь с пользовательскими метаданными для поведения воркера (например, конфигурацией LoRA). По умолчанию None.

См. examples/backends для полных примеров кода.

Имена компонентов

Для регистрации воркеру нужны три имени: namespace.component.endpoint

  • Namespace: Pipeline. Обычно модель. Например, "llama_8b". Просто имя.
  • Component: Сервис с балансировкой нагрузки, необходимый для запуска этого pipeline. "backend", "prefill", "decode", "preprocessor", "draft" и т. д. Обычно он имеет некоторую конфигурацию (например, какую модель использовать).
  • Endpoint: Как URL. "generate", "load_metrics".
  • Instance: Процесс. Уникальный. Dynamo назначает каждому уникальный instance_id. То, что запущено, всегда является instance. Namespace/component/endpoint могут относиться к нескольким instances.

Если вы запускаете две модели, это две pipelines. Исключение - speculative decoding. Draft-модель является частью pipeline более крупной модели.

Если вы запускаете два экземпляра одной и той же модели ("data parallel"), это один и тот же namespace+component+endpoint, но разные instances. Router распределит трафик между всеми instances одного namespace+component+endpoint. Если в pipeline у вас четыре prefill-воркера, у них у всех один и тот же namespace+component+endpoint, и каждому автоматически назначается уникальный instance_id.

Пример 1: Data parallel с балансировкой нагрузки, одна модель, один pipeline, два экземпляра.

Node 1: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B --tensor-parallel-size 2 --base-gpu-id 0
Node 2: namespace: qwen3-32b, component: backend, endpoint: generate model: /data/Qwen3-32B --tensor-parallel-size 2 --base-gpu-id 2

Пример 2: Две модели, два pipeline.

Node 1: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B
Node 2: namespace: llama3-1-8b, component: backend, endpoint: generat, model: /data/Llama-3.1-8B-Instruct/

Пример 3: Разные endpoints.

Публикатор метрик KV в VLLM добавляет endpoint load_metrics к текущему component. Если указанный выше component llama3-1-8b.backend использует patched vllm, он также будет экспонировать llama3-1-8b.backend.load_metrics.

Пример 4: Несколько component в одном pipeline.

В раздельной P/D-конфигурации у вас будут deepseek-distill-llama8b.prefill.generate (возможно, несколько экземпляров этого) и deepseek-distill-llama8b.decode.generate.

Миграция незавершённых запросов

Python-воркер может потребоваться быстро завершить, например когда ноду, на которой он работает, нужно вернуть в пул, а времени на завершение всех текущих запросов до дедлайна выключения не хватает.

В таких случаях можно сигнализировать о незавершённых ответах, выбросив исключение EngineShutdown в цикле generate. Это немедленно закроет поток ответа и сообщит frontend, что поток неполный. Если включена миграция запросов (см. параметр migration_limit), frontend автоматически перенесёт частично выполненный запрос на другой экземпляр worker, если он доступен, чтобы завершить его.

Вот пример того, как реализовать это в RequestHandler:

from dynamo.llm.exceptions import EngineShutdown

class RequestHandler:

async def generate(self, request):
"""Generate response, with support for request migration"""
for result in self.engine.generate_streaming(request):
# Check if we need to migrate before yielding each token
if is_shutting_down():
# Raising EngineShutdown closes the stream and triggers migration
raise EngineShutdown("Worker shutting down, migrating request")

yield result

Когда выбрасывается EngineShutdown, frontend получает незавершённый ответ и может бесшовно продолжить генерацию на другом доступном экземпляре worker, сохраняя пользовательский опыт даже во время завершения работы worker.

Дополнительные сведения о том, как работает миграция запросов, см. в документации Request Migration Architecture.

Отмена запросов

Обработчик запросов вашего Python-воркера может при желании поддерживать отмену запросов, если после аргумента request принять аргумент context. Этот объект context позволяет проверять сигналы отмены и реагировать соответствующим образом:

class RequestHandler:

async def generate(self, request, context):
"""Generate response with cancellation support"""
for result in self.engine.generate_streaming(request):
# Check if the request has been cancelled
if context.is_stopped():
# Stop processing and clean up
break

yield result

Параметр context необязателен - если в сигнатуре метода generate его нет, Dynamo вызовет метод без аргумента context.

Подробности об отмене запросов, включая асинхронный мониторинг отмены и паттерны распространения context, см. в документации Request Cancellation Architecture.