aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@ydb.tech>2023-12-13 17:58:37 +0300
committernsofya <nsofya@ydb.tech>2023-12-13 20:38:25 +0300
commite2ae1d98f104be3d94d131d04b20886c5c06089f (patch)
tree8f1dc113c34192cdca8894ccbeba0e679aa78224
parent1d09d7aba5ab2b3a137b6bb0efaac974f6ca99b5 (diff)
downloadydb-e2ae1d98f104be3d94d131d04b20886c5c06089f.tar.gz
KIKIMR-20482: Remove duplicates on schema versions
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/changes/indexation.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h25
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h4
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/index_info.h3
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp14
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp13
-rw-r--r--ydb/core/tx/columnshard/tables_manager.h2
12 files changed, 52 insertions, 45 deletions
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index 5c057f3afd..dac5556582 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -13,7 +13,7 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
- auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaUnsafe(PutBlobResult->Get()->GetSchemaVersion());
+ auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion());
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob);
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index c79a3ee408..6c143dda48 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -519,6 +519,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
TTableInfo::TTableVersionInfo tableVerProto;
tableVerProto.SetPathId(pathId);
+ // check schema changed
+
if (tableProto.HasSchemaPreset()) {
Y_ABORT_UNLESS(!tableProto.HasSchema(), "Tables has either schema or preset");
@@ -528,7 +530,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
tableVerProto.SetSchemaPresetId(preset.GetId());
if (TablesManager.RegisterSchemaPreset(preset, db)) {
- TablesManager.AddPresetVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
+ TablesManager.AddSchemaVersion(tableProto.GetSchemaPreset().GetId(), version, tableProto.GetSchemaPreset().GetSchema(), db);
}
} else {
Y_ABORT_UNLESS(tableProto.HasSchema(), "Tables has either schema or preset");
@@ -571,7 +573,7 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
TTableInfo::TTableVersionInfo tableVerProto;
if (alterProto.HasSchemaPreset()) {
tableVerProto.SetSchemaPresetId(alterProto.GetSchemaPreset().GetId());
- TablesManager.AddPresetVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
+ TablesManager.AddSchemaVersion(alterProto.GetSchemaPreset().GetId(), version, alterProto.GetSchemaPreset().GetSchema(), db);
} else if (alterProto.HasSchema()) {
*tableVerProto.MutableSchema() = alterProto.GetSchema();
}
@@ -630,7 +632,7 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
if (!TablesManager.HasPreset(presetProto.GetId())) {
continue; // we don't update presets that we don't use
}
- TablesManager.AddPresetVersion(presetProto.GetId(), version, presetProto.GetSchema(), db);
+ TablesManager.AddSchemaVersion(presetProto.GetId(), version, presetProto.GetSchema(), db);
}
}
diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
index 8c23b15af2..9247e37a64 100644
--- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp
@@ -70,7 +70,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
for (auto& inserted : DataToIndex) {
const TBlobRange& blobRange = inserted.GetBlobRange();
- auto blobSchema = context.SchemaVersions.GetSchema(inserted.GetSchemaVersion());
+ auto blobSchema = context.SchemaVersions.GetSchemaVerified(inserted.GetSchemaVersion());
auto& indexInfo = blobSchema->GetIndexInfo();
Y_ABORT_UNLESS(indexInfo.IsSorted());
@@ -88,7 +88,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
;
}
- batch = AddSpecials(batch, blobSchema->GetIndexInfo(), inserted);
+ batch = AddSpecials(batch, indexInfo, inserted);
batch = resultSchema->NormalizeBatch(*blobSchema, batch);
pathBatches[inserted.PathId].push_back(batch);
Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(pathBatches[inserted.PathId].back(), resultSchema->GetIndexInfo().GetReplaceKey()));
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index d074e1510d..7697e28fb0 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -305,9 +305,9 @@ public:
return it == SnapshotByVersion.end() ? nullptr : it->second;
}
- ISnapshotSchema::TPtr GetSchemaUnsafe(const ui64 version) const {
+ ISnapshotSchema::TPtr GetSchemaVerified(const ui64 version) const {
auto it = SnapshotByVersion.find(version);
- Y_ABORT_UNLESS(it != SnapshotByVersion.end());
+ Y_ABORT_UNLESS(it != SnapshotByVersion.end(), "no schema for version %lu", version);
return it->second;
}
@@ -331,23 +331,21 @@ public:
return IndexKey;
}
- void AddIndex(const TSnapshot& version, TIndexInfo&& indexInfo) {
+ void AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
if (Snapshots.empty()) {
IndexKey = indexInfo.GetIndexKey();
} else {
Y_ABORT_UNLESS(IndexKey->Equals(indexInfo.GetIndexKey()));
}
- auto it = Snapshots.emplace(version, std::make_shared<TSnapshotSchema>(std::move(indexInfo), version));
- Y_ABORT_UNLESS(it.second);
- auto newVersion = it.first->second->GetVersion();
- if (SnapshotByVersion.contains(newVersion)) {
- Y_VERIFY_S(LastSchemaVersion != 0, TStringBuilder() << "Last: " << LastSchemaVersion);
- Y_VERIFY_S(LastSchemaVersion == newVersion, TStringBuilder() << "Last: " << LastSchemaVersion << ";New: " << newVersion);
+ auto newVersion = indexInfo.GetVersion();
+ auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot));
+ if (!itVersion.second) {
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion);
}
-
- SnapshotByVersion[newVersion] = it.first->second;
- LastSchemaVersion = newVersion;
+ auto itSnap = Snapshots.emplace(snapshot, itVersion.first->second);
+ Y_ABORT_UNLESS(itSnap.second);
+ LastSchemaVersion = std::max(newVersion, LastSchemaVersion);
}
};
@@ -378,8 +376,7 @@ public:
virtual std::shared_ptr<TTTLColumnEngineChanges> StartTtl(const THashMap<ui64, TTiering>& pathEviction, const THashSet<TPortionAddress>& busyPortions,
ui64 maxBytesToEvict = TCompactionLimits::DEFAULT_EVICTION_BYTES) noexcept = 0;
virtual bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
- virtual void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
- //virtual void UpdateTableSchema(ui64 pathId, const TSnapshot& snapshot, TIndexInfo&& info) = 0; // TODO
+ virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
virtual ui64 MemoryUsage() const { return 0; }
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 7480b083d9..18290e79d9 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -129,13 +129,16 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
}
}
-void TColumnEngineForLogs::UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) {
+void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
if (!ColumnsTable) {
- ui32 indexId = info.GetId();
+ ui32 indexId = indexInfo.GetId();
ColumnsTable = std::make_shared<TColumnsTable>(indexId);
CountersTable = std::make_shared<TCountersTable>(indexId);
+ } else {
+ const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
+ Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo));
}
- VersionedIndex.AddIndex(snapshot, std::move(info));
+ VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
}
bool TColumnEngineForLogs::Load(IDbWrapper& db) {
@@ -143,6 +146,7 @@ bool TColumnEngineForLogs::Load(IDbWrapper& db) {
Loaded = true;
THashMap<ui64, ui64> granuleToPathIdDecoder;
{
+ TMemoryProfileGuard g("TTxInit/LoadColumns");
auto guard = GranulesStorage->StartPackModification();
if (!LoadColumns(db)) {
return false;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index b0098c6d2c..00439c1980 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -141,9 +141,7 @@ public:
bool ApplyChanges(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges,
const TSnapshot& snapshot) noexcept override;
- void UpdateDefaultSchema(const TSnapshot& snapshot, TIndexInfo&& info) override;
-
-
+ void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
const TPKRangesFilter& pkRangesFilter) const override;
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
index 6f72ad5968..7bcfff8a55 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
@@ -29,8 +29,8 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps
if (dataSchema.GetSnapshot() == GetSnapshot()) {
return batch;
}
- const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema();
Y_ABORT_UNLESS(dataSchema.GetSnapshot() < GetSnapshot());
+ const std::shared_ptr<arrow::Schema>& resultArrowSchema = GetSchema();
std::vector<std::shared_ptr<arrow::Array>> newColumns;
newColumns.reserve(resultArrowSchema->num_fields());
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
index 0e492e596d..b438621663 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp
@@ -28,6 +28,16 @@ TIndexInfo::TIndexInfo(const TString& name, ui32 id)
, Name(name)
{}
+bool TIndexInfo::CheckCompatible(const TIndexInfo& other) const {
+ if (!other.GetReplaceKey()->Equals(GetReplaceKey())) {
+ return false;
+ }
+ if (!other.GetIndexKey()->Equals(GetIndexKey())) {
+ return false;
+ }
+ return true;
+}
+
std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) {
Y_ABORT_UNLESS(batch);
i64 numColumns = batch->num_columns();
diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h
index b365f7227e..4d27e29ebe 100644
--- a/ydb/core/tx/columnshard/engines/scheme/index_info.h
+++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h
@@ -91,7 +91,6 @@ public:
}
public:
-
static TIndexInfo BuildDefault() {
TIndexInfo result("dummy", 0);
return result;
@@ -209,6 +208,8 @@ public:
return Version;
}
+ bool CheckCompatible(const TIndexInfo& other) const;
+
private:
ui32 Id;
ui64 Version = 0;
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 1e3d7f712f..8e0c8ceef1 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -400,7 +400,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// load
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
TSnapshot indexSnaphot(1, 1);
- engine.UpdateDefaultSchema(indexSnaphot, TIndexInfo(tableInfo));
+ engine.RegisterSchemaVersion(indexSnaphot, TIndexInfo(tableInfo));
for (auto&& i : paths) {
engine.RegisterTable(i);
}
@@ -485,7 +485,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot indexSnapshot(1, 1);
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
- engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+ engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.Load(db);
@@ -585,7 +585,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
TSnapshot indexSnapshot(1, 1);
- engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+ engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.Load(db);
@@ -612,7 +612,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's overloaded after reload
TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager);
- tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
+ tmpEngine.RegisterSchemaVersion(TSnapshot::Zero(), TIndexInfo(tableInfo));
tmpEngine.RegisterTable(pathId);
tmpEngine.Load(db);
}
@@ -643,7 +643,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // check it's not overloaded after reload
TColumnEngineForLogs tmpEngine(0, TestLimits(), CommonStoragesManager);
- tmpEngine.UpdateDefaultSchema(TSnapshot::Zero(), TIndexInfo(tableInfo));
+ tmpEngine.RegisterSchemaVersion(TSnapshot::Zero(), TIndexInfo(tableInfo));
tmpEngine.RegisterTable(pathId);
tmpEngine.Load(db);
}
@@ -661,7 +661,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
TSnapshot indexSnapshot(1, 1);
{
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
- engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+ engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.Load(db);
@@ -728,7 +728,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{
// load
TColumnEngineForLogs engine(0, TestLimits(), CommonStoragesManager);
- engine.UpdateDefaultSchema(indexSnapshot, TIndexInfo(tableInfo));
+ engine.RegisterSchemaVersion(indexSnapshot, TIndexInfo(tableInfo));
engine.RegisterTable(pathId);
engine.Load(db);
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index ee9222a40c..cab9f5758c 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -219,7 +219,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
return true;
}
-void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) {
+void TTablesManager::AddSchemaVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db) {
Y_ABORT_UNLESS(SchemaPresets.contains(presetId));
auto preset = SchemaPresets.at(presetId);
@@ -250,10 +250,10 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi
if (SchemaPresets.empty()) {
TSchemaPreset fakePreset;
Y_ABORT_UNLESS(RegisterSchemaPreset(fakePreset, db));
- AddPresetVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
+ AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
} else {
Y_ABORT_UNLESS(SchemaPresets.contains(fakePreset.GetId()));
- AddPresetVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
+ AddSchemaVersion(fakePreset.GetId(), version, versionInfo.GetSchema(), db);
}
}
@@ -267,7 +267,6 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const TRowVersion& versi
if (PrimaryIndex) {
PrimaryIndex->OnTieringModified(nullptr, Ttl);
}
-
}
Schema::SaveTableVersionInfo(db, pathId, version, versionInfo);
table.AddVersion(version, versionInfo);
@@ -280,12 +279,8 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const NKikim
const bool isFirstPrimaryIndexInitialization = !PrimaryIndex;
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, NOlap::TCompactionLimits(), StoragesManager);
- } else {
- const NOlap::TIndexInfo& lastIndexInfo = PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetIndexInfo();
- Y_ABORT_UNLESS(lastIndexInfo.GetReplaceKey()->Equals(indexInfo.GetReplaceKey()));
- Y_ABORT_UNLESS(lastIndexInfo.GetIndexKey()->Equals(indexInfo.GetIndexKey()));
}
- PrimaryIndex->UpdateDefaultSchema(snapshot, std::move(indexInfo));
+ PrimaryIndex->RegisterSchemaVersion(snapshot, std::move(indexInfo));
if (isFirstPrimaryIndexInitialization) {
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h
index 420bbb6a29..4642389721 100644
--- a/ydb/core/tx/columnshard/tables_manager.h
+++ b/ydb/core/tx/columnshard/tables_manager.h
@@ -208,7 +208,7 @@ public:
void RegisterTable(TTableInfo&& table, NIceDb::TNiceDb& db);
bool RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIceDb::TNiceDb& db);
- void AddPresetVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
+ void AddSchemaVersion(const ui32 presetId, const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema, NIceDb::TNiceDb& db);
void AddTableVersion(const ui64 pathId, const TRowVersion& version, const TTableInfo::TTableVersionInfo& versionInfo, NIceDb::TNiceDb& db);
private:
void IndexSchemaVersion(const TRowVersion& version, const NKikimrSchemeOp::TColumnTableSchema& schema);