diff options
author | hor911 <hor911@ydb.tech> | 2022-09-05 12:37:11 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-09-05 12:37:11 +0300 |
commit | 44e5c0e63920f9d9359f45894903c8dc26da542b (patch) | |
tree | cd32d07795b3926fc168f56b9254342c90f5b597 | |
parent | cbdfdc30a59b809a7a578d417c5b19e51afa8d07 (diff) | |
download | ydb-44e5c0e63920f9d9359f45894903c8dc26da542b.tar.gz |
Fix early finish
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 47eae564c3d..6c5e30ae448 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -356,7 +356,7 @@ public: } void Bootstrap() { - LOG_D("TS3WriteActor", "BootStrapped"); + LOG_D("TS3WriteActor", "Bootstrapped"); Become(&TS3WriteActor::StateFunc); } @@ -409,8 +409,11 @@ private: ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef())); } - if (finished) + if (finished) { std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, std::vector<TS3FileWriteActor*>>& item){ item.second.back()->Finish(); }); + Finished = true; + FinishIfNeeded(); + } data.clear(); } @@ -419,6 +422,13 @@ private: Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, NYql::NDqProto::StatusIds::EXTERNAL_ERROR); } + void FinishIfNeeded() { + if (FileWriteActors.empty() && Finished) { + LOG_D("TS3WriteActor", "Finished, notify owner"); + Callbacks->OnAsyncOutputFinished(OutputIndex); + } + } + void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) { if (const auto it = FileWriteActors.find(result->Get()->Key); FileWriteActors.cend() != it) { if (const auto ft = std::find_if(it->second.cbegin(), it->second.cend(), [&](TS3FileWriteActor* actor){ return result->Get()->Url == actor->GetUrl(); }); it->second.cend() != ft) { @@ -428,9 +438,7 @@ private: FileWriteActors.erase(it); } } - - if (FileWriteActors.empty()) - Callbacks->OnAsyncOutputFinished(OutputIndex); + FinishIfNeeded(); } // IActor & IDqComputeActorAsyncOutput @@ -469,6 +477,7 @@ private: const size_t MemoryLimit; const size_t MaxFileSize; const TString Compression; + bool Finished = false; std::unordered_map<TString, std::vector<TS3FileWriteActor*>> FileWriteActors; }; |