aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-12-25 09:31:56 +0300
committerGitHub <noreply@github.com>2024-12-25 09:31:56 +0300
commitd014966628b504e87b452019ecb9a2db047e4a46 (patch)
tree48eec788f7e39ef5a360ff86f13e8a41ad19121f
parent13c7b1798a3c35e00ad677c64e948f9fa50bf436 (diff)
downloadydb-d014966628b504e87b452019ecb9a2db047e4a46.tar.gz
bloom filter for ngramms (#12893)
-rw-r--r--ydb/core/kqp/ut/olap/indexes_ut.cpp196
-rw-r--r--ydb/core/protos/flat_scheme_op.proto16
-rw-r--r--ydb/core/protos/ssa.proto209
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp93
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h91
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp51
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h33
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp91
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h35
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp152
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h106
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make15
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/ya.make1
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h10
-rw-r--r--ydb/library/formats/arrow/protos/ssa.proto5
16 files changed, 809 insertions, 305 deletions
diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp
index ec03987adf..014b5c6abe 100644
--- a/ydb/core/kqp/ut/olap/indexes_ut.cpp
+++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp
@@ -302,9 +302,8 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
}
AFL_VERIFY(updatesCount + 6 ==
- (ui64)csController->GetActualizationRefreshSchemeCount().Val())(
- "updates", updatesCount)("count",
- csController->GetActualizationRefreshSchemeCount().Val());
+ (ui64)csController->GetActualizationRefreshSchemeCount().Val())("updates", updatesCount)(
+ "count", csController->GetActualizationRefreshSchemeCount().Val());
}
class TTestIndexesScenario {
@@ -313,6 +312,30 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
std::unique_ptr<TKikimrRunner> Kikimr;
YDB_ACCESSOR(TString, StorageId, "__DEFAULT");
+ ui64 SkipStart = 0;
+ ui64 NoDataStart = 0;
+ ui64 ApproveStart = 0;
+
+        // Stores the controller's current index counters as the baseline ("zero level"),
+        // so that later checks can compare per-step deltas instead of cumulative totals.
+        template <class TController>
+        void ResetZeroLevel(TController& g) {
+            NoDataStart = g->GetIndexesSkippedNoData().Val();
+            ApproveStart = g->GetIndexesApprovedOnSelect().Val();
+            SkipStart = g->GetIndexesSkippingOnSelect().Val();
+        }
+
+        // Runs `text` as a streaming scan query and asserts that it succeeds, then logs the
+        // YSON result next to `expectedResult`, logs the index counters relative to the stored
+        // SkipStart/ApproveStart/NoDataStart baselines, and finally compares result and expectation.
+        void ExecuteSQL(const TString& text, const TString& expectedResult) const {
+            auto tableClient = Kikimr->GetTableClient();
+            auto scanIt = tableClient.StreamExecuteScanQuery(text).GetValueSync();
+            UNIT_ASSERT_C(scanIt.IsSuccess(), scanIt.GetIssues().ToString());
+
+            TString actualResult = StreamResultToYson(scanIt);
+            AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("result", actualResult)("expected", expectedResult);
+
+            auto* controller = NYDBTest::TControllers::GetControllerAs<NOlap::TWaitCompactionController>();
+            const auto skipped = controller->GetIndexesSkippingOnSelect().Val() - SkipStart;
+            const auto approved = controller->GetIndexesApprovedOnSelect().Val() - ApproveStart;
+            const auto noData = controller->GetIndexesSkippedNoData().Val() - NoDataStart;
+            AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("skip", skipped)("check", approved)("no_data", noData);
+
+            CompareYson(actualResult, expectedResult);
+        }
+
public:
TTestIndexesScenario& Initialize() {
Settings = TKikimrSettings().SetWithSampleTables(false);
@@ -320,7 +343,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
return *this;
}
- void Execute() const {
+ void Execute() {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NOlap::TWaitCompactionController>();
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
csController->SetOverrideMemoryLimitForPortionReading(1e+10);
@@ -343,6 +366,17 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
{
auto alterQuery =
TStringBuilder() << Sprintf(
+ R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_uid, TYPE=BLOOM_NGRAMM_FILTER,
+ FEATURES=`{"column_name" : "resource_id", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 64024}`);
+ )",
+ StorageId.data());
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
+ }
+ {
+ auto alterQuery =
+ TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
)",
@@ -384,22 +418,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
filler(3000000, 100000000, 110000);
}
- {
- auto it = tableClient
- .StreamExecuteScanQuery(R"(
- --!syntax_v1
-
- SELECT
- COUNT(*)
- FROM `/Root/olapStore/olapTable`
- )")
- .GetValueSync();
-
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- Cout << result << Endl;
- CompareYson(result, R"([[230000u;]])");
- }
+ ExecuteSQL(R"(SELECT COUNT(*) FROM `/Root/olapStore/olapTable`)", "[[230000u;]]");
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
@@ -417,54 +436,103 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 21)("count", csController->GetCompactionStartedCounter().Val());
{
- auto it = tableClient
- .StreamExecuteScanQuery(R"(
- --!syntax_v1
+ ExecuteSQL(R"(SELECT COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ WHERE resource_id LIKE '%110a151' AND resource_id LIKE '110a%' AND resource_id LIKE '%dd%')", "[[0u;]]");
+ AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val());
+ }
+ {
+ ResetZeroLevel(csController);
+ ExecuteSQL(R"(SELECT COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ WHERE resource_id LIKE '%110a151%')", "[[0u;]]");
+ AFL_VERIFY(!csController->GetIndexesApprovedOnSelect().Val());
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
+ }
+ {
+ ResetZeroLevel(csController);
+ ExecuteSQL(R"(SELECT COUNT(*)
+ FROM `/Root/olapStore/olapTable`
+ WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222')", "[[0u;]]");
- SELECT
- COUNT(*)
- FROM `/Root/olapStore/olapTable`
- WHERE ((resource_id = '2' AND level = 222222) OR (resource_id = '1' AND level = 111111) OR (resource_id LIKE '%11dd%')) AND uid = '222'
- )")
- .GetValueSync();
-
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("result", result);
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("skip", csController->GetIndexesSkippingOnSelect().Val())(
- "check", csController->GetIndexesApprovedOnSelect().Val());
- CompareYson(result, R"([[0u;]])");
- if (StorageId == "__LOCAL_METADATA") {
- AFL_VERIFY(csController->GetIndexesSkippedNoData().Val());
- } else {
- AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0)("val", csController->GetIndexesSkippedNoData().Val());
+ AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0)("val", csController->GetIndexesSkippedNoData().Val());
+ AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() - ApproveStart < csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
+ }
+ {
+ ResetZeroLevel(csController);
+ ui32 requestsCount = 100;
+ for (ui32 i = 0; i < requestsCount; ++i) {
+ const ui32 idx = RandomNumber<ui32>(uids.size());
+ const auto query = [](const TString& res, const TString& uid, const ui32 level) {
+ TStringBuilder sb;
+ sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl;
+ sb << "WHERE(" << Endl;
+ sb << "resource_id = '" << res << "' AND" << Endl;
+ sb << "uid= '" << uid << "' AND" << Endl;
+ sb << "level= " << level << Endl;
+ sb << ")";
+ return sb;
+ };
+ ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
}
- AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val());
+ AFL_VERIFY((csController->GetIndexesApprovedOnSelect().Val() - ApproveStart) * 5 < csController->GetIndexesSkippingOnSelect().Val() - SkipStart)
+ ("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
+ "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
- ui32 requestsCount = 100;
- for (ui32 i = 0; i < requestsCount; ++i) {
- const ui32 idx = RandomNumber<ui32>(uids.size());
- const auto query = [](const TString& res, const TString& uid, const ui32 level) {
- TStringBuilder sb;
- sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl;
- sb << "WHERE(" << Endl;
- sb << "resource_id = '" << res << "' AND" << Endl;
- sb << "uid= '" << uid << "' AND" << Endl;
- sb << "level= " << level << Endl;
- sb << ")";
- return sb;
- };
- auto it = tableClient.StreamExecuteScanQuery(query(resourceIds[idx], uids[idx], levels[idx])).GetValueSync();
-
- UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
- TString result = StreamResultToYson(it);
- Cout << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << " / "
- << csController->GetIndexesSkippedNoData().Val() << Endl;
- CompareYson(result, R"([[1u;]])");
+ {
+ ResetZeroLevel(csController);
+ ui32 requestsCount = 100;
+ for (ui32 i = 0; i < requestsCount; ++i) {
+ const ui32 idx = RandomNumber<ui32>(uids.size());
+ const auto query = [](const TString& res, const TString& uid, const ui32 level) {
+ TStringBuilder sb;
+ sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl;
+ sb << "WHERE" << Endl;
+ sb << "resource_id LIKE '%" << res << "%'" << Endl;
+ return sb;
+ };
+ ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
+ }
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)("approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
+ "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
+ }
+ {
+ ResetZeroLevel(csController);
+ ui32 requestsCount = 100;
+ for (ui32 i = 0; i < requestsCount; ++i) {
+ const ui32 idx = RandomNumber<ui32>(uids.size());
+ const auto query = [](const TString& res, const TString& uid, const ui32 level) {
+ TStringBuilder sb;
+ sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl;
+ sb << "WHERE" << Endl;
+ sb << "resource_id LIKE '" << res << "%'" << Endl;
+ return sb;
+ };
+ ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
+ }
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
+ "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
+ "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
+ }
+ {
+ ResetZeroLevel(csController);
+ ui32 requestsCount = 100;
+ for (ui32 i = 0; i < requestsCount; ++i) {
+ const ui32 idx = RandomNumber<ui32>(uids.size());
+ const auto query = [](const TString& res, const TString& uid, const ui32 level) {
+ TStringBuilder sb;
+ sb << "SELECT COUNT(*) FROM `/Root/olapStore/olapTable`" << Endl;
+ sb << "WHERE" << Endl;
+ sb << "resource_id LIKE '%" << res << "'" << Endl;
+ return sb;
+ };
+ ExecuteSQL(query(resourceIds[idx], uids[idx], levels[idx]), "[[1u;]]");
+ }
+ AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() - SkipStart > 1)(
+ "approved", csController->GetIndexesApprovedOnSelect().Val() - ApproveStart)(
+ "skipped", csController->GetIndexesSkippingOnSelect().Val() - SkipStart);
}
-
- AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() * 5 < csController->GetIndexesSkippingOnSelect().Val())
- ("approved", csController->GetIndexesApprovedOnSelect().Val())("skipped", csController->GetIndexesSkippingOnSelect().Val());
}
};
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 72151d9d6d..4c52ff262e 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -391,6 +391,13 @@ message TRequestedBloomFilter {
repeated string ColumnNames = 3;
}
+// Parameters of a requested n-gram bloom filter index; the target column is given by name.
+// NOTE(review): the field meanings are inferred from the field names and from the
+// "ngramm_size" / "hashes_count" / "filter_size_bytes" / "column_name" features used in
+// indexes_ut.cpp - confirm against the index constructor.
+message TRequestedBloomNGrammFilter {
+ optional uint32 NGrammSize = 1;
+ optional uint32 FilterSizeBytes = 2;
+ optional uint32 HashesCount = 3;
+ optional string ColumnName = 4;
+}
+
message TRequestedMaxIndex {
optional string ColumnName = 1;
}
@@ -410,6 +417,7 @@ message TOlapIndexRequested {
TRequestedBloomFilter BloomFilter = 40;
TRequestedMaxIndex MaxIndex = 41;
TRequestedCountMinSketch CountMinSketch = 42;
+ TRequestedBloomNGrammFilter BloomNGrammFilter = 43;
}
}
@@ -419,6 +427,13 @@ message TBloomFilter {
repeated uint32 ColumnIds = 3;
}
+// Stored description of an n-gram bloom filter index. It carries the same numeric
+// parameters as TRequestedBloomNGrammFilter, but identifies the column by id instead of name.
+// NOTE(review): presumably ColumnId is the resolved id of the requested ColumnName - confirm.
+message TBloomNGrammFilter {
+ optional uint32 NGrammSize = 1;
+ optional uint32 FilterSizeBytes = 2;
+ optional uint32 HashesCount = 3;
+ optional uint32 ColumnId = 4;
+}
+
message TMaxIndex {
optional uint32 ColumnId = 1;
}
@@ -441,6 +456,7 @@ message TOlapIndexDescription {
TBloomFilter BloomFilter = 41;
TMaxIndex MaxIndex = 42;
TCountMinSketch CountMinSketch = 43;
+ TBloomNGrammFilter BloomNGrammFilter = 44;
}
}
diff --git a/ydb/core/protos/ssa.proto b/ydb/core/protos/ssa.proto
deleted file mode 100644
index 5ffbf067b3..0000000000
--- a/ydb/core/protos/ssa.proto
+++ /dev/null
@@ -1,209 +0,0 @@
-package NKikimrSSA;
-option java_package = "ru.yandex.kikimr.proto";
-
-// Program to pushdown to ColumnShard
-//
-// > 'SELECT y, z WHERE x > 10'
-// PROJECTION x, y, z
-// ASSIGN tmp = x > 10
-// FILTER BY tmp
-// PROJECTION y, z
-//
-// > 'SELECT min(x), sum(y) GROUP BY z'
-// PROJECTION x, y, z
-// ASSIGN agg1 = min(x)
-// ASSIGN agg2 = sum(y)
-// GROUP BY z
-// PROJECTION agg1, agg2
-//
-message TProgram {
- message TColumn {
- optional uint64 Id = 1;
- optional string Name = 2;
- }
-
- message TConstant {
- oneof value {
- bool Bool = 1;
- int32 Int32 = 2;
- uint32 Uint32 = 3;
- int64 Int64 = 4;
- uint64 Uint64 = 5;
- float Float = 6;
- double Double = 7;
- bytes Bytes = 8;
- string Text = 9;
- int32 Int8 = 10;
- uint32 Uint8 = 11;
- int32 Int16 = 12;
- uint32 Uint16 = 13;
- uint64 Timestamp = 14;
- }
- }
-
- message TBloomFilterChecker {
- repeated uint64 HashValues = 1;
- }
-
- message TOlapIndexChecker {
- optional uint32 IndexId = 1;
- optional string ClassName = 2;
-
- message TCompositeChecker {
- repeated TOlapIndexChecker ChildrenCheckers = 1;
- }
-
- oneof Implementation {
- TBloomFilterChecker BloomFilter = 40;
- TCompositeChecker Composite = 41;
- }
- }
-
- message TParameter {
- optional string Name = 1;
- }
-
- enum EFunctionType {
- SIMPLE_ARROW = 1;
- YQL_KERNEL = 2;
- }
-
- message TAssignment {
- enum EFunction {
- FUNC_UNSPECIFIED = 0;
- FUNC_CMP_EQUAL = 1;
- FUNC_CMP_NOT_EQUAL = 2;
- FUNC_CMP_LESS = 3;
- FUNC_CMP_LESS_EQUAL = 4;
- FUNC_CMP_GREATER = 5;
- FUNC_CMP_GREATER_EQUAL = 6;
- FUNC_IS_NULL = 7;
- FUNC_STR_LENGTH = 8;
- FUNC_STR_MATCH = 9;
- FUNC_BINARY_NOT = 10;
- FUNC_BINARY_AND = 11;
- FUNC_BINARY_OR = 12;
- FUNC_BINARY_XOR = 13;
- FUNC_MATH_ADD = 14;
- FUNC_MATH_SUBTRACT = 15;
- FUNC_MATH_MULTIPLY = 16;
- FUNC_MATH_DIVIDE = 17;
- FUNC_CAST_TO_BOOLEAN = 18;
- FUNC_CAST_TO_INT8 = 19;
- FUNC_CAST_TO_INT16 = 20;
- FUNC_CAST_TO_INT32 = 21;
- FUNC_CAST_TO_INT64 = 22;
- FUNC_CAST_TO_UINT8 = 23;
- FUNC_CAST_TO_UINT16 = 24;
- FUNC_CAST_TO_UINT32 = 25;
- FUNC_CAST_TO_UINT64 = 26;
- FUNC_CAST_TO_FLOAT = 27;
- FUNC_CAST_TO_DOUBLE = 28;
- FUNC_CAST_TO_BINARY = 29;
- FUNC_CAST_TO_FIXED_SIZE_BINARY = 30;
- FUNC_CAST_TO_TIMESTAMP = 31;
- FUNC_STR_MATCH_LIKE = 32;
- FUNC_STR_STARTS_WITH = 33;
- FUNC_STR_ENDS_WITH = 34;
- FUNC_STR_MATCH_IGNORE_CASE = 35;
- FUNC_STR_STARTS_WITH_IGNORE_CASE = 36;
- FUNC_STR_ENDS_WITH_IGNORE_CASE = 37;
- }
-
- message TFunction {
- optional uint32 Id = 1; // EFunction
- repeated TColumn Arguments = 2;
- optional EFunctionType FunctionType = 3 [ default = SIMPLE_ARROW ];
- optional uint32 KernelIdx = 4;
- optional uint32 YqlOperationId = 5; // TKernelRequestBuilder::EBinaryOp
- }
-
- message TExternalFunction {
- optional string Name = 1;
- repeated TColumn Arguments = 2;
- }
-
- optional TColumn Column = 1;
- oneof expression {
- TFunction Function = 2;
- TExternalFunction ExternalFunction = 3;
- TConstant Constant = 4;
- bool Null = 5;
- TParameter Parameter = 6;
- }
- }
-
- message TAggregateAssignment {
- enum EAggregateFunction {
- AGG_UNSPECIFIED = 0;
- AGG_SOME = 1;
- AGG_COUNT = 2;
- AGG_MIN = 3;
- AGG_MAX = 4;
- AGG_SUM = 5;
- //AGG_AVG = 6;
- //AGG_VAR = 7;
- //AGG_COVAR = 8;
- //AGG_STDDEV = 9;
- //AGG_CORR = 10;
- //AGG_ARG_MIN = 11;
- //AGG_ARG_MAX = 12;
- //AGG_COUNT_DISTINCT = 13;
- //AGG_QUANTILES = 14;
- //AGG_TOP_COUNT = 15;
- //AGG_TOP_SUM = 16;
- }
-
- message TAggregateFunction {
- optional uint32 Id = 1; // EAggregateFunction
- repeated TColumn Arguments = 2;
- optional string Variant = 3; // i.e. POP/SAMP for AGG_VAR, AGG_COVAR, AGG_STDDEV
- optional EFunctionType FunctionType = 4 [ default = SIMPLE_ARROW ];
- optional uint32 KernelIdx = 5;
- // TODO: Parameters, i.e. N for topK(N)(arg)
- }
-
- optional TColumn Column = 1;
- optional TAggregateFunction Function = 2;
- }
-
- message TProjection {
- repeated TColumn Columns = 1;
- }
-
- message TFilter {
- // Predicate should be a bool column:
- // true - keep the row
- // false - remove the row
- optional TColumn Predicate = 1;
- }
-
- message TGroupBy {
- repeated TAggregateAssignment Aggregates = 1;
- repeated TColumn KeyColumns = 2;
- }
-
- message TCommand {
- oneof line {
- TAssignment Assign = 1;
- TProjection Projection = 2;
- TFilter Filter = 3;
- TGroupBy GroupBy = 4;
- // TODO: ORDER BY, LIMIT
- }
- }
-
- repeated TCommand Command = 1;
- optional uint32 Version = 2;
- optional bytes Kernels = 3;
-}
-
-message TOlapProgram {
- // Store OLAP program in serialized format in case we do not need to deserialize it in TScanTaskMeta
- // Note: when this message exists the program must be present.
- optional bytes Program = 1;
- // RecordBatch deserialization require arrow::Schema, thus store it here
- optional bytes ParametersSchema = 2;
- optional bytes Parameters = 3;
- optional TProgram.TOlapIndexChecker IndexChecker = 4;
-}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp
index 8b62f7890c..6006fe7972 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.cpp
@@ -235,8 +235,10 @@ public:
class TPackAnd: public IRequestNode {
private:
using TBase = IRequestNode;
- THashMap<TString, std::shared_ptr<arrow::Scalar>> Conditions;
+ THashMap<TString, std::shared_ptr<arrow::Scalar>> Equals;
+ THashMap<TString, TLikeDescription> Likes;
bool IsEmptyFlag = false;
+
protected:
virtual bool DoCollapse() override {
return false;
@@ -247,10 +249,19 @@ protected:
if (IsEmptyFlag) {
result.InsertValue("empty", true);
}
- auto& arrJson = result.InsertValue("conditions", NJson::JSON_ARRAY);
- for (auto&& i : Conditions) {
- auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP);
- jsonCondition.InsertValue(i.first, i.second->ToString());
+ {
+ auto& arrJson = result.InsertValue("equals", NJson::JSON_ARRAY);
+ for (auto&& i : Equals) {
+ auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP);
+ jsonCondition.InsertValue(i.first, i.second->ToString());
+ }
+ }
+ {
+ auto& arrJson = result.InsertValue("likes", NJson::JSON_ARRAY);
+ for (auto&& i : Likes) {
+ auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP);
+ jsonCondition.InsertValue(i.first, i.second.ToString());
+ }
}
return result;
}
@@ -259,32 +270,53 @@ protected:
}
public:
TPackAnd(const TPackAnd&) = default;
+
TPackAnd(const TString& cName, const std::shared_ptr<arrow::Scalar>& value)
: TBase(GetNextId("PackAnd")) {
- AddCondition(cName, value);
+ AddEqual(cName, value);
+ }
+
+ TPackAnd(const TString& cName, const TLikePart& part)
+ : TBase(GetNextId("PackAnd")) {
+ AddLike(cName, TLikeDescription(part));
}
const THashMap<TString, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
- return Conditions;
+ return Equals;
+ }
+
+ const THashMap<TString, TLikeDescription>& GetLikes() const {
+ return Likes;
}
bool IsEmpty() const {
return IsEmptyFlag;
}
- void AddCondition(const TString& cName, const std::shared_ptr<arrow::Scalar>& value) {
+ void AddEqual(const TString& cName, const std::shared_ptr<arrow::Scalar>& value) {
AFL_VERIFY(value);
- auto it = Conditions.find(cName);
- if (it == Conditions.end()) {
- Conditions.emplace(cName, value);
+ auto it = Equals.find(cName);
+ if (it == Equals.end()) {
+ Equals.emplace(cName, value);
} else if (it->second->Equals(*value)) {
return;
} else {
IsEmptyFlag = true;
}
}
+    // Registers a LIKE description for column `cName`. When the column already has one,
+    // the existing description is kept and `value` is merged into it.
+    void AddLike(const TString& cName, const TLikeDescription& value) {
+        const auto [it, inserted] = Likes.emplace(cName, value);
+        if (!inserted) {
+            it->second.Merge(value);
+        }
+    }
void Merge(const TPackAnd& add) {
- for (auto&& i : add.Conditions) {
- AddCondition(i.first, i.second);
+ for (auto&& i : add.Equals) {
+ AddEqual(i.first, i.second);
+ }
+ for (auto&& i : add.Likes) {
+ AddLike(i.first, i.second);
}
}
};
@@ -313,6 +345,26 @@ protected:
Parent->Exchange(GetNodeName(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetColumnName(), Children[1]->As<TConstantNode>()->GetConstant()));
return true;
}
+ const bool isLike = (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StringContains ||
+ Operation == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith ||
+ Operation == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith);
+ if (isLike && Children.size() == 2 && Children[1]->Is<TConstantNode>() && Children[0]->Is<TOriginalColumn>()) {
+ auto scalar = Children[1]->As<TConstantNode>()->GetConstant();
+ AFL_VERIFY(scalar->type->id() == arrow::binary()->id());
+ auto scalarString = static_pointer_cast<arrow::BinaryScalar>(scalar);
+ std::optional<TLikePart::EOperation> op;
+ if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) {
+ op = TLikePart::EOperation::Contains;
+ } else if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) {
+ op = TLikePart::EOperation::EndsWith;
+ } else if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith) {
+ op = TLikePart::EOperation::StartsWith;
+ }
+ AFL_VERIFY(op);
+ TLikePart likePart(*op, TString((const char*)scalarString->value->data(), scalarString->value->size()));
+ Parent->Exchange(GetNodeName(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetColumnName(), likePart));
+ return true;
+ }
if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
if (Parent->Is<TOperationNode>() && Parent->As<TOperationNode>()->Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
Parent->Attach(Children);
@@ -407,7 +459,7 @@ public:
if (arg.IsGenerated()) {
auto it = Nodes.find(arg.GetColumnName());
if (it == Nodes.end()) {
- AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("event", "program_arg_is_missing")("program", program.DebugString());
+ AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "program_arg_is_missing")("program", program.DebugString());
return false;
}
argNodes.emplace_back(it->second);
@@ -442,10 +494,10 @@ public:
};
std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TProgramContainer& program) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("program", program.DebugString());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("program", program.DebugString());
auto& steps = program.GetStepsVerified();
if (!steps.size()) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "no_steps_in_program");
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "no_steps_in_program");
return nullptr;
}
auto fStep = steps.front();
@@ -459,9 +511,10 @@ std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TP
if (!rootNode) {
return nullptr;
}
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("original_program", rootNode->SerializeToJson());
while (rootNode->Collapse()) {
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("collapsed_program", rootNode->SerializeToJson());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("collapsed_program", rootNode->SerializeToJson());
if (rootNode->GetChildren().size() != 1) {
return nullptr;
}
@@ -470,14 +523,14 @@ std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TP
if (orNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::Or) {
for (auto&& i : orNode->GetChildren()) {
if (auto* andPackNode = i->As<TPackAnd>()) {
- result->AddBranch(andPackNode->GetEquals());
+ result->AddBranch(andPackNode->GetEquals(), andPackNode->GetLikes());
} else if (auto* operationNode = i->As<TOperationNode>()) {
if (operationNode->GetOperation() == NYql::TKernelRequestBuilder::EBinaryOp::And) {
TPackAnd* pack = operationNode->FindFirst<TPackAnd>();
if (!pack) {
return nullptr;
}
- result->AddBranch(pack->GetEquals());
+ result->AddBranch(pack->GetEquals(), pack->GetLikes());
}
} else {
return nullptr;
@@ -485,7 +538,7 @@ std::shared_ptr<TDataForIndexesCheckers> TDataForIndexesCheckers::Build(const TP
}
}
} else if (auto* andPackNode = rootNode->GetChildren().front()->As<TPackAnd>()) {
- result->AddBranch(andPackNode->GetEquals());
+ result->AddBranch(andPackNode->GetEquals(), andPackNode->GetLikes());
} else {
return nullptr;
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h
index 898c4210b0..eb2d6efca9 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/program.h
@@ -3,30 +3,109 @@
namespace NKikimr::NOlap::NIndexes::NRequest {
+// One component of a LIKE predicate: the kind of match (prefix, suffix or substring)
+// together with the literal text that has to be matched.
+class TLikePart {
+public:
+    enum class EOperation {
+        StartsWith,
+        EndsWith,
+        Contains
+    };
+
+private:
+    YDB_READONLY(EOperation, Operation, EOperation::Contains);
+    YDB_READONLY_DEF(TString, Value);
+
+public:
+    TLikePart(const EOperation op, const TString& value)
+        : Operation(op)
+        , Value(value) {
+    }
+
+    // Convenience factories, one per match kind.
+    static TLikePart MakeStart(const TString& value) {
+        return TLikePart(EOperation::StartsWith, value);
+    }
+    static TLikePart MakeEnd(const TString& value) {
+        return TLikePart(EOperation::EndsWith, value);
+    }
+    static TLikePart MakeContains(const TString& value) {
+        return TLikePart(EOperation::Contains, value);
+    }
+
+    // Renders the part in SQL LIKE notation: "value%" for a prefix match, "%value" for a
+    // suffix match and "%value%" for a substring match. The wildcard marks the side on which
+    // arbitrary text is allowed, so the three kinds produce different strings for the same value.
+    TString ToString() const {
+        switch (Operation) {
+            case EOperation::StartsWith:
+                return Value + '%';
+            case EOperation::EndsWith:
+                return '%' + Value;
+            case EOperation::Contains:
+                return '%' + Value + '%';
+        }
+        // Reached only if Operation holds a value outside of EOperation.
+        AFL_VERIFY(false);
+        return "";
+    }
+};
+
+// Collection of LIKE parts keyed by TLikePart::ToString(), so a part with the same
+// textual form is stored only once.
+class TLikeDescription {
+private:
+    THashMap<TString, TLikePart> LikeSequences;
+
+public:
+    TLikeDescription(const TLikePart& likePart) {
+        LikeSequences.emplace(likePart.ToString(), likePart);
+    }
+
+    const THashMap<TString, TLikePart>& GetLikeSequences() const {
+        return LikeSequences;
+    }
+
+    // Adds the parts of `d` whose keys are not present yet; existing entries are left untouched.
+    void Merge(const TLikeDescription& d) {
+        for (const auto& [key, part] : d.LikeSequences) {
+            LikeSequences.emplace(key, part);
+        }
+    }
+
+    // Debug rendering of the stored keys in the form "[key1,key2,];".
+    TString ToString() const {
+        TStringBuilder out;
+        out << "[";
+        for (const auto& entry : LikeSequences) {
+            out << entry.first << ",";
+        }
+        out << "];";
+        return out;
+    }
+};
+
class TBranchCoverage {
private:
THashMap<TString, std::shared_ptr<arrow::Scalar>> Equals;
+ THashMap<TString, TLikeDescription> Likes;
YDB_ACCESSOR_DEF(std::vector<std::shared_ptr<IIndexChecker>>, Indexes);
+
public:
- TBranchCoverage(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equals)
+ TBranchCoverage(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equals, const THashMap<TString, TLikeDescription>& likes)
: Equals(equals)
- {
-
+ , Likes(likes) {
}
const THashMap<TString, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
return Equals;
}
+ const THashMap<TString, TLikeDescription>& GetLikes() const {
+ return Likes;
+ }
+
std::shared_ptr<IIndexChecker> GetAndChecker() const;
};
class TDataForIndexesCheckers {
private:
YDB_READONLY_DEF(std::vector<std::shared_ptr<TBranchCoverage>>, Branches);
+
public:
- void AddBranch(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equalsData) {
- Branches.emplace_back(std::make_shared<TBranchCoverage>(equalsData));
+ void AddBranch(const THashMap<TString, std::shared_ptr<arrow::Scalar>>& equalsData, const THashMap<TString, TLikeDescription>& likesData) {
+ Branches.emplace_back(std::make_shared<TBranchCoverage>(equalsData, likesData));
}
static std::shared_ptr<TDataForIndexesCheckers> Build(const TProgramContainer& program);
@@ -34,4 +113,4 @@ public:
TIndexCheckerContainer GetCoverChecker() const;
};
-} // namespace NKikimr::NOlap::NIndexes::NRequest \ No newline at end of file
+} // namespace NKikimr::NOlap::NIndexes::NRequest
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
index a2d84cb10f..e3e3cf7d42 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
@@ -55,13 +55,11 @@ void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataF
hashes.emplace(hash);
};
NArrow::NHash::NXX64::TStreamStringHashCalcer calcer(0);
- for (ui32 i = 0; i < HashesCount; ++i) {
- calcer.Start();
- for (auto&& i : foundColumns) {
- NArrow::NHash::TXX64::AppendField(i.second, calcer);
- }
- BuildHashesSet(calcer.Finish(), pred);
+ calcer.Start();
+ for (auto&& i : foundColumns) {
+ NArrow::NHash::TXX64::AppendField(i.second, calcer);
}
+ BuildHashesSet(calcer.Finish(), pred);
branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
}
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp
new file mode 100644
index 0000000000..5eec032ad4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.cpp
@@ -0,0 +1,51 @@
+#include "checker.h"
+
+#include <ydb/core/formats/arrow/serializer/abstract.h>
+#include <ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h>
+
+#include <ydb/library/formats/arrow/common/validation.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/array_primitive.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
+
+// Appends every stored hash value to the BloomNGrammFilter section of the checker proto.
+// The section is only touched when there is at least one hash to write.
+void TFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const {
+    for (const ui64 hashValue : HashValues) {
+        proto.MutableBloomNGrammFilter()->AddHashValues(hashValue);
+    }
+}
+
+// Tests the stored hashes against the bit storage built from the index blob.
+// Exactly one blob is expected. Returns true when every hash, reduced modulo the storage
+// size in bits, addresses a set bit; returns false as soon as one addressed bit is clear.
+bool TFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
+    AFL_VERIFY(blobs.size() == 1);
+    for (const auto& blob : blobs) {
+        TFixStringBitsStorage bits(blob);
+        bool allBitsSet = true;
+        for (auto it = HashValues.begin(); allBitsSet && it != HashValues.end(); ++it) {
+            allBitsSet = bits.Get(*it % bits.GetSizeBits());
+        }
+        if (allBitsSet) {
+            return true;
+        }
+    }
+    return false;
+}
+
+// Loads the hash values from the BloomNGrammFilter section of the checker proto.
+// Returns false when that section is absent or when no hash values are held after loading.
+bool TFilterChecker::DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) {
+    if (!proto.HasBloomNGrammFilter()) {
+        return false;
+    }
+    const auto& protoHashes = proto.GetBloomNGrammFilter().GetHashValues();
+    HashValues.insert(protoHashes.begin(), protoHashes.end());
+    return !HashValues.empty();
+}
+
+} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h
new file mode 100644
index 0000000000..37a4f3c316
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/checker.h
@@ -0,0 +1,33 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/simple.h>
+namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
+
+// Index checker for the n-gramm bloom filter: holds the set of hash values
+// that must all be present in a stored filter for it to pass the check.
+class TFilterChecker: public TSimpleIndexChecker {
+public:
+    // Name under which this checker is registered in the factory.
+    static TString GetClassNameStatic() {
+        return "BLOOM_NGRAMM_FILTER";
+    }
+
+private:
+    using TBase = TSimpleIndexChecker;
+    // Hash values required by the request.
+    std::set<ui64> HashValues;
+    static inline auto Registrator = TFactory::TRegistrator<TFilterChecker>(GetClassNameStatic());
+
+protected:
+    bool DoDeserializeFromProtoImpl(const NKikimrSSA::TProgram::TOlapIndexChecker& proto) override;
+    void DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapIndexChecker& proto) const override;
+
+    bool DoCheckImpl(const std::vector<TString>& blobs) const override;
+
+public:
+    TFilterChecker() = default;
+
+    // Takes ownership of the hash set computed for the request.
+    TFilterChecker(const ui32 indexId, std::set<ui64>&& hashes)
+        : TBase(indexId)
+        , HashValues(std::move(hashes)) {
+    }
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp
new file mode 100644
index 0000000000..e0068eeb21
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp
@@ -0,0 +1,91 @@
+#include "constructor.h"
+#include "meta.h"
+
+#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
+
+namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
+
+// Resolves the configured column name against the schema and builds the index meta for it.
+// Reports an error and returns nullptr when the schema has no column with that name.
+std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta(
+    const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
+    if (auto* column = currentSchema.GetColumns().GetByName(ColumnName)) {
+        const ui32 columnId = column->GetId();
+        // Fall back to the default storage when the constructor has no explicit storage id.
+        const auto storageId = GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId);
+        return std::make_shared<TIndexMeta>(indexId, indexName, storageId, columnId, HashesCount, FilterSizeBytes, NGrammSize);
+    }
+    errors.AddError("no column with name " + ColumnName);
+    return nullptr;
+}
+
+// Reads the index features from JSON: column_name (non-empty string), ngramm_size in [3, 10],
+// filter_size_bytes in [128, 1Mb] and hashes_count in [1, 10].
+// Returns a failed status describing the first field that is missing, mistyped or out of range.
+NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+    if (!jsonInfo.Has("column_name")) {
+        return TConclusionStatus::Fail("column_name have to be in bloom ngramm filter features");
+    }
+    if (!jsonInfo["column_name"].GetString(&ColumnName)) {
+        return TConclusionStatus::Fail("column_name have to be string in bloom ngramm filter features");
+    }
+    if (!ColumnName) {
+        return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features");
+    }
+
+    // Numeric fields are range-checked at full 64-bit width before being narrowed into the ui32 members:
+    // narrowing first would let a huge value wrap around into the accepted interval.
+    if (!jsonInfo["ngramm_size"].IsUInteger()) {
+        return TConclusionStatus::Fail("ngramm_size have to be in bloom filter features as uint field");
+    }
+    const ui64 nGrammSize = jsonInfo["ngramm_size"].GetUInteger();
+    if (nGrammSize < 3 || nGrammSize > 10) {
+        return TConclusionStatus::Fail("ngramm_size have to be in bloom ngramm filter in interval [3, 10]");
+    }
+    NGrammSize = static_cast<ui32>(nGrammSize);
+
+    if (!jsonInfo["filter_size_bytes"].IsUInteger()) {
+        return TConclusionStatus::Fail("filter_size_bytes have to be in bloom filter features as uint field");
+    }
+    const ui64 filterSizeBytes = jsonInfo["filter_size_bytes"].GetUInteger();
+    if (filterSizeBytes < 128 || filterSizeBytes > (1 << 20)) {
+        return TConclusionStatus::Fail("filter_size_bytes have to be in bloom ngramm filter in interval [128, 1Mb]");
+    }
+    FilterSizeBytes = static_cast<ui32>(filterSizeBytes);
+
+    if (!jsonInfo["hashes_count"].IsUInteger()) {
+        return TConclusionStatus::Fail("hashes_count have to be in bloom filter features as uint field");
+    }
+    const ui64 hashesCount = jsonInfo["hashes_count"].GetUInteger();
+    if (hashesCount < 1 || hashesCount > 10) {
+        return TConclusionStatus::Fail("hashes_count have to be in bloom ngramm filter in interval [1, 10]");
+    }
+    HashesCount = static_cast<ui32>(hashesCount);
+    return TConclusionStatus::Success();
+}
+
+// Reads the index features from the BloomNGrammFilter section of the requested-index proto.
+// Accepts NGrammSize in [3, 10], FilterSizeBytes in [128, 1Mb], HashesCount in [1, 10] and a non-empty column name;
+// returns a failed status for a missing section or the first invalid field.
+NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) {
+    if (!proto.HasBloomNGrammFilter()) {
+        const TString errorMessage = "not found BloomNGrammFilter section in proto: \"" + proto.DebugString() + "\"";
+        AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage);
+        return TConclusionStatus::Fail(errorMessage);
+    }
+    const auto& bFilter = proto.GetBloomNGrammFilter();
+    NGrammSize = bFilter.GetNGrammSize();
+    if (NGrammSize < 3 || NGrammSize > 10) {
+        return TConclusionStatus::Fail("NGrammSize have to be in [3, 10]");
+    }
+    FilterSizeBytes = bFilter.GetFilterSizeBytes();
+    if (FilterSizeBytes < 128 || FilterSizeBytes > (1 << 20)) {
+        return TConclusionStatus::Fail("FilterSizeBytes have to be in [128, 1Mb]");
+    }
+    HashesCount = bFilter.GetHashesCount();
+    if (HashesCount < 1 || HashesCount > 10) {
+        // The message states the interval that is actually enforced above.
+        return TConclusionStatus::Fail("HashesCount have to be in [1, 10]");
+    }
+    ColumnName = bFilter.GetColumnName();
+    if (!ColumnName) {
+        return TConclusionStatus::Fail("empty column name");
+    }
+    return TConclusionStatus::Success();
+}
+
+// Stores the configured column name and filter parameters in the BloomNGrammFilter section of the proto.
+void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const {
+    auto& section = *proto.MutableBloomNGrammFilter();
+    section.SetHashesCount(HashesCount);
+    section.SetFilterSizeBytes(FilterSizeBytes);
+    section.SetNGrammSize(NGrammSize);
+    section.SetColumnName(ColumnName);
+}
+
+} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h
new file mode 100644
index 0000000000..bf66637039
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h
@@ -0,0 +1,35 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
+namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
+
+// Constructor of the n-gramm bloom filter index: keeps the user-requested features
+// (column name, n-gramm size, filter size, hashes count) and creates the index meta from them.
+class TIndexConstructor: public IIndexMetaConstructor {
+public:
+    // Name under which this constructor is registered in the factory.
+    static TString GetClassNameStatic() {
+        return "BLOOM_NGRAMM_FILTER";
+    }
+
+private:
+    TString ColumnName;
+    ui32 NGrammSize = 3;
+    ui32 FilterSizeBytes = 512;
+    ui32 HashesCount = 2;
+    static inline auto Registrator = TFactory::TRegistrator<TIndexConstructor>(GetClassNameStatic());
+
+protected:
+    std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName,
+        const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override;
+
+    TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override;
+
+    TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) override;
+    void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& proto) const override;
+
+public:
+    TIndexConstructor() = default;
+
+    TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp
new file mode 100644
index 0000000000..af139065b9
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp
@@ -0,0 +1,152 @@
+#include "checker.h"
+#include "meta.h"
+
+#include <ydb/core/formats/arrow/hash/calcer.h>
+#include <ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h>
+#include <ydb/core/tx/program/program.h>
+#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
+
+#include <ydb/library/formats/arrow/hash/xx_hash.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
+#include <library/cpp/deprecated/atomic/atomic.h>
+
+namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
+
+// Splits strings into fixed-size n-gramms and feeds the hash of every n-gramm to a caller-provided sink.
+//
+// Besides the plain n-gramms of the data, two kinds of '\0'-padded n-gramms are produced:
+//  - prefix n-gramms: (nGrammSize - c) zero bytes followed by the first c bytes of the data;
+//  - suffix n-gramms: a tail of the data shorter than nGrammSize followed by zero bytes.
+// Indexed values produce both kinds; a request part produces prefix n-gramms only for StartsWith
+// and suffix n-gramms only for EndsWith.
+class TNGrammBuilder {
+private:
+    NArrow::NHash::NXX64::TStreamStringHashCalcer HashCalcer;
+    // Source of zero bytes for suffix padding.
+    TBuffer Zeros;
+
+    // Calls pred(ptr) for every n-gramm; ptr addresses at least nGrammSize readable bytes.
+    // op is empty for indexed values and holds the LIKE operation for request parts.
+    template <class TAction>
+    void BuildNGramms(const char* data, const ui32 dataSize, const std::optional<NRequest::TLikePart::EOperation> op, const ui32 nGrammSize,
+        const TAction& pred) const {
+        // Suffix padding reads up to nGrammSize bytes from Zeros.
+        AFL_VERIFY(nGrammSize <= Zeros.size())("ngramm_size", nGrammSize)("limit", Zeros.size());
+        const bool withPrefix = !op || op == NRequest::TLikePart::EOperation::StartsWith;
+        const bool withSuffix = !op || op == NRequest::TLikePart::EOperation::EndsWith;
+        if (withPrefix) {
+            // Only prefixes fully covered by the data are emitted. For data shorter than nGrammSize the bytes
+            // following it in a matching value are unknown, so padding them with zeros would produce n-gramms
+            // that no indexed value contains and the filter would wrongly reject matching data.
+            const ui32 maxPrefix = std::min(nGrammSize, dataSize);
+            for (ui32 c = 1; c <= maxPrefix; ++c) {
+                TBuffer padded;
+                padded.Fill('\0', nGrammSize - c);
+                padded.Append(data, c);
+                pred(padded.data());
+            }
+        }
+        for (ui32 c = 0; c < dataSize; ++c) {
+            if (c + nGrammSize <= dataSize) {
+                pred(data + c);
+            } else if (withSuffix) {
+                TBuffer padded;
+                padded.Append(data + c, dataSize - c);
+                padded.Append(Zeros.data(), nGrammSize - padded.size());
+                pred(padded.data());
+            }
+        }
+    }
+
+public:
+    TNGrammBuilder()
+        : HashCalcer(0) {
+        Zeros.Fill('\0', 1024);
+    }
+
+    // Hashes the n-gramms of every non-null value of a utf8 array and passes each hash to fillData.
+    // Values shorter than nGrammSize are processed too: they contribute only padded n-gramms, which is
+    // what StartsWith/EndsWith requests with short patterns look for.
+    template <class TFiller>
+    void FillNGrammHashes(const ui32 nGrammSize, const std::shared_ptr<arrow::Array>& array, const TFiller& fillData) {
+        AFL_VERIFY(array->type_id() == arrow::utf8()->id())("id", array->type()->ToString());
+        NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+            using TWrap = std::decay_t<decltype(type)>;
+            using T = typename TWrap::T;
+            using TArray = typename arrow::TypeTraits<T>::ArrayType;
+            auto& typedArray = static_cast<const TArray&>(*array);
+
+            for (ui32 row = 0; row < array->length(); ++row) {
+                if (array->IsNull(row)) {
+                    continue;
+                }
+                if constexpr (arrow::has_string_view<T>()) {
+                    auto value = typedArray.GetView(row);
+                    const auto pred = [&](const char* data) {
+                        HashCalcer.Start();
+                        HashCalcer.Update((const ui8*)data, nGrammSize);
+                        fillData(HashCalcer.Finish());
+                    };
+                    BuildNGramms(value.data(), value.size(), {}, nGrammSize, pred);
+                } else {
+                    AFL_VERIFY(false);
+                }
+            }
+            return true;
+        });
+    }
+
+    // Hashes the n-gramms of one request part (userReq with LIKE operation op) and passes each hash to fillData.
+    template <class TFiller>
+    void FillNGrammHashes(const ui32 nGrammSize, const NRequest::TLikePart::EOperation op, const TString& userReq, const TFiller& fillData) {
+        const auto pred = [&](const char* value) {
+            HashCalcer.Start();
+            HashCalcer.Update((const ui8*)value, nGrammSize);
+            fillData(HashCalcer.Finish());
+        };
+        BuildNGramms(userReq.data(), userReq.size(), op, nGrammSize, pred);
+    }
+};
+
+// Builds the filter blob for a single column: every n-gramm hash of every chunk is expanded
+// with BuildHashesSet and each resulting hash sets one bit (modulo the filter size in bits).
+TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader) const {
+    AFL_VERIFY(reader.GetColumnsCount() == 1)("count", reader.GetColumnsCount());
+
+    TFixStringBitsStorage bits(FilterSizeBytes * 8);
+    const auto markBit = [&](const ui64 hashSecondary) {
+        bits.Set(true, hashSecondary % bits.GetSizeBits());
+    };
+    const auto onNGrammHash = [&](const ui64 hash) {
+        BuildHashesSet(hash, markBit);
+    };
+
+    TNGrammBuilder builder;
+    reader.Start();
+    while (reader.IsCorrect()) {
+        builder.FillNGrammHashes(NGrammSize, reader.begin()->GetCurrentChunk(), onNGrammHash);
+        // Advance past the whole chunk that has just been processed.
+        reader.ReadNext(reader.begin()->GetCurrentChunk()->length());
+    }
+
+    return bits.GetData();
+}
+
+// For every request branch that has LIKE conditions on all index columns, computes the hashes
+// the stored filter must contain and attaches a TFilterChecker with them to the branch.
+// Stops entirely (after logging) when an index column id is not present in the schema.
+void TIndexMeta::DoFillIndexCheckers(
+    const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const {
+    for (auto&& branch : info->GetBranches()) {
+        std::map<ui32, NRequest::TLikeDescription> foundColumns;
+        for (auto&& cId : ColumnIds) {
+            auto c = schema.GetColumns().GetById(cId);
+            if (!c) {
+                AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect index column")("id", cId);
+                return;
+            }
+            auto it = branch->GetLikes().find(c->GetName());
+            if (it == branch->GetLikes().end()) {
+                break;
+            }
+            foundColumns.emplace(cId, it->second);
+        }
+        if (foundColumns.size() != ColumnIds.size()) {
+            continue;
+        }
+
+        std::set<ui64> hashes;
+        const auto pred = [&](const ui64 hash) {
+            const auto predSet = [&](const ui64 hashSecondary) {
+                hashes.emplace(hashSecondary);
+            };
+            BuildHashesSet(hash, predSet);
+        };
+        TNGrammBuilder builder;
+        for (auto&& c : foundColumns) {
+            for (auto&& ls : c.second.GetLikeSequences()) {
+                builder.FillNGrammHashes(NGrammSize, ls.second.GetOperation(), ls.second.GetValue(), pred);
+            }
+        }
+        if (hashes.empty()) {
+            // The patterns produced no n-gramms (e.g. they are shorter than the n-gramm size), so the filter
+            // cannot exclude anything. A checker with no hashes accepts every blob, and TFilterChecker
+            // refuses to deserialize an empty hash set, so no checker is registered for this branch.
+            continue;
+        }
+        branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes)));
+    }
+}
+
+} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h
new file mode 100644
index 0000000000..98af4556a5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h
@@ -0,0 +1,106 @@
+#pragma once
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h>
+namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
+
+// Meta of the n-gramm bloom filter index over a single column.
+// Keeps the filter parameters, builds the filter blob for stored data and produces
+// checkers for requests with LIKE conditions on the indexed column.
+class TIndexMeta: public TIndexByColumns {
+public:
+    // Name under which this meta is registered in the factory.
+    static TString GetClassNameStatic() {
+        return "BLOOM_NGRAMM_FILTER";
+    }
+
+private:
+    using TBase = TIndexByColumns;
+    std::shared_ptr<arrow::Schema> ResultSchema;
+    ui32 NGrammSize = 3;
+    ui32 FilterSizeBytes = 512;
+    ui32 HashesCount = 2;
+    static inline auto Registrator = TFactory::TRegistrator<TIndexMeta>(GetClassNameStatic());
+
+    // Accepted parameter intervals; they mirror the validation done by TIndexConstructor.
+    static constexpr ui32 MinNGrammSize = 3;
+    static constexpr ui32 MaxNGrammSize = 10;
+    static constexpr ui32 MinFilterSizeBytes = 128;
+    static constexpr ui32 MaxFilterSizeBytes = 1 << 20;
+    static constexpr ui32 MinHashesCount = 1;
+    static constexpr ui32 MaxHashesCount = 10;
+
+    // Creates the result schema (one unnamed boolean field) and verifies the basic parameter invariants.
+    // Must be called exactly once per object.
+    void Initialize() {
+        AFL_VERIFY(!ResultSchema);
+        std::vector<std::shared_ptr<arrow::Field>> fields = {std::make_shared<arrow::Field>("", arrow::boolean())};
+        ResultSchema = std::make_shared<arrow::Schema>(fields);
+        AFL_VERIFY(HashesCount > 0);
+        AFL_VERIFY(FilterSizeBytes > 0);
+        AFL_VERIFY(NGrammSize > 2);
+    }
+
+    // Parameters of the derived hashes: hash_b = (A * originalHash + b) % P.
+    static const ui64 HashesConstructorP = ((ui64)2 << 31) - 1;   // 2^32 - 1
+    static const ui64 HashesConstructorA = (ui64)2 << 16;         // 2^17
+
+    // Calls actor with HashesCount hashes derived from originalHash (b = 1..HashesCount).
+    template <class TActor>
+    void BuildHashesSet(const ui64 originalHash, const TActor& actor) const {
+        AFL_VERIFY(HashesCount < HashesConstructorP);
+        for (ui32 b = 1; b <= HashesCount; ++b) {
+            const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP;
+            actor(hash);
+        }
+    }
+
+    // Same as above for every hash of a container.
+    template <class TContainer, class TActor>
+    void BuildHashesSet(const TContainer& originalHashes, const TActor& actor) const {
+        AFL_VERIFY(HashesCount < HashesConstructorP);
+        for (auto&& hOriginal : originalHashes) {
+            BuildHashesSet(hOriginal, actor);
+        }
+    }
+
+protected:
+    // Modification of an existing index of this kind is rejected.
+    virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& /*newMeta*/) const override {
+        return TConclusionStatus::Fail("not supported");
+    }
+    virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;
+
+    virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader) const override;
+
+    // Restores the parameters from the BloomNGrammFilter section of the index description.
+    // Returns false when a parameter is outside its accepted interval or the column id is missing/zero.
+    virtual bool DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& proto) override {
+        AFL_VERIFY(TBase::DoDeserializeFromProto(proto));
+        AFL_VERIFY(proto.HasBloomNGrammFilter());
+        auto& bFilter = proto.GetBloomNGrammFilter();
+        HashesCount = bFilter.GetHashesCount();
+        if (HashesCount < MinHashesCount || MaxHashesCount < HashesCount) {
+            return false;
+        }
+        // Upper bounds are checked as well, so a description the constructor would never
+        // have produced is rejected instead of being used to build or check filters.
+        NGrammSize = bFilter.GetNGrammSize();
+        if (NGrammSize < MinNGrammSize || MaxNGrammSize < NGrammSize) {
+            return false;
+        }
+        FilterSizeBytes = bFilter.GetFilterSizeBytes();
+        if (FilterSizeBytes < MinFilterSizeBytes || MaxFilterSizeBytes < FilterSizeBytes) {
+            return false;
+        }
+        if (!bFilter.HasColumnId() || !bFilter.GetColumnId()) {
+            return false;
+        }
+        ColumnIds.emplace(bFilter.GetColumnId());
+        Initialize();
+        return true;
+    }
+
+    // Stores the parameters and the single indexed column id in the BloomNGrammFilter section.
+    virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override {
+        auto* filterProto = proto.MutableBloomNGrammFilter();
+        AFL_VERIFY(NGrammSize >= 3);
+        AFL_VERIFY(FilterSizeBytes >= 128);
+        AFL_VERIFY(HashesCount >= 1);
+        AFL_VERIFY(ColumnIds.size() == 1);
+        filterProto->SetNGrammSize(NGrammSize);
+        filterProto->SetFilterSizeBytes(FilterSizeBytes);
+        filterProto->SetHashesCount(HashesCount);
+        filterProto->SetColumnId(*ColumnIds.begin());
+    }
+
+public:
+    TIndexMeta() = default;
+    TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const ui32 hashesCount,
+        const ui32 filterSizeBytes, const ui32 nGrammSize)
+        : TBase(indexId, indexName, { columnId }, storageId)
+        , NGrammSize(nGrammSize)
+        , FilterSizeBytes(filterSizeBytes)
+        , HashesCount(hashesCount)
+    {
+        Initialize();
+    }
+
+    virtual TString GetClassName() const override {
+        return GetClassNameStatic();
+    }
+};
+
+} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make
new file mode 100644
index 0000000000..bcba53e477
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/ya.make
@@ -0,0 +1,15 @@
+# Library with the n-gramm bloom filter index: constructor, meta and checker.
+LIBRARY()
+
+# NOTE(review): each of these sources includes a header with a static factory registrator;
+# GLOBAL is presumably needed so the registrators are not dropped at link time - confirm.
+SRCS(
+ GLOBAL constructor.cpp
+ GLOBAL meta.cpp
+ GLOBAL checker.cpp
+)
+
+PEERDIR(
+ ydb/core/protos
+ ydb/core/formats/arrow
+ ydb/core/tx/columnshard/engines/storage/indexes/portions
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make
index 0459c906d8..b12360d262 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/ya.make
@@ -3,6 +3,7 @@ LIBRARY()
PEERDIR(
ydb/core/tx/columnshard/engines/storage/indexes/portions
ydb/core/tx/columnshard/engines/storage/indexes/bloom
+ ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm
ydb/core/tx/columnshard/engines/storage/indexes/max
ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch
)
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 98dc024c2a..35063d0ae8 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -148,6 +148,16 @@ public:
}
}
+ // Advances the reader by count positions using the single-step ReadNext().
+ // Returns false when the single-step call reports false; that is only allowed on the last requested step.
+ bool ReadNext(const ui32 count) {
+     for (ui32 i = 0; i != count; ++i) {
+         if (ReadNext()) {
+             continue;
+         }
+         AFL_VERIFY(i + 1 == count);
+         return false;
+     }
+     return true;
+ }
+
bool ReadNext() {
std::optional<bool> result;
for (auto&& i : Columns) {
diff --git a/ydb/library/formats/arrow/protos/ssa.proto b/ydb/library/formats/arrow/protos/ssa.proto
index 193c759a3a..38a0bb1480 100644
--- a/ydb/library/formats/arrow/protos/ssa.proto
+++ b/ydb/library/formats/arrow/protos/ssa.proto
@@ -45,6 +45,10 @@ message TProgram {
repeated uint64 HashValues = 1;
}
+ // Checker for the n-gramm bloom filter index.
+ // HashValues are the hashes the checker requires to be present in a stored filter.
+ message TBloomNGrammFilterChecker {
+ repeated uint64 HashValues = 1;
+ }
+
message TCountMinSketchChecker {
}
@@ -60,6 +64,7 @@ message TProgram {
TBloomFilterChecker BloomFilter = 40;
TCompositeChecker Composite = 41;
TCountMinSketchChecker CountMinSketch = 42;
+ TBloomNGrammFilterChecker BloomNGrammFilter = 43;
}
}