aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-11-16 12:56:40 +0300
committerilnaz <ilnaz@ydb.tech>2023-11-16 13:32:16 +0300
commitb97a33413d2c0fb35d9c57b75dda5715d2339824 (patch)
tree8dfc9c5dbb85b4064120fce664e238e3799966ee
parent33ef64e7d000aaa1ed57e87eae157142edcd1418 (diff)
downloadydb-b97a33413d2c0fb35d9c57b75dda5715d2339824.tar.gz
CDC & async index data lag KIKIMR-16851
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/change_collector.h8
-rw-r--r--ydb/core/tx/datashard/datashard.cpp13
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h10
4 files changed, 23 insertions, 9 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index e9542e83a5..0d872d299f 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -26,6 +26,7 @@ enum ESimpleCounters {
COUNTER_READ_ITERATORS_EXHAUSTED_COUNT = 16 [(CounterOpts) = {Name: "ReadIteratorsExhaustedCount"}];
COUNTER_CHANGE_RECORDS_REQUESTED = 17 [(CounterOpts) = {Name: "ChangeRecordsRequested"}];
COUNTER_CHANGE_DELIVERY_LAG = 18 [(CounterOpts) = {Name: "ChangeDeliveryLag"}];
+ COUNTER_CHANGE_DATA_LAG = 19 [(CounterOpts) = {Name: "ChangeDataLag"}];
}
enum ECumulativeCounters {
diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h
index b8171933ea..dcef250a70 100644
--- a/ydb/core/tx/datashard/change_collector.h
+++ b/ydb/core/tx/datashard/change_collector.h
@@ -3,6 +3,8 @@
#include <ydb/core/engine/minikql/change_collector_iface.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+#include <util/datetime/base.h>
+
namespace NKikimr {
namespace NDataShard {
@@ -57,6 +59,12 @@ public:
ui64 SchemaVersion;
ui64 LockId = 0;
ui64 LockOffset = 0;
+
+ TInstant CreatedAt() const {
+ return Group
+ ? TInstant::FromValue(Group)
+ : TInstant::MilliSeconds(Step);
+ }
};
public:
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index eb7e00d2cf..eac541490f 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -916,7 +916,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
}
}
- UpdateChangeDeliveryLag(AppData()->TimeProvider->Now());
+ UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
ChangesQueue.erase(it);
IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
@@ -965,7 +965,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
}
}
- UpdateChangeDeliveryLag(now);
+ UpdateChangeExchangeLag(now);
IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
@@ -973,10 +973,13 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}
-void TDataShard::UpdateChangeDeliveryLag(TInstant now) {
+void TDataShard::UpdateChangeExchangeLag(TInstant now) {
if (!ChangesList.Empty()) {
- SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - ChangesList.Front()->EnqueuedAt).MilliSeconds());
+ const auto* front = ChangesList.Front();
+ SetCounter(COUNTER_CHANGE_DATA_LAG, Max(now - front->CreatedAt, TDuration::Zero()).MilliSeconds());
+ SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - front->EnqueuedAt).MilliSeconds());
} else {
+ SetCounter(COUNTER_CHANGE_DATA_LAG, 0);
SetCounter(COUNTER_CHANGE_DELIVERY_LAG, 0);
}
}
@@ -3265,7 +3268,7 @@ void TDataShard::CheckChangesQueueNoOverflow() {
void TDataShard::DoPeriodicTasks(const TActorContext &ctx) {
UpdateLagCounters(ctx);
- UpdateChangeDeliveryLag(ctx.Now());
+ UpdateChangeExchangeLag(ctx.Now());
UpdateTableStats(ctx);
SendPeriodicTableStats(ctx);
CollectCpuUsage(ctx);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index a1eb936183..6bcbfe60b1 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1835,7 +1835,7 @@ public:
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records);
- void UpdateChangeDeliveryLag(TInstant now);
+ void UpdateChangeExchangeLag(TInstant now);
void CreateChangeSender(const TActorContext& ctx);
void KillChangeSender(const TActorContext& ctx);
void MaybeActivateChangeSender(const TActorContext& ctx);
@@ -2702,24 +2702,26 @@ private:
TPathId TableId;
ui64 SchemaVersion;
bool SchemaSnapshotAcquired;
+ TInstant CreatedAt;
TInstant EnqueuedAt;
ui64 LockId;
ui64 LockOffset;
explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
- ui64 schemaVersion, TInstant now, ui64 lockId = 0, ui64 lockOffset = 0)
+ ui64 schemaVersion, TInstant created, TInstant enqueued, ui64 lockId = 0, ui64 lockOffset = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
, SchemaSnapshotAcquired(false)
- , EnqueuedAt(now)
+ , CreatedAt(created)
+ , EnqueuedAt(enqueued)
, LockId(lockId)
, LockOffset(lockOffset)
{
}
explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now)
- : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, now,
+ : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now,
record.LockId, record.LockOffset)
{
}