Описание grpc-сообщений сервиса топиков в YDB
Эта статья - черновик заметок по разработке SDK, может произвольным образом меняться, дополняться и удаляться. Версия НЕ стабильна.
Описание структуры grpc-сообщений
MetadataItem - type
message MetadataItem {
string key = 1;
bytes value = 2;
}
MetadataItem.key
key = 1 (string) - ключ метаданных. Ключи не должны повторяться внутри одного массива с метаданными, т.к. по смыслу это словать и на клиентах используется как словарь.
MetadataItem.value
value = 2 (bytes) - значение элемента метаданных.
StreamWriteMessage.FromClient.InitRequest - type
// Handshake request that must be sent to server first.
message InitRequest {
// Full path of topic to write to.
string path = 1;
// Producer identifier of client data stream.
// Used for message deduplication by sequence numbers.
string producer_id = 2 [(Ydb.length).le = 2048];
// User metadata attached to this write session.
// Reader will get this session meta data with each message read.
map<string, string> write_session_meta = 3;
// Option for setting order on messages.
// If neither is set, no guarantees on ordering or partitions to write to.
oneof partitioning {
// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
string message_group_id = 4 [(Ydb.length).le = 2048];
// Explicit partition id to write to.
int64 partition_id = 5;
// Explicit partition location to write to.
PartitionWithGeneration partition_with_generation = 7;
}
// Explicitly request for last sequential number
// It may be expensive, if producer wrote to many partitions before.
bool get_last_seq_no = 6;
}
StreamWriteMessage.FromClient.InitRequest.path
StreamWriteMessage.FromClient.InitRequest.path = 1 (string) - путь к топику, обязательный параметр. В рамках одного стрима можно писать сообщения только в один топик.
StreamWriteMessage.FromClient.InitRequest.producer_id
StreamWriteMessage.FromClient.InitRequest.producer_id = 2 (string) - идентификатор источника сообщений. Этот параметр участвует в выборе партиции для записи и в процессе дедупликации сообщений при отправке.
Если producer_id указан, то он прикрепляется к конкретной партиции и все сообщения этого producer_id идут в эту партицию, в т.ч. при переподключениях. Это гарантируется на стороне сервера. При попытке указать другую партицию вручную - сервер вернёт ошибку.
Если producer_id НЕ указан (пустая строка), то grpc-стрим тоже подключается к какой-то одной партиции и все сообщения стрима попадут в неё. Партицию можно будет указать явно или она будет выбрана сервером. При автоматическом выборе партиция выбирается каждый раз при подключении к серверу и в общем случае будет каждый раз новой.
Дедупликация сообщений на стороне сервера основана на тапле (topic, partition_id, producer_id, seq_no) и НЕ работает при отсутствии producer_id, т.е. при переотправке сообщений на сервере могут сохраняться дубли.
Режим записи без producer_id и без дедупликации может быть полезен когда идёт очень большой поток сообщений и обработка дедупликации вносит существенный вклад в нагрузку (крайне редкий сценарий), а дубли не являются проблемой.
StreamWriteMessage.FromClient.InitRequest.write_session_meta
StreamWriteMessage.FromClient.InitRequest.write_session_meta = 3 (map<string, string>) - Метаданные уровня сессии, при записи передаются один раз в начале стрима. При чтении будут присутствовать в каждом батче сообщений.
StreamWriteMessage.FromClient.InitRequest.partitioning
Задаёт партицию, в которую будут отправлены сообщения этого стрима.
StreamWriteMessage.FromClient.InitRequest.partitioning.message_group_id
StreamWriteMessage.FromClient.InitRequest.partitioning.message_group_id = 4 (string) - не поддерживается. Предполагалось использовать для маршрутизации сообщений между партициями на стороне сервера.
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_id
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_id = 5 (string) - номер партиции, в которую нужно отправлять сообщения.
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_with_generation
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_with_generation = 7 (PartitionWithGeneration) - указывает partition_id и конкретное поколение таблетки для подключения Описание grpc-сообщений сервиса топиков в YDB#PartitionWithGeneration - type. Используется когда нужно отслеживать рестарт таблеток, например при реализации прямой записи в партицию.
StreamWriteMessage.FromServer.InitResponse - type
message InitResponse {
// Last persisted message's sequence number for this producer.
// Zero for new producer.
int64 last_seq_no = 1;
// Unique identifier of write session. Used for debug purposes.
string session_id = 2;
// Identifier of partition that is matched for this write session.
int64 partition_id = 3;
// Client can only use compression codecs from this set to write messages to topic.
// Otherwise session will be closed with BAD_REQUEST.
SupportedCodecs supported_codecs = 4;
}
StreamWriteMessage.FromServer.InitResponse.last_seq_no
StreamWriteMessage.FromServer.InitResponse.last_seq_no = 1 (string) - порядковый номер последнего сообщения, которое сохранённого в партиции для InitRequest.producer_id. Используется клиентом для дедупликации на своей стороне - чтобы не отправлять повторно уже сохранённые сообщения.
StreamWriteMessage.FromServer.InitResponse.session_id
StreamWriteMessage.FromServer.InitResponse.session_id = 2 (string) - уникальный идентификатор сессии записи на сервере. Используется для отладки - чтобы можно было найти серверные логи, относящиеся к этой сессии записи.
StreamWriteMessage.FromServer.InitResponse.partition_id
StreamWriteMessage.FromServer.InitResponse.partition_id = 3 (int64) - номер партиции, в которую будет идти запись из этой сессии.
StreamWriteMessage.FromServer.InitResponse.supported_codecs
supported_codecs = 4 (Описание grpc-сообщений сервиса топиков в YDB#SupportedCodecs - type) - ограничения по кодекам на топик. Поле опционально. Если оно отсутствует или список пуст - это означает что ограничений нет и можно передавать сообщения с любым кодеком. Если указаны ограничения, то клиент может передавать на сервер только сообщения, обработанные кодеками, входящими в этот список. Если при передаче сообщения указать кодек, не входящий в список - сервер вернёт ошибку BAD_REQUEST и закроет сессию записи.
StreamWriteMessage.FromServer.WriteRequest - type
message WriteRequest {
repeated MessageData messages = 1;
// Codec that is used for data compression.
// See enum Codec above for values.
int32 codec = 2;
optional TransactionIdentity tx = 3;
...
}
StreamWriteMessage.FromServer.WriteRequest.messages
messages = 1 (repeated MessageData) - содержимое сообщений.
StreamWriteMessage.FromServer.WriteRequest.codec
codec = 2 (int32) - Кодек, которым обработаны данные (data) всех сообщений топика в этом grpc-сообщении. Внутри одной сессии записи можно использовать разные кодеки и менять их между grpc-сообщениями, но в рамках grpc-сообщения все сообщения топика должны быть обработаны с использованием только одного кодека.
StreamWriteMessage.FromServer.WriteRequest.tx
tx = 3 (TransactionIdentity) - идентификатор транзакции. Заполняется если сообщение должно быть отправлено внутри транзакции.
StreamWriteMessage.FromServer.WriteRequest.MessageData - type
message MessageData {
// Message sequence number, provided by client for deduplication.
// Starts at 1
int64 seq_no = 1;
// Creation timestamp
google.protobuf.Timestamp created_at = 2;
// Compressed client message body.
bytes data = 3;
// Uncompressed size of client message body.
int64 uncompressed_size = 4;
// Per-message override for respective write session settings.
oneof partitioning {
// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
string message_group_id = 5 [(Ydb.length).le = 2048];
// Explicit partition id to write to.
int64 partition_id = 6;
// Explicit partition location to write to.
PartitionWithGeneration partition_with_generation = 8;
}
// Message metadata. Overall size is limited to 4096 symbols (all keys and values combined).
repeated MetadataItem metadata_items = 7 [(Ydb.size).le = 1000];
}
StreamWriteMessage.FromServer.WriteRequest.MessageData.seq_no
seq_no = 1 (int64) - порядковый номер сообщения. Должен строго возрастать в пределах одного producer_id, если на сервер будет отправлено сообщение с seq_no равным или меньшим, тому что сервер уже получал для этого producer_id раньше, то сервер пропустит это сообщение и не будет его записывать. Так обеспечивается дедупликация сообщений на серверной стороне.
Между seq_no могут быть пропуски - это нормально. Т.е. можно отправить сообщения с номерами: 1,2,3,10,20 и они будут записаны. А если потом отправить сообщения 19,21 - то записано будет только сообщение с номером 21, а сообщение 19 будет пропущено, т.к. сервер уже видел до него сообщение с номером 20.
StreamWriteMessage.FromServer.WriteRequest.MessageData.created_at
created_at = 2 (google.protobuf.Timestamp) - время создания сообщения на клиенте. Определяется клиентом по своему усмотрению и никак сервером не проверяется.
StreamWriteMessage.FromServer.WriteRequest.MessageData.data
data = 3 (bytes) - содержимое сообщения, обработанное кодеком WriteRequest.codec.
StreamWriteMessage.FromServer.WriteRequest.MessageData.uncompressed_size
uncompressed_size = 4 (int64) - размер исходного сообщения в байтах, до обработки кодеком. Может использоваться читателем например для определения размера буфера при распаковке сообщения.
StreamWriteMessage.FromServer.WriteRequest.MessageData.partitioning
partitioning (oneof) - определяет в какую партицию попадёт конкретное сообщение. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteRequest.MessageData.message_group_id
message_group_id = 5 (string) - группа сообщений. Сообщения с одним таплом (producer_id, message_group_id) всегда попадают в одну партицию с сохранением порядка. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteRequest.MessageData.partition_id
partition_id = 6 (int64) - номер партиции топика, в которое должно быть записано сообщение. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteRequest.MessageData.metadata_items
metadata_items = 7 (MetadataItem) - метаданные сообщения. Пары ключ-значение. Ключи не должны повторяться, т.к. по смыслу это словарь и клиент с ними работает как со словарём. Список тут выбран исключительно из эффективности обработки при сериализации/десериализации. Эти значения НЕ обрабатываются кодеком. Используются для передачи небольших порций метаданных, на основании которых читатель может решать что делать с сообщением ещё до его распаковки.
StreamWriteMessage.FromServer.WriteRequest.MessageData.partition_with_generation
partition_with_generation = 8 (PartitionWithGeneration) - партиция с учётом поколения. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteResponse - type
Подтверждение сервером приёма сообщений
StreamWriteMessage.FromServer.WriteResponse.acks
acks = 1 (repeated WriteAck) - подтверждения о приёме на сервер каждого сообщения.
StreamWriteMessage.FromServer.WriteResponse.partition_id
partition_id = 2 (int64) - актуальный номер партиции, в которую были записаны топик-сообщения, подтверждающиеся в этом grpc-сообщении. Может отличаться от партиции в InitResponse и от других подтверждений в этой сессии записи.
StreamWriteMessage.FromServer.WriteResponse.write_statistics
write_statistics = 3 (WriteStatistics) - статистика о записи подтверждаемой порции топик-сообщений.
StreamWriteMessage.FromServer.WriteResponse.WriteAck - type
message WriteAck {
// Sequence number as in WriteRequest.
int64 seq_no = 1;
// Either message is written for the first time or duplicate.
oneof message_write_status {
Written written = 2;
Skipped skipped = 3;
WrittenInTx written_in_tx = 4;
}
...
}
Подтверждение обработки сообщения сервером
StreamWriteMessage.FromServer.WriteResponse.WriteAck.seq_no
seq_no = 1 (int64), соответствует seq_no подтверждаемого сообщения.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.message_write_status
message_write_status (oneof)
Вариант того как сообщение обработано сервером
StreamWriteMessage.FromServer.WriteResponse.WriteAck.written
written = 2 (Written) - сообщение успешно записано на сервер и доступно для чтения
StreamWriteMessage.FromServer.WriteResponse.WriteAck.skipped
skipped = 3 (Skipped) - сообщение пропущено сервером и не записано в топик.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.written_in_tx
written_in_tx = 4 (WrittenInTx) - сообщение записано в транзакции. Тут приходит отдельный статус, т.к. на момент ответа сервером ещё неизвестен offset сообщения - он будет присвоен только после коммита транзакции, если он случится.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Written - type
message Written {
// Assigned partition offset.
int64 offset = 1;
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Written.offset
offset = 1 (int64) - offset, с которым сообщение записано в партицию.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Skipped - type
message Skipped {
Reason reason = 1;
...
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Skipped.reason
reason = 1 (Reason) - причина пропуска сообщения
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason - type
enum Reason {
REASON_UNSPECIFIED = 0;
REASON_ALREADY_WRITTEN = 1;
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason.REASON_UNSPECIFIED, enum value
REASON_UNSPECIFIED = 0 - причина не указана (заглушка для обязательного нулевого значения, не должно использоваться в реальных ответах)
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason.REASON_ALREADY_WRITTEN, enum value
REASON_ALREADY_WRITTEN = 1 - сообщение с таким номером (или больше) уже было записано на сервер.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WrittenInTx - type
message WrittenInTx {
}
Сообщение было записано внутри транзакции.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics - type
// Message with write statistics.
message WriteStatistics {
// Time spent in persisting of data. Same for each message in response.
google.protobuf.Duration persisting_time = 1;
// Time spent in queue before persisting, minimal of all messages in response.
google.protobuf.Duration min_queue_wait_time = 2;
// Time spent in queue before persisting, maximal of all messages in response.
google.protobuf.Duration max_queue_wait_time = 3;
// Time spent awaiting for partition write quota. Same for each message in response.
google.protobuf.Duration partition_quota_wait_time = 4;
// Time spent awaiting for topic write quota. Same for each message in response.
google.protobuf.Duration topic_quota_wait_time = 5;
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.persisting_time
persisting_time = 1 (google.protobuf.Duration) - время, затраченное сервером на сохранение сообщение. Одинаковое для вчех сообщений в одном WriteResponse.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.min_queue_wait_time
min_queue_wait_time = 2 (google.protobuf.Duration) - время, проведённое в очереди в ожидании сохранения. Минимальное из всех подтверждаемых сообщений.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.max_queue_wait_time
max_queue_wait_time = 3 (google.protobuf.Duration) - время, подведённое в очереди в ожидании сохранения. Максимальное из всех подтверждаемых сообщений.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.partition_quota_wait_time
partition_quota_wait_time = 4 (google.protobuf.Duration) - время, проведённое сообщениями в ожидании квоты на партицию.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.topic_quota_wait_time
topic_quota_wait_time = 5 (google.protobuf.Duration) - время, проведённое сообщениями в ожидании квоты на топик.
TransactionIdentity - type
message TransactionIdentity {
// Transaction identifier from TableService.
string id = 1;
// Session identifier from TableService.
string session = 2;
}
Идентификатор транзакции.
TransactionIdentity.id
id = 1 (string) - собственно идентификатор тразкции
TransactionIdentity.session
session = 2 (string) - идентификатор сессии, которая обрабатывает транзацию.
PartitionWithGeneration - type
message PartitionWithGeneration {
// Partition identifier.
int64 partition_id = 1;
// Partition generation.
int64 generation = 2;
};
PartitionWithGeneration.partition_id
PartitionWithGeneration.partition_id = 1 (int64) - id партиции
PartitionWithGeneration.generation
PartitionWithGeneration.generation = 2 (int64) - поколение таблетки, увеличивается при каждом рестарте таблетки, в т.ч. при переездах между нодами.
Ссылки
Codec - type
enum Codec {
CODEC_UNSPECIFIED = 0;
CODEC_RAW = 1;
CODEC_GZIP = 2;
CODEC_LZOP = 3;
CODEC_ZSTD = 4;
reserved 5 to 9999;
// User-defined codecs from 10000 to 19999
CODEC_CUSTOM = 10000;
reserved 20000 to max;
}
Codec.CODEC_RAW, enum value
Codec.CODEC_RAW = 1 - сообщение передаётся в открытом виде, без сжатия и преобразований.
Codec.CODEC_CUSTOM, enum value
Codec.CODEC_CUSTOM - первый доступный идентификатор для определения кодеков на стороне клиента. Сервер со своей стороны никак не обрабатывает содержимое таких сообщений, но продолжает сверять кодеки по номерам, обеспечивая совместимость кодеков на читателях и писателях топика, если на топике заданы ограничения кодеков.
SupportedCodecs - type
message SupportedCodecs {
// List of supported codecs.
// See enum Codec above for values.
repeated int32 codecs = 1 [(Ydb.value) = "[1; 19999]", (Ydb.size).le = 100];
}
SupportedCodecs.codecs
SupportedCodecs.codecs = 1 (repeated int32) - массив с номерами поддерживаемых кодеков. Номера кодеков соответствуют номерам из Codec enum. В protospec объявлены именно числом, т.к. предполагается что клиент может отправлять значения не входящие в перечисление - например собственные номера из клиентского диапазона.