author     alexbogo <alexbogo@ydb.tech>  2023-03-21 12:45:15 +0300
committer  alexbogo <alexbogo@ydb.tech>  2023-03-21 12:45:15 +0300
commit     6c583eeffc50a6ee9277f24d25c5705ea9b9ccd7 (patch)
tree       bdd6fd8ad53c36f052a718c4e4b2d335ad10928a
parent     1cf6f6a03c1e2d29102a6482c3444a106e1d1ab0 (diff)
download   ydb-6c583eeffc50a6ee9277f24d25c5705ea9b9ccd7.tar.gz
[ymq] store oldest message info in state actual
init
-rw-r--r--  ydb/core/ymq/actor/events.h                 |  22
-rw-r--r--  ydb/core/ymq/actor/node_tracker.cpp         |  20
-rw-r--r--  ydb/core/ymq/actor/node_tracker.h           |   2
-rw-r--r--  ydb/core/ymq/actor/purge.cpp                |  16
-rw-r--r--  ydb/core/ymq/actor/purge.h                  |   1
-rw-r--r--  ydb/core/ymq/actor/queue_leader.cpp         | 221
-rw-r--r--  ydb/core/ymq/actor/queue_leader.h           |  22
-rw-r--r--  ydb/core/ymq/actor/retention.cpp            |  13
-rw-r--r--  ydb/core/ymq/actor/retention.h              |   2
-rw-r--r--  ydb/core/ymq/actor/service.cpp              |  32
-rw-r--r--  ydb/core/ymq/actor/user_settings_names.cpp  |   1
-rw-r--r--  ydb/core/ymq/actor/user_settings_names.h    |   1
-rw-r--r--  ydb/core/ymq/base/query_id.h                |   1
-rw-r--r--  ydb/core/ymq/queues/fifo/queries.cpp        |  31
-rw-r--r--  ydb/core/ymq/queues/std/queries.cpp         |  30
15 files changed, 363 insertions, 52 deletions
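
The core of the patch is a cheaper state-refresh path for queue leaders: when CPU optimization is enabled, the fixed BackgroundMetricsUpdateTimeMs wakeup is replaced by a chain of TEvForceReloadState events whose wait period doubles with random jitter, starting from STATE_UPDATE_PERIOD_MIN (200 ms) and stopping once it reaches STATE_UPDATE_PERIOD_MAX (1 minute); a fresh TEvForceReloadState (for example, sent by the service when a leader pipe disconnect is detected) restarts the chain. The standalone sketch below models just that backoff schedule with std::chrono instead of TDuration; it is an illustration of GetNextReloadStateWaitPeriod from queue_leader.cpp below, not code from the patch.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <random>

using namespace std::chrono;

// Simplified model of GetNextReloadStateWaitPeriod: double the previous wait
// (at least the minimum), add up to 50% jitter, cap at the maximum, and stop
// rescheduling once the maximum has been reached.
constexpr milliseconds kMinPeriod{200};    // STATE_UPDATE_PERIOD_MIN
constexpr milliseconds kMaxPeriod{60000};  // STATE_UPDATE_PERIOD_MAX

milliseconds NextReloadStateWaitPeriod(milliseconds current, std::mt19937_64& rng) {
    if (current == kMaxPeriod) {
        return milliseconds::max();  // sentinel: do not schedule another reload
    }
    milliseconds next = 2 * std::max(current, kMinPeriod);
    std::uniform_int_distribution<std::int64_t> jitter(0, next.count() / 2);
    next += milliseconds{jitter(rng)};
    return std::min(kMaxPeriod, next);
}

int main() {
    std::mt19937_64 rng{42};
    milliseconds period{0};
    while (true) {
        period = NextReloadStateWaitPeriod(period, rng);
        if (period == milliseconds::max()) {
            break;  // schedule exhausted after the 1-minute cap
        }
        std::cout << period.count() << " ms\n";  // ~400, ~900, ~2000, ... up to 60000
    }
}
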
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h
index 540b314b3ee..93744c74934 100644
--- a/ydb/core/ymq/actor/events.h
+++ b/ydb/core/ymq/actor/events.h
@@ -130,6 +130,9 @@ struct TSqsEvents {
EvNodeTrackerUnsubscribeRequest,
EvNodeTrackerSubscriptionStatus,
+ EvForceReloadState,
+ EvChangeRetentionActiveCheck,
+
EvEnd,
};
@@ -730,6 +733,7 @@ struct TSqsEvents {
struct TEvQueuePurgedNotification : public NActors::TEventLocal<TEvQueuePurgedNotification, EvQueuePurgedNotification> {
ui64 Shard = 0;
ui64 NewMessagesCount = 0;
+ TVector<ui64> DeletedOffsets;
};
struct TEvGetRuntimeQueueAttributes : public NActors::TEventLocal<TEvGetRuntimeQueueAttributes, EvGetRuntimeQueueAttributes> {
@@ -926,12 +930,28 @@ struct TSqsEvents {
};
struct TEvNodeTrackerSubscriptionStatus : public NActors::TEventLocal<TEvNodeTrackerSubscriptionStatus, EvNodeTrackerSubscriptionStatus> {
- explicit TEvNodeTrackerSubscriptionStatus(ui64 subscriptionId, ui32 nodeId)
+ TEvNodeTrackerSubscriptionStatus(ui64 subscriptionId, ui32 nodeId, bool disconnected=false)
: SubscriptionId(subscriptionId)
, NodeId(nodeId)
+ , Disconnected(disconnected)
{}
ui64 SubscriptionId;
ui32 NodeId;
+ bool Disconnected;
+ };
+
+ struct TEvForceReloadState : public NActors::TEventLocal<TEvForceReloadState, EvForceReloadState> {
+ explicit TEvForceReloadState(TDuration nextTryAfter = TDuration::Zero())
+ : NextTryAfter(nextTryAfter)
+ {}
+ TDuration NextTryAfter;
+ };
+
+ struct TEvChangeRetentionActiveCheck : public NActors::TEventLocal<TEvChangeRetentionActiveCheck, EvChangeRetentionActiveCheck> {
+ explicit TEvChangeRetentionActiveCheck(bool active)
+ : Active(active)
+ {}
+ bool Active;
};
};
diff --git a/ydb/core/ymq/actor/node_tracker.cpp b/ydb/core/ymq/actor/node_tracker.cpp
index 1fdc020347e..ba2d27004e1 100644
--- a/ydb/core/ymq/actor/node_tracker.cpp
+++ b/ydb/core/ymq/actor/node_tracker.cpp
@@ -132,15 +132,25 @@ namespace NKikimr::NSQS {
}
}
- void TNodeTrackerActor::AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId) {
- Send(ParentActor, new TSqsEvents::TEvNodeTrackerSubscriptionStatus(subscriptionId, nodeId));
+ void TNodeTrackerActor::AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId, bool disconnected) {
+ Send(ParentActor, new TSqsEvents::TEvNodeTrackerSubscriptionStatus(subscriptionId, nodeId, disconnected));
}
void TNodeTrackerActor::HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext&) {
- auto it = TabletsInfo.find(ev->Get()->TabletId);
+ ui64 tabletId = ev->Get()->TabletId;
+ auto it = TabletsInfo.find(tabletId);
if (it != TabletsInfo.end()) {
- LOG_SQS_DEBUG(GetLogPrefix() << "tablet pipe " << ev->Get()->TabletId << " disconnected");
- ReconnectToTablet(ev->Get()->TabletId);
+ LOG_SQS_DEBUG(GetLogPrefix() << "tablet pipe " << tabletId << " disconnected");
+
+ auto& info = it->second;
+ if (info.PipeServer) {
+ for (auto& [id, subscriber] : info.Subscribers) {
+ if (subscriber->NodeId) {
+ AnswerForSubscriber(id, subscriber->NodeId.value(), true);
+ }
+ }
+ }
+ ReconnectToTablet(tabletId);
} else {
LOG_SQS_WARN(GetLogPrefix() << " disconnected from unrequired tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
}
diff --git a/ydb/core/ymq/actor/node_tracker.h b/ydb/core/ymq/actor/node_tracker.h
index 0529589b07b..7042f584473 100644
--- a/ydb/core/ymq/actor/node_tracker.h
+++ b/ydb/core/ymq/actor/node_tracker.h
@@ -76,7 +76,7 @@ public:
ui64 GetTabletId(const TMap<TKeyPrefix, ui64>& tabletsPerEndKeyRange, TKeyPrefix keyPrefix) const;
ui64 GetTabletId(const TSubscriberInfo& subscriber) const;
- void AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId);
+ void AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId, bool disconnected=false);
void RemoveSubscriber(TSqsEvents::TEvNodeTrackerUnsubscribeRequest::TPtr& request, const NActors::TActorContext& ctx);
bool SubscriberMustWait(const TSubscriberInfo& subscriber) const;
void AddSubscriber(TSqsEvents::TEvNodeTrackerSubscribeRequest::TPtr& request, const NActors::TActorContext& ctx);
diff --git a/ydb/core/ymq/actor/purge.cpp b/ydb/core/ymq/actor/purge.cpp
index e9c4c9b6eff..76bba1c4772 100644
--- a/ydb/core/ymq/actor/purge.cpp
+++ b/ydb/core/ymq/actor/purge.cpp
@@ -151,7 +151,20 @@ static void FillMessagesParam(NClient::TWriteValue& messagesParam, const NClient
}
void TPurgeActor::MakeStage2Request(ui64 cleanupVersion, const TValue& messages, const TMaybe<TValue>& inflyMessages, const ui64 shardId, TShard* shard) {
- auto onExecuted = [this, shardId, shard] (const TSqsEvents::TEvExecuted::TRecord& ev) {
+ TVector<ui64> offsets;
+ auto collectOffsetsFrom = [&](const TValue& msgs) {
+ for (size_t i = 0; i < msgs.Size(); ++i) {
+ const ui64 offset = msgs[i]["Offset"];
+ offsets.push_back(offset);
+ }
+ };
+
+ collectOffsetsFrom(messages);
+ if (inflyMessages) {
+ collectOffsetsFrom(*inflyMessages);
+ }
+
+ auto onExecuted = [this, shardId, shard, offsets] (const TSqsEvents::TEvExecuted::TRecord& ev) {
const ui32 status = ev.GetStatus();
if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
const TValue val(TValue::Create(ev.GetExecutionEngineEvaluatedResponse()));
@@ -168,6 +181,7 @@ void TPurgeActor::MakeStage2Request(ui64 cleanupVersion, const TValue& messages,
auto notification = MakeHolder<TSqsEvents::TEvQueuePurgedNotification>();
notification->Shard = shardId;
notification->NewMessagesCount = static_cast<ui64>(newMessagesCount);
+ notification->DeletedOffsets = std::move(offsets);
Send(QueueLeader_, std::move(notification));
}
diff --git a/ydb/core/ymq/actor/purge.h b/ydb/core/ymq/actor/purge.h
index 69a1f836de4..77b385c5cf4 100644
--- a/ydb/core/ymq/actor/purge.h
+++ b/ydb/core/ymq/actor/purge.h
@@ -21,7 +21,6 @@ class TPurgeActor : public TActorBootstrapped<TPurgeActor> {
ui64 Offset = 0;
TInstant SentTimestamp = TInstant::Zero();
};
- std::pair<ui64, ui64> CurrentOffsets;
TMessageBoundary CurrentLastMessage;
TMessageBoundary PreviousSuccessfullyProcessedLastMessage;
};
diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp
index 0cd68ab1601..32472412e88 100644
--- a/ydb/core/ymq/actor/queue_leader.cpp
+++ b/ydb/core/ymq/actor/queue_leader.cpp
@@ -27,6 +27,7 @@
LWTRACE_USING(SQS_PROVIDER);
+
namespace NKikimr::NSQS {
constexpr ui64 UPDATE_COUNTERS_TAG = 0;
@@ -38,7 +39,29 @@ const TString INFLY_INVALIDATION_REASON_VERSION_CHANGED = "InflyVersionChanged";
const TString INFLY_INVALIDATION_REASON_DEADLINE_CHANGED = "MessageDeadlineChanged";
const TString INFLY_INVALIDATION_REASON_DELETED = "MessageDeleted";
-TQueueLeader::TQueueLeader(TString userName, TString queueName, TString folderId, TString rootUrl, TIntrusivePtr<TQueueCounters> counters, TIntrusivePtr<TUserCounters> userCounters, const TActorId& schemeCache, const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser)
+constexpr TDuration STATE_UPDATE_PERIOD_MIN = TDuration::MilliSeconds(200);
+constexpr TDuration STATE_UPDATE_PERIOD_MAX = TDuration::Minutes(1);
+
+TDuration GetNextReloadStateWaitPeriod(TDuration current = TDuration::Zero()) {
+ if (current == STATE_UPDATE_PERIOD_MAX) {
+ return TDuration::Max();
+ }
+ auto nextWaitPeriod = 2 * Max(current, STATE_UPDATE_PERIOD_MIN);
+ nextWaitPeriod += TDuration::MilliSeconds(RandomNumber<ui64>(nextWaitPeriod.MilliSeconds() / 2));
+    return Min(STATE_UPDATE_PERIOD_MAX, nextWaitPeriod);
+}
+
+TQueueLeader::TQueueLeader(
+ TString userName,
+ TString queueName,
+ TString folderId,
+ TString rootUrl,
+ TIntrusivePtr<TQueueCounters> counters,
+ TIntrusivePtr<TUserCounters> userCounters,
+ const TActorId& schemeCache,
+ const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser,
+ bool useCPUOptimization
+)
: UserName_(std::move(userName))
, QueueName_(std::move(queueName))
, FolderId_(std::move(folderId))
@@ -46,10 +69,14 @@ TQueueLeader::TQueueLeader(TString userName, TString queueName, TString folderId
, SchemeCache_(schemeCache)
, Counters_(std::move(counters))
, UserCounters_(std::move(userCounters))
+ , UseCPUOptimization(useCPUOptimization)
{
if (quoterResourcesForUser) {
QuoterResources_ = new TSqsEvents::TQuoterResourcesForActions(*quoterResourcesForUser);
}
+ if (UseCPUOptimization) {
+ LOG_SQS_INFO("Use CPU optimization for queue " << TLogQueueName(UserName_, QueueName_));
+ }
}
void TQueueLeader::Bootstrap() {
@@ -62,8 +89,11 @@ void TQueueLeader::BecomeWorking() {
Become(&TQueueLeader::StateWorking);
const auto& cfg = Cfg();
const ui64 randomTimeToWait = RandomNumber<ui64>(cfg.GetBackgroundMetricsUpdateTimeMs() / 4); // Don't start all such operations at one moment
- Schedule(TDuration::MilliSeconds(randomTimeToWait), new TEvWakeup(UPDATE_COUNTERS_TAG));
-
+ if (UseCPUOptimization) {
+ Schedule(TDuration::MilliSeconds(randomTimeToWait), new TSqsEvents::TEvForceReloadState(GetNextReloadStateWaitPeriod()));
+ } else {
+ Schedule(TDuration::MilliSeconds(randomTimeToWait), new TEvWakeup(UPDATE_COUNTERS_TAG));
+ }
Schedule(TDuration::Seconds(1), new TEvWakeup(UPDATE_MESSAGES_METRICS_TAG));
std::vector<TSqsEvents::TEvExecute::TPtr> requests;
@@ -103,6 +133,7 @@ STATEFN(TQueueLeader::StateInit) {
hFunc(TSqsEvents::TEvChangeMessageVisibilityBatch, HandleChangeMessageVisibilityBatchWhileIniting); // from change message visibility action actor
hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileIniting); // from get queue attributes action actor
hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues
+ hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState);
// internal
hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version
@@ -127,6 +158,7 @@ STATEFN(TQueueLeader::StateWorking) {
hFunc(TSqsEvents::TEvChangeMessageVisibilityBatch, HandleChangeMessageVisibilityBatchWhileWorking); // from change message visibility action actor
hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileWorking); // from get queue attributes action actor
hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues
+ hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState);
// internal
hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version
@@ -196,6 +228,79 @@ void TQueueLeader::HandleWakeup(TEvWakeup::TPtr& ev) {
}
}
+
+void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev) {
+ if (!UseCPUOptimization) {
+ return;
+ }
+ auto nextTryAfter = ev->Get()->NextTryAfter;
+ if (nextTryAfter != TDuration::Max()) {
+ Schedule(nextTryAfter, new TSqsEvents::TEvForceReloadState(GetNextReloadStateWaitPeriod(nextTryAfter)));
+ }
+
+ if (UpdateStateRequestInProcess) {
+ LOG_SQS_DEBUG("Update state request already in process for queue " << TLogQueueName(UserName_, QueueName_));
+ return;
+ }
+    LOG_SQS_DEBUG("Start update state request for queue " << TLogQueueName(UserName_, QueueName_));
+    UpdateStateRequestInProcess = true;
+    TExecutorBuilder(SelfId(), "")
+ .User(UserName_)
+ .Queue(QueueName_)
+ .QueueLeader(SelfId())
+ .TablesFormat(TablesFormat_)
+ .QueryId(GET_STATE_ID)
+ .QueueVersion(QueueVersion_)
+ .Fifo(IsFifoQueue_)
+ .RetryOnTimeout()
+ .OnExecuted([this](const TSqsEvents::TEvExecuted::TRecord& ev) { HandleState(ev); })
+ .Counters(Counters_)
+ .Params()
+ .Uint64("QUEUE_ID_NUMBER", QueueVersion_)
+ .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_))
+ .ParentBuilder().Start();
+}
+
+void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) {
+ LOG_SQS_DEBUG("Handle state for " << TLogQueueName(UserName_, QueueName_));
+    Y_VERIFY(UpdateStateRequestInProcess);
+    UpdateStateRequestInProcess = false;
+
+ if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) {
+ using NKikimr::NClient::TValue;
+ const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse()));
+ const TValue shardStates = val["state"];
+
+ for (size_t i = 0; i < shardStates.Size(); ++i) {
+ const auto& state = shardStates[i];
+ const ui64 shard = IsFifoQueue_ ? 0 : (TablesFormat_ == 1 ? ui32(state["Shard"]) : ui64(state["State"]));
+ auto& shardInfo = Shards_[shard];
+ auto inflyCountVal = state["InflyCount"];
+ Y_VERIFY(i64(inflyCountVal) >= 0);
+ ui64 inflyMessagesCount = static_cast<ui64>(i64(inflyCountVal));
+ ui64 inflyVersion = state["InflyVersion"].IsNull() ? 0 : ui64(state["InflyVersion"]);
+
+ SetMessagesCount(shard, state["MessageCount"]);
+ shardInfo.InflyMessagesCount = inflyMessagesCount;
+ shardInfo.CreatedTimestamp = TInstant::MilliSeconds(ui64(state["CreatedTimestamp"]));
+
+ if (shardInfo.InflyVersion != inflyVersion) {
+ shardInfo.NeedInflyReload = true;
+ }
+
+ if (shardInfo.MessagesCount > 0) {
+ RequestOldestTimestampMetrics(shard);
+ }
+ if (shardInfo.MessagesCount == 0 && !shardInfo.MessagesCountWasGot) {
+ Send(RetentionActor_, new TSqsEvents::TEvChangeRetentionActiveCheck(false));
+ }
+
+ shardInfo.MessagesCountWasGot = true;
+ }
+ } else {
+ LOG_SQS_ERROR("Failed to update state for " << TLogQueueName(UserName_, QueueName_) << ": " << reply);
+ }
+}
+
void TQueueLeader::HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev) {
GetConfigurationRequests_.emplace_back(ev);
}
@@ -442,11 +547,7 @@ void TQueueLeader::OnSendBatchExecuted(ui64 shard, ui64 batchId, const TSqsEvent
DelayStatistics_.AddDelayedMessage(batch->TransactionStartedTime + entry.Message.Delay, batch->TransactionStartedTime);
}
}
- if (!IsFifoQueue_) {
- const i64 newMessagesCount = val["newMessagesCount"];
- Y_VERIFY(newMessagesCount >= 0);
- shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
- }
+ SetMessagesCount(shard, val["newMessagesCount"]);
} else {
const TString* prevRequestId = nullptr;
for (size_t i = 0; i < batch->Size(); ++i) {
@@ -642,10 +743,15 @@ void TQueueLeader::OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& val
if (const ui64 movedMessagesCount = value["movedMessagesCount"]) {
ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
- const i64 newMessagesCount = value["newMessagesCount"];
- Y_VERIFY(newMessagesCount >= 0);
- auto& shardInfo = Shards_[0];
- shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
+ SetMessagesCount(0, value["newMessagesCount"]);
+
+ const auto& moved = value["movedMessages"]; // TODO return only moved offsets
+ for (size_t i = 0; i < moved.Size(); ++i) {
+ const ui64 offset = moved[i]["SourceOffset"];
+ if (offset == Shards_.front().OldestMessageOffset) {
+ RequestOldestTimestampMetrics(0);
+ }
+ }
}
reqInfo.Answer->Messages.resize(list.Size());
@@ -831,16 +937,14 @@ void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 o
}
}
-void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) {
+void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, ui64 shard, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) {
const NKikimr::NClient::TValue list(value["result"]);
Y_VERIFY(list.Size() == batch->Size());
if (const ui64 movedMessagesCount = value["movedMessagesCount"]) {
ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount);
- const i64 newMessagesCount = value["newMessagesCount"];
- Y_VERIFY(newMessagesCount >= 0);
- shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
+ SetMessagesCount(shard, value["newMessagesCount"]);
}
THashMap<ui64, const TLoadBatchEntry*> offset2entry;
@@ -852,6 +956,14 @@ void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue&
for (size_t i = 0; i < list.Size(); ++i) {
auto msg = list[i];
const ui64 offset = msg["Offset"];
+
+ const bool exists = msg["Exists"];
+ const auto wasDeadLetterValue = msg["IsDeadLetter"];
+ const bool wasDeadLetter = wasDeadLetterValue.HaveValue() ? bool(wasDeadLetterValue) : false;
+ const bool valid = msg["Valid"];
+ if (exists && wasDeadLetter && valid && shardInfo.OldestMessageOffset == offset) {
+ RequestOldestTimestampMetrics(shard);
+ }
const auto entry = offset2entry.find(offset);
Y_VERIFY(entry != offset2entry.end());
OnLoadStdMessageResult(entry->second->RequestId, offset, true, &msg, false);
@@ -873,7 +985,7 @@ void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, cons
dlqExists = value["dlqExists"];
if (dlqExists) {
success = true;
- OnLoadStdMessagesBatchSuccess(value, shardInfo, batch);
+ OnLoadStdMessagesBatchSuccess(value, shard, shardInfo, batch);
}
}
@@ -1081,6 +1193,9 @@ void TQueueLeader::OnDeleteBatchExecuted(ui64 shard, ui64 batchId, const TSqsEve
OnMessageDeleted(entry.RequestId, shard, entry.IndexInRequest, reply, &messageResult);
}
batch->Offset2Entry.erase(first, last);
+ if (shardInfo.OldestMessageOffset == offset) {
+ RequestOldestTimestampMetrics(shard);
+ }
}
// others are already deleted messages:
for (const auto& [offset, entryIndex] : batch->Offset2Entry) {
@@ -1088,11 +1203,7 @@ void TQueueLeader::OnDeleteBatchExecuted(ui64 shard, ui64 batchId, const TSqsEve
OnMessageDeleted(entry.RequestId, shard, entry.IndexInRequest, reply, nullptr);
}
- if (!IsFifoQueue_) {
- const i64 newMessagesCount = val["newMessagesCount"];
- Y_VERIFY(newMessagesCount >= 0);
- shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount);
- }
+ SetMessagesCount(shard, val["newMessagesCount"]);
} else {
const TString* prevRequestId = nullptr;
for (size_t i = 0; i < batch->Size(); ++i) {
@@ -1496,7 +1607,11 @@ void TQueueLeader::StartGatheringMetrics() {
IsDlqQueue_ = false;
}
-
+
+ if (UseCPUOptimization) {
+ return;
+ }
+
for (ui64 shard = 0; shard < ShardsCount_; ++shard) {
if (IsFifoQueue_ || IsDlqQueue_) {
RequestMessagesCountMetrics(shard);
@@ -1564,7 +1679,7 @@ void TQueueLeader::ReceiveMessagesCountMetrics(ui64 shard, const TSqsEvents::TEv
LOG_SQS_DEBUG("Handle message count metrics for " << TLogQueueName(UserName_, QueueName_, shard));
Y_VERIFY(MetricsQueriesInfly_ > 0);
--MetricsQueriesInfly_;
- if (MetricsQueriesInfly_ == 0) {
+ if (MetricsQueriesInfly_ == 0 && !UseCPUOptimization) {
ScheduleMetricsRequest();
}
Y_VERIFY(shard < Shards_.size());
@@ -1575,8 +1690,7 @@ void TQueueLeader::ReceiveMessagesCountMetrics(ui64 shard, const TSqsEvents::TEv
const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse()));
const TValue messagesCount = val["messagesCount"];
if (!messagesCount.IsNull()) { // can be null in case of parallel queue deletion (SQS-292)
- Y_VERIFY(i64(messagesCount) >= 0);
- Shards_[shard].MessagesCount = static_cast<ui64>(i64(messagesCount)); // MessageCount is Int64 type in database
+ SetMessagesCount(shard, messagesCount);
}
const TValue createdTimestamp = val["createdTimestamp"];
if (!createdTimestamp.IsNull()) {
@@ -1599,7 +1713,7 @@ void TQueueLeader::ReceiveOldestTimestampMetrics(ui64 shard, const TSqsEvents::T
LOG_SQS_DEBUG("Handle oldest timestamp metrics for " << TLogQueueName(UserName_, QueueName_, shard));
Y_VERIFY(MetricsQueriesInfly_ > 0);
--MetricsQueriesInfly_;
- if (MetricsQueriesInfly_ == 0) {
+ if (MetricsQueriesInfly_ == 0 && !UseCPUOptimization) {
ScheduleMetricsRequest();
}
Y_VERIFY(shard < Shards_.size());
@@ -1610,6 +1724,10 @@ void TQueueLeader::ReceiveOldestTimestampMetrics(ui64 shard, const TSqsEvents::T
const TValue list = val["messages"];
if (list.Size()) {
Shards_[shard].LastSuccessfulOldestMessageTimestampValueMs = Shards_[shard].OldestMessageTimestampMs = list[0]["SentTimestamp"];
+ Shards_[shard].OldestMessageOffset = list[0]["Offset"];
+ LOG_SQS_INFO("Got oldest message for " << TLogQueueName(UserName_, QueueName_, shard)
+ << ": offset=" << Shards_[shard].OldestMessageOffset << " ts=" << TInstant::MilliSeconds(Shards_[shard].OldestMessageTimestampMs));
+
} else {
Shards_[shard].OldestMessageTimestampMs = Max();
}
@@ -1620,6 +1738,27 @@ void TQueueLeader::ReceiveOldestTimestampMetrics(ui64 shard, const TSqsEvents::T
ReportOldestTimestampMetricsIfReady();
}
+void TQueueLeader::SetMessagesCount(ui64 shard, const NKikimr::NClient::TValue& value) {
+ const i64 newMessagesCountVal = value;
+ Y_VERIFY(newMessagesCountVal >= 0);
+ ui64 newMessagesCount = static_cast<ui64>(newMessagesCountVal);
+ SetMessagesCount(shard, newMessagesCount);
+}
+
+void TQueueLeader::SetMessagesCount(ui64 shard, ui64 newMessagesCount) {
+ TShardInfo& shardInfo = Shards_[shard];
+ if (UseCPUOptimization) {
+ if (shardInfo.MessagesCount == 0 && newMessagesCount > 0) {
+ RequestOldestTimestampMetrics(shard);
+ Send(RetentionActor_, new TSqsEvents::TEvChangeRetentionActiveCheck(true));
+ }
+ if (shardInfo.MessagesCount > 0 && newMessagesCount == 0) {
+ Send(RetentionActor_, new TSqsEvents::TEvChangeRetentionActiveCheck(false));
+ }
+ }
+ shardInfo.MessagesCount = newMessagesCount;
+}
+
void TQueueLeader::ScheduleMetricsRequest() {
const ui64 updateTime = Cfg().GetBackgroundMetricsUpdateTimeMs();
const ui64 randomTimeToWait = RandomNumber<ui32>(updateTime / 4);
@@ -1632,7 +1771,7 @@ void TQueueLeader::ReportMessagesCountMetricsIfReady() {
const TInstant now = TActivationContext::Now();
for (const auto& shardInfo : Shards_) {
if (IsFifoQueue_) {
- if (shardInfo.MessagesCountIsRequesting) {
+ if (shardInfo.MessagesCountIsRequesting || !shardInfo.MessagesCountWasGot) {
return;
}
} else {
@@ -1658,8 +1797,13 @@ void TQueueLeader::ReportOldestTimestampMetricsIfReady() {
if (shardInfo.OldestMessageAgeIsRequesting) {
return;
}
- oldestMessagesTimestamp = Min(oldestMessagesTimestamp, shardInfo.OldestMessageTimestampMs);
+ ui64 ts = shardInfo.OldestMessageTimestampMs;
+ if (UseCPUOptimization && shardInfo.MessagesCountWasGot && shardInfo.MessagesCount == 0) {
+ ts = Max<ui64>();
+ }
+ oldestMessagesTimestamp = Min(oldestMessagesTimestamp, ts);
}
+
if (Counters_) {
if (oldestMessagesTimestamp != Max<ui64>()) {
auto age = (TActivationContext::Now() - TInstant::MilliSeconds(oldestMessagesTimestamp)).Seconds();
@@ -1793,14 +1937,13 @@ void TQueueLeader::OnInflyLoaded(ui64 shard, const TSqsEvents::TEvExecuted::TRec
shardInfo.InflyVersion = val["inflyVersion"];
LOG_SQS_DEBUG("Infly version for shard " << TLogQueueName(UserName_, QueueName_, shard) << ": " << shardInfo.InflyVersion);
- auto messagesCount = val["messageCount"];
auto inflyCount = val["inflyCount"];
- Y_VERIFY(i64(messagesCount) >= 0);
Y_VERIFY(i64(inflyCount) >= 0);
- shardInfo.MessagesCount = static_cast<ui64>(i64(messagesCount));
+ SetMessagesCount(shard, val["messageCount"]);
shardInfo.InflyMessagesCount = static_cast<ui64>(i64(inflyCount));
shardInfo.ReadOffset = val["readOffset"];
shardInfo.CreatedTimestamp = TInstant::MilliSeconds(ui64(val["createdTimestamp"]));
+
shardInfo.DelayStatisticsInited = true;
@@ -1894,7 +2037,7 @@ void TQueueLeader::OnAddedMessagesToInfly(ui64 shard, const TSqsEvents::TEvExecu
shardInfo.ReadOffset = val["readOffset"];
// Update messages count
- shardInfo.MessagesCount = static_cast<ui64>(i64(val["messagesCount"]));
+ SetMessagesCount(shard, val["messagesCount"]);
}
} else {
LOG_SQS_ERROR("Failed to add new messages to infly for " << TLogQueueName(UserName_, QueueName_, shard) << ": " << reply);
@@ -2113,8 +2256,15 @@ void TQueueLeader::HandleInflyIsPurgingNotification(TSqsEvents::TEvInflyIsPurgin
}
void TQueueLeader::HandleQueuePurgedNotification(TSqsEvents::TEvQueuePurgedNotification::TPtr& ev) {
- auto& shardInfo = Shards_[ev->Get()->Shard];
- shardInfo.MessagesCount = ev->Get()->NewMessagesCount;
+ ui64 shard = ev->Get()->Shard;
+ auto& shardInfo = Shards_[shard];
+ SetMessagesCount(shard, ev->Get()->NewMessagesCount);
+ for (ui64 offset : ev->Get()->DeletedOffsets) {
+ if (shardInfo.OldestMessageOffset <= offset) {
+ RequestOldestTimestampMetrics(shard);
+ break;
+ }
+ }
}
void TQueueLeader::HandleGetRuntimeQueueAttributesWhileIniting(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev) {
@@ -2134,6 +2284,7 @@ void TQueueLeader::HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQu
if (!IsFifoQueue_ && !IsDlqQueue_) {
// we need to start the process only once
IsDlqQueue_ = true;
+ UseCPUOptimization = false;
LOG_SQS_INFO("Started periodic message counting for queue " << TLogQueueName(UserName_, QueueName_)
<< ". Latest dlq notification was at " << LatestDlqNotificationTs_);
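
Taken together, the queue_leader.cpp changes mean the leader now remembers OldestMessageOffset per shard (the oldest-timestamp query was extended to also return the Offset column), re-requests the oldest-timestamp metric only when a delete, purge, or DLQ move may have removed that message, and toggles the retention actor whenever MessagesCount crosses zero. The sketch below is a hypothetical, simplified model of that bookkeeping; ShardState and OnMessagesCountChanged are illustrative names, not the TQueueLeader API.

#include <cstdint>
#include <vector>

// Hypothetical per-shard bookkeeping mirroring SetMessagesCount and
// HandleQueuePurgedNotification from the patch.
struct ShardState {
    std::uint64_t MessagesCount = 0;
    std::uint64_t OldestMessageOffset = 0;
    bool RetentionActive = false;            // stands in for TEvChangeRetentionActiveCheck
    bool NeedOldestTimestampRefresh = false; // stands in for RequestOldestTimestampMetrics
};

void OnMessagesCountChanged(ShardState& shard, std::uint64_t newCount,
                            const std::vector<std::uint64_t>& deletedOffsets) {
    // Retention checks run only while the shard holds messages.
    if (shard.MessagesCount == 0 && newCount > 0) {
        shard.RetentionActive = true;
        shard.NeedOldestTimestampRefresh = true;
    }
    if (shard.MessagesCount > 0 && newCount == 0) {
        shard.RetentionActive = false;
    }
    shard.MessagesCount = newCount;

    // If a purge reported offsets at or beyond the tracked oldest message,
    // the cached oldest-message metric may be stale and must be re-read.
    for (std::uint64_t offset : deletedOffsets) {
        if (shard.OldestMessageOffset <= offset) {
            shard.NeedOldestTimestampRefresh = true;
            break;
        }
    }
}

int main() {
    ShardState shard;
    OnMessagesCountChanged(shard, 5, {});     // first messages arrive: retention on
    OnMessagesCountChanged(shard, 0, {0, 1}); // queue drained: retention off again
}
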
diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h
index c4a6a7ffddc..ef68e5c0d89 100644
--- a/ydb/core/ymq/actor/queue_leader.h
+++ b/ydb/core/ymq/actor/queue_leader.h
@@ -33,7 +33,17 @@ class TQueueLeader : public TActorBootstrapped<TQueueLeader> {
struct TLoadBatch;
public:
- TQueueLeader(TString userName, TString queueName, TString folderId, TString rootUrl, TIntrusivePtr<TQueueCounters> counters, TIntrusivePtr<TUserCounters> userCounters, const TActorId& schemeCache, const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser);
+ TQueueLeader(
+ TString userName,
+ TString queueName,
+ TString folderId,
+ TString rootUrl,
+ TIntrusivePtr<TQueueCounters> counters,
+ TIntrusivePtr<TUserCounters> userCounters,
+ const TActorId& schemeCache,
+ const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser,
+ bool useCPUOptimization
+ );
void Bootstrap();
@@ -47,6 +57,7 @@ private:
void PassAway() override;
void HandleWakeup(TEvWakeup::TPtr& ev);
+ void HandleState(const TSqsEvents::TEvExecuted::TRecord& ev);
void HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev);
void HandleGetConfigurationWhileWorking(TSqsEvents::TEvGetConfiguration::TPtr& ev);
void HandleExecuteWhileIniting(TSqsEvents::TEvExecute::TPtr& ev);
@@ -68,9 +79,11 @@ private:
void HandleGetRuntimeQueueAttributesWhileIniting(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev);
void HandleGetRuntimeQueueAttributesWhileWorking(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev);
void HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQueueNotification::TPtr& ev);
+ void HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev);
void BecomeWorking();
void RequestConfiguration();
+ void UpdateStateRequest();
void StartGatheringMetrics();
void RequestMessagesCountMetrics(ui64 shard);
void RequestOldestTimestampMetrics(ui64 shard);
@@ -128,7 +141,7 @@ private:
void WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchRequestProcessing& reqInfo);
void Reply(TReceiveMessageBatchRequestProcessing& reqInfo);
// batching
- void OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch);
+ void OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, ui64 shard, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch);
void OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, const bool usedDLQ, const TSqsEvents::TEvExecuted::TRecord& reply);
// delete
@@ -144,6 +157,8 @@ private:
TQueuePath GetQueuePath() {
return TQueuePath(Cfg().GetRoot(), UserName_, QueueName_, QueueVersion_);
}
+ void SetMessagesCount(ui64 shard, const NKikimr::NClient::TValue& value);
+ void SetMessagesCount(ui64 shard, ui64 value);
void ScheduleMetricsRequest();
@@ -376,6 +391,7 @@ private:
ui64 MessagesCount = 0;
ui64 InflyMessagesCount = 0;
ui64 OldestMessageTimestampMs = Max();
+ ui64 OldestMessageOffset = 0;
ui64 LastSuccessfulOldestMessageTimestampValueMs = 0; // for query optimization - more accurate range
bool MessagesCountIsRequesting = false;
@@ -417,6 +433,8 @@ private:
};
std::vector<TShardInfo> Shards_;
TMessageDelayStatistics DelayStatistics_;
+ bool UpdateStateRequestInProcess = false;
+ bool UseCPUOptimization = false;
// background actors
TActorId DeduplicationCleanupActor_;
diff --git a/ydb/core/ymq/actor/retention.cpp b/ydb/core/ymq/actor/retention.cpp
index 6384ca1127c..26957e71188 100644
--- a/ydb/core/ymq/actor/retention.cpp
+++ b/ydb/core/ymq/actor/retention.cpp
@@ -57,7 +57,9 @@ void TRetentionActor::SetRetentionBoundary() {
RLOG_SQS_ERROR("Failed to set retention boundary for queue " << TLogQueueName(QueuePath_));
}
- Schedule(RandomRetentionPeriod(), new TEvWakeup());
+ if (Active) {
+ Schedule(RandomRetentionPeriod(), new TEvWakeup());
+ }
};
TExecutorBuilder(SelfId(), RequestId_)
@@ -83,6 +85,14 @@ void TRetentionActor::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) {
ev->Get()->Call();
}
+void TRetentionActor::Handle(TSqsEvents::TEvChangeRetentionActiveCheck::TPtr& ev) {
+ if (!Active && ev->Get()->Active) {
+ Schedule(RandomRetentionPeriod(), new TEvWakeup());
+ }
+ Active = ev->Get()->Active;
+ RLOG_SQS_INFO("Handle change retention actor state " << TLogQueueName(QueuePath_) << " : " << Active);
+}
+
void TRetentionActor::HandlePoisonPill(TEvPoisonPill::TPtr&) {
RLOG_SQS_DEBUG("Handle poison pill in retention actor for queue " << TLogQueueName(QueuePath_));
PassAway();
@@ -97,6 +107,7 @@ STATEFN(TRetentionActor::StateFunc) {
switch (ev->GetTypeRewrite()) {
cFunc(TEvWakeup::EventType, HandleWakeup);
hFunc(TSqsEvents::TEvExecuted, HandleExecuted);
+ hFunc(TSqsEvents::TEvChangeRetentionActiveCheck, Handle);
hFunc(TEvPoisonPill, HandlePoisonPill);
}
}
diff --git a/ydb/core/ymq/actor/retention.h b/ydb/core/ymq/actor/retention.h
index 3c4f47f4a2d..e253102dcc8 100644
--- a/ydb/core/ymq/actor/retention.h
+++ b/ydb/core/ymq/actor/retention.h
@@ -23,6 +23,7 @@ private:
void SetRetentionBoundary();
void HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev);
+ void Handle(TSqsEvents::TEvChangeRetentionActiveCheck::TPtr& ev);
void HandlePoisonPill(TEvPoisonPill::TPtr&);
void HandleWakeup();
@@ -34,6 +35,7 @@ private:
const ui32 TablesFormat_;
const TString RequestId_;
const TActorId QueueLeader_;
+ bool Active = true;
};
} // namespace NKikimr::NSQS
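
The retention actor change is a simple activity gate: SetRetentionBoundary re-arms its TEvWakeup only while Active, and TEvChangeRetentionActiveCheck re-arms it on a false-to-true transition. A minimal sketch of that toggle, assuming only the semantics visible in the hunks above; RetentionModel is illustrative, not the actual TRetentionActor.

// TEvChangeRetentionActiveCheck toggles whether the periodic retention check
// keeps re-scheduling itself.
struct RetentionModel {
    bool Active = true;
    bool WakeupScheduled = true;  // stands in for Schedule(RandomRetentionPeriod(), new TEvWakeup())

    // TEvChangeRetentionActiveCheck: re-arm the wakeup on a false -> true edge.
    void OnChangeActiveCheck(bool active) {
        if (!Active && active) {
            WakeupScheduled = true;
        }
        Active = active;
    }

    // TEvWakeup -> SetRetentionBoundary: the check re-schedules itself only
    // while the actor is marked active.
    void OnWakeup() {
        WakeupScheduled = Active;
    }
};

int main() {
    RetentionModel retention;
    retention.OnChangeActiveCheck(false); // queue became empty: stop periodic checks
    retention.OnWakeup();                 // last pending wakeup does not re-arm
    retention.OnChangeActiveCheck(true);  // new messages: checks resume
}
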
diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp
index b42e38e8304..4e491df71b9 100644
--- a/ydb/core/ymq/actor/service.cpp
+++ b/ydb/core/ymq/actor/service.cpp
@@ -77,7 +77,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
TString folderId, ui32 tablesFormat, ui64 version, ui64 shardsCount, const TIntrusivePtr<TUserCounters>& userCounters,
const TIntrusivePtr<TFolderCounters>& folderCounters,
const TActorId& schemeCache, TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> quoterResourcesForUser,
- bool insertCounters
+ bool insertCounters, bool useLeaderCPUOptimization
)
: UserName_(std::move(userName))
, QueueName_(std::move(queueName))
@@ -94,6 +94,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
, FolderCounters_(folderCounters)
, SchemeCache_(schemeCache)
, QuoterResourcesForUser_(std::move(quoterResourcesForUser))
+ , UseLeaderCPUOptimization(useLeaderCPUOptimization)
{
}
@@ -112,12 +113,21 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE);
}
}
+
+ void LocalLeaderWayMoved() const {
+ if (LocalLeader_) {
+ TActivationContext::Send(new IEventHandleFat(LocalLeader_, SelfId(), new TSqsEvents::TEvForceReloadState()));
+ }
+ }
void StartLocalLeader(const TString& reason) {
if (!LocalLeader_) {
Counters_ = Counters_->GetCountersForLeaderNode();
LWPROBE(CreateLeader, UserName_, QueueName_, reason);
- LocalLeader_ = TActivationContext::Register(new TQueueLeader(UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, SchemeCache_, QuoterResourcesForUser_));
+ LocalLeader_ = TActivationContext::Register(new TQueueLeader(
+ UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_,
+ SchemeCache_, QuoterResourcesForUser_, UseLeaderCPUOptimization
+ ));
LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_);
if (FolderId_) {
@@ -183,6 +193,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> {
TActorId SchemeCache_;
ui64 LocalLeaderRefCount_ = 0;
TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResourcesForUser_;
+ bool UseLeaderCPUOptimization;
// State machine
THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_;
@@ -246,6 +257,7 @@ struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> {
TLocalRateLimiterResource DeleteObjectsQuoterResource_;
TLocalRateLimiterResource OtherActionsQuoterResource_;
i64 EarlyRequestQueuesListBudget_ = EARLY_REQUEST_QUEUES_LIST_MAX_BUDGET; // Defence from continuously requesting queues list.
+ bool UseLeaderCPUOptimization = false;
// State machine
THashMultiMap<TString, TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_; // queue name -> request
@@ -747,7 +759,11 @@ void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTracke
auto queuePtr = it->second;
auto& queue = *queuePtr;
auto nodeId = ev->Get()->NodeId;
+ bool disconnected = ev->Get()->Disconnected;
queue.SetLeaderNodeId(nodeId);
+ if (disconnected) {
+ queue.LocalLeaderWayMoved();
+ }
LOG_SQS_DEBUG(
"Got node leader for queue [" << queue.UserName_ << "/" << queue.QueueName_
<< "]. Node: " << nodeId << " subscription id: " << subscriptionId
@@ -921,6 +937,16 @@ void TSqsService::HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged::
const bool needExport = FromStringWithDefault(value->second, false);
user->Counters_->ExportTransactionCounters(needExport);
}
+
+ if (IsIn(*diff, USE_CPU_LEADER_OPTIMIZATION)) {
+ const auto value = newSettings->find(USE_CPU_LEADER_OPTIMIZATION);
+ Y_VERIFY(value != newSettings->end());
+ const bool use = FromStringWithDefault(value->second, false);
+ user->UseLeaderCPUOptimization = use;
+ for (auto queue : user->Queues_) {
+ queue.second->UseLeaderCPUOptimization = use;
+ }
+ }
}
TSqsService::TUserInfoPtr TSqsService::MutableUser(const TString& userName, bool moveUserRequestsToUserRecord, bool* requestsWereMoved) {
@@ -1039,7 +1065,7 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co
auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo(
userName, queue, RootUrl_, leaderTabletId, isFifo, customName, folderId, tablesFormat, version, shardsCount,
- user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters)))
+ user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters, user->UseLeaderCPUOptimization)))
).first;
auto queueInfo = ret->second;
diff --git a/ydb/core/ymq/actor/user_settings_names.cpp b/ydb/core/ymq/actor/user_settings_names.cpp
index 3d315490bc8..08ba0c19e60 100644
--- a/ydb/core/ymq/actor/user_settings_names.cpp
+++ b/ydb/core/ymq/actor/user_settings_names.cpp
@@ -6,5 +6,6 @@ extern const TString USER_SETTING_MAX_QUEUES_COUNT = "MaxQueuesCount";
extern const TString USER_SETTING_DISABLE_COUNTERS = "DisableCounters";
extern const TString USER_SETTING_SHOW_DETAILED_COUNTERS_DEADLINE_MS = "ShowDetailedCountersDeadlineMs";
extern const TString USER_SETTING_EXPORT_TRANSACTION_COUNTERS = "ExportTransactionCounters";
+extern const TString USE_CPU_LEADER_OPTIMIZATION = "UseCPULeaderOptimization";
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/actor/user_settings_names.h b/ydb/core/ymq/actor/user_settings_names.h
index 11ee80f43a7..5cf2a8e13ef 100644
--- a/ydb/core/ymq/actor/user_settings_names.h
+++ b/ydb/core/ymq/actor/user_settings_names.h
@@ -9,5 +9,6 @@ extern const TString USER_SETTING_MAX_QUEUES_COUNT;
extern const TString USER_SETTING_DISABLE_COUNTERS;
extern const TString USER_SETTING_SHOW_DETAILED_COUNTERS_DEADLINE_MS;
extern const TString USER_SETTING_EXPORT_TRANSACTION_COUNTERS;
+extern const TString USE_CPU_LEADER_OPTIMIZATION;
} // namespace NKikimr::NSQS
diff --git a/ydb/core/ymq/base/query_id.h b/ydb/core/ymq/base/query_id.h
index 4c75bcb5514..c2e0d9ba908 100644
--- a/ydb/core/ymq/base/query_id.h
+++ b/ydb/core/ymq/base/query_id.h
@@ -29,6 +29,7 @@ enum EQueryId {
READ_OR_REDRIVE_MESSAGE_ID,
GET_USER_SETTINGS_ID,
GET_QUEUES_LIST_ID,
+ GET_STATE_ID,
QUERY_VECTOR_SIZE,
};
diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp
index d70f58caca1..424fcf5c856 100644
--- a/ydb/core/ymq/queues/fifo/queries.cpp
+++ b/ydb/core/ymq/queues/fifo/queries.cpp
@@ -400,6 +400,7 @@ const char* const DeleteMessageQuery = R"__(
(return (Extend
(AsList (SetResult 'deleted result))
+ (AsList (SetResult 'newMessagesCount count))
(ListIf (HasItems valid) (UpdateRow stateTable stateRow stateUpdate))
(Map valid (lambda '(item) (block '(
@@ -865,6 +866,7 @@ const char* const ReadOrRedriveMessageQuery = R"__(
(AsList (SetResult 'dlqExists dlqExists))
(AsList (SetResult 'result messagesToReturnAsStruct))
(AsList (SetResult 'movedMessagesCount (Length messagesToMoveAsStruct)))
+ (AsList (SetResult 'movedMessages messagesToMoveAsStruct))
(AsList (SetResult 'newMessagesCount newSourceMsgCount))
(ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow dlqStateTable dlqStateRow dlqStateUpdate))
(ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate))
@@ -1132,6 +1134,7 @@ const char* const WriteMessageQuery = R"__(
(return (Extend
(AsList (SetResult 'result result))
+ (AsList (SetResult 'newMessagesCount newMessagesCount))
(AsList (If (Greater (Length messagesAdded) (Uint64 '0)) (UpdateRow stateTable stateRow stateUpdate) (Void)))
@@ -1410,7 +1413,8 @@ const char* const GetOldestMessageTimestampMetricsQuery = R"__(
'('SentTimestamp timeFrom (Uint64 '18446744073709551615))
'('Offset (Uint64 '0) (Uint64 '18446744073709551615))))
(let sentIdxSelect '(
- 'SentTimestamp))
+ 'SentTimestamp
+ 'Offset))
(let selectResult (SelectRange sentTsIdx sentIdxRange sentIdxSelect '('('"ItemsLimit" (Uint64 '1)))))
(let messages (Member selectResult 'List))
@@ -1495,6 +1499,29 @@ const char* const ListDeadLetterSourceQueuesQuery = R"__(
)
)__";
+const char* const GetStateQuery = R"__(
+ (
+ (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
+
+ (let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State)
+
+ (let stateRange '(
+ )__" ALL_SHARDS_RANGE_PARAM R"__(
+ ))
+ (let stateSelect '(
+ 'MessageCount
+ 'InflyCount
+ 'InflyVersion
+ 'CreatedTimestamp
+ 'RetentionBoundary))
+
+ (let stateRead (Member (SelectRange stateTable stateRange stateSelect '()) 'List))
+
+ (return (AsList (SetResult 'state stateRead)))
+ )
+)__";
+
} // namespace
const char* GetFifoQueryById(size_t id) {
@@ -1535,6 +1562,8 @@ const char* GetFifoQueryById(size_t id) {
return ListDeadLetterSourceQueuesQuery;
case READ_OR_REDRIVE_MESSAGE_ID:
return ReadOrRedriveMessageQuery;
+ case GET_STATE_ID:
+ return GetStateQuery;
}
return nullptr;
diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp
index cef1237c2ee..8b6d93f951e 100644
--- a/ydb/core/ymq/queues/std/queries.cpp
+++ b/ydb/core/ymq/queues/std/queries.cpp
@@ -1105,7 +1105,8 @@ const char* const GetOldestMessageTimestampMetricsQuery = R"__(
'('SentTimestamp timeFrom (Uint64 '18446744073709551615))
'('Offset (Uint64 '0) (Uint64 '18446744073709551615))))
(let sentIdxSelect '(
- 'SentTimestamp))
+ 'SentTimestamp
+ 'Offset))
(let selectResult (SelectRange sentTsIdx sentIdxRange sentIdxSelect '('('"ItemsLimit" (Uint64 '1)))))
(let messages (Member selectResult 'List))
@@ -1286,6 +1287,31 @@ const char* const GetMessageCountMetricsQuery = R"__(
)
)__";
+
+const char* const GetStateQuery = R"__(
+ (
+ (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64)))
+ (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64)))
+
+ (let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State)
+
+ (let stateRange '(
+ )__" ALL_SHARDS_RANGE_PARAM R"__(
+ ))
+ (let stateSelect '(
+ ')__" SHARD_COLUMN_NAME_PARAM R"__(
+ 'MessageCount
+ 'InflyCount
+ 'InflyVersion
+ 'CreatedTimestamp
+ 'RetentionBoundary))
+
+ (let stateRead (Member (SelectRange stateTable stateRange stateSelect '()) 'List))
+ (return (AsList (SetResult 'state stateRead)))
+ )
+)__";
+
+
const char* const GetQueuesListQuery = R"__(
(
(let fromUser (Parameter 'FROM_USER (DataType 'Utf8String)))
@@ -1363,6 +1389,8 @@ const char* GetStdQueryById(size_t id) {
return GetQueuesListQuery;
case GET_MESSAGE_COUNT_METRIC_ID:
return GetMessageCountMetricsQuery;
+ case GET_STATE_ID:
+ return GetStateQuery;
}
return nullptr;
}