aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVasily Gerasimov <UgnineSirdis@gmail.com>2022-07-07 15:54:06 +0300
committerVasily Gerasimov <UgnineSirdis@gmail.com>2022-07-07 15:54:06 +0300
commit03f024c4412e3aa613bb543cf1660176320ba8f4 (patch)
tree79ab92258e8f03e748d34adc956147fcb5241f00
parent738d38c64668549cf891563217279af0881109fd (diff)
downloadydb-03f024c4412e3aa613bb543cf1660176320ba8f4.tar.gz
YQ-1209 Wait for async output to finish before finishing compute actor
Wait for async output to finish before finishing compute actor ref:d4dcdffaba799eabfb53790a89f2d6bef688c974
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h3
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h45
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h4
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp4
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp14
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp7
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp17
7 files changed, 86 insertions, 8 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 37033e9251..d934f437f4 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -110,6 +110,9 @@ struct IDqComputeActorAsyncOutput {
// Checkpointing
virtual void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
+ // Finishing
+ virtual void OnAsyncOutputFinished(ui64 outputIndex) = 0; // Signal that async output has successfully written its finish flag and so compute actor is ready to finish.
+
virtual ~ICallbacks() = default;
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 5cd9474552..4fa67082d1 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -63,8 +63,13 @@ struct TSinkCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
OnSinkStateSaved(std::move(state), outputIndex, checkpoint);
}
+ void OnAsyncOutputFinished(ui64 outputIndex) override final {
+ OnSinkFinished(outputIndex);
+ }
+
virtual void OnSinkError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
virtual void OnSinkStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
+ virtual void OnSinkFinished(ui64 outputIndex) = 0;
};
struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks {
@@ -76,8 +81,13 @@ struct TOutputTransformCallbacks : public IDqComputeActorAsyncOutput::ICallbacks
OnTransformStateSaved(std::move(state), outputIndex, checkpoint);
}
+ void OnAsyncOutputFinished(ui64 outputIndex) override final {
+ OnTransformFinished(outputIndex);
+ }
+
virtual void OnOutputTransformError(ui64 outputIndex, const TIssues& issues, bool isFatal) = 0;
virtual void OnTransformStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) = 0;
+ virtual void OnTransformFinished(ui64 outputIndex) = 0;
};
namespace NDetails {
@@ -440,9 +450,9 @@ protected:
CA_LOG_D("Continue execution, not all input channels are initialized");
return;
}
- if (Channels->CheckInFlight("Tasks execution finished")) {
+ if (Channels->CheckInFlight("Tasks execution finished") && AllAsyncOutputsFinished()) {
State = NDqProto::COMPUTE_STATE_FINISHED;
- CA_LOG_D("Compute state finished. All channels finished");
+ CA_LOG_D("Compute state finished. All channels and sinks finished");
ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::SUCCESS, {TIssue("success")});
}
}
@@ -626,7 +636,7 @@ public:
Channels->SendChannelDataAck(channel->GetChannelId(), channel->GetFreeSpace());
}
- ResumeExecution();
+ ContinueExecute();
}
void PeerFinished(ui64 channelId) override {
@@ -664,6 +674,16 @@ public:
Checkpoints->OnTransformStateSaved(std::move(state), outputIndex, checkpoint);
}
+ void OnSinkFinished(ui64 outputIndex) override {
+ SinksMap.at(outputIndex).FinishIsAcknowledged = true;
+ ContinueExecute();
+ }
+
+ void OnTransformFinished(ui64 outputIndex) override {
+ OutputTransformsMap.at(outputIndex).FinishIsAcknowledged = true;
+ ContinueExecute();
+ }
+
protected:
bool ReadyToCheckpoint() const override {
for (auto& [id, channelInfo] : InputChannelsMap) {
@@ -870,6 +890,7 @@ protected:
IDqComputeActorAsyncOutput* AsyncOutput = nullptr;
NActors::IActor* Actor = nullptr;
bool Finished = false; // If sink/transform is in finished state, it receives only checkpoints.
+ bool FinishIsAcknowledged = false; // Async output has acknowledged its finish.
TIssuesBuffer IssuesBuffer;
bool PopStarted = false;
i64 FreeSpaceBeforeSend = 0;
@@ -1523,6 +1544,24 @@ protected:
InternalError(NYql::NDqProto::StatusIds::EXTERNAL_ERROR, issues);
}
+ bool AllAsyncOutputsFinished() const {
+ for (const auto& [outputIndex, sinkInfo] : SinksMap) {
+ if (!sinkInfo.FinishIsAcknowledged) {
+ ui64 index = outputIndex; // Crutch for logging through lambda.
+ CA_LOG_D("Waiting finish of sink[" << index << "]");
+ return false;
+ }
+ }
+ for (const auto& [outputIndex, transformInfo] : OutputTransformsMap) {
+ if (!transformInfo.FinishIsAcknowledged) {
+ ui64 index = outputIndex; // Crutch for logging through lambda.
+ CA_LOG_D("Waiting finish of transform[" << index << "]");
+ return false;
+ }
+ }
+ return true;
+ }
+
virtual ui64 CalcMkqlMemoryLimit() {
auto& opts = Task.GetProgram().GetSettings();
return opts.GetHasMapJoin()/* || opts.GetHasSort()*/
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index b58ab1ce3f..2ce1acd7aa 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -106,6 +106,10 @@ class TFakeActor : public NActors::TActor<TFakeActor> {
Parent.AsyncOutputPromises.StateSaved = NThreading::NewPromise<NDqProto::TSinkState>();
};
+ void OnAsyncOutputFinished(ui64 outputIndex) override {
+ Y_UNUSED(outputIndex);
+ }
+
TFakeActor& Parent;
};
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 3f4534036e..41756240c6 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -680,6 +680,10 @@ private:
SendFailure(MakeHolder<TEvDqFailure>(isFatal ? NYql::NDqProto::StatusIds::UNSPECIFIED : NYql::NDqProto::StatusIds::INTERNAL_ERROR, issues.ToString()));
}
+ void OnAsyncOutputFinished(ui64 outputIndex) override {
+ Y_UNUSED(outputIndex);
+ }
+
void OnAsyncOutputStateSaved(NDqProto::TSinkState&& state, ui64 outputIndex, const NDqProto::TCheckpoint& checkpoint) override {
Y_UNUSED(state);
Y_UNUSED(outputIndex);
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index ea5236e9b9..a51fcd240b 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -117,9 +117,12 @@ public:
const TMaybe<NDqProto::TCheckpoint>& checkpoint,
bool finished) override
{
- Y_UNUSED(finished);
Y_UNUSED(dataSize);
+ if (finished) {
+ Finished = true;
+ }
+
CreateSessionIfNotExists();
for (const NUdf::TUnboxedValue& item : batch) {
@@ -282,6 +285,7 @@ private:
ShouldNotifyNewFreeSpace = false;
}
}
+ CheckFinished();
return !events.empty();
}
@@ -363,6 +367,12 @@ private:
TDqPqWriteActor& Self;
};
+ void CheckFinished() {
+ if (Finished && Buffer.empty() && WaitingAcks.empty()) {
+ Callbacks->OnAsyncOutputFinished(OutputIndex);
+ }
+ }
+
private:
const ui64 OutputIndex;
const TTxId TxId;
@@ -372,6 +382,7 @@ private:
IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
const TString LogPrefix;
i64 FreeSpace = 0;
+ bool Finished = false;
NYdb::NPersQueue::TPersQueueClient PersQueueClient;
std::shared_ptr<NYdb::NPersQueue::IWriteSession> WriteSession;
@@ -380,7 +391,6 @@ private:
ui64 ConfirmedSeqNo = 0;
std::optional<NYdb::NPersQueue::TContinuationToken> ContinuationToken;
NThreading::TFuture<void> EventFuture;
- std::queue<i64> InflightMessageSizes;
bool ShouldNotifyNewFreeSpace = false;
std::queue<TString> Buffer;
std::queue<i64> WaitingAcks; // Size of items which are waiting for acks (used to update free space)
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index c026e8280c..2aedf58d89 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -252,7 +252,11 @@ private:
}
}
- void HandleFinished() {}
+ void HandleFinished() {
+ if (InputFinished && !InFlight && Parts.empty()) {
+ Callbacks->OnAsyncOutputFinished(OutputIndex);
+ }
+ }
// IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
@@ -317,4 +321,3 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
}
} // namespace NYql::NDq
-
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index 9030e75ced..f2a24eaab0 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -154,12 +154,16 @@ public:
NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
i64,
const TMaybe<NDqProto::TCheckpoint>& checkpoint,
- bool) override
+ bool finished) override
{
SINK_LOG_D("Got " << batch.size() << " items to send. Checkpoint: " << checkpoint.Defined()
<< ". Send queue: " << SendingBuffer.size() << ". Inflight: " << InflightBuffer.size()
<< ". Checkpoint in progress: " << CheckpointInProgress.has_value());
+ if (finished) {
+ Finished = true;
+ }
+
ui64 metricsCount = 0;
for (const auto& item : batch) {
if (metricsCount + WriteParams.Shard.GetScheme().GetSensors().size() > MaxMetricsPerRequest) {
@@ -182,6 +186,8 @@ public:
if (FreeSpace <= 0) {
ShouldNotifyNewFreeSpace = true;
}
+
+ CheckFinished();
};
void LoadState(const NDqProto::TSinkState&) override { }
@@ -445,6 +451,8 @@ private:
if (CheckpointInProgress && InflightBuffer.empty()) {
DoCheckpoint();
}
+
+ CheckFinished();
}
void DoCheckpoint() {
@@ -452,6 +460,12 @@ private:
CheckpointInProgress = std::nullopt;
}
+ void CheckFinished() {
+ if (Finished && InflightBuffer.empty() && SendingBuffer.empty()) {
+ Callbacks->OnAsyncOutputFinished(OutputIndex);
+ }
+ }
+
private:
const ui64 OutputIndex;
const TTxId TxId;
@@ -462,6 +476,7 @@ private:
TDqSolomonWriteActorMetrics Metrics;
i64 FreeSpace = 0;
TActorId HttpProxyId;
+ bool Finished = false;
TString SourceId;
bool ShouldNotifyNewFreeSpace = false;