summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <[email protected]>2023-06-13 15:50:29 +0300
committerchertus <[email protected]>2023-06-13 15:50:29 +0300
commit915e9312e516b36debfbda26eb504bfe3a3bf257 (patch)
tree8b6fa4b620a577bc6d975235fe9219400ff78457
parenta3ff846aff3c30ea9f5d8d2fb96f731bde92c501 (diff)
limit writes in flight
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h20
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp12
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h5
-rw-r--r--ydb/core/tx/columnshard/defs.h2
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp81
6 files changed, 116 insertions, 23 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 82b994b1a98..4581164bec0 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -180,7 +180,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
} else {
errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
}
- --WritesInFly; // write failed
+ --WritesInFlight; // write failed
+ WritesSizeInFlight -= ev->Get()->ResourceUsage.SourceMemorySize;
}
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
@@ -191,7 +192,8 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
LOG_S_DEBUG("Write (record) " << data.size() << " bytes into pathId " << tableId
<< (writeId? (" writeId " + ToString(writeId)).c_str() : "") << " at tablet " << TabletID());
- --WritesInFly; // write successed
+ --WritesInFlight; // write successed
+ WritesSizeInFlight -= ev->Get()->ResourceUsage.SourceMemorySize;
Y_VERIFY(putStatus == NKikimrProto::OK);
Execute(new TTxWrite(this, ev), ctx);
} else if (isOutOfSpace) {
@@ -230,20 +232,23 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
}
}
+ ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
+ ev->Get()->ResourceUsage.SourceMemorySize = data.size();
+
+ ++WritesInFlight; // write started
+ WritesSizeInFlight += ev->Get()->ResourceUsage.SourceMemorySize;
+
LOG_S_DEBUG("Write (blob) " << data.size() << " bytes into pathId " << tableId
<< (writeId? (" writeId " + ToString(writeId)).c_str() : "")
+ << " inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)"
<< " at tablet " << TabletID());
- ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
-
- ++WritesInFly; // write started
-
const auto& snapshotSchema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema();
ctx.Register(CreateWriteActor(TabletID(), snapshotSchema, ctx.SelfID,
BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
}
- SetCounter(COUNTER_WRITES_IN_FLY, WritesInFly);
+ SetCounter(COUNTER_WRITES_IN_FLY, WritesInFlight);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index f2302d96466..c78a1d14cba 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -53,16 +53,15 @@ struct TSettings {
TControlWrapper CacheDataAfterIndexing;
TControlWrapper CacheDataAfterCompaction;
TControlWrapper MaxSmallBlobSize;
- TControlWrapper OverloadTxInFly;
- TControlWrapper OverloadWritesInFly;
+ static constexpr ui64 OverloadTxInFlight = 1000;
+ static constexpr ui64 OverloadWritesInFlight = 1000;
+ static constexpr ui64 OverloadWritesSizeInFlight = 128 * 1024 * 1024;
TSettings()
: BlobWriteGrouppingEnabled(1, 0, 1)
, CacheDataAfterIndexing(1, 0, 1)
, CacheDataAfterCompaction(1, 0, 1)
, MaxSmallBlobSize(0, 0, 8000000)
- , OverloadTxInFly(1000, 0, 10000)
- , OverloadWritesInFly(1000, 0, 10000)
{}
void RegisterControls(TControlBoard& icb) {
@@ -70,8 +69,6 @@ struct TSettings {
icb.RegisterSharedControl(CacheDataAfterIndexing, "ColumnShardControls.CacheDataAfterIndexing");
icb.RegisterSharedControl(CacheDataAfterCompaction, "ColumnShardControls.CacheDataAfterCompaction");
icb.RegisterSharedControl(MaxSmallBlobSize, "ColumnShardControls.MaxSmallBlobSize");
- icb.RegisterSharedControl(OverloadTxInFly, "ColumnShardControls.OverloadTxInFly");
- icb.RegisterSharedControl(OverloadWritesInFly, "ColumnShardControls.OverloadWritesInFly");
}
};
@@ -416,7 +413,8 @@ private:
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
ui64 LastExportNo = 0;
- ui64 WritesInFly = 0;
+ ui64 WritesInFlight = 0;
+ ui64 WritesSizeInFlight = 0;
ui64 OwnerPathId = 0;
ui64 StatsReportRound = 0;
ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init
@@ -487,10 +485,12 @@ private:
bool HaveOutdatedTxs() const;
bool ShardOverloaded() const {
- ui64 txLimit = Settings.OverloadTxInFly;
- ui64 writesLimit = Settings.OverloadWritesInFly;
+ ui64 txLimit = Settings.OverloadTxInFlight;
+ ui64 writesLimit = Settings.OverloadWritesInFlight;
+ ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
return (txLimit && Executor()->GetStats().TxInFly > txLimit) ||
- (writesLimit && WritesInFly > writesLimit);
+ (writesLimit && WritesInFlight > writesLimit) ||
+ (writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
}
TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId, const ui32 partId);
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index a80fba2971d..345dbe751a3 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -76,13 +76,21 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
}
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& data, std::shared_ptr<arrow::Schema> schema) {
+ const TString& data, std::shared_ptr<arrow::Schema> schema, bool waitResult) {
const TString dedupId = ToString(writeId);
auto write = std::make_unique<TEvColumnShard::TEvWrite>(sender, metaShard, writeId, tableId, dedupId, data, 1);
if (schema) {
write->SetArrowSchema(NArrow::SerializeSchema(*schema));
}
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, write.release());
+
+ if (waitResult) {
+ return WaitWriteResult(runtime, metaShard) == NKikimrTxColumnShard::EResultStatus::SUCCESS;
+ }
+ return true;
+}
+
+ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard) {
TAutoPtr<IEventHandle> handle;
auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
UNIT_ASSERT(event);
@@ -90,7 +98,7 @@ bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui6
auto& resWrite = Proto(event);
UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), metaShard);
- return (resWrite.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ return resWrite.GetStatus();
}
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index 49e114ff2c1..29f56d65074 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -388,10 +388,11 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 writeId, ui64 tableId,
- const TString& data, std::shared_ptr<arrow::Schema> schema = {});
+ const TString& data, std::shared_ptr<arrow::Schema> schema = {}, bool waitResult = true);
std::optional<ui64> WriteData(TTestBasicRuntime& runtime, TActorId& sender, const NLongTxService::TLongTxId& longTxId,
ui64 tableId, const TString& dedupId, const TString& data,
std::shared_ptr<arrow::Schema> schema = {});
+ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 metaShard);
void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds,
NOlap::TSnapshot snap, ui64 scanId = 0);
void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 metaShard, ui64 txId, const std::vector<ui64>& writeIds);
@@ -416,7 +417,7 @@ TString MakeTestBlob(std::pair<ui64, ui64> range, const std::vector<std::pair<TS
TSerializedTableRange MakeTestRange(std::pair<ui64, ui64> range, bool inclusiveFrom, bool inclusiveTo,
const std::vector<std::pair<TString, NScheme::TTypeInfo>>& columns);
-
+
}
namespace NKikimr::NColumnShard {
diff --git a/ydb/core/tx/columnshard/defs.h b/ydb/core/tx/columnshard/defs.h
index 49f59feae87..71dc63ee75d 100644
--- a/ydb/core/tx/columnshard/defs.h
+++ b/ydb/core/tx/columnshard/defs.h
@@ -98,10 +98,12 @@ struct TCompactionLimits {
struct TUsage {
ui64 CPUExecTime{};
ui64 Network{};
+ ui64 SourceMemorySize{};
void Add(const TUsage& other) {
CPUExecTime += other.CPUExecTime;
Network += other.Network;
+ SourceMemorySize += other.SourceMemorySize;
}
};
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index a11ee3c94f6..8a5d17a5695 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -463,8 +463,74 @@ void TestWrite(const TestTableDescription& table) {
UNIT_ASSERT(!ok);
}
+void TestWriteOverload(const TestTableDescription& table) {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ //
+
+ ui64 metaShard = TTestTxConfig::TxTablet1;
+ ui64 writeId = 0;
+ ui64 tableId = 1;
+
+ SetupSchema(runtime, sender, tableId, table);
+
+ TString testBlob = MakeTestBlob({0, 100 * 1000}, table.Schema);
+ UNIT_ASSERT(testBlob.size() > NOlap::TCompactionLimits::MAX_BLOB_SIZE / 2);
+ UNIT_ASSERT(testBlob.size() < NOlap::TCompactionLimits::MAX_BLOB_SIZE);
+
+ const ui64 overloadSize = NColumnShard::TSettings::OverloadWritesSizeInFlight;
+ ui32 toCatch = overloadSize / testBlob.size() + 1;
+ UNIT_ASSERT_VALUES_EQUAL(toCatch, 22);
+ TDeque<TAutoPtr<IEventHandle>> capturedWrites;
+
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
+ if (auto* msg = TryGetPrivateEvent<TEvColumnShard::TEvWrite>(ev)) {
+ Cerr << "CATCH TEvWrite, status " << msg->GetPutStatus() << Endl;
+ if (toCatch && msg->GetPutStatus() != NKikimrProto::UNKNOWN) {
+ capturedWrites.push_back(ev.Release());
+ --toCatch;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ return false;
+ };
+
+ auto resendOneCaptured = [&]() {
+ UNIT_ASSERT(capturedWrites.size());
+ Cerr << "RESEND TEvWrite" << Endl;
+ runtime.Send(capturedWrites.front().Release());
+ capturedWrites.pop_front();
+ };
+
+ runtime.SetEventFilter(captureEvents);
+
+ const ui32 toSend = toCatch + 1;
+ for (ui32 i = 0; i < toSend; ++i) {
+ UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob, {}, false));
+ }
+
+ UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+
+ while (capturedWrites.size()) {
+ resendOneCaptured();
+ UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, metaShard), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ }
+
+ UNIT_ASSERT(WriteData(runtime, sender, metaShard, ++writeId, tableId, testBlob)); // OK after overload
+}
+
// TODO: Improve test. It does not catch KIKIMR-14890
-void TestWriteReadDup() {
+void TestWriteReadDup(const TestTableDescription& table = {}) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -481,7 +547,7 @@ void TestWriteReadDup() {
ui64 writeId = 0;
ui64 tableId = 1;
- auto ydbSchema = TTestSchema::YdbSchema();
+ auto ydbSchema = table.Schema;
SetupSchema(runtime, sender, tableId);
constexpr ui32 numRows = 10;
@@ -1792,6 +1858,17 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TestWrite(table);
}
+ Y_UNIT_TEST(WriteOverload) {
+ TestTableDescription table;
+ TestWriteOverload(table);
+ }
+
+ Y_UNIT_TEST(WriteStandaloneOverload) {
+ TestTableDescription table;
+ table.InStore = false;
+ TestWriteOverload(table);
+ }
+
Y_UNIT_TEST(WriteReadDuplicate) {
TestWriteReadDup();
TestWriteReadLongTxDup();