diff options
| author | Kuzin Roman <[email protected]> | 2026-06-29 09:21:22 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-06-29 09:21:22 +0300 |
| commit | a6ca10da47c615bf12160ccf3d6a94e48ddb39d2 (patch) | |
| tree | a5dd2944f77f092478658ad729413dd512028d0e | |
| parent | ea1a8ec1e4504ec92dd52ee1a1ea6c24bbcfcaac (diff) | |
Co-authored-by: Roman Kuzin <[email protected]>
Co-authored-by: sintjuri <[email protected]>
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
| -rw-r--r-- | ydb/docs/ru/core/reference/ydb-sdk/topic.md | 459 |
1 files changed, 358 insertions, 101 deletions
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md index 4679919096f..40d20e6737e 100644 --- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md +++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md @@ -698,21 +698,69 @@ - C++ - Подключение к топику на запись представлено объектом сессии записи с интерфейсом `IWriteSession` или `ISimpleBlockingWriteSession` (вариант для простой записи по одному сообщению без подтверждения, блокирующейся при превышении числа inflight записей или размера буфера SDK). Настройки сессии записи представлены структурой `TWriteSessionSettings`, для варианта `ISimpleBlockingWriteSession` часть настроек не поддерживается. + В C++ SDK для записи в топик доступно три варианта API. Базовые настройки (буферизация, кодеки, ретраи) одинаковы у всех трёх и заданы структурой `TWriteSessionSettings`, поэтому ниже подсказки сосредоточены на отличиях и сценариях использования. + - `IWriteSession` — низкоуровневая сессия записи в одну партицию с полным набором возможностей: цикл событий (`TReadyToAcceptEvent`, `TAcksEvent`, `TSessionClosedEvent`), явная back-pressure через `TContinuationToken`, поштучные подтверждения по каждому сообщению, отправка предварительно сжатых данных через `WriteEncoded`. Структура `TWriteSessionSettings` определена именно здесь — два других варианта её переиспользуют. Подходит, когда нужен статус каждого сообщения, кастомная асинхронная логика или ручное управление сжатием. + - `ISimpleBlockingWriteSession` — синхронный fire-and-forget API для записи в одну партицию топика, самый простой вариант. Метод `Write(message, blockTimeout)` кладёт сообщение во внутренний буфер, отправка на сервер идёт в фоне. В нормальном режиме вызов возвращается мгновенно и блокируется только при переполнении буфера (по `MaxMemoryUsage` / `MaxInflightCount`), не дольше `blockTimeout`. Возврат `false` означает, что сообщение **не попало в буфер** и потеряно. Подтверждений по отдельным сообщениям нет; убедиться, что весь буфер доставлен на сервер, можно только вызвав `Close()` — он ждёт ack от сервера. Подходит, когда достаточно гарантии «всё или ничего» к моменту закрытия сессии и нужен простой синхронный код. + - `IProducer` — высокоуровневый API поверх нескольких сессий записи: прозрачно шардирует сообщения по партициям топика по ключу. Вдохновлён интерфейсом Producer из Apache Kafka, но учитывает особенности {{ ydb-short-name }} и при работе с топиками с [автопартиционированием](../../concepts/datamodel/topic.md#autopartitioning) даёт полные гарантии порядка и exactly-once. Подтверждения от сервера доступны через обработчик `AcksHandler`; дождаться доставки накопленного буфера — `Flush()`, корректно завершить работу — `Close()`. Подходит, когда нужно писать в **многопартиционный** топик с маршрутизацией по ключу. - Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1199). + {% list tabs %} - Пример создания сессии записи с интерфейсом `IWriteSession`. + - IWriteSession - ```cpp - std::string producerAndGroupID = "group-id"; - auto settings = NYdb::NTopic::TWriteSessionSettings() - .Path("my-topic") - .ProducerId(producerAndGroupID) - .MessageGroupId(producerAndGroupID); + `IWriteSession` — базовая сессия записи, от которой наследуют настройки другие варианты записи. Настройки сессии записи представлены структурой `TWriteSessionSettings`, для варианта `ISimpleBlockingWriteSession` часть настроек не поддерживается. - auto session = topicClient.CreateWriteSession(settings); - ``` + Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/main/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/write_session.h#L56). + + ```cpp + std::string producerAndGroupID = "group-id"; + auto settings = NYdb::NTopic::TWriteSessionSettings() + .Path("my-topic") + .ProducerId(producerAndGroupID) + .MessageGroupId(producerAndGroupID); + + auto session = topicClient.CreateWriteSession(settings); + ``` + + - ISimpleBlockingWriteSession + + `ISimpleBlockingWriteSession` — простой синхронный вариант сессии записи `IWriteSession` для записи по одному сообщению без подтверждения по каждому сообщению. Метод `Write` блокируется при превышении числа inflight записей или размера буфера SDK. Настройки сессии записи представлены структурой `TWriteSessionSettings`, как и в случае `IWriteSession`, однако часть настроек не поддерживается. + + Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/main/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/write_session.h#L56). + + ```cpp + std::string producerAndGroupID = "group-id"; + auto settings = NYdb::NTopic::TWriteSessionSettings() + .Path("my-topic") + .ProducerId(producerAndGroupID) + .MessageGroupId(producerAndGroupID); + + auto session = topicClient.CreateSimpleBlockingWriteSession(settings); + ``` + + - IProducer + + `IProducer` — высокоуровневый API поверх сессий записи: один объект скрывает управление несколькими сессиями и автоматически выбирает партицию по ключу сообщения. Настройки продюсера представлены структурой `TProducerSettings`, которая наследует `TWriteSessionSettings`, поэтому общие настройки записи совпадают с `IWriteSession`. + + Настройки задаются через `TProducerSettings`: + + - `ProducerIdPrefix` — префикс producer id для подсессий записи; + - `PartitionChooserStrategy` — стратегия выбора партиции по ключу сообщения: + - `Bound` — ключ сопоставляется с диапазонами партиций топика (`FromBound`/`ToBound` из описания топика). По умолчанию перед сопоставлением ключ проходит через MurmurHash64. Рекомендуется для топиков с [автопартиционированием](../../concepts/datamodel/topic.md#autopartitioning): при разделении партиции SDK обновляет границы и продолжает направлять сообщения с тем же ключом в корректный диапазон. + - `KafkaHash` — по аналогии с Kafka: от ключа считается MurmurHash, индекс партиции — остаток от деления хеша на число партиций. Удобно при миграции с Kafka. Не поддерживается при включённом автопартиционировании. + - `PartitioningKeyHasher` — функция преобразования ключа перед сопоставлением с диапазонами; используется только для стратегии `Bound`. Можно задать свою, например чтобы в сравнении участвовал исходный ключ без хеширования. + + Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/main/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/producer.h#L10). + + ```cpp + auto producerSettings = NYdb::NTopic::TProducerSettings() + .Path("my-topic") + .ProducerIdPrefix("my-producer") + .PartitionChooserStrategy(NYdb::NTopic::EPartitionChooserStrategy::Bound); + + auto producer = topicClient.CreateProducer(producerSettings); + ``` + + {% endlist %} - Go @@ -861,38 +909,70 @@ - C++ - Асинхронная запись возможна через интерфейс `IWriteSession`. + {% list tabs %} - Работа пользователя с объектом `IWriteSession` в общем устроена как обработка цикла событий с тремя типами событий: `TReadyToAcceptEvent`, `TAcksEvent` и `TSessionClosedEvent`. + - IWriteSession - Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием. + Работа с объектом `IWriteSession` устроена как обработка цикла событий с тремя типами событий: `TReadyToAcceptEvent`, `TAcksEvent` и `TSessionClosedEvent`. - Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах `GetEvent` / `GetEvents`. Для неблокирующего ожидания очередного события есть метод `WaitEvent` с интерфейсом `TFuture<void>()`. + Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием. - Для записи каждого сообщения пользователь должен "потратить" move-only объект `TContinuationToken`, который выдаёт SDK с событием `TReadyToAcceptEvent`. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически. + Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах `GetEvent` / `GetEvents`. Для неблокирующего ожидания очередного события есть метод `WaitEvent` с интерфейсом `TFuture<void>()`. - По умолчанию `Write` выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Сессия сама переподключается к {{ ydb-short-name }} при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой `RetryPolicy`. При получении ошибки, после которой невозможно продолжить работу, сессия чтения отправляет пользователю `TSessionClosedEvent` с диагностической информацией. + Для записи каждого сообщения пользователь должен "потратить" move-only объект `TContinuationToken`, который выдаёт SDK с событием `TReadyToAcceptEvent`. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически. - Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков: + По умолчанию `Write` выполняется асинхронно — данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Сессия сама переподключается к {{ ydb-short-name }} при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой `RetryPolicy`. При получении ошибки, после которой невозможно продолжить работу, сессия записи отправляет пользователю `TSessionClosedEvent` с диагностической информацией. - ```cpp - // Event loop - while (true) { - // Get event - // May block for a while if write session is busy - std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true); + Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков: - if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) { - session->Write(std::move(event.ContinuationToken), "This is yet another message."); + ```cpp + // Event loop + while (true) { + // Get event + // May block for a while if write session is busy + std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true); - } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TAcksEvent>(&*event)) { - std::cout << ackEvent->DebugString() << std::endl; + if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) { + session->Write(std::move(event.ContinuationToken), "This is yet another message."); - } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) { - break; - } - } - ``` + } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TAcksEvent>(&*event)) { + std::cout << ackEvent->DebugString() << std::endl; + + } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) { + break; + } + } + ``` + + - ISimpleBlockingWriteSession + + Как упрощённый вариант `IWriteSession`, `ISimpleBlockingWriteSession` пишет через ту же внутреннюю буферизацию, но без цикла событий: не использует `ContinuationToken` и не возвращает подтверждения через `TAcksEvent`. Метод `Write` кладёт сообщение во внутренний буфер; если буфер переполнен, вызов блокируется до появления места. Параметр `blockTimeout` ограничивает время ожидания. Метод возвращает `true`, если сообщение принято в буфер, и `false`, если за отведённое время записать не удалось. + + Отправка на сервер, как и у `IWriteSession`, выполняется в фоне. Чтобы дождаться завершения всех записей и закрыть сессию, вызовите `Close()`. + + ```cpp + auto messageData = std::string("message"); + NYdb::NTopic::TWriteMessage writeMessage(messageData); + session->Write(std::move(writeMessage)); + ``` + + - IProducer + + `TProducerSettings` наследует `TWriteSessionSettings`, поэтому буферизация, отправка и переподключение устроены так же, как у `IWriteSession`: `Write` кладёт сообщение во внутренний буфер, отправка на сервер идёт в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Продюсер переподключается к {{ ydb-short-name }} при обрывах связи и повторяет отправку, пока это возможно, в соответствии с `RetryPolicy`. При неустранимой ошибке продюсер закрывается; статус и причину можно получить из результата `Write` или `Flush`. + + `Flush` дожидается доставки накопленных данных на сервер; `Close` дожидается отправки оставшихся в буфере сообщений и завершает работу продюсера. + + ```cpp + auto messageData = std::string("order-created"); + // First argument is the partitioning key — the SDK chooses a partition by it. + NYdb::NTopic::TWriteMessage writeMessage("user-42", messageData); + producer->Write(std::move(writeMessage)); + producer->Flush().GetValueSync(); + ``` + + Подробный пример см. в [репозитории ydb-platform/ydb](https://github.com/ydb-platform/ydb/tree/main/ydb/public/sdk/cpp/examples/topic_writer/producer/basic_write). + + {% endlist %} - Go @@ -913,6 +993,37 @@ } ``` + Для записи по ключу в несколько партиций используйте `WithWriteToManyPartitions(...)` при создании писателя и заполняйте поле `Key` в `topicwriter.Message`. + + Стратегии маршрутизации (задаются в `WithWriterPartitionByKey(...)` или `WithWriterPartitionByPartitionID()`): + + - `BoundPartitionChooser` — ключ сопоставляется с диапазонами партиций топика (`FromBound`/`ToBound`). По умолчанию перед сопоставлением ключ проходит через MurmurHash64. Рекомендуется для топиков с [автопартиционированием](../../concepts/datamodel/topic.md#autopartitioning): SDK обновляет границы при разделении партиций. + - `KafkaHashPartitionChooser` — по аналогии с Kafka: MurmurHash ключа по модулю числа партиций. Удобно при миграции с Kafka. Не поддерживается при включённом автопартиционировании. + - `WithWriterPartitionByPartitionID` — запись в партицию, указанную в поле `PartitionID` сообщения. Не комбинируется с маршрутизацией по ключу; при split партиции писатель нужно пересоздавать вручную. + + ```go + writer, err := db.Topic().StartWriter(topicPath, + topicoptions.WithWriteToManyPartitions( + topicoptions.WithProducerIDPrefix("orders-producer"), + topicoptions.WithWriterPartitionByKey(topicoptions.BoundPartitionChooser()), + ), + ) + if err != nil { + return err + } + defer func() { _ = writer.Close(context.Background()) }() + + err = writer.Write(ctx, topicwriter.Message{ + Key: "user-42", + Data: bytes.NewReader([]byte("order-created")), + }) + if err != nil { + return err + } + ``` + + Подробный пример с маршрутизацией по ключу, альтернативными стратегиями (`KafkaHash` и `PartitionID`) и транзакционным вариантом см. в репозитории [ydb-go-sdk](https://github.com/ydb-platform/ydb-go-sdk/blob/master/examples/topic/topicwriter/topicwriter_to_many_partitions.go). + - Python Для отправки сообщений можно передавать как просто содержимое сообщения (bytes, str), так и вручную задавать некоторые свойства. Объекты можно передавать по одному или сразу в массиве (list). Метод `write` выполняется асинхронно. Возврат из метода происходит сразу после того как сообщения будут положены во внутренний буфер клиента, обычно это происходит быстро. Ожидание может возникнуть, если внутренний буфер уже заполнен и нужно подождать, пока часть данных будет отправлена на сервер. @@ -1059,33 +1170,61 @@ - C++ - Получение подтверждений от сервера возможно через интерфейс `IWriteSession`. + {% list tabs %} - Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий `TAcksEvent`. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (`EES_WRITTEN`), запись отброшена как дубликат ранее записанного сообщения (`EES_ALREADY_WRITTEN`) или запись отброшена по причине сбоя (`EES_DISCARDED`). + - IWriteSession - Пример установки обработчика TAcksEvent для сессии записи: + Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий `TAcksEvent`. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (`EES_WRITTEN`), запись отброшена как дубликат ранее записанного сообщения (`EES_ALREADY_WRITTEN`) или запись отброшена по причине сбоя (`EES_DISCARDED`). - ```cpp - auto settings = NYdb::NTopic::TWriteSessionSettings() - // other settings are set here - .EventHandlers( - NYdb::NTopic::TWriteSessionSettings::TEventHandlers() - .AcksHandler( - [&](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { - for (const auto& ack : event.Acks) { - if (ack.State == NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_WRITTEN) { - ackedSeqNo.insert(ack.SeqNo); - std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl; + Пример установки обработчика `TAcksEvent` для сессии записи: + + ```cpp + auto settings = NYdb::NTopic::TWriteSessionSettings() + // other settings are set here + .EventHandlers( + NYdb::NTopic::TWriteSessionSettings::TEventHandlers() + .AcksHandler( + [&](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { + for (const auto& ack : event.Acks) { + if (ack.State == NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_WRITTEN) { + ackedSeqNo.insert(ack.SeqNo); + std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl; + } } } - } - ) - ); + ) + ); - auto session = topicClient.CreateWriteSession(settings); - ``` + auto session = topicClient.CreateWriteSession(settings); + ``` + + В такой сессии записи события `TAcksEvent` не будут приходить пользователю в `GetEvent` / `GetEvents`, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий. + + - ISimpleBlockingWriteSession + + В отличие от `IWriteSession`, `ISimpleBlockingWriteSession` не возвращает подтверждения по отдельным сообщениям: события `TAcksEvent` и обработчики для них недоступны. Чтобы дождаться, пока все сообщения из буфера будут записаны на сервер, вызовите `Close()` — метод дожидается подтверждения от сервера и закрывает сессию. + + - IProducer + + Подтверждения от сервера приходят так же, как у `IWriteSession`: через обработчик `AcksHandler` в `TProducerSettings::EventHandlers`. Чтобы дождаться доставки накопленного буфера на сервер, вызовите `Flush()`. + + ```cpp + auto producerSettings = NYdb::NTopic::TProducerSettings() + .Path("my-topic") + .ProducerIdPrefix("my-producer") + .EventHandlers( + NYdb::NTopic::TWriteSessionSettings::TEventHandlers() + .AcksHandler([](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { + .AcksHandler([](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) { + // handle acknowledgements + }) + }) + ); + + auto producer = topicClient.CreateProducer(producerSettings); + ``` - В такой сессии записи события `TAcksEvent` не будут приходить пользователю в `GetEvent` / `GetEvents`, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий. + {% endlist %} - Go @@ -1246,18 +1385,48 @@ - C++ - Сжатие, которое используется при отправке сообщений методом `Write`, задаётся при [создании сессии записи](#start-writer) настройками `Codec` и `CompressionLevel`. По умолчанию выбирается кодек GZIP. - Пример создания сессии записи без сжатия сообщений: + {% list tabs %} - ```cpp - auto settings = NYdb::NTopic::TWriteSessionSettings() - // other settings are set here - .Codec(ECodec::RAW); + - IWriteSession - auto session = topicClient.CreateWriteSession(settings); - ``` + Сжатие, которое используется при отправке сообщений методом `Write`, задаётся при [создании сессии записи](#start-writer) настройками `Codec` и `CompressionLevel`. По умолчанию выбирается кодек GZIP. + Пример создания сессии записи без сжатия сообщений: + + ```cpp + auto settings = NYdb::NTopic::TWriteSessionSettings() + // other settings are set here + .Codec(ECodec::RAW); + + auto session = topicClient.CreateWriteSession(settings); + ``` + + Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод `WriteEncoded` с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика. + + - ISimpleBlockingWriteSession - Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод `WriteEncoded` с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика. + Кодек задаётся при [создании сессии записи](#start-writer) в `TWriteSessionSettings` — те же настройки `Codec` и `CompressionLevel`, что и у `IWriteSession`. Метод `WriteEncoded` недоступен. + + ```cpp + auto settings = NYdb::NTopic::TWriteSessionSettings() + // other settings are set here + .Codec(ECodec::RAW); + + auto session = topicClient.CreateSimpleBlockingWriteSession(settings); + ``` + + - IProducer + + Кодек задаётся в `TProducerSettings` при [создании продюсера](#start-writer) — те же настройки `Codec` и `CompressionLevel`, что и у `IWriteSession`. + + ```cpp + auto producerSettings = NYdb::NTopic::TProducerSettings() + // other settings are set here + .Codec(NYdb::NTopic::ECodec::RAW); + + auto producer = topicClient.CreateProducer(producerSettings); + ``` + + {% endlist %} - Go @@ -1343,17 +1512,38 @@ - C++ - Если в настройках сессии записи не указывается опция `ProducerId`, будет создана сессия записи без дедупликации. - Пример создания такой сессии записи: + {% list tabs %} - ```cpp - auto settings = NYdb::NTopic::TWriteSessionSettings() - .Path(myTopicPath); + - IWriteSession - auto session = topicClient.CreateWriteSession(settings); - ``` + Если в настройках сессии записи не указывается опция `ProducerId`, будет создана сессия записи без дедупликации. + Пример создания такой сессии записи: + + ```cpp + auto settings = NYdb::NTopic::TWriteSessionSettings() + .Path(myTopicPath); + + auto session = topicClient.CreateWriteSession(settings); + ``` + + Для включения дедупликации нужно в настройках сессии записи указать опцию `ProducerId` или явно включить дедупликацию, вызвав метод `DeduplicationEnabled()`, например, как в секции ["Подключение к топику"](#start-writer). + + - ISimpleBlockingWriteSession + + Поведение такое же, как у `IWriteSession`: если не указывать `ProducerId`, сессия создаётся без дедупликации. + + ```cpp + auto settings = NYdb::NTopic::TWriteSessionSettings() + .Path(myTopicPath); - Для включения дедупликации нужно в настройках сессии записи указать опцию `ProducerId` или явно включить дедупликацию, вызвав метод `DeduplicationEnabled()`, например, как в секции ["Подключение к топику"](#start-writer). + auto session = topicClient.CreateSimpleBlockingWriteSession(settings); + ``` + + - IProducer + + `IProducer` всегда записывает с дедупликацией: идентификатор продюсера формируется из `ProducerIdPrefix` и id партиции. Для записи без дедупликации используйте `IWriteSession` или `ISimpleBlockingWriteSession`. + + {% endlist %} - JavaScript @@ -1392,26 +1582,60 @@ - C++ - Воспользоваться функцией записи метаданных можно с помощью метода `Write()`, принимающего `TWriteMessage` объект: + {% list tabs %} - ```cpp - auto settings = NYdb::NTopic::TWriteSessionSettings() - .Path(myTopicPath) - // set all other settings; - ; + - IWriteSession - auto session = topicClient.CreateWriteSession(settings); + Метаданные задаются в объекте `TWriteMessage` и передаются в `Write()`: - std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true); - NYdb::NTopic::TWriteMessage message("This is yet another message").MessageMeta({ - {"meta-key", "meta-value"}, - {"another-key", "value"} - }); + ```cpp + auto settings = NYdb::NTopic::TWriteSessionSettings() + .Path(myTopicPath) + // set all other settings; + ; - if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) { - session->Write(std::move(event.ContinuationToken), std::move(message)); - } - ``` + auto session = topicClient.CreateWriteSession(settings); + + std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true); + NYdb::NTopic::TWriteMessage message("This is yet another message").MessageMeta({ + {"meta-key", "meta-value"}, + {"another-key", "value"} + }); + + if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) { + session->Write(std::move(event.ContinuationToken), std::move(message)); + } + ``` + + - ISimpleBlockingWriteSession + + Используется тот же `TWriteMessage`, что и у `IWriteSession`: метаданные задаются через `MessageMeta()` и передаются в `Write()`: + + ```cpp + auto messageData = std::string("message-data"); + NYdb::NTopic::TWriteMessage writeMessage(messageData); + writeMessage.MessageMeta({ + {"meta-key", "meta-value"}, + {"another-key", "value"}, + }); + session->Write(std::move(writeMessage)); + ``` + + - IProducer + + Как и у сессий записи, метаданные задаются в `TWriteMessage` через `MessageMeta()`. Отличие `IProducer` в том, что сообщение также содержит ключ партиционирования, по которому продюсер выбирает партицию: + + ```cpp + auto messageData = std::string("message-data"); + NYdb::NTopic::TWriteMessage writeMessage("user-42", messageData); + writeMessage.MessageMeta({ + {"meta-key", "meta-value"}, + {"another-key", "value"}, + }); + producer->Write(std::move(writeMessage)); + ``` + + {% endlist %} - Go @@ -1541,26 +1765,59 @@ - C++ - Для записи в топик в транзакции необходимо передать ссылку на объект транзакции в метод `Write` сессии записи. + {% list tabs %} - [Пример на GitHub](https://github.com/ydb-platform/ydb-cpp-sdk/blob/main/examples/topic_writer/transaction/main.cpp) + - IWriteSession - ```c++ - NYdb::NQuery::TQueryClient queryClient(driver); + Для записи в топик в транзакции необходимо передать ссылку на объект транзакции в метод `Write` сессии записи. - NYdb::NStatusHelpers::ThrowOnError(queryClient.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus { - auto beginTxResult = session.BeginTransaction().GetValueSync(); - if (!beginTxResult.IsSuccess()) { - return beginTxResult; - } - auto tx = beginTxResult.GetTransaction(); + [Пример на GitHub](https://github.com/ydb-platform/ydb-cpp-sdk/blob/main/examples/topic_writer/transaction/main.cpp) - NYdb::NTopic::TWriteMessage writeMessage("message"); + ```c++ + NYdb::NQuery::TQueryClient queryClient(driver); - topicSession->Write(std::move(writeMessage), tx); - return tx.Commit().GetValueSync(); - })); - ``` + NYdb::NStatusHelpers::ThrowOnError(queryClient.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus { + auto beginTxResult = session.BeginTransaction().GetValueSync(); + if (!beginTxResult.IsSuccess()) { + return beginTxResult; + } + auto tx = beginTxResult.GetTransaction(); + + NYdb::NTopic::TWriteMessage writeMessage("message"); + + topicSession->Write(std::move(writeMessage), tx); + return tx.Commit().GetValueSync(); + })); + ``` + + - ISimpleBlockingWriteSession + + Как и `IWriteSession`, `ISimpleBlockingWriteSession` поддерживает запись в транзакции. Так как у простого варианта нет `ContinuationToken`, объект транзакции передаётся вторым аргументом в `Write()`. + + [Пример на GitHub](https://github.com/ydb-platform/ydb-cpp-sdk/blob/main/examples/topic_writer/transaction/main.cpp) + + ```c++ + NYdb::NQuery::TQueryClient queryClient(driver); + + NYdb::NStatusHelpers::ThrowOnError(queryClient.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus { + auto beginTxResult = session.BeginTransaction().GetValueSync(); + if (!beginTxResult.IsSuccess()) { + return beginTxResult; + } + auto tx = beginTxResult.GetTransaction(); + + NYdb::NTopic::TWriteMessage writeMessage("message"); + + topicSession->Write(std::move(writeMessage), &tx); + return tx.Commit().GetValueSync(); + })); + ``` + + - IProducer + + У `IProducer` транзакция задаётся не аргументом `Write`, а в `TWriteMessage` через `Tx()`. После этого продюсер записывает сообщение так же, как обычное сообщение с ключом партиционирования: + + {% endlist %} - Go |
