aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-11-23 12:44:32 +0300
committereivanov89 <eivanov89@ydb.tech>2022-11-23 12:44:32 +0300
commit6e3cb898bdd8a3bdb3e06fad38a0b5df9ee56cf5 (patch)
tree01484ff8017351508978b5e44952bc9096fc97e2
parent6699aa370edfb45e3d09a7bb233da0341f693870 (diff)
downloadydb-6e3cb898bdd8a3bdb3e06fad38a0b5df9ee56cf5.tar.gz
PR from branch users/eivanov89/-simplify-ds-load-actors
add proto option to be implemented add warmup to load actors add partition settings to load actors
-rw-r--r--ydb/core/protos/datashard_load.proto19
-rw-r--r--ydb/core/tx/datashard/datashard_ut_testload.cpp19
-rw-r--r--ydb/core/tx/datashard/testload/test_load_actor.cpp74
3 files changed, 103 insertions, 9 deletions
diff --git a/ydb/core/protos/datashard_load.proto b/ydb/core/protos/datashard_load.proto
index 3a0c1d28294..d85bdea56d3 100644
--- a/ydb/core/protos/datashard_load.proto
+++ b/ydb/core/protos/datashard_load.proto
@@ -24,6 +24,9 @@ message TEvTestLoadRequest {
// Key is a string like sprintf("user%.19lu", keyNum)
optional uint64 KeyFrom = 3 [default = 0];
+
+ // special mode: actor writes RowCount rows again and again
+ optional bool Infinite = 4;
}
message TReadStart {
@@ -42,13 +45,21 @@ message TEvTestLoadRequest {
}
message TTableSetup {
+ optional string WorkingDir = 1; // i.e. /Root/db1
+ optional string TableName = 2;
+
// if DropTable specified - then drop and create
- optional bool DropTable = 1;
+ optional bool DropTable = 3;
- optional bool CreateTable = 2;
+ optional bool CreateTable = 4;
- optional string WorkingDir = 3; // i.e. /Root/db1
- optional string TableName = 4;
+ // makes sence only with DropTable or CreateTable
+ // implicitely: split by load is false, split by size is true
+ optional uint64 MinParts = 5 [default = 1];
+ optional uint64 MaxParts = 6 [default = 1];
+ optional uint64 MaxPartSizeMb = 7 [default = 2000];
+
+ optional bool SkipWarmup = 8;
}
optional uint64 Cookie = 1;
diff --git a/ydb/core/tx/datashard/datashard_ut_testload.cpp b/ydb/core/tx/datashard/datashard_ut_testload.cpp
index 2f519142ae4..45927ddc60e 100644
--- a/ydb/core/tx/datashard/datashard_ut_testload.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_testload.cpp
@@ -487,7 +487,18 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
setupTable.SetWorkingDir("/Root");
setupTable.SetTableName(settings.TableName);
+ setupTable.SetMinParts(11);
+ setupTable.SetMaxParts(13);
+ setupTable.SetMaxPartSizeMb(1234);
+
helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount);
+
+ const auto& description = helper.Table.UserTable.GetDescription();
+ const auto& partitioning = description.GetPartitionConfig().GetPartitioningPolicy();
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMinPartitionsCount(), 11UL);
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMaxPartitionsCount(), 13UL);
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSizeToSplit(), 1234UL << 20);
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSplitByLoadSettings().GetEnabled(), false);
}
Y_UNIT_TEST(ShouldDropCreateTable) {
@@ -526,6 +537,14 @@ Y_UNIT_TEST_SUITE(UpsertLoad) {
setupTable.SetTableName(settings.TableName);
helper.RunUpsertTestLoad(std::move(request), 0, expectedRowCount, true);
+
+ // check default table params
+ const auto& description = helper.Table.UserTable.GetDescription();
+ const auto& partitioning = description.GetPartitionConfig().GetPartitioningPolicy();
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMinPartitionsCount(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetMaxPartitionsCount(), 1UL);
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSizeToSplit(), 2000UL << 20);
+ UNIT_ASSERT_VALUES_EQUAL(partitioning.GetSplitByLoadSettings().GetEnabled(), false);
}
} // Y_UNIT_TEST_SUITE(UpsertLoad)
diff --git a/ydb/core/tx/datashard/testload/test_load_actor.cpp b/ydb/core/tx/datashard/testload/test_load_actor.cpp
index 9d8412fb116..60146c1248e 100644
--- a/ydb/core/tx/datashard/testload/test_load_actor.cpp
+++ b/ydb/core/tx/datashard/testload/test_load_actor.cpp
@@ -45,6 +45,7 @@ public:
DropTable,
CreateTable,
DescribePath,
+ Warmup,
RunLoad,
};
@@ -166,6 +167,8 @@ public:
return CreateTable(ctx);
case EState::DescribePath:
return DescribePath(ctx);
+ case EState::Warmup:
+ return Warmup(ctx);
case EState::RunLoad:
return RunLoad(ctx);
default:
@@ -215,13 +218,19 @@ public:
field8 Utf8,
field9 Utf8,
PRIMARY KEY(key)
- );
- )__";
+ ) WITH (AUTO_PARTITIONING_BY_LOAD = DISABLED,
+ AUTO_PARTITIONING_BY_SIZE = ENABLED,
+ AUTO_PARTITIONING_PARTITION_SIZE_MB = %)__" PRIu64 R"__(,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %)__" PRIu64 R"__(,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %)__" PRIu64 ");";
TString query = Sprintf(
queryBase.c_str(),
setup.GetWorkingDir().c_str(),
- setup.GetTableName().c_str());
+ setup.GetTableName().c_str(),
+ setup.GetMaxPartSizeMb(),
+ setup.GetMinParts(),
+ setup.GetMaxParts());
SendQuery(ctx, std::move(query));
}
@@ -295,10 +304,57 @@ public:
target.SetWorkingDir(setup.GetWorkingDir());
target.SetTableName(setup.GetTableName());
- State = EState::RunLoad;
+ State = EState::Warmup;
PrepareTable(ctx);
}
+ void Warmup(const TActorContext& ctx) {
+ // load initial rows, so that later we write *same* rows to non-empty shard,
+ // i.e. shard which calculates stats, compacts, etc
+
+ if (!Request.HasTableSetup() || Request.GetTableSetup().GetSkipWarmup()) {
+ State = EState::RunLoad;
+ PrepareTable(ctx);
+ return;
+ }
+
+ NKikimrDataShardLoad::TEvTestLoadRequest::TUpdateStart cmd;
+ switch (Request.Command_case()) {
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertBulkStart:
+ cmd = Request.GetUpsertBulkStart();
+ break;
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertLocalMkqlStart:
+ cmd = Request.GetUpsertLocalMkqlStart();
+ break;
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertKqpStart:
+ cmd = Request.GetUpsertKqpStart();
+ break;
+ case NKikimrDataShardLoad::TEvTestLoadRequest::CommandCase::kUpsertProposeStart:
+ cmd = Request.GetUpsertProposeStart();
+ break;
+ default:
+ State = EState::RunLoad;
+ return PrepareTable(ctx);
+ }
+
+ const auto& target = Request.GetTargetShard();
+ LOG_INFO_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
+ << " warmups table# " << target.GetTableName()
+ << " in dir# " << target.GetWorkingDir()
+ << " with rows# " << cmd.GetRowCount());
+
+ // TODO: we need bulk upsert with normal batch size, not 1 row per request
+ cmd.SetInflight(100);
+
+ LoadActors.insert(ctx.Register(
+ CreateUpsertBulkActor(
+ cmd,
+ target,
+ ctx.SelfID,
+ GetServiceCounters(Counters, "load_actor"),
+ ++LastTag)));
+ }
+
void RunLoad(const TActorContext& ctx) {
StartTs = TAppData::TimeProvider->Now();
const ui64 tag = ++LastTag;
@@ -363,6 +419,7 @@ public:
void Handle(TEvDataShardLoad::TEvTestLoadFinished::TPtr& ev, const TActorContext& ctx) {
const auto& msg = ev->Get();
+ LoadActors.erase(ev->Sender);
if (msg->ErrorReason || !msg->Report) {
TStringStream ss;
@@ -371,12 +428,16 @@ public:
return;
}
+ if (State == EState::Warmup) {
+ State = EState::RunLoad;
+ return PrepareTable(ctx);
+ }
+
LOG_DEBUG_S(ctx, NKikimrServices::DS_LOAD_TEST, "TLoad# " << Tag
<< " received finished from actor# " << ev->Sender << " with tag# " << msg->Tag);
FinishedTests.push_back({msg->Tag, msg->ErrorReason, TAppData::TimeProvider->Now(), msg->Report});
- LoadActors.erase(ev->Sender);
if (LoadActors.empty()) {
Finish(ctx);
return;
@@ -710,6 +771,9 @@ inline void Out<NKikimr::NDataShardLoad::TLoad::EState>(
case NKikimr::NDataShardLoad::TLoad::EState::DescribePath:
o << "describePath";
break;
+ case NKikimr::NDataShardLoad::TLoad::EState::Warmup:
+ o << "warmup";
+ break;
case NKikimr::NDataShardLoad::TLoad::EState::RunLoad:
o << "runLoad";
break;