aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrekby <rekby@ydb.tech>2022-09-21 16:00:14 +0300
committerrekby <rekby@ydb.tech>2022-09-21 16:00:14 +0300
commit9eec24fe8cd7b19a162e3d3b8bd9b05ae7eaa503 (patch)
treed5469405883b9123e774fab67477cc53a2672e60
parent6a3d6e0bc240df4f23f32b5baed58cf4c8a29fc1 (diff)
downloadydb-9eec24fe8cd7b19a162e3d3b8bd9b05ae7eaa503.tar.gz
ydb go sdk topic writer api doc
-rw-r--r--ydb/docs/ru/core/reference/ydb-sdk/topic.md114
1 files changed, 106 insertions, 8 deletions
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
index 23fc564e31..ea3f5d3f6f 100644
--- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md
+++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md
@@ -4,7 +4,105 @@
Перед выполнением примеров [создайте топик](../ydb-cli/topic-create.md) и [добавьте читателя](../ydb-cli/topic-consumer-add.md).
-## Подключение к топику {#start-reader}
+## Запись сообщений
+### Подключение к топику для записи сообщений {#start-writer}
+
+На данный момент поддерживается подключение только с совпадающими producer_id и message_group_id, в будущем это ограничение будет снято.
+
+{% list tabs %}
+
+- Go
+
+ ```go
+ producerAndGroupID := "group-id"
+ writer, err := db.Topic().StartWriter(producerAndGroupID, "topicName",
+ topicoptions.WithMessageGroupID(producerAndGroupID),
+ )
+ if err != nil {
+ return err
+ }
+ ```
+
+{% endlist %}
+
+
+### Асинхронная запись сообщений
+
+{% list tabs %}
+
+- Go
+
+ Для отправки сообщения - достаточно в поле Data сохранить Reader, из которого можно будет прочитать данные. Можно рассчитывать на то что данные каждого сообщения читаются один раз (или до первой ошибки), к моменту возврата из Write данные будут уже прочитаны и сохранены во внутренний буфер.
+
+ SeqNo и дата создания сообщений по умолчанию проставляются автоматически.
+
+ По умолчанию Write выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне. Writer сам переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно. При получении ошибки, которую невозможно повторить Writer останавливается и следующие вызовы Write будут завершаться с ошибкой.
+
+ ```go
+ err := writer.Write(ctx,
+ topicwriter.Message{Data: strings.NewReader("1")},
+ topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
+ topicwriter.Message{Data: strings.NewReader("3")},
+ )
+ if err == nil {
+ return err
+ }
+ ```
+
+{% endlist %}
+
+
+### Запись сообщений с подтверждением о сохранении на сервере
+
+{% list tabs %}
+
+- Go
+
+ При подключении можно указать опцию синхронной записи сообщений - topicoptions.WithSyncWrite(true). Тогда Write будет возвращаться только после того как получит подтверждение с сервера о сохранении всех, сообщений переданных в вызове. При этом SDK так же как и обычно будет при необходимости переподключаться и повторять отправку сообщений. В этом режиме контекст управляет только временем ожидания ответа из SDK, т.е. даже после отмены контекста SDK продолжит попытки отправить сообщения.
+
+ ```go
+
+ producerAndGroupID := "group-id"
+ writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
+ topicoptions.WithMessageGroupID(producerAndGroupID),
+ topicoptions.WithSyncWrite(true),
+ )
+
+ err = writer.Write(ctx,
+ topicwriter.Message{Data: strings.NewReader("1")},
+ topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})},
+ topicwriter.Message{Data: strings.NewReader("3")},
+ )
+ if err == nil {
+ return err
+ }
+ ```
+
+{% endlist %}
+
+### Выбор кодека для сжатия сообщений
+
+{% list tabs %}
+
+- Go
+
+ По умолчанию SDK выбирает кодек автоматически (с учётом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешённых кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения.
+ При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут.
+
+ ```go
+ producerAndGroupID := "group-id"
+ writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName",
+ topicoptions.WithMessageGroupID(producerAndGroupID),
+ topicoptions.WithCodec(topictypes.CodecGzip),
+ )
+ ```
+
+{% endlist %}
+
+
+
+## Чтение сообщений
+### Подключение к топику для чтения сообщений {#start-reader}
Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код:
@@ -45,7 +143,7 @@
{% endlist %}
-## Чтение сообщений {#reading-messages}
+### Чтение сообщений {#reading-messages}
Сервер хранит [позицию чтения сообщений](../../concepts/topic.md#consumer-offset). После вычитывания очередного сообщения клиент может [отправить на сервер подтверждение обработки](#commit). Позиция чтения изменится, а при новом подключении будут вычитаны только неподтвержденные сообщения.
@@ -97,7 +195,7 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-### Чтение с подтверждением обработки сообщений {#commit}
+#### Чтение с подтверждением обработки сообщений {#commit}
Чтобы подтверждать обработку сообщений по одному, используйте следующий код:
@@ -112,7 +210,7 @@ SDK получает данные с сервера партиями и буфе
if err != nil {
return err
}
- processMessage(mess)
+ processMessage(mess)
r.Commit(mess.Context(), mess)
}
}
@@ -141,7 +239,7 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-### Чтение с хранением позиции на клиентской стороне {#client-commit}
+#### Чтение с хранением позиции на клиентской стороне {#client-commit}
При начале чтения клиентский код должен сообщить серверу стартовую позицию чтения:
@@ -191,7 +289,7 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-## Обработка серверного прерывания чтения {#stop}
+### Обработка серверного прерывания чтения {#stop}
В {{ ydb-short-name }} используется серверная балансировка партиций между клиентами. Это означает, что сервер может прерывать чтение сообщений из произвольных партиций.
@@ -199,7 +297,7 @@ SDK получает данные с сервера партиями и буфе
В случае _жесткого прерывания_ клиент получает уведомление, что работать с сообщениями партиции больше нельзя. Клиент должен прекратить обработку прочитанных сообщений. Неподтвержденные сообщения будут переданы другому читателю.
-### Мягкое прерывание чтения {#soft-stop}
+#### Мягкое прерывание чтения {#soft-stop}
{% list tabs %}
@@ -222,7 +320,7 @@ SDK получает данные с сервера партиями и буфе
{% endlist %}
-### Жесткое прерывание чтения {#hard-stop}
+#### Жесткое прерывание чтения {#hard-stop}
{% list tabs %}