summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <[email protected]>2022-06-23 20:04:03 +0300
committera-romanov <[email protected]>2022-06-23 20:04:03 +0300
commit014dc364e6d6ad3584f891604069aacfb1f6f941 (patch)
tree3f11a2ed098fec67453421a611c3ba501cd99f96
parenta77ae5f11416ba3add5205dea69d1efe9012a096 (diff)
YQL-15033 Support MrTableConcat for S3 read.
ref:c6121c76879a2f89a765eaa80bdae995caa5650f
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp151
1 files changed, 94 insertions, 57 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 91079b22d2f..2626a56dfc6 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -21,10 +21,24 @@ namespace {
using namespace NNodes;
-std::array<TExprNode::TPtr, 2U> GetSchema(const TExprNode& settings) {
- for (auto i = 0U; i < settings.ChildrenSize(); ++i)
- if (settings.Child(i)->Head().IsAtom("userschema"))
- return {settings.Child(i)->ChildPtr(1), settings.Child(i)->ChildrenSize() > 2 ? settings.Child(i)->TailPtr() : TExprNode::TPtr()};
+std::array<TExprNode::TPtr, 2U> ExtractSchema(TExprNode::TListType& settings) {
+ for (auto it = settings.cbegin(); settings.cend() != it; ++it) {
+ if (const auto item = *it; item->Head().IsAtom("userschema")) {
+ settings.erase(it);
+ return {item->ChildPtr(1), item->ChildrenSize() > 2 ? item->TailPtr() : TExprNode::TPtr()};
+ }
+ }
+
+ return {};
+}
+
+TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings) {
+ for (auto it = settings.cbegin(); settings.cend() != it; ++it) {
+ if (const auto item = *it; item->Head().IsAtom("format")) {
+ settings.erase(it);
+ return item->TailPtr();
+ }
+ }
return {};
}
@@ -193,22 +207,27 @@ public:
if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) {
if (const auto maybeRead = TMaybeNode<TS3Read>(node)) {
if (maybeRead.DataSource()) {
- return maybeRead.Cast().Arg(2).Ref().IsCallable("MrObject");
+ return maybeRead.Cast().Arg(2).Ref().IsCallable({"MrObject", "MrTableConcat"});
}
}
return false;
}); !reads.empty()) {
for (auto& r : reads) {
const TS3Read read(std::move(r));
- const auto& object = read.Arg(2).Ref();
- const std::string_view& path = object.Head().Content();
- const auto& prefix = path.substr(0U, path.find_first_of("*?{"));
+ std::unordered_set<std::string_view> paths;
+ if (const auto& object = read.Arg(2).Ref(); object.IsCallable("MrObject"))
+ paths.emplace(object.Head().Content());
+ else if (object.IsCallable("MrTableConcat"))
+ object.ForEachChild([&paths](const TExprNode& child){ paths.emplace(child.Head().Tail().Head().Content()); });
const auto& connect = State_->Configuration->Clusters.at(read.DataSource().Cluster().StringValue());
const auto& token = State_->Configuration->Tokens.at(read.DataSource().Cluster().StringValue());
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
- std::get<TNodeSet>((*PendingBuckets_)[std::make_tuple(connect.Url, TString(prefix), authToken.empty() ? TString() : TString("X-YaCloud-SubjectToken:") += authToken)]).emplace(read.Raw());
+ for (const auto& path : paths) {
+ const auto& prefix = path.substr(0U, path.find_first_of("*?{"));
+ std::get<TNodeSet>((*PendingBuckets_)[std::make_tuple(connect.Url, TString(prefix), authToken.empty() ? TString() : TString("X-YaCloud-SubjectToken:") += authToken)]).emplace(read.Raw());
+ }
}
}
@@ -272,67 +291,85 @@ public:
const auto nodes = std::move(std::get<TNodeSet>(bucket.second));
for (const auto r : nodes) {
const TS3Read read(r);
+
+ std::vector<std::string_view> keys;
const auto& object = read.Arg(2).Ref();
- const std::string_view& path = object.Head().Content();
+ if (object.IsCallable("MrObject"))
+ keys.emplace_back(object.Head().Content());
+ else if (object.IsCallable("MrTableConcat"))
+ object.ForEachChild([&keys](const TExprNode& child){ keys.emplace_back(child.Head().Tail().Head().Content()); });
+
const auto& items = std::get<TItemsMap>(bucket.second);
YQL_CLOG(INFO, ProviderS3) << "Discovered " << items.size() << " items in " << std::get<0U>(bucket.first) << std::get<1U>(bucket.first);
TExprNode::TListType paths;
- if (std::string_view::npos != path.find_first_of("?*{")) {
- const RE2 re(re2::StringPiece(RegexFromWildcards(path)), RE2::Options());
- paths.reserve(items.size());
- auto total = 0ULL;
- for (const auto& item : items) {
- if (const re2::StringPiece piece(item.first); re.Match(piece, 0, item.first.size(), RE2::ANCHOR_BOTH, nullptr, 0)) {
- if (item.second > State_->Configuration->FileSizeLimit) {
- ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << item.first << " size " << item.second << " is too large, but limit is " << State_->Configuration->FileSizeLimit));
- return TStatus::Error;
+ for (const auto& path : keys) {
+ if (std::string_view::npos != path.find_first_of("?*{")) {
+ const RE2 re(re2::StringPiece(RegexFromWildcards(path)), RE2::Options());
+ paths.reserve(items.size());
+ auto total = 0ULL;
+ for (const auto& item : items) {
+ if (const re2::StringPiece piece(item.first); re.Match(piece, 0, item.first.size(), RE2::ANCHOR_BOTH, nullptr, 0)) {
+ if (item.second > State_->Configuration->FileSizeLimit) {
+ ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << item.first << " size " << item.second << " is too large, but limit is " << State_->Configuration->FileSizeLimit));
+ return TStatus::Error;
+ }
+
+ total += item.second;
+ ++count;
+ paths.emplace_back(
+ ctx.Builder(object.Pos())
+ .List()
+ .Atom(0, item.first)
+ .Atom(1, ToString(item.second), TNodeFlags::Default)
+ .Seal()
+ .Build()
+ );
}
-
- total += item.second;
- ++count;
- paths.emplace_back(
- ctx.Builder(object.Pos())
- .List()
- .Atom(0, item.first)
- .Atom(1, ToString(item.second), TNodeFlags::Default)
- .Seal()
- .Build()
- );
}
- }
- readSize += total;
+ readSize += total;
- if (paths.empty()) {
- ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " has no items."));
+ if (paths.empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " has no items."));
+ return TStatus::Error;
+ }
+ YQL_CLOG(INFO, ProviderS3) << "Object " << path << " has " << paths.size() << " items with total size " << total;
+ } else if (const auto f = items.find(TString(path)); items.cend() == f) {
+ ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " doesn't exist."));
return TStatus::Error;
+ } else if (const auto size = f->second; size > State_->Configuration->FileSizeLimit) {
+ ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " size " << size << " is too large, but limit is " << State_->Configuration->FileSizeLimit));
+ return TStatus::Error;
+ } else {
+ YQL_CLOG(INFO, ProviderS3) << "Object " << path << " size is " << size;
+ readSize += size;
+ ++count;
+ paths.emplace_back(
+ ctx.Builder(object.Pos())
+ .List()
+ .Atom(0, path)
+ .Atom(1, ToString(size), TNodeFlags::Default)
+ .Seal()
+ .Build()
+ );
}
- YQL_CLOG(INFO, ProviderS3) << "Object " << path << " has " << paths.size() << " items with total size " << total;
- } else if (const auto f = items.find(TString(path)); items.cend() == f) {
- ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " doesn't exist."));
- return TStatus::Error;
- } else if (const auto size = f->second; size > State_->Configuration->FileSizeLimit) {
- ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " size " << size << " is too large, but limit is " << State_->Configuration->FileSizeLimit));
- return TStatus::Error;
- } else {
- YQL_CLOG(INFO, ProviderS3) << "Object " << path << " size is " << size;
- readSize += size;
- ++count;
- paths.emplace_back(
- ctx.Builder(object.Pos())
- .List()
- .Add(0, object.HeadPtr())
- .Atom(1, ToString(size), TNodeFlags::Default)
- .Seal()
- .Build()
- );
}
- auto children = object.ChildrenList();
- children.front() = ctx.NewList(object.Pos(), std::move(paths));
- auto s3Object = ctx.NewCallable(object.Pos(), TS3Object::CallableName(), std::move(children));
- auto userSchema = GetSchema(*read.Ref().Child(4));
+ auto settings = read.Ref().Child(4)->ChildrenList();
+ auto userSchema = ExtractSchema(settings);
+ TExprNode::TPtr s3Object;
+ if (object.IsCallable("MrObject")) {
+ auto children = object.ChildrenList();
+ children.front() = ctx.NewList(object.Pos(), std::move(paths));
+ s3Object = ctx.NewCallable(object.Pos(), TS3Object::CallableName(), std::move(children));
+ } else if (object.IsCallable("MrTableConcat")) {
+ s3Object = Build<TS3Object>(ctx, object.Pos())
+ .Paths(ctx.NewList(object.Pos(), std::move(paths)))
+ .Format(ExtractFormat(settings))
+ .Settings(ctx.NewList(object.Pos(), std::move(settings)))
+ .Done().Ptr();
+ }
replaces.emplace(r, userSchema.back() ?
Build<TS3ReadObject>(ctx, read.Pos())