summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/formats/arrow/program/abstract.cpp4
-rw-r--r--ydb/core/formats/arrow/program/abstract.h54
-rw-r--r--ydb/core/formats/arrow/program/aggr_keys.cpp3
-rw-r--r--ydb/core/formats/arrow/program/aggr_keys.h13
-rw-r--r--ydb/core/formats/arrow/program/assign_const.cpp2
-rw-r--r--ydb/core/formats/arrow/program/assign_const.h2
-rw-r--r--ydb/core/formats/arrow/program/assign_internal.cpp3
-rw-r--r--ydb/core/formats/arrow/program/assign_internal.h21
-rw-r--r--ydb/core/formats/arrow/program/chain.cpp15
-rw-r--r--ydb/core/formats/arrow/program/filter.cpp12
-rw-r--r--ydb/core/formats/arrow/program/filter.h17
-rw-r--r--ydb/core/formats/arrow/program/graph.cpp281
-rw-r--r--ydb/core/formats/arrow/program/graph.h147
-rw-r--r--ydb/core/formats/arrow/program/original.cpp5
-rw-r--r--ydb/core/formats/arrow/program/original.h39
-rw-r--r--ydb/core/formats/arrow/program/projection.cpp3
-rw-r--r--ydb/core/formats/arrow/program/projection.h2
-rw-r--r--ydb/core/formats/arrow/program/ya.make4
-rw-r--r--ydb/core/formats/arrow/ut/ut_program_step.cpp75
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h7
21 files changed, 657 insertions, 54 deletions
diff --git a/ydb/core/formats/arrow/program/abstract.cpp b/ydb/core/formats/arrow/program/abstract.cpp
index 5f3850c1880..dd6e58ad7c0 100644
--- a/ydb/core/formats/arrow/program/abstract.cpp
+++ b/ydb/core/formats/arrow/program/abstract.cpp
@@ -20,13 +20,13 @@ NJson::TJsonValue IResourceProcessor::DebugJson() const {
return result;
}
-TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources) const {
+TConclusionStatus IResourceProcessor::Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
for (auto&& i : Output) {
if (resources->HasColumn(i.GetColumnId())) {
return TConclusionStatus::Fail("column " + ::ToString(i.GetColumnId()) + " has already");
}
}
- return DoExecute(resources);
+ return DoExecute(resources, context);
}
std::optional<TFetchingInfo> IResourceProcessor::BuildFetchTask(
diff --git a/ydb/core/formats/arrow/program/abstract.h b/ydb/core/formats/arrow/program/abstract.h
index 35563d089a8..7365d30c3c1 100644
--- a/ydb/core/formats/arrow/program/abstract.h
+++ b/ydb/core/formats/arrow/program/abstract.h
@@ -164,7 +164,8 @@ enum class EProcessorType {
Calculation,
Projection,
Filter,
- Aggregation
+ Aggregation,
+ Original
};
class TFetchingInfo {
@@ -187,20 +188,53 @@ public:
}
};
+class TProcessorContext {
+protected:
+ std::vector<TColumnChainInfo> ColumnsToFetch;
+ std::vector<TColumnChainInfo> OriginalColumnsToUse;
+ std::vector<TColumnChainInfo> ColumnsToDrop;
+
+public:
+ const std::vector<TColumnChainInfo>& GetColumnsToFetch() const {
+ return ColumnsToFetch;
+ }
+ const std::vector<TColumnChainInfo>& GetOriginalColumnsToUse() const {
+ return OriginalColumnsToUse;
+ }
+ const std::vector<TColumnChainInfo>& GetColumnsToDrop() const {
+ return ColumnsToDrop;
+ }
+
+ TProcessorContext(
+ std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse, std::vector<TColumnChainInfo>&& toDrop)
+ : ColumnsToFetch(std::move(toFetch))
+ , OriginalColumnsToUse(std::move(originalToUse))
+ , ColumnsToDrop(std::move(toDrop)) {
+ }
+};
+
class IResourceProcessor {
private:
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Input);
YDB_READONLY_DEF(std::vector<TColumnChainInfo>, Output);
YDB_READONLY(EProcessorType, ProcessorType, EProcessorType::Unknown);
- virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const = 0;
+ virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const = 0;
virtual NJson::TJsonValue DoDebugJson() const {
return NJson::JSON_MAP;
}
+ virtual ui64 DoGetWeight() const {
+ return 0;
+ }
public:
- virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
+ ui64 GetWeight() const {
+ return DoGetWeight();
+ }
+
+ virtual std::optional<TFetchingInfo> BuildFetchTask(
+ const ui32 columnId, const NAccessor::IChunkedArray::EType arrType, const std::shared_ptr<TAccessorsCollection>& resources) const;
virtual bool IsAggregation() const = 0;
@@ -224,25 +258,21 @@ public:
, ProcessorType(type) {
}
- [[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources) const;
+ [[nodiscard]] TConclusionStatus Execute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const;
};
-class TResourceProcessorStep {
+class TResourceProcessorStep: public TProcessorContext {
private:
- YDB_READONLY_DEF(std::vector<TColumnChainInfo>, ColumnsToFetch);
- YDB_READONLY_DEF(std::vector<TColumnChainInfo>, OriginalColumnsToUse);
+ using TBase = TProcessorContext;
YDB_READONLY_DEF(std::shared_ptr<IResourceProcessor>, Processor);
- YDB_READONLY_DEF(std::vector<TColumnChainInfo>, ColumnsToDrop);
public:
NJson::TJsonValue DebugJson() const;
TResourceProcessorStep(std::vector<TColumnChainInfo>&& toFetch, std::vector<TColumnChainInfo>&& originalToUse,
std::shared_ptr<IResourceProcessor>&& processor, std::vector<TColumnChainInfo>&& toDrop)
- : ColumnsToFetch(std::move(toFetch))
- , OriginalColumnsToUse(std::move(originalToUse))
- , Processor(std::move(processor))
- , ColumnsToDrop(std::move(toDrop)) {
+ : TBase(std::move(toFetch), std::move(originalToUse), std::move(toDrop))
+ , Processor(std::move(processor)) {
AFL_VERIFY(Processor);
}
diff --git a/ydb/core/formats/arrow/program/aggr_keys.cpp b/ydb/core/formats/arrow/program/aggr_keys.cpp
index 3b16025caa6..c1c3d29a199 100644
--- a/ydb/core/formats/arrow/program/aggr_keys.cpp
+++ b/ydb/core/formats/arrow/program/aggr_keys.cpp
@@ -66,7 +66,8 @@ CH::AggFunctionId TWithKeysAggregationOption::GetHouseFunction(const EAggregate
return CH::AggFunctionId::AGG_UNSPECIFIED;
}
-TConclusionStatus TWithKeysAggregationProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
+TConclusionStatus TWithKeysAggregationProcessor::DoExecute(
+ const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
CH::GroupByOptions funcOpts;
funcOpts.assigns.reserve(AggregationKeys.size() + Aggregations.size());
funcOpts.has_nullable_key = false;
diff --git a/ydb/core/formats/arrow/program/aggr_keys.h b/ydb/core/formats/arrow/program/aggr_keys.h
index 9950927aa46..24cadba8a63 100644
--- a/ydb/core/formats/arrow/program/aggr_keys.h
+++ b/ydb/core/formats/arrow/program/aggr_keys.h
@@ -20,7 +20,7 @@ private:
}
virtual TConclusion<arrow::Datum> Call(
const TExecFunctionContext& context, const std::shared_ptr<TAccessorsCollection>& resources) const override;
-
+
TConclusion<arrow::Datum> PrepareResult(arrow::Datum&& datum) const override {
if (!datum.is_scalar()) {
return TConclusionStatus::Fail("Aggregate result is not a scalar.");
@@ -103,13 +103,14 @@ public:
return "";
}
- virtual TConclusionStatus CheckIO(const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& output) const override {
+ virtual TConclusionStatus CheckIO(
+ const std::vector<TColumnChainInfo>& /*input*/, const std::vector<TColumnChainInfo>& output) const override {
if (output.size() != 1) {
return TConclusionStatus::Fail("output size != 1 (" + ::ToString(output.size()) + ")");
}
-// if (input.size() != 1) {
-// return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")");
-// }
+ // if (input.size() != 1) {
+ // return TConclusionStatus::Fail("input size != 1 (" + ::ToString(input.size()) + ")");
+ // }
return TConclusionStatus::Success();
}
};
@@ -149,7 +150,7 @@ private:
std::vector<TColumnChainInfo> AggregationKeys;
std::vector<TWithKeysAggregationOption> Aggregations;
- virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
+ virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
TWithKeysAggregationProcessor(std::vector<TColumnChainInfo>&& input, std::vector<TColumnChainInfo>&& output,
std::vector<TColumnChainInfo>&& aggregationKeys, std::vector<TWithKeysAggregationOption>&& aggregations)
diff --git a/ydb/core/formats/arrow/program/assign_const.cpp b/ydb/core/formats/arrow/program/assign_const.cpp
index 1d01cb7cd69..726cbd2ffdd 100644
--- a/ydb/core/formats/arrow/program/assign_const.cpp
+++ b/ydb/core/formats/arrow/program/assign_const.cpp
@@ -10,7 +10,7 @@
namespace NKikimr::NArrow::NSSA {
-TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
+TConclusionStatus TConstProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
AFL_VERIFY(GetInput().empty());
resources->AddConstantVerified(GetOutputColumnIdOnce(), ScalarConstant);
return TConclusionStatus::Success();
diff --git a/ydb/core/formats/arrow/program/assign_const.h b/ydb/core/formats/arrow/program/assign_const.h
index 5cbebe617b0..28372184771 100644
--- a/ydb/core/formats/arrow/program/assign_const.h
+++ b/ydb/core/formats/arrow/program/assign_const.h
@@ -8,7 +8,7 @@ private:
using TBase = IResourceProcessor;
YDB_READONLY_DEF(std::shared_ptr<arrow::Scalar>, ScalarConstant);
- virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
+ virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
virtual bool IsAggregation() const override {
return false;
diff --git a/ydb/core/formats/arrow/program/assign_internal.cpp b/ydb/core/formats/arrow/program/assign_internal.cpp
index db87ede4d55..0d2fc2833df 100644
--- a/ydb/core/formats/arrow/program/assign_internal.cpp
+++ b/ydb/core/formats/arrow/program/assign_internal.cpp
@@ -4,7 +4,8 @@
namespace NKikimr::NArrow::NSSA {
-TConclusionStatus TCalculationProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
+TConclusionStatus TCalculationProcessor::DoExecute(
+ const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
if (KernelLogic) {
auto resultKernel = KernelLogic->Execute(GetInput(), GetOutput(), resources);
if (resultKernel.IsFail()) {
diff --git a/ydb/core/formats/arrow/program/assign_internal.h b/ydb/core/formats/arrow/program/assign_internal.h
index de97c1c4c8a..f2e7caf5f4d 100644
--- a/ydb/core/formats/arrow/program/assign_internal.h
+++ b/ydb/core/formats/arrow/program/assign_internal.h
@@ -3,6 +3,8 @@
#include "functions.h"
#include "kernel_logic.h"
+#include <yql/essentials/core/arrow_kernels/request/request.h>
+
namespace NKikimr::NArrow::NSSA {
class TCalculationProcessor: public IResourceProcessor {
@@ -14,7 +16,7 @@ private:
std::shared_ptr<IStepFunction> Function;
- virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
+ virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
TCalculationProcessor(std::vector<TColumnChainInfo>&& input, std::vector<TColumnChainInfo>&& output,
const std::shared_ptr<IStepFunction>& function, const std::shared_ptr<IKernelLogic>& kernelLogic)
@@ -27,6 +29,23 @@ private:
return Function->IsAggregation();
}
+ virtual ui64 DoGetWeight() const override {
+ if (KernelLogic) {
+ return 0;
+ }
+ if (!YqlOperationId) {
+ return 10;
+ } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StartsWith ||
+ (NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::EndsWith) {
+ return 7;
+ } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::StringContains) {
+ return 10;
+ } else if ((NYql::TKernelRequestBuilder::EBinaryOp)*YqlOperationId == NYql::TKernelRequestBuilder::EBinaryOp::Equals) {
+ return 5;
+ }
+ return 0;
+ }
+
public:
virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType,
const std::shared_ptr<TAccessorsCollection>& resources) const override {
diff --git a/ydb/core/formats/arrow/program/chain.cpp b/ydb/core/formats/arrow/program/chain.cpp
index b3b9f303374..4aa193e0e81 100644
--- a/ydb/core/formats/arrow/program/chain.cpp
+++ b/ydb/core/formats/arrow/program/chain.cpp
@@ -1,5 +1,6 @@
#include "chain.h"
#include "collection.h"
+#include "graph.h"
namespace NKikimr::NArrow::NSSA {
@@ -39,7 +40,17 @@ public:
};
} // namespace
-TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
+TConclusion<TProgramChain> TProgramChain::Build(std::vector<std::shared_ptr<IResourceProcessor>>&& processorsExt, const IColumnResolver& resolver) {
+ NOptimization::TGraph graph(std::move(processorsExt), resolver);
+ auto conclusion = graph.Collapse();
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ auto processorsConclusion = graph.BuildChain();
+ if (processorsConclusion.IsFail()) {
+ return processorsConclusion;
+ }
+ auto processors = processorsConclusion.DetachResult();
THashMap<ui32, TColumnUsage> contextUsage;
ui32 stepIdx = 0;
THashSet<ui32> sourceColumns;
@@ -149,7 +160,7 @@ TConclusionStatus TProgramChain::Initialize() {
TConclusionStatus TProgramChain::Apply(const std::shared_ptr<TAccessorsCollection>& resources) const {
for (auto&& i : Processors) {
- auto status = i->Execute(resources);
+ auto status = i->Execute(resources, i);
if (status.IsFail()) {
return status;
}
diff --git a/ydb/core/formats/arrow/program/filter.cpp b/ydb/core/formats/arrow/program/filter.cpp
index f40e498b4db..0ca6276f9ae 100644
--- a/ydb/core/formats/arrow/program/filter.cpp
+++ b/ydb/core/formats/arrow/program/filter.cpp
@@ -60,21 +60,21 @@ private:
arrow::Status VisitImpl(const TArray& array) {
AFL_VERIFY(Started);
for (ui32 i = 0; i < array.length(); ++i) {
- const bool columnValue = (bool)array.Value(i);
const ui32 currentIdx = CursorIdx++;
- FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && columnValue;
+ FiltersMerged[currentIdx] = FiltersMerged[currentIdx] && !array.IsNull(i) && (bool)array.Value(i);
}
AFL_VERIFY(CursorIdx <= FiltersMerged.size());
return arrow::Status::OK();
}
};
-TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
+TConclusionStatus TFilterProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const {
std::vector<std::shared_ptr<IChunkedArray>> inputColumns;
- if (ReuseColumns) {
- inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
- } else {
+ AFL_VERIFY(context.GetColumnsToDrop().size() <= 1)("size", context.GetColumnsToDrop().size());
+ if (context.GetColumnsToDrop().size() && GetInputColumnIdOnce() == context.GetColumnsToDrop().front()) {
inputColumns = resources->ExtractAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
+ } else {
+ inputColumns = resources->GetAccessors(TColumnChainInfo::ExtractColumnIds(GetInput()));
}
TFilterVisitor filterVisitor(inputColumns.front()->GetRecordsCount());
for (auto& arr : inputColumns) {
diff --git a/ydb/core/formats/arrow/program/filter.h b/ydb/core/formats/arrow/program/filter.h
index 434a47077a5..dbccdf5592e 100644
--- a/ydb/core/formats/arrow/program/filter.h
+++ b/ydb/core/formats/arrow/program/filter.h
@@ -6,25 +6,20 @@ namespace NKikimr::NArrow::NSSA {
class TFilterProcessor: public IResourceProcessor {
private:
using TBase = IResourceProcessor;
- const bool ReuseColumns;
- virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
+ virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
virtual bool IsAggregation() const override {
return false;
}
public:
- TFilterProcessor(std::vector<TColumnChainInfo>&& input, const bool reuseColumns = false)
- : TBase(std::move(input), {}, EProcessorType::Filter)
- , ReuseColumns(reuseColumns)
- {
- AFL_VERIFY(GetInput().size());
+ TFilterProcessor(std::vector<TColumnChainInfo>&& input)
+ : TBase(std::move(input), {}, EProcessorType::Filter) {
+ AFL_VERIFY(GetInput().size() == 1)("size", GetInput().size());
}
- TFilterProcessor(const TColumnChainInfo& input, const bool reuseColumns = false)
- : TBase({ input }, {}, EProcessorType::Filter)
- , ReuseColumns(reuseColumns)
- {
+ TFilterProcessor(const TColumnChainInfo& input)
+ : TBase({ input }, {}, EProcessorType::Filter) {
}
};
diff --git a/ydb/core/formats/arrow/program/graph.cpp b/ydb/core/formats/arrow/program/graph.cpp
new file mode 100644
index 00000000000..7246dd5d69b
--- /dev/null
+++ b/ydb/core/formats/arrow/program/graph.cpp
@@ -0,0 +1,281 @@
+#include "assign_const.h"
+#include "assign_internal.h"
+#include "filter.h"
+#include "graph.h"
+#include "original.h"
+
+#include <ydb/library/formats/arrow/switch/switch_type.h>
+
+#include <yql/essentials/core/arrow_kernels/request/request.h>
+
+namespace NKikimr::NArrow::NSSA::NOptimization {
+
+TGraph::TGraph(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver) {
+ for (auto&& i : processors) {
+ auto node = std::make_shared<TGraphNode>(i);
+ Nodes.emplace(node->GetIdentifier(), node);
+ for (auto&& output : i->GetOutput()) {
+ AFL_VERIFY(Producers.emplace(output.GetColumnId(), node.get()).second);
+ }
+ for (auto&& input : i->GetInput()) {
+ if (Producers.find(input.GetColumnId()) != Producers.end()) {
+ continue;
+ }
+ const TString name = resolver.GetColumnName(input.GetColumnId(), false);
+ if (!!name) {
+ auto nodeInput = std::make_shared<TGraphNode>(
+ std::make_shared<TOriginalColumnProcessor>(input.GetColumnId(), resolver.GetColumnName(input.GetColumnId())));
+ Nodes.emplace(nodeInput->GetIdentifier(), nodeInput);
+ Producers.emplace(input.GetColumnId(), nodeInput.get());
+ }
+ }
+ }
+ for (auto&& [_, i] : Nodes) {
+ for (auto&& p : i->GetProcessor()->GetInput()) {
+ auto node = GetProducerVerified(p.GetColumnId());
+ node->AddDataTo(p.GetColumnId(), i);
+ i->AddDataFrom(p.GetColumnId(), node);
+ }
+ }
+}
+
+TConclusion<bool> TGraph::OptimizeFilter(TGraphNode* filterNode) {
+ if (filterNode->GetProcessor()->GetProcessorType() != EProcessorType::Filter) {
+ return false;
+ }
+ if (filterNode->GetDataFrom().size() != 1) {
+ return TConclusionStatus::Fail("incorrect filter incoming columns (!= 1) : " + ::ToString(filterNode->GetDataFrom().size()));
+ }
+ auto* first = filterNode->GetDataFrom().begin()->second;
+ if (first->GetProcessor()->GetProcessorType() != EProcessorType::Calculation) {
+ return false;
+ }
+ auto calc = first->GetProcessorAs<TCalculationProcessor>();
+ if (!calc->GetYqlOperationId()) {
+ return false;
+ }
+ {
+ auto conclusion = OptimizeFilterWithAnd(filterNode, first, calc);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ if (*conclusion) {
+ return true;
+ }
+ }
+ {
+ auto conclusion = OptimizeFilterWithCoalesce(filterNode, first, calc);
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ if (*conclusion) {
+ return true;
+ }
+ }
+ return false;
+}
+
+TConclusion<bool> TGraph::OptimizeFilterWithAnd(
+ TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr<TCalculationProcessor>& calc) {
+ if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() != NYql::TKernelRequestBuilder::EBinaryOp::And) {
+ return false;
+ }
+ if (calc->GetInput().size() < 2) {
+ return TConclusionStatus::Fail("incorrect and operation incoming columns (< 2) : " + ::ToString(calc->GetInput().size()));
+ }
+ for (auto&& c : calc->GetInput()) {
+ AddNode(std::make_shared<TFilterProcessor>(TColumnChainInfo(c)));
+ }
+ DetachNode(filterNode);
+ DetachNode(filterArg);
+ RemoveNode(filterNode);
+ RemoveNode(filterArg);
+ Cerr << DebugJson() << Endl;
+ return true;
+}
+
+TConclusion<bool> TGraph::OptimizeFilterWithCoalesce(
+ TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr<TCalculationProcessor>& calc) {
+ if ((NYql::TKernelRequestBuilder::EBinaryOp)*calc->GetYqlOperationId() != NYql::TKernelRequestBuilder::EBinaryOp::Coalesce) {
+ return false;
+ }
+ if (calc->GetInput().size() != 2) {
+ return TConclusionStatus::Fail("incorrect coalesce incoming columns (!= 2) : " + ::ToString(calc->GetInput().size()));
+ }
+ TGraphNode* dataNode = GetProducerVerified(calc->GetInput()[0].GetColumnId());
+ TGraphNode* argNode = GetProducerVerified(calc->GetInput()[1].GetColumnId());
+ if (argNode->GetProcessor()->GetProcessorType() != EProcessorType::Const) {
+ return false;
+ }
+ auto scalar = argNode->GetProcessorAs<TConstProcessor>()->GetScalarConstant();
+ if (!scalar) {
+ return TConclusionStatus::Fail("coalesce with null arg is impossible");
+ }
+ if (scalar) {
+ bool doOptimize = false;
+ NArrow::SwitchType(scalar->type->id(), [&](const auto& type) {
+ using TWrap = std::decay_t<decltype(type)>;
+ using T = typename TWrap::T;
+ using TScalar = typename arrow::TypeTraits<T>::ScalarType;
+ auto& typedScalar = static_cast<const TScalar&>(*scalar);
+ if constexpr (arrow::has_c_type<T>()) {
+ doOptimize = (typedScalar.value == 0);
+ }
+ return true;
+ });
+ if (!doOptimize) {
+ return false;
+ }
+ }
+ for (auto&& c : dataNode->GetProcessor()->GetOutput()) {
+ AddNode(std::make_shared<TFilterProcessor>(TColumnChainInfo(c.GetColumnId())));
+ }
+ DetachNode(filterNode);
+ DetachNode(filterArg);
+ RemoveNode(filterNode);
+ RemoveNode(filterArg);
+ if (argNode->GetDataFrom().empty() && argNode->GetDataTo().empty()) {
+ RemoveNode(argNode);
+ }
+ return true;
+}
+
+TConclusionStatus TGraph::Collapse() {
+ bool hasChanges = true;
+ // Cerr << DebugJson() << Endl;
+ while (hasChanges) {
+ hasChanges = false;
+ for (auto&& [_, n] : Nodes) {
+ {
+ auto conclusion = OptimizeFilter(n.get());
+ if (conclusion.IsFail()) {
+ return conclusion;
+ }
+ if (*conclusion) {
+ hasChanges = true;
+ break;
+ }
+ }
+ }
+ }
+ return TConclusionStatus::Success();
+}
+
+class TFilterChain {
+private:
+ YDB_READONLY_DEF(std::vector<const TGraphNode*>, Nodes);
+ ui64 Weight = 0;
+
+public:
+ TFilterChain(const std::vector<const TGraphNode*>& nodes)
+ : Nodes(nodes) {
+ for (auto&& i : nodes) {
+ Weight += i->GetProcessor()->GetWeight();
+ }
+ }
+
+ bool operator<(const TFilterChain& item) const {
+ return Weight < item.Weight;
+ }
+};
+
+TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> TGraph::BuildChain() {
+ std::vector<TFilterChain> nodeChains;
+ THashSet<i64> readyNodeIds;
+ for (auto&& [_, i] : Nodes) {
+ if (i->GetProcessor()->GetProcessorType() == EProcessorType::Filter) {
+ std::vector<const TGraphNode*> chain = i->GetFetchingChain();
+ std::vector<const TGraphNode*> actualChain;
+ for (auto&& c : chain) {
+ if (readyNodeIds.emplace(c->GetIdentifier()).second) {
+ actualChain.emplace_back(c);
+ }
+ }
+ AFL_VERIFY(actualChain.size());
+ nodeChains.emplace_back(std::move(actualChain));
+ }
+ }
+ std::sort(nodeChains.begin(), nodeChains.end());
+ for (auto&& [_, i] : Nodes) {
+ if (i->GetProcessor()->GetProcessorType() != EProcessorType::Filter && i->GetProcessor()->GetOutput().empty()) {
+ std::vector<const TGraphNode*> chain = i->GetFetchingChain();
+ std::vector<const TGraphNode*> actualChain;
+ for (auto&& c : chain) {
+ if (readyNodeIds.emplace(c->GetIdentifier()).second) {
+ actualChain.emplace_back(c);
+ }
+ }
+ AFL_VERIFY(actualChain.size());
+ nodeChains.emplace_back(std::move(actualChain));
+ }
+ }
+ if (readyNodeIds.size() != Nodes.size()) {
+ std::set<ui32> notCoveredIds;
+ TStringBuilder sb;
+ ui32 count = 0;
+ for (auto&& [id, n] : Nodes) {
+ if (!readyNodeIds.contains(id)) {
+ if (n->GetProcessor()->GetProcessorType() != EProcessorType::Const) {
+ ++count;
+ }
+ sb << n->DebugJson().GetStringRobust() << "/" << n->GetProcessor()->DebugJson().GetStringRobust() << Endl;
+ }
+ }
+ if (count) {
+ return TConclusionStatus::Fail(
+ "not found final nodes: " + ::ToString(readyNodeIds.size()) + " covered from " + ::ToString(Nodes.size()) + ": details = " + sb);
+ }
+ }
+ std::vector<std::shared_ptr<IResourceProcessor>> result;
+ for (auto&& c : nodeChains) {
+ for (auto&& p : c.GetNodes()) {
+ if (p->GetProcessor()->GetProcessorType() != EProcessorType::Original) {
+ result.emplace_back(p->GetProcessor());
+ }
+ }
+ }
+ return result;
+}
+
+void TGraph::AddNode(const std::shared_ptr<IResourceProcessor>& processor) {
+ auto node = std::make_shared<TGraphNode>(processor);
+ Nodes.emplace(node->GetIdentifier(), node);
+ for (auto&& i : processor->GetInput()) {
+ auto nodeProducer = GetProducerVerified(i.GetColumnId());
+ nodeProducer->AddDataTo(i.GetColumnId(), node);
+ node->AddDataFrom(i.GetColumnId(), nodeProducer);
+ }
+}
+
+void TGraph::RemoveNode(TGraphNode* node) {
+ Nodes.erase(node->GetIdentifier());
+}
+
+void TGraph::DetachNode(TGraphNode* node) {
+ for (auto&& i : node->GetDataFrom()) {
+ i.second->RemoveDataTo(i.first.AnotherNodeId(node->GetIdentifier()));
+ }
+ for (auto&& i : node->GetDataTo()) {
+ i.second->RemoveDataFrom(i.first.AnotherNodeId(node->GetIdentifier()));
+ }
+}
+
+std::vector<const TGraphNode*> TGraphNode::GetFetchingChain() const {
+ std::vector<const TGraphNode*> result;
+ result.emplace_back(this);
+ ui32 frontStart = 0;
+ ui32 frontFinish = result.size();
+ while (frontFinish > frontStart) {
+ for (ui32 i = frontStart; i < frontFinish; ++i) {
+ for (auto&& input : result[i]->GetDataFrom()) {
+ result.emplace_back(input.second);
+ }
+ }
+ frontStart = frontFinish;
+ frontFinish = result.size();
+ }
+ std::reverse(result.begin(), result.end());
+ return result;
+}
+
+} // namespace NKikimr::NArrow::NSSA::NOptimization
diff --git a/ydb/core/formats/arrow/program/graph.h b/ydb/core/formats/arrow/program/graph.h
new file mode 100644
index 00000000000..d1e27b7094f
--- /dev/null
+++ b/ydb/core/formats/arrow/program/graph.h
@@ -0,0 +1,147 @@
+#pragma once
+#include "abstract.h"
+
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NArrow::NSSA {
+class TCalculationProcessor;
+}
+
+namespace NKikimr::NArrow::NSSA::NOptimization {
+
+class TGraphNode {
+private:
+ static inline TAtomicCounter Counter = 0;
+ YDB_READONLY(i64, Identifier, Counter.Inc());
+ YDB_READONLY_DEF(std::shared_ptr<IResourceProcessor>, Processor);
+ class TAddress {
+ private:
+ const ui32 ColumnId;
+ const i64 NodeId;
+
+ public:
+ TAddress(const ui32 columnId, const i64 nodeId)
+ : ColumnId(columnId)
+ , NodeId(nodeId) {
+ }
+
+ TAddress AnotherNodeId(const i64 nodeId) const {
+ return TAddress(ColumnId, nodeId);
+ }
+
+ bool operator<(const TAddress& item) const {
+ return std::tie(ColumnId, NodeId) < std::tie(item.ColumnId, item.NodeId);
+ }
+
+ NJson::TJsonValue DebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("c", ColumnId);
+ result.InsertValue("n", NodeId);
+ return result;
+ }
+ };
+ std::map<TAddress, TGraphNode*> DataFrom;
+ std::map<TAddress, TGraphNode*> DataTo;
+
+public:
+ NJson::TJsonValue DebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("id", Identifier);
+ auto& inputArr = result.InsertValue("input", NJson::JSON_ARRAY);
+ for (auto&& i : DataFrom) {
+ inputArr.AppendValue(i.first.DebugJson());
+ }
+ auto& outputArr = result.InsertValue("output", NJson::JSON_ARRAY);
+ for (auto&& i : DataTo) {
+ outputArr.AppendValue(i.first.DebugJson());
+ }
+ return result;
+ }
+
+ const std::map<TAddress, TGraphNode*>& GetDataTo() const {
+ return DataTo;
+ }
+
+ const std::map<TAddress, TGraphNode*>& GetDataFrom() const {
+ return DataFrom;
+ }
+
+ TGraphNode(const std::shared_ptr<IResourceProcessor>& processor)
+ : Processor(processor) {
+ AFL_VERIFY(Processor);
+ }
+
+ template <class TProcessor>
+ std::shared_ptr<TProcessor> GetProcessorAs() const {
+ return std::static_pointer_cast<TProcessor>(Processor);
+ }
+
+ void AddDataFrom(const ui32 columnId, const std::shared_ptr<TGraphNode>& node) {
+ AFL_VERIFY(node);
+ AFL_VERIFY(DataFrom.emplace(TAddress(columnId, node->GetIdentifier()), node.get()).second);
+ }
+
+ void AddDataFrom(const ui32 columnId, TGraphNode* node) {
+ AFL_VERIFY(node);
+ AFL_VERIFY(DataFrom.emplace(TAddress(columnId, node->GetIdentifier()), node).second);
+ }
+
+ void AddDataTo(const ui32 columnId, TGraphNode* node) {
+ AFL_VERIFY(node);
+ AFL_VERIFY(DataTo.emplace(TAddress(columnId, node->GetIdentifier()), node).second);
+ }
+
+ void AddDataTo(const ui32 columnId, const std::shared_ptr<TGraphNode>& node) {
+ AFL_VERIFY(node);
+ AFL_VERIFY(DataTo.emplace(TAddress(columnId, node->GetIdentifier()), node.get()).second);
+ }
+
+ void RemoveDataFrom(const TAddress& addr) {
+ AFL_VERIFY(DataFrom.erase(addr))("addr", addr.DebugJson())("info", DebugJson());
+ }
+
+ void RemoveDataTo(const TAddress& addr) {
+ AFL_VERIFY(DataTo.erase(addr))("addr", addr.DebugJson())("info", DebugJson());
+ }
+
+ std::vector<const TGraphNode*> GetFetchingChain() const;
+};
+
+class TGraph {
+private:
+ std::map<ui64, std::shared_ptr<TGraphNode>> Nodes;
+ THashMap<ui32, TGraphNode*> Producers;
+ TGraphNode* GetProducerVerified(const ui32 columnId) {
+ auto it = Producers.find(columnId);
+ AFL_VERIFY(it != Producers.end());
+ return it->second;
+ }
+ TConclusion<bool> OptimizeFilter(TGraphNode* filterNode);
+ TConclusion<bool> OptimizeFilterWithCoalesce(TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr<TCalculationProcessor>& calc);
+ TConclusion<bool> OptimizeFilterWithAnd(TGraphNode* filterNode, TGraphNode* filterArg, const std::shared_ptr<TCalculationProcessor>& calc);
+
+ void RemoveNode(TGraphNode* node);
+ void DetachNode(TGraphNode* node);
+ void Connect(TGraphNode* from, TGraphNode* to, const ui32 columnId) {
+ from->AddDataTo(columnId, to);
+ to->AddDataFrom(columnId, from);
+ }
+
+ void AddNode(const std::shared_ptr<IResourceProcessor>& processor);
+ NJson::TJsonValue DebugJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ auto& nodesArr = result.InsertValue("nodes", NJson::JSON_ARRAY);
+ for (auto&& i : Nodes) {
+ nodesArr.AppendValue(i.second->DebugJson());
+ }
+ return result;
+ }
+
+public:
+ TGraph(std::vector<std::shared_ptr<IResourceProcessor>>&& processors, const IColumnResolver& resolver);
+
+ TConclusionStatus Collapse();
+
+ TConclusion<std::vector<std::shared_ptr<IResourceProcessor>>> BuildChain();
+};
+} // namespace NKikimr::NArrow::NSSA::NOptimization
diff --git a/ydb/core/formats/arrow/program/original.cpp b/ydb/core/formats/arrow/program/original.cpp
new file mode 100644
index 00000000000..8dcb5daff7b
--- /dev/null
+++ b/ydb/core/formats/arrow/program/original.cpp
@@ -0,0 +1,5 @@
+#include "original.h"
+
+namespace NKikimr::NArrow::NSSA {
+
+} // namespace NKikimr::NArrow::NSSA
diff --git a/ydb/core/formats/arrow/program/original.h b/ydb/core/formats/arrow/program/original.h
new file mode 100644
index 00000000000..f5716173ce1
--- /dev/null
+++ b/ydb/core/formats/arrow/program/original.h
@@ -0,0 +1,39 @@
+#pragma once
+#include "abstract.h"
+#include "functions.h"
+#include "kernel_logic.h"
+
+namespace NKikimr::NArrow::NSSA {
+
+class TOriginalColumnProcessor: public IResourceProcessor {
+private:
+ using TBase = IResourceProcessor;
+
+ YDB_ACCESSOR(ui32, ColumnId, 0);
+ YDB_ACCESSOR_DEF(TString, ColumnName);
+
+ virtual TConclusionStatus DoExecute(
+ const std::shared_ptr<TAccessorsCollection>& /*resources*/, const TProcessorContext& /*context*/) const override {
+ AFL_VERIFY(false);
+ return TConclusionStatus::Success();
+ }
+
+ virtual bool IsAggregation() const override {
+ return false;
+ }
+
+public:
+ TOriginalColumnProcessor(const ui32 columnId, const TString& columnName)
+ : TBase({}, { columnId }, EProcessorType::Original)
+ , ColumnName(columnName) {
+ AFL_VERIFY(!!ColumnName);
+ }
+
+ virtual std::optional<TFetchingInfo> BuildFetchTask(const ui32 columnId, const NAccessor::IChunkedArray::EType arrType,
+ const std::shared_ptr<TAccessorsCollection>& resources) const override {
+ AFL_VERIFY(false);
+ return TBase::BuildFetchTask(columnId, arrType, resources);
+ }
+};
+
+} // namespace NKikimr::NArrow::NSSA
diff --git a/ydb/core/formats/arrow/program/projection.cpp b/ydb/core/formats/arrow/program/projection.cpp
index 37951230f50..092aa4f4c45 100644
--- a/ydb/core/formats/arrow/program/projection.cpp
+++ b/ydb/core/formats/arrow/program/projection.cpp
@@ -3,7 +3,8 @@
namespace NKikimr::NArrow::NSSA {
-TConclusionStatus TProjectionProcessor::DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const {
+TConclusionStatus TProjectionProcessor::DoExecute(
+ const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& /*context*/) const {
resources->RemainOnly(TColumnChainInfo::ExtractColumnIds(GetInput()), true);
return TConclusionStatus::Success();
}
diff --git a/ydb/core/formats/arrow/program/projection.h b/ydb/core/formats/arrow/program/projection.h
index 7084d33150d..2c8c3643736 100644
--- a/ydb/core/formats/arrow/program/projection.h
+++ b/ydb/core/formats/arrow/program/projection.h
@@ -7,7 +7,7 @@ class TProjectionProcessor: public IResourceProcessor {
private:
using TBase = IResourceProcessor;
- virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources) const override;
+ virtual TConclusionStatus DoExecute(const std::shared_ptr<TAccessorsCollection>& resources, const TProcessorContext& context) const override;
virtual bool IsAggregation() const override {
return false;
diff --git a/ydb/core/formats/arrow/program/ya.make b/ydb/core/formats/arrow/program/ya.make
index 8b6b536668a..860b7337357 100644
--- a/ydb/core/formats/arrow/program/ya.make
+++ b/ydb/core/formats/arrow/program/ya.make
@@ -23,6 +23,8 @@ ENDIF()
SRCS(
abstract.cpp
+ graph.cpp
+ original.cpp
collection.cpp
functions.cpp
aggr_keys.cpp
@@ -39,4 +41,6 @@ SRCS(
GENERATE_ENUM_SERIALIZATION(abstract.h)
GENERATE_ENUM_SERIALIZATION(aggr_common.h)
+YQL_LAST_ABI_VERSION()
+
END()
diff --git a/ydb/core/formats/arrow/ut/ut_program_step.cpp b/ydb/core/formats/arrow/ut/ut_program_step.cpp
index a0c40c2a200..e38b72e4b42 100644
--- a/ydb/core/formats/arrow/ut/ut_program_step.cpp
+++ b/ydb/core/formats/arrow/ut/ut_program_step.cpp
@@ -41,7 +41,7 @@ size_t FilterTest(const std::vector<std::shared_ptr<arrow::Array>>& args, const
TProgramChain::TBuilder builder(resolver);
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 2}), TColumnChainInfo(4), std::make_shared<TSimpleFunction>(op1)).DetachResult());
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({4, 3}), TColumnChainInfo(5), std::make_shared<TSimpleFunction>(op2)).DetachResult());
- builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 }), true));
+ builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 })));
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 4, 5 })));
auto chain = builder.Finish().DetachResult();
auto resources = std::make_shared<NAccessor::TAccessorsCollection>();
@@ -61,7 +61,7 @@ size_t FilterTestUnary(std::vector<std::shared_ptr<arrow::Array>> args, const EO
TProgramChain::TBuilder builder(resolver);
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1}), TColumnChainInfo(4), std::make_shared<TSimpleFunction>(op1)).DetachResult());
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2, 4}), TColumnChainInfo(5), std::make_shared<TSimpleFunction>(op2)).DetachResult());
- builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 }), true));
+ builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 5 })));
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 4, 5 })));
auto chain = builder.Finish().DetachResult();
auto resources = std::make_shared<NAccessor::TAccessorsCollection>();
@@ -488,7 +488,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
TProgramChain::TBuilder builder(resolver);
builder.Add(std::make_shared<TConstProcessor>(std::make_shared<arrow::Int64Scalar>(56), 3));
builder.Add(TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 3}), TColumnChainInfo(4), std::make_shared<TSimpleFunction>(EOperation::Add)).DetachResult());
- builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 2 }), true));
+ builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 2 })));
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 2, 4 })));
auto chain = builder.Finish().DetachResult();
auto resources = std::make_shared<NAccessor::TAccessorsCollection>();
@@ -501,6 +501,75 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
AFL_VERIFY(resources->GetRecordsCountVerified() == 2);
}
+ Y_UNIT_TEST(TestValueFromNull) {
+ arrow::UInt32Builder sb;
+ sb.AppendNulls(10).ok();
+ auto arr = std::dynamic_pointer_cast<arrow::UInt32Array>(*sb.Finish());
+ AFL_VERIFY(arr->Value(0) == 0)("val", arr->Value(0));
+ }
+
+ Y_UNIT_TEST(SplitFilterSimple) {
+ std::vector<std::string> data = { "aa", "aaa", "aaaa", "bbbbb" };
+ arrow::StringBuilder sb;
+ sb.AppendValues(data).ok();
+
+ auto schema = std::make_shared<arrow::Schema>(
+ std::vector{ std::make_shared<arrow::Field>("int", arrow::int64()), std::make_shared<arrow::Field>("string", arrow::utf8()) });
+ auto batch = arrow::RecordBatch::Make(schema, 4, std::vector{ NumVecToArray(arrow::int64(), { 64, 5, 1, 43 }), *sb.Finish() });
+ UNIT_ASSERT(batch->ValidateFull().ok());
+
+ TSchemaColumnResolver resolver(schema);
+ TProgramChain::TBuilder builder(resolver);
+ builder.Add(std::make_shared<TConstProcessor>(std::make_shared<arrow::Int64Scalar>(56), 3));
+ builder.Add(std::make_shared<TConstProcessor>(std::make_shared<arrow::Int64Scalar>(0), 4));
+
+ {
+ auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2}), TColumnChainInfo(1001), std::make_shared<TSimpleFunction>(EOperation::MatchSubstring)).DetachResult();
+ proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::StringContains);
+ builder.Add(proc);
+ }
+ {
+ auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1001, 4}), TColumnChainInfo(1101), std::make_shared<TSimpleFunction>(EOperation::Add)).DetachResult();
+ proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Coalesce);
+ builder.Add(proc);
+ }
+ {
+ auto proc =
+ TCalculationProcessor::Build(TColumnChainInfo::BuildVector({2}), TColumnChainInfo(1002), std::make_shared<TSimpleFunction>(EOperation::StartsWith)).DetachResult();
+ proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::StartsWith);
+ builder.Add(proc);
+ }
+ {
+ auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1002, 4}), TColumnChainInfo(1102), std::make_shared<TSimpleFunction>(EOperation::Add)).DetachResult();
+ proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Coalesce);
+ builder.Add(proc);
+ }
+ {
+ auto proc =
+ TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1, 3}), TColumnChainInfo(1003), std::make_shared<TSimpleFunction>(EOperation::Equal)).DetachResult();
+ proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Equals);
+ builder.Add(proc);
+ }
+ {
+ auto proc = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1003, 4}), TColumnChainInfo(1103), std::make_shared<TSimpleFunction>(EOperation::Add)).DetachResult();
+ proc->SetYqlOperationId((ui32)NYql::TKernelRequestBuilder::EBinaryOp::Coalesce);
+ builder.Add(proc);
+ }
+
+ auto andOperator1 = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1101, 1102}), TColumnChainInfo(1104), std::make_shared<TSimpleFunction>(EOperation::And)).DetachResult();
+ andOperator1->SetYqlOperationId(0);
+ builder.Add(andOperator1);
+
+ auto andOperator2 = TCalculationProcessor::Build(TColumnChainInfo::BuildVector({1104, 1103}), TColumnChainInfo(1105), std::make_shared<TSimpleFunction>(EOperation::And)).DetachResult();
+ andOperator2->SetYqlOperationId(0);
+ builder.Add(andOperator2);
+ builder.Add(std::make_shared<TFilterProcessor>(TColumnChainInfo::BuildVector({ 1105 })));
+ builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 1, 2 })));
+ auto chain = builder.Finish().DetachResult();
+ Cerr << chain->DebugJson() << Endl;
+ AFL_VERIFY(chain->DebugString() == R"({"processors":[{"processor":{"internal":{},"type":"Const","output":"3"}},{"processor":{"internal":{},"type":"Calculation","input":"1,3","output":"1003"},"fetch":"1","drop":"3"},{"processor":{"internal":{},"type":"Filter","input":"1003"},"drop":"1003"},{"processor":{"internal":{},"type":"Calculation","input":"2","output":"1002"},"fetch":"2"},{"processor":{"internal":{},"type":"Filter","input":"1002"},"drop":"1002"},{"processor":{"internal":{},"type":"Calculation","input":"2","output":"1001"}},{"processor":{"internal":{},"type":"Filter","input":"1001"},"drop":"1001"},{"processor":{"internal":{},"type":"Projection","input":"1,2"}}]})");
+ }
+
Y_UNIT_TEST(Projection) {
auto schema = std::make_shared<arrow::Schema>(
std::vector{ std::make_shared<arrow::Field>("x", arrow::int64()), std::make_shared<arrow::Field>("y", arrow::boolean()) });
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
index 937f049a43b..aecf4dd77a6 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.cpp
@@ -207,7 +207,7 @@ TConclusion<bool> TProgramStepPrepare::DoExecuteInplace(const std::shared_ptr<ID
TConclusion<bool> TProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*cursor*/) const {
// NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()(
// "program", source->GetContext()->GetCommonContext()->GetReadMetadata()->GetProgram().ProtoDebugString());
- auto result = Step->Execute(source->GetStageData().GetTable());
+ auto result = Step->Execute(source->GetStageData().GetTable(), Step);
if (result.IsFail()) {
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h
index b6f29ff023a..276751c9fa4 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/sub_columns_fetching.h
@@ -78,10 +78,9 @@ public:
i.second.GetBlobDataVerified().size());
std::vector<NArrow::NAccessor::TDeserializeChunkedArray::TChunk> chunks = { NArrow::NAccessor::TDeserializeChunkedArray::TChunk(
GetRecordsCount(), i.second.GetBlobDataVerified()) };
-// const ui32 filledRecordsCount = PartialArray->GetHeader().GetColumnStats().GetColumnRecordsCount(i.second.GetColumnIdx());
const std::shared_ptr<NArrow::NAccessor::IChunkedArray> arrOriginal =
deserialize
- ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount()/*, filledRecordsCount*/)
+ ? columnLoader->ApplyVerified(i.second.GetBlobDataVerified(), GetRecordsCount())
: std::make_shared<NArrow::NAccessor::TDeserializeChunkedArray>(GetRecordsCount(), columnLoader, std::move(chunks), true);
if (applyFilter) {
PartialArray->AddColumn(i.first, applyFilter->Apply(arrOriginal));
@@ -122,8 +121,8 @@ public:
AFL_VERIFY(!PartialArray);
HeaderRange = std::nullopt;
PartialArray = NArrow::NAccessor::NSubColumns::TConstructor::BuildPartialReader(blob, ChunkExternalInfo).DetachResult();
-// AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns", PartialArray->GetHeader().GetColumnStats().DebugJson().GetStringRobust())(
-// "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust());
+ // AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns", PartialArray->GetHeader().GetColumnStats().DebugJson().GetStringRobust())(
+ // "others", PartialArray->GetHeader().GetOtherStats().DebugJson().GetStringRobust());
}
void InitPartialReader(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& accessor) {