Перейти к основному содержимому

Чтобы получить чистую Markdown-версию этой страницы, добавьте .md к этому URL. Полный индекс документации см. в https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. в https://docs.nvidia.com/dynamo/llms-full.txt.

Примеры маршрутизации

За инструкциями быстрого старта см. README Router. Этот документ содержит дополнительные примеры использования Dynamo Router, включая работу с Python API, развертывания в Kubernetes и собственные шаблоны маршрутизации.

Использование Python API KvRouter

Вместо запуска KV Router из командной строки вы можете создать объект KvRouter непосредственно в Python. Это позволяет переопределять конфигурацию маршрутизации для каждого запроса.

Несколько router-узлов из одного runtime: Не создавайте несколько независимо управляемых экземпляров KvRouter из одного DistributedRuntime. Router-узлы, созданные из endpoint-ов, принадлежащих одному runtime, разделяют его основной token отмены, поэтому уничтожение одного router-узла может отменить фоновую работу, используемую другими. Для одного frontend-процесса в одном процессе используйте один KvRouter; если нужны независимые жизненные циклы router-узлов, используйте отдельные frontend-процессы или создавайте каждый router из отдельного DistributedRuntime.

Если event loop доступен как loop, для независимых жизненных циклов router в одном процессе нужны отдельные runtime:

router_a = KvRouter(DistributedRuntime(loop, "etcd", "tcp").endpoint("dynamo.backend.generate"), 16, KvRouterConfig())
router_b = KvRouter(DistributedRuntime(loop, "etcd", "tcp").endpoint("dynamo.backend.generate"), 16, KvRouterConfig())

Методы

KvRouter предоставляет следующие методы:

  • generate(token_ids, model, ...): Маршрутизирует и выполняет запрос, возвращая async stream ответов. Автоматически обрабатывает выбор worker-узла, отслеживание состояния и управление жизненным циклом.

  • best_worker(token_ids, router_config_override=None, request_id=None, update_indexer=False): Запрашивает, какой worker-узел был бы выбран для заданных токенов. Возвращает (worker_id, dp_rank, overlap_blocks).

    • Без request_id: Только запрос, не обновляет состояние router
    • С request_id: Обновляет состояние жизненного цикла router, чтобы отслеживать запрос. Примечание: Если используется request_id, необходимо вызывать mark_prefill_complete() и free() в соответствующих точках жизненного цикла, чтобы поддерживать точный учет нагрузки
    • С update_indexer=True: Записывает выбранный worker-узел в approximate indexer для будущих предсказаний overlap. Это имеет смысл только при use_kv_events=False
  • get_potential_loads(token_ids): Возвращает подробную информацию о нагрузке для всех worker-узлов, включая потенциальные prefill-токены и активные decode-блоки. Возвращает список словарей с нагрузкой.

  • mark_prefill_complete(request_id): Сигнализирует, что запрос завершил фазу prefill. Используется только при manual lifecycle management, когда best_worker() применяют для ручной маршрутизации вместо generate().

  • free(request_id): Сигнализирует, что запрос завершен и его ресурсы можно освободить. Используется только при manual lifecycle management, когда best_worker() применяют для ручной маршрутизации вместо generate().

  • dump_events(): Выгружает все события KV cache из indexer маршрутизатора в виде JSON-строки. Полезно для отладки и анализа.

Подготовка

Сначала запустите backend-движки:

python -m dynamo.vllm --model meta-llama/Llama-2-7b-hf

Пример скрипта

import asyncio
from dynamo.runtime import DistributedRuntime
from dynamo.llm import KvRouter, KvRouterConfig

async def main():
# Get runtime and create endpoint
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, "etcd", "nats")
endpoint = runtime.endpoint("dynamo.backend.generate")

# Create KV router
kv_router_config = KvRouterConfig()
router = KvRouter(
endpoint=endpoint,
block_size=16,
kv_router_config=kv_router_config
)

# Optional startup gate shared with the frontend and standalone indexer:
# os.environ["DYN_ROUTER_MIN_INITIAL_WORKERS"] = "2"

