diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-11-16 12:56:40 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-11-16 13:32:16 +0300 |
commit | b97a33413d2c0fb35d9c57b75dda5715d2339824 (patch) | |
tree | 8dfc9c5dbb85b4064120fce664e238e3799966ee | |
parent | 33ef64e7d000aaa1ed57e87eae157142edcd1418 (diff) | |
download | ydb-b97a33413d2c0fb35d9c57b75dda5715d2339824.tar.gz |
CDC & async index data lag KIKIMR-16851
-rw-r--r-- | ydb/core/protos/counters_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/change_collector.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 10 |
4 files changed, 23 insertions, 9 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index e9542e83a5..0d872d299f 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -26,6 +26,7 @@ enum ESimpleCounters { COUNTER_READ_ITERATORS_EXHAUSTED_COUNT = 16 [(CounterOpts) = {Name: "ReadIteratorsExhaustedCount"}]; COUNTER_CHANGE_RECORDS_REQUESTED = 17 [(CounterOpts) = {Name: "ChangeRecordsRequested"}]; COUNTER_CHANGE_DELIVERY_LAG = 18 [(CounterOpts) = {Name: "ChangeDeliveryLag"}]; + COUNTER_CHANGE_DATA_LAG = 19 [(CounterOpts) = {Name: "ChangeDataLag"}]; } enum ECumulativeCounters { diff --git a/ydb/core/tx/datashard/change_collector.h b/ydb/core/tx/datashard/change_collector.h index b8171933ea..dcef250a70 100644 --- a/ydb/core/tx/datashard/change_collector.h +++ b/ydb/core/tx/datashard/change_collector.h @@ -3,6 +3,8 @@ #include <ydb/core/engine/minikql/change_collector_iface.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> +#include <util/datetime/base.h> + namespace NKikimr { namespace NDataShard { @@ -57,6 +59,12 @@ public: ui64 SchemaVersion; ui64 LockId = 0; ui64 LockOffset = 0; + + TInstant CreatedAt() const { + return Group + ? TInstant::FromValue(Group) + : TInstant::MilliSeconds(Step); + } }; public: diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index eb7e00d2cf..eac541490f 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -916,7 +916,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { } } - UpdateChangeDeliveryLag(AppData()->TimeProvider->Now()); + UpdateChangeExchangeLag(AppData()->TimeProvider->Now()); ChangesQueue.erase(it); IncCounter(COUNTER_CHANGE_RECORDS_REMOVED); @@ -965,7 +965,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange } } - UpdateChangeDeliveryLag(now); + UpdateChangeExchangeLag(now); IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size()); SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size()); @@ -973,10 +973,13 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); } -void TDataShard::UpdateChangeDeliveryLag(TInstant now) { +void TDataShard::UpdateChangeExchangeLag(TInstant now) { if (!ChangesList.Empty()) { - SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - ChangesList.Front()->EnqueuedAt).MilliSeconds()); + const auto* front = ChangesList.Front(); + SetCounter(COUNTER_CHANGE_DATA_LAG, Max(now - front->CreatedAt, TDuration::Zero()).MilliSeconds()); + SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - front->EnqueuedAt).MilliSeconds()); } else { + SetCounter(COUNTER_CHANGE_DATA_LAG, 0); SetCounter(COUNTER_CHANGE_DELIVERY_LAG, 0); } } @@ -3265,7 +3268,7 @@ void TDataShard::CheckChangesQueueNoOverflow() { void TDataShard::DoPeriodicTasks(const TActorContext &ctx) { UpdateLagCounters(ctx); - UpdateChangeDeliveryLag(ctx.Now()); + UpdateChangeExchangeLag(ctx.Now()); UpdateTableStats(ctx); SendPeriodicTableStats(ctx); CollectCpuUsage(ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index a1eb936183..6bcbfe60b1 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1835,7 +1835,7 @@ public: void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records); - void UpdateChangeDeliveryLag(TInstant now); + void UpdateChangeExchangeLag(TInstant now); void CreateChangeSender(const TActorContext& ctx); void KillChangeSender(const TActorContext& ctx); void MaybeActivateChangeSender(const TActorContext& ctx); @@ -2702,24 +2702,26 @@ private: TPathId TableId; ui64 SchemaVersion; bool SchemaSnapshotAcquired; + TInstant CreatedAt; TInstant EnqueuedAt; ui64 LockId; ui64 LockOffset; explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, - ui64 schemaVersion, TInstant now, ui64 lockId = 0, ui64 lockOffset = 0) + ui64 schemaVersion, TInstant created, TInstant enqueued, ui64 lockId = 0, ui64 lockOffset = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) , SchemaSnapshotAcquired(false) - , EnqueuedAt(now) + , CreatedAt(created) + , EnqueuedAt(enqueued) , LockId(lockId) , LockOffset(lockOffset) { } explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now) - : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, now, + : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, record.CreatedAt(), now, record.LockId, record.LockOffset) { } |