diff options
author | chertus <[email protected]> | 2023-06-13 15:50:29 +0300 |
---|---|---|
committer | chertus <[email protected]> | 2023-06-13 15:50:29 +0300 |
commit | 915e9312e516b36debfbda26eb504bfe3a3bf257 (patch) | |
tree | 8b6fa4b620a577bc6d975235fe9219400ff78457 | |
parent | a3ff846aff3c30ea9f5d8d2fb96f731bde92c501 (diff) |
limit writes in flight
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 20 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/defs.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 81 |
6 files changed, 116 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 82b994b1a98..4581164bec0 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -180,7 +180,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
             } else {
                 errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
             }
-            --WritesInFly; // write failed
+            --WritesInFlight; // write failed
+            WritesSizeInFlight -= ev->Get()->ResourceUsage.SourceMemorySize;
         }
 
         auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
@@ -191,7 +192,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
         LOG_S_DEBUG("Write (record) " << data.size() << " bytes into pathId " << tableId
             << (writeId? (" writeId " + ToString(writeId)).c_str() : "")
             << " at tablet " << TabletID());
-        --WritesInFly; // write successed
+        --WritesInFlight; // write successed
+        WritesSizeInFlight -= ev->Get()->ResourceUsage.SourceMemorySize;
         Y_VERIFY(putStatus == NKikimrProto::OK);
         Execute(new TTxWrite(this, ev), ctx);
     } else if (isOutOfSpace) {
@@ -230,20 +232,23 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
                 }
             }
         }
+        ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
+        ev->Get()->ResourceUsage.SourceMemorySize = data.size();
+
+        ++WritesInFlight; // write started
+        WritesSizeInFlight += ev->Get()->ResourceUsage.SourceMemorySize;
+
         LOG_S_DEBUG("Write (blob) " << data.size() << " bytes into pathId " << tableId
             << (writeId? (" writeId " + ToString(writeId)).c_str() : "")
+            << " inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)"
             << " at tablet " << TabletID());
 
-        ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
-
-        ++WritesInFly; // write started
-
         const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
         ctx.Register(CreateWriteActor(TabletID(), snapshotSchema, ctx.SelfID,
             BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
     }
 
-    SetCounter(COUNTER_WRITES_IN_FLY, WritesInFly);
+    SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight);
 }
 
 }
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index f2302d96466..c78a1d14cba 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -53,16 +53,15 @@ struct TSettings {
     TControlWrapper CacheDataAfterIndexing;
     TControlWrapper CacheDataAfterCompaction;
     TControlWrapper MaxSmallBlobSize;
-    TControlWrapper OverloadTxInFly;
-    TControlWrapper OverloadWritesInFly;
+    static constexpr ui64 OverloadTxInFlight = 1000;
+    static constexpr ui64 OverloadWritesInFlight = 1000;
+    static constexpr ui64 OverloadWritesSizeInFlight = 128 * 1024 * 1024;
 
     TSettings()
         : BlobWriteGrouppingEnabled(1, 0, 1)
         , CacheDataAfterIndexing(1, 0, 1)
         , CacheDataAfterCompaction(1, 0, 1)
         , MaxSmallBlobSize(0, 0, 8000000)
-        , OverloadTxInFly(1000, 0, 10000)
-        , OverloadWritesInFly(1000, 0, 10000)
     {}
 
     void RegisterControls(TControlBoard& icb) {
@@ -70,8 +69,6 @@ struct TSettings {
         icb.RegisterSharedControl(CacheDataAfterIndexing, "ColumnShardControls.CacheDataAfterIndexing");
         icb.RegisterSharedControl(CacheDataAfterCompaction, "ColumnShardControls.CacheDataAfterCompaction");
         icb.RegisterSharedControl(MaxSmallBlobSize, "ColumnShardControls.MaxSmallBlobSize");
-        icb.RegisterSharedControl(OverloadTxInFly, "ColumnShardControls.OverloadTxInFly");
-        icb.RegisterSharedControl(OverloadWritesInFly, "ColumnShardControls.OverloadWritesInFly");
     }
 };
 
@@ -416,7 +413,8 @@ private:
     ui64 LastPlannedStep = 0;
     ui64 LastPlannedTxId = 0;
     ui64 LastExportNo = 0;
-    ui64 WritesInFly = 0;
+    ui64 WritesInFlight = 0;
+    ui64 WritesSizeInFlight = 0;
     ui64 OwnerPathId = 0;
     ui64 StatsReportRound = 0;
     ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init
@@ -487,10 +485,12 @@ private:
     bool HaveOutdatedTxs() const;
 
     bool ShardOverloaded() const {
-        ui64 txLimit = Settings.OverloadTxInFly;
-        ui64 writesLimit = Settings.OverloadWritesInFly;
+        ui64 txLimit = Settings.OverloadTxInFlight;
+        ui64 writesLimit = Settings.OverloadWritesInFlight;
+        ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
         return (txLimit && Executor()->GetStats().TxInFly > txLimit) ||
-           (writesLimit && WritesInFly > writesLimit);
+           (writesLimit && WritesInFlight > writesLimit) ||
+           (writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
     }
 
     TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index a80fba2971d..345dbe751a3 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -76,13 +76,21 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
 }
 
 bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
-               const TString& data, std::shared_ptr<arrow::Schema> schema) {
+               const TString& data, std::shared_ptr<arrow::Schema> schema, bool waitResult) {
     const TString dedupId = ToString(writeId);
     auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data, 1);
     if (schema) {
         write->SetArrowSchema(NArrow::SerializeSchema(*schema));
     }
     ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release());
