aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2022-10-21 12:56:00 +0300
committervvvv <vvvv@ydb.tech>2022-10-21 12:56:00 +0300
commit4045f3143dc5214b8aaa84891d69f4c868d5d092 (patch)
treecfbabe56e8084f4a508f0cb6620417bab5ced11b
parentfc0a02a9671836e4ba8f2e3f7c4f7cdc1948c699 (diff)
downloadydb-4045f3143dc5214b8aaa84891d69f4c868d5d092.tar.gz
fixes for yield and object reuse
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp26
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp47
2 files changed, 47 insertions, 26 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
index f99f451172a..b9bce7f6de9 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_block_func.cpp
@@ -50,23 +50,22 @@ struct TState : public TComputationValue<TState> {
const arrow::compute::ScalarKernel& kernel,
const arrow::compute::FunctionRegistry& registry, const std::vector<arrow::ValueDescr>& argsValuesDescr, TComputationContext& ctx)
: TComputationValue(memInfo)
+ , Options(options)
, ExecContext(&ctx.ArrowMemoryPool, nullptr, const_cast<arrow::compute::FunctionRegistry*>(&registry))
, KernelContext(&ExecContext)
- , Executor(arrow::compute::detail::KernelExecutor::MakeScalar())
{
if (kernel.init) {
State = ARROW_RESULT(kernel.init(&KernelContext, { &kernel, argsValuesDescr, options }));
KernelContext.SetState(State.get());
}
- ARROW_OK(Executor->Init(&KernelContext, { &kernel, argsValuesDescr, options }));
Values.reserve(argsValuesDescr.size());
}
+ const arrow::compute::FunctionOptions* Options;
arrow::compute::ExecContext ExecContext;
arrow::compute::KernelContext KernelContext;
std::unique_ptr<arrow::compute::KernelState> State;
- std::unique_ptr<arrow::compute::detail::KernelExecutor> Executor;
std::vector<arrow::Datum> Values;
};
@@ -99,9 +98,11 @@ public:
Y_VERIFY_DEBUG(ArgsValuesDescr[i] == state.Values.back().descr());
}
- auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
- ARROW_OK(state.Executor->Execute(state.Values, listener.get()));
- auto output = state.Executor->WrapResults(state.Values, listener->values());
+ auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
+ auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
+ ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, state.Options }));
+ ARROW_OK(executor->Execute(state.Values, listener.get()));
+ auto output = executor->WrapResults(state.Values, listener->values());
return ctx.HolderFactory.CreateArrowBlock(std::move(output));
}
@@ -154,6 +155,7 @@ public:
, FunctionRegistry(functionRegistry)
, Function(ResolveFunction(FunctionRegistry, to))
, Kernel(ResolveKernel(Function, ArgsValuesDescr))
+ , CastOptions(false)
{
}
@@ -164,9 +166,11 @@ public:
state.Values.emplace_back(TArrowBlock::From(Arg->GetValue(ctx)).GetDatum());
Y_VERIFY_DEBUG(ArgsValuesDescr[0] == state.Values.back().descr());
- auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
- ARROW_OK(state.Executor->Execute(state.Values, listener.get()));
- auto output = state.Executor->WrapResults(state.Values, listener->values());
+ auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
+ auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
+ ARROW_OK(executor->Init(&state.KernelContext, { &Kernel, ArgsValuesDescr, state.Options }));
+ ARROW_OK(executor->Execute(state.Values, listener.get()));
+ auto output = executor->WrapResults(state.Values, listener->values());
return ctx.HolderFactory.CreateArrowBlock(std::move(output));
}
@@ -189,8 +193,7 @@ private:
TState& GetState(TComputationContext& ctx) const {
auto& result = ctx.MutableValues[StateIndex];
if (!result.HasValue()) {
- arrow::compute::CastOptions options(false);
- result = ctx.HolderFactory.Create<TState>(Function, (const arrow::compute::FunctionOptions*)&options, Kernel, FunctionRegistry, ArgsValuesDescr, ctx);
+ result = ctx.HolderFactory.Create<TState>(Function, (const arrow::compute::FunctionOptions*)&CastOptions, Kernel, FunctionRegistry, ArgsValuesDescr, ctx);
}
return *static_cast<TState*>(result.AsBoxed().Get());
@@ -203,6 +206,7 @@ private:
const arrow::compute::FunctionRegistry& FunctionRegistry;
const arrow::compute::Function& Function;
const arrow::compute::ScalarKernel& Kernel;
+ arrow::compute::CastOptions CastOptions;
};
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
index 9cd28fde5f0..52c414a77f8 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_blocks.cpp
@@ -32,7 +32,7 @@ public:
}
virtual void Add(NUdf::TUnboxedValue& value) = 0;
- virtual NUdf::TUnboxedValuePod Build() = 0;
+ virtual NUdf::TUnboxedValuePod Build(bool finish) = 0;
private:
static int64_t TypeSize(arrow::DataType& itemType) {
@@ -53,33 +53,39 @@ class TFixedSizeBlockBuilder : public TBlockBuilderBase {
public:
TFixedSizeBlockBuilder(TComputationContext& ctx, const std::shared_ptr<arrow::DataType>& itemType)
: TBlockBuilderBase(ctx, itemType)
- , Builder_(&Ctx_.ArrowMemoryPool)
+ , Builder_(std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool))
{
this->Reserve();
}
void Add(NUdf::TUnboxedValue& value) override {
- Y_VERIFY_DEBUG(Builder_.length() < MaxLength_);
+ Y_VERIFY_DEBUG(Builder_->length() < MaxLength_);
if (value) {
- this->Builder_.UnsafeAppend(value.Get<T>());
+ this->Builder_->UnsafeAppend(value.Get<T>());
} else {
- this->Builder_.UnsafeAppendNull();
+ this->Builder_->UnsafeAppendNull();
}
}
- NUdf::TUnboxedValuePod Build() override {
+ NUdf::TUnboxedValuePod Build(bool finish) override {
std::shared_ptr<arrow::ArrayData> result;
- ARROW_OK(this->Builder_.FinishInternal(&result));
+ ARROW_OK(this->Builder_->FinishInternal(&result));
+ Builder_.reset();
+ if (!finish) {
+ Builder_ = std::make_unique<TBuilder>(&Ctx_.ArrowMemoryPool);
+ Reserve();
+ }
+
return this->Ctx_.HolderFactory.CreateArrowBlock(std::move(result));
}
private:
void Reserve() {
- ARROW_OK(this->Builder_.Reserve(MaxLength_));
+ ARROW_OK(this->Builder_->Reserve(MaxLength_));
}
private:
- TBuilder Builder_;
+ std::unique_ptr<TBuilder> Builder_;
};
std::unique_ptr<TBlockBuilderBase> MakeBlockBuilder(TComputationContext& ctx, NUdf::EDataSlot slot) {
@@ -130,7 +136,7 @@ public:
builder->Add(result);
}
- return builder->Build();
+ return builder->Build(true);
}
private:
@@ -160,12 +166,20 @@ public:
NUdf::TUnboxedValue*const* output) const
{
auto& s = GetState(state, ctx);
- size_t rows = 0;
- for (; rows < s.MaxLength_; ++rows) {
+ if (s.IsFinished_) {
+ return EFetchResult::Finish;
+ }
+
+ for (; s.Rows_ < s.MaxLength_; ++s.Rows_) {
if (const auto result = Flow_->FetchValues(ctx, s.ValuePointers_.data()); EFetchResult::One != result) {
- if (rows == 0) {
+ if (EFetchResult::Finish == result) {
+ s.IsFinished_ = true;
+ }
+
+ if (EFetchResult::Yield == result || s.Rows_ == 0) {
return result;
}
+
break;
}
for (size_t j = 0; j < Width_; ++j) {
@@ -177,14 +191,15 @@ public:
for (size_t i = 0; i < Width_; ++i) {
if (auto* out = output[i]; out != nullptr) {
- *out = s.Builders_[i]->Build();
+ *out = s.Builders_[i]->Build(s.IsFinished_);
}
}
if (auto* out = output[Width_]; out != nullptr) {
- *out = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(rows)));
+ *out = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(s.Rows_)));
}
+ s.Rows_ = 0;
return EFetchResult::One;
}
@@ -194,6 +209,8 @@ private:
std::vector<NUdf::TUnboxedValue*> ValuePointers_;
std::vector<std::unique_ptr<TBlockBuilderBase>> Builders_;
size_t MaxLength_;
+ size_t Rows_ = 0;
+ bool IsFinished_ = false;
TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<NUdf::EDataSlot>& slots)
: TComputationValue(memInfo)