aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisambeev <ikhis@ydb.tech>2024-01-31 11:52:15 +0300
committerGitHub <noreply@github.com>2024-01-31 11:52:15 +0300
commit94651fc6c13f632932aaa1a57a1c790223452a98 (patch)
treef89370c25d70e569c68e66a0a0233916ee94b298
parent3b1a363cecc7a179a26519f4c3f3964c43183c2b (diff)
downloadydb-94651fc6c13f632932aaa1a57a1c790223452a98.tar.gz
LOGBROKER-8783 propose api (#537)
* Propose read policy settings for fed sdk api * better api * implementation * issues (except logging) * provide topic origin info * more issues, add basic test
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h128
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp133
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h120
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp63
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp37
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h13
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp158
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h8
8 files changed, 528 insertions, 132 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
index 121097e39d..534193d35b 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h
@@ -4,6 +4,10 @@
#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h>
+
+#include <unordered_set>
+
namespace NYdb::NFederatedTopic {
using NTopic::TPrintable;
@@ -16,10 +20,17 @@ struct TFederatedPartitionSession : public TThrRefBase, public TPrintable<TFeder
using TPtr = TIntrusivePtr<TFederatedPartitionSession>;
public:
- TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession, std::shared_ptr<TDbInfo> db)
+ TFederatedPartitionSession(const NTopic::TPartitionSession::TPtr& partitionSession,
+ std::shared_ptr<TDbInfo> db,
+ std::shared_ptr<TDbInfo> originDb = nullptr,
+ TString originPath = "")
: PartitionSession(partitionSession)
- , Db(std::move(db))
- {}
+ , ReadSourceDatabase(std::move(db))
+ , TopicOriginDatabase(originDb ? std::move(originDb) : ReadSourceDatabase)
+ , TopicOriginPath(originPath ? std::move(originPath) : PartitionSession->GetTopicPath())
+ {
+ Y_ABORT_UNLESS(ReadSourceDatabase);
+ }
//! Request partition session status.
//! Result will come to TPartitionSessionStatusEvent.
@@ -39,7 +50,7 @@ public:
//! Topic path.
const TString& GetTopicPath() const {
- return PartitionSession->GetTopicPath();
+ return TopicOriginPath;
}
//! Partition id.
@@ -48,34 +59,56 @@ public:
}
const TString& GetDatabaseName() const {
- return Db->name();
+ return GetTopicOriginDatabaseName();
}
const TString& GetDatabasePath() const {
- return Db->path();
+ return GetTopicOriginDatabasePath();
}
const TString& GetDatabaseId() const {
- return Db->id();
+ return GetTopicOriginDatabaseId();
+ }
+
+ const TString& GetReadSourceDatabaseName() const {
+ return ReadSourceDatabase->name();
+ }
+
+ const TString& GetReadSourceDatabasePath() const {
+ return ReadSourceDatabase->path();
+ }
+
+ const TString& GetReadSourceDatabaseId() const {
+ return ReadSourceDatabase->id();
+ }
+
+ const TString& GetTopicOriginDatabaseName() const {
+ return TopicOriginDatabase->name();
+ }
+
+ const TString& GetTopicOriginDatabasePath() const {
+ return TopicOriginDatabase->path();
+ }
+
+ const TString& GetTopicOriginDatabaseId() const {
+ return TopicOriginDatabase->id();
}
private:
NTopic::TPartitionSession::TPtr PartitionSession;
- std::shared_ptr<TDbInfo> Db;
+ std::shared_ptr<TDbInfo> ReadSourceDatabase;
+ std::shared_ptr<TDbInfo> TopicOriginDatabase;
+ TString TopicOriginPath;
};
//! Events for read session.
struct TReadSessionEvent {
class TFederatedPartitionSessionAccessor {
public:
- TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession)
+ explicit TFederatedPartitionSessionAccessor(TFederatedPartitionSession::TPtr partitionSession)
: FederatedPartitionSession(std::move(partitionSession))
{}
- TFederatedPartitionSessionAccessor(NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db)
- : FederatedPartitionSession(MakeIntrusive<TFederatedPartitionSession>(partitionSession, std::move(db)))
- {}
-
inline const TFederatedPartitionSession::TPtr GetFederatedPartitionSession() const {
return FederatedPartitionSession;
}
@@ -88,8 +121,8 @@ struct TReadSessionEvent {
struct TFederated : public TFederatedPartitionSessionAccessor, public TEvent, public TPrintable<TFederated<TEvent>> {
using TPrintable<TFederated<TEvent>>::DebugString;
- TFederated(TEvent event, std::shared_ptr<TDbInfo> db)
- : TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db)
+ TFederated(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession)
+ : TFederatedPartitionSessionAccessor(std::move(federatedPartitionSession))
, TEvent(std::move(event))
{}
@@ -109,10 +142,7 @@ struct TReadSessionEvent {
using TCompressedMessage = TFederated<NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage>;
public:
- TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db);
-
- TDataReceivedEvent(TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
- NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db);
+ TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession);
const NTopic::TPartitionSession::TPtr& GetPartitionSession() const override {
ythrow yexception() << "GetPartitionSession method unavailable for federated objects, use GetFederatedPartitionSession instead";
@@ -177,15 +207,6 @@ struct TReadSessionEvent {
TSessionClosedEvent>;
};
-template <typename TEvent>
-TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, std::shared_ptr<TDbInfo> db) {
- return {std::move(event), std::move(db)};
-}
-
-TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db);
-
-TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db);
-
//! Set of offsets to commit.
//! Class that could store offsets in order to commit them later.
//! This class is not thread safe.
@@ -273,7 +294,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
bool GracefulStopAfterCommit;
};
-
//! Set simple handler with data processing and also
//! set other handlers with default behaviour.
//! They automatically commit data after processing
@@ -290,7 +310,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
//! commitDataAfterProcessing: automatically commit data after calling of dataHandler.
//! gracefulReleaseAfterCommit: wait for commit acknowledgements for all inflight data before confirming
//! partition session destroy.
-
TSimpleDataHandlers SimpleDataHandlers_;
TSelf& SimpleDataHandlers(std::function<void(TReadSessionEvent::TDataReceivedEvent&)> dataHandler,
@@ -368,18 +387,47 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings {
//! See description in TFederatedEventHandlers class.
FLUENT_SETTING(TFederatedEventHandlers, FederatedEventHandlers);
- enum class EReadPolicy {
- READ_ALL = 0,
- READ_ORIGINAL,
- READ_MIRRORED
+
+ //! Read policy settings
+
+ //! Databases to read from.
+ //! Default (empty) value means reading from all available databases.
+ //! Adding duplicates or unavailable databases is okay, they will be ignored.
+ struct TReadOriginalSettings {
+ //! Add reading from specified database if it's available.
+ TReadOriginalSettings& AddDatabase(TString database);
+
+ //! Add reading from several specified databases, if available.
+ TReadOriginalSettings& AddDatabases(std::vector<TString> databases);
+
+ //! Add reading from database(s) with the same location as client.
+ TReadOriginalSettings& AddLocal();
+
+ std::unordered_set<TString> Databases;
};
- //! Policy for federated reading.
- //!
- //! READ_ALL: read will be done from all topic instances from all databases.
- //! READ_ORIGINAL:
- //! READ_MIRRORED:
- FLUENT_SETTING_DEFAULT(EReadPolicy, ReadPolicy, EReadPolicy::READ_ALL);
+ //! Default variant.
+ //! Read original topics specified in NTopic::TReadSessionSettings::Topics from databases, specified in settings.
+ //! Discards previously set ReadOriginal and ReadMirrored settings.
+ TSelf& ReadOriginal(TReadOriginalSettings settings);
+
+ //! Read original and mirrored topics specified in NTopic::TReadSessionSettings::Topics
+ //! from one specified database.
+ //! Discards previously set ReadOriginal and ReadMirrored settings.
+ TSelf& ReadMirrored(TString database);
+
+ bool IsReadMirroredEnabled() {
+ return ReadMirroredEnabled;
+ }
+
+ auto GetDatabasesToReadFrom() {
+ return DatabasesToReadFrom;
+ }
+
+private:
+ // Read policy settings, set via helpers above
+ bool ReadMirroredEnabled = false;
+ std::unordered_set<TString> DatabasesToReadFrom;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
index 7563f2c85a..df0bf1c8d0 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
@@ -3,6 +3,10 @@
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/logger/log.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
#include <library/cpp/threading/future/future.h>
#include <util/generic/guid.h>
@@ -11,25 +15,35 @@ namespace NYdb::NFederatedTopic {
NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings);
template <typename TEvent, typename TFederatedEvent>
-typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db) {
+typename std::function<void(TEvent&)> WrapFederatedHandler(std::function<void(TFederatedEvent&)> outerHandler, std::shared_ptr<TDbInfo> db, std::shared_ptr<TEventFederator> federator) {
if (outerHandler) {
- return [outerHandler, db = std::move(db)](TEvent& ev) {
- auto fev = Federate(std::move(ev), db);
+ return [outerHandler, db = std::move(db), &federator](TEvent& ev) {
+ auto fev = federator->LocateFederate(ev, std::move(db));
return outerHandler(fev);
};
}
return {};
}
-NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr<TDbInfo>& db) {
+NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& settings, const std::shared_ptr<TDbInfo>& db, std::shared_ptr<TEventFederator> federator) {
NTopic::TReadSessionSettings SubsessionSettings = settings;
SubsessionSettings.EventHandlers_.MaxMessagesBytes(settings.EventHandlers_.MaxMessagesBytes_);
SubsessionSettings.EventHandlers_.HandlersExecutor(settings.EventHandlers_.HandlersExecutor_);
-#define MAYBE_CONVERT_HANDLER(type, name) \
- SubsessionSettings.EventHandlers_.name( \
- WrapFederatedHandler<NTopic::type, type>(settings.FederatedEventHandlers_.name##_, db) \
- );
+ if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) {
+ SubsessionSettings.EventHandlers_.SimpleDataHandlers(
+ WrapFederatedHandler<NTopic::TReadSessionEvent::TDataReceivedEvent, TReadSessionEvent::TDataReceivedEvent>(
+ settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db, federator),
+ settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing,
+ settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit);
+ }
+
+#define MAYBE_CONVERT_HANDLER(type, name) \
+ if (settings.FederatedEventHandlers_.name##_) { \
+ SubsessionSettings.EventHandlers_.name( \
+ WrapFederatedHandler<NTopic::type, type>(settings.FederatedEventHandlers_.name##_, db, federator) \
+ ); \
+ }
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TDataReceivedEvent, DataReceivedHandler);
MAYBE_CONVERT_HANDLER(TReadSessionEvent::TCommitOffsetAcknowledgementEvent, CommitOffsetAcknowledgementHandler);
@@ -43,31 +57,29 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings&
SubsessionSettings.EventHandlers_.SessionClosedHandler(settings.FederatedEventHandlers_.SessionClosedHandler_);
- if (settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler) {
- SubsessionSettings.EventHandlers_.SimpleDataHandlers(
- WrapFederatedHandler<NTopic::TReadSessionEvent::TDataReceivedEvent, TReadSessionEvent::TDataReceivedEvent>(
- settings.FederatedEventHandlers_.SimpleDataHandlers_.DataHandler, db),
- settings.FederatedEventHandlers_.SimpleDataHandlers_.CommitDataAfterProcessing,
- settings.FederatedEventHandlers_.SimpleDataHandlers_.GracefulStopAfterCommit);
- }
-
return SubsessionSettings;
}
TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings,
std::shared_ptr<TGRpcConnectionsImpl> connections,
- const TFederatedTopicClientSettings& clientSetttings,
+ const TFederatedTopicClientSettings& clientSettings,
std::shared_ptr<TFederatedDbObserver> observer)
: Settings(settings)
, Connections(std::move(connections))
- , SubClientSetttings(FromFederated(clientSetttings))
+ , SubClientSetttings(FromFederated(clientSettings))
, Observer(std::move(observer))
, AsyncInit(Observer->WaitForFirstState())
, FederationState(nullptr)
+ , EventFederator(std::make_shared<TEventFederator>())
+ , Log(Connections->GetLog())
, SessionId(CreateGuidAsString())
{
}
+TStringBuilder TFederatedReadSessionImpl::GetLogPrefix() const {
+ return TStringBuilder() << GetDatabaseLogPrefix(SubClientSetttings.Database_.GetOrElse("")) << "[" << SessionId << "] ";
+}
+
void TFederatedReadSessionImpl::Start() {
AsyncInit.Subscribe([selfCtx = SelfContext](const auto& f){
Y_UNUSED(f);
@@ -83,15 +95,14 @@ void TFederatedReadSessionImpl::Start() {
});
}
-void TFederatedReadSessionImpl::OpenSubSessionsImpl() {
- for (const auto& db : FederationState->DbInfos) {
- // TODO check if available
+void TFederatedReadSessionImpl::OpenSubSessionsImpl(const std::vector<std::shared_ptr<TDbInfo>>& dbInfos) {
+ for (const auto& db : dbInfos) {
NTopic::TTopicClientSettings settings = SubClientSetttings;
settings
.Database(db->path())
.DiscoveryEndpoint(db->endpoint());
auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, settings);
- auto subsession = subclient->CreateReadSession(FromFederated(Settings, db));
+ auto subsession = subclient->CreateReadSession(FromFederated(Settings, db, EventFederator));
SubSessions.emplace_back(subsession, db);
}
SubsessionIndex = 0;
@@ -99,14 +110,80 @@ void TFederatedReadSessionImpl::OpenSubSessionsImpl() {
void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() {
if (!FederationState->Status.IsSuccess()) {
+ LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "Federated state update failed.");
+ CloseImpl();
+ return;
+ }
+
+ EventFederator->SetFederationState(FederationState);
+
+ if (Settings.IsReadMirroredEnabled()) {
+ Y_ABORT_UNLESS(Settings.GetDatabasesToReadFrom().size() == 1);
+ auto dbToReadFrom = *Settings.GetDatabasesToReadFrom().begin();
+
+ std::vector<TString> dbNames = GetAllFederationDatabaseNames();
+ auto topics = Settings.Topics_;
+ for (const auto& topic : topics) {
+ for (const auto& dbName : dbNames) {
+ if (AsciiEqualsIgnoreCase(dbName, dbToReadFrom)) {
+ continue;
+ }
+ auto mirroredTopic = topic;
+ mirroredTopic.PartitionIds_.clear();
+ mirroredTopic.Path(topic.Path_ + "-mirrored-from-" + dbName);
+ Settings.AppendTopics(mirroredTopic);
+ }
+ }
+ }
+
+ std::vector<std::shared_ptr<TDbInfo>> databases;
+
+ for (const auto& db : FederationState->DbInfos) {
+ if (IsDatabaseEligibleForRead(db)) {
+ databases.push_back(db);
+ }
+ }
+
+ if (databases.empty()) {
+ // TODO: investigate here, why empty list?
+ // Reason (and returned status) could be BAD_REQUEST or UNAVAILABLE.
+ LOG_LAZY(Log, TLOG_ERR, GetLogPrefix() << "No available databases to read.");
CloseImpl();
return;
}
- // 1) compare old info and new info;
- // result: list of subsessions to open + list of subsessions to close
- // 2) OpenSubSessionsImpl, CloseSubSessionsImpl
- OpenSubSessionsImpl();
- // 3) TODO LATER reschedule OnFederatedStateUpdate
+
+ OpenSubSessionsImpl(databases);
+}
+
+std::vector<TString> TFederatedReadSessionImpl::GetAllFederationDatabaseNames() {
+ std::vector<TString> result;
+ for (const auto& db : FederationState->DbInfos) {
+ result.push_back(db->name());
+ }
+ return result;
+}
+
+bool TFederatedReadSessionImpl::IsDatabaseEligibleForRead(const std::shared_ptr<TDbInfo>& db) {
+ if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE &&
+ db->status() != TDbInfo::Status::DatabaseInfo_Status_READ_ONLY) {
+ return false;
+ }
+
+ if (Settings.GetDatabasesToReadFrom().empty()) {
+ return true;
+ }
+
+ for (const auto& dbFromSettings : Settings.GetDatabasesToReadFrom()) {
+ if (AsciiEqualsIgnoreCase(db->name(), dbFromSettings) ||
+ AsciiEqualsIgnoreCase(db->id(), dbFromSettings)) {
+ return true;
+ }
+ if (dbFromSettings == "_local" &&
+ AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) {
+ return true;
+ }
+ }
+ return false;
}
NThreading::TFuture<void> TFederatedReadSessionImpl::WaitEvent() {
@@ -151,7 +228,7 @@ TVector<TReadSessionEvent::TEvent> TFederatedReadSessionImpl::GetEvents(bool blo
do {
auto sub = SubSessions[SubsessionIndex];
for (auto&& ev : sub.Session->GetEvents(false, maxEventsCount, maxByteSize)) {
- result.push_back(Federate(std::move(ev), sub.DbInfo));
+ result.push_back(EventFederator->LocateFederate(std::move(ev), sub.DbInfo));
}
SubsessionIndex = (SubsessionIndex + 1) % SubSessions.size();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
index c0444f15a2..84c208258e 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
@@ -9,6 +9,115 @@
namespace NYdb::NFederatedTopic {
+class TEventFederator {
+public:
+ auto LocateTopicOrigin(const NTopic::TReadSessionEvent::TEvent& event) {
+ std::shared_ptr<TDbInfo> topicOriginDbInfo;
+ TString topicOriginPath = "";
+
+ auto topicPath = std::visit([](auto&& arg) -> TStringBuf {
+ using T = std::decay_t<decltype(arg)>;
+ if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) {
+ return "";
+ } else {
+ return arg.GetPartitionSession()->GetTopicPath();
+ }
+ }, event);
+
+ if (topicPath.Contains("-mirrored-from-")) {
+ TStringBuf leftPart, rightPart;
+ auto res = topicPath.TryRSplit("-mirrored-from-", leftPart, rightPart);
+ Y_ABORT_UNLESS(res);
+
+ // no additional validation required: TryGetDbInfo just returns nullptr for any bad input
+ topicOriginDbInfo = FederationState->TryGetDbInfo(TString(rightPart));
+ if (topicOriginDbInfo) {
+ topicOriginPath = leftPart;
+ }
+ }
+
+ return std::make_tuple(topicOriginDbInfo, topicOriginPath);
+ }
+
+ template <typename TEvent>
+ auto LocateFederate(TEvent&& event, std::shared_ptr<TDbInfo> db) {
+ NTopic::TPartitionSession::TPtr psPtr;
+ TFederatedPartitionSession::TPtr fps;
+
+ using T = std::decay_t<TEvent>;
+ if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) {
+ return Federate(std::move(event), std::move(fps));
+ } else if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TEvent>) {
+ psPtr = std::visit([](auto&& arg) -> NTopic::TPartitionSession::TPtr {
+ using T = std::decay_t<decltype(arg)>;
+ if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) {
+ return nullptr;
+ } else {
+ return arg.GetPartitionSession();
+ }
+ }, event);
+
+ if (!psPtr) { // TSessionClosedEvent
+ return Federate(std::move(event), std::move(fps));
+ }
+ } else {
+ psPtr = event.GetPartitionSession();
+ }
+
+ with_lock(Lock) {
+ if (!FederatedPartitionSessions.contains(psPtr.Get())) {
+ auto [topicOriginDbInfo, topicOriginPath] = LocateTopicOrigin(event);
+ FederatedPartitionSessions[psPtr.Get()] = MakeIntrusive<TFederatedPartitionSession>(psPtr, std::move(db), std::move(topicOriginDbInfo), std::move(topicOriginPath));
+ }
+ fps = FederatedPartitionSessions[psPtr.Get()];
+
+ if constexpr (std::is_same_v<TEvent, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent>) {
+ FederatedPartitionSessions.erase(psPtr.Get());
+ }
+ }
+
+ return Federate(std::move(event), std::move(fps));
+ }
+
+ template <typename TEvent>
+ TReadSessionEvent::TFederated<TEvent> Federate(TEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession) {
+ return {std::move(event), std::move(federatedPartitionSession)};
+ }
+
+ TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event,
+ TFederatedPartitionSession::TPtr federatedPartitionSession) {
+ return {std::move(event), std::move(federatedPartitionSession)};
+ }
+
+ TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event,
+ TFederatedPartitionSession::TPtr federatedPartitionSession) {
+ return std::visit([fps = std::move(federatedPartitionSession)](auto&& arg) {
+ using T = std::decay_t<decltype(arg)>;
+ std::optional<TReadSessionEvent::TEvent> ev;
+ if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TDataReceivedEvent>) {
+ ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(fps));
+ } else if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) {
+ ev = std::move(arg);
+ } else {
+ ev = TReadSessionEvent::TFederated(std::move(arg), std::move(fps));
+ }
+ return *ev;
+ },
+ event);
+ }
+
+ void SetFederationState(std::shared_ptr<TFederatedDbState> state) {
+ with_lock(Lock) {
+ FederationState = std::move(state);
+ }
+ }
+
+private:
+ TAdaptiveLock Lock;
+ std::unordered_map<NTopic::TPartitionSession*, TFederatedPartitionSession::TPtr> FederatedPartitionSessions;
+ std::shared_ptr<TFederatedDbState> FederationState;
+};
+
class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext<TFederatedReadSessionImpl> {
friend class TFederatedTopicClient::TImpl;
friend class TFederatedReadSession;
@@ -46,12 +155,15 @@ public:
}
private:
- // TODO logging
TStringBuilder GetLogPrefix() const;
void Start();
bool ValidateSettings();
- void OpenSubSessionsImpl();
+ void OpenSubSessionsImpl(const std::vector<std::shared_ptr<TDbInfo>>& dbInfos);
+
+ std::vector<TString> GetAllFederationDatabaseNames();
+
+ bool IsDatabaseEligibleForRead(const std::shared_ptr<TDbInfo>& db);
void OnFederatedStateUpdateImpl();
@@ -67,9 +179,9 @@ private:
std::shared_ptr<TFederatedDbObserver> Observer;
NThreading::TFuture<void> AsyncInit;
std::shared_ptr<TFederatedDbState> FederationState;
+ std::shared_ptr<TEventFederator> EventFederator;
- // TODO
- // TLog Log;
+ TLog Log;
const TString SessionId;
const TInstant StartSessionTime = TInstant::Now();
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
index 240fa38a80..f88836c33f 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp
@@ -1,46 +1,10 @@
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
-#include <optional>
-
-namespace NYdb::NFederatedTopic {
-
-////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-// Helpers
-
-std::pair<ui64, ui64> GetMessageOffsetRange(const TReadSessionEvent::TDataReceivedEvent& dataReceivedEvent, ui64 index) {
- if (dataReceivedEvent.HasCompressedMessages()) {
- const auto& msg = dataReceivedEvent.GetCompressedMessages()[index];
- return {msg.GetOffset(), msg.GetOffset() + 1};
- }
- const auto& msg = dataReceivedEvent.GetMessages()[index];
- return {msg.GetOffset(), msg.GetOffset() + 1};
-}
-
-TReadSessionEvent::TDataReceivedEvent Federate(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db) {
- return {std::move(event), std::move(db)};
-}
-
-TReadSessionEvent::TEvent Federate(NTopic::TReadSessionEvent::TEvent event, std::shared_ptr<TDbInfo> db) {
- return std::visit([db = std::move(db)](auto&& arg) {
- using T = std::decay_t<decltype(arg)>;
- std::optional<TReadSessionEvent::TEvent> ev;
- if constexpr (std::is_same_v<T, NTopic::TReadSessionEvent::TDataReceivedEvent>) {
- ev = TReadSessionEvent::TDataReceivedEvent(std::move(arg), std::move(db));
- } else if constexpr (std::is_same_v<T, NTopic::TSessionClosedEvent>) {
- ev = std::move(arg);
- } else {
- ev = TReadSessionEvent::TFederated(std::move(arg), std::move(db));
- }
- return *ev;
- }, event);
-}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Printable specializations
-}
-
namespace NYdb::NTopic {
using namespace NFederatedTopic;
@@ -155,7 +119,6 @@ void TPrintable<TDataReceivedEvent>::DebugString(TStringBuilder& ret, bool print
ret << " }";
}
-
}
namespace NYdb::NFederatedTopic {
@@ -163,35 +126,17 @@ namespace NYdb::NFederatedTopic {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// NFederatedTopic::TReadSessionEvent::TDataReceivedEvent
-TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, std::shared_ptr<TDbInfo> db)
+TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(NTopic::TReadSessionEvent::TDataReceivedEvent event, TFederatedPartitionSession::TPtr federatedPartitionSession)
: NTopic::TReadSessionEvent::TPartitionSessionAccessor(event.GetPartitionSession())
- , TFederatedPartitionSessionAccessor(event.GetPartitionSession(), db)
+ , TFederatedPartitionSessionAccessor(federatedPartitionSession)
{
if (event.HasCompressedMessages()) {
for (auto& msg : event.GetCompressedMessages()) {
- CompressedMessages.emplace_back(std::move(msg), db);
+ CompressedMessages.emplace_back(std::move(msg), federatedPartitionSession);
}
} else {
for (auto& msg : event.GetMessages()) {
- Messages.emplace_back(std::move(msg), db);
- }
- }
-}
-
-TReadSessionEvent::TDataReceivedEvent::TDataReceivedEvent(
- TVector<TMessage> messages, TVector<TCompressedMessage> compressedMessages,
- NTopic::TPartitionSession::TPtr partitionSession, std::shared_ptr<TDbInfo> db)
- : NTopic::TReadSessionEvent::TPartitionSessionAccessor(partitionSession)
- , TFederatedPartitionSessionAccessor(partitionSession, db)
- , Messages(std::move(messages))
- , CompressedMessages(std::move(compressedMessages))
-{
- for (size_t i = 0; i < GetMessagesCount(); ++i) {
- auto [from, to] = GetMessageOffsetRange(*this, i);
- if (OffsetRanges.empty() || OffsetRanges.back().second != from) {
- OffsetRanges.emplace_back(from, to);
- } else {
- OffsetRanges.back().second = to;
+ Messages.emplace_back(std::move(msg), federatedPartitionSession);
}
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
index ca076d602d..75d7abb173 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp
@@ -3,6 +3,43 @@
namespace NYdb::NFederatedTopic {
+// TFederatedReadSessionSettings
+// Read policy settings
+
+using TReadOriginalSettings = TFederatedReadSessionSettings::TReadOriginalSettings;
+TReadOriginalSettings& TReadOriginalSettings::AddDatabase(TString database) {
+ Databases.insert(std::move(database));
+ return *this;
+}
+
+TReadOriginalSettings& TReadOriginalSettings::AddDatabases(std::vector<TString> databases) {
+ std::move(std::begin(databases), std::end(databases), std::inserter(Databases, Databases.end()));
+ return *this;
+}
+
+TReadOriginalSettings& TReadOriginalSettings::AddLocal() {
+ Databases.insert("_local");
+ return *this;
+}
+
+TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadOriginal(TReadOriginalSettings settings) {
+ std::swap(DatabasesToReadFrom, settings.Databases);
+ ReadMirroredEnabled = false;
+ return *this;
+}
+
+TFederatedReadSessionSettings& TFederatedReadSessionSettings::ReadMirrored(TString database) {
+ if (database == "_local") {
+ ythrow TContractViolation("Reading from local database not supported, use specific database");
+ }
+ DatabasesToReadFrom.clear();
+ DatabasesToReadFrom.insert(std::move(database));
+ ReadMirroredEnabled = true;
+ return *this;
+}
+
+// TFederatedTopicClient
+
NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings) {
return NTopic::TTopicClientSettings()
.DefaultCompressionExecutor(settings.DefaultCompressionExecutor_)
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
index 6fc2baa80d..bf4ad2a163 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
@@ -35,11 +35,22 @@ public:
, ControlPlaneEndpoint(result.control_plane_endpoint())
, SelfLocation(result.self_location())
{
- // TODO remove copy
+ // TODO ensure that all databases have unique names?
for (const auto& db : result.federation_databases()) {
DbInfos.push_back(std::make_shared<TDbInfo>(db));
}
}
+
+ std::shared_ptr<TDbInfo> TryGetDbInfo(const TString& name) const noexcept {
+ // There are few databases per federation usually, so the linear search is probably ok.
+ // TODO better profile this
+ for (const auto& dbInfo : DbInfos) {
+ if (AsciiEqualsIgnoreCase(dbInfo->name(), name)) {
+ return dbInfo;
+ }
+ }
+ return nullptr;
+ }
};
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index a2780101c8..ad884e1ce7 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -520,6 +520,164 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
AtomicSet(check, 0);
}
+ Y_UNIT_TEST(ReadMirrored) {
+ auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME, false);
+ setup->Start(true, true);
+ setup->CreateTopic(setup->GetTestTopic() + "-mirrored-from-dc2", setup->GetLocalCluster());
+ setup->CreateTopic(setup->GetTestTopic() + "-mirrored-from-dc3", setup->GetLocalCluster());
+
+ TFederationDiscoveryServiceMock fdsMock;
+ fdsMock.Port = setup->GetGrpcPort();
+
+ ui16 newServicePort = setup->GetPortManager()->GetPort(4285);
+ auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock);
+
+ std::shared_ptr<NYdb::NFederatedTopic::IFederatedReadSession> ReadSession;
+
+ // Create topic client.
+ NYdb::TDriverConfig cfg;
+ cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort);
+ cfg.SetDatabase("/Root");
+ cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG));
+ NYdb::TDriver driver(cfg);
+ auto clientSettings = TFederatedTopicClientSettings()
+ .RetryPolicy(NTopic::IRetryPolicy::GetFixedIntervalPolicy(
+ TDuration::Seconds(10),
+ TDuration::Seconds(10)
+ ));
+ NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver, clientSettings);
+
+ ui64 count = 5u;
+
+ TString messageBase = "message----";
+ TVector<TString> sentMessages;
+ std::unordered_set<TString> sentSet;
+
+ for (auto i = 0u; i < count; i++) {
+ sentMessages.emplace_back(messageBase * (10 * i + 1));
+ sentSet.emplace(sentMessages.back() + "-from-dc1");
+ sentSet.emplace(sentMessages.back() + "-from-dc2");
+ sentSet.emplace(sentMessages.back() + "-from-dc3");
+ }
+
+ NThreading::TPromise<void> checkedPromise = NThreading::NewPromise<void>();
+ auto totalReceived = 0u;
+
+ auto f = checkedPromise.GetFuture();
+ TAtomic check = 1;
+
+ // Create read session.
+ NYdb::NFederatedTopic::TFederatedReadSessionSettings readSettings;
+ readSettings
+ .ReadMirrored("dc1")
+ .ConsumerName("shared/user")
+ .MaxMemoryUsageBytes(16_MB)
+ .AppendTopics(setup->GetTestTopic());
+
+ readSettings.FederatedEventHandlers_.SimpleDataHandlers([&](TReadSessionEvent::TDataReceivedEvent& ev) mutable {
+ Cerr << ">>> event from dataHandler: " << DebugString(ev) << Endl;
+ Y_VERIFY_S(AtomicGet(check) != 0, "check is false");
+ auto& messages = ev.GetMessages();
+ Cerr << ">>> get " << messages.size() << " messages in this event" << Endl;
+ for (size_t i = 0u; i < messages.size(); ++i) {
+ auto& message = messages[i];
+ UNIT_ASSERT(message.GetFederatedPartitionSession()->GetReadSourceDatabaseName() == "dc1");
+ UNIT_ASSERT(message.GetFederatedPartitionSession()->GetTopicPath() == setup->GetTestTopic());
+ UNIT_ASSERT(message.GetData().EndsWith(message.GetFederatedPartitionSession()->GetTopicOriginDatabaseName()));
+
+ UNIT_ASSERT(!sentSet.empty());
+ UNIT_ASSERT_C(sentSet.erase(message.GetData()), "no such element is sentSet: " + message.GetData());
+ totalReceived++;
+ }
+ if (totalReceived == 3 * sentMessages.size()) {
+ UNIT_ASSERT(sentSet.empty());
+ checkedPromise.SetValue();
+ }
+ });
+
+ ReadSession = topicClient.CreateReadSession(readSettings);
+ Cerr << ">>> Session was created" << Endl;
+
+ Sleep(TDuration::MilliSeconds(50));
+
+ auto events = ReadSession->GetEvents(false);
+ UNIT_ASSERT(events.empty());
+
+ std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest;
+ do {
+ fdsRequest = fdsMock.GetNextPendingRequest();
+ if (!fdsRequest.has_value()) {
+ Sleep(TDuration::MilliSeconds(50));
+ }
+ } while (!fdsRequest.has_value());
+
+ fdsRequest->Result.SetValue(fdsMock.ComposeOkResult());
+
+ {
+ NPersQueue::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup->GetTestTopic()).MessageGroupId("src_id");
+ writeSettings.Codec(NPersQueue::ECodec::RAW);
+ NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ auto& client = setup->GetPersQueueClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+
+ for (auto i = 0u; i < count; i++) {
+ auto res = session->Write(sentMessages[i] + "-from-dc1");
+ UNIT_ASSERT(res);
+ }
+
+ session->Close();
+
+ Cerr << ">>> Writes to test-topic successful" << Endl;
+ }
+
+ {
+ NPersQueue::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup->GetTestTopic() + "-mirrored-from-dc2").MessageGroupId("src_id");
+ writeSettings.Codec(NPersQueue::ECodec::RAW);
+ NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ auto& client = setup->GetPersQueueClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+
+ for (auto i = 0u; i < count; i++) {
+ auto res = session->Write(sentMessages[i] + "-from-dc2");
+ UNIT_ASSERT(res);
+ }
+
+ session->Close();
+
+ Cerr << ">>> Writes to test-topic-mirrored-from-dc2 successful" << Endl;
+ }
+
+ {
+ NPersQueue::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup->GetTestTopic() + "-mirrored-from-dc3").MessageGroupId("src_id");
+ writeSettings.Codec(NPersQueue::ECodec::RAW);
+ NPersQueue::IExecutor::TPtr executor = new NPersQueue::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ auto& client = setup->GetPersQueueClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+
+ for (auto i = 0u; i < count; i++) {
+ auto res = session->Write(sentMessages[i] + "-from-dc3");
+ UNIT_ASSERT(res);
+ }
+
+ session->Close();
+
+ Cerr << ">>> Writes to test-topic-mirrored-from-dc3 successful" << Endl;
+ }
+
+ f.GetValueSync();
+ ReadSession->Close();
+ AtomicSet(check, 0);
+ }
+
Y_UNIT_TEST(BasicWriteSession) {
auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(
TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2);
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
index b0746d14b7..5e48d4c662 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/fds_mock.h
@@ -96,6 +96,14 @@ public:
c2->set_location("dc2");
c2->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
c2->set_weight(500);
+ auto c3 = mockResult.add_federation_databases();
+ c3->set_name("dc3");
+ c3->set_path("/Root");
+ c3->set_id("account-dc3");
+ c3->set_endpoint("localhost:" + ToString(Port));
+ c3->set_location("dc3");
+ c3->set_status(::Ydb::FederationDiscovery::DatabaseInfo::Status::DatabaseInfo_Status_AVAILABLE);
+ c3->set_weight(500);
op->mutable_result()->PackFrom(mockResult);