# Your input tokens
token_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Generate with per-request routing override
stream = await router.generate(
token_ids=token_ids,
model="meta-llama/Llama-2-7b-hf",
stop_conditions={
"max_tokens": 20, # Generate exactly 20 tokens
"ignore_eos": True, # Don't stop at EOS token
},
sampling_options={
"temperature": 0.7,
"top_p": 0.9,
},
router_config_override={
"overlap_score_credit": 1.0, # Prioritize cache hits for this request
"router_temperature": 0.5, # Add routing randomness
}
)

# Collect generated tokens
generated_tokens = []
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
generated_tokens.extend(response["token_ids"])

print(f"Generated {len(generated_tokens)} tokens: {generated_tokens}")

if __name__ == "__main__":
asyncio.run(main())

Примеры K8s

Базовый пример развертывания Kubernetes с KV Router см. в разделе Kubernetes Deployment Router Guide.

Полные примеры K8s

Для A/B-тестирования и расширенной настройки K8s: См. подробное руководство Руководство по A/B-бенчмаркингу KV Router с пошаговыми инструкциями по развертыванию, настройке и бенчмаркингу KV router в Kubernetes.

Пример с расширенной конфигурацией

apiVersion: nvidia.com/v1alpha1
kind: DynamoGraphDeployment
metadata:
name: my-deployment
spec:
services:
Frontend:
componentType: frontend
replicas: 1
envs:
- name: DYN_ROUTER_MODE
value: kv
- name: DYN_ROUTER_TEMPERATURE
value: "0.5" # Add some randomness to prevent worker saturation
- name: DYN_ROUTER_KV_OVERLAP_SCORE_CREDIT
value: "1.0" # Prefer device-local KV cache reuse
- name: DYN_ROUTER_PREFILL_LOAD_SCALE
value: "1.5" # Prioritize TTFT over ITL
- name: DYN_KV_CACHE_BLOCK_SIZE
value: "16"
extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.2.0

Альтернатива: использование аргументов командной строки в K8s

Вы также можете передавать аргументы CLI напрямую в команду контейнера:

extraPodSpec:
mainContainer:
image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:1.2.0
command:
- /bin/sh
- -c
args:
- "python3 -m dynamo.frontend --router-mode kv --router-temperature 0.5 --http-port 8000"

Рекомендация: Используйте переменные окружения, чтобы проще управлять конфигурацией и сохранить согласованность с K8s-паттернами Dynamo.

Шаблоны маршрутизации

KvRouter поддерживает несколько шаблонов использования в зависимости от того, какой уровень контроля вам нужен:

1. Автоматическая маршрутизация (рекомендуется)

Вызывайте generate() напрямую и позвольте маршрутизатору делать все остальное:

stream = await router.generate(token_ids=tokens, model="model-name")
  • Лучше всего для: Большинства сценариев
  • Маршрутизатор автоматически: Выбирает лучший worker-узел, обновляет состояние, маршрутизирует запрос и отслеживает жизненный цикл

2. Ручное управление состоянием (продвинутый режим)

Используйте best_worker(request_id=...), чтобы выбрать и отслеживать worker, а затем управляйте запросом вручную:

worker_id, _dp_rank, overlap = await router.best_worker(
tokens,
request_id="req-123",
update_indexer=True, # needed for approximate mode (use_kv_events=False)
)
response = await client.generate(tokens, request_id="req-123")
# await anext(response) # Get first token
await router.mark_prefill_complete("req-123") # After first token
# async for _ in response: # Continue generating
# ...
await router.free("req-123") # After completion
  • Лучше всего для: Пользовательской обработки запросов с отслеживанием состояния router
  • Требует: Вызова mark_prefill_complete() и free() в правильных точках жизненного цикла
  • Приближенный режим: Передавайте update_indexer=True, когда use_kv_events=False, чтобы router учился на ручных выборах worker-узлов
  • Осторожно: Некорректное управление жизненным циклом ухудшает точность балансировки нагрузки

3. Иерархическое зондирование router

Выполняйте запрос без обновления состояния, а затем маршрутизируйте через выбранный router:

# Probe multiple routers without updating state
worker_id_1, dp_rank, overlap_1 = await router_1.best_worker(tokens) # No request_id
worker_id_2, dp_rank, overlap_2 = await router_2.best_worker(tokens)

