summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: alexbogo <[email protected]> 2022-08-30 11:25:59 +0300
committer: alexbogo <[email protected]> 2022-08-30 11:25:59 +0300
commit: adb44f55f06baa2e46dfb9f688f9f733ddd3bb78 (patch)
tree: 46ba0de2802a47d07379075270cf3961cc755c4c
parent: 9f9a20af2885daf80e5e7d16c7a3597aeefdb5c8 (diff)
[ymq] add more logs in cleanup actor, add unremoved queues lag in count
init
-rw-r--r-- ydb/core/ymq/actor/cleanup_queue_data.cpp 18
-rw-r--r-- ydb/core/ymq/actor/monitoring.cpp 6
-rw-r--r-- ydb/core/ymq/base/counters.cpp 1
-rw-r--r-- ydb/core/ymq/base/counters.h 1
4 files changed, 16 insertions, 10 deletions
diff --git a/ydb/core/ymq/actor/cleanup_queue_data.cpp b/ydb/core/ymq/actor/cleanup_queue_data.cpp
index 45f34170992..0d94214b856 100644
--- a/ydb/core/ymq/actor/cleanup_queue_data.cpp
+++ b/ydb/core/ymq/actor/cleanup_queue_data.cpp
@@ -96,6 +96,7 @@ namespace NKikimr::NSQS {
}
void TCleanupQueueDataActor::RunGetQueuesQuery(EState state, TDuration sendAfter, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] getting queues...");
State = state;
NClient::TParameters params;
@@ -128,7 +129,7 @@ namespace NKikimr::NSQS {
Y_VERIFY(response.GetResults().size() == 1);
const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
if (rr.GetList().empty()) {
- LOG_DEBUG_S(ctx, NKikimrServices::SQS, "there are no queues to delete");
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] there are no queues to delete");
LockQueueToRemove(IDLE_TIMEOUT, ctx);
return;
}
@@ -145,12 +146,11 @@ namespace NKikimr::NSQS {
Y_VERIFY(response.GetResults().size() == 1);
const auto& rr = response.GetResults(0).GetValue().GetStruct(0);
ui64 removedRows = rr.GetList()[0].GetStruct(0).GetUint64();
- LOG_DEBUG_S(ctx, NKikimrServices::SQS, "removed " << removedRows << " rows for queue_id_number=" << QueueIdNumber);
OnRemovedData(removedRows, ctx);
break;
}
case EState::Finish: {
- LOG_INFO_S(ctx, NKikimrServices::SQS, "queue data (queue_id_number=" << QueueIdNumber << ") removed successfuly.");
+ LOG_INFO_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] queue data (queue_id_number=" << QueueIdNumber << ") removed successfuly.");
MonitoringCounters->CleanupRemovedQueuesDone->Inc();
LockQueueToRemove(TDuration::Zero(), ctx);
break;
@@ -166,7 +166,7 @@ namespace NKikimr::NSQS {
MonitoringCounters->CleanupRemovedQueuesErrors->Inc();
auto runAfter = RetryPeriod;
RetryPeriod = Min(RetryPeriod * 2, RETRY_PERIOD_MAX);
- LOG_ERROR_S(ctx, NKikimrServices::SQS, "Got an error while deleting data : " << error);
+ LOG_ERROR_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got an error while deleting data : " << error);
LockQueueToRemove(runAfter, ctx);
}
@@ -183,6 +183,7 @@ namespace NKikimr::NSQS {
}
void TCleanupQueueDataActor::UpdateLock(const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] update queue lock...");
State = EState::UpdateLockQueue;
NClient::TParameters params;
@@ -200,7 +201,7 @@ namespace NKikimr::NSQS {
// Select RemoveTimestamp, QueueIdNumber, FifoQueue, Shards, TablesFormat
ui64 queueIdNumber = queueRow.GetStruct(1).GetOptional().GetUint64();
if (queueIdNumber != QueueIdNumber) {
- LOG_WARN_S(ctx, NKikimrServices::SQS, "Got queue to remove data queue_id_number=" << queueIdNumber
+ LOG_WARN_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to continue remove data queue_id_number=" << queueIdNumber
<< ", but was locked queue_id_number=" << QueueIdNumber);
StartRemoveData(queueRow, ctx);
return;
@@ -221,7 +222,7 @@ namespace NKikimr::NSQS {
Shards = queueRow.GetStruct(3).GetOptional().GetUint32();
TablesFormat = queueRow.GetStruct(4).GetOptional().GetUint32();
- LOG_INFO_S(ctx, NKikimrServices::SQS, "Got queue to remove data: removed at " << RemoveQueueTimetsamp
+ LOG_INFO_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] got queue to remove data: removed at " << RemoveQueueTimetsamp
<< " queue_id_number=" << QueueIdNumber << " tables_format=" << TablesFormat);
if (TablesFormat == 0) {
Finish(ctx); // TODO move code for removing directories
@@ -231,6 +232,9 @@ namespace NKikimr::NSQS {
}
void TCleanupQueueDataActor::OnRemovedData(ui64 removedRows, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[cleanup removed queues] removed rows " << removedRows
+ << ", cleared tables " << ClearedTablesCount << ", shards to remove " << ShardsToRemove
+ );
MonitoringCounters->CleanupRemovedQueuesRows->Add(removedRows);
if (removedRows == 0) {
if (ShardsToRemove) {
@@ -263,7 +267,7 @@ namespace NKikimr::NSQS {
}
void TCleanupQueueDataActor::RunRemoveData(const TActorContext& ctx) {
- if (StartProcessTimestamp + UPDATE_LOCK_PERIOD > ctx.Now()) {
+ if (ctx.Now() - StartProcessTimestamp > UPDATE_LOCK_PERIOD) {
UpdateLock(ctx);
return;
}
diff --git a/ydb/core/ymq/actor/monitoring.cpp b/ydb/core/ymq/actor/monitoring.cpp
index 14f64b8f809..3f0d5698257 100644
--- a/ydb/core/ymq/actor/monitoring.cpp
+++ b/ydb/core/ymq/actor/monitoring.cpp
@@ -21,7 +21,7 @@ namespace NKikimr::NSQS {
TString removedQueuesTable = Cfg().GetRoot() + "/.RemovedQueues";
RemovedQueuesQuery = TStringBuilder() << R"__(
--!syntax_v1
- SELECT RemoveTimestamp FROM `)__" << removedQueuesTable << R"__(` ORDER BY RemoveTimestamp LIMIT 1;
+ SELECT RemoveTimestamp FROM `)__" << removedQueuesTable << R"__(` ORDER BY RemoveTimestamp LIMIT 1000;
)__";
RequestMetrics(TDuration::Zero(), ctx);
@@ -52,13 +52,13 @@ namespace NKikimr::NSQS {
TDuration removeQueuesDataLag;
if (!rr.GetList().empty()) {
- Y_VERIFY(rr.GetList().size() == 1);
TInstant minRemoveQueueTimestamp = TInstant::MilliSeconds(rr.GetList()[0].GetStruct(0).GetOptional().GetUint64());
removeQueuesDataLag = ctx.Now() - minRemoveQueueTimestamp;
}
- LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", list is empty: " << rr.GetList().empty());
+ LOG_DEBUG_S(ctx, NKikimrServices::SQS, "[monitoring] Report deletion queue data lag: " << removeQueuesDataLag << ", count: " << rr.GetList().size());
*Counters->CleanupRemovedQueuesLagSec = removeQueuesDataLag.Seconds();
+ *Counters->CleanupRemovedQueuesLagCount = rr.GetList().size();
RequestMetrics(RetryPeriod, ctx);
}
diff --git a/ydb/core/ymq/base/counters.cpp b/ydb/core/ymq/base/counters.cpp
index afd392cdd26..8bbdf5613cb 100644
--- a/ydb/core/ymq/base/counters.cpp
+++ b/ydb/core/ymq/base/counters.cpp
@@ -999,6 +999,7 @@ void TMeteringCounters::InitCounters(const TVector<TString>& classifierLabels) {
void TMonitoringCounters::InitCounters() {
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesLagSec, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
+ INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesLagCount, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesDone, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesRows, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
INIT_COUNTER(MonitoringCounters, CleanupRemovedQueuesErrors, ELifetime::Persistent, EValueType::Derivative, Lazy(Config));
diff --git a/ydb/core/ymq/base/counters.h b/ydb/core/ymq/base/counters.h
index ce3c6caa999..7dda5ff14ae 100644
--- a/ydb/core/ymq/base/counters.h
+++ b/ydb/core/ymq/base/counters.h
@@ -830,6 +830,7 @@ private:
// Common service monitoring counters
struct TMonitoringCounters : public TAtomicRefCount<TMonitoringCounters> {
TLazyCachedCounter CleanupRemovedQueuesLagSec;
+ TLazyCachedCounter CleanupRemovedQueuesLagCount;
TLazyCachedCounter CleanupRemovedQueuesDone;
TLazyCachedCounter CleanupRemovedQueuesRows;