aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2025-03-01 12:28:33 +0300
committerGitHub <noreply@github.com>2025-03-01 12:28:33 +0300
commit1215fa43fddf7d1ed0bfc8644a014dd909347f1b (patch)
tree8bb6b371ade1bc883b2cc234c21b5db914eb3b86
parentd08419b4a78e460944e81523fe9c7ce7c7f5ca14 (diff)
downloadydb-1215fa43fddf7d1ed0bfc8644a014dd909347f1b.tar.gz
indexes data extractor for providing subcolumns info (#15175)
-rw-r--r--.github/config/muted_ya.txt1
-rw-r--r--ydb/core/formats/arrow/accessor/sub_columns/stats.h8
-rw-r--r--ydb/core/kqp/ut/olap/indexes_ut.cpp7
-rw-r--r--ydb/core/kqp/ut/olap/json_ut.cpp160
-rw-r--r--ydb/core/protos/flat_scheme_op.proto21
-rw-r--r--ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h2
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp38
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h105
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h32
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp43
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h118
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make3
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp1
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h9
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp95
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h33
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp17
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp59
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h13
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp7
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp45
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h22
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h13
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp36
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h73
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp61
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h39
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp38
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h60
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make18
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp24
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h33
-rw-r--r--ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make1
-rw-r--r--ydb/core/tx/columnshard/splitter/chunks.h50
-rw-r--r--ydb/core/tx/columnshard/test_helper/program_constructor.cpp15
-rw-r--r--ydb/core/tx/columnshard/test_helper/program_constructor.h1
-rw-r--r--ydb/services/bg_tasks/abstract/interface.h2
43 files changed, 1044 insertions, 346 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 6500d3169e8..c353e465b4d 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -39,6 +39,7 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
+ydb/core/kqp/ut/olap KqpOlapJson.BloomIndexesVariants
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization
ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization
ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart
diff --git a/ydb/core/formats/arrow/accessor/sub_columns/stats.h b/ydb/core/formats/arrow/accessor/sub_columns/stats.h
index 371634eb147..176c223d468 100644
--- a/ydb/core/formats/arrow/accessor/sub_columns/stats.h
+++ b/ydb/core/formats/arrow/accessor/sub_columns/stats.h
@@ -24,6 +24,14 @@ private:
std::shared_ptr<arrow::UInt8Array> AccessorType;
public:
+ ui32 GetFilledValuesCount() const {
+ ui32 result = 0;
+ for (ui32 i = 0; i < (ui32)DataRecordsCount->length(); ++i) {
+ result += DataRecordsCount->Value(i);
+ }
+ return result;
+ }
+
NJson::TJsonValue DebugJson() const {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("key_names", NArrow::DebugJson(DataNames, 1000000, 1000000)["data"]);
diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp
index 7a0708a031f..db511d91475 100644
--- a/ydb/core/kqp/ut/olap/indexes_ut.cpp
+++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp
@@ -93,7 +93,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterQuery =
TStringBuilder() <<
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
- FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`);
+ FEATURES=`{"column_names" : ["resource_id"], "false_positive_probability" : 0.05}`);
)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
@@ -128,6 +128,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
Cerr << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl;
CompareYson(result, R"([[0u;]])");
AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0);
+ AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val())
("approve", csController->GetIndexesApprovedOnSelect().Val())("skip", csController->GetIndexesSkippingOnSelect().Val());
}
@@ -432,7 +433,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
auto alterQuery =
TStringBuilder() << Sprintf(
R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER,
- FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
+ FEATURES=`{"column_names" : ["resource_id"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`);
)",
StorageId.data());
auto session = tableClient.CreateSession().GetValueSync().GetSession();
@@ -474,7 +475,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0);
AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0);
- csController->WaitCompactions(TDuration::Seconds(25));
+ csController->WaitCompactions(TDuration::Seconds(5));
// important checker for control compactions (<=21) and control indexes constructed (>=21)
AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 3)("count", csController->GetCompactionStartedCounter().Val());
diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp
index dccff1e6094..cace9eeb983 100644
--- a/ydb/core/kqp/ut/olap/json_ut.cpp
+++ b/ydb/core/kqp/ut/olap/json_ut.cpp
@@ -65,9 +65,21 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
class TSelectCommand: public ICommand {
private:
- const TString Command;
- const TString Compare;
+ TString Command;
+ TString Compare;
+ std::optional<ui64> ExpectIndexSkip;
+ std::optional<ui64> ExpectIndexNoData;
+ std::optional<ui64> ExpectIndexApprove;
+ ui64 IndexSkipStart = 0;
+ ui64 IndexNoDataStart = 0;
+ ui64 IndexApproveStart = 0;
+
virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override {
+ auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>();
+ AFL_VERIFY(controller);
+ IndexSkipStart = controller->GetIndexesSkippingOnSelect().Val();
+ IndexApproveStart = controller->GetIndexesApprovedOnSelect().Val();
+ IndexNoDataStart = controller->GetIndexesSkippedNoData().Val();
Cerr << "EXECUTE: " << Command << Endl;
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
auto it = kikimr.GetQueryClient().StreamExecuteQuery(Command, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
@@ -78,14 +90,76 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
Cerr << "OUTPUT: " << output << Endl;
CompareYson(output, Compare);
}
+ const ui32 skip = controller->GetIndexesSkippingOnSelect().Val() - IndexSkipStart;
+ const ui32 noData = controller->GetIndexesSkippedNoData().Val() - IndexNoDataStart;
+ const ui32 approves = controller->GetIndexesApprovedOnSelect().Val() - IndexApproveStart;
+ Cerr << noData << "/" << skip << "/" << approves << Endl;
+ if (ExpectIndexSkip) {
+ AFL_VERIFY(skip == *ExpectIndexSkip)("expect", ExpectIndexSkip)("real", skip)(
+ "current", controller->GetIndexesSkippingOnSelect().Val())(
+ "pred", IndexSkipStart);
+ }
+ if (ExpectIndexNoData) {
+ AFL_VERIFY(noData == *ExpectIndexNoData)("expect", ExpectIndexNoData)("real", noData)(
+ "current", controller->GetIndexesSkippedNoData().Val())(
+ "pred", IndexNoDataStart);
+ }
+ if (ExpectIndexApprove) {
+ AFL_VERIFY(approves == *ExpectIndexApprove)("expect", ExpectIndexApprove)("real", approves)(
+ "current", controller->GetIndexesApprovedOnSelect().Val())("pred", IndexApproveStart);
+ }
return TConclusionStatus::Success();
}
public:
- TSelectCommand(const TString& command, const TString& compare)
- : Command(command)
- , Compare(compare) {
+ bool DeserializeFromString(const TString& info) {
+ auto lines = StringSplitter(info).SplitBySet("\n").ToList<TString>();
+ std::optional<ui32> state;
+ for (auto&& l : lines) {
+ l = Strip(l);
+ if (l.StartsWith("READ:")) {
+ l = l.substr(5);
+ state = 0;
+ } else if (l.StartsWith("EXPECTED:")) {
+ l = l.substr(9);
+ state = 1;
+ } else if (l.StartsWith("IDX_ND_SKIP_APPROVE:")) {
+ state = 2;
+ l = l.substr(20);
+ } else {
+ AFL_VERIFY(state)("line", l);
+ }
+
+ if (*state == 0) {
+ Command += l;
+ } else if (*state == 1) {
+ Compare += l;
+ } else if (*state == 2) {
+ auto idxExpectations = StringSplitter(l).SplitBySet(" ,.;").SkipEmpty().ToList<TString>();
+ AFL_VERIFY(idxExpectations.size() == 3)("size", idxExpectations.size())("string", l);
+ if (idxExpectations[0] != "{}") {
+ ui32 res;
+ AFL_VERIFY(TryFromString<ui32>(idxExpectations[0], res))("string", l);
+ ExpectIndexNoData = res;
+ }
+ if (idxExpectations[1] != "{}") {
+ ui32 res;
+ AFL_VERIFY(TryFromString<ui32>(idxExpectations[1], res))("string", l);
+ ExpectIndexSkip = res;
+ }
+ if (idxExpectations[2] != "{}") {
+ ui32 res;
+ AFL_VERIFY(TryFromString<ui32>(idxExpectations[2], res))("string", l);
+ ExpectIndexApprove = res;
+ }
+ } else {
+ AFL_VERIFY(false)("line", l);
+ }
+ }
+ return true;
}
+
+ TSelectCommand() = default;
};
class TStopCompactionCommand: public ICommand {
@@ -167,9 +241,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
public:
TScriptExecutor(const std::vector<std::shared_ptr<ICommand>>& commands)
- : Commands(commands)
- {
-
+ : Commands(commands) {
}
void Execute() {
NKikimrConfig::TAppConfig appConfig;
@@ -198,23 +270,9 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
command = command.substr(5);
return std::make_shared<TDataCommand>(command);
} else if (command.StartsWith("READ:")) {
- auto lines = StringSplitter(command.substr(5)).SplitBySet("\n").ToList<TString>();
- int step = 0;
- TString request;
- TString expectation;
- for (auto&& i : lines) {
- i = Strip(i);
- if (i.StartsWith("EXPECTED:")) {
- step = 1;
- i = i.substr(9);
- }
- if (step == 0) {
- request += i;
- } else if (step == 1) {
- expectation += i;
- }
- }
- return std::make_shared<TSelectCommand>(request, expectation);
+ auto result = std::make_shared<TSelectCommand>();
+ AFL_VERIFY(result->DeserializeFromString(command));
+ return result;
} else if (command.StartsWith("WAIT_COMPACTION")) {
return std::make_shared<TWaitCompactionCommand>();
} else if (command.StartsWith("STOP_COMPACTION")) {
@@ -290,7 +348,6 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
for (auto&& i : Scripts) {
i.Execute();
}
-
}
};
@@ -636,7 +693,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
)";
TScriptVariator(script).Execute();
}
-/*
+
Y_UNIT_TEST(BloomIndexesVariants) {
TString script = R"(
STOP_COMPACTION
@@ -648,7 +705,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
PRIMARY KEY (Col1)
)
PARTITION BY HASH(Col1)
- WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$);
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$2$$);
------
SCHEMA:
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`)
@@ -657,34 +714,56 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`,
`COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`)
------
- SCHEMA:
- ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`,
- `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`)
- ------
DATA:
- REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')),
- (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}'))
+ REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a.b.c" : "a1"}')), (2u, JsonDocument('{"a.b.c" : "a2"}')),
+ (3u, JsonDocument('{"b.c.d" : "b3"}')), (4u, JsonDocument('{"b.c.d" : "b4", "a" : "a4"}'))
------
DATA:
- REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')),
- (13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}'))
+ REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a.b.c" : "1a1"}')), (12u, JsonDocument('{"a.b.c" : "1a2"}')),
+ (13u, JsonDocument('{"b.c.d" : "1b3"}')), (14u, JsonDocument('{"b.c.d" : "1b4", "a" : "a4"}'))
------
SCHEMA:
ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=a_index, TYPE=BLOOM_FILTER,
- FEATURES=`{"column_names" : ["Col2"], "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}, "false_positive_probability" : 0.05}`)
+ FEATURES=`{"column_names" : ["Col2"], "false_positive_probability" : 0.01}`)
+ ------
+ SCHEMA:
+ ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_b, TYPE=BLOOM_NGRAMM_FILTER,
+ FEATURES=`{"column_name" : "Col2", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 4096,
+ "records_count" : 1024, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "b.c.d"}}`);
------
DATA:
REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u)
------
ONE_ACTUALIZATION
------
- READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1a1" ORDER BY Col1;
- EXPECTED: [[11u;["{\"a\":\"1a1\"}"]]]
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1;
+ EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]]
+ IDX_ND_SKIP_APPROVE: 0, 3, 1
+ ------
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "1a1" ORDER BY Col1;
+ EXPECTED: [[11u;["{\"a.b.c\":\"1a1\"}"]]]
+ IDX_ND_SKIP_APPROVE: 0, 3, 1
+ ------
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1;
+ EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]]
+ IDX_ND_SKIP_APPROVE: 0, 3, 1
+ ------
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b5" ORDER BY Col1;
+ EXPECTED: []
+ IDX_ND_SKIP_APPROVE: 0, 4, 0
+ ------
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b3" ORDER BY Col1;
+ EXPECTED: [[13u;["{\"b.c.d\":\"1b3\"}"]]]
+ IDX_ND_SKIP_APPROVE: 0, 3, 1
+ ------
+ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b5" ORDER BY Col1;
+ EXPECTED: []
+ IDX_ND_SKIP_APPROVE: 0, 4, 0
)";
TScriptVariator(script).Execute();
}
-*/
+
Y_UNIT_TEST(SwitchAccessorCompactionVariants) {
TString script = R"(
STOP_COMPACTION
@@ -765,7 +844,6 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) {
)";
TScriptVariator(script).Execute();
}
-
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index d63b0e157d4..8fd5bd10f6a 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -388,9 +388,25 @@ message TOlapColumnDescription {
optional string ColumnFamilyName = 15;
}
+message TIndexDataExtractor {
+ optional string ClassName = 1;
+ message TDefault {
+ }
+
+ message TSubColumn {
+ optional string SubColumnName = 1;
+ }
+
+ oneof Implementation {
+ TDefault Default = 20;
+ TSubColumn SubColumn = 21;
+ }
+}
+
message TRequestedBloomFilter {
optional double FalsePositiveProbability = 1 [default = 0.1];
repeated string ColumnNames = 3;
+ optional TIndexDataExtractor DataExtractor = 4;
}
message TRequestedBloomNGrammFilter {
@@ -399,6 +415,7 @@ message TRequestedBloomNGrammFilter {
optional uint32 HashesCount = 3;
optional string ColumnName = 4;
optional uint32 RecordsCount = 5;
+ optional TIndexDataExtractor DataExtractor = 6;
}
message TRequestedMaxIndex {
@@ -428,6 +445,7 @@ message TBloomFilter {
optional double FalsePositiveProbability = 1 [default = 0.1];
optional uint64 MaxBytesCount = 2 [default = 8196];
repeated uint32 ColumnIds = 3;
+ optional TIndexDataExtractor DataExtractor = 4;
}
message TBloomNGrammFilter {
@@ -436,6 +454,7 @@ message TBloomNGrammFilter {
optional uint32 HashesCount = 3;
optional uint32 ColumnId = 4;
optional uint32 RecordsCount = 5;
+ optional TIndexDataExtractor DataExtractor = 6;
}
message TMaxIndex {
@@ -1204,7 +1223,7 @@ message TBackupTask {
// currently available only for s3:
optional TCompressionOptions Compression = 13;
- optional bool EnableChecksums = 16;
+ optional bool EnableChecksums = 16;
optional bool EnablePermissions = 18;
}
diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h
index 2ab8c1f4180..47303000b82 100644
--- a/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h
+++ b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h
@@ -78,7 +78,7 @@ public:
using TBase::TBase;
bool DeserializeFromString(const TString& data) {
if (!TBase::DeserializeFromString(data)) {
- Initialize(TFakeStatusChannel::GetClassNameStatic());
+ AFL_VERIFY(Initialize(TFakeStatusChannel::GetClassNameStatic()));
return false;
}
return true;
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp
new file mode 100644
index 00000000000..f1f760ca89a
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp
@@ -0,0 +1,38 @@
+#include "common.h"
+
+#include <ydb/library/actors/core/log.h>
+
+#include <contrib/libs/xxhash/xxhash.h>
+#include <util/string/builder.h>
+
+namespace NKikimr::NOlap::NIndexes::NRequest {
+
+TString TNodeId::ToString() const {
+ return TStringBuilder() << "[" << ColumnId << "." << GenerationId << "." << NodeType << "]";
+}
+
+TNodeId TNodeId::Original(const ui32 columnId, const TString& subColumnName) {
+ AFL_VERIFY(columnId);
+ TNodeId result(columnId, Counter.Inc(), ENodeType::OriginalColumn);
+ result.SubColumnName = subColumnName;
+ return result;
+}
+
+TOriginalDataAddress TNodeId::BuildOriginalDataAddress() const {
+ AFL_VERIFY(NodeType == ENodeType::OriginalColumn);
+ return TOriginalDataAddress(ColumnId, SubColumnName);
+}
+
+TString TOriginalDataAddress::DebugString() const {
+ if (SubColumnName) {
+ return TStringBuilder() << "{cId=" << ColumnId << ";sub=" << SubColumnName << "}";
+ } else {
+ return ::ToString(ColumnId);
+ }
+}
+
+ui64 TOriginalDataAddress::CalcSubColumnHash(const std::string_view sv) {
+ return XXH3_64bits(sv.data(), sv.size());
+}
+
+} // namespace NKikimr::NOlap::NIndexes::NRequest
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h
new file mode 100644
index 00000000000..78e6e457ef8
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h
@@ -0,0 +1,105 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+
+#include <util/digest/fnv.h>
+#include <util/digest/numeric.h>
+#include <util/generic/refcount.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NOlap::NIndexes::NRequest {
+
+enum class ENodeType : ui32 {
+ Aggregation,
+ OriginalColumn,
+ SubColumn,
+ Root,
+ Operation,
+ Constant
+};
+
+class TOriginalDataAddress {
+private:
+ YDB_READONLY(ui32, ColumnId, 0);
+ YDB_READONLY_DEF(TString, SubColumnName);
+
+public:
+ static ui64 CalcSubColumnHash(const std::string_view sv);
+
+ static ui64 CalcSubColumnHash(const TString& path) {
+ return CalcSubColumnHash(std::string_view(path.data(), path.size()));
+ }
+
+ explicit TOriginalDataAddress(const ui32 columnId, const TString& subColumnName = "")
+ : ColumnId(columnId)
+ , SubColumnName(subColumnName) {
+ }
+
+ bool operator==(const TOriginalDataAddress& item) const {
+ return std::tie(ColumnId, SubColumnName) == std::tie(item.ColumnId, item.SubColumnName);
+ }
+
+ explicit operator size_t() const {
+ if (SubColumnName) {
+ return CombineHashes<ui64>(ColumnId, FnvHash<ui64>(SubColumnName.data(), SubColumnName.size()));
+ } else {
+ return ColumnId;
+ }
+ }
+
+ TString DebugString() const;
+};
+
+class TNodeId {
+private:
+ YDB_READONLY(ui32, ColumnId, 0);
+ YDB_READONLY_DEF(TString, SubColumnName);
+ YDB_READONLY(ui32, GenerationId, 0);
+ YDB_READONLY(ENodeType, NodeType, ENodeType::OriginalColumn);
+
+ static inline TAtomicCounter Counter = 0;
+
+ TNodeId(const ui32 columnId, const ui32 generationId, const ENodeType type)
+ : ColumnId(columnId)
+ , GenerationId(generationId)
+ , NodeType(type) {
+ }
+
+public:
+ bool operator==(const TNodeId& item) const {
+ return ColumnId == item.ColumnId && GenerationId == item.GenerationId && NodeType == item.NodeType &&
+ SubColumnName == item.SubColumnName;
+ }
+
+ TOriginalDataAddress BuildOriginalDataAddress() const;
+
+ TNodeId BuildCopy() const {
+ return TNodeId(ColumnId, Counter.Inc(), NodeType);
+ }
+
+ TString ToString() const;
+
+ static TNodeId RootNodeId() {
+ return TNodeId(0, 0, ENodeType::Root);
+ }
+
+ static TNodeId Constant(const ui32 columnId) {
+ return TNodeId(columnId, Counter.Inc(), ENodeType::Constant);
+ }
+
+ static TNodeId Original(const ui32 columnId, const TString& subColumnName = "");
+
+ static TNodeId Aggregation() {
+ return TNodeId(0, Counter.Inc(), ENodeType::Aggregation);
+ }
+
+ static TNodeId Operation(const ui32 columnId) {
+ return TNodeId(columnId, Counter.Inc(), ENodeType::Operation);
+ }
+
+ bool operator<(const TNodeId& item) const {
+ return std::tie(ColumnId, GenerationId, NodeType, SubColumnName) <
+ std::tie(item.ColumnId, item.GenerationId, item.NodeType, item.SubColumnName);
+ }
+};
+
+} // namespace NKikimr::NOlap::NIndexes::NRequest
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp
index 872f07414e1..a21520cc76e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp
@@ -85,4 +85,21 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexChecker> TBranchCoverage::GetAnd
return std::make_shared<TAndIndexChecker>(Indexes);
}
+NJson::TJsonValue TBranchCoverage::DebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ if (Equals.size()) {
+ auto& jsonEquals = result.InsertValue("equals", NJson::JSON_MAP);
+ for (auto&& i : Equals) {
+ jsonEquals.InsertValue(i.first.DebugString(), i.second ? i.second->ToString() : "NULL");
+ }
+ }
+ if (Likes.size()) {
+ auto& jsonLikes = result.InsertValue("likes", NJson::JSON_MAP);
+ for (auto&& i : Likes) {
+ jsonLikes.InsertValue(i.first.DebugString(), i.second.DebugJson());
+ }
+ }
+ return result;
+}
+
} // namespace NKikimr::NOlap::NIndexes::NRequest
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h
index f568f28b564..b6f37e0771e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h
@@ -1,5 +1,6 @@
#pragma once
#include "checker.h"
+#include "common.h"
#include "like.h"
#include <ydb/core/tx/program/program.h>
@@ -10,21 +11,22 @@ namespace NKikimr::NOlap::NIndexes::NRequest {
class TBranchCoverage {
private:
- THashMap<ui32, std::shared_ptr<arrow::Scalar>> Equals;
- THashMap<ui32, TLikeDescription> Likes;
+ THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>> Equals;
+ THashMap<TOriginalDataAddress, TLikeDescription> Likes;
YDB_ACCESSOR_DEF(std::vector<std::shared_ptr<IIndexChecker>>, Indexes);
public:
- TBranchCoverage(const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& equals, const THashMap<ui32, TLikeDescription>& likes)
+ TBranchCoverage(const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& equals,
+ const THashMap<TOriginalDataAddress, TLikeDescription>& likes)
: Equals(equals)
, Likes(likes) {
}
- const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
+ const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
return Equals;
}
- const THashMap<ui32, TLikeDescription>& GetLikes() const {
+ const THashMap<TOriginalDataAddress, TLikeDescription>& GetLikes() const {
return Likes;
}
@@ -34,22 +36,7 @@ public:
return DebugJson().GetStringRobust();
}
- NJson::TJsonValue DebugJson() const {
- NJson::TJsonValue result = NJson::JSON_MAP;
- if (Equals.size()) {
- auto& jsonEquals = result.InsertValue("equals", NJson::JSON_MAP);
- for (auto&& i : Equals) {
- jsonEquals.InsertValue(::ToString(i.first), i.second ? i.second->ToString() : "NULL");
- }
- }
- if (Likes.size()) {
- auto& jsonLikes = result.InsertValue("likes", NJson::JSON_MAP);
- for (auto&& i : Likes) {
- jsonLikes.InsertValue(::ToString(i.first), i.second.DebugJson());
- }
- }
- return result;
- }
+ NJson::TJsonValue DebugJson() const;
};
class TDataForIndexesCheckers {
@@ -70,7 +57,8 @@ public:
return result;
}
- void AddBranch(const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& equalsData, const THashMap<ui32, TLikeDescription>& likesData) {
+ void AddBranch(const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& equalsData,
+ const THashMap<TOriginalDataAddress, TLikeDescription>& likesData) {
Branches.emplace_back(std::make_shared<TBranchCoverage>(equalsData, likesData));
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp
index f6625452c12..b45f93bb0f7 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp
@@ -9,15 +9,6 @@
namespace NKikimr::NOlap::NIndexes::NRequest {
-TString TNodeId::ToString() const {
- return TStringBuilder() << "[" << ColumnId << "." << GenerationId << "." << NodeType << "]";
-}
-
-TNodeId TNodeId::Original(const ui32 columnId) {
- AFL_VERIFY(columnId);
- return TNodeId(columnId, Counter.Inc(), ENodeType::OriginalColumn);
-}
-
std::shared_ptr<IRequestNode> IRequestNode::Copy() const {
auto selfCopy = DoCopy();
selfCopy->Parent = nullptr;
@@ -94,24 +85,24 @@ NJson::TJsonValue TPackAnd::DoSerializeToJson() const {
auto& arrJson = result.InsertValue("equals", NJson::JSON_ARRAY);
for (auto&& i : Equals) {
auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP);
- jsonCondition.InsertValue(::ToString(i.first), i.second->ToString());
+ jsonCondition.InsertValue(i.first.DebugString(), i.second->ToString());
}
}
{
auto& arrJson = result.InsertValue("likes", NJson::JSON_ARRAY);
for (auto&& i : Likes) {
auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP);
- jsonCondition.InsertValue(::ToString(i.first), i.second.ToString());
+ jsonCondition.InsertValue(i.first.DebugString(), i.second.ToString());
}
}
return result;
}
-void TPackAnd::AddEqual(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& value) {
+void TPackAnd::AddEqual(const TOriginalDataAddress& originalDataAddress, const std::shared_ptr<arrow::Scalar>& value) {
AFL_VERIFY(value);
- auto it = Equals.find(columnId);
+ auto it = Equals.find(originalDataAddress);
if (it == Equals.end()) {
- Equals.emplace(columnId, value);
+ Equals.emplace(originalDataAddress, value);
} else if (it->second->Equals(*value)) {
return;
} else {
@@ -129,7 +120,7 @@ bool TOperationNode::DoCollapse() {
}
if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Equals && Children.size() == 2 && Children[1]->Is<TConstantNode>() &&
Children[0]->Is<TOriginalColumn>()) {
- Parent->Exchange(GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().GetColumnId(),
+ Parent->Exchange(GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().BuildOriginalDataAddress(),
Children[1]->As<TConstantNode>()->GetConstant()));
return true;
}
@@ -150,7 +141,8 @@ bool TOperationNode::DoCollapse() {
}
AFL_VERIFY(op);
TLikePart likePart(*op, TString((const char*)scalarString->value->data(), scalarString->value->size()));
- Parent->Exchange(GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().GetColumnId(), likePart));
+ Parent->Exchange(
+ GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().BuildOriginalDataAddress(), likePart));
return true;
}
if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) {
@@ -218,6 +210,18 @@ bool TOperationNode::DoCollapse() {
return false;
}
+bool TKernelNode::DoCollapse() {
+ if (KernelName == "JsonValue" && Children.size() == 2 && Children[1]->Is<TConstantNode>() && Children[0]->Is<TOriginalColumn>()) {
+ auto scalar = Children[1]->As<TConstantNode>()->GetConstant();
+ AFL_VERIFY(scalar->type->id() == arrow::binary()->id() || scalar->type->id() == arrow::utf8()->id())("type", scalar->type->ToString());
+ auto scalarString = static_pointer_cast<arrow::BinaryScalar>(scalar);
+ const TString jsonPath((const char*)scalarString->value->data(), scalarString->value->size());
+ Parent->Exchange(GetNodeId(), std::make_shared<TOriginalColumn>(Children[0]->As<TOriginalColumn>()->GetNodeId().GetColumnId(), jsonPath));
+ return true;
+ }
+ return false;
+}
+
bool TNormalForm::Add(const NArrow::NSSA::IResourceProcessor& processor, const TProgramContainer& program) {
if (processor.GetProcessorType() == NArrow::NSSA::EProcessorType::Filter) {
return true;
@@ -230,7 +234,8 @@ bool TNormalForm::Add(const NArrow::NSSA::IResourceProcessor& processor, const T
if (it == Nodes.end()) {
it = NodesGlobal.find(arg.GetColumnId());
if (it == NodesGlobal.end()) {
- AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "program_arg_is_missing")("program", program.DebugString());
+ AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "program_arg_is_missing")("program", program.DebugString())(
+ "column_id", arg.GetColumnId());
return false;
}
data = it->second->Copy();
@@ -259,6 +264,10 @@ bool TNormalForm::Add(const NArrow::NSSA::IResourceProcessor& processor, const T
processor.GetOutputColumnIdOnce(), (NYql::TKernelRequestBuilder::EBinaryOp)*calcProcessor->GetYqlOperationId(), argNodes);
Nodes.emplace(processor.GetOutputColumnIdOnce(), node);
NodesGlobal.emplace(processor.GetOutputColumnIdOnce(), node);
+ } else if (calcProcessor->GetKernelLogic()) {
+ auto node = std::make_shared<TKernelNode>(processor.GetOutputColumnIdOnce(), calcProcessor->GetKernelLogic()->GetClassName(), argNodes);
+ Nodes.emplace(processor.GetOutputColumnIdOnce(), node);
+ NodesGlobal.emplace(processor.GetOutputColumnIdOnce(), node);
}
} else {
return false;
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h
index 3b68300d4ba..655e7444302 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h
@@ -1,4 +1,5 @@
#pragma once
+#include "common.h"
#include "like.h"
#include <ydb/core/formats/arrow/program/abstract.h>
@@ -12,62 +13,6 @@
namespace NKikimr::NOlap::NIndexes::NRequest {
-enum class ENodeType : ui32 {
- Aggregation,
- OriginalColumn,
- Root,
- Operation,
- Constant
-};
-
-class TNodeId {
-private:
- YDB_READONLY(ui32, ColumnId, 0);
- YDB_READONLY(ui32, GenerationId, 0);
- YDB_READONLY(ENodeType, NodeType, ENodeType::OriginalColumn);
-
- static inline TAtomicCounter Counter = 0;
-
- TNodeId(const ui32 columnId, const ui32 generationId, const ENodeType type)
- : ColumnId(columnId)
- , GenerationId(generationId)
- , NodeType(type) {
- }
-
-public:
- bool operator==(const TNodeId& item) const {
- return ColumnId == item.ColumnId && GenerationId == item.GenerationId && NodeType == item.NodeType;
- }
-
- TNodeId BuildCopy() const {
- return TNodeId(ColumnId, Counter.Inc(), NodeType);
- }
-
- TString ToString() const;
-
- static TNodeId RootNodeId() {
- return TNodeId(0, 0, ENodeType::Root);
- }
-
- static TNodeId Constant(const ui32 columnId) {
- return TNodeId(columnId, Counter.Inc(), ENodeType::Constant);
- }
-
- static TNodeId Original(const ui32 columnId);
-
- static TNodeId Aggregation() {
- return TNodeId(0, Counter.Inc(), ENodeType::Aggregation);
- }
-
- static TNodeId Operation(const ui32 columnId) {
- return TNodeId(columnId, Counter.Inc(), ENodeType::Operation);
- }
-
- bool operator<(const TNodeId& item) const {
- return std::tie(ColumnId, GenerationId, NodeType) < std::tie(item.ColumnId, item.GenerationId, item.NodeType);
- }
-};
-
class IRequestNode {
protected:
TNodeId NodeId;
@@ -211,16 +156,16 @@ protected:
}
public:
- TOriginalColumn(const ui32 columnId)
- : TBase(TNodeId::Original(columnId)) {
+ TOriginalColumn(const ui32 columnId, const TString& subColumnName = "")
+ : TBase(TNodeId::Original(columnId, subColumnName)) {
}
};
class TPackAnd: public IRequestNode {
private:
using TBase = IRequestNode;
- THashMap<ui32, std::shared_ptr<arrow::Scalar>> Equals;
- THashMap<ui32, TLikeDescription> Likes;
+ THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>> Equals;
+ THashMap<TOriginalDataAddress, TLikeDescription> Likes;
bool IsEmptyFlag = false;
protected:
@@ -235,32 +180,32 @@ protected:
public:
TPackAnd(const TPackAnd&) = default;
- TPackAnd(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& value)
+ TPackAnd(const TOriginalDataAddress& originalDataAddress, const std::shared_ptr<arrow::Scalar>& value)
: TBase(TNodeId::Aggregation()) {
- AddEqual(columnId, value);
+ AddEqual(originalDataAddress, value);
}
- TPackAnd(const ui32 columnId, const TLikePart& part)
+ TPackAnd(const TOriginalDataAddress& originalDataAddress, const TLikePart& part)
: TBase(TNodeId::Aggregation()) {
- AddLike(columnId, TLikeDescription(part));
+ AddLike(originalDataAddress, TLikeDescription(part));
}
- const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
+ const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& GetEquals() const {
return Equals;
}
- const THashMap<ui32, TLikeDescription>& GetLikes() const {
+ const THashMap<TOriginalDataAddress, TLikeDescription>& GetLikes() const {
return Likes;
}
bool IsEmpty() const {
return IsEmptyFlag;
}
- void AddEqual(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& value);
- void AddLike(const ui32 columnId, const TLikeDescription& value) {
- auto it = Likes.find(columnId);
+ void AddEqual(const TOriginalDataAddress& originalDataAddress, const std::shared_ptr<arrow::Scalar>& value);
+ void AddLike(const TOriginalDataAddress& originalDataAddress, const TLikeDescription& value) {
+ auto it = Likes.find(originalDataAddress);
if (it == Likes.end()) {
- Likes.emplace(columnId, value);
+ Likes.emplace(originalDataAddress, value);
} else {
it->second.Merge(value);
}
@@ -309,6 +254,39 @@ public:
}
};
+class TKernelNode: public IRequestNode {
+private:
+ using TBase = IRequestNode;
+ const TString KernelName;
+
+protected:
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("type", "operation");
+ result.InsertValue("kernel_name", KernelName);
+ return result;
+ }
+
+ virtual bool DoCollapse() override;
+ virtual std::shared_ptr<IRequestNode> DoCopy() const override {
+ std::vector<std::shared_ptr<IRequestNode>> children;
+ return std::make_shared<TKernelNode>(GetNodeId().GetColumnId(), KernelName, children);
+ }
+
+public:
+ const TString GetKernelName() const {
+ return KernelName;
+ }
+
+ TKernelNode(const ui32 columnId, const TString kernelName, const std::vector<std::shared_ptr<IRequestNode>>& args)
+ : TBase(TNodeId::Operation(columnId))
+ , KernelName(kernelName) {
+ for (auto&& i : args) {
+ Attach(i);
+ }
+ }
+};
+
class TNormalForm {
private:
std::map<ui32, std::shared_ptr<IRequestNode>> Nodes;
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp
index 3e356f50804..fc8c384cad6 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp
@@ -47,7 +47,8 @@ Y_UNIT_TEST_SUITE(TestProgramBloomCoverage) {
AFL_VERIFY(coverage);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("coverage", coverage->DebugString());
AFL_VERIFY(coverage->GetBranches().size() == 1)("coverage", coverage->DebugString());
- AFL_VERIFY(coverage->GetBranches().front()->DebugString() == R"({"likes":{"7":{"sequences":["%amet."]}}})")("coverage", coverage->DebugString());
+ AFL_VERIFY(coverage->GetBranches().front()->DebugString() == R"({"likes":{"7":{"sequences":["%amet."]}}})")(
+ "coverage", coverage->DebugString());
}
}
@@ -164,4 +165,32 @@ Y_UNIT_TEST_SUITE(TestProgramBloomCoverage) {
AFL_VERIFY(coverage->GetBranches()[0]->DebugString() = R"({"likes":{"9":{"sequences":["%like_string"]},"7":{"sequences":["%like_string"]}},"equals":{"9":"equals_string","7":"equals_string"}})");
}
}
+
+ Y_UNIT_TEST(JsonSubColumnUsage) {
+ TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey);
+ NReader::NCommon::TIndexColumnResolver columnResolver(indexInfo);
+
+ TProgramProtoBuilder builder;
+ const auto idPathString1 = builder.AddConstant("json.path1");
+ const auto idPathString2 = builder.AddConstant("json.path2");
+ const auto idColumn = columnResolver.GetColumnIdVerified("json_binary");
+ const auto idJsonValue1 = builder.AddOperation("JsonValue", { idColumn, idPathString1 });
+ const auto idJsonValue2 = builder.AddOperation("JsonValue", { idColumn, idPathString2 });
+
+ const auto idLikeString = builder.AddConstant("like_string");
+ const auto idEqualString = builder.AddConstant("equals_string");
+ const auto idEndsWith1 = builder.AddOperation(NYql::TKernelRequestBuilder::EBinaryOp::EndsWith, { idJsonValue1, idLikeString });
+ const auto idEquals1 = builder.AddOperation(NYql::TKernelRequestBuilder::EBinaryOp::Equals, { idJsonValue2, idEqualString });
+ const auto idFilter1 = builder.AddOperation(NYql::TKernelRequestBuilder::EBinaryOp::And, { idEndsWith1, idEquals1 });
+ builder.AddFilter(idFilter1);
+ {
+ TProgramContainer program;
+ program.Init(columnResolver, builder.FinishProto()).Validate();
+ auto coverage = NOlap::NIndexes::NRequest::TDataForIndexesCheckers::Build(program);
+ AFL_VERIFY(coverage);
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("coverage", coverage->DebugString());
+ AFL_VERIFY(coverage->GetBranches().size() == 1);
+ AFL_VERIFY(coverage->GetBranches().front()->DebugString() == R"({"likes":{"{cId=6;sub=json.path1}":{"sequences":["%like_string"]}},"equals":{"{cId=6;sub=json.path2}":"equals_string"}})");
+ }
+ }
}
diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make
index 935fdb80b44..d764849e301 100644
--- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make
+++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make
@@ -13,6 +13,7 @@ SRCS(
tree.cpp
coverage.cpp
like.cpp
+ common.cpp
)
PEERDIR(
@@ -22,7 +23,7 @@ PEERDIR(
ydb/core/formats/arrow/program
)
-GENERATE_ENUM_SERIALIZATION(tree.h)
+GENERATE_ENUM_SERIALIZATION(common.h)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp
index 1afa71281e5..93cb8029a43 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp
@@ -23,6 +23,7 @@ bool TBloomFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const {
break;
}
}
+// Cerr << bits.DebugString() << Endl;
if (found) {
// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId());
return true;
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h
index 8192a2fb8cf..58b55d012ad 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h
@@ -43,6 +43,9 @@ public:
ui32 count1 = 0;
ui32 count0 = 0;
for (ui32 i = 0; i < GetSizeBits(); ++i) {
+// if (i % 20 == 0 && i) {
+// sb << i << " ";
+// }
if (Get(i)) {
// sb << 1 << " ";
++count1;
@@ -50,9 +53,6 @@ public:
// sb << 0 << " ";
++count0;
}
-// if (i % 20 == 0) {
-// sb << i << " ";
-// }
}
sb << GetSizeBits() << "=" << count0 << "[0]+" << count1 << "[1]";
return sb;
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp
index fa11002fe17..83922c63e3d 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp
@@ -1,11 +1,13 @@
#include "constructor.h"
#include "meta.h"
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h>
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
namespace NKikimr::NOlap::NIndexes {
-std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
+std::shared_ptr<IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(
+ const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const {
std::set<ui32> columnIds;
for (auto&& i : ColumnNames) {
auto* columnInfo = currentSchema.GetColumns().GetByName(i);
@@ -15,7 +17,9 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::Do
}
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
}
- return std::make_shared<TBloomIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds, FalsePositiveProbability);
+ AFL_VERIFY(columnIds.size() == 1);
+ return std::make_shared<TBloomIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), *columnIds.begin(),
+ FalsePositiveProbability, std::make_shared<TDefaultDataExtractor>());
}
NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
@@ -28,13 +32,23 @@ NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const N
}
for (auto&& i : *columnNamesArray) {
if (!i.IsString()) {
- return TConclusionStatus::Fail("column_names have to be in bloom filter features as array of strings ['column_name_1', ... , 'column_name_N']");
+ return TConclusionStatus::Fail(
+ "column_names have to be in bloom filter features as array of strings ['column_name_1', ... , 'column_name_N']");
}
ColumnNames.emplace(i.GetString());
}
+ if (ColumnNames.size() != 1) {
+ return TConclusionStatus::Fail("column_names count possible only 1 temporary");
+ }
if (!jsonInfo["false_positive_probability"].IsDouble()) {
return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field");
}
+ {
+ auto conclusion = DataExtractor.DeserializeFromJson(jsonInfo["data_extractor"]);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ }
FalsePositiveProbability = jsonInfo["false_positive_probability"].GetDouble();
if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) {
return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field in interval [0.01, 1)");
@@ -50,6 +64,9 @@ NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromProto(const
}
auto& bFilter = proto.GetBloomFilter();
FalsePositiveProbability = bFilter.GetFalsePositiveProbability();
+ if (!DataExtractor.DeserializeFromProto(bFilter.GetDataExtractor())) {
+ return TConclusionStatus::Fail("cannot parse data extractor: " + bFilter.GetDataExtractor().DebugString());
+ }
if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) {
const TString errorMessage = "FalsePositiveProbability have to be in interval[0.01, 1)";
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage);
@@ -67,6 +84,7 @@ void TBloomIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexReque
for (auto&& i : ColumnNames) {
filterProto->AddColumnNames(i);
}
+ *filterProto->MutableDataExtractor() = DataExtractor.SerializeToProto();
}
-} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h
index 1801027416b..cb202765fdc 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h>
namespace NKikimr::NOlap::NIndexes {
class TBloomIndexConstructor: public IIndexMetaConstructor {
@@ -7,12 +8,16 @@ public:
static TString GetClassNameStatic() {
return "BLOOM_FILTER";
}
+
private:
std::set<TString> ColumnNames;
double FalsePositiveProbability = 0.1;
+ TReadDataExtractorContainer DataExtractor;
static inline auto Registrator = TFactory::TRegistrator<TBloomIndexConstructor>(GetClassNameStatic());
+
protected:
- virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override;
+ virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName,
+ const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override;
virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override;
@@ -27,4 +32,4 @@ public:
}
};
-} // namespace NKikimr::NOlap::NIndexes \ No newline at end of file
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
index d9e61c5cf81..df3f5610ea0 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp
@@ -1,56 +1,87 @@
-#include "meta.h"
#include "checker.h"
-#include <ydb/library/formats/arrow/hash/xx_hash.h>
+#include "meta.h"
+
#include <ydb/core/formats/arrow/hash/calcer.h>
#include <ydb/core/tx/program/program.h>
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
+#include <ydb/library/formats/arrow/hash/xx_hash.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h>
#include <library/cpp/deprecated/atomic/atomic.h>
namespace NKikimr::NOlap::NIndexes {
-TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const {
- const ui32 bitsCount = TFixStringBitsStorage::GrowBitsCountToByte(HashesCount * recordsCount / std::log(2));
- std::vector<bool> filterBits(bitsCount, false);
- for (ui32 i = 0; i < HashesCount; ++i) {
- NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i);
- for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) {
- hashCalcer.Start();
- for (auto&& i : reader) {
- NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer);
- }
- filterBits[hashCalcer.Finish() % bitsCount] = true;
+TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*recordsCount*/) const {
+ std::deque<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> dataOwners;
+ ui32 indexHitsCount = 0;
+ for (reader.Start(); reader.IsCorrect();) {
+ AFL_VERIFY(reader.GetColumnsCount() == 1);
+ for (auto&& i : reader) {
+ dataOwners.emplace_back(i.GetCurrentChunk());
+ indexHitsCount += GetDataExtractor()->GetIndexHitsCount(dataOwners.back());
}
+ reader.ReadNext(reader.begin()->GetCurrentChunk()->GetRecordsCount());
+ }
+ const ui32 bitsCount = TFixStringBitsStorage::GrowBitsCountToByte(HashesCount * std::max<ui32>(indexHitsCount, 10) / std::log(2));
+ std::vector<bool> filterBits(bitsCount, false);
+
+ while (dataOwners.size()) {
+ GetDataExtractor()->VisitAll(
+ dataOwners.front(),
+ [&](const std::shared_ptr<arrow::Array>& arr, const ui64 hashBase) {
+ for (ui32 idx = 0; idx < arr->length(); ++idx) {
+ for (ui32 i = 0; i < HashesCount; ++i) {
+ NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i);
+ hashCalcer.Start();
+ if (hashBase) {
+ hashCalcer.Update((const ui8*)&hashBase, sizeof(hashBase));
+ }
+ NArrow::NHash::TXX64::AppendField(arr, idx, hashCalcer);
+ filterBits[hashCalcer.Finish() % bitsCount] = true;
+ }
+ }
+ },
+ [&](const std::string_view data, const ui64 hashBase) {
+ for (ui32 i = 0; i < HashesCount; ++i) {
+ NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i);
+ hashCalcer.Start();
+ if (hashBase) {
+ hashCalcer.Update((const ui8*)&hashBase, sizeof(hashBase));
+ }
+ hashCalcer.Update((const ui8*)data.data(), data.size());
+ filterBits[hashCalcer.Finish() % bitsCount] = true;
+ }
+ });
+ dataOwners.pop_front();
}
return TFixStringBitsStorage(filterBits).GetData();
}
-void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const {
+void TBloomIndexMeta::DoFillIndexCheckers(
+ const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const {
for (auto&& branch : info->GetBranches()) {
- std::map<ui32, std::shared_ptr<arrow::Scalar>> foundColumns;
- for (auto&& cId : ColumnIds) {
- auto itEqual = branch->GetEquals().find(cId);
- if (itEqual == branch->GetEquals().end()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("warn", "column not found for equal")("id", cId);
- break;
+ for (auto&& i : branch->GetEquals()) {
+ if (i.first.GetColumnId() != GetColumnId()) {
+ continue;
}
- foundColumns.emplace(cId, itEqual->second);
- }
- if (foundColumns.size() != ColumnIds.size()) {
- continue;
- }
- std::set<ui64> hashes;
- for (ui32 i = 0; i < HashesCount; ++i) {
- NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 calcer(i);
- calcer.Start();
- for (auto&& i : foundColumns) {
+ ui64 hashBase = 0;
+ if (!GetDataExtractor()->CheckForIndex(i.first, hashBase)) {
+ continue;
+ }
+ std::set<ui64> hashes;
+ for (ui32 hashSeed = 0; hashSeed < HashesCount; ++hashSeed) {
+ NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 calcer(hashSeed);
+ calcer.Start();
+ if (hashBase) {
+ calcer.Update((const ui8*)&hashBase, sizeof(hashBase));
+ }
NArrow::NHash::TXX64::AppendField(i.second, calcer);
+ hashes.emplace(calcer.Finish());
}
- hashes.emplace(calcer.Finish());
+ branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
}
- branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes)));
}
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h
index cfd9bc85cf2..2ff9a5a4c34 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h
@@ -21,26 +21,6 @@ private:
HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2);
}
- static const ui64 HashesConstructorP = ((ui64)2 << 31) - 1;
- static const ui64 HashesConstructorA = (ui64)2 << 16;
-
- template <class TActor>
- void BuildHashesSet(const ui64 originalHash, const TActor& actor) const {
- AFL_VERIFY(HashesCount < HashesConstructorP);
- for (ui32 b = 1; b < HashesCount; ++b) {
- const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP;
- actor(hash);
- }
- }
-
- template <class TContainer, class TActor>
- void BuildHashesSet(const TContainer& originalHashes, const TActor& actor) const {
- AFL_VERIFY(HashesCount < HashesConstructorP);
- for (auto&& hOriginal : originalHashes) {
- BuildHashesSet(hOriginal, actor);
- }
- }
-
protected:
virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override {
const auto* bMeta = dynamic_cast<const TBloomIndexMeta*>(&newMeta);
@@ -60,7 +40,10 @@ protected:
auto& bFilter = proto.GetBloomFilter();
FalsePositiveProbability = bFilter.GetFalsePositiveProbability();
for (auto&& i : bFilter.GetColumnIds()) {
- ColumnIds.emplace(i);
+ AddColumnId(i);
+ }
+ if (!MutableDataExtractor().DeserializeFromProto(bFilter.GetDataExtractor())) {
+ return false;
}
Initialize();
return true;
@@ -68,15 +51,17 @@ protected:
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override {
auto* filterProto = proto.MutableBloomFilter();
filterProto->SetFalsePositiveProbability(FalsePositiveProbability);
- for (auto&& i : ColumnIds) {
+ for (auto&& i : GetColumnIds()) {
filterProto->AddColumnIds(i);
}
+ *filterProto->MutableDataExtractor() = GetDataExtractor().SerializeToProto();
}
public:
TBloomIndexMeta() = default;
- TBloomIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, std::set<ui32>& columnIds, const double fpProbability)
- : TBase(indexId, indexName, columnIds, storageId)
+ TBloomIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId,
+ const double fpProbability, const TReadDataExtractorContainer& dataExtractor)
+ : TBase(indexId, indexName, columnId, storageId, dataExtractor)
, FalsePositiveProbability(fpProbability) {
Initialize();
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp
index 8929d9fd4c5..3a57c06618f 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp
@@ -2,6 +2,7 @@
#include "constructor.h"
#include "meta.h"
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h>
#include <ydb/core/tx/schemeshard/olap/schema/schema.h>
namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
@@ -15,7 +16,7 @@ std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta(
}
const ui32 columnId = columnInfo->GetId();
return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId,
- HashesCount, FilterSizeBytes, NGrammSize, RecordsCount);
+ DataExtractor, HashesCount, FilterSizeBytes, NGrammSize, RecordsCount);
}
TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
@@ -29,12 +30,20 @@ TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonVal
return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features");
}
+ {
+ auto conclusion = DataExtractor.DeserializeFromJson(jsonInfo["data_extractor"]);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ }
+
if (!jsonInfo["records_count"].IsUInteger()) {
return TConclusionStatus::Fail("records_count have to be in bloom filter features as uint field");
}
RecordsCount = jsonInfo["records_count"].GetUInteger();
if (!TConstants::CheckRecordsCount(RecordsCount)) {
- return TConclusionStatus::Fail("records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString());
+ return TConclusionStatus::Fail(
+ "records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString());
}
if (!jsonInfo["ngramm_size"].IsUInteger()) {
@@ -92,6 +101,9 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki
if (!ColumnName) {
return TConclusionStatus::Fail("empty column name");
}
+ if (!DataExtractor.DeserializeFromProto(bFilter.GetDataExtractor())) {
+ return TConclusionStatus::Fail("cannot parse data extractor from proto: " + bFilter.GetDataExtractor().DebugString());
+ }
return TConclusionStatus::Success();
}
@@ -102,6 +114,7 @@ void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested&
filterProto->SetNGrammSize(NGrammSize);
filterProto->SetFilterSizeBytes(FilterSizeBytes);
filterProto->SetHashesCount(HashesCount);
+ *filterProto->MutableDataExtractor() = DataExtractor.SerializeToProto();
}
} // namespace NKikimr::NOlap::NIndexes::NBloomNGramm
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h
index 209b1a5de8f..1e29eb0ecd5 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h>
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h>
namespace NKikimr::NOlap::NIndexes::NBloomNGramm {
class TIndexConstructor: public IIndexMetaConstructor {
@@ -10,6 +11,7 @@ public:
private:
TString ColumnName;
+ TReadDataExtractorContainer DataExtractor;
ui32 NGrammSize = 3;
ui32 FilterSizeBytes = 512;
ui32 HashesCount = 2;
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp
index 2c5d294cb77..124a7e5f9c5 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp
@@ -38,6 +38,7 @@ private:
template <ui32 HashIdx, ui32 CharsCount>
class THashesCountSelector {
static constexpr ui64 HashStart = (ui64)HashIdx * (ui64)2166136261;
+
public:
template <class TActor>
static void BuildHashes(const ui8* data, TActor& actor) {
@@ -134,6 +135,11 @@ private:
}
};
+public:
+ TNGrammBuilder(const ui32 hashesCount)
+ : HashesCount(hashesCount) {
+ }
+
template <class TAction>
void BuildNGramms(
const char* data, const ui32 dataSize, const std::optional<NRequest::TLikePart::EOperation> op, const ui32 nGrammSize, TAction& pred) {
@@ -141,11 +147,6 @@ private:
(const ui8*)data, dataSize, HashesCount, nGrammSize, op, pred);
}
-public:
- TNGrammBuilder(const ui32 hashesCount)
- : HashesCount(hashesCount) {
- }
-
template <class TFiller>
void FillNGrammHashes(const ui32 nGrammSize, const std::shared_ptr<arrow::Array>& array, TFiller& fillData) {
AFL_VERIFY(array->type_id() == arrow::utf8()->id())("id", array->type()->ToString());
@@ -229,8 +230,18 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 rec
const auto doFillFilter = [&](auto& inserter) {
for (reader.Start(); reader.IsCorrect();) {
- builder.FillNGrammHashes(NGrammSize, reader.begin()->GetCurrentChunk(), inserter);
- reader.ReadNext(reader.begin()->GetCurrentChunk()->length());
+ AFL_VERIFY(reader.GetColumnsCount() == 1);
+ for (auto&& r : reader) {
+ GetDataExtractor()->VisitAll(
+ r.GetCurrentChunk(),
+ [&](const std::shared_ptr<arrow::Array>& arr, const ui32 /*hashBase*/) {
+ builder.FillNGrammHashes(NGrammSize, arr, inserter);
+ },
+ [&](const std::string_view data, const ui32 /*hashBase*/) {
+ builder.BuildNGramms(data.data(), data.size(), {}, NGrammSize, inserter);
+ });
+ }
+ reader.ReadNext(reader.begin()->GetCurrentChunk()->GetRecordsCount());
}
};
@@ -247,30 +258,24 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 rec
void TIndexMeta::DoFillIndexCheckers(
const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const {
for (auto&& branch : info->GetBranches()) {
- std::map<ui32, NRequest::TLikeDescription> foundColumns;
- for (auto&& cId : ColumnIds) {
- auto it = branch->GetLikes().find(cId);
- if (it == branch->GetLikes().end()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("warn", "not found like for column")("id", cId);
- break;
+ for (auto&& i : branch->GetLikes()) {
+ if (i.first.GetColumnId() != GetColumnId()) {
+ continue;
}
- foundColumns.emplace(cId, it->second);
- }
- if (foundColumns.size() != ColumnIds.size()) {
- continue;
- }
-
- std::set<ui64> hashes;
- const auto predSet = [&](const ui64 hashSecondary) {
- hashes.emplace(hashSecondary);
- };
- TNGrammBuilder builder(HashesCount);
- for (auto&& c : foundColumns) {
- for (auto&& ls : c.second.GetLikeSequences()) {
+ ui64 hashBase;
+ if (!GetDataExtractor()->CheckForIndex(i.first, hashBase)) {
+ continue;
+ }
+ std::set<ui64> hashes;
+ const auto predSet = [&](const ui64 hashSecondary) {
+ hashes.emplace(hashSecondary);
+ };
+ TNGrammBuilder builder(HashesCount);
+ for (auto&& ls : i.second.GetLikeSequences()) {
builder.FillNGrammHashes(NGrammSize, ls.second.GetOperation(), ls.second.GetValue(), predSet);
}
+ branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes)));
}
- branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes)));
}
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h
index 1e9135e8c9d..9291219402c 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h
@@ -54,6 +54,9 @@ protected:
return false;
}
}
+ if (!MutableDataExtractor().DeserializeFromProto(bFilter.GetDataExtractor())) {
+ return false;
+ }
HashesCount = bFilter.GetHashesCount();
if (!TConstants::CheckHashesCount(HashesCount)) {
return false;
@@ -69,7 +72,7 @@ protected:
if (!bFilter.HasColumnId() || !bFilter.GetColumnId()) {
return false;
}
- ColumnIds.emplace(bFilter.GetColumnId());
+ AddColumnId(bFilter.GetColumnId());
Initialize();
return true;
}
@@ -79,19 +82,19 @@ protected:
AFL_VERIFY(TConstants::CheckFilterSizeBytes(FilterSizeBytes));
AFL_VERIFY(TConstants::CheckHashesCount(HashesCount));
AFL_VERIFY(TConstants::CheckRecordsCount(RecordsCount));
- AFL_VERIFY(ColumnIds.size() == 1);
filterProto->SetRecordsCount(RecordsCount);
filterProto->SetNGrammSize(NGrammSize);
filterProto->SetFilterSizeBytes(FilterSizeBytes);
filterProto->SetHashesCount(HashesCount);
- filterProto->SetColumnId(*ColumnIds.begin());
+ filterProto->SetColumnId(GetColumnId());
+ *filterProto->MutableDataExtractor() = GetDataExtractor().SerializeToProto();
}
public:
TIndexMeta() = default;
- TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const ui32 hashesCount,
+ TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const TReadDataExtractorContainer& dataExtractor, const ui32 hashesCount,
const ui32 filterSizeBytes, const ui32 nGrammSize, const ui32 recordsCount)
- : TBase(indexId, indexName, { columnId }, storageId)
+ : TBase(indexId, indexName, columnId, storageId, dataExtractor)
, NGrammSize(nGrammSize)
, FilterSizeBytes(filterSizeBytes)
, RecordsCount(recordsCount)
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp
index 362ee4a098b..118ea0537fb 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp
@@ -20,7 +20,8 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor
}
AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second);
}
- return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds);
+ AFL_VERIFY(columnIds.size() == 1);
+ return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), *columnIds.begin());
}
NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
@@ -37,6 +38,10 @@ NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(con
}
ColumnNames.emplace(i.GetString());
}
+ if (ColumnNames.size() > 1) {
+ return TConclusionStatus::Fail(
+ "column_names elements count have to be equal to 1 temporary");
+ }
return TConclusionStatus::Success();
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp
index c48b9996964..6cb012b960f 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp
@@ -16,30 +16,31 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*r
for (auto& colReader : reader) {
for (colReader.Start(); colReader.IsCorrect(); colReader.ReadNextChunk()) {
- auto array = colReader.GetCurrentChunk();
-
- NArrow::SwitchType(array->type_id(), [&](const auto& type) {
- using TWrap = std::decay_t<decltype(type)>;
- using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
-
- const TArray& arrTyped = static_cast<const TArray&>(*array);
- if constexpr (arrow::has_c_type<typename TWrap::T>()) {
- for (int64_t i = 0; i < arrTyped.length(); ++i) {
- auto cell = TCell::Make(arrTyped.Value(i));
- sketch->Count(cell.Data(), cell.Size());
+ auto cArray = colReader.GetCurrentChunk()->GetChunkedArray();
+ for (auto&& array : cArray->chunks()) {
+ NArrow::SwitchType(array->type_id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType;
+
+ const TArray& arrTyped = static_cast<const TArray&>(*array);
+ if constexpr (arrow::has_c_type<typename TWrap::T>()) {
+ for (int64_t i = 0; i < arrTyped.length(); ++i) {
+ auto cell = TCell::Make(arrTyped.Value(i));
+ sketch->Count(cell.Data(), cell.Size());
+ }
+ return true;
}
- return true;
- }
- if constexpr (arrow::has_string_view<typename TWrap::T>()) {
- for (int64_t i = 0; i < arrTyped.length(); ++i) {
- auto view = arrTyped.GetView(i);
- sketch->Count(view.data(), view.size());
+ if constexpr (arrow::has_string_view<typename TWrap::T>()) {
+ for (int64_t i = 0; i < arrTyped.length(); ++i) {
+ auto view = arrTyped.GetView(i);
+ sketch->Count(view.data(), view.size());
+ }
+ return true;
}
- return true;
- }
- AFL_VERIFY(false)("message", "Unsupported arrow type for building an index");
- return false;
- });
+ AFL_VERIFY(false)("message", "Unsupported arrow type for building an index");
+ return false;
+ });
+ }
}
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h
index cb7abe56c61..78ab255c4f6 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h
@@ -1,4 +1,5 @@
#pragma once
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h>
#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h>
namespace NKikimr::NOlap::NIndexes::NCountMinSketch {
@@ -18,12 +19,14 @@ protected:
virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override {
const auto* bMeta = dynamic_cast<const TIndexMeta*>(&newMeta);
if (!bMeta) {
- return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
+ return TConclusionStatus::Fail(
+ "cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
}
return TBase::CheckSameColumnsForModification(newMeta);
}
- virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;
+ virtual void DoFillIndexCheckers(
+ const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override;
virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const override;
@@ -32,32 +35,27 @@ protected:
AFL_VERIFY(proto.HasCountMinSketch());
auto& sketch = proto.GetCountMinSketch();
for (auto&& i : sketch.GetColumnIds()) {
- ColumnIds.emplace(i);
+ AddColumnId(i);
}
return true;
}
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override {
auto* sketchProto = proto.MutableCountMinSketch();
- for (auto&& i : ColumnIds) {
+ for (auto&& i : GetColumnIds()) {
sketchProto->AddColumnIds(i);
}
}
public:
TIndexMeta() = default;
- TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const std::set<ui32>& columnIds)
- : TBase(indexId, indexName, columnIds, storageId) {
+ TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId)
+ : TBase(indexId, indexName, columnId, storageId, std::make_shared<TDefaultDataExtractor>()) {
}
virtual TString GetClassName() const override {
return GetClassNameStatic();
}
-
- const std::set<ui32>& GetColumnIds() const {
- return ColumnIds;
- }
-
};
-} // namespace NKikimr::NOlap::NIndexes
+} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp
index e7dbcdd8bfe..7d5467a55c0 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp
@@ -16,7 +16,7 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*r
{
TChunkedColumnReader cReader = *reader.begin();
for (reader.Start(); cReader.IsCorrect(); cReader.ReadNextChunk()) {
- auto currentScalar = cReader.GetCurrentAccessor()->GetMaxScalar();
+ auto currentScalar = cReader.GetCurrentChunk()->GetMaxScalar();
AFL_VERIFY(currentScalar);
if (!result || NArrow::ScalarCompare(*result, *currentScalar) == -1) {
result = currentScalar;
@@ -44,8 +44,7 @@ std::shared_ptr<arrow::Scalar> TIndexMeta::GetMaxScalarVerified(
}
NJson::TJsonValue TIndexMeta::DoSerializeDataToJson(const TString& data, const TIndexInfo& indexInfo) const {
- AFL_VERIFY(ColumnIds.size() == 1);
- auto scalar = GetMaxScalarVerified({ data }, indexInfo.GetColumnFeaturesVerified(*ColumnIds.begin()).GetArrowField()->type());
+ auto scalar = GetMaxScalarVerified({ data }, indexInfo.GetColumnFeaturesVerified(GetColumnId()).GetArrowField()->type());
return scalar->ToString();
}
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h
index 2925e9fbc6c..5e024612e88 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h>
#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h>
#include <ydb/library/formats/arrow/switch/switch_type.h>
@@ -34,27 +35,21 @@ protected:
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "incorrect column id");
return false;
};
- ColumnIds.emplace(bFilter.GetColumnId());
+ AddColumnId(bFilter.GetColumnId());
return true;
}
virtual NJson::TJsonValue DoSerializeDataToJson(const TString& data, const TIndexInfo& indexInfo) const override;
virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override {
- AFL_VERIFY(ColumnIds.size() == 1);
auto* filterProto = proto.MutableMaxIndex();
- filterProto->SetColumnId(*ColumnIds.begin());
+ filterProto->SetColumnId(GetColumnId());
}
public:
TIndexMeta() = default;
TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32& columnId)
- : TBase(indexId, indexName, { columnId }, storageId) {
- }
-
- ui32 GetColumnId() const {
- AFL_VERIFY(ColumnIds.size() == 1);
- return *ColumnIds.begin();
+ : TBase(indexId, indexName, columnId, storageId, std::make_shared<TDefaultDataExtractor>()) {
}
static bool IsAvailableType(const NScheme::TTypeInfo type) {
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp
new file mode 100644
index 00000000000..e7cb68807a8
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp
@@ -0,0 +1,36 @@
+#include "abstract.h"
+#include "default.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+TConclusionStatus TReadDataExtractorContainer::DeserializeFromJson(const NJson::TJsonValue& jsonValue) {
+ if (!jsonValue.IsDefined() || jsonValue.IsNull()) {
+ if (!Initialize(TDefaultDataExtractor::GetClassNameStatic())) {
+ return TConclusionStatus::Fail(
+ "cannot build default data extractor ('" + TDefaultDataExtractor::GetClassNameStatic() + "')");
+ }
+ return TConclusionStatus::Success();
+ }
+ const TString className = jsonValue["class_name"].GetStringRobust();
+ auto dataExtractor = IReadDataExtractor::TFactory::MakeHolder(className);
+ if (!dataExtractor) {
+ return TConclusionStatus::Fail("cannot build data extractor (unexpected class name: '" + className + "')");
+ }
+ auto parseConclusion = dataExtractor->DeserializeFromJson(jsonValue);
+ if (parseConclusion.IsFail()) {
+ return parseConclusion;
+ }
+ Object = std::shared_ptr<IReadDataExtractor>(dataExtractor.Release());
+ return TConclusionStatus::Success();
+}
+
+bool TReadDataExtractorContainer::DeserializeFromProto(const IReadDataExtractor::TProto& data) {
+ if (!data.GetClassName()) {
+ AFL_VERIFY(Initialize(TDefaultDataExtractor::GetClassNameStatic()));
+ return true;
+ } else {
+ return TBase::DeserializeFromProto(data);
+ }
+}
+
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h
new file mode 100644
index 00000000000..a2518363c8b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h
@@ -0,0 +1,73 @@
+#pragma once
+#include <ydb/core/formats/arrow/accessor/abstract/accessor.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h>
+
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+class IReadDataExtractor {
+public:
+ using TRecordVisitor = const std::function<void(const std::string_view value, const ui64 hashBase)>;
+ using TChunkVisitor = const std::function<void(const std::shared_ptr<arrow::Array>&, const ui64 hashBase)>;
+ using TProto = NKikimrSchemeOp::TIndexDataExtractor;
+ using TFactory = NObjectFactory::TObjectFactory<IReadDataExtractor, TString>;
+
+private:
+ virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor,
+ const TRecordVisitor& recordVisitor) const = 0;
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
+
+ virtual void DoSerializeToProto(TProto& proto) const = 0;
+ virtual bool DoDeserializeFromProto(const TProto& proto) = 0;
+ virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64& baseHash) const = 0;
+ virtual ui32 DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const = 0;
+
+public:
+ virtual TString GetClassName() const = 0;
+ virtual ~IReadDataExtractor() = default;
+
+ ui32 GetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const {
+ return DoGetIndexHitsCount(dataArray);
+ }
+
+ bool CheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64& baseHash) const {
+ baseHash = 0;
+ return DoCheckForIndex(dataSource, baseHash);
+ }
+
+ virtual void SerializeToProto(TProto& proto) const {
+ return DoSerializeToProto(proto);
+ }
+
+ virtual bool DeserializeFromProto(const TProto& proto) {
+ return DoDeserializeFromProto(proto);
+ }
+
+ void VisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor,
+ const TRecordVisitor& recordVisitor) const {
+ AFL_VERIFY(dataArray->IsDataOwner())("type", dataArray->GetType());
+ DoVisitAll(dataArray, chunkVisitor, recordVisitor);
+ }
+
+ TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+ return DoDeserializeFromJson(jsonInfo);
+ }
+};
+
+class TReadDataExtractorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IReadDataExtractor> {
+private:
+ using TBase = NBackgroundTasks::TInterfaceProtoContainer<IReadDataExtractor>;
+
+public:
+ using TBase::TBase;
+ TReadDataExtractorContainer() = default;
+
+ bool DeserializeFromProto(const IReadDataExtractor::TProto& data);
+ TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonValue);
+};
+
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp
new file mode 100644
index 00000000000..a78cddce1b6
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp
@@ -0,0 +1,61 @@
+#include "default.h"
+
+#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h>
+
+#include <util/digest/fnv.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+void TDefaultDataExtractor::VisitSimple(
+ const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const ui64 hashBase, const TChunkVisitor& visitor) const {
+ auto chunkedArray = dataArray->GetChunkedArray();
+ for (auto&& i : chunkedArray->chunks()) {
+ visitor(i, hashBase);
+ }
+}
+
+void TDefaultDataExtractor::DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor,
+ const TRecordVisitor& recordVisitor) const {
+ if (dataArray->GetType() != NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray) {
+ VisitSimple(dataArray, 0, chunkVisitor);
+ return;
+ }
+ const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray);
+ for (ui32 idx = 0; idx < subColumns->GetColumnsData().GetRecords()->GetColumnsCount(); ++idx) {
+ const std::string_view svColName = subColumns->GetColumnsData().GetStats().GetColumnName(idx);
+ const ui64 hashBase = NRequest::TOriginalDataAddress::CalcSubColumnHash(svColName);
+ VisitSimple(subColumns->GetColumnsData().GetRecords()->GetColumnVerified(idx), hashBase, chunkVisitor);
+ }
+ std::vector<ui64> hashByColumnIdx;
+ for (ui32 idx = 0; idx < subColumns->GetOthersData().GetStats().GetColumnsCount(); ++idx) {
+ const std::string_view svColName = subColumns->GetOthersData().GetStats().GetColumnName(idx);
+ hashByColumnIdx.emplace_back(NRequest::TOriginalDataAddress::CalcSubColumnHash(svColName));
+ }
+ auto iterator = subColumns->GetOthersData().BuildIterator();
+ for (; iterator.IsValid(); iterator.Next()) {
+ recordVisitor(iterator.GetValue(), hashByColumnIdx[iterator.GetKeyIndex()]);
+ }
+}
+
+bool TDefaultDataExtractor::DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& hashBase) const {
+ if (request.GetSubColumnName()) {
+ AFL_VERIFY(request.GetSubColumnName().StartsWith("$."));
+ std::string_view sv(request.GetSubColumnName().data() + 2, request.GetSubColumnName().size() - 2);
+ if (sv.starts_with("\"") && sv.ends_with("\"")) {
+ sv = std::string_view(sv.data() + 1, sv.size() - 2);
+ }
+ hashBase = NRequest::TOriginalDataAddress::CalcSubColumnHash(sv);
+ }
+ return true;
+}
+
+ui32 TDefaultDataExtractor::DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const {
+ if (dataArray->GetType() != NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray) {
+ return dataArray->GetRecordsCount();
+ } else {
+ const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray);
+ return subColumns->GetColumnsData().GetStats().GetFilledValuesCount() + subColumns->GetOthersData().GetStats().GetFilledValuesCount();
+ }
+}
+
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h
new file mode 100644
index 00000000000..0a5e7ec061b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h
@@ -0,0 +1,39 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+class TDefaultDataExtractor: public IReadDataExtractor {
+public:
+ static TString GetClassNameStatic() {
+ return "DEFAULT";
+ }
+
+private:
+ static const inline auto Registrator = TFactory::TRegistrator<TDefaultDataExtractor>(GetClassNameStatic());
+ virtual void DoSerializeToProto(TProto& /*proto*/) const override {
+ return;
+ }
+ virtual bool DoDeserializeFromProto(const TProto& /*proto*/) override {
+ return true;
+ }
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& /*jsonInfo*/) override {
+ return TConclusionStatus::Success();
+ }
+
+ void VisitSimple(
+ const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const ui64 hashBase, const TChunkVisitor& visitor) const;
+
+ virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor,
+ const TRecordVisitor& recordVisitor) const override;
+
+ virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& hashBase) const override;
+ virtual ui32 DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const override;
+
+public:
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+};
+
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp
new file mode 100644
index 00000000000..e3f297de2a0
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp
@@ -0,0 +1,38 @@
+#include "sub_column.h"
+#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h>
+
+namespace NKikimr::NOlap::NIndexes {
+
+void TSubColumnDataExtractor::DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor,
+ const TRecordVisitor& recordVisitor) const {
+ AFL_VERIFY(dataArray->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray);
+ const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray);
+ if (auto idxColumn = subColumns->GetColumnsData().GetStats().GetKeyIndexOptional(SubColumnName)) {
+ auto chunkedArray = subColumns->GetColumnsData().GetRecords()->GetColumnVerified(*idxColumn)->GetChunkedArray();
+ for (auto&& i : chunkedArray->chunks()) {
+ chunkVisitor(i, 0);
+ }
+ } else if (auto idxColumn = subColumns->GetOthersData().GetStats().GetKeyIndexOptional(SubColumnName)) {
+ auto iterator = subColumns->GetOthersData().BuildIterator();
+ for (; iterator.IsValid(); iterator.Next()) {
+ if (iterator.GetKeyIndex() != *idxColumn) {
+ continue;
+ }
+ recordVisitor(iterator.GetValue(), 0);
+ }
+ }
+}
+
+ui32 TSubColumnDataExtractor::DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const {
+ AFL_VERIFY(dataArray->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray);
+ const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray);
+ if (auto idxColumn = subColumns->GetColumnsData().GetStats().GetKeyIndexOptional(SubColumnName)) {
+ return subColumns->GetColumnsData().GetStats().GetColumnRecordsCount(*idxColumn);
+ } else if (auto idxColumn = subColumns->GetOthersData().GetStats().GetKeyIndexOptional(SubColumnName)) {
+ return subColumns->GetOthersData().GetStats().GetColumnRecordsCount(*idxColumn);
+ } else {
+ return 0;
+ }
+}
+
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h
new file mode 100644
index 00000000000..d352061756e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h
@@ -0,0 +1,60 @@
+#pragma once
+#include "abstract.h"
+
+namespace NKikimr::NOlap::NIndexes {
+
+class TSubColumnDataExtractor: public IReadDataExtractor {
+public:
+ static TString GetClassNameStatic() {
+ return "SUB_COLUMN";
+ }
+
+private:
+ static const inline auto Registrator = TFactory::TRegistrator<TSubColumnDataExtractor>(GetClassNameStatic());
+
+ TString SubColumnName;
+
+ virtual void DoSerializeToProto(TProto& proto) const override {
+ AFL_VERIFY(!!SubColumnName);
+ proto.MutableSubColumn()->SetSubColumnName(SubColumnName);
+ }
+ virtual bool DoDeserializeFromProto(const TProto& proto) override {
+ if (!proto.HasSubColumn()) {
+ return false;
+ }
+ SubColumnName = proto.GetSubColumn().GetSubColumnName();
+ if (!SubColumnName) {
+ return false;
+ }
+ return true;
+ }
+ virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override {
+ if (!jsonInfo.Has("sub_column_name")) {
+ return TConclusionStatus::Fail("extractor description has to have 'sub_column_name' parameter");
+ }
+ if (!jsonInfo["sub_column_name"].IsString()) {
+ return TConclusionStatus::Fail("extractor description parameter 'sub_column_name' has to been string");
+ }
+ SubColumnName = jsonInfo["sub_column_name"].GetString();
+ if (!SubColumnName) {
+ return TConclusionStatus::Fail("extractor description parameter 'sub_column_name' hasn't been empty");
+ }
+ return TConclusionStatus::Success();
+ }
+
+ virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor,
+ const TRecordVisitor& recordVisitor) const override;
+
+ virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& /*hashBase*/) const override {
+ return request.GetSubColumnName() == SubColumnName;
+ }
+
+ virtual ui32 DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const override;
+
+public:
+ virtual TString GetClassName() const override {
+ return GetClassNameStatic();
+ }
+};
+
+} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make
new file mode 100644
index 00000000000..25071ccf9e4
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make
@@ -0,0 +1,18 @@
+LIBRARY()
+
+
+SRCS(
+ abstract.cpp
+ GLOBAL sub_column.cpp
+ GLOBAL default.cpp
+)
+
+PEERDIR(
+ ydb/core/formats/arrow
+ ydb/core/protos
+ ydb/core/formats/arrow/accessor/sub_columns
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp
index 00c02406358..63c09d0f135 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp
@@ -1,7 +1,8 @@
#include "meta.h"
-#include <ydb/core/tx/columnshard/engines/storage/chunks/data.h>
-#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+
#include <ydb/core/formats/arrow/size_calcer.h>
+#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
+#include <ydb/core/tx/columnshard/engines/storage/chunks/data.h>
namespace NKikimr::NOlap::NIndexes {
@@ -12,7 +13,11 @@ std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex
std::vector<TChunkedColumnReader> columnReaders;
for (auto&& i : ColumnIds) {
auto it = data.find(i);
- AFL_VERIFY(it != data.end());
+ if (it == data.end()) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "index_data_absent")("column_id", i)("index_name", GetIndexName())(
+ "index_id", GetIndexId());
+ return nullptr;
+ }
columnReaders.emplace_back(it->second, indexInfo.GetColumnLoaderVerified(i));
}
TChunkedBatchReader reader(std::move(columnReaders));
@@ -25,17 +30,22 @@ bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDe
return true;
}
-TIndexByColumns::TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set<ui32>& columnIds, const TString& storageId)
+TIndexByColumns::TIndexByColumns(
+ const ui32 indexId, const TString& indexName, const ui32 columnId, const TString& storageId, const TReadDataExtractorContainer& extractor)
: TBase(indexId, indexName, storageId)
- , ColumnIds(columnIds)
-{
+ , DataExtractor(extractor)
+ , ColumnIds({ columnId }) {
Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer();
}
NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(const IIndexMeta& newMeta) const {
const auto* bMeta = dynamic_cast<const TIndexByColumns*>(&newMeta);
if (!bMeta) {
- return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
+ return TConclusionStatus::Fail(
+ "cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName());
+ }
+ if (bMeta->ColumnIds.size() != 1) {
+ return TConclusionStatus::Fail("one column per index is necessary");
}
if (bMeta->ColumnIds.size() != ColumnIds.size()) {
return TConclusionStatus::Fail("columns count is different");
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h
index f8e601c80fc..a120134df8b 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h
@@ -1,7 +1,9 @@
#pragma once
-#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h>
+#include "extractor/abstract.h"
+
#include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h>
#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h>
+#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h>
namespace NKikimr::NOlap::NIndexes {
@@ -9,9 +11,17 @@ class TIndexByColumns: public IIndexMeta {
private:
using TBase = IIndexMeta;
std::shared_ptr<NArrow::NSerialization::ISerializer> Serializer;
-
-protected:
+ TReadDataExtractorContainer DataExtractor;
std::set<ui32> ColumnIds;
+protected:
+
+ const TReadDataExtractorContainer& GetDataExtractor() const {
+ return DataExtractor;
+ }
+
+ TReadDataExtractorContainer& MutableDataExtractor() {
+ return DataExtractor;
+ }
virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const = 0;
@@ -22,8 +32,23 @@ protected:
TConclusionStatus CheckSameColumnsForModification(const IIndexMeta& newMeta) const;
public:
+ void AddColumnId(const ui32 columnId) {
+ AFL_VERIFY(ColumnIds.emplace(columnId).second);
+ AFL_VERIFY(ColumnIds.size() == 1);
+ }
+
+ ui32 GetColumnId() const {
+ AFL_VERIFY(ColumnIds.size() == 1)("size", ColumnIds.size());
+ return *ColumnIds.begin();
+ }
+
+ const std::set<ui32>& GetColumnIds() const {
+ return ColumnIds;
+ }
+
TIndexByColumns() = default;
- TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set<ui32>& columnIds, const TString& storageId);
+ TIndexByColumns(const ui32 indexId, const TString& indexName, const ui32 columnId, const TString& storageId,
+ const TReadDataExtractorContainer& extractor);
};
} // namespace NKikimr::NOlap::NIndexes
diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make
index 0ce6d8f9987..403f7f797a9 100644
--- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make
+++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make
@@ -9,6 +9,7 @@ PEERDIR(
ydb/core/formats/arrow
ydb/library/formats/arrow/protos
ydb/core/tx/columnshard/engines/storage/chunks
+ ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor
ydb/core/tx/columnshard/engines/scheme/indexes/abstract
ydb/core/tx/columnshard/engines/portions
)
diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h
index 315fc18604e..8c4458fd57e 100644
--- a/ydb/core/tx/columnshard/splitter/chunks.h
+++ b/ydb/core/tx/columnshard/splitter/chunks.h
@@ -8,7 +8,7 @@
namespace NKikimr::NOlap {
-class IPortionColumnChunk : public IPortionDataChunk {
+class IPortionColumnChunk: public IPortionDataChunk {
private:
using TBase = IPortionDataChunk;
@@ -55,68 +55,57 @@ private:
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
std::shared_ptr<TColumnLoader> Loader;
- std::shared_ptr<NArrow::NAccessor::IChunkedArray> CurrentChunk;
- std::optional<NArrow::NAccessor::IChunkedArray::TFullDataAddress> CurrentChunkArray;
- ui32 CurrentChunkIndex = 0;
+ std::shared_ptr<NArrow::NAccessor::IChunkedArray> CurrentArray;
+ std::optional<NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress> CurrentChunkArray;
+ ui32 CurrentArrayIndex = 0;
ui32 CurrentRecordIndex = 0;
+
public:
+
TChunkedColumnReader(const std::vector<std::shared_ptr<IPortionDataChunk>>& chunks, const std::shared_ptr<TColumnLoader>& loader)
: Chunks(chunks)
- , Loader(loader)
- {
+ , Loader(loader) {
Start();
}
void Start() {
- CurrentChunkIndex = 0;
+ CurrentArrayIndex = 0;
CurrentRecordIndex = 0;
if (Chunks.size()) {
- CurrentChunk = Loader->ApplyVerified(Chunks.front()->GetData(), Chunks.front()->GetRecordsCountVerified());
+ CurrentArray = Loader->ApplyVerified(Chunks.front()->GetData(), Chunks.front()->GetRecordsCountVerified());
CurrentChunkArray.reset();
}
}
- const std::shared_ptr<arrow::Array>& GetCurrentChunk() {
+ const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& GetCurrentChunk() {
if (!CurrentChunkArray || !CurrentChunkArray->GetAddress().Contains(CurrentRecordIndex)) {
- CurrentChunkArray = CurrentChunk->GetChunk(CurrentChunkArray, CurrentRecordIndex);
+ CurrentChunkArray = CurrentArray->GetArray(CurrentChunkArray, CurrentRecordIndex, CurrentArray);
}
AFL_VERIFY(CurrentChunkArray);
return CurrentChunkArray->GetArray();
}
- const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& GetCurrentAccessor() const {
- AFL_VERIFY(CurrentChunk);
- return CurrentChunk;
- }
-
- ui32 GetCurrentRecordIndex() {
- if (!CurrentChunkArray || !CurrentChunkArray->GetAddress().Contains(CurrentRecordIndex)) {
- CurrentChunkArray = CurrentChunk->GetChunk(CurrentChunkArray->GetAddress(), CurrentRecordIndex);
- }
- return CurrentChunkArray->GetAddress().GetLocalIndex(CurrentRecordIndex);
- }
-
bool IsCorrect() const {
- return !!CurrentChunk;
+ return !!CurrentArray;
}
bool ReadNextChunk() {
- while (++CurrentChunkIndex < Chunks.size()) {
- CurrentChunk = Loader->ApplyVerified(Chunks[CurrentChunkIndex]->GetData(), Chunks[CurrentChunkIndex]->GetRecordsCountVerified());
+ while (++CurrentArrayIndex < Chunks.size()) {
+ CurrentArray = Loader->ApplyVerified(Chunks[CurrentArrayIndex]->GetData(), Chunks[CurrentArrayIndex]->GetRecordsCountVerified());
CurrentChunkArray.reset();
CurrentRecordIndex = 0;
- if (CurrentRecordIndex < CurrentChunk->GetRecordsCount()) {
+ if (CurrentRecordIndex < CurrentArray->GetRecordsCount()) {
return true;
}
}
CurrentChunkArray.reset();
- CurrentChunk = nullptr;
+ CurrentArray = nullptr;
return false;
}
bool ReadNext() {
- AFL_VERIFY(!!CurrentChunk);
- if (++CurrentRecordIndex < CurrentChunk->GetRecordsCount()) {
+ AFL_VERIFY(!!CurrentArray);
+ if (++CurrentRecordIndex < CurrentArray->GetRecordsCount()) {
return true;
}
return ReadNextChunk();
@@ -127,6 +116,7 @@ class TChunkedBatchReader {
private:
std::vector<TChunkedColumnReader> Columns;
bool IsCorrectFlag = true;
+
public:
TChunkedBatchReader(const std::vector<TChunkedColumnReader>& columnReaders)
: Columns(columnReaders) {
@@ -193,4 +183,4 @@ public:
}
};
-}
+} // namespace NKikimr::NOlap
diff --git a/ydb/core/tx/columnshard/test_helper/program_constructor.cpp b/ydb/core/tx/columnshard/test_helper/program_constructor.cpp
index 267e2700fa7..9d597f408df 100644
--- a/ydb/core/tx/columnshard/test_helper/program_constructor.cpp
+++ b/ydb/core/tx/columnshard/test_helper/program_constructor.cpp
@@ -13,7 +13,8 @@ ui32 TProgramProtoBuilder::AddConstant(const TString& bytes) {
return CurrentGenericColumnId;
}
-ui32 TProgramProtoBuilder::AddOperation(const NKikimrSSA::TProgram::TAssignment::EFunction op, const std::vector<ui32>& arguments) {
+ui32 TProgramProtoBuilder::AddOperation(
+ const NKikimrSSA::TProgram::TAssignment::EFunction op, const std::vector<ui32>& arguments) {
auto* command = Proto.AddCommand();
auto* functionProto = command->MutableAssign()->MutableFunction();
for (auto&& i : arguments) {
@@ -43,6 +44,18 @@ ui32 TProgramProtoBuilder::AddOperation(const NYql::TKernelRequestBuilder::EBina
return CurrentGenericColumnId;
}
+ui32 TProgramProtoBuilder::AddOperation(const TString& kernelName, const std::vector<ui32>& arguments) {
+ auto* command = Proto.AddCommand();
+ auto* functionProto = command->MutableAssign()->MutableFunction();
+ for (auto&& i : arguments) {
+ functionProto->AddArguments()->SetId(i);
+ }
+ functionProto->SetId(NKikimrSSA::TProgram::TAssignment::FUNC_CMP_GREATER_EQUAL);
+ functionProto->SetKernelName(kernelName);
+ command->MutableAssign()->MutableColumn()->SetId(++CurrentGenericColumnId);
+ return CurrentGenericColumnId;
+}
+
void TProgramProtoBuilder::AddFilter(const ui32 colId) {
auto* command = Proto.AddCommand();
command->MutableFilter()->MutablePredicate()->SetId(colId);
diff --git a/ydb/core/tx/columnshard/test_helper/program_constructor.h b/ydb/core/tx/columnshard/test_helper/program_constructor.h
index 47d44389cc2..735cd2e3c1e 100644
--- a/ydb/core/tx/columnshard/test_helper/program_constructor.h
+++ b/ydb/core/tx/columnshard/test_helper/program_constructor.h
@@ -23,6 +23,7 @@ public:
ui32 AddConstant(const TString& bytes);
ui32 AddOperation(const NYql::TKernelRequestBuilder::EBinaryOp op, const std::vector<ui32>& arguments);
ui32 AddOperation(const NKikimrSSA::TProgram::TAssignment::EFunction op, const std::vector<ui32>& arguments);
+ ui32 AddOperation(const TString& kernelName, const std::vector<ui32>& arguments);
ui32 AddAggregation(
const NArrow::NSSA::NAggregation::EAggregate op, const std::vector<ui32>& arguments, const std::vector<ui32>& groupByKeys);
void AddFilter(const ui32 colId);
diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h
index 6e57260ee4a..dfd03d1a5c0 100644
--- a/ydb/services/bg_tasks/abstract/interface.h
+++ b/ydb/services/bg_tasks/abstract/interface.h
@@ -189,7 +189,7 @@ protected:
public:
using TBase::TBase;
- bool Initialize(const TString& className, const bool maybeExists = false) {
+ [[nodiscard]] bool Initialize(const TString& className, const bool maybeExists = false) {
AFL_VERIFY(maybeExists || !Object)("problem", "initialize for not-empty-object");
Object.reset(TFactory::Construct(className));
if (!Object) {