Для чистого Markdown-контента этой страницы добавьте
.mdк этому URL. Полный индекс документации см. в https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. в https://docs.nvidia.com/dynamo/llms-full.txt.
Написание Python-воркеров в Dynamo
Написание Python-воркеров в Dynamo
Путь низкоуровневого Python-воркера. Это руководство описывает точку входа
@dynamo_worker()+register_model()+endpoint.serve_endpoint(). Для новых движков лучше использовать унифицированный Python backend (или унифицированный Rust backend) Dynamo — в этом случае фреймворк берёт на себя управление жизненным циклом, обработку сигналов, мониторинг отмены и регистрацию моделей, а также предоставляет поддержку для метрик Prometheus, публикации событий KV, маршрутизации с учётом KV, трассировки OpenTelemetry, canary-проверок health-check, guided decoding и пользовательских шаблонов Jinja для чата. Оставайтесь на этом пути для нагрузок, которым нужны multimodal, LoRA, извлечение logprob, engine routes (sleep/wake, profiling, weight updates), text-in-text-out, snapshot/CRIU или diffusion — эти возможности унифицированный backend пока не покрывает. Смотрите пробелы в возможностях унифицированного пути для актуальной матрицы.
Это руководство объясняет, как создать собственный Python-воркер в Dynamo.
Библиотека Python dynamo позволяет создать собственный движок и подключить его к Dynamo.
Файл Python должен делать три вещи:
- Декорировать функцию, чтобы получить runtime
- Зарегистрироваться в сети
- Присоединить обработчик запросов
from dynamo.llm import ModelInput, ModelType, register_model
from dynamo.runtime import DistributedRuntime, dynamo_worker
# 1. Decorate a function to get the runtime
#
@dynamo_worker()
async def worker(runtime: DistributedRuntime):
# 2. Register ourselves on the network
#
endpoint = runtime.endpoint("namespace.component.endpoint")
model_path = "Qwen/Qwen3-0.6B" # or "/data/models/Qwen3-0.6B"
model_input = ModelInput.Tokens # or ModelInput.Text if engine handles pre-processing
model_type = ModelType.Chat # or ModelType.Chat | ModelType.Completions if model can be deployed on chat and completions endpoints
# Optional last param to register_model is model_name. If not present derives it from model_path
await register_model(model_input, model_type, endpoint, model_path)
# Initialize your engine here
# engine = ...
# 3. Attach request handler
#
await endpoint.serve_endpoint(RequestHandler(engine).generate)
class RequestHandler:
def __init__(self, engine):
...
async def generate(self, request):
# Call the engine
# yield result dict
...
if __name__ == "__main__":
uvloop.install()
asyncio.run(worker())
model_path может быть:
- ID репозитория HuggingFace, при необходимости с префиксом
hf://. Он будет загружен и кэширован локально. - Путём к checkout репозитория HuggingFace - любой папке, содержащей файлы safetensor, а также
config.json,tokenizer.jsonиtokenizer_config.json.
model_input может быть:
- ModelInput.Tokens. Ваш движок ожидает предварительно обработанный ввод (ID токенов). Dynamo обрабатывает токенизацию и pre-processing.
- ModelInput.Text. Ваш движок ожидает необработанный текстовый ввод и сам выполняет токенизацию и pre-processing.
model_type может быть:
- ModelType.Chat. Ваш метод
generateполучаетrequestи должен возвращать словарь ответа типа OpenAI Chat Completion. - ModelType.Completions. Ваш метод
generateполучаетrequestи должен возвращать словарь ответа старого типа Completions.
register_model также может принимать следующие kwargs:
model_name: Имя, под которым вызывается модель. Имя модели в входящих HTTP-запросах должно совпадать с ним. По умолчанию используется имя репозитория Hugging Face или имя папки.context_length: Максимальная длина модели в токенах. По умолчанию используется максимальное значение, заданное в модели. Указывайте это только если нужно уменьшить выделение KV cache, чтобы поместиться в VRAM.kv_cache_block_size: Размер KV-блока для движка в токенах. По умолчанию 16.user_data: Необязательный словарь с пользовательскими метаданными для поведения воркера (например, конфигурацией LoRA). По умолчаниюNone.
См. examples/backends для полных примеров кода.
Имена компонентов
Для регистрации воркеру нужны три имени: namespace.component.endpoint
- Namespace: Pipeline. Обычно модель. Например, "llama_8b". Просто имя.
- Component: Сервис с балансировкой нагрузки, необходимый для запуска этого pipeline. "backend", "prefill", "decode", "preprocessor", "draft" и т. д. Обычно он имеет некоторую конфигурацию (например, какую модель использовать).
- Endpoint: Как URL. "generate", "load_metrics".
- Instance: Процесс. Уникальный. Dynamo назначает каждому уникальный
instance_id. То, что запущено, всегда является instance. Namespace/component/endpoint могут относиться к нескольким instances.
Если вы запускаете две модели, это две pipelines. Исключение - speculative decoding. Draft-модель является частью pipeline более крупной модели.
Если вы запускаете два экземпляра одной и той же модели ("data parallel"), это один и тот же namespace+component+endpoint, но разные instances. Router распределит трафик между всеми instances одного namespace+component+endpoint. Если в pipeline у вас четыре prefill-воркера, у них у всех один и тот же namespace+component+endpoint, и каждому автоматически назначается уникальный instance_id.
Пример 1: Data parallel с балансировкой нагрузки, одна модель, один pipeline, два экземпляра.
Node 1: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B --tensor-parallel-size 2 --base-gpu-id 0
Node 2: namespace: qwen3-32b, component: backend, endpoint: generate model: /data/Qwen3-32B --tensor-parallel-size 2 --base-gpu-id 2
Пример 2: Две модели, два pipeline.
Node 1: namespace: qwen3-32b, component: backend, endpoint: generate, model: /data/Qwen3-32B
Node 2: namespace: llama3-1-8b, component: backend, endpoint: generat, model: /data/Llama-3.1-8B-Instruct/
Пример 3: Разные endpoints.
Публикатор метрик KV в VLLM добавляет endpoint load_metrics к текущему component. Если указанный выше component llama3-1-8b.backend использует patched vllm, он также будет экспонировать llama3-1-8b.backend.load_metrics.
Пример 4: Несколько component в одном pipeline.
В раздельной P/D-конфигурации у вас будут deepseek-distill-llama8b.prefill.generate (возможно, несколько экземпляров этого) и deepseek-distill-llama8b.decode.generate.
Миграция незавершённых запросов
Python-воркер может потребоваться быстро завершить, например когда ноду, на которой он работает, нужно вернуть в пул, а времени на завершение всех текущих запросов до дедлайна выключения не хватает.
В таких случаях можно сигнализировать о незавершённых ответах, выбросив исключение EngineShutdown в цикле generate. Это немедленно закроет поток ответа и сообщит frontend, что поток неполный. Если включена миграция запросов (см. параметр migration_limit), frontend автоматически перенесёт частично выполненный запрос на другой экземпляр worker, если он доступен, чтобы завершить его.
Вот пример того, как реализовать это в RequestHandler:
from dynamo.llm.exceptions import EngineShutdown
class RequestHandler:
async def generate(self, request):
"""Generate response, with support for request migration"""
for result in self.engine.generate_streaming(request):
# Check if we need to migrate before yielding each token
if is_shutting_down():
# Raising EngineShutdown closes the stream and triggers migration
raise EngineShutdown("Worker shutting down, migrating request")
yield result
Когда выбрасывается EngineShutdown, frontend получает незавершённый ответ и может бесшовно продолжить генерацию на другом доступном экземпляре worker, сохраняя пользовательский опыт даже во время завершения работы worker.
Дополнительные сведения о том, как работает миграция запросов, см. в документации Request Migration Architecture.
Отмена запросов
Обработчик запросов вашего Python-воркера может при желании поддерживать отмену запросов, если после аргумента request принять аргумент context. Этот объект context позволяет проверять сигналы отмены и реагировать соответствующим образом:
class RequestHandler:
async def generate(self, request, context):
"""Generate response with cancellation support"""
for result in self.engine.generate_streaming(request):
# Check if the request has been cancelled
if context.is_stopped():
# Stop processing and clean up
break
yield result
Параметр context необязателен - если в сигнатуре метода generate его нет, Dynamo вызовет метод без аргумента context.
Подробности об отмене запросов, включая асинхронный мониторинг отмены и паттерны распространения context, см. в документации Request Cancellation Architecture.