aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author a-romanov <Anton.Romanov@ydb.tech> 2022-07-20 12:44:09 +0300
committer a-romanov <Anton.Romanov@ydb.tech> 2022-07-20 12:44:09 +0300
commit 4aab304cf9fba378a52a0bcfc2b5a9663f2300cf (patch)
tree df067052d8fbe065eee06b1ab5b9c9f26c18674a
parent 4c24c91c583db70673b56f8a2b85f69e6e4fd8a0 (diff)
download ydb-4aab304cf9fba378a52a0bcfc2b5a9663f2300cf.tar.gz
S3 split on partitions on write. Draft.
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp 257
-rw-r--r-- ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json 3
-rw-r--r-- ydb/library/yql/providers/s3/proto/sink.proto 1
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 33
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp 11
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp 58
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp 200
7 files changed, 433 insertions, 130 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 24e3997679a..516cf0edefa 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -40,7 +40,10 @@ struct TEvPrivate {
static_assert(EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
// Events
- struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> {};
+ struct TEvUploadFinished : public TEventLocal<TEvUploadFinished, EvUploadFinished> { // Local event that now carries the key given at construction.
+ explicit TEvUploadFinished(const TString& key) : Key(key) {}
+ const TString Key; // Copy of the constructor argument; TS3WriteActor erases its FileWriteActors entry by this value.
+ };
struct TEvUploadError : public TEventLocal<TEvUploadError, EvUploadError> {
explicit TEvUploadError(TIssues&& error) : Error(std::move(error)) {}
@@ -102,83 +105,84 @@ private:
using namespace NKikimr::NMiniKQL;
-class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput {
+class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
public:
- TS3WriteActor(ui64 outputIndex,
+ TS3FileWriteActor(
IHTTPGateway::TPtr gateway,
+ NYdb::TCredentialsProviderPtr credProvider,
+ const TString& key,
const TString& url,
- const TString& token,
- const TString& path,
- IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
: Gateway(std::move(gateway))
- , OutputIndex(outputIndex)
- , Callbacks(callbacks)
+ , CredProvider(std::move(credProvider))
, ActorSystem(TActivationContext::ActorSystem())
- , Url(url)
- , Headers(MakeHeader(token))
- , Path(path)
+ , Key(key), Url(url + key)
, RetryConfig(retryConfig)
{}
- void Bootstrap() {
- Become(&TS3WriteActor::InitialStateFunc);
- Gateway->Upload(Url + Path + "?uploads", Headers, "", std::bind(&TS3WriteActor::OnUploadsCreated, ActorSystem, SelfId(), std::placeholders::_1), false);
+ void Bootstrap(const TActorId& parentId) {
+ ParentId = parentId;
+ Become(&TS3FileWriteActor::InitialStateFunc);
+ Gateway->Upload(Url + "?uploads", MakeHeader(), "", std::bind(&TS3FileWriteActor::OnUploadsCreated, ActorSystem, SelfId(), ParentId, std::placeholders::_1), false);
}
- static constexpr char ActorName[] = "S3_WRITE_ACTOR";
-private:
- void CommitState(const NDqProto::TCheckpoint&) final {};
- void LoadState(const NDqProto::TSinkState&) final {};
- ui64 GetOutputIndex() const final { return OutputIndex; }
- i64 GetFreeSpace() const final {
- return 1_GB - InFlight - InQueue;
+ static constexpr char ActorName[] = "S3_FILE_WRITE_ACTOR";
+
+ void SendData(TString&& data) {
+ InQueue += data.size();
+ Parts.emplace(std::move(data));
+ if (!UploadId.empty())
+ StartUploadParts();
}
+ void Finish() {
+ InputFinished = true;
+ if (!InFlight && Parts.empty())
+ CommitUploadedParts();
+ }
+
+ i64 GetMemoryUsed() const {
+ return InFlight + InQueue;
+ }
+private:
STRICT_STFUNC(InitialStateFunc,
- hFunc(TEvPrivate::TEvUploadError, Handle);
hFunc(TEvPrivate::TEvUploadStarted, Handle);
)
STRICT_STFUNC(WorkingStateFunc,
- hFunc(TEvPrivate::TEvUploadError, Handle);
hFunc(TEvPrivate::TEvUploadPartFinished, Handle);
)
- STRICT_STFUNC(FinalStateFunc,
- hFunc(TEvPrivate::TEvUploadError, Handle);
- cFunc(TEvPrivate::EvUploadFinished, HandleFinished);
- )
- static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) {
+ static void OnUploadsCreated(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U: try {
const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
if (const auto& root = xml.Root(); root.Name() == "Error") {
const auto& code = root.Node("Code", true).Value<TString>();
const auto& message = root.Node("Message", true).Value<TString>();
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << message << ", error: code: " << code)})));
} else if (root.Name() != "InitiateMultipartUploadResult")
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")})));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on create upload.")})));
else {
const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadStarted(root.Node("s3:UploadId", false, nss).Value<TString>())));
}
break;
} catch (const std::exception& ex) {
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse create upload response.")})));
break;
}
case 1U:
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(result)))));
break;
default:
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")})));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected variant index " << result.index() << " on create upload response.")})));
break;
}
}
- static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, size_t size, size_t index, IHTTPGateway::TResult&& response) {
+ static void OnPartUploadFinish(TActorSystem* actorSystem, TActorId selfId, TActorId parentId, size_t size, size_t index, IHTTPGateway::TResult&& response) {
switch (response.index()) {
case 0U: {
const auto str = std::get<IHTTPGateway::TContent>(std::move(response)).Extract();
@@ -189,16 +193,16 @@ private:
break;
}
}
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)})));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response:" << Endl << str)})));
break;
}
case 1U:
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response)))));
+ actorSystem->Send(new IEventHandle(parentId, TActorId(), new TEvPrivate::TEvUploadError(std::get<TIssues>(std::move(response)))));
break;
}
}
- static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result) {
+ static void OnUploadFinish(TActorSystem* actorSystem, TActorId selfId, const TString& key, IHTTPGateway::TResult&& result) {
switch (result.index()) {
case 0U: try {
const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
@@ -209,7 +213,7 @@ private:
} else if (root.Name() != "CompleteMultipartUploadResult")
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Unexpected response '" << root.Name() << "' on finish upload.")})));
else
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished()));
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadFinished(key)));
break;
} catch (const std::exception& ex) {
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvUploadError({TIssue(TStringBuilder() << "Error '" << ex.what() << "' on parse finish upload response.")})));
@@ -224,26 +228,9 @@ private:
}
}
- void SendData(TUnboxedValueVector&& data, i64 size, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
- InputFinished = finished;
- for (const auto& v : data)
- Parts.emplace(v.AsStringRef());
- data.clear();
- InQueue += size;
- if (!UploadId.empty())
- StartUploadParts();
- }
-
- void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
- Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true);
- if (!UploadId.empty()) {
- // TODO: Send delete.
- }
- }
-
void Handle(TEvPrivate::TEvUploadStarted::TPtr& result) {
UploadId = result->Get()->UploadId;
- Become(&TS3WriteActor::WorkingStateFunc);
+ Become(&TS3FileWriteActor::WorkingStateFunc);
StartUploadParts();
}
@@ -251,30 +238,13 @@ private:
InFlight -= result->Get()->Size;
Tags[result->Get()->Index] = std::move(result->Get()->ETag);
- if (!InFlight && InputFinished && Parts.empty()) {
- Become(&TS3WriteActor::FinalStateFunc);
- TStringBuilder xml;
- xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl;
- xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl;
- size_t i = 0U;
- for (const auto& tag : Tags)
- xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl;
- xml << "</CompleteMultipartUpload>" << Endl;
- Gateway->Upload(Url + Path + "?uploadId=" + UploadId, Headers, xml, std::bind(&TS3WriteActor::OnUploadFinish, ActorSystem, SelfId(), std::placeholders::_1), false);
- }
- }
-
- void HandleFinished() {
- return Callbacks->OnAsyncOutputFinished(OutputIndex);
+ if (!InFlight && InputFinished && Parts.empty())
+ CommitUploadedParts();
}
// IActor & IDqComputeActorAsyncOutput
void PassAway() override { // Is called from Compute Actor
- TActorBootstrapped<TS3WriteActor>::PassAway();
- }
-
- static IHTTPGateway::THeaders MakeHeader(const TString& token) {
- return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
+ TActorBootstrapped<TS3FileWriteActor>::PassAway();
}
void StartUploadParts() {
@@ -283,24 +253,41 @@ private:
const auto index = Tags.size();
Tags.emplace_back();
InFlight += size;
- Gateway->Upload(Url + Path + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, Headers, std::move(Parts.front()), std::bind(&TS3WriteActor::OnPartUploadFinish, ActorSystem, SelfId(), size, index, std::placeholders::_1), true);
+ Gateway->Upload(Url + "?partNumber=" + std::to_string(index + 1) + "&uploadId=" + UploadId, MakeHeader(), std::move(Parts.front()), std::bind(&TS3FileWriteActor::OnPartUploadFinish, ActorSystem, SelfId(), ParentId, size, index, std::placeholders::_1), true);
}
}
+ void CommitUploadedParts() {
+ Become(nullptr);
+ TStringBuilder xml;
+ xml << "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" << Endl;
+ xml << "<CompleteMultipartUpload xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" << Endl;
+ size_t i = 0U;
+ for (const auto& tag : Tags)
+ xml << "<Part><PartNumber>" << ++i << "</PartNumber><ETag>" << tag << "</ETag></Part>" << Endl;
+ xml << "</CompleteMultipartUpload>" << Endl;
+ Gateway->Upload(Url + "?uploadId=" + UploadId, MakeHeader(), xml, std::bind(&TS3FileWriteActor::OnUploadFinish, ActorSystem, ParentId, Key, std::placeholders::_1), false);
+ }
+
+ IHTTPGateway::THeaders MakeHeader() const {
+ if (const auto& token = CredProvider->GetAuthInfo(); token.empty())
+ return IHTTPGateway::THeaders();
+ else
+ return IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
+ }
+
bool InputFinished = false;
size_t InQueue = 0ULL;
size_t InFlight = 0ULL;
const IHTTPGateway::TPtr Gateway;
-
- const ui64 OutputIndex;
- IDqComputeActorAsyncOutput::ICallbacks *const Callbacks;
+ const NYdb::TCredentialsProviderPtr CredProvider;
TActorSystem* const ActorSystem;
+ TActorId ParentId;
+ const TString Key;
const TString Url;
- const IHTTPGateway::THeaders Headers;
- const TString Path;
std::queue<TString> Parts;
std::vector<TString> Tags;
@@ -311,6 +298,107 @@ private:
TString UploadId;
};
+// Async output (sink) actor that routes every incoming value to a
+// TS3FileWriteActor selected by the value's partition key. A writer is created
+// lazily on the first value seen for a key and is removed from the map when
+// its TEvUploadFinished arrives.
+class TS3WriteActor : public TActorBootstrapped<TS3WriteActor>, public IDqComputeActorAsyncOutput {
+public:
+    // keyWidth is the number of key elements that follow the payload in every
+    // incoming tuple; 0 means values are plain strings and a single writer
+    // with an empty key is used.
+    TS3WriteActor(ui64 outputIndex,
+        IHTTPGateway::TPtr gateway,
+        NYdb::TCredentialsProviderPtr credProvider,
+        const TString& url,
+        const TString& path,
+        const ui32 keyWidth,
+        IDqComputeActorAsyncOutput::ICallbacks* callbacks,
+        const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
+        : Gateway(std::move(gateway))
+        , CredProvider(std::move(credProvider))
+        , OutputIndex(outputIndex)
+        , Callbacks(callbacks)
+        , Url(url)
+        , Path(path)
+        , KeyWidth(keyWidth)
+        , RetryConfig(retryConfig)
+    {}
+
+    void Bootstrap() {
+        Become(&TS3WriteActor::StateFunc);
+    }
+
+    static constexpr char ActorName[] = "S3_WRITE_ACTOR";
+private:
+    void CommitState(const NDqProto::TCheckpoint&) final {};
+    void LoadState(const NDqProto::TSinkState&) final {};
+    ui64 GetOutputIndex() const final { return OutputIndex; }
+
+    // Starts from a 1_GB budget and subtracts what every live writer reports
+    // through GetMemoryUsed(); the result may be negative.
+    i64 GetFreeSpace() const final {
+        return std::accumulate(FileWriteActors.cbegin(), FileWriteActors.cend(), i64(1_GB), [](i64 free, const std::pair<const TString, TS3FileWriteActor*>& item){ return free - item.second->GetMemoryUsed(); });
+    }
+
+    // Builds the key prefix "<part1>/<part2>/.../" from tuple elements
+    // 1..KeyWidth (element 0 is the payload, see SendData). Returns an empty
+    // string when KeyWidth is 0. Fails via YQL_ENSURE when a part contains '/'.
+    TString MakeKey(const NUdf::TUnboxedValuePod v) const {
+        if (!KeyWidth)
+            return {};
+
+        auto elements = v.GetElements();
+        TStringBuilder key;
+        for (auto k = KeyWidth; k; --k) {
+            // Pre-increment: skip element 0 (payload) before reading the first key part.
+            const std::string_view keyPart = (++elements)->AsStringRef();
+            YQL_ENSURE(std::string_view::npos == keyPart.find('/'), "Invalid partition key, contains '/': " << keyPart);
+            key << keyPart << '/';
+        }
+        return key;
+    }
+
+    STRICT_STFUNC(StateFunc,
+        hFunc(TEvPrivate::TEvUploadError, Handle);
+        hFunc(TEvPrivate::TEvUploadFinished, Handle);
+    )
+
+    // Dispatches every value to the writer of its key, creating the writer on
+    // first use. When `finished` is set, all writers are told to finish; if no
+    // writer exists at that point, completion is reported immediately.
+    void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
+        for (const auto& v : data) {
+            const auto& key = MakeKey(v);
+            const auto ins = FileWriteActors.emplace(key, nullptr);
+            if (ins.second) {
+                auto fileWrite = std::make_unique<TS3FileWriteActor>(Gateway, CredProvider, key, Url + Path, RetryConfig);
+                ins.first->second = fileWrite.get();
+                RegisterWithSameMailbox(fileWrite.release());
+            }
+
+            // With keys the payload is tuple element 0; without keys the value itself is the payload.
+            ins.first->second->SendData(TString((KeyWidth > 0U ? *v.GetElements() : v).AsStringRef()));
+        }
+
+        if (finished) {
+            std::for_each(FileWriteActors.cbegin(), FileWriteActors.cend(), [](const std::pair<const TString, TS3FileWriteActor*>& item){ item.second->Finish(); });
+            // Empty input: no writer was ever created, so no TEvUploadFinished
+            // will arrive to trigger completion. Report it here instead of
+            // leaving the sink unfinished forever.
+            if (FileWriteActors.empty())
+                Callbacks->OnAsyncOutputFinished(OutputIndex);
+        }
+        data.clear();
+    }
+
+    // Forwards an upload error to the compute actor callbacks (third argument is passed as true).
+    void Handle(TEvPrivate::TEvUploadError::TPtr& result) {
+        Callbacks->OnAsyncOutputError(OutputIndex, result->Get()->Error, true);
+    }
+
+    // Drops the writer whose upload completed; the output is reported finished
+    // once the last writer is gone.
+    void Handle(TEvPrivate::TEvUploadFinished::TPtr& result) {
+        FileWriteActors.erase(result->Get()->Key);
+        if (FileWriteActors.empty())
+            Callbacks->OnAsyncOutputFinished(OutputIndex);
+    }
+
+    // IActor & IDqComputeActorAsyncOutput
+    // NOTE(review): writers registered in SendData are not passed away here - confirm who releases them.
+    void PassAway() override { // Is called from Compute Actor
+        TActorBootstrapped<TS3WriteActor>::PassAway();
+    }
+
+    const IHTTPGateway::TPtr Gateway;
+    const NYdb::TCredentialsProviderPtr CredProvider;
+
+    const ui64 OutputIndex;
+    IDqComputeActorAsyncOutput::ICallbacks *const Callbacks;
+
+    const TString Url;
+    const TString Path;
+    const ui32 KeyWidth;
+
+    std::vector<TRetryParams> RetriesPerPath;
+    const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
+
+    // Key prefix -> writer registered for it (non-owning pointer).
+    std::unordered_map<TString, TS3FileWriteActor*> FileWriteActors;
+};
+
} // namespace
std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
@@ -326,9 +414,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
{
const auto token = secureParams.Value(params.GetToken(), TString{});
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
- const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
-
- const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), params.GetUrl(), authToken, params.GetPath(), callbacks, retryConfig);
+ // A single provider is created and handed to the actor; the key width defaults to 0 when the sink settings carry no Keys field.
+ const auto actor = new TS3WriteActor(outputIndex, std::move(gateway), credentialsProviderFactory->CreateProvider(), params.GetUrl(), params.GetPath(), params.HasKeys() ? params.GetKeys() : 0U, callbacks, retryConfig);
return {actor, actor};
}
diff --git a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
index 40be1652ccd..93abdf6b5b1 100644
--- a/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
+++ b/ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json
@@ -141,7 +141,8 @@
"Children": [
{"Index": 0, "Name": "Input", "Type": "TExprBase"},
{"Index": 1, "Name": "Format", "Type": "TCoAtom"},
- {"Index": 2, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
+ {"Index": 2, "Name": "KeyColumns", "Type": "TCoAtomList"},
+ {"Index": 3, "Name": "Settings", "Type": "TCoNameValueTupleList", "Optional": true}
]
}
]
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
index bbc0405af3a..2b995a3cab2 100644
--- a/ydb/library/yql/providers/s3/proto/sink.proto
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -7,5 +7,6 @@ message TSink {
string Url = 1;
string Token = 2;
string Path = 3;
+ optional uint32 Keys = 4;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 9fe5d05bb7b..c67af6e61a9 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -92,7 +92,7 @@ private:
}
TStatus HandleOutput(const TExprNode::TPtr& input, TExprContext& ctx) {
- if (!EnsureMinMaxArgsCount(*input, 2U, 3U, ctx)) {
+ if (!EnsureMinMaxArgsCount(*input, 3U, 4U, ctx)) {
return TStatus::Error;
}
@@ -104,11 +104,40 @@ private:
return TStatus::Error;
}
+ if (!EnsureTupleOfAtoms(*input->Child(TS3SinkOutput::idx_KeyColumns), ctx)) {
+ return TStatus::Error;
+ }
+
if (input->ChildrenSize() > TS3SinkOutput::idx_Settings && !EnsureTuple(*input->Child(TS3SinkOutput::idx_Settings), ctx)) {
return TStatus::Error;
}
- input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+        // With key columns the output item is a tuple (String payload, Utf8 key, ...);
+        // without them it stays a plain String.
+        if (const auto keysCount = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenSize()) {
+            const auto source = input->Child(TS3SinkOutput::idx_Input);
+            const auto itemType = source->GetTypeAnn()->Cast<TFlowExprType>()->GetItemType();
+            if (!EnsureStructType(source->Pos(), *itemType, ctx)) {
+                return TStatus::Error;
+            }
+
+            // Every key column must exist in the input struct and have a data type.
+            const auto structType = itemType->Cast<TStructExprType>();
+            for (auto i = 0U; i < keysCount; ++i) {
+                const auto key = input->Child(TS3SinkOutput::idx_KeyColumns)->Child(i);
+                if (const auto keyType = structType->FindItemType(key->Content())) {
+                    if (!EnsureDataType(key->Pos(), *keyType, ctx)) {
+                        return TStatus::Error;
+                    }
+                } else {
+                    ctx.AddError(TIssue(ctx.GetPosition(key->Pos()), "Missed key column."));
+                    return TStatus::Error;
+                }
+            }
+
+            // The annotation depends only on keysCount, so it is built once after
+            // all keys have been validated instead of on every loop iteration.
+            TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8));
+            itemTypes.front() = ctx.MakeType<TDataExprType>(EDataSlot::String);
+            input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TTupleExprType>(itemTypes)));
+        } else
+            input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+
return TStatus::Ok;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 2f3b05faa6f..2343f79ea18 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -19,6 +19,15 @@ using namespace NNodes;
namespace {
+// Scans the children of `settings` for the first one whose head is the atom
+// "keys" and returns the content of that child's tail. Returns an empty view
+// when no such child exists.
+std::string_view GetKeys(const TExprNode& settings) {
+    const auto total = settings.ChildrenSize();
+    for (auto pos = 0U; pos < total; ++pos) {
+        const auto& setting = *settings.Child(pos);
+        if (!setting.Head().IsAtom("keys")) {
+            continue;
+        }
+        return setting.Tail().Content();
+    }
+    return {};
+}
+
using namespace NYql::NS3Details;
class TS3DqIntegration: public TDqIntegrationBase {
@@ -221,6 +230,8 @@ public:
sinkDesc.SetUrl(connect.Url);
sinkDesc.SetToken(settings.Token().Name().StringValue());
sinkDesc.SetPath(settings.Path().StringValue());
+ if (const auto& keys = GetKeys(maySettings.Settings().Ref()); !keys.empty())
+ sinkDesc.SetKeys(::FromString<ui32>(keys));
protoSettings.PackFrom(sinkDesc);
sinkType = "S3Sink";
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
index 8ffbd2be00d..3956e84a996 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
#include <ydb/library/yql/providers/common/mkql/parser.h>
+#include <library/cpp/json/json_writer.h>
#include <util/stream/str.h>
namespace NYql {
@@ -12,11 +13,10 @@ using namespace NNodes;
namespace {
-
TRuntimeNode BuildSerializeCall(
TRuntimeNode input,
+ const std::vector<std::string_view>& keys,
const std::string_view& format,
- const std::string_view& /*compression*/,
TType* inputType,
NCommon::TMkqlBuildContext& ctx)
{
@@ -40,14 +40,62 @@ TRuntimeNode BuildSerializeCall(
);
}
- const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewStreamType(inputItemType)})});
- return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format), {ctx.ProgramBuilder.FromFlow(input)}));
+ TString settings;
+ if (!keys.empty()) {
+ const std::unordered_set<std::string_view> set(keys.cbegin(), keys.cend());
+ const auto structType = AS_TYPE(TStructType, inputItemType);
+ MKQL_ENSURE(set.size() < structType->GetMembersCount(), "Expected non key columns.");
+ std::vector<std::pair<std::string_view, TType*>> types(structType->GetMembersCount());
+ const auto keyType = ctx.ProgramBuilder.NewDataType(NUdf::TDataType<NUdf::TUtf8>::Id);
+ for (auto i = 0U; i < types.size(); ++i) {
+ const auto& name = structType->GetMemberName(i);
+ types[i].first = name;
+ types[i].second = set.contains(name) ? keyType : structType->GetMemberType(i);
+ }
+
+ if (const auto newStructType = static_cast<TStructType*>(ctx.ProgramBuilder.NewStructType(types)); !newStructType->IsSameType(*structType)) {
+ input = ctx.ProgramBuilder.Map(input,
+ [&](TRuntimeNode item) {
+ std::vector<std::pair<std::string_view, TRuntimeNode>> members(types.size());
+ for (auto i = 0U; i < members.size(); ++i) {
+ const auto& name = members[i].first = types[i].first;
+ members[i].second = ctx.ProgramBuilder.Member(item, name);
+ if (const auto oldType = structType->GetMemberType(i); !newStructType->GetMemberType(i)->IsSameType(*oldType)) {
+ const auto dataType = AS_TYPE(TDataType, oldType);
+ members[i].second = dataType->GetSchemeType() == NUdf::TDataType<const char*>::Id ?
+ ctx.ProgramBuilder.StrictFromString(members[i].second, newStructType->GetMemberType(i)):
+ ctx.ProgramBuilder.ToString<true>(members[i].second);
+ }
+ }
+
+ return ctx.ProgramBuilder.NewStruct(newStructType, members);
+ }
+ );
+ }
+
+ TStringOutput stream(settings);
+ NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig());
+ writer.OpenMap();
+ writer.WriteKey("keys");
+ writer.OpenArray();
+ std::for_each(keys.cbegin(), keys.cend(), [&writer](const std::string_view& key){ writer.Write(key); });
+ writer.CloseArray();
+ writer.CloseMap();
+ writer.Flush();
+ }
+
+ input = ctx.ProgramBuilder.FromFlow(input);
+ const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({input.GetStaticType()})});
+ return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settings), {input}));
}
TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) { // Builds the serialization call for a TS3SinkOutput node from its input, key columns and format.
const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder);
- return BuildSerializeCall(input, wrapper.Format().Value(), "TODO", inputItemType, ctx);
+ std::vector<std::string_view> keys; // Key column names, collected in KeyColumns child order.
+ keys.reserve(wrapper.KeyColumns().Size());
+ wrapper.KeyColumns().Ref().ForEachChild([&](const TExprNode& key){ keys.emplace_back(key.Content()); });
+ return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, ctx); // The former "TODO" compression argument is gone; keys are passed instead.
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 6718f26061d..384e828ea45 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -15,6 +15,18 @@ namespace {
using namespace NNodes;
using namespace NDq;
+// Looks for the first child of `settings` whose head is the atom "partitionby"
+// and returns that child's own children with the leading (name) node removed.
+// Returns an empty list when there is no such setting.
+TExprNode::TListType GetPartitionBy(const TExprNode& settings) {
+    for (auto pos = 0U; pos < settings.ChildrenSize(); ++pos) {
+        const auto entry = settings.Child(pos);
+        if (!entry->Head().IsAtom("partitionby")) {
+            continue;
+        }
+
+        auto columns = entry->ChildrenList();
+        columns.erase(columns.cbegin());
+        return columns;
+    }
+
+    return {};
+}
+
class TS3PhysicalOptProposalTransformer : public TOptimizeTransformerBase {
public:
explicit TS3PhysicalOptProposalTransformer(TS3State::TPtr state)
@@ -24,74 +36,188 @@ public:
#define HNDL(name) "PhysicalOptimizer-"#name, Hndl(&TS3PhysicalOptProposalTransformer::name)
AddHandler(0, &TS3WriteObject::Match, HNDL(S3WriteObject));
#undef HNDL
-
- SetGlobal(0); // Stage 0 of this optimizer is global => we can remap nodes.
}
- TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, IOptimizationContext& optCtx, const TGetParents& getParents) const {
- auto write = node.Cast<TS3WriteObject>();
- if (!TDqCnUnionAll::Match(write.Input().Raw())) { // => this code is not for RTMR mode.
- return node;
- }
-
+ TMaybeNode<TExprBase> S3WriteObject(TExprBase node, TExprContext& ctx, const TGetParents& getParents) const {
+ const auto& write = node.Cast<TS3WriteObject>();
const auto& targetNode = write.Target();
const auto& cluster = write.DataSink().Cluster().StringValue();
+ const auto token = "cluster:default_" + cluster;
+ const auto keys = GetPartitionBy(write.Target().Settings().Ref());
+
+ if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
+ YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
+ return keys.empty() ?
+ Build<TDqQuery>(ctx, write.Pos())
+ .World(write.World())
+ .SinkStages()
+ .Add<TDqStage>()
+ .Inputs().Build()
+ .Program<TCoLambda>()
+ .Args({})
+ .Body<TS3SinkOutput>()
+ .Input<TCoToFlow>()
+ .Input(write.Input())
+ .Build()
+ .Format(write.Target().Format())
+ .KeyColumns().Build()
+ .Build()
+ .Build()
+ .Outputs<TDqStageOutputsList>()
+ .Add<TDqSink>()
+ .DataSink(write.DataSink())
+ .Index().Value("0").Build()
+ .Settings<TS3SinkSettings>()
+ .Path(write.Target().Path())
+ .Settings<TCoNameValueTupleList>().Build()
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Build()
+ .Build()
+ .Done():
+ Build<TDqQuery>(ctx, write.Pos())
+ .World(write.World())
+ .SinkStages()
+ .Add<TDqStage>()
+ .Inputs()
+ .Add<TDqCnHashShuffle>()
+ .Output<TDqOutput>()
+ .Stage<TDqStage>()
+ .Inputs().Build()
+ .Program<TCoLambda>()
+ .Args({})
+ .Body<TCoToFlow>()
+ .Input(write.Input())
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Build()
+ .Index().Value("0", TNodeFlags::Default).Build()
+ .Build()
+ .KeyColumns().Add(keys).Build()
+ .Build()
+ .Build()
+ .Program<TCoLambda>()
+ .Args({"in"})
+ .Body<TS3SinkOutput>()
+ .Input("in")
+ .Format(write.Target().Format())
+ .KeyColumns().Add(keys).Build()
+ .Build()
+ .Build()
+ .Outputs<TDqStageOutputsList>()
+ .Add<TDqSink>()
+ .DataSink(write.DataSink())
+ .Index().Value("0", TNodeFlags::Default).Build()
+ .Settings<TS3SinkSettings>()
+ .Path(write.Target().Path())
+ .Settings<TCoNameValueTupleList>()
+ .Add()
+ .Name().Build("keys", TNodeFlags::Default)
+ .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default)
+ .Build()
+ .Build()
+ .Token<TCoSecureParam>()
+ .Name().Build(token)
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Build()
+ .Build()
+ .Done();
+ }
+
+ if (!TDqCnUnionAll::Match(write.Input().Raw())) {
+ return node;
+ }
const TParentsMap* parentsMap = getParents();
- auto dqUnion = write.Input().Cast<TDqCnUnionAll>();
+ const auto dqUnion = write.Input().Cast<TDqCnUnionAll>();
if (!NDq::IsSingleConsumerConnection(dqUnion, *parentsMap)) {
return node;
}
+ YQL_CLOG(INFO, ProviderS3) << "Rewrite S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as sink.";
- YQL_CLOG(INFO, ProviderS3) << "Optimize S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "`";
-
- const auto token = "cluster:default_" + cluster;
+ const auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
- auto dqSink = Build<TDqSink>(ctx, write.Pos())
+ const auto sink = Build<TDqSink>(ctx, write.Pos())
.DataSink(write.DataSink())
.Index(dqUnion.Output().Index())
.Settings<TS3SinkSettings>()
.Path(write.Target().Path())
- .Settings<TCoNameValueTupleList>().Build()
+ .Settings<TCoNameValueTupleList>()
+ .Add()
+ .Name().Build("keys", TNodeFlags::Default)
+ .Value<TCoAtom>().Build(ToString(keys.size()), TNodeFlags::Default)
+ .Build()
+ .Build()
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
.Build()
.Done();
- auto inputStage = dqUnion.Output().Stage().Cast<TDqStage>();
auto outputsBuilder = Build<TDqStageOutputsList>(ctx, targetNode.Pos());
- if (inputStage.Outputs()) {
+ if (inputStage.Outputs() && keys.empty()) {
outputsBuilder.InitFrom(inputStage.Outputs().Cast());
}
- outputsBuilder.Add(dqSink);
-
- auto dqStageWithSink = Build<TDqStage>(ctx, inputStage.Pos())
- .InitFrom(inputStage)
- .Program<TCoLambda>()
- .Args({"input"})
- .Body<TS3SinkOutput>()
- .Input<TExprApplier>()
- .Apply(inputStage.Program()).With(0, "input")
+ outputsBuilder.Add(sink);
+
+ auto dqStageWithSink = keys.empty() ?
+ Build<TDqStage>(ctx, inputStage.Pos())
+ .InitFrom(inputStage)
+ .Program<TCoLambda>()
+ .Args({"input"})
+ .Body<TS3SinkOutput>()
+ .Input<TExprApplier>()
+ .Apply(inputStage.Program()).With(0, "input")
+ .Build()
+ .Format(write.Target().Format())
+ .KeyColumns()
+ .Add(std::move(keys))
+ .Build()
.Build()
- .Format(write.Target().Format())
.Build()
- .Build()
- .Outputs(outputsBuilder.Done())
- .Done();
-
- auto dqQueryBuilder = Build<TDqQuery>(ctx, write.Pos());
- dqQueryBuilder.World(write.World());
- dqQueryBuilder.SinkStages().Add(dqStageWithSink).Build();
-
- optCtx.RemapNode(inputStage.Ref(), dqStageWithSink.Ptr());
+ .Outputs(outputsBuilder.Done())
+ .Done():
+ Build<TDqStage>(ctx, inputStage.Pos())
+ .Inputs()
+ .Add<TDqCnHashShuffle>()
+ .Output<TDqOutput>()
+ .Stage(inputStage)
+ .Index(dqUnion.Output().Index())
+ .Build()
+ .KeyColumns().Add(keys).Build()
+ .Build()
+ .Build()
+ .Program<TCoLambda>()
+ .Args({"in"})
+ .Body<TS3SinkOutput>()
+ .Input("in")
+ .Format(write.Target().Format())
+ .KeyColumns().Add(std::move(keys)).Build()
+ .Build()
+ .Build()
+ .Settings().Build()
+ .Outputs(outputsBuilder.Done())
+ .Done();
- return dqQueryBuilder.Done();
+ return Build<TDqQuery>(ctx, write.Pos())
+ .World(write.World())
+ .SinkStages().Add(dqStageWithSink).Build()
+ .Done();
}
private:
- TS3State::TPtr State_;
+ const TS3State::TPtr State_;
};
} // namespace