diff options
author | alexbogo <[email protected]> | 2022-08-30 11:25:59 +0300 |
---|---|---|
committer | alexbogo <[email protected]> | 2022-08-30 11:25:59 +0300 |
commit | adb44f55f06baa2e46dfb9f688f9f733ddd3bb78 (patch) | |
tree | 46ba0de2802a47d07379075270cf3961cc755c4c | |
parent | 9f9a20af2885daf80e5e7d16c7a3597aeefdb5c8 (diff) |
[ymq] add more logs in cleanup actor, add unremoved queues lag in count
init
-rw-r--r-- | ydb/core/ymq/actor/cleanup_queue_data.cpp | 18 | ||||
-rw-r--r-- | ydb/core/ymq/actor/monitoring.cpp | 6 | ||||
-rw-r--r-- | ydb/core/ymq/base/counters.cpp | 1 | ||||
-rw-r--r-- | ydb/core/ymq/base/counters.h | 1 |
4 files changed, 16 insertions, 10 deletions
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp index 45f34170992..0d94214b856 100644 --- a/ydb/core/ymq/actor/cleanup_queue_data.cpp +++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp @@ -96,6 +96,7 @@ namespace NKikimr::NSQS { } void TCleanupQueueDataActor::RunGetQueuesQuery(EState state, TDuration sendAfter, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] getting queues..."); State = state; NClient::TParameters params; @@ -128,7 +129,7 @@ namespace NKikimr::NSQS { Y_VERIFY(response.GetResults().size() == 1); const auto& rr = response.GetResults(0).GetValue().GetStruct(0); if (rr.GetList().empty()) { - LOG_DEBUG_S(ctx, NKikimrServices::SQS, "there are no queues to delete"); + LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] there are no queues to delete"); LockQueueToRemove(IDLE_TIMEOUT, ctx); return; } @@ -145,12 +146,11 @@ namespace NKikimr::NSQS { Y_VERIFY(response.GetResults().size() == 1); const auto& rr = response.GetResults(0).GetValue().GetStruct(0); ui64 removedRows = rr.GetList()[0].GetStruct(0).GetUint64(); - LOG_DEBUG_S(ctx, NKikimrServices::SQS, "removed " << removedRows << " rows for queue_id_number=" << QueueIdNumber); OnRemovedData(removedRows, ctx); break; } case EState::Finish: { - LOG_INFO_S(ctx, NKikimrServices::SQS, "queue data (queue_id_number=" << QueueIdNumber << ") removed successfuly."); + LOG_INFO_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] queue data (queue_id_number=" << QueueIdNumber << ") removed successfuly."); MonitoringCounters->CleanupRemovedQueuesDone->Inc(); LockQueueToRemove(TDuration::Zero(), ctx); break; @@ -166,7 +166,7 @@ namespace NKikimr::NSQS { MonitoringCounters->CleanupRemovedQueuesErrors->Inc(); auto runAfter = RetryPeriod; RetryPeriod = Min(RetryPeriod * 2, RETRY_PERIOD_MAX); - LOG_ERROR_S(ctx, NKikimrServices::SQS, "Got an error while deleting data : " << error); + LOG_ERROR_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got an error while deleting data : " << error); LockQueueToRemove(runAfter, ctx); } @@ -183,6 +183,7 @@ namespace NKikimr::NSQS { } void TCleanupQueueDataActor::UpdateLock(const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] update queue lock..."); State = EState::UpdateLockQueue; NClient::TParameters params; @@ -200,7 +201,7 @@ namespace NKikimr::NSQS { // Select RemoveTimestamp, QueueIdNumber, FifoQueue, Shards, TablesFormat ui64 queueIdNumber = queueRow.GetStruct(1).GetOptional().GetUint64(); if (queueIdNumber != QueueIdNumber) { - LOG_WARN_S(ctx, NKikimrServices::SQS, "Got queue to remove data queue_id_number=" << queueIdNumber + LOG_WARN_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to continue remove data queue_id_number=" << queueIdNumber << ", but was locked queue_id_number=" << QueueIdNumber); StartRemoveData(queueRow, ctx); return; @@ -221,7 +222,7 @@ namespace NKikimr::NSQS { Shards = queueRow.GetStruct(3).GetOptional().GetUint32(); TablesFormat = queueRow.GetStruct(4).GetOptional().GetUint32(); - LOG_INFO_S(ctx, NKikimrServices::SQS, "Got queue to remove data: removed at " << RemoveQueueTimetsamp + LOG_INFO_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to remove data: removed at " << RemoveQueueTimetsamp << " queue_id_number=" << QueueIdNumber << " tables_format=" << TablesFormat); if (TablesFormat == 0) { Finish(ctx); // TODO move code for removing directories @@ -231,6 +232,9 @@ namespace NKikimr::NSQS { } void TCleanupQueueDataActor::OnRemovedData(ui64 removedRows, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] removed rows " << removedRows + << ", cleared tables " << ClearedTablesCount << ", shards to remove " << ShardsToRemove + ); MonitoringCounters->CleanupRemovedQueuesRows->Add(removedRows); if (removedRows == 0) { if (ShardsToRemove) { @@ -263,7 +267,7 @@ namespace NKikimr::NSQS { } void TCleanupQueueDataActor::RunRemoveData(const TActorContext& ctx) { - if (StartProcessTimestamp + UPDATE_LOCK_PERIOD > ctx.Now()) { + if (ctx.Now() - StartProcessTimestamp > UPDATE_LOCK_PERIOD) { UpdateLock(ctx); return; } diff --git a/ydb/core/ymq/actor/monitoring.cpp b/ydb/core/ymq/actor/monitoring.cpp index 14f64b8f809..3f0d5698257 100644 --- a/ydb/core/ymq/actor/monitoring.cpp +++ b/ydb/core/ymq/actor/monitoring.cpp @@ -21,7 +21,7 @@ namespace NKikimr::NSQS { TString removedQueuesTable = Cfg().GetRoot() + "/.RemovedQueues";
RemovedQueuesQuery = TStringBuilder() << R"__(
--!syntax_v1
- SELECT RemoveTimestamp FROM `)__" << removedQueuesTable << R"__(` ORDER BY RemoveTimestamp LIMIT 1;
+ SELECT RemoveTimestamp FROM `)__" << removedQueuesTable << R"__(` ORDER BY RemoveTimestamp LIMIT 1000;
)__";
RequestMetrics(TDuration::Zero(), ctx);
@@ -52,13 +52,13 @@ namespace NKikimr::NSQS { TDuration removeQueuesDataLag;
if (!rr.GetList().empty()) {
- Y_VERIFY(rr.GetList().size() == 1);
TInstant minRemoveQueueTimestamp = TInstant::MilliSeconds(rr.GetList()[0].GetStruct(0).GetOptional().GetUint64());
removeQueuesDataLag = ctx.Now() - minRemoveQueueTimestamp;
}
- LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", list is empty: " << rr.GetList().empty());
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", count: " << rr.GetList().size());
*Counters->CleanupRemovedQueuesLagSec = removeQueuesDataLag.Seconds();
+ *Counters->CleanupRemovedQueuesLagCount = rr.GetList().size();
RequestMetrics(RetryPeriod, ctx);
}
diff --git a/ydb/core/ymq/base/counters.cpp b/ydb/core/ymq/base/counters.cpp index afd392cdd26..8bbdf5613cb 100644 --- a/ydb/core/ymq/base/counters.cpp +++ b/ydb/core/ymq/base/counters.cpp @@ -999,6 +999,7 @@ void TMeteringCounters::InitCounters(const TVector<TString>& classifierLabels) { void TMonitoringCounters::InitCounters() { INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesLagSec, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); + INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesLagCount, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesDone, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesRows, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesErrors, ELifetime::Persistent, EValueType::Derivative, Lazy(Config)); diff --git a/ydb/core/ymq/base/counters.h b/ydb/core/ymq/base/counters.h index ce3c6caa999..7dda5ff14ae 100644 --- a/ydb/core/ymq/base/counters.h +++ b/ydb/core/ymq/base/counters.h @@ -830,6 +830,7 @@ private: // Common service monitoring counters struct TMonitoringCounters : public TAtomicRefCount<TMonitoringCounters> { TLazyCachedCounter CleanupRemovedQueuesLagSec; + TLazyCachedCounter CleanupRemovedQueuesLagCount; TLazyCachedCounter CleanupRemovedQueuesDone; TLazyCachedCounter CleanupRemovedQueuesRows; |