diff options
| author | Vasily Gerasimov <[email protected]> | 2022-07-07 15:54:06 +0300 | 
|---|---|---|
| committer | Vasily Gerasimov <[email protected]> | 2022-07-07 15:54:06 +0300 | 
| commit | 03f024c4412e3aa613bb543cf1660176320ba8f4 (patch) | |
| tree | 79ab92258e8f03e748d34adc956147fcb5241f00 | |
| parent | 738d38c64668549cf891563217279af0881109fd (diff) | |
YQ-1209 Wait for async output to finish before finishing compute actor
Wait for async output to finish before finishing compute actor
ref:d4dcdffaba799eabfb53790a89f2d6bef688c974
7 files changed, 86 insertions, 8 deletions
| diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 37033e92510..d934f437f41 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -110,6 +110,9 @@ struct IDqComputeActorAsyncOutput {          // Checkpointing          virtual void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; +        // Finishing +        virtual void OnAsyncOutputFinished(ui64 outputIndex) = 0; // Signal that async output has successfully written its finish flag and so compute actor is ready to finish. +          virtual ~ICallbacks() = default;      }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5cd9474552d..4fa67082d1c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -63,8 +63,13 @@ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {          OnSinkStateSaved(std::move(state), outputIndex, checkpoint);      } +    void OnAsyncOutputFinished(ui64 outputIndex) override final { +        OnSinkFinished(outputIndex); +    } +      virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;      virtual void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; +    virtual void OnSinkFinished(ui64 outputIndex) = 0;  };  struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks { @@ -76,8 +81,13 @@ struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks          OnTransformStateSaved(std::move(state), outputIndex, checkpoint);      } +    void OnAsyncOutputFinished(ui64 outputIndex) override final { +        OnTransformFinished(outputIndex); +    } +      virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;      virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0; +    virtual void OnTransformFinished(ui64 outputIndex) = 0;  };  namespace NDetails { @@ -440,9 +450,9 @@ protected:                      CA_LOG_D("Continue execution, not all input channels are initialized");                      return;                  } -                if (Channels->CheckInFlight("Tasks execution finished")) { +                if (Channels->CheckInFlight("Tasks execution finished") && AllAsyncOutputsFinished()) {                      State = NDqProto::COMPUTE_STATE_FINISHED; -                    CA_LOG_D("Compute state finished. All channels finished"); +                    CA_LOG_D("Compute state finished. All channels and sinks finished");                      ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::SUCCESS, {TIssue("success")});                  }              } @@ -626,7 +636,7 @@ public:              Channels->SendChannelDataAck(channel->GetChannelId(), channel->GetFreeSpace());          } -        ResumeExecution(); +        ContinueExecute();      }      void PeerFinished(ui64 channelId) override { @@ -664,6 +674,16 @@ public:          Checkpoints->OnTransformStateSaved(std::move(state), outputIndex, checkpoint);      } +    void OnSinkFinished(ui64 outputIndex) override { +        SinksMap.at(outputIndex).FinishIsAcknowledged = true; +        ContinueExecute(); +    } + +    void OnTransformFinished(ui64 outputIndex) override { +        OutputTransformsMap.at(outputIndex).FinishIsAcknowledged = true; +        ContinueExecute(); +    } +  protected:      bool ReadyToCheckpoint() const override {          for (auto& [id, channelInfo] : InputChannelsMap) { @@ -870,6 +890,7 @@ protected:          IDqComputeActorAsyncOutput* AsyncOutput = nullptr;          NActors::IActor* Actor = nullptr;          bool Finished = false; // If sink/transform is in finished state, it receives only checkpoints. +        bool FinishIsAcknowledged = false; // Async output has acknowledged its finish.          TIssuesBuffer IssuesBuffer;          bool PopStarted = false;          i64 FreeSpaceBeforeSend = 0; @@ -1523,6 +1544,24 @@ protected:          InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);      } +    bool AllAsyncOutputsFinished() const { +        for (const auto& [outputIndex, sinkInfo] : SinksMap) { +            if (!sinkInfo.FinishIsAcknowledged) { +                ui64 index = outputIndex; // Crutch for logging through lambda. +                CA_LOG_D("Waiting finish of sink[" << index << "]"); +                return false; +            } +        } +        for (const auto& [outputIndex, transformInfo] : OutputTransformsMap) { +            if (!transformInfo.FinishIsAcknowledged) { +                ui64 index = outputIndex; // Crutch for logging through lambda. +                CA_LOG_D("Waiting finish of transform[" << index << "]"); +                return false; +            } +        } +        return true; +    } +      virtual ui64 CalcMkqlMemoryLimit() {          auto& opts = Task.GetProgram().GetSettings();          return opts.GetHasMapJoin()/* || opts.GetHasSort()*/ diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index b58ab1ce3f4..2ce1acd7aa5 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -106,6 +106,10 @@ class TFakeActor : public NActors::TActor<TFakeActor> {              Parent.AsyncOutputPromises.StateSaved = NThreading::NewPromise<NDqProto::TSinkState>();          }; +        void OnAsyncOutputFinished(ui64 outputIndex) override { +            Y_UNUSED(outputIndex); +        } +          TFakeActor& Parent;      }; diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 3f4534036ea..41756240c6b 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -680,6 +680,10 @@ private:          SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString()));      } +    void OnAsyncOutputFinished(ui64 outputIndex) override { +        Y_UNUSED(outputIndex); +    } +      void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {          Y_UNUSED(state);          Y_UNUSED(outputIndex); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index ea5236e9b95..a51fcd240bf 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -117,9 +117,12 @@ public:          const TMaybe<NDqProto::TCheckpoint>& checkpoint,          bool finished) override      { -        Y_UNUSED(finished);          Y_UNUSED(dataSize); +        if (finished) { +            Finished = true; +        } +          CreateSessionIfNotExists();          for (const NUdf::TUnboxedValue& item : batch) { @@ -282,6 +285,7 @@ private:                  ShouldNotifyNewFreeSpace = false;              }          } +        CheckFinished();          return !events.empty();      } @@ -363,6 +367,12 @@ private:          TDqPqWriteActor& Self;      }; +    void CheckFinished() { +        if (Finished && Buffer.empty() && WaitingAcks.empty()) { +            Callbacks->OnAsyncOutputFinished(OutputIndex); +        } +    } +  private:      const ui64 OutputIndex;      const TTxId TxId; @@ -372,6 +382,7 @@ private:      IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;      const TString LogPrefix;      i64 FreeSpace = 0; +    bool Finished = false;      NYdb::NPersQueue::TPersQueueClient PersQueueClient;      std::shared_ptr<NYdb::NPersQueue::IWriteSession> WriteSession; @@ -380,7 +391,6 @@ private:      ui64 ConfirmedSeqNo = 0;      std::optional<NYdb::NPersQueue::TContinuationToken> ContinuationToken;      NThreading::TFuture<void> EventFuture; -    std::queue<i64> InflightMessageSizes;      bool ShouldNotifyNewFreeSpace = false;      std::queue<TString> Buffer;      std::queue<i64> WaitingAcks; // Size of items which are waiting for acks (used to update free space) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index c026e8280c8..2aedf58d892 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -252,7 +252,11 @@ private:          }      } -    void HandleFinished() {} +    void HandleFinished() { +        if (InputFinished && !InFlight && Parts.empty()) { +            Callbacks->OnAsyncOutputFinished(OutputIndex); +        } +    }      // IActor & IDqComputeActorAsyncOutput      void PassAway() override { // Is called from Compute Actor @@ -317,4 +321,3 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(  }  } // namespace NYql::NDq - diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index 9030e75ceda..f2a24eaab07 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -154,12 +154,16 @@ public:          NKikimr::NMiniKQL::TUnboxedValueVector&& batch,          i64,          const TMaybe<NDqProto::TCheckpoint>& checkpoint, -        bool) override +        bool finished) override      {          SINK_LOG_D("Got " << batch.size() << " items to send. Checkpoint: " << checkpoint.Defined()                     << ". Send queue: " << SendingBuffer.size() << ". Inflight: " << InflightBuffer.size()                     << ". Checkpoint in progress: " << CheckpointInProgress.has_value()); +        if (finished) { +            Finished = true; +        } +          ui64 metricsCount = 0;          for (const auto& item : batch) {              if (metricsCount + WriteParams.Shard.GetScheme().GetSensors().size() > MaxMetricsPerRequest) { @@ -182,6 +186,8 @@ public:          if (FreeSpace <= 0) {              ShouldNotifyNewFreeSpace = true;          } + +        CheckFinished();      };      void LoadState(const NDqProto::TSinkState&) override { } @@ -445,6 +451,8 @@ private:          if (CheckpointInProgress && InflightBuffer.empty()) {              DoCheckpoint();          } + +        CheckFinished();      }      void DoCheckpoint() { @@ -452,6 +460,12 @@ private:          CheckpointInProgress = std::nullopt;      } +    void CheckFinished() { +        if (Finished && InflightBuffer.empty() && SendingBuffer.empty()) { +            Callbacks->OnAsyncOutputFinished(OutputIndex); +        } +    } +  private:      const ui64 OutputIndex;      const TTxId TxId; @@ -462,6 +476,7 @@ private:      TDqSolomonWriteActorMetrics Metrics;      i64 FreeSpace = 0;      TActorId HttpProxyId; +    bool Finished = false;      TString SourceId;      bool ShouldNotifyNewFreeSpace = false; | 
