diff options
author | ildar-khisambeev <ikhis@ydb.tech> | 2024-01-31 11:52:15 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-31 11:52:15 +0300 |
commit | 94651fc6c13f632932aaa1a57a1c790223452a98 (patch) | |
tree | f89370c25d70e569c68e66a0a0233916ee94b298 | |
parent | 3b1a363cecc7a179a26519f4c3f3964c43183c2b (diff) | |
download | ydb-94651fc6c13f632932aaa1a57a1c790223452a98.tar.gz |
LOGBROKER-8783 propose api (#537)
* Propose read policy settings for fed sdk api
* better api
* implementation
* issues (except logging)
* provide topic origin info
* more issues, add basic test
8 files changed, 528 insertions, 132 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index 121097e39d..534193d35b 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -4,6 +4,10 @@ #include <ydb/public/api/protos/ydb_federation_discovery.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h> + +#include <unordered_set> + namespace NYdb::NFederatedTopic { using NTopic::TPrintable; @@ -16,10 +20,17 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFeder using TPtr = TIntrusivePtr<TFederatedPartitionSession>; public: - TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr<TDbInfo> db) + TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, + std::shared_ptr<TDbInfo> db, + std::shared_ptr<TDbInfo> originDb = nullptr, + TString originPath = "") : PartitionSession(partitionSession) - , Db(std::move(db)) - {} + , ReadSourceDatabase(std::move(db)) + , TopicOriginDatabase(originDb ? std::move(originDb) : ReadSourceDatabase) + , TopicOriginPath(originPath ? std::move(originPath) : PartitionSession->GetTopicPath()) + { + Y_ABORT_UNLESS(ReadSourceDatabase); + } //! Request partition session status. //! Result will come to TPartitionSessionStatusEvent. @@ -39,7 +50,7 @@ public: //! Topic path. const TString& GetTopicPath() const { - return PartitionSession->GetTopicPath(); + return TopicOriginPath; } //! Partition id. @@ -48,34 +59,56 @@ public: } const TString& GetDatabaseName() const { - return Db->name(); + return GetTopicOriginDatabaseName(); } const TString& GetDatabasePath() const { - return Db->path(); + return GetTopicOriginDatabasePath(); } const TString& GetDatabaseId() const { - return Db->id(); + return GetTopicOriginDatabaseId(); + } + + const TString& GetReadSourceDatabaseName() const { + return ReadSourceDatabase->name(); + } + + const TString& GetReadSourceDatabasePath() const { + return ReadSourceDatabase->path(); + } + + const TString& GetReadSourceDatabaseId() const { + return ReadSourceDatabase->id(); + } + + const TString& GetTopicOriginDatabaseName() const { + return TopicOriginDatabase->name(); + } + + const TString& GetTopicOriginDatabasePath() const { + return TopicOriginDatabase->path(); + } + + const TString& GetTopicOriginDatabaseId() const { + return TopicOriginDatabase->id(); } private: NTopic::TPartitionSession::TPtr PartitionSession; - std::shared_ptr<TDbInfo> Db; + std::shared_ptr<TDbInfo> ReadSourceDatabase; + std::shared_ptr<TDbInfo> TopicOriginDatabase; + TString TopicOriginPath; }; //! Events for read session. struct TReadSessionEvent { class TFederatedPartitionSessionAccessor { public: - TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession) + explicit TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession) : FederatedPartitionSession(std::move(partitionSession)) {} - TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db) - : FederatedPartitionSession(MakeIntrusive<TFederatedPartitionSession>(partitionSession, std::move(db))) - {} - inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const { return FederatedPartitionSession; } @@ -88,8 +121,8 @@ struct TReadSessionEvent { struct TFederated : public TFederatedPartitionSessionAccessor, public TEvent, public TPrintable<TFederated<TEvent>> { using TPrintable<TFederated<TEvent>>::DebugString; - TFederated(TEvent event, std::shared_ptr<TDbInfo> db) - : TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db) + TFederated(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) + : TFederatedPartitionSessionAccessor(std::move(federatedPartitionSession)) , TEvent(std::move(event)) {} @@ -109,10 +142,7 @@ struct TReadSessionEvent { using TCompressedMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>; public: - TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db); - - TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, - NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db); + TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession); const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override { ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead"; @@ -177,15 +207,6 @@ struct TReadSessionEvent { TSessionClosedEvent>; }; -template <typename TEvent> -TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, std::shared_ptr<TDbInfo> db) { - return {std::move(event), std::move(db)}; -} - -TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db); - -TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db); - //! Set of offsets to commit. //! Class that could store offsets in order to commit them later. //! This class is not thread safe. @@ -273,7 +294,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { bool GracefulStopAfterCommit; }; - //! Set simple handler with data processing and also //! set other handlers with default behaviour. //! They automatically commit data after processing @@ -290,7 +310,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! commitDataAfterProcessing: automatically commit data after calling of dataHandler. //! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming //! partition session destroy. - TSimpleDataHandlers SimpleDataHandlers_; TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler, @@ -368,18 +387,47 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { //! See description in TFederatedEventHandlers class. FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers); - enum class EReadPolicy { - READ_ALL = 0, - READ_ORIGINAL, - READ_MIRRORED + + //! Read policy settings + + //! Databases to read from. + //! Default (empty) value means reading from all available databases. + //! Adding duplicates or unavailable databases is okay, they will be ignored. + struct TReadOriginalSettings { + //! Add reading from specified database if it's available. + TReadOriginalSettings& AddDatabase(TString database); + + //! Add reading from several specified databases, if available. + TReadOriginalSettings& AddDatabases(std::vector<TString> databases); + + //! Add reading from database(s) with the same location as client. + TReadOriginalSettings& AddLocal(); + + std::unordered_set<TString> Databases; }; - //! Policy for federated reading. - //! - //! READ_ALL: read will be done from all topic instances from all databases. - //! READ_ORIGINAL: - //! READ_MIRRORED: - FLUENT_SETTING_DEFAULT(EReadPolicy, ReadPolicy, EReadPolicy::READ_ALL); + //! Default variant. + //! Read original topics specified in NTopic::TReadSessionSettings::Topics from databases, specified in settings. + //! Discards previously set ReadOriginal and ReadMirrored settings. + TSelf& ReadOriginal(TReadOriginalSettings settings); + + //! Read original and mirrored topics specified in NTopic::TReadSessionSettings::Topics + //! from one specified database. + //! Discards previously set ReadOriginal and ReadMirrored settings. + TSelf& ReadMirrored(TString database); + + bool IsReadMirroredEnabled() { + return ReadMirroredEnabled; + } + + auto GetDatabasesToReadFrom() { + return DatabasesToReadFrom; + } + +private: + // Read policy settings, set via helpers above + bool ReadMirroredEnabled = false; + std::unordered_set<TString> DatabasesToReadFrom; }; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index 7563f2c85a..df0bf1c8d0 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -3,6 +3,10 @@ #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> #include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h> +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h> +#undef INCLUDE_YDB_INTERNAL_H + #include <library/cpp/threading/future/future.h> #include <util/generic/guid.h> @@ -11,25 +15,35 @@ namespace NYdb::NFederatedTopic { NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings); template <typename TEvent, typename TFederatedEvent> -typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db) { +typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db, std::shared_ptr<TEventFederator> federator) { if (outerHandler) { - return [outerHandler, db = std::move(db)](TEvent& ev) { - auto fev = Federate(std::move(ev), db); + return [outerHandler, db = std::move(db), &federator](TEvent& ev) { + auto fev = federator->LocateFederate(ev, std::move(db)); return outerHandler(fev); }; } return {}; } -NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr<TDbInfo>& db) { +NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr<TDbInfo>& db, std::shared_ptr<TEventFederator> federator) { NTopic::TReadSessionSettings SubsessionSettings = settings; SubsessionSettings.EventHandlers_.MaxMessagesBytes(settings.EventHandlers_.MaxMessagesBytes_); SubsessionSettings.EventHandlers_.HandlersExecutor(settings.EventHandlers_.HandlersExecutor_); -#define MAYBE_CONVERT_HANDLER(type, name) \ - SubsessionSettings.EventHandlers_.name( \ - WrapFederatedHandler<NTopic::type, type>(settings.FederatedEventHandlers_.name##_, db) \ - ); + if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) { + SubsessionSettings.EventHandlers_.SimpleDataHandlers( + WrapFederatedHandler<NTopic::TReadSessionEvent::TDataReceivedEvent, TReadSessionEvent::TDataReceivedEvent>( + settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db, federator), + settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing, + settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit); + } + +#define MAYBE_CONVERT_HANDLER(type, name) \ + if (settings.FederatedEventHandlers_.name##_) { \ + SubsessionSettings.EventHandlers_.name( \ + WrapFederatedHandler<NTopic::type, type>(settings.FederatedEventHandlers_.name##_, db, federator) \ + ); \ + } MAYBE_CONVERT_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler); MAYBE_CONVERT_HANDLER(TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler); @@ -43,31 +57,29 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& SubsessionSettings.EventHandlers_.SessionClosedHandler(settings.FederatedEventHandlers_.SessionClosedHandler_); - if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) { - SubsessionSettings.EventHandlers_.SimpleDataHandlers( - WrapFederatedHandler<NTopic::TReadSessionEvent::TDataReceivedEvent, TReadSessionEvent::TDataReceivedEvent>( - settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db), - settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing, - settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit); - } - return SubsessionSettings; } TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings, std::shared_ptr<TGRpcConnectionsImpl> connections, - const TFederatedTopicClientSettings& clientSetttings, + const TFederatedTopicClientSettings& clientSettings, std::shared_ptr<TFederatedDbObserver> observer) : Settings(settings) , Connections(std::move(connections)) - , SubClientSetttings(FromFederated(clientSetttings)) + , SubClientSetttings(FromFederated(clientSettings)) , Observer(std::move(observer)) , AsyncInit(Observer->WaitForFirstState()) , FederationState(nullptr) + , EventFederator(std::make_shared<TEventFederator>()) + , Log(Connections->GetLog()) , SessionId(CreateGuidAsString()) { } +TStringBuilder TFederatedReadSessionImpl::GetLogPrefix() const { + return TStringBuilder() << GetDatabaseLogPrefix(SubClientSetttings.Database_.GetOrElse("")) << "[" << SessionId << "] "; +} + void TFederatedReadSessionImpl::Start() { AsyncInit.Subscribe([selfCtx = SelfContext](const auto& f){ Y_UNUSED(f); @@ -83,15 +95,14 @@ void TFederatedReadSessionImpl::Start() { }); } -void TFederatedReadSessionImpl::OpenSubSessionsImpl() { - for (const auto& db : FederationState->DbInfos) { - // TODO check if available +void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vector<std::shared_ptr<TDbInfo>>& dbInfos) { + for (const auto& db : dbInfos) { NTopic::TTopicClientSettings settings = SubClientSetttings; settings .Database(db->path()) .DiscoveryEndpoint(db->endpoint()); auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, settings); - auto subsession = subclient->CreateReadSession(FromFederated(Settings, db)); + auto subsession = subclient->CreateReadSession(FromFederated(Settings, db, EventFederator)); SubSessions.emplace_back(subsession, db); } SubsessionIndex = 0; @@ -99,14 +110,80 @@ void TFederatedReadSessionImpl::OpenSubSessionsImpl() { void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { if (!FederationState->Status.IsSuccess()) { + LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Federated state update failed."); + CloseImpl(); + return; + } + + EventFederator->SetFederationState(FederationState); + + if (Settings.IsReadMirroredEnabled()) { + Y_ABORT_UNLESS(Settings.GetDatabasesToReadFrom().size() == 1); + auto dbToReadFrom = *Settings.GetDatabasesToReadFrom().begin(); + + std::vector<TString> dbNames = GetAllFederationDatabaseNames(); + auto topics = Settings.Topics_; + for (const auto& topic : topics) { + for (const auto& dbName : dbNames) { + if (AsciiEqualsIgnoreCase(dbName, dbToReadFrom)) { + continue; + } + auto mirroredTopic = topic; + mirroredTopic.PartitionIds_.clear(); + mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dbName); + Settings.AppendTopics(mirroredTopic); + } + } + } + + std::vector<std::shared_ptr<TDbInfo>> databases; + + for (const auto& db : FederationState->DbInfos) { + if (IsDatabaseEligibleForRead(db)) { + databases.push_back(db); + } + } + + if (databases.empty()) { + // TODO: investigate here, why empty list? + // Reason (and returned status) could be BAD_REQUEST or UNAVAILABLE. + LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "No available databases to read."); CloseImpl(); return; } - // 1) compare old info and new info; - // result: list of subsessions to open + list of subsessions to close - // 2) OpenSubSessionsImpl, CloseSubSessionsImpl - OpenSubSessionsImpl(); - // 3) TODO LATER reschedule OnFederatedStateUpdate + + OpenSubSessionsImpl(databases); +} + +std::vector<TString> TFederatedReadSessionImpl::GetAllFederationDatabaseNames() { + std::vector<TString> result; + for (const auto& db : FederationState->DbInfos) { + result.push_back(db->name()); + } + return result; +} + +bool TFederatedReadSessionImpl::IsDatabaseEligibleForRead(const std::shared_ptr<TDbInfo>& db) { + if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE && + db->status() != TDbInfo::Status::DatabaseInfo_Status_READ_ONLY) { + return false; + } + + if (Settings.GetDatabasesToReadFrom().empty()) { + return true; + } + + for (const auto& dbFromSettings : Settings.GetDatabasesToReadFrom()) { + if (AsciiEqualsIgnoreCase(db->name(), dbFromSettings) || + AsciiEqualsIgnoreCase(db->id(), dbFromSettings)) { + return true; + } + if (dbFromSettings == "_local" && + AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) { + return true; + } + } + return false; } NThreading::TFuture<void> TFederatedReadSessionImpl::WaitEvent() { @@ -151,7 +228,7 @@ TVector<TReadSessionEvent::TEvent> TFederatedReadSessionImpl::GetEvents(bool blo do { auto sub = SubSessions[SubsessionIndex]; for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) { - result.push_back(Federate(std::move(ev), sub.DbInfo)); + result.push_back(EventFederator->LocateFederate(std::move(ev), sub.DbInfo)); } SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size(); } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index c0444f15a2..84c208258e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -9,6 +9,115 @@ namespace NYdb::NFederatedTopic { +class TEventFederator { +public: + auto LocateTopicOrigin(const NTopic::TReadSessionEvent::TEvent& event) { + std::shared_ptr<TDbInfo> topicOriginDbInfo; + TString topicOriginPath = ""; + + auto topicPath = std::visit([](auto&& arg) -> TStringBuf { + using T = std::decay_t<decltype(arg)>; + if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) { + return ""; + } else { + return arg.GetPartitionSession()->GetTopicPath(); + } + }, event); + + if (topicPath.Contains("-mirrored-from-")) { + TStringBuf leftPart, rightPart; + auto res = topicPath.TryRSplit("-mirrored-from-", leftPart, rightPart); + Y_ABORT_UNLESS(res); + + // no additional validation required: TryGetDbInfo just returns nullptr for any bad input + topicOriginDbInfo = FederationState->TryGetDbInfo(TString(rightPart)); + if (topicOriginDbInfo) { + topicOriginPath = leftPart; + } + } + + return std::make_tuple(topicOriginDbInfo, topicOriginPath); + } + + template <typename TEvent> + auto LocateFederate(TEvent&& event, std::shared_ptr<TDbInfo> db) { + NTopic::TPartitionSession::TPtr psPtr; + TFederatedPartitionSession::TPtr fps; + + using T = std::decay_t<TEvent>; + if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) { + return Federate(std::move(event), std::move(fps)); + } else if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TEvent>) { + psPtr = std::visit([](auto&& arg) -> NTopic::TPartitionSession::TPtr { + using T = std::decay_t<decltype(arg)>; + if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) { + return nullptr; + } else { + return arg.GetPartitionSession(); + } + }, event); + + if (!psPtr) { // TSessionClosedEvent + return Federate(std::move(event), std::move(fps)); + } + } else { + psPtr = event.GetPartitionSession(); + } + + with_lock(Lock) { + if (!FederatedPartitionSessions.contains(psPtr.Get())) { + auto [topicOriginDbInfo, topicOriginPath] = LocateTopicOrigin(event); + FederatedPartitionSessions[psPtr.Get()] = MakeIntrusive<TFederatedPartitionSession>(psPtr, std::move(db), std::move(topicOriginDbInfo), std::move(topicOriginPath)); + } + fps = FederatedPartitionSessions[psPtr.Get()]; + + if constexpr (std::is_same_v<TEvent, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>) { + FederatedPartitionSessions.erase(psPtr.Get()); + } + } + + return Federate(std::move(event), std::move(fps)); + } + + template <typename TEvent> + TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) { + return {std::move(event), std::move(federatedPartitionSession)}; + } + + TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, + TFederatedPartitionSession::TPtr federatedPartitionSession) { + return {std::move(event), std::move(federatedPartitionSession)}; + } + + TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, + TFederatedPartitionSession::TPtr federatedPartitionSession) { + return std::visit([fps = std::move(federatedPartitionSession)](auto&& arg) { + using T = std::decay_t<decltype(arg)>; + std::optional<TReadSessionEvent::TEvent> ev; + if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TDataReceivedEvent>) { + ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(fps)); + } else if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) { + ev = std::move(arg); + } else { + ev = TReadSessionEvent::TFederated(std::move(arg), std::move(fps)); + } + return *ev; + }, + event); + } + + void SetFederationState(std::shared_ptr<TFederatedDbState> state) { + with_lock(Lock) { + FederationState = std::move(state); + } + } + +private: + TAdaptiveLock Lock; + std::unordered_map<NTopic::TPartitionSession*, TFederatedPartitionSession::TPtr> FederatedPartitionSessions; + std::shared_ptr<TFederatedDbState> FederationState; +}; + class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext<TFederatedReadSessionImpl> { friend class TFederatedTopicClient::TImpl; friend class TFederatedReadSession; @@ -46,12 +155,15 @@ public: } private: - // TODO logging TStringBuilder GetLogPrefix() const; void Start(); bool ValidateSettings(); - void OpenSubSessionsImpl(); + void OpenSubSessionsImpl(const std::vector<std::shared_ptr<TDbInfo>>& dbInfos); + + std::vector<TString> GetAllFederationDatabaseNames(); + + bool IsDatabaseEligibleForRead(const std::shared_ptr<TDbInfo>& db); void OnFederatedStateUpdateImpl(); @@ -67,9 +179,9 @@ private: std::shared_ptr<TFederatedDbObserver> Observer; NThreading::TFuture<void> AsyncInit; std::shared_ptr<TFederatedDbState> FederationState; + std::shared_ptr<TEventFederator> EventFederator; - // TODO - // TLog Log; + TLog Log; const TString SessionId; const TInstant StartSessionTime = TInstant::Now(); diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp index 240fa38a80..f88836c33f 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp @@ -1,46 +1,10 @@ #include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> -#include <optional> - -namespace NYdb::NFederatedTopic { - -//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// -// Helpers - -std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { - if (dataReceivedEvent.HasCompressedMessages()) { - const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; - return {msg.GetOffset(), msg.GetOffset() + 1}; - } - const auto& msg = dataReceivedEvent.GetMessages()[index]; - return {msg.GetOffset(), msg.GetOffset() + 1}; -} - -TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db) { - return {std::move(event), std::move(db)}; -} - -TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db) { - return std::visit([db = std::move(db)](auto&& arg) { - using T = std::decay_t<decltype(arg)>; - std::optional<TReadSessionEvent::TEvent> ev; - if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TDataReceivedEvent>) { - ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(db)); - } else if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) { - ev = std::move(arg); - } else { - ev = TReadSessionEvent::TFederated(std::move(arg), std::move(db)); - } - return *ev; - }, event); -} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Printable specializations -} - namespace NYdb::NTopic { using namespace NFederatedTopic; @@ -155,7 +119,6 @@ void TPrintable<TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool print ret << " }"; } - } namespace NYdb::NFederatedTopic { @@ -163,35 +126,17 @@ namespace NYdb::NFederatedTopic { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // NFederatedTopic::TReadSessionEvent::TDataReceivedEvent -TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db) +TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) : NTopic::TReadSessionEvent::TPartitionSessionAccessor(event.GetPartitionSession()) - , TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db) + , TFederatedPartitionSessionAccessor(federatedPartitionSession) { if (event.HasCompressedMessages()) { for (auto& msg : event.GetCompressedMessages()) { - CompressedMessages.emplace_back(std::move(msg), db); + CompressedMessages.emplace_back(std::move(msg), federatedPartitionSession); } } else { for (auto& msg : event.GetMessages()) { - Messages.emplace_back(std::move(msg), db); - } - } -} - -TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent( - TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, - NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db) - : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession) - , TFederatedPartitionSessionAccessor(partitionSession, db) - , Messages(std::move(messages)) - , CompressedMessages(std::move(compressedMessages)) -{ - for (size_t i = 0; i < GetMessagesCount(); ++i) { - auto [from, to] = GetMessageOffsetRange(*this, i); - if (OffsetRanges.empty() || OffsetRanges.back().second != from) { - OffsetRanges.emplace_back(from, to); - } else { - OffsetRanges.back().second = to; + Messages.emplace_back(std::move(msg), federatedPartitionSession); } } } diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp index ca076d602d..75d7abb173 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp @@ -3,6 +3,43 @@ namespace NYdb::NFederatedTopic { +// TFederatedReadSessionSettings +// Read policy settings + +using TReadOriginalSettings = TFederatedReadSessionSettings::TReadOriginalSettings; +TReadOriginalSettings& TReadOriginalSettings::AddDatabase(TString database) { + Databases.insert(std::move(database)); + return *this; +} + +TReadOriginalSettings& TReadOriginalSettings::AddDatabases(std::vector<TString> databases) { + std::move(std::begin(databases), std::end(databases), std::inserter(Databases, Databases.end())); + return *this; +} + +TReadOriginalSettings& TReadOriginalSettings::AddLocal() { + Databases.insert("_local"); + return *this; +} + +TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadOriginal(TReadOriginalSettings settings) { + std::swap(DatabasesToReadFrom, settings.Databases); + ReadMirroredEnabled = false; + return *this; +} + +TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(TString database) { + if (database == "_local") { + ythrow TContractViolation("Reading from local database not supported, use specific database"); + } + DatabasesToReadFrom.clear(); + DatabasesToReadFrom.insert(std::move(database)); + ReadMirroredEnabled = true; + return *this; +} + +// TFederatedTopicClient + NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings) { return NTopic::TTopicClientSettings() .DefaultCompressionExecutor(settings.DefaultCompressionExecutor_) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h index 6fc2baa80d..bf4ad2a163 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h @@ -35,11 +35,22 @@ public: , ControlPlaneEndpoint(result.control_plane_endpoint()) , SelfLocation(result.self_location()) { - // TODO remove copy + // TODO ensure that all databases have unique names? for (const auto& db : result.federation_databases()) { DbInfos.push_back(std::make_shared<TDbInfo>(db)); } } + + std::shared_ptr<TDbInfo> TryGetDbInfo(const TString& name) const noexcept { + // There are few databases per federation usually, so the linear search is probably ok. + // TODO better profile this + for (const auto& dbInfo : DbInfos) { + if (AsciiEqualsIgnoreCase(dbInfo->name(), name)) { + return dbInfo; + } + } + return nullptr; + } }; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index a2780101c8..ad884e1ce7 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -520,6 +520,164 @@ Y_UNIT_TEST_SUITE(BasicUsage) { AtomicSet(check, 0); } + Y_UNIT_TEST(ReadMirrored) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + setup->Start(true, true); + setup->CreateTopic(setup->GetTestTopic() + "-mirrored-from-dc2", setup->GetLocalCluster()); + setup->CreateTopic(setup->GetTestTopic() + "-mirrored-from-dc3", setup->GetLocalCluster()); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + auto clientSettings = TFederatedTopicClientSettings() + .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( + TDuration::Seconds(10), + TDuration::Seconds(10) + )); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); + + ui64 count = 5u; + + TString messageBase = "message----"; + TVector<TString> sentMessages; + std::unordered_set<TString> sentSet; + + for (auto i = 0u; i < count; i++) { + sentMessages.emplace_back(messageBase * (10 * i + 1)); + sentSet.emplace(sentMessages.back() + "-from-dc1"); + sentSet.emplace(sentMessages.back() + "-from-dc2"); + sentSet.emplace(sentMessages.back() + "-from-dc3"); + } + + NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>(); + auto totalReceived = 0u; + + auto f = checkedPromise.GetFuture(); + TAtomic check = 1; + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ReadMirrored("dc1") + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(16_MB) + .AppendTopics(setup->GetTestTopic()); + + readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable { + Cerr << ">>> event from dataHandler: " << DebugString(ev) << Endl; + Y_VERIFY_S(AtomicGet(check) != 0, "check is false"); + auto& messages = ev.GetMessages(); + Cerr << ">>> get " << messages.size() << " messages in this event" << Endl; + for (size_t i = 0u; i < messages.size(); ++i) { + auto& message = messages[i]; + UNIT_ASSERT(message.GetFederatedPartitionSession()->GetReadSourceDatabaseName() == "dc1"); + UNIT_ASSERT(message.GetFederatedPartitionSession()->GetTopicPath() == setup->GetTestTopic()); + UNIT_ASSERT(message.GetData().EndsWith(message.GetFederatedPartitionSession()->GetTopicOriginDatabaseName())); + + UNIT_ASSERT(!sentSet.empty()); + UNIT_ASSERT_C(sentSet.erase(message.GetData()), "no such element is sentSet: " + message.GetData()); + totalReceived++; + } + if (totalReceived == 3 * sentMessages.size()) { + UNIT_ASSERT(sentSet.empty()); + checkedPromise.SetValue(); + } + }); + + ReadSession = topicClient.CreateReadSession(readSettings); + Cerr << ">>> Session was created" << Endl; + + Sleep(TDuration::MilliSeconds(50)); + + auto events = ReadSession->GetEvents(false); + UNIT_ASSERT(events.empty()); + + std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + + fdsRequest->Result.SetValue(fdsMock.ComposeOkResult()); + + { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i] + "-from-dc1"); + UNIT_ASSERT(res); + } + + session->Close(); + + Cerr << ">>> Writes to test-topic successful" << Endl; + } + + { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic() + "-mirrored-from-dc2").MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i] + "-from-dc2"); + UNIT_ASSERT(res); + } + + session->Close(); + + Cerr << ">>> Writes to test-topic-mirrored-from-dc2 successful" << Endl; + } + + { + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic() + "-mirrored-from-dc3").MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(sentMessages[i] + "-from-dc3"); + UNIT_ASSERT(res); + } + + session->Close(); + + Cerr << ">>> Writes to test-topic-mirrored-from-dc3 successful" << Endl; + } + + f.GetValueSync(); + ReadSession->Close(); + AtomicSet(check, 0); + } + Y_UNIT_TEST(BasicWriteSession) { auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>( TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2); diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h index b0746d14b7..5e48d4c662 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h @@ -96,6 +96,14 @@ public: c2->set_location("dc2"); c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); c2->set_weight(500); + auto c3 = mockResult.add_federation_databases(); + c3->set_name("dc3"); + c3->set_path("/Root"); + c3->set_id("account-dc3"); + c3->set_endpoint("localhost:" + ToString(Port)); + c3->set_location("dc3"); + c3->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c3->set_weight(500); op->mutable_result()->PackFrom(mockResult); |