Перейти к основному содержимому

Чтобы получить чистую Markdown-версию этой страницы, добавьте .md к этому URL. Полный индекс документации см. в https://docs.nvidia.com/dynamo/llms.txt. Полное содержимое, включая справочник API и примеры SDK, см. в https://docs.nvidia.com/dynamo/llms-full.txt.

KV-события для пользовательских движков

В этом документе объясняется, как реализовать публикацию KV-событий для пользовательских inference-движков, чтобы они могли участвовать в маршрутизации Dynamo с учетом KV cache.

Обзор

KV Router опирается на события в реальном времени от backend-worker'ов, чтобы отслеживать, какие блоки KV cache хранятся на каждом worker'е. Когда ваш пользовательский движок выделяет или удаляет блоки KV cache, он должен публиковать эти события, чтобы маршрутизатор мог принимать оптимальные решения о маршрутизации.

События публикуются через Dynamo event plane — транспортно-нейтральный слой pub/sub, поддерживающий backend'ы NATS и ZMQ (подробности см. в Event Plane). Обвязка KvEventPublisher берет на себя все вопросы транспорта — код вашего движка не взаимодействует с event plane напрямую.

KvEventPublisher поддерживает два режима публикации:

  1. Прямая публикация — ваш движок вызывает publish_stored() / publish_removed(), чтобы отправлять события напрямую через event plane. Это самый простой подход для пользовательских движков.
  2. Ретрансляция через ZMQ — для движков, которые отправляют сырые KV-события через сокет ZMQ (например, SGLang и vLLM). Публикатор подписывается на ZMQ endpoint и автоматически передает события в event plane.

Типы событий

KV cache поддерживает три типа событий:

Тип событияОписаниеКогда публиковать
BlockStoredНовые блоки добавлены в кэшПосле успешного выделения KV cache
BlockRemovedБлоки удалены из кэшаКогда блоки вытесняются или освобождаются
AllBlocksClearedВсе блоки удаленыПри сбросе кэша или перезапуске worker'а

Структура события

Каждое событие содержит:

  • event_id: Монотонно возрастающий идентификатор для каждого worker'а (управляется публикатором внутри)
  • dp_rank: Ранг data parallel (0, если DP не включен)
  • data: Одно из значений Stored, Removed или Cleared

Для событий BlockStored:

  • token_ids: Список идентификаторов token для сохраненных блоков
  • block_hashes: Список sequence block hashes из block manager движка. Это накопительные хеши, которые включают все токены от начала последовательности до текущего блока включительно, а не только токены внутри этого блока. Это позволяет выполнять сопоставление префиксов между запросами.
  • num_block_tokens: Количество токенов на блок (все значения должны быть равны kv_block_size)
  • parent_hash: Хеш родительского блока. Обязателен для всех блоков, кроме первого блока в последовательности (у которого нет родителя).
  • lora_name: Имя адаптера LoRA (опустите или укажите None для базовой модели). Если значение задано, имя адаптера учитывается при вычислении хеша блока, чтобы блоки для разных адаптеров LoRA (или базовой модели) не смешивались.

Для событий BlockRemoved:

  • block_hashes: Список sequence block hashes, которые вытесняются

Прямая публикация (рекомендуется для пользовательских движков)

Вызывайте publish_stored() и publish_removed() напрямую из кода своего движка. Публикатор сам обрабатывает event_id, сериализацию и транспорт.

flowchart LR
subgraph Engine["Custom Engine"]
cache["KV Cache Manager"]
end

subgraph Worker["Dynamo Worker Process"]
pub["KvEventPublisher"]
end

subgraph EP["Dynamo Event Plane"]
topic["kv-events topic"]
end

subgraph Router["KV Router"]
indexer["KvIndexer"]
end

cache -->|"publish_stored()<br/>publish_removed()"| pub
pub -->|"event plane"| topic
topic --> indexer

Когда использовать:

  • При создании пользовательского inference-движка с нуля
  • Если у вашего движка нет системы событий на базе ZMQ
  • Если нужен самый простой путь интеграции

Базовая настройка

from dynamo.llm import KvEventPublisher

class CustomEnginePublisher:
def __init__(self, component, block_size: int, dp_rank: int = 0):
self.block_size = block_size
self.kv_publisher = KvEventPublisher(
component=component,
kv_block_size=block_size,
dp_rank=dp_rank,
)

def on_blocks_stored(self, token_ids: list[int], block_hashes: list[int],
parent_hash: int | None = None,
lora_name: str | None = None):
"""Вызывать после выделения блоков KV cache."""
num_block_tokens = [self.block_size] * len(block_hashes)
self.kv_publisher.publish_stored(
token_ids=token_ids,
num_block_tokens=num_block_tokens,
block_hashes=block_hashes,
parent_hash=parent_hash,
lora_name=lora_name,
)

def on_blocks_removed(self, block_hashes: list[int]):
"""Вызывать, когда блоки KV cache вытесняются."""
self.kv_publisher.publish_removed(block_hashes=block_hashes)

Интеграция с вашим движком

from dynamo.llm import register_model

async def main():
component, endpoint = await register_model(
model="my-model",
generator=my_generate_fn,
)

publisher = CustomEnginePublisher(
component=component,
block_size=16, # Match your engine's block size
)

