about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-01-31 12:50:47 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-01-31 12:50:47 +0300
commit bfc4889dce7f30e1bf2fef4fb6d9d079c1cbb105 (patch)
tree 3012896f5bcb4a611e030a7a2a817a7693b6a04f
parent f81ed92a57f366855191aed00ea789816f5a6c69 (diff)
download ydb-bfc4889dce7f30e1bf2fef4fb6d9d079c1cbb105.tar.gz
IsFull method optimization for shuffle consumer
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_consumer.cpp 31
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_consumer.h 13
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.cpp 19
3 files changed, 59 insertions, 4 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
index 7f182c3e426..da9e8315c0b 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.cpp
@@ -81,6 +81,25 @@ private:
};
class TDqOutputHashPartitionConsumer : public IDqOutputConsumer {
+private:
+ mutable bool IsWaitingFlag = false;
+ mutable TUnboxedValue WaitingValue;
+ mutable IDqOutput::TPtr OutputWaiting;
+protected:
+ void DrainWaiting() const {
+ if (Y_UNLIKELY(IsWaitingFlag)) {
+ if (OutputWaiting->IsFull()) {
+ return;
+ }
+ OutputWaiting->Push(std::move(WaitingValue));
+ IsWaitingFlag = false;
+ }
+ }
+
+ virtual bool DoTryFinish() override {
+ DrainWaiting();
+ return !IsWaitingFlag;
+ }
public:
TDqOutputHashPartitionConsumer(TVector<IDqOutput::TPtr>&& outputs,
TVector<NKikimr::NMiniKQL::TType*>&& keyColumnTypes, TVector<ui32>&& keyColumnIndices)
@@ -96,12 +115,20 @@ public:
}
bool IsFull() const override {
- return AnyOf(Outputs, [](const auto& output) { return output->IsFull(); });
+ DrainWaiting();
+ return IsWaitingFlag;
}
void Consume(TUnboxedValue&& value) final {
ui32 partitionIndex = GetHashPartitionIndex(value);
- Outputs[partitionIndex]->Push(std::move(value));
+ if (Outputs[partitionIndex]->IsFull()) {
+ YQL_ENSURE(!IsWaitingFlag);
+ IsWaitingFlag = true;
+ OutputWaiting = Outputs[partitionIndex];
+ WaitingValue = std::move(value);
+ } else {
+ Outputs[partitionIndex]->Push(std::move(value));
+ }
}
void Consume(NDqProto::TCheckpoint&& checkpoint) override {
diff --git a/ydb/library/yql/dq/runtime/dq_output_consumer.h b/ydb/library/yql/dq/runtime/dq_output_consumer.h
index 6aee7c6df0a..cdfcec0eee3 100644
--- a/ydb/library/yql/dq/runtime/dq_output_consumer.h
+++ b/ydb/library/yql/dq/runtime/dq_output_consumer.h
@@ -12,16 +12,29 @@ namespace NYql::NDq {
class IDqOutputConsumer : public TSimpleRefCount<IDqOutputConsumer>,
public NKikimr::NMiniKQL::TWithDefaultMiniKQLAlloc {
+private:
+ bool IsFinishingFlag = false;
public:
using TPtr = TIntrusivePtr<IDqOutputConsumer>;
public:
virtual ~IDqOutputConsumer() = default;
+ bool TryFinish() {
+ IsFinishingFlag = true;
+ return DoTryFinish();
+ }
virtual bool IsFull() const = 0;
virtual void Consume(NKikimr::NUdf::TUnboxedValue&& value) = 0;
virtual void Consume(NDqProto::TCheckpoint&& checkpoint) = 0;
virtual void Finish() = 0;
+ bool IsFinishing() const {
+ return IsFinishingFlag;
+ }
+protected:
+ virtual bool DoTryFinish() {
+ return true;
+ }
};
IDqOutputConsumer::TPtr CreateOutputMultiConsumer(TVector<IDqOutputConsumer::TPtr>&& consumers);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index a97aa628693..79bb5cc96c2 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -827,6 +827,11 @@ private:
return Context.Alloc ? *Context.Alloc : *SelfAlloc;
}
+ void FinishImpl() {
+ LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
+ Output->Finish();
+ }
+
ERunStatus FetchAndDispatch() {
if (!Output) {
LOG("no consumers, Finish execution");
@@ -847,6 +852,14 @@ private:
};
auto guard = BindAllocator();
+ if (Output->IsFinishing()) {
+ if (Output->TryFinish()) {
+ FinishImpl();
+ return ERunStatus::Finished;
+ } else {
+ return ERunStatus::PendingOutput;
+ }
+ }
while (!Output->IsFull()) {
if (Y_UNLIKELY(CollectProfileStats)) {
auto now = TInstant::Now();
@@ -863,8 +876,10 @@ private:
break;
}
case NUdf::EFetchStatus::Finish: {
- LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
- Output->Finish();
+ if (!Output->TryFinish()) {
+ break;
+ }
+ FinishImpl();
return ERunStatus::Finished;
}
case NUdf::EFetchStatus::Yield: {