aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2022-12-30 09:23:56 +0300
committerchertus <azuikov@ydb.tech>2022-12-30 09:23:56 +0300
commitbbb9ce84f6295f0ecdb2f124e861ab18a71aff16 (patch)
treed16473a718dd21b750ec0a8d4b31de5699cbdf23
parent3dd676f1ae386d1a1c5cb831a14fb4aa66d0e334 (diff)
downloadydb-bbb9ce84f6295f0ecdb2f124e861ab18a71aff16.tar.gz
catch exception in SSA
-rw-r--r--ydb/core/formats/program.cpp8
-rw-r--r--ydb/core/formats/program.h40
-rw-r--r--ydb/core/formats/ut_program_step.cpp14
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h2
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp2
5 files changed, 38 insertions, 28 deletions
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index f1a2cea353..b4cadbd9a1 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -383,12 +383,8 @@ arrow::Result<arrow::Datum> CallHouseFunctionByAssign(
TProgramStep::TDatumBatch& batch,
arrow::compute::ExecContext* ctx)
{
- try {
- return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
- batch, ctx);
- } catch (const std::exception& ex) {
- Y_VERIFY_S(false, ex.what());
- }
+ return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
+ batch, ctx);
}
CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) {
diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h
index 590025a4fb..f9cdfa8bfd 100644
--- a/ydb/core/formats/program.h
+++ b/ydb/core/formats/program.h
@@ -204,23 +204,37 @@ struct TProgramStep {
arrow::Status ApplyProjection(TDatumBatch& batch) const;
};
+struct TProgram {
+ std::vector<std::shared_ptr<TProgramStep>> Steps;
+ THashMap<ui32, TString> SourceColumns;
+
+ TProgram() = default;
+
+ TProgram(std::vector<std::shared_ptr<TProgramStep>>&& steps)
+ : Steps(std::move(steps))
+ {}
+
+ arrow::Status ApplyTo(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const {
+ try {
+ for (auto& step : Steps) {
+ auto status = step->Apply(batch, ctx);
+ if (!status.ok()) {
+ return status;
+ }
+ }
+ } catch (const std::exception& ex) {
+ return arrow::Status::Invalid(ex.what());
+ }
+ return arrow::Status::OK();
+ }
+};
+
inline arrow::Status ApplyProgram(
std::shared_ptr<arrow::RecordBatch>& batch,
- const std::vector<std::shared_ptr<TProgramStep>>& program,
+ const TProgram& program,
arrow::compute::ExecContext* ctx = nullptr)
{
- for (auto& step : program) {
- auto status = step->Apply(batch, ctx);
- if (!status.ok()) {
- return status;
- }
- }
- return arrow::Status::OK();
+ return program.ApplyTo(batch, ctx);
}
-struct TProgram {
- std::vector<std::shared_ptr<TProgramStep>> Steps;
- THashMap<ui32, TString> SourceColumns;
-};
-
}
diff --git a/ydb/core/formats/ut_program_step.cpp b/ydb/core/formats/ut_program_step.cpp
index bd71ea1afa..fe9a1ea79f 100644
--- a/ydb/core/formats/ut_program_step.cpp
+++ b/ydb/core/formats/ut_program_step.cpp
@@ -29,7 +29,7 @@ size_t FilterTest(std::vector<std::shared_ptr<arrow::Array>> args, EOperation op
step->Assignes = {TAssign("res1", op1, {"x", "y"}), TAssign("res2", op2, {"res1", "z"})};
step->Filters = {"res2"};
step->Projection = {"res1", "res2"};
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
return batch->num_rows();
@@ -46,7 +46,7 @@ size_t FilterTestUnary(std::vector<std::shared_ptr<arrow::Array>> args, EOperati
step->Assignes = {TAssign("res1", op1, {"x"}), TAssign("res2", op2, {"res1", "z"})};
step->Filters = {"res2"};
step->Projection = {"res1", "res2"};
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
return batch->num_rows();
@@ -75,7 +75,7 @@ void SumGroupBy(bool nullable, ui32 numKeys = 1) {
step->GroupByKeys.push_back("y");
}
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), numKeys + 2);
UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 2);
@@ -234,7 +234,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
step->Assignes = {TAssign("y", 56), TAssign("res", EOperation::Add, {"x", "y"})};
step->Filters = {"filter"};
step->Projection = {"res", "filter"};
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 2);
@@ -250,7 +250,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
auto step = std::make_shared<TProgramStep>();
step->Projection = {"x"};
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 1);
UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 4);
@@ -272,7 +272,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
TAggregateAssign("min_x", EAggregate::Min, {"x"}),
TAggregateAssign("max_y", EAggregate::Max, {"y"})
};
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 1);
@@ -296,7 +296,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
TAggregateAssign("sum_x", EAggregate::Sum, {"x"}),
TAggregateAssign("sum_y", EAggregate::Sum, {"y"})
};
- UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+ UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
UNIT_ASSERT(batch->ValidateFull().ok());
UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 1);
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index c31df229a7..cee4f525f7 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -66,7 +66,7 @@ public:
};
if (ReadMetadata->Program) {
- auto status = ApplyProgram(out.ResultBatch, ReadMetadata->Program->Steps, NArrow::GetCustomExecContext());
+ auto status = ApplyProgram(out.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
Y_VERIFY_S(status.ok(), status.message());
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 2511f85c4d..f82cb2c26b 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -526,7 +526,7 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa
if (ReadMetadata->Program) {
for (auto& batch : out) {
- auto status = ApplyProgram(batch.ResultBatch, ReadMetadata->Program->Steps, NArrow::GetCustomExecContext());
+ auto status = ApplyProgram(batch.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
Y_VERIFY_S(status.ok(), status.message());
}
}