aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-08-14 09:23:56 +0300
committerildar-khisam <ikhis@ydb.tech>2023-08-14 10:05:12 +0300
commit8ab85ce5392a37c48498612fc0d696c0e7f41907 (patch)
tree0ff42f820ed0b4f5f05782b9fda9118cb056bfb8
parent5dddb98273c83176cf673fd86e6697885e5b9a10 (diff)
downloadydb-8ab85ce5392a37c48498612fc0d696c0e7f41907.tar.gz
recreate observer
recreate observer if stale
-rw-r--r--ydb/public/sdk/cpp/client/CMakeLists.txt1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt31
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt31
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h362
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt33
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt34
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt34
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt33
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp130
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h96
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp208
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp30
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h53
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp161
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h82
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make29
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt89
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt92
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt94
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt82
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp397
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h112
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make38
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make18
-rw-r--r--ydb/public/sdk/cpp/ya.make3
31 files changed, 2412 insertions, 0 deletions
diff --git a/ydb/public/sdk/cpp/client/CMakeLists.txt b/ydb/public/sdk/cpp/client/CMakeLists.txt
index 808d118822..7d96c1f95a 100644
--- a/ydb/public/sdk/cpp/client/CMakeLists.txt
+++ b/ydb/public/sdk/cpp/client/CMakeLists.txt
@@ -20,6 +20,7 @@ add_subdirectory(ydb_discovery)
add_subdirectory(ydb_driver)
add_subdirectory(ydb_export)
add_subdirectory(ydb_extension)
+add_subdirectory(ydb_federated_topic)
add_subdirectory(ydb_import)
add_subdirectory(ydb_monitoring)
add_subdirectory(ydb_operation)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..dce7f91634
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(cpp-client-ydb_federated_topic)
+target_link_libraries(cpp-client-ydb_federated_topic PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ tools-enum_parser-enum_serialization_runtime
+ cpp-client-ydb_topic
+ client-ydb_federated_topic-impl
+)
+generate_enum_serilization(cpp-client-ydb_federated_topic
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+ INCLUDE_HEADERS
+ ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..3e5d5f67b0
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(cpp-client-ydb_federated_topic)
+target_link_libraries(cpp-client-ydb_federated_topic PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ tools-enum_parser-enum_serialization_runtime
+ cpp-client-ydb_topic
+ client-ydb_federated_topic-impl
+)
+generate_enum_serilization(cpp-client-ydb_federated_topic
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+ INCLUDE_HEADERS
+ ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..3e5d5f67b0
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,31 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(cpp-client-ydb_federated_topic)
+target_link_libraries(cpp-client-ydb_federated_topic PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ tools-enum_parser-enum_serialization_runtime
+ cpp-client-ydb_topic
+ client-ydb_federated_topic-impl
+)
+generate_enum_serilization(cpp-client-ydb_federated_topic
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+ INCLUDE_HEADERS
+ ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..dce7f91634
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,30 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(impl)
+add_subdirectory(ut)
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
+
+add_library(cpp-client-ydb_federated_topic)
+target_link_libraries(cpp-client-ydb_federated_topic PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ tools-enum_parser-enum_serialization_runtime
+ cpp-client-ydb_topic
+ client-ydb_federated_topic-impl
+)
+generate_enum_serilization(cpp-client-ydb_federated_topic
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+ INCLUDE_HEADERS
+ ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
new file mode 100644
index 0000000000..299210ea6c
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
@@ -0,0 +1,362 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+
+#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
+
+namespace NYdb::NFederatedTopic {
+
+using NTopic::TPrintable;
+using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo;
+
+//! Federated partition session.
+struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFederatedPartitionSession> {
+ using TPtr = TIntrusivePtr<TFederatedPartitionSession>;
+
+public:
+ TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr<TDbInfo> db)
+ : PartitionSession(partitionSession)
+ , Db(std::move(db))
+ {}
+
+ //! Request partition session status.
+ //! Result will come to TPartitionSessionStatusEvent.
+ void RequestStatus() {
+ return PartitionSession->RequestStatus();
+ }
+
+ //!
+ //! Properties.
+ //!
+
+ //! Unique identifier of partition session.
+ //! It is unique within one read session.
+ ui64 GetPartitionSessionId() const {
+ return PartitionSession->GetPartitionSessionId();
+ }
+
+ //! Topic path.
+ const TString& GetTopicPath() const {
+ return PartitionSession->GetTopicPath();
+ }
+
+ //! Partition id.
+ ui64 GetPartitionId() const {
+ return PartitionSession->GetPartitionId();
+ }
+
+ const TString& GetDatabaseName() const {
+ return Db->name();
+ }
+
+ const TString& GetDatabasePath() const {
+ return Db->path();
+ }
+
+ const TString& GetDatabaseId() const {
+ return Db->id();
+ }
+
+private:
+ NTopic::TPartitionSession::TPtr PartitionSession;
+ std::shared_ptr<TDbInfo> Db;
+};
+
+//! Events for read session.
+struct TReadSessionEvent {
+ class TFederatedPartitionSessionAccessor {
+ public:
+ TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession)
+ : FederatedPartitionSession(std::move(partitionSession))
+ {}
+
+ TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db)
+ : FederatedPartitionSession(MakeIntrusive<TFederatedPartitionSession>(partitionSession, std::move(db)))
+ {}
+
+ inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const {
+ return FederatedPartitionSession;
+ }
+
+ protected:
+ TFederatedPartitionSession::TPtr FederatedPartitionSession;
+ };
+
+ template <typename TEvent>
+ struct TFederated : public TFederatedPartitionSessionAccessor, public TEvent, public TPrintable<TFederated<TEvent>> {
+ using TPrintable<TFederated<TEvent>>::DebugString;
+
+ TFederated(TEvent event, std::shared_ptr<TDbInfo> db)
+ : TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db)
+ , TEvent(std::move(event))
+ {}
+
+ const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override {
+ ythrow yexception() << "GetPartitionSession() method unavailable for federated objects, use GetFederatedPartitionSession() instead";
+ }
+ };
+
+ using TCommitOffsetAcknowledgementEvent = TFederated<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>;
+ using TStartPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>;
+ using TStopPartitionSessionEvent = TFederated<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>;
+ using TPartitionSessionStatusEvent = TFederated<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>;
+ using TPartitionSessionClosedEvent = TFederated<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>;
+
+ struct TDataReceivedEvent : public NTopic::TReadSessionEvent::TPartitionSessionAccessor, public TFederatedPartitionSessionAccessor, public TPrintable<TDataReceivedEvent> {
+ using TMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>;
+ using TCompressedMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>;
+
+ public:
+ TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db);
+
+ TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
+ NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db);
+
+ const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override {
+ ythrow yexception() << "GetPartitionSession() method unavailable for federated objects, use GetFederatedPartitionSession() instead";
+ }
+
+ bool HasCompressedMessages() const {
+ return !CompressedMessages.empty();
+ }
+
+ size_t GetMessagesCount() const {
+ return Messages.size() + CompressedMessages.size();
+ }
+
+ //! Get messages.
+ TVector<TMessage>& GetMessages() {
+ CheckMessagesFilled(false);
+ return Messages;
+ }
+
+ const TVector<TMessage>& GetMessages() const {
+ CheckMessagesFilled(false);
+ return Messages;
+ }
+
+ //! Get compressed messages.
+ TVector<TCompressedMessage>& GetCompressedMessages() {
+ CheckMessagesFilled(true);
+ return CompressedMessages;
+ }
+
+ const TVector<TCompressedMessage>& GetCompressedMessages() const {
+ CheckMessagesFilled(true);
+ return CompressedMessages;
+ }
+
+ //! Commits all messages in batch.
+ void Commit();
+
+ private:
+ void CheckMessagesFilled(bool compressed) const {
+ Y_VERIFY(!Messages.empty() || !CompressedMessages.empty());
+ if (compressed && CompressedMessages.empty()) {
+ ythrow yexception() << "cannot get compressed messages, parameter decompress=true for read session";
+ }
+ if (!compressed && Messages.empty()) {
+ ythrow yexception() << "cannot get decompressed messages, parameter decompress=false for read session";
+ }
+ }
+
+ private:
+ TVector<TMessage> Messages;
+ TVector<TCompressedMessage> CompressedMessages;
+ std::vector<std::pair<ui64, ui64>> OffsetRanges;
+ };
+
+ using TEvent = std::variant<TDataReceivedEvent,
+ TCommitOffsetAcknowledgementEvent,
+ TStartPartitionSessionEvent,
+ TStopPartitionSessionEvent,
+ TPartitionSessionStatusEvent,
+ TPartitionSessionClosedEvent,
+ NTopic::TSessionClosedEvent>;
+};
+
+TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db);
+
+//! Set of offsets to commit.
+//! Class that could store offsets in order to commit them later.
+//! This class is not thread safe.
+class TDeferredCommit {
+public:
+ //! Add message to set.
+ void Add(const TReadSessionEvent::TDataReceivedEvent::TMessage& message);
+
+ //! Add all messages from dataReceivedEvent to set.
+ void Add(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent);
+
+ //! Add offsets range to set.
+ void Add(const TFederatedPartitionSession& partitionSession, ui64 startOffset, ui64 endOffset);
+
+ //! Add offset to set.
+ void Add(const TFederatedPartitionSession& partitionSession, ui64 offset);
+
+ //! Commit all added offsets.
+ void Commit();
+
+ TDeferredCommit();
+ TDeferredCommit(const TDeferredCommit&) = delete;
+ TDeferredCommit(TDeferredCommit&&);
+ TDeferredCommit& operator=(const TDeferredCommit&) = delete;
+ TDeferredCommit& operator=(TDeferredCommit&&);
+
+ ~TDeferredCommit();
+
+private:
+ class TImpl;
+ THolder<TImpl> Impl;
+};
+
+//! Event debug string.
+TString DebugString(const TReadSessionEvent::TEvent& event);
+
+
+//! Settings for federated write session.
+struct TFederatedWriteSessionSettings : public NTopic::TWriteSessionSettings {
+ using TSelf = TFederatedWriteSessionSettings;
+
+ //! Preferred database
+ //! If specified database is unavailable, session will write to other database.
+ FLUENT_SETTING_OPTIONAL(TString, PreferredDatabase);
+
+ //! Write to other databases if there are problems with connection
+ //! to the preferred one.
+ FLUENT_SETTING_DEFAULT(bool, AllowFallback, true);
+
+ TFederatedWriteSessionSettings() = default;
+ TFederatedWriteSessionSettings(const TFederatedWriteSessionSettings&) = default;
+ TFederatedWriteSessionSettings(TFederatedWriteSessionSettings&&) = default;
+ TFederatedWriteSessionSettings(const TString& path, const TString& producerId, const TString& messageGroupId)
+ : NTopic::TWriteSessionSettings(path, producerId, messageGroupId) {
+ }
+
+ TFederatedWriteSessionSettings& operator=(const TFederatedWriteSessionSettings&) = default;
+ TFederatedWriteSessionSettings& operator=(TFederatedWriteSessionSettings&&) = default;
+};
+
+//! Settings for read session.
+struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
+ using TSelf = TFederatedReadSessionSettings;
+
+ enum class EReadPolicy {
+ READ_ALL = 0,
+ READ_ORIGINAL,
+ READ_MIRRORED
+ };
+
+ // optional for read_mirrored case ?
+
+ //! Policy for federated reading.
+ //!
+ //! READ_ALL: read will be done from all topic instances from all databases.
+ //! READ_ORIGINAL:
+ //! READ_MIRRORED:
+ FLUENT_SETTING_DEFAULT(EReadPolicy, ReadPolicy, EReadPolicy::READ_ALL);
+};
+
+
+
+class IFederatedReadSession {
+public:
+ //! Main reader loop.
+ //! Wait for next reader event.
+ virtual NThreading::TFuture<void> WaitEvent() = 0;
+
+ //! Main reader loop.
+ //! Get read session events.
+ //! Blocks until event occurs if "block" is set.
+ //!
+ //! maxEventsCount: maximum events count in batch.
+ //! maxByteSize: total size limit of data messages in batch.
+ //! block: block until event occurs.
+ //!
+ //! If maxEventsCount is not specified,
+ //! read session chooses event batch size automatically.
+ virtual TVector<TReadSessionEvent::TEvent>
+ GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing(),
+ size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0;
+
+ //! Get single event.
+ virtual TMaybe<TReadSessionEvent::TEvent>
+ GetEvent(bool block = false, size_t maxByteSize = std::numeric_limits<size_t>::max()) = 0;
+
+ //! Close read session.
+ //! Waits for all commit acknowledgments to arrive.
+ //! Force close after timeout.
+ //! This method is blocking.
+ //! When session is closed,
+ //! TSessionClosedEvent arrives.
+ virtual bool Close(TDuration timeout = TDuration::Max()) = 0;
+
+ //! Reader counters with different stats (see TReaderConuters).
+ virtual NTopic::TReaderCounters::TPtr GetCounters() const = 0;
+
+ //! Get unique identifier of read session.
+ virtual TString GetSessionId() const = 0;
+
+ virtual ~IFederatedReadSession() = default;
+};
+
+struct TFederatedTopicClientSettings : public TCommonClientSettingsBase<TFederatedTopicClientSettings> {
+ using TSelf = TFederatedTopicClientSettings;
+
+ //! Default executor for compression tasks.
+ FLUENT_SETTING_DEFAULT(NTopic::IExecutor::TPtr, DefaultCompressionExecutor, NTopic::CreateThreadPoolExecutor(2));
+
+ //! Default executor for callbacks.
+ FLUENT_SETTING_DEFAULT(NTopic::IExecutor::TPtr, DefaultHandlersExecutor, NTopic::CreateThreadPoolExecutor(1));
+
+ //! Connection timeoout for federation discovery.
+ FLUENT_SETTING_DEFAULT(TDuration, ConnectionTimeout, TDuration::Seconds(30));
+
+ //! Retry policy enables automatic retries for non-fatal errors.
+ FLUENT_SETTING_DEFAULT(NTopic::IRetryPolicy::TPtr, RetryPolicy, NTopic::IRetryPolicy::GetDefaultPolicy());
+};
+
+class TFederatedTopicClient {
+public:
+ class TImpl;
+
+ // executors from settings are passed to subclients
+ TFederatedTopicClient(const TDriver& driver, const TFederatedTopicClientSettings& settings = {});
+
+ //! Create read session.
+ std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings);
+
+ //! Create write session.
+ // std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingFederatedWriteSession(const TFederatedWriteSessionSettings& settings);
+ // std::shared_ptr<NTopic::IWriteSession> CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings);
+
+private:
+ std::shared_ptr<TImpl> Impl_;
+};
+
+} // namespace NYdb::NFederatedTopic
+
+namespace NYdb::NTopic {
+
+using namespace NFederatedTopic;
+
+template<>
+void TPrintable<TFederatedPartitionSession>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TStopPartitionSessionEvent>>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionStatusEvent>>::DebugString(TStringBuilder& res, bool) const;
+template<>
+void TPrintable<NFederatedTopic::TReadSessionEvent::TFederated<NFederatedTopic::TReadSessionEvent::TPartitionSessionClosedEvent>>::DebugString(TStringBuilder& res, bool) const;
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..6b9ea01e31
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(client-ydb_federated_topic-impl)
+target_link_libraries(client-ydb_federated_topic-impl PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-client
+ cpp-monlib-dynamic_counters
+ cpp-monlib-metrics
+ cpp-string_utils-url
+ library-persqueue-obfuscate
+ api-grpc-draft
+ api-grpc
+ impl-ydb_internal-make_request
+ client-ydb_common_client-impl
+ cpp-client-ydb_driver
+ client-ydb_persqueue_core-impl
+ cpp-client-ydb_proto
+)
+target_sources(client-ydb_federated_topic-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..00d0883659
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(client-ydb_federated_topic-impl)
+target_link_libraries(client-ydb_federated_topic-impl PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-client
+ cpp-monlib-dynamic_counters
+ cpp-monlib-metrics
+ cpp-string_utils-url
+ library-persqueue-obfuscate
+ api-grpc-draft
+ api-grpc
+ impl-ydb_internal-make_request
+ client-ydb_common_client-impl
+ cpp-client-ydb_driver
+ client-ydb_persqueue_core-impl
+ cpp-client-ydb_proto
+)
+target_sources(client-ydb_federated_topic-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..00d0883659
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,34 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(client-ydb_federated_topic-impl)
+target_link_libraries(client-ydb_federated_topic-impl PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-client
+ cpp-monlib-dynamic_counters
+ cpp-monlib-metrics
+ cpp-string_utils-url
+ library-persqueue-obfuscate
+ api-grpc-draft
+ api-grpc
+ impl-ydb_internal-make_request
+ client-ydb_common_client-impl
+ cpp-client-ydb_driver
+ client-ydb_persqueue_core-impl
+ cpp-client-ydb_proto
+)
+target_sources(client-ydb_federated_topic-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..6b9ea01e31
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(client-ydb_federated_topic-impl)
+target_link_libraries(client-ydb_federated_topic-impl PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-client
+ cpp-monlib-dynamic_counters
+ cpp-monlib-metrics
+ cpp-string_utils-url
+ library-persqueue-obfuscate
+ api-grpc-draft
+ api-grpc
+ impl-ydb_internal-make_request
+ client-ydb_common_client-impl
+ cpp-client-ydb_driver
+ client-ydb_persqueue_core-impl
+ cpp-client-ydb_proto
+)
+target_sources(client-ydb_federated_topic-impl PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
+)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
new file mode 100644
index 0000000000..e507617ec8
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
@@ -0,0 +1,130 @@
+#include "federated_read_session.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
+
+#include <library/cpp/threading/future/future.h>
+#include <util/generic/guid.h>
+
+namespace NYdb::NFederatedTopic {
+
+NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings);
+
+TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSetttings,
+ std::shared_ptr<TFederatedDbObserver> observer)
+ : Settings(settings)
+ , Connections(std::move(connections))
+ , SubClientSetttings(FromFederated(clientSetttings))
+ , Observer(std::move(observer))
+ , AsyncInit(Observer->WaitForFirstState())
+ , FederationState(nullptr)
+ , SessionId(CreateGuidAsString())
+{
+}
+
+void TFederatedReadSession::Start() {
+ AsyncInit.Subscribe([self = shared_from_this()](const auto& f){
+ Y_UNUSED(f);
+ with_lock(self->Lock) {
+ self->FederationState = self->Observer->GetState();
+ self->OnFederatedStateUpdateImpl();
+ }
+ });
+}
+
+void TFederatedReadSession::OpenSubSessionsImpl() {
+ for (const auto& db : FederationState->DbInfos) {
+ // TODO check if available
+ NTopic::TTopicClientSettings settings = SubClientSetttings;
+ settings
+ .Database(db->path())
+ .DiscoveryEndpoint(db->endpoint());
+ auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, settings);
+ auto subsession = subclient->CreateReadSession(Settings);
+ SubSessions.emplace_back(subsession, db);
+ }
+ SubsessionIndex = 0;
+}
+
+void TFederatedReadSession::OnFederatedStateUpdateImpl() {
+ if (!FederationState->Status.IsSuccess()) {
+ CloseImpl();
+ return;
+ }
+ // 1) compare old info and new info;
+ // result: list of subsessions to open + list of subsessions to close
+ // 2) OpenSubSessionsImpl, CloseSubSessionsImpl
+ OpenSubSessionsImpl();
+ // 3) TODO LATER reschedule OnFederatedStateUpdate
+}
+
+NThreading::TFuture<void> TFederatedReadSession::WaitEvent() {
+ // TODO override with read session settings timeout
+ return AsyncInit.Apply([self = shared_from_this()](const NThreading::TFuture<void>) {
+ if (self->Closing) {
+ return NThreading::MakeFuture();
+ }
+ std::vector<NThreading::TFuture<void>> waiters;
+ with_lock(self->Lock) {
+ Y_VERIFY(!self->SubSessions.empty(), "SubSessions empty in discovered state");
+ for (const auto& sub : self->SubSessions) {
+ waiters.emplace_back(sub.Session->WaitEvent());
+ }
+ }
+ return NThreading::WaitAny(std::move(waiters));
+ });
+}
+
+TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) {
+ if (block) {
+ WaitEvent().Wait();
+ }
+ with_lock(Lock) {
+ if (Closing) {
+ // TODO correct conversion
+ return {NTopic::TSessionClosedEvent(FederationState->Status.GetStatus(), {})};
+ }
+ // TODO!!! handle aborting or closing state
+ // via handler on SessionClosedEvent {
+ // cancel all subsessions, empty SubSessions, set aborting
+ // }
+ if (SubSessions.empty()) {
+ return {};
+ }
+ }
+ TVector<TReadSessionEvent::TEvent> result;
+ with_lock(Lock) {
+ do {
+ auto sub = SubSessions[SubsessionIndex];
+ // TODO remove copy
+ for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) {
+ result.push_back(Federate(ev, sub.DbInfo));
+ }
+ SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size();
+ }
+ while (block && result.empty());
+ }
+ return result;
+}
+
+TMaybe<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvent(bool block, size_t maxByteSize) {
+ auto events = GetEvents(block, 1, maxByteSize);
+ return events.empty() ? Nothing() : TMaybe<TReadSessionEvent::TEvent>{std::move(events.front())};
+}
+
+void TFederatedReadSession::CloseImpl() {
+ Closing = true;
+}
+
+bool TFederatedReadSession::Close(TDuration timeout) {
+ bool result = true;
+ for (const auto& sub : SubSessions) {
+ // TODO substract from user timeout
+ result = sub.Session->Close(timeout);
+ }
+ return result;
+}
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
new file mode 100644
index 0000000000..7e11903a46
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
@@ -0,0 +1,96 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h>
+
+namespace NYdb::NFederatedTopic {
+
+class TFederatedReadSession : public IFederatedReadSession,
+ public std::enable_shared_from_this<TFederatedReadSession> {
+ friend class TFederatedTopicClient::TImpl;
+
+private:
+ struct TSubSession {
+ TSubSession(std::shared_ptr<NTopic::IReadSession> session = {}, std::shared_ptr<TDbInfo> dbInfo = {})
+ : Session(std::move(session))
+ , DbInfo(std::move(dbInfo))
+ {}
+
+ std::shared_ptr<NTopic::IReadSession> Session;
+ std::shared_ptr<TDbInfo> DbInfo;
+ };
+
+public:
+ TFederatedReadSession(const TFederatedReadSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSetttings,
+ std::shared_ptr<TFederatedDbObserver> observer);
+
+ ~TFederatedReadSession() = default;
+
+ NThreading::TFuture<void> WaitEvent() override;
+ TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override;
+ TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override;
+
+ bool Close(TDuration timeout) override;
+
+ inline TString GetSessionId() const override {
+ return SessionId;
+ }
+
+ inline NTopic::TReaderCounters::TPtr GetCounters() const override {
+ return Settings.Counters_; // Always not nullptr.
+ }
+
+private:
+ // TODO logging
+ TStringBuilder GetLogPrefix() const;
+
+ void Start();
+ bool ValidateSettings();
+ void OpenSubSessionsImpl();
+
+ void OnFederatedStateUpdateImpl();
+
+ void Abort();
+ void CloseImpl();
+
+ void ClearAllEvents();
+
+ // TODO Counters
+ // void MakeCountersIfNeeded();
+ // void DumpCountersToLog(size_t timeNumber = 0);
+ // void ScheduleDumpCountersToLog(size_t timeNumber = 0);
+
+private:
+ // For subsessions creation
+ const NTopic::TReadSessionSettings Settings;
+ std::shared_ptr<TGRpcConnectionsImpl> Connections;
+ const NTopic::TTopicClientSettings SubClientSetttings;
+
+ std::shared_ptr<TFederatedDbObserver> Observer;
+ NThreading::TFuture<void> AsyncInit;
+ std::shared_ptr<TFederatedDbState> FederationState;
+
+ // TODO
+ // TLog Log;
+
+ const TString SessionId;
+ const TInstant StartSessionTime = TInstant::Now();
+
+ TAdaptiveLock Lock;
+
+ std::vector<TSubSession> SubSessions;
+ size_t SubsessionIndex = 0;
+
+ // NGrpc::IQueueClientContextPtr DumpCountersContext;
+
+ // Exiting.
+ bool Closing = false;
+};
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
new file mode 100644
index 0000000000..0b6ab25113
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
@@ -0,0 +1,208 @@
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
+
+namespace NYdb::NFederatedTopic {
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Helpers
+
+std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) {
+ if (dataReceivedEvent.HasCompressedMessages()) {
+ const auto& msg = dataReceivedEvent.GetCompressedMessages()[index];
+ return {msg.GetOffset(), msg.GetOffset() + 1};
+ }
+ const auto& msg = dataReceivedEvent.GetMessages()[index];
+ return {msg.GetOffset(), msg.GetOffset() + 1};
+}
+
+TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db) {
+ if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&event)) {
+ return TReadSessionEvent::TDataReceivedEvent(std::move(*ev), db);
+ } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>(&event)) {
+ return TReadSessionEvent::TFederated(*ev, db);
+ } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&event)) {
+ return TReadSessionEvent::TFederated(*ev, db);
+ } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&event)) {
+ return TReadSessionEvent::TFederated(*ev, db);
+ } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>(&event)) {
+ return TReadSessionEvent::TFederated(*ev, db);
+ } else if (auto* ev = std::get_if<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>(&event)) {
+ return TReadSessionEvent::TFederated(*ev, db);
+ } else if (auto* ev = std::get_if<NTopic::TSessionClosedEvent>(&event)) {
+ return *ev;
+ } else {
+ Y_UNREACHABLE();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// Printable specializations
+
+}
+
+namespace NYdb::NTopic {
+
+using namespace NFederatedTopic;
+
+using TCommitOffsetAcknowledgementEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent>;
+using TStartPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>;
+using TStopPartitionSessionEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>;
+using TPartitionSessionStatusEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TPartitionSessionStatusEvent>;
+using TPartitionSessionClosedEvent = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>;
+using TMessage = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TMessage>;
+using TCompressedMessage = NFederatedTopic::TReadSessionEvent::TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>;
+using TDataReceivedEvent = NFederatedTopic::TReadSessionEvent::TDataReceivedEvent;
+
+template<>
+void TPrintable<TFederatedPartitionSession>::DebugString(TStringBuilder& res, bool) const {
+ const auto* self = static_cast<const TFederatedPartitionSession*>(this);
+ res << " Partition session id: " << self->GetPartitionSessionId()
+ << " Topic: \"" << self->GetTopicPath() << "\""
+ << " Partition: " << self->GetPartitionId()
+ << " Database name: " << self->GetDatabaseName()
+ << " Database path: " << self->GetDatabasePath()
+ << " Database id: " << self->GetDatabaseId();
+}
+
+template<>
+void TPrintable<TMessage>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TMessage*>(this);
+ ret << "Message {";
+ auto ptr = dynamic_cast<const TReadSessionEvent::TDataReceivedEvent::TMessageBase*>(self);
+ Y_VERIFY(ptr);
+ ptr->DebugString(ret, printData);
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " }";
+}
+
+template<>
+void TPrintable<TCompressedMessage>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TCompressedMessage*>(this);
+ ret << "CompressedMessage {";
+ static_cast<const TReadSessionEvent::TDataReceivedEvent::TMessageBase*>(self)->DebugString(ret, printData);
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " Codec: " << self->GetCodec()
+ << " Uncompressed size: " << self->GetUncompressedSize()
+ << " }";
+}
+
+template<>
+void TPrintable<TCommitOffsetAcknowledgementEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TCommitOffsetAcknowledgementEvent*>(this);
+ ret << "CommitAcknowledgement {";
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " }";
+}
+
+template<>
+void TPrintable<TStartPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TStartPartitionSessionEvent*>(this);
+ ret << "StartPartitionSession {";
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " EndOffset: " << self->GetEndOffset()
+ << " }";
+}
+
+template<>
+void TPrintable<TStopPartitionSessionEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TStopPartitionSessionEvent*>(this);
+ ret << "StopPartitionSession {";
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " }";
+}
+
+template<>
+void TPrintable<TPartitionSessionStatusEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TPartitionSessionStatusEvent*>(this);
+ ret << "PartitionSessionStatus {";
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " CommittedOffset: " << self->GetCommittedOffset()
+ << " ReadOffset: " << self->GetReadOffset()
+ << " EndOffset: " << self->GetEndOffset()
+ << " WriteWatermark: " << self->GetWriteTimeHighWatermark()
+ << " }";
+}
+
+template<>
+void TPrintable<TPartitionSessionClosedEvent>::DebugString(TStringBuilder& ret, bool) const {
+ const auto* self = static_cast<const TPartitionSessionClosedEvent*>(this);
+ ret << "PartitionSessionClosed {";
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ ret << " Reason: " << self->GetReason()
+ << " }";
+}
+
+template<>
+void TPrintable<TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool printData) const {
+ const auto* self = static_cast<const TDataReceivedEvent*>(this);
+ ret << "DataReceived {";
+ self->GetFederatedPartitionSession()->DebugString(ret);
+ if (self->HasCompressedMessages()) {
+ for (const auto& message : self->GetCompressedMessages()) {
+ ret << " ";
+ message.DebugString(ret, printData);
+ }
+ } else {
+ for (const auto& message : self->GetMessages()) {
+ ret << " ";
+ message.DebugString(ret, printData);
+ }
+ }
+ ret << " }";
+}
+
+
+}
+
+namespace NYdb::NFederatedTopic {
+
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// NFederatedTopic::TReadSessionEvent::TDataReceivedEvent
+
+TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db)
+ : NTopic::TReadSessionEvent::TPartitionSessionAccessor(event.GetPartitionSession())
+ , TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db)
+{
+ if (event.HasCompressedMessages()) {
+ for (auto& msg : event.GetCompressedMessages()) {
+ CompressedMessages.emplace_back(std::move(msg), db);
+ }
+ } else {
+ for (auto& msg : event.GetMessages()) {
+ Messages.emplace_back(std::move(msg), db);
+ }
+ }
+}
+
+TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(
+ TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
+ NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db)
+ : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession)
+ , TFederatedPartitionSessionAccessor(partitionSession, db)
+ , Messages(std::move(messages))
+ , CompressedMessages(std::move(compressedMessages))
+{
+ for (size_t i = 0; i < GetMessagesCount(); ++i) {
+ auto [from, to] = GetMessageOffsetRange(*this, i);
+ if (OffsetRanges.empty() || OffsetRanges.back().second != from) {
+ OffsetRanges.emplace_back(from, to);
+ } else {
+ OffsetRanges.back().second = to;
+ }
+ }
+}
+
+void TReadSessionEvent::TDataReceivedEvent::Commit() {
+ for (auto [from, to] : OffsetRanges) {
+ static_cast<NPersQueue::TPartitionStreamImpl<false>*>(PartitionSession.Get())->Commit(from, to);
+ }
+}
+
+TString DebugString(const TReadSessionEvent::TEvent& event) {
+ return std::visit([](const auto& ev) { return ev.DebugString(); }, event);
+}
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
new file mode 100644
index 0000000000..fa135df282
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
@@ -0,0 +1,30 @@
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h>
+
+namespace NYdb::NFederatedTopic {
+
+NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings) {
+ return NTopic::TTopicClientSettings()
+ .DefaultCompressionExecutor(settings.DefaultCompressionExecutor_)
+ .DefaultHandlersExecutor(settings.DefaultHandlersExecutor_);
+}
+
+TFederatedTopicClient::TFederatedTopicClient(const TDriver& driver, const TFederatedTopicClientSettings& settings)
+ : Impl_(std::make_shared<TImpl>(CreateInternalInterface(driver), settings))
+{
+}
+
+std::shared_ptr<IFederatedReadSession> TFederatedTopicClient::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) {
+ return Impl_->CreateFederatedReadSession(settings);
+}
+
+// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> TFederatedTopicClient::CreateSimpleBlockingFederatedWriteSession(
+// const TFederatedWriteSessionSettings& settings) {
+// return Impl_->CreateSimpleFederatedWriteSession(settings);
+// }
+
+// std::shared_ptr<NTopic::IWriteSession> TFederatedTopicClient::CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings) {
+// return Impl_->CreateFederatedWriteSession(settings);
+// }
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
new file mode 100644
index 0000000000..fa091252b5
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp
@@ -0,0 +1,28 @@
+#include "federated_topic_impl.h"
+
+#include "federated_read_session.h"
+// #include "federated_write_session.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
+
+namespace NYdb::NFederatedTopic {
+
+std::shared_ptr<IFederatedReadSession>
+TFederatedTopicClient::TImpl::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) {
+ InitObserver();
+ auto session = std::make_shared<TFederatedReadSession>(settings, Connections, ClientSettings, GetObserver());
+ session->Start();
+ return std::move(session);
+}
+
+void TFederatedTopicClient::TImpl::InitObserver() {
+ with_lock(Lock) {
+ if (!Observer || Observer->IsStale()) {
+ Observer = std::make_shared<TFederatedDbObserver>(Connections, ClientSettings);
+ Observer->Start();
+ }
+ }
+}
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h
new file mode 100644
index 0000000000..c963b3a94a
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/impl/executor.h>
+#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+
+#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h>
+
+namespace NYdb::NFederatedTopic {
+
+class TFederatedTopicClient::TImpl {
+public:
+ // Constructor for main client.
+ TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TFederatedTopicClientSettings& settings)
+ : Connections(std::move(connections))
+ , ClientSettings(settings)
+ {
+ InitObserver();
+ }
+
+ ~TImpl() {
+ with_lock(Lock) {
+ if (Observer) {
+ Observer->Stop();
+ }
+ }
+ }
+
+ // Runtime API.
+ std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings);
+
+ std::shared_ptr<TFederatedDbObserver> GetObserver() {
+ with_lock(Lock) {
+ return Observer;
+ }
+ }
+
+ void InitObserver();
+
+private:
+ std::shared_ptr<TGRpcConnectionsImpl> Connections;
+ const TFederatedTopicClientSettings ClientSettings;
+ std::shared_ptr<TFederatedDbObserver> Observer;
+ TAdaptiveLock Lock;
+};
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
new file mode 100644
index 0000000000..2742a79d14
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
@@ -0,0 +1,161 @@
+#include <ydb/public/api/grpc/ydb_federation_discovery_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h>
+
+namespace NYdb::NFederatedTopic {
+
+constexpr TDuration REDISCOVERY_DELAY = TDuration::Seconds(30);
+
+TFederatedDbObserver::TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings)
+ : TClientImplCommon(std::move(connections), settings)
+ , FederatedDbState(std::make_shared<TFederatedDbState>())
+ , PromiseToInitState(NThreading::NewPromise())
+ , FederationDiscoveryRetryPolicy(settings.RetryPolicy_)
+{
+ RpcSettings.ClientTimeout = settings.ConnectionTimeout_;
+ RpcSettings.EndpointPolicy = TRpcRequestSettings::TEndpointPolicy::UseDiscoveryEndpoint;
+ RpcSettings.UseAuth = true;
+}
+
+TFederatedDbObserver::~TFederatedDbObserver() {
+ Stop();
+}
+
+std::shared_ptr<TFederatedDbState> TFederatedDbObserver::GetState() {
+ with_lock(Lock) {
+ return FederatedDbState;
+ }
+}
+
+NThreading::TFuture<void> TFederatedDbObserver::WaitForFirstState() {
+ return PromiseToInitState.GetFuture();
+}
+
+void TFederatedDbObserver::Start() {
+ with_lock(Lock) {
+ if (Stopping) {
+ return;
+ }
+ ScheduleFederationDiscoveryImpl(TDuration::Zero());
+ }
+}
+
+void TFederatedDbObserver::Stop() {
+ NGrpc::IQueueClientContextPtr ctx;
+ with_lock(Lock) {
+ Stopping = true;
+ ctx = std::exchange(FederationDiscoveryDelayContext, nullptr);
+ }
+ if (ctx) {
+ ctx->Cancel();
+ }
+}
+
+// If observer is stale it will never update state again because of client retry policy
+bool TFederatedDbObserver::IsStale() const {
+ with_lock(Lock) {
+ return PromiseToInitState.HasValue() && !FederatedDbState->Status.IsSuccess();
+ }
+}
+
+Ydb::FederationDiscovery::ListFederationDatabasesRequest TFederatedDbObserver::ComposeRequest() const {
+ return {};
+}
+
+void TFederatedDbObserver::RunFederationDiscoveryImpl() {
+ Y_VERIFY(Lock.IsLocked());
+
+ FederationDiscoveryDelayContext = Connections_->CreateContext();
+ if (!FederationDiscoveryDelayContext) {
+ Stopping = true;
+ // TODO log DRIVER_IS_STOPPING_DESCRIPTION
+ return;
+ }
+
+ auto extractor = [self = shared_from_this()]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+ self->OnFederationDiscovery(std::move(status), std::move(result));
+ };
+
+ Connections_->RunDeferred<Ydb::FederationDiscovery::V1::FederationDiscoveryService,
+ Ydb::FederationDiscovery::ListFederationDatabasesRequest,
+ Ydb::FederationDiscovery::ListFederationDatabasesResponse>(
+ ComposeRequest(),
+ std::move(extractor),
+ &Ydb::FederationDiscovery::V1::FederationDiscoveryService::Stub::AsyncListFederationDatabases,
+ DbDriverState_,
+ {}, // no polling unready operations, so no need in delay parameter
+ RpcSettings,
+ FederationDiscoveryDelayContext);
+}
+
+void TFederatedDbObserver::ScheduleFederationDiscoveryImpl(TDuration delay) {
+ Y_VERIFY(Lock.IsLocked());
+ auto cb = [self = shared_from_this()](bool ok) {
+ if (ok) {
+ with_lock(self->Lock) {
+ if (self->Stopping) {
+ return;
+ }
+ self->RunFederationDiscoveryImpl();
+ }
+ }
+ };
+
+ FederationDiscoveryDelayContext = Connections_->CreateContext();
+ if (!FederationDiscoveryDelayContext) {
+ Stopping = true;
+ // TODO log DRIVER_IS_STOPPING_DESCRIPTION
+ return;
+ }
+ Connections_->ScheduleCallback(delay,
+ std::move(cb),
+ FederationDiscoveryDelayContext);
+
+}
+
+void TFederatedDbObserver::OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result) {
+ with_lock(Lock) {
+ if (Stopping) {
+ // TODO log something
+ return;
+ }
+
+ if (!status.IsSuccess()) {
+ // if UNIMPLEMENTED - fall back to single db mode:
+ // - initialize FederatedDbState with original db + endpoint
+ // - no more updates: no reschedules
+
+ // TODO
+ // update counters errors
+
+ if (!FederationDiscoveryRetryState) {
+ FederationDiscoveryRetryState = FederationDiscoveryRetryPolicy->CreateRetryState();
+ }
+ TMaybe<TDuration> retryDelay = FederationDiscoveryRetryState->GetNextRetryDelay(status.GetStatus());
+ if (retryDelay) {
+ ScheduleFederationDiscoveryImpl(*retryDelay);
+ return;
+ }
+ } else {
+ ScheduleFederationDiscoveryImpl(REDISCOVERY_DELAY);
+ }
+
+ // TODO validate new state and check if differs from previous
+
+ auto newInfo = std::make_shared<TFederatedDbState>(std::move(result), std::move(status));
+ // TODO update only if new state differs
+ std::swap(FederatedDbState, newInfo);
+ }
+
+ if (!PromiseToInitState.HasValue()) {
+ PromiseToInitState.SetValue();
+ }
+}
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
new file mode 100644
index 0000000000..55de50168c
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
@@ -0,0 +1,82 @@
+#pragma once
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
+
+#include <util/system/spinlock.h>
+#include <util/generic/hash.h>
+
+#include <deque>
+#include <memory>
+
+namespace NYdb::NFederatedTopic {
+
+struct TFederatedDbState {
+public:
+ using TDbInfo = Ydb::FederationDiscovery::DatabaseInfo;
+
+ TStatus Status;
+ TString ControlPlaneEndpoint;
+ TString SelfLocation;
+ std::vector<std::shared_ptr<TDbInfo>> DbInfos;
+
+public:
+ TFederatedDbState() : Status(EStatus::STATUS_UNDEFINED, {}) {}
+ TFederatedDbState(Ydb::FederationDiscovery::ListFederationDatabasesResult result, TStatus status)
+ : Status(std::move(status))
+ , ControlPlaneEndpoint(result.control_plane_endpoint())
+ , SelfLocation(result.self_location())
+ {
+ // TODO remove copy
+ for (const auto& db : result.federation_databases()) {
+ DbInfos.push_back(std::make_shared<TDbInfo>(db));
+ }
+ }
+};
+
+
+class TFederatedDbObserver : public TClientImplCommon<TFederatedDbObserver> {
+public:
+ static constexpr TDuration REDISCOVER_DELAY = TDuration::Seconds(60);
+
+public:
+ TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings);
+
+ ~TFederatedDbObserver();
+
+ std::shared_ptr<TFederatedDbState> GetState();
+
+ NThreading::TFuture<void> WaitForFirstState();
+
+ void Start();
+ void Stop();
+
+ bool IsStale() const;
+
+private:
+ Ydb::FederationDiscovery::ListFederationDatabasesRequest ComposeRequest() const;
+ void RunFederationDiscoveryImpl();
+ void ScheduleFederationDiscoveryImpl(TDuration delay);
+ void OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result);
+
+private:
+ std::shared_ptr<TFederatedDbState> FederatedDbState;
+ NThreading::TPromise<void> PromiseToInitState;
+ TRpcRequestSettings RpcSettings;
+ TSpinLock Lock;
+
+ NTopic::IRetryPolicy::TPtr FederationDiscoveryRetryPolicy;
+ NTopic::IRetryPolicy::IRetryState::TPtr FederationDiscoveryRetryState;
+ NGrpc::IQueueClientContextPtr FederationDiscoveryDelayContext;
+
+ bool Stopping = false;
+};
+
+} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make
new file mode 100644
index 0000000000..491270cda6
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make
@@ -0,0 +1,29 @@
+LIBRARY()
+
+SRCS(
+ federated_read_session.h
+ federated_read_session.cpp
+ federated_read_session_event.cpp
+ federated_topic_impl.h
+ federated_topic_impl.cpp
+ federated_topic.cpp
+ federation_observer.h
+ federation_observer.cpp
+)
+
+PEERDIR(
+ library/cpp/grpc/client
+ library/cpp/monlib/dynamic_counters
+ library/cpp/monlib/metrics
+ library/cpp/string_utils/url
+ ydb/library/persqueue/obfuscate
+ ydb/public/api/grpc/draft
+ ydb/public/api/grpc
+ ydb/public/sdk/cpp/client/impl/ydb_internal/make_request
+ ydb/public/sdk/cpp/client/ydb_common_client/impl
+ ydb/public/sdk/cpp/client/ydb_driver
+ ydb/public/sdk/cpp/client/ydb_persqueue_core/impl
+ ydb/public/sdk/cpp/client/ydb_proto
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..8ae3258bf2
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,89 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
+target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic
+)
+target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ cpp-client-ydb_federated_topic
+ cpp-testing-gmock_in_unittest
+ core-testlib-default
+ public-lib-json_value
+ public-lib-yson_value
+ cpp-client-ydb_driver
+ cpp-client-ydb_persqueue_core
+ client-ydb_persqueue_core-impl
+ ydb_persqueue_core-ut-ut_utils
+ client-ydb_topic-codecs
+ cpp-client-ydb_topic
+ client-ydb_topic-impl
+ client-ydb_federated_topic-impl
+)
+target_link_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ system_allocator
+)
+vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..f5d7b13900
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,92 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
+target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic
+)
+target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ cpp-client-ydb_federated_topic
+ cpp-testing-gmock_in_unittest
+ core-testlib-default
+ public-lib-json_value
+ public-lib-yson_value
+ cpp-client-ydb_driver
+ cpp-client-ydb_persqueue_core
+ client-ydb_persqueue_core-impl
+ ydb_persqueue_core-ut-ut_utils
+ client-ydb_topic-codecs
+ cpp-client-ydb_topic
+ client-ydb_topic-impl
+ client-ydb_federated_topic-impl
+)
+target_link_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..a70159ebcd
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,94 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
+target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic
+)
+target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ cpp-client-ydb_federated_topic
+ cpp-testing-gmock_in_unittest
+ core-testlib-default
+ public-lib-json_value
+ public-lib-yson_value
+ cpp-client-ydb_driver
+ cpp-client-ydb_persqueue_core
+ client-ydb_persqueue_core-impl
+ ydb_persqueue_core-ut-ut_utils
+ client-ydb_topic-codecs
+ cpp-client-ydb_topic
+ client-ydb_topic-impl
+ client-ydb_federated_topic-impl
+)
+target_link_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..4d6c28c3cd
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,82 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
+target_compile_options(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic
+)
+target_link_libraries(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ cpp-client-ydb_federated_topic
+ cpp-testing-gmock_in_unittest
+ core-testlib-default
+ public-lib-json_value
+ public-lib-yson_value
+ cpp-client-ydb_driver
+ cpp-client-ydb_persqueue_core
+ client-ydb_persqueue_core-impl
+ ydb_persqueue_core-ut-ut_utils
+ client-ydb_topic-codecs
+ cpp-client-ydb_topic
+ client-ydb_topic-impl
+ client-ydb_federated_topic-impl
+)
+target_sources(ydb-public-sdk-cpp-client-ydb_federated_topic-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_TARGET
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-public-sdk-cpp-client-ydb_federated_topic-ut
+ system_allocator
+)
+vcs_info(ydb-public-sdk-cpp-client-ydb_federated_topic-ut)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
new file mode 100644
index 0000000000..743c09b06a
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -0,0 +1,397 @@
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_topic/ut/managed_executor.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.h>
+#include <ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+#include <library/cpp/threading/future/future.h>
+#include <library/cpp/threading/future/async.h>
+
+#include <future>
+
+namespace NYdb::NFederatedTopic::NTests {
+
+Y_UNIT_TEST_SUITE(BasicUsage) {
+
+ Y_UNIT_TEST(GetAllStartPartitionSessions) {
+ size_t partitionsCount = 5;
+
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(
+ TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2, partitionsCount);
+
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver);
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << "Session was created" << Endl;
+
+ ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
+ TMaybe<NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(false);
+ Y_ASSERT(!event);
+
+ auto fdsRequest = fdsMock.GetNextPendingRequest();
+ Y_ASSERT(fdsRequest.has_value());
+ // TODO check fdsRequest->Req db header
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResponse response;
+
+ auto op = response.mutable_operation();
+ op->set_status(Ydb::StatusIds::SUCCESS);
+ response.mutable_operation()->set_ready(true);
+ response.mutable_operation()->set_id("12345");
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult;
+ mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135");
+ mockResult.set_self_location("fancy_datacenter");
+ auto c1 = mockResult.add_federation_databases();
+ c1->set_name("dc1");
+ c1->set_path("/Root");
+ c1->set_id("account-dc1");
+ c1->set_endpoint("localhost:" + ToString(fdsMock.Port));
+ c1->set_location("dc1");
+ c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c1->set_weight(1000);
+ auto c2 = mockResult.add_federation_databases();
+ c2->set_name("dc2");
+ c2->set_path("/Root");
+ c2->set_id("account-dc2");
+ c2->set_endpoint("localhost:" + ToString(fdsMock.Port));
+ c2->set_location("dc2");
+ c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c2->set_weight(500);
+
+ op->mutable_result()->PackFrom(mockResult);
+
+ fdsRequest->Result.SetValue({std::move(response), grpc::Status::OK});
+
+ for (size_t i = 0; i < partitionsCount; ++i) {
+ ReadSession->WaitEvent().Wait();
+ // Get event
+ TMaybe<NYdb::NFederatedTopic::TReadSessionEvent::TEvent> event = ReadSession->GetEvent(true/*block - will block if no event received yet*/);
+ Cerr << "Got new read session event: " << DebugString(*event) << Endl;
+
+ auto* startPartitionSessionEvent = std::get_if<NYdb::NFederatedTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event);
+ Y_ASSERT(startPartitionSessionEvent);
+ startPartitionSessionEvent->Confirm();
+ }
+
+ ReadSession->Close(TDuration::MilliSeconds(10));
+ }
+
+ Y_UNIT_TEST(WaitEventBlocksBeforeDiscovery) {
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver);
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << "Session was created" << Endl;
+
+ auto f = ReadSession->WaitEvent();
+ Cerr << "Returned from WaitEvent" << Endl;
+ // observer asyncInit should respect client/session timeouts
+ UNIT_ASSERT(!f.Wait(TDuration::Seconds(1)));
+
+ Cerr << "Session blocked successfully" << Endl;
+
+ UNIT_ASSERT(ReadSession->Close(TDuration::MilliSeconds(10)));
+ Cerr << "Session closed gracefully" << Endl;
+ }
+
+ Y_UNIT_TEST(RetryDiscoveryWithCancel) {
+ // TODO register requests in mock, compare time for retries, reschedules
+
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ auto clientSettings = TFederatedTopicClientSettings()
+ .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ TDuration::Seconds(10),
+ TDuration::Seconds(10)
+ ));
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings);
+
+ bool answerOk = false;
+
+ for (int i = 0; i < 6; ++i) {
+ std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ do {
+ Sleep(TDuration::MilliSeconds(50));
+ fdsRequest = fdsMock.GetNextPendingRequest();
+
+ } while (!fdsRequest.has_value());
+
+ if (answerOk) {
+ fdsRequest->Result.SetValue(fdsMock.ComposeOkResult());
+ } else {
+ fdsRequest->Result.SetValue({{}, grpc::Status(grpc::StatusCode::UNAVAILABLE, "mock 'unavailable'")});
+ }
+
+ answerOk = !answerOk;
+ }
+ }
+
+ Y_UNIT_TEST(PropagateSessionClosed) {
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ auto clientSettings = TFederatedTopicClientSettings()
+ .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ TDuration::Seconds(10),
+ TDuration::Seconds(10)
+ ));
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings);
+
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << "Session was created" << Endl;
+
+ Sleep(TDuration::MilliSeconds(50));
+
+ auto events = ReadSession->GetEvents(false);
+ UNIT_ASSERT(events.empty());
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResponse Response;
+
+ auto op = Response.mutable_operation();
+ op->set_status(Ydb::StatusIds::SUCCESS);
+ Response.mutable_operation()->set_ready(true);
+ Response.mutable_operation()->set_id("12345");
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult;
+ mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135");
+ mockResult.set_self_location("fancy_datacenter");
+ auto c1 = mockResult.add_federation_databases();
+ c1->set_name("dc1");
+ c1->set_path("/Root");
+ c1->set_id("account-dc1");
+ c1->set_endpoint("localhost:" + ToString(fdsMock.Port));
+ c1->set_location("dc1");
+ c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c1->set_weight(1000);
+ auto c2 = mockResult.add_federation_databases();
+ c2->set_name("dc2");
+ c2->set_path("/Invalid");
+ c2->set_id("account-dc2");
+ c2->set_endpoint("localhost:" + ToString(fdsMock.Port));
+ c2->set_location("dc2");
+ c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c2->set_weight(500);
+
+ op->mutable_result()->PackFrom(mockResult);
+
+ std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ do {
+ fdsRequest = fdsMock.GetNextPendingRequest();
+ if (!fdsRequest.has_value()) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ } while (!fdsRequest.has_value());
+ fdsRequest->Result.SetValue({Response, grpc::Status::OK});
+
+ NPersQueue::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id");
+ writeSettings.Codec(NPersQueue::ECodec::RAW);
+ NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ auto& client = setup->GetPersQueueClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+ TString messageBase = "message----";
+
+
+ ui64 count = 100u;
+ for (auto i = 0u; i < count; i++) {
+ auto res = session->Write(messageBase * (200 * 1024) + ToString(i));
+ UNIT_ASSERT(res);
+
+ events = ReadSession->GetEvents(true);
+ UNIT_ASSERT(!events.empty());
+
+ for (auto& e : events) {
+ Cerr << ">>> Got event: " << DebugString(e) << Endl;
+ if (auto* dataEvent = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&e)) {
+ dataEvent->Commit();
+ } else if (auto* startPartitionSessionEvent = std::get_if<TReadSessionEvent::TStartPartitionSessionEvent>(&e)) {
+ startPartitionSessionEvent->Confirm();
+ } else if (auto* stopPartitionSessionEvent = std::get_if<TReadSessionEvent::TStopPartitionSessionEvent>(&e)) {
+ stopPartitionSessionEvent->Confirm();
+ }
+ }
+ }
+
+ session->Close();
+ }
+
+ Y_UNIT_TEST(RecreateObserver) {
+ // TODO register requests in mock, compare time for retries, reschedules
+
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+ setup->Start(true, true);
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ auto clientSettings = TFederatedTopicClientSettings()
+ .RetryPolicy(NTopic::IRetryPolicy::GetNoRetryPolicy());
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings);
+
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(1_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ ReadSession = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << "Session was created" << Endl;
+
+ ReadSession->WaitEvent().Wait(TDuration::Seconds(1));
+ auto event = ReadSession->GetEvent(false);
+ UNIT_ASSERT(!event.Defined());
+
+
+ std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ do {
+ fdsRequest = fdsMock.GetNextPendingRequest();
+ if (!fdsRequest.has_value()) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ } while (!fdsRequest.has_value());
+
+ fdsRequest->Result.SetValue({{}, grpc::Status(grpc::StatusCode::UNAVAILABLE, "mock 'unavailable'")});
+
+ ReadSession->WaitEvent().Wait();
+ event = ReadSession->GetEvent(false);
+ UNIT_ASSERT(event.Defined());
+ Cerr << ">>> Got event: " << DebugString(*event) << Endl;
+ UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event));
+
+ auto ReadSession2 = topicClient.CreateFederatedReadSession(readSettings);
+ Cerr << "Session2 was created" << Endl;
+
+ ReadSession2->WaitEvent().Wait(TDuration::Seconds(1));
+ event = ReadSession2->GetEvent(false);
+ UNIT_ASSERT(!event.Defined());
+
+ do {
+ fdsRequest = fdsMock.GetNextPendingRequest();
+ if (!fdsRequest.has_value()) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ } while (!fdsRequest.has_value());
+
+ fdsRequest->Result.SetValue(fdsMock.ComposeOkResult());
+
+ event = ReadSession2->GetEvent(true);
+ UNIT_ASSERT(event.Defined());
+ Cerr << ">>> Got event: " << DebugString(*event) << Endl;
+ UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TStartPartitionSessionEvent>(*event));
+
+ // Cerr << ">>> Got event: " << DebugString(*event) << Endl;
+ // UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event));
+ }
+
+
+
+}
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
new file mode 100644
index 0000000000..b0746d14b7
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
@@ -0,0 +1,112 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <ydb/public/api/grpc/ydb_federation_discovery_v1.grpc.pb.h>
+
+#include <deque>
+#include <optional>
+
+namespace NYdb::NFederatedTopic::NTests {
+
+// ctor gets ---list of response--- tmaybe{response}
+// ListFederationDatabases gets 1 element under lock and respond. otherwise
+// create 2 queues, for requests and responses
+// getrequest() - put request and returns<request, promise<response>>
+// sendresponse()
+class TFederationDiscoveryServiceMock: public Ydb::FederationDiscovery::V1::FederationDiscoveryService::Service {
+public:
+ using TRequest = Ydb::FederationDiscovery::ListFederationDatabasesRequest;
+ using TResponse = Ydb::FederationDiscovery::ListFederationDatabasesResponse;
+
+ struct TGrpcResult {
+ TResponse Response;
+ grpc::Status Status;
+ };
+
+ struct TManualRequest {
+ const TRequest* Request;
+ NThreading::TPromise<TGrpcResult> Result;
+ };
+
+public:
+ ~TFederationDiscoveryServiceMock() {
+ while (auto r = GetNextPendingRequest()) {
+ r->Result.SetValue({});
+ }
+ }
+
+ std::optional<TManualRequest> GetNextPendingRequest() {
+ std::optional<TManualRequest> result;
+ with_lock(Lock) {
+ if (!PendingRequests.empty()) {
+ result = PendingRequests.front();
+ PendingRequests.pop_front();
+ }
+ }
+ return result;
+ }
+
+ virtual grpc::Status ListFederationDatabases(grpc::ServerContext*,
+ const TRequest* request,
+ TResponse* response) override {
+ Y_UNUSED(request);
+
+ auto p = NThreading::NewPromise<TGrpcResult>();
+ auto f = p.GetFuture();
+
+ with_lock(Lock) {
+ PendingRequests.push_back({request, std::move(p)});
+ }
+
+ f.Wait(TDuration::Seconds(35));
+ if (f.HasValue()) {
+ auto result = f.ExtractValueSync();
+ Cerr << ">>> Ready to answer: " << (result.Status.ok() ? "ok" : "err") << Endl;
+ *response = std::move(result.Response);
+ return result.Status;
+ }
+
+ return grpc::Status(grpc::StatusCode::UNKNOWN, "No response after timeout");
+ }
+
+ TGrpcResult ComposeOkResult() {
+ Ydb::FederationDiscovery::ListFederationDatabasesResponse okResponse;
+
+ auto op = okResponse.mutable_operation();
+ op->set_status(Ydb::StatusIds::SUCCESS);
+ okResponse.mutable_operation()->set_ready(true);
+ okResponse.mutable_operation()->set_id("12345");
+
+ Ydb::FederationDiscovery::ListFederationDatabasesResult mockResult;
+ mockResult.set_control_plane_endpoint("cp.logbroker-federation:2135");
+ mockResult.set_self_location("fancy_datacenter");
+ auto c1 = mockResult.add_federation_databases();
+ c1->set_name("dc1");
+ c1->set_path("/Root");
+ c1->set_id("account-dc1");
+ c1->set_endpoint("localhost:" + ToString(Port));
+ c1->set_location("dc1");
+ c1->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c1->set_weight(1000);
+ auto c2 = mockResult.add_federation_databases();
+ c2->set_name("dc2");
+ c2->set_path("/Root");
+ c2->set_id("account-dc2");
+ c2->set_endpoint("localhost:" + ToString(Port));
+ c2->set_location("dc2");
+ c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c2->set_weight(500);
+
+ op->mutable_result()->PackFrom(mockResult);
+
+ return {okResponse, grpc::Status::OK};
+ }
+
+
+public:
+ ui16 Port;
+ std::deque<TManualRequest> PendingRequests;
+ TAdaptiveLock Lock;
+};
+
+} // namespace NYdb::NFederatedTopic::NTests
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make
new file mode 100644
index 0000000000..d40d4432db
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/ya.make
@@ -0,0 +1,38 @@
+UNITTEST_FOR(ydb/public/sdk/cpp/client/ydb_federated_topic)
+
+IF (SANITIZER_TYPE)
+ TIMEOUT(1200)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ELSE()
+ TIMEOUT(600)
+ SIZE(MEDIUM)
+ENDIF()
+
+FORK_SUBTESTS()
+
+PEERDIR(
+ library/cpp/testing/gmock_in_unittest
+ ydb/core/testlib/default
+ ydb/public/lib/json_value
+ ydb/public/lib/yson_value
+ ydb/public/sdk/cpp/client/ydb_driver
+ ydb/public/sdk/cpp/client/ydb_persqueue_core
+ ydb/public/sdk/cpp/client/ydb_persqueue_core/impl
+ ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils
+
+ ydb/public/sdk/cpp/client/ydb_topic/codecs
+ ydb/public/sdk/cpp/client/ydb_topic
+ ydb/public/sdk/cpp/client/ydb_topic/impl
+
+ ydb/public/sdk/cpp/client/ydb_federated_topic
+ ydb/public/sdk/cpp/client/ydb_federated_topic/impl
+)
+
+YQL_LAST_ABI_VERSION()
+
+SRCS(
+ basic_usage_ut.cpp
+)
+
+END()
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make
new file mode 100644
index 0000000000..217088f2e0
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+GENERATE_ENUM_SERIALIZATION(ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h)
+
+SRCS(
+ federated_topic.h
+)
+
+PEERDIR(
+ ydb/public/sdk/cpp/client/ydb_topic
+ ydb/public/sdk/cpp/client/ydb_federated_topic/impl
+)
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/public/sdk/cpp/ya.make b/ydb/public/sdk/cpp/ya.make
index 752050f35a..d308cb76a2 100644
--- a/ydb/public/sdk/cpp/ya.make
+++ b/ydb/public/sdk/cpp/ya.make
@@ -29,6 +29,9 @@ RECURSE(
client/ydb_topic/codecs
client/ydb_topic/impl
client/ydb_topic/ut
+ client/ydb_federated_topic
+ client/ydb_federated_topic/impl
+ client/ydb_federated_topic/ut
client/ydb_extension
client/ydb_result/ut
client/ydb_params/ut