diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-03-06 11:52:42 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-03-06 11:52:42 +0300 |
commit | 477afee9815c0675df743b46d486d751b88f9885 (patch) | |
tree | a1839848c7ce63fd4eb8278a395ad53b647432ae | |
parent | b92c1faacc932ce36e23dce74b5b3d31a1cbb02c (diff) | |
download | ydb-477afee9815c0675df743b46d486d751b88f9885.tar.gz |
[ymq] correct process of outdated infly version
init
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 45 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.h | 2 |
2 files changed, 26 insertions, 21 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 0b8d330405f..2d9fecad7ca 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -1701,7 +1701,7 @@ void TQueueLeader::CreateBackgroundActors() { } } -void TQueueLeader::MarkInflyReloading(ui64 shard, size_t invalidatedCount, const TString& invalidationReason) { +void TQueueLeader::MarkInflyReloading(ui64 shard, i64 invalidatedCount, const TString& invalidationReason) { LWPROBE(InflyInvalidation, UserName_, QueueName_, shard, invalidatedCount, invalidationReason); auto& shardInfo = Shards_[shard]; if (!shardInfo.NeedInflyReload) { @@ -1915,35 +1915,40 @@ void TQueueLeader::OnAddedMessagesToInfly(ui64 shard, const TSqsEvents::TEvExecu shardInfo.LastAddMessagesToInfly = TActivationContext::Now(); bool markInflyReloading = false; - size_t inflyVersionDiff = 0; + i64 inflyVersionDiff = 0; if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { using NKikimr::NClient::TValue; const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse())); - const TValue list = val["messages"]; - for (size_t i = 0, size = list.Size(); i < size; ++i) { - const TValue& message = list[i]; - const TValue& delayDeadlineValue = message["DelayDeadline"]; - const ui64 delayDeadlineMs = delayDeadlineValue.HaveValue() ? ui64(delayDeadlineValue) : 0; - const TInstant delayDeadline = TInstant::MilliSeconds(delayDeadlineMs); - const ui64 offset = message["Offset"]; - const ui32 receiveCount = 0; // as in transaction - LOG_SQS_TRACE("Adding message to infly struct for shard " << TLogQueueName(UserName_, QueueName_, shard) << ": { Offset: " << offset << ", DelayDeadline: " << delayDeadline << ", ReceiveCount: " << receiveCount << " }"); - shardInfo.Infly->Add(MakeHolder<TInflyMessage>(offset, message["RandomId"], delayDeadline, receiveCount)); - } - LWPROBE(AddMessagesToInfly, UserName_, QueueName_, shard, list.Size()); - shardInfo.ReadOffset = val["readOffset"]; const ui64 currentInflyVersion = val["currentInflyVersion"]; if (shardInfo.InflyVersion != currentInflyVersion) { - Y_VERIFY(shardInfo.InflyVersion < currentInflyVersion); - inflyVersionDiff = currentInflyVersion - shardInfo.InflyVersion; + inflyVersionDiff = i64(currentInflyVersion) - shardInfo.InflyVersion; LOG_SQS_WARN("Concurrent infly version change detected for " << TLogQueueName(UserName_, QueueName_, shard) << ". Expected " << shardInfo.InflyVersion << ", but got: " << currentInflyVersion << ". Mark infly for reloading"); markInflyReloading = true; } - shardInfo.InflyVersion = val["newInflyVersion"]; + if (shardInfo.InflyVersion > currentInflyVersion) { + LOG_SQS_ERROR("Skip added messages to inflight because infly version is outdated for " << TLogQueueName(UserName_, QueueName_, shard) + << ". Known " << shardInfo.InflyVersion << ", got " << currentInflyVersion); + } else { + shardInfo.InflyVersion = val["newInflyVersion"]; + + const TValue list = val["messages"]; + for (size_t i = 0, size = list.Size(); i < size; ++i) { + const TValue& message = list[i]; + const TValue& delayDeadlineValue = message["DelayDeadline"]; + const ui64 delayDeadlineMs = delayDeadlineValue.HaveValue() ? ui64(delayDeadlineValue) : 0; + const TInstant delayDeadline = TInstant::MilliSeconds(delayDeadlineMs); + const ui64 offset = message["Offset"]; + const ui32 receiveCount = 0; // as in transaction + LOG_SQS_TRACE("Adding message to infly struct for shard " << TLogQueueName(UserName_, QueueName_, shard) << ": { Offset: " << offset << ", DelayDeadline: " << delayDeadline << ", ReceiveCount: " << receiveCount << " }"); + shardInfo.Infly->Add(MakeHolder<TInflyMessage>(offset, message["RandomId"], delayDeadline, receiveCount)); + } + LWPROBE(AddMessagesToInfly, UserName_, QueueName_, shard, list.Size()); + shardInfo.ReadOffset = val["readOffset"]; - // Update messages count - shardInfo.MessagesCount = static_cast<ui64>(i64(val["messagesCount"])); + // Update messages count + shardInfo.MessagesCount = static_cast<ui64>(i64(val["messagesCount"])); + } } else { LOG_SQS_ERROR("Failed to add new messages to infly for " << TLogQueueName(UserName_, QueueName_, shard) << ": " << reply); } diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h index 2b6e6a00571..0f0fa172e7b 100644 --- a/ydb/core/ymq/actor/queue_leader.h +++ b/ydb/core/ymq/actor/queue_leader.h @@ -90,7 +90,7 @@ private: void AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev); void AskQueueAttributes(); void OnQueueAttributes(const TSqsEvents::TEvExecuted::TRecord& ev); - void MarkInflyReloading(ui64 shard, size_t invalidatedCount, const TString& invalidationReason); + void MarkInflyReloading(ui64 shard, i64 invalidatedCount, const TString& invalidationReason); void StartLoadingInfly(); void StartLoadingInfly(ui64 shard, bool afterFailure = false); void OnInflyLoaded(ui64 shard, const TSqsEvents::TEvExecuted::TRecord& reply); |