aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoruzhas <uzhas@ydb.tech>2022-11-15 17:08:27 +0300
committeruzhas <uzhas@ydb.tech>2022-11-15 17:08:27 +0300
commitb2a62d16c4f4d27d04e5764ebe669ceeb9b5557e (patch)
tree05ef66aef17ec3e6dc9f6e97162a3ac89a23dc6b
parent3d7181a331668643d91bb422a22c07d8f3a7c33e (diff)
downloadydb-b2a62d16c4f4d27d04e5764ebe669ceeb9b5557e.tar.gz
support file filter
-rw-r--r--ydb/core/yq/libs/control_plane_storage/request_validators.cpp19
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp7
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp10
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp57
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_list.cpp55
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_path.cpp56
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_path.h3
7 files changed, 135 insertions, 72 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
index d67bf5cb2f..00cd0db996 100644
--- a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp
@@ -106,22 +106,29 @@ NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& se
}
NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) {
- NYql::TIssues issues;
+ NYql::TIssues issues;
for (const auto& [key, value]: formatSetting) {
- if (key == "data.interval.unit") {
+ if (key == "file_pattern"sv) {
+ continue;
+ }
+
+ if (key == "data.interval.unit"sv) {
if (!IsValidIntervalUnit(value)) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.interval.unit " + value));
}
- } else if (key == "csv_delimiter") {
- if (format != "csv_with_names") {
+ continue;
+ }
+ if (key == "csv_delimiter"sv) {
+ if (format != "csv_with_names"sv) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names"));
}
if (value.size() != 1) {
issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should contain only one character"));
}
- } else {
- issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key));
+ continue;
}
+
+ issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key));
}
return issues;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index 19b1c3441a..6626328e13 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -164,10 +164,15 @@ private:
return true;
}
+ if (name == "filepattern") {
+ // just skip, used in reading only
+ return true;
+ }
+
return true;
};
- if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter"}, validator, ctx)) {
+ if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter", "filepattern"}, validator, ctx)) {
return TStatus::Error;
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index ddbced2339..27ce85f423 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -371,6 +371,14 @@ public:
return true;
}
+ if (name == "filepattern"sv) {
+ TStringBuf unused;
+ if (!ExtractSettingValue(setting.Tail(), "file_pattern"sv, format, {}, ctx, unused)) {
+ return false;
+ }
+ return true;
+ }
+
YQL_ENSURE(name == "projection"sv);
haveProjection = true;
if (!EnsureAtom(setting.Tail(), ctx)) {
@@ -386,7 +394,7 @@ public:
};
if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
{ "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv,
- "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv }, validator, ctx))
+ "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx))
{
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 396f55417c..7cee2e1550 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -31,12 +31,31 @@ std::array<TExprNode::TPtr, 2U> ExtractSchema(TExprNode::TListType& settings) {
return {};
}
+bool FindFilePattern(const TExprNode& settings, TExprContext& ctx, TString& filePattern) {
+ auto filePatternSetting = GetSetting(settings, "filepattern");
+ if (!filePatternSetting) {
+ // it is ok if settings is not set
+ return true;
+ }
+
+ if (!EnsureTupleSize(*filePatternSetting, 2, ctx)) {
+ return false;
+ }
+
+ if (!EnsureAtom(filePatternSetting->Tail(), ctx)) {
+ return false;
+ }
+
+ filePattern = filePatternSetting->Tail().Content();
+ return true;
+}
+
using namespace NPathGenerator;
struct TListRequest {
TString Token;
TString Url;
- TString Pattern;
+ TString Pattern; // can contain capturing groups
TMaybe<TString> PathPrefix; // set iff Pattern is regex (not glob pattern)
TVector<IPathGenerator::TColumnWithValue> ColumnValues;
};
@@ -303,8 +322,7 @@ private:
extraValues->push_back(std::move(value));
}
} else {
- // last entry matches file name
- YQL_ENSURE(entry.MatchedGlobs.size() == generatedColumnsConfig->Columns.size() + 1);
+ YQL_ENSURE(entry.MatchedGlobs.size() == generatedColumnsConfig->Columns.size());
for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) {
TExtraColumnValue value;
value.Name = generatedColumnsConfig->Columns[i];
@@ -461,8 +479,6 @@ private:
}
bool LaunchListsForNode(const TDqSourceWrap& source, TVector<NThreading::TFuture<IS3Lister::TListResult>>& futures, TExprContext& ctx) {
- Y_UNUSED(ctx);
-
TS3DataSource dataSource = source.DataSource().Maybe<TS3DataSource>().Cast();
const auto& connect = State_->Configuration->Clusters.at(dataSource.Cluster().StringValue());
const auto& token = State_->Configuration->Tokens.at(dataSource.Cluster().StringValue());
@@ -471,15 +487,25 @@ private:
const TString url = connect.Url;
const TString tokenStr = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
- for (auto path : source.Input().Maybe<TS3ParseSettingsBase>().Cast().Paths()) {
+ auto s3ParseSettingsBase = source.Input().Maybe<TS3ParseSettingsBase>().Cast();
+ TString filePattern;
+ if (s3ParseSettingsBase.Ref().ChildrenSize() > TS3ParseSettingsBase::idx_Settings) {
+ const auto& settings = *s3ParseSettingsBase.Ref().Child(TS3ParseSettingsBase::idx_Settings);
+ if (!FindFilePattern(settings, ctx, filePattern)) {
+ return false;
+ }
+ }
+ const TString effectiveFilePattern = filePattern ? filePattern : "*";
+
+ for (auto path : s3ParseSettingsBase.Paths()) {
NS3Details::TPathList directories;
NS3Details::UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), directories);
TListRequest req;
req.Token = tokenStr;
req.Url = url;
- for (auto directory : directories) {
- req.Pattern = NS3::NormalizePath(TStringBuilder() << std::get<0>(directory) << "/*");
+ for (const auto& directory : directories) {
+ req.Pattern = NS3::NormalizePath(TStringBuilder() << std::get<0>(directory) << "/" << effectiveFilePattern);
RequestsByNode_[source.Raw()].push_back(req);
if (PendingRequests_.find(req) == PendingRequests_.end()) {
auto future = Lister_->List(req.Token, req.Url, req.Pattern);
@@ -550,6 +576,12 @@ private:
projectionPos = projectionSetting->Tail().Pos();
}
+ TString filePattern;
+ if (!FindFilePattern(settings, ctx, filePattern)) {
+ return false;
+ }
+ const TString effectiveFilePattern = filePattern ? filePattern : "*";
+
TVector<TString> paths;
const auto& object = read.Arg(2).Ref();
YQL_ENSURE(object.IsCallable("MrTableConcat"));
@@ -589,9 +621,14 @@ private:
return false;
}
if (path.EndsWith("/")) {
- req.Pattern = path + "*";
+ req.Pattern = path + effectiveFilePattern;
} else {
// treat paths as regular wildcard patterns
+ if (filePattern) {
+ ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "Path pattern cannot be used with file_pattern"));
+ return false;
+ }
+
req.Pattern = path;
}
req.Pattern = NS3::NormalizePath(req.Pattern);
@@ -623,7 +660,7 @@ private:
}
generated << NS3::EscapeRegex(col) << "=(.*?)";
}
- generated << "/(.*)";
+ generated << '/' << NS3::RegexFromWildcards(effectiveFilePattern);
req.Pattern = generated;
reqs.push_back(req);
} else {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
index fc9786554a..d9c166b43d 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
@@ -22,56 +22,6 @@ namespace NYql {
namespace {
-TString RegexFromWildcards(const std::string_view& pattern) {
- const auto& escaped = NS3::EscapeRegex(ToString(pattern));
- TStringBuilder result;
- result << "(?s)";
- bool slash = false;
- bool group = false;
-
- for (const char& c : escaped) {
- switch (c) {
- case '{':
- result << '(';
- group = true;
- slash = false;
- break;
- case '}':
- result << ')';
- group = false;
- slash = false;
- break;
- case ',':
- if (group)
- result << '|';
- else
- result << "\\,";
- slash = false;
- break;
- case '\\':
- if (slash)
- result << "\\\\";
- slash = !slash;
- break;
- case '*':
- result << "(.*)";
- slash = false;
- break;
- case '?':
- result << "(.)";
- slash = false;
- break;
- default:
- if (slash)
- result << '\\';
- result << c;
- slash = false;
- break;
- }
- }
- return result;
-}
-
using namespace NThreading;
class TS3Lister : public IS3Lister {
@@ -95,10 +45,9 @@ private:
};
}
- const auto regex = isRegex ? pattern : RegexFromWildcards(pattern);
+ const auto regex = isRegex ? pattern : NS3::RegexFromWildcards(pattern);
auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
YQL_ENSURE(re->ok());
- YQL_ENSURE(re->NumberOfCapturingGroups() > 0);
const size_t numGroups = re->NumberOfCapturingGroups();
YQL_CLOG(INFO, ProviderS3) << "Got prefix: '" << prefix << "', regex: '" << regex
@@ -108,7 +57,7 @@ private:
auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups);
- for (size_t i = 0; i < size_t(numGroups); ++i) {
+ for (size_t i = 0; i < numGroups; ++i) {
(*reArgs)[i] = &(*groups)[i];
(*reArgsPtr)[i] = &(*reArgs)[i];
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
index 600c50a3c7..e746aeedde 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
@@ -34,8 +34,62 @@ size_t GetFirstWildcardPos(const TString& path) {
return path.find_first_of("*?{");
}
-TString EscapeRegex(const TString& str) {
+TString EscapeRegex(const std::string_view& str) {
return RE2::QuoteMeta(re2::StringPiece(str));
+
+}
+
+TString EscapeRegex(const TString& str) {
+ return EscapeRegex(static_cast<std::string_view>(str));
+}
+
+TString RegexFromWildcards(const std::string_view& pattern) {
+ const auto& escaped = EscapeRegex(pattern);
+ TStringBuilder result;
+ bool slash = false;
+ bool group = false;
+
+ for (const char& c : escaped) {
+ switch (c) {
+ case '{':
+ result << "(?:";
+ group = true;
+ slash = false;
+ break;
+ case '}':
+ result << ')';
+ group = false;
+ slash = false;
+ break;
+ case ',':
+ if (group)
+ result << '|';
+ else
+ result << "\\,";
+ slash = false;
+ break;
+ case '\\':
+ if (slash)
+ result << "\\\\";
+ slash = !slash;
+ break;
+ case '*':
+ result << ".*";
+ slash = false;
+ break;
+ case '?':
+ result << ".";
+ slash = false;
+ break;
+ default:
+ if (slash)
+ result << '\\';
+ result << c;
+ slash = false;
+ break;
+ }
+ }
+ return result;
}
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.h b/ydb/library/yql/providers/s3/provider/yql_s3_path.h
index 8fb1e86606..c39f476f88 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_path.h
@@ -16,5 +16,8 @@ inline bool HasWildcards(const TString& path) {
/// quotes regex meta characters
TString EscapeRegex(const TString& str);
+TString EscapeRegex(const std::string_view& str);
+
+TString RegexFromWildcards(const std::string_view& pattern);
}