diff options
author | ivanmorozov333 <ivanmorozov@ydb.tech> | 2025-03-01 12:28:33 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-01 12:28:33 +0300 |
commit | 1215fa43fddf7d1ed0bfc8644a014dd909347f1b (patch) | |
tree | 8bb6b371ade1bc883b2cc234c21b5db914eb3b86 | |
parent | d08419b4a78e460944e81523fe9c7ce7c7f5ca14 (diff) | |
download | ydb-1215fa43fddf7d1ed0bfc8644a014dd909347f1b.tar.gz |
indexes data extractor for providing subcolumns info (#15175)
43 files changed, 1044 insertions, 346 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index 6500d3169e8..c353e465b4d 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -39,6 +39,7 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64 ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest +ydb/core/kqp/ut/olap KqpOlapJson.BloomIndexesVariants ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictActualization ydb/core/kqp/ut/olap KqpOlapSysView.StatsSysViewBytesDictStatActualization ydb/core/kqp/ut/olap KqpOlapWrite.TierDraftsGCWithRestart diff --git a/ydb/core/formats/arrow/accessor/sub_columns/stats.h b/ydb/core/formats/arrow/accessor/sub_columns/stats.h index 371634eb147..176c223d468 100644 --- a/ydb/core/formats/arrow/accessor/sub_columns/stats.h +++ b/ydb/core/formats/arrow/accessor/sub_columns/stats.h @@ -24,6 +24,14 @@ private: std::shared_ptr<arrow::UInt8Array> AccessorType; public: + ui32 GetFilledValuesCount() const { + ui32 result = 0; + for (ui32 i = 0; i < (ui32)DataRecordsCount->length(); ++i) { + result += DataRecordsCount->Value(i); + } + return result; + } + NJson::TJsonValue DebugJson() const { NJson::TJsonValue result = NJson::JSON_MAP; result.InsertValue("key_names", NArrow::DebugJson(DataNames, 1000000, 1000000)["data"]); diff --git a/ydb/core/kqp/ut/olap/indexes_ut.cpp b/ydb/core/kqp/ut/olap/indexes_ut.cpp index 7a0708a031f..db511d91475 100644 --- a/ydb/core/kqp/ut/olap/indexes_ut.cpp +++ b/ydb/core/kqp/ut/olap/indexes_ut.cpp @@ -93,7 +93,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, - FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05}`); + FEATURES=`{"column_names" : ["resource_id"], "false_positive_probability" : 0.05}`); )"; auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); @@ -128,6 +128,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { Cerr << csController->GetIndexesSkippingOnSelect().Val() << " / " << csController->GetIndexesApprovedOnSelect().Val() << Endl; CompareYson(result, R"([[0u;]])"); AFL_VERIFY(csController->GetIndexesSkippedNoData().Val() == 0); + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0); AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < csController->GetIndexesSkippingOnSelect().Val()) ("approve", csController->GetIndexesApprovedOnSelect().Val())("skip", csController->GetIndexesSkippingOnSelect().Val()); } @@ -432,7 +433,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { auto alterQuery = TStringBuilder() << Sprintf( R"(ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_INDEX, NAME=index_resource_id, TYPE=BLOOM_FILTER, - FEATURES=`{"column_names" : ["resource_id", "level"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`); + FEATURES=`{"column_names" : ["resource_id"], "false_positive_probability" : 0.05, "storage_id" : "%s"}`); )", StorageId.data()); auto session = tableClient.CreateSession().GetValueSync().GetSession(); @@ -474,7 +475,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) { AFL_VERIFY(csController->GetIndexesSkippingOnSelect().Val() == 0); AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() == 0); - csController->WaitCompactions(TDuration::Seconds(25)); + csController->WaitCompactions(TDuration::Seconds(5)); // important checker for control compactions (<=21) and control indexes constructed (>=21) AFL_VERIFY(csController->GetCompactionStartedCounter().Val() == 3)("count", csController->GetCompactionStartedCounter().Val()); diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp index dccff1e6094..cace9eeb983 100644 --- a/ydb/core/kqp/ut/olap/json_ut.cpp +++ b/ydb/core/kqp/ut/olap/json_ut.cpp @@ -65,9 +65,21 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { class TSelectCommand: public ICommand { private: - const TString Command; - const TString Compare; + TString Command; + TString Compare; + std::optional<ui64> ExpectIndexSkip; + std::optional<ui64> ExpectIndexNoData; + std::optional<ui64> ExpectIndexApprove; + ui64 IndexSkipStart = 0; + ui64 IndexNoDataStart = 0; + ui64 IndexApproveStart = 0; + virtual TConclusionStatus DoExecute(TKikimrRunner& kikimr) override { + auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>(); + AFL_VERIFY(controller); + IndexSkipStart = controller->GetIndexesSkippingOnSelect().Val(); + IndexApproveStart = controller->GetIndexesApprovedOnSelect().Val(); + IndexNoDataStart = controller->GetIndexesSkippedNoData().Val(); Cerr << "EXECUTE: " << Command << Endl; auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); auto it = kikimr.GetQueryClient().StreamExecuteQuery(Command, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); @@ -78,14 +90,76 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { Cerr << "OUTPUT: " << output << Endl; CompareYson(output, Compare); } + const ui32 skip = controller->GetIndexesSkippingOnSelect().Val() - IndexSkipStart; + const ui32 noData = controller->GetIndexesSkippedNoData().Val() - IndexNoDataStart; + const ui32 approves = controller->GetIndexesApprovedOnSelect().Val() - IndexApproveStart; + Cerr << noData << "/" << skip << "/" << approves << Endl; + if (ExpectIndexSkip) { + AFL_VERIFY(skip == *ExpectIndexSkip)("expect", ExpectIndexSkip)("real", skip)( + "current", controller->GetIndexesSkippingOnSelect().Val())( + "pred", IndexSkipStart); + } + if (ExpectIndexNoData) { + AFL_VERIFY(noData == *ExpectIndexNoData)("expect", ExpectIndexNoData)("real", noData)( + "current", controller->GetIndexesSkippedNoData().Val())( + "pred", IndexNoDataStart); + } + if (ExpectIndexApprove) { + AFL_VERIFY(approves == *ExpectIndexApprove)("expect", ExpectIndexApprove)("real", approves)( + "current", controller->GetIndexesApprovedOnSelect().Val())("pred", IndexApproveStart); + } return TConclusionStatus::Success(); } public: - TSelectCommand(const TString& command, const TString& compare) - : Command(command) - , Compare(compare) { + bool DeserializeFromString(const TString& info) { + auto lines = StringSplitter(info).SplitBySet("\n").ToList<TString>(); + std::optional<ui32> state; + for (auto&& l : lines) { + l = Strip(l); + if (l.StartsWith("READ:")) { + l = l.substr(5); + state = 0; + } else if (l.StartsWith("EXPECTED:")) { + l = l.substr(9); + state = 1; + } else if (l.StartsWith("IDX_ND_SKIP_APPROVE:")) { + state = 2; + l = l.substr(20); + } else { + AFL_VERIFY(state)("line", l); + } + + if (*state == 0) { + Command += l; + } else if (*state == 1) { + Compare += l; + } else if (*state == 2) { + auto idxExpectations = StringSplitter(l).SplitBySet(" ,.;").SkipEmpty().ToList<TString>(); + AFL_VERIFY(idxExpectations.size() == 3)("size", idxExpectations.size())("string", l); + if (idxExpectations[0] != "{}") { + ui32 res; + AFL_VERIFY(TryFromString<ui32>(idxExpectations[0], res))("string", l); + ExpectIndexNoData = res; + } + if (idxExpectations[1] != "{}") { + ui32 res; + AFL_VERIFY(TryFromString<ui32>(idxExpectations[1], res))("string", l); + ExpectIndexSkip = res; + } + if (idxExpectations[2] != "{}") { + ui32 res; + AFL_VERIFY(TryFromString<ui32>(idxExpectations[2], res))("string", l); + ExpectIndexApprove = res; + } + } else { + AFL_VERIFY(false)("line", l); + } + } + return true; } + + TSelectCommand() = default; }; class TStopCompactionCommand: public ICommand { @@ -167,9 +241,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { public: TScriptExecutor(const std::vector<std::shared_ptr<ICommand>>& commands) - : Commands(commands) - { - + : Commands(commands) { } void Execute() { NKikimrConfig::TAppConfig appConfig; @@ -198,23 +270,9 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { command = command.substr(5); return std::make_shared<TDataCommand>(command); } else if (command.StartsWith("READ:")) { - auto lines = StringSplitter(command.substr(5)).SplitBySet("\n").ToList<TString>(); - int step = 0; - TString request; - TString expectation; - for (auto&& i : lines) { - i = Strip(i); - if (i.StartsWith("EXPECTED:")) { - step = 1; - i = i.substr(9); - } - if (step == 0) { - request += i; - } else if (step == 1) { - expectation += i; - } - } - return std::make_shared<TSelectCommand>(request, expectation); + auto result = std::make_shared<TSelectCommand>(); + AFL_VERIFY(result->DeserializeFromString(command)); + return result; } else if (command.StartsWith("WAIT_COMPACTION")) { return std::make_shared<TWaitCompactionCommand>(); } else if (command.StartsWith("STOP_COMPACTION")) { @@ -290,7 +348,6 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { for (auto&& i : Scripts) { i.Execute(); } - } }; @@ -636,7 +693,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { )"; TScriptVariator(script).Execute(); } -/* + Y_UNIT_TEST(BloomIndexesVariants) { TString script = R"( STOP_COMPACTION @@ -648,7 +705,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { PRIMARY KEY (Col1) ) PARTITION BY HASH(Col1) - WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$1|2$$); + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$2$$); ------ SCHEMA: ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`) @@ -657,34 +714,56 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) ------ - SCHEMA: - ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, - `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10|1000$$`, `MEM_LIMIT_CHUNK`=`$$0|100|1000000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) - ------ DATA: - REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a" : "a1"}')), (2u, JsonDocument('{"a" : "a2"}')), - (3u, JsonDocument('{"b" : "b3"}')), (4u, JsonDocument('{"b" : "b4", "a" : "a4"}')) + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a.b.c" : "a1"}')), (2u, JsonDocument('{"a.b.c" : "a2"}')), + (3u, JsonDocument('{"b.c.d" : "b3"}')), (4u, JsonDocument('{"b.c.d" : "b4", "a" : "a4"}')) ------ DATA: - REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a" : "1a1"}')), (12u, JsonDocument('{"a" : "1a2"}')), - (13u, JsonDocument('{"b" : "1b3"}')), (14u, JsonDocument('{"b" : "1b4", "a" : "a4"}')) + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a.b.c" : "1a1"}')), (12u, JsonDocument('{"a.b.c" : "1a2"}')), + (13u, JsonDocument('{"b.c.d" : "1b3"}')), (14u, JsonDocument('{"b.c.d" : "1b4", "a" : "a4"}')) ------ SCHEMA: ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=a_index, TYPE=BLOOM_FILTER, - FEATURES=`{"column_names" : ["Col2"], "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}, "false_positive_probability" : 0.05}`) + FEATURES=`{"column_names" : ["Col2"], "false_positive_probability" : 0.01}`) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_b, TYPE=BLOOM_NGRAMM_FILTER, + FEATURES=`{"column_name" : "Col2", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 4096, + "records_count" : 1024, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "b.c.d"}}`); ------ DATA: REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) ------ ONE_ACTUALIZATION ------ - READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1a1" ORDER BY Col1; - EXPECTED: [[11u;["{\"a\":\"1a1\"}"]]] + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1; + EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "1a1" ORDER BY Col1; + EXPECTED: [[11u;["{\"a.b.c\":\"1a1\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1; + EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b5" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 0, 4, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b3" ORDER BY Col1; + EXPECTED: [[13u;["{\"b.c.d\":\"1b3\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b5" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 0, 4, 0 )"; TScriptVariator(script).Execute(); } -*/ + Y_UNIT_TEST(SwitchAccessorCompactionVariants) { TString script = R"( STOP_COMPACTION @@ -765,7 +844,6 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { )"; TScriptVariator(script).Execute(); } - } } // namespace NKikimr::NKqp diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index d63b0e157d4..8fd5bd10f6a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -388,9 +388,25 @@ message TOlapColumnDescription { optional string ColumnFamilyName = 15; } +message TIndexDataExtractor { + optional string ClassName = 1; + message TDefault { + } + + message TSubColumn { + optional string SubColumnName = 1; + } + + oneof Implementation { + TDefault Default = 20; + TSubColumn SubColumn = 21; + } +} + message TRequestedBloomFilter { optional double FalsePositiveProbability = 1 [default = 0.1]; repeated string ColumnNames = 3; + optional TIndexDataExtractor DataExtractor = 4; } message TRequestedBloomNGrammFilter { @@ -399,6 +415,7 @@ message TRequestedBloomNGrammFilter { optional uint32 HashesCount = 3; optional string ColumnName = 4; optional uint32 RecordsCount = 5; + optional TIndexDataExtractor DataExtractor = 6; } message TRequestedMaxIndex { @@ -428,6 +445,7 @@ message TBloomFilter { optional double FalsePositiveProbability = 1 [default = 0.1]; optional uint64 MaxBytesCount = 2 [default = 8196]; repeated uint32 ColumnIds = 3; + optional TIndexDataExtractor DataExtractor = 4; } message TBloomNGrammFilter { @@ -436,6 +454,7 @@ message TBloomNGrammFilter { optional uint32 HashesCount = 3; optional uint32 ColumnId = 4; optional uint32 RecordsCount = 5; + optional TIndexDataExtractor DataExtractor = 6; } message TMaxIndex { @@ -1204,7 +1223,7 @@ message TBackupTask { // currently available only for s3: optional TCompressionOptions Compression = 13; - optional bool EnableChecksums = 16; + optional bool EnableChecksums = 16; optional bool EnablePermissions = 18; } diff --git a/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h index 2ab8c1f4180..47303000b82 100644 --- a/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h +++ b/ydb/core/tx/columnshard/bg_tasks/abstract/status_channel.h @@ -78,7 +78,7 @@ public: using TBase::TBase; bool DeserializeFromString(const TString& data) { if (!TBase::DeserializeFromString(data)) { - Initialize(TFakeStatusChannel::GetClassNameStatic()); + AFL_VERIFY(Initialize(TFakeStatusChannel::GetClassNameStatic())); return false; } return true; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp new file mode 100644 index 00000000000..f1f760ca89a --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.cpp @@ -0,0 +1,38 @@ +#include "common.h" + +#include <ydb/library/actors/core/log.h> + +#include <contrib/libs/xxhash/xxhash.h> +#include <util/string/builder.h> + +namespace NKikimr::NOlap::NIndexes::NRequest { + +TString TNodeId::ToString() const { + return TStringBuilder() << "[" << ColumnId << "." << GenerationId << "." << NodeType << "]"; +} + +TNodeId TNodeId::Original(const ui32 columnId, const TString& subColumnName) { + AFL_VERIFY(columnId); + TNodeId result(columnId, Counter.Inc(), ENodeType::OriginalColumn); + result.SubColumnName = subColumnName; + return result; +} + +TOriginalDataAddress TNodeId::BuildOriginalDataAddress() const { + AFL_VERIFY(NodeType == ENodeType::OriginalColumn); + return TOriginalDataAddress(ColumnId, SubColumnName); +} + +TString TOriginalDataAddress::DebugString() const { + if (SubColumnName) { + return TStringBuilder() << "{cId=" << ColumnId << ";sub=" << SubColumnName << "}"; + } else { + return ::ToString(ColumnId); + } +} + +ui64 TOriginalDataAddress::CalcSubColumnHash(const std::string_view sv) { + return XXH3_64bits(sv.data(), sv.size()); +} + +} // namespace NKikimr::NOlap::NIndexes::NRequest diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h new file mode 100644 index 00000000000..78e6e457ef8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h @@ -0,0 +1,105 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> + +#include <util/digest/fnv.h> +#include <util/digest/numeric.h> +#include <util/generic/refcount.h> +#include <util/generic/string.h> + +namespace NKikimr::NOlap::NIndexes::NRequest { + +enum class ENodeType : ui32 { + Aggregation, + OriginalColumn, + SubColumn, + Root, + Operation, + Constant +}; + +class TOriginalDataAddress { +private: + YDB_READONLY(ui32, ColumnId, 0); + YDB_READONLY_DEF(TString, SubColumnName); + +public: + static ui64 CalcSubColumnHash(const std::string_view sv); + + static ui64 CalcSubColumnHash(const TString& path) { + return CalcSubColumnHash(std::string_view(path.data(), path.size())); + } + + explicit TOriginalDataAddress(const ui32 columnId, const TString& subColumnName = "") + : ColumnId(columnId) + , SubColumnName(subColumnName) { + } + + bool operator==(const TOriginalDataAddress& item) const { + return std::tie(ColumnId, SubColumnName) == std::tie(item.ColumnId, item.SubColumnName); + } + + explicit operator size_t() const { + if (SubColumnName) { + return CombineHashes<ui64>(ColumnId, FnvHash<ui64>(SubColumnName.data(), SubColumnName.size())); + } else { + return ColumnId; + } + } + + TString DebugString() const; +}; + +class TNodeId { +private: + YDB_READONLY(ui32, ColumnId, 0); + YDB_READONLY_DEF(TString, SubColumnName); + YDB_READONLY(ui32, GenerationId, 0); + YDB_READONLY(ENodeType, NodeType, ENodeType::OriginalColumn); + + static inline TAtomicCounter Counter = 0; + + TNodeId(const ui32 columnId, const ui32 generationId, const ENodeType type) + : ColumnId(columnId) + , GenerationId(generationId) + , NodeType(type) { + } + +public: + bool operator==(const TNodeId& item) const { + return ColumnId == item.ColumnId && GenerationId == item.GenerationId && NodeType == item.NodeType && + SubColumnName == item.SubColumnName; + } + + TOriginalDataAddress BuildOriginalDataAddress() const; + + TNodeId BuildCopy() const { + return TNodeId(ColumnId, Counter.Inc(), NodeType); + } + + TString ToString() const; + + static TNodeId RootNodeId() { + return TNodeId(0, 0, ENodeType::Root); + } + + static TNodeId Constant(const ui32 columnId) { + return TNodeId(columnId, Counter.Inc(), ENodeType::Constant); + } + + static TNodeId Original(const ui32 columnId, const TString& subColumnName = ""); + + static TNodeId Aggregation() { + return TNodeId(0, Counter.Inc(), ENodeType::Aggregation); + } + + static TNodeId Operation(const ui32 columnId) { + return TNodeId(columnId, Counter.Inc(), ENodeType::Operation); + } + + bool operator<(const TNodeId& item) const { + return std::tie(ColumnId, GenerationId, NodeType, SubColumnName) < + std::tie(item.ColumnId, item.GenerationId, item.NodeType, item.SubColumnName); + } +}; + +} // namespace NKikimr::NOlap::NIndexes::NRequest diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp index 872f07414e1..a21520cc76e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.cpp @@ -85,4 +85,21 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexChecker> TBranchCoverage::GetAnd return std::make_shared<TAndIndexChecker>(Indexes); } +NJson::TJsonValue TBranchCoverage::DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + if (Equals.size()) { + auto& jsonEquals = result.InsertValue("equals", NJson::JSON_MAP); + for (auto&& i : Equals) { + jsonEquals.InsertValue(i.first.DebugString(), i.second ? i.second->ToString() : "NULL"); + } + } + if (Likes.size()) { + auto& jsonLikes = result.InsertValue("likes", NJson::JSON_MAP); + for (auto&& i : Likes) { + jsonLikes.InsertValue(i.first.DebugString(), i.second.DebugJson()); + } + } + return result; +} + } // namespace NKikimr::NOlap::NIndexes::NRequest diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h index f568f28b564..b6f37e0771e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/coverage.h @@ -1,5 +1,6 @@ #pragma once #include "checker.h" +#include "common.h" #include "like.h" #include <ydb/core/tx/program/program.h> @@ -10,21 +11,22 @@ namespace NKikimr::NOlap::NIndexes::NRequest { class TBranchCoverage { private: - THashMap<ui32, std::shared_ptr<arrow::Scalar>> Equals; - THashMap<ui32, TLikeDescription> Likes; + THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>> Equals; + THashMap<TOriginalDataAddress, TLikeDescription> Likes; YDB_ACCESSOR_DEF(std::vector<std::shared_ptr<IIndexChecker>>, Indexes); public: - TBranchCoverage(const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& equals, const THashMap<ui32, TLikeDescription>& likes) + TBranchCoverage(const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& equals, + const THashMap<TOriginalDataAddress, TLikeDescription>& likes) : Equals(equals) , Likes(likes) { } - const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& GetEquals() const { + const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& GetEquals() const { return Equals; } - const THashMap<ui32, TLikeDescription>& GetLikes() const { + const THashMap<TOriginalDataAddress, TLikeDescription>& GetLikes() const { return Likes; } @@ -34,22 +36,7 @@ public: return DebugJson().GetStringRobust(); } - NJson::TJsonValue DebugJson() const { - NJson::TJsonValue result = NJson::JSON_MAP; - if (Equals.size()) { - auto& jsonEquals = result.InsertValue("equals", NJson::JSON_MAP); - for (auto&& i : Equals) { - jsonEquals.InsertValue(::ToString(i.first), i.second ? i.second->ToString() : "NULL"); - } - } - if (Likes.size()) { - auto& jsonLikes = result.InsertValue("likes", NJson::JSON_MAP); - for (auto&& i : Likes) { - jsonLikes.InsertValue(::ToString(i.first), i.second.DebugJson()); - } - } - return result; - } + NJson::TJsonValue DebugJson() const; }; class TDataForIndexesCheckers { @@ -70,7 +57,8 @@ public: return result; } - void AddBranch(const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& equalsData, const THashMap<ui32, TLikeDescription>& likesData) { + void AddBranch(const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& equalsData, + const THashMap<TOriginalDataAddress, TLikeDescription>& likesData) { Branches.emplace_back(std::make_shared<TBranchCoverage>(equalsData, likesData)); } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp index f6625452c12..b45f93bb0f7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.cpp @@ -9,15 +9,6 @@ namespace NKikimr::NOlap::NIndexes::NRequest { -TString TNodeId::ToString() const { - return TStringBuilder() << "[" << ColumnId << "." << GenerationId << "." << NodeType << "]"; -} - -TNodeId TNodeId::Original(const ui32 columnId) { - AFL_VERIFY(columnId); - return TNodeId(columnId, Counter.Inc(), ENodeType::OriginalColumn); -} - std::shared_ptr<IRequestNode> IRequestNode::Copy() const { auto selfCopy = DoCopy(); selfCopy->Parent = nullptr; @@ -94,24 +85,24 @@ NJson::TJsonValue TPackAnd::DoSerializeToJson() const { auto& arrJson = result.InsertValue("equals", NJson::JSON_ARRAY); for (auto&& i : Equals) { auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); - jsonCondition.InsertValue(::ToString(i.first), i.second->ToString()); + jsonCondition.InsertValue(i.first.DebugString(), i.second->ToString()); } } { auto& arrJson = result.InsertValue("likes", NJson::JSON_ARRAY); for (auto&& i : Likes) { auto& jsonCondition = arrJson.AppendValue(NJson::JSON_MAP); - jsonCondition.InsertValue(::ToString(i.first), i.second.ToString()); + jsonCondition.InsertValue(i.first.DebugString(), i.second.ToString()); } } return result; } -void TPackAnd::AddEqual(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& value) { +void TPackAnd::AddEqual(const TOriginalDataAddress& originalDataAddress, const std::shared_ptr<arrow::Scalar>& value) { AFL_VERIFY(value); - auto it = Equals.find(columnId); + auto it = Equals.find(originalDataAddress); if (it == Equals.end()) { - Equals.emplace(columnId, value); + Equals.emplace(originalDataAddress, value); } else if (it->second->Equals(*value)) { return; } else { @@ -129,7 +120,7 @@ bool TOperationNode::DoCollapse() { } if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::Equals && Children.size() == 2 && Children[1]->Is<TConstantNode>() && Children[0]->Is<TOriginalColumn>()) { - Parent->Exchange(GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().GetColumnId(), + Parent->Exchange(GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().BuildOriginalDataAddress(), Children[1]->As<TConstantNode>()->GetConstant())); return true; } @@ -150,7 +141,8 @@ bool TOperationNode::DoCollapse() { } AFL_VERIFY(op); TLikePart likePart(*op, TString((const char*)scalarString->value->data(), scalarString->value->size())); - Parent->Exchange(GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().GetColumnId(), likePart)); + Parent->Exchange( + GetNodeId(), std::make_shared<TPackAnd>(Children[0]->As<TOriginalColumn>()->GetNodeId().BuildOriginalDataAddress(), likePart)); return true; } if (Operation == NYql::TKernelRequestBuilder::EBinaryOp::And) { @@ -218,6 +210,18 @@ bool TOperationNode::DoCollapse() { return false; } +bool TKernelNode::DoCollapse() { + if (KernelName == "JsonValue" && Children.size() == 2 && Children[1]->Is<TConstantNode>() && Children[0]->Is<TOriginalColumn>()) { + auto scalar = Children[1]->As<TConstantNode>()->GetConstant(); + AFL_VERIFY(scalar->type->id() == arrow::binary()->id() || scalar->type->id() == arrow::utf8()->id())("type", scalar->type->ToString()); + auto scalarString = static_pointer_cast<arrow::BinaryScalar>(scalar); + const TString jsonPath((const char*)scalarString->value->data(), scalarString->value->size()); + Parent->Exchange(GetNodeId(), std::make_shared<TOriginalColumn>(Children[0]->As<TOriginalColumn>()->GetNodeId().GetColumnId(), jsonPath)); + return true; + } + return false; +} + bool TNormalForm::Add(const NArrow::NSSA::IResourceProcessor& processor, const TProgramContainer& program) { if (processor.GetProcessorType() == NArrow::NSSA::EProcessorType::Filter) { return true; @@ -230,7 +234,8 @@ bool TNormalForm::Add(const NArrow::NSSA::IResourceProcessor& processor, const T if (it == Nodes.end()) { it = NodesGlobal.find(arg.GetColumnId()); if (it == NodesGlobal.end()) { - AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "program_arg_is_missing")("program", program.DebugString()); + AFL_CRIT(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "program_arg_is_missing")("program", program.DebugString())( + "column_id", arg.GetColumnId()); return false; } data = it->second->Copy(); @@ -259,6 +264,10 @@ bool TNormalForm::Add(const NArrow::NSSA::IResourceProcessor& processor, const T processor.GetOutputColumnIdOnce(), (NYql::TKernelRequestBuilder::EBinaryOp)*calcProcessor->GetYqlOperationId(), argNodes); Nodes.emplace(processor.GetOutputColumnIdOnce(), node); NodesGlobal.emplace(processor.GetOutputColumnIdOnce(), node); + } else if (calcProcessor->GetKernelLogic()) { + auto node = std::make_shared<TKernelNode>(processor.GetOutputColumnIdOnce(), calcProcessor->GetKernelLogic()->GetClassName(), argNodes); + Nodes.emplace(processor.GetOutputColumnIdOnce(), node); + NodesGlobal.emplace(processor.GetOutputColumnIdOnce(), node); } } else { return false; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h index 3b68300d4ba..655e7444302 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/tree.h @@ -1,4 +1,5 @@ #pragma once +#include "common.h" #include "like.h" #include <ydb/core/formats/arrow/program/abstract.h> @@ -12,62 +13,6 @@ namespace NKikimr::NOlap::NIndexes::NRequest { -enum class ENodeType : ui32 { - Aggregation, - OriginalColumn, - Root, - Operation, - Constant -}; - -class TNodeId { -private: - YDB_READONLY(ui32, ColumnId, 0); - YDB_READONLY(ui32, GenerationId, 0); - YDB_READONLY(ENodeType, NodeType, ENodeType::OriginalColumn); - - static inline TAtomicCounter Counter = 0; - - TNodeId(const ui32 columnId, const ui32 generationId, const ENodeType type) - : ColumnId(columnId) - , GenerationId(generationId) - , NodeType(type) { - } - -public: - bool operator==(const TNodeId& item) const { - return ColumnId == item.ColumnId && GenerationId == item.GenerationId && NodeType == item.NodeType; - } - - TNodeId BuildCopy() const { - return TNodeId(ColumnId, Counter.Inc(), NodeType); - } - - TString ToString() const; - - static TNodeId RootNodeId() { - return TNodeId(0, 0, ENodeType::Root); - } - - static TNodeId Constant(const ui32 columnId) { - return TNodeId(columnId, Counter.Inc(), ENodeType::Constant); - } - - static TNodeId Original(const ui32 columnId); - - static TNodeId Aggregation() { - return TNodeId(0, Counter.Inc(), ENodeType::Aggregation); - } - - static TNodeId Operation(const ui32 columnId) { - return TNodeId(columnId, Counter.Inc(), ENodeType::Operation); - } - - bool operator<(const TNodeId& item) const { - return std::tie(ColumnId, GenerationId, NodeType) < std::tie(item.ColumnId, item.GenerationId, item.NodeType); - } -}; - class IRequestNode { protected: TNodeId NodeId; @@ -211,16 +156,16 @@ protected: } public: - TOriginalColumn(const ui32 columnId) - : TBase(TNodeId::Original(columnId)) { + TOriginalColumn(const ui32 columnId, const TString& subColumnName = "") + : TBase(TNodeId::Original(columnId, subColumnName)) { } }; class TPackAnd: public IRequestNode { private: using TBase = IRequestNode; - THashMap<ui32, std::shared_ptr<arrow::Scalar>> Equals; - THashMap<ui32, TLikeDescription> Likes; + THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>> Equals; + THashMap<TOriginalDataAddress, TLikeDescription> Likes; bool IsEmptyFlag = false; protected: @@ -235,32 +180,32 @@ protected: public: TPackAnd(const TPackAnd&) = default; - TPackAnd(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& value) + TPackAnd(const TOriginalDataAddress& originalDataAddress, const std::shared_ptr<arrow::Scalar>& value) : TBase(TNodeId::Aggregation()) { - AddEqual(columnId, value); + AddEqual(originalDataAddress, value); } - TPackAnd(const ui32 columnId, const TLikePart& part) + TPackAnd(const TOriginalDataAddress& originalDataAddress, const TLikePart& part) : TBase(TNodeId::Aggregation()) { - AddLike(columnId, TLikeDescription(part)); + AddLike(originalDataAddress, TLikeDescription(part)); } - const THashMap<ui32, std::shared_ptr<arrow::Scalar>>& GetEquals() const { + const THashMap<TOriginalDataAddress, std::shared_ptr<arrow::Scalar>>& GetEquals() const { return Equals; } - const THashMap<ui32, TLikeDescription>& GetLikes() const { + const THashMap<TOriginalDataAddress, TLikeDescription>& GetLikes() const { return Likes; } bool IsEmpty() const { return IsEmptyFlag; } - void AddEqual(const ui32 columnId, const std::shared_ptr<arrow::Scalar>& value); - void AddLike(const ui32 columnId, const TLikeDescription& value) { - auto it = Likes.find(columnId); + void AddEqual(const TOriginalDataAddress& originalDataAddress, const std::shared_ptr<arrow::Scalar>& value); + void AddLike(const TOriginalDataAddress& originalDataAddress, const TLikeDescription& value) { + auto it = Likes.find(originalDataAddress); if (it == Likes.end()) { - Likes.emplace(columnId, value); + Likes.emplace(originalDataAddress, value); } else { it->second.Merge(value); } @@ -309,6 +254,39 @@ public: } }; +class TKernelNode: public IRequestNode { +private: + using TBase = IRequestNode; + const TString KernelName; + +protected: + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("type", "operation"); + result.InsertValue("kernel_name", KernelName); + return result; + } + + virtual bool DoCollapse() override; + virtual std::shared_ptr<IRequestNode> DoCopy() const override { + std::vector<std::shared_ptr<IRequestNode>> children; + return std::make_shared<TKernelNode>(GetNodeId().GetColumnId(), KernelName, children); + } + +public: + const TString GetKernelName() const { + return KernelName; + } + + TKernelNode(const ui32 columnId, const TString kernelName, const std::vector<std::shared_ptr<IRequestNode>>& args) + : TBase(TNodeId::Operation(columnId)) + , KernelName(kernelName) { + for (auto&& i : args) { + Attach(i); + } + } +}; + class TNormalForm { private: std::map<ui32, std::shared_ptr<IRequestNode>> Nodes; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp index 3e356f50804..fc8c384cad6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ut/ut_program.cpp @@ -47,7 +47,8 @@ Y_UNIT_TEST_SUITE(TestProgramBloomCoverage) { AFL_VERIFY(coverage); AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("coverage", coverage->DebugString()); AFL_VERIFY(coverage->GetBranches().size() == 1)("coverage", coverage->DebugString()); - AFL_VERIFY(coverage->GetBranches().front()->DebugString() == R"({"likes":{"7":{"sequences":["%amet."]}}})")("coverage", coverage->DebugString()); + AFL_VERIFY(coverage->GetBranches().front()->DebugString() == R"({"likes":{"7":{"sequences":["%amet."]}}})")( + "coverage", coverage->DebugString()); } } @@ -164,4 +165,32 @@ Y_UNIT_TEST_SUITE(TestProgramBloomCoverage) { AFL_VERIFY(coverage->GetBranches()[0]->DebugString() = R"({"likes":{"9":{"sequences":["%like_string"]},"7":{"sequences":["%like_string"]}},"equals":{"9":"equals_string","7":"equals_string"}})"); } } + + Y_UNIT_TEST(JsonSubColumnUsage) { + TIndexInfo indexInfo = BuildTableInfo(testColumns, testKey); + NReader::NCommon::TIndexColumnResolver columnResolver(indexInfo); + + TProgramProtoBuilder builder; + const auto idPathString1 = builder.AddConstant("json.path1"); + const auto idPathString2 = builder.AddConstant("json.path2"); + const auto idColumn = columnResolver.GetColumnIdVerified("json_binary"); + const auto idJsonValue1 = builder.AddOperation("JsonValue", { idColumn, idPathString1 }); + const auto idJsonValue2 = builder.AddOperation("JsonValue", { idColumn, idPathString2 }); + + const auto idLikeString = builder.AddConstant("like_string"); + const auto idEqualString = builder.AddConstant("equals_string"); + const auto idEndsWith1 = builder.AddOperation(NYql::TKernelRequestBuilder::EBinaryOp::EndsWith, { idJsonValue1, idLikeString }); + const auto idEquals1 = builder.AddOperation(NYql::TKernelRequestBuilder::EBinaryOp::Equals, { idJsonValue2, idEqualString }); + const auto idFilter1 = builder.AddOperation(NYql::TKernelRequestBuilder::EBinaryOp::And, { idEndsWith1, idEquals1 }); + builder.AddFilter(idFilter1); + { + TProgramContainer program; + program.Init(columnResolver, builder.FinishProto()).Validate(); + auto coverage = NOlap::NIndexes::NRequest::TDataForIndexesCheckers::Build(program); + AFL_VERIFY(coverage); + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("coverage", coverage->DebugString()); + AFL_VERIFY(coverage->GetBranches().size() == 1); + AFL_VERIFY(coverage->GetBranches().front()->DebugString() == R"({"likes":{"{cId=6;sub=json.path1}":{"sequences":["%like_string"]}},"equals":{"{cId=6;sub=json.path2}":"equals_string"}})"); + } + } } diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make index 935fdb80b44..d764849e301 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/ya.make @@ -13,6 +13,7 @@ SRCS( tree.cpp coverage.cpp like.cpp + common.cpp ) PEERDIR( @@ -22,7 +23,7 @@ PEERDIR( ydb/core/formats/arrow/program ) -GENERATE_ENUM_SERIALIZATION(tree.h) +GENERATE_ENUM_SERIALIZATION(common.h) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp index 1afa71281e5..93cb8029a43 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.cpp @@ -23,6 +23,7 @@ bool TBloomFilterChecker::DoCheckImpl(const std::vector<TString>& blobs) const { break; } } +// Cerr << bits.DebugString() << Endl; if (found) { // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("size", bArray.length())("data", bArray.ToString())("index_id", GetIndexId()); return true; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h index 8192a2fb8cf..58b55d012ad 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/checker.h @@ -43,6 +43,9 @@ public: ui32 count1 = 0; ui32 count0 = 0; for (ui32 i = 0; i < GetSizeBits(); ++i) { +// if (i % 20 == 0 && i) { +// sb << i << " "; +// } if (Get(i)) { // sb << 1 << " "; ++count1; @@ -50,9 +53,6 @@ public: // sb << 0 << " "; ++count0; } -// if (i % 20 == 0) { -// sb << i << " "; -// } } sb << GetSizeBits() << "=" << count0 << "[0]+" << count1 << "[1]"; return sb; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp index fa11002fe17..83922c63e3d 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.cpp @@ -1,11 +1,13 @@ #include "constructor.h" #include "meta.h" +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h> #include <ydb/core/tx/schemeshard/olap/schema/schema.h> namespace NKikimr::NOlap::NIndexes { -std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { +std::shared_ptr<IIndexMeta> TBloomIndexConstructor::DoCreateIndexMeta( + const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const { std::set<ui32> columnIds; for (auto&& i : ColumnNames) { auto* columnInfo = currentSchema.GetColumns().GetByName(i); @@ -15,7 +17,9 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TBloomIndexConstructor::Do } AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second); } - return std::make_shared<TBloomIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds, FalsePositiveProbability); + AFL_VERIFY(columnIds.size() == 1); + return std::make_shared<TBloomIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), *columnIds.begin(), + FalsePositiveProbability, std::make_shared<TDefaultDataExtractor>()); } NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { @@ -28,13 +32,23 @@ NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromJson(const N } for (auto&& i : *columnNamesArray) { if (!i.IsString()) { - return TConclusionStatus::Fail("column_names have to be in bloom filter features as array of strings ['column_name_1', ... , 'column_name_N']"); + return TConclusionStatus::Fail( + "column_names have to be in bloom filter features as array of strings ['column_name_1', ... , 'column_name_N']"); } ColumnNames.emplace(i.GetString()); } + if (ColumnNames.size() != 1) { + return TConclusionStatus::Fail("column_names count possible only 1 temporary"); + } if (!jsonInfo["false_positive_probability"].IsDouble()) { return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field"); } + { + auto conclusion = DataExtractor.DeserializeFromJson(jsonInfo["data_extractor"]); + if (conclusion.IsFail()) { + return conclusion; + } + } FalsePositiveProbability = jsonInfo["false_positive_probability"].GetDouble(); if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) { return TConclusionStatus::Fail("false_positive_probability have to be in bloom filter features as double field in interval [0.01, 1)"); @@ -50,6 +64,9 @@ NKikimr::TConclusionStatus TBloomIndexConstructor::DoDeserializeFromProto(const } auto& bFilter = proto.GetBloomFilter(); FalsePositiveProbability = bFilter.GetFalsePositiveProbability(); + if (!DataExtractor.DeserializeFromProto(bFilter.GetDataExtractor())) { + return TConclusionStatus::Fail("cannot parse data extractor: " + bFilter.GetDataExtractor().DebugString()); + } if (FalsePositiveProbability < 0.01 || FalsePositiveProbability >= 1) { const TString errorMessage = "FalsePositiveProbability have to be in interval[0.01, 1)"; AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", errorMessage); @@ -67,6 +84,7 @@ void TBloomIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexReque for (auto&& i : ColumnNames) { filterProto->AddColumnNames(i); } + *filterProto->MutableDataExtractor() = DataExtractor.SerializeToProto(); } -} // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h index 1801027416b..cb202765fdc 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/constructor.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h> +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h> namespace NKikimr::NOlap::NIndexes { class TBloomIndexConstructor: public IIndexMetaConstructor { @@ -7,12 +8,16 @@ public: static TString GetClassNameStatic() { return "BLOOM_FILTER"; } + private: std::set<TString> ColumnNames; double FalsePositiveProbability = 0.1; + TReadDataExtractorContainer DataExtractor; static inline auto Registrator = TFactory::TRegistrator<TBloomIndexConstructor>(GetClassNameStatic()); + protected: - virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName, const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; + virtual std::shared_ptr<IIndexMeta> DoCreateIndexMeta(const ui32 indexId, const TString& indexName, + const NSchemeShard::TOlapSchema& currentSchema, NSchemeShard::IErrorCollector& errors) const override; virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; @@ -27,4 +32,4 @@ public: } }; -} // namespace NKikimr::NOlap::NIndexes
\ No newline at end of file +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp index d9e61c5cf81..df3f5610ea0 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp @@ -1,56 +1,87 @@ -#include "meta.h" #include "checker.h" -#include <ydb/library/formats/arrow/hash/xx_hash.h> +#include "meta.h" + #include <ydb/core/formats/arrow/hash/calcer.h> #include <ydb/core/tx/program/program.h> #include <ydb/core/tx/schemeshard/olap/schema/schema.h> +#include <ydb/library/formats/arrow/hash/xx_hash.h> + #include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_primitive.h> #include <library/cpp/deprecated/atomic/atomic.h> namespace NKikimr::NOlap::NIndexes { -TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const { - const ui32 bitsCount = TFixStringBitsStorage::GrowBitsCountToByte(HashesCount * recordsCount / std::log(2)); - std::vector<bool> filterBits(bitsCount, false); - for (ui32 i = 0; i < HashesCount; ++i) { - NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i); - for (reader.Start(); reader.IsCorrect(); reader.ReadNext()) { - hashCalcer.Start(); - for (auto&& i : reader) { - NArrow::NHash::TXX64::AppendField(i.GetCurrentChunk(), i.GetCurrentRecordIndex(), hashCalcer); - } - filterBits[hashCalcer.Finish() % bitsCount] = true; +TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*recordsCount*/) const { + std::deque<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> dataOwners; + ui32 indexHitsCount = 0; + for (reader.Start(); reader.IsCorrect();) { + AFL_VERIFY(reader.GetColumnsCount() == 1); + for (auto&& i : reader) { + dataOwners.emplace_back(i.GetCurrentChunk()); + indexHitsCount += GetDataExtractor()->GetIndexHitsCount(dataOwners.back()); } + reader.ReadNext(reader.begin()->GetCurrentChunk()->GetRecordsCount()); + } + const ui32 bitsCount = TFixStringBitsStorage::GrowBitsCountToByte(HashesCount * std::max<ui32>(indexHitsCount, 10) / std::log(2)); + std::vector<bool> filterBits(bitsCount, false); + + while (dataOwners.size()) { + GetDataExtractor()->VisitAll( + dataOwners.front(), + [&](const std::shared_ptr<arrow::Array>& arr, const ui64 hashBase) { + for (ui32 idx = 0; idx < arr->length(); ++idx) { + for (ui32 i = 0; i < HashesCount; ++i) { + NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i); + hashCalcer.Start(); + if (hashBase) { + hashCalcer.Update((const ui8*)&hashBase, sizeof(hashBase)); + } + NArrow::NHash::TXX64::AppendField(arr, idx, hashCalcer); + filterBits[hashCalcer.Finish() % bitsCount] = true; + } + } + }, + [&](const std::string_view data, const ui64 hashBase) { + for (ui32 i = 0; i < HashesCount; ++i) { + NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 hashCalcer(i); + hashCalcer.Start(); + if (hashBase) { + hashCalcer.Update((const ui8*)&hashBase, sizeof(hashBase)); + } + hashCalcer.Update((const ui8*)data.data(), data.size()); + filterBits[hashCalcer.Finish() % bitsCount] = true; + } + }); + dataOwners.pop_front(); } return TFixStringBitsStorage(filterBits).GetData(); } -void TBloomIndexMeta::DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const { +void TBloomIndexMeta::DoFillIndexCheckers( + const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const { for (auto&& branch : info->GetBranches()) { - std::map<ui32, std::shared_ptr<arrow::Scalar>> foundColumns; - for (auto&& cId : ColumnIds) { - auto itEqual = branch->GetEquals().find(cId); - if (itEqual == branch->GetEquals().end()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("warn", "column not found for equal")("id", cId); - break; + for (auto&& i : branch->GetEquals()) { + if (i.first.GetColumnId() != GetColumnId()) { + continue; } - foundColumns.emplace(cId, itEqual->second); - } - if (foundColumns.size() != ColumnIds.size()) { - continue; - } - std::set<ui64> hashes; - for (ui32 i = 0; i < HashesCount; ++i) { - NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 calcer(i); - calcer.Start(); - for (auto&& i : foundColumns) { + ui64 hashBase = 0; + if (!GetDataExtractor()->CheckForIndex(i.first, hashBase)) { + continue; + } + std::set<ui64> hashes; + for (ui32 hashSeed = 0; hashSeed < HashesCount; ++hashSeed) { + NArrow::NHash::NXX64::TStreamStringHashCalcer_H3 calcer(hashSeed); + calcer.Start(); + if (hashBase) { + calcer.Update((const ui8*)&hashBase, sizeof(hashBase)); + } NArrow::NHash::TXX64::AppendField(i.second, calcer); + hashes.emplace(calcer.Finish()); } - hashes.emplace(calcer.Finish()); + branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes))); } - branch->MutableIndexes().emplace_back(std::make_shared<TBloomFilterChecker>(GetIndexId(), std::move(hashes))); } } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h index cfd9bc85cf2..2ff9a5a4c34 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h @@ -21,26 +21,6 @@ private: HashesCount = -1 * std::log(FalsePositiveProbability) / std::log(2); } - static const ui64 HashesConstructorP = ((ui64)2 << 31) - 1; - static const ui64 HashesConstructorA = (ui64)2 << 16; - - template <class TActor> - void BuildHashesSet(const ui64 originalHash, const TActor& actor) const { - AFL_VERIFY(HashesCount < HashesConstructorP); - for (ui32 b = 1; b < HashesCount; ++b) { - const ui64 hash = (HashesConstructorA * originalHash + b) % HashesConstructorP; - actor(hash); - } - } - - template <class TContainer, class TActor> - void BuildHashesSet(const TContainer& originalHashes, const TActor& actor) const { - AFL_VERIFY(HashesCount < HashesConstructorP); - for (auto&& hOriginal : originalHashes) { - BuildHashesSet(hOriginal, actor); - } - } - protected: virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override { const auto* bMeta = dynamic_cast<const TBloomIndexMeta*>(&newMeta); @@ -60,7 +40,10 @@ protected: auto& bFilter = proto.GetBloomFilter(); FalsePositiveProbability = bFilter.GetFalsePositiveProbability(); for (auto&& i : bFilter.GetColumnIds()) { - ColumnIds.emplace(i); + AddColumnId(i); + } + if (!MutableDataExtractor().DeserializeFromProto(bFilter.GetDataExtractor())) { + return false; } Initialize(); return true; @@ -68,15 +51,17 @@ protected: virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { auto* filterProto = proto.MutableBloomFilter(); filterProto->SetFalsePositiveProbability(FalsePositiveProbability); - for (auto&& i : ColumnIds) { + for (auto&& i : GetColumnIds()) { filterProto->AddColumnIds(i); } + *filterProto->MutableDataExtractor() = GetDataExtractor().SerializeToProto(); } public: TBloomIndexMeta() = default; - TBloomIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, std::set<ui32>& columnIds, const double fpProbability) - : TBase(indexId, indexName, columnIds, storageId) + TBloomIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, + const double fpProbability, const TReadDataExtractorContainer& dataExtractor) + : TBase(indexId, indexName, columnId, storageId, dataExtractor) , FalsePositiveProbability(fpProbability) { Initialize(); } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp index 8929d9fd4c5..3a57c06618f 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp @@ -2,6 +2,7 @@ #include "constructor.h" #include "meta.h" +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h> #include <ydb/core/tx/schemeshard/olap/schema/schema.h> namespace NKikimr::NOlap::NIndexes::NBloomNGramm { @@ -15,7 +16,7 @@ std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta( } const ui32 columnId = columnInfo->GetId(); return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId, - HashesCount, FilterSizeBytes, NGrammSize, RecordsCount); + DataExtractor, HashesCount, FilterSizeBytes, NGrammSize, RecordsCount); } TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { @@ -29,12 +30,20 @@ TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonVal return TConclusionStatus::Fail("empty column_name in bloom ngramm filter features"); } + { + auto conclusion = DataExtractor.DeserializeFromJson(jsonInfo["data_extractor"]); + if (conclusion.IsFail()) { + return conclusion; + } + } + if (!jsonInfo["records_count"].IsUInteger()) { return TConclusionStatus::Fail("records_count have to be in bloom filter features as uint field"); } RecordsCount = jsonInfo["records_count"].GetUInteger(); if (!TConstants::CheckRecordsCount(RecordsCount)) { - return TConclusionStatus::Fail("records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString()); + return TConclusionStatus::Fail( + "records_count have to be in bloom ngramm filter in interval " + TConstants::GetRecordsCountIntervalString()); } if (!jsonInfo["ngramm_size"].IsUInteger()) { @@ -92,6 +101,9 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki if (!ColumnName) { return TConclusionStatus::Fail("empty column name"); } + if (!DataExtractor.DeserializeFromProto(bFilter.GetDataExtractor())) { + return TConclusionStatus::Fail("cannot parse data extractor from proto: " + bFilter.GetDataExtractor().DebugString()); + } return TConclusionStatus::Success(); } @@ -102,6 +114,7 @@ void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& filterProto->SetNGrammSize(NGrammSize); filterProto->SetFilterSizeBytes(FilterSizeBytes); filterProto->SetHashesCount(HashesCount); + *filterProto->MutableDataExtractor() = DataExtractor.SerializeToProto(); } } // namespace NKikimr::NOlap::NIndexes::NBloomNGramm diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h index 209b1a5de8f..1e29eb0ecd5 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/constructor.h> +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h> namespace NKikimr::NOlap::NIndexes::NBloomNGramm { class TIndexConstructor: public IIndexMetaConstructor { @@ -10,6 +11,7 @@ public: private: TString ColumnName; + TReadDataExtractorContainer DataExtractor; ui32 NGrammSize = 3; ui32 FilterSizeBytes = 512; ui32 HashesCount = 2; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp index 2c5d294cb77..124a7e5f9c5 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp @@ -38,6 +38,7 @@ private: template <ui32 HashIdx, ui32 CharsCount> class THashesCountSelector { static constexpr ui64 HashStart = (ui64)HashIdx * (ui64)2166136261; + public: template <class TActor> static void BuildHashes(const ui8* data, TActor& actor) { @@ -134,6 +135,11 @@ private: } }; +public: + TNGrammBuilder(const ui32 hashesCount) + : HashesCount(hashesCount) { + } + template <class TAction> void BuildNGramms( const char* data, const ui32 dataSize, const std::optional<NRequest::TLikePart::EOperation> op, const ui32 nGrammSize, TAction& pred) { @@ -141,11 +147,6 @@ private: (const ui8*)data, dataSize, HashesCount, nGrammSize, op, pred); } -public: - TNGrammBuilder(const ui32 hashesCount) - : HashesCount(hashesCount) { - } - template <class TFiller> void FillNGrammHashes(const ui32 nGrammSize, const std::shared_ptr<arrow::Array>& array, TFiller& fillData) { AFL_VERIFY(array->type_id() == arrow::utf8()->id())("id", array->type()->ToString()); @@ -229,8 +230,18 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 rec const auto doFillFilter = [&](auto& inserter) { for (reader.Start(); reader.IsCorrect();) { - builder.FillNGrammHashes(NGrammSize, reader.begin()->GetCurrentChunk(), inserter); - reader.ReadNext(reader.begin()->GetCurrentChunk()->length()); + AFL_VERIFY(reader.GetColumnsCount() == 1); + for (auto&& r : reader) { + GetDataExtractor()->VisitAll( + r.GetCurrentChunk(), + [&](const std::shared_ptr<arrow::Array>& arr, const ui32 /*hashBase*/) { + builder.FillNGrammHashes(NGrammSize, arr, inserter); + }, + [&](const std::string_view data, const ui32 /*hashBase*/) { + builder.BuildNGramms(data.data(), data.size(), {}, NGrammSize, inserter); + }); + } + reader.ReadNext(reader.begin()->GetCurrentChunk()->GetRecordsCount()); } }; @@ -247,30 +258,24 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 rec void TIndexMeta::DoFillIndexCheckers( const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& /*schema*/) const { for (auto&& branch : info->GetBranches()) { - std::map<ui32, NRequest::TLikeDescription> foundColumns; - for (auto&& cId : ColumnIds) { - auto it = branch->GetLikes().find(cId); - if (it == branch->GetLikes().end()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("warn", "not found like for column")("id", cId); - break; + for (auto&& i : branch->GetLikes()) { + if (i.first.GetColumnId() != GetColumnId()) { + continue; } - foundColumns.emplace(cId, it->second); - } - if (foundColumns.size() != ColumnIds.size()) { - continue; - } - - std::set<ui64> hashes; - const auto predSet = [&](const ui64 hashSecondary) { - hashes.emplace(hashSecondary); - }; - TNGrammBuilder builder(HashesCount); - for (auto&& c : foundColumns) { - for (auto&& ls : c.second.GetLikeSequences()) { + ui64 hashBase; + if (!GetDataExtractor()->CheckForIndex(i.first, hashBase)) { + continue; + } + std::set<ui64> hashes; + const auto predSet = [&](const ui64 hashSecondary) { + hashes.emplace(hashSecondary); + }; + TNGrammBuilder builder(HashesCount); + for (auto&& ls : i.second.GetLikeSequences()) { builder.FillNGrammHashes(NGrammSize, ls.second.GetOperation(), ls.second.GetValue(), predSet); } + branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes))); } - branch->MutableIndexes().emplace_back(std::make_shared<TFilterChecker>(GetIndexId(), std::move(hashes))); } } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h index 1e9135e8c9d..9291219402c 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h @@ -54,6 +54,9 @@ protected: return false; } } + if (!MutableDataExtractor().DeserializeFromProto(bFilter.GetDataExtractor())) { + return false; + } HashesCount = bFilter.GetHashesCount(); if (!TConstants::CheckHashesCount(HashesCount)) { return false; @@ -69,7 +72,7 @@ protected: if (!bFilter.HasColumnId() || !bFilter.GetColumnId()) { return false; } - ColumnIds.emplace(bFilter.GetColumnId()); + AddColumnId(bFilter.GetColumnId()); Initialize(); return true; } @@ -79,19 +82,19 @@ protected: AFL_VERIFY(TConstants::CheckFilterSizeBytes(FilterSizeBytes)); AFL_VERIFY(TConstants::CheckHashesCount(HashesCount)); AFL_VERIFY(TConstants::CheckRecordsCount(RecordsCount)); - AFL_VERIFY(ColumnIds.size() == 1); filterProto->SetRecordsCount(RecordsCount); filterProto->SetNGrammSize(NGrammSize); filterProto->SetFilterSizeBytes(FilterSizeBytes); filterProto->SetHashesCount(HashesCount); - filterProto->SetColumnId(*ColumnIds.begin()); + filterProto->SetColumnId(GetColumnId()); + *filterProto->MutableDataExtractor() = GetDataExtractor().SerializeToProto(); } public: TIndexMeta() = default; - TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const ui32 hashesCount, + TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const TReadDataExtractorContainer& dataExtractor, const ui32 hashesCount, const ui32 filterSizeBytes, const ui32 nGrammSize, const ui32 recordsCount) - : TBase(indexId, indexName, { columnId }, storageId) + : TBase(indexId, indexName, columnId, storageId, dataExtractor) , NGrammSize(nGrammSize) , FilterSizeBytes(filterSizeBytes) , RecordsCount(recordsCount) diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp index 362ee4a098b..118ea0537fb 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/constructor.cpp @@ -20,7 +20,8 @@ std::shared_ptr<NKikimr::NOlap::NIndexes::IIndexMeta> TCountMinSketchConstructor } AFL_VERIFY(columnIds.emplace(columnInfo->GetId()).second); } - return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnIds); + AFL_VERIFY(columnIds.size() == 1); + return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), *columnIds.begin()); } NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { @@ -37,6 +38,10 @@ NKikimr::TConclusionStatus TCountMinSketchConstructor::DoDeserializeFromJson(con } ColumnNames.emplace(i.GetString()); } + if (ColumnNames.size() > 1) { + return TConclusionStatus::Fail( + "column_names elements count have to be equal to 1 temporary"); + } return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp index c48b9996964..6cb012b960f 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.cpp @@ -16,30 +16,31 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*r for (auto& colReader : reader) { for (colReader.Start(); colReader.IsCorrect(); colReader.ReadNextChunk()) { - auto array = colReader.GetCurrentChunk(); - - NArrow::SwitchType(array->type_id(), [&](const auto& type) { - using TWrap = std::decay_t<decltype(type)>; - using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; - - const TArray& arrTyped = static_cast<const TArray&>(*array); - if constexpr (arrow::has_c_type<typename TWrap::T>()) { - for (int64_t i = 0; i < arrTyped.length(); ++i) { - auto cell = TCell::Make(arrTyped.Value(i)); - sketch->Count(cell.Data(), cell.Size()); + auto cArray = colReader.GetCurrentChunk()->GetChunkedArray(); + for (auto&& array : cArray->chunks()) { + NArrow::SwitchType(array->type_id(), [&](const auto& type) { + using TWrap = std::decay_t<decltype(type)>; + using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; + + const TArray& arrTyped = static_cast<const TArray&>(*array); + if constexpr (arrow::has_c_type<typename TWrap::T>()) { + for (int64_t i = 0; i < arrTyped.length(); ++i) { + auto cell = TCell::Make(arrTyped.Value(i)); + sketch->Count(cell.Data(), cell.Size()); + } + return true; } - return true; - } - if constexpr (arrow::has_string_view<typename TWrap::T>()) { - for (int64_t i = 0; i < arrTyped.length(); ++i) { - auto view = arrTyped.GetView(i); - sketch->Count(view.data(), view.size()); + if constexpr (arrow::has_string_view<typename TWrap::T>()) { + for (int64_t i = 0; i < arrTyped.length(); ++i) { + auto view = arrTyped.GetView(i); + sketch->Count(view.data(), view.size()); + } + return true; } - return true; - } - AFL_VERIFY(false)("message", "Unsupported arrow type for building an index"); - return false; - }); + AFL_VERIFY(false)("message", "Unsupported arrow type for building an index"); + return false; + }); + } } } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h index cb7abe56c61..78ab255c4f6 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/count_min_sketch/meta.h @@ -1,4 +1,5 @@ #pragma once +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h> #include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h> namespace NKikimr::NOlap::NIndexes::NCountMinSketch { @@ -18,12 +19,14 @@ protected: virtual TConclusionStatus DoCheckModificationCompatibility(const IIndexMeta& newMeta) const override { const auto* bMeta = dynamic_cast<const TIndexMeta*>(&newMeta); if (!bMeta) { - return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + return TConclusionStatus::Fail( + "cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); } return TBase::CheckSameColumnsForModification(newMeta); } - virtual void DoFillIndexCheckers(const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override; + virtual void DoFillIndexCheckers( + const std::shared_ptr<NRequest::TDataForIndexesCheckers>& info, const NSchemeShard::TOlapSchema& schema) const override; virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const override; @@ -32,32 +35,27 @@ protected: AFL_VERIFY(proto.HasCountMinSketch()); auto& sketch = proto.GetCountMinSketch(); for (auto&& i : sketch.GetColumnIds()) { - ColumnIds.emplace(i); + AddColumnId(i); } return true; } virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { auto* sketchProto = proto.MutableCountMinSketch(); - for (auto&& i : ColumnIds) { + for (auto&& i : GetColumnIds()) { sketchProto->AddColumnIds(i); } } public: TIndexMeta() = default; - TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const std::set<ui32>& columnIds) - : TBase(indexId, indexName, columnIds, storageId) { + TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId) + : TBase(indexId, indexName, columnId, storageId, std::make_shared<TDefaultDataExtractor>()) { } virtual TString GetClassName() const override { return GetClassNameStatic(); } - - const std::set<ui32>& GetColumnIds() const { - return ColumnIds; - } - }; -} // namespace NKikimr::NOlap::NIndexes +} // namespace NKikimr::NOlap::NIndexes::NCountMinSketch diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp index e7dbcdd8bfe..7d5467a55c0 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.cpp @@ -16,7 +16,7 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 /*r { TChunkedColumnReader cReader = *reader.begin(); for (reader.Start(); cReader.IsCorrect(); cReader.ReadNextChunk()) { - auto currentScalar = cReader.GetCurrentAccessor()->GetMaxScalar(); + auto currentScalar = cReader.GetCurrentChunk()->GetMaxScalar(); AFL_VERIFY(currentScalar); if (!result || NArrow::ScalarCompare(*result, *currentScalar) == -1) { result = currentScalar; @@ -44,8 +44,7 @@ std::shared_ptr<arrow::Scalar> TIndexMeta::GetMaxScalarVerified( } NJson::TJsonValue TIndexMeta::DoSerializeDataToJson(const TString& data, const TIndexInfo& indexInfo) const { - AFL_VERIFY(ColumnIds.size() == 1); - auto scalar = GetMaxScalarVerified({ data }, indexInfo.GetColumnFeaturesVerified(*ColumnIds.begin()).GetArrowField()->type()); + auto scalar = GetMaxScalarVerified({ data }, indexInfo.GetColumnFeaturesVerified(GetColumnId()).GetArrowField()->type()); return scalar->ToString(); } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h index 2925e9fbc6c..5e024612e88 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/max/meta.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h> #include <ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h> #include <ydb/library/formats/arrow/switch/switch_type.h> @@ -34,27 +35,21 @@ protected: AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("problem", "incorrect column id"); return false; }; - ColumnIds.emplace(bFilter.GetColumnId()); + AddColumnId(bFilter.GetColumnId()); return true; } virtual NJson::TJsonValue DoSerializeDataToJson(const TString& data, const TIndexInfo& indexInfo) const override; virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override { - AFL_VERIFY(ColumnIds.size() == 1); auto* filterProto = proto.MutableMaxIndex(); - filterProto->SetColumnId(*ColumnIds.begin()); + filterProto->SetColumnId(GetColumnId()); } public: TIndexMeta() = default; TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32& columnId) - : TBase(indexId, indexName, { columnId }, storageId) { - } - - ui32 GetColumnId() const { - AFL_VERIFY(ColumnIds.size() == 1); - return *ColumnIds.begin(); + : TBase(indexId, indexName, columnId, storageId, std::make_shared<TDefaultDataExtractor>()) { } static bool IsAvailableType(const NScheme::TTypeInfo type) { diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp new file mode 100644 index 00000000000..e7cb68807a8 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.cpp @@ -0,0 +1,36 @@ +#include "abstract.h" +#include "default.h" + +namespace NKikimr::NOlap::NIndexes { + +TConclusionStatus TReadDataExtractorContainer::DeserializeFromJson(const NJson::TJsonValue& jsonValue) { + if (!jsonValue.IsDefined() || jsonValue.IsNull()) { + if (!Initialize(TDefaultDataExtractor::GetClassNameStatic())) { + return TConclusionStatus::Fail( + "cannot build default data extractor ('" + TDefaultDataExtractor::GetClassNameStatic() + "')"); + } + return TConclusionStatus::Success(); + } + const TString className = jsonValue["class_name"].GetStringRobust(); + auto dataExtractor = IReadDataExtractor::TFactory::MakeHolder(className); + if (!dataExtractor) { + return TConclusionStatus::Fail("cannot build data extractor (unexpected class name: '" + className + "')"); + } + auto parseConclusion = dataExtractor->DeserializeFromJson(jsonValue); + if (parseConclusion.IsFail()) { + return parseConclusion; + } + Object = std::shared_ptr<IReadDataExtractor>(dataExtractor.Release()); + return TConclusionStatus::Success(); +} + +bool TReadDataExtractorContainer::DeserializeFromProto(const IReadDataExtractor::TProto& data) { + if (!data.GetClassName()) { + AFL_VERIFY(Initialize(TDefaultDataExtractor::GetClassNameStatic())); + return true; + } else { + return TBase::DeserializeFromProto(data); + } +} + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h new file mode 100644 index 00000000000..a2518363c8b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h @@ -0,0 +1,73 @@ +#pragma once +#include <ydb/core/formats/arrow/accessor/abstract/accessor.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/common.h> + +#include <ydb/services/bg_tasks/abstract/interface.h> + +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NOlap::NIndexes { + +class IReadDataExtractor { +public: + using TRecordVisitor = const std::function<void(const std::string_view value, const ui64 hashBase)>; + using TChunkVisitor = const std::function<void(const std::shared_ptr<arrow::Array>&, const ui64 hashBase)>; + using TProto = NKikimrSchemeOp::TIndexDataExtractor; + using TFactory = NObjectFactory::TObjectFactory<IReadDataExtractor, TString>; + +private: + virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, + const TRecordVisitor& recordVisitor) const = 0; + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; + + virtual void DoSerializeToProto(TProto& proto) const = 0; + virtual bool DoDeserializeFromProto(const TProto& proto) = 0; + virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64& baseHash) const = 0; + virtual ui32 DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const = 0; + +public: + virtual TString GetClassName() const = 0; + virtual ~IReadDataExtractor() = default; + + ui32 GetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const { + return DoGetIndexHitsCount(dataArray); + } + + bool CheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64& baseHash) const { + baseHash = 0; + return DoCheckForIndex(dataSource, baseHash); + } + + virtual void SerializeToProto(TProto& proto) const { + return DoSerializeToProto(proto); + } + + virtual bool DeserializeFromProto(const TProto& proto) { + return DoDeserializeFromProto(proto); + } + + void VisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, + const TRecordVisitor& recordVisitor) const { + AFL_VERIFY(dataArray->IsDataOwner())("type", dataArray->GetType()); + DoVisitAll(dataArray, chunkVisitor, recordVisitor); + } + + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + return DoDeserializeFromJson(jsonInfo); + } +}; + +class TReadDataExtractorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IReadDataExtractor> { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer<IReadDataExtractor>; + +public: + using TBase::TBase; + TReadDataExtractorContainer() = default; + + bool DeserializeFromProto(const IReadDataExtractor::TProto& data); + TConclusionStatus DeserializeFromJson(const NJson::TJsonValue& jsonValue); +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp new file mode 100644 index 00000000000..a78cddce1b6 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp @@ -0,0 +1,61 @@ +#include "default.h" + +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> + +#include <util/digest/fnv.h> + +namespace NKikimr::NOlap::NIndexes { + +void TDefaultDataExtractor::VisitSimple( + const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const ui64 hashBase, const TChunkVisitor& visitor) const { + auto chunkedArray = dataArray->GetChunkedArray(); + for (auto&& i : chunkedArray->chunks()) { + visitor(i, hashBase); + } +} + +void TDefaultDataExtractor::DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, + const TRecordVisitor& recordVisitor) const { + if (dataArray->GetType() != NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray) { + VisitSimple(dataArray, 0, chunkVisitor); + return; + } + const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray); + for (ui32 idx = 0; idx < subColumns->GetColumnsData().GetRecords()->GetColumnsCount(); ++idx) { + const std::string_view svColName = subColumns->GetColumnsData().GetStats().GetColumnName(idx); + const ui64 hashBase = NRequest::TOriginalDataAddress::CalcSubColumnHash(svColName); + VisitSimple(subColumns->GetColumnsData().GetRecords()->GetColumnVerified(idx), hashBase, chunkVisitor); + } + std::vector<ui64> hashByColumnIdx; + for (ui32 idx = 0; idx < subColumns->GetOthersData().GetStats().GetColumnsCount(); ++idx) { + const std::string_view svColName = subColumns->GetOthersData().GetStats().GetColumnName(idx); + hashByColumnIdx.emplace_back(NRequest::TOriginalDataAddress::CalcSubColumnHash(svColName)); + } + auto iterator = subColumns->GetOthersData().BuildIterator(); + for (; iterator.IsValid(); iterator.Next()) { + recordVisitor(iterator.GetValue(), hashByColumnIdx[iterator.GetKeyIndex()]); + } +} + +bool TDefaultDataExtractor::DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& hashBase) const { + if (request.GetSubColumnName()) { + AFL_VERIFY(request.GetSubColumnName().StartsWith("$.")); + std::string_view sv(request.GetSubColumnName().data() + 2, request.GetSubColumnName().size() - 2); + if (sv.starts_with("\"") && sv.ends_with("\"")) { + sv = std::string_view(sv.data() + 1, sv.size() - 2); + } + hashBase = NRequest::TOriginalDataAddress::CalcSubColumnHash(sv); + } + return true; +} + +ui32 TDefaultDataExtractor::DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const { + if (dataArray->GetType() != NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray) { + return dataArray->GetRecordsCount(); + } else { + const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray); + return subColumns->GetColumnsData().GetStats().GetFilledValuesCount() + subColumns->GetOthersData().GetStats().GetFilledValuesCount(); + } +} + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h new file mode 100644 index 00000000000..0a5e7ec061b --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h @@ -0,0 +1,39 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NIndexes { + +class TDefaultDataExtractor: public IReadDataExtractor { +public: + static TString GetClassNameStatic() { + return "DEFAULT"; + } + +private: + static const inline auto Registrator = TFactory::TRegistrator<TDefaultDataExtractor>(GetClassNameStatic()); + virtual void DoSerializeToProto(TProto& /*proto*/) const override { + return; + } + virtual bool DoDeserializeFromProto(const TProto& /*proto*/) override { + return true; + } + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& /*jsonInfo*/) override { + return TConclusionStatus::Success(); + } + + void VisitSimple( + const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const ui64 hashBase, const TChunkVisitor& visitor) const; + + virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, + const TRecordVisitor& recordVisitor) const override; + + virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& hashBase) const override; + virtual ui32 DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const override; + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp new file mode 100644 index 00000000000..e3f297de2a0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.cpp @@ -0,0 +1,38 @@ +#include "sub_column.h" +#include <ydb/core/formats/arrow/accessor/sub_columns/accessor.h> + +namespace NKikimr::NOlap::NIndexes { + +void TSubColumnDataExtractor::DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, + const TRecordVisitor& recordVisitor) const { + AFL_VERIFY(dataArray->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray); + const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray); + if (auto idxColumn = subColumns->GetColumnsData().GetStats().GetKeyIndexOptional(SubColumnName)) { + auto chunkedArray = subColumns->GetColumnsData().GetRecords()->GetColumnVerified(*idxColumn)->GetChunkedArray(); + for (auto&& i : chunkedArray->chunks()) { + chunkVisitor(i, 0); + } + } else if (auto idxColumn = subColumns->GetOthersData().GetStats().GetKeyIndexOptional(SubColumnName)) { + auto iterator = subColumns->GetOthersData().BuildIterator(); + for (; iterator.IsValid(); iterator.Next()) { + if (iterator.GetKeyIndex() != *idxColumn) { + continue; + } + recordVisitor(iterator.GetValue(), 0); + } + } +} + +ui32 TSubColumnDataExtractor::DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const { + AFL_VERIFY(dataArray->GetType() == NArrow::NAccessor::IChunkedArray::EType::SubColumnsArray); + const auto subColumns = std::static_pointer_cast<NArrow::NAccessor::TSubColumnsArray>(dataArray); + if (auto idxColumn = subColumns->GetColumnsData().GetStats().GetKeyIndexOptional(SubColumnName)) { + return subColumns->GetColumnsData().GetStats().GetColumnRecordsCount(*idxColumn); + } else if (auto idxColumn = subColumns->GetOthersData().GetStats().GetKeyIndexOptional(SubColumnName)) { + return subColumns->GetOthersData().GetStats().GetColumnRecordsCount(*idxColumn); + } else { + return 0; + } +} + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h new file mode 100644 index 00000000000..d352061756e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h @@ -0,0 +1,60 @@ +#pragma once +#include "abstract.h" + +namespace NKikimr::NOlap::NIndexes { + +class TSubColumnDataExtractor: public IReadDataExtractor { +public: + static TString GetClassNameStatic() { + return "SUB_COLUMN"; + } + +private: + static const inline auto Registrator = TFactory::TRegistrator<TSubColumnDataExtractor>(GetClassNameStatic()); + + TString SubColumnName; + + virtual void DoSerializeToProto(TProto& proto) const override { + AFL_VERIFY(!!SubColumnName); + proto.MutableSubColumn()->SetSubColumnName(SubColumnName); + } + virtual bool DoDeserializeFromProto(const TProto& proto) override { + if (!proto.HasSubColumn()) { + return false; + } + SubColumnName = proto.GetSubColumn().GetSubColumnName(); + if (!SubColumnName) { + return false; + } + return true; + } + virtual TConclusionStatus DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override { + if (!jsonInfo.Has("sub_column_name")) { + return TConclusionStatus::Fail("extractor description has to have 'sub_column_name' parameter"); + } + if (!jsonInfo["sub_column_name"].IsString()) { + return TConclusionStatus::Fail("extractor description parameter 'sub_column_name' has to been string"); + } + SubColumnName = jsonInfo["sub_column_name"].GetString(); + if (!SubColumnName) { + return TConclusionStatus::Fail("extractor description parameter 'sub_column_name' hasn't been empty"); + } + return TConclusionStatus::Success(); + } + + virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, + const TRecordVisitor& recordVisitor) const override; + + virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& /*hashBase*/) const override { + return request.GetSubColumnName() == SubColumnName; + } + + virtual ui32 DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const override; + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } +}; + +} // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make new file mode 100644 index 00000000000..25071ccf9e4 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/ya.make @@ -0,0 +1,18 @@ +LIBRARY() + + +SRCS( + abstract.cpp + GLOBAL sub_column.cpp + GLOBAL default.cpp +) + +PEERDIR( + ydb/core/formats/arrow + ydb/core/protos + ydb/core/formats/arrow/accessor/sub_columns +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp index 00c02406358..63c09d0f135 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.cpp @@ -1,7 +1,8 @@ #include "meta.h" -#include <ydb/core/tx/columnshard/engines/storage/chunks/data.h> -#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> + #include <ydb/core/formats/arrow/size_calcer.h> +#include <ydb/core/tx/columnshard/engines/scheme/index_info.h> +#include <ydb/core/tx/columnshard/engines/storage/chunks/data.h> namespace NKikimr::NOlap::NIndexes { @@ -12,7 +13,11 @@ std::shared_ptr<NKikimr::NOlap::IPortionDataChunk> TIndexByColumns::DoBuildIndex std::vector<TChunkedColumnReader> columnReaders; for (auto&& i : ColumnIds) { auto it = data.find(i); - AFL_VERIFY(it != data.end()); + if (it == data.end()) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "index_data_absent")("column_id", i)("index_name", GetIndexName())( + "index_id", GetIndexId()); + return nullptr; + } columnReaders.emplace_back(it->second, indexInfo.GetColumnLoaderVerified(i)); } TChunkedBatchReader reader(std::move(columnReaders)); @@ -25,17 +30,22 @@ bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDe return true; } -TIndexByColumns::TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set<ui32>& columnIds, const TString& storageId) +TIndexByColumns::TIndexByColumns( + const ui32 indexId, const TString& indexName, const ui32 columnId, const TString& storageId, const TReadDataExtractorContainer& extractor) : TBase(indexId, indexName, storageId) - , ColumnIds(columnIds) -{ + , DataExtractor(extractor) + , ColumnIds({ columnId }) { Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); } NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(const IIndexMeta& newMeta) const { const auto* bMeta = dynamic_cast<const TIndexByColumns*>(&newMeta); if (!bMeta) { - return TConclusionStatus::Fail("cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + return TConclusionStatus::Fail( + "cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); + } + if (bMeta->ColumnIds.size() != 1) { + return TConclusionStatus::Fail("one column per index is necessary"); } if (bMeta->ColumnIds.size() != ColumnIds.size()) { return TConclusionStatus::Fail("columns count is different"); diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h index f8e601c80fc..a120134df8b 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/meta.h @@ -1,7 +1,9 @@ #pragma once -#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h> +#include "extractor/abstract.h" + #include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h> #include <ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.h> +#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h> namespace NKikimr::NOlap::NIndexes { @@ -9,9 +11,17 @@ class TIndexByColumns: public IIndexMeta { private: using TBase = IIndexMeta; std::shared_ptr<NArrow::NSerialization::ISerializer> Serializer; - -protected: + TReadDataExtractorContainer DataExtractor; std::set<ui32> ColumnIds; +protected: + + const TReadDataExtractorContainer& GetDataExtractor() const { + return DataExtractor; + } + + TReadDataExtractorContainer& MutableDataExtractor() { + return DataExtractor; + } virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const = 0; @@ -22,8 +32,23 @@ protected: TConclusionStatus CheckSameColumnsForModification(const IIndexMeta& newMeta) const; public: + void AddColumnId(const ui32 columnId) { + AFL_VERIFY(ColumnIds.emplace(columnId).second); + AFL_VERIFY(ColumnIds.size() == 1); + } + + ui32 GetColumnId() const { + AFL_VERIFY(ColumnIds.size() == 1)("size", ColumnIds.size()); + return *ColumnIds.begin(); + } + + const std::set<ui32>& GetColumnIds() const { + return ColumnIds; + } + TIndexByColumns() = default; - TIndexByColumns(const ui32 indexId, const TString& indexName, const std::set<ui32>& columnIds, const TString& storageId); + TIndexByColumns(const ui32 indexId, const TString& indexName, const ui32 columnId, const TString& storageId, + const TReadDataExtractorContainer& extractor); }; } // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make b/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make index 0ce6d8f9987..403f7f797a9 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/ya.make @@ -9,6 +9,7 @@ PEERDIR( ydb/core/formats/arrow ydb/library/formats/arrow/protos ydb/core/tx/columnshard/engines/storage/chunks + ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor ydb/core/tx/columnshard/engines/scheme/indexes/abstract ydb/core/tx/columnshard/engines/portions ) diff --git a/ydb/core/tx/columnshard/splitter/chunks.h b/ydb/core/tx/columnshard/splitter/chunks.h index 315fc18604e..8c4458fd57e 100644 --- a/ydb/core/tx/columnshard/splitter/chunks.h +++ b/ydb/core/tx/columnshard/splitter/chunks.h @@ -8,7 +8,7 @@ namespace NKikimr::NOlap { -class IPortionColumnChunk : public IPortionDataChunk { +class IPortionColumnChunk: public IPortionDataChunk { private: using TBase = IPortionDataChunk; @@ -55,68 +55,57 @@ private: std::vector<std::shared_ptr<IPortionDataChunk>> Chunks; std::shared_ptr<TColumnLoader> Loader; - std::shared_ptr<NArrow::NAccessor::IChunkedArray> CurrentChunk; - std::optional<NArrow::NAccessor::IChunkedArray::TFullDataAddress> CurrentChunkArray; - ui32 CurrentChunkIndex = 0; + std::shared_ptr<NArrow::NAccessor::IChunkedArray> CurrentArray; + std::optional<NArrow::NAccessor::IChunkedArray::TFullChunkedArrayAddress> CurrentChunkArray; + ui32 CurrentArrayIndex = 0; ui32 CurrentRecordIndex = 0; + public: + TChunkedColumnReader(const std::vector<std::shared_ptr<IPortionDataChunk>>& chunks, const std::shared_ptr<TColumnLoader>& loader) : Chunks(chunks) - , Loader(loader) - { + , Loader(loader) { Start(); } void Start() { - CurrentChunkIndex = 0; + CurrentArrayIndex = 0; CurrentRecordIndex = 0; if (Chunks.size()) { - CurrentChunk = Loader->ApplyVerified(Chunks.front()->GetData(), Chunks.front()->GetRecordsCountVerified()); + CurrentArray = Loader->ApplyVerified(Chunks.front()->GetData(), Chunks.front()->GetRecordsCountVerified()); CurrentChunkArray.reset(); } } - const std::shared_ptr<arrow::Array>& GetCurrentChunk() { + const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& GetCurrentChunk() { if (!CurrentChunkArray || !CurrentChunkArray->GetAddress().Contains(CurrentRecordIndex)) { - CurrentChunkArray = CurrentChunk->GetChunk(CurrentChunkArray, CurrentRecordIndex); + CurrentChunkArray = CurrentArray->GetArray(CurrentChunkArray, CurrentRecordIndex, CurrentArray); } AFL_VERIFY(CurrentChunkArray); return CurrentChunkArray->GetArray(); } - const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& GetCurrentAccessor() const { - AFL_VERIFY(CurrentChunk); - return CurrentChunk; - } - - ui32 GetCurrentRecordIndex() { - if (!CurrentChunkArray || !CurrentChunkArray->GetAddress().Contains(CurrentRecordIndex)) { - CurrentChunkArray = CurrentChunk->GetChunk(CurrentChunkArray->GetAddress(), CurrentRecordIndex); - } - return CurrentChunkArray->GetAddress().GetLocalIndex(CurrentRecordIndex); - } - bool IsCorrect() const { - return !!CurrentChunk; + return !!CurrentArray; } bool ReadNextChunk() { - while (++CurrentChunkIndex < Chunks.size()) { - CurrentChunk = Loader->ApplyVerified(Chunks[CurrentChunkIndex]->GetData(), Chunks[CurrentChunkIndex]->GetRecordsCountVerified()); + while (++CurrentArrayIndex < Chunks.size()) { + CurrentArray = Loader->ApplyVerified(Chunks[CurrentArrayIndex]->GetData(), Chunks[CurrentArrayIndex]->GetRecordsCountVerified()); CurrentChunkArray.reset(); CurrentRecordIndex = 0; - if (CurrentRecordIndex < CurrentChunk->GetRecordsCount()) { + if (CurrentRecordIndex < CurrentArray->GetRecordsCount()) { return true; } } CurrentChunkArray.reset(); - CurrentChunk = nullptr; + CurrentArray = nullptr; return false; } bool ReadNext() { - AFL_VERIFY(!!CurrentChunk); - if (++CurrentRecordIndex < CurrentChunk->GetRecordsCount()) { + AFL_VERIFY(!!CurrentArray); + if (++CurrentRecordIndex < CurrentArray->GetRecordsCount()) { return true; } return ReadNextChunk(); @@ -127,6 +116,7 @@ class TChunkedBatchReader { private: std::vector<TChunkedColumnReader> Columns; bool IsCorrectFlag = true; + public: TChunkedBatchReader(const std::vector<TChunkedColumnReader>& columnReaders) : Columns(columnReaders) { @@ -193,4 +183,4 @@ public: } }; -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/test_helper/program_constructor.cpp b/ydb/core/tx/columnshard/test_helper/program_constructor.cpp index 267e2700fa7..9d597f408df 100644 --- a/ydb/core/tx/columnshard/test_helper/program_constructor.cpp +++ b/ydb/core/tx/columnshard/test_helper/program_constructor.cpp @@ -13,7 +13,8 @@ ui32 TProgramProtoBuilder::AddConstant(const TString& bytes) { return CurrentGenericColumnId; } -ui32 TProgramProtoBuilder::AddOperation(const NKikimrSSA::TProgram::TAssignment::EFunction op, const std::vector<ui32>& arguments) { +ui32 TProgramProtoBuilder::AddOperation( + const NKikimrSSA::TProgram::TAssignment::EFunction op, const std::vector<ui32>& arguments) { auto* command = Proto.AddCommand(); auto* functionProto = command->MutableAssign()->MutableFunction(); for (auto&& i : arguments) { @@ -43,6 +44,18 @@ ui32 TProgramProtoBuilder::AddOperation(const NYql::TKernelRequestBuilder::EBina return CurrentGenericColumnId; } +ui32 TProgramProtoBuilder::AddOperation(const TString& kernelName, const std::vector<ui32>& arguments) { + auto* command = Proto.AddCommand(); + auto* functionProto = command->MutableAssign()->MutableFunction(); + for (auto&& i : arguments) { + functionProto->AddArguments()->SetId(i); + } + functionProto->SetId(NKikimrSSA::TProgram::TAssignment::FUNC_CMP_GREATER_EQUAL); + functionProto->SetKernelName(kernelName); + command->MutableAssign()->MutableColumn()->SetId(++CurrentGenericColumnId); + return CurrentGenericColumnId; +} + void TProgramProtoBuilder::AddFilter(const ui32 colId) { auto* command = Proto.AddCommand(); command->MutableFilter()->MutablePredicate()->SetId(colId); diff --git a/ydb/core/tx/columnshard/test_helper/program_constructor.h b/ydb/core/tx/columnshard/test_helper/program_constructor.h index 47d44389cc2..735cd2e3c1e 100644 --- a/ydb/core/tx/columnshard/test_helper/program_constructor.h +++ b/ydb/core/tx/columnshard/test_helper/program_constructor.h @@ -23,6 +23,7 @@ public: ui32 AddConstant(const TString& bytes); ui32 AddOperation(const NYql::TKernelRequestBuilder::EBinaryOp op, const std::vector<ui32>& arguments); ui32 AddOperation(const NKikimrSSA::TProgram::TAssignment::EFunction op, const std::vector<ui32>& arguments); + ui32 AddOperation(const TString& kernelName, const std::vector<ui32>& arguments); ui32 AddAggregation( const NArrow::NSSA::NAggregation::EAggregate op, const std::vector<ui32>& arguments, const std::vector<ui32>& groupByKeys); void AddFilter(const ui32 colId); diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h index 6e57260ee4a..dfd03d1a5c0 100644 --- a/ydb/services/bg_tasks/abstract/interface.h +++ b/ydb/services/bg_tasks/abstract/interface.h @@ -189,7 +189,7 @@ protected: public: using TBase::TBase; - bool Initialize(const TString& className, const bool maybeExists = false) { + [[nodiscard]] bool Initialize(const TString& className, const bool maybeExists = false) { AFL_VERIFY(maybeExists || !Object)("problem", "initialize for not-empty-object"); Object.reset(TFactory::Construct(className)); if (!Object) { |