aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-06-30 20:38:14 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-06-30 20:38:14 +0300
commitfa133227cb17a256e3e20c774312e12b779761ee (patch)
tree42f67b5b6a7f7ab51a7c635fb863434111098074
parent2364d4de6d0bb2b84c98308b7ba0693d841bf83a (diff)
downloadydb-fa133227cb17a256e3e20c774312e12b779761ee.tar.gz
IL: no more strategy to test
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp4
-rw-r--r--ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp2
-rw-r--r--ydb/library/yql/providers/s3/object_listers/yql_s3_list.h10
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp20
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp460
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp19
6 files changed, 280 insertions, 235 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index c3f8d44331c..dcda43cca01 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -415,8 +415,8 @@ public:
bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) {
LOG_T("TS3FileQueueActor", "SaveRetrievedResults");
- if (std::holds_alternative<TIssues>(listingResult)) {
- MaybeIssues = std::get<TIssues>(listingResult);
+ if (std::holds_alternative<NS3Lister::TListError>(listingResult)) {
+ MaybeIssues = std::get<NS3Lister::TListError>(listingResult).Issues;
return false;
}
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
index 161b3c1d7b7..620d3b509e6 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
@@ -408,7 +408,7 @@ private:
YQL_CLOG(INFO, ProviderS3)
<< "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
<< ": got error from http gateway: " << issues.ToString(true);
- ctx.Promise.SetValue(std::move(issues));
+ ctx.Promise.SetValue(TListError{EListError::GENERAL, std::move(issues)});
ctx.NextRequestPromise.SetValue(Nothing());
}
} catch (const std::exception& ex) {
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
index ecd1449c1a6..e4971dedcc0 100644
--- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
@@ -137,7 +137,15 @@ public:
std::vector<TDirectoryListEntry> Directories;
};
-using TListResult = std::variant<TListEntries, TIssues>;
+enum class EListError {
+ GENERAL,
+ LIMIT_EXCEEDED
+};
+struct TListError {
+ EListError Type;
+ TIssues Issues;
+};
+using TListResult = std::variant<TListEntries, TListError>;
struct TListingRequest {
TString Url;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 418ff83e70f..9feecb67225 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -184,13 +184,13 @@ private:
const auto& listResult = it->second.GetValue();
if (listResult.index() == 1) {
- const auto& issues = std::get<TIssues>(listResult);
+ const auto& error = std::get<NS3Lister::TListError>(listResult);
YQL_CLOG(INFO, ProviderS3)
<< "Discovery " << req.S3Request.Url << req.S3Request.Pattern
- << " error " << issues.ToString();
+ << " error " << error.Issues.ToString();
std::for_each(
- issues.begin(),
- issues.end(),
+ error.Issues.begin(),
+ error.Issues.end(),
std::bind(
&TExprContext::AddError, std::ref(ctx), std::placeholders::_1));
return TStatus::Error;
@@ -326,9 +326,15 @@ private:
const NS3Lister::TListResult& listResult = it->second.GetValue();
if (listResult.index() == 1) {
- const auto& issues = std::get<TIssues>(listResult);
- YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern << " error " << issues.ToString();
- std::for_each(issues.begin(), issues.end(), std::bind(&TExprContext::AddError, std::ref(ctx), std::placeholders::_1));
+ const auto& error = std::get<NS3Lister::TListError>(listResult);
+ YQL_CLOG(INFO, ProviderS3)
+ << "Discovery " << req.S3Request.Url << req.S3Request.Pattern
+ << " error " << error.Issues.ToString();
+ std::for_each(
+ error.Issues.begin(),
+ error.Issues.end(),
+ std::bind(
+ &TExprContext::AddError, std::ref(ctx), std::placeholders::_1));
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
index 2ff15a035a8..3351d3ba8fb 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
@@ -1,5 +1,6 @@
#include "yql_s3_listing_strategy.h"
+#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
@@ -26,32 +27,37 @@ IOutputStream& operator<<(IOutputStream& stream, const TS3ListingOptions& option
namespace {
using namespace NThreading;
+using namespace NS3Lister;
+
+TListError MakeLimitExceededError(
+ const TString& componentName, ui64 limit, ui64 actual) {
+ auto issue = TIssue(
+ TStringBuilder{} << '[' << componentName << "] Limit exceeded. Limit: " << limit
+ << " Actual: " << actual);
+ return TListError{EListError::LIMIT_EXCEEDED, TIssues{std::move(issue)}};
+}
-TIssue MakeLimitExceededIssue() {
- auto issue = TIssue("Limit exceeded");
- issue.SetCode(0U, ESeverity::TSeverityIds_ESeverityId_S_WARNING);
- return issue;
+TListError MakeGenericError(const TString& description) {
+ auto issue = TIssue(description);
+ return TListError{EListError::GENERAL, TIssues{std::move(issue)}};
}
-bool IsRecoverableIssue(const TIssues& issues) {
- if (issues.Size() != 1) {
- return false;
- }
- return issues.begin()->GetSeverity() >= ESeverity::TSeverityIds_ESeverityId_S_WARNING;
+bool IsRecoverableError(const TListError& error) {
+ return error.Type == EListError::LIMIT_EXCEEDED;
}
class TCollectingS3ListingStrategy : public IS3ListingStrategy {
public:
- using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>(
- const NS3Lister::TListingRequest& listingRequest, TS3ListingOptions options)>;
+ using TListerFactoryMethod = std::function<TFuture<IS3Lister::TPtr>(
+ const TListingRequest& listingRequest, TS3ListingOptions options)>;
TCollectingS3ListingStrategy(TListerFactoryMethod&& listerFactoryMethod, TString collectingName)
: ListerFactoryMethod(std::move(listerFactoryMethod))
, CollectingName(std::move(collectingName)) { }
- TFuture<NS3Lister::TListResult> List(
- const NS3Lister::TListingRequest& listingRequest,
+ TFuture<TListResult> List(
+ const TListingRequest& listingRequest,
const TS3ListingOptions& options) override {
Y_UNUSED(options);
YQL_CLOG(TRACE, ProviderS3) << "[TCollectingS3ListingStrategy:" << CollectingName
@@ -59,27 +65,28 @@ public:
auto futureLister = ListerFactoryMethod(listingRequest, options);
return futureLister.Apply([listingRequest, options, name = CollectingName, limit = options.MaxResultSet](
- const TFuture<NS3Lister::IS3Lister::TPtr>& lister) {
+ const TFuture<IS3Lister::TPtr>& lister) {
try {
return DoListCallback(lister.GetValue(), options, name, limit);
} catch (...) {
- return MakeErrorFuture<NS3Lister::TListResult>(std::current_exception());
+ return MakeErrorFuture<TListResult>(std::current_exception());
}
});
}
private:
static inline auto MakeNewListingChunkHandler(
- NS3Lister::TListResult& state, size_t limit, TString collectingName) {
- return [&state, limit, name = std::move(collectingName)](NS3Lister::TListEntries&& chunkEntries) {
- auto& stateEntries = std::get<NS3Lister::TListEntries>(state);
+ TListResult& state, size_t limit, TString collectingName) {
+ return [&state, limit, name = std::move(collectingName)](TListEntries&& chunkEntries) {
+ auto& stateEntries = std::get<TListEntries>(state);
if (stateEntries.Size() + chunkEntries.Size() > limit) {
YQL_CLOG(TRACE, ProviderS3)
<< "[TCollectingS3ListingStrategy:" << name
<< "] Collected "
<< stateEntries.Size() + chunkEntries.Size()
<< " object paths which is more than limit " << limit;
- state = TIssues{MakeLimitExceededIssue()};
+ state = MakeLimitExceededError(
+ name, limit, stateEntries.Size() + chunkEntries.Size());
return EAggregationAction::Stop;
}
YQL_CLOG(TRACE, ProviderS3)
@@ -90,24 +97,24 @@ private:
return EAggregationAction::Proceed;
};
}
- static inline auto MakeIssuesHandler(NS3Lister::TListResult& state) {
- return [&state](const TIssues& issues) {
- state = issues;
+ static inline auto MakeIssuesHandler(TListResult& state) {
+ return [&state](const TListError& error) {
+ state = error;
return EAggregationAction::Stop;
};
}
static inline EAggregationAction ExceptionHandler(
- NS3Lister::TListResult& state, const std::exception& exception) {
- state = TIssues{TIssue{exception.what()}};
+ TListResult& state, const std::exception& exception) {
+ state = MakeGenericError(exception.what());
return EAggregationAction::Stop;
}
- static TFuture<NS3Lister::TListResult> DoListCallback(
- NS3Lister::IS3Lister::TPtr lister, TS3ListingOptions options, TString name, ui64 limit) {
+ static TFuture<TListResult> DoListCallback(
+ IS3Lister::TPtr lister, TS3ListingOptions options, TString name, ui64 limit) {
Y_UNUSED(options);
- return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>(
+ return NYql::AccumulateWithEarlyStop<TListResult>(
std::move(lister),
- NS3Lister::TListResult{},
- [limit, name](NS3Lister::TListResult& state, NS3Lister::TListResult&& chunk) {
+ TListResult{},
+ [limit, name](TListResult& state, TListResult&& chunk) {
return std::visit(
TOverloaded{
std::move(MakeNewListingChunkHandler(state, limit, std::move(name))),
@@ -125,12 +132,12 @@ private:
class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
TFlatFileS3ListingStrategy(
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[allowLocalFiles, httpGateway, listerFactory](
- const NS3Lister::TListingRequest& listingRequest,
+ const TListingRequest& listingRequest,
TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
@@ -142,12 +149,12 @@ public:
class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
TDirectoryS3ListingStrategy(
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[allowLocalFiles, httpGateway, listerFactory](
- const NS3Lister::TListingRequest& listingRequest,
+ const TListingRequest& listingRequest,
TS3ListingOptions options) {
Y_UNUSED(options);
return listerFactory->Make(
@@ -161,10 +168,10 @@ public:
using TStrategyContainer = std::vector<std::shared_ptr<IS3ListingStrategy>>;
using TStrategyContainerPtr = std::shared_ptr<TStrategyContainer>;
- class TStrategyListIterator : public NS3Lister::IS3Lister {
+ class TStrategyListIterator : public IS3Lister {
public:
TStrategyListIterator(
- NS3Lister::TListingRequest listingRequest,
+ TListingRequest listingRequest,
TS3ListingOptions options,
TStrategyContainerPtr strategies)
: ListingRequest(std::move(listingRequest))
@@ -173,7 +180,7 @@ public:
, Iterator(Strategies->cbegin())
, End(Strategies->end()) { }
- TFuture<NS3Lister::TListResult> Next() override {
+ TFuture<TListResult> Next() override {
return (*Iterator)->List(ListingRequest, Options);
}
@@ -187,7 +194,7 @@ public:
}
private:
- const NS3Lister::TListingRequest ListingRequest;
+ const TListingRequest ListingRequest;
const TS3ListingOptions Options;
const TStrategyContainerPtr Strategies;
TStrategyContainer::const_iterator Iterator;
@@ -195,45 +202,52 @@ public:
bool IsFirst = true;
};
- class AggregationState {
- public:
- NS3Lister::TListResult Result;
+ struct AggregationState {
+ TListResult Result;
+ std::vector<TIntrusivePtr<TIssue>> PreviousIssues;
bool Set = false;
};
explicit TCompositeS3ListingStrategy(TStrategyContainer&& strategies)
: Strategies(std::make_shared<TStrategyContainer>(std::move(strategies))) { }
- TFuture<NS3Lister::TListResult> List(
- const NS3Lister::TListingRequest& listingRequest,
+ TFuture<TListResult> List(
+ const TListingRequest& listingRequest,
const TS3ListingOptions& options) override {
auto strategyListIterator =
std::make_unique<TStrategyListIterator>(listingRequest, options, Strategies);
return NYql::AccumulateWithEarlyStop<AggregationState>(
std::move(strategyListIterator),
- AggregationState{NS3Lister::TListEntries{}},
- [](AggregationState& state, NS3Lister::TListResult&& chunk) {
+ AggregationState{
+ TListEntries{}, std::vector<TIntrusivePtr<TIssue>>{}},
+ [](AggregationState& state, TListResult&& chunk) {
auto newChunkHandler =
- [&state](NS3Lister::TListEntries&& chunkEntries) mutable {
+ [&state](TListEntries&& chunkEntries) mutable {
YQL_CLOG(INFO, ProviderS3)
<< "[TCompositeS3ListingStrategy] Strategy successfully listed paths. Returning result: "
<< chunkEntries.Objects.size() << " objects, "
<< chunkEntries.Directories.size() << " path prefixes";
- std::get<NS3Lister::TListEntries>(state.Result) =
+ std::get<TListEntries>(state.Result) =
chunkEntries;
state.Set = true;
return EAggregationAction::Stop;
};
- auto errorHandler = [&state](const TIssues& issues) mutable {
- if (IsRecoverableIssue(issues)) {
+ auto errorHandler = [&state](const TListError& error) mutable {
+ auto issue = MakeIntrusive<TIssue>("Strategy failed with issues");
+ for (auto& subIssue: error.Issues) {
+ issue->AddSubIssue(MakeIntrusive<TIssue>(subIssue));
+ }
+ state.PreviousIssues.emplace_back(std::move(issue));
+
+ if (IsRecoverableError(error)) {
YQL_CLOG(INFO, ProviderS3)
<< "[TCompositeS3ListingStrategy] Strategy failed "
<< " to list paths. Trying next one... ";
return EAggregationAction::Proceed;
}
- state.Result = issues;
+ state.Result = error;
state.Set = true;
return EAggregationAction::Stop;
};
@@ -245,14 +259,20 @@ public:
std::move(chunk));
},
[](AggregationState& state, const std::exception& exception) {
- state.Result = TIssues{TIssue{exception.what()}};
+ state.Result = MakeGenericError(exception.what());
state.Set = true;
return EAggregationAction::Stop;
})
- .Apply([](const TFuture<AggregationState>& state) {
+ .Apply([](const TFuture<AggregationState>& state) -> TListResult {
auto& result = state.GetValue();
- return result.Set ? result.Result
- : TIssues{TIssue("No more strategy to test")};
+ if (!result.Set) {
+ auto issue = TIssue{"Couldn’t list paths in S3 source"};
+ for (auto& subIssue : result.PreviousIssues) {
+ issue.AddSubIssue(subIssue);
+ }
+ return TListError{EListError::GENERAL, TIssues{std::move(issue)}};
+ }
+ return result.Result;
});
}
@@ -268,8 +288,8 @@ public:
TConditionalS3ListingStrategy(std::initializer_list<TPair> list)
: Strategies(list.begin(), list.end()) { }
- TFuture<NS3Lister::TListResult> List(
- const NS3Lister::TListingRequest& listingRequest,
+ TFuture<TListResult> List(
+ const TListingRequest& listingRequest,
const TS3ListingOptions& options) override {
auto strategyIt = std::find_if(
Strategies.begin(), Strategies.end(), [options](const TPair& record) {
@@ -278,8 +298,8 @@ public:
});
if (strategyIt == Strategies.end()) {
- auto issue = TIssues{TIssue("No strategy matched listing request")};
- return MakeFuture(NS3Lister::TListResult{issue});
+ auto error = MakeGenericError("No strategy matched listing request");
+ return MakeFuture(TListResult{std::move(error)});
}
return strategyIt->second->List(listingRequest, options);
@@ -291,10 +311,10 @@ private:
class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
- class PartitionedDirectoryResolverIterator : public NS3Lister::IS3Lister {
+ class PartitionedDirectoryResolverIterator : public IS3Lister {
public:
PartitionedDirectoryResolverIterator(
- NS3Lister::TListingRequest defaultParams,
+ TListingRequest defaultParams,
const TString& basePrefix,
TS3ListingOptions options,
TDirectoryS3ListingStrategy directoryListingStrategy)
@@ -314,19 +334,19 @@ public:
* 4) If directory is not matched against regexp - list this directory recursively
* 5) if there is no directory to list - algorithm ends
*/
- TFuture<NS3Lister::TListResult> Next() override {
+ TFuture<TListResult> Next() override {
First = false;
NextDirectoryListeningChunk = GetNextPrefixLister().Apply(
[queue = DirectoryPrefixQueue](
- const TFuture<NS3Lister::TListResult>& future) -> NS3Lister::TListResult {
+ const TFuture<TListResult>& future) -> TListResult {
try {
auto& nextBatch = future.GetValue();
- if (std::holds_alternative<TIssues>(nextBatch)) {
- return std::get<TIssues>(nextBatch);
+ if (std::holds_alternative<TListError>(nextBatch)) {
+ return std::get<TListError>(nextBatch);
}
- auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch);
- auto result = NS3Lister::TListEntries{};
+ auto& listingResult = std::get<TListEntries>(nextBatch);
+ auto result = TListEntries{};
result.Objects.insert(
result.Objects.begin(),
listingResult.Objects.cbegin(),
@@ -340,9 +360,9 @@ public:
}
}
- return NS3Lister::TListResult{result};
+ return TListResult{result};
} catch (std::exception& e) {
- return TIssues{TIssue{e.what()}};
+ return MakeGenericError(e.what());
}
});
return NextDirectoryListeningChunk;
@@ -356,39 +376,38 @@ public:
}
private:
- TFuture<NS3Lister::TListResult> GetNextPrefixLister() {
+ TFuture<TListResult> GetNextPrefixLister() {
if (DirectoryPrefixQueue->empty()) {
- return MakeFuture(
- NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}});
+ return MakeFuture(TListResult{MakeGenericError("No path to list")});
}
auto prefix = DirectoryPrefixQueue->front();
DirectoryPrefixQueue->pop_front();
- auto request = NS3Lister::TListingRequest(DefaultParams);
+ auto request = TListingRequest(DefaultParams);
request.Prefix = prefix;
return DirectoryListingStrategy.List(request, Options);
}
private:
- const NS3Lister::TListingRequest DefaultParams;
+ const TListingRequest DefaultParams;
const TS3ListingOptions Options;
std::shared_ptr<std::deque<TString>> DirectoryPrefixQueue;
- std::vector<NS3Lister::TObjectListEntry> Objects;
+ std::vector<TObjectListEntry> Objects;
TDirectoryS3ListingStrategy DirectoryListingStrategy;
- TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk;
+ TFuture<TListResult> NextDirectoryListeningChunk;
bool First = true;
};
TPartitionedDatasetS3ListingStrategy(
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, allowLocalFiles](
- const NS3Lister::TListingRequest& listingRequest,
+ const TListingRequest& listingRequest,
TS3ListingOptions options) {
- auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+ auto ptr = std::shared_ptr<IS3Lister>(
new PartitionedDirectoryResolverIterator{
listingRequest,
listingRequest.Prefix,
@@ -400,13 +419,13 @@ public:
"TPartitionedDatasetS3ListingStrategy") { }
};
-class TBFSDirectoryResolverIterator : public NS3Lister::IS3Lister {
+class TBFSDirectoryResolverIterator : public IS3Lister {
public:
- using TListingRequestFactory = std::function<NS3Lister::TListingRequest(
- const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>;
+ using TListingRequestFactory = std::function<TListingRequest(
+ const TListingRequest& defaultParams, const TString& pathPrefix)>;
TBFSDirectoryResolverIterator(
- NS3Lister::TListingRequest defaultParams,
+ TListingRequest defaultParams,
TListingRequestFactory listingRequestFactory,
TS3ListingOptions options,
std::deque<TString> initialPathPrefixes,
@@ -421,72 +440,83 @@ public:
, MinParallelism(minParallelism)
, Limit(limit) { }
- TFuture<NS3Lister::TListResult> Next() override {
+ TFuture<TListResult> Next() override {
First = false;
if (DirectoryPrefixQueue.empty()) {
- return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}});
+ return MakeFuture(TListResult{MakeGenericError("No path to list")});
}
auto sourcePrefix = DirectoryPrefixQueue.front();
DirectoryPrefixQueue.pop_front();
NextDirectoryListeningChunk =
GetPrefixLister(sourcePrefix)
- .Apply(
- [this, sourcePrefix](const TFuture<NS3Lister::TListResult>& future)
- -> NS3Lister::TListResult {
- YQL_CLOG(TRACE, ProviderS3)
- << "[TBFSDirectoryResolverIterator] Got new listing result. Collected entries: "
- << ReturnedSize + DirectoryPrefixQueue.size();
-
- try {
- auto& nextBatch = future.GetValue();
- if (std::holds_alternative<TIssues>(nextBatch)) {
- return std::get<TIssues>(nextBatch);
- }
+ .Apply([this, sourcePrefix](const TFuture<TListResult>& future) -> TListResult {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TBFSDirectoryResolverIterator] Got new listing result. Collected entries: "
+ << ReturnedSize + DirectoryPrefixQueue.size();
- auto& listingResult =
- std::get<NS3Lister::TListEntries>(nextBatch);
- auto result = NS3Lister::TListEntries{};
- auto currentListingTotalSize = ReturnedSize +
- DirectoryPrefixQueue.size() +
- listingResult.Size();
-
- if (currentListingTotalSize > Limit) {
- // Stop listing
- result.Directories.push_back({.Path = sourcePrefix});
- for (auto& directoryPrefix : DirectoryPrefixQueue) {
- result.Directories.push_back({.Path = directoryPrefix});
- }
- DirectoryPrefixQueue.clear();
- } else {
- result.Objects.insert(
- result.Objects.end(),
- std::make_move_iterator(listingResult.Objects.begin()),
- std::make_move_iterator(listingResult.Objects.end()));
- if (currentListingTotalSize < MinParallelism) {
- for (auto& directoryPrefix : listingResult.Directories) {
- DirectoryPrefixQueue.push_back(directoryPrefix.Path);
- }
- } else {
- for (auto& directoryPrefix : listingResult.Directories) {
- result.Directories.push_back(
- {.Path = directoryPrefix.Path});
- }
- for (auto& directoryPrefix : DirectoryPrefixQueue) {
- result.Directories.push_back(
- {.Path = directoryPrefix});
- }
- DirectoryPrefixQueue.clear();
- }
+ try {
+ auto& nextBatch = future.GetValue();
+ if (std::holds_alternative<TListError>(nextBatch)) {
+ auto& error = std::get<TListError>(nextBatch);
+ if (error.Type == EListError::LIMIT_EXCEEDED) {
+ auto result = TListEntries{};
+ PerformEarlyStop(result, sourcePrefix);
+ return result;
}
- ReturnedSize += result.Size();
- return NS3Lister::TListResult{result};
- } catch (std::exception& e) {
- return TIssues{TIssue{e.what()}};
+ return error;
}
- });
+
+ auto& listingResult = std::get<TListEntries>(nextBatch);
+ auto currentListingTotalSize = ReturnedSize +
+ DirectoryPrefixQueue.size() +
+ listingResult.Size();
+
+ auto result = TListEntries{};
+ if (currentListingTotalSize > Limit) {
+ // Stop listing
+ PerformEarlyStop(result, sourcePrefix);
+ } else {
+ ProcessDataChunk(result, listingResult, currentListingTotalSize);
+ }
+
+ ReturnedSize += result.Size();
+ return result;
+ } catch (std::exception& e) {
+ return MakeGenericError(e.what());
+ }
+ });
return NextDirectoryListeningChunk;
}
+ void PerformEarlyStop(TListEntries& result, const TString& sourcePrefix) {
+ result.Directories.push_back({.Path = sourcePrefix});
+ for (auto& directoryPrefix : DirectoryPrefixQueue) {
+ result.Directories.push_back({.Path = directoryPrefix});
+ }
+ DirectoryPrefixQueue.clear();
+ }
+ void ProcessDataChunk(
+ TListEntries& result,
+ const TListEntries& listingResult,
+ ui64 currentListingTotalSize) {
+ result.Objects.insert(
+ result.Objects.end(),
+ std::make_move_iterator(listingResult.Objects.begin()),
+ std::make_move_iterator(listingResult.Objects.end()));
+ if (currentListingTotalSize < MinParallelism) {
+ for (auto& directoryPrefix : listingResult.Directories) {
+ DirectoryPrefixQueue.push_back(directoryPrefix.Path);
+ }
+ } else {
+ for (auto& directoryPrefix : listingResult.Directories) {
+ result.Directories.push_back({.Path = directoryPrefix.Path});
+ }
+ for (auto& directoryPrefix : DirectoryPrefixQueue) {
+ result.Directories.push_back({.Path = directoryPrefix});
+ }
+ DirectoryPrefixQueue.clear();
+ }
+ }
bool HasNext() override {
if (!First) {
@@ -500,21 +530,21 @@ public:
}
private:
- TFuture<NS3Lister::TListResult> GetPrefixLister(const TString& prefix) {
+ TFuture<TListResult> GetPrefixLister(const TString& prefix) {
const auto& listingRequest = ListingRequestFactory(DefaultParams, prefix);
auto listResult = DirectoryListingStrategy.List(listingRequest, Options);
return listResult;
}
private:
- const NS3Lister::TListingRequest DefaultParams;
+ const TListingRequest DefaultParams;
const TS3ListingOptions Options;
TDirectoryS3ListingStrategy DirectoryListingStrategy;
TListingRequestFactory ListingRequestFactory;
std::deque<TString> DirectoryPrefixQueue;
- std::vector<NS3Lister::TObjectListEntry> Objects;
- TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk;
+ std::vector<TObjectListEntry> Objects;
+ TFuture<TListResult> NextDirectoryListeningChunk;
bool First = true;
size_t ReturnedSize = 0;
@@ -525,20 +555,20 @@ private:
class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy {
public:
TUnPartitionedDatasetS3ListingStrategy(
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t minParallelism,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, minParallelism, allowLocalFiles](
- const NS3Lister::TListingRequest& listingRequest,
+ const TListingRequest& listingRequest,
TS3ListingOptions options) {
- auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+ auto ptr = std::shared_ptr<IS3Lister>(
new TBFSDirectoryResolverIterator{
listingRequest,
- [](const NS3Lister::TListingRequest& defaultParams,
+ [](const TListingRequest& defaultParams,
const TString& pathPrefix) {
- NS3Lister::TListingRequest request(defaultParams);
+ TListingRequest request(defaultParams);
request.Prefix = pathPrefix;
return request;
},
@@ -557,37 +587,37 @@ public:
"TUnPartitionedDatasetS3ListingStrategy") {}
};
+enum class ELimitExceededAction : ui8 { Proceed = 0, RaiseError = 1 };
+
// When data is collected result will be emitted (occurs once per iterator);
-class TConcurrentBFSDirectoryResolverIterator : public NS3Lister::IS3Lister {
+class TConcurrentBFSDirectoryResolverIterator : public IS3Lister {
public:
- using TListingRequestFactory = std::function<NS3Lister::TListingRequest(
- const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>;
+ using TListingRequestFactory = std::function<TListingRequest(
+ const TListingRequest& defaultParams, const TString& pathPrefix)>;
struct TSharedState {
- using TLimitExceededStateModificator =
- std::function<void(TSharedState& state, const TString& pathPrefix)>;
using TDirectoryToListMatcher =
- std::function<bool(const NS3Lister::TDirectoryListEntry& entry)>;
+ std::function<bool(const TDirectoryListEntry& entry)>;
using TEarlyStopMatcher = std::function<bool(const TSharedState& state)>;
// Initial params
- const NS3Lister::TListingRequest DefaultParams;
+ const TListingRequest DefaultParams;
const TS3ListingOptions Options;
TDirectoryS3ListingStrategy DirectoryListingStrategy;
const TListingRequestFactory ListingRequestFactory;
- const TLimitExceededStateModificator LimitExceededStateModificator;
+ const ELimitExceededAction LimitExceededAction;
const TDirectoryToListMatcher DirectoryToListMatcher;
const TEarlyStopMatcher EarlyStopMatcher;
// Mutable state
std::mutex StateLock;
std::deque<TString> DirectoryPrefixQueue;
std::list<TString> InProgressPaths;
- std::vector<TIssues> Issues;
- std::vector<NS3Lister::TObjectListEntry> Objects;
- std::vector<NS3Lister::TDirectoryListEntry> Directories;
- std::vector<TFuture<NS3Lister::TListResult>> NextDirectoryListeningChunk;
+ TMaybe<TListError> MaybeError;
+ std::vector<TObjectListEntry> Objects;
+ std::vector<TDirectoryListEntry> Directories;
+ std::vector<TFuture<TListResult>> NextDirectoryListeningChunk;
// CurrentListing
- TPromise<NS3Lister::TListResult> CurrentPromise;
+ TPromise<TListResult> CurrentPromise;
bool IsListingFinished = false;
// Configuration
const size_t Limit = 1;
@@ -597,7 +627,7 @@ public:
public:
static void ListingCallback(
const std::weak_ptr<TSharedState>& stateWeakPtr,
- const TFuture<NS3Lister::TListResult>& future,
+ const TFuture<TListResult>& future,
const TString& sourcePath
) {
auto state = stateWeakPtr.lock();
@@ -635,7 +665,7 @@ public:
} catch (std::exception& e) {
YQL_CLOG(TRACE, ProviderS3)
<< "[TConcurrentBFSDirectoryResolverIterator] An exception has happened - saving an issue";
- state->Issues.emplace_back(TIssues{TIssue{e.what()}});
+ state->MaybeError = MakeGenericError(e.what());
state->IsListingFinished = true;
}
@@ -651,18 +681,27 @@ public:
InProgressPaths.erase(pos);
Y_VERIFY(sizeBeforeRemoval == InProgressPaths.size() + 1);
}
+ void HandleLimitExceeded(const TString& sourcePath, const TListError& error) {
+ IsListingFinished = true;
+ if (LimitExceededAction == ELimitExceededAction::RaiseError) {
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Limit";
+ MaybeError = error;
+ } else {
+ DirectoryPrefixQueue.push_back(sourcePath);
+ };
+ }
void AddChunkToState(
- const NS3Lister::TListResult& nextBatch,
+ const TListResult& nextBatch,
const TString& sourcePath) {
- if (std::holds_alternative<TIssues>(nextBatch)) {
- IsListingFinished = true;
- Issues.push_back(std::get<TIssues>(nextBatch));
- YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Issue";
+ if (std::holds_alternative<TListError>(nextBatch)) {
+ auto& error = std::get<TListError>(nextBatch);
+ HandleLimitExceeded(sourcePath, error);
return;
}
- auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch);
+ auto& listingResult = std::get<TListEntries>(nextBatch);
auto currentListingTotalSize = InProgressPaths.size() +
DirectoryPrefixQueue.size() + Objects.size() +
@@ -670,9 +709,11 @@ public:
// Process new listing result
if (currentListingTotalSize > Limit) {
- IsListingFinished = true;
- YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] AddChunkToState listing is finished due to Limit";
- LimitExceededStateModificator(*this, sourcePath);
+ auto error = MakeLimitExceededError(
+ "TConcurrentBFSDirectoryResolverIterator",
+ Limit,
+ currentListingTotalSize);
+ HandleLimitExceeded(sourcePath, std::move(error));
return;
}
@@ -707,17 +748,13 @@ public:
void SetPromise() {
Y_ENSURE(IsListingFinished);
YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise going to set promise";
- NS3Lister::TListResult res;
- if (!Issues.empty()) {
- auto result = TIssues{};
- for (auto& issues : Issues) {
- result.AddIssues(issues);
- }
+ TListResult res;
+ if (MaybeError) {
YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] SetPromise before set 1";
- res = std::move(result);
+ res = *MaybeError;
} else {
// TODO: add verification
- auto result = NS3Lister::TListEntries{.Objects = Objects};
+ auto result = TListEntries{.Objects = Objects};
for (auto& directoryPrefix : DirectoryPrefixQueue) {
result.Directories.push_back({.Path = directoryPrefix});
}
@@ -759,16 +796,16 @@ public:
DirectoryListingStrategy.List(listingRequest, Options)
.Subscribe(
- [prefix, self = This](const TFuture<NS3Lister::TListResult>& future) {
+ [prefix, self = This](const TFuture<TListResult>& future) {
ListingCallback(self, future, prefix);
});
}
};
using TSharedStatePtr = std::shared_ptr<TSharedState>;
static TSharedStatePtr MakeState(
- NS3Lister::TListingRequest defaultParams,
+ TListingRequest defaultParams,
TListingRequestFactory listingRequestFactory,
- TSharedState::TLimitExceededStateModificator limitExceededStateModificator,
+ ELimitExceededAction limitExceededAction,
TSharedState::TDirectoryToListMatcher directoryToListMatcher,
TSharedState::TEarlyStopMatcher earlyStopMatcher,
TS3ListingOptions options,
@@ -781,11 +818,11 @@ public:
.Options = (options),
.DirectoryListingStrategy = (std::move(directoryListingStrategy)),
.ListingRequestFactory = (std::move(listingRequestFactory)),
- .LimitExceededStateModificator = (std::move(limitExceededStateModificator)),
+ .LimitExceededAction = (std::move(limitExceededAction)),
.DirectoryToListMatcher = (std::move(directoryToListMatcher)),
.EarlyStopMatcher = (std::move(earlyStopMatcher)),
.DirectoryPrefixQueue = (std::move(initialPathPrefixes)),
- .CurrentPromise = NewPromise<NS3Lister::TListResult>(),
+ .CurrentPromise = NewPromise<TListResult>(),
.Limit = (limit),
.MaxParallelOps = maxParallelOps});
res->This = res;
@@ -793,9 +830,9 @@ public:
}
TConcurrentBFSDirectoryResolverIterator(
- NS3Lister::TListingRequest defaultParams,
+ TListingRequest defaultParams,
TListingRequestFactory listingRequestFactory,
- TSharedState::TLimitExceededStateModificator limitExceededStateModificator,
+ ELimitExceededAction limitExceededAction,
TSharedState::TDirectoryToListMatcher directoryToListMatcher,
TSharedState::TEarlyStopMatcher earlyStopMatcher,
TS3ListingOptions options,
@@ -806,7 +843,7 @@ public:
: State(MakeState(
std::move(defaultParams),
std::move(listingRequestFactory),
- std::move(limitExceededStateModificator),
+ std::move(limitExceededAction),
std::move(directoryToListMatcher),
std::move(earlyStopMatcher),
options,
@@ -815,10 +852,10 @@ public:
limit,
maxParallelOps)) { }
- TFuture<NS3Lister::TListResult> Next() override {
+ TFuture<TListResult> Next() override {
if (!First) {
- return MakeFuture(NS3Lister::TListResult{TIssues{
- TIssue{"This iterator should be finished after first iteration"}}});
+ return MakeFuture(TListResult{MakeGenericError(
+ "This iterator should be finished after first iteration")});
}
YQL_CLOG(TRACE, ProviderS3) << "[TConcurrentBFSDirectoryResolverIterator] Next before lock";
@@ -826,7 +863,7 @@ public:
First = false;
if (State->DirectoryPrefixQueue.empty()) {
- return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}});
+ return MakeFuture(TListResult{MakeGenericError("No path to list")});
}
if (!State->IsListingFinished) {
@@ -854,29 +891,26 @@ class TConcurrentUnPartitionedDatasetS3ListingStrategy :
public TCollectingS3ListingStrategy {
public:
TConcurrentUnPartitionedDatasetS3ListingStrategy(
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t minParallelism,
size_t maxParallelOps,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, minParallelism, allowLocalFiles, maxParallelOps](
- const NS3Lister::TListingRequest& listingRequest,
+ const TListingRequest& listingRequest,
TS3ListingOptions options) {
- auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+ auto ptr = std::shared_ptr<IS3Lister>(
new TConcurrentBFSDirectoryResolverIterator{
listingRequest,
- [](const NS3Lister::TListingRequest& defaultParams,
+ [](const TListingRequest& defaultParams,
const TString& pathPrefix) {
- NS3Lister::TListingRequest request(defaultParams);
+ TListingRequest request(defaultParams);
request.Prefix = pathPrefix;
return request;
},
- [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state,
- const TString& pathPrefix) {
- state.DirectoryPrefixQueue.push_back(pathPrefix);
- },
- [](const NS3Lister::TDirectoryListEntry& entry) -> bool {
+ ELimitExceededAction::Proceed,
+ [](const TDirectoryListEntry& entry) -> bool {
Y_UNUSED(entry);
return true;
},
@@ -907,29 +941,25 @@ class TConcurrentPartitionedDatasetS3ListingStrategy :
public TCollectingS3ListingStrategy {
public:
TConcurrentPartitionedDatasetS3ListingStrategy(
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
const IHTTPGateway::TPtr& httpGateway,
size_t maxParallelOps,
bool allowLocalFiles)
: TCollectingS3ListingStrategy(
[listerFactory, httpGateway, allowLocalFiles, maxParallelOps](
- const NS3Lister::TListingRequest& listingRequest,
+ const TListingRequest& listingRequest,
TS3ListingOptions options) {
- auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+ auto ptr = std::shared_ptr<IS3Lister>(
new TConcurrentBFSDirectoryResolverIterator{
listingRequest,
- [](const NS3Lister::TListingRequest& defaultParams,
+ [](const TListingRequest& defaultParams,
const TString& pathPrefix) {
- NS3Lister::TListingRequest request(defaultParams);
+ TListingRequest request(defaultParams);
request.Prefix = pathPrefix;
return request;
},
- [](TConcurrentBFSDirectoryResolverIterator::TSharedState& state,
- const TString& pathPrefix) {
- Y_UNUSED(pathPrefix);
- state.Issues.push_back(TIssues{MakeLimitExceededIssue()});
- },
- [](const NS3Lister::TDirectoryListEntry& entry) -> bool {
+ ELimitExceededAction::RaiseError,
+ [](const TDirectoryListEntry& entry) -> bool {
return !entry.MatchedRegexp;
},
[](const TConcurrentBFSDirectoryResolverIterator::TSharedState&
@@ -961,25 +991,25 @@ public:
explicit TLoggingS3ListingStrategy(IS3ListingStrategy::TPtr lister)
: Lister(std::move(lister)) { }
- TFuture<NS3Lister::TListResult> List(
- const NS3Lister::TListingRequest& listingRequest,
+ TFuture<TListResult> List(
+ const TListingRequest& listingRequest,
const TS3ListingOptions& options) override {
YQL_CLOG(INFO, ProviderS3) << "[TLoggingS3ListingStrategy] Going to list request "
<< listingRequest << " with options " << options;
return Lister->List(listingRequest, options)
- .Apply([start = TInstant::Now()](const TFuture<NS3Lister::TListResult>& future) {
+ .Apply([start = TInstant::Now()](const TFuture<TListResult>& future) {
auto duration = TInstant::Now() - start;
std::visit(
TOverloaded{
- [duration](const NS3Lister::TListEntries& entries) {
+ [duration](const TListEntries& entries) {
YQL_CLOG(INFO, ProviderS3)
<< "[TLoggingS3ListingStrategy] Listing took " << duration
<< " and ended with " << entries.Size() << " entries";
},
- [duration](const TIssues& issues) {
+ [duration](const TListError& error) {
YQL_CLOG(INFO, ProviderS3)
<< "[TLoggingS3ListingStrategy] Listing took " << duration
- << " and ended with " << issues.Size() << " issues";
+ << " and ended with " << error.Issues.Size() << " issues";
}},
future.GetValueSync());
@@ -994,7 +1024,7 @@ private:
IS3ListingStrategy::TPtr MakeS3ListingStrategy(
const IHTTPGateway::TPtr& httpGateway,
- const NS3Lister::IS3ListerFactory::TPtr& listerFactory,
+ const IS3ListerFactory::TPtr& listerFactory,
ui64 minDesiredDirectoriesOfFilesPerQuery,
size_t maxParallelOps,
bool allowLocalFiles) {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
index 98bc232a1e6..2236550cf94 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
@@ -51,12 +51,13 @@ Y_UNIT_TEST_SUITE(TCollectingS3ListingStrategyTests) {
void UnitAssertListResultEquals(
const NS3Lister::TListResult& expected, const NS3Lister::TListResult& actual) {
UNIT_ASSERT_VALUES_EQUAL(expected.index(), actual.index());
- if (std::holds_alternative<TIssues>(expected)) {
- const auto& expectedIssues = std::get<TIssues>(expected);
- const auto& actualIssues = std::get<TIssues>(actual);
- UNIT_ASSERT_VALUES_EQUAL(expectedIssues.Size(), actualIssues.Size());
+ if (std::holds_alternative<TListError>(expected)) {
+ const auto& expectedError = std::get<TListError>(expected);
+ const auto& actualError = std::get<TListError>(actual);
+ UNIT_ASSERT_VALUES_EQUAL(expectedError.Type, actualError.Type);
+ UNIT_ASSERT_VALUES_EQUAL(expectedError.Issues.Size(), actualError.Issues.Size());
UNIT_ASSERT_VALUES_EQUAL(
- expectedIssues.ToOneLineString(), actualIssues.ToOneLineString());
+ expectedError.Issues.ToOneLineString(), actualError.Issues.ToOneLineString());
return;
}
@@ -158,7 +159,7 @@ Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
auto actualResultFuture = strategy.List(
NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{.MaxResultSet = 1});
- auto expectedResult = NS3Lister::TListResult{TIssues{MakeLimitExceededIssue()}};
+ auto expectedResult = NS3Lister::TListResult{MakeLimitExceededError("TTest", 1, 2)};
const auto& actualResult = actualResultFuture.GetValue();
UnitAssertListResultEquals(expectedResult, actualResult);
}
@@ -177,13 +178,13 @@ Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
.Path = "a/a",
.Size = 10,
}}},
- TIssues{TIssue("TEST_ISSUE")}})));
+ MakeGenericError("TEST_ISSUE")})));
},
"TTest"};
auto actualResultFuture = strategy.List(
NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{.MaxResultSet = 1});
- auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("TEST_ISSUE")}};
+ auto expectedResult = NS3Lister::TListResult{MakeGenericError("TEST_ISSUE")};
const auto& actualResult = actualResultFuture.GetValue();
UnitAssertListResultEquals(expectedResult, actualResult);
}
@@ -202,7 +203,7 @@ Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) {
auto actualResultFuture = strategy.List(
NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, TS3ListingOptions{.MaxResultSet = 10});
UNIT_ASSERT(actualResultFuture.HasValue());
- auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("EXCEPTION MESSAGE")}};
+ auto expectedResult = NS3Lister::TListResult{MakeGenericError("EXCEPTION MESSAGE")};
const auto& actualResult = actualResultFuture.GetValue();
UnitAssertListResultEquals(expectedResult, actualResult);
}