Описание grpc-сообщений сервиса топиков в YDB
Эта статья - черновик заметок по разработке SDK, может произвольным образом меняться, дополняться и удаляться. Версия НЕ стабильна.
Описание структуры grpc-сообщений
MetadataItem, type
message MetadataItem {
string key = 1;
bytes value = 2;
}
MetadataItem.key
key = 1
(string) - ключ метаданных. Ключи не должны повторяться внутри одного массива с метаданными, т.к. по смыслу это словать и на клиентах используется как словарь.
MetadataItem.value
value = 2
(bytes) - значение элемента метаданных.
StreamWriteMessage.FromClient.InitRequest, type
// Handshake request that must be sent to server first.
message InitRequest {
// Full path of topic to write to.
string path = 1;
// Producer identifier of client data stream.
// Used for message deduplication by sequence numbers.
string producer_id = 2 [(Ydb.length).le = 2048];
// User metadata attached to this write session.
// Reader will get this session meta data with each message read.
map<string, string> write_session_meta = 3;
// Option for setting order on messages.
// If neither is set, no guarantees on ordering or partitions to write to.
oneof partitioning {
// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
string message_group_id = 4 [(Ydb.length).le = 2048];
// Explicit partition id to write to.
int64 partition_id = 5;
// Explicit partition location to write to.
PartitionWithGeneration partition_with_generation = 7;
}
// Explicitly request for last sequential number
// It may be expensive, if producer wrote to many partitions before.
bool get_last_seq_no = 6;
}
StreamWriteMessage.FromClient.InitRequest.path
StreamWriteMessage.FromClient.InitRequest.path = 1 (string)
- путь к топику, обязательный параметр. В рамках одного стрима можно писать сообщения только в один топик.
StreamWriteMessage.FromClient.InitRequest.producer_id
StreamWriteMessage.FromClient.InitRequest.producer_id = 2 (string)
- идентификатор источника сообщений. Этот параметр участвует в выборе партиции для записи и в процессе дедупликации сообщений при отправке.
Если producer_id
указан, то он прикрепляется к конкретной партиции и все сообщения этого producer_id
идут в эту партицию, в т.ч. при переподключениях. Это гарантируется на стороне сервера. При попытке указать другую партицию вручную - сервер вернёт ошибку.
Если producer_id
НЕ указан (пустая строка), то grpc-стрим тоже подключается к какой-то одной партиции и все сообщения стрима попадут в неё. Партицию можно будет указать явно или она будет выбрана сервером. При автоматическом выборе партиция выбирается каждый раз при подключении к серверу и в общем случае будет каждый раз новой.
Дедупликация сообщений на стороне сервера основана на тапле (topic, partition_id, producer_id, seq_no) и НЕ работает при отсутствии producer_id, т.е. при переотправке сообщений на сервере могут сохраняться дубли.
Режим записи без producer_id
и без дедупликации может быть полезен когда идёт очень большой поток сообщений и обработка дедупликации вносит существенный вклад в нагрузку (крайне редкий сценарий), а дубли не являются проблемой.
StreamWriteMessage.FromClient.InitRequest.write_session_meta
StreamWriteMessage.FromClient.InitRequest.write_session_meta = 3 (map<string, string>)
- Метаданные уровня сессии, при записи передаются один раз в начале стрима. При чтении будут присутствовать в каждом батче сообщений.
StreamWriteMessage.FromClient.InitRequest.partitioning
Задаёт партицию, в которую будут отправлены сообщения этого стрима.
StreamWriteMessage.FromClient.InitRequest.partitioning.message_group_id
StreamWriteMessage.FromClient.InitRequest.partitioning.message_group_id = 4 (string)
- не поддерживается. Предполагалось использовать для маршрутизации сообщений между партициями на стороне сервера.
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_id
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_id = 5 (string)
- номер партиции, в которую нужно отправлять сообщения.
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_with_generation
StreamWriteMessage.FromClient.InitRequest.partitioning.partition_with_generation = 7 (PartitionWithGeneration)
- указывает partition_id
и конкретное поколение таблетки для подключения [[#PartitionWithGeneration, type]]. Используется когда нужно отслеживать рестарт таблеток, например при реализации прямой записи в партицию.
StreamWriteMessage.FromServer.InitResponse, type
message InitResponse {
// Last persisted message's sequence number for this producer.
// Zero for new producer.
int64 last_seq_no = 1;
// Unique identifier of write session. Used for debug purposes.
string session_id = 2;
// Identifier of partition that is matched for this write session.
int64 partition_id = 3;
// Client can only use compression codecs from this set to write messages to topic.
// Otherwise session will be closed with BAD_REQUEST.
SupportedCodecs supported_codecs = 4;
}
StreamWriteMessage.FromServer.InitResponse.last_seq_no
StreamWriteMessage.FromServer.InitResponse.last_seq_no = 1 (string)
- порядковый номер последнего сообщения, которое сохранённого в партиции для InitRequest.producer_id. Используется клиентом для дедупликации на своей стороне - чтобы не отправлять повторно уже сохранённые сообщения.
StreamWriteMessage.FromServer.InitResponse.session_id
StreamWriteMessage.FromServer.InitResponse.session_id = 2 (string)
- уникальный идентификатор сессии записи на сервере. Используется для отладки - чтобы можно было найти серверные логи, относящиеся к этой сессии записи.
StreamWriteMessage.FromServer.InitResponse.partition_id
StreamWriteMessage.FromServer.InitResponse.partition_id = 3 (int64)
- номер партиции, в которую будет идти запись из этой сессии.
StreamWriteMessage.FromServer.InitResponse.supported_codecs
supported_codecs = 4
([[#SupportedCodecs, type]]) - ограничения по кодекам на топик. Поле опционально. Если оно отсутствует или список пуст - это означает что ограничений нет и можно передавать сообщения с любым кодеком. Если указаны ограничения, то клиент может передавать на сервер только сообщения, обработанные кодеками, входящими в этот список. Если при передаче сообщения указать кодек, не входящий в список - сервер вернёт ошибку BAD_REQUEST
и закроет сессию записи.
StreamWriteMessage.FromServer.WriteRequest, type
message WriteRequest {
repeated MessageData messages = 1;
// Codec that is used for data compression.
// See enum Codec above for values.
int32 codec = 2;
optional TransactionIdentity tx = 3;
...
}
StreamWriteMessage.FromServer.WriteRequest.messages
messages = 1
(repeated MessageData) - содержимое сообщений.
StreamWriteMessage.FromServer.WriteRequest.codec
codec = 2
(int32) - Кодек, которым обработаны данные (data) всех сообщений топика в этом grpc-сообщении. Внутри одной сессии записи можно использовать разные кодеки и менять их между grpc-сообщениями, но в рамках grpc-сообщения все сообщения топика должны быть обработаны с использованием только одного кодека.
StreamWriteMessage.FromServer.WriteRequest.tx
tx = 3
(TransactionIdentity) - идентификатор транзакции. Заполняется если сообщение должно быть отправлено внутри транзакции.
StreamWriteMessage.FromServer.WriteRequest.MessageData, type
message MessageData {
// Message sequence number, provided by client for deduplication.
// Starts at 1
int64 seq_no = 1;
// Creation timestamp
google.protobuf.Timestamp created_at = 2;
// Compressed client message body.
bytes data = 3;
// Uncompressed size of client message body.
int64 uncompressed_size = 4;
// Per-message override for respective write session settings.
oneof partitioning {
// All messages with given pair (producer_id, message_group_id) go to single partition in order of writes.
string message_group_id = 5 [(Ydb.length).le = 2048];
// Explicit partition id to write to.
int64 partition_id = 6;
// Explicit partition location to write to.
PartitionWithGeneration partition_with_generation = 8;
}
// Message metadata. Overall size is limited to 4096 symbols (all keys and values combined).
repeated MetadataItem metadata_items = 7 [(Ydb.size).le = 1000];
}
StreamWriteMessage.FromServer.WriteRequest.MessageData.seq_no
seq_no = 1
(int64) - порядковый номер сообщения. Должен строго возрастать в пределах одного producer_id, если на сервер будет отправлено сообщение с seq_no
равным или меньшим, тому что сервер уже получал для этого producer_id
раньше, то сервер пропустит это сообщение и не будет его записывать. Так обеспечивается дедупликация сообщений на серверной стороне.
Между seq_no могут быть пропуски - это нормально. Т.е. можно отправить сообщения с номерами: 1,2,3,10,20 и они будут записаны. А если потом отправить сообщения 19,21 - то записано будет только сообщение с номером 21, а сообщение 19 будет пропущено, т.к. сервер уже видел до него сообщение с номером 20.
StreamWriteMessage.FromServer.WriteRequest.MessageData.created_at
created_at = 2
(google.protobuf.Timestamp) - время создания сообщения на клиенте. Определяется клиентом по своему усмотрению и никак сервером не проверяется.
StreamWriteMessage.FromServer.WriteRequest.MessageData.data
data = 3
(bytes) - содержимое сообщения, обработанное кодеком WriteRequest.codec.
StreamWriteMessage.FromServer.WriteRequest.MessageData.uncompressed_size
uncompressed_size = 4
(int64) - размер исходного сообщения в байтах, до обработки кодеком. Может использоваться читателем например для определения размера буфера при распаковке сообщения.
StreamWriteMessage.FromServer.WriteRequest.MessageData.partitioning
partitioning
(oneof) - определяет в какую партицию попадёт конкретное сообщение. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteRequest.MessageData.message_group_id
message_group_id = 5
(string) - группа сообщений. Сообщения с одним таплом (producer_id, message_group_id) всегда попадают в одну партицию с сохранением порядка. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteRequest.MessageData.partition_id
partition_id = 6
(int64) - номер партиции топика, в которое должно быть записано сообщение. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteRequest.MessageData.metadata_items
metadata_items = 7
(MetadataItem) - метаданные сообщения. Пары ключ-значение. Ключи не должны повторяться, т.к. по смыслу это словарь и клиент с ними работает как со словарём. Список тут выбран исключительно из эффективности обработки при сериализации/десериализации. Эти значения НЕ обрабатываются кодеком. Используются для передачи небольших порций метаданных, на основании которых читатель может решать что делать с сообщением ещё до его распаковки.
StreamWriteMessage.FromServer.WriteRequest.MessageData.partition_with_generation
partition_with_generation = 8
(PartitionWithGeneration) - партиция с учётом поколения. На момент описания не поддерживается сервером.
StreamWriteMessage.FromServer.WriteResponse, type
Подтверждение сервером приёма сообщений
StreamWriteMessage.FromServer.WriteResponse.acks
acks = 1
(repeated WriteAck) - подтверждения о приёме на сервер каждого сообщения.
StreamWriteMessage.FromServer.WriteResponse.partition_id
partition_id = 2
(int64) - актуальный номер партиции, в которую были записаны топик-сообщения, подтверждающиеся в этом grpc-сообщении. Может отличаться от партиции в InitResponse и от других подтверждений в этой сессии записи.
StreamWriteMessage.FromServer.WriteResponse.write_statistics
write_statistics = 3
(WriteStatistics) - статистика о записи подтверждаемой порции топик-сообщений.
StreamWriteMessage.FromServer.WriteResponse.WriteAck, type
message WriteAck {
// Sequence number as in WriteRequest.
int64 seq_no = 1;
// Either message is written for the first time or duplicate.
oneof message_write_status {
Written written = 2;
Skipped skipped = 3;
WrittenInTx written_in_tx = 4;
}
...
}
Подтверждение обработки сообщения сервером
StreamWriteMessage.FromServer.WriteResponse.WriteAck.seq_no
seq_no = 1
(int64), соответствует seq_no подтверждаемого сообщения.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.message_write_status
message_write_status
(oneof)
Вариант того как сообщение обработано сервером
StreamWriteMessage.FromServer.WriteResponse.WriteAck.written
written = 2
(Written) - сообщение успешно записано на сервер и доступно для чтения
StreamWriteMessage.FromServer.WriteResponse.WriteAck.skipped
skipped = 3
(Skipped) - сообщение пропущено сервером и не записано в топик.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.written_in_tx
written_in_tx = 4
(WrittenInTx) - сообщение записано в транзакции. Тут приходит отдельный статус, т.к. на момент ответа сервером ещё неизвестен offset сообщения - он будет присвоен только после коммита транзакции, если он случится.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Written, type
message Written {
// Assigned partition offset.
int64 offset = 1;
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Written.offset
offset = 1
(int64) - offset, с которым сообщение записано в партицию.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Skipped, type
message Skipped {
Reason reason = 1;
...
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Skipped.reason
reason = 1
(Reason) - причина пропуска сообщения
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason, type
enum Reason {
REASON_UNSPECIFIED = 0;
REASON_ALREADY_WRITTEN = 1;
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason.REASON_UNSPECIFIED, enum value
REASON_UNSPECIFIED = 0
- причина не указана (заглушка для обязательного нулевого значения, не должно использоваться в реальных ответах)
StreamWriteMessage.FromServer.WriteResponse.WriteAck.Reason.REASON_ALREADY_WRITTEN, enum value
REASON_ALREADY_WRITTEN = 1
- сообщение с таким номером (или больше) уже было записано на сервер.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WrittenInTx, type
message WrittenInTx {
}
Сообщение было записано внутри транзакции.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics, type
// Message with write statistics.
message WriteStatistics {
// Time spent in persisting of data. Same for each message in response.
google.protobuf.Duration persisting_time = 1;
// Time spent in queue before persisting, minimal of all messages in response.
google.protobuf.Duration min_queue_wait_time = 2;
// Time spent in queue before persisting, maximal of all messages in response.
google.protobuf.Duration max_queue_wait_time = 3;
// Time spent awaiting for partition write quota. Same for each message in response.
google.protobuf.Duration partition_quota_wait_time = 4;
// Time spent awaiting for topic write quota. Same for each message in response.
google.protobuf.Duration topic_quota_wait_time = 5;
}
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.persisting_time
persisting_time = 1
(google.protobuf.Duration) - время, затраченное сервером на сохранение сообщение. Одинаковое для вчех сообщений в одном WriteResponse.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.min_queue_wait_time
min_queue_wait_time = 2
(google.protobuf.Duration) - время, проведённое в очереди в ожидании сохранения. Минимальное из всех подтверждаемых сообщений.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.max_queue_wait_time
max_queue_wait_time = 3
(google.protobuf.Duration) - время, подведённое в очереди в ожидании сохранения. Максимальное из всех подтверждаемых сообщений.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.partition_quota_wait_time
partition_quota_wait_time = 4
(google.protobuf.Duration) - время, проведённое сообщениями в ожидании квоты на партицию.
StreamWriteMessage.FromServer.WriteResponse.WriteAck.WriteStatistics.topic_quota_wait_time
topic_quota_wait_time = 5
(google.protobuf.Duration) - время, проведённое сообщениями в ожидании квоты на топик.
TransactionIdentity, type
message TransactionIdentity {
// Transaction identifier from TableService.
string id = 1;
// Session identifier from TableService.
string session = 2;
}
Идентификатор транзакции.
TransactionIdentity.id
id = 1
(string) - собственно идентификатор тразкции
TransactionIdentity.session
session = 2
(string) - идентификатор сессии, которая обрабатывает транзацию.
PartitionWithGeneration, type
message PartitionWithGeneration {
// Partition identifier.
int64 partition_id = 1;
// Partition generation.
int64 generation = 2;
};
PartitionWithGeneration.partition_id
PartitionWithGeneration.partition_id = 1 (int64)
- id партиции
PartitionWithGeneration.generation
PartitionWithGeneration.generation = 2 (int64)
- поколение таблетки, увеличивается при каждом рестарте таблетки, в т.ч. при переездах между нодами.
Ссылки
Codec, type
enum Codec {
CODEC_UNSPECIFIED = 0;
CODEC_RAW = 1;
CODEC_GZIP = 2;
CODEC_LZOP = 3;
CODEC_ZSTD = 4;
reserved 5 to 9999;
// User-defined codecs from 10000 to 19999
CODEC_CUSTOM = 10000;
reserved 20000 to max;
}
Codec.CODEC_RAW, enum value
Codec.CODEC_RAW = 1
- сообщение передаётся в открытом виде, без сжатия и преобразований.
Codec.CODEC_CUSTOM, enum value
Codec.CODEC_CUSTOM
- первый доступный идентификатор для определения кодеков на стороне клиента. Сервер со своей стороны никак не обрабатывает содержимое таких сообщений, но продолжает сверять кодеки по номерам, обеспечивая совместимость кодеков на читателях и писателях топика, если на топике заданы ограничения кодеков.
SupportedCodecs, type
message SupportedCodecs {
// List of supported codecs.
// See enum Codec above for values.
repeated int32 codecs = 1 [(Ydb.value) = "[1; 19999]", (Ydb.size).le = 100];
}
SupportedCodecs.codecs
SupportedCodecs.codecs = 1 (repeated int32)
- массив с номерами поддерживаемых кодеков. Номера кодеков соответствуют номерам из Codec enum. В protospec объявлены именно числом, т.к. предполагается что клиент может отправлять значения не входящие в перечисление - например собственные номера из клиентского диапазона.