aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-10-18 22:47:32 +0300
committerilnaz <ilnaz@ydb.tech>2023-10-18 23:05:10 +0300
commit40dbf4725976afe51fea468f3821a432dbfb08dc (patch)
treee7f387c678685e0b8aa5ec9269a3ece4907859a0
parent4f75a6c99c9d9615b5663caa4632a10dce70a7f0 (diff)
downloadydb-40dbf4725976afe51fea468f3821a432dbfb08dc.tar.gz
Change delivery lag KIKIMR-19368
-rw-r--r--ydb/core/protos/counters_datashard.proto1
-rw-r--r--ydb/core/tx/datashard/datashard.cpp21
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h15
3 files changed, 31 insertions, 6 deletions
diff --git a/ydb/core/protos/counters_datashard.proto b/ydb/core/protos/counters_datashard.proto
index 00b451ca6e4..e9542e83a5d 100644
--- a/ydb/core/protos/counters_datashard.proto
+++ b/ydb/core/protos/counters_datashard.proto
@@ -25,6 +25,7 @@ enum ESimpleCounters {
COUNTER_READ_ITERATORS_COUNT = 15 [(CounterOpts) = {Name: "ReadIteratorsCount"}];
COUNTER_READ_ITERATORS_EXHAUSTED_COUNT = 16 [(CounterOpts) = {Name: "ReadIteratorsExhaustedCount"}];
COUNTER_CHANGE_RECORDS_REQUESTED = 17 [(CounterOpts) = {Name: "ChangeRecordsRequested"}];
+ COUNTER_CHANGE_DELIVERY_LAG = 18 [(CounterOpts) = {Name: "ChangeDeliveryLag"}];
}
enum ECumulativeCounters {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 7b7616d7d4c..bb9de6094a2 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -914,6 +914,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
}
}
+ UpdateChangeDeliveryLag(AppData()->TimeProvider->Now());
ChangesQueue.erase(it);
IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
@@ -939,11 +940,19 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
<< ": at tablet: " << TabletID()
<< ", records: " << JoinSeq(", ", records));
+ const auto now = AppData()->TimeProvider->Now();
TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
for (const auto& record : records) {
forward.emplace_back(record.Order, record.PathId, record.BodySize);
- if (auto res = ChangesQueue.emplace(record.Order, record); res.second) {
+ auto res = ChangesQueue.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(record.Order),
+ std::forward_as_tuple(record, now)
+ );
+ if (res.second) {
+ ChangesList.PushBack(&res.first->second);
+
Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
ChangesQueueBytes += record.BodySize;
@@ -954,6 +963,7 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
}
}
+ UpdateChangeDeliveryLag(now);
IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
@@ -961,6 +971,14 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}
+void TDataShard::UpdateChangeDeliveryLag(TInstant now) {
+ if (!ChangesList.Empty()) {
+ SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - ChangesList.Front()->EnqueuedAt).MilliSeconds());
+ } else {
+ SetCounter(COUNTER_CHANGE_DELIVERY_LAG, 0);
+ }
+}
+
void TDataShard::CreateChangeSender(const TActorContext& ctx) {
Y_ABORT_UNLESS(!OutChangeSender);
OutChangeSender = Register(NDataShard::CreateChangeSender(this));
@@ -3244,6 +3262,7 @@ void TDataShard::CheckChangesQueueNoOverflow() {
void TDataShard::DoPeriodicTasks(const TActorContext &ctx) {
UpdateLagCounters(ctx);
+ UpdateChangeDeliveryLag(ctx.Now());
UpdateTableStats(ctx);
SendPeriodicTableStats(ctx);
CollectCpuUsage(ctx);
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 5ff3a5f3b08..accb120c79f 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -1834,6 +1834,7 @@ public:
void MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId);
void RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order);
void EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records);
+ void UpdateChangeDeliveryLag(TInstant now);
void CreateChangeSender(const TActorContext& ctx);
void KillChangeSender(const TActorContext& ctx);
void MaybeActivateChangeSender(const TActorContext& ctx);
@@ -2675,28 +2676,31 @@ private:
}
};
- struct TEnqueuedRecord {
+ struct TEnqueuedRecordTag {};
+ struct TEnqueuedRecord: public TIntrusiveListItem<TEnqueuedRecord, TEnqueuedRecordTag> {
ui64 BodySize;
TPathId TableId;
ui64 SchemaVersion;
bool SchemaSnapshotAcquired;
+ TInstant EnqueuedAt;
ui64 LockId;
ui64 LockOffset;
explicit TEnqueuedRecord(ui64 bodySize, const TPathId& tableId,
- ui64 schemaVersion, ui64 lockId = 0, ui64 lockOffset = 0)
+ ui64 schemaVersion, TInstant now, ui64 lockId = 0, ui64 lockOffset = 0)
: BodySize(bodySize)
, TableId(tableId)
, SchemaVersion(schemaVersion)
, SchemaSnapshotAcquired(false)
+ , EnqueuedAt(now)
, LockId(lockId)
, LockOffset(lockOffset)
{
}
- explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record)
- : TEnqueuedRecord(record.BodySize, record.TableId,
- record.SchemaVersion, record.LockId, record.LockOffset)
+ explicit TEnqueuedRecord(const IDataShardChangeCollector::TChange& record, TInstant now)
+ : TEnqueuedRecord(record.BodySize, record.TableId, record.SchemaVersion, now,
+ record.LockId, record.LockOffset)
{
}
};
@@ -2714,6 +2718,7 @@ private:
bool RequestChangeRecordsInFly = false;
bool RemoveChangeRecordsInFly = false;
THashMap<ui64, TEnqueuedRecord> ChangesQueue; // ui64 is order
+ TIntrusiveList<TEnqueuedRecord, TEnqueuedRecordTag> ChangesList;
ui64 ChangesQueueBytes = 0;
TActorId OutChangeSender;
bool OutChangeSenderSuspended = false;