diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-08-14 09:23:56 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-08-14 10:05:12 +0300 |
commit | 8ab85ce5392a37c48498612fc0d696c0e7f41907 (patch) | |
tree | 0ff42f820ed0b4f5f05782b9fda9118cb056bfb8 | |
parent | 5dddb98273c83176cf673fd86e6697885e5b9a10 (diff) | |
download | ydb-8ab85ce5392a37c48498612fc0d696c0e7f41907.tar.gz |
recreate observer
recreate observer if stale
31 files changed, 2412 insertions, 0 deletions
diff --git a/ydb/public/sdk/cpp/client/CMakeLists.txt b/ydb/public/sdk/cpp/client/CMakeLists.txt index 808d118822..7d96c1f95a 100644 --- a/ydb/public/sdk/cpp/client/CMakeLists.txt +++ b/ydb/public/sdk/cpp/client/CMakeLists.txt @@ -20,6 +20,7 @@ add_subdirectory(ydb_discovery) add_subdirectory(ydb_driver) add_subdirectory(ydb_export) add_subdirectory(ydb_extension) +add_subdirectory(ydb_federated_topic) add_subdirectory(ydb_import) add_subdirectory(ydb_monitoring) add_subdirectory(ydb_operation) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..dce7f91634 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) +add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(cpp-client-ydb_federated_topic) +target_link_libraries(cpp-client-ydb_federated_topic PUBLIC + contrib-libs-cxxsupp + yutil + tools-enum_parser-enum_serialization_runtime + cpp-client-ydb_topic + client-ydb_federated_topic-impl +) +generate_enum_serilization(cpp-client-ydb_federated_topic + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h + INCLUDE_HEADERS + ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..3e5d5f67b0 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) +add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(cpp-client-ydb_federated_topic) +target_link_libraries(cpp-client-ydb_federated_topic PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + tools-enum_parser-enum_serialization_runtime + cpp-client-ydb_topic + client-ydb_federated_topic-impl +) +generate_enum_serilization(cpp-client-ydb_federated_topic + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h + INCLUDE_HEADERS + ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..3e5d5f67b0 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) +add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(cpp-client-ydb_federated_topic) +target_link_libraries(cpp-client-ydb_federated_topic PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + tools-enum_parser-enum_serialization_runtime + cpp-client-ydb_topic + client-ydb_federated_topic-impl +) +generate_enum_serilization(cpp-client-ydb_federated_topic + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h + INCLUDE_HEADERS + ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..dce7f91634 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt @@ -0,0 +1,30 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(impl) +add_subdirectory(ut) +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) + +add_library(cpp-client-ydb_federated_topic) +target_link_libraries(cpp-client-ydb_federated_topic PUBLIC + contrib-libs-cxxsupp + yutil + tools-enum_parser-enum_serialization_runtime + cpp-client-ydb_topic + client-ydb_federated_topic-impl +) +generate_enum_serilization(cpp-client-ydb_federated_topic + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h + INCLUDE_HEADERS + ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h new file mode 100644 index 0000000000..299210ea6c --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -0,0 +1,362 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <ydb/public/api/protos/ydb_federation_discovery.pb.h> + +namespace NYdb::NFederatedTopic { + +using NTopic::TPrintable; +using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo; + +//! Federated partition session. +struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> { + using TPtr = TIntrusivePtr<TFederatedPartitionSession>; + +public: + TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr<TDbInfo> db) + : PartitionSession(partitionSession) + , Db(std::move(db)) + {} + + //! Request partition session status. + //! Result will come to TPartitionSessionStatusEvent. + void RequestStatus() { + return PartitionSession->RequestStatus(); + } + + //! + //! Properties. + //! + + //! Unique identifier of partition session. + //! It is unique within one read session. + ui64 GetPartitionSessionId() const { + return PartitionSession->GetPartitionSessionId(); + } + + //! Topic path. + const TString& GetTopicPath() const { + return PartitionSession->GetTopicPath(); + } + + //! Partition id. + ui64 GetPartitionId() const { + return PartitionSession->GetPartitionId(); + } + + const TString& GetDatabaseName() const { + return Db->name(); + } + + const TString& GetDatabasePath() const { + return Db->path(); + } + + const TString& GetDatabaseId() const { + return Db->id(); + } + +private: + NTopic::TPartitionSession::TPtr PartitionSession; + std::shared_ptr<TDbInfo> Db; +}; + +//! Events for read session. +struct TReadSessionEvent { + class TFederatedPartitionSessionAccessor { + public: + TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession) + : FederatedPartitionSession(std::move(partitionSession)) + {} + + TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db) + : FederatedPartitionSession(MakeIntrusive<TFederatedPartitionSession>(partitionSession, std::move(db))) + {} + + inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const { + return FederatedPartitionSession; + } + + protected: + TFederatedPartitionSession::TPtr FederatedPartitionSession; + }; + + template <typename TEvent> + struct TFederated : public TFederatedPartitionSessionAccessor, public TEvent, public TPrintable<TFederated<TEvent>> { + using TPrintable<TFederated<TEvent>>::DebugString; + + TFederated(TEvent event, std::shared_ptr<TDbInfo> db) + : TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db) + , TEvent(std::move(event)) + {} + + const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override { + ythrow yexception() << "GetPartitionSession() method unavailable for federated objects, use GetFederatedPartitionSession() instead"; + } + }; + + using TCommitOffsetAcknowledgementEvent = TFederated<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>; + using TStartPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>; + using TStopPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>; + using TPartitionSessionStatusEvent = TFederated<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>; + using TPartitionSessionClosedEvent = TFederated<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>; + + struct TDataReceivedEvent : public NTopic::TReadSessionEvent::TPartitionSessionAccessor, public TFederatedPartitionSessionAccessor, public TPrintable<TDataReceivedEvent> { + using TMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>; + using TCompressedMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>; + + public: + TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db); + + TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, + NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db); + + const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override { + ythrow yexception() << "GetPartitionSession() method unavailable for federated objects, use GetFederatedPartitionSession() instead"; + } + + bool HasCompressedMessages() const { + return !CompressedMessages.empty(); + } + + size_t GetMessagesCount() const { + return Messages.size() + CompressedMessages.size(); + } + + //! Get messages. + TVector<TMessage>& GetMessages() { + CheckMessagesFilled(false); + return Messages; + } + + const TVector<TMessage>& GetMessages() const { + CheckMessagesFilled(false); + return Messages; + } + + //! Get compressed messages. + TVector<TCompressedMessage>& GetCompressedMessages() { + CheckMessagesFilled(true); + return CompressedMessages; + } + + const TVector<TCompressedMessage>& GetCompressedMessages() const { + CheckMessagesFilled(true); + return CompressedMessages; + } + + //! Commits all messages in batch. + void Commit(); + + private: + void CheckMessagesFilled(bool compressed) const { + Y_VERIFY(!Messages.empty() || !CompressedMessages.empty()); + if (compressed && CompressedMessages.empty()) { + ythrow yexception() << "cannot get compressed messages, parameter decompress=true for read session"; + } + if (!compressed && Messages.empty()) { + ythrow yexception() << "cannot get decompressed messages, parameter decompress=false for read session"; + } + } + + private: + TVector<TMessage> Messages; + TVector<TCompressedMessage> CompressedMessages; + std::vector<std::pair<ui64, ui64>> OffsetRanges; + }; + + using TEvent = std::variant<TDataReceivedEvent, + TCommitOffsetAcknowledgementEvent, + TStartPartitionSessionEvent, + TStopPartitionSessionEvent, + TPartitionSessionStatusEvent, + TPartitionSessionClosedEvent, + NTopic::TSessionClosedEvent>; +}; + +TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db); + +//! Set of offsets to commit. +//! Class that could store offsets in order to commit them later. +//! This class is not thread safe. +class TDeferredCommit { +public: + //! Add message to set. + void Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message); + + //! Add all messages from dataReceivedEvent to set. + void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent); + + //! Add offsets range to set. + void Add(const TFederatedPartitionSession& partitionSession, ui64 startOffset, ui64 endOffset); + + //! Add offset to set. + void Add(const TFederatedPartitionSession& partitionSession, ui64 offset); + + //! Commit all added offsets. + void Commit(); + + TDeferredCommit(); + TDeferredCommit(const TDeferredCommit&) = delete; + TDeferredCommit(TDeferredCommit&&); + TDeferredCommit& operator=(const TDeferredCommit&) = delete; + TDeferredCommit& operator=(TDeferredCommit&&); + + ~TDeferredCommit(); + +private: + class TImpl; + THolder<TImpl> Impl; +}; + +//! Event debug string. +TString DebugString(const TReadSessionEvent::TEvent& event); + + +//! Settings for federated write session. +struct TFederatedWriteSessionSettings : public NTopic::TWriteSessionSettings { + using TSelf = TFederatedWriteSessionSettings; + + //! Preferred database + //! If specified database is unavailable, session will write to other database. + FLUENT_SETTING_OPTIONAL(TString, PreferredDatabase); + + //! Write to other databases if there are problems with connection + //! to the preferred one. + FLUENT_SETTING_DEFAULT(bool, AllowFallback, true); + + TFederatedWriteSessionSettings() = default; + TFederatedWriteSessionSettings(const TFederatedWriteSessionSettings&) = default; + TFederatedWriteSessionSettings(TFederatedWriteSessionSettings&&) = default; + TFederatedWriteSessionSettings(const TString& path, const TString& producerId, const TString& messageGroupId) + : NTopic::TWriteSessionSettings(path, producerId, messageGroupId) { + } + + TFederatedWriteSessionSettings& operator=(const TFederatedWriteSessionSettings&) = default; + TFederatedWriteSessionSettings& operator=(TFederatedWriteSessionSettings&&) = default; +}; + +//! Settings for read session. +struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { + using TSelf = TFederatedReadSessionSettings; + + enum class EReadPolicy { + READ_ALL = 0, + READ_ORIGINAL, + READ_MIRRORED + }; + + // optional for read_mirrored case ? + + //! Policy for federated reading. + //! + //! READ_ALL: read will be done from all topic instances from all databases. + //! READ_ORIGINAL: + //! READ_MIRRORED: + FLUENT_SETTING_DEFAULT(EReadPolicy, ReadPolicy, EReadPolicy::READ_ALL); +}; + + + +class IFederatedReadSession { +public: + //! Main reader loop. + //! Wait for next reader event. + virtual NThreading::TFuture<void> WaitEvent() = 0; + + //! Main reader loop. + //! Get read session events. + //! Blocks until event occurs if "block" is set. + //! + //! maxEventsCount: maximum events count in batch. + //! maxByteSize: total size limit of data messages in batch. + //! block: block until event occurs. + //! + //! If maxEventsCount is not specified, + //! read session chooses event batch size automatically. + virtual TVector<TReadSessionEvent::TEvent> + GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(), + size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0; + + //! Get single event. + virtual TMaybe<TReadSessionEvent::TEvent> + GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0; + + //! Close read session. + //! Waits for all commit acknowledgments to arrive. + //! Force close after timeout. + //! This method is blocking. + //! When session is closed, + //! TSessionClosedEvent arrives. + virtual bool Close(TDuration timeout = TDuration::Max()) = 0; + + //! Reader counters with different stats (see TReaderConuters). + virtual NTopic::TReaderCounters::TPtr GetCounters() const = 0; + + //! Get unique identifier of read session. + virtual TString GetSessionId() const = 0; + + virtual ~IFederatedReadSession() = default; +}; + +struct TFederatedTopicClientSettings : public TCommonClientSettingsBase<TFederatedTopicClientSettings> { + using TSelf = TFederatedTopicClientSettings; + + //! Default executor for compression tasks. + FLUENT_SETTING_DEFAULT(NTopic::IExecutor::TPtr, DefaultCompressionExecutor, NTopic::CreateThreadPoolExecutor(2)); + + //! Default executor for callbacks. + FLUENT_SETTING_DEFAULT(NTopic::IExecutor::TPtr, DefaultHandlersExecutor, NTopic::CreateThreadPoolExecutor(1)); + + //! Connection timeoout for federation discovery. + FLUENT_SETTING_DEFAULT(TDuration, ConnectionTimeout, TDuration::Seconds(30)); + + //! Retry policy enables automatic retries for non-fatal errors. + FLUENT_SETTING_DEFAULT(NTopic::IRetryPolicy::TPtr, RetryPolicy, NTopic::IRetryPolicy::GetDefaultPolicy()); +}; + +class TFederatedTopicClient { +public: + class TImpl; + + // executors from settings are passed to subclients + TFederatedTopicClient(const TDriver& driver, const TFederatedTopicClientSettings& settings = {}); + + //! Create read session. + std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings); + + //! Create write session. + // std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingFederatedWriteSession(const TFederatedWriteSessionSettings& settings); + // std::shared_ptr<NTopic::IWriteSession> CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings); + +private: + std::shared_ptr<TImpl> Impl_; +}; + +} // namespace NYdb::NFederatedTopic + +namespace NYdb::NTopic { + +using namespace NFederatedTopic; + +template<> +void TPrintable<TFederatedPartitionSession>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStopPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionStatusEvent>>::DebugString(TStringBuilder& res, bool) const; +template<> +void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionClosedEvent>>::DebugString(TStringBuilder& res, bool) const; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..6b9ea01e31 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(client-ydb_federated_topic-impl) +target_link_libraries(client-ydb_federated_topic-impl PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-client + cpp-monlib-dynamic_counters + cpp-monlib-metrics + cpp-string_utils-url + library-persqueue-obfuscate + api-grpc-draft + api-grpc + impl-ydb_internal-make_request + client-ydb_common_client-impl + cpp-client-ydb_driver + client-ydb_persqueue_core-impl + cpp-client-ydb_proto +) +target_sources(client-ydb_federated_topic-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..00d0883659 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(client-ydb_federated_topic-impl) +target_link_libraries(client-ydb_federated_topic-impl PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-grpc-client + cpp-monlib-dynamic_counters + cpp-monlib-metrics + cpp-string_utils-url + library-persqueue-obfuscate + api-grpc-draft + api-grpc + impl-ydb_internal-make_request + client-ydb_common_client-impl + cpp-client-ydb_driver + client-ydb_persqueue_core-impl + cpp-client-ydb_proto +) +target_sources(client-ydb_federated_topic-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..00d0883659 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt @@ -0,0 +1,34 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(client-ydb_federated_topic-impl) +target_link_libraries(client-ydb_federated_topic-impl PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-grpc-client + cpp-monlib-dynamic_counters + cpp-monlib-metrics + cpp-string_utils-url + library-persqueue-obfuscate + api-grpc-draft + api-grpc + impl-ydb_internal-make_request + client-ydb_common_client-impl + cpp-client-ydb_driver + client-ydb_persqueue_core-impl + cpp-client-ydb_proto +) +target_sources(client-ydb_federated_topic-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..6b9ea01e31 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(client-ydb_federated_topic-impl) +target_link_libraries(client-ydb_federated_topic-impl PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-client + cpp-monlib-dynamic_counters + cpp-monlib-metrics + cpp-string_utils-url + library-persqueue-obfuscate + api-grpc-draft + api-grpc + impl-ydb_internal-make_request + client-ydb_common_client-impl + cpp-client-ydb_driver + client-ydb_persqueue_core-impl + cpp-client-ydb_proto +) +target_sources(client-ydb_federated_topic-impl PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp +) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp new file mode 100644 index 0000000000..e507617ec8 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -0,0 +1,130 @@ +#include "federated_read_session.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h> + +#include <library/cpp/threading/future/future.h> +#include <util/generic/guid.h> + +namespace NYdb::NFederatedTopic { + +NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings); + +TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSetttings, + std::shared_ptr<TFederatedDbObserver> observer) + : Settings(settings) + , Connections(std::move(connections)) + , SubClientSetttings(FromFederated(clientSetttings)) + , Observer(std::move(observer)) + , AsyncInit(Observer->WaitForFirstState()) + , FederationState(nullptr) + , SessionId(CreateGuidAsString()) +{ +} + +void TFederatedReadSession::Start() { + AsyncInit.Subscribe([self = shared_from_this()](const auto& f){ + Y_UNUSED(f); + with_lock(self->Lock) { + self->FederationState = self->Observer->GetState(); + self->OnFederatedStateUpdateImpl(); + } + }); +} + +void TFederatedReadSession::OpenSubSessionsImpl() { + for (const auto& db : FederationState->DbInfos) { + // TODO check if available + NTopic::TTopicClientSettings settings = SubClientSetttings; + settings + .Database(db->path()) + .DiscoveryEndpoint(db->endpoint()); + auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, settings); + auto subsession = subclient->CreateReadSession(Settings); + SubSessions.emplace_back(subsession, db); + } + SubsessionIndex = 0; +} + +void TFederatedReadSession::OnFederatedStateUpdateImpl() { + if (!FederationState->Status.IsSuccess()) { + CloseImpl(); + return; + } + // 1) compare old info and new info; + // result: list of subsessions to open + list of subsessions to close + // 2) OpenSubSessionsImpl, CloseSubSessionsImpl + OpenSubSessionsImpl(); + // 3) TODO LATER reschedule OnFederatedStateUpdate +} + +NThreading::TFuture<void> TFederatedReadSession::WaitEvent() { + // TODO override with read session settings timeout + return AsyncInit.Apply([self = shared_from_this()](const NThreading::TFuture<void>) { + if (self->Closing) { + return NThreading::MakeFuture(); + } + std::vector<NThreading::TFuture<void>> waiters; + with_lock(self->Lock) { + Y_VERIFY(!self->SubSessions.empty(), "SubSessions empty in discovered state"); + for (const auto& sub : self->SubSessions) { + waiters.emplace_back(sub.Session->WaitEvent()); + } + } + return NThreading::WaitAny(std::move(waiters)); + }); +} + +TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) { + if (block) { + WaitEvent().Wait(); + } + with_lock(Lock) { + if (Closing) { + // TODO correct conversion + return {NTopic::TSessionClosedEvent(FederationState->Status.GetStatus(), {})}; + } + // TODO!!! handle aborting or closing state + // via handler on SessionClosedEvent { + // cancel all subsessions, empty SubSessions, set aborting + // } + if (SubSessions.empty()) { + return {}; + } + } + TVector<TReadSessionEvent::TEvent> result; + with_lock(Lock) { + do { + auto sub = SubSessions[SubsessionIndex]; + // TODO remove copy + for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) { + result.push_back(Federate(ev, sub.DbInfo)); + } + SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size(); + } + while (block && result.empty()); + } + return result; +} + +TMaybe<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvent(bool block, size_t maxByteSize) { + auto events = GetEvents(block, 1, maxByteSize); + return events.empty() ? Nothing() : TMaybe<TReadSessionEvent::TEvent>{std::move(events.front())}; +} + +void TFederatedReadSession::CloseImpl() { + Closing = true; +} + +bool TFederatedReadSession::Close(TDuration timeout) { + bool result = true; + for (const auto& sub : SubSessions) { + // TODO substract from user timeout + result = sub.Session->Close(timeout); + } + return result; +} + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h new file mode 100644 index 0000000000..7e11903a46 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -0,0 +1,96 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> + +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h> + +namespace NYdb::NFederatedTopic { + +class TFederatedReadSession : public IFederatedReadSession, + public std::enable_shared_from_this<TFederatedReadSession> { + friend class TFederatedTopicClient::TImpl; + +private: + struct TSubSession { + TSubSession(std::shared_ptr<NTopic::IReadSession> session = {}, std::shared_ptr<TDbInfo> dbInfo = {}) + : Session(std::move(session)) + , DbInfo(std::move(dbInfo)) + {} + + std::shared_ptr<NTopic::IReadSession> Session; + std::shared_ptr<TDbInfo> DbInfo; + }; + +public: + TFederatedReadSession(const TFederatedReadSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSetttings, + std::shared_ptr<TFederatedDbObserver> observer); + + ~TFederatedReadSession() = default; + + NThreading::TFuture<void> WaitEvent() override; + TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override; + TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override; + + bool Close(TDuration timeout) override; + + inline TString GetSessionId() const override { + return SessionId; + } + + inline NTopic::TReaderCounters::TPtr GetCounters() const override { + return Settings.Counters_; // Always not nullptr. + } + +private: + // TODO logging + TStringBuilder GetLogPrefix() const; + + void Start(); + bool ValidateSettings(); + void OpenSubSessionsImpl(); + + void OnFederatedStateUpdateImpl(); + + void Abort(); + void CloseImpl(); + + void ClearAllEvents(); + + // TODO Counters + // void MakeCountersIfNeeded(); + // void DumpCountersToLog(size_t timeNumber = 0); + // void ScheduleDumpCountersToLog(size_t timeNumber = 0); + +private: + // For subsessions creation + const NTopic::TReadSessionSettings Settings; + std::shared_ptr<TGRpcConnectionsImpl> Connections; + const NTopic::TTopicClientSettings SubClientSetttings; + + std::shared_ptr<TFederatedDbObserver> Observer; + NThreading::TFuture<void> AsyncInit; + std::shared_ptr<TFederatedDbState> FederationState; + + // TODO + // TLog Log; + + const TString SessionId; + const TInstant StartSessionTime = TInstant::Now(); + + TAdaptiveLock Lock; + + std::vector<TSubSession> SubSessions; + size_t SubsessionIndex = 0; + + // NGrpc::IQueueClientContextPtr DumpCountersContext; + + // Exiting. + bool Closing = false; +}; + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp new file mode 100644 index 0000000000..0b6ab25113 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp @@ -0,0 +1,208 @@ +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> + +namespace NYdb::NFederatedTopic { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Helpers + +std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) { + if (dataReceivedEvent.HasCompressedMessages()) { + const auto& msg = dataReceivedEvent.GetCompressedMessages()[index]; + return {msg.GetOffset(), msg.GetOffset() + 1}; + } + const auto& msg = dataReceivedEvent.GetMessages()[index]; + return {msg.GetOffset(), msg.GetOffset() + 1}; +} + +TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db) { + if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) { + return TReadSessionEvent::TDataReceivedEvent(std::move(*ev), db); + } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) { + return TReadSessionEvent::TFederated(*ev, db); + } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) { + return TReadSessionEvent::TFederated(*ev, db); + } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) { + return TReadSessionEvent::TFederated(*ev, db); + } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) { + return TReadSessionEvent::TFederated(*ev, db); + } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) { + return TReadSessionEvent::TFederated(*ev, db); + } else if (auto* ev = std::get_if<NTopic::TSessionClosedEvent>(&event)) { + return *ev; + } else { + Y_UNREACHABLE(); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Printable specializations + +} + +namespace NYdb::NTopic { + +using namespace NFederatedTopic; + +using TCommitOffsetAcknowledgementEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>; +using TStartPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>; +using TStopPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>; +using TPartitionSessionStatusEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>; +using TPartitionSessionClosedEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>; +using TMessage = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>; +using TCompressedMessage = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>; +using TDataReceivedEvent = NFederatedTopic::TReadSessionEvent::TDataReceivedEvent; + +template<> +void TPrintable<TFederatedPartitionSession>::DebugString(TStringBuilder& res, bool) const { + const auto* self = static_cast<const TFederatedPartitionSession*>(this); + res << " Partition session id: " << self->GetPartitionSessionId() + << " Topic: \"" << self->GetTopicPath() << "\"" + << " Partition: " << self->GetPartitionId() + << " Database name: " << self->GetDatabaseName() + << " Database path: " << self->GetDatabasePath() + << " Database id: " << self->GetDatabaseId(); +} + +template<> +void TPrintable<TMessage>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TMessage*>(this); + ret << "Message {"; + auto ptr = dynamic_cast<const TReadSessionEvent::TDataReceivedEvent::TMessageBase*>(self); + Y_VERIFY(ptr); + ptr->DebugString(ret, printData); + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " }"; +} + +template<> +void TPrintable<TCompressedMessage>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TCompressedMessage*>(this); + ret << "CompressedMessage {"; + static_cast<const TReadSessionEvent::TDataReceivedEvent::TMessageBase*>(self)->DebugString(ret, printData); + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " Codec: " << self->GetCodec() + << " Uncompressed size: " << self->GetUncompressedSize() + << " }"; +} + +template<> +void TPrintable<TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TCommitOffsetAcknowledgementEvent*>(this); + ret << "CommitAcknowledgement {"; + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " }"; +} + +template<> +void TPrintable<TStartPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TStartPartitionSessionEvent*>(this); + ret << "StartPartitionSession {"; + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " EndOffset: " << self->GetEndOffset() + << " }"; +} + +template<> +void TPrintable<TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TStopPartitionSessionEvent*>(this); + ret << "StopPartitionSession {"; + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " }"; +} + +template<> +void TPrintable<TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TPartitionSessionStatusEvent*>(this); + ret << "PartitionSessionStatus {"; + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " CommittedOffset: " << self->GetCommittedOffset() + << " ReadOffset: " << self->GetReadOffset() + << " EndOffset: " << self->GetEndOffset() + << " WriteWatermark: " << self->GetWriteTimeHighWatermark() + << " }"; +} + +template<> +void TPrintable<TPartitionSessionClosedEvent>::DebugString(TStringBuilder& ret, bool) const { + const auto* self = static_cast<const TPartitionSessionClosedEvent*>(this); + ret << "PartitionSessionClosed {"; + self->GetFederatedPartitionSession()->DebugString(ret); + ret << " Reason: " << self->GetReason() + << " }"; +} + +template<> +void TPrintable<TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool printData) const { + const auto* self = static_cast<const TDataReceivedEvent*>(this); + ret << "DataReceived {"; + self->GetFederatedPartitionSession()->DebugString(ret); + if (self->HasCompressedMessages()) { + for (const auto& message : self->GetCompressedMessages()) { + ret << " "; + message.DebugString(ret, printData); + } + } else { + for (const auto& message : self->GetMessages()) { + ret << " "; + message.DebugString(ret, printData); + } + } + ret << " }"; +} + + +} + +namespace NYdb::NFederatedTopic { + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// NFederatedTopic::TReadSessionEvent::TDataReceivedEvent + +TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db) + : NTopic::TReadSessionEvent::TPartitionSessionAccessor(event.GetPartitionSession()) + , TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db) +{ + if (event.HasCompressedMessages()) { + for (auto& msg : event.GetCompressedMessages()) { + CompressedMessages.emplace_back(std::move(msg), db); + } + } else { + for (auto& msg : event.GetMessages()) { + Messages.emplace_back(std::move(msg), db); + } + } +} + +TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent( + TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages, + NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db) + : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession) + , TFederatedPartitionSessionAccessor(partitionSession, db) + , Messages(std::move(messages)) + , CompressedMessages(std::move(compressedMessages)) +{ + for (size_t i = 0; i < GetMessagesCount(); ++i) { + auto [from, to] = GetMessageOffsetRange(*this, i); + if (OffsetRanges.empty() || OffsetRanges.back().second != from) { + OffsetRanges.emplace_back(from, to); + } else { + OffsetRanges.back().second = to; + } + } +} + +void TReadSessionEvent::TDataReceivedEvent::Commit() { + for (auto [from, to] : OffsetRanges) { + static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to); + } +} + +TString DebugString(const TReadSessionEvent::TEvent& event) { + return std::visit([](const auto& ev) { return ev.DebugString(); }, event); +} + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp new file mode 100644 index 0000000000..fa135df282 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp @@ -0,0 +1,30 @@ +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h> + +namespace NYdb::NFederatedTopic { + +NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings) { + return NTopic::TTopicClientSettings() + .DefaultCompressionExecutor(settings.DefaultCompressionExecutor_) + .DefaultHandlersExecutor(settings.DefaultHandlersExecutor_); +} + +TFederatedTopicClient::TFederatedTopicClient(const TDriver& driver, const TFederatedTopicClientSettings& settings) + : Impl_(std::make_shared<TImpl>(CreateInternalInterface(driver), settings)) +{ +} + +std::shared_ptr<IFederatedReadSession> TFederatedTopicClient::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) { + return Impl_->CreateFederatedReadSession(settings); +} + +// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> TFederatedTopicClient::CreateSimpleBlockingFederatedWriteSession( +// const TFederatedWriteSessionSettings& settings) { +// return Impl_->CreateSimpleFederatedWriteSession(settings); +// } + +// std::shared_ptr<NTopic::IWriteSession> TFederatedTopicClient::CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings) { +// return Impl_->CreateFederatedWriteSession(settings); +// } + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp new file mode 100644 index 0000000000..fa091252b5 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp @@ -0,0 +1,28 @@ +#include "federated_topic_impl.h" + +#include "federated_read_session.h" +// #include "federated_write_session.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> + +namespace NYdb::NFederatedTopic { + +std::shared_ptr<IFederatedReadSession> +TFederatedTopicClient::TImpl::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) { + InitObserver(); + auto session = std::make_shared<TFederatedReadSession>(settings, Connections, ClientSettings, GetObserver()); + session->Start(); + return std::move(session); +} + +void TFederatedTopicClient::TImpl::InitObserver() { + with_lock(Lock) { + if (!Observer || Observer->IsStale()) { + Observer = std::make_shared<TFederatedDbObserver>(Connections, ClientSettings); + Observer->Start(); + } + } +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h new file mode 100644 index 0000000000..c963b3a94a --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h @@ -0,0 +1,53 @@ +#pragma once + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/executor.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> + +#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h> + +namespace NYdb::NFederatedTopic { + +class TFederatedTopicClient::TImpl { +public: + // Constructor for main client. + TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TFederatedTopicClientSettings& settings) + : Connections(std::move(connections)) + , ClientSettings(settings) + { + InitObserver(); + } + + ~TImpl() { + with_lock(Lock) { + if (Observer) { + Observer->Stop(); + } + } + } + + // Runtime API. + std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings); + + std::shared_ptr<TFederatedDbObserver> GetObserver() { + with_lock(Lock) { + return Observer; + } + } + + void InitObserver(); + +private: + std::shared_ptr<TGRpcConnectionsImpl> Connections; + const TFederatedTopicClientSettings ClientSettings; + std::shared_ptr<TFederatedDbObserver> Observer; + TAdaptiveLock Lock; +}; + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp new file mode 100644 index 0000000000..2742a79d14 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp @@ -0,0 +1,161 @@ +#include <ydb/public/api/grpc/ydb_federation_discovery_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h> + +namespace NYdb::NFederatedTopic { + +constexpr TDuration REDISCOVERY_DELAY = TDuration::Seconds(30); + +TFederatedDbObserver::TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + , FederatedDbState(std::make_shared<TFederatedDbState>()) + , PromiseToInitState(NThreading::NewPromise()) + , FederationDiscoveryRetryPolicy(settings.RetryPolicy_) +{ + RpcSettings.ClientTimeout = settings.ConnectionTimeout_; + RpcSettings.EndpointPolicy = TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint; + RpcSettings.UseAuth = true; +} + +TFederatedDbObserver::~TFederatedDbObserver() { + Stop(); +} + +std::shared_ptr<TFederatedDbState> TFederatedDbObserver::GetState() { + with_lock(Lock) { + return FederatedDbState; + } +} + +NThreading::TFuture<void> TFederatedDbObserver::WaitForFirstState() { + return PromiseToInitState.GetFuture(); +} + +void TFederatedDbObserver::Start() { + with_lock(Lock) { + if (Stopping) { + return; + } + ScheduleFederationDiscoveryImpl(TDuration::Zero()); + } +} + +void TFederatedDbObserver::Stop() { + NGrpc::IQueueClientContextPtr ctx; + with_lock(Lock) { + Stopping = true; + ctx = std::exchange(FederationDiscoveryDelayContext, nullptr); + } + if (ctx) { + ctx->Cancel(); + } +} + +// If observer is stale it will never update state again because of client retry policy +bool TFederatedDbObserver::IsStale() const { + with_lock(Lock) { + return PromiseToInitState.HasValue() && !FederatedDbState->Status.IsSuccess(); + } +} + +Ydb::FederationDiscovery::ListFederationDatabasesRequest TFederatedDbObserver::ComposeRequest() const { + return {}; +} + +void TFederatedDbObserver::RunFederationDiscoveryImpl() { + Y_VERIFY(Lock.IsLocked()); + + FederationDiscoveryDelayContext = Connections_->CreateContext(); + if (!FederationDiscoveryDelayContext) { + Stopping = true; + // TODO log DRIVER_IS_STOPPING_DESCRIPTION + return; + } + + auto extractor = [self = shared_from_this()] + (google::protobuf::Any* any, TPlainStatus status) mutable { + + Ydb::FederationDiscovery::ListFederationDatabasesResult result; + if (any) { + any->UnpackTo(&result); + } + self->OnFederationDiscovery(std::move(status), std::move(result)); + }; + + Connections_->RunDeferred<Ydb::FederationDiscovery::V1::FederationDiscoveryService, + Ydb::FederationDiscovery::ListFederationDatabasesRequest, + Ydb::FederationDiscovery::ListFederationDatabasesResponse>( + ComposeRequest(), + std::move(extractor), + &Ydb::FederationDiscovery::V1::FederationDiscoveryService::Stub::AsyncListFederationDatabases, + DbDriverState_, + {}, // no polling unready operations, so no need in delay parameter + RpcSettings, + FederationDiscoveryDelayContext); +} + +void TFederatedDbObserver::ScheduleFederationDiscoveryImpl(TDuration delay) { + Y_VERIFY(Lock.IsLocked()); + auto cb = [self = shared_from_this()](bool ok) { + if (ok) { + with_lock(self->Lock) { + if (self->Stopping) { + return; + } + self->RunFederationDiscoveryImpl(); + } + } + }; + + FederationDiscoveryDelayContext = Connections_->CreateContext(); + if (!FederationDiscoveryDelayContext) { + Stopping = true; + // TODO log DRIVER_IS_STOPPING_DESCRIPTION + return; + } + Connections_->ScheduleCallback(delay, + std::move(cb), + FederationDiscoveryDelayContext); + +} + +void TFederatedDbObserver::OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result) { + with_lock(Lock) { + if (Stopping) { + // TODO log something + return; + } + + if (!status.IsSuccess()) { + // if UNIMPLEMENTED - fall back to single db mode: + // - initialize FederatedDbState with original db + endpoint + // - no more updates: no reschedules + + // TODO + // update counters errors + + if (!FederationDiscoveryRetryState) { + FederationDiscoveryRetryState = FederationDiscoveryRetryPolicy->CreateRetryState(); + } + TMaybe<TDuration> retryDelay = FederationDiscoveryRetryState->GetNextRetryDelay(status.GetStatus()); + if (retryDelay) { + ScheduleFederationDiscoveryImpl(*retryDelay); + return; + } + } else { + ScheduleFederationDiscoveryImpl(REDISCOVERY_DELAY); + } + + // TODO validate new state and check if differs from previous + + auto newInfo = std::make_shared<TFederatedDbState>(std::move(result), std::move(status)); + // TODO update only if new state differs + std::swap(FederatedDbState, newInfo); + } + + if (!PromiseToInitState.HasValue()) { + PromiseToInitState.SetValue(); + } +} + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h new file mode 100644 index 0000000000..55de50168c --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h @@ -0,0 +1,82 @@ +#pragma once + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/public/api/protos/ydb_federation_discovery.pb.h> + +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> + +#include <util/system/spinlock.h> +#include <util/generic/hash.h> + +#include <deque> +#include <memory> + +namespace NYdb::NFederatedTopic { + +struct TFederatedDbState { +public: + using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo; + + TStatus Status; + TString ControlPlaneEndpoint; + TString SelfLocation; + std::vector<std::shared_ptr<TDbInfo>> DbInfos; + +public: + TFederatedDbState() : Status(EStatus::STATUS_UNDEFINED, {}) {} + TFederatedDbState(Ydb::FederationDiscovery::ListFederationDatabasesResult result, TStatus status) + : Status(std::move(status)) + , ControlPlaneEndpoint(result.control_plane_endpoint()) + , SelfLocation(result.self_location()) + { + // TODO remove copy + for (const auto& db : result.federation_databases()) { + DbInfos.push_back(std::make_shared<TDbInfo>(db)); + } + } +}; + + +class TFederatedDbObserver : public TClientImplCommon<TFederatedDbObserver> { +public: + static constexpr TDuration REDISCOVER_DELAY = TDuration::Seconds(60); + +public: + TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings); + + ~TFederatedDbObserver(); + + std::shared_ptr<TFederatedDbState> GetState(); + + NThreading::TFuture<void> WaitForFirstState(); + + void Start(); + void Stop(); + + bool IsStale() const; + +private: + Ydb::FederationDiscovery::ListFederationDatabasesRequest ComposeRequest() const; + void RunFederationDiscoveryImpl(); + void ScheduleFederationDiscoveryImpl(TDuration delay); + void OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result); + +private: + std::shared_ptr<TFederatedDbState> FederatedDbState; + NThreading::TPromise<void> PromiseToInitState; + TRpcRequestSettings RpcSettings; + TSpinLock Lock; + + NTopic::IRetryPolicy::TPtr FederationDiscoveryRetryPolicy; + NTopic::IRetryPolicy::IRetryState::TPtr FederationDiscoveryRetryState; + NGrpc::IQueueClientContextPtr FederationDiscoveryDelayContext; + + bool Stopping = false; +}; + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make new file mode 100644 index 0000000000..491270cda6 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make @@ -0,0 +1,29 @@ +LIBRARY() + +SRCS( + federated_read_session.h + federated_read_session.cpp + federated_read_session_event.cpp + federated_topic_impl.h + federated_topic_impl.cpp + federated_topic.cpp + federation_observer.h + federation_observer.cpp +) + +PEERDIR( + library/cpp/grpc/client + library/cpp/monlib/dynamic_counters + library/cpp/monlib/metrics + library/cpp/string_utils/url + ydb/library/persqueue/obfuscate + ydb/public/api/grpc/draft + ydb/public/api/grpc + ydb/public/sdk/cpp/client/impl/ydb_internal/make_request + ydb/public/sdk/cpp/client/ydb_common_client/impl + ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_persqueue_core/impl + ydb/public/sdk/cpp/client/ydb_proto +) + +END() diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..8ae3258bf2 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,89 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) +target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic +) +target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-client-ydb_federated_topic + cpp-testing-gmock_in_unittest + core-testlib-default + public-lib-json_value + public-lib-yson_value + cpp-client-ydb_driver + cpp-client-ydb_persqueue_core + client-ydb_persqueue_core-impl + ydb_persqueue_core-ut-ut_utils + client-ydb_topic-codecs + cpp-client-ydb_topic + client-ydb_topic-impl + client-ydb_federated_topic-impl +) +target_link_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +) +set_property( + TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut + system_allocator +) +vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..f5d7b13900 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,92 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) +target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic +) +target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + cpp-client-ydb_federated_topic + cpp-testing-gmock_in_unittest + core-testlib-default + public-lib-json_value + public-lib-yson_value + cpp-client-ydb_driver + cpp-client-ydb_persqueue_core + client-ydb_persqueue_core-impl + ydb_persqueue_core-ut-ut_utils + client-ydb_topic-codecs + cpp-client-ydb_topic + client-ydb_topic-impl + client-ydb_federated_topic-impl +) +target_link_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +) +set_property( + TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut + cpp-malloc-jemalloc +) +vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..a70159ebcd --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,94 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) +target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic +) +target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-client-ydb_federated_topic + cpp-testing-gmock_in_unittest + core-testlib-default + public-lib-json_value + public-lib-yson_value + cpp-client-ydb_driver + cpp-client-ydb_persqueue_core + client-ydb_persqueue_core-impl + ydb_persqueue_core-ut-ut_utils + client-ydb_topic-codecs + cpp-client-ydb_topic + client-ydb_topic-impl + client-ydb_federated_topic-impl +) +target_link_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +) +set_property( + TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..4d6c28c3cd --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) +target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic +) +target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + cpp-client-ydb_federated_topic + cpp-testing-gmock_in_unittest + core-testlib-default + public-lib-json_value + public-lib-yson_value + cpp-client-ydb_driver + cpp-client-ydb_persqueue_core + client-ydb_persqueue_core-impl + ydb_persqueue_core-ut-ut_utils + client-ydb_topic-codecs + cpp-client-ydb_topic + client-ydb_topic-impl + client-ydb_federated_topic-impl +) +target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +) +set_property( + TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_TARGET + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-public-sdk-cpp-client-ydb_federated_topic-ut + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut + system_allocator +) +vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut) diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp new file mode 100644 index 0000000000..743c09b06a --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -0,0 +1,397 @@ +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> + +#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h> +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h> + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/testing/unittest/tests_data.h> +#include <library/cpp/threading/future/future.h> +#include <library/cpp/threading/future/async.h> + +#include <future> + +namespace NYdb::NFederatedTopic::NTests { + +Y_UNIT_TEST_SUITE(BasicUsage) { + + Y_UNIT_TEST(GetAllStartPartitionSessions) { + size_t partitionsCount = 5; + + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>( + TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2, partitionsCount); + + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + ReadSession = topicClient.CreateFederatedReadSession(readSettings); + Cerr << "Session was created" << Endl; + + ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); + TMaybe<NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false); + Y_ASSERT(!event); + + auto fdsRequest = fdsMock.GetNextPendingRequest(); + Y_ASSERT(fdsRequest.has_value()); + // TODO check fdsRequest->Req db header + + Ydb::FederationDiscovery::ListFederationDatabasesResponse response; + + auto op = response.mutable_operation(); + op->set_status(Ydb::StatusIds::SUCCESS); + response.mutable_operation()->set_ready(true); + response.mutable_operation()->set_id("12345"); + + Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult; + mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135"); + mockResult.set_self_location("fancy_datacenter"); + auto c1 = mockResult.add_federation_databases(); + c1->set_name("dc1"); + c1->set_path("/Root"); + c1->set_id("account-dc1"); + c1->set_endpoint("localhost:" + ToString(fdsMock.Port)); + c1->set_location("dc1"); + c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c1->set_weight(1000); + auto c2 = mockResult.add_federation_databases(); + c2->set_name("dc2"); + c2->set_path("/Root"); + c2->set_id("account-dc2"); + c2->set_endpoint("localhost:" + ToString(fdsMock.Port)); + c2->set_location("dc2"); + c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c2->set_weight(500); + + op->mutable_result()->PackFrom(mockResult); + + fdsRequest->Result.SetValue({std::move(response), grpc::Status::OK}); + + for (size_t i = 0; i < partitionsCount; ++i) { + ReadSession->WaitEvent().Wait(); + // Get event + TMaybe<NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(true/*block - will block if no event received yet*/); + Cerr << "Got new read session event: " << DebugString(*event) << Endl; + + auto* startPartitionSessionEvent = std::get_if<NYdb::NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event); + Y_ASSERT(startPartitionSessionEvent); + startPartitionSessionEvent->Confirm(); + } + + ReadSession->Close(TDuration::MilliSeconds(10)); + } + + Y_UNIT_TEST(WaitEventBlocksBeforeDiscovery) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + ReadSession = topicClient.CreateFederatedReadSession(readSettings); + Cerr << "Session was created" << Endl; + + auto f = ReadSession->WaitEvent(); + Cerr << "Returned from WaitEvent" << Endl; + // observer asyncInit should respect client/session timeouts + UNIT_ASSERT(!f.Wait(TDuration::Seconds(1))); + + Cerr << "Session blocked successfully" << Endl; + + UNIT_ASSERT(ReadSession->Close(TDuration::MilliSeconds(10))); + Cerr << "Session closed gracefully" << Endl; + } + + Y_UNIT_TEST(RetryDiscoveryWithCancel) { + // TODO register requests in mock, compare time for retries, reschedules + + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + auto clientSettings = TFederatedTopicClientSettings() + .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( + TDuration::Seconds(10), + TDuration::Seconds(10) + )); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); + + bool answerOk = false; + + for (int i = 0; i < 6; ++i) { + std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + do { + Sleep(TDuration::MilliSeconds(50)); + fdsRequest = fdsMock.GetNextPendingRequest(); + + } while (!fdsRequest.has_value()); + + if (answerOk) { + fdsRequest->Result.SetValue(fdsMock.ComposeOkResult()); + } else { + fdsRequest->Result.SetValue({{}, grpc::Status(grpc::StatusCode::UNAVAILABLE, "mock 'unavailable'")}); + } + + answerOk = !answerOk; + } + } + + Y_UNIT_TEST(PropagateSessionClosed) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + auto clientSettings = TFederatedTopicClientSettings() + .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy( + TDuration::Seconds(10), + TDuration::Seconds(10) + )); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); + + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + ReadSession = topicClient.CreateFederatedReadSession(readSettings); + Cerr << "Session was created" << Endl; + + Sleep(TDuration::MilliSeconds(50)); + + auto events = ReadSession->GetEvents(false); + UNIT_ASSERT(events.empty()); + + Ydb::FederationDiscovery::ListFederationDatabasesResponse Response; + + auto op = Response.mutable_operation(); + op->set_status(Ydb::StatusIds::SUCCESS); + Response.mutable_operation()->set_ready(true); + Response.mutable_operation()->set_id("12345"); + + Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult; + mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135"); + mockResult.set_self_location("fancy_datacenter"); + auto c1 = mockResult.add_federation_databases(); + c1->set_name("dc1"); + c1->set_path("/Root"); + c1->set_id("account-dc1"); + c1->set_endpoint("localhost:" + ToString(fdsMock.Port)); + c1->set_location("dc1"); + c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c1->set_weight(1000); + auto c2 = mockResult.add_federation_databases(); + c2->set_name("dc2"); + c2->set_path("/Invalid"); + c2->set_id("account-dc2"); + c2->set_endpoint("localhost:" + ToString(fdsMock.Port)); + c2->set_location("dc2"); + c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c2->set_weight(500); + + op->mutable_result()->PackFrom(mockResult); + + std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + fdsRequest->Result.SetValue({Response, grpc::Status::OK}); + + NPersQueue::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id"); + writeSettings.Codec(NPersQueue::ECodec::RAW); + NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + auto& client = setup->GetPersQueueClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + TString messageBase = "message----"; + + + ui64 count = 100u; + for (auto i = 0u; i < count; i++) { + auto res = session->Write(messageBase * (200 * 1024) + ToString(i)); + UNIT_ASSERT(res); + + events = ReadSession->GetEvents(true); + UNIT_ASSERT(!events.empty()); + + for (auto& e : events) { + Cerr << ">>> Got event: " << DebugString(e) << Endl; + if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&e)) { + dataEvent->Commit(); + } else if (auto* startPartitionSessionEvent = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&e)) { + startPartitionSessionEvent->Confirm(); + } else if (auto* stopPartitionSessionEvent = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&e)) { + stopPartitionSessionEvent->Confirm(); + } + } + } + + session->Close(); + } + + Y_UNIT_TEST(RecreateObserver) { + // TODO register requests in mock, compare time for retries, reschedules + + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false); + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + auto clientSettings = TFederatedTopicClientSettings() + .RetryPolicy(NTopic::IRetryPolicy::GetNoRetryPolicy()); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings); + + + // Create read session. + NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + ReadSession = topicClient.CreateFederatedReadSession(readSettings); + Cerr << "Session was created" << Endl; + + ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); + auto event = ReadSession->GetEvent(false); + UNIT_ASSERT(!event.Defined()); + + + std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + + fdsRequest->Result.SetValue({{}, grpc::Status(grpc::StatusCode::UNAVAILABLE, "mock 'unavailable'")}); + + ReadSession->WaitEvent().Wait(); + event = ReadSession->GetEvent(false); + UNIT_ASSERT(event.Defined()); + Cerr << ">>> Got event: " << DebugString(*event) << Endl; + UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event)); + + auto ReadSession2 = topicClient.CreateFederatedReadSession(readSettings); + Cerr << "Session2 was created" << Endl; + + ReadSession2->WaitEvent().Wait(TDuration::Seconds(1)); + event = ReadSession2->GetEvent(false); + UNIT_ASSERT(!event.Defined()); + + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + + fdsRequest->Result.SetValue(fdsMock.ComposeOkResult()); + + event = ReadSession2->GetEvent(true); + UNIT_ASSERT(event.Defined()); + Cerr << ">>> Got event: " << DebugString(*event) << Endl; + UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(*event)); + + // Cerr << ">>> Got event: " << DebugString(*event) << Endl; + // UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event)); + } + + + +} + +} diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h new file mode 100644 index 0000000000..b0746d14b7 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h @@ -0,0 +1,112 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <ydb/public/api/grpc/ydb_federation_discovery_v1.grpc.pb.h> + +#include <deque> +#include <optional> + +namespace NYdb::NFederatedTopic::NTests { + +// ctor gets ---list of response--- tmaybe{response} +// ListFederationDatabases gets 1 element under lock and respond. otherwise +// create 2 queues, for requests and responses +// getrequest() - put request and returns<request, promise<response>> +// sendresponse() +class TFederationDiscoveryServiceMock: public Ydb::FederationDiscovery::V1::FederationDiscoveryService::Service { +public: + using TRequest = Ydb::FederationDiscovery::ListFederationDatabasesRequest; + using TResponse = Ydb::FederationDiscovery::ListFederationDatabasesResponse; + + struct TGrpcResult { + TResponse Response; + grpc::Status Status; + }; + + struct TManualRequest { + const TRequest* Request; + NThreading::TPromise<TGrpcResult> Result; + }; + +public: + ~TFederationDiscoveryServiceMock() { + while (auto r = GetNextPendingRequest()) { + r->Result.SetValue({}); + } + } + + std::optional<TManualRequest> GetNextPendingRequest() { + std::optional<TManualRequest> result; + with_lock(Lock) { + if (!PendingRequests.empty()) { + result = PendingRequests.front(); + PendingRequests.pop_front(); + } + } + return result; + } + + virtual grpc::Status ListFederationDatabases(grpc::ServerContext*, + const TRequest* request, + TResponse* response) override { + Y_UNUSED(request); + + auto p = NThreading::NewPromise<TGrpcResult>(); + auto f = p.GetFuture(); + + with_lock(Lock) { + PendingRequests.push_back({request, std::move(p)}); + } + + f.Wait(TDuration::Seconds(35)); + if (f.HasValue()) { + auto result = f.ExtractValueSync(); + Cerr << ">>> Ready to answer: " << (result.Status.ok() ? "ok" : "err") << Endl; + *response = std::move(result.Response); + return result.Status; + } + + return grpc::Status(grpc::StatusCode::UNKNOWN, "No response after timeout"); + } + + TGrpcResult ComposeOkResult() { + Ydb::FederationDiscovery::ListFederationDatabasesResponse okResponse; + + auto op = okResponse.mutable_operation(); + op->set_status(Ydb::StatusIds::SUCCESS); + okResponse.mutable_operation()->set_ready(true); + okResponse.mutable_operation()->set_id("12345"); + + Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult; + mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135"); + mockResult.set_self_location("fancy_datacenter"); + auto c1 = mockResult.add_federation_databases(); + c1->set_name("dc1"); + c1->set_path("/Root"); + c1->set_id("account-dc1"); + c1->set_endpoint("localhost:" + ToString(Port)); + c1->set_location("dc1"); + c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c1->set_weight(1000); + auto c2 = mockResult.add_federation_databases(); + c2->set_name("dc2"); + c2->set_path("/Root"); + c2->set_id("account-dc2"); + c2->set_endpoint("localhost:" + ToString(Port)); + c2->set_location("dc2"); + c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE); + c2->set_weight(500); + + op->mutable_result()->PackFrom(mockResult); + + return {okResponse, grpc::Status::OK}; + } + + +public: + ui16 Port; + std::deque<TManualRequest> PendingRequests; + TAdaptiveLock Lock; +}; + +} // namespace NYdb::NFederatedTopic::NTests diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make new file mode 100644 index 0000000000..d40d4432db --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make @@ -0,0 +1,38 @@ +UNITTEST_FOR(ydb/public/sdk/cpp/client/ydb_federated_topic) + +IF (SANITIZER_TYPE) + TIMEOUT(1200) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(600) + SIZE(MEDIUM) +ENDIF() + +FORK_SUBTESTS() + +PEERDIR( + library/cpp/testing/gmock_in_unittest + ydb/core/testlib/default + ydb/public/lib/json_value + ydb/public/lib/yson_value + ydb/public/sdk/cpp/client/ydb_driver + ydb/public/sdk/cpp/client/ydb_persqueue_core + ydb/public/sdk/cpp/client/ydb_persqueue_core/impl + ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils + + ydb/public/sdk/cpp/client/ydb_topic/codecs + ydb/public/sdk/cpp/client/ydb_topic + ydb/public/sdk/cpp/client/ydb_topic/impl + + ydb/public/sdk/cpp/client/ydb_federated_topic + ydb/public/sdk/cpp/client/ydb_federated_topic/impl +) + +YQL_LAST_ABI_VERSION() + +SRCS( + basic_usage_ut.cpp +) + +END() diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make new file mode 100644 index 0000000000..217088f2e0 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + +GENERATE_ENUM_SERIALIZATION(ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h) + +SRCS( + federated_topic.h +) + +PEERDIR( + ydb/public/sdk/cpp/client/ydb_topic + ydb/public/sdk/cpp/client/ydb_federated_topic/impl +) + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/public/sdk/cpp/ya.make b/ydb/public/sdk/cpp/ya.make index 752050f35a..d308cb76a2 100644 --- a/ydb/public/sdk/cpp/ya.make +++ b/ydb/public/sdk/cpp/ya.make @@ -29,6 +29,9 @@ RECURSE( client/ydb_topic/codecs client/ydb_topic/impl client/ydb_topic/ut + client/ydb_federated_topic + client/ydb_federated_topic/impl + client/ydb_federated_topic/ut client/ydb_extension client/ydb_result/ut client/ydb_params/ut |