diff options
27 files changed, 456 insertions, 83 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h index 29dc8471c54..b33b9a13707 100644 --- a/ydb/core/formats/arrow/arrow_filter.h +++ b/ydb/core/formats/arrow/arrow_filter.h @@ -46,13 +46,13 @@ private: static ui32 CrossSize(const ui32 s1, const ui32 f1, const ui32 s2, const ui32 f2); class TMergerImpl; - void Add(const bool value, const ui32 count = 1); void Reset(const ui32 count); void ResetCaches() const { FilterPlain.reset(); FilteredCount.reset(); } public: + void Add(const bool value, const ui32 count = 1); std::optional<ui32> GetFilteredCount() const; const std::vector<bool>& BuildSimpleFilter() const; std::shared_ptr<arrow::BooleanArray> BuildArrowFilter(const ui32 expectedSize, const std::optional<ui32> startPos = {}, const std::optional<ui32> count = {}) const; diff --git a/ydb/core/formats/arrow/hash/calcer.cpp b/ydb/core/formats/arrow/hash/calcer.cpp index f5d2dc3515e..b9833216dc6 100644 --- a/ydb/core/formats/arrow/hash/calcer.cpp +++ b/ydb/core/formats/arrow/hash/calcer.cpp @@ -18,9 +18,9 @@ void TXX64::AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TSt auto& typedScalar = static_cast<const TScalar&>(*scalar); if constexpr (arrow::has_string_view<T>()) { - hashCalcer.Update((const ui8*)typedScalar.value->data(), typedScalar.value->size()); + hashCalcer.Update(reinterpret_cast<const ui8*>(typedScalar.value->data()), typedScalar.value->size()); } else if constexpr (arrow::has_c_type<T>()) { - hashCalcer.Update((const ui8*)(typedScalar.data()), sizeof(T)); + hashCalcer.Update(reinterpret_cast<const ui8*>(typedScalar.data()), sizeof(typedScalar.value)); } else { static_assert(arrow::is_decimal_type<T>()); } @@ -130,4 +130,11 @@ std::vector<std::shared_ptr<arrow::Array>> TXX64::GetColumns(const std::shared_p return columns; } +ui64 TXX64::CalcHash(const std::shared_ptr<arrow::Scalar>& scalar) { + NXX64::TStreamStringHashCalcer calcer(0); + calcer.Start(); + AppendField(scalar, calcer); + return calcer.Finish(); +} + } diff --git a/ydb/core/formats/arrow/hash/calcer.h b/ydb/core/formats/arrow/hash/calcer.h index 511c8c40187..d7f49e2ef1e 100644 --- a/ydb/core/formats/arrow/hash/calcer.h +++ b/ydb/core/formats/arrow/hash/calcer.h @@ -31,6 +31,7 @@ public: static void AppendField(const std::shared_ptr<arrow::Array>& array, const int row, NXX64::TStreamStringHashCalcer& hashCalcer); static void AppendField(const std::shared_ptr<arrow::Scalar>& scalar, NXX64::TStreamStringHashCalcer& hashCalcer); + static ui64 CalcHash(const std::shared_ptr<arrow::Scalar>& scalar); std::optional<std::vector<ui64>> Execute(const std::shared_ptr<arrow::RecordBatch>& batch) const; std::shared_ptr<arrow::Array> ExecuteToArray(const std::shared_ptr<arrow::RecordBatch>& batch, const std::string& hashFieldName) const; }; diff --git a/ydb/core/formats/arrow/ut/ut_hash.cpp b/ydb/core/formats/arrow/ut/ut_hash.cpp new file mode 100644 index 00000000000..3255d430352 --- /dev/null +++ b/ydb/core/formats/arrow/ut/ut_hash.cpp @@ -0,0 +1,60 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/hash/xx_hash.h> +#include <ydb/core/formats/arrow/hash/calcer.h> + +Y_UNIT_TEST_SUITE(Hash) { + + using namespace NKikimr::NArrow; + + Y_UNIT_TEST(ScalarBinaryHash) { + std::shared_ptr<arrow::Scalar> s1 = std::make_shared<arrow::StringScalar>("abcde"); + std::shared_ptr<arrow::Scalar> s2 = std::make_shared<arrow::StringScalar>("abcde"); + NHash::NXX64::TStreamStringHashCalcer calcer1(0); + calcer1.Start(); + NHash::TXX64::AppendField(s1, calcer1); + + NHash::NXX64::TStreamStringHashCalcer calcer2(0); + calcer2.Start(); + NHash::TXX64::AppendField(s2, calcer2); + const ui64 hash = calcer1.Finish(); + Cerr << hash << Endl; + Y_ABORT_UNLESS(hash == calcer2.Finish()); + } + + Y_UNIT_TEST(ScalarCTypeHash) { + std::shared_ptr<arrow::Scalar> s1 = std::make_shared<arrow::UInt32Scalar>(52); + std::shared_ptr<arrow::Scalar> s2 = std::make_shared<arrow::UInt32Scalar>(52); + NHash::NXX64::TStreamStringHashCalcer calcer1(0); + calcer1.Start(); + NHash::TXX64::AppendField(s1, calcer1); + + NHash::NXX64::TStreamStringHashCalcer calcer2(0); + calcer2.Start(); + NHash::TXX64::AppendField(s2, calcer2); + const ui64 hash = calcer1.Finish(); + Cerr << hash << Endl; + Y_ABORT_UNLESS(hash == calcer2.Finish()); + } + + Y_UNIT_TEST(ScalarCompositeHash) { + std::shared_ptr<arrow::Scalar> s11 = std::make_shared<arrow::StringScalar>("abcde"); + std::shared_ptr<arrow::Scalar> s12 = std::make_shared<arrow::UInt32Scalar>(52); + std::shared_ptr<arrow::Scalar> s21 = std::make_shared<arrow::StringScalar>("abcde"); + std::shared_ptr<arrow::Scalar> s22 = std::make_shared<arrow::UInt32Scalar>(52); + NHash::NXX64::TStreamStringHashCalcer calcer1(0); + calcer1.Start(); + NHash::TXX64::AppendField(s11, calcer1); + NHash::TXX64::AppendField(s12, calcer1); + + NHash::NXX64::TStreamStringHashCalcer calcer2(0); + calcer2.Start(); + NHash::TXX64::AppendField(s21, calcer2); + NHash::TXX64::AppendField(s22, calcer2); + const ui64 hash = calcer1.Finish(); + Cerr << hash << Endl; + Y_ABORT_UNLESS(hash == calcer2.Finish()); + } + + +}; diff --git a/ydb/core/formats/arrow/ut/ya.make b/ydb/core/formats/arrow/ut/ya.make index 312e9033e7e..c4c993ec322 100644 --- a/ydb/core/formats/arrow/ut/ya.make +++ b/ydb/core/formats/arrow/ut/ya.make @@ -28,6 +28,7 @@ SRCS( ut_dictionary.cpp ut_size_calcer.cpp ut_column_filter.cpp + ut_hash.cpp ) END() diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index e38258931b1..c7fa1dae577 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1229,14 +1229,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) { TLocalHelper(kikimr).CreateTestOlapTable(); auto tableClient = kikimr.GetTableClient(); - Tests::NCommon::TLoggerInit(kikimr).Initialize(); +// Tests::NCommon::TLoggerInit(kikimr).Initialize(); auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, - FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.1}`); + FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); @@ -1245,13 +1245,17 @@ Y_UNIT_TEST_SUITE(KqpOlap) { { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, - FEATURES=`{"column_names" : ["resource_id", "uid"], "false_positive_probability" : 0.2}`); + FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); } + std::vector<TString> uids; + std::vector<TString> resourceIds; + std::vector<ui32> levels; + { WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000); WriteTestData(kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000); @@ -1260,6 +1264,23 @@ Y_UNIT_TEST_SUITE(KqpOlap) { WriteTestData(kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000); WriteTestData(kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000); WriteTestData(kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000); + + const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) { + for (ui32 i = 0; i < count; ++i) { + uids.emplace_back("uid_" + ::ToString(startUid + i)); + resourceIds.emplace_back(::ToString(startRes + i)); + levels.emplace_back(i % 5); + } + }; + + filler(1000000, 300000000, 10000); + filler(1100000, 300100000, 10000); + filler(1200000, 300200000, 10000); + filler(1300000, 300300000, 10000); + filler(1400000, 300400000, 10000); + filler(2000000, 200000000, 70000); + filler(3000000, 100000000, 110000); + } { @@ -1276,7 +1297,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Cout << result << Endl; CompareYson(result, R"([[230000u;]])"); } + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0); + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0); TInstant start = Now(); ui32 compactionsStart = csController->GetCompactions().Val(); while (Now() - start < TDuration::Seconds(10)) { @@ -1295,35 +1318,97 @@ Y_UNIT_TEST_SUITE(KqpOlap) { SELECT COUNT(*) FROM `/Root/olapStore/olapTable` - WHERE resource_id = '3000008' AND level = 3 AND uid = 'uid_100000008' + WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222' )").GetValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); TString result = StreamResultToYson(it); Cout << result << Endl; + Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl; + CompareYson(result, R"([[0u;]])"); + AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0); + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 17); + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 4); + } + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber<ui32>(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT" << Endl; + sb << "COUNT(*)" << Endl; + sb << "FROM `/Root/olapStore/olapTable`" << Endl; + sb << "WHERE(" << Endl; + sb << "resource_id = '" << res << "' AND" << Endl; + sb << "uid= '" << uid << "' AND" << Endl; + sb << "level= " << level << Endl; + sb << ")"; + return sb; + }; + auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync(); + + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << " / " << csController->GetIndexesSkippedNoData().Val() << Endl; CompareYson(result, R"([[1u;]])"); - AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val()); - AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); } + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() / csController->GetIndexesSkippingOnSelect().Val() < 0.15); + + } + + Y_UNIT_TEST(IndexesModificationError) { + auto settings = TKikimrSettings() + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + + TLocalHelper(kikimr).CreateTestOlapTable(); + auto tableClient = kikimr.GetTableClient(); + + // Tests::NCommon::TLoggerInit(kikimr).Initialize(); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + { - const i64 before = csController->GetIndexesSkippingOnSelect().Val(); - auto it = tableClient.StreamExecuteScanQuery(R"( - --!syntax_v1 + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.05}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } - SELECT - COUNT(*) - FROM `/Root/olapStore/olapTable` - WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222' - )").GetValueSync(); + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["uid", "resource_id"], "false_positive_probability" : 0.05}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), EStatus::SUCCESS); + } - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - AFL_VERIFY(before != csController->GetIndexesSkippingOnSelect().Val()); - Cout << result << Endl; - CompareYson(result, R"([[0u;]])"); - AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.005}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_UNEQUAL(alterResult.GetStatus(), EStatus::SUCCESS); + } + + { + auto alterQuery = TStringBuilder() << + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_uid, TYPE=BLOOM_FILTER, + FEATURES=`{"column_names" : ["uid"], "false_positive_probability" : 0.01}`); + )"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); } + } Y_UNIT_TEST(PushdownFilter) { diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto index 92e5738a3ac..0b48c433242 100644 --- a/ydb/core/protos/ssa.proto +++ b/ydb/core/protos/ssa.proto @@ -41,7 +41,7 @@ message TProgram { } message TBloomFilterChecker { - repeated uint32 HashValues = 1; + repeated uint64 HashValues = 1; } message TOlapIndexChecker { diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 6dfe9d7e1e2..d7fbdf27d50 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -298,9 +298,10 @@ struct Schema : NIceDb::Schema { struct Blob: Column<5, NScheme::NTypeIds::String> {}; struct Offset: Column<6, NScheme::NTypeIds::Uint32> {}; struct Size: Column<7, NScheme::NTypeIds::Uint32> {}; + struct RecordsCount: Column<8, NScheme::NTypeIds::Uint32> {}; using TKey = TableKey<PathId, PortionId, IndexId, ChunkIdx>; - using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size>; + using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount>; }; using TTables = SchemaTables< @@ -618,14 +619,17 @@ class TIndexChunkLoadContext { private: YDB_READONLY_DEF(TBlobRange, BlobRange); TChunkAddress Address; + const ui32 RecordsCount; public: TIndexChunk BuildIndexChunk() const { - return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), BlobRange); + return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, BlobRange); } template <class TSource> TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector) - : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) { + : Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>()) + , RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>()) + { AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString()); TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexIndexes::Blob>(); Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size()); diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 36b099471fb..2ff9533af29 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -98,26 +98,40 @@ void TChangesWithAppend::DoCompile(TFinalizationContext& context) { } std::vector<TPortionInfoWithBlobs> TChangesWithAppend::MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, - const ui64 granule, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const { + const ui64 pathId, const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const { Y_ABORT_UNLESS(batch->num_rows()); auto resultSchema = context.SchemaVersions.GetSchema(snapshot); - std::vector<TPortionInfoWithBlobs> out; std::shared_ptr<NOlap::TSerializationStats> stats = std::make_shared<NOlap::TSerializationStats>(); if (granuleMeta) { stats = granuleMeta->BuildSerializationStats(resultSchema); } auto schema = std::make_shared<TDefaultSchemaDetails>(resultSchema, SaverContext, stats); - TRBSplitLimiter limiter(context.Counters.SplitterCounters, schema, batch, SplitSettings); + std::vector<TPortionInfoWithBlobs> out; + { + std::vector<TBatchSerializedSlice> pages = TRBSplitLimiter::BuildSimpleSlices(batch, SplitSettings, context.Counters.SplitterCounters, schema); + std::vector<TGeneralSerializedSlice> generalPages; + for (auto&& i : pages) { + std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> portionColumns = i.GetPortionChunks(); + resultSchema->GetIndexInfo().AppendIndexes(portionColumns); + generalPages.emplace_back(portionColumns, schema, context.Counters.SplitterCounters, SplitSettings); + } + + TSimilarSlicer slicer(SplitSettings.GetExpectedPortionSize()); + auto packs = slicer.Split(generalPages); - std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunkByBlobs; - std::shared_ptr<arrow::RecordBatch> portionBatch; - while (limiter.Next(chunkByBlobs, portionBatch)) { - TPortionInfoWithBlobs infoWithBlob = TPortionInfoWithBlobs::BuildByBlobs(chunkByBlobs, nullptr, granule, snapshot, SaverContext.GetStorageOperator()); - infoWithBlob.GetPortionInfo().AddMetadata(*resultSchema, portionBatch, SaverContext.GetTierName()); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_appended", infoWithBlob.GetPortionInfo().DebugString()); - out.emplace_back(std::move(infoWithBlob)); + ui32 recordIdx = 0; + for (auto&& i : packs) { + TGeneralSerializedSlice slice(std::move(i)); + auto b = batch->Slice(recordIdx, slice.GetRecordsCount()); + std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>> chunksByBlobs = slice.GroupChunksByBlobs(); + out.emplace_back(TPortionInfoWithBlobs::BuildByBlobs(chunksByBlobs, nullptr, pathId, snapshot, SaverContext.GetStorageOperator())); + NArrow::TFirstLastSpecialKeys primaryKeys(slice.GetFirstLastPKBatch(resultSchema->GetIndexInfo().GetReplaceKey())); + NArrow::TMinMaxSpecialKeys snapshotKeys(b, TIndexInfo::ArrowSchemaSnapshot()); + out.back().GetPortionInfo().AddMetadata(*resultSchema, primaryKeys, snapshotKeys, SaverContext.GetTierName()); + recordIdx += slice.GetRecordsCount(); + } } return out; diff --git a/ydb/core/tx/columnshard/engines/portions/column_record.h b/ydb/core/tx/columnshard/engines/portions/column_record.h index 3917a035227..a56197774c4 100644 --- a/ydb/core/tx/columnshard/engines/portions/column_record.h +++ b/ydb/core/tx/columnshard/engines/portions/column_record.h @@ -20,12 +20,14 @@ class TIndexChunk { private: YDB_READONLY(ui32, IndexId, 0); YDB_READONLY(ui32, ChunkIdx, 0); + YDB_READONLY(ui32, RecordsCount, 0); YDB_READONLY_DEF(TBlobRange, BlobRange); public: - TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const TBlobRange& blobRange) + TIndexChunk(const ui32 indexId, const ui32 chunkIdx, const ui32 recordsCount, const TBlobRange& blobRange) : IndexId(indexId) , ChunkIdx(chunkIdx) + , RecordsCount(recordsCount) , BlobRange(blobRange) { } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index fdf35132ff1..3deb26a0a73 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -235,6 +235,77 @@ void TPortionInfo::SaveToDatabase(IDbWrapper& db) const { } } +std::vector<NKikimr::NOlap::TPortionInfo::TPage> TPortionInfo::BuildPages() const { + std::vector<TPage> pages; + struct TPart { + public: + const TColumnRecord* Record = nullptr; + const TIndexChunk* Index = nullptr; + const ui32 RecordsCount; + TPart(const TColumnRecord* record, const ui32 recordsCount) + : Record(record) + , RecordsCount(recordsCount) { + + } + TPart(const TIndexChunk* record, const ui32 recordsCount) + : Index(record) + , RecordsCount(recordsCount) { + + } + }; + std::map<ui32, std::deque<TPart>> entities; + std::map<ui32, ui32> currentCursor; + ui32 currentSize = 0; + ui32 currentId = 0; + for (auto&& i : Records) { + if (currentId != i.GetColumnId()) { + currentSize = 0; + currentId = i.GetColumnId(); + } + currentSize += i.GetMeta().GetNumRowsVerified(); + ++currentCursor[currentSize]; + entities[i.GetColumnId()].emplace_back(&i, i.GetMeta().GetNumRowsVerified()); + } + for (auto&& i : Indexes) { + if (currentId != i.GetIndexId()) { + currentSize = 0; + currentId = i.GetIndexId(); + } + currentSize += i.GetRecordsCount(); + ++currentCursor[currentSize]; + entities[i.GetIndexId()].emplace_back(&i, i.GetRecordsCount()); + } + const ui32 entitiesCount = entities.size(); + ui32 predCount = 0; + for (auto&& i : currentCursor) { + if (i.second != entitiesCount) { + continue; + } + std::vector<const TColumnRecord*> records; + std::vector<const TIndexChunk*> indexes; + for (auto&& c : entities) { + ui32 readyCount = 0; + while (readyCount < i.first - predCount && c.second.size()) { + if (c.second.front().Record) { + records.emplace_back(c.second.front().Record); + } else { + AFL_VERIFY(c.second.front().Index); + indexes.emplace_back(c.second.front().Index); + } + readyCount += c.second.front().RecordsCount; + c.second.pop_front(); + } + AFL_VERIFY(readyCount == i.first - predCount)("ready", readyCount)("cursor", i.first)("pred_cursor", predCount); + } + pages.emplace_back(std::move(records), std::move(indexes), i.first - predCount); + predCount = i.first; + } + for (auto&& i : entities) { + AFL_VERIFY(i.second.empty()); + } + return pages; +} + std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_ABORT_UNLESS(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 225c1381c79..ffca573d8cc 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -30,6 +30,23 @@ private: YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes); public: + class TPage { + private: + YDB_READONLY_DEF(std::vector<const TColumnRecord*>, Records); + YDB_READONLY_DEF(std::vector<const TIndexChunk*>, Indexes); + YDB_READONLY(ui32, RecordsCount, 0); + public: + TPage(std::vector<const TColumnRecord*>&& records, std::vector<const TIndexChunk*>&& indexes, const ui32 recordsCount) + : Records(std::move(records)) + , Indexes(std::move(indexes)) + , RecordsCount(recordsCount) + { + + } + }; + + std::vector<TPage> BuildPages() const; + std::vector<TColumnRecord> Records; const std::vector<TColumnRecord>& GetRecords() const { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index e0118a290b0..6bc4ddf410b 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -121,6 +121,7 @@ bool TPortionDataSource::DoStartFetchingIndexes(const std::shared_ptr<IDataSourc } if (!readAction->GetExpectedBlobsSize()) { + NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed({}); return false; } @@ -136,24 +137,37 @@ void TPortionDataSource::DoAbort() { void TPortionDataSource::DoApplyIndex(const NIndexes::TIndexCheckerContainer& indexChecker) { THashMap<ui32, std::vector<TString>> indexBlobs; std::set<ui32> indexIds = indexChecker->GetIndexIds(); - for (auto&& i : Portion->GetIndexes()) { - if (!indexIds.contains(i.GetIndexId())) { - continue; +// NActors::TLogContextGuard gLog = NActors::TLogContextBuilder::Build()("records_count", GetRecordsCount())("portion_id", Portion->GetAddress().DebugString()); + std::vector<TPortionInfo::TPage> pages = Portion->BuildPages(); + NArrow::TColumnFilter constructor = NArrow::TColumnFilter::BuildAllowFilter(); + for (auto&& p : pages) { + for (auto&& i : p.GetIndexes()) { + if (!indexIds.contains(i->GetIndexId())) { + continue; + } + indexBlobs[i->GetIndexId()].emplace_back(StageData->ExtractBlob(i->GetBlobRange())); } - indexBlobs[i.GetIndexId()].emplace_back(StageData->ExtractBlob(i.GetBlobRange())); - } - for (auto&& i : indexIds) { - if (!indexBlobs.contains(i)) { - return; + for (auto&& i : indexIds) { + if (!indexBlobs.contains(i)) { + return; + } + } + if (indexChecker->Check(indexBlobs)) { + NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true); + constructor.Add(true, p.GetRecordsCount()); + } else { + NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false); + constructor.Add(false, p.GetRecordsCount()); } } - if (!indexChecker->Check(indexBlobs)) { - NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(false); + AFL_VERIFY(constructor.Size() == Portion->GetRecordsCount()); + if (constructor.IsTotalDenyFilter()) { StageData->AddFilter(NArrow::TColumnFilter::BuildDenyFilter()); + } else if (constructor.IsTotalAllowFilter()) { + return; } else { - NYDBTest::TControllers::GetColumnShardController()->OnIndexSelectProcessed(true); + StageData->AddFilter(constructor); } - return; } bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr<IDataSource>& sourcePtr, const std::shared_ptr<IFetchingStep>& step, const std::shared_ptr<TColumnsSet>& /*columns*/) { diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h index 04d6646c9b6..17bb40a2b96 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/composite.h @@ -78,7 +78,9 @@ private: static inline auto Registrator = TFactory::TRegistrator<TOrIndexChecker>(GetClassNameStatic()); protected: virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobsByIndexId) const override { +// ui32 idx = 0; for (auto&& i : Checkers) { +// NActors::TLogContextGuard gLog = NActors::TLogContextBuilder::Build()("branch", idx++); if (i->Check(blobsByIndexId)) { return true; } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp index c64d9b11a29..4debc30257d 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp @@ -9,20 +9,25 @@ namespace NKikimr::NOlap::NIndexes { void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const { - portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), bRange)); + portionInfo.AddIndex(TIndexChunk(GetEntityId(), GetChunkIdx(), RecordsCount, bRange)); } std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const { + AFL_VERIFY(data.size()); std::vector<TChunkedColumnReader> columnReaders; for (auto&& i : ColumnIds) { auto it = data.find(i); AFL_VERIFY(it != data.end()); columnReaders.emplace_back(it->second, indexInfo.GetColumnLoaderVerified(i)); } + ui32 recordsCount = 0; + for (auto&& i : data.begin()->second) { + recordsCount += i->GetRecordsCountVerified(); + } TChunkedBatchReader reader(std::move(columnReaders)); std::shared_ptr<arrow::RecordBatch> indexBatch = DoBuildIndexImpl(reader); const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch); - return std::make_shared<TPortionIndexChunk>(indexId, indexData); + return std::make_shared<TPortionIndexChunk>(indexId, recordsCount, indexData); } bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) { @@ -37,4 +42,20 @@ TIndexByColumns::TIndexByColumns(const ui32 indexId, const std::set<ui32>& colum Serializer = std::make_shared<NArrow::NSerialization::TFullDataSerializer>(arrow::ipc::IpcWriteOptions::Defaults()); } +NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(const IIndexMeta& newMeta) const { + const auto* bMeta = dynamic_cast<const TIndexByColumns*>(&newMeta); + if (!bMeta) { + return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + } + if (bMeta->ColumnIds.size() != ColumnIds.size()) { + return TConclusionStatus::Fail("columns count is different"); + } + for (auto&& i : bMeta->ColumnIds) { + if (!ColumnIds.contains(i)) { + return TConclusionStatus::Fail("columns set is different or column was recreated in database"); + } + } + return TConclusionStatus::Success(); +} + } // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h index d80da770d1b..28d8825cb10 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h @@ -5,6 +5,7 @@ #include <ydb/core/tx/columnshard/splitter/chunks.h> #include <ydb/core/protos/flat_scheme_op.pb.h> #include <ydb/services/bg_tasks/abstract/interface.h> +#include <ydb/library/conclusion/status.h> #include <library/cpp/object_factory/object_factory.h> @@ -31,6 +32,7 @@ protected: virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const = 0; virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) = 0; virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const = 0; + virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const = 0; public: using TFactory = NObjectFactory::TObjectFactory<IIndexMeta, TString>; @@ -43,6 +45,16 @@ public: } + TConclusionStatus CheckModificationCompatibility(const std::shared_ptr<IIndexMeta>& newMeta) const { + if (!newMeta) { + return TConclusionStatus::Fail("new meta cannot be absent"); + } + if (newMeta->GetClassName() != GetClassName()) { + return TConclusionStatus::Fail("new meta have to be same index class (" + GetClassName() + "), but new class name: " + newMeta->GetClassName()); + } + return DoCheckModificationCompatibility(*newMeta); + } + virtual ~IIndexMeta() = default; std::shared_ptr<IPortionDataChunk> BuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const { @@ -83,6 +95,7 @@ public: class TPortionIndexChunk: public IPortionDataChunk { private: using TBase = IPortionDataChunk; + const ui32 RecordsCount; const TString Data; protected: virtual const TString& DoGetData() const override { @@ -98,7 +111,7 @@ protected: return false; } virtual std::optional<ui32> DoGetRecordsCount() const override { - return {}; + return RecordsCount; } virtual std::shared_ptr<arrow::Scalar> DoGetFirstScalar() const override { return nullptr; @@ -108,8 +121,9 @@ protected: } virtual void DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo& portionInfo) const override; public: - TPortionIndexChunk(const ui32 entityId, const TString& data) - : TBase(entityId, 0) + TPortionIndexChunk(const ui32 entityId, const ui32 recordsCount, const TString& data) + : TBase(entityId, 0) + , RecordsCount(recordsCount) , Data(data) { } @@ -126,6 +140,9 @@ protected: virtual std::shared_ptr<IPortionDataChunk> DoBuildIndex(const ui32 indexId, std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>>& data, const TIndexInfo& indexInfo) const override final; virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) override; + + TConclusionStatus CheckSameColumnsForModification(const IIndexMeta& newMeta) const; + public: TIndexByColumns() = default; TIndexByColumns(const ui32 indexId, const std::set<ui32>& columnIds); diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h index 6fa589899a1..f50566a0ff4 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h @@ -12,7 +12,7 @@ protected: virtual bool DoCheck(const THashMap<ui32, std::vector<TString>>& blobs) const override final { auto it = blobs.find(IndexId); AFL_VERIFY(it != blobs.end()); - return DoCheckImpl(std::move(it->second)); + return DoCheckImpl(it->second); } virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) = 0; virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const = 0; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp index ac368bf8a46..368589b11c3 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp @@ -27,6 +27,7 @@ bool TBloomFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const { } } if (found) { +// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId()); return true; } } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp index f24100e81d3..3c52518d0b8 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.cpp @@ -11,18 +11,23 @@ namespace NKikimr::NOlap::NIndexes { std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { - std::vector<bool> flags; - flags.resize(BitsCount, false); + std::set<ui64> hashes; for (ui32 i = 0; i < HashesCount; ++i) { NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(3 * i); - for (; reader.IsCorrect(); reader.ReadNext()) { + for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) { hashCalcer.Start(); for (auto&& i : reader) { NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer); } - flags[hashCalcer.Finish() % BitsCount] = true; + const ui64 h = hashCalcer.Finish(); + hashes.emplace(h); } } + const ui32 bitsCount = hashes.size() / std::log(2); + std::vector<bool> flags(bitsCount, false); + for (auto&& i : hashes) { + flags[i % flags.size()] = true; + } arrow::BooleanBuilder builder; auto res = builder.Reserve(flags.size()); @@ -30,7 +35,7 @@ std::shared_ptr<arrow::RecordBatch> TBloomIndexMeta::DoBuildIndexImpl(TChunkedBa std::shared_ptr<arrow::BooleanArray> out; NArrow::TStatusValidator::Validate(builder.Finish(&out)); - return arrow::RecordBatch::Make(ResultSchema, BitsCount, {out}); + return arrow::RecordBatch::Make(ResultSchema, bitsCount, {out}); } void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const { @@ -58,7 +63,8 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataF for (auto&& i : foundColumns) { NArrow::NHash::TXX64::AppendField(i.second, calcer); } - hashes.emplace(calcer.Finish()); + const ui64 hash = calcer.Finish(); + hashes.emplace(hash); } branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes))); } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h index bc4db867d60..b077c6cfc6f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/meta.h @@ -10,20 +10,25 @@ public: private: using TBase = TIndexByColumns; std::shared_ptr<arrow::Schema> ResultSchema; - const ui64 RowsCountExpectation = 10000; double FalsePositiveProbability = 0.1; ui32 HashesCount = 0; - ui32 BitsCount = 0; static inline auto Registrator = TFactory::TRegistrator<TBloomIndexMeta>(GetClassNameStatic()); void Initialize() { AFL_VERIFY(!ResultSchema); std::vector<std::shared_ptr<arrow::Field>> fields = {std::make_shared<arrow::Field>("", arrow::TypeTraits<arrow::BooleanType>::type_singleton())}; ResultSchema = std::make_shared<arrow::Schema>(fields); - AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability > 0.01); + AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability >= 0.01); HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2); - BitsCount = RowsCountExpectation * HashesCount / std::log(2); } protected: + virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override { + const auto* bMeta = dynamic_cast<const TBloomIndexMeta*>(&newMeta); + if (!bMeta) { + return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + } + AFL_VERIFY(FalsePositiveProbability < 1 && FalsePositiveProbability >= 0.01); + return TBase::CheckSameColumnsForModification(newMeta); + } virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override; virtual std::shared_ptr<arrow::RecordBatch> DoBuildIndexImpl(TChunkedBatchReader& reader) const override; diff --git a/ydb/core/tx/columnshard/hooks/abstract/abstract.h b/ydb/core/tx/columnshard/hooks/abstract/abstract.h index 8a3530fc731..e9b9144cacb 100644 --- a/ydb/core/tx/columnshard/hooks/abstract/abstract.h +++ b/ydb/core/tx/columnshard/hooks/abstract/abstract.h @@ -82,7 +82,7 @@ public: bool OnStartCompaction(std::shared_ptr<NOlap::TColumnEngineChanges>& changes) { return DoOnStartCompaction(changes); } - virtual void OnIndexSelectProcessed(const bool /*result*/) { + virtual void OnIndexSelectProcessed(const std::optional<bool> /*result*/) { } virtual EOptimizerCompactionWeightControl GetCompactionControl() const { return EOptimizerCompactionWeightControl::Force; diff --git a/ydb/core/tx/columnshard/hooks/testing/controller.h b/ydb/core/tx/columnshard/hooks/testing/controller.h index a6680395472..7fc824508e9 100644 --- a/ydb/core/tx/columnshard/hooks/testing/controller.h +++ b/ydb/core/tx/columnshard/hooks/testing/controller.h @@ -10,6 +10,7 @@ private: YDB_READONLY(TAtomicCounter, Indexations, 0); YDB_READONLY(TAtomicCounter, IndexesSkippingOnSelect, 0); YDB_READONLY(TAtomicCounter, IndexesApprovedOnSelect, 0); + YDB_READONLY(TAtomicCounter, IndexesSkippedNoData, 0); YDB_ACCESSOR(std::optional<TDuration>, GuaranteeIndexationInterval, TDuration::Zero()); YDB_ACCESSOR(std::optional<TDuration>, PeriodicWakeupActivationPeriod, std::nullopt); YDB_ACCESSOR(std::optional<TDuration>, StatsReportInterval, std::nullopt); @@ -40,8 +41,10 @@ protected: } public: - virtual void OnIndexSelectProcessed(const bool result) override { - if (result) { + virtual void OnIndexSelectProcessed(const std::optional<bool> result) override { + if (!result) { + IndexesSkippedNoData.Inc(); + } else if (*result) { IndexesApprovedOnSelect.Inc(); } else { IndexesSkippingOnSelect.Inc(); diff --git a/ydb/core/tx/columnshard/splitter/batch_slice.h b/ydb/core/tx/columnshard/splitter/batch_slice.h index d86db7ae8ca..a97a1e1770e 100644 --- a/ydb/core/tx/columnshard/splitter/batch_slice.h +++ b/ydb/core/tx/columnshard/splitter/batch_slice.h @@ -108,6 +108,15 @@ protected: } public: + + std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> GetPortionChunks() const { + std::map<ui32, std::vector<std::shared_ptr<IPortionDataChunk>>> result; + for (auto&& i : Data) { + AFL_VERIFY(result.emplace(i.GetEntityId(), i.GetChunks()).second); + } + return result; + } + std::shared_ptr<arrow::RecordBatch> GetFirstLastPKBatch(const std::shared_ptr<arrow::Schema>& pkSchema) const { std::vector<std::shared_ptr<arrow::Array>> pkColumns; for (auto&& i : pkSchema->fields()) { diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 1623bd2914d..73f88f15e5a 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -139,6 +139,12 @@ public: : Chunks(chunks) , Loader(loader) { + Start(); + } + + void Start() { + CurrentChunkIndex = 0; + CurrentRecordIndex = 0; if (Chunks.size()) { CurrentChunk = Loader->ApplyVerifiedColumn(Chunks.front()->GetData()); } @@ -191,6 +197,13 @@ public: return IsCorrectFlag; } + void Start() { + IsCorrectFlag = true; + for (auto&& i : Columns) { + i.Start(); + } + } + bool ReadNext() { std::optional<bool> result; for (auto&& i : Columns) { diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp index 5fae273e44c..698a9e6f3e4 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.cpp @@ -10,20 +10,7 @@ TRBSplitLimiter::TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters , Settings(settings) { Y_ABORT_UNLESS(Batch->num_rows()); - std::vector<TBatchSerializedSlice> slices; - auto stats = schemaInfo->GetBatchSerializationStats(Batch); - ui32 recordsCount = Settings.GetMinRecordsCount(); - if (stats) { - const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMinBlobSize()).value_or(recordsCount); - const ui32 recordsCountForMaxPortionSize = stats->PredictOptimalPackRecordsCount(Batch->num_rows(), Settings.GetMaxPortionSize()).value_or(recordsCount); - recordsCount = std::min(recordsCountForMaxPortionSize, std::max(recordsCount, recordsCountForMinSize)); - } - auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(Batch->num_rows(), recordsCount); - for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) { - std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize()); - TBatchSerializedSlice slice(current, schemaInfo, Counters, settings); - slices.emplace_back(std::move(slice)); - } + std::vector<TBatchSerializedSlice> slices = BuildSimpleSlices(Batch, Settings, Counters, schemaInfo); auto chunks = TSimilarSlicer(Settings.GetMinBlobSize()).Split(slices); ui32 chunkStartPosition = 0; @@ -60,4 +47,25 @@ bool TRBSplitLimiter::Next(std::vector<std::vector<std::shared_ptr<IPortionDataC Slices.pop_front(); return true; } + +std::vector<NKikimr::NOlap::TBatchSerializedSlice> TRBSplitLimiter::BuildSimpleSlices(const std::shared_ptr<arrow::RecordBatch>& batch, const TSplitSettings& settings, + const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const ISchemaDetailInfo::TPtr& schemaInfo) +{ + std::vector<TBatchSerializedSlice> slices; + auto stats = schemaInfo->GetBatchSerializationStats(batch); + ui32 recordsCount = settings.GetMinRecordsCount(); + if (stats) { + const ui32 recordsCountForMinSize = stats->PredictOptimalPackRecordsCount(batch->num_rows(), settings.GetMinBlobSize()).value_or(recordsCount); + const ui32 recordsCountForMaxPortionSize = stats->PredictOptimalPackRecordsCount(batch->num_rows(), settings.GetMaxPortionSize()).value_or(recordsCount); + recordsCount = std::min(recordsCountForMaxPortionSize, std::max(recordsCount, recordsCountForMinSize)); + } + auto linearSplitInfo = TSimpleSplitter::GetOptimalLinearSplitting(batch->num_rows(), recordsCount); + for (auto it = linearSplitInfo.StartIterator(); it.IsValid(); it.Next()) { + std::shared_ptr<arrow::RecordBatch> current = batch->Slice(it.GetPosition(), it.GetCurrentPackSize()); + TBatchSerializedSlice slice(current, schemaInfo, counters, settings); + slices.emplace_back(std::move(slice)); + } + return slices; +} + } diff --git a/ydb/core/tx/columnshard/splitter/rb_splitter.h b/ydb/core/tx/columnshard/splitter/rb_splitter.h index 0b0356b86a5..e5d25f2dcef 100644 --- a/ydb/core/tx/columnshard/splitter/rb_splitter.h +++ b/ydb/core/tx/columnshard/splitter/rb_splitter.h @@ -56,6 +56,13 @@ public: TRBSplitLimiter(std::shared_ptr<NColumnShard::TSplitterCounters> counters, ISchemaDetailInfo::TPtr schemaInfo, const std::shared_ptr<arrow::RecordBatch> batch, const TSplitSettings& settings); + static std::vector<TBatchSerializedSlice> BuildSimpleSlices(const std::shared_ptr<arrow::RecordBatch>& batch, const TSplitSettings& settings, + const std::shared_ptr<NColumnShard::TSplitterCounters>& counters, const ISchemaDetailInfo::TPtr& schemaInfo); + + std::deque<TBatchSerializedSlice> ExtractSlices() { + return std::move(Slices); + } + bool Next(std::vector<std::vector<std::shared_ptr<IPortionDataChunk>>>& portionBlobs, std::shared_ptr<arrow::RecordBatch>& batch); }; diff --git a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp index af35646873d..81aa18edff5 100644 --- a/ydb/core/tx/schemeshard/olap/indexes/schema.cpp +++ b/ydb/core/tx/schemeshard/olap/indexes/schema.cpp @@ -26,6 +26,11 @@ bool TOlapIndexSchema::ApplyUpdate(const TOlapSchema& currentSchema, const TOlap if (!object) { return false; } + auto conclusion = IndexMeta->CheckModificationCompatibility(object); + if (conclusion.IsFail()) { + errors.AddError("cannot modify index: " + conclusion.GetErrorMessage()); + return false; + } IndexMeta = NBackgroundTasks::TInterfaceProtoContainer<NOlap::NIndexes::IIndexMeta>(object); return true; } |
