summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-08-08 13:53:03 +0300
committera-romanov <[email protected]>2022-08-08 13:53:03 +0300
commit30b75c56ba08c4f72729cd8bc15675948f19b66e (patch)
treea6b32c6c559aa2ee90fe5424492db99f8079204a
parent2f00a4247cb909db4b10114100645e037fe114dc (diff)
Parse error on read from S3.
-rw-r--r--contrib/restricted/boost/qvm/LICENSE_1_0.txt23
-rw-r--r--contrib/restricted/boost/qvm/README.md34
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp1
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp43
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h4
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp192
6 files changed, 221 insertions, 76 deletions
diff --git a/contrib/restricted/boost/qvm/LICENSE_1_0.txt b/contrib/restricted/boost/qvm/LICENSE_1_0.txt
new file mode 100644
index 00000000000..36b7cd93cdf
--- /dev/null
+++ b/contrib/restricted/boost/qvm/LICENSE_1_0.txt
@@ -0,0 +1,23 @@
+Boost Software License - Version 1.0 - August 17th, 2003
+
+Permission is hereby granted, free of charge, to any person or organization
+obtaining a copy of the software and accompanying documentation covered by
+this license (the "Software") to use, reproduce, display, distribute,
+execute, and transmit the Software, and to prepare derivative works of the
+Software, and to permit third-parties to whom the Software is furnished to
+do so, all subject to the following:
+
+The copyright notices in the Software and this entire statement, including
+the above license grant, this restriction and the following disclaimer,
+must be included in all copies of the Software, in whole or in part, and
+all derivative works of the Software, unless such copies or derivative
+works are solely in the form of machine-executable object code generated by
+a source language processor.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
+SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
+FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
+ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+DEALINGS IN THE SOFTWARE.
diff --git a/contrib/restricted/boost/qvm/README.md b/contrib/restricted/boost/qvm/README.md
index 1a6d6137ade..0205b27c7d9 100644
--- a/contrib/restricted/boost/qvm/README.md
+++ b/contrib/restricted/boost/qvm/README.md
@@ -1 +1,33 @@
-Boost QVM is a generic library for working with Quaternions, Vectors and Matrices. For complete documentation, see http://boostorg.github.io/qvm/. \ No newline at end of file
+# QVM
+
+> A generic C++ library for working with `Q`uaternions, `V`ectors and `M`atrices.
+
+## Documentation
+
+https://boostorg.github.io/qvm/
+
+## Features
+
+* Emphasis on 2, 3 and 4-dimensional operations needed in graphics, video games and simulation applications.
+* Free function templates operate on any compatible user-defined Quaternion, Vector or Matrix type.
+* Enables Quaternion, Vector and Matrix types from different libraries to be safely mixed in the same expression.
+* Type-safe mapping between compatible lvalue types with no temporary objects; f.ex. transpose remaps the access to the elements, rather than transforming the matrix.
+* Requires only {CPP}03.
+* Zero dependencies.
+
+## Support
+
+* [cpplang on Slack](https://Cpplang.slack.com) (use the `#boost` channel)
+* [Boost Users Mailing List](https://lists.boost.org/mailman/listinfo.cgi/boost-users)
+* [Boost Developers Mailing List](https://lists.boost.org/mailman/listinfo.cgi/boost)
+
+## Distribution
+
+Besides GitHub, there are two other distribution channels:
+
+* QVM is included in official [Boost](https://www.boost.org/) releases.
+* For maximum portability, the library is also available in single-header format, in two variants (direct download links):
+ * [qvm.hpp](https://boostorg.github.io/qvm/qvm.hpp): single header containing the complete QVM source, including the complete set of swizzling overloads.
+ * [qvm_lite.hpp](https://boostorg.github.io/qvm/qvm_lite.hpp): single header containing everything except for the swizzling overloads.
+
+Copyright 2008-2022 Emil Dotchevski and Reverge Studios, Inc. Distributed under the [Boost Software License, Version 1.0](http://www.boost.org/LICENSE_1_0.txt).
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 5dd2b295d8e..7aec0935635 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -73,6 +73,7 @@ public:
TString ,
THeaders ,
std::size_t ,
+ TOnDownloadStart ,
TOnNewDataPart ,
TOnDownloadFinish ) final {
return {};
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 1b244f983b8..2d1512596de 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -200,12 +200,30 @@ public:
using TPtr = std::shared_ptr<TEasyCurlStream>;
using TWeakPtr = std::weak_ptr<TEasyCurlStream>;
- TEasyCurlStream(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
- : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ TEasyCurlStream(
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
+ TString url, IHTTPGateway::THeaders headers,
+ size_t offset,
+ IHTTPGateway::TOnDownloadStart onStart,
+ IHTTPGateway::TOnNewDataPart onNewData,
+ IHTTPGateway::TOnDownloadFinish onFinish)
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
{}
- static TPtr Make(const ::NMonitoring::TDynamicCounters::TCounterPtr& counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
- return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
+ static TPtr Make(
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& counter,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes,
+ const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes,
+ TString url,
+ IHTTPGateway::THeaders headers,
+ size_t offset,
+ IHTTPGateway::TOnDownloadStart onStart,
+ IHTTPGateway::TOnNewDataPart onNewData,
+ IHTTPGateway::TOnDownloadFinish onFinish)
+ {
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish));
}
enum class EAction : i8 {
@@ -242,15 +260,22 @@ private:
return OnFinish(TIssues{error});
}
- void Done(CURLcode result, long httpResponseCode) final {
+ void Done(CURLcode result, long) final {
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
Working = false;
- return OnFinish(httpResponseCode);
+ return OnFinish(TIssues());
}
size_t Write(void* contents, size_t size, size_t nmemb) final {
+ if (!StreamStarted) {
+ StreamStarted = true;
+ long httpResponseCode = 0L;
+ curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
+ OnStart(httpResponseCode);
+ }
+
const auto realsize = size * nmemb;
Position += realsize;
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter));
@@ -259,12 +284,15 @@ private:
size_t Read(char*, size_t, size_t) final { return 0ULL; }
+ const IHTTPGateway::TOnDownloadStart OnStart;
const IHTTPGateway::TOnNewDataPart OnNewData;
const IHTTPGateway::TOnDownloadFinish OnFinish;
+
const std::shared_ptr<std::atomic_size_t> Counter;
bool Working = false;
size_t Position = 0ULL;
bool Cancelled = false;
+ bool StreamStarted = false;
};
using TKeyType = std::tuple<TString, size_t, IHTTPGateway::THeaders, TString, IRetryPolicy<long>::TPtr>;
@@ -540,10 +568,11 @@ private:
TString url,
THeaders headers,
size_t offset,
+ TOnDownloadStart onStart,
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) final
{
- auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
+ auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onStart), std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
const auto handle = stream->GetHandle();
TEasyCurlStream::TWeakPtr weak = stream;
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 2f80cb7d801..8399029aff6 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -94,14 +94,16 @@ public:
const std::shared_ptr<std::atomic_size_t> Counter;
};
+ using TOnDownloadStart = std::function<void(long)>; // http code.
using TOnNewDataPart = std::function<void(TCountedContent&&)>;
- using TOnDownloadFinish = std::function<void(std::variant<long, TIssues>)>; // http code or issues.
+ using TOnDownloadFinish = std::function<void(TIssues)>;
using TCancelHook = std::function<void(TIssue)>;
virtual TCancelHook Download(
TString url,
THeaders headers,
std::size_t offset,
+ TOnDownloadStart onStart,
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) = 0;
};
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 1660aa58913..312092e51a7 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -47,6 +47,11 @@
#include <queue>
+#ifdef THROW
+#undef THROW
+#endif
+#include <library/cpp/xml/document/xml-document.h>
+
namespace NYql::NDq {
using namespace ::NActors;
@@ -60,6 +65,7 @@ struct TEvPrivate {
EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE),
EvReadResult = EvBegin,
+ EvReadStarted,
EvReadFinished,
EvReadError,
EvRetry,
@@ -82,11 +88,13 @@ struct TEvPrivate {
IHTTPGateway::TCountedContent Result;
};
- struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
- explicit TEvReadFinished(long httpResponseCode) : HttpResponseCode(httpResponseCode) {}
+ struct TEvReadStarted : public TEventLocal<TEvReadStarted, EvReadStarted> {
+ explicit TEvReadStarted(long httpResponseCode) : HttpResponseCode(httpResponseCode) {}
const long HttpResponseCode;
};
+ struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {};
+
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
TEvReadError(TIssues&& error, size_t pathInd = std::numeric_limits<size_t>::max()) : Error(std::move(error)), PathIndex(pathInd) {}
const TIssues Error;
@@ -247,6 +255,36 @@ struct TReadSpec {
TString Format, Compression;
};
+struct TRetryStuff {
+ using TPtr = std::shared_ptr<TRetryStuff>;
+
+ TRetryStuff(
+ IHTTPGateway::TPtr gateway,
+ TString url,
+ const IHTTPGateway::THeaders& headers,
+ std::size_t expectedSize
+ ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState())
+ {}
+
+ const IHTTPGateway::TPtr Gateway;
+ const TString Url;
+ const IHTTPGateway::THeaders Headers;
+ const std::size_t ExpectedSize;
+
+ std::size_t Offset = 0U;
+ const IRetryPolicy<long>::IRetryState::TPtr RetryState;
+ IHTTPGateway::TCancelHook CancelHook;
+ TMaybe<TDuration> NextRetryDelay;
+
+ void Cancel() {
+ NextRetryDelay = {};
+ if (const auto cancelHook = std::move(CancelHook)) {
+ CancelHook = {};
+ cancelHook(TIssue("Request cancelled."));
+ }
+ }
+};
+
class TS3ReadCoroImpl : public TActorCoroImpl {
private:
class TReadBufferFromStream : public NDB::ReadBuffer {
@@ -267,27 +305,44 @@ private:
TS3ReadCoroImpl *const Coro;
TString Value;
};
+
+ static constexpr std::string_view TruncatedSuffix = "... [truncated]"sv;
public:
- TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& sourceActorId, const NActors::TActorId& computeActorId, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path)
- : TActorCoroImpl(256_KB), InputIndex(inputIndex), ReadSpec(readSpec), SourceActorId(sourceActorId), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path)
+ TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& computeActorId, const TRetryStuff::TPtr& retryStuff, const TReadSpec::TPtr& readSpec, size_t pathIndex, const TString& path)
+ : TActorCoroImpl(256_KB), InputIndex(inputIndex), RetryStuff(retryStuff), ReadSpec(readSpec), ComputeActorId(computeActorId), PathIndex(pathIndex), Path(path)
{}
bool Next(TString& value) {
- if (HttpResponseCode)
+ if (InputFinished)
return false;
- const auto ev = WaitForSpecificEvent<TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadStarted, TEvPrivate::TEvDataPart, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
switch (const auto etype = ev->GetTypeRewrite()) {
+ case TEvPrivate::TEvReadStarted::EventType:
+ ErrorText.clear();
+ Issues.Clear();
+ value.clear();
+ RetryStuff->NextRetryDelay = RetryStuff->RetryState->GetNextRetryDelay(HttpResponseCode = ev->Get<TEvPrivate::TEvReadStarted>()->HttpResponseCode);
+ return true;
case TEvPrivate::TEvReadFinished::EventType:
- HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode;
+ InputFinished = true;
return false;
case TEvPrivate::TEvReadError::EventType:
- HttpResponseCode = 0L;
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
+ InputFinished = true;
+ Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
return false;
case TEvPrivate::TEvDataPart::EventType:
- value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ if (200L == HttpResponseCode || 206L == HttpResponseCode) {
+ value = ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract();
+ RetryStuff->Offset += value.size();
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+ } else if (HttpResponseCode && !RetryStuff->NextRetryDelay) {
+ if (ErrorText.size() < 256_KB)
+ ErrorText.append(ev->Get<TEvPrivate::TEvDataPart>()->Result.Extract());
+ else if (!ErrorText.EndsWith(TruncatedSuffix))
+ ErrorText.append(TruncatedSuffix);
+ value.clear();
+ }
return true;
default:
return false;
@@ -295,17 +350,16 @@ public:
}
private:
void WaitFinish() {
- if (HttpResponseCode)
+ if (InputFinished)
return;
const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
+ InputFinished = true;
switch (const auto etype = ev->GetTypeRewrite()) {
case TEvPrivate::TEvReadFinished::EventType:
- HttpResponseCode = ev->Get<TEvPrivate::TEvReadFinished>()->HttpResponseCode;
break;
case TEvPrivate::TEvReadError::EventType:
- HttpResponseCode = 0L;
- Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
+ Issues = std::move(ev->Get<TEvPrivate::TEvReadError>()->Error);
break;
default:
break;
@@ -319,14 +373,36 @@ private:
NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, decompress ? *decompress : buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings));
while (auto block = stream.read())
- Send(SourceActorId, new TEvPrivate::TEvNextBlock(block, PathIndex));
+ Send(ParentActorId, new TEvPrivate::TEvNextBlock(block, PathIndex));
WaitFinish();
- if (*HttpResponseCode)
- Send(SourceActorId, new TEvPrivate::TEvReadFinished(*HttpResponseCode));
+ if (!ErrorText.empty()) {
+ TStringBuilder str;
+
+ if (HttpResponseCode)
+ str << "HTTP response code: " << HttpResponseCode << ", ";
+
+ if (ErrorText.StartsWith("<?xml") && !ErrorText.EndsWith(TruncatedSuffix)) {
+ const NXml::TDocument xml(ErrorText, NXml::TDocument::String);
+ if (const auto& root = xml.Root(); root.Name() == "Error") {
+ const auto& code = root.Node("Code", true).Value<TString>();
+ const auto& message = root.Node("Message", true).Value<TString>();
+ str << message << ", error code: " << code;
+ } else
+ str << ErrorText;
+ } else
+ str << ErrorText;
+
+ Issues.AddIssues({TIssue(str)});
+ }
+
+ if (Issues)
+ Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, std::move(Issues), true));
+ else
+ Send(ParentActorId, new TEvPrivate::TEvReadFinished);
} catch (const TDtorException&) {
- return;
+ return RetryStuff->Cancel();
} catch (const std::exception& err) {
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError(InputIndex, TIssues{TIssue(TStringBuilder() << "Error while reading file " << Path << ", details: " << err.what())}, true));
return;
@@ -337,72 +413,53 @@ private:
}
private:
const ui64 InputIndex;
+ const TRetryStuff::TPtr RetryStuff;
const TReadSpec::TPtr ReadSpec;
const TString Format, RowType, Compression;
- const NActors::TActorId SourceActorId;
const NActors::TActorId ComputeActorId;
const size_t PathIndex;
const TString Path;
- std::optional<long> HttpResponseCode;
+
+ bool InputFinished = false;
+ long HttpResponseCode = 0L;
+ TString ErrorText;
+ TIssues Issues;
};
class TS3ReadCoroActor : public TActorCoro {
- struct TRetryStuff {
- using TPtr = std::shared_ptr<TRetryStuff>;
-
- TRetryStuff(
- IHTTPGateway::TPtr gateway,
- TString url,
- const IHTTPGateway::THeaders& headers,
- std::size_t expectedSize
- ) : Gateway(std::move(gateway)), Url(std::move(url)), Headers(headers), ExpectedSize(expectedSize), Offset(0U), RetryState(GetS3RetryPolicy()->CreateRetryState())
- {}
-
- const IHTTPGateway::TPtr Gateway;
- const TString Url;
- const IHTTPGateway::THeaders Headers;
- const std::size_t ExpectedSize;
-
- std::size_t Offset = 0U;
- const IRetryPolicy<long>::IRetryState::TPtr RetryState;
- };
public:
- TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl,
- IHTTPGateway::TPtr gateway,
- const TString& url,
- const IHTTPGateway::THeaders& headers,
- const TString& path,
- const std::size_t expectedSize)
+ TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl, TRetryStuff::TPtr retryStuff)
: TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
- , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, expectedSize))
+ , RetryStuff(std::move(retryStuff))
{}
private:
- static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TCountedContent&& data) {
- retryStuff->Offset += data.size();
+ static void OnDownloadStart(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, long httpResponseCode) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadStarted(httpResponseCode)));
+ }
+
+ static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, IHTTPGateway::TCountedContent&& data) {
actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvDataPart(std::move(data))));
}
- static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, std::variant<long, TIssues> result) {
- switch (result.index()) {
- case 0U:
- if (const auto httpCode = std::get<long>(result); const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(httpCode))
- actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
- else
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished(httpCode)));
- break;
- case 1U:
- if (const auto nextDelayMs = retryStuff->RetryState->GetNextRetryDelay(0L))
- actorSystem->Schedule(*nextDelayMs, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
- else
- actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)))));
- break;
+ static void OnDownloadFinished(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, TIssues issues) {
+ if (issues) {
+ if (retryStuff->NextRetryDelay = retryStuff->RetryState->GetNextRetryDelay(0L); !retryStuff->NextRetryDelay) {
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadError(std::move(issues))));
+ return;
+ }
}
+
+ if (retryStuff->NextRetryDelay)
+ actorSystem->Schedule(*retryStuff->NextRetryDelay, new IEventHandle(parent, self, new TEvPrivate::TEvRetryEventFunc(std::bind(&TS3ReadCoroActor::DownloadStart, retryStuff, self, parent))));
+ else
+ actorSystem->Send(new IEventHandle(self, parent, new TEvPrivate::TEvReadFinished));
}
static void DownloadStart(const TRetryStuff::TPtr& retryStuff, const TActorId& self, const TActorId& parent) {
- retryStuff->Gateway->Download(retryStuff->Url,
+ retryStuff->CancelHook = retryStuff->Gateway->Download(retryStuff->Url,
retryStuff->Headers, retryStuff->Offset,
- std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1),
+ std::bind(&TS3ReadCoroActor::OnDownloadStart, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1),
+ std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, parent, std::placeholders::_1),
std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, parent, retryStuff, std::placeholders::_1));
}
@@ -448,8 +505,9 @@ public:
Become(&TS3StreamReadActor::StateFunc);
for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
const TPath& path = Paths[pathInd];
- auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, SelfId(), ComputeActorId, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path));
- RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path)).Release());
+ auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + std::get<TString>(path), Headers, std::get<std::size_t>(path));
+ auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, ComputeActorId, stuff, ReadSpec, pathInd + StartPathIndex, std::get<TString>(path));
+ RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff)).release());
}
}