diff options
author | ildar-khisam <ikhis@ydb.tech> | 2023-12-13 15:27:47 +0300 |
---|---|---|
committer | ildar-khisam <ikhis@ydb.tech> | 2023-12-13 16:36:49 +0300 |
commit | 5c33ae007ff2467953e59126c1e2035a058949de (patch) | |
tree | 5341db64c49e095129df8236b21e72e936886047 | |
parent | b0943903218f95edba19ac132ce7b358dd1e297b (diff) | |
download | ydb-5c33ae007ff2467953e59126c1e2035a058949de.tar.gz |
serial execution in simple write session
copy changes in topic write sdk to pqv1
11 files changed, 179 insertions, 134 deletions
diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp index ecb2e1c2d2..f5fc57fb87 100644 --- a/ydb/core/persqueue/ut/splitmerge_ut.cpp +++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp @@ -112,10 +112,6 @@ std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& cl writeSettings.MessageGroupId(producer); } - writeSettings.EventHandlers_.AcksHandler([&](TWriteSessionEvent::TAcksEvent& ev) { - Cerr << ">>>>> Received TWriteSessionEvent::TAcksEvent " << ev.DebugString() << Endl << Flush; - }); - return client.CreateSimpleBlockingWriteSession(writeSettings); } @@ -145,8 +141,8 @@ struct TTestReadSession { for (size_t i = 0u; i < messages.size(); ++i) { auto& message = messages[i]; - Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId() - << ", message=" << message.GetData() + Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId() + << ", message=" << message.GetData() << ", seqNo=" << message.GetSeqNo() << ", offset=" << message.GetOffset() << Endl; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp index 0a4ee65ae3..64a44c8068 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp @@ -1,5 +1,8 @@ #include "write_session.h" + #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + #include <library/cpp/string_utils/url/url.h> #include <util/generic/store_policy.h> @@ -67,15 +70,24 @@ TSimpleBlockingWriteSession::TSimpleBlockingWriteSession( std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState ) { - auto alteredSettings = settings; - alteredSettings.EventHandlers_.AcksHandler_ = [this](TWriteSessionEvent::TAcksEvent& event) {this->HandleAck(event); }; - alteredSettings.EventHandlers_.ReadyToAcceptHander_ = [this](TWriteSessionEvent::TReadyToAcceptEvent& event) - {this->HandleReady(event); }; - alteredSettings.EventHandlers_.SessionClosedHandler_ = [this](const TSessionClosedEvent& event) {this->HandleClosed(event); }; - - Writer = std::make_shared<TWriteSession>( - alteredSettings, client, connections, dbDriverState - ); + auto subSettings = settings; + if (settings.EventHandlers_.AcksHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use AcksHandler, resetting."); + subSettings.EventHandlers_.AcksHandler({}); + } + if (settings.EventHandlers_.ReadyToAcceptHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use ReadyToAcceptHandler, resetting."); + subSettings.EventHandlers_.ReadyToAcceptHandler({}); + } + if (settings.EventHandlers_.SessionClosedHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use SessionClosedHandler, resetting."); + subSettings.EventHandlers_.SessionClosedHandler({}); + } + if (settings.EventHandlers_.CommonHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use CommonHandler, resetting."); + subSettings.EventHandlers_.CommonHandler({}); + } + Writer = std::make_shared<TWriteSession>(subSettings, client, connections, dbDriverState); Writer->Start(TDuration::Max()); } @@ -86,9 +98,6 @@ ui64 TSimpleBlockingWriteSession::GetInitSeqNo() { bool TSimpleBlockingWriteSession::Write( TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp, const TDuration& blockTimeout ) { - if (!IsAlive()) - return false; - auto continuationToken = WaitForToken(blockTimeout); if (continuationToken.Defined()) { Writer->Write(std::move(*continuationToken), std::move(data), seqNo, createTimestamp); @@ -98,27 +107,33 @@ bool TSimpleBlockingWriteSession::Write( } TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) { - auto startTime = TInstant::Now(); + TInstant startTime = TInstant::Now(); TDuration remainingTime = timeout; + TMaybe<TContinuationToken> token = Nothing(); - while(!token.Defined() && remainingTime > TDuration::Zero()) { - with_lock(Lock) { - if (!ContinueTokens.empty()) { - token = std::move(ContinueTokens.front()); - ContinueTokens.pop(); + + while (IsAlive() && remainingTime > TDuration::Zero()) { + Writer->WaitEvent().Wait(remainingTime); + + for (auto event : Writer->GetEvents()) { + if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&event)) { + Y_ABORT_UNLESS(token.Empty()); + token = std::move(readyEvent->ContinuationToken); + } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&event)) { + // discard + } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&event)) { + Closed.store(true); + return Nothing(); } } - if (!IsAlive()) - return Nothing(); if (token.Defined()) { - return std::move(*token); - } - else { - remainingTime = timeout - (TInstant::Now() - startTime); - Sleep(Min(remainingTime, TDuration::MilliSeconds(100))); + return token; } + + remainingTime = timeout - (TInstant::Now() - startTime); } + return Nothing(); } @@ -128,28 +143,11 @@ TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() { bool TSimpleBlockingWriteSession::IsAlive() const { - bool closed = false; - with_lock(Lock) { - closed = Closed; - } - return !closed; -} - -void TSimpleBlockingWriteSession::HandleAck(TWriteSessionEvent::TAcksEvent& event) { - Y_UNUSED(event); + return !Closed.load(); } -void TSimpleBlockingWriteSession::HandleReady(TWriteSessionEvent::TReadyToAcceptEvent& event) { - with_lock(Lock) { - ContinueTokens.emplace(std::move(event.ContinuationToken)); - } -} -void TSimpleBlockingWriteSession::HandleClosed(const TSessionClosedEvent&) { - with_lock(Lock) { - Closed = true; - } -} bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) { + Closed.store(true); return Writer->Close(std::move(closeTimeout)); } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h index 8057d4560e..4e6cad7c5d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h @@ -96,9 +96,7 @@ private: void HandleReady(TWriteSessionEvent::TReadyToAcceptEvent&); void HandleClosed(const TSessionClosedEvent&); - TAdaptiveLock Lock; - std::queue<TContinuationToken> ContinueTokens; - bool Closed = false; + std::atomic_bool Closed = false; }; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 3adf1dfeda..5a514ede8f 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -51,7 +51,6 @@ TWriteSessionImpl::TWriteSessionImpl( , Connections(std::move(connections)) , DbDriverState(std::move(dbDriverState)) , PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "") - , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings)) , InitSeqNoPromise(NThreading::NewPromise<ui64>()) , WakeupInterval( Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ? @@ -77,6 +76,25 @@ TWriteSessionImpl::TWriteSessionImpl( void TWriteSessionImpl::Start(const TDuration& delay) { Y_ABORT_UNLESS(SelfContext); + + if (!EventsQueue) { +#define WRAP_HANDLER(type, handler, ...) \ + if (auto h = Settings.EventHandlers_.handler##_) { \ + Settings.EventHandlers_.handler([ctx = SelfContext, h = std::move(h)](__VA_ARGS__ type& ev){ \ + if (auto self = ctx->LockShared()) { \ + h(ev); \ + } \ + }); \ + } + WRAP_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler); + WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler); + WRAP_HANDLER(TSessionClosedEvent, SessionClosedHandler, const); + WRAP_HANDLER(TWriteSessionEvent::TEvent, CommonHandler); +#undef WRAP_HANDLER + + EventsQueue = std::make_shared<TWriteSessionEventsQueue>(Settings); + } + ++ConnectionAttemptsDone; if (!Started) { with_lock(Lock) { @@ -374,7 +392,7 @@ void TWriteSessionImpl::WriteInternal( readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; } if (readyToAccept) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } @@ -740,7 +758,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess OnErrorResolved(); if (!FirstTokenSent) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); FirstTokenSent = true; } // Kickstart send after session reestablishment @@ -777,7 +795,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess }); if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) { - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } //EventsQueue->PushEvent(std::move(acksEvent)); @@ -925,7 +943,7 @@ void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) { memoryUsage = OnCompressedImpl(std::move(block)); } if (memoryUsage.NowOk && !memoryUsage.WasOk) { - EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()}); } } diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h index 8101341645..bda181433a 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -106,18 +106,20 @@ private: struct THandlersVisitor : public TParent::TBaseHandlersVisitor { using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor; -#define DECLARE_HANDLER(type, handler, answer) \ - bool operator()(type& event) { \ - if (Settings.EventHandlers_.handler) { \ - Settings.EventHandlers_.handler(event); \ - return answer; \ - } \ - return false; \ - } \ +#define DECLARE_HANDLER(type, handler, answer) \ + bool operator()(type&) { \ + if (this->PushHandler<type>( \ + std::move(TParent::TBaseHandlersVisitor::Event), \ + this->Settings.EventHandlers_.handler, \ + this->Settings.EventHandlers_.CommonHandler_)) { \ + return answer; \ + } \ + return false; \ + } \ /**/ DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); - DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); + DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler_, true); DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied #undef DECLARE_HANDLER @@ -125,7 +127,6 @@ private: bool Visit() { return std::visit(*this, Event); } - }; bool ApplyHandler(TEventInfo& eventInfo) { @@ -158,7 +159,8 @@ namespace NTests { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TWriteSessionImpl -class TWriteSessionImpl : public TEnableSelfContext<TWriteSessionImpl> { +class TWriteSessionImpl : public TContinuationTokenIssuer, + public TEnableSelfContext<TWriteSessionImpl> { private: friend class TWriteSession; friend class TSimpleBlockingWriteSession; @@ -320,6 +322,10 @@ public: TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; + const TWriteSessionSettings& GetSettings() const { + return Settings; + } + ~TWriteSessionImpl(); // will not call close - destroy everything without acks private: diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h index fc42bead04..e7fe810460 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h> #include <library/cpp/monlib/dynamic_counters/counters.h> #include <library/cpp/logger/log.h> @@ -356,12 +357,35 @@ enum class EClusterDiscoveryMode { Off }; -class TContinuationToken : public TMoveOnly { - friend class TWriteSessionImpl; +class TContinuationToken : public TNonCopyable { + friend class TContinuationTokenIssuer; + + bool Valid = true; + +public: + TContinuationToken& operator=(TContinuationToken&& other) { + if (!other.Valid) { + ythrow TContractViolation("Cannot move invalid token"); + } + Valid = std::exchange(other.Valid, false); + return *this; + } + + TContinuationToken(TContinuationToken&& other) { + *this = std::move(other); + } + private: TContinuationToken() = default; }; +class TContinuationTokenIssuer { +protected: + static auto IssueContinuationToken() { + return TContinuationToken{}; + } +}; + struct TWriterCounters : public TThrRefBase { using TSelf = TWriterCounters; using TPtr = TIntrusivePtr<TSelf>; @@ -1002,10 +1026,21 @@ struct TWriteSessionEvent { //! Indicates that a writer is ready to accept new message(s). //! Continuation token should be kept and then used in write methods. struct TReadyToAcceptEvent { - TContinuationToken ContinuationToken; + mutable TContinuationToken ContinuationToken; - TString DebugString() const; + TReadyToAcceptEvent() = delete; + TReadyToAcceptEvent(TContinuationToken&& t) : ContinuationToken(std::move(t)) { + } + TReadyToAcceptEvent(TReadyToAcceptEvent&&) = default; + TReadyToAcceptEvent(const TReadyToAcceptEvent& other) : ContinuationToken(std::move(other.ContinuationToken)) { + } + TReadyToAcceptEvent& operator=(TReadyToAcceptEvent&&) = default; + TReadyToAcceptEvent& operator=(const TReadyToAcceptEvent& other) { + ContinuationToken = std::move(other.ContinuationToken); + return *this; + } + TString DebugString() const; }; using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>; @@ -1103,7 +1138,7 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { //! Function to handle ReadyToAccept event. //! If this handler is set, write these events will be handled by handler, //! otherwise sent to TWriteSession::GetEvent(). - FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHander); + FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHandler); //! Function to handle close session events. //! If this handler is set, close session events will be handled by handler diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index f6884c017b..91962449f9 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -1,5 +1,7 @@ #include "write_session.h" +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h> + namespace NYdb::NTopic { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -79,15 +81,24 @@ TSimpleBlockingWriteSession::TSimpleBlockingWriteSession( std::shared_ptr<TGRpcConnectionsImpl> connections, TDbDriverStatePtr dbDriverState ) { - auto alteredSettings = settings; - alteredSettings.EventHandlers_.AcksHandler_ = [this](TWriteSessionEvent::TAcksEvent& event) {this->HandleAck(event); }; - alteredSettings.EventHandlers_.ReadyToAcceptHander_ = [this](TWriteSessionEvent::TReadyToAcceptEvent& event) - {this->HandleReady(event); }; - alteredSettings.EventHandlers_.SessionClosedHandler_ = [this](const TSessionClosedEvent& event) {this->HandleClosed(event); }; - - Writer = std::make_shared<TWriteSession>( - alteredSettings, client, connections, dbDriverState - ); + auto subSettings = settings; + if (settings.EventHandlers_.AcksHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use AcksHandler, resetting."); + subSettings.EventHandlers_.AcksHandler({}); + } + if (settings.EventHandlers_.ReadyToAcceptHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use ReadyToAcceptHandler, resetting."); + subSettings.EventHandlers_.ReadyToAcceptHandler({}); + } + if (settings.EventHandlers_.SessionClosedHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use SessionClosedHandler, resetting."); + subSettings.EventHandlers_.SessionClosedHandler({}); + } + if (settings.EventHandlers_.CommonHandler_) { + LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use CommonHandler, resetting."); + subSettings.EventHandlers_.CommonHandler({}); + } + Writer = std::make_shared<TWriteSession>(subSettings, client, connections, dbDriverState); Writer->Start(TDuration::Max()); } @@ -98,23 +109,15 @@ ui64 TSimpleBlockingWriteSession::GetInitSeqNo() { bool TSimpleBlockingWriteSession::Write( TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp, const TDuration& blockTimeout ) { - if (!IsAlive()) - return false; - - auto continuationToken = WaitForToken(blockTimeout); - if (continuationToken.Defined()) { - Writer->Write(std::move(*continuationToken), std::move(data), seqNo, createTimestamp); - return true; - } - return false; + auto message = TWriteMessage(std::move(data)) + .SeqNo(seqNo) + .CreateTimestamp(createTimestamp); + return Write(std::move(message), blockTimeout); } bool TSimpleBlockingWriteSession::Write( TWriteMessage&& message, const TDuration& blockTimeout ) { - if (!IsAlive()) - return false; - auto continuationToken = WaitForToken(blockTimeout); if (continuationToken.Defined()) { Writer->Write(std::move(*continuationToken), std::move(message)); @@ -124,27 +127,33 @@ bool TSimpleBlockingWriteSession::Write( } TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) { - auto startTime = TInstant::Now(); + TInstant startTime = TInstant::Now(); TDuration remainingTime = timeout; + TMaybe<TContinuationToken> token = Nothing(); - while(!token.Defined() && remainingTime > TDuration::Zero()) { - with_lock(Lock) { - if (!ContinueTokens.empty()) { - token = std::move(ContinueTokens.front()); - ContinueTokens.pop(); + + while (IsAlive() && remainingTime > TDuration::Zero()) { + Writer->WaitEvent().Wait(remainingTime); + + for (auto event : Writer->GetEvents()) { + if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&event)) { + Y_ABORT_UNLESS(token.Empty()); + token = std::move(readyEvent->ContinuationToken); + } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&event)) { + // discard + } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&event)) { + Closed.store(true); + return Nothing(); } } - if (!IsAlive()) - return Nothing(); if (token.Defined()) { - return std::move(*token); - } - else { - remainingTime = timeout - (TInstant::Now() - startTime); - Sleep(Min(remainingTime, TDuration::MilliSeconds(100))); + return token; } + + remainingTime = timeout - (TInstant::Now() - startTime); } + return Nothing(); } @@ -152,30 +161,12 @@ TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() { return Writer->GetCounters(); } - bool TSimpleBlockingWriteSession::IsAlive() const { - bool closed = false; - with_lock(Lock) { - closed = Closed; - } - return !closed; + return !Closed.load(); } -void TSimpleBlockingWriteSession::HandleAck(TWriteSessionEvent::TAcksEvent& event) { - Y_UNUSED(event); -} - -void TSimpleBlockingWriteSession::HandleReady(TWriteSessionEvent::TReadyToAcceptEvent& event) { - with_lock(Lock) { - ContinueTokens.emplace(std::move(event.ContinuationToken)); - } -} -void TSimpleBlockingWriteSession::HandleClosed(const TSessionClosedEvent&) { - with_lock(Lock) { - Closed = true; - } -} bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) { + Closed.store(true); return Writer->Close(std::move(closeTimeout)); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index e8c618ba2b..b8c725f9c9 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -9,6 +9,7 @@ #include <util/generic/buffer.h> +#include <atomic> namespace NYdb::NTopic { @@ -87,9 +88,7 @@ private: void HandleReady(TWriteSessionEvent::TReadyToAcceptEvent&); void HandleClosed(const TSessionClosedEvent&); - TAdaptiveLock Lock; - std::queue<TContinuationToken> ContinueTokens; - bool Closed = false; + std::atomic_bool Closed = false; }; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 20d1484e40..c89cdf1eca 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -87,7 +87,7 @@ void TWriteSessionImpl::Start(const TDuration& delay) { }); \ } WRAP_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler); - WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander); + WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler); WRAP_HANDLER(TSessionClosedEvent, SessionClosedHandler, const); WRAP_HANDLER(TWriteSessionEvent::TEvent, CommonHandler); #undef WRAP_HANDLER diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 5c6f9c41dd..9373e9c811 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -115,7 +115,7 @@ private: } \ /**/ DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true); - DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true); + DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler_, true); DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied #undef DECLARE_HANDLER @@ -355,6 +355,10 @@ public: TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented; + const TWriteSessionSettings& GetSettings() const { + return Settings; + } + ~TWriteSessionImpl(); // will not call close - destroy everything without acks private: diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index f1080b3fb7..546483d088 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -1369,7 +1369,7 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> { //! Function to handle ReadyToAccept event. //! If this handler is set, write these events will be handled by handler, //! otherwise sent to TWriteSession::GetEvent(). - FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHander); + FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHandler); //! Function to handle close session events. //! If this handler is set, close session events will be handled by handler |