diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2024-12-25 09:31:56 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-25 09:31:56 +0300 |
commit | d014966628b504e87b452019ecb9a2db047e4a46 (patch) | |
tree | 48eec788f7e39ef5a360ff86f13e8a41ad19121f | |
parent | 13c7b1798a3c35e00ad677c64e948f9fa50bf436 (diff) | |
download | ydb-d014966628b504e87b452019ecb9a2db047e4a46.tar.gz |
bloom filter for ngramms (#12893)
16 files changed, 809 insertions, 305 deletions
diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp index ec03987adf..014b5c6abe 100644 --- a/ydb/core/kqp/ut/olap/indexes_ut.cpp +++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp @@ -302,9 +302,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { } AFL_VERIFY(updatesCount + 6 == - (ui64)csController->GetActualizationRefreshSchemeCount().Val())( - "updates", updatesCount)("count", - csController->GetActualizationRefreshSchemeCount().Val()); + (ui64)csController->GetActualizationRefreshSchemeCount().Val())("updates", updatesCount)( + "count", csController->GetActualizationRefreshSchemeCount().Val()); } class TTestIndexesScenario { @@ -313,6 +312,30 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { std::unique_ptr<TKikimrRunner> Kikimr; YDB_ACCESSOR(TString, StorageId, "__DEFAULT"); + ui64 SkipStart = 0; + ui64 NoDataStart = 0; + ui64 ApproveStart = 0; + + template <class TController> + void ResetZeroLevel(TController& g) { + SkipStart = g->GetIndexesSkippingOnSelect().Val(); + ApproveStart = g->GetIndexesApprovedOnSelect().Val(); + NoDataStart = g->GetIndexesSkippedNoData().Val(); + } + + void ExecuteSQL(const TString& text, const TString& expectedResult) const { + auto tableClient = Kikimr->GetTableClient(); + auto it = tableClient.StreamExecuteScanQuery(text).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + TString result = StreamResultToYson(it); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("result", result)("expected", expectedResult); + auto* controller = NYDBTest::TControllers::GetControllerAs<NOlap::TWaitCompactionController>(); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("skip", controller->GetIndexesSkippingOnSelect().Val() - SkipStart)( + "check", controller->GetIndexesApprovedOnSelect().Val() - ApproveStart)( + "no_data", controller->GetIndexesSkippedNoData().Val() - NoDataStart); + CompareYson(result, expectedResult); + } + public: TTestIndexesScenario& Initialize() { Settings = 
TKikimrSettings().SetWithSampleTables(false); @@ -320,7 +343,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { return *this; } - void Execute() const { + void Execute() { auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>(); csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30); csController->SetOverrideMemoryLimitForPortionReading(1e+10); @@ -343,6 +366,17 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { { auto alterQuery = TStringBuilder() << Sprintf( + R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_uid, TYPE=BLOOM_NGRAMM_FILTER, + FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 64024}`); + )", + StorageId.data()); + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } + { + auto alterQuery = + TStringBuilder() << Sprintf( R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`); )", @@ -384,22 +418,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { filler(3000000, 100000000, 110000); } - { - auto it = tableClient - .StreamExecuteScanQuery(R"( - --!syntax_v1 - - SELECT - COUNT(*) - FROM `/Root/olapStore/olapTable` - )") - .GetValueSync(); - - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - Cout << result << Endl; - CompareYson(result, R"([[230000u;]])"); - } + ExecuteSQL(R"(SELECT COUNT(*) FROM `/Root/olapStore/olapTable`)", "[[230000u;]]"); AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0); AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0); @@ -417,54 
+436,103 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 21)("count", csController->GetCompactionStartedCounter().Val()); { - auto it = tableClient - .StreamExecuteScanQuery(R"( - --!syntax_v1 + ExecuteSQL(R"(SELECT COUNT(*) + FROM `/Root/olapStore/olapTable` + WHERE resource_id LIKE '%110a151' AND resource_id LIKE '110a%' AND resource_id LIKE '%dd%')", "[[0u;]]"); + AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val()); + } + { + ResetZeroLevel(csController); + ExecuteSQL(R"(SELECT COUNT(*) + FROM `/Root/olapStore/olapTable` + WHERE resource_id LIKE '%110a151%')", "[[0u;]]"); + AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val()); + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart); + } + { + ResetZeroLevel(csController); + ExecuteSQL(R"(SELECT COUNT(*) + FROM `/Root/olapStore/olapTable` + WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222')", "[[0u;]]"); - SELECT - COUNT(*) - FROM `/Root/olapStore/olapTable` - WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222' - )") - .GetValueSync(); - - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("result", result); - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("skip", csController->GetIndexesSkippingOnSelect().Val())( - "check", csController->GetIndexesApprovedOnSelect().Val()); - CompareYson(result, R"([[0u;]])"); - if (StorageId == "__LOCAL_METADATA") { - AFL_VERIFY(csController->GetIndexesSkippedNoData().Val()); - } else { - AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0)("val", csController->GetIndexesSkippedNoData().Val()); + AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 
0)("val", csController->GetIndexesSkippedNoData().Val()); + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() - ApproveStart < csController->GetIndexesSkippingOnSelect().Val() - SkipStart); + } + { + ResetZeroLevel(csController); + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber<ui32>(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; + sb << "WHERE(" << Endl; + sb << "resource_id = '" << res << "' AND" << Endl; + sb << "uid= '" << uid << "' AND" << Endl; + sb << "level= " << level << Endl; + sb << ")"; + return sb; + }; + ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]"); } - AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val()); + AFL_VERIFY((csController->GetIndexesApprovedOnSelect().Val() - ApproveStart) * 5 < csController->GetIndexesSkippingOnSelect().Val() - SkipStart) + ("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)( + "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart); } - ui32 requestsCount = 100; - for (ui32 i = 0; i < requestsCount; ++i) { - const ui32 idx = RandomNumber<ui32>(uids.size()); - const auto query = [](const TString& res, const TString& uid, const ui32 level) { - TStringBuilder sb; - sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; - sb << "WHERE(" << Endl; - sb << "resource_id = '" << res << "' AND" << Endl; - sb << "uid= '" << uid << "' AND" << Endl; - sb << "level= " << level << Endl; - sb << ")"; - return sb; - }; - auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync(); - - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - TString result = StreamResultToYson(it); - Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " 
<< csController->GetIndexesApprovedOnSelect().Val() << " / " - << csController->GetIndexesSkippedNoData().Val() << Endl; - CompareYson(result, R"([[1u;]])"); + { + ResetZeroLevel(csController); + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber<ui32>(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; + sb << "WHERE" << Endl; + sb << "resource_id LIKE '%" << res << "%'" << Endl; + return sb; + }; + ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]"); + } + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)( + "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart); + } + { + ResetZeroLevel(csController); + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber<ui32>(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; + sb << "WHERE" << Endl; + sb << "resource_id LIKE '" << res << "%'" << Endl; + return sb; + }; + ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]"); + } + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)( + "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)( + "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart); + } + { + ResetZeroLevel(csController); + ui32 requestsCount = 100; + for (ui32 i = 0; i < requestsCount; ++i) { + const ui32 idx = RandomNumber<ui32>(uids.size()); + const auto query = [](const TString& res, const TString& uid, const ui32 level) { + TStringBuilder sb; + sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl; + sb << 
"WHERE" << Endl; + sb << "resource_id LIKE '%" << res << "'" << Endl; + return sb; + }; + ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]"); + } + AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)( + "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)( + "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart); } - - AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() * 5 < csController->GetIndexesSkippingOnSelect().Val()) - ("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val()); } }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 72151d9d6d..4c52ff262e 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -391,6 +391,13 @@ message TRequestedBloomFilter { repeated string ColumnNames = 3; } +message TRequestedBloomNGrammFilter { + optional uint32 NGrammSize = 1; + optional uint32 FilterSizeBytes = 2; + optional uint32 HashesCount = 3; + optional string ColumnName = 4; +} + message TRequestedMaxIndex { optional string ColumnName = 1; } @@ -410,6 +417,7 @@ message TOlapIndexRequested { TRequestedBloomFilter BloomFilter = 40; TRequestedMaxIndex MaxIndex = 41; TRequestedCountMinSketch CountMinSketch = 42; + TRequestedBloomNGrammFilter BloomNGrammFilter = 43; } } @@ -419,6 +427,13 @@ message TBloomFilter { repeated uint32 ColumnIds = 3; } +message TBloomNGrammFilter { + optional uint32 NGrammSize = 1; + optional uint32 FilterSizeBytes = 2; + optional uint32 HashesCount = 3; + optional uint32 ColumnId = 4; +} + message TMaxIndex { optional uint32 ColumnId = 1; } @@ -441,6 +456,7 @@ message TOlapIndexDescription { TBloomFilter BloomFilter = 41; TMaxIndex MaxIndex = 42; TCountMinSketch CountMinSketch = 43; + TBloomNGrammFilter BloomNGrammFilter = 44; } } diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto 
deleted file mode 100644 index 5ffbf067b3..0000000000 --- a/ydb/core/protos/ssa.proto +++ /dev/null @@ -1,209 +0,0 @@ -package NKikimrSSA; -option java_package = "ru.yandex.kikimr.proto"; - -// Program to pushdown to ColumnShard -// -// > 'SELECT y, z WHERE x > 10' -// PROJECTION x, y, z -// ASSIGN tmp = x > 10 -// FILTER BY tmp -// PROJECTION y, z -// -// > 'SELECT min(x), sum(y) GROUP BY z' -// PROJECTION x, y, z -// ASSIGN agg1 = min(x) -// ASSIGN agg2 = sum(y) -// GROUP BY z -// PROJECTION agg1, agg2 -// -message TProgram { - message TColumn { - optional uint64 Id = 1; - optional string Name = 2; - } - - message TConstant { - oneof value { - bool Bool = 1; - int32 Int32 = 2; - uint32 Uint32 = 3; - int64 Int64 = 4; - uint64 Uint64 = 5; - float Float = 6; - double Double = 7; - bytes Bytes = 8; - string Text = 9; - int32 Int8 = 10; - uint32 Uint8 = 11; - int32 Int16 = 12; - uint32 Uint16 = 13; - uint64 Timestamp = 14; - } - } - - message TBloomFilterChecker { - repeated uint64 HashValues = 1; - } - - message TOlapIndexChecker { - optional uint32 IndexId = 1; - optional string ClassName = 2; - - message TCompositeChecker { - repeated TOlapIndexChecker ChildrenCheckers = 1; - } - - oneof Implementation { - TBloomFilterChecker BloomFilter = 40; - TCompositeChecker Composite = 41; - } - } - - message TParameter { - optional string Name = 1; - } - - enum EFunctionType { - SIMPLE_ARROW = 1; - YQL_KERNEL = 2; - } - - message TAssignment { - enum EFunction { - FUNC_UNSPECIFIED = 0; - FUNC_CMP_EQUAL = 1; - FUNC_CMP_NOT_EQUAL = 2; - FUNC_CMP_LESS = 3; - FUNC_CMP_LESS_EQUAL = 4; - FUNC_CMP_GREATER = 5; - FUNC_CMP_GREATER_EQUAL = 6; - FUNC_IS_NULL = 7; - FUNC_STR_LENGTH = 8; - FUNC_STR_MATCH = 9; - FUNC_BINARY_NOT = 10; - FUNC_BINARY_AND = 11; - FUNC_BINARY_OR = 12; - FUNC_BINARY_XOR = 13; - FUNC_MATH_ADD = 14; - FUNC_MATH_SUBTRACT = 15; - FUNC_MATH_MULTIPLY = 16; - FUNC_MATH_DIVIDE = 17; - FUNC_CAST_TO_BOOLEAN = 18; - FUNC_CAST_TO_INT8 = 19; - FUNC_CAST_TO_INT16 = 20; - 
FUNC_CAST_TO_INT32 = 21; - FUNC_CAST_TO_INT64 = 22; - FUNC_CAST_TO_UINT8 = 23; - FUNC_CAST_TO_UINT16 = 24; - FUNC_CAST_TO_UINT32 = 25; - FUNC_CAST_TO_UINT64 = 26; - FUNC_CAST_TO_FLOAT = 27; - FUNC_CAST_TO_DOUBLE = 28; - FUNC_CAST_TO_BINARY = 29; - FUNC_CAST_TO_FIXED_SIZE_BINARY = 30; - FUNC_CAST_TO_TIMESTAMP = 31; - FUNC_STR_MATCH_LIKE = 32; - FUNC_STR_STARTS_WITH = 33; - FUNC_STR_ENDS_WITH = 34; - FUNC_STR_MATCH_IGNORE_CASE = 35; - FUNC_STR_STARTS_WITH_IGNORE_CASE = 36; - FUNC_STR_ENDS_WITH_IGNORE_CASE = 37; - } - - message TFunction { - optional uint32 Id = 1; // EFunction - repeated TColumn Arguments = 2; - optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ]; - optional uint32 KernelIdx = 4; - optional uint32 YqlOperationId = 5; // TKernelRequestBuilder::EBinaryOp - } - - message TExternalFunction { - optional string Name = 1; - repeated TColumn Arguments = 2; - } - - optional TColumn Column = 1; - oneof expression { - TFunction Function = 2; - TExternalFunction ExternalFunction = 3; - TConstant Constant = 4; - bool Null = 5; - TParameter Parameter = 6; - } - } - - message TAggregateAssignment { - enum EAggregateFunction { - AGG_UNSPECIFIED = 0; - AGG_SOME = 1; - AGG_COUNT = 2; - AGG_MIN = 3; - AGG_MAX = 4; - AGG_SUM = 5; - //AGG_AVG = 6; - //AGG_VAR = 7; - //AGG_COVAR = 8; - //AGG_STDDEV = 9; - //AGG_CORR = 10; - //AGG_ARG_MIN = 11; - //AGG_ARG_MAX = 12; - //AGG_COUNT_DISTINCT = 13; - //AGG_QUANTILES = 14; - //AGG_TOP_COUNT = 15; - //AGG_TOP_SUM = 16; - } - - message TAggregateFunction { - optional uint32 Id = 1; // EAggregateFunction - repeated TColumn Arguments = 2; - optional string Variant = 3; // i.e. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV - optional EFunctionType FunctionType = 4 [ default = SIMPLE_ARROW ]; - optional uint32 KernelIdx = 5; - // TODO: Parameters, i.e. 
N for topK(N)(arg) - } - - optional TColumn Column = 1; - optional TAggregateFunction Function = 2; - } - - message TProjection { - repeated TColumn Columns = 1; - } - - message TFilter { - // Predicate should be a bool column: - // true - keep the row - // false - remove the row - optional TColumn Predicate = 1; - } - - message TGroupBy { - repeated TAggregateAssignment Aggregates = 1; - repeated TColumn KeyColumns = 2; - } - - message TCommand { - oneof line { - TAssignment Assign = 1; - TProjection Projection = 2; - TFilter Filter = 3; - TGroupBy GroupBy = 4; - // TODO: ORDER BY, LIMIT - } - } - - repeated TCommand Command = 1; - optional uint32 Version = 2; - optional bytes Kernels = 3; -} - -message TOlapProgram { - // Store OLAP program in serialized format in case we do not need to deserialize it in TScanTaskMeta - // Note: when this message exists the program must be present. - optional bytes Program = 1; - // RecordBatch deserialization require arrow::Schema, thus store it here - optional bytes ParametersSchema = 2; - optional bytes Parameters = 3; - optional TProgram.TOlapIndexChecker IndexChecker = 4; -} diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp index 8b62f7890c..6006fe7972 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp @@ -235,8 +235,10 @@ public: class TPackAnd: public IRequestNode { private: using TBase = IRequestNode; - THashMap<TString, std::shared_ptr<arrow::Scalar>> Conditions; + THashMap<TString, std::shared_ptr<arrow::Scalar>> Equals; + THashMap<TString, TLikeDescription> Likes; bool IsEmptyFlag = false; + protected: virtual bool DoCollapse() override { return false; @@ -247,10 +249,19 @@ protected: if (IsEmptyFlag) { result.InsertValue("empty", true); } - auto& arrJson = result.InsertValue("conditions", NJson::JSON_ARRAY); - for 
(auto&& i : Conditions) { - auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); - jsonCondition.InsertValue(i.first, i.second->ToString()); + { + auto& arrJson = result.InsertValue("equals", NJson::JSON_ARRAY); + for (auto&& i : Equals) { + auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); + jsonCondition.InsertValue(i.first, i.second->ToString()); + } + } + { + auto& arrJson = result.InsertValue("likes", NJson::JSON_ARRAY); + for (auto&& i : Likes) { + auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); + jsonCondition.InsertValue(i.first, i.second.ToString()); + } } return result; } @@ -259,32 +270,53 @@ protected: } public: TPackAnd(const TPackAnd&) = default; + TPackAnd(const TString& cName, const std::shared_ptr<arrow::Scalar>& value) : TBase(GetNextId("PackAnd")) { - AddCondition(cName, value); + AddEqual(cName, value); + } + + TPackAnd(const TString& cName, const TLikePart& part) + : TBase(GetNextId("PackAnd")) { + AddLike(cName, TLikeDescription(part)); } const THashMap<TString, std::shared_ptr<arrow::Scalar>>& GetEquals() const { - return Conditions; + return Equals; + } + + const THashMap<TString, TLikeDescription>& GetLikes() const { + return Likes; } bool IsEmpty() const { return IsEmptyFlag; } - void AddCondition(const TString& cName, const std::shared_ptr<arrow::Scalar>& value) { + void AddEqual(const TString& cName, const std::shared_ptr<arrow::Scalar>& value) { AFL_VERIFY(value); - auto it = Conditions.find(cName); - if (it == Conditions.end()) { - Conditions.emplace(cName, value); + auto it = Equals.find(cName); + if (it == Equals.end()) { + Equals.emplace(cName, value); } else if (it->second->Equals(*value)) { return; } else { IsEmptyFlag = true; } } + void AddLike(const TString& cName, const TLikeDescription& value) { + auto it = Likes.find(cName); + if (it == Likes.end()) { + Likes.emplace(cName, value); + } else { + it->second.Merge(value); + } + } void Merge(const TPackAnd& add) { - for (auto&& i : add.Conditions) { - 
AddCondition(i.first, i.second); + for (auto&& i : add.Equals) { + AddEqual(i.first, i.second); + } + for (auto&& i : add.Likes) { + AddLike(i.first, i.second); } } }; @@ -313,6 +345,26 @@ protected: Parent->Exchange(GetNodeName(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetColumnName(), Children[1]->As<TConstantNode>()->GetConstant())); return true; } + const bool isLike = (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StringContains || + Operation == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith || + Operation == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith); + if (isLike && Children.size() == 2 && Children[1]->Is<TConstantNode>() && Children[0]->Is<TOriginalColumn>()) { + auto scalar = Children[1]->As<TConstantNode>()->GetConstant(); + AFL_VERIFY(scalar->type->id() == arrow::binary()->id()); + auto scalarString = static_pointer_cast<arrow::BinaryScalar>(scalar); + std::optional<TLikePart::EOperation> op; + if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) { + op = TLikePart::EOperation::Contains; + } else if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) { + op = TLikePart::EOperation::EndsWith; + } else if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith) { + op = TLikePart::EOperation::StartsWith; + } + AFL_VERIFY(op); + TLikePart likePart(*op, TString((const char*)scalarString->value->data(), scalarString->value->size())); + Parent->Exchange(GetNodeName(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetColumnName(), likePart)); + return true; + } if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { if (Parent->Is<TOperationNode>() && Parent->As<TOperationNode>()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { Parent->Attach(Children); @@ -407,7 +459,7 @@ public: if (arg.IsGenerated()) { auto it = Nodes.find(arg.GetColumnName()); if (it == Nodes.end()) { - AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", 
"program_arg_is_missing")("program", program.DebugString()); + AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "program_arg_is_missing")("program", program.DebugString()); return false; } argNodes.emplace_back(it->second); @@ -442,10 +494,10 @@ public: }; std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TProgramContainer& program) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", program.DebugString()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("program", program.DebugString()); auto& steps = program.GetStepsVerified(); if (!steps.size()) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "no_steps_in_program"); + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "no_steps_in_program"); return nullptr; } auto fStep = steps.front(); @@ -459,9 +511,10 @@ std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TP if (!rootNode) { return nullptr; } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("original_program", rootNode->SerializeToJson()); while (rootNode->Collapse()) { } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("collapsed_program", rootNode->SerializeToJson()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("collapsed_program", rootNode->SerializeToJson()); if (rootNode->GetChildren().size() != 1) { return nullptr; } @@ -470,14 +523,14 @@ std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TP if (orNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::Or) { for (auto&& i : orNode->GetChildren()) { if (auto* andPackNode = i->As<TPackAnd>()) { - result->AddBranch(andPackNode->GetEquals()); + result->AddBranch(andPackNode->GetEquals(), andPackNode->GetLikes()); } else if (auto* operationNode = i->As<TOperationNode>()) { if (operationNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::And) { TPackAnd* pack = operationNode->FindFirst<TPackAnd>(); if (!pack) { return nullptr; } - result->AddBranch(pack->GetEquals()); + 
result->AddBranch(pack->GetEquals(), pack->GetLikes()); } } else { return nullptr; @@ -485,7 +538,7 @@ std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TP } } } else if (auto* andPackNode = rootNode->GetChildren().front()->As<TPackAnd>()) { - result->AddBranch(andPackNode->GetEquals()); + result->AddBranch(andPackNode->GetEquals(), andPackNode->GetLikes()); } else { return nullptr; } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h index 898c4210b0..eb2d6efca9 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h @@ -3,30 +3,109 @@ namespace NKikimr::NOlap::NIndexes::NRequest { +class TLikePart { +public: + enum class EOperation { + StartsWith, + EndsWith, + Contains + }; + +private: + YDB_READONLY(EOperation, Operation, EOperation::Contains); + YDB_READONLY_DEF(TString, Value); + +public: + TLikePart(const EOperation op, const TString& value) + : Operation(op) + , Value(value) { + } + + static TLikePart MakeStart(const TString& value) { + return TLikePart(EOperation::StartsWith, value); + } + static TLikePart MakeEnd(const TString& value) { + return TLikePart(EOperation::EndsWith, value); + } + static TLikePart MakeContains(const TString& value) { + return TLikePart(EOperation::Contains, value); + } + + TString ToString() const { + if (Operation == EOperation::StartsWith) { + return '%' + Value; + } + if (Operation == EOperation::EndsWith) { + return Value + '%'; + } + if (Operation == EOperation::Contains) { + return Value; + } + AFL_VERIFY(false); + return ""; + } +}; + +class TLikeDescription { +private: + THashMap<TString, TLikePart> LikeSequences; + +public: + TLikeDescription(const TLikePart& likePart) { + LikeSequences.emplace(likePart.ToString(), likePart); + } + + const THashMap<TString, TLikePart>& GetLikeSequences() const { + return 
LikeSequences; + } + + void Merge(const TLikeDescription& d) { + for (auto&& i : d.LikeSequences) { + LikeSequences.emplace(i.first, i.second); + } + } + + TString ToString() const { + TStringBuilder sb; + sb << "["; + for (auto&& i : LikeSequences) { + sb << i.first << ","; + } + sb << "];"; + return sb; + } +}; + class TBranchCoverage { private: THashMap<TString, std::shared_ptr<arrow::Scalar>> Equals; + THashMap<TString, TLikeDescription> Likes; YDB_ACCESSOR_DEF(std::vector<std::shared_ptr<IIndexChecker>>, Indexes); + public: - TBranchCoverage(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equals) + TBranchCoverage(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equals, const THashMap<TString, TLikeDescription>& likes) : Equals(equals) - { - + , Likes(likes) { } const THashMap<TString, std::shared_ptr<arrow::Scalar>>& GetEquals() const { return Equals; } + const THashMap<TString, TLikeDescription>& GetLikes() const { + return Likes; + } + std::shared_ptr<IIndexChecker> GetAndChecker() const; }; class TDataForIndexesCheckers { private: YDB_READONLY_DEF(std::vector<std::shared_ptr<TBranchCoverage>>, Branches); + public: - void AddBranch(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equalsData) { - Branches.emplace_back(std::make_shared<TBranchCoverage>(equalsData)); + void AddBranch(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equalsData, const THashMap<TString, TLikeDescription>& likesData) { + Branches.emplace_back(std::make_shared<TBranchCoverage>(equalsData, likesData)); } static std::shared_ptr<TDataForIndexesCheckers> Build(const TProgramContainer& program); @@ -34,4 +113,4 @@ public: TIndexCheckerContainer GetCoverChecker() const; }; -} // namespace NKikimr::NOlap::NIndexes::NRequest
\ No newline at end of file +} // namespace NKikimr::NOlap::NIndexes::NRequest diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp index a2d84cb10f..e3e3cf7d42 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp @@ -55,13 +55,11 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataF hashes.emplace(hash); }; NArrow::NHash::NXX64::TStreamStringHashCalcer calcer(0); - for (ui32 i = 0; i < HashesCount; ++i) { - calcer.Start(); - for (auto&& i : foundColumns) { - NArrow::NHash::TXX64::AppendField(i.second, calcer); - } - BuildHashesSet(calcer.Finish(), pred); + calcer.Start(); + for (auto&& i : foundColumns) { + NArrow::NHash::TXX64::AppendField(i.second, calcer); } + BuildHashesSet(calcer.Finish(), pred); branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes))); } } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp new file mode 100644 index 0000000000..5eec032ad4 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp @@ -0,0 +1,51 @@ +#include "checker.h" + +#include <ydb/core/formats/arrow/serializer/abstract.h> +#include <ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h> + +#include <ydb/library/formats/arrow/common/validation.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +void TFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const { + for (auto&& i : HashValues) { + proto.MutableBloomNGrammFilter()->AddHashValues(i); + } +} + +bool 
TFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const { + AFL_VERIFY(blobs.size() == 1); + for (auto&& blob : blobs) { + TFixStringBitsStorage bits(blob); + bool found = true; + for (auto&& i : HashValues) { + if (!bits.Get(i % bits.GetSizeBits())) { + found = false; + break; + } + } + if (found) { + // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId()); + return true; + } + } + return false; +} + +bool TFilterChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) { + if (!proto.HasBloomNGrammFilter()) { + return false; + } + for (auto&& i : proto.GetBloomNGrammFilter().GetHashValues()) { + HashValues.emplace(i); + } + if (HashValues.empty()) { + return false; + } + return true; +} + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h new file mode 100644 index 0000000000..37a4f3c316 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h @@ -0,0 +1,33 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h> +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TFilterChecker: public TSimpleIndexChecker { +public: + static TString GetClassNameStatic() { + return "BLOOM_NGRAMM_FILTER"; + } + +private: + using TBase = TSimpleIndexChecker; + std::set<ui64> HashValues; + static inline auto Registrator = TFactory::TRegistrator<TFilterChecker>(GetClassNameStatic()); + +protected: + virtual bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override; + virtual void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override; + + virtual bool DoCheckImpl(const std::vector<TString>& blobs) const override; + +public: + TFilterChecker() = default; + 
TFilterChecker(const ui32 indexId, std::set<ui64>&& hashes) + : TBase(indexId) + , HashValues(std::move(hashes)) { + } + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp new file mode 100644 index 0000000000..e0068eeb21 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp @@ -0,0 +1,91 @@ +#include "constructor.h" +#include "meta.h" + +#include <ydb/core/tx/schemeshard/olap/schema/schema.h> + +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta( + const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { + auto* columnInfo = currentSchema.GetColumns().GetByName(ColumnName); + if (!columnInfo) { + errors.AddError("no column with name " + ColumnName); + return nullptr; + } + const ui32 columnId = columnInfo->GetId(); + return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId, + HashesCount, FilterSizeBytes, NGrammSize); +} + +NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo.Has("column_name")) { + return TConclusionStatus::Fail("column_name have to be in bloom ngramm filter features"); + } + if (!jsonInfo["column_name"].GetString(&ColumnName)) { + return TConclusionStatus::Fail("column_name have to be string in bloom ngramm filter features"); + } + if (!ColumnName) { + return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features"); + } + + if (!jsonInfo["ngramm_size"].IsUInteger()) { + return TConclusionStatus::Fail("ngramm_size have to be in bloom 
filter features as uint field"); + } + NGrammSize = jsonInfo["ngramm_size"].GetUInteger(); + if (NGrammSize < 3 || NGrammSize > 10) { + return TConclusionStatus::Fail("ngramm_size have to be in bloom ngramm filter in interval [3, 10]"); + } + + if (!jsonInfo["filter_size_bytes"].IsUInteger()) { + return TConclusionStatus::Fail("filter_size_bytes have to be in bloom filter features as uint field"); + } + FilterSizeBytes = jsonInfo["filter_size_bytes"].GetUInteger(); + if (FilterSizeBytes < 128 || FilterSizeBytes > (1 << 20)) { + return TConclusionStatus::Fail("filter_size_bytes have to be in bloom ngramm filter in interval [128, 1Mb]"); + } + + if (!jsonInfo["hashes_count"].IsUInteger()) { + return TConclusionStatus::Fail("hashes_count have to be in bloom filter features as uint field"); + } + HashesCount = jsonInfo["hashes_count"].GetUInteger(); + if (HashesCount < 1 || HashesCount > 10) { + return TConclusionStatus::Fail("hashes_count have to be in bloom ngramm filter in interval [1, 10]"); + } + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { + if (!proto.HasBloomNGrammFilter()) { + const TString errorMessage = "not found BloomNGrammFilter section in proto: \"" + proto.DebugString() + "\""; + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage); + return TConclusionStatus::Fail(errorMessage); + } + auto& bFilter = proto.GetBloomNGrammFilter(); + NGrammSize = bFilter.GetNGrammSize(); + if (NGrammSize < 3 || NGrammSize > 10) { + return TConclusionStatus::Fail("NGrammSize have to be in [3, 10]"); + } + FilterSizeBytes = bFilter.GetFilterSizeBytes(); + if (FilterSizeBytes < 128 || FilterSizeBytes > (1 << 20)) { + return TConclusionStatus::Fail("FilterSizeBytes have to be in [128, 1Mb]"); + } + HashesCount = bFilter.GetHashesCount(); + if (HashesCount < 1 || HashesCount > 10) { + return TConclusionStatus::Fail("HashesCount size have to be 
in [1, 10]"); + } + ColumnName = bFilter.GetColumnName(); + if (!ColumnName) { + return TConclusionStatus::Fail("empty column name"); + } + return TConclusionStatus::Success(); +} + +void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const { + auto* filterProto = proto.MutableBloomNGrammFilter(); + filterProto->SetColumnName(ColumnName); + filterProto->SetNGrammSize(NGrammSize); + filterProto->SetFilterSizeBytes(FilterSizeBytes); + filterProto->SetHashesCount(HashesCount); +} + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h new file mode 100644 index 0000000000..bf66637039 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h @@ -0,0 +1,35 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h> +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TIndexConstructor: public IIndexMetaConstructor { +public: + static TString GetClassNameStatic() { + return "BLOOM_NGRAMM_FILTER"; + } + +private: + TString ColumnName; + ui32 NGrammSize = 3; + ui32 FilterSizeBytes = 512; + ui32 HashesCount = 2; + static inline auto Registrator = TFactory::TRegistrator<TIndexConstructor>(GetClassNameStatic()); + +protected: + virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName, + const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; + + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override; + +public: + TIndexConstructor() = default; + + virtual TString 
GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp new file mode 100644 index 0000000000..af139065b9 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp @@ -0,0 +1,152 @@ +#include "checker.h" +#include "meta.h" + +#include <ydb/core/formats/arrow/hash/calcer.h> +#include <ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h> +#include <ydb/core/tx/program/program.h> +#include <ydb/core/tx/schemeshard/olap/schema/schema.h> + +#include <ydb/library/formats/arrow/hash/xx_hash.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> +#include <library/cpp/deprecated/atomic/atomic.h> + +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TNGrammBuilder { +private: + NArrow::NHash::NXX64::TStreamStringHashCalcer HashCalcer; + TBuffer Zeros; + template <class TAction> + void BuildNGramms(const char* data, const ui32 dataSize, const std::optional<NRequest::TLikePart::EOperation> op, const ui32 nGrammSize, + const TAction& pred) const { + if (!op || op == NRequest::TLikePart::EOperation::StartsWith) { + for (ui32 c = 1; c <= nGrammSize; ++c) { + TBuffer fakeStart; + fakeStart.Fill('\0', nGrammSize - c); + fakeStart.Append(data, std::min(c, dataSize)); + if (fakeStart.size() < nGrammSize) { + fakeStart.Append(Zeros.data(), nGrammSize - fakeStart.size()); + } + pred(fakeStart.data()); + } + } + for (ui32 c = 0; c < dataSize; ++c) { + if (c + nGrammSize <= dataSize) { + pred(data + c); + } else if (!op || op == NRequest::TLikePart::EOperation::EndsWith) { + TBuffer fakeStart; + fakeStart.Append(data + c, dataSize - c); + fakeStart.Append(Zeros.data(), nGrammSize - fakeStart.size()); + pred(fakeStart.data()); + } + } + } + +public: + TNGrammBuilder() + : HashCalcer(0) 
{ + Zeros.Fill('\0', 1024); + } + + template <class TFiller> + void FillNGrammHashes(const ui32 nGrammSize, const std::shared_ptr<arrow::Array>& array, const TFiller& fillData) { + AFL_VERIFY(array->type_id() == arrow::utf8()->id())("id", array->type()->ToString()); + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using T = typename TWrap::T; + using TArray = typename arrow::TypeTraits<T>::ArrayType; + auto& typedArray = static_cast<const TArray&>(*array); + + for (ui32 row = 0; row < array->length(); ++row) { + if (array->IsNull(row)) { + continue; + } + if constexpr (arrow::has_string_view<T>()) { + auto value = typedArray.GetView(row); + if (value.size() < nGrammSize) { + continue; + } + const auto pred = [&](const char* data) { + HashCalcer.Start(); + HashCalcer.Update((const ui8*)data, nGrammSize); + fillData(HashCalcer.Finish()); + }; + BuildNGramms(value.data(), value.size(), {}, nGrammSize, pred); + } else { + AFL_VERIFY(false); + } + } + return true; + }); + } + + template <class TFiller> + void FillNGrammHashes(const ui32 nGrammSize, const NRequest::TLikePart::EOperation op, const TString& userReq, const TFiller& fillData) { + const auto pred = [&](const char* value) { + HashCalcer.Start(); + HashCalcer.Update((const ui8*)value, nGrammSize); + fillData(HashCalcer.Finish()); + }; + BuildNGramms(userReq.data(), userReq.size(), op, nGrammSize, pred); + } +}; + +TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const { + AFL_VERIFY(reader.GetColumnsCount() == 1)("count", reader.GetColumnsCount()); + TNGrammBuilder builder; + + TFixStringBitsStorage bits(FilterSizeBytes * 8); + + const auto pred = [&](const ui64 hash) { + const auto predSet = [&](const ui64 hashSecondary) { + bits.Set(true, hashSecondary % bits.GetSizeBits()); + }; + BuildHashesSet(hash, predSet); + }; + for (reader.Start(); reader.IsCorrect();) { + builder.FillNGrammHashes(NGrammSize, 
reader.begin()->GetCurrentChunk(), pred); + reader.ReadNext(reader.begin()->GetCurrentChunk()->length()); + } + + return bits.GetData(); +} + +void TIndexMeta::DoFillIndexCheckers( + const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const { + for (auto&& branch : info->GetBranches()) { + std::map<ui32, NRequest::TLikeDescription> foundColumns; + for (auto&& cId : ColumnIds) { + auto c = schema.GetColumns().GetById(cId); + if (!c) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect index column")("id", cId); + return; + } + auto it = branch->GetLikes().find(c->GetName()); + if (it == branch->GetLikes().end()) { + break; + } + foundColumns.emplace(cId, it->second); + } + if (foundColumns.size() != ColumnIds.size()) { + continue; + } + + std::set<ui64> hashes; + const auto pred = [&](const ui64 hash) { + const auto predSet = [&](const ui64 hashSecondary) { + hashes.emplace(hashSecondary); + }; + BuildHashesSet(hash, predSet); + }; + TNGrammBuilder builder; + for (auto&& c : foundColumns) { + for (auto&& ls : c.second.GetLikeSequences()) { + builder.FillNGrammHashes(NGrammSize, ls.second.GetOperation(), ls.second.GetValue(), pred); + } + } + branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes))); + } +} + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h new file mode 100644 index 0000000000..98af4556a5 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h @@ -0,0 +1,106 @@ +#pragma once +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h> +namespace NKikimr::NOlap::NIndexes::NBloomNGramm { + +class TIndexMeta: public TIndexByColumns { +public: + static TString GetClassNameStatic() { + return "BLOOM_NGRAMM_FILTER"; + } +private: + using TBase 
= TIndexByColumns; + std::shared_ptr<arrow::Schema> ResultSchema; + ui32 NGrammSize = 3; + ui32 FilterSizeBytes = 512; + ui32 HashesCount = 2; + static inline auto Registrator = TFactory::TRegistrator<TIndexMeta>(GetClassNameStatic()); + void Initialize() { + AFL_VERIFY(!ResultSchema); + std::vector<std::shared_ptr<arrow::Field>> fields = {std::make_shared<arrow::Field>("", arrow::boolean())}; + ResultSchema = std::make_shared<arrow::Schema>(fields); + AFL_VERIFY(HashesCount > 0); + AFL_VERIFY(FilterSizeBytes > 0); + AFL_VERIFY(NGrammSize > 2); + } + + static const ui64 HashesConstructorP = ((ui64)2 << 31) - 1; + static const ui64 HashesConstructorA = (ui64)2 << 16; + + template <class TActor> + void BuildHashesSet(const ui64 originalHash, const TActor& actor) const { + AFL_VERIFY(HashesCount < HashesConstructorP); + for (ui32 b = 1; b <= HashesCount; ++b) { + const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP; + actor(hash); + } + } + + template <class TContainer, class TActor> + void BuildHashesSet(const TContainer& originalHashes, const TActor& actor) const { + AFL_VERIFY(HashesCount < HashesConstructorP); + for (auto&& hOriginal : originalHashes) { + BuildHashesSet(hOriginal, actor); + } + } + +protected: + virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& /*newMeta*/) const override { + return TConclusionStatus::Fail("not supported"); + } + virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override; + + virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override; + + virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override { + AFL_VERIFY(TBase::DoDeserializeFromProto(proto)); + AFL_VERIFY(proto.HasBloomNGrammFilter()); + auto& bFilter = proto.GetBloomNGrammFilter(); + HashesCount = bFilter.GetHashesCount(); + if (HashesCount < 1 || 10 < HashesCount) { + 
return false; + } + NGrammSize = bFilter.GetNGrammSize(); + if (NGrammSize < 3) { + return false; + } + FilterSizeBytes = bFilter.GetFilterSizeBytes(); + if (FilterSizeBytes < 128) { + return false; + } + if (!bFilter.HasColumnId() || !bFilter.GetColumnId()) { + return false; + } + ColumnIds.emplace(bFilter.GetColumnId()); + Initialize(); + return true; + } + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { + auto* filterProto = proto.MutableBloomNGrammFilter(); + AFL_VERIFY(NGrammSize >= 3); + AFL_VERIFY(FilterSizeBytes >= 128); + AFL_VERIFY(HashesCount >= 1); + AFL_VERIFY(ColumnIds.size() == 1); + filterProto->SetNGrammSize(NGrammSize); + filterProto->SetFilterSizeBytes(FilterSizeBytes); + filterProto->SetHashesCount(HashesCount); + filterProto->SetColumnId(*ColumnIds.begin()); + } + +public: + TIndexMeta() = default; + TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const ui32 hashesCount, + const ui32 filterSizeBytes, const ui32 nGrammSize) + : TBase(indexId, indexName, { columnId }, storageId) + , NGrammSize(nGrammSize) + , FilterSizeBytes(filterSizeBytes) + , HashesCount(hashesCount) + { + Initialize(); + } + + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make new file mode 100644 index 0000000000..bcba53e477 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + GLOBAL constructor.cpp + GLOBAL meta.cpp + GLOBAL checker.cpp +) + +PEERDIR( + ydb/core/protos + ydb/core/formats/arrow + ydb/core/tx/columnshard/engines/storage/indexes/portions +) + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make 
b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make index 0459c906d8..b12360d262 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make @@ -3,6 +3,7 @@ LIBRARY() PEERDIR( ydb/core/tx/columnshard/engines/storage/indexes/portions ydb/core/tx/columnshard/engines/storage/indexes/bloom + ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm ydb/core/tx/columnshard/engines/storage/indexes/max ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch ) diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 98dc024c2a..35063d0ae8 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -148,6 +148,16 @@ public: } } + bool ReadNext(const ui32 count) { + for (ui32 i = 0; i < count; ++i) { + if (!ReadNext()) { + AFL_VERIFY(i + 1 == count); + return false; + } + } + return true; + } + bool ReadNext() { std::optional<bool> result; for (auto&& i : Columns) { diff --git a/ydb/library/formats/arrow/protos/ssa.proto b/ydb/library/formats/arrow/protos/ssa.proto index 193c759a3a..38a0bb1480 100644 --- a/ydb/library/formats/arrow/protos/ssa.proto +++ b/ydb/library/formats/arrow/protos/ssa.proto @@ -45,6 +45,10 @@ message TProgram { repeated uint64 HashValues = 1; } + message TBloomNGrammFilterChecker { + repeated uint64 HashValues = 1; + } + message TCountMinSketchChecker { } @@ -60,6 +64,7 @@ message TProgram { TBloomFilterChecker BloomFilter = 40; TCompositeChecker Composite = 41; TCountMinSketchChecker CountMinSketch = 42; + TBloomNGrammFilterChecker BloomNGrammFilter = 43; } } |