diff options
author | bazeltsev <bazeltsev@ydb.tech> | 2023-04-10 10:24:45 +0300 |
---|---|---|
committer | bazeltsev <bazeltsev@ydb.tech> | 2023-04-10 10:24:45 +0300 |
commit | efb303f9483bcb4bf5c3186bcf1c31afa069bf75 (patch) | |
tree | 1794107d32759aa77d7e20180b22982e63209324 | |
parent | 0d09ab3a58e71e487925f636d4d7483f09534959 (diff) | |
download | ydb-efb303f9483bcb4bf5c3186bcf1c31afa069bf75.tar.gz |
Topic with Python SDK docs
updated
-rw-r--r-- | ydb/docs/ru/core/reference/ydb-sdk/feature-parity.md | 10 | ||||
-rw-r--r-- | ydb/docs/ru/core/reference/ydb-sdk/topic.md | 126 |
2 files changed, 74 insertions, 62 deletions
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/feature-parity.md b/ydb/docs/ru/core/reference/ydb-sdk/feature-parity.md index 5465f41c84..c1ee328cd7 100644 --- a/ydb/docs/ru/core/reference/ydb-sdk/feature-parity.md +++ b/ydb/docs/ru/core/reference/ydb-sdk/feature-parity.md @@ -103,12 +103,12 @@ |DescribeNode|\+|?|\+|\-|\-||\-|| |Session (leader election, распределенный лок)|\+|?|\-|\-|\-||\-|| |**Topic service**||||||||| -|CreateTopic|\+|\-|\+|\-|\-|\-|\-|| -|DescribeTopic|\+|\-|\+|\-|\-|\-|\-|| +|CreateTopic|\+|\+|\+|\-|\-|\-|\-|| +|DescribeTopic|\+|\+|\+|\-|\-|\-|\-|| |AlterTopic|\+|\-|\+|\-|\-|\-|\-|| -|DropTopic|\+|\-|\+|\-|\-|\-|\-|| -|StreamWrite|\+|\-|\+|\-|\-|\-|\-|| -|StreamRead|\+|\-|\+|\-|\-|\-|\-|| +|DropTopic|\+|\+|\+|\-|\-|\-|\-|| +|StreamWrite|\+|\+|\+|\-|\-|\-|\-|| +|StreamRead|\+|\+|\+|\-|\-|\-|\-|| |**Ratelimiter service**||||||||| |CreateResource|\+|?|\+|\-|\-|\-|\-|| |AlterResource|\+|?|\+|\-|\-|\-|\-|| diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic.md index 0d569665df..cd81d31259 100644 --- a/ydb/docs/ru/core/reference/ydb-sdk/topic.md +++ b/ydb/docs/ru/core/reference/ydb-sdk/topic.md @@ -4,8 +4,9 @@ Перед выполнением примеров [создайте топик](../ydb-cli/topic-create.md) и [добавьте читателя](../ydb-cli/topic-consumer-add.md). -## Управление топиками -### Создание топика +## Управление топиками {#manage} + +### Создание топика {#create-topic} {% list tabs %} @@ -15,14 +16,15 @@ Полный список поддерживаемых параметров можно посмотреть в [документации SDK](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#CreateOption). - Пример создания тописка со списком поддерживаемых кодеков и минимальным количество партиций + Пример создания топика со списком поддерживаемых кодеков и минимальным количество партиций + ```go err := db.Topic().Create(ctx, "topic-path", // optional - topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip), + topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip), - // optional - topicoptions.CreateWithMinActivePartitions(3), + // optional + topicoptions.CreateWithMinActivePartitions(3), ) ``` @@ -37,11 +39,12 @@ {% endlist %} -### Изменение топика +### Изменение топика {#alter-topic} При изменении топика в параметрах нужно указать путь топика и те параметры, которые будут изменяться. {% list tabs %} + - Go Полный список поддерживаемых параметров можно посмотреть в [документации SDK](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#AlterOption). @@ -49,21 +52,21 @@ Пример добавления читателя к топику ```go - err := db.Topic().Alter(ctx, "topic-path", - topicoptions.AlterWithAddConsumers(topictypes.Consumer{ - Name: "new-consumer", - SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional - }), - ) + err := db.Topic().Alter(ctx, "topic-path", + topicoptions.AlterWithAddConsumers(topictypes.Consumer{ + Name: "new-consumer", + SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional + }), + ) ``` - Python - Пока не реализовано + Функциональность находится в разработке. {% endlist %} -### Получение информации о топике +### Получение информации о топике {#describe-topic} {% list tabs %} @@ -71,23 +74,23 @@ ```go descResult, err := db.Topic().Describe(ctx, "topic-path") - if err != nil { - log.Fatalf("failed drop topic: %v", err) - return - } - fmt.Printf("describe: %#v\n", descResult) + if err != nil { + log.Fatalf("failed drop topic: %v", err) + return + } + fmt.Printf("describe: %#v\n", descResult) ``` - Python ```python info = driver.topic_client.describe_topic(topic_path) - print(info) + print(info) ``` {% endlist %} -### Удаление топика +### Удаление топика {#drop-topic} Для удаления топика достаточно указать путь к нему. @@ -104,10 +107,11 @@ ```python driver.topic_client.drop_topic(topic_path) ``` + {% endlist %} +## Запись сообщений {#write} -## Запись сообщений ### Подключение к топику для записи сообщений {#start-writer} На данный момент поддерживается подключение только с совпадающими producer_id и message_group_id, в будущем это ограничение будет снято. @@ -134,8 +138,7 @@ {% endlist %} - -### Асинхронная запись сообщений +### Асинхронная запись сообщений {#async-write} {% list tabs %} @@ -160,23 +163,29 @@ - Python - Для отправки сообщений можно передавать как просто содержимое сообщения (bytes, str), так и вручную задавать некорые свойства. Объекты можно передавать по-одному или сразу в массиве (list). Метод write выполняется асинхронно. Возврат из метода происходит сразу после того как сообщения будут положены во внутренний буфер, обычно это происходит быстро. Ожидание может возникнуть если внутренний буфер уже заполнен и нужно подождать пока часть данных будет отправлена на сервер. + Для отправки сообщений можно передавать как просто содержимое сообщения (bytes, str), так и вручную задавать некоторые свойства. Объекты можно передавать по одному или сразу в массиве (list). Метод `write` выполняется асинхронно. Возврат из метода происходит сразу после того как сообщения будут положены во внутренний буфер клиента, обычно это происходит быстро. Ожидание может возникнуть, если внутренний буфер уже заполнен и нужно подождать, пока часть данных будет отправлена на сервер. ```python - # простая отправка сообщений, без явного указания метаданных, удобно начинать, удобно использовать пока важно только содержимое сообщения. + # Простая отправка сообщений, без явного указания метаданных. + # Удобно начинать, удобно использовать пока важно только содержимое сообщения. writer = driver.topic_client.writer(topic_path) - writer.write("mess") # строки будут переданы в кодировке utf-8, так удобно отправлять текстовые сообщения. - writer.write(bytes([1, 2, 3])) # эти байты будут отправлены "как есть", так удобно отправлять бинарные данные. - writer.write(["mess-1", "mess-2"]) # Тут за один вызов отправляется несколько сообщений - так снижаются накладные расходы на внутренние процессы SDK, имеет смысл при большом потоке сообщений - - # полная форма - используется, когда кроме содержимого сообщения нужно вручную задать и его свойства + writer.write("mess") # Строки будут переданы в кодировке utf-8, так удобно отправлять + # текстовые сообщения. + writer.write(bytes([1, 2, 3])) # Эти байты будут отправлены "как есть", так удобно отправлять + # бинарные данные. + writer.write(["mess-1", "mess-2"]) # Здесь за один вызов отправляется несколько сообщений — + # так снижаются накладные расходы на внутренние процессы SDK, + # имеет смысл при большом потоке сообщений. + + # Полная форма, используется, когда кроме содержимого сообщения нужно вручную задать и его свойства. writer = driver.topic_client.writer(topic="topic-path", auto_seqno=False, auto_created_at=False) writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now())) writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]), seqno=124, created_at=datetime.datetime.now())) - # В полной форме так же можно отправлять несколько сообщений за один вызов функции. Это имеет смысл при большом потоке отправляемых сообщений - для снижения - # накладных расходов на внутренние вызовы SDK + # В полной форме так же можно отправлять несколько сообщений за один вызов функции. + # Это имеет смысл при большом потоке отправляемых сообщений — для снижения + # накладных расходов на внутренние вызовы SDK. writer.write([ ydb.TopicWriterMessage("asd", seqno=123, created_at=datetime.datetime.now()), ydb.TopicWriterMessage(bytes([1, 2, 3]), seqno=124, created_at=datetime.datetime.now(), @@ -186,7 +195,6 @@ {% endlist %} - ### Запись сообщений с подтверждением о сохранении на сервере {% list tabs %} @@ -215,37 +223,42 @@ - Python - Есть два способа для того чтобы дождаться подтверждения о записи сообщений на сервер: - 1. flush() - дожидается подтверждения для всех сообщений, записанных ранее во внутренний буфер - 2. write_with_ack(...) - отправляет сообщение и ждёт подтверждение его доставки от сервера. При отправке нескольких сообщений подряд - это медленный способ + Есть два способа получить подтверждение о записи сообщений на сервере: + + * `flush()` — дожидается подтверждения для всех сообщений, записанных ранее во внутренний буфер. + * `write_with_ack(...)` — отправляет сообщение и ждет подтверждение его доставки от сервера. При отправке нескольких сообщений подряд это способ работает медленно. ```python - # положить несколько сообщений во внутренний буфер, затем дождаться пока все они будут доставлены до сервера + # Положить несколько сообщений во внутренний буфер, затем дождаться, + # пока все они будут доставлены до сервера. for mess in messages: writer.write(mess) writer.flush() - # Можно отправить несколько сообщений и дождаться подтверждения на всю группу + # Можно отправить несколько сообщений и дождаться подтверждения на всю группу. writer.write_with_ack(["mess-1", "mess-2"]) - # Ожидание при отправке каждого сообщения - этот метод вернёт результат только после получения подтверждения от сервера. Это самый медленный - # вариант отправки сообщений, используйте его только если такой режим действительно нужен + # Ожидание при отправке каждого сообщения — этот метод вернет результат только после получения + # подтверждения от сервера. + # Это самый медленный вариант отправки сообщений, используйте его только если такой режим + # действительно нужен. writer.write_with_ack("message") ``` {% endlist %} -### Выбор кодека для сжатия сообщений +### Выбор кодека для сжатия сообщений {#codec} + +По умолчанию SDK выбирает кодек автоматически (с учетом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешенных кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешенных кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками. + +При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут. {% list tabs %} - Go - По умолчанию SDK выбирает кодек автоматически (с учётом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешённых кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешённых кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками. - При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут. - ```go producerAndGroupID := "group-id" writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName", @@ -256,19 +269,16 @@ - Python - По умолчанию SDK выбирает кодек автоматически (с учётом настроек топика). В автоматическом режиме SDK сначала отправляет по одной группе сообщений каждым из разрешённых кодеков, затем иногда будет пробовать сжать сообщения всеми доступными кодеками и выбирать кодек, дающий наименьший размер сообщения. Если для топика список разрешённых кодеков пуст, то автовыбор производится между Raw и Gzip-кодеками. - При необходимости можно задать фиксированный кодек в опциях подключения. Тогда будет использоваться именно он и замеры проводиться не будут. - ```python writer = driver.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP, ) ``` -{% endlist %} +{% endlist %} +## Чтение сообщений {#reading} -## Чтение сообщений ### Подключение к топику для чтения сообщений {#start-reader} Чтобы создать подключение к существующему топику `my-topic` через добавленного ранее читателя `my-consumer`, используйте следующий код: @@ -289,6 +299,7 @@ ```python reader = driver.topic_client.reader(topic="topic-path", consumer="consumer_name") ``` + {% endlist %} Вы также можете использовать расширенный вариант создания подключения, чтобы указать несколько топиков и задать параметры чтения. Следующий код создаст подключение к топикам `my-topic` и `my-specific-topic` через читателя `my-consumer`, а также задаст время, с которого начинать читать сообщения: @@ -315,7 +326,7 @@ - Python - Пока не реализовано + Функциональность находится в разработке. {% endlist %} @@ -356,6 +367,7 @@ SDK получает данные с сервера партиями и буфе message = reader.receive_message() process(message) ``` + {% endlist %} Чтобы прочитать пакет сообщений, используйте следующий код: @@ -415,6 +427,7 @@ SDK получает данные с сервера партиями и буфе process(message) reader.commit(message) ``` + {% endlist %} Для подтверждения обработки пакета сообщений используйте следующий код: @@ -444,6 +457,7 @@ SDK получает данные с сервера партиями и буфе process(batch) reader.commit(batch) ``` + {% endlist %} #### Чтение с хранением позиции на клиентской стороне {#client-commit} @@ -496,7 +510,7 @@ SDK получает данные с сервера партиями и буфе - Python - Пока не реализовано + Функциональность находится в разработке. {% endlist %} @@ -531,7 +545,7 @@ SDK получает данные с сервера партиями и буфе - Python - Специальной обработки не требуется + Специальной обработки не требуется. ```python while True: @@ -540,8 +554,6 @@ SDK получает данные с сервера партиями и буфе reader.commit(batch) ``` - - {% endlist %} #### Жесткое прерывание чтения {#hard-stop} @@ -569,7 +581,7 @@ SDK получает данные с сервера партиями и буфе - Python - В этом примере обработка сообщений в батче остановится если в процессе работы партиция будет отобрана. Такая обработка - это только оптимизация, для того чтобы перестать выполнять лишнюю работу. В простых случаях можно работать не обращая внимания на отбор партиций. + В этом примере обработка сообщений в батче остановится, если в процессе работы партиция будет отобрана. Такая оптимизация требует дополнительного кода на клиенте. В простых случаях, когда обработка отобранных партиций не является проблемой, ее можно не применять. ```python def process_batch(batch): |