diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-08 14:06:30 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-08 14:06:30 +0300 |
commit | 597891480429fa7d44a23f6e03d208bfdd58b8b1 (patch) | |
tree | cfdeda8a7f8c47bbaeae140771e16dd1269a5d00 | |
parent | 635fcbaa8b2abae1672956b1ff43cd07ec7d0683 (diff) | |
download | ydb-597891480429fa7d44a23f6e03d208bfdd58b8b1.tar.gz |
S3 upload first version.
7 files changed, 97 insertions, 128 deletions
diff --git a/ydb/core/yq/libs/test_connection/test_object_storage.cpp b/ydb/core/yq/libs/test_connection/test_object_storage.cpp index 52a2b81e559..80832654cf4 100644 --- a/ydb/core/yq/libs/test_connection/test_object_storage.cpp +++ b/ydb/core/yq/libs/test_connection/test_object_storage.cpp @@ -175,7 +175,6 @@ private: 0U, std::bind(&DiscoveryCallback, std::placeholders::_1, SelfId(), TActivationContext::ActorSystem()), /*data=*/"", - false, retryPolicy ); } diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 6c66c927a1e..75edbe3669e 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -43,15 +43,14 @@ public: return std::move(ret); } - void Upload(TString, THeaders, TString, TOnResponse) {} + void Upload(TString, THeaders, TString, TOnResult, bool, IRetryPolicy<long>::TPtr) {} void Download( TString url, - IHTTPGateway::THeaders headers, + THeaders headers, std::size_t expectedSize, - IHTTPGateway::TOnResult callback, + TOnResult callback, TString data, - bool, IRetryPolicy<long>::TPtr retryPolicy) { @@ -72,10 +71,10 @@ public: virtual void Download( TString , - IHTTPGateway::THeaders , + THeaders , std::size_t , - IHTTPGateway::TOnNewDataPart , - IHTTPGateway::TOnDownloadFinish ) { + TOnNewDataPart , + TOnDownloadFinish ) { } void AddDefaultResponse(TDataDefaultResponse response) { @@ -84,7 +83,7 @@ public: void AddDownloadResponse( TString url, - IHTTPGateway::THeaders headers, + THeaders headers, TString data, TDataResponse response) { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 83429896b6f..aa0c973e239 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -24,8 +24,8 @@ public: PUT }; - TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false) - : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes) + TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false) + : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes) { switch (method) { case EMethod::GET: @@ -52,10 +52,8 @@ public: curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str()); } - if (EMethod::PUT != method) { - curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); - curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this)); - } + curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); + curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this)); if (withBody) { curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); @@ -72,8 +70,6 @@ public: } } - virtual size_t GetExpectedSize() const { return 0ULL; } - CURL* GetHandle() const { return Handle; } @@ -90,14 +86,18 @@ protected: private: static size_t WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) { - auto self = static_cast<TEasyCurl*>(userp); - self->DownloadedBytes->Add(size * nmemb); - return self->Write(contents, size, nmemb); + const auto self = static_cast<TEasyCurl*>(userp); + const auto res = self->Write(contents, size, nmemb); + self->DownloadedBytes->Add(res); + return res; }; static size_t ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) { - return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb); + const auto self = static_cast<TEasyCurl*>(userp); + const auto res = self->Read(buffer, size, nmemb); + self->UploadedBytes->Add(res); + return res; }; const size_t Offset; @@ -105,44 +105,7 @@ private: curl_slist* Headers = nullptr; const NMonitoring::TDynamicCounters::TCounterPtr Counter; const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes; -}; - -class TEasyCurlUpload : public TEasyCurl { -public: - using TPtr = std::shared_ptr<TEasyCurlUpload>; - using TWeakPtr = std::weak_ptr<TEasyCurlUpload>; - - TEasyCurlUpload(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback) - : TEasyCurl(counter, uploadedBytes, url, headers, EMethod::PUT), Data(std::move(data)), Input(Data), Callback(std::move(callback)) - {} - - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback) { - return std::make_shared<TEasyCurlUpload>(counter, uploadedBytes, std::move(url), std::move(data), std::move(headers), std::move(callback)); - } -private: - void Fail(const TIssue& error) final { - Callback(TIssues{error}); - } - - void Done(CURLcode result) final { - if (CURLE_OK != result) - return Fail(TIssue(curl_easy_strerror(result))); - - long httpResponseCode = 0; - curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); - Callback(httpResponseCode); - } - - size_t Write(void*, size_t, size_t) final { return 0ULL; } - - size_t Read(char *buffer, size_t size, size_t nmemb) final { - return Input.Read(buffer, size * nmemb); - } - - const TString Data; - TStringInput Input; - - IHTTPGateway::TOnResponse Callback; + const NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes; }; class TEasyCurlBuffer : public TEasyCurl { @@ -150,18 +113,18 @@ public: using TPtr = std::shared_ptr<TEasyCurlBuffer>; using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; - TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) - : TEasyCurl(counter, downloadedBytes, url, headers, post ? EMethod::POST : EMethod::GET, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer) + TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer) { Output.Reserve(ExpectedSize); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) { - return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), post, std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) { + return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); } - size_t GetExpectedSize() const final { + size_t GetExpectedSize() const { return ExpectedSize; } @@ -225,12 +188,12 @@ public: using TPtr = std::shared_ptr<TEasyCurlStream>; using TWeakPtr = std::weak_ptr<TEasyCurlStream>; - TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) - : TEasyCurl(counter, downloadedBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) + : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) {} - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { - return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { + return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); } enum class EAction : i8 { @@ -503,13 +466,14 @@ private: } } - void Upload(TString url, THeaders headers, TString body, TOnResponse callback) final { + void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, IRetryPolicy<long>::TPtr retryPolicy) final { Rps->Inc(); - auto upload = TEasyCurlUpload::Make(InFlight, UploadedBytes, std::move(url), std::move(body), std::move(headers), std::move(callback)); const std::unique_lock lock(Sync); - Await.emplace(std::move(upload)); - Wakeup(0ULL); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback)); + Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); + Await.emplace(std::move(easy)); + Wakeup(0U); } void Download( @@ -518,7 +482,6 @@ private: size_t expectedSize, TOnResult callback, TString data, - bool post, IRetryPolicy<long>::TPtr retryPolicy) final { Rps->Inc(); @@ -529,13 +492,12 @@ private: return; } const std::unique_lock lock(Sync); - TEasyCurlBuffer::TWeakPtr stub; - auto& entry = post ? stub : Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; + auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; if (const auto& easy = entry.lock()) if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), post, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); entry = easy; Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); Await.emplace(std::move(easy)); @@ -549,7 +511,7 @@ private: TOnNewDataPart onNewData, TOnDownloadFinish onFinish) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); + auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); Streams.emplace_back(stream); @@ -599,7 +561,7 @@ private: private: CURLM* Handle = nullptr; - std::queue<TEasyCurl::TPtr> Await; + std::queue<TEasyCurlBuffer::TPtr> Await; std::vector<TEasyCurlStream::TWeakPtr> Streams; std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated; diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 8c7a142f900..926c4f790bf 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -62,22 +62,23 @@ public: }; using THeaders = TSmallVec<TString>; - - using TResponse = std::variant<long, TIssues>; - using TOnResponse = std::function<void(TResponse&&)>; - - virtual void Upload(TString url, THeaders headers, TString body, TOnResponse callback) = 0; - using TResult = std::variant<TContent, TIssues>; using TOnResult = std::function<void(TResult&&)>; + virtual void Upload( + TString url, + THeaders headers, + TString body, + TOnResult callback, + bool put = false, + IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0; + virtual void Download( TString url, THeaders headers, std::size_t expectedSize, TOnResult callback, TString data = {}, - bool post = false, IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy() ) = 0; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 2aedf58d892..24e3997679a 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -53,8 +53,9 @@ struct TEvPrivate { }; struct TEvUploadPartFinished : public TEventLocal<TEvUploadPartFinished, EvUploadPartFinished> { - explicit TEvUploadPartFinished(size_t size) : Size(size) {} - const size_t Size; + TEvUploadPartFinished(size_t size, size_t index, TString&& etag) : Size(size), Index(index), ETag(std::move(etag)) {} + const size_t Size, Index; + const TString ETag; }; }; @@ -122,7 +123,7 @@ public: void Bootstrap() { Become(&TS3WriteActor::InitialStateFunc); - Gateway->Download(Url + Path + "?uploads", Headers, 0, std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), "", true); + Gateway->Upload(Url + Path + "?uploads", Headers, "", std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), false); } static constexpr char ActorName[] = "S3_WRITE_ACTOR"; @@ -131,7 +132,7 @@ private: void LoadState(const NDqProto::TSinkState&) final {}; ui64 GetOutputIndex() const final { return OutputIndex; } i64 GetFreeSpace() const final { - return 1_GB - InFlight - std::accumulate(Parts.cbegin(), Parts.cend(), 0LL, [](i64 s, const NUdf::TUnboxedValuePod v){ return v ? s + v.AsStringRef().Size() : s; }); + return 1_GB - InFlight - InQueue; } STRICT_STFUNC(InitialStateFunc, @@ -159,9 +160,10 @@ private: actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); } else if (root.Name() != "InitiateMultipartUploadResult") actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")}))); - else - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("UploadId", true).Value<TString>()))); - + else { + const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>()))); + } break; } catch (const std::exception& ex) { actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")}))); @@ -176,15 +178,23 @@ private: } } - static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, IHTTPGateway::TResponse&& response) { + static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, size_t index, IHTTPGateway::TResult&& response) { switch (response.index()) { - case 0U: - if (const auto code = std::get<long>(std::move(response)); 200L == code) - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadPartFinished(size))); - else - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response code " << code)}))); + case 0U: { + const auto str = std::get<IHTTPGateway::TContent>(std::move(response)).Extract(); + + if (const auto p = str.find("etag: \""); p != TString::npos) { + if (const auto p1 = p + 7, p2 = str.find("\"", p1); p2 != TString::npos) { + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadPartFinished(size, index, str.substr(p1, p2 - p1)))); + break; + } + } + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)}))); + break; + } case 1U: actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response))))); + break; } } @@ -200,7 +210,6 @@ private: actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")}))); else actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished())); - break; } catch (const std::exception& ex) { actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")}))); @@ -215,15 +224,17 @@ private: } } - void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { + void SendData(TUnboxedValueVector&& data, i64 size, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { InputFinished = finished; - std::move(data.begin(), data.end(), std::back_inserter(Parts)); + for (const auto& v : data) + Parts.emplace(v.AsStringRef()); + data.clear(); + InQueue += size; if (!UploadId.empty()) StartUploadParts(); } void Handle(TEvPrivate::TEvUploadError::TPtr& result) { - Parts.clear(); Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true); if (!UploadId.empty()) { // TODO: Send delete. @@ -232,30 +243,29 @@ private: void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) { UploadId = result->Get()->UploadId; - Become(&TS3WriteActor::InitialStateFunc); + Become(&TS3WriteActor::WorkingStateFunc); StartUploadParts(); } void Handle(TEvPrivate::TEvUploadPartFinished::TPtr& result) { InFlight -= result->Get()->Size; + Tags[result->Get()->Index] = std::move(result->Get()->ETag); - if (!InFlight && std::all_of(Parts.cbegin(), Parts.cend(), std::logical_not<NUdf::TUnboxedValuePod>())) { + if (!InFlight && InputFinished && Parts.empty()) { Become(&TS3WriteActor::FinalStateFunc); - TStringBuilder xml; xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl; xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl; - for (auto i = 1U; i <= Parts.size(); ++i) - xml << "<Part><PartNumber>" << i << "</Part></PartNumber>" << Endl; + size_t i = 0U; + for (const auto& tag : Tags) + xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl; xml << "</CompleteMultipartUpload>" << Endl; - Gateway->Download(Url + Path + "?uploadId=" + UploadId, Headers, 0, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), xml, true); + Gateway->Upload(Url + Path + "?uploadId=" + UploadId, Headers, xml, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), false); } } void HandleFinished() { - if (InputFinished && !InFlight && Parts.empty()) { - Callbacks->OnAsyncOutputFinished(OutputIndex); - } + return Callbacks->OnAsyncOutputFinished(OutputIndex); } // IActor & IDqComputeActorAsyncOutput @@ -268,16 +278,17 @@ private: } void StartUploadParts() { - for (auto i = 0U; i < Parts.size(); ++i) { - if (auto part = std::move(Parts[i])) { - const auto size = part.AsStringRef().Size(); - InFlight += size; - Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(i + 1) + "&uploadId=" + UploadId, Headers, TString(part.AsStringRef()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, std::placeholders::_1)); - } + for (InQueue = 0ULL; !Parts.empty(); Parts.pop()) { + const auto size = Parts.front().size(); + const auto index = Tags.size(); + Tags.emplace_back(); + InFlight += size; + Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, Headers, std::move(Parts.front()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, index, std::placeholders::_1), true); } } - bool InputFinished = false; + bool InputFinished = false; + size_t InQueue = 0ULL; size_t InFlight = 0ULL; const IHTTPGateway::TPtr Gateway; @@ -291,7 +302,8 @@ private: const IHTTPGateway::THeaders Headers; const TString Path; - TUnboxedValueVector Parts; + std::queue<TString> Parts; + std::vector<TString> Tags; std::vector<TRetryParams> RetriesPerPath; const std::shared_ptr<NS3::TRetryConfig> RetryConfig; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp index 5b6af20c7b1..5174721c66a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp @@ -212,7 +212,6 @@ private: prefix, maxDiscoveryFilesPerQuery), /*data=*/"", - false, retryPolicy); } } @@ -270,7 +269,6 @@ private: prefix, MaxFilesPerQuery), /*data=*/"", - false, retryPolicy); return future; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index d9e83ee3ccd..8ffbd2be00d 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -20,8 +20,9 @@ TRuntimeNode BuildSerializeCall( TType* inputType, NCommon::TMkqlBuildContext& ctx) { + const auto inputItemType = AS_TYPE(TFlowType, inputType)->GetItemType(); if (format == "raw") { - const auto structType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputType)->GetItemType()); + const auto structType = AS_TYPE(TStructType, inputItemType); MKQL_ENSURE(1U == structType->GetMembersCount(), "Expected single column."); const auto schemeType = AS_TYPE(TDataType, structType->GetMemberType(0U))->GetSchemeType(); return ctx.ProgramBuilder.Map(input, @@ -31,11 +32,7 @@ TRuntimeNode BuildSerializeCall( } ); } else if (format == "json_list") { - return ctx.ProgramBuilder.FlatMap( - ctx.ProgramBuilder.Condense(input, ctx.ProgramBuilder.NewList(AS_TYPE(TFlowType, inputType)->GetItemType(), {}), - [&ctx] (TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral<bool>(false); }, - [&ctx] (TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); } - ), + return ctx.ProgramBuilder.FlatMap(ctx.ProgramBuilder.SqueezeToList(input, ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui64>::Id)), [&ctx] (TRuntimeNode list) { const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({list.GetStaticType()})}); return ctx.ProgramBuilder.ToString(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.SerializeJson"), {ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.From", {}, userType), {list})})); @@ -43,7 +40,8 @@ TRuntimeNode BuildSerializeCall( ); } - throw yexception() << "Unsupported format: " << format; + const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewStreamType(inputItemType)})}); + return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format), {ctx.ProgramBuilder.FromFlow(input)})); } TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) { |