aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-09-05 12:37:11 +0300
committerhor911 <hor911@ydb.tech>2022-09-05 12:37:11 +0300
commit44e5c0e63920f9d9359f45894903c8dc26da542b (patch)
treecd32d07795b3926fc168f56b9254342c90f5b597
parentcbdfdc30a59b809a7a578d417c5b19e51afa8d07 (diff)
downloadydb-44e5c0e63920f9d9359f45894903c8dc26da542b.tar.gz
Fix early finish
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp19
1 files changed, 14 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 47eae564c3d..6c5e30ae448 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -356,7 +356,7 @@ public:
}
void Bootstrap() {
- LOG_D("TS3WriteActor", "BootStrapped");
+ LOG_D("TS3WriteActor", "Bootstrapped");
Become(&TS3WriteActor::StateFunc);
}
@@ -409,8 +409,11 @@ private:
ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
}
- if (finished)
+ if (finished) {
std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, std::vector<TS3FileWriteActor*>>& item){ item.second.back()->Finish(); });
+ Finished = true;
+ FinishIfNeeded();
+ }
data.clear();
}
@@ -419,6 +422,13 @@ private:
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR);
}
+ void FinishIfNeeded() {
+ if (FileWriteActors.empty() && Finished) {
+ LOG_D("TS3WriteActor", "Finished, notify owner");
+ Callbacks->OnAsyncOutputFinished(OutputIndex);
+ }
+ }
+
void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) {
if (const auto it = FileWriteActors.find(result->Get()->Key); FileWriteActors.cend() != it) {
if (const auto ft = std::find_if(it->second.cbegin(), it->second.cend(), [&](TS3FileWriteActor* actor){ return result->Get()->Url == actor->GetUrl(); }); it->second.cend() != ft) {
@@ -428,9 +438,7 @@ private:
FileWriteActors.erase(it);
}
}
-
- if (FileWriteActors.empty())
- Callbacks->OnAsyncOutputFinished(OutputIndex);
+ FinishIfNeeded();
}
// IActor & IDqComputeActorAsyncOutput
@@ -469,6 +477,7 @@ private:
const size_t MemoryLimit;
const size_t MaxFileSize;
const TString Compression;
+ bool Finished = false;
std::unordered_map<TString, std::vector<TS3FileWriteActor*>> FileWriteActors;
};