# Pick the best router and corresponding worker based on results
if overlap_1 > overlap_2:
chosen_router, chosen_worker = router_1, worker_id_1
else:
chosen_router, chosen_worker = router_2, worker_id_2
stream = await chosen_router.generate(tokens, model="model-name", worker_id=chosen_worker)
  • Лучше всего для: Многоуровневых развертываний (например, Envoy Gateway, направляющий трафик в несколько групп router)
  • Преимущество: Запрос нескольких router-узлов перед выбором одного

4. Пользовательская маршрутизация на основе нагрузки

Используйте get_potential_loads(), чтобы реализовать собственную логику маршрутизации:

loads = await router.get_potential_loads(tokens)
# Apply custom logic (e.g., weighted scoring, constraints)
best_worker = min(loads, key=lambda x: custom_cost_fn(x))
stream = await router.generate(tokens, model="model-name", worker_id=best_worker['worker_id'])
  • Лучше всего для: Пользовательских стратегий оптимизации, выходящих за рамки встроенной функции стоимости
  • Преимущество: Полный контроль над логикой выбора worker-узла
  • См. также: Подробный пример ниже в разделе "Пример пользовательской маршрутизации: минимизация TTFT"

Все шаблоны поддерживают router_config_override, чтобы менять поведение маршрутизации для каждого запроса без пересоздания router.

Пример пользовательской маршрутизации: минимизация TTFT

Ниже показан пример использования get_potential_loads() для реализации пользовательской маршрутизации, которая минимизирует Time To First Token (TTFT) за счет выбора worker-узла с наименьшим объемом prefill:

import asyncio
from dynamo.runtime import DistributedRuntime
from dynamo.llm import KvRouter, KvRouterConfig

async def minimize_ttft_routing():
# Setup router
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, "etcd", "nats")
endpoint = runtime.endpoint("dynamo.backend.generate")

router = KvRouter(
endpoint=endpoint,
block_size=16,
kv_router_config=KvRouterConfig()
)

# Your input tokens
token_ids = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Get potential loads for all workers
potential_loads = await router.get_potential_loads(token_ids)

# Find worker with minimum prefill tokens (best for TTFT)
best_worker = min(potential_loads, key=lambda x: x['potential_prefill_tokens'])

print(f"Worker loads: {potential_loads}")
print(f"Selected worker {best_worker['worker_id']} with {best_worker['potential_prefill_tokens']} prefill tokens")

# Route directly to the selected worker
stream = await router.generate(
token_ids=token_ids,
model="meta-llama/Llama-2-7b-hf",
worker_id=best_worker['worker_id'], # Force routing to optimal worker
stop_conditions={"max_tokens": 20}
)

# Process response
async for response in stream:
if isinstance(response, dict) and "token_ids" in response:
print(f"Generated tokens: {response['token_ids']}")

if __name__ == "__main__":
asyncio.run(minimize_ttft_routing())

Такой подход дает полный контроль над решениями маршрутизации и позволяет оптимизировать разные метрики в зависимости от ваших требований. Например:

  • Минимизировать TTFT: Выбирайте worker-узел с минимальным potential_prefill_tokens
  • Максимизировать повторное использование кэша: Используйте best_worker(), который учитывает и prefill-, и decode-нагрузку
  • Балансировать нагрузку: Учитывайте вместе potential_prefill_tokens и potential_decode_blocks

Подробнее об архитектуре и алгоритме функции стоимости см. Архитектура Router.

Публикация KV-событий для собственных движков

Полную документацию по реализации публикации KV-событий для собственных inference-движков см. в отдельном руководстве Публикация KV-событий для собственных движков. В нем рассматриваются:

  • Прямая публикация: Вызовы publish_stored() / publish_removed() для отправки событий через Dynamo event plane
  • Ретрансляция ZMQ: Для движков, которые публикуют сырые KV-события через ZMQ (например, SGLang и vLLM), тот же KvEventPublisher подписывается на ZMQ socket и автоматически пересылает события
  • Справочник API, структура событий, формат передачи ZMQ и лучшие практики

Глобальный Router (иерархическая маршрутизация)

Для развертываний с несколькими пулами worker-узлов Global Router обеспечивает иерархическую маршрутизацию, располагаясь между frontend и локальными router-узлами. Он выбирает подходящий пул для каждого запроса на основе настраиваемых политик и поддерживает раздельные топологии, где пулы настроены под разные характеристики нагрузки.

См. также