aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-07-18 18:54:40 +0300
committerildar-khisam <ikhis@ydb.tech>2023-07-18 18:54:40 +0300
commitf464272dbb1eed2bf8d4c183f5c39763cb7a476f (patch)
tree93fc9c6ad31ac549ec105fd4d3ddc8412d799930
parentea4df5b8aa0b9158149035f739841c3617c47626 (diff)
downloadydb-f464272dbb1eed2bf8d4c183f5c39763cb7a476f.tar.gz
docs cpp sdk draft
docs cpp sdk draft
-rw-r--r--ydb/docs/en/core/reference/ydb-sdk/topic.md364
-rw-r--r--ydb/docs/ru/core/reference/ydb-sdk/topic.md368
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h5
3 files changed, 700 insertions, 37 deletions
diff --git a/ydb/docs/en/core/reference/ydb-sdk/topic.md b/ydb/docs/en/core/reference/ydb-sdk/topic.md
index ee89ade7762..34e771ef461 100644
--- a/ydb/docs/en/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/en/core/reference/ydb-sdk/topic.md
@@ -5,23 +5,72 @@ This article provides examples of how to use the {{ ydb-short-name }} SDK to wor
Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and [add a consumer](../ydb-cli/topic-consumer-add.md).
+## Initializing a connection {#init}
+
+{% list tabs %}
+
+- C++
+
+ To interact with YDB Topics, create an instance of the YDB driver and topic client.
+
+ The YDB driver lets the app and YDB interact at the transport layer. The driver must exist during the YDB access lifecycle and be initialized before creating a client.
+
+ Topic client ([source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1589)) requires the YDB driver for work. It handles topics and manages read and write sessions.
+
+ App code snippet for driver initialization:
+ ```cpp
+ auto driverConfig = TDriverConfig()
+ .SetEndpoint(opts.Endpoint)
+ .SetDatabase(opts.Database)
+ .SetAuthToken(GetEnv("YDB_TOKEN"));
+
+ TDriver driver(driverConfig);
+ ```
+
+ This example uses authentication token from the `YDB_TOKEN` environment variable. For details see [Connecting to a database](../../concepts/connect.md) и [Authentication](../../concepts/auth.md) pages.
+
+ App code snippet for creating a client:
+
+ ```cpp
+ TTopicClient topicClient(driver);
+ ```
+
+{% endlist %}
+
## Managing topics {#manage}
### Creating a topic {#create-topic}
{% list tabs %}
-The only mandatory parameter for creating a topic is its path, other parameters are optional.
+- C++
+
+ The topic path is mandatory. Other parameters are optional.
+ For a full list of supported parameters, see the [source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L394).
+
+ Example of creating a topic with three partitions and ZSTD codec support:
+
+ ```cpp
+ auto settings = NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(3, 3)
+ .AppendSupportedCodecs(NYdb::NTopic::ECodec::ZSTD);
+
+ auto status = topicClient
+ .CreateTopic("my-topic", settings) // returns TFuture<TStatus>
+ .GetValueSync();
+ ```
- Go
+ The topic path is mandatory. Other parameters are optional.
+
For a full list of supported parameters, see the [SDK documentation](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#CreateOption).
- Example of creating a topic with a list of supported codecs and a minimum number of partitions
+ Example of creating a topic with a list of supported codecs and a minimum number of partitions:
```go
err := db.Topic().Create(ctx, "topic-path",
- // optional
+ // optional
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
// optional
@@ -31,6 +80,8 @@ The only mandatory parameter for creating a topic is its path, other parameters
- Python
+ Example of creating a topic with a list of supported codecs and a minimum number of partitions:
+
```python
driver.topic_client.create_topic(topic_path,
supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP], # optional
@@ -46,11 +97,29 @@ When you update a topic, you must specify the topic path and the parameters to b
{% list tabs %}
+- C++
+
+ For a full list of supported parameters, see the [source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L458).
+
+ Example of adding an [important consumer](../../concepts/topic#important-consumer) and setting two days [retention time](../../concepts/topic#retention-time) for the topic:
+
+ ```cpp
+ auto alterSettings = NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer("my-consumer")
+ .Important(true)
+ .EndAddConsumer()
+ .SetRetentionPeriod(TDuration::Days(2));
+
+ auto status = topicClient
+ .AlterTopic("my-topic", alterSettings) // returns TFuture<TStatus>
+ .GetValueSync();
+ ```
+
- Go
For a full list of supported parameters, see the [SDK documentation](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#AlterOption).
- Example of adding a consumer to a topic
+ Example of adding a consumer to a topic:
```go
err := db.Topic().Alter(ctx, "topic-path",
@@ -71,6 +140,24 @@ When you update a topic, you must specify the topic path and the parameters to b
{% list tabs %}
+- C++
+
+ Use `DescribeTopic` method to get information about topic.
+
+ For a full list of description fields, see the [source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L163).
+
+ Example of using topic description:
+
+ ```cpp
+ auto result = topicClient.DescribeTopic("my-topic").GetValueSync();
+ if (result.IsSuccess()) {
+ const auto& description = result.GetTopicDescription();
+ std::cout << "Topic description: " << GetProto(description) << std::endl;
+ }
+ ```
+
+ There is another method `DescribeConsumer` to get informtaion about consumer.
+
- Go
```go
@@ -97,6 +184,12 @@ To delete a topic, just specify the path to it.
{% list tabs %}
+- C++
+
+ ```cpp
+ auto status = topicClient.DropTopic("my-topic").GetValueSync();
+ ```
+
- Go
```go
@@ -115,10 +208,28 @@ To delete a topic, just specify the path to it.
### Connecting to a topic for message writes {#start-writer}
-Only connections with matching producer_id and message_group_id are currently supported. This restriction will be removed in the future.
+Only connections with matching [producer and message group](../../concepts/topic#producer-id) identifiers are currently supported (`producer_id` shoud be equal to `message_group_id`). This restriction will be removed in the future.
{% list tabs %}
+- C++
+
+ The write session object with `IWriteSession` interface is used to connect to a topic for writing.
+
+ For a full list of write session settings, see the [source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1199).
+
+ Example of creating a write session:
+
+ ```cpp
+ TString producerAndGroupID = "group-id";
+ auto settings = TWriteSessionSettings()
+ .Path("my-topic")
+ .ProducerId(producerAndGroupID)
+ .MessageGroupId(producerAndGroupID);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
- Go
```go
@@ -143,6 +254,40 @@ Only connections with matching producer_id and message_group_id are currently su
{% list tabs %}
+- C++
+
+ `IWriteSession` interface allows asynchronous write.
+
+ The user processes three kinds of events in a loop: `TReadyToAcceptEvent`, `TAcksEvent`, and `TSessionClosedEvent`.
+
+ For each kind of event user can set a handler in write session settings before session creation. Also, a common handler can be set.
+
+ If handler is not set for a particular event, it will be delivered to SDK client via `GetEvent` / `GetEvents` methods. `WaitEvent` method allows user to await for a next event in non-blocking way with `TFuture<void>()` interface.
+
+ To write a message, user uses a move-only `TContinuationToken` object, which has been created by the SDK and has been delivered to the user with a `TReadyToAcceptEvent` event. During write user can set an arbitrary sequential number and a message creation timestamp. By default they are generated by the SDK.
+
+ `Write` is asynchronous. Data from messages is processed and stored in the internal buffer. Settings `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, and `BatchFlushSizeBytes` control sending in the background. Write session reconnects to the YDB if the connection fails and resends the message if possible, with regard to `RetryPolicy` setting. If an error that cannot be repeated is received, write session stops and sends `TSessionClosedEvent` to the client.
+
+ Example of writing using event loop without any handlers set up:
+ ```cpp
+ // Event loop
+ while (true) {
+ // Get event
+ // May block for a while if write session is busy
+ TMaybe<TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
+
+ if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
+ session->Write(std::move(event.ContinuationToken), "This is yet another message.");
+
+ } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&*event)) {
+ std::cout << ackEvent->DebugString() << std::endl;
+
+ } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) {
+ break;
+ }
+ }
+ ```
+
- Go
To send a message, just save Reader in the Data field, from which the data can be read. You can expect the data of each message to be read once (or until the first error). By the time you return the data from Write, it will already have been read and stored in the internal buffer.
@@ -200,6 +345,35 @@ Only connections with matching producer_id and message_group_id are currently su
{% list tabs %}
+- C++
+
+ `IWriteSession` interface allows getting server acknowledgments for writes.
+
+ Status of server-side message write is represented with `TAcksEvent`. One event can contain the statuses of several previously sent messages.Status is one of the following: message write is confirmed (`EES_WRITTEN`), message is discarded as a duplicate of a previously written message (`EES_ALREADY_WRITTEN`) or message is discarded because of failure (`EES_DISCARDED`).
+
+ Example of setting TAcksEvent handler for a write session:
+ ```cpp
+ auto settings = TWriteSessionSettings()
+ // other settings are set here
+ .EventHandlers(
+ TWriteSessionSettings::TEventHandlers()
+ .AcksHandler(
+ [&](TWriteSessionEvent::TAcksEvent& event) {
+ for (const auto& ack : event.Acks) {
+ if (ack.State == TWriteAck::EEventState::EES_WRITTEN) {
+ ackedSeqNo.insert(ack.SeqNo);
+ std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl;
+ }
+ }
+ }
+ )
+ );
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ In this write session user does not receive `TAcksEvent` events in the `GetEvent` / `GetEvents` loop. Instead, SDK will call given handler on every acknowledgment coming from server. In the same way user can set up handlers for other types of events.
+
- Go
When connected, you can specify the synchronous message write option: topicoptions.WithSyncWrite(true). Then Write will only return after receiving a confirmation from the server that all messages passed in the call have been saved. If necessary, the SDK will reconnect and retry sending messages as usual. In this mode, the context only controls the response time from the SDK, meaning the SDK will continue trying to send messages even after the context is canceled.
@@ -252,14 +426,33 @@ Only connections with matching producer_id and message_group_id are currently su
### Selecting a codec for message compression {#codec}
-By default, the SDK selects the codec automatically (subject to topic settings). In automatic mode, the SDK first sends one group of messages with each of the allowed codecs, then it sometimes tries to compress messages with all the available codecs, and then selects the codec that yields the smallest message size. If the list of allowed codecs for the topic is empty, the SDK makes automatic selection between Raw and Gzip codecs.
+For more details on using data compression for topics, see [here](../../concepts/topic#message-codec).
-If necessary, a fixed codec can be set in the connection options. It will then be used and no measurements will be taken.
{% list tabs %}
+- C++
+
+ The message compression can be set on the [write session creation](#start-writer) with `Codec` and `CompressionLevel` settings. By default, GZIP codec is chosen.
+
+ Example of creating a write session with no data compression:
+
+ ```cpp
+ auto settings = TWriteSessionSettings()
+ // other settings are set here
+ .Codec(ECodec::RAW);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ Write session allows sending a message compressed with other codec. For this use `WriteEncoded` method, specify codec used and original message byte size. The codec must be allowed in topic settings.
+
- Go
+ By default, the SDK selects the codec automatically based on topic settings. In automatic mode, the SDK first sends one group of messages with each of the allowed codecs, then it sometimes tries to compress messages with all the available codecs, and then selects the codec that yields the smallest message size. If the list of allowed codecs for the topic is empty, the SDK makes automatic selection between Raw and Gzip codecs.
+
+ If necessary, a fixed codec can be set in the connection options. It will then be used and no measurements will be taken.
+
```go
producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
@@ -270,6 +463,10 @@ If necessary, a fixed codec can be set in the connection options. It will then b
- Python
+ By default, the SDK selects the codec automatically based on topic settings. In automatic mode, the SDK first sends one group of messages with each of the allowed codecs, then it sometimes tries to compress messages with all the available codecs, and then selects the codec that yields the smallest message size. If the list of allowed codecs for the topic is empty, the SDK makes automatic selection between Raw and Gzip codecs.
+
+ If necessary, a fixed codec can be set in the connection options. It will then be used and no measurements will be taken.
+
```python
writer = driver.topic_client.writer(topic_path,
codec=ydb.TopicCodec.GZIP,
@@ -282,12 +479,28 @@ If necessary, a fixed codec can be set in the connection options. It will then b
### Connecting to a topic for message reads {#start-reader}
-To create a connection to the existing `my-topic` topic via the added `my-consumer` consumer, use the following code:
-
{% list tabs %}
+- C++
+
+ The read session object with `IReadSession` interface is used to connect to one or more topics for reading.
+
+ For a full list of read session settings, see `TReadSessionSettings` class in the [source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1344).
+
+ To establish a connection to the existing `my-topic` topic using the added `my-consumer` consumer, use the following code:
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .ConsumerName("my-consumer")
+ .AppendTopics("my-topic");
+
+ auto session = topicClient.CreateReadSession(settings);
+ ```
+
- Go
+ To establish a connection to the existing `my-topic` topic using the added `my-consumer` consumer, use the following code:
+
```go
reader, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"))
if err != nil {
@@ -297,16 +510,33 @@ To create a connection to the existing `my-topic` topic via the added `my-consum
- Python
+ To establish a connection to the existing `my-topic` topic using the added `my-consumer` consumer, use the following code:
+
```python
reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name")
```
{% endlist %}
-You can also use the advanced connection creation option to specify multiple topics and set read parameters. The following code will create a connection to the `my-topic` and `my-specific-topic` topics via the `my-consumer` consumer and also set the time to start reading messages:
+Additional options are used to specify multiple topics and other parameters.
+To establish a connection to the `my-topic` and `my-specific-topic` topics using the `my-consumer` consumer and also set the time to start reading messages, use the following code:
{% list tabs %}
+- C++
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .ConsumerName("my-consumer")
+ .AppendTopics("my-topic")
+ .AppendTopics(
+ TTopicReadSettings("my-specific-topic")
+ .ReadFromTimestamp(someTimestamp)
+ );
+
+ auto session = topicClient.CreateReadSession(settings);
+ ```
+
- Go
```go
@@ -333,20 +563,39 @@ You can also use the advanced connection creation option to specify multiple top
### Reading messages {#reading-messages}
-The server stores the [consumer offset](../../concepts/topic.md#consumer-offset). After reading a message, the client can [send a commit to the server](#commit). The consumer offset will change and only uncommitted messages will be read in case of a new connection.
+The server stores the [consumer offset](../../concepts/topic.md#consumer-offset). After reading a message, the client should [send a commit to the server](#commit). The consumer offset changes and only uncommitted messages will be read in case of a new connection.
You can read messages without a [commit](#no-commit) as well. In this case, all uncommited messages, including those processed, will be read if there is a new connection.
Information about which messages have already been processed can be [saved on the client side](#client-commit) by sending the starting consumer offset to the server when creating a new connection. This does not change the consumer offset on the server.
-The SDK receives data from the server in batches and buffers it. Depending on the task, the client code can read messages from the buffer one by one or in batches.
+{% list tabs %}
+
+- C++
+
+ The user processes several kinds of events in a loop: `TDataReceivedEvent`, `TCommitOffsetAcknowledgementEvent`, `TStartPartitionSessionEvent`, `TStopPartitionSessionEvent`, `TPartitionSessionStatusEvent`, `TPartitionSessionClosedEvent` and `TSessionClosedEvent`.
+
+ For each kind of event user can set a handler in read session settings before session creation. Also, a common handler can be set.
+
+ If handler is not set for a particular event, it will be delivered to SDK client via `GetEvent` / `GetEvents` methods. `WaitEvent` method allows user to await for a next event in non-blocking way with `TFuture<void>()` interface.
+
+- Go
+
+ The SDK receives data from the server in batches and buffers it. Depending on the task, the client code can read messages from the buffer one by one or in batches.
+
+{% endlist %}
+
### Reading without a commit {#no-commit}
-To read messages one by one, use the following code:
+#### Reading messages one by one
{% list tabs %}
+- C++
+
+Reading messages one-by-one is not supported in the C++ SDK. Class `TDataReceivedEvent` represents a batch of read messages.
+
- Go
```go
@@ -371,10 +620,30 @@ To read messages one by one, use the following code:
{% endlist %}
-To read message batches, use the following code:
+#### Reading message batches
{% list tabs %}
+- C++
+
+ One simple way to read messages is to use `SimpleDataHandlers` setting when creating a read session. With it you only set a handler for a `TDataReceivedEvent`. SDK will call it for each batch of messages that came from server. By default, SDK does not send back acknowledgments of successful reads.
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .EventHandlers_.SimpleDataHandlers(
+ [](TReadSessionEvent::TDataReceivedEvent& event) {
+ std::cout << "Get data event " << DebugString(event);
+ }
+ );
+
+ auto session = topicClient.CreateReadSession(settings);
+
+ // Wait SessionClosed event.
+ ReadSession->GetEvent(/* block = */true);
+ ```
+
+ In this example client creates read session and just awaits session close in the main thread. All other event types are handled by SDK.
+
- Go
```go
@@ -405,10 +674,14 @@ Confirmation of message processing (commit) informs the server that the message
For example, if messages 1, 2, 3 are received from the server, the program processes them in parallel and sends confirmations in the following order: 1, 3, 2. In this case, message 1 will be committed first, and messages 2 and 3 will be committed only after the server receives confirmation of the processing of message 2.
-To commit messages one by one, use the following code:
+#### Reading messages one by one with commits
{% list tabs %}
+- C++
+
+Reading messages one-by-one is not supported in the C++ SDK. Class `TDataReceivedEvent` represents a batch of read messages.
+
- Go
```go
@@ -435,10 +708,29 @@ To commit messages one by one, use the following code:
{% endlist %}
-To commit message batches, use the following code:
+#### Reading message batches with commits
{% list tabs %}
+- C++
+
+ Same as [above example](#no-commit), when using `SimpleDataHandlers` handlers you only set handler for a `TDataReceivedEvent`. SDK will call it for each batch of messages that came from server. By setting `commitDataAfterProcessing = true`, you tell SDK to send back commits after executing a handler for corresponding event.
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .EventHandlers_.SimpleDataHandlers(
+ [](TReadSessionEvent::TDataReceivedEvent& event) {
+ std::cout << "Get data event " << DebugString(event);
+ }
+ , /* commitDataAfterProcessing = */true
+ );
+
+ auto session = topicClient.CreateReadSession(settings);
+
+ // Wait SessionClosed event.
+ ReadSession->GetEvent(/* block = */true);
+ ```
+
- Go
```go
@@ -465,12 +757,18 @@ To commit message batches, use the following code:
{% endlist %}
-#### Reading with consumer offset storage on the client side {#client-commit}
+### Reading with consumer offset storage on the client side {#client-commit}
When reading starts, the client code must transmit the starting consumer offset to the server:
{% list tabs %}
+- C++
+
+ Setting the starting offset for reading is not supported in the current C++ SDK.
+
+ The `ReadFromTimestamp` setting is used for reading only messages with write timestamps no less than the given one.
+
- Go
```go
@@ -531,6 +829,21 @@ In case of a _hard interruption_, the client receives a notification that it is
{% list tabs %}
+- C++
+
+ The `TStopPartitionSessionEvent` class is used for soft reading interruption. It helps user to stop message processing gracefully.
+
+ Example of event loop fragment:
+
+ ```cpp
+ auto event = ReadSession->GetEvent(/*block=*/true);
+ if (auto* stopPartitionSessionEvent = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
+ stopPartitionSessionEvent->Confirm();
+ } else {
+ // other event types
+ }
+ ```
+
- Go
The client code immediately receives all messages from the buffer (on the SDK side) even if they are not enough to form a batch during batch processing.
@@ -565,6 +878,23 @@ In case of a _hard interruption_, the client receives a notification that it is
{% list tabs %}
+- C++
+
+ The hard interruption of reading messages is implemented using an `TPartitionSessionClosedEvent` event. It can be received either as soft interrupt confirmation response, or in the case of lost connection. The user can find out the reason for session closing using the `GetReason` method.
+
+ Example of event loop fragment:
+
+ ```cpp
+ auto event = ReadSession->GetEvent(/*block=*/true);
+ if (auto* partitionSessionClosedEvent = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
+ if (partitionSessionClosedEvent->GetReason() == TPartitionSessionClosedEvent::EReason::ConnectionLost) {
+ std::cout << "Connection with partition was lost" << std::endl;
+ }
+ } else {
+ // other event types
+ }
+ ```
+
- Go
When reading is interrupted, the message or message batch context is canceled.
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
index 26bb576483f..69aca92d0aa 100644
--- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
@@ -5,23 +5,73 @@
Перед выполнением примеров [создайте топик](../ydb-cli/topic-create.md) и [добавьте читателя](../ydb-cli/topic-consumer-add.md).
+## Инициализация соединения с топиками
+
+{% list tabs %}
+
+- C++
+
+ Для работы с топиками создаются экземпляры драйвера YDB и клиента.
+
+ Драйвер YDB отвечает за взаимодействие приложения и YDB на транспортном уровне. Драйвер должен существовать на всем протяжении жизненного цикла работы с топиками и должен быть инициализирован перед созданием клиента.
+
+ Клиент сервиса топиков ([исходный код](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1589)) работает поверх драйвера YDB и отвечает за управляющие операции с топиками, а также создание сессий чтения и записи.
+
+ Фрагмент кода приложения для инициализации драйвера YDB:
+ ```cpp
+ // Create driver instance.
+ auto driverConfig = TDriverConfig()
+ .SetEndpoint(opts.Endpoint)
+ .SetDatabase(opts.Database)
+ .SetAuthToken(GetEnv("YDB_TOKEN"));
+
+ TDriver driver(driverConfig);
+ ```
+ В этом примере используется аутентификационный токен, сохранённый в переменной окружения `YDB_TOKEN`. Подробнее про [соединение с БД](../../concepts/connect.md) и [аутентификацию](../../concepts/auth.md).
+
+ Фрагмент кода приложения для создания клиента:
+ ```cpp
+ TTopicClient topicClient(driver);
+ ```
+
+
+{% endlist %}
+
## Управление топиками {#manage}
### Создание топика {#create-topic}
{% list tabs %}
-Единственный обязательный параметр для создания топика - это его путь, остальные параметры - опциональны.
+- C++
+
+ Единственный обязательный параметр для создания топика - это его путь. Остальные настройки опциональны и представлены структурой `TCreateTopicSettings`.
+
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L394).
+
+ Пример создания топика c тремя партициями и поддержкой кодека ZSTD:
+
+ ```cpp
+ auto settings = NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(3, 3)
+ .AppendSupportedCodecs(NYdb::NTopic::ECodec::ZSTD);
+
+ auto status = topicClient
+ .CreateTopic("my-topic", settings) // returns TFuture<TStatus>
+ .GetValueSync();
+ ```
- Go
+ Единственный обязательный параметр для создания топика - это его путь, остальные параметры опциональны.
+
Полный список поддерживаемых параметров можно посмотреть в [документации SDK](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#CreateOption).
- Пример создания топика со списком поддерживаемых кодеков и минимальным количество партиций
+ Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
```go
err := db.Topic().Create(ctx, "topic-path",
- // optional
+ // optional
topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip),
// optional
@@ -31,6 +81,8 @@
- Python
+ Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
+
```python
driver.topic_client.create_topic(topic_path,
supported_codecs=[ydb.TopicCodec.RAW, ydb.TopicCodec.GZIP], # optional
@@ -42,12 +94,32 @@
### Изменение топика {#alter-topic}
-При изменении топика в параметрах нужно указать путь топика и те параметры, которые будут изменяться.
-
{% list tabs %}
+- C++
+
+ При изменении топика в параметрах метода `AlterTopic` нужно указать путь топика и параметры, которые будут изменяться. Изменяемые параметры представлены структурой `TAlterTopicSettings`.
+
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L458).
+
+ Пример добавления [важного читателя](../../concepts/topic#important-consumer) к топику и установки [времени хранения сообщений](../../concepts/topic#retention-time) для топика в два дня:
+
+ ```cpp
+ auto alterSettings = NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer("my-consumer")
+ .Important(true)
+ .EndAddConsumer()
+ .SetRetentionPeriod(TDuration::Days(2));
+
+ auto status = topicClient
+ .AlterTopic("my-topic", alterSettings) // returns TFuture<TStatus>
+ .GetValueSync();
+ ```
+
- Go
+ При изменении топика в параметрах нужно указать путь топика и те параметры, которые будут изменяться.
+
Полный список поддерживаемых параметров можно посмотреть в [документации SDK](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#AlterOption).
Пример добавления читателя к топику
@@ -71,6 +143,26 @@
{% list tabs %}
+- C++
+
+ Для получения информации о топике используется метод `DescribeTopic`.
+
+ Описание топика представлено структурой `TTopicDescription`.
+
+ Полный список полей описания смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L163).
+
+ Получить доступ к этому описанию можно так:
+
+ ```cpp
+ auto result = topicClient.DescribeTopic("my-topic").GetValueSync();
+ if (result.IsSuccess()) {
+ const auto& description = result.GetTopicDescription();
+ std::cout << "Topic description: " << GetProto(description) << std::endl;
+ }
+ ```
+
+ Существует отдельный метод для получения информации о читателе - `DescribeConsumer`.
+
- Go
```go
@@ -97,6 +189,12 @@
{% list tabs %}
+- C++
+
+ ```cpp
+ auto status = topicClient.DropTopic("my-topic").GetValueSync();
+ ```
+
- Go
```go
@@ -115,10 +213,28 @@
### Подключение к топику для записи сообщений {#start-writer}
-На данный момент поддерживается подключение только с совпадающими producer_id и message_group_id, в будущем это ограничение будет снято.
+На данный момент поддерживается подключение только с совпадающими идентификаторами [источника и группы сообщений](../../concepts/topic#producer-id) (`producer_id` и `message_group_id`), в будущем это ограничение будет снято.
{% list tabs %}
+- C++
+
+ Подключение к топику на запись представлено объектом сессии записи с интерфейсом `IWriteSession` или `ISimpleBlockingWriteSession` (вариант для простой записи по одному сообщению без подтверждения, блокирующейся при превышении числа inflight записей или размера буфера SDK). Настройки сессии записи представлены структурой `TWriteSessionSettings`, для варианта `ISimpleBlockingWriteSession` часть настроек не поддерживается.
+
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1199).
+
+ Пример создания сессии записи с интерфейсом `IWriteSession`.
+
+ ```cpp
+ TString producerAndGroupID = "group-id";
+ auto settings = TWriteSessionSettings()
+ .Path("my-topic")
+ .ProducerId(producerAndGroupID)
+ .MessageGroupId(producerAndGroupID);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
- Go
```go
@@ -143,6 +259,40 @@
{% list tabs %}
+- C++
+
+ Асинхронная запись возможна через интерфейс `IWriteSession`.
+
+ Работа пользователя с объектом `IWriteSession` в общем устроена как обработка цикла событий с тремя типами событий: `TReadyToAcceptEvent`, `TAcksEvent` и `TSessionClosedEvent`.
+
+ Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.
+
+ Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах `GetEvent` / `GetEvents`. Для неблокирующего ожидания очередного события есть метод `WaitEvent` с интерфейсом `TFuture<void>()`.
+
+ Для записи каждого сообщения пользователь должен "потратить" move-only объект `TContinuationToken`, который выдаёт SDK с событием `TReadyToAcceptEvent`. При записи сообщения можно установить пользовательские seqNo и временную метку создания, но по умолчанию их проставляет SDK автоматически.
+
+ По умолчанию `Write` выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне в соответствии с настройками `MaxMemoryUsage`, `MaxInflightCount`, `BatchFlushInterval`, `BatchFlushSizeBytes`. Сессия сама переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно, в соответствии с настройкой `RetryPolicy`. При получении ошибки, которую невозможно повторить, сессия чтения отправляет пользователю `TSessionClosedEvent` с диагностической информацией.
+
+ Так может выглядеть запись нескольких сообщений в цикле событий без использования обработчиков:
+ ```cpp
+ // Event loop
+ while (true) {
+ // Get event
+ // May block for a while if write session is busy
+ TMaybe<TWriteSessionEvent::TEvent> event = session->GetEvent(/*block=*/true);
+
+ if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&*event)) {
+ session->Write(std::move(event.ContinuationToken), "This is yet another message.");
+
+ } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&*event)) {
+ std::cout << ackEvent->DebugString() << std::endl;
+
+ } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&*event)) {
+ break;
+ }
+ }
+ ```
+
- Go
Для отправки сообщения - достаточно в поле Data сохранить Reader, из которого можно будет прочитать данные. Можно рассчитывать на то что данные каждого сообщения читаются один раз (или до первой ошибки), к моменту возврата из Write данные будут уже прочитаны и сохранены во внутренний буфер.
@@ -200,6 +350,35 @@
{% list tabs %}
+- C++
+
+ Получение подтверждений от сервера возможно через интерфейс `IWriteSession`.
+
+ Ответы о записи сообщений на сервере приходят клиенту SDK в виде событий `TAcksEvent`. В одном событии могут содержаться ответы о нескольких отправленных ранее сообщениях. Варианты ответа: запись подтверждена (`EES_WRITTEN`), запись отброшена как дубликат ранее записанного сообщения (`EES_ALREADY_WRITTEN`) или запись отброшена по причине сбоя (`EES_DISCARDED`).
+
+ Пример установки обработчика TAcksEvent для сессии записи:
+ ```cpp
+ auto settings = TWriteSessionSettings()
+ // other settings are set here
+ .EventHandlers(
+ TWriteSessionSettings::TEventHandlers()
+ .AcksHandler(
+ [&](TWriteSessionEvent::TAcksEvent& event) {
+ for (const auto& ack : event.Acks) {
+ if (ack.State == TWriteAck::EEventState::EES_WRITTEN) {
+ ackedSeqNo.insert(ack.SeqNo);
+ std::cout << "Acknowledged message with seqNo " << ack.SeqNo << std::endl;
+ }
+ }
+ }
+ )
+ );
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ В такой сессии записи события `TAcksEvent` не будут приходить пользователю в `GetEvent` / `GetEvents`, вместо этого SDK при получении подтверждений от сервера будет вызывать переданный обработчик. Аналогично можно настраивать обработчики на остальные типы событий.
+
- Go
При подключении можно указать опцию синхронной записи сообщений - topicoptions.WithSyncWrite(true). Тогда Write будет возвращаться только после того как получит подтверждение с сервера о сохранении всех, сообщений переданных в вызове. При этом SDK так же как и обычно будет при необходимости переподключаться и повторять отправку сообщений. В этом режиме контекст управляет только временем ожидания ответа из SDK, т.е. даже после отмены контекста SDK продолжит попытки отправить сообщения.
@@ -252,14 +431,32 @@
### Выбор кодека для сжатия сообщений {#codec}
-По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.
-
-При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
+Подробнее о [сжатии данных в топиках](../../concepts/topic#message-codec).
{% list tabs %}
+- C++
+
+ Сжатие, которое используется при отправке сообщений методом `Write`, задаётся при [создании сессии записи](#start-writer) настройками `Codec` и `CompressionLevel`. По умолчанию выбирается кодек GZIP.
+ Пример создания сессии записи без сжатия сообщений:
+
+ ```cpp
+ auto settings = TWriteSessionSettings()
+ // other settings are set here
+ .Codec(ECodec::RAW);
+
+ auto session = topicClient.CreateWriteSession(settings);
+ ```
+
+ Если необходимо в рамках сессии записи отправить сообщение, сжатое другим кодеком, можно использовать метод `WriteEncoded` с указанием кодека и размера расжатого сообщения. Для успешной записи этим способом используемый кодек должен быть разрешён в настройках топика.
+
+
- Go
+ По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.
+
+ При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
+
```go
producerAndGroupID := "group-id"
writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
@@ -270,6 +467,10 @@
- Python
+ По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками.
+
+ При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
+
```python
writer = driver.topic_client.writer(topic_path,
codec=ydb.TopicCodec.GZIP,
@@ -282,12 +483,28 @@
### Подключение к топику для чтения сообщений {#start-reader}
-Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код:
-
{% list tabs %}
+- C++
+
+ Подключение для чтения из одного или нескольких топиков представлено объектом сессии чтения с интерфейсом `IReadSession`. Настройки сессии чтения представлены структурой `TReadSessionSettings`.
+
+ Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L1344).
+
+ Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код:
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .ConsumerName("my-consumer")
+ .AppendTopics("my-topic");
+
+ auto session = topicClient.CreateReadSession(settings);
+ ```
+
- Go
+ Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код:
+
```go
reader, err := db.Topic().StartReader("my-consumer", topicoptions.ReadTopic("my-topic"))
if err != nil {
@@ -297,6 +514,8 @@
- Python
+ Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код:
+
```python
reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name")
```
@@ -307,6 +526,20 @@
{% list tabs %}
+- C++
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .ConsumerName("my-consumer")
+ .AppendTopics("my-topic")
+ .AppendTopics(
+ TTopicReadSettings("my-specific-topic")
+ .ReadFromTimestamp(someTimestamp)
+ );
+
+ auto session = topicClient.CreateReadSession(settings);
+ ```
+
- Go
```go
@@ -339,14 +572,33 @@
Информацию о том, какие сообщения уже обработаны, можно [сохранять на клиентской стороне](#client-commit), передавая на сервер стартовую позицию чтения при создании подключения. При этом позиция чтения сообщений на сервере не изменяется.
-SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
+{% list tabs %}
+
+- C++
+
+ Работа пользователя с объектом `IReadSession` в общем устроена как обработка цикла событий со следующими типами событий: `TDataReceivedEvent`, `TCommitOffsetAcknowledgementEvent`, `TStartPartitionSessionEvent`, `TStopPartitionSessionEvent`, `TPartitionSessionStatusEvent`, `TPartitionSessionClosedEvent` и `TSessionClosedEvent`.
+
+ Для каждого из типов событий можно установить обработчик этого события, а также можно установить общий обработчик. Обработчики устанавливаются в настройках сессии записи перед её созданием.
+
+ Если обработчик для некоторого события не установлен, его необходимо получить и обработать в методах `GetEvent` / `GetEvents`. Для неблокирующего ожидания очередного события есть метод `WaitEvent` с сигнатурой `TFuture<void>()`.
+
+- Go
+
+ SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
+
+{% endlist %}
+
### Чтение без подтверждения обработки сообщений {#no-commit}
-Чтобы читать сообщения по одному, используйте следующий код:
+#### Чтение сообщений по одному
{% list tabs %}
+- C++
+
+ Чтение сообщений по одному в C++ SDK не предусмотрено. Событие `TDataReceivedEvent` содержит пакет сообщений.
+
- Go
```go
@@ -371,10 +623,31 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-Чтобы прочитать пакет сообщений, используйте следующий код:
+#### Чтение сообщений пакетом
{% list tabs %}
+- C++
+
+ При установке сессии чтения с настройкой `SimpleDataHandlers` достаточно передать обработчик для сообщений с данными. SDK будет вызывать этот обработчик на каждый принятый от сервера пакет сообщений. Подтверждения чтения по умолчанию отправляться не будут.
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .EventHandlers_.SimpleDataHandlers(
+ [](TReadSessionEvent::TDataReceivedEvent& event) {
+ std::cout << "Get data event " << DebugString(event);
+ }
+ );
+
+ auto session = topicClient.CreateReadSession(settings);
+
+ // Wait SessionClosed event.
+ ReadSession->GetEvent(/* block = */true);
+ ```
+
+ В этом примере после создания сессии основной поток дожидается завершения сессии со стороны сервера в методе `GetEvent`, другие типы событий приходить не будут.
+
+
- Go
```go
@@ -405,10 +678,14 @@ SDK получает данные с сервера партиями и буфе
Например с сервера пришли сообщения 1, 2, 3. Программа обрабатывает их параллельно и отправляет подтверждения в таком порядке: 1, 3, 2. В этом случае сначала будет закоммичено сообщение 1, а сообщения 2 и 3 будут закоммичены только после того как сервер получит подтверждение об обработке сообщения 2.
-Чтобы подтверждать обработку сообщений по одному, используйте следующий код:
+#### Чтение сообщений по одному с подтверждением
{% list tabs %}
+- C++
+
+ Чтение сообщений по одному в C++ SDK не предусмотрено. Событие `TDataReceivedEvent` содержит пакет сообщений.
+
- Go
```go
@@ -435,10 +712,29 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-Для подтверждения обработки пакета сообщений используйте следующий код:
+#### Чтение сообщений пакетом с подтверждением
{% list tabs %}
+- C++
+
+ Аналогично [примеру выше](#no-commit), при установке сессии чтения с настройкой `SimpleDataHandlers` достаточно передать обработчик для сообщений с данными. SDK будет вызывать этот обработчик на каждый принятый от сервера пакет сообщений. Передача параметра `commitDataAfterProcessing = true` означает, что SDK будет отправлять на сервер подтверждения чтения всех сообщений после выполнения обработчика.
+
+ ```cpp
+ auto settings = TReadSessionSettings()
+ .EventHandlers_.SimpleDataHandlers(
+ [](TReadSessionEvent::TDataReceivedEvent& event) {
+ std::cout << "Get data event " << DebugString(event);
+ }
+ , /* commitDataAfterProcessing = */true
+ );
+
+ auto session = topicClient.CreateReadSession(settings);
+
+ // Wait SessionClosed event.
+ ReadSession->GetEvent(/* block = */true);
+ ```
+
- Go
```go
@@ -465,12 +761,18 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-#### Чтение с хранением позиции на клиентской стороне {#client-commit}
+### Чтение с хранением позиции на клиентской стороне {#client-commit}
При начале чтения клиентский код должен сообщить серверу стартовую позицию чтения:
{% list tabs %}
+- C++
+
+ Чтение с заданной позиции в текущей версии SDK отсутствует.
+
+ Поддерживается настройка `ReadFromTimestamp` для чтения событий с отметками времени записи не меньше данной.
+
- Go
```go
@@ -531,6 +833,21 @@ SDK получает данные с сервера партиями и буфе
{% list tabs %}
+- C++
+
+ Мягкое прерывание приходит в виде события `TStopPartitionSessionEvent` с методом `Confirm`. Клиент может завершить обработку сообщений и отправить подтверждение на сервер.
+
+ Фрагмент цикла событий может выглядеть так:
+
+ ```cpp
+ auto event = ReadSession->GetEvent(/*block=*/true);
+ if (auto* stopPartitionSessionEvent = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
+ stopPartitionSessionEvent->Confirm();
+ } else {
+ // other event types
+ }
+ ```
+
- Go
Клиентский код сразу получает все имеющиеся в буфере (на стороне SDK) сообщения, даже если их не достаточно для формирования пакета при групповой обработке.
@@ -565,6 +882,23 @@ SDK получает данные с сервера партиями и буфе
{% list tabs %}
+- C++
+
+ Жёсткое прерывание приходит в виде события `TPartitionSessionClosedEvent` либо в ответ на подтверждение мягкого прерывания, либо при потере соединения с партицией. Узнать причину можно, вызвав метод `GetReason`.
+
+ Фрагмент цикла событий может выглядеть так:
+
+ ```cpp
+ auto event = ReadSession->GetEvent(/*block=*/true);
+ if (auto* partitionSessionClosedEvent = std::get_if<TReadSessionEvent::TPartitionSessionClosedEvent>(&*event)) {
+ if (partitionSessionClosedEvent->GetReason() == TPartitionSessionClosedEvent::EReason::ConnectionLost) {
+ std::cout << "Connection with partition was lost" << std::endl;
+ }
+ } else {
+ // other event types
+ }
+ ```
+
- Go
При прерывании чтения контекст сообщения или пакета сообщений будет отменен.
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 6e9874cd6d5..b548b70e853 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -1173,9 +1173,8 @@ struct TWriteSessionEvent {
};
struct TAcksEvent {
- //! Acks could be batched from several WriteBatch/Write requests.
- //! Acks for messages from one WriteBatch request could be emitted as several TAcksEvents -
- //! they are provided to client as soon as possible.
+ //! Acks could be batched from several Write requests.
+ //! They are provided to client as soon as possible.
TVector<TWriteAck> Acks;
TString DebugString() const;