diff options
| author | yumkam <[email protected]> | 2025-04-16 14:53:37 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-04-16 14:53:37 +0300 |
| commit | b780fb0d60ec283a64188c583762c483f9262b2d (patch) | |
| tree | b9b01fc1732d560227be8a5c2e2cf09adaf6d39e | |
| parent | 55d03e28beb1c91add84d0c5380e91bf0983a63d (diff) | |
pq: support write to logbroker federation (#16796)
4 files changed, 33 insertions, 12 deletions
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index f3d7ddd2947..73b010c4be6 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -14,6 +14,7 @@ #include <yql/essentials/utils/yql_panic.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/federated_topic/federated_topic.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/credentials/credentials.h> #include <ydb/library/actors/core/actor.h> @@ -317,19 +318,21 @@ private: : NYdb::NTopic::ECodec::GZIP); } - ITopicClient& GetTopicClient() { - if (!TopicClient) { - TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings()); + IFederatedTopicClient& GetFederatedTopicClient() { + if (!FederatedTopicClient) { + FederatedTopicClient = PqGateway->GetFederatedTopicClient(Driver, GetFederatedTopicClientSettings()); } - return *TopicClient; + return *FederatedTopicClient; } - NYdb::NTopic::TTopicClientSettings GetTopicClientSettings() { - return PqGateway->GetTopicClientSettings() - .Database(SinkParams.GetDatabase()) + NYdb::NFederatedTopic::TFederatedTopicClientSettings GetFederatedTopicClientSettings() const { + NYdb::NFederatedTopic::TFederatedTopicClientSettings opts = PqGateway->GetFederatedTopicClientSettings(); + opts.Database(SinkParams.GetDatabase()) .DiscoveryEndpoint(SinkParams.GetEndpoint()) .SslCredentials(NYdb::TSslCredentials(SinkParams.GetUseSsl())) .CredentialsProviderFactory(CredentialsProviderFactory); + + return opts; } static i64 GetItemSize(const TString& item) { @@ -338,7 +341,7 @@ private: void CreateSessionIfNotExists() { if (!WriteSession) { - WriteSession = GetTopicClient().CreateWriteSession(GetWriteSessionSettings()); + WriteSession = GetFederatedTopicClient().CreateWriteSession(GetWriteSessionSettings()); SubscribeOnNextEvent(); } } @@ -496,7 +499,7 @@ private: i64 FreeSpace = 0; bool Finished = false; - ITopicClient::TPtr TopicClient; + IFederatedTopicClient::TPtr FederatedTopicClient; std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession; TString SourceId; ui64 NextSeqNo = 1; diff --git a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp index 0999db7a10b..ad56a10a580 100644 --- a/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp +++ b/ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp @@ -9,8 +9,10 @@ namespace NYql { struct TDummyFederatedTopicClient : public IFederatedTopicClient { - TDummyFederatedTopicClient(const NYdb::NTopic::TFederatedTopicClientSettings& settings = {}): - FederatedClientSettings_(settings) {} + using TClusterNPath = TDummyPqGateway::TClusterNPath; + TDummyFederatedTopicClient(const NYdb::NTopic::TFederatedTopicClientSettings& settings = {}, const THashMap<TClusterNPath, TDummyTopic>& topics = {}) + : Topics_(topics) + , FederatedClientSettings_(settings) {} NThreading::TFuture<std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo>> GetAllTopicClusters() override { std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo> dbInfo; @@ -21,8 +23,17 @@ struct TDummyFederatedTopicClient : public IFederatedTopicClient { NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo::EStatus::AVAILABLE); return NThreading::MakeFuture(std::move(dbInfo)); } + + std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& settings) override { + if (!FileTopicClient_) { + FileTopicClient_ = MakeIntrusive<TFileTopicClient>(std::move(Topics_)); + } + return FileTopicClient_->CreateWriteSession(settings); + } private: + THashMap<TClusterNPath, TDummyTopic> Topics_; NYdb::NFederatedTopic::TFederatedTopicClientSettings FederatedClientSettings_; + TFileTopicClient::TPtr FileTopicClient_; }; NThreading::TFuture<void> TDummyPqGateway::OpenSession(const TString& sessionId, const TString& username) { @@ -103,7 +114,7 @@ ITopicClient::TPtr TDummyPqGateway::GetTopicClient(const NYdb::TDriver&, const N } IFederatedTopicClient::TPtr TDummyPqGateway::GetFederatedTopicClient(const NYdb::TDriver&, const NYdb::NFederatedTopic::TFederatedTopicClientSettings& settings) { - return MakeIntrusive<TDummyFederatedTopicClient>(settings); + return MakeIntrusive<TDummyFederatedTopicClient>(settings, Topics); } NYdb::NFederatedTopic::TFederatedTopicClientSettings TDummyPqGateway::GetFederatedTopicClientSettings() const { return {}; diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h b/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h index ed4af81a40f..e2e21c280b9 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h +++ b/ydb/library/yql/providers/pq/provider/yql_pq_topic_client.h @@ -37,6 +37,7 @@ public: using TPtr = TIntrusivePtr<IFederatedTopicClient>; virtual NThreading::TFuture<std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo>> GetAllTopicClusters() = 0; + virtual std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& settings) = 0; }; class TNativeTopicClient : public ITopicClient { @@ -103,6 +104,9 @@ public: NThreading::TFuture<std::vector<NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo>> GetAllTopicClusters() override { return FederatedClient_.GetAllClusterInfo(); } + std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& settings) override { + return FederatedClient_.CreateWriteSession(settings); + } ~TNativeFederatedTopicClient() {} private: diff --git a/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp b/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp index 9cbec0779de..3a61a6bfd67 100644 --- a/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp +++ b/ydb/tests/fq/pq_async_io/mock_pq_gateway.cpp @@ -120,6 +120,9 @@ class TMockPqGateway : public IMockPqGateway { NYdb::NFederatedTopic::TFederatedTopicClient::TClusterInfo::EStatus::AVAILABLE); return NThreading::MakeFuture(std::move(dbInfo)); } + std::shared_ptr<NYdb::NTopic::IWriteSession> CreateWriteSession(const NYdb::NFederatedTopic::TFederatedWriteSessionSettings& /*settings*/) override { + return nullptr; + } }; public: |
