aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorpnv1 <pnv@ydb.tech>2023-08-28 13:40:26 +0300
committerpnv1 <pnv@ydb.tech>2023-08-28 14:27:21 +0300
commitab2b4b0becce6d749b863ef081c709c777682ce8 (patch)
treeba31019e1be5d9982b91c87e66564c4e0bdc6e4e
parentdd4f18892ffda75a087a89e4ccb0957909fba749 (diff)
downloadydb-ab2b4b0becce6d749b863ef081c709c777682ce8.tar.gz
Add Java Topic SDK documentation
Ссылка на нужную страницу: https://135bfb8ae76679fa6333f4a8c77068a5e9a6481e.testing.docs.yandex-team.ru/ydb-oss/reference/ydb-sdk/topic Add Java Topic docs EN Add Java Topic docs RU
-rw-r--r--ydb/docs/en/core/reference/ydb-sdk/topic.md505
-rw-r--r--ydb/docs/ru/core/reference/ydb-sdk/topic.md506
2 files changed, 996 insertions, 15 deletions
diff --git a/ydb/docs/en/core/reference/ydb-sdk/topic.md b/ydb/docs/en/core/reference/ydb-sdk/topic.md
index 34e771ef461..5f93e940cdb 100644
--- a/ydb/docs/en/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/en/core/reference/ydb-sdk/topic.md
@@ -5,6 +5,17 @@ This article provides examples of how to use the {{ ydb-short-name }} SDK to wor
Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and [add a consumer](../ydb-cli/topic-consumer-add.md).
+## Topic usage examples
+
+{% list tabs %}
+
+- Java
+
+ [Examples on GitHub](https://github.com/ydb-platform/ydb-java-examples/tree/master/ydb-cookbook/src/main/java/tech/ydb/examples/topic)
+
+
+{% endlist %}
+
## Initializing a connection {#init}
{% list tabs %}
@@ -27,7 +38,7 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
TDriver driver(driverConfig);
```
- This example uses authentication token from the `YDB_TOKEN` environment variable. For details see [Connecting to a database](../../concepts/connect.md) и [Authentication](../../concepts/auth.md) pages.
+ This example uses authentication token from the `YDB_TOKEN` environment variable. For details see [Connecting to a database](../../concepts/connect.md) and [Authentication](../../concepts/auth.md) pages.
App code snippet for creating a client:
@@ -35,6 +46,37 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
TTopicClient topicClient(driver);
```
+- Java
+
+ To interact with YDB Topics, create an instance of the YDB transport and topic client.
+
+ The YDB transport lets the app and YDB interact at the transport layer. The transport must exist during the YDB access lifecycle and be initialized before creating a client.
+
+ App code snippet for transport initialization:
+ ```java
+ try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
+ .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
+ .build()) {
+ // Use YDB transport
+ }
+ ```
+ In this example `CloudAuthHelper.getAuthProviderFromEnviron()` helper method is used which retrieves auth token from environment variables.
+ For example, `YDB_ACCESS_TOKEN_CREDENTIALS`.
+ For details see [Connecting to a database](../../concepts/connect.md) and [Authentication](../../concepts/auth.md) pages.
+
+ Topic client ([source code](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/TopicClient.java#L34)) uses YDB transport and handles all topics topic operations, manages read and write sessions.
+
+ App code snippet for creating a client:
+ ```java
+ try (TopicClient topicClient = TopicClient.newClient(transport)
+ .setCompressionExecutor(compressionExecutor)
+ .build()) {
+ // Use topic client
+ }
+ ```
+ Both provided examples use ([try-with-resources](https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html)) block.
+ It allows to automatically close client and transport on leaving this block, considering both classes extends `AutoCloseable`.
+
{% endlist %}
## Managing topics {#manage}
@@ -43,9 +85,9 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
{% list tabs %}
-- C++
+The topic path is mandatory. Other parameters are optional.
- The topic path is mandatory. Other parameters are optional.
+- C++
For a full list of supported parameters, see the [source code](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L394).
@@ -62,8 +104,6 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
```
- Go
- The topic path is mandatory. Other parameters are optional.
-
For a full list of supported parameters, see the [SDK documentation](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#CreateOption).
Example of creating a topic with a list of supported codecs and a minimum number of partitions:
@@ -89,6 +129,24 @@ Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and
)
```
+- Java
+
+ For a full list of supported parameters, see the [source code](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java#L97).
+
+ ```java
+ topicClient.createTopic(topicPath, CreateTopicSettings.newBuilder()
+ // Optional
+ .setSupportedCodecs(SupportedCodecs.newBuilder()
+ .addCodec(Codec.RAW)
+ .addCodec(Codec.GZIP)
+ .build())
+ // Optional
+ .setPartitioningSettings(PartitioningSettings.newBuilder()
+ .setMinActivePartitions(3)
+ .build())
+ .build());
+ ```
+
{% endlist %}
### Updating a topic {#alter-topic}
@@ -134,6 +192,22 @@ When you update a topic, you must specify the topic path and the parameters to b
This feature is under development.
+- Java
+
+ For a full list of supported parameters, see the [source code](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java#L23).
+
+ ```java
+ topicClient.alterTopic(topicPath, AlterTopicSettings.newBuilder()
+ .addAddConsumer(Consumer.newBuilder()
+ .setName("new-consumer")
+ .setSupportedCodecs(SupportedCodecs.newBuilder()
+ .addCodec(Codec.RAW)
+ .addCodec(Codec.GZIP)
+ .build())
+ .build())
+ .build());
+ ```
+
{% endlist %}
### Getting topic information {#describe-topic}
@@ -176,6 +250,18 @@ When you update a topic, you must specify the topic path and the parameters to b
print(info)
```
+- Java
+
+ Use `describeTopic` method to get information about topic.
+
+ For a full list of description fields, see the [source code](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/description/TopicDescription.java#L19).
+
+ ```java
+ Result<TopicDescription> topicDescriptionResult = topicClient.describeTopic(topicPath)
+ .join();
+ TopicDescription description = topicDescriptionResult.getValue();
+ ```
+
{% endlist %}
### Deleting a topic {#drop-topic}
@@ -202,6 +288,12 @@ To delete a topic, just specify the path to it.
driver.topic_client.drop_topic(topic_path)
```
+- Java
+
+ ```java
+ topicClient.dropTopic(topicPath);
+ ```
+
{% endlist %}
## Message writes {#write}
@@ -248,9 +340,68 @@ Only connections with matching [producer and message group](../../concepts/topic
writer = driver.topic_client.writer(topic_path)
```
+- Java (sync)
+
+ Writer settings initialization:
+ ```java
+ String producerAndGroupID = "group-id";
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(topicPath)
+ .setProducerId(producerAndGroupID)
+ .setMessageGroupId(producerAndGroupID)
+ .build();
+ ```
+
+ Sync writer creation:
+ ```java
+ SyncWriter writer = topicClient.createSyncWriter(settings);
+ ```
+
+ Writer should be initialized after it is created. There are two methods to do that:
+ - `init()`: non-blocking, launches initialization in background and doesn't wait for it to finish.
+ ```java
+ writer.init();
+ ```
+ - `initAndWait()`: blocking, launches initialization and waits for it to finish.
+ If an error occurs during this process, exception will be thrown.
+ ```java
+ try {
+ writer.initAndWait();
+ logger.info("Init finished succsessfully");
+ } catch (Exception exception) {
+ logger.error("Exception while initializing writer: ", exception);
+ return;
+ }
+ ```
+
+- Java (async)
+
+ Writer settings initialization:
+ ```java
+ String producerAndGroupID = "group-id";
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(topicPath)
+ .setProducerId(producerAndGroupID)
+ .setMessageGroupId(producerAndGroupID)
+ .build();
+ ```
+
+ Async writer creation and initialization:
+ ```java
+ AsyncWriter writer = topicClient.createAsyncWriter(settings);
+
+ // Init in background
+ writer.init()
+ .thenRun(() -> logger.info("Init finished successfully"))
+ .exceptionally(ex -> {
+ logger.error("Init failed with ex: ", ex);
+ return null;
+ });
+ ```
+
{% endlist %}
-### Asynchronous message writes {#async-write}
+### Writing messages {#writing-messages}
{% list tabs %}
@@ -339,6 +490,53 @@ Only connections with matching [producer and message group](../../concepts/topic
```
+- Java (sync)
+
+ Method `send` blocks until a message is put into writers sending queue.
+ Putting a message into this queue means that the writer will do its best to deliver it.
+ For example, if a writing session will be accidentally closed, the writer will reconnect and try to resend this message on a new session.
+ But putting a message into message queue has no guarantees that this message will be written.
+ For example, there could be errors that will lead to writer shutdown before messages from the queue are sent.
+ If you have to be sure for each message that it is written, use async writer and check status returned by `send` method.
+
+ ```java
+ writer.send(Message.of("11".getBytes()));
+
+ long timeoutSeconds = 5; // How long should we wait for a message to be put into sending buffer
+ try {
+ writer.send(
+ Message.newBuilder()
+ .setData("22".getBytes())
+ .setCreateTimestamp(Instant.now().minusSeconds(5))
+ .build(),
+ timeoutSeconds,
+ TimeUnit.SECONDS
+ );
+ } catch (TimeoutException exception) {
+ logger.error("Send queue is full. Couldn't put message into sending queue within {} seconds", timeoutSeconds);
+ } catch (InterruptedException | ExecutionException exception) {
+ logger.error("Couldn't put the message into sending queue due to exception: ", exception);
+ }
+ ```
+
+- Java (async)
+
+ Method `send` puts a message into writer's sending queue.
+ Method returns `CompletableFuture<WriteAck>` which allows checking if the message was really written.
+ In case if the queue is full, `QueueOverflowException` exception will be thrown.
+ It is a way to signal a user that writing speed should be slowed down.
+ In this case a message write should be skipped or retried with exponential backoff.
+ Client buffer size can be also increased (`setMaxSendBufferMemorySize`) to be able to store more messages in memory before this exception is thrown.
+
+ ```java
+ try {
+ // Non-blocking. Throws QueueOverflowException if send queue is full
+ writer.send(Message.of("33".getBytes()));
+ } catch (QueueOverflowException exception) {
+ // Send queue is full. Need to retry with backoff or skip
+ }
+ ```
+
{% endlist %}
### Message writes with storage confirmation on the server
@@ -422,6 +620,48 @@ Only connections with matching [producer and message group](../../concepts/topic
```
+- Java (sync)
+
+ Blocking method `flush()` waits until all the messages previously written to the internal buffer are acknowledged:
+
+ ```java
+ for (byte[] message : messages) {
+ writer.send(Message.of(message));
+ }
+ writer.flush();
+ ```
+
+- Java (async)
+
+ `send` method returns `CompletableFuture<WriteAck>`.
+ Its successful completion means that the fact that this message is written is confirmed by server.
+ `WriteAck` struct contains seqNo, offset and write status:
+
+ ```java
+ writer.send(Message.of(message))
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ logger.error("Exception on writing message message: ", ex);
+ } else {
+ switch (result.getState()) {
+ case WRITTEN:
+ WriteAck.Details details = result.getDetails();
+ StringBuilder str = new StringBuilder("Message was written successfully");
+ if (details != null) {
+ str.append(", offset: ").append(details.getOffset());
+ }
+ logger.debug(str.toString());
+ break;
+ case ALREADY_WRITTEN:
+ logger.warn("Message was already written");
+ break;
+ default:
+ break;
+ }
+ }
+ });
+ ```
+
{% endlist %}
### Selecting a codec for message compression {#codec}
@@ -473,12 +713,28 @@ For more details on using data compression for topics, see [here](../../concepts
)
```
+- Java
+
+ ```java
+ String producerAndGroupID = "group-id";
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(topicPath)
+ .setProducerId(producerAndGroupID)
+ .setMessageGroupId(producerAndGroupID)
+ .setCodec(Codec.ZSTD)
+ .build();
+ ```
+
{% endlist %}
## Reading messages {#reading}
### Connecting to a topic for message reads {#start-reader}
+To be able to read messages from topic, a Consumer on this topic should exist.
+A Consumer can be created on [creating](#create-topic) or [altering](#alter-topic) a topic.
+Topic can have several Consumers and for each of them server stores its own reading progress.
+
{% list tabs %}
- C++
@@ -516,6 +772,96 @@ For more details on using data compression for topics, see [here](../../concepts
reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name")
```
+- Java (sync)
+
+ Reader settings initialization:
+ ```java
+ ReaderSettings settings = ReaderSettings.newBuilder()
+ .setConsumerName(consumerName)
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(topicPath)
+ .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
+ .setMaxLag(Duration.ofMinutes(30)) // Optional
+ .build())
+ .build();
+ ```
+
+ Sync reader creation:
+ ```java
+ SyncReader reader = topicClient.createSyncReader(settings);
+ ```
+
+ After a reader is created, it has to be initialized. Sync reader has two methods for this:
+ - `init()`: non-blocking, launches initialization in background and does not wait for it to finish.
+ ```java
+ reader.init();
+ ```
+ - `initAndWait()`: blocking, launches initialization and waits for it to finish.
+ If an error occurs during this process, exception will be thrown.
+ ```java
+ try {
+ reader.initAndWait();
+ logger.info("Init finished succsessfully");
+ } catch (Exception exception) {
+ logger.error("Exception while initializing reader: ", exception);
+ return;
+ }
+ ```
+
+- Java (async)
+
+ Reader settings initialization:
+ ```java
+ ReaderSettings settings = ReaderSettings.newBuilder()
+ .setConsumerName(consumerName)
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(topicPath)
+ .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
+ .setMaxLag(Duration.ofMinutes(30)) // Optional
+ .build())
+ .build();
+ ```
+
+ For async reader, `ReadEventHandlersSettings` also have to be provided with an implementation of `ReadEventHandler`.
+ It describes how events should be handled during reading.
+ ```java
+ ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder()
+ .setEventHandler(new Handler())
+ .build();
+ ```
+ Optionally, an executor for message handling can be also provided in `ReadEventHandlersSettings`.
+ To implement a Handler, default abstract class `AbstractReadEventHandler` can be used.
+ It is enough to override `onMessages` method that describes message handling. Implementation example:
+ ```java
+ private class Handler extends AbstractReadEventHandler {
+ @Override
+ public void onMessages(DataReceivedEvent event) {
+ for (Message message : event.getMessages()) {
+ StringBuilder str = new StringBuilder();
+ logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
+
+ process(message);
+
+ message.commit().thenRun(() -> {
+ logger.info("Message committed");
+ });
+ }
+ }
+ }
+ ```
+
+ Async reader creation and initialization:
+ ```java
+ AsyncReader reader = topicClient.createAsyncReader(readerSettings, handlerSettings);
+ // Init in background
+ reader.init()
+ .thenRun(() -> logger.info("Init finished successfully"))
+ .exceptionally(ex -> {
+ logger.error("Init failed with ex: ", ex);
+ return null;
+ });
+ ```
+
{% endlist %}
Additional options are used to specify multiple topics and other parameters.
@@ -559,6 +905,22 @@ To establish a connection to the `my-topic` and `my-specific-topic` topics using
This feature is under development.
+- Java
+
+ ```java
+ ReaderSettings settings = ReaderSettings.newBuilder()
+ .setConsumerName(consumerName)
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath("my-topic")
+ .build())
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath("my-specific-topic")
+ .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
+ .setMaxLag(Duration.ofMinutes(30)) // Optional
+ .build())
+ .build();
+ ```
+
{% endlist %}
### Reading messages {#reading-messages}
@@ -583,6 +945,14 @@ Information about which messages have already been processed can be [saved on th
The SDK receives data from the server in batches and buffers it. Depending on the task, the client code can read messages from the buffer one by one or in batches.
+- Python
+
+ The SDK receives data from the server in batches and buffers it. Depending on the task, the client code can read messages from the buffer one by one or in batches.
+
+- Java
+
+ The SDK receives data from the server in batches and buffers it. Depending on the task, the client code can read messages from the buffer one by one or in batches.
+
{% endlist %}
@@ -618,6 +988,21 @@ Reading messages one-by-one is not supported in the C++ SDK. Class `TDataReceive
process(message)
```
+- Java (sync)
+
+ To read messages one-by-one without commit just do not call `commit` method on messages:
+
+ ```java
+ while(true) {
+ Message message = reader.receive();
+ process(message);
+ }
+ ```
+
+- Java (async)
+
+ Reading messages one-by-one is not supported in async Reader.
+
{% endlist %}
#### Reading message batches
@@ -666,6 +1051,25 @@ Reading messages one-by-one is not supported in the C++ SDK. Class `TDataReceive
process(batch)
```
+- Java (sync)
+
+ Reading messages in batches is not supported in sync Reader.
+
+- Java (async)
+
+ To read messages without commit just do not call `commit` method:
+
+ ```java
+ private class Handler extends AbstractReadEventHandler {
+ @Override
+ public void onMessages(DataReceivedEvent event) {
+ for (Message message : event.getMessages()) {
+ process(message);
+ }
+ }
+ }
+ ```
+
{% endlist %}
### Reading with a commit {#commit}
@@ -706,6 +1110,26 @@ Reading messages one-by-one is not supported in the C++ SDK. Class `TDataReceive
reader.commit(message)
```
+- Java
+
+ To commit a message just call `commit` method on it.
+ This method returns `CompletableFuture<Void>` which successful completion means that the server confirmed commit.
+ In case of an error on commit do not retry it. Most likely, an error is caused be session shutdown.
+ The reader (maybe another one) will create a new session for this partition and the message will be read again.
+
+ ```java
+ message.commit()
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ // Read session was probably closed, there is nothing we can do here.
+ // Do not retry this commit on the same message.
+ logger.error("exception while committing message: ", ex);
+ } else {
+ logger.info("message committed successfully");
+ }
+ });
+ ```
+
{% endlist %}
#### Reading message batches with commits
@@ -755,6 +1179,33 @@ Reading messages one-by-one is not supported in the C++ SDK. Class `TDataReceive
reader.commit(batch)
```
+- Java (sync)
+
+ Not relevant due to sync reader only reading messages one by one.
+
+- Java (async)
+
+ In `onMessage` handler whole message batch in `DataReceivedEvent` can be committed:
+
+ ```java
+ @Override
+ public void onMessages(DataReceivedEvent event) {
+ for (Message message : event.getMessages()) {
+ process(message);
+ }
+ event.commit()
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ // Read session was probably closed, there is nothing we can do here.
+ // Do not retry this commit on the same event.
+ logger.error("exception while committing message batch: ", ex);
+ } else {
+ logger.info("message batch committed successfully");
+ }
+ });
+ }
+ ```
+
{% endlist %}
### Reading with consumer offset storage on the client side {#client-commit}
@@ -815,6 +1266,12 @@ When reading starts, the client code must transmit the starting consumer offset
This feature is under development.
+- Java
+
+ Setting the starting offset for reading is not supported in the current state of Java SDK.
+
+ The `setReadFrom` setting is used for reading only messages with write timestamps no less than the given one.
+
{% endlist %}
### Processing a server read interrupt {#stop}
@@ -872,6 +1329,29 @@ In case of a _hard interruption_, the client receives a notification that it is
reader.commit(batch)
```
+- Java (sync)
+
+ Not relevant due to not being possible to change the way of handling such events.
+ Client will automatically respond to server that it is ready to stop.
+
+- Java (async)
+
+ `onStopPartitionSession(StopPartitionSessionEvent event)` handler should be overridden to handle this event:
+
+ ```java
+ @Override
+ public void onStopPartitionSession(StopPartitionSessionEvent event) {
+ logger.info("Partition session {} stopped. Committed offset: {}", event.getPartitionSessionId(),
+ event.getCommittedOffset());
+ // This event means that no more messages will be received by server
+ // Received messages still can be read from ReaderBuffer
+ // Messages still can be committed, until confirm() method is called
+
+ // Confirm that session can be closed
+ event.confirm();
+ }
+ ```
+
{% endlist %}
#### Hard reading interruption {#hard-stop}
@@ -931,4 +1411,17 @@ In case of a _hard interruption_, the client receives a notification that it is
reader.commit(batch)
```
+- Java (sync)
+
+ Not relevant due to not being possible to change the way of handling such events.
+
+- Java (async)
+
+ ```java
+ @Override
+ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
+ logger.info("Partition session {} is closed.", event.getPartitionSession().getPartitionId());
+ }
+ ```
+
{% endlist %}
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
index 69aca92d0aa..4755557d5ac 100644
--- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
@@ -5,6 +5,17 @@
Перед выполнением примеров [создайте топик](../ydb-cli/topic-create.md) и [добавьте читателя](../ydb-cli/topic-consumer-add.md).
+## Примеры работы с топиками
+
+{% list tabs %}
+
+- Java
+
+ [Примеры на GitHub](https://github.com/ydb-platform/ydb-java-examples/tree/master/ydb-cookbook/src/main/java/tech/ydb/examples/topic)
+
+
+{% endlist %}
+
## Инициализация соединения с топиками
{% list tabs %}
@@ -34,6 +45,36 @@
TTopicClient topicClient(driver);
```
+- Java
+
+ Для работы с топиками создаются экземпляры транспорта YDB и клиента.
+
+ Транспорт YDB отвечает за взаимодействие приложения и YDB на транспортном уровне. Он должен существовать на всем протяжении жизненного цикла работы с топиками и должен быть инициализирован перед созданием клиента.
+
+ Фрагмент кода приложения для инициализации транспорта YDB:
+ ```java
+ try (GrpcTransport transport = GrpcTransport.forConnectionString(connString)
+ .withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
+ .build()) {
+ // Use YDB transport
+ }
+ ```
+ В этом примере используется вспомогательный метод `CloudAuthHelper.getAuthProviderFromEnviron()`, получающий токен из переменных окружения.
+ Например, `YDB_ACCESS_TOKEN_CREDENTIALS`.
+ Подробнее про [соединение с БД](../../concepts/connect.md) и [аутентификацию](../../concepts/auth.md).
+
+ Клиент сервиса топиков ([исходный код](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/TopicClient.java#L34)) работает поверх транспорта YDB и отвечает как за управляющие операции с топиками, так и за создание писателей и читателей.
+
+ Фрагмент кода приложения для создания клиента:
+ ```java
+ try (TopicClient topicClient = TopicClient.newClient(transport)
+ .setCompressionExecutor(compressionExecutor)
+ .build()) {
+ // Use topic client
+ }
+ ```
+ В обоих примерах кода выше используется блок ([try-with-resources](https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html)).
+ Это позволяет автоматически закрывать клиент и транспорт при выходе из этого блока, т.к. оба являются наследниками `AutoCloseable`.
{% endlist %}
@@ -41,13 +82,13 @@
### Создание топика {#create-topic}
+Единственный обязательный параметр для создания топика - это его путь, остальные параметры опциональны.
+
{% list tabs %}
- C++
- Единственный обязательный параметр для создания топика - это его путь. Остальные настройки опциональны и представлены структурой `TCreateTopicSettings`.
-
- Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L394).
+ Полный список настроек можно посмотреть [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L394).
Пример создания топика c тремя партициями и поддержкой кодека ZSTD:
@@ -63,8 +104,6 @@
- Go
- Единственный обязательный параметр для создания топика - это его путь, остальные параметры опциональны.
-
Полный список поддерживаемых параметров можно посмотреть в [документации SDK](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#CreateOption).
Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
@@ -90,6 +129,26 @@
)
```
+- Java
+
+ Полный список настроек можно посмотреть [в коде SDK](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/settings/CreateTopicSettings.java#L97).
+
+ Пример создания топика со списком поддерживаемых кодеков и минимальным количеством партиций
+
+ ```java
+ topicClient.createTopic(topicPath, CreateTopicSettings.newBuilder()
+ // Optional
+ .setSupportedCodecs(SupportedCodecs.newBuilder()
+ .addCodec(Codec.RAW)
+ .addCodec(Codec.GZIP)
+ .build())
+ // Optional
+ .setPartitioningSettings(PartitioningSettings.newBuilder()
+ .setMinActivePartitions(3)
+ .build())
+ .build());
+ ```
+
{% endlist %}
### Изменение топика {#alter-topic}
@@ -100,7 +159,7 @@
При изменении топика в параметрах метода `AlterTopic` нужно указать путь топика и параметры, которые будут изменяться. Изменяемые параметры представлены структурой `TAlterTopicSettings`.
- Полный список настроек смотри [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L458).
+ Полный список настроек можно посмотреть [в заголовочном файле](https://github.com/ydb-platform/ydb/blob/d2d07d368cd8ffd9458cc2e33798ee4ac86c733c/ydb/public/sdk/cpp/client/ydb_topic/topic.h#L458).
Пример добавления [важного читателя](../../concepts/topic#important-consumer) к топику и установки [времени хранения сообщений](../../concepts/topic#retention-time) для топика в два дня:
@@ -137,6 +196,24 @@
Функциональность находится в разработке.
+- Java
+
+ При изменении топика в параметрах метода `alterTopic` нужно указать путь топика и параметры, которые будут изменяться.
+
+ Полный список настроек можно посмотреть [в коде SDK](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/settings/AlterTopicSettings.java#L23).
+
+
+ ```java
+ topicClient.alterTopic(topicPath, AlterTopicSettings.newBuilder()
+ .addAddConsumer(Consumer.newBuilder()
+ .setName("new-consumer")
+ .setSupportedCodecs(SupportedCodecs.newBuilder()
+ .addCodec(Codec.RAW)
+ .addCodec(Codec.GZIP)
+ .build())
+ .build())
+ .build());
+
{% endlist %}
### Получение информации о топике {#describe-topic}
@@ -168,7 +245,7 @@
```go
descResult, err := db.Topic().Describe(ctx, "topic-path")
if err != nil {
- log.Fatalf("failed drop topic: %v", err)
+ log.Fatalf("failed describe topic: %v", err)
return
}
fmt.Printf("describe: %#v\n", descResult)
@@ -181,6 +258,19 @@
print(info)
```
+- Java
+
+ Для получения информации о топике используется метод `describeTopic`.
+
+ Полный список полей описания можно посмотреть [в коде SDK](https://github.com/ydb-platform/ydb-java-sdk/blob/master/topic/src/main/java/tech/ydb/topic/description/TopicDescription.java#L19).
+
+
+ ```java
+ Result<TopicDescription> topicDescriptionResult = topicClient.describeTopic(topicPath)
+ .join();
+ TopicDescription description = topicDescriptionResult.getValue();
+ ```
+
{% endlist %}
### Удаление топика {#drop-topic}
@@ -207,6 +297,12 @@
driver.topic_client.drop_topic(topic_path)
```
+- Java
+
+ ```java
+ topicClient.dropTopic(topicPath);
+ ```
+
{% endlist %}
## Запись сообщений {#write}
@@ -253,9 +349,67 @@
writer = driver.topic_client.writer(topic_path)
```
+- Java (sync)
+
+ Инициализация настроек писателя:
+ ```java
+ String producerAndGroupID = "group-id";
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(topicPath)
+ .setProducerId(producerAndGroupID)
+ .setMessageGroupId(producerAndGroupID)
+ .build();
+ ```
+
+ Создание синхронного писателя:
+ ```java
+ SyncWriter writer = topicClient.createSyncWriter(settings);
+ ```
+
+ После создания писателя его необходимо инициализировать. Для этого есть два метода:
+ - `init()`: неблокирующий, запускает процесс инициализации в фоне и не ждёт его завершения.
+ ```java
+ writer.init();
+ ```
+ - `initAndWait()`: блокирующий, запускает процесс инициализации и ждёт его завершения. Если в процессе инициализации возникла ошибка, будет брошено исключение.
+ ```java
+ try {
+ writer.initAndWait();
+ logger.info("Init finished succsessfully");
+ } catch (Exception exception) {
+ logger.error("Exception while initializing writer: ", exception);
+ return;
+ }
+ ```
+
+- Java (async)
+
+ Инициализация настроек писателя:
+ ```java
+ String producerAndGroupID = "group-id";
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(topicPath)
+ .setProducerId(producerAndGroupID)
+ .setMessageGroupId(producerAndGroupID)
+ .build();
+ ```
+
+ Создание и инициализация асинхронного писателя:
+ ```java
+ AsyncWriter writer = topicClient.createAsyncWriter(settings);
+
+ // Init in background
+ writer.init()
+ .thenRun(() -> logger.info("Init finished successfully"))
+ .exceptionally(ex -> {
+ logger.error("Init failed with ex: ", ex);
+ return null;
+ });
+ ```
+
{% endlist %}
-### Асинхронная запись сообщений {#async-write}
+### Запись сообщений {#writing-messages}
{% list tabs %}
@@ -344,6 +498,51 @@
```
+- Java (sync)
+
+ Метод `send` блокирует управление, пока сообщение не будет помещено в очередь отправки.
+ Попадание сообщения в эту очередь означает, что писатель сделает всё возможное для доставки сообщения.
+ Например, если сессия записи по какой-то причине оборвётся, писатель переустановит соединение и попробует отправить это сообщение на новой сессии.
+ Но попадание сообщения в очередь отправки не гарантирует того, что сообщение в итоге будет записано.
+ Например, могут возникать ошибки, приводящие к завершению работы писателя до того, как сообщения из очереди будут отправлены.
+ Если нужно подтверждение успешной записи для каждого сообщения, используйте асинхронного писателя и проверяйте статус, возвращаемый методом `send`.
+ ```java
+ writer.send(Message.of("11".getBytes()));
+
+ long timeoutSeconds = 5; // How long should we wait for a message to be put into sending buffer
+ try {
+ writer.send(
+ Message.newBuilder()
+ .setData("22".getBytes())
+ .setCreateTimestamp(Instant.now().minusSeconds(5))
+ .build(),
+ timeoutSeconds,
+ TimeUnit.SECONDS
+ );
+ } catch (TimeoutException exception) {
+ logger.error("Send queue is full. Couldn't put message into sending queue within {} seconds", timeoutSeconds);
+ } catch (InterruptedException | ExecutionException exception) {
+ logger.error("Couldn't put the message into sending queue due to exception: ", exception);
+ }
+ ```
+
+- Java (async)
+
+ Метод `send` в асинхронном клиенте неблокирующий. Помещает сообщение в очередь отправки.
+ Метод возвращает `CompletableFuture<WriteAck>`, позволяющую проверить, действительно ли сообщение было записано.
+ В случае, если очередь переполнена, будет брошено исключение QueueOverflowException.
+ Это способ сигнализировать пользователю о том, что поток записи следует притормозить.
+ В таком случае стоит или пропускать сообщения, или выполнять повторные попытки записи через exponential backoff.
+ Также можно увеличить размер клиентского буфера (`setMaxSendBufferMemorySize`), чтобы обрабатывать больший объем сообщений перед тем, как он заполнится.
+ ```java
+ try {
+ // Non-blocking. Throws QueueOverflowException if send queue is full
+ writer.send(Message.of("33".getBytes()));
+ } catch (QueueOverflowException exception) {
+ // Send queue is full. Need to retry with backoff or skip
+ }
+ ```
+
{% endlist %}
### Запись сообщений с подтверждением о сохранении на сервере
@@ -427,6 +626,36 @@
```
+- Java (async)
+
+ Метод `send` возвращает `CompletableFuture<WriteAck>`. Её успешное завершение означает подтверждение записи сервером.
+ В структуре `WriteAck` содержится информация о seqNo, offset и статусе записи:
+
+ ```java
+ writer.send(Message.of(message))
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ logger.error("Exception on writing message message: ", ex);
+ } else {
+ switch (result.getState()) {
+ case WRITTEN:
+ WriteAck.Details details = result.getDetails();
+ StringBuilder str = new StringBuilder("Message was written successfully");
+ if (details != null) {
+ str.append(", offset: ").append(details.getOffset());
+ }
+ logger.debug(str.toString());
+ break;
+ case ALREADY_WRITTEN:
+ logger.warn("Message was already written");
+ break;
+ default:
+ break;
+ }
+ }
+ });
+ ```
+
{% endlist %}
### Выбор кодека для сжатия сообщений {#codec}
@@ -477,12 +706,28 @@
)
```
+- Java
+
+ ```java
+ String producerAndGroupID = "group-id";
+ WriterSettings settings = WriterSettings.newBuilder()
+ .setTopicPath(topicPath)
+ .setProducerId(producerAndGroupID)
+ .setMessageGroupId(producerAndGroupID)
+ .setCodec(Codec.ZSTD)
+ .build();
+ ```
+
{% endlist %}
## Чтение сообщений {#reading}
### Подключение к топику для чтения сообщений {#start-reader}
+Для чтения сообщений из топика необходимо наличие заранее созданного Consumer, связанного с этим топиком.
+Создать Consumer можно при [создании](#create-topic) или [изменении](#alter-topic) топика.
+У топика может быть несколько Consumer'ов и для каждого из них сервер хранит свой прогресс чтения.
+
{% list tabs %}
- C++
@@ -520,9 +765,101 @@
reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name")
```
+- Java (sync)
+
+ Инициализация настроек читателя
+ ```java
+ ReaderSettings settings = ReaderSettings.newBuilder()
+ .setConsumerName(consumerName)
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(topicPath)
+ .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
+ .setMaxLag(Duration.ofMinutes(30)) // Optional
+ .build())
+ .build();
+ ```
+
+ Создание синхронного читателя
+ ```java
+ SyncReader reader = topicClient.createSyncReader(settings);
+ ```
+
+ После создания синхронного читателя необходимо инициализировать. Для этого следует воспользоваться одним их двух методов:
+ - `init()`: неблокирующий, запускает процесс инициализации в фоне и не ждёт его завершения.
+ ```java
+ reader.init();
+ ```
+ - `initAndWait()`: блокирующий, запускает процесс инициализации и ждёт его завершения. Если в процессе инициализации возникла ошибка, будет брошено исключение.
+ ```java
+ try {
+ reader.initAndWait();
+ logger.info("Init finished succsessfully");
+ } catch (Exception exception) {
+ logger.error("Exception while initializing reader: ", exception);
+ return;
+ }
+ ```
+
+- Java (async)
+
+ Инициализация настроек читателя
+ ```java
+ ReaderSettings settings = ReaderSettings.newBuilder()
+ .setConsumerName(consumerName)
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath(topicPath)
+ .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
+ .setMaxLag(Duration.ofMinutes(30)) // Optional
+ .build())
+ .build();
+ ```
+
+ Для асинхронного читателя, помимо общих настроек чтения `ReaderSettings`, понадобятся настройки обработчика событий `ReadEventHandlersSettings`, в которых необходимо передать экземпляр наследника `ReadEventHandler`.
+ Он будет описывать, как должна происходить обработка различных событий, происходящих во время чтения.
+
+ ```java
+ ReadEventHandlersSettings handlerSettings = ReadEventHandlersSettings.newBuilder()
+ .setEventHandler(new Handler())
+ .build();
+ ```
+
+ Опционально, в `ReadEventHandlersSettings` можно указать executor'а, на котором будет происходить обработка сообщений.
+ Для реализации объекта-наследника ReadEventHandler можно воспользоваться дефолтным абстрактным классом `AbstractReadEventHandler`.
+ Достаточно переопределить метод onMessages, отвечающий за обработку самих сообщений. Пример реализации:
+
+ ```java
+ private class Handler extends AbstractReadEventHandler {
+ @Override
+ public void onMessages(DataReceivedEvent event) {
+ for (Message message : event.getMessages()) {
+ StringBuilder str = new StringBuilder();
+ logger.info("Message received. SeqNo={}, offset={}", message.getSeqNo(), message.getOffset());
+
+ process(message);
+
+ message.commit().thenRun(() -> {
+ logger.info("Message committed");
+ });
+ }
+ }
+ }
+ ```
+
+ Создание и инициализация асинхронного читателя:
+ ```java
+ AsyncReader reader = topicClient.createAsyncReader(readerSettings, handlerSettings);
+ // Init in background
+ reader.init()
+ .thenRun(() -> logger.info("Init finished successfully"))
+ .exceptionally(ex -> {
+ logger.error("Init failed with ex: ", ex);
+ return null;
+ });
+ ```
+
{% endlist %}
-Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам `my-topic` и `my-specific-topic` через читателя `my-consumer`, а также задаст время, с которого начинать читать сообщения:
+Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам `my-topic` и `my-specific-topic` через читателя `my-consumer`:
{% list tabs %}
@@ -558,10 +895,28 @@
}
```
+ Также в примере выше задаётся время, с которого следует начинать читать сообщения.
+
- Python
Функциональность находится в разработке.
+- Java
+
+ ```java
+ ReaderSettings settings = ReaderSettings.newBuilder()
+ .setConsumerName(consumerName)
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath("my-topic")
+ .build())
+ .addTopic(TopicReadSettings.newBuilder()
+ .setPath("my-specific-topic")
+ .setReadFrom(Instant.now().minus(Duration.ofHours(24))) // Optional
+ .setMaxLag(Duration.ofMinutes(30)) // Optional
+ .build())
+ .build();
+ ```
+
{% endlist %}
### Чтение сообщений {#reading-messages}
@@ -586,6 +941,14 @@
SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
+- Python
+
+ SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
+
+- Java
+
+ SDK получает данные с сервера партиями и буферизирует их. В зависимости от задач клиентский код может читать сообщения из буфера по одному или пакетами.
+
{% endlist %}
@@ -621,6 +984,21 @@
process(message)
```
+- Java (sync)
+
+ Чтобы читать сообщения без подтверждения обработки, по одному, используйте следующий код:
+
+ ```java
+ while(true) {
+ Message message = reader.receive();
+ process(message);
+ }
+ ```
+
+- Java (async)
+
+ В асинхронном клиенте нет возможности читать сообщения по одному.
+
{% endlist %}
#### Чтение сообщений пакетом
@@ -670,6 +1048,25 @@
process(batch)
```
+- Java (sync)
+
+ В синхронном клиенте нет возможности прочитать сразу пакет сообщений.
+
+- Java (async)
+
+ Чтобы прочитать пакет сообщений без подтверждения обработки, используйте следующий код:
+
+ ```java
+ private class Handler extends AbstractReadEventHandler {
+ @Override
+ public void onMessages(DataReceivedEvent event) {
+ for (Message message : event.getMessages()) {
+ process(message);
+ }
+ }
+ }
+ ```
+
{% endlist %}
### Чтение с подтверждением обработки сообщений {#commit}
@@ -710,6 +1107,28 @@
reader.commit(message)
```
+- Java
+
+ Для подтверждения обработки сообщения достаточно вызвать у сообщения метод `commit`.
+ Актуально как для синхронного, так и для асинхронного читателя.
+ В асинхронном читателе, при обработке пакета сообщений, можно вызвать `commit` или у всего пакета сразу, или у каждого сообщения отдельно.
+ Этот метод возвращает `CompletableFuture<Void>`, успешное выполнение которой означает подтверждение обработки сервером.
+ В случае ошибки коммита не следует пытаться его ретраить. Скорее всего, ошибка вызвана закрытием сессии.
+ Читатель (необязательно этот же) сам создаст новую сессию для этой партиции и сообщение будет прочитано снова.
+
+ ```java
+ message.commit()
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ // Read session was probably closed, there is nothing we can do here.
+ // Do not retry this commit on the same event.
+ logger.error("exception while committing message: ", ex);
+ } else {
+ logger.info("message committed successfully");
+ }
+ });
+ ```
+
{% endlist %}
#### Чтение сообщений пакетом с подтверждением
@@ -759,6 +1178,32 @@
reader.commit(batch)
```
+- Java (sync)
+
+ Неактуально, т.к. в синхронном читателе нет возможности читать сообщения пакетами.
+
+- Java (async)
+
+ В обработчике `onMessage` можно закоммитить весь пакет сообщений, вызвав `commit` на событии.
+ ```java
+ @Override
+ public void onMessages(DataReceivedEvent event) {
+ for (Message message : event.getMessages()) {
+ process(message);
+ }
+ event.commit()
+ .whenComplete((result, ex) -> {
+ if (ex != null) {
+ // Read session was probably closed, there is nothing we can do here.
+ // Do not retry this commit on the same message.
+ logger.error("exception while committing message batch: ", ex);
+ } else {
+ logger.info("message batch committed successfully");
+ }
+ });
+ }
+ ```
+
{% endlist %}
### Чтение с хранением позиции на клиентской стороне {#client-commit}
@@ -819,6 +1264,12 @@
Функциональность находится в разработке.
+- Java
+
+ Чтение с заданной позиции в текущей версии SDK отсутствует.
+
+ Поддерживается настройка читателя `setReadFrom` для чтения событий с отметками времени записи не меньше данной.
+
{% endlist %}
### Обработка серверного прерывания чтения {#stop}
@@ -876,6 +1327,30 @@
reader.commit(batch)
```
+- Java (sync)
+
+ Неактуально, т.к. в синхронном читателе нет возможности настраивать обработку подобных событий.
+ Клиент сразу ответит серверу подтверждением остановки.
+
+- Java (async)
+
+ Для возможности реагировать на такое событие следует переопределить метод `onStopPartitionSession(StopPartitionSessionEvent event)` в объекте-наследнике `ReadEventHandler` (см [Подключение к топику для чтения сообщений](#start-reader)).
+ `event.confirm()` обязательно должен быть вызван, т.к. сервер ожидает этого ответа для продолжения остановки.
+
+ ```java
+ @Override
+ public void onStopPartitionSession(StopPartitionSessionEvent event) {
+ logger.info("Partition session {} stopped. Committed offset: {}", event.getPartitionSessionId(),
+ event.getCommittedOffset());
+ // This event means that no more messages will be received by server
+ // Received messages still can be read from ReaderBuffer
+ // Messages still can be committed, until confirm() method is called
+
+ // Confirm that session can be closed
+ event.confirm();
+ }
+ ```
+
{% endlist %}
#### Жесткое прерывание чтения {#hard-stop}
@@ -935,4 +1410,17 @@
reader.commit(batch)
```
+- Java (sync)
+
+ Неактуально, т.к. в синхронном читателе нет возможности настраивать обработку подобных событий.
+
+- Java (async)
+
+ ```java
+ @Override
+ public void onPartitionSessionClosed(PartitionSessionClosedEvent event) {
+ logger.info("Partition session {} is closed.", event.getPartitionSession().getPartitionId());
+ }
+ ```
+
{% endlist %}