diff options
author | alexbogo <alexbogo@ydb.tech> | 2023-03-21 12:45:15 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2023-03-21 12:45:15 +0300 |
commit | 6c583eeffc50a6ee9277f24d25c5705ea9b9ccd7 (patch) | |
tree | bdd6fd8ad53c36f052a718c4e4b2d335ad10928a | |
parent | 1cf6f6a03c1e2d29102a6482c3444a106e1d1ab0 (diff) | |
download | ydb-6c583eeffc50a6ee9277f24d25c5705ea9b9ccd7.tar.gz |
[ymq] store oldest message info in state actual
init
-rw-r--r-- | ydb/core/ymq/actor/events.h | 22 | ||||
-rw-r--r-- | ydb/core/ymq/actor/node_tracker.cpp | 20 | ||||
-rw-r--r-- | ydb/core/ymq/actor/node_tracker.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/actor/purge.cpp | 16 | ||||
-rw-r--r-- | ydb/core/ymq/actor/purge.h | 1 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.cpp | 221 | ||||
-rw-r--r-- | ydb/core/ymq/actor/queue_leader.h | 22 | ||||
-rw-r--r-- | ydb/core/ymq/actor/retention.cpp | 13 | ||||
-rw-r--r-- | ydb/core/ymq/actor/retention.h | 2 | ||||
-rw-r--r-- | ydb/core/ymq/actor/service.cpp | 32 | ||||
-rw-r--r-- | ydb/core/ymq/actor/user_settings_names.cpp | 1 | ||||
-rw-r--r-- | ydb/core/ymq/actor/user_settings_names.h | 1 | ||||
-rw-r--r-- | ydb/core/ymq/base/query_id.h | 1 | ||||
-rw-r--r-- | ydb/core/ymq/queues/fifo/queries.cpp | 31 | ||||
-rw-r--r-- | ydb/core/ymq/queues/std/queries.cpp | 30 |
15 files changed, 363 insertions, 52 deletions
diff --git a/ydb/core/ymq/actor/events.h b/ydb/core/ymq/actor/events.h index 540b314b3ee..93744c74934 100644 --- a/ydb/core/ymq/actor/events.h +++ b/ydb/core/ymq/actor/events.h @@ -130,6 +130,9 @@ struct TSqsEvents { EvNodeTrackerUnsubscribeRequest, EvNodeTrackerSubscriptionStatus, + EvForceReloadState, + EvChangeRetentionActiveCheck, + EvEnd, }; @@ -730,6 +733,7 @@ struct TSqsEvents { struct TEvQueuePurgedNotification : public NActors::TEventLocal<TEvQueuePurgedNotification, EvQueuePurgedNotification> { ui64 Shard = 0; ui64 NewMessagesCount = 0; + TVector<ui64> DeletedOffsets; }; struct TEvGetRuntimeQueueAttributes : public NActors::TEventLocal<TEvGetRuntimeQueueAttributes, EvGetRuntimeQueueAttributes> { @@ -926,12 +930,28 @@ struct TSqsEvents { }; struct TEvNodeTrackerSubscriptionStatus : public NActors::TEventLocal<TEvNodeTrackerSubscriptionStatus, EvNodeTrackerSubscriptionStatus> { - explicit TEvNodeTrackerSubscriptionStatus(ui64 subscriptionId, ui32 nodeId) + TEvNodeTrackerSubscriptionStatus(ui64 subscriptionId, ui32 nodeId, bool disconnected=false) : SubscriptionId(subscriptionId) , NodeId(nodeId) + , Disconnected(disconnected) {} ui64 SubscriptionId; ui32 NodeId; + bool Disconnected; + }; + + struct TEvForceReloadState : public NActors::TEventLocal<TEvForceReloadState, EvForceReloadState> { + explicit TEvForceReloadState(TDuration nextTryAfter = TDuration::Zero()) + : NextTryAfter(nextTryAfter) + {} + TDuration NextTryAfter; + }; + + struct TEvChangeRetentionActiveCheck : public NActors::TEventLocal<TEvChangeRetentionActiveCheck, EvChangeRetentionActiveCheck> { + explicit TEvChangeRetentionActiveCheck(bool active) + : Active(active) + {} + bool Active; }; }; diff --git a/ydb/core/ymq/actor/node_tracker.cpp b/ydb/core/ymq/actor/node_tracker.cpp index 1fdc020347e..ba2d27004e1 100644 --- a/ydb/core/ymq/actor/node_tracker.cpp +++ b/ydb/core/ymq/actor/node_tracker.cpp @@ -132,15 +132,25 @@ namespace NKikimr::NSQS { }
}
- void TNodeTrackerActor::AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId) {
- Send(ParentActor, new TSqsEvents::TEvNodeTrackerSubscriptionStatus(subscriptionId, nodeId));
+ void TNodeTrackerActor::AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId, bool disconnected) {
+ Send(ParentActor, new TSqsEvents::TEvNodeTrackerSubscriptionStatus(subscriptionId, nodeId, disconnected));
}
void TNodeTrackerActor::HandlePipeClientDisconnected(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const NActors::TActorContext&) {
- auto it = TabletsInfo.find(ev->Get()->TabletId);
+ ui64 tabletId = ev->Get()->TabletId;
+ auto it = TabletsInfo.find(tabletId);
if (it != TabletsInfo.end()) {
- LOG_SQS_DEBUG(GetLogPrefix() << "tablet pipe " << ev->Get()->TabletId << " disconnected");
- ReconnectToTablet(ev->Get()->TabletId);
+ LOG_SQS_DEBUG(GetLogPrefix() << "tablet pipe " << tabletId << " disconnected");
+
+ auto& info = it->second;
+ if (info.PipeServer) {
+ for (auto& [id, subscriber] : info.Subscribers) {
+ if (subscriber->NodeId) {
+ AnswerForSubscriber(id, subscriber->NodeId.value(), true);
+ }
+ }
+ }
+ ReconnectToTablet(tabletId);
} else {
LOG_SQS_WARN(GetLogPrefix() << " disconnected from unrequired tablet id: [" << ev->Get()->TabletId << "]. Client pipe actor: " << ev->Get()->ClientId << ". Server pipe actor: " << ev->Get()->ServerId);
}
diff --git a/ydb/core/ymq/actor/node_tracker.h b/ydb/core/ymq/actor/node_tracker.h index 0529589b07b..7042f584473 100644 --- a/ydb/core/ymq/actor/node_tracker.h +++ b/ydb/core/ymq/actor/node_tracker.h @@ -76,7 +76,7 @@ public: ui64 GetTabletId(const TMap<TKeyPrefix, ui64>& tabletsPerEndKeyRange, TKeyPrefix keyPrefix) const; ui64 GetTabletId(const TSubscriberInfo& subscriber) const; - void AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId); + void AnswerForSubscriber(ui64 subscriptionId, ui32 nodeId, bool disconnected=false); void RemoveSubscriber(TSqsEvents::TEvNodeTrackerUnsubscribeRequest::TPtr& request, const NActors::TActorContext& ctx); bool SubscriberMustWait(const TSubscriberInfo& subscriber) const; void AddSubscriber(TSqsEvents::TEvNodeTrackerSubscribeRequest::TPtr& request, const NActors::TActorContext& ctx); diff --git a/ydb/core/ymq/actor/purge.cpp b/ydb/core/ymq/actor/purge.cpp index e9c4c9b6eff..76bba1c4772 100644 --- a/ydb/core/ymq/actor/purge.cpp +++ b/ydb/core/ymq/actor/purge.cpp @@ -151,7 +151,20 @@ static void FillMessagesParam(NClient::TWriteValue& messagesParam, const NClient } void TPurgeActor::MakeStage2Request(ui64 cleanupVersion, const TValue& messages, const TMaybe<TValue>& inflyMessages, const ui64 shardId, TShard* shard) { - auto onExecuted = [this, shardId, shard] (const TSqsEvents::TEvExecuted::TRecord& ev) { + TVector<ui64> offsets; + auto collectOffsetsFrom = [&](const TValue& msgs) { + for (size_t i = 0; i < msgs.Size(); ++i) { + const ui64 offset = msgs[i]["Offset"]; + offsets.push_back(offset); + } + }; + + collectOffsetsFrom(messages); + if (inflyMessages) { + collectOffsetsFrom(*inflyMessages); + } + + auto onExecuted = [this, shardId, shard, offsets] (const TSqsEvents::TEvExecuted::TRecord& ev) { const ui32 status = ev.GetStatus(); if (status == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { const TValue val(TValue::Create(ev.GetExecutionEngineEvaluatedResponse())); @@ -168,6 +181,7 @@ void TPurgeActor::MakeStage2Request(ui64 cleanupVersion, const TValue& messages, auto notification = MakeHolder<TSqsEvents::TEvQueuePurgedNotification>(); notification->Shard = shardId; notification->NewMessagesCount = static_cast<ui64>(newMessagesCount); + notification->DeletedOffsets = std::move(offsets); Send(QueueLeader_, std::move(notification)); } diff --git a/ydb/core/ymq/actor/purge.h b/ydb/core/ymq/actor/purge.h index 69a1f836de4..77b385c5cf4 100644 --- a/ydb/core/ymq/actor/purge.h +++ b/ydb/core/ymq/actor/purge.h @@ -21,7 +21,6 @@ class TPurgeActor : public TActorBootstrapped<TPurgeActor> { ui64 Offset = 0; TInstant SentTimestamp = TInstant::Zero(); }; - std::pair<ui64, ui64> CurrentOffsets; TMessageBoundary CurrentLastMessage; TMessageBoundary PreviousSuccessfullyProcessedLastMessage; }; diff --git a/ydb/core/ymq/actor/queue_leader.cpp b/ydb/core/ymq/actor/queue_leader.cpp index 0cd68ab1601..32472412e88 100644 --- a/ydb/core/ymq/actor/queue_leader.cpp +++ b/ydb/core/ymq/actor/queue_leader.cpp @@ -27,6 +27,7 @@ LWTRACE_USING(SQS_PROVIDER); + namespace NKikimr::NSQS { constexpr ui64 UPDATE_COUNTERS_TAG = 0; @@ -38,7 +39,29 @@ const TString INFLY_INVALIDATION_REASON_VERSION_CHANGED = "InflyVersionChanged"; const TString INFLY_INVALIDATION_REASON_DEADLINE_CHANGED = "MessageDeadlineChanged"; const TString INFLY_INVALIDATION_REASON_DELETED = "MessageDeleted"; -TQueueLeader::TQueueLeader(TString userName, TString queueName, TString folderId, TString rootUrl, TIntrusivePtr<TQueueCounters> counters, TIntrusivePtr<TUserCounters> userCounters, const TActorId& schemeCache, const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser) +constexpr TDuration STATE_UPDATE_PERIOD_MIN = TDuration::MilliSeconds(200); +constexpr TDuration STATE_UPDATE_PERIOD_MAX = TDuration::Minutes(1); + +TDuration GetNextReloadStateWaitPeriod(TDuration current = TDuration::Zero()) { + if (current == STATE_UPDATE_PERIOD_MAX) { + return TDuration::Max(); + } + auto nextWaitPeriod = 2 * Max(current, STATE_UPDATE_PERIOD_MIN); + nextWaitPeriod += TDuration::MilliSeconds(RandomNumber<ui64>(nextWaitPeriod.MilliSeconds() / 2)); + return Max(STATE_UPDATE_PERIOD_MAX, nextWaitPeriod); +} + +TQueueLeader::TQueueLeader( + TString userName, + TString queueName, + TString folderId, + TString rootUrl, + TIntrusivePtr<TQueueCounters> counters, + TIntrusivePtr<TUserCounters> userCounters, + const TActorId& schemeCache, + const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser, + bool useCPUOptimization +) : UserName_(std::move(userName)) , QueueName_(std::move(queueName)) , FolderId_(std::move(folderId)) @@ -46,10 +69,14 @@ TQueueLeader::TQueueLeader(TString userName, TString queueName, TString folderId , SchemeCache_(schemeCache) , Counters_(std::move(counters)) , UserCounters_(std::move(userCounters)) + , UseCPUOptimization(useCPUOptimization) { if (quoterResourcesForUser) { QuoterResources_ = new TSqsEvents::TQuoterResourcesForActions(*quoterResourcesForUser); } + if (UseCPUOptimization) { + LOG_SQS_INFO("Use CPU optimization for queue " << TLogQueueName(UserName_, QueueName_)); + } } void TQueueLeader::Bootstrap() { @@ -62,8 +89,11 @@ void TQueueLeader::BecomeWorking() { Become(&TQueueLeader::StateWorking); const auto& cfg = Cfg(); const ui64 randomTimeToWait = RandomNumber<ui64>(cfg.GetBackgroundMetricsUpdateTimeMs() / 4); // Don't start all such operations at one moment - Schedule(TDuration::MilliSeconds(randomTimeToWait), new TEvWakeup(UPDATE_COUNTERS_TAG)); - + if (UseCPUOptimization) { + Schedule(TDuration::MilliSeconds(randomTimeToWait), new TSqsEvents::TEvForceReloadState(GetNextReloadStateWaitPeriod())); + } else { + Schedule(TDuration::MilliSeconds(randomTimeToWait), new TEvWakeup(UPDATE_COUNTERS_TAG)); + } Schedule(TDuration::Seconds(1), new TEvWakeup(UPDATE_MESSAGES_METRICS_TAG)); std::vector<TSqsEvents::TEvExecute::TPtr> requests; @@ -103,6 +133,7 @@ STATEFN(TQueueLeader::StateInit) { hFunc(TSqsEvents::TEvChangeMessageVisibilityBatch, HandleChangeMessageVisibilityBatchWhileIniting); // from change message visibility action actor hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileIniting); // from get queue attributes action actor hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues + hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState); // internal hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version @@ -127,6 +158,7 @@ STATEFN(TQueueLeader::StateWorking) { hFunc(TSqsEvents::TEvChangeMessageVisibilityBatch, HandleChangeMessageVisibilityBatchWhileWorking); // from change message visibility action actor hFunc(TSqsEvents::TEvGetRuntimeQueueAttributes, HandleGetRuntimeQueueAttributesWhileWorking); // from get queue attributes action actor hFunc(TSqsEvents::TEvDeadLetterQueueNotification, HandleDeadLetterQueueNotification); // service periodically notifies active dead letter queues + hFunc(TSqsEvents::TEvForceReloadState, HandleForceReloadState); // internal hFunc(TSqsEvents::TEvQueueId, HandleQueueId); // discover dlq id and version @@ -196,6 +228,79 @@ void TQueueLeader::HandleWakeup(TEvWakeup::TPtr& ev) { } } + +void TQueueLeader::HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev) { + if (!UseCPUOptimization) { + return; + } + auto nextTryAfter = ev->Get()->NextTryAfter; + if (nextTryAfter != TDuration::Max()) { + Schedule(nextTryAfter, new TSqsEvents::TEvForceReloadState(GetNextReloadStateWaitPeriod(nextTryAfter))); + } + + if (UpdateStateRequestInProcess) { + LOG_SQS_DEBUG("Update state request already in process for queue " << TLogQueueName(UserName_, QueueName_)); + return; + } + LOG_SQS_DEBUG("Start update state request for queue " << TLogQueueName(UserName_, QueueName_)); + TExecutorBuilder(SelfId(), "") + .User(UserName_) + .Queue(QueueName_) + .QueueLeader(SelfId()) + .TablesFormat(TablesFormat_) + .QueryId(GET_STATE_ID) + .QueueVersion(QueueVersion_) + .Fifo(IsFifoQueue_) + .RetryOnTimeout() + .OnExecuted([this](const TSqsEvents::TEvExecuted::TRecord& ev) { HandleState(ev); }) + .Counters(Counters_) + .Params() + .Uint64("QUEUE_ID_NUMBER", QueueVersion_) + .Uint64("QUEUE_ID_NUMBER_HASH", GetKeysHash(QueueVersion_)) + .ParentBuilder().Start(); +} + +void TQueueLeader::HandleState(const TSqsEvents::TEvExecuted::TRecord& reply) { + LOG_SQS_DEBUG("Handle state for " << TLogQueueName(UserName_, QueueName_)); + Y_VERIFY(!UpdateStateRequestInProcess); + UpdateStateRequestInProcess = false; + + if (reply.GetStatus() == TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecComplete) { + using NKikimr::NClient::TValue; + const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse())); + const TValue shardStates = val["state"]; + + for (size_t i = 0; i < shardStates.Size(); ++i) { + const auto& state = shardStates[i]; + const ui64 shard = IsFifoQueue_ ? 0 : (TablesFormat_ == 1 ? ui32(state["Shard"]) : ui64(state["State"])); + auto& shardInfo = Shards_[shard]; + auto inflyCountVal = state["InflyCount"]; + Y_VERIFY(i64(inflyCountVal) >= 0); + ui64 inflyMessagesCount = static_cast<ui64>(i64(inflyCountVal)); + ui64 inflyVersion = state["InflyVersion"].IsNull() ? 0 : ui64(state["InflyVersion"]); + + SetMessagesCount(shard, state["MessageCount"]); + shardInfo.InflyMessagesCount = inflyMessagesCount; + shardInfo.CreatedTimestamp = TInstant::MilliSeconds(ui64(state["CreatedTimestamp"])); + + if (shardInfo.InflyVersion != inflyVersion) { + shardInfo.NeedInflyReload = true; + } + + if (shardInfo.MessagesCount > 0) { + RequestOldestTimestampMetrics(shard); + } + if (shardInfo.MessagesCount == 0 && !shardInfo.MessagesCountWasGot) { + Send(RetentionActor_, new TSqsEvents::TEvChangeRetentionActiveCheck(false)); + } + + shardInfo.MessagesCountWasGot = true; + } + } else { + LOG_SQS_ERROR("Failed to update state for " << TLogQueueName(UserName_, QueueName_) << ": " << reply); + } +} + void TQueueLeader::HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev) { GetConfigurationRequests_.emplace_back(ev); } @@ -442,11 +547,7 @@ void TQueueLeader::OnSendBatchExecuted(ui64 shard, ui64 batchId, const TSqsEvent DelayStatistics_.AddDelayedMessage(batch->TransactionStartedTime + entry.Message.Delay, batch->TransactionStartedTime); } } - if (!IsFifoQueue_) { - const i64 newMessagesCount = val["newMessagesCount"]; - Y_VERIFY(newMessagesCount >= 0); - shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); - } + SetMessagesCount(shard, val["newMessagesCount"]); } else { const TString* prevRequestId = nullptr; for (size_t i = 0; i < batch->Size(); ++i) { @@ -642,10 +743,15 @@ void TQueueLeader::OnFifoMessagesReadSuccess(const NKikimr::NClient::TValue& val if (const ui64 movedMessagesCount = value["movedMessagesCount"]) { ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); - const i64 newMessagesCount = value["newMessagesCount"]; - Y_VERIFY(newMessagesCount >= 0); - auto& shardInfo = Shards_[0]; - shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); + SetMessagesCount(0, value["newMessagesCount"]); + + const auto& moved = value["movedMessages"]; // TODO return only moved offsets + for (size_t i = 0; i < moved.Size(); ++i) { + const ui64 offset = moved[i]["SourceOffset"]; + if (offset == Shards_.front().OldestMessageOffset) { + RequestOldestTimestampMetrics(0); + } + } } reqInfo.Answer->Messages.resize(list.Size()); @@ -831,16 +937,14 @@ void TQueueLeader::OnLoadStdMessageResult(const TString& requestId, const ui64 o } } -void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) { +void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, ui64 shard, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch) { const NKikimr::NClient::TValue list(value["result"]); Y_VERIFY(list.Size() == batch->Size()); if (const ui64 movedMessagesCount = value["movedMessagesCount"]) { ADD_COUNTER(Counters_, MessagesMovedToDLQ, movedMessagesCount); - const i64 newMessagesCount = value["newMessagesCount"]; - Y_VERIFY(newMessagesCount >= 0); - shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); + SetMessagesCount(shard, value["newMessagesCount"]); } THashMap<ui64, const TLoadBatchEntry*> offset2entry; @@ -852,6 +956,14 @@ void TQueueLeader::OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& for (size_t i = 0; i < list.Size(); ++i) { auto msg = list[i]; const ui64 offset = msg["Offset"]; + + const bool exists = msg["Exists"]; + const auto wasDeadLetterValue = msg["IsDeadLetter"]; + const bool wasDeadLetter = wasDeadLetterValue.HaveValue() ? bool(wasDeadLetterValue) : false; + const bool valid = msg["Valid"]; + if (exists && wasDeadLetter && valid && shardInfo.OldestMessageOffset == offset) { + RequestOldestTimestampMetrics(shard); + } const auto entry = offset2entry.find(offset); Y_VERIFY(entry != offset2entry.end()); OnLoadStdMessageResult(entry->second->RequestId, offset, true, &msg, false); @@ -873,7 +985,7 @@ void TQueueLeader::OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, cons dlqExists = value["dlqExists"]; if (dlqExists) { success = true; - OnLoadStdMessagesBatchSuccess(value, shardInfo, batch); + OnLoadStdMessagesBatchSuccess(value, shard, shardInfo, batch); } } @@ -1081,6 +1193,9 @@ void TQueueLeader::OnDeleteBatchExecuted(ui64 shard, ui64 batchId, const TSqsEve OnMessageDeleted(entry.RequestId, shard, entry.IndexInRequest, reply, &messageResult); } batch->Offset2Entry.erase(first, last); + if (shardInfo.OldestMessageOffset == offset) { + RequestOldestTimestampMetrics(shard); + } } // others are already deleted messages: for (const auto& [offset, entryIndex] : batch->Offset2Entry) { @@ -1088,11 +1203,7 @@ void TQueueLeader::OnDeleteBatchExecuted(ui64 shard, ui64 batchId, const TSqsEve OnMessageDeleted(entry.RequestId, shard, entry.IndexInRequest, reply, nullptr); } - if (!IsFifoQueue_) { - const i64 newMessagesCount = val["newMessagesCount"]; - Y_VERIFY(newMessagesCount >= 0); - shardInfo.MessagesCount = static_cast<ui64>(newMessagesCount); - } + SetMessagesCount(shard, val["newMessagesCount"]); } else { const TString* prevRequestId = nullptr; for (size_t i = 0; i < batch->Size(); ++i) { @@ -1496,7 +1607,11 @@ void TQueueLeader::StartGatheringMetrics() { IsDlqQueue_ = false; } - + + if (UseCPUOptimization) { + return; + } + for (ui64 shard = 0; shard < ShardsCount_; ++shard) { if (IsFifoQueue_ || IsDlqQueue_) { RequestMessagesCountMetrics(shard); @@ -1564,7 +1679,7 @@ void TQueueLeader::ReceiveMessagesCountMetrics(ui64 shard, const TSqsEvents::TEv LOG_SQS_DEBUG("Handle message count metrics for " << TLogQueueName(UserName_, QueueName_, shard)); Y_VERIFY(MetricsQueriesInfly_ > 0); --MetricsQueriesInfly_; - if (MetricsQueriesInfly_ == 0) { + if (MetricsQueriesInfly_ == 0 && !UseCPUOptimization) { ScheduleMetricsRequest(); } Y_VERIFY(shard < Shards_.size()); @@ -1575,8 +1690,7 @@ void TQueueLeader::ReceiveMessagesCountMetrics(ui64 shard, const TSqsEvents::TEv const TValue val(TValue::Create(reply.GetExecutionEngineEvaluatedResponse())); const TValue messagesCount = val["messagesCount"]; if (!messagesCount.IsNull()) { // can be null in case of parallel queue deletion (SQS-292) - Y_VERIFY(i64(messagesCount) >= 0); - Shards_[shard].MessagesCount = static_cast<ui64>(i64(messagesCount)); // MessageCount is Int64 type in database + SetMessagesCount(shard, messagesCount); } const TValue createdTimestamp = val["createdTimestamp"]; if (!createdTimestamp.IsNull()) { @@ -1599,7 +1713,7 @@ void TQueueLeader::ReceiveOldestTimestampMetrics(ui64 shard, const TSqsEvents::T LOG_SQS_DEBUG("Handle oldest timestamp metrics for " << TLogQueueName(UserName_, QueueName_, shard)); Y_VERIFY(MetricsQueriesInfly_ > 0); --MetricsQueriesInfly_; - if (MetricsQueriesInfly_ == 0) { + if (MetricsQueriesInfly_ == 0 && !UseCPUOptimization) { ScheduleMetricsRequest(); } Y_VERIFY(shard < Shards_.size()); @@ -1610,6 +1724,10 @@ void TQueueLeader::ReceiveOldestTimestampMetrics(ui64 shard, const TSqsEvents::T const TValue list = val["messages"]; if (list.Size()) { Shards_[shard].LastSuccessfulOldestMessageTimestampValueMs = Shards_[shard].OldestMessageTimestampMs = list[0]["SentTimestamp"]; + Shards_[shard].OldestMessageOffset = list[0]["Offset"]; + LOG_SQS_INFO("Got oldest message for " << TLogQueueName(UserName_, QueueName_, shard) + << ": offset=" << Shards_[shard].OldestMessageOffset << " ts=" << TInstant::MilliSeconds(Shards_[shard].OldestMessageTimestampMs)); + } else { Shards_[shard].OldestMessageTimestampMs = Max(); } @@ -1620,6 +1738,27 @@ void TQueueLeader::ReceiveOldestTimestampMetrics(ui64 shard, const TSqsEvents::T ReportOldestTimestampMetricsIfReady(); } +void TQueueLeader::SetMessagesCount(ui64 shard, const NKikimr::NClient::TValue& value) { + const i64 newMessagesCountVal = value; + Y_VERIFY(newMessagesCountVal >= 0); + ui64 newMessagesCount = static_cast<ui64>(newMessagesCountVal); + SetMessagesCount(shard, newMessagesCount); +} + +void TQueueLeader::SetMessagesCount(ui64 shard, ui64 newMessagesCount) { + TShardInfo& shardInfo = Shards_[shard]; + if (UseCPUOptimization) { + if (shardInfo.MessagesCount == 0 && newMessagesCount > 0) { + RequestOldestTimestampMetrics(shard); + Send(RetentionActor_, new TSqsEvents::TEvChangeRetentionActiveCheck(true)); + } + if (shardInfo.MessagesCount > 0 && newMessagesCount == 0) { + Send(RetentionActor_, new TSqsEvents::TEvChangeRetentionActiveCheck(false)); + } + } + shardInfo.MessagesCount = newMessagesCount; +} + void TQueueLeader::ScheduleMetricsRequest() { const ui64 updateTime = Cfg().GetBackgroundMetricsUpdateTimeMs(); const ui64 randomTimeToWait = RandomNumber<ui32>(updateTime / 4); @@ -1632,7 +1771,7 @@ void TQueueLeader::ReportMessagesCountMetricsIfReady() { const TInstant now = TActivationContext::Now(); for (const auto& shardInfo : Shards_) { if (IsFifoQueue_) { - if (shardInfo.MessagesCountIsRequesting) { + if (shardInfo.MessagesCountIsRequesting || !shardInfo.MessagesCountWasGot) { return; } } else { @@ -1658,8 +1797,13 @@ void TQueueLeader::ReportOldestTimestampMetricsIfReady() { if (shardInfo.OldestMessageAgeIsRequesting) { return; } - oldestMessagesTimestamp = Min(oldestMessagesTimestamp, shardInfo.OldestMessageTimestampMs); + ui64 ts = shardInfo.OldestMessageTimestampMs; + if (UseCPUOptimization && shardInfo.MessagesCountWasGot && shardInfo.MessagesCount == 0) { + ts = Max<ui64>(); + } + oldestMessagesTimestamp = Min(oldestMessagesTimestamp, ts); } + if (Counters_) { if (oldestMessagesTimestamp != Max<ui64>()) { auto age = (TActivationContext::Now() - TInstant::MilliSeconds(oldestMessagesTimestamp)).Seconds(); @@ -1793,14 +1937,13 @@ void TQueueLeader::OnInflyLoaded(ui64 shard, const TSqsEvents::TEvExecuted::TRec shardInfo.InflyVersion = val["inflyVersion"]; LOG_SQS_DEBUG("Infly version for shard " << TLogQueueName(UserName_, QueueName_, shard) << ": " << shardInfo.InflyVersion); - auto messagesCount = val["messageCount"]; auto inflyCount = val["inflyCount"]; - Y_VERIFY(i64(messagesCount) >= 0); Y_VERIFY(i64(inflyCount) >= 0); - shardInfo.MessagesCount = static_cast<ui64>(i64(messagesCount)); + SetMessagesCount(shard, val["messageCount"]); shardInfo.InflyMessagesCount = static_cast<ui64>(i64(inflyCount)); shardInfo.ReadOffset = val["readOffset"]; shardInfo.CreatedTimestamp = TInstant::MilliSeconds(ui64(val["createdTimestamp"])); + shardInfo.DelayStatisticsInited = true; @@ -1894,7 +2037,7 @@ void TQueueLeader::OnAddedMessagesToInfly(ui64 shard, const TSqsEvents::TEvExecu shardInfo.ReadOffset = val["readOffset"]; // Update messages count - shardInfo.MessagesCount = static_cast<ui64>(i64(val["messagesCount"])); + SetMessagesCount(shard, val["messagesCount"]); } } else { LOG_SQS_ERROR("Failed to add new messages to infly for " << TLogQueueName(UserName_, QueueName_, shard) << ": " << reply); @@ -2113,8 +2256,15 @@ void TQueueLeader::HandleInflyIsPurgingNotification(TSqsEvents::TEvInflyIsPurgin } void TQueueLeader::HandleQueuePurgedNotification(TSqsEvents::TEvQueuePurgedNotification::TPtr& ev) { - auto& shardInfo = Shards_[ev->Get()->Shard]; - shardInfo.MessagesCount = ev->Get()->NewMessagesCount; + ui64 shard = ev->Get()->Shard; + auto& shardInfo = Shards_[shard]; + SetMessagesCount(shard, ev->Get()->NewMessagesCount); + for (ui64 offset : ev->Get()->DeletedOffsets) { + if (shardInfo.OldestMessageOffset <= offset) { + RequestOldestTimestampMetrics(shard); + break; + } + } } void TQueueLeader::HandleGetRuntimeQueueAttributesWhileIniting(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev) { @@ -2134,6 +2284,7 @@ void TQueueLeader::HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQu if (!IsFifoQueue_ && !IsDlqQueue_) { // we need to start the process only once IsDlqQueue_ = true; + UseCPUOptimization = false; LOG_SQS_INFO("Started periodic message counting for queue " << TLogQueueName(UserName_, QueueName_) << ". Latest dlq notification was at " << LatestDlqNotificationTs_); diff --git a/ydb/core/ymq/actor/queue_leader.h b/ydb/core/ymq/actor/queue_leader.h index c4a6a7ffddc..ef68e5c0d89 100644 --- a/ydb/core/ymq/actor/queue_leader.h +++ b/ydb/core/ymq/actor/queue_leader.h @@ -33,7 +33,17 @@ class TQueueLeader : public TActorBootstrapped<TQueueLeader> { struct TLoadBatch; public: - TQueueLeader(TString userName, TString queueName, TString folderId, TString rootUrl, TIntrusivePtr<TQueueCounters> counters, TIntrusivePtr<TUserCounters> userCounters, const TActorId& schemeCache, const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser); + TQueueLeader( + TString userName, + TString queueName, + TString folderId, + TString rootUrl, + TIntrusivePtr<TQueueCounters> counters, + TIntrusivePtr<TUserCounters> userCounters, + const TActorId& schemeCache, + const TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions>& quoterResourcesForUser, + bool useCPUOptimization + ); void Bootstrap(); @@ -47,6 +57,7 @@ private: void PassAway() override; void HandleWakeup(TEvWakeup::TPtr& ev); + void HandleState(const TSqsEvents::TEvExecuted::TRecord& ev); void HandleGetConfigurationWhileIniting(TSqsEvents::TEvGetConfiguration::TPtr& ev); void HandleGetConfigurationWhileWorking(TSqsEvents::TEvGetConfiguration::TPtr& ev); void HandleExecuteWhileIniting(TSqsEvents::TEvExecute::TPtr& ev); @@ -68,9 +79,11 @@ private: void HandleGetRuntimeQueueAttributesWhileIniting(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev); void HandleGetRuntimeQueueAttributesWhileWorking(TSqsEvents::TEvGetRuntimeQueueAttributes::TPtr& ev); void HandleDeadLetterQueueNotification(TSqsEvents::TEvDeadLetterQueueNotification::TPtr& ev); + void HandleForceReloadState(TSqsEvents::TEvForceReloadState::TPtr& ev); void BecomeWorking(); void RequestConfiguration(); + void UpdateStateRequest(); void StartGatheringMetrics(); void RequestMessagesCountMetrics(ui64 shard); void RequestOldestTimestampMetrics(ui64 shard); @@ -128,7 +141,7 @@ private: void WaitAddMessagesToInflyOrTryAnotherShard(TReceiveMessageBatchRequestProcessing& reqInfo); void Reply(TReceiveMessageBatchRequestProcessing& reqInfo); // batching - void OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch); + void OnLoadStdMessagesBatchSuccess(const NKikimr::NClient::TValue& value, ui64 shard, TShardInfo& shardInfo, TIntrusivePtr<TLoadBatch> batch); void OnLoadStdMessagesBatchExecuted(ui64 shard, ui64 batchId, const bool usedDLQ, const TSqsEvents::TEvExecuted::TRecord& reply); // delete @@ -144,6 +157,8 @@ private: TQueuePath GetQueuePath() { return TQueuePath(Cfg().GetRoot(), UserName_, QueueName_, QueueVersion_); } + void SetMessagesCount(ui64 shard, const NKikimr::NClient::TValue& value); + void SetMessagesCount(ui64 shard, ui64 value); void ScheduleMetricsRequest(); @@ -376,6 +391,7 @@ private: ui64 MessagesCount = 0; ui64 InflyMessagesCount = 0; ui64 OldestMessageTimestampMs = Max(); + ui64 OldestMessageOffset = 0; ui64 LastSuccessfulOldestMessageTimestampValueMs = 0; // for query optimization - more accurate range bool MessagesCountIsRequesting = false; @@ -417,6 +433,8 @@ private: }; std::vector<TShardInfo> Shards_; TMessageDelayStatistics DelayStatistics_; + bool UpdateStateRequestInProcess = false; + bool UseCPUOptimization = false; // background actors TActorId DeduplicationCleanupActor_; diff --git a/ydb/core/ymq/actor/retention.cpp b/ydb/core/ymq/actor/retention.cpp index 6384ca1127c..26957e71188 100644 --- a/ydb/core/ymq/actor/retention.cpp +++ b/ydb/core/ymq/actor/retention.cpp @@ -57,7 +57,9 @@ void TRetentionActor::SetRetentionBoundary() { RLOG_SQS_ERROR("Failed to set retention boundary for queue " << TLogQueueName(QueuePath_)); } - Schedule(RandomRetentionPeriod(), new TEvWakeup()); + if (Active) { + Schedule(RandomRetentionPeriod(), new TEvWakeup()); + } }; TExecutorBuilder(SelfId(), RequestId_) @@ -83,6 +85,14 @@ void TRetentionActor::HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev) { ev->Get()->Call(); } +void TRetentionActor::Handle(TSqsEvents::TEvChangeRetentionActiveCheck::TPtr& ev) { + if (!Active && ev->Get()->Active) { + Schedule(RandomRetentionPeriod(), new TEvWakeup()); + } + Active = ev->Get()->Active; + RLOG_SQS_INFO("Handle change retention actor state " << TLogQueueName(QueuePath_) << " : " << Active); +} + void TRetentionActor::HandlePoisonPill(TEvPoisonPill::TPtr&) { RLOG_SQS_DEBUG("Handle poison pill in retention actor for queue " << TLogQueueName(QueuePath_)); PassAway(); @@ -97,6 +107,7 @@ STATEFN(TRetentionActor::StateFunc) { switch (ev->GetTypeRewrite()) { cFunc(TEvWakeup::EventType, HandleWakeup); hFunc(TSqsEvents::TEvExecuted, HandleExecuted); + hFunc(TSqsEvents::TEvChangeRetentionActiveCheck, Handle); hFunc(TEvPoisonPill, HandlePoisonPill); } } diff --git a/ydb/core/ymq/actor/retention.h b/ydb/core/ymq/actor/retention.h index 3c4f47f4a2d..e253102dcc8 100644 --- a/ydb/core/ymq/actor/retention.h +++ b/ydb/core/ymq/actor/retention.h @@ -23,6 +23,7 @@ private: void SetRetentionBoundary(); void HandleExecuted(TSqsEvents::TEvExecuted::TPtr& ev); + void Handle(TSqsEvents::TEvChangeRetentionActiveCheck::TPtr& ev); void HandlePoisonPill(TEvPoisonPill::TPtr&); void HandleWakeup(); @@ -34,6 +35,7 @@ private: const ui32 TablesFormat_; const TString RequestId_; const TActorId QueueLeader_; + bool Active = true; }; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/service.cpp b/ydb/core/ymq/actor/service.cpp index b42e38e8304..4e491df71b9 100644 --- a/ydb/core/ymq/actor/service.cpp +++ b/ydb/core/ymq/actor/service.cpp @@ -77,7 +77,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { TString folderId, ui32 tablesFormat, ui64 version, ui64 shardsCount, const TIntrusivePtr<TUserCounters>& userCounters, const TIntrusivePtr<TFolderCounters>& folderCounters, const TActorId& schemeCache, TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> quoterResourcesForUser, - bool insertCounters + bool insertCounters, bool useLeaderCPUOptimization ) : UserName_(std::move(userName)) , QueueName_(std::move(queueName)) @@ -94,6 +94,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { , FolderCounters_(folderCounters) , SchemeCache_(schemeCache) , QuoterResourcesForUser_(std::move(quoterResourcesForUser)) + , UseLeaderCPUOptimization(useLeaderCPUOptimization) { } @@ -112,12 +113,21 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { StopLocalLeaderIfNeeded(LEADER_DESTROY_REASON_TABLET_ON_ANOTHER_NODE); } } + + void LocalLeaderWayMoved() const { + if (LocalLeader_) { + TActivationContext::Send(new IEventHandleFat(LocalLeader_, SelfId(), new TSqsEvents::TEvForceReloadState())); + } + } void StartLocalLeader(const TString& reason) { if (!LocalLeader_) { Counters_ = Counters_->GetCountersForLeaderNode(); LWPROBE(CreateLeader, UserName_, QueueName_, reason); - LocalLeader_ = TActivationContext::Register(new TQueueLeader(UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, SchemeCache_, QuoterResourcesForUser_)); + LocalLeader_ = TActivationContext::Register(new TQueueLeader( + UserName_, QueueName_, FolderId_, RootUrl_, Counters_, UserCounters_, + SchemeCache_, QuoterResourcesForUser_, UseLeaderCPUOptimization + )); LOG_SQS_INFO("Start local leader [" << UserName_ << "/" << QueueName_ << "] actor " << LocalLeader_); if (FolderId_) { @@ -183,6 +193,7 @@ struct TSqsService::TQueueInfo : public TAtomicRefCount<TQueueInfo> { TActorId SchemeCache_; ui64 LocalLeaderRefCount_ = 0; TIntrusivePtr<TSqsEvents::TQuoterResourcesForActions> QuoterResourcesForUser_; + bool UseLeaderCPUOptimization; // State machine THashSet<TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_; @@ -246,6 +257,7 @@ struct TSqsService::TUserInfo : public TAtomicRefCount<TUserInfo> { TLocalRateLimiterResource DeleteObjectsQuoterResource_; TLocalRateLimiterResource OtherActionsQuoterResource_; i64 EarlyRequestQueuesListBudget_ = EARLY_REQUEST_QUEUES_LIST_MAX_BUDGET; // Defence from continuously requesting queues list. + bool UseLeaderCPUOptimization = false; // State machine THashMultiMap<TString, TSqsEvents::TEvGetLeaderNodeForQueueRequest::TPtr> GetLeaderNodeRequests_; // queue name -> request @@ -747,7 +759,11 @@ void TSqsService::HandleNodeTrackingSubscriptionStatus(TSqsEvents::TEvNodeTracke auto queuePtr = it->second; auto& queue = *queuePtr; auto nodeId = ev->Get()->NodeId; + bool disconnected = ev->Get()->Disconnected; queue.SetLeaderNodeId(nodeId); + if (disconnected) { + queue.LocalLeaderWayMoved(); + } LOG_SQS_DEBUG( "Got node leader for queue [" << queue.UserName_ << "/" << queue.QueueName_ << "]. Node: " << nodeId << " subscription id: " << subscriptionId @@ -921,6 +937,16 @@ void TSqsService::HandleUserSettingsChanged(TSqsEvents::TEvUserSettingsChanged:: const bool needExport = FromStringWithDefault(value->second, false); user->Counters_->ExportTransactionCounters(needExport); } + + if (IsIn(*diff, USE_CPU_LEADER_OPTIMIZATION)) { + const auto value = newSettings->find(USE_CPU_LEADER_OPTIMIZATION); + Y_VERIFY(value != newSettings->end()); + const bool use = FromStringWithDefault(value->second, false); + user->UseLeaderCPUOptimization = use; + for (auto queue : user->Queues_) { + queue.second->UseLeaderCPUOptimization = use; + } + } } TSqsService::TUserInfoPtr TSqsService::MutableUser(const TString& userName, bool moveUserRequestsToUserRecord, bool* requestsWereMoved) { @@ -1039,7 +1065,7 @@ std::map<TString, TSqsService::TQueueInfoPtr>::iterator TSqsService::AddQueue(co auto ret = user->Queues_.insert(std::make_pair(queue, TQueueInfoPtr(new TQueueInfo( userName, queue, RootUrl_, leaderTabletId, isFifo, customName, folderId, tablesFormat, version, shardsCount, - user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters))) + user->Counters_, folderCntrIter->second, SchemeCache_, user->QuoterResources_, insertCounters, user->UseLeaderCPUOptimization))) ).first; auto queueInfo = ret->second; diff --git a/ydb/core/ymq/actor/user_settings_names.cpp b/ydb/core/ymq/actor/user_settings_names.cpp index 3d315490bc8..08ba0c19e60 100644 --- a/ydb/core/ymq/actor/user_settings_names.cpp +++ b/ydb/core/ymq/actor/user_settings_names.cpp @@ -6,5 +6,6 @@ extern const TString USER_SETTING_MAX_QUEUES_COUNT = "MaxQueuesCount"; extern const TString USER_SETTING_DISABLE_COUNTERS = "DisableCounters"; extern const TString USER_SETTING_SHOW_DETAILED_COUNTERS_DEADLINE_MS = "ShowDetailedCountersDeadlineMs"; extern const TString USER_SETTING_EXPORT_TRANSACTION_COUNTERS = "ExportTransactionCounters"; +extern const TString USE_CPU_LEADER_OPTIMIZATION = "UseCPULeaderOptimization"; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/actor/user_settings_names.h b/ydb/core/ymq/actor/user_settings_names.h index 11ee80f43a7..5cf2a8e13ef 100644 --- a/ydb/core/ymq/actor/user_settings_names.h +++ b/ydb/core/ymq/actor/user_settings_names.h @@ -9,5 +9,6 @@ extern const TString USER_SETTING_MAX_QUEUES_COUNT; extern const TString USER_SETTING_DISABLE_COUNTERS; extern const TString USER_SETTING_SHOW_DETAILED_COUNTERS_DEADLINE_MS; extern const TString USER_SETTING_EXPORT_TRANSACTION_COUNTERS; +extern const TString USE_CPU_LEADER_OPTIMIZATION; } // namespace NKikimr::NSQS diff --git a/ydb/core/ymq/base/query_id.h b/ydb/core/ymq/base/query_id.h index 4c75bcb5514..c2e0d9ba908 100644 --- a/ydb/core/ymq/base/query_id.h +++ b/ydb/core/ymq/base/query_id.h @@ -29,6 +29,7 @@ enum EQueryId { READ_OR_REDRIVE_MESSAGE_ID, GET_USER_SETTINGS_ID, GET_QUEUES_LIST_ID, + GET_STATE_ID, QUERY_VECTOR_SIZE, }; diff --git a/ydb/core/ymq/queues/fifo/queries.cpp b/ydb/core/ymq/queues/fifo/queries.cpp index d70f58caca1..424fcf5c856 100644 --- a/ydb/core/ymq/queues/fifo/queries.cpp +++ b/ydb/core/ymq/queues/fifo/queries.cpp @@ -400,6 +400,7 @@ const char* const DeleteMessageQuery = R"__( (return (Extend (AsList (SetResult 'deleted result)) + (AsList (SetResult 'newMessagesCount count)) (ListIf (HasItems valid) (UpdateRow stateTable stateRow stateUpdate)) (Map valid (lambda '(item) (block '( @@ -865,6 +866,7 @@ const char* const ReadOrRedriveMessageQuery = R"__( (AsList (SetResult 'dlqExists dlqExists)) (AsList (SetResult 'result messagesToReturnAsStruct)) (AsList (SetResult 'movedMessagesCount (Length messagesToMoveAsStruct))) + (AsList (SetResult 'movedMessages messagesToMoveAsStruct)) (AsList (SetResult 'newMessagesCount newSourceMsgCount)) (ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow dlqStateTable dlqStateRow dlqStateUpdate)) (ListIf (And (HasItems messagesToMoveAsStruct) dlqExists) (UpdateRow sourceStateTable sourceStateRow sourceStateUpdate)) @@ -1132,6 +1134,7 @@ const char* const WriteMessageQuery = R"__( (return (Extend (AsList (SetResult 'result result)) + (AsList (SetResult 'newMessagesCount newMessagesCount)) (AsList (If (Greater (Length messagesAdded) (Uint64 '0)) (UpdateRow stateTable stateRow stateUpdate) (Void))) @@ -1410,7 +1413,8 @@ const char* const GetOldestMessageTimestampMetricsQuery = R"__( '('SentTimestamp timeFrom (Uint64 '18446744073709551615)) '('Offset (Uint64 '0) (Uint64 '18446744073709551615)))) (let sentIdxSelect '( - 'SentTimestamp)) + 'SentTimestamp + 'Offset)) (let selectResult (SelectRange sentTsIdx sentIdxRange sentIdxSelect '('('"ItemsLimit" (Uint64 '1))))) (let messages (Member selectResult 'List)) @@ -1495,6 +1499,29 @@ const char* const ListDeadLetterSourceQueuesQuery = R"__( ) )__"; +const char* const GetStateQuery = R"__( + ( + (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) + + (let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State) + + (let stateRange '( + )__" ALL_SHARDS_RANGE_PARAM R"__( + )) + (let stateSelect '( + 'MessageCount + 'InflyCount + 'InflyVersion + 'CreatedTimestamp + 'RetentionBoundary)) + + (let stateRead (Member (SelectRange stateTable stateRange stateSelect '()) 'List)) + + (return (AsList (SetResult 'state stateRead))) + ) +)__"; + } // namespace const char* GetFifoQueryById(size_t id) { @@ -1535,6 +1562,8 @@ const char* GetFifoQueryById(size_t id) { return ListDeadLetterSourceQueuesQuery; case READ_OR_REDRIVE_MESSAGE_ID: return ReadOrRedriveMessageQuery; + case GET_STATE_ID: + return GetStateQuery; } return nullptr; diff --git a/ydb/core/ymq/queues/std/queries.cpp b/ydb/core/ymq/queues/std/queries.cpp index cef1237c2ee..8b6d93f951e 100644 --- a/ydb/core/ymq/queues/std/queries.cpp +++ b/ydb/core/ymq/queues/std/queries.cpp @@ -1105,7 +1105,8 @@ const char* const GetOldestMessageTimestampMetricsQuery = R"__( '('SentTimestamp timeFrom (Uint64 '18446744073709551615)) '('Offset (Uint64 '0) (Uint64 '18446744073709551615)))) (let sentIdxSelect '( - 'SentTimestamp)) + 'SentTimestamp + 'Offset)) (let selectResult (SelectRange sentTsIdx sentIdxRange sentIdxSelect '('('"ItemsLimit" (Uint64 '1))))) (let messages (Member selectResult 'List)) @@ -1286,6 +1287,31 @@ const char* const GetMessageCountMetricsQuery = R"__( ) )__"; + +const char* const GetStateQuery = R"__( + ( + (let queueIdNumber (Parameter 'QUEUE_ID_NUMBER (DataType 'Uint64))) + (let queueIdNumberHash (Parameter 'QUEUE_ID_NUMBER_HASH (DataType 'Uint64))) + + (let stateTable ')__" QUEUE_TABLES_FOLDER_PARAM R"__(/State) + + (let stateRange '( + )__" ALL_SHARDS_RANGE_PARAM R"__( + )) + (let stateSelect '( + ')__" SHARD_COLUMN_NAME_PARAM R"__( + 'MessageCount + 'InflyCount + 'InflyVersion + 'CreatedTimestamp + 'RetentionBoundary)) + + (let stateRead (Member (SelectRange stateTable stateRange stateSelect '()) 'List)) + (return (AsList (SetResult 'state stateRead))) + ) +)__"; + + const char* const GetQueuesListQuery = R"__( ( (let fromUser (Parameter 'FROM_USER (DataType 'Utf8String))) @@ -1363,6 +1389,8 @@ const char* GetStdQueryById(size_t id) { return GetQueuesListQuery; case GET_MESSAGE_COUNT_METRIC_ID: return GetMessageCountMetricsQuery; + case GET_STATE_ID: + return GetStateQuery; } return nullptr; } |