diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-11-23 12:44:32 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-11-23 12:44:32 +0300 |
commit | 6e3cb898bdd8a3bdb3e06fad38a0b5df9ee56cf5 (patch) | |
tree | 01484ff8017351508978b5e44952bc9096fc97e2 | |
parent | 6699aa370edfb45e3d09a7bb233da0341f693870 (diff) | |
download | ydb-6e3cb898bdd8a3bdb3e06fad38a0b5df9ee56cf5.tar.gz |
PR from branch users/eivanov89/-simplify-ds-load-actors
add proto option to be implemented
add warmup to load actors
add partition settings to load actors
-rw-r--r-- | ydb/core/protos/datashard_load.proto | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_testload.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/testload/test_load_actor.cpp | 74 |
3 files changed, 103 insertions, 9 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto index 3a0c1d28294..d85bdea56d3 100644 --- a/ydb/core/protos/datashard_load.proto +++ b/ydb/core/protos/datashard_load.proto @@ -24,6 +24,9 @@ message TEvTestLoadRequest { // Key is a string like sprintf("user%.19lu", keyNum) optional uint64 KeyFrom = 3 [default = 0]; + + // special mode: actor writes RowCount rows again and again + optional bool Infinite = 4; } message TReadStart { @@ -42,13 +45,21 @@ message TEvTestLoadRequest { } message TTableSetup { + optional string WorkingDir = 1; // i.e. /Root/db1 + optional string TableName = 2; + // if DropTable specified - then drop and create - optional bool DropTable = 1; + optional bool DropTable = 3; - optional bool CreateTable = 2; + optional bool CreateTable = 4; - optional string WorkingDir = 3; // i.e. /Root/db1 - optional string TableName = 4; + // makes sence only with DropTable or CreateTable + // implicitely: split by load is false, split by size is true + optional uint64 MinParts = 5 [default = 1]; + optional uint64 MaxParts = 6 [default = 1]; + optional uint64 MaxPartSizeMb = 7 [default = 2000]; + + optional bool SkipWarmup = 8; } optional uint64 Cookie = 1; diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp index 2f519142ae4..45927ddc60e 100644 --- a/ydb/core/tx/datashard/datashard_ut_testload.cpp +++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp @@ -487,7 +487,18 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { setupTable.SetWorkingDir("/Root"); setupTable.SetTableName(settings.TableName); + setupTable.SetMinParts(11); + setupTable.SetMaxParts(13); + setupTable.SetMaxPartSizeMb(1234); + helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount); + + const auto& description = helper.Table.UserTable.GetDescription(); + const auto& partitioning = description.GetPartitionConfig().GetPartitioningPolicy(); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMinPartitionsCount(), 11UL); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMaxPartitionsCount(), 13UL); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSizeToSplit(), 1234UL << 20); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSplitByLoadSettings().GetEnabled(), false); } Y_UNIT_TEST(ShouldDropCreateTable) { @@ -526,6 +537,14 @@ Y_UNIT_TEST_SUITE(UpsertLoad) { setupTable.SetTableName(settings.TableName); helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount, true); + + // check default table params + const auto& description = helper.Table.UserTable.GetDescription(); + const auto& partitioning = description.GetPartitionConfig().GetPartitioningPolicy(); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMinPartitionsCount(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMaxPartitionsCount(), 1UL); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSizeToSplit(), 2000UL << 20); + UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSplitByLoadSettings().GetEnabled(), false); } } // Y_UNIT_TEST_SUITE(UpsertLoad) diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp index 9d8412fb116..60146c1248e 100644 --- a/ydb/core/tx/datashard/testload/test_load_actor.cpp +++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp @@ -45,6 +45,7 @@ public: DropTable, CreateTable, DescribePath, + Warmup, RunLoad, }; @@ -166,6 +167,8 @@ public: return CreateTable(ctx); case EState::DescribePath: return DescribePath(ctx); + case EState::Warmup: + return Warmup(ctx); case EState::RunLoad: return RunLoad(ctx); default: @@ -215,13 +218,19 @@ public: field8 Utf8, field9 Utf8, PRIMARY KEY(key) - ); - )__"; + ) WITH (AUTO_PARTITIONING_BY_LOAD = DISABLED, + AUTO_PARTITIONING_BY_SIZE = ENABLED, + AUTO_PARTITIONING_PARTITION_SIZE_MB = %)__" PRIu64 R"__(, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %)__" PRIu64 R"__(, + AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %)__" PRIu64 ");"; TString query = Sprintf( queryBase.c_str(), setup.GetWorkingDir().c_str(), - setup.GetTableName().c_str()); + setup.GetTableName().c_str(), + setup.GetMaxPartSizeMb(), + setup.GetMinParts(), + setup.GetMaxParts()); SendQuery(ctx, std::move(query)); } @@ -295,10 +304,57 @@ public: target.SetWorkingDir(setup.GetWorkingDir()); target.SetTableName(setup.GetTableName()); - State = EState::RunLoad; + State = EState::Warmup; PrepareTable(ctx); } + void Warmup(const TActorContext& ctx) { + // load initial rows, so that later we write *same* rows to non-empty shard, + // i.e. shard which calculates stats, compacts, etc + + if (!Request.HasTableSetup() || Request.GetTableSetup().GetSkipWarmup()) { + State = EState::RunLoad; + PrepareTable(ctx); + return; + } + + NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart cmd; + switch (Request.Command_case()) { + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart: + cmd = Request.GetUpsertBulkStart(); + break; + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart: + cmd = Request.GetUpsertLocalMkqlStart(); + break; + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart: + cmd = Request.GetUpsertKqpStart(); + break; + case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart: + cmd = Request.GetUpsertProposeStart(); + break; + default: + State = EState::RunLoad; + return PrepareTable(ctx); + } + + const auto& target = Request.GetTargetShard(); + LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag + << " warmups table# " << target.GetTableName() + << " in dir# " << target.GetWorkingDir() + << " with rows# " << cmd.GetRowCount()); + + // TODO: we need bulk upsert with normal batch size, not 1 row per request + cmd.SetInflight(100); + + LoadActors.insert(ctx.Register( + CreateUpsertBulkActor( + cmd, + target, + ctx.SelfID, + GetServiceCounters(Counters, "load_actor"), + ++LastTag))); + } + void RunLoad(const TActorContext& ctx) { StartTs = TAppData::TimeProvider->Now(); const ui64 tag = ++LastTag; @@ -363,6 +419,7 @@ public: void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) { const auto& msg = ev->Get(); + LoadActors.erase(ev->Sender); if (msg->ErrorReason || !msg->Report) { TStringStream ss; @@ -371,12 +428,16 @@ public: return; } + if (State == EState::Warmup) { + State = EState::RunLoad; + return PrepareTable(ctx); + } + LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag << " received finished from actor# " << ev->Sender << " with tag# " << msg->Tag); FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report}); - LoadActors.erase(ev->Sender); if (LoadActors.empty()) { Finish(ctx); return; @@ -710,6 +771,9 @@ inline void Out<NKikimr::NDataShardLoad::TLoad::EState>( case NKikimr::NDataShardLoad::TLoad::EState::DescribePath: o << "describePath"; break; + case NKikimr::NDataShardLoad::TLoad::EState::Warmup: + o << "warmup"; + break; case NKikimr::NDataShardLoad::TLoad::EState::RunLoad: o << "runLoad"; break; |