summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKuzin Roman <[email protected]>2026-06-29 09:21:22 +0300
committerGitHub <[email protected]>2026-06-29 09:21:22 +0300
commita6ca10da47c615bf12160ccf3d6a94e48ddb39d2 (patch)
treea5dd2944f77f092478658ad729413dd512028d0e
parentea1a8ec1e4504ec92dd52ee1a1ea6c24bbcfcaac (diff)
LOGBROKER-10206 Add docs about producer (#42182)HEADmain
Co-authored-by: Roman Kuzin <[email protected]> Co-authored-by: sintjuri <[email protected]> Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
-rw-r--r--ydb/docs/ru/core/reference/ydb-sdk/topic.md459
1 files changed, 358 insertions, 101 deletions
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
index 4679919096f..40d20e6737e 100644
--- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
@@ -698,21 +698,69 @@
- C++
- Подключение к топику на запись представлено объектом сессии записи с интерфейсом `IWriteSession` или `ISimpleBlockingWriteSession` (вариант для простой записи по одному сообщению без подтверждения, блокирующейся при превышении числа inflight записей или размера буфера SDK). Настройки сессии записи представлены структурой `TWriteSessionSettings`, для варианта `ISimpleBlockingWriteSession` часть настроек не поддерживается.
+ В C++ SDK для записи в топик доступно три варианта API. Базовые настройки (буферизация, кодеки, ретраи) одинаковы у всех трёх и заданы структурой `TWriteSessionSettings`, поэтому ниже подсказки сосредоточены на отличиях и сценариях использования.
+ - `IWriteSession` — низкоуровневая сессия записи в одну партицию с полным набором возможностей: цикл событий (`TReadyToAcceptEvent`, `TAcksEvent`, `TSessionClosedEvent`), явная back-pressure через `TContinuationToken`, поштучные подтверждения по каждому сообщению, отправка предварительно сжатых данных через `WriteEncoded`. Структура `TWriteSessionSettings` определена именно здесь — два других варианта её переиспользуют. Подходит, когда нужен статус каждого сообщения, кастомная асинхронная логика или ручное управление сжатием.
+ - `ISimpleBlockingWriteSession` — синхронный fire-and-forget API для записи в одну партицию топика, самый простой вариант. Метод `Write(message, blockTimeout)` кладёт сообщение во внутренний буфер, отправка на сервер идёт в фоне. В нормальном режиме вызов возвращается мгновенно и блокируется только при переполнении буфера (по `MaxMemoryUsage` / `MaxInflightCount`), не дольше `blockTimeout`. Возврат `false` означает, что сообщение **не попало в буфер** и потеряно. Подтверждений по отдельным сообщениям нет; убедиться, что весь буфер доставлен на сервер, можно только вызвав `Close()` — он ждёт ack от сервера. Подходит, когда достаточно гарантии «всё или ничего» к моменту закрытия сессии и нужен простой синхронный код.
+ - `IProducer` — высокоуровневый API поверх нескольких сессий записи: прозрачно шардирует сообщения по партициям топика по ключу. Вдохновлён интерфейсом Producer из Apache Kafka, но учитывает особенности {{ ydb-short-name }} и при работе с топиками с [автопартиционированием](../../concepts/datamodel/topic.md#autopartitioning) даёт полные гарантии порядка и exactly-once. Подтверждения от сервера доступны через обработчик `AcksHandler`; дождаться доставки накопленного буфера — `Flush()`, корректно завершить работу — `Close()`. Подходит, когда нужно писать в **многопартиционный** топик с маршрутизацией по ключу.
- Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1199).
+ {% list tabs %}
- Пример создания сессии записи с интерфейсом `IWriteSession`.
+ - IWriteSession
- ```cpp
- std::string producerAndGroupID = "group-id";
- auto settings = NYdb::NTopic::TWriteSessionSettings()
- .Path("my-topic")
- .ProducerId(producerAndGroupID)
- .MessageGroupId(producerAndGroupID);
+ `IWriteSession` — базовая сессия записи, от которой наследуют настройки другие варианты записи. Настройки сессии записи представлены структурой `TWriteSessionSettings`, для варианта `ISimpleBlockingWriteSession` часть настроек не поддерживается.
- auto session = topicClient.CreateWriteSession(settings);
- ```
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/main/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/write_session.h#L56).
+
+ ```cpp
+ std::string producerAndGroupID = "group-id";
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ .Path("my-topic")
+ .ProducerId(producerAndGroupID)
+ .MessageGroupId(producerAndGroupID);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ - ISimpleBlockingWriteSession
+
+ `ISimpleBlockingWriteSession` — простой синхронный вариант сессии записи `IWriteSession` для записи по одному сообщению без подтверждения по каждому сообщению. Метод `Write` блокируется при превышении числа inflight записей или размера буфера SDK. Настройки сессии записи представлены структурой `TWriteSessionSettings`, как и в случае `IWriteSession`, однако часть настроек не поддерживается.
+
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/main/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/write_session.h#L56).
+
+ ```cpp
+ std::string producerAndGroupID = "group-id";
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ .Path("my-topic")
+ .ProducerId(producerAndGroupID)
+ .MessageGroupId(producerAndGroupID);
+
+ auto session = topicClient.CreateSimpleBlockingWriteSession(settings);
+ ```
+
+ - IProducer
+
+ `IProducer` — высокоуровневый API поверх сессий записи: один объект скрывает управление несколькими сессиями и автоматически выбирает партицию по ключу сообщения. Настройки продюсера представлены структурой `TProducerSettings`, которая наследует `TWriteSessionSettings`, поэтому общие настройки записи совпадают с `IWriteSession`.
+
+ Настройки задаются через `TProducerSettings`:
+
+ - `ProducerIdPrefix` — префикс producer id для подсессий записи;
+ - `PartitionChooserStrategy` — стратегия выбора партиции по ключу сообщения:
+ - `Bound` — ключ сопоставляется с диапазонами партиций топика (`FromBound`/`ToBound` из описания топика). По умолчанию перед сопоставлением ключ проходит через MurmurHash64. Рекомендуется для топиков с [автопартиционированием](../../concepts/datamodel/topic.md#autopartitioning): при разделении партиции SDK обновляет границы и продолжает направлять сообщения с тем же ключом в корректный диапазон.
+ - `KafkaHash` — по аналогии с Kafka: от ключа считается MurmurHash, индекс партиции — остаток от деления хеша на число партиций. Удобно при миграции с Kafka. Не поддерживается при включённом автопартиционировании.
+ - `PartitioningKeyHasher` — функция преобразования ключа перед сопоставлением с диапазонами; используется только для стратегии `Bound`. Можно задать свою, например чтобы в сравнении участвовал исходный ключ без хеширования.
+
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/main/ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/producer.h#L10).
+
+ ```cpp
+ auto producerSettings = NYdb::NTopic::TProducerSettings()
+ .Path("my-topic")
+ .ProducerIdPrefix("my-producer")
+ .PartitionChooserStrategy(NYdb::NTopic::EPartitionChooserStrategy::Bound);
+
+ auto producer = topicClient.CreateProducer(producerSettings);
+ ```
+
+ {% endlist %}
- Go
@@ -861,38 +909,70 @@
- C++
- Асинхронная запись возможна через интерфейс `IWriteSession`.
+ {% list tabs %}
- Работа пользователя с объектом `IWriteSession` в общем устроена как обработка цикла событий с тремя типами событий: `TReadyToAcceptEvent`, `TAcksEvent` и `TSessionClosedEvent`.
+ - IWriteSession
- Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.
+ Работа с объектом `IWriteSession` устроена как обработка цикла событий с тремя типами событий: `TReadyToAcceptEvent`, `TAcksEvent` и `TSessionClosedEvent`.
- Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах `GetEvent` / `GetEvents`. Для неблокирующего ожидания очередного события есть метод `WaitEvent` с интерфейсом `TFuture<void>()`.
+ Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.
- Для записи каждого сообщения пользователь должен "потратить" move-only объект `TContinuationToken`, который выдаёт SDK с событием `TReadyToAcceptEvent`. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически.
+ Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах `GetEvent` / `GetEvents`. Для неблокирующего ожидания очередного события есть метод `WaitEvent` с интерфейсом `TFuture<void>()`.
- По умолчанию `Write` выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Сессия сама переподключается к {{ ydb-short-name }} при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой `RetryPolicy`. При получении ошибки, после которой невозможно продолжить работу, сессия чтения отправляет пользователю `TSessionClosedEvent` с диагностической информацией.
+ Для записи каждого сообщения пользователь должен "потратить" move-only объект `TContinuationToken`, который выдаёт SDK с событием `TReadyToAcceptEvent`. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически.
- Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков:
+ По умолчанию `Write` выполняется асинхронно — данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Сессия сама переподключается к {{ ydb-short-name }} при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой `RetryPolicy`. При получении ошибки, после которой невозможно продолжить работу, сессия записи отправляет пользователю `TSessionClosedEvent` с диагностической информацией.
- ```cpp
- // Event loop
- while (true) {
- // Get event
- // May block for a while if write session is busy
- std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
+ Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков:
- if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
- session->Write(std::move(event.ContinuationToken), "This is yet another message.");
+ ```cpp
+ // Event loop
+ while (true) {
+ // Get event
+ // May block for a while if write session is busy
+ std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
- } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TAcksEvent>(&*event)) {
- std::cout << ackEvent->DebugString() << std::endl;
+ if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
+ session->Write(std::move(event.ContinuationToken), "This is yet another message.");
- } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
- break;
- }
- }
- ```
+ } else if (auto* ackEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TAcksEvent>(&*event)) {
+ std::cout << ackEvent->DebugString() << std::endl;
+
+ } else if (auto* closeSessionEvent = std::get_if<NYdb::NTopic::TSessionClosedEvent>(&*event)) {
+ break;
+ }
+ }
+ ```
+
+ - ISimpleBlockingWriteSession
+
+ Как упрощённый вариант `IWriteSession`, `ISimpleBlockingWriteSession` пишет через ту же внутреннюю буферизацию, но без цикла событий: не использует `ContinuationToken` и не возвращает подтверждения через `TAcksEvent`. Метод `Write` кладёт сообщение во внутренний буфер; если буфер переполнен, вызов блокируется до появления места. Параметр `blockTimeout` ограничивает время ожидания. Метод возвращает `true`, если сообщение принято в буфер, и `false`, если за отведённое время записать не удалось.
+
+ Отправка на сервер, как и у `IWriteSession`, выполняется в фоне. Чтобы дождаться завершения всех записей и закрыть сессию, вызовите `Close()`.
+
+ ```cpp
+ auto messageData = std::string("message");
+ NYdb::NTopic::TWriteMessage writeMessage(messageData);
+ session->Write(std::move(writeMessage));
+ ```
+
+ - IProducer
+
+ `TProducerSettings` наследует `TWriteSessionSettings`, поэтому буферизация, отправка и переподключение устроены так же, как у `IWriteSession`: `Write` кладёт сообщение во внутренний буфер, отправка на сервер идёт в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Продюсер переподключается к {{ ydb-short-name }} при обрывах связи и повторяет отправку, пока это возможно, в соответствии с `RetryPolicy`. При неустранимой ошибке продюсер закрывается; статус и причину можно получить из результата `Write` или `Flush`.
+
+ `Flush` дожидается доставки накопленных данных на сервер; `Close` дожидается отправки оставшихся в буфере сообщений и завершает работу продюсера.
+
+ ```cpp
+ auto messageData = std::string("order-created");
+ // First argument is the partitioning key — the SDK chooses a partition by it.
+ NYdb::NTopic::TWriteMessage writeMessage("user-42", messageData);
+ producer->Write(std::move(writeMessage));
+ producer->Flush().GetValueSync();
+ ```
+
+ Подробный пример см. в [репозитории ydb-platform/ydb](https://github.com/ydb-platform/ydb/tree/main/ydb/public/sdk/cpp/examples/topic_writer/producer/basic_write).
+
+ {% endlist %}
- Go
@@ -913,6 +993,37 @@
}
```
+ Для записи по ключу в несколько партиций используйте `WithWriteToManyPartitions(...)` при создании писателя и заполняйте поле `Key` в `topicwriter.Message`.
+
+ Стратегии маршрутизации (задаются в `WithWriterPartitionByKey(...)` или `WithWriterPartitionByPartitionID()`):
+
+ - `BoundPartitionChooser` — ключ сопоставляется с диапазонами партиций топика (`FromBound`/`ToBound`). По умолчанию перед сопоставлением ключ проходит через MurmurHash64. Рекомендуется для топиков с [автопартиционированием](../../concepts/datamodel/topic.md#autopartitioning): SDK обновляет границы при разделении партиций.
+ - `KafkaHashPartitionChooser` — по аналогии с Kafka: MurmurHash ключа по модулю числа партиций. Удобно при миграции с Kafka. Не поддерживается при включённом автопартиционировании.
+ - `WithWriterPartitionByPartitionID` — запись в партицию, указанную в поле `PartitionID` сообщения. Не комбинируется с маршрутизацией по ключу; при split партиции писатель нужно пересоздавать вручную.
+
+ ```go
+ writer, err := db.Topic().StartWriter(topicPath,
+ topicoptions.WithWriteToManyPartitions(
+ topicoptions.WithProducerIDPrefix("orders-producer"),
+ topicoptions.WithWriterPartitionByKey(topicoptions.BoundPartitionChooser()),
+ ),
+ )
+ if err != nil {
+ return err
+ }
+ defer func() { _ = writer.Close(context.Background()) }()
+
+ err = writer.Write(ctx, topicwriter.Message{
+ Key: "user-42",
+ Data: bytes.NewReader([]byte("order-created")),
+ })
+ if err != nil {
+ return err
+ }
+ ```
+
+ Подробный пример с маршрутизацией по ключу, альтернативными стратегиями (`KafkaHash` и `PartitionID`) и транзакционным вариантом см. в репозитории [ydb-go-sdk](https://github.com/ydb-platform/ydb-go-sdk/blob/master/examples/topic/topicwriter/topicwriter_to_many_partitions.go).
+
- Python
Для отправки сообщений можно передавать как просто содержимое сообщения (bytes, str), так и вручную задавать некоторые свойства. Объекты можно передавать по одному или сразу в массиве (list). Метод `write` выполняется асинхронно. Возврат из метода происходит сразу после того как сообщения будут положены во внутренний буфер клиента, обычно это происходит быстро. Ожидание может возникнуть, если внутренний буфер уже заполнен и нужно подождать, пока часть данных будет отправлена на сервер.
@@ -1059,33 +1170,61 @@
- C++
- Получение подтверждений от сервера возможно через интерфейс `IWriteSession`.
+ {% list tabs %}
- Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий `TAcksEvent`. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (`EES_WRITTEN`), запись отброшена как дубликат ранее записанного сообщения (`EES_ALREADY_WRITTEN`) или запись отброшена по причине сбоя (`EES_DISCARDED`).
+ - IWriteSession
- Пример установки обработчика TAcksEvent для сессии записи:
+ Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий `TAcksEvent`. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (`EES_WRITTEN`), запись отброшена как дубликат ранее записанного сообщения (`EES_ALREADY_WRITTEN`) или запись отброшена по причине сбоя (`EES_DISCARDED`).
- ```cpp
- auto settings = NYdb::NTopic::TWriteSessionSettings()
- // other settings are set here
- .EventHandlers(
- NYdb::NTopic::TWriteSessionSettings::TEventHandlers()
- .AcksHandler(
- [&](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
- for (const auto& ack : event.Acks) {
- if (ack.State == NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_WRITTEN) {
- ackedSeqNo.insert(ack.SeqNo);
- std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl;
+ Пример установки обработчика `TAcksEvent` для сессии записи:
+
+ ```cpp
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ // other settings are set here
+ .EventHandlers(
+ NYdb::NTopic::TWriteSessionSettings::TEventHandlers()
+ .AcksHandler(
+ [&](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
+ for (const auto& ack : event.Acks) {
+ if (ack.State == NYdb::NTopic::TWriteSessionEvent::TWriteAck::EEventState::EES_WRITTEN) {
+ ackedSeqNo.insert(ack.SeqNo);
+ std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl;
+ }
}
}
- }
- )
- );
+ )
+ );
- auto session = topicClient.CreateWriteSession(settings);
- ```
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ В такой сессии записи события `TAcksEvent` не будут приходить пользователю в `GetEvent` / `GetEvents`, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий.
+
+ - ISimpleBlockingWriteSession
+
+ В отличие от `IWriteSession`, `ISimpleBlockingWriteSession` не возвращает подтверждения по отдельным сообщениям: события `TAcksEvent` и обработчики для них недоступны. Чтобы дождаться, пока все сообщения из буфера будут записаны на сервер, вызовите `Close()` — метод дожидается подтверждения от сервера и закрывает сессию.
+
+ - IProducer
+
+ Подтверждения от сервера приходят так же, как у `IWriteSession`: через обработчик `AcksHandler` в `TProducerSettings::EventHandlers`. Чтобы дождаться доставки накопленного буфера на сервер, вызовите `Flush()`.
+
+ ```cpp
+ auto producerSettings = NYdb::NTopic::TProducerSettings()
+ .Path("my-topic")
+ .ProducerIdPrefix("my-producer")
+ .EventHandlers(
+ NYdb::NTopic::TWriteSessionSettings::TEventHandlers()
+ .AcksHandler([](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
+ .AcksHandler([](NYdb::NTopic::TWriteSessionEvent::TAcksEvent& event) {
+ // handle acknowledgements
+ })
+ })
+ );
+
+ auto producer = topicClient.CreateProducer(producerSettings);
+ ```
- В такой сессии записи события `TAcksEvent` не будут приходить пользователю в `GetEvent` / `GetEvents`, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий.
+ {% endlist %}
- Go
@@ -1246,18 +1385,48 @@
- C++
- Сжатие, которое используется при отправке сообщений методом `Write`, задаётся при [создании сессии записи](#start-writer) настройками `Codec` и `CompressionLevel`. По умолчанию выбирается кодек GZIP.
- Пример создания сессии записи без сжатия сообщений:
+ {% list tabs %}
- ```cpp
- auto settings = NYdb::NTopic::TWriteSessionSettings()
- // other settings are set here
- .Codec(ECodec::RAW);
+ - IWriteSession
- auto session = topicClient.CreateWriteSession(settings);
- ```
+ Сжатие, которое используется при отправке сообщений методом `Write`, задаётся при [создании сессии записи](#start-writer) настройками `Codec` и `CompressionLevel`. По умолчанию выбирается кодек GZIP.
+ Пример создания сессии записи без сжатия сообщений:
+
+ ```cpp
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ // other settings are set here
+ .Codec(ECodec::RAW);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод `WriteEncoded` с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика.
+
+ - ISimpleBlockingWriteSession
- Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод `WriteEncoded` с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика.
+ Кодек задаётся при [создании сессии записи](#start-writer) в `TWriteSessionSettings` — те же настройки `Codec` и `CompressionLevel`, что и у `IWriteSession`. Метод `WriteEncoded` недоступен.
+
+ ```cpp
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ // other settings are set here
+ .Codec(ECodec::RAW);
+
+ auto session = topicClient.CreateSimpleBlockingWriteSession(settings);
+ ```
+
+ - IProducer
+
+ Кодек задаётся в `TProducerSettings` при [создании продюсера](#start-writer) — те же настройки `Codec` и `CompressionLevel`, что и у `IWriteSession`.
+
+ ```cpp
+ auto producerSettings = NYdb::NTopic::TProducerSettings()
+ // other settings are set here
+ .Codec(NYdb::NTopic::ECodec::RAW);
+
+ auto producer = topicClient.CreateProducer(producerSettings);
+ ```
+
+ {% endlist %}
- Go
@@ -1343,17 +1512,38 @@
- C++
- Если в настройках сессии записи не указывается опция `ProducerId`, будет создана сессия записи без дедупликации.
- Пример создания такой сессии записи:
+ {% list tabs %}
- ```cpp
- auto settings = NYdb::NTopic::TWriteSessionSettings()
- .Path(myTopicPath);
+ - IWriteSession
- auto session = topicClient.CreateWriteSession(settings);
- ```
+ Если в настройках сессии записи не указывается опция `ProducerId`, будет создана сессия записи без дедупликации.
+ Пример создания такой сессии записи:
+
+ ```cpp
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ .Path(myTopicPath);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ Для включения дедупликации нужно в настройках сессии записи указать опцию `ProducerId` или явно включить дедупликацию, вызвав метод `DeduplicationEnabled()`, например, как в секции ["Подключение к топику"](#start-writer).
+
+ - ISimpleBlockingWriteSession
+
+ Поведение такое же, как у `IWriteSession`: если не указывать `ProducerId`, сессия создаётся без дедупликации.
+
+ ```cpp
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ .Path(myTopicPath);
- Для включения дедупликации нужно в настройках сессии записи указать опцию `ProducerId` или явно включить дедупликацию, вызвав метод `DeduplicationEnabled()`, например, как в секции ["Подключение к топику"](#start-writer).
+ auto session = topicClient.CreateSimpleBlockingWriteSession(settings);
+ ```
+
+ - IProducer
+
+ `IProducer` всегда записывает с дедупликацией: идентификатор продюсера формируется из `ProducerIdPrefix` и id партиции. Для записи без дедупликации используйте `IWriteSession` или `ISimpleBlockingWriteSession`.
+
+ {% endlist %}
- JavaScript
@@ -1392,26 +1582,60 @@
- C++
- Воспользоваться функцией записи метаданных можно с помощью метода `Write()`, принимающего `TWriteMessage` объект:
+ {% list tabs %}
- ```cpp
- auto settings = NYdb::NTopic::TWriteSessionSettings()
- .Path(myTopicPath)
- // set all other settings;
- ;
+ - IWriteSession
- auto session = topicClient.CreateWriteSession(settings);
+ Метаданные задаются в объекте `TWriteMessage` и передаются в `Write()`:
- std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
- NYdb::NTopic::TWriteMessage message("This is yet another message").MessageMeta({
- {"meta-key", "meta-value"},
- {"another-key", "value"}
- });
+ ```cpp
+ auto settings = NYdb::NTopic::TWriteSessionSettings()
+ .Path(myTopicPath)
+ // set all other settings;
+ ;
- if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
- session->Write(std::move(event.ContinuationToken), std::move(message));
- }
- ```
+ auto session = topicClient.CreateWriteSession(settings);
+
+ std::optional<NYdb::NTopic::TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
+ NYdb::NTopic::TWriteMessage message("This is yet another message").MessageMeta({
+ {"meta-key", "meta-value"},
+ {"another-key", "value"}
+ });
+
+ if (auto* readyEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
+ session->Write(std::move(event.ContinuationToken), std::move(message));
+ }
+ ```
+
+ - ISimpleBlockingWriteSession
+
+ Используется тот же `TWriteMessage`, что и у `IWriteSession`: метаданные задаются через `MessageMeta()` и передаются в `Write()`:
+
+ ```cpp
+ auto messageData = std::string("message-data");
+ NYdb::NTopic::TWriteMessage writeMessage(messageData);
+ writeMessage.MessageMeta({
+ {"meta-key", "meta-value"},
+ {"another-key", "value"},
+ });
+ session->Write(std::move(writeMessage));
+ ```
+
+ - IProducer
+
+ Как и у сессий записи, метаданные задаются в `TWriteMessage` через `MessageMeta()`. Отличие `IProducer` в том, что сообщение также содержит ключ партиционирования, по которому продюсер выбирает партицию:
+
+ ```cpp
+ auto messageData = std::string("message-data");
+ NYdb::NTopic::TWriteMessage writeMessage("user-42", messageData);
+ writeMessage.MessageMeta({
+ {"meta-key", "meta-value"},
+ {"another-key", "value"},
+ });
+ producer->Write(std::move(writeMessage));
+ ```
+
+ {% endlist %}
- Go
@@ -1541,26 +1765,59 @@
- C++
- Для записи в топик в транзакции необходимо передать ссылку на объект транзакции в метод `Write` сессии записи.
+ {% list tabs %}
- [Пример на GitHub](https://github.com/ydb-platform/ydb-cpp-sdk/blob/main/examples/topic_writer/transaction/main.cpp)
+ - IWriteSession
- ```c++
- NYdb::NQuery::TQueryClient queryClient(driver);
+ Для записи в топик в транзакции необходимо передать ссылку на объект транзакции в метод `Write` сессии записи.
- NYdb::NStatusHelpers::ThrowOnError(queryClient.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
- auto beginTxResult = session.BeginTransaction().GetValueSync();
- if (!beginTxResult.IsSuccess()) {
- return beginTxResult;
- }
- auto tx = beginTxResult.GetTransaction();
+ [Пример на GitHub](https://github.com/ydb-platform/ydb-cpp-sdk/blob/main/examples/topic_writer/transaction/main.cpp)
- NYdb::NTopic::TWriteMessage writeMessage("message");
+ ```c++
+ NYdb::NQuery::TQueryClient queryClient(driver);
- topicSession->Write(std::move(writeMessage), tx);
- return tx.Commit().GetValueSync();
- }));
- ```
+ NYdb::NStatusHelpers::ThrowOnError(queryClient.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
+ auto beginTxResult = session.BeginTransaction().GetValueSync();
+ if (!beginTxResult.IsSuccess()) {
+ return beginTxResult;
+ }
+ auto tx = beginTxResult.GetTransaction();
+
+ NYdb::NTopic::TWriteMessage writeMessage("message");
+
+ topicSession->Write(std::move(writeMessage), tx);
+ return tx.Commit().GetValueSync();
+ }));
+ ```
+
+ - ISimpleBlockingWriteSession
+
+ Как и `IWriteSession`, `ISimpleBlockingWriteSession` поддерживает запись в транзакции. Так как у простого варианта нет `ContinuationToken`, объект транзакции передаётся вторым аргументом в `Write()`.
+
+ [Пример на GitHub](https://github.com/ydb-platform/ydb-cpp-sdk/blob/main/examples/topic_writer/transaction/main.cpp)
+
+ ```c++
+ NYdb::NQuery::TQueryClient queryClient(driver);
+
+ NYdb::NStatusHelpers::ThrowOnError(queryClient.RetryQuerySync([](NYdb::NQuery::TSession session) -> NYdb::TStatus {
+ auto beginTxResult = session.BeginTransaction().GetValueSync();
+ if (!beginTxResult.IsSuccess()) {
+ return beginTxResult;
+ }
+ auto tx = beginTxResult.GetTransaction();
+
+ NYdb::NTopic::TWriteMessage writeMessage("message");
+
+ topicSession->Write(std::move(writeMessage), &tx);
+ return tx.Commit().GetValueSync();
+ }));
+ ```
+
+ - IProducer
+
+ У `IProducer` транзакция задаётся не аргументом `Write`, а в `TWriteMessage` через `Tx()`. После этого продюсер записывает сообщение так же, как обычное сообщение с ключом партиционирования:
+
+ {% endlist %}
- Go