aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg Ovcharuk <vgvoleg@gmail.com>2025-03-31 17:00:21 +0300
committerGitHub <noreply@github.com>2025-03-31 18:00:21 +0400
commitfc6fa191edd54417e79c4b42d3a40db36c7cabd6 (patch)
treeeca634fca60b95598088faee8e417ab03ef967b8
parent093ff06a3606246507a47d3f876fdea1b6fc4171 (diff)
downloadydb-fc6fa191edd54417e79c4b42d3a40db36c7cabd6.tar.gz
Python SDK topic-transactions docs (#16203)
Co-authored-by: Ivan Blinkov <ivan@ydb.tech>
-rw-r--r--ydb/docs/en/core/reference/ydb-sdk/topic.md104
-rw-r--r--ydb/docs/ru/core/reference/ydb-sdk/topic.md114
2 files changed, 193 insertions, 25 deletions
diff --git a/ydb/docs/en/core/reference/ydb-sdk/topic.md b/ydb/docs/en/core/reference/ydb-sdk/topic.md
index ea0136553ff..3bc677f89a6 100644
--- a/ydb/docs/en/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/en/core/reference/ydb-sdk/topic.md
@@ -127,13 +127,13 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
{
ProducerId = "ProducerId_Example"
}.Build();
-
+
await using var reader = new ReaderBuilder<string>(driver)
{
ConsumerName = "Consumer_Example",
SubscribeSettings = { new SubscribeSettings(topicName) }
}.Build();
- ```
+ ```
{% endlist %}
@@ -205,7 +205,7 @@ The topic path is mandatory. Other parameters are optional.
.build())
.build());
```
-
+
- С#
Example of creating a topic with a list of supported codecs and a minimum number of partitions:
@@ -788,7 +788,7 @@ Only connections with matching [producer and message group](../../concepts/topic
```c#
var writeCts = new CancellationTokenSource();
writeCts.CancelAfter(TimeSpan.FromSeconds(3));
-
+
await writer.WriteAsync("Hello, Example YDB Topics!", writeCts.Token);
```
@@ -1005,6 +1005,54 @@ All the metadata provided when writing a message is sent to a consumer with the
})
```
+- Python
+
+ To write to a topic within a transaction, create a transactional writer by calling `topic_client.tx_writer` with the `tx` argument. Once created, you can send messages as usual. There's no need to close the transactional writer manually, as it will be closed automatically when the transaction ends.
+
+ In the example below, there is no explicit call to `tx.commit()`; it occurs implicitly upon the successful execution of the `callee` lambda.
+
+ [Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_example.py)
+
+ ```python
+ with ydb.QuerySessionPool(driver) as session_pool:
+
+ def callee(tx: ydb.QueryTxContext):
+ tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
+
+ for i in range(message_count):
+ result_stream = tx.execute(query=f"select {i} as res;")
+ for result_set in result_stream:
+ message = str(result_set.rows[0]["res"])
+ tx_writer.write(ydb.TopicWriterMessage(message))
+ print(f"Message {message} was written with tx.")
+
+ session_pool.retry_tx_sync(callee)
+ ```
+
+- Python (asyncio)
+
+ To write to a topic within a transaction, create a transactional writer by calling `topic_client.tx_writer` with the `tx` argument. Once created, you can send messages as usual. There's no need to close the transactional writer manually, as it will be closed automatically when the transaction ends.
+
+ In the example below, there is no explicit call to `tx.commit()`; it occurs implicitly upon the successful execution of the `callee` lambda.
+
+ [Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_async_example.py)
+
+ ```python
+ async with ydb.aio.QuerySessionPool(driver) as session_pool:
+
+ async def callee(tx: ydb.aio.QueryTxContext):
+ tx_writer: ydb.TopicTxWriterAsyncIO = driver.topic_client.tx_writer(tx, topic)
+
+ for i in range(message_count):
+ async with await tx.execute(query=f"select {i} as res;") as result_stream:
+ async for result_set in result_stream:
+ message = str(result_set.rows[0]["res"])
+ await tx_writer.write(ydb.TopicWriterMessage(message))
+ print(f"Message {result_set.rows[0]['res']} was written with tx.")
+
+ await session_pool.retry_tx_async(callee)
+ ```
+
- Java (sync)
[Example on GitHub](https://github.com/ydb-platform/ydb-java-examples/blob/develop/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java)
@@ -1285,7 +1333,7 @@ Topic can have several Consumers and for each of them server stores its own read
{
ConsumerName = "Consumer_Example",
SubscribeSettings = { new SubscribeSettings(topicName) }
- }.Build();
+ }.Build();
```
{% endlist %}
@@ -1360,7 +1408,7 @@ To establish a connection to the `my-topic` and `my-specific-topic` topics using
}
}.Build();
```
-
+
{% endlist %}
### Reading messages {#reading-messages}
@@ -1465,7 +1513,7 @@ Data from topics can be read in the context of [transactions](#read-tx). In this
{
}
```
-
+
{% endlist %}
#### Reading message batches
@@ -1544,7 +1592,7 @@ Data from topics can be read in the context of [transactions](#read-tx). In this
foreach (var message in batchMessages.Batch)
{
- logger.LogInformation("Received message: [{MessageData}]", message.Data);
+ logger.LogInformation("Received message: [{MessageData}]", message.Data);
}
}
}
@@ -1644,7 +1692,7 @@ If a commit fails with an error, the application should log it and continue; it
{
}
```
-
+
{% endlist %}
#### Reading message batches with commits
@@ -1736,7 +1784,7 @@ If a commit fails with an error, the application should log it and continue; it
foreach (var message in batchMessages.Batch)
{
- logger.LogInformation("Received message: [{MessageData}]", message.Data);
+ logger.LogInformation("Received message: [{MessageData}]", message.Data);
}
try
@@ -1963,6 +2011,42 @@ Reading progress is usually saved on a server for each Consumer. However, such p
}
```
+- Python
+
+ To read messages from a topic within a transaction, use the `reader.receive_batch_with_tx` method. It reads a batch of messages and adds their commit to the transaction, so there's no need to commit them separately. The reader can be reused across different transactions. However, it's essential to commit transactions in the same order as the messages are read from the reader, as message commits in the topic must be performed strictly in order - otherwise transaction will get an error during commit. The simplest way to ensure this is by using the reader within a loop.
+
+ [Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_example.py)
+
+ ```python
+ with driver.topic_client.reader(topic, consumer) as reader:
+ with ydb.QuerySessionPool(driver) as session_pool:
+ for _ in range(message_count):
+
+ def callee(tx: ydb.QueryTxContext):
+ batch = reader.receive_batch_with_tx(tx, max_messages=1)
+ print(f"Message {batch.messages[0].data.decode()} was read with tx.")
+
+ session_pool.retry_tx_sync(callee)
+ ```
+
+- Python (asyncio)
+
+ To read messages from a topic within a transaction, use the `reader.receive_batch_with_tx` method. It reads a batch of messages and adds their commit to the transaction, so there's no need to commit them separately. The reader can be reused across different transactions. However, it's essential to commit transactions in the same order as the messages are read from the reader, as message commits in the topic must be performed strictly in order - otherwise transaction will get an error during commit. The simplest way to ensure this is by using the reader within a loop.
+
+ [Example on GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_async_example.py)
+
+ ```python
+ async with driver.topic_client.reader(topic, consumer) as reader:
+ async with ydb.aio.QuerySessionPool(driver) as session_pool:
+ for _ in range(message_count):
+
+ async def callee(tx: ydb.aio.QueryTxContext):
+ batch = await reader.receive_batch_with_tx(tx, max_messages=1)
+ print(f"Message {batch.messages[0].data.decode()} was read with tx.")
+
+ await session_pool.retry_tx_async(callee)
+ ```
+
- Java (sync)
[Example on GitHub](https://github.com/ydb-platform/ydb-java-examples/blob/develop/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java)
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
index 5c4025d82fc..753d38630fd 100644
--- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
@@ -25,7 +25,7 @@
[Примеры на GitHub](https://github.com/ydb-platform/ydb-python-sdk/tree/main/examples/topic)
- C#
-
+
[Примеры на GitHub](https://github.com/ydb-platform/ydb-dotnet-sdk/tree/main/examples/src/Topic)
@@ -117,7 +117,7 @@
loggerFactory: loggerFactory
);
```
-
+
В этом примере используется анонимная аутентификация. Подробнее про [соединение с базой данных](../../concepts/connect.md) и [аутентификацию](../../security/authentication.md).
Фрагмент кода приложения для создания различных клиентов к топикам:
@@ -129,7 +129,7 @@
{
ProducerId = "ProducerId_Example"
}.Build();
-
+
await using var reader = new ReaderBuilder<string>(driver)
{
ConsumerName = "Consumer_Example",
@@ -209,7 +209,7 @@
.build())
.build());
```
-
+
- C#
Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций:
@@ -385,7 +385,7 @@
```java
topicClient.dropTopic(topicPath);
```
-
+
- C#
```c#
@@ -503,7 +503,7 @@
return null;
});
```
-
+
- C#
```c#
@@ -770,7 +770,7 @@
}
});
```
-
+
- С#
Асинхронная запись сообщения в топик. В случае переполнения внутреннего буфера будет ожидать, когда буфер освободится для повторной отправки.
@@ -784,7 +784,7 @@
```c#
var writeCts = new CancellationTokenSource();
writeCts.CancelAfter(TimeSpan.FromSeconds(3));
-
+
await writer.WriteAsync("Hello, Example YDB Topics!", writeCts.Token);
```
@@ -1002,6 +1002,54 @@
})
```
+- Python
+
+ Для записи в топик в транзакции необходимо создать транзакционного писателя через вызов `topic_client.tx_writer`. После этого можно отправлять сообщения, как обычно. Закрывать транзакционного писателя не требуется — это происходит автоматически при завершении транзакции.
+
+ В примере ниже нет явного вызова `tx.commit()` — он происходит неявно при успешном завершении лямбды `callee`.
+
+ [Пример на GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_example.py)
+
+ ```python
+ with ydb.QuerySessionPool(driver) as session_pool:
+
+ def callee(tx: ydb.QueryTxContext):
+ tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
+
+ for i in range(message_count):
+ result_stream = tx.execute(query=f"select {i} as res;")
+ for result_set in result_stream:
+ message = str(result_set.rows[0]["res"])
+ tx_writer.write(ydb.TopicWriterMessage(message))
+ print(f"Message {message} was written with tx.")
+
+ session_pool.retry_tx_sync(callee)
+ ```
+
+- Python (asyncio)
+
+ Для записи в топик в транзакции необходимо создать транзакционного писателя через вызов `topic_client.tx_writer`. После этого можно отправлять сообщения, как обычно. Закрывать транзакционного писателя не требуется — это происходит автоматически при завершении транзакции.
+
+ В примере ниже нет явного вызова `tx.commit()` — он происходит неявно при успешном завершении лямбды `callee`.
+
+ [Пример на GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_async_example.py)
+
+ ```python
+ async with ydb.aio.QuerySessionPool(driver) as session_pool:
+
+ async def callee(tx: ydb.aio.QueryTxContext):
+ tx_writer: ydb.TopicTxWriterAsyncIO = driver.topic_client.tx_writer(tx, topic)
+
+ for i in range(message_count):
+ async with await tx.execute(query=f"select {i} as res;") as result_stream:
+ async for result_set in result_stream:
+ message = str(result_set.rows[0]["res"])
+ await tx_writer.write(ydb.TopicWriterMessage(message))
+ print(f"Message {result_set.rows[0]['res']} was written with tx.")
+
+ await session_pool.retry_tx_async(callee)
+ ```
+
- Java (sync)
[Пример на GitHub](https://github.com/ydb-platform/ydb-java-examples/blob/develop/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionWriteSync.java)
@@ -1281,7 +1329,7 @@
{
ConsumerName = "Consumer_Example",
SubscribeSettings = { new SubscribeSettings(topicName) }
- }.Build();
+ }.Build();
```
{% endlist %}
@@ -1343,7 +1391,7 @@
.build())
.build();
```
-
+
- C#
```c#
@@ -1530,7 +1578,7 @@
}
}
```
-
+
- C#
```c#
@@ -1542,7 +1590,7 @@
foreach (var message in batchMessages.Batch)
{
- logger.LogInformation("Received message: [{MessageData}]", message.Data);
+ logger.LogInformation("Received message: [{MessageData}]", message.Data);
}
}
}
@@ -1620,7 +1668,7 @@
}
});
```
-
+
- C#
```c#
@@ -1726,7 +1774,7 @@
});
}
```
-
+
- С#
```c#
@@ -1738,7 +1786,7 @@
foreach (var message in batchMessages.Batch)
{
- logger.LogInformation("Received message: [{MessageData}]", message.Data);
+ logger.LogInformation("Received message: [{MessageData}]", message.Data);
}
try
@@ -1965,6 +2013,42 @@
}
```
+- Python
+
+ Для чтения сообщений в рамках транзакции следует использовать метод `reader.receive_batch_with_tx`. Он прочитает пакет сообщений и добавит их коммит в транзакцию, при этом отдельно коммитить эти сообщения не требуется. Читателя сообщений можно использовать повторно в разных транзакциях. При этом важно, чтобы порядок коммита транзакций соответствовал порядку получения сообщений от читателя, так как коммиты сообщений в топике должны выполняться строго по порядку - в противном случае транзакция получит ошибку на попытке сделать коммит. Проще всего это сделать, если использовать читателя в цикле.
+
+ [Пример на GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_example.py)
+
+ ```python
+ with driver.topic_client.reader(topic, consumer) as reader:
+ with ydb.QuerySessionPool(driver) as session_pool:
+ for _ in range(message_count):
+
+ def callee(tx: ydb.QueryTxContext):
+ batch = reader.receive_batch_with_tx(tx, max_messages=1)
+ print(f"Message {batch.messages[0].data.decode()} was read with tx.")
+
+ session_pool.retry_tx_sync(callee)
+ ```
+
+- Python (asyncio)
+
+ Для чтения сообщений в рамках транзакции следует использовать метод `reader.receive_batch_with_tx`. Он прочитает пакет сообщений и добавит их коммит в транзакцию, при этом отдельно коммитить эти сообщения не требуется. Читателя сообщений можно использовать повторно в разных транзакциях. При этом важно, чтобы порядок коммита транзакций соответствовал порядку получения сообщений от читателя, так как коммиты сообщений в топике должны выполняться строго по порядку - в противном случае транзакция получит ошибку на попытке сделать коммит. Проще всего это сделать, если использовать читателя в цикле.
+
+ [Пример на GitHub](https://github.com/ydb-platform/ydb-python-sdk/blob/main/examples/topic/topic_transactions_async_example.py)
+
+ ```python
+ async with driver.topic_client.reader(topic, consumer) as reader:
+ async with ydb.aio.QuerySessionPool(driver) as session_pool:
+ for _ in range(message_count):
+
+ async def callee(tx: ydb.aio.QueryTxContext):
+ batch = await reader.receive_batch_with_tx(tx, max_messages=1)
+ print(f"Message {batch.messages[0].data.decode()} was read with tx.")
+
+ await session_pool.retry_tx_async(callee)
+ ```
+
- Java (sync)
[Пример на GitHub](https://github.com/ydb-platform/ydb-java-examples/blob/develop/ydb-cookbook/src/main/java/tech/ydb/examples/topic/transactions/TransactionReadSync.java)