diff options
author | a-romanov <[email protected]> | 2022-08-08 13:53:03 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-08-08 13:53:03 +0300 |
commit | 30b75c56ba08c4f72729cd8bc15675948f19b66e (patch) | |
tree | a6b32c6c559aa2ee90fe5424492db99f8079204a | |
parent | 2f00a4247cb909db4b10114100645e037fe114dc (diff) |
Parse error on read from S3.
6 files changed, 221 insertions, 76 deletions
diff --git a/contrib/restricted/boost/qvm/LICENSE_1_0.txt b/contrib/restricted/boost/qvm/LICENSE_1_0.txt new file mode 100644 index 00000000000..36b7cd93cdf --- /dev/null +++ b/contrib/restricted/boost/qvm/LICENSE_1_0.txt @@ -0,0 +1,23 @@ +Boost Software License - Version 1.0 - August 17th, 2003 + +Permission is hereby granted, free of charge, to any person or organization +obtaining a copy of the software and accompanying documentation covered by +this license (the "Software") to use, reproduce, display, distribute, +execute, and transmit the Software, and to prepare derivative works of the +Software, and to permit third-parties to whom the Software is furnished to +do so, all subject to the following: + +The copyright notices in the Software and this entire statement, including +the above license grant, this restriction and the following disclaimer, +must be included in all copies of the Software, in whole or in part, and +all derivative works of the Software, unless such copies or derivative +works are solely in the form of machine-executable object code generated by +a source language processor. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT +SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE +FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE, +ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/contrib/restricted/boost/qvm/README.md b/contrib/restricted/boost/qvm/README.md index 1a6d6137ade..0205b27c7d9 100644 --- a/contrib/restricted/boost/qvm/README.md +++ b/contrib/restricted/boost/qvm/README.md @@ -1 +1,33 @@ -Boost QVM is a generic library for working with Quaternions, Vectors and Matrices. For complete documentation, see http://boostorg.github.io/qvm/.
\ No newline at end of file +# QVM + +> A generic C++ library for working with `Q`uaternions, `V`ectors and `M`atrices. + +## Documentation + +https://boostorg.github.io/qvm/ + +## Features + +* Emphasis on 2, 3 and 4-dimensional operations needed in graphics, video games and simulation applications. +* Free function templates operate on any compatible user-defined Quaternion, Vector or Matrix type. +* Enables Quaternion, Vector and Matrix types from different libraries to be safely mixed in the same expression. +* Type-safe mapping between compatible lvalue types with no temporary objects; f.ex. transpose remaps the access to the elements, rather than transforming the matrix. +* Requires only {CPP}03. +* Zero dependencies. + +## Support + +* [cpplang on Slack](https://Cpplang.slack.com) (use the `#boost` channel) +* [Boost Users Mailing List](https://lists.boost.org/mailman/listinfo.cgi/boost-users) +* [Boost Developers Mailing List](https://lists.boost.org/mailman/listinfo.cgi/boost) + +## Distribution + +Besides GitHub, there are two other distribution channels: + +* QVM is included in official [Boost](https://www.boost.org/) releases. +* For maximum portability, the library is also available in single-header format, in two variants (direct download links): + * [qvm.hpp](https://boostorg.github.io/qvm/qvm.hpp): single header containing the complete QVM source, including the complete set of swizzling overloads. + * [qvm_lite.hpp](https://boostorg.github.io/qvm/qvm_lite.hpp): single header containing everything except for the swizzling overloads. + +Copyright 2008-2022 Emil Dotchevski and Reverge Studios, Inc. Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt). diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 5dd2b295d8e..7aec0935635 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -73,6 +73,7 @@ public: TString , THeaders , std::size_t , + TOnDownloadStart , TOnNewDataPart , TOnDownloadFinish ) final { return {}; diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 1b244f983b8..2d1512596de 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -200,12 +200,30 @@ public: using TPtr = std::shared_ptr<TEasyCurlStream>; using TWeakPtr = std::weak_ptr<TEasyCurlStream>; - TEasyCurlStream(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) - : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + TEasyCurlStream( + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, + const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, + TString url, IHTTPGateway::THeaders headers, + size_t offset, + IHTTPGateway::TOnDownloadStart onStart, + IHTTPGateway::TOnNewDataPart onNewData, + IHTTPGateway::TOnDownloadFinish onFinish) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) {} - static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); + static TPtr Make( + const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, + const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, + const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, + TString url, + IHTTPGateway::THeaders headers, + size_t offset, + IHTTPGateway::TOnDownloadStart onStart, + IHTTPGateway::TOnNewDataPart onNewData, + IHTTPGateway::TOnDownloadFinish onFinish) + { + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish)); } enum class EAction : i8 { @@ -242,15 +260,22 @@ private: return OnFinish(TIssues{error}); } - void Done(CURLcode result, long httpResponseCode) final { + void Done(CURLcode result, long) final { if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); Working = false; - return OnFinish(httpResponseCode); + return OnFinish(TIssues()); } size_t Write(void* contents, size_t size, size_t nmemb) final { + if (!StreamStarted) { + StreamStarted = true; + long httpResponseCode = 0L; + curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); + OnStart(httpResponseCode); + } + const auto realsize = size * nmemb; Position += realsize; OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter)); @@ -259,12 +284,15 @@ private: size_t Read(char*, size_t, size_t) final { return 0ULL; } + const IHTTPGateway::TOnDownloadStart OnStart; const IHTTPGateway::TOnNewDataPart OnNewData; const IHTTPGateway::TOnDownloadFinish OnFinish; + const std::shared_ptr<std::atomic_size_t> Counter; bool Working = false; size_t Position = 0ULL; bool Cancelled = false; + bool StreamStarted = false; }; using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>; @@ -540,10 +568,11 @@ private: TString url, THeaders headers, size_t offset, + TOnDownloadStart onStart, TOnNewDataPart onNewData, TOnDownloadFinish onFinish) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); TEasyCurlStream::TWeakPtr weak = stream; diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 2f80cb7d801..8399029aff6 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -94,14 +94,16 @@ public: const std::shared_ptr<std::atomic_size_t> Counter; }; + using TOnDownloadStart = std::function<void(long)>; // http code. using TOnNewDataPart = std::function<void(TCountedContent&&)>; - using TOnDownloadFinish = std::function<void(std::variant<long, TIssues>)>; // http code or issues. + using TOnDownloadFinish = std::function<void(TIssues)>; using TCancelHook = std::function<void(TIssue)>; virtual TCancelHook Download( TString url, THeaders headers, std::size_t offset, + TOnDownloadStart onStart, TOnNewDataPart onNewData, TOnDownloadFinish onFinish) = 0; }; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 1660aa58913..312092e51a7 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -47,6 +47,11 @@ #include <queue> +#ifdef THROW +#undef THROW +#endif +#include <library/cpp/xml/document/xml-document.h> + namespace NYql::NDq { using namespace ::NActors; @@ -60,6 +65,7 @@ struct TEvPrivate { EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE), EvReadResult = EvBegin, + EvReadStarted, EvReadFinished, EvReadError, EvRetry, @@ -82,11 +88,13 @@ struct TEvPrivate { IHTTPGateway::TCountedContent Result; }; - struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> { - explicit TEvReadFinished(long httpResponseCode) : HttpResponseCode(httpResponseCode) {} + struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> { + explicit TEvReadStarted(long httpResponseCode) : HttpResponseCode(httpResponseCode) {} const long HttpResponseCode; }; + struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {}; + struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { TEvReadError(TIssues&& error, size_t pathInd = std::numeric_limits<size_t>::max()) : Error(std::move(error)), PathIndex(pathInd) {} const TIssues Error; @@ -247,6 +255,36 @@ struct TReadSpec { TString Format, Compression; }; +struct TRetryStuff { + using TPtr = std::shared_ptr<TRetryStuff>; + + TRetryStuff( + IHTTPGateway::TPtr gateway, + TString url, + const IHTTPGateway::THeaders& headers, + std::size_t expectedSize + ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState()) + {} + + const IHTTPGateway::TPtr Gateway; + const TString Url; + const IHTTPGateway::THeaders Headers; + const std::size_t ExpectedSize; + + std::size_t Offset = 0U; + const IRetryPolicy<long>::IRetryState::TPtr RetryState; + IHTTPGateway::TCancelHook CancelHook; + TMaybe<TDuration> NextRetryDelay; + + void Cancel() { + NextRetryDelay = {}; + if (const auto cancelHook = std::move(CancelHook)) { + CancelHook = {}; + cancelHook(TIssue("Request cancelled.")); + } + } +}; + class TS3ReadCoroImpl : public TActorCoroImpl { private: class TReadBufferFromStream : public NDB::ReadBuffer { @@ -267,27 +305,44 @@ private: TS3ReadCoroImpl *const Coro; TString Value; }; + + static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv; public: - TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& sourceActorId, const NActors::TActorId& computeActorId, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path) - : TActorCoroImpl(256_KB), InputIndex(inputIndex), ReadSpec(readSpec), SourceActorId(sourceActorId), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path) + TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path) + : TActorCoroImpl(256_KB), InputIndex(inputIndex), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path) {} bool Next(TString& value) { - if (HttpResponseCode) + if (InputFinished) return false; - const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); + const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); switch (const auto etype = ev->GetTypeRewrite()) { + case TEvPrivate::TEvReadStarted::EventType: + ErrorText.clear(); + Issues.Clear(); + value.clear(); + RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode); + return true; case TEvPrivate::TEvReadFinished::EventType: - HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode; + InputFinished = true; return false; case TEvPrivate::TEvReadError::EventType: - HttpResponseCode = 0L; - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); + InputFinished = true; + Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error); return false; case TEvPrivate::TEvDataPart::EventType: - value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + if (200L == HttpResponseCode || 206L == HttpResponseCode) { + value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract(); + RetryStuff->Offset += value.size(); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); + } else if (HttpResponseCode && !RetryStuff->NextRetryDelay) { + if (ErrorText.size() < 256_KB) + ErrorText.append(ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract()); + else if (!ErrorText.EndsWith(TruncatedSuffix)) + ErrorText.append(TruncatedSuffix); + value.clear(); + } return true; default: return false; @@ -295,17 +350,16 @@ public: } private: void WaitFinish() { - if (HttpResponseCode) + if (InputFinished) return; const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>(); + InputFinished = true; switch (const auto etype = ev->GetTypeRewrite()) { case TEvPrivate::TEvReadFinished::EventType: - HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode; break; case TEvPrivate::TEvReadError::EventType: - HttpResponseCode = 0L; - Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true)); + Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error); break; default: break; @@ -319,14 +373,36 @@ private: NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings)); while (auto block = stream.read()) - Send(SourceActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); + Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex)); WaitFinish(); - if (*HttpResponseCode) - Send(SourceActorId, new TEvPrivate::TEvReadFinished(*HttpResponseCode)); + if (!ErrorText.empty()) { + TStringBuilder str; + + if (HttpResponseCode) + str << "HTTP response code: " << HttpResponseCode << ", "; + + if (ErrorText.StartsWith("<?xml") && !ErrorText.EndsWith(TruncatedSuffix)) { + const NXml::TDocument xml(ErrorText, NXml::TDocument::String); + if (const auto& root = xml.Root(); root.Name() == "Error") { + const auto& code = root.Node("Code", true).Value<TString>(); + const auto& message = root.Node("Message", true).Value<TString>(); + str << message << ", error code: " << code; + } else + str << ErrorText; + } else + str << ErrorText; + + Issues.AddIssues({TIssue(str)}); + } + + if (Issues) + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(Issues), true)); + else + Send(ParentActorId, new TEvPrivate::TEvReadFinished); } catch (const TDtorException&) { - return; + return RetryStuff->Cancel(); } catch (const std::exception& err) { Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what())}, true)); return; @@ -337,72 +413,53 @@ private: } private: const ui64 InputIndex; + const TRetryStuff::TPtr RetryStuff; const TReadSpec::TPtr ReadSpec; const TString Format, RowType, Compression; - const NActors::TActorId SourceActorId; const NActors::TActorId ComputeActorId; const size_t PathIndex; const TString Path; - std::optional<long> HttpResponseCode; + + bool InputFinished = false; + long HttpResponseCode = 0L; + TString ErrorText; + TIssues Issues; }; class TS3ReadCoroActor : public TActorCoro { - struct TRetryStuff { - using TPtr = std::shared_ptr<TRetryStuff>; - - TRetryStuff( - IHTTPGateway::TPtr gateway, - TString url, - const IHTTPGateway::THeaders& headers, - std::size_t expectedSize - ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState()) - {} - - const IHTTPGateway::TPtr Gateway; - const TString Url; - const IHTTPGateway::THeaders Headers; - const std::size_t ExpectedSize; - - std::size_t Offset = 0U; - const IRetryPolicy<long>::IRetryState::TPtr RetryState; - }; public: - TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, - IHTTPGateway::TPtr gateway, - const TString& url, - const IHTTPGateway::THeaders& headers, - const TString& path, - const std::size_t expectedSize) + TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff) : TActorCoro(THolder<TActorCoroImpl>(impl.Release())) - , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, expectedSize)) + , RetryStuff(std::move(retryStuff)) {} private: - static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) { - retryStuff->Offset += data.size(); + static void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode))); + } + + static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) { actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data)))); } - static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::variant<long, TIssues> result) { - switch (result.index()) { - case 0U: - if (const auto httpCode = std::get<long>(result); const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(httpCode)) - actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); - else - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(httpCode))); - break; - case 1U: - if (const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(0L)) - actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); - else - actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result))))); - break; + static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, TIssues issues) { + if (issues) { + if (retryStuff->NextRetryDelay = retryStuff->RetryState->GetNextRetryDelay(0L); !retryStuff->NextRetryDelay) { + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::move(issues)))); + return; + } } + + if (retryStuff->NextRetryDelay) + actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent)))); + else + actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished)); } static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) { - retryStuff->Gateway->Download(retryStuff->Url, + retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url, retryStuff->Headers, retryStuff->Offset, - std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1), + std::bind(&TS3ReadCoroActor::OnDownloadStart, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1), + std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1), std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1)); } @@ -448,8 +505,9 @@ public: Become(&TS3StreamReadActor::StateFunc); for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) { const TPath& path = Paths[pathInd]; - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, SelfId(), ComputeActorId, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path)); - RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path)).Release()); + auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path)); + auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path)); + RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release()); } } |