aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsvc <svc@yandex-team.ru>2022-04-18 18:40:16 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-04-18 18:40:16 +0300
commit9fc8a71d4ba1e90db7d0ccae943793e428a5aa7b (patch)
tree8cb95ccd4d3e7fff53d8c73d7da180b6fe43c9be
parent9da6203614b2d40070ace14a3f99ac3a969ad0d1 (diff)
downloadydb-9fc8a71d4ba1e90db7d0ccae943793e428a5aa7b.tar.gz
KIKIMR-14636 split after move
REVIEW: 2433139 REVIEW: 2436301 x-ydb-stable-ref: 5eaa2d8a95381eb10a74e2f1ae5abde9d0a7e0b1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h3
-rw-r--r--ydb/services/ydb/ydb_table_split_ut.cpp173
5 files changed, 187 insertions, 8 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
index fc39ce04ba..019069b16d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp
@@ -228,6 +228,7 @@ public:
Y_VERIFY(context.SS->Tables.contains(srcPath.Base()->PathId));
TTableInfo::TPtr tableInfo = new TTableInfo(*context.SS->Tables.at(srcPath.Base()->PathId));
+ tableInfo->ResetDescriptionCache();
tableInfo->AlterVersion += 1;
// copy table info
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index d08ebf339a..b2c8a9b4fd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -5652,7 +5652,7 @@ void TSchemeShard::FillTableDescriptionForShardIdx(
const TTableInfo::TPtr tinfo = Tables.at(tableId);
TPathElement::TPtr pinfo = *PathsById.FindPtr(tableId);
- TVector<ui32> keyColumnIds = tinfo->FillDescription(pinfo);
+ TVector<ui32> keyColumnIds = tinfo->FillDescriptionCache(pinfo);
if (!tinfo->TableDescription.HasPath()) {
tinfo->TableDescription.SetPath(PathToString(pinfo));
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 67c209da1e..b5a9000841 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -276,7 +276,15 @@ TTableInfo::TAlterDataPtr TTableInfo::CreateAlterData(
return alterData;
}
-TVector<ui32> TTableInfo::FillDescription(TPathElement::TPtr pathInfo) {
+void TTableInfo::ResetDescriptionCache() {
+ TableDescription.ClearId_Deprecated();
+ TableDescription.ClearPathId();
+ TableDescription.ClearName();
+ TableDescription.ClearColumns();
+ TableDescription.ClearKeyColumnIds();
+}
+
+TVector<ui32> TTableInfo::FillDescriptionCache(TPathElement::TPtr pathInfo) {
Y_VERIFY(pathInfo && pathInfo->IsTable());
TVector<ui32> keyColumnIds;
@@ -1197,11 +1205,7 @@ void TTableInfo::FinishAlter() {
}
// Force FillDescription to regenerate TableDescription
- TableDescription.ClearId_Deprecated();
- TableDescription.ClearPathId();
- TableDescription.ClearName();
- TableDescription.ClearColumns();
- TableDescription.ClearKeyColumnIds();
+ ResetDescriptionCache();
AlterData.Reset();
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 7e3c1a4a5d..f68d915b41 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -479,7 +479,8 @@ public:
}
}
- TVector<ui32> FillDescription(TPathElement::TPtr pathInfo);
+ void ResetDescriptionCache();
+ TVector<ui32> FillDescriptionCache(TPathElement::TPtr pathInfo);
void SetRoom(const TStorageRoom& room) {
// WARNING: this is legacy support code
diff --git a/ydb/services/ydb/ydb_table_split_ut.cpp b/ydb/services/ydb/ydb_table_split_ut.cpp
index f492820f7a..401cc36dd2 100644
--- a/ydb/services/ydb/ydb_table_split_ut.cpp
+++ b/ydb/services/ydb/ydb_table_split_ut.cpp
@@ -1,6 +1,7 @@
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/client/flat_ut_client.h>
@@ -419,4 +420,176 @@ Y_UNIT_TEST_SUITE(YdbTableSplit) {
}
UNIT_ASSERT_C(shardsAfter < shardsBefore, "Merge didn't happen!!11 O_O");
}
+
+ Y_UNIT_TEST(RenameTablesAndSplit) {
+ // KIKIMR-14636
+
+ NDataShard::gDbStatsReportInterval = TDuration::Seconds(2);
+ NDataShard::gDbStatsDataSizeResolution = 10;
+ NDataShard::gDbStatsRowCountResolution = 10;
+
+ TIntrusivePtr<ITimeProvider> originalTimeProvider = NKikimr::TAppData::TimeProvider;
+ TIntrusivePtr<TTestTimeProvider> testTimeProvider = new TTestTimeProvider(originalTimeProvider);
+ NKikimr::TAppData::TimeProvider = testTimeProvider;
+
+ TKikimrWithGrpcAndRootSchemaNoSystemViews server;
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_NOTICE);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_NOTICE);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_PROXY, NActors::NLog::PRI_NOTICE);
+
+ auto connection = NYdb::TDriver(
+ TDriverConfig()
+ .SetEndpoint(TStringBuilder() << "localhost:" << server.GetPort()));
+
+ NYdb::NTable::TTableClient client(connection);
+
+ auto sessionResult = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(sessionResult.GetStatus(), EStatus::SUCCESS);
+ auto session = sessionResult.GetSession();
+
+ {
+ auto query = TStringBuilder() << R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/Foo` (
+ NameHash Uint32,
+ Name Utf8,
+ Version Uint32,
+ `Timestamp` Int64,
+ Data String,
+ PRIMARY KEY (NameHash, Name)
+ ) WITH ( UNIFORM_PARTITIONS = 2 );)";
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync();
+
+ Cerr << result.GetIssues().ToString();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ { // prepare for split
+ auto query = TStringBuilder() << R"(
+ --!syntax_v1
+ ALTER TABLE `/Root/Foo`
+ SET (
+ AUTO_PARTITIONING_BY_SIZE = ENABLED,
+ AUTO_PARTITIONING_PARTITION_SIZE_MB = 1,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+ AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = 10
+ );)";
+
+ auto result = session.ExecuteSchemeQuery(query).ExtractValueSync();
+
+ Cerr << result.GetIssues().ToString();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ ui64 partitions = 2;
+ do { // wait until merge
+ Cerr << "Fast forward 1m" << Endl;
+ testTimeProvider->AddShift(TDuration::Minutes(2));
+ Sleep(TDuration::Seconds(3));
+
+ auto result = session.DescribeTable("/Root/Foo", NYdb::NTable::TDescribeTableSettings().WithTableStatistics(true)).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ partitions = result.GetTableDescription().GetPartitionsCount();
+ Cerr << "partitions " << partitions << Endl;
+
+ } while (partitions == 2);
+
+ { //rename
+ auto result = session.RenameTables({{"/Root/Foo", "/Root/Bar"}}).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ { // add data for triger split
+ int key = 0;
+ for (int i = 0 ; i < 100; ++i) {
+ TValueBuilder rows;
+ rows.BeginList();
+ for (int j = 0; j < 500; ++j) {
+ key += 1;
+ TString name = "key " + ToString(key);
+
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember("NameHash").Uint32(MurmurHash<ui32>(name.data(), name.size()))
+ .AddMember("Name").Utf8(name)
+ .AddMember("Version").Uint32(key%5)
+ .AddMember("Timestamp").Int64(key%10)
+ .EndStruct();
+ }
+ rows.EndList();
+
+ auto result = client.BulkUpsert("/Root/Bar", rows.Build()).ExtractValueSync();
+
+ if (!result.IsSuccess() && result.GetStatus() != NYdb::EStatus::OVERLOADED) {
+ TString err = result.GetIssues().ToString();
+ Cerr << result.GetStatus() << ": " << err << Endl;
+ }
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ }
+ }
+
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_DEBUG);
+ server.Server_->GetRuntime()->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
+
+ partitions = 1;
+ do { // wait until split
+ Cerr << "Fast forward 1m" << Endl;
+ testTimeProvider->AddShift(TDuration::Minutes(1));
+ Sleep(TDuration::Seconds(3));
+
+ auto result = session.DescribeTable("/Root/Bar", NYdb::NTable::TDescribeTableSettings().WithTableStatistics(true)).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+
+ partitions = result.GetTableDescription().GetPartitionsCount();
+ Cerr << "partitions " << partitions << Endl;
+
+ } while (partitions == 1);
+
+
+ { // fail if shema has been broken
+ TString readQuery =
+ "SELECT * FROM `/Root/Bar`;";
+
+ TExecDataQuerySettings querySettings;
+ querySettings.KeepInQueryCache(true);
+
+ auto result = session.ExecuteDataQuery(
+ readQuery,
+ TTxControl::BeginTx().CommitTx(),
+ querySettings)
+ .ExtractValueSync();
+
+ if (!result.IsSuccess() && result.GetStatus() != NYdb::EStatus::OVERLOADED) {
+ TString err = result.GetIssues().ToString();
+ Cerr << result.GetStatus() << ": " << err << Endl;
+ }
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ {
+ auto asyncDescDir = NYdb::NScheme::TSchemeClient(connection).ListDirectory("/Root");
+ asyncDescDir.Wait();
+ const auto& val = asyncDescDir.GetValue();
+ auto entry = val.GetEntry();
+ UNIT_ASSERT_EQUAL(entry.Name, "Root");
+ UNIT_ASSERT_EQUAL(entry.Type, NYdb::NScheme::ESchemeEntryType::Directory);
+
+ auto children = val.GetChildren();
+ UNIT_ASSERT_EQUAL_C(children.size(), 1, children.size());
+ for (const auto& child: children) {
+ UNIT_ASSERT_EQUAL(child.Type, NYdb::NScheme::ESchemeEntryType::Table);
+
+ auto result = session.DropTable(TStringBuilder() << "Root" << "/" << child.Name).ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetStatus());
+ }
+ }
+ }
}