diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-11-07 23:23:58 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-11-07 23:39:31 +0300 |
commit | d8b4abf5dc60664a332b8a7636bea320f17dc204 (patch) | |
tree | 5dc5c9aef0565c185715c46ced1524fd68ac59c8 | |
parent | 59e3f9da6fd5a8e9232c51e78a3bc8e87d71a6be (diff) | |
download | ydb-d8b4abf5dc60664a332b8a7636bea320f17dc204.tar.gz |
split fed read session to client and impl parts
split fed read session to client and impl parts
21 files changed, 330 insertions, 224 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp index 65a5eb8c8f0..07645caf6e8 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.cpp @@ -54,10 +54,10 @@ NTopic::TReadSessionSettings FromFederated(const TFederatedReadSessionSettings& return SubsessionSettings; } -TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings& settings, - std::shared_ptr<TGRpcConnectionsImpl> connections, - const TFederatedTopicClientSettings& clientSetttings, - std::shared_ptr<TFederatedDbObserver> observer) +TFederatedReadSessionImpl::TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSetttings, + std::shared_ptr<TFederatedDbObserver> observer) : Settings(settings) , Connections(std::move(connections)) , SubClientSetttings(FromFederated(clientSetttings)) @@ -68,17 +68,19 @@ TFederatedReadSession::TFederatedReadSession(const TFederatedReadSessionSettings { } -void TFederatedReadSession::Start() { - AsyncInit.Subscribe([self = shared_from_this()](const auto& f){ +void TFederatedReadSessionImpl::Start() { + AsyncInit.Subscribe([selfCtx = SelfContext](const auto& f){ Y_UNUSED(f); - with_lock(self->Lock) { - self->FederationState = self->Observer->GetState(); - self->OnFederatedStateUpdateImpl(); + if (auto self = selfCtx->LockShared()) { + with_lock(self->Lock) { + self->FederationState = self->Observer->GetState(); + self->OnFederatedStateUpdateImpl(); + } } }); } -void TFederatedReadSession::OpenSubSessionsImpl() { +void TFederatedReadSessionImpl::OpenSubSessionsImpl() { for (const auto& db : FederationState->DbInfos) { // TODO check if available NTopic::TTopicClientSettings settings = SubClientSetttings; @@ -92,7 +94,7 @@ void TFederatedReadSession::OpenSubSessionsImpl() { SubsessionIndex = 0; } -void TFederatedReadSession::OnFederatedStateUpdateImpl() { +void TFederatedReadSessionImpl::OnFederatedStateUpdateImpl() { if (!FederationState->Status.IsSuccess()) { CloseImpl(); return; @@ -104,24 +106,27 @@ void TFederatedReadSession::OnFederatedStateUpdateImpl() { // 3) TODO LATER reschedule OnFederatedStateUpdate } -NThreading::TFuture<void> TFederatedReadSession::WaitEvent() { +NThreading::TFuture<void> TFederatedReadSessionImpl::WaitEvent() { // TODO override with read session settings timeout - return AsyncInit.Apply([self = shared_from_this()](const NThreading::TFuture<void>) { - if (self->Closing) { - return NThreading::MakeFuture(); - } - std::vector<NThreading::TFuture<void>> waiters; - with_lock(self->Lock) { - Y_ABORT_UNLESS(!self->SubSessions.empty(), "SubSessions empty in discovered state"); - for (const auto& sub : self->SubSessions) { - waiters.emplace_back(sub.Session->WaitEvent()); + return AsyncInit.Apply([selfCtx = SelfContext](const NThreading::TFuture<void>) { + if (auto self = selfCtx->LockShared()) { + if (self->Closing) { + return NThreading::MakeFuture(); + } + std::vector<NThreading::TFuture<void>> waiters; + with_lock(self->Lock) { + Y_ABORT_UNLESS(!self->SubSessions.empty(), "SubSessions empty in discovered state"); + for (const auto& sub : self->SubSessions) { + waiters.emplace_back(sub.Session->WaitEvent()); + } } + return NThreading::WaitAny(std::move(waiters)); } - return NThreading::WaitAny(std::move(waiters)); + return NThreading::MakeFuture(); }); } -TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) { +TVector<TReadSessionEvent::TEvent> TFederatedReadSessionImpl::GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) { if (block) { WaitEvent().Wait(); } @@ -152,16 +157,11 @@ TVector<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvents(bool block, return result; } -TMaybe<TReadSessionEvent::TEvent> TFederatedReadSession::GetEvent(bool block, size_t maxByteSize) { - auto events = GetEvents(block, 1, maxByteSize); - return events.empty() ? Nothing() : TMaybe<TReadSessionEvent::TEvent>{std::move(events.front())}; -} - -void TFederatedReadSession::CloseImpl() { +void TFederatedReadSessionImpl::CloseImpl() { Closing = true; } -bool TFederatedReadSession::Close(TDuration timeout) { +bool TFederatedReadSessionImpl::Close(TDuration timeout) { bool result = true; for (const auto& sub : SubSessions) { // TODO substract from user timeout diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h index a9ffe4c703c..4bbef50c5c1 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_read_session.h @@ -2,15 +2,16 @@ #include <ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federated_topic_impl.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h> #include <ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h> namespace NYdb::NFederatedTopic { -class TFederatedReadSession : public IFederatedReadSession, - public std::enable_shared_from_this<TFederatedReadSession> { +class TFederatedReadSessionImpl : public NPersQueue::TEnableSelfContext<TFederatedReadSessionImpl> { friend class TFederatedTopicClient::TImpl; + friend class TFederatedReadSession; private: struct TSubSession { @@ -24,24 +25,23 @@ private: }; public: - TFederatedReadSession(const TFederatedReadSessionSettings& settings, - std::shared_ptr<TGRpcConnectionsImpl> connections, - const TFederatedTopicClientSettings& clientSetttings, - std::shared_ptr<TFederatedDbObserver> observer); + TFederatedReadSessionImpl(const TFederatedReadSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSetttings, + std::shared_ptr<TFederatedDbObserver> observer); - ~TFederatedReadSession() = default; + ~TFederatedReadSessionImpl() = default; - NThreading::TFuture<void> WaitEvent() override; - TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override; - TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override; + NThreading::TFuture<void> WaitEvent(); + TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize); - bool Close(TDuration timeout) override; + bool Close(TDuration timeout); - inline TString GetSessionId() const override { + inline TString GetSessionId() const { return SessionId; } - inline NTopic::TReaderCounters::TPtr GetCounters() const override { + inline NTopic::TReaderCounters::TPtr GetCounters() const { return Settings.Counters_; // Always not nullptr. } @@ -93,4 +93,52 @@ private: bool Closing = false; }; + +class TFederatedReadSession : public IFederatedReadSession, + public NPersQueue::TContextOwner<TFederatedReadSessionImpl> { + friend class TFederatedTopicClient::TImpl; + +public: + TFederatedReadSession(const TFederatedReadSessionSettings& settings, + std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& clientSettings, + std::shared_ptr<TFederatedDbObserver> observer) + : TContextOwner(settings, std::move(connections), clientSettings, std::move(observer)) { + } + + ~TFederatedReadSession() { + TryGetImpl()->Close(TDuration::Zero()); + } + + NThreading::TFuture<void> WaitEvent() override { + return TryGetImpl()->WaitEvent(); + } + + TVector<TReadSessionEvent::TEvent> GetEvents(bool block, TMaybe<size_t> maxEventsCount, size_t maxByteSize) override { + return TryGetImpl()->GetEvents(block, maxEventsCount, maxByteSize); + } + + TMaybe<TReadSessionEvent::TEvent> GetEvent(bool block, size_t maxByteSize) override { + auto events = GetEvents(block, 1, maxByteSize); + return events.empty() ? Nothing() : TMaybe<TReadSessionEvent::TEvent>{std::move(events.front())}; + } + + bool Close(TDuration timeout) override { + return TryGetImpl()->Close(timeout); + } + + inline TString GetSessionId() const override { + return TryGetImpl()->GetSessionId(); + } + + inline NTopic::TReaderCounters::TPtr GetCounters() const override { + return TryGetImpl()->GetCounters(); + } + +private: + void Start() { + return TryGetImpl()->Start(); + } +}; + } // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp index 9e8e795b4e3..62a8fc5b67e 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.cpp @@ -6,7 +6,7 @@ namespace NYdb::NFederatedTopic { constexpr TDuration REDISCOVERY_DELAY = TDuration::Seconds(30); -TFederatedDbObserver::TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings) +TFederatedDbObserverImpl::TFederatedDbObserverImpl(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings) : TClientImplCommon(std::move(connections), settings) , FederatedDbState(std::make_shared<TFederatedDbState>()) , PromiseToInitState(NThreading::NewPromise()) @@ -17,21 +17,21 @@ TFederatedDbObserver::TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> RpcSettings.UseAuth = true; } -TFederatedDbObserver::~TFederatedDbObserver() { +TFederatedDbObserverImpl::~TFederatedDbObserverImpl() { Stop(); } -std::shared_ptr<TFederatedDbState> TFederatedDbObserver::GetState() { +std::shared_ptr<TFederatedDbState> TFederatedDbObserverImpl::GetState() { with_lock(Lock) { return FederatedDbState; } } -NThreading::TFuture<void> TFederatedDbObserver::WaitForFirstState() { +NThreading::TFuture<void> TFederatedDbObserverImpl::WaitForFirstState() { return PromiseToInitState.GetFuture(); } -void TFederatedDbObserver::Start() { +void TFederatedDbObserverImpl::Start() { with_lock(Lock) { if (Stopping) { return; @@ -40,7 +40,7 @@ void TFederatedDbObserver::Start() { } } -void TFederatedDbObserver::Stop() { +void TFederatedDbObserverImpl::Stop() { NGrpc::IQueueClientContextPtr ctx; with_lock(Lock) { Stopping = true; @@ -52,17 +52,17 @@ void TFederatedDbObserver::Stop() { } // If observer is stale it will never update state again because of client retry policy -bool TFederatedDbObserver::IsStale() const { +bool TFederatedDbObserverImpl::IsStale() const { with_lock(Lock) { return PromiseToInitState.HasValue() && !FederatedDbState->Status.IsSuccess(); } } -Ydb::FederationDiscovery::ListFederationDatabasesRequest TFederatedDbObserver::ComposeRequest() const { +Ydb::FederationDiscovery::ListFederationDatabasesRequest TFederatedDbObserverImpl::ComposeRequest() const { return {}; } -void TFederatedDbObserver::RunFederationDiscoveryImpl() { +void TFederatedDbObserverImpl::RunFederationDiscoveryImpl() { Y_ABORT_UNLESS(Lock.IsLocked()); FederationDiscoveryDelayContext = Connections_->CreateContext(); @@ -72,14 +72,15 @@ void TFederatedDbObserver::RunFederationDiscoveryImpl() { return; } - auto extractor = [self = shared_from_this()] + auto extractor = [selfCtx = SelfContext] (google::protobuf::Any* any, TPlainStatus status) mutable { - - Ydb::FederationDiscovery::ListFederationDatabasesResult result; - if (any) { - any->UnpackTo(&result); + if (auto self = selfCtx->LockShared()) { + Ydb::FederationDiscovery::ListFederationDatabasesResult result; + if (any) { + any->UnpackTo(&result); + } + self->OnFederationDiscovery(std::move(status), std::move(result)); } - self->OnFederationDiscovery(std::move(status), std::move(result)); }; Connections_->RunDeferred<Ydb::FederationDiscovery::V1::FederationDiscoveryService, @@ -94,15 +95,17 @@ void TFederatedDbObserver::RunFederationDiscoveryImpl() { FederationDiscoveryDelayContext); } -void TFederatedDbObserver::ScheduleFederationDiscoveryImpl(TDuration delay) { +void TFederatedDbObserverImpl::ScheduleFederationDiscoveryImpl(TDuration delay) { Y_ABORT_UNLESS(Lock.IsLocked()); - auto cb = [self = shared_from_this()](bool ok) { + auto cb = [selfCtx = SelfContext](bool ok) { if (ok) { - with_lock(self->Lock) { - if (self->Stopping) { - return; + if (auto self = selfCtx->LockShared()) { + with_lock(self->Lock) { + if (self->Stopping) { + return; + } + self->RunFederationDiscoveryImpl(); } - self->RunFederationDiscoveryImpl(); } } }; @@ -119,7 +122,7 @@ void TFederatedDbObserver::ScheduleFederationDiscoveryImpl(TDuration delay) { } -void TFederatedDbObserver::OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result) { +void TFederatedDbObserverImpl::OnFederationDiscovery(TStatus&& status, Ydb::FederationDiscovery::ListFederationDatabasesResult&& result) { with_lock(Lock) { if (Stopping) { // TODO log something diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h index 55de50168c1..4a04b20b3ef 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/impl/federation_observer.h @@ -7,6 +7,7 @@ #include <ydb/public/api/protos/ydb_federation_discovery.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <ydb/public/sdk/cpp/client/ydb_federated_topic/federated_topic.h> @@ -42,14 +43,15 @@ public: }; -class TFederatedDbObserver : public TClientImplCommon<TFederatedDbObserver> { +class TFederatedDbObserverImpl : public TClientImplCommon<TFederatedDbObserverImpl>, + public NPersQueue::TEnableSelfContext<TFederatedDbObserverImpl> { public: static constexpr TDuration REDISCOVER_DELAY = TDuration::Seconds(60); public: - TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings); + TFederatedDbObserverImpl(std::shared_ptr<TGRpcConnectionsImpl> connections, const TFederatedTopicClientSettings& settings); - ~TFederatedDbObserver(); + ~TFederatedDbObserverImpl(); std::shared_ptr<TFederatedDbState> GetState(); @@ -79,4 +81,32 @@ private: bool Stopping = false; }; +class TFederatedDbObserver : public NPersQueue::TContextOwner<TFederatedDbObserverImpl> { +public: + inline TFederatedDbObserver(std::shared_ptr<TGRpcConnectionsImpl> connections, + const TFederatedTopicClientSettings& settings) + : TContextOwner(connections, settings) { + } + + inline std::shared_ptr<TFederatedDbState> GetState() { + return TryGetImpl()->GetState(); + } + + inline NThreading::TFuture<void> WaitForFirstState() { + return TryGetImpl()->WaitForFirstState(); + } + + inline void Start() { + return TryGetImpl()->Start(); + } + + inline void Stop() { + return TryGetImpl()->Stop(); + } + + inline bool IsStale() const { + return TryGetImpl()->IsStale(); + } +}; + } // namespace NYdb::NFederatedTopic diff --git a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp index 7252f7f8aed..7b41ae11c7b 100644 --- a/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_federated_topic/ut/basic_usage_ut.cpp @@ -135,6 +135,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { .MaxMemoryUsageBytes(1_MB) .AppendTopics(setup->GetTestTopic()); + Cerr << "Before ReadSession was created" << Endl; ReadSession = topicClient.CreateFederatedReadSession(readSettings); Cerr << "Session was created" << Endl; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h index 5de727a5ab5..9e7ffe2e6d6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/callback_context.h @@ -10,8 +10,13 @@ namespace NYdb::NPersQueue { +template<typename T> +class TContextOwner; + template <typename TGuardedObject> class TCallbackContext { + friend class TContextOwner<TGuardedObject>; + public: using TMutexPtr = std::shared_ptr<std::shared_mutex>; @@ -44,28 +49,99 @@ public: }; public: - explicit TCallbackContext(typename std::shared_ptr<TGuardedObject> ptr) + explicit TCallbackContext(std::shared_ptr<TGuardedObject> ptr) : Mutex(std::make_shared<std::shared_mutex>()) , GuardedObjectPtr(std::move(ptr)) {} - // may block - ~TCallbackContext() { - Cancel(); - } - TBorrowed LockShared() { return TBorrowed(*this); } +// TODO change section below to private after removing pqv1 read session implementation +// (relation of 1 owner : n impls) +public: void Cancel() { + std::shared_ptr<TGuardedObject> waste; std::lock_guard lock(*Mutex); - GuardedObjectPtr.reset(); + std::swap(waste, GuardedObjectPtr); + } + + std::shared_ptr<TGuardedObject> TryGet() const { + if (!GuardedObjectPtr) { + ythrow yexception() << "TryGet failed, empty GuardedObjectPtr"; + } + return GuardedObjectPtr; } private: TMutexPtr Mutex; - typename std::shared_ptr<TGuardedObject> GuardedObjectPtr; + std::shared_ptr<TGuardedObject> GuardedObjectPtr; +}; + +template<typename T> +class TEnableSelfContext { + template<typename U, typename... Args> + friend std::shared_ptr<TCallbackContext<U>> MakeWithCallbackContext(Args&&... args); + +public: + TEnableSelfContext() = default; + ~TEnableSelfContext() = default; + + // non-moveable for simplicity, use only via shared pointers + TEnableSelfContext(const TEnableSelfContext&) = delete; + TEnableSelfContext(TEnableSelfContext&&) = delete; + TEnableSelfContext& operator=(const TEnableSelfContext&) = delete; + TEnableSelfContext& operator=(TEnableSelfContext&&) = delete; + +protected: + void SetSelfContext(std::shared_ptr<T> ptr) { + SelfContext = std::make_shared<TCallbackContext<T>>(std::move(ptr)); + } + +protected: + std::shared_ptr<TCallbackContext<T>> SelfContext; +}; + +template<typename T, typename... Args> +std::shared_ptr<TCallbackContext<T>> MakeWithCallbackContext(Args&&... args) { + static_assert(std::is_base_of_v<TEnableSelfContext<T>, T>, "Expected object derived from TEnableSelfContext"); + auto pObject = std::make_shared<T>(std::forward<Args>(args)...); + pObject->SetSelfContext(pObject); + return pObject->SelfContext; +} + +template<typename T> +class TContextOwner { +public: + template <typename... Args> + TContextOwner(Args&&... args) + : ImplContext(MakeWithCallbackContext<T>(std::forward<Args>(args)...)) { + } + + // may block + ~TContextOwner() { + CancelImpl(); + } + + TContextOwner(const TContextOwner&) = delete; + TContextOwner(TContextOwner&&) = default; + TContextOwner& operator=(const TContextOwner&) = delete; + TContextOwner& operator=(TContextOwner&&) = default; + +protected: + std::shared_ptr<T> TryGetImpl() const { + return ImplContext->TryGet(); + } + + void CancelImpl() { + if (ImplContext) { + ImplContext->Cancel(); + } + } + +protected: + std::shared_ptr<TCallbackContext<T>> ImplContext; }; } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 403941309f6..12d7e86b53c 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -206,20 +206,20 @@ void TReadSession::CreateClusterSessionsImpl(TDeferredActions<true>& deferred) { AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred); return; } - clusterSessionInfo.Session = - std::make_shared<TSingleClusterReadSessionImpl<true>>( - sessionSettings, - DbDriverState->Database, - SessionId, - clusterName, - Log, - subclient->CreateReadSessionConnectionProcessorFactory(), - EventsQueue, - context, - partitionStreamIdStart++, - clusterSessionsCount); - - CbContexts.push_back(clusterSessionInfo.Session->MakeCallbackContext()); + CbContexts.push_back(MakeWithCallbackContext<TSingleClusterReadSessionImpl<true>>( + sessionSettings, + DbDriverState->Database, + SessionId, + clusterName, + Log, + subclient->CreateReadSessionConnectionProcessorFactory(), + EventsQueue, + context, + partitionStreamIdStart++, + clusterSessionsCount + )); + + clusterSessionInfo.Session = CbContexts.back()->TryGet(); deferred.DeferStartSession(CbContexts.back()); } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 3b89d7bf1c5..a778d7f2737 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -921,7 +921,7 @@ namespace NYdb::NPersQueue { // It is parametrized with output queue for client events // and connection factory interface to separate logic from transport. template <bool UseMigrationProtocol> -class TSingleClusterReadSessionImpl : public std::enable_shared_from_this<TSingleClusterReadSessionImpl<UseMigrationProtocol>>, +class TSingleClusterReadSessionImpl : public NPersQueue::TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>, public IUserRetrievedEventCallback<UseMigrationProtocol> { public: using TSelf = TSingleClusterReadSessionImpl<UseMigrationProtocol>; @@ -1009,8 +1009,6 @@ public: return Log; } - TCallbackContextPtr<UseMigrationProtocol> MakeCallbackContext(); - private: void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred); @@ -1192,8 +1190,6 @@ private: std::atomic<int> DecompressionTasksInflight = 0; i64 ReadSizeBudget; i64 ReadSizeServerDelta = 0; - - TCallbackContextPtr<UseMigrationProtocol> CbContext; }; // High level class that manages several read session impls. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp index 7aa03c77015..d0828562424 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp @@ -230,15 +230,9 @@ TStringBuilder TSingleClusterReadSessionImpl<UseMigrationProtocol>::GetLogPrefix return TStringBuilder() << GetDatabaseLogPrefix(Database) << "[" << SessionId << "] [" << ClusterName << "] "; } -template <bool UseMigrationProtocol> -TCallbackContextPtr<UseMigrationProtocol> TSingleClusterReadSessionImpl<UseMigrationProtocol>::MakeCallbackContext() { - CbContext = std::make_shared<TCallbackContext<TSelf>>(this->shared_from_this()); - return CbContext; -} - template<bool UseMigrationProtocol> void TSingleClusterReadSessionImpl<UseMigrationProtocol>::Start() { - Y_ABORT_UNLESS(CbContext); + Y_ABORT_UNLESS(this->SelfContext); Settings.DecompressionExecutor_->Start(); Settings.EventHandlers_.HandlersExecutor_->Start(); if (!Reconnect(TPlainStatus())) { @@ -324,16 +318,16 @@ bool TSingleClusterReadSessionImpl<UseMigrationProtocol>::Reconnect(const TPlain // Destroy all partition streams before connecting. DestroyAllPartitionStreamsImpl(deferred); - Y_ABORT_UNLESS(CbContext); + Y_ABORT_UNLESS(this->SelfContext); - connectCallback = [cbContext = CbContext, + connectCallback = [cbContext = this->SelfContext, connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { if (auto borrowedSelf = cbContext->LockShared()) { borrowedSelf->OnConnect(std::move(st), std::move(processor), connectContext); // OnConnect could be called inplace! } }; - connectTimeoutCallback = [cbContext = CbContext, + connectTimeoutCallback = [cbContext = this->SelfContext, connectTimeoutContext = connectTimeoutContext](bool ok) { if (ok) { if (auto borrowedSelf = cbContext->LockShared()) { @@ -375,7 +369,7 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::BreakConnectionAndReco Processor = nullptr; RetryState = Settings.RetryPolicy_->CreateRetryState(); // Explicitly create retry state to determine whether we should connect to server again. - deferred.DeferReconnection(CbContext, std::move(status)); + deferred.DeferReconnection(this->SelfContext, std::move(status)); } template<bool UseMigrationProtocol> @@ -824,9 +818,9 @@ void TSingleClusterReadSessionImpl<UseMigrationProtocol>::ReadFromProcessorImpl( if (Processor && !Closing) { ServerMessage->Clear(); - Y_ABORT_UNLESS(CbContext); + Y_ABORT_UNLESS(this->SelfContext); - auto callback = [cbContext = CbContext, + auto callback = [cbContext = this->SelfContext, connectionGeneration = ConnectionGeneration, // Capture message & processor not to read in freed memory. serverMessage = ServerMessage, @@ -1018,7 +1012,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( } auto decompressionInfo = std::make_shared<TDataDecompressionInfo<true>>(std::move(partitionData), - CbContext, + SelfContext, Settings.Decompress_); Y_ABORT_UNLESS(decompressionInfo); @@ -1044,7 +1038,7 @@ inline void TSingleClusterReadSessionImpl<true>::OnReadDoneImpl( NextPartitionStreamId, msg.topic().path(), msg.cluster(), msg.partition() + 1, // Group. msg.partition(), // Partition. - msg.assign_id(), msg.read_offset(), CbContext); + msg.assign_id(), msg.read_offset(), SelfContext); NextPartitionStreamId += PartitionStreamIdStep; // Renew partition stream. @@ -1262,7 +1256,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( partitionStream->SetFirstNotReadOffset(desiredOffset); auto decompressionInfo = std::make_shared<TDataDecompressionInfo<false>>(std::move(partitionData), - CbContext, + SelfContext, Settings.Decompress_, serverBytesSize); // TODO (ildar-khisam@): share serverBytesSize between partitions data according to their actual sizes; @@ -1290,7 +1284,7 @@ inline void TSingleClusterReadSessionImpl<false>::OnReadDoneImpl( auto partitionStream = MakeIntrusive<TPartitionStreamImpl<false>>( NextPartitionStreamId, msg.partition_session().path(), msg.partition_session().partition_id(), msg.partition_session().partition_session_id(), msg.committed_offset(), - CbContext); + SelfContext); NextPartitionStreamId += PartitionStreamIdStep; // Renew partition stream. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 9b1248ef375..0a4ee65ae37 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -17,49 +17,45 @@ TWriteSession::TWriteSession( std::shared_ptr<TPersQueueClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) - : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState))) -{ - CbContext = std::make_shared<TCallbackContext<TWriteSessionImpl>>(Impl); - Impl->SetCallbackContext(CbContext); + : TContextOwner(settings, std::move(client), std::move(connections), std::move(dbDriverState)) { } void TWriteSession::Start(const TDuration& delay) { - Impl->Start(delay); + TryGetImpl()->Start(delay); } NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() { - return Impl->GetInitSeqNo(); + return TryGetImpl()->GetInitSeqNo(); } TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) { - return Impl->EventsQueue->GetEvent(block); + return TryGetImpl()->EventsQueue->GetEvent(block); } TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { - return Impl->EventsQueue->GetEvents(block, maxEventsCount); + return TryGetImpl()->EventsQueue->GetEvents(block, maxEventsCount); } NThreading::TFuture<void> TWriteSession::WaitEvent() { - return Impl->EventsQueue->WaitEvent(); + return TryGetImpl()->EventsQueue->WaitEvent(); } void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { - Impl->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); + TryGetImpl()->WriteInternal(std::move(token), data, codec, originalSize, seqNo, createTimestamp); } void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { - Impl->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); + TryGetImpl()->WriteInternal(std::move(token), data, {}, 0, seqNo, createTimestamp); } bool TWriteSession::Close(TDuration closeTimeout) { - return Impl->Close(closeTimeout); + return TryGetImpl()->Close(closeTimeout); } TWriteSession::~TWriteSession() { - Impl->Close(TDuration::Zero()); - CbContext->Cancel(); + TryGetImpl()->Close(TDuration::Zero()); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index 7db726d73dc..8057d4560e6 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -21,7 +21,7 @@ namespace NTests { // TWriteSession class TWriteSession : public IWriteSession, - public std::enable_shared_from_this<TWriteSession> { + public TContextOwner<TWriteSessionImpl> { private: friend class TSimpleBlockingWriteSession; friend class TPersQueueClient; @@ -56,10 +56,6 @@ public: private: void Start(const TDuration& delay); - -private: - std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext; - std::shared_ptr<TWriteSessionImpl> Impl; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 0aeae6d05d3..9ea829fccd4 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -75,12 +75,8 @@ TWriteSessionImpl::TWriteSessionImpl( } -void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx) { - CbContext = std::move(ctx); -} - void TWriteSessionImpl::Start(const TDuration& delay) { - Y_ABORT_UNLESS(CbContext); + Y_ABORT_UNLESS(SelfContext); ++ConnectionAttemptsDone; if (!Started) { with_lock(Lock) { @@ -146,7 +142,7 @@ void TWriteSessionImpl::DoCdsRequest(TDuration delay) { (Settings.ClusterDiscoveryMode_ == EClusterDiscoveryMode::Auto && !IsFederation(DbDriverState->DiscoveryEndpoint))); if (!cdsRequestIsUnnecessary) { - auto extractor = [cbContext = CbContext] + auto extractor = [cbContext = SelfContext] (google::protobuf::Any* any, TPlainStatus status) mutable { Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult result; if (any) { @@ -466,14 +462,14 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin Y_ASSERT(connectTimeoutContext); reqSettings = TRpcRequestSettings::Make(Settings); - connectCallback = [cbContext = CbContext, + connectCallback = [cbContext = SelfContext, connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { if (auto self = cbContext->LockShared()) { self->OnConnect(std::move(st), std::move(processor), connectContext); } }; - connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) { + connectTimeoutCallback = [cbContext = SelfContext, connectTimeoutContext = connectTimeoutContext](bool ok) { if (ok) { if (auto self = cbContext->LockShared()) { self->OnConnectTimeout(connectTimeoutContext); @@ -588,7 +584,7 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&& if (Aborting) { return; } - auto callback = [cbContext = CbContext, + auto callback = [cbContext = SelfContext, connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { if (auto self = cbContext->LockShared()) { self->OnWriteDone(std::move(grpcStatus), connectionGeneration); @@ -609,7 +605,7 @@ void TWriteSessionImpl::ReadFromProcessor() { } prc = Processor; generation = ConnectionGeneration; - callback = [cbContext = CbContext, + callback = [cbContext = SelfContext, connectionGeneration = generation, processor = prc, serverMessage = ServerMessage] @@ -897,7 +893,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); blockPtr->Move(block_); - auto lambda = [cbContext = CbContext, + auto lambda = [cbContext = SelfContext, codec = Settings.Codec_, level = Settings.CompressionLevel_, isSyncCompression = !CompressionExecutor->IsAsync(), @@ -1274,7 +1270,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() { if (AtomicGet(Aborting)) { return; } - auto callback = [cbContext = CbContext] (bool ok) + auto callback = [cbContext = SelfContext] (bool ok) { if (!ok) { return; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h index aba702f049c..44db21fc72d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -158,8 +158,7 @@ namespace NTests { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TWriteSessionImpl -class TWriteSessionImpl : public IWriteSession, - public std::enable_shared_from_this<TWriteSessionImpl> { +class TWriteSessionImpl : public TEnableSelfContext<TWriteSessionImpl> { private: friend class TWriteSession; friend class TSimpleBlockingWriteSession; @@ -302,29 +301,27 @@ public: std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState); - TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; + TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false); TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, - TMaybe<size_t> maxEventsCount = Nothing()) override; - NThreading::TFuture<ui64> GetInitSeqNo() override; + TMaybe<size_t> maxEventsCount = Nothing()); + NThreading::TFuture<ui64> GetInitSeqNo(); void Write(TContinuationToken&& continuationToken, TStringBuf data, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override; + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()); - NThreading::TFuture<void> WaitEvent() override; + NThreading::TFuture<void> WaitEvent(); // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. - bool Close(TDuration closeTimeout = TDuration::Max()) override; + bool Close(TDuration closeTimeout = TDuration::Max()); - TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; + TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; ~TWriteSessionImpl(); // will not call close - destroy everything without acks - void SetCallbackContext(std::shared_ptr<TCallbackContext<TWriteSessionImpl>> ctx); - private: TStringBuilder LogPrefix() const; @@ -393,7 +390,6 @@ private: TStringType PrevToken; bool UpdateTokenInProgress = false; TInstant LastTokenUpdate = TInstant::Zero(); - std::shared_ptr<TCallbackContext<TWriteSessionImpl>> CbContext; std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; NGrpc::IQueueClientContextPtr ClientContext; // Common client context. NGrpc::IQueueClientContextPtr ConnectContext; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp index 989ff5b72ab..480883d2620 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/read_session_ut.cpp @@ -622,7 +622,7 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() { if (!Settings.EventHandlers_.HandlersExecutor_) { Settings.EventHandlers_.HandlersExecutor(GetDefaultExecutor()); } - Session = std::make_shared<TSingleClusterReadSessionImpl<true>>( + CbContext = MakeWithCallbackContext<TSingleClusterReadSessionImpl<true>>( Settings, "db", "sessionid", @@ -632,7 +632,7 @@ TSingleClusterReadSessionImpl<true>* TReadSessionImplTestSetup::GetSession() { GetEventsQueue(), FakeContext, PartitionIdStart, PartitionIdStep); - CbContext = Session->MakeCallbackContext(); + Session = CbContext->TryGet(); } return Session.get(); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp index 3b99df5ec4a..21b2be71948 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/ut_utils.cpp @@ -25,7 +25,7 @@ TSimpleWriteSessionTestAdapter::TSimpleWriteSessionTestAdapter(TSimpleBlockingWr ui64 TSimpleWriteSessionTestAdapter::GetAcquiredMessagesCount() const { if (Session->Writer) - return Session->Writer->Impl->MessagesAcquired; + return Session->Writer->TryGetImpl()->MessagesAcquired; else return 0; } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp index 4b930f8fbb8..949abf9ab11 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.cpp @@ -78,7 +78,8 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false> AbortImpl(EStatus::ABORTED, DRIVER_IS_STOPPING_DESCRIPTION, deferred); return; } - Session = std::make_shared<NPersQueue::TSingleClusterReadSessionImpl<false>>( + + CbContext = NPersQueue::MakeWithCallbackContext<NPersQueue::TSingleClusterReadSessionImpl<false>>( Settings, DbDriverState->Database, SessionId, @@ -87,9 +88,9 @@ void TReadSession::CreateClusterSessionsImpl(NPersQueue::TDeferredActions<false> Client->CreateReadSessionConnectionProcessorFactory(), EventsQueue, context, - 1, 1); + 1, 1 + ); - CbContext = Session->MakeCallbackContext(); deferred.DeferStartSession(CbContext); } @@ -271,7 +272,7 @@ bool TReadSession::Close(TDuration timeout) { } Closing = true; - session = Session; + session = CbContext->TryGet(); } session->Close(callback); @@ -352,7 +353,7 @@ void TReadSession::AbortImpl(NPersQueue::TDeferredActions<false>&) { if (DumpCountersContext) { DumpCountersContext->Cancel(); } - Session->Abort(); + CbContext->TryGet()->Abort(); } } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h index aa626de4b15..127fcc436f7 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/read_session.h @@ -8,8 +8,7 @@ namespace NYdb::NTopic { -class TReadSession : public IReadSession, - public std::enable_shared_from_this<TReadSession> { +class TReadSession : public IReadSession { public: TReadSession(const TReadSessionSettings& settings, std::shared_ptr<TTopicClient::TImpl> client, @@ -87,7 +86,6 @@ private: TAdaptiveLock Lock; std::shared_ptr<NPersQueue::TReadSessionEventsQueue<false>> EventsQueue; - NPersQueue::TSingleClusterReadSessionImpl<false>::TPtr Session; std::shared_ptr<NPersQueue::TCallbackContext<NPersQueue::TSingleClusterReadSessionImpl<false>>> CbContext; TVector<TTopicReadSettings> Topics; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index 6a8a618831c..f6884c017bb 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -1,13 +1,4 @@ #include "write_session.h" -#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> -#include <library/cpp/string_utils/url/url.h> - -#include <google/protobuf/util/time_util.h> - -#include <util/generic/store_policy.h> -#include <util/generic/utility.h> -#include <util/stream/buffer.h> - namespace NYdb::NTopic { @@ -19,30 +10,27 @@ TWriteSession::TWriteSession( std::shared_ptr<TTopicClient::TImpl> client, std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState) - : Impl(std::make_shared<TWriteSessionImpl>(settings, std::move(client), std::move(connections), std::move(dbDriverState))) -{ - CbContext = std::make_shared<NPersQueue::TCallbackContext<TWriteSessionImpl>>(Impl); - Impl->SetCallbackContext(CbContext); + : TContextOwner(settings, std::move(client), std::move(connections), std::move(dbDriverState)) { } void TWriteSession::Start(const TDuration& delay) { - Impl->Start(delay); + TryGetImpl()->Start(delay); } NThreading::TFuture<ui64> TWriteSession::GetInitSeqNo() { - return Impl->GetInitSeqNo(); + return TryGetImpl()->GetInitSeqNo(); } TMaybe<TWriteSessionEvent::TEvent> TWriteSession::GetEvent(bool block) { - return Impl->EventsQueue->GetEvent(block); + return TryGetImpl()->EventsQueue->GetEvent(block); } TVector<TWriteSessionEvent::TEvent> TWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) { - return Impl->EventsQueue->GetEvents(block, maxEventsCount); + return TryGetImpl()->EventsQueue->GetEvents(block, maxEventsCount); } NThreading::TFuture<void> TWriteSession::WaitEvent() { - return Impl->EventsQueue->WaitEvent(); + return TryGetImpl()->EventsQueue->WaitEvent(); } void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, ECodec codec, ui32 originalSize, @@ -52,12 +40,12 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, EC message.SeqNo(*seqNo); if (createTimestamp.Defined()) message.CreateTimestamp(*createTimestamp); - Impl->WriteInternal(std::move(token), std::move(message)); + TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message) { - Impl->WriteInternal(std::move(token), std::move(message)); + TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, @@ -67,20 +55,19 @@ void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui message.SeqNo(*seqNo); if (createTimestamp.Defined()) message.CreateTimestamp(*createTimestamp); - Impl->WriteInternal(std::move(token), std::move(message)); + TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } void TWriteSession::Write(TContinuationToken&& token, TWriteMessage&& message) { - Impl->WriteInternal(std::move(token), std::move(message)); + TryGetImpl()->WriteInternal(std::move(token), std::move(message)); } bool TWriteSession::Close(TDuration closeTimeout) { - return Impl->Close(closeTimeout); + return TryGetImpl()->Close(closeTimeout); } TWriteSession::~TWriteSession() { - Impl->Close(TDuration::Zero()); - CbContext->Cancel(); + TryGetImpl()->Close(TDuration::Zero()); } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index 13fec07c33c..e8c618ba2b2 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -16,7 +16,7 @@ namespace NYdb::NTopic { // TWriteSession class TWriteSession : public IWriteSession, - public std::enable_shared_from_this<TWriteSession> { + public NPersQueue::TContextOwner<TWriteSessionImpl> { private: friend class TSimpleBlockingWriteSession; friend class TTopicClient; @@ -53,10 +53,6 @@ public: private: void Start(const TDuration& delay); - -private: - std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext; - std::shared_ptr<TWriteSessionImpl> Impl; }; //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index c02e463c668..8949546cf14 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -75,12 +75,8 @@ TWriteSessionImpl::TWriteSessionImpl( } -void TWriteSessionImpl::SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx) { - CbContext = std::move(ctx); -} - void TWriteSessionImpl::Start(const TDuration& delay) { - Y_ABORT_UNLESS(CbContext); + Y_ABORT_UNLESS(SelfContext); ++ConnectionAttemptsDone; if (!Started) { with_lock(Lock) { @@ -162,7 +158,7 @@ void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& del request.set_partition_id(*Settings.PartitionId_); request.set_include_location(true); - auto extractor = [cbContext = CbContext, context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable { + auto extractor = [cbContext = SelfContext, context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable { Ydb::Topic::DescribePartitionResult result; if (response) response->operation().result().UnpackTo(&result); @@ -486,14 +482,14 @@ void TWriteSessionImpl::Connect(const TDuration& delay) { reqSettings = TRpcRequestSettings::Make(Settings, PreferredPartitionLocation.Endpoint); - connectCallback = [cbContext = CbContext, + connectCallback = [cbContext = SelfContext, connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { if (auto self = cbContext->LockShared()) { self->OnConnect(std::move(st), std::move(processor), connectContext); } }; - connectTimeoutCallback = [cbContext = CbContext, connectTimeoutContext = connectTimeoutContext](bool ok) { + connectTimeoutCallback = [cbContext = SelfContext, connectTimeoutContext = connectTimeoutContext](bool ok) { if (ok) { if (auto self = cbContext->LockShared()) { self->OnConnectTimeout(connectTimeoutContext); @@ -621,7 +617,7 @@ void TWriteSessionImpl::WriteToProcessorImpl(TWriteSessionImpl::TClientMessage&& if (Aborting) { return; } - auto callback = [cbContext = CbContext, + auto callback = [cbContext = SelfContext, connectionGeneration = ConnectionGeneration](NGrpc::TGrpcStatus&& grpcStatus) { if (auto self = cbContext->LockShared()) { self->OnWriteDone(std::move(grpcStatus), connectionGeneration); @@ -642,7 +638,7 @@ void TWriteSessionImpl::ReadFromProcessor() { } prc = Processor; generation = ConnectionGeneration; - callback = [cbContext = CbContext, + callback = [cbContext = SelfContext, connectionGeneration = generation, processor = prc, serverMessage = ServerMessage] @@ -958,7 +954,7 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) { std::shared_ptr<TBlock> blockPtr(std::make_shared<TBlock>()); blockPtr->Move(block_); - auto lambda = [cbContext = CbContext, + auto lambda = [cbContext = SelfContext, codec = Settings.Codec_, level = Settings.CompressionLevel_, isSyncCompression = !CompressionExecutor->IsAsync(), @@ -1277,7 +1273,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() { if (AtomicGet(Aborting)) { return; } - auto callback = [cbContext = CbContext] (bool ok) + auto callback = [cbContext = SelfContext] (bool ok) { if (!ok) { return; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index cc8da2d38d1..9c5451ece91 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -149,8 +149,7 @@ struct TMemoryUsageChange { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TWriteSessionImpl -class TWriteSessionImpl : public IWriteSession, - public std::enable_shared_from_this<TWriteSessionImpl> { +class TWriteSessionImpl : public NPersQueue::TEnableSelfContext<TWriteSessionImpl> { private: friend class TWriteSession; friend class TSimpleBlockingWriteSession; @@ -322,41 +321,39 @@ public: std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState); - TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false) override; + TMaybe<TWriteSessionEvent::TEvent> GetEvent(bool block = false); TVector<TWriteSessionEvent::TEvent> GetEvents(bool block = false, - TMaybe<size_t> maxEventsCount = Nothing()) override; - NThreading::TFuture<ui64> GetInitSeqNo() override; + TMaybe<size_t> maxEventsCount = Nothing()); + NThreading::TFuture<ui64> GetInitSeqNo(); - void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void Write(TContinuationToken&& continuationToken, TWriteMessage&& message); void Write(TContinuationToken&&, TStringBuf, TMaybe<ui64> seqNo = Nothing(), - TMaybe<TInstant> createTimestamp = Nothing()) override { + TMaybe<TInstant> createTimestamp = Nothing()) { Y_UNUSED(seqNo); Y_UNUSED(createTimestamp); Y_ABORT("Do not use this method"); }; - void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message); void WriteEncoded(TContinuationToken&&, TStringBuf, ECodec, ui32, - TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override { + TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) { Y_UNUSED(seqNo); Y_UNUSED(createTimestamp); Y_ABORT("Do not use this method"); } - NThreading::TFuture<void> WaitEvent() override; + NThreading::TFuture<void> WaitEvent(); // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. - bool Close(TDuration closeTimeout = TDuration::Max()) override; + bool Close(TDuration closeTimeout = TDuration::Max()); - TWriterCounters::TPtr GetCounters() override {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; + TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; ~TWriteSessionImpl(); // will not call close - destroy everything without acks - void SetCallbackContext(std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> ctx); - private: TStringBuilder LogPrefix() const; @@ -426,7 +423,6 @@ private: TStringType PrevToken; bool UpdateTokenInProgress = false; TInstant LastTokenUpdate = TInstant::Zero(); - std::shared_ptr<NPersQueue::TCallbackContext<TWriteSessionImpl>> CbContext; std::shared_ptr<TWriteSessionEventsQueue> EventsQueue; NGrpc::IQueueClientContextPtr ClientContext; // Common client context. NGrpc::IQueueClientContextPtr ConnectContext; |