aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-09-05 17:12:07 +0300
committerbbiff <bbiff@yandex-team.com>2022-09-05 17:12:07 +0300
commit2a5400d4e3fc4b33e4217f4bff5ccdf452365d64 (patch)
treec735a1297a4063654324ddc0d156df7c977795ce
parent23c28cea084e95991be3d100459b8b0eeb939c17 (diff)
downloadydb-2a5400d4e3fc4b33e4217f4bff5ccdf452365d64.tar.gz
cancel downloads on read actor finalizing
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp8
1 files changed, 7 insertions, 1 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 7174954b93..96f1ea5b84 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -394,8 +394,9 @@ private:
YQL_ENSURE(ReadSpec->Compression.empty() == !decompress, "Unsupported " << ReadSpec->Compression << " compression.");
NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings));
- while (auto block = stream.read())
+ while (auto block = stream.read()) {
Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex));
+ }
} catch (const std::exception& err) {
exceptIssue.Message = TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what();
fatalCode = NYql::NDqProto::StatusIds::BAD_REQUEST;
@@ -541,6 +542,7 @@ public:
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path));
+ RetryStuffForFile.push_back(stuff);
auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path));
RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release());
}
@@ -603,6 +605,9 @@ private:
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
+ for (auto stuff: RetryStuffForFile) {
+ stuff->Cancel();
+ }
ContainerCache.Clear();
TActorBootstrapped<TS3StreamReadActor>::PassAway();
}
@@ -636,6 +641,7 @@ private:
}
const IHTTPGateway::TPtr Gateway;
+ std::vector<TRetryStuff::TPtr> RetryStuffForFile;
const THolderFactory& HolderFactory;
TPlainContainerCache ContainerCache;