diff options
| author | ivanmorozov333 <[email protected]> | 2025-04-02 20:17:47 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-04-02 20:17:47 +0300 |
| commit | 9a05b0404bc14e137110f8f7835c7694876fdba9 (patch) | |
| tree | 54bd3401bd40b806795abdd60768f0d49a1144b7 | |
| parent | 107ab50888b62b426f44cc7678bd313065b3e7cc (diff) | |
case unsensitive index (#16642)
32 files changed, 555 insertions, 239 deletions
diff --git a/ydb/core/formats/arrow/program/abstract.h b/ydb/core/formats/arrow/program/abstract.h index 01724d31aec..9ca8a96f908 100644 --- a/ydb/core/formats/arrow/program/abstract.h +++ b/ydb/core/formats/arrow/program/abstract.h @@ -13,6 +13,44 @@ class TAccessorsCollection; namespace NKikimr::NArrow::NSSA { +class TIndexCheckOperation { +public: + enum class EOperation : ui32 { + Equals, + StartsWith, + EndsWith, + Contains + }; + +private: + const EOperation Operation; + YDB_READONLY(bool, CaseSensitive, true); + +public: + TString GetSignalId() const { + return TStringBuilder() << Operation << "::" << (CaseSensitive ? 1 : 0); + } + + TString DebugString() const { + return TStringBuilder() << "{" << Operation << "," << CaseSensitive << "}"; + } + + EOperation GetOperation() const { + return Operation; + } + + TIndexCheckOperation(const EOperation op, const bool caseSensitive) + : Operation(op) + , CaseSensitive(caseSensitive) { + } + + explicit operator size_t() const { + return (size_t)Operation; + } + + bool operator==(const TIndexCheckOperation& op) const = default; +}; + using IChunkedArray = NAccessor::IChunkedArray; using TAccessorsCollection = NAccessor::TAccessorsCollection; diff --git a/ydb/core/formats/arrow/program/assign_internal.cpp b/ydb/core/formats/arrow/program/assign_internal.cpp index a1432806e6c..e29d2693857 100644 --- a/ydb/core/formats/arrow/program/assign_internal.cpp +++ b/ydb/core/formats/arrow/program/assign_internal.cpp @@ -51,19 +51,12 @@ NJson::TJsonValue TCalculationProcessor::DoDebugJson() const { ui64 TCalculationProcessor::DoGetWeight() const { if (KernelLogic) { - return 0; + return (ui64)KernelLogic->GetWeight(); } if (!YqlOperationId) { - return 10; - } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith || - (NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) { - return 7; - } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) { - return 10; - } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::Equals) { - return 5; + return (ui64)ECalculationHardness::Unknown; } - return 0; + return (ui64)ECalculationHardness::NotSpecified; } TString TCalculationProcessor::DoGetSignalCategoryName() const { diff --git a/ydb/core/formats/arrow/program/execution.h b/ydb/core/formats/arrow/program/execution.h index 348dea253fe..4be60989c84 100644 --- a/ydb/core/formats/arrow/program/execution.h +++ b/ydb/core/formats/arrow/program/execution.h @@ -10,13 +10,6 @@ namespace NKikimr::NArrow::NSSA { -enum class EIndexCheckOperation { - Equals, - StartsWith, - EndsWith, - Contains -}; - class TProcessorContext; class IFetchLogic { @@ -153,15 +146,15 @@ public: class TFetchIndexContext { public: - using EOperation = EIndexCheckOperation; + using TOperation = TIndexCheckOperation; class TOperationsBySubColumn { private: std::optional<bool> FullColumnOperations; - THashMap<TString, THashSet<EOperation>> Data; + THashMap<TString, THashSet<TOperation>> Data; public: - const THashMap<TString, THashSet<EOperation>>& GetData() const { + const THashMap<TString, THashSet<TOperation>>& GetData() const { return Data; } @@ -170,7 +163,7 @@ public: return !*FullColumnOperations; } - TOperationsBySubColumn& Add(const TString& subColumn, const EOperation operation, const bool strict = true) { + TOperationsBySubColumn& Add(const TString& subColumn, const TOperation operation, const bool strict = true) { if (FullColumnOperations) { AFL_VERIFY(*FullColumnOperations == !subColumn); } else { @@ -196,7 +189,7 @@ public: for (auto&& i : OperationsBySubColumn.GetData()) { auto& subColumnJson = result.InsertValue(i.first, NJson::JSON_ARRAY); for (auto&& op : i.second) { - subColumnJson.AppendValue(::ToString(op)); + subColumnJson.AppendValue(op.DebugString()); } } return result; @@ -231,15 +224,19 @@ public: private: YDB_READONLY(ui32, ColumnId, 0); YDB_READONLY_DEF(TString, SubColumnName); - YDB_READONLY(EIndexCheckOperation, Operation, EIndexCheckOperation::Equals); + TIndexCheckOperation Operation; public: - TCheckIndexContext(const ui32 columnId, const TString& subColumnName, const EIndexCheckOperation operation) + TCheckIndexContext(const ui32 columnId, const TString& subColumnName, const TIndexCheckOperation& operation) : ColumnId(columnId) , SubColumnName(subColumnName) , Operation(operation) { } + const TIndexCheckOperation& GetOperation() const { + return Operation; + } + bool operator==(const TCheckIndexContext& item) const { return std::tie(ColumnId, SubColumnName, Operation) == std::tie(item.ColumnId, item.SubColumnName, item.Operation); } diff --git a/ydb/core/formats/arrow/program/graph_optimization.cpp b/ydb/core/formats/arrow/program/graph_optimization.cpp index 95033e6f4c4..f6a49e9de22 100644 --- a/ydb/core/formats/arrow/program/graph_optimization.cpp +++ b/ydb/core/formats/arrow/program/graph_optimization.cpp @@ -10,7 +10,9 @@ #include <ydb/library/arrow_kernels/operations.h> #include <ydb/library/formats/arrow/switch/switch_type.h> +#include <library/cpp/string_utils/quote/quote.h> #include <util/string/builder.h> +#include <util/string/escape.h> #include <yql/essentials/core/arrow_kernels/request/request.h> namespace NKikimr::NArrow::NSSA::NGraph::NOptimization { @@ -199,9 +201,10 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { if (!i.second->Is(EProcessorType::FetchOriginalData)) { continue; } - if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size() + - i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetIndexContext().size() + - i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() > 1) { + if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size() + + i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetIndexContext().size() + + i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() > + 1) { continue; } if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size()) { @@ -220,8 +223,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { for (auto&& i : dataAddresses) { columnIds.emplace(i->GetProcessorAs<TOriginalColumnDataProcessor>()->GetOutputColumnIdOnce()); } - auto proc = - std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin(), columnIds.end())); + auto proc = std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin(), columnIds.end())); for (auto&& i : dataAddresses) { for (auto&& addr : i->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses()) { proc->Add(addr.second); @@ -230,7 +232,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { auto nodeFetch = AddNode(proc); FetchersMerged.emplace(nodeFetch->GetIdentifier()); for (auto&& i : dataAddresses) { - for (auto&& to: i->GetOutputEdges()) { + for (auto&& to : i->GetOutputEdges()) { AddEdge(nodeFetch.get(), to.second, to.first.GetResourceId()); } RemoveNode(i->GetIdentifier()); @@ -245,8 +247,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) { for (auto&& i : headers) { columnIds.emplace(i->GetProcessorAs<TOriginalColumnDataProcessor>()->GetOutputColumnIdOnce()); } - auto proc = - std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin(), columnIds.end())); + auto proc = std::make_shared<TOriginalColumnDataProcessor>(std::vector<ui32>(columnIds.begin(), columnIds.end())); for (auto&& i : indexes) { for (auto&& addr : i->GetProcessorAs<TOriginalColumnDataProcessor>()->GetIndexContext()) { proc->Add(addr.second); @@ -361,11 +362,11 @@ TConclusion<bool> TGraph::OptimizeConditionsForIndexes(TGraphNode* condNode) { if (condNode->GetProcessor()->GetProcessorType() != EProcessorType::Calculation) { return false; } - if (condNode->GetProcessor()->GetInput().size() != 2) { + auto calc = condNode->GetProcessorAs<TCalculationProcessor>(); + if (!calc->GetKernelLogic()) { return false; } - auto calc = condNode->GetProcessorAs<TCalculationProcessor>(); - if (!calc->GetYqlOperationId()) { + if (condNode->GetProcessor()->GetInput().size() != 2) { return false; } if (condNode->GetOutputEdges().size() != 1) { @@ -376,17 +377,7 @@ TConclusion<bool> TGraph::OptimizeConditionsForIndexes(TGraphNode* condNode) { if (constNode->GetProcessor()->GetProcessorType() != EProcessorType::Const) { return false; } - if (!!calc->GetKernelLogic()) { - if (!calc->GetKernelLogic()->IsBoolInResult()) { - return false; - } - } - if (calc->GetYqlOperationId()) { - if (!IsBoolResultYqlOperator((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId())) { - return false; - } - } - if (!calc->GetYqlOperationId() && !calc->GetKernelLogic()) { + if (!calc->GetKernelLogic()->IsBoolInResult()) { return false; } std::optional<TResourceAddress> dataAddr = GetOriginalAddress(dataNode); @@ -395,63 +386,44 @@ TConclusion<bool> TGraph::OptimizeConditionsForIndexes(TGraphNode* condNode) { } auto* dest = condNode->GetOutputEdges().begin()->second; const ui32 destResourceId = condNode->GetOutputEdges().begin()->first.GetResourceId(); - if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::Equals || - (NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith || - (NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith || - (NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) { - if (!IndexesConstructed.emplace(condNode->GetIdentifier()).second) { - return false; - } - RemoveEdge(condNode, dest, destResourceId); - - const EIndexCheckOperation indexOperation = [&]() { - if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::Equals) { - return EIndexCheckOperation::Equals; - } - if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith) { - return EIndexCheckOperation::StartsWith; - } - if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) { - return EIndexCheckOperation::EndsWith; - } - if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) { - return EIndexCheckOperation::Contains; - } - return EIndexCheckOperation::Contains; - AFL_VERIFY(false); - }(); + auto indexChecker = calc->GetKernelLogic()->GetIndexCheckerOperation(); + if (!indexChecker) { + return false; + } + if (!IndexesConstructed.emplace(condNode->GetIdentifier()).second) { + return false; + } + RemoveEdge(condNode, dest, destResourceId); - const ui32 resourceIdxFetch = BuildNextResourceId(); - IDataSource::TFetchIndexContext indexContext(dataAddr->GetColumnId(), - IDataSource::TFetchIndexContext::TOperationsBySubColumn().Add(dataAddr->GetSubColumnName(), indexOperation)); - auto indexFetchProc = std::make_shared<TOriginalColumnDataProcessor>(resourceIdxFetch, indexContext); - auto indexFetchNode = AddNode(indexFetchProc); - RegisterProducer(resourceIdxFetch, indexFetchNode.get()); + const ui32 resourceIdxFetch = BuildNextResourceId(); + IDataSource::TFetchIndexContext indexContext( + dataAddr->GetColumnId(), IDataSource::TFetchIndexContext::TOperationsBySubColumn().Add(dataAddr->GetSubColumnName(), *indexChecker)); + auto indexFetchProc = std::make_shared<TOriginalColumnDataProcessor>(resourceIdxFetch, indexContext); + auto indexFetchNode = AddNode(indexFetchProc); + RegisterProducer(resourceIdxFetch, indexFetchNode.get()); - const ui32 resourceIdIndexToAnd = BuildNextResourceId(); - IDataSource::TCheckIndexContext checkIndexContext(dataAddr->GetColumnId(), dataAddr->GetSubColumnName(), indexOperation); - auto indexCheckProc = std::make_shared<TIndexCheckerProcessor>( - resourceIdxFetch, constNode->GetProcessor()->GetOutputColumnIdOnce(), checkIndexContext, resourceIdIndexToAnd); - auto indexProcNode = AddNode(indexCheckProc); - RegisterProducer(resourceIdIndexToAnd, indexProcNode.get()); - AddEdge(indexFetchNode.get(), indexProcNode.get(), resourceIdxFetch); - AddEdge(constNode, indexProcNode.get(), constNode->GetProcessor()->GetOutputColumnIdOnce()); + const ui32 resourceIdIndexToAnd = BuildNextResourceId(); + IDataSource::TCheckIndexContext checkIndexContext(dataAddr->GetColumnId(), dataAddr->GetSubColumnName(), *indexChecker); + auto indexCheckProc = std::make_shared<TIndexCheckerProcessor>( + resourceIdxFetch, constNode->GetProcessor()->GetOutputColumnIdOnce(), checkIndexContext, resourceIdIndexToAnd); + auto indexProcNode = AddNode(indexCheckProc); + RegisterProducer(resourceIdIndexToAnd, indexProcNode.get()); + AddEdge(indexFetchNode.get(), indexProcNode.get(), resourceIdxFetch); + AddEdge(constNode, indexProcNode.get(), constNode->GetProcessor()->GetOutputColumnIdOnce()); - const ui32 resourceIdEqToAnd = BuildNextResourceId(); - RegisterProducer(resourceIdEqToAnd, condNode); - calc->SetOutputResourceIdOnce(resourceIdEqToAnd); + const ui32 resourceIdEqToAnd = BuildNextResourceId(); + RegisterProducer(resourceIdEqToAnd, condNode); + calc->SetOutputResourceIdOnce(resourceIdEqToAnd); - auto andProcessor = std::make_shared<TStreamLogicProcessor>(TColumnChainInfo::BuildVector({ resourceIdEqToAnd, resourceIdIndexToAnd }), - TColumnChainInfo(destResourceId), NKernels::EOperation::And); - auto andNode = AddNode(andProcessor); - AddEdge(andNode.get(), dest, destResourceId); + auto andProcessor = std::make_shared<TStreamLogicProcessor>( + TColumnChainInfo::BuildVector({ resourceIdEqToAnd, resourceIdIndexToAnd }), TColumnChainInfo(destResourceId), NKernels::EOperation::And); + auto andNode = AddNode(andProcessor); + AddEdge(andNode.get(), dest, destResourceId); - AddEdge(indexProcNode.get(), andNode.get(), resourceIdIndexToAnd); - AddEdge(condNode, andNode.get(), resourceIdEqToAnd); - ResetProducer(destResourceId, andNode.get()); - return true; - } - return false; + AddEdge(indexProcNode.get(), andNode.get(), resourceIdIndexToAnd); + AddEdge(condNode, andNode.get(), resourceIdEqToAnd); + ResetProducer(destResourceId, andNode.get()); + return true; } bool TGraph::IsBoolResultYqlOperator(const NYql::TKernelRequestBuilder::EBinaryOp op) const { @@ -687,16 +659,16 @@ TConclusionStatus TGraph::Collapse() { } } -// { -// auto conclusion = OptimizeConditionsForHeadersCheck(n.get()); -// if (conclusion.IsFail()) { -// return conclusion; -// } -// if (*conclusion) { -// hasChanges = true; -// break; -// } -// } + // { + // auto conclusion = OptimizeConditionsForHeadersCheck(n.get()); + // if (conclusion.IsFail()) { + // return conclusion; + // } + // if (*conclusion) { + // hasChanges = true; + // break; + // } + // } { auto conclusion = OptimizeConditionsForStream(n.get()); diff --git a/ydb/core/formats/arrow/program/index.h b/ydb/core/formats/arrow/program/index.h index 3a54dd9c5b1..1ff82c80164 100644 --- a/ydb/core/formats/arrow/program/index.h +++ b/ydb/core/formats/arrow/program/index.h @@ -28,7 +28,7 @@ private: bool ApplyToFilterFlag = false; virtual TString DoGetSignalCategoryName() const override { - return ::ToString(GetProcessorType()) + "::" + ::ToString(IndexContext.GetOperation()); + return ::ToString(GetProcessorType()) + "::" + IndexContext.GetOperation().GetSignalId(); } public: diff --git a/ydb/core/formats/arrow/program/kernel_logic.cpp b/ydb/core/formats/arrow/program/kernel_logic.cpp index f20d22d9f55..8fb418f7745 100644 --- a/ydb/core/formats/arrow/program/kernel_logic.cpp +++ b/ydb/core/formats/arrow/program/kernel_logic.cpp @@ -47,37 +47,6 @@ std::shared_ptr<IChunkedArray> TGetJsonPath::ExtractArray(const std::shared_ptr< } } -std::optional<TFetchingInfo> TGetJsonPath::BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, - const std::vector<TColumnChainInfo>& input, const std::shared_ptr<TAccessorsCollection>& resources) const { - if (arrType != NAccessor::IChunkedArray::EType::SubColumnsArray) { - return TFetchingInfo::BuildFullRestore(false); - } - AFL_VERIFY(input.size() == 2 && input.front().GetColumnId() == columnId); - auto description = BuildDescription(input, resources).DetachResult(); - const std::vector<TString> subColumns = { TString(description.GetJsonPath().data(), description.GetJsonPath().size()) }; - if (!description.GetInputAccessor()) { - return TFetchingInfo::BuildSubColumnsRestore(subColumns); - } - - std::optional<bool> hasSubColumns; - return NAccessor::TCompositeChunkedArray::VisitDataOwners<TFetchingInfo>( - description.GetInputAccessor(), [&](const std::shared_ptr<NAccessor::IChunkedArray>& arr) { - if (arr->GetType() == NAccessor::IChunkedArray::EType::SubColumnsPartialArray) { - AFL_VERIFY(!hasSubColumns || *hasSubColumns); - hasSubColumns = true; - auto scArr = std::static_pointer_cast<NAccessor::TSubColumnsPartialArray>(arr); - if (scArr->NeedFetch(description.GetJsonPath())) { - return std::optional<TFetchingInfo>(TFetchingInfo::BuildSubColumnsRestore(subColumns)); - } - } else { - AFL_VERIFY(arr->GetType() == NAccessor::IChunkedArray::EType::SubColumnsArray); - AFL_VERIFY(!hasSubColumns || !*hasSubColumns); - hasSubColumns = false; - } - return std::optional<TFetchingInfo>(); - }); -} - NAccessor::TCompositeChunkedArray::TBuilder TGetJsonPath::MakeCompositeBuilder() const { return NAccessor::TCompositeChunkedArray::TBuilder(arrow::utf8()); } diff --git a/ydb/core/formats/arrow/program/kernel_logic.h b/ydb/core/formats/arrow/program/kernel_logic.h index 5dfd4e5d1ac..9306019a57f 100644 --- a/ydb/core/formats/arrow/program/kernel_logic.h +++ b/ydb/core/formats/arrow/program/kernel_logic.h @@ -8,21 +8,30 @@ namespace NKikimr::NArrow::NSSA { +enum class ECalculationHardness { + JustAccessorUsage = 1, + NotSpecified = 3, + Equals = 5, + StringMatching = 10, + Unknown = 20 +}; + class IKernelLogic { private: virtual TConclusion<bool> DoExecute(const std::vector<TColumnChainInfo>& input, const std::vector<TColumnChainInfo>& output, const std::shared_ptr<TAccessorsCollection>& resources) const = 0; + virtual std::optional<TIndexCheckOperation> DoGetIndexCheckerOperation() const = 0; + public: virtual ~IKernelLogic() = default; + virtual ECalculationHardness GetWeight() const = 0; + using TFactory = NObjectFactory::TObjectFactory<IKernelLogic, TString>; virtual TString GetClassName() const = 0; - virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, - const std::vector<TColumnChainInfo>& input, const std::shared_ptr<TAccessorsCollection>& resources) const = 0; - TConclusion<bool> Execute(const std::vector<TColumnChainInfo>& input, const std::vector<TColumnChainInfo>& output, const std::shared_ptr<TAccessorsCollection>& resources) const { if (!resources) { @@ -32,6 +41,73 @@ public: } virtual bool IsBoolInResult() const = 0; + std::optional<TIndexCheckOperation> GetIndexCheckerOperation() const { + return DoGetIndexCheckerOperation(); + } +}; + +class TLogicMatchString: public IKernelLogic { +private: + virtual TConclusion<bool> DoExecute(const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& /*output*/, + const std::shared_ptr<TAccessorsCollection>& /*resources*/) const override { + return false; + } + virtual std::optional<TIndexCheckOperation> DoGetIndexCheckerOperation() const override { + return TIndexCheckOperation(Operation, CaseSensitive); + } + virtual ECalculationHardness GetWeight() const override { + return ECalculationHardness::StringMatching; + } + + const TIndexCheckOperation::EOperation Operation; + const bool CaseSensitive; + const bool IsSimpleFunction; + +public: + TLogicMatchString(const TIndexCheckOperation::EOperation operation, const bool caseSensitive, const bool isSimpleFunction) + : Operation(operation) + , CaseSensitive(caseSensitive) + , IsSimpleFunction(isSimpleFunction) + { + } + + virtual TString GetClassName() const override { + return "MATCH_STRING::" + ::ToString(Operation) + "::" + ::ToString(CaseSensitive); + } + + virtual bool IsBoolInResult() const override { + return !IsSimpleFunction; + } +}; + +class TLogicEquals: public IKernelLogic { +private: + virtual TConclusion<bool> DoExecute(const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& /*output*/, + const std::shared_ptr<TAccessorsCollection>& /*resources*/) const override { + return false; + } + virtual std::optional<TIndexCheckOperation> DoGetIndexCheckerOperation() const override { + return TIndexCheckOperation(TIndexCheckOperation::EOperation::Equals, true); + } + const bool IsSimpleFunction; + + virtual ECalculationHardness GetWeight() const override { + return ECalculationHardness::Equals; + } + +public: + TLogicEquals(const bool isSimpleFunction) + : IsSimpleFunction(isSimpleFunction) + { + } + + virtual TString GetClassName() const override { + return "EQUALS"; + } + + virtual bool IsBoolInResult() const override { + return !IsSimpleFunction; + } }; class TGetJsonPath: public IKernelLogic { @@ -39,6 +115,13 @@ public: static TString GetClassNameStatic() { return "JsonValue"; } + virtual std::optional<TIndexCheckOperation> DoGetIndexCheckerOperation() const override { + return std::nullopt; + } + + virtual ECalculationHardness GetWeight() const override { + return ECalculationHardness::JustAccessorUsage; + } private: virtual bool IsBoolInResult() const override { @@ -91,9 +174,6 @@ private: static const inline TFactory::TRegistrator<TGetJsonPath> Registrator = TFactory::TRegistrator<TGetJsonPath>(GetClassNameStatic()); - virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, - const std::vector<TColumnChainInfo>& input, const std::shared_ptr<TAccessorsCollection>& resources) const override; - virtual TConclusion<bool> DoExecute(const std::vector<TColumnChainInfo>& input, const std::vector<TColumnChainInfo>& output, const std::shared_ptr<TAccessorsCollection>& resources) const override; diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp index 687777f3312..82612943a4c 100644 --- a/ydb/core/formats/arrow/ut/ut_program_step.cpp +++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp @@ -552,7 +552,8 @@ Y_UNIT_TEST_SUITE(ProgramStep) { } { auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({10001}), TColumnChainInfo(1001), std::make_shared<TSimpleFunction>(EOperation::MatchSubstring)).DetachResult(); - proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::StringContains); + proc->SetKernelLogic(std::make_shared<NKikimr::NArrow::NSSA::TLogicMatchString>( + NKikimr::NArrow::NSSA::TIndexCheckOperation::EOperation::Contains, true, false)); builder.Add(proc); } { @@ -562,7 +563,8 @@ Y_UNIT_TEST_SUITE(ProgramStep) { } { auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2}), TColumnChainInfo(1002), std::make_shared<TSimpleFunction>(EOperation::StartsWith)).DetachResult(); - proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::StartsWith); + proc->SetKernelLogic(std::make_shared<NKikimr::NArrow::NSSA::TLogicMatchString>( + NKikimr::NArrow::NSSA::TIndexCheckOperation::EOperation::StartsWith, true, false)); builder.Add(proc); } { @@ -572,7 +574,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) { } { auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 3}), TColumnChainInfo(1003), std::make_shared<TSimpleFunction>(EOperation::Equal)).DetachResult(); - proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Equals); + proc->SetKernelLogic(std::make_shared<NKikimr::NArrow::NSSA::TLogicEquals>(false)); builder.Add(proc); } { diff --git a/ydb/core/kqp/ut/olap/json_ut.cpp b/ydb/core/kqp/ut/olap/json_ut.cpp index ea27065af8a..b4349f04138 100644 --- a/ydb/core/kqp/ut/olap/json_ut.cpp +++ b/ydb/core/kqp/ut/olap/json_ut.cpp @@ -341,7 +341,13 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { public: TScriptVariator(const TString& script) { - auto commands = StringSplitter(script).SplitByString("------").ToList<TString>(); + auto lines = StringSplitter(script).SplitByString("\n").ToList<TString>(); + lines.erase(std::remove_if(lines.begin(), lines.end(), + [](const TString& l) { + return Strip(l).StartsWith("#"); + }), + lines.end()); + auto commands = StringSplitter(JoinSeq("\n", lines)).SplitByString("------").ToList<TString>(); std::vector<std::vector<std::shared_ptr<ICommand>>> commandsDescription; for (auto&& i : commands) { auto& cVariants = commandsDescription.emplace_back(); @@ -861,7 +867,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { TScriptVariator(script).Execute(); } - Y_UNIT_TEST(BloomIndexesVariants) { + Y_UNIT_TEST(BloomMixIndexesVariants) { TString script = R"( STOP_COMPACTION ------ @@ -872,7 +878,7 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { PRIMARY KEY (Col1) ) PARTITION BY HASH(Col1) - WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = $$2$$); + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); ------ SCHEMA: ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`) @@ -898,7 +904,12 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { SCHEMA: ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_b, TYPE=BLOOM_NGRAMM_FILTER, FEATURES=`{"column_name" : "Col2", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 4096, - "records_count" : 1024, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "b.c.d"}}`); + "records_count" : 1024, "case_sensitive" : false, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : '"b.c.d"'}}`); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_a, TYPE=BLOOM_NGRAMM_FILTER, + FEATURES=`{"column_name" : "Col2", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 4096, + "records_count" : 1024, "case_sensitive" : true, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}}`); ------ DATA: REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) @@ -924,9 +935,29 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] IDX_ND_SKIP_APPROVE: 0, 4, 1 ------ - READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b5" ORDER BY Col1; - EXPECTED: [] - IDX_ND_SKIP_APPROVE: 0, 5, 0 + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "%1b4%" ORDER BY Col1; + EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") ilike "%1b4%" ORDER BY Col1; +# EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] +# IDX_ND_SKIP_APPROVE: 0, 4, 1 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") ilike "%1B4" ORDER BY Col1; +# EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] +# IDX_ND_SKIP_APPROVE: 0, 4, 1 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") ilike "1b5" ORDER BY Col1; +# EXPECTED: [] +# IDX_ND_SKIP_APPROVE: 0, 5, 0 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1b5" ORDER BY Col1; +# EXPECTED: [] +# IDX_ND_SKIP_APPROVE: 0, 5, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "a4" ORDER BY Col1; + EXPECTED: [[4u;["{\"a\":\"a4\",\"b.c.d\":\"b4\"}"]];[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 2 ------ READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d111\"") = "1b5" ORDER BY Col1; EXPECTED: [] @@ -944,6 +975,172 @@ Y_UNIT_TEST_SUITE(KqpOlapJson) { TScriptVariator(script).Execute(); } + Y_UNIT_TEST(BloomCategoryIndexesVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `DATA_EXTRACTOR_CLASS_NAME`=`JSON_SCANNER`, `FORCE_SIMD_PARSING`=`$$true|false$$`, `SCAN_FIRST_LEVEL_ONLY`=`$$true|false$$`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10$$`, + `MEM_LIMIT_CHUNK`=`$$0|1000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a.b.c" : "a1"}')), (2u, JsonDocument('{"a.b.c" : "a2"}')), + (3u, JsonDocument('{"b.c.d" : "b3"}')), (4u, JsonDocument('{"b.c.d" : "b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a.b.c" : "1a1"}')), (12u, JsonDocument('{"a.b.c" : "1a2"}')), + (13u, JsonDocument('{"b.c.d" : "1b3"}')), (14u, JsonDocument('{"b.c.d" : "1b4", "a" : "a4"}')) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=a_index, TYPE=$$CATEGORY_BLOOM_FILTER|BLOOM_FILTER$$, + FEATURES=`{"column_name" : "Col2", "false_positive_probability" : 0.01}`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) + ------ + ONE_ACTUALIZATION + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1; + EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=DROP_INDEX, NAME=a_index) + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "1a1" ORDER BY Col1; + EXPECTED: [[11u;["{\"a.b.c\":\"1a1\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=b_index, TYPE=CATEGORY_BLOOM_FILTER, + FEATURES=`{"column_name" : "Col2", "false_positive_probability" : 0.01}`) + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1; + EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1b5" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 0, 5, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "a4" ORDER BY Col1; + EXPECTED: [[4u;["{\"a\":\"a4\",\"b.c.d\":\"b4\"}"]];[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 2 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d111\"") = "1b5" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 0, 5, 0 + + )"; + TScriptVariator(script).Execute(); + } + + Y_UNIT_TEST(BloomNGrammIndexesVariants) { + TString script = R"( + STOP_COMPACTION + ------ + SCHEMA: + CREATE TABLE `/Root/ColumnTable` ( + Col1 Uint64 NOT NULL, + Col2 JsonDocument, + PRIMARY KEY (Col1) + ) + PARTITION BY HASH(Col1) + WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_OPTIONS, `SCAN_READER_POLICY_NAME`=`SIMPLE`) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=ALTER_COLUMN, NAME=Col2, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SUB_COLUMNS`, + `DATA_EXTRACTOR_CLASS_NAME`=`JSON_SCANNER`, `FORCE_SIMD_PARSING`=`$$true|false$$`, `SCAN_FIRST_LEVEL_ONLY`=`$$true|false$$`, + `COLUMNS_LIMIT`=`$$0|1|1024$$`, `SPARSED_DETECTOR_KFF`=`$$0|10$$`, + `MEM_LIMIT_CHUNK`=`$$0|1000$$`, `OTHERS_ALLOWED_FRACTION`=`$$0|0.5$$`) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(1u, JsonDocument('{"a.b.c" : "a1"}')), (2u, JsonDocument('{"a.b.c" : "a2"}')), + (3u, JsonDocument('{"b.c.d" : "b3"}')), (4u, JsonDocument('{"b.c.d" : "b4", "a" : "a4"}')) + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1, Col2) VALUES(11u, JsonDocument('{"a.b.c" : "1a1"}')), (12u, JsonDocument('{"a.b.c" : "1a2"}')), + (13u, JsonDocument('{"b.c.d" : "1b3"}')), (14u, JsonDocument('{"b.c.d" : "1b4", "a" : "a4"}')) + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_b, TYPE=BLOOM_NGRAMM_FILTER, + FEATURES=`{"column_name" : "Col2", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 4096, + "records_count" : 1024, "case_sensitive" : false, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : '"b.c.d"'}}`); + ------ + SCHEMA: + ALTER OBJECT `/Root/ColumnTable` (TYPE TABLE) SET (ACTION=UPSERT_INDEX, NAME=index_ngramm_a, TYPE=BLOOM_NGRAMM_FILTER, + FEATURES=`{"column_name" : "Col2", "ngramm_size" : 3, "hashes_count" : 2, "filter_size_bytes" : 4096, + "records_count" : 1024, "case_sensitive" : true, "data_extractor" : {"class_name" : "SUB_COLUMN", "sub_column_name" : "a"}}`); + ------ + DATA: + REPLACE INTO `/Root/ColumnTable` (Col1) VALUES(10u) + ------ + ONE_ACTUALIZATION + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"a.b.c\"") = "a1" ORDER BY Col1; + EXPECTED: [[1u;["{\"a.b.c\":\"a1\"}"]]] + IDX_ND_SKIP_APPROVE: 5, 0, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") = "1b4" ORDER BY Col1; + EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "%1b4%" ORDER BY Col1; + EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") ilike "%1b4%" ORDER BY Col1; +# EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] +# IDX_ND_SKIP_APPROVE: 0, 4, 1 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") ilike "%1B4" ORDER BY Col1; +# EXPECTED: [[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] +# IDX_ND_SKIP_APPROVE: 0, 4, 1 +# ------ +# READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") ilike "1b5" ORDER BY Col1; +# EXPECTED: [] +# IDX_ND_SKIP_APPROVE: 0, 5, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "1b5" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 0, 5, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.a") = "a4" ORDER BY Col1; + EXPECTED: [[4u;["{\"a\":\"a4\",\"b.c.d\":\"b4\"}"]];[14u;["{\"a\":\"a4\",\"b.c.d\":\"1b4\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 3, 2 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d111\"") = "1b5" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 5, 0, 0 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1b3" ORDER BY Col1; + EXPECTED: [[13u;["{\"b.c.d\":\"1b3\"}"]]] + IDX_ND_SKIP_APPROVE: 0, 4, 1 + ------ + READ: SELECT * FROM `/Root/ColumnTable` WHERE JSON_VALUE(Col2, "$.\"b.c.d\"") like "1B3" ORDER BY Col1; + EXPECTED: [] + IDX_ND_SKIP_APPROVE: 0, 4, 1 + )"; + TScriptVariator(script).Execute(); + } + Y_UNIT_TEST(SwitchAccessorCompactionVariants) { TString script = R"( STOP_COMPACTION diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index e048e9051a7..983245835fe 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -431,6 +431,7 @@ message TRequestedBloomNGrammFilter { optional uint32 RecordsCount = 5; optional TIndexDataExtractor DataExtractor = 6; optional TSkipIndexBitSetStorage BitsStorage = 7; + optional bool CaseSensitive = 8 [default = true]; } message TRequestedMaxIndex { @@ -472,6 +473,7 @@ message TBloomNGrammFilter { optional uint32 RecordsCount = 5; optional TIndexDataExtractor DataExtractor = 6; optional TSkipIndexBitSetStorage BitsStorage = 7; + optional bool CaseSensitive = 8 [default = true]; } message TMaxIndex { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index 33cb9f8a735..9bb81ebf474 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -138,7 +138,7 @@ bool TPortionDataSource::DoStartFetchingColumns( } std::shared_ptr<NIndexes::TSkipIndex> TPortionDataSource::SelectOptimalIndex( - const std::vector<std::shared_ptr<NIndexes::TSkipIndex>>& indexes, const NArrow::NSSA::EIndexCheckOperation /*op*/) const { + const std::vector<std::shared_ptr<NIndexes::TSkipIndex>>& indexes, const NArrow::NSSA::TIndexCheckOperation& /*op*/) const { if (indexes.size() == 0) { return nullptr; } diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h index ea8e5769a20..d3ed8214c00 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h @@ -282,7 +282,7 @@ private: virtual void DoAssembleColumns(const std::shared_ptr<TColumnsSet>& columns, const bool sequential) override; std::shared_ptr<NIndexes::TSkipIndex> SelectOptimalIndex( - const std::vector<std::shared_ptr<NIndexes::TSkipIndex>>& indexes, const NArrow::NSSA::EIndexCheckOperation op) const; + const std::vector<std::shared_ptr<NIndexes::TSkipIndex>>& indexes, const NArrow::NSSA::TIndexCheckOperation& op) const; virtual TConclusion<bool> DoStartFetchImpl( const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NCommon::IKernelFetchLogic>>& fetchersExt) override; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index b49760873c2..12b1a1b9c6f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -667,7 +667,7 @@ ui32 TIndexInfo::GetColumnIndexVerified(const ui32 id) const { } std::vector<std::shared_ptr<NIndexes::TSkipIndex>> TIndexInfo::FindSkipIndexes( - const NIndexes::NRequest::TOriginalDataAddress& originalDataAddress, const NArrow::NSSA::EIndexCheckOperation op) const { + const NIndexes::NRequest::TOriginalDataAddress& originalDataAddress, const NArrow::NSSA::TIndexCheckOperation& op) const { std::vector<std::shared_ptr<NIndexes::TSkipIndex>> result; for (auto&& [_, i] : Indexes) { if (!i->IsSkipIndex()) { diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index a2a13726479..92574ef1a2b 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -335,7 +335,7 @@ public: } std::vector<std::shared_ptr<NIndexes::TSkipIndex>> FindSkipIndexes( - const NIndexes::NRequest::TOriginalDataAddress& originalDataAddress, const NArrow::NSSA::EIndexCheckOperation op) const; + const NIndexes::NRequest::TOriginalDataAddress& originalDataAddress, const NArrow::NSSA::TIndexCheckOperation& op) const; std::shared_ptr<NIndexes::NMax::TIndexMeta> GetIndexMetaMax(const ui32 columnId) const; std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> GetIndexMetaCountMinSketch(const std::set<ui32>& columnIds) const; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.cpp index 07a765aacb9..12b6d788a38 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NOlap::NIndexes { std::shared_ptr<IIndexMeta> TIndexesCollection::FindIndexFor( - const NRequest::TOriginalDataAddress& address, const NArrow::NSSA::EIndexCheckOperation op) const { + const NRequest::TOriginalDataAddress& address, const NArrow::NSSA::TIndexCheckOperation& op) const { auto it = IndexByOriginalData.find(address); if (it == IndexByOriginalData.end()) { return nullptr; diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.h b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.h index 5b6a88985f6..1fc9c5adc2c 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.h +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/collection.h @@ -181,7 +181,7 @@ public: } } - std::shared_ptr<IIndexMeta> FindIndexFor(const NRequest::TOriginalDataAddress& address, const NArrow::NSSA::EIndexCheckOperation op) const; + std::shared_ptr<IIndexMeta> FindIndexFor(const NRequest::TOriginalDataAddress& address, const NArrow::NSSA::TIndexCheckOperation& op) const; }; } // namespace NKikimr::NOlap::NIndexes diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp index 6a2bc50a2df..2ba8811ff4a 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.cpp @@ -63,10 +63,10 @@ TString TBloomIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui3 return GetBitsStorageConstructor()->Build(std::move(filterBits))->SerializeToString(); } -bool TBloomIndexMeta::DoCheckValueImpl( - const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, const EOperation op) const { +bool TBloomIndexMeta::DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, + const NArrow::NSSA::TIndexCheckOperation& op) const { std::set<ui64> hashes; - AFL_VERIFY(op == EOperation::Equals)("op", op); + AFL_VERIFY(op.GetOperation() == EOperation::Equals)("op", op.DebugString()); const ui32 bitsCount = data.GetBitsCount(); if (!!category) { for (ui64 hashSeed = 0; hashSeed < HashesCount; ++hashSeed) { @@ -88,8 +88,8 @@ bool TBloomIndexMeta::DoCheckValueImpl( std::optional<ui64> TBloomIndexMeta::DoCalcCategory(const TString& subColumnName) const { ui64 result; - const NRequest::TOriginalDataAddress addr(Max<ui32>(), subColumnName); - AFL_VERIFY(GetDataExtractor()->CheckForIndex(addr, result)); + const NRequest::TOriginalDataAddress addr(GetColumnId(), subColumnName); + AFL_VERIFY(GetDataExtractor()->CheckForIndex(addr, &result)); if (subColumnName) { return result; } else { diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h index 2be01b8d794..bdb39bb4450 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom/meta.h @@ -20,8 +20,8 @@ private: virtual std::optional<ui64> DoCalcCategory(const TString& subColumnName) const override; - virtual bool DoIsAppropriateFor(const TString& /*subColumnName*/, const EOperation op) const override { - return op == EOperation::Equals; + virtual bool DoIsAppropriateFor(const NArrow::NSSA::TIndexCheckOperation& op) const override { + return op.GetOperation() == EOperation::Equals && op.GetCaseSensitive(); } protected: @@ -32,7 +32,7 @@ protected: virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapIndexDescription& proto) const override; virtual bool DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, - const EOperation op) const override; + const NArrow::NSSA::TIndexCheckOperation& op) const override; public: TBloomIndexMeta() = default; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp index b73e9195fea..60131226f06 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.cpp @@ -16,7 +16,7 @@ std::shared_ptr<IIndexMeta> TIndexConstructor::DoCreateIndexMeta( } const ui32 columnId = columnInfo->GetId(); return std::make_shared<TIndexMeta>(indexId, indexName, GetStorageId().value_or(NBlobOperations::TGlobal::DefaultStorageId), columnId, - GetDataExtractor(), HashesCount, FilterSizeBytes, NGrammSize, RecordsCount, TBase::GetBitsStorageConstructor()); + GetDataExtractor(), HashesCount, FilterSizeBytes, NGrammSize, RecordsCount, TBase::GetBitsStorageConstructor(), CaseSensitive); } TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { @@ -61,7 +61,15 @@ TConclusionStatus TIndexConstructor::DoDeserializeFromJson(const NJson::TJsonVal return TConclusionStatus::Fail( "hashes_count have to be in bloom ngramm filter in interval " + TConstants::GetHashesCountIntervalString()); } + + if (jsonInfo.Has("case_sensitive")) { + if (!jsonInfo["case_sensitive"].IsBoolean()) { + return TConclusionStatus::Fail("case_sensitive have to be in bloom filter features as boolean field"); + } + CaseSensitive = jsonInfo["case_sensitive"].GetBoolean(); + } return TConclusionStatus::Success(); + } NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexRequested& proto) { @@ -77,6 +85,9 @@ NKikimr::TConclusionStatus TIndexConstructor::DoDeserializeFromProto(const NKiki return conclusion; } } + if (bFilter.HasCaseSensitive()) { + CaseSensitive = bFilter.GetCaseSensitive(); + } RecordsCount = bFilter.GetRecordsCount(); if (!TConstants::CheckRecordsCount(RecordsCount)) { return TConclusionStatus::Fail("RecordsCount have to be in " + TConstants::GetRecordsCountIntervalString()); @@ -107,6 +118,7 @@ void TIndexConstructor::DoSerializeToProto(NKikimrSchemeOp::TOlapIndexRequested& auto* filterProto = proto.MutableBloomNGrammFilter(); TBase::SerializeToProtoBitsStorageOnly(*filterProto); filterProto->SetColumnName(GetColumnName()); + filterProto->SetCaseSensitive(CaseSensitive); filterProto->SetRecordsCount(RecordsCount); filterProto->SetNGrammSize(NGrammSize); filterProto->SetFilterSizeBytes(FilterSizeBytes); diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h index 375de80ebd7..343984c0a99 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/constructor.h @@ -19,6 +19,7 @@ private: ui32 FilterSizeBytes = 512; ui32 HashesCount = 2; ui32 RecordsCount = 10000; + bool CaseSensitive = true; static inline auto Registrator = TFactory::TRegistrator<TIndexConstructor>(GetClassNameStatic()); protected: diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp index c0dabbc4ba5..f0d5443b89d 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.cpp @@ -16,6 +16,7 @@ namespace NKikimr::NOlap::NIndexes::NBloomNGramm { class TNGrammBuilder { private: const ui32 HashesCount; + const bool CaseSensitive; template <ui32 CharsRemained> class THashesBuilder { @@ -133,17 +134,29 @@ private: AFL_VERIFY(false); } }; + TBuffer LowerStringBuffer; public: - TNGrammBuilder(const ui32 hashesCount) - : HashesCount(hashesCount) { + TNGrammBuilder(const ui32 hashesCount, const bool caseSensitive) + : HashesCount(hashesCount) + , CaseSensitive(caseSensitive) { } template <class TAction> void BuildNGramms( const char* data, const ui32 dataSize, const std::optional<NRequest::TLikePart::EOperation> op, const ui32 nGrammSize, TAction& pred) { - THashesSelector<TConstants::MaxHashesCount, TConstants::MaxNGrammSize>::BuildHashes( - (const ui8*)data, dataSize, HashesCount, nGrammSize, op, pred); + if (CaseSensitive) { + THashesSelector<TConstants::MaxHashesCount, TConstants::MaxNGrammSize>::BuildHashes( + (const ui8*)data, dataSize, HashesCount, nGrammSize, op, pred); + } else { + LowerStringBuffer.Clear(); + LowerStringBuffer.Reserve(dataSize); + for (ui32 i = 0; i < dataSize; ++i) { + LowerStringBuffer.Append(std::tolower(data[i])); + } + THashesSelector<TConstants::MaxHashesCount, TConstants::MaxNGrammSize>::BuildHashes( + (const ui8*)LowerStringBuffer.Data(), dataSize, HashesCount, nGrammSize, op, pred); + } } template <class TFiller> @@ -171,8 +184,14 @@ public: } template <class TFiller> - void FillNGrammHashes(const ui32 nGrammSize, const NRequest::TLikePart::EOperation op, const TString& userReq, TFiller& fillData) { - BuildNGramms(userReq.data(), userReq.size(), op, nGrammSize, fillData); + void FillNGrammHashes( + const ui32 nGrammSize, const NRequest::TLikePart::EOperation op, const TString& userReq, TFiller& fillData) { + if (CaseSensitive) { + BuildNGramms(userReq.data(), userReq.size(), op, nGrammSize, fillData); + } else { + const TString lowerString = to_lower(userReq); + BuildNGramms(lowerString.data(), lowerString.size(), op, nGrammSize, fillData); + } } }; @@ -259,7 +278,7 @@ public: TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const { AFL_VERIFY(reader.GetColumnsCount() == 1)("count", reader.GetColumnsCount()); - TNGrammBuilder builder(HashesCount); + TNGrammBuilder builder(HashesCount, CaseSensitive); ui32 size = FilterSizeBytes * 8; if ((size & (size - 1)) == 0) { @@ -311,8 +330,8 @@ TString TIndexMeta::DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 rec return GetBitsStorageConstructor()->Build(inserter.ExtractBits())->SerializeToString(); } -bool TIndexMeta::DoCheckValueImpl( - const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, const EOperation op) const { +bool TIndexMeta::DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, + const NArrow::NSSA::TIndexCheckOperation& op) const { AFL_VERIFY(!category); AFL_VERIFY(value->type->id() == arrow::utf8()->id() || value->type->id() == arrow::binary()->id())("id", value->type->ToString()); bool result = true; @@ -322,10 +341,11 @@ bool TIndexMeta::DoCheckValueImpl( result = false; } }; - TNGrammBuilder builder(HashesCount); + TNGrammBuilder builder(HashesCount, CaseSensitive); + AFL_VERIFY(!CaseSensitive || op.GetCaseSensitive()); NRequest::TLikePart::EOperation opLike; - switch (op) { + switch (op.GetOperation()) { case TSkipIndex::EOperation::Equals: opLike = NRequest::TLikePart::EOperation::Equals; break; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h index d1326c3da46..31f29d1e7a5 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/bloom_ngramm/meta.h @@ -13,6 +13,7 @@ public: private: using TBase = TSkipBitmapIndex; std::shared_ptr<arrow::Schema> ResultSchema; + bool CaseSensitive = true; ui32 NGrammSize = 3; ui32 FilterSizeBytes = 512; ui32 RecordsCount = 10000; @@ -28,16 +29,13 @@ private: AFL_VERIFY(TConstants::CheckRecordsCount(RecordsCount)); } - virtual bool DoIsAppropriateFor(const TString& subColumnName, const EOperation op) const override { - if (!!subColumnName) { - return false; - } - switch (op) { + virtual bool DoIsAppropriateFor(const NArrow::NSSA::TIndexCheckOperation& op) const override { + switch (op.GetOperation()) { case EOperation::Equals: case EOperation::StartsWith: case EOperation::EndsWith: case EOperation::Contains: - return true; + return !CaseSensitive || op.GetCaseSensitive(); } return false; @@ -50,12 +48,6 @@ protected: return TConclusionStatus::Fail( "cannot read meta as appropriate class: " + GetClassName() + ". Meta said that class name is " + newMeta.GetClassName()); } - if (HashesCount != bMeta->HashesCount) { - return TConclusionStatus::Fail("cannot modify hashes count"); - } - if (NGrammSize != bMeta->NGrammSize) { - return TConclusionStatus::Fail("cannot modify ngramm size"); - } return TBase::CheckSameColumnsForModification(newMeta); } virtual TString DoBuildIndexImpl(TChunkedBatchReader& reader, const ui32 recordsCount) const override; @@ -80,6 +72,9 @@ protected: if (!MutableDataExtractor().DeserializeFromProto(bFilter.GetDataExtractor())) { return false; } + if (bFilter.HasCaseSensitive()) { + CaseSensitive = bFilter.GetCaseSensitive(); + } HashesCount = bFilter.GetHashesCount(); if (!TConstants::CheckHashesCount(HashesCount)) { return false; @@ -111,18 +106,20 @@ protected: filterProto->SetFilterSizeBytes(FilterSizeBytes); filterProto->SetHashesCount(HashesCount); filterProto->SetColumnId(GetColumnId()); + filterProto->SetCaseSensitive(CaseSensitive); *filterProto->MutableDataExtractor() = GetDataExtractor().SerializeToProto(); } virtual bool DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, - const EOperation op) const override; + const NArrow::NSSA::TIndexCheckOperation& op) const override; public: TIndexMeta() = default; TIndexMeta(const ui32 indexId, const TString& indexName, const TString& storageId, const ui32 columnId, const TReadDataExtractorContainer& dataExtractor, const ui32 hashesCount, const ui32 filterSizeBytes, const ui32 nGrammSize, - const ui32 recordsCount, const std::shared_ptr<IBitsStorageConstructor>& bitsStorageConstructor) + const ui32 recordsCount, const std::shared_ptr<IBitsStorageConstructor>& bitsStorageConstructor, const bool caseSensitive) : TBase(indexId, indexName, columnId, storageId, dataExtractor, bitsStorageConstructor) + , CaseSensitive(caseSensitive) , NGrammSize(nGrammSize) , FilterSizeBytes(filterSizeBytes) , RecordsCount(recordsCount) diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.cpp index e6d7c62b1e5..6ce01b442d5 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.cpp @@ -166,10 +166,11 @@ TConclusion<std::shared_ptr<IIndexHeader>> TIndexMeta::DoBuildHeader(const TChun return std::make_shared<TCompositeBloomHeader>(std::move(proto), IIndexHeader::ReadHeaderSize(data.GetDataVerified(), true).DetachResult()); } -bool TIndexMeta::DoCheckValueImpl( - const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, const EOperation op) const { +bool TIndexMeta::DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, + const NArrow::NSSA::TIndexCheckOperation& op) const { AFL_VERIFY(!!category); - AFL_VERIFY(op == EOperation::Equals)("op", op); + AFL_VERIFY(op.GetOperation() == EOperation::Equals)("op", op.DebugString()); + AFL_VERIFY(op.GetCaseSensitive()); const ui32 bitsCount = data.GetBitsCount(); if (!bitsCount) { return false; @@ -185,8 +186,8 @@ bool TIndexMeta::DoCheckValueImpl( std::optional<ui64> TIndexMeta::DoCalcCategory(const TString& subColumnName) const { ui64 result; - const NRequest::TOriginalDataAddress addr(Max<ui32>(), subColumnName); - AFL_VERIFY(GetDataExtractor()->CheckForIndex(addr, result)); + const NRequest::TOriginalDataAddress addr(GetColumnId(), subColumnName); + AFL_VERIFY(GetDataExtractor()->CheckForIndex(addr, &result)); return result; } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.h index 6f7a2396971..decf03fb792 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/categories_bloom/meta.h @@ -21,15 +21,15 @@ private: } virtual bool DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> category, const std::shared_ptr<arrow::Scalar>& value, - const EOperation op) const override; + const NArrow::NSSA::TIndexCheckOperation& op) const override; virtual TConclusion<std::shared_ptr<IIndexHeader>> DoBuildHeader(const TChunkOriginalData& data) const override; - virtual bool DoIsAppropriateFor(const TString& subColumnName, const EOperation op) const override { - if (!subColumnName) { + virtual bool DoIsAppropriateFor(const NArrow::NSSA::TIndexCheckOperation& op) const override { + if (!op.GetCaseSensitive()) { return false; } - if (op != EOperation::Equals) { + if (op.GetOperation() != EOperation::Equals) { return false; } return true; diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h index 82fda999716..ebdb32051af 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/abstract.h @@ -23,7 +23,7 @@ private: virtual void DoSerializeToProto(TProto& proto) const = 0; virtual bool DoDeserializeFromProto(const TProto& proto) = 0; - virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64& baseHash) const = 0; + virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64* baseHash) const = 0; virtual THashMap<ui64, ui32> DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const = 0; public: @@ -34,8 +34,10 @@ public: return DoGetIndexHitsCount(dataArray); } - bool CheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64& baseHash) const { - baseHash = 0; + bool CheckForIndex(const NRequest::TOriginalDataAddress& dataSource, ui64* baseHash) const { + if (baseHash) { + *baseHash = 0; + } return DoCheckForIndex(dataSource, baseHash); } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp index fd85f2ef333..9a38a9580ea 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.cpp @@ -37,7 +37,7 @@ void TDefaultDataExtractor::DoVisitAll(const std::shared_ptr<NArrow::NAccessor:: } } -bool TDefaultDataExtractor::DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& hashBase) const { +bool TDefaultDataExtractor::DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64* hashBase) const { if (request.GetSubColumnName()) { std::string_view sv = [&]() { if (request.GetSubColumnName().StartsWith("$.")) { @@ -46,7 +46,9 @@ bool TDefaultDataExtractor::DoCheckForIndex(const NRequest::TOriginalDataAddress return std::string_view(request.GetSubColumnName().data(), request.GetSubColumnName().size()); } }(); - hashBase = NRequest::TOriginalDataAddress::CalcSubColumnHash(sv); + if (hashBase) { + *hashBase = NRequest::TOriginalDataAddress::CalcSubColumnHash(sv); + } } return true; } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h index 64f534edf59..47de05f3f94 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/default.h @@ -27,7 +27,7 @@ private: virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, const TRecordVisitor& recordVisitor) const override; - virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& hashBase) const override; + virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64* hashBase) const override; virtual THashMap<ui64, ui32> DoGetIndexHitsCount(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray) const override; public: diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h index f7e9e6daa69..6479afc7892 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/portions/extractor/sub_column.h @@ -45,7 +45,7 @@ private: virtual void DoVisitAll(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& dataArray, const TChunkVisitor& chunkVisitor, const TRecordVisitor& recordVisitor) const override; - virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64& /*hashBase*/) const override { + virtual bool DoCheckForIndex(const NRequest::TOriginalDataAddress& request, ui64* /*hashBase*/) const override { return request.GetSubColumnName() == SubColumnName; } diff --git a/ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h b/ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h index 35eb6b0efe3..32941908876 100644 --- a/ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h +++ b/ydb/core/tx/columnshard/engines/storage/indexes/skip_index/meta.h @@ -13,15 +13,16 @@ private: using TBase = TIndexByColumns; public: - using EOperation = NArrow::NSSA::EIndexCheckOperation; + using EOperation = NArrow::NSSA::TIndexCheckOperation::EOperation; private: - virtual bool DoIsAppropriateFor(const TString& subColumnName, const EOperation op) const = 0; - virtual bool DoCheckValue( - const TString& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, const EOperation op) const = 0; + virtual bool DoIsAppropriateFor(const NArrow::NSSA::TIndexCheckOperation& op) const = 0; + virtual bool DoCheckValue(const TString& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, + const NArrow::NSSA::TIndexCheckOperation& op) const = 0; public: - bool CheckValue(const TString& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, const EOperation op) const { + bool CheckValue(const TString& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, + const NArrow::NSSA::TIndexCheckOperation& op) const { return DoCheckValue(data, cat, value, op); } @@ -29,11 +30,14 @@ public: return true; } - bool IsAppropriateFor(const NRequest::TOriginalDataAddress& addr, const EOperation op) const { + bool IsAppropriateFor(const NRequest::TOriginalDataAddress& addr, const NArrow::NSSA::TIndexCheckOperation& op) const { if (GetColumnId() != addr.GetColumnId()) { return false; } - return DoIsAppropriateFor(addr.GetSubColumnName(), op); + if (!GetDataExtractor()->CheckForIndex(addr, nullptr)) { + return false; + } + return DoIsAppropriateFor(op); } using TBase::TBase; }; @@ -42,11 +46,11 @@ class TSkipBitmapIndex: public TSkipIndex { private: std::shared_ptr<IBitsStorageConstructor> BitsStorageConstructor; using TBase = TSkipIndex; - virtual bool DoCheckValueImpl( - const IBitsStorage& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, const EOperation op) const = 0; + virtual bool DoCheckValueImpl(const IBitsStorage& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, + const NArrow::NSSA::TIndexCheckOperation& op) const = 0; virtual bool DoCheckValue(const TString& data, const std::optional<ui64> cat, const std::shared_ptr<arrow::Scalar>& value, - const EOperation op) const override final { + const NArrow::NSSA::TIndexCheckOperation& op) const override final { if (data.empty()) { return false; } diff --git a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp index 6ab38d7cbc2..d3bc168d9b7 100644 --- a/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp +++ b/ydb/core/tx/columnshard/operations/slice_builder/pack_builder.cpp @@ -89,13 +89,14 @@ public: SequentialWriteId.emplace_back(data->GetWriteMeta().GetWriteId()); } - [[nodiscard]] TConclusionStatus Finalize(const NOlap::TWritingContext& context, std::vector<TPortionWriteController::TInsertPortion>& result) { + [[nodiscard]] TConclusionStatus Finalize( + const NOlap::TWritingContext& context, std::vector<TPortionWriteController::TInsertPortion>& result) { if (Batches.size() == 0) { return TConclusionStatus::Success(); } if (Batches.size() == 1) { - auto portionConclusion = context.GetActualSchema()->PrepareForWrite(context.GetActualSchema(), PathId, Batches.front().GetContainer(), - ModificationType, context.GetStoragesManager(), context.GetSplitterCounters()); + auto portionConclusion = context.GetActualSchema()->PrepareForWrite(context.GetActualSchema(), PathId, + Batches.front().GetContainer(), ModificationType, context.GetStoragesManager(), context.GetSplitterCounters()); result.emplace_back(portionConclusion.DetachResult()); } else { ui32 idx = 0; @@ -121,10 +122,12 @@ public: if (defaultColumn.IsFail()) { return defaultColumn; } - gContainer->AddField(context.GetActualSchema()->GetFieldByIndexVerified(*itAllIndexes), defaultColumn.DetachResult()).Validate(); + gContainer->AddField(context.GetActualSchema()->GetFieldByIndexVerified(*itAllIndexes), defaultColumn.DetachResult()) + .Validate(); } else { AFL_VERIFY(*itAllIndexes == *itBatchIndexes); - gContainer->AddField(context.GetActualSchema()->GetFieldByIndexVerified(*itAllIndexes), + gContainer + ->AddField(context.GetActualSchema()->GetFieldByIndexVerified(*itAllIndexes), i->column(itBatchIndexes - i.GetColumnIndexes().begin())) .Validate(); ++itBatchIndexes; @@ -193,7 +196,10 @@ TConclusionStatus TBuildPackSlicesTask::DoExecute(const std::shared_ptr<ITask>& } std::vector<TPortionWriteController::TInsertPortion> portionsToWrite; for (auto&& i : slicesToMerge) { - i.Finalize(Context, portionsToWrite).Validate(); + auto conclusion = i.Finalize(Context, portionsToWrite); + if (conclusion.IsFail()) { + return conclusion; + } } auto actions = WriteUnits.front().GetData()->GetBlobsAction(); auto writeController = diff --git a/ydb/core/tx/program/builder.cpp b/ydb/core/tx/program/builder.cpp index 31a5d1be32f..1f314b3974c 100644 --- a/ydb/core/tx/program/builder.cpp +++ b/ydb/core/tx/program/builder.cpp @@ -17,8 +17,13 @@ namespace NKikimr::NArrow::NSSA { -TConclusion<std::shared_ptr<IStepFunction>> TProgramBuilder::MakeFunction( - const TColumnInfo& name, const NKikimrSSA::TProgram::TAssignment::TFunction& func, std::vector<TColumnChainInfo>& arguments) const { +TConclusion<std::shared_ptr<IStepFunction>> TProgramBuilder::MakeFunction(const TColumnInfo& name, + const NKikimrSSA::TProgram::TAssignment::TFunction& func, std::shared_ptr<NArrow::NSSA::IKernelLogic>& kernelLogic, + std::vector<TColumnChainInfo>& arguments) const { + if (func.GetKernelName()) { + kernelLogic.reset(IKernelLogic::TFactory::Construct(func.GetKernelName())); + } + using TId = NKikimrSSA::TProgram::TAssignment; arguments.clear(); @@ -27,6 +32,17 @@ TConclusion<std::shared_ptr<IStepFunction>> TProgramBuilder::MakeFunction( } if (func.GetFunctionType() == NKikimrSSA::TProgram::EFunctionType::TProgram_EFunctionType_YQL_KERNEL) { + if (func.HasYqlOperationId() && !kernelLogic) { + if (func.GetYqlOperationId() == (ui32)NYql::TKernelRequestBuilder::EBinaryOp::Equals) { + kernelLogic = std::make_shared<TLogicEquals>(false); + } else if (func.GetYqlOperationId() == (ui32)NYql::TKernelRequestBuilder::EBinaryOp::StringContains) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::Contains, true, false); + } else if (func.GetYqlOperationId() == (ui32)NYql::TKernelRequestBuilder::EBinaryOp::StartsWith) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::StartsWith, true, false); + } else if (func.GetYqlOperationId() == (ui32)NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::EndsWith, true, false); + } + } auto kernelFunction = KernelsRegistry.GetFunction(func.GetKernelIdx()); if (!kernelFunction) { return TConclusionStatus::Fail( @@ -59,6 +75,7 @@ TConclusion<std::shared_ptr<IStepFunction>> TProgramBuilder::MakeFunction( switch (func.GetId()) { case TId::FUNC_CMP_EQUAL: + kernelLogic = std::make_shared<TLogicEquals>(true); return std::make_shared<TSimpleFunction>(EOperation::Equal); case TId::FUNC_CMP_NOT_EQUAL: return std::make_shared<TSimpleFunction>(EOperation::NotEqual); @@ -76,6 +93,7 @@ TConclusion<std::shared_ptr<IStepFunction>> TProgramBuilder::MakeFunction( return std::make_shared<TSimpleFunction>(EOperation::BinaryLength); case TId::FUNC_STR_MATCH: { if (auto opts = mkLikeOptions(false)) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::Contains, true, true); return std::make_shared<TSimpleFunction>(EOperation::MatchSubstring, opts); } break; @@ -88,30 +106,35 @@ TConclusion<std::shared_ptr<IStepFunction>> TProgramBuilder::MakeFunction( } case TId::FUNC_STR_STARTS_WITH: { if (auto opts = mkLikeOptions(false)) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::StartsWith, true, true); return std::make_shared<TSimpleFunction>(EOperation::StartsWith, opts); } break; } case TId::FUNC_STR_ENDS_WITH: { if (auto opts = mkLikeOptions(false)) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::EndsWith, true, true); return std::make_shared<TSimpleFunction>(EOperation::EndsWith, opts); } break; } case TId::FUNC_STR_MATCH_IGNORE_CASE: { if (auto opts = mkLikeOptions(true)) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::Contains, false, true); return std::make_shared<TSimpleFunction>(EOperation::MatchSubstring, opts); } break; } case TId::FUNC_STR_STARTS_WITH_IGNORE_CASE: { if (auto opts = mkLikeOptions(true)) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::StartsWith, false, true); return std::make_shared<TSimpleFunction>(EOperation::StartsWith, opts); } break; } case TId::FUNC_STR_ENDS_WITH_IGNORE_CASE: { if (auto opts = mkLikeOptions(true)) { + kernelLogic = std::make_shared<TLogicMatchString>(TIndexCheckOperation::EOperation::EndsWith, false, true); return std::make_shared<TSimpleFunction>(EOperation::EndsWith, opts); } break; @@ -276,18 +299,14 @@ TConclusionStatus TProgramBuilder::ReadAssign( switch (assign.GetExpressionCase()) { case TId::kFunction: { std::shared_ptr<IKernelLogic> kernelLogic; - if (assign.GetFunction().GetKernelName()) { - kernelLogic.reset(IKernelLogic::TFactory::Construct(assign.GetFunction().GetKernelName())); - } - std::vector<TColumnChainInfo> arguments; - auto function = MakeFunction(columnName, assign.GetFunction(), arguments); + auto function = MakeFunction(columnName, assign.GetFunction(), kernelLogic, arguments); if (function.IsFail()) { return function; } - if (assign.GetFunction().HasYqlOperationId() && assign.GetFunction().GetYqlOperationId() == - (ui32)NYql::TKernelRequestBuilder::EBinaryOp::And) { + if (assign.GetFunction().HasYqlOperationId() && + assign.GetFunction().GetYqlOperationId() == (ui32)NYql::TKernelRequestBuilder::EBinaryOp::And) { auto processor = std::make_shared<TStreamLogicProcessor>(std::move(arguments), columnName.GetColumnId(), NKernels::EOperation::And); Builder.Add(processor); diff --git a/ydb/core/tx/program/builder.h b/ydb/core/tx/program/builder.h index 94d67930087..5e7b0fcb17d 100644 --- a/ydb/core/tx/program/builder.h +++ b/ydb/core/tx/program/builder.h @@ -7,6 +7,7 @@ #include <ydb/core/formats/arrow/program/functions.h> #include <ydb/core/formats/arrow/program/graph_execute.h> #include <ydb/core/formats/arrow/program/graph_optimization.h> +#include <ydb/core/formats/arrow/program/kernel_logic.h> #include <ydb/library/formats/arrow/protos/ssa.pb.h> @@ -37,8 +38,9 @@ private: TColumnInfo GetColumnInfo(const NKikimrSSA::TProgram::TColumn& column) const; std::string GenerateName(const NKikimrSSA::TProgram::TColumn& column) const; - [[nodiscard]] TConclusion<std::shared_ptr<IStepFunction>> MakeFunction( - const TColumnInfo& name, const NKikimrSSA::TProgram::TAssignment::TFunction& func, std::vector<TColumnChainInfo>& arguments) const; + [[nodiscard]] TConclusion<std::shared_ptr<IStepFunction>> MakeFunction(const TColumnInfo& name, + const NKikimrSSA::TProgram::TAssignment::TFunction& func, std::shared_ptr<IKernelLogic>& kernelLogic, + std::vector<TColumnChainInfo>& arguments) const; [[nodiscard]] TConclusion<std::shared_ptr<TConstProcessor>> MakeConstant( const TColumnInfo& name, const NKikimrSSA::TProgram::TConstant& constant) const; [[nodiscard]] TConclusion<std::shared_ptr<TConstProcessor>> MaterializeParameter(const TColumnInfo& name, |
