aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorildar-khisam <ikhis@ydb.tech>2023-12-13 15:27:47 +0300
committerildar-khisam <ikhis@ydb.tech>2023-12-13 16:36:49 +0300
commit5c33ae007ff2467953e59126c1e2035a058949de (patch)
tree5341db64c49e095129df8236b21e72e936886047
parentb0943903218f95edba19ac132ce7b358dd1e297b (diff)
downloadydb-5c33ae007ff2467953e59126c1e2035a058949de.tar.gz
serial execution in simple write session
copy changes in topic write sdk to pqv1
-rw-r--r--ydb/core/persqueue/ut/splitmerge_ut.cpp8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp86
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h45
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp99
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h2
11 files changed, 179 insertions, 134 deletions
diff --git a/ydb/core/persqueue/ut/splitmerge_ut.cpp b/ydb/core/persqueue/ut/splitmerge_ut.cpp
index ecb2e1c2d2..f5fc57fb87 100644
--- a/ydb/core/persqueue/ut/splitmerge_ut.cpp
+++ b/ydb/core/persqueue/ut/splitmerge_ut.cpp
@@ -112,10 +112,6 @@ std::shared_ptr<ISimpleBlockingWriteSession> CreateWriteSession(TTopicClient& cl
writeSettings.MessageGroupId(producer);
}
- writeSettings.EventHandlers_.AcksHandler([&](TWriteSessionEvent::TAcksEvent& ev) {
- Cerr << ">>>>> Received TWriteSessionEvent::TAcksEvent " << ev.DebugString() << Endl << Flush;
- });
-
return client.CreateSimpleBlockingWriteSession(writeSettings);
}
@@ -145,8 +141,8 @@ struct TTestReadSession {
for (size_t i = 0u; i < messages.size(); ++i) {
auto& message = messages[i];
- Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId()
- << ", message=" << message.GetData()
+ Cerr << ">>>>> Received TDataReceivedEvent message partitionId=" << message.GetPartitionSession()->GetPartitionId()
+ << ", message=" << message.GetData()
<< ", seqNo=" << message.GetSeqNo()
<< ", offset=" << message.GetOffset()
<< Endl;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
index 0a4ee65ae3..64a44c8068 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.cpp
@@ -1,5 +1,8 @@
#include "write_session.h"
+
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
#include <library/cpp/string_utils/url/url.h>
#include <util/generic/store_policy.h>
@@ -67,15 +70,24 @@ TSimpleBlockingWriteSession::TSimpleBlockingWriteSession(
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState
) {
- auto alteredSettings = settings;
- alteredSettings.EventHandlers_.AcksHandler_ = [this](TWriteSessionEvent::TAcksEvent& event) {this->HandleAck(event); };
- alteredSettings.EventHandlers_.ReadyToAcceptHander_ = [this](TWriteSessionEvent::TReadyToAcceptEvent& event)
- {this->HandleReady(event); };
- alteredSettings.EventHandlers_.SessionClosedHandler_ = [this](const TSessionClosedEvent& event) {this->HandleClosed(event); };
-
- Writer = std::make_shared<TWriteSession>(
- alteredSettings, client, connections, dbDriverState
- );
+ auto subSettings = settings;
+ if (settings.EventHandlers_.AcksHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use AcksHandler, resetting.");
+ subSettings.EventHandlers_.AcksHandler({});
+ }
+ if (settings.EventHandlers_.ReadyToAcceptHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use ReadyToAcceptHandler, resetting.");
+ subSettings.EventHandlers_.ReadyToAcceptHandler({});
+ }
+ if (settings.EventHandlers_.SessionClosedHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use SessionClosedHandler, resetting.");
+ subSettings.EventHandlers_.SessionClosedHandler({});
+ }
+ if (settings.EventHandlers_.CommonHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use CommonHandler, resetting.");
+ subSettings.EventHandlers_.CommonHandler({});
+ }
+ Writer = std::make_shared<TWriteSession>(subSettings, client, connections, dbDriverState);
Writer->Start(TDuration::Max());
}
@@ -86,9 +98,6 @@ ui64 TSimpleBlockingWriteSession::GetInitSeqNo() {
bool TSimpleBlockingWriteSession::Write(
TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp, const TDuration& blockTimeout
) {
- if (!IsAlive())
- return false;
-
auto continuationToken = WaitForToken(blockTimeout);
if (continuationToken.Defined()) {
Writer->Write(std::move(*continuationToken), std::move(data), seqNo, createTimestamp);
@@ -98,27 +107,33 @@ bool TSimpleBlockingWriteSession::Write(
}
TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) {
- auto startTime = TInstant::Now();
+ TInstant startTime = TInstant::Now();
TDuration remainingTime = timeout;
+
TMaybe<TContinuationToken> token = Nothing();
- while(!token.Defined() && remainingTime > TDuration::Zero()) {
- with_lock(Lock) {
- if (!ContinueTokens.empty()) {
- token = std::move(ContinueTokens.front());
- ContinueTokens.pop();
+
+ while (IsAlive() && remainingTime > TDuration::Zero()) {
+ Writer->WaitEvent().Wait(remainingTime);
+
+ for (auto event : Writer->GetEvents()) {
+ if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
+ Y_ABORT_UNLESS(token.Empty());
+ token = std::move(readyEvent->ContinuationToken);
+ } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&event)) {
+ // discard
+ } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&event)) {
+ Closed.store(true);
+ return Nothing();
}
}
- if (!IsAlive())
- return Nothing();
if (token.Defined()) {
- return std::move(*token);
- }
- else {
- remainingTime = timeout - (TInstant::Now() - startTime);
- Sleep(Min(remainingTime, TDuration::MilliSeconds(100)));
+ return token;
}
+
+ remainingTime = timeout - (TInstant::Now() - startTime);
}
+
return Nothing();
}
@@ -128,28 +143,11 @@ TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() {
bool TSimpleBlockingWriteSession::IsAlive() const {
- bool closed = false;
- with_lock(Lock) {
- closed = Closed;
- }
- return !closed;
-}
-
-void TSimpleBlockingWriteSession::HandleAck(TWriteSessionEvent::TAcksEvent& event) {
- Y_UNUSED(event);
+ return !Closed.load();
}
-void TSimpleBlockingWriteSession::HandleReady(TWriteSessionEvent::TReadyToAcceptEvent& event) {
- with_lock(Lock) {
- ContinueTokens.emplace(std::move(event.ContinuationToken));
- }
-}
-void TSimpleBlockingWriteSession::HandleClosed(const TSessionClosedEvent&) {
- with_lock(Lock) {
- Closed = true;
- }
-}
bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) {
+ Closed.store(true);
return Writer->Close(std::move(closeTimeout));
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
index 8057d4560e..4e6cad7c5d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session.h
@@ -96,9 +96,7 @@ private:
void HandleReady(TWriteSessionEvent::TReadyToAcceptEvent&);
void HandleClosed(const TSessionClosedEvent&);
- TAdaptiveLock Lock;
- std::queue<TContinuationToken> ContinueTokens;
- bool Closed = false;
+ std::atomic_bool Closed = false;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 3adf1dfeda..5a514ede8f 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -51,7 +51,6 @@ TWriteSessionImpl::TWriteSessionImpl(
, Connections(std::move(connections))
, DbDriverState(std::move(dbDriverState))
, PrevToken(DbDriverState->CredentialsProvider ? DbDriverState->CredentialsProvider->GetAuthInfo() : "")
- , EventsQueue(std::make_shared<TWriteSessionEventsQueue>(Settings))
, InitSeqNoPromise(NThreading::NewPromise<ui64>())
, WakeupInterval(
Settings.BatchFlushInterval_.GetOrElse(TDuration::Zero()) ?
@@ -77,6 +76,25 @@ TWriteSessionImpl::TWriteSessionImpl(
void TWriteSessionImpl::Start(const TDuration& delay) {
Y_ABORT_UNLESS(SelfContext);
+
+ if (!EventsQueue) {
+#define WRAP_HANDLER(type, handler, ...) \
+ if (auto h = Settings.EventHandlers_.handler##_) { \
+ Settings.EventHandlers_.handler([ctx = SelfContext, h = std::move(h)](__VA_ARGS__ type& ev){ \
+ if (auto self = ctx->LockShared()) { \
+ h(ev); \
+ } \
+ }); \
+ }
+ WRAP_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler);
+ WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler);
+ WRAP_HANDLER(TSessionClosedEvent, SessionClosedHandler, const);
+ WRAP_HANDLER(TWriteSessionEvent::TEvent, CommonHandler);
+#undef WRAP_HANDLER
+
+ EventsQueue = std::make_shared<TWriteSessionEventsQueue>(Settings);
+ }
+
++ConnectionAttemptsDone;
if (!Started) {
with_lock(Lock) {
@@ -374,7 +392,7 @@ void TWriteSessionImpl::WriteInternal(
readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
}
if (readyToAccept) {
- EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
@@ -740,7 +758,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
OnErrorResolved();
if (!FirstTokenSent) {
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
FirstTokenSent = true;
}
// Kickstart send after session reestablishment
@@ -777,7 +795,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
});
if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) {
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
//EventsQueue->PushEvent(std::move(acksEvent));
@@ -925,7 +943,7 @@ void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
memoryUsage = OnCompressedImpl(std::move(block));
}
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
- EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
}
}
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
index 8101341645..bda181433a 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
@@ -106,18 +106,20 @@ private:
struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
using TParent::TBaseHandlersVisitor::TBaseHandlersVisitor;
-#define DECLARE_HANDLER(type, handler, answer) \
- bool operator()(type& event) { \
- if (Settings.EventHandlers_.handler) { \
- Settings.EventHandlers_.handler(event); \
- return answer; \
- } \
- return false; \
- } \
+#define DECLARE_HANDLER(type, handler, answer) \
+ bool operator()(type&) { \
+ if (this->PushHandler<type>( \
+ std::move(TParent::TBaseHandlersVisitor::Event), \
+ this->Settings.EventHandlers_.handler, \
+ this->Settings.EventHandlers_.CommonHandler_)) { \
+ return answer; \
+ } \
+ return false; \
+ } \
/**/
DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true);
- DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true);
+ DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler_, true);
DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied
#undef DECLARE_HANDLER
@@ -125,7 +127,6 @@ private:
bool Visit() {
return std::visit(*this, Event);
}
-
};
bool ApplyHandler(TEventInfo& eventInfo) {
@@ -158,7 +159,8 @@ namespace NTests {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// TWriteSessionImpl
-class TWriteSessionImpl : public TEnableSelfContext<TWriteSessionImpl> {
+class TWriteSessionImpl : public TContinuationTokenIssuer,
+ public TEnableSelfContext<TWriteSessionImpl> {
private:
friend class TWriteSession;
friend class TSimpleBlockingWriteSession;
@@ -320,6 +322,10 @@ public:
TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
+ const TWriteSessionSettings& GetSettings() const {
+ return Settings;
+ }
+
~TWriteSessionImpl(); // will not call close - destroy everything without acks
private:
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
index fc42bead04..e7fe810460 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/persqueue.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/public/api/grpc/draft/ydb_persqueue_v1.grpc.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/exceptions/exceptions.h>
#include <library/cpp/monlib/dynamic_counters/counters.h>
#include <library/cpp/logger/log.h>
@@ -356,12 +357,35 @@ enum class EClusterDiscoveryMode {
Off
};
-class TContinuationToken : public TMoveOnly {
- friend class TWriteSessionImpl;
+class TContinuationToken : public TNonCopyable {
+ friend class TContinuationTokenIssuer;
+
+ bool Valid = true;
+
+public:
+ TContinuationToken& operator=(TContinuationToken&& other) {
+ if (!other.Valid) {
+ ythrow TContractViolation("Cannot move invalid token");
+ }
+ Valid = std::exchange(other.Valid, false);
+ return *this;
+ }
+
+ TContinuationToken(TContinuationToken&& other) {
+ *this = std::move(other);
+ }
+
private:
TContinuationToken() = default;
};
+class TContinuationTokenIssuer {
+protected:
+ static auto IssueContinuationToken() {
+ return TContinuationToken{};
+ }
+};
+
struct TWriterCounters : public TThrRefBase {
using TSelf = TWriterCounters;
using TPtr = TIntrusivePtr<TSelf>;
@@ -1002,10 +1026,21 @@ struct TWriteSessionEvent {
//! Indicates that a writer is ready to accept new message(s).
//! Continuation token should be kept and then used in write methods.
struct TReadyToAcceptEvent {
- TContinuationToken ContinuationToken;
+ mutable TContinuationToken ContinuationToken;
- TString DebugString() const;
+ TReadyToAcceptEvent() = delete;
+ TReadyToAcceptEvent(TContinuationToken&& t) : ContinuationToken(std::move(t)) {
+ }
+ TReadyToAcceptEvent(TReadyToAcceptEvent&&) = default;
+ TReadyToAcceptEvent(const TReadyToAcceptEvent& other) : ContinuationToken(std::move(other.ContinuationToken)) {
+ }
+ TReadyToAcceptEvent& operator=(TReadyToAcceptEvent&&) = default;
+ TReadyToAcceptEvent& operator=(const TReadyToAcceptEvent& other) {
+ ContinuationToken = std::move(other.ContinuationToken);
+ return *this;
+ }
+ TString DebugString() const;
};
using TEvent = std::variant<TAcksEvent, TReadyToAcceptEvent, TSessionClosedEvent>;
@@ -1103,7 +1138,7 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
//! Function to handle ReadyToAccept event.
//! If this handler is set, write these events will be handled by handler,
//! otherwise sent to TWriteSession::GetEvent().
- FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHander);
+ FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHandler);
//! Function to handle close session events.
//! If this handler is set, close session events will be handled by handler
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index f6884c017b..91962449f9 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -1,5 +1,7 @@
#include "write_session.h"
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/log_lazy.h>
+
namespace NYdb::NTopic {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -79,15 +81,24 @@ TSimpleBlockingWriteSession::TSimpleBlockingWriteSession(
std::shared_ptr<TGRpcConnectionsImpl> connections,
TDbDriverStatePtr dbDriverState
) {
- auto alteredSettings = settings;
- alteredSettings.EventHandlers_.AcksHandler_ = [this](TWriteSessionEvent::TAcksEvent& event) {this->HandleAck(event); };
- alteredSettings.EventHandlers_.ReadyToAcceptHander_ = [this](TWriteSessionEvent::TReadyToAcceptEvent& event)
- {this->HandleReady(event); };
- alteredSettings.EventHandlers_.SessionClosedHandler_ = [this](const TSessionClosedEvent& event) {this->HandleClosed(event); };
-
- Writer = std::make_shared<TWriteSession>(
- alteredSettings, client, connections, dbDriverState
- );
+ auto subSettings = settings;
+ if (settings.EventHandlers_.AcksHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use AcksHandler, resetting.");
+ subSettings.EventHandlers_.AcksHandler({});
+ }
+ if (settings.EventHandlers_.ReadyToAcceptHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use ReadyToAcceptHandler, resetting.");
+ subSettings.EventHandlers_.ReadyToAcceptHandler({});
+ }
+ if (settings.EventHandlers_.SessionClosedHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use SessionClosedHandler, resetting.");
+ subSettings.EventHandlers_.SessionClosedHandler({});
+ }
+ if (settings.EventHandlers_.CommonHandler_) {
+ LOG_LAZY(dbDriverState->Log, TLOG_WARNING, "TSimpleBlockingWriteSession: Cannot use CommonHandler, resetting.");
+ subSettings.EventHandlers_.CommonHandler({});
+ }
+ Writer = std::make_shared<TWriteSession>(subSettings, client, connections, dbDriverState);
Writer->Start(TDuration::Max());
}
@@ -98,23 +109,15 @@ ui64 TSimpleBlockingWriteSession::GetInitSeqNo() {
bool TSimpleBlockingWriteSession::Write(
TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp, const TDuration& blockTimeout
) {
- if (!IsAlive())
- return false;
-
- auto continuationToken = WaitForToken(blockTimeout);
- if (continuationToken.Defined()) {
- Writer->Write(std::move(*continuationToken), std::move(data), seqNo, createTimestamp);
- return true;
- }
- return false;
+ auto message = TWriteMessage(std::move(data))
+ .SeqNo(seqNo)
+ .CreateTimestamp(createTimestamp);
+ return Write(std::move(message), blockTimeout);
}
bool TSimpleBlockingWriteSession::Write(
TWriteMessage&& message, const TDuration& blockTimeout
) {
- if (!IsAlive())
- return false;
-
auto continuationToken = WaitForToken(blockTimeout);
if (continuationToken.Defined()) {
Writer->Write(std::move(*continuationToken), std::move(message));
@@ -124,27 +127,33 @@ bool TSimpleBlockingWriteSession::Write(
}
TMaybe<TContinuationToken> TSimpleBlockingWriteSession::WaitForToken(const TDuration& timeout) {
- auto startTime = TInstant::Now();
+ TInstant startTime = TInstant::Now();
TDuration remainingTime = timeout;
+
TMaybe<TContinuationToken> token = Nothing();
- while(!token.Defined() && remainingTime > TDuration::Zero()) {
- with_lock(Lock) {
- if (!ContinueTokens.empty()) {
- token = std::move(ContinueTokens.front());
- ContinueTokens.pop();
+
+ while (IsAlive() && remainingTime > TDuration::Zero()) {
+ Writer->WaitEvent().Wait(remainingTime);
+
+ for (auto event : Writer->GetEvents()) {
+ if (auto* readyEvent = std::get_if<TWriteSessionEvent::TReadyToAcceptEvent>(&event)) {
+ Y_ABORT_UNLESS(token.Empty());
+ token = std::move(readyEvent->ContinuationToken);
+ } else if (auto* ackEvent = std::get_if<TWriteSessionEvent::TAcksEvent>(&event)) {
+ // discard
+ } else if (auto* closeSessionEvent = std::get_if<TSessionClosedEvent>(&event)) {
+ Closed.store(true);
+ return Nothing();
}
}
- if (!IsAlive())
- return Nothing();
if (token.Defined()) {
- return std::move(*token);
- }
- else {
- remainingTime = timeout - (TInstant::Now() - startTime);
- Sleep(Min(remainingTime, TDuration::MilliSeconds(100)));
+ return token;
}
+
+ remainingTime = timeout - (TInstant::Now() - startTime);
}
+
return Nothing();
}
@@ -152,30 +161,12 @@ TWriterCounters::TPtr TSimpleBlockingWriteSession::GetCounters() {
return Writer->GetCounters();
}
-
bool TSimpleBlockingWriteSession::IsAlive() const {
- bool closed = false;
- with_lock(Lock) {
- closed = Closed;
- }
- return !closed;
+ return !Closed.load();
}
-void TSimpleBlockingWriteSession::HandleAck(TWriteSessionEvent::TAcksEvent& event) {
- Y_UNUSED(event);
-}
-
-void TSimpleBlockingWriteSession::HandleReady(TWriteSessionEvent::TReadyToAcceptEvent& event) {
- with_lock(Lock) {
- ContinueTokens.emplace(std::move(event.ContinuationToken));
- }
-}
-void TSimpleBlockingWriteSession::HandleClosed(const TSessionClosedEvent&) {
- with_lock(Lock) {
- Closed = true;
- }
-}
bool TSimpleBlockingWriteSession::Close(TDuration closeTimeout) {
+ Closed.store(true);
return Writer->Close(std::move(closeTimeout));
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
index e8c618ba2b..b8c725f9c9 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -9,6 +9,7 @@
#include <util/generic/buffer.h>
+#include <atomic>
namespace NYdb::NTopic {
@@ -87,9 +88,7 @@ private:
void HandleReady(TWriteSessionEvent::TReadyToAcceptEvent&);
void HandleClosed(const TSessionClosedEvent&);
- TAdaptiveLock Lock;
- std::queue<TContinuationToken> ContinueTokens;
- bool Closed = false;
+ std::atomic_bool Closed = false;
};
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 20d1484e40..c89cdf1eca 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -87,7 +87,7 @@ void TWriteSessionImpl::Start(const TDuration& delay) {
}); \
}
WRAP_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler);
- WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander);
+ WRAP_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler);
WRAP_HANDLER(TSessionClosedEvent, SessionClosedHandler, const);
WRAP_HANDLER(TWriteSessionEvent::TEvent, CommonHandler);
#undef WRAP_HANDLER
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 5c6f9c41dd..9373e9c811 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -115,7 +115,7 @@ private:
} \
/**/
DECLARE_HANDLER(TWriteSessionEvent::TAcksEvent, AcksHandler_, true);
- DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHander_, true);
+ DECLARE_HANDLER(TWriteSessionEvent::TReadyToAcceptEvent, ReadyToAcceptHandler_, true);
DECLARE_HANDLER(TSessionClosedEvent, SessionClosedHandler_, false); // Not applied
#undef DECLARE_HANDLER
@@ -355,6 +355,10 @@ public:
TWriterCounters::TPtr GetCounters() {Y_ABORT("Unimplemented"); } //ToDo - unimplemented;
+ const TWriteSessionSettings& GetSettings() const {
+ return Settings;
+ }
+
~TWriteSessionImpl(); // will not call close - destroy everything without acks
private:
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index f1080b3fb7..546483d088 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -1369,7 +1369,7 @@ struct TWriteSessionSettings : public TRequestSettings<TWriteSessionSettings> {
//! Function to handle ReadyToAccept event.
//! If this handler is set, write these events will be handled by handler,
//! otherwise sent to TWriteSession::GetEvent().
- FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHander);
+ FLUENT_SETTING(TReadyToAcceptHandler, ReadyToAcceptHandler);
//! Function to handle close session events.
//! If this handler is set, close session events will be handled by handler