about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aneporada <aneporada@ydb.tech> 2022-09-01 09:26:52 +0300
committer aneporada <aneporada@ydb.tech> 2022-09-01 09:26:52 +0300
commit 7d5476fba99311601d2d2333696c9be1496b5553 (patch)
tree d97ee760ff603edacae5cfdff7cb22bc7ef8e1ca
parent e83b936566d59e48df0aa4c414beaf0c4519c772 (diff)
download ydb-7d5476fba99311601d2d2333696c9be1496b5553.tar.gz
[] Do not use greedy regex pattern when parsing Hive paths
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 2
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp 36
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_list.cpp 95
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_list.h 6
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_path.cpp 14
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_path.h 15
6 files changed, 106 insertions, 62 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 01f6f396e3..6953715653 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -73,7 +73,7 @@ private:
return TStatus::Error;
}
- TString normalized = NormalizeS3Path(ToString(path));
+ TString normalized = NS3::NormalizePath(ToString(path));
if (normalized == "/") {
ctx.AddError(TIssue(ctx.GetPosition(input->Child(TS3Target::idx_Path)->Pos()), "Unable to write to root directory"));
return TStatus::Error;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 78d10d034b..8c16b913c1 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -36,6 +36,7 @@ struct TListRequest {
TString Token;
TString Url;
TString Pattern;
+ TMaybe<TString> PathPrefix; // set iff Pattern is regex (not glob pattern)
TVector<IPathGenerator::TColumnWithValue> ColumnValues;
};
@@ -147,7 +148,7 @@ public:
}
const auto& listEntries = std::get<IS3Lister::TListEntries>(listResult);
- if (listEntries.empty() && !IS3Lister::HasWildcards(req.Pattern)) {
+ if (listEntries.empty() && !NS3::HasWildcards(req.Pattern)) {
// request to list particular files that are missing
ctx.AddError(TIssue(ctx.GetPosition(object.Pos()),
TStringBuilder() << "Object " << req.Pattern << " doesn't exist."));
@@ -417,28 +418,43 @@ private:
// treat paths as regular wildcard patterns
req.Pattern = path;
}
- req.Pattern = NormalizeS3Path(req.Pattern);
+ req.Pattern = NS3::NormalizePath(req.Pattern);
reqs.push_back(req);
} else {
- if (IS3Lister::HasWildcards(path)) {
+ if (NS3::HasWildcards(path)) {
ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "Path prefix: '" << path << "' contains wildcards"));
return false;
}
if (!config.Generator) {
// Hive-style partitioning
+ req.PathPrefix = path;
+ if (!path.empty()) {
+ req.PathPrefix = NS3::NormalizePath(TStringBuilder() << path << "/");
+ if (req.PathPrefix == "/") {
+ req.PathPrefix = "";
+ }
+ }
+ TString pp = *req.PathPrefix;
+ if (!pp.empty() && pp.back() == '/') {
+ pp.pop_back();
+ }
+
TStringBuilder generated;
- generated << path;
+ generated << NS3::EscapeRegex(pp);
for (auto& col : config.Columns) {
- generated << "/" << col << "=*";
+ if (!generated.empty()) {
+ generated << "/";
+ }
+ generated << NS3::EscapeRegex(col) << "=(.*?)";
}
- generated << "/*";
- req.Pattern = NormalizeS3Path(generated);
+ generated << "/(.*)";
+ req.Pattern = generated;
reqs.push_back(req);
} else {
for (auto& rule : config.Generator->GetRules()) {
YQL_ENSURE(rule.ColumnValues.size() == config.Columns.size());
req.ColumnValues.assign(rule.ColumnValues.begin(), rule.ColumnValues.end());
- req.Pattern = NormalizeS3Path(TStringBuilder() << path << "/" << rule.Path << "/*");
+ req.Pattern = NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path << "/*");
reqs.push_back(req);
}
}
@@ -447,7 +463,9 @@ private:
for (auto& req : reqs) {
RequestsByNode_[read.Raw()].push_back(req);
if (PendingRequests_.find(req) == PendingRequests_.end()) {
- auto future = Lister_->List(req.Token, req.Url, req.Pattern);
+ auto future = req.PathPrefix.Defined() ?
+ Lister_->ListRegex(req.Token, req.Url, req.Pattern, *req.PathPrefix) :
+ Lister_->List(req.Token, req.Url, req.Pattern);
PendingRequests_[req] = future;
futures.push_back(std::move(future));
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
index 3637d38fc9..87cb666a5b 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
@@ -1,4 +1,5 @@
#include "yql_s3_list.h"
+#include "yql_s3_path.h"
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/library/yql/utils/url_builder.h>
@@ -23,12 +24,8 @@ ERetryErrorClass RetryS3SlowDown(long httpResponseCode) {
return httpResponseCode == 503 ? ERetryErrorClass::LongRetry : ERetryErrorClass::NoRetry; // S3 Slow Down == 503
}
-size_t GetFirstWildcardPos(const TString& pattern) {
- return pattern.find_first_of("*?{");
-}
-
TString RegexFromWildcards(const std::string_view& pattern) {
- const auto& escaped = RE2::QuoteMeta(re2::StringPiece(pattern));
+ const auto& escaped = NS3::EscapeRegex(ToString(pattern));
TStringBuilder result;
result << "(?s)";
bool slash = false;
@@ -88,44 +85,45 @@ public:
private:
using TResultFilter = std::function<bool (const TString& path, TVector<TString>& matchedGlobs)>;
- static TResultFilter MakeFilter(const TString& pattern, TString& prefix) {
- prefix.clear();
- if (auto pos = GetFirstWildcardPos(pattern); pos != TString::npos) {
- prefix = pattern.substr(0, pos);
- const auto regex = RegexFromWildcards(pattern);
- auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
- YQL_ENSURE(re->ok());
- YQL_ENSURE(re->NumberOfCapturingGroups() > 0);
-
- const size_t numGroups = re->NumberOfCapturingGroups();
- YQL_CLOG(INFO, ProviderS3) << "Got prefix: '" << prefix << "', regex: '" << regex
- << "' with " << numGroups << " capture groups from original pattern '" << pattern << "'";
-
- auto groups = std::make_shared<std::vector<std::string>>(numGroups);
- auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
- auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups);
-
- for (size_t i = 0; i < size_t(numGroups); ++i) {
- (*reArgs)[i] = &(*groups)[i];
- (*reArgsPtr)[i] = &(*reArgs)[i];
- }
-
- return [groups, reArgs, reArgsPtr, re](const TString& path, TVector<TString>& matchedGlobs) {
+ static TResultFilter MakeFilter(const TString& pattern, const TMaybe<TString>& regexPatternPrefix, TString& prefix) {
+ const bool isRegex = regexPatternPrefix.Defined();
+ prefix = isRegex ? *regexPatternPrefix : pattern.substr(0, NS3::GetFirstWildcardPos(pattern));
+ if (!isRegex && prefix == pattern) {
+ // just match for equality
+ return [pattern](const TString& path, TVector<TString>& matchedGlobs) {
matchedGlobs.clear();
- bool matched = re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size());
- if (matched) {
- matchedGlobs.reserve(groups->size());
- for (auto& group : *groups) {
- matchedGlobs.push_back(ToString(group));
- }
- }
- return matched;
+ return path == pattern;
};
}
- prefix = pattern;
- return [pattern](const TString& path, TVector<TString>& matchedGlobs) {
+
+ const auto regex = isRegex ? pattern : RegexFromWildcards(pattern);
+ auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
+ YQL_ENSURE(re->ok());
+ YQL_ENSURE(re->NumberOfCapturingGroups() > 0);
+
+ const size_t numGroups = re->NumberOfCapturingGroups();
+ YQL_CLOG(INFO, ProviderS3) << "Got prefix: '" << prefix << "', regex: '" << regex
+ << "' with " << numGroups << " capture groups from original pattern '" << pattern << "'";
+
+ auto groups = std::make_shared<std::vector<std::string>>(numGroups);
+ auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
+ auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups);
+
+ for (size_t i = 0; i < size_t(numGroups); ++i) {
+ (*reArgs)[i] = &(*groups)[i];
+ (*reArgsPtr)[i] = &(*reArgs)[i];
+ }
+
+ return [groups, reArgs, reArgsPtr, re](const TString& path, TVector<TString>& matchedGlobs) {
matchedGlobs.clear();
- return path == pattern;
+ bool matched = re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size());
+ if (matched) {
+ matchedGlobs.reserve(groups->size());
+ for (auto& group : *groups) {
+ matchedGlobs.push_back(ToString(group));
+ }
+ }
+ return matched;
};
}
@@ -233,10 +231,9 @@ private:
}
- TFuture<TListResult> List(const TString& token, const TString& urlStr, const TString& pattern) override {
+ TFuture<TListResult> DoList(const TString& token, const TString& urlStr, const TString& pattern, const TMaybe<TString>& pathPrefix) {
TString prefix;
- TResultFilter filter = MakeFilter(pattern, prefix);
- YQL_CLOG(INFO, ProviderS3) << "Enumerate items in " << urlStr << pattern;
+ TResultFilter filter = MakeFilter(pattern, pathPrefix, prefix);
const auto retryPolicy = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryS3SlowDown);
TUrlBuilder urlBuilder(urlStr);
@@ -273,6 +270,16 @@ private:
return future;
}
+ NThreading::TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override {
+ YQL_CLOG(INFO, ProviderS3) << "Enumerating items using glob pattern " << url << pattern;
+ return DoList(token, url, pattern, {});
+ }
+
+ NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override {
+ YQL_CLOG(INFO, ProviderS3) << "Enumerating items using RE2 pattern " << url << pattern;
+ return DoList(token, url, pattern, pathPrefix);
+ }
+
const IHTTPGateway::TPtr Gateway;
const ui64 MaxFilesPerQuery;
};
@@ -280,10 +287,6 @@ private:
}
-bool IS3Lister::HasWildcards(const TString& pattern) {
- return GetFirstWildcardPos(pattern) != TString::npos;
-}
-
IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery) {
return IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery));
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.h b/ydb/library/yql/providers/s3/provider/yql_s3_list.h
index 577c4e507b..1a4e4798d0 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.h
@@ -31,9 +31,11 @@ public:
// {variant1, variant2} - list of alternatives
virtual NThreading::TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) = 0;
- static TPtr Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery = 0);
+ // pattern should be valid RE2 regex
+ // pathPrefix is a "constant" path prefix
+ virtual NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) = 0;
- static bool HasWildcards(const TString& pattern);
+ static TPtr Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery = 0);
};
}
\ No newline at end of file
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
index aafd62d3c0..2fd6b89f3f 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
@@ -2,9 +2,11 @@
#include <ydb/library/yql/utils/yql_panic.h>
-namespace NYql {
+#include <contrib/libs/re2/re2/re2.h>
-TString NormalizeS3Path(const TString& path, char slash) {
+namespace NYql::NS3 {
+
+TString NormalizePath(const TString& path, char slash) {
YQL_ENSURE(!path.empty());
TString result;
bool start = true;
@@ -26,4 +28,12 @@ TString NormalizeS3Path(const TString& path, char slash) {
return result;
}
+size_t GetFirstWildcardPos(const TString& path) {
+ return path.find_first_of("*?{");
+}
+
+TString EscapeRegex(const TString& str) {
+ return RE2::QuoteMeta(re2::StringPiece(str));
+}
+
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.h b/ydb/library/yql/providers/s3/provider/yql_s3_path.h
index 119903108f..8fb1e86606 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_path.h
@@ -2,8 +2,19 @@
#include <util/generic/string.h>
-namespace NYql {
+namespace NYql::NS3 {
-TString NormalizeS3Path(const TString& path, char slash = '/');
+/// remove duplicate slashes
+TString NormalizePath(const TString& path, char slash = '/');
+
+/// supported wildcards are: *, ?, {alt1, alt2, ...}
+size_t GetFirstWildcardPos(const TString& path);
+
+inline bool HasWildcards(const TString& path) {
+ return GetFirstWildcardPos(path) != TString::npos;
+}
+
+/// quotes regex meta characters
+TString EscapeRegex(const TString& str);
}