diff options
author | bbiff <bbiff@yandex-team.com> | 2022-09-05 17:12:07 +0300 |
---|---|---|
committer | bbiff <bbiff@yandex-team.com> | 2022-09-05 17:12:07 +0300 |
commit | 2a5400d4e3fc4b33e4217f4bff5ccdf452365d64 (patch) | |
tree | c735a1297a4063654324ddc0d156df7c977795ce | |
parent | 23c28cea084e95991be3d100459b8b0eeb939c17 (diff) | |
download | ydb-2a5400d4e3fc4b33e4217f4bff5ccdf452365d64.tar.gz |
cancel downloads on read actor finalizing
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 8 |
1 files changed, 7 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 7174954b93..96f1ea5b84 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -394,8 +394,9 @@ private: YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression."); NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); - while (auto block = stream.read()) + while (auto block = stream.read()) { Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); + } } catch (const std::exception& err) { exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what(); fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST; @@ -541,6 +542,7 @@ public: for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path)); + RetryStuffForFile.push_back(stuff); auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path)); RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); } @@ -603,6 +605,9 @@ private: // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor + for (auto stuff: RetryStuffForFile) { + stuff->Cancel(); + } ContainerCache.Clear(); TActorBootstrapped<TS3StreamReadActor>::PassAway(); } @@ -636,6 +641,7 @@ private: } const IHTTPGateway::TPtr Gateway; + std::vector<TRetryStuff::TPtr> RetryStuffForFile; const THolderFactory& HolderFactory; TPlainContainerCache ContainerCache; |