diff options
author | nsofya <nsofya@yandex-team.com> | 2023-05-11 08:25:54 +0300 |
---|---|---|
committer | nsofya <nsofya@yandex-team.com> | 2023-05-11 08:25:54 +0300 |
commit | 10b60fb01a6505d865f2a9e57a1fe1425e4147af (patch) | |
tree | d203850cf06ac3ae9df95c30e1dbf6ecb9a6d465 | |
parent | e6ce2324e500026937dd7f34c7671f3c5ca62b03 (diff) | |
download | ydb-10b60fb01a6505d865f2a9e57a1fe1425e4147af.tar.gz |
Add not null column
Add not null table
20 files changed, 489 insertions, 165 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 284740bd03c..b3a0bbfc7fd 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -3,6 +3,8 @@ #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> #include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/draft/ydb_long_tx.h> +#include <ydb/core/testlib/cs_helper.h> #include <library/cpp/threading/local_executor/local_executor.h> @@ -3330,37 +3332,6 @@ Y_UNIT_TEST_SUITE(KqpScheme) { UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::GENERIC_ERROR, result.GetIssues().ToString()); } - Y_UNIT_TEST(AddColumnOlapTable) { - TKikimrSettings runnerSettings; - runnerSettings.WithSampleTables = false; - TKikimrRunner kikimr(runnerSettings); - auto db = kikimr.GetTableClient(); - auto session = db.CreateSession().GetValueSync().GetSession(); - TString tableName = "/Root/ColumnTableTest"; - - auto query = TStringBuilder() << R"( - --!syntax_v1 - CREATE TABLE `)" << tableName << R"(` ( - Key Uint64 NOT NULL, - Value1 String, - Value2 Int64 NOT NULL, - PRIMARY KEY (Key) - ) - PARTITION BY HASH(Key) - WITH ( - STORE = COLUMN, - AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10 - );)"; - auto result = session.ExecuteSchemeQuery(query).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - - auto query2 = TStringBuilder() << R"( - --!syntax_v1 - ALTER TABLE `)" << tableName << R"(`ADD COLUMN Value3 Uint64;)"; - result = session.ExecuteSchemeQuery(query2).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SCHEME_ERROR, result.GetIssues().ToString()); - } - Y_UNIT_TEST(CreateDropColumnTableNegative) { TKikimrSettings runnerSettings; runnerSettings.WithSampleTables = false; @@ -4114,5 +4085,285 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } +Y_UNIT_TEST_SUITE(KqpOlapScheme) { + class TTestHelper { + public: + class TColumnSchema { + YDB_ACCESSOR_DEF(TString, Name); + YDB_ACCESSOR_DEF(NScheme::TTypeId, Type); + YDB_FLAG_ACCESSOR(Nullable, true); + public: + TString BuildQuery() const { + auto str = TStringBuilder() << Name << " " << NScheme::GetTypeName(Type); + if (!NullableFlag) { + str << " NOT NULL"; + } + return str; + } + }; + + class TUpdatesBuilder { + std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; + std::shared_ptr<arrow::Schema> Schema; + ui32 RowsCount = 0; + public: + class TRowBuilder { + TUpdatesBuilder& Owner; + YDB_READONLY(ui32, Index, 0); + public: + TRowBuilder(ui32 index, TUpdatesBuilder& owner) + : Owner(owner) + , Index(index) + {} + + template <class T> + TRowBuilder Add(const T& data) { + Y_VERIFY(Index < Owner.Builders.size()); + auto dataScalar = arrow::MakeScalar(data); + auto res = Owner.Builders[Index]->AppendScalar(*dataScalar); + return TRowBuilder(Index + 1, Owner); + } + + TRowBuilder AddNull() { + Y_VERIFY(Index < Owner.Builders.size()); + auto res = Owner.Builders[Index]->AppendNull(); + return TRowBuilder(Index + 1, Owner); + } + }; + + TUpdatesBuilder(std::shared_ptr<arrow::Schema> schema) + : Schema(schema) + { + Builders = NArrow::MakeBuilders(schema); + Y_VERIFY(Builders.size() == schema->fields().size()); + } + + TRowBuilder AddRow() { + ++RowsCount; + return TRowBuilder(0, *this); + } + + std::shared_ptr<arrow::RecordBatch> BuildArrow() { + TVector<std::shared_ptr<arrow::Array>> columns; + columns.reserve(Builders.size()); + for (auto&& builder : Builders) { + auto arrayDataRes = builder->Finish(); + Y_VERIFY(arrayDataRes.ok()); + columns.push_back(*arrayDataRes); + } + return arrow::RecordBatch::Make(Schema, RowsCount, columns); + } + }; + + class TColumnTable { + YDB_ACCESSOR_DEF(TString, Name); + YDB_ACCESSOR_DEF(TVector<TColumnSchema>, Schema); + YDB_ACCESSOR_DEF(TVector<TString>, PrimaryKey); + YDB_ACCESSOR_DEF(TVector<TString>, Sharding); + YDB_ACCESSOR(ui32, MinPartitionsCount, 1); + public: + TString BuildQuery() const { + auto str = TStringBuilder() << "CREATE TABLE `" << Name << "`"; + str << " (" << BuildColumnsStr(Schema) << ", PRIMARY KEY (" << JoinStrings(PrimaryKey, ", ") << "))"; + str << " PARTITION BY HASH(" << JoinStrings(Sharding, ", ") << ")"; + str << " WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT =" << MinPartitionsCount << ");"; + return str; + } + + std::shared_ptr<arrow::Schema> GetArrowSchema(const TVector<TColumnSchema>& columns) { + std::vector<std::shared_ptr<arrow::Field>> result; + for (auto&& col : columns) { + result.push_back(BuildField(col.GetName(), col.GetType())); + } + return std::make_shared<arrow::Schema>(result); + } + + private: + TString BuildColumnsStr(const TVector<TColumnSchema>& clumns) const { + TVector<TString> columnStr; + for (auto&& c : clumns) { + columnStr.push_back(c.BuildQuery()); + } + return JoinStrings(columnStr, ", "); + } + + std::shared_ptr<arrow::Field> BuildField(const TString name, const NScheme::TTypeId& typeId) const { + switch(typeId) { + case NScheme::NTypeIds::Bool: + return arrow::field(name, arrow::boolean()); + case NScheme::NTypeIds::Int8: + return arrow::field(name, arrow::int8()); + case NScheme::NTypeIds::Int16: + return arrow::field(name, arrow::int16()); + case NScheme::NTypeIds::Int32: + return arrow::field(name, arrow::int32()); + case NScheme::NTypeIds::Int64: + return arrow::field(name, arrow::int64()); + case NScheme::NTypeIds::Uint8: + return arrow::field(name, arrow::uint8()); + case NScheme::NTypeIds::Uint16: + return arrow::field(name, arrow::uint16()); + case NScheme::NTypeIds::Uint32: + return arrow::field(name, arrow::uint32()); + case NScheme::NTypeIds::Uint64: + return arrow::field(name, arrow::uint64()); + case NScheme::NTypeIds::Float: + return arrow::field(name, arrow::float32()); + case NScheme::NTypeIds::Double: + return arrow::field(name, arrow::float64()); + case NScheme::NTypeIds::String: + return arrow::field(name, arrow::binary()); + case NScheme::NTypeIds::Utf8: + return arrow::field(name, arrow::utf8()); + case NScheme::NTypeIds::Json: + return arrow::field(name, arrow::binary()); + case NScheme::NTypeIds::Yson: + return arrow::field(name, arrow::binary()); + case NScheme::NTypeIds::Date: + return arrow::field(name, arrow::uint16()); + case NScheme::NTypeIds::Datetime: + return arrow::field(name, arrow::uint32()); + case NScheme::NTypeIds::Timestamp: + return arrow::field(name, arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)); + case NScheme::NTypeIds::Interval: + return arrow::field(name, arrow::duration(arrow::TimeUnit::TimeUnit::MICRO)); + case NScheme::NTypeIds::JsonDocument: + return arrow::field(name, arrow::binary()); + } + return nullptr; + } + }; + + private: + NYdb::NTable::TTableClient TableClient; + NYdb::NLongTx::TClient LongTxClient; + NYdb::NTable::TSession Session; + + public: + TTestHelper(const TKikimrRunner& kikimr) + : TableClient(kikimr.GetTableClient()) + , LongTxClient(kikimr.GetDriver()) + , Session(TableClient.CreateSession().GetValueSync().GetSession()) + {} + + NYdb::NTable::TSession& GetSession() { + return Session; + } + + void CreateTable(const TColumnTable& table) { + auto result = Session.ExecuteSchemeQuery(table.BuildQuery()).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + void InsertData(const TColumnTable& table, TTestHelper::TUpdatesBuilder& updates) { + NLongTx::TLongTxBeginResult resBeginTx = LongTxClient.BeginWriteTx().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(resBeginTx.Status().GetStatus(), EStatus::SUCCESS, resBeginTx.Status().GetIssues().ToString()); + + auto txId = resBeginTx.GetResult().tx_id(); + auto batch = updates.BuildArrow(); + TString data = NArrow::SerializeBatchNoCompression(batch); + + NLongTx::TLongTxWriteResult resWrite = + LongTxClient.Write(txId, table.GetName(), txId, data, Ydb::LongTx::Data::APACHE_ARROW).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(resWrite.Status().GetStatus(), EStatus::SUCCESS, resWrite.Status().GetIssues().ToString()); + + NLongTx::TLongTxCommitResult resCommitTx = LongTxClient.CommitTx(txId).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(resCommitTx.Status().GetStatus(), EStatus::SUCCESS, resCommitTx.Status().GetIssues().ToString()); + } + + void ReadData(const TString& query, const TString& expected) { + auto it = TableClient.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + UNIT_ASSERT_NO_DIFF(ReformatYson(result), ReformatYson(expected)); + } + }; + + Y_UNIT_TEST(AddColumn) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TKikimrRunner kikimr(runnerSettings); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + + TTestHelper testHelper(kikimr); + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(1).Add("test_res_1").AddNull(); + tableInserter.AddRow().Add(2).Add("test_res_2").Add(123); + testHelper.InsertData(testTable, tableInserter); + } + + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;[\"test_res_1\"]]]"); + + { + schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64)); + auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "` ADD COLUMN new_column Uint64;"; + auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + + { + auto settings = TDescribeTableSettings().WithTableStatistics(true); + auto describeResult = testHelper.GetSession().DescribeTable("/Root/ColumnTableTest", settings).GetValueSync(); + UNIT_ASSERT_C(describeResult.IsSuccess(), describeResult.GetIssues().ToString()); + + const auto& description = describeResult.GetTableDescription(); + auto columns = description.GetTableColumns(); + UNIT_ASSERT_VALUES_EQUAL(columns.size(), 4); + } + + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=1", "[[1;#;#;[\"test_res_1\"]]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=1", "[[#]]"); + testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=1", "[[[\"test_res_1\"]]]"); + + { + TTestHelper::TUpdatesBuilder tableInserter(testTable.GetArrowSchema(schema)); + tableInserter.AddRow().Add(3).Add("test_res_3").Add(123).Add<uint64_t>(200); + testHelper.InsertData(testTable, tableInserter); + } + + testHelper.ReadData("SELECT * FROM `/Root/ColumnTableTest` WHERE id=3", "[[3;[123];[200u];[\"test_res_3\"]]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest` WHERE id=3", "[[[200u]]]"); + testHelper.ReadData("SELECT resource_id FROM `/Root/ColumnTableTest` WHERE id=3", "[[[\"test_res_3\"]]]"); + testHelper.ReadData("SELECT new_column FROM `/Root/ColumnTableTest`", "[[#];[#];[[200u]]]"); + } + + Y_UNIT_TEST(AddColumnErrors) { + TKikimrSettings runnerSettings; + runnerSettings.WithSampleTables = false; + TKikimrRunner kikimr(runnerSettings); + + TVector<TTestHelper::TColumnSchema> schema = { + TTestHelper::TColumnSchema().SetName("id").SetType(NScheme::NTypeIds::Int32).SetNullable(false), + TTestHelper::TColumnSchema().SetName("resource_id").SetType(NScheme::NTypeIds::Utf8), + TTestHelper::TColumnSchema().SetName("level").SetType(NScheme::NTypeIds::Int32) + }; + + TTestHelper testHelper(kikimr); + TTestHelper::TColumnTable testTable; + + testTable.SetName("/Root/ColumnTableTest").SetPrimaryKey({"id"}).SetSharding({"id"}).SetSchema(schema); + testHelper.CreateTable(testTable); + + { + schema.push_back(TTestHelper::TColumnSchema().SetName("new_column").SetType(NScheme::NTypeIds::Uint64).SetNullable(false)); + auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`ADD COLUMN new_column Uint64 NOT NULL;"; + auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SCHEME_ERROR, alterResult.GetIssues().ToString()); + } + } +} + + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 2345be01eff..cff05627660 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -47,8 +47,6 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { txc.DB.NoMoreReadsForTx(); auto& record = Proto(Ev->Get()); - const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId())); - ui64 metaShard = record.GetTxInitiator(); NOlap::TReadDescription read(NOlap::TSnapshot(record.GetPlanStep(), record.GetTxId()), false); @@ -56,6 +54,8 @@ bool TTxRead::Execute(TTransactionContext& txc, const TActorContext& ctx) { read.ReadNothing = !(Self->TablesManager.HasTable(read.PathId)); read.ColumnIds = ProtoToVector<ui32>(record.GetColumnIds()); read.ColumnNames = ProtoToVector<TString>(record.GetColumnNames()); + + const NOlap::TIndexInfo& indexInfo = Self->TablesManager.GetIndexInfo(read.GetSnapshot()); if (read.ColumnIds.empty() && read.ColumnNames.empty()) { auto allColumnNames = indexInfo.ArrowSchema()->field_names(); read.ColumnNames.assign(allColumnNames.begin(), allColumnNames.end()); diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 19d7195fb87..464c6902169 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -62,6 +62,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) NOlap::TSnapshot snapshot = changes->ApplySnapshot; if (snapshot.IsZero()) { snapshot = NOlap::TSnapshot(Self->LastPlannedStep, Self->LastPlannedTxId); + Y_VERIFY(Ev->Get()->IndexInfo.GetLastSchema()->GetSnapshot() <= snapshot); } TBlobGroupSelector dsGroupSelector(Self->Info()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 4004ca1511e..cbf3431927a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -746,7 +746,7 @@ std::unique_ptr<TEvPrivate::TEvIndexing> TColumnShard::SetupIndexation() { return {}; } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); ActiveIndexingOrCompaction = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterIndexing, std::move(cachedBlobs)); @@ -784,7 +784,7 @@ std::unique_ptr<TEvPrivate::TEvCompaction> TColumnShard::SetupCompaction() { return {}; } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); ActiveIndexingOrCompaction = true; auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move(actualIndexInfo), indexChanges, Settings.CacheDataAfterCompaction); @@ -828,9 +828,9 @@ std::unique_ptr<TEvPrivate::TEvEviction> TColumnShard::SetupTtl(const THashMap<u LOG_S_DEBUG("Evicting path " << i.first << " with " << i.second.GetDebugString() << " at tablet " << TabletID()); } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges; - indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, actualIndexInfo.ArrowSchema()); + indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, actualIndexInfo.GetLastSchema()->GetIndexInfo().ArrowSchema()); if (!indexChanges) { LOG_S_DEBUG("Cannot prepare TTL at tablet " << TabletID()); @@ -893,7 +893,7 @@ std::unique_ptr<TEvPrivate::TEvWriteIndex> TColumnShard::SetupCleanup() { return {}; } - auto actualIndexInfo = TablesManager.GetIndexInfo(); + auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndex(); #if 0 // No need for now if (Tiers) { ... diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h index 685f6c47888..05a23aca572 100644 --- a/ydb/core/tx/columnshard/columnshard_private_events.h +++ b/ydb/core/tx/columnshard/columnshard_private_events.h @@ -27,7 +27,7 @@ struct TEvPrivate { /// Common event for Indexing and GranuleCompaction: write index data in TTxWriteIndex transaction. struct TEvWriteIndex : public TEventLocal<TEvWriteIndex, EvWriteIndex> { NKikimrProto::EReplyStatus PutStatus = NKikimrProto::UNKNOWN; - NOlap::TIndexInfo IndexInfo; + NOlap::TVersionedIndex IndexInfo; THashMap<ui64, NKikimr::NOlap::TTiering> Tiering; std::shared_ptr<NOlap::TColumnEngineChanges> IndexChanges; THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CachedBlobs; @@ -40,7 +40,7 @@ struct TEvPrivate { bool CacheData{false}; TDuration Duration; - TEvWriteIndex(NOlap::TIndexInfo&& indexInfo, + TEvWriteIndex(NOlap::TVersionedIndex&& indexInfo, std::shared_ptr<NOlap::TColumnEngineChanges> indexChanges, bool cacheData, THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>>&& cachedBlobs = {}) diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 444fe64427e..faac7b563e6 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -339,17 +339,14 @@ class TVersionedIndex { std::map<TSnapshot, ISnapshotSchema::TPtr> Snapshots; public: ISnapshotSchema::TPtr GetSchema(const TSnapshot& version) const { - Y_UNUSED(version); - return GetLastSchema(); - /* - for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) { - if (it->first <= version) { - return it->second; - } + for (auto it = Snapshots.rbegin(); it != Snapshots.rend(); ++it) { + if (it->first <= version) { + return it->second; } - Y_VERIFY(false); - return nullptr; - */ + } + Y_VERIFY(!Snapshots.empty()); + Y_VERIFY(version.IsZero()); + return Snapshots.begin()->second; // For old compaction logic compatibility } ISnapshotSchema::TPtr GetLastSchema() const { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index ec5b3b2579c..b99a5951f3d 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -27,7 +27,7 @@ bool InitInGranuleMerge(const TMark& granuleMark, std::vector<TPortionInfo>& por for (const auto& portionInfo : portions) { if (portionInfo.IsInserted()) { ++insertedCount; - if (portionInfo.Snapshot().GetPlanStep() > oldTimePlanStep) { + if (portionInfo.GetSnapshot().GetPlanStep() > oldTimePlanStep) { ++insertedNew; } } else if (portionInfo.BlobsSizes().second >= limits.GoodBlobSize) { @@ -602,7 +602,7 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const T } isClean = false; - if (info.XSnapshot() < snapshot) { + if (info.GetXSnapshot() < snapshot) { affectedRecords += info.NumRecords(); changes->PortionsToDrop.push_back(info); } @@ -1204,8 +1204,8 @@ TMap<TSnapshot, std::vector<ui64>> TColumnEngineForLogs::GetOrderedPortions(ui64 continue; } - TSnapshot recSnapshot = portionInfo.Snapshot(); - TSnapshot recXSnapshot = portionInfo.XSnapshot(); + TSnapshot recSnapshot = portionInfo.GetSnapshot(); + TSnapshot recXSnapshot = portionInfo.GetXSnapshot(); bool visible = (recSnapshot <= snapshot); if (recXSnapshot.GetPlanStep()) { diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp index e4ed20b5cb8..4bccce87cc3 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.cpp +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.cpp @@ -59,19 +59,21 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, Y_VERIFY(!evictFeatures.NeedExport); - auto schema = IndexInfo.ArrowSchemaWithSpecials(); - auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, srcBlobs); + TPortionInfo undo = portionInfo; + + auto blobSchema = IndexInfo.GetSchema(undo.GetSnapshot()); + auto resultSchema = IndexInfo.GetLastSchema(); + auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, srcBlobs); auto writeOptions = WriteOptions(*compression); - TPortionInfo undo = portionInfo; size_t undoSize = newBlobs.size(); for (auto& rec : portionInfo.Records) { - auto colName = IndexInfo.GetColumnName(rec.ColumnId); - std::string name(colName.data(), colName.size()); - auto field = schema->GetFieldByName(name); + auto pos = resultSchema->GetFieldIndex(rec.ColumnId); + Y_VERIFY(pos >= 0); + auto field = resultSchema->GetField(pos); - auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(name), field, writeOptions); + auto blob = TPortionInfo::SerializeColumn(batch->GetColumnByName(field->name()), field, writeOptions); if (blob.size() >= TPortionInfo::BLOB_BYTES_LIMIT) { portionInfo = undo; newBlobs.resize(undoSize); @@ -85,7 +87,7 @@ bool TEvictionLogic::UpdateEvictedPortion(TPortionInfo& portionInfo, evictedRecords.emplace_back(std::move(rec)); } - portionInfo.AddMetadata(IndexInfo, batch, evictFeatures.TargetTierName); + portionInfo.AddMetadata(*resultSchema, batch, evictFeatures.TargetTierName); return true; } @@ -95,11 +97,12 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI const TSnapshot& minSnapshot, std::vector<TString>& blobs) const { Y_VERIFY(batch->num_rows()); - const auto schema = IndexInfo.ArrowSchemaWithSpecials(); + + auto resultSchema = IndexInfo.GetSchema(minSnapshot); std::vector<TPortionInfo> out; TString tierName; - TCompression compression = IndexInfo.GetDefaultCompression(); + TCompression compression = resultSchema->GetIndexInfo().GetDefaultCompression(); if (pathId) { if (auto* tiering = GetTieringMap().FindPtr(pathId)) { tierName = tiering->GetHottestTierName(); @@ -115,16 +118,16 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI Y_VERIFY(portionBatch->num_rows()); TPortionInfo portionInfo; - portionInfo.Records.reserve(schema->num_fields()); + portionInfo.Records.reserve(resultSchema->GetSchema()->num_fields()); std::vector<TString> portionBlobs; - portionBlobs.reserve(schema->num_fields()); + portionBlobs.reserve(resultSchema->GetSchema()->num_fields()); // Serialize portion's columns into blobs bool ok = true; - for (const auto& field : schema->fields()) { + for (const auto& field : resultSchema->GetSchema()->fields()) { const auto& name = field->name(); - ui32 columnId = IndexInfo.GetColumnId(TString(name.data(), name.size())); + ui32 columnId = resultSchema->GetIndexInfo().GetColumnId(TString(name.data(), name.size())); /// @warnign records are not valid cause of empty BlobId and zero Portion TColumnRecord record = TColumnRecord::Make(granule, columnId, minSnapshot, 0); @@ -140,7 +143,7 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI } if (ok) { - portionInfo.AddMetadata(IndexInfo, portionBatch, tierName); + portionInfo.AddMetadata(*resultSchema, portionBatch, tierName); out.emplace_back(std::move(portionInfo)); for (auto& blob : portionBlobs) { blobs.push_back(blob); @@ -162,14 +165,12 @@ std::vector<TPortionInfo> TIndexLogicBase::MakeAppendedPortions(const ui64 pathI std::vector<std::shared_ptr<arrow::RecordBatch>> TCompactionLogic::PortionsToBatches(const std::vector<TPortionInfo>& portions, const THashMap<TBlobRange, TString>& blobs, bool insertedOnly) const { - // TODO: schema changes - const std::shared_ptr<arrow::Schema> schema = IndexInfo.ArrowSchemaWithSpecials(); - std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(portions.size()); - + auto resultSchema = IndexInfo.GetLastSchema(); for (auto& portionInfo : portions) { - auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, blobs); + auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot()); + auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); if (!insertedOnly || portionInfo.IsInserted()) { batches.push_back(batch); } @@ -224,17 +225,22 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange auto changes = std::static_pointer_cast<TColumnEngineForLogs::TChanges>(indexChanges); Y_VERIFY(!changes->DataToIndex.empty()); Y_VERIFY(changes->AppendedPortions.empty()); - Y_VERIFY(IndexInfo.IsSorted()); - TSnapshot& minSnapshot = changes->ApplySnapshot; - THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; + + TSnapshot minSnapshot = changes->ApplySnapshot; for (auto& inserted : changes->DataToIndex) { TSnapshot insertSnap = inserted.GetSnapshot(); Y_VERIFY(insertSnap.Valid()); if (minSnapshot.IsZero() || insertSnap <= minSnapshot) { minSnapshot = insertSnap; } - + } + Y_VERIFY(minSnapshot.Valid()); + auto& indexInfo = IndexInfo.GetSchema(minSnapshot)->GetIndexInfo(); + Y_VERIFY(indexInfo.IsSorted()); + + THashMap<ui64, std::vector<std::shared_ptr<arrow::RecordBatch>>> pathBatches; + for (auto& inserted : changes->DataToIndex) { TBlobRange blobRange(inserted.BlobId, 0, inserted.BlobId.BlobSize()); std::shared_ptr<arrow::RecordBatch> batch; @@ -242,18 +248,16 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange batch = it->second; } else if (auto* blobData = changes->Blobs.FindPtr(blobRange)) { Y_VERIFY(!blobData->empty(), "Blob data not present"); - batch = NArrow::DeserializeBatch(*blobData, IndexInfo.ArrowSchema()); + batch = NArrow::DeserializeBatch(*blobData, indexInfo.ArrowSchema()); } else { Y_VERIFY(blobData, "Data for range %s has not been read", blobRange.ToString().c_str()); } Y_VERIFY(batch); - batch = AddSpecials(batch, IndexInfo, inserted); + batch = AddSpecials(batch, indexInfo, inserted); pathBatches[inserted.PathId].push_back(batch); - Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), IndexInfo.GetReplaceKey())); + Y_VERIFY_DEBUG(NArrow::IsSorted(pathBatches[inserted.PathId].back(), indexInfo.GetReplaceKey())); } - Y_VERIFY(minSnapshot.Valid()); - std::vector<TString> blobs; for (auto& [pathId, batches] : pathBatches) { @@ -265,13 +269,13 @@ std::vector<TString> TIndexationLogic::Apply(std::shared_ptr<TColumnEngineChange Y_VERIFY(merged); Y_VERIFY_DEBUG(NArrow::IsSorted(merged, indexInfo.GetReplaceKey())); #else - auto merged = NArrow::CombineSortedBatches(batches, IndexInfo.SortReplaceDescription()); + auto merged = NArrow::CombineSortedBatches(batches, indexInfo.SortReplaceDescription()); Y_VERIFY(merged); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, IndexInfo.GetReplaceKey())); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(merged, indexInfo.GetReplaceKey())); #endif - auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], IndexInfo); + auto granuleBatches = SliceIntoGranules(merged, changes->PathToGranule[pathId], indexInfo); for (auto& [granule, batch] : granuleBatches) { auto portions = MakeAppendedPortions(pathId, batch, granule, minSnapshot, blobs); Y_VERIFY(portions.size() > 0); @@ -291,17 +295,17 @@ std::shared_ptr<arrow::RecordBatch> TCompactionLogic::CompactInOneGranule(ui64 g std::vector<std::shared_ptr<arrow::RecordBatch>> batches; batches.reserve(portions.size()); - auto schema = IndexInfo.ArrowSchemaWithSpecials(); + auto resultSchema = IndexInfo.GetLastSchema(); for (auto& portionInfo : portions) { Y_VERIFY(!portionInfo.Empty()); Y_VERIFY(portionInfo.Granule() == granule); - - auto batch = portionInfo.AssembleInBatch(IndexInfo, schema, blobs); + auto blobSchema = IndexInfo.GetSchema(portionInfo.GetSnapshot()); + auto batch = portionInfo.AssembleInBatch(*blobSchema, *resultSchema, blobs); batches.push_back(batch); } - auto sortedBatch = NArrow::CombineSortedBatches(batches, IndexInfo.SortReplaceDescription()); - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, IndexInfo.GetReplaceKey())); + auto sortedBatch = NArrow::CombineSortedBatches(batches, resultSchema->GetIndexInfo().SortReplaceDescription()); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(sortedBatch, resultSchema->GetIndexInfo().GetReplaceKey())); return sortedBatch; } @@ -315,10 +319,11 @@ std::vector<TString> TCompactionLogic::CompactInGranule(std::shared_ptr<TColumnE ui64 granule = switchedProtions[0].Granule(); auto batch = CompactInOneGranule(granule, switchedProtions, changes->Blobs); + auto resultSchema = IndexInfo.GetLastSchema(); std::vector<TPortionInfo> portions; if (!changes->MergeBorders.Empty()) { Y_VERIFY(changes->MergeBorders.GetOrderedMarks().size() > 1); - auto slices = changes->MergeBorders.SliceIntoGranules(batch, IndexInfo); + auto slices = changes->MergeBorders.SliceIntoGranules(batch, resultSchema->GetIndexInfo()); portions.reserve(slices.size()); for (auto& [_, slice] : slices) { @@ -577,6 +582,8 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr std::vector<TString> blobs; + auto resultSchema = IndexInfo.GetLastSchema(); + if (movedRows) { Y_VERIFY(changes->PortionsToMove.size() >= 2); Y_VERIFY(changes->PortionsToMove.size() == tsIds.size()); @@ -628,7 +635,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr for (size_t i = 0; i < portions.size(); ++i) { auto& portion = portions[i]; auto& batch = srcBatches[i]; - auto slices = marksGranules.SliceIntoGranules(batch, IndexInfo); + auto slices = marksGranules.SliceIntoGranules(batch, resultSchema->GetIndexInfo()); THashSet<ui64> ids; for (auto& [id, slice] : slices) { @@ -672,7 +679,7 @@ std::vector<TString> TCompactionLogic::CompactSplitGranule(const std::shared_ptr } } } else { - auto batches = SliceGranuleBatches(IndexInfo, *changes, srcBatches, ts0); + auto batches = SliceGranuleBatches(resultSchema->GetIndexInfo(), *changes, srcBatches, ts0); changes->SetTmpGranule(pathId, ts0); for (auto& [ts, batch] : batches) { diff --git a/ydb/core/tx/columnshard/engines/index_logic_logs.h b/ydb/core/tx/columnshard/engines/index_logic_logs.h index ecd2100dbef..91471217ee3 100644 --- a/ydb/core/tx/columnshard/engines/index_logic_logs.h +++ b/ydb/core/tx/columnshard/engines/index_logic_logs.h @@ -8,19 +8,20 @@ namespace NKikimr::NOlap { class TIndexLogicBase { protected: - const TIndexInfo& IndexInfo; + const TVersionedIndex& IndexInfo; private: const THashMap<ui64, NKikimr::NOlap::TTiering>* TieringMap = nullptr; public: - TIndexLogicBase(const TIndexInfo& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap) + TIndexLogicBase(const TVersionedIndex& indexInfo, const THashMap<ui64, NKikimr::NOlap::TTiering>& tieringMap) : IndexInfo(indexInfo) , TieringMap(&tieringMap) { } - TIndexLogicBase(const TIndexInfo& indexInfo) - : IndexInfo(indexInfo) { + TIndexLogicBase(const TVersionedIndex& indexInfo) + : IndexInfo(indexInfo) + { } virtual ~TIndexLogicBase() { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index ebc5a0fb4b1..edccbfcad1f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -190,7 +190,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& // (do not filter snapshot columns) auto loadSchema = ReadMetadata->GetLoadSchema(snapshot); - auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema); + auto batch = NArrow::ExtractExistedColumns(srcBatch, loadSchema->GetSchema()); Y_VERIFY(batch); auto filter = FilterNotIndexed(batch, *ReadMetadata); @@ -200,7 +200,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& auto preparedBatch = batch; preparedBatch = TIndexInfo::AddSpecialColumns(preparedBatch, snapshot); - preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema); + preparedBatch = NArrow::ExtractColumns(preparedBatch, loadSchema->GetSchema()); Y_VERIFY(preparedBatch); filter.Apply(preparedBatch); diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp index 5ac90c67303..55c36d4c5d8 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portion_info.cpp @@ -44,11 +44,12 @@ void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& Meta.ColumnMeta[columnId].Max = NArrow::GetScalar(column, minMaxPos.second); } -void TPortionInfo::AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch, +void TPortionInfo::AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName) { TierName = tierName; Meta = {}; + auto& indexInfo = snapshotSchema.GetIndexInfo(); /// @note It does not add RawBytes info for snapshot columns, only for user ones. for (auto& [columnId, col] : indexInfo.Columns) { auto column = batch->GetColumnByName(col.Name); diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h index c70e4d7dcf6..97a680aefc2 100644 --- a/ydb/core/tx/columnshard/engines/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portion_info.h @@ -56,6 +56,55 @@ public: } }; +class TFilteredSnapshotSchema : public ISnapshotSchema { + ISnapshotSchema::TPtr OriginalSnapshot; + std::shared_ptr<arrow::Schema> Schema; + std::set<ui32> ColumnIds; +public: + TFilteredSnapshotSchema(ISnapshotSchema::TPtr originalSnapshot, const std::vector<ui32>& columnIds) + : TFilteredSnapshotSchema(originalSnapshot, std::set(columnIds.begin(), columnIds.end())) + {} + + TFilteredSnapshotSchema(ISnapshotSchema::TPtr originalSnapshot, const std::set<ui32>& columnIds) + : OriginalSnapshot(originalSnapshot) + , ColumnIds(columnIds) + { + std::vector<std::shared_ptr<arrow::Field>> schemaFields; + for (auto&& i : OriginalSnapshot->GetSchema()->fields()) { + if (!ColumnIds.contains(OriginalSnapshot->GetIndexInfo().GetColumnId(i->name()))) { + continue; + } + schemaFields.emplace_back(i); + } + Schema = std::make_shared<arrow::Schema>(schemaFields); + } + + int GetFieldIndex(const ui32 columnId) const override { + if (!ColumnIds.contains(columnId)) { + return -1; + } + TString columnName = OriginalSnapshot->GetIndexInfo().GetColumnName(columnId); + std::string name(columnName.data(), columnName.size()); + return Schema->GetFieldIndex(name); + } + + std::shared_ptr<arrow::Field> GetField(const int index) const override { + return Schema->field(index); + } + + const std::shared_ptr<arrow::Schema>& GetSchema() const override { + return Schema; + } + + const TIndexInfo& GetIndexInfo() const override { + return OriginalSnapshot->GetIndexInfo(); + } + + const TSnapshot& GetSnapshot() const override { + return OriginalSnapshot->GetSnapshot(); + } +}; + struct TPortionMeta { // NOTE: These values are persisted in LocalDB so they must be stable enum EProduced : ui32 { @@ -139,20 +188,20 @@ struct TPortionInfo { } } - TSnapshot Snapshot() const { + TSnapshot GetSnapshot() const { Y_VERIFY(!Empty()); auto& rec = Records[0]; return TSnapshot(rec.PlanStep, rec.TxId); } - TSnapshot XSnapshot() const { + TSnapshot GetXSnapshot() const { Y_VERIFY(!Empty()); auto& rec = Records[0]; return TSnapshot(rec.XPlanStep, rec.XTxId); } bool IsActive() const { - return XSnapshot().IsZero(); + return GetXSnapshot().IsZero(); } std::pair<ui32, ui32> BlobsSizes() const { @@ -208,7 +257,7 @@ struct TPortionInfo { TString GetMetadata(const TColumnRecord& rec) const; void LoadMetadata(const TIndexInfo& indexInfo, const TColumnRecord& rec); - void AddMetadata(const TIndexInfo& indexInfo, const std::shared_ptr<arrow::RecordBatch>& batch, + void AddMetadata(const ISnapshotSchema& snapshotSchema, const std::shared_ptr<arrow::RecordBatch>& batch, const TString& tierName); void AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted); @@ -363,6 +412,7 @@ public: private: std::vector<TPreparedColumn> Columns; std::shared_ptr<arrow::Schema> Schema; + size_t RowsCount = 0; public: struct TAssembleOptions { @@ -398,9 +448,14 @@ public: return Columns.size(); } - TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema) + size_t GetRowsCount() const { + return RowsCount; + } + + TPreparedBatchData(std::vector<TPreparedColumn>&& columns, std::shared_ptr<arrow::Schema> schema, const size_t rowsCount) : Columns(std::move(columns)) , Schema(schema) + , RowsCount(rowsCount) { } @@ -408,42 +463,45 @@ public: }; template <class TExternalBlobInfo> - TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo, - const std::shared_ptr<arrow::Schema>& schema, - const THashMap<TBlobRange, TExternalBlobInfo>& blobsData, const std::optional<std::set<ui32>>& columnIds) const { - // Correct records order - TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks - - std::vector<std::shared_ptr<arrow::Field>> schemaFields; + TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, + const THashMap<TBlobRange, TExternalBlobInfo>& blobsData) const { + std::vector<TPreparedColumn> columns; + columns.reserve(resultSchema.GetSchema()->num_fields()); - for (auto&& i : schema->fields()) { - if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) { - continue; - } - schemaFields.emplace_back(i); + Y_VERIFY(!Meta.ColumnMeta.empty()); + const ui32 rowsCount = Meta.ColumnMeta.begin()->second.NumRows; + const auto& indexInfo = resultSchema.GetIndexInfo(); + for (auto&& field : resultSchema.GetSchema()->fields()) { + columns.emplace_back(TPreparedColumn(field, {TAssembleBlobInfo(rowsCount)}, indexInfo.GetColumnId(field->name()))); } + TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks + TMap<size_t, size_t> positionsMap; + for (auto& rec : Records) { - if (columnIds && !columnIds->contains(rec.ColumnId)) { + auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId); + if (resulPos < 0) { continue; } - ui32 columnId = rec.ColumnId; - TString columnName = indexInfo.GetColumnName(columnId); - std::string name(columnName.data(), columnName.size()); - int pos = schema->GetFieldIndex(name); - if (pos < 0) { - continue; // no such column in schema - do not need it + auto pos = dataSchema.GetFieldIndex(rec.ColumnId); + Y_ASSERT(pos >= 0); + positionsMap[resulPos] = pos; + columnChunks[resulPos][rec.Chunk] = rec.BlobRange; + auto columnMeta = Meta.ColumnMeta.FindPtr(rec.ColumnId); + if (columnMeta) { + Y_VERIFY_S(rowsCount == columnMeta->NumRows, TStringBuilder() << "Inconsistent rows " << rowsCount << "/" << columnMeta->NumRows); } - - columnChunks[pos][rec.Chunk] = rec.BlobRange; } // Make chunked arrays for columns - std::vector<TPreparedColumn> columns; - columns.reserve(columnChunks.size()); - for (auto& [pos, orderedChunks] : columnChunks) { - auto field = schema->field(pos); + Y_VERIFY(positionsMap.contains(pos)); + size_t dataPos = positionsMap[pos]; + auto portionField = dataSchema.GetField(dataPos); + auto resultField = resultSchema.GetField(pos); + + Y_VERIFY(portionField->IsCompatibleWith(*resultField)); + std::vector<TAssembleBlobInfo> blobs; blobs.reserve(orderedChunks.size()); ui32 expected = 0; @@ -456,16 +514,17 @@ public: blobs.emplace_back(it->second); } - columns.emplace_back(TPreparedColumn(field, std::move(blobs), indexInfo.GetColumnId(field->name()))); + Y_VERIFY(pos < columns.size()); + columns[pos] = TPreparedColumn(resultField, std::move(blobs), indexInfo.GetColumnId(resultField->name())); } - return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields)); + return TPreparedBatchData(std::move(columns), resultSchema.GetSchema(), rowsCount); } - std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo, - const std::shared_ptr<arrow::Schema>& schema, - const THashMap<TBlobRange, TString>& data) const { - return PrepareForAssemble(indexInfo, schema, data, {}).Assemble(); + std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const ISnapshotSchema& dataSchema, + const ISnapshotSchema& resultSchema, + const THashMap<TBlobRange, TString>& data) const { + return PrepareForAssemble(dataSchema, resultSchema, data).Assemble(); } static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array, diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 1f320b18fae..f66481c1d0d 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -28,8 +28,15 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard Y_VERIFY(WaitIndexed.empty()); Y_VERIFY(PortionInfo->Produced()); Y_VERIFY(!FetchedInfo.GetFilteredBatch()); - auto loadSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot()); - auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->GetIndexInfo(), loadSchema, Data, CurrentColumnIds); + + auto blobSchema = readMetadata->GetLoadSchema(PortionInfo->GetSnapshot()); + ISnapshotSchema::TPtr resultSchema; + if (CurrentColumnIds) { + resultSchema= std::make_shared<TFilteredSnapshotSchema>(readMetadata->GetLoadSchema(readMetadata->GetSnapshot()), *CurrentColumnIds); + } else { + resultSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot()); + } + auto batchConstructor = PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, Data); Data.clear(); if (!FetchedInfo.GetFilter()) { return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, diff --git a/ydb/core/tx/columnshard/engines/reader/description.h b/ydb/core/tx/columnshard/engines/reader/description.h index 69f299e4493..b2928673e2a 100644 --- a/ydb/core/tx/columnshard/engines/reader/description.h +++ b/ydb/core/tx/columnshard/engines/reader/description.h @@ -36,6 +36,7 @@ public: std::vector<TString> ColumnNames; std::shared_ptr<NSsa::TProgram> AddProgram(const IColumnResolver& columnResolver, const NKikimrSSA::TProgram& program); + TReadDescription(const TSnapshot& snapshot, const bool isReverse) : Snapshot(snapshot) , PKRangesFilter(isReverse) { diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index a984369d33d..da799734056 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -94,16 +94,10 @@ bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataSto CommittedBatches.emplace(cmt.GetBlobId(), batch); } } - - auto loadSchema = GetLoadSchema(Snapshot); - if (!loadSchema) { - return false; - } - + THashSet<ui32> columnIds; - for (auto& field : loadSchema->fields()) { - TString column(field->name().data(), field->name().size()); - columnIds.insert(indexInfo.GetColumnId(column)); + for (auto& columnId : AllColumns) { + columnIds.insert(columnId); } Program = readDescription.Program; diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 56b3878581e..44aab0cd508 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -162,9 +162,18 @@ public: return AllColumns.size(); } - std::shared_ptr<arrow::Schema> GetLoadSchema(const TSnapshot& version) const { - const auto& indexInfo = IndexVersions.GetSchema(version)->GetIndexInfo(); - return indexInfo.ArrowSchema(AllColumns, true); + ISnapshotSchema::TPtr GetSnapshotSchema(const TSnapshot& version) const { + if (version >=Snapshot){ + return ResultIndexSchema; + } + return IndexVersions.GetSchema(version); + } + + ISnapshotSchema::TPtr GetLoadSchema(const std::optional<TSnapshot>& version = {}) const { + if (!version) { + return make_shared<TFilteredSnapshotSchema>(ResultIndexSchema, AllColumns); + } + return make_shared<TFilteredSnapshotSchema>(IndexVersions.GetSchema(*version), AllColumns); } std::shared_ptr<arrow::Schema> GetBlobSchema(const TSnapshot& version) const { @@ -185,7 +194,7 @@ public: std::vector<std::string> GetColumnsOrder() const { auto loadSchema = GetLoadSchema(Snapshot); std::vector<std::string> result; - for (auto&& i : loadSchema->fields()) { + for (auto&& i : loadSchema->GetSchema()->fields()) { result.emplace_back(i->name()); } return result; diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 36e22f2699d..48d007f2a34 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -280,7 +280,7 @@ bool Insert(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, changes->Blobs.insert(blobs.begin(), blobs.end()); - TIndexationLogic logic(engine.GetIndexInfo()); + TIndexationLogic logic(engine.GetVersionedIndex()); std::vector<TString> newBlobs = logic.Apply(changes); UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), 1); UNIT_ASSERT_VALUES_EQUAL(newBlobs.size(), testColumns.size() + 2); // add 2 columns: planStep, txId @@ -316,7 +316,7 @@ bool Compact(TColumnEngineForLogs& engine, TTestDbWrapper& db, TSnapshot snap, T UNIT_ASSERT_VALUES_EQUAL(changes->SwitchedPortions.size(), expected.SrcPortions); changes->SetBlobs(std::move(blobs)); - TCompactionLogic logic(engine.GetIndexInfo()); + TCompactionLogic logic(engine.GetVersionedIndex()); std::vector<TString> newBlobs = logic.Apply(changes); UNIT_ASSERT_VALUES_EQUAL(changes->AppendedPortions.size(), expected.NewPortions); AddIdsToBlobs(newBlobs, changes->AppendedPortions, changes->Blobs, step); @@ -499,7 +499,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // compact planStep = 2; - bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4}); + bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); UNIT_ASSERT(ok); // load @@ -620,7 +620,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // compact planStep = 2; - bool ok = Compact(engine, db, TSnapshot{planStep, 1}, std::move(blobs), step, {23, 5, 5}); + bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {23, 5, 5}); UNIT_ASSERT(ok); // success write after compaction @@ -680,7 +680,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // compact planStep = 2; - bool ok = Compact(tableInfo, db, TSnapshot{planStep, 1}, std::move(blobs), step, {20, 4, 4}); + bool ok = Compact(tableInfo, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); UNIT_ASSERT(ok); // load @@ -704,7 +704,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { } // Cleanup - Cleanup(engine, db, TSnapshot{planStep, 1}, 20); + Cleanup(engine, db, TSnapshot(planStep, 1), 20); { // full scan ui64 txId = 1; diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index f0cc7d8fcc8..c84debaa779 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -222,10 +222,7 @@ bool TTablesManager::RegisterSchemaPreset(const TSchemaPreset& schemaPreset, NIc void TTablesManager::AddPresetVersion(const ui32 presetId, const TRowVersion& version, const TTableSchema& schema, NIceDb::TNiceDb& db) { Y_VERIFY(SchemaPresets.contains(presetId)); auto preset = SchemaPresets.at(presetId); - if (!preset.IsEmpty()) { - LOG_S_DEBUG("EnsureSchemaPreset for existed preset " << presetId << " at tablet " << TabletId); - return; - } + TSchemaPreset::TSchemaPresetVersionInfo versionInfo; versionInfo.SetId(presetId); versionInfo.SetSinceStep(version.Step); @@ -283,7 +280,7 @@ void TTablesManager::IndexSchemaVersion(const TRowVersion& version, const TTable } } -std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) { +std::shared_ptr<NOlap::TColumnEngineChanges> TTablesManager::StartIndexCleanup(const NOlap::TSnapshot& snapshot, ui32 maxRecords) { Y_VERIFY(PrimaryIndex); return PrimaryIndex->StartCleanup(snapshot, PathsToDrop, maxRecords); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp index 7e927ea4520..610bc8a3c5b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_table.cpp @@ -38,7 +38,7 @@ public: return false; } - if (AlterRequest.HasAlterSchema() || AlterRequest.HasAlterSchemaPresetName()) { + if (AlterRequest.HasAlterSchemaPresetName()) { errors.AddError(NKikimrScheme::StatusSchemeError, "Changing table schema is not supported"); return false; } diff --git a/ydb/core/tx/schemeshard/schemeshard_olap_types.h b/ydb/core/tx/schemeshard/schemeshard_olap_types.h index f85cc2c25d4..19c5ea65124 100644 --- a/ydb/core/tx/schemeshard/schemeshard_olap_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_olap_types.h @@ -98,7 +98,6 @@ namespace NKikimr::NSchemeShard { bool Validate(const NKikimrSchemeOp::TColumnTableSchema& opSchema, IErrorCollector& errors) const; bool ValidateTtlSettings(const NKikimrSchemeOp::TColumnDataLifeCycle& ttlSettings, IErrorCollector& errors) const; - static bool UpdateProto(NKikimrSchemeOp::TColumnTableSchema& proto, TString& errStr); static bool IsAllowedType(ui32 typeId); static bool IsAllowedFirstPkType(ui32 typeId); }; |