diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-21 17:21:34 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-21 17:21:34 +0300 |
commit | d926129dfb1c137e8167e47d4ec5c2936b3a10d7 (patch) | |
tree | 2bf2cdb2f01447cb98a9ddb5b97ec3fe2fee5c38 | |
parent | 9c219e382a4d947d003aff1f83cd782a34cd56af (diff) | |
download | ydb-d926129dfb1c137e8167e47d4ec5c2936b3a10d7.tar.gz |
S3 split on partitions on write.
14 files changed, 77 insertions, 40 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index d934f437f4..2757809c7c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -160,6 +160,7 @@ public: const THashMap<TString, TString>& SecureParams; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + IRandomProvider *const RandomProvider; }; struct TInputTransformArguments { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 068b36662e..513c002e2f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1456,7 +1456,8 @@ protected: .Callback = static_cast<TSinkCallbacks*>(this), .SecureParams = secureParams, .TypeEnv = typeEnv, - .HolderFactory = holderFactory + .HolderFactory = holderFactory, + .RandomProvider = TaskRunner ? TaskRunner->GetRandomProvider() : nullptr }); } catch (const std::exception& ex) { throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 03a45041e0..13a79ebb06 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -715,6 +715,10 @@ public: return Settings.TaskParams; } + IRandomProvider* GetRandomProvider() const override { + return Context.RandomProvider; + } + void UpdateStats() override { if (Stats) { Stats->RunStatusTimeMetrics.UpdateStatusTime(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 4fbefc752b..dbeec8cf8d 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -306,6 +306,8 @@ public: virtual std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 inputIndex) = 0; virtual std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) = 0; + virtual IRandomProvider* GetRandomProvider() const = 0; + // if memoryLimit = Nothing() then don't set memory limit, use existing one (if any) // if memoryLimit = 0 then set unlimited // otherwise use particular memory limit diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index eda0fa7fd1..72bf3bc24d 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1534,6 +1534,10 @@ public: return Delegate->IsAllocatorAttached(); } + IRandomProvider* GetRandomProvider() const override { + return nullptr; + } + void UpdateStats() override { } diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index e828c67edb..fdeab19adc 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -21,6 +21,8 @@ target_link_libraries(providers-s3-actors PUBLIC yutil contrib-libs-fmt libs-poco-Util + cpp-string_utils-base64 + cpp-string_utils-quote cpp-xml-document dq-actors-compute yql-minikql-computation diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp index fd9663f4c0..8ad2a0e4ea 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp @@ -8,7 +8,7 @@ namespace NYql::NDq { void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) { factory.RegisterSink<NS3::TSink>("S3Sink", [credentialsFactory, gateway, retryConfig](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) { - return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory, retryConfig); + return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory, retryConfig); }); } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp index 516cf0edef..633eba0220 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp @@ -8,6 +8,8 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/core/event_local.h> #include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/string_utils/base64/base64.h> +#include <library/cpp/string_utils/quote/quote.h> #include <util/generic/size_literals.h> @@ -116,7 +118,7 @@ public: : Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) , ActorSystem(TActivationContext::ActorSystem()) - , Key(key), Url(url + key) + , Key(key), Url(url) , RetryConfig(retryConfig) {} @@ -303,18 +305,20 @@ public: TS3WriteActor(ui64 outputIndex, IHTTPGateway::TPtr gateway, NYdb::TCredentialsProviderPtr credProvider, + IRandomProvider* randomProvider, const TString& url, const TString& path, - const ui32 keyWidth, + const std::vector<TString>& keys, IDqComputeActorAsyncOutput::ICallbacks* callbacks, const std::shared_ptr<NS3::TRetryConfig>& retryConfig) : Gateway(std::move(gateway)) , CredProvider(std::move(credProvider)) + , RandomProvider(randomProvider) , OutputIndex(outputIndex) , Callbacks(callbacks) , Url(url) , Path(path) - , KeyWidth(keyWidth) + , Keys(keys) , RetryConfig(retryConfig) {} @@ -332,17 +336,22 @@ private: } TString MakeKey(const NUdf::TUnboxedValuePod v) const { - if (!KeyWidth) + if (Keys.empty()) return {}; auto elements = v.GetElements(); TStringBuilder key; - for (auto k = KeyWidth; k; --k) { + for (const auto& k : Keys) { const std::string_view keyPart = (++elements)->AsStringRef(); YQL_ENSURE(std::string_view::npos == keyPart.find('/'), "Invalid partition key, contains '/': " << keyPart); - key << keyPart << '/'; + key << k << '=' << keyPart << '/'; } - return key; + return UrlEscapeRet(key); + } + + TString MakeSuffix() const { + const auto rand = std::make_tuple(RandomProvider->GenUuid4(), RandomProvider->GenRand()); + return Base64EncodeUrl(TStringBuf(reinterpret_cast<const char*>(&rand), sizeof(rand))); } STRICT_STFUNC(StateFunc, @@ -355,12 +364,12 @@ private: const auto& key = MakeKey(v); const auto ins = FileWriteActors.emplace(key, nullptr); if (ins.second) { - auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path, RetryConfig); + auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), RetryConfig); ins.first->second = fileWrite.get(); RegisterWithSameMailbox(fileWrite.release()); } - ins.first->second->SendData(TString((KeyWidth > 0U ? *v.GetElements() : v).AsStringRef())); + ins.first->second->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef())); } if (finished) @@ -385,13 +394,14 @@ private: const IHTTPGateway::TPtr Gateway; const NYdb::TCredentialsProviderPtr CredProvider; + IRandomProvider *const RandomProvider; const ui64 OutputIndex; IDqComputeActorAsyncOutput::ICallbacks *const Callbacks; const TString Url; const TString Path; - const ui32 KeyWidth; + const std::vector<TString> Keys; std::vector<TRetryParams> RetriesPerPath; const std::shared_ptr<NS3::TRetryConfig> RetryConfig; @@ -404,6 +414,7 @@ private: std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const NKikimr::NMiniKQL::TTypeEnvironment&, const NKikimr::NMiniKQL::IFunctionRegistry&, + IRandomProvider* randomProvider, IHTTPGateway::TPtr gateway, NS3::TSink&& params, ui64 outputIndex, @@ -415,7 +426,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); const auto authToken = credentialsProviderFactory->CreateProvider(); - const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), params.GetUrl(), params.GetPath(), params.HasKeys() ? params.GetKeys() : 0U, callbacks, retryConfig); + const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), params.GetPath(), std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), callbacks, retryConfig); return {actor, actor}; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h index 2f759e83e1..c998bb20f5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h @@ -12,6 +12,7 @@ namespace NYql::NDq { std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor( const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry, + IRandomProvider*, IHTTPGateway::TPtr gateway, NS3::TSink&& params, ui64 inputIndex, diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json index 93abdf6b5b..c72c3b79e9 100644 --- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json +++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json @@ -130,7 +130,7 @@ "Match": {"Type": "Callable", "Name": "S3SinkSettings"}, "Children": [ {"Index": 0, "Name": "Path", "Type": "TCoAtom"}, - {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"}, + {"Index": 1, "Name": "Settings", "Type": "TExprList"}, {"Index": 2, "Name": "Token", "Type": "TCoSecureParam"} ] }, diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto index 2b995a3cab..ab3ad50adf 100644 --- a/ydb/library/yql/providers/s3/proto/sink.proto +++ b/ydb/library/yql/providers/s3/proto/sink.proto @@ -7,6 +7,6 @@ message TSink { string Url = 1; string Token = 2; string Path = 3; - optional uint32 Keys = 4; + repeated string Keys = 4; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index c67af6e61a..0658761a31 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -66,6 +66,11 @@ private: return TStatus::Error; } + if (const auto& path = input->Child(TS3Target::idx_Path)->Content(); path.empty() || path.back() != '/') { + ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Expected non empty path to directory ends with '/'.")); + return TStatus::Error; + } + if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormat(input->Child(TS3Target::idx_Format)->Content(), ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 2343f79ea1..064d056b14 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -19,11 +19,13 @@ using namespace NNodes; namespace { -std::string_view GetKeys(const TExprNode& settings) { +TExprNode::TListType GetKeys(const TExprNode& settings) { for (auto i = 0U; i < settings.ChildrenSize(); ++i) { - const auto& child = *settings.Child(i); - if (child.Head().IsAtom("keys")) - return child.Tail().Content(); + if (const auto& child = *settings.Child(i); child.Head().IsAtom("partitionedby")) { + auto children = child.ChildrenList(); + children.erase(children.cbegin()); + return children; + } } return {}; } @@ -230,8 +232,8 @@ public: sinkDesc.SetUrl(connect.Url); sinkDesc.SetToken(settings.Token().Name().StringValue()); sinkDesc.SetPath(settings.Path().StringValue()); - if (const auto& keys = GetKeys(maySettings.Settings().Ref()); !keys.empty()) - sinkDesc.SetKeys(::FromString<ui32>(keys)); + for (const auto& key : GetKeys(maySettings.Settings().Ref())) + sinkDesc.MutableKeys()->Add(TString(key->Content())); protoSettings.PackFrom(sinkDesc); sinkType = "S3Sink"; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp index 384e828ea4..5173b82ff5 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp @@ -15,18 +15,26 @@ namespace { using namespace NNodes; using namespace NDq; -TExprNode::TListType GetPartitionBy(const TExprNode& settings) { +TExprNode::TPtr GetPartitionBy(const TExprNode& settings) { for (auto i = 0U; i < settings.ChildrenSize(); ++i) { - if (settings.Child(i)->Head().IsAtom("partitionby")) { - auto children = settings.Child(i)->ChildrenList(); - children.erase(children.cbegin()); - return children; + if (settings.Child(i)->Head().IsAtom("partitionedby")) { + return settings.ChildPtr(i); } } return {}; } +TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) { + if (partBy) { + auto children = partBy->ChildrenList(); + children.erase(children.cbegin()); + return children; + } + + return {}; +} + class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase { public: explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state) @@ -43,7 +51,13 @@ public: const auto& targetNode = write.Target(); const auto& cluster = write.DataSink().Cluster().StringValue(); const auto token = "cluster:default_" + cluster; - const auto keys = GetPartitionBy(write.Target().Settings().Ref()); + auto partBy = GetPartitionBy(write.Target().Settings().Ref()); + auto keys = GetPartitionKeys(partBy); + + auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos()); + if (partBy) + sinkSettingsBuilder.Add(std::move(partBy)); + if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) { YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink."; @@ -69,7 +83,7 @@ public: .Index().Value("0").Build() .Settings<TS3SinkSettings>() .Path(write.Target().Path()) - .Settings<TCoNameValueTupleList>().Build() + .Settings(sinkSettingsBuilder.Done()) .Token<TCoSecureParam>() .Name().Build(token) .Build() @@ -116,12 +130,7 @@ public: .Index().Value("0", TNodeFlags::Default).Build() .Settings<TS3SinkSettings>() .Path(write.Target().Path()) - .Settings<TCoNameValueTupleList>() - .Add() - .Name().Build("keys", TNodeFlags::Default) - .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default) - .Build() - .Build() + .Settings(sinkSettingsBuilder.Done()) .Token<TCoSecureParam>() .Name().Build(token) .Build() @@ -153,12 +162,7 @@ public: .Index(dqUnion.Output().Index()) .Settings<TS3SinkSettings>() .Path(write.Target().Path()) - .Settings<TCoNameValueTupleList>() - .Add() - .Name().Build("keys", TNodeFlags::Default) - .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default) - .Build() - .Build() + .Settings(sinkSettingsBuilder.Done()) .Token<TCoSecureParam>() .Name().Build(token) .Build() |