Описание grpc-сообщений сервиса топиков в YDB

Notice

Эта статья - черновик заметок по разработке SDK, может произвольным образом меняться, дополняться и удаляться. Версия НЕ стабильна.

Описание структуры grpc-сообщений

MetadataItem, type

MetadataItem.key

key = 1 (string) - ключ метаданных. Ключи не должны повторяться внутри одного массива с метаданными, т.к. по смыслу это словать и на клиентах используется как словарь.

MetadataItem.value

value = 2 (bytes) - значение элемента метаданных.

StreamWriteMessage.FromClient.InitRequest, type

StreamWriteMessage.FromClient.InitRequest.path

StreamWriteMessage.FromClient.InitRequest.path = 1 (string) - путь к топику, обязательный параметр. В рамках одного стрима можно писать сообщения только в один топик.

StreamWriteMessage.FromClient.InitRequest.producer_id

StreamWriteMessage.FromClient.InitRequest.producer_id = 2 (string) - идентификатор источника сообщений. Этот параметр участвует в выборе партиции для записи и в процессе дедупликации сообщений при отправке.

Если producer_id указан, то он прикрепляется к конкретной партиции и все сообщения этого producer_id идут в эту партицию, в т.ч. при переподключениях. Это гарантируется на стороне сервера. При попытке указать другую партицию вручную - сервер вернёт ошибку.

Если producer_id НЕ указан (пустая строка), то grpc-стрим тоже подключается к какой-то одной партиции и все сообщения стрима попадут в неё. Партицию можно будет указать явно или она будет выбрана сервером. При автоматическом выборе партиция выбирается каждый раз при подключении к серверу и в общем случае будет каждый раз новой.

Дедупликация сообщений на стороне сервера основана на тапле (topic, partition_id, producer_id, seq_no) и НЕ работает при отсутствии producer_id, т.е. при переотправке сообщений на сервере могут сохраняться дубли.

Режим записи без producer_id и без дедупликации может быть полезен когда идёт очень большой поток сообщений и обработка дедупликации вносит существенный вклад в нагрузку (крайне редкий сценарий), а дубли не являются проблемой.

StreamWriteMessage.FromClient.InitRequest.write_session_meta

StreamWriteMessage.FromClient.InitRequest.write_session_meta = 3 (map<string, string>) - Метаданные уровня сессии, при записи передаются один раз в начале стрима. При чтении будут присутствовать в каждом батче сообщений.

StreamWriteMessage.FromClient.InitRequest.partitioning

Задаёт партицию, в которую будут отправлены сообщения этого стрима.

StreamWriteMessage.FromClient.InitRequest.partitioning.message_group_id

StreamWriteMessage.FromClient.InitRequest.partitioning.message_group_id = 4 (string) - не поддерживается. Предполагалось использовать для маршрутизации сообщений между партициями на стороне сервера.

StreamWriteMessage.FromClient.InitRequest.partitioning.partition_id

StreamWriteMessage.FromClient.InitRequest.partitioning.partition_id = 5 (string) - номер партиции, в которую нужно отправлять сообщения.

StreamWriteMessage.FromClient.InitRequest.partitioning.partition_with_generation

