summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoryumkam <[email protected]>2025-04-16 14:53:37 +0300
committerGitHub <[email protected]>2025-04-16 14:53:37 +0300
commitb780fb0d60ec283a64188c583762c483f9262b2d (patch)
treeb9b01fc1732d560227be8a5c2e2cf09adaf6d39e
parent55d03e28beb1c91add84d0c5380e91bf0983a63d (diff)
pq: support write to logbroker federation (#16796)
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp21
-rw-r--r--ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp17
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h4
-rw-r--r--ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp3
4 files changed, 33 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index f3d7ddd2947..73b010c4be6 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -14,6 +14,7 @@
#include <yql/essentials/utils/yql_panic.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h>
+#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h>
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h>
#include <ydb/library/actors/core/actor.h>
@@ -317,19 +318,21 @@ private:
: NYdb::NTopic::ECodec::GZIP);
}
- ITopicClient& GetTopicClient() {
- if (!TopicClient) {
- TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings());
+ IFederatedTopicClient& GetFederatedTopicClient() {
+ if (!FederatedTopicClient) {
+ FederatedTopicClient = PqGateway->GetFederatedTopicClient(Driver, GetFederatedTopicClientSettings());
}
- return *TopicClient;
+ return *FederatedTopicClient;
}
- NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() {
- return PqGateway->GetTopicClientSettings()
- .Database(SinkParams.GetDatabase())
+ NYdb::NFederatedTopic::TFederatedTopicClientSettings GetFederatedTopicClientSettings() const {
+ NYdb::NFederatedTopic::TFederatedTopicClientSettings opts = PqGateway->GetFederatedTopicClientSettings();
+ opts.Database(SinkParams.GetDatabase())
.DiscoveryEndpoint(SinkParams.GetEndpoint())
.SslCredentials(NYdb::TSslCredentials(SinkParams.GetUseSsl()))
.CredentialsProviderFactory(CredentialsProviderFactory);
+
+ return opts;
}
static i64 GetItemSize(const TString& item) {
@@ -338,7 +341,7 @@ private:
void CreateSessionIfNotExists() {
if (!WriteSession) {
- WriteSession = GetTopicClient().CreateWriteSession(GetWriteSessionSettings());
+ WriteSession = GetFederatedTopicClient().CreateWriteSession(GetWriteSessionSettings());
SubscribeOnNextEvent();
}
}
@@ -496,7 +499,7 @@ private:
i64 FreeSpace = 0;
bool Finished = false;
- ITopicClient::TPtr TopicClient;
+ IFederatedTopicClient::TPtr FederatedTopicClient;
std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession;
TString SourceId;
ui64 NextSeqNo = 1;
diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
index 0999db7a10b..ad56a10a580 100644
--- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
+++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp
@@ -9,8 +9,10 @@
namespace NYql {
struct TDummyFederatedTopicClient : public IFederatedTopicClient {
- TDummyFederatedTopicClient(const NYdb::NTopic::TFederatedTopicClientSettings& settings = {}):
- FederatedClientSettings_(settings) {}
+ using TClusterNPath = TDummyPqGateway::TClusterNPath;
+ TDummyFederatedTopicClient(const NYdb::NTopic::TFederatedTopicClientSettings& settings = {}, const THashMap<TClusterNPath, TDummyTopic>& topics = {})
+ : Topics_(topics)
+ , FederatedClientSettings_(settings) {}
NThreading::TFuture<std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo>> GetAllTopicClusters() override {
std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo> dbInfo;
@@ -21,8 +23,17 @@ struct TDummyFederatedTopicClient : public IFederatedTopicClient {
NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo::EStatus::AVAILABLE);
return NThreading::MakeFuture(std::move(dbInfo));
}
+
+ std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& settings) override {
+ if (!FileTopicClient_) {
+ FileTopicClient_ = MakeIntrusive<TFileTopicClient>(std::move(Topics_));
+ }
+ return FileTopicClient_->CreateWriteSession(settings);
+ }
private:
+ THashMap<TClusterNPath, TDummyTopic> Topics_;
NYdb::NFederatedTopic::TFederatedTopicClientSettings FederatedClientSettings_;
+ TFileTopicClient::TPtr FileTopicClient_;
};
NThreading::TFuture<void> TDummyPqGateway::OpenSession(const TString& sessionId, const TString& username) {
@@ -103,7 +114,7 @@ ITopicClient::TPtr TDummyPqGateway::GetTopicClient(const NYdb::TDriver&, const N
}
IFederatedTopicClient::TPtr TDummyPqGateway::GetFederatedTopicClient(const NYdb::TDriver&, const NYdb::NFederatedTopic::TFederatedTopicClientSettings& settings) {
- return MakeIntrusive<TDummyFederatedTopicClient>(settings);
+ return MakeIntrusive<TDummyFederatedTopicClient>(settings, Topics);
}
NYdb::NFederatedTopic::TFederatedTopicClientSettings TDummyPqGateway::GetFederatedTopicClientSettings() const {
return {};
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h b/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h
index ed4af81a40f..e2e21c280b9 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h
@@ -37,6 +37,7 @@ public:
using TPtr = TIntrusivePtr<IFederatedTopicClient>;
virtual NThreading::TFuture<std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo>> GetAllTopicClusters() = 0;
+ virtual std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& settings) = 0;
};
class TNativeTopicClient : public ITopicClient {
@@ -103,6 +104,9 @@ public:
NThreading::TFuture<std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo>> GetAllTopicClusters() override {
return FederatedClient_.GetAllClusterInfo();
}
+ std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& settings) override {
+ return FederatedClient_.CreateWriteSession(settings);
+ }
~TNativeFederatedTopicClient() {}
private:
diff --git a/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp b/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp
index 9cbec0779de..3a61a6bfd67 100644
--- a/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp
+++ b/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp
@@ -120,6 +120,9 @@ class TMockPqGateway : public IMockPqGateway {
NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo::EStatus::AVAILABLE);
return NThreading::MakeFuture(std::move(dbInfo));
}
+ std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& /*settings*/) override {
+ return nullptr;
+ }
};
public: