aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2023-03-06 11:52:42 +0300
committeralexbogo <alexbogo@ydb.tech>2023-03-06 11:52:42 +0300
commit477afee9815c0675df743b46d486d751b88f9885 (patch)
treea1839848c7ce63fd4eb8278a395ad53b647432ae
parentb92c1faacc932ce36e23dce74b5b3d31a1cbb02c (diff)
downloadydb-477afee9815c0675df743b46d486d751b88f9885.tar.gz
[ymq] correct process of outdated infly version
init
-rw-r--r--ydb/core/ymq/actor/queue_leader.cpp45
-rw-r--r--ydb/core/ymq/actor/queue_leader.h2
2 files changed, 26 insertions, 21 deletions
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index 0b8d330405f..2d9fecad7ca 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -1701,7 +1701,7 @@ void TQueueLeader::CreateBackgroundActors() {
}
}
-void TQueueLeader::MarkInflyReloading(ui64 shard, size_t invalidatedCount, const TString& invalidationReason) {
+void TQueueLeader::MarkInflyReloading(ui64 shard, i64 invalidatedCount, const TString& invalidationReason) {
LWPROBE(InflyInvalidation, UserName_, QueueName_, shard, invalidatedCount, invalidationReason);
auto& shardInfo = Shards_[shard];
if (!shardInfo.NeedInflyReload) {
@@ -1915,35 +1915,40 @@ void TQueueLeader::OnAddedMessagesToInfly(ui64 shard, const TSqsEvents::TEvExecu
shardInfo.LastAddMessagesToInfly = TActivationContext::Now();
bool markInflyReloading = false;
- size_t inflyVersionDiff = 0;
+ i64 inflyVersionDiff = 0;
if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
using NKikimr::NClient::TValue;
const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse()));
- const TValue list = val["messages"];
- for (size_t i = 0, size = list.Size(); i < size; ++i) {
- const TValue& message = list[i];
- const TValue& delayDeadlineValue = message["DelayDeadline"];
- const ui64 delayDeadlineMs = delayDeadlineValue.HaveValue() ? ui64(delayDeadlineValue) : 0;
- const TInstant delayDeadline = TInstant::MilliSeconds(delayDeadlineMs);
- const ui64 offset = message["Offset"];
- const ui32 receiveCount = 0; // as in transaction
- LOG_SQS_TRACE("Adding message to infly struct for shard " << TLogQueueName(UserName_, QueueName_, shard) << ": { Offset: " << offset << ", DelayDeadline: " << delayDeadline << ", ReceiveCount: " << receiveCount << " }");
- shardInfo.Infly->Add(MakeHolder<TInflyMessage>(offset, message["RandomId"], delayDeadline, receiveCount));
- }
- LWPROBE(AddMessagesToInfly, UserName_, QueueName_, shard, list.Size());
- shardInfo.ReadOffset = val["readOffset"];
const ui64 currentInflyVersion = val["currentInflyVersion"];
if (shardInfo.InflyVersion != currentInflyVersion) {
- Y_VERIFY(shardInfo.InflyVersion < currentInflyVersion);
- inflyVersionDiff = currentInflyVersion - shardInfo.InflyVersion;
+ inflyVersionDiff = i64(currentInflyVersion) - shardInfo.InflyVersion;
LOG_SQS_WARN("Concurrent infly version change detected for " << TLogQueueName(UserName_, QueueName_, shard) << ". Expected "
<< shardInfo.InflyVersion << ", but got: " << currentInflyVersion << ". Mark infly for reloading");
markInflyReloading = true;
}
- shardInfo.InflyVersion = val["newInflyVersion"];
+ if (shardInfo.InflyVersion > currentInflyVersion) {
+ LOG_SQS_ERROR("Skip added messages to inflight because infly version is outdated for " << TLogQueueName(UserName_, QueueName_, shard)
+ << ". Known " << shardInfo.InflyVersion << ", got " << currentInflyVersion);
+ } else {
+ shardInfo.InflyVersion = val["newInflyVersion"];
+
+ const TValue list = val["messages"];
+ for (size_t i = 0, size = list.Size(); i < size; ++i) {
+ const TValue& message = list[i];
+ const TValue& delayDeadlineValue = message["DelayDeadline"];
+ const ui64 delayDeadlineMs = delayDeadlineValue.HaveValue() ? ui64(delayDeadlineValue) : 0;
+ const TInstant delayDeadline = TInstant::MilliSeconds(delayDeadlineMs);
+ const ui64 offset = message["Offset"];
+ const ui32 receiveCount = 0; // as in transaction
+ LOG_SQS_TRACE("Adding message to infly struct for shard " << TLogQueueName(UserName_, QueueName_, shard) << ": { Offset: " << offset << ", DelayDeadline: " << delayDeadline << ", ReceiveCount: " << receiveCount << " }");
+ shardInfo.Infly->Add(MakeHolder<TInflyMessage>(offset, message["RandomId"], delayDeadline, receiveCount));
+ }
+ LWPROBE(AddMessagesToInfly, UserName_, QueueName_, shard, list.Size());
+ shardInfo.ReadOffset = val["readOffset"];
- // Update messages count
- shardInfo.MessagesCount = static_cast<ui64>(i64(val["messagesCount"]));
+ // Update messages count
+ shardInfo.MessagesCount = static_cast<ui64>(i64(val["messagesCount"]));
+ }
} else {
LOG_SQS_ERROR("Failed to add new messages to infly for " << TLogQueueName(UserName_, QueueName_, shard) << ": " << reply);
}
diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h
index 2b6e6a00571..0f0fa172e7b 100644
--- a/ydb/core/ymq/actor/queue_leader.h
+++ b/ydb/core/ymq/actor/queue_leader.h
@@ -90,7 +90,7 @@ private:
void AnswerFailed(TSqsEvents::TEvGetConfiguration::TPtr& ev);
void AskQueueAttributes();
void OnQueueAttributes(const TSqsEvents::TEvExecuted::TRecord& ev);
- void MarkInflyReloading(ui64 shard, size_t invalidatedCount, const TString& invalidationReason);
+ void MarkInflyReloading(ui64 shard, i64 invalidatedCount, const TString& invalidationReason);
void StartLoadingInfly();
void StartLoadingInfly(ui64 shard, bool afterFailure = false);
void OnInflyLoaded(ui64 shard, const TSqsEvents::TEvExecuted::TRecord& reply);