diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-12-15 14:01:16 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-12-15 15:40:44 +0300 |
commit | 5d7e58e04264a7572256b91a64e2db24b6c47fb9 (patch) | |
tree | 4da5f417f62098b4e187a39ddf2038739d36ebe4 | |
parent | 7a3e28d96b991a9beda1e94549207dd1878f3bc2 (diff) | |
download | ydb-5d7e58e04264a7572256b91a64e2db24b6c47fb9.tar.gz |
federated topic write
federated write draft
20 files changed, 562 insertions, 48 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h index c3fb5c7676..121097e39d 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h @@ -238,12 +238,21 @@ struct TFederatedWriteSessionSettings : public NTopic::TWriteSessionSettings { TFederatedWriteSessionSettings() = default; TFederatedWriteSessionSettings(const TFederatedWriteSessionSettings&) = default; TFederatedWriteSessionSettings(TFederatedWriteSessionSettings&&) = default; + TFederatedWriteSessionSettings& operator=(const TFederatedWriteSessionSettings&) = default; + TFederatedWriteSessionSettings& operator=(TFederatedWriteSessionSettings&&) = default; + TFederatedWriteSessionSettings(const TString& path, const TString& producerId, const TString& messageGroupId) : NTopic::TWriteSessionSettings(path, producerId, messageGroupId) { } - TFederatedWriteSessionSettings& operator=(const TFederatedWriteSessionSettings&) = default; - TFederatedWriteSessionSettings& operator=(TFederatedWriteSessionSettings&&) = default; + TFederatedWriteSessionSettings(const NTopic::TWriteSessionSettings& settings) + : NTopic::TWriteSessionSettings(settings) { + } + TFederatedWriteSessionSettings(NTopic::TWriteSessionSettings&& settings) + : NTopic::TWriteSessionSettings(std::move(settings)) { + } + // TFederatedWriteSessionSettings& operator=(const NTopic::TWriteSessionSettings&); + // TFederatedWriteSessionSettings& operator=(NTopic::TWriteSessionSettings&&); }; //! Settings for read session. @@ -365,8 +374,6 @@ struct TFederatedReadSessionSettings: public NTopic::TReadSessionSettings { READ_MIRRORED }; - // optional for read_mirrored case ? - //! Policy for federated reading. //! //! READ_ALL: read will be done from all topic instances from all databases. @@ -442,11 +449,11 @@ public: TFederatedTopicClient(const TDriver& driver, const TFederatedTopicClientSettings& settings = {}); //! Create read session. - std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings); + std::shared_ptr<IFederatedReadSession> CreateReadSession(const TFederatedReadSessionSettings& settings); //! Create write session. - // std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingFederatedWriteSession(const TFederatedWriteSessionSettings& settings); - // std::shared_ptr<NTopic::IWriteSession> CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings); + // std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings); + std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings); private: std::shared_ptr<TImpl> Impl_; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt index d33d0f6b51..bea5a60f1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-arm64.txt @@ -27,6 +27,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC target_sources(client-ydb_federated_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt index d33d0f6b51..bea5a60f1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.darwin-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC target_sources(client-ydb_federated_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt index 0847aec54a..efd677ea08 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-aarch64.txt @@ -28,6 +28,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC target_sources(client-ydb_federated_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt index 0847aec54a..efd677ea08 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.linux-x86_64.txt @@ -28,6 +28,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC target_sources(client-ydb_federated_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt index d33d0f6b51..bea5a60f1e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/CMakeLists.windows-x86_64.txt @@ -27,6 +27,7 @@ target_link_libraries(client-ydb_federated_topic-impl PUBLIC target_sources(client-ydb_federated_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session_event.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index 07645caf6e..7563f2c85a 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -73,6 +73,9 @@ void TFederatedReadSessionImpl::Start() { Y_UNUSED(f); if (auto self = selfCtx->LockShared()) { with_lock(self->Lock) { + if (self->Closing) { + return; + } self->FederationState = self->Observer->GetState(); self->OnFederatedStateUpdateImpl(); } @@ -110,11 +113,11 @@ NThreading::TFuture<void> TFederatedReadSessionImpl::WaitEvent() { // TODO override with read session settings timeout return AsyncInit.Apply([selfCtx = SelfContext](const NThreading::TFuture<void>) { if (auto self = selfCtx->LockShared()) { - if (self->Closing) { - return NThreading::MakeFuture(); - } std::vector<NThreading::TFuture<void>> waiters; with_lock(self->Lock) { + if (self->Closing) { + return NThreading::MakeFuture(); + } Y_ABORT_UNLESS(!self->SubSessions.empty(), "SubSessions empty in discovered state"); for (const auto& sub : self->SubSessions) { waiters.emplace_back(sub.Session->WaitEvent()); @@ -162,12 +165,16 @@ void TFederatedReadSessionImpl::CloseImpl() { } bool TFederatedReadSessionImpl::Close(TDuration timeout) { - bool result = true; - for (const auto& sub : SubSessions) { - // TODO substract from user timeout - result = sub.Session->Close(timeout); + with_lock(Lock) { + Closing = true; + + bool result = true; + for (const auto& sub : SubSessions) { + // TODO substract from user timeout + result = sub.Session->Close(timeout); + } + return result; } - return result; } } // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index b073241d52..c0444f15a2 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -55,16 +55,8 @@ private: void OnFederatedStateUpdateImpl(); - void Abort(); void CloseImpl(); - void ClearAllEvents(); - - // TODO Counters - // void MakeCountersIfNeeded(); - // void DumpCountersToLog(size_t timeNumber = 0); - // void ScheduleDumpCountersToLog(size_t timeNumber = 0); - private: TFederatedReadSessionSettings Settings; @@ -87,8 +79,6 @@ private: std::vector<TSubSession> SubSessions; size_t SubsessionIndex = 0; - // NYdbGrpc::IQueueClientContextPtr DumpCountersContext; - // Exiting. bool Closing = false; }; diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp index fa135df282..ca076d602d 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic.cpp @@ -14,17 +14,17 @@ TFederatedTopicClient::TFederatedTopicClient(const TDriver& driver, const TFeder { } -std::shared_ptr<IFederatedReadSession> TFederatedTopicClient::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) { - return Impl_->CreateFederatedReadSession(settings); +std::shared_ptr<IFederatedReadSession> TFederatedTopicClient::CreateReadSession(const TFederatedReadSessionSettings& settings) { + return Impl_->CreateReadSession(settings); } -// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> TFederatedTopicClient::CreateSimpleBlockingFederatedWriteSession( +// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> TFederatedTopicClient::CreateSimpleBlockingWriteSession( // const TFederatedWriteSessionSettings& settings) { -// return Impl_->CreateSimpleFederatedWriteSession(settings); +// return Impl_->CreateSimpleBlockingWriteSession(settings); // } -// std::shared_ptr<NTopic::IWriteSession> TFederatedTopicClient::CreateFederatedWriteSession(const TFederatedWriteSessionSettings& settings) { -// return Impl_->CreateFederatedWriteSession(settings); -// } +std::shared_ptr<NTopic::IWriteSession> TFederatedTopicClient::CreateWriteSession(const TFederatedWriteSessionSettings& settings) { + return Impl_->CreateWriteSession(settings); +} } // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp index fa091252b5..2de842b5cd 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.cpp @@ -1,7 +1,7 @@ #include "federated_topic_impl.h" #include "federated_read_session.h" -// #include "federated_write_session.h" +#include "federated_write_session.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> @@ -9,13 +9,36 @@ namespace NYdb::NFederatedTopic { std::shared_ptr<IFederatedReadSession> -TFederatedTopicClient::TImpl::CreateFederatedReadSession(const TFederatedReadSessionSettings& settings) { +TFederatedTopicClient::TImpl::CreateReadSession(const TFederatedReadSessionSettings& settings) { InitObserver(); auto session = std::make_shared<TFederatedReadSession>(settings, Connections, ClientSettings, GetObserver()); session->Start(); return std::move(session); } +// std::shared_ptr<NTopic::ISimpleBlockingWriteSession> +// TFederatedTopicClient::TImpl::CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings) { +// InitObserver(); +// auto session = std::make_shared<TSimpleBlockingFederatedWriteSession>(settings, Connections, ClientSettings, GetObserver()); +// session->Start(); +// return std::move(session); + +// } + +std::shared_ptr<NTopic::IWriteSession> +TFederatedTopicClient::TImpl::CreateWriteSession(const TFederatedWriteSessionSettings& settings) { + // Split settings.MaxMemoryUsage_ by two. + // One half goes to subsession. Other half goes to federated session internal buffer. + const ui64 splitSize = (settings.MaxMemoryUsage_ + 1) / 2; + TFederatedWriteSessionSettings splitSettings = settings; + splitSettings.MaxMemoryUsage(splitSize); + InitObserver(); + auto session = std::make_shared<TFederatedWriteSession>(splitSettings, Connections, ClientSettings, GetObserver()); + session->Start(); + return std::move(session); + +} + void TFederatedTopicClient::TImpl::InitObserver() { with_lock(Lock) { if (!Observer || Observer->IsStale()) { diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h index c963b3a94a..cf807776ad 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h @@ -33,7 +33,10 @@ public: } // Runtime API. - std::shared_ptr<IFederatedReadSession> CreateFederatedReadSession(const TFederatedReadSessionSettings& settings); + std::shared_ptr<IFederatedReadSession> CreateReadSession(const TFederatedReadSessionSettings& settings); + + std::shared_ptr<NTopic::ISimpleBlockingWriteSession> CreateSimpleBlockingWriteSession(const TFederatedWriteSessionSettings& settings); + std::shared_ptr<NTopic::IWriteSession> CreateWriteSession(const TFederatedWriteSessionSettings& settings); std::shared_ptr<TFederatedDbObserver> GetObserver() { with_lock(Lock) { diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp new file mode 100644 index 0000000000..d011480b95 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.cpp @@ -0,0 +1,294 @@ +#include "federated_write_session.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h> + +#include <library/cpp/threading/future/future.h> + +#include <algorithm> + +namespace NYdb::NFederatedTopic { + +constexpr TDuration UPDATE_FEDERATION_STATE_DELAY = TDuration::Seconds(10); + +bool DatabasesAreSame(std::shared_ptr<TDbInfo> lhs, std::shared_ptr<TDbInfo> rhs) { + if (!lhs || !rhs) { + return false; + } + return lhs->path() == rhs->path() && lhs->endpoint() == rhs->endpoint(); +} + +NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings); + +TFederatedWriteSession::TFederatedWriteSession(const TFederatedWriteSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSetttings, + std::shared_ptr<TFederatedDbObserver> observer) + : Settings(settings) + , Connections(std::move(connections)) + , SubClientSetttings(FromFederated(clientSetttings)) + , Observer(std::move(observer)) + , AsyncInit(Observer->WaitForFirstState()) + , FederationState(nullptr) + , ClientEventsQueue(std::make_shared<NTopic::TWriteSessionEventsQueue>(Settings)) + , BufferFreeSpace(Settings.MaxMemoryUsage_) +{ +} + +void TFederatedWriteSession::Start() { + // TODO validate settings? + with_lock(Lock) { + ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); + ClientHasToken = true; + } + + AsyncInit.Subscribe([self = shared_from_this()](const auto& f){ + Y_UNUSED(f); + with_lock(self->Lock) { + self->FederationState = self->Observer->GetState(); + self->OnFederatedStateUpdateImpl(); + } + }); +} + +void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) { + if (Subsession) { + PendingToken.Clear(); + Subsession->Close(TDuration::Zero()); + } + NTopic::TTopicClientSettings clientSettings = SubClientSetttings; + clientSettings + .Database(db->path()) + .DiscoveryEndpoint(db->endpoint()); + auto subclient = make_shared<NTopic::TTopicClient::TImpl>(Connections, clientSettings); + + auto handlers = NTopic::TWriteSessionSettings::TEventHandlers() + .HandlersExecutor(Settings.EventHandlers_.HandlersExecutor_) + .ReadyToAcceptHander([self = shared_from_this()](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev){ + TDeferredWrite deferred(self->Subsession); + with_lock(self->Lock) { + Y_ABORT_UNLESS(self->PendingToken.Empty()); + self->PendingToken = std::move(ev.ContinuationToken); + self->PrepareDeferredWrite(deferred); + } + deferred.DoWrite(); + }) + .AcksHandler([self = shared_from_this()](NTopic::TWriteSessionEvent::TAcksEvent& ev){ + with_lock(self->Lock) { + Y_ABORT_UNLESS(ev.Acks.size() <= self->OriginalMessagesToGetAck.size()); + for (size_t i = 0; i < ev.Acks.size(); ++i) { + self->BufferFreeSpace += self->OriginalMessagesToGetAck.front().Data.size(); + self->OriginalMessagesToGetAck.pop_front(); + } + self->ClientEventsQueue->PushEvent(std::move(ev)); + if (self->BufferFreeSpace > 0 && !self->ClientHasToken) { + self->ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); + self->ClientHasToken = true; + } + } + }) + .SessionClosedHandler([self = shared_from_this()](const NTopic::TSessionClosedEvent & ev){ + with_lock(self->Lock) { + self->ClientEventsQueue->PushEvent(ev); + } + }); + + NTopic::TWriteSessionSettings wsSettings = Settings; + wsSettings + // .MaxMemoryUsage(Settings.MaxMemoryUsage_) // to fix if split not by half on creation + .EventHandlers(handlers); + + Subsession = subclient->CreateWriteSession(wsSettings); + CurrentDatabase = db; +} + +std::shared_ptr<TDbInfo> TFederatedWriteSession::SelectDatabaseImpl() { + std::vector<std::shared_ptr<TDbInfo>> availableDatabases; + ui64 totalWeight = 0; + + for (const auto& db : FederationState->DbInfos) { + if (db->status() != TDbInfo::Status::DatabaseInfo_Status_AVAILABLE) { + continue; + } + + if (Settings.PreferredDatabase_ && (AsciiEqualsIgnoreCase(db->name(), *Settings.PreferredDatabase_) || + AsciiEqualsIgnoreCase(db->id(), *Settings.PreferredDatabase_))) { + return db; + } else if (AsciiEqualsIgnoreCase(FederationState->SelfLocation, db->location())) { + return db; + } else { + availableDatabases.push_back(db); + totalWeight += db->weight(); + } + } + + if (availableDatabases.empty() || totalWeight == 0) { + // close session, return error + return nullptr; + } + + std::sort(availableDatabases.begin(), availableDatabases.end(), [](const std::shared_ptr<TDbInfo>& lhs, const std::shared_ptr<TDbInfo>& rhs){ + return lhs->weight() > rhs->weight() + || lhs->weight() == rhs->weight() && lhs->name() < rhs->name(); + }); + + ui64 hashValue = THash<TString>()(Settings.Path_); + hashValue = CombineHashes(hashValue, THash<TString>()(Settings.ProducerId_)); + hashValue %= totalWeight; + + ui64 borderWeight = 0; + for (const auto& db : availableDatabases) { + borderWeight += db->weight(); + if (hashValue < borderWeight) { + return db; + } + } + Y_UNREACHABLE(); +} + +void TFederatedWriteSession::OnFederatedStateUpdateImpl() { + if (!FederationState->Status.IsSuccess()) { + CloseImpl(FederationState->Status.GetStatus(), NYql::TIssues{}); + return; + } + + Y_ABORT_UNLESS(!FederationState->DbInfos.empty()); + + auto preferrableDb = SelectDatabaseImpl(); + + if (!preferrableDb) { + CloseImpl(EStatus::UNAVAILABLE, + NYql::TIssues{NYql::TIssue("Fail to select database: no available database with positive weight")}); + return; + } + + if (!DatabasesAreSame(preferrableDb, CurrentDatabase)) { + OpenSubSessionImpl(preferrableDb); + } + + ScheduleFederatedStateUpdateImpl(UPDATE_FEDERATION_STATE_DELAY); +} + +void TFederatedWriteSession::ScheduleFederatedStateUpdateImpl(TDuration delay) { + Y_ABORT_UNLESS(Lock.IsLocked()); + auto cb = [self = shared_from_this()](bool ok) { + if (ok) { + with_lock(self->Lock) { + if (self->Closing) { + return; + } + self->FederationState = self->Observer->GetState(); + self->OnFederatedStateUpdateImpl(); + } + } + }; + + UpdateStateDelayContext = Connections->CreateContext(); + if (!UpdateStateDelayContext) { + Closing = true; + // TODO log DRIVER_IS_STOPPING_DESCRIPTION + return; + } + Connections->ScheduleCallback(delay, + std::move(cb), + UpdateStateDelayContext); +} + +NThreading::TFuture<void> TFederatedWriteSession::WaitEvent() { + return ClientEventsQueue->WaitEvent(); +} + +TVector<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { + return ClientEventsQueue->GetEvents(block, maxEventsCount); +} + +TMaybe<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession::GetEvent(bool block) { + auto events = GetEvents(block, 1); + return events.empty() ? Nothing() : TMaybe<NTopic::TWriteSessionEvent::TEvent>{std::move(events.front())}; +} + +NThreading::TFuture<ui64> TFederatedWriteSession::GetInitSeqNo() { + return NThreading::MakeFuture<ui64>(0u); +} + +void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, + TMaybe<TInstant> createTimestamp) { + NTopic::TWriteMessage message{std::move(data)}; + if (seqNo.Defined()) + message.SeqNo(*seqNo); + if (createTimestamp.Defined()) + message.CreateTimestamp(*createTimestamp); + return WriteInternal(std::move(token), std::move(message)); +} + +void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) { + return WriteInternal(std::move(token), std::move(message)); +} + +void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec, + ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { + auto message = NTopic::TWriteMessage::CompressedMessage(std::move(data), codec, originalSize); + if (seqNo.Defined()) + message.SeqNo(*seqNo); + if (createTimestamp.Defined()) + message.CreateTimestamp(*createTimestamp); + return WriteInternal(std::move(token), std::move(message)); +} + +void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) { + return WriteInternal(std::move(token), std::move(message)); +} + +void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message) { + ClientHasToken = false; + if (!message.CreateTimestamp_.Defined()) { + message.CreateTimestamp_ = TInstant::Now(); + } + + { + TDeferredWrite deferred(Subsession); + with_lock(Lock) { + BufferFreeSpace -= message.Data.size(); + OriginalMessagesToPassDown.emplace_back(std::move(message)); + + PrepareDeferredWrite(deferred); + } + deferred.DoWrite(); + } + if (BufferFreeSpace > 0) { + ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); + ClientHasToken = true; + } +} + +bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) { + if (PendingToken.Empty()) { + return false; + } + if (OriginalMessagesToPassDown.empty()) { + return false; + } + OriginalMessagesToGetAck.push_back(OriginalMessagesToPassDown.front()); + deferred.Token.ConstructInPlace(std::move(*PendingToken)); + deferred.Message.ConstructInPlace(std::move(OriginalMessagesToPassDown.front())); + OriginalMessagesToPassDown.pop_front(); + PendingToken.Clear(); + return true; +} + +void TFederatedWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { + Closing = true; + if (Subsession) { + Subsession->Close(TDuration::Zero()); + } + ClientEventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); +} + +bool TFederatedWriteSession::Close(TDuration timeout) { + if (Subsession) { + return Subsession->Close(timeout); + } + return true; +} + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h new file mode 100644 index 0000000000..7fd1020387 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_write_session.h @@ -0,0 +1,110 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h> + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h> + +#include <ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h> + +#include <deque> + +namespace NYdb::NFederatedTopic { + +class TFederatedWriteSession : public NTopic::IWriteSession, + public NTopic::TContinuationTokenIssuer, + public std::enable_shared_from_this<TFederatedWriteSession> { + friend class TFederatedTopicClient::TImpl; + +public: + struct TDeferredWrite { + explicit TDeferredWrite(std::shared_ptr<NTopic::IWriteSession> writer) + : Writer(std::move(writer)) { + } + + void DoWrite() { + if (Token.Empty() && Message.Empty()) { + return; + } + Y_ABORT_UNLESS(Token.Defined() && Message.Defined()); + return Writer->Write(std::move(*Token), std::move(*Message)); + } + + std::shared_ptr<NTopic::IWriteSession> Writer; + TMaybe<NTopic::TContinuationToken> Token; + TMaybe<NTopic::TWriteMessage> Message; + }; + + TFederatedWriteSession(const TFederatedWriteSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSetttings, + std::shared_ptr<TFederatedDbObserver> observer); + + ~TFederatedWriteSession() = default; + + NThreading::TFuture<void> WaitEvent() override; + TMaybe<NTopic::TWriteSessionEvent::TEvent> GetEvent(bool block) override; + TVector<NTopic::TWriteSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount) override; + + virtual NThreading::TFuture<ui64> GetInitSeqNo() override; + + virtual void Write(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& message) override; + + virtual void WriteEncoded(NTopic::TContinuationToken&& continuationToken, NTopic::TWriteMessage&& params) override; + + virtual void Write(NTopic::TContinuationToken&&, TStringBuf, TMaybe<ui64> seqNo = Nothing(), + TMaybe<TInstant> createTimestamp = Nothing()) override; + + virtual void WriteEncoded(NTopic::TContinuationToken&&, TStringBuf, NTopic::ECodec, ui32, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + bool Close(TDuration timeout) override; + + inline NTopic::TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; + +private: + void Start(); + void OpenSubSessionImpl(std::shared_ptr<TDbInfo> db); + + std::shared_ptr<TDbInfo> SelectDatabaseImpl(); + + void OnFederatedStateUpdateImpl(); + void ScheduleFederatedStateUpdateImpl(TDuration delay); + + void WriteInternal(NTopic::TContinuationToken&&, NTopic::TWriteMessage&& message); + bool PrepareDeferredWrite(TDeferredWrite& deferred); + + void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); + +private: + // For subsession creation + const NTopic::TFederatedWriteSessionSettings Settings; + std::shared_ptr<TGRpcConnectionsImpl> Connections; + const NTopic::TTopicClientSettings SubClientSetttings; + + std::shared_ptr<TFederatedDbObserver> Observer; + NThreading::TFuture<void> AsyncInit; + std::shared_ptr<TFederatedDbState> FederationState; + NYdbGrpc::IQueueClientContextPtr UpdateStateDelayContext; + + std::shared_ptr<TDbInfo> CurrentDatabase; + + TString SessionId; + const TInstant StartSessionTime = TInstant::Now(); + + TAdaptiveLock Lock; + + std::shared_ptr<NTopic::IWriteSession> Subsession; + + std::shared_ptr<NTopic::TWriteSessionEventsQueue> ClientEventsQueue; + + TMaybe<NTopic::TContinuationToken> PendingToken; // from Subsession + bool ClientHasToken = false; + std::deque<NTopic::TWriteMessage> OriginalMessagesToPassDown; + std::deque<NTopic::TWriteMessage> OriginalMessagesToGetAck; + i64 BufferFreeSpace; + + // Exiting. + bool Closing = false; +}; + +} // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make index 8d10ac776e..5febb8f00f 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/ya.make @@ -4,6 +4,8 @@ SRCS( federated_read_session.h federated_read_session.cpp federated_read_session_event.cpp + federated_write_session.h + federated_write_session.cpp federated_topic_impl.h federated_topic_impl.cpp federated_topic.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index 7b41ae11c7..a2780101c8 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .MaxMemoryUsageBytes(1_MB) .AppendTopics(setup->GetTestTopic()); - ReadSession = topicClient.CreateFederatedReadSession(readSettings); + ReadSession = topicClient.CreateReadSession(readSettings); Cerr << "Session was created" << Endl; ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); @@ -136,7 +136,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .AppendTopics(setup->GetTestTopic()); Cerr << "Before ReadSession was created" << Endl; - ReadSession = topicClient.CreateFederatedReadSession(readSettings); + ReadSession = topicClient.CreateReadSession(readSettings); Cerr << "Session was created" << Endl; auto f = ReadSession->WaitEvent(); @@ -230,7 +230,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .MaxMemoryUsageBytes(1_MB) .AppendTopics(setup->GetTestTopic()); - ReadSession = topicClient.CreateFederatedReadSession(readSettings); + ReadSession = topicClient.CreateReadSession(readSettings); Cerr << "Session was created" << Endl; Sleep(TDuration::MilliSeconds(50)); @@ -342,7 +342,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .MaxMemoryUsageBytes(1_MB) .AppendTopics(setup->GetTestTopic()); - ReadSession = topicClient.CreateFederatedReadSession(readSettings); + ReadSession = topicClient.CreateReadSession(readSettings); Cerr << "Session was created" << Endl; ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); @@ -366,7 +366,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Cerr << ">>> Got event: " << DebugString(*event) << Endl; UNIT_ASSERT(std::holds_alternative<NTopic::TSessionClosedEvent>(*event)); - auto ReadSession2 = topicClient.CreateFederatedReadSession(readSettings); + auto ReadSession2 = topicClient.CreateReadSession(readSettings); Cerr << "Session2 was created" << Endl; ReadSession2->WaitEvent().Wait(TDuration::Seconds(1)); @@ -407,7 +407,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .MaxMemoryUsageBytes(1_MB) .AppendTopics(setup->GetTestTopic()); - ReadSession = topicClient.CreateFederatedReadSession(readSettings); + ReadSession = topicClient.CreateReadSession(readSettings); Cerr << "Session was created" << Endl; ReadSession->WaitEvent().Wait(TDuration::Seconds(1)); @@ -483,7 +483,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { checkedPromise.SetValue(); }); - ReadSession = topicClient.CreateFederatedReadSession(readSettings); + ReadSession = topicClient.CreateReadSession(readSettings); Cerr << ">>> Session was created" << Endl; Sleep(TDuration::MilliSeconds(50)); @@ -520,6 +520,73 @@ Y_UNIT_TEST_SUITE(BasicUsage) { AtomicSet(check, 0); } + Y_UNIT_TEST(BasicWriteSession) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>( + TEST_CASE_NAME, false, ::NPersQueue::TTestServer::LOGGED_SERVICES, NActors::NLog::PRI_DEBUG, 2); + + setup->Start(true, true); + + TFederationDiscoveryServiceMock fdsMock; + fdsMock.Port = setup->GetGrpcPort(); + + ui16 newServicePort = setup->GetPortManager()->GetPort(4285); + auto grpcServer = setup->StartGrpcService(newServicePort, &fdsMock); + + std::shared_ptr<NYdb::NTopic::IWriteSession> WriteSession; + + // Create topic client. + NYdb::TDriverConfig cfg; + cfg.SetEndpoint(TStringBuilder() << "localhost:" << newServicePort); + cfg.SetDatabase("/Root"); + cfg.SetLog(CreateLogBackend("cerr", ELogPriority::TLOG_DEBUG)); + NYdb::TDriver driver(cfg); + NYdb::NFederatedTopic::TFederatedTopicClient topicClient(driver); + + // Create write session. + auto writeSettings = NTopic::TWriteSessionSettings() + .Path(setup->GetTestTopic()) + .MessageGroupId("src_id"); + + WriteSession = topicClient.CreateWriteSession(writeSettings); + Cerr << "Session was created" << Endl; + + WriteSession->WaitEvent().Wait(TDuration::Seconds(1)); + auto event = WriteSession->GetEvent(false); + Y_ASSERT(event); + Cerr << "Got new read session event: " << DebugString(*event) << Endl; + auto* readyToAcceptEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event); + Y_ASSERT(readyToAcceptEvent); + WriteSession->Write(std::move(readyToAcceptEvent->ContinuationToken), NTopic::TWriteMessage("hello")); + + WriteSession->WaitEvent().Wait(TDuration::Seconds(1)); + event = WriteSession->GetEvent(false); + Y_ASSERT(event); + Cerr << "Got new read session event: " << DebugString(*event) << Endl; + + readyToAcceptEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TReadyToAcceptEvent>(&*event); + Y_ASSERT(readyToAcceptEvent); + + std::optional<TFederationDiscoveryServiceMock::TManualRequest> fdsRequest; + do { + fdsRequest = fdsMock.GetNextPendingRequest(); + if (!fdsRequest.has_value()) { + Sleep(TDuration::MilliSeconds(50)); + } + } while (!fdsRequest.has_value()); + + fdsRequest->Result.SetValue(fdsMock.ComposeOkResult()); + + WriteSession->WaitEvent().Wait(TDuration::Seconds(1)); + event = WriteSession->GetEvent(false); + Y_ASSERT(event); + Cerr << "Got new read session event: " << DebugString(*event) << Endl; + + auto* acksEvent = std::get_if<NYdb::NTopic::TWriteSessionEvent::TAcksEvent>(&*event); + Y_ASSERT(acksEvent); + + WriteSession->Close(TDuration::MilliSeconds(10)); + } + } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index 91962449f9..80e154517c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -37,7 +37,7 @@ NThreading::TFuture<void> TWriteSession::WaitEvent() { void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { - auto message = TWriteMessage::CompressedMessage(data, codec, originalSize); + auto message = TWriteMessage::CompressedMessage(std::move(data), codec, originalSize); if (seqNo.Defined()) message.SeqNo(*seqNo); if (createTimestamp.Defined()) @@ -52,7 +52,7 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& mes void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { - TWriteMessage message{data}; + TWriteMessage message{std::move(data)}; if (seqNo.Defined()) message.SeqNo(*seqNo); if (createTimestamp.Defined()) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index c89cdf1eca..1a45dd1ded 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -400,7 +400,6 @@ NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() { return EventsQueue->WaitEvent(); } -// Client method. void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& message) { TInstant createdAtValue = message.CreateTimestamp_.Defined() ? *message.CreateTimestamp_ : TInstant::Now(); bool readyToAccept = false; @@ -425,6 +424,7 @@ void TWriteSessionImpl::Write(TContinuationToken&& token, TWriteMessage&& messag WriteInternal(std::move(token), std::move(message)); } +// Client method. void TWriteSessionImpl::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message) { WriteInternal(std::move(token), std::move(message)); @@ -1245,8 +1245,9 @@ void TWriteSessionImpl::SendImpl() { // Client method, no Lock bool TWriteSessionImpl::Close(TDuration closeTimeout) { - if (AtomicGet(Aborting)) + if (AtomicGet(Aborting)) { return false; + } LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout); auto startTime = TInstant::Now(); auto remaining = closeTimeout; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 9373e9c811..8c527ac843 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -103,6 +103,7 @@ public: private: struct THandlersVisitor : public TParent::TBaseHandlersVisitor { using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; + #define DECLARE_HANDLER(type, handler, answer) \ bool operator()(type&) { \ if (this->PushHandler<type>( \ @@ -122,7 +123,6 @@ private: bool Visit() { return std::visit(*this, Event); } - }; bool ApplyHandler(TEventInfo& eventInfo) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index cdef92ad5c..309749aa1f 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -1236,7 +1236,6 @@ struct TWriteSessionEvent { TMaybe<TWrittenMessageDetails> Details; //! Write stats from server. See TWriteStat. nullptr for DISCARDED event. TWriteStat::TPtr Stat; - }; struct TAcksEvent : public TPrintable<TAcksEvent> { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index 8b4dc03613..d0b5320578 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -143,18 +143,24 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .Path(TEST_TOPIC) .ProducerId(TEST_MESSAGE_GROUP_ID) .MessageGroupId(TEST_MESSAGE_GROUP_ID); + Cerr << ">>> open write session " << i << Endl; auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); UNIT_ASSERT(writeSession->Write("message_using_MessageGroupId")); + Cerr << ">>> write session " << i << " message written" << Endl; writeSession->Close(); + Cerr << ">>> write session " << i << " closed" << Endl; } { auto writeSettings = TWriteSessionSettings() .Path(TEST_TOPIC) .ProducerId(TEST_MESSAGE_GROUP_ID) .PartitionId(0); + Cerr << ">>> open write session 100" << Endl; auto writeSession = client.CreateSimpleBlockingWriteSession(writeSettings); UNIT_ASSERT(writeSession->Write("message_using_PartitionId")); + Cerr << ">>> write session 100 message written" << Endl; writeSession->Close(); + Cerr << ">>> write session 100 closed" << Endl; } { |