diff options
author | eivanov89 <eivanov89@ydb.tech> | 2022-11-11 00:02:38 +0300 |
---|---|---|
committer | eivanov89 <eivanov89@ydb.tech> | 2022-11-11 00:02:38 +0300 |
commit | 80da9a26ff9dfef3939e054f3e02dd45a9185dd2 (patch) | |
tree | 7f565eddb2854bd2c2bc7de03f99486b74382b15 | |
parent | a2ffc87e9209ab37cff394ceef9d1d857c93a5d0 (diff) | |
download | ydb-80da9a26ff9dfef3939e054f3e02dd45a9185dd2.tar.gz |
compact borrowed data right after split/merge
-rw-r--r-- | ydb/core/base/feature_flags.h | 4 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/tx_datashard.proto | 2 | ||||
-rw-r--r-- | ydb/core/testlib/basics/feature_flags.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__table_stats.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_compaction.cpp | 218 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/test_env.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_helpers/test_env.h | 1 |
11 files changed, 231 insertions, 7 deletions
diff --git a/ydb/core/base/feature_flags.h b/ydb/core/base/feature_flags.h index d733e8abea..8a07d1c348 100644 --- a/ydb/core/base/feature_flags.h +++ b/ydb/core/base/feature_flags.h @@ -36,6 +36,10 @@ public: SetEnableBackgroundCompactionServerless(value); } + inline void SetEnableBorrowedSplitCompactionForTest(bool value) { + SetEnableBorrowedSplitCompaction(value); + } + inline void SetEnableMvccForTest(bool value) { SetEnableMvcc(value ? NKikimrConfig::TFeatureFlags::VALUE_TRUE diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index ad86a33996..5d5777e139 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -741,6 +741,7 @@ message TFeatureFlags { optional bool EnableNotNullDataColumns = 73 [default = false]; optional bool EnableGrpcAudit = 74 [default = false]; optional bool EnableKqpDataQueryStreamLookup = 75 [default = false]; + optional bool EnableBorrowedSplitCompaction = 76 [default = true]; } diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto index d415560d6e..b47a346fa9 100644 --- a/ydb/core/protos/tx_datashard.proto +++ b/ydb/core/protos/tx_datashard.proto @@ -789,6 +789,8 @@ message TEvPeriodicTableStats { optional uint64 StartTime = 11; // milliseconds since epoch optional uint64 TableOwnerId = 12; + + optional bool IsDstSplit = 13; } message TEvS3ListingRequest { diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h index 661a5fd29b..2886f7baed 100644 --- a/ydb/core/testlib/basics/feature_flags.h +++ b/ydb/core/testlib/basics/feature_flags.h @@ -29,6 +29,7 @@ public: FEATURE_FLAG_SETTER(EnableMvccSnapshotReads) FEATURE_FLAG_SETTER(EnableBackgroundCompaction) FEATURE_FLAG_SETTER(EnableBackgroundCompactionServerless) + FEATURE_FLAG_SETTER(EnableBorrowedSplitCompaction) FEATURE_FLAG_SETTER(EnableNotNullColumns) FEATURE_FLAG_SETTER(EnableBulkUpsertToAsyncIndexedTables) FEATURE_FLAG_SETTER(EnableChangefeeds) diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index e76cbd4abd..0f270c7928 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -2800,6 +2800,9 @@ protected: ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId); ev->Record.SetStartTime(StartTime().MilliSeconds()); + if (DstSplitDescription) + ev->Record.SetIsDstSplit(true); + NTabletPipe::SendData(ctx, DbStatsReportPipe, ev.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index da85500f5a..8b9cdada93 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -260,6 +260,10 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const if (!newStats.HasBorrowedData) { Self->RemoveBorrowedCompaction(shardIdx); + } else if (Self->EnableBorrowedSplitCompaction && rec.GetIsDstSplit()) { + // note that we want to compact only shards originating + // from split/merge and not shards created via copytable + Self->EnqueueBorrowedCompaction(shardIdx); } if (!table->IsBackup && !table->IsShardsStatsDetached()) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 66032109e2..3777d4c25f 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3905,6 +3905,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { EnableBackgroundCompaction = appData->FeatureFlags.GetEnableBackgroundCompaction(); EnableBackgroundCompactionServerless = appData->FeatureFlags.GetEnableBackgroundCompactionServerless(); + EnableBorrowedSplitCompaction = appData->FeatureFlags.GetEnableBorrowedSplitCompaction(); EnableMoveIndex = appData->FeatureFlags.GetEnableMoveIndex(); ConfigureCompactionQueues(appData->CompactionConfig, ctx); @@ -6266,6 +6267,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TFeatureFlags& featu EnableBackgroundCompaction = featureFlags.GetEnableBackgroundCompaction(); EnableBackgroundCompactionServerless = featureFlags.GetEnableBackgroundCompactionServerless(); + EnableBorrowedSplitCompaction = featureFlags.GetEnableBorrowedSplitCompaction(); EnableMoveIndex = featureFlags.GetEnableMoveIndex(); } diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 572cb913bb..da52c6b7e7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -252,6 +252,7 @@ public: THashSet<TShardIdx> ShardsWithLoaned; // shards have parts loaned to another shards bool EnableBackgroundCompaction = false; bool EnableBackgroundCompactionServerless = false; + bool EnableBorrowedSplitCompaction = false; bool EnableMoveIndex = false; TShardDeleter ShardDeleter; diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index f80f73621e..b18b6e8d0c 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -84,7 +84,7 @@ void WriteData( TString writeQuery = Sprintf(R"( ( (let key '( '('key (Uint64 '%lu)) ) ) - (let value '('('value (Utf8 'MostMeaninglessValueInTheWorld)) ) ) + (let value '('('value (Utf8 'MostMeaninglessValueInTheWorldButMaybeItIsSizeMeaningFullThusItIsMostMeaningFullValueInTheWorldOfMeaninglessFullness)) ) ) (return (AsList (UpdateRow '__user__%s key value) )) ) )", key, tableName); @@ -100,6 +100,32 @@ void WriteData( } } +void WriteDataSpreadKeys( + TTestActorRuntime &runtime, + const char* name, + ui64 rowCount, + ui64 tabletId = TTestTxConfig::FakeHiveTablets) +{ + auto fnWriteRow = [&] (ui64 tabletId, ui64 key, const char* tableName) { + TString writeQuery = Sprintf(R"( + ( + (let key '( '('key (Uint64 '%lu)) ) ) + (let value '('('value (Utf8 'MostMeaninglessValueInTheWorldButMaybeItIsSizeMeaningFullThusItIsMostMeaningFullValueInTheWorldOfMeaninglessFullness)) ) ) + (return (AsList (UpdateRow '__user__%s key value) )) + ) + )", key, tableName); + NKikimrMiniKQL::TResult result; + TString err; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, writeQuery, result, err); + UNIT_ASSERT_VALUES_EQUAL(err, ""); + UNIT_ASSERT_VALUES_EQUAL(status, NKikimrProto::EReplyStatus::OK);; + }; + + for (ui64 key = 0; key < rowCount; ++key) { + fnWriteRow(tabletId, key * 1'000'000, name); + } +} + void CreateTableWithData( TTestActorRuntime &runtime, TTestEnv& env, @@ -172,6 +198,12 @@ void SetBackgroundCompaction(TTestActorRuntime &runtime, TTestEnv& env, ui64 sch SetFeatures(runtime, env, schemeShard, features); } +void SetEnableBorrowedSplitCompaction(TTestActorRuntime &runtime, TTestEnv& env, ui64 schemeShard, bool value) { + NKikimrConfig::TFeatureFlags features; + features.SetEnableBorrowedSplitCompaction(value); + SetFeatures(runtime, env, schemeShard, features); +} + void DisableBackgroundCompactionViaRestart( TTestActorRuntime& runtime, TTestEnv&, @@ -731,15 +763,24 @@ Y_UNIT_TEST_SUITE(TSchemeshardBackgroundCompactionTest) { }; Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { - Y_UNIT_TEST(SchemeshardShouldCompactBorrowed) { + Y_UNIT_TEST(SchemeshardShouldCompactBorrowedBeforeSplit) { + // In this test we check that + // 1. Copy table is not compacted until we want to split it + // 2. After borrow compaction both src and dst tables are background compacted + + NDataShard::gDbStatsReportInterval = TDuration::Seconds(1); + NDataShard::gDbStatsDataSizeResolution = 10; + NDataShard::gDbStatsRowCountResolution = 10; + TTestBasicRuntime runtime; TTestEnv env(runtime); - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); // in case it is not enabled by default SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + SetEnableBorrowedSplitCompaction(runtime, env, TTestTxConfig::SchemeShard, true); auto configRequest = GetTestCompactionConfig(); auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); @@ -755,11 +796,14 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { // write to all shards in hacky way auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); for (auto shard: simpleInfo.Shards) { - WriteData(runtime, "Simple", 0, 100, shard); + WriteDataSpreadKeys(runtime, "Simple", 100, shard); } } env.SimulateSleep(runtime, TDuration::Seconds(1)); + auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 5UL); + // copy table TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "CopyTable" @@ -769,8 +813,11 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { env.SimulateSleep(runtime, TDuration::Seconds(30)); - auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 5UL); + auto copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable"); + UNIT_ASSERT_VALUES_EQUAL(copyInfo.Shards.size(), 5UL); // borrow compaction only runs when we split, so nothing should be borrow compacted yet @@ -798,10 +845,15 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { } })"); env.TestWaitNotification(runtime, txId); - - // schemeshard should get stats from DS to start borrower compactions env.SimulateSleep(runtime, TDuration::Seconds(30)); + simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 5UL); + + copyInfo = GetPathInfo(runtime, "/MyRoot/CopyTable"); + + UNIT_ASSERT(copyInfo.Shards.size() > 5); + // should compact all borrowed data (note that background will not compact until then) { @@ -842,6 +894,158 @@ Y_UNIT_TEST_SUITE(TSchemeshardBorrowedCompactionTest) { } } + Y_UNIT_TEST(SchemeshardShouldCompactBorrowedAfterSplitMerge) { + // KIKIMR-15632: we want to compact shard right after split, merge. + // I.e. we compact borrowed data ASAP except copy table case, when + // we don't want to compact at all. + + NDataShard::gDbStatsReportInterval = TDuration::Seconds(1); + NDataShard::gDbStatsDataSizeResolution = 10; + NDataShard::gDbStatsRowCountResolution = 10; + + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + // in case it is not enabled by default + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + SetEnableBorrowedSplitCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + + auto configRequest = GetTestCompactionConfig(); + auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + + SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + + ui64 txId = 1000; + + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + WriteDataSpreadKeys(runtime, "Simple", 1000); + env.SimulateSleep(runtime, TDuration::Seconds(2)); + + auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 1UL); + + // borrow compaction only runs when we split, so nothing should be borrow compacted yet + + { + for (auto shard: simpleInfo.Shards) { + CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + } + } + + // now force split + + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Simple" + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 2 + MaxPartitionsCount: 2 + SizeToSplit: 1 + FastSplitSettings { + SizeThreshold: 10 + RowCountThreshold: 10 + } + } + })"); + env.TestWaitNotification(runtime, txId); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + while (simpleInfo.Shards.size() < 2) { + // schemeshard should get stats from DS to start borrower compactions + env.SimulateSleep(runtime, TDuration::Seconds(1)); + + simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + } + + // should compact all borrowed data (note that background will not compact until then) + + { + for (auto shard: simpleInfo.Shards) { + CheckShardBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + } + } + } + + Y_UNIT_TEST(SchemeshardShouldNotCompactBorrowedAfterSplitMergeWhenDisabled) { + + NDataShard::gDbStatsReportInterval = TDuration::Seconds(1); + NDataShard::gDbStatsDataSizeResolution = 10; + NDataShard::gDbStatsRowCountResolution = 10; + + TTestBasicRuntime runtime; + TTestEnv env(runtime); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE); + + // in case it is not enabled by default + SetBackgroundCompaction(runtime, env, TTestTxConfig::SchemeShard, true); + SetEnableBorrowedSplitCompaction(runtime, env, TTestTxConfig::SchemeShard, false); + + auto configRequest = GetTestCompactionConfig(); + auto* compactionConfig = configRequest->Record.MutableConfig()->MutableCompactionConfig(); + compactionConfig->MutableBorrowedCompactionConfig()->SetInflightLimit(1); + + SetConfig(runtime, TTestTxConfig::SchemeShard, std::move(configRequest)); + + ui64 txId = 1000; + + CreateTableWithData(runtime, env, "/MyRoot", "Simple", 1, txId); + + WriteDataSpreadKeys(runtime, "Simple", 1000); + env.SimulateSleep(runtime, TDuration::Seconds(2)); + + auto simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + UNIT_ASSERT_VALUES_EQUAL(simpleInfo.Shards.size(), 1UL); + + // borrow compaction only runs when we split, so nothing should be borrow compacted yet + + { + for (auto shard: simpleInfo.Shards) { + CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + } + } + + // now force split + + TestAlterTable(runtime, ++txId, "/MyRoot", R"( + Name: "Simple" + PartitionConfig { + PartitioningPolicy { + MinPartitionsCount: 2 + MaxPartitionsCount: 2 + SizeToSplit: 1 + FastSplitSettings { + SizeThreshold: 10 + RowCountThreshold: 10 + } + } + })"); + env.TestWaitNotification(runtime, txId); + env.SimulateSleep(runtime, TDuration::Seconds(30)); + + while (simpleInfo.Shards.size() < 2) { + // schemeshard should get stats from DS to start borrower compactions + env.SimulateSleep(runtime, TDuration::Seconds(1)); + + simpleInfo = GetPathInfo(runtime, "/MyRoot/Simple"); + } + + // should not compact borrowed data + + { + for (auto shard: simpleInfo.Shards) { + CheckShardNotBorrowedCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + CheckShardNotBackgroundCompacted(runtime, simpleInfo.UserTable, shard, simpleInfo.OwnerId); + } + } + } + Y_UNIT_TEST(SchemeshardShouldHandleBorrowCompactionTimeouts) { TTestBasicRuntime runtime; TTestEnv env(runtime); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp index 787a021ba2..6e4f140802 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp @@ -503,6 +503,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe app.SetEnableProtoSourceIdInfo(opts.EnableProtoSourceIdInfo_); app.SetEnablePqBilling(opts.EnablePqBilling_); app.SetEnableBackgroundCompaction(opts.EnableBackgroundCompaction_); + app.SetEnableBorrowedSplitCompaction(opts.EnableBorrowedSplitCompaction_); app.FeatureFlags.SetEnablePublicApiExternalBlobs(true); app.SetEnableMoveIndex(opts.EnableMoveIndex_); diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.h b/ydb/core/tx/schemeshard/ut_helpers/test_env.h index 64aa3ec284..f9effc9b7c 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/test_env.h +++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.h @@ -40,6 +40,7 @@ namespace NSchemeShardUT_Private { OPTION(std::optional<bool>, EnableProtoSourceIdInfo, std::nullopt); OPTION(std::optional<bool>, EnablePqBilling, std::nullopt); OPTION(std::optional<bool>, EnableBackgroundCompaction, std::nullopt); + OPTION(std::optional<bool>, EnableBorrowedSplitCompaction, std::nullopt); OPTION(std::optional<bool>, DisableStatsBatching, std::nullopt); OPTION(THashSet<TString>, SystemBackupSIDs, {}); OPTION(std::optional<bool>, EnableMoveIndex, std::nullopt); |