diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-07-07 15:54:06 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-07-07 15:54:06 +0300 |
commit | 03f024c4412e3aa613bb543cf1660176320ba8f4 (patch) | |
tree | 79ab92258e8f03e748d34adc956147fcb5241f00 | |
parent | 738d38c64668549cf891563217279af0881109fd (diff) | |
download | ydb-03f024c4412e3aa613bb543cf1660176320ba8f4.tar.gz |
YQ-1209 Wait for async output to finish before finishing compute actor
Wait for async output to finish before finishing compute actor
ref:d4dcdffaba799eabfb53790a89f2d6bef688c974
7 files changed, 86 insertions, 8 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 37033e9251..d934f437f4 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -110,6 +110,9 @@ struct IDqComputeActorAsyncOutput { // Checkpointing virtual void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; + // Finishing + virtual void OnAsyncOutputFinished(ui64 outputIndex) = 0; // Signal that async output has successfully written its finish flag and so compute actor is ready to finish. + virtual ~ICallbacks() = default; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5cd9474552..4fa67082d1 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -63,8 +63,13 @@ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { OnSinkStateSaved(std::move(state), outputIndex, checkpoint); } + void OnAsyncOutputFinished(ui64 outputIndex) override final { + OnSinkFinished(outputIndex); + } + virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; virtual void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; + virtual void OnSinkFinished(ui64 outputIndex) = 0; }; struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { @@ -76,8 +81,13 @@ struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks OnTransformStateSaved(std::move(state), outputIndex, checkpoint); } + void OnAsyncOutputFinished(ui64 outputIndex) override final { + OnTransformFinished(outputIndex); + } + virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0; virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; + virtual void OnTransformFinished(ui64 outputIndex) = 0; }; namespace NDetails { @@ -440,9 +450,9 @@ protected: CA_LOG_D("Continue execution, not all input channels are initialized"); return; } - if (Channels->CheckInFlight("Tasks execution finished")) { + if (Channels->CheckInFlight("Tasks execution finished") && AllAsyncOutputsFinished()) { State = NDqProto::COMPUTE_STATE_FINISHED; - CA_LOG_D("Compute state finished. All channels finished"); + CA_LOG_D("Compute state finished. All channels and sinks finished"); ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::SUCCESS, {TIssue("success")}); } } @@ -626,7 +636,7 @@ public: Channels->SendChannelDataAck(channel->GetChannelId(), channel->GetFreeSpace()); } - ResumeExecution(); + ContinueExecute(); } void PeerFinished(ui64 channelId) override { @@ -664,6 +674,16 @@ public: Checkpoints->OnTransformStateSaved(std::move(state), outputIndex, checkpoint); } + void OnSinkFinished(ui64 outputIndex) override { + SinksMap.at(outputIndex).FinishIsAcknowledged = true; + ContinueExecute(); + } + + void OnTransformFinished(ui64 outputIndex) override { + OutputTransformsMap.at(outputIndex).FinishIsAcknowledged = true; + ContinueExecute(); + } + protected: bool ReadyToCheckpoint() const override { for (auto& [id, channelInfo] : InputChannelsMap) { @@ -870,6 +890,7 @@ protected: IDqComputeActorAsyncOutput* AsyncOutput = nullptr; NActors::IActor* Actor = nullptr; bool Finished = false; // If sink/transform is in finished state, it receives only checkpoints. + bool FinishIsAcknowledged = false; // Async output has acknowledged its finish. TIssuesBuffer IssuesBuffer; bool PopStarted = false; i64 FreeSpaceBeforeSend = 0; @@ -1523,6 +1544,24 @@ protected: InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues); } + bool AllAsyncOutputsFinished() const { + for (const auto& [outputIndex, sinkInfo] : SinksMap) { + if (!sinkInfo.FinishIsAcknowledged) { + ui64 index = outputIndex; // Crutch for logging through lambda. + CA_LOG_D("Waiting finish of sink[" << index << "]"); + return false; + } + } + for (const auto& [outputIndex, transformInfo] : OutputTransformsMap) { + if (!transformInfo.FinishIsAcknowledged) { + ui64 index = outputIndex; // Crutch for logging through lambda. + CA_LOG_D("Waiting finish of transform[" << index << "]"); + return false; + } + } + return true; + } + virtual ui64 CalcMkqlMemoryLimit() { auto& opts = Task.GetProgram().GetSettings(); return opts.GetHasMapJoin()/* || opts.GetHasSort()*/ diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index b58ab1ce3f..2ce1acd7aa 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -106,6 +106,10 @@ class TFakeActor : public NActors::TActor<TFakeActor> { Parent.AsyncOutputPromises.StateSaved = NThreading::NewPromise<NDqProto::TSinkState>(); }; + void OnAsyncOutputFinished(ui64 outputIndex) override { + Y_UNUSED(outputIndex); + } + TFakeActor& Parent; }; diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 3f4534036e..41756240c6 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -680,6 +680,10 @@ private: SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString())); } + void OnAsyncOutputFinished(ui64 outputIndex) override { + Y_UNUSED(outputIndex); + } + void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override { Y_UNUSED(state); Y_UNUSED(outputIndex); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index ea5236e9b9..a51fcd240b 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -117,9 +117,12 @@ public: const TMaybe<NDqProto::TCheckpoint>& checkpoint, bool finished) override { - Y_UNUSED(finished); Y_UNUSED(dataSize); + if (finished) { + Finished = true; + } + CreateSessionIfNotExists(); for (const NUdf::TUnboxedValue& item : batch) { @@ -282,6 +285,7 @@ private: ShouldNotifyNewFreeSpace = false; } } + CheckFinished(); return !events.empty(); } @@ -363,6 +367,12 @@ private: TDqPqWriteActor& Self; }; + void CheckFinished() { + if (Finished && Buffer.empty() && WaitingAcks.empty()) { + Callbacks->OnAsyncOutputFinished(OutputIndex); + } + } + private: const ui64 OutputIndex; const TTxId TxId; @@ -372,6 +382,7 @@ private: IDqComputeActorAsyncOutput::ICallbacks* const Callbacks; const TString LogPrefix; i64 FreeSpace = 0; + bool Finished = false; NYdb::NPersQueue::TPersQueueClient PersQueueClient; std::shared_ptr<NYdb::NPersQueue::IWriteSession> WriteSession; @@ -380,7 +391,6 @@ private: ui64 ConfirmedSeqNo = 0; std::optional<NYdb::NPersQueue::TContinuationToken> ContinuationToken; NThreading::TFuture<void> EventFuture; - std::queue<i64> InflightMessageSizes; bool ShouldNotifyNewFreeSpace = false; std::queue<TString> Buffer; std::queue<i64> WaitingAcks; // Size of items which are waiting for acks (used to update free space) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index c026e8280c..2aedf58d89 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -252,7 +252,11 @@ private: } } - void HandleFinished() {} + void HandleFinished() { + if (InputFinished && !InFlight && Parts.empty()) { + Callbacks->OnAsyncOutputFinished(OutputIndex); + } + } // IActor & IDqComputeActorAsyncOutput void PassAway() override { // Is called from Compute Actor @@ -317,4 +321,3 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( } } // namespace NYql::NDq - diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 9030e75ced..f2a24eaab0 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -154,12 +154,16 @@ public: NKikimr::NMiniKQL::TUnboxedValueVector&& batch, i64, const TMaybe<NDqProto::TCheckpoint>& checkpoint, - bool) override + bool finished) override { SINK_LOG_D("Got " << batch.size() << " items to send. Checkpoint: " << checkpoint.Defined() << ". Send queue: " << SendingBuffer.size() << ". Inflight: " << InflightBuffer.size() << ". Checkpoint in progress: " << CheckpointInProgress.has_value()); + if (finished) { + Finished = true; + } + ui64 metricsCount = 0; for (const auto& item : batch) { if (metricsCount + WriteParams.Shard.GetScheme().GetSensors().size() > MaxMetricsPerRequest) { @@ -182,6 +186,8 @@ public: if (FreeSpace <= 0) { ShouldNotifyNewFreeSpace = true; } + + CheckFinished(); }; void LoadState(const NDqProto::TSinkState&) override { } @@ -445,6 +451,8 @@ private: if (CheckpointInProgress && InflightBuffer.empty()) { DoCheckpoint(); } + + CheckFinished(); } void DoCheckpoint() { @@ -452,6 +460,12 @@ private: CheckpointInProgress = std::nullopt; } + void CheckFinished() { + if (Finished && InflightBuffer.empty() && SendingBuffer.empty()) { + Callbacks->OnAsyncOutputFinished(OutputIndex); + } + } + private: const ui64 OutputIndex; const TTxId TxId; @@ -462,6 +476,7 @@ private: TDqSolomonWriteActorMetrics Metrics; i64 FreeSpace = 0; TActorId HttpProxyId; + bool Finished = false; TString SourceId; bool ShouldNotifyNewFreeSpace = false; |