diff options
author | nsofya <nsofya@yandex-team.com> | 2023-07-17 17:45:20 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-07-17 17:45:20 +0300 |
commit | 89ec8bd3dbb74ea165a4340b80dfd52f319fd4a1 (patch) | |
tree | 09219ed0cd928f134766b598b904b22d06c39264 | |
parent | 3337775f0751aa9bb532c245c8056f38ea336237 (diff) | |
download | ydb-89ec8bd3dbb74ea165a4340b80dfd52f319fd4a1.tar.gz |
Wrap WritesInFlight control
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 15 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 72 |
3 files changed, 67 insertions, 21 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 68a33666551..b2dcecd56b0 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -150,7 +150,7 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId, } else if (TablesManager.IsOverloaded(tableId)) { CSCounters.OnOverloadGranule(dataSize);; return EOverloadStatus::Granule; - } else if (ShardOverloaded()) { + } else if (WritesMonitor.ShardOverloaded()) { CSCounters.OnOverloadShard(dataSize); return EOverloadStatus::Shard; } @@ -162,8 +162,7 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo OnYellowChannels(putResult); const auto& writeMeta = ev->Get()->GetWriteMeta(); - --WritesInFlight; - WritesSizeInFlight -= putResult.GetResourceUsage().SourceMemorySize; + auto wg = WritesMonitor.FinishWrite(putResult.GetResourceUsage().SourceMemorySize); if (putResult.GetPutStatus() != NKikimrProto::OK) { IncCounter(COUNTER_WRITE_FAIL); @@ -179,7 +178,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, errCode); ctx.Send(writeMeta.GetSource(), result.release()); - SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight); return; } @@ -189,7 +187,6 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo << (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID()); Execute(new TTxWrite(this, ev), ctx); - SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight); } void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { @@ -249,19 +246,17 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex } } - ++WritesInFlight; - WritesSizeInFlight += writeData.GetSize(); + auto wg = WritesMonitor.RegisterWrite(writeData.GetSize()); LOG_S_DEBUG("Write (blob) " << writeData.GetSize() << " bytes into pathId " << writeMeta.GetTableId() - << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") - << " inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)" + << (writeMeta.GetWriteId()? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : " ") + << WritesMonitor.DebugString() << " at tablet " << TabletID()); const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema(); auto writeController = std::make_shared<NOlap::TIndexedWriteController>(ctx.SelfID, writeData, snapshotSchema); ctx.Register(CreateWriteActor(TabletID(), writeController, BlobManager->StartBlobBatch(), TInstant::Max(), Settings.MaxSmallBlobSize)); } - SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index af9689b3530..c542dc36161 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -132,6 +132,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , InsertTable(std::make_unique<NOlap::TInsertTable>()) , ReadCounters("Read") , ScanCounters("Scan") + , WritesMonitor(*this) { TabletCountersPtr.reset(new TProtobufTabletCounters< ESimpleCounters_descriptor, diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 584497c9189..f344de4a30e 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -436,6 +436,65 @@ private: TTablesManager TablesManager; + class TWritesMonitor { + private: + TColumnShard& Owner; + ui64 WritesInFlight = 0; + ui64 WritesSizeInFlight = 0; + + public: + class TGuard: public TNonCopyable { + friend class TWritesMonitor; + private: + TWritesMonitor& Owner; + + explicit TGuard(TWritesMonitor& owner) + : Owner(owner) + {} + + public: + ~TGuard() { + Owner.UpdateCounters(); + } + }; + + TWritesMonitor(TColumnShard& owner) + : Owner(owner) + {} + + TGuard RegisterWrite(const ui64 dataSize) { + ++WritesInFlight; + WritesSizeInFlight += dataSize; + return TGuard(*this); + } + + TGuard FinishWrite(const ui64 dataSize) { + Y_VERIFY(WritesInFlight > 0); + Y_VERIFY(WritesSizeInFlight >= dataSize); + --WritesInFlight; + WritesSizeInFlight -= dataSize; + return TGuard(*this); + } + + bool ShardOverloaded() const { + ui64 txLimit = Owner.Settings.OverloadTxInFlight; + ui64 writesLimit = Owner.Settings.OverloadWritesInFlight; + ui64 writesSizeLimit = Owner.Settings.OverloadWritesSizeInFlight; + return (txLimit && Owner.Executor()->GetStats().TxInFly > txLimit) || + (writesLimit && WritesInFlight > writesLimit) || + (writesSizeLimit && WritesSizeInFlight > writesSizeLimit); + } + + TString DebugString() const { + return TStringBuilder() << "TWritesMonitor: inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)"; + } + + private: + void UpdateCounters() { + Owner.SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight); + } + }; + ui64 CurrentSchemeShardId = 0; TMessageSeqNo LastSchemaSeqNo; std::optional<NKikimrSubDomains::TProcessingParams> ProcessingParams; @@ -443,8 +502,7 @@ private: ui64 LastPlannedStep = 0; ui64 LastPlannedTxId = 0; ui64 LastExportNo = 0; - ui64 WritesInFlight = 0; - ui64 WritesSizeInFlight = 0; + ui64 OwnerPathId = 0; ui64 TabletTxCounter = 0; ui64 StatsReportRound = 0; @@ -480,6 +538,7 @@ private: const TIndexationCounters EvictionCounters = TIndexationCounters("Eviction"); const TCSCounters CSCounters; + TWritesMonitor WritesMonitor; THashMap<ui64, TBasicTxInfo> BasicTxInfo; @@ -515,15 +574,6 @@ private: ui64 GetAllowedStep() const; bool HaveOutdatedTxs() const; - bool ShardOverloaded() const { - ui64 txLimit = Settings.OverloadTxInFlight; - ui64 writesLimit = Settings.OverloadWritesInFlight; - ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight; - return (txLimit && Executor()->GetStats().TxInFly > txLimit) || - (writesLimit && WritesInFlight > writesLimit) || - (writesSizeLimit && WritesSizeInFlight > writesSizeLimit); - } - TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId); TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId, const ui32 partId); void AddLongTxWrite(TWriteId writeId, ui64 txId); |