diff options
author | a-romanov <[email protected]> | 2022-06-23 20:04:03 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-06-23 20:04:03 +0300 |
commit | 014dc364e6d6ad3584f891604069aacfb1f6f941 (patch) | |
tree | 3f11a2ed098fec67453421a611c3ba501cd99f96 | |
parent | a77ae5f11416ba3add5205dea69d1efe9012a096 (diff) |
YQL-15033 Support MrTableConcat for S3 read.
ref:c6121c76879a2f89a765eaa80bdae995caa5650f
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp | 151 |
1 file changed, 94 insertions, 57 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 91079b22d2f..2626a56dfc6 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -21,10 +21,24 @@ namespace { using namespace NNodes; -std::array<TExprNode::TPtr, 2U> GetSchema(const TExprNode& settings) { - for (auto i = 0U; i < settings.ChildrenSize(); ++i) - if (settings.Child(i)->Head().IsAtom("userschema")) - return {settings.Child(i)->ChildPtr(1), settings.Child(i)->ChildrenSize() > 2 ? settings.Child(i)->TailPtr() : TExprNode::TPtr()}; +std::array<TExprNode::TPtr, 2U> ExtractSchema(TExprNode::TListType& settings) { + for (auto it = settings.cbegin(); settings.cend() != it; ++it) { + if (const auto item = *it; item->Head().IsAtom("userschema")) { + settings.erase(it); + return {item->ChildPtr(1), item->ChildrenSize() > 2 ? item->TailPtr() : TExprNode::TPtr()}; + } + } + + return {}; +} + +TExprNode::TPtr ExtractFormat(TExprNode::TListType& settings) { + for (auto it = settings.cbegin(); settings.cend() != it; ++it) { + if (const auto item = *it; item->Head().IsAtom("format")) { + settings.erase(it); + return item->TailPtr(); + } + } return {}; } @@ -193,22 +207,27 @@ public: if (auto reads = FindNodes(input, [&](const TExprNode::TPtr& node) { if (const auto maybeRead = TMaybeNode<TS3Read>(node)) { if (maybeRead.DataSource()) { - return maybeRead.Cast().Arg(2).Ref().IsCallable("MrObject"); + return maybeRead.Cast().Arg(2).Ref().IsCallable({"MrObject", "MrTableConcat"}); } } return false; }); !reads.empty()) { for (auto& r : reads) { const TS3Read read(std::move(r)); - const auto& object = read.Arg(2).Ref(); - const std::string_view& path = object.Head().Content(); - const auto& prefix = path.substr(0U, path.find_first_of("*?{")); + std::unordered_set<std::string_view> paths; + if (const auto& object = read.Arg(2).Ref(); 
object.IsCallable("MrObject")) + paths.emplace(object.Head().Content()); + else if (object.IsCallable("MrTableConcat")) + object.ForEachChild([&paths](const TExprNode& child){ paths.emplace(child.Head().Tail().Head().Content()); }); const auto& connect = State_->Configuration->Clusters.at(read.DataSource().Cluster().StringValue()); const auto& token = State_->Configuration->Tokens.at(read.DataSource().Cluster().StringValue()); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(State_->CredentialsFactory, token); const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); - std::get<TNodeSet>((*PendingBuckets_)[std::make_tuple(connect.Url, TString(prefix), authToken.empty() ? TString() : TString("X-YaCloud-SubjectToken:") += authToken)]).emplace(read.Raw()); + for (const auto& path : paths) { + const auto& prefix = path.substr(0U, path.find_first_of("*?{")); + std::get<TNodeSet>((*PendingBuckets_)[std::make_tuple(connect.Url, TString(prefix), authToken.empty() ? 
TString() : TString("X-YaCloud-SubjectToken:") += authToken)]).emplace(read.Raw()); + } } } @@ -272,67 +291,85 @@ public: const auto nodes = std::move(std::get<TNodeSet>(bucket.second)); for (const auto r : nodes) { const TS3Read read(r); + + std::vector<std::string_view> keys; const auto& object = read.Arg(2).Ref(); - const std::string_view& path = object.Head().Content(); + if (object.IsCallable("MrObject")) + keys.emplace_back(object.Head().Content()); + else if (object.IsCallable("MrTableConcat")) + object.ForEachChild([&keys](const TExprNode& child){ keys.emplace_back(child.Head().Tail().Head().Content()); }); + const auto& items = std::get<TItemsMap>(bucket.second); YQL_CLOG(INFO, ProviderS3) << "Discovered " << items.size() << " items in " << std::get<0U>(bucket.first) << std::get<1U>(bucket.first); TExprNode::TListType paths; - if (std::string_view::npos != path.find_first_of("?*{")) { - const RE2 re(re2::StringPiece(RegexFromWildcards(path)), RE2::Options()); - paths.reserve(items.size()); - auto total = 0ULL; - for (const auto& item : items) { - if (const re2::StringPiece piece(item.first); re.Match(piece, 0, item.first.size(), RE2::ANCHOR_BOTH, nullptr, 0)) { - if (item.second > State_->Configuration->FileSizeLimit) { - ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << item.first << " size " << item.second << " is too large, but limit is " << State_->Configuration->FileSizeLimit)); - return TStatus::Error; + for (const auto& path : keys) { + if (std::string_view::npos != path.find_first_of("?*{")) { + const RE2 re(re2::StringPiece(RegexFromWildcards(path)), RE2::Options()); + paths.reserve(items.size()); + auto total = 0ULL; + for (const auto& item : items) { + if (const re2::StringPiece piece(item.first); re.Match(piece, 0, item.first.size(), RE2::ANCHOR_BOTH, nullptr, 0)) { + if (item.second > State_->Configuration->FileSizeLimit) { + ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << 
item.first << " size " << item.second << " is too large, but limit is " << State_->Configuration->FileSizeLimit)); + return TStatus::Error; + } + + total += item.second; + ++count; + paths.emplace_back( + ctx.Builder(object.Pos()) + .List() + .Atom(0, item.first) + .Atom(1, ToString(item.second), TNodeFlags::Default) + .Seal() + .Build() + ); } - - total += item.second; - ++count; - paths.emplace_back( - ctx.Builder(object.Pos()) - .List() - .Atom(0, item.first) - .Atom(1, ToString(item.second), TNodeFlags::Default) - .Seal() - .Build() - ); } - } - readSize += total; + readSize += total; - if (paths.empty()) { - ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " has no items.")); + if (paths.empty()) { + ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " has no items.")); + return TStatus::Error; + } + YQL_CLOG(INFO, ProviderS3) << "Object " << path << " has " << paths.size() << " items with total size " << total; + } else if (const auto f = items.find(TString(path)); items.cend() == f) { + ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " doesn't exist.")); return TStatus::Error; + } else if (const auto size = f->second; size > State_->Configuration->FileSizeLimit) { + ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " size " << size << " is too large, but limit is " << State_->Configuration->FileSizeLimit)); + return TStatus::Error; + } else { + YQL_CLOG(INFO, ProviderS3) << "Object " << path << " size is " << size; + readSize += size; + ++count; + paths.emplace_back( + ctx.Builder(object.Pos()) + .List() + .Atom(0, path) + .Atom(1, ToString(size), TNodeFlags::Default) + .Seal() + .Build() + ); } - YQL_CLOG(INFO, ProviderS3) << "Object " << path << " has " << paths.size() << " items with total size " << total; - } else if (const auto f = items.find(TString(path)); items.cend() == f) { - 
ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " doesn't exist.")); - return TStatus::Error; - } else if (const auto size = f->second; size > State_->Configuration->FileSizeLimit) { - ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), TStringBuilder() << "Object " << path << " size " << size << " is too large, but limit is " << State_->Configuration->FileSizeLimit)); - return TStatus::Error; - } else { - YQL_CLOG(INFO, ProviderS3) << "Object " << path << " size is " << size; - readSize += size; - ++count; - paths.emplace_back( - ctx.Builder(object.Pos()) - .List() - .Add(0, object.HeadPtr()) - .Atom(1, ToString(size), TNodeFlags::Default) - .Seal() - .Build() - ); } - auto children = object.ChildrenList(); - children.front() = ctx.NewList(object.Pos(), std::move(paths)); - auto s3Object = ctx.NewCallable(object.Pos(), TS3Object::CallableName(), std::move(children)); - auto userSchema = GetSchema(*read.Ref().Child(4)); + auto settings = read.Ref().Child(4)->ChildrenList(); + auto userSchema = ExtractSchema(settings); + TExprNode::TPtr s3Object; + if (object.IsCallable("MrObject")) { + auto children = object.ChildrenList(); + children.front() = ctx.NewList(object.Pos(), std::move(paths)); + s3Object = ctx.NewCallable(object.Pos(), TS3Object::CallableName(), std::move(children)); + } else if (object.IsCallable("MrTableConcat")) { + s3Object = Build<TS3Object>(ctx, object.Pos()) + .Paths(ctx.NewList(object.Pos(), std::move(paths))) + .Format(ExtractFormat(settings)) + .Settings(ctx.NewList(object.Pos(), std::move(settings))) + .Done().Ptr(); + } replaces.emplace(r, userSchema.back() ? Build<TS3ReadObject>(ctx, read.Pos()) |