diff options
author | uzhas <uzhas@ydb.tech> | 2022-11-15 17:08:27 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-11-15 17:08:27 +0300 |
commit | b2a62d16c4f4d27d04e5764ebe669ceeb9b5557e (patch) | |
tree | 05ef66aef17ec3e6dc9f6e97162a3ac89a23dc6b | |
parent | 3d7181a331668643d91bb422a22c07d8f3a7c33e (diff) | |
download | ydb-b2a62d16c4f4d27d04e5764ebe669ceeb9b5557e.tar.gz |
support file filter
7 files changed, 135 insertions, 72 deletions
diff --git a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp index d67bf5cb2f..00cd0db996 100644 --- a/ydb/core/yq/libs/control_plane_storage/request_validators.cpp +++ b/ydb/core/yq/libs/control_plane_storage/request_validators.cpp @@ -106,22 +106,29 @@ NYql::TIssues ValidateConnectionSetting(const YandexQuery::ConnectionSetting& se } NYql::TIssues ValidateFormatSetting(const TString& format, const google::protobuf::Map<TString, TString>& formatSetting) { - NYql::TIssues issues; + NYql::TIssues issues; for (const auto& [key, value]: formatSetting) { - if (key == "data.interval.unit") { + if (key == "file_pattern"sv) { + continue; + } + + if (key == "data.interval.unit"sv) { if (!IsValidIntervalUnit(value)) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown value for data.interval.unit " + value)); } - } else if (key == "csv_delimiter") { - if (format != "csv_with_names") { + continue; + } + if (key == "csv_delimiter"sv) { + if (format != "csv_with_names"sv) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should be used only with format csv_with_names")); } if (value.size() != 1) { issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "csv_delimiter should contain only one character")); } - } else { - issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key)); + continue; } + + issues.AddIssue(MakeErrorIssue(TIssuesIds::BAD_REQUEST, "unknown format setting " + key)); } return issues; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index 19b1c3441a..6626328e13 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -164,10 +164,15 @@ private: return true; } + if (name == "filepattern") { + // just skip, used in reading only + return true; + } + return true; }; - if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter"}, validator, ctx)) { + if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), {"compression", "partitionedby", "mode", "userschema", "csvdelimiter", "filepattern"}, validator, ctx)) { return TStatus::Error; } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index ddbced2339..27ce85f423 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -371,6 +371,14 @@ public: return true; } + if (name == "filepattern"sv) { + TStringBuf unused; + if (!ExtractSettingValue(setting.Tail(), "file_pattern"sv, format, {}, ctx, unused)) { + return false; + } + return true; + } + YQL_ENSURE(name == "projection"sv); haveProjection = true; if (!EnsureAtom(setting.Tail(), ctx)) { @@ -386,7 +394,7 @@ public: }; if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), { "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv, - "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv }, validator, ctx)) + "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 396f55417c..7cee2e1550 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -31,12 +31,31 @@ std::array<TExprNode::TPtr, 2U> ExtractSchema(TExprNode::TListType& settings) { return {}; } +bool FindFilePattern(const TExprNode& settings, TExprContext& ctx, TString& filePattern) { + auto filePatternSetting = GetSetting(settings, "filepattern"); + if (!filePatternSetting) { + // it is ok if settings is not set + return true; + } + + if (!EnsureTupleSize(*filePatternSetting, 2, ctx)) { + return false; + } + + if (!EnsureAtom(filePatternSetting->Tail(), ctx)) { + return false; + } + + filePattern = filePatternSetting->Tail().Content(); + return true; +} + using namespace NPathGenerator; struct TListRequest { TString Token; TString Url; - TString Pattern; + TString Pattern; // can contain capturing groups TMaybe<TString> PathPrefix; // set iff Pattern is regex (not glob pattern) TVector<IPathGenerator::TColumnWithValue> ColumnValues; }; @@ -303,8 +322,7 @@ private: extraValues->push_back(std::move(value)); } } else { - // last entry matches file name - YQL_ENSURE(entry.MatchedGlobs.size() == generatedColumnsConfig->Columns.size() + 1); + YQL_ENSURE(entry.MatchedGlobs.size() == generatedColumnsConfig->Columns.size()); for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) { TExtraColumnValue value; value.Name = generatedColumnsConfig->Columns[i]; @@ -461,8 +479,6 @@ private: } bool LaunchListsForNode(const TDqSourceWrap& source, TVector<NThreading::TFuture<IS3Lister::TListResult>>& futures, TExprContext& ctx) { - Y_UNUSED(ctx); - TS3DataSource dataSource = source.DataSource().Maybe<TS3DataSource>().Cast(); const auto& connect = State_->Configuration->Clusters.at(dataSource.Cluster().StringValue()); const auto& token = State_->Configuration->Tokens.at(dataSource.Cluster().StringValue()); @@ -471,15 +487,25 @@ private: const TString url = connect.Url; const TString tokenStr = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); - for (auto path : source.Input().Maybe<TS3ParseSettingsBase>().Cast().Paths()) { + auto s3ParseSettingsBase = source.Input().Maybe<TS3ParseSettingsBase>().Cast(); + TString filePattern; + if (s3ParseSettingsBase.Ref().ChildrenSize() > TS3ParseSettingsBase::idx_Settings) { + const auto& settings = *s3ParseSettingsBase.Ref().Child(TS3ParseSettingsBase::idx_Settings); + if (!FindFilePattern(settings, ctx, filePattern)) { + return false; + } + } + const TString effectiveFilePattern = filePattern ? filePattern : "*"; + + for (auto path : s3ParseSettingsBase.Paths()) { NS3Details::TPathList directories; NS3Details::UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), directories); TListRequest req; req.Token = tokenStr; req.Url = url; - for (auto directory : directories) { - req.Pattern = NS3::NormalizePath(TStringBuilder() << std::get<0>(directory) << "/*"); + for (const auto& directory : directories) { + req.Pattern = NS3::NormalizePath(TStringBuilder() << std::get<0>(directory) << "/" << effectiveFilePattern); RequestsByNode_[source.Raw()].push_back(req); if (PendingRequests_.find(req) == PendingRequests_.end()) { auto future = Lister_->List(req.Token, req.Url, req.Pattern); @@ -550,6 +576,12 @@ private: projectionPos = projectionSetting->Tail().Pos(); } + TString filePattern; + if (!FindFilePattern(settings, ctx, filePattern)) { + return false; + } + const TString effectiveFilePattern = filePattern ? filePattern : "*"; + TVector<TString> paths; const auto& object = read.Arg(2).Ref(); YQL_ENSURE(object.IsCallable("MrTableConcat")); @@ -589,9 +621,14 @@ private: return false; } if (path.EndsWith("/")) { - req.Pattern = path + "*"; + req.Pattern = path + effectiveFilePattern; } else { // treat paths as regular wildcard patterns + if (filePattern) { + ctx.AddError(TIssue(ctx.GetPosition(read.Pos()), TStringBuilder() << "Path pattern cannot be used with file_pattern")); + return false; + } + req.Pattern = path; } req.Pattern = NS3::NormalizePath(req.Pattern); @@ -623,7 +660,7 @@ private: } generated << NS3::EscapeRegex(col) << "=(.*?)"; } - generated << "/(.*)"; + generated << '/' << NS3::RegexFromWildcards(effectiveFilePattern); req.Pattern = generated; reqs.push_back(req); } else { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp index fc9786554a..d9c166b43d 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp @@ -22,56 +22,6 @@ namespace NYql { namespace { -TString RegexFromWildcards(const std::string_view& pattern) { - const auto& escaped = NS3::EscapeRegex(ToString(pattern)); - TStringBuilder result; - result << "(?s)"; - bool slash = false; - bool group = false; - - for (const char& c : escaped) { - switch (c) { - case '{': - result << '('; - group = true; - slash = false; - break; - case '}': - result << ')'; - group = false; - slash = false; - break; - case ',': - if (group) - result << '|'; - else - result << "\\,"; - slash = false; - break; - case '\\': - if (slash) - result << "\\\\"; - slash = !slash; - break; - case '*': - result << "(.*)"; - slash = false; - break; - case '?': - result << "(.)"; - slash = false; - break; - default: - if (slash) - result << '\\'; - result << c; - slash = false; - break; - } - } - return result; -} - using namespace NThreading; class TS3Lister : public IS3Lister { @@ -95,10 +45,9 @@ private: }; } - const auto regex = isRegex ? pattern : RegexFromWildcards(pattern); + const auto regex = isRegex ? pattern : NS3::RegexFromWildcards(pattern); auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options()); YQL_ENSURE(re->ok()); - YQL_ENSURE(re->NumberOfCapturingGroups() > 0); const size_t numGroups = re->NumberOfCapturingGroups(); YQL_CLOG(INFO, ProviderS3) << "Got prefix: '" << prefix << "', regex: '" << regex @@ -108,7 +57,7 @@ private: auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups); auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups); - for (size_t i = 0; i < size_t(numGroups); ++i) { + for (size_t i = 0; i < numGroups; ++i) { (*reArgs)[i] = &(*groups)[i]; (*reArgsPtr)[i] = &(*reArgs)[i]; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp index 600c50a3c7..e746aeedde 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp @@ -34,8 +34,62 @@ size_t GetFirstWildcardPos(const TString& path) { return path.find_first_of("*?{"); } -TString EscapeRegex(const TString& str) { +TString EscapeRegex(const std::string_view& str) { return RE2::QuoteMeta(re2::StringPiece(str)); + +} + +TString EscapeRegex(const TString& str) { + return EscapeRegex(static_cast<std::string_view>(str)); +} + +TString RegexFromWildcards(const std::string_view& pattern) { + const auto& escaped = EscapeRegex(pattern); + TStringBuilder result; + bool slash = false; + bool group = false; + + for (const char& c : escaped) { + switch (c) { + case '{': + result << "(?:"; + group = true; + slash = false; + break; + case '}': + result << ')'; + group = false; + slash = false; + break; + case ',': + if (group) + result << '|'; + else + result << "\\,"; + slash = false; + break; + case '\\': + if (slash) + result << "\\\\"; + slash = !slash; + break; + case '*': + result << ".*"; + slash = false; + break; + case '?': + result << "."; + slash = false; + break; + default: + if (slash) + result << '\\'; + result << c; + slash = false; + break; + } + } + return result; } } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.h b/ydb/library/yql/providers/s3/provider/yql_s3_path.h index 8fb1e86606..c39f476f88 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_path.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_path.h @@ -16,5 +16,8 @@ inline bool HasWildcards(const TString& path) { /// quotes regex meta characters TString EscapeRegex(const TString& str); +TString EscapeRegex(const std::string_view& str); + +TString RegexFromWildcards(const std::string_view& pattern); } |