Разработка YDB SDK

lЭта статья - черновик заметок по разработке SDK, может произвольным образом меняться, дополняться и удаляться. Версия НЕ стабильна.

Руководство для разработчиков SDK

Тут описываются общие подходы к построению SDK и реализации клиентов для YDB. Это не прямое указание к действию и не пошаговая инструкция, а скорее описание общих принципов разработки, которых придерхивается команда YDB. Это понимание будет полезно как для исследования кода существующих SDK, так и для разработки собственных.

Общие принципы

GRPC

SDK общается с сервером с по протоколу grpc c protobuf-сообщениями. Серверная версия протоспек есть в репозиторииYDB: сервисы/медоды и сообщения.

При этом все наши SDK, за исключением C++, в качестве источника используют репозиторий https://github.com/ydb-platform/ydb-api-protos. И для новых SDK рекомендуем использовать этот отдельный репозиторий.

Топики

Протокол работы с топиками основан на обмене сообщений с сервеом через двунаправленные стримы + есть отдельные методы, которые вызываются вне этих потоков. Protobuf-сообщения для работы с топиками описаны в файле: https://github.com/ydb-platform/ydb-api-protos/blob/master/protos/ydb_topic.proto

Запись сообщений в топики

Взаимодействие с пользовательским кодом

У SDK есть внутренний буфер, ограничивающий очередь сообщений по количеству и/или размеру в байтах. По-умолчанию клиентский под при вызове SDK записывает сообщения именно в буфер и не ждёт подтверждения от сервера. Писатель контролирует заполненность буфера, есть какой-то разумный лимит по-умолчанию (в штуках сообщений и/или в байтах). Если при попытке записи нового сообщения размер внутреннего буфера уже превышен - то по-умолчанию метод блокируется и ждёт освобожления места в буфере (можно задавать опции с таймаутами и прекращением ожидания, по-умолчанию ждёт вечно). Этот лимит призван ограничить потребление памяти внутренним буфером, чтобы не свалится по OOM, у него только самозащитная функция и нет задачи ограничивать действия клиента. Имеет смысл сознательно разрешить превышение размера буфера на 1 вызов Write (например если лимит в 10 записей, а пользователь в одном вызове передаёт 100 - то имеет смысл этот вызов пропустить, а следующие уже запретить - чтобы прогресс всегда двигался). При этом SDK обязуется доставить сообщения до сервера при отсутствии непоправимых ошибок (ошибки авторизации, завершение процесса и т.п.). Для случаев когда клиентскоиу коду нужно знать о том что сообщение уже точно записано на сервер мы делаем для этого отдельные методы. Базово объект/структура с реализацией записи сообщения у нас называется writer, метод отправки сообщения на сервер - write, а метод ожидания пока сообщения запишутся на сервер - flush.

В качестве результата отправки сообщений на сервер может возвращаться результат, в этом случае результат нужно возвращать по каждому сообщению. В результатах можно указать SeqNo сообщения (пользователь может его не знать, если пользуется функцией автонумерации), партифию и offset, в которые сообщение было записано.

Рекомендую делать возврат результата опциональным, например в дополнительном методе (и реализацию можно даже оставить на потом - до запроса клиентов), а в базовом метода не возвращать инфоракцию о записанных сообщениях.
Это позволит сохранить единообразие интерфейсов: синхронного (тут всё можем вернуть), асинхронного (тут результат будет только после подтверждения сервером, т.е. во фьюще, коллбэке и т.п.) и транзакционного (там до момента коммита транзакции неизвестен offset отправленных сообщений).

В go на данный момент нет способа получить метаданные отправленных сообщений (т.е. только ошибка или успех), за несколько лет небыло ни одного запроса на то чтобы эти метаданные добавить, так что может они и не очень нужны.

Прямая запись в партицию

Позволяет экономить сеть на внутренних пересылках сообщения внутри кластера. Общая идея: клиент перед подключением дескрайбит топик и узнаёт на какой ноде работает таблетка, обслуживающая нужную партицию и её поколение. Клиент подключается напрямую к ноде с таблеткой и указывает партицию и поколение. Если таблетка переехала - её поколение меняется, клиент получит ошибку, заново подескрайбит топик и снова подключится к правильной ноде.

TODO: описать разбор сообщений и ошибок

Запись сообщения в транзакции

С точки зрения API для пользователя для каждой транзакции создаётся отдельный транзакционный писатель. Все сообщения, отправленные через этого писателя будут опубликованы с коммитом транзакции. При завершении транзакции по любой причине - писатель прекращает свою работу и на все попытки записи возвращает ошибку.
По умолчанию писатель создаётся без привязки к конкретной партиции, без ProducerID и пишет сообщения без внутренних ретраев - при любой ошибке транзакция фейлится, аналогично ошибкам при работе с таблицами.

Внутри SDK может переиспользоваться пул писателей. При этом важно клиенту отдавать надстройки над писателями, а не сам объект писателя - чтобы предотвратить возможность его переиспользования между транзакциями (и защитить пользователя от незаметных ошибок). Тут стоит особенно аккуратно подойти к реализации обработки ошибок, т.к. писатели асинхронные и ошибку можно получить уже после отправки сообщения - когда писатель уже перевыдан другой транзакции. Нужно убедиться что по предыдущим сообщениям ошибок уже не будет (например переиспользщовать писателя только если получены все подтверждения и транзакция закоммитилась).

