diff options
author | rekby <rekby@ydb.tech> | 2022-09-21 16:00:14 +0300 |
---|---|---|
committer | rekby <rekby@ydb.tech> | 2022-09-21 16:00:14 +0300 |
commit | 9eec24fe8cd7b19a162e3d3b8bd9b05ae7eaa503 (patch) | |
tree | d5469405883b9123e774fab67477cc53a2672e60 | |
parent | 6a3d6e0bc240df4f23f32b5baed58cf4c8a29fc1 (diff) | |
download | ydb-9eec24fe8cd7b19a162e3d3b8bd9b05ae7eaa503.tar.gz |
ydb go sdk topic writer api doc
-rw-r--r-- | ydb/docs/ru/core/reference/ydb-sdk/topic.md | 114 |
1 files changed, 106 insertions, 8 deletions
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md index 23fc564e31..ea3f5d3f6f 100644 --- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md +++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md @@ -4,7 +4,105 @@ Перед выполнением примеров [создайте топик](../ydb-cli/topic-create.md) и [добавьте читателя](../ydb-cli/topic-consumer-add.md). -## Подключение к топику {#start-reader} +## Запись сообщений +### Подключение к топику для записи сообщений {#start-writer} + +На данный момент поддерживается подключение только с совпадающими producer_id и message_group_id, в будущем это ограничение будет снято. + +{% list tabs %} + +- Go + + ```go + producerAndGroupID := "group-id" + writer, err := db.Topic().StartWriter(producerAndGroupID, "topicName", + topicoptions.WithMessageGroupID(producerAndGroupID), + ) + if err != nil { + return err + } + ``` + +{% endlist %} + + +### Асинхронная запись сообщений + +{% list tabs %} + +- Go + + Для отправки сообщения - достаточно в поле Data сохранить Reader, из которого можно будет прочитать данные. Можно рассчитывать на то что данные каждого сообщения читаются один раз (или до первой ошибки), к моменту возврата из Write данные будут уже прочитаны и сохранены во внутренний буфер. + + SeqNo и дата создания сообщений по умолчанию проставляются автоматически. + + По умолчанию Write выполняется асинхронно - данные из сообщений вычитываются и сохраняются во внутренний буфер, отправка происходит в фоне. Writer сам переподключается к YDB при обрывах связи и повторяет отправку сообщений пока это возможно. При получении ошибки, которую невозможно повторить Writer останавливается и следующие вызовы Write будут завершаться с ошибкой. + + ```go + err := writer.Write(ctx, + topicwriter.Message{Data: strings.NewReader("1")}, + topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})}, + topicwriter.Message{Data: strings.NewReader("3")}, + ) + if err == nil { + return err + } + ``` + +{% endlist %} + + +### Запись сообщений с подтверждением о сохранении на сервере + +{% list tabs %} + +- Go + + При подключении можно указать опцию синхронной записи сообщений - topicoptions.WithSyncWrite(true). Тогда Write будет возвращаться только после того как получит подтверждение с сервера о сохранении всех, сообщений переданных в вызове. При этом SDK так же как и обычно будет при необходимости переподключаться и повторять отправку сообщений. В этом режиме контекст управляет только временем ожидания ответа из SDK, т.е. даже после отмены контекста SDK продолжит попытки отправить сообщения. + + ```go + + producerAndGroupID := "group-id" + writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName", + topicoptions.WithMessageGroupID(producerAndGroupID), + topicoptions.WithSyncWrite(true), + ) + + err = writer.Write(ctx, + topicwriter.Message{Data: strings.NewReader("1")}, + topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})}, + topicwriter.Message{Data: strings.NewReader("3")}, + ) + if err == nil { + return err + } + ``` + +{% endlist %} + +### Выбор кодека для сжатия сообщений + +{% list tabs %} + +- Go + + По умолчанию SDK выбирает кодек автоматически (с учётом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешённых кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. + При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут. + + ```go + producerAndGroupID := "group-id" + writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName", + topicoptions.WithMessageGroupID(producerAndGroupID), + topicoptions.WithCodec(topictypes.CodecGzip), + ) + ``` + +{% endlist %} + + + +## Чтение сообщений +### Подключение к топику для чтения сообщений {#start-reader} Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код: @@ -45,7 +143,7 @@ {% endlist %} -## Чтение сообщений {#reading-messages} +### Чтение сообщений {#reading-messages} Сервер хранит [позицию чтения сообщений](../../concepts/topic.md#consumer-offset). После вычитывания очередного сообщения клиент может [отправить на сервер подтверждение обработки](#commit). Позиция чтения изменится, а при новом подключении будут вычитаны только неподтвержденные сообщения. @@ -97,7 +195,7 @@ SDK получает данные с сервера партиями и буфе {% endlist %} -### Чтение с подтверждением обработки сообщений {#commit} +#### Чтение с подтверждением обработки сообщений {#commit} Чтобы подтверждать обработку сообщений по одному, используйте следующий код: @@ -112,7 +210,7 @@ SDK получает данные с сервера партиями и буфе if err != nil { return err } - processMessage(mess) + processMessage(mess) r.Commit(mess.Context(), mess) } } @@ -141,7 +239,7 @@ SDK получает данные с сервера партиями и буфе {% endlist %} -### Чтение с хранением позиции на клиентской стороне {#client-commit} +#### Чтение с хранением позиции на клиентской стороне {#client-commit} При начале чтения клиентский код должен сообщить серверу стартовую позицию чтения: @@ -191,7 +289,7 @@ SDK получает данные с сервера партиями и буфе {% endlist %} -## Обработка серверного прерывания чтения {#stop} +### Обработка серверного прерывания чтения {#stop} В {{ ydb-short-name }} используется серверная балансировка партиций между клиентами. Это означает, что сервер может прерывать чтение сообщений из произвольных партиций. @@ -199,7 +297,7 @@ SDK получает данные с сервера партиями и буфе В случае _жесткого прерывания_ клиент получает уведомление, что работать с сообщениями партиции больше нельзя. Клиент должен прекратить обработку прочитанных сообщений. Неподтвержденные сообщения будут переданы другому читателю. -### Мягкое прерывание чтения {#soft-stop} +#### Мягкое прерывание чтения {#soft-stop} {% list tabs %} @@ -222,7 +320,7 @@ SDK получает данные с сервера партиями и буфе {% endlist %} -### Жесткое прерывание чтения {#hard-stop} +#### Жесткое прерывание чтения {#hard-stop} {% list tabs %} |