aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-03-23 18:13:29 +0300
committeralexnick <alexnick@ydb.tech>2023-03-23 18:13:29 +0300
commit6321e25841fc751c1dd05a9ea1c65d00b712c2d6 (patch)
treeb8476344147892d581a69b2030c0d3adaae01991
parent06cfc1eb303f9de45c4a85938bf5782e05c4d0d1 (diff)
downloadydb-6321e25841fc751c1dd05a9ea1c65d00b712c2d6.tar.gz
fix for several continuation tokens
fix for double continuation token
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h1
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h1
4 files changed, 10 insertions, 6 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 338a71fab6f..6cf20645068 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -741,8 +741,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
SessionStartedTs = TInstant::Now();
OnErrorResolved();
- //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ if (!FirstTokenSent) {
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ FirstTokenSent = true;
+ }
// Kickstart send after session reestablishment
SendImpl();
break;
@@ -804,7 +806,6 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
result = memoryUsage.NowOk && !memoryUsage.WasOk;
- //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
const auto& front = SentPackedMessage.front();
if (front.Compressed) {
compressedSize = front.Data.size();
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
index c806caa9729..3b25b28d9c8 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
@@ -412,6 +412,7 @@ private:
IExecutor::TPtr Executor;
IExecutor::TPtr CompressionExecutor;
size_t MemoryUsage = 0; //!< Estimated amount of memory used
+ bool FirstTokenSent = false;
TMessageBatch CurrentBatch;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index 3af50d9986d..21b97070eaa 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -595,8 +595,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
SessionStartedTs = TInstant::Now();
OnErrorResolved();
- //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
- result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ if (!FirstTokenSent) {
+ result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
+ FirstTokenSent = true;
+ }
// Kickstart send after session reestablishment
SendImpl();
break;
@@ -673,7 +675,6 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
result = memoryUsage.NowOk && !memoryUsage.WasOk;
- //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
const auto& front = SentPackedMessage.front();
if (front.Compressed) {
compressedSize = front.Data.size();
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index b9e5ae72eed..e23d14e8a1b 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -401,6 +401,7 @@ private:
IExecutor::TPtr Executor;
IExecutor::TPtr CompressionExecutor;
size_t MemoryUsage = 0; //!< Estimated amount of memory used
+ bool FirstTokenSent = false;
TMessageBatch CurrentBatch;