diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-09-28 11:40:46 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-09-28 12:10:12 +0300 |
commit | af36c1aa28ec2ef337bc39a6095c1905f52fdb94 (patch) | |
tree | e030f6d05450536adb6571787f032301f58809ab | |
parent | f9a265a5c4b8a1780cf24c29088dffc342362813 (diff) | |
download | ydb-af36c1aa28ec2ef337bc39a6095c1905f52fdb94.tar.gz |
add callbacks context
add callback context
19 files changed, 582 insertions, 679 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index 7e11903a463..040e5117c54 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -2,7 +2,6 @@ #include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> #include <ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h> diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h new file mode 100644 index 00000000000..5de727a5ab5 --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h @@ -0,0 +1,71 @@ +#pragma once + +#include <library/cpp/threading/future/core/future.h> +#include <util/system/guard.h> +#include <util/system/spinlock.h> + +#include <memory> +#include <shared_mutex> +#include <vector> + +namespace NYdb::NPersQueue { + +template <typename TGuardedObject> +class TCallbackContext { +public: + using TMutexPtr = std::shared_ptr<std::shared_mutex>; + + class TBorrowed { + public: + explicit TBorrowed(const TCallbackContext& parent) : Mutex(parent.Mutex) { + Mutex->lock_shared(); + Ptr = parent.GuardedObjectPtr.get(); + } + + ~TBorrowed() { + Mutex->unlock_shared(); + } + + TGuardedObject* operator->() { + return Ptr; + } + + const TGuardedObject* operator->() const { + return Ptr; + } + + operator bool() { + return Ptr; + } + + private: + TMutexPtr Mutex; + TGuardedObject* Ptr = nullptr; + }; + +public: + explicit TCallbackContext(typename std::shared_ptr<TGuardedObject> ptr) + : Mutex(std::make_shared<std::shared_mutex>()) + , GuardedObjectPtr(std::move(ptr)) + {} + + // may block + ~TCallbackContext() { + Cancel(); + } + + TBorrowed LockShared() { + return TBorrowed(*this); + } + + void Cancel() { + std::lock_guard lock(*Mutex); + GuardedObjectPtr.reset(); + } + +private: + TMutexPtr Mutex; + typename std::shared_ptr<TGuardedObject> GuardedObjectPtr; +}; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h index 21179740daa..6acb3cfc4ff 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h @@ -1,7 +1,6 @@ #pragma once #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <util/generic/queue.h> @@ -314,10 +313,9 @@ protected: // Template for visitor implementation. struct TBaseHandlersVisitor { - TBaseHandlersVisitor(const TSettings& settings, TEvent& event, std::shared_ptr<TImplTracker> tracker) + TBaseHandlersVisitor(const TSettings& settings, TEvent& event) : Settings(settings) , Event(event) - , Tracker(tracker) {} template <class TEventType, class TFunc, class TCommonFunc> @@ -336,7 +334,7 @@ protected: template <class TEventType, class TFunc> void PushSpecificHandler(TEvent&& event, const TFunc& f) { Post(Settings.EventHandlers_.HandlersExecutor_, - [func = f, event = std::move(event), wire = Tracker->MakeTrackedWire()]() mutable { + [func = f, event = std::move(event)]() mutable { func(std::get<TEventType>(event)); }); } @@ -344,7 +342,7 @@ protected: template <class TFunc> void PushCommonHandler(TEvent&& event, const TFunc& f) { Post(Settings.EventHandlers_.HandlersExecutor_, - [func = f, event = std::move(event), wire = Tracker->MakeTrackedWire()]() mutable { func(event); }); + [func = f, event = std::move(event)]() mutable { func(event); }); } virtual void Post(const typename TExecutor::TPtr& executor, typename TExecutor::TFunction&& f) { @@ -353,7 +351,6 @@ protected: const TSettings& Settings; TEvent& Event; - std::shared_ptr<TImplTracker> Tracker; }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h new file mode 100644 index 00000000000..cb238fa262a --- /dev/null +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h @@ -0,0 +1,157 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> + +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + +#include <util/system/spinlock.h> + +namespace NYdb::NPersQueue { + +template <bool UseMigrationProtocol> +class TSingleClusterReadSessionImpl; + +template <bool UseMigrationProtocol> +using TCallbackContextPtr = std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>>; + +template<bool UseMigrationProtocol> +class TCountersLogger : public std::enable_shared_from_this<TCountersLogger<UseMigrationProtocol>> { +public: + static constexpr auto UPDATE_PERIOD = TDuration::Seconds(1); + + using TReaderCountersPtr = std::conditional_t<UseMigrationProtocol, + NYdb::NPersQueue::TReaderCounters::TPtr, + NYdb::NTopic::TReaderCounters::TPtr>; + +public: + explicit TCountersLogger(std::shared_ptr<TGRpcConnectionsImpl> connections, + std::vector<TCallbackContextPtr<UseMigrationProtocol>> sessions, + TReaderCountersPtr counters, const TLog& log, TString prefix, TInstant startSessionTime) + : Connections(std::move(connections)) + , SessionsContexts(std::move(sessions)) + , Counters(std::move(counters)) + , Log(log) + , Prefix(std::move(prefix)) + , StartSessionTime(startSessionTime) { + } + + std::shared_ptr<TCallbackContext<TCountersLogger<UseMigrationProtocol>>> MakeCallbackContext() { + SelfContext = std::make_shared<TCallbackContext<TCountersLogger<UseMigrationProtocol>>>(this->shared_from_this()); + return SelfContext; + } + + void Start() { + Y_VERIFY(SelfContext); + ScheduleDumpCountersToLog(); + } + + void Stop() { + Y_VERIFY(SelfContext); + + with_lock(Lock) { + Stopping = true; + } + + // Log final counters. + DumpCountersToLog(); + } + +private: + void ScheduleDumpCountersToLog(size_t timeNumber = 0) { + with_lock(Lock) { + if (Stopping) { + return; + } + + DumpCountersContext = Connections->CreateContext(); + if (DumpCountersContext) { + auto callback = [ctx = SelfContext, timeNumber](bool ok) { + if (ok) { + if (auto borrowedSelf = ctx->LockShared()) { + borrowedSelf->DumpCountersToLog(timeNumber); + } + } + }; + Connections->ScheduleCallback(UPDATE_PERIOD, + std::move(callback), + DumpCountersContext); + } + } + } + + void DumpCountersToLog(size_t timeNumber = 0) { + const bool logCounters = timeNumber % 60 == 0; // Every 1 minute. + const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes. + + *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds(); + + { + TMaybe<TLogElement> log; + bool dumpHeader = true; + + for (auto& sessionCtx : SessionsContexts) { + if (auto borrowedSession = sessionCtx->LockShared()) { + borrowedSession->UpdateMemoryUsageStatistics(); + if (dumpSessionsStatistics) { + if (dumpHeader) { + log.ConstructInPlace(&Log, TLOG_INFO); + (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):"; + dumpHeader = false; + } + borrowedSession->DumpStatisticsToLog(*log); + } + } + } + } + +#define C(counter) \ + << " " Y_STRINGIZE(counter) ": " \ + << Counters->counter->Val() \ + /**/ + + if (logCounters) { + LOG_LAZY(Log, TLOG_INFO, + TStringBuilder() << Prefix << "Counters: {" + C(Errors) + C(CurrentSessionLifetimeMs) + C(BytesRead) + C(MessagesRead) + C(BytesReadCompressed) + C(BytesInflightUncompressed) + C(BytesInflightCompressed) + C(BytesInflightTotal) + C(MessagesInflight) + << " }" + ); + } + +#undef C + + ScheduleDumpCountersToLog(timeNumber + 1); + } + + +private: + std::shared_ptr<TCallbackContext<TCountersLogger<UseMigrationProtocol>>> SelfContext; + + TSpinLock Lock; + + std::shared_ptr<TGRpcConnectionsImpl> Connections; + std::vector<TCallbackContextPtr<UseMigrationProtocol>> SessionsContexts; + + IQueueClientContextPtr DumpCountersContext; + + TReaderCountersPtr Counters; + + TLog Log; + const TString Prefix; + const TInstant StartSessionTime; + + bool Stopping = false; +}; + +} diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h deleted file mode 100644 index 9fe0d3ae04e..00000000000 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h +++ /dev/null @@ -1,123 +0,0 @@ -#pragma once - -#include <library/cpp/threading/future/core/future.h> -#include <util/system/guard.h> -#include <util/system/spinlock.h> - -#include <memory> - -namespace NYdb::NPersQueue { - -class TImplTracker { -public: - struct TWire { - TImplTracker* Owner; - - TWire() - : Owner(nullptr) { - } - - TWire(TImplTracker& owner) - : Owner(&owner) { - Owner->Increment(); - } - - ~TWire() { - if (Owner) { - Owner->Decrement(); - } - } - - TWire(const TWire& that) : Owner(that.Owner) { - if (Owner) - Owner->Increment(); - } - TWire(TWire&& that) - : Owner(std::exchange(that.Owner, nullptr)) { - } - TWire& operator=(const TWire& that) { - if (Owner == that.Owner) { - return *this; - } - if (Owner) { - Owner->Decrement(); - } - Owner = that.Owner; - if (Owner) { - Owner->Increment(); - } - return *this; - } - TWire& operator=(TWire&& that) { - if (this == &that) { - return *this; - } - if (Owner == that.Owner) { - if (that.Owner) { - that.Owner->Decrement(); - } - that.Owner = nullptr; - return *this; - } - if (Owner) { - Owner->Decrement(); - } - Owner = std::exchange(that.Owner, nullptr); - return *this; - } - - operator bool() const { - return Owner; - } - }; - -public: - ~TImplTracker() { - // to synchronize with last Decrement() in other thread - TGuard guard(Lock); - } - - std::shared_ptr<TWire> MakeTrackedWire() { - return std::make_shared<TWire>(*this); - } - - void Increment() { - with_lock(Lock) { - Y_VERIFY(!AwaitingCompletion()); - ++RefCount; - } - } - - void Decrement() { - with_lock(Lock) { - --RefCount; - if (RefCount == 0 && AwaitingCompletion()) { - CompletionPromise.SetValue(); - } - } - } - - NThreading::TFuture<void> AsyncComplete() { - with_lock(Lock) { - if (!AwaitingCompletion()) { - CompletionPromise = NThreading::NewPromise(); - } - if (RefCount == 0) { - CompletionPromise.SetValue(); - } - return CompletionPromise.GetFuture(); - } - } - -private: - inline bool AwaitingCompletion() const { - return CompletionPromise.Initialized(); - } - -private: - TSpinLock Lock; - size_t RefCount = 0; - NThreading::TPromise<void> CompletionPromise; -}; - -} diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 1bdb5307add..d7c680ecfaa 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -67,8 +67,11 @@ TReadSession::~TReadSession() { Abort(); ClearAllEvents(); - if (Tracker) { - Tracker->AsyncComplete().Wait(); + for (const auto& ctx : CbContexts) { + ctx->Cancel(); + } + if (DumpCountersContext) { + DumpCountersContext->Cancel(); } } @@ -83,8 +86,7 @@ Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClus } void TReadSession::Start() { - Tracker = std::make_shared<TImplTracker>(); - EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this(), Tracker); + EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings); if (!ValidateSettings()) { return; @@ -178,7 +180,7 @@ void TReadSession::ProceedWithoutClusterDiscovery() { clusterSessionInfo.Topics = Settings.Topics_; CreateClusterSessionsImpl(deferred); } - // ScheduleDumpCountersToLog(); + SetupCountersLogger(); } void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { @@ -215,10 +217,10 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { EventsQueue, context, partitionStreamIdStart++, - clusterSessionsCount, - Tracker); + clusterSessionsCount); - deferred.DeferStartSession(clusterSessionInfo.Session); + CbContexts.push_back(clusterSessionInfo.Session->MakeCallbackContext()); + deferred.DeferStartSession(CbContexts.back()); } } @@ -320,7 +322,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu CreateClusterSessionsImpl(deferred); } - // ScheduleDumpCountersToLog(); + SetupCountersLogger(); } void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred) { @@ -350,7 +352,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions bool TReadSession::Close(TDuration timeout) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); // Log final counters. - DumpCountersToLog(); + CountersLogger->Stop(); std::vector<TSingleClusterReadSessionImpl<true>::TPtr> sessions; NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>(); @@ -528,15 +530,6 @@ void TReadSession::ResumeReadingData() { } } -void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { - LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() - << "The application data is transferred to the client. Number of messages " - << messagesCount - << ", size " - << decompressedSize - << " bytes"); -} - void TReadSession::MakeCountersIfNeeded() { if (!Settings.Counters_ || HasNullCounters(*Settings.Counters_)) { TReaderCounters::TPtr counters = MakeIntrusive<TReaderCounters>(); @@ -548,83 +541,12 @@ void TReadSession::MakeCountersIfNeeded() { } } -void TReadSession::DumpCountersToLog(size_t timeNumber) { - const bool logCounters = timeNumber % 60 == 0; // Every 1 minute. - const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes. - - *Settings.Counters_->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds(); - std::vector<TSingleClusterReadSessionImpl<true>::TPtr> sessions; - with_lock (Lock) { - if (Closing || Aborting) { - return; - } - - sessions.reserve(ClusterSessions.size()); - for (auto& [cluster, sessionInfo] : ClusterSessions) { - if (sessionInfo.Session) { - sessions.emplace_back(sessionInfo.Session); - } - } - } - - { - TMaybe<TLogElement> log; - if (dumpSessionsStatistics) { - log.ConstructInPlace(&Log, TLOG_INFO); - (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):"; - } - for (const auto& session : sessions) { - session->UpdateMemoryUsageStatistics(); - if (dumpSessionsStatistics) { - session->DumpStatisticsToLog(*log); - } - } - } - -#define C(counter) \ - << " " Y_STRINGIZE(counter) ": " \ - << Settings.Counters_->counter->Val() \ - /**/ - - if (logCounters) { - LOG_LAZY(Log, TLOG_INFO, - GetLogPrefix() << "Counters: {" - C(Errors) - C(CurrentSessionLifetimeMs) - C(BytesRead) - C(MessagesRead) - C(BytesReadCompressed) - C(BytesInflightUncompressed) - C(BytesInflightCompressed) - C(BytesInflightTotal) - C(MessagesInflight) - << " }" - ); - } - -#undef C - - // ScheduleDumpCountersToLog(timeNumber + 1); -} - -void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { +void TReadSession::SetupCountersLogger() { with_lock(Lock) { - if (Aborting || Closing) { - return; - } - DumpCountersContext = Connections->CreateContext(); - if (DumpCountersContext) { - auto callback = [self = weak_from_this(), timeNumber](bool ok) { - if (ok) { - if (auto sharedSelf = self.lock()) { - sharedSelf->DumpCountersToLog(timeNumber); - } - } - }; - Connections->ScheduleCallback(TDuration::Seconds(1), - std::move(callback), - DumpCountersContext); - } + CountersLogger = std::make_shared<TCountersLogger<true>>(Connections, CbContexts, Settings.Counters_, Log, + GetLogPrefix(), StartSessionTime); + DumpCountersContext = CountersLogger->MakeCallbackContext(); + CountersLogger->Start(); } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 2586ed97985..29440c8d363 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -1,7 +1,8 @@ #pragma once #include "common.h" -#include "impl_tracker.h" +#include "callback_context.h" +#include "counters_logger.h" #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> @@ -19,6 +20,7 @@ #include <atomic> #include <deque> +#include <vector> namespace NYdb::NPersQueue { @@ -107,27 +109,9 @@ class TDataDecompressionInfo; template <bool UseMigrationProtocol> using TDataDecompressionInfoPtr = typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr; - template <bool UseMigrationProtocol> -struct IErrorHandler : public TThrRefBase { - using TPtr = TIntrusivePtr<IErrorHandler>; - - virtual void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) = 0; - - void AbortSession(EStatus statusCode, NYql::TIssues&& issues) { - AbortSession(TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); - } - - void AbortSession(EStatus statusCode, const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - AbortSession(statusCode, std::move(issues)); - } +using TCallbackContextPtr = std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>>; - void AbortSession(TPlainStatus&& status) { - AbortSession(TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); - } -}; template <bool UseMigrationProtocol> class TUserRetrievedEventsInfoAccumulator { @@ -154,12 +138,12 @@ public: void DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, TServerMessage<UseMigrationProtocol>* dst, typename IProcessor<UseMigrationProtocol>::TReadCallback callback); void DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task); - void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent); - void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues); - void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message); - void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status); - void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status); - void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session); + void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent); + void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, NYql::TIssues&& issues); + void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, const TString& message); + void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status); + void DeferReconnection(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status); + void DeferStartSession(TCallbackContextPtr<UseMigrationProtocol> cbContext); void DeferSignalWaiter(TWaiter&& waiter); void DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos); @@ -189,11 +173,11 @@ private: std::vector<TWaiter> Waiters; // Reconnection and abort. - std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; + TCallbackContextPtr<UseMigrationProtocol> CbContext; TPlainStatus ReconnectionStatus; - // Session to start - std::vector<std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>> Sessions; + // Contexts for sessions to start + std::vector<TCallbackContextPtr<UseMigrationProtocol>> CbContexts; std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>> DecompressionInfos; }; @@ -207,7 +191,7 @@ public: TDataDecompressionInfo(TDataDecompressionInfo&&) = default; TDataDecompressionInfo( TPartitionData<UseMigrationProtocol>&& msg, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, + TCallbackContextPtr<UseMigrationProtocol> cbContext, bool doDecompress, i64 serverBytesSize = 0 // to increment read request bytes size ); @@ -338,7 +322,7 @@ private: using TMessageMetaPtrVector = std::vector<typename TAMessageMeta<UseMigrationProtocol>::TPtr>; TMetadataPtrVector BatchesMeta; std::vector<TMessageMetaPtrVector> MessagesMeta; - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; + TCallbackContextPtr<UseMigrationProtocol> CbContext; bool DoDecompress; i64 ServerBytesSize = 0; std::atomic<i64> SourceDataNotProcessed = 0; @@ -411,21 +395,21 @@ struct TReadSessionEventInfo { bool HasDataEvents = false; size_t EventsCount = 0; TMaybe<TEvent> Event; - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session; + TCallbackContextPtr<UseMigrationProtocol> CbContext; // Close event. - TReadSessionEventInfo(const TASessionClosedEvent<UseMigrationProtocol>& event, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session = {}) + TReadSessionEventInfo(const TASessionClosedEvent<UseMigrationProtocol>& event, TCallbackContextPtr<UseMigrationProtocol> cbContext = {}) : Event(TEvent(event)) - , Session(session) + , CbContext(std::move(cbContext)) { } // Usual event. - TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, TEvent event); + TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TCallbackContextPtr<UseMigrationProtocol> cbContext, TEvent event); // Data event. TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + TCallbackContextPtr<UseMigrationProtocol> cbContext, bool hasDataEvents); bool IsEmpty() const; @@ -583,11 +567,11 @@ public: ui64 partitionId, ui64 assignId, ui64 readOffset, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession) + TCallbackContextPtr<UseMigrationProtocol> cbContext) : Key{topicPath, cluster, partitionId} , AssignId(assignId) , FirstNotReadOffset(readOffset) - , Session(std::move(parentSession)) + , CbContext(std::move(cbContext)) { TAPartitionStream<true>::PartitionStreamId = partitionStreamId; TAPartitionStream<true>::TopicPath = std::move(topicPath); @@ -603,11 +587,11 @@ public: i64 partitionId, i64 assignId, i64 readOffset, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession) + TCallbackContextPtr<UseMigrationProtocol> cbContext) : Key{topicPath, "", static_cast<ui64>(partitionId)} , AssignId(static_cast<ui64>(assignId)) , FirstNotReadOffset(static_cast<ui64>(readOffset)) - , Session(std::move(parentSession)) + , CbContext(std::move(cbContext)) { TAPartitionStream<false>::PartitionSessionId = partitionStreamId; TAPartitionStream<false>::TopicPath = std::move(topicPath); @@ -667,8 +651,8 @@ public: EventsQueue.pop_front(); } - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> GetSession() const { - return Session; + TCallbackContextPtr<UseMigrationProtocol> GetCbContext() const { + return CbContext; } TLog GetLog() const; @@ -746,7 +730,7 @@ private: const TKey Key; ui64 AssignId; ui64 FirstNotReadOffset; - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session; + TCallbackContextPtr<UseMigrationProtocol> CbContext; TRawPartitionStreamEventQueue<UseMigrationProtocol> EventsQueue; ui64 MaxReadOffset = 0; ui64 MaxCommittedOffset = 0; @@ -768,22 +752,21 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti TReadSessionEventInfo<UseMigrationProtocol>>; public: - TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, - std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()); + TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings); + // Assumes we are under lock. TReadSessionEventInfo<UseMigrationProtocol> - GetEventImpl(size_t& maxByteSize, - TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock. + GetEventImpl(size_t& maxByteSize, + TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> - GetEvents(bool block = false, - TMaybe<size_t> maxEventsCount = Nothing(), - size_t maxByteSize = std::numeric_limits<size_t>::max()); + GetEvents(bool block = false, + TMaybe<size_t> maxEventsCount = Nothing(), + size_t maxByteSize = std::numeric_limits<size_t>::max()); TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> - GetEvent(bool block = false, - size_t maxByteSize = std::numeric_limits<size_t>::max()); + GetEvent(bool block = false, + size_t maxByteSize = std::numeric_limits<size_t>::max()); bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) { TWaiter waiter; @@ -812,7 +795,6 @@ public: // Push usual event. Returns false if queue is closed bool PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, TDeferredActions<UseMigrationProtocol>& deferred); @@ -843,9 +825,8 @@ private: struct THandlersVisitor : public TParent::TBaseHandlersVisitor { THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings, typename TParent::TEvent& event, - TDeferredActions<UseMigrationProtocol>& deferred, - std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()) - : TParent::TBaseHandlersVisitor(settings, event, tracker) + TDeferredActions<UseMigrationProtocol>& deferred) + : TParent::TBaseHandlersVisitor(settings, event) , Deferred(deferred) { } @@ -896,14 +877,12 @@ private: TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock. bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) { - THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred, Tracker); + THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred); return visitor.Visit(); } private: bool HasEventCallbacks; - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session; - std::shared_ptr<TImplTracker> Tracker; }; } // namespace NYdb::NPersQueue @@ -956,8 +935,7 @@ public: std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue, NGrpc::IQueueClientContextPtr clientContext, ui64 partitionStreamIdStart, - ui64 partitionStreamIdStep, - std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>() + ui64 partitionStreamIdStep ) : Settings(settings) , Database(database) @@ -972,7 +950,6 @@ public: , CookieMapping() , ReadSizeBudget(GetCompressedDataSizeLimit()) , ReadSizeServerDelta(0) - , Tracker(std::move(tracker)) { } @@ -1026,6 +1003,8 @@ public: return Log; } + TCallbackContextPtr<UseMigrationProtocol> MakeCallbackContext(); + private: void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred); @@ -1208,7 +1187,7 @@ private: i64 ReadSizeBudget; i64 ReadSizeServerDelta = 0; - std::shared_ptr<TImplTracker> Tracker; + TCallbackContextPtr<UseMigrationProtocol> CbContext; }; // High level class that manages several read session impls. @@ -1216,7 +1195,6 @@ private: // This class communicates with cluster discovery service and then creates // sessions to each cluster. class TReadSession : public IReadSession, - public IUserRetrievedEventCallback<true>, public std::enable_shared_from_this<TReadSession> { struct TClusterSessionInfo { TClusterSessionInfo(const TString& cluster) @@ -1299,11 +1277,8 @@ private: void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred); void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred); - void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override; - void MakeCountersIfNeeded(); - void DumpCountersToLog(size_t timeNumber = 0); - void ScheduleDumpCountersToLog(size_t timeNumber = 0); + void SetupCountersLogger(); private: TReadSessionSettings Settings; @@ -1320,14 +1295,14 @@ private: IRetryPolicy::IRetryState::TPtr ClusterDiscoveryRetryState; bool DataReadingSuspended = false; - NGrpc::IQueueClientContextPtr DumpCountersContext; - // Exiting. bool Aborting = false; bool Closing = false; - // - std::shared_ptr<TImplTracker> Tracker; + std::vector<TCallbackContextPtr<true>> CbContexts; + + std::shared_ptr<TCountersLogger<true>> CountersLogger; + std::shared_ptr<TCallbackContext<TCountersLogger<true>>> DumpCountersContext; }; } // namespace NYdb::NPersQueue diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 038d207c18c..5256feb1fee 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -54,7 +54,7 @@ bool HasNullCounters(TReaderCounters& counters); template<bool UseMigrationProtocol> TLog TPartitionStreamImpl<UseMigrationProtocol>::GetLog() const { - if (auto session = Session.lock()) { + if (auto session = CbContext->LockShared()) { return session->GetLog(); } return {}; @@ -63,7 +63,7 @@ TLog TPartitionStreamImpl<UseMigrationProtocol>::GetLog() const { template<bool UseMigrationProtocol> void TPartitionStreamImpl<UseMigrationProtocol>::Commit(ui64 startOffset, ui64 endOffset) { std::vector<std::pair<ui64, ui64>> toCommit; - if (auto sessionShared = Session.lock()) { + if (auto sessionShared = CbContext->LockShared()) { Y_VERIFY(endOffset > startOffset); with_lock(sessionShared->Lock) { if (!AddToCommitRanges(startOffset, endOffset, true)) // Add range for real commit always. @@ -84,21 +84,21 @@ void TPartitionStreamImpl<UseMigrationProtocol>::Commit(ui64 startOffset, ui64 e template<bool UseMigrationProtocol> void TPartitionStreamImpl<UseMigrationProtocol>::RequestStatus() { - if (auto sessionShared = Session.lock()) { + if (auto sessionShared = CbContext->LockShared()) { sessionShared->RequestPartitionStreamStatus(this); } } template<bool UseMigrationProtocol> void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) { - if (auto sessionShared = Session.lock()) { + if (auto sessionShared = CbContext->LockShared()) { sessionShared->ConfirmPartitionStreamCreate(this, readOffset, commitOffset); } } template<bool UseMigrationProtocol> void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmDestroy() { - if (auto sessionShared = Session.lock()) { + if (auto sessionShared = CbContext->LockShared()) { sessionShared->ConfirmPartitionStreamDestroy(this); } } @@ -129,6 +129,22 @@ void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail(TDeferredAct EventsQueue.DeleteNotReadyTail(deferred); } +template <bool UseMigrationProtocol> +void TPartitionStreamImpl<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, + size_t& maxEventsCount, + size_t& maxByteSize, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages, + TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages, + TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) +{ + partitionStream->EventsQueue.GetDataEventImpl(partitionStream, + maxEventsCount, + maxByteSize, + messages, + compressedMessages, + accumulator); +} + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TRawPartitionStreamEventQueue @@ -213,8 +229,15 @@ TStringBuilder TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetLogPrefix return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] "; } +template <bool UseMigrationProtocol> +TCallbackContextPtr<UseMigrationProtocol> TSingleClusterReadSessionImpl<UseMigrationProtocol>::MakeCallbackContext() { + CbContext = std::make_shared<TCallbackContext<TSelf>>(this->shared_from_this()); + return CbContext; +} + template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() { + Y_VERIFY(CbContext); Settings.DecompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); if (!Reconnect(TPlainStatus())) { @@ -300,17 +323,21 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain // Destroy all partition streams before connecting. DestroyAllPartitionStreamsImpl(deferred); - connectCallback = [wire = Tracker->MakeTrackedWire(), - sessionImpl = this->shared_from_this(), - connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - sessionImpl->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace! + Y_VERIFY(CbContext); + + connectCallback = [cbContext = CbContext, + connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + if (auto borrowedSelf = cbContext->LockShared()) { + borrowedSelf->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace! + } }; - connectTimeoutCallback = [wire = Tracker->MakeTrackedWire(), - sessionImpl = this->shared_from_this(), - connectTimeoutContext = connectTimeoutContext](bool ok) { + connectTimeoutCallback = [cbContext = CbContext, + connectTimeoutContext = connectTimeoutContext](bool ok) { if (ok) { - sessionImpl->OnConnectTimeout(connectTimeoutContext); + if (auto borrowedSelf = cbContext->LockShared()) { + borrowedSelf->OnConnectTimeout(connectTimeoutContext); + } } }; } @@ -347,7 +374,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco Processor = nullptr; RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. - deferred.DeferReconnection(this->shared_from_this(), std::move(status)); + deferred.DeferReconnection(CbContext, std::move(status)); } template<bool UseMigrationProtocol> @@ -630,11 +657,11 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream bool pushRes = true; if constexpr (UseMigrationProtocol) { - pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser), deferred); } else { - pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser), deferred); } @@ -796,13 +823,16 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl( if (Processor && !Closing) { ServerMessage->Clear(); - auto callback = [wire = Tracker->MakeTrackedWire(), - sessionImpl = this->shared_from_this(), + Y_VERIFY(CbContext); + + auto callback = [cbContext = CbContext, connectionGeneration = ConnectionGeneration, // Capture message & processor not to read in freed memory. serverMessage = ServerMessage, processor = Processor](NGrpc::TGrpcStatus&& grpcStatus) { - sessionImpl->OnReadDone(std::move(grpcStatus), connectionGeneration); + if (auto borrowedSelf = cbContext->LockShared()) { + borrowedSelf->OnReadDone(std::move(grpcStatus), connectionGeneration); + } }; deferred.DeferReadFromProcessor(Processor, ServerMessage.get(), std::move(callback)); @@ -987,7 +1017,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( } auto decompressionInfo = std::make_shared<TDataDecompressionInfo<true>>(std::move(partitionData), - shared_from_this(), + CbContext, Settings.Decompress_); Y_VERIFY(decompressionInfo); @@ -1013,7 +1043,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( NextPartitionStreamId, msg.topic().path(), msg.cluster(), msg.partition() + 1, // Group. msg.partition(), // Partition. - msg.assign_id(), msg.read_offset(), weak_from_this()); + msg.assign_id(), msg.read_offset(), CbContext); NextPartitionStreamId += PartitionStreamIdStep; // Renew partition stream. @@ -1022,7 +1052,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (currentPartitionStream) { CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId()); bool pushRes = EventsQueue->PushEvent( - currentPartitionStream, weak_from_this(), + currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent( currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost), deferred); @@ -1035,7 +1065,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( // Send event to user. bool pushRes = EventsQueue->PushEvent( - partitionStream, weak_from_this(), + partitionStream, TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset()), deferred); if (!pushRes) { @@ -1060,13 +1090,13 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (msg.forceful_release()) { PartitionStreams.erase(msg.assign_id()); CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId()); - pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent( partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost), deferred); } else { pushRes = EventsQueue->PushEvent( - partitionStream, weak_from_this(), + partitionStream, TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset()), deferred); } @@ -1097,7 +1127,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( } for (auto& [id, partitionStream] : partitionStreams) { bool pushRes = EventsQueue->PushEvent( - partitionStream, weak_from_this(), + partitionStream, TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset()), deferred); if (!pushRes) { @@ -1112,7 +1142,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( auto partitionStream = partitionStreamIt->second; partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset()); bool pushRes = EventsQueue->PushEvent( - partitionStream, weak_from_this(), + partitionStream, TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset()), deferred); if (!pushRes) { @@ -1134,7 +1164,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } - bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, TReadSessionEvent::TPartitionStreamStatusEvent( partitionStreamIt->second, msg.committed_offset(), 0, // TODO: support read offset in status @@ -1231,7 +1261,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( partitionStream->SetFirstNotReadOffset(desiredOffset); auto decompressionInfo = std::make_shared<TDataDecompressionInfo<false>>(std::move(partitionData), - shared_from_this(), + CbContext, Settings.Decompress_, serverBytesSize); // TODO (ildar-khisam@): share serverBytesSize between partitions data according to their actual sizes; @@ -1259,14 +1289,14 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>( NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(), msg.partition_session().partition_session_id(), msg.committed_offset(), - weak_from_this()); + CbContext); NextPartitionStreamId += PartitionStreamIdStep; // Renew partition stream. TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()]; if (currentPartitionStream) { bool pushRes = EventsQueue->PushEvent( - currentPartitionStream, weak_from_this(), + currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), deferred); @@ -1278,7 +1308,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( currentPartitionStream = partitionStream; // Send event to user. - bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStream, NTopic::TReadSessionEvent::TStartPartitionSessionEvent( partitionStream, msg.committed_offset(), msg.partition_offsets().end()), deferred); @@ -1303,13 +1333,13 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( bool pushRes = true; if (!msg.graceful()) { PartitionStreams.erase(msg.partition_session_id()); - pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), + pushRes = EventsQueue->PushEvent(partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent( partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost), deferred); } else { pushRes = EventsQueue->PushEvent( - partitionStream, weak_from_this(), + partitionStream, NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()), deferred); } @@ -1333,7 +1363,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( if (partitionStreamIt != PartitionStreams.end()) { auto partitionStream = partitionStreamIt->second; partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset()); - bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStream, NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent( partitionStream, rangeProto.committed_offset()), deferred); @@ -1356,7 +1386,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( if (partitionStreamIt == PartitionStreams.end()) { return; } - bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, NTopic::TReadSessionEvent::TPartitionSessionStatusEvent( partitionStreamIt->second, msg.committed_offset(), 0, // TODO: support read offset in status @@ -1421,7 +1451,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStr >; for (auto&& [key, partitionStream] : PartitionStreams) { - bool pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(), + bool pushRes = EventsQueue->PushEvent(partitionStream, TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost), deferred); if (!pushRes) { @@ -1738,22 +1768,22 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMappin template<bool UseMigrationProtocol> TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + TCallbackContextPtr<UseMigrationProtocol> cbContext, TEvent event) : PartitionStream(std::move(partitionStream)) , Event(std::move(event)) - , Session(std::move(session)) + , CbContext(std::move(cbContext)) { } template<bool UseMigrationProtocol> TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, + TCallbackContextPtr<UseMigrationProtocol> cbContext, bool hasDataEvents) : PartitionStream(std::move(partitionStream)) , HasDataEvents(hasDataEvents) , EventsCount(1) - , Session(std::move(session)) + , CbContext(std::move(cbContext)) { } @@ -1772,12 +1802,8 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const { template <bool UseMigrationProtocol> TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( - const TAReadSessionSettings<UseMigrationProtocol>& settings, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, - std::shared_ptr<TImplTracker> tracker) - : TParent(settings) - , Session(std::move(session)) - , Tracker(std::move(tracker)) { + const TAReadSessionSettings<UseMigrationProtocol>& settings) + : TParent(settings) { const auto& h = TParent::Settings.EventHandlers_; if constexpr (UseMigrationProtocol) { @@ -1803,7 +1829,6 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue( template <bool UseMigrationProtocol> bool TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream, - std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/, typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event, TDeferredActions<UseMigrationProtocol>& deferred) { @@ -1840,10 +1865,10 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl( return; } - auto session = partitionStream->GetSession(); + auto cbContext = partitionStream->GetCbContext(); if (TParent::Events.empty()) { - TParent::Events.emplace(std::move(partitionStream), std::move(session), isDataEvent); + TParent::Events.emplace(std::move(partitionStream), std::move(cbContext), isDataEvent); } else { auto& event = TParent::Events.back(); if (event.HasDataEvents @@ -1851,7 +1876,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl( && (event.PartitionStream == partitionStream)) { ++event.EventsCount; } else { - TParent::Events.emplace(std::move(partitionStream), std::move(session), isDataEvent); + TParent::Events.emplace(std::move(partitionStream), std::move(cbContext), isDataEvent); } } @@ -1875,22 +1900,6 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr< } template <bool UseMigrationProtocol> -void TPartitionStreamImpl<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, - size_t& maxEventsCount, - size_t& maxByteSize, - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages, - TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages, - TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator) -{ - partitionStream->EventsQueue.GetDataEventImpl(partitionStream, - maxEventsCount, - maxByteSize, - messages, - compressedMessages, - accumulator); -} - -template <bool UseMigrationProtocol> void TRawPartitionStreamEventQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, size_t& maxEventsCount, size_t& maxByteSize, @@ -1989,7 +1998,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize, } TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> event; - auto frontSession = front.Session; + auto frontCbContext = front.CbContext; if (partitionStream->TopEvent().IsDataEvent()) { event = GetDataEventImpl(partitionStream, maxByteSize, accumulator); } else { @@ -2001,12 +2010,12 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize, TParent::RenewWaiterImpl(); - return {partitionStream, std::move(frontSession), std::move(*event)}; + return {partitionStream, std::move(frontCbContext), std::move(*event)}; } Y_ASSERT(TParent::CloseEvent); - return {*TParent::CloseEvent, Session}; + return {*TParent::CloseEvent}; } template <bool UseMigrationProtocol> @@ -2104,7 +2113,7 @@ template <bool UseMigrationProtocol> bool TReadSessionEventsQueue<UseMigrationProtocol>::TryApplyCallbackToEventImpl(typename TParent::TEvent& event, TDeferredActions<UseMigrationProtocol>& deferred) { - THandlersVisitor visitor(TParent::Settings, event, deferred, Tracker); + THandlersVisitor visitor(TParent::Settings, event, deferred); return visitor.Visit(); } @@ -2133,8 +2142,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbackToEventImpl(TDa Y_VERIFY(HasEventCallbacks); if (TParent::Settings.EventHandlers_.DataReceivedHandler_) { - auto action = [wire = Tracker->MakeTrackedWire(), - func = TParent::Settings.EventHandlers_.DataReceivedHandler_, + auto action = [func = TParent::Settings.EventHandlers_.DataReceivedHandler_, data = std::move(data), eventsInfo = std::move(eventsInfo)]() mutable { func(data); @@ -2143,8 +2151,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbackToEventImpl(TDa deferred.DeferStartExecutorTask(TParent::Settings.EventHandlers_.HandlersExecutor_, std::move(action)); } else if (TParent::Settings.EventHandlers_.CommonHandler_) { - auto action = [wire = Tracker->MakeTrackedWire(), - func = TParent::Settings.EventHandlers_.CommonHandler_, + auto action = [func = TParent::Settings.EventHandlers_.CommonHandler_, data = std::move(data), eventsInfo = std::move(eventsInfo)]() mutable { typename TParent::TEvent event(std::move(data)); @@ -2185,12 +2192,12 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() { template<bool UseMigrationProtocol> TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo( TPartitionData<UseMigrationProtocol>&& msg, - std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, + TCallbackContextPtr<UseMigrationProtocol> cbContext, bool doDecompress, i64 serverBytesSize ) : ServerMessage(std::move(msg)) - , Session(std::move(session)) + , CbContext(std::move(cbContext)) , DoDecompress(doDecompress) , ServerBytesSize(serverBytesSize) { @@ -2214,7 +2221,7 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo( template<bool UseMigrationProtocol> TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo() { - if (auto session = Session.lock()) { + if (auto session = CbContext->LockShared()) { session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight, ServerBytesSize); } } @@ -2288,7 +2295,7 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks( const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory, TDeferredActions<UseMigrationProtocol>& deferred) { - auto session = Session.lock(); + auto session = CbContext->LockShared(); Y_ASSERT(session); i64 used = 0; @@ -2314,7 +2321,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double { constexpr size_t TASK_LIMIT = 512_KB; - auto session = Session.lock(); + auto session = CbContext->LockShared(); Y_ASSERT(session); ReadyThresholds.emplace_back(); @@ -2340,8 +2347,10 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double TDataDecompressionInfo::shared_from_this(), ReadyThresholds.back().Ready); if (!pushRes) { - session->AbortImpl(); + // with_lock(session->Lock) { + session->Abort(); return; + // } } } @@ -2471,7 +2480,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 source CompressedDataSize -= sourceSize; DecompressedDataSize += decompressedSize; - if (auto session = Session.lock()) { + if (auto session = CbContext->LockShared()) { // TODO (ildar-khisam@): distribute total ServerBytesSize in proportion of source size // Use CompressedDataSize, sourceSize, ServerBytesSize session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, std::exchange(ServerBytesSize, 0)); @@ -2484,7 +2493,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::OnUserRetrievedEvent(i64 deco MessagesInflight -= messagesCount; DecompressedDataSize -= decompressedSize; - if (auto session = Session.lock()) { + if (auto session = CbContext->LockShared()) { session->OnUserRetrievedEvent(decompressedSize, messagesCount); } } @@ -2561,27 +2570,25 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator( Parent->PutDecompressionError(std::current_exception(), messages.Batch, i); data.clear_data(); // Free memory, because we don't count it. - std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); - if (session) { + if (auto session = Parent->CbContext->LockShared()) { session->GetLog() << TLOG_INFO << "Error decompressing data: " << CurrentExceptionMessage(); } } } } - if (auto session = Parent->Session.lock()) { + if (auto session = Parent->CbContext->LockShared()) { LOG_LAZY(session->GetLog(), TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: " << partition_id << " (" << minOffset << "-" << maxOffset << ")"); } Y_ASSERT(dataProcessed == SourceDataSize); - std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock(); Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed); Parent->SourceDataNotProcessed -= dataProcessed; Ready->Ready = true; - if (session) { + if (auto session = Parent->CbContext->LockShared()) { session->GetEventsQueue()->SignalReadyEvents(PartitionStream); } } @@ -2628,37 +2635,37 @@ void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask(const typena } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { - Session = session; +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) { + CbContext = std::move(cbContext); SessionClosedEvent.ConstructInPlace(std::move(closeEvent)); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues) { - DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, NYql::TIssues&& issues) { + DeferAbortSession(std::move(cbContext), TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues))); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message) { +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, const TString& message) { NYql::TIssues issues; issues.AddIssue(message); - DeferAbortSession(session, statusCode, std::move(issues)); + DeferAbortSession(std::move(cbContext), statusCode, std::move(issues)); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) { - DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); +void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status) { + DeferAbortSession(std::move(cbContext), TASessionClosedEvent<UseMigrationProtocol>(std::move(status))); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) { - Session = std::move(session); +void TDeferredActions<UseMigrationProtocol>::DeferReconnection(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status) { + CbContext = std::move(cbContext); ReconnectionStatus = std::move(status); } template<bool UseMigrationProtocol> -void TDeferredActions<UseMigrationProtocol>::DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session) { - Sessions.push_back(std::move(session)); +void TDeferredActions<UseMigrationProtocol>::DeferStartSession(TCallbackContextPtr<UseMigrationProtocol> cbContext) { + CbContexts.push_back(std::move(cbContext)); } template<bool UseMigrationProtocol> @@ -2684,8 +2691,10 @@ void TDeferredActions<UseMigrationProtocol>::DoActions() { template<bool UseMigrationProtocol> void TDeferredActions<UseMigrationProtocol>::StartSessions() { - for (auto& session : Sessions) { - session->Start(); + for (auto& ctx : CbContexts) { + if (auto session = ctx->LockShared()) { + session->Start(); + } } } @@ -2708,16 +2717,20 @@ void TDeferredActions<UseMigrationProtocol>::StartExecutorTasks() { template<bool UseMigrationProtocol> void TDeferredActions<UseMigrationProtocol>::AbortSession() { if (SessionClosedEvent) { - Y_VERIFY(Session); - Session->AbortSession(std::move(*SessionClosedEvent)); + Y_VERIFY(CbContext); + if (auto session = CbContext->LockShared()) { + session->AbortSession(std::move(*SessionClosedEvent)); + } } } template<bool UseMigrationProtocol> void TDeferredActions<UseMigrationProtocol>::Reconnect() { - if (Session) { - if (!Session->Reconnect(ReconnectionStatus)) { - Session->AbortSession(std::move(ReconnectionStatus)); + if (CbContext) { + if (auto session = CbContext->LockShared()) { + if (!session->Reconnect(ReconnectionStatus)) { + session->AbortSession(std::move(ReconnectionStatus)); + } } } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 607ebfb8552..9b1248ef375 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -17,9 +17,10 @@ TWriteSession::TWriteSession( std::shared_ptr<TPersQueueClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) - : Tracker(std::make_shared<TImplTracker>()) - , Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState), Tracker)) + : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState))) { + CbContext = std::make_shared<TCallbackContext<TWriteSessionImpl>>(Impl); + Impl->SetCallbackContext(CbContext); } void TWriteSession::Start(const TDuration& delay) { @@ -58,7 +59,7 @@ bool TWriteSession::Close(TDuration closeTimeout) { TWriteSession::~TWriteSession() { Impl->Close(TDuration::Zero()); - Tracker->AsyncComplete().Wait(); + CbContext->Cancel(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index 62732d67c94..404b9444e58 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -1,11 +1,11 @@ #pragma once #include "common.h" -#include "impl_tracker.h" #include "persqueue_impl.h" #include "write_session_impl.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> #include <util/generic/buffer.h> @@ -58,7 +58,7 @@ private: void Start(const TDuration& delay); private: - std::shared_ptr<TImplTracker> Tracker; + std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext; std::shared_ptr<TWriteSessionImpl> Impl; }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 908ee390333..531f2655e6e 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -45,15 +45,13 @@ TWriteSessionImpl::TWriteSessionImpl( const TWriteSessionSettings& settings, std::shared_ptr<TPersQueueClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, - TDbDriverStatePtr dbDriverState, - std::shared_ptr<TImplTracker> tracker) + TDbDriverStatePtr dbDriverState) : Settings(settings) , Client(std::move(client)) , Connections(std::move(connections)) , DbDriverState(std::move(dbDriverState)) , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") - , Tracker(tracker) - , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings, Tracker)) + , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) , InitSeqNoPromise(NThreading::NewPromise<ui64>()) , WakeupInterval( Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? @@ -77,7 +75,12 @@ TWriteSessionImpl::TWriteSessionImpl( } +void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx) { + CbContext = std::move(ctx); +} + void TWriteSessionImpl::Start(const TDuration& delay) { + Y_VERIFY(CbContext); ++ConnectionAttemptsDone; if (!Started) { with_lock(Lock) { @@ -143,14 +146,16 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) { (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint))); if (!cdsRequestIsUnnecessary) { - auto extractor = [sharedThis = shared_from_this(), wire = Tracker->MakeTrackedWire()] + auto extractor = [cbContext = CbContext] (google::protobuf::Any* any, TPlainStatus status) mutable { Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult result; if (any) { any->UnpackTo(&result); } TStatus st(std::move(status)); - sharedThis->OnCdsResponse(st, result); + if (auto self = cbContext->LockShared()) { + self->OnCdsResponse(st, result); + } }; Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest req; @@ -163,7 +168,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) { params->set_preferred_cluster_name(*Settings.PreferredCluster_); LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n"); - auto cdsRequestCall = [wire = Tracker->MakeTrackedWire(), req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable { + auto cdsRequestCall = [req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable { LOG_LAZY(dbState->Log, TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n"); connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, @@ -463,19 +468,18 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin Y_ASSERT(connectTimeoutContext); reqSettings = TRpcRequestSettings::Make(Settings); - connectCallback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), - connectContext = connectContext] - (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); + connectCallback = [cbContext = CbContext, + connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + if (auto self = cbContext->LockShared()) { + self->OnConnect(std::move(st), std::move(processor), connectContext); + } }; - connectTimeoutCallback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), - connectTimeoutContext = connectTimeoutContext] - (bool ok) { + connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) { if (ok) { - sharedThis->OnConnectTimeout(connectTimeoutContext); + if (auto self = cbContext->LockShared()) { + self->OnConnectTimeout(connectTimeoutContext); + } } }; } @@ -586,10 +590,11 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&& if (Aborting) { return; } - auto callback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), + auto callback = [cbContext = CbContext, connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { - sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); + if (auto self = cbContext->LockShared()) { + self->OnWriteDone(std::move(grpcStatus), connectionGeneration); + } }; Processor->Write(std::move(req), std::move(callback)); @@ -606,13 +611,14 @@ void TWriteSessionImpl::ReadFromProcessor() { } prc = Processor; generation = ConnectionGeneration; - callback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), - connectionGeneration = generation, - processor = prc, - serverMessage = ServerMessage] - (NGrpc::TGrpcStatus&& grpcStatus) { - sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); + callback = [cbContext = CbContext, + connectionGeneration = generation, + processor = prc, + serverMessage = ServerMessage] + (NGrpc::TGrpcStatus&& grpcStatus) { + if (auto self = cbContext->LockShared()) { + self->OnReadDone(std::move(grpcStatus), connectionGeneration); + } }; } prc->Read(ServerMessage.get(), std::move(callback)); @@ -896,8 +902,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); blockPtr->Move(block_); - auto lambda = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), + auto lambda = [cbContext = CbContext, codec = Settings.Codec_, level = Settings.CompressionLevel_, isSyncCompression = !CompressionExecutor->IsAsync(), @@ -910,8 +915,10 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { Y_VERIFY(!compressedData.Empty()); blockPtr->Data = std::move(compressedData); blockPtr->Compressed = true; - blockPtr->CodecID = GetCodecId(sharedThis->Settings.Codec_); - sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); + blockPtr->CodecID = GetCodecId(codec); + if (auto self = cbContext->LockShared()) { + self->OnCompressed(std::move(*blockPtr), isSyncCompression); + } }; CompressionExecutor->Post(std::move(lambda)); @@ -1272,13 +1279,16 @@ void TWriteSessionImpl::HandleWakeUpImpl() { if (AtomicGet(Aborting)) { return; } - auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire()] (bool ok) + auto callback = [cbContext = CbContext] (bool ok) { if (!ok) { return; } - with_lock(sharedThis->Lock) { - sharedThis->HandleWakeUpImpl(); + + if (auto self = cbContext->LockShared()) { + with_lock(self->Lock) { + self->HandleWakeUpImpl(); + } } }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h index 8124fc60320..06f27b86007 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -1,10 +1,10 @@ #pragma once #include "common.h" -#include "impl_tracker.h" #include "persqueue_impl.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> #include <util/generic/buffer.h> @@ -28,10 +28,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; public: - TWriteSessionEventsQueue(const TWriteSessionSettings& settings, - std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()) + TWriteSessionEventsQueue(const TWriteSessionSettings& settings) : TParent(settings) - , Tracker(std::move(tracker)) {} void PushEvent(TEventInfo eventInfo) { @@ -131,7 +129,7 @@ private: }; bool ApplyHandler(TEventInfo& eventInfo) { - THandlersVisitor visitor(Settings, eventInfo.GetEvent(), Tracker); + THandlersVisitor visitor(Settings, eventInfo.GetEvent()); return visitor.Visit(); } @@ -146,9 +144,6 @@ private: Y_ASSERT(CloseEvent); return {*CloseEvent}; } - -private: - std::shared_ptr<TImplTracker> Tracker; }; struct TMemoryUsageChange { @@ -305,8 +300,7 @@ public: TWriteSessionImpl(const TWriteSessionSettings& settings, std::shared_ptr<TPersQueueClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, - TDbDriverStatePtr dbDriverState, - std::shared_ptr<TImplTracker> tracker); + TDbDriverStatePtr dbDriverState); TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, @@ -329,6 +323,8 @@ public: ~TWriteSessionImpl(); // will not call close - destroy everything without acks + void SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx); + private: TStringBuilder LogPrefix() const; @@ -396,7 +392,7 @@ private: TStringType PrevToken; bool UpdateTokenInProgress = false; TInstant LastTokenUpdate = TInstant::Zero(); - std::shared_ptr<TImplTracker> Tracker; + std::shared_ptr<TCallbackContext<TWriteSessionImpl>> CbContext; std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; NGrpc::IQueueClientContextPtr ClientContext; // Common client context. NGrpc::IQueueClientContextPtr ConnectContext; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 1e399534d67..989ff5b72ab 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -510,6 +510,7 @@ public: ui64 PartitionIdStart = 1; ui64 PartitionIdStep = 1; typename TSingleClusterReadSessionImpl<true>::TPtr Session; + std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<true>>> CbContext; std::shared_ptr<TThreadPool> ThreadPool; ::IExecutor::TPtr DefaultExecutor; }; @@ -598,6 +599,7 @@ TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) { MockProcessor->Validate(); } + CbContext->Cancel(); Session = nullptr; ThreadPool->Stop(); @@ -630,13 +632,14 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() { GetEventsQueue(), FakeContext, PartitionIdStart, PartitionIdStep); + CbContext = Session->MakeCallbackContext(); } return Session.get(); } std::shared_ptr<TReadSessionEventsQueue<true>> TReadSessionImplTestSetup::GetEventsQueue() { if (!EventsQueue) { - EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, std::weak_ptr<IUserRetrievedEventCallback<true>>()); + EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings); } return EventsQueue; } @@ -727,14 +730,14 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) { } // Event 4: commit ack. - if (commit && !close) { // (commit && close) branch check is broken with current TReadSession::Close quick fix + if (commit) { // (commit && close) branch check is broken with current TReadSession::Close quick fix TMaybe<TReadSessionEvent::TEvent> event = session->GetEvent(!close); // Event is expected to be already in queue if closed. UNIT_ASSERT(event); - Cerr << "commit ack event " << DebugString(*event) << Endl; - UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TCommitAcknowledgementEvent>(*event)); + Cerr << "commit ack or close event " << DebugString(*event) << Endl; + UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TCommitAcknowledgementEvent>(*event) || std::holds_alternative<TSessionClosedEvent>(*event)); } - if (close) { + if (close && !commit) { TMaybe<TReadSessionEvent::TEvent> event = session->GetEvent(false); UNIT_ASSERT(event); Cerr << "close event " << DebugString(*event) << Endl; @@ -1885,7 +1888,8 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { TAReadSessionSettings<true> settings; std::shared_ptr<TSingleClusterReadSessionImpl<true>> session; - TReadSessionEventsQueue<true> sessionQueue{settings, session}; + auto cbCtx = std::make_shared<TCallbackContext<TSingleClusterReadSessionImpl<true>>>(session); + TReadSessionEventsQueue<true> sessionQueue{settings}; auto stream = MakeIntrusive<TPartitionStreamImpl<true>>(1ull, "", @@ -1894,7 +1898,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { 1ull, 1ull, 0ull, - session); + cbCtx); TPartitionData<true> message; Ydb::PersQueue::V1::MigrationStreamingReadServerMessage_DataBatch_Batch* batch = @@ -1904,7 +1908,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { *messageData->mutable_data() = "*"; auto data = std::make_shared<TDataDecompressionInfo<true>>(std::move(message), - session, + cbCtx, false, 0); @@ -1933,5 +1937,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) { #undef UNIT_ASSERT_CONTROL_EVENT #undef UNIT_ASSERT_DATA_EVENT + + cbCtx->Cancel(); } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index fb340907393..0d7ad23ea1b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -37,14 +37,16 @@ TReadSession::~TReadSession() { Abort(EStatus::ABORTED, "Aborted"); ClearAllEvents(); - if (Tracker) { - Tracker->AsyncComplete().Wait(); + if (CbContext) { + CbContext->Cancel(); + } + if (DumpCountersContext) { + DumpCountersContext->Cancel(); } } void TReadSession::Start() { - Tracker = std::make_shared<NPersQueue::TImplTracker>(); - EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this(), Tracker); + EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings); if (!ValidateSettings()) { return; @@ -60,7 +62,7 @@ void TReadSession::Start() { Topics = Settings.Topics_; CreateClusterSessionsImpl(deferred); } - // ScheduleDumpCountersToLog(); + SetupCountersLogger(); } void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred) { @@ -85,10 +87,10 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false> Client->CreateReadSessionConnectionProcessorFactory(), EventsQueue, context, - 1, 1, - Tracker); + 1, 1); - deferred.DeferStartSession(Session); + CbContext = Session->MakeCallbackContext(); + deferred.DeferStartSession(CbContext); } bool TReadSession::ValidateSettings() { @@ -97,12 +99,12 @@ bool TReadSession::ValidateSettings() { issues.AddIssue("Empty topics list."); } - if (Settings.ConsumerName_.empty() && !Settings.WithoutConsumer_) { - issues.AddIssue("No consumer specified."); + if (Settings.ConsumerName_.empty() && !Settings.WithoutConsumer_) { + issues.AddIssue("No consumer specified."); } - if (!Settings.ConsumerName_.empty() && Settings.WithoutConsumer_) { - issues.AddIssue("No need to specify a consumer when reading without a consumer."); + if (!Settings.ConsumerName_.empty() && Settings.WithoutConsumer_) { + issues.AddIssue("No need to specify a consumer when reading without a consumer."); } if (Settings.MaxMemoryUsageBytes_ < 1_MB) { @@ -244,11 +246,10 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx) bool TReadSession::Close(TDuration timeout) { LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout); // Log final counters. - DumpCountersToLog(); + CountersLogger->Stop(); with_lock (Lock) { if (DumpCountersContext) { DumpCountersContext->Cancel(); - DumpCountersContext.reset(); } } @@ -321,15 +322,6 @@ TStringBuilder TReadSession::GetLogPrefix() const { return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] "; } -void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) { - LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix() - << "The application data is transferred to the client. Number of messages " - << messagesCount - << ", size " - << decompressedSize - << " bytes"); -} - void TReadSession::MakeCountersIfNeeded() { if (!Settings.Counters_ || NPersQueue::HasNullCounters(*Settings.Counters_)) { TReaderCounters::TPtr counters = MakeIntrusive<TReaderCounters>(); @@ -341,78 +333,17 @@ void TReadSession::MakeCountersIfNeeded() { } } -void TReadSession::DumpCountersToLog(size_t timeNumber) { - const bool logCounters = timeNumber % 60 == 0; // Every 1 minute. - const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes. - - *Settings.Counters_->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds(); - NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr session; - with_lock (Lock) { - if (Closing || Aborting) { - return; - } - - session = Session; - } - - { - TMaybe<TLogElement> log; - if (dumpSessionsStatistics) { - log.ConstructInPlace(&Log, TLOG_INFO); - (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):"; - } - session->UpdateMemoryUsageStatistics(); - if (dumpSessionsStatistics) { - session->DumpStatisticsToLog(*log); - } - } - -#define C(counter) \ - << " " Y_STRINGIZE(counter) ": " \ - << Settings.Counters_->counter->Val() \ - /**/ - - if (logCounters) { - LOG_LAZY(Log, TLOG_INFO, - GetLogPrefix() << "Counters: {" - C(Errors) - C(CurrentSessionLifetimeMs) - C(BytesRead) - C(MessagesRead) - C(BytesReadCompressed) - C(BytesInflightUncompressed) - C(BytesInflightCompressed) - C(BytesInflightTotal) - C(MessagesInflight) - << " }" - ); - } - -#undef C - - // ScheduleDumpCountersToLog(timeNumber + 1); -} - -void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) { +void TReadSession::SetupCountersLogger() { with_lock(Lock) { - if (Aborting || Closing) { - return; - } - DumpCountersContext = Connections->CreateContext(); - if (DumpCountersContext) { - auto callback = [self = shared_from_this(), timeNumber](bool ok) { - if (ok) { - self->DumpCountersToLog(timeNumber); - } - }; - Connections->ScheduleCallback(TDuration::Seconds(1), - std::move(callback), - DumpCountersContext); - } + std::vector<NPersQueue::TCallbackContextPtr<false>> sessions{CbContext}; + + CountersLogger = std::make_shared<NPersQueue::TCountersLogger<false>>(Connections, sessions, Settings.Counters_, Log, + GetLogPrefix(), StartSessionTime); + DumpCountersContext = CountersLogger->MakeCallbackContext(); + CountersLogger->Start(); } } - void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) { Y_VERIFY(Lock.IsLocked()); @@ -420,7 +351,6 @@ void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) { Aborting = true; if (DumpCountersContext) { DumpCountersContext->Cancel(); - DumpCountersContext.reset(); } Session->Abort(); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h index 064a395a697..aa626de4b15 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -2,75 +2,13 @@ #include "topic_impl.h" -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> namespace NYdb::NTopic { -class TDummyReadSession: public IReadSession, public std::enable_shared_from_this<TDummyReadSession> { -public: - TDummyReadSession() = default; - - inline TDummyReadSession(const TReadSessionSettings& settings) { - (void)settings; - } - - inline NThreading::TFuture<void> WaitEvent() override { - Y_VERIFY(false); - - NThreading::TPromise<void> promise = NThreading::NewPromise<void>(); - return promise.GetFuture(); - } - - inline TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override { - Y_VERIFY(false); - - (void)block; - (void)maxEventsCount; - (void)maxByteSize; - return {}; - } - - TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) override { - return GetEvents(settings.Block_, - settings.MaxEventsCount_, - settings.MaxByteSize_); - } - - inline TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override { - Y_VERIFY(false); - - (void)block; - (void)maxByteSize; - return {}; - } - - TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) override { - return GetEvent(settings.Block_, - settings.MaxByteSize_); - } - - inline bool Close(TDuration timeout) override { - Y_VERIFY(false); - - return !(bool)timeout; - } - - inline TString GetSessionId() const override { - Y_VERIFY(false); - - return "dummy_session_id"; - } - - inline TReaderCounters::TPtr GetCounters() const override { - Y_VERIFY(false); - - return nullptr; - } -}; - class TReadSession : public IReadSession, - public NPersQueue::IUserRetrievedEventCallback<false>, public std::enable_shared_from_this<TReadSession> { public: TReadSession(const TReadSessionSettings& settings, @@ -113,11 +51,8 @@ private: void CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred); - void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override; - void MakeCountersIfNeeded(); - void DumpCountersToLog(size_t timeNumber = 0); - void ScheduleDumpCountersToLog(size_t timeNumber = 0); + void SetupCountersLogger(); // Shutdown. void Abort(EStatus statusCode, NYql::TIssues&& issues); @@ -150,13 +85,14 @@ private: std::shared_ptr<TGRpcConnectionsImpl> Connections; TDbDriverStatePtr DbDriverState; TAdaptiveLock Lock; - std::shared_ptr<NPersQueue::TImplTracker> Tracker; std::shared_ptr<NPersQueue::TReadSessionEventsQueue<false>> EventsQueue; NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr Session; + std::shared_ptr<NPersQueue::TCallbackContext<NPersQueue::TSingleClusterReadSessionImpl<false>>> CbContext; TVector<TTopicReadSettings> Topics; - NGrpc::IQueueClientContextPtr DumpCountersContext; + std::shared_ptr<NPersQueue::TCountersLogger<false>> CountersLogger; + std::shared_ptr<NPersQueue::TCallbackContext<NPersQueue::TCountersLogger<false>>> DumpCountersContext; // Exiting. bool Aborting = false; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index 593a0362923..6a8a618831c 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -19,9 +19,10 @@ TWriteSession::TWriteSession( std::shared_ptr<TTopicClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) - : Tracker(std::make_shared<NPersQueue::TImplTracker>()) - , Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState), Tracker)) + : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState))) { + CbContext = std::make_shared<NPersQueue::TCallbackContext<TWriteSessionImpl>>(Impl); + Impl->SetCallbackContext(CbContext); } void TWriteSession::Start(const TDuration& delay) { @@ -79,7 +80,7 @@ bool TWriteSession::Close(TDuration closeTimeout) { TWriteSession::~TWriteSession() { Impl->Close(TDuration::Zero()); - Tracker->AsyncComplete().Wait(); + CbContext->Cancel(); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index d4e11d34082..342e0c52e76 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -4,7 +4,7 @@ #include "write_session_impl.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <util/generic/buffer.h> @@ -55,10 +55,13 @@ private: void Start(const TDuration& delay); private: - std::shared_ptr<NPersQueue::TImplTracker> Tracker; + std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext; std::shared_ptr<TWriteSessionImpl> Impl; }; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// TSimpleBlockingWriteSession + class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession { public: TSimpleBlockingWriteSession( diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index e86ca445fd6..8eee84c3a5d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -49,15 +49,13 @@ TWriteSessionImpl::TWriteSessionImpl( const TWriteSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, - TDbDriverStatePtr dbDriverState, - std::shared_ptr<NPersQueue::TImplTracker> tracker) + TDbDriverStatePtr dbDriverState) : Settings(settings) , Client(std::move(client)) , Connections(std::move(connections)) , DbDriverState(std::move(dbDriverState)) , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") - , Tracker(tracker) - , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings, Tracker)) + , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) , InitSeqNoPromise(NThreading::NewPromise<ui64>()) , WakeupInterval( Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? @@ -77,7 +75,12 @@ TWriteSessionImpl::TWriteSessionImpl( } +void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx) { + CbContext = std::move(ctx); +} + void TWriteSessionImpl::Start(const TDuration& delay) { + Y_VERIFY(CbContext); ++ConnectionAttemptsDone; if (!Started) { with_lock(Lock) { @@ -159,17 +162,22 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del request.set_partition_id(*Settings.PartitionId_); request.set_include_location(true); - auto extractor = [sharedThis = shared_from_this(), wire = Tracker->MakeTrackedWire(), context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable { + auto extractor = [cbContext = CbContext, context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable { Ydb::Topic::DescribePartitionResult result; if (response) response->operation().result().UnpackTo(&result); TStatus st(std::move(status)); - sharedThis->OnDescribePartition(st, result, context); + if (auto self = cbContext->LockShared()) { + self->OnDescribePartition(st, result, context); + } }; - auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire(), req = std::move(request), extr = std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState, context = describePartitionContext]() mutable { - LOG_LAZY(dbState->Log, TLOG_DEBUG, sharedThis->LogPrefix() << " Getting partition location, partition " << sharedThis->Settings.PartitionId_); + auto callback = [req = std::move(request), extr = std::move(extractor), + connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState, + context = describePartitionContext, prefix = TString(LogPrefix()), + partId = Settings.PartitionId_]() mutable { + LOG_LAZY(dbState->Log, TLOG_DEBUG, prefix + " Getting partition location, partition " + ToString(partId)); connections->Run<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>( std::move(req), std::move(extr), @@ -189,10 +197,11 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To THandleResult handleResult; with_lock (Lock) { - if (DescribePartitionContext == describePartitionContext) + if (DescribePartitionContext == describePartitionContext) { DescribePartitionContext = nullptr; - else + } else { return; + } } if (!status.IsSuccess()) { @@ -468,19 +477,18 @@ void TWriteSessionImpl::Connect(const TDuration& delay) { reqSettings = TRpcRequestSettings::Make(Settings, PreferredPartitionLocation.Endpoint); - connectCallback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), - connectContext = connectContext] - (TPlainStatus&& st, typename IProcessor::TPtr&& processor) { - sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); + connectCallback = [cbContext = CbContext, + connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { + if (auto self = cbContext->LockShared()) { + self->OnConnect(std::move(st), std::move(processor), connectContext); + } }; - connectTimeoutCallback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), - connectTimeoutContext = connectTimeoutContext] - (bool ok) { + connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) { if (ok) { - sharedThis->OnConnectTimeout(connectTimeoutContext); + if (auto self = cbContext->LockShared()) { + self->OnConnectTimeout(connectTimeoutContext); + } } }; } @@ -570,7 +578,7 @@ void TWriteSessionImpl::InitImpl() { auto* init = req.mutable_init_request(); init->set_path(Settings.Path_); init->set_producer_id(Settings.ProducerId_); - + if (Settings.PartitionId_.Defined()) { if (Settings.DirectWriteToPartition_) { auto* partitionWithGeneration = init->mutable_partition_with_generation(); @@ -604,10 +612,11 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&& if (Aborting) { return; } - auto callback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), + auto callback = [cbContext = CbContext, connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { - sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration); + if (auto self = cbContext->LockShared()) { + self->OnWriteDone(std::move(grpcStatus), connectionGeneration); + } }; Processor->Write(std::move(req), callback); @@ -624,13 +633,14 @@ void TWriteSessionImpl::ReadFromProcessor() { } prc = Processor; generation = ConnectionGeneration; - callback = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), - connectionGeneration = generation, - processor = prc, - serverMessage = ServerMessage] - (NGrpc::TGrpcStatus&& grpcStatus) { - sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration); + callback = [cbContext = CbContext, + connectionGeneration = generation, + processor = prc, + serverMessage = ServerMessage] + (NGrpc::TGrpcStatus&& grpcStatus) { + if (auto self = cbContext->LockShared()) { + self->OnReadDone(std::move(grpcStatus), connectionGeneration); + } }; } prc->Read(ServerMessage.get(), std::move(callback)); @@ -943,8 +953,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); blockPtr->Move(block_); - auto lambda = [sharedThis = shared_from_this(), - wire = Tracker->MakeTrackedWire(), + auto lambda = [cbContext = CbContext, codec = Settings.Codec_, level = Settings.CompressionLevel_, isSyncCompression = !CompressionExecutor->IsAsync(), @@ -957,8 +966,10 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { Y_VERIFY(!compressedData.Empty()); blockPtr->Data = std::move(compressedData); blockPtr->Compressed = true; - blockPtr->CodecID = static_cast<ui32>(sharedThis->Settings.Codec_); - sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression); + blockPtr->CodecID = static_cast<ui32>(codec); + if (auto self = cbContext->LockShared()) { + self->OnCompressed(std::move(*blockPtr), isSyncCompression); + } }; CompressionExecutor->Post(lambda); @@ -1261,13 +1272,15 @@ void TWriteSessionImpl::HandleWakeUpImpl() { if (AtomicGet(Aborting)) { return; } - auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire()] (bool ok) + auto callback = [cbContext = CbContext] (bool ok) { if (!ok) { return; } - with_lock(sharedThis->Lock) { - sharedThis->HandleWakeUpImpl(); + if (auto self = cbContext->LockShared()) { + with_lock(self->Lock) { + self->HandleWakeUpImpl(); + } } }; if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index e5ed606a669..67b69cb6d57 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -3,7 +3,7 @@ #include "topic_impl.h" #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> #include <util/generic/buffer.h> @@ -26,10 +26,8 @@ class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue<TWrit using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>; public: - TWriteSessionEventsQueue(const TWriteSessionSettings& settings, - std::shared_ptr<NPersQueue::TImplTracker> tracker = std::make_shared<NPersQueue::TImplTracker>()) + TWriteSessionEventsQueue(const TWriteSessionSettings& settings) : TParent(settings) - , Tracker(std::move(tracker)) {} void PushEvent(TEventInfo eventInfo) { @@ -126,7 +124,7 @@ private: }; bool ApplyHandler(TEventInfo& eventInfo) { - THandlersVisitor visitor(Settings, eventInfo.Event, Tracker); + THandlersVisitor visitor(Settings, eventInfo.Event); return visitor.Visit(); } @@ -141,9 +139,6 @@ private: Y_ASSERT(CloseEvent); return {*CloseEvent}; } - -private: - std::shared_ptr<NPersQueue::TImplTracker> Tracker; }; struct TMemoryUsageChange { @@ -325,8 +320,7 @@ public: TWriteSessionImpl(const TWriteSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, - TDbDriverStatePtr dbDriverState, - std::shared_ptr<NPersQueue::TImplTracker> tracker); + TDbDriverStatePtr dbDriverState); TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, @@ -361,6 +355,8 @@ public: ~TWriteSessionImpl(); // will not call close - destroy everything without acks + void SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx); + private: TStringBuilder LogPrefix() const; @@ -429,7 +425,7 @@ private: TStringType PrevToken; bool UpdateTokenInProgress = false; TInstant LastTokenUpdate = TInstant::Zero(); - std::shared_ptr<NPersQueue::TImplTracker> Tracker; + std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext; std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; NGrpc::IQueueClientContextPtr ClientContext; // Common client context. NGrpc::IQueueClientContextPtr ConnectContext; |