diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-31 12:50:47 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-01-31 12:50:47 +0300 |
commit | bfc4889dce7f30e1bf2fef4fb6d9d079c1cbb105 (patch) | |
tree | 3012896f5bcb4a611e030a7a2a817a7693b6a04f | |
parent | f81ed92a57f366855191aed00ea789816f5a6c69 (diff) | |
download | ydb-bfc4889dce7f30e1bf2fef4fb6d9d079c1cbb105.tar.gz |
IsFull method optimization for shuffle consumer
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.cpp | 31 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_consumer.h | 13 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 19 |
3 files changed, 59 insertions, 4 deletions
```diff
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index 7f182c3e426..da9e8315c0b 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -81,6 +81,25 @@ private:
 };
 
 class TDqOutputHashPartitionConsumer : public IDqOutputConsumer {
+private:
+    mutable bool IsWaitingFlag = false;
+    mutable TUnboxedValue WaitingValue;
+    mutable IDqOutput::TPtr OutputWaiting;
+protected:
+    void DrainWaiting() const {
+        if (Y_UNLIKELY(IsWaitingFlag)) {
+            if (OutputWaiting->IsFull()) {
+                return;
+            }
+            OutputWaiting->Push(std::move(WaitingValue));
+            IsWaitingFlag = false;
+        }
+    }
+
+    virtual bool DoTryFinish() override {
+        DrainWaiting();
+        return !IsWaitingFlag;
+    }
 public:
     TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs,
         TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
@@ -96,12 +115,20 @@ public:
     }
 
     bool IsFull() const override {
-        return AnyOf(Outputs, [](const auto& output) { return output->IsFull(); });
+        DrainWaiting();
+        return IsWaitingFlag;
     }
 
     void Consume(TUnboxedValue&& value) final {
         ui32 partitionIndex = GetHashPartitionIndex(value);
-        Outputs[partitionIndex]->Push(std::move(value));
+        if (Outputs[partitionIndex]->IsFull()) {
+            YQL_ENSURE(!IsWaitingFlag);
+            IsWaitingFlag = true;
+            OutputWaiting = Outputs[partitionIndex];
+            WaitingValue = std::move(value);
+        } else {
+            Outputs[partitionIndex]->Push(std::move(value));
+        }
     }
 
     void Consume(NDqProto::TCheckpoint&& checkpoint) override {
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index 6aee7c6df0a..cdfcec0eee3 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -12,16 +12,29 @@ namespace NYql::NDq {
 
 class IDqOutputConsumer : public TSimpleRefCount<IDqOutputConsumer>, public NKikimr::NMiniKQL::TWithDefaultMiniKQLAlloc {
+private:
+    bool IsFinishingFlag = false;
 public:
     using TPtr = TIntrusivePtr<IDqOutputConsumer>;
 
 public:
     virtual ~IDqOutputConsumer() = default;
 
+    bool TryFinish() {
+        IsFinishingFlag = true;
+        return DoTryFinish();
+    }
     virtual bool IsFull() const = 0;
     virtual void Consume(NKikimr::NUdf::TUnboxedValue&& value) = 0;
     virtual void Consume(NDqProto::TCheckpoint&& checkpoint) = 0;
     virtual void Finish() = 0;
+    bool IsFinishing() const {
+        return IsFinishingFlag;
+    }
+protected:
+    virtual bool DoTryFinish() {
+        return true;
+    }
 };
 
 IDqOutputConsumer::TPtr CreateOutputMultiConsumer(TVector<IDqOutputConsumer::TPtr>&& consumers);
 
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index a97aa628693..79bb5cc96c2 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -827,6 +827,11 @@ private:
         return Context.Alloc ? *Context.Alloc : *SelfAlloc;
     }
 
+    void FinishImpl() {
+        LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
+        Output->Finish();
+    }
+
     ERunStatus FetchAndDispatch() {
         if (!Output) {
             LOG("no consumers, Finish execution");
@@ -847,6 +852,14 @@ private:
         };
 
         auto guard = BindAllocator();
+        if (Output->IsFinishing()) {
+            if (Output->TryFinish()) {
+                FinishImpl();
+                return ERunStatus::Finished;
+            } else {
+                return ERunStatus::PendingOutput;
+            }
+        }
         while (!Output->IsFull()) {
             if (Y_UNLIKELY(CollectProfileStats)) {
                 auto now = TInstant::Now();
@@ -863,8 +876,10 @@ private:
                     break;
                 }
                 case NUdf::EFetchStatus::Finish: {
-                    LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
-                    Output->Finish();
+                    if (!Output->TryFinish()) {
+                        break;
+                    }
+                    FinishImpl();
                     return ERunStatus::Finished;
                 }
                 case NUdf::EFetchStatus::Yield: {
```