summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h21
-rw-r--r--ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp2
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp14
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_input.cpp7
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_input.h2
5 files changed, 36 insertions, 10 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 0eb406aded3..006c315aae6 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1402,8 +1402,20 @@ protected:
}
void PollAsyncInput() {
+ if (!Running) {
+ CA_LOG_T("Skip polling inputs and sources because not running");
+ return;
+ }
+
+ CA_LOG_T("Poll inputs");
+ for (auto& [inputIndex, transform] : InputTransformsMap) {
+ if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
+ ContinueExecute(*resume);
+ }
+ }
+
// Don't produce any input from sources if we're about to save checkpoint.
- if (!Running || (Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
+ if ((Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved())) {
CA_LOG_T("Skip polling sources because of pending checkpoint");
return;
}
@@ -1414,13 +1426,6 @@ protected:
ContinueExecute(*resume);
}
}
-
- CA_LOG_T("Poll inputs");
- for (auto& [inputIndex, transform] : InputTransformsMap) {
- if (auto resume = transform.PollAsyncInput(MetricsReporter, WatermarksTracker, RuntimeSettings.AsyncInputPushLimit)) {
- ContinueExecute(*resume);
- }
- }
}
void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) {
diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
index 77b53fff73f..23f9cd0b702 100644
--- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
+++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp
@@ -184,7 +184,7 @@ private: //IDqComputeActorAsyncInput
}
}
finished = IsFinished();
- return 0;
+ return AwaitingQueue.RowCount();
}
TMaybe<google::protobuf::Any> ExtraData() override {
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 1f708c54381..5ea1337e6a5 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -144,6 +144,18 @@ private:
return false;
}
}
+ for (const auto transformId: InputTransforms) {
+ const auto t = TaskRunner->GetInputTransform(transformId);
+ if (t) {
+ auto [_, transform] = *t;
+ if (!transform->Empty()) {
+ return false;
+ }
+ if (transform->IsPending()) {
+ return false;
+ }
+ }
+ }
return true;
}
@@ -443,6 +455,7 @@ private:
for (auto i = 0; i != inputs.size(); ++i) {
if (auto t = TaskRunner->GetInputTransform(i)) {
inputTransforms[i] = *t;
+ InputTransforms.emplace(i);
}
}
@@ -490,6 +503,7 @@ private:
const TTxId TxId;
const ui64 TaskId;
THashSet<ui32> Inputs;
+ THashSet<ui32> InputTransforms;
THashSet<ui32> Sources;
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
THashSet<ui32> InputChannelsWithDisabledCheckpoints;
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp
index 7d515e5cb31..9f5c1704813 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp
@@ -6,6 +6,7 @@ namespace NYql::NDq {
class TDqAsyncInputBuffer : public TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer> {
using TBaseImpl = TDqInputImpl<TDqAsyncInputBuffer, IDqAsyncInputBuffer>;
friend TBaseImpl;
+ bool Pending = false;
public:
TDqAsyncInputBufferStats PushStats;
TDqInputStats PopStats;
@@ -32,7 +33,7 @@ public:
}
void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
- Y_ABORT_UNLESS(!batch.empty() || !space);
+ Pending = space != 0;
if (!batch.empty()) {
AddBatch(std::move(batch), space);
}
@@ -41,6 +42,10 @@ public:
virtual void Push(TDqSerializedBatch&&, i64) override {
YQL_ENSURE(!"Unimplemented");
}
+
+ bool IsPending() const override {
+ return Pending;
+ }
};
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.h b/ydb/library/yql/dq/runtime/dq_async_input.h
index 939f278709d..ee881a4b6de 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.h
+++ b/ydb/library/yql/dq/runtime/dq_async_input.h
@@ -23,6 +23,8 @@ public:
virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;
virtual void Finish() = 0;
+
+ virtual bool IsPending() const { return false; };
};
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(ui64 inputIndex, const TString& type, NKikimr::NMiniKQL::TType* inputType, ui64 maxBufferBytes,