aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-07-17 17:45:20 +0300
committernsofya <nsofya@yandex-team.com>2023-07-17 17:45:20 +0300
commit89ec8bd3dbb74ea165a4340b80dfd52f319fd4a1 (patch)
tree09219ed0cd928f134766b598b904b22d06c39264
parent3337775f0751aa9bb532c245c8056f38ea336237 (diff)
downloadydb-89ec8bd3dbb74ea165a4340b80dfd52f319fd4a1.tar.gz
Wrap WritesInFlight control
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp15
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h72
3 files changed, 67 insertions, 21 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 68a33666551..b2dcecd56b0 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -150,7 +150,7 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId,
} else if (TablesManager.IsOverloaded(tableId)) {
CSCounters.OnOverloadGranule(dataSize);;
return EOverloadStatus::Granule;
- } else if (ShardOverloaded()) {
+ } else if (WritesMonitor.ShardOverloaded()) {
CSCounters.OnOverloadShard(dataSize);
return EOverloadStatus::Shard;
}
@@ -162,8 +162,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
OnYellowChannels(putResult);
const auto& writeMeta = ev->Get()->GetWriteMeta();
- --WritesInFlight;
- WritesSizeInFlight -= putResult.GetResourceUsage().SourceMemorySize;
+ auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize);
if (putResult.GetPutStatus() != NKikimrProto::OK) {
IncCounter(COUNTER_WRITE_FAIL);
@@ -179,7 +178,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode);
ctx.Send(writeMeta.GetSource(), result.release());
- SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight);
return;
}
@@ -189,7 +187,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
Execute(new TTxWrite(this, ev), ctx);
- SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight);
}
void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
@@ -249,19 +246,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
}
}
- ++WritesInFlight;
- WritesSizeInFlight += writeData.GetSize();
+ auto wg = WritesMonitor.RegisterWrite(writeData.GetSize());
LOG_S_DEBUG("Write (blob) " << writeData.GetSize() << " bytes into pathId " << writeMeta.GetTableId()
- << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "")
- << " inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)"
+ << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ")
+ << WritesMonitor.DebugString()
<< " at tablet " << TabletID());
const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData, snapshotSchema);
ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize));
}
- SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index af9689b3530..c542dc36161 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -132,6 +132,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, InsertTable(std::make_unique<NOlap::TInsertTable>())
, ReadCounters("Read")
, ScanCounters("Scan")
+ , WritesMonitor(*this)
{
TabletCountersPtr.reset(new TProtobufTabletCounters<
ESimpleCounters_descriptor,
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 584497c9189..f344de4a30e 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -436,6 +436,65 @@ private:
TTablesManager TablesManager;
+ class TWritesMonitor {
+ private:
+ TColumnShard& Owner;
+ ui64 WritesInFlight = 0;
+ ui64 WritesSizeInFlight = 0;
+
+ public:
+ class TGuard: public TNonCopyable {
+ friend class TWritesMonitor;
+ private:
+ TWritesMonitor& Owner;
+
+ explicit TGuard(TWritesMonitor& owner)
+ : Owner(owner)
+ {}
+
+ public:
+ ~TGuard() {
+ Owner.UpdateCounters();
+ }
+ };
+
+ TWritesMonitor(TColumnShard& owner)
+ : Owner(owner)
+ {}
+
+ TGuard RegisterWrite(const ui64 dataSize) {
+ ++WritesInFlight;
+ WritesSizeInFlight += dataSize;
+ return TGuard(*this);
+ }
+
+ TGuard FinishWrite(const ui64 dataSize) {
+ Y_VERIFY(WritesInFlight > 0);
+ Y_VERIFY(WritesSizeInFlight >= dataSize);
+ --WritesInFlight;
+ WritesSizeInFlight -= dataSize;
+ return TGuard(*this);
+ }
+
+ bool ShardOverloaded() const {
+ ui64 txLimit = Owner.Settings.OverloadTxInFlight;
+ ui64 writesLimit = Owner.Settings.OverloadWritesInFlight;
+ ui64 writesSizeLimit = Owner.Settings.OverloadWritesSizeInFlight;
+ return (txLimit && Owner.Executor()->GetStats().TxInFly > txLimit) ||
+ (writesLimit && WritesInFlight > writesLimit) ||
+ (writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
+ }
+
+ TString DebugString() const {
+ return TStringBuilder() << "TWritesMonitor: inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)";
+ }
+
+ private:
+ void UpdateCounters() {
+ Owner.SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight);
+ }
+ };
+
ui64 CurrentSchemeShardId = 0;
TMessageSeqNo LastSchemaSeqNo;
std::optional<NKikimrSubDomains::TProcessingParams> ProcessingParams;
@@ -443,8 +502,7 @@ private:
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
ui64 LastExportNo = 0;
- ui64 WritesInFlight = 0;
- ui64 WritesSizeInFlight = 0;
+
ui64 OwnerPathId = 0;
ui64 TabletTxCounter = 0;
ui64 StatsReportRound = 0;
@@ -480,6 +538,7 @@ private:
const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction");
const TCSCounters CSCounters;
+ TWritesMonitor WritesMonitor;
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
@@ -515,15 +574,6 @@ private:
ui64 GetAllowedStep() const;
bool HaveOutdatedTxs() const;
- bool ShardOverloaded() const {
- ui64 txLimit = Settings.OverloadTxInFlight;
- ui64 writesLimit = Settings.OverloadWritesInFlight;
- ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
- return (txLimit && Executor()->GetStats().TxInFly > txLimit) ||
- (writesLimit && WritesInFlight > writesLimit) ||
- (writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
- }
-
TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId);
void AddLongTxWrite(TWriteId writeId, ui64 txId);