При вызове Writer.Write управление пользователю возвращается сразу, а сообщение складывается в буфер и может быть отправлено позже - вместе с другими сообщениями, которые могут придти позже. Тут можно переиспользовать механизм буферизации сообщений из обычного писателя. Для гарантии того что все сообщения будут доставлены на сервер перед коммитом - внутри SDK происходит подписка писателя на события транзакции. Транзакция отслеживает момент своего завершения (по ошибке) или шага с коммитом (как явный коммит, так и отправка флажка коммита вместе с запросом). А писатель на эти события реагирует: перед коммитом дожидается подтверждения о записи всех сообщений от сервера. Если произошла ошибка - фейлит транзакцию. При завершении транзакции - писатель закрывается или отправляется в пул для переиспользования.

Api транзакционного писателя должно повторять API обычного писателя на отправку сообщений. При этом можно исключить способы дождаться от сервера подтверждения о записи сообщения, т.к. это всегда происходит внутри SDK перед коммитом и SDK сам гарантирует что все сообщения, положенные в его буфер до коммита будут переданы на сервер и закоммитятся вместе с транзакцией.

С точки зрения протокола добавление транзакции к писателю - это отправка идентификаторов сессии и транзакции вместе с сообщением.

GRPC-сообщения

При отправке пачки сообщений на сервер важно учитывать лимит на размер одного grpc-сообщения. В случае, если скорость записи сообщений в буфер превышает скорость отправки, а размер буфера достаточно большой - может получиться ситуация, когда при сериализации буфера размер grpc-сообщения превысит лимит на размер grpc-сообщения, установленные на клиентской или серверной стороне в grpc-библиотеке. Эту ситуацию нужно проверять и когда происходит превышение - разбивать большое сообщение на несколько поменьше - чтобы они проходили в ограничения grpc.

Разбор протокола

На уровне протокола запись сообщений происходит через двунаправленный GRPC-стрим. Стрим открывается вызовом метода StreamWrite

Общая схема обмена сообщениями хорошо отражена в .proto-файле.

    // client                  server
    //         InitRequest(Topic, MessageGroupID, ...)
    //        ---------------->
    //         InitResponse(Partition, MaxSeqNo, ...)
    //        <----------------
    //         WriteRequest(data1, seqNo1)
    //        ---------------->
    //         WriteRequest(data2, seqNo2)
    //        ---------------->
    //         WriteResponse(seqNo1, offset1, ...)
    //        <----------------
    //         WriteRequest(data3, seqNo3)
    //        ---------------->
    //         WriteResponse(seqNo2, offset2, ...)
    //        <----------------
    //         [something went wrong] (status != SUCCESS, issues not empty)
    //        <----------------

Сначала клиент отправляет StreamWriteMessage.InitRequest, в котором сообщает параметры подключения. Дальше я опишу параметры и на что они влияют.

StreamWriteMessage.InitRequest

Дальше я подробно опишу состав приветственного сообщения и влияние полей на работу писателя.

StreamWriteMessage.InitRequest.path

string path = 1; - путь к топику, обязательный параметр. В рамках одного стрима можно писать сообщения только в один топик.

StreamWriteMessage.InitRequest.producer_id

string producer_id = 2 - идентификатор источника сообщений. Этот параметр участвует в выборе партиции для записи и в процессе дедупликации сообщений при отправке.

producer_id указан
Если producer_id указан, то он прикрепляется к конкретной партиции и все сообщения идут в эту партицию, в т.ч. при переподключениях. Это обеспечивается на стороне сервера. При попытке указать другую партицию вручную - произойдёт ошибка.

producer_id не указан

StreamWriteMessage.InitRequest.write_session_meta

Метаданные уровня сессии, при записи передаются один раз в начале стрима. При чтении будут присутствовать в каждом батче сообщений.

StreamWriteMessage.InitRequest.partitionong

Задаёт партицию, в которую будут отправлены сообщения этого стрима

message_group_id = 4 - не поддерживается. Предполагалось использовать для маршрутизации сообщений между партициями на стороне сервера, но от реализации отказались из-за низкой эффективности.

int64 partition_id = 5 - номер партиции, в которую нужно отправлять сообщения.

PartitionWithGeneration partition_with_generation = 7; - указывает partition_id и конкретное поколение таблетки, к которой хочет подключиться. Используется для реализации прямой записи в партицию.

Остановка писателя

При вызове метода остановки писатель переходит в режим закрытия и больше не принимает новых сообщений. На все попытки вызвать любые методы нужно возвращать ошибку, о том что работа писателя завершена. Можно разрешить повторное закрытие писателя без ошибок - это зависит от организации остального SDK и обычаев языка программирования.

При остановке писателя нужно дождаться подтверждения от сервера о записи всех сообщений. При этом внутри писатель может переподключаться к серверу, ретраить ошибки и т.п. Ожидание останавливается когда получены подтверждения для всех сообщений, которые писатель успел принять до вызова метода остановки или при получении НЕРЕТРАИБЕЛЬНОЙ ошибки. Опционально может задаваться таймаут ожидания, тогда ожидание ограничивается этим таймаутом - по его истечении все фоновые процессы принудительно останавливаются.

Возврат из метода остановки делается после завершения всех фоновых процессов.

Чтение сообщений из топика

Остановка читателя

При вызове метода остановки читатель переходит в режим закрытия и больше не обрабатывает запросы на чтение и коммиты сообщений. На все вызовы любых методов нужно возвращать ошибку о том, что работа читателя уже остановлена. Можно разрешить повторное закрытие читателя без ошибок - это зависит от организации остального SDK и обычаев языка программирования.

При остановке читателя нужно дождать подтверждения о коммитах, принятых до вызова метода остановки. Ожидание останавливается при получении подтверждения обо всех коммитах, принятых читателем до вызова метода закрытия или после завершения стрима по любой причине, переподключаться НЕ нужно. Коммиты сообщений возможны только внутри стрима, где они были прочитаны, поэтому переподключаться смысла нет - если коммиты не успели дойти до сервера - сообщения нужно будет получить и обработать заново.