Чтобы получить чистую Markdown-версию этой страницы, добавьте
.mdк этому URL. Полный индекс документации см. в https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. в https://docs.nvidia.com/dynamo/llms-full.txt.
KV-события для пользовательских движков
В этом документе объясняется, как реализовать публикацию KV-событий для пользовательских inference-движков, чтобы они могли участвовать в маршрутизации Dynamo с учетом KV cache.
Обзор
KV Router опирается на события в реальном времени от backend-worker'ов, чтобы отслеживать, какие блоки KV cache хранятся на каждом worker'е. Когда ваш пользовательский движок выделяет или удаляет блоки KV cache, он должен публиковать эти события, чтобы маршрутизатор мог принимать оптимальные решения о маршрутизации.
События публикуются через Dynamo event plane — транспортно-нейтральный слой pub/sub, поддерживающий backend'ы NATS и ZMQ (подробности см. в Event Plane). Обвязка KvEventPublisher берет на себя все вопросы транспорта — код вашего движка не взаимодействует с event plane напрямую.
KvEventPublisher поддерживает два режима публикации:
- Прямая публикация — ваш движок вызывает
publish_stored()/publish_removed(), чтобы отправлять события напрямую через event plane. Это самый простой подход для пользовательских движков. - Ретрансляция через ZMQ — для движков, которые отправляют сырые KV-события через сокет ZMQ (например, SGLang и vLLM). Публикатор подписывается на ZMQ endpoint и автоматически передает события в event plane.
Типы событий
KV cache поддерживает три типа событий:
| Тип события | Описание | Когда публиковать |
|---|---|---|
BlockStored | Новые блоки добавлены в кэш | После успешного выделения KV cache |
BlockRemoved | Блоки удалены из кэша | Когда блоки вытесняются или освобождаются |
AllBlocksCleared | Все блоки удалены | При сбросе кэша или перезапуске worker'а |
Структура события
Каждое событие содержит:
event_id: Монотонно возрастающий идентификатор для каждого worker'а (управляется публикатором внутри)dp_rank: Ранг data parallel (0, если DP не включен)data: Одно из значенийStored,RemovedилиCleared
Для событий BlockStored:
token_ids: Список идентификаторов token для сохраненных блоковblock_hashes: Список sequence block hashes из block manager движка. Это накопительные хеши, которые включают все токены от начала последовательности до текущего блока включительно, а не только токены внутри этого блока. Это позволяет выполнять сопоставление префиксов между запросами.num_block_tokens: Количество токенов на блок (все значения должны быть равныkv_block_size)parent_hash: Хеш родительского блока. Обязателен для всех блоков, кроме первого блока в последовательности (у которого нет родителя).lora_name: Имя адаптера LoRA (опустите или укажитеNoneдля базовой модели). Если значение задано, имя адаптера учитывается при вычислении хеша блока, чтобы блоки для разных адаптеров LoRA (или базовой модели) не смешивались.
Для событий BlockRemoved:
block_hashes: Список sequence block hashes, которые вытесняются
Прямая публикация (рекомендуется для пользовательских движков)
Вызывайте publish_stored() и publish_removed() напрямую из кода своего движка. Публикатор сам обрабатывает event_id, сериализацию и транспорт.
flowchart LR
subgraph Engine["Custom Engine"]
cache["KV Cache Manager"]
end
subgraph Worker["Dynamo Worker Process"]
pub["KvEventPublisher"]
end
subgraph EP["Dynamo Event Plane"]
topic["kv-events topic"]
end
subgraph Router["KV Router"]
indexer["KvIndexer"]
end
cache -->|"publish_stored()<br/>publish_removed()"| pub
pub -->|"event plane"| topic
topic --> indexer
Когда использовать:
- При создании пользовательского inference-движка с нуля
- Если у вашего движка нет системы событий на базе ZMQ
- Если нужен самый простой путь интеграции
Базовая настройка
from dynamo.llm import KvEventPublisher
class CustomEnginePublisher:
def __init__(self, component, block_size: int, dp_rank: int = 0):
self.block_size = block_size
self.kv_publisher = KvEventPublisher(
component=component,
kv_block_size=block_size,
dp_rank=dp_rank,
)
def on_blocks_stored(self, token_ids: list[int], block_hashes: list[int],
parent_hash: int | None = None,
lora_name: str | None = None):
"""Вызывать после выделения блоков KV cache."""
num_block_tokens = [self.block_size] * len(block_hashes)
self.kv_publisher.publish_stored(
token_ids=token_ids,
num_block_tokens=num_block_tokens,
block_hashes=block_hashes,
parent_hash=parent_hash,
lora_name=lora_name,
)
def on_blocks_removed(self, block_hashes: list[int]):
"""Вызывать, когда блоки KV cache вытесняются."""
self.kv_publisher.publish_removed(block_hashes=block_hashes)
Интеграция с вашим движком
from dynamo.llm import register_model
async def main():
component, endpoint = await register_model(
model="my-model",
generator=my_generate_fn,
)
publisher = CustomEnginePublisher(
component=component,
block_size=16, # Match your engine's block size
)
def on_prefill_complete(request_id, token_ids, blocks):
block_hashes = [block.hash for block in blocks]
publisher.on_blocks_stored(token_ids=token_ids, block_hashes=block_hashes)
def on_cache_eviction(evicted_blocks):
block_hashes = [block.hash for block in evicted_blocks]
publisher.on_blocks_removed(block_hashes=block_hashes)
Ретрансляция через ZMQ (для движков с сырыми KV-событиями)
Для движков, которые уже публикуют сырые KV-события через сокет ZMQ (например, SGLang и vLLM), используйте тот же KvEventPublisher с zmq_endpoint. Публикатор подписывается на сокет ZMQ и автоматически ретранслирует события в event plane.
flowchart LR
subgraph Engine["Custom Engine / SGLang / vLLM"]
cache["KV Cache Manager"]
zmq_pub["ZMQ Publisher"]
end
subgraph ZMQ["ZMQ Socket"]
socket["tcp://127.0.0.1:5557"]
end
subgraph Worker["Dynamo Worker Process"]
relay["KvEventPublisher<br/>(relay mode)"]
end
subgraph EP["Dynamo Event Plane"]
topic["kv-events topic"]
end
subgraph Router["KV Router"]
indexer["KvIndexer"]
end
cache --> zmq_pub
zmq_pub -->|"PUB"| socket
socket -->|"SUB"| relay
relay -->|"event plane"| topic
topic --> indexer
Когда использовать:
- Если ваш движок уже публикует KV-события через ZMQ (например, SGLang или vLLM)
- Если вы хотите отделить публикацию событий от основного цикла движка
Настройка
Передайте zmq_endpoint (и необязательный zmq_topic) в тот же KvEventPublisher:
from dynamo.llm import KvEventPublisher
kv_publisher = KvEventPublisher(
component=component,
kv_block_size=block_size,
zmq_endpoint="tcp://127.0.0.1:5557", # Where your engine publishes
zmq_topic="", # Subscribe to all topics
)
Дальнейшие вызовы publish_stored() / publish_removed() не нужны — публикатор читает события из сокета ZMQ и пересылает их автоматически.
Формат сообщений ZMQ
Формат сообщений ZMQ (совместим с SGLang / vLLM):
| Фрейм | Описание |
|---|---|
| 1 | Тема (пустая строка для всех topics) |
| 2 | Номер последовательности (8 байт, big-endian) |
| 3 | Msgpack payload: [timestamp, [events], dp_rank] |
Каждое событие в payload представляет собой словарь с полем type (BlockStored, BlockRemoved или AllBlocksCleared).
Для BlockStored:
{
"type": "BlockStored",
"block_hashes": [signed_i64, ...], # Sequence block hashes
"parent_block_hash": signed_i64 | None, # Parent hash
"token_ids": [int, ...], # Token IDs
"block_size": int, # Tokens per block
"lora_name": str | None, # LoRA adapter name
}
Для BlockRemoved:
{
"type": "BlockRemoved",
"block_hashes": [signed_i64, ...],
}
Для AllBlocksCleared:
{"type": "AllBlocksCleared"}
Справочник API
KvEventPublisher
KvEventPublisher(
component: Component,
kv_block_size: int,
dp_rank: int = 0,
enable_local_indexer: bool = False,
zmq_endpoint: str | None = None, # Set for relay mode
zmq_topic: str | None = None, # Defaults to "" when zmq_endpoint is set
)
| Параметр | Описание |
|---|---|
component | Компонент Dynamo, которому принадлежит этот публикатор |
kv_block_size | Количество токенов в блоке (должно быть > 0 и совпадать с размером блока вашего движка) |
dp_rank | Ранг data parallel (по умолчанию 0) |
enable_local_indexer | Включить worker-local KV indexer для прямых запросов на пересечение |
zmq_endpoint | ZMQ endpoint, на который нужно подписаться в режиме ретрансляции (например, "tcp://127.0.0.1:5557") |
zmq_topic | Фильтр topic для ZMQ (по умолчанию "" = все topics) |
publish_stored()
publish_stored(
token_ids: list[int],
num_block_tokens: list[int],
block_hashes: list[int],
parent_hash: int | None = None,
block_mm_infos: list[dict | None] | None = None,
lora_name: str | None = None,
)
Публикует событие block-stored. event_id управляется внутренне. Если указан lora_name, имя адаптера учитывается при вычислении хеша блока, чтобы блоки, кэшированные под разными адаптерами, давали разные хеши.
publish_removed()
publish_removed(block_hashes: list[int])
Публикует событие block-removed. event_id управляется внутренне.
shutdown()
shutdown()
Останавливает фоновые задачи (listener ZMQ, пересылку событий).
Рекомендации
-
kv_block_sizeдолжен совпадать с фактическим размером блока вашего движка. -
parent_hashобязателен для всех блоков, кроме первого в последовательности, - он связывает блоки и позволяет выполнять сопоставление префиксов. -
Хеши блоков - это знаковые 64-битные целые числа в Python API. Публикатор выполняет преобразование внутри.
-
Порядок событий задается автоматически — публикатор назначает монотонно возрастающие
event_id. Вам не нужно отслеживатьevent_idвручную.
См. также
- Event Plane: Варианты транспорта (NATS, ZMQ) и настройка
- Configuration and Tuning: Флаги маршрутизатора, тюнинг и production-настройка
- Router Design: Детали архитектуры и режимы передачи событий