diff options
author | alextarazanov <alextarazanov@yandex-team.com> | 2022-10-11 09:57:00 +0300 |
---|---|---|
committer | alextarazanov <alextarazanov@yandex-team.com> | 2022-10-11 09:57:00 +0300 |
commit | 60c4d2a23c79eb93eaf3c5f230a48c8861f84d2b (patch) | |
tree | 4fd8f31edabbb220d72b27df498bc580d51b80d2 | |
parent | f970a39bdd247dd49a1204c59ba07966d2886905 (diff) | |
download | ydb-60c4d2a23c79eb93eaf3c5f230a48c8861f84d2b.tar.gz |
[review] [YDB] Topic Go SDK Check translate
-rw-r--r-- | ydb/docs/en/core/reference/ydb-sdk/topic.md | 191 |
1 files changed, 183 insertions, 8 deletions
diff --git a/ydb/docs/en/core/reference/ydb-sdk/topic.md b/ydb/docs/en/core/reference/ydb-sdk/topic.md index 2019e813e65..7f5d6428201 100644 --- a/ydb/docs/en/core/reference/ydb-sdk/topic.md +++ b/ydb/docs/en/core/reference/ydb-sdk/topic.md @@ -4,7 +4,182 @@ This article provides examples of how to use the {{ ydb-short-name }} SDK to wor Before performing the examples, [create a topic](../ydb-cli/topic-create.md) and [add a consumer](../ydb-cli/topic-consumer-add.md). -## Connecting to a topic {#start-reader} +## Managing topics +### Creating a topic + +{% list tabs %} + +The only mandatory parameter for creating a topic is its path, other parameters are optional. + +- Go + + For a full list of supported parameters, see the [SDK documentation](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#CreateOption). + + Example of creating a topic with a list of supported codecs and a minimum number of partitions + ```go + err := db.Topic().Create(ctx, "topic-path", + // optional + topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip), + + // optional + topicoptions.CreateWithMinActivePartitions(3), + ) + ``` + +{% endlist %} + +### Updating a topic + +When you update a topic, you must specify the topic path and the parameters to be changed. + +{% list tabs %} +- Go + + For a full list of supported parameters, see the [SDK documentation](https://pkg.go.dev/github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions#AlterOption). + + Example of adding a consumer to a topic + + ```go + err := db.Topic().Alter(ctx, "topic-path", + topicoptions.AlterWithAddConsumers(topictypes.Consumer{ + Name: "new-consumer", + SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional + }), + ) + ``` +{% endlist %} + +### Getting topic information + +{% list tabs %} + +- Go + + ```go + descResult, err := db.Topic().Describe(ctx, "topic-path") + if err != nil { + log.Fatalf("failed drop topic: %v", err) + return + } + fmt.Printf("describe: %#v\n", descResult) + ``` + +{% endlist %} + +### Deleting a topic + +To delete a topic, just specify the path to it. + +{% list tabs %} + +- Go + + ```go + err := db.Topic().Drop(ctx, "topic-path") + ``` + +{% endlist %} + + +## Message writes +### Connecting to a topic for message writes {#start-writer} + +Only connections with matching producer_id and message_group_id are currently supported. This restriction will be removed in the future. + +{% list tabs %} + +- Go + + ```go + producerAndGroupID := "group-id" + writer, err := db.Topic().StartWriter(producerAndGroupID, "topicName", + topicoptions.WithMessageGroupID(producerAndGroupID), + ) + if err != nil { + return err + } + ``` + +{% endlist %} + + +### Asynchronous message writes + +{% list tabs %} + +- Go + + To send a message, just save Reader in the Data field, from which the data can be read. You can expect the data of each message to be read once (or until the first error). By the time you return the data from Write, it will already have been read and stored in the internal buffer. + + By default, SeqNo and the message creation date are set automatically. + + By default, Write is performed asynchronously: data from messages is processed and stored in the internal buffer, sending is done in the background. Writer reconnects to the YDB if the connection fails and resends the message if possible. If an error that cannot be repeated is received , Writer stops and subsequent Write calls will end with an error. + + ```go + err := writer.Write(ctx, + topicwriter.Message{Data: strings.NewReader("1")}, + topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})}, + topicwriter.Message{Data: strings.NewReader("3")}, + ) + if err == nil { + return err + } + ``` + +{% endlist %} + + +### Message writes with storage confirmation on the server + +{% list tabs %} + +- Go + + When connected, you can specify the synchronous message write option: topicoptions.WithSyncWrite(true). Then Write will only return after receiving a confirmation from the server that all messages passed in the call have been saved. If necessary, the SDK will reconnect and retry sending messages as usual. In this mode, the context only controls the response time from the SDK, meaning the SDK will continue trying to send messages even after the context is canceled. + + ```go + + producerAndGroupID := "group-id" + writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName", + topicoptions.WithMessageGroupID(producerAndGroupID), + topicoptions.WithSyncWrite(true), + ) + + err = writer.Write(ctx, + topicwriter.Message{Data: strings.NewReader("1")}, + topicwriter.Message{Data: bytes.NewReader([]byte{1,2,3})}, + topicwriter.Message{Data: strings.NewReader("3")}, + ) + if err == nil { + return err + } + ``` + +{% endlist %} + +### Selecting a codec for message compression + +{% list tabs %} + +- Go + + By default, the SDK selects the codec automatically (subject to topic settings). In automatic mode, the SDK will first send one group of messages with each of the allowed codecs, then sometimes try to compress messages with all available codecs, and select the codec that provides the smallest message size. If the list of allowed codecs for the topic is empty, auto-select is made between Raw and Gzip codecs. + If necessary, a fixed codec can be set in the connection options. It will then be used and no measurements will be taken. + + ```go + producerAndGroupID := "group-id" + writer, _ := db.Topic().StartWriter(producerAndGroupID, "topicName", + topicoptions.WithMessageGroupID(producerAndGroupID), + topicoptions.WithCodec(topictypes.CodecGzip), + ) + ``` + +{% endlist %} + + + +## Message reads +### Connecting to a topic for message reads {#start-reader} To create a connection to the existing `my-topic` topic via the added `my-consumer` consumer, use the following code: @@ -45,7 +220,7 @@ You can also use the advanced connection creation option to specify multiple top {% endlist %} -## Reading messages {#reading-messages} +### Reading messages {#reading-messages} The server stores the [consumer offset](../../concepts/topic.md#consumer-offset). After reading a message, the client can [send a commit to the server](#commit). The consumer offset will change and only uncommitted messages will be read in case of a new connection. @@ -97,7 +272,7 @@ To read message batches, use the following code: {% endlist %} -### Reading with a commit {#commit} +#### Reading with a commit {#commit} To commit messages one by one, use the following code: @@ -112,7 +287,7 @@ To commit messages one by one, use the following code: if err != nil { return err } - processMessage(mess) + processMessage(mess) r.Commit(mess.Context(), mess) } } @@ -141,7 +316,7 @@ To commit message batches, use the following code: {% endlist %} -### Reading with consumer offset storage on the client side {#client-commit} +#### Reading with consumer offset storage on the client side {#client-commit} When reading starts, the client code must transmit the starting consumer offset to the server: @@ -191,7 +366,7 @@ When reading starts, the client code must transmit the starting consumer offset {% endlist %} -## Processing a server read interrupt {#stop} +### Processing a server read interrupt {#stop} {{ ydb-short-name }} uses server-based partition balancing between clients. This means that the server can interrupt the reading of messages from random partitions. @@ -199,7 +374,7 @@ In case of a _soft interruption_, the client receives a notification that the se In case of a _hard interruption_, the client receives a notification that it is no longer possible to work with partitions. The client must stop processing the read messages. Uncommited messages will be transferred to another consumer. -### Soft reading interruption {#soft-stop} +#### Soft reading interruption {#soft-stop} {% list tabs %} @@ -222,7 +397,7 @@ In case of a _hard interruption_, the client receives a notification that it is {% endlist %} -### Hard reading interruption {#hard-stop} +#### Hard reading interruption {#hard-stop} {% list tabs %} |