diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-06-30 20:38:14 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-06-30 20:38:14 +0300 |
commit | fa133227cb17a256e3e20c774312e12b779761ee (patch) | |
tree | 42f67b5b6a7f7ab51a7c635fb863434111098074 | |
parent | 2364d4de6d0bb2b84c98308b7ba0693d841bf83a (diff) | |
download | ydb-fa133227cb17a256e3e20c774312e12b779761ee.tar.gz |
IL: no more strategy to test
6 files changed, 280 insertions, 235 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index c3f8d44331c..dcda43cca01 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -415,8 +415,8 @@ public: bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) { LOG_T("TS3FileQueueActor", "SaveRetrievedResults"); - if (std::holds_alternative<TIssues>(listingResult)) { - MaybeIssues = std::get<TIssues>(listingResult); + if (std::holds_alternative<NS3Lister::TListError>(listingResult)) { + MaybeIssues = std::get<NS3Lister::TListError>(listingResult).Issues; return false; } diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index 161b3c1d7b7..620d3b509e6 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -408,7 +408,7 @@ private: YQL_CLOG(INFO, ProviderS3) << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix << ": got error from http gateway: " << issues.ToString(true); - ctx.Promise.SetValue(std::move(issues)); + ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)}); ctx.NextRequestPromise.SetValue(Nothing()); } } catch (const std::exception& ex) { diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index ecd1449c1a6..e4971dedcc0 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -137,7 +137,15 @@ public: std::vector<TDirectoryListEntry> Directories; }; -using TListResult = std::variant<TListEntries, TIssues>; +enum class EListError { + GENERAL, + LIMIT_EXCEEDED +}; +struct TListError { + EListError Type; + TIssues Issues; +}; +using TListResult = std::variant<TListEntries, TListError>; struct TListingRequest { TString Url; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 418ff83e70f..9feecb67225 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -184,13 +184,13 @@ private: const auto& listResult = it->second.GetValue(); if (listResult.index() == 1) { - const auto& issues = std::get<TIssues>(listResult); + const auto& error = std::get<NS3Lister::TListError>(listResult); YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern - << " error " << issues.ToString(); + << " error " << error.Issues.ToString(); std::for_each( - issues.begin(), - issues.end(), + error.Issues.begin(), + error.Issues.end(), std::bind( &TExprContext::AddError, std::ref(ctx), std::placeholders::_1)); return TStatus::Error; @@ -326,9 +326,15 @@ private: const NS3Lister::TListResult& listResult = it->second.GetValue(); if (listResult.index() == 1) { - const auto& issues = std::get<TIssues>(listResult); - YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern << " error " << issues.ToString(); - std::for_each(issues.begin(), issues.end(), std::bind(&TExprContext::AddError, std::ref(ctx), std::placeholders::_1)); + const auto& error = std::get<NS3Lister::TListError>(listResult); + YQL_CLOG(INFO, ProviderS3) + << "Discovery " << req.S3Request.Url << req.S3Request.Pattern + << " error " << error.Issues.ToString(); + std::for_each( + error.Issues.begin(), + error.Issues.end(), + std::bind( + &TExprContext::AddError, std::ref(ctx), std::placeholders::_1)); return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp index 2ff15a035a8..3351d3ba8fb 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -1,5 +1,6 @@ #include "yql_s3_listing_strategy.h" +#include <ydb/library/yql/core/issue/protos/issue_id.pb.h> #include <ydb/library/yql/providers/s3/common/util.h> #include <ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h> #include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h> @@ -26,32 +27,37 @@ IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& option namespace { using namespace NThreading; +using namespace NS3Lister; + +TListError MakeLimitExceededError( + const TString& componentName, ui64 limit, ui64 actual) { + auto issue = TIssue( + TStringBuilder{} << '[' << componentName << "] Limit exceeded. Limit: " << limit + << " Actual: " << actual); + return TListError{EListError::LIMIT_EXCEEDED, TIssues{std::move(issue)}}; +} -TIssue MakeLimitExceededIssue() { - auto issue = TIssue("Limit exceeded"); - issue.SetCode(0U, ESeverity::TSeverityIds_ESeverityId_S_WARNING); - return issue; +TListError MakeGenericError(const TString& description) { + auto issue = TIssue(description); + return TListError{EListError::GENERAL, TIssues{std::move(issue)}}; } -bool IsRecoverableIssue(const TIssues& issues) { - if (issues.Size() != 1) { - return false; - } - return issues.begin()->GetSeverity() >= ESeverity::TSeverityIds_ESeverityId_S_WARNING; +bool IsRecoverableError(const TListError& error) { + return error.Type == EListError::LIMIT_EXCEEDED; } class TCollectingS3ListingStrategy : public IS3ListingStrategy { public: - using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>( - const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options)>; + using TListerFactoryMethod = std::function<TFuture<IS3Lister::TPtr>( + const TListingRequest& listingRequest, TS3ListingOptions options)>; TCollectingS3ListingStrategy(TListerFactoryMethod&& listerFactoryMethod, TString collectingName) : ListerFactoryMethod(std::move(listerFactoryMethod)) , CollectingName(std::move(collectingName)) { } - TFuture<NS3Lister::TListResult> List( - const NS3Lister::TListingRequest& listingRequest, + TFuture<TListResult> List( + const TListingRequest& listingRequest, const TS3ListingOptions& options) override { Y_UNUSED(options); YQL_CLOG(TRACE, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName @@ -59,27 +65,28 @@ public: auto futureLister = ListerFactoryMethod(listingRequest, options); return futureLister.Apply([listingRequest, options, name = CollectingName, limit = options.MaxResultSet]( - const TFuture<NS3Lister::IS3Lister::TPtr>& lister) { + const TFuture<IS3Lister::TPtr>& lister) { try { return DoListCallback(lister.GetValue(), options, name, limit); } catch (...) { - return MakeErrorFuture<NS3Lister::TListResult>(std::current_exception()); + return MakeErrorFuture<TListResult>(std::current_exception()); } }); } private: static inline auto MakeNewListingChunkHandler( - NS3Lister::TListResult& state, size_t limit, TString collectingName) { - return [&state, limit, name = std::move(collectingName)](NS3Lister::TListEntries&& chunkEntries) { - auto& stateEntries = std::get<NS3Lister::TListEntries>(state); + TListResult& state, size_t limit, TString collectingName) { + return [&state, limit, name = std::move(collectingName)](TListEntries&& chunkEntries) { + auto& stateEntries = std::get<TListEntries>(state); if (stateEntries.Size() + chunkEntries.Size() > limit) { YQL_CLOG(TRACE, ProviderS3) << "[TCollectingS3ListingStrategy:" << name << "] Collected " << stateEntries.Size() + chunkEntries.Size() << " object paths which is more than limit " << limit; - state = TIssues{MakeLimitExceededIssue()}; + state = MakeLimitExceededError( + name, limit, stateEntries.Size() + chunkEntries.Size()); return EAggregationAction::Stop; } YQL_CLOG(TRACE, ProviderS3) @@ -90,24 +97,24 @@ private: return EAggregationAction::Proceed; }; } - static inline auto MakeIssuesHandler(NS3Lister::TListResult& state) { - return [&state](const TIssues& issues) { - state = issues; + static inline auto MakeIssuesHandler(TListResult& state) { + return [&state](const TListError& error) { + state = error; return EAggregationAction::Stop; }; } static inline EAggregationAction ExceptionHandler( - NS3Lister::TListResult& state, const std::exception& exception) { - state = TIssues{TIssue{exception.what()}}; + TListResult& state, const std::exception& exception) { + state = MakeGenericError(exception.what()); return EAggregationAction::Stop; } - static TFuture<NS3Lister::TListResult> DoListCallback( - NS3Lister::IS3Lister::TPtr lister, TS3ListingOptions options, TString name, ui64 limit) { + static TFuture<TListResult> DoListCallback( + IS3Lister::TPtr lister, TS3ListingOptions options, TString name, ui64 limit) { Y_UNUSED(options); - return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>( + return NYql::AccumulateWithEarlyStop<TListResult>( std::move(lister), - NS3Lister::TListResult{}, - [limit, name](NS3Lister::TListResult& state, NS3Lister::TListResult&& chunk) { + TListResult{}, + [limit, name](TListResult& state, TListResult&& chunk) { return std::visit( TOverloaded{ std::move(MakeNewListingChunkHandler(state, limit, std::move(name))), @@ -125,12 +132,12 @@ private: class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy { public: TFlatFileS3ListingStrategy( - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, bool allowLocalFiles) : TCollectingS3ListingStrategy( [allowLocalFiles, httpGateway, listerFactory]( - const NS3Lister::TListingRequest& listingRequest, + const TListingRequest& listingRequest, TS3ListingOptions options) { Y_UNUSED(options); return listerFactory->Make( @@ -142,12 +149,12 @@ public: class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy { public: TDirectoryS3ListingStrategy( - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, bool allowLocalFiles) : TCollectingS3ListingStrategy( [allowLocalFiles, httpGateway, listerFactory]( - const NS3Lister::TListingRequest& listingRequest, + const TListingRequest& listingRequest, TS3ListingOptions options) { Y_UNUSED(options); return listerFactory->Make( @@ -161,10 +168,10 @@ public: using TStrategyContainer = std::vector<std::shared_ptr<IS3ListingStrategy>>; using TStrategyContainerPtr = std::shared_ptr<TStrategyContainer>; - class TStrategyListIterator : public NS3Lister::IS3Lister { + class TStrategyListIterator : public IS3Lister { public: TStrategyListIterator( - NS3Lister::TListingRequest listingRequest, + TListingRequest listingRequest, TS3ListingOptions options, TStrategyContainerPtr strategies) : ListingRequest(std::move(listingRequest)) @@ -173,7 +180,7 @@ public: , Iterator(Strategies->cbegin()) , End(Strategies->end()) { } - TFuture<NS3Lister::TListResult> Next() override { + TFuture<TListResult> Next() override { return (*Iterator)->List(ListingRequest, Options); } @@ -187,7 +194,7 @@ public: } private: - const NS3Lister::TListingRequest ListingRequest; + const TListingRequest ListingRequest; const TS3ListingOptions Options; const TStrategyContainerPtr Strategies; TStrategyContainer::const_iterator Iterator; @@ -195,45 +202,52 @@ public: bool IsFirst = true; }; - class AggregationState { - public: - NS3Lister::TListResult Result; + struct AggregationState { + TListResult Result; + std::vector<TIntrusivePtr<TIssue>> PreviousIssues; bool Set = false; }; explicit TCompositeS3ListingStrategy(TStrategyContainer&& strategies) : Strategies(std::make_shared<TStrategyContainer>(std::move(strategies))) { } - TFuture<NS3Lister::TListResult> List( - const NS3Lister::TListingRequest& listingRequest, + TFuture<TListResult> List( + const TListingRequest& listingRequest, const TS3ListingOptions& options) override { auto strategyListIterator = std::make_unique<TStrategyListIterator>(listingRequest, options, Strategies); return NYql::AccumulateWithEarlyStop<AggregationState>( std::move(strategyListIterator), - AggregationState{NS3Lister::TListEntries{}}, - [](AggregationState& state, NS3Lister::TListResult&& chunk) { + AggregationState{ + TListEntries{}, std::vector<TIntrusivePtr<TIssue>>{}}, + [](AggregationState& state, TListResult&& chunk) { auto newChunkHandler = - [&state](NS3Lister::TListEntries&& chunkEntries) mutable { + [&state](TListEntries&& chunkEntries) mutable { YQL_CLOG(INFO, ProviderS3) << "[TCompositeS3ListingStrategy] Strategy successfully listed paths. Returning result: " << chunkEntries.Objects.size() << " objects, " << chunkEntries.Directories.size() << " path prefixes"; - std::get<NS3Lister::TListEntries>(state.Result) = + std::get<TListEntries>(state.Result) = chunkEntries; state.Set = true; return EAggregationAction::Stop; }; - auto errorHandler = [&state](const TIssues& issues) mutable { - if (IsRecoverableIssue(issues)) { + auto errorHandler = [&state](const TListError& error) mutable { + auto issue = MakeIntrusive<TIssue>("Strategy failed with issues"); + for (auto& subIssue: error.Issues) { + issue->AddSubIssue(MakeIntrusive<TIssue>(subIssue)); + } + state.PreviousIssues.emplace_back(std::move(issue)); + + if (IsRecoverableError(error)) { YQL_CLOG(INFO, ProviderS3) << "[TCompositeS3ListingStrategy] Strategy failed " << " to list paths. Trying next one... "; return EAggregationAction::Proceed; } - state.Result = issues; + state.Result = error; state.Set = true; return EAggregationAction::Stop; }; @@ -245,14 +259,20 @@ public: std::move(chunk)); }, [](AggregationState& state, const std::exception& exception) { - state.Result = TIssues{TIssue{exception.what()}}; + state.Result = MakeGenericError(exception.what()); state.Set = true; return EAggregationAction::Stop; }) - .Apply([](const TFuture<AggregationState>& state) { + .Apply([](const TFuture<AggregationState>& state) -> TListResult { auto& result = state.GetValue(); - return result.Set ? result.Result - : TIssues{TIssue("No more strategy to test")}; + if (!result.Set) { + auto issue = TIssue{"Couldn’t list paths in S3 source"}; + for (auto& subIssue : result.PreviousIssues) { + issue.AddSubIssue(subIssue); + } + return TListError{EListError::GENERAL, TIssues{std::move(issue)}}; + } + return result.Result; }); } @@ -268,8 +288,8 @@ public: TConditionalS3ListingStrategy(std::initializer_list<TPair> list) : Strategies(list.begin(), list.end()) { } - TFuture<NS3Lister::TListResult> List( - const NS3Lister::TListingRequest& listingRequest, + TFuture<TListResult> List( + const TListingRequest& listingRequest, const TS3ListingOptions& options) override { auto strategyIt = std::find_if( Strategies.begin(), Strategies.end(), [options](const TPair& record) { @@ -278,8 +298,8 @@ public: }); if (strategyIt == Strategies.end()) { - auto issue = TIssues{TIssue("No strategy matched listing request")}; - return MakeFuture(NS3Lister::TListResult{issue}); + auto error = MakeGenericError("No strategy matched listing request"); + return MakeFuture(TListResult{std::move(error)}); } return strategyIt->second->List(listingRequest, options); @@ -291,10 +311,10 @@ private: class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { public: - class PartitionedDirectoryResolverIterator : public NS3Lister::IS3Lister { + class PartitionedDirectoryResolverIterator : public IS3Lister { public: PartitionedDirectoryResolverIterator( - NS3Lister::TListingRequest defaultParams, + TListingRequest defaultParams, const TString& basePrefix, TS3ListingOptions options, TDirectoryS3ListingStrategy directoryListingStrategy) @@ -314,19 +334,19 @@ public: * 4) If directory is not matched against regexp - list this directory recursively * 5) if there is no directory to list - algorithm ends */ - TFuture<NS3Lister::TListResult> Next() override { + TFuture<TListResult> Next() override { First = false; NextDirectoryListeningChunk = GetNextPrefixLister().Apply( [queue = DirectoryPrefixQueue]( - const TFuture<NS3Lister::TListResult>& future) -> NS3Lister::TListResult { + const TFuture<TListResult>& future) -> TListResult { try { auto& nextBatch = future.GetValue(); - if (std::holds_alternative<TIssues>(nextBatch)) { - return std::get<TIssues>(nextBatch); + if (std::holds_alternative<TListError>(nextBatch)) { + return std::get<TListError>(nextBatch); } - auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch); - auto result = NS3Lister::TListEntries{}; + auto& listingResult = std::get<TListEntries>(nextBatch); + auto result = TListEntries{}; result.Objects.insert( result.Objects.begin(), listingResult.Objects.cbegin(), @@ -340,9 +360,9 @@ public: } } - return NS3Lister::TListResult{result}; + return TListResult{result}; } catch (std::exception& e) { - return TIssues{TIssue{e.what()}}; + return MakeGenericError(e.what()); } }); return NextDirectoryListeningChunk; @@ -356,39 +376,38 @@ public: } private: - TFuture<NS3Lister::TListResult> GetNextPrefixLister() { + TFuture<TListResult> GetNextPrefixLister() { if (DirectoryPrefixQueue->empty()) { - return MakeFuture( - NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}}); + return MakeFuture(TListResult{MakeGenericError("No path to list")}); } auto prefix = DirectoryPrefixQueue->front(); DirectoryPrefixQueue->pop_front(); - auto request = NS3Lister::TListingRequest(DefaultParams); + auto request = TListingRequest(DefaultParams); request.Prefix = prefix; return DirectoryListingStrategy.List(request, Options); } private: - const NS3Lister::TListingRequest DefaultParams; + const TListingRequest DefaultParams; const TS3ListingOptions Options; std::shared_ptr<std::deque<TString>> DirectoryPrefixQueue; - std::vector<NS3Lister::TObjectListEntry> Objects; + std::vector<TObjectListEntry> Objects; TDirectoryS3ListingStrategy DirectoryListingStrategy; - TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk; + TFuture<TListResult> NextDirectoryListeningChunk; bool First = true; }; TPartitionedDatasetS3ListingStrategy( - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, bool allowLocalFiles) : TCollectingS3ListingStrategy( [listerFactory, httpGateway, allowLocalFiles]( - const NS3Lister::TListingRequest& listingRequest, + const TListingRequest& listingRequest, TS3ListingOptions options) { - auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + auto ptr = std::shared_ptr<IS3Lister>( new PartitionedDirectoryResolverIterator{ listingRequest, listingRequest.Prefix, @@ -400,13 +419,13 @@ public: "TPartitionedDatasetS3ListingStrategy") { } }; -class TBFSDirectoryResolverIterator : public NS3Lister::IS3Lister { +class TBFSDirectoryResolverIterator : public IS3Lister { public: - using TListingRequestFactory = std::function<NS3Lister::TListingRequest( - const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>; + using TListingRequestFactory = std::function<TListingRequest( + const TListingRequest& defaultParams, const TString& pathPrefix)>; TBFSDirectoryResolverIterator( - NS3Lister::TListingRequest defaultParams, + TListingRequest defaultParams, TListingRequestFactory listingRequestFactory, TS3ListingOptions options, std::deque<TString> initialPathPrefixes, @@ -421,72 +440,83 @@ public: , MinParallelism(minParallelism) , Limit(limit) { } - TFuture<NS3Lister::TListResult> Next() override { + TFuture<TListResult> Next() override { First = false; if (DirectoryPrefixQueue.empty()) { - return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}}); + return MakeFuture(TListResult{MakeGenericError("No path to list")}); } auto sourcePrefix = DirectoryPrefixQueue.front(); DirectoryPrefixQueue.pop_front(); NextDirectoryListeningChunk = GetPrefixLister(sourcePrefix) - .Apply( - [this, sourcePrefix](const TFuture<NS3Lister::TListResult>& future) - -> NS3Lister::TListResult { - YQL_CLOG(TRACE, ProviderS3) - << "[TBFSDirectoryResolverIterator] Got new listing result. Collected entries: " - << ReturnedSize + DirectoryPrefixQueue.size(); - - try { - auto& nextBatch = future.GetValue(); - if (std::holds_alternative<TIssues>(nextBatch)) { - return std::get<TIssues>(nextBatch); - } + .Apply([this, sourcePrefix](const TFuture<TListResult>& future) -> TListResult { + YQL_CLOG(TRACE, ProviderS3) + << "[TBFSDirectoryResolverIterator] Got new listing result. Collected entries: " + << ReturnedSize + DirectoryPrefixQueue.size(); - auto& listingResult = - std::get<NS3Lister::TListEntries>(nextBatch); - auto result = NS3Lister::TListEntries{}; - auto currentListingTotalSize = ReturnedSize + - DirectoryPrefixQueue.size() + - listingResult.Size(); - - if (currentListingTotalSize > Limit) { - // Stop listing - result.Directories.push_back({.Path = sourcePrefix}); - for (auto& directoryPrefix : DirectoryPrefixQueue) { - result.Directories.push_back({.Path = directoryPrefix}); - } - DirectoryPrefixQueue.clear(); - } else { - result.Objects.insert( - result.Objects.end(), - std::make_move_iterator(listingResult.Objects.begin()), - std::make_move_iterator(listingResult.Objects.end())); - if (currentListingTotalSize < MinParallelism) { - for (auto& directoryPrefix : listingResult.Directories) { - DirectoryPrefixQueue.push_back(directoryPrefix.Path); - } - } else { - for (auto& directoryPrefix : listingResult.Directories) { - result.Directories.push_back( - {.Path = directoryPrefix.Path}); - } - for (auto& directoryPrefix : DirectoryPrefixQueue) { - result.Directories.push_back( - {.Path = directoryPrefix}); - } - DirectoryPrefixQueue.clear(); - } + try { + auto& nextBatch = future.GetValue(); + if (std::holds_alternative<TListError>(nextBatch)) { + auto& error = std::get<TListError>(nextBatch); + if (error.Type == EListError::LIMIT_EXCEEDED) { + auto result = TListEntries{}; + PerformEarlyStop(result, sourcePrefix); + return result; } - ReturnedSize += result.Size(); - return NS3Lister::TListResult{result}; - } catch (std::exception& e) { - return TIssues{TIssue{e.what()}}; + return error; } - }); + + auto& listingResult = std::get<TListEntries>(nextBatch); + auto currentListingTotalSize = ReturnedSize + + DirectoryPrefixQueue.size() + + listingResult.Size(); + + auto result = TListEntries{}; + if (currentListingTotalSize > Limit) { + // Stop listing + PerformEarlyStop(result, sourcePrefix); + } else { + ProcessDataChunk(result, listingResult, currentListingTotalSize); + } + + ReturnedSize += result.Size(); + return result; + } catch (std::exception& e) { + return MakeGenericError(e.what()); + } + }); return NextDirectoryListeningChunk; } + void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) { + result.Directories.push_back({.Path = sourcePrefix}); + for (auto& directoryPrefix : DirectoryPrefixQueue) { + result.Directories.push_back({.Path = directoryPrefix}); + } + DirectoryPrefixQueue.clear(); + } + void ProcessDataChunk( + TListEntries& result, + const TListEntries& listingResult, + ui64 currentListingTotalSize) { + result.Objects.insert( + result.Objects.end(), + std::make_move_iterator(listingResult.Objects.begin()), + std::make_move_iterator(listingResult.Objects.end())); + if (currentListingTotalSize < MinParallelism) { + for (auto& directoryPrefix : listingResult.Directories) { + DirectoryPrefixQueue.push_back(directoryPrefix.Path); + } + } else { + for (auto& directoryPrefix : listingResult.Directories) { + result.Directories.push_back({.Path = directoryPrefix.Path}); + } + for (auto& directoryPrefix : DirectoryPrefixQueue) { + result.Directories.push_back({.Path = directoryPrefix}); + } + DirectoryPrefixQueue.clear(); + } + } bool HasNext() override { if (!First) { @@ -500,21 +530,21 @@ public: } private: - TFuture<NS3Lister::TListResult> GetPrefixLister(const TString& prefix) { + TFuture<TListResult> GetPrefixLister(const TString& prefix) { const auto& listingRequest = ListingRequestFactory(DefaultParams, prefix); auto listResult = DirectoryListingStrategy.List(listingRequest, Options); return listResult; } private: - const NS3Lister::TListingRequest DefaultParams; + const TListingRequest DefaultParams; const TS3ListingOptions Options; TDirectoryS3ListingStrategy DirectoryListingStrategy; TListingRequestFactory ListingRequestFactory; std::deque<TString> DirectoryPrefixQueue; - std::vector<NS3Lister::TObjectListEntry> Objects; - TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk; + std::vector<TObjectListEntry> Objects; + TFuture<TListResult> NextDirectoryListeningChunk; bool First = true; size_t ReturnedSize = 0; @@ -525,20 +555,20 @@ private: class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { public: TUnPartitionedDatasetS3ListingStrategy( - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t minParallelism, bool allowLocalFiles) : TCollectingS3ListingStrategy( [listerFactory, httpGateway, minParallelism, allowLocalFiles]( - const NS3Lister::TListingRequest& listingRequest, + const TListingRequest& listingRequest, TS3ListingOptions options) { - auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + auto ptr = std::shared_ptr<IS3Lister>( new TBFSDirectoryResolverIterator{ listingRequest, - [](const NS3Lister::TListingRequest& defaultParams, + [](const TListingRequest& defaultParams, const TString& pathPrefix) { - NS3Lister::TListingRequest request(defaultParams); + TListingRequest request(defaultParams); request.Prefix = pathPrefix; return request; }, @@ -557,37 +587,37 @@ public: "TUnPartitionedDatasetS3ListingStrategy") {} }; +enum class ELimitExceededAction : ui8 { Proceed = 0, RaiseError = 1 }; + // When data is collected result will be emitted (occurs once per iterator); -class TConcurrentBFSDirectoryResolverIterator : public NS3Lister::IS3Lister { +class TConcurrentBFSDirectoryResolverIterator : public IS3Lister { public: - using TListingRequestFactory = std::function<NS3Lister::TListingRequest( - const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>; + using TListingRequestFactory = std::function<TListingRequest( + const TListingRequest& defaultParams, const TString& pathPrefix)>; struct TSharedState { - using TLimitExceededStateModificator = - std::function<void(TSharedState& state, const TString& pathPrefix)>; using TDirectoryToListMatcher = - std::function<bool(const NS3Lister::TDirectoryListEntry& entry)>; + std::function<bool(const TDirectoryListEntry& entry)>; using TEarlyStopMatcher = std::function<bool(const TSharedState& state)>; // Initial params - const NS3Lister::TListingRequest DefaultParams; + const TListingRequest DefaultParams; const TS3ListingOptions Options; TDirectoryS3ListingStrategy DirectoryListingStrategy; const TListingRequestFactory ListingRequestFactory; - const TLimitExceededStateModificator LimitExceededStateModificator; + const ELimitExceededAction LimitExceededAction; const TDirectoryToListMatcher DirectoryToListMatcher; const TEarlyStopMatcher EarlyStopMatcher; // Mutable state std::mutex StateLock; std::deque<TString> DirectoryPrefixQueue; std::list<TString> InProgressPaths; - std::vector<TIssues> Issues; - std::vector<NS3Lister::TObjectListEntry> Objects; - std::vector<NS3Lister::TDirectoryListEntry> Directories; - std::vector<TFuture<NS3Lister::TListResult>> NextDirectoryListeningChunk; + TMaybe<TListError> MaybeError; + std::vector<TObjectListEntry> Objects; + std::vector<TDirectoryListEntry> Directories; + std::vector<TFuture<TListResult>> NextDirectoryListeningChunk; // CurrentListing - TPromise<NS3Lister::TListResult> CurrentPromise; + TPromise<TListResult> CurrentPromise; bool IsListingFinished = false; // Configuration const size_t Limit = 1; @@ -597,7 +627,7 @@ public: public: static void ListingCallback( const std::weak_ptr<TSharedState>& stateWeakPtr, - const TFuture<NS3Lister::TListResult>& future, + const TFuture<TListResult>& future, const TString& sourcePath ) { auto state = stateWeakPtr.lock(); @@ -635,7 +665,7 @@ public: } catch (std::exception& e) { YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] An exception has happened - saving an issue"; - state->Issues.emplace_back(TIssues{TIssue{e.what()}}); + state->MaybeError = MakeGenericError(e.what()); state->IsListingFinished = true; } @@ -651,18 +681,27 @@ public: InProgressPaths.erase(pos); Y_VERIFY(sizeBeforeRemoval == InProgressPaths.size() + 1); } + void HandleLimitExceeded(const TString& sourcePath, const TListError& error) { + IsListingFinished = true; + if (LimitExceededAction == ELimitExceededAction::RaiseError) { + YQL_CLOG(TRACE, ProviderS3) + << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Limit"; + MaybeError = error; + } else { + DirectoryPrefixQueue.push_back(sourcePath); + }; + } void AddChunkToState( - const NS3Lister::TListResult& nextBatch, + const TListResult& nextBatch, const TString& sourcePath) { - if (std::holds_alternative<TIssues>(nextBatch)) { - IsListingFinished = true; - Issues.push_back(std::get<TIssues>(nextBatch)); - YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Issue"; + if (std::holds_alternative<TListError>(nextBatch)) { + auto& error = std::get<TListError>(nextBatch); + HandleLimitExceeded(sourcePath, error); return; } - auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch); + auto& listingResult = std::get<TListEntries>(nextBatch); auto currentListingTotalSize = InProgressPaths.size() + DirectoryPrefixQueue.size() + Objects.size() + @@ -670,9 +709,11 @@ public: // Process new listing result if (currentListingTotalSize > Limit) { - IsListingFinished = true; - YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Limit"; - LimitExceededStateModificator(*this, sourcePath); + auto error = MakeLimitExceededError( + "TConcurrentBFSDirectoryResolverIterator", + Limit, + currentListingTotalSize); + HandleLimitExceeded(sourcePath, std::move(error)); return; } @@ -707,17 +748,13 @@ public: void SetPromise() { Y_ENSURE(IsListingFinished); YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise going to set promise"; - NS3Lister::TListResult res; - if (!Issues.empty()) { - auto result = TIssues{}; - for (auto& issues : Issues) { - result.AddIssues(issues); - } + TListResult res; + if (MaybeError) { YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise before set 1"; - res = std::move(result); + res = *MaybeError; } else { // TODO: add verification - auto result = NS3Lister::TListEntries{.Objects = Objects}; + auto result = TListEntries{.Objects = Objects}; for (auto& directoryPrefix : DirectoryPrefixQueue) { result.Directories.push_back({.Path = directoryPrefix}); } @@ -759,16 +796,16 @@ public: DirectoryListingStrategy.List(listingRequest, Options) .Subscribe( - [prefix, self = This](const TFuture<NS3Lister::TListResult>& future) { + [prefix, self = This](const TFuture<TListResult>& future) { ListingCallback(self, future, prefix); }); } }; using TSharedStatePtr = std::shared_ptr<TSharedState>; static TSharedStatePtr MakeState( - NS3Lister::TListingRequest defaultParams, + TListingRequest defaultParams, TListingRequestFactory listingRequestFactory, - TSharedState::TLimitExceededStateModificator limitExceededStateModificator, + ELimitExceededAction limitExceededAction, TSharedState::TDirectoryToListMatcher directoryToListMatcher, TSharedState::TEarlyStopMatcher earlyStopMatcher, TS3ListingOptions options, @@ -781,11 +818,11 @@ public: .Options = (options), .DirectoryListingStrategy = (std::move(directoryListingStrategy)), .ListingRequestFactory = (std::move(listingRequestFactory)), - .LimitExceededStateModificator = (std::move(limitExceededStateModificator)), + .LimitExceededAction = (std::move(limitExceededAction)), .DirectoryToListMatcher = (std::move(directoryToListMatcher)), .EarlyStopMatcher = (std::move(earlyStopMatcher)), .DirectoryPrefixQueue = (std::move(initialPathPrefixes)), - .CurrentPromise = NewPromise<NS3Lister::TListResult>(), + .CurrentPromise = NewPromise<TListResult>(), .Limit = (limit), .MaxParallelOps = maxParallelOps}); res->This = res; @@ -793,9 +830,9 @@ public: } TConcurrentBFSDirectoryResolverIterator( - NS3Lister::TListingRequest defaultParams, + TListingRequest defaultParams, TListingRequestFactory listingRequestFactory, - TSharedState::TLimitExceededStateModificator limitExceededStateModificator, + ELimitExceededAction limitExceededAction, TSharedState::TDirectoryToListMatcher directoryToListMatcher, TSharedState::TEarlyStopMatcher earlyStopMatcher, TS3ListingOptions options, @@ -806,7 +843,7 @@ public: : State(MakeState( std::move(defaultParams), std::move(listingRequestFactory), - std::move(limitExceededStateModificator), + std::move(limitExceededAction), std::move(directoryToListMatcher), std::move(earlyStopMatcher), options, @@ -815,10 +852,10 @@ public: limit, maxParallelOps)) { } - TFuture<NS3Lister::TListResult> Next() override { + TFuture<TListResult> Next() override { if (!First) { - return MakeFuture(NS3Lister::TListResult{TIssues{ - TIssue{"This iterator should be finished after first iteration"}}}); + return MakeFuture(TListResult{MakeGenericError( + "This iterator should be finished after first iteration")}); } YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next before lock"; @@ -826,7 +863,7 @@ public: First = false; if (State->DirectoryPrefixQueue.empty()) { - return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}}); + return MakeFuture(TListResult{MakeGenericError("No path to list")}); } if (!State->IsListingFinished) { @@ -854,29 +891,26 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { public: TConcurrentUnPartitionedDatasetS3ListingStrategy( - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t minParallelism, size_t maxParallelOps, bool allowLocalFiles) : TCollectingS3ListingStrategy( [listerFactory, httpGateway, minParallelism, allowLocalFiles, maxParallelOps]( - const NS3Lister::TListingRequest& listingRequest, + const TListingRequest& listingRequest, TS3ListingOptions options) { - auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + auto ptr = std::shared_ptr<IS3Lister>( new TConcurrentBFSDirectoryResolverIterator{ listingRequest, - [](const NS3Lister::TListingRequest& defaultParams, + [](const TListingRequest& defaultParams, const TString& pathPrefix) { - NS3Lister::TListingRequest request(defaultParams); + TListingRequest request(defaultParams); request.Prefix = pathPrefix; return request; }, - [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state, - const TString& pathPrefix) { - state.DirectoryPrefixQueue.push_back(pathPrefix); - }, - [](const NS3Lister::TDirectoryListEntry& entry) -> bool { + ELimitExceededAction::Proceed, + [](const TDirectoryListEntry& entry) -> bool { Y_UNUSED(entry); return true; }, @@ -907,29 +941,25 @@ class TConcurrentPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { public: TConcurrentPartitionedDatasetS3ListingStrategy( - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, const IHTTPGateway::TPtr& httpGateway, size_t maxParallelOps, bool allowLocalFiles) : TCollectingS3ListingStrategy( [listerFactory, httpGateway, allowLocalFiles, maxParallelOps]( - const NS3Lister::TListingRequest& listingRequest, + const TListingRequest& listingRequest, TS3ListingOptions options) { - auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + auto ptr = std::shared_ptr<IS3Lister>( new TConcurrentBFSDirectoryResolverIterator{ listingRequest, - [](const NS3Lister::TListingRequest& defaultParams, + [](const TListingRequest& defaultParams, const TString& pathPrefix) { - NS3Lister::TListingRequest request(defaultParams); + TListingRequest request(defaultParams); request.Prefix = pathPrefix; return request; }, - [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state, - const TString& pathPrefix) { - Y_UNUSED(pathPrefix); - state.Issues.push_back(TIssues{MakeLimitExceededIssue()}); - }, - [](const NS3Lister::TDirectoryListEntry& entry) -> bool { + ELimitExceededAction::RaiseError, + [](const TDirectoryListEntry& entry) -> bool { return !entry.MatchedRegexp; }, [](const TConcurrentBFSDirectoryResolverIterator::TSharedState& @@ -961,25 +991,25 @@ public: explicit TLoggingS3ListingStrategy(IS3ListingStrategy::TPtr lister) : Lister(std::move(lister)) { } - TFuture<NS3Lister::TListResult> List( - const NS3Lister::TListingRequest& listingRequest, + TFuture<TListResult> List( + const TListingRequest& listingRequest, const TS3ListingOptions& options) override { YQL_CLOG(INFO, ProviderS3) << "[TLoggingS3ListingStrategy] Going to list request " << listingRequest << " with options " << options; return Lister->List(listingRequest, options) - .Apply([start = TInstant::Now()](const TFuture<NS3Lister::TListResult>& future) { + .Apply([start = TInstant::Now()](const TFuture<TListResult>& future) { auto duration = TInstant::Now() - start; std::visit( TOverloaded{ - [duration](const NS3Lister::TListEntries& entries) { + [duration](const TListEntries& entries) { YQL_CLOG(INFO, ProviderS3) << "[TLoggingS3ListingStrategy] Listing took " << duration << " and ended with " << entries.Size() << " entries"; }, - [duration](const TIssues& issues) { + [duration](const TListError& error) { YQL_CLOG(INFO, ProviderS3) << "[TLoggingS3ListingStrategy] Listing took " << duration - << " and ended with " << issues.Size() << " issues"; + << " and ended with " << error.Issues.Size() << " issues"; }}, future.GetValueSync()); @@ -994,7 +1024,7 @@ private: IS3ListingStrategy::TPtr MakeS3ListingStrategy( const IHTTPGateway::TPtr& httpGateway, - const NS3Lister::IS3ListerFactory::TPtr& listerFactory, + const IS3ListerFactory::TPtr& listerFactory, ui64 minDesiredDirectoriesOfFilesPerQuery, size_t maxParallelOps, bool allowLocalFiles) { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp index 98bc232a1e6..2236550cf94 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp @@ -51,12 +51,13 @@ Y_UNIT_TEST_SUITE(TCollectingS3ListingStrategyTests) { void UnitAssertListResultEquals( const NS3Lister::TListResult& expected, const NS3Lister::TListResult& actual) { UNIT_ASSERT_VALUES_EQUAL(expected.index(), actual.index()); - if (std::holds_alternative<TIssues>(expected)) { - const auto& expectedIssues = std::get<TIssues>(expected); - const auto& actualIssues = std::get<TIssues>(actual); - UNIT_ASSERT_VALUES_EQUAL(expectedIssues.Size(), actualIssues.Size()); + if (std::holds_alternative<TListError>(expected)) { + const auto& expectedError = std::get<TListError>(expected); + const auto& actualError = std::get<TListError>(actual); + UNIT_ASSERT_VALUES_EQUAL(expectedError.Type, actualError.Type); + UNIT_ASSERT_VALUES_EQUAL(expectedError.Issues.Size(), actualError.Issues.Size()); UNIT_ASSERT_VALUES_EQUAL( - expectedIssues.ToOneLineString(), actualIssues.ToOneLineString()); + expectedError.Issues.ToOneLineString(), actualError.Issues.ToOneLineString()); return; } @@ -158,7 +159,7 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) { auto actualResultFuture = strategy.List( NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{.MaxResultSet = 1}); - auto expectedResult = NS3Lister::TListResult{TIssues{MakeLimitExceededIssue()}}; + auto expectedResult = NS3Lister::TListResult{MakeLimitExceededError("TTest", 1, 2)}; const auto& actualResult = actualResultFuture.GetValue(); UnitAssertListResultEquals(expectedResult, actualResult); } @@ -177,13 +178,13 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) { .Path = "a/a", .Size = 10, }}}, - TIssues{TIssue("TEST_ISSUE")}}))); + MakeGenericError("TEST_ISSUE")}))); }, "TTest"}; auto actualResultFuture = strategy.List( NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{.MaxResultSet = 1}); - auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("TEST_ISSUE")}}; + auto expectedResult = NS3Lister::TListResult{MakeGenericError("TEST_ISSUE")}; const auto& actualResult = actualResultFuture.GetValue(); UnitAssertListResultEquals(expectedResult, actualResult); } @@ -202,7 +203,7 @@ Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) { auto actualResultFuture = strategy.List( NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{.MaxResultSet = 10}); UNIT_ASSERT(actualResultFuture.HasValue()); - auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("EXCEPTION MESSAGE")}}; + auto expectedResult = NS3Lister::TListResult{MakeGenericError("EXCEPTION MESSAGE")}; const auto& actualResult = actualResultFuture.GetValue(); UnitAssertListResultEquals(expectedResult, actualResult); } |