aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreivanov89 <eivanov89@ydb.tech>2022-11-11 00:02:38 +0300
committereivanov89 <eivanov89@ydb.tech>2022-11-11 00:02:38 +0300
commit80da9a26ff9dfef3939e054f3e02dd45a9185dd2 (patch)
tree7f565eddb2854bd2c2bc7de03f99486b74382b15
parenta2ffc87e9209ab37cff394ceef9d1d857c93a5d0 (diff)
downloadydb-80da9a26ff9dfef3939e054f3e02dd45a9185dd2.tar.gz
compact borrowed data right after split/merge
-rw-r--r--ydb/core/base/feature_flags.h4
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/tx_datashard.proto2
-rw-r--r--ydb/core/testlib/basics/feature_flags.h1
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h1
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp218
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.cpp1
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/test_env.h1
11 files changed, 231 insertions, 7 deletions
diff --git a/ydb/core/base/feature_flags.h b/ydb/core/base/feature_flags.h
index d733e8abea..8a07d1c348 100644
--- a/ydb/core/base/feature_flags.h
+++ b/ydb/core/base/feature_flags.h
@@ -36,6 +36,10 @@ public:
SetEnableBackgroundCompactionServerless(value);
}
+ inline void SetEnableBorrowedSplitCompactionForTest(bool value) {
+ SetEnableBorrowedSplitCompaction(value);
+ }
+
inline void SetEnableMvccForTest(bool value) {
SetEnableMvcc(value
? NKikimrConfig::TFeatureFlags::VALUE_TRUE
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index ad86a33996..5d5777e139 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -741,6 +741,7 @@ message TFeatureFlags {
optional bool EnableNotNullDataColumns = 73 [default = false];
optional bool EnableGrpcAudit = 74 [default = false];
optional bool EnableKqpDataQueryStreamLookup = 75 [default = false];
+ optional bool EnableBorrowedSplitCompaction = 76 [default = true];
}
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index d415560d6e..b47a346fa9 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -789,6 +789,8 @@ message TEvPeriodicTableStats {
optional uint64 StartTime = 11; // milliseconds since epoch
optional uint64 TableOwnerId = 12;
+
+ optional bool IsDstSplit = 13;
}
message TEvS3ListingRequest {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 661a5fd29b..2886f7baed 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -29,6 +29,7 @@ public:
FEATURE_FLAG_SETTER(EnableMvccSnapshotReads)
FEATURE_FLAG_SETTER(EnableBackgroundCompaction)
FEATURE_FLAG_SETTER(EnableBackgroundCompactionServerless)
+ FEATURE_FLAG_SETTER(EnableBorrowedSplitCompaction)
FEATURE_FLAG_SETTER(EnableNotNullColumns)
FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables)
FEATURE_FLAG_SETTER(EnableChangefeeds)
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index e76cbd4abd..0f270c7928 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -2800,6 +2800,9 @@ protected:
ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId);
ev->Record.SetStartTime(StartTime().MilliSeconds());
+ if (DstSplitDescription)
+ ev->Record.SetIsDstSplit(true);
+
NTabletPipe::SendData(ctx, DbStatsReportPipe, ev.Release());
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index da85500f5a..8b9cdada93 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -260,6 +260,10 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
if (!newStats.HasBorrowedData) {
Self->RemoveBorrowedCompaction(shardIdx);
+ } else if (Self->EnableBorrowedSplitCompaction && rec.GetIsDstSplit()) {
+ // note that we want to compact only shards originating
+ // from split/merge and not shards created via copytable
+ Self->EnqueueBorrowedCompaction(shardIdx);
}
if (!table->IsBackup && !table->IsShardsStatsDetached()) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 66032109e2..3777d4c25f 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -3905,6 +3905,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
EnableBackgroundCompaction = appData->FeatureFlags.GetEnableBackgroundCompaction();
EnableBackgroundCompactionServerless = appData->FeatureFlags.GetEnableBackgroundCompactionServerless();
+ EnableBorrowedSplitCompaction = appData->FeatureFlags.GetEnableBorrowedSplitCompaction();
EnableMoveIndex = appData->FeatureFlags.GetEnableMoveIndex();
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
@@ -6266,6 +6267,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu
EnableBackgroundCompaction = featureFlags.GetEnableBackgroundCompaction();
EnableBackgroundCompactionServerless = featureFlags.GetEnableBackgroundCompactionServerless();
+ EnableBorrowedSplitCompaction = featureFlags.GetEnableBorrowedSplitCompaction();
EnableMoveIndex = featureFlags.GetEnableMoveIndex();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 572cb913bb..da52c6b7e7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -252,6 +252,7 @@ public:
THashSet<TShardIdx> ShardsWithLoaned; // shards have parts loaned to another shards
bool EnableBackgroundCompaction = false;
bool EnableBackgroundCompactionServerless = false;
+ bool EnableBorrowedSplitCompaction = false;
bool EnableMoveIndex = false;
TShardDeleter ShardDeleter;
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index f80f73621e..b18b6e8d0c 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -84,7 +84,7 @@ void WriteData(
TString writeQuery = Sprintf(R"(
(
(let key '( '('key (Uint64 '%lu)) ) )
- (let value '('('value (Utf8 'MostMeaninglessValueInTheWorld)) ) )
+ (let value '('('value (Utf8 'MostMeaninglessValueInTheWorldButMaybeItIsSizeMeaningFullThusItIsMostMeaningFullValueInTheWorldOfMeaninglessFullness)) ) )
(return (AsList (UpdateRow '__user__%s key value) ))
)
)", key, tableName);
@@ -100,6 +100,32 @@ void WriteData(
}
}
+void WriteDataSpreadKeys(
+ TTestActorRuntime &runtime,
+ const char* name,
+ ui64 rowCount,
+ ui64 tabletId = TTestTxConfig::FakeHiveTablets)
+{
+ auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) {
+ TString writeQuery = Sprintf(R"(
+ (
+ (let key '( '('key (Uint64 '%lu)) ) )
+ (let value '('('value (Utf8 'MostMeaninglessValueInTheWorldButMaybeItIsSizeMeaningFullThusItIsMostMeaningFullValueInTheWorldOfMeaninglessFullness)) ) )
+ (return (AsList (UpdateRow '__user__%s key value) ))
+ )
+ )", key, tableName);
+ NKikimrMiniKQL::TResult result;
+ TString err;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err);
+ UNIT_ASSERT_VALUES_EQUAL(err, "");
+ UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);;
+ };
+
+ for (ui64 key = 0; key < rowCount; ++key) {
+ fnWriteRow(tabletId, key * 1'000'000, name);
+ }
+}
+
void CreateTableWithData(
TTestActorRuntime &runtime,
TTestEnv& env,
@@ -172,6 +198,12 @@ void SetBackgroundCompaction(TTestActorRuntime &runtime, TTestEnv& env, ui64 sch
SetFeatures(runtime, env, schemeShard, features);
}
+void SetEnableBorrowedSplitCompaction(TTestActorRuntime &runtime, TTestEnv& env, ui64 schemeShard, bool value) {
+ NKikimrConfig::TFeatureFlags features;
+ features.SetEnableBorrowedSplitCompaction(value);
+ SetFeatures(runtime, env, schemeShard, features);
+}
+
void DisableBackgroundCompactionViaRestart(
TTestActorRuntime& runtime,
TTestEnv&,
@@ -731,15 +763,24 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) {
};
Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) {
- Y_UNIT_TEST(SchemeshardShouldCompactBorrowed) {
+ Y_UNIT_TEST(SchemeshardShouldCompactBorrowedBeforeSplit) {
+ // In this test we check that
+ // 1. Copy table is not compacted until we want to split it
+ // 2. After borrow compaction both src and dst tables are background compacted
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
+ NDataShard::gDbStatsDataSizeResolution = 10;
+ NDataShard::gDbStatsRowCountResolution = 10;
+
TTestBasicRuntime runtime;
TTestEnv env(runtime);
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
// in case it is not enabled by default
SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+ SetEnableBorrowedSplitCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
auto configRequest = GetTestCompactionConfig();
auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
@@ -755,11 +796,14 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) {
// write to all shards in hacky way
auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
for (auto shard: simpleInfo.Shards) {
- WriteData(runtime, "Simple", 0, 100, shard);
+ WriteDataSpreadKeys(runtime, "Simple", 100, shard);
}
}
env.SimulateSleep(runtime, TDuration::Seconds(1));
+ auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 5UL);
+
// copy table
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "CopyTable"
@@ -769,8 +813,11 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) {
env.SimulateSleep(runtime, TDuration::Seconds(30));
- auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 5UL);
+
auto copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable");
+ UNIT_ASSERT_VALUES_EQUAL(copyInfo.Shards.size(), 5UL);
// borrow compaction only runs when we split, so nothing should be borrow compacted yet
@@ -798,10 +845,15 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) {
}
})");
env.TestWaitNotification(runtime, txId);
-
- // schemeshard should get stats from DS to start borrower compactions
env.SimulateSleep(runtime, TDuration::Seconds(30));
+ simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 5UL);
+
+ copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable");
+
+ UNIT_ASSERT(copyInfo.Shards.size() > 5);
+
// should compact all borrowed data (note that background will not compact until then)
{
@@ -842,6 +894,158 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) {
}
}
+ Y_UNIT_TEST(SchemeshardShouldCompactBorrowedAfterSplitMerge) {
+ // KIKIMR-15632: we want to compact shard right after split, merge.
+ // I.e. we compact borrowed data ASAP except copy table case, when
+ // we don't want to compact at all.
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
+ NDataShard::gDbStatsDataSizeResolution = 10;
+ NDataShard::gDbStatsRowCountResolution = 10;
+
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ // in case it is not enabled by default
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+ SetEnableBorrowedSplitCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+
+ auto configRequest = GetTestCompactionConfig();
+ auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+
+ SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest));
+
+ ui64 txId = 1000;
+
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ WriteDataSpreadKeys(runtime, "Simple", 1000);
+ env.SimulateSleep(runtime, TDuration::Seconds(2));
+
+ auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 1UL);
+
+ // borrow compaction only runs when we split, so nothing should be borrow compacted yet
+
+ {
+ for (auto shard: simpleInfo.Shards) {
+ CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ }
+ }
+
+ // now force split
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Simple"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 2
+ MaxPartitionsCount: 2
+ SizeToSplit: 1
+ FastSplitSettings {
+ SizeThreshold: 10
+ RowCountThreshold: 10
+ }
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+ env.SimulateSleep(runtime, TDuration::Seconds(30));
+
+ while (simpleInfo.Shards.size() < 2) {
+ // schemeshard should get stats from DS to start borrower compactions
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+
+ simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ }
+
+ // should compact all borrowed data (note that background will not compact until then)
+
+ {
+ for (auto shard: simpleInfo.Shards) {
+ CheckShardBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ }
+ }
+ }
+
+ Y_UNIT_TEST(SchemeshardShouldNotCompactBorrowedAfterSplitMergeWhenDisabled) {
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(1);
+ NDataShard::gDbStatsDataSizeResolution = 10;
+ NDataShard::gDbStatsRowCountResolution = 10;
+
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
+
+ // in case it is not enabled by default
+ SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true);
+ SetEnableBorrowedSplitCompaction(runtime, env, TTestTxConfig::SchemeShard, false);
+
+ auto configRequest = GetTestCompactionConfig();
+ auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig();
+ compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1);
+
+ SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest));
+
+ ui64 txId = 1000;
+
+ CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId);
+
+ WriteDataSpreadKeys(runtime, "Simple", 1000);
+ env.SimulateSleep(runtime, TDuration::Seconds(2));
+
+ auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 1UL);
+
+ // borrow compaction only runs when we split, so nothing should be borrow compacted yet
+
+ {
+ for (auto shard: simpleInfo.Shards) {
+ CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ }
+ }
+
+ // now force split
+
+ TestAlterTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Simple"
+ PartitionConfig {
+ PartitioningPolicy {
+ MinPartitionsCount: 2
+ MaxPartitionsCount: 2
+ SizeToSplit: 1
+ FastSplitSettings {
+ SizeThreshold: 10
+ RowCountThreshold: 10
+ }
+ }
+ })");
+ env.TestWaitNotification(runtime, txId);
+ env.SimulateSleep(runtime, TDuration::Seconds(30));
+
+ while (simpleInfo.Shards.size() < 2) {
+ // schemeshard should get stats from DS to start borrower compactions
+ env.SimulateSleep(runtime, TDuration::Seconds(1));
+
+ simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple");
+ }
+
+ // should not compact borrowed data
+
+ {
+ for (auto shard: simpleInfo.Shards) {
+ CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ CheckShardNotBackgroundCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId);
+ }
+ }
+ }
+
Y_UNIT_TEST(SchemeshardShouldHandleBorrowCompactionTimeouts) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index 787a021ba2..6e4f140802 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -503,6 +503,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableProtoSourceIdInfo(opts.EnableProtoSourceIdInfo_);
app.SetEnablePqBilling(opts.EnablePqBilling_);
app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_);
+ app.SetEnableBorrowedSplitCompaction(opts.EnableBorrowedSplitCompaction_);
app.FeatureFlags.SetEnablePublicApiExternalBlobs(true);
app.SetEnableMoveIndex(opts.EnableMoveIndex_);
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
index 64aa3ec284..f9effc9b7c 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h
@@ -40,6 +40,7 @@ namespace NSchemeShardUT_Private {
OPTION(std::optional<bool>, EnableProtoSourceIdInfo, std::nullopt);
OPTION(std::optional<bool>, EnablePqBilling, std::nullopt);
OPTION(std::optional<bool>, EnableBackgroundCompaction, std::nullopt);
+ OPTION(std::optional<bool>, EnableBorrowedSplitCompaction, std::nullopt);
OPTION(std::optional<bool>, DisableStatsBatching, std::nullopt);
OPTION(THashSet<TString>, SystemBackupSIDs, {});
OPTION(std::optional<bool>, EnableMoveIndex, std::nullopt);