def on_prefill_complete(request_id, token_ids, blocks):
block_hashes = [block.hash for block in blocks]
publisher.on_blocks_stored(token_ids=token_ids, block_hashes=block_hashes)

def on_cache_eviction(evicted_blocks):
block_hashes = [block.hash for block in evicted_blocks]
publisher.on_blocks_removed(block_hashes=block_hashes)

Ретрансляция через ZMQ (для движков с сырыми KV-событиями)

Для движков, которые уже публикуют сырые KV-события через сокет ZMQ (например, SGLang и vLLM), используйте тот же KvEventPublisher с zmq_endpoint. Публикатор подписывается на сокет ZMQ и автоматически ретранслирует события в event plane.

flowchart LR
subgraph Engine["Custom Engine / SGLang / vLLM"]
cache["KV Cache Manager"]
zmq_pub["ZMQ Publisher"]
end

subgraph ZMQ["ZMQ Socket"]
socket["tcp://127.0.0.1:5557"]
end

subgraph Worker["Dynamo Worker Process"]
relay["KvEventPublisher<br/>(relay mode)"]
end

subgraph EP["Dynamo Event Plane"]
topic["kv-events topic"]
end

subgraph Router["KV Router"]
indexer["KvIndexer"]
end

cache --> zmq_pub
zmq_pub -->|"PUB"| socket
socket -->|"SUB"| relay
relay -->|"event plane"| topic
topic --> indexer

Когда использовать:

  • Если ваш движок уже публикует KV-события через ZMQ (например, SGLang или vLLM)
  • Если вы хотите отделить публикацию событий от основного цикла движка

Настройка

Передайте zmq_endpoint (и необязательный zmq_topic) в тот же KvEventPublisher:

from dynamo.llm import KvEventPublisher

kv_publisher = KvEventPublisher(
component=component,
kv_block_size=block_size,
zmq_endpoint="tcp://127.0.0.1:5557", # Where your engine publishes
zmq_topic="", # Subscribe to all topics
)

Дальнейшие вызовы publish_stored() / publish_removed() не нужны — публикатор читает события из сокета ZMQ и пересылает их автоматически.

Формат сообщений ZMQ

Формат сообщений ZMQ (совместим с SGLang / vLLM):

ФреймОписание
1Тема (пустая строка для всех topics)
2Номер последовательности (8 байт, big-endian)
3Msgpack payload: [timestamp, [events], dp_rank]

Каждое событие в payload представляет собой словарь с полем type (BlockStored, BlockRemoved или AllBlocksCleared).

Для BlockStored:

{
"type": "BlockStored",
"block_hashes": [signed_i64, ...], # Sequence block hashes
"parent_block_hash": signed_i64 | None, # Parent hash
"token_ids": [int, ...], # Token IDs
"block_size": int, # Tokens per block
"lora_name": str | None, # LoRA adapter name
}

Для BlockRemoved:

{
"type": "BlockRemoved",
"block_hashes": [signed_i64, ...],
}

Для AllBlocksCleared:

{"type": "AllBlocksCleared"}

Справочник API

KvEventPublisher

KvEventPublisher(
component: Component,
kv_block_size: int,
dp_rank: int = 0,
enable_local_indexer: bool = False,
zmq_endpoint: str | None = None, # Set for relay mode
zmq_topic: str | None = None, # Defaults to "" when zmq_endpoint is set
)
ПараметрОписание
componentКомпонент Dynamo, которому принадлежит этот публикатор
kv_block_sizeКоличество токенов в блоке (должно быть > 0 и совпадать с размером блока вашего движка)
dp_rankРанг data parallel (по умолчанию 0)
enable_local_indexerВключить worker-local KV indexer для прямых запросов на пересечение
zmq_endpointZMQ endpoint, на который нужно подписаться в режиме ретрансляции (например, "tcp://127.0.0.1:5557")
zmq_topicФильтр topic для ZMQ (по умолчанию "" = все topics)

publish_stored()

publish_stored(
token_ids: list[int],
num_block_tokens: list[int],
block_hashes: list[int],
parent_hash: int | None = None,
block_mm_infos: list[dict | None] | None = None,
lora_name: str | None = None,
)

Публикует событие block-stored. event_id управляется внутренне. Если указан lora_name, имя адаптера учитывается при вычислении хеша блока, чтобы блоки, кэшированные под разными адаптерами, давали разные хеши.

publish_removed()

publish_removed(block_hashes: list[int])

Публикует событие block-removed. event_id управляется внутренне.

shutdown()

shutdown()

Останавливает фоновые задачи (listener ZMQ, пересылку событий).

Рекомендации

  1. kv_block_size должен совпадать с фактическим размером блока вашего движка.

  2. parent_hash обязателен для всех блоков, кроме первого в последовательности, - он связывает блоки и позволяет выполнять сопоставление префиксов.

  3. Хеши блоков - это знаковые 64-битные целые числа в Python API. Публикатор выполняет преобразование внутри.

  4. Порядок событий задается автоматически — публикатор назначает монотонно возрастающие event_id. Вам не нужно отслеживать event_id вручную.

См. также

  • Event Plane: Варианты транспорта (NATS, ZMQ) и настройка
  • Configuration and Tuning: Флаги маршрутизатора, тюнинг и production-настройка
  • Router Design: Детали архитектуры и режимы передачи событий