diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-06-30 17:30:21 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-06-30 17:30:21 +0300 |
commit | 1d8f8dfd93dc506a0fd6a696eda1cf149afdb1f8 (patch) | |
tree | a026078e8d7233a503cb3349ada7289a8db4022c | |
parent | 9a30021fae9ff838945d397ccf4f3ae60dabb47e (diff) | |
download | ydb-1d8f8dfd93dc506a0fd6a696eda1cf149afdb1f8.tar.gz |
YQL-15033 S3 sink draft.
ref:bdf2543586747072f2f6ab8149b0e732df2fb9f9
26 files changed, 972 insertions, 49 deletions
diff --git a/ydb/core/yq/libs/test_connection/test_object_storage.cpp b/ydb/core/yq/libs/test_connection/test_object_storage.cpp index 80832654cf..52a2b81e55 100644 --- a/ydb/core/yq/libs/test_connection/test_object_storage.cpp +++ b/ydb/core/yq/libs/test_connection/test_object_storage.cpp @@ -175,6 +175,7 @@ private: 0U, std::bind(&DiscoveryCallback, std::placeholders::_1, SelfId(), TActivationContext::ActorSystem()), /*data=*/"", + false, retryPolicy ); } diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp index 8b61bbddce..6c66c927a1 100644 --- a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -43,12 +43,15 @@ public: return std::move(ret); } + void Upload(TString, THeaders, TString, TOnResponse) {} + void Download( TString url, IHTTPGateway::THeaders headers, std::size_t expectedSize, IHTTPGateway::TOnResult callback, TString data, + bool, IRetryPolicy<long>::TPtr retryPolicy) { diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index 0b60b1e515..83429896b6 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -18,13 +18,27 @@ class TEasyCurl { public: using TPtr = std::shared_ptr<TEasyCurl>; - TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, bool withData) + enum class EMethod { + GET, + POST, + PUT + }; + + TEasyCurl(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false) : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes) { + switch (method) { + case EMethod::GET: + break; + case EMethod::POST: + curl_easy_setopt(Handle, CURLOPT_POST, 1L); + break; + case EMethod::PUT: + curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L); + break; + } + curl_easy_setopt(Handle, CURLOPT_URL, url.c_str()); - curl_easy_setopt(Handle, CURLOPT_POST, withData ? 1L : 0L); - curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); - curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this)); curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway"); curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L); @@ -38,7 +52,12 @@ public: curl_easy_setopt(Handle, CURLOPT_RANGE, (ToString(Offset) += '-').c_str()); } - if (withData) { + if (EMethod::PUT != method) { + curl_easy_setopt(Handle, CURLOPT_WRITEFUNCTION, &WriteMemoryCallback); + curl_easy_setopt(Handle, CURLOPT_WRITEDATA, static_cast<void*>(this)); + } + + if (withBody) { curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback); curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this)); } @@ -53,6 +72,8 @@ public: } } + virtual size_t GetExpectedSize() const { return 0ULL; } + CURL* GetHandle() const { return Handle; } @@ -86,23 +107,61 @@ private: const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes; }; +class TEasyCurlUpload : public TEasyCurl { +public: + using TPtr = std::shared_ptr<TEasyCurlUpload>; + using TWeakPtr = std::weak_ptr<TEasyCurlUpload>; + + TEasyCurlUpload(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback) + : TEasyCurl(counter, uploadedBytes, url, headers, EMethod::PUT), Data(std::move(data)), Input(Data), Callback(std::move(callback)) + {} + + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, IHTTPGateway::TOnResponse callback) { + return std::make_shared<TEasyCurlUpload>(counter, uploadedBytes, std::move(url), std::move(data), std::move(headers), std::move(callback)); + } +private: + void Fail(const TIssue& error) final { + Callback(TIssues{error}); + } + + void Done(CURLcode result) final { + if (CURLE_OK != result) + return Fail(TIssue(curl_easy_strerror(result))); + + long httpResponseCode = 0; + curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); + Callback(httpResponseCode); + } + + size_t Write(void*, size_t, size_t) final { return 0ULL; } + + size_t Read(char *buffer, size_t size, size_t nmemb) final { + return Input.Read(buffer, size * nmemb); + } + + const TString Data; + TStringInput Input; + + IHTTPGateway::TOnResponse Callback; +}; + class TEasyCurlBuffer : public TEasyCurl { public: using TPtr = std::shared_ptr<TEasyCurlBuffer>; using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>; - TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) - : TEasyCurl(counter, downloadedBytes, url, headers, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer) + TEasyCurlBuffer(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) + : TEasyCurl(counter, downloadedBytes, url, headers, post ? EMethod::POST : EMethod::GET, offset, !data.empty()), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer) { Output.Reserve(ExpectedSize); Callbacks.emplace(std::move(callback)); } - static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) { - return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); + static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, bool post, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback) { + return std::make_shared<TEasyCurlBuffer>(counter, downloadedBytes, std::move(url), post, std::move(data), std::move(headers), offset, expectedSize, std::move(callback)); } - size_t GetExpectedSize() const { + size_t GetExpectedSize() const final { return ExpectedSize; } @@ -145,11 +204,11 @@ private: const auto realsize = size * nmemb; Output.Write(contents, realsize); return realsize; - }; + } size_t Read(char *buffer, size_t size, size_t nmemb) final { return Input.Read(buffer, size * nmemb); - }; + } const size_t ExpectedSize; const TString Data; @@ -167,7 +226,7 @@ public: using TWeakPtr = std::weak_ptr<TEasyCurlStream>; TEasyCurlStream(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) - : TEasyCurl(counter, downloadedBytes, url, headers, offset, false), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) + : TEasyCurl(counter, downloadedBytes, url, headers, EMethod::GET, offset), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL)) {} static TPtr Make(const NMonitoring::TDynamicCounters::TCounterPtr& counter, const NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, TString url, IHTTPGateway::THeaders headers, size_t offset, IHTTPGateway::TOnNewDataPart onNewData, IHTTPGateway::TOnDownloadFinish onFinish) { @@ -208,7 +267,7 @@ private: Position += realsize; OnNewData(IHTTPGateway::TCountedContent(TString(static_cast<char*>(contents), realsize), Counter)); return realsize; - }; + } size_t Read(char*, size_t, size_t) final { return 0ULL; } @@ -258,6 +317,7 @@ public: , AwaitQueue(Counters->GetCounter("AwaitQueue")) , AwaitQueueTopExpectedSize(Counters->GetCounter("AwaitQueueTopExpectedSize")) , DownloadedBytes(Counters->GetCounter("DownloadedBytes", true)) + , UploadedBytes(Counters->GetCounter("UploadedBytes", true)) { if (httpGatewaysCfg) { if (httpGatewaysCfg->HasMaxInFlightCount()) { @@ -443,12 +503,22 @@ private: } } + void Upload(TString url, THeaders headers, TString body, TOnResponse callback) final { + Rps->Inc(); + + auto upload = TEasyCurlUpload::Make(InFlight, UploadedBytes, std::move(url), std::move(body), std::move(headers), std::move(callback)); + const std::unique_lock lock(Sync); + Await.emplace(std::move(upload)); + Wakeup(0ULL); + } + void Download( TString url, THeaders headers, size_t expectedSize, TOnResult callback, TString data, + bool post, IRetryPolicy<long>::TPtr retryPolicy) final { Rps->Inc(); @@ -459,12 +529,13 @@ private: return; } const std::unique_lock lock(Sync); - auto& entry = Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; + TEasyCurlBuffer::TWeakPtr stub; + auto& entry = post ? stub : Requests[TKeyType(url, 0U, headers, data, retryPolicy)]; if (const auto& easy = entry.lock()) if (easy->AddCallback(callback)) return; - auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); + auto easy = TEasyCurlBuffer::Make(InFlight, DownloadedBytes, std::move(url), post, std::move(data), std::move(headers), 0U, expectedSize, std::move(callback)); entry = easy; Easy2RetryState.emplace(easy, std::move(retryPolicy->CreateRetryState())); Await.emplace(std::move(easy)); @@ -478,7 +549,6 @@ private: TOnNewDataPart onNewData, TOnDownloadFinish onFinish) final { - auto stream = TEasyCurlStream::Make(InFlightStreams, DownloadedBytes, std::move(url), std::move(headers), offset, std::move(onNewData), std::move(onFinish)); const std::unique_lock lock(Sync); const auto handle = stream->GetHandle(); @@ -529,7 +599,7 @@ private: private: CURLM* Handle = nullptr; - std::queue<TEasyCurlBuffer::TPtr> Await; + std::queue<TEasyCurl::TPtr> Await; std::vector<TEasyCurlStream::TWeakPtr> Streams; std::unordered_map<CURL*, TEasyCurl::TPtr> Allocated; @@ -557,6 +627,7 @@ private: const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueue; const NMonitoring::TDynamicCounters::TCounterPtr AwaitQueueTopExpectedSize; const NMonitoring::TDynamicCounters::TCounterPtr DownloadedBytes; + const NMonitoring::TDynamicCounters::TCounterPtr UploadedBytes; TTaskScheduler TaskScheduler; }; diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 15e554a7d8..8c7a142f90 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -61,9 +61,15 @@ public: long HttpResponseCode; }; + using THeaders = TSmallVec<TString>; + + using TResponse = std::variant<long, TIssues>; + using TOnResponse = std::function<void(TResponse&&)>; + + virtual void Upload(TString url, THeaders headers, TString body, TOnResponse callback) = 0; + using TResult = std::variant<TContent, TIssues>; using TOnResult = std::function<void(TResult&&)>; - using THeaders = TSmallVec<TString>; virtual void Download( TString url, @@ -71,6 +77,7 @@ public: std::size_t expectedSize, TOnResult callback, TString data = {}, + bool post = false, IRetryPolicy</*http response code*/long>::TPtr RetryPolicy = IRetryPolicy<long>::GetNoRetryPolicy() ) = 0; diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt index af03361e05..4ad08563c3 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt @@ -14,6 +14,7 @@ target_compile_options(providers-s3-actors PRIVATE target_link_libraries(providers-s3-actors PUBLIC contrib-libs-cxxsupp yutil + cpp-xml-document yql-minikql-computation common-token_accessor-client common-schema-mkql @@ -23,5 +24,7 @@ target_link_libraries(providers-s3-actors PUBLIC providers-s3-proto ) target_sources(providers-s3-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp ) diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index 5127a2fcbc..d1802d1ec9 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -21,6 +21,7 @@ target_link_libraries(providers-s3-actors PUBLIC yutil contrib-libs-fmt libs-poco-Util + cpp-xml-document yql-minikql-computation common-token_accessor-client common-schema-mkql @@ -33,5 +34,7 @@ target_link_libraries(providers-s3-actors PUBLIC ) target_sources(providers-s3-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp ) diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp new file mode 100644 index 0000000000..fd9663f4c0 --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp @@ -0,0 +1,15 @@ +#include "yql_s3_sink_factory.h" +#include "yql_s3_write_actor.h" + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> + +namespace NYql::NDq { + +void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) { + factory.RegisterSink<NS3::TSink>("S3Sink", + [credentialsFactory, gateway, retryConfig](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { + return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory, retryConfig); + }); +} + +} diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h new file mode 100644 index 0000000000..80b7fbf7ca --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.h @@ -0,0 +1,18 @@ +#pragma once + +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> + + +namespace NYql::NDq { + +void RegisterS3WriteActorFactory( + TDqAsyncIoFactory& factory, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + IHTTPGateway::TPtr gateway, + const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig = nullptr); +} diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp new file mode 100644 index 0000000000..79785df724 --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -0,0 +1,321 @@ +#include "yql_s3_write_actor.h" + +#include <ydb/library/yql/utils/yql_panic.h> + +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/actors/core/hfunc.h> + +#include <util/generic/size_literals.h> + +#include <queue> + +#ifdef THROW +#undef THROW +#endif +#include <library/cpp/xml/document/xml-document.h> + +namespace NYql::NDq { + +using namespace NActors; + +namespace { + +struct TEvPrivate { + // Event ids + enum EEv : ui32 { + EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE), + + EvUploadError = EvBegin, + EvUploadStarted, + EvUploadPartFinished, + EvUploadFinished, + + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + // Events + struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> {}; + + struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> { + explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {} + const TIssues Error; + }; + + struct TEvUploadStarted : public TEventLocal<TEvUploadStarted, EvUploadStarted> { + explicit TEvUploadStarted(TString&& uploadId) : UploadId(std::move(uploadId)) {} + const TString UploadId; + }; + + struct TEvUploadPartFinished : public TEventLocal<TEvUploadPartFinished, EvUploadPartFinished> { + explicit TEvUploadPartFinished(size_t size) : Size(size) {} + const size_t Size; + }; +}; + +using TPath = std::tuple<TString, size_t>; +using TPathList = std::vector<TPath>; + +class TRetryParams { +public: + TRetryParams(const std::shared_ptr<NS3::TRetryConfig>& retryConfig) + : MaxRetries(retryConfig && retryConfig->GetMaxRetriesPerPath() ? retryConfig->GetMaxRetriesPerPath() : 3U) + , InitDelayMs(retryConfig && retryConfig->GetInitialDelayMs() ? TDuration::MilliSeconds(retryConfig->GetInitialDelayMs()) : TDuration::MilliSeconds(100)) + , InitEpsilon(retryConfig && retryConfig->GetEpsilon() ? retryConfig->GetEpsilon() : 0.1) + { + Y_VERIFY(0. < InitEpsilon && InitEpsilon < 1.); + Reset(); + } + + void Reset() { + Retries = 0U; + DelayMs = InitDelayMs; + Epsilon = InitEpsilon; + } + + TDuration GetNextDelay() { + if (++Retries > MaxRetries) + return TDuration::Zero(); + return DelayMs = GenerateNextDelay(); + } +private: + TDuration GenerateNextDelay() { + const auto low = 1. - Epsilon; + const auto jitter = low + std::rand() / (RAND_MAX / (2. * Epsilon)); + return DelayMs * jitter; + } + + const ui32 MaxRetries; + const TDuration InitDelayMs; + const double InitEpsilon; + + ui32 Retries; + TDuration DelayMs; + double Epsilon; +}; + +using namespace NKikimr::NMiniKQL; + +class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput { +public: + TS3WriteActor(ui64 outputIndex, + IHTTPGateway::TPtr gateway, + const TString& url, + const TString& token, + const TString& path, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, + const std::shared_ptr<NS3::TRetryConfig>& retryConfig) + : Gateway(std::move(gateway)) + , OutputIndex(outputIndex) + , Callbacks(callbacks) + , ActorSystem(TActivationContext::ActorSystem()) + , Url(url) + , Headers(MakeHeader(token)) + , Path(path) + , RetryConfig(retryConfig) + {} + + void Bootstrap() { + Become(&TS3WriteActor::InitialStateFunc); + Gateway->Download(Url + Path + "?uploads", Headers, 0, std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), "", true); + } + + static constexpr char ActorName[] = "S3_WRITE_ACTOR"; +private: + void CommitState(const NDqProto::TCheckpoint&) final {}; + void LoadState(const NDqProto::TSinkState&) final {}; + ui64 GetOutputIndex() const final { return OutputIndex; } + i64 GetFreeSpace() const final { + return 1_GB - InFlight - std::accumulate(Parts.cbegin(), Parts.cend(), 0LL, [](i64 s, const NUdf::TUnboxedValuePod v){ return v ? s + v.AsStringRef().Size() : s; }); + } + + STRICT_STFUNC(InitialStateFunc, + hFunc(TEvPrivate::TEvUploadError, Handle); + hFunc(TEvPrivate::TEvUploadStarted, Handle); + ) + + STRICT_STFUNC(WorkingStateFunc, + hFunc(TEvPrivate::TEvUploadError, Handle); + hFunc(TEvPrivate::TEvUploadPartFinished, Handle); + ) + + STRICT_STFUNC(FinalStateFunc, + hFunc(TEvPrivate::TEvUploadError, Handle); + cFunc(TEvPrivate::EvUploadFinished, HandleFinished); + ) + + static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) { + switch (result.index()) { + case 0U: try { + const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); + if (const auto& root = xml.Root(); root.Name() == "Error") { + const auto& code = root.Node("Code", true).Value<TString>(); + const auto& message = root.Node("Message", true).Value<TString>(); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); + } else if (root.Name() != "InitiateMultipartUploadResult") + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on discovery.")}))); + else + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("UploadId", true).Value<TString>()))); + + break; + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse discovery response.")}))); + break; + } + case 1U: + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); + break; + default: + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on discovery.")}))); + break; + } + } + + static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, IHTTPGateway::TResponse&& response) { + switch (response.index()) { + case 0U: + if (const auto code = std::get<long>(std::move(response)); 200L == code) + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadPartFinished(size))); + else + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response code " << code)}))); + case 1U: + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response))))); + } + } + + static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) { + switch (result.index()) { + case 0U: try { + const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); + if (const auto& root = xml.Root(); root.Name() == "Error") { + const auto& code = root.Node("Code", true).Value<TString>(); + const auto& message = root.Node("Message", true).Value<TString>(); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)}))); + } else if (root.Name() != "CompleteMultipartUploadResult") + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on discovery.")}))); + else + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished())); + + break; + } catch (const std::exception& ex) { + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse discovery response.")}))); + break; + } + case 1U: + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result))))); + break; + default: + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on discovery.")}))); + break; + } + } + + void SendData(TUnboxedValueVector&& data, i64 size, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final { + Cerr << __func__ << '(' << data.size() << ',' << size << ',' << finished << ')' << Endl; + InputFinished = finished; + std::move(data.begin(), data.end(), std::back_inserter(Parts)); + if (!UploadId.empty()) + StartUploadParts(); + } + + void Handle(TEvPrivate::TEvUploadError::TPtr& result) { + Parts.clear(); + Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true); + if (!UploadId.empty()) { + // TODO: Send delete. + } + } + + void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) { + UploadId = result->Get()->UploadId; + Become(&TS3WriteActor::InitialStateFunc); + StartUploadParts(); + } + + void Handle(TEvPrivate::TEvUploadPartFinished::TPtr& result) { + InFlight -= result->Get()->Size; + + if (!InFlight && std::all_of(Parts.cbegin(), Parts.cend(), std::logical_not<NUdf::TUnboxedValuePod>())) { + Become(&TS3WriteActor::FinalStateFunc); + + TStringBuilder xml; + xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl; + xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl; + for (auto i = 1U; i <= Parts.size(); ++i) + xml << "<Part><PartNumber>" << i << "</Part></PartNumber>" << Endl; + xml << "</CompleteMultipartUpload>" << Endl; + Gateway->Download(Url + Path + "?uploadId=" + UploadId, Headers, 0, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), xml, true); + } + } + + void HandleFinished() {} + + // IActor & IDqComputeActorAsyncInput + void PassAway() override { // Is called from Compute Actor + TActorBootstrapped<TS3WriteActor>::PassAway(); + } + + static IHTTPGateway::THeaders MakeHeader(const TString& token) { + return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token}; + } + + void StartUploadParts() { + for (auto i = 0U; i < Parts.size(); ++i) { + if (auto part = std::move(Parts[i])) { + const auto size = part.AsStringRef().Size(); + InFlight += size; + Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(i + 1) + "&uploadId=" + UploadId, Headers, TString(part.AsStringRef()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, std::placeholders::_1)); + } + } + } + + bool InputFinished = false; + size_t InFlight = 0ULL; + + const IHTTPGateway::TPtr Gateway; + + const ui64 OutputIndex; + IDqComputeActorAsyncOutput::ICallbacks *const Callbacks; + + TActorSystem* const ActorSystem; + + const TString Url; + const IHTTPGateway::THeaders Headers; + const TString Path; + + TUnboxedValueVector Parts; + + std::vector<TRetryParams> RetriesPerPath; + const std::shared_ptr<NS3::TRetryConfig> RetryConfig; + + TString UploadId; +}; + +} // namespace + +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( + const NKikimr::NMiniKQL::TTypeEnvironment&, + const NKikimr::NMiniKQL::IFunctionRegistry&, + IHTTPGateway::TPtr gateway, + NS3::TSink&& params, + ui64 outputIndex, + const THashMap<TString, TString>& secureParams, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) +{ + const auto token = secureParams.Value(params.GetToken(), TString{}); + const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); + const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); + + const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), params.GetUrl(), authToken, params.GetPath(), callbacks, retryConfig); + return {actor, actor}; +} + +} // namespace NYql::NDq + diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h new file mode 100644 index 0000000000..2f759e83e1 --- /dev/null +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h @@ -0,0 +1,24 @@ +#pragma once + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/providers/s3/proto/sink.pb.h> +#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> +#include <library/cpp/actors/core/actor.h> + +namespace NYql::NDq { + +std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, + IHTTPGateway::TPtr gateway, + NS3::TSink&& params, + ui64 inputIndex, + const THashMap<TString, TString>& secureParams, + IDqComputeActorAsyncOutput::ICallbacks* callbacks, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig); + +} // namespace NYql::NDq + diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 91be8e870a..4991984cb1 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -72,6 +72,15 @@ ] }, { + "Name": "TS3Write", + "Base": "TFreeArgCallable", + "Match": {"Type": "Callable", "Name": "Write!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TS3DataSink"} + ] + }, + { "Name": "TS3Object", "Base": "TCallable", "Match": {"Type": "Callable", "Name": "S3Object"}, @@ -92,6 +101,47 @@ {"Index": 3, "Name": "RowType", "Type": "TExprBase"}, {"Index": 4, "Name": "ColumnOrder", "Type": "TExprBase", "Optional": true} ] + }, + { + "Name": "TS3Target", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "S3Target"}, + "Children": [ + {"Index": 0, "Name": "Path", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Format", "Type": "TCoAtom"}, + {"Index": 2, "Name": "Settings", "Type": "TExprBase", "Optional": true} + ] + }, + { + "Name": "TS3WriteObject", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "S3WriteObject!"}, + "Children": [ + {"Index": 0, "Name": "World", "Type": "TExprBase"}, + {"Index": 1, "Name": "DataSink", "Type": "TS3DataSink"}, + {"Index": 2, "Name": "Target", "Type": "TS3Target"}, + {"Index": 3, "Name": "Input", "Type": "TExprBase"} + ] + }, + { + "Name": "TS3SinkSettings", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "S3SinkSettings"}, + "Children": [ + {"Index": 0, "Name": "Path", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"} + ] + }, + { + "Name": "TS3SinkOutput", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "S3SinkOutput"}, + "Children": [ + {"Index": 0, "Name": "Input", "Type": "TExprBase"}, + {"Index": 1, "Name": "Format", "Type": "TCoAtom"}, + {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true} + ] } ] } diff --git a/ydb/library/yql/providers/s3/proto/CMakeLists.txt b/ydb/library/yql/providers/s3/proto/CMakeLists.txt index 6c6b6ecd03..ccdedfbd0c 100644 --- a/ydb/library/yql/providers/s3/proto/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/proto/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries(providers-s3-proto PUBLIC target_proto_messages(providers-s3-proto PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/range.proto ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/retry_config.proto + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/sink.proto ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/proto/source.proto ) target_proto_addincls(providers-s3-proto diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto new file mode 100644 index 0000000000..bbc0405af3 --- /dev/null +++ b/ydb/library/yql/providers/s3/proto/sink.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package NYql.NS3; + +message TSink { + string Url = 1; + string Token = 2; + string Path = 3; +} + diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.txt index 9a1814b1a1..0e2cc06957 100644 --- a/ydb/library/yql/providers/s3/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/provider/CMakeLists.txt @@ -56,6 +56,7 @@ target_sources(providers-s3-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp index 92c99b1f24..56e5a1cd52 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink.cpp @@ -1,4 +1,5 @@ #include "yql_s3_provider_impl.h" +#include "yql_s3_dq_integration.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> @@ -15,16 +16,28 @@ using namespace NNodes; namespace { +void ScanPlanDependencies(const TExprNode::TPtr& input, TExprNode::TListType& children) { + VisitExpr(input, [&children](const TExprNode::TPtr& node) { + if (node->IsCallable("DqCnResult")) { + children.push_back(node->HeadPtr()); + return false; + } + + return true; + }); +} + class TS3DataSinkProvider : public TDataProviderBase { public: - TS3DataSinkProvider(TS3State::TPtr state) + TS3DataSinkProvider(TS3State::TPtr state, IHTTPGateway::TPtr) : State_(state) , TypeAnnotationTransformer_(CreateS3DataSinkTypeAnnotationTransformer(State_)) , ExecutionTransformer_(CreateS3DataSinkExecTransformer(State_)) , LogicalOptProposalTransformer_(CreateS3LogicalOptProposalTransformer(State_)) - { - } - + , PhysicalOptProposalTransformer_(CreateS3PhysicalOptProposalTransformer(State_)) + , DqIntegration_(CreateS3DqIntegration(State_)) + {} +private: TStringBuf GetName() const override { return S3ProviderName; } @@ -46,6 +59,10 @@ public: return *ExecutionTransformer_; } + IGraphTransformer& GetPhysicalOptProposalTransformer() override { + return *PhysicalOptProposalTransformer_; + } + bool CanExecute(const TExprNode& node) override { return ExecutionTransformer_->CanExec(node); } @@ -69,17 +86,59 @@ public: IGraphTransformer& GetLogicalOptProposalTransformer() override { return *LogicalOptProposalTransformer_; } -private: + + TExprNode::TPtr RewriteIO(const TExprNode::TPtr& write, TExprContext& ctx) override { + const TS3Write w(write); + auto settings = write->Tail().ChildrenList(); + return Build<TS3WriteObject>(ctx, w.Pos()) + .World(w.World()) + .DataSink(w.DataSink()) + .Target<TS3Target>() + .Path(write->Child(2U)->Head().Tail().HeadPtr()) + .Format(ExtractFormat(settings)) + .Settings(ctx.NewList(w.Pos(), std::move(settings))) + .Build() + .Input(write->ChildPtr(3)) + .Done().Ptr(); + } + + void GetOutputs(const TExprNode& node, TVector<TPinInfo>& outputs) override { + if (const auto& maybeOp = TMaybeNode<TS3WriteObject>(&node)) { + const auto& op = maybeOp.Cast(); + outputs.push_back(TPinInfo(nullptr, op.DataSink().Raw(), op.Target().Raw(), op.DataSink().Cluster().StringValue() + '.' + op.Target().Path().StringValue(), false)); + } + } + + bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool) override { + if (CanExecute(node)) { + children.push_back(node.HeadPtr()); + + if (TS3WriteObject::Match(&node)) { + ScanPlanDependencies(node.ChildPtr(TS3WriteObject::idx_Input), children); + } + + return true; + } + + return false; + } + + IDqIntegration* GetDqIntegration() override { + return DqIntegration_.Get(); + } + const TS3State::TPtr State_; const THolder<TVisitorTransformerBase> TypeAnnotationTransformer_; const THolder<TExecTransformerBase> ExecutionTransformer_; const THolder<IGraphTransformer> LogicalOptProposalTransformer_; + const THolder<IGraphTransformer> PhysicalOptProposalTransformer_; + const THolder<IDqIntegration> DqIntegration_; }; } -TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state) { - return new TS3DataSinkProvider(state); +TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state, IHTTPGateway::TPtr gateway) { + return new TS3DataSinkProvider(std::move(state), std::move(gateway)); } } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp index 1eb216e5a5..684bf5b7bf 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_execution.cpp @@ -1,11 +1,13 @@ #include "yql_s3_provider_impl.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> +#include <ydb/library/yql/providers/result/expr_nodes/yql_res_expr_nodes.h> #include <ydb/library/yql/utils/log/log.h> @@ -20,11 +22,92 @@ public: TS3DataSinkExecTransformer(TS3State::TPtr state) : State_(state) { - AddHandler({TCoCommit::CallableName()}, RequireFirst(), Pass()); + AddHandler({TCoCommit::CallableName()}, RequireFirst(), Hndl(&TS3DataSinkExecTransformer::HandleCommit)); } - private: - TS3State::TPtr State_; + TStatusCallbackPair HandleCommit(const TExprNode::TPtr& input, TExprContext& ctx) { + if (TDqQuery::Match(input->Child(TCoCommit::idx_World))) { + return DelegateExecutionToDqProvider(input->ChildPtr(TCoCommit::idx_World), input, ctx); + } else { // Pass + input->SetState(TExprNode::EState::ExecutionComplete); + input->SetResult(ctx.NewWorld(input->Pos())); + return SyncOk(); + } + } + + TStatusCallbackPair DelegateExecutionToDqProvider(const TExprNode::TPtr& input, const TExprNode::TPtr& originInput, TExprContext& ctx) { + YQL_CLOG(INFO, ProviderS3) << "Delegate execution of " << input->Content() << " to DQ provider."; + auto delegatedNode = Build<TPull>(ctx, input->Pos()) + .Input(input) + .BytesLimit() + .Value(TString()) + .Build() + .RowsLimit() + .Value(TString("0")) + .Build() + .FormatDetails() + .Value(ToString((ui32)NYson::EYsonFormat::Binary)) + .Build() + .Settings() + .Build() + .Format() + .Value(ToString("0")) + .Build() + .PublicId() + .Value("id") + .Build() + .Discard() + .Value(ToString(true)) + .Build() + .Origin(originInput) + .Done() + .Ptr(); + + for (auto idx: {TResOrPullBase::idx_BytesLimit, TResOrPullBase::idx_RowsLimit, TResOrPullBase::idx_FormatDetails, + TResOrPullBase::idx_Format, TResOrPullBase::idx_PublicId, TResOrPullBase::idx_Discard }) { + delegatedNode->Child(idx)->SetTypeAnn(ctx.MakeType<TUnitExprType>()); + delegatedNode->Child(idx)->SetState(TExprNode::EState::ConstrComplete); + } + + delegatedNode->SetTypeAnn(originInput->GetTypeAnn()); + delegatedNode->SetState(TExprNode::EState::ConstrComplete); + originInput->SetState(TExprNode::EState::ExecutionInProgress); + + const auto dqProvider = State_->Types->DataSourceMap.FindPtr(DqProviderName); + + TExprNode::TPtr delegatedNodeOutput; + if (const auto status = dqProvider->Get()->GetCallableExecutionTransformer().Transform(delegatedNode, delegatedNodeOutput, ctx); status.Level != TStatus::Async) { + YQL_ENSURE(status.Level != TStatus::Ok, "Asynchronous execution is expected in a happy path."); + return SyncStatus(status); + } + + auto dqFuture = dqProvider->Get()->GetCallableExecutionTransformer().GetAsyncFuture(*delegatedNode); + + TAsyncTransformCallbackFuture callbackFuture = dqFuture.Apply( + [dqProvider, delegatedNode](const NThreading::TFuture<void>& completedFuture) { + return TAsyncTransformCallback( + [completedFuture, dqProvider, delegatedNode](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) { + completedFuture.GetValue(); + output = input; + TExprNode::TPtr delegatedNodeOutput; + auto dqWriteStatus = dqProvider->Get()->GetCallableExecutionTransformer() + .ApplyAsyncChanges(delegatedNode, delegatedNodeOutput, ctx); + + YQL_ENSURE(dqWriteStatus != TStatus::Async, "ApplyAsyncChanges should not return Async."); + + if (dqWriteStatus != TStatus::Ok) + return dqWriteStatus; + + output->SetState(TExprNode::EState::ExecutionComplete); + output->SetResult(ctx.NewAtom(input->Pos(), "DQ_completed")); + return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok); + }); + }); + + return std::make_pair(IGraphTransformer::TStatus::Async, callbackFuture); + } + + const TS3State::TPtr State_; }; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 30196b3f4a..9fe5d05bb7 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -23,15 +23,95 @@ public: { using TSelf = TS3DataSinkTypeAnnotationTransformer; AddHandler({TCoCommit::CallableName()}, Hndl(&TSelf::HandleCommit)); + AddHandler({TS3WriteObject::CallableName()}, Hndl(&TSelf::HandleWrite)); + AddHandler({TS3Target::CallableName()}, Hndl(&TSelf::HandleTarget)); + AddHandler({TS3SinkSettings::CallableName()}, Hndl(&TSelf::HandleSink)); + AddHandler({TS3SinkOutput::CallableName()}, Hndl(&TSelf::HandleOutput)); } - +private: TStatus HandleCommit(TExprBase input, TExprContext&) { const auto commit = input.Cast<TCoCommit>(); input.Ptr()->SetTypeAnn(commit.World().Ref().GetTypeAnn()); return TStatus::Ok; } -private: + TStatus HandleWrite(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 4U, ctx)) { + return TStatus::Error; + } + + if (!EnsureWorldType(*input->Child(TS3WriteObject::idx_World), ctx)) { + return TStatus::Error; + } + + if (!EnsureSpecificDataSink(*input->Child(TS3WriteObject::idx_DataSink), S3ProviderName, ctx)) { + return TStatus::Error; + } + + if (!TS3Target::Match(input->Child(TS3WriteObject::idx_Target))) { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3WriteObject::idx_Target)->Pos()), "Expected S3 target.")); + return TStatus::Error; + } + + input->SetTypeAnn(ctx.MakeType<TWorldExprType>()); + return TStatus::Ok; + } + + TStatus HandleTarget(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*input->Child(TS3Target::idx_Path), ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormat(input->Child(TS3Target::idx_Format)->Content(), ctx)) { + return TStatus::Error; + } + + if (input->ChildrenSize() > TS3Target::idx_Settings && !EnsureTuple(*input->Child(TS3Target::idx_Settings), ctx)) { + return TStatus::Error; + } +/* TODO + const auto compression = GetCompression(*input->Child(TS3Target::idx_Settings)); + if (!NCommon::ValidateCompression(compression, ctx)) { + return TStatus::Error; + } +*/ + input->SetTypeAnn(ctx.MakeType<TUnitExprType>()); + return TStatus::Ok; + } + + TStatus HandleSink(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureArgsCount(*input, 3, ctx)) { + return TStatus::Error; + } + input->SetTypeAnn(ctx.MakeType<TVoidExprType>()); + return TStatus::Ok; + } + + TStatus HandleOutput(const TExprNode::TPtr& input, TExprContext& ctx) { + if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) { + return TStatus::Error; + } + + if (!EnsureFlowType(*input->Child(TS3SinkOutput::idx_Input), ctx)) { + return TStatus::Error; + } + + if (!EnsureAtom(*input->Child(TS3SinkOutput::idx_Format), ctx)) { + return TStatus::Error; + } + + if (input->ChildrenSize() > TS3SinkOutput::idx_Settings && !EnsureTuple(*input->Child(TS3SinkOutput::idx_Settings), ctx)) { + return TStatus::Error; + } + + input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))); + return TStatus::Ok; + } + const TS3State::TPtr State_; }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp index 8f77b7bfe9..3377e38317 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource.cpp @@ -114,8 +114,6 @@ public: IDqIntegration* GetDqIntegration() override { return DqIntegration_.Get(); } - - private: const TS3State::TPtr State_; const THolder<IGraphTransformer> IODiscoveryTransformer_; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index af7d70a35e..ba58f6d9ac 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -8,6 +8,7 @@ #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> #include <ydb/library/yql/providers/s3/proto/range.pb.h> +#include <ydb/library/yql/providers/s3/proto/sink.pb.h> #include <ydb/library/yql/providers/s3/proto/source.pb.h> #include <ydb/library/yql/utils/log/log.h> @@ -173,10 +174,26 @@ public: } } + void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override { + const TDqSink sink(&node); + if (const auto maySettings = sink.Settings().Maybe<TS3SinkSettings>()) { + const auto settings = maySettings.Cast(); + const auto& cluster = sink.DataSink().Cast<TS3DataSink>().Cluster().StringValue(); + const auto& connect = State_->Configuration->Clusters.at(cluster); + + NS3::TSink sinkDesc; + sinkDesc.SetUrl(connect.Url); + sinkDesc.SetToken(settings.Token().Name().StringValue()); + sinkDesc.SetPath(settings.Path().StringValue()); + + protoSettings.PackFrom(sinkDesc); + sinkType = "S3Sink"; + } + } + void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override { RegisterDqS3MkqlCompilers(compiler, State_); } - private: const TS3State::TPtr State_; }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 2626a56dfc..e8c3f5a320 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -6,14 +6,13 @@ #include <ydb/library/yql/utils/url_builder.h> #include <util/generic/size_literals.h> - #include <contrib/libs/re2/re2/re2.h> +#include <library/cpp/retry/retry_policy.h> #ifdef THROW #undef THROW #endif #include <library/cpp/xml/document/xml-document.h> -#include <library/cpp/retry/retry_policy.h> namespace NYql { @@ -32,17 +31,6 @@ std::array<TExprNode::TPtr, 2U> ExtractSchema(TExprNode::TListType& settings) { return {}; } -TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings) { - for (auto it = settings.cbegin(); settings.cend() != it; ++it) { - if (const auto item = *it; item->Head().IsAtom("format")) { - settings.erase(it); - return item->TailPtr(); - } - } - - return {}; -} - using TItemsMap = std::map<TString, ui64>; using TPendingBuckets = std::unordered_map<std::tuple<TString, TString, TString>, std::tuple<TNodeSet, TItemsMap, TIssues>, THash<std::tuple<TString, TString, TString>>>; @@ -114,6 +102,7 @@ void OnDiscovery( 0U, std::bind(&OnDiscovery, gateway, pos, std::placeholders::_1, std::cref(keys), std::ref(output), std::move(promise), pendingBucketsWPtr, promiseInd, retryPolicy, maxDiscoveryFilesPerQuery), /*data=*/"", + false, retryPolicy); } YQL_CLOG(INFO, ProviderS3) << "Gateway disappeared."; @@ -256,6 +245,7 @@ public: IHTTPGateway::TWeakPtr(Gateway_), ctx.GetPosition((*std::get<TNodeSet>(bucket.second).cbegin())->Pos()), std::placeholders::_1, std::cref(bucket.first), std::ref(bucket.second), std::move(promise), pendingBucketsWPtr, i++, retryPolicy, State_->Configuration->MaxDiscoveryFilesPerQuery), /*data=*/"", + false, retryPolicy ); YQL_CLOG(INFO, ProviderS3) << "Enumerate items in " << std::get<0U>(bucket.first) << std::get<1U>(bucket.first); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index fab2d6d0f7..d9e83ee3cc 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -10,6 +10,50 @@ namespace NYql { using namespace NKikimr::NMiniKQL; using namespace NNodes; +namespace { + + +TRuntimeNode BuildSerializeCall( + TRuntimeNode input, + const std::string_view& format, + const std::string_view& /*compression*/, + TType* inputType, + NCommon::TMkqlBuildContext& ctx) +{ + if (format == "raw") { + const auto structType = AS_TYPE(TStructType, AS_TYPE(TFlowType, inputType)->GetItemType()); + MKQL_ENSURE(1U == structType->GetMembersCount(), "Expected single column."); + const auto schemeType = AS_TYPE(TDataType, structType->GetMemberType(0U))->GetSchemeType(); + return ctx.ProgramBuilder.Map(input, + [&](TRuntimeNode item) { + const auto member = ctx.ProgramBuilder.Member(item, structType->GetMemberName(0U)); + return NUdf::TDataType<const char*>::Id == schemeType ? member : ctx.ProgramBuilder.ToString(member); + } + ); + } else if (format == "json_list") { + return ctx.ProgramBuilder.FlatMap( + ctx.ProgramBuilder.Condense(input, ctx.ProgramBuilder.NewList(AS_TYPE(TFlowType, inputType)->GetItemType(), {}), + [&ctx] (TRuntimeNode, TRuntimeNode) { return ctx.ProgramBuilder.NewDataLiteral<bool>(false); }, + [&ctx] (TRuntimeNode item, TRuntimeNode state) { return ctx.ProgramBuilder.Append(state, item); } + ), + [&ctx] (TRuntimeNode list) { + const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({list.GetStaticType()})}); + return ctx.ProgramBuilder.ToString(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.SerializeJson"), {ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.From", {}, userType), {list})})); + } + ); + } + + throw yexception() << "Unsupported format: " << format; +} + +TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) { + const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); + const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder); + return BuildSerializeCall(input, wrapper.Format().Value(), "TODO", inputItemType, ctx); +} + +} + void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TS3State::TPtr&) { compiler.ChainCallable(TDqSourceWideWrap::CallableName(), [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { @@ -22,6 +66,12 @@ void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, con return TRuntimeNode(); }); + + if (!compiler.HasCallable(TS3SinkOutput::CallableName())) + compiler.AddCallable(TS3SinkOutput::CallableName(), + [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + return SerializeForS3(TS3SinkOutput(&node), ctx); + }); } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp new file mode 100644 index 0000000000..6718f26061 --- /dev/null +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -0,0 +1,104 @@ +#include "yql_s3_provider_impl.h" + +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/core/yql_opt_utils.h> +#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h> +#include <ydb/library/yql/dq/opt/dq_opt.h> +#include <ydb/library/yql/dq/opt/dq_opt_phy.h> +#include <ydb/library/yql/providers/common/transform/yql_optimize.h> +#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> + +namespace NYql { + +namespace { + +using namespace NNodes; +using namespace NDq; + +class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { +public: + explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state) + : TOptimizeTransformerBase(state->Types, NLog::EComponent::ProviderS3, {}) + , State_(std::move(state)) + { +#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TS3PhysicalOptProposalTransformer::name) + AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject)); +#undef HNDL + + SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes. + } + + TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const { + auto write = node.Cast<TS3WriteObject>(); + if (!TDqCnUnionAll::Match(write.Input().Raw())) { // => this code is not for RTMR mode. + return node; + } + + const auto& targetNode = write.Target(); + const auto& cluster = write.DataSink().Cluster().StringValue(); + + const TParentsMap* parentsMap = getParents(); + auto dqUnion = write.Input().Cast<TDqCnUnionAll>(); + if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) { + return node; + } + + + YQL_CLOG(INFO, ProviderS3) << "Optimize S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "`"; + + const auto token = "cluster:default_" + cluster; + + auto dqSink = Build<TDqSink>(ctx, write.Pos()) + .DataSink(write.DataSink()) + .Index(dqUnion.Output().Index()) + .Settings<TS3SinkSettings>() + .Path(write.Target().Path()) + .Settings<TCoNameValueTupleList>().Build() + .Token<TCoSecureParam>() + .Name().Build(token) + .Build() + .Build() + .Done(); + + auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>(); + auto outputsBuilder = Build<TDqStageOutputsList>(ctx, targetNode.Pos()); + if (inputStage.Outputs()) { + outputsBuilder.InitFrom(inputStage.Outputs().Cast()); + } + outputsBuilder.Add(dqSink); + + auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos()) + .InitFrom(inputStage) + .Program<TCoLambda>() + .Args({"input"}) + .Body<TS3SinkOutput>() + .Input<TExprApplier>() + .Apply(inputStage.Program()).With(0, "input") + .Build() + .Format(write.Target().Format()) + .Build() + .Build() + .Outputs(outputsBuilder.Done()) + .Done(); + + auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos()); + dqQueryBuilder.World(write.World()); + dqQueryBuilder.SinkStages().Add(dqStageWithSink).Build(); + + optCtx.RemapNode(inputStage.Ref(), dqStageWithSink.Ptr()); + + return dqQueryBuilder.Done(); + } + +private: + TS3State::TPtr State_; +}; + +} // namespace + +THolder<IGraphTransformer> CreateS3PhysicalOptProposalTransformer(TS3State::TPtr state) { + return MakeHolder<TS3PhysicalOptProposalTransformer>(std::move(state)); +} + +} // namespace NYql + diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp index 4d70add3c0..4b11f78ad7 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp @@ -35,7 +35,7 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway info.Names.insert({TString{S3ProviderName}}); info.Source = CreateS3DataSource(state, gateway); - info.Sink = CreateS3DataSink(state); + info.Sink = CreateS3DataSink(state, gateway); return info; }; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index 12cd74f805..d4367b14b2 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -32,6 +32,6 @@ struct TS3State : public TThrRefBase TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr); TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state, IHTTPGateway::TPtr gateway); -TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state); +TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state, IHTTPGateway::TPtr gateway); } // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp index 6c59efc837..cf92965115 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp @@ -4,4 +4,15 @@ namespace NYql { +TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings) { + for (auto it = settings.cbegin(); settings.cend() != it; ++it) { + if (const auto item = *it; item->Head().IsAtom("format")) { + settings.erase(it); + return item->TailPtr(); + } + } + + return {}; +} + } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h index 732615b23f..e3694a3ba9 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.h @@ -19,6 +19,8 @@ THolder<TExecTransformerBase> CreateS3DataSinkExecTransformer(TS3State::TPtr sta THolder<IGraphTransformer> CreateS3LogicalOptProposalTransformer(TS3State::TPtr state); THolder<IGraphTransformer> CreateS3SourceCallableExecutionTransformer(TS3State::TPtr state); THolder<IGraphTransformer> CreateS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway); +THolder<IGraphTransformer> CreateS3PhysicalOptProposalTransformer(TS3State::TPtr state); +TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings); } // namespace NYql |