aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@ydb.tech>2023-10-20 10:07:10 +0300
committeralexnick <alexnick@ydb.tech>2023-10-20 10:51:13 +0300
commitb167b93e612d7a4f28f8ece777049f4c83a51758 (patch)
treef6305a5422aba190addb1b5f548f1c6fd058e599
parentbcbb765da1b14178a1bc4597e7f2980d1edde337 (diff)
downloadydb-b167b93e612d7a4f28f8ece777049f4c83a51758.tar.gz
fix for assigning seqno
fix for assigning seqno
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp97
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h29
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp12
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp85
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h33
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp43
6 files changed, 171 insertions, 128 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
index 5ed79a72e3..0aeae6d05d 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp
@@ -256,18 +256,6 @@ void TWriteSessionImpl::OnCdsResponse(
with_lock(Lock) {
if (!InitialCluster) {
InitialCluster = name;
- } else if (CurrentCluster != name) { // Switched to another cluster
- Y_ABORT_UNLESS(CurrentCluster);
- if (name == InitialCluster) { // Returned to initial cluster, disabled SeqNo Shift
- SeqNoShift = 0;
- OnSeqNoShift = false;
- } else { // Switched from initial cluster to second one;
- Y_ABORT_UNLESS(CurrentCluster == InitialCluster);
- if (AutoSeqNoMode.GetOrElse(true)) {
- OnSeqNoShift = true;
- }
- }
-
}
CurrentCluster = name;
}
@@ -313,10 +301,23 @@ TVector<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvents(bool block, TMa
return EventsQueue->GetEvents(block, maxEventsCount);
}
-ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
+ui64 TWriteSessionImpl::GetIdImpl(ui64 seqNo) {
+ Y_ABORT_UNLESS(AutoSeqNoMode.Defined());
+ Y_ABORT_UNLESS(!*AutoSeqNoMode || InitSeqNo.contains(CurrentCluster) && seqNo > InitSeqNo[CurrentCluster]);
+ return *AutoSeqNoMode ? seqNo - InitSeqNo[CurrentCluster] : seqNo;
+}
+
+ui64 TWriteSessionImpl::GetSeqNoImpl(ui64 id) {
+ Y_ABORT_UNLESS(AutoSeqNoMode.Defined());
+ return *AutoSeqNoMode ? id + InitSeqNo[CurrentCluster] : id;
+
+}
+
+ui64 TWriteSessionImpl::GetNextIdImpl(const TMaybe<ui64>& seqNo) {
Y_ABORT_UNLESS(Lock.IsLocked());
- ui64 seqNoValue = LastSeqNo + 1;
+ ui64 id = ++NextId;
+
if (!AutoSeqNoMode.Defined()) {
AutoSeqNoMode = !seqNo.Defined();
}
@@ -331,11 +332,8 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
);
} else {
- seqNoValue = *seqNo;
+ id = *seqNo;
}
- //! Disable SeqNo shift for manual SeqNo mode;
- OnSeqNoShift = false;
- SeqNoShift = 0;
} else if (!(*AutoSeqNoMode)) {
LOG_LAZY(DbDriverState->Log,
TLOG_ERR,
@@ -345,9 +343,9 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
"Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
);
}
- LastSeqNo = seqNoValue;
- return seqNoValue;
+ return id;
}
+
inline void TWriteSessionImpl::CheckHandleResultImpl(THandleResult& result) {
Y_ABORT_UNLESS(Lock.IsLocked());
@@ -374,7 +372,7 @@ void TWriteSessionImpl::WriteInternal(
bool readyToAccept = false;
size_t bufferSize = data.size();
with_lock(Lock) {
- CurrentBatch.Add(GetNextSeqNoImpl(seqNo), createdAtValue, data, codec, originalSize);
+ CurrentBatch.Add(GetNextIdImpl(seqNo), createdAtValue, data, codec, originalSize);
FlushWriteIfRequiredImpl();
readyToAccept = OnMemoryUsageChangedImpl(bufferSize).NowOk;
@@ -735,13 +733,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
SessionId = initResponse.session_id();
PartitionId = initResponse.partition_id();
ui64 newLastSeqNo = initResponse.last_sequence_number();
- // SeqNo increased, so there's a risk of loss, apply SeqNo shift.
- // MinUnsentSeqNo must be > 0 if anything was ever sent yet
- if (MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) {
- SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
- }
result.InitSeqNo = newLastSeqNo;
- LastSeqNo = newLastSeqNo;
+ if (!InitSeqNo.contains(CurrentCluster)) {
+ InitSeqNo[CurrentCluster] = newLastSeqNo >= MinUnsentId ? newLastSeqNo - MinUnsentId + 1 : 0;
+ }
SessionEstablished = true;
LastCountersUpdateTs = TInstant::Now();
@@ -775,7 +770,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
ui64 sequenceNumber = batchWriteResponse.sequence_numbers(messageIndex);
acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{
- sequenceNumber - SeqNoShift,
+ GetIdImpl(sequenceNumber),
batchWriteResponse.already_written(messageIndex) ? TWriteSessionEvent::TWriteAck::EES_ALREADY_WRITTEN:
TWriteSessionEvent::TWriteAck::EES_WRITTEN,
TWriteSessionEvent::TWriteAck::TWrittenMessageDetails {
@@ -785,7 +780,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
writeStat,
});
- if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) {
+ if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) {
result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{TContinuationToken{}});
}
}
@@ -803,14 +798,14 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
return result;
}
-bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
+bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) {
bool result = false;
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id);
UpdateTimedCountersImpl();
- if (SentOriginalMessages.empty() || SentOriginalMessages.front().SeqNo != sequenceNumber){
+ if (SentOriginalMessages.empty() || SentOriginalMessages.front().Id != id){
Cerr << "State before restart was:\n" << StateStr << "\n\n";
DumpState();
- Cerr << "State on ack with seqNo " << sequenceNumber << " is:\n";
+ Cerr << "State on ack with id " << id << " is:\n";
Cerr << StateStr << "\n\n";
Y_ABORT("got unknown ack");
}
@@ -818,7 +813,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
const auto& sentFront = SentOriginalMessages.front();
ui64 size = 0;
ui64 compressedSize = 0;
- if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
+ if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == id) {
auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
result = memoryUsage.NowOk && !memoryUsage.WasOk;
const auto& front = SentPackedMessage.front();
@@ -847,7 +842,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
Y_ABORT_UNLESS(Counters->BytesInflightCompressed->Val() >= 0);
Y_ABORT_UNLESS(Counters->BytesInflightUncompressed->Val() >= 0);
- Y_ABORT_UNLESS(sentFront.SeqNo == sequenceNumber);
+ Y_ABORT_UNLESS(sentFront.Id == id);
(*Counters->BytesInflightTotal) = MemoryUsage;
SentOriginalMessages.pop();
@@ -964,7 +959,7 @@ void TWriteSessionImpl::ResetForRetryImpl() {
PackedMessagesToSend.emplace(std::move(SentPackedMessage.front()));
SentPackedMessage.pop();
}
- ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset;
+ ui64 minId = PackedMessagesToSend.empty() ? NextId + 1 : PackedMessagesToSend.top().Offset;
std::queue<TOriginalMessage> freshOriginalMessagesToSend;
OriginalMessagesToSend.swap(freshOriginalMessagesToSend);
while (!SentOriginalMessages.empty()) {
@@ -975,9 +970,9 @@ void TWriteSessionImpl::ResetForRetryImpl() {
OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front()));
freshOriginalMessagesToSend.pop();
}
- if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo)
- minSeqNo = OriginalMessagesToSend.front().SeqNo;
- MinUnsentSeqNo = minSeqNo;
+ if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().Id < minId)
+ minId = OriginalMessagesToSend.front().Id;
+ MinUnsentId = minId;
Y_ABORT_UNLESS(PackedMessagesToSend.size() == totalPackedMessages);
Y_ABORT_UNLESS(OriginalMessagesToSend.size() == totalOriginalMessages);
}
@@ -1005,8 +1000,8 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with seqNo from "
- << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo
+ LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from "
+ << CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id
);
Y_ABORT_UNLESS(CurrentBatch.Messages.size() <= MaxBlockMessageCount);
@@ -1020,11 +1015,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
for (size_t i = 0; i != CurrentBatch.Messages.size();) {
TBlock block{};
for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) {
- auto sequenceNumber = CurrentBatch.Messages[i].SeqNo;
+ auto id = CurrentBatch.Messages[i].Id;
auto createTs = CurrentBatch.Messages[i].CreatedAt;
if (!block.MessageCount) {
- block.Offset = sequenceNumber;
+ block.Offset = id;
}
block.MessageCount += 1;
@@ -1042,7 +1037,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
UpdateTimedCountersImpl();
(*Counters->BytesInflightUncompressed) += datum.size();
(*Counters->MessagesInflight)++;
- OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size());
+ OriginalMessagesToSend.emplace(id, createTs, datum.size());
}
block.Data = std::move(CurrentBatch.Data);
if (skipCompression) {
@@ -1074,7 +1069,7 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() {
return false;
}
Y_ABORT_UNLESS(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages");
- if (OriginalMessagesToSend.front().SeqNo > PackedMessagesToSend.top().Offset) {
+ if (OriginalMessagesToSend.front().Id > PackedMessagesToSend.top().Offset) {
Cerr << " State before restart was:\n" << StateStr << "\n\n";
DumpState();
@@ -1082,18 +1077,18 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() {
Y_ABORT("Lost original message(s)");
}
- return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo;
+ return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().Id;
}
void TWriteSessionImpl::DumpState() {
TStringBuilder s;
- s << "STATE:\nSeqNoShift = " << SeqNoShift << "\n";
+ s << "STATE:\n";
auto omts = OriginalMessagesToSend;
s << "OriginalMessagesToSend(" << omts.size() << "):";
ui32 i = 20;
while(!omts.empty() && i-- > 0) {
- s << " " << omts.front().SeqNo;
+ s << " " << omts.front().Id;
omts.pop();
}
if (!omts.empty()) s << " ...";
@@ -1103,7 +1098,7 @@ void TWriteSessionImpl::DumpState() {
omts = SentOriginalMessages;
i = 20;
while(!omts.empty() && i-- > 0) {
- s << " " << omts.front().SeqNo;
+ s << " " << omts.front().Id;
omts.pop();
}
if (omts.size() > 20) {
@@ -1113,7 +1108,7 @@ void TWriteSessionImpl::DumpState() {
}
}
while(!omts.empty()) {
- s << " " << omts.front().SeqNo;
+ s << " " << omts.front().Id;
omts.pop();
}
s << "\n";
@@ -1187,7 +1182,7 @@ void TWriteSessionImpl::SendImpl() {
auto& message = OriginalMessagesToSend.front();
writeRequest->add_sent_at_ms(sentAtMs);
- writeRequest->add_sequence_numbers(message.SeqNo + SeqNoShift);
+ writeRequest->add_sequence_numbers(GetSeqNoImpl(message.Id));
writeRequest->add_message_sizes(message.Size);
writeRequest->add_created_at_ms(message.CreatedAt.MilliSeconds());
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
index 9bd95ea7cd..aba702f049 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.h
@@ -173,13 +173,13 @@ private:
using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor;
struct TMessage {
- ui64 SeqNo;
+ ui64 Id;
TInstant CreatedAt;
TStringBuf DataRef;
TMaybe<ECodec> Codec;
ui32 OriginalSize; // only for coded messages
- TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0)
- : SeqNo(seqNo)
+ TMessage(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, ui32 originalSize = 0)
+ : Id(id)
, CreatedAt(createdAt)
, DataRef(data)
, Codec(codec)
@@ -194,11 +194,11 @@ private:
TInstant StartedAt = TInstant::Zero();
bool Acquired = false;
bool FlushRequested = false;
- void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) {
+ void Add(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize) {
if (StartedAt == TInstant::Zero())
StartedAt = TInstant::Now();
CurrentSize += codec ? originalSize : data.size();
- Messages.emplace_back(seqNo, createdAt, data, codec, originalSize);
+ Messages.emplace_back(id, createdAt, data, codec, originalSize);
Acquired = false;
}
@@ -264,11 +264,11 @@ private:
};
struct TOriginalMessage {
- ui64 SeqNo;
+ ui64 Id;
TInstant CreatedAt;
size_t Size;
- TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size)
- : SeqNo(sequenceNumber)
+ TOriginalMessage(const ui64 id, const TInstant createdAt, const size_t size)
+ : Id(id)
, CreatedAt(createdAt)
, Size(size)
{}
@@ -360,10 +360,12 @@ private:
//TString GetDebugIdentity() const;
Ydb::PersQueue::V1::StreamingWriteClientMessage GetInitClientMessage();
- bool CleanupOnAcknowledged(ui64 sequenceNumber);
+ bool CleanupOnAcknowledged(ui64 id);
bool IsReadyToSendNextImpl();
void DumpState();
- ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo);
+ ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
+ ui64 GetSeqNoImpl(ui64 id);
+ ui64 GetIdImpl(ui64 seqNo);
void SendImpl();
void AbortImpl();
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
@@ -385,7 +387,6 @@ private:
TString TargetCluster;
TString InitialCluster;
TString CurrentCluster;
- bool OnSeqNoShift = false;
TString PreferredClusterByCDS;
std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory;
TDbDriverStatePtr DbDriverState;
@@ -426,9 +427,9 @@ private:
TAtomic Aborting = 0;
bool SessionEstablished = false;
ui32 PartitionId = 0;
- ui64 LastSeqNo = 0;
- ui64 MinUnsentSeqNo = 0;
- ui64 SeqNoShift = 0;
+ ui64 NextId = 0;
+ ui64 MinUnsentId = 1;
+ std::map<TString, ui64> InitSeqNo;
TMaybe<bool> AutoSeqNoMode;
bool ValidateSeqNoMode = false;
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
index ac81189cb0..298c574ae1 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/retry_policy_ut.cpp
@@ -167,7 +167,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
auto settings = setup1->GetWriteSessionSettings();
auto& client = setup1->GetPersQueueClient();
-
+
//! Fill data in dc1 1 with SeqNo = 1..10 for 2 different SrcId
Cerr << "===Write 10 messages into every writer\n";
for (auto i = 0; i != 10; i++) {
@@ -210,7 +210,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
//! Put some data inflight. It cannot be written now, but SeqNo will be assigned.
Cerr << "===Write four async message into every writer\n";
- for (auto i = 0; i != 4; i++) {
+ for (auto i = 0; i != 3; i++) {
writer1->Write(false);
writer2->Write(false);
}
@@ -244,7 +244,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
//! Put some data inflight again;
Cerr << "===Write four async messages into writer2\n";
- for (auto i = 0; i != 4; i++) {
+ for (auto i = 0; i != 3; i++) {
writer2->Write(false);
}
f2 = writer2->Write(false);
@@ -285,7 +285,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
CheckSeqNo("dc1", 14);
//! DC2 has no shift in SeqNo since 5 messages were written to dc 1.
Cerr << "===Check SeqNo writer2 dc2\n";
- CheckSeqNo("dc2", 15);
+ CheckSeqNo("dc2", 9);
auto readSession = client.CreateReadSession(setup1->GetReadSessionSettings());
@@ -303,7 +303,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
};
THashMap<TString, ui64> MsgCountByClusterSrc2 = {
{"dc1", 14},
- {"dc2", 6}
+ {"dc2", 5}
};
ui32 clustersPendingSrc1 = 2;
ui32 clustersPendingSrc2 = 2;
@@ -337,7 +337,7 @@ Y_UNIT_TEST_SUITE(RetryPolicy) {
if (prevSeqNo == 0) {
UNIT_ASSERT_VALUES_EQUAL(seqNo, 1);
} else if (prevSeqNo == 1) {
- UNIT_ASSERT_VALUES_EQUAL(seqNo, 11);
+ UNIT_ASSERT_VALUES_EQUAL(seqNo, 6);
} else {
UNIT_ASSERT_VALUES_EQUAL(seqNo, prevSeqNo + 1);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index ccc57181fb..c02e463c66 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -319,17 +319,26 @@ TVector<TWriteSessionEvent::TEvent> TWriteSessionImpl::GetEvents(bool block, TMa
return EventsQueue->GetEvents(block, maxEventsCount);
}
-ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
+ui64 TWriteSessionImpl::GetIdImpl(ui64 seqNo) {
+ Y_ABORT_UNLESS(AutoSeqNoMode.Defined());
+ Y_ABORT_UNLESS(!*AutoSeqNoMode || InitSeqNo.Defined() && seqNo > *InitSeqNo);
+ return *AutoSeqNoMode ? seqNo - *InitSeqNo : seqNo;
+}
+
+ui64 TWriteSessionImpl::GetSeqNoImpl(ui64 id) {
+ Y_ABORT_UNLESS(AutoSeqNoMode.Defined());
+ Y_ABORT_UNLESS(InitSeqNo.Defined());
+ return *AutoSeqNoMode ? id + *InitSeqNo : id;
+
+}
+
+ui64 TWriteSessionImpl::GetNextIdImpl(const TMaybe<ui64>& seqNo) {
+
Y_ABORT_UNLESS(Lock.IsLocked());
- ui64 seqNoValue = LastSeqNo + 1;
+ ui64 id = ++NextId;
if (!AutoSeqNoMode.Defined()) {
AutoSeqNoMode = !seqNo.Defined();
- //! Disable SeqNo shift for manual SeqNo mode;
- if (seqNo.Defined()) {
- OnSeqNoShift = false;
- SeqNoShift = 0;
- }
}
if (seqNo.Defined()) {
if (!Settings.DeduplicationEnabled_.GetOrElse(true)) {
@@ -345,7 +354,7 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
"Cannot call write() with defined SeqNo on WriteSession running in auto-seqNo mode"
);
} else {
- seqNoValue = *seqNo;
+ id = *seqNo;
}
} else if (!(*AutoSeqNoMode)) {
LOG_LAZY(DbDriverState->Log,
@@ -356,9 +365,9 @@ ui64 TWriteSessionImpl::GetNextSeqNoImpl(const TMaybe<ui64>& seqNo) {
"Cannot call write() without defined SeqNo on WriteSession running in manual-seqNo mode"
);
}
- LastSeqNo = seqNoValue;
- return seqNoValue;
+ return id;
}
+
inline void TWriteSessionImpl::CheckHandleResultImpl(THandleResult& result) {
Y_ABORT_UNLESS(Lock.IsLocked());
@@ -384,7 +393,7 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
size_t bufferSize = message.Data.size();
with_lock(Lock) {
CurrentBatch.Add(
- GetNextSeqNoImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize,
+ GetNextIdImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize,
message.MessageMeta_,
message.GetTxPtr()
);
@@ -775,17 +784,13 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
SessionId = initResponse.session_id();
PartitionId = initResponse.partition_id();
ui64 newLastSeqNo = initResponse.last_seq_no();
- // SeqNo increased, so there's a risk of loss, apply SeqNo shift.
- // MinUnsentSeqNo must be > 0 if anything was ever sent yet
- if (Settings.DeduplicationEnabled_.GetOrElse(true)) {
- if(MinUnsentSeqNo && OnSeqNoShift && newLastSeqNo > MinUnsentSeqNo) {
- SeqNoShift = newLastSeqNo - MinUnsentSeqNo;
- }
- } else {
- newLastSeqNo = 1;
- }
+ if (!Settings.DeduplicationEnabled_.GetOrElse(true)) {
+ newLastSeqNo = 0;
+ }
result.InitSeqNo = newLastSeqNo;
- LastSeqNo = newLastSeqNo;
+ if (!InitSeqNo.Defined()) {
+ InitSeqNo = newLastSeqNo;
+ }
SessionEstablished = true;
LastCountersUpdateTs = TInstant::Now();
@@ -835,7 +840,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
ui64 offset = ack.has_written() ? ack.written().offset() : 0;
acksEvent.Acks.push_back(TWriteSessionEvent::TWriteAck{
- sequenceNumber - SeqNoShift,
+ GetIdImpl(sequenceNumber),
msgWriteStatus,
TWriteSessionEvent::TWriteAck::TWrittenMessageDetails {
offset,
@@ -844,7 +849,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
writeStat,
});
- if (CleanupOnAcknowledged(sequenceNumber - SeqNoShift)) {
+ if (CleanupOnAcknowledged(GetIdImpl(sequenceNumber))) {
result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{{}, TContinuationToken{}});
}
}
@@ -862,14 +867,14 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
return result;
}
-bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
+bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 id) {
bool result = false;
- LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << sequenceNumber);
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: acknoledged message " << id);
UpdateTimedCountersImpl();
const auto& sentFront = SentOriginalMessages.front();
ui64 size = 0;
ui64 compressedSize = 0;
- if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == sequenceNumber) {
+ if(!SentPackedMessage.empty() && SentPackedMessage.front().Offset == id) {
auto memoryUsage = OnMemoryUsageChangedImpl(-SentPackedMessage.front().Data.size());
result = memoryUsage.NowOk && !memoryUsage.WasOk;
const auto& front = SentPackedMessage.front();
@@ -898,7 +903,7 @@ bool TWriteSessionImpl::CleanupOnAcknowledged(ui64 sequenceNumber) {
Y_ABORT_UNLESS(Counters->BytesInflightCompressed->Val() >= 0);
Y_ABORT_UNLESS(Counters->BytesInflightUncompressed->Val() >= 0);
- Y_ABORT_UNLESS(sentFront.SeqNo == sequenceNumber);
+ Y_ABORT_UNLESS(sentFront.Id == id);
(*Counters->BytesInflightTotal) = MemoryUsage;
SentOriginalMessages.pop();
@@ -1013,7 +1018,7 @@ void TWriteSessionImpl::ResetForRetryImpl() {
PackedMessagesToSend.emplace(std::move(SentPackedMessage.front()));
SentPackedMessage.pop();
}
- ui64 minSeqNo = PackedMessagesToSend.empty() ? LastSeqNo + 1 : PackedMessagesToSend.top().Offset;
+ ui64 minId = PackedMessagesToSend.empty() ? NextId + 1 : PackedMessagesToSend.top().Offset;
std::queue<TOriginalMessage> freshOriginalMessagesToSend;
OriginalMessagesToSend.swap(freshOriginalMessagesToSend);
while (!SentOriginalMessages.empty()) {
@@ -1024,9 +1029,9 @@ void TWriteSessionImpl::ResetForRetryImpl() {
OriginalMessagesToSend.emplace(std::move(freshOriginalMessagesToSend.front()));
freshOriginalMessagesToSend.pop();
}
- if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().SeqNo < minSeqNo)
- minSeqNo = OriginalMessagesToSend.front().SeqNo;
- MinUnsentSeqNo = minSeqNo;
+ if (!OriginalMessagesToSend.empty() && OriginalMessagesToSend.front().Id < minId)
+ minId = OriginalMessagesToSend.front().Id;
+ MinUnsentId = minId;
Y_ABORT_UNLESS(PackedMessagesToSend.size() == totalPackedMessages);
Y_ABORT_UNLESS(OriginalMessagesToSend.size() == totalOriginalMessages);
}
@@ -1054,8 +1059,8 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
LOG_LAZY(DbDriverState->Log,
TLOG_DEBUG,
- LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with seqNo from "
- << CurrentBatch.Messages.begin()->SeqNo << " to " << CurrentBatch.Messages.back().SeqNo
+ LogPrefix() << "Write " << CurrentBatch.Messages.size() << " messages with Id from "
+ << CurrentBatch.Messages.begin()->Id << " to " << CurrentBatch.Messages.back().Id
);
Y_ABORT_UNLESS(CurrentBatch.Messages.size() <= MaxBlockMessageCount);
@@ -1070,11 +1075,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
TBlock block{};
for (; block.OriginalSize < MaxBlockSize && i != CurrentBatch.Messages.size(); ++i) {
auto& currMessage = CurrentBatch.Messages[i];
- auto sequenceNumber = currMessage.SeqNo;
+ auto id = currMessage.Id;
auto createTs = currMessage.CreatedAt;
if (!block.MessageCount) {
- block.Offset = sequenceNumber;
+ block.Offset = id;
}
block.MessageCount += 1;
@@ -1093,11 +1098,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
(*Counters->BytesInflightUncompressed) += datum.size();
(*Counters->MessagesInflight)++;
if (!currMessage.MessageMeta.empty()) {
- OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(),
+ OriginalMessagesToSend.emplace(id, createTs, datum.size(),
std::move(currMessage.MessageMeta),
currMessage.Tx);
} else {
- OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(),
+ OriginalMessagesToSend.emplace(id, createTs, datum.size(),
currMessage.Tx);
}
}
@@ -1131,9 +1136,9 @@ bool TWriteSessionImpl::IsReadyToSendNextImpl() const {
return false;
}
Y_ABORT_UNLESS(!OriginalMessagesToSend.empty(), "There are packed messages but no original messages");
- Y_ABORT_UNLESS(OriginalMessagesToSend.front().SeqNo <= PackedMessagesToSend.top().Offset, "Lost original message(s)");
+ Y_ABORT_UNLESS(OriginalMessagesToSend.front().Id <= PackedMessagesToSend.top().Offset, "Lost original message(s)");
- return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().SeqNo;
+ return PackedMessagesToSend.top().Offset == OriginalMessagesToSend.front().Id;
}
@@ -1184,7 +1189,7 @@ void TWriteSessionImpl::SendImpl() {
writeRequest->mutable_tx()->set_session(message.Tx->GetSession().GetId());
}
- msgData->set_seq_no(message.SeqNo + SeqNoShift);
+ msgData->set_seq_no(GetSeqNoImpl(message.Id));
*msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
if (!message.MessageMeta.empty()) {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index 90eb3b409f..cc8da2d38d 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -163,7 +163,7 @@ private:
using IProcessor = IWriteSessionConnectionProcessorFactory::IProcessor;
struct TMessage {
- ui64 SeqNo;
+ ui64 Id;
TInstant CreatedAt;
TStringBuf DataRef;
TMaybe<ECodec> Codec;
@@ -171,10 +171,10 @@ private:
TVector<std::pair<TString, TString>> MessageMeta;
const NTable::TTransaction* Tx;
- TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {},
+ TMessage(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {},
ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {},
const NTable::TTransaction* tx = nullptr)
- : SeqNo(seqNo)
+ : Id(id)
, CreatedAt(createdAt)
, DataRef(data)
, Codec(codec)
@@ -192,13 +192,13 @@ private:
bool Acquired = false;
bool FlushRequested = false;
- void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
+ void Add(ui64 id, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
const TVector<std::pair<TString, TString>>& messageMeta,
const NTable::TTransaction* tx) {
if (StartedAt == TInstant::Zero())
StartedAt = TInstant::Now();
CurrentSize += codec ? originalSize : data.size();
- Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta, tx);
+ Messages.emplace_back(id, createdAt, data, codec, originalSize, messageMeta, tx);
Acquired = false;
}
@@ -264,24 +264,24 @@ private:
};
struct TOriginalMessage {
- ui64 SeqNo;
+ ui64 Id;
TInstant CreatedAt;
size_t Size;
TVector<std::pair<TString, TString>> MessageMeta;
const NTable::TTransaction* Tx;
- TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size,
+ TOriginalMessage(const ui64 id, const TInstant createdAt, const size_t size,
const NTable::TTransaction* tx)
- : SeqNo(sequenceNumber)
+ : Id(id)
, CreatedAt(createdAt)
, Size(size)
, Tx(tx)
{}
- TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size,
+ TOriginalMessage(const ui64 id, const TInstant createdAt, const size_t size,
TVector<std::pair<TString, TString>>&& messageMeta,
const NTable::TTransaction* tx)
- : SeqNo(sequenceNumber)
+ : Id(id)
, CreatedAt(createdAt)
, Size(size)
, MessageMeta(std::move(messageMeta))
@@ -389,9 +389,11 @@ private:
//TString GetDebugIdentity() const;
TClientMessage GetInitClientMessage();
- bool CleanupOnAcknowledged(ui64 sequenceNumber);
+ bool CleanupOnAcknowledged(ui64 id);
bool IsReadyToSendNextImpl() const;
- ui64 GetNextSeqNoImpl(const TMaybe<ui64>& seqNo);
+ ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
+ ui64 GetSeqNoImpl(ui64 id);
+ ui64 GetIdImpl(ui64 seqNo);
void SendImpl();
void AbortImpl();
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
@@ -418,7 +420,6 @@ private:
TString TargetCluster;
TString InitialCluster;
TString CurrentCluster;
- bool OnSeqNoShift = false;
TString PreferredClusterByCDS;
std::shared_ptr<IWriteSessionConnectionProcessorFactory> ConnectionFactory;
TDbDriverStatePtr DbDriverState;
@@ -461,9 +462,9 @@ private:
bool SessionEstablished = false;
ui32 PartitionId = 0;
TPartitionLocation PreferredPartitionLocation = {};
- ui64 LastSeqNo = 0;
- ui64 MinUnsentSeqNo = 0;
- ui64 SeqNoShift = 0;
+ ui64 NextId = 0;
+ ui64 MinUnsentId = 1;
+ TMaybe<ui64> InitSeqNo;
TMaybe<bool> AutoSeqNoMode;
bool ValidateSeqNoMode = false;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
index 4bb5f40c76..f4d90703f2 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
@@ -137,7 +137,7 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
Y_UNIT_TEST(WriteRead) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
TTopicClient client = setup.MakeClient();
-
+
{
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
@@ -653,6 +653,47 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
// UNIT_ASSERT(false);
}
+ Y_UNIT_TEST(ConflictingWrites) {
+
+ TTopicSdkTestSetup setup(TEST_CASE_NAME);
+
+ NTopic::TWriteSessionSettings writeSettings;
+ writeSettings.Path(setup.GetTopicPath()).MessageGroupId(TEST_MESSAGE_GROUP_ID);
+ writeSettings.Path(setup.GetTopicPath()).ProducerId(TEST_MESSAGE_GROUP_ID);
+ writeSettings.Codec(NTopic::ECodec::RAW);
+ NTopic::IExecutor::TPtr executor = new NTopic::TSyncExecutor();
+ writeSettings.CompressionExecutor(executor);
+
+ ui64 count = 100u;
+
+ auto client = setup.MakeClient();
+ auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+
+ TString messageBase = "message----";
+
+ for (auto i = 0u; i < count; i++) {
+ auto res = session->Write(messageBase);
+ UNIT_ASSERT(res);
+ if (i % 10 == 0) {
+ setup.GetServer().KillTopicPqTablets(setup.GetTopicPath());
+ }
+ }
+ session->Close();
+
+ auto describeTopicSettings = TDescribeTopicSettings().IncludeStats(true);
+ auto result = client.DescribeTopic(setup.GetTopicPath(), describeTopicSettings).GetValueSync();
+ UNIT_ASSERT(result.IsSuccess());
+
+ auto description = result.GetTopicDescription();
+ UNIT_ASSERT(description.GetPartitions().size() == 1);
+ auto stats = description.GetPartitions().front().GetPartitionStats();
+ UNIT_ASSERT(stats.Defined());
+ UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);
+
+ }
+
+
+
}
}