aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authornsofya <nsofya@yandex-team.com>2023-05-11 08:25:54 +0300
committernsofya <nsofya@yandex-team.com>2023-05-11 08:25:54 +0300
commit10b60fb01a6505d865f2a9e57a1fe1425e4147af (patch)
treed203850cf06ac3ae9df95c30e1dbf6ecb9a6d465
parente6ce2324e500026937dd7f34c7671f3c5ca62b03 (diff)
downloadydb-10b60fb01a6505d865f2a9e57a1fe1425e4147af.tar.gz
Add not null column
Add not null table
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp313
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard__write_index.cpp1
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard_private_events.h4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h17
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.cpp87
-rw-r--r--ydb/core/tx/columnshard/engines/index_logic_logs.h9
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp3
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h131
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp11
-rw-r--r--ydb/core/tx/columnshard/engines/reader/description.h1
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h17
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp12
-rw-r--r--ydb/core/tx/columnshard/tables_manager.cpp7
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_olap_types.h1
20 files changed, 489 insertions, 165 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 284740bd03c..b3a0bbfc7fd 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -3,6 +3,8 @@
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h>
+#include <ydb/core/testlib/cs_helper.h>
#include <library/cpp/threading/local_executor/local_executor.h>
@@ -3330,37 +3332,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString());
}
- Y_UNIT_TEST(AddColumnOlapTable) {
- TKikimrSettings runnerSettings;
- runnerSettings.WithSampleTables = false;
- TKikimrRunner kikimr(runnerSettings);
- auto db = kikimr.GetTableClient();
- auto session = db.CreateSession().GetValueSync().GetSession();
- TString tableName = "/Root/ColumnTableTest";
-
- auto query = TStringBuilder() << R"(
- --!syntax_v1
- CREATE TABLE `)" << tableName << R"(` (
- Key Uint64 NOT NULL,
- Value1 String,
- Value2 Int64 NOT NULL,
- PRIMARY KEY (Key)
- )
- PARTITION BY HASH(Key)
- WITH (
- STORE = COLUMN,
- AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10
- );)";
- auto result = session.ExecuteSchemeQuery(query).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
- auto query2 = TStringBuilder() << R"(
- --!syntax_v1
- ALTER TABLE `)" << tableName << R"(`ADD COLUMN Value3 Uint64;)";
- result = session.ExecuteSchemeQuery(query2).GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString());
- }
-
Y_UNIT_TEST(CreateDropColumnTableNegative) {
TKikimrSettings runnerSettings;
runnerSettings.WithSampleTables = false;
@@ -4114,5 +4085,285 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}
+Y_UNIT_TEST_SUITE(KqpOlapScheme) {
+ class TTestHelper {
+ public:
+ // One column of a test column table: a name, a scheme type id and a nullability flag.
+ class TColumnSchema {
+     YDB_ACCESSOR_DEF(TString, Name);
+     YDB_ACCESSOR_DEF(NScheme::TTypeId, Type);
+     YDB_FLAG_ACCESSOR(Nullable, true);
+ public:
+     // Renders the column the way it is written inside a CREATE TABLE column list:
+     // "<name> <type name>", with " NOT NULL" appended when the nullable flag is cleared.
+     TString BuildQuery() const {
+         TStringBuilder columnDef;
+         columnDef << Name << " " << NScheme::GetTypeName(Type);
+         if (NullableFlag) {
+             return columnDef;
+         }
+         columnDef << " NOT NULL";
+         return columnDef;
+     }
+ };
+
+ // Collects rows value by value into one arrow builder per schema field and
+ // converts the accumulated columns into an arrow::RecordBatch.
+ class TUpdatesBuilder {
+     std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders;
+     std::shared_ptr<arrow::Schema> Schema;
+     ui32 RowsCount = 0;
+ public:
+     // Cursor over the columns of a single row. Every Add/AddNull appends to the
+     // builder at the current column index and returns a cursor for the next column.
+     class TRowBuilder {
+         TUpdatesBuilder& Owner;
+         YDB_READONLY(ui32, Index, 0);
+     public:
+         TRowBuilder(ui32 index, TUpdatesBuilder& owner)
+             : Owner(owner)
+             , Index(index)
+         {}
+
+         // Appends `data` (wrapped into an arrow scalar) to the current column.
+         // Aborts if the row already has a value for every column or if arrow rejects the value.
+         template <class T>
+         TRowBuilder Add(const T& data) {
+             Y_VERIFY(Index < Owner.Builders.size());
+             auto dataScalar = arrow::MakeScalar(data);
+             auto res = Owner.Builders[Index]->AppendScalar(*dataScalar);
+             // The status used to be dropped, so a rejected value silently produced a short column.
+             Y_VERIFY(res.ok(), "%s", res.ToString().c_str());
+             return TRowBuilder(Index + 1, Owner);
+         }
+
+         // Appends a null to the current column; same failure handling as Add.
+         TRowBuilder AddNull() {
+             Y_VERIFY(Index < Owner.Builders.size());
+             auto res = Owner.Builders[Index]->AppendNull();
+             Y_VERIFY(res.ok(), "%s", res.ToString().c_str());
+             return TRowBuilder(Index + 1, Owner);
+         }
+     };
+
+     // Creates one builder per field of `schema`.
+     TUpdatesBuilder(std::shared_ptr<arrow::Schema> schema)
+         : Schema(schema)
+     {
+         Builders = NArrow::MakeBuilders(schema);
+         Y_VERIFY(Builders.size() == schema->fields().size());
+     }
+
+     // Starts a new row and returns a cursor positioned at its first column.
+     TRowBuilder AddRow() {
+         ++RowsCount;
+         return TRowBuilder(0, *this);
+     }
+
+     // Finishes every column builder and assembles the record batch.
+     // Aborts if a builder fails or if some started row did not supply a value for every column.
+     std::shared_ptr<arrow::RecordBatch> BuildArrow() {
+         TVector<std::shared_ptr<arrow::Array>> columns;
+         columns.reserve(Builders.size());
+         for (auto&& builder : Builders) {
+             auto arrayDataRes = builder->Finish();
+             Y_VERIFY(arrayDataRes.ok());
+             // RecordBatch::Make is given RowsCount as the row count, so every column must match it.
+             Y_VERIFY((*arrayDataRes)->length() == static_cast<int64_t>(RowsCount));
+             columns.push_back(*arrayDataRes);
+         }
+         return arrow::RecordBatch::Make(Schema, RowsCount, columns);
+     }
+ };
+
+ // Description of a test column table: name, columns, primary key, sharding columns and
+ // minimal partitions count. Produces the CREATE TABLE statement and the arrow schema for writes.
+ class TColumnTable {
+     YDB_ACCESSOR_DEF(TString, Name);
+     YDB_ACCESSOR_DEF(TVector<TColumnSchema>, Schema);
+     YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey);
+     YDB_ACCESSOR_DEF(TVector<TString>, Sharding);
+     YDB_ACCESSOR(ui32, MinPartitionsCount, 1);
+ public:
+     // Builds the CREATE TABLE statement for a column-store table from the stored description.
+     TString BuildQuery() const {
+         auto str = TStringBuilder() << "CREATE TABLE `" << Name << "`";
+         str << " (" << BuildColumnsStr(Schema) << ", PRIMARY KEY (" << JoinStrings(PrimaryKey, ", ") << "))";
+         str << " PARTITION BY HASH(" << JoinStrings(Sharding, ", ") << ")";
+         str << " WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT =" << MinPartitionsCount << ");";
+         return str;
+     }
+
+     // Builds an arrow schema with one field per entry of `columns`, in the same order.
+     // Aborts on a column whose type has no arrow mapping in BuildField, instead of
+     // placing a null field into the schema.
+     std::shared_ptr<arrow::Schema> GetArrowSchema(const TVector<TColumnSchema>& columns) const {
+         std::vector<std::shared_ptr<arrow::Field>> result;
+         result.reserve(columns.size());
+         for (auto&& col : columns) {
+             auto field = BuildField(col.GetName(), col.GetType());
+             Y_VERIFY(field, "Column %s has a type without an arrow mapping", col.GetName().c_str());
+             result.push_back(field);
+         }
+         return std::make_shared<arrow::Schema>(result);
+     }
+
+ private:
+     // Joins the per-column definitions with ", ".
+     TString BuildColumnsStr(const TVector<TColumnSchema>& columns) const {
+         TVector<TString> columnStr;
+         columnStr.reserve(columns.size());
+         for (auto&& c : columns) {
+             columnStr.push_back(c.BuildQuery());
+         }
+         return JoinStrings(columnStr, ", ");
+     }
+
+     // Maps a scheme type id to an arrow field named `name`.
+     // Returns nullptr for type ids that are not listed in the switch.
+     std::shared_ptr<arrow::Field> BuildField(const TString& name, const NScheme::TTypeId& typeId) const {
+         switch(typeId) {
+         case NScheme::NTypeIds::Bool:
+             return arrow::field(name, arrow::boolean());
+         case NScheme::NTypeIds::Int8:
+             return arrow::field(name, arrow::int8());
+         case NScheme::NTypeIds::Int16:
+             return arrow::field(name, arrow::int16());
+         case NScheme::NTypeIds::Int32:
+             return arrow::field(name, arrow::int32());
+         case NScheme::NTypeIds::Int64:
+             return arrow::field(name, arrow::int64());
+         case NScheme::NTypeIds::Uint8:
+             return arrow::field(name, arrow::uint8());
+         case NScheme::NTypeIds::Uint16:
+             return arrow::field(name, arrow::uint16());
+         case NScheme::NTypeIds::Uint32:
+             return arrow::field(name, arrow::uint32());
+         case NScheme::NTypeIds::Uint64:
+             return arrow::field(name, arrow::uint64());
+         case NScheme::NTypeIds::Float:
+             return arrow::field(name, arrow::float32());
+         case NScheme::NTypeIds::Double:
+             return arrow::field(name, arrow::float64());
+         case NScheme::NTypeIds::String:
+             return arrow::field(name, arrow::binary());
+         case NScheme::NTypeIds::Utf8:
+             return arrow::field(name, arrow::utf8());
+         case NScheme::NTypeIds::Json:
+             return arrow::field(name, arrow::binary());
+         case NScheme::NTypeIds::Yson:
+             return arrow::field(name, arrow::binary());
+         case NScheme::NTypeIds::Date:
+             return arrow::field(name, arrow::uint16());
+         case NScheme::NTypeIds::Datetime:
+             return arrow::field(name, arrow::uint32());
+         case NScheme::NTypeIds::Timestamp:
+             return arrow::field(name, arrow::timestamp(arrow::TimeUnit::MICRO));
+         case NScheme::NTypeIds::Interval:
+             return arrow::field(name, arrow::duration(arrow::TimeUnit::MICRO));
+         case NScheme::NTypeIds::JsonDocument:
+             return arrow::field(name, arrow::binary());
+         }
+         return nullptr;
+     }
+ };
+
+ private:
+ NYdb::NTable::TTableClient TableClient;
+ NYdb::NLongTx::TClient LongTxClient;
+ NYdb::NTable::TSession Session;
+
+ public:
+ TTestHelper(const TKikimrRunner& kikimr) // Takes the table client and driver from the runner and opens the session reused by CreateTable and GetSession
+ : TableClient(kikimr.GetTableClient())
+ , LongTxClient(kikimr.GetDriver())
+ , Session(TableClient.CreateSession().GetValueSync().GetSession()) // waits for the session synchronously via GetValueSync
+ {}
+
+ NYdb::NTable::TSession& GetSession() { // Gives tests direct access to the helper's session, e.g. for ALTER TABLE and DescribeTable
+ return Session;
+ }
+
+ // Executes the CREATE TABLE statement generated by `table` on the helper's session
+ // and fails the test unless the scheme query finishes with SUCCESS.
+ void CreateTable(const TColumnTable& table) {
+     const TString createQuery = table.BuildQuery();
+     auto pendingResult = Session.ExecuteSchemeQuery(createQuery);
+     auto result = pendingResult.GetValueSync();
+     UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates) { // Writes the rows collected in `updates` into `table` through one long transaction: begin, write, commit
+ NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString());
+
+ auto txId = resBeginTx.GetResult().tx_id();
+ auto batch = updates.BuildArrow(); // finishes the builders held by `updates`
+ TString data = NArrow::SerializeBatchNoCompression(batch);
+
+ NLongTx::TLongTxWriteResult resWrite =
+ LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); // NOTE(review): txId is passed twice; the third argument is presumably a dedup id - confirm against TClient::Write
+ UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString());
+
+ NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString());
+ }
+
+ void ReadData(const TString& query, const TString& expected) { // Runs `query` as a scan query and asserts that its result equals `expected`; both sides are given as YSON text
+ auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ TString result = StreamResultToYson(it);
+ UNIT_ASSERT_NO_DIFF(ReformatYson(result), ReformatYson(expected)); // both sides go through ReformatYson before the comparison
+ }
+ };
+
+ Y_UNIT_TEST(AddColumn) { // Adds a nullable column to a column table that already holds data, then checks reads of old rows and of a row written with the new schema
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+ TKikimrRunner kikimr(runnerSettings);
+
+ TVector<TTestHelper::TColumnSchema> schema = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
+ TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
+ };
+
+ TTestHelper testHelper(kikimr);
+ TTestHelper::TColumnTable testTable;
+
+ testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+
+ { // two rows written with the initial three-column schema
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ tableInserter.AddRow().Add(1).Add("test_res_1").AddNull();
+ tableInserter.AddRow().Add(2).Add("test_res_2").Add(123);
+ testHelper.InsertData(testTable, tableInserter);
+ }
+
+ testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); // expected order is id, level, resource_id
+
+ { // the local schema is extended in step with the ALTER so that the later insert uses four columns
+ schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64));
+ auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;";
+ auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+
+ { // the table description must now report four columns
+ auto settings = TDescribeTableSettings().WithTableStatistics(true);
+ auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync();
+ UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString());
+
+ const auto& description = describeResult.GetTableDescription();
+ auto columns = description.GetTableColumns();
+ UNIT_ASSERT_VALUES_EQUAL(columns.size(), 4);
+ }
+
+ testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); // the row written before the ALTER is expected to show null in new_column
+ testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=1", "[[#]]");
+ testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=1", "[[[\"test_res_1\"]]]");
+
+ { // one row written with the extended four-column schema
+ TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema));
+ tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add<uint64_t>(200);
+ testHelper.InsertData(testTable, tableInserter);
+ }
+
+ testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]");
+ testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=3", "[[[200u]]]");
+ testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]");
+ testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest`", "[[#];[#];[[200u]]]"); // NOTE(review): no ORDER BY here - the expected row order relies on the scan order, confirm it is stable
+ }
+
+ Y_UNIT_TEST(AddColumnErrors) { // Checks that adding a NOT NULL column to an existing column table is rejected with SCHEME_ERROR
+ TKikimrSettings runnerSettings;
+ runnerSettings.WithSampleTables = false;
+ TKikimrRunner kikimr(runnerSettings);
+
+ TVector<TTestHelper::TColumnSchema> schema = {
+ TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8),
+ TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32)
+ };
+
+ TTestHelper testHelper(kikimr);
+ TTestHelper::TColumnTable testTable;
+
+ testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema);
+ testHelper.CreateTable(testTable);
+
+ {
+ schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)); // NOTE(review): `schema` is not read again in this test, so this push_back has no visible effect
+ auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`ADD COLUMN new_column Uint64 NOT NULL;";
+ auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SCHEME_ERROR, alterResult.GetIssues().ToString());
+ }
+ }
+}
+
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 2345be01eff..cff05627660 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -47,8 +47,6 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
txc.DB.NoMoreReadsForTx();
auto& record = Proto(Ev->Get());
- const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()));
-
ui64 metaShard = record.GetTxInitiator();
NOlap::TReadDescription read(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()), false);
@@ -56,6 +54,8 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) {
read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId));
read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds());
read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames());
+
+ const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(read.GetSnapshot());
if (read.ColumnIds.empty() && read.ColumnNames.empty()) {
auto allColumnNames = indexInfo.ArrowSchema()->field_names();
read.ColumnNames.assign(allColumnNames.begin(), allColumnNames.end());
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 19d7195fb87..464c6902169 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -62,6 +62,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
NOlap::TSnapshot snapshot = changes->ApplySnapshot;
if (snapshot.IsZero()) {
snapshot = NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId);
+ Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot);
}
TBlobGroupSelector dsGroupSelector(Self->Info());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 4004ca1511e..cbf3431927a 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -746,7 +746,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() {
return {};
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
ActiveIndexingOrCompaction = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterIndexing, std::move(cachedBlobs));
@@ -784,7 +784,7 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() {
return {};
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
ActiveIndexingOrCompaction = true;
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges,
Settings.CacheDataAfterCompaction);
@@ -828,9 +828,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u
LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID());
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges;
- indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, actualIndexInfo.ArrowSchema());
+ indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, actualIndexInfo.GetLastSchema()->GetIndexInfo().ArrowSchema());
if (!indexChanges) {
LOG_S_DEBUG("Cannot prepare TTL at tablet " << TabletID());
@@ -893,7 +893,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() {
return {};
}
- auto actualIndexInfo = TablesManager.GetIndexInfo();
+ auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex();
#if 0 // No need for now
if (Tiers) {
...
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index 685f6c47888..05a23aca572 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -27,7 +27,7 @@ struct TEvPrivate {
/// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction.
struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> {
NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN;
- NOlap::TIndexInfo IndexInfo;
+ NOlap::TVersionedIndex IndexInfo;
THashMap<ui64, NKikimr::NOlap::TTiering> Tiering;
std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges;
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs;
@@ -40,7 +40,7 @@ struct TEvPrivate {
bool CacheData{false};
TDuration Duration;
- TEvWriteIndex(NOlap::TIndexInfo&& indexInfo,
+ TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo,
std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges,
bool cacheData,
THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>&& cachedBlobs = {})
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 444fe64427e..faac7b563e6 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -339,17 +339,14 @@ class TVersionedIndex {
std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots;
public:
ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const { // Returns the schema stored under the greatest snapshot key that is <= version
- Y_UNUSED(version);
- return GetLastSchema();
- /*
- for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) {
- if (it->first <= version) {
- return it->second;
- }
+ for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) { // reverse iteration over the map: greatest snapshot key first
+ if (it->first <= version) {
+ return it->second;
}
- Y_VERIFY(false);
- return nullptr;
- */
+ }
+ Y_VERIFY(!Snapshots.empty());
+ Y_VERIFY(version.IsZero()); // reaching this point with a non-zero version is treated as a fatal error
+ return Snapshots.begin()->second; // For old compaction logic compatibility
}
ISnapshotSchema::TPtr GetLastSchema() const {
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index ec5b3b2579c..b99a5951f3d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -27,7 +27,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por
for (const auto& portionInfo : portions) {
if (portionInfo.IsInserted()) {
++insertedCount;
- if (portionInfo.Snapshot().GetPlanStep() > oldTimePlanStep) {
+ if (portionInfo.GetSnapshot().GetPlanStep() > oldTimePlanStep) {
++insertedNew;
}
} else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) {
@@ -602,7 +602,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T
}
isClean = false;
- if (info.XSnapshot() < snapshot) {
+ if (info.GetXSnapshot() < snapshot) {
affectedRecords += info.NumRecords();
changes->PortionsToDrop.push_back(info);
}
@@ -1204,8 +1204,8 @@ TMap<TSnapshot, std::vector<ui64>> TColumnEngineForLogs::GetOrderedPortions(ui64
continue;
}
- TSnapshot recSnapshot = portionInfo.Snapshot();
- TSnapshot recXSnapshot = portionInfo.XSnapshot();
+ TSnapshot recSnapshot = portionInfo.GetSnapshot();
+ TSnapshot recXSnapshot = portionInfo.GetXSnapshot();
bool visible = (recSnapshot <= snapshot);
if (recXSnapshot.GetPlanStep()) {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
index e4ed20b5cb8..4bccce87cc3 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp
@@ -59,19 +59,21 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
Y_VERIFY(!evictFeatures.NeedExport);
- auto schema = IndexInfo.ArrowSchemaWithSpecials();
- auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, srcBlobs);
+ TPortionInfo undo = portionInfo;
+
+ auto blobSchema = IndexInfo.GetSchema(undo.GetSnapshot());
+ auto resultSchema = IndexInfo.GetLastSchema();
+ auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs);
auto writeOptions = WriteOptions(*compression);
- TPortionInfo undo = portionInfo;
size_t undoSize = newBlobs.size();
for (auto& rec : portionInfo.Records) {
- auto colName = IndexInfo.GetColumnName(rec.ColumnId);
- std::string name(colName.data(), colName.size());
- auto field = schema->GetFieldByName(name);
+ auto pos = resultSchema->GetFieldIndex(rec.ColumnId);
+ Y_VERIFY(pos >= 0);
+ auto field = resultSchema->GetField(pos);
- auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(name), field, writeOptions);
+ auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, writeOptions);
if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) {
portionInfo = undo;
newBlobs.resize(undoSize);
@@ -85,7 +87,7 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo,
evictedRecords.emplace_back(std::move(rec));
}
- portionInfo.AddMetadata(IndexInfo, batch, evictFeatures.TargetTierName);
+ portionInfo.AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName);
return true;
}
@@ -95,11 +97,12 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
const TSnapshot& minSnapshot,
std::vector<TString>& blobs) const {
Y_VERIFY(batch->num_rows());
- const auto schema = IndexInfo.ArrowSchemaWithSpecials();
+
+ auto resultSchema = IndexInfo.GetSchema(minSnapshot);
std::vector<TPortionInfo> out;
TString tierName;
- TCompression compression = IndexInfo.GetDefaultCompression();
+ TCompression compression = resultSchema->GetIndexInfo().GetDefaultCompression();
if (pathId) {
if (auto* tiering = GetTieringMap().FindPtr(pathId)) {
tierName = tiering->GetHottestTierName();
@@ -115,16 +118,16 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
Y_VERIFY(portionBatch->num_rows());
TPortionInfo portionInfo;
- portionInfo.Records.reserve(schema->num_fields());
+ portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields());
std::vector<TString> portionBlobs;
- portionBlobs.reserve(schema->num_fields());
+ portionBlobs.reserve(resultSchema->GetSchema()->num_fields());
// Serialize portion's columns into blobs
bool ok = true;
- for (const auto& field : schema->fields()) {
+ for (const auto& field : resultSchema->GetSchema()->fields()) {
const auto& name = field->name();
- ui32 columnId = IndexInfo.GetColumnId(TString(name.data(), name.size()));
+ ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(TString(name.data(), name.size()));
/// @warning records are not valid because of empty BlobId and zero Portion
TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0);
@@ -140,7 +143,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
}
if (ok) {
- portionInfo.AddMetadata(IndexInfo, portionBatch, tierName);
+ portionInfo.AddMetadata(*resultSchema, portionBatch, tierName);
out.emplace_back(std::move(portionInfo));
for (auto& blob : portionBlobs) {
blobs.push_back(blob);
@@ -162,14 +165,12 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI
std::vector<std::shared_ptr<arrow::RecordBatch>> TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions,
const THashMap<TBlobRange, TString>& blobs,
bool insertedOnly) const {
- // TODO: schema changes
- const std::shared_ptr<arrow::Schema> schema = IndexInfo.ArrowSchemaWithSpecials();
-
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(portions.size());
-
+ auto resultSchema = IndexInfo.GetLastSchema();
for (auto& portionInfo : portions) {
- auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, blobs);
+ auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot());
+ auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs);
if (!insertedOnly || portionInfo.IsInserted()) {
batches.push_back(batch);
}
@@ -224,17 +225,22 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
auto changes = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(indexChanges);
Y_VERIFY(!changes->DataToIndex.empty());
Y_VERIFY(changes->AppendedPortions.empty());
- Y_VERIFY(IndexInfo.IsSorted());
- TSnapshot& minSnapshot = changes->ApplySnapshot;
- THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
+
+ TSnapshot minSnapshot = changes->ApplySnapshot;
for (auto& inserted : changes->DataToIndex) {
TSnapshot insertSnap = inserted.GetSnapshot();
Y_VERIFY(insertSnap.Valid());
if (minSnapshot.IsZero() || insertSnap <= minSnapshot) {
minSnapshot = insertSnap;
}
-
+ }
+ Y_VERIFY(minSnapshot.Valid());
+ auto& indexInfo = IndexInfo.GetSchema(minSnapshot)->GetIndexInfo();
+ Y_VERIFY(indexInfo.IsSorted());
+
+ THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches;
+ for (auto& inserted : changes->DataToIndex) {
TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize());
std::shared_ptr<arrow::RecordBatch> batch;
@@ -242,18 +248,16 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
batch = it->second;
} else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) {
Y_VERIFY(!blobData->empty(), "Blob data not present");
- batch = NArrow::DeserializeBatch(*blobData, IndexInfo.ArrowSchema());
+ batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema());
} else {
Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str());
}
Y_VERIFY(batch);
- batch = AddSpecials(batch, IndexInfo, inserted);
+ batch = AddSpecials(batch, indexInfo, inserted);
pathBatches[inserted.PathId].push_back(batch);
- Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), IndexInfo.GetReplaceKey()));
+ Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), indexInfo.GetReplaceKey()));
}
- Y_VERIFY(minSnapshot.Valid());
-
std::vector<TString> blobs;
for (auto& [pathId, batches] : pathBatches) {
@@ -265,13 +269,13 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange
Y_VERIFY(merged);
Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey()));
#else
- auto merged = NArrow::CombineSortedBatches(batches, IndexInfo.SortReplaceDescription());
+ auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription());
Y_VERIFY(merged);
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, IndexInfo.GetReplaceKey()));
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey()));
#endif
- auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], IndexInfo);
+ auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo);
for (auto& [granule, batch] : granuleBatches) {
auto portions = MakeAppendedPortions(pathId, batch, granule, minSnapshot, blobs);
Y_VERIFY(portions.size() > 0);
@@ -291,17 +295,17 @@ std::shared_ptr<arrow::RecordBatch> TCompactionLogic::CompactInOneGranule(ui64 g
std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
batches.reserve(portions.size());
- auto schema = IndexInfo.ArrowSchemaWithSpecials();
+ auto resultSchema = IndexInfo.GetLastSchema();
for (auto& portionInfo : portions) {
Y_VERIFY(!portionInfo.Empty());
Y_VERIFY(portionInfo.Granule() == granule);
-
- auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, blobs);
+ auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot());
+ auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs);
batches.push_back(batch);
}
- auto sortedBatch = NArrow::CombineSortedBatches(batches, IndexInfo.SortReplaceDescription());
- Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, IndexInfo.GetReplaceKey()));
+ auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription());
+ Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey()));
return sortedBatch;
}
@@ -315,10 +319,11 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE
ui64 granule = switchedProtions[0].Granule();
auto batch = CompactInOneGranule(granule, switchedProtions, changes->Blobs);
+ auto resultSchema = IndexInfo.GetLastSchema();
std::vector<TPortionInfo> portions;
if (!changes->MergeBorders.Empty()) {
Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1);
- auto slices = changes->MergeBorders.SliceIntoGranules(batch, IndexInfo);
+ auto slices = changes->MergeBorders.SliceIntoGranules(batch, resultSchema->GetIndexInfo());
portions.reserve(slices.size());
for (auto& [_, slice] : slices) {
@@ -577,6 +582,8 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
std::vector<TString> blobs;
+ auto resultSchema = IndexInfo.GetLastSchema();
+
if (movedRows) {
Y_VERIFY(changes->PortionsToMove.size() >= 2);
Y_VERIFY(changes->PortionsToMove.size() == tsIds.size());
@@ -628,7 +635,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
for (size_t i = 0; i < portions.size(); ++i) {
auto& portion = portions[i];
auto& batch = srcBatches[i];
- auto slices = marksGranules.SliceIntoGranules(batch, IndexInfo);
+ auto slices = marksGranules.SliceIntoGranules(batch, resultSchema->GetIndexInfo());
THashSet<ui64> ids;
for (auto& [id, slice] : slices) {
@@ -672,7 +679,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr
}
}
} else {
- auto batches = SliceGranuleBatches(IndexInfo, *changes, srcBatches, ts0);
+ auto batches = SliceGranuleBatches(resultSchema->GetIndexInfo(), *changes, srcBatches, ts0);
changes->SetTmpGranule(pathId, ts0);
for (auto& [ts, batch] : batches) {
diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h
index ecd2100dbef..91471217ee3 100644
--- a/ydb/core/tx/columnshard/engines/index_logic_logs.h
+++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h
@@ -8,19 +8,20 @@ namespace NKikimr::NOlap {
class TIndexLogicBase {
protected:
- const TIndexInfo& IndexInfo;
+ const TVersionedIndex& IndexInfo;
private:
const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr;
public:
- TIndexLogicBase(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap)
+ TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap)
: IndexInfo(indexInfo)
, TieringMap(&tieringMap)
{
}
- TIndexLogicBase(const TIndexInfo& indexInfo)
- : IndexInfo(indexInfo) {
+ TIndexLogicBase(const TVersionedIndex& indexInfo)
+ : IndexInfo(indexInfo)
+ {
}
virtual ~TIndexLogicBase() {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index ebc5a0fb4b1..edccbfcad1f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -190,7 +190,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
// (do not filter snapshot columns)
auto loadSchema = ReadMetadata->GetLoadSchema(snapshot);
- auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema);
+ auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema->GetSchema());
Y_VERIFY(batch);
auto filter = FilterNotIndexed(batch, *ReadMetadata);
@@ -200,7 +200,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
auto preparedBatch = batch;
preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot);
- preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema);
+ preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema->GetSchema());
Y_VERIFY(preparedBatch);
filter.Apply(preparedBatch);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 5ac90c67303..55c36d4c5d8 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -44,11 +44,12 @@ void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>&
Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second);
}
-void TPortionInfo::AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch,
+void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName) {
TierName = tierName;
Meta = {};
+ auto& indexInfo = snapshotSchema.GetIndexInfo();
/// @note It does not add RawBytes info for snapshot columns, only for user ones.
for (auto& [columnId, col] : indexInfo.Columns) {
auto column = batch->GetColumnByName(col.Name);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index c70e4d7dcf6..97a680aefc2 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -56,6 +56,55 @@ public:
}
};
+class TFilteredSnapshotSchema : public ISnapshotSchema {
+ ISnapshotSchema::TPtr OriginalSnapshot;
+ std::shared_ptr<arrow::Schema> Schema;
+ std::set<ui32> ColumnIds;
+public:
+ TFilteredSnapshotSchema(ISnapshotSchema::TPtr originalSnapshot, const std::vector<ui32>& columnIds)
+ : TFilteredSnapshotSchema(originalSnapshot, std::set(columnIds.begin(), columnIds.end()))
+ {}
+
+ TFilteredSnapshotSchema(ISnapshotSchema::TPtr originalSnapshot, const std::set<ui32>& columnIds)
+ : OriginalSnapshot(originalSnapshot)
+ , ColumnIds(columnIds)
+ {
+ std::vector<std::shared_ptr<arrow::Field>> schemaFields;
+ for (auto&& i : OriginalSnapshot->GetSchema()->fields()) {
+ if (!ColumnIds.contains(OriginalSnapshot->GetIndexInfo().GetColumnId(i->name()))) {
+ continue;
+ }
+ schemaFields.emplace_back(i);
+ }
+ Schema = std::make_shared<arrow::Schema>(schemaFields);
+ }
+
+ int GetFieldIndex(const ui32 columnId) const override {
+ if (!ColumnIds.contains(columnId)) {
+ return -1;
+ }
+ TString columnName = OriginalSnapshot->GetIndexInfo().GetColumnName(columnId);
+ std::string name(columnName.data(), columnName.size());
+ return Schema->GetFieldIndex(name);
+ }
+
+ std::shared_ptr<arrow::Field> GetField(const int index) const override {
+ return Schema->field(index);
+ }
+
+ const std::shared_ptr<arrow::Schema>& GetSchema() const override {
+ return Schema;
+ }
+
+ const TIndexInfo& GetIndexInfo() const override {
+ return OriginalSnapshot->GetIndexInfo();
+ }
+
+ const TSnapshot& GetSnapshot() const override {
+ return OriginalSnapshot->GetSnapshot();
+ }
+};
+
struct TPortionMeta {
// NOTE: These values are persisted in LocalDB so they must be stable
enum EProduced : ui32 {
@@ -139,20 +188,20 @@ struct TPortionInfo {
}
}
- TSnapshot Snapshot() const {
+ TSnapshot GetSnapshot() const {
Y_VERIFY(!Empty());
auto& rec = Records[0];
return TSnapshot(rec.PlanStep, rec.TxId);
}
- TSnapshot XSnapshot() const {
+ TSnapshot GetXSnapshot() const {
Y_VERIFY(!Empty());
auto& rec = Records[0];
return TSnapshot(rec.XPlanStep, rec.XTxId);
}
bool IsActive() const {
- return XSnapshot().IsZero();
+ return GetXSnapshot().IsZero();
}
std::pair<ui32, ui32> BlobsSizes() const {
@@ -208,7 +257,7 @@ struct TPortionInfo {
TString GetMetadata(const TColumnRecord& rec) const;
void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec);
- void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch,
+ void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch,
const TString& tierName);
void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted);
@@ -363,6 +412,7 @@ public:
private:
std::vector<TPreparedColumn> Columns;
std::shared_ptr<arrow::Schema> Schema;
+ size_t RowsCount = 0;
public:
struct TAssembleOptions {
@@ -398,9 +448,14 @@ public:
return Columns.size();
}
- TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema)
+ size_t GetRowsCount() const {
+ return RowsCount;
+ }
+
+ TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount)
: Columns(std::move(columns))
, Schema(schema)
+ , RowsCount(rowsCount)
{
}
@@ -408,42 +463,45 @@ public:
};
template <class TExternalBlobInfo>
- TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TExternalBlobInfo>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
- // Correct records order
- TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
-
- std::vector<std::shared_ptr<arrow::Field>> schemaFields;
+ TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema,
+ const THashMap<TBlobRange, TExternalBlobInfo>& blobsData) const {
+ std::vector<TPreparedColumn> columns;
+ columns.reserve(resultSchema.GetSchema()->num_fields());
- for (auto&& i : schema->fields()) {
- if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
- continue;
- }
- schemaFields.emplace_back(i);
+ Y_VERIFY(!Meta.ColumnMeta.empty());
+ const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows;
+ const auto& indexInfo = resultSchema.GetIndexInfo();
+ for (auto&& field : resultSchema.GetSchema()->fields()) {
+ columns.emplace_back(TPreparedColumn(field, {TAssembleBlobInfo(rowsCount)}, indexInfo.GetColumnId(field->name())));
}
+ TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
+ TMap<size_t, size_t> positionsMap;
+
for (auto& rec : Records) {
- if (columnIds && !columnIds->contains(rec.ColumnId)) {
+ auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId);
+ if (resulPos < 0) {
continue;
}
- ui32 columnId = rec.ColumnId;
- TString columnName = indexInfo.GetColumnName(columnId);
- std::string name(columnName.data(), columnName.size());
- int pos = schema->GetFieldIndex(name);
- if (pos < 0) {
- continue; // no such column in schema - do not need it
+ auto pos = dataSchema.GetFieldIndex(rec.ColumnId);
+ Y_ASSERT(pos >= 0);
+ positionsMap[resulPos] = pos;
+ columnChunks[resulPos][rec.Chunk] = rec.BlobRange;
+ auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId);
+ if (columnMeta) {
+ Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows);
}
-
- columnChunks[pos][rec.Chunk] = rec.BlobRange;
}
// Make chunked arrays for columns
- std::vector<TPreparedColumn> columns;
- columns.reserve(columnChunks.size());
-
for (auto& [pos, orderedChunks] : columnChunks) {
- auto field = schema->field(pos);
+ Y_VERIFY(positionsMap.contains(pos));
+ size_t dataPos = positionsMap[pos];
+ auto portionField = dataSchema.GetField(dataPos);
+ auto resultField = resultSchema.GetField(pos);
+
+ Y_VERIFY(portionField->IsCompatibleWith(*resultField));
+
std::vector<TAssembleBlobInfo> blobs;
blobs.reserve(orderedChunks.size());
ui32 expected = 0;
@@ -456,16 +514,17 @@ public:
blobs.emplace_back(it->second);
}
- columns.emplace_back(TPreparedColumn(field, std::move(blobs), indexInfo.GetColumnId(field->name())));
+ Y_VERIFY(pos < columns.size());
+ columns[pos] = TPreparedColumn(resultField, std::move(blobs), indexInfo.GetColumnId(resultField->name()));
}
- return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
+ return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount);
}
- std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo,
- const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& data) const {
- return PrepareForAssemble(indexInfo, schema, data, {}).Assemble();
+ std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema,
+ const ISnapshotSchema& resultSchema,
+ const THashMap<TBlobRange, TString>& data) const {
+ return PrepareForAssemble(dataSchema, resultSchema, data).Assemble();
}
static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 1f320b18fae..f66481c1d0d 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -28,8 +28,15 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard
Y_VERIFY(WaitIndexed.empty());
Y_VERIFY(PortionInfo->Produced());
Y_VERIFY(!FetchedInfo.GetFilteredBatch());
- auto loadSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot());
- auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->GetIndexInfo(), loadSchema, Data, CurrentColumnIds);
+
+ auto blobSchema = readMetadata->GetLoadSchema(PortionInfo->GetSnapshot());
+ ISnapshotSchema::TPtr resultSchema;
+ if (CurrentColumnIds) {
+ resultSchema= std::make_shared<TFilteredSnapshotSchema>(readMetadata->GetLoadSchema(readMetadata->GetSnapshot()), *CurrentColumnIds);
+ } else {
+ resultSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot());
+ }
+ auto batchConstructor = PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, Data);
Data.clear();
if (!FetchedInfo.GetFilter()) {
return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata,
diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h
index 69f299e4493..b2928673e2a 100644
--- a/ydb/core/tx/columnshard/engines/reader/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/description.h
@@ -36,6 +36,7 @@ public:
std::vector<TString> ColumnNames;
std::shared_ptr<NSsa::TProgram> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program);
+
TReadDescription(const TSnapshot& snapshot, const bool isReverse)
: Snapshot(snapshot)
, PKRangesFilter(isReverse) {
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index a984369d33d..da799734056 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -94,16 +94,10 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto
CommittedBatches.emplace(cmt.GetBlobId(), batch);
}
}
-
- auto loadSchema = GetLoadSchema(Snapshot);
- if (!loadSchema) {
- return false;
- }
-
+
THashSet<ui32> columnIds;
- for (auto& field : loadSchema->fields()) {
- TString column(field->name().data(), field->name().size());
- columnIds.insert(indexInfo.GetColumnId(column));
+ for (auto& columnId : AllColumns) {
+ columnIds.insert(columnId);
}
Program = readDescription.Program;
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index 56b3878581e..44aab0cd508 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -162,9 +162,18 @@ public:
return AllColumns.size();
}
- std::shared_ptr<arrow::Schema> GetLoadSchema(const TSnapshot& version) const {
- const auto& indexInfo = IndexVersions.GetSchema(version)->GetIndexInfo();
- return indexInfo.ArrowSchema(AllColumns, true);
+ ISnapshotSchema::TPtr GetSnapshotSchema(const TSnapshot& version) const {
+ if (version >=Snapshot){
+ return ResultIndexSchema;
+ }
+ return IndexVersions.GetSchema(version);
+ }
+
+ ISnapshotSchema::TPtr GetLoadSchema(const std::optional<TSnapshot>& version = {}) const {
+ if (!version) {
+ return make_shared<TFilteredSnapshotSchema>(ResultIndexSchema, AllColumns);
+ }
+ return make_shared<TFilteredSnapshotSchema>(IndexVersions.GetSchema(*version), AllColumns);
}
std::shared_ptr<arrow::Schema> GetBlobSchema(const TSnapshot& version) const {
@@ -185,7 +194,7 @@ public:
std::vector<std::string> GetColumnsOrder() const {
auto loadSchema = GetLoadSchema(Snapshot);
std::vector<std::string> result;
- for (auto&& i : loadSchema->fields()) {
+ for (auto&& i : loadSchema->GetSchema()->fields()) {
result.emplace_back(i->name());
}
return result;
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 36e22f2699d..48d007f2a34 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -280,7 +280,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap,
changes->Blobs.insert(blobs.begin(), blobs.end());
- TIndexationLogic logic(engine.GetIndexInfo());
+ TIndexationLogic logic(engine.GetVersionedIndex());
std::vector<TString> newBlobs = logic.Apply(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId
@@ -316,7 +316,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T
UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions);
changes->SetBlobs(std::move(blobs));
- TCompactionLogic logic(engine.GetIndexInfo());
+ TCompactionLogic logic(engine.GetVersionedIndex());
std::vector<TString> newBlobs = logic.Apply(changes);
UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions);
AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step);
@@ -499,7 +499,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// compact
planStep = 2;
- bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4});
+ bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
UNIT_ASSERT(ok);
// load
@@ -620,7 +620,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// compact
planStep = 2;
- bool ok = Compact(engine, db, TSnapshot{planStep, 1}, std::move(blobs), step, {23, 5, 5});
+ bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {23, 5, 5});
UNIT_ASSERT(ok);
// success write after compaction
@@ -680,7 +680,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// compact
planStep = 2;
- bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4});
+ bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
UNIT_ASSERT(ok);
// load
@@ -704,7 +704,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
}
// Cleanup
- Cleanup(engine, db, TSnapshot{planStep, 1}, 20);
+ Cleanup(engine, db, TSnapshot(planStep, 1), 20);
{ // full scan
ui64 txId = 1;
diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp
index f0cc7d8fcc8..c84debaa779 100644
--- a/ydb/core/tx/columnshard/tables_manager.cpp
+++ b/ydb/core/tx/columnshard/tables_manager.cpp
@@ -222,10 +222,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc
void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) {
Y_VERIFY(SchemaPresets.contains(presetId));
auto preset = SchemaPresets.at(presetId);
- if (!preset.IsEmpty()) {
- LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletId);
- return;
- }
+
TSchemaPreset::TSchemaPresetVersionInfo versionInfo;
versionInfo.SetId(presetId);
versionInfo.SetSinceStep(version.Step);
@@ -283,7 +280,7 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTable
}
}
-std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) {
+std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) {
Y_VERIFY(PrimaryIndex);
return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
index 7e927ea4520..610bc8a3c5b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp
@@ -38,7 +38,7 @@ public:
return false;
}
- if (AlterRequest.HasAlterSchema() || AlterRequest.HasAlterSchemaPresetName()) {
+ if (AlterRequest.HasAlterSchemaPresetName()) {
errors.AddError(NKikimrScheme::StatusSchemeError, "Changing table schema is not supported");
return false;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h
index f85cc2c25d4..19c5ea65124 100644
--- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.h
@@ -98,7 +98,6 @@ namespace NKikimr::NSchemeShard {
bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const;
bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const;
- static bool UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr);
static bool IsAllowedType(ui32 typeId);
static bool IsAllowedFirstPkType(ui32 typeId);
};