diff options
author | alexnick <alexnick@ydb.tech> | 2023-10-20 10:07:10 +0300 |
---|---|---|
committer | alexnick <alexnick@ydb.tech> | 2023-10-20 10:51:13 +0300 |
commit | b167b93e612d7a4f28f8ece777049f4c83a51758 (patch) | |
tree | f6305a5422aba190addb1b5f548f1c6fd058e599 | |
parent | bcbb765da1b14178a1bc4597e7f2980d1edde337 (diff) | |
download | ydb-b167b93e612d7a4f28f8ece777049f4c83a51758.tar.gz |
fix for assigning seqno
fix for assigning seqno
6 files changed, 171 insertions, 128 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp index 5ed79a72e3..0aeae6d05d 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp @@ -256,18 +256,6 @@ void TWriteSessionImpl::OnCdsResponse( with_lock(Lock) { if (!InitialCluster) { InitialCluster = name; - } else if (CurrentCluster != name) { // Switched to another cluster - Y_ABORT_UNLESS(CurrentCluster); - if (name == InitialCluster) { // Returned to initial cluster, disabled SeqNo Shift - SeqNoShift = 0; - OnSeqNoShift = false; - } else { // Switched from initial cluster to second one; - Y_ABORT_UNLESS(CurrentCluster == InitialCluster); - if (AutoSeqNoMode.GetOrElse(true)) { - OnSeqNoShift = true; - } - } - } CurrentCluster = name; } @@ -313,10 +301,23 @@ TVector<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvents(bool block, TMa return EventsQueue->GetEvents(block, maxEventsCount); } -ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { +ui64 TWriteSessionImpl::GetIdImpl(ui64 seqNo) { + Y_ABORT_UNLESS(AutoSeqNoMode.Defined()); + Y_ABORT_UNLESS(!*AutoSeqNoMode || InitSeqNo.contains(CurrentCluster) && seqNo > InitSeqNo[CurrentCluster]); + return *AutoSeqNoMode ? seqNo - InitSeqNo[CurrentCluster] : seqNo; +} + +ui64 TWriteSessionImpl::GetSeqNoImpl(ui64 id) { + Y_ABORT_UNLESS(AutoSeqNoMode.Defined()); + return *AutoSeqNoMode ? id + InitSeqNo[CurrentCluster] : id; + +} + +ui64 TWriteSessionImpl::GetNextIdImpl(const TMaybe<ui64>& seqNo) { Y_ABORT_UNLESS(Lock.IsLocked()); - ui64 seqNoValue = LastSeqNo + 1; + ui64 id = ++NextId; + if (!AutoSeqNoMode.Defined()) { AutoSeqNoMode = !seqNo.Defined(); } @@ -331,11 +332,8 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { ); } else { - seqNoValue = *seqNo; + id = *seqNo; } - //! Disable SeqNo shift for manual SeqNo mode; - OnSeqNoShift = false; - SeqNoShift = 0; } else if (!(*AutoSeqNoMode)) { LOG_LAZY(DbDriverState->Log, TLOG_ERR, @@ -345,9 +343,9 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); } - LastSeqNo = seqNoValue; - return seqNoValue; + return id; } + inline void TWriteSessionImpl::CheckHandleResultImpl(THandleResult& result) { Y_ABORT_UNLESS(Lock.IsLocked()); @@ -374,7 +372,7 @@ void TWriteSessionImpl::WriteInternal( bool readyToAccept = false; size_t bufferSize = data.size(); with_lock(Lock) { - CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize); + CurrentBatch.Add(GetNextIdImpl(seqNo), createdAtValue, data, codec, originalSize); FlushWriteIfRequiredImpl(); readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk; @@ -735,13 +733,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess SessionId = initResponse.session_id(); PartitionId = initResponse.partition_id(); ui64 newLastSeqNo = initResponse.last_sequence_number(); - // SeqNo increased, so there's a risk of loss, apply SeqNo shift. - // MinUnsentSeqNo must be > 0 if anything was ever sent yet - if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { - SeqNoShift = newLastSeqNo - MinUnsentSeqNo; - } result.InitSeqNo = newLastSeqNo; - LastSeqNo = newLastSeqNo; + if (!InitSeqNo.contains(CurrentCluster)) { + InitSeqNo[CurrentCluster] = newLastSeqNo >= MinUnsentId ? newLastSeqNo - MinUnsentId + 1 : 0; + } SessionEstablished = true; LastCountersUpdateTs = TInstant::Now(); @@ -775,7 +770,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess ui64 sequenceNumber = batchWriteResponse.sequence_numbers(messageIndex); acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ - sequenceNumber - SeqNoShift, + GetIdImpl(sequenceNumber), batchWriteResponse.already_written(messageIndex) ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN: TWriteSessionEvent::TWriteAck::EES_WRITTEN, TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { @@ -785,7 +780,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat, }); - if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { + if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) { result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}}); } } @@ -803,14 +798,14 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess return result; } -bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { +bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) { bool result = false; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id); UpdateTimedCountersImpl(); - if (SentOriginalMessages.empty() || SentOriginalMessages.front().SeqNo != sequenceNumber){ + if (SentOriginalMessages.empty() || SentOriginalMessages.front().Id != id){ Cerr << "State before restart was:\n" << StateStr << "\n\n"; DumpState(); - Cerr << "State on ack with seqNo " << sequenceNumber << " is:\n"; + Cerr << "State on ack with id " << id << " is:\n"; Cerr << StateStr << "\n\n"; Y_ABORT("got unknown ack"); } @@ -818,7 +813,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { const auto& sentFront = SentOriginalMessages.front(); ui64 size = 0; ui64 compressedSize = 0; - if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { + if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == id) { auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); result = memoryUsage.NowOk && !memoryUsage.WasOk; const auto& front = SentPackedMessage.front(); @@ -847,7 +842,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { Y_ABORT_UNLESS(Counters->BytesInflightCompressed->Val() >= 0); Y_ABORT_UNLESS(Counters->BytesInflightUncompressed->Val() >= 0); - Y_ABORT_UNLESS(sentFront.SeqNo == sequenceNumber); + Y_ABORT_UNLESS(sentFront.Id == id); (*Counters->BytesInflightTotal) = MemoryUsage; SentOriginalMessages.pop(); @@ -964,7 +959,7 @@ void TWriteSessionImpl::ResetForRetryImpl() { PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); SentPackedMessage.pop(); } - ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; + ui64 minId = PackedMessagesToSend.empty() ? NextId + 1 : PackedMessagesToSend.top().Offset; std::queue<TOriginalMessage> freshOriginalMessagesToSend; OriginalMessagesToSend.swap(freshOriginalMessagesToSend); while (!SentOriginalMessages.empty()) { @@ -975,9 +970,9 @@ void TWriteSessionImpl::ResetForRetryImpl() { OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); freshOriginalMessagesToSend.pop(); } - if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) - minSeqNo = OriginalMessagesToSend.front().SeqNo; - MinUnsentSeqNo = minSeqNo; + if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().Id < minId) + minId = OriginalMessagesToSend.front().Id; + MinUnsentId = minId; Y_ABORT_UNLESS(PackedMessagesToSend.size() == totalPackedMessages); Y_ABORT_UNLESS(OriginalMessagesToSend.size() == totalOriginalMessages); } @@ -1005,8 +1000,8 @@ size_t TWriteSessionImpl::WriteBatchImpl() { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with seqNo from " - << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo + LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from " + << CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id ); Y_ABORT_UNLESS(CurrentBatch.Messages.size() <= MaxBlockMessageCount); @@ -1020,11 +1015,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() { for (size_t i = 0; i != CurrentBatch.Messages.size();) { TBlock block{}; for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { - auto sequenceNumber = CurrentBatch.Messages[i].SeqNo; + auto id = CurrentBatch.Messages[i].Id; auto createTs = CurrentBatch.Messages[i].CreatedAt; if (!block.MessageCount) { - block.Offset = sequenceNumber; + block.Offset = id; } block.MessageCount += 1; @@ -1042,7 +1037,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() { UpdateTimedCountersImpl(); (*Counters->BytesInflightUncompressed) += datum.size(); (*Counters->MessagesInflight)++; - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + OriginalMessagesToSend.emplace(id, createTs, datum.size()); } block.Data = std::move(CurrentBatch.Data); if (skipCompression) { @@ -1074,7 +1069,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() { return false; } Y_ABORT_UNLESS(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); - if (OriginalMessagesToSend.front().SeqNo > PackedMessagesToSend.top().Offset) { + if (OriginalMessagesToSend.front().Id > PackedMessagesToSend.top().Offset) { Cerr << " State before restart was:\n" << StateStr << "\n\n"; DumpState(); @@ -1082,18 +1077,18 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() { Y_ABORT("Lost original message(s)"); } - return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; + return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().Id; } void TWriteSessionImpl::DumpState() { TStringBuilder s; - s << "STATE:\nSeqNoShift = " << SeqNoShift << "\n"; + s << "STATE:\n"; auto omts = OriginalMessagesToSend; s << "OriginalMessagesToSend(" << omts.size() << "):"; ui32 i = 20; while(!omts.empty() && i-- > 0) { - s << " " << omts.front().SeqNo; + s << " " << omts.front().Id; omts.pop(); } if (!omts.empty()) s << " ..."; @@ -1103,7 +1098,7 @@ void TWriteSessionImpl::DumpState() { omts = SentOriginalMessages; i = 20; while(!omts.empty() && i-- > 0) { - s << " " << omts.front().SeqNo; + s << " " << omts.front().Id; omts.pop(); } if (omts.size() > 20) { @@ -1113,7 +1108,7 @@ void TWriteSessionImpl::DumpState() { } } while(!omts.empty()) { - s << " " << omts.front().SeqNo; + s << " " << omts.front().Id; omts.pop(); } s << "\n"; @@ -1187,7 +1182,7 @@ void TWriteSessionImpl::SendImpl() { auto& message = OriginalMessagesToSend.front(); writeRequest->add_sent_at_ms(sentAtMs); - writeRequest->add_sequence_numbers(message.SeqNo + SeqNoShift); + writeRequest->add_sequence_numbers(GetSeqNoImpl(message.Id)); writeRequest->add_message_sizes(message.Size); writeRequest->add_created_at_ms(message.CreatedAt.MilliSeconds()); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h index 9bd95ea7cd..aba702f049 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h @@ -173,13 +173,13 @@ private: using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; struct TMessage { - ui64 SeqNo; + ui64 Id; TInstant CreatedAt; TStringBuf DataRef; TMaybe<ECodec> Codec; ui32 OriginalSize; // only for coded messages - TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) - : SeqNo(seqNo) + TMessage(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0) + : Id(id) , CreatedAt(createdAt) , DataRef(data) , Codec(codec) @@ -194,11 +194,11 @@ private: TInstant StartedAt = TInstant::Zero(); bool Acquired = false; bool FlushRequested = false; - void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { + void Add(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) { if (StartedAt == TInstant::Zero()) StartedAt = TInstant::Now(); CurrentSize += codec ? originalSize : data.size(); - Messages.emplace_back(seqNo, createdAt, data, codec, originalSize); + Messages.emplace_back(id, createdAt, data, codec, originalSize); Acquired = false; } @@ -264,11 +264,11 @@ private: }; struct TOriginalMessage { - ui64 SeqNo; + ui64 Id; TInstant CreatedAt; size_t Size; - TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) - : SeqNo(sequenceNumber) + TOriginalMessage(const ui64 id, const TInstant createdAt, const size_t size) + : Id(id) , CreatedAt(createdAt) , Size(size) {} @@ -360,10 +360,12 @@ private: //TString GetDebugIdentity() const; Ydb::PersQueue::V1::StreamingWriteClientMessage GetInitClientMessage(); - bool CleanupOnAcknowledged(ui64 sequenceNumber); + bool CleanupOnAcknowledged(ui64 id); bool IsReadyToSendNextImpl(); void DumpState(); - ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); + ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo); + ui64 GetSeqNoImpl(ui64 id); + ui64 GetIdImpl(ui64 seqNo); void SendImpl(); void AbortImpl(); void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); @@ -385,7 +387,6 @@ private: TString TargetCluster; TString InitialCluster; TString CurrentCluster; - bool OnSeqNoShift = false; TString PreferredClusterByCDS; std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; TDbDriverStatePtr DbDriverState; @@ -426,9 +427,9 @@ private: TAtomic Aborting = 0; bool SessionEstablished = false; ui32 PartitionId = 0; - ui64 LastSeqNo = 0; - ui64 MinUnsentSeqNo = 0; - ui64 SeqNoShift = 0; + ui64 NextId = 0; + ui64 MinUnsentId = 1; + std::map<TString, ui64> InitSeqNo; TMaybe<bool> AutoSeqNoMode; bool ValidateSeqNoMode = false; diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp index ac81189cb0..298c574ae1 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp @@ -167,7 +167,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { auto settings = setup1->GetWriteSessionSettings(); auto& client = setup1->GetPersQueueClient(); - + //! Fill data in dc1 1 with SeqNo = 1..10 for 2 different SrcId Cerr << "===Write 10 messages into every writer\n"; for (auto i = 0; i != 10; i++) { @@ -210,7 +210,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { //! Put some data inflight. It cannot be written now, but SeqNo will be assigned. Cerr << "===Write four async message into every writer\n"; - for (auto i = 0; i != 4; i++) { + for (auto i = 0; i != 3; i++) { writer1->Write(false); writer2->Write(false); } @@ -244,7 +244,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { //! Put some data inflight again; Cerr << "===Write four async messages into writer2\n"; - for (auto i = 0; i != 4; i++) { + for (auto i = 0; i != 3; i++) { writer2->Write(false); } f2 = writer2->Write(false); @@ -285,7 +285,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { CheckSeqNo("dc1", 14); //! DC2 has no shift in SeqNo since 5 messages were written to dc 1. Cerr << "===Check SeqNo writer2 dc2\n"; - CheckSeqNo("dc2", 15); + CheckSeqNo("dc2", 9); auto readSession = client.CreateReadSession(setup1->GetReadSessionSettings()); @@ -303,7 +303,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { }; THashMap<TString, ui64> MsgCountByClusterSrc2 = { {"dc1", 14}, - {"dc2", 6} + {"dc2", 5} }; ui32 clustersPendingSrc1 = 2; ui32 clustersPendingSrc2 = 2; @@ -337,7 +337,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) { if (prevSeqNo == 0) { UNIT_ASSERT_VALUES_EQUAL(seqNo, 1); } else if (prevSeqNo == 1) { - UNIT_ASSERT_VALUES_EQUAL(seqNo, 11); + UNIT_ASSERT_VALUES_EQUAL(seqNo, 6); } else { UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1); } diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index ccc57181fb..c02e463c66 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -319,17 +319,26 @@ TVector<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvents(bool block, TMa return EventsQueue->GetEvents(block, maxEventsCount); } -ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { +ui64 TWriteSessionImpl::GetIdImpl(ui64 seqNo) { + Y_ABORT_UNLESS(AutoSeqNoMode.Defined()); + Y_ABORT_UNLESS(!*AutoSeqNoMode || InitSeqNo.Defined() && seqNo > *InitSeqNo); + return *AutoSeqNoMode ? seqNo - *InitSeqNo : seqNo; +} + +ui64 TWriteSessionImpl::GetSeqNoImpl(ui64 id) { + Y_ABORT_UNLESS(AutoSeqNoMode.Defined()); + Y_ABORT_UNLESS(InitSeqNo.Defined()); + return *AutoSeqNoMode ? id + *InitSeqNo : id; + +} + +ui64 TWriteSessionImpl::GetNextIdImpl(const TMaybe<ui64>& seqNo) { + Y_ABORT_UNLESS(Lock.IsLocked()); - ui64 seqNoValue = LastSeqNo + 1; + ui64 id = ++NextId; if (!AutoSeqNoMode.Defined()) { AutoSeqNoMode = !seqNo.Defined(); - //! Disable SeqNo shift for manual SeqNo mode; - if (seqNo.Defined()) { - OnSeqNoShift = false; - SeqNoShift = 0; - } } if (seqNo.Defined()) { if (!Settings.DeduplicationEnabled_.GetOrElse(true)) { @@ -345,7 +354,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { "Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode" ); } else { - seqNoValue = *seqNo; + id = *seqNo; } } else if (!(*AutoSeqNoMode)) { LOG_LAZY(DbDriverState->Log, @@ -356,9 +365,9 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) { "Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode" ); } - LastSeqNo = seqNoValue; - return seqNoValue; + return id; } + inline void TWriteSessionImpl::CheckHandleResultImpl(THandleResult& result) { Y_ABORT_UNLESS(Lock.IsLocked()); @@ -384,7 +393,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess size_t bufferSize = message.Data.size(); with_lock(Lock) { CurrentBatch.Add( - GetNextSeqNoImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize, + GetNextIdImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize, message.MessageMeta_, message.GetTxPtr() ); @@ -775,17 +784,13 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess SessionId = initResponse.session_id(); PartitionId = initResponse.partition_id(); ui64 newLastSeqNo = initResponse.last_seq_no(); - // SeqNo increased, so there's a risk of loss, apply SeqNo shift. - // MinUnsentSeqNo must be > 0 if anything was ever sent yet - if (Settings.DeduplicationEnabled_.GetOrElse(true)) { - if(MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) { - SeqNoShift = newLastSeqNo - MinUnsentSeqNo; - } - } else { - newLastSeqNo = 1; - } + if (!Settings.DeduplicationEnabled_.GetOrElse(true)) { + newLastSeqNo = 0; + } result.InitSeqNo = newLastSeqNo; - LastSeqNo = newLastSeqNo; + if (!InitSeqNo.Defined()) { + InitSeqNo = newLastSeqNo; + } SessionEstablished = true; LastCountersUpdateTs = TInstant::Now(); @@ -835,7 +840,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess ui64 offset = ack.has_written() ? ack.written().offset() : 0; acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{ - sequenceNumber - SeqNoShift, + GetIdImpl(sequenceNumber), msgWriteStatus, TWriteSessionEvent::TWriteAck::TWrittenMessageDetails { offset, @@ -844,7 +849,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess writeStat, }); - if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) { + if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) { result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}}); } } @@ -862,14 +867,14 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess return result; } -bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { +bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) { bool result = false; - LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber); + LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id); UpdateTimedCountersImpl(); const auto& sentFront = SentOriginalMessages.front(); ui64 size = 0; ui64 compressedSize = 0; - if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) { + if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == id) { auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size()); result = memoryUsage.NowOk && !memoryUsage.WasOk; const auto& front = SentPackedMessage.front(); @@ -898,7 +903,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) { Y_ABORT_UNLESS(Counters->BytesInflightCompressed->Val() >= 0); Y_ABORT_UNLESS(Counters->BytesInflightUncompressed->Val() >= 0); - Y_ABORT_UNLESS(sentFront.SeqNo == sequenceNumber); + Y_ABORT_UNLESS(sentFront.Id == id); (*Counters->BytesInflightTotal) = MemoryUsage; SentOriginalMessages.pop(); @@ -1013,7 +1018,7 @@ void TWriteSessionImpl::ResetForRetryImpl() { PackedMessagesToSend.emplace(std::move(SentPackedMessage.front())); SentPackedMessage.pop(); } - ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset; + ui64 minId = PackedMessagesToSend.empty() ? NextId + 1 : PackedMessagesToSend.top().Offset; std::queue<TOriginalMessage> freshOriginalMessagesToSend; OriginalMessagesToSend.swap(freshOriginalMessagesToSend); while (!SentOriginalMessages.empty()) { @@ -1024,9 +1029,9 @@ void TWriteSessionImpl::ResetForRetryImpl() { OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front())); freshOriginalMessagesToSend.pop(); } - if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo) - minSeqNo = OriginalMessagesToSend.front().SeqNo; - MinUnsentSeqNo = minSeqNo; + if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().Id < minId) + minId = OriginalMessagesToSend.front().Id; + MinUnsentId = minId; Y_ABORT_UNLESS(PackedMessagesToSend.size() == totalPackedMessages); Y_ABORT_UNLESS(OriginalMessagesToSend.size() == totalOriginalMessages); } @@ -1054,8 +1059,8 @@ size_t TWriteSessionImpl::WriteBatchImpl() { LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, - LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with seqNo from " - << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo + LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from " + << CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id ); Y_ABORT_UNLESS(CurrentBatch.Messages.size() <= MaxBlockMessageCount); @@ -1070,11 +1075,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() { TBlock block{}; for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) { auto& currMessage = CurrentBatch.Messages[i]; - auto sequenceNumber = currMessage.SeqNo; + auto id = currMessage.Id; auto createTs = currMessage.CreatedAt; if (!block.MessageCount) { - block.Offset = sequenceNumber; + block.Offset = id; } block.MessageCount += 1; @@ -1093,11 +1098,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() { (*Counters->BytesInflightUncompressed) += datum.size(); (*Counters->MessagesInflight)++; if (!currMessage.MessageMeta.empty()) { - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(), + OriginalMessagesToSend.emplace(id, createTs, datum.size(), std::move(currMessage.MessageMeta), currMessage.Tx); } else { - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(), + OriginalMessagesToSend.emplace(id, createTs, datum.size(), currMessage.Tx); } } @@ -1131,9 +1136,9 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const { return false; } Y_ABORT_UNLESS(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages"); - Y_ABORT_UNLESS(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); + Y_ABORT_UNLESS(OriginalMessagesToSend.front().Id <= PackedMessagesToSend.top().Offset, "Lost original message(s)"); - return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo; + return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().Id; } @@ -1184,7 +1189,7 @@ void TWriteSessionImpl::SendImpl() { writeRequest->mutable_tx()->set_session(message.Tx->GetSession().GetId()); } - msgData->set_seq_no(message.SeqNo + SeqNoShift); + msgData->set_seq_no(GetSeqNoImpl(message.Id)); *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); if (!message.MessageMeta.empty()) { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index 90eb3b409f..cc8da2d38d 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -163,7 +163,7 @@ private: using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor; struct TMessage { - ui64 SeqNo; + ui64 Id; TInstant CreatedAt; TStringBuf DataRef; TMaybe<ECodec> Codec; @@ -171,10 +171,10 @@ private: TVector<std::pair<TString, TString>> MessageMeta; const NTable::TTransaction* Tx; - TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, + TMessage(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {}, const NTable::TTransaction* tx = nullptr) - : SeqNo(seqNo) + : Id(id) , CreatedAt(createdAt) , DataRef(data) , Codec(codec) @@ -192,13 +192,13 @@ private: bool Acquired = false; bool FlushRequested = false; - void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, + void Add(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, const TVector<std::pair<TString, TString>>& messageMeta, const NTable::TTransaction* tx) { if (StartedAt == TInstant::Zero()) StartedAt = TInstant::Now(); CurrentSize += codec ? originalSize : data.size(); - Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta, tx); + Messages.emplace_back(id, createdAt, data, codec, originalSize, messageMeta, tx); Acquired = false; } @@ -264,24 +264,24 @@ private: }; struct TOriginalMessage { - ui64 SeqNo; + ui64 Id; TInstant CreatedAt; size_t Size; TVector<std::pair<TString, TString>> MessageMeta; const NTable::TTransaction* Tx; - TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size, + TOriginalMessage(const ui64 id, const TInstant createdAt, const size_t size, const NTable::TTransaction* tx) - : SeqNo(sequenceNumber) + : Id(id) , CreatedAt(createdAt) , Size(size) , Tx(tx) {} - TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size, + TOriginalMessage(const ui64 id, const TInstant createdAt, const size_t size, TVector<std::pair<TString, TString>>&& messageMeta, const NTable::TTransaction* tx) - : SeqNo(sequenceNumber) + : Id(id) , CreatedAt(createdAt) , Size(size) , MessageMeta(std::move(messageMeta)) @@ -389,9 +389,11 @@ private: //TString GetDebugIdentity() const; TClientMessage GetInitClientMessage(); - bool CleanupOnAcknowledged(ui64 sequenceNumber); + bool CleanupOnAcknowledged(ui64 id); bool IsReadyToSendNextImpl() const; - ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo); + ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo); + ui64 GetSeqNoImpl(ui64 id); + ui64 GetIdImpl(ui64 seqNo); void SendImpl(); void AbortImpl(); void CloseImpl(EStatus statusCode, NYql::TIssues&& issues); @@ -418,7 +420,6 @@ private: TString TargetCluster; TString InitialCluster; TString CurrentCluster; - bool OnSeqNoShift = false; TString PreferredClusterByCDS; std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory; TDbDriverStatePtr DbDriverState; @@ -461,9 +462,9 @@ private: bool SessionEstablished = false; ui32 PartitionId = 0; TPartitionLocation PreferredPartitionLocation = {}; - ui64 LastSeqNo = 0; - ui64 MinUnsentSeqNo = 0; - ui64 SeqNoShift = 0; + ui64 NextId = 0; + ui64 MinUnsentId = 1; + TMaybe<ui64> InitSeqNo; TMaybe<bool> AutoSeqNoMode; bool ValidateSeqNoMode = false; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp index 4bb5f40c76..f4d90703f2 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp @@ -137,7 +137,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) { Y_UNIT_TEST(WriteRead) { TTopicSdkTestSetup setup(TEST_CASE_NAME); TTopicClient client = setup.MakeClient(); - + { auto writeSettings = TWriteSessionSettings() .Path(TEST_TOPIC) @@ -653,6 +653,47 @@ Y_UNIT_TEST_SUITE(BasicUsage) { // UNIT_ASSERT(false); } + Y_UNIT_TEST(ConflictingWrites) { + + TTopicSdkTestSetup setup(TEST_CASE_NAME); + + NTopic::TWriteSessionSettings writeSettings; + writeSettings.Path(setup.GetTopicPath()).MessageGroupId(TEST_MESSAGE_GROUP_ID); + writeSettings.Path(setup.GetTopicPath()).ProducerId(TEST_MESSAGE_GROUP_ID); + writeSettings.Codec(NTopic::ECodec::RAW); + NTopic::IExecutor::TPtr executor = new NTopic::TSyncExecutor(); + writeSettings.CompressionExecutor(executor); + + ui64 count = 100u; + + auto client = setup.MakeClient(); + auto session = client.CreateSimpleBlockingWriteSession(writeSettings); + + TString messageBase = "message----"; + + for (auto i = 0u; i < count; i++) { + auto res = session->Write(messageBase); + UNIT_ASSERT(res); + if (i % 10 == 0) { + setup.GetServer().KillTopicPqTablets(setup.GetTopicPath()); + } + } + session->Close(); + + auto describeTopicSettings = TDescribeTopicSettings().IncludeStats(true); + auto result = client.DescribeTopic(setup.GetTopicPath(), describeTopicSettings).GetValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + auto description = result.GetTopicDescription(); + UNIT_ASSERT(description.GetPartitions().size() == 1); + auto stats = description.GetPartitions().front().GetPartitionStats(); + UNIT_ASSERT(stats.Defined()); + UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count); + + } + + + } } |