diff options
5 files changed, 36 insertions, 10 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 0eb406aded3..006c315aae6 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1402,8 +1402,20 @@ protected: } void PollAsyncInput() { + if (!Running) { + CA_LOG_T("Skip polling inputs and sources because not running"); + return; + } + + CA_LOG_T("Poll inputs"); + for (auto& [inputIndex, transform] : InputTransformsMap) { + if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { + ContinueExecute(*resume); + } + } + // Don't produce any input from sources if we're about to save checkpoint. - if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) { + if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) { CA_LOG_T("Skip polling sources because of pending checkpoint"); return; } @@ -1414,13 +1426,6 @@ protected: ContinueExecute(*resume); } } - - CA_LOG_T("Poll inputs"); - for (auto& [inputIndex, transform] : InputTransformsMap) { - if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) { - ContinueExecute(*resume); - } - } } void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 77b53fff73f..23f9cd0b702 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -184,7 +184,7 @@ private: //IDqComputeActorAsyncInput } } finished = IsFinished(); - return 0; + return AwaitingQueue.RowCount(); } TMaybe<google::protobuf::Any> ExtraData() override { diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 1f708c54381..5ea1337e6a5 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -144,6 +144,18 @@ private: return false; } } + for (const auto transformId: InputTransforms) { + const auto t = TaskRunner->GetInputTransform(transformId); + if (t) { + auto [_, transform] = *t; + if (!transform->Empty()) { + return false; + } + if (transform->IsPending()) { + return false; + } + } + } return true; } @@ -443,6 +455,7 @@ private: for (auto i = 0; i != inputs.size(); ++i) { if (auto t = TaskRunner->GetInputTransform(i)) { inputTransforms[i] = *t; + InputTransforms.emplace(i); } } @@ -490,6 +503,7 @@ private: const TTxId TxId; const ui64 TaskId; THashSet<ui32> Inputs; + THashSet<ui32> InputTransforms; THashSet<ui32> Sources; TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner; THashSet<ui32> InputChannelsWithDisabledCheckpoints; diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp index 7d515e5cb31..9f5c1704813 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp @@ -6,6 +6,7 @@ namespace NYql::NDq { class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> { using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>; friend TBaseImpl; + bool Pending = false; public: TDqAsyncInputBufferStats PushStats; TDqInputStats PopStats; @@ -32,7 +33,7 @@ public: } void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override { - Y_ABORT_UNLESS(!batch.empty() || !space); + Pending = space != 0; if (!batch.empty()) { AddBatch(std::move(batch), space); } @@ -41,6 +42,10 @@ public: virtual void Push(TDqSerializedBatch&&, i64) override { YQL_ENSURE(!"Unimplemented"); } + + bool IsPending() const override { + return Pending; + } }; IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer( diff --git a/ydb/library/yql/dq/runtime/dq_async_input.h b/ydb/library/yql/dq/runtime/dq_async_input.h index 939f278709d..ee881a4b6de 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.h +++ b/ydb/library/yql/dq/runtime/dq_async_input.h @@ -23,6 +23,8 @@ public: virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0; virtual void Finish() = 0; + + virtual bool IsPending() const { return false; }; }; IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes, |
