aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-11-07 23:23:58 +0300
committerildar-khisam <ikhis@ydb.tech>2023-11-07 23:39:31 +0300
commitd8b4abf5dc60664a332b8a7636bea320f17dc204 (patch)
tree5dc5c9aef0565c185715c46ced1524fd68ac59c8
parent59e3f9da6fd5a8e9232c51e78a3bc8e87d71a6be (diff)
downloadydb-d8b4abf5dc60664a332b8a7636bea320f17dc204.tar.gz
split fed read session to client and impl parts
split fed read session to client and impl parts
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp60
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h74
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp47
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h36
-rw-r--r--ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h92
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp24
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp20
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h22
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp37
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp20
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h26
21 files changed, 330 insertions, 224 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
index 65a5eb8c8f0..07645caf6e8 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp
@@ -54,10 +54,10 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings&
return SubsessionSettings;
}
-TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings& settings,
- std::shared_ptr<TGRpcConnectionsImpl> connections,
- const TFederatedTopicClientSettings& clientSetttings,
- std::shared_ptr<TFederatedDbObserver> observer)
+TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSetttings,
+ std::shared_ptr<TFederatedDbObserver> observer)
: Settings(settings)
, Connections(std::move(connections))
, SubClientSetttings(FromFederated(clientSetttings))
@@ -68,17 +68,19 @@ TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings
{
}
-void TFederatedReadSession::Start() {
- AsyncInit.Subscribe([self = shared_from_this()](const auto& f){
+void TFederatedReadSessionImpl::Start() {
+ AsyncInit.Subscribe([selfCtx = SelfContext](const auto& f){
Y_UNUSED(f);
- with_lock(self->Lock) {
- self->FederationState = self->Observer->GetState();
- self->OnFederatedStateUpdateImpl();
+ if (auto self = selfCtx->LockShared()) {
+ with_lock(self->Lock) {
+ self->FederationState = self->Observer->GetState();
+ self->OnFederatedStateUpdateImpl();
+ }
}
});
}
-void TFederatedReadSession::OpenSubSessionsImpl() {
+void TFederatedReadSessionImpl::OpenSubSessionsImpl() {
for (const auto& db : FederationState->DbInfos) {
// TODO check if available
NTopic::TTopicClientSettings settings = SubClientSetttings;
@@ -92,7 +94,7 @@ void TFederatedReadSession::OpenSubSessionsImpl() {
SubsessionIndex = 0;
}
-void TFederatedReadSession::OnFederatedStateUpdateImpl() {
+void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() {
if (!FederationState->Status.IsSuccess()) {
CloseImpl();
return;
@@ -104,24 +106,27 @@ void TFederatedReadSession::OnFederatedStateUpdateImpl() {
// 3) TODO LATER reschedule OnFederatedStateUpdate
}
-NThreading::TFuture<void> TFederatedReadSession::WaitEvent() {
+NThreading::TFuture<void> TFederatedReadSessionImpl::WaitEvent() {
// TODO override with read session settings timeout
- return AsyncInit.Apply([self = shared_from_this()](const NThreading::TFuture<void>) {
- if (self->Closing) {
- return NThreading::MakeFuture();
- }
- std::vector<NThreading::TFuture<void>> waiters;
- with_lock(self->Lock) {
- Y_ABORT_UNLESS(!self->SubSessions.empty(), "SubSessions empty in discovered state");
- for (const auto& sub : self->SubSessions) {
- waiters.emplace_back(sub.Session->WaitEvent());
+ return AsyncInit.Apply([selfCtx = SelfContext](const NThreading::TFuture<void>) {
+ if (auto self = selfCtx->LockShared()) {
+ if (self->Closing) {
+ return NThreading::MakeFuture();
+ }
+ std::vector<NThreading::TFuture<void>> waiters;
+ with_lock(self->Lock) {
+ Y_ABORT_UNLESS(!self->SubSessions.empty(), "SubSessions empty in discovered state");
+ for (const auto& sub : self->SubSessions) {
+ waiters.emplace_back(sub.Session->WaitEvent());
+ }
}
+ return NThreading::WaitAny(std::move(waiters));
}
- return NThreading::WaitAny(std::move(waiters));
+ return NThreading::MakeFuture();
});
}
-TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) {
+TVector<TReadSessionEvent::TEvent> TFederatedReadSessionImpl::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) {
if (block) {
WaitEvent().Wait();
}
@@ -152,16 +157,11 @@ TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block,
return result;
}
-TMaybe<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvent(bool block, size_t maxByteSize) {
- auto events = GetEvents(block, 1, maxByteSize);
- return events.empty() ? Nothing() : TMaybe<TReadSessionEvent::TEvent>{std::move(events.front())};
-}
-
-void TFederatedReadSession::CloseImpl() {
+void TFederatedReadSessionImpl::CloseImpl() {
Closing = true;
}
-bool TFederatedReadSession::Close(TDuration timeout) {
+bool TFederatedReadSessionImpl::Close(TDuration timeout) {
bool result = true;
for (const auto& sub : SubSessions) {
// TODO substract from user timeout
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
index a9ffe4c703c..4bbef50c5c1 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h
@@ -2,15 +2,16 @@
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h>
namespace NYdb::NFederatedTopic {
-class TFederatedReadSession : public IFederatedReadSession,
- public std::enable_shared_from_this<TFederatedReadSession> {
+class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext<TFederatedReadSessionImpl> {
friend class TFederatedTopicClient::TImpl;
+ friend class TFederatedReadSession;
private:
struct TSubSession {
@@ -24,24 +25,23 @@ private:
};
public:
- TFederatedReadSession(const TFederatedReadSessionSettings& settings,
- std::shared_ptr<TGRpcConnectionsImpl> connections,
- const TFederatedTopicClientSettings& clientSetttings,
- std::shared_ptr<TFederatedDbObserver> observer);
+ TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSetttings,
+ std::shared_ptr<TFederatedDbObserver> observer);
- ~TFederatedReadSession() = default;
+ ~TFederatedReadSessionImpl() = default;
- NThreading::TFuture<void> WaitEvent() override;
- TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override;
- TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override;
+ NThreading::TFuture<void> WaitEvent();
+ TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize);
- bool Close(TDuration timeout) override;
+ bool Close(TDuration timeout);
- inline TString GetSessionId() const override {
+ inline TString GetSessionId() const {
return SessionId;
}
- inline NTopic::TReaderCounters::TPtr GetCounters() const override {
+ inline NTopic::TReaderCounters::TPtr GetCounters() const {
return Settings.Counters_; // Always not nullptr.
}
@@ -93,4 +93,52 @@ private:
bool Closing = false;
};
+
+class TFederatedReadSession : public IFederatedReadSession,
+ public NPersQueue::TContextOwner<TFederatedReadSessionImpl> {
+ friend class TFederatedTopicClient::TImpl;
+
+public:
+ TFederatedReadSession(const TFederatedReadSessionSettings& settings,
+ std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& clientSettings,
+ std::shared_ptr<TFederatedDbObserver> observer)
+ : TContextOwner(settings, std::move(connections), clientSettings, std::move(observer)) {
+ }
+
+ ~TFederatedReadSession() {
+ TryGetImpl()->Close(TDuration::Zero());
+ }
+
+ NThreading::TFuture<void> WaitEvent() override {
+ return TryGetImpl()->WaitEvent();
+ }
+
+ TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override {
+ return TryGetImpl()->GetEvents(block, maxEventsCount, maxByteSize);
+ }
+
+ TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override {
+ auto events = GetEvents(block, 1, maxByteSize);
+ return events.empty() ? Nothing() : TMaybe<TReadSessionEvent::TEvent>{std::move(events.front())};
+ }
+
+ bool Close(TDuration timeout) override {
+ return TryGetImpl()->Close(timeout);
+ }
+
+ inline TString GetSessionId() const override {
+ return TryGetImpl()->GetSessionId();
+ }
+
+ inline NTopic::TReaderCounters::TPtr GetCounters() const override {
+ return TryGetImpl()->GetCounters();
+ }
+
+private:
+ void Start() {
+ return TryGetImpl()->Start();
+ }
+};
+
} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
index 9e8e795b4e3..62a8fc5b67e 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp
@@ -6,7 +6,7 @@ namespace NYdb::NFederatedTopic {
constexpr TDuration REDISCOVERY_DELAY = TDuration::Seconds(30);
-TFederatedDbObserver::TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings)
+TFederatedDbObserverImpl::TFederatedDbObserverImpl(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings)
: TClientImplCommon(std::move(connections), settings)
, FederatedDbState(std::make_shared<TFederatedDbState>())
, PromiseToInitState(NThreading::NewPromise())
@@ -17,21 +17,21 @@ TFederatedDbObserver::TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl>
RpcSettings.UseAuth = true;
}
-TFederatedDbObserver::~TFederatedDbObserver() {
+TFederatedDbObserverImpl::~TFederatedDbObserverImpl() {
Stop();
}
-std::shared_ptr<TFederatedDbState> TFederatedDbObserver::GetState() {
+std::shared_ptr<TFederatedDbState> TFederatedDbObserverImpl::GetState() {
with_lock(Lock) {
return FederatedDbState;
}
}
-NThreading::TFuture<void> TFederatedDbObserver::WaitForFirstState() {
+NThreading::TFuture<void> TFederatedDbObserverImpl::WaitForFirstState() {
return PromiseToInitState.GetFuture();
}
-void TFederatedDbObserver::Start() {
+void TFederatedDbObserverImpl::Start() {
with_lock(Lock) {
if (Stopping) {
return;
@@ -40,7 +40,7 @@ void TFederatedDbObserver::Start() {
}
}
-void TFederatedDbObserver::Stop() {
+void TFederatedDbObserverImpl::Stop() {
NGrpc::IQueueClientContextPtr ctx;
with_lock(Lock) {
Stopping = true;
@@ -52,17 +52,17 @@ void TFederatedDbObserver::Stop() {
}
// If observer is stale it will never update state again because of client retry policy
-bool TFederatedDbObserver::IsStale() const {
+bool TFederatedDbObserverImpl::IsStale() const {
with_lock(Lock) {
return PromiseToInitState.HasValue() && !FederatedDbState->Status.IsSuccess();
}
}
-Ydb::FederationDiscovery::ListFederationDatabasesRequest TFederatedDbObserver::ComposeRequest() const {
+Ydb::FederationDiscovery::ListFederationDatabasesRequest TFederatedDbObserverImpl::ComposeRequest() const {
return {};
}
-void TFederatedDbObserver::RunFederationDiscoveryImpl() {
+void TFederatedDbObserverImpl::RunFederationDiscoveryImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());
FederationDiscoveryDelayContext = Connections_->CreateContext();
@@ -72,14 +72,15 @@ void TFederatedDbObserver::RunFederationDiscoveryImpl() {
return;
}
- auto extractor = [self = shared_from_this()]
+ auto extractor = [selfCtx = SelfContext]
(google::protobuf::Any* any, TPlainStatus status) mutable {
-
- Ydb::FederationDiscovery::ListFederationDatabasesResult result;
- if (any) {
- any->UnpackTo(&result);
+ if (auto self = selfCtx->LockShared()) {
+ Ydb::FederationDiscovery::ListFederationDatabasesResult result;
+ if (any) {
+ any->UnpackTo(&result);
+ }
+ self->OnFederationDiscovery(std::move(status), std::move(result));
}
- self->OnFederationDiscovery(std::move(status), std::move(result));
};
Connections_->RunDeferred<Ydb::FederationDiscovery::V1::FederationDiscoveryService,
@@ -94,15 +95,17 @@ void TFederatedDbObserver::RunFederationDiscoveryImpl() {
FederationDiscoveryDelayContext);
}
-void TFederatedDbObserver::ScheduleFederationDiscoveryImpl(TDuration delay) {
+void TFederatedDbObserverImpl::ScheduleFederationDiscoveryImpl(TDuration delay) {
Y_ABORT_UNLESS(Lock.IsLocked());
- auto cb = [self = shared_from_this()](bool ok) {
+ auto cb = [selfCtx = SelfContext](bool ok) {
if (ok) {
- with_lock(self->Lock) {
- if (self->Stopping) {
- return;
+ if (auto self = selfCtx->LockShared()) {
+ with_lock(self->Lock) {
+ if (self->Stopping) {
+ return;
+ }
+ self->RunFederationDiscoveryImpl();
}
- self->RunFederationDiscoveryImpl();
}
}
};
@@ -119,7 +122,7 @@ void TFederatedDbObserver::ScheduleFederationDiscoveryImpl(TDuration delay) {
}
-void TFederatedDbObserver::OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result) {
+void TFederatedDbObserverImpl::OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result) {
with_lock(Lock) {
if (Stopping) {
// TODO log something
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
index 55de50168c1..4a04b20b3ef 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h
@@ -7,6 +7,7 @@
#include <ydb/public/api/protos/ydb_federation_discovery.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h>
@@ -42,14 +43,15 @@ public:
};
-class TFederatedDbObserver : public TClientImplCommon<TFederatedDbObserver> {
+class TFederatedDbObserverImpl : public TClientImplCommon<TFederatedDbObserverImpl>,
+ public NPersQueue::TEnableSelfContext<TFederatedDbObserverImpl> {
public:
static constexpr TDuration REDISCOVER_DELAY = TDuration::Seconds(60);
public:
- TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings);
+ TFederatedDbObserverImpl(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings);
- ~TFederatedDbObserver();
+ ~TFederatedDbObserverImpl();
std::shared_ptr<TFederatedDbState> GetState();
@@ -79,4 +81,32 @@ private:
bool Stopping = false;
};
+class TFederatedDbObserver : public NPersQueue::TContextOwner<TFederatedDbObserverImpl> {
+public:
+ inline TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections,
+ const TFederatedTopicClientSettings& settings)
+ : TContextOwner(connections, settings) {
+ }
+
+ inline std::shared_ptr<TFederatedDbState> GetState() {
+ return TryGetImpl()->GetState();
+ }
+
+ inline NThreading::TFuture<void> WaitForFirstState() {
+ return TryGetImpl()->WaitForFirstState();
+ }
+
+ inline void Start() {
+ return TryGetImpl()->Start();
+ }
+
+ inline void Stop() {
+ return TryGetImpl()->Stop();
+ }
+
+ inline bool IsStale() const {
+ return TryGetImpl()->IsStale();
+ }
+};
+
} // namespace NYdb::NFederatedTopic
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
index 7252f7f8aed..7b41ae11c7b 100644
--- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp
@@ -135,6 +135,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
.MaxMemoryUsageBytes(1_MB)
.AppendTopics(setup->GetTestTopic());
+ Cerr << "Before ReadSession was created" << Endl;
ReadSession = topicClient.CreateFederatedReadSession(readSettings);
Cerr << "Session was created" << Endl;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h
index 5de727a5ab5..9e7ffe2e6d6 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h
@@ -10,8 +10,13 @@
namespace NYdb::NPersQueue {
+template<typename T>
+class TContextOwner;
+
template <typename TGuardedObject>
class TCallbackContext {
+ friend class TContextOwner<TGuardedObject>;
+
public:
using TMutexPtr = std::shared_ptr<std::shared_mutex>;
@@ -44,28 +49,99 @@ public:
};
public:
- explicit TCallbackContext(typename std::shared_ptr<TGuardedObject> ptr)
+ explicit TCallbackContext(std::shared_ptr<TGuardedObject> ptr)
: Mutex(std::make_shared<std::shared_mutex>())
, GuardedObjectPtr(std::move(ptr))
{}
- // may block
- ~TCallbackContext() {
- Cancel();
- }
-
TBorrowed LockShared() {
return TBorrowed(*this);
}
+// TODO change section below to private after removing pqv1 read session implementation
+// (relation of 1 owner : n impls)
+public:
void Cancel() {
+ std::shared_ptr<TGuardedObject> waste;
std::lock_guard lock(*Mutex);
- GuardedObjectPtr.reset();
+ std::swap(waste, GuardedObjectPtr);
+ }
+
+ std::shared_ptr<TGuardedObject> TryGet() const {
+ if (!GuardedObjectPtr) {
+ ythrow yexception() << "TryGet failed, empty GuardedObjectPtr";
+ }
+ return GuardedObjectPtr;
}
private:
TMutexPtr Mutex;
- typename std::shared_ptr<TGuardedObject> GuardedObjectPtr;
+ std::shared_ptr<TGuardedObject> GuardedObjectPtr;
+};
+
+template<typename T>
+class TEnableSelfContext {
+ template<typename U, typename... Args>
+ friend std::shared_ptr<TCallbackContext<U>> MakeWithCallbackContext(Args&&... args);
+
+public:
+ TEnableSelfContext() = default;
+ ~TEnableSelfContext() = default;
+
+ // non-moveable for simplicity, use only via shared pointers
+ TEnableSelfContext(const TEnableSelfContext&) = delete;
+ TEnableSelfContext(TEnableSelfContext&&) = delete;
+ TEnableSelfContext& operator=(const TEnableSelfContext&) = delete;
+ TEnableSelfContext& operator=(TEnableSelfContext&&) = delete;
+
+protected:
+ void SetSelfContext(std::shared_ptr<T> ptr) {
+ SelfContext = std::make_shared<TCallbackContext<T>>(std::move(ptr));
+ }
+
+protected:
+ std::shared_ptr<TCallbackContext<T>> SelfContext;
+};
+
+template<typename T, typename... Args>
+std::shared_ptr<TCallbackContext<T>> MakeWithCallbackContext(Args&&... args) {
+ static_assert(std::is_base_of_v<TEnableSelfContext<T>, T>, "Expected object derived from TEnableSelfContext");
+ auto pObject = std::make_shared<T>(std::forward<Args>(args)...);
+ pObject->SetSelfContext(pObject);
+ return pObject->SelfContext;
+}
+
+template<typename T>
+class TContextOwner {
+public:
+ template <typename... Args>
+ TContextOwner(Args&&... args)
+ : ImplContext(MakeWithCallbackContext<T>(std::forward<Args>(args)...)) {
+ }
+
+ // may block
+ ~TContextOwner() {
+ CancelImpl();
+ }
+
+ TContextOwner(const TContextOwner&) = delete;
+ TContextOwner(TContextOwner&&) = default;
+ TContextOwner& operator=(const TContextOwner&) = delete;
+ TContextOwner& operator=(TContextOwner&&) = default;
+
+protected:
+ std::shared_ptr<T> TryGetImpl() const {
+ return ImplContext->TryGet();
+ }
+
+ void CancelImpl() {
+ if (ImplContext) {
+ ImplContext->Cancel();
+ }
+ }
+
+protected:
+ std::shared_ptr<TCallbackContext<T>> ImplContext;
};
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 403941309f6..12d7e86b53c 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -206,20 +206,20 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) {
AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred);
return;
}
- clusterSessionInfo.Session =
- std::make_shared<TSingleClusterReadSessionImpl<true>>(
- sessionSettings,
- DbDriverState->Database,
- SessionId,
- clusterName,
- Log,
- subclient->CreateReadSessionConnectionProcessorFactory(),
- EventsQueue,
- context,
- partitionStreamIdStart++,
- clusterSessionsCount);
-
- CbContexts.push_back(clusterSessionInfo.Session->MakeCallbackContext());
+ CbContexts.push_back(MakeWithCallbackContext<TSingleClusterReadSessionImpl<true>>(
+ sessionSettings,
+ DbDriverState->Database,
+ SessionId,
+ clusterName,
+ Log,
+ subclient->CreateReadSessionConnectionProcessorFactory(),
+ EventsQueue,
+ context,
+ partitionStreamIdStart++,
+ clusterSessionsCount
+ ));
+
+ clusterSessionInfo.Session = CbContexts.back()->TryGet();
deferred.DeferStartSession(CbContexts.back());
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 3b89d7bf1c5..a778d7f2737 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -921,7 +921,7 @@ namespace NYdb::NPersQueue {
// It is parametrized with output queue for client events
// and connection factory interface to separate logic from transport.
template <bool UseMigrationProtocol>
-class TSingleClusterReadSessionImpl : public std::enable_shared_from_this<TSingleClusterReadSessionImpl<UseMigrationProtocol>>,
+class TSingleClusterReadSessionImpl : public NPersQueue::TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>,
public IUserRetrievedEventCallback<UseMigrationProtocol> {
public:
using TSelf = TSingleClusterReadSessionImpl<UseMigrationProtocol>;
@@ -1009,8 +1009,6 @@ public:
return Log;
}
- TCallbackContextPtr<UseMigrationProtocol> MakeCallbackContext();
-
private:
void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);
@@ -1192,8 +1190,6 @@ private:
std::atomic<int> DecompressionTasksInflight = 0;
i64 ReadSizeBudget;
i64 ReadSizeServerDelta = 0;
-
- TCallbackContextPtr<UseMigrationProtocol> CbContext;
};
// High level class that manages several read session impls.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
index 7aa03c77015..d0828562424 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp
@@ -230,15 +230,9 @@ TStringBuilder TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetLogPrefix
return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] ";
}
-template <bool UseMigrationProtocol>
-TCallbackContextPtr<UseMigrationProtocol> TSingleClusterReadSessionImpl<UseMigrationProtocol>::MakeCallbackContext() {
- CbContext = std::make_shared<TCallbackContext<TSelf>>(this->shared_from_this());
- return CbContext;
-}
-
template<bool UseMigrationProtocol>
void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() {
- Y_ABORT_UNLESS(CbContext);
+ Y_ABORT_UNLESS(this->SelfContext);
Settings.DecompressionExecutor_->Start();
Settings.EventHandlers_.HandlersExecutor_->Start();
if (!Reconnect(TPlainStatus())) {
@@ -324,16 +318,16 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain
// Destroy all partition streams before connecting.
DestroyAllPartitionStreamsImpl(deferred);
- Y_ABORT_UNLESS(CbContext);
+ Y_ABORT_UNLESS(this->SelfContext);
- connectCallback = [cbContext = CbContext,
+ connectCallback = [cbContext = this->SelfContext,
connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
if (auto borrowedSelf = cbContext->LockShared()) {
borrowedSelf->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace!
}
};
- connectTimeoutCallback = [cbContext = CbContext,
+ connectTimeoutCallback = [cbContext = this->SelfContext,
connectTimeoutContext = connectTimeoutContext](bool ok) {
if (ok) {
if (auto borrowedSelf = cbContext->LockShared()) {
@@ -375,7 +369,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco
Processor = nullptr;
RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again.
- deferred.DeferReconnection(CbContext, std::move(status));
+ deferred.DeferReconnection(this->SelfContext, std::move(status));
}
template<bool UseMigrationProtocol>
@@ -824,9 +818,9 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl(
if (Processor && !Closing) {
ServerMessage->Clear();
- Y_ABORT_UNLESS(CbContext);
+ Y_ABORT_UNLESS(this->SelfContext);
- auto callback = [cbContext = CbContext,
+ auto callback = [cbContext = this->SelfContext,
connectionGeneration = ConnectionGeneration,
// Capture message & processor not to read in freed memory.
serverMessage = ServerMessage,
@@ -1018,7 +1012,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
}
auto decompressionInfo = std::make_shared<TDataDecompressionInfo<true>>(std::move(partitionData),
- CbContext,
+ SelfContext,
Settings.Decompress_);
Y_ABORT_UNLESS(decompressionInfo);
@@ -1044,7 +1038,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl(
NextPartitionStreamId, msg.topic().path(), msg.cluster(),
msg.partition() + 1, // Group.
msg.partition(), // Partition.
- msg.assign_id(), msg.read_offset(), CbContext);
+ msg.assign_id(), msg.read_offset(), SelfContext);
NextPartitionStreamId += PartitionStreamIdStep;
// Renew partition stream.
@@ -1262,7 +1256,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
partitionStream->SetFirstNotReadOffset(desiredOffset);
auto decompressionInfo = std::make_shared<TDataDecompressionInfo<false>>(std::move(partitionData),
- CbContext,
+ SelfContext,
Settings.Decompress_,
serverBytesSize);
// TODO (ildar-khisam@): share serverBytesSize between partitions data according to their actual sizes;
@@ -1290,7 +1284,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl(
auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>(
NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(),
msg.partition_session().partition_session_id(), msg.committed_offset(),
- CbContext);
+ SelfContext);
NextPartitionStreamId += PartitionStreamIdStep;
// Renew partition stream.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 9b1248ef375..0a4ee65ae37 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -17,49 +17,45 @@ TWriteSession::TWriteSession(
std::shared_ptr<TPersQueueClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState)
- : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState)))
-{
- CbContext = std::make_shared<TCallbackContext<TWriteSessionImpl>>(Impl);
- Impl->SetCallbackContext(CbContext);
+ : TContextOwner(settings, std::move(client), std::move(connections), std::move(dbDriverState)) {
}
void TWriteSession::Start(const TDuration& delay) {
- Impl->Start(delay);
+ TryGetImpl()->Start(delay);
}
NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() {
- return Impl->GetInitSeqNo();
+ return TryGetImpl()->GetInitSeqNo();
}
TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) {
- return Impl->EventsQueue->GetEvent(block);
+ return TryGetImpl()->EventsQueue->GetEvent(block);
}
TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) {
- return Impl->EventsQueue->GetEvents(block, maxEventsCount);
+ return TryGetImpl()->EventsQueue->GetEvents(block, maxEventsCount);
}
NThreading::TFuture<void> TWriteSession::WaitEvent() {
- return Impl->EventsQueue->WaitEvent();
+ return TryGetImpl()->EventsQueue->WaitEvent();
}
void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize,
TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) {
- Impl->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp);
+ TryGetImpl()->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp);
}
void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
TMaybe<TInstant> createTimestamp) {
- Impl->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp);
+ TryGetImpl()->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp);
}
bool TWriteSession::Close(TDuration closeTimeout) {
- return Impl->Close(closeTimeout);
+ return TryGetImpl()->Close(closeTimeout);
}
TWriteSession::~TWriteSession() {
- Impl->Close(TDuration::Zero());
- CbContext->Cancel();
+ TryGetImpl()->Close(TDuration::Zero());
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
index 7db726d73dc..8057d4560e6 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
@@ -21,7 +21,7 @@ namespace NTests {
// TWriteSession
class TWriteSession : public IWriteSession,
- public std::enable_shared_from_this<TWriteSession> {
+ public TContextOwner<TWriteSessionImpl> {
private:
friend class TSimpleBlockingWriteSession;
friend class TPersQueueClient;
@@ -56,10 +56,6 @@ public:
private:
void Start(const TDuration& delay);
-
-private:
- std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext;
- std::shared_ptr<TWriteSessionImpl> Impl;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 0aeae6d05d3..9ea829fccd4 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -75,12 +75,8 @@ TWriteSessionImpl::TWriteSessionImpl(
}
-void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx) {
- CbContext = std::move(ctx);
-}
-
void TWriteSessionImpl::Start(const TDuration& delay) {
- Y_ABORT_UNLESS(CbContext);
+ Y_ABORT_UNLESS(SelfContext);
++ConnectionAttemptsDone;
if (!Started) {
with_lock(Lock) {
@@ -146,7 +142,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) {
(Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint)));
if (!cdsRequestIsUnnecessary) {
- auto extractor = [cbContext = CbContext]
+ auto extractor = [cbContext = SelfContext]
(google::protobuf::Any* any, TPlainStatus status) mutable {
Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult result;
if (any) {
@@ -466,14 +462,14 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
Y_ASSERT(connectTimeoutContext);
reqSettings = TRpcRequestSettings::Make(Settings);
- connectCallback = [cbContext = CbContext,
+ connectCallback = [cbContext = SelfContext,
connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
if (auto self = cbContext->LockShared()) {
self->OnConnect(std::move(st), std::move(processor), connectContext);
}
};
- connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) {
+ connectTimeoutCallback = [cbContext = SelfContext, connectTimeoutContext = connectTimeoutContext](bool ok) {
if (ok) {
if (auto self = cbContext->LockShared()) {
self->OnConnectTimeout(connectTimeoutContext);
@@ -588,7 +584,7 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&&
if (Aborting) {
return;
}
- auto callback = [cbContext = CbContext,
+ auto callback = [cbContext = SelfContext,
connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) {
if (auto self = cbContext->LockShared()) {
self->OnWriteDone(std::move(grpcStatus), connectionGeneration);
@@ -609,7 +605,7 @@ void TWriteSessionImpl::ReadFromProcessor() {
}
prc = Processor;
generation = ConnectionGeneration;
- callback = [cbContext = CbContext,
+ callback = [cbContext = SelfContext,
connectionGeneration = generation,
processor = prc,
serverMessage = ServerMessage]
@@ -897,7 +893,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>());
blockPtr->Move(block_);
- auto lambda = [cbContext = CbContext,
+ auto lambda = [cbContext = SelfContext,
codec = Settings.Codec_,
level = Settings.CompressionLevel_,
isSyncCompression = !CompressionExecutor->IsAsync(),
@@ -1274,7 +1270,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
if (AtomicGet(Aborting)) {
return;
}
- auto callback = [cbContext = CbContext] (bool ok)
+ auto callback = [cbContext = SelfContext] (bool ok)
{
if (!ok) {
return;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
index aba702f049c..44db21fc72d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
@@ -158,8 +158,7 @@ namespace NTests {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TWriteSessionImpl
-class TWriteSessionImpl : public IWriteSession,
- public std::enable_shared_from_this<TWriteSessionImpl> {
+class TWriteSessionImpl : public TEnableSelfContext<TWriteSessionImpl> {
private:
friend class TWriteSession;
friend class TSimpleBlockingWriteSession;
@@ -302,29 +301,27 @@ public:
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState);
- TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override;
+ TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false);
TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false,
- TMaybe<size_t> maxEventsCount = Nothing()) override;
- NThreading::TFuture<ui64> GetInitSeqNo() override;
+ TMaybe<size_t> maxEventsCount = Nothing());
+ NThreading::TFuture<ui64> GetInitSeqNo();
void Write(TContinuationToken&& continuationToken, TStringBuf data,
- TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing());
void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
- TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override;
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing());
- NThreading::TFuture<void> WaitEvent() override;
+ NThreading::TFuture<void> WaitEvent();
// Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration.
- bool Close(TDuration closeTimeout = TDuration::Max()) override;
+ bool Close(TDuration closeTimeout = TDuration::Max());
- TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
+ TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
~TWriteSessionImpl(); // will not call close - destroy everything without acks
- void SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx);
-
private:
TStringBuilder LogPrefix() const;
@@ -393,7 +390,6 @@ private:
TStringType PrevToken;
bool UpdateTokenInProgress = false;
TInstant LastTokenUpdate = TInstant::Zero();
- std::shared_ptr<TCallbackContext<TWriteSessionImpl>> CbContext;
std::shared_ptr<TWriteSessionEventsQueue> EventsQueue;
NGrpc::IQueueClientContextPtr ClientContext; // Common client context.
NGrpc::IQueueClientContextPtr ConnectContext;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
index 989ff5b72ab..480883d2620 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp
@@ -622,7 +622,7 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() {
if (!Settings.EventHandlers_.HandlersExecutor_) {
Settings.EventHandlers_.HandlersExecutor(GetDefaultExecutor());
}
- Session = std::make_shared<TSingleClusterReadSessionImpl<true>>(
+ CbContext = MakeWithCallbackContext<TSingleClusterReadSessionImpl<true>>(
Settings,
"db",
"sessionid",
@@ -632,7 +632,7 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() {
GetEventsQueue(),
FakeContext,
PartitionIdStart, PartitionIdStep);
- CbContext = Session->MakeCallbackContext();
+ Session = CbContext->TryGet();
}
return Session.get();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp
index 3b99df5ec4a..21b2be71948 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp
@@ -25,7 +25,7 @@ TSimpleWriteSessionTestAdapter::TSimpleWriteSessionTestAdapter(TSimpleBlockingWr
ui64 TSimpleWriteSessionTestAdapter::GetAcquiredMessagesCount() const {
if (Session->Writer)
- return Session->Writer->Impl->MessagesAcquired;
+ return Session->Writer->TryGetImpl()->MessagesAcquired;
else
return 0;
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
index 4b930f8fbb8..949abf9ab11 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp
@@ -78,7 +78,8 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>
AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred);
return;
}
- Session = std::make_shared<NPersQueue::TSingleClusterReadSessionImpl<false>>(
+
+ CbContext = NPersQueue::MakeWithCallbackContext<NPersQueue::TSingleClusterReadSessionImpl<false>>(
Settings,
DbDriverState->Database,
SessionId,
@@ -87,9 +88,9 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false>
Client->CreateReadSessionConnectionProcessorFactory(),
EventsQueue,
context,
- 1, 1);
+ 1, 1
+ );
- CbContext = Session->MakeCallbackContext();
deferred.DeferStartSession(CbContext);
}
@@ -271,7 +272,7 @@ bool TReadSession::Close(TDuration timeout) {
}
Closing = true;
- session = Session;
+ session = CbContext->TryGet();
}
session->Close(callback);
@@ -352,7 +353,7 @@ void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) {
if (DumpCountersContext) {
DumpCountersContext->Cancel();
}
- Session->Abort();
+ CbContext->TryGet()->Abort();
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
index aa626de4b15..127fcc436f7 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h
@@ -8,8 +8,7 @@
namespace NYdb::NTopic {
-class TReadSession : public IReadSession,
- public std::enable_shared_from_this<TReadSession> {
+class TReadSession : public IReadSession {
public:
TReadSession(const TReadSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
@@ -87,7 +86,6 @@ private:
TAdaptiveLock Lock;
std::shared_ptr<NPersQueue::TReadSessionEventsQueue<false>> EventsQueue;
- NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr Session;
std::shared_ptr<NPersQueue::TCallbackContext<NPersQueue::TSingleClusterReadSessionImpl<false>>> CbContext;
TVector<TTopicReadSettings> Topics;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index 6a8a618831c..f6884c017bb 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -1,13 +1,4 @@
#include "write_session.h"
-#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
-#include <library/cpp/string_utils/url/url.h>
-
-#include <google/protobuf/util/time_util.h>
-
-#include <util/generic/store_policy.h>
-#include <util/generic/utility.h>
-#include <util/stream/buffer.h>
-
namespace NYdb::NTopic {
@@ -19,30 +10,27 @@ TWriteSession::TWriteSession(
std::shared_ptr<TTopicClient::TImpl> client,
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState)
- : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState)))
-{
- CbContext = std::make_shared<NPersQueue::TCallbackContext<TWriteSessionImpl>>(Impl);
- Impl->SetCallbackContext(CbContext);
+ : TContextOwner(settings, std::move(client), std::move(connections), std::move(dbDriverState)) {
}
void TWriteSession::Start(const TDuration& delay) {
- Impl->Start(delay);
+ TryGetImpl()->Start(delay);
}
NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() {
- return Impl->GetInitSeqNo();
+ return TryGetImpl()->GetInitSeqNo();
}
TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) {
- return Impl->EventsQueue->GetEvent(block);
+ return TryGetImpl()->EventsQueue->GetEvent(block);
}
TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) {
- return Impl->EventsQueue->GetEvents(block, maxEventsCount);
+ return TryGetImpl()->EventsQueue->GetEvents(block, maxEventsCount);
}
NThreading::TFuture<void> TWriteSession::WaitEvent() {
- return Impl->EventsQueue->WaitEvent();
+ return TryGetImpl()->EventsQueue->WaitEvent();
}
void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize,
@@ -52,12 +40,12 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, EC
message.SeqNo(*seqNo);
if (createTimestamp.Defined())
message.CreateTimestamp(*createTimestamp);
- Impl->WriteInternal(std::move(token), std::move(message));
+ TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message)
{
- Impl->WriteInternal(std::move(token), std::move(message));
+ TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
@@ -67,20 +55,19 @@ void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui
message.SeqNo(*seqNo);
if (createTimestamp.Defined())
message.CreateTimestamp(*createTimestamp);
- Impl->WriteInternal(std::move(token), std::move(message));
+ TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) {
- Impl->WriteInternal(std::move(token), std::move(message));
+ TryGetImpl()->WriteInternal(std::move(token), std::move(message));
}
bool TWriteSession::Close(TDuration closeTimeout) {
- return Impl->Close(closeTimeout);
+ return TryGetImpl()->Close(closeTimeout);
}
TWriteSession::~TWriteSession() {
- Impl->Close(TDuration::Zero());
- CbContext->Cancel();
+ TryGetImpl()->Close(TDuration::Zero());
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
index 13fec07c33c..e8c618ba2b2 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -16,7 +16,7 @@ namespace NYdb::NTopic {
// TWriteSession
class TWriteSession : public IWriteSession,
- public std::enable_shared_from_this<TWriteSession> {
+ public NPersQueue::TContextOwner<TWriteSessionImpl> {
private:
friend class TSimpleBlockingWriteSession;
friend class TTopicClient;
@@ -53,10 +53,6 @@ public:
private:
void Start(const TDuration& delay);
-
-private:
- std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext;
- std::shared_ptr<TWriteSessionImpl> Impl;
};
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index c02e463c668..8949546cf14 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -75,12 +75,8 @@ TWriteSessionImpl::TWriteSessionImpl(
}
-void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx) {
- CbContext = std::move(ctx);
-}
-
void TWriteSessionImpl::Start(const TDuration& delay) {
- Y_ABORT_UNLESS(CbContext);
+ Y_ABORT_UNLESS(SelfContext);
++ConnectionAttemptsDone;
if (!Started) {
with_lock(Lock) {
@@ -162,7 +158,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del
request.set_partition_id(*Settings.PartitionId_);
request.set_include_location(true);
- auto extractor = [cbContext = CbContext, context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable {
+ auto extractor = [cbContext = SelfContext, context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable {
Ydb::Topic::DescribePartitionResult result;
if (response)
response->operation().result().UnpackTo(&result);
@@ -486,14 +482,14 @@ void TWriteSessionImpl::Connect(const TDuration& delay) {
reqSettings = TRpcRequestSettings::Make(Settings, PreferredPartitionLocation.Endpoint);
- connectCallback = [cbContext = CbContext,
+ connectCallback = [cbContext = SelfContext,
connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
if (auto self = cbContext->LockShared()) {
self->OnConnect(std::move(st), std::move(processor), connectContext);
}
};
- connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) {
+ connectTimeoutCallback = [cbContext = SelfContext, connectTimeoutContext = connectTimeoutContext](bool ok) {
if (ok) {
if (auto self = cbContext->LockShared()) {
self->OnConnectTimeout(connectTimeoutContext);
@@ -621,7 +617,7 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&&
if (Aborting) {
return;
}
- auto callback = [cbContext = CbContext,
+ auto callback = [cbContext = SelfContext,
connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) {
if (auto self = cbContext->LockShared()) {
self->OnWriteDone(std::move(grpcStatus), connectionGeneration);
@@ -642,7 +638,7 @@ void TWriteSessionImpl::ReadFromProcessor() {
}
prc = Processor;
generation = ConnectionGeneration;
- callback = [cbContext = CbContext,
+ callback = [cbContext = SelfContext,
connectionGeneration = generation,
processor = prc,
serverMessage = ServerMessage]
@@ -958,7 +954,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>());
blockPtr->Move(block_);
- auto lambda = [cbContext = CbContext,
+ auto lambda = [cbContext = SelfContext,
codec = Settings.Codec_,
level = Settings.CompressionLevel_,
isSyncCompression = !CompressionExecutor->IsAsync(),
@@ -1277,7 +1273,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
if (AtomicGet(Aborting)) {
return;
}
- auto callback = [cbContext = CbContext] (bool ok)
+ auto callback = [cbContext = SelfContext] (bool ok)
{
if (!ok) {
return;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index cc8da2d38d1..9c5451ece91 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -149,8 +149,7 @@ struct TMemoryUsageChange {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TWriteSessionImpl
-class TWriteSessionImpl : public IWriteSession,
- public std::enable_shared_from_this<TWriteSessionImpl> {
+class TWriteSessionImpl : public NPersQueue::TEnableSelfContext<TWriteSessionImpl> {
private:
friend class TWriteSession;
friend class TSimpleBlockingWriteSession;
@@ -322,41 +321,39 @@ public:
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState);
- TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override;
+ TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false);
TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false,
- TMaybe<size_t> maxEventsCount = Nothing()) override;
- NThreading::TFuture<ui64> GetInitSeqNo() override;
+ TMaybe<size_t> maxEventsCount = Nothing());
+ NThreading::TFuture<ui64> GetInitSeqNo();
- void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+ void Write(TContinuationToken&& continuationToken, TWriteMessage&& message);
void Write(TContinuationToken&&, TStringBuf, TMaybe<ui64> seqNo = Nothing(),
- TMaybe<TInstant> createTimestamp = Nothing()) override {
+ TMaybe<TInstant> createTimestamp = Nothing()) {
Y_UNUSED(seqNo);
Y_UNUSED(createTimestamp);
Y_ABORT("Do not use this method");
};
- void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+ void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message);
void WriteEncoded(TContinuationToken&&, TStringBuf, ECodec, ui32,
- TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override {
+ TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) {
Y_UNUSED(seqNo);
Y_UNUSED(createTimestamp);
Y_ABORT("Do not use this method");
}
- NThreading::TFuture<void> WaitEvent() override;
+ NThreading::TFuture<void> WaitEvent();
// Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration.
- bool Close(TDuration closeTimeout = TDuration::Max()) override;
+ bool Close(TDuration closeTimeout = TDuration::Max());
- TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
+ TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
~TWriteSessionImpl(); // will not call close - destroy everything without acks
- void SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx);
-
private:
TStringBuilder LogPrefix() const;
@@ -426,7 +423,6 @@ private:
TStringType PrevToken;
bool UpdateTokenInProgress = false;
TInstant LastTokenUpdate = TInstant::Zero();
- std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext;
std::shared_ptr<TWriteSessionEventsQueue> EventsQueue;
NGrpc::IQueueClientContextPtr ClientContext; // Common client context.
NGrpc::IQueueClientContextPtr ConnectContext;