diff options
author | chertus <azuikov@ydb.tech> | 2022-12-30 09:23:56 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2022-12-30 09:23:56 +0300 |
commit | bbb9ce84f6295f0ecdb2f124e861ab18a71aff16 (patch) | |
tree | d16473a718dd21b750ec0a8d4b31de5699cbdf23 | |
parent | 3dd676f1ae386d1a1c5cb831a14fb4aa66d0e334 (diff) | |
download | ydb-bbb9ce84f6295f0ecdb2f124e861ab18a71aff16.tar.gz |
catch exception in SSA
-rw-r--r-- | ydb/core/formats/program.cpp | 8 | ||||
-rw-r--r-- | ydb/core/formats/program.h | 40 | ||||
-rw-r--r-- | ydb/core/formats/ut_program_step.cpp | 14 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__stats_scan.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/indexed_read_data.cpp | 2 |
5 files changed, 38 insertions, 28 deletions
```diff
diff --git a/ydb/core/formats/program.cpp b/ydb/core/formats/program.cpp
index f1a2cea353..b4cadbd9a1 100644
--- a/ydb/core/formats/program.cpp
+++ b/ydb/core/formats/program.cpp
@@ -383,12 +383,8 @@ arrow::Result<arrow::Datum> CallHouseFunctionByAssign(
         TProgramStep::TDatumBatch& batch,
         arrow::compute::ExecContext* ctx)
 {
-    try {
-        return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
-            batch, ctx);
-    } catch (const std::exception& ex) {
-        Y_VERIFY_S(false, ex.what());
-    }
+    return CallFunctionById<true>(assign.GetOperation(), assign.GetArguments(), &assign.GetAggregateOptions(),
+        batch, ctx);
 }
 
 CH::GroupByOptions::Assign GetGroupByAssign(const TAggregateAssign& assign) {
diff --git a/ydb/core/formats/program.h b/ydb/core/formats/program.h
index 590025a4fb..f9cdfa8bfd 100644
--- a/ydb/core/formats/program.h
+++ b/ydb/core/formats/program.h
@@ -204,23 +204,37 @@ struct TProgramStep {
     arrow::Status ApplyProjection(TDatumBatch& batch) const;
 };
 
+struct TProgram {
+    std::vector<std::shared_ptr<TProgramStep>> Steps;
+    THashMap<ui32, TString> SourceColumns;
+
+    TProgram() = default;
+
+    TProgram(std::vector<std::shared_ptr<TProgramStep>>&& steps)
+        : Steps(std::move(steps))
+    {}
+
+    arrow::Status ApplyTo(std::shared_ptr<arrow::RecordBatch>& batch, arrow::compute::ExecContext* ctx) const {
+        try {
+            for (auto& step : Steps) {
+                auto status = step->Apply(batch, ctx);
+                if (!status.ok()) {
+                    return status;
+                }
+            }
+        } catch (const std::exception& ex) {
+            return arrow::Status::Invalid(ex.what());
+        }
+        return arrow::Status::OK();
+    }
+};
+
 inline arrow::Status ApplyProgram(
     std::shared_ptr<arrow::RecordBatch>& batch,
-    const std::vector<std::shared_ptr<TProgramStep>>& program,
+    const TProgram& program,
     arrow::compute::ExecContext* ctx = nullptr)
 {
-    for (auto& step : program) {
-        auto status = step->Apply(batch, ctx);
-        if (!status.ok()) {
-            return status;
-        }
-    }
-    return arrow::Status::OK();
+    return program.ApplyTo(batch, ctx);
 }
 
-struct TProgram {
-    std::vector<std::shared_ptr<TProgramStep>> Steps;
-    THashMap<ui32, TString> SourceColumns;
-};
-
 }
diff --git a/ydb/core/formats/ut_program_step.cpp b/ydb/core/formats/ut_program_step.cpp
index bd71ea1afa..fe9a1ea79f 100644
--- a/ydb/core/formats/ut_program_step.cpp
+++ b/ydb/core/formats/ut_program_step.cpp
@@ -29,7 +29,7 @@ size_t FilterTest(std::vector<std::shared_ptr<arrow::Array>> args, EOperation op
     step->Assignes = {TAssign("res1", op1, {"x", "y"}), TAssign("res2", op2, {"res1", "z"})};
     step->Filters = {"res2"};
     step->Projection = {"res1", "res2"};
-    UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+    UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
     UNIT_ASSERT(batch->ValidateFull().ok());
     UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
     return batch->num_rows();
@@ -46,7 +46,7 @@ size_t FilterTestUnary(std::vector<std::shared_ptr<arrow::Array>> args, EOperati
     step->Assignes = {TAssign("res1", op1, {"x"}), TAssign("res2", op2, {"res1", "z"})};
     step->Filters = {"res2"};
     step->Projection = {"res1", "res2"};
-    UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+    UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
     UNIT_ASSERT(batch->ValidateFull().ok());
     UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
     return batch->num_rows();
@@ -75,7 +75,7 @@ void SumGroupBy(bool nullable, ui32 numKeys = 1) {
         step->GroupByKeys.push_back("y");
     }
 
-    UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+    UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
     UNIT_ASSERT(batch->ValidateFull().ok());
     UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), numKeys + 2);
     UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 2);
@@ -234,7 +234,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
         step->Assignes = {TAssign("y", 56), TAssign("res", EOperation::Add, {"x", "y"})};
         step->Filters = {"filter"};
         step->Projection = {"res", "filter"};
-        UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+        UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
         UNIT_ASSERT(batch->ValidateFull().ok());
         UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
         UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 2);
@@ -250,7 +250,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
 
         auto step = std::make_shared<TProgramStep>();
         step->Projection = {"x"};
-        UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+        UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
         UNIT_ASSERT(batch->ValidateFull().ok());
         UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 1);
         UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 4);
@@ -272,7 +272,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
             TAggregateAssign("min_x", EAggregate::Min, {"x"}),
             TAggregateAssign("max_y", EAggregate::Max, {"y"})
         };
-        UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+        UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
         UNIT_ASSERT(batch->ValidateFull().ok());
         UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
         UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 1);
@@ -296,7 +296,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
             TAggregateAssign("sum_x", EAggregate::Sum, {"x"}),
             TAggregateAssign("sum_y", EAggregate::Sum, {"y"})
         };
-        UNIT_ASSERT(ApplyProgram(batch, {step}, GetCustomExecContext()).ok());
+        UNIT_ASSERT(ApplyProgram(batch, TProgram({step}), GetCustomExecContext()).ok());
         UNIT_ASSERT(batch->ValidateFull().ok());
         UNIT_ASSERT_VALUES_EQUAL(batch->num_columns(), 2);
         UNIT_ASSERT_VALUES_EQUAL(batch->num_rows(), 1);
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index c31df229a7..cee4f525f7 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -66,7 +66,7 @@ public:
         };
 
         if (ReadMetadata->Program) {
-            auto status = ApplyProgram(out.ResultBatch, ReadMetadata->Program->Steps, NArrow::GetCustomExecContext());
+            auto status = ApplyProgram(out.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
             Y_VERIFY_S(status.ok(), status.message());
         }
         return out;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 2511f85c4d..f82cb2c26b 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -526,7 +526,7 @@ TIndexedReadData::MakeResult(TVector<std::vector<std::shared_ptr<arrow::RecordBa
 
     if (ReadMetadata->Program) {
         for (auto& batch : out) {
-            auto status = ApplyProgram(batch.ResultBatch, ReadMetadata->Program->Steps, NArrow::GetCustomExecContext());
+            auto status = ApplyProgram(batch.ResultBatch, *ReadMetadata->Program, NArrow::GetCustomExecContext());
             Y_VERIFY_S(status.ok(), status.message());
         }
     }
```