diff options
author | rekby <rekby@ydb.tech> | 2022-08-02 18:00:18 +0300 |
---|---|---|
committer | rekby <rekby@ydb.tech> | 2022-08-02 18:00:18 +0300 |
commit | 2c3e463184db19e1b80af7e0d03ee2dd315261a1 (patch) | |
tree | e3771aeb2e59bc08ab29443502324616223e1c91 | |
parent | c0967f7027baefdcd2473da2ae593a62174b0888 (diff) | |
download | ydb-2c3e463184db19e1b80af7e0d03ee2dd315261a1.tar.gz |
topic api docs
-rw-r--r-- | ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml | 3 | ||||
-rw-r--r-- | ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md | 299 |
2 files changed, 301 insertions, 1 deletions
diff --git a/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml b/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml index 71e3f7eb00..1b8a4e8d07 100644 --- a/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml +++ b/ydb/docs/ru/core/reference/ydb-sdk/toc_i.yaml @@ -5,6 +5,8 @@ items: href: install.md - name: Аутентификация href: auth.md +# - name: Работа с топиками +# href: topic/topic.md - name: Тестовое приложение include: { mode: link, path: example/toc_p.yaml } - name: Обработка ошибок в API @@ -17,4 +19,3 @@ items: href: health-check-api.md - name: Рецепты кода include: { mode: link, path: recipes/toc_p.yaml } - diff --git a/ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md b/ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md new file mode 100644 index 0000000000..c43ed94c79 --- /dev/null +++ b/ydb/docs/ru/core/reference/ydb-sdk/topic/topic.md @@ -0,0 +1,299 @@ +# Работа с топиками +## Основные понятия +### Сообщение +Минимальная, неделимая единица пользовательской информации. Состоит из тела сообщения, свойств сообщения и атрибутов сессии записи. + +### Тело сообщения +Произвольный набор байт, YDB никак не интерпретирует это содержимое. + +### Свойства сообщения +Типизированные поля сообщения, приходящие вне основного тела сообщения и имеющие предопределённый смысл. + +#### codec +Способ, которым закодировано сообщение, обычно указывается алгоритм сжатия, который был использован. SDK будет применять соответствующий алгоритм для расжатия сообщения перед тем как отдать его в клиентский код. + +#### created_at +Время создания сообщения, указывается отправителем, передаётся читателю "как есть", без проверки на стороне сервера + +#### message_group_id +Задаётся отправителем опционально, используется для разбиения сообщений по партициям. + +#### offset +Порядковый номер сообщения внутри партиции, присваивается сервером при сохранении сообщения. У первого сообщения партиции оффсет равен 0, дальше возрастает. В offset-ах могут быть пропуски. + +#### uncompressed_size +Размер расжатого сообщения, задаётся отправителем и передаётся читателю "как есть", без проверки на стороне сервера. + +#### seq_no +Порядковый номер сообщения внутри одного ProducerID. Задаётся писателем сообщения перед отправкой на сервер. +Должен идти по позрастанию внутри ProducerID. + +#### producer_id +ID, задаваемый отправителем. В пределах партиции для каждого producer_id гарантируется возрастание seq_no. + +#### written_at +Время записи сообщения на сервер, задаётся сервером при сохранении сообщения. + +#### write_session_meta +Набор строковых атрибутов ключ/значение, задаваемых отправителем при старте сессии записи. Атрибуты сессии будут одинаковые для всех сообщений, записанных внутри одной сессии. + +### Коммит сообщения +Подтверждение факта обработки сообщения читателем. Означает что читатель обработал сообщение и более в нём не нуждается. Коммиты сообщений независимы для разных consumer-ов. + +### Топик +Именованный набор сообщений. Чтение и запись сообщений ведётся через топики. + +### Партиция +Единица масштабирования топика. Партиции внутри топика пронумерованы, начиная с 0. В конечном итоге сообщения сохраняются в партиции. Сообщения внутри партиции упорядочены и пронумерованы. + +### Читатель +Именованная сущность для чтения данных из топика. Читатель содержит подтверждённые позиции чтения, сохраняемые на стороне сервера. + +### Важный читатель +Читатель, обладающий признаком "важный". Наличие этого признака означает что сообщение не будет удаляться из топика до подтверждения обработки важным читателем, даже если его уже пора удалять по правилам ротации. Долгий простой важного читателя может привести к полному исчерпанию места на диске. + +## Гарантии +В общем случае гарантируется доставка сообщений минимум один раз (least once). + +### Запись сообщений +1. После подтверждения записи сервером сообщение считается надёжно сохранённым и будет доставлено получателям +2. Сообщения с одинаковым message_group_id попадают в одну партицию. +3. message_group_id и партиция не заданы явно - сообщения с одинаковыми producer_id попадут в одну партицию. +4. При записи сообщения в партиции сохраняется их порядок внутри одного producer_id. +5. Если при записи сообщения в партицию seq_no оказывается меньше или равен seq_no ранее подтверждённого сообщения для того же producer_id - сообщение пропускается и не записывается. + +### Чтение сообщений +1. Из каждой партиции сообщения приходят упорядоченными по возрастанию offset +2. Из каждой партиции сообщения приходят с возрастающим seq_no в рамках одного producer_id +3. После подтверждение сервером коммита сообщения оно больше не будет отправляться этому consumer-у + +## Работа с топиками из SDK +### Подключение к топику +Для чтения сообщения из топика нужно подключиться к базе TODO:ССЫЛКА и подписаться на топик + +{% list tabs %} + +- Go + + ```go + reader, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd")) + if err != nil { + return err + } + ``` + +{% endlist %} + +При необходимости читать сообщения из нескольких топиков или задать более точные опции чтения можно использовать расширенный вариант создания читателя + +{% list tabs %} + +- Go + + ```go + reader, err := db.Topic().StartReader("consumer", []topicoptions.ReadSelector{ + { + Path: "test", + }, + { + Path: "test-2", + Partitions: []int64{1, 2, 3}, + ReadFrom: time.Date(2022, 7, 1, 10, 15, 0, 0, time.UTC), + }, + }, + ) + if err != nil { + return err + } + ``` + +{% endlist %} + + +### Чтение сообщений + +Порядок сообщений гарантируется сервером внутри одной партиции. Те же гарантии распространяются и на SDK: сообщения из одной партиции будет упорядочены между собой. При этом сообщения из разных партиций могут приходить не в том порядке как они были записаны на сервер и не в том порядке как сервер отдал их клиенту. При этом исходные гарантии об упорядоченности сообщений внутри партиции сохраняются на всём пути сообщения - от сохранения в партицию до передачи в клиентский код. + +Чтение сообщений по одному: + +{% list tabs %} + +- Go + + ```go + func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { + for { + mess, err := r.ReadMessage(ctx) + if err != nil { + return err + } + processMessage(mess) + } + } + ``` + +{% endlist %} + + +При групповой обработке сообщений удобнее получать их пачками - в этом случае все сообщения внутри пачки будут из одной партиции. + +{% list tabs %} + +- Go + + ```go + func ReadBatchesWithBatchCommit(ctx context.Context, r *topicreader.Reader) error { + for { + batch, err := r.ReadMessageBatch(ctx) + if err != nil { + return err + } + processBatch(batch) + } + } + ``` + +{% endlist %} + +### Подтверждение обработки сообщений (commit) +Сервер может сохранять на своей стороне позицию обработанных сообщений - для этого нужно отправлять на сервер подтверждения обработки. Это опциональная возможность и часто она позволяет сделать код проще. + +Обработку сообщений можно подтверждать по одному: + +{% list tabs %} + +- Go + + ```go + func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { + for { + ... + r.Commit(mess.Context(), mess) + } + } + ``` + +{% endlist %} + +и пачками + +{% list tabs %} + +- Go + + ```go + func SimpleReadMessages(ctx context.Context, r *topicreader.Reader) error { + for { + ... + r.Commit(batch.Context(), batch) + } + } + ``` + +{% endlist %} + +### Работа без подтверждения обработки +При необходимости читать сообщения без сохранения прогресса в топике - нужно сохранять его на своей стороне и обрабатывать служебные сообщения о начале чтения партиций - чтобы сообщать серверу с какого момента ему продолжать передачу. Без такой обработки сервер будет каждый раз отправлять все имеющиеся сообщения. + +{% list tabs %} + +- Go + + ```go + func ReadWithExplicitPartitionStartStopHandlerAndOwnReadProgressStorage(ctx context.Context, db ydb.Connection) error { + readContext, stopReader := context.WithCancel(context.Background()) + defer stopReader() + + readStartPosition := func( + ctx context.Context, + req topicoptions.GetPartitionStartOffsetRequest, + ) (res topicoptions.GetPartitionStartOffsetResponse, err error) { + offset, err := readLastOffsetFromDB(ctx, req.Topic, req.PartitionID) + res.StartFrom(offset) + + // Reader will stop if return err != nil + return res, err + } + + r, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("asd"), + topicoptions.WithGetPartitionStartOffset(readStartPosition), + ) + if err != nil { + return err + } + + go func() { + <-readContext.Done() + _ = r.Close(ctx) + }() + + for { + batch, err := r.ReadMessageBatch(readContext) + if err != nil { + return err + } + + processBatch(batch) + _ = externalSystemCommit(batch.Context(), batch.Topic(), batch.PartitionID(), batch.EndOffset()) + } + } + ``` + +{% endlist %} + +### Отбор партиции +В YDB используется серверная балансировка партиций между подключившимися клиентами, поэтому сервер по своей инициативе может прекратить отправку сообщений клиенту из некоторых партиций. В этом случае клиент должен завершить обработку полученных сообщений. + +У сервера есть два способа забрать партицию: мягкий (с предварительным уведомлением) и жёсткий (сообщение что с партицией работать уже нельзя). + +Мягкий вариант уведомления означает что сервер уже закончил отправку сообщений из этой партиции и больше сообщений сюда отправлять не будет, при этом у клиента ещё есть время закончить обработку полученных сообщений. + +Обработка мягкого отбора партиции. +{% list tabs %} + +- Go + + В основном API SDK отдельного уведомления о мягком отборе партиции нет. + Внутри SDK обрабатывает сигнал таким образом, что сразу отдаёт пользователю оставшиеся в буфере сообщения даже если + по настройкам нужно собрать пачку побольше + + ```go + r, _ := db.Topic().StartReader("consumer", nil, + topicoptions.WithBatchReadMinCount(1000), + ) + + for { + batch, _ := r.ReadMessageBatch(ctx) // <- if partition soft stop batch can be less, then 1000 + processBatch(batch) + _ = r.Commit(batch.Context(), batch) + } + + ``` +{% endlist %} + +Жесткий вариант означает что клиент должен прекратить обработку полученных сообщений, т.к. все неподтверждённые собщения будут переданы другому читателю. + +Обработка жёсткого отбора партиции. +{% list tabs %} + +- Go + + У каждого сообщения (и на пачке если сообщения читаются пачками) есть контекст сообщения. Если партиция отобрана - у пачек и сообщений + из этой партиции контекст будет отменён. + + ```go + ctx := batch.Context() // batch.Context() will cancel if partition revoke by server or connection broke + if len(batch.Messages) == 0 { + return + } + + buf := &bytes.Buffer{} + for _, mess := range batch.Messages { + buf.Reset() + _, _ = buf.ReadFrom(mess) + _, _ = io.Copy(buf, mess) + writeMessagesToDB(ctx, buf.Bytes()) + } + ``` + +{% endlist %} |