aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-07-21 17:21:34 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-07-21 17:21:34 +0300
commitd926129dfb1c137e8167e47d4ec5c2936b3a10d7 (patch)
tree2bf2cdb2f01447cb98a9ddb5b97ec3fe2fee5c38
parent9c219e382a4d947d003aff1f83cd782a34cd56af (diff)
downloadydb-d926129dfb1c137e8167e47d4ec5c2936b3a10d7.tar.gz
S3 split on partitions on write.
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp4
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp4
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp33
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h1
-rw-r--r--ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json2
-rw-r--r--ydb/library/yql/providers/s3/proto/sink.proto2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp5
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp14
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp42
14 files changed, 77 insertions, 40 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index d934f437f4..2757809c7c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -160,6 +160,7 @@ public:
const THashMap<TString, TString>& SecureParams;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+ IRandomProvider *const RandomProvider;
};
struct TInputTransformArguments {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 068b36662e..513c002e2f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1456,7 +1456,8 @@ protected:
.Callback = static_cast<TSinkCallbacks*>(this),
.SecureParams = secureParams,
.TypeEnv = typeEnv,
- .HolderFactory = holderFactory
+ .HolderFactory = holderFactory,
+ .RandomProvider = TaskRunner ? TaskRunner->GetRandomProvider() : nullptr
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create sink " << outputDesc.GetSink().GetType() << ": " << ex.what();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 03a45041e0..13a79ebb06 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -715,6 +715,10 @@ public:
return Settings.TaskParams;
}
+ IRandomProvider* GetRandomProvider() const override {
+ return Context.RandomProvider;
+ }
+
void UpdateStats() override {
if (Stats) {
Stats->RunStatusTimeMetrics.UpdateStatusTime();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 4fbefc752b..dbeec8cf8d 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -306,6 +306,8 @@ public:
virtual std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 inputIndex) = 0;
virtual std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) = 0;
+ virtual IRandomProvider* GetRandomProvider() const = 0;
+
// if memoryLimit = Nothing() then don't set memory limit, use existing one (if any)
// if memoryLimit = 0 then set unlimited
// otherwise use particular memory limit
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index eda0fa7fd1..72bf3bc24d 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1534,6 +1534,10 @@ public:
return Delegate->IsAllocatorAttached();
}
+ IRandomProvider* GetRandomProvider() const override {
+ return nullptr;
+ }
+
void UpdateStats() override {
}
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
index e828c67edb..fdeab19adc 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
@@ -21,6 +21,8 @@ target_link_libraries(providers-s3-actors PUBLIC
yutil
contrib-libs-fmt
libs-poco-Util
+ cpp-string_utils-base64
+ cpp-string_utils-quote
cpp-xml-document
dq-actors-compute
yql-minikql-computation
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
index fd9663f4c0..8ad2a0e4ea 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_sink_factory.cpp
@@ -8,7 +8,7 @@ namespace NYql::NDq {
void RegisterS3WriteActorFactory(TDqAsyncIoFactory& factory, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, IHTTPGateway::TPtr gateway, const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) {
factory.RegisterSink<NS3::TSink>("S3Sink",
[credentialsFactory, gateway, retryConfig](NS3::TSink&& settings, IDqAsyncIoFactory::TSinkArguments&& args) {
- return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory, retryConfig);
+ return CreateS3WriteActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), args.RandomProvider, gateway, std::move(settings), args.OutputIndex, args.SecureParams, args.Callback, credentialsFactory, retryConfig);
});
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 516cf0edef..633eba0220 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -8,6 +8,8 @@
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/string_utils/quote/quote.h>
#include <util/generic/size_literals.h>
@@ -116,7 +118,7 @@ public:
: Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
, ActorSystem(TActivationContext::ActorSystem())
- , Key(key), Url(url + key)
+ , Key(key), Url(url)
, RetryConfig(retryConfig)
{}
@@ -303,18 +305,20 @@ public:
TS3WriteActor(ui64 outputIndex,
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
+ IRandomProvider* randomProvider,
const TString& url,
const TString& path,
- const ui32 keyWidth,
+ const std::vector<TString>& keys,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
: Gateway(std::move(gateway))
, CredProvider(std::move(credProvider))
+ , RandomProvider(randomProvider)
, OutputIndex(outputIndex)
, Callbacks(callbacks)
, Url(url)
, Path(path)
- , KeyWidth(keyWidth)
+ , Keys(keys)
, RetryConfig(retryConfig)
{}
@@ -332,17 +336,22 @@ private:
}
TString MakeKey(const NUdf::TUnboxedValuePod v) const {
- if (!KeyWidth)
+ if (Keys.empty())
return {};
auto elements = v.GetElements();
TStringBuilder key;
- for (auto k = KeyWidth; k; --k) {
+ for (const auto& k : Keys) {
const std::string_view keyPart = (++elements)->AsStringRef();
YQL_ENSURE(std::string_view::npos == keyPart.find('/'), "Invalid partition key, contains '/': " << keyPart);
- key << keyPart << '/';
+ key << k << '=' << keyPart << '/';
}
- return key;
+ return UrlEscapeRet(key);
+ }
+
+ TString MakeSuffix() const {
+ const auto rand = std::make_tuple(RandomProvider->GenUuid4(), RandomProvider->GenRand());
+ return Base64EncodeUrl(TStringBuf(reinterpret_cast<const char*>(&rand), sizeof(rand)));
}
STRICT_STFUNC(StateFunc,
@@ -355,12 +364,12 @@ private:
const auto& key = MakeKey(v);
const auto ins = FileWriteActors.emplace(key, nullptr);
if (ins.second) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path, RetryConfig);
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path + key + MakeSuffix(), RetryConfig);
ins.first->second = fileWrite.get();
RegisterWithSameMailbox(fileWrite.release());
}
- ins.first->second->SendData(TString((KeyWidth > 0U ? *v.GetElements() : v).AsStringRef()));
+ ins.first->second->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
}
if (finished)
@@ -385,13 +394,14 @@ private:
const IHTTPGateway::TPtr Gateway;
const NYdb::TCredentialsProviderPtr CredProvider;
+ IRandomProvider *const RandomProvider;
const ui64 OutputIndex;
IDqComputeActorAsyncOutput::ICallbacks *const Callbacks;
const TString Url;
const TString Path;
- const ui32 KeyWidth;
+ const std::vector<TString> Keys;
std::vector<TRetryParams> RetriesPerPath;
const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
@@ -404,6 +414,7 @@ private:
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const NKikimr::NMiniKQL::TTypeEnvironment&,
const NKikimr::NMiniKQL::IFunctionRegistry&,
+ IRandomProvider* randomProvider,
IHTTPGateway::TPtr gateway,
NS3::TSink&& params,
ui64 outputIndex,
@@ -415,7 +426,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const auto token = secureParams.Value(params.GetToken(), TString{});
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider();
- const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), params.GetUrl(), params.GetPath(), params.HasKeys() ? params.GetKeys() : 0U, callbacks, retryConfig);
+ const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), randomProvider, params.GetUrl(), params.GetPath(), std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()), callbacks, retryConfig);
return {actor, actor};
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
index 2f759e83e1..c998bb20f5 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.h
@@ -12,6 +12,7 @@ namespace NYql::NDq {
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
+ IRandomProvider*,
IHTTPGateway::TPtr gateway,
NS3::TSink&& params,
ui64 inputIndex,
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 93abdf6b5b..c72c3b79e9 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -130,7 +130,7 @@
"Match": {"Type": "Callable", "Name": "S3SinkSettings"},
"Children": [
{"Index": 0, "Name": "Path", "Type": "TCoAtom"},
- {"Index": 1, "Name": "Settings", "Type": "TCoNameValueTupleList"},
+ {"Index": 1, "Name": "Settings", "Type": "TExprList"},
{"Index": 2, "Name": "Token", "Type": "TCoSecureParam"}
]
},
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
index 2b995a3cab..ab3ad50adf 100644
--- a/ydb/library/yql/providers/s3/proto/sink.proto
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -7,6 +7,6 @@ message TSink {
string Url = 1;
string Token = 2;
string Path = 3;
- optional uint32 Keys = 4;
+ repeated string Keys = 4;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index c67af6e61a..0658761a31 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -66,6 +66,11 @@ private:
return TStatus::Error;
}
+ if (const auto& path = input->Child(TS3Target::idx_Path)->Content(); path.empty() || path.back() != '/') {
+ ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Expected non empty path to directory ends with '/'."));
+ return TStatus::Error;
+ }
+
if (!EnsureAtom(*input->Child(TS3Target::idx_Format), ctx) || !NCommon::ValidateFormat(input->Child(TS3Target::idx_Format)->Content(), ctx)) {
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 2343f79ea1..064d056b14 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -19,11 +19,13 @@ using namespace NNodes;
namespace {
-std::string_view GetKeys(const TExprNode& settings) {
+TExprNode::TListType GetKeys(const TExprNode& settings) {
for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
- const auto& child = *settings.Child(i);
- if (child.Head().IsAtom("keys"))
- return child.Tail().Content();
+ if (const auto& child = *settings.Child(i); child.Head().IsAtom("partitionedby")) {
+ auto children = child.ChildrenList();
+ children.erase(children.cbegin());
+ return children;
+ }
}
return {};
}
@@ -230,8 +232,8 @@ public:
sinkDesc.SetUrl(connect.Url);
sinkDesc.SetToken(settings.Token().Name().StringValue());
sinkDesc.SetPath(settings.Path().StringValue());
- if (const auto& keys = GetKeys(maySettings.Settings().Ref()); !keys.empty())
- sinkDesc.SetKeys(::FromString<ui32>(keys));
+ for (const auto& key : GetKeys(maySettings.Settings().Ref()))
+ sinkDesc.MutableKeys()->Add(TString(key->Content()));
protoSettings.PackFrom(sinkDesc);
sinkType = "S3Sink";
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 384e828ea4..5173b82ff5 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -15,18 +15,26 @@ namespace {
using namespace NNodes;
using namespace NDq;
-TExprNode::TListType GetPartitionBy(const TExprNode& settings) {
+TExprNode::TPtr GetPartitionBy(const TExprNode& settings) {
for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
- if (settings.Child(i)->Head().IsAtom("partitionby")) {
- auto children = settings.Child(i)->ChildrenList();
- children.erase(children.cbegin());
- return children;
+ if (settings.Child(i)->Head().IsAtom("partitionedby")) {
+ return settings.ChildPtr(i);
}
}
return {};
}
+TExprNode::TListType GetPartitionKeys(const TExprNode::TPtr& partBy) {
+ if (partBy) {
+ auto children = partBy->ChildrenList();
+ children.erase(children.cbegin());
+ return children;
+ }
+
+ return {};
+}
+
class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state)
@@ -43,7 +51,13 @@ public:
const auto& targetNode = write.Target();
const auto& cluster = write.DataSink().Cluster().StringValue();
const auto token = "cluster:default_" + cluster;
- const auto keys = GetPartitionBy(write.Target().Settings().Ref());
+ auto partBy = GetPartitionBy(write.Target().Settings().Ref());
+ auto keys = GetPartitionKeys(partBy);
+
+ auto sinkSettingsBuilder = Build<TExprList>(ctx, targetNode.Pos());
+ if (partBy)
+ sinkSettingsBuilder.Add(std::move(partBy));
+
if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
@@ -69,7 +83,7 @@ public:
.Index().Value("0").Build()
.Settings<TS3SinkSettings>()
.Path(write.Target().Path())
- .Settings<TCoNameValueTupleList>().Build()
+ .Settings(sinkSettingsBuilder.Done())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
@@ -116,12 +130,7 @@ public:
.Index().Value("0", TNodeFlags::Default).Build()
.Settings<TS3SinkSettings>()
.Path(write.Target().Path())
- .Settings<TCoNameValueTupleList>()
- .Add()
- .Name().Build("keys", TNodeFlags::Default)
- .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default)
- .Build()
- .Build()
+ .Settings(sinkSettingsBuilder.Done())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
@@ -153,12 +162,7 @@ public:
.Index(dqUnion.Output().Index())
.Settings<TS3SinkSettings>()
.Path(write.Target().Path())
- .Settings<TCoNameValueTupleList>()
- .Add()
- .Name().Build("keys", TNodeFlags::Default)
- .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default)
- .Build()
- .Build()
+ .Settings(sinkSettingsBuilder.Done())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()