StreamWriteMessage.FromClient.InitRequest.partitioning.partition_with_generation = 7 (PartitionWithGeneration) - указывает partition_id и конкретное поколение таблетки для подключения [[#PartitionWithGeneration, type]]. Используется когда нужно отслеживать рестарт таблеток, например при реализации прямой записи в партицию.

StreamWriteMessage.FromServer.InitResponse, type

StreamWriteMessage.FromServer.InitResponse.last_seq_no

StreamWriteMessage.FromServer.InitResponse.last_seq_no = 1 (string) - порядковый номер последнего сообщения, которое сохранённого в партиции для InitRequest.producer_id. Используется клиентом для дедупликации на своей стороне - чтобы не отправлять повторно уже сохранённые сообщения.

StreamWriteMessage.FromServer.InitResponse.session_id

StreamWriteMessage.FromServer.InitResponse.session_id = 2 (string) - уникальный идентификатор сессии записи на сервере. Используется для отладки - чтобы можно было найти серверные логи, относящиеся к этой сессии записи.

StreamWriteMessage.FromServer.InitResponse.partition_id

StreamWriteMessage.FromServer.InitResponse.partition_id = 3 (int64) - номер партиции, в которую будет идти запись из этой сессии.

StreamWriteMessage.FromServer.InitResponse.supported_codecs

supported_codecs = 4 ([[#SupportedCodecs, type]]) - ограничения по кодекам на топик. Поле опционально. Если оно отсутствует или список пуст - это означает что ограничений нет и можно передавать сообщения с любым кодеком. Если указаны ограничения, то клиент может передавать на сервер только сообщения, обработанные кодеками, входящими в этот список. Если при передаче сообщения указать кодек, не входящий в список - сервер вернёт ошибку BAD_REQUEST и закроет сессию записи.

StreamWriteMessage.FromServer.WriteRequest, type

StreamWriteMessage.FromServer.WriteRequest.messages

messages = 1 (repeated MessageData) - содержимое сообщений.

StreamWriteMessage.FromServer.WriteRequest.codec

codec = 2 (int32) - Кодек, которым обработаны данные (data) всех сообщений топика в этом grpc-сообщении. Внутри одной сессии записи можно использовать разные кодеки и менять их между grpc-сообщениями, но в рамках grpc-сообщения все сообщения топика должны быть обработаны с использованием только одного кодека.

StreamWriteMessage.FromServer.WriteRequest.tx

tx = 3 (TransactionIdentity) - идентификатор транзакции. Заполняется если сообщение должно быть отправлено внутри транзакции.

StreamWriteMessage.FromServer.WriteRequest.MessageData, type

StreamWriteMessage.FromServer.WriteRequest.MessageData.seq_no

seq_no = 1 (int64) - порядковый номер сообщения. Должен строго возрастать в пределах одного producer_id, если на сервер будет отправлено сообщение с seq_no равным или меньшим, тому что сервер уже получал для этого producer_id раньше, то сервер пропустит это сообщение и не будет его записывать. Так обеспечивается дедупликация сообщений на серверной стороне.
Между seq_no могут быть пропуски - это нормально. Т.е. можно отправить сообщения с номерами: 1,2,3,10,20 и они будут записаны. А если потом отправить сообщения 19,21 - то записано будет только сообщение с номером 21, а сообщение 19 будет пропущено, т.к. сервер уже видел до него сообщение с номером 20.

StreamWriteMessage.FromServer.WriteRequest.MessageData.created_at

created_at = 2 (google.protobuf.Timestamp) - время создания сообщения на клиенте. Определяется клиентом по своему усмотрению и никак сервером не проверяется.

StreamWriteMessage.FromServer.WriteRequest.MessageData.data

data = 3 (bytes) - содержимое сообщения, обработанное кодеком WriteRequest.codec.

StreamWriteMessage.FromServer.WriteRequest.MessageData.uncompressed_size

uncompressed_size = 4 (int64) - размер исходного сообщения в байтах, до обработки кодеком. Может использоваться читателем например для определения размера буфера при распаковке сообщения.

StreamWriteMessage.FromServer.WriteRequest.MessageData.partitioning

partitioning (oneof) - определяет в какую партицию попадёт конкретное сообщение. На момент описания не поддерживается сервером.

StreamWriteMessage.FromServer.WriteRequest.MessageData.message_group_id

message_group_id = 5 (string) - группа сообщений. Сообщения с одним таплом (producer_id, message_group_id) всегда попадают в одну партицию с сохранением порядка. На момент описания не поддерживается сервером.

StreamWriteMessage.FromServer.WriteRequest.MessageData.partition_id

partition_id = 6 (int64) - номер партиции топика, в которое должно быть записано сообщение. На момент описания не поддерживается сервером.

StreamWriteMessage.FromServer.WriteRequest.MessageData.metadata_items

metadata_items = 7 (MetadataItem) - метаданные сообщения. Пары ключ-значение. Ключи не должны повторяться, т.к. по смыслу это словарь и клиент с ними работает как со словарём. Список тут выбран исключительно из эффективности обработки при сериализации/десериализации. Эти значения НЕ обрабатываются кодеком. Используются для передачи небольших порций метаданных, на основании которых читатель может решать что делать с сообщением ещё до его распаковки.

StreamWriteMessage.FromServer.WriteRequest.MessageData.partition_with_generation

partition_with_generation = 8 (PartitionWithGeneration) - партиция с учётом поколения. На момент описания не поддерживается сервером.

StreamWriteMessage.FromServer.WriteResponse, type

Подтверждение сервером приёма сообщений

StreamWriteMessage.FromServer.WriteResponse.acks

acks = 1 (repeated WriteAck) - подтверждения о приёме на сервер каждого сообщения.

StreamWriteMessage.FromServer.WriteResponse.partition_id

partition_id = 2 (int64) - актуальный номер партиции, в которую были записаны топик-сообщения, подтверждающиеся в этом grpc-сообщении. Может отличаться от партиции в InitResponse и от других подтверждений в этой сессии записи.

StreamWriteMessage.FromServer.WriteResponse.write_statistics

write_statistics = 3 (WriteStatistics) - статистика о записи подтверждаемой порции топик-сообщений.

StreamWriteMessage.FromServer.WriteResponse.WriteAck, type

Подтверждение обработки сообщения сервером

StreamWriteMessage.FromServer.WriteResponse.WriteAck.seq_no

seq_no = 1 (int64), соответствует seq_no подтверждаемого сообщения.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.message_write_status

message_write_status (oneof)
Вариант того как сообщение обработано сервером

StreamWriteMessage.FromServer.WriteResponse.WriteAck.written

written = 2 (Written) - сообщение успешно записано на сервер и доступно для чтения

StreamWriteMessage.FromServer.WriteResponse.WriteAck.skipped

skipped = 3 (Skipped) - сообщение пропущено сервером и не записано в топик.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.written_in_tx

written_in_tx = 4 (WrittenInTx) - сообщение записано в транзакции. Тут приходит отдельный статус, т.к. на момент ответа сервером ещё неизвестен offset сообщения - он будет присвоен только после коммита транзакции, если он случится.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Written, type

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Written.offset

offset = 1 (int64) - offset, с которым сообщение записано в партицию.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Skipped, type

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Skipped.reason

reason = 1 (Reason) - причина пропуска сообщения

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason, type

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason.REASON_UNSPECIFIED, enum value

REASON_UNSPECIFIED = 0 - причина не указана (заглушка для обязательного нулевого значения, не должно использоваться в реальных ответах)

StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason.REASON_ALREADY_WRITTEN, enum value

REASON_ALREADY_WRITTEN = 1 - сообщение с таким номером (или больше) уже было записано на сервер.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WrittenInTx, type

Сообщение было записано внутри транзакции.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics, type

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.persisting_time

persisting_time = 1 (google.protobuf.Duration) - время, затраченное сервером на сохранение сообщение. Одинаковое для вчех сообщений в одном WriteResponse.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.min_queue_wait_time

min_queue_wait_time = 2 (google.protobuf.Duration) - время, проведённое в очереди в ожидании сохранения. Минимальное из всех подтверждаемых сообщений.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.max_queue_wait_time

max_queue_wait_time = 3 (google.protobuf.Duration) - время, подведённое в очереди в ожидании сохранения. Максимальное из всех подтверждаемых сообщений.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.partition_quota_wait_time

partition_quota_wait_time = 4 (google.protobuf.Duration) - время, проведённое сообщениями в ожидании квоты на партицию.

StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.topic_quota_wait_time

topic_quota_wait_time = 5 (google.protobuf.Duration) - время, проведённое сообщениями в ожидании квоты на топик.

TransactionIdentity, type

Идентификатор транзакции.

id = 1 (string) - собственно идентификатор тразкции

TransactionIdentity.session

session = 2 (string) - идентификатор сессии, которая обрабатывает транзацию.

PartitionWithGeneration, type

PartitionWithGeneration.partition_id

PartitionWithGeneration.partition_id = 1 (int64) - id партиции

PartitionWithGeneration.generation

PartitionWithGeneration.generation = 2 (int64) - поколение таблетки, увеличивается при каждом рестарте таблетки, в т.ч. при переездах между нодами.

Ссылки

Codec, type

Codec.CODEC_RAW, enum value

Codec.CODEC_RAW = 1 - сообщение передаётся в открытом виде, без сжатия и преобразований.

Codec.CODEC_CUSTOM, enum value

Codec.CODEC_CUSTOM - первый доступный идентификатор для определения кодеков на стороне клиента. Сервер со своей стороны никак не обрабатывает содержимое таких сообщений, но продолжает сверять кодеки по номерам, обеспечивая совместимость кодеков на читателях и писателях топика, если на топике заданы ограничения кодеков.

SupportedCodecs, type

SupportedCodecs.codecs

SupportedCodecs.codecs = 1 (repeated int32) - массив с номерами поддерживаемых кодеков. Номера кодеков соответствуют номерам из Codec enum. В protospec объявлены именно числом, т.к. предполагается что клиент может отправлять значения не входящие в перечисление - например собственные номера из клиентского диапазона.