aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-11-07 12:35:38 +0300
committerchertus <azuikov@ydb.tech>2022-11-07 12:35:38 +0300
commitf570e9e1e98392e10ddfe79909f0ad949f9dec8f (patch)
treefc67d83d75a302df74bf5748e54f0fe3302fae3c
parent4440f803dff303fe38644369105d264fe848946c (diff)
downloadydb-f570e9e1e98392e10ddfe79909f0ad949f9dec8f.tar.gz
CREATE/DROP standalone column table
-rw-r--r--ydb/core/kqp/kqp_ic_gateway.cpp6
-rw-r--r--ydb/core/kqp/ut/kqp_scheme_ut.cpp8
-rw-r--r--ydb/core/protos/flat_scheme_op.proto3
-rw-r--r--ydb/core/protos/tx_columnshard.proto14
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp100
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h14
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.h75
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_read_write.cpp99
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp31
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp14
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_common.h9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp108
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp712
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp27
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp136
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp38
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp153
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp5
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h6
-rw-r--r--ydb/core/tx/schemeshard/ut_olap.cpp35
-rw-r--r--ydb/core/tx/schemeshard/ut_olap_reboots.cpp154
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema16
29 files changed, 1263 insertions, 569 deletions
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index 7a407992e0e..5a4a5e737b9 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -2135,6 +2135,8 @@ private:
for (const auto& keyColumn : metadata.KeyColumnNames) {
schema.AddKeyColumnNames(keyColumn);
}
+
+ schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES);
}
static bool CheckLoadTableMetadataStatus(ui32 status, const TString& reason,
@@ -2487,7 +2489,9 @@ private:
static bool FillCreateColumnTableDesc(NYql::TKikimrTableMetadataPtr metadata,
NKikimrSchemeOp::TColumnTableDescription& tableDesc, Ydb::StatusIds::StatusCode& code, TString& error)
{
- tableDesc.SetSchemaPresetName("default"); // TODO: CREATE TABLE without TABLESTORE needs schema
+ if (metadata->Columns.empty()) {
+ tableDesc.SetSchemaPresetName("default");
+ }
// TODO: not first PK column
if (metadata->KeyColumnNames.empty()) {
diff --git a/ydb/core/kqp/ut/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/kqp_scheme_ut.cpp
index d20202bf59f..0775f128245 100644
--- a/ydb/core/kqp/ut/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_scheme_ut.cpp
@@ -3042,7 +3042,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-#if 0 // TODO
TString tableName = "/Root/TableStoreTest/ColumnTableTest";
auto query2 = TStringBuilder() << R"(
--!syntax_v1
@@ -3058,19 +3057,18 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
);)";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
+#if 0 // TODO
auto query3 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(`;)";
result = session.ExecuteSchemeQuery(query3).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
+#endif
auto query4 = TStringBuilder() << R"(
--!syntax_v1
DROP TABLE `)" << tableName << R"(`;)";
result = session.ExecuteSchemeQuery(query4).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-#endif
auto query5 = TStringBuilder() << R"(
--!syntax_v1
@@ -3079,7 +3077,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
-#if 0 // TODO
Y_UNIT_TEST(CreateDropColumnTable) {
TKikimrRunner kikimr;
auto db = kikimr.GetTableClient();
@@ -3106,7 +3103,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
-#endif
Y_UNIT_TEST(CreateDropColumnTableNegative) {
TKikimrRunner kikimr;
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index f5e3384bab4..e8ffb753a6b 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -646,6 +646,9 @@ message TColumnTableDescription {
// Internal fields that make sure versions always increase when presets are switched
optional uint64 SchemaPresetVersionAdj = 11;
optional uint64 TtlSettingsPresetVersionAdj = 12;
+
+ // Channels for standalone column table
+ optional TColumnStorageConfig StorageConfig = 13;
}
message TAlterColumnTable {
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 92514e6a920..e503da4984c 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -159,13 +159,6 @@ message TCommitTxBody {
repeated uint64 WriteIds = 2;
}
-message TInitShard {
- optional uint32 DataChannelCount = 1;
- optional uint64 StorePathId = 2;
- optional uint64 TablePathId = 3;
- optional string OwnerPath = 4; // tablestore or table path
-}
-
message TSchemaPresetVersionInfo {
optional uint64 Id = 1;
optional uint64 SinceStep = 2;
@@ -202,6 +195,13 @@ message TCreateTable {
optional uint64 TtlSettingsPresetVersionAdj = 7;
}
+message TInitShard {
+ optional uint32 DataChannelCount = 1;
+ optional uint64 OwnerPathId = 2;
+ repeated TCreateTable Tables = 3;
+ optional string OwnerPath = 4;
+}
+
message TAlterTable {
optional uint64 PathId = 1;
optional NKikimrSchemeOp.TAlterColumnTable AlterBody = 2;
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 67aedff28cb..4af0bf3dd85 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -276,7 +276,7 @@ void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage&
}
void TColumnShard::SendPeriodicStats() {
- if (!CurrentSchemeShardId || !StorePathId) {
+ if (!CurrentSchemeShardId || !OwnerPathId) {
LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID());
return;
}
@@ -294,7 +294,7 @@ void TColumnShard::SendPeriodicStats() {
StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig));
}
- auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), StorePathId);
+ auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), OwnerPathId);
{
ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
ev->Record.SetGeneration(Executor()->Generation());
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index 6683260c142..b0f3882c6ad 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -34,7 +34,7 @@ void TTxInit::SetDefaults() {
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
- Self->StorePathId = 0;
+ Self->OwnerPathId = 0;
Self->OwnerPath.clear();
Self->BasicTxInfo.clear();
Self->DeadlineQueue.clear();
@@ -79,7 +79,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastExportNumber, Self->LastExportNo);
- ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::StorePathId, Self->StorePathId);
+ ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPathId, Self->OwnerPathId);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPath, Self->OwnerPath);
if (!ready)
@@ -139,6 +139,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
// Primary index defaut schema and TTL (both are versioned)
TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> commonSchema;
THashMap<ui64, TMap<TRowVersion, TTtl::TDescription>> ttls;
{ // Load schema presets
@@ -150,8 +151,10 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
const ui32 id = rowset.GetValue<Schema::SchemaPresetInfo::Id>();
auto& preset = Self->SchemaPresets[id];
preset.Id = id;
- preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>();
- Y_VERIFY(preset.Name == "default", "Unsupported preset at load time");
+ if (id) {
+ preset.Name = rowset.GetValue<Schema::SchemaPresetInfo::Name>();
+ }
+ Y_VERIFY(!id || preset.Name == "default", "Unsupported preset at load time");
if (rowset.HaveValue<Schema::SchemaPresetInfo::DropStep>() &&
rowset.HaveValue<Schema::SchemaPresetInfo::DropTxId>())
@@ -180,9 +183,11 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
auto& info = preset.Versions[version];
Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
- if (preset.Name == "default") {
- schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId},
- Self->ConvertSchema(info.GetSchema()));
+ NOlap::TSnapshot snap{version.Step, version.TxId};
+ if (!id) {
+ commonSchema.emplace(snap, Self->ConvertSchema(info.GetSchema()));
+ } else if (preset.Name == "default") {
+ schemaPreset.emplace(snap, Self->ConvertSchema(info.GetSchema()));
}
if (!rowset.Next())
@@ -246,7 +251,13 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size());
if (!schemaPreset.empty()) {
+ Y_VERIFY(commonSchema.empty(), "Mix of schema preset and common schema");
Self->SetPrimaryIndex(std::move(schemaPreset));
+ } else if (!commonSchema.empty()) {
+ Self->SetPrimaryIndex(std::move(commonSchema));
+ } else {
+ Y_VERIFY(Self->Tables.empty());
+ Y_VERIFY(Self->SchemaPresets.empty());
}
{ // Load long tx writes
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index ac332a83127..6b672049d45 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -380,26 +380,36 @@ bool TColumnShard::IsTableWritable(ui64 tableId) const {
return !it->second.IsDropped();
}
-ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto,
+ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name,
+ const NKikimrSchemeOp::TColumnTableSchema& schemaProto,
const TRowVersion& version) {
- if (!SchemaPresets.contains(presetProto.GetId())) {
- auto& preset = SchemaPresets[presetProto.GetId()];
- preset.Id = presetProto.GetId();
- preset.Name = presetProto.GetName();
+ if (!SchemaPresets.contains(presetId)) {
+ LOG_S_DEBUG("EnsureSchemaPreset " << presetId << " at tablet " << TabletID());
+
+ auto& preset = SchemaPresets[presetId];
+ preset.Id = presetId;
+ preset.Name = name;
auto& info = preset.Versions[version];
info.SetId(preset.Id);
info.SetSinceStep(version.Step);
info.SetSinceTxId(version.TxId);
- *info.MutableSchema() = presetProto.GetSchema();
-
- Y_VERIFY(preset.Name == "default", "Only schema preset named 'default' is supported");
+ *info.MutableSchema() = schemaProto;
Schema::SaveSchemaPresetInfo(db, preset.Id, preset.Name);
Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
SetCounter(COUNTER_TABLE_PRESETS, SchemaPresets.size());
+ } else {
+ LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletID());
}
- return presetProto.GetId();
+ return presetId;
+}
+
+ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto,
+ const TRowVersion& version) {
+ Y_VERIFY(presetProto.GetName() == "default", "Only schema preset named 'default' is supported");
+
+ return EnsureSchemaPreset(db, presetProto.GetId(), presetProto.GetName(), presetProto.GetSchema(), version);
}
void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version,
@@ -441,14 +451,19 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const
NIceDb::TNiceDb db(txc.DB);
- if (proto.HasStorePathId()) {
- StorePathId = proto.GetStorePathId();
- Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId);
+ if (proto.HasOwnerPathId()) {
+ OwnerPathId = proto.GetOwnerPathId();
+ Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
}
+
if (proto.HasOwnerPath()) {
OwnerPath = proto.GetOwnerPath();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);
}
+
+ for (auto& createTable : proto.GetTables()) {
+ RunEnsureTable(createTable, version, txc);
+ }
}
void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version,
@@ -457,32 +472,49 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
const ui64 pathId = tableProto.GetPathId();
if (!Tables.contains(pathId)) {
- auto& table = Tables[pathId];
- table.PathId = pathId;
- auto& tableVerProto = table.Versions[version];
- tableVerProto.SetPathId(pathId);
-
- Y_VERIFY(!tableProto.HasSchema(), "Tables with explicit schema are not supported");
+ LOG_S_DEBUG("EnsureTable for pathId: " << pathId << " at tablet " << TabletID());
ui32 schemaPresetId = 0;
if (tableProto.HasSchemaPreset()) {
+ Y_VERIFY(!tableProto.HasSchema(), "Tables has either schema or preset");
+
schemaPresetId = EnsureSchemaPreset(db, tableProto.GetSchemaPreset(), version);
- tableVerProto.SetSchemaPresetId(schemaPresetId);
+ Y_VERIFY(schemaPresetId);
+ } else {
+ Y_VERIFY(tableProto.HasSchema(), "Tables has either schema or preset");
+
+ // Save first table schema as common one with schemaPresetId == 0
+
+ if (SchemaPresets.count(0)) {
+ LOG_S_WARN("Colocated standalone tables are not supported. "
+ << "EnsureTable failed at tablet " << TabletID());
+ return;
+ }
+
+ schemaPresetId = EnsureSchemaPreset(db, 0, "", tableProto.GetSchema(), version);
+ Y_VERIFY(!schemaPresetId);
}
+ auto& table = Tables[pathId];
+ table.PathId = pathId;
+ auto& tableVerProto = table.Versions[version];
+ tableVerProto.SetPathId(pathId);
+ tableVerProto.SetSchemaPresetId(schemaPresetId);
+
if (tableProto.HasTtlSettings()) {
*tableVerProto.MutableTtlSettings() = tableProto.GetTtlSettings();
Ttl.SetPathTtl(pathId, TTtl::TDescription(tableProto.GetTtlSettings()));
SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount());
}
- if (!PrimaryIndex && schemaPresetId) {
+ if (!PrimaryIndex) {
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory;
+
auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version];
- TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
- schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId},
- ConvertSchema(schemaPresetVerProto.GetSchema()));
+ schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId},
+ ConvertSchema(schemaPresetVerProto.GetSchema()));
- SetPrimaryIndex(std::move(schemaPreset));
+ SetPrimaryIndex(std::move(schemaHistory));
}
tableVerProto.SetSchemaPresetVersionAdj(tableProto.GetSchemaPresetVersionAdj());
@@ -491,6 +523,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
Schema::SaveTableInfo(db, table.PathId);
Schema::SaveTableVersionInfo(db, table.PathId, version, tableVerProto);
SetCounter(COUNTER_TABLES, Tables.size());
+ } else {
+ LOG_S_DEBUG("EnsureTable for existed pathId: " << pathId << " at tablet " << TabletID());
}
}
@@ -503,6 +537,8 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
Y_VERIFY(tablePtr && !tablePtr->IsDropped(), "AlterTable on a dropped or non-existent table");
auto& table = *tablePtr;
+ LOG_S_DEBUG("AlterTable for pathId: " << pathId << " at tablet " << TabletID());
+
Y_VERIFY(!alterProto.HasSchema(), "Tables with explicit schema are not supported");
auto& info = table.Versions[version];
@@ -531,6 +567,8 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
auto* table = Tables.FindPtr(pathId);
Y_VERIFY_DEBUG(table && !table->IsDropped());
if (table && !table->IsDropped()) {
+ LOG_S_DEBUG("DropTable for pathId: " << pathId << " at tablet " << TabletID());
+
PathsToDrop.insert(pathId);
Ttl.DropPathTtl(pathId);
@@ -542,6 +580,8 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
table->DropVersion = version;
Schema::SaveTableDropVersion(db, pathId, version.Step, version.TxId);
+ } else {
+ LOG_S_DEBUG("DropTable for unknown or deleted pathId: " << pathId << " at tablet " << TabletID());
}
}
@@ -550,11 +590,11 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
NIceDb::TNiceDb db(txc.DB);
if (proto.HasStorePathId()) {
- StorePathId = proto.GetStorePathId();
- Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId);
+ OwnerPathId = proto.GetStorePathId();
+ Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
}
- TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
+ TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaHistory;
for (ui32 id : proto.GetDroppedSchemaPresets()) {
if (!SchemaPresets.contains(id)) {
@@ -579,14 +619,14 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
*info.MutableSchema() = presetProto.GetSchema();
if (preset.Name == "default") {
- schemaPreset.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema()));
+ schemaHistory.emplace(NOlap::TSnapshot{version.Step, version.TxId}, ConvertSchema(info.GetSchema()));
}
Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
}
- if (!schemaPreset.empty()) {
- SetPrimaryIndex(std::move(schemaPreset));
+ if (!schemaHistory.empty()) {
+ SetPrimaryIndex(std::move(schemaHistory));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 5595ccf502d..1595cbda2dd 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -125,7 +125,7 @@ class TColumnShard
void Handle(TEvPrivate::TEvForget::TPtr& ev, const TActorContext& ctx);
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr& ev, const TActorContext& ctx);
void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
-
+
ITransaction* CreateTxInitSchema();
ITransaction* CreateTxRunGc();
@@ -293,11 +293,11 @@ private:
};
struct TSchemaPreset {
- using TVerProto = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
+ using TSchemaPresetVersionInfo = NKikimrTxColumnShard::TSchemaPresetVersionInfo;
ui32 Id;
TString Name;
- TMap<TRowVersion, TVerProto> Versions;
+ TMap<TRowVersion, TSchemaPresetVersionInfo> Versions;
TRowVersion DropVersion = TRowVersion::Max();
bool IsDropped() const {
@@ -306,10 +306,10 @@ private:
};
struct TTableInfo {
- using TVerProto = NKikimrTxColumnShard::TTableVersionInfo;
+ using TTableVersionInfo = NKikimrTxColumnShard::TTableVersionInfo;
ui64 PathId;
- std::map<TRowVersion, TVerProto> Versions;
+ std::map<TRowVersion, TTableVersionInfo> Versions;
TRowVersion DropVersion = TRowVersion::Max();
bool IsDropped() const {
@@ -332,7 +332,7 @@ private:
ui64 LastCompactedGranule = 0;
ui64 LastExportNo = 0;
ui64 WritesInFly = 0;
- ui64 StorePathId = 0;
+ ui64 OwnerPathId = 0;
ui64 StatsReportRound = 0;
ui64 BackgroundActivation = 0;
ui32 SkippedIndexations = TSettings::MAX_INDEXATIONS_TO_SKIP; // Force indexation on tablet init
@@ -432,6 +432,8 @@ private:
bool IsTableWritable(ui64 tableId) const;
+ ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, ui32 presetId, const TString& name,
+ const NKikimrSchemeOp::TColumnTableSchema& schemaProto, const TRowVersion& version);
ui32 EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version);
//ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version);
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 671e49cf487..360e30d7140 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -47,7 +47,7 @@ struct Schema : NIceDb::Schema {
LastGcBarrierGen = 8,
LastGcBarrierStep = 9,
LastExportNumber = 10,
- StorePathId = 11,
+ OwnerPathId = 11,
OwnerPath = 12,
};
@@ -95,7 +95,7 @@ struct Schema : NIceDb::Schema {
struct Id : Column<1, NScheme::NTypeIds::Uint32> {};
struct SinceStep : Column<2, NScheme::NTypeIds::Uint64> {};
struct SinceTxId : Column<3, NScheme::NTypeIds::Uint64> {};
- struct InfoProto : Column<4, NScheme::NTypeIds::String> {}; // TSchemaPresetVersionInfo
+ struct InfoProto : Column<4, NScheme::NTypeIds::String> {}; // TCommonSchemaVersionInfo
using TKey = TableKey<Id, SinceStep, SinceTxId>;
using TColumns = TableColumns<Id, SinceStep, SinceTxId, InfoProto>;
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h
index e0cdb1766f0..3c4829c6c27 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.h
@@ -38,7 +38,7 @@ struct TTestSchema {
TStorageTier(const TString& name = {})
: Name(name)
-
+
{}
NKikimrSchemeOp::EColumnCodec GetCodecId() const {
@@ -173,6 +173,39 @@ struct TTestSchema {
return col;
}
+ static void InitSchema(const TVector<std::pair<TString, TTypeInfo>>& columns,
+ const TVector<std::pair<TString, TTypeInfo>>& pk,
+ const TTableSpecials& specials,
+ NKikimrSchemeOp::TColumnTableSchema* schema)
+ {
+ schema->SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
+
+ for (ui32 i = 0; i < columns.size(); ++i) {
+ *schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second);
+ }
+
+ Y_VERIFY(pk.size() == 4);
+ for (auto& column : ExtractNames(pk)) {
+ schema->AddKeyColumnNames(column);
+ }
+
+ if (specials.HasCodec()) {
+ schema->MutableDefaultCompression()->SetCompressionCodec(specials.GetCodecId());
+ }
+ if (specials.CompressionLevel) {
+ schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel);
+ }
+
+ schema->SetEnableTiering(specials.HasTiers());
+ }
+
+ static void InitTtl(const TTableSpecials& specials, NKikimrSchemeOp::TColumnDataLifeCycle* ttlSettings) {
+ ttlSettings->SetVersion(1);
+ auto* enable = ttlSettings->MutableEnabled();
+ enable->SetColumnName(specials.GetTtlColumn());
+ enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe());
+ }
+
static TString CreateTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
const TVector<std::pair<TString, TTypeInfo>>& pk,
const TTableSpecials& specials = {}) {
@@ -186,35 +219,29 @@ struct TTestSchema {
preset->SetName("default");
// schema
+ InitSchema(columns, pk, specials, preset->MutableSchema());
+ }
- auto* schema = preset->MutableSchema();
- schema->SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
-
- for (ui32 i = 0; i < columns.size(); ++i) {
- *schema->MutableColumns()->Add() = CreateColumn(i + 1, columns[i].first, columns[i].second);
- }
+ if (specials.HasTtl()) {
+ InitTtl(specials, table->MutableTtlSettings());
+ }
- Y_VERIFY(pk.size() == 4);
- for (auto& column : ExtractNames(pk)) {
- schema->AddKeyColumnNames(column);
- }
+ TString out;
+ Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&out);
+ return out;
+ }
- if (specials.HasCodec()) {
- schema->MutableDefaultCompression()->SetCompressionCodec(specials.GetCodecId());
- }
- if (specials.CompressionLevel) {
- schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel);
- }
+ static TString CreateStandaloneTableTxBody(ui64 pathId, const TVector<std::pair<TString, TTypeInfo>>& columns,
+ const TVector<std::pair<TString, TTypeInfo>>& pk,
+ const TTableSpecials& specials = {}) {
+ NKikimrTxColumnShard::TSchemaTxBody tx;
+ auto* table = tx.MutableEnsureTables()->AddTables();
+ table->SetPathId(pathId);
- schema->SetEnableTiering(specials.HasTiers());
- }
+ InitSchema(columns, pk, specials, table->MutableSchema());
if (specials.HasTtl()) {
- auto* ttlSettings = table->MutableTtlSettings();
- ttlSettings->SetVersion(1);
- auto* enable = ttlSettings->MutableEnabled();
- enable->SetColumnName(specials.GetTtlColumn());
- enable->SetExpireAfterSeconds(specials.GetEvictAfterSecondsUnsafe());
+ InitTtl(specials, table->MutableTtlSettings());
}
TString out;
diff --git a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
index 5836e05b087..40db0e15c49 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
@@ -282,20 +282,39 @@ bool CheckColumns(const TString& blob, const NKikimrTxColumnShard::TMetadata& me
return CheckColumns(batch, colNames, rowsCount);
}
+struct TestTableDescription {
+ TVector<std::pair<TString, TTypeInfo>> Schema = TTestSchema::YdbSchema();
+ TVector<std::pair<TString, TTypeInfo>> Pk = TTestSchema::YdbPkSchema();
+ bool InStore = true;
+};
+
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
- const TVector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(),
- const TVector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(),
- TString codec = "none") {
+ const TestTableDescription& table, TString codec = "none") {
NOlap::TSnapshot snap = {10, 10};
- bool ok = ProposeSchemaTx(runtime, sender,
- TTestSchema::CreateTableTxBody(pathId, schema, pk,
- TTestSchema::TTableSpecials().WithCodec(codec)),
- snap);
+ TString txBody;
+ if (table.InStore) {
+ txBody = TTestSchema::CreateTableTxBody(
+ pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec));
+
+ } else {
+ txBody = TTestSchema::CreateStandaloneTableTxBody(
+ pathId, table.Schema, table.Pk, TTestSchema::TTableSpecials().WithCodec(codec));
+ }
+ bool ok = ProposeSchemaTx(runtime, sender, txBody, snap);
UNIT_ASSERT(ok);
+
PlanSchemaTx(runtime, sender, snap);
}
-void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) {
+void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
+ const TVector<std::pair<TString, TTypeInfo>>& schema = TTestSchema::YdbSchema(),
+ const TVector<std::pair<TString, TTypeInfo>>& pk = TTestSchema::YdbPkSchema(),
+ TString codec = "none") {
+ TestTableDescription table{schema, pk, true};
+ SetupSchema(runtime, sender, pathId, table, codec);
+}
+
+void TestWrite(const TestTableDescription& table) {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -312,7 +331,10 @@ void TestWrite(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema) {
ui64 writeId = 0;
ui64 tableId = 1;
- SetupSchema(runtime, sender, tableId, ydbSchema);
+ SetupSchema(runtime, sender, tableId, table);
+
+ const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema;
+
bool ok = WriteData(runtime, sender, metaShard, writeId, tableId, MakeTestBlob({0, 100}, ydbSchema));
UNIT_ASSERT(ok);
@@ -453,9 +475,7 @@ void TestWriteReadDup() {
}
}
-void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = TTestSchema::YdbSchema(),
- const TVector<std::pair<TString, TTypeInfo>>& testYdbPk = TTestSchema::YdbPkSchema(),
- TString codec = "") {
+void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -498,7 +518,10 @@ void TestWriteRead(bool reboots, const TVector<std::pair<TString, TTypeInfo>>& y
ui64 writeId = 0;
ui64 tableId = 1;
- SetupSchema(runtime, sender, tableId, ydbSchema, testYdbPk, codec);
+ SetupSchema(runtime, sender, tableId, table, codec);
+
+ const TVector<std::pair<TString, TTypeInfo>>& ydbSchema = table.Schema;
+ const TVector<std::pair<TString, TTypeInfo>>& testYdbPk = table.Pk;
// ----xx
// -----xx..
@@ -1474,11 +1497,27 @@ void TestReadAggregate(const TVector<std::pair<TString, TTypeInfo>>& ydbSchema,
Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_UNIT_TEST(Write) {
- TestWrite(TTestSchema::YdbSchema());
+ TestTableDescription table;
+ TestWrite(table);
+ }
+
+ Y_UNIT_TEST(WriteStandalone) {
+ TestTableDescription table;
+ table.InStore = false;
+ TestWrite(table);
}
Y_UNIT_TEST(WriteExoticTypes) {
- TestWrite(TTestSchema::YdbExoticSchema());
+ TestTableDescription table;
+ table.Schema = TTestSchema::YdbExoticSchema();
+ TestWrite(table);
+ }
+
+ Y_UNIT_TEST(WriteStandaloneExoticTypes) {
+ TestTableDescription table;
+ table.Schema = TTestSchema::YdbExoticSchema();
+ table.InStore = false;
+ TestWrite(table);
}
Y_UNIT_TEST(WriteReadDuplicate) {
@@ -1486,23 +1525,45 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
Y_UNIT_TEST(WriteRead) {
- TestWriteRead(false);
+ TestTableDescription table;
+ TestWriteRead(false, table);
+ }
+
+ Y_UNIT_TEST(WriteReadStandalone) {
+ TestTableDescription table;
+ table.InStore = false;
+ TestWriteRead(false, table);
}
Y_UNIT_TEST(WriteReadExoticTypes) {
- TestWriteRead(false, TTestSchema::YdbExoticSchema());
+ TestTableDescription table;
+ table.Schema = TTestSchema::YdbExoticSchema();
+ TestWriteRead(false, table);
+ }
+
+ Y_UNIT_TEST(WriteReadStandaloneExoticTypes) {
+ TestTableDescription table;
+ table.Schema = TTestSchema::YdbExoticSchema();
+ table.InStore = false;
+ TestWriteRead(false, table);
}
Y_UNIT_TEST(RebootWriteRead) {
TestWriteRead(true);
}
+ Y_UNIT_TEST(RebootWriteReadStandalone) {
+ TestTableDescription table;
+ table.InStore = false;
+ TestWriteRead(true, table);
+ }
+
Y_UNIT_TEST(WriteReadNoCompression) {
- TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "none");
+ TestWriteRead(true, {}, "none");
}
Y_UNIT_TEST(WriteReadZSTD) {
- TestWriteRead(true, TTestSchema::YdbSchema(), TTestSchema::YdbPkSchema(), "zstd");
+ TestWriteRead(true, {}, "zstd");
}
Y_UNIT_TEST(CompactionInGranule) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
index d5c39e7d7ba..c8bfaffba65 100644
--- a/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__delete_tablet_reply.cpp
@@ -93,7 +93,7 @@ struct TSchemeShard::TTxDeleteTabletReply : public TSchemeShard::TRwTxBase {
Self->TabletCounters->Simple()[COUNTER_SYS_VIEW_PROCESSOR_COUNT].Sub(1);
break;
case ETabletType::ColumnShard:
- Self->TabletCounters->Simple()[COUNTER_COLUMN_SHARDS].Sub(-1);
+ Self->TabletCounters->Simple()[COUNTER_COLUMN_SHARDS].Sub(1);
break;
case ETabletType::SequenceShard:
Self->TabletCounters->Simple()[COUNTER_SEQUENCESHARD_COUNT].Sub(1);
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 94517ae74ea..c562e944841 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4460,16 +4460,24 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Y_VERIFY(description.ParseFromString(rowset.GetValue<Schema::ColumnTables::Description>()));
NKikimrSchemeOp::TColumnTableSharding sharding;
Y_VERIFY(sharding.ParseFromString(rowset.GetValue<Schema::ColumnTables::Sharding>()));
+ TMaybe<NKikimrSchemeOp::TColumnStoreSharding> storeSharding;
+ if (rowset.HaveValue<Schema::ColumnTables::StandaloneSharding>()) {
+ Y_VERIFY(storeSharding.ConstructInPlace().ParseFromString(
+ rowset.GetValue<Schema::ColumnTables::StandaloneSharding>()));
+ }
- TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo(alterVersion, std::move(description), std::move(sharding));
+ TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo(alterVersion,
+ std::move(description), std::move(sharding), std::move(storeSharding));
Self->ColumnTables[pathId] = tableInfo;
Self->IncrementPathDbRefCount(pathId);
- auto itStore = Self->OlapStores.find(tableInfo->OlapStorePathId);
- if (itStore != Self->OlapStores.end()) {
- itStore->second->ColumnTables.insert(pathId);
- if (pathsUnderOperation.contains(pathId)) {
- itStore->second->ColumnTablesUnderOperation.insert(pathId);
+ if (tableInfo->OlapStorePathId) {
+ auto itStore = Self->OlapStores.find(*tableInfo->OlapStorePathId);
+ if (itStore != Self->OlapStores.end()) {
+ itStore->second->ColumnTables.insert(pathId);
+ if (pathsUnderOperation.contains(pathId)) {
+ itStore->second->ColumnTablesUnderOperation.insert(pathId);
+ }
}
}
@@ -4497,11 +4505,18 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
if (rowset.HaveValue<Schema::ColumnTablesAlters::AlterBody>()) {
Y_VERIFY(alterBody.ConstructInPlace().ParseFromString(rowset.GetValue<Schema::ColumnTablesAlters::AlterBody>()));
}
-
- TColumnTableInfo::TPtr alterData = new TColumnTableInfo(alterVersion, std::move(description), std::move(sharding), std::move(alterBody));
+ TMaybe<NKikimrSchemeOp::TColumnStoreSharding> storeSharding;
+ if (rowset.HaveValue<Schema::ColumnTablesAlters::StandaloneSharding>()) {
+ Y_VERIFY(storeSharding.ConstructInPlace().ParseFromString(
+ rowset.GetValue<Schema::ColumnTablesAlters::StandaloneSharding>()));
+ }
Y_VERIFY_S(Self->ColumnTables.contains(pathId),
"Cannot load alter for olap table " << pathId);
+
+ TColumnTableInfo::TPtr alterData = new TColumnTableInfo(alterVersion,
+ std::move(description), std::move(sharding), std::move(storeSharding), std::move(alterBody));
+
Self->ColumnTables[pathId]->AlterData = alterData;
if (!rowset.Next()) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
index 6f167751aa1..4b3626c6e43 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
@@ -453,7 +453,15 @@ public:
Y_VERIFY(context.SS->ColumnTables.contains(path.Base()->PathId));
TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId);
- TPath storePath = TPath::Init(tableInfo->OlapStorePathId, context.SS);
+ if (!tableInfo->OlapStorePathId) {
+ result->SetError(NKikimrScheme::StatusSchemeError,
+ "Alter for standalone column table is not supported yet");
+ return result;
+ }
+
+ auto& storePathId = *tableInfo->OlapStorePathId;
+
+ TPath storePath = TPath::Init(storePathId, context.SS);
{
TPath::TChecker checks = storePath.Check();
checks
@@ -471,8 +479,8 @@ public:
}
}
- Y_VERIFY(context.SS->OlapStores.contains(tableInfo->OlapStorePathId));
- TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(tableInfo->OlapStorePathId);
+ Y_VERIFY(context.SS->OlapStores.contains(storePathId));
+ TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(storePathId);
TString errStr;
if (!context.SS->CheckApplyIf(Transaction, errStr)) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
index 413f18d8666..54acba06b11 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h
@@ -887,9 +887,12 @@ public:
// OlapStore tracks all tables that are under operation, make sure to unlink
if (context.SS->ColumnTables.contains(pathId)) {
auto tableInfo = context.SS->ColumnTables.at(pathId);
- if (context.SS->OlapStores.contains(tableInfo->OlapStorePathId)) {
- auto storeInfo = context.SS->OlapStores.at(tableInfo->OlapStorePathId);
- storeInfo->ColumnTablesUnderOperation.erase(pathId);
+ if (tableInfo->OlapStorePathId) {
+ auto& storePathId = *tableInfo->OlapStorePathId;
+ if (context.SS->OlapStores.contains(storePathId)) {
+ auto storeInfo = context.SS->OlapStores.at(storePathId);
+ storeInfo->ColumnTablesUnderOperation.erase(pathId);
+ }
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
index a2685bf2b2d..4da833de00a 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
@@ -12,113 +12,11 @@ namespace {
using namespace NKikimr;
using namespace NSchemeShard;
-// TODO: make it a part of TOlapSchema
bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) {
- schema.NextColumnId = proto.GetNextColumnId();
-
- const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
-
- schema.Columns.clear();
- for (auto& colProto : *proto.MutableColumns()) {
- if (colProto.GetName().empty()) {
- errStr = Sprintf("Columns cannot have an empty name");
- return false;
- }
- if (!colProto.HasId()) {
- colProto.SetId(schema.NextColumnId++);
- } else if (colProto.GetId() <= 0 || colProto.GetId() >= schema.NextColumnId) {
- errStr = Sprintf("Column id is incorrect");
- return false;
- }
- ui32 colId = colProto.GetId();
- if (schema.Columns.contains(colId)) {
- errStr = Sprintf("Duplicate column id %" PRIu32 " for column '%s'", colId, colProto.GetName().c_str());
- return false;
- }
- auto& col = schema.Columns[colId];
- col.Id = colId;
- col.Name = colProto.GetName();
-
- if (colProto.HasTypeId()) {
- errStr = Sprintf("Cannot set TypeId for column '%s', use Type", col.Name.c_str());
- return false;
- }
- if (!colProto.HasType()) {
- errStr = Sprintf("Missing Type for column '%s'", col.Name.c_str());
- return false;
- }
-
- auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType());
- const NScheme::IType* type = typeRegistry->GetType(typeName);
- if (type) {
- if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
- errStr = Sprintf("Type '%s' specified for column '%s' is not supported", colProto.GetType().c_str(), col.Name.c_str());
- return false;
- }
- col.Type = NScheme::TTypeInfo(type->GetTypeId());
- } else {
- auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
- if (!typeDesc) {
- errStr = Sprintf("Type '%s' specified for column '%s' is not supported", colProto.GetType().c_str(), col.Name.c_str());
- }
- col.Type = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
- }
- auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(col.Type);
- colProto.SetTypeId(columnType.TypeId);
- if (columnType.TypeInfo) {
- *colProto.MutableTypeInfo() = *columnType.TypeInfo;
- }
-
- if (schema.ColumnsByName.contains(col.Name)) {
- errStr = Sprintf("Duplicate column '%s'", col.Name.c_str());
- return false;
- }
- schema.ColumnsByName[col.Name] = col.Id;
- }
-
- if (schema.Columns.empty()) {
- errStr = Sprintf("At least one column is required");
- return false;
- }
-
- schema.KeyColumnIds.clear();
- for (const TString& keyName : proto.GetKeyColumnNames()) {
- auto* col = schema.FindColumnByName(keyName);
- if (!col) {
- errStr = Sprintf("Unknown key column '%s'", keyName.c_str());
- return false;
- }
- if (col->IsKeyColumn()) {
- errStr = Sprintf("Duplicate key column '%s'", keyName.c_str());
- return false;
- }
- col->KeyOrder = schema.KeyColumnIds.size();
- schema.KeyColumnIds.push_back(col->Id);
- }
-
- if (schema.KeyColumnIds.empty()) {
- errStr = "At least one key column is required";
+ if (!TOlapSchema::UpdateProto(proto, errStr)) {
return false;
}
-#if 0
- for (auto& tierConfig : proto.GetStorageTiers()) {
- TString tierName = tierConfig.GetName();
- if (schema.Tiers.count(tierName)) {
- errStr = Sprintf("Same tier name in schema: '%s'", tierName.c_str());
- return false;
- }
- schema.Tiers.insert(tierName);
-
- if (!PrepareTier(tierConfig, errStr)) {
- return false;
- }
- }
-#endif
-
- schema.Engine = proto.GetEngine();
-
- proto.SetNextColumnId(schema.NextColumnId);
- return true;
+ return schema.Parse(proto, errStr);
}
// TODO: make it a part of TOlapStoreInfo
@@ -294,7 +192,7 @@ public:
// TODO: we may need to specify a more complex data channel mapping
auto* init = tx.MutableInitShard();
init->SetDataChannelCount(storeInfo->Description.GetStorageConfig().GetDataChannelCount());
- init->SetStorePathId(txState->TargetPathId.LocalPathId);
+ init->SetOwnerPathId(txState->TargetPathId.LocalPathId);
init->SetOwnerPath(path.PathString());
Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&columnShardTxBody);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
index 917688b71c5..3f5f52ef035 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
@@ -13,201 +13,147 @@ namespace NSchemeShard {
namespace {
-TColumnTableInfo::TPtr CreateColumnTable(
- const NKikimrSchemeOp::TColumnTableDescription& opSrc,
- TOlapStoreInfo::TPtr storeInfo, const TSubDomainInfo& subDomain,
- TEvSchemeShard::EStatus& status, TString& errStr,
- TSchemeShard* ss)
-{
- Y_UNUSED(subDomain);
-
- TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo;
- tableInfo->AlterVersion = 1;
- tableInfo->Description.CopyFrom(opSrc);
-
- auto& op = tableInfo->Description;
+bool PrepareSchema(NKikimrSchemeOp::TColumnTableSchema& proto, TOlapSchema& schema, TString& errStr) {
+ proto.SetNextColumnId(1);
+ proto.SetVersion(1);
- if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = "TTL presets are not supported";
- return nullptr;
- }
-
- if (!op.HasSchemaPresetName() && !op.HasSchemaPresetId()) {
- op.SetSchemaPresetName("default");
+ if (!TOlapSchema::UpdateProto(proto, errStr)) {
+ return false;
}
+ return schema.Parse(proto, errStr);
+}
- const TOlapSchema* pSchema = nullptr;
+bool ValidateSchema(const TOlapSchema& schema, const NKikimrSchemeOp::TColumnTableSchema& opSchema,
+ TEvSchemeShard::EStatus& status, TString& errStr)
+{
+ const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
- if (op.HasSchemaPresetName()) {
- const TString presetName = op.GetSchemaPresetName();
- if (!storeInfo->SchemaPresetByName.contains(presetName)) {
+ ui32 lastColumnId = 0;
+ THashSet<ui32> usedColumns;
+ for (const auto& colProto : opSchema.GetColumns()) {
+ if (colProto.GetName().empty()) {
status = NKikimrScheme::StatusSchemeError;
- errStr = Sprintf("Specified schema preset '%s' does not exist in olap store", presetName.c_str());
- return nullptr;
+ errStr = "Columns cannot have an empty name";
+ return false;
}
- const ui32 presetId = storeInfo->SchemaPresetByName.at(presetName);
- if (!op.HasSchemaPresetId()) {
- op.SetSchemaPresetId(presetId);
- }
- if (op.GetSchemaPresetId() != presetId) {
+ const TString& colName = colProto.GetName();
+ auto* col = schema.FindColumnByName(colName);
+ if (!col) {
status = NKikimrScheme::StatusSchemeError;
- errStr = Sprintf("Specified schema preset '%s' and id %" PRIu32 " do not match in olap store", presetName.c_str(), presetId);
- return nullptr;
+ errStr = TStringBuilder()
+ << "Column '" << colName << "' does not match schema preset";
+ return false;
}
- pSchema = &storeInfo->SchemaPresets.at(presetId);
- } else if (op.HasSchemaPresetId()) {
- const ui32 presetId = op.GetSchemaPresetId();
- if (!storeInfo->SchemaPresets.contains(presetId)) {
+ if (colProto.HasId() && colProto.GetId() != col->Id) {
status = NKikimrScheme::StatusSchemeError;
- errStr = Sprintf("Specified schema preset %" PRIu32 " does not exist in olap store", presetId);
- return nullptr;
+ errStr = TStringBuilder()
+ << "Column '" << colName << "' has id " << colProto.GetId() << " that does not match schema preset";
+ return false;
}
- const TString& presetName = storeInfo->SchemaPresets.at(presetId).Name;
- op.SetSchemaPresetName(presetName);
- pSchema = &storeInfo->SchemaPresets.at(presetId);
- }
- Y_VERIFY(pSchema, "Expected to find a preset schema");
-
- if (op.HasSchema()) {
- auto& opSchema = op.GetSchema();
- const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
-
- ui32 lastColumnId = 0;
- THashSet<ui32> usedColumns;
- for (const auto& colProto : opSchema.GetColumns()) {
- if (colProto.GetName().empty()) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = "Columns cannot have an empty name";
- return nullptr;
- }
- const TString& colName = colProto.GetName();
- auto* col = pSchema->FindColumnByName(colName);
- if (!col) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder()
- << "Column '" << colName << "' does not match schema preset";
- return nullptr;
- }
- if (colProto.HasId() && colProto.GetId() != col->Id) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder()
- << "Column '" << colName << "' has id " << colProto.GetId() << " that does not match schema preset";
- return nullptr;
- }
-
- if (!usedColumns.insert(col->Id).second) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder() << "Column '" << colName << "' is specified multiple times";
- return nullptr;
- }
- if (col->Id < lastColumnId) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = "Column order does not match schema preset";
- return nullptr;
- }
- lastColumnId = col->Id;
-
- if (colProto.HasTypeId()) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type";
- return nullptr;
- }
- if (!colProto.HasType()) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder() << "Missing Type for column '" << colName << "'";
- return nullptr;
- }
+ if (!usedColumns.insert(col->Id).second) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = TStringBuilder() << "Column '" << colName << "' is specified multiple times";
+ return false;
+ }
+ if (col->Id < lastColumnId) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = "Column order does not match schema preset";
+ return false;
+ }
+ lastColumnId = col->Id;
- auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType());
- const NScheme::IType* type = typeRegistry->GetType(typeName);
- NScheme::TTypeInfo typeInfo;
- if (!type || !NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
- auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
- if (!typeDesc) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder()
- << "Type '" << colProto.GetType() << "' specified for column '" << colName << "' is not supported";
- return nullptr;
- }
- typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
- } else {
- typeInfo = NScheme::TTypeInfo(type->GetTypeId());
- }
+ if (colProto.HasTypeId()) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = TStringBuilder() << "Cannot set TypeId for column '" << colName << "', use Type";
+ return false;
+ }
+ if (!colProto.HasType()) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = TStringBuilder() << "Missing Type for column '" << colName << "'";
+ return false;
+ }
- if (typeInfo != col->Type) {
+ auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType());
+ const NScheme::IType* type = typeRegistry->GetType(typeName);
+ NScheme::TTypeInfo typeInfo;
+ if (!type || !NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
+ auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
+ if (!typeDesc) {
status = NKikimrScheme::StatusSchemeError;
errStr = TStringBuilder()
- << "Type '" << colProto.GetType() << "' specified for column '" << colName
- << "' does not match schema preset";
- return nullptr;
+ << "Type '" << colProto.GetType() << "' specified for column '" << colName << "' is not supported";
+ return false;
}
+ typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
+ } else {
+ typeInfo = NScheme::TTypeInfo(type->GetTypeId());
}
- for (auto& pr : pSchema->Columns) {
- if (!usedColumns.contains(pr.second.Id)) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = "Specified schema is missing some schema preset columns";
- return nullptr;
- }
+ if (typeInfo != col->Type) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = TStringBuilder()
+ << "Type '" << TypeName(typeInfo) << "' specified for column '" << colName
+ << "' does not match schema preset type '" << TypeName(col->Type) << "'";
+ return false;
}
+ }
- TVector<ui32> keyColumnIds;
- for (const TString& keyName : opSchema.GetKeyColumnNames()) {
- auto* col = pSchema->FindColumnByName(keyName);
- if (!col) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = TStringBuilder() << "Unknown key column '" << keyName << "'";
- return nullptr;
- }
- keyColumnIds.push_back(col->Id);
- }
- if (keyColumnIds != pSchema->KeyColumnIds) {
+ for (auto& pr : schema.Columns) {
+ if (!usedColumns.contains(pr.second.Id)) {
status = NKikimrScheme::StatusSchemeError;
- errStr = "Specified schema key columns not matching schema preset";
- return nullptr;
+ errStr = "Specified schema is missing some schema preset columns";
+ return false;
}
+ }
- if (opSchema.GetEngine() != pSchema->Engine) {
+ TVector<ui32> keyColumnIds;
+ for (const TString& keyName : opSchema.GetKeyColumnNames()) {
+ auto* col = schema.FindColumnByName(keyName);
+ if (!col) {
status = NKikimrScheme::StatusSchemeError;
- errStr = "Specified schema engine does not match schema preset";
- return nullptr;
+ errStr = TStringBuilder() << "Unknown key column '" << keyName << "'";
+ return false;
}
-
- op.ClearSchema();
+ keyColumnIds.push_back(col->Id);
}
-
- if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) {
+ if (keyColumnIds != schema.KeyColumnIds) {
status = NKikimrScheme::StatusSchemeError;
- errStr = "TTL presets are not supported";
- return nullptr;
- }
-
- if (op.HasTtlSettings()) {
- op.MutableTtlSettings()->SetVersion(1);
+ errStr = "Specified schema key columns not matching schema preset";
+ return false;
}
- // Validate ttl settings and schema compatibility
- if (op.HasTtlSettings()) {
- if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) {
- status = NKikimrScheme::StatusInvalidParameter;
- return nullptr;
- }
+ if (opSchema.GetEngine() != schema.Engine) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = "Specified schema engine does not match schema preset";
+ return false;
}
+ return true;
+}
+bool SetSharding(const TOlapSchema& schema, NKikimrSchemeOp::TColumnTableDescription& op,
+ TColumnTableInfo::TPtr tableInfo,
+ TEvSchemeShard::EStatus& status, TString& errStr)
+{
+ ui32 shardsCount = Max(ui32(1), op.GetColumnShardCount());
if (op.HasSharding()) {
tableInfo->Sharding = std::move(*op.MutableSharding());
- op.ClearSharding();
- } else {
- // Use default random sharding
+ } else if (shardsCount < 2) {
tableInfo->Sharding.MutableRandomSharding();
+ } else {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = Sprintf("Sharding is not set");
+ return false;
}
+ op.ClearSharding();
+
switch (tableInfo->Sharding.Method_case()) {
case NKikimrSchemeOp::TColumnTableSharding::kRandomSharding: {
// Random sharding implies non-unique primary key
- tableInfo->Sharding.SetUniquePrimaryKey(false);
+ if (shardsCount > 1) {
+ tableInfo->Sharding.SetUniquePrimaryKey(false);
+ }
break;
}
case NKikimrSchemeOp::TColumnTableSharding::kHashSharding: {
@@ -215,16 +161,15 @@ TColumnTableInfo::TPtr CreateColumnTable(
if (sharding.ColumnsSize() == 0) {
status = NKikimrScheme::StatusSchemeError;
errStr = Sprintf("Hash sharding requires a non-empty list of columns");
- return nullptr;
+ return false;
}
- Y_VERIFY(pSchema);
bool keysOnly = true;
for (const TString& columnName : sharding.GetColumns()) {
- auto* pColumn = pSchema->FindColumnByName(columnName);
+ auto* pColumn = schema.FindColumnByName(columnName);
if (!pColumn) {
status = NKikimrScheme::StatusSchemeError;
errStr = Sprintf("Hash sharding is using an unknown column '%s'", columnName.c_str());
- return nullptr;
+ return false;
}
if (!pColumn->IsKeyColumn()) {
keysOnly = false;
@@ -237,11 +182,103 @@ TColumnTableInfo::TPtr CreateColumnTable(
default: {
status = NKikimrScheme::StatusSchemeError;
errStr = "Unsupported sharding method";
+ return false;
+ }
+ }
+ return true;
+}
+
+bool CheckSupported(const NKikimrSchemeOp::TColumnTableDescription& op,
+ TEvSchemeShard::EStatus& status, TString& errStr)
+{
+ if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = "TTL presets are not supported";
+ return false;
+ }
+ if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = "TTL presets are not supported";
+ return false;
+ }
+ return true;
+}
+
+TColumnTableInfo::TPtr CreateColumnTableInStore(
+ TColumnTableInfo::TPtr& tableInfo,
+ TOlapStoreInfo::TPtr storeInfo,
+ ui32 columnShardCount,
+ TEvSchemeShard::EStatus& status, TString& errStr)
+{
+ auto& op = tableInfo->Description;
+
+ if (!CheckSupported(op, status, errStr)) {
+ return nullptr;
+ }
+
+ if (!op.HasSchemaPresetName() && !op.HasSchemaPresetId()) {
+ op.SetSchemaPresetName("default");
+ }
+
+ const TOlapSchema* pSchema = nullptr;
+
+ if (op.HasSchemaPresetName()) {
+ const TString presetName = op.GetSchemaPresetName();
+ if (!storeInfo->SchemaPresetByName.contains(presetName)) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = Sprintf("Specified schema preset '%s' does not exist in tablestore", presetName.c_str());
return nullptr;
}
+ const ui32 presetId = storeInfo->SchemaPresetByName.at(presetName);
+ if (!op.HasSchemaPresetId()) {
+ op.SetSchemaPresetId(presetId);
+ }
+ if (op.GetSchemaPresetId() != presetId) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = Sprintf("Specified schema preset '%s' and id %" PRIu32 " do not match in tablestore", presetName.c_str(), presetId);
+ return nullptr;
+ }
+ pSchema = &storeInfo->SchemaPresets.at(presetId);
+ } else if (op.HasSchemaPresetId()) {
+ const ui32 presetId = op.GetSchemaPresetId();
+ if (!storeInfo->SchemaPresets.contains(presetId)) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = Sprintf("Specified schema preset %" PRIu32 " does not exist in tablestore", presetId);
+ return nullptr;
+ }
+ const TString& presetName = storeInfo->SchemaPresets.at(presetId).Name;
+ op.SetSchemaPresetName(presetName);
+ pSchema = &storeInfo->SchemaPresets.at(presetId);
+ }
+
+ Y_VERIFY(pSchema, "No schema preset id/name for in-store column table");
+
+ if (op.HasSchema()) {
+ auto& opSchema = op.GetSchema();
+
+ if (!ValidateSchema(*pSchema, opSchema, status, errStr)) {
+ return nullptr;
+ }
+
+ op.ClearSchema();
+ }
+
+ if (op.HasTtlSettings()) {
+ op.MutableTtlSettings()->SetVersion(1);
+ }
+
+ // Validate ttl settings and schema compatibility
+ if (op.HasTtlSettings()) {
+ if (!ValidateTtlSettings(op.GetTtlSettings(), pSchema->Columns, pSchema->ColumnsByName, errStr)) {
+ status = NKikimrScheme::StatusInvalidParameter;
+ return nullptr;
+ }
+ }
+
+ if (!SetSharding(*pSchema, op, tableInfo, status, errStr)) {
+ return nullptr;
}
- const ui32 columnShardCount = Max(ui32(1), op.GetColumnShardCount());
if (columnShardCount > storeInfo->ColumnShards.size()) {
status = NKikimrScheme::StatusSchemeError;
errStr = Sprintf("Cannot create table with %" PRIu32 " column shards, only %" PRIu32 " are available",
@@ -249,13 +286,23 @@ TColumnTableInfo::TPtr CreateColumnTable(
return nullptr;
}
- tableInfo->ColumnShards.reserve(storeInfo->ColumnShards.size());
- for (const auto& shardIdx : storeInfo->ColumnShards) {
+ return tableInfo;
+}
+
+void SetShardingTablets(
+ TColumnTableInfo::TPtr& tableInfo,
+ const TVector<TShardIdx>& columnShards, ui32 columnShardCount, bool shuffle,
+ TSchemeShard* ss)
+{
+ tableInfo->ColumnShards.reserve(columnShards.size());
+ for (const auto& shardIdx : columnShards) {
auto* shardInfo = ss->ShardInfos.FindPtr(shardIdx);
Y_VERIFY(shardInfo, "ColumnShard not found");
tableInfo->ColumnShards.push_back(shardInfo->TabletID.GetValue());
}
- ShuffleRange(tableInfo->ColumnShards);
+ if (shuffle) {
+ ShuffleRange(tableInfo->ColumnShards);
+ }
tableInfo->ColumnShards.resize(columnShardCount);
tableInfo->Sharding.SetVersion(1);
@@ -267,14 +314,60 @@ TColumnTableInfo::TPtr CreateColumnTable(
}
tableInfo->Sharding.ClearAdditionalColumnShards();
+}
- // Don't allow users to set these fields
- op.ClearSchemaPresetVersionAdj();
- op.ClearTtlSettingsPresetVersionAdj();
+TColumnTableInfo::TPtr CreateColumnTable(
+ TColumnTableInfo::TPtr& tableInfo,
+ TEvSchemeShard::EStatus& status, TString& errStr)
+{
+ auto& op = tableInfo->Description;
+
+ if (!CheckSupported(op, status, errStr)) {
+ return nullptr;
+ }
+
+ if (op.HasSchemaPresetName() || op.HasSchemaPresetId()) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = "Schema presets are not supported for standalone column tables";
+ return nullptr;
+ }
+
+ if (!op.HasSchema()) {
+ status = NKikimrScheme::StatusSchemeError;
+ errStr = "No schema for column table specified";
+ return nullptr;
+ }
+
+ NKikimrSchemeOp::TColumnTableSchema* opSchema = op.MutableSchema();
+ tableInfo->Schema = TOlapSchema();
+ auto& schema = *tableInfo->Schema;
+
+ if (!PrepareSchema(*opSchema, schema, errStr)) {
+ status = NKikimrScheme::StatusSchemeError;
+ return nullptr;
+ }
+
+ if (op.HasTtlSettings()) {
+ op.MutableTtlSettings()->SetVersion(1);
+
+ if (!ValidateTtlSettings(op.GetTtlSettings(), schema.Columns, schema.ColumnsByName, errStr)) {
+ status = NKikimrScheme::StatusInvalidParameter;
+ return nullptr;
+ }
+ }
+
+ if (!SetSharding(schema, op, tableInfo, status, errStr)) {
+ return nullptr;
+ }
+
+ if (!op.GetStorageConfig().HasDataChannelCount()) {
+ op.MutableStorageConfig()->SetDataChannelCount(1);
+ }
return tableInfo;
}
+
class TConfigureParts: public TSubOperationState {
private:
TOperationId OperationId;
@@ -308,50 +401,67 @@ public:
TPathId pathId = txState->TargetPathId;
TPath path = TPath::Init(pathId, context.SS);
- TString pathString = path.PathString();
TColumnTableInfo::TPtr pendingInfo = context.SS->ColumnTables[pathId];
Y_VERIFY(pendingInfo);
Y_VERIFY(pendingInfo->AlterData);
TColumnTableInfo::TPtr tableInfo = pendingInfo->AlterData;
- auto olapStorePath = path.FindOlapStore();
- Y_VERIFY(olapStorePath, "Unexpected failure to find an olap store");
- auto storeInfo = context.SS->OlapStores.at(olapStorePath->PathId);
-
txState->ClearShardsInProgress();
auto seqNo = context.SS->StartRound(*txState);
+ Y_VERIFY(tableInfo->ColumnShards.empty() || tableInfo->OwnedColumnShards.empty());
+
TString columnShardTxBody;
+ NKikimrTxColumnShard::TSchemaTxBody tx;
+ context.SS->FillSeqNo(tx, seqNo);
{
- NKikimrTxColumnShard::TSchemaTxBody tx;
- context.SS->FillSeqNo(tx, seqNo);
+ NKikimrTxColumnShard::TCreateTable* create{};
+ if (tableInfo->IsStandalone()) {
+ Y_VERIFY(tableInfo->ColumnShards.empty());
+ Y_VERIFY(tableInfo->Description.HasSchema());
- auto* create = tx.MutableEnsureTables()->AddTables();
+ auto* init = tx.MutableInitShard();
+ init->SetDataChannelCount(tableInfo->Description.GetStorageConfig().GetDataChannelCount());
+ init->SetOwnerPathId(pathId.LocalPathId);
+ init->SetOwnerPath(path.PathString());
- create->SetPathId(pathId.LocalPathId);
- if (tableInfo->Description.HasSchema()) {
+ create = init->AddTables();
create->MutableSchema()->CopyFrom(tableInfo->Description.GetSchema());
- }
- if (tableInfo->Description.HasSchemaPresetId()) {
+ } else {
+ Y_VERIFY(tableInfo->OwnedColumnShards.empty());
+ Y_VERIFY(!tableInfo->Description.HasSchema());
+ Y_VERIFY(tableInfo->Description.HasSchemaPresetId());
+
+ create = tx.MutableEnsureTables()->AddTables();
+
+ if (tableInfo->Description.HasSchemaPresetVersionAdj()) {
+ create->SetSchemaPresetVersionAdj(tableInfo->Description.GetSchemaPresetVersionAdj());
+ }
+
+ auto olapStorePath = path.FindOlapStore();
+ Y_VERIFY(olapStorePath, "Unexpected failure to find a tablestore");
+ auto storeInfo = context.SS->OlapStores.at(olapStorePath->PathId);
+
const ui32 presetId = tableInfo->Description.GetSchemaPresetId();
Y_VERIFY(storeInfo->SchemaPresets.contains(presetId),
- "Failed to find schema preset %" PRIu32 " in an olap store", presetId);
+ "Failed to find schema preset %" PRIu32 " in a tablestore", presetId);
auto& preset = storeInfo->SchemaPresets.at(presetId);
size_t presetIndex = preset.ProtoIndex;
create->MutableSchemaPreset()->CopyFrom(storeInfo->Description.GetSchemaPresets(presetIndex));
}
+
+ Y_VERIFY(create);
+ create->SetPathId(pathId.LocalPathId);
+
if (tableInfo->Description.HasTtlSettings()) {
create->MutableTtlSettings()->CopyFrom(tableInfo->Description.GetTtlSettings());
}
- if (tableInfo->Description.HasSchemaPresetVersionAdj()) {
- create->SetSchemaPresetVersionAdj(tableInfo->Description.GetSchemaPresetVersionAdj());
- }
-
- Y_VERIFY(tx.SerializeToString(&columnShardTxBody));
}
+ Y_VERIFY(tx.SerializeToString(&columnShardTxBody));
+
for (auto& shard : txState->Shards) {
TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID;
@@ -423,6 +533,11 @@ public:
Y_VERIFY(pending);
TColumnTableInfo::TPtr table = pending->AlterData;
Y_VERIFY(table);
+ if (table->IsStandalone()) {
+ Y_VERIFY(table->ColumnShards.empty());
+ SetShardingTablets(table, table->OwnedColumnShards, table->OwnedColumnShards.size(), false, context.SS);
+ }
+
context.SS->ColumnTables[pathId] = table;
context.SS->PersistColumnTableAlterRemove(db, pathId);
@@ -544,13 +659,18 @@ class TCreateColumnTable: public TSubOperation {
const TTxTransaction Transaction;
TTxState::ETxState State = TTxState::Invalid;
- TTxState::ETxState NextState() {
- return TTxState::ConfigureParts;
+ TTxState::ETxState NextState(bool inStore) {
+ if (inStore) {
+ return TTxState::ConfigureParts;
+ }
+ return TTxState::CreateParts;
}
TTxState::ETxState NextState(TTxState::ETxState state) {
switch(state) {
case TTxState::Waiting:
+ case TTxState::CreateParts:
+ return TTxState::ConfigureParts;
case TTxState::ConfigureParts:
return TTxState::Propose;
case TTxState::Propose:
@@ -568,6 +688,8 @@ class TCreateColumnTable: public TSubOperation {
switch(state) {
case TTxState::Waiting:
+ case TTxState::CreateParts:
+ return TPtr(new TCreateParts(OperationId));
case TTxState::ConfigureParts:
return TPtr(new TConfigureParts(OperationId));
case TTxState::Propose:
@@ -610,6 +732,8 @@ public:
const TString& parentPathStr = Transaction.GetWorkingDir();
auto& createDescription = Transaction.GetCreateColumnTable();
const TString& name = createDescription.GetName();
+ const ui32 shardsCount = Max(ui32(1), createDescription.GetColumnShardCount());
+ auto opTxId = OperationId.GetTxId();
LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TCreateColumnTable Propose"
@@ -618,8 +742,9 @@ public:
<< ", at schemeshard: " << ssId);
TEvSchemeShard::EStatus status = NKikimrScheme::StatusAccepted;
- auto result = MakeHolder<TProposeResponse>(status, ui64(OperationId.GetTxId()), ui64(ssId));
+ auto result = MakeHolder<TProposeResponse>(status, ui64(opTxId), ui64(ssId));
+ TOlapStoreInfo::TPtr storeInfo;
NSchemeShard::TPath parentPath = NSchemeShard::TPath::Resolve(parentPathStr, context.SS);
{
NSchemeShard::TPath::TChecker checks = parentPath.Check();
@@ -629,7 +754,6 @@ public:
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
- .HasOlapStore()
.IsCommonSensePath()
.IsLikeDirectory();
@@ -640,6 +764,29 @@ public:
result->SetError(status, explain);
return result;
}
+
+ if (auto olapStorePath = parentPath.FindOlapStore()) {
+ storeInfo = context.SS->OlapStores.at(olapStorePath->PathId);
+ Y_VERIFY(storeInfo, "Unexpected failure to find an tablestore info");
+
+ NSchemeShard::TPath::TChecker ckecksStore = olapStorePath.Check();
+ ckecksStore
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsOlapStore()
+ .NotUnderOperation();
+
+ if (!ckecksStore) {
+ TString explain = TStringBuilder() << "tablestore fail checks"
+ << ", path: " << olapStorePath.PathString();
+ auto status = ckecksStore.GetStatus(&explain);
+ result->SetError(status, explain);
+ return result;
+ }
+ }
}
const TString acl = Transaction.GetModifyACL().GetDiffACL();
@@ -664,6 +811,8 @@ public:
.IsValidLeafName()
.DepthLimit()
.PathsLimit()
+ .ShardsLimit(storeInfo ? 0 : shardsCount)
+ .PathShardsLimit(storeInfo ? 0 : shardsCount)
.DirChildrenLimit()
.IsValidACL(acl);
}
@@ -688,40 +837,35 @@ public:
return result;
}
- auto olapStorePath = dstPath.FindOlapStore();
- Y_VERIFY(olapStorePath, "Unexpected failure to find an olap store");
- auto storeInfo = context.SS->OlapStores.at(olapStorePath->PathId);
- {
- NSchemeShard::TPath::TChecker checks = olapStorePath.Check();
- checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
- .IsResolved()
- .NotDeleted()
- .NotUnderDeleting()
- .IsOlapStore()
- .NotUnderOperation();
-
- if (!checks) {
- TString explain = TStringBuilder() << "olap store fail checks"
- << ", path: " << olapStorePath.PathString();
- auto status = checks.GetStatus(&explain);
- result->SetError(status, explain);
- return result;
- }
- }
-
if (!AppData()->FeatureFlags.GetEnableOlapSchemaOperations()) {
result->SetError(NKikimrScheme::StatusPreconditionFailed,
"Olap schema operations are not supported");
return result;
}
- TColumnTableInfo::TPtr tableInfo = CreateColumnTable(createDescription, storeInfo, *parentPath.DomainInfo(), status, errStr, context.SS);
+ TColumnTableInfo::TPtr tableInfo = new TColumnTableInfo;
+ {
+ tableInfo->AlterVersion = 1;
+ tableInfo->Description.CopyFrom(createDescription);
+ // Don't allow users to set these fields
+ tableInfo->Description.ClearSchemaPresetVersionAdj();
+ tableInfo->Description.ClearTtlSettingsPresetVersionAdj();
+ }
+
+ if (storeInfo) {
+ tableInfo = CreateColumnTableInStore(tableInfo, storeInfo, shardsCount, status, errStr);
+ if (tableInfo) {
+ SetShardingTablets(tableInfo, storeInfo->ColumnShards, shardsCount, true, context.SS);
+ }
+ } else {
+ tableInfo = CreateColumnTable(tableInfo, status, errStr);
+ }
+
if (!tableInfo) {
result->SetError(status, errStr);
return result;
}
+
if (!context.SS->CheckInFlightLimit(TTxState::TxCreateColumnTable, errStr)) {
result->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
return result;
@@ -733,49 +877,111 @@ public:
context.SS->TabletCounters->Simple()[COUNTER_COLUMN_TABLE_COUNT].Add(1);
TPathId pathId = dstPath.Base()->PathId;
- dstPath.Base()->CreateTxId = OperationId.GetTxId();
- dstPath.Base()->LastTxId = OperationId.GetTxId();
+ dstPath.Base()->CreateTxId = opTxId;
+ dstPath.Base()->LastTxId = opTxId;
dstPath.Base()->PathState = TPathElement::EPathState::EPathStateCreate;
dstPath.Base()->PathType = TPathElement::EPathType::EPathTypeColumnTable;
NIceDb::TNiceDb db(context.GetDB());
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxCreateColumnTable, pathId);
- txState.State = TTxState::ConfigureParts;
-
- txState.Shards.reserve(tableInfo->ColumnShards.size());
- for (ui64 columnShardId : tableInfo->ColumnShards) {
- auto tabletId = TTabletId(columnShardId);
- auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId);
- TShardInfo& shardInfo = context.SS->ShardInfos.at(shardIdx);
- txState.Shards.emplace_back(shardIdx, ETabletType::ColumnShard, TTxState::ConfigureParts);
- // N.B. we seem to only need CurrentTxId when creating/modifying tablets
- shardInfo.CurrentTxId = OperationId.GetTxId();
- context.SS->PersistShardTx(db, shardIdx, OperationId.GetTxId());
- }
-
- TColumnTableInfo::TPtr pending = new TColumnTableInfo;
- pending->AlterData = tableInfo;
- pending->SetOlapStorePathId(olapStorePath->PathId);
- tableInfo->SetOlapStorePathId(olapStorePath->PathId);
- context.SS->ColumnTables[pathId] = pending;
- storeInfo->ColumnTables.insert(pathId);
- storeInfo->ColumnTablesUnderOperation.insert(pathId);
- context.SS->PersistColumnTable(db, pathId, *pending);
- context.SS->PersistColumnTableAlter(db, pathId, *tableInfo);
- context.SS->IncrementPathDbRefCount(pathId);
-
- if (parentPath.Base()->HasActiveChanges()) {
- TTxId parentTxId = parentPath.Base()->PlannedToCreate() ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId;
- context.OnComplete.Dependence(parentTxId, OperationId.GetTxId());
- }
-
- // Sequentially chain operations in the same olap store
- if (context.SS->Operations.contains(olapStorePath.Base()->LastTxId)) {
- context.OnComplete.Dependence(olapStorePath.Base()->LastTxId, OperationId.GetTxId());
- }
- olapStorePath.Base()->LastTxId = OperationId.GetTxId();
- context.SS->PersistLastTxId(db, olapStorePath.Base());
+
+ Y_VERIFY(tableInfo);
+ if (storeInfo) {
+ auto olapStorePath = parentPath.FindOlapStore();
+
+ txState.State = TTxState::ConfigureParts;
+ txState.Shards.reserve(tableInfo->ColumnShards.size());
+
+ for (ui64 columnShardId : tableInfo->ColumnShards) {
+ auto tabletId = TTabletId(columnShardId);
+ auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId);
+ TShardInfo& shardInfo = context.SS->ShardInfos.at(shardIdx);
+ txState.Shards.emplace_back(shardIdx, ETabletType::ColumnShard, TTxState::ConfigureParts);
+ // N.B. we seem to only need CurrentTxId when creating/modifying tablets
+ shardInfo.CurrentTxId = opTxId;
+ context.SS->PersistShardTx(db, shardIdx, opTxId);
+ }
+
+ TColumnTableInfo::TPtr pending = new TColumnTableInfo;
+ pending->AlterData = tableInfo;
+ pending->SetOlapStorePathId(olapStorePath->PathId);
+ tableInfo->SetOlapStorePathId(olapStorePath->PathId);
+ context.SS->ColumnTables[pathId] = pending;
+ storeInfo->ColumnTables.insert(pathId);
+ storeInfo->ColumnTablesUnderOperation.insert(pathId);
+ context.SS->PersistColumnTable(db, pathId, *pending);
+ context.SS->PersistColumnTableAlter(db, pathId, *tableInfo);
+ context.SS->IncrementPathDbRefCount(pathId);
+
+ if (parentPath.Base()->HasActiveChanges()) {
+ TTxId parentTxId = parentPath.Base()->PlannedToCreate()
+ ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId;
+ context.OnComplete.Dependence(parentTxId, opTxId);
+ }
+
+ // Sequentially chain operations in the same store
+ if (context.SS->Operations.contains(olapStorePath.Base()->LastTxId)) {
+ context.OnComplete.Dependence(olapStorePath.Base()->LastTxId, opTxId);
+ }
+ olapStorePath.Base()->LastTxId = opTxId;
+ context.SS->PersistLastTxId(db, olapStorePath.Base());
+ } else {
+ NKikimrSchemeOp::TColumnStorageConfig storageConfig; // default
+ storageConfig.SetDataChannelCount(1);
+
+ TChannelsBindings channelsBindings;
+ if (!context.SS->GetOlapChannelsBindings(dstPath.GetPathIdForDomain(),
+ storageConfig, channelsBindings, errStr))
+ {
+ result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
+ return result;
+ }
+
+ txState.State = TTxState::CreateParts;
+ txState.Shards.reserve(shardsCount);
+
+ TShardInfo columnShardInfo = TShardInfo::ColumnShardInfo(opTxId, pathId);
+ columnShardInfo.BindedChannels = channelsBindings;
+
+ tableInfo->StandaloneSharding = NKikimrSchemeOp::TColumnStoreSharding();
+ Y_VERIFY(tableInfo->OwnedColumnShards.empty());
+ tableInfo->OwnedColumnShards.reserve(shardsCount);
+
+ for (ui64 i = 0; i < shardsCount; ++i) {
+ TShardIdx idx = context.SS->RegisterShardInfo(columnShardInfo);
+ context.SS->TabletCounters->Simple()[COUNTER_COLUMN_SHARDS].Add(1);
+ txState.Shards.emplace_back(idx, ETabletType::ColumnShard, TTxState::CreateParts);
+
+ auto* shardInfoProto = tableInfo->StandaloneSharding->AddColumnShards();
+ shardInfoProto->SetOwnerId(idx.GetOwnerId());
+ shardInfoProto->SetLocalId(idx.GetLocalId().GetValue());
+
+ tableInfo->OwnedColumnShards.emplace_back(std::move(idx));
+ }
+
+ context.SS->SetPartitioning(pathId, tableInfo);
+
+ for (auto shard : txState.Shards) {
+ context.SS->PersistShardMapping(db, shard.Idx, InvalidTabletId, pathId, opTxId, shard.TabletType);
+ context.SS->PersistChannelsBinding(db, shard.Idx, channelsBindings);
+ }
+ Y_VERIFY(txState.Shards.size() == shardsCount);
+
+ TColumnTableInfo::TPtr pending = new TColumnTableInfo;
+ pending->AlterData = tableInfo;
+
+ context.SS->ColumnTables[pathId] = pending;
+ context.SS->PersistColumnTable(db, pathId, *pending);
+ context.SS->PersistColumnTableAlter(db, pathId, *tableInfo);
+ context.SS->IncrementPathDbRefCount(pathId);
+
+ if (parentPath.Base()->HasActiveChanges()) {
+ TTxId parentTxId = parentPath.Base()->PlannedToCreate()
+ ? parentPath.Base()->CreateTxId : parentPath.Base()->LastTxId;
+ context.OnComplete.Dependence(parentTxId, opTxId);
+ }
+ }
context.SS->PersistTxState(db, OperationId);
context.SS->PersistPath(db, dstPath.Base()->PathId);
@@ -799,9 +1005,13 @@ public:
context.OnComplete.PublishToSchemeBoard(OperationId, dstPath.Base()->PathId);
dstPath.DomainInfo()->IncPathsInside();
+ if (!storeInfo) {
+ dstPath.DomainInfo()->AddInternalShards(txState);
+ dstPath.Base()->IncShardsInside(tableInfo->OwnedColumnShards.size());
+ }
parentPath.Base()->IncAliveChildren();
- State = NextState();
+ State = NextState(!!storeInfo);
SetState(SelectStateFunc(State));
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
index 446c86b93b4..f1d294b5e27 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
@@ -446,11 +446,28 @@ TVector<ISubOperationBase::TPtr> CreateDropIndexedTable(TOperationId nextId, con
checks
.NotEmpty()
.IsResolved()
- .NotDeleted()
- .IsTable()
- .NotUnderDeleting()
- .NotUnderOperation()
- .IsCommonSensePath();
+ .NotDeleted();
+
+ if (checks) {
+ if (table.Base()->IsColumnTable()) {
+ checks
+ .IsColumnTable()
+ .NotUnderDeleting()
+ .NotUnderOperation()
+ .IsCommonSensePath();
+
+ if (checks) {
+ // DROP TABLE statement has no info is it a drop of row or column table
+ return {CreateDropColumnTable(nextId, tx)};
+ }
+ } else {
+ checks
+ .IsTable()
+ .NotUnderDeleting()
+ .NotUnderOperation()
+ .IsCommonSensePath();
+ }
+ }
if (!checks) {
TString explain = TStringBuilder() << "path table fail checks"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
index 369b25fa8b1..174ad91e908 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
@@ -189,19 +189,13 @@ public:
txState->ClearShardsInProgress();
for (auto& shard : txState->Shards) {
+ Y_VERIFY(shard.TabletType == ETabletType::ColumnShard);
+
TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID;
- switch (shard.TabletType) {
- case ETabletType::ColumnShard: {
- auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId()));
+ auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId()));
- context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
- txState->ShardsInProgress.insert(shard.Idx);
- break;
- }
- default: {
- Y_FAIL("unexpected tablet type");
- }
- }
+ context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
+ txState->ShardsInProgress.insert(shard.Idx);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " ProgressState"
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
index 21719ccdde2..bdb673602af 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
@@ -60,9 +60,11 @@ public:
}
for (auto& shard : txState->Shards) {
+ Y_VERIFY(shard.TabletType == ETabletType::ColumnShard);
+
TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID;
- if (shard.TabletType == ETabletType::ColumnShard) {
+ {
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_SCHEMA,
context.SS->TabletID(),
@@ -72,8 +74,6 @@ public:
context.SS->SelectProcessingPrarams(txState->TargetPathId));
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
- } else {
- Y_FAIL("unexpected tablet type");
}
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
@@ -233,19 +233,13 @@ public:
txState->ClearShardsInProgress();
for (auto& shard : txState->Shards) {
+ Y_VERIFY(shard.TabletType == ETabletType::ColumnShard);
+
TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID;
- switch (shard.TabletType) {
- case ETabletType::ColumnShard: {
- auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId()));
+ auto event = std::make_unique<TEvColumnShard::TEvNotifyTxCompletion>(ui64(OperationId.GetTxId()));
- context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
- txState->ShardsInProgress.insert(shard.Idx);
- break;
- }
- default: {
- Y_FAIL("unexpected tablet type");
- }
- }
+ context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
+ txState->ShardsInProgress.insert(shard.Idx);
LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
DebugHint() << " ProgressState"
@@ -289,9 +283,23 @@ public:
Y_VERIFY(txState);
Y_VERIFY(txState->TxType == TTxState::TxDropColumnTable);
+ bool isStandalone = false;
+ {
+ Y_VERIFY(context.SS->ColumnTables.contains(txState->TargetPathId));
+ TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(txState->TargetPathId);
+ Y_VERIFY(tableInfo);
+ isStandalone = tableInfo->IsStandalone();
+ }
+
NIceDb::TNiceDb db(context.GetDB());
context.SS->PersistColumnTableRemove(db, txState->TargetPathId);
+ if (isStandalone) {
+ for (auto& shard : txState->Shards) {
+ context.OnComplete.DeleteShard(shard.Idx);
+ }
+ }
+
context.OnComplete.DoneOperation(OperationId);
return true;
}
@@ -319,6 +327,7 @@ public:
const TString& parentPathStr = Transaction.GetWorkingDir();
const TString& name = drop.GetName();
+ auto opTxId = OperationId.GetTxId();
LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"TDropColumnTable Propose"
@@ -327,7 +336,7 @@ public:
<< ", opId: " << OperationId
<< ", at schemeshard: " << ssId);
- auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(OperationId.GetTxId()), ui64(ssId));
+ auto result = MakeHolder<TProposeResponse>(NKikimrScheme::StatusAccepted, ui64(opTxId), ui64(ssId));
TPath path = drop.HasId()
? TPath::Init(context.SS->MakeLocalId(drop.GetId()), context.SS)
@@ -378,30 +387,6 @@ public:
}
}
- Y_VERIFY(context.SS->ColumnTables.contains(path.Base()->PathId));
- TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId);
-
- TPath storePath = TPath::Init(tableInfo->OlapStorePathId, context.SS);
- {
- TPath::TChecker checks = storePath.Check();
- checks
- .NotEmpty()
- .IsResolved()
- .IsOlapStore()
- .NotUnderOperation();
-
- if (!checks) {
- TString explain = TStringBuilder() << "store path fail checks"
- << ", path: " << storePath.PathString();
- auto status = checks.GetStatus(&explain);
- result->SetError(status, explain);
- return result;
- }
- }
-
- Y_VERIFY(context.SS->OlapStores.contains(tableInfo->OlapStorePathId));
- TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(tableInfo->OlapStorePathId);
-
TString errStr;
if (!context.SS->CheckApplyIf(Transaction, errStr)) {
result->SetError(NKikimrScheme::StatusPreconditionFailed, errStr);
@@ -412,9 +397,6 @@ public:
return result;
}
- Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId));
- storeInfo->ColumnTablesUnderOperation.insert(path->PathId);
-
TTxState& txState = context.SS->CreateTx(OperationId, TTxState::TxDropColumnTable, path.Base()->PathId);
txState.State = TTxState::DropParts;
// Dirty hack: drop step must not be zero because 0 is treated as "hasn't been dropped"
@@ -422,30 +404,68 @@ public:
NIceDb::TNiceDb db(context.GetDB());
- // TODO: we need to know all shards where this table has ever been created
- for (ui64 columnShardId : tableInfo->ColumnShards) {
- auto tabletId = TTabletId(columnShardId);
- auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId);
+ Y_VERIFY(context.SS->ColumnTables.contains(path.Base()->PathId));
+ TColumnTableInfo::TPtr tableInfo = context.SS->ColumnTables.at(path.Base()->PathId);
- Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx);
- txState.Shards.emplace_back(shardIdx, context.SS->ShardInfos[shardIdx].TabletType, TTxState::DropParts);
+ if (tableInfo->IsStandalone()) {
+ for (auto shardIdx : tableInfo->OwnedColumnShards) {
+ Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx);
+ txState.Shards.emplace_back(shardIdx, context.SS->ShardInfos[shardIdx].TabletType, TTxState::DropParts);
- context.SS->ShardInfos[shardIdx].CurrentTxId = OperationId.GetTxId();
- context.SS->PersistShardTx(db, shardIdx, OperationId.GetTxId());
- }
+ context.SS->ShardInfos[shardIdx].CurrentTxId = opTxId;
+ context.SS->PersistShardTx(db, shardIdx, opTxId);
+ }
+ } else {
+ auto& storePathId = *tableInfo->OlapStorePathId;
+ TPath storePath = TPath::Init(storePathId, context.SS);
+ {
+ TPath::TChecker checks = storePath.Check();
+ checks
+ .NotEmpty()
+ .IsResolved()
+ .IsOlapStore()
+ .NotUnderOperation();
+
+ if (!checks) {
+ TString explain = TStringBuilder() << "store path fail checks"
+ << ", path: " << storePath.PathString();
+ auto status = checks.GetStatus(&explain);
+ result->SetError(status, explain);
+ return result;
+ }
+ }
+
+ Y_VERIFY(context.SS->OlapStores.contains(storePathId));
+ TOlapStoreInfo::TPtr storeInfo = context.SS->OlapStores.at(storePathId);
+
+ Y_VERIFY(storeInfo->ColumnTables.contains(path->PathId));
+ storeInfo->ColumnTablesUnderOperation.insert(path->PathId);
- // Sequentially chain operations in the same olap store
- if (context.SS->Operations.contains(storePath.Base()->LastTxId)) {
- context.OnComplete.Dependence(storePath.Base()->LastTxId, OperationId.GetTxId());
+ // Sequentially chain operations in the same olap store
+ if (context.SS->Operations.contains(storePath.Base()->LastTxId)) {
+ context.OnComplete.Dependence(storePath.Base()->LastTxId, opTxId);
+ }
+ storePath.Base()->LastTxId = opTxId;
+ context.SS->PersistLastTxId(db, storePath.Base());
+
+ // TODO: we need to know all shards where this table has ever been created
+ for (ui64 columnShardId : tableInfo->ColumnShards) {
+ auto tabletId = TTabletId(columnShardId);
+ auto shardIdx = context.SS->TabletIdToShardIdx.at(tabletId);
+
+ Y_VERIFY_S(context.SS->ShardInfos.contains(shardIdx), "Unknown shardIdx " << shardIdx);
+ txState.Shards.emplace_back(shardIdx, context.SS->ShardInfos[shardIdx].TabletType, TTxState::DropParts);
+
+ context.SS->ShardInfos[shardIdx].CurrentTxId = opTxId;
+ context.SS->PersistShardTx(db, shardIdx, opTxId);
+ }
}
- storePath.Base()->LastTxId = OperationId.GetTxId();
- context.SS->PersistLastTxId(db, storePath.Base());
context.OnComplete.ActivateTx(OperationId);
path.Base()->PathState = TPathElement::EPathState::EPathStateDrop;
- path.Base()->DropTxId = OperationId.GetTxId();
- path.Base()->LastTxId = OperationId.GetTxId();
+ path.Base()->DropTxId = opTxId;
+ path.Base()->LastTxId = opTxId;
context.SS->PersistLastTxId(db, path.Base());
context.SS->PersistTxState(db, OperationId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 5999e2f0b94..de6e27a2eee 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -3005,11 +3005,23 @@ void TSchemeShard::PersistColumnTable(NIceDb::TNiceDb& db, TPathId pathId, const
db.Table<Schema::ColumnTablesAlters>().Key(pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ColumnTablesAlters::AlterBody>(serializedAlterBody));
}
+ if (tableInfo.StandaloneSharding) {
+ TString serializedOwnedShards;
+ Y_VERIFY(tableInfo.StandaloneSharding->SerializeToString(&serializedOwnedShards));
+ db.Table<Schema::ColumnTablesAlters>().Key(pathId.LocalPathId).Update(
+ NIceDb::TUpdate<Schema::ColumnTablesAlters::StandaloneSharding>(serializedOwnedShards));
+ }
} else {
db.Table<Schema::ColumnTables>().Key(pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ColumnTables::AlterVersion>(tableInfo.AlterVersion),
NIceDb::TUpdate<Schema::ColumnTables::Description>(serialized),
NIceDb::TUpdate<Schema::ColumnTables::Sharding>(serializedSharding));
+ if (tableInfo.StandaloneSharding) {
+ TString serializedOwnedShards;
+ Y_VERIFY(tableInfo.StandaloneSharding->SerializeToString(&serializedOwnedShards));
+ db.Table<Schema::ColumnTables>().Key(pathId.LocalPathId).Update(
+ NIceDb::TUpdate<Schema::ColumnTables::StandaloneSharding>(serializedOwnedShards));
+ }
}
}
@@ -3032,8 +3044,9 @@ void TSchemeShard::PersistColumnTableRemove(NIceDb::TNiceDb& db, TPathId pathId,
}
// Unlink table from olap store
- if (OlapStores.contains(tableInfo->OlapStorePathId)) {
- auto storeInfo = OlapStores.at(tableInfo->OlapStorePathId);
+ if (tableInfo->OlapStorePathId && *tableInfo->OlapStorePathId) {
+ Y_VERIFY(OlapStores.contains(*tableInfo->OlapStorePathId));
+ auto storeInfo = OlapStores.at(*tableInfo->OlapStorePathId);
storeInfo->ColumnTablesUnderOperation.erase(pathId);
storeInfo->ColumnTables.erase(pathId);
}
@@ -3657,7 +3670,8 @@ NKikimrSchemeOp::TPathVersion TSchemeShard::GetPathVersion(const TPath& path) co
if (tableInfo->Description.HasSchema()) {
result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchema().GetVersion());
} else if (tableInfo->Description.HasSchemaPresetId() && tableInfo->OlapStorePathId) {
- auto storeInfo = OlapStores.at(tableInfo->OlapStorePathId);
+ Y_VERIFY(OlapStores.contains(*tableInfo->OlapStorePathId));
+ auto& storeInfo = OlapStores.at(*tableInfo->OlapStorePathId);
auto& preset = storeInfo->SchemaPresets.at(tableInfo->Description.GetSchemaPresetId());
result.SetColumnTableSchemaVersion(tableInfo->Description.GetSchemaPresetVersionAdj() + preset.Version);
} else {
@@ -5954,9 +5968,7 @@ bool TSchemeShard::FillUniformPartitioning(TVector<TString>& rangeEnds, ui32 key
return true;
}
-void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) {
- const TVector<TShardIdx>& partitioning = storeInfo->ColumnShards;
-
+void TSchemeShard::SetPartitioning(TPathId pathId, const TVector<TShardIdx>& partitioning) {
if (AppData()->FeatureFlags.GetEnableSystemViews()) {
TVector<std::pair<ui64, ui64>> shardIndices;
shardIndices.reserve(partitioning.size());
@@ -5971,6 +5983,14 @@ void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInf
}
}
+void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) {
+ SetPartitioning(pathId, storeInfo->ColumnShards);
+}
+
+void TSchemeShard::SetPartitioning(TPathId pathId, TColumnTableInfo::TPtr tableInfo) {
+ SetPartitioning(pathId, tableInfo->OwnedColumnShards);
+}
+
void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning) {
if (AppData()->FeatureFlags.GetEnableSystemViews()) {
TVector<std::pair<ui64, ui64>> shardIndices;
@@ -6232,7 +6252,7 @@ void TSchemeShard::ConfigureStatsOperations(const NKikimrConfig::TSchemeShardCon
auto txState = TTxState::ConvertToTxType(operationConfig.GetType());
InFlightLimits[txState] = limit;
}
-
+
if (InFlightLimits.empty()) {
NKikimrConfig::TSchemeShardConfig_TInFlightCounterConfig inFlightCounterConfig;
auto defaultInFlightLimit = inFlightCounterConfig.GetInFlightLimit();
@@ -6252,7 +6272,7 @@ void TSchemeShard::ConfigureStatsOperations(const NKikimrConfig::TSchemeShardCon
bool TSchemeShard::CheckInFlightLimit(const TTxState::ETxType txType, TString& errStr) const {
auto it = InFlightLimits.find(txType);
if (it == InFlightLimits.end()) {
- return true;
+ return true;
}
if (it->second != 0 && TabletCounters->Simple()[TTxState::TxTypeInFlightCounter(txType)].Get() >= it->second)
{
@@ -6261,7 +6281,7 @@ bool TSchemeShard::CheckInFlightLimit(const TTxState::ETxType txType, TString& e
<< ", limit: " << it->second;
return false;
}
-
+
return true;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 50bdc6bc3d4..859562ecfb1 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -537,7 +537,9 @@ public:
void DoShardsDeletion(const THashSet<TShardIdx>& shardIdx, const TActorContext& ctx);
+ void SetPartitioning(TPathId pathId, const TVector<TShardIdx>& partitioning);
void SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo);
+ void SetPartitioning(TPathId pathId, TColumnTableInfo::TPtr tableInfo);
void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning);
auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId,
TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 5c9a351992d..84611de6408 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1983,6 +1983,134 @@ NKikimr::NSchemeShard::TBillingStats::operator bool() const {
return Rows || Bytes;
}
+bool TOlapSchema::UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr) {
+ ui32 nextColumnId = proto.GetNextColumnId();
+
+ const NScheme::TTypeRegistry* typeRegistry = AppData()->TypeRegistry;
+
+ for (auto& colProto : *proto.MutableColumns()) {
+ auto& colName = colProto.GetName();
+
+ if (!colProto.HasId()) {
+ colProto.SetId(nextColumnId++);
+ } else if (colProto.GetId() <= 0 || colProto.GetId() >= nextColumnId) {
+ errStr = Sprintf("Column id is incorrect");
+ return false;
+ }
+
+ if (colProto.HasTypeId()) {
+ errStr = Sprintf("Cannot set TypeId for column '%s', use Type", colName.c_str());
+ return false;
+ }
+ if (!colProto.HasType()) {
+ errStr = Sprintf("Missing Type for column '%s'", colName.c_str());
+ return false;
+ }
+
+ auto typeName = NMiniKQL::AdaptLegacyYqlType(colProto.GetType());
+ const NScheme::IType* type = typeRegistry->GetType(typeName);
+ NScheme::TTypeInfo typeInfo;
+ if (type) {
+ if (!NScheme::NTypeIds::IsYqlType(type->GetTypeId())) {
+ errStr = Sprintf("Type '%s' specified for column '%s' is not supported",
+ colProto.GetType().c_str(), colName.c_str());
+ return false;
+ }
+ typeInfo = NScheme::TTypeInfo(type->GetTypeId());
+ } else {
+#if 0 // TODO: support PG types in ColumnShard
+ auto* typeDesc = NPg::TypeDescFromPgTypeName(typeName);
+ if (!typeDesc) {
+ errStr = Sprintf("Type '%s' specified for column '%s' is not supported",
+ colProto.GetType().c_str(), colName.c_str());
+ return false;
+ }
+ typeInfo = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
+#else
+ errStr = Sprintf("Type '%s' specified for column '%s' is not supported",
+ colProto.GetType().c_str(), colName.c_str());
+ return false;
+#endif
+ }
+
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(typeInfo);
+ colProto.SetTypeId(columnType.TypeId);
+ if (columnType.TypeInfo) {
+ *colProto.MutableTypeInfo() = *columnType.TypeInfo;
+ }
+ }
+
+ proto.SetNextColumnId(nextColumnId);
+ return true;
+}
+
+bool TOlapSchema::Parse(const NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr) {
+ NextColumnId = proto.GetNextColumnId();
+
+ Columns.clear();
+ for (auto& colProto : proto.GetColumns()) {
+ if (colProto.GetName().empty()) {
+ errStr = Sprintf("Columns cannot have an empty name");
+ return false;
+ }
+
+ ui32 colId = colProto.GetId();
+ if (Columns.contains(colId)) {
+ errStr = Sprintf("Duplicate column id %" PRIu32 " for column '%s'", colId, colProto.GetName().c_str());
+ return false;
+ }
+ auto& col = Columns[colId];
+ col.Id = colId;
+ col.Name = colProto.GetName();
+
+ if (ColumnsByName.contains(col.Name)) {
+ errStr = Sprintf("Duplicate column '%s'", col.Name.c_str());
+ return false;
+ }
+
+ if (!colProto.HasTypeId()) {
+ errStr = Sprintf("No generated TypeId for column '%s'", col.Name.c_str());
+ return false;
+ }
+
+ if (colProto.HasTypeInfo()) {
+ col.Type = NScheme::TypeInfoFromProtoColumnType(colProto.GetTypeId(), &colProto.GetTypeInfo());
+ } else {
+ col.Type = NScheme::TTypeInfo(colProto.GetTypeId());
+ }
+
+ ColumnsByName[col.Name] = col.Id;
+ }
+
+ if (Columns.empty()) {
+ errStr = Sprintf("At least one column is required");
+ return false;
+ }
+
+ KeyColumnIds.clear();
+ for (const TString& keyName : proto.GetKeyColumnNames()) {
+ auto* col = FindColumnByName(keyName);
+ if (!col) {
+ errStr = Sprintf("Unknown key column '%s'", keyName.c_str());
+ return false;
+ }
+ if (col->IsKeyColumn()) {
+ errStr = Sprintf("Duplicate key column '%s'", keyName.c_str());
+ return false;
+ }
+ col->KeyOrder = KeyColumnIds.size();
+ KeyColumnIds.push_back(col->Id);
+ }
+
+ if (KeyColumnIds.empty()) {
+ errStr = "At least one key column is required";
+ return false;
+ }
+
+ Engine = proto.GetEngine();
+ return true;
+}
+
TOlapStoreInfo::TOlapStoreInfo(
ui64 alterVersion,
NKikimrSchemeOp::TColumnStoreDescription&& description,
@@ -2039,20 +2167,39 @@ TColumnTableInfo::TColumnTableInfo(
ui64 alterVersion,
NKikimrSchemeOp::TColumnTableDescription&& description,
NKikimrSchemeOp::TColumnTableSharding&& sharding,
+ TMaybe<NKikimrSchemeOp::TColumnStoreSharding>&& standaloneSharding,
TMaybe<NKikimrSchemeOp::TAlterColumnTable>&& alterBody)
: AlterVersion(alterVersion)
, Description(std::move(description))
, Sharding(std::move(sharding))
+ , StandaloneSharding(std::move(standaloneSharding))
, AlterBody(std::move(alterBody))
{
- OlapStorePathId = TPathId(
- TOwnerId(Description.GetColumnStorePathId().GetOwnerId()),
- TLocalPathId(Description.GetColumnStorePathId().GetLocalId()));
+ if (Description.HasColumnStorePathId()) {
+ OlapStorePathId = TPathId(
+ TOwnerId(Description.GetColumnStorePathId().GetOwnerId()),
+ TLocalPathId(Description.GetColumnStorePathId().GetLocalId()));
+ }
+
+ if (Description.HasSchema()) {
+ Schema = TOlapSchema();
+ TString strError;
+ Y_VERIFY((*Schema).Parse(Description.GetSchema(), strError), "Cannot parse column table schema");
+ }
ColumnShards.reserve(Sharding.GetColumnShards().size());
for (ui64 columnShard : Sharding.GetColumnShards()) {
ColumnShards.push_back(columnShard);
}
+
+ if (StandaloneSharding) {
+ OwnedColumnShards.reserve(StandaloneSharding->GetColumnShards().size());
+ for (const auto& shardIdx : StandaloneSharding->GetColumnShards()) {
+ OwnedColumnShards.push_back(TShardIdx(
+ TOwnerId(shardIdx.GetOwnerId()),
+ TLocalShardIdx(shardIdx.GetLocalId())));
+ }
+ }
}
TSequenceInfo::TSequenceInfo(
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 1d03551aa85..4023f4e6ac4 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -823,6 +823,9 @@ struct TOlapSchema {
}
return nullptr;
}
+
+ static bool UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr);
+ bool Parse(const NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr);
};
struct TOlapStoreSchemaPreset : public TOlapSchema {
@@ -880,17 +883,20 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> {
NKikimrSchemeOp::TColumnTableDescription Description;
NKikimrSchemeOp::TColumnTableSharding Sharding;
+ TMaybe<NKikimrSchemeOp::TColumnStoreSharding> StandaloneSharding;
TMaybe<NKikimrSchemeOp::TAlterColumnTable> AlterBody;
- // Path id of the olap store
- TPathId OlapStorePathId;
+ TMaybe<TPathId> OlapStorePathId; // PathId of the table store
+ TMaybe<TOlapSchema> Schema; // schema for standalone table
// Current list of column shards
TVector<ui64> ColumnShards;
+ TVector<TShardIdx> OwnedColumnShards;
TColumnTableInfo() = default;
TColumnTableInfo(ui64 alterVersion, NKikimrSchemeOp::TColumnTableDescription&& description,
NKikimrSchemeOp::TColumnTableSharding&& sharding,
+ TMaybe<NKikimrSchemeOp::TColumnStoreSharding>&& standaloneSharding,
TMaybe<NKikimrSchemeOp::TAlterColumnTable>&& alterBody = Nothing());
void SetOlapStorePathId(const TPathId& pathId) {
@@ -898,6 +904,12 @@ struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> {
Description.MutableColumnStorePathId()->SetOwnerId(pathId.OwnerId);
Description.MutableColumnStorePathId()->SetLocalId(pathId.LocalPathId);
}
+
+ bool IsStandalone() const {
+ return !OwnedColumnShards.empty();
+ }
+
+ // TODO: UpdateShardStats(), GetStats() for standalone table
};
struct TPQShardInfo : TSimpleRefCount<TPQShardInfo> {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index 80c6c09bf39..f6e4db42840 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -369,8 +369,6 @@ void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl
void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr pathEl) {
const TColumnTableInfo::TPtr tableInfo = *Self->ColumnTables.FindPtr(pathId);
Y_VERIFY(tableInfo, "ColumnTable not found");
- const TOlapStoreInfo::TPtr storeInfo = *Self->OlapStores.FindPtr(tableInfo->OlapStorePathId);
- Y_VERIFY(storeInfo, "OlapStore not found");
Y_UNUSED(pathEl);
auto description = Result->Record.MutablePathDescription()->MutableColumnTableDescription();
@@ -378,6 +376,9 @@ void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr path
description->MutableSharding()->CopyFrom(tableInfo->Sharding);
if (!description->HasSchema() && description->HasSchemaPresetId()) {
+ const TOlapStoreInfo::TPtr storeInfo = *Self->OlapStores.FindPtr(*tableInfo->OlapStorePathId);
+ Y_VERIFY(storeInfo, "OlapStore not found");
+
auto& preset = storeInfo->SchemaPresets.at(description->GetSchemaPresetId());
auto& presetProto = storeInfo->Description.GetSchemaPresets(preset.ProtoIndex);
*description->MutableSchema() = presetProto.GetSchema();
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 76160368e86..f504946b830 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1497,9 +1497,10 @@ struct Schema : NIceDb::Schema {
struct AlterVersion : Column<2, NScheme::NTypeIds::Uint64> {};
struct Description : Column<3, NScheme::NTypeIds::String> {}; // TColumnTableDescription
struct Sharding : Column<4, NScheme::NTypeIds::String> {}; // TColumnTableSharding
+ struct StandaloneSharding : Column<5, NScheme::NTypeIds::String> {}; // TColumnStoreSharding
using TKey = TableKey<PathId>;
- using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding>;
+ using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding, StandaloneSharding>;
};
struct ColumnTablesAlters : Table<91> {
@@ -1508,9 +1509,10 @@ struct Schema : NIceDb::Schema {
struct Description : Column<3, NScheme::NTypeIds::String> {}; // TColumnTableDescription
struct Sharding : Column<4, NScheme::NTypeIds::String> {}; // TColumnTableSharding
struct AlterBody : Column<5, NScheme::NTypeIds::String> {}; // TAlterColumnTable
+ struct StandaloneSharding : Column<6, NScheme::NTypeIds::String> {}; // TColumnStoreSharding
using TKey = TableKey<PathId>;
- using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding, AlterBody>;
+ using TColumns = TableColumns<PathId, AlterVersion, Description, Sharding, AlterBody, StandaloneSharding>;
};
struct LoginKeys : Table<92> {
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index 82e6a5549ec..918bf68d4a1 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -28,6 +28,17 @@ static const TString defaultStoreSchema = R"(
}
)";
+TString defaultTableSchema = R"(
+ Name: "ColumnTable"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+)";
+
static const TVector<std::pair<TString, TTypeInfo>> defaultYdbSchema = {
{"timestamp", TTypeInfo(NTypeIds::Timestamp) },
{"data", TTypeInfo(NTypeIds::Utf8) }
@@ -359,7 +370,9 @@ Y_UNIT_TEST_SUITE(TOlap) {
TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore/MyDir", tableSchema);
env.TestWaitNotification(runtime, txId);
+
TestLsPathId(runtime, 4, NLs::PathStringEqual("/MyRoot/OlapStore/MyDir/ColumnTable"));
+
TestDropColumnTable(runtime, ++txId, "/MyRoot/OlapStore/MyDir", "ColumnTable");
env.TestWaitNotification(runtime, txId);
@@ -377,6 +390,28 @@ Y_UNIT_TEST_SUITE(TOlap) {
TestLsPathId(runtime, 2, NLs::PathStringEqual(""));
}
+ Y_UNIT_TEST(CreateDropStandaloneTable) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestMkDir(runtime, ++txId, "/MyRoot", "MyDir");
+ env.TestWaitNotification(runtime, txId);
+
+ TestLs(runtime, "/MyRoot/MyDir", false, NLs::PathExist);
+
+ TestCreateColumnTable(runtime, ++txId, "/MyRoot/MyDir", defaultTableSchema);
+ env.TestWaitNotification(runtime, txId);
+
+ TestLsPathId(runtime, 3, NLs::PathStringEqual("/MyRoot/MyDir/ColumnTable"));
+
+ TestDropColumnTable(runtime, ++txId, "/MyRoot/MyDir", "ColumnTable");
+ env.TestWaitNotification(runtime, txId);
+
+ TestLs(runtime, "/MyRoot/MyDir/ColumnTable", false, NLs::PathNotExist);
+ TestLsPathId(runtime, 3, NLs::PathStringEqual(""));
+ }
+
Y_UNIT_TEST(CreateTableTtl) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
diff --git a/ydb/core/tx/schemeshard/ut_olap_reboots.cpp b/ydb/core/tx/schemeshard/ut_olap_reboots.cpp
index f07276278e9..a9e9c897e8b 100644
--- a/ydb/core/tx/schemeshard/ut_olap_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap_reboots.cpp
@@ -79,6 +79,35 @@ Y_UNIT_TEST_SUITE(TOlapReboots) {
});
}
+ Y_UNIT_TEST(CreateStandaloneTable) {
+ TTestWithReboots t(false);
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+
+ {
+ TInactiveZone inactive(activeZone);
+ // no inactive initialization
+ }
+
+ TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "ColumnTable"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestLs(runtime, "/MyRoot/ColumnTable", false, NLs::PathExist);
+ }
+ });
+ }
+
Y_UNIT_TEST(CreateDropTable) {
TTestWithReboots t(false);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
@@ -119,6 +148,38 @@ Y_UNIT_TEST_SUITE(TOlapReboots) {
});
}
+ Y_UNIT_TEST(CreateDropStandaloneTable) {
+ TTestWithReboots t(false);
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+
+ {
+ TInactiveZone inactive(activeZone);
+ // no inactive initialization
+ }
+
+ TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "ColumnTable"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestDropColumnTable(runtime, ++t.TxId, "/MyRoot", "ColumnTable");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestLs(runtime, "/MyRoot/ColumnTable", false, NLs::PathNotExist);
+ }
+ });
+ }
+
Y_UNIT_TEST(CreateMultipleTables) {
TTestWithReboots t(false);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
@@ -163,6 +224,50 @@ Y_UNIT_TEST_SUITE(TOlapReboots) {
});
}
+ Y_UNIT_TEST(CreateMultipleStandaloneTables) {
+ TTestWithReboots t(false);
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+
+ {
+ TInactiveZone inactive(activeZone);
+ // no inactive initialization
+ }
+
+ t.TestEnv->ReliablePropose(runtime,
+ CreateColumnTableRequest(t.TxId += 2, "/MyRoot", R"(
+ Name: "ColumnTable1"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ )"),
+ {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications});
+ t.TestEnv->ReliablePropose(runtime,
+ CreateColumnTableRequest(t.TxId - 1, "/MyRoot", R"(
+ Name: "ColumnTable2"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ )"),
+ {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusAlreadyExists, NKikimrScheme::StatusMultipleModifications});
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId});
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestLs(runtime, "/MyRoot/ColumnTable1", false, NLs::PathExist);
+ TestLs(runtime, "/MyRoot/ColumnTable2", false, NLs::PathExist);
+ }
+ });
+ }
+
Y_UNIT_TEST(DropMultipleTables) {
TTestWithReboots t(false);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
@@ -213,6 +318,55 @@ Y_UNIT_TEST_SUITE(TOlapReboots) {
});
}
+ Y_UNIT_TEST(DropMultipleStandaloneTables) {
+ TTestWithReboots t(false);
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "ColumnTable1"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ TestCreateColumnTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "ColumnTable2"
+ ColumnShardCount: 1
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ t.TestEnv->ReliablePropose(runtime,
+ DropColumnTableRequest(t.TxId += 2, "/MyRoot", "ColumnTable1"),
+ {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
+ t.TestEnv->ReliablePropose(runtime,
+ DropColumnTableRequest(t.TxId - 1, "/MyRoot", "ColumnTable2"),
+ {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId - 1, t.TxId});
+
+ {
+ TInactiveZone inactive(activeZone);
+
+ TestLs(runtime, "/MyRoot/ColumnTable1", false, NLs::PathNotExist);
+ TestLs(runtime, "/MyRoot/ColumnTable2", false, NLs::PathNotExist);
+ }
+ });
+ }
+
Y_UNIT_TEST(CreateDropStore) {
TTestWithReboots t(false);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index 247e9e1f645..05803c18423 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -5861,6 +5861,11 @@
"ColumnId": 4,
"ColumnName": "Sharding",
"ColumnType": "String"
+ },
+ {
+ "ColumnId": 5,
+ "ColumnName": "StandaloneSharding",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
@@ -5870,7 +5875,8 @@
1,
2,
3,
- 4
+ 4,
+ 5
],
"RoomID": 0,
"Codec": 0,
@@ -5919,6 +5925,11 @@
"ColumnId": 5,
"ColumnName": "AlterBody",
"ColumnType": "String"
+ },
+ {
+ "ColumnId": 6,
+ "ColumnName": "StandaloneSharding",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
@@ -5929,7 +5940,8 @@
2,
3,
4,
- 5
+ 5,
+ 6
],
"RoomID": 0,
"Codec": 0,