aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-09-28 11:40:46 +0300
committerildar-khisam <ikhis@ydb.tech>2023-09-28 12:10:12 +0300
commitaf36c1aa28ec2ef337bc39a6095c1905f52fdb94 (patch)
treee030f6d05450536adb6571787f032301f58809ab
parentf9a265a5c4b8a1780cf24c29088dffc342362813 (diff)
downloadydb-af36c1aa28ec2ef337bc39a6095c1905f52fdb94.tar.gz
add callbacks context
add callback context
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h71
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h9
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h157
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h123
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp112
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h121
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp227
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp78
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h18
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp22
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp114
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h76
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp89
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h18
19 files changed, 582 insertions, 679 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
index 7e11903a463..040e5117c54 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
@@ -2,7 +2,6 @@
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h>
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h
new file mode 100644
index 00000000000..5de727a5ab5
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <library/cpp/threading/future/core/future.h>
+#include <util/system/guard.h>
+#include <util/system/spinlock.h>
+
+#include <memory>
+#include <shared_mutex>
+#include <vector>
+
+namespace NYdb::NPersQueue {
+
+template <typename TGuardedObject>
+class TCallbackContext {
+public:
+ using TMutexPtr = std::shared_ptr<std::shared_mutex>;
+
+ class TBorrowed {
+ public:
+ explicit TBorrowed(const TCallbackContext& parent) : Mutex(parent.Mutex) {
+ Mutex->lock_shared();
+ Ptr = parent.GuardedObjectPtr.get();
+ }
+
+ ~TBorrowed() {
+ Mutex->unlock_shared();
+ }
+
+ TGuardedObject* operator->() {
+ return Ptr;
+ }
+
+ const TGuardedObject* operator->() const {
+ return Ptr;
+ }
+
+ operator bool() {
+ return Ptr;
+ }
+
+ private:
+ TMutexPtr Mutex;
+ TGuardedObject* Ptr = nullptr;
+ };
+
+public:
+ explicit TCallbackContext(typename std::shared_ptr<TGuardedObject> ptr)
+ : Mutex(std::make_shared<std::shared_mutex>())
+ , GuardedObjectPtr(std::move(ptr))
+ {}
+
+ // may block
+ ~TCallbackContext() {
+ Cancel();
+ }
+
+ TBorrowed LockShared() {
+ return TBorrowed(*this);
+ }
+
+ void Cancel() {
+ std::lock_guard lock(*Mutex);
+ GuardedObjectPtr.reset();
+ }
+
+private:
+ TMutexPtr Mutex;
+ typename std::shared_ptr<TGuardedObject> GuardedObjectPtr;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
index 21179740daa..6acb3cfc4ff 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h
@@ -1,7 +1,6 @@
#pragma once
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#include <util/generic/queue.h>
@@ -314,10 +313,9 @@ protected:
// Template for visitor implementation.
struct TBaseHandlersVisitor {
- TBaseHandlersVisitor(const TSettings& settings, TEvent& event, std::shared_ptr<TImplTracker> tracker)
+ TBaseHandlersVisitor(const TSettings& settings, TEvent& event)
: Settings(settings)
, Event(event)
- , Tracker(tracker)
{}
template <class TEventType, class TFunc, class TCommonFunc>
@@ -336,7 +334,7 @@ protected:
template <class TEventType, class TFunc>
void PushSpecificHandler(TEvent&& event, const TFunc& f) {
Post(Settings.EventHandlers_.HandlersExecutor_,
- [func = f, event = std::move(event), wire = Tracker->MakeTrackedWire()]() mutable {
+ [func = f, event = std::move(event)]() mutable {
func(std::get<TEventType>(event));
});
}
@@ -344,7 +342,7 @@ protected:
template <class TFunc>
void PushCommonHandler(TEvent&& event, const TFunc& f) {
Post(Settings.EventHandlers_.HandlersExecutor_,
- [func = f, event = std::move(event), wire = Tracker->MakeTrackedWire()]() mutable { func(event); });
+ [func = f, event = std::move(event)]() mutable { func(event); });
}
virtual void Post(const typename TExecutor::TPtr& executor, typename TExecutor::TFunction&& f) {
@@ -353,7 +351,6 @@ protected:
const TSettings& Settings;
TEvent& Event;
- std::shared_ptr<TImplTracker> Tracker;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h
new file mode 100644
index 00000000000..cb238fa262a
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h
@@ -0,0 +1,157 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
+#include <util/system/spinlock.h>
+
+namespace NYdb::NPersQueue {
+
+template <bool UseMigrationProtocol>
+class TSingleClusterReadSessionImpl;
+
+template <bool UseMigrationProtocol>
+using TCallbackContextPtr = std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>>;
+
+template<bool UseMigrationProtocol>
+class TCountersLogger : public std::enable_shared_from_this<TCountersLogger<UseMigrationProtocol>> {
+public:
+ static constexpr auto UPDATE_PERIOD = TDuration::Seconds(1);
+
+ using TReaderCountersPtr = std::conditional_t<UseMigrationProtocol,
+ NYdb::NPersQueue::TReaderCounters::TPtr,
+ NYdb::NTopic::TReaderCounters::TPtr>;
+
+public:
+ explicit TCountersLogger(std::shared_ptr<TGRpcConnectionsImpl> connections,
+ std::vector<TCallbackContextPtr<UseMigrationProtocol>> sessions,
+ TReaderCountersPtr counters, const TLog& log, TString prefix, TInstant startSessionTime)
+ : Connections(std::move(connections))
+ , SessionsContexts(std::move(sessions))
+ , Counters(std::move(counters))
+ , Log(log)
+ , Prefix(std::move(prefix))
+ , StartSessionTime(startSessionTime) {
+ }
+
+ std::shared_ptr<TCallbackContext<TCountersLogger<UseMigrationProtocol>>> MakeCallbackContext() {
+ SelfContext = std::make_shared<TCallbackContext<TCountersLogger<UseMigrationProtocol>>>(this->shared_from_this());
+ return SelfContext;
+ }
+
+ void Start() {
+ Y_VERIFY(SelfContext);
+ ScheduleDumpCountersToLog();
+ }
+
+ void Stop() {
+ Y_VERIFY(SelfContext);
+
+ with_lock(Lock) {
+ Stopping = true;
+ }
+
+ // Log final counters.
+ DumpCountersToLog();
+ }
+
+private:
+ void ScheduleDumpCountersToLog(size_t timeNumber = 0) {
+ with_lock(Lock) {
+ if (Stopping) {
+ return;
+ }
+
+ DumpCountersContext = Connections->CreateContext();
+ if (DumpCountersContext) {
+ auto callback = [ctx = SelfContext, timeNumber](bool ok) {
+ if (ok) {
+ if (auto borrowedSelf = ctx->LockShared()) {
+ borrowedSelf->DumpCountersToLog(timeNumber);
+ }
+ }
+ };
+ Connections->ScheduleCallback(UPDATE_PERIOD,
+ std::move(callback),
+ DumpCountersContext);
+ }
+ }
+ }
+
+ void DumpCountersToLog(size_t timeNumber = 0) {
+ const bool logCounters = timeNumber % 60 == 0; // Every 1 minute.
+ const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes.
+
+ *Counters->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds();
+
+ {
+ TMaybe<TLogElement> log;
+ bool dumpHeader = true;
+
+ for (auto& sessionCtx : SessionsContexts) {
+ if (auto borrowedSession = sessionCtx->LockShared()) {
+ borrowedSession->UpdateMemoryUsageStatistics();
+ if (dumpSessionsStatistics) {
+ if (dumpHeader) {
+ log.ConstructInPlace(&Log, TLOG_INFO);
+ (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):";
+ dumpHeader = false;
+ }
+ borrowedSession->DumpStatisticsToLog(*log);
+ }
+ }
+ }
+ }
+
+#define C(counter) \
+ << " " Y_STRINGIZE(counter) ": " \
+ << Counters->counter->Val() \
+ /**/
+
+ if (logCounters) {
+ LOG_LAZY(Log, TLOG_INFO,
+ TStringBuilder() << Prefix << "Counters: {"
+ C(Errors)
+ C(CurrentSessionLifetimeMs)
+ C(BytesRead)
+ C(MessagesRead)
+ C(BytesReadCompressed)
+ C(BytesInflightUncompressed)
+ C(BytesInflightCompressed)
+ C(BytesInflightTotal)
+ C(MessagesInflight)
+ << " }"
+ );
+ }
+
+#undef C
+
+ ScheduleDumpCountersToLog(timeNumber + 1);
+ }
+
+
+private:
+ std::shared_ptr<TCallbackContext<TCountersLogger<UseMigrationProtocol>>> SelfContext;
+
+ TSpinLock Lock;
+
+ std::shared_ptr<TGRpcConnectionsImpl> Connections;
+ std::vector<TCallbackContextPtr<UseMigrationProtocol>> SessionsContexts;
+
+ IQueueClientContextPtr DumpCountersContext;
+
+ TReaderCountersPtr Counters;
+
+ TLog Log;
+ const TString Prefix;
+ const TInstant StartSessionTime;
+
+ bool Stopping = false;
+};
+
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h
deleted file mode 100644
index 9fe0d3ae04e..00000000000
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h
+++ /dev/null
@@ -1,123 +0,0 @@
-#pragma once
-
-#include <library/cpp/threading/future/core/future.h>
-#include <util/system/guard.h>
-#include <util/system/spinlock.h>
-
-#include <memory>
-
-namespace NYdb::NPersQueue {
-
-class TImplTracker {
-public:
- struct TWire {
- TImplTracker* Owner;
-
- TWire()
- : Owner(nullptr) {
- }
-
- TWire(TImplTracker& owner)
- : Owner(&owner) {
- Owner->Increment();
- }
-
- ~TWire() {
- if (Owner) {
- Owner->Decrement();
- }
- }
-
- TWire(const TWire& that) : Owner(that.Owner) {
- if (Owner)
- Owner->Increment();
- }
- TWire(TWire&& that)
- : Owner(std::exchange(that.Owner, nullptr)) {
- }
- TWire& operator=(const TWire& that) {
- if (Owner == that.Owner) {
- return *this;
- }
- if (Owner) {
- Owner->Decrement();
- }
- Owner = that.Owner;
- if (Owner) {
- Owner->Increment();
- }
- return *this;
- }
- TWire& operator=(TWire&& that) {
- if (this == &that) {
- return *this;
- }
- if (Owner == that.Owner) {
- if (that.Owner) {
- that.Owner->Decrement();
- }
- that.Owner = nullptr;
- return *this;
- }
- if (Owner) {
- Owner->Decrement();
- }
- Owner = std::exchange(that.Owner, nullptr);
- return *this;
- }
-
- operator bool() const {
- return Owner;
- }
- };
-
-public:
- ~TImplTracker() {
- // to synchronize with last Decrement() in other thread
- TGuard guard(Lock);
- }
-
- std::shared_ptr<TWire> MakeTrackedWire() {
- return std::make_shared<TWire>(*this);
- }
-
- void Increment() {
- with_lock(Lock) {
- Y_VERIFY(!AwaitingCompletion());
- ++RefCount;
- }
- }
-
- void Decrement() {
- with_lock(Lock) {
- --RefCount;
- if (RefCount == 0 && AwaitingCompletion()) {
- CompletionPromise.SetValue();
- }
- }
- }
-
- NThreading::TFuture<void> AsyncComplete() {
- with_lock(Lock) {
- if (!AwaitingCompletion()) {
- CompletionPromise = NThreading::NewPromise();
- }
- if (RefCount == 0) {
- CompletionPromise.SetValue();
- }
- return CompletionPromise.GetFuture();
- }
- }
-
-private:
- inline bool AwaitingCompletion() const {
- return CompletionPromise.Initialized();
- }
-
-private:
- TSpinLock Lock;
- size_t RefCount = 0;
- NThreading::TPromise<void> CompletionPromise;
-};
-
-}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 1bdb5307add..d7c680ecfaa 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -67,8 +67,11 @@ TReadSession::~TReadSession() {
Abort();
ClearAllEvents();
- if (Tracker) {
- Tracker->AsyncComplete().Wait();
+ for (const auto& ctx : CbContexts) {
+ ctx->Cancel();
+ }
+ if (DumpCountersContext) {
+ DumpCountersContext->Cancel();
}
}
@@ -83,8 +86,7 @@ Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest TReadSession::MakeClus
}
void TReadSession::Start() {
- Tracker = std::make_shared<TImplTracker>();
- EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, weak_from_this(), Tracker);
+ EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings);
if (!ValidateSettings()) {
return;
@@ -178,7 +180,7 @@ void TReadSession::ProceedWithoutClusterDiscovery() {
clusterSessionInfo.Topics = Settings.Topics_;
CreateClusterSessionsImpl(deferred);
}
- // ScheduleDumpCountersToLog();
+ SetupCountersLogger();
}
void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) {
@@ -215,10 +217,10 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) {
EventsQueue,
context,
partitionStreamIdStart++,
- clusterSessionsCount,
- Tracker);
+ clusterSessionsCount);
- deferred.DeferStartSession(clusterSessionInfo.Session);
+ CbContexts.push_back(clusterSessionInfo.Session->MakeCallbackContext());
+ deferred.DeferStartSession(CbContexts.back());
}
}
@@ -320,7 +322,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
CreateClusterSessionsImpl(deferred);
}
- // ScheduleDumpCountersToLog();
+ SetupCountersLogger();
}
void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions<true>& deferred) {
@@ -350,7 +352,7 @@ void TReadSession::RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions
bool TReadSession::Close(TDuration timeout) {
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
// Log final counters.
- DumpCountersToLog();
+ CountersLogger->Stop();
std::vector<TSingleClusterReadSessionImpl<true>::TPtr> sessions;
NThreading::TPromise<bool> promise = NThreading::NewPromise<bool>();
@@ -528,15 +530,6 @@ void TReadSession::ResumeReadingData() {
}
}
-void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) {
- LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix()
- << "The application data is transferred to the client. Number of messages "
- << messagesCount
- << ", size "
- << decompressedSize
- << " bytes");
-}
-
void TReadSession::MakeCountersIfNeeded() {
if (!Settings.Counters_ || HasNullCounters(*Settings.Counters_)) {
TReaderCounters::TPtr counters = MakeIntrusive<TReaderCounters>();
@@ -548,83 +541,12 @@ void TReadSession::MakeCountersIfNeeded() {
}
}
-void TReadSession::DumpCountersToLog(size_t timeNumber) {
- const bool logCounters = timeNumber % 60 == 0; // Every 1 minute.
- const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes.
-
- *Settings.Counters_->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds();
- std::vector<TSingleClusterReadSessionImpl<true>::TPtr> sessions;
- with_lock (Lock) {
- if (Closing || Aborting) {
- return;
- }
-
- sessions.reserve(ClusterSessions.size());
- for (auto& [cluster, sessionInfo] : ClusterSessions) {
- if (sessionInfo.Session) {
- sessions.emplace_back(sessionInfo.Session);
- }
- }
- }
-
- {
- TMaybe<TLogElement> log;
- if (dumpSessionsStatistics) {
- log.ConstructInPlace(&Log, TLOG_INFO);
- (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):";
- }
- for (const auto& session : sessions) {
- session->UpdateMemoryUsageStatistics();
- if (dumpSessionsStatistics) {
- session->DumpStatisticsToLog(*log);
- }
- }
- }
-
-#define C(counter) \
- << " " Y_STRINGIZE(counter) ": " \
- << Settings.Counters_->counter->Val() \
- /**/
-
- if (logCounters) {
- LOG_LAZY(Log, TLOG_INFO,
- GetLogPrefix() << "Counters: {"
- C(Errors)
- C(CurrentSessionLifetimeMs)
- C(BytesRead)
- C(MessagesRead)
- C(BytesReadCompressed)
- C(BytesInflightUncompressed)
- C(BytesInflightCompressed)
- C(BytesInflightTotal)
- C(MessagesInflight)
- << " }"
- );
- }
-
-#undef C
-
- // ScheduleDumpCountersToLog(timeNumber + 1);
-}
-
-void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) {
+void TReadSession::SetupCountersLogger() {
with_lock(Lock) {
- if (Aborting || Closing) {
- return;
- }
- DumpCountersContext = Connections->CreateContext();
- if (DumpCountersContext) {
- auto callback = [self = weak_from_this(), timeNumber](bool ok) {
- if (ok) {
- if (auto sharedSelf = self.lock()) {
- sharedSelf->DumpCountersToLog(timeNumber);
- }
- }
- };
- Connections->ScheduleCallback(TDuration::Seconds(1),
- std::move(callback),
- DumpCountersContext);
- }
+ CountersLogger = std::make_shared<TCountersLogger<true>>(Connections, CbContexts, Settings.Counters_, Log,
+ GetLogPrefix(), StartSessionTime);
+ DumpCountersContext = CountersLogger->MakeCallbackContext();
+ CountersLogger->Start();
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 2586ed97985..29440c8d363 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -1,7 +1,8 @@
#pragma once
#include "common.h"
-#include "impl_tracker.h"
+#include "callback_context.h"
+#include "counters_logger.h"
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
@@ -19,6 +20,7 @@
#include <atomic>
#include <deque>
+#include <vector>
namespace NYdb::NPersQueue {
@@ -107,27 +109,9 @@ class TDataDecompressionInfo;
template <bool UseMigrationProtocol>
using TDataDecompressionInfoPtr = typename TDataDecompressionInfo<UseMigrationProtocol>::TPtr;
-
template <bool UseMigrationProtocol>
-struct IErrorHandler : public TThrRefBase {
- using TPtr = TIntrusivePtr<IErrorHandler>;
-
- virtual void AbortSession(TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) = 0;
-
- void AbortSession(EStatus statusCode, NYql::TIssues&& issues) {
- AbortSession(TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues)));
- }
-
- void AbortSession(EStatus statusCode, const TString& message) {
- NYql::TIssues issues;
- issues.AddIssue(message);
- AbortSession(statusCode, std::move(issues));
- }
+using TCallbackContextPtr = std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>>;
- void AbortSession(TPlainStatus&& status) {
- AbortSession(TASessionClosedEvent<UseMigrationProtocol>(std::move(status)));
- }
-};
template <bool UseMigrationProtocol>
class TUserRetrievedEventsInfoAccumulator {
@@ -154,12 +138,12 @@ public:
void DeferReadFromProcessor(const typename IProcessor<UseMigrationProtocol>::TPtr& processor, TServerMessage<UseMigrationProtocol>* dst, typename IProcessor<UseMigrationProtocol>::TReadCallback callback);
void DeferStartExecutorTask(const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, typename IAExecutor<UseMigrationProtocol>::TFunction task);
- void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent);
- void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues);
- void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message);
- void DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status);
- void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status);
- void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session);
+ void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent);
+ void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, NYql::TIssues&& issues);
+ void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, const TString& message);
+ void DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status);
+ void DeferReconnection(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status);
+ void DeferStartSession(TCallbackContextPtr<UseMigrationProtocol> cbContext);
void DeferSignalWaiter(TWaiter&& waiter);
void DeferDestroyDecompressionInfos(std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>>&& infos);
@@ -189,11 +173,11 @@ private:
std::vector<TWaiter> Waiters;
// Reconnection and abort.
- std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
TPlainStatus ReconnectionStatus;
- // Session to start
- std::vector<std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>>> Sessions;
+ // Contexts for sessions to start
+ std::vector<TCallbackContextPtr<UseMigrationProtocol>> CbContexts;
std::vector<TDataDecompressionInfoPtr<UseMigrationProtocol>> DecompressionInfos;
};
@@ -207,7 +191,7 @@ public:
TDataDecompressionInfo(TDataDecompressionInfo&&) = default;
TDataDecompressionInfo(
TPartitionData<UseMigrationProtocol>&& msg,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session,
+ TCallbackContextPtr<UseMigrationProtocol> cbContext,
bool doDecompress,
i64 serverBytesSize = 0 // to increment read request bytes size
);
@@ -338,7 +322,7 @@ private:
using TMessageMetaPtrVector = std::vector<typename TAMessageMeta<UseMigrationProtocol>::TPtr>;
TMetadataPtrVector BatchesMeta;
std::vector<TMessageMetaPtrVector> MessagesMeta;
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
bool DoDecompress;
i64 ServerBytesSize = 0;
std::atomic<i64> SourceDataNotProcessed = 0;
@@ -411,21 +395,21 @@ struct TReadSessionEventInfo {
bool HasDataEvents = false;
size_t EventsCount = 0;
TMaybe<TEvent> Event;
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session;
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
// Close event.
- TReadSessionEventInfo(const TASessionClosedEvent<UseMigrationProtocol>& event, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session = {})
+ TReadSessionEventInfo(const TASessionClosedEvent<UseMigrationProtocol>& event, TCallbackContextPtr<UseMigrationProtocol> cbContext = {})
: Event(TEvent(event))
- , Session(session)
+ , CbContext(std::move(cbContext))
{
}
// Usual event.
- TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session, TEvent event);
+ TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream, TCallbackContextPtr<UseMigrationProtocol> cbContext, TEvent event);
// Data event.
TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
+ TCallbackContextPtr<UseMigrationProtocol> cbContext,
bool hasDataEvents);
bool IsEmpty() const;
@@ -583,11 +567,11 @@ public:
ui64 partitionId,
ui64 assignId,
ui64 readOffset,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession)
+ TCallbackContextPtr<UseMigrationProtocol> cbContext)
: Key{topicPath, cluster, partitionId}
, AssignId(assignId)
, FirstNotReadOffset(readOffset)
- , Session(std::move(parentSession))
+ , CbContext(std::move(cbContext))
{
TAPartitionStream<true>::PartitionStreamId = partitionStreamId;
TAPartitionStream<true>::TopicPath = std::move(topicPath);
@@ -603,11 +587,11 @@ public:
i64 partitionId,
i64 assignId,
i64 readOffset,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> parentSession)
+ TCallbackContextPtr<UseMigrationProtocol> cbContext)
: Key{topicPath, "", static_cast<ui64>(partitionId)}
, AssignId(static_cast<ui64>(assignId))
, FirstNotReadOffset(static_cast<ui64>(readOffset))
- , Session(std::move(parentSession))
+ , CbContext(std::move(cbContext))
{
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
TAPartitionStream<false>::TopicPath = std::move(topicPath);
@@ -667,8 +651,8 @@ public:
EventsQueue.pop_front();
}
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> GetSession() const {
- return Session;
+ TCallbackContextPtr<UseMigrationProtocol> GetCbContext() const {
+ return CbContext;
}
TLog GetLog() const;
@@ -746,7 +730,7 @@ private:
const TKey Key;
ui64 AssignId;
ui64 FirstNotReadOffset;
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> Session;
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
TRawPartitionStreamEventQueue<UseMigrationProtocol> EventsQueue;
ui64 MaxReadOffset = 0;
ui64 MaxCommittedOffset = 0;
@@ -768,22 +752,21 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
TReadSessionEventInfo<UseMigrationProtocol>>;
public:
- TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
- std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>());
+ TReadSessionEventsQueue(const TAReadSessionSettings<UseMigrationProtocol>& settings);
+ // Assumes we are under lock.
TReadSessionEventInfo<UseMigrationProtocol>
- GetEventImpl(size_t& maxByteSize,
- TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
+ GetEventImpl(size_t& maxByteSize,
+ TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator);
TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent>
- GetEvents(bool block = false,
- TMaybe<size_t> maxEventsCount = Nothing(),
- size_t maxByteSize = std::numeric_limits<size_t>::max());
+ GetEvents(bool block = false,
+ TMaybe<size_t> maxEventsCount = Nothing(),
+ size_t maxByteSize = std::numeric_limits<size_t>::max());
TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent>
- GetEvent(bool block = false,
- size_t maxByteSize = std::numeric_limits<size_t>::max());
+ GetEvent(bool block = false,
+ size_t maxByteSize = std::numeric_limits<size_t>::max());
bool Close(const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
TWaiter waiter;
@@ -812,7 +795,6 @@ public:
// Push usual event. Returns false if queue is closed
bool PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred);
@@ -843,9 +825,8 @@ private:
struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings,
typename TParent::TEvent& event,
- TDeferredActions<UseMigrationProtocol>& deferred,
- std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>())
- : TParent::TBaseHandlersVisitor(settings, event, tracker)
+ TDeferredActions<UseMigrationProtocol>& deferred)
+ : TParent::TBaseHandlersVisitor(settings, event)
, Deferred(deferred) {
}
@@ -896,14 +877,12 @@ private:
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) {
- THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred, Tracker);
+ THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred);
return visitor.Visit();
}
private:
bool HasEventCallbacks;
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> Session;
- std::shared_ptr<TImplTracker> Tracker;
};
} // namespace NYdb::NPersQueue
@@ -956,8 +935,7 @@ public:
std::shared_ptr<TReadSessionEventsQueue<UseMigrationProtocol>> eventsQueue,
NGrpc::IQueueClientContextPtr clientContext,
ui64 partitionStreamIdStart,
- ui64 partitionStreamIdStep,
- std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>()
+ ui64 partitionStreamIdStep
)
: Settings(settings)
, Database(database)
@@ -972,7 +950,6 @@ public:
, CookieMapping()
, ReadSizeBudget(GetCompressedDataSizeLimit())
, ReadSizeServerDelta(0)
- , Tracker(std::move(tracker))
{
}
@@ -1026,6 +1003,8 @@ public:
return Log;
}
+ TCallbackContextPtr<UseMigrationProtocol> MakeCallbackContext();
+
private:
void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);
@@ -1208,7 +1187,7 @@ private:
i64 ReadSizeBudget;
i64 ReadSizeServerDelta = 0;
- std::shared_ptr<TImplTracker> Tracker;
+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
};
// High level class that manages several read session impls.
@@ -1216,7 +1195,6 @@ private:
// This class communicates with cluster discovery service and then creates
// sessions to each cluster.
class TReadSession : public IReadSession,
- public IUserRetrievedEventCallback<true>,
public std::enable_shared_from_this<TReadSession> {
struct TClusterSessionInfo {
TClusterSessionInfo(const TString& cluster)
@@ -1299,11 +1277,8 @@ private:
void AbortImpl(EStatus statusCode, NYql::TIssues&& issues, TDeferredActions<true>& deferred);
void AbortImpl(EStatus statusCode, const TString& message, TDeferredActions<true>& deferred);
- void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override;
-
void MakeCountersIfNeeded();
- void DumpCountersToLog(size_t timeNumber = 0);
- void ScheduleDumpCountersToLog(size_t timeNumber = 0);
+ void SetupCountersLogger();
private:
TReadSessionSettings Settings;
@@ -1320,14 +1295,14 @@ private:
IRetryPolicy::IRetryState::TPtr ClusterDiscoveryRetryState;
bool DataReadingSuspended = false;
- NGrpc::IQueueClientContextPtr DumpCountersContext;
-
// Exiting.
bool Aborting = false;
bool Closing = false;
- //
- std::shared_ptr<TImplTracker> Tracker;
+ std::vector<TCallbackContextPtr<true>> CbContexts;
+
+ std::shared_ptr<TCountersLogger<true>> CountersLogger;
+ std::shared_ptr<TCallbackContext<TCountersLogger<true>>> DumpCountersContext;
};
} // namespace NYdb::NPersQueue
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 038d207c18c..5256feb1fee 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -54,7 +54,7 @@ bool HasNullCounters(TReaderCounters& counters);
template<bool UseMigrationProtocol>
TLog TPartitionStreamImpl<UseMigrationProtocol>::GetLog() const {
- if (auto session = Session.lock()) {
+ if (auto session = CbContext->LockShared()) {
return session->GetLog();
}
return {};
@@ -63,7 +63,7 @@ TLog TPartitionStreamImpl<UseMigrationProtocol>::GetLog() const {
template<bool UseMigrationProtocol>
void TPartitionStreamImpl<UseMigrationProtocol>::Commit(ui64 startOffset, ui64 endOffset) {
std::vector<std::pair<ui64, ui64>> toCommit;
- if (auto sessionShared = Session.lock()) {
+ if (auto sessionShared = CbContext->LockShared()) {
Y_VERIFY(endOffset > startOffset);
with_lock(sessionShared->Lock) {
if (!AddToCommitRanges(startOffset, endOffset, true)) // Add range for real commit always.
@@ -84,21 +84,21 @@ void TPartitionStreamImpl<UseMigrationProtocol>::Commit(ui64 startOffset, ui64 e
template<bool UseMigrationProtocol>
void TPartitionStreamImpl<UseMigrationProtocol>::RequestStatus() {
- if (auto sessionShared = Session.lock()) {
+ if (auto sessionShared = CbContext->LockShared()) {
sessionShared->RequestPartitionStreamStatus(this);
}
}
template<bool UseMigrationProtocol>
void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset) {
- if (auto sessionShared = Session.lock()) {
+ if (auto sessionShared = CbContext->LockShared()) {
sessionShared->ConfirmPartitionStreamCreate(this, readOffset, commitOffset);
}
}
template<bool UseMigrationProtocol>
void TPartitionStreamImpl<UseMigrationProtocol>::ConfirmDestroy() {
- if (auto sessionShared = Session.lock()) {
+ if (auto sessionShared = CbContext->LockShared()) {
sessionShared->ConfirmPartitionStreamDestroy(this);
}
}
@@ -129,6 +129,22 @@ void TPartitionStreamImpl<UseMigrationProtocol>::DeleteNotReadyTail(TDeferredAct
EventsQueue.DeleteNotReadyTail(deferred);
}
+template <bool UseMigrationProtocol>
+void TPartitionStreamImpl<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
+ size_t& maxEventsCount,
+ size_t& maxByteSize,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages,
+ TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
+ TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator)
+{
+ partitionStream->EventsQueue.GetDataEventImpl(partitionStream,
+ maxEventsCount,
+ maxByteSize,
+ messages,
+ compressedMessages,
+ accumulator);
+}
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TRawPartitionStreamEventQueue
@@ -213,8 +229,15 @@ TStringBuilder TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetLogPrefix
return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] ";
}
+template <bool UseMigrationProtocol>
+TCallbackContextPtr<UseMigrationProtocol> TSingleClusterReadSessionImpl<UseMigrationProtocol>::MakeCallbackContext() {
+ CbContext = std::make_shared<TCallbackContext<TSelf>>(this->shared_from_this());
+ return CbContext;
+}
+
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() {
+ Y_VERIFY(CbContext);
Settings.DecompressionExecutor_->Start();
Settings.EventHandlers_.HandlersExecutor_->Start();
if (!Reconnect(TPlainStatus())) {
@@ -300,17 +323,21 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
// Destroy all partition streams before connecting.
DestroyAllPartitionStreamsImpl(deferred);
- connectCallback = [wire = Tracker->MakeTrackedWire(),
- sessionImpl = this->shared_from_this(),
- connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
- sessionImpl->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace!
+ Y_VERIFY(CbContext);
+
+ connectCallback = [cbContext = CbContext,
+ connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
+ if (auto borrowedSelf = cbContext->LockShared()) {
+ borrowedSelf->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace!
+ }
};
- connectTimeoutCallback = [wire = Tracker->MakeTrackedWire(),
- sessionImpl = this->shared_from_this(),
- connectTimeoutContext = connectTimeoutContext](bool ok) {
+ connectTimeoutCallback = [cbContext = CbContext,
+ connectTimeoutContext = connectTimeoutContext](bool ok) {
if (ok) {
- sessionImpl->OnConnectTimeout(connectTimeoutContext);
+ if (auto borrowedSelf = cbContext->LockShared()) {
+ borrowedSelf->OnConnectTimeout(connectTimeoutContext);
+ }
}
};
}
@@ -347,7 +374,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco
Processor = nullptr;
RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again.
- deferred.DeferReconnection(this->shared_from_this(), std::move(status));
+ deferred.DeferReconnection(CbContext, std::move(status));
}
template<bool UseMigrationProtocol>
@@ -630,11 +657,11 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ConfirmPartitionStream
bool pushRes = true;
if constexpr (UseMigrationProtocol) {
- pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream,
TClosedEvent(partitionStream, TClosedEvent::EReason::DestroyConfirmedByUser),
deferred);
} else {
- pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream,
TClosedEvent(partitionStream, TClosedEvent::EReason::StopConfirmedByUser),
deferred);
}
@@ -796,13 +823,16 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl(
if (Processor && !Closing) {
ServerMessage->Clear();
- auto callback = [wire = Tracker->MakeTrackedWire(),
- sessionImpl = this->shared_from_this(),
+ Y_VERIFY(CbContext);
+
+ auto callback = [cbContext = CbContext,
connectionGeneration = ConnectionGeneration,
// Capture message & processor not to read in freed memory.
serverMessage = ServerMessage,
processor = Processor](NGrpc::TGrpcStatus&& grpcStatus) {
- sessionImpl->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ if (auto borrowedSelf = cbContext->LockShared()) {
+ borrowedSelf->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ }
};
deferred.DeferReadFromProcessor(Processor, ServerMessage.get(), std::move(callback));
@@ -987,7 +1017,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
}
auto decompressionInfo = std::make_shared<TDataDecompressionInfo<true>>(std::move(partitionData),
- shared_from_this(),
+ CbContext,
Settings.Decompress_);
Y_VERIFY(decompressionInfo);
@@ -1013,7 +1043,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
NextPartitionStreamId, msg.topic().path(), msg.cluster(),
msg.partition() + 1, // Group.
msg.partition(), // Partition.
- msg.assign_id(), msg.read_offset(), weak_from_this());
+ msg.assign_id(), msg.read_offset(), CbContext);
NextPartitionStreamId += PartitionStreamIdStep;
// Renew partition stream.
@@ -1022,7 +1052,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (currentPartitionStream) {
CookieMapping.RemoveMapping(currentPartitionStream->GetPartitionStreamId());
bool pushRes = EventsQueue->PushEvent(
- currentPartitionStream, weak_from_this(),
+ currentPartitionStream,
TReadSessionEvent::TPartitionStreamClosedEvent(
currentPartitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost),
deferred);
@@ -1035,7 +1065,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
// Send event to user.
bool pushRes = EventsQueue->PushEvent(
- partitionStream, weak_from_this(),
+ partitionStream,
TReadSessionEvent::TCreatePartitionStreamEvent(partitionStream, msg.read_offset(), msg.end_offset()),
deferred);
if (!pushRes) {
@@ -1060,13 +1090,13 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (msg.forceful_release()) {
PartitionStreams.erase(msg.assign_id());
CookieMapping.RemoveMapping(partitionStream->GetPartitionStreamId());
- pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream,
TReadSessionEvent::TPartitionStreamClosedEvent(
partitionStream, TReadSessionEvent::TPartitionStreamClosedEvent::EReason::Lost),
deferred);
} else {
pushRes = EventsQueue->PushEvent(
- partitionStream, weak_from_this(),
+ partitionStream,
TReadSessionEvent::TDestroyPartitionStreamEvent(std::move(partitionStream), msg.commit_offset()),
deferred);
}
@@ -1097,7 +1127,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
}
for (auto& [id, partitionStream] : partitionStreams) {
bool pushRes = EventsQueue->PushEvent(
- partitionStream, weak_from_this(),
+ partitionStream,
TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, partitionStream->GetMaxCommittedOffset()),
deferred);
if (!pushRes) {
@@ -1112,7 +1142,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
auto partitionStream = partitionStreamIt->second;
partitionStream->UpdateMaxCommittedOffset(rangeProto.end_offset());
bool pushRes = EventsQueue->PushEvent(
- partitionStream, weak_from_this(),
+ partitionStream,
TReadSessionEvent::TCommitAcknowledgementEvent(partitionStream, rangeProto.end_offset()),
deferred);
if (!pushRes) {
@@ -1134,7 +1164,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
if (partitionStreamIt == PartitionStreams.end()) {
return;
}
- bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second,
TReadSessionEvent::TPartitionStreamStatusEvent(
partitionStreamIt->second, msg.committed_offset(),
0, // TODO: support read offset in status
@@ -1231,7 +1261,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
partitionStream->SetFirstNotReadOffset(desiredOffset);
auto decompressionInfo = std::make_shared<TDataDecompressionInfo<false>>(std::move(partitionData),
- shared_from_this(),
+ CbContext,
Settings.Decompress_,
serverBytesSize);
// TODO (ildar-khisam@): share serverBytesSize between partitions data according to their actual sizes;
@@ -1259,14 +1289,14 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(),
msg.partition_session().partition_session_id(), msg.committed_offset(),
- weak_from_this());
+ CbContext);
NextPartitionStreamId += PartitionStreamIdStep;
// Renew partition stream.
TIntrusivePtr<TPartitionStreamImpl<false>>& currentPartitionStream = PartitionStreams[partitionStream->GetAssignId()];
if (currentPartitionStream) {
bool pushRes = EventsQueue->PushEvent(
- currentPartitionStream, weak_from_this(),
+ currentPartitionStream,
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent(
currentPartitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
@@ -1278,7 +1308,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
currentPartitionStream = partitionStream;
// Send event to user.
- bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStream,
NTopic::TReadSessionEvent::TStartPartitionSessionEvent(
partitionStream, msg.committed_offset(), msg.partition_offsets().end()),
deferred);
@@ -1303,13 +1333,13 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
bool pushRes = true;
if (!msg.graceful()) {
PartitionStreams.erase(msg.partition_session_id());
- pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ pushRes = EventsQueue->PushEvent(partitionStream,
NTopic::TReadSessionEvent::TPartitionSessionClosedEvent(
partitionStream, NTopic::TReadSessionEvent::TPartitionSessionClosedEvent::EReason::Lost),
deferred);
} else {
pushRes = EventsQueue->PushEvent(
- partitionStream, weak_from_this(),
+ partitionStream,
NTopic::TReadSessionEvent::TStopPartitionSessionEvent(std::move(partitionStream), msg.committed_offset()),
deferred);
}
@@ -1333,7 +1363,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
if (partitionStreamIt != PartitionStreams.end()) {
auto partitionStream = partitionStreamIt->second;
partitionStream->UpdateMaxCommittedOffset(rangeProto.committed_offset());
- bool pushRes = EventsQueue->PushEvent(partitionStream, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStream,
NTopic::TReadSessionEvent::TCommitOffsetAcknowledgementEvent(
partitionStream, rangeProto.committed_offset()),
deferred);
@@ -1356,7 +1386,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
if (partitionStreamIt == PartitionStreams.end()) {
return;
}
- bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second, weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStreamIt->second,
NTopic::TReadSessionEvent::TPartitionSessionStatusEvent(
partitionStreamIt->second, msg.committed_offset(),
0, // TODO: support read offset in status
@@ -1421,7 +1451,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::DestroyAllPartitionStr
>;
for (auto&& [key, partitionStream] : PartitionStreams) {
- bool pushRes = EventsQueue->PushEvent(partitionStream, TSingleClusterReadSessionImpl<UseMigrationProtocol>::weak_from_this(),
+ bool pushRes = EventsQueue->PushEvent(partitionStream,
TClosedEvent(std::move(partitionStream), TClosedEvent::EReason::ConnectionLost),
deferred);
if (!pushRes) {
@@ -1738,22 +1768,22 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::TPartitionCookieMappin
template<bool UseMigrationProtocol>
TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
+ TCallbackContextPtr<UseMigrationProtocol> cbContext,
TEvent event)
: PartitionStream(std::move(partitionStream))
, Event(std::move(event))
- , Session(std::move(session))
+ , CbContext(std::move(cbContext))
{
}
template<bool UseMigrationProtocol>
TReadSessionEventInfo<UseMigrationProtocol>::TReadSessionEventInfo(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
+ TCallbackContextPtr<UseMigrationProtocol> cbContext,
bool hasDataEvents)
: PartitionStream(std::move(partitionStream))
, HasDataEvents(hasDataEvents)
, EventsCount(1)
- , Session(std::move(session))
+ , CbContext(std::move(cbContext))
{
}
@@ -1772,12 +1802,8 @@ bool TReadSessionEventInfo<UseMigrationProtocol>::IsDataEvent() const {
template <bool UseMigrationProtocol>
TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue(
- const TAReadSessionSettings<UseMigrationProtocol>& settings,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> session,
- std::shared_ptr<TImplTracker> tracker)
- : TParent(settings)
- , Session(std::move(session))
- , Tracker(std::move(tracker)) {
+ const TAReadSessionSettings<UseMigrationProtocol>& settings)
+ : TParent(settings) {
const auto& h = TParent::Settings.EventHandlers_;
if constexpr (UseMigrationProtocol) {
@@ -1803,7 +1829,6 @@ TReadSessionEventsQueue<UseMigrationProtocol>::TReadSessionEventsQueue(
template <bool UseMigrationProtocol>
bool TReadSessionEventsQueue<UseMigrationProtocol>::PushEvent(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> stream,
- std::weak_ptr<IUserRetrievedEventCallback<UseMigrationProtocol>> /*session*/,
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent event,
TDeferredActions<UseMigrationProtocol>& deferred)
{
@@ -1840,10 +1865,10 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl(
return;
}
- auto session = partitionStream->GetSession();
+ auto cbContext = partitionStream->GetCbContext();
if (TParent::Events.empty()) {
- TParent::Events.emplace(std::move(partitionStream), std::move(session), isDataEvent);
+ TParent::Events.emplace(std::move(partitionStream), std::move(cbContext), isDataEvent);
} else {
auto& event = TParent::Events.back();
if (event.HasDataEvents
@@ -1851,7 +1876,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::SignalEventImpl(
&& (event.PartitionStream == partitionStream)) {
++event.EventsCount;
} else {
- TParent::Events.emplace(std::move(partitionStream), std::move(session), isDataEvent);
+ TParent::Events.emplace(std::move(partitionStream), std::move(cbContext), isDataEvent);
}
}
@@ -1875,22 +1900,6 @@ bool TReadSessionEventsQueue<UseMigrationProtocol>::PushDataEvent(TIntrusivePtr<
}
template <bool UseMigrationProtocol>
-void TPartitionStreamImpl<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
- size_t& maxEventsCount,
- size_t& maxByteSize,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TMessage>& messages,
- TVector<typename TAReadSessionEvent<UseMigrationProtocol>::TDataReceivedEvent::TCompressedMessage>& compressedMessages,
- TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator)
-{
- partitionStream->EventsQueue.GetDataEventImpl(partitionStream,
- maxEventsCount,
- maxByteSize,
- messages,
- compressedMessages,
- accumulator);
-}
-
-template <bool UseMigrationProtocol>
void TRawPartitionStreamEventQueue<UseMigrationProtocol>::GetDataEventImpl(TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
size_t& maxEventsCount,
size_t& maxByteSize,
@@ -1989,7 +1998,7 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,
}
TMaybe<typename TAReadSessionEvent<UseMigrationProtocol>::TEvent> event;
- auto frontSession = front.Session;
+ auto frontCbContext = front.CbContext;
if (partitionStream->TopEvent().IsDataEvent()) {
event = GetDataEventImpl(partitionStream, maxByteSize, accumulator);
} else {
@@ -2001,12 +2010,12 @@ TReadSessionEventsQueue<UseMigrationProtocol>::GetEventImpl(size_t& maxByteSize,
TParent::RenewWaiterImpl();
- return {partitionStream, std::move(frontSession), std::move(*event)};
+ return {partitionStream, std::move(frontCbContext), std::move(*event)};
}
Y_ASSERT(TParent::CloseEvent);
- return {*TParent::CloseEvent, Session};
+ return {*TParent::CloseEvent};
}
template <bool UseMigrationProtocol>
@@ -2104,7 +2113,7 @@ template <bool UseMigrationProtocol>
bool TReadSessionEventsQueue<UseMigrationProtocol>::TryApplyCallbackToEventImpl(typename TParent::TEvent& event,
TDeferredActions<UseMigrationProtocol>& deferred)
{
- THandlersVisitor visitor(TParent::Settings, event, deferred, Tracker);
+ THandlersVisitor visitor(TParent::Settings, event, deferred);
return visitor.Visit();
}
@@ -2133,8 +2142,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbackToEventImpl(TDa
Y_VERIFY(HasEventCallbacks);
if (TParent::Settings.EventHandlers_.DataReceivedHandler_) {
- auto action = [wire = Tracker->MakeTrackedWire(),
- func = TParent::Settings.EventHandlers_.DataReceivedHandler_,
+ auto action = [func = TParent::Settings.EventHandlers_.DataReceivedHandler_,
data = std::move(data),
eventsInfo = std::move(eventsInfo)]() mutable {
func(data);
@@ -2143,8 +2151,7 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ApplyCallbackToEventImpl(TDa
deferred.DeferStartExecutorTask(TParent::Settings.EventHandlers_.HandlersExecutor_, std::move(action));
} else if (TParent::Settings.EventHandlers_.CommonHandler_) {
- auto action = [wire = Tracker->MakeTrackedWire(),
- func = TParent::Settings.EventHandlers_.CommonHandler_,
+ auto action = [func = TParent::Settings.EventHandlers_.CommonHandler_,
data = std::move(data),
eventsInfo = std::move(eventsInfo)]() mutable {
typename TParent::TEvent event(std::move(data));
@@ -2185,12 +2192,12 @@ void TReadSessionEventsQueue<UseMigrationProtocol>::ClearAllEvents() {
template<bool UseMigrationProtocol>
TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo(
TPartitionData<UseMigrationProtocol>&& msg,
- std::weak_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session,
+ TCallbackContextPtr<UseMigrationProtocol> cbContext,
bool doDecompress,
i64 serverBytesSize
)
: ServerMessage(std::move(msg))
- , Session(std::move(session))
+ , CbContext(std::move(cbContext))
, DoDecompress(doDecompress)
, ServerBytesSize(serverBytesSize)
{
@@ -2214,7 +2221,7 @@ TDataDecompressionInfo<UseMigrationProtocol>::TDataDecompressionInfo(
template<bool UseMigrationProtocol>
TDataDecompressionInfo<UseMigrationProtocol>::~TDataDecompressionInfo()
{
- if (auto session = Session.lock()) {
+ if (auto session = CbContext->LockShared()) {
session->OnDecompressionInfoDestroy(CompressedDataSize, DecompressedDataSize, MessagesInflight, ServerBytesSize);
}
}
@@ -2288,7 +2295,7 @@ i64 TDataDecompressionInfo<UseMigrationProtocol>::StartDecompressionTasks(
const typename IAExecutor<UseMigrationProtocol>::TPtr& executor, i64 availableMemory,
TDeferredActions<UseMigrationProtocol>& deferred)
{
- auto session = Session.lock();
+ auto session = CbContext->LockShared();
Y_ASSERT(session);
i64 used = 0;
@@ -2314,7 +2321,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double
{
constexpr size_t TASK_LIMIT = 512_KB;
- auto session = Session.lock();
+ auto session = CbContext->LockShared();
Y_ASSERT(session);
ReadyThresholds.emplace_back();
@@ -2340,8 +2347,10 @@ void TDataDecompressionInfo<UseMigrationProtocol>::PlanDecompressionTasks(double
TDataDecompressionInfo::shared_from_this(),
ReadyThresholds.back().Ready);
if (!pushRes) {
- session->AbortImpl();
+ // with_lock(session->Lock) {
+ session->Abort();
return;
+ // }
}
}
@@ -2471,7 +2480,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::OnDataDecompressed(i64 source
CompressedDataSize -= sourceSize;
DecompressedDataSize += decompressedSize;
- if (auto session = Session.lock()) {
+ if (auto session = CbContext->LockShared()) {
// TODO (ildar-khisam@): distribute total ServerBytesSize in proportion of source size
// Use CompressedDataSize, sourceSize, ServerBytesSize
session->OnDataDecompressed(sourceSize, estimatedDecompressedSize, decompressedSize, messagesCount, std::exchange(ServerBytesSize, 0));
@@ -2484,7 +2493,7 @@ void TDataDecompressionInfo<UseMigrationProtocol>::OnUserRetrievedEvent(i64 deco
MessagesInflight -= messagesCount;
DecompressedDataSize -= decompressedSize;
- if (auto session = Session.lock()) {
+ if (auto session = CbContext->LockShared()) {
session->OnUserRetrievedEvent(decompressedSize, messagesCount);
}
}
@@ -2561,27 +2570,25 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
Parent->PutDecompressionError(std::current_exception(), messages.Batch, i);
data.clear_data(); // Free memory, because we don't count it.
- std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock();
- if (session) {
+ if (auto session = Parent->CbContext->LockShared()) {
session->GetLog() << TLOG_INFO << "Error decompressing data: " << CurrentExceptionMessage();
}
}
}
}
- if (auto session = Parent->Session.lock()) {
+ if (auto session = Parent->CbContext->LockShared()) {
LOG_LAZY(session->GetLog(), TLOG_DEBUG, TStringBuilder() << "Decompression task done. Partition/PartitionSessionId: "
<< partition_id << " (" << minOffset << "-"
<< maxOffset << ")");
}
Y_ASSERT(dataProcessed == SourceDataSize);
- std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session = Parent->Session.lock();
Parent->OnDataDecompressed(SourceDataSize, EstimatedDecompressedSize, DecompressedSize, messagesProcessed);
Parent->SourceDataNotProcessed -= dataProcessed;
Ready->Ready = true;
- if (session) {
+ if (auto session = Parent->CbContext->LockShared()) {
session->GetEventsQueue()->SignalReadyEvents(PartitionStream);
}
}
@@ -2628,37 +2635,37 @@ void TDeferredActions<UseMigrationProtocol>::DeferStartExecutorTask(const typena
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) {
- Session = session;
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TASessionClosedEvent<UseMigrationProtocol>&& closeEvent) {
+ CbContext = std::move(cbContext);
SessionClosedEvent.ConstructInPlace(std::move(closeEvent));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, NYql::TIssues&& issues) {
- DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues)));
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, NYql::TIssues&& issues) {
+ DeferAbortSession(std::move(cbContext), TASessionClosedEvent<UseMigrationProtocol>(statusCode, std::move(issues)));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, EStatus statusCode, const TString& message) {
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, EStatus statusCode, const TString& message) {
NYql::TIssues issues;
issues.AddIssue(message);
- DeferAbortSession(session, statusCode, std::move(issues));
+ DeferAbortSession(std::move(cbContext), statusCode, std::move(issues));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) {
- DeferAbortSession(session, TASessionClosedEvent<UseMigrationProtocol>(std::move(status)));
+void TDeferredActions<UseMigrationProtocol>::DeferAbortSession(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status) {
+ DeferAbortSession(std::move(cbContext), TASessionClosedEvent<UseMigrationProtocol>(std::move(status)));
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session, TPlainStatus&& status) {
- Session = std::move(session);
+void TDeferredActions<UseMigrationProtocol>::DeferReconnection(TCallbackContextPtr<UseMigrationProtocol> cbContext, TPlainStatus&& status) {
+ CbContext = std::move(cbContext);
ReconnectionStatus = std::move(status);
}
template<bool UseMigrationProtocol>
-void TDeferredActions<UseMigrationProtocol>::DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl<UseMigrationProtocol>> session) {
- Sessions.push_back(std::move(session));
+void TDeferredActions<UseMigrationProtocol>::DeferStartSession(TCallbackContextPtr<UseMigrationProtocol> cbContext) {
+ CbContexts.push_back(std::move(cbContext));
}
template<bool UseMigrationProtocol>
@@ -2684,8 +2691,10 @@ void TDeferredActions<UseMigrationProtocol>::DoActions() {
template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::StartSessions() {
- for (auto& session : Sessions) {
- session->Start();
+ for (auto& ctx : CbContexts) {
+ if (auto session = ctx->LockShared()) {
+ session->Start();
+ }
}
}
@@ -2708,16 +2717,20 @@ void TDeferredActions<UseMigrationProtocol>::StartExecutorTasks() {
template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::AbortSession() {
if (SessionClosedEvent) {
- Y_VERIFY(Session);
- Session->AbortSession(std::move(*SessionClosedEvent));
+ Y_VERIFY(CbContext);
+ if (auto session = CbContext->LockShared()) {
+ session->AbortSession(std::move(*SessionClosedEvent));
+ }
}
}
template<bool UseMigrationProtocol>
void TDeferredActions<UseMigrationProtocol>::Reconnect() {
- if (Session) {
- if (!Session->Reconnect(ReconnectionStatus)) {
- Session->AbortSession(std::move(ReconnectionStatus));
+ if (CbContext) {
+ if (auto session = CbContext->LockShared()) {
+ if (!session->Reconnect(ReconnectionStatus)) {
+ session->AbortSession(std::move(ReconnectionStatus));
+ }
}
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 607ebfb8552..9b1248ef375 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -17,9 +17,10 @@ TWriteSession::TWriteSession(
std::shared_ptr<TPersQueueClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState)
- : Tracker(std::make_shared<TImplTracker>())
- , Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState), Tracker))
+ : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState)))
{
+ CbContext = std::make_shared<TCallbackContext<TWriteSessionImpl>>(Impl);
+ Impl->SetCallbackContext(CbContext);
}
void TWriteSession::Start(const TDuration& delay) {
@@ -58,7 +59,7 @@ bool TWriteSession::Close(TDuration closeTimeout) {
TWriteSession::~TWriteSession() {
Impl->Close(TDuration::Zero());
- Tracker->AsyncComplete().Wait();
+ CbContext->Cancel();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
index 62732d67c94..404b9444e58 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
@@ -1,11 +1,11 @@
#pragma once
#include "common.h"
-#include "impl_tracker.h"
#include "persqueue_impl.h"
#include "write_session_impl.h"
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
#include <util/generic/buffer.h>
@@ -58,7 +58,7 @@ private:
void Start(const TDuration& delay);
private:
- std::shared_ptr<TImplTracker> Tracker;
+ std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext;
std::shared_ptr<TWriteSessionImpl> Impl;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 908ee390333..531f2655e6e 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -45,15 +45,13 @@ TWriteSessionImpl::TWriteSessionImpl(
const TWriteSessionSettings& settings,
std::shared_ptr<TPersQueueClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
- TDbDriverStatePtr dbDriverState,
- std::shared_ptr<TImplTracker> tracker)
+ TDbDriverStatePtr dbDriverState)
: Settings(settings)
, Client(std::move(client))
, Connections(std::move(connections))
, DbDriverState(std::move(dbDriverState))
, PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "")
- , Tracker(tracker)
- , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings, Tracker))
+ , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings))
, InitSeqNoPromise(NThreading::NewPromise<ui64>())
, WakeupInterval(
Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ?
@@ -77,7 +75,12 @@ TWriteSessionImpl::TWriteSessionImpl(
}
+void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx) {
+ CbContext = std::move(ctx);
+}
+
void TWriteSessionImpl::Start(const TDuration& delay) {
+ Y_VERIFY(CbContext);
++ConnectionAttemptsDone;
if (!Started) {
with_lock(Lock) {
@@ -143,14 +146,16 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
(Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint)));
if (!cdsRequestIsUnnecessary) {
- auto extractor = [sharedThis = shared_from_this(), wire = Tracker->MakeTrackedWire()]
+ auto extractor = [cbContext = CbContext]
(google::protobuf::Any* any, TPlainStatus status) mutable {
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult result;
if (any) {
any->UnpackTo(&result);
}
TStatus st(std::move(status));
- sharedThis->OnCdsResponse(st, result);
+ if (auto self = cbContext->LockShared()) {
+ self->OnCdsResponse(st, result);
+ }
};
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest req;
@@ -163,7 +168,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
params->set_preferred_cluster_name(*Settings.PreferredCluster_);
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Do schedule cds request after " << delay.MilliSeconds() << " ms\n");
- auto cdsRequestCall = [wire = Tracker->MakeTrackedWire(), req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable {
+ auto cdsRequestCall = [req_=std::move(req), extr=std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState=DbDriverState, settings=Settings]() mutable {
LOG_LAZY(dbState->Log, TLOG_INFO, TStringBuilder() << "MessageGroupId [" << settings.MessageGroupId_ << "] Running cds request ms\n");
connections->RunDeferred<Ydb::PersQueue::V1::ClusterDiscoveryService,
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest,
@@ -463,19 +468,18 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
Y_ASSERT(connectTimeoutContext);
reqSettings = TRpcRequestSettings::Make(Settings);
- connectCallback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
- connectContext = connectContext]
- (TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
- sharedThis->OnConnect(std::move(st), std::move(processor), connectContext);
+ connectCallback = [cbContext = CbContext,
+ connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
+ if (auto self = cbContext->LockShared()) {
+ self->OnConnect(std::move(st), std::move(processor), connectContext);
+ }
};
- connectTimeoutCallback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
- connectTimeoutContext = connectTimeoutContext]
- (bool ok) {
+ connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) {
if (ok) {
- sharedThis->OnConnectTimeout(connectTimeoutContext);
+ if (auto self = cbContext->LockShared()) {
+ self->OnConnectTimeout(connectTimeoutContext);
+ }
}
};
}
@@ -586,10 +590,11 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&&
if (Aborting) {
return;
}
- auto callback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
+ auto callback = [cbContext = CbContext,
connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) {
- sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration);
+ if (auto self = cbContext->LockShared()) {
+ self->OnWriteDone(std::move(grpcStatus), connectionGeneration);
+ }
};
Processor->Write(std::move(req), std::move(callback));
@@ -606,13 +611,14 @@ void TWriteSessionImpl::ReadFromProcessor() {
}
prc = Processor;
generation = ConnectionGeneration;
- callback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
- connectionGeneration = generation,
- processor = prc,
- serverMessage = ServerMessage]
- (NGrpc::TGrpcStatus&& grpcStatus) {
- sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ callback = [cbContext = CbContext,
+ connectionGeneration = generation,
+ processor = prc,
+ serverMessage = ServerMessage]
+ (NGrpc::TGrpcStatus&& grpcStatus) {
+ if (auto self = cbContext->LockShared()) {
+ self->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ }
};
}
prc->Read(ServerMessage.get(), std::move(callback));
@@ -896,8 +902,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>());
blockPtr->Move(block_);
- auto lambda = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
+ auto lambda = [cbContext = CbContext,
codec = Settings.Codec_,
level = Settings.CompressionLevel_,
isSyncCompression = !CompressionExecutor->IsAsync(),
@@ -910,8 +915,10 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
Y_VERIFY(!compressedData.Empty());
blockPtr->Data = std::move(compressedData);
blockPtr->Compressed = true;
- blockPtr->CodecID = GetCodecId(sharedThis->Settings.Codec_);
- sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression);
+ blockPtr->CodecID = GetCodecId(codec);
+ if (auto self = cbContext->LockShared()) {
+ self->OnCompressed(std::move(*blockPtr), isSyncCompression);
+ }
};
CompressionExecutor->Post(std::move(lambda));
@@ -1272,13 +1279,16 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
if (AtomicGet(Aborting)) {
return;
}
- auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire()] (bool ok)
+ auto callback = [cbContext = CbContext] (bool ok)
{
if (!ok) {
return;
}
- with_lock(sharedThis->Lock) {
- sharedThis->HandleWakeUpImpl();
+
+ if (auto self = cbContext->LockShared()) {
+ with_lock(self->Lock) {
+ self->HandleWakeUpImpl();
+ }
}
};
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
index 8124fc60320..06f27b86007 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
@@ -1,10 +1,10 @@
#pragma once
#include "common.h"
-#include "impl_tracker.h"
#include "persqueue_impl.h"
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
#include <util/generic/buffer.h>
@@ -28,10 +28,8 @@ class TWriteSessionEventsQueue: public TBaseSessionEventsQueue<TWriteSessionSett
using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>;
public:
- TWriteSessionEventsQueue(const TWriteSessionSettings& settings,
- std::shared_ptr<TImplTracker> tracker = std::make_shared<TImplTracker>())
+ TWriteSessionEventsQueue(const TWriteSessionSettings& settings)
: TParent(settings)
- , Tracker(std::move(tracker))
{}
void PushEvent(TEventInfo eventInfo) {
@@ -131,7 +129,7 @@ private:
};
bool ApplyHandler(TEventInfo& eventInfo) {
- THandlersVisitor visitor(Settings, eventInfo.GetEvent(), Tracker);
+ THandlersVisitor visitor(Settings, eventInfo.GetEvent());
return visitor.Visit();
}
@@ -146,9 +144,6 @@ private:
Y_ASSERT(CloseEvent);
return {*CloseEvent};
}
-
-private:
- std::shared_ptr<TImplTracker> Tracker;
};
struct TMemoryUsageChange {
@@ -305,8 +300,7 @@ public:
TWriteSessionImpl(const TWriteSessionSettings& settings,
std::shared_ptr<TPersQueueClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
- TDbDriverStatePtr dbDriverState,
- std::shared_ptr<TImplTracker> tracker);
+ TDbDriverStatePtr dbDriverState);
TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override;
TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false,
@@ -329,6 +323,8 @@ public:
~TWriteSessionImpl(); // will not call close - destroy everything without acks
+ void SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx);
+
private:
TStringBuilder LogPrefix() const;
@@ -396,7 +392,7 @@ private:
TStringType PrevToken;
bool UpdateTokenInProgress = false;
TInstant LastTokenUpdate = TInstant::Zero();
- std::shared_ptr<TImplTracker> Tracker;
+ std::shared_ptr<TCallbackContext<TWriteSessionImpl>> CbContext;
std::shared_ptr<TWriteSessionEventsQueue> EventsQueue;
NGrpc::IQueueClientContextPtr ClientContext; // Common client context.
NGrpc::IQueueClientContextPtr ConnectContext;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 1e399534d67..989ff5b72ab 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -510,6 +510,7 @@ public:
ui64 PartitionIdStart = 1;
ui64 PartitionIdStep = 1;
typename TSingleClusterReadSessionImpl<true>::TPtr Session;
+ std::shared_ptr<TCallbackContext<TSingleClusterReadSessionImpl<true>>> CbContext;
std::shared_ptr<TThreadPool> ThreadPool;
::IExecutor::TPtr DefaultExecutor;
};
@@ -598,6 +599,7 @@ TReadSessionImplTestSetup::~TReadSessionImplTestSetup() noexcept(false) {
MockProcessor->Validate();
}
+ CbContext->Cancel();
Session = nullptr;
ThreadPool->Stop();
@@ -630,13 +632,14 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() {
GetEventsQueue(),
FakeContext,
PartitionIdStart, PartitionIdStep);
+ CbContext = Session->MakeCallbackContext();
}
return Session.get();
}
std::shared_ptr<TReadSessionEventsQueue<true>> TReadSessionImplTestSetup::GetEventsQueue() {
if (!EventsQueue) {
- EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings, std::weak_ptr<IUserRetrievedEventCallback<true>>());
+ EventsQueue = std::make_shared<TReadSessionEventsQueue<true>>(Settings);
}
return EventsQueue;
}
@@ -727,14 +730,14 @@ Y_UNIT_TEST_SUITE(PersQueueSdkReadSessionTest) {
}
// Event 4: commit ack.
- if (commit && !close) { // (commit && close) branch check is broken with current TReadSession::Close quick fix
+ if (commit) { // (commit && close) branch check is broken with current TReadSession::Close quick fix
TMaybe<TReadSessionEvent::TEvent> event = session->GetEvent(!close); // Event is expected to be already in queue if closed.
UNIT_ASSERT(event);
- Cerr << "commit ack event " << DebugString(*event) << Endl;
- UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TCommitAcknowledgementEvent>(*event));
+ Cerr << "commit ack or close event " << DebugString(*event) << Endl;
+ UNIT_ASSERT(std::holds_alternative<TReadSessionEvent::TCommitAcknowledgementEvent>(*event) || std::holds_alternative<TSessionClosedEvent>(*event));
}
- if (close) {
+ if (close && !commit) {
TMaybe<TReadSessionEvent::TEvent> event = session->GetEvent(false);
UNIT_ASSERT(event);
Cerr << "close event " << DebugString(*event) << Endl;
@@ -1885,7 +1888,8 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
TAReadSessionSettings<true> settings;
std::shared_ptr<TSingleClusterReadSessionImpl<true>> session;
- TReadSessionEventsQueue<true> sessionQueue{settings, session};
+ auto cbCtx = std::make_shared<TCallbackContext<TSingleClusterReadSessionImpl<true>>>(session);
+ TReadSessionEventsQueue<true> sessionQueue{settings};
auto stream = MakeIntrusive<TPartitionStreamImpl<true>>(1ull,
"",
@@ -1894,7 +1898,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
1ull,
1ull,
0ull,
- session);
+ cbCtx);
TPartitionData<true> message;
Ydb::PersQueue::V1::MigrationStreamingReadServerMessage_DataBatch_Batch* batch =
@@ -1904,7 +1908,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
*messageData->mutable_data() = "*";
auto data = std::make_shared<TDataDecompressionInfo<true>>(std::move(message),
- session,
+ cbCtx,
false,
0);
@@ -1933,5 +1937,7 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
#undef UNIT_ASSERT_CONTROL_EVENT
#undef UNIT_ASSERT_DATA_EVENT
+
+ cbCtx->Cancel();
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index fb340907393..0d7ad23ea1b 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -37,14 +37,16 @@ TReadSession::~TReadSession() {
Abort(EStatus::ABORTED, "Aborted");
ClearAllEvents();
- if (Tracker) {
- Tracker->AsyncComplete().Wait();
+ if (CbContext) {
+ CbContext->Cancel();
+ }
+ if (DumpCountersContext) {
+ DumpCountersContext->Cancel();
}
}
void TReadSession::Start() {
- Tracker = std::make_shared<NPersQueue::TImplTracker>();
- EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings, weak_from_this(), Tracker);
+ EventsQueue = std::make_shared<NPersQueue::TReadSessionEventsQueue<false>>(Settings);
if (!ValidateSettings()) {
return;
@@ -60,7 +62,7 @@ void TReadSession::Start() {
Topics = Settings.Topics_;
CreateClusterSessionsImpl(deferred);
}
- // ScheduleDumpCountersToLog();
+ SetupCountersLogger();
}
void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred) {
@@ -85,10 +87,10 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>
Client->CreateReadSessionConnectionProcessorFactory(),
EventsQueue,
context,
- 1, 1,
- Tracker);
+ 1, 1);
- deferred.DeferStartSession(Session);
+ CbContext = Session->MakeCallbackContext();
+ deferred.DeferStartSession(CbContext);
}
bool TReadSession::ValidateSettings() {
@@ -97,12 +99,12 @@ bool TReadSession::ValidateSettings() {
issues.AddIssue("Empty topics list.");
}
- if (Settings.ConsumerName_.empty() && !Settings.WithoutConsumer_) {
- issues.AddIssue("No consumer specified.");
+ if (Settings.ConsumerName_.empty() && !Settings.WithoutConsumer_) {
+ issues.AddIssue("No consumer specified.");
}
- if (!Settings.ConsumerName_.empty() && Settings.WithoutConsumer_) {
- issues.AddIssue("No need to specify a consumer when reading without a consumer.");
+ if (!Settings.ConsumerName_.empty() && Settings.WithoutConsumer_) {
+ issues.AddIssue("No need to specify a consumer when reading without a consumer.");
}
if (Settings.MaxMemoryUsageBytes_ < 1_MB) {
@@ -244,11 +246,10 @@ void TReadSession::UpdateOffsets(const NTable::TTransaction& tx)
bool TReadSession::Close(TDuration timeout) {
LOG_LAZY(Log, TLOG_INFO, GetLogPrefix() << "Closing read session. Close timeout: " << timeout);
// Log final counters.
- DumpCountersToLog();
+ CountersLogger->Stop();
with_lock (Lock) {
if (DumpCountersContext) {
DumpCountersContext->Cancel();
- DumpCountersContext.reset();
}
}
@@ -321,15 +322,6 @@ TStringBuilder TReadSession::GetLogPrefix() const {
return TStringBuilder() << GetDatabaseLogPrefix(DbDriverState->Database) << "[" << SessionId << "] ";
}
-void TReadSession::OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) {
- LOG_LAZY(Log, TLOG_DEBUG, GetLogPrefix()
- << "The application data is transferred to the client. Number of messages "
- << messagesCount
- << ", size "
- << decompressedSize
- << " bytes");
-}
-
void TReadSession::MakeCountersIfNeeded() {
if (!Settings.Counters_ || NPersQueue::HasNullCounters(*Settings.Counters_)) {
TReaderCounters::TPtr counters = MakeIntrusive<TReaderCounters>();
@@ -341,78 +333,17 @@ void TReadSession::MakeCountersIfNeeded() {
}
}
-void TReadSession::DumpCountersToLog(size_t timeNumber) {
- const bool logCounters = timeNumber % 60 == 0; // Every 1 minute.
- const bool dumpSessionsStatistics = timeNumber % 600 == 0; // Every 10 minutes.
-
- *Settings.Counters_->CurrentSessionLifetimeMs = (TInstant::Now() - StartSessionTime).MilliSeconds();
- NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr session;
- with_lock (Lock) {
- if (Closing || Aborting) {
- return;
- }
-
- session = Session;
- }
-
- {
- TMaybe<TLogElement> log;
- if (dumpSessionsStatistics) {
- log.ConstructInPlace(&Log, TLOG_INFO);
- (*log) << "Read/commit by partition streams (cluster:topic:partition:stream-id:read-offset:committed-offset):";
- }
- session->UpdateMemoryUsageStatistics();
- if (dumpSessionsStatistics) {
- session->DumpStatisticsToLog(*log);
- }
- }
-
-#define C(counter) \
- << " " Y_STRINGIZE(counter) ": " \
- << Settings.Counters_->counter->Val() \
- /**/
-
- if (logCounters) {
- LOG_LAZY(Log, TLOG_INFO,
- GetLogPrefix() << "Counters: {"
- C(Errors)
- C(CurrentSessionLifetimeMs)
- C(BytesRead)
- C(MessagesRead)
- C(BytesReadCompressed)
- C(BytesInflightUncompressed)
- C(BytesInflightCompressed)
- C(BytesInflightTotal)
- C(MessagesInflight)
- << " }"
- );
- }
-
-#undef C
-
- // ScheduleDumpCountersToLog(timeNumber + 1);
-}
-
-void TReadSession::ScheduleDumpCountersToLog(size_t timeNumber) {
+void TReadSession::SetupCountersLogger() {
with_lock(Lock) {
- if (Aborting || Closing) {
- return;
- }
- DumpCountersContext = Connections->CreateContext();
- if (DumpCountersContext) {
- auto callback = [self = shared_from_this(), timeNumber](bool ok) {
- if (ok) {
- self->DumpCountersToLog(timeNumber);
- }
- };
- Connections->ScheduleCallback(TDuration::Seconds(1),
- std::move(callback),
- DumpCountersContext);
- }
+ std::vector<NPersQueue::TCallbackContextPtr<false>> sessions{CbContext};
+
+ CountersLogger = std::make_shared<NPersQueue::TCountersLogger<false>>(Connections, sessions, Settings.Counters_, Log,
+ GetLogPrefix(), StartSessionTime);
+ DumpCountersContext = CountersLogger->MakeCallbackContext();
+ CountersLogger->Start();
}
}
-
void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) {
Y_VERIFY(Lock.IsLocked());
@@ -420,7 +351,6 @@ void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) {
Aborting = true;
if (DumpCountersContext) {
DumpCountersContext->Cancel();
- DumpCountersContext.reset();
}
Session->Abort();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
index 064a395a697..aa626de4b15 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
@@ -2,75 +2,13 @@
#include "topic_impl.h"
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/counters_logger.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
namespace NYdb::NTopic {
-class TDummyReadSession: public IReadSession, public std::enable_shared_from_this<TDummyReadSession> {
-public:
- TDummyReadSession() = default;
-
- inline TDummyReadSession(const TReadSessionSettings& settings) {
- (void)settings;
- }
-
- inline NThreading::TFuture<void> WaitEvent() override {
- Y_VERIFY(false);
-
- NThreading::TPromise<void> promise = NThreading::NewPromise<void>();
- return promise.GetFuture();
- }
-
- inline TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override {
- Y_VERIFY(false);
-
- (void)block;
- (void)maxEventsCount;
- (void)maxByteSize;
- return {};
- }
-
- TVector<TReadSessionEvent::TEvent> GetEvents(const TReadSessionGetEventSettings& settings) override {
- return GetEvents(settings.Block_,
- settings.MaxEventsCount_,
- settings.MaxByteSize_);
- }
-
- inline TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override {
- Y_VERIFY(false);
-
- (void)block;
- (void)maxByteSize;
- return {};
- }
-
- TMaybe<TReadSessionEvent::TEvent> GetEvent(const TReadSessionGetEventSettings& settings) override {
- return GetEvent(settings.Block_,
- settings.MaxByteSize_);
- }
-
- inline bool Close(TDuration timeout) override {
- Y_VERIFY(false);
-
- return !(bool)timeout;
- }
-
- inline TString GetSessionId() const override {
- Y_VERIFY(false);
-
- return "dummy_session_id";
- }
-
- inline TReaderCounters::TPtr GetCounters() const override {
- Y_VERIFY(false);
-
- return nullptr;
- }
-};
-
class TReadSession : public IReadSession,
- public NPersQueue::IUserRetrievedEventCallback<false>,
public std::enable_shared_from_this<TReadSession> {
public:
TReadSession(const TReadSessionSettings& settings,
@@ -113,11 +51,8 @@ private:
void CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>& deferred);
- void OnUserRetrievedEvent(i64 decompressedSize, size_t messagesCount) override;
-
void MakeCountersIfNeeded();
- void DumpCountersToLog(size_t timeNumber = 0);
- void ScheduleDumpCountersToLog(size_t timeNumber = 0);
+ void SetupCountersLogger();
// Shutdown.
void Abort(EStatus statusCode, NYql::TIssues&& issues);
@@ -150,13 +85,14 @@ private:
std::shared_ptr<TGRpcConnectionsImpl> Connections;
TDbDriverStatePtr DbDriverState;
TAdaptiveLock Lock;
- std::shared_ptr<NPersQueue::TImplTracker> Tracker;
std::shared_ptr<NPersQueue::TReadSessionEventsQueue<false>> EventsQueue;
NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr Session;
+ std::shared_ptr<NPersQueue::TCallbackContext<NPersQueue::TSingleClusterReadSessionImpl<false>>> CbContext;
TVector<TTopicReadSettings> Topics;
- NGrpc::IQueueClientContextPtr DumpCountersContext;
+ std::shared_ptr<NPersQueue::TCountersLogger<false>> CountersLogger;
+ std::shared_ptr<NPersQueue::TCallbackContext<NPersQueue::TCountersLogger<false>>> DumpCountersContext;
// Exiting.
bool Aborting = false;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index 593a0362923..6a8a618831c 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -19,9 +19,10 @@ TWriteSession::TWriteSession(
std::shared_ptr<TTopicClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState)
- : Tracker(std::make_shared<NPersQueue::TImplTracker>())
- , Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState), Tracker))
+ : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState)))
{
+ CbContext = std::make_shared<NPersQueue::TCallbackContext<TWriteSessionImpl>>(Impl);
+ Impl->SetCallbackContext(CbContext);
}
void TWriteSession::Start(const TDuration& delay) {
@@ -79,7 +80,7 @@ bool TWriteSession::Close(TDuration closeTimeout) {
TWriteSession::~TWriteSession() {
Impl->Close(TDuration::Zero());
- Tracker->AsyncComplete().Wait();
+ CbContext->Cancel();
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
index d4e11d34082..342e0c52e76 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -4,7 +4,7 @@
#include "write_session_impl.h"
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <util/generic/buffer.h>
@@ -55,10 +55,13 @@ private:
void Start(const TDuration& delay);
private:
- std::shared_ptr<NPersQueue::TImplTracker> Tracker;
+ std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext;
std::shared_ptr<TWriteSessionImpl> Impl;
};
+////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+// TSimpleBlockingWriteSession
+
class TSimpleBlockingWriteSession : public ISimpleBlockingWriteSession {
public:
TSimpleBlockingWriteSession(
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index e86ca445fd6..8eee84c3a5d 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -49,15 +49,13 @@ TWriteSessionImpl::TWriteSessionImpl(
const TWriteSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
- TDbDriverStatePtr dbDriverState,
- std::shared_ptr<NPersQueue::TImplTracker> tracker)
+ TDbDriverStatePtr dbDriverState)
: Settings(settings)
, Client(std::move(client))
, Connections(std::move(connections))
, DbDriverState(std::move(dbDriverState))
, PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "")
- , Tracker(tracker)
- , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings, Tracker))
+ , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings))
, InitSeqNoPromise(NThreading::NewPromise<ui64>())
, WakeupInterval(
Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ?
@@ -77,7 +75,12 @@ TWriteSessionImpl::TWriteSessionImpl(
}
+void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx) {
+ CbContext = std::move(ctx);
+}
+
void TWriteSessionImpl::Start(const TDuration& delay) {
+ Y_VERIFY(CbContext);
++ConnectionAttemptsDone;
if (!Started) {
with_lock(Lock) {
@@ -159,17 +162,22 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
request.set_partition_id(*Settings.PartitionId_);
request.set_include_location(true);
- auto extractor = [sharedThis = shared_from_this(), wire = Tracker->MakeTrackedWire(), context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable {
+ auto extractor = [cbContext = CbContext, context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable {
Ydb::Topic::DescribePartitionResult result;
if (response)
response->operation().result().UnpackTo(&result);
TStatus st(std::move(status));
- sharedThis->OnDescribePartition(st, result, context);
+ if (auto self = cbContext->LockShared()) {
+ self->OnDescribePartition(st, result, context);
+ }
};
- auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire(), req = std::move(request), extr = std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState, context = describePartitionContext]() mutable {
- LOG_LAZY(dbState->Log, TLOG_DEBUG, sharedThis->LogPrefix() << " Getting partition location, partition " << sharedThis->Settings.PartitionId_);
+ auto callback = [req = std::move(request), extr = std::move(extractor),
+ connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState,
+ context = describePartitionContext, prefix = TString(LogPrefix()),
+ partId = Settings.PartitionId_]() mutable {
+ LOG_LAZY(dbState->Log, TLOG_DEBUG, prefix + " Getting partition location, partition " + ToString(partId));
connections->Run<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>(
std::move(req),
std::move(extr),
@@ -189,10 +197,11 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To
THandleResult handleResult;
with_lock (Lock) {
- if (DescribePartitionContext == describePartitionContext)
+ if (DescribePartitionContext == describePartitionContext) {
DescribePartitionContext = nullptr;
- else
+ } else {
return;
+ }
}
if (!status.IsSuccess()) {
@@ -468,19 +477,18 @@ void TWriteSessionImpl::Connect(const TDuration& delay) {
reqSettings = TRpcRequestSettings::Make(Settings, PreferredPartitionLocation.Endpoint);
- connectCallback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
- connectContext = connectContext]
- (TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
- sharedThis->OnConnect(std::move(st), std::move(processor), connectContext);
+ connectCallback = [cbContext = CbContext,
+ connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
+ if (auto self = cbContext->LockShared()) {
+ self->OnConnect(std::move(st), std::move(processor), connectContext);
+ }
};
- connectTimeoutCallback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
- connectTimeoutContext = connectTimeoutContext]
- (bool ok) {
+ connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) {
if (ok) {
- sharedThis->OnConnectTimeout(connectTimeoutContext);
+ if (auto self = cbContext->LockShared()) {
+ self->OnConnectTimeout(connectTimeoutContext);
+ }
}
};
}
@@ -570,7 +578,7 @@ void TWriteSessionImpl::InitImpl() {
auto* init = req.mutable_init_request();
init->set_path(Settings.Path_);
init->set_producer_id(Settings.ProducerId_);
-
+
if (Settings.PartitionId_.Defined()) {
if (Settings.DirectWriteToPartition_) {
auto* partitionWithGeneration = init->mutable_partition_with_generation();
@@ -604,10 +612,11 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&&
if (Aborting) {
return;
}
- auto callback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
+ auto callback = [cbContext = CbContext,
connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) {
- sharedThis->OnWriteDone(std::move(grpcStatus), connectionGeneration);
+ if (auto self = cbContext->LockShared()) {
+ self->OnWriteDone(std::move(grpcStatus), connectionGeneration);
+ }
};
Processor->Write(std::move(req), callback);
@@ -624,13 +633,14 @@ void TWriteSessionImpl::ReadFromProcessor() {
}
prc = Processor;
generation = ConnectionGeneration;
- callback = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
- connectionGeneration = generation,
- processor = prc,
- serverMessage = ServerMessage]
- (NGrpc::TGrpcStatus&& grpcStatus) {
- sharedThis->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ callback = [cbContext = CbContext,
+ connectionGeneration = generation,
+ processor = prc,
+ serverMessage = ServerMessage]
+ (NGrpc::TGrpcStatus&& grpcStatus) {
+ if (auto self = cbContext->LockShared()) {
+ self->OnReadDone(std::move(grpcStatus), connectionGeneration);
+ }
};
}
prc->Read(ServerMessage.get(), std::move(callback));
@@ -943,8 +953,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>());
blockPtr->Move(block_);
- auto lambda = [sharedThis = shared_from_this(),
- wire = Tracker->MakeTrackedWire(),
+ auto lambda = [cbContext = CbContext,
codec = Settings.Codec_,
level = Settings.CompressionLevel_,
isSyncCompression = !CompressionExecutor->IsAsync(),
@@ -957,8 +966,10 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
Y_VERIFY(!compressedData.Empty());
blockPtr->Data = std::move(compressedData);
blockPtr->Compressed = true;
- blockPtr->CodecID = static_cast<ui32>(sharedThis->Settings.Codec_);
- sharedThis->OnCompressed(std::move(*blockPtr), isSyncCompression);
+ blockPtr->CodecID = static_cast<ui32>(codec);
+ if (auto self = cbContext->LockShared()) {
+ self->OnCompressed(std::move(*blockPtr), isSyncCompression);
+ }
};
CompressionExecutor->Post(lambda);
@@ -1261,13 +1272,15 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
if (AtomicGet(Aborting)) {
return;
}
- auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire()] (bool ok)
+ auto callback = [cbContext = CbContext] (bool ok)
{
if (!ok) {
return;
}
- with_lock(sharedThis->Lock) {
- sharedThis->HandleWakeUpImpl();
+ if (auto self = cbContext->LockShared()) {
+ with_lock(self->Lock) {
+ self->HandleWakeUpImpl();
+ }
}
};
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index e5ed606a669..67b69cb6d57 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -3,7 +3,7 @@
#include "topic_impl.h"
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
-#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/impl_tracker.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <util/generic/buffer.h>
@@ -26,10 +26,8 @@ class TWriteSessionEventsQueue: public NPersQueue::TBaseSessionEventsQueue<TWrit
using TParent = TBaseSessionEventsQueue<TWriteSessionSettings, TWriteSessionEvent::TEvent, TSessionClosedEvent, IExecutor>;
public:
- TWriteSessionEventsQueue(const TWriteSessionSettings& settings,
- std::shared_ptr<NPersQueue::TImplTracker> tracker = std::make_shared<NPersQueue::TImplTracker>())
+ TWriteSessionEventsQueue(const TWriteSessionSettings& settings)
: TParent(settings)
- , Tracker(std::move(tracker))
{}
void PushEvent(TEventInfo eventInfo) {
@@ -126,7 +124,7 @@ private:
};
bool ApplyHandler(TEventInfo& eventInfo) {
- THandlersVisitor visitor(Settings, eventInfo.Event, Tracker);
+ THandlersVisitor visitor(Settings, eventInfo.Event);
return visitor.Visit();
}
@@ -141,9 +139,6 @@ private:
Y_ASSERT(CloseEvent);
return {*CloseEvent};
}
-
-private:
- std::shared_ptr<NPersQueue::TImplTracker> Tracker;
};
struct TMemoryUsageChange {
@@ -325,8 +320,7 @@ public:
TWriteSessionImpl(const TWriteSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
- TDbDriverStatePtr dbDriverState,
- std::shared_ptr<NPersQueue::TImplTracker> tracker);
+ TDbDriverStatePtr dbDriverState);
TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override;
TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false,
@@ -361,6 +355,8 @@ public:
~TWriteSessionImpl(); // will not call close - destroy everything without acks
+ void SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx);
+
private:
TStringBuilder LogPrefix() const;
@@ -429,7 +425,7 @@ private:
TStringType PrevToken;
bool UpdateTokenInProgress = false;
TInstant LastTokenUpdate = TInstant::Zero();
- std::shared_ptr<NPersQueue::TImplTracker> Tracker;
+ std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext;
std::shared_ptr<TWriteSessionEventsQueue> EventsQueue;
NGrpc::IQueueClientContextPtr ClientContext; // Common client context.
NGrpc::IQueueClientContextPtr ConnectContext;