diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-10-18 22:47:32 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-10-18 23:05:10 +0300 |
commit | 40dbf4725976afe51fea468f3821a432dbfb08dc (patch) | |
tree | e7f387c678685e0b8aa5ec9269a3ece4907859a0 | |
parent | 4f75a6c99c9d9615b5663caa4632a10dce70a7f0 (diff) | |
download | ydb-40dbf4725976afe51fea468f3821a432dbfb08dc.tar.gz |
Change delivery lag KIKIMR-19368
-rw-r--r-- | ydb/core/protos/counters_datashard.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 21 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 15 |
3 files changed, 31 insertions, 6 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto index 00b451ca6e4..e9542e83a5d 100644 --- a/ydb/core/protos/counters_datashard.proto +++ b/ydb/core/protos/counters_datashard.proto @@ -25,6 +25,7 @@ enum ESimpleCounters { COUNTER_READ_ITERATORS_COUNT = 15 [(CounterOpts) = {Name: "ReadIteratorsCount"}]; COUNTER_READ_ITERATORS_EXHAUSTED_COUNT = 16 [(CounterOpts) = {Name: "ReadIteratorsExhaustedCount"}]; COUNTER_CHANGE_RECORDS_REQUESTED = 17 [(CounterOpts) = {Name: "ChangeRecordsRequested"}]; + COUNTER_CHANGE_DELIVERY_LAG = 18 [(CounterOpts) = {Name: "ChangeDeliveryLag"}]; } enum ECumulativeCounters { diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index 7b7616d7d4c..bb9de6094a2 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -914,6 +914,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) { } } + UpdateChangeDeliveryLag(AppData()->TimeProvider->Now()); ChangesQueue.erase(it); IncCounter(COUNTER_CHANGE_RECORDS_REMOVED); @@ -939,11 +940,19 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange << ": at tablet: " << TabletID() << ", records: " << JoinSeq(", ", records)); + const auto now = AppData()->TimeProvider->Now(); TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size())); for (const auto& record : records) { forward.emplace_back(record.Order, record.PathId, record.BodySize); - if (auto res = ChangesQueue.emplace(record.Order, record); res.second) { + auto res = ChangesQueue.emplace( + std::piecewise_construct, + std::forward_as_tuple(record.Order), + std::forward_as_tuple(record, now) + ); + if (res.second) { + ChangesList.PushBack(&res.first->second); + Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize)); ChangesQueueBytes += record.BodySize; @@ -954,6 +963,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange } } + UpdateChangeDeliveryLag(now); IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size()); SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size()); @@ -961,6 +971,14 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward))); } +void TDataShard::UpdateChangeDeliveryLag(TInstant now) { + if (!ChangesList.Empty()) { + SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - ChangesList.Front()->EnqueuedAt).MilliSeconds()); + } else { + SetCounter(COUNTER_CHANGE_DELIVERY_LAG, 0); + } +} + void TDataShard::CreateChangeSender(const TActorContext& ctx) { Y_ABORT_UNLESS(!OutChangeSender); OutChangeSender = Register(NDataShard::CreateChangeSender(this)); @@ -3244,6 +3262,7 @@ void TDataShard::CheckChangesQueueNoOverflow() { void TDataShard::DoPeriodicTasks(const TActorContext &ctx) { UpdateLagCounters(ctx); + UpdateChangeDeliveryLag(ctx.Now()); UpdateTableStats(ctx); SendPeriodicTableStats(ctx); CollectCpuUsage(ctx); diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 5ff3a5f3b08..accb120c79f 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -1834,6 +1834,7 @@ public: void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId); void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order); void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records); + void UpdateChangeDeliveryLag(TInstant now); void CreateChangeSender(const TActorContext& ctx); void KillChangeSender(const TActorContext& ctx); void MaybeActivateChangeSender(const TActorContext& ctx); @@ -2675,28 +2676,31 @@ private: } }; - struct TEnqueuedRecord { + struct TEnqueuedRecordTag {}; + struct TEnqueuedRecord: public TIntrusiveListItem<TEnqueuedRecord, TEnqueuedRecordTag> { ui64 BodySize; TPathId TableId; ui64 SchemaVersion; bool SchemaSnapshotAcquired; + TInstant EnqueuedAt; ui64 LockId; ui64 LockOffset; explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId, - ui64 schemaVersion, ui64 lockId = 0, ui64 lockOffset = 0) + ui64 schemaVersion, TInstant now, ui64 lockId = 0, ui64 lockOffset = 0) : BodySize(bodySize) , TableId(tableId) , SchemaVersion(schemaVersion) , SchemaSnapshotAcquired(false) + , EnqueuedAt(now) , LockId(lockId) , LockOffset(lockOffset) { } - explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record) - : TEnqueuedRecord(record.BodySize, record.TableId, - record.SchemaVersion, record.LockId, record.LockOffset) + explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now) + : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, now, + record.LockId, record.LockOffset) { } }; @@ -2714,6 +2718,7 @@ private: bool RequestChangeRecordsInFly = false; bool RemoveChangeRecordsInFly = false; THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order + TIntrusiveList<TEnqueuedRecord, TEnqueuedRecordTag> ChangesList; ui64 ChangesQueueBytes = 0; TActorId OutChangeSender; bool OutChangeSenderSuspended = false; |