diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-03-08 23:06:35 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-03-08 23:06:35 +0300 |
commit | befe2fdf7ccfe782cfbd05cff6f862b08d0ab4b9 (patch) | |
tree | 2b93aa6c46982906fc11289f8ad88b0f358cf4d2 | |
parent | b83476f25d94210fd0b0e6a50c763238d6193420 (diff) | |
download | ydb-befe2fdf7ccfe782cfbd05cff6f862b08d0ab4b9.tar.gz |
impl tracker proof of idea
impl tracker proof of idea
26 files changed, 4040 insertions, 3161 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.darwin-x86_64.txt index ed9156bb8a..8e87266134 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.darwin-x86_64.txt @@ -26,6 +26,7 @@ target_sources(client-ydb_persqueue_core-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue_impl.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-aarch64.txt index 27ae8ed56a..cc6a7c90c2 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_sources(client-ydb_persqueue_core-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue_impl.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-x86_64.txt index 27ae8ed56a..cc6a7c90c2 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/CMakeLists.linux-x86_64.txt @@ -27,6 +27,7 @@ target_sources(client-ydb_persqueue_core-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue_impl.cpp diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index a5a3c2075a..7e2c26b66c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <util/generic/queue.h> @@ -313,9 +314,10 @@ protected: // Template for visitor implementation. struct TBaseHandlersVisitor { - TBaseHandlersVisitor(const TSettings& settings, TEvent& event) + TBaseHandlersVisitor(const TSettings& settings, TEvent& event, std::shared_ptr<TImplTracker> tracker) : Settings(settings) , Event(event) + , Tracker(tracker) {} template <class TEventType, class TFunc, class TCommonFunc> @@ -333,16 +335,16 @@ protected: template <class TEventType, class TFunc> void PushSpecificHandler(TEvent&& event, const TFunc& f) { - Post(Settings.EventHandlers_.HandlersExecutor_, [func = f, event = std::move(event)]() mutable { - func(std::get<TEventType>(event)); - }); + Post(Settings.EventHandlers_.HandlersExecutor_, + [func = f, event = std::move(event), wire = Tracker->MakeTrackedWire()]() mutable { + func(std::get<TEventType>(event)); + }); } template <class TFunc> void PushCommonHandler(TEvent&& event, const TFunc& f) { - Post(Settings.EventHandlers_.HandlersExecutor_, [func = f, event = std::move(event)]() mutable { - func(event); - }); + Post(Settings.EventHandlers_.HandlersExecutor_, + [func = f, event = std::move(event), wire = Tracker->MakeTrackedWire()]() mutable { func(event); }); } virtual void Post(const typename TExecutor::TPtr& executor, typename TExecutor::TFunction&& f) { @@ -351,6 +353,7 @@ protected: const TSettings& Settings; TEvent& Event; + std::shared_ptr<TImplTracker> Tracker; }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h new file mode 100644 index 0000000000..98be271edf --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h @@ -0,0 +1,118 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> +#include <util/system/guard.h> +#include <util/system/spinlock.h> + +#include <memory> + +namespace NYdb::NPersQueue { + +class TImplTracker { +public: + struct TWire { + TImplTracker* Owner; + + TWire() + : Owner(nullptr) { + } + + TWire(TImplTracker& owner) + : Owner(&owner) { + Owner->Increment(); + } + + ~TWire() { + if (Owner) { + Owner->Decrement(); + } + } + + TWire(const TWire& that) : Owner(that.Owner) { + if (Owner) + Owner->Increment(); + } + TWire(TWire&& that) + : Owner(std::exchange(that.Owner, nullptr)) { + } + TWire& operator=(const TWire& that) { + if (Owner == that.Owner) { + return *this; + } + if (Owner) { + Owner->Decrement(); + } + Owner = that.Owner; + if (Owner) { + Owner->Increment(); + } + return *this; + } + TWire& operator=(TWire&& that) { + if (this == &that) { + return *this; + } + if (Owner == that.Owner) { + if (that.Owner) { + that.Owner->Decrement(); + } + that.Owner = nullptr; + return *this; + } + if (Owner) { + Owner->Decrement(); + } + Owner = std::exchange(that.Owner, nullptr); + return *this; + } + + operator bool() const { + return Owner; + } + }; + +public: + std::shared_ptr<TWire> MakeTrackedWire() { + return std::make_shared<TWire>(*this); + } + + void Increment() { + with_lock(Lock) { + Y_VERIFY(!AwaitingCompletion()); + ++RefCount; + } + } + + void Decrement() { + with_lock(Lock) { + --RefCount; + if (RefCount == 0 && AwaitingCompletion()) { + CompletionPromise.SetValue(); + } + } + } + + NThreading::TFuture<void> AsyncComplete() { + with_lock(Lock) { + if (!AwaitingCompletion()) { + CompletionPromise = NThreading::NewPromise(); + } + if (RefCount == 0) { + CompletionPromise.SetValue(); + } + return CompletionPromise.GetFuture(); + } + } + +private: + inline bool AwaitingCompletion() const { + return CompletionPromise.Initialized(); + } + +private: + TSpinLock Lock; + size_t RefCount = 0; + NThreading::TPromise<void> CompletionPromise; +}; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index c388d1a6be..ca1b9cde81 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -57,6 +57,10 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, TReadSession::~TReadSession() { Abort(EStatus::ABORTED, "Aborted"); ClearAllEvents(); + + if (Tracker) { + Tracker->AsyncComplete().Wait(); + } } Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClusterDiscoveryRequest() const { @@ -71,7 +75,8 @@ Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClus void TReadSession::Start() { ErrorHandler = MakeIntrusive<TErrorHandler<true>>(weak_from_this()); - EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this()); + Tracker = std::make_shared<TImplTracker>(); + EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this(), Tracker); if (!ValidateSettings()) { return; @@ -165,6 +170,8 @@ void TReadSession::ProceedWithoutClusterDiscovery() { } void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + // Create cluster sessions. ui64 partitionStreamIdStart = 1; const size_t clusterSessionsCount = ClusterSessions.size(); @@ -196,7 +203,9 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { EventsQueue, ErrorHandler, context, - partitionStreamIdStart++, clusterSessionsCount); + partitionStreamIdStart++, + clusterSessionsCount, + Tracker); deferred.DeferStartSession(clusterSessionInfo.Session); } @@ -304,6 +313,10 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu } void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + if (Aborting || Closing) { + return; + } Log.Write(TLOG_DEBUG, GetLogPrefix() << "Restart cluster discovery in " << delay); auto startCallback = [self = weak_from_this()](bool ok) { if (ok) { @@ -403,6 +416,8 @@ bool TReadSession::Close(TDuration timeout) { } void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + if (!Aborting) { Aborting = true; Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); @@ -424,10 +439,14 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, TDeferredActions< } void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + AbortImpl(TSessionClosedEvent(statusCode, std::move(issues)), deferred); } void TReadSession::AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + NYql::TIssues issues; issues.AddIssue(message); AbortImpl(statusCode, std::move(issues), deferred); @@ -577,19 +596,22 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { with_lock(Lock) { + if (Aborting || Closing) { + return; + } DumpCountersContext = Connections->CreateContext(); - } - if (DumpCountersContext) { - auto callback = [self = weak_from_this(), timeNumber](bool ok) { - if (ok) { - if (auto sharedSelf = self.lock()) { - sharedSelf->DumpCountersToLog(timeNumber); + if (DumpCountersContext) { + auto callback = [self = weak_from_this(), timeNumber](bool ok) { + if (ok) { + if (auto sharedSelf = self.lock()) { + sharedSelf->DumpCountersToLog(timeNumber); + } } - } - }; - Connections->ScheduleCallback(TDuration::Seconds(1), - std::move(callback), - DumpCountersContext); + }; + Connections->ScheduleCallback(TDuration::Seconds(1), + std::move(callback), + DumpCountersContext); + } } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 3472777892..d3fa055a6c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -1,6 +1,7 @@ #pragma once #include "common.h" +#include "impl_tracker.h" #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> @@ -762,7 +763,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti public: TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session); + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()); TReadSessionEventInfo<UseMigrationProtocol> GetEventImpl(size_t& maxByteSize, @@ -831,10 +833,13 @@ public: private: struct THandlersVisitor : public TParent::TBaseHandlersVisitor { - THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings, typename TParent::TEvent& event, TDeferredActions<UseMigrationProtocol>& deferred) - : TParent::TBaseHandlersVisitor(settings, event) - , Deferred(deferred) - {} + THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings, + typename TParent::TEvent& event, + TDeferredActions<UseMigrationProtocol>& deferred, + std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()) + : TParent::TBaseHandlersVisitor(settings, event, tracker) + , Deferred(deferred) { + } #define DECLARE_HANDLER(type, handler, answer) \ bool operator()(type&) { \ @@ -883,12 +888,14 @@ private: TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock. bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) { - THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred); + THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred, Tracker); return visitor.Visit(); } +private: bool HasEventCallbacks; std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session; + std::shared_ptr<TImplTracker> Tracker; }; } // namespace NYdb::NPersQueue @@ -924,7 +931,8 @@ template <bool UseMigrationProtocol> class TSingleClusterReadSessionImpl : public std::enable_shared_from_this<TSingleClusterReadSessionImpl<UseMigrationProtocol>>, public IUserRetrievedEventCallback<UseMigrationProtocol> { public: - using TPtr = std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>; + using TSelf = TSingleClusterReadSessionImpl<UseMigrationProtocol>; + using TPtr = std::shared_ptr<TSelf>; using IProcessor = typename IReadSessionConnectionProcessorFactory<UseMigrationProtocol>::IProcessor; @@ -940,7 +948,9 @@ public: std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue, typename IErrorHandler<UseMigrationProtocol>::TPtr errorHandler, NGrpc::IQueueClientContextPtr clientContext, - ui64 partitionStreamIdStart, ui64 partitionStreamIdStep + ui64 partitionStreamIdStart, + ui64 partitionStreamIdStep, + std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>() ) : Settings(settings) , Database(database) @@ -956,6 +966,7 @@ public: , CookieMapping(ErrorHandler) , ReadSizeBudget(GetCompressedDataSizeLimit()) , ReadSizeServerDelta(0) + , Tracker(std::move(tracker)) { } @@ -977,6 +988,7 @@ public: void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override; void Abort(); + void AbortImpl(); void Close(std::function<void()> callback); bool Reconnect(const TPlainStatus& status); @@ -1177,6 +1189,8 @@ private: std::atomic<int> DecompressionTasksInflight = 0; i64 ReadSizeBudget; i64 ReadSizeServerDelta = 0; + + std::shared_ptr<TImplTracker> Tracker; }; // High level class that manages several read session impls. @@ -1298,6 +1312,9 @@ private: // Exiting. bool Aborting = false; bool Closing = false; + + // + std::shared_ptr<TImplTracker> Tracker; }; } // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 9a1948a174..e229b45626 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -251,6 +251,10 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; NGrpc::IQueueClientContextPtr prevConnectDelayContext; + // Callbacks + std::function<void(TPlainStatus&&, typename IProcessor::TPtr&&)> connectCallback; + std::function<void(bool)> connectTimeoutCallback; + if (!status.Ok()) { Log.Write(TLOG_INFO, GetLogPrefix() << "Got error. Status: " << status.Status << ". Description: " << IssuesSingleLineString(status.Issues)); @@ -301,6 +305,20 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain // Destroy all partition streams before connecting. DestroyAllPartitionStreamsImpl(deferred); + + connectCallback = [wire = Tracker->MakeTrackedWire(), + sessionImpl = this->shared_from_this(), + connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + sessionImpl->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace! + }; + + connectTimeoutCallback = [wire = Tracker->MakeTrackedWire(), + sessionImpl = this->shared_from_this(), + connectTimeoutContext = connectTimeoutContext](bool ok) { + if (ok) { + sessionImpl->OnConnectTimeout(connectTimeoutContext); + } + }; } // Cancel previous operations. @@ -308,23 +326,6 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain Cancel(prevConnectTimeoutContext); Cancel(prevConnectDelayContext); - auto connectCallback = [weakThis = TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), - connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnect(std::move(st), std::move(processor), - connectContext); // OnConnect could be called inplace! - } - }; - - auto connectTimeoutCallback = [weakThis = TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), - connectTimeoutContext = connectTimeoutContext](bool ok) { - if (ok) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnectTimeout(connectTimeoutContext); - } - } - }; - Y_ASSERT(connectContext); Y_ASSERT(connectTimeoutContext); Y_ASSERT((delay == TDuration::Zero()) == !delayContext); @@ -343,6 +344,7 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain template <bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReconnectImpl( TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred) { + Y_VERIFY(Lock.IsLocked()); Log.Write(TLOG_INFO, GetLogPrefix() << "Break connection due to unexpected message from server. Status: " << status.Status << ", Issues: \"" << IssuesSingleLineString(status.Issues) << "\""); @@ -351,7 +353,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco Processor = nullptr; RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. - deferred.DeferReconnection(TSingleClusterReadSessionImpl<UseMigrationProtocol>::shared_from_this(), ErrorHandler, std::move(status)); + deferred.DeferReconnection(this->shared_from_this(), ErrorHandler, std::move(status)); } template<bool UseMigrationProtocol> @@ -420,7 +422,8 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnConnect( } template<> -inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>& deferred) { // Assumes that we're under lock. +inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); TClientMessage<true> req; auto& init = *req.mutable_init_request(); @@ -450,7 +453,8 @@ inline void TSingleClusterReadSessionImpl<true>::InitImpl(TDeferredActions<true> } template<> -inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<false>& deferred) { // Assumes that we're under lock. +inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); Log.Write(TLOG_DEBUG, GetLogPrefix() << "Successfully connected. Initializing session"); TClientMessage<false> req; auto& init = *req.mutable_init_request(); @@ -486,7 +490,9 @@ inline void TSingleClusterReadSessionImpl<false>::InitImpl(TDeferredActions<fals } template<bool UseMigrationProtocol> -void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImpl() { // Assumes that we're under lock. +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ContinueReadingDataImpl() { + Y_VERIFY(Lock.IsLocked()); + if (!Closing && !Aborting && !WaitingReadResponse @@ -531,7 +537,8 @@ TString GetCluster(const TPartitionStreamImpl<UseMigrationProtocol>* partitionSt } template<bool UseMigrationProtocol> -bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::IsActualPartitionStreamImpl(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { // Assumes that we're under lock. +bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::IsActualPartitionStreamImpl(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream) { + Y_VERIFY(Lock.IsLocked()); auto actualPartitionStreamIt = PartitionStreams.find(partitionStream->GetAssignId()); return actualPartitionStreamIt != PartitionStreams.end() && GetPartitionStreamId(actualPartitionStreamIt->second.Get()) == GetPartitionStreamId(partitionStream); @@ -750,7 +757,8 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::OnUserRetrievedEvent(i template <bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WriteToProcessorImpl( - TClientMessage<UseMigrationProtocol>&& req) { // Assumes that we're under lock. + TClientMessage<UseMigrationProtocol>&& req) { + Y_VERIFY(Lock.IsLocked()); if (Processor) { Processor->Write(std::move(req)); @@ -759,6 +767,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::WriteToProcessorImpl( template<bool UseMigrationProtocol> bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::HasCommitsInflightImpl() const { + Y_VERIFY(Lock.IsLocked()); for (const auto& [id, partitionStream] : PartitionStreams) { if (partitionStream->HasCommitsInflight()) return true; @@ -768,7 +777,11 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::HasCommitsInflightImpl template <bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl( - TDeferredActions<UseMigrationProtocol>& deferred) { // Assumes that we're under lock. + TDeferredActions<UseMigrationProtocol>& deferred) { + Y_VERIFY(Lock.IsLocked()); + if (Aborting) { + return; + } if (Closing && !HasCommitsInflightImpl()) { Processor->Cancel(); CallCloseCallbackImpl(); @@ -778,14 +791,13 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl( if (Processor) { ServerMessage->Clear(); - auto callback = [weakThis = TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + auto callback = [wire = Tracker->MakeTrackedWire(), + sessionImpl = this->shared_from_this(), connectionGeneration = ConnectionGeneration, // Capture message & processor not to read in freed memory. serverMessage = ServerMessage, processor = Processor](NGrpc::TGrpcStatus&& grpcStatus) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); - } + sessionImpl->OnReadDone(std::move(grpcStatus), connectionGeneration); }; deferred.DeferReadFromProcessor(Processor, ServerMessage.get(), std::move(callback)); @@ -885,7 +897,8 @@ template <> template <> inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::InitResponse&& msg, - TDeferredActions<true>& deferred) { // Assumes that we're under lock. + TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); Y_UNUSED(deferred); Log.Write(TLOG_INFO, GetLogPrefix() << "Server session id: " << msg.session_id()); @@ -900,7 +913,8 @@ template <> template <> inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch&& msg, - TDeferredActions<true>& deferred) { // Assumes that we're under lock. + TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); if (Closing || Aborting) { return; // Don't process new data. } @@ -987,7 +1001,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Assigned&& msg, - TDeferredActions<true>& deferred) { // Assumes that we're under lock. + TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + auto partitionStream = MakeIntrusive<TPartitionStreamImpl<true>>( NextPartitionStreamId, msg.topic().path(), msg.cluster(), msg.partition() + 1, // Group. @@ -1020,7 +1036,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Release&& msg, - TDeferredActions<true>& deferred) { // Assumes that we're under lock. + TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + auto partitionStreamIt = PartitionStreams.find(msg.assign_id()); if (partitionStreamIt == PartitionStreams.end()) { return; @@ -1045,7 +1063,8 @@ template <> template <> inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::Committed&& msg, - TDeferredActions<true>& deferred) { // Assumes that we're under lock. + TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); @@ -1081,7 +1100,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( Ydb::PersQueue::V1::MigrationStreamingReadServerMessage::PartitionStatus&& msg, - TDeferredActions<true>& deferred) { // Assumes that we're under lock. + TDeferredActions<true>& deferred) { + Y_VERIFY(Lock.IsLocked()); + auto partitionStreamIt = PartitionStreams.find(msg.assign_id()); if (partitionStreamIt == PartitionStreams.end()) { return; @@ -1100,7 +1121,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::InitResponse&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + + Y_VERIFY(Lock.IsLocked()); Y_UNUSED(deferred); RetryState = nullptr; @@ -1115,7 +1138,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::ReadResponse&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + if (Closing || Aborting) { return; // Don't process new data. } @@ -1194,7 +1219,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::StartPartitionSessionRequest&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>( NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(), msg.partition_session().partition_session_id(), msg.committed_offset(), @@ -1223,7 +1250,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::StopPartitionSessionRequest&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id()); if (partitionStreamIt == PartitionStreams.end()) { return; @@ -1247,7 +1276,8 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::CommitOffsetResponse&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); Log.Write(TLOG_DEBUG, GetLogPrefix() << "Committed response: " << msg); @@ -1268,7 +1298,9 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::StreamReadMessage::PartitionSessionStatusResponse&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + auto partitionStreamIt = PartitionStreams.find(msg.partition_session_id()); if (partitionStreamIt == PartitionStreams.end()) { return; @@ -1287,7 +1319,8 @@ template <> template <> inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( Ydb::Topic::UpdateTokenResponse&& msg, - TDeferredActions<false>& deferred) { // Assumes that we're under lock. + TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); // TODO Y_UNUSED(msg, deferred); } @@ -1296,6 +1329,11 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTasksImpl(TDeferredActions<UseMigrationProtocol>& deferred) { + Y_VERIFY(Lock.IsLocked()); + + if (Aborting) { + return; + } UpdateMemoryUsageStatisticsImpl(); const i64 limit = GetDecompressedDataSizeLimit(); Y_VERIFY(limit > 0); @@ -1319,6 +1357,8 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::StartDecompressionTask template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStreamsImpl(TDeferredActions<UseMigrationProtocol>& deferred) { + Y_VERIFY(Lock.IsLocked()); + using TClosedEvent = std::conditional_t< UseMigrationProtocol, NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent, @@ -1402,23 +1442,30 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Abort() { Log.Write(TLOG_DEBUG, GetLogPrefix() << "Abort session to cluster"); with_lock (Lock) { - if (!Aborting) { - Aborting = true; - CloseCallback = {}; + AbortImpl(); + } +} - // Cancel(ClientContext); // Don't cancel, because this is used only as factory for other contexts. - Cancel(ConnectContext); - Cancel(ConnectTimeoutContext); - Cancel(ConnectDelayContext); +template<bool UseMigrationProtocol> +void TSingleClusterReadSessionImpl<UseMigrationProtocol>::AbortImpl() { + Y_VERIFY(Lock.IsLocked()); - if (ClientContext) { - ClientContext->Cancel(); - ClientContext.reset(); - } + if (!Aborting) { + Aborting = true; + CallCloseCallbackImpl(); - if (Processor) { - Processor->Cancel(); - } + // Cancel(ClientContext); // Don't cancel, because this is used only as factory for other contexts. + Cancel(ConnectContext); + Cancel(ConnectTimeoutContext); + Cancel(ConnectDelayContext); + + if (ClientContext) { + ClientContext->Cancel(); + ClientContext.reset(); + } + + if (Processor) { + Processor->Cancel(); } } } @@ -1453,15 +1500,13 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Close(std::function<vo template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::CallCloseCallbackImpl() { + Y_VERIFY(Lock.IsLocked()); + if (CloseCallback) { CloseCallback(); CloseCallback = {}; } - Aborting = true; // So abort call will have no effect. - if (ClientContext) { - ClientContext->Cancel(); - ClientContext.reset(); - } + AbortImpl(); } template<bool UseMigrationProtocol> @@ -1509,6 +1554,8 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DumpStatisticsToLog(TL template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::UpdateMemoryUsageStatisticsImpl() { + Y_VERIFY(Lock.IsLocked()); + const TInstant now = TInstant::Now(); const ui64 delta = (now - UsageStatisticsLastUpdateTime).MilliSeconds(); UsageStatisticsLastUpdateTime = now; @@ -1654,9 +1701,11 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const { template <bool UseMigrationProtocol> TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( const TAReadSessionSettings<UseMigrationProtocol>& settings, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session) + std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + std::shared_ptr<TImplTracker> tracker) : TParent(settings) - , Session(std::move(session)) { + , Session(std::move(session)) + , Tracker(std::move(tracker)) { const auto& h = TParent::Settings.EventHandlers_; if constexpr (UseMigrationProtocol) { @@ -1973,7 +2022,7 @@ template <bool UseMigrationProtocol> bool TReadSessionEventsQueue<UseMigrationProtocol>::TryApplyCallbackToEventImpl(typename TParent::TEvent& event, TDeferredActions<UseMigrationProtocol>& deferred) { - THandlersVisitor visitor(TParent::Settings, event, deferred); + THandlersVisitor visitor(TParent::Settings, event, deferred, Tracker); return visitor.Visit(); } @@ -2002,14 +2051,20 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbackToEventImpl(TDa Y_VERIFY(HasEventCallbacks); if (TParent::Settings.EventHandlers_.DataReceivedHandler_) { - auto action = [func = TParent::Settings.EventHandlers_.DataReceivedHandler_, data = std::move(data), eventsInfo = std::move(eventsInfo)]() mutable { + auto action = [wire = Tracker->MakeTrackedWire(), + func = TParent::Settings.EventHandlers_.DataReceivedHandler_, + data = std::move(data), + eventsInfo = std::move(eventsInfo)]() mutable { func(data); eventsInfo.OnUserRetrievedEvent(); }; deferred.DeferStartExecutorTask(TParent::Settings.EventHandlers_.HandlersExecutor_, std::move(action)); } else if (TParent::Settings.EventHandlers_.CommonHandler_) { - auto action = [func = TParent::Settings.EventHandlers_.CommonHandler_, data = std::move(data), eventsInfo = std::move(eventsInfo)]() { + auto action = [wire = Tracker->MakeTrackedWire(), + func = TParent::Settings.EventHandlers_.CommonHandler_, + data = std::move(data), + eventsInfo = std::move(eventsInfo)]() mutable { typename TParent::TEvent event(std::move(data)); func(event); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 685efd1e14..607ebfb855 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -8,1251 +8,62 @@ namespace NYdb::NPersQueue { -using ::NMonitoring::TDynamicCounterPtr; -using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; - -const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1); - -namespace NCompressionDetails { - THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); -} - -#define HISTOGRAM_SETUP NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) -TWriterCounters::TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { - Errors = counters->GetCounter("errors", true); - CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false); - BytesWritten = counters->GetCounter("bytesWritten", true); - MessagesWritten = counters->GetCounter("messagesWritten", true); - BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true); - BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false); - BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false); - BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false); - MessagesInflight = counters->GetCounter("messagesInflight", false); - - TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP); - UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP); - CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); -} -#undef HISTOGRAM_SETUP +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSession TWriteSession::TWriteSession( const TWriteSessionSettings& settings, std::shared_ptr<TPersQueueClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) - : Settings(settings) - , Client(std::move(client)) - , Connections(std::move(connections)) - , DbDriverState(std::move(dbDriverState)) - , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") - , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) - , InitSeqNoPromise(NThreading::NewPromise<ui64>()) - , WakeupInterval( - Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? - std::min(Settings.BatchFlushInterval_.GetOrElse(TDuration::Seconds(1)) / 5, TDuration::MilliSeconds(100)) - : - TDuration::MilliSeconds(100) - ) + : Tracker(std::make_shared<TImplTracker>()) + , Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState), Tracker)) { - if (!Settings.RetryPolicy_) { - Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); - } - if (Settings.PreferredCluster_ && !Settings.AllowFallbackToOtherClusters_) { - TargetCluster = *Settings.PreferredCluster_; - TargetCluster.to_lower(); - } - if (Settings.Counters_.Defined()) { - Counters = *Settings.Counters_; - } else { - Counters = MakeIntrusive<TWriterCounters>(new ::NMonitoring::TDynamicCounters()); - } - } void TWriteSession::Start(const TDuration& delay) { - ++ConnectionAttemptsDone; - if (!Started) { - HandleWakeUpImpl(); - InitWriter(); - } - Started = true; - - DoCdsRequest(delay); -} - -// Only called under lock -TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& status) { - THandleResult result; - if (AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); - return result; - } - DbDriverState->Log.Write( - TLOG_INFO, - LogPrefix() << "Got error. Status: " << status.Status - << ". Description: " << IssuesSingleLineString(status.Issues) - ); - SessionEstablished = false; - TMaybe<TDuration> nextDelay = TDuration::Zero(); - if (!RetryState) { - RetryState = Settings.RetryPolicy_->CreateRetryState(); - } - nextDelay = RetryState->GetNextRetryDelay(status.Status); - - if (nextDelay) { - result.StartDelay = *nextDelay; - result.DoRestart = true; - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" - ); - ResetForRetryImpl(); - - } else { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); - result.DoStop = true; - CheckHandleResultImpl(result); - } - return result; + Impl->Start(delay); } -bool IsFederation(const TString& endpoint) { - TStringBuf host = GetHost(endpoint); - return host == "logbroker.yandex.net" || host == "logbroker-prestable.yandex.net"; -} - -void TWriteSession::DoCdsRequest(TDuration delay) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: Do CDS request"); - auto weakThis = weak_from_this(); - - if ( - Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Off || - (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint)) - ) { - DoConnect(delay, DbDriverState->DiscoveryEndpoint); - return; - } - - auto extractor = [weakThis] - (google::protobuf::Any* any, TPlainStatus status) mutable { - Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult result; - if (any) { - any->UnpackTo(&result); - } - TStatus st(std::move(status)); - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnCdsResponse(st, result); - } - }; - - Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest req; - auto* params = req.add_write_sessions(); - params->set_topic(Settings.Path_); - params->set_source_id(Settings.MessageGroupId_); - if (Settings.PartitionGroupId_.Defined()) - params->set_partition_group(*Settings.PartitionGroupId_); - if (Settings.PreferredCluster_.Defined()) - params->set_preferred_cluster_name(*Settings.PreferredCluster_); - - auto weakConnections = std::weak_ptr<TGRpcConnectionsImpl>(Connections); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"); - auto cdsRequestCall = [req_=std::move(req), extr=std::move(extractor), weakConnections, dbState=DbDriverState, settings=Settings]() mutable { - if (auto connections = weakConnections.lock()) { - dbState->Log.Write(TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"); - connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService, - Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, - Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>( - std::move(req_), - std::move(extr), - &Ydb::PersQueue::V1::ClusterDiscoveryService::Stub::AsyncDiscoverClusters, - dbState, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings)); // TODO: make client timeout setting - } - }; - Connections->ScheduleOneTimeTask(std::move(cdsRequestCall), delay); -} - -void TWriteSession::OnCdsResponse( - TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result -) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString()); - TString endpoint, name; - THandleResult handleResult; - if (!status.IsSuccess()) { - with_lock (Lock) { - handleResult = OnErrorImpl({ - status.GetStatus(), - MakeIssueWithSubIssues("Failed to discover clusters", status.GetIssues()) - }); - } - ProcessHandleResult(handleResult); - return; - } - - NYql::TIssues issues; - EStatus errorStatus = EStatus::INTERNAL_ERROR; - with_lock (Lock) { - const Ydb::PersQueue::ClusterDiscovery::WriteSessionClusters& wsClusters = result.write_sessions_clusters(0); - bool isFirst = true; - - for (const auto& clusterInfo : wsClusters.clusters()) { - TString normalizedName = clusterInfo.name(); - normalizedName.to_lower(); - - if(isFirst) { - isFirst = false; - PreferredClusterByCDS = clusterInfo.name(); - } - - if (!clusterInfo.available()) { - if (TargetCluster && TargetCluster == normalizedName) { - errorStatus = EStatus::UNAVAILABLE; - issues.AddIssue(TStringBuilder() << "Selected destination cluster: " << normalizedName - << " is currently disabled"); - break; - } - continue; - } - if (clusterInfo.endpoint().empty()) { - issues.AddIssue(TStringBuilder() << "Unexpected reply from cluster discovery. Empty endpoint for cluster " - << normalizedName); - } else { - name = clusterInfo.name(); - endpoint = ApplyClusterEndpoint(DbDriverState->DiscoveryEndpoint, clusterInfo.endpoint()); - break; - } - } - if (endpoint.empty()) { - errorStatus = EStatus::GENERIC_ERROR; - issues.AddIssue(TStringBuilder() << "Could not get valid endpoint from cluster discovery"); - } - } - if (issues) { - with_lock(Lock) { - handleResult = OnErrorImpl({errorStatus, std::move(issues)}); - } - ProcessHandleResult(handleResult); - return; - } - with_lock(Lock) { - if (!InitialCluster) { - InitialCluster = name; - } else if (CurrentCluster != name) { // Switched to another cluster - Y_VERIFY(CurrentCluster); - if (name == InitialCluster) { // Returned to initial cluster, disabled SeqNo Shift - SeqNoShift = 0; - OnSeqNoShift = false; - } else { // Switched from initial cluster to second one; - Y_VERIFY(CurrentCluster == InitialCluster); - if (AutoSeqNoMode.GetOrElse(true)) { - OnSeqNoShift = true; - } - } - - } - CurrentCluster = name; - } - DoConnect(TDuration::Zero(), endpoint); - -} - -void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet as well. - CompressionExecutor = Settings.CompressionExecutor_; - IExecutor::TPtr executor; - executor = CreateSyncExecutor(); - executor->Start(); - Executor = std::move(executor); - - Settings.CompressionExecutor_->Start(); - Settings.EventHandlers_.HandlersExecutor_->Start(); - -} -// Client method NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() { - if (Settings.ValidateSeqNo_) { - if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); - ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); - } - else - AutoSeqNoMode = false; - } - return InitSeqNoPromise.GetFuture(); -} - -TString DebugString(const TWriteSessionEvent::TEvent& event) { - return std::visit([](const auto& ev) { return ev.DebugString(); }, event); + return Impl->GetInitSeqNo(); } -// Client method TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) { - return EventsQueue->GetEvent(block); + return Impl->EventsQueue->GetEvent(block); } -// Client method TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { - return EventsQueue->GetEvents(block, maxEventsCount); -} - -// Only called under lock -ui64 TWriteSession::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { - ui64 seqNoValue = LastSeqNo + 1; - if (!AutoSeqNoMode.Defined()) { - AutoSeqNoMode = !seqNo.Defined(); - } - if (seqNo.Defined()) { - if (*AutoSeqNoMode) { - DbDriverState->Log.Write( - TLOG_ERR, - LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" - ); - ThrowFatalError( - "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" - ); - - } else { - seqNoValue = *seqNo; - } - //! Disable SeqNo shift for manual SeqNo mode; - OnSeqNoShift = false; - SeqNoShift = 0; - } else if (!(*AutoSeqNoMode)) { - DbDriverState->Log.Write( - TLOG_ERR, - LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" - ); - ThrowFatalError( - "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" - ); - } - LastSeqNo = seqNoValue; - return seqNoValue; -} -inline void TWriteSession::CheckHandleResultImpl(THandleResult& result) { - result.DoSetSeqNo = result.DoStop && !InitSeqNoSetDone && (InitSeqNoSetDone = true); -} - -void TWriteSession::ProcessHandleResult(THandleResult& result) { - if (result.DoRestart) { - Start(result.StartDelay); - } else if (result.DoSetSeqNo) { - InitSeqNoPromise.SetException("session closed"); - } + return Impl->EventsQueue->GetEvents(block, maxEventsCount); } NThreading::TFuture<void> TWriteSession::WaitEvent() { - return EventsQueue->WaitEvent(); -} - -// Client method. -void TWriteSession::WriteInternal( - TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now(); - bool readyToAccept = false; - size_t bufferSize = data.size(); - with_lock(Lock) { - CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); - - FlushWriteIfRequiredImpl(); - readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; - } - if (readyToAccept) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } + return Impl->EventsQueue->WaitEvent(); } -// Client method. -void TWriteSession::WriteEncoded( - TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); +void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, + TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { + Impl->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); } -void TWriteSession::Write( - TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); -} - - -// Only called under lock. -TWriteSession::THandleResult TWriteSession::OnErrorImpl(NYdb::TPlainStatus&& status) { - (*Counters->Errors)++; - auto result = RestartImpl(status); - if (result.DoStop) { - CloseImpl(status.Status, std::move(status.Issues)); - } - return result; +void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, + TMaybe<TInstant> createTimestamp) { + Impl->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); } -// No lock -void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); - - NGrpc::IQueueClientContextPtr prevConnectContext; - NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; - NGrpc::IQueueClientContextPtr prevConnectDelayContext; - NGrpc::IQueueClientContextPtr connectContext = nullptr; - NGrpc::IQueueClientContextPtr connectDelayContext = nullptr; - NGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr; - TRpcRequestSettings reqSettings; - std::shared_ptr<IWriteSessionConnectionProcessorFactory> connectionFactory; - with_lock(Lock) { - ++ConnectionGeneration; - auto subclient = Client->GetClientForEndpoint(endpoint); - connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); - auto clientContext = subclient->CreateContext(); - ConnectionFactory = connectionFactory; - - ClientContext = std::move(clientContext); - ServerMessage = std::make_shared<TServerMessage>(); - - if (!ClientContext) { - AbortImpl(); - // Grpc and WriteSession is closing right now. - return; - } - - connectContext = ClientContext->CreateContext(); - if (delay) - connectDelayContext = ClientContext->CreateContext(); - connectTimeoutContext = ClientContext->CreateContext(); - - // Previous operations contexts. - - // Set new context - prevConnectContext = std::exchange(ConnectContext, connectContext); - prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); - prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext); - Y_ASSERT(ConnectContext); - Y_ASSERT(ConnectTimeoutContext); - - // Cancel previous operations. - Cancel(prevConnectContext); - if (prevConnectDelayContext) - Cancel(prevConnectDelayContext); - Cancel(prevConnectTimeoutContext); - Y_ASSERT(connectContext); - Y_ASSERT(connectTimeoutContext); - reqSettings = TRpcRequestSettings::Make(Settings); - } - auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext] - (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); - } - }; - - auto connectTimeoutCallback = [weakThis = weak_from_this(), connectTimeoutContext = connectTimeoutContext] - (bool ok) { - if (ok) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnectTimeout(connectTimeoutContext); - } - } - }; - - connectionFactory->CreateProcessor( - std::move(connectCallback), - reqSettings, - std::move(connectContext), - TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. - std::move(connectTimeoutContext), - std::move(connectTimeoutCallback), - delay, - std::move(connectDelayContext) - ); -} - -// RPC callback. -void TWriteSession::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); - THandleResult handleResult; - with_lock (Lock) { - if (ConnectTimeoutContext == connectTimeoutContext) { - Cancel(ConnectContext); - ConnectContext = nullptr; - ConnectTimeoutContext = nullptr; - ConnectDelayContext = nullptr; - } else { - return; - } - TStringBuilder description; - description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; - handleResult = RestartImpl(TPlainStatus(EStatus::TIMEOUT, description)); - if (handleResult.DoStop) { - CloseImpl( - EStatus::TIMEOUT, - description - ); - } - } - ProcessHandleResult(handleResult); -} - -// RPC callback. -void TWriteSession::OnConnect( - TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext -) { - THandleResult handleResult; - with_lock (Lock) { - if (ConnectContext == connectContext) { - Cancel(ConnectTimeoutContext); - ConnectContext = nullptr; - ConnectTimeoutContext = nullptr; - ConnectDelayContext = nullptr; - - if (st.Ok()) { - Processor = std::move(processor); - InitImpl(); - // Still should call ReadFromProcessor(); - } - } else { - return; - } - if (!st.Ok()) { - handleResult = RestartImpl(st); - if (handleResult.DoStop) { - CloseImpl( - st.Status, - MakeIssueWithSubIssues( - TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint - << "\". Attempts done: " << ConnectionAttemptsDone, - st.Issues - ) - ); - } - } - } - if (st.Ok()) - ReadFromProcessor(); // Out of Init - ProcessHandleResult(handleResult); -} - -// Produce init request for session. -// Only called under lock. -void TWriteSession::InitImpl() { - Ydb::PersQueue::V1::StreamingWriteClientMessage req; - auto* init = req.mutable_init_request(); - init->set_topic(Settings.Path_); - init->set_message_group_id(Settings.MessageGroupId_); - if (Settings.PartitionGroupId_) { - init->set_partition_group_id(*Settings.PartitionGroupId_); - } - init->set_max_supported_format_version(0); - init->set_preferred_cluster(PreferredClusterByCDS); - - for (const auto& attr : Settings.Meta_.Fields) { - (*init->mutable_session_meta())[attr.first] = attr.second; - } - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); - WriteToProcessorImpl(std::move(req)); -} - -// Called under lock. Invokes Processor->Write, which is assumed to be deadlock-safe -void TWriteSession::WriteToProcessorImpl(TWriteSession::TClientMessage&& req) { - Y_ASSERT(Processor); - if (Aborting) - return; - auto callback = [weakThis = weak_from_this(), connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); - } - }; - - Processor->Write(std::move(req), callback); -} - -void TWriteSession::ReadFromProcessor() { - Y_ASSERT(Processor); - IProcessor::TPtr prc; - ui64 generation; - with_lock(Lock) { - prc = Processor; - generation = ConnectionGeneration; - } - auto callback = [weakThis = weak_from_this(), connectionGeneration = generation, processor = prc, serverMessage = ServerMessage] - (NGrpc::TGrpcStatus&& grpcStatus) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); - } - }; - prc->Read(ServerMessage.get(), std::move(callback)); -} - -void TWriteSession::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) { - THandleResult handleResult; - with_lock (Lock) { - if (connectionGeneration != ConnectionGeneration) { - return; // Message from previous connection. Ignore. - } - if (Aborting) { - return; - } - if(!status.Ok()) { - handleResult = OnErrorImpl(status); - } - } - ProcessHandleResult(handleResult); -} - -void TWriteSession::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { - TPlainStatus errorStatus; - TProcessSrvMessageResult processResult; - bool needSetValue = false; - if (!grpcStatus.Ok()) { - errorStatus = TPlainStatus(std::move(grpcStatus)); - } - bool doRead = false; - with_lock (Lock) { - UpdateTimedCountersImpl(); - if (connectionGeneration != ConnectionGeneration) { - return; // Message from previous connection. Ignore. - } - if (errorStatus.Ok()) { - if (IsErrorMessage(*ServerMessage)) { - errorStatus = MakeErrorFromProto(*ServerMessage); - } else { - processResult = ProcessServerMessageImpl(); - needSetValue = !InitSeqNoSetDone && processResult.InitSeqNo.Defined() && (InitSeqNoSetDone = true); - if (errorStatus.Ok() && processResult.Ok) { - doRead = true; - } - } - } - } - if (doRead) - ReadFromProcessor(); - - with_lock(Lock) { - if (!errorStatus.Ok()) { - if (processResult.Ok) { // Otherwise, OnError was already called - processResult.HandleResult = RestartImpl(errorStatus); - } - } - if (processResult.HandleResult.DoStop) { - CloseImpl(std::move(errorStatus)); - } - } - for (auto& event : processResult.Events) { - EventsQueue->PushEvent(std::move(event)); - } - if (needSetValue) { - InitSeqNoPromise.SetValue(*processResult.InitSeqNo); - processResult.HandleResult.DoSetSeqNo = false; // Redundant. Just in case. - } - ProcessHandleResult(processResult.HandleResult); -} - -TStringBuilder TWriteSession::LogPrefix() const { - return TStringBuilder() << "MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; -} - -TString TWriteSessionEvent::TAcksEvent::DebugString() const { - TStringBuilder res; - res << "AcksEvent:"; - for (auto& ack : Acks) { - res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State; - if (ack.Details) { - res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId; - } - res << " }"; - } - if (!Acks.empty() && Acks.back().Stat) { - auto& stat = Acks.back().Stat; - res << " write stat: Write time " << stat->WriteTime << " total time in partition queue " << stat->TotalTimeInPartitionQueue - << " partition quoted time " << stat->PartitionQuotedTime << " topic quoted time " << stat->TopicQuotedTime; - } - return res; -} - -TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const { - return "ReadyToAcceptEvent"; -} - - -TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl() { - TProcessSrvMessageResult result; - switch (ServerMessage->GetServerMessageCase()) { - case TServerMessage::SERVER_MESSAGE_NOT_SET: { - SessionEstablished = false; - result.HandleResult = OnErrorImpl({ - static_cast<NYdb::EStatus>(ServerMessage->status()), - {NYql::TIssue{ServerMessage->DebugString()}} - }); - result.Ok = false; - break; - } - case TServerMessage::kInitResponse: { - const auto& initResponse = ServerMessage->init_response(); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); - SessionId = initResponse.session_id(); - PartitionId = initResponse.partition_id(); - ui64 newLastSeqNo = initResponse.last_sequence_number(); - // SeqNo increased, so there's a risk of loss, apply SeqNo shift. - // MinUnsentSeqNo must be > 0 if anything was ever sent yet - if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { - SeqNoShift = newLastSeqNo - MinUnsentSeqNo; - } - result.InitSeqNo = newLastSeqNo; - LastSeqNo = newLastSeqNo; - - SessionEstablished = true; - LastCountersUpdateTs = TInstant::Now(); - SessionStartedTs = TInstant::Now(); - OnErrorResolved(); - - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - // Kickstart send after session reestablishment - SendImpl(); - break; - } - case TServerMessage::kBatchWriteResponse: { - TWriteSessionEvent::TAcksEvent acksEvent; - const auto& batchWriteResponse = ServerMessage->batch_write_response(); - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() - ); - TWriteStat::TPtr writeStat = new TWriteStat{}; - const auto& stat = batchWriteResponse.write_statistics(); - writeStat->WriteTime = TDuration::MilliSeconds(stat.persist_duration_ms()); - writeStat->TotalTimeInPartitionQueue = TDuration::MilliSeconds(stat.queued_in_partition_duration_ms()); - writeStat->PartitionQuotedTime = TDuration::MilliSeconds(stat.throttled_on_partition_duration_ms()); - writeStat->TopicQuotedTime = TDuration::MilliSeconds(stat.throttled_on_topic_duration_ms()); - - for (size_t messageIndex = 0, endIndex = batchWriteResponse.sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) { - // TODO: Fill writer statistics - ui64 sequenceNumber = batchWriteResponse.sequence_numbers(messageIndex); - - acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ - sequenceNumber - SeqNoShift, - batchWriteResponse.already_written(messageIndex) ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN: - TWriteSessionEvent::TWriteAck::EES_WRITTEN, - TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { - static_cast<ui64>(batchWriteResponse.offsets(messageIndex)), - PartitionId, - }, - writeStat, - }); - - if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } - } - //EventsQueue->PushEvent(std::move(acksEvent)); - result.Events.emplace_back(std::move(acksEvent)); - break; - } - case TServerMessage::kUpdateTokenResponse: { - UpdateTokenInProgress = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); - UpdateTokenIfNeededImpl(); - break; - } - } - return result; -} - -bool TWriteSession::CleanupOnAcknowledged(ui64 sequenceNumber) { - bool result = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); - UpdateTimedCountersImpl(); - const auto& sentFront = SentOriginalMessages.front(); - ui64 size = 0; - ui64 compressedSize = 0; - if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { - auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); - result = memoryUsage.NowOk && !memoryUsage.WasOk; - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - const auto& front = SentPackedMessage.front(); - if (front.Compressed) { - compressedSize = front.Data.size(); - } else { - size = front.Data.size(); - } - - (*Counters->MessagesWritten) += front.MessageCount; - (*Counters->MessagesInflight) -= front.MessageCount; - (*Counters->BytesWritten) += front.OriginalSize; - - SentPackedMessage.pop(); - } else { - size = sentFront.Size; - (*Counters->BytesWritten) += sentFront.Size; - (*Counters->MessagesWritten)++; - (*Counters->MessagesInflight)--; - } - - (*Counters->BytesInflightCompressed) -= compressedSize; - (*Counters->BytesWrittenCompressed) += compressedSize; - (*Counters->BytesInflightUncompressed) -= size; - - Y_VERIFY(Counters->BytesInflightCompressed->Val() >= 0); - Y_VERIFY(Counters->BytesInflightUncompressed->Val() >= 0); - - Y_VERIFY(sentFront.SeqNo == sequenceNumber); - - (*Counters->BytesInflightTotal) = MemoryUsage; - SentOriginalMessages.pop(); - return result; -} - -// Only called under Lock -TMemoryUsageChange TWriteSession::OnMemoryUsageChangedImpl(i64 diff) { - bool wasOk = MemoryUsage <= Settings.MaxMemoryUsage_; - //if (diff < 0) { - // Y_VERIFY(MemoryUsage >= static_cast<size_t>(std::abs(diff))); - //} - MemoryUsage += diff; - bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; - if (wasOk != nowOk) { - if (wasOk) { - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Estimated memory usage " << MemoryUsage - << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" - ); - } - else { - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" - ); - } - } - return {wasOk, nowOk}; -} - -TBuffer CompressBuffer(TVector<TStringBuf>& data, ECodec codec, i32 level) { - TBuffer result; - THolder<IOutputStream> coder = NCompressionDetails::CreateCoder(codec, result, level); - for (auto& buffer : data) { - coder->Write(buffer.data(), buffer.size()); - } - coder->Finish(); - return result; -} - -// May call OnCompressed with sync executor. No external lock. -void TWriteSession::CompressImpl(TBlock&& block_) { - auto weakThis = weak_from_this(); - bool isSyncCompression = !CompressionExecutor->IsAsync(); - Y_VERIFY(block_.Valid); - - std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); - blockPtr->Move(block_); - auto lambda = [weakThis, codec = Settings.Codec_, level = Settings.CompressionLevel_, - isSyncCompression, blockPtr]() mutable - { - if (auto sharedThis = weakThis.lock()) { - Y_VERIFY(!blockPtr->Compressed); - - auto compressedData = CompressBuffer( - blockPtr->OriginalDataRefs, codec, level - ); - Y_VERIFY(!compressedData.Empty()); - blockPtr->Data = std::move(compressedData); - blockPtr->Compressed = true; - blockPtr->CodecID = GetCodecId(sharedThis->Settings.Codec_); - sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); - } - }; - - CompressionExecutor->Post(lambda); -} - -void TWriteSession::OnCompressed(TBlock&& block, bool isSyncCompression) { - TMemoryUsageChange memoryUsage; - if (!isSyncCompression) { - with_lock(Lock) { - memoryUsage = OnCompressedImpl(std::move(block)); - } - } else { - memoryUsage = OnCompressedImpl(std::move(block)); - } - if (memoryUsage.NowOk && !memoryUsage.WasOk) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } -} - -//Called under lock or synchronously if compression is sync -TMemoryUsageChange TWriteSession::OnCompressedImpl(TBlock&& block) { - UpdateTimedCountersImpl(); - Y_VERIFY(block.Valid); - auto memoryUsage = OnMemoryUsageChangedImpl(static_cast<i64>(block.Data.size()) - block.OriginalMemoryUsage); - (*Counters->BytesInflightUncompressed) -= block.OriginalSize; - (*Counters->BytesInflightCompressed) += block.Data.size(); - - PackedMessagesToSend.emplace(std::move(block)); - SendImpl(); - return memoryUsage; -} - -// Only called under lock -void TWriteSession::ResetForRetryImpl() { - SessionEstablished = false; - const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size(); - const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size(); - while (!SentPackedMessage.empty()) { - PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); - SentPackedMessage.pop(); - } - ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; - std::queue<TOriginalMessage> freshOriginalMessagesToSend; - OriginalMessagesToSend.swap(freshOriginalMessagesToSend); - while (!SentOriginalMessages.empty()) { - OriginalMessagesToSend.emplace(std::move(SentOriginalMessages.front())); - SentOriginalMessages.pop(); - } - while (!freshOriginalMessagesToSend.empty()) { - OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); - freshOriginalMessagesToSend.pop(); - } - if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) - minSeqNo = OriginalMessagesToSend.front().SeqNo; - MinUnsentSeqNo = minSeqNo; - Y_VERIFY(PackedMessagesToSend.size() == totalPackedMessages); - Y_VERIFY(OriginalMessagesToSend.size() == totalOriginalMessages); -} - -// Called from client Write() methods. With lock -void TWriteSession::FlushWriteIfRequiredImpl() { - - if (!CurrentBatch.Empty() && !CurrentBatch.FlushRequested) { - MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); - if (TInstant::Now() - CurrentBatch.StartedAt >= Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) - || CurrentBatch.CurrentSize >= Settings.BatchFlushSizeBytes_.GetOrElse(0) - || CurrentBatch.CurrentSize >= MaxBlockSize - || CurrentBatch.Messages.size() >= MaxBlockMessageCount - || CurrentBatch.HasCodec() - ) { - WriteBatchImpl(); - return; - } - } -} - - -// Involves compression, but still called under lock. -size_t TWriteSession::WriteBatchImpl() { - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " - << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo - ); - - Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount); - - const bool skipCompression = Settings.Codec_ == ECodec::RAW || CurrentBatch.HasCodec(); - if (!skipCompression && Settings.CompressionExecutor_->IsAsync()) { - MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); - } - - size_t size = 0; - for (size_t i = 0; i != CurrentBatch.Messages.size();) { - TBlock block{}; - for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { - auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; - auto createTs = CurrentBatch.Messages[i].CreatedAt; - - if (!block.MessageCount) { - block.Offset = sequenceNumber; - } - - block.MessageCount += 1; - const auto& datum = CurrentBatch.Messages[i].DataRef; - block.OriginalSize += datum.size(); - block.OriginalMemoryUsage = CurrentBatch.Data.size(); - block.OriginalDataRefs.emplace_back(datum); - if (CurrentBatch.Messages[i].Codec.Defined()) { - Y_VERIFY(CurrentBatch.Messages.size() == 1); - block.CodecID = GetCodecId(*CurrentBatch.Messages[i].Codec); - block.OriginalSize = CurrentBatch.Messages[i].OriginalSize; - block.Compressed = false; - } - size += datum.size(); - UpdateTimedCountersImpl(); - (*Counters->BytesInflightUncompressed) += datum.size(); - (*Counters->MessagesInflight)++; - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); - } - block.Data = std::move(CurrentBatch.Data); - if (skipCompression) { - PackedMessagesToSend.emplace(std::move(block)); - } else { - CompressImpl(std::move(block)); - } - } - CurrentBatch.Reset(); - if (skipCompression) { - SendImpl(); - } - return size; -} - -size_t GetMaxGrpcMessageSize() { - return 120_MB; -} - -bool TWriteSession::IsReadyToSendNextImpl() const { - if (!SessionEstablished) { - return false; - } - if (Aborting) - return false; - if (PackedMessagesToSend.empty()) { - return false; - } - Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); - Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); - - return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; -} - - -void TWriteSession::UpdateTokenIfNeededImpl() { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); - - if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) - return; - TClientMessage clientMessage; - auto* updateRequest = clientMessage.mutable_update_token_request(); - auto token = DbDriverState->CredentialsProvider->GetAuthInfo(); - if (token == PrevToken) - return; - UpdateTokenInProgress = true; - updateRequest->set_token(token); - PrevToken = token; - - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); - - Processor->Write(std::move(clientMessage)); -} - -void TWriteSession::SendImpl() { - // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB - while(IsReadyToSendNextImpl()) { - TClientMessage clientMessage; - auto* writeRequest = clientMessage.mutable_write_request(); - auto sentAtMs = TInstant::Now().MilliSeconds(); - - // Sent blocks while we can without messages reordering - while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) { - const auto& block = PackedMessagesToSend.top(); - Y_VERIFY(block.Valid); - for (size_t i = 0; i != block.MessageCount; ++i) { - Y_VERIFY(!OriginalMessagesToSend.empty()); - - auto& message = OriginalMessagesToSend.front(); - - writeRequest->add_sent_at_ms(sentAtMs); - writeRequest->add_sequence_numbers(message.SeqNo + SeqNoShift); - writeRequest->add_message_sizes(message.Size); - writeRequest->add_created_at_ms(message.CreatedAt.MilliSeconds()); - - SentOriginalMessages.emplace(std::move(message)); - OriginalMessagesToSend.pop(); - } - - writeRequest->add_blocks_offsets(block.Offset); - writeRequest->add_blocks_message_counts(block.MessageCount); - writeRequest->add_blocks_part_numbers(block.PartNumber); - writeRequest->add_blocks_uncompressed_sizes(block.OriginalSize); - writeRequest->add_blocks_headers(block.CodecID); - if (block.Compressed) - writeRequest->add_blocks_data(block.Data.data(), block.Data.size()); - else { - for (auto& buffer: block.OriginalDataRefs) { - writeRequest->add_blocks_data(buffer.data(), buffer.size()); - } - } - - TBlock moveBlock; - moveBlock.Move(block); - SentPackedMessage.emplace(std::move(moveBlock)); - PackedMessagesToSend.pop(); - } - UpdateTokenIfNeededImpl(); - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) (" - << OriginalMessagesToSend.size() << " left), first sequence number is " - << writeRequest->sequence_numbers(0) - ); - Processor->Write(std::move(clientMessage)); - } -} - -// Client method, no Lock bool TWriteSession::Close(TDuration closeTimeout) { - if (AtomicGet(Aborting)) - return false; - DbDriverState->Log.Write( - TLOG_INFO, - LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" - ); - auto startTime = TInstant::Now(); - auto remaining = closeTimeout; - bool ready = false; - bool needSetSeqNoValue = false; - while (remaining > TDuration::Zero()) { - with_lock(Lock) { - if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) { - ready = true; - } - if (AtomicGet(Aborting)) - break; - } - if (ready) { - break; - } - remaining = closeTimeout - (TInstant::Now() - startTime); - Sleep(Min(TDuration::MilliSeconds(100), remaining)); - } - with_lock(Lock) { - ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting); - } - with_lock(Lock) { - CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); - needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); - } - if (needSetSeqNoValue) { - InitSeqNoPromise.SetException("session closed"); - } - if (ready) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); - } else { - DbDriverState->Log.Write( - TLOG_WARNING, - LogPrefix() << "Write session: could not confirm all writes in time" - << " or session aborted, perform hard shutdown" - ); - } - return ready; -} - -void TWriteSession::HandleWakeUpImpl() { - FlushWriteIfRequiredImpl(); - if (AtomicGet(Aborting)) { - return; - } - auto callback = [weakThis = this->weak_from_this()] (bool ok) - { - if (!ok) - return; - if (auto sharedThis = weakThis.lock()) { - with_lock(sharedThis->Lock) { - sharedThis->HandleWakeUpImpl(); - } - } - }; - auto enqueueTokenCallback = [weakThis = this->weak_from_this()] (bool ok) { - if (!ok) - return; - if (auto sharedThis = weakThis.lock()) { - sharedThis->EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } - }; - if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { - LastTokenUpdate = TInstant::Now(); - UpdateTokenIfNeededImpl(); - } - - const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() - ? WakeupInterval - : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); - Connections->ScheduleCallback(flushAfter, std::move(callback)); -} - -void TWriteSession::UpdateTimedCountersImpl() { - auto now = TInstant::Now(); - auto delta = (now - LastCountersUpdateTs).MilliSeconds(); - double percent = 100.0 / Settings.MaxMemoryUsage_; - - Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); - Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); - Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); - - *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); - LastCountersUpdateTs = now; - if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { - LastCountersLogTs = TInstant::Now(); - -#define LOG_COUNTER(counter) \ - << " " Y_STRINGIZE(counter) ": " \ - << Counters->counter->Val() \ - /**/ - - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() - << "Counters: {" - LOG_COUNTER(Errors) - LOG_COUNTER(CurrentSessionLifetimeMs) - LOG_COUNTER(BytesWritten) - LOG_COUNTER(MessagesWritten) - LOG_COUNTER(BytesWrittenCompressed) - LOG_COUNTER(BytesInflightUncompressed) - LOG_COUNTER(BytesInflightCompressed) - LOG_COUNTER(BytesInflightTotal) - LOG_COUNTER(MessagesInflight) - << " }" - ); - -#undef LOG_COUNTER - } -} - -void TWriteSession::AbortImpl() { - if (!AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); - AtomicSet(Aborting, 1); - Cancel(ConnectContext); - Cancel(ConnectTimeoutContext); - Cancel(ConnectDelayContext); - if (Processor) - Processor->Cancel(); - - Cancel(ClientContext); - ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. - } -} - -void TWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); - EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); - AbortImpl(); -} - -void TWriteSession::CloseImpl(EStatus statusCode, const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - CloseImpl(statusCode, std::move(issues)); -} - -void TWriteSession::CloseImpl(TPlainStatus&& status) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); - EventsQueue->Close(TSessionClosedEvent(std::move(status))); - AbortImpl(); + return Impl->Close(closeTimeout); } TWriteSession::~TWriteSession() { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); - bool needClose = false; - with_lock(Lock) { - if (!AtomicGet(Aborting)) { - CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); - - needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true); - } - } - if (needClose) - InitSeqNoPromise.SetException("session closed"); + Impl->Close(TDuration::Zero()); + Tracker->AsyncComplete().Wait(); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TSimpleBlockingWriteSession + TSimpleBlockingWriteSession::TSimpleBlockingWriteSession( const TWriteSessionSettings& settings, std::shared_ptr<TPersQueueClient::TImpl> client, diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index bf7e8950ba..62732d67c9 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -1,7 +1,9 @@ #pragma once #include "common.h" +#include "impl_tracker.h" #include "persqueue_impl.h" +#include "write_session_impl.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> @@ -10,148 +12,14 @@ namespace NYdb::NPersQueue { -inline const TString& GetCodecId(const ECodec codec) { - static THashMap<ECodec, TString> idByCodec{ - {ECodec::RAW, TString(1, '\0')}, - {ECodec::GZIP, "\1"}, - {ECodec::LZOP, "\2"}, - {ECodec::ZSTD, "\3"} - }; - Y_VERIFY(idByCodec.contains(codec)); - return idByCodec[codec]; -} - -class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> { - using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; - -public: - TWriteSessionEventsQueue(const TWriteSessionSettings& settings) - : TParent(settings) - {} - - void PushEvent(TEventInfo eventInfo) { - if (Closed || ApplyHandler(eventInfo)) { - return; - } - - TWaiter waiter; - with_lock (Mutex) { - Events.emplace(std::move(eventInfo)); - waiter = PopWaiterImpl(); - } - waiter.Signal(); // Does nothing if waiter is empty. - } - - TMaybe<TEvent> GetEvent(bool block = false) { - TMaybe<TEventInfo> eventInfo; - with_lock (Mutex) { - if (block) { - WaitEventsImpl(); - } - if (HasEventsImpl()) { - eventInfo = GetEventImpl(); - } else { - return Nothing(); - } - } - eventInfo->OnUserRetrievedEvent(); - return std::move(eventInfo->Event); - } - - TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) { - TVector<TEventInfo> eventInfos; - with_lock (Mutex) { - if (block) { - WaitEventsImpl(); - } - eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max())); - while (!Events.empty()) { - eventInfos.emplace_back(GetEventImpl()); - if (maxEventsCount && eventInfos.size() >= *maxEventsCount) { - break; - } - } - if (CloseEvent && Events.empty() && (!maxEventsCount || eventInfos.size() < *maxEventsCount)) { - eventInfos.push_back({*CloseEvent}); - } - } - - TVector<TEvent> result; - result.reserve(eventInfos.size()); - for (TEventInfo& eventInfo : eventInfos) { - eventInfo.OnUserRetrievedEvent(); - result.emplace_back(std::move(eventInfo.Event)); - } - return result; - } - - void Close(const TSessionClosedEvent& event) { - TWaiter waiter; - with_lock (Mutex) { - CloseEvent = event; - Closed = true; - waiter = TWaiter(Waiter.ExtractPromise(), this); - } - - TEventInfo info(event); - ApplyHandler(info); - - waiter.Signal(); - } - -private: - struct THandlersVisitor : public TParent::TBaseHandlersVisitor { - using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; - -#define DECLARE_HANDLER(type, handler, answer) \ - bool operator()(type& event) { \ - if (Settings.EventHandlers_.handler) { \ - Settings.EventHandlers_.handler(event); \ - return answer; \ - } \ - return false; \ - } \ - /**/ - - DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); - DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); - DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied - -#undef DECLARE_HANDLER - - bool Visit() { - return std::visit(*this, Event); - } - - }; - - bool ApplyHandler(TEventInfo& eventInfo) { - THandlersVisitor visitor(Settings, eventInfo.GetEvent()); - return visitor.Visit(); - } - - TEventInfo GetEventImpl() { // Assumes that we're under lock and that the event queue has events. - Y_ASSERT(HasEventsImpl()); - if (!Events.empty()) { - TEventInfo event = std::move(Events.front()); - Events.pop(); - RenewWaiterImpl(); - return event; - } - Y_ASSERT(CloseEvent); - return {*CloseEvent}; - } -}; - -struct TMemoryUsageChange { - bool WasOk; //!< MemoryUsage <= Config.MaxMemoryUsage_ before update - bool NowOk; //!< Same, only after update -}; namespace NTests { class TSimpleWriteSessionTestAdapter; } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSession + class TWriteSession : public IWriteSession, public std::enable_shared_from_this<TWriteSession> { private: @@ -159,136 +27,6 @@ private: friend class TPersQueueClient; friend class NTests::TSimpleWriteSessionTestAdapter; - using TClientMessage = Ydb::PersQueue::V1::StreamingWriteClientMessage; - using TServerMessage = Ydb::PersQueue::V1::StreamingWriteServerMessage; - using IWriteSessionConnectionProcessorFactory = - TPersQueueClient::TImpl::IWriteSessionConnectionProcessorFactory; - using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; - - struct TMessage { - ui64 SeqNo; - TInstant CreatedAt; - TStringBuf DataRef; - TMaybe<ECodec> Codec; - ui32 OriginalSize; // only for coded messages - TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) - : SeqNo(seqNo) - , CreatedAt(createdAt) - , DataRef(data) - , Codec(codec) - , OriginalSize(originalSize) - {} - }; - - struct TMessageBatch { - TBuffer Data; - TVector<TMessage> Messages; - ui64 CurrentSize = 0; - TInstant StartedAt = TInstant::Zero(); - bool Acquired = false; - bool FlushRequested = false; - void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { - if (StartedAt == TInstant::Zero()) - StartedAt = TInstant::Now(); - CurrentSize += codec ? originalSize : data.size(); - Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); - Acquired = false; - } - - bool HasCodec() const { - return Messages.empty() ? false : Messages.front().Codec.Defined(); - } - - bool Acquire() { - if (Acquired || Messages.empty()) - return false; - auto currSize = Data.size(); - Data.Append(Messages.back().DataRef.data(), Messages.back().DataRef.size()); - Messages.back().DataRef = TStringBuf(Data.data() + currSize, Data.size() - currSize); - Acquired = true; - return true; - } - - bool Empty() const noexcept { - return CurrentSize == 0 && Messages.empty(); - } - - void Reset() { - StartedAt = TInstant::Zero(); - Messages.clear(); - Data.Clear(); - Acquired = false; - CurrentSize = 0; - FlushRequested = false; - } - }; - - struct TBlock { - size_t Offset = 0; //!< First message sequence number in the block - size_t MessageCount = 0; - size_t PartNumber = 0; - size_t OriginalSize = 0; - size_t OriginalMemoryUsage = 0; - TString CodecID = GetCodecId(ECodec::RAW); - mutable TVector<TStringBuf> OriginalDataRefs; - mutable TBuffer Data; - bool Compressed = false; - mutable bool Valid = true; - - TBlock& operator=(TBlock&&) = default; - TBlock(TBlock&&) = default; - TBlock() = default; - - //For taking ownership by copying from const object, f.e. lambda -> std::function, priority_queue - void Move(const TBlock& rhs) { - Offset = rhs.Offset; - MessageCount = rhs.MessageCount; - PartNumber = rhs.PartNumber; - OriginalSize = rhs.OriginalSize; - OriginalMemoryUsage = rhs.OriginalMemoryUsage; - CodecID = rhs.CodecID; - OriginalDataRefs.swap(rhs.OriginalDataRefs); - Data.Swap(rhs.Data); - Compressed = rhs.Compressed; - - rhs.Data.Clear(); - rhs.OriginalDataRefs.clear(); - } - }; - - struct TOriginalMessage { - ui64 SeqNo; - TInstant CreatedAt; - size_t Size; - TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) - : SeqNo(sequenceNumber) - , CreatedAt(createdAt) - , Size(size) - {} - }; - - //! Block comparer, makes block with smallest offset (first sequence number) appear on top of the PackedMessagesToSend priority queue - struct Greater { - bool operator() (const TBlock& lhs, const TBlock& rhs) { - return lhs.Offset > rhs.Offset; - } - }; - - struct THandleResult { - bool DoRestart = false; - TDuration StartDelay = TDuration::Zero(); - bool DoStop = false; - bool DoSetSeqNo = false; - }; - struct TProcessSrvMessageResult { - THandleResult HandleResult; - TMaybe<ui64> InitSeqNo; - TVector<TWriteSessionEvent::TEvent> Events; - bool Ok = true; - }; - - THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action - public: TWriteSession(const TWriteSessionSettings& settings, std::shared_ptr<TPersQueueClient::TImpl> client, @@ -317,129 +55,24 @@ public: ~TWriteSession(); // will not call close - destroy everything without acks private: - - TStringBuilder LogPrefix() const; - - void UpdateTokenIfNeededImpl(); - - void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); - - void FlushWriteIfRequiredImpl(); - size_t WriteBatchImpl(); void Start(const TDuration& delay); - void InitWriter(); - - void DoCdsRequest(TDuration delay = TDuration::Zero()); - void OnCdsResponse(TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result); - void OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor, - const NGrpc::IQueueClientContextPtr& connectContext); - void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext); - void ResetForRetryImpl(); - THandleResult RestartImpl(const TPlainStatus& status); - void DoConnect(const TDuration& delay, const TString& endpoint); - void InitImpl(); - void ReadFromProcessor(); // Assumes that we're under lock. - void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock. - void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration); - void OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration); - TProcessSrvMessageResult ProcessServerMessageImpl(); - TMemoryUsageChange OnMemoryUsageChangedImpl(i64 diff); - void CompressImpl(TBlock&& block); - void OnCompressed(TBlock&& block, bool isSyncCompression=false); - TMemoryUsageChange OnCompressedImpl(TBlock&& block); - - //TString GetDebugIdentity() const; - Ydb::PersQueue::V1::StreamingWriteClientMessage GetInitClientMessage(); - bool CleanupOnAcknowledged(ui64 sequenceNumber); - bool IsReadyToSendNextImpl() const; - ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); - void SendImpl(); - void AbortImpl(); - void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); - void CloseImpl(EStatus statusCode, const TString& message); - void CloseImpl(TPlainStatus&& status); - - void OnErrorResolved() { - RetryState = nullptr; - } - void CheckHandleResultImpl(THandleResult& result); - void ProcessHandleResult(THandleResult& result); - void HandleWakeUpImpl(); - void UpdateTimedCountersImpl(); private: - TWriteSessionSettings Settings; - std::shared_ptr<TPersQueueClient::TImpl> Client; - std::shared_ptr<TGRpcConnectionsImpl> Connections; - TString TargetCluster; - TString InitialCluster; - TString CurrentCluster; - bool OnSeqNoShift = false; - TString PreferredClusterByCDS; - std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; - TDbDriverStatePtr DbDriverState; - TStringType PrevToken; - bool UpdateTokenInProgress = false; - TInstant LastTokenUpdate = TInstant::Zero(); - std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; - NGrpc::IQueueClientContextPtr ClientContext; // Common client context. - NGrpc::IQueueClientContextPtr ConnectContext; - NGrpc::IQueueClientContextPtr ConnectTimeoutContext; - NGrpc::IQueueClientContextPtr ConnectDelayContext; - size_t ConnectionGeneration = 0; - size_t ConnectionAttemptsDone = 0; - TAdaptiveLock Lock; - IProcessor::TPtr Processor; - IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). - std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to. - - TString SessionId; - IExecutor::TPtr Executor; - IExecutor::TPtr CompressionExecutor; - size_t MemoryUsage = 0; //!< Estimated amount of memory used - - TMessageBatch CurrentBatch; - - std::queue<TOriginalMessage> OriginalMessagesToSend; - std::priority_queue<TBlock, std::vector<TBlock>, Greater> PackedMessagesToSend; - //! Messages that are sent but yet not acknowledged - std::queue<TOriginalMessage> SentOriginalMessages; - std::queue<TBlock> SentPackedMessage; - - const size_t MaxBlockSize = std::numeric_limits<size_t>::max(); - const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility - bool Connected = false; - bool Started = false; - TAtomic Aborting = 0; - bool SessionEstablished = false; - ui32 PartitionId = 0; - ui64 LastSeqNo = 0; - ui64 MinUnsentSeqNo = 0; - ui64 SeqNoShift = 0; - TMaybe<bool> AutoSeqNoMode; - bool ValidateSeqNoMode = false; - - NThreading::TPromise<ui64> InitSeqNoPromise; - bool InitSeqNoSetDone = false; - TInstant SessionStartedTs; - TInstant LastCountersUpdateTs = TInstant::Zero(); - TInstant LastCountersLogTs; - TWriterCounters::TPtr Counters; - TDuration WakeupInterval; - -protected: - ui64 MessagesAcquired = 0; + std::shared_ptr<TImplTracker> Tracker; + std::shared_ptr<TWriteSessionImpl> Impl; }; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TSimpleBlockingWriteSession + class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { friend class NTests::TSimpleWriteSessionTestAdapter; private: - using TClientMessage = TWriteSession::TClientMessage; - using TServerMessage = TWriteSession::TServerMessage; - using IWriteSessionConnectionProcessorFactory = TWriteSession::IWriteSessionConnectionProcessorFactory; - using IProcessor = TWriteSession::IProcessor; + using TClientMessage = TWriteSessionImpl::TClientMessage; + using TServerMessage = TWriteSessionImpl::TServerMessage; + using IWriteSessionConnectionProcessorFactory = TWriteSessionImpl::IWriteSessionConnectionProcessorFactory; + using IProcessor = TWriteSessionImpl::IProcessor; public: TSimpleBlockingWriteSession( diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp new file mode 100644 index 0000000000..338a71fab6 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -0,0 +1,1312 @@ +#include "write_session.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <library/cpp/string_utils/url/url.h> + +#include <util/generic/store_policy.h> +#include <util/generic/utility.h> +#include <util/stream/buffer.h> + + +namespace NYdb::NPersQueue { +using ::NMonitoring::TDynamicCounterPtr; +using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; + + +const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1); + +namespace NCompressionDetails { + THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); +} + +#define HISTOGRAM_SETUP NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) +TWriterCounters::TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { + Errors = counters->GetCounter("errors", true); + CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false); + BytesWritten = counters->GetCounter("bytesWritten", true); + MessagesWritten = counters->GetCounter("messagesWritten", true); + BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true); + BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false); + BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false); + BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false); + MessagesInflight = counters->GetCounter("messagesInflight", false); + + TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP); + UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP); + CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); +} +#undef HISTOGRAM_SETUP + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSessionImpl + +TWriteSessionImpl::TWriteSessionImpl( + const TWriteSessionSettings& settings, + std::shared_ptr<TPersQueueClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState, + std::shared_ptr<TImplTracker> tracker) + : Settings(settings) + , Client(std::move(client)) + , Connections(std::move(connections)) + , DbDriverState(std::move(dbDriverState)) + , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") + , Tracker(tracker) + , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings, Tracker)) + , InitSeqNoPromise(NThreading::NewPromise<ui64>()) + , WakeupInterval( + Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? + std::min(Settings.BatchFlushInterval_.GetOrElse(TDuration::Seconds(1)) / 5, TDuration::MilliSeconds(100)) + : + TDuration::MilliSeconds(100) + ) +{ + if (!Settings.RetryPolicy_) { + Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); + } + if (Settings.PreferredCluster_ && !Settings.AllowFallbackToOtherClusters_) { + TargetCluster = *Settings.PreferredCluster_; + TargetCluster.to_lower(); + } + if (Settings.Counters_.Defined()) { + Counters = *Settings.Counters_; + } else { + Counters = MakeIntrusive<TWriterCounters>(new ::NMonitoring::TDynamicCounters()); + } + +} + +void TWriteSessionImpl::Start(const TDuration& delay) { + ++ConnectionAttemptsDone; + if (!Started) { + with_lock(Lock) { + HandleWakeUpImpl(); + } + InitWriter(); + } + Started = true; + + DoCdsRequest(delay); +} + +TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStatus& status) { + Y_VERIFY(Lock.IsLocked()); + + THandleResult result; + if (AtomicGet(Aborting)) { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + return result; + } + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Got error. Status: " << status.Status + << ". Description: " << IssuesSingleLineString(status.Issues) + ); + SessionEstablished = false; + TMaybe<TDuration> nextDelay = TDuration::Zero(); + if (!RetryState) { + RetryState = Settings.RetryPolicy_->CreateRetryState(); + } + nextDelay = RetryState->GetNextRetryDelay(status.Status); + + if (nextDelay) { + result.StartDelay = *nextDelay; + result.DoRestart = true; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" + ); + ResetForRetryImpl(); + + } else { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + result.DoStop = true; + CheckHandleResultImpl(result); + } + return result; +} + +bool IsFederation(const TString& endpoint) { + TStringBuf host = GetHost(endpoint); + return host == "logbroker.yandex.net" || host == "logbroker-prestable.yandex.net"; +} + +void TWriteSessionImpl::DoCdsRequest(TDuration delay) { + bool cdsRequestIsUnnecessary; + with_lock (Lock) { + if (AtomicGet(Aborting)) { + return; + } + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: Do CDS request"); + + cdsRequestIsUnnecessary = (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Off || + (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint))); + + if (!cdsRequestIsUnnecessary) { + auto extractor = [sharedThis = shared_from_this(), wire = Tracker->MakeTrackedWire()] + (google::protobuf::Any* any, TPlainStatus status) mutable { + Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult result; + if (any) { + any->UnpackTo(&result); + } + TStatus st(std::move(status)); + sharedThis->OnCdsResponse(st, result); + }; + + Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest req; + auto* params = req.add_write_sessions(); + params->set_topic(Settings.Path_); + params->set_source_id(Settings.MessageGroupId_); + if (Settings.PartitionGroupId_.Defined()) + params->set_partition_group(*Settings.PartitionGroupId_); + if (Settings.PreferredCluster_.Defined()) + params->set_preferred_cluster_name(*Settings.PreferredCluster_); + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"); + auto cdsRequestCall = [wire = Tracker->MakeTrackedWire(), req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable { + dbState->Log.Write(TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"); + connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService, + Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, + Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse>( + std::move(req_), + std::move(extr), + &Ydb::PersQueue::V1::ClusterDiscoveryService::Stub::AsyncDiscoverClusters, + dbState, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); // TODO: make client timeout setting + }; + Connections->ScheduleOneTimeTask(std::move(cdsRequestCall), delay); + return; + } + } + + if (cdsRequestIsUnnecessary) { + DoConnect(delay, DbDriverState->DiscoveryEndpoint); + return; + } +} + +void TWriteSessionImpl::OnCdsResponse( + TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result +) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Got CDS response: \n" << result.ShortDebugString()); + TString endpoint, name; + THandleResult handleResult; + if (!status.IsSuccess()) { + with_lock (Lock) { + handleResult = OnErrorImpl({ + status.GetStatus(), + MakeIssueWithSubIssues("Failed to discover clusters", status.GetIssues()) + }); + } + ProcessHandleResult(handleResult); + return; + } + + NYql::TIssues issues; + EStatus errorStatus = EStatus::INTERNAL_ERROR; + with_lock (Lock) { + const Ydb::PersQueue::ClusterDiscovery::WriteSessionClusters& wsClusters = result.write_sessions_clusters(0); + bool isFirst = true; + + for (const auto& clusterInfo : wsClusters.clusters()) { + TString normalizedName = clusterInfo.name(); + normalizedName.to_lower(); + + if(isFirst) { + isFirst = false; + PreferredClusterByCDS = clusterInfo.name(); + } + + if (!clusterInfo.available()) { + if (TargetCluster && TargetCluster == normalizedName) { + errorStatus = EStatus::UNAVAILABLE; + issues.AddIssue(TStringBuilder() << "Selected destination cluster: " << normalizedName + << " is currently disabled"); + break; + } + continue; + } + if (clusterInfo.endpoint().empty()) { + issues.AddIssue(TStringBuilder() << "Unexpected reply from cluster discovery. Empty endpoint for cluster " + << normalizedName); + } else { + name = clusterInfo.name(); + endpoint = ApplyClusterEndpoint(DbDriverState->DiscoveryEndpoint, clusterInfo.endpoint()); + break; + } + } + if (endpoint.empty()) { + errorStatus = EStatus::GENERIC_ERROR; + issues.AddIssue(TStringBuilder() << "Could not get valid endpoint from cluster discovery"); + } + } + if (issues) { + with_lock(Lock) { + handleResult = OnErrorImpl({errorStatus, std::move(issues)}); + } + ProcessHandleResult(handleResult); + return; + } + with_lock(Lock) { + if (!InitialCluster) { + InitialCluster = name; + } else if (CurrentCluster != name) { // Switched to another cluster + Y_VERIFY(CurrentCluster); + if (name == InitialCluster) { // Returned to initial cluster, disabled SeqNo Shift + SeqNoShift = 0; + OnSeqNoShift = false; + } else { // Switched from initial cluster to second one; + Y_VERIFY(CurrentCluster == InitialCluster); + if (AutoSeqNoMode.GetOrElse(true)) { + OnSeqNoShift = true; + } + } + + } + CurrentCluster = name; + } + DoConnect(TDuration::Zero(), endpoint); + +} + +void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race yet as well. + CompressionExecutor = Settings.CompressionExecutor_; + IExecutor::TPtr executor; + executor = CreateSyncExecutor(); + executor->Start(); + Executor = std::move(executor); + + Settings.CompressionExecutor_->Start(); + Settings.EventHandlers_.HandlersExecutor_->Start(); + +} +// Client method +NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() { + if (Settings.ValidateSeqNo_) { + if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); + } + else + AutoSeqNoMode = false; + } + return InitSeqNoPromise.GetFuture(); +} + +TString DebugString(const TWriteSessionEvent::TEvent& event) { + return std::visit([](const auto& ev) { return ev.DebugString(); }, event); +} + +// Client method +TMaybe<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvent(bool block) { + return EventsQueue->GetEvent(block); +} + +// Client method +TVector<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { + return EventsQueue->GetEvents(block, maxEventsCount); +} + +ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { + Y_VERIFY(Lock.IsLocked()); + + ui64 seqNoValue = LastSeqNo + 1; + if (!AutoSeqNoMode.Defined()) { + AutoSeqNoMode = !seqNo.Defined(); + } + if (seqNo.Defined()) { + if (*AutoSeqNoMode) { + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); + ThrowFatalError( + "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); + + } else { + seqNoValue = *seqNo; + } + //! Disable SeqNo shift for manual SeqNo mode; + OnSeqNoShift = false; + SeqNoShift = 0; + } else if (!(*AutoSeqNoMode)) { + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); + ThrowFatalError( + "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); + } + LastSeqNo = seqNoValue; + return seqNoValue; +} +inline void TWriteSessionImpl::CheckHandleResultImpl(THandleResult& result) { + Y_VERIFY(Lock.IsLocked()); + + result.DoSetSeqNo = result.DoStop && !InitSeqNoSetDone && (InitSeqNoSetDone = true); +} + +void TWriteSessionImpl::ProcessHandleResult(THandleResult& result) { + if (result.DoRestart) { + Start(result.StartDelay); + } else if (result.DoSetSeqNo) { + InitSeqNoPromise.SetException("session closed"); + } +} + +NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() { + return EventsQueue->WaitEvent(); +} + +// Client method. +void TWriteSessionImpl::WriteInternal( + TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now(); + bool readyToAccept = false; + size_t bufferSize = data.size(); + with_lock(Lock) { + CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); + + FlushWriteIfRequiredImpl(); + readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; + } + if (readyToAccept) { + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } +} + +// Client method. +void TWriteSessionImpl::WriteEncoded( + TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); +} + +void TWriteSessionImpl::Write( + TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); +} + + +TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStatus&& status) { + Y_VERIFY(Lock.IsLocked()); + + (*Counters->Errors)++; + auto result = RestartImpl(status); + if (result.DoStop) { + CloseImpl(status.Status, std::move(status.Issues)); + } + return result; +} + +// No lock +void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); + + NGrpc::IQueueClientContextPtr prevConnectContext; + NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; + NGrpc::IQueueClientContextPtr prevConnectDelayContext; + NGrpc::IQueueClientContextPtr connectContext = nullptr; + NGrpc::IQueueClientContextPtr connectDelayContext = nullptr; + NGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr; + TRpcRequestSettings reqSettings; + std::shared_ptr<IWriteSessionConnectionProcessorFactory> connectionFactory; + + // Callbacks + std::function<void(TPlainStatus&&, typename IProcessor::TPtr&&)> connectCallback; + std::function<void(bool)> connectTimeoutCallback; + + with_lock(Lock) { + if (Aborting) { + return; + } + ++ConnectionGeneration; + auto subclient = Client->GetClientForEndpoint(endpoint); + connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); + auto clientContext = subclient->CreateContext(); + ConnectionFactory = connectionFactory; + + ClientContext = std::move(clientContext); + ServerMessage = std::make_shared<TServerMessage>(); + + if (!ClientContext) { + AbortImpl(); + // Grpc and WriteSession is closing right now. + return; + } + + connectContext = ClientContext->CreateContext(); + if (delay) + connectDelayContext = ClientContext->CreateContext(); + connectTimeoutContext = ClientContext->CreateContext(); + + // Previous operations contexts. + + // Set new context + prevConnectContext = std::exchange(ConnectContext, connectContext); + prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); + prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext); + Y_ASSERT(ConnectContext); + Y_ASSERT(ConnectTimeoutContext); + + // Cancel previous operations. + Cancel(prevConnectContext); + if (prevConnectDelayContext) + Cancel(prevConnectDelayContext); + Cancel(prevConnectTimeoutContext); + Y_ASSERT(connectContext); + Y_ASSERT(connectTimeoutContext); + reqSettings = TRpcRequestSettings::Make(Settings); + + connectCallback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectContext = connectContext] + (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); + }; + + connectTimeoutCallback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectTimeoutContext = connectTimeoutContext] + (bool ok) { + if (ok) { + sharedThis->OnConnectTimeout(connectTimeoutContext); + } + }; + } + + connectionFactory->CreateProcessor( + std::move(connectCallback), + reqSettings, + std::move(connectContext), + TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. + std::move(connectTimeoutContext), + std::move(connectTimeoutCallback), + delay, + std::move(connectDelayContext) + ); +} + +// RPC callback. +void TWriteSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); + THandleResult handleResult; + with_lock (Lock) { + if (ConnectTimeoutContext == connectTimeoutContext) { + Cancel(ConnectContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + } else { + return; + } + TStringBuilder description; + description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; + handleResult = RestartImpl(TPlainStatus(EStatus::TIMEOUT, description)); + if (handleResult.DoStop) { + CloseImpl( + EStatus::TIMEOUT, + description + ); + } + } + ProcessHandleResult(handleResult); +} + +// RPC callback. +void TWriteSessionImpl::OnConnect( + TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext +) { + THandleResult handleResult; + with_lock (Lock) { + if (ConnectContext == connectContext) { + Cancel(ConnectTimeoutContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + + if (st.Ok()) { + Processor = std::move(processor); + InitImpl(); + // Still should call ReadFromProcessor(); + } + } else { + return; + } + if (!st.Ok()) { + handleResult = RestartImpl(st); + if (handleResult.DoStop) { + CloseImpl( + st.Status, + MakeIssueWithSubIssues( + TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint + << "\". Attempts done: " << ConnectionAttemptsDone, + st.Issues + ) + ); + } + } + } + if (st.Ok()) + ReadFromProcessor(); // Out of Init + ProcessHandleResult(handleResult); +} + +// Produce init request for session. +void TWriteSessionImpl::InitImpl() { + Y_VERIFY(Lock.IsLocked()); + + Ydb::PersQueue::V1::StreamingWriteClientMessage req; + auto* init = req.mutable_init_request(); + init->set_topic(Settings.Path_); + init->set_message_group_id(Settings.MessageGroupId_); + if (Settings.PartitionGroupId_) { + init->set_partition_group_id(*Settings.PartitionGroupId_); + } + init->set_max_supported_format_version(0); + init->set_preferred_cluster(PreferredClusterByCDS); + + for (const auto& attr : Settings.Meta_.Fields) { + (*init->mutable_session_meta())[attr.first] = attr.second; + } + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); + WriteToProcessorImpl(std::move(req)); +} + +// Called under lock. Invokes Processor->Write, which is assumed to be deadlock-safe +void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&& req) { + Y_VERIFY(Lock.IsLocked()); + + Y_ASSERT(Processor); + if (Aborting) { + return; + } + auto callback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { + sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); + }; + + Processor->Write(std::move(req), std::move(callback)); +} + +void TWriteSessionImpl::ReadFromProcessor() { + Y_ASSERT(Processor); + IProcessor::TPtr prc; + ui64 generation; + std::function<void(NGrpc::TGrpcStatus&&)> callback; + with_lock(Lock) { + if (Aborting) { + return; + } + prc = Processor; + generation = ConnectionGeneration; + callback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectionGeneration = generation, + processor = prc, + serverMessage = ServerMessage] + (NGrpc::TGrpcStatus&& grpcStatus) { + sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); + }; + } + prc->Read(ServerMessage.get(), std::move(callback)); +} + +void TWriteSessionImpl::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) { + THandleResult handleResult; + with_lock (Lock) { + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (Aborting) { + return; + } + if(!status.Ok()) { + handleResult = OnErrorImpl(status); + } + } + ProcessHandleResult(handleResult); +} + +void TWriteSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { + TPlainStatus errorStatus; + TProcessSrvMessageResult processResult; + bool needSetValue = false; + if (!grpcStatus.Ok()) { + errorStatus = TPlainStatus(std::move(grpcStatus)); + } + bool doRead = false; + with_lock (Lock) { + UpdateTimedCountersImpl(); + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (errorStatus.Ok()) { + if (IsErrorMessage(*ServerMessage)) { + errorStatus = MakeErrorFromProto(*ServerMessage); + } else { + processResult = ProcessServerMessageImpl(); + needSetValue = !InitSeqNoSetDone && processResult.InitSeqNo.Defined() && (InitSeqNoSetDone = true); + if (errorStatus.Ok() && processResult.Ok) { + doRead = true; + } + } + } + } + if (doRead) + ReadFromProcessor(); + + with_lock(Lock) { + if (!errorStatus.Ok()) { + if (processResult.Ok) { // Otherwise, OnError was already called + processResult.HandleResult = RestartImpl(errorStatus); + } + } + if (processResult.HandleResult.DoStop) { + CloseImpl(std::move(errorStatus)); + } + } + for (auto& event : processResult.Events) { + EventsQueue->PushEvent(std::move(event)); + } + if (needSetValue) { + InitSeqNoPromise.SetValue(*processResult.InitSeqNo); + processResult.HandleResult.DoSetSeqNo = false; // Redundant. Just in case. + } + ProcessHandleResult(processResult.HandleResult); +} + +TStringBuilder TWriteSessionImpl::LogPrefix() const { + return TStringBuilder() << "MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; +} + +TString TWriteSessionEvent::TAcksEvent::DebugString() const { + TStringBuilder res; + res << "AcksEvent:"; + for (auto& ack : Acks) { + res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State; + if (ack.Details) { + res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId; + } + res << " }"; + } + if (!Acks.empty() && Acks.back().Stat) { + auto& stat = Acks.back().Stat; + res << " write stat: Write time " << stat->WriteTime << " total time in partition queue " << stat->TotalTimeInPartitionQueue + << " partition quoted time " << stat->PartitionQuotedTime << " topic quoted time " << stat->TopicQuotedTime; + } + return res; +} + +TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const { + return "ReadyToAcceptEvent"; +} + + +TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMessageImpl() { + Y_VERIFY(Lock.IsLocked()); + + TProcessSrvMessageResult result; + switch (ServerMessage->GetServerMessageCase()) { + case TServerMessage::SERVER_MESSAGE_NOT_SET: { + SessionEstablished = false; + result.HandleResult = OnErrorImpl({ + static_cast<NYdb::EStatus>(ServerMessage->status()), + {NYql::TIssue{ServerMessage->DebugString()}} + }); + result.Ok = false; + break; + } + case TServerMessage::kInitResponse: { + const auto& initResponse = ServerMessage->init_response(); + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + SessionId = initResponse.session_id(); + PartitionId = initResponse.partition_id(); + ui64 newLastSeqNo = initResponse.last_sequence_number(); + // SeqNo increased, so there's a risk of loss, apply SeqNo shift. + // MinUnsentSeqNo must be > 0 if anything was ever sent yet + if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { + SeqNoShift = newLastSeqNo - MinUnsentSeqNo; + } + result.InitSeqNo = newLastSeqNo; + LastSeqNo = newLastSeqNo; + + SessionEstablished = true; + LastCountersUpdateTs = TInstant::Now(); + SessionStartedTs = TInstant::Now(); + OnErrorResolved(); + + //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + // Kickstart send after session reestablishment + SendImpl(); + break; + } + case TServerMessage::kBatchWriteResponse: { + TWriteSessionEvent::TAcksEvent acksEvent; + const auto& batchWriteResponse = ServerMessage->batch_write_response(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() + ); + TWriteStat::TPtr writeStat = new TWriteStat{}; + const auto& stat = batchWriteResponse.write_statistics(); + writeStat->WriteTime = TDuration::MilliSeconds(stat.persist_duration_ms()); + writeStat->TotalTimeInPartitionQueue = TDuration::MilliSeconds(stat.queued_in_partition_duration_ms()); + writeStat->PartitionQuotedTime = TDuration::MilliSeconds(stat.throttled_on_partition_duration_ms()); + writeStat->TopicQuotedTime = TDuration::MilliSeconds(stat.throttled_on_topic_duration_ms()); + + for (size_t messageIndex = 0, endIndex = batchWriteResponse.sequence_numbers_size(); messageIndex != endIndex; ++messageIndex) { + // TODO: Fill writer statistics + ui64 sequenceNumber = batchWriteResponse.sequence_numbers(messageIndex); + + acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ + sequenceNumber - SeqNoShift, + batchWriteResponse.already_written(messageIndex) ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN: + TWriteSessionEvent::TWriteAck::EES_WRITTEN, + TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { + static_cast<ui64>(batchWriteResponse.offsets(messageIndex)), + PartitionId, + }, + writeStat, + }); + + if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } + } + //EventsQueue->PushEvent(std::move(acksEvent)); + result.Events.emplace_back(std::move(acksEvent)); + break; + } + case TServerMessage::kUpdateTokenResponse: { + UpdateTokenInProgress = false; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + UpdateTokenIfNeededImpl(); + break; + } + } + return result; +} + +bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { + bool result = false; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + UpdateTimedCountersImpl(); + const auto& sentFront = SentOriginalMessages.front(); + ui64 size = 0; + ui64 compressedSize = 0; + if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { + auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); + result = memoryUsage.NowOk && !memoryUsage.WasOk; + //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + const auto& front = SentPackedMessage.front(); + if (front.Compressed) { + compressedSize = front.Data.size(); + } else { + size = front.Data.size(); + } + + (*Counters->MessagesWritten) += front.MessageCount; + (*Counters->MessagesInflight) -= front.MessageCount; + (*Counters->BytesWritten) += front.OriginalSize; + + SentPackedMessage.pop(); + } else { + size = sentFront.Size; + (*Counters->BytesWritten) += sentFront.Size; + (*Counters->MessagesWritten)++; + (*Counters->MessagesInflight)--; + } + + (*Counters->BytesInflightCompressed) -= compressedSize; + (*Counters->BytesWrittenCompressed) += compressedSize; + (*Counters->BytesInflightUncompressed) -= size; + + Y_VERIFY(Counters->BytesInflightCompressed->Val() >= 0); + Y_VERIFY(Counters->BytesInflightUncompressed->Val() >= 0); + + Y_VERIFY(sentFront.SeqNo == sequenceNumber); + + (*Counters->BytesInflightTotal) = MemoryUsage; + SentOriginalMessages.pop(); + return result; +} + +TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { + Y_VERIFY(Lock.IsLocked()); + + bool wasOk = MemoryUsage <= Settings.MaxMemoryUsage_; + //if (diff < 0) { + // Y_VERIFY(MemoryUsage >= static_cast<size_t>(std::abs(diff))); + //} + MemoryUsage += diff; + bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; + if (wasOk != nowOk) { + if (wasOk) { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage " << MemoryUsage + << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" + ); + } + else { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" + ); + } + } + return {wasOk, nowOk}; +} + +TBuffer CompressBuffer(TVector<TStringBuf>& data, ECodec codec, i32 level) { + TBuffer result; + THolder<IOutputStream> coder = NCompressionDetails::CreateCoder(codec, result, level); + for (auto& buffer : data) { + coder->Write(buffer.data(), buffer.size()); + } + coder->Finish(); + return result; +} + +// May call OnCompressed with sync executor. No external lock. +void TWriteSessionImpl::CompressImpl(TBlock&& block_) { + Y_VERIFY(Lock.IsLocked()); + + if (Aborting) { + return; + } + Y_VERIFY(block_.Valid); + + std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); + blockPtr->Move(block_); + auto lambda = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + codec = Settings.Codec_, + level = Settings.CompressionLevel_, + isSyncCompression = !CompressionExecutor->IsAsync(), + blockPtr]() mutable { + Y_VERIFY(!blockPtr->Compressed); + + auto compressedData = CompressBuffer( + blockPtr->OriginalDataRefs, codec, level + ); + Y_VERIFY(!compressedData.Empty()); + blockPtr->Data = std::move(compressedData); + blockPtr->Compressed = true; + blockPtr->CodecID = GetCodecId(sharedThis->Settings.Codec_); + sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); + }; + + CompressionExecutor->Post(std::move(lambda)); +} + +void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) { + TMemoryUsageChange memoryUsage; + if (!isSyncCompression) { + with_lock(Lock) { + memoryUsage = OnCompressedImpl(std::move(block)); + } + } else { + memoryUsage = OnCompressedImpl(std::move(block)); + } + if (memoryUsage.NowOk && !memoryUsage.WasOk) { + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } +} + +TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { + Y_VERIFY(Lock.IsLocked()); + + UpdateTimedCountersImpl(); + Y_VERIFY(block.Valid); + auto memoryUsage = OnMemoryUsageChangedImpl(static_cast<i64>(block.Data.size()) - block.OriginalMemoryUsage); + (*Counters->BytesInflightUncompressed) -= block.OriginalSize; + (*Counters->BytesInflightCompressed) += block.Data.size(); + + PackedMessagesToSend.emplace(std::move(block)); + SendImpl(); + return memoryUsage; +} + +void TWriteSessionImpl::ResetForRetryImpl() { + Y_VERIFY(Lock.IsLocked()); + + SessionEstablished = false; + const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size(); + const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size(); + while (!SentPackedMessage.empty()) { + PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); + SentPackedMessage.pop(); + } + ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; + std::queue<TOriginalMessage> freshOriginalMessagesToSend; + OriginalMessagesToSend.swap(freshOriginalMessagesToSend); + while (!SentOriginalMessages.empty()) { + OriginalMessagesToSend.emplace(std::move(SentOriginalMessages.front())); + SentOriginalMessages.pop(); + } + while (!freshOriginalMessagesToSend.empty()) { + OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); + freshOriginalMessagesToSend.pop(); + } + if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) + minSeqNo = OriginalMessagesToSend.front().SeqNo; + MinUnsentSeqNo = minSeqNo; + Y_VERIFY(PackedMessagesToSend.size() == totalPackedMessages); + Y_VERIFY(OriginalMessagesToSend.size() == totalOriginalMessages); +} + +// Called from client Write() methods +void TWriteSessionImpl::FlushWriteIfRequiredImpl() { + Y_VERIFY(Lock.IsLocked()); + + if (!CurrentBatch.Empty() && !CurrentBatch.FlushRequested) { + MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); + if (TInstant::Now() - CurrentBatch.StartedAt >= Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) + || CurrentBatch.CurrentSize >= Settings.BatchFlushSizeBytes_.GetOrElse(0) + || CurrentBatch.CurrentSize >= MaxBlockSize + || CurrentBatch.Messages.size() >= MaxBlockMessageCount + || CurrentBatch.HasCodec() + ) { + WriteBatchImpl(); + return; + } + } +} + +size_t TWriteSessionImpl::WriteBatchImpl() { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " + << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo + ); + + Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount); + + const bool skipCompression = Settings.Codec_ == ECodec::RAW || CurrentBatch.HasCodec(); + if (!skipCompression && Settings.CompressionExecutor_->IsAsync()) { + MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); + } + + size_t size = 0; + for (size_t i = 0; i != CurrentBatch.Messages.size();) { + TBlock block{}; + for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { + auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; + auto createTs = CurrentBatch.Messages[i].CreatedAt; + + if (!block.MessageCount) { + block.Offset = sequenceNumber; + } + + block.MessageCount += 1; + const auto& datum = CurrentBatch.Messages[i].DataRef; + block.OriginalSize += datum.size(); + block.OriginalMemoryUsage = CurrentBatch.Data.size(); + block.OriginalDataRefs.emplace_back(datum); + if (CurrentBatch.Messages[i].Codec.Defined()) { + Y_VERIFY(CurrentBatch.Messages.size() == 1); + block.CodecID = GetCodecId(*CurrentBatch.Messages[i].Codec); + block.OriginalSize = CurrentBatch.Messages[i].OriginalSize; + block.Compressed = false; + } + size += datum.size(); + UpdateTimedCountersImpl(); + (*Counters->BytesInflightUncompressed) += datum.size(); + (*Counters->MessagesInflight)++; + OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + } + block.Data = std::move(CurrentBatch.Data); + if (skipCompression) { + PackedMessagesToSend.emplace(std::move(block)); + } else { + CompressImpl(std::move(block)); + } + } + CurrentBatch.Reset(); + if (skipCompression) { + SendImpl(); + } + return size; +} + +size_t GetMaxGrpcMessageSize() { + return 120_MB; +} + +bool TWriteSessionImpl::IsReadyToSendNextImpl() const { + Y_VERIFY(Lock.IsLocked()); + + if (!SessionEstablished) { + return false; + } + if (Aborting) + return false; + if (PackedMessagesToSend.empty()) { + return false; + } + Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); + Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); + + return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; +} + + +void TWriteSessionImpl::UpdateTokenIfNeededImpl() { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + + if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) + return; + TClientMessage clientMessage; + auto* updateRequest = clientMessage.mutable_update_token_request(); + auto token = DbDriverState->CredentialsProvider->GetAuthInfo(); + if (token == PrevToken) + return; + UpdateTokenInProgress = true; + updateRequest->set_token(token); + PrevToken = token; + + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + + Processor->Write(std::move(clientMessage)); +} + +void TWriteSessionImpl::SendImpl() { + Y_VERIFY(Lock.IsLocked()); + + // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB + while(IsReadyToSendNextImpl()) { + TClientMessage clientMessage; + auto* writeRequest = clientMessage.mutable_write_request(); + auto sentAtMs = TInstant::Now().MilliSeconds(); + + // Sent blocks while we can without messages reordering + while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) { + const auto& block = PackedMessagesToSend.top(); + Y_VERIFY(block.Valid); + for (size_t i = 0; i != block.MessageCount; ++i) { + Y_VERIFY(!OriginalMessagesToSend.empty()); + + auto& message = OriginalMessagesToSend.front(); + + writeRequest->add_sent_at_ms(sentAtMs); + writeRequest->add_sequence_numbers(message.SeqNo + SeqNoShift); + writeRequest->add_message_sizes(message.Size); + writeRequest->add_created_at_ms(message.CreatedAt.MilliSeconds()); + + SentOriginalMessages.emplace(std::move(message)); + OriginalMessagesToSend.pop(); + } + + writeRequest->add_blocks_offsets(block.Offset); + writeRequest->add_blocks_message_counts(block.MessageCount); + writeRequest->add_blocks_part_numbers(block.PartNumber); + writeRequest->add_blocks_uncompressed_sizes(block.OriginalSize); + writeRequest->add_blocks_headers(block.CodecID); + if (block.Compressed) + writeRequest->add_blocks_data(block.Data.data(), block.Data.size()); + else { + for (auto& buffer: block.OriginalDataRefs) { + writeRequest->add_blocks_data(buffer.data(), buffer.size()); + } + } + + TBlock moveBlock; + moveBlock.Move(block); + SentPackedMessage.emplace(std::move(moveBlock)); + PackedMessagesToSend.pop(); + } + UpdateTokenIfNeededImpl(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Send " << writeRequest->sequence_numbers_size() << " message(s) (" + << OriginalMessagesToSend.size() << " left), first sequence number is " + << writeRequest->sequence_numbers(0) + ); + Processor->Write(std::move(clientMessage)); + } +} + +// Client method, no Lock +bool TWriteSessionImpl::Close(TDuration closeTimeout) { + if (AtomicGet(Aborting)) + return false; + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" + ); + auto startTime = TInstant::Now(); + auto remaining = closeTimeout; + bool ready = false; + bool needSetSeqNoValue = false; + while (remaining > TDuration::Zero()) { + with_lock(Lock) { + if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) { + ready = true; + } + if (AtomicGet(Aborting)) + break; + } + if (ready) { + break; + } + remaining = closeTimeout - (TInstant::Now() - startTime); + Sleep(Min(TDuration::MilliSeconds(100), remaining)); + } + with_lock(Lock) { + ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting); + } + with_lock(Lock) { + CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); + needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + } + if (needSetSeqNoValue) { + InitSeqNoPromise.SetException("session closed"); + } + if (ready) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); + } else { + DbDriverState->Log.Write( + TLOG_WARNING, + LogPrefix() << "Write session: could not confirm all writes in time" + << " or session aborted, perform hard shutdown" + ); + } + return ready; +} + +void TWriteSessionImpl::HandleWakeUpImpl() { + Y_VERIFY(Lock.IsLocked()); + + FlushWriteIfRequiredImpl(); + if (AtomicGet(Aborting)) { + return; + } + auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire()] (bool ok) + { + if (!ok) { + return; + } + with_lock(sharedThis->Lock) { + sharedThis->HandleWakeUpImpl(); + } + }; + + if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { + LastTokenUpdate = TInstant::Now(); + UpdateTokenIfNeededImpl(); + } + + const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() + ? WakeupInterval + : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); + Connections->ScheduleCallback(flushAfter, std::move(callback)); +} + +void TWriteSessionImpl::UpdateTimedCountersImpl() { + Y_VERIFY(Lock.IsLocked()); + + auto now = TInstant::Now(); + auto delta = (now - LastCountersUpdateTs).MilliSeconds(); + double percent = 100.0 / Settings.MaxMemoryUsage_; + + Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); + Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); + Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); + + *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); + LastCountersUpdateTs = now; + if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { + LastCountersLogTs = TInstant::Now(); + +#define LOG_COUNTER(counter) \ + << " " Y_STRINGIZE(counter) ": " \ + << Counters->counter->Val() \ + /**/ + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() + << "Counters: {" + LOG_COUNTER(Errors) + LOG_COUNTER(CurrentSessionLifetimeMs) + LOG_COUNTER(BytesWritten) + LOG_COUNTER(MessagesWritten) + LOG_COUNTER(BytesWrittenCompressed) + LOG_COUNTER(BytesInflightUncompressed) + LOG_COUNTER(BytesInflightCompressed) + LOG_COUNTER(BytesInflightTotal) + LOG_COUNTER(MessagesInflight) + << " }" + ); + +#undef LOG_COUNTER + } +} + +void TWriteSessionImpl::AbortImpl() { + Y_VERIFY(Lock.IsLocked()); + + if (!AtomicGet(Aborting)) { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + AtomicSet(Aborting, 1); + Cancel(ConnectContext); + Cancel(ConnectTimeoutContext); + Cancel(ConnectDelayContext); + if (Processor) + Processor->Cancel(); + + Cancel(ClientContext); + ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. + } +} + +void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); + AbortImpl(); +} + +void TWriteSessionImpl::CloseImpl(EStatus statusCode, const TString& message) { + Y_VERIFY(Lock.IsLocked()); + + NYql::TIssues issues; + issues.AddIssue(message); + CloseImpl(statusCode, std::move(issues)); +} + +void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + EventsQueue->Close(TSessionClosedEvent(std::move(status))); + AbortImpl(); +} + +TWriteSessionImpl::~TWriteSessionImpl() { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); + bool needClose = false; + with_lock(Lock) { + if (!AtomicGet(Aborting)) { + CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); + + needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + } + } + if (needClose) { + InitSeqNoPromise.SetException("session closed"); + } +} + +}; // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h new file mode 100644 index 0000000000..c806caa972 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -0,0 +1,449 @@ +#pragma once + +#include "common.h" +#include "impl_tracker.h" +#include "persqueue_impl.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> + +#include <util/generic/buffer.h> + +namespace NYdb::NPersQueue { + +inline const TString& GetCodecId(const ECodec codec) { + static THashMap<ECodec, TString> idByCodec{ + {ECodec::RAW, TString(1, '\0')}, + {ECodec::GZIP, "\1"}, + {ECodec::LZOP, "\2"}, + {ECodec::ZSTD, "\3"} + }; + Y_VERIFY(idByCodec.contains(codec)); + return idByCodec[codec]; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSessionEventsQueue + +class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> { + using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; + +public: + TWriteSessionEventsQueue(const TWriteSessionSettings& settings, + std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()) + : TParent(settings) + , Tracker(std::move(tracker)) + {} + + void PushEvent(TEventInfo eventInfo) { + if (Closed || ApplyHandler(eventInfo)) { + return; + } + + TWaiter waiter; + with_lock (Mutex) { + Events.emplace(std::move(eventInfo)); + waiter = PopWaiterImpl(); + } + waiter.Signal(); // Does nothing if waiter is empty. + } + + TMaybe<TEvent> GetEvent(bool block = false) { + TMaybe<TEventInfo> eventInfo; + with_lock (Mutex) { + if (block) { + WaitEventsImpl(); + } + if (HasEventsImpl()) { + eventInfo = GetEventImpl(); + } else { + return Nothing(); + } + } + eventInfo->OnUserRetrievedEvent(); + return std::move(eventInfo->Event); + } + + TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) { + TVector<TEventInfo> eventInfos; + with_lock (Mutex) { + if (block) { + WaitEventsImpl(); + } + eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max())); + while (!Events.empty()) { + eventInfos.emplace_back(GetEventImpl()); + if (maxEventsCount && eventInfos.size() >= *maxEventsCount) { + break; + } + } + if (CloseEvent && Events.empty() && (!maxEventsCount || eventInfos.size() < *maxEventsCount)) { + eventInfos.push_back({*CloseEvent}); + } + } + + TVector<TEvent> result; + result.reserve(eventInfos.size()); + for (TEventInfo& eventInfo : eventInfos) { + eventInfo.OnUserRetrievedEvent(); + result.emplace_back(std::move(eventInfo.Event)); + } + return result; + } + + void Close(const TSessionClosedEvent& event) { + TWaiter waiter; + with_lock (Mutex) { + CloseEvent = event; + Closed = true; + waiter = TWaiter(Waiter.ExtractPromise(), this); + } + + TEventInfo info(event); + ApplyHandler(info); + + waiter.Signal(); + } + +private: + struct THandlersVisitor : public TParent::TBaseHandlersVisitor { + using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; + +#define DECLARE_HANDLER(type, handler, answer) \ + bool operator()(type& event) { \ + if (Settings.EventHandlers_.handler) { \ + Settings.EventHandlers_.handler(event); \ + return answer; \ + } \ + return false; \ + } \ + /**/ + + DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); + DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); + DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied + +#undef DECLARE_HANDLER + + bool Visit() { + return std::visit(*this, Event); + } + + }; + + bool ApplyHandler(TEventInfo& eventInfo) { + THandlersVisitor visitor(Settings, eventInfo.GetEvent(), Tracker); + return visitor.Visit(); + } + + TEventInfo GetEventImpl() { // Assumes that we're under lock and that the event queue has events. + Y_ASSERT(HasEventsImpl()); + if (!Events.empty()) { + TEventInfo event = std::move(Events.front()); + Events.pop(); + RenewWaiterImpl(); + return event; + } + Y_ASSERT(CloseEvent); + return {*CloseEvent}; + } + +private: + std::shared_ptr<TImplTracker> Tracker; +}; + +struct TMemoryUsageChange { + bool WasOk; //!< MemoryUsage <= Config.MaxMemoryUsage_ before update + bool NowOk; //!< Same, only after update +}; + +namespace NTests { + class TSimpleWriteSessionTestAdapter; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSessionImpl + +class TWriteSessionImpl : public IWriteSession, + public std::enable_shared_from_this<TWriteSessionImpl> { +private: + friend class TWriteSession; + friend class TSimpleBlockingWriteSession; + friend class NTests::TSimpleWriteSessionTestAdapter; + +private: + using TClientMessage = Ydb::PersQueue::V1::StreamingWriteClientMessage; + using TServerMessage = Ydb::PersQueue::V1::StreamingWriteServerMessage; + using IWriteSessionConnectionProcessorFactory = + TPersQueueClient::TImpl::IWriteSessionConnectionProcessorFactory; + using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; + + struct TMessage { + ui64 SeqNo; + TInstant CreatedAt; + TStringBuf DataRef; + TMaybe<ECodec> Codec; + ui32 OriginalSize; // only for coded messages + TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) + : SeqNo(seqNo) + , CreatedAt(createdAt) + , DataRef(data) + , Codec(codec) + , OriginalSize(originalSize) + {} + }; + + struct TMessageBatch { + TBuffer Data; + TVector<TMessage> Messages; + ui64 CurrentSize = 0; + TInstant StartedAt = TInstant::Zero(); + bool Acquired = false; + bool FlushRequested = false; + void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { + if (StartedAt == TInstant::Zero()) + StartedAt = TInstant::Now(); + CurrentSize += codec ? originalSize : data.size(); + Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); + Acquired = false; + } + + bool HasCodec() const { + return Messages.empty() ? false : Messages.front().Codec.Defined(); + } + + bool Acquire() { + if (Acquired || Messages.empty()) + return false; + auto currSize = Data.size(); + Data.Append(Messages.back().DataRef.data(), Messages.back().DataRef.size()); + Messages.back().DataRef = TStringBuf(Data.data() + currSize, Data.size() - currSize); + Acquired = true; + return true; + } + + bool Empty() const noexcept { + return CurrentSize == 0 && Messages.empty(); + } + + void Reset() { + StartedAt = TInstant::Zero(); + Messages.clear(); + Data.Clear(); + Acquired = false; + CurrentSize = 0; + FlushRequested = false; + } + }; + + struct TBlock { + size_t Offset = 0; //!< First message sequence number in the block + size_t MessageCount = 0; + size_t PartNumber = 0; + size_t OriginalSize = 0; + size_t OriginalMemoryUsage = 0; + TString CodecID = GetCodecId(ECodec::RAW); + mutable TVector<TStringBuf> OriginalDataRefs; + mutable TBuffer Data; + bool Compressed = false; + mutable bool Valid = true; + + TBlock& operator=(TBlock&&) = default; + TBlock(TBlock&&) = default; + TBlock() = default; + + //For taking ownership by copying from const object, f.e. lambda -> std::function, priority_queue + void Move(const TBlock& rhs) { + Offset = rhs.Offset; + MessageCount = rhs.MessageCount; + PartNumber = rhs.PartNumber; + OriginalSize = rhs.OriginalSize; + OriginalMemoryUsage = rhs.OriginalMemoryUsage; + CodecID = rhs.CodecID; + OriginalDataRefs.swap(rhs.OriginalDataRefs); + Data.Swap(rhs.Data); + Compressed = rhs.Compressed; + + rhs.Data.Clear(); + rhs.OriginalDataRefs.clear(); + } + }; + + struct TOriginalMessage { + ui64 SeqNo; + TInstant CreatedAt; + size_t Size; + TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) + : SeqNo(sequenceNumber) + , CreatedAt(createdAt) + , Size(size) + {} + }; + + //! Block comparer, makes block with smallest offset (first sequence number) appear on top of the PackedMessagesToSend priority queue + struct Greater { + bool operator() (const TBlock& lhs, const TBlock& rhs) { + return lhs.Offset > rhs.Offset; + } + }; + + struct THandleResult { + bool DoRestart = false; + TDuration StartDelay = TDuration::Zero(); + bool DoStop = false; + bool DoSetSeqNo = false; + }; + struct TProcessSrvMessageResult { + THandleResult HandleResult; + TMaybe<ui64> InitSeqNo; + TVector<TWriteSessionEvent::TEvent> Events; + bool Ok = true; + }; + + THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action + +public: + TWriteSessionImpl(const TWriteSessionSettings& settings, + std::shared_ptr<TPersQueueClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState, + std::shared_ptr<TImplTracker> tracker); + + TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; + TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, + TMaybe<size_t> maxEventsCount = Nothing()) override; + NThreading::TFuture<ui64> GetInitSeqNo() override; + + void Write(TContinuationToken&& continuationToken, TStringBuf data, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + + NThreading::TFuture<void> WaitEvent() override; + + // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. + bool Close(TDuration closeTimeout = TDuration::Max()) override; + + TWriterCounters::TPtr GetCounters() override {Y_FAIL("Unimplemented"); } //ToDo - unimplemented; + + ~TWriteSessionImpl(); // will not call close - destroy everything without acks + +private: + + TStringBuilder LogPrefix() const; + + void UpdateTokenIfNeededImpl(); + + void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); + + void FlushWriteIfRequiredImpl(); + size_t WriteBatchImpl(); + void Start(const TDuration& delay); + void InitWriter(); + + void DoCdsRequest(TDuration delay = TDuration::Zero()); + void OnCdsResponse(TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result); + void OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor, + const NGrpc::IQueueClientContextPtr& connectContext); + void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext); + void ResetForRetryImpl(); + THandleResult RestartImpl(const TPlainStatus& status); + void DoConnect(const TDuration& delay, const TString& endpoint); + void InitImpl(); + void ReadFromProcessor(); // Assumes that we're under lock. + void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock. + void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration); + void OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration); + TProcessSrvMessageResult ProcessServerMessageImpl(); + TMemoryUsageChange OnMemoryUsageChangedImpl(i64 diff); + void CompressImpl(TBlock&& block); + void OnCompressed(TBlock&& block, bool isSyncCompression=false); + TMemoryUsageChange OnCompressedImpl(TBlock&& block); + + //TString GetDebugIdentity() const; + Ydb::PersQueue::V1::StreamingWriteClientMessage GetInitClientMessage(); + bool CleanupOnAcknowledged(ui64 sequenceNumber); + bool IsReadyToSendNextImpl() const; + ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); + void SendImpl(); + void AbortImpl(); + void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); + void CloseImpl(EStatus statusCode, const TString& message); + void CloseImpl(TPlainStatus&& status); + + void OnErrorResolved() { + RetryState = nullptr; + } + void CheckHandleResultImpl(THandleResult& result); + void ProcessHandleResult(THandleResult& result); + void HandleWakeUpImpl(); + void UpdateTimedCountersImpl(); + +private: + TWriteSessionSettings Settings; + std::shared_ptr<TPersQueueClient::TImpl> Client; + std::shared_ptr<TGRpcConnectionsImpl> Connections; + TString TargetCluster; + TString InitialCluster; + TString CurrentCluster; + bool OnSeqNoShift = false; + TString PreferredClusterByCDS; + std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; + TDbDriverStatePtr DbDriverState; + TStringType PrevToken; + bool UpdateTokenInProgress = false; + TInstant LastTokenUpdate = TInstant::Zero(); + std::shared_ptr<TImplTracker> Tracker; + std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; + NGrpc::IQueueClientContextPtr ClientContext; // Common client context. + NGrpc::IQueueClientContextPtr ConnectContext; + NGrpc::IQueueClientContextPtr ConnectTimeoutContext; + NGrpc::IQueueClientContextPtr ConnectDelayContext; + size_t ConnectionGeneration = 0; + size_t ConnectionAttemptsDone = 0; + TAdaptiveLock Lock; + IProcessor::TPtr Processor; + IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). + std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to. + + TString SessionId; + IExecutor::TPtr Executor; + IExecutor::TPtr CompressionExecutor; + size_t MemoryUsage = 0; //!< Estimated amount of memory used + + TMessageBatch CurrentBatch; + + std::queue<TOriginalMessage> OriginalMessagesToSend; + std::priority_queue<TBlock, std::vector<TBlock>, Greater> PackedMessagesToSend; + //! Messages that are sent but yet not acknowledged + std::queue<TOriginalMessage> SentOriginalMessages; + std::queue<TBlock> SentPackedMessage; + + const size_t MaxBlockSize = std::numeric_limits<size_t>::max(); + const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility + bool Connected = false; + bool Started = false; + TAtomic Aborting = 0; + bool SessionEstablished = false; + ui32 PartitionId = 0; + ui64 LastSeqNo = 0; + ui64 MinUnsentSeqNo = 0; + ui64 SeqNoShift = 0; + TMaybe<bool> AutoSeqNoMode; + bool ValidateSeqNoMode = false; + + NThreading::TPromise<ui64> InitSeqNoPromise; + bool InitSeqNoSetDone = false; + TInstant SessionStartedTs; + TInstant LastCountersUpdateTs = TInstant::Zero(); + TInstant LastCountersLogTs; + TWriterCounters::TPtr Counters; + TDuration WakeupInterval; + +protected: + ui64 MessagesAcquired = 0; +}; + +}; // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index fb9138654b..5205d9a152 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -349,7 +349,7 @@ enum class EClusterDiscoveryMode { }; class TContinuationToken : public TMoveOnly { - friend class TWriteSession; + friend class TWriteSessionImpl; private: TContinuationToken() = default; }; @@ -1105,8 +1105,8 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { //! Function to handle all event types. //! If event with current type has no handler for this type of event, //! this handler (if specified) will be used. - //! If this handler is not specified, event can be received with TReadSession::GetEvent() method. - FLUENT_SETTING(std::function<void(TReadSessionEvent::TEvent&)>, CommonHandler); + //! If this handler is not specified, event can be received with TWriteSession::GetEvent() method. + FLUENT_SETTING(std::function<void(TWriteSessionEvent::TEvent&)>, CommonHandler); //! Executor for handlers. //! If not set, default single threaded executor will be used. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp index 6a853b6fc5..3b99df5ec4 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp @@ -25,7 +25,7 @@ TSimpleWriteSessionTestAdapter::TSimpleWriteSessionTestAdapter(TSimpleBlockingWr ui64 TSimpleWriteSessionTestAdapter::GetAcquiredMessagesCount() const { if (Session->Writer) - return Session->Writer->MessagesAcquired; + return Session->Writer->Impl->MessagesAcquired; else return 0; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.darwin-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.darwin-x86_64.txt index 5ecc4a1483..dc5af49696 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.darwin-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.darwin-x86_64.txt @@ -32,6 +32,7 @@ target_sources(client-ydb_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-aarch64.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-aarch64.txt index 85e3be9d82..06bdc9532e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-aarch64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-aarch64.txt @@ -33,6 +33,7 @@ target_sources(client-ydb_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-x86_64.txt b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-x86_64.txt index 85e3be9d82..06bdc9532e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-x86_64.txt +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/CMakeLists.linux-x86_64.txt @@ -33,6 +33,7 @@ target_sources(client-ydb_topic-impl PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp ) diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index 8e7d930eff..e5978a856c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -34,11 +34,16 @@ TReadSession::TReadSession(const TReadSessionSettings& settings, TReadSession::~TReadSession() { Abort(EStatus::ABORTED, "Aborted"); ClearAllEvents(); + + if (Tracker) { + Tracker->AsyncComplete().Wait(); + } } void TReadSession::Start() { ErrorHandler = MakeIntrusive<NPersQueue::TErrorHandler<false>>(weak_from_this()); - EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this()); + Tracker = std::make_shared<NPersQueue::TImplTracker>(); + EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this(), Tracker); if (!ValidateSettings()) { return; @@ -58,6 +63,8 @@ void TReadSession::Start() { } void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + // Create cluster sessions. Log.Write( TLOG_DEBUG, @@ -78,7 +85,8 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false> EventsQueue, ErrorHandler, context, - 1, 1); + 1, 1, + Tracker); deferred.DeferStartSession(Session); } @@ -271,23 +279,28 @@ void TReadSession::DumpCountersToLog(size_t timeNumber) { void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { with_lock(Lock) { + if (Aborting || Closing) { + return; + } DumpCountersContext = Connections->CreateContext(); - } - if (DumpCountersContext) { - auto callback = [self = weak_from_this(), timeNumber](bool ok) { - if (ok) { - if (auto sharedSelf = self.lock()) { - sharedSelf->DumpCountersToLog(timeNumber); + if (DumpCountersContext) { + auto callback = [self = weak_from_this(), timeNumber](bool ok) { + if (ok) { + if (auto sharedSelf = self.lock()) { + sharedSelf->DumpCountersToLog(timeNumber); + } } - } - }; - Connections->ScheduleCallback(TDuration::Seconds(1), - std::move(callback), - DumpCountersContext); + }; + Connections->ScheduleCallback(TDuration::Seconds(1), + std::move(callback), + DumpCountersContext); + } } } void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + if (!Aborting) { Aborting = true; Log.Write(TLOG_NOTICE, GetLogPrefix() << "Aborting read session. Description: " << closeEvent.DebugString()); @@ -301,10 +314,14 @@ void TReadSession::AbortImpl(TSessionClosedEvent&& closeEvent, NPersQueue::TDefe } void TReadSession::AbortImpl(EStatus statusCode, NYql::TIssues&& issues, NPersQueue::TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + AbortImpl(TSessionClosedEvent(statusCode, std::move(issues)), deferred); } void TReadSession::AbortImpl(EStatus statusCode, const TString& message, NPersQueue::TDeferredActions<false>& deferred) { + Y_VERIFY(Lock.IsLocked()); + NYql::TIssues issues; issues.AddIssue(message); AbortImpl(statusCode, std::move(issues), deferred); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h index 1ac885691b..f58fa2786e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -2,6 +2,7 @@ #include "topic_impl.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> namespace NYdb::NTopic { @@ -120,6 +121,7 @@ private: NPersQueue::IErrorHandler<false>::TPtr ErrorHandler; TDbDriverStatePtr DbDriverState; TAdaptiveLock Lock; + std::shared_ptr<NPersQueue::TImplTracker> Tracker; std::shared_ptr<NPersQueue::TReadSessionEventsQueue<false>> EventsQueue; NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr Session; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index 83103a1ec9..e42d98b95b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -10,1120 +10,62 @@ namespace NYdb::NTopic { -using ::NMonitoring::TDynamicCounterPtr; -using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; - -const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1); - -namespace NCompressionDetails { - THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); -} - -#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) -TWriterCounters::TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { - Errors = counters->GetCounter("errors", true); - CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false); - BytesWritten = counters->GetCounter("bytesWritten", true); - MessagesWritten = counters->GetCounter("messagesWritten", true); - BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true); - BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false); - BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false); - BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false); - MessagesInflight = counters->GetCounter("messagesInflight", false); - - TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP); - UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP); - CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); -} -#undef HISTOGRAM_SETUP +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSession TWriteSession::TWriteSession( const TWriteSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) - : Settings(settings) - , Client(std::move(client)) - , Connections(std::move(connections)) - , DbDriverState(std::move(dbDriverState)) - , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") - , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) - , InitSeqNoPromise(NThreading::NewPromise<ui64>()) - , WakeupInterval( - Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? - std::min(Settings.BatchFlushInterval_.GetOrElse(TDuration::Seconds(1)) / 5, TDuration::MilliSeconds(100)) - : - TDuration::MilliSeconds(100) - ) + : Tracker(std::make_shared<NPersQueue::TImplTracker>()) + , Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState), Tracker)) { - if (!Settings.RetryPolicy_) { - Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); - } - if (Settings.Counters_.Defined()) { - Counters = *Settings.Counters_; - } else { - Counters = MakeIntrusive<TWriterCounters>(new ::NMonitoring::TDynamicCounters()); - } - } void TWriteSession::Start(const TDuration& delay) { - ++ConnectionAttemptsDone; - if (!Started) { - HandleWakeUpImpl(); - InitWriter(); - } - Started = true; - - DoConnect(delay, DbDriverState->DiscoveryEndpoint); + Impl->Start(delay); } -// Only called under lock -TWriteSession::THandleResult TWriteSession::RestartImpl(const TPlainStatus& status) { - THandleResult result; - if (AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); - return result; - } - DbDriverState->Log.Write( - TLOG_INFO, - LogPrefix() << "Got error. Status: " << status.Status - << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues) - ); - SessionEstablished = false; - TMaybe<TDuration> nextDelay = TDuration::Zero(); - if (!RetryState) { - RetryState = Settings.RetryPolicy_->CreateRetryState(); - } - nextDelay = RetryState->GetNextRetryDelay(status.Status); - - if (nextDelay) { - result.StartDelay = *nextDelay; - result.DoRestart = true; - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" - ); - ResetForRetryImpl(); - - } else { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); - result.DoStop = true; - CheckHandleResultImpl(result); - } - return result; -} - -void TWriteSession::InitWriter() { // No Lock, very initial start - no race yet as well. - CompressionExecutor = Settings.CompressionExecutor_; - IExecutor::TPtr executor; - executor = CreateSyncExecutor(); - executor->Start(); - Executor = std::move(executor); - - Settings.CompressionExecutor_->Start(); - Settings.EventHandlers_.HandlersExecutor_->Start(); - -} -// Client method NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() { - if (Settings.ValidateSeqNo_) { - if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); - ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); - } - else - AutoSeqNoMode = false; - } - return InitSeqNoPromise.GetFuture(); -} - -TString DebugString(const TWriteSessionEvent::TEvent& event) { - return std::visit([](const auto& ev) { return ev.DebugString(); }, event); + return Impl->GetInitSeqNo(); } -// Client method TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) { - return EventsQueue->GetEvent(block); + return Impl->EventsQueue->GetEvent(block); } -// Client method TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { - return EventsQueue->GetEvents(block, maxEventsCount); -} - -// Only called under lock -ui64 TWriteSession::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { - ui64 seqNoValue = LastSeqNo + 1; - if (!AutoSeqNoMode.Defined()) { - AutoSeqNoMode = !seqNo.Defined(); - //! Disable SeqNo shift for manual SeqNo mode; - if (seqNo.Defined()) { - OnSeqNoShift = false; - SeqNoShift = 0; - } - } - if (seqNo.Defined()) { - if (*AutoSeqNoMode) { - DbDriverState->Log.Write( - TLOG_ERR, - LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" - ); - ThrowFatalError( - "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" - ); - } else { - seqNoValue = *seqNo; - } - } else if (!(*AutoSeqNoMode)) { - DbDriverState->Log.Write( - TLOG_ERR, - LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" - ); - ThrowFatalError( - "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" - ); - } - LastSeqNo = seqNoValue; - return seqNoValue; -} -inline void TWriteSession::CheckHandleResultImpl(THandleResult& result) { - result.DoSetSeqNo = result.DoStop && !InitSeqNoSetDone && (InitSeqNoSetDone = true); -} - -void TWriteSession::ProcessHandleResult(THandleResult& result) { - if (result.DoRestart) { - Start(result.StartDelay); - } else if (result.DoSetSeqNo) { - InitSeqNoPromise.SetException("session closed"); - } + return Impl->EventsQueue->GetEvents(block, maxEventsCount); } NThreading::TFuture<void> TWriteSession::WaitEvent() { - return EventsQueue->WaitEvent(); -} - -// Client method. -void TWriteSession::WriteInternal( - TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now(); - bool readyToAccept = false; - size_t bufferSize = data.size(); - with_lock(Lock) { - CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); - - FlushWriteIfRequiredImpl(); - readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; - } - if (readyToAccept) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } -} - -// Client method. -void TWriteSession::WriteEncoded( - TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); -} - -void TWriteSession::Write( - TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp - ) { - WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); -} - - -// Only called under lock. -TWriteSession::THandleResult TWriteSession::OnErrorImpl(NYdb::TPlainStatus&& status) { - (*Counters->Errors)++; - auto result = RestartImpl(status); - if (result.DoStop) { - CloseImpl(status.Status, std::move(status.Issues)); - } - return result; -} - -// No lock -void TWriteSession::DoConnect(const TDuration& delay, const TString& endpoint) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); - - NGrpc::IQueueClientContextPtr prevConnectContext; - NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; - NGrpc::IQueueClientContextPtr prevConnectDelayContext; - NGrpc::IQueueClientContextPtr connectContext = nullptr; - NGrpc::IQueueClientContextPtr connectDelayContext = nullptr; - NGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr; - TRpcRequestSettings reqSettings; - std::shared_ptr<IWriteSessionConnectionProcessorFactory> connectionFactory; - with_lock(Lock) { - ++ConnectionGeneration; - auto subclient = Client; - connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); - auto clientContext = subclient->CreateContext(); - ConnectionFactory = connectionFactory; - - ClientContext = std::move(clientContext); - ServerMessage = std::make_shared<TServerMessage>(); - - if (!ClientContext) { - AbortImpl(); - // Grpc and WriteSession is closing right now. - return; - } - - connectContext = ClientContext->CreateContext(); - if (delay) - connectDelayContext = ClientContext->CreateContext(); - connectTimeoutContext = ClientContext->CreateContext(); - - // Previous operations contexts. - - // Set new context - prevConnectContext = std::exchange(ConnectContext, connectContext); - prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); - prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext); - Y_ASSERT(ConnectContext); - Y_ASSERT(ConnectTimeoutContext); - - // Cancel previous operations. - NPersQueue::Cancel(prevConnectContext); - if (prevConnectDelayContext) - NPersQueue::Cancel(prevConnectDelayContext); - NPersQueue::Cancel(prevConnectTimeoutContext); - Y_ASSERT(connectContext); - Y_ASSERT(connectTimeoutContext); - reqSettings = TRpcRequestSettings::Make(Settings); - } - auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext] - (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); - } - }; - - auto connectTimeoutCallback = [weakThis = weak_from_this(), connectTimeoutContext = connectTimeoutContext] - (bool ok) { - if (ok) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnectTimeout(connectTimeoutContext); - } - } - }; - - connectionFactory->CreateProcessor( - std::move(connectCallback), - reqSettings, - std::move(connectContext), - TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. - std::move(connectTimeoutContext), - std::move(connectTimeoutCallback), - delay, - std::move(connectDelayContext) - ); -} - -// RPC callback. -void TWriteSession::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { - DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); - THandleResult handleResult; - with_lock (Lock) { - if (ConnectTimeoutContext == connectTimeoutContext) { - NPersQueue::Cancel(ConnectContext); - ConnectContext = nullptr; - ConnectTimeoutContext = nullptr; - ConnectDelayContext = nullptr; - } else { - return; - } - TStringBuilder description; - description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; - handleResult = RestartImpl(TPlainStatus(EStatus::TIMEOUT, description)); - if (handleResult.DoStop) { - CloseImpl( - EStatus::TIMEOUT, - description - ); - } - } - ProcessHandleResult(handleResult); -} - -// RPC callback. -void TWriteSession::OnConnect( - TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext -) { - THandleResult handleResult; - with_lock (Lock) { - if (ConnectContext == connectContext) { - NPersQueue::Cancel(ConnectTimeoutContext); - ConnectContext = nullptr; - ConnectTimeoutContext = nullptr; - ConnectDelayContext = nullptr; - - if (st.Ok()) { - Processor = std::move(processor); - InitImpl(); - // Still should call ReadFromProcessor(); - } - } else { - return; - } - if (!st.Ok()) { - handleResult = RestartImpl(st); - if (handleResult.DoStop) { - CloseImpl( - st.Status, - NPersQueue::MakeIssueWithSubIssues( - TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint - << "\". Attempts done: " << ConnectionAttemptsDone, - st.Issues - ) - ); - } - } - } - if (st.Ok()) - ReadFromProcessor(); // Out of Init - ProcessHandleResult(handleResult); + return Impl->EventsQueue->WaitEvent(); } -// Produce init request for session. -// Only called under lock. -void TWriteSession::InitImpl() { - TClientMessage req; - auto* init = req.mutable_init_request(); - init->set_path(Settings.Path_); - init->set_producer_id(Settings.ProducerId_); - init->set_message_group_id(Settings.MessageGroupId_); - - for (const auto& attr : Settings.Meta_.Fields) { - (*init->mutable_write_session_meta())[attr.first] = attr.second; - } - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); - WriteToProcessorImpl(std::move(req)); +void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, + TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { + Impl->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); } -// Called under lock. Invokes Processor->Write, which is assumed to be deadlock-safe -void TWriteSession::WriteToProcessorImpl(TWriteSession::TClientMessage&& req) { - Y_ASSERT(Processor); - if (Aborting) - return; - auto callback = [weakThis = weak_from_this(), connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); - } - }; - - Processor->Write(std::move(req), callback); +void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, + TMaybe<TInstant> createTimestamp) { + Impl->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); } -void TWriteSession::ReadFromProcessor() { - Y_ASSERT(Processor); - IProcessor::TPtr prc; - ui64 generation; - with_lock(Lock) { - prc = Processor; - generation = ConnectionGeneration; - } - auto callback = [weakThis = weak_from_this(), connectionGeneration = generation, processor = prc, serverMessage = ServerMessage] - (NGrpc::TGrpcStatus&& grpcStatus) { - if (auto sharedThis = weakThis.lock()) { - sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); - } - }; - prc->Read(ServerMessage.get(), std::move(callback)); -} - -void TWriteSession::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) { - THandleResult handleResult; - with_lock (Lock) { - if (connectionGeneration != ConnectionGeneration) { - return; // Message from previous connection. Ignore. - } - if (Aborting) { - return; - } - if(!status.Ok()) { - handleResult = OnErrorImpl(status); - } - } - ProcessHandleResult(handleResult); -} - -void TWriteSession::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { - TPlainStatus errorStatus; - TProcessSrvMessageResult processResult; - bool needSetValue = false; - if (!grpcStatus.Ok()) { - errorStatus = TPlainStatus(std::move(grpcStatus)); - } - bool doRead = false; - with_lock (Lock) { - UpdateTimedCountersImpl(); - if (connectionGeneration != ConnectionGeneration) { - return; // Message from previous connection. Ignore. - } - if (errorStatus.Ok()) { - if (NPersQueue::IsErrorMessage(*ServerMessage)) { - errorStatus = NPersQueue::MakeErrorFromProto(*ServerMessage); - } else { - processResult = ProcessServerMessageImpl(); - needSetValue = !InitSeqNoSetDone && processResult.InitSeqNo.Defined() && (InitSeqNoSetDone = true); - if (errorStatus.Ok() && processResult.Ok) { - doRead = true; - } - } - } - } - if (doRead) - ReadFromProcessor(); - - with_lock(Lock) { - if (!errorStatus.Ok()) { - if (processResult.Ok) { // Otherwise, OnError was already called - processResult.HandleResult = RestartImpl(errorStatus); - } - } - if (processResult.HandleResult.DoStop) { - CloseImpl(std::move(errorStatus)); - } - } - for (auto& event : processResult.Events) { - EventsQueue->PushEvent(std::move(event)); - } - if (needSetValue) { - InitSeqNoPromise.SetValue(*processResult.InitSeqNo); - processResult.HandleResult.DoSetSeqNo = false; // Redundant. Just in case. - } - ProcessHandleResult(processResult.HandleResult); -} - -TStringBuilder TWriteSession::LogPrefix() const { - return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; -} - -TString TWriteSessionEvent::TAcksEvent::DebugString() const { - TStringBuilder res; - res << "AcksEvent:"; - for (auto& ack : Acks) { - res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State; - if (ack.Details) { - res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId; - } - res << " }"; - } - if (!Acks.empty() && Acks.back().Stat) { - auto& stat = Acks.back().Stat; - res << " write stat: Write time " << stat->WriteTime - << " minimal time in partition queue " << stat->MinTimeInPartitionQueue - << " maximal time in partition queue " << stat->MaxTimeInPartitionQueue - << " partition quoted time " << stat->PartitionQuotedTime - << " topic quoted time " << stat->TopicQuotedTime; - } - return res; -} - -TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const { - return "ReadyToAcceptEvent"; -} - - -TWriteSession::TProcessSrvMessageResult TWriteSession::ProcessServerMessageImpl() { - TProcessSrvMessageResult result; - switch (ServerMessage->GetServerMessageCase()) { - case TServerMessage::SERVER_MESSAGE_NOT_SET: { - SessionEstablished = false; - result.HandleResult = OnErrorImpl({ - static_cast<NYdb::EStatus>(ServerMessage->status()), - {NYql::TIssue{ServerMessage->DebugString()}} - }); - result.Ok = false; - break; - } - case TServerMessage::kInitResponse: { - const auto& initResponse = ServerMessage->init_response(); - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); - SessionId = initResponse.session_id(); - PartitionId = initResponse.partition_id(); - ui64 newLastSeqNo = initResponse.last_seq_no(); - // SeqNo increased, so there's a risk of loss, apply SeqNo shift. - // MinUnsentSeqNo must be > 0 if anything was ever sent yet - if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { - SeqNoShift = newLastSeqNo - MinUnsentSeqNo; - } - result.InitSeqNo = newLastSeqNo; - LastSeqNo = newLastSeqNo; - - SessionEstablished = true; - LastCountersUpdateTs = TInstant::Now(); - SessionStartedTs = TInstant::Now(); - OnErrorResolved(); - - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - // Kickstart send after session reestablishment - SendImpl(); - break; - } - case TServerMessage::kWriteResponse: { - TWriteSessionEvent::TAcksEvent acksEvent; - const auto& batchWriteResponse = ServerMessage->write_response(); - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() - ); - TWriteStat::TPtr writeStat = new TWriteStat{}; - const auto& stat = batchWriteResponse.write_statistics(); - - auto durationConv = [](const ::google::protobuf::Duration& dur) { - return TDuration::MilliSeconds(::google::protobuf::util::TimeUtil::DurationToMilliseconds(dur)); - }; - - writeStat->WriteTime = durationConv(stat.persisting_time()); - writeStat->MinTimeInPartitionQueue = durationConv(stat.min_queue_wait_time()); - writeStat->MaxTimeInPartitionQueue = durationConv(stat.max_queue_wait_time()); - writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time()); - writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time()); - - for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) { - // TODO: Fill writer statistics - auto ack = batchWriteResponse.acks(messageIndex); - ui64 sequenceNumber = ack.seq_no(); - - Y_VERIFY(ack.has_written() || ack.has_skipped()); - auto msgWriteStatus = ack.has_written() - ? TWriteSessionEvent::TWriteAck::EES_WRITTEN - : (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN - ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN - : TWriteSessionEvent::TWriteAck::EES_DISCARDED); - - ui64 offset = ack.has_written() ? ack.written().offset() : 0; - - acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ - sequenceNumber - SeqNoShift, - msgWriteStatus, - TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { - offset, - PartitionId, - }, - writeStat, - }); - - if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } - } - //EventsQueue->PushEvent(std::move(acksEvent)); - result.Events.emplace_back(std::move(acksEvent)); - break; - } - case TServerMessage::kUpdateTokenResponse: { - UpdateTokenInProgress = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); - UpdateTokenIfNeededImpl(); - break; - } - } - return result; -} - -bool TWriteSession::CleanupOnAcknowledged(ui64 sequenceNumber) { - bool result = false; - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); - UpdateTimedCountersImpl(); - const auto& sentFront = SentOriginalMessages.front(); - ui64 size = 0; - ui64 compressedSize = 0; - if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { - auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); - result = memoryUsage.NowOk && !memoryUsage.WasOk; - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - const auto& front = SentPackedMessage.front(); - if (front.Compressed) { - compressedSize = front.Data.size(); - } else { - size = front.Data.size(); - } - - (*Counters->MessagesWritten) += front.MessageCount; - (*Counters->MessagesInflight) -= front.MessageCount; - (*Counters->BytesWritten) += front.OriginalSize; - - SentPackedMessage.pop(); - } else { - size = sentFront.Size; - (*Counters->BytesWritten) += sentFront.Size; - (*Counters->MessagesWritten)++; - (*Counters->MessagesInflight)--; - } - - (*Counters->BytesInflightCompressed) -= compressedSize; - (*Counters->BytesWrittenCompressed) += compressedSize; - (*Counters->BytesInflightUncompressed) -= size; - - Y_VERIFY(Counters->BytesInflightCompressed->Val() >= 0); - Y_VERIFY(Counters->BytesInflightUncompressed->Val() >= 0); - - Y_VERIFY(sentFront.SeqNo == sequenceNumber); - - (*Counters->BytesInflightTotal) = MemoryUsage; - SentOriginalMessages.pop(); - return result; -} - -// Only called under Lock -TMemoryUsageChange TWriteSession::OnMemoryUsageChangedImpl(i64 diff) { - bool wasOk = MemoryUsage <= Settings.MaxMemoryUsage_; - //if (diff < 0) { - // Y_VERIFY(MemoryUsage >= static_cast<size_t>(std::abs(diff))); - //} - MemoryUsage += diff; - bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; - if (wasOk != nowOk) { - if (wasOk) { - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Estimated memory usage " << MemoryUsage - << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" - ); - } - else { - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" - ); - } - } - return {wasOk, nowOk}; -} - -TBuffer CompressBuffer(TVector<TStringBuf>& data, ECodec codec, i32 level) { - TBuffer result; - THolder<IOutputStream> coder = NCompressionDetails::CreateCoder(codec, result, level); - for (auto& buffer : data) { - coder->Write(buffer.data(), buffer.size()); - } - coder->Finish(); - return result; -} - -// May call OnCompressed with sync executor. No external lock. -void TWriteSession::CompressImpl(TBlock&& block_) { - auto weakThis = weak_from_this(); - bool isSyncCompression = !CompressionExecutor->IsAsync(); - Y_VERIFY(block_.Valid); - - std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); - blockPtr->Move(block_); - auto lambda = [weakThis, codec = Settings.Codec_, level = Settings.CompressionLevel_, - isSyncCompression, blockPtr]() mutable - { - if (auto sharedThis = weakThis.lock()) { - Y_VERIFY(!blockPtr->Compressed); - - auto compressedData = CompressBuffer( - blockPtr->OriginalDataRefs, codec, level - ); - Y_VERIFY(!compressedData.Empty()); - blockPtr->Data = std::move(compressedData); - blockPtr->Compressed = true; - blockPtr->CodecID = static_cast<ui32>(sharedThis->Settings.Codec_); - sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); - } - }; - - CompressionExecutor->Post(lambda); -} - -void TWriteSession::OnCompressed(TBlock&& block, bool isSyncCompression) { - TMemoryUsageChange memoryUsage; - if (!isSyncCompression) { - with_lock(Lock) { - memoryUsage = OnCompressedImpl(std::move(block)); - } - } else { - memoryUsage = OnCompressedImpl(std::move(block)); - } - if (memoryUsage.NowOk && !memoryUsage.WasOk) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } -} - -//Called under lock or synchronously if compression is sync -TMemoryUsageChange TWriteSession::OnCompressedImpl(TBlock&& block) { - UpdateTimedCountersImpl(); - Y_VERIFY(block.Valid); - auto memoryUsage = OnMemoryUsageChangedImpl(static_cast<i64>(block.Data.size()) - block.OriginalMemoryUsage); - (*Counters->BytesInflightUncompressed) -= block.OriginalSize; - (*Counters->BytesInflightCompressed) += block.Data.size(); - - PackedMessagesToSend.emplace(std::move(block)); - SendImpl(); - return memoryUsage; -} - -// Only called under lock -void TWriteSession::ResetForRetryImpl() { - SessionEstablished = false; - const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size(); - const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size(); - while (!SentPackedMessage.empty()) { - PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); - SentPackedMessage.pop(); - } - ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; - std::queue<TOriginalMessage> freshOriginalMessagesToSend; - OriginalMessagesToSend.swap(freshOriginalMessagesToSend); - while (!SentOriginalMessages.empty()) { - OriginalMessagesToSend.emplace(std::move(SentOriginalMessages.front())); - SentOriginalMessages.pop(); - } - while (!freshOriginalMessagesToSend.empty()) { - OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); - freshOriginalMessagesToSend.pop(); - } - if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) - minSeqNo = OriginalMessagesToSend.front().SeqNo; - MinUnsentSeqNo = minSeqNo; - Y_VERIFY(PackedMessagesToSend.size() == totalPackedMessages); - Y_VERIFY(OriginalMessagesToSend.size() == totalOriginalMessages); -} - -// Called from client Write() methods. With lock -void TWriteSession::FlushWriteIfRequiredImpl() { - - if (!CurrentBatch.Empty() && !CurrentBatch.FlushRequested) { - MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); - if (TInstant::Now() - CurrentBatch.StartedAt >= Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) - || CurrentBatch.CurrentSize >= Settings.BatchFlushSizeBytes_.GetOrElse(0) - || CurrentBatch.CurrentSize >= MaxBlockSize - || CurrentBatch.Messages.size() >= MaxBlockMessageCount - || CurrentBatch.HasCodec() - ) { - WriteBatchImpl(); - return; - } - } -} - - -// Involves compression, but still called under lock. -size_t TWriteSession::WriteBatchImpl() { - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " - << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo - ); - - Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount); - - const bool skipCompression = Settings.Codec_ == ECodec::RAW || CurrentBatch.HasCodec(); - if (!skipCompression && Settings.CompressionExecutor_->IsAsync()) { - MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); - } - - size_t size = 0; - for (size_t i = 0; i != CurrentBatch.Messages.size();) { - TBlock block{}; - for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { - auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; - auto createTs = CurrentBatch.Messages[i].CreatedAt; - - if (!block.MessageCount) { - block.Offset = sequenceNumber; - } - - block.MessageCount += 1; - const auto& datum = CurrentBatch.Messages[i].DataRef; - block.OriginalSize += datum.size(); - block.OriginalMemoryUsage = CurrentBatch.Data.size(); - block.OriginalDataRefs.emplace_back(datum); - if (CurrentBatch.Messages[i].Codec.Defined()) { - Y_VERIFY(CurrentBatch.Messages.size() == 1); - block.CodecID = static_cast<ui32>(*CurrentBatch.Messages[i].Codec); - block.OriginalSize = CurrentBatch.Messages[i].OriginalSize; - block.Compressed = false; - } - size += datum.size(); - UpdateTimedCountersImpl(); - (*Counters->BytesInflightUncompressed) += datum.size(); - (*Counters->MessagesInflight)++; - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); - } - block.Data = std::move(CurrentBatch.Data); - if (skipCompression) { - PackedMessagesToSend.emplace(std::move(block)); - } else { - CompressImpl(std::move(block)); - } - } - CurrentBatch.Reset(); - if (skipCompression) { - SendImpl(); - } - return size; -} - -size_t GetMaxGrpcMessageSize() { - return 120_MB; -} - -bool TWriteSession::IsReadyToSendNextImpl() const { - if (!SessionEstablished) { - return false; - } - if (Aborting) - return false; - if (PackedMessagesToSend.empty()) { - return false; - } - Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); - Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); - - return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; -} - - -void TWriteSession::UpdateTokenIfNeededImpl() { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); - - if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) - return; - TClientMessage clientMessage; - auto* updateRequest = clientMessage.mutable_update_token_request(); - auto token = DbDriverState->CredentialsProvider->GetAuthInfo(); - if (token == PrevToken) - return; - UpdateTokenInProgress = true; - updateRequest->set_token(token); - PrevToken = token; - - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); - - Processor->Write(std::move(clientMessage)); -} - -void TWriteSession::SendImpl() { - // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB - while(IsReadyToSendNextImpl()) { - TClientMessage clientMessage; - auto* writeRequest = clientMessage.mutable_write_request(); - - // Sent blocks while we can without messages reordering - while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) { - const auto& block = PackedMessagesToSend.top(); - Y_VERIFY(block.Valid); - writeRequest->set_codec(static_cast<i32>(block.CodecID)); - Y_VERIFY(block.MessageCount == 1); - for (size_t i = 0; i != block.MessageCount; ++i) { - Y_VERIFY(!OriginalMessagesToSend.empty()); - - auto& message = OriginalMessagesToSend.front(); - - auto* msgData = writeRequest->add_messages(); - - - msgData->set_seq_no(message.SeqNo + SeqNoShift); - *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); - - SentOriginalMessages.emplace(std::move(message)); - OriginalMessagesToSend.pop(); - - msgData->set_uncompressed_size(block.OriginalSize); - if (block.Compressed) - msgData->set_data(block.Data.data(), block.Data.size()); - else { - for (auto& buffer: block.OriginalDataRefs) { - msgData->set_data(buffer.data(), buffer.size()); - } - } - } - - - TBlock moveBlock; - moveBlock.Move(block); - SentPackedMessage.emplace(std::move(moveBlock)); - PackedMessagesToSend.pop(); - } - UpdateTokenIfNeededImpl(); - DbDriverState->Log.Write( - TLOG_DEBUG, - LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) (" - << OriginalMessagesToSend.size() << " left), first sequence number is " - << writeRequest->messages(0).seq_no() - ); - Processor->Write(std::move(clientMessage)); - } -} - -// Client method, no Lock bool TWriteSession::Close(TDuration closeTimeout) { - if (AtomicGet(Aborting)) - return false; - DbDriverState->Log.Write( - TLOG_INFO, - LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" - ); - auto startTime = TInstant::Now(); - auto remaining = closeTimeout; - bool ready = false; - bool needSetSeqNoValue = false; - while (remaining > TDuration::Zero()) { - with_lock(Lock) { - if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) { - ready = true; - } - if (AtomicGet(Aborting)) - break; - } - if (ready) { - break; - } - remaining = closeTimeout - (TInstant::Now() - startTime); - Sleep(Min(TDuration::MilliSeconds(100), remaining)); - } - with_lock(Lock) { - ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting); - } - with_lock(Lock) { - CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); - needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); - } - if (needSetSeqNoValue) { - InitSeqNoPromise.SetException("session closed"); - } - if (ready) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); - } else { - DbDriverState->Log.Write( - TLOG_WARNING, - LogPrefix() << "Write session: could not confirm all writes in time" - << " or session aborted, perform hard shutdown" - ); - } - return ready; -} - -void TWriteSession::HandleWakeUpImpl() { - FlushWriteIfRequiredImpl(); - if (AtomicGet(Aborting)) { - return; - } - auto callback = [weakThis = this->weak_from_this()] (bool ok) - { - if (!ok) - return; - if (auto sharedThis = weakThis.lock()) { - with_lock(sharedThis->Lock) { - sharedThis->HandleWakeUpImpl(); - } - } - }; - auto enqueueTokenCallback = [weakThis = this->weak_from_this()] (bool ok) { - if (!ok) - return; - if (auto sharedThis = weakThis.lock()) { - sharedThis->EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - } - }; - if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { - LastTokenUpdate = TInstant::Now(); - UpdateTokenIfNeededImpl(); - } - - const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() - ? WakeupInterval - : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); - Connections->ScheduleCallback(flushAfter, std::move(callback)); -} - -void TWriteSession::UpdateTimedCountersImpl() { - auto now = TInstant::Now(); - auto delta = (now - LastCountersUpdateTs).MilliSeconds(); - double percent = 100.0 / Settings.MaxMemoryUsage_; - - Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); - Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); - Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); - - *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); - LastCountersUpdateTs = now; - if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { - LastCountersLogTs = TInstant::Now(); - -#define LOG_COUNTER(counter) \ - << " " Y_STRINGIZE(counter) ": " \ - << Counters->counter->Val() \ - /**/ - - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() - << "Counters: {" - LOG_COUNTER(Errors) - LOG_COUNTER(CurrentSessionLifetimeMs) - LOG_COUNTER(BytesWritten) - LOG_COUNTER(MessagesWritten) - LOG_COUNTER(BytesWrittenCompressed) - LOG_COUNTER(BytesInflightUncompressed) - LOG_COUNTER(BytesInflightCompressed) - LOG_COUNTER(BytesInflightTotal) - LOG_COUNTER(MessagesInflight) - << " }" - ); - -#undef LOG_COUNTER - } -} - -void TWriteSession::AbortImpl() { - if (!AtomicGet(Aborting)) { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); - AtomicSet(Aborting, 1); - NPersQueue::Cancel(ConnectContext); - NPersQueue::Cancel(ConnectTimeoutContext); - NPersQueue::Cancel(ConnectDelayContext); - if (Processor) - Processor->Cancel(); - - NPersQueue::Cancel(ClientContext); - ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. - } -} - -void TWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); - EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); - AbortImpl(); -} - -void TWriteSession::CloseImpl(EStatus statusCode, const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - CloseImpl(statusCode, std::move(issues)); -} - -void TWriteSession::CloseImpl(TPlainStatus&& status) { - DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); - EventsQueue->Close(TSessionClosedEvent(std::move(status))); - AbortImpl(); + return Impl->Close(closeTimeout); } TWriteSession::~TWriteSession() { - DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); - bool needClose = false; - with_lock(Lock) { - if (!AtomicGet(Aborting)) { - CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); - - needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true); - } - } - if (needClose) - InitSeqNoPromise.SetException("session closed"); + Impl->Close(TDuration::Zero()); + Tracker->AsyncComplete().Wait(); } +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TSimpleBlockingWriteSession + TSimpleBlockingWriteSession::TSimpleBlockingWriteSession( const TWriteSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index 1d46d0be49..7e480c883e 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -1,8 +1,10 @@ #pragma once #include "topic_impl.h" +#include "write_session_impl.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <util/generic/buffer.h> @@ -10,140 +12,8 @@ namespace NYdb::NTopic { -inline const TString& GetCodecId(const ECodec codec) { - static THashMap<ECodec, TString> idByCodec{ - {ECodec::RAW, TString(1, '\0')}, - {ECodec::GZIP, "\1"}, - {ECodec::LZOP, "\2"}, - {ECodec::ZSTD, "\3"} - }; - Y_VERIFY(idByCodec.contains(codec)); - return idByCodec[codec]; -} - -class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> { - using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; - -public: - TWriteSessionEventsQueue(const TWriteSessionSettings& settings) - : TParent(settings) - {} - - void PushEvent(TEventInfo eventInfo) { - if (Closed || ApplyHandler(eventInfo)) { - return; - } - - NPersQueue::TWaiter waiter; - with_lock (Mutex) { - Events.emplace(std::move(eventInfo)); - waiter = PopWaiterImpl(); - } - waiter.Signal(); // Does nothing if waiter is empty. - } - - TMaybe<TEvent> GetEvent(bool block = false) { - TMaybe<TEventInfo> eventInfo; - with_lock (Mutex) { - if (block) { - WaitEventsImpl(); - } - if (HasEventsImpl()) { - eventInfo = GetEventImpl(); - } else { - return Nothing(); - } - } - eventInfo->OnUserRetrievedEvent(); - return std::move(eventInfo->Event); - } - - TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) { - TVector<TEventInfo> eventInfos; - with_lock (Mutex) { - if (block) { - WaitEventsImpl(); - } - eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max())); - while (!Events.empty()) { - eventInfos.emplace_back(GetEventImpl()); - if (maxEventsCount && eventInfos.size() >= *maxEventsCount) { - break; - } - } - if (CloseEvent && Events.empty() && (!maxEventsCount || eventInfos.size() < *maxEventsCount)) { - eventInfos.push_back({*CloseEvent}); - } - } - - TVector<TEvent> result; - result.reserve(eventInfos.size()); - for (TEventInfo& eventInfo : eventInfos) { - eventInfo.OnUserRetrievedEvent(); - result.emplace_back(std::move(eventInfo.Event)); - } - return result; - } - - void Close(const TSessionClosedEvent& event) { - NPersQueue::TWaiter waiter; - with_lock (Mutex) { - CloseEvent = event; - Closed = true; - waiter = NPersQueue::TWaiter(Waiter.ExtractPromise(), this); - } - - TEventInfo info(event); - ApplyHandler(info); - - waiter.Signal(); - } - -private: - struct THandlersVisitor : public TParent::TBaseHandlersVisitor { - using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; -#define DECLARE_HANDLER(type, handler, answer) \ - bool operator()(type& event) { \ - if (Settings.EventHandlers_.handler) { \ - Settings.EventHandlers_.handler(event); \ - return answer; \ - } \ - return false; \ - } \ - /**/ - DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); - DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); - DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied - -#undef DECLARE_HANDLER - bool Visit() { - return std::visit(*this, Event); - } - - }; - - bool ApplyHandler(TEventInfo& eventInfo) { - THandlersVisitor visitor(Settings, eventInfo.Event); - return visitor.Visit(); - } - - TEventInfo GetEventImpl() { // Assumes that we're under lock and that the event queue has events. - Y_ASSERT(HasEventsImpl()); - if (!Events.empty()) { - TEventInfo event = std::move(Events.front()); - Events.pop(); - RenewWaiterImpl(); - return event; - } - Y_ASSERT(CloseEvent); - return {*CloseEvent}; - } -}; - -struct TMemoryUsageChange { - bool WasOk; //!< MemoryUsage <= Config.MaxMemoryUsage_ before update - bool NowOk; //!< Same, only after update -}; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSession class TWriteSession : public IWriteSession, public std::enable_shared_from_this<TWriteSession> { @@ -151,136 +21,6 @@ private: friend class TSimpleBlockingWriteSession; friend class TTopicClient; - using TClientMessage = Ydb::Topic::StreamWriteMessage::FromClient; - using TServerMessage = Ydb::Topic::StreamWriteMessage::FromServer; - using IWriteSessionConnectionProcessorFactory = - TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory; - using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; - - struct TMessage { - ui64 SeqNo; - TInstant CreatedAt; - TStringBuf DataRef; - TMaybe<ECodec> Codec; - ui32 OriginalSize; // only for coded messages - TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) - : SeqNo(seqNo) - , CreatedAt(createdAt) - , DataRef(data) - , Codec(codec) - , OriginalSize(originalSize) - {} - }; - - struct TMessageBatch { - TBuffer Data; - TVector<TMessage> Messages; - ui64 CurrentSize = 0; - TInstant StartedAt = TInstant::Zero(); - bool Acquired = false; - bool FlushRequested = false; - void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { - if (StartedAt == TInstant::Zero()) - StartedAt = TInstant::Now(); - CurrentSize += codec ? originalSize : data.size(); - Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); - Acquired = false; - } - - bool HasCodec() const { - return Messages.empty() ? false : Messages.front().Codec.Defined(); - } - - bool Acquire() { - if (Acquired || Messages.empty()) - return false; - auto currSize = Data.size(); - Data.Append(Messages.back().DataRef.data(), Messages.back().DataRef.size()); - Messages.back().DataRef = TStringBuf(Data.data() + currSize, Data.size() - currSize); - Acquired = true; - return true; - } - - bool Empty() const noexcept { - return CurrentSize == 0 && Messages.empty(); - } - - void Reset() { - StartedAt = TInstant::Zero(); - Messages.clear(); - Data.Clear(); - Acquired = false; - CurrentSize = 0; - FlushRequested = false; - } - }; - - struct TBlock { - size_t Offset = 0; //!< First message sequence number in the block - size_t MessageCount = 0; - size_t PartNumber = 0; - size_t OriginalSize = 0; - size_t OriginalMemoryUsage = 0; - ui32 CodecID = static_cast<ui32>(ECodec::RAW); - mutable TVector<TStringBuf> OriginalDataRefs; - mutable TBuffer Data; - bool Compressed = false; - mutable bool Valid = true; - - TBlock& operator=(TBlock&&) = default; - TBlock(TBlock&&) = default; - TBlock() = default; - - //For taking ownership by copying from const object, f.e. lambda -> std::function, priority_queue - void Move(const TBlock& rhs) { - Offset = rhs.Offset; - MessageCount = rhs.MessageCount; - PartNumber = rhs.PartNumber; - OriginalSize = rhs.OriginalSize; - OriginalMemoryUsage = rhs.OriginalMemoryUsage; - CodecID = rhs.CodecID; - OriginalDataRefs.swap(rhs.OriginalDataRefs); - Data.Swap(rhs.Data); - Compressed = rhs.Compressed; - - rhs.Data.Clear(); - rhs.OriginalDataRefs.clear(); - } - }; - - struct TOriginalMessage { - ui64 SeqNo; - TInstant CreatedAt; - size_t Size; - TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) - : SeqNo(sequenceNumber) - , CreatedAt(createdAt) - , Size(size) - {} - }; - - //! Block comparer, makes block with smallest offset (first sequence number) appear on top of the PackedMessagesToSend priority queue - struct Greater { - bool operator() (const TBlock& lhs, const TBlock& rhs) { - return lhs.Offset > rhs.Offset; - } - }; - - struct THandleResult { - bool DoRestart = false; - TDuration StartDelay = TDuration::Zero(); - bool DoStop = false; - bool DoSetSeqNo = false; - }; - struct TProcessSrvMessageResult { - THandleResult HandleResult; - TMaybe<ui64> InitSeqNo; - TVector<TWriteSessionEvent::TEvent> Events; - bool Ok = true; - }; - - THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action - public: TWriteSession(const TWriteSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, @@ -309,126 +49,14 @@ public: ~TWriteSession(); // will not call close - destroy everything without acks private: - - TStringBuilder LogPrefix() const; - - void UpdateTokenIfNeededImpl(); - - void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); - - void FlushWriteIfRequiredImpl(); - size_t WriteBatchImpl(); void Start(const TDuration& delay); - void InitWriter(); - - void OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor, - const NGrpc::IQueueClientContextPtr& connectContext); - void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext); - void ResetForRetryImpl(); - THandleResult RestartImpl(const TPlainStatus& status); - void DoConnect(const TDuration& delay, const TString& endpoint); - void InitImpl(); - void ReadFromProcessor(); // Assumes that we're under lock. - void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock. - void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration); - void OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration); - TProcessSrvMessageResult ProcessServerMessageImpl(); - TMemoryUsageChange OnMemoryUsageChangedImpl(i64 diff); - void CompressImpl(TBlock&& block); - void OnCompressed(TBlock&& block, bool isSyncCompression=false); - TMemoryUsageChange OnCompressedImpl(TBlock&& block); - - //TString GetDebugIdentity() const; - TClientMessage GetInitClientMessage(); - bool CleanupOnAcknowledged(ui64 sequenceNumber); - bool IsReadyToSendNextImpl() const; - ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); - void SendImpl(); - void AbortImpl(); - void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); - void CloseImpl(EStatus statusCode, const TString& message); - void CloseImpl(TPlainStatus&& status); - - void OnErrorResolved() { - RetryState = nullptr; - } - void CheckHandleResultImpl(THandleResult& result); - void ProcessHandleResult(THandleResult& result); - void HandleWakeUpImpl(); - void UpdateTimedCountersImpl(); private: - TWriteSessionSettings Settings; - std::shared_ptr<TTopicClient::TImpl> Client; - std::shared_ptr<TGRpcConnectionsImpl> Connections; - TString TargetCluster; - TString InitialCluster; - TString CurrentCluster; - bool OnSeqNoShift = false; - TString PreferredClusterByCDS; - std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; - TDbDriverStatePtr DbDriverState; - TStringType PrevToken; - bool UpdateTokenInProgress = false; - TInstant LastTokenUpdate = TInstant::Zero(); - std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; - NGrpc::IQueueClientContextPtr ClientContext; // Common client context. - NGrpc::IQueueClientContextPtr ConnectContext; - NGrpc::IQueueClientContextPtr ConnectTimeoutContext; - NGrpc::IQueueClientContextPtr ConnectDelayContext; - size_t ConnectionGeneration = 0; - size_t ConnectionAttemptsDone = 0; - TAdaptiveLock Lock; - IProcessor::TPtr Processor; - IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). - std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to. - - TString SessionId; - IExecutor::TPtr Executor; - IExecutor::TPtr CompressionExecutor; - size_t MemoryUsage = 0; //!< Estimated amount of memory used - - TMessageBatch CurrentBatch; - - std::queue<TOriginalMessage> OriginalMessagesToSend; - std::priority_queue<TBlock, std::vector<TBlock>, Greater> PackedMessagesToSend; - //! Messages that are sent but yet not acknowledged - std::queue<TOriginalMessage> SentOriginalMessages; - std::queue<TBlock> SentPackedMessage; - - const size_t MaxBlockSize = std::numeric_limits<size_t>::max(); - const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility - bool Connected = false; - bool Started = false; - TAtomic Aborting = 0; - bool SessionEstablished = false; - ui32 PartitionId = 0; - ui64 LastSeqNo = 0; - ui64 MinUnsentSeqNo = 0; - ui64 SeqNoShift = 0; - TMaybe<bool> AutoSeqNoMode; - bool ValidateSeqNoMode = false; - - NThreading::TPromise<ui64> InitSeqNoPromise; - bool InitSeqNoSetDone = false; - TInstant SessionStartedTs; - TInstant LastCountersUpdateTs = TInstant::Zero(); - TInstant LastCountersLogTs; - TWriterCounters::TPtr Counters; - TDuration WakeupInterval; - -protected: - ui64 MessagesAcquired = 0; + std::shared_ptr<NPersQueue::TImplTracker> Tracker; + std::shared_ptr<TWriteSessionImpl> Impl; }; class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { -private: - using TClientMessage = TWriteSession::TClientMessage; - using TServerMessage = TWriteSession::TServerMessage; - using IWriteSessionConnectionProcessorFactory = TWriteSession::IWriteSessionConnectionProcessorFactory; - using IProcessor = TWriteSession::IProcessor; - public: TSimpleBlockingWriteSession( const TWriteSessionSettings& settings, diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp new file mode 100644 index 0000000000..3af50d9986 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -0,0 +1,1179 @@ +#include "write_session.h" +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <library/cpp/string_utils/url/url.h> + +#include <google/protobuf/util/time_util.h> + +#include <util/generic/store_policy.h> +#include <util/generic/utility.h> +#include <util/stream/buffer.h> + + +namespace NYdb::NTopic { +using ::NMonitoring::TDynamicCounterPtr; +using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr; + + +const TDuration UPDATE_TOKEN_PERIOD = TDuration::Hours(1); + +namespace NCompressionDetails { + THolder<IOutputStream> CreateCoder(ECodec codec, TBuffer& result, int quality); +} + +#define HISTOGRAM_SETUP ::NMonitoring::ExplicitHistogram({0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}) +TWriterCounters::TWriterCounters(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters) { + Errors = counters->GetCounter("errors", true); + CurrentSessionLifetimeMs = counters->GetCounter("currentSessionLifetimeMs", false); + BytesWritten = counters->GetCounter("bytesWritten", true); + MessagesWritten = counters->GetCounter("messagesWritten", true); + BytesWrittenCompressed = counters->GetCounter("bytesWrittenCompressed", true); + BytesInflightUncompressed = counters->GetCounter("bytesInflightUncompressed", false); + BytesInflightCompressed = counters->GetCounter("bytesInflightCompressed", false); + BytesInflightTotal = counters->GetCounter("bytesInflightTotal", false); + MessagesInflight = counters->GetCounter("messagesInflight", false); + + TotalBytesInflightUsageByTime = counters->GetHistogram("totalBytesInflightUsageByTime", HISTOGRAM_SETUP); + UncompressedBytesInflightUsageByTime = counters->GetHistogram("uncompressedBytesInflightUsageByTime", HISTOGRAM_SETUP); + CompressedBytesInflightUsageByTime = counters->GetHistogram("compressedBytesInflightUsageByTime", HISTOGRAM_SETUP); +} +#undef HISTOGRAM_SETUP + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSessionImpl + +TWriteSessionImpl::TWriteSessionImpl( + const TWriteSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState, + std::shared_ptr<NPersQueue::TImplTracker> tracker) + : Settings(settings) + , Client(std::move(client)) + , Connections(std::move(connections)) + , DbDriverState(std::move(dbDriverState)) + , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") + , Tracker(tracker) + , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings, Tracker)) + , InitSeqNoPromise(NThreading::NewPromise<ui64>()) + , WakeupInterval( + Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? + std::min(Settings.BatchFlushInterval_.GetOrElse(TDuration::Seconds(1)) / 5, TDuration::MilliSeconds(100)) + : + TDuration::MilliSeconds(100) + ) +{ + if (!Settings.RetryPolicy_) { + Settings.RetryPolicy_ = IRetryPolicy::GetDefaultPolicy(); + } + if (Settings.Counters_.Defined()) { + Counters = *Settings.Counters_; + } else { + Counters = MakeIntrusive<TWriterCounters>(new ::NMonitoring::TDynamicCounters()); + } + +} + +void TWriteSessionImpl::Start(const TDuration& delay) { + ++ConnectionAttemptsDone; + if (!Started) { + with_lock(Lock) { + HandleWakeUpImpl(); + } + InitWriter(); + } + Started = true; + + DoConnect(delay, DbDriverState->DiscoveryEndpoint); +} + +TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStatus& status) { + Y_VERIFY(Lock.IsLocked()); + + THandleResult result; + if (AtomicGet(Aborting)) { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart"); + return result; + } + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Got error. Status: " << status.Status + << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues) + ); + SessionEstablished = false; + TMaybe<TDuration> nextDelay = TDuration::Zero(); + if (!RetryState) { + RetryState = Settings.RetryPolicy_->CreateRetryState(); + } + nextDelay = RetryState->GetNextRetryDelay(status.Status); + + if (nextDelay) { + result.StartDelay = *nextDelay; + result.DoRestart = true; + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms" + ); + ResetForRetryImpl(); + + } else { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will not restart after a fatal error"); + result.DoStop = true; + CheckHandleResultImpl(result); + } + return result; +} + +void TWriteSessionImpl::InitWriter() { // No Lock, very initial start - no race yet as well. + CompressionExecutor = Settings.CompressionExecutor_; + IExecutor::TPtr executor; + executor = CreateSyncExecutor(); + executor->Start(); + Executor = std::move(executor); + + Settings.CompressionExecutor_->Start(); + Settings.EventHandlers_.HandlersExecutor_->Start(); + +} +// Client method +NThreading::TFuture<ui64> TWriteSessionImpl::GetInitSeqNo() { + if (Settings.ValidateSeqNo_) { + if (AutoSeqNoMode.Defined() && *AutoSeqNoMode) { + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Cannot call GetInitSeqNo in Auto SeqNo mode"); + ThrowFatalError("Cannot call GetInitSeqNo in Auto SeqNo mode"); + } + else + AutoSeqNoMode = false; + } + return InitSeqNoPromise.GetFuture(); +} + +TString DebugString(const TWriteSessionEvent::TEvent& event) { + return std::visit([](const auto& ev) { return ev.DebugString(); }, event); +} + +// Client method +TMaybe<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvent(bool block) { + return EventsQueue->GetEvent(block); +} + +// Client method +TVector<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { + return EventsQueue->GetEvents(block, maxEventsCount); +} + +ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { + Y_VERIFY(Lock.IsLocked()); + + ui64 seqNoValue = LastSeqNo + 1; + if (!AutoSeqNoMode.Defined()) { + AutoSeqNoMode = !seqNo.Defined(); + //! Disable SeqNo shift for manual SeqNo mode; + if (seqNo.Defined()) { + OnSeqNoShift = false; + SeqNoShift = 0; + } + } + if (seqNo.Defined()) { + if (*AutoSeqNoMode) { + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); + ThrowFatalError( + "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" + ); + } else { + seqNoValue = *seqNo; + } + } else if (!(*AutoSeqNoMode)) { + DbDriverState->Log.Write( + TLOG_ERR, + LogPrefix() << "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); + ThrowFatalError( + "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" + ); + } + LastSeqNo = seqNoValue; + return seqNoValue; +} +inline void TWriteSessionImpl::CheckHandleResultImpl(THandleResult& result) { + Y_VERIFY(Lock.IsLocked()); + + result.DoSetSeqNo = result.DoStop && !InitSeqNoSetDone && (InitSeqNoSetDone = true); +} + +void TWriteSessionImpl::ProcessHandleResult(THandleResult& result) { + if (result.DoRestart) { + Start(result.StartDelay); + } else if (result.DoSetSeqNo) { + InitSeqNoPromise.SetException("session closed"); + } +} + +NThreading::TFuture<void> TWriteSessionImpl::WaitEvent() { + return EventsQueue->WaitEvent(); +} + +// Client method. +void TWriteSessionImpl::WriteInternal( + TContinuationToken&&, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + TInstant createdAtValue = createTimestamp.Defined() ? *createTimestamp : TInstant::Now(); + bool readyToAccept = false; + size_t bufferSize = data.size(); + with_lock(Lock) { + CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); + + FlushWriteIfRequiredImpl(); + readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; + } + if (readyToAccept) { + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } +} + +// Client method. +void TWriteSessionImpl::WriteEncoded( + TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); +} + +void TWriteSessionImpl::Write( + TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp + ) { + WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); +} + + +TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStatus&& status) { + Y_VERIFY(Lock.IsLocked()); + + (*Counters->Errors)++; + auto result = RestartImpl(status); + if (result.DoStop) { + CloseImpl(status.Status, std::move(status.Issues)); + } + return result; +} + +// No lock +void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint); + + NGrpc::IQueueClientContextPtr prevConnectContext; + NGrpc::IQueueClientContextPtr prevConnectTimeoutContext; + NGrpc::IQueueClientContextPtr prevConnectDelayContext; + NGrpc::IQueueClientContextPtr connectContext = nullptr; + NGrpc::IQueueClientContextPtr connectDelayContext = nullptr; + NGrpc::IQueueClientContextPtr connectTimeoutContext = nullptr; + TRpcRequestSettings reqSettings; + std::shared_ptr<IWriteSessionConnectionProcessorFactory> connectionFactory; + + // Callbacks + std::function<void(TPlainStatus&&, typename IProcessor::TPtr&&)> connectCallback; + std::function<void(bool)> connectTimeoutCallback; + + with_lock(Lock) { + if (Aborting) { + return; + } + ++ConnectionGeneration; + auto subclient = Client; + connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory(); + auto clientContext = subclient->CreateContext(); + ConnectionFactory = connectionFactory; + + ClientContext = std::move(clientContext); + ServerMessage = std::make_shared<TServerMessage>(); + + if (!ClientContext) { + AbortImpl(); + // Grpc and WriteSession is closing right now. + return; + } + + connectContext = ClientContext->CreateContext(); + if (delay) + connectDelayContext = ClientContext->CreateContext(); + connectTimeoutContext = ClientContext->CreateContext(); + + // Previous operations contexts. + + // Set new context + prevConnectContext = std::exchange(ConnectContext, connectContext); + prevConnectTimeoutContext = std::exchange(ConnectTimeoutContext, connectTimeoutContext); + prevConnectDelayContext = std::exchange(ConnectDelayContext, connectDelayContext); + Y_ASSERT(ConnectContext); + Y_ASSERT(ConnectTimeoutContext); + + // Cancel previous operations. + NPersQueue::Cancel(prevConnectContext); + if (prevConnectDelayContext) + NPersQueue::Cancel(prevConnectDelayContext); + NPersQueue::Cancel(prevConnectTimeoutContext); + Y_ASSERT(connectContext); + Y_ASSERT(connectTimeoutContext); + reqSettings = TRpcRequestSettings::Make(Settings); + + connectCallback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectContext = connectContext] + (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); + }; + + connectTimeoutCallback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectTimeoutContext = connectTimeoutContext] + (bool ok) { + if (ok) { + sharedThis->OnConnectTimeout(connectTimeoutContext); + } + }; + } + + connectionFactory->CreateProcessor( + std::move(connectCallback), + reqSettings, + std::move(connectContext), + TDuration::Seconds(30) /* connect timeout */, // TODO: make connect timeout setting. + std::move(connectTimeoutContext), + std::move(connectTimeoutCallback), + delay, + std::move(connectDelayContext) + ); +} + +// RPC callback. +void TWriteSessionImpl::OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext) { + DbDriverState->Log.Write(TLOG_ERR, LogPrefix() << "Write session: connect timeout"); + THandleResult handleResult; + with_lock (Lock) { + if (ConnectTimeoutContext == connectTimeoutContext) { + NPersQueue::Cancel(ConnectContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + } else { + return; + } + TStringBuilder description; + description << "Failed to establish connection to server. Attempts done: " << ConnectionAttemptsDone; + handleResult = RestartImpl(TPlainStatus(EStatus::TIMEOUT, description)); + if (handleResult.DoStop) { + CloseImpl( + EStatus::TIMEOUT, + description + ); + } + } + ProcessHandleResult(handleResult); +} + +// RPC callback. +void TWriteSessionImpl::OnConnect( + TPlainStatus&& st, typename IProcessor::TPtr&& processor, const NGrpc::IQueueClientContextPtr& connectContext +) { + THandleResult handleResult; + with_lock (Lock) { + if (ConnectContext == connectContext) { + NPersQueue::Cancel(ConnectTimeoutContext); + ConnectContext = nullptr; + ConnectTimeoutContext = nullptr; + ConnectDelayContext = nullptr; + + if (st.Ok()) { + Processor = std::move(processor); + InitImpl(); + // Still should call ReadFromProcessor(); + } + } else { + return; + } + if (!st.Ok()) { + handleResult = RestartImpl(st); + if (handleResult.DoStop) { + CloseImpl( + st.Status, + NPersQueue::MakeIssueWithSubIssues( + TStringBuilder() << "Failed to establish connection to server \"" << st.Endpoint + << "\". Attempts done: " << ConnectionAttemptsDone, + st.Issues + ) + ); + } + } + } + if (st.Ok()) + ReadFromProcessor(); // Out of Init + ProcessHandleResult(handleResult); +} + +// Produce init request for session. +void TWriteSessionImpl::InitImpl() { + Y_VERIFY(Lock.IsLocked()); + + TClientMessage req; + auto* init = req.mutable_init_request(); + init->set_path(Settings.Path_); + init->set_producer_id(Settings.ProducerId_); + init->set_message_group_id(Settings.MessageGroupId_); + + for (const auto& attr : Settings.Meta_.Fields) { + (*init->mutable_write_session_meta())[attr.first] = attr.second; + } + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: send init request: "<< req.ShortDebugString()); + WriteToProcessorImpl(std::move(req)); +} + +// Called under lock. Invokes Processor->Write, which is assumed to be deadlock-safe +void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&& req) { + Y_VERIFY(Lock.IsLocked()); + + Y_ASSERT(Processor); + if (Aborting) { + return; + } + auto callback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { + sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); + }; + + Processor->Write(std::move(req), callback); +} + +void TWriteSessionImpl::ReadFromProcessor() { + Y_ASSERT(Processor); + IProcessor::TPtr prc; + ui64 generation; + std::function<void(NGrpc::TGrpcStatus&&)> callback; + with_lock(Lock) { + if (Aborting) { + return; + } + prc = Processor; + generation = ConnectionGeneration; + callback = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + connectionGeneration = generation, + processor = prc, + serverMessage = ServerMessage] + (NGrpc::TGrpcStatus&& grpcStatus) { + sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); + }; + } + prc->Read(ServerMessage.get(), std::move(callback)); +} + +void TWriteSessionImpl::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) { + THandleResult handleResult; + with_lock (Lock) { + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (Aborting) { + return; + } + if(!status.Ok()) { + handleResult = OnErrorImpl(status); + } + } + ProcessHandleResult(handleResult); +} + +void TWriteSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) { + TPlainStatus errorStatus; + TProcessSrvMessageResult processResult; + bool needSetValue = false; + if (!grpcStatus.Ok()) { + errorStatus = TPlainStatus(std::move(grpcStatus)); + } + bool doRead = false; + with_lock (Lock) { + UpdateTimedCountersImpl(); + if (connectionGeneration != ConnectionGeneration) { + return; // Message from previous connection. Ignore. + } + if (errorStatus.Ok()) { + if (NPersQueue::IsErrorMessage(*ServerMessage)) { + errorStatus = NPersQueue::MakeErrorFromProto(*ServerMessage); + } else { + processResult = ProcessServerMessageImpl(); + needSetValue = !InitSeqNoSetDone && processResult.InitSeqNo.Defined() && (InitSeqNoSetDone = true); + if (errorStatus.Ok() && processResult.Ok) { + doRead = true; + } + } + } + } + if (doRead) + ReadFromProcessor(); + + with_lock(Lock) { + if (!errorStatus.Ok()) { + if (processResult.Ok) { // Otherwise, OnError was already called + processResult.HandleResult = RestartImpl(errorStatus); + } + } + if (processResult.HandleResult.DoStop) { + CloseImpl(std::move(errorStatus)); + } + } + for (auto& event : processResult.Events) { + EventsQueue->PushEvent(std::move(event)); + } + if (needSetValue) { + InitSeqNoPromise.SetValue(*processResult.InitSeqNo); + processResult.HandleResult.DoSetSeqNo = false; // Redundant. Just in case. + } + ProcessHandleResult(processResult.HandleResult); +} + +TStringBuilder TWriteSessionImpl::LogPrefix() const { + return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] "; +} + +TString TWriteSessionEvent::TAcksEvent::DebugString() const { + TStringBuilder res; + res << "AcksEvent:"; + for (auto& ack : Acks) { + res << " { seqNo : " << ack.SeqNo << ", State : " << ack.State; + if (ack.Details) { + res << ", offset : " << ack.Details->Offset << ", partitionId : " << ack.Details->PartitionId; + } + res << " }"; + } + if (!Acks.empty() && Acks.back().Stat) { + auto& stat = Acks.back().Stat; + res << " write stat: Write time " << stat->WriteTime + << " minimal time in partition queue " << stat->MinTimeInPartitionQueue + << " maximal time in partition queue " << stat->MaxTimeInPartitionQueue + << " partition quoted time " << stat->PartitionQuotedTime + << " topic quoted time " << stat->TopicQuotedTime; + } + return res; +} + +TString TWriteSessionEvent::TReadyToAcceptEvent::DebugString() const { + return "ReadyToAcceptEvent"; +} + + +TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMessageImpl() { + Y_VERIFY(Lock.IsLocked()); + + TProcessSrvMessageResult result; + switch (ServerMessage->GetServerMessageCase()) { + case TServerMessage::SERVER_MESSAGE_NOT_SET: { + SessionEstablished = false; + result.HandleResult = OnErrorImpl({ + static_cast<NYdb::EStatus>(ServerMessage->status()), + {NYql::TIssue{ServerMessage->DebugString()}} + }); + result.Ok = false; + break; + } + case TServerMessage::kInitResponse: { + const auto& initResponse = ServerMessage->init_response(); + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session established. Init response: " << initResponse.ShortDebugString()); + SessionId = initResponse.session_id(); + PartitionId = initResponse.partition_id(); + ui64 newLastSeqNo = initResponse.last_seq_no(); + // SeqNo increased, so there's a risk of loss, apply SeqNo shift. + // MinUnsentSeqNo must be > 0 if anything was ever sent yet + if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { + SeqNoShift = newLastSeqNo - MinUnsentSeqNo; + } + result.InitSeqNo = newLastSeqNo; + LastSeqNo = newLastSeqNo; + + SessionEstablished = true; + LastCountersUpdateTs = TInstant::Now(); + SessionStartedTs = TInstant::Now(); + OnErrorResolved(); + + //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + // Kickstart send after session reestablishment + SendImpl(); + break; + } + case TServerMessage::kWriteResponse: { + TWriteSessionEvent::TAcksEvent acksEvent; + const auto& batchWriteResponse = ServerMessage->write_response(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Write session got write response: " << batchWriteResponse.ShortDebugString() + ); + TWriteStat::TPtr writeStat = new TWriteStat{}; + const auto& stat = batchWriteResponse.write_statistics(); + + auto durationConv = [](const ::google::protobuf::Duration& dur) { + return TDuration::MilliSeconds(::google::protobuf::util::TimeUtil::DurationToMilliseconds(dur)); + }; + + writeStat->WriteTime = durationConv(stat.persisting_time()); + writeStat->MinTimeInPartitionQueue = durationConv(stat.min_queue_wait_time()); + writeStat->MaxTimeInPartitionQueue = durationConv(stat.max_queue_wait_time()); + writeStat->PartitionQuotedTime = durationConv(stat.partition_quota_wait_time()); + writeStat->TopicQuotedTime = durationConv(stat.topic_quota_wait_time()); + + for (size_t messageIndex = 0, endIndex = batchWriteResponse.acks_size(); messageIndex != endIndex; ++messageIndex) { + // TODO: Fill writer statistics + auto ack = batchWriteResponse.acks(messageIndex); + ui64 sequenceNumber = ack.seq_no(); + + Y_VERIFY(ack.has_written() || ack.has_skipped()); + auto msgWriteStatus = ack.has_written() + ? TWriteSessionEvent::TWriteAck::EES_WRITTEN + : (ack.skipped().reason() == Ydb::Topic::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason::StreamWriteMessage_WriteResponse_WriteAck_Skipped_Reason_REASON_ALREADY_WRITTEN + ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN + : TWriteSessionEvent::TWriteAck::EES_DISCARDED); + + ui64 offset = ack.has_written() ? ack.written().offset() : 0; + + acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ + sequenceNumber - SeqNoShift, + msgWriteStatus, + TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { + offset, + PartitionId, + }, + writeStat, + }); + + if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } + } + //EventsQueue->PushEvent(std::move(acksEvent)); + result.Events.emplace_back(std::move(acksEvent)); + break; + } + case TServerMessage::kUpdateTokenResponse: { + UpdateTokenInProgress = false; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: token updated successfully"); + UpdateTokenIfNeededImpl(); + break; + } + } + return result; +} + +bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { + bool result = false; + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + UpdateTimedCountersImpl(); + const auto& sentFront = SentOriginalMessages.front(); + ui64 size = 0; + ui64 compressedSize = 0; + if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { + auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); + result = memoryUsage.NowOk && !memoryUsage.WasOk; + //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + const auto& front = SentPackedMessage.front(); + if (front.Compressed) { + compressedSize = front.Data.size(); + } else { + size = front.Data.size(); + } + + (*Counters->MessagesWritten) += front.MessageCount; + (*Counters->MessagesInflight) -= front.MessageCount; + (*Counters->BytesWritten) += front.OriginalSize; + + SentPackedMessage.pop(); + } else { + size = sentFront.Size; + (*Counters->BytesWritten) += sentFront.Size; + (*Counters->MessagesWritten)++; + (*Counters->MessagesInflight)--; + } + + (*Counters->BytesInflightCompressed) -= compressedSize; + (*Counters->BytesWrittenCompressed) += compressedSize; + (*Counters->BytesInflightUncompressed) -= size; + + Y_VERIFY(Counters->BytesInflightCompressed->Val() >= 0); + Y_VERIFY(Counters->BytesInflightUncompressed->Val() >= 0); + + Y_VERIFY(sentFront.SeqNo == sequenceNumber); + + (*Counters->BytesInflightTotal) = MemoryUsage; + SentOriginalMessages.pop(); + return result; +} + +TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) { + Y_VERIFY(Lock.IsLocked()); + + bool wasOk = MemoryUsage <= Settings.MaxMemoryUsage_; + //if (diff < 0) { + // Y_VERIFY(MemoryUsage >= static_cast<size_t>(std::abs(diff))); + //} + MemoryUsage += diff; + bool nowOk = MemoryUsage <= Settings.MaxMemoryUsage_; + if (wasOk != nowOk) { + if (wasOk) { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage " << MemoryUsage + << "[B] reached maximum (" << Settings.MaxMemoryUsage_ << "[B])" + ); + } + else { + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Estimated memory usage got back to normal " << MemoryUsage << "[B]" + ); + } + } + return {wasOk, nowOk}; +} + +TBuffer CompressBuffer(TVector<TStringBuf>& data, ECodec codec, i32 level) { + TBuffer result; + THolder<IOutputStream> coder = NCompressionDetails::CreateCoder(codec, result, level); + for (auto& buffer : data) { + coder->Write(buffer.data(), buffer.size()); + } + coder->Finish(); + return result; +} + +// May call OnCompressed with sync executor. No external lock. +void TWriteSessionImpl::CompressImpl(TBlock&& block_) { + Y_VERIFY(Lock.IsLocked()); + + if (Aborting) { + return; + } + Y_VERIFY(block_.Valid); + + std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); + blockPtr->Move(block_); + auto lambda = [sharedThis = shared_from_this(), + wire = Tracker->MakeTrackedWire(), + codec = Settings.Codec_, + level = Settings.CompressionLevel_, + isSyncCompression = !CompressionExecutor->IsAsync(), + blockPtr]() mutable { + Y_VERIFY(!blockPtr->Compressed); + + auto compressedData = CompressBuffer( + blockPtr->OriginalDataRefs, codec, level + ); + Y_VERIFY(!compressedData.Empty()); + blockPtr->Data = std::move(compressedData); + blockPtr->Compressed = true; + blockPtr->CodecID = static_cast<ui32>(sharedThis->Settings.Codec_); + sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); + }; + + CompressionExecutor->Post(lambda); +} + +void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) { + TMemoryUsageChange memoryUsage; + if (!isSyncCompression) { + with_lock(Lock) { + memoryUsage = OnCompressedImpl(std::move(block)); + } + } else { + memoryUsage = OnCompressedImpl(std::move(block)); + } + if (memoryUsage.NowOk && !memoryUsage.WasOk) { + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + } +} + +TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) { + Y_VERIFY(Lock.IsLocked()); + + UpdateTimedCountersImpl(); + Y_VERIFY(block.Valid); + auto memoryUsage = OnMemoryUsageChangedImpl(static_cast<i64>(block.Data.size()) - block.OriginalMemoryUsage); + (*Counters->BytesInflightUncompressed) -= block.OriginalSize; + (*Counters->BytesInflightCompressed) += block.Data.size(); + + PackedMessagesToSend.emplace(std::move(block)); + SendImpl(); + return memoryUsage; +} + +void TWriteSessionImpl::ResetForRetryImpl() { + Y_VERIFY(Lock.IsLocked()); + + SessionEstablished = false; + const size_t totalPackedMessages = PackedMessagesToSend.size() + SentPackedMessage.size(); + const size_t totalOriginalMessages = OriginalMessagesToSend.size() + SentOriginalMessages.size(); + while (!SentPackedMessage.empty()) { + PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); + SentPackedMessage.pop(); + } + ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; + std::queue<TOriginalMessage> freshOriginalMessagesToSend; + OriginalMessagesToSend.swap(freshOriginalMessagesToSend); + while (!SentOriginalMessages.empty()) { + OriginalMessagesToSend.emplace(std::move(SentOriginalMessages.front())); + SentOriginalMessages.pop(); + } + while (!freshOriginalMessagesToSend.empty()) { + OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); + freshOriginalMessagesToSend.pop(); + } + if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) + minSeqNo = OriginalMessagesToSend.front().SeqNo; + MinUnsentSeqNo = minSeqNo; + Y_VERIFY(PackedMessagesToSend.size() == totalPackedMessages); + Y_VERIFY(OriginalMessagesToSend.size() == totalOriginalMessages); +} + +// Called from client Write() methods +void TWriteSessionImpl::FlushWriteIfRequiredImpl() { + Y_VERIFY(Lock.IsLocked()); + + if (!CurrentBatch.Empty() && !CurrentBatch.FlushRequested) { + MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); + if (TInstant::Now() - CurrentBatch.StartedAt >= Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) + || CurrentBatch.CurrentSize >= Settings.BatchFlushSizeBytes_.GetOrElse(0) + || CurrentBatch.CurrentSize >= MaxBlockSize + || CurrentBatch.Messages.size() >= MaxBlockMessageCount + || CurrentBatch.HasCodec() + ) { + WriteBatchImpl(); + return; + } + } +} + +size_t TWriteSessionImpl::WriteBatchImpl() { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "write " << CurrentBatch.Messages.size() << " messages with seqNo from " + << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo + ); + + Y_VERIFY(CurrentBatch.Messages.size() <= MaxBlockMessageCount); + + const bool skipCompression = Settings.Codec_ == ECodec::RAW || CurrentBatch.HasCodec(); + if (!skipCompression && Settings.CompressionExecutor_->IsAsync()) { + MessagesAcquired += static_cast<ui64>(CurrentBatch.Acquire()); + } + + size_t size = 0; + for (size_t i = 0; i != CurrentBatch.Messages.size();) { + TBlock block{}; + for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { + auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; + auto createTs = CurrentBatch.Messages[i].CreatedAt; + + if (!block.MessageCount) { + block.Offset = sequenceNumber; + } + + block.MessageCount += 1; + const auto& datum = CurrentBatch.Messages[i].DataRef; + block.OriginalSize += datum.size(); + block.OriginalMemoryUsage = CurrentBatch.Data.size(); + block.OriginalDataRefs.emplace_back(datum); + if (CurrentBatch.Messages[i].Codec.Defined()) { + Y_VERIFY(CurrentBatch.Messages.size() == 1); + block.CodecID = static_cast<ui32>(*CurrentBatch.Messages[i].Codec); + block.OriginalSize = CurrentBatch.Messages[i].OriginalSize; + block.Compressed = false; + } + size += datum.size(); + UpdateTimedCountersImpl(); + (*Counters->BytesInflightUncompressed) += datum.size(); + (*Counters->MessagesInflight)++; + OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + } + block.Data = std::move(CurrentBatch.Data); + if (skipCompression) { + PackedMessagesToSend.emplace(std::move(block)); + } else { + CompressImpl(std::move(block)); + } + } + CurrentBatch.Reset(); + if (skipCompression) { + SendImpl(); + } + return size; +} + +size_t GetMaxGrpcMessageSize() { + return 120_MB; +} + +bool TWriteSessionImpl::IsReadyToSendNextImpl() const { + Y_VERIFY(Lock.IsLocked()); + + if (!SessionEstablished) { + return false; + } + if (Aborting) + return false; + if (PackedMessagesToSend.empty()) { + return false; + } + Y_VERIFY(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); + Y_VERIFY(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); + + return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; +} + + +void TWriteSessionImpl::UpdateTokenIfNeededImpl() { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: try to update token"); + + if (!DbDriverState->CredentialsProvider || UpdateTokenInProgress || !SessionEstablished) + return; + TClientMessage clientMessage; + auto* updateRequest = clientMessage.mutable_update_token_request(); + auto token = DbDriverState->CredentialsProvider->GetAuthInfo(); + if (token == PrevToken) + return; + UpdateTokenInProgress = true; + updateRequest->set_token(token); + PrevToken = token; + + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: updating token"); + + Processor->Write(std::move(clientMessage)); +} + +void TWriteSessionImpl::SendImpl() { + Y_VERIFY(Lock.IsLocked()); + + // External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB + while(IsReadyToSendNextImpl()) { + TClientMessage clientMessage; + auto* writeRequest = clientMessage.mutable_write_request(); + + // Sent blocks while we can without messages reordering + while (IsReadyToSendNextImpl() && clientMessage.ByteSizeLong() < GetMaxGrpcMessageSize()) { + const auto& block = PackedMessagesToSend.top(); + Y_VERIFY(block.Valid); + writeRequest->set_codec(static_cast<i32>(block.CodecID)); + Y_VERIFY(block.MessageCount == 1); + for (size_t i = 0; i != block.MessageCount; ++i) { + Y_VERIFY(!OriginalMessagesToSend.empty()); + + auto& message = OriginalMessagesToSend.front(); + + auto* msgData = writeRequest->add_messages(); + + + msgData->set_seq_no(message.SeqNo + SeqNoShift); + *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); + + SentOriginalMessages.emplace(std::move(message)); + OriginalMessagesToSend.pop(); + + msgData->set_uncompressed_size(block.OriginalSize); + if (block.Compressed) + msgData->set_data(block.Data.data(), block.Data.size()); + else { + for (auto& buffer: block.OriginalDataRefs) { + msgData->set_data(buffer.data(), buffer.size()); + } + } + } + + + TBlock moveBlock; + moveBlock.Move(block); + SentPackedMessage.emplace(std::move(moveBlock)); + PackedMessagesToSend.pop(); + } + UpdateTokenIfNeededImpl(); + DbDriverState->Log.Write( + TLOG_DEBUG, + LogPrefix() << "Send " << writeRequest->messages_size() << " message(s) (" + << OriginalMessagesToSend.size() << " left), first sequence number is " + << writeRequest->messages(0).seq_no() + ); + Processor->Write(std::move(clientMessage)); + } +} + +// Client method, no Lock +bool TWriteSessionImpl::Close(TDuration closeTimeout) { + if (AtomicGet(Aborting)) + return false; + DbDriverState->Log.Write( + TLOG_INFO, + LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms" + ); + auto startTime = TInstant::Now(); + auto remaining = closeTimeout; + bool ready = false; + bool needSetSeqNoValue = false; + while (remaining > TDuration::Zero()) { + with_lock(Lock) { + if (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) { + ready = true; + } + if (AtomicGet(Aborting)) + break; + } + if (ready) { + break; + } + remaining = closeTimeout - (TInstant::Now() - startTime); + Sleep(Min(TDuration::MilliSeconds(100), remaining)); + } + with_lock(Lock) { + ready = (OriginalMessagesToSend.empty() && SentOriginalMessages.empty()) && !AtomicGet(Aborting); + } + with_lock(Lock) { + CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); + needSetSeqNoValue = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + } + if (needSetSeqNoValue) { + InitSeqNoPromise.SetException("session closed"); + } + if (ready) { + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete"); + } else { + DbDriverState->Log.Write( + TLOG_WARNING, + LogPrefix() << "Write session: could not confirm all writes in time" + << " or session aborted, perform hard shutdown" + ); + } + return ready; +} + +void TWriteSessionImpl::HandleWakeUpImpl() { + Y_VERIFY(Lock.IsLocked()); + + FlushWriteIfRequiredImpl(); + if (AtomicGet(Aborting)) { + return; + } + auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire()] (bool ok) + { + if (!ok) { + return; + } + with_lock(sharedThis->Lock) { + sharedThis->HandleWakeUpImpl(); + } + }; + if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { + LastTokenUpdate = TInstant::Now(); + UpdateTokenIfNeededImpl(); + } + + const auto flushAfter = CurrentBatch.StartedAt == TInstant::Zero() + ? WakeupInterval + : WakeupInterval - Min(Now() - CurrentBatch.StartedAt, WakeupInterval); + Connections->ScheduleCallback(flushAfter, std::move(callback)); +} + +void TWriteSessionImpl::UpdateTimedCountersImpl() { + Y_VERIFY(Lock.IsLocked()); + + auto now = TInstant::Now(); + auto delta = (now - LastCountersUpdateTs).MilliSeconds(); + double percent = 100.0 / Settings.MaxMemoryUsage_; + + Counters->TotalBytesInflightUsageByTime->Collect(*Counters->BytesInflightTotal * percent, delta); + Counters->UncompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightUncompressed * percent, delta); + Counters->CompressedBytesInflightUsageByTime->Collect(*Counters->BytesInflightCompressed * percent, delta); + + *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - SessionStartedTs).MilliSeconds(); + LastCountersUpdateTs = now; + if (LastCountersLogTs == TInstant::Zero() || TInstant::Now() - LastCountersLogTs > TDuration::Seconds(60)) { + LastCountersLogTs = TInstant::Now(); + +#define LOG_COUNTER(counter) \ + << " " Y_STRINGIZE(counter) ": " \ + << Counters->counter->Val() \ + /**/ + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() + << "Counters: {" + LOG_COUNTER(Errors) + LOG_COUNTER(CurrentSessionLifetimeMs) + LOG_COUNTER(BytesWritten) + LOG_COUNTER(MessagesWritten) + LOG_COUNTER(BytesWrittenCompressed) + LOG_COUNTER(BytesInflightUncompressed) + LOG_COUNTER(BytesInflightCompressed) + LOG_COUNTER(BytesInflightTotal) + LOG_COUNTER(MessagesInflight) + << " }" + ); + +#undef LOG_COUNTER + } +} + +void TWriteSessionImpl::AbortImpl() { + Y_VERIFY(Lock.IsLocked()); + + if (!AtomicGet(Aborting)) { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: aborting"); + AtomicSet(Aborting, 1); + NPersQueue::Cancel(ConnectContext); + NPersQueue::Cancel(ConnectTimeoutContext); + NPersQueue::Cancel(ConnectDelayContext); + if (Processor) + Processor->Cancel(); + + NPersQueue::Cancel(ClientContext); + ClientContext.reset(); // removes context from contexts set from underlying gRPC-client. + } +} + +void TWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + EventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues))); + AbortImpl(); +} + +void TWriteSessionImpl::CloseImpl(EStatus statusCode, const TString& message) { + Y_VERIFY(Lock.IsLocked()); + + NYql::TIssues issues; + issues.AddIssue(message); + CloseImpl(statusCode, std::move(issues)); +} + +void TWriteSessionImpl::CloseImpl(TPlainStatus&& status) { + Y_VERIFY(Lock.IsLocked()); + + DbDriverState->Log.Write(TLOG_INFO, LogPrefix() << "Write session will now close"); + EventsQueue->Close(TSessionClosedEvent(std::move(status))); + AbortImpl(); +} + +TWriteSessionImpl::~TWriteSessionImpl() { + DbDriverState->Log.Write(TLOG_DEBUG, LogPrefix() << "Write session: destroy"); + bool needClose = false; + with_lock(Lock) { + if (!AtomicGet(Aborting)) { + CloseImpl(EStatus::SUCCESS, NYql::TIssues{}); + + needClose = !InitSeqNoSetDone && (InitSeqNoSetDone = true); + } + } + if (needClose) { + InitSeqNoPromise.SetException("session closed"); + } +} + +}; // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h new file mode 100644 index 0000000000..b9e5ae72ee --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -0,0 +1,438 @@ +#pragma once + +#include "topic_impl.h" + +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> + +#include <util/generic/buffer.h> + + +namespace NYdb::NTopic { + +inline const TString& GetCodecId(const ECodec codec) { + static THashMap<ECodec, TString> idByCodec{ + {ECodec::RAW, TString(1, '\0')}, + {ECodec::GZIP, "\1"}, + {ECodec::LZOP, "\2"}, + {ECodec::ZSTD, "\3"} + }; + Y_VERIFY(idByCodec.contains(codec)); + return idByCodec[codec]; +} + +class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor> { + using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; + +public: + TWriteSessionEventsQueue(const TWriteSessionSettings& settings, + std::shared_ptr<NPersQueue::TImplTracker> tracker = std::make_shared<NPersQueue::TImplTracker>()) + : TParent(settings) + , Tracker(std::move(tracker)) + {} + + void PushEvent(TEventInfo eventInfo) { + if (Closed || ApplyHandler(eventInfo)) { + return; + } + + NPersQueue::TWaiter waiter; + with_lock (Mutex) { + Events.emplace(std::move(eventInfo)); + waiter = PopWaiterImpl(); + } + waiter.Signal(); // Does nothing if waiter is empty. + } + + TMaybe<TEvent> GetEvent(bool block = false) { + TMaybe<TEventInfo> eventInfo; + with_lock (Mutex) { + if (block) { + WaitEventsImpl(); + } + if (HasEventsImpl()) { + eventInfo = GetEventImpl(); + } else { + return Nothing(); + } + } + eventInfo->OnUserRetrievedEvent(); + return std::move(eventInfo->Event); + } + + TVector<TEvent> GetEvents(bool block = false, TMaybe<size_t> maxEventsCount = Nothing()) { + TVector<TEventInfo> eventInfos; + with_lock (Mutex) { + if (block) { + WaitEventsImpl(); + } + eventInfos.reserve(Min(Events.size() + CloseEvent.Defined(), maxEventsCount ? *maxEventsCount : std::numeric_limits<size_t>::max())); + while (!Events.empty()) { + eventInfos.emplace_back(GetEventImpl()); + if (maxEventsCount && eventInfos.size() >= *maxEventsCount) { + break; + } + } + if (CloseEvent && Events.empty() && (!maxEventsCount || eventInfos.size() < *maxEventsCount)) { + eventInfos.push_back({*CloseEvent}); + } + } + + TVector<TEvent> result; + result.reserve(eventInfos.size()); + for (TEventInfo& eventInfo : eventInfos) { + eventInfo.OnUserRetrievedEvent(); + result.emplace_back(std::move(eventInfo.Event)); + } + return result; + } + + void Close(const TSessionClosedEvent& event) { + NPersQueue::TWaiter waiter; + with_lock (Mutex) { + CloseEvent = event; + Closed = true; + waiter = NPersQueue::TWaiter(Waiter.ExtractPromise(), this); + } + + TEventInfo info(event); + ApplyHandler(info); + + waiter.Signal(); + } + +private: + struct THandlersVisitor : public TParent::TBaseHandlersVisitor { + using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; +#define DECLARE_HANDLER(type, handler, answer) \ + bool operator()(type& event) { \ + if (Settings.EventHandlers_.handler) { \ + Settings.EventHandlers_.handler(event); \ + return answer; \ + } \ + return false; \ + } \ + /**/ + DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); + DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); + DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied + +#undef DECLARE_HANDLER + bool Visit() { + return std::visit(*this, Event); + } + + }; + + bool ApplyHandler(TEventInfo& eventInfo) { + THandlersVisitor visitor(Settings, eventInfo.Event, Tracker); + return visitor.Visit(); + } + + TEventInfo GetEventImpl() { // Assumes that we're under lock and that the event queue has events. + Y_VERIFY(!Mutex.TryAcquire()); // We are under lock + Y_ASSERT(HasEventsImpl()); + if (!Events.empty()) { + TEventInfo event = std::move(Events.front()); + Events.pop(); + RenewWaiterImpl(); + return event; + } + Y_ASSERT(CloseEvent); + return {*CloseEvent}; + } + +private: + std::shared_ptr<NPersQueue::TImplTracker> Tracker; +}; + +struct TMemoryUsageChange { + bool WasOk; //!< MemoryUsage <= Config.MaxMemoryUsage_ before update + bool NowOk; //!< Same, only after update +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TWriteSessionImpl + +class TWriteSessionImpl : public IWriteSession, + public std::enable_shared_from_this<TWriteSessionImpl> { +private: + friend class TWriteSession; + friend class TSimpleBlockingWriteSession; + +private: + using TClientMessage = Ydb::Topic::StreamWriteMessage::FromClient; + using TServerMessage = Ydb::Topic::StreamWriteMessage::FromServer; + using IWriteSessionConnectionProcessorFactory = + TTopicClient::TImpl::IWriteSessionConnectionProcessorFactory; + using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; + + struct TMessage { + ui64 SeqNo; + TInstant CreatedAt; + TStringBuf DataRef; + TMaybe<ECodec> Codec; + ui32 OriginalSize; // only for coded messages + TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) + : SeqNo(seqNo) + , CreatedAt(createdAt) + , DataRef(data) + , Codec(codec) + , OriginalSize(originalSize) + {} + }; + + struct TMessageBatch { + TBuffer Data; + TVector<TMessage> Messages; + ui64 CurrentSize = 0; + TInstant StartedAt = TInstant::Zero(); + bool Acquired = false; + bool FlushRequested = false; + void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { + if (StartedAt == TInstant::Zero()) + StartedAt = TInstant::Now(); + CurrentSize += codec ? originalSize : data.size(); + Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); + Acquired = false; + } + + bool HasCodec() const { + return Messages.empty() ? false : Messages.front().Codec.Defined(); + } + + bool Acquire() { + if (Acquired || Messages.empty()) + return false; + auto currSize = Data.size(); + Data.Append(Messages.back().DataRef.data(), Messages.back().DataRef.size()); + Messages.back().DataRef = TStringBuf(Data.data() + currSize, Data.size() - currSize); + Acquired = true; + return true; + } + + bool Empty() const noexcept { + return CurrentSize == 0 && Messages.empty(); + } + + void Reset() { + StartedAt = TInstant::Zero(); + Messages.clear(); + Data.Clear(); + Acquired = false; + CurrentSize = 0; + FlushRequested = false; + } + }; + + struct TBlock { + size_t Offset = 0; //!< First message sequence number in the block + size_t MessageCount = 0; + size_t PartNumber = 0; + size_t OriginalSize = 0; + size_t OriginalMemoryUsage = 0; + ui32 CodecID = static_cast<ui32>(ECodec::RAW); + mutable TVector<TStringBuf> OriginalDataRefs; + mutable TBuffer Data; + bool Compressed = false; + mutable bool Valid = true; + + TBlock& operator=(TBlock&&) = default; + TBlock(TBlock&&) = default; + TBlock() = default; + + //For taking ownership by copying from const object, f.e. lambda -> std::function, priority_queue + void Move(const TBlock& rhs) { + Offset = rhs.Offset; + MessageCount = rhs.MessageCount; + PartNumber = rhs.PartNumber; + OriginalSize = rhs.OriginalSize; + OriginalMemoryUsage = rhs.OriginalMemoryUsage; + CodecID = rhs.CodecID; + OriginalDataRefs.swap(rhs.OriginalDataRefs); + Data.Swap(rhs.Data); + Compressed = rhs.Compressed; + + rhs.Data.Clear(); + rhs.OriginalDataRefs.clear(); + } + }; + + struct TOriginalMessage { + ui64 SeqNo; + TInstant CreatedAt; + size_t Size; + TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) + : SeqNo(sequenceNumber) + , CreatedAt(createdAt) + , Size(size) + {} + }; + + //! Block comparer, makes block with smallest offset (first sequence number) appear on top of the PackedMessagesToSend priority queue + struct Greater { + bool operator() (const TBlock& lhs, const TBlock& rhs) { + return lhs.Offset > rhs.Offset; + } + }; + + struct THandleResult { + bool DoRestart = false; + TDuration StartDelay = TDuration::Zero(); + bool DoStop = false; + bool DoSetSeqNo = false; + }; + struct TProcessSrvMessageResult { + THandleResult HandleResult; + TMaybe<ui64> InitSeqNo; + TVector<TWriteSessionEvent::TEvent> Events; + bool Ok = true; + }; + + THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action + +public: + TWriteSessionImpl(const TWriteSessionSettings& settings, + std::shared_ptr<TTopicClient::TImpl> client, + std::shared_ptr<TGRpcConnectionsImpl> connections, + TDbDriverStatePtr dbDriverState, + std::shared_ptr<NPersQueue::TImplTracker> tracker); + + TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; + TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, + TMaybe<size_t> maxEventsCount = Nothing()) override; + NThreading::TFuture<ui64> GetInitSeqNo() override; + + void Write(TContinuationToken&& continuationToken, TStringBuf data, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + + + NThreading::TFuture<void> WaitEvent() override; + + // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. + bool Close(TDuration closeTimeout = TDuration::Max()) override; + + TWriterCounters::TPtr GetCounters() override {Y_FAIL("Unimplemented"); } //ToDo - unimplemented; + + ~TWriteSessionImpl(); // will not call close - destroy everything without acks + +private: + + TStringBuilder LogPrefix() const; + + void UpdateTokenIfNeededImpl(); + + void WriteInternal(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); + + void FlushWriteIfRequiredImpl(); + size_t WriteBatchImpl(); + void Start(const TDuration& delay); + void InitWriter(); + + void OnConnect(TPlainStatus&& st, typename IProcessor::TPtr&& processor, + const NGrpc::IQueueClientContextPtr& connectContext); + void OnConnectTimeout(const NGrpc::IQueueClientContextPtr& connectTimeoutContext); + void ResetForRetryImpl(); + THandleResult RestartImpl(const TPlainStatus& status); + void DoConnect(const TDuration& delay, const TString& endpoint); + void InitImpl(); + void ReadFromProcessor(); // Assumes that we're under lock. + void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock. + void OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration); + void OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration); + TProcessSrvMessageResult ProcessServerMessageImpl(); + TMemoryUsageChange OnMemoryUsageChangedImpl(i64 diff); + void CompressImpl(TBlock&& block); + void OnCompressed(TBlock&& block, bool isSyncCompression=false); + TMemoryUsageChange OnCompressedImpl(TBlock&& block); + + //TString GetDebugIdentity() const; + TClientMessage GetInitClientMessage(); + bool CleanupOnAcknowledged(ui64 sequenceNumber); + bool IsReadyToSendNextImpl() const; + ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); + void SendImpl(); + void AbortImpl(); + void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); + void CloseImpl(EStatus statusCode, const TString& message); + void CloseImpl(TPlainStatus&& status); + + void OnErrorResolved() { + RetryState = nullptr; + } + void CheckHandleResultImpl(THandleResult& result); + void ProcessHandleResult(THandleResult& result); + void HandleWakeUpImpl(); + void UpdateTimedCountersImpl(); + +private: + TWriteSessionSettings Settings; + std::shared_ptr<TTopicClient::TImpl> Client; + std::shared_ptr<TGRpcConnectionsImpl> Connections; + TString TargetCluster; + TString InitialCluster; + TString CurrentCluster; + bool OnSeqNoShift = false; + TString PreferredClusterByCDS; + std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; + TDbDriverStatePtr DbDriverState; + TStringType PrevToken; + bool UpdateTokenInProgress = false; + TInstant LastTokenUpdate = TInstant::Zero(); + std::shared_ptr<NPersQueue::TImplTracker> Tracker; + std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; + NGrpc::IQueueClientContextPtr ClientContext; // Common client context. + NGrpc::IQueueClientContextPtr ConnectContext; + NGrpc::IQueueClientContextPtr ConnectTimeoutContext; + NGrpc::IQueueClientContextPtr ConnectDelayContext; + size_t ConnectionGeneration = 0; + size_t ConnectionAttemptsDone = 0; + TAdaptiveLock Lock; + IProcessor::TPtr Processor; + IRetryPolicy::IRetryState::TPtr RetryState; // Current retry state (if now we are (re)connecting). + std::shared_ptr<TServerMessage> ServerMessage; // Server message to write server response to. + + TString SessionId; + IExecutor::TPtr Executor; + IExecutor::TPtr CompressionExecutor; + size_t MemoryUsage = 0; //!< Estimated amount of memory used + + TMessageBatch CurrentBatch; + + std::queue<TOriginalMessage> OriginalMessagesToSend; + std::priority_queue<TBlock, std::vector<TBlock>, Greater> PackedMessagesToSend; + //! Messages that are sent but yet not acknowledged + std::queue<TOriginalMessage> SentOriginalMessages; + std::queue<TBlock> SentPackedMessage; + + const size_t MaxBlockSize = std::numeric_limits<size_t>::max(); + const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility + bool Connected = false; + bool Started = false; + TAtomic Aborting = 0; + bool SessionEstablished = false; + ui32 PartitionId = 0; + ui64 LastSeqNo = 0; + ui64 MinUnsentSeqNo = 0; + ui64 SeqNoShift = 0; + TMaybe<bool> AutoSeqNoMode; + bool ValidateSeqNoMode = false; + + NThreading::TPromise<ui64> InitSeqNoPromise; + bool InitSeqNoSetDone = false; + TInstant SessionStartedTs; + TInstant LastCountersUpdateTs = TInstant::Zero(); + TInstant LastCountersLogTs; + TWriterCounters::TPtr Counters; + TDuration WakeupInterval; + +protected: + ui64 MessagesAcquired = 0; +}; + +}; // namespace NYdb::NTopic diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 5225948e6b..e857f9ce4a 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -524,7 +524,7 @@ struct TWriteStat : public TThrRefBase { }; class TContinuationToken : public TMoveOnly { - friend class TWriteSession; + friend class TWriteSessionImpl; private: TContinuationToken() = default; }; @@ -1235,8 +1235,12 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { //! Function to handle all event types. //! If event with current type has no handler for this type of event, //! this handler (if specified) will be used. - //! If this handler is not specified, event can be received with TReadSession::GetEvent() method. - FLUENT_SETTING(std::function<void(TReadSessionEvent::TEvent&)>, CommonHandler); + //! If this handler is not specified, event can be received with TWriteSession::GetEvent() method. + std::function<void(TWriteSessionEvent::TEvent&)> CommonHandler_; + TSelf& CommonHandler(std::function<void(TWriteSessionEvent::TEvent&)>&& handler) { + CommonHandler_ = std::move(handler); + return static_cast<TSelf&>(*this); + } //! Executor for handlers. //! If not set, default single threaded executor will be used. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index 286fd722d7..32613c935d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -14,6 +14,8 @@ #include <library/cpp/threading/future/future.h> #include <library/cpp/threading/future/async.h> +#include <future> + namespace NYdb::NTopic::NTests { Y_UNIT_TEST_SUITE(BasicUsage) { @@ -216,6 +218,244 @@ Y_UNIT_TEST_SUITE(BasicUsage) { ReadSession->Close(TDuration::MilliSeconds(10)); Cerr << ">>> TEST: Session gracefully closed" << Endl; } + + Y_UNIT_TEST(SessionNotDestroyedWhileCompressionInFlight) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + + // controlled executor + auto stepByStepExecutor = CreateThreadPoolManagedExecutor(1); + + // Create topic client. + NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + + NThreading::TPromise<void> promiseToWrite = NThreading::NewPromise<void>(); + auto futureWrite = promiseToWrite.GetFuture(); + + NThreading::TPromise<void> promiseToRead = NThreading::NewPromise<void>(); + auto futureRead = promiseToRead.GetFuture(); + + NYdb::NTopic::TWriteSessionSettings writeSettings; + writeSettings.Path(setup->GetTestTopic()) + .MessageGroupId("src_id") + .ProducerId("src_id") + .CompressionExecutor(stepByStepExecutor); + + // Create read session. + NYdb::NTopic::TReadSessionSettings readSettings; + readSettings + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()) + .DecompressionExecutor(stepByStepExecutor); + + auto f = std::async(std::launch::async, + [readSettings, writeSettings, &topicClient, + promiseToWrite = std::move(promiseToWrite), + promiseToRead = std::move(promiseToRead)]() mutable { + { + auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); + std::string message(2'000, 'x'); + bool res = writeSession->Write(message); + UNIT_ASSERT(res); + writeSession->Close(TDuration::Seconds(10)); + } + promiseToWrite.SetValue(); + Cerr << ">>>TEST: write promise set " << Endl; + + { + NThreading::TPromise<void> promise = NThreading::NewPromise<void>(); + auto future = promise.GetFuture(); + + readSettings.EventHandlers_.SimpleDataHandlers( + [promise = std::move(promise)](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + ev.Commit(); + promise.SetValue(); + Cerr << ">>>TEST: get read event " << Endl; + }); + + auto readSession = topicClient.CreateReadSession(readSettings); + future.Wait(); + readSession->Close(TDuration::Seconds(10)); + } + promiseToRead.SetValue(); + Cerr << ">>>TEST: read promise set " << Endl; + }); + + + // + // auxiliary functions for decompressor and handler control + // + auto WaitTasks = [&](auto f, size_t c) { + while (f() < c) { + Sleep(TDuration::MilliSeconds(100)); + }; + }; + auto WaitPlannedTasks = [&](auto e, size_t count) { + WaitTasks([&]() { return e->GetPlannedCount(); }, count); + }; + auto WaitExecutedTasks = [&](auto e, size_t count) { + WaitTasks([&]() { return e->GetExecutedCount(); }, count); + }; + + auto RunTasks = [&](auto e, const std::vector<size_t>& tasks) { + size_t n = tasks.size(); + Cerr << ">>>TEST in RunTasks: before WaitPlannedTasks" << Endl; + WaitPlannedTasks(e, n); + Cerr << ">>>TEST in RunTasks: before WaitExecutedTasks" << Endl; + size_t completed = e->GetExecutedCount(); + e->StartFuncs(tasks); + WaitExecutedTasks(e, completed + n); + }; + + UNIT_ASSERT(!futureWrite.HasValue()); + Cerr << ">>>TEST: future write has no value " << Endl; + RunTasks(stepByStepExecutor, {0}); + futureWrite.GetValueSync(); + UNIT_ASSERT(futureWrite.HasValue()); + Cerr << ">>>TEST: future write has value " << Endl; + + UNIT_ASSERT(!futureRead.HasValue()); + Cerr << ">>>TEST: future read has no value " << Endl; + RunTasks(stepByStepExecutor, {1}); + futureRead.GetValueSync(); + UNIT_ASSERT(futureRead.HasValue()); + Cerr << ">>>TEST: future read has value " << Endl; + + f.get(); + + Cerr << ">>> TEST: gracefully closed" << Endl; + } + + Y_UNIT_TEST(SessionNotDestroyedWhileUserEventHandlingInFlight) { + auto setup = std::make_shared<NPersQueue::NTests::TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + + // controlled executor + auto stepByStepExecutor = CreateThreadPoolManagedExecutor(1); + + // Create topic client. + NYdb::NTopic::TTopicClient topicClient(setup->GetDriver()); + + // NThreading::TPromise<void> promiseToWrite = NThreading::NewPromise<void>(); + // auto futureWrite = promiseToWrite.GetFuture(); + + NThreading::TPromise<void> promiseToRead = NThreading::NewPromise<void>(); + auto futureRead = promiseToRead.GetFuture(); + + auto writeSettings = TWriteSessionSettings() + .Path(setup->GetTestTopic()) + .MessageGroupId("src_id") + .ProducerId("src_id"); + + auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); + std::string message(2'000, 'x'); + bool res = writeSession->Write(message); + UNIT_ASSERT(res); + writeSession->Close(TDuration::Seconds(10)); + + // writeSettings.EventHandlers_ + // .HandlersExecutor(stepByStepExecutor); + + // Create read session. + auto readSettings = TReadSessionSettings() + .ConsumerName("shared/user") + .MaxMemoryUsageBytes(1_MB) + .AppendTopics(setup->GetTestTopic()); + + readSettings.EventHandlers_ + .HandlersExecutor(stepByStepExecutor); + + auto f = std::async(std::launch::async, + [readSettings, /*writeSettings,*/ &topicClient, + // promiseToWrite = std::move(promiseToWrite), + promiseToRead = std::move(promiseToRead)]() mutable { + // { + // std::shared_ptr<TContinuationToken> token; + // writeSettings.EventHandlers_.CommonHandler([token](TWriteSessionEvent::TEvent& event){ + // Cerr << ">>>TEST: in CommonHandler " << Endl; + + // if (std::holds_alternative<TWriteSessionEvent::TReadyToAcceptEvent>(event)) { + // *token = std::move(std::get<TWriteSessionEvent::TReadyToAcceptEvent>(event).ContinuationToken); + // } + // }); + + // auto writeSession = topicClient.CreateWriteSession(writeSettings); + // std::string message(2'000, 'x'); + // writeSession->WaitEvent().Wait(); + // writeSession->Write(std::move(*token), message); + // writeSession->WaitEvent().Wait(); + // writeSession->Close(TDuration::Seconds(10)); + // } + // promiseToWrite.SetValue(); + // Cerr << ">>>TEST: write promise set " << Endl; + + { + NThreading::TPromise<void> promise = NThreading::NewPromise<void>(); + auto future = promise.GetFuture(); + + readSettings.EventHandlers_.SimpleDataHandlers( + [promise = std::move(promise)](NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& ev) mutable { + Cerr << ">>>TEST: in SimpleDataHandlers " << Endl; + ev.Commit(); + promise.SetValue(); + }); + + auto readSession = topicClient.CreateReadSession(readSettings); + future.Wait(); + readSession->Close(TDuration::Seconds(10)); + } + promiseToRead.SetValue(); + Cerr << ">>>TEST: read promise set " << Endl; + }); + + + // + // auxiliary functions for decompressor and handler control + // + auto WaitTasks = [&](auto f, size_t c) { + while (f() < c) { + Sleep(TDuration::MilliSeconds(100)); + }; + }; + auto WaitPlannedTasks = [&](auto e, size_t count) { + WaitTasks([&]() { return e->GetPlannedCount(); }, count); + }; + auto WaitExecutedTasks = [&](auto e, size_t count) { + WaitTasks([&]() { return e->GetExecutedCount(); }, count); + }; + + auto RunTasks = [&](auto e, const std::vector<size_t>& tasks) { + size_t n = tasks.size(); + Cerr << ">>>TEST in RunTasks: before WaitPlannedTasks" << Endl; + WaitPlannedTasks(e, n); + Cerr << ">>>TEST in RunTasks: before WaitExecutedTasks" << Endl; + size_t completed = e->GetExecutedCount(); + e->StartFuncs(tasks); + WaitExecutedTasks(e, completed + n); + }; + + // RunTasks(stepByStepExecutor, {0}); + // UNIT_ASSERT(!futureWrite.HasValue()); + // Cerr << ">>>TEST: future write has no value " << Endl; + // RunTasks(stepByStepExecutor, {1}); + // futureWrite.GetValueSync(); + // UNIT_ASSERT(futureWrite.HasValue()); + // Cerr << ">>>TEST: future write has value " << Endl; + + UNIT_ASSERT(!futureRead.HasValue()); + Cerr << ">>>TEST: future read has no value " << Endl; + // 0: TStartPartitionSessionEvent + RunTasks(stepByStepExecutor, {0}); + // 1: TDataReceivedEvent + RunTasks(stepByStepExecutor, {1}); + futureRead.GetValueSync(); + UNIT_ASSERT(futureRead.HasValue()); + Cerr << ">>>TEST: future read has value " << Endl; + + f.get(); + + Cerr << ">>> TEST: gracefully closed" << Endl; + } + } } diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index f3cc7b1ba1..839f5d282b 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -6314,6 +6314,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightCompressed->Val(), 0); UNIT_ASSERT_VALUES_EQUAL(counters->BytesInflightTotal->Val(), 0); + decompressor->RunAllTasks(); + executor->RunAllTasks(); + session->Close(TDuration::Seconds(10)); driver->Stop(); |