diff options
author | vvvv <vvvv@ydb.tech> | 2022-10-21 12:56:00 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2022-10-21 12:56:00 +0300 |
commit | 4045f3143dc5214b8aaa84891d69f4c868d5d092 (patch) | |
tree | cfbabe56e8084f4a508f0cb6620417bab5ced11b | |
parent | fc0a02a9671836e4ba8f2e3f7c4f7cdc1948c699 (diff) | |
download | ydb-4045f3143dc5214b8aaa84891d69f4c868d5d092.tar.gz |
fixes for yield and object reuse
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp | 26 | ||||
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp | 47 |
2 files changed, 47 insertions, 26 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp index f99f451172a..b9bce7f6de9 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp @@ -50,23 +50,22 @@ struct TState : public TComputationValue<TState> { const arrow::compute::ScalarKernel& kernel, const arrow::compute::FunctionRegistry& registry, const std::vector<arrow::ValueDescr>& argsValuesDescr, TComputationContext& ctx) : TComputationValue(memInfo) + , Options(options) , ExecContext(&ctx.ArrowMemoryPool, nullptr, const_cast<arrow::compute::FunctionRegistry*>(&registry)) , KernelContext(&ExecContext) - , Executor(arrow::compute::detail::KernelExecutor::MakeScalar()) { if (kernel.init) {
State = ARROW_RESULT(kernel.init(&KernelContext, { &kernel, argsValuesDescr, options }));
KernelContext.SetState(State.get());
}
- ARROW_OK(Executor->Init(&KernelContext, { &kernel, argsValuesDescr, options }));
Values.reserve(argsValuesDescr.size()); } + const arrow::compute::FunctionOptions* Options; arrow::compute::ExecContext ExecContext; arrow::compute::KernelContext KernelContext; std::unique_ptr<arrow::compute::KernelState> State; - std::unique_ptr<arrow::compute::detail::KernelExecutor> Executor; std::vector<arrow::Datum> Values; }; @@ -99,9 +98,11 @@ public: Y_VERIFY_DEBUG(ArgsValuesDescr[i] == state.Values.back().descr()); } - auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
- ARROW_OK(state.Executor->Execute(state.Values, listener.get()));
- auto output = state.Executor->WrapResults(state.Values, listener->values()); + auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>(); + auto executor = arrow::compute::detail::KernelExecutor::MakeScalar(); + ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, state.Options }));
+ ARROW_OK(executor->Execute(state.Values, listener.get()));
+ auto output = executor->WrapResults(state.Values, listener->values()); return ctx.HolderFactory.CreateArrowBlock(std::move(output)); } @@ -154,6 +155,7 @@ public: , FunctionRegistry(functionRegistry) , Function(ResolveFunction(FunctionRegistry, to)) , Kernel(ResolveKernel(Function, ArgsValuesDescr)) + , CastOptions(false) { } @@ -164,9 +166,11 @@ public: state.Values.emplace_back(TArrowBlock::From(Arg->GetValue(ctx)).GetDatum()); Y_VERIFY_DEBUG(ArgsValuesDescr[0] == state.Values.back().descr()); - auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
- ARROW_OK(state.Executor->Execute(state.Values, listener.get()));
- auto output = state.Executor->WrapResults(state.Values, listener->values()); + auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>(); + auto executor = arrow::compute::detail::KernelExecutor::MakeScalar(); + ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, state.Options }));
+ ARROW_OK(executor->Execute(state.Values, listener.get()));
+ auto output = executor->WrapResults(state.Values, listener->values()); return ctx.HolderFactory.CreateArrowBlock(std::move(output)); } @@ -189,8 +193,7 @@ private: TState& GetState(TComputationContext& ctx) const { auto& result = ctx.MutableValues[StateIndex]; if (!result.HasValue()) { - arrow::compute::CastOptions options(false); - result = ctx.HolderFactory.Create<TState>(Function, (const arrow::compute::FunctionOptions*)&options, Kernel, FunctionRegistry, ArgsValuesDescr, ctx); + result = ctx.HolderFactory.Create<TState>(Function, (const arrow::compute::FunctionOptions*)&CastOptions, Kernel, FunctionRegistry, ArgsValuesDescr, ctx); } return *static_cast<TState*>(result.AsBoxed().Get()); @@ -203,6 +206,7 @@ private: const arrow::compute::FunctionRegistry& FunctionRegistry; const arrow::compute::Function& Function; const arrow::compute::ScalarKernel& Kernel; + arrow::compute::CastOptions CastOptions; }; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp index 9cd28fde5f0..52c414a77f8 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp @@ -32,7 +32,7 @@ public: } virtual void Add(NUdf::TUnboxedValue& value) = 0; - virtual NUdf::TUnboxedValuePod Build() = 0; + virtual NUdf::TUnboxedValuePod Build(bool finish) = 0; private: static int64_t TypeSize(arrow::DataType& itemType) { @@ -53,33 +53,39 @@ class TFixedSizeBlockBuilder : public TBlockBuilderBase { public: TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType) : TBlockBuilderBase(ctx, itemType) - , Builder_(&Ctx_.ArrowMemoryPool) + , Builder_(std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool)) { this->Reserve(); } void Add(NUdf::TUnboxedValue& value) override { - Y_VERIFY_DEBUG(Builder_.length() < MaxLength_); + Y_VERIFY_DEBUG(Builder_->length() < MaxLength_); if (value) { - this->Builder_.UnsafeAppend(value.Get<T>()); + 
this->Builder_->UnsafeAppend(value.Get<T>()); } else { - this->Builder_.UnsafeAppendNull(); + this->Builder_->UnsafeAppendNull(); } } - NUdf::TUnboxedValuePod Build() override { + NUdf::TUnboxedValuePod Build(bool finish) override { std::shared_ptr<arrow::ArrayData> result; - ARROW_OK(this->Builder_.FinishInternal(&result)); + ARROW_OK(this->Builder_->FinishInternal(&result)); + Builder_.reset(); + if (!finish) { + Builder_ = std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool); + Reserve(); + } + return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result)); } private: void Reserve() { - ARROW_OK(this->Builder_.Reserve(MaxLength_)); + ARROW_OK(this->Builder_->Reserve(MaxLength_)); } private: - TBuilder Builder_; + std::unique_ptr<TBuilder> Builder_; }; std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, NUdf::EDataSlot slot) { @@ -130,7 +136,7 @@ public: builder->Add(result); } - return builder->Build(); + return builder->Build(true); } private: @@ -160,12 +166,20 @@ public: NUdf::TUnboxedValue*const* output) const { auto& s = GetState(state, ctx); - size_t rows = 0; - for (; rows < s.MaxLength_; ++rows) { + if (s.IsFinished_) { + return EFetchResult::Finish; + } + + for (; s.Rows_ < s.MaxLength_; ++s.Rows_) { if (const auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); EFetchResult::One != result) { - if (rows == 0) { + if (EFetchResult::Finish == result) { + s.IsFinished_ = true; + } + + if (EFetchResult::Yield == result || s.Rows_ == 0) { return result; } + break; } for (size_t j = 0; j < Width_; ++j) { @@ -177,14 +191,15 @@ public: for (size_t i = 0; i < Width_; ++i) { if (auto* out = output[i]; out != nullptr) { - *out = s.Builders_[i]->Build(); + *out = s.Builders_[i]->Build(s.IsFinished_); } } if (auto* out = output[Width_]; out != nullptr) { - *out = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(rows))); + *out = 
ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(s.Rows_))); } + s.Rows_ = 0; return EFetchResult::One; } @@ -194,6 +209,8 @@ private: std::vector<NUdf::TUnboxedValue*> ValuePointers_; std::vector<std::unique_ptr<TBlockBuilderBase>> Builders_; size_t MaxLength_; + size_t Rows_ = 0; + bool IsFinished_ = false; TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<NUdf::EDataSlot>& slots) : TComputationValue(memInfo) |