diff options
author | Dmitry Kardymon <kardymon-d@ydb.tech> | 2024-10-03 17:24:57 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-03 17:24:57 +0300 |
commit | 1faa700fdac58d295c2b813ffbd47b9f7cadf2d6 (patch) | |
tree | 35308cd4cf99d7d4be8b5140d44bd0a9e01a5102 | |
parent | d9828e07a8e675ecf0a1a36055ab50c5cb86b50b (diff) | |
download | ydb-1faa700fdac58d295c2b813ffbd47b9f7cadf2d6.tar.gz |
Use topic api in pq_async_io tests (#6156)
-rw-r--r-- | ydb/tests/fq/pq_async_io/ut_helpers.cpp | 43 | ||||
-rw-r--r-- | ydb/tests/fq/pq_async_io/ut_helpers.h | 3 | ||||
-rw-r--r-- | ydb/tests/fq/pq_async_io/ya.make | 3 |
3 files changed, 22 insertions, 27 deletions
diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.cpp b/ydb/tests/fq/pq_async_io/ut_helpers.cpp index f5aaf34f68..a26d2b367b 100644 --- a/ydb/tests/fq/pq_async_io/ut_helpers.cpp +++ b/ydb/tests/fq/pq_async_io/ut_helpers.cpp @@ -148,12 +148,12 @@ void PQWrite( cfg.SetDatabase(GetDefaultPqDatabase()); cfg.SetLog(CreateLogBackend("cerr")); NYdb::TDriver driver(cfg); - NYdb::NPersQueue::TPersQueueClient client(driver); - NYdb::NPersQueue::TWriteSessionSettings sessionSettings; + NYdb::NTopic::TTopicClient client(driver); + NYdb::NTopic::TWriteSessionSettings sessionSettings; sessionSettings .Path(topic) .MessageGroupId("src_id") - .Codec(NYdb::NPersQueue::ECodec::RAW); + .Codec(NYdb::NTopic::ECodec::RAW); auto session = client.CreateSimpleBlockingWriteSession(sessionSettings); for (const TString& data : sequence) { UNIT_ASSERT_C(session->Write(data), "Failed to write message with body \"" << data << "\" to topic " << topic); @@ -175,17 +175,16 @@ std::vector<TString> PQReadUntil( cfg.SetDatabase(GetDefaultPqDatabase()); cfg.SetLog(CreateLogBackend("cerr")); NYdb::TDriver driver(cfg); - NYdb::NPersQueue::TPersQueueClient client(driver); - NYdb::NPersQueue::TReadSessionSettings sessionSettings; + NYdb::NTopic::TTopicClient client(driver); + NYdb::NTopic::TReadSessionSettings sessionSettings; sessionSettings .AppendTopics(topic) - .ConsumerName(DefaultPqConsumer) - .DisableClusterDiscovery(true); + .ConsumerName(DefaultPqConsumer); auto promise = NThreading::NewPromise(); std::vector<TString> result; - sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& ev) { + sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) { for (const auto& message : ev.GetMessages()) { result.emplace_back(message.GetData()); } @@ -194,7 +193,7 @@ std::vector<TString> PQReadUntil( } }, false, false); - std::shared_ptr<NYdb::NPersQueue::IReadSession> session = client.CreateReadSession(sessionSettings); + std::shared_ptr<NYdb::NTopic::IReadSession> session = client.CreateReadSession(sessionSettings); UNIT_ASSERT(promise.GetFuture().Wait(timeout)); session->Close(TDuration::Zero()); session = nullptr; @@ -224,20 +223,18 @@ void PQCreateStream(const TString& streamName) } void AddReadRule(NYdb::TDriver& driver, const TString& streamName) { - NYdb::NPersQueue::TPersQueueClient client(driver); - - auto result = client.AddReadRule( - streamName, - NYdb::NPersQueue::TAddReadRuleSettings() - .ReadRule( - NYdb::NPersQueue::TReadRuleSettings() - .ConsumerName(DefaultPqConsumer) - .ServiceType("yandex-query") - .SupportedCodecs({ - NYdb::NPersQueue::ECodec::RAW - }) - ) - ).ExtractValueSync(); + NYdb::NTopic::TTopicClient client(driver); + + auto alterTopicSettings = + NYdb::NTopic::TAlterTopicSettings() + .BeginAddConsumer(DefaultPqConsumer) + .SetSupportedCodecs( + { + NYdb::NTopic::ECodec::RAW + }) + .EndAddConsumer(); + auto result = client.AlterTopic(streamName, alterTopicSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); } diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.h b/ydb/tests/fq/pq_async_io/ut_helpers.h index 6e9f92007d..1fc4653a04 100644 --- a/ydb/tests/fq/pq_async_io/ut_helpers.h +++ b/ydb/tests/fq/pq_async_io/ut_helpers.h @@ -9,8 +9,7 @@ #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/minikql/mkql_alloc.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> -#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <ydb/core/testlib/basics/runtime.h> #include <library/cpp/testing/unittest/registar.h> diff --git a/ydb/tests/fq/pq_async_io/ya.make b/ydb/tests/fq/pq_async_io/ya.make index c27e93ce4c..82f425209b 100644 --- a/ydb/tests/fq/pq_async_io/ya.make +++ b/ydb/tests/fq/pq_async_io/ya.make @@ -7,8 +7,7 @@ SRCS( PEERDIR( ydb/library/yql/minikql/computation/llvm14 ydb/library/yql/providers/common/ut_helpers - ydb/public/sdk/cpp/client/ydb_datastreams - ydb/public/sdk/cpp/client/ydb_persqueue_public + ydb/public/sdk/cpp/client/ydb_topic ) YQL_LAST_ABI_VERSION() |