diff options
author | alexnick <alexnick@ydb.tech> | 2023-03-23 18:13:29 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-03-23 18:13:29 +0300 |
commit | 6321e25841fc751c1dd05a9ea1c65d00b712c2d6 (patch) | |
tree | b8476344147892d581a69b2030c0d3adaae01991 | |
parent | 06cfc1eb303f9de45c4a85938bf5782e05c4d0d1 (diff) | |
download | ydb-6321e25841fc751c1dd05a9ea1c65d00b712c2d6.tar.gz |
fix for several continuation tokens
fix for double continuation token
4 files changed, 10 insertions, 6 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 338a71fab6f..6cf20645068 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -741,8 +741,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess SessionStartedTs = TInstant::Now(); OnErrorResolved(); - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + if (!FirstTokenSent) { + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + FirstTokenSent = true; + } // Kickstart send after session reestablishment SendImpl(); break; @@ -804,7 +806,6 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); result = memoryUsage.NowOk && !memoryUsage.WasOk; - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); const auto& front = SentPackedMessage.front(); if (front.Compressed) { compressedSize = front.Data.size(); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h index c806caa9729..3b25b28d9c8 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -412,6 +412,7 @@ private: IExecutor::TPtr Executor; IExecutor::TPtr CompressionExecutor; size_t MemoryUsage = 0; //!< Estimated amount of memory used + bool FirstTokenSent = false; TMessageBatch CurrentBatch; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index 3af50d9986d..21b97070eaa 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -595,8 +595,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess SessionStartedTs = TInstant::Now(); OnErrorResolved(); - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); - result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + if (!FirstTokenSent) { + result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); + FirstTokenSent = true; + } // Kickstart send after session reestablishment SendImpl(); break; @@ -673,7 +675,6 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); result = memoryUsage.NowOk && !memoryUsage.WasOk; - //EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); const auto& front = SentPackedMessage.front(); if (front.Compressed) { compressedSize = front.Data.size(); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index b9e5ae72eed..e23d14e8a1b 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -401,6 +401,7 @@ private: IExecutor::TPtr Executor; IExecutor::TPtr CompressionExecutor; size_t MemoryUsage = 0; //!< Estimated amount of memory used + bool FirstTokenSent = false; TMessageBatch CurrentBatch; |