aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-07-08 14:06:30 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-07-08 14:06:30 +0300
commit597891480429fa7d44a23f6e03d208bfdd58b8b1 (patch)
treecfdeda8a7f8c47bbaeae140771e16dd1269a5d00
parent635fcbaa8b2abae1672956b1ff43cd07ec7d0683 (diff)
downloadydb-597891480429fa7d44a23f6e03d208bfdd58b8b1.tar.gz
S3 upload first version.
-rw-r--r--ydb/core/yq/libs/test_connection/test_object_storage.cpp1
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp15
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp100
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h15
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp80
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_list.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp12
7 files changed, 97 insertions, 128 deletions
diff --git a/ydb/core/yq/libs/test_connection/test_object_storage.cpp b/ydb/core/yq/libs/test_connection/test_object_storage.cpp
index 52a2b81e559..80832654cf4 100644
--- a/ydb/core/yq/libs/test_connection/test_object_storage.cpp
+++ b/ydb/core/yq/libs/test_connection/test_object_storage.cpp
@@ -175,7 +175,6 @@ private:
0U,
std::bind(&DiscoveryCallback, std::placeholders::_1, SelfId(), TActivationContext::ActorSystem()),
/*data=*/"",
- false,
retryPolicy
);
}
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 6c66c927a1e..75edbe3669e 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -43,15 +43,14 @@ public:
return std::move(ret);
}
- void Upload(TString, THeaders, TString, TOnResponse) {}
+ void Upload(TString, THeaders, TString, TOnResult, bool, IRetryPolicy<long>::TPtr) {}
void Download(
TString url,
- IHTTPGateway::THeaders headers,
+ THeaders headers,
std::size_t expectedSize,
- IHTTPGateway::TOnResult callback,
+ TOnResult callback,
TString data,
- bool,
IRetryPolicy<long>::TPtr retryPolicy)
{
@@ -72,10 +71,10 @@ public:
virtual void Download(
TString ,
- IHTTPGateway::THeaders ,
+ THeaders ,
std::size_t ,
- IHTTPGateway::TOnNewDataPart ,
- IHTTPGateway::TOnDownloadFinish ) {
+ TOnNewDataPart ,
+ TOnDownloadFinish ) {
}
void AddDefaultResponse(TDataDefaultResponse response) {
@@ -84,7 +83,7 @@ public:
void AddDownloadResponse(
TString url,
- IHTTPGateway::THeaders headers,
+ THeaders headers,
TString data,
TDataResponse response) {
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 83429896b6f..aa0c973e239 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -24,8 +24,8 @@ public:
PUT
};
- TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false)
- : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes)
+ TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false)
+ : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes)
{
switch (method) {
case EMethod::GET:
@@ -52,10 +52,8 @@ public:
curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str());
}
- if (EMethod::PUT != method) {
- curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
- curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
- }
+ curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
+ curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this));
if (withBody) {
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
@@ -72,8 +70,6 @@ public:
}
}
- virtual size_t GetExpectedSize() const { return 0ULL; }
-
CURL* GetHandle() const {
return Handle;
}
@@ -90,14 +86,18 @@ protected:
private:
static size_t
WriteMemoryCallback(void* contents, size_t size, size_t nmemb, void* userp) {
- auto self = static_cast<TEasyCurl*>(userp);
- self->DownloadedBytes->Add(size * nmemb);
- return self->Write(contents, size, nmemb);
+ const auto self = static_cast<TEasyCurl*>(userp);
+ const auto res = self->Write(contents, size, nmemb);
+ self->DownloadedBytes->Add(res);
+ return res;
};
static size_t
ReadMemoryCallback(char *buffer, size_t size, size_t nmemb, void *userp) {
- return static_cast<TEasyCurl*>(userp)->Read(buffer, size, nmemb);
+ const auto self = static_cast<TEasyCurl*>(userp);
+ const auto res = self->Read(buffer, size, nmemb);
+ self->UploadedBytes->Add(res);
+ return res;
};
const size_t Offset;
@@ -105,44 +105,7 @@ private:
curl_slist* Headers = nullptr;
const NMonitoring::TDynamicCounters::TCounterPtr Counter;
const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes;
-};
-
-class TEasyCurlUpload : public TEasyCurl {
-public:
- using TPtr = std::shared_ptr<TEasyCurlUpload>;
- using TWeakPtr = std::weak_ptr<TEasyCurlUpload>;
-
- TEasyCurlUpload(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback)
- : TEasyCurl(counter, uploadedBytes, url, headers, EMethod::PUT), Data(std::move(data)), Input(Data), Callback(std::move(callback))
- {}
-
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback) {
- return std::make_shared<TEasyCurlUpload>(counter, uploadedBytes, std::move(url), std::move(data), std::move(headers), std::move(callback));
- }
-private:
- void Fail(const TIssue& error) final {
- Callback(TIssues{error});
- }
-
- void Done(CURLcode result) final {
- if (CURLE_OK != result)
- return Fail(TIssue(curl_easy_strerror(result)));
-
- long httpResponseCode = 0;
- curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
- Callback(httpResponseCode);
- }
-
- size_t Write(void*, size_t, size_t) final { return 0ULL; }
-
- size_t Read(char *buffer, size_t size, size_t nmemb) final {
- return Input.Read(buffer, size * nmemb);
- }
-
- const TString Data;
- TStringInput Input;
-
- IHTTPGateway::TOnResponse Callback;
+ const NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes;
};
class TEasyCurlBuffer : public TEasyCurl {
@@ -150,18 +113,18 @@ public:
using TPtr = std::shared_ptr<TEasyCurlBuffer>;
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
- TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback)
- : TEasyCurl(counter, downloadedBytes, url, headers, post ? EMethod::POST : EMethod::GET, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer)
+ TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback)
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer)
{
Output.Reserve(ExpectedSize);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) {
- return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), post, std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) {
+ return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, uploadededBytes, std::move(url), method, std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
}
- size_t GetExpectedSize() const final {
+ size_t GetExpectedSize() const {
return ExpectedSize;
}
@@ -225,12 +188,12 @@ public:
using TPtr = std::shared_ptr<TEasyCurlStream>;
using TWeakPtr = std::weak_ptr<TEasyCurlStream>;
- TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
- : TEasyCurl(counter, downloadedBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
+ : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
{}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
- return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
+ return std::make_shared<TEasyCurlStream>(counter, downloadedBytes, uploadededBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
}
enum class EAction : i8 {
@@ -503,13 +466,14 @@ private:
}
}
- void Upload(TString url, THeaders headers, TString body, TOnResponse callback) final {
+ void Upload(TString url, THeaders headers, TString body, TOnResult callback, bool put, IRetryPolicy<long>::TPtr retryPolicy) final {
Rps->Inc();
- auto upload = TEasyCurlUpload::Make(InFlight, UploadedBytes, std::move(url), std::move(body), std::move(headers), std::move(callback));
const std::unique_lock lock(Sync);
- Await.emplace(std::move(upload));
- Wakeup(0ULL);
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), put ? TEasyCurl::EMethod::PUT : TEasyCurl::EMethod::POST, std::move(body), std::move(headers), 0U, 0U, std::move(callback));
+ Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
+ Await.emplace(std::move(easy));
+ Wakeup(0U);
}
void Download(
@@ -518,7 +482,6 @@ private:
size_t expectedSize,
TOnResult callback,
TString data,
- bool post,
IRetryPolicy<long>::TPtr retryPolicy) final
{
Rps->Inc();
@@ -529,13 +492,12 @@ private:
return;
}
const std::unique_lock lock(Sync);
- TEasyCurlBuffer::TWeakPtr stub;
- auto& entry = post ? stub : Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
+ auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
if (const auto& easy = entry.lock())
if (easy->AddCallback(callback))
return;
- auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), post, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, UploadedBytes, std::move(url), TEasyCurl::EMethod::GET, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
entry = easy;
Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
Await.emplace(std::move(easy));
@@ -549,7 +511,7 @@ private:
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) final
{
- auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
+ auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, UploadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
const auto handle = stream->GetHandle();
Streams.emplace_back(stream);
@@ -599,7 +561,7 @@ private:
private:
CURLM* Handle = nullptr;
- std::queue<TEasyCurl::TPtr> Await;
+ std::queue<TEasyCurlBuffer::TPtr> Await;
std::vector<TEasyCurlStream::TWeakPtr> Streams;
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 8c7a142f900..926c4f790bf 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -62,22 +62,23 @@ public:
};
using THeaders = TSmallVec<TString>;
-
- using TResponse = std::variant<long, TIssues>;
- using TOnResponse = std::function<void(TResponse&&)>;
-
- virtual void Upload(TString url, THeaders headers, TString body, TOnResponse callback) = 0;
-
using TResult = std::variant<TContent, TIssues>;
using TOnResult = std::function<void(TResult&&)>;
+ virtual void Upload(
+ TString url,
+ THeaders headers,
+ TString body,
+ TOnResult callback,
+ bool put = false,
+ IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()) = 0;
+
virtual void Download(
TString url,
THeaders headers,
std::size_t expectedSize,
TOnResult callback,
TString data = {},
- bool post = false,
IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()
) = 0;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 2aedf58d892..24e3997679a 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -53,8 +53,9 @@ struct TEvPrivate {
};
struct TEvUploadPartFinished : public TEventLocal<TEvUploadPartFinished, EvUploadPartFinished> {
- explicit TEvUploadPartFinished(size_t size) : Size(size) {}
- const size_t Size;
+ TEvUploadPartFinished(size_t size, size_t index, TString&& etag) : Size(size), Index(index), ETag(std::move(etag)) {}
+ const size_t Size, Index;
+ const TString ETag;
};
};
@@ -122,7 +123,7 @@ public:
void Bootstrap() {
Become(&TS3WriteActor::InitialStateFunc);
- Gateway->Download(Url + Path + "?uploads", Headers, 0, std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), "", true);
+ Gateway->Upload(Url + Path + "?uploads", Headers, "", std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), false);
}
static constexpr char ActorName[] = "S3_WRITE_ACTOR";
@@ -131,7 +132,7 @@ private:
void LoadState(const NDqProto::TSinkState&) final {};
ui64 GetOutputIndex() const final { return OutputIndex; }
i64 GetFreeSpace() const final {
- return 1_GB - InFlight - std::accumulate(Parts.cbegin(), Parts.cend(), 0LL, [](i64 s, const NUdf::TUnboxedValuePod v){ return v ? s + v.AsStringRef().Size() : s; });
+ return 1_GB - InFlight - InQueue;
}
STRICT_STFUNC(InitialStateFunc,
@@ -159,9 +160,10 @@ private:
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
} else if (root.Name() != "InitiateMultipartUploadResult")
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")})));
- else
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("UploadId", true).Value<TString>())));
-
+ else {
+ const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
+ }
break;
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")})));
@@ -176,15 +178,23 @@ private:
}
}
- static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, IHTTPGateway::TResponse&& response) {
+ static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, size_t index, IHTTPGateway::TResult&& response) {
switch (response.index()) {
- case 0U:
- if (const auto code = std::get<long>(std::move(response)); 200L == code)
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadPartFinished(size)));
- else
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response code " << code)})));
+ case 0U: {
+ const auto str = std::get<IHTTPGateway::TContent>(std::move(response)).Extract();
+
+ if (const auto p = str.find("etag: \""); p != TString::npos) {
+ if (const auto p1 = p + 7, p2 = str.find("\"", p1); p2 != TString::npos) {
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadPartFinished(size, index, str.substr(p1, p2 - p1))));
+ break;
+ }
+ }
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)})));
+ break;
+ }
case 1U:
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response)))));
+ break;
}
}
@@ -200,7 +210,6 @@ private:
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")})));
else
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished()));
-
break;
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")})));
@@ -215,15 +224,17 @@ private:
}
}
- void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
+ void SendData(TUnboxedValueVector&& data, i64 size, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
InputFinished = finished;
- std::move(data.begin(), data.end(), std::back_inserter(Parts));
+ for (const auto& v : data)
+ Parts.emplace(v.AsStringRef());
+ data.clear();
+ InQueue += size;
if (!UploadId.empty())
StartUploadParts();
}
void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
- Parts.clear();
Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true);
if (!UploadId.empty()) {
// TODO: Send delete.
@@ -232,30 +243,29 @@ private:
void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) {
UploadId = result->Get()->UploadId;
- Become(&TS3WriteActor::InitialStateFunc);
+ Become(&TS3WriteActor::WorkingStateFunc);
StartUploadParts();
}
void Handle(TEvPrivate::TEvUploadPartFinished::TPtr& result) {
InFlight -= result->Get()->Size;
+ Tags[result->Get()->Index] = std::move(result->Get()->ETag);
- if (!InFlight && std::all_of(Parts.cbegin(), Parts.cend(), std::logical_not<NUdf::TUnboxedValuePod>())) {
+ if (!InFlight && InputFinished && Parts.empty()) {
Become(&TS3WriteActor::FinalStateFunc);
-
TStringBuilder xml;
xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl;
xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl;
- for (auto i = 1U; i <= Parts.size(); ++i)
- xml << "<Part><PartNumber>" << i << "</Part></PartNumber>" << Endl;
+ size_t i = 0U;
+ for (const auto& tag : Tags)
+ xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl;
xml << "</CompleteMultipartUpload>" << Endl;
- Gateway->Download(Url + Path + "?uploadId=" + UploadId, Headers, 0, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), xml, true);
+ Gateway->Upload(Url + Path + "?uploadId=" + UploadId, Headers, xml, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), false);
}
}
void HandleFinished() {
- if (InputFinished && !InFlight && Parts.empty()) {
- Callbacks->OnAsyncOutputFinished(OutputIndex);
- }
+ return Callbacks->OnAsyncOutputFinished(OutputIndex);
}
// IActor & IDqComputeActorAsyncOutput
@@ -268,16 +278,17 @@ private:
}
void StartUploadParts() {
- for (auto i = 0U; i < Parts.size(); ++i) {
- if (auto part = std::move(Parts[i])) {
- const auto size = part.AsStringRef().Size();
- InFlight += size;
- Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(i + 1) + "&uploadId=" + UploadId, Headers, TString(part.AsStringRef()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, std::placeholders::_1));
- }
+ for (InQueue = 0ULL; !Parts.empty(); Parts.pop()) {
+ const auto size = Parts.front().size();
+ const auto index = Tags.size();
+ Tags.emplace_back();
+ InFlight += size;
+ Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, Headers, std::move(Parts.front()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, index, std::placeholders::_1), true);
}
}
- bool InputFinished = false;
+ bool InputFinished = false;
+ size_t InQueue = 0ULL;
size_t InFlight = 0ULL;
const IHTTPGateway::TPtr Gateway;
@@ -291,7 +302,8 @@ private:
const IHTTPGateway::THeaders Headers;
const TString Path;
- TUnboxedValueVector Parts;
+ std::queue<TString> Parts;
+ std::vector<TString> Tags;
std::vector<TRetryParams> RetriesPerPath;
const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
index 5b6af20c7b1..5174721c66a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
@@ -212,7 +212,6 @@ private:
prefix,
maxDiscoveryFilesPerQuery),
/*data=*/"",
- false,
retryPolicy);
}
}
@@ -270,7 +269,6 @@ private:
prefix,
MaxFilesPerQuery),
/*data=*/"",
- false,
retryPolicy);
return future;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
index d9e83ee3ccd..8ffbd2be00d 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
@@ -20,8 +20,9 @@ TRuntimeNode BuildSerializeCall(
TType* inputType,
NCommon::TMkqlBuildContext& ctx)
{
+ const auto inputItemType = AS_TYPE(TFlowType, inputType)->GetItemType();
if (format == "raw") {
- const auto structType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputType)->GetItemType());
+ const auto structType = AS_TYPE(TStructType, inputItemType);
MKQL_ENSURE(1U == structType->GetMembersCount(), "Expected single column.");
const auto schemeType = AS_TYPE(TDataType, structType->GetMemberType(0U))->GetSchemeType();
return ctx.ProgramBuilder.Map(input,
@@ -31,11 +32,7 @@ TRuntimeNode BuildSerializeCall(
}
);
} else if (format == "json_list") {
- return ctx.ProgramBuilder.FlatMap(
- ctx.ProgramBuilder.Condense(input, ctx.ProgramBuilder.NewList(AS_TYPE(TFlowType, inputType)->GetItemType(), {}),
- [&ctx] (TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral<bool>(false); },
- [&ctx] (TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); }
- ),
+ return ctx.ProgramBuilder.FlatMap(ctx.ProgramBuilder.SqueezeToList(input, ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui64>::Id)),
[&ctx] (TRuntimeNode list) {
const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({list.GetStaticType()})});
return ctx.ProgramBuilder.ToString(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.SerializeJson"), {ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.From", {}, userType), {list})}));
@@ -43,7 +40,8 @@ TRuntimeNode BuildSerializeCall(
);
}
- throw yexception() << "Unsupported format: " << format;
+ const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewStreamType(inputItemType)})});
+ return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format), {ctx.ProgramBuilder.FromFlow(input)}));
}
TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) {