aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <kardymon-d@ydb.tech>2024-10-03 17:24:57 +0300
committerGitHub <noreply@github.com>2024-10-03 17:24:57 +0300
commit1faa700fdac58d295c2b813ffbd47b9f7cadf2d6 (patch)
tree35308cd4cf99d7d4be8b5140d44bd0a9e01a5102
parentd9828e07a8e675ecf0a1a36055ab50c5cb86b50b (diff)
downloadydb-1faa700fdac58d295c2b813ffbd47b9f7cadf2d6.tar.gz
Use topic api in pq_async_io tests (#6156)
-rw-r--r--ydb/tests/fq/pq_async_io/ut_helpers.cpp43
-rw-r--r--ydb/tests/fq/pq_async_io/ut_helpers.h3
-rw-r--r--ydb/tests/fq/pq_async_io/ya.make3
3 files changed, 22 insertions, 27 deletions
diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.cpp b/ydb/tests/fq/pq_async_io/ut_helpers.cpp
index f5aaf34f68..a26d2b367b 100644
--- a/ydb/tests/fq/pq_async_io/ut_helpers.cpp
+++ b/ydb/tests/fq/pq_async_io/ut_helpers.cpp
@@ -148,12 +148,12 @@ void PQWrite(
cfg.SetDatabase(GetDefaultPqDatabase());
cfg.SetLog(CreateLogBackend("cerr"));
NYdb::TDriver driver(cfg);
- NYdb::NPersQueue::TPersQueueClient client(driver);
- NYdb::NPersQueue::TWriteSessionSettings sessionSettings;
+ NYdb::NTopic::TTopicClient client(driver);
+ NYdb::NTopic::TWriteSessionSettings sessionSettings;
sessionSettings
.Path(topic)
.MessageGroupId("src_id")
- .Codec(NYdb::NPersQueue::ECodec::RAW);
+ .Codec(NYdb::NTopic::ECodec::RAW);
auto session = client.CreateSimpleBlockingWriteSession(sessionSettings);
for (const TString& data : sequence) {
UNIT_ASSERT_C(session->Write(data), "Failed to write message with body \"" << data << "\" to topic " << topic);
@@ -175,17 +175,16 @@ std::vector<TString> PQReadUntil(
cfg.SetDatabase(GetDefaultPqDatabase());
cfg.SetLog(CreateLogBackend("cerr"));
NYdb::TDriver driver(cfg);
- NYdb::NPersQueue::TPersQueueClient client(driver);
- NYdb::NPersQueue::TReadSessionSettings sessionSettings;
+ NYdb::NTopic::TTopicClient client(driver);
+ NYdb::NTopic::TReadSessionSettings sessionSettings;
sessionSettings
.AppendTopics(topic)
- .ConsumerName(DefaultPqConsumer)
- .DisableClusterDiscovery(true);
+ .ConsumerName(DefaultPqConsumer);
auto promise = NThreading::NewPromise();
std::vector<TString> result;
- sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& ev) {
+ sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) {
for (const auto& message : ev.GetMessages()) {
result.emplace_back(message.GetData());
}
@@ -194,7 +193,7 @@ std::vector<TString> PQReadUntil(
}
}, false, false);
- std::shared_ptr<NYdb::NPersQueue::IReadSession> session = client.CreateReadSession(sessionSettings);
+ std::shared_ptr<NYdb::NTopic::IReadSession> session = client.CreateReadSession(sessionSettings);
UNIT_ASSERT(promise.GetFuture().Wait(timeout));
session->Close(TDuration::Zero());
session = nullptr;
@@ -224,20 +223,18 @@ void PQCreateStream(const TString& streamName)
}
void AddReadRule(NYdb::TDriver& driver, const TString& streamName) {
- NYdb::NPersQueue::TPersQueueClient client(driver);
-
- auto result = client.AddReadRule(
- streamName,
- NYdb::NPersQueue::TAddReadRuleSettings()
- .ReadRule(
- NYdb::NPersQueue::TReadRuleSettings()
- .ConsumerName(DefaultPqConsumer)
- .ServiceType("yandex-query")
- .SupportedCodecs({
- NYdb::NPersQueue::ECodec::RAW
- })
- )
- ).ExtractValueSync();
+ NYdb::NTopic::TTopicClient client(driver);
+
+ auto alterTopicSettings =
+ NYdb::NTopic::TAlterTopicSettings()
+ .BeginAddConsumer(DefaultPqConsumer)
+ .SetSupportedCodecs(
+ {
+ NYdb::NTopic::ECodec::RAW
+ })
+ .EndAddConsumer();
+ auto result = client.AlterTopic(streamName, alterTopicSettings).ExtractValueSync();
+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
}
diff --git a/ydb/tests/fq/pq_async_io/ut_helpers.h b/ydb/tests/fq/pq_async_io/ut_helpers.h
index 6e9f92007d..1fc4653a04 100644
--- a/ydb/tests/fq/pq_async_io/ut_helpers.h
+++ b/ydb/tests/fq/pq_async_io/ut_helpers.h
@@ -9,8 +9,7 @@
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
-#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/core/testlib/basics/runtime.h>
#include <library/cpp/testing/unittest/registar.h>
diff --git a/ydb/tests/fq/pq_async_io/ya.make b/ydb/tests/fq/pq_async_io/ya.make
index c27e93ce4c..82f425209b 100644
--- a/ydb/tests/fq/pq_async_io/ya.make
+++ b/ydb/tests/fq/pq_async_io/ya.make
@@ -7,8 +7,7 @@ SRCS(
PEERDIR(
ydb/library/yql/minikql/computation/llvm14
ydb/library/yql/providers/common/ut_helpers
- ydb/public/sdk/cpp/client/ydb_datastreams
- ydb/public/sdk/cpp/client/ydb_persqueue_public
+ ydb/public/sdk/cpp/client/ydb_topic
)
YQL_LAST_ABI_VERSION()