aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-06-30 17:30:21 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-06-30 17:30:21 +0300
commit1d8f8dfd93dc506a0fd6a696eda1cf149afdb1f8 (patch)
treea026078e8d7233a503cb3349ada7289a8db4022c
parent9a30021fae9ff838945d397ccf4f3ae60dabb47e (diff)
downloadydb-1d8f8dfd93dc506a0fd6a696eda1cf149afdb1f8.tar.gz
YQL-15033 S3 sink draft.
ref:bdf2543586747072f2f6ab8149b0e732df2fb9f9
-rw-r--r--ydb/core/yq/libs/test_connection/test_object_storage.cpp1
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp3
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp107
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h9
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt3
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt3
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp15
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h18
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp321
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h24
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json50
-rw-r--r--ydb/library/yql/providers/s3/proto/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/proto/sink.proto11
-rw-r--r--ydb/library/yql/providers/s3/provider/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp73
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp89
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp84
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp19
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp16
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp50
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp104
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_provider.h2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp11
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h2
26 files changed, 972 insertions, 49 deletions
diff --git a/ydb/core/yq/libs/test_connection/test_object_storage.cpp b/ydb/core/yq/libs/test_connection/test_object_storage.cpp
index 80832654cf..52a2b81e55 100644
--- a/ydb/core/yq/libs/test_connection/test_object_storage.cpp
+++ b/ydb/core/yq/libs/test_connection/test_object_storage.cpp
@@ -175,6 +175,7 @@ private:
0U,
std::bind(&DiscoveryCallback, std::placeholders::_1, SelfId(), TActivationContext::ActorSystem()),
/*data=*/"",
+ false,
retryPolicy
);
}
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
index 8b61bbddce..6c66c927a1 100644
--- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -43,12 +43,15 @@ public:
return std::move(ret);
}
+ void Upload(TString, THeaders, TString, TOnResponse) {}
+
void Download(
TString url,
IHTTPGateway::THeaders headers,
std::size_t expectedSize,
IHTTPGateway::TOnResult callback,
TString data,
+ bool,
IRetryPolicy<long>::TPtr retryPolicy)
{
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index 0b60b1e515..83429896b6 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -18,13 +18,27 @@ class TEasyCurl {
public:
using TPtr = std::shared_ptr<TEasyCurl>;
- TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, bool withData)
+ enum class EMethod {
+ GET,
+ POST,
+ PUT
+ };
+
+ TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false)
: Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes)
{
+ switch (method) {
+ case EMethod::GET:
+ break;
+ case EMethod::POST:
+ curl_easy_setopt(Handle, CURLOPT_POST, 1L);
+ break;
+ case EMethod::PUT:
+ curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
+ break;
+ }
+
curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
- curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L);
- curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
- curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway");
curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
@@ -38,7 +52,12 @@ public:
curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str());
}
- if (withData) {
+ if (EMethod::PUT != method) {
+ curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
+ curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this));
+ }
+
+ if (withBody) {
curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
}
@@ -53,6 +72,8 @@ public:
}
}
+ virtual size_t GetExpectedSize() const { return 0ULL; }
+
CURL* GetHandle() const {
return Handle;
}
@@ -86,23 +107,61 @@ private:
const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes;
};
+class TEasyCurlUpload : public TEasyCurl {
+public:
+ using TPtr = std::shared_ptr<TEasyCurlUpload>;
+ using TWeakPtr = std::weak_ptr<TEasyCurlUpload>;
+
+ TEasyCurlUpload(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback)
+ : TEasyCurl(counter, uploadedBytes, url, headers, EMethod::PUT), Data(std::move(data)), Input(Data), Callback(std::move(callback))
+ {}
+
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback) {
+ return std::make_shared<TEasyCurlUpload>(counter, uploadedBytes, std::move(url), std::move(data), std::move(headers), std::move(callback));
+ }
+private:
+ void Fail(const TIssue& error) final {
+ Callback(TIssues{error});
+ }
+
+ void Done(CURLcode result) final {
+ if (CURLE_OK != result)
+ return Fail(TIssue(curl_easy_strerror(result)));
+
+ long httpResponseCode = 0;
+ curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
+ Callback(httpResponseCode);
+ }
+
+ size_t Write(void*, size_t, size_t) final { return 0ULL; }
+
+ size_t Read(char *buffer, size_t size, size_t nmemb) final {
+ return Input.Read(buffer, size * nmemb);
+ }
+
+ const TString Data;
+ TStringInput Input;
+
+ IHTTPGateway::TOnResponse Callback;
+};
+
class TEasyCurlBuffer : public TEasyCurl {
public:
using TPtr = std::shared_ptr<TEasyCurlBuffer>;
using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
- TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback)
- : TEasyCurl(counter, downloadedBytes, url, headers, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer)
+ TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback)
+ : TEasyCurl(counter, downloadedBytes, url, headers, post ? EMethod::POST : EMethod::GET, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer)
{
Output.Reserve(ExpectedSize);
Callbacks.emplace(std::move(callback));
}
- static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) {
- return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
+ static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) {
+ return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), post, std::move(data), std::move(headers), offset, expectedSize, std::move(callback));
}
- size_t GetExpectedSize() const {
+ size_t GetExpectedSize() const final {
return ExpectedSize;
}
@@ -145,11 +204,11 @@ private:
const auto realsize = size * nmemb;
Output.Write(contents, realsize);
return realsize;
- };
+ }
size_t Read(char *buffer, size_t size, size_t nmemb) final {
return Input.Read(buffer, size * nmemb);
- };
+ }
const size_t ExpectedSize;
const TString Data;
@@ -167,7 +226,7 @@ public:
using TWeakPtr = std::weak_ptr<TEasyCurlStream>;
TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish)
- : TEasyCurl(counter, downloadedBytes, url, headers, offset, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+ : TEasyCurl(counter, downloadedBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
{}
static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) {
@@ -208,7 +267,7 @@ private:
Position += realsize;
OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter));
return realsize;
- };
+ }
size_t Read(char*, size_t, size_t) final { return 0ULL; }
@@ -258,6 +317,7 @@ public:
, AwaitQueue(Counters->GetCounter("AwaitQueue"))
, AwaitQueueTopExpectedSize(Counters->GetCounter("AwaitQueueTopExpectedSize"))
, DownloadedBytes(Counters->GetCounter("DownloadedBytes", true))
+ , UploadedBytes(Counters->GetCounter("UploadedBytes", true))
{
if (httpGatewaysCfg) {
if (httpGatewaysCfg->HasMaxInFlightCount()) {
@@ -443,12 +503,22 @@ private:
}
}
+ void Upload(TString url, THeaders headers, TString body, TOnResponse callback) final {
+ Rps->Inc();
+
+ auto upload = TEasyCurlUpload::Make(InFlight, UploadedBytes, std::move(url), std::move(body), std::move(headers), std::move(callback));
+ const std::unique_lock lock(Sync);
+ Await.emplace(std::move(upload));
+ Wakeup(0ULL);
+ }
+
void Download(
TString url,
THeaders headers,
size_t expectedSize,
TOnResult callback,
TString data,
+ bool post,
IRetryPolicy<long>::TPtr retryPolicy) final
{
Rps->Inc();
@@ -459,12 +529,13 @@ private:
return;
}
const std::unique_lock lock(Sync);
- auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
+ TEasyCurlBuffer::TWeakPtr stub;
+ auto& entry = post ? stub : Requests[TKeyType(url, 0U, headers, data, retryPolicy)];
if (const auto& easy = entry.lock())
if (easy->AddCallback(callback))
return;
- auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
+ auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), post, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback));
entry = easy;
Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState()));
Await.emplace(std::move(easy));
@@ -478,7 +549,6 @@ private:
TOnNewDataPart onNewData,
TOnDownloadFinish onFinish) final
{
-
auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish));
const std::unique_lock lock(Sync);
const auto handle = stream->GetHandle();
@@ -529,7 +599,7 @@ private:
private:
CURLM* Handle = nullptr;
- std::queue<TEasyCurlBuffer::TPtr> Await;
+ std::queue<TEasyCurl::TPtr> Await;
std::vector<TEasyCurlStream::TWeakPtr> Streams;
std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated;
@@ -557,6 +627,7 @@ private:
const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueue;
const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopExpectedSize;
const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes;
+ const NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes;
TTaskScheduler TaskScheduler;
};
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 15e554a7d8..8c7a142f90 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -61,9 +61,15 @@ public:
long HttpResponseCode;
};
+ using THeaders = TSmallVec<TString>;
+
+ using TResponse = std::variant<long, TIssues>;
+ using TOnResponse = std::function<void(TResponse&&)>;
+
+ virtual void Upload(TString url, THeaders headers, TString body, TOnResponse callback) = 0;
+
using TResult = std::variant<TContent, TIssues>;
using TOnResult = std::function<void(TResult&&)>;
- using THeaders = TSmallVec<TString>;
virtual void Download(
TString url,
@@ -71,6 +77,7 @@ public:
std::size_t expectedSize,
TOnResult callback,
TString data = {},
+ bool post = false,
IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy()
) = 0;
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
index af03361e05..4ad08563c3 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
@@ -14,6 +14,7 @@ target_compile_options(providers-s3-actors PRIVATE
target_link_libraries(providers-s3-actors PUBLIC
contrib-libs-cxxsupp
yutil
+ cpp-xml-document
yql-minikql-computation
common-token_accessor-client
common-schema-mkql
@@ -23,5 +24,7 @@ target_link_libraries(providers-s3-actors PUBLIC
providers-s3-proto
)
target_sources(providers-s3-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
)
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
index 5127a2fcbc..d1802d1ec9 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(providers-s3-actors PUBLIC
yutil
contrib-libs-fmt
libs-poco-Util
+ cpp-xml-document
yql-minikql-computation
common-token_accessor-client
common-schema-mkql
@@ -33,5 +34,7 @@ target_link_libraries(providers-s3-actors PUBLIC
)
target_sources(providers-s3-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
)
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
new file mode 100644
index 0000000000..fd9663f4c0
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
@@ -0,0 +1,15 @@
+#include "yql_s3_sink_factory.h"
+#include "yql_s3_write_actor.h"
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+
+namespace NYql::NDq {
+
+void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) {
+ factory.RegisterSink<NS3::TSink>("S3Sink",
+ [credentialsFactory, gateway, retryConfig](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
+ return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory, retryConfig);
+ });
+}
+
+}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
new file mode 100644
index 0000000000..80b7fbf7ca
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
+
+
+namespace NYql::NDq {
+
+void RegisterS3WriteActorFactory(
+ TDqAsyncIoFactory& factory,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ IHTTPGateway::TPtr gateway,
+ const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr);
+}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
new file mode 100644
index 0000000000..79785df724
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -0,0 +1,321 @@
+#include "yql_s3_write_actor.h"
+
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+#include <util/generic/size_literals.h>
+
+#include <queue>
+
+#ifdef THROW
+#undef THROW
+#endif
+#include <library/cpp/xml/document/xml-document.h>
+
+namespace NYql::NDq {
+
+using namespace NActors;
+
+namespace {
+
+struct TEvPrivate {
+ // Event ids
+ enum EEv : ui32 {
+ EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE),
+
+ EvUploadError = EvBegin,
+ EvUploadStarted,
+ EvUploadPartFinished,
+ EvUploadFinished,
+
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ // Events
+ struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> {};
+
+ struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> {
+ explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {}
+ const TIssues Error;
+ };
+
+ struct TEvUploadStarted : public TEventLocal<TEvUploadStarted, EvUploadStarted> {
+ explicit TEvUploadStarted(TString&& uploadId) : UploadId(std::move(uploadId)) {}
+ const TString UploadId;
+ };
+
+ struct TEvUploadPartFinished : public TEventLocal<TEvUploadPartFinished, EvUploadPartFinished> {
+ explicit TEvUploadPartFinished(size_t size) : Size(size) {}
+ const size_t Size;
+ };
+};
+
+using TPath = std::tuple<TString, size_t>;
+using TPathList = std::vector<TPath>;
+
+class TRetryParams {
+public:
+ TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
+ : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U)
+ , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100))
+ , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1)
+ {
+ Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.);
+ Reset();
+ }
+
+ void Reset() {
+ Retries = 0U;
+ DelayMs = InitDelayMs;
+ Epsilon = InitEpsilon;
+ }
+
+ TDuration GetNextDelay() {
+ if (++Retries > MaxRetries)
+ return TDuration::Zero();
+ return DelayMs = GenerateNextDelay();
+ }
+private:
+ TDuration GenerateNextDelay() {
+ const auto low = 1. - Epsilon;
+ const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon));
+ return DelayMs * jitter;
+ }
+
+ const ui32 MaxRetries;
+ const TDuration InitDelayMs;
+ const double InitEpsilon;
+
+ ui32 Retries;
+ TDuration DelayMs;
+ double Epsilon;
+};
+
+using namespace NKikimr::NMiniKQL;
+
+class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput {
+public:
+ TS3WriteActor(ui64 outputIndex,
+ IHTTPGateway::TPtr gateway,
+ const TString& url,
+ const TString& token,
+ const TString& path,
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
+ const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
+ : Gateway(std::move(gateway))
+ , OutputIndex(outputIndex)
+ , Callbacks(callbacks)
+ , ActorSystem(TActivationContext::ActorSystem())
+ , Url(url)
+ , Headers(MakeHeader(token))
+ , Path(path)
+ , RetryConfig(retryConfig)
+ {}
+
+ void Bootstrap() {
+ Become(&TS3WriteActor::InitialStateFunc);
+ Gateway->Download(Url + Path + "?uploads", Headers, 0, std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), "", true);
+ }
+
+ static constexpr char ActorName[] = "S3_WRITE_ACTOR";
+private:
+ void CommitState(const NDqProto::TCheckpoint&) final {};
+ void LoadState(const NDqProto::TSinkState&) final {};
+ ui64 GetOutputIndex() const final { return OutputIndex; }
+ i64 GetFreeSpace() const final {
+ return 1_GB - InFlight - std::accumulate(Parts.cbegin(), Parts.cend(), 0LL, [](i64 s, const NUdf::TUnboxedValuePod v){ return v ? s + v.AsStringRef().Size() : s; });
+ }
+
+ STRICT_STFUNC(InitialStateFunc,
+ hFunc(TEvPrivate::TEvUploadError, Handle);
+ hFunc(TEvPrivate::TEvUploadStarted, Handle);
+ )
+
+ STRICT_STFUNC(WorkingStateFunc,
+ hFunc(TEvPrivate::TEvUploadError, Handle);
+ hFunc(TEvPrivate::TEvUploadPartFinished, Handle);
+ )
+
+ STRICT_STFUNC(FinalStateFunc,
+ hFunc(TEvPrivate::TEvUploadError, Handle);
+ cFunc(TEvPrivate::EvUploadFinished, HandleFinished);
+ )
+
+ static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) {
+ switch (result.index()) {
+ case 0U: try {
+ const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
+ if (const auto& root = xml.Root(); root.Name() == "Error") {
+ const auto& code = root.Node("Code", true).Value<TString>();
+ const auto& message = root.Node("Message", true).Value<TString>();
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
+ } else if (root.Name() != "InitiateMultipartUploadResult")
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on discovery.")})));
+ else
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("UploadId", true).Value<TString>())));
+
+ break;
+ } catch (const std::exception& ex) {
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse discovery response.")})));
+ break;
+ }
+ case 1U:
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
+ break;
+ default:
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on discovery.")})));
+ break;
+ }
+ }
+
+ static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, IHTTPGateway::TResponse&& response) {
+ switch (response.index()) {
+ case 0U:
+ if (const auto code = std::get<long>(std::move(response)); 200L == code)
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadPartFinished(size)));
+ else
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response code " << code)})));
+ case 1U:
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response)))));
+ }
+ }
+
+ static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) {
+ switch (result.index()) {
+ case 0U: try {
+ const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
+ if (const auto& root = xml.Root(); root.Name() == "Error") {
+ const auto& code = root.Node("Code", true).Value<TString>();
+ const auto& message = root.Node("Message", true).Value<TString>();
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
+ } else if (root.Name() != "CompleteMultipartUploadResult")
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on discovery.")})));
+ else
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished()));
+
+ break;
+ } catch (const std::exception& ex) {
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse discovery response.")})));
+ break;
+ }
+ case 1U:
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
+ break;
+ default:
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on discovery.")})));
+ break;
+ }
+ }
+
+ void SendData(TUnboxedValueVector&& data, i64 size, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
+ Cerr << __func__ << '(' << data.size() << ',' << size << ',' << finished << ')' << Endl;
+ InputFinished = finished;
+ std::move(data.begin(), data.end(), std::back_inserter(Parts));
+ if (!UploadId.empty())
+ StartUploadParts();
+ }
+
+ void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
+ Parts.clear();
+ Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true);
+ if (!UploadId.empty()) {
+ // TODO: Send delete.
+ }
+ }
+
+ void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) {
+ UploadId = result->Get()->UploadId;
+ Become(&TS3WriteActor::InitialStateFunc);
+ StartUploadParts();
+ }
+
+ void Handle(TEvPrivate::TEvUploadPartFinished::TPtr& result) {
+ InFlight -= result->Get()->Size;
+
+ if (!InFlight && std::all_of(Parts.cbegin(), Parts.cend(), std::logical_not<NUdf::TUnboxedValuePod>())) {
+ Become(&TS3WriteActor::FinalStateFunc);
+
+ TStringBuilder xml;
+ xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl;
+ xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl;
+ for (auto i = 1U; i <= Parts.size(); ++i)
+ xml << "<Part><PartNumber>" << i << "</Part></PartNumber>" << Endl;
+ xml << "</CompleteMultipartUpload>" << Endl;
+ Gateway->Download(Url + Path + "?uploadId=" + UploadId, Headers, 0, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), xml, true);
+ }
+ }
+
+ void HandleFinished() {}
+
+ // IActor & IDqComputeActorAsyncInput
+ void PassAway() override { // Is called from Compute Actor
+ TActorBootstrapped<TS3WriteActor>::PassAway();
+ }
+
+ static IHTTPGateway::THeaders MakeHeader(const TString& token) {
+ return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
+ }
+
+ void StartUploadParts() {
+ for (auto i = 0U; i < Parts.size(); ++i) {
+ if (auto part = std::move(Parts[i])) {
+ const auto size = part.AsStringRef().Size();
+ InFlight += size;
+ Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(i + 1) + "&uploadId=" + UploadId, Headers, TString(part.AsStringRef()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, std::placeholders::_1));
+ }
+ }
+ }
+
+ bool InputFinished = false;
+ size_t InFlight = 0ULL;
+
+ const IHTTPGateway::TPtr Gateway;
+
+ const ui64 OutputIndex;
+ IDqComputeActorAsyncOutput::ICallbacks *const Callbacks;
+
+ TActorSystem* const ActorSystem;
+
+ const TString Url;
+ const IHTTPGateway::THeaders Headers;
+ const TString Path;
+
+ TUnboxedValueVector Parts;
+
+ std::vector<TRetryParams> RetriesPerPath;
+ const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
+
+ TString UploadId;
+};
+
+} // namespace
+
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
+ const NKikimr::NMiniKQL::TTypeEnvironment&,
+ const NKikimr::NMiniKQL::IFunctionRegistry&,
+ IHTTPGateway::TPtr gateway,
+ NS3::TSink&& params,
+ ui64 outputIndex,
+ const THashMap<TString, TString>& secureParams,
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig)
+{
+ const auto token = secureParams.Value(params.GetToken(), TString{});
+ const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
+ const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
+
+ const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), params.GetUrl(), authToken, params.GetPath(), callbacks, retryConfig);
+ return {actor, actor};
+}
+
+} // namespace NYql::NDq
+
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
new file mode 100644
index 0000000000..2f759e83e1
--- /dev/null
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/providers/s3/proto/sink.pb.h>
+#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
+#include <library/cpp/actors/core/actor.h>
+
+namespace NYql::NDq {
+
+std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
+ const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
+ IHTTPGateway::TPtr gateway,
+ NS3::TSink&& params,
+ ui64 inputIndex,
+ const THashMap<TString, TString>& secureParams,
+ IDqComputeActorAsyncOutput::ICallbacks* callbacks,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig);
+
+} // namespace NYql::NDq
+
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 91be8e870a..4991984cb1 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -72,6 +72,15 @@
]
},
{
+ "Name": "TS3Write",
+ "Base": "TFreeArgCallable",
+ "Match": {"Type": "Callable", "Name": "Write!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TS3DataSink"}
+ ]
+ },
+ {
"Name": "TS3Object",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "S3Object"},
@@ -92,6 +101,47 @@
{"Index": 3, "Name": "RowType", "Type": "TExprBase"},
{"Index": 4, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true}
]
+ },
+ {
+ "Name": "TS3Target",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "S3Target"},
+ "Children": [
+ {"Index": 0, "Name": "Path", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Format", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "Settings", "Type": "TExprBase", "Optional": true}
+ ]
+ },
+ {
+ "Name": "TS3WriteObject",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "S3WriteObject!"},
+ "Children": [
+ {"Index": 0, "Name": "World", "Type": "TExprBase"},
+ {"Index": 1, "Name": "DataSink", "Type": "TS3DataSink"},
+ {"Index": 2, "Name": "Target", "Type": "TS3Target"},
+ {"Index": 3, "Name": "Input", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TS3SinkSettings",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "S3SinkSettings"},
+ "Children": [
+ {"Index": 0, "Name": "Path", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"},
+ {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}
+ ]
+ },
+ {
+ "Name": "TS3SinkOutput",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "S3SinkOutput"},
+ "Children": [
+ {"Index": 0, "Name": "Input", "Type": "TExprBase"},
+ {"Index": 1, "Name": "Format", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ ]
}
]
}
diff --git a/ydb/library/yql/providers/s3/proto/CMakeLists.txt b/ydb/library/yql/providers/s3/proto/CMakeLists.txt
index 6c6b6ecd03..ccdedfbd0c 100644
--- a/ydb/library/yql/providers/s3/proto/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/proto/CMakeLists.txt
@@ -16,6 +16,7 @@ target_link_libraries(providers-s3-proto PUBLIC
target_proto_messages(providers-s3-proto PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/range.proto
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/retry_config.proto
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/sink.proto
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/source.proto
)
target_proto_addincls(providers-s3-proto
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
new file mode 100644
index 0000000000..bbc0405af3
--- /dev/null
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -0,0 +1,11 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package NYql.NS3;
+
+message TSink {
+ string Url = 1;
+ string Token = 2;
+ string Path = 3;
+}
+
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.txt
index 9a1814b1a1..0e2cc06957 100644
--- a/ydb/library/yql/providers/s3/provider/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/provider/CMakeLists.txt
@@ -56,6 +56,7 @@ target_sources(providers-s3-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp
index 92c99b1f24..56e5a1cd52 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp
@@ -1,4 +1,5 @@
#include "yql_s3_provider_impl.h"
+#include "yql_s3_dq_integration.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
@@ -15,16 +16,28 @@ using namespace NNodes;
namespace {
+void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) {
+ VisitExpr(input, [&children](const TExprNode::TPtr& node) {
+ if (node->IsCallable("DqCnResult")) {
+ children.push_back(node->HeadPtr());
+ return false;
+ }
+
+ return true;
+ });
+}
+
class TS3DataSinkProvider : public TDataProviderBase {
public:
- TS3DataSinkProvider(TS3State::TPtr state)
+ TS3DataSinkProvider(TS3State::TPtr state, IHTTPGateway::TPtr)
: State_(state)
, TypeAnnotationTransformer_(CreateS3DataSinkTypeAnnotationTransformer(State_))
, ExecutionTransformer_(CreateS3DataSinkExecTransformer(State_))
, LogicalOptProposalTransformer_(CreateS3LogicalOptProposalTransformer(State_))
- {
- }
-
+ , PhysicalOptProposalTransformer_(CreateS3PhysicalOptProposalTransformer(State_))
+ , DqIntegration_(CreateS3DqIntegration(State_))
+ {}
+private:
TStringBuf GetName() const override {
return S3ProviderName;
}
@@ -46,6 +59,10 @@ public:
return *ExecutionTransformer_;
}
+ IGraphTransformer& GetPhysicalOptProposalTransformer() override {
+ return *PhysicalOptProposalTransformer_;
+ }
+
bool CanExecute(const TExprNode& node) override {
return ExecutionTransformer_->CanExec(node);
}
@@ -69,17 +86,59 @@ public:
IGraphTransformer& GetLogicalOptProposalTransformer() override {
return *LogicalOptProposalTransformer_;
}
-private:
+
+ TExprNode::TPtr RewriteIO(const TExprNode::TPtr& write, TExprContext& ctx) override {
+ const TS3Write w(write);
+ auto settings = write->Tail().ChildrenList();
+ return Build<TS3WriteObject>(ctx, w.Pos())
+ .World(w.World())
+ .DataSink(w.DataSink())
+ .Target<TS3Target>()
+ .Path(write->Child(2U)->Head().Tail().HeadPtr())
+ .Format(ExtractFormat(settings))
+ .Settings(ctx.NewList(w.Pos(), std::move(settings)))
+ .Build()
+ .Input(write->ChildPtr(3))
+ .Done().Ptr();
+ }
+
+ void GetOutputs(const TExprNode& node, TVector<TPinInfo>& outputs) override {
+ if (const auto& maybeOp = TMaybeNode<TS3WriteObject>(&node)) {
+ const auto& op = maybeOp.Cast();
+ outputs.push_back(TPinInfo(nullptr, op.DataSink().Raw(), op.Target().Raw(), op.DataSink().Cluster().StringValue() + '.' + op.Target().Path().StringValue(), false));
+ }
+ }
+
+ bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool) override {
+ if (CanExecute(node)) {
+ children.push_back(node.HeadPtr());
+
+ if (TS3WriteObject::Match(&node)) {
+ ScanPlanDependencies(node.ChildPtr(TS3WriteObject::idx_Input), children);
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ IDqIntegration* GetDqIntegration() override {
+ return DqIntegration_.Get();
+ }
+
const TS3State::TPtr State_;
const THolder<TVisitorTransformerBase> TypeAnnotationTransformer_;
const THolder<TExecTransformerBase> ExecutionTransformer_;
const THolder<IGraphTransformer> LogicalOptProposalTransformer_;
+ const THolder<IGraphTransformer> PhysicalOptProposalTransformer_;
+ const THolder<IDqIntegration> DqIntegration_;
};
}
-TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state) {
- return new TS3DataSinkProvider(state);
+TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state, IHTTPGateway::TPtr gateway) {
+ return new TS3DataSinkProvider(std::move(state), std::move(gateway));
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp
index 1eb216e5a5..684bf5b7bf 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp
@@ -1,11 +1,13 @@
#include "yql_s3_provider_impl.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
+#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -20,11 +22,92 @@ public:
TS3DataSinkExecTransformer(TS3State::TPtr state)
: State_(state)
{
- AddHandler({TCoCommit::CallableName()}, RequireFirst(), Pass());
+ AddHandler({TCoCommit::CallableName()}, RequireFirst(), Hndl(&TS3DataSinkExecTransformer::HandleCommit));
}
-
private:
- TS3State::TPtr State_;
+ TStatusCallbackPair HandleCommit(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (TDqQuery::Match(input->Child(TCoCommit::idx_World))) {
+ return DelegateExecutionToDqProvider(input->ChildPtr(TCoCommit::idx_World), input, ctx);
+ } else { // Pass
+ input->SetState(TExprNode::EState::ExecutionComplete);
+ input->SetResult(ctx.NewWorld(input->Pos()));
+ return SyncOk();
+ }
+ }
+
+ TStatusCallbackPair DelegateExecutionToDqProvider(const TExprNode::TPtr& input, const TExprNode::TPtr& originInput, TExprContext& ctx) {
+ YQL_CLOG(INFO, ProviderS3) << "Delegate execution of " << input->Content() << " to DQ provider.";
+ auto delegatedNode = Build<TPull>(ctx, input->Pos())
+ .Input(input)
+ .BytesLimit()
+ .Value(TString())
+ .Build()
+ .RowsLimit()
+ .Value(TString("0"))
+ .Build()
+ .FormatDetails()
+ .Value(ToString((ui32)NYson::EYsonFormat::Binary))
+ .Build()
+ .Settings()
+ .Build()
+ .Format()
+ .Value(ToString("0"))
+ .Build()
+ .PublicId()
+ .Value("id")
+ .Build()
+ .Discard()
+ .Value(ToString(true))
+ .Build()
+ .Origin(originInput)
+ .Done()
+ .Ptr();
+
+ for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails,
+ TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) {
+ delegatedNode->Child(idx)->SetTypeAnn(ctx.MakeType<TUnitExprType>());
+ delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete);
+ }
+
+ delegatedNode->SetTypeAnn(originInput->GetTypeAnn());
+ delegatedNode->SetState(TExprNode::EState::ConstrComplete);
+ originInput->SetState(TExprNode::EState::ExecutionInProgress);
+
+ const auto dqProvider = State_->Types->DataSourceMap.FindPtr(DqProviderName);
+
+ TExprNode::TPtr delegatedNodeOutput;
+ if (const auto status = dqProvider->Get()->GetCallableExecutionTransformer().Transform(delegatedNode, delegatedNodeOutput, ctx); status.Level != TStatus::Async) {
+ YQL_ENSURE(status.Level != TStatus::Ok, "Asynchronous execution is expected in a happy path.");
+ return SyncStatus(status);
+ }
+
+ auto dqFuture = dqProvider->Get()->GetCallableExecutionTransformer().GetAsyncFuture(*delegatedNode);
+
+ TAsyncTransformCallbackFuture callbackFuture = dqFuture.Apply(
+ [dqProvider, delegatedNode](const NThreading::TFuture<void>& completedFuture) {
+ return TAsyncTransformCallback(
+ [completedFuture, dqProvider, delegatedNode](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
+ completedFuture.GetValue();
+ output = input;
+ TExprNode::TPtr delegatedNodeOutput;
+ auto dqWriteStatus = dqProvider->Get()->GetCallableExecutionTransformer()
+ .ApplyAsyncChanges(delegatedNode, delegatedNodeOutput, ctx);
+
+ YQL_ENSURE(dqWriteStatus != TStatus::Async, "ApplyAsyncChanges should not return Async.");
+
+ if (dqWriteStatus != TStatus::Ok)
+ return dqWriteStatus;
+
+ output->SetState(TExprNode::EState::ExecutionComplete);
+ output->SetResult(ctx.NewAtom(input->Pos(), "DQ_completed"));
+ return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok);
+ });
+ });
+
+ return std::make_pair(IGraphTransformer::TStatus::Async, callbackFuture);
+ }
+
+ const TS3State::TPtr State_;
};
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 30196b3f4a..9fe5d05bb7 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -23,15 +23,95 @@ public:
{
using TSelf = TS3DataSinkTypeAnnotationTransformer;
AddHandler({TCoCommit::CallableName()}, Hndl(&TSelf::HandleCommit));
+ AddHandler({TS3WriteObject::CallableName()}, Hndl(&TSelf::HandleWrite));
+ AddHandler({TS3Target::CallableName()}, Hndl(&TSelf::HandleTarget));
+ AddHandler({TS3SinkSettings::CallableName()}, Hndl(&TSelf::HandleSink));
+ AddHandler({TS3SinkOutput::CallableName()}, Hndl(&TSelf::HandleOutput));
}
-
+private:
TStatus HandleCommit(TExprBase input, TExprContext&) {
const auto commit = input.Cast<TCoCommit>();
input.Ptr()->SetTypeAnn(commit.World().Ref().GetTypeAnn());
return TStatus::Ok;
}
-private:
+ TStatus HandleWrite(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureArgsCount(*input, 4U, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureWorldType(*input->Child(TS3WriteObject::idx_World), ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureSpecificDataSink(*input->Child(TS3WriteObject::idx_DataSink), S3ProviderName, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!TS3Target::Match(input->Child(TS3WriteObject::idx_Target))) {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3WriteObject::idx_Target)->Pos()), "Expected S3 target."));
+ return TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.MakeType<TWorldExprType>());
+ return TStatus::Ok;
+ }
+
+ TStatus HandleTarget(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(TS3Target::idx_Path), ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormat(input->Child(TS3Target::idx_Format)->Content(), ctx)) {
+ return TStatus::Error;
+ }
+
+ if (input->ChildrenSize() > TS3Target::idx_Settings && !EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx)) {
+ return TStatus::Error;
+ }
+/* TODO
+ const auto compression = GetCompression(*input->Child(TS3Target::idx_Settings));
+ if (!NCommon::ValidateCompression(compression, ctx)) {
+ return TStatus::Error;
+ }
+*/
+ input->SetTypeAnn(ctx.MakeType<TUnitExprType>());
+ return TStatus::Ok;
+ }
+
+ TStatus HandleSink(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureArgsCount(*input, 3, ctx)) {
+ return TStatus::Error;
+ }
+ input->SetTypeAnn(ctx.MakeType<TVoidExprType>());
+ return TStatus::Ok;
+ }
+
+ TStatus HandleOutput(const TExprNode::TPtr& input, TExprContext& ctx) {
+ if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureFlowType(*input->Child(TS3SinkOutput::idx_Input), ctx)) {
+ return TStatus::Error;
+ }
+
+ if (!EnsureAtom(*input->Child(TS3SinkOutput::idx_Format), ctx)) {
+ return TStatus::Error;
+ }
+
+ if (input->ChildrenSize() > TS3SinkOutput::idx_Settings && !EnsureTuple(*input->Child(TS3SinkOutput::idx_Settings), ctx)) {
+ return TStatus::Error;
+ }
+
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+ return TStatus::Ok;
+ }
+
const TS3State::TPtr State_;
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
index 8f77b7bfe9..3377e38317 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp
@@ -114,8 +114,6 @@ public:
IDqIntegration* GetDqIntegration() override {
return DqIntegration_.Get();
}
-
-
private:
const TS3State::TPtr State_;
const THolder<IGraphTransformer> IODiscoveryTransformer_;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index af7d70a35e..ba58f6d9ac 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -8,6 +8,7 @@
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/s3/proto/range.pb.h>
+#include <ydb/library/yql/providers/s3/proto/sink.pb.h>
#include <ydb/library/yql/providers/s3/proto/source.pb.h>
#include <ydb/library/yql/utils/log/log.h>
@@ -173,10 +174,26 @@ public:
}
}
+ void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override {
+ const TDqSink sink(&node);
+ if (const auto maySettings = sink.Settings().Maybe<TS3SinkSettings>()) {
+ const auto settings = maySettings.Cast();
+ const auto& cluster = sink.DataSink().Cast<TS3DataSink>().Cluster().StringValue();
+ const auto& connect = State_->Configuration->Clusters.at(cluster);
+
+ NS3::TSink sinkDesc;
+ sinkDesc.SetUrl(connect.Url);
+ sinkDesc.SetToken(settings.Token().Name().StringValue());
+ sinkDesc.SetPath(settings.Path().StringValue());
+
+ protoSettings.PackFrom(sinkDesc);
+ sinkType = "S3Sink";
+ }
+ }
+
void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override {
RegisterDqS3MkqlCompilers(compiler, State_);
}
-
private:
const TS3State::TPtr State_;
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 2626a56dfc..e8c3f5a320 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -6,14 +6,13 @@
#include <ydb/library/yql/utils/url_builder.h>
#include <util/generic/size_literals.h>
-
#include <contrib/libs/re2/re2/re2.h>
+#include <library/cpp/retry/retry_policy.h>
#ifdef THROW
#undef THROW
#endif
#include <library/cpp/xml/document/xml-document.h>
-#include <library/cpp/retry/retry_policy.h>
namespace NYql {
@@ -32,17 +31,6 @@ std::array<TExprNode::TPtr, 2U> ExtractSchema(TExprNode::TListType& settings) {
return {};
}
-TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings) {
- for (auto it = settings.cbegin(); settings.cend() != it; ++it) {
- if (const auto item = *it; item->Head().IsAtom("format")) {
- settings.erase(it);
- return item->TailPtr();
- }
- }
-
- return {};
-}
-
using TItemsMap = std::map<TString, ui64>;
using TPendingBuckets = std::unordered_map<std::tuple<TString, TString, TString>, std::tuple<TNodeSet, TItemsMap, TIssues>, THash<std::tuple<TString, TString, TString>>>;
@@ -114,6 +102,7 @@ void OnDiscovery(
0U,
std::bind(&OnDiscovery, gateway, pos, std::placeholders::_1, std::cref(keys), std::ref(output), std::move(promise), pendingBucketsWPtr, promiseInd, retryPolicy, maxDiscoveryFilesPerQuery),
/*data=*/"",
+ false,
retryPolicy);
}
YQL_CLOG(INFO, ProviderS3) << "Gateway disappeared.";
@@ -256,6 +245,7 @@ public:
IHTTPGateway::TWeakPtr(Gateway_), ctx.GetPosition((*std::get<TNodeSet>(bucket.second).cbegin())->Pos()), std::placeholders::_1,
std::cref(bucket.first), std::ref(bucket.second), std::move(promise), pendingBucketsWPtr, i++, retryPolicy, State_->Configuration->MaxDiscoveryFilesPerQuery),
/*data=*/"",
+ false,
retryPolicy
);
YQL_CLOG(INFO, ProviderS3) << "Enumerate items in " << std::get<0U>(bucket.first) << std::get<1U>(bucket.first);
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
index fab2d6d0f7..d9e83ee3cc 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
@@ -10,6 +10,50 @@ namespace NYql {
using namespace NKikimr::NMiniKQL;
using namespace NNodes;
+namespace {
+
+
+TRuntimeNode BuildSerializeCall(
+ TRuntimeNode input,
+ const std::string_view& format,
+ const std::string_view& /*compression*/,
+ TType* inputType,
+ NCommon::TMkqlBuildContext& ctx)
+{
+ if (format == "raw") {
+ const auto structType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputType)->GetItemType());
+ MKQL_ENSURE(1U == structType->GetMembersCount(), "Expected single column.");
+ const auto schemeType = AS_TYPE(TDataType, structType->GetMemberType(0U))->GetSchemeType();
+ return ctx.ProgramBuilder.Map(input,
+ [&](TRuntimeNode item) {
+ const auto member = ctx.ProgramBuilder.Member(item, structType->GetMemberName(0U));
+ return NUdf::TDataType<const char*>::Id == schemeType ? member : ctx.ProgramBuilder.ToString(member);
+ }
+ );
+ } else if (format == "json_list") {
+ return ctx.ProgramBuilder.FlatMap(
+ ctx.ProgramBuilder.Condense(input, ctx.ProgramBuilder.NewList(AS_TYPE(TFlowType, inputType)->GetItemType(), {}),
+ [&ctx] (TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral<bool>(false); },
+ [&ctx] (TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); }
+ ),
+ [&ctx] (TRuntimeNode list) {
+ const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({list.GetStaticType()})});
+ return ctx.ProgramBuilder.ToString(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.SerializeJson"), {ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.From", {}, userType), {list})}));
+ }
+ );
+ }
+
+ throw yexception() << "Unsupported format: " << format;
+}
+
+TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) {
+ const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
+ const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder);
+ return BuildSerializeCall(input, wrapper.Format().Value(), "TODO", inputItemType, ctx);
+}
+
+}
+
void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TS3State::TPtr&) {
compiler.ChainCallable(TDqSourceWideWrap::CallableName(),
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
@@ -22,6 +66,12 @@ void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, con
return TRuntimeNode();
});
+
+ if (!compiler.HasCallable(TS3SinkOutput::CallableName()))
+ compiler.AddCallable(TS3SinkOutput::CallableName(),
+ [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+ return SerializeForS3(TS3SinkOutput(&node), ctx);
+ });
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
new file mode 100644
index 0000000000..6718f26061
--- /dev/null
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -0,0 +1,104 @@
+#include "yql_s3_provider_impl.h"
+
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/core/yql_opt_utils.h>
+#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
+#include <ydb/library/yql/dq/opt/dq_opt.h>
+#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
+#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
+#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+
+namespace NYql {
+
+namespace {
+
+using namespace NNodes;
+using namespace NDq;
+
+class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
+public:
+ explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state)
+ : TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderS3, {})
+ , State_(std::move(state))
+ {
+#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TS3PhysicalOptProposalTransformer::name)
+ AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject));
+#undef HNDL
+
+ SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes.
+ }
+
+ TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const {
+ auto write = node.Cast<TS3WriteObject>();
+ if (!TDqCnUnionAll::Match(write.Input().Raw())) { // => this code is not for RTMR mode.
+ return node;
+ }
+
+ const auto& targetNode = write.Target();
+ const auto& cluster = write.DataSink().Cluster().StringValue();
+
+ const TParentsMap* parentsMap = getParents();
+ auto dqUnion = write.Input().Cast<TDqCnUnionAll>();
+ if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) {
+ return node;
+ }
+
+
+ YQL_CLOG(INFO, ProviderS3) << "Optimize S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "`";
+
+ const auto token = "cluster:default_" + cluster;
+
+ auto dqSink = Build<TDqSink>(ctx, write.Pos())
+ .DataSink(write.DataSink())
+ .Index(dqUnion.Output().Index())
+ .Settings<TS3SinkSettings>()
+ .Path(write.Target().Path())
+ .Settings<TCoNameValueTupleList>().Build()
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Build()
+ .Done();
+
+ auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
+ auto outputsBuilder = Build<TDqStageOutputsList>(ctx, targetNode.Pos());
+ if (inputStage.Outputs()) {
+ outputsBuilder.InitFrom(inputStage.Outputs().Cast());
+ }
+ outputsBuilder.Add(dqSink);
+
+ auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos())
+ .InitFrom(inputStage)
+ .Program<TCoLambda>()
+ .Args({"input"})
+ .Body<TS3SinkOutput>()
+ .Input<TExprApplier>()
+ .Apply(inputStage.Program()).With(0, "input")
+ .Build()
+ .Format(write.Target().Format())
+ .Build()
+ .Build()
+ .Outputs(outputsBuilder.Done())
+ .Done();
+
+ auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos());
+ dqQueryBuilder.World(write.World());
+ dqQueryBuilder.SinkStages().Add(dqStageWithSink).Build();
+
+ optCtx.RemapNode(inputStage.Ref(), dqStageWithSink.Ptr());
+
+ return dqQueryBuilder.Done();
+ }
+
+private:
+ TS3State::TPtr State_;
+};
+
+} // namespace
+
+THolder<IGraphTransformer> CreateS3PhysicalOptProposalTransformer(TS3State::TPtr state) {
+ return MakeHolder<TS3PhysicalOptProposalTransformer>(std::move(state));
+}
+
+} // namespace NYql
+
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
index 4d70add3c0..4b11f78ad7 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
@@ -35,7 +35,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway
info.Names.insert({TString{S3ProviderName}});
info.Source = CreateS3DataSource(state, gateway);
- info.Sink = CreateS3DataSink(state);
+ info.Sink = CreateS3DataSink(state, gateway);
return info;
};
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h
index 12cd74f805..d4367b14b2 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h
@@ -32,6 +32,6 @@ struct TS3State : public TThrRefBase
TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr);
TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state, IHTTPGateway::TPtr gateway);
-TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state);
+TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state, IHTTPGateway::TPtr gateway);
} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
index 6c59efc837..cf92965115 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
@@ -4,4 +4,15 @@
namespace NYql {
+TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings) {
+ for (auto it = settings.cbegin(); settings.cend() != it; ++it) {
+ if (const auto item = *it; item->Head().IsAtom("format")) {
+ settings.erase(it);
+ return item->TailPtr();
+ }
+ }
+
+ return {};
+}
+
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h
index 732615b23f..e3694a3ba9 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h
@@ -19,6 +19,8 @@ THolder<TExecTransformerBase> CreateS3DataSinkExecTransformer(TS3State::TPtr sta
THolder<IGraphTransformer> CreateS3LogicalOptProposalTransformer(TS3State::TPtr state);
THolder<IGraphTransformer> CreateS3SourceCallableExecutionTransformer(TS3State::TPtr state);
THolder<IGraphTransformer> CreateS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway);
+THolder<IGraphTransformer> CreateS3PhysicalOptProposalTransformer(TS3State::TPtr state);
+TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings);
} // namespace NYql