+
+    if (waitResult) {
+        return WaitWriteResult(runtime, metaShard) == NKikimrTxColumnShard::EResultStatus::SUCCESS;
+    }
+    return true;
+}
+
+ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard) {
     TAutoPtr<IEventHandle> handle;
     auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
     UNIT_ASSERT(event);
@@ -90,7 +98,7 @@ bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui6
     auto& resWrite = Proto(event);
     UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0);
     UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), metaShard);
-    return (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS);
+    return resWrite.GetStatus();
 }
 
 std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 49e114ff2c1..29f56d65074 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -388,10 +388,11 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
 void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
 void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
 bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
-               const TString& data, std::shared_ptr<arrow::Schema> schema = {});
+               const TString& data, std::shared_ptr<arrow::Schema> schema = {}, bool waitResult = true);
 std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
                               ui64 tableId, const TString& dedupId, const TString& data,
                               std::shared_ptr<arrow::Schema> schema = {});
+ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard);
 void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds,
                     NOlap::TSnapshot snap, ui64 scanId = 0);
 void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const std::vector<ui64>& writeIds);
@@ -416,7 +417,7 @@ TString MakeTestBlob(std::pair<ui64, ui64> range, const std::vector<std::pair<TS
 
 TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveFrom, bool inclusiveTo,
                                     const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
-    
+
 }
 
 namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index 49f59feae87..71dc63ee75d 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -98,10 +98,12 @@ struct TCompactionLimits {
 struct TUsage {
     ui64 CPUExecTime{};
     ui64 Network{};
+    ui64 SourceMemorySize{};
 
     void Add(const TUsage& other) {
         CPUExecTime += other.CPUExecTime;
         Network += other.Network;
+        SourceMemorySize += other.SourceMemorySize;
     }
 };
 
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index a11ee3c94f6..8a5d17a5695 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -463,8 +463,74 @@ void TestWrite(const TestTableDescription& table) {
     UNIT_ASSERT(!ok);
 }
 
+void TestWriteOverload(const TestTableDescription& table) {
+    TTestBasicRuntime runtime;
+    TTester::Setup(runtime);
+
+    TActorId sender = runtime.AllocateEdgeActor();
+    CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+    TDispatchOptions options;
+    options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+    runtime.DispatchEvents(options);
+
+    //
+
+    ui64 metaShard = TTestTxConfig::TxTablet1;
+    ui64 writeId = 0;
+    ui64 tableId = 1;
+
+    SetupSchema(runtime, sender, tableId, table);
+
+    TString testBlob = MakeTestBlob({0, 100 * 1000}, table.Schema);
+    UNIT_ASSERT(testBlob.size() > NOlap::TCompactionLimits::MAX_BLOB_SIZE / 2);
+    UNIT_ASSERT(testBlob.size() < NOlap::TCompactionLimits::MAX_BLOB_SIZE);
+
+    const ui64 overloadSize = NColumnShard::TSettings::OverloadWritesSizeInFlight;
+    ui32 toCatch = overloadSize / testBlob.size() + 1;
+    UNIT_ASSERT_VALUES_EQUAL(toCatch, 22);
+    TDeque<TAutoPtr<IEventHandle>> capturedWrites;
+
+    auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
+        if (auto* msg = TryGetPrivateEvent<TEvColumnShard::TEvWrite>(ev)) {
+            Cerr << "CATCH TEvWrite, status " << msg->GetPutStatus() << Endl;
+            if (toCatch && msg->GetPutStatus() != NKikimrProto::UNKNOWN) {
+                capturedWrites.push_back(ev.Release());
+                --toCatch;
+                return true;
+            } else {
+                return false;
+            }
+        }
+        return false;
+    };
+
+    auto resendOneCaptured = [&]() {
+        UNIT_ASSERT(capturedWrites.size());
+        Cerr << "RESEND TEvWrite" << Endl;
+        runtime.Send(capturedWrites.front().Release());
+        capturedWrites.pop_front();
+    };
+
+    runtime.SetEventFilter(captureEvents);
+
+    const ui32 toSend = toCatch + 1;
+    for (ui32 i = 0; i < toSend; ++i) {
+        UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob, {}, false));
+    }
+
+    UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+
+    while (capturedWrites.size()) {
+        resendOneCaptured();
+        UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS);
+    }
+
+    UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob)); // OK after overload
+}
+
 // TODO: Improve test. It does not catch KIKIMR-14890
-void TestWriteReadDup() {
+void TestWriteReadDup(const TestTableDescription& table = {}) {
     TTestBasicRuntime runtime;
     TTester::Setup(runtime);
 
@@ -481,7 +547,7 @@ void TestWriteReadDup() {
     ui64 writeId = 0;
     ui64 tableId = 1;
 
-    auto ydbSchema = TTestSchema::YdbSchema();
+    auto ydbSchema = table.Schema;
     SetupSchema(runtime, sender, tableId);
 
     constexpr ui32 numRows = 10;
@@ -1792,6 +1858,17 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
         TestWrite(table);
     }
 
+    Y_UNIT_TEST(WriteOverload) {
+        TestTableDescription table;
+        TestWriteOverload(table);
+    }
+
+    Y_UNIT_TEST(WriteStandaloneOverload) {
+        TestTableDescription table;
+        table.InStore = false;
+        TestWriteOverload(table);
+    }
+
     Y_UNIT_TEST(WriteReadDuplicate) {
         TestWriteReadDup();
         TestWriteReadLongTxDup();