about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author auzhegov <auzhegov@yandex-team.com> 2023-02-22 14:36:34 +0300
committer auzhegov <auzhegov@yandex-team.com> 2023-02-22 14:36:34 +0300
commit 3d9a59583ae857ecc45c709288c5d0e9972fe029 (patch)
tree be62aee52e222ab63d1314bf2d22937fe94dc818
parent d7c89e8978179791514520aca563a81093f35013 (diff)
download ydb-3d9a59583ae857ecc45c709288c5d0e9972fe029.tar.gz
Поддержка большого числа файлов в s3
-rw-r--r-- ydb/core/yq/libs/init/init.cpp 11
-rw-r--r-- ydb/library/yql/providers/common/proto/gateways_config.proto 2
-rw-r--r-- ydb/library/yql/providers/s3/CMakeLists.txt 1
-rw-r--r-- ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt 1
-rw-r--r-- ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt 1
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 715
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h 2
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt 23
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt 24
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt 24
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/CMakeLists.txt 15
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h 102
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp 443
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_list.h 100
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp (renamed from ydb/library/yql/providers/s3/provider/yql_s3_path.cpp) 0
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_path.h (renamed from ydb/library/yql/providers/s3/provider/yql_s3_path.h) 0
-rw-r--r-- ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp (renamed from ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp) 2
-rw-r--r-- ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt 4
-rw-r--r-- ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt 4
-rw-r--r-- ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt 4
-rw-r--r-- ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt 8
-rw-r--r-- ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt 8
-rw-r--r-- ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt 8
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp 2
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp 18
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp 277
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_list.cpp 330
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_list.h 41
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp 724
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h 78
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp 216
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp 4
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp 2
-rw-r--r-- ydb/library/yql/providers/s3/provider/yql_s3_settings.h 2
35 files changed, 2650 insertions, 547 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index bb3133b869..97930d951c 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -175,6 +175,17 @@ void Init(
if (const ui64 dataInflight = s3readConfig.GetDataInflight()) {
readActorFactoryCfg.DataInflight = dataInflight;
}
+ for (auto& formatSizeLimit: protoConfig.GetGateways().GetS3().GetFormatSizeLimit()) {
+ if (formatSizeLimit.GetName()) { // ignore unnamed limits
+ readActorFactoryCfg.FormatSizeLimits.emplace(
+ formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit());
+ }
+ }
+ if (protoConfig.GetGateways().GetS3().HasFileSizeLimit()) {
+ readActorFactoryCfg.FileSizeLimit =
+ protoConfig.GetGateways().GetS3().GetFileSizeLimit();
+ }
+
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode());
RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory);
RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto
index 0a9612c687..2726661e48 100644
--- a/ydb/library/yql/providers/common/proto/gateways_config.proto
+++ b/ydb/library/yql/providers/common/proto/gateways_config.proto
@@ -381,6 +381,8 @@ message TS3GatewayConfig {
optional uint64 MaxReadSizePerQuery = 4;
optional uint64 MaxDiscoveryFilesPerQuery = 5;
optional uint64 MaxInflightListsPerQuery = 7;
+ optional uint64 MaxDirectoriesAndFilesPerQuery = 8;
+ optional uint64 MinDesiredDirectoriesOfFilesPerQuery = 9;
repeated TAttr DefaultSettings = 100;
}
diff --git a/ydb/library/yql/providers/s3/CMakeLists.txt b/ydb/library/yql/providers/s3/CMakeLists.txt
index bcd41e3119..26b1666177 100644
--- a/ydb/library/yql/providers/s3/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/CMakeLists.txt
@@ -10,6 +10,7 @@ add_subdirectory(actors)
add_subdirectory(common)
add_subdirectory(compressors)
add_subdirectory(expr_nodes)
+add_subdirectory(object_listers)
add_subdirectory(path_generator)
add_subdirectory(proto)
add_subdirectory(provider)
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
index 30b8f4cc17..aa908c5da1 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt
@@ -37,6 +37,7 @@ target_link_libraries(providers-s3-actors PUBLIC
common-arrow-interface
providers-s3-common
providers-s3-compressors
+ providers-s3-object_listers
providers-s3-proto
clickhouse_client_udf
providers-s3-range_helpers
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt
index 6971d9cdc8..387fada651 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt
@@ -38,6 +38,7 @@ target_link_libraries(providers-s3-actors PUBLIC
common-arrow-interface
providers-s3-common
providers-s3-compressors
+ providers-s3-object_listers
providers-s3-proto
clickhouse_client_udf
providers-s3-range_helpers
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
index 6971d9cdc8..387fada651 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt
@@ -38,6 +38,7 @@ target_link_libraries(providers-s3-actors PUBLIC
common-arrow-interface
providers-s3-common
providers-s3-compressors
+ providers-s3-object_listers
providers-s3-proto
clickhouse_client_udf
providers-s3-range_helpers
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 101ee0fee5..4976a9c1e6 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -52,6 +52,7 @@
#include <ydb/library/yql/providers/s3/common/util.h>
#include <ydb/library/yql/providers/s3/compressors/factory.h>
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/proto/range.pb.h>
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
#include <ydb/library/yql/providers/s3/serializations/serialization_interval.h>
@@ -108,6 +109,9 @@ namespace NYql::NDq {
using namespace ::NActors;
using namespace ::NYql::NS3Details;
+using ::NYql::NS3Lister::ES3PatternVariant;
+using ::NYql::NS3Lister::ES3PatternType;
+
namespace {
constexpr TDuration MEMORY_USAGE_REPORT_PERIOD = TDuration::Seconds(10);
@@ -120,6 +124,15 @@ struct TS3ReadError : public yexception {
using yexception::yexception;
};
+struct TObjectPath {
+ TString Path;
+ size_t Size;
+ size_t PathIndex;
+
+ TObjectPath(TString path, size_t size, size_t pathIndex)
+ : Path(std::move(path)), Size(size), PathIndex(pathIndex) { }
+};
+
struct TEvPrivate {
// Event ids
enum EEv : ui32 {
@@ -138,6 +151,8 @@ struct TEvPrivate {
EvPause,
EvContinue,
EvFutureResolved,
+ EvObjectPathBatch,
+ EvObjectPathReadError,
EvEnd
};
@@ -146,15 +161,20 @@ struct TEvPrivate {
// Events
struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> {
- TEvReadResult(IHTTPGateway::TContent&& result, const TString& requestId, size_t pathInd)
+ TEvReadResult(
+ IHTTPGateway::TContent&& result,
+ const TString& requestId,
+ size_t pathInd,
+ TString path)
: Result(std::move(result))
, RequestId(requestId)
, PathIndex(pathInd)
- {}
+ , Path(std::move(path)) { }
IHTTPGateway::TContent Result;
const TString RequestId;
const size_t PathIndex;
+ const TString Path;
};
struct TEvDataPart : public TEventLocal<TEvDataPart, EvDataPart> {
@@ -188,15 +208,20 @@ struct TEvPrivate {
};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
- TEvReadError(TIssues&& error, const TString& requestId, size_t pathInd = std::numeric_limits<size_t>::max())
+ TEvReadError(
+ TIssues&& error,
+ const TString& requestId,
+ size_t pathInd,
+ TString path)
: Error(std::move(error))
, RequestId(requestId)
, PathIndex(pathInd)
- {}
+ , Path(std::move(path)) { }
const TIssues Error;
const TString RequestId;
const size_t PathIndex;
+ const TString Path;
};
struct TEvRetryEventFunc : public NActors::TEventLocal<TEvRetryEventFunc, EvRetry> {
@@ -230,10 +255,393 @@ struct TEvPrivate {
struct TEvContinue : public NActors::TEventLocal<TEvContinue, EvContinue> {
};
+ struct TEvObjectPathBatch :
+ public NActors::TEventLocal<TEvObjectPathBatch, EvObjectPathBatch> {
+ std::vector<TObjectPath> ObjectPaths;
+ bool NoMoreFiles = false;
+ TEvObjectPathBatch(
+ std::vector<TObjectPath> objectPaths, bool noMoreFiles)
+ : ObjectPaths(std::move(objectPaths)), NoMoreFiles(noMoreFiles) { }
+ };
+
+ struct TEvObjectPathReadError :
+ public NActors::TEventLocal<TEvObjectPathReadError, EvObjectPathReadError> {
+ TIssues Issues;
+ TEvObjectPathReadError(TIssues issues) : Issues(std::move(issues)) { }
+ };
};
using namespace NKikimr::NMiniKQL;
+class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> {
+public:
+ static constexpr char ActorName[] = "YQ_S3_FILE_QUEUE_ACTOR";
+
+ struct TEvPrivatePrivate {
+ enum {
+ EvGetNextFile = EventSpaceBegin(TEvents::ES_PRIVATE),
+ EvNextListingChunkReceived,
+ EvEnd
+ };
+ static_assert(
+ EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE),
+ "expected EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE)");
+
+ struct TEvGetNextFile : public TEventLocal<TEvGetNextFile, EvGetNextFile> {
+ size_t RequestedAmount = 1;
+ TEvGetNextFile(size_t requestedAmount) : RequestedAmount(requestedAmount){};
+ };
+ struct TEvNextListingChunkReceived :
+ public TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> {
+ NS3Lister::TListResult ListingResult;
+ TEvNextListingChunkReceived(NS3Lister::TListResult listingResult)
+ : ListingResult(std::move(listingResult)){};
+ };
+ };
+ using TBase = TActorBootstrapped<TS3FileQueueActor>;
+
+ TS3FileQueueActor(
+ TTxId txId,
+ TPathList paths,
+ size_t prefetchSize,
+ ui64 fileSizeLimit,
+ IHTTPGateway::TPtr gateway,
+ TString url,
+ TString token,
+ TString pattern,
+ ES3PatternVariant patternVariant,
+ ES3PatternType patternType)
+ : TxId(std::move(txId))
+ , PrefetchSize(prefetchSize)
+ , FileSizeLimit(fileSizeLimit)
+ , MaybeIssues(Nothing())
+ , Gateway(std::move(gateway))
+ , Url(std::move(url))
+ , Token(std::move(token))
+ , Pattern(std::move(pattern))
+ , PatternVariant(patternVariant)
+ , PatternType(patternType) {
+ for (size_t i = 0; i < paths.size(); ++i) {
+ if (paths[i].IsDirectory) {
+ Directories.emplace_back(paths[i].Path, 0, i);
+ } else {
+ Objects.emplace_back(paths[i].Path, paths[i].Size, i);
+ }
+ }
+ }
+
+ void Bootstrap() {
+ if (Directories.empty()) {
+ LOG_I("TS3FileQueueActor", "Bootstrap there is no directories to list");
+ Become(&TS3FileQueueActor::NoMoreDirectoriesState);
+ } else {
+ LOG_I("TS3FileQueueActor", "Bootstrap there are directories to list");
+ TryPreFetch();
+ Become(&TS3FileQueueActor::ThereAreDirectoriesToListState);
+ }
+ }
+
+ STATEFN(ThereAreDirectoriesToListState) {
+ try {
+ switch (const auto etype = ev->GetTypeRewrite()) {
+ hFunc(TEvPrivatePrivate::TEvGetNextFile, HandleGetNextFile);
+ hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ default:
+ MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
+ TransitToErrorState();
+ break;
+ }
+ } catch (const std::exception& e) {
+ MaybeIssues = TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
+ TransitToErrorState();
+ }
+ }
+
+ void HandleGetNextFile(TEvPrivatePrivate::TEvGetNextFile::TPtr& ev) {
+ auto requestAmount = ev->Get()->RequestedAmount;
+ LOG_D("TS3FileQueueActor", "HandleGetNextFile requestAmount:" << requestAmount);
+ if (Objects.size() > requestAmount) {
+ LOG_D("TS3FileQueueActor", "HandleGetNextFile sending right away");
+ SendObjects(ev->Sender, requestAmount);
+ TryPreFetch();
+ } else {
+ LOG_D("TS3FileQueueActor", "HandleGetNextFile have not enough objects cached. Start fetching");
+ RequestQueue.emplace_back(ev->Sender, requestAmount);
+ TryFetch();
+ }
+ }
+
+ void HandleNextListingChunkReceived(TEvPrivatePrivate::TEvNextListingChunkReceived::TPtr& ev) {
+ Y_ENSURE(FetchingInProgress());
+ ListingFuture = Nothing();
+ LOG_D("TS3FileQueueActor", "HandleNextListingChunkReceived");
+ if (SaveRetrievedResults(ev->Get()->ListingResult)) {
+ AnswerPendingRequests();
+ if (RequestQueue.empty()) {
+ LOG_D("TS3FileQueueActor", "HandleNextListingChunkReceived RequestQueue is empty. Trying to prefetch");
+ TryPreFetch();
+ } else {
+ LOG_D("TS3FileQueueActor", "HandleNextListingChunkReceived RequestQueue is not empty. Fetching more objects");
+ TryFetch();
+ }
+ } else {
+ TransitToErrorState();
+ }
+ }
+
+ bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) {
+ LOG_T("TS3FileQueueActor", "SaveRetrievedResults");
+ if (std::holds_alternative<TIssues>(listingResult)) {
+ MaybeIssues = std::get<TIssues>(listingResult);
+ return false;
+ }
+
+ auto listingChunk = std::get<NS3Lister::TListEntries>(listingResult);
+ LOG_D("TS3FileQueueActor", "SaveRetrievedResults saving: " << listingChunk.Objects.size() << " entries");
+ Y_ENSURE(listingChunk.Directories.empty());
+ for (auto& object: listingChunk.Objects) {
+ if (object.Path.EndsWith('/')) {
+ // skip 'directories'
+ continue;
+ }
+ if (object.Size > FileSizeLimit) {
+ auto errorMessage = TStringBuilder()
+ << "Size of object " << object.Path << " = "
+ << object.Size
+ << " and exceeds limit = " << FileSizeLimit;
+ LOG_E("TS3FileQueueActor", errorMessage);
+ MaybeIssues = TIssues{TIssue{errorMessage}};
+ return false;
+ }
+ LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path);
+ Objects.emplace_back(object.Path, object.Size, CurrentDirectoryPathIndex);
+ }
+ return true;
+ }
+
+ void AnswerPendingRequests() {
+ while (!RequestQueue.empty()) {
+ auto requestToFulfil = std::find_if(
+ RequestQueue.begin(),
+ RequestQueue.end(),
+ [this](auto& val) { return val.second <= Objects.size(); });
+
+ if (requestToFulfil != RequestQueue.end()) {
+ auto [actorId, requestedAmount] = *requestToFulfil;
+ LOG_T(
+ "TS3FileQueueActor",
+ "AnswerPendingRequests responding to "
+ << requestToFulfil->first << " with " << requestToFulfil->second
+ << " items");
+ SendObjects(actorId, requestedAmount);
+ RequestQueue.erase(requestToFulfil);
+ } else {
+ LOG_T(
+ "TS3FileQueueActor",
+ "AnswerPendingRequests no more pending requests to fulfil");
+ break;
+ }
+ }
+ }
+
+ bool FetchingInProgress() const { return ListingFuture.Defined(); }
+
+ void TransitToNoMoreDirectoriesToListState() {
+ LOG_I("TS3FileQueueActor", "TransitToNoMoreDirectoriesToListState no more directories to list");
+ for (auto& [requestorId, size]: RequestQueue) {
+ SendObjects(requestorId, size);
+ }
+ RequestQueue.clear();
+ Become(&TS3FileQueueActor::NoMoreDirectoriesState);
+ }
+
+ void TransitToErrorState() {
+ Y_ENSURE(MaybeIssues.Defined());
+ LOG_I("TS3FileQueueActor", "TransitToErrorState an error occurred sending ");
+ for (auto& [requestorId, _]: RequestQueue) {
+ Send(
+ requestorId,
+ std::make_unique<TEvPrivate::TEvObjectPathReadError>(*MaybeIssues));
+ }
+ RequestQueue.clear();
+ Objects.clear();
+ Directories.clear();
+ Become(&TS3FileQueueActor::AnErrorOccurredState);
+ }
+
+ STATEFN(NoMoreDirectoriesState) {
+ try {
+ switch (const auto etype = ev->GetTypeRewrite()) {
+ hFunc(TEvPrivatePrivate::TEvGetNextFile, HandleGetNextFileForEmptyState);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ default:
+ MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
+ TransitToErrorState();
+ break;
+ }
+ } catch (const std::exception& e) {
+ MaybeIssues = TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
+ TransitToErrorState();
+ }
+ }
+
+ void HandleGetNextFileForEmptyState(TEvPrivatePrivate::TEvGetNextFile::TPtr& ev) {
+ LOG_D("TS3FileQueueActor", "HandleGetNextFileForEmptyState Giving away rest of Objects");
+ SendObjects(ev->Sender, ev->Get()->RequestedAmount);
+ }
+
+ STATEFN(AnErrorOccurredState) {
+ try {
+ switch (const auto etype = ev->GetTypeRewrite()) {
+ hFunc(TEvPrivatePrivate::TEvGetNextFile, HandleGetNextFileForErrorState);
+ cFunc(TEvents::TSystem::Poison, PassAway);
+ default:
+ MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}};
+ break;
+ }
+ } catch (const std::exception& e) {
+ MaybeIssues = TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}};
+ }
+ }
+
+ void HandleGetNextFileForErrorState(TEvPrivatePrivate::TEvGetNextFile::TPtr& ev) {
+ LOG_D(
+ "TS3FileQueueActor",
+ "HandleGetNextFileForErrorState Giving away rest of Objects");
+ Send(ev->Sender, std::make_unique<TEvPrivate::TEvObjectPathReadError>(*MaybeIssues));
+ }
+
+ void PassAway() override {
+ if (!MaybeIssues.Defined()) {
+ for (auto& [requestorId, size]: RequestQueue) {
+ SendObjects(requestorId, size);
+ }
+ } else {
+ for (auto& [requestorId, _]: RequestQueue) {
+ Send(
+ requestorId,
+ std::make_unique<TEvPrivate::TEvObjectPathReadError>(*MaybeIssues));
+ }
+ }
+
+ RequestQueue.clear();
+ Objects.clear();
+ Directories.clear();
+ TBase::PassAway();
+ }
+
+private:
+ void SendObjects(const TActorId& recipient, size_t amount) {
+ Y_ENSURE(!MaybeIssues.Defined());
+ size_t correctedAmount = std::min(amount, Objects.size());
+ std::vector<TObjectPath> result;
+ if (correctedAmount != 0) {
+ result.reserve(correctedAmount);
+ for (size_t i = 0; i < correctedAmount; ++i) {
+ result.push_back(Objects.back());
+ Objects.pop_back();
+ }
+ }
+
+ LOG_T(
+ "TS3FileQueueActor",
+ "SendObjects amount: " << amount << " correctedAmount: " << correctedAmount
+ << " result size: " << result.size());
+
+ Send(
+ recipient,
+ std::make_unique<TEvPrivate::TEvObjectPathBatch>(
+ std::move(result), HasNoMoreItems()));
+ }
+ bool HasNoMoreItems() const {
+ return !(MaybeLister.Defined() && (*MaybeLister)->HasNext()) &&
+ Directories.empty() && Objects.empty();
+ }
+
+ bool TryPreFetch () {
+ if (Objects.size() < PrefetchSize) {
+ return TryFetch();
+ }
+ return false;
+ }
+ bool TryFetch() {
+ if (FetchingInProgress()) {
+ LOG_D("TS3FileQueueActor", "TryFetch fetching already in progress");
+ return true;
+ }
+
+ if (MaybeLister.Defined() && (*MaybeLister)->HasNext()) {
+ LOG_D("TS3FileQueueActor", "TryFetch fetching from current lister");
+ Fetch();
+ return true;
+ }
+
+ if (!Directories.empty()) {
+ LOG_D("TS3FileQueueActor", "TryFetch fetching from new lister");
+
+ auto [path, size, pathIndex] = Directories.back();
+ Directories.pop_back();
+ CurrentDirectoryPathIndex = pathIndex;
+ MaybeLister = NS3Lister::MakeS3Lister(
+ Gateway,
+ NS3Lister::TListingRequest{
+ Url,
+ Token,
+ PatternVariant == ES3PatternVariant::PathPattern
+ ? Pattern
+ : TStringBuilder{} << path << Pattern,
+ PatternType,
+ path},
+ Nothing(),
+ false);
+ Fetch();
+ return true;
+ }
+
+ LOG_D("TS3FileQueueActor", "TryFetch couldn't start fetching");
+ MaybeLister = Nothing();
+ TransitToNoMoreDirectoriesToListState();
+ return false;
+ }
+ void Fetch() {
+ Y_ENSURE(!ListingFuture.Defined());
+ Y_ENSURE(MaybeLister.Defined());
+ NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem();
+ ListingFuture =
+ (*MaybeLister)
+ ->Next()
+ .Subscribe([actorSystem, selfId = SelfId()](
+ const NThreading::TFuture<NS3Lister::TListResult>& future) {
+ actorSystem->Send(
+ selfId,
+ new TEvPrivatePrivate::TEvNextListingChunkReceived(
+ future.GetValue()));
+ });
+ }
+
+private:
+ const TTxId TxId;
+
+ std::vector<TObjectPath> Objects;
+ std::vector<TObjectPath> Directories;
+
+ size_t PrefetchSize;
+ ui64 FileSizeLimit;
+ TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing();
+ TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture;
+ size_t CurrentDirectoryPathIndex = 0;
+ std::deque<std::pair<TActorId, size_t>> RequestQueue;
+ TMaybe<TIssues> MaybeIssues;
+
+ const IHTTPGateway::TPtr Gateway;
+ const TString Url;
+ const TString Token;
+ const TString Pattern;
+ const ES3PatternVariant PatternVariant;
+ const ES3PatternType PatternType;
+};
+
class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput {
public:
TS3ReadActor(ui64 inputIndex,
@@ -242,6 +650,8 @@ public:
const THolderFactory& holderFactory,
const TString& url,
const TString& token,
+ const TString& pattern,
+ ES3PatternVariant patternVariant,
TPathList&& paths,
bool addPathIndex,
ui64 startPathIndex,
@@ -250,8 +660,9 @@ public:
const IRetryPolicy<long>::TPtr& retryPolicy,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
- ::NMonitoring::TDynamicCounterPtr taskCounters
- ) : ReadActorFactoryCfg(readActorFactoryCfg)
+ ::NMonitoring::TDynamicCounterPtr taskCounters,
+ ui64 fileSizeLimit)
+ : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
, InputIndex(inputIndex)
@@ -261,13 +672,15 @@ public:
, ActorSystem(TActivationContext::ActorSystem())
, Url(url)
, Token(token)
+ , Pattern(pattern)
+ , PatternVariant(patternVariant)
, Paths(std::move(paths))
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
, SizeLimit(sizeLimit)
, Counters(counters)
, TaskCounters(taskCounters)
- {
+ , FileSizeLimit(fileSizeLimit) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -283,14 +696,23 @@ public:
void Bootstrap() {
LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex);
+ FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{
+ TxId,
+ std::move(Paths),
+ ReadActorFactoryCfg.MaxInflight * 2,
+ FileSizeLimit,
+ Gateway,
+ Url,
+ Token,
+ Pattern,
+ PatternVariant,
+ ES3PatternType::Wildcard});
+ SendPathRequest();
Become(&TS3ReadActor::StateFunc);
- while (TryStartDownload()) {
-
- }
}
bool TryStartDownload() {
- if (CurrentPathIndex >= Paths.size()) {
+ if (ObjectPathCache.empty()) {
// no path is pending
return false;
}
@@ -303,19 +725,37 @@ public:
return false;
}
- StartDownload(CurrentPathIndex++);
+ StartDownload();
return true;
}
- void StartDownload(size_t index) {
+ void StartDownload() {
DownloadInflight++;
- const TPath& path = Paths[index];
- auto url = Url + path.Path;
+ const auto& [path, size, index] = ReadPathFromCache();
+ auto url = Url + path;
auto id = index + StartPathIndex;
const TString requestId = CreateGuidAsString();
LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]");
- Gateway->Download(url, MakeHeaders(Token, requestId), 0U, std::min(path.Size, SizeLimit),
- std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id), {}, RetryPolicy);
+ Gateway->Download(url, MakeHeaders(Token, requestId), 0U, std::min(size, SizeLimit),
+ std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id, path), {}, RetryPolicy);
+ }
+
+ TObjectPath ReadPathFromCache() {
+ Y_ENSURE(!ObjectPathCache.empty());
+ auto object = ObjectPathCache.back();
+ ObjectPathCache.pop_back();
+ if (ObjectPathCache.empty() && !IsObjectQueueEmpty) {
+ SendPathRequest();
+ }
+ return object;
+ }
+ void SendPathRequest() {
+ Y_ENSURE(!IsWaitingObjectQueueResponse);
+ Send(
+ FileQueueActor,
+ std::make_unique<TS3FileQueueActor::TEvPrivatePrivate::TEvGetNextFile>(
+ ReadActorFactoryCfg.MaxInflight));
+ IsWaitingObjectQueueResponse = true;
}
static constexpr char ActorName[] = "S3_READ_ACTOR";
@@ -333,15 +773,35 @@ private:
STRICT_STFUNC(StateFunc,
hFunc(TEvPrivate::TEvReadResult, Handle);
hFunc(TEvPrivate::TEvReadError, Handle);
+ hFunc(TEvPrivate::TEvObjectPathBatch, HandleObjectPathBatch);
+ hFunc(TEvPrivate::TEvObjectPathReadError, HandleObjectPathReadError);
)
- static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd) {
+ void HandleObjectPathBatch(TEvPrivate::TEvObjectPathBatch::TPtr& objectPathBatch) {
+ Y_ENSURE(IsWaitingObjectQueueResponse);
+ IsWaitingObjectQueueResponse = false;
+ ListedFiles += objectPathBatch->Get()->ObjectPaths.size();
+ IsObjectQueueEmpty = objectPathBatch->Get()->NoMoreFiles;
+ ObjectPathCache.insert(
+ ObjectPathCache.end(),
+ std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()),
+ std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end()));
+ while (TryStartDownload()) {}
+ }
+ void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) {
+ IsObjectQueueEmpty = true;
+ LOG_E("TS3ReadActor", "Error while object listing, details: TEvObjectPathReadError: " << result->Get()->Issues.ToOneLineString());
+ auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", TIssues{result->Get()->Issues});
+ Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
+ }
+
+ static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) {
switch (result.index()) {
case 0U:
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)), requestId, pathInd)));
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)), requestId, pathInd, path)));
return;
case 1U:
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)), requestId, pathInd)));
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)), requestId, pathInd, path)));
return;
default:
break;
@@ -381,18 +841,21 @@ private:
} while (!Blocks.empty() && freeSpace > 0LL);
}
- if (Blocks.empty() && IsDoneCounter == Paths.size()) {
+ if (LastFileWasProcessed()) {
finished = true;
ContainerCache.Clear();
}
return total;
}
+ bool LastFileWasProcessed() const {
+ return Blocks.empty() && (ListedFiles == CompletedFiles) && IsObjectQueueEmpty;
+ }
void Handle(TEvPrivate::TEvReadResult::TPtr& result) {
- ++IsDoneCounter;
+ ++CompletedFiles;
const auto id = result->Get()->PathIndex;
- const auto path = Paths[id - StartPathIndex].Path;
+ const auto path = result->Get()->Path;
const auto httpCode = result->Get()->Result.HttpResponseCode;
const auto requestId = result->Get()->RequestId;
IngressBytes += result->Get()->Result.size();
@@ -424,10 +887,10 @@ private:
}
void Handle(TEvPrivate::TEvReadError::TPtr& result) {
- ++IsDoneCounter;
+ ++CompletedFiles;
auto id = result->Get()->PathIndex;
const auto requestId = result->Get()->RequestId;
- const auto path = Paths[id - StartPathIndex].Path;
+ const auto path = result->Get()->Path;
LOG_W("TS3ReadActor", "Error while reading file " << path << ", details: ID: " << id << ", TEvReadError: " << result->Get()->Error.ToOneLineString() << ", request id: [" << requestId << "]");
auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error});
Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR));
@@ -449,6 +912,7 @@ private:
QueueTotalDataSize = 0;
ContainerCache.Clear();
+ Send(FileQueueActor, new NActors::TEvents::TEvPoison());
TActorBootstrapped<TS3ReadActor>::PassAway();
}
@@ -461,8 +925,6 @@ private:
}
private:
- size_t IsDoneCounter = 0U;
-
const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
const THolderFactory& HolderFactory;
@@ -477,7 +939,15 @@ private:
const TString Url;
const TString Token;
- const TPathList Paths;
+ const TString Pattern;
+ const ES3PatternVariant PatternVariant;
+ TPathList Paths;
+ std::vector<TObjectPath> ObjectPathCache;
+ bool IsObjectQueueEmpty = false;
+ bool IsWaitingObjectQueueResponse = false;
+ size_t ListedFiles = 0;
+ size_t CompletedFiles = 0;
+ NActors::TActorId FileQueueActor;
const bool AddPathIndex;
const ui64 StartPathIndex;
const ui64 SizeLimit;
@@ -495,9 +965,9 @@ private:
::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit;
::NMonitoring::TDynamicCounterPtr Counters;
::NMonitoring::TDynamicCounterPtr TaskCounters;
- ui64 CurrentPathIndex = 0;
ui64 QueueTotalDataSize = 0;
ui64 DownloadInflight = 0;
+ const ui64 FileSizeLimit;
};
struct TReadSpec {
@@ -1184,6 +1654,8 @@ public:
const THolderFactory& holderFactory,
const TString& url,
const TString& token,
+ const TString& pattern,
+ ES3PatternVariant patternVariant,
TPathList&& paths,
bool addPathIndex,
ui64 startPathIndex,
@@ -1194,7 +1666,8 @@ public:
IArrowReader::TPtr arrowReader,
const TS3ReadActorFactoryConfig& readActorFactoryCfg,
::NMonitoring::TDynamicCounterPtr counters,
- ::NMonitoring::TDynamicCounterPtr taskCounters
+ ::NMonitoring::TDynamicCounterPtr taskCounters,
+ ui64 fileSizeLimit
) : ReadActorFactoryCfg(readActorFactoryCfg)
, Gateway(std::move(gateway))
, HolderFactory(holderFactory)
@@ -1204,16 +1677,17 @@ public:
, RetryPolicy(retryPolicy)
, Url(url)
, Token(token)
+ , Pattern(pattern)
+ , PatternVariant(patternVariant)
, Paths(std::move(paths))
, AddPathIndex(addPathIndex)
, StartPathIndex(startPathIndex)
, ReadSpec(readSpec)
- , Count(Paths.size())
, MaxBlocksInFly(maxBlocksInFly)
- , ArrowReader(arrowReader)
- , Counters(counters)
- , TaskCounters(taskCounters)
- {
+ , ArrowReader(std::move(arrowReader))
+ , Counters(std::move(counters))
+ , TaskCounters(std::move(taskCounters))
+ , FileSizeLimit(fileSizeLimit) {
if (Counters) {
QueueDataSize = Counters->GetCounter("QueueDataSize");
QueueDataLimit = Counters->GetCounter("QueueDataLimit");
@@ -1235,17 +1709,27 @@ public:
void Bootstrap() {
LOG_D("TS3StreamReadActor", "Bootstrap");
+ FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{
+ TxId,
+ std::move(Paths),
+ ReadActorFactoryCfg.MaxInflight * 2,
+ FileSizeLimit,
+ Gateway,
+ Url,
+ Token,
+ Pattern,
+ PatternVariant,
+ ES3PatternType::Wildcard});
+ SendPathRequest();
Become(&TS3StreamReadActor::StateFunc);
- while (TryRegisterCoro()) {
-
- }
}
bool TryRegisterCoro() {
- if (CurrentPathIndex >= Paths.size()) {
+ if (ObjectPathCache.empty()) {
// no path is pending
return false;
}
+
if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) {
// too large data inflight
return false;
@@ -1254,23 +1738,70 @@ public:
// too large download inflight
return false;
}
- RegisterCoro(CurrentPathIndex++);
+ RegisterCoro();
return true;
}
+ // Starts one read coroutine for the next object path taken from ObjectPathCache.
+ // Increments DownloadInflight, creates the shared TRetryStuff for the object, records it in
+ // RetryStuffForFile under the global path index (objectPath.PathIndex + StartPathIndex),
+ // and registers a TS3ReadCoroActor in this actor's mailbox; its id is kept in CoroActors.
+ // The caller must make sure ObjectPathCache is not empty (ReadPathFromCache enforces it).
- void RegisterCoro(size_t index) {
+ void RegisterCoro() {
DownloadInflight++;
- const TPath& path = Paths[index];
+ const auto& objectPath = ReadPathFromCache();
const TString requestId = CreateGuidAsString();
- auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + path.Path, MakeHeaders(Token, requestId), path.Size, TxId, requestId, RetryPolicy);
- auto pathIndex = index + StartPathIndex;
+ auto stuff = std::make_shared<TRetryStuff>(
+ Gateway,
+ Url + objectPath.Path,
+ MakeHeaders(Token, requestId),
+ objectPath.Size,
+ TxId,
+ requestId,
+ RetryPolicy);
+ auto pathIndex = objectPath.PathIndex + StartPathIndex;
RetryStuffForFile.emplace(pathIndex, stuff);
if (TaskCounters) {
HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream());
}
+ LOG_D(
+ "TS3StreamReadActor",
+ "RegisterCoro with path " << objectPath.Path << " with pathIndex "
+ << pathIndex);
::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter;
- auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, path.Path, Url, MaxBlocksInFly, ArrowReader, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps);
- CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->IsDownloadNeeded(), impl->HttpInflightSize).release()));
+ auto impl = MakeHolder<TS3ReadCoroImpl>(
+ InputIndex,
+ TxId,
+ ComputeActorId,
+ stuff,
+ ReadSpec,
+ pathIndex,
+ objectPath.Path,
+ Url,
+ MaxBlocksInFly,
+ ArrowReader,
+ ReadActorFactoryCfg,
+ DeferredQueueSize,
+ HttpInflightSize,
+ HttpDataRps);
+ // Both values are read from impl before the call below, which also receives
+ // std::move(impl) in the same argument list.
+ auto isDownloadNeeded = impl->IsDownloadNeeded();
+ const auto& httpInflightSize = impl->HttpInflightSize;
+ CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor(
+ std::move(impl), std::move(stuff), pathIndex, isDownloadNeeded, httpInflightSize)));
+ }
+
+    // Pops the most recently cached object path and returns it by value.
+    // When this drains the cache and the file queue has not reported completion yet,
+    // the next batch is requested right away.
+    TObjectPath ReadPathFromCache() {
+        Y_ENSURE(!ObjectPathCache.empty());
+        auto nextPath = ObjectPathCache.back();
+        ObjectPathCache.pop_back();
+
+        const bool cacheDrained = ObjectPathCache.empty();
+        if (cacheDrained && !IsObjectQueueEmpty) {
+            SendPathRequest();
+        }
+        return nextPath;
+    }
+    // Asks the file queue actor for the next batch of paths (ReadActorFactoryCfg.MaxInflight is
+    // passed as the request argument). At most one request may be outstanding:
+    // IsWaitingObjectQueueResponse guards against issuing a second one.
+    void SendPathRequest() {
+        Y_ENSURE(!IsWaitingObjectQueueResponse);
+        LOG_D("TS3StreamReadActor", "SendPathRequest " << ReadActorFactoryCfg.MaxInflight);
+        auto request = std::make_unique<TS3FileQueueActor::TEvPrivatePrivate::TEvGetNextFile>(
+            ReadActorFactoryCfg.MaxInflight);
+        Send(FileQueueActor, std::move(request));
+        IsWaitingObjectQueueResponse = true;
}
static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR";
@@ -1381,7 +1912,7 @@ private:
MaybeContinue();
- finished = Blocks.empty() && !Count;
+ finished = LastFileWasProcessed();
if (finished) {
ContainerCache.Clear();
ArrowTupleContainerCache.Clear();
@@ -1416,6 +1947,7 @@ private:
for (const auto actorId : CoroActors) {
Send(actorId, new NActors::TEvents::TEvPoison());
}
+ Send(FileQueueActor, new NActors::TEvents::TEvPoison());
ContainerCache.Clear();
ArrowTupleContainerCache.Clear();
@@ -1468,8 +2000,33 @@ private:
hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
hFunc(TEvPrivate::TEvNextRecordBatch, HandleNextRecordBatch);
hFunc(TEvPrivate::TEvFileFinished, HandleFileFinished);
+ hFunc(TEvPrivate::TEvObjectPathBatch, HandleObjectPathBatch);
+ hFunc(TEvPrivate::TEvObjectPathReadError, HandleObjectPathReadError);
)
+    // Handles a batch of object paths sent by the file queue actor in reply to SendPathRequest().
+    //
+    // Clears the "waiting for response" flag, accounts the received paths in ListedFiles,
+    // remembers whether the queue reported that no more files will follow, appends the paths to
+    // ObjectPathCache and starts as many read coroutines as the inflight limits allow.
+    //
+    // If the batch brings no new work and everything listed so far is already processed
+    // (for example the listing turned out to be empty), no TEvFileFinished will ever arrive to
+    // report completion, so the compute actor is notified from here.
+    void HandleObjectPathBatch(TEvPrivate::TEvObjectPathBatch::TPtr& objectPathBatch) {
+        Y_ENSURE(IsWaitingObjectQueueResponse);
+        IsWaitingObjectQueueResponse = false;
+
+        auto& batch = *objectPathBatch->Get();
+        ListedFiles += batch.ObjectPaths.size();
+        IsObjectQueueEmpty = batch.NoMoreFiles;
+
+        ObjectPathCache.insert(
+            ObjectPathCache.end(),
+            std::make_move_iterator(batch.ObjectPaths.begin()),
+            std::make_move_iterator(batch.ObjectPaths.end()));
+        // Regular progress message: debug level, same as the other messages of this actor.
+        LOG_D(
+            "TS3StreamReadActor",
+            "HandleObjectPathBatch " << ObjectPathCache.size() << " IsObjectQueueEmpty "
+                                     << IsObjectQueueEmpty << " MaxInflight " << ReadActorFactoryCfg.MaxInflight);
+        while (TryRegisterCoro()) {}
+
+        if (LastFileWasProcessed()) {
+            // Nothing is in flight and nothing more will be listed: wake the compute actor so
+            // that it can observe the finished state.
+            Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
+        }
+    }
+    // Handles a listing failure reported by the file queue actor: marks the queue as exhausted,
+    // logs the issues and forwards them to the compute actor as an EXTERNAL_ERROR.
+    void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) {
+        IsObjectQueueEmpty = true;
+
+        const auto& listingIssues = result->Get()->Issues;
+        LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << listingIssues.ToOneLineString());
+
+        auto wrappedIssues = NS3Util::AddParentIssue(
+            TStringBuilder{} << "Error while object listing",
+            TIssues{listingIssues});
+        auto* errorEvent = new TEvAsyncInputError(
+            InputIndex, std::move(wrappedIssues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR);
+        Send(ComputeActorId, errorEvent);
+    }
+
// Executes the functor carried by a retry event.
void HandleRetry(TEvPrivate::TEvRetryEventFunc::TPtr& retry) {
    retry->Get()->Functor();
}
@@ -1514,14 +2071,13 @@ private:
CoroActors.erase(ev->Sender);
IngressBytes += ev->Get()->IngressDelta;
RetryStuffForFile.erase(ev->Get()->PathIndex);
- Y_VERIFY(Count);
- --Count;
if (TaskCounters) {
HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream());
}
DownloadInflight--;
- if (CurrentPathIndex < Paths.size()) {
+ CompletedFiles++;
+ if (!ObjectPathCache.empty()) {
TryRegisterCoro();
} else {
/*
@@ -1529,12 +2085,16 @@ private:
then we need to pass the information to Compute Actor that
the download of all data is finished in this place
*/
- if (Blocks.empty() && Count == 0) {
+ if (LastFileWasProcessed()) {
Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex));
}
}
}
+    // True when no ready blocks are pending, every listed file has been completed and the
+    // file queue has reported that no more files will follow.
+    bool LastFileWasProcessed() const {
+        if (!Blocks.empty()) {
+            return false;
+        }
+        if (ListedFiles != CompletedFiles) {
+            return false;
+        }
+        return IsObjectQueueEmpty;
+    }
+
const TS3ReadActorFactoryConfig ReadActorFactoryCfg;
const IHTTPGateway::TPtr Gateway;
THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile;
@@ -1550,16 +2110,21 @@ private:
const TString Url;
const TString Token;
- const TPathList Paths;
+ const TString Pattern;
+ const ES3PatternVariant PatternVariant;
+ TPathList Paths;
+ std::vector<TObjectPath> ObjectPathCache;
+ bool IsObjectQueueEmpty = false;
+ bool IsWaitingObjectQueueResponse = false;
const bool AddPathIndex;
const ui64 StartPathIndex;
+ size_t ListedFiles = 0;
+ size_t CompletedFiles = 0;
const TReadSpec::TPtr ReadSpec;
std::deque<TReadyBlock> Blocks;
- ui32 Count;
const std::size_t MaxBlocksInFly;
IArrowReader::TPtr ArrowReader;
ui64 IngressBytes = 0;
- size_t CurrentPathIndex = 0;
mutable TInstant LastMemoryReport = TInstant::Now();
ui64 QueueTotalDataSize = 0;
::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize;
@@ -1577,7 +2142,9 @@ private:
::NMonitoring::TDynamicCounterPtr TaskCounters;
ui64 DownloadInflight = 0;
std::set<NActors::TActorId> CoroActors;
+ NActors::TActorId FileQueueActor;
bool Paused = false;
+ const ui64 FileSizeLimit;
};
using namespace NKikimr::NMiniKQL;
@@ -1741,6 +2308,38 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
const auto& settings = params.GetSettings();
+ TString pathPattern = "*";
+ ES3PatternVariant pathPatternVariant = ES3PatternVariant::FilePattern;
+ auto hasDirectories = std::find_if(paths.begin(), paths.end(), [](const TPath& a) {
+ return a.IsDirectory;
+ }) != paths.end();
+ if (hasDirectories) {
+ auto pathPatternValue = settings.find("pathpattern");
+ if (pathPatternValue == settings.cend()) {
+ ythrow yexception() << "'pathpattern' must be configured for directory listing";
+ }
+ pathPattern = pathPatternValue->second;
+
+ auto pathPatternVariantValue = settings.find("pathpatternvariant");
+ if (pathPatternVariantValue == settings.cend()) {
+ ythrow yexception()
+ << "'pathpatternvariant' must be configured for directory listing";
+ }
+ auto maybePathPatternVariant =
+ NS3Lister::DeserializePatternVariant(pathPatternVariantValue->second);
+ if (maybePathPatternVariant.Empty()) {
+ ythrow yexception()
+ << "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second;
+ }
+ pathPatternVariant = *maybePathPatternVariant;
+ }
+ ui64 fileSizeLimit = cfg.FileSizeLimit;
+ if (params.HasFormat()) {
+ if (auto it = cfg.FormatSizeLimits.find(params.GetFormat()); it != cfg.FormatSizeLimits.end()) {
+ fileSizeLimit = it->second;
+ }
+ }
+
bool addPathIndex = false;
if (auto it = settings.find("addPathIndex"); it != settings.cend()) {
addPathIndex = FromString<bool>(it->second);
@@ -1840,9 +2439,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
std::size_t maxBlocksInFly = 2;
if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it)
maxBlocksInFly = FromString<ui64>(it->second);
- const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
+ const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy,
- maxBlocksInFly, arrowReader, cfg, counters, taskCounters);
+ maxBlocksInFly, arrowReader, cfg, counters, taskCounters, fileSizeLimit);
return {actor, actor};
} else {
@@ -1850,9 +2449,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (const auto it = settings.find("sizeLimit"); settings.cend() != it)
sizeLimit = FromString<ui64>(it->second);
- const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken,
+ const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy,
- cfg, counters, taskCounters);
+ cfg, counters, taskCounters, fileSizeLimit);
return {actor, actor};
}
}
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
index 60b1c0d4ff..00f5434248 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h
@@ -17,6 +17,8 @@ struct TS3ReadActorFactoryConfig {
ui64 RowsInBatch = 1000;
ui64 MaxInflight = 20;
ui64 DataInflight = 200_MB;
+ ui64 FileSizeLimit = 2_GB;
+ std::unordered_map<TString, ui64> FormatSizeLimits;
};
void RegisterS3ReadActorFactory(
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..7188435f83
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-object_listers)
+target_link_libraries(providers-s3-object_listers PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-re2
+ cpp-xml-document
+ providers-common-http_gateway
+ library-yql-utils
+ yql-utils-threading
+)
+target_sources(providers-s3-object_listers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..2b11109c92
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-object_listers)
+target_link_libraries(providers-s3-object_listers PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-re2
+ cpp-xml-document
+ providers-common-http_gateway
+ library-yql-utils
+ yql-utils-threading
+)
+target_sources(providers-s3-object_listers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt
new file mode 100644
index 0000000000..2b11109c92
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-object_listers)
+target_link_libraries(providers-s3-object_listers PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ contrib-libs-re2
+ cpp-xml-document
+ providers-common-http_gateway
+ library-yql-utils
+ yql-utils-threading
+)
+target_sources(providers-s3-object_listers PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
+)
diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.txt
new file mode 100644
index 0000000000..5bb4faffb4
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h
new file mode 100644
index 0000000000..2fd32c50f3
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h
@@ -0,0 +1,102 @@
+#pragma once
+
+#include "yql_s3_list.h"
+
+#include <library/cpp/threading/future/future.h>
+
+#include <memory>
+#include <variant>
+#include <vector>
+
+namespace NYql {
+
+// Value returned by aggregation handlers: Proceed lets the aggregation request further
+// listing results, Stop makes it finish with the state accumulated so far.
+enum class EAggregationAction : ui8 { Proceed = 0, Stop = 1 };
+
+// Handler that folds one increment into the accumulated state and decides whether to go on.
+template<typename State, typename Increment>
+using TAggregationOperation =
+ std::function<EAggregationAction(State& state, Increment&& increment)>;
+
+// Handler invoked with the accumulated state and the caught std::exception when obtaining or
+// folding an increment throws; it also decides whether to go on.
+template<typename State>
+using TAggregationErrorHandler =
+ std::function<EAggregationAction(State& state, const std::exception& exception)>;
+
+namespace NDetails {
+
+// Everything one aggregation run needs, bundled so that it can be shared (through
+// std::shared_ptr) between the chained future callbacks.
+template<typename State>
+struct TAggregationState {
+ // Accumulated value; moved into Promise when the aggregation finishes.
+ State Acc;
+ // Folds each listing result into Acc.
+ TAggregationOperation<State, NS3Lister::TListResult> IncrementHandler;
+ // Called when reading or folding a listing result throws std::exception.
+ TAggregationErrorHandler<State> ErrorHandler;
+ // Source of listing results.
+ std::shared_ptr<NS3Lister::IS3Lister> Lister;
+ // Fulfilled with Acc once the aggregation stops.
+ NThreading::TPromise<State> Promise;
+
+ // Takes ownership of all arguments by moving them into the members.
+ TAggregationState(
+ State&& acc,
+ TAggregationOperation<State, NS3Lister::TListResult>&& incrementHandler,
+ TAggregationErrorHandler<State>&& errorHandler,
+ std::shared_ptr<NS3Lister::IS3Lister> lister,
+ NThreading::TPromise<State> promise)
+ : Acc(std::move(acc))
+ , IncrementHandler(std::move(incrementHandler))
+ , ErrorHandler(std::move(errorHandler))
+ , Lister(std::move(lister))
+ , Promise(std::move(promise)) { }
+};
+
+// One step of the aggregation: folds the finished listing future into the accumulated state
+// (or reports the std::exception to the error handler), then either subscribes to the next
+// listing result or completes the promise with the accumulated value.
+template<typename State>
+void SubscribeCallback(
+    const NThreading::TFuture<NS3Lister::TListResult>& strategyListingResult,
+    std::shared_ptr<TAggregationState<State>> state) {
+    EAggregationAction decision;
+
+    try {
+        NS3Lister::TListResult increment = strategyListingResult.GetValue();
+        decision = state->IncrementHandler(state->Acc, std::move(increment));
+    } catch (const std::exception& error) {
+        decision = state->ErrorHandler(state->Acc, error);
+    }
+
+    // HasNext() is consulted only when the handler asked to proceed.
+    const bool keepListing =
+        decision == EAggregationAction::Proceed && state->Lister->HasNext();
+    if (!keepListing) {
+        state->Promise.SetValue(std::move(state->Acc));
+        return;
+    }
+
+    // The future is obtained before `state` is moved into the continuation.
+    auto pendingPage = state->Lister->Next();
+    pendingPage.Subscribe(
+        [state = std::move(state)](
+            const NThreading::TFuture<NS3Lister::TListResult>& page) mutable {
+            SubscribeCallback(page, std::move(state));
+        });
+}
+
+} // namespace NDetails
+
+// Folds the results produced by `lister` into `initialState`, one listing result at a time.
+// `incrementHandler` receives every result; `errorHandler` receives std::exception thrown while
+// reading or folding a result. Either may return EAggregationAction::Stop to finish early.
+// Returns a future that is fulfilled with the accumulated state once listing stops or the
+// lister has nothing more to return.
+template<typename State>
+NThreading::TFuture<State> AccumulateWithEarlyStop(
+    std::shared_ptr<NS3Lister::IS3Lister> lister,
+    typename std::remove_reference<State>::type&& initialState,
+    TAggregationOperation<State, NS3Lister::TListResult>&& incrementHandler,
+    TAggregationErrorHandler<State>&& errorHandler) {
+    auto aggregation = std::make_shared<NDetails::TAggregationState<State>>(
+        std::move(initialState),
+        std::move(incrementHandler),
+        std::move(errorHandler),
+        std::move(lister),
+        NThreading::NewPromise<State>());
+
+    // Keep a handle to the promise: `aggregation` is moved into the continuation below.
+    auto resultPromise = aggregation->Promise;
+
+    if (!aggregation->Lister->HasNext()) {
+        aggregation->Promise.SetValue(std::move(aggregation->Acc));
+        return resultPromise.GetFuture();
+    }
+
+    auto firstPage = aggregation->Lister->Next();
+    firstPage.Subscribe(
+        [state = std::move(aggregation)](
+            const NThreading::TFuture<NS3Lister::TListResult>& page) mutable {
+            SubscribeCallback(page, std::move(state));
+        });
+
+    return resultPromise.GetFuture();
+}
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
new file mode 100644
index 0000000000..f2437741f5
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp
@@ -0,0 +1,443 @@
+#include "yql_s3_list.h"
+#include "yql_s3_path.h"
+
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
+#include <ydb/library/yql/providers/s3/common/util.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/url_builder.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <contrib/libs/re2/re2/re2.h>
+
+#ifdef THROW
+#undef THROW
+#endif
+#include <library/cpp/threading/future/async_semaphore.h>
+#include <library/cpp/xml/document/xml-document.h>
+#include <util/folder/iterator.h>
+#include <util/generic/guid.h>
+#include <util/string/builder.h>
+
+#include <deque>
+#include <utility>
+
+namespace NYql::NS3Lister {
+
+// Maps a pattern variant to its textual form ("path_pattern" / "file_pattern").
+// Returns Nothing() for a value that is not one of the known enumerators.
+TMaybe<TString> SerializePatternVariant(ES3PatternVariant variant) {
+    if (variant == ES3PatternVariant::PathPattern) {
+        return "path_pattern";
+    }
+    if (variant == ES3PatternVariant::FilePattern) {
+        return "file_pattern";
+    }
+    return Nothing();
+}
+
+// Parses the textual form produced by SerializePatternVariant.
+// Returns Nothing() when the string is neither "path_pattern" nor "file_pattern".
+TMaybe<ES3PatternVariant> DeserializePatternVariant(const TString& variant) {
+    TMaybe<ES3PatternVariant> parsed;
+    if (variant == "path_pattern") {
+        parsed = ES3PatternVariant::PathPattern;
+    } else if (variant == "file_pattern") {
+        parsed = ES3PatternVariant::FilePattern;
+    }
+    return parsed;
+}
+
+namespace {
+
+using namespace NThreading;
+
+using TResultFilter =
+ std::function<bool(const TString& path, std::vector<TString>& matchedGlobs)>;
+
+// Builds a filter that fully matches a path against the given RE2 regular expression and, on
+// success, returns the values of all capture groups through `matchedGlobs`.
+// Fails (YQL_ENSURE) when the expression does not compile.
+TResultFilter MakeFilterRegexp(const TString& regex) {
+ auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
+ YQL_ENSURE(re->ok());
+
+ const size_t numGroups = re->NumberOfCapturingGroups();
+ YQL_CLOG(DEBUG, ProviderS3)
+ << "Got regex: '" << regex << "' with " << numGroups << " capture groups ";
+
+ // Three parallel arrays, one slot per capture group: the string that receives the captured
+ // text, the RE2 argument wrapping that string, and the pointer to that argument (FullMatchN
+ // takes an array of argument pointers). They are held by shared_ptr so that the addresses
+ // stay valid for every copy of the returned lambda.
+ auto groups = std::make_shared<std::vector<std::string>>(numGroups);
+ auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
+ auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups);
+
+ for (size_t i = 0; i < numGroups; ++i) {
+ (*reArgs)[i] = &(*groups)[i];
+ (*reArgsPtr)[i] = &(*reArgs)[i];
+ }
+
+ // NOTE(review): all copies of the returned filter write into the same `groups` buffer, so
+ // concurrent calls would interfere with each other — confirm callers never run it in parallel.
+ return [groups,
+ reArgs,
+ reArgsPtr,
+ re](const TString& path, std::vector<TString>& matchedGlobs) {
+ matchedGlobs.clear();
+ bool matched =
+ re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size());
+ if (matched) {
+ // Copy the captured groups out of the shared buffer.
+ matchedGlobs.reserve(groups->size());
+ for (auto& group : *groups) {
+ matchedGlobs.push_back(ToString(group));
+ }
+ }
+ return matched;
+ };
+}
+
+// Builds a filter for a wildcard pattern. A pattern without wildcards is compared for plain
+// equality; otherwise the pattern is translated to a regular expression and delegated to
+// MakeFilterRegexp.
+TResultFilter MakeFilterWildcard(const TString& pattern) {
+    auto literalPrefix = pattern.substr(0, NS3::GetFirstWildcardPos(pattern));
+    const bool hasWildcards = !(literalPrefix == pattern);
+
+    if (hasWildcards) {
+        const auto translated = NS3::RegexFromWildcards(pattern);
+        YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << literalPrefix << "', regex: '"
+                                    << translated << "' from original pattern '" << pattern << "'";
+        return MakeFilterRegexp(translated);
+    }
+
+    // just match for equality
+    return [pattern](const TString& path, std::vector<TString>& matchedGlobs) {
+        matchedGlobs.clear();
+        return path == pattern;
+    };
+}
+
+// Selects the filter implementation for the requested pattern type.
+// Throws yexception for a pattern type that is not Wildcard or Regexp.
+TResultFilter MakeFilter(const TString& pattern, ES3PatternType patternType) {
+    if (patternType == ES3PatternType::Wildcard) {
+        return MakeFilterWildcard(pattern);
+    }
+    if (patternType == ES3PatternType::Regexp) {
+        return MakeFilterRegexp(pattern);
+    }
+    ythrow yexception() << "Unknown 'patternType': " << int(patternType);
+}
+
+// Parsed form of a ListBucketResult XML document, as filled in by ParseListObjectV2Response.
+struct TS3ListObjectV2Response {
+ // Value of the IsTruncated element.
+ bool IsTruncated = false;
+ // Value of the MaxKeys element.
+ ui64 MaxKeys = 0U;
+ // Value of the KeyCount element.
+ ui64 KeyCount = 0U;
+ // Value of the NextContinuationToken element; empty when the element is absent.
+ TMaybe<TString> ContinuationToken;
+ // One entry per Contents element (Key and Size).
+ std::vector<TObjectListEntry> Contents;
+ // Values of CommonPrefixes/Prefix elements.
+ std::vector<TString> CommonPrefixes;
+};
+
+// Converts the XML answer of a list request into TS3ListObjectV2Response.
+// Throws yexception when the document root is an "Error" element (message and code are included
+// in the text) or anything other than "ListBucketResult". `requestId` is used only in messages.
+TS3ListObjectV2Response ParseListObjectV2Response(
+    const NXml::TDocument& xml, const TString& requestId = "UnspecifiedRequestId") {
+    const auto& root = xml.Root();
+
+    if (root.Name() == "Error") {
+        const auto& code = root.Node("Code", true).Value<TString>();
+        const auto& message = root.Node("Message", true).Value<TString>();
+        ythrow yexception() << message << ", error: code: " << code << ", request id: ["
+                            << requestId << "]";
+    }
+    if (root.Name() != "ListBucketResult") {
+        ythrow yexception() << "Unexpected response '" << root.Name()
+                            << "' on discovery, request id: [" << requestId << "]";
+    }
+
+    const NXml::TNamespacesForXPath nss(
+        1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
+
+    TS3ListObjectV2Response parsed;
+
+    // The continuation token is optional; it stays empty when the element is missing.
+    auto tokenNode = root.Node("s3:NextContinuationToken", true, nss);
+    if (!tokenNode.IsNull()) {
+        parsed.ContinuationToken = tokenNode.Value<TString>();
+    }
+    parsed.IsTruncated = root.Node("s3:IsTruncated", false, nss).Value<bool>();
+    parsed.MaxKeys = root.Node("s3:MaxKeys", false, nss).Value<size_t>();
+    parsed.KeyCount = root.Node("s3:KeyCount", false, nss).Value<size_t>();
+
+    const auto& prefixNodes = root.XPath("s3:CommonPrefixes/s3:Prefix", true, nss);
+    for (const auto& prefixNode : prefixNodes) {
+        parsed.CommonPrefixes.push_back(prefixNode.Value<TString>());
+    }
+
+    const auto& contentNodes = root.XPath("s3:Contents", true, nss);
+    for (const auto& contentNode : contentNodes) {
+        TObjectListEntry entry;
+        entry.Path = contentNode.Node("s3:Key", false, nss).Value<TString>();
+        entry.Size = contentNode.Node("s3:Size", false, nss).Value<ui64>();
+        parsed.Contents.push_back(std::move(entry));
+    }
+    return parsed;
+}
+
+// Lister for "file://" URLs: walks a local directory tree and returns every regular file whose
+// path (relative to the listed root) passes the request's pattern filter.
+// The whole listing is produced by a single Next() call.
+class TLocalS3Lister : public IS3Lister {
+public:
+    // Delimiters are not supported for local files; passing one fails the Y_ENSURE check.
+    TLocalS3Lister(const TListingRequest& listingRequest, const TMaybe<TString>& delimiter)
+        : ListingRequest(listingRequest) {
+        Y_ENSURE(!delimiter.Defined(), "delimiter is not supported for local files");
+        Filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType);
+    }
+
+    // Performs the listing synchronously and returns an already completed future.
+    // May be called only once. A path containing a ".." component is rejected: the future
+    // carries an exception and the file system is not touched.
+    // std::exception thrown while walking the directory is delivered through the future.
+    TFuture<TListResult> Next() override {
+        Y_ENSURE(IsFirst, "Should not be called more than once");
+        Y_ENSURE(ListingRequest.Url.substr(0, 7) == "file://");
+        IsFirst = false;
+        auto promise = NewPromise<TListResult>();
+        try {
+            auto fullPath = ListingRequest.Url.substr(7);
+            for (const auto& e : TPathSplit(fullPath)) {
+                if (e == "..") {
+                    promise.SetException(
+                        "Security violation: trying access parent directory in path");
+                    // Stop here: the promise is already completed, so continuing would both
+                    // traverse the forbidden path and fail on the second completion attempt.
+                    return promise.GetFuture();
+                }
+            }
+
+            auto output = TListEntries{};
+            for (const auto& entry : TDirIterator(fullPath)) {
+                if (entry.fts_type != FTS_F) {
+                    // only regular files are reported
+                    continue;
+                }
+
+                // Skip the listed root (URL without the 7-character "file://" prefix) so that
+                // the reported path is relative to it.
+                auto filename = TString(entry.fts_path + ListingRequest.Url.size() - 7);
+                TVector<TString> matches;
+                if (Filter(filename, matches)) {
+                    auto& object = output.Objects.emplace_back();
+                    object.Path = filename;
+                    object.Size = entry.fts_statp->st_size;
+                    object.MatchedGlobs.swap(matches);
+                }
+            }
+            promise.SetValue(std::move(output));
+        } catch (const std::exception&) {
+            promise.SetException(std::current_exception());
+        }
+        return promise.GetFuture();
+    }
+
+    // True until Next() has been called.
+    bool HasNext() override { return IsFirst; }
+
+private:
+    const TListingRequest ListingRequest;
+    TResultFilter Filter;
+    bool IsFirst = true;
+};
+
+// Lister that obtains object lists through the HTTP gateway using "list-type=2" requests.
+// Every Next() call submits one request. The context needed for the following request (if the
+// answer was truncated) is produced by the response handler and handed over through
+// NextRequestPromise / NextRequestCtx.
+class TS3Lister : public IS3Lister {
+public:
+ // Everything one listing request needs, plus the promises its response handler completes.
+ struct TListingContext {
+ // Filter
+ const TResultFilter Filter;
+ // Result processing
+ // Completed with the entries (or issues / exception) of this request.
+ NThreading::TPromise<TListResult> Promise;
+ // Completed with the context of the follow-up request, or Nothing() when there is none.
+ NThreading::TPromise<TMaybe<TListingContext>> NextRequestPromise;
+ const std::shared_ptr<TListEntries> Output;
+ // HTTP control
+ const IHTTPGateway::TWeakPtr GatewayWeak;
+ const IRetryPolicy<long>::TPtr RetryPolicy;
+ const TString RequestId;
+ const TListingRequest ListingRequest;
+ const TMaybe<TString> Delimiter;
+ const TMaybe<TString> ContinuationToken;
+ const ui64 MaxKeys;
+ };
+
+ // Prepares the context of the first request (no continuation token, a fresh request id) and
+ // stores it in NextRequestCtx as an already completed future. No request is sent here.
+ // "file://" URLs are rejected by Y_ENSURE.
+ TS3Lister(
+ IHTTPGateway::TPtr httpGateway,
+ const TListingRequest& listingRequest,
+ const TMaybe<TString>& delimiter,
+ size_t maxFilesPerQuery = 1000)
+ : MaxFilesPerQuery(maxFilesPerQuery) {
+ Y_ENSURE(
+ listingRequest.Url.substr(0, 7) != "file://",
+ "This lister does not support reading local files");
+
+ auto filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType);
+
+ auto ctx = TListingContext{
+ filter,
+ NewPromise<TListResult>(),
+ NewPromise<TMaybe<TListingContext>>(),
+ std::make_shared<TListEntries>(),
+ IHTTPGateway::TWeakPtr(httpGateway),
+ GetHTTPDefaultRetryPolicy(),
+ CreateGuidAsString(),
+ listingRequest,
+ delimiter,
+ Nothing(),
+ MaxFilesPerQuery};
+ YQL_CLOG(TRACE, ProviderS3)
+ << "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url
+ << "' with path prefix '" << ctx.ListingRequest.Prefix
+ << "' capture pattern '" << ctx.ListingRequest.Pattern << "' and delimiter '"
+ << ctx.Delimiter.GetOrElse("NO_DELIMITER") << "'";
+
+ auto promise = NewPromise<TMaybe<TListingContext>>();
+ promise.SetValue(TMaybe<TListingContext>(ctx));
+ NextRequestCtx = promise;
+ }
+
+ ~TS3Lister() override = default;
+
+private:
+ // Builds the request headers and URL from the context and passes the download to the
+ // gateway; the response is handled by OnDiscovery. Throws yexception when the gateway no
+ // longer exists. `ctx` is moved into the response callback.
+ static void SubmitRequestIntoGateway(TListingContext& ctx) {
+ IHTTPGateway::THeaders headers;
+ if (!ctx.ListingRequest.Token.empty()) {
+ headers.emplace_back("X-YaCloud-SubjectToken:" + ctx.ListingRequest.Token);
+ }
+ headers.emplace_back(TString{"X-Request-ID:"} + ctx.RequestId);
+
+ TUrlBuilder urlBuilder(ctx.ListingRequest.Url);
+ urlBuilder.AddUrlParam("list-type", "2")
+ .AddUrlParam("prefix", ctx.ListingRequest.Prefix)
+ .AddUrlParam("max-keys", TStringBuilder() << ctx.MaxKeys);
+
+ if (ctx.ContinuationToken.Defined()) {
+ urlBuilder.AddUrlParam("continuation-token", *ctx.ContinuationToken);
+ }
+ if (ctx.Delimiter.Defined()) {
+ urlBuilder.AddUrlParam("delimiter", *ctx.Delimiter);
+ }
+
+ auto gateway = ctx.GatewayWeak.lock();
+ if (!gateway) {
+ ythrow yexception() << "Gateway disappeared";
+ }
+
+ // Copied to a local before the call because `ctx` is moved in the same argument list.
+ auto retryPolicy = ctx.RetryPolicy;
+ gateway->Download(
+ urlBuilder.Build(),
+ headers,
+ 0U,
+ 0U,
+ CallbackFactoryMethod(std::move(ctx)),
+ /*data=*/"",
+ retryPolicy);
+ }
+ // Wraps the context into the gateway callback that forwards the result to OnDiscovery.
+ static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) {
+ return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) {
+ OnDiscovery(c, std::move(result));
+ };
+ }
+
+ // Response handler. For a successful download (variant index 0) parses the XML, filters the
+ // returned objects, records common prefixes as directories, completes NextRequestPromise
+ // (with a follow-up context when the answer is truncated, Nothing() otherwise) and then
+ // completes Promise with the collected entries. For gateway issues (index 1) Promise gets
+ // the issues. std::exception thrown on the way is stored into Promise by the function-level
+ // catch block.
+ static void OnDiscovery(TListingContext ctx, IHTTPGateway::TResult&& result) try {
+ auto gateway = ctx.GatewayWeak.lock();
+ if (!gateway) {
+ ythrow yexception() << "Gateway disappeared";
+ }
+ switch (result.index()) {
+ case 0U: {
+ auto xmlString =
+ std::get<IHTTPGateway::TContent>(std::move(result)).Extract();
+ const NXml::TDocument xml(xmlString, NXml::TDocument::String);
+ auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId);
+ YQL_CLOG(DEBUG, ProviderS3)
+ << "Listing of " << ctx.ListingRequest.Url
+ << ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size()
+ << " entries, got another " << parsedResponse.KeyCount
+ << " entries, request id: [" << ctx.RequestId << "]";
+
+ for (const auto& content : parsedResponse.Contents) {
+ if (content.Path.EndsWith('/')) {
+ // skip 'directories'
+ continue;
+ }
+ TVector<TString> matchedGlobs;
+ if (ctx.Filter(content.Path, matchedGlobs)) {
+ auto& object = ctx.Output->Objects.emplace_back();
+ object.Path = content.Path;
+ object.Size = content.Size;
+ object.MatchedGlobs.swap(matchedGlobs);
+ }
+ }
+ // Common prefixes are always reported; the filter result is only recorded.
+ for (const auto& prefix : parsedResponse.CommonPrefixes) {
+ auto& directory = ctx.Output->Directories.emplace_back();
+ directory.Path = prefix;
+ directory.MatchedRegexp = ctx.Filter(prefix, directory.MatchedGlobs);
+ }
+
+ if (parsedResponse.IsTruncated) {
+ YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url
+ << ctx.ListingRequest.Prefix
+ << ": got truncated flag, will continue";
+
+ // Context of the follow-up request: new promises, output and request id; the
+ // continuation token and MaxKeys come from the response.
+ auto newCtx = TListingContext{
+ ctx.Filter,
+ NewPromise<TListResult>(),
+ NewPromise<TMaybe<TListingContext>>(),
+ std::make_shared<TListEntries>(),
+ ctx.GatewayWeak,
+ GetHTTPDefaultRetryPolicy(),
+ CreateGuidAsString(),
+ ctx.ListingRequest,
+ ctx.Delimiter,
+ parsedResponse.ContinuationToken,
+ parsedResponse.MaxKeys};
+
+ ctx.NextRequestPromise.SetValue(TMaybe<TListingContext>(newCtx));
+ } else {
+ ctx.NextRequestPromise.SetValue(Nothing());
+ }
+ ctx.Promise.SetValue(std::move(*ctx.Output));
+ break;
+ }
+ case 1U: {
+ auto issues = std::get<TIssues>(std::move(result));
+ issues = NS3Util::AddParentIssue(
+ TStringBuilder{} << "request id: [" << ctx.RequestId << "]",
+ std::move(issues));
+ YQL_CLOG(INFO, ProviderS3)
+ << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
+ << ": got error from http gateway: " << issues.ToString(true);
+ ctx.Promise.SetValue(std::move(issues));
+ ctx.NextRequestPromise.SetValue(Nothing());
+ break;
+ }
+ default:
+ ythrow yexception() << "Undefined variant index: " << result.index()
+ << ", request id: [" << ctx.RequestId << "]";
+ }
+ } catch (const std::exception& ex) {
+ YQL_CLOG(INFO, ProviderS3)
+ << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix
+ << " : got exception: " << ex.what();
+ ctx.Promise.SetException(std::current_exception());
+ ctx.NextRequestPromise.SetValue(Nothing());
+ }
+
+public:
+ // Submits the pending request and returns the future of its result. Fails (Y_ENSURE) when
+ // there is no pending request context.
+ TFuture<TListResult> Next() override {
+ auto maybeRequestCtx = NextRequestCtx.GetValueSync();
+
+ Y_ENSURE(maybeRequestCtx.Defined());
+ // Both promises are copied out before the context is handed to the gateway.
+ auto result = maybeRequestCtx->Promise;
+
+ NextRequestCtx = maybeRequestCtx->NextRequestPromise;
+
+ // NOTE(review): if this call throws, NextRequestCtx already refers to a promise that
+ // nothing will complete, so a later HasNext() would wait forever — confirm callers
+ // never call HasNext() after a failed Next().
+ SubmitRequestIntoGateway(*maybeRequestCtx);
+ return result.GetFuture();
+ }
+
+ bool HasNext() override {
+ // The user should process the future returned from `Next()` before asking whether more data is available.
+ // If the data returned from `Next()` has NOT been processed yet, this method might BLOCK.
+ NextRequestCtx.Wait();
+ return NextRequestCtx.HasValue() && NextRequestCtx.GetValue().Defined();
+ }
+
+private:
+ const size_t MaxFilesPerQuery;
+ // Future of the context for the next request; holds Nothing() when listing is over.
+ TFuture<TMaybe<TListingContext>> NextRequestCtx;
+};
+
+} // namespace
+
+// Creates the lister matching the request URL: TS3Lister for everything except "file://" URLs,
+// TLocalS3Lister for "file://" URLs. Throws yexception for a "file://" URL when
+// `allowLocalFiles` is false.
+IS3Lister::TPtr MakeS3Lister(
+    const IHTTPGateway::TPtr& httpGateway,
+    const TListingRequest& listingRequest,
+    const TMaybe<TString>& delimiter,
+    bool allowLocalFiles) {
+    const bool isLocalUrl = listingRequest.Url.substr(0, 7) == "file://";
+    if (!isLocalUrl) {
+        return std::make_shared<TS3Lister>(httpGateway, listingRequest, delimiter);
+    }
+
+    if (allowLocalFiles) {
+        return std::make_shared<TLocalS3Lister>(listingRequest, delimiter);
+    }
+
+    ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access "
+                        << listingRequest.Url;
+}
+
+} // namespace NYql::NS3Lister
diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
new file mode 100644
index 0000000000..5134481949
--- /dev/null
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include <library/cpp/threading/future/future.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+
+#include <memory>
+#include <variant>
+#include <vector>
+
+namespace NYql {
+
+template<typename T> // T is the element type handed out by Next()
+class TIterator { // Abstract pull-style iterator: query HasNext(), then fetch with Next().
+public:
+ virtual T Next() = 0; // returns the next element
+
+ virtual bool HasNext() = 0; // reports whether another element is available
+
+ virtual ~TIterator() = default; // virtual so implementations can be destroyed through this interface
+};
+
+namespace NS3Lister {
+
+enum class ES3PatternVariant { FilePattern, PathPattern }; // Says whether an accompanying pattern string is a file pattern or a whole-path pattern.
+
+TMaybe<TString> SerializePatternVariant(ES3PatternVariant variant); // Text form of `variant`; an empty TMaybe signals that it could not be serialized.
+
+TMaybe<ES3PatternVariant> DeserializePatternVariant(const TString& variant); // Inverse of SerializePatternVariant; presumably empty for unrecognized text (TODO confirm in the .cpp).
+
+enum class ES3PatternType { // Syntax in which a listing pattern (see TListingRequest::Pattern) is written.
+ /**
+ * Pattern may include following wildcard expressions:
+ * * - any (possibly empty) sequence of characters
+ * ? - single character
+ * {variant1, variant2} - list of alternatives
+ */
+ Wildcard,
+ /**
+ * Pattern should be valid RE2 regex
+ */
+ Regexp
+};
+
+struct TObjectListEntry { // One object (file) found by a listing.
+ TString Path; // path of the object
+ ui64 Size = 0; // object size (presumably bytes; TODO confirm against the lister implementation)
+ std::vector<TString> MatchedGlobs; // strings captured while matching the pattern; consumers map them positionally to generated columns
+};
+
+struct TDirectoryListEntry { // One directory (common prefix) found by a listing.
+ TString Path; // path of the directory
+ bool MatchedRegexp = false; // NOTE(review): presumably set when the directory path itself matched the request pattern; confirm in the lister
+ std::vector<TString> MatchedGlobs; // strings captured while matching the pattern, same role as in TObjectListEntry
+};
+
+class TListEntries { // Listing result: the found objects and the found directories, kept in separate vectors.
+public:
+ [[nodiscard]] size_t Size() const { return Objects.size() + Directories.size(); } // total count of entries of both kinds
+ [[nodiscard]] bool Empty() const { return Objects.empty() && Directories.empty(); } // true only when both vectors are empty
+
+ TListEntries& operator+=(TListEntries&& other) { // Appends all entries of `other`, moving element by element.
+ Objects.insert(
+ Objects.end(),
+ std::make_move_iterator(other.Objects.begin()),
+ std::make_move_iterator(other.Objects.end()));
+ Directories.insert(
+ Directories.end(),
+ std::make_move_iterator(other.Directories.begin()),
+ std::make_move_iterator(other.Directories.end()));
+ return *this; // `other` is not cleared: it still holds the moved-from elements
+ }
+
+public:
+ std::vector<TObjectListEntry> Objects; // found objects
+ std::vector<TDirectoryListEntry> Directories; // found directories
+};
+
+using TListResult = std::variant<TListEntries, TIssues>; // Outcome of a listing: index 0 holds the entries, index 1 holds the issues describing a failure.
+
+struct TListingRequest { // Parameters of one listing.
+ TString Url; // base URL to list; a "file://" prefix selects the local-file lister in MakeS3Lister
+ TString Token; // token passed along with the request (presumably credentials; TODO confirm)
+ TString Pattern; // pattern that listed paths are matched against, written in the syntax given by PatternType
+ ES3PatternType PatternType = ES3PatternType::Wildcard; // how Pattern is to be interpreted
+ TString Prefix; // path prefix of the listing; it is appended to Url in log messages
+};
+
+class IS3Lister : public TIterator<NThreading::TFuture<TListResult>> { // Lister interface: an iterator whose elements are futures of listing results.
+public:
+ using TPtr = std::shared_ptr<IS3Lister>; // shared-ownership handle returned by MakeS3Lister
+};
+
+IS3Lister::TPtr MakeS3Lister( // Factory for listers; declaration only, the definition lives in yql_s3_list.cpp.
+ const IHTTPGateway::TPtr& httpGateway, // HTTP gateway handed to the lister
+ const TListingRequest& listingRequest, // what to list
+ const TMaybe<TString>& delimiter, // optional delimiter forwarded to the lister
+ bool allowLocalFiles); // whether listing local files is permitted
+
+} // namespace NS3Lister
+} // namespace NYql \ No newline at end of file
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
index e746aeedde..e746aeedde 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.h
index c39f476f88..c39f476f88 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path.h
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.h
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp
index f2ecaa8487..1b452a2683 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp
+++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp
@@ -1,4 +1,4 @@
-#include <ydb/library/yql/providers/s3/provider/yql_s3_path.h>
+#include "yql_s3_path.h"
#include <library/cpp/testing/unittest/registar.h>
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt
index 83a19eed84..73c17ce754 100644
--- a/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt
@@ -45,6 +45,7 @@ target_link_libraries(providers-s3-provider PUBLIC
providers-result-expr_nodes
providers-s3-common
providers-s3-expr_nodes
+ providers-s3-object_listers
providers-s3-path_generator
providers-s3-proto
providers-s3-range_helpers
@@ -60,10 +61,9 @@ target_sources(providers-s3-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_exec.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt
index f75614e737..50fe0185f0 100644
--- a/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt
@@ -46,6 +46,7 @@ target_link_libraries(providers-s3-provider PUBLIC
providers-result-expr_nodes
providers-s3-common
providers-s3-expr_nodes
+ providers-s3-object_listers
providers-s3-path_generator
providers-s3-proto
providers-s3-range_helpers
@@ -61,10 +62,9 @@ target_sources(providers-s3-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_exec.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt
index f75614e737..50fe0185f0 100644
--- a/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt
@@ -46,6 +46,7 @@ target_link_libraries(providers-s3-provider PUBLIC
providers-result-expr_nodes
providers-s3-common
providers-s3-expr_nodes
+ providers-s3-object_listers
providers-s3-path_generator
providers-s3-proto
providers-s3-range_helpers
@@ -61,10 +62,9 @@ target_sources(providers-s3-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_exec.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp
diff --git a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt
index c158b5a01c..9647ab029a 100644
--- a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt
+++ b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt
@@ -8,9 +8,6 @@
add_executable(ydb-library-yql-providers-s3-provider-ut)
-target_compile_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_include_directories(ydb-library-yql-providers-s3-provider-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider
)
@@ -21,9 +18,6 @@ target_link_libraries(ydb-library-yql-providers-s3-provider-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
providers-s3-provider
- udf-service-exception_policy
- yql-sql-pg_dummy
- yql-dq-opt
)
target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
-Wl,-no_deduplicate
@@ -34,7 +28,7 @@ target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
CoreFoundation
)
target_sources(ydb-library-yql-providers-s3-provider-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt
index 2b7084c413..27e5bf0cea 100644
--- a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt
@@ -8,9 +8,6 @@
add_executable(ydb-library-yql-providers-s3-provider-ut)
-target_compile_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_include_directories(ydb-library-yql-providers-s3-provider-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider
)
@@ -21,9 +18,6 @@ target_link_libraries(ydb-library-yql-providers-s3-provider-ut PUBLIC
library-cpp-lfalloc
cpp-testing-unittest_main
providers-s3-provider
- udf-service-exception_policy
- yql-sql-pg_dummy
- yql-dq-opt
)
target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
-ldl
@@ -36,7 +30,7 @@ target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
-ldl
)
target_sources(ydb-library-yql-providers-s3-provider-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt
index fe6d83f738..1cdb1de1d3 100644
--- a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt
+++ b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt
@@ -8,9 +8,6 @@
add_executable(ydb-library-yql-providers-s3-provider-ut)
-target_compile_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_include_directories(ydb-library-yql-providers-s3-provider-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider
)
@@ -23,9 +20,6 @@ target_link_libraries(ydb-library-yql-providers-s3-provider-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
providers-s3-provider
- udf-service-exception_policy
- yql-sql-pg_dummy
- yql-dq-opt
)
target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
-ldl
@@ -38,7 +32,7 @@ target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE
-ldl
)
target_sources(ydb-library-yql-providers-s3-provider-ut PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index d9628877a3..cfc5293dac 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -1,4 +1,3 @@
-#include "yql_s3_path.h"
#include "yql_s3_provider_impl.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
@@ -8,6 +7,7 @@
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h>
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
#include <ydb/library/yql/utils/log/log.h>
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
index cb708b15fe..0e1c9a218c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp
@@ -441,6 +441,22 @@ public:
return true;
}
+ if (name == "pathpattern"sv) {
+ TStringBuf unused;
+ if (!ExtractSettingValue(setting.Tail(), "path_pattern"sv, format, {}, ctx, unused)) {
+ return false;
+ }
+ return true;
+ }
+
+ if (name == "pathpatternvariant"sv) {
+ TStringBuf unused;
+ if (!ExtractSettingValue(setting.Tail(), "path_pattern_variant"sv, format, {}, ctx, unused)) {
+ return false;
+ }
+ return true;
+ }
+
YQL_ENSURE(name == "projection"sv);
haveProjection = true;
if (!EnsureAtom(setting.Tail(), ctx)) {
@@ -457,7 +473,7 @@ public:
if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
{ "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv,
"data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv,
- "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx))
+ "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv, "pathpattern"sv, "pathpatternvariant"sv }, validator, ctx))
{
return TStatus::Error;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
index 976f3e3ff2..d37b8de521 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp
@@ -1,9 +1,9 @@
-#include "yql_s3_list.h"
-#include "yql_s3_path.h"
#include "yql_s3_provider_impl.h"
+#include "yql_s3_listing_strategy.h"
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
@@ -53,18 +53,18 @@ bool FindFilePattern(const TExprNode& settings, TExprContext& ctx, TString& file
using namespace NPathGenerator;
struct TListRequest {
- TString Token;
- TString Url;
- TString Pattern; // can contain capturing groups
- TMaybe<TString> PathPrefix; // set iff Pattern is regex (not glob pattern)
+ NS3Lister::TListingRequest S3Request;
+ TString FilePattern;
+ ES3ListingOptions Options = ES3ListingOptions::NoOptions;
TVector<IPathGenerator::TColumnWithValue> ColumnValues;
};
bool operator<(const TListRequest& a, const TListRequest& b) {
- return std::tie(a.Token, a.Url, a.Pattern) < std::tie(b.Token, b.Url, b.Pattern);
+ return std::tie(a.S3Request.Token, a.S3Request.Url, a.S3Request.Pattern) <
+ std::tie(b.S3Request.Token, b.S3Request.Url, b.S3Request.Pattern);
}
-using TPendingRequests = TMap<TListRequest, NThreading::TFuture<IS3Lister::TListResult>>;
+using TPendingRequests = TMap<TListRequest, NThreading::TFuture<NS3Lister::TListResult>>;
struct TGeneratedColumnsConfig {
TVector<TString> Columns;
@@ -76,8 +76,22 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase {
public:
TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway)
: State_(std::move(state))
- , Lister_(IS3Lister::Make(gateway, State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxInflightListsPerQuery, State_->Configuration->AllowLocalFiles))
- {}
+ , ListerFactory_(
+ MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery))
+ , ListingStrategy_(MakeS3ListingStrategy(
+ gateway,
+ ListerFactory_,
+ State_->Configuration->MaxDiscoveryFilesPerQuery,
+ State_->Configuration->MaxDirectoriesAndFilesPerQuery,
+ State_->Configuration->MaxDirectoriesAndFilesPerQuery,
+ State_->Configuration->AllowLocalFiles))
+ , ListingBatchStrategy_(MakeS3BatchListingStrategy(
+ gateway,
+ ListerFactory_,
+ State_->Configuration->MaxDiscoveryFilesPerQuery,
+ State_->Configuration->MaxDirectoriesAndFilesPerQuery,
+ State_->Configuration->MaxDirectoriesAndFilesPerQuery,
+ State_->Configuration->AllowLocalFiles)) { }
void Rewind() final {
PendingRequests_.clear();
@@ -114,7 +128,7 @@ private:
return false;
});
- TVector<NThreading::TFuture<IS3Lister::TListResult>> futures;
+ TVector<NThreading::TFuture<NS3Lister::TListResult>> futures;
for (const auto& n : nodes) {
try {
if (auto maybeDqSource = TMaybeNode<TDqSourceWrap>(n)) {
@@ -167,17 +181,21 @@ private:
YQL_ENSURE(it != pendingRequests.end());
YQL_ENSURE(it->second.HasValue());
- const IS3Lister::TListResult& listResult = it->second.GetValue();
+ const NS3Lister::TListResult& listResult = it->second.GetValue();
if (listResult.index() == 1) {
const auto& issues = std::get<TIssues>(listResult);
- YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.Url << req.Pattern << " error " << issues.ToString();
+ YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern << " error " << issues.ToString();
std::for_each(issues.begin(), issues.end(), std::bind(&TExprContext::AddError, std::ref(ctx), std::placeholders::_1));
return TStatus::Error;
}
- const auto& listEntries = std::get<IS3Lister::TListEntries>(listResult);
- for (auto& entry : listEntries) {
- listedPaths.emplace_back(entry.Path, entry.Size, false);
+ const auto& listEntries = std::get<NS3Lister::TListEntries>(listResult);
+ for (auto& entry: listEntries.Objects) {
+ listedPaths.emplace_back(
+ NS3Details::TPath{entry.Path, entry.Size, false});
+ }
+ for (auto& path: listEntries.Directories) {
+ listedPaths.emplace_back(NS3Details::TPath{path.Path, 0, false});
}
}
@@ -250,6 +268,15 @@ private:
return TStatus::Ok;
}
+ struct TExtraColumnValue { // Value of one extra (generated) column attached to a listed path.
+ TString Name; // column name
+ TMaybe<NUdf::EDataSlot> Type; // column data type; left empty when the value comes from a matched glob
+ TString Value; // column value in text form
+ bool operator<(const TExtraColumnValue& other) const { // lexicographic order over (Name, Type, Value); lets vectors of these values serve as map keys
+ return std::tie(Name, Type, Value) < std::tie(other.Name, other.Type, other.Value);
+ }
+ };
+
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
// Raise errors if any
AllFuture_.GetValue();
@@ -277,15 +304,6 @@ private:
size_t readSize = 0;
TExprNode::TListType pathNodes;
- struct TExtraColumnValue {
- TString Name;
- TMaybe<NUdf::EDataSlot> Type;
- TString Value;
- bool operator<(const TExtraColumnValue& other) const {
- return std::tie(Name, Type, Value) < std::tie(other.Name, other.Type, other.Value);
- }
- };
-
TMap<TMaybe<TVector<TExtraColumnValue>>, NS3Details::TPathList> pathsByExtraValues;
const TGeneratedColumnsConfig* generatedColumnsConfig = nullptr;
if (auto it = genColumnsByNode.find(node); it != genColumnsByNode.end()) {
@@ -293,58 +311,55 @@ private:
}
const bool assumeDirectories = generatedColumnsConfig && generatedColumnsConfig->Generator;
+ bool needsListingOnActors = false;
for (auto& req : requests) {
auto it = pendingRequests.find(req);
YQL_ENSURE(it != pendingRequests.end());
YQL_ENSURE(it->second.HasValue());
- const IS3Lister::TListResult& listResult = it->second.GetValue();
+ const NS3Lister::TListResult& listResult = it->second.GetValue();
if (listResult.index() == 1) {
const auto& issues = std::get<TIssues>(listResult);
- YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.Url << req.Pattern << " error " << issues.ToString();
+ YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern << " error " << issues.ToString();
std::for_each(issues.begin(), issues.end(), std::bind(&TExprContext::AddError, std::ref(ctx), std::placeholders::_1));
return TStatus::Error;
}
- const auto& listEntries = std::get<IS3Lister::TListEntries>(listResult);
- if (listEntries.empty() && !NS3::HasWildcards(req.Pattern)) {
+ const auto& listEntries = std::get<NS3Lister::TListEntries>(listResult);
+ if (listEntries.Size() == 0 && !NS3::HasWildcards(req.S3Request.Pattern)) {
// request to list particular files that are missing
ctx.AddError(TIssue(ctx.GetPosition(object.Pos()),
- TStringBuilder() << "Object " << req.Pattern << " doesn't exist."));
+ TStringBuilder() << "Object " << req.S3Request.Pattern << " doesn't exist."));
return TStatus::Error;
}
- for (auto& entry : listEntries) {
+ if (!listEntries.Directories.empty()) {
+ needsListingOnActors = true;
+ }
+
+ for (auto& entry: listEntries.Objects) {
TMaybe<TVector<TExtraColumnValue>> extraValues;
if (generatedColumnsConfig) {
- extraValues = TVector<TExtraColumnValue>{};
- if (!req.ColumnValues.empty()) {
- // explicit partitioning
- YQL_ENSURE(req.ColumnValues.size() == generatedColumnsConfig->Columns.size());
- for (auto& cv : req.ColumnValues) {
- TExtraColumnValue value;
- value.Name = cv.Name;
- value.Type = cv.Type;
- value.Value = cv.Value;
- extraValues->push_back(std::move(value));
- }
- } else {
- YQL_ENSURE(entry.MatchedGlobs.size() == generatedColumnsConfig->Columns.size());
- for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) {
- TExtraColumnValue value;
- value.Name = generatedColumnsConfig->Columns[i];
- value.Value = entry.MatchedGlobs[i];
- extraValues->push_back(std::move(value));
- }
- }
+ extraValues = ExtractExtraColumnValues(
+ req, generatedColumnsConfig, entry.MatchedGlobs);
}
auto& pathList = pathsByExtraValues[extraValues];
- pathList.emplace_back(entry.Path, entry.Size, false);
+ pathList.emplace_back(NS3Details::TPath{entry.Path, entry.Size, false});
readSize += entry.Size;
}
+ for (auto& entry: listEntries.Directories) {
+ TMaybe<TVector<TExtraColumnValue>> extraValues;
+ if (generatedColumnsConfig) {
+ extraValues = ExtractExtraColumnValues(
+ req, generatedColumnsConfig, entry.MatchedGlobs);
+ }
+
+ auto& pathList = pathsByExtraValues[extraValues];
+ pathList.emplace_back(NS3Details::TPath{entry.Path, 0, true});
+ }
- YQL_CLOG(INFO, ProviderS3) << "Object " << req.Pattern << " has " << listEntries.size() << " items with total size " << readSize;
+ YQL_CLOG(INFO, ProviderS3) << "Object " << req.S3Request.Pattern << " has " << listEntries.Objects.size() << " items with total size " << readSize;
}
for (const auto& [extraValues, pathList] : pathsByExtraValues) {
@@ -438,6 +453,44 @@ private:
if (assumeDirectories) {
settings.push_back(ctx.NewList(settingsPos, { ctx.NewAtom(settingsPos, "directories", TNodeFlags::Default) }));
}
+ if (needsListingOnActors) {
+ TString pathPattern;
+ NS3Lister::ES3PatternVariant pathPatternVariant;
+ if (requests[0].Options == ES3ListingOptions::UnPartitionedDataset) {
+ pathPattern = requests[0].S3Request.Pattern;
+ pathPatternVariant = NS3Lister::ES3PatternVariant::PathPattern;
+ } else if (requests[0].Options == ES3ListingOptions::PartitionedDataset) {
+ pathPattern = requests[0].FilePattern;
+ pathPatternVariant = NS3Lister::ES3PatternVariant::FilePattern;
+ } else {
+ ctx.AddError(TIssue(
+ ctx.GetPosition(read.Pos()),
+ TStringBuilder()
+ << "Unknown listing option " << int(requests[0].Options)));
+ return TStatus::Error;
+ }
+ auto maybeSerializedVariant = NS3Lister::SerializePatternVariant(pathPatternVariant);
+ if (!maybeSerializedVariant.Defined()) {
+ ctx.AddError(TIssue(
+ ctx.GetPosition(read.Pos()),
+ TStringBuilder() << "Couldn't serialize path pattern variant "
+ << int(pathPatternVariant)));
+ return TStatus::Error;
+ }
+
+ settings.push_back(ctx.NewList(
+ settingsPos,
+ {
+ ctx.NewAtom(settingsPos, "pathpattern"),
+ ctx.NewAtom(settingsPos, pathPattern),
+ }));
+ settings.push_back(ctx.NewList(
+ settingsPos,
+ {
+ ctx.NewAtom(settingsPos, "pathpatternvariant"),
+ ctx.NewAtom(settingsPos, *maybeSerializedVariant),
+ }));
+ }
TExprNode::TPtr s3Object;
s3Object = Build<TS3Object>(ctx, object.Pos())
@@ -465,6 +518,33 @@ private:
return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr));
}
+ static TVector<TExtraColumnValue> ExtractExtraColumnValues( // Builds the extra column values for one listed entry.
+ const TListRequest& req, // request that produced the entry; its ColumnValues take precedence when non-empty
+ const TGeneratedColumnsConfig* generatedColumnsConfig, // dereferenced on both paths, so it must not be null
+ const std::vector<TString>& matchedGlobs) { // captures recorded for the entry; read only when req.ColumnValues is empty
+ auto extraValues = TVector<TExtraColumnValue>{};
+ if (!req.ColumnValues.empty()) {
+ // explicit partitioning
+ YQL_ENSURE(req.ColumnValues.size() == generatedColumnsConfig->Columns.size()); // one value per configured column is required
+ for (auto& cv: req.ColumnValues) {
+ TExtraColumnValue value;
+ value.Name = cv.Name;
+ value.Type = cv.Type;
+ value.Value = cv.Value;
+ extraValues.push_back(std::move(value));
+ }
+ } else { // no explicit values: take them from the glob captures, matched to columns by position
+ YQL_ENSURE(matchedGlobs.size() == generatedColumnsConfig->Columns.size()); // one capture per configured column is required
+ for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) {
+ TExtraColumnValue value; // Type stays empty on this path
+ value.Name = generatedColumnsConfig->Columns[i];
+ value.Value = matchedGlobs[i];
+ extraValues.push_back(std::move(value));
+ }
+ }
+ return extraValues;
+ }
+
static bool ValidateProjection(TPositionHandle pos, const TPathGeneratorPtr& generator, const TVector<TString>& partitionedBy, TExprContext& ctx) {
const TSet<TString> partitionedBySet(partitionedBy.begin(), partitionedBy.end());
TSet<TString> projectionSet;
@@ -485,7 +565,7 @@ private:
return true;
}
- bool LaunchListsForNode(const TDqSourceWrap& source, TVector<NThreading::TFuture<IS3Lister::TListResult>>& futures, TExprContext& ctx) {
+ bool LaunchListsForNode(const TDqSourceWrap& source, TVector<NThreading::TFuture<NS3Lister::TListResult>>& futures, TExprContext& ctx) {
TS3DataSource dataSource = source.DataSource().Maybe<TS3DataSource>().Cast();
const auto& connect = State_->Configuration->Clusters.at(dataSource.Cluster().StringValue());
const auto& token = State_->Configuration->Tokens.at(dataSource.Cluster().StringValue());
@@ -508,24 +588,35 @@ private:
NS3Details::TPathList directories;
NS3Details::UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), directories);
- TListRequest req;
- req.Token = tokenStr;
- req.Url = url;
- for (const auto& directory : directories) {
- req.Pattern = NS3::NormalizePath(TStringBuilder() << directory.Path << "/" << effectiveFilePattern);
- RequestsByNode_[source.Raw()].push_back(req);
- if (PendingRequests_.find(req) == PendingRequests_.end()) {
- auto future = Lister_->List(req.Token, req.Url, req.Pattern);
- PendingRequests_[req] = future;
- futures.push_back(std::move(future));
- }
+ auto req = TListRequest{.S3Request{
+ .Url = url,
+ .Token = tokenStr,
+ .PatternType = NS3Lister::ES3PatternType::Wildcard}};
+ RequestsByNode_[source.Raw()].push_back(req);
+
+ std::vector<TString> paths;
+ for (const auto& prefixObject: directories) {
+ paths.emplace_back(prefixObject.Path);
}
+
+ auto future = ListingBatchStrategy_->List(
+ req.S3Request,
+ paths,
+ [effectiveFilePattern](
+ const NS3Lister::TListingRequest& baseRequest, const TString& prefix) {
+ Y_UNUSED(baseRequest);
+ return NS3::NormalizePath(
+ TStringBuilder() << prefix << "/" << effectiveFilePattern);
+ },
+ ES3ListingOptions::UnPartitionedDataset);
+ PendingRequests_[req] = future;
+ futures.push_back(std::move(future));
}
return true;
}
- bool LaunchListsForNode(const TS3Read& read, TVector<NThreading::TFuture<IS3Lister::TListResult>>& futures, TExprContext& ctx) {
+ bool LaunchListsForNode(const TS3Read& read, TVector<NThreading::TFuture<NS3Lister::TListResult>>& futures, TExprContext& ctx) {
const auto& settings = *read.Ref().Child(4);
// schema is required
@@ -618,9 +709,9 @@ private:
// each path in CONCAT() can generate multiple list requests for explicit partitioning
TVector<TListRequest> reqs;
- TListRequest req;
- req.Token = tokenStr;
- req.Url = url;
+ auto req = TListRequest{
+ .S3Request{.Url = url, .Token = tokenStr},
+ .FilePattern = effectiveFilePattern};
if (partitionedBy.empty()) {
if (path.empty()) {
@@ -628,7 +719,7 @@ private:
return false;
}
if (path.EndsWith("/")) {
- req.Pattern = path + effectiveFilePattern;
+ req.S3Request.Pattern = path + effectiveFilePattern;
} else {
// treat paths as regular wildcard patterns
if (filePattern) {
@@ -636,9 +727,13 @@ private:
return false;
}
- req.Pattern = path;
+ req.S3Request.Pattern = path;
}
- req.Pattern = NS3::NormalizePath(req.Pattern);
+ req.S3Request.Pattern = NS3::NormalizePath(req.S3Request.Pattern);
+ req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard;
+ req.S3Request.Prefix = req.S3Request.Pattern.substr(
+ 0, NS3::GetFirstWildcardPos(req.S3Request.Pattern));
+ req.Options = ES3ListingOptions::UnPartitionedDataset;
reqs.push_back(req);
} else {
if (NS3::HasWildcards(path)) {
@@ -647,14 +742,14 @@ private:
}
if (!config.Generator) {
// Hive-style partitioning
- req.PathPrefix = path;
+ req.S3Request.Prefix = path;
if (!path.empty()) {
- req.PathPrefix = NS3::NormalizePath(TStringBuilder() << path << "/");
- if (req.PathPrefix == "/") {
- req.PathPrefix = "";
+ req.S3Request.Prefix = NS3::NormalizePath(TStringBuilder() << path << "/");
+ if (req.S3Request.Prefix == "/") {
+ req.S3Request.Prefix = "";
}
}
- TString pp = *req.PathPrefix;
+ TString pp = req.S3Request.Prefix;
if (!pp.empty() && pp.back() == '/') {
pp.pop_back();
}
@@ -668,14 +763,20 @@ private:
generated << NS3::EscapeRegex(col) << "=(.*?)";
}
generated << '/' << NS3::RegexFromWildcards(effectiveFilePattern);
- req.Pattern = generated;
+ req.S3Request.Pattern = generated;
+ req.S3Request.PatternType = NS3Lister::ES3PatternType::Regexp;
+ req.Options = ES3ListingOptions::PartitionedDataset;
reqs.push_back(req);
} else {
for (auto& rule : config.Generator->GetRules()) {
YQL_ENSURE(rule.ColumnValues.size() == config.Columns.size());
req.ColumnValues.assign(rule.ColumnValues.begin(), rule.ColumnValues.end());
// Pattern will be directory path
- req.Pattern = NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path);
+ req.S3Request.Pattern = NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path);
+ req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard;
+ req.S3Request.Prefix = req.S3Request.Pattern.substr(
+ 0, NS3::GetFirstWildcardPos(req.S3Request.Pattern));
+ req.Options = ES3ListingOptions::PartitionedDataset;
reqs.push_back(req);
}
}
@@ -684,16 +785,16 @@ private:
for (auto& req : reqs) {
RequestsByNode_[read.Raw()].push_back(req);
if (PendingRequests_.find(req) == PendingRequests_.end()) {
- NThreading::TFuture<IS3Lister::TListResult> future;
+ NThreading::TFuture<NS3Lister::TListResult> future;
if (config.Generator) {
// postpone actual directory listing (will do it after path pruning)
- IS3Lister::TListEntries entries(1);
- entries.back().Path = req.Pattern;
- future = NThreading::MakeFuture<IS3Lister::TListResult>(std::move(entries));
+ NS3Lister::TListEntries entries{
+ std::vector<NS3Lister::TObjectListEntry>{1},
+ std::vector<NS3Lister::TDirectoryListEntry>{}};
+ entries.Objects.back().Path = req.S3Request.Pattern;
+ future = NThreading::MakeFuture<NS3Lister::TListResult>(std::move(entries));
} else {
- future = req.PathPrefix.Defined() ?
- Lister_->ListRegex(req.Token, req.Url, req.Pattern, *req.PathPrefix) :
- Lister_->List(req.Token, req.Url, req.Pattern);
+ future = ListingStrategy_->List(req.S3Request, req.Options);
}
PendingRequests_[req] = future;
futures.push_back(std::move(future));
@@ -705,7 +806,9 @@ private:
}
const TS3State::TPtr State_;
- const IS3Lister::TPtr Lister_;
+ const IS3ListerFactory::TPtr ListerFactory_;
+ const IS3ListingStrategy::TPtr ListingStrategy_;
+ const IS3BatchListingStrategy::TPtr ListingBatchStrategy_;
TPendingRequests PendingRequests_;
TNodeMap<TVector<TListRequest>> RequestsByNode_;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
deleted file mode 100644
index 4fc30924b1..0000000000
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp
+++ /dev/null
@@ -1,330 +0,0 @@
-#include "yql_s3_list.h"
-#include "yql_s3_path.h"
-
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
-#include <ydb/library/yql/providers/s3/common/util.h>
-#include <ydb/library/yql/utils/log/log.h>
-#include <ydb/library/yql/utils/url_builder.h>
-#include <ydb/library/yql/utils/yql_panic.h>
-
-#include <contrib/libs/re2/re2/re2.h>
-
-#ifdef THROW
-#undef THROW
-#endif
-#include <library/cpp/threading/future/async_semaphore.h>
-#include <library/cpp/xml/document/xml-document.h>
-#include <util/folder/iterator.h>
-#include <util/generic/guid.h>
-#include <util/string/builder.h>
-
-namespace NYql {
-
-namespace {
-
-using namespace NThreading;
-
-class TS3Lister : public IS3Lister {
-public:
- explicit TS3Lister(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, bool allowLocalFiles)
- : Gateway(httpGateway)
- , MaxFilesPerQuery(maxFilesPerQuery)
- , AllowLocalFiles(allowLocalFiles)
- {}
-private:
- using TResultFilter = std::function<bool (const TString& path, TVector<TString>& matchedGlobs)>;
-
- static TResultFilter MakeFilter(const TString& pattern, const TMaybe<TString>& regexPatternPrefix, TString& prefix) {
- const bool isRegex = regexPatternPrefix.Defined();
- prefix = isRegex ? *regexPatternPrefix : pattern.substr(0, NS3::GetFirstWildcardPos(pattern));
- if (!isRegex && prefix == pattern) {
- // just match for equality
- return [pattern](const TString& path, TVector<TString>& matchedGlobs) {
- matchedGlobs.clear();
- return path == pattern;
- };
- }
-
- const auto regex = isRegex ? pattern : NS3::RegexFromWildcards(pattern);
- auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options());
- YQL_ENSURE(re->ok());
-
- const size_t numGroups = re->NumberOfCapturingGroups();
- YQL_CLOG(INFO, ProviderS3) << "Got prefix: '" << prefix << "', regex: '" << regex
- << "' with " << numGroups << " capture groups from original pattern '" << pattern << "'";
-
- auto groups = std::make_shared<std::vector<std::string>>(numGroups);
- auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups);
- auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups);
-
- for (size_t i = 0; i < numGroups; ++i) {
- (*reArgs)[i] = &(*groups)[i];
- (*reArgsPtr)[i] = &(*reArgs)[i];
- }
-
- return [groups, reArgs, reArgsPtr, re](const TString& path, TVector<TString>& matchedGlobs) {
- matchedGlobs.clear();
- bool matched = re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size());
- if (matched) {
- matchedGlobs.reserve(groups->size());
- for (auto& group : *groups) {
- matchedGlobs.push_back(ToString(group));
- }
- }
- return matched;
- };
- }
-
- static void OnDiscovery(
- const IHTTPGateway::TWeakPtr& gatewayWeak,
- IHTTPGateway::TResult&& result,
- NThreading::TPromise<IS3Lister::TListResult> promise,
- const std::shared_ptr<IS3Lister::TListEntries>& output,
- const IRetryPolicy<long>::TPtr& retryPolicy,
- const TResultFilter& filter,
- const TString& token,
- const TString& urlStr,
- const TString& prefix,
- const TString& requestId,
- ui64 maxDiscoveryFilesPerQuery)
- try {
- auto gateway = gatewayWeak.lock();
- if (!gateway) {
- ythrow yexception() << "Gateway disappeared";
- }
- switch (result.index()) {
- case 0U: {
- const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String);
- if (const auto& root = xml.Root(); root.Name() == "Error") {
- const auto& code = root.Node("Code", true).Value<TString>();
- const auto& message = root.Node("Message", true).Value<TString>();
- ythrow yexception() << message << ", error: code: " << code << " [" << urlStr << prefix << "], request id: [" << requestId << "]";
- } else if (root.Name() != "ListBucketResult") {
- ythrow yexception() << "Unexpected response '" << root.Name() << "' on discovery, request id: [" << requestId << "]";
- } else if (
- const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"});
- root.Node("s3:KeyCount", false, nss).Value<unsigned>() > 0U)
- {
- const auto& contents = root.XPath("s3:Contents", false, nss);
- YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << ": have " << output->size() << " entries, got another " << contents.size() << " entries, request id: [" << requestId << "]";
- if (maxDiscoveryFilesPerQuery && output->size() + contents.size() > maxDiscoveryFilesPerQuery) {
- ythrow yexception() << "Over " << maxDiscoveryFilesPerQuery << " files discovered in '" << urlStr << prefix << "', request id: [" << requestId << "]";
- }
-
- for (const auto& content : contents) {
- TString path = content.Node("s3:Key", false, nss).Value<TString>();
- if (path.EndsWith('/')) {
- // skip 'directories'
- continue;
- }
- TVector<TString> matchedGlobs;
- if (filter(path, matchedGlobs)) {
- output->emplace_back();
- output->back().Path = path;
- output->back().Size = content.Node("s3:Size", false, nss).Value<ui64>();
- output->back().MatchedGlobs.swap(matchedGlobs);
- }
- }
-
- if (root.Node("s3:IsTruncated", false, nss).Value<bool>()) {
- YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << ": got truncated flag, will continue";
- const auto& next = root.Node("s3:NextContinuationToken", false, nss).Value<TString>();
- const auto& maxKeys = root.Node("s3:MaxKeys", false, nss).Value<TString>();
-
- IHTTPGateway::THeaders headers;
- if (!token.empty()) {
- headers.emplace_back("X-YaCloud-SubjectToken:" + token);
- }
-
- TString requestId = CreateGuidAsString();
- headers.emplace_back(TString{"X-Request-ID:"} + requestId);
-
- TUrlBuilder urlBuilder(urlStr);
- auto url = urlBuilder.AddUrlParam("list-type", "2")
- .AddUrlParam("prefix", prefix)
- .AddUrlParam("continuation-token", next)
- .AddUrlParam("max-keys", maxKeys)
- .Build();
-
- return gateway->Download(
- url,
- std::move(headers),
- 0U,
- 0U,
- std::bind(&OnDiscovery,
- IHTTPGateway::TWeakPtr(gateway),
- std::placeholders::_1,
- promise,
- output,
- retryPolicy,
- filter,
- token,
- urlStr,
- prefix,
- requestId,
- maxDiscoveryFilesPerQuery),
- /*data=*/"",
- retryPolicy);
- }
- }
- promise.SetValue(std::move(*output));
- break;
- }
- case 1U: {
- auto issues = std::get<TIssues>(std::move(result));
- issues = NS3Util::AddParentIssue(TStringBuilder{} << "request id: [" << requestId << "]", std::move(issues));
- YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << ": got error from http gateway: " << issues.ToString(true);
- promise.SetValue(std::move(issues));
- break;
- }
- default:
- ythrow yexception() << "Undefined variant index: " << result.index() << ", request id: [" << requestId << "]";
- }
- } catch (const std::exception& ex) {
- YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << " : got exception: " << ex.what();
- promise.SetException(std::current_exception());
- }
-
- TFuture<TListResult> DoList(const TString& token, const TString& urlStr, const TString& pattern, const TMaybe<TString>& pathPrefix) {
- TString prefix;
- TResultFilter filter = MakeFilter(pattern, pathPrefix, prefix);
- auto promise = NewPromise<IS3Lister::TListResult>();
- auto future = promise.GetFuture();
-
- if (urlStr.substr(0, 7) == "file://") {
- try {
- if (!AllowLocalFiles) {
- ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access " << urlStr;
- }
- auto fullPath = urlStr.substr(7);
- for (const auto &e: TPathSplit(fullPath)) {
- if (e == "..") {
- ythrow yexception() << "Security violation: trying access parent directory in path";
- }
- }
-
- IS3Lister::TListEntries output;
-
- for (const auto& entry: TDirIterator(fullPath)) {
- if (entry.fts_type != FTS_F) {
- continue;
- }
-
- auto filename = TString(entry.fts_path + urlStr.size() - 7);
- TVector<TString> matches;
- if (filter(filename, matches)) {
- output.emplace_back();
- output.back().Path = filename;
- output.back().Size = entry.fts_statp->st_size;
- output.back().MatchedGlobs.swap(matches);
- }
- }
-
- promise.SetValue(std::move(output));
- } catch (const std::exception& ex) {
- promise.SetException(std::current_exception());
- }
- return future;
- }
-
-
- const auto retryPolicy = GetHTTPDefaultRetryPolicy();
- TUrlBuilder urlBuilder(urlStr);
- const auto url = urlBuilder
- .AddUrlParam("list-type", "2")
- .AddUrlParam("prefix", prefix)
- .Build();
-
- IHTTPGateway::THeaders headers;
- if (!token.empty()) {
- headers.emplace_back("X-YaCloud-SubjectToken:" + token);
- }
-
- TString requestId = CreateGuidAsString();
- headers.emplace_back(TString{"X-Request-ID:"} + requestId);
-
- Gateway->Download(
- url,
- std::move(headers),
- 0U,
- 0U,
- std::bind(&OnDiscovery,
- IHTTPGateway::TWeakPtr(Gateway),
- std::placeholders::_1,
- promise,
- std::make_shared<IS3Lister::TListEntries>(),
- retryPolicy,
- filter,
- token,
- urlStr,
- prefix,
- requestId,
- MaxFilesPerQuery),
- /*data=*/"",
- retryPolicy);
- return future;
- }
-
- NThreading::TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override {
- YQL_CLOG(INFO, ProviderS3) << "Enumerating items using glob pattern " << url << pattern;
- return DoList(token, url, pattern, {});
- }
-
- NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override {
- YQL_CLOG(INFO, ProviderS3) << "Enumerating items using RE2 pattern " << url << pattern;
- return DoList(token, url, pattern, pathPrefix);
- }
-
- const IHTTPGateway::TPtr Gateway;
- const ui64 MaxFilesPerQuery;
- const bool AllowLocalFiles;
-};
-
-class TS3ParallelLimitedLister : public IS3Lister {
-public:
- explicit TS3ParallelLimitedLister(const IS3Lister::TPtr& lister, size_t maxParallelOps = 1)
- : Lister(lister)
- , Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps)))
- {}
-
-private:
- TFuture<TListResult> DoList(const TString& token, const TString& url, const TString& pattern, const TMaybe<TString>& pathPrefix) {
- auto promise = NewPromise<TListResult>();
- auto future = promise.GetFuture();
- auto acquired = Semaphore->AcquireAsync();
- acquired.Subscribe([lister = Lister, promise, token, url, pattern, pathPrefix](const auto& f) {
- auto lock = std::make_shared<TAsyncSemaphore::TAutoRelease>(f.GetValue()->MakeAutoRelease());
- TFuture<TListResult> listFuture = pathPrefix.Defined() ?
- lister->ListRegex(token, url, pattern, *pathPrefix) :
- lister->List(token, url, pattern);
- listFuture.Subscribe([promise, lock](const auto& f) mutable {
- try {
- promise.SetValue(f.GetValue());
- } catch (...) {
- promise.SetException(std::current_exception());
- }
- });
- });
- return future;
- }
-
- TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override {
- return DoList(token, url, pattern, {});
- }
-
- TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override {
- return DoList(token, url, pattern, pathPrefix);
- }
-
- const IS3Lister::TPtr Lister;
- const TAsyncSemaphore::TPtr Semaphore;
-};
-
-}
-
-IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery, bool allowLocalFiles) {
- auto lister = IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery, allowLocalFiles));
- return IS3Lister::TPtr(new TS3ParallelLimitedLister(lister, maxInflightListsPerQuery));
-}
-
-}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.h b/ydb/library/yql/providers/s3/provider/yql_s3_list.h
deleted file mode 100644
index 04f5dd1a74..0000000000
--- a/ydb/library/yql/providers/s3/provider/yql_s3_list.h
+++ /dev/null
@@ -1,41 +0,0 @@
-#pragma once
-
-#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
-
-#include <library/cpp/threading/future/future.h>
-
-#include <variant>
-#include <vector>
-#include <memory>
-
-namespace NYql {
-
-class IS3Lister {
-public:
- using TPtr = std::shared_ptr<IS3Lister>;
-
- struct TListEntry {
- TString Path;
- ui64 Size = 0;
- std::vector<TString> MatchedGlobs;
- };
-
- using TListEntries = std::vector<TListEntry>;
- using TListResult = std::variant<TListEntries, TIssues>;
-
- virtual ~IS3Lister() = default;
- // List all S3 objects matching wildcard pattern.
- // Pattern may include following wildcard expressions:
- // * - any (possibly empty) sequence of characters
- // ? - single character
- // {variant1, variant2} - list of alternatives
- virtual NThreading::TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) = 0;
-
- // pattern should be valid RE2 regex
- // pathPrefix is a "constant" path prefix
- virtual NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) = 0;
-
- static TPtr Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery, bool allowLocalFiles);
-};
-
-} \ No newline at end of file
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
new file mode 100644
index 0000000000..1d8d4f4463
--- /dev/null
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp
@@ -0,0 +1,724 @@
+#include "yql_s3_listing_strategy.h"
+
+#include <ydb/library/yql/providers/s3/common/util.h>
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h>
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/utils/url_builder.h>
+
+#ifdef THROW
+#undef THROW
+#endif
+#include <library/cpp/threading/future/async_semaphore.h>
+#include <library/cpp/xml/document/xml-document.h>
+
+#include <util/generic/overloaded.h>
+#include <deque>
+#include <utility>
+
+namespace NYql {
+
+// Writes a bracketed textual tag for the given listing option into `stream`
+// and returns the stream. Throws yexception for a value that is not one of
+// the known enumerators.
+IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option) {
+    switch (option) {
+        case ES3ListingOptions::NoOptions:
+            return stream << "[NoOptions]";
+        case ES3ListingOptions::PartitionedDataset:
+            return stream << "[PartitionedDataset]";
+        case ES3ListingOptions::UnPartitionedDataset:
+            return stream << "[UnPartitionedDataset]";
+        default:
+            ythrow yexception() << "Undefined option: " << int(option);
+    }
+}
+
+namespace {
+
+using namespace NThreading;
+
+// Builds the "Limit exceeded" issue. It is tagged with code 0 and warning
+// severity, which IsRecoverableIssue() accepts as recoverable.
+TIssue MakeLimitExceededIssue() {
+    TIssue limitIssue("Limit exceeded");
+    limitIssue.SetCode(0U, ESeverity::TSeverityIds_ESeverityId_S_WARNING);
+    return limitIssue;
+}
+
+// Returns true only when `issues` holds exactly one issue and that issue's
+// severity value is at least S_WARNING; any other shape is not recoverable.
+bool IsRecoverableIssue(const TIssues& issues) {
+    const bool hasSingleIssue = issues.Size() == 1;
+    return hasSingleIssue
+        && issues.begin()->GetSeverity() >= ESeverity::TSeverityIds_ESeverityId_S_WARNING;
+}
+
+// Listing strategy that asks a factory for a lister and folds every chunk the
+// lister produces into a single result. Accumulation stops:
+//  * with a "Limit exceeded" issue once the accumulated entry count plus the
+//    incoming chunk would be greater than the limit;
+//  * with the lister's own issues when a chunk carries TIssues;
+//  * with an issue built from the exception text when an exception is reported.
+class TCollectingS3ListingStrategy : public IS3ListingStrategy {
+public:
+    using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>(
+        const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options)>;
+
+    // limit               - maximum number of entries that may be accumulated
+    // listerFactoryMethod - produces (asynchronously) the lister to drain
+    TCollectingS3ListingStrategy(size_t limit, TListerFactoryMethod&& listerFactoryMethod)
+        : Limit(limit)
+        , ListerFactoryMethod(std::move(listerFactoryMethod)) { }
+
+    // Creates a lister for the request and returns the accumulated result.
+    // A failure to obtain the lister is propagated as an error future.
+    TFuture<NS3Lister::TListResult> List(
+        const NS3Lister::TListingRequest& listingRequest,
+        ES3ListingOptions options) override {
+        auto futureLister = ListerFactoryMethod(listingRequest, options);
+        // Only the limit is needed by the continuation, so it is captured by
+        // value. Capturing `this` would leave a dangling pointer if the
+        // strategy (which callers may create as a local or hold by value)
+        // is destroyed before the lister future completes.
+        return futureLister.Apply(
+            [limit = Limit](const TFuture<NS3Lister::IS3Lister::TPtr>& lister) {
+                try {
+                    return DoListCallback(lister.GetValue(), limit);
+                } catch (...) {
+                    return MakeErrorFuture<NS3Lister::TListResult>(std::current_exception());
+                }
+            });
+    }
+
+private:
+    // Handler for a successful chunk: appends it to the accumulated entries,
+    // or replaces the state with the limit issue and stops.
+    static inline auto MakeNewListingChunkHandler(
+        NS3Lister::TListResult& state, size_t limit) {
+        return [&state, limit](NS3Lister::TListEntries&& chunkEntries) {
+            auto& stateEntries = std::get<NS3Lister::TListEntries>(state);
+            if (stateEntries.Size() + chunkEntries.Size() > limit) {
+                YQL_CLOG(INFO, ProviderS3)
+                    << "[TCollectingS3ListingStrategy] Collected "
+                    << stateEntries.Size() + chunkEntries.Size()
+                    << " object paths which is more than limit " << limit;
+                state = TIssues{MakeLimitExceededIssue()};
+                return EAggregationAction::Stop;
+            }
+            stateEntries += std::move(chunkEntries);
+            return EAggregationAction::Proceed;
+        };
+    }
+    // Handler for a failed chunk: the issues become the final result.
+    static inline auto MakeIssuesHandler(NS3Lister::TListResult& state) {
+        return [&state](const TIssues& issues) {
+            state = issues;
+            return EAggregationAction::Stop;
+        };
+    }
+    // Converts an exception reported by the accumulation into an issue result.
+    static inline EAggregationAction ExceptionHandler(
+        NS3Lister::TListResult& state, const std::exception& exception) {
+        state = TIssues{TIssue{exception.what()}};
+        return EAggregationAction::Stop;
+    }
+    // Drains `lister` through AccumulateWithEarlyStop, starting from an empty
+    // result. Static so that it does not depend on the strategy's lifetime.
+    static TFuture<NS3Lister::TListResult> DoListCallback(
+        NS3Lister::IS3Lister::TPtr lister, size_t limit) {
+        return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>(
+            std::move(lister),
+            NS3Lister::TListResult{},
+            [limit](NS3Lister::TListResult& state, NS3Lister::TListResult&& chunk) {
+                return std::visit(
+                    TOverloaded{
+                        MakeNewListingChunkHandler(state, limit),
+                        MakeIssuesHandler(state)},
+                    std::move(chunk));
+            },
+            ExceptionHandler);
+    }
+
+private:
+    const size_t Limit;
+    const TListerFactoryMethod ListerFactoryMethod;
+};
+
+// Collecting strategy whose lister is created without a delimiter
+// (Nothing() is passed to the lister factory).
+class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy {
+public:
+    TFlatFileS3ListingStrategy(
+        const IS3ListerFactory::TPtr& listerFactory,
+        const IHTTPGateway::TPtr& httpGateway,
+        size_t limit,
+        bool allowLocalFiles)
+        : TCollectingS3ListingStrategy(
+              limit,
+              // The listing options are not needed to build this lister.
+              [listerFactory, httpGateway, allowLocalFiles](
+                  const NS3Lister::TListingRequest& request,
+                  ES3ListingOptions /*options*/) {
+                  return listerFactory->Make(
+                      httpGateway, request, /*delimiter=*/Nothing(), allowLocalFiles);
+              }) { }
+};
+
+// Collecting strategy whose lister is created with "/" as the delimiter.
+class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy {
+public:
+    TDirectoryS3ListingStrategy(
+        const IS3ListerFactory::TPtr& listerFactory,
+        const IHTTPGateway::TPtr& httpGateway,
+        size_t limit,
+        bool allowLocalFiles)
+        : TCollectingS3ListingStrategy(
+              limit,
+              // The listing options are not needed to build this lister.
+              [listerFactory, httpGateway, allowLocalFiles](
+                  const NS3Lister::TListingRequest& request,
+                  ES3ListingOptions /*options*/) {
+                  return listerFactory->Make(
+                      httpGateway, request, /*delimiter=*/"/", allowLocalFiles);
+              }) { }
+};
+
+// Tries a sequence of strategies in order and returns the first successful
+// listing. A strategy that fails with a recoverable issue (see
+// IsRecoverableIssue) is skipped; any other failure ends the search with
+// that failure. If every strategy was skipped, the result is a
+// "No more strategy to test" issue.
+class TCompositeS3ListingStrategy : public IS3ListingStrategy {
+public:
+    using TStrategyContainer = std::vector<std::shared_ptr<IS3ListingStrategy>>;
+    using TStrategyContainerPtr = std::shared_ptr<TStrategyContainer>;
+
+    // Presents the strategy list as a lister: each Next() runs the next
+    // strategy against the same request and yields its result as a "chunk".
+    class TStrategyListIterator : public NS3Lister::IS3Lister {
+    public:
+        TStrategyListIterator(
+            NS3Lister::TListingRequest listingRequest,
+            ES3ListingOptions options,
+            TStrategyContainerPtr strategies)
+            : ListingRequest(std::move(listingRequest))
+            , Options(options)
+            , Strategies(std::move(strategies))
+            , Iterator(Strategies->cbegin())
+            , End(Strategies->end()) { }
+
+        // Runs the current strategy and advances to the following one.
+        // Calling it past the end yields an issue instead of dereferencing
+        // the end iterator.
+        TFuture<NS3Lister::TListResult> Next() override {
+            if (Iterator == End) {
+                return MakeFuture(NS3Lister::TListResult{
+                    TIssues{TIssue{"No more strategy to test"}}});
+            }
+            const auto& strategy = *Iterator;
+            ++Iterator;
+            return strategy->List(ListingRequest, Options);
+        }
+
+        // Pure query: repeated calls do not skip strategies, because the
+        // position is advanced only by Next().
+        bool HasNext() override {
+            return Iterator != End;
+        }
+
+    private:
+        const NS3Lister::TListingRequest ListingRequest;
+        const ES3ListingOptions Options;
+        // Keeps the container alive for as long as the iterators are in use.
+        const TStrategyContainerPtr Strategies;
+        TStrategyContainer::const_iterator Iterator;
+        const TStrategyContainer::const_iterator End;
+    };
+
+    // Accumulator for the search: `Set` tells whether `Result` was decided
+    // by some strategy (success or unrecoverable failure).
+    class AggregationState {
+    public:
+        NS3Lister::TListResult Result;
+        bool Set = false;
+    };
+
+    explicit TCompositeS3ListingStrategy(TStrategyContainer&& strategies)
+        : Strategies(std::make_shared<TStrategyContainer>(std::move(strategies))) { }
+
+    TFuture<NS3Lister::TListResult> List(
+        const NS3Lister::TListingRequest& listingRequest,
+        ES3ListingOptions options) override {
+        auto strategyListIterator =
+            std::make_unique<TStrategyListIterator>(listingRequest, options, Strategies);
+
+        return NYql::AccumulateWithEarlyStop<AggregationState>(
+                   std::move(strategyListIterator),
+                   AggregationState{NS3Lister::TListEntries{}},
+                   [](AggregationState& state, NS3Lister::TListResult&& chunk) {
+                       // First successful listing wins and ends the search.
+                       auto newChunkHandler =
+                           [&state](NS3Lister::TListEntries&& chunkEntries) {
+                               YQL_CLOG(INFO, ProviderS3)
+                                   << "[TCompositeS3ListingStrategy] Strategy successfully listed paths. Returning result: "
+                                   << chunkEntries.Objects.size() << " objects, "
+                                   << chunkEntries.Directories.size() << " path prefixes";
+                               // The chunk is not used after this point, so
+                               // it is moved instead of copied.
+                               std::get<NS3Lister::TListEntries>(state.Result) =
+                                   std::move(chunkEntries);
+                               state.Set = true;
+                               return EAggregationAction::Stop;
+                           };
+                       // Recoverable failures fall through to the next
+                       // strategy; anything else becomes the final result.
+                       auto errorHandler = [&state](const TIssues& issues) {
+                           if (IsRecoverableIssue(issues)) {
+                               YQL_CLOG(INFO, ProviderS3)
+                                   << "[TCompositeS3ListingStrategy] Strategy failed "
+                                   << " to list paths. Trying next one... ";
+                               return EAggregationAction::Proceed;
+                           }
+
+                           state.Result = issues;
+                           state.Set = true;
+                           return EAggregationAction::Stop;
+                       };
+                       return std::visit(
+                           TOverloaded{
+                               std::move(newChunkHandler),
+                               std::move(errorHandler),
+                           },
+                           std::move(chunk));
+                   },
+                   [](AggregationState& state, const std::exception& exception) {
+                       state.Result = TIssues{TIssue{exception.what()}};
+                       state.Set = true;
+                       return EAggregationAction::Stop;
+                   })
+            .Apply([](const TFuture<AggregationState>& state) {
+                auto& result = state.GetValue();
+                return result.Set ? result.Result
+                                  : TIssues{TIssue("No more strategy to test")};
+            });
+    }
+
+private:
+    const TStrategyContainerPtr Strategies;
+};
+
+// Dispatches a listing request to the first strategy whose condition accepts
+// the listing options. If no condition matches, the result is a
+// "No strategy matched listing request" issue.
+class TConditionalS3ListingStrategy : public IS3ListingStrategy {
+public:
+    using TStrategyCondition = std::function<bool(ES3ListingOptions options)>;
+    using TPair = std::pair<TStrategyCondition, std::shared_ptr<IS3ListingStrategy>>;
+
+    // Pairs are probed in the order they appear in `list`.
+    TConditionalS3ListingStrategy(std::initializer_list<TPair> list)
+        : Strategies(list.begin(), list.end()) { }
+
+    TFuture<NS3Lister::TListResult> List(
+        const NS3Lister::TListingRequest& listingRequest,
+        ES3ListingOptions options) override {
+        const auto matched = std::find_if(
+            Strategies.begin(), Strategies.end(), [options](const TPair& record) {
+                // Call the predicate in place: unpacking the pair by value
+                // would copy a std::function and a shared_ptr per probe.
+                return record.first(options);
+            });
+
+        if (matched == Strategies.end()) {
+            return MakeFuture(NS3Lister::TListResult{
+                TIssues{TIssue("No strategy matched listing request")}});
+        }
+
+        return matched->second->List(listingRequest, options);
+    }
+
+private:
+    const std::vector<TPair> Strategies;
+};
+
+// Collecting strategy for partitioned datasets. Its lister walks directory
+// prefixes starting from the request's Prefix and keeps descending into every
+// directory for which no globs were matched.
+class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy {
+public:
+    // Lister that lists one queued directory prefix per Next() call using a
+    // TDirectoryS3ListingStrategy.
+    class PartitionedDirectoryResolverIterator : public NS3Lister::IS3Lister {
+    public:
+        // defaultParams            - request copied for every prefix; only Prefix is overridden
+        // basePrefix               - first prefix to list
+        // options                  - forwarded unchanged to the directory strategy
+        // directoryListingStrategy - strategy used to list a single prefix
+        PartitionedDirectoryResolverIterator(
+            NS3Lister::TListingRequest defaultParams,
+            const TString& basePrefix,
+            ES3ListingOptions options,
+            TDirectoryS3ListingStrategy directoryListingStrategy)
+            : DefaultParams(std::move(defaultParams))
+            , Options(options)
+            , DirectoryPrefixQueue(std::make_shared<std::deque<TString>>())
+            , DirectoryListingStrategy(std::move(directoryListingStrategy)) {
+            DirectoryPrefixQueue->push_front(basePrefix);
+        }
+
+        /**
+         * For each directory in listing queue:
+         * 1) List path
+         * 2) If files are matched against regexp - remember them (extract extra columns)
+         * 3) If file is not matched against regexp - filter it out
+         * 4) If directory is matched against regexp - remember them (extract extra columns)
+         * 5) If directory is not matched against regexp - list this directory recursively
+         * 6) If there is no directory to list - algorithm ends
+         *
+         * In this method specifically: all returned objects are passed through,
+         * directories with empty MatchedGlobs are queued for a later Next(),
+         * and directories with non-empty MatchedGlobs are returned to the caller.
+         */
+        TFuture<NS3Lister::TListResult> Next() override {
+            First = false;
+            // The continuation shares ownership of the queue, so it can push
+            // new prefixes without referring to `this`.
+            NextDirectoryListeningChunk = GetNextPrefixLister().Apply(
+                [queue = DirectoryPrefixQueue](
+                    const TFuture<NS3Lister::TListResult>& future) -> NS3Lister::TListResult {
+                    try {
+                        auto& nextBatch = future.GetValue();
+                        // Issues from the directory listing are passed through as is.
+                        if (std::holds_alternative<TIssues>(nextBatch)) {
+                            return std::get<TIssues>(nextBatch);
+                        }
+
+                        auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch);
+                        auto result = NS3Lister::TListEntries{};
+                        result.Objects.insert(
+                            result.Objects.begin(),
+                            listingResult.Objects.cbegin(),
+                            listingResult.Objects.cend());
+                        for (auto& directoryPrefix : listingResult.Directories) {
+                            if (directoryPrefix.MatchedGlobs.empty()) {
+                                // We need to list until extra columns are extracted
+                                queue->push_back(directoryPrefix.Path);
+                            } else {
+                                result.Directories.push_back(directoryPrefix);
+                            }
+                        }
+
+                        return NS3Lister::TListResult{result};
+                    } catch (std::exception& e) {
+                        // An exception stored in the future becomes an issue result.
+                        return TIssues{TIssue{e.what()}};
+                    }
+                });
+            return NextDirectoryListeningChunk;
+        }
+
+        // Reports whether there are prefixes left to list. After the first
+        // Next() this blocks until the previous chunk's continuation has
+        // finished, since that continuation is what fills the queue.
+        bool HasNext() override {
+            if (!First) {
+                NextDirectoryListeningChunk.Wait();
+            }
+            return !DirectoryPrefixQueue->empty();
+        }
+
+    private:
+        // Pops the next prefix and starts listing it; yields a
+        // "No path to list" issue if the queue is empty.
+        TFuture<NS3Lister::TListResult> GetNextPrefixLister() {
+            if (DirectoryPrefixQueue->empty()) {
+                return MakeFuture(
+                    NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}});
+            }
+            auto prefix = DirectoryPrefixQueue->front();
+            DirectoryPrefixQueue->pop_front();
+
+            auto request = NS3Lister::TListingRequest(DefaultParams);
+            request.Prefix = prefix;
+
+            return DirectoryListingStrategy.List(request, Options);
+        }
+
+    private:
+        const NS3Lister::TListingRequest DefaultParams;
+        const ES3ListingOptions Options;
+        // Shared with the continuation created in Next().
+        std::shared_ptr<std::deque<TString>> DirectoryPrefixQueue;
+        // NOTE(review): not referenced anywhere in this class.
+        std::vector<NS3Lister::TObjectListEntry> Objects;
+        TDirectoryS3ListingStrategy DirectoryListingStrategy;
+        // Result of the most recent Next(); HasNext() waits on it.
+        TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk;
+        // True until Next() has been called once.
+        bool First = true;
+    };
+
+    // The lister factory builds a PartitionedDirectoryResolverIterator seeded
+    // with the request's Prefix and returns it as an already completed future.
+    TPartitionedDatasetS3ListingStrategy(
+        const IS3ListerFactory::TPtr& listerFactory,
+        const IHTTPGateway::TPtr& httpGateway,
+        size_t limit,
+        bool allowLocalFiles)
+        : TCollectingS3ListingStrategy(
+              limit,
+              [listerFactory, httpGateway, limit, allowLocalFiles](
+                  const NS3Lister::TListingRequest& listingRequest,
+                  ES3ListingOptions options) {
+                  auto ptr = std::shared_ptr<NS3Lister::IS3Lister>(
+                      new PartitionedDirectoryResolverIterator{
+                          listingRequest,
+                          listingRequest.Prefix,
+                          options,
+                          TDirectoryS3ListingStrategy{
+                              listerFactory, httpGateway, limit, allowLocalFiles}});
+                  return MakeFuture(std::move(ptr));
+              }) { }
+};
+
+// Lister that expands directory prefixes breadth-first, one prefix per Next().
+// It keeps expanding while the running total of returned entries, queued
+// prefixes and the current listing stays below MinParallelism, and it stops
+// expanding (handing the remaining prefixes back as directories) once that
+// total reaches MinParallelism or exceeds Limit.
+class TBFSDirectoryResolverIterator : public NS3Lister::IS3Lister {
+public:
+    // Builds the request used to list a given prefix from the default request.
+    using TListingRequestFactory = std::function<NS3Lister::TListingRequest(
+        const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>;
+
+    // defaultParams            - template request handed to listingRequestFactory
+    // listingRequestFactory    - produces the per-prefix request
+    // options                  - forwarded unchanged to the directory strategy
+    // initialPathPrefixes      - prefixes to start from
+    // directoryListingStrategy - strategy used to list a single prefix
+    // minParallelism           - total below which discovered directories keep being expanded
+    // limit                    - total above which expansion is abandoned
+    TBFSDirectoryResolverIterator(
+        NS3Lister::TListingRequest defaultParams,
+        TListingRequestFactory listingRequestFactory,
+        ES3ListingOptions options,
+        std::deque<TString> initialPathPrefixes,
+        TDirectoryS3ListingStrategy directoryListingStrategy,
+        size_t minParallelism,
+        size_t limit)
+        : DefaultParams(std::move(defaultParams))
+        , Options(options)
+        , ListingRequestFactory(std::move(listingRequestFactory))
+        , DirectoryPrefixQueue(std::move(initialPathPrefixes))
+        , DirectoryListingStrategy(std::move(directoryListingStrategy))
+        , MinParallelism(minParallelism)
+        , Limit(limit) { }
+
+    // Pops one prefix, lists it and post-processes the listing (see the
+    // comments inside the continuation). Yields a "No path to list" issue if
+    // the queue is empty.
+    TFuture<NS3Lister::TListResult> Next() override {
+        First = false;
+        if (DirectoryPrefixQueue.empty()) {
+            return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}});
+        }
+        auto sourcePrefix = DirectoryPrefixQueue.front();
+        DirectoryPrefixQueue.pop_front();
+        // The continuation captures `this` and mutates DirectoryPrefixQueue and
+        // ReturnedSize; HasNext() waits for it before reading the queue.
+        NextDirectoryListeningChunk =
+            GetPrefixLister(sourcePrefix)
+                .Apply(
+                    [this, sourcePrefix](const TFuture<NS3Lister::TListResult>& future)
+                        -> NS3Lister::TListResult {
+                        try {
+                            auto& nextBatch = future.GetValue();
+                            // Issues from the directory listing are passed through as is.
+                            if (std::holds_alternative<TIssues>(nextBatch)) {
+                                return std::get<TIssues>(nextBatch);
+                            }
+
+                            auto& listingResult =
+                                std::get<NS3Lister::TListEntries>(nextBatch);
+                            auto result = NS3Lister::TListEntries{};
+                            // Entries already returned + prefixes still queued
+                            // + entries found under the current prefix.
+                            auto currentListingTotalSize = ReturnedSize +
+                                                           DirectoryPrefixQueue.size() +
+                                                           listingResult.Size();
+                            if (currentListingTotalSize > Limit) {
+                                // Stop listing
+                                // The current listing is dropped: the source
+                                // prefix and every queued prefix are returned
+                                // as unexpanded directories instead.
+                                result.Directories.push_back({.Path = sourcePrefix});
+                                for (auto& directoryPrefix : DirectoryPrefixQueue) {
+                                    result.Directories.push_back({.Path = directoryPrefix});
+                                }
+                                DirectoryPrefixQueue.clear();
+                            } else {
+                                result.Objects.insert(
+                                    result.Objects.end(),
+                                    std::make_move_iterator(listingResult.Objects.begin()),
+                                    std::make_move_iterator(listingResult.Objects.end()));
+                                if (currentListingTotalSize < MinParallelism) {
+                                    // Too few entries so far: keep expanding
+                                    // the directories that were just found.
+                                    for (auto& directoryPrefix : listingResult.Directories) {
+                                        DirectoryPrefixQueue.push_back(directoryPrefix.Path);
+                                    }
+                                } else {
+                                    // Enough entries: return found and queued
+                                    // directories unexpanded and end the walk.
+                                    for (auto& directoryPrefix : listingResult.Directories) {
+                                        result.Directories.push_back(
+                                            {.Path = directoryPrefix.Path});
+                                    }
+                                    for (auto& directoryPrefix : DirectoryPrefixQueue) {
+                                        result.Directories.push_back(
+                                            {.Path = directoryPrefix});
+                                    }
+                                    DirectoryPrefixQueue.clear();
+                                }
+                            }
+                            ReturnedSize += result.Size();
+                            return NS3Lister::TListResult{result};
+                        } catch (std::exception& e) {
+                            // An exception stored in the future becomes an issue result.
+                            return TIssues{TIssue{e.what()}};
+                        }
+                    });
+        return NextDirectoryListeningChunk;
+    }
+
+    // Reports whether there are prefixes left to list. After the first Next()
+    // this blocks until the previous chunk has completed; a chunk that ended
+    // with an exception ends the iteration.
+    bool HasNext() override {
+        if (!First) {
+            NextDirectoryListeningChunk.Wait();
+            if (NextDirectoryListeningChunk.HasException()) {
+                return false;
+            }
+            // NOTE(review): the returned value is discarded here - confirm
+            // whether this call is still needed.
+            NextDirectoryListeningChunk.GetValue();
+        }
+        return !DirectoryPrefixQueue.empty();
+    }
+
+private:
+    // Builds the request for `prefix` and starts listing it.
+    TFuture<NS3Lister::TListResult> GetPrefixLister(const TString& prefix) {
+        const auto& listingRequest = ListingRequestFactory(DefaultParams, prefix);
+        auto listResult = DirectoryListingStrategy.List(listingRequest, Options);
+        return listResult;
+    }
+
+private:
+    const NS3Lister::TListingRequest DefaultParams;
+    const ES3ListingOptions Options;
+    TListingRequestFactory ListingRequestFactory;
+    // Prefixes waiting to be listed; modified by Next() and its continuation.
+    std::deque<TString> DirectoryPrefixQueue;
+    // NOTE(review): not referenced anywhere in this class.
+    std::vector<NS3Lister::TObjectListEntry> Objects;
+    TDirectoryS3ListingStrategy DirectoryListingStrategy;
+    // Result of the most recent Next(); HasNext() waits on it.
+    TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk;
+    // True until Next() has been called once.
+    bool First = true;
+    // Number of entries (objects and directories) returned so far.
+    size_t ReturnedSize = 0;
+    const size_t MinParallelism;
+    const size_t Limit;
+};
+
+// Collecting strategy for datasets without partitioning. Its lister is a
+// TBFSDirectoryResolverIterator seeded with the request's Prefix or, when the
+// Prefix is empty, with the part of Pattern that precedes the first wildcard.
+class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy {
+public:
+    TUnPartitionedDatasetS3ListingStrategy(
+        const IS3ListerFactory::TPtr& listerFactory,
+        const IHTTPGateway::TPtr& httpGateway,
+        size_t limit,
+        size_t minParallelism,
+        bool allowLocalFiles)
+        : TCollectingS3ListingStrategy(
+              limit,
+              [listerFactory, httpGateway, limit, minParallelism, allowLocalFiles](
+                  const NS3Lister::TListingRequest& listingRequest,
+                  ES3ListingOptions options) {
+                  // Starting point of the walk.
+                  const TString initialPrefix =
+                      (!listingRequest.Prefix.empty())
+                          ? listingRequest.Prefix
+                          : listingRequest.Pattern.substr(
+                                0, NS3::GetFirstWildcardPos(listingRequest.Pattern));
+                  // make_shared instead of a naked `new`, matching how
+                  // TSimpleS3BatchListingStrategy creates the same iterator.
+                  std::shared_ptr<NS3Lister::IS3Lister> ptr =
+                      std::make_shared<TBFSDirectoryResolverIterator>(
+                          listingRequest,
+                          // Per-prefix request: the default one with only
+                          // the Prefix replaced.
+                          [](const NS3Lister::TListingRequest& defaultParams,
+                             const TString& pathPrefix) {
+                              NS3Lister::TListingRequest request(defaultParams);
+                              request.Prefix = pathPrefix;
+                              return request;
+                          },
+                          options,
+                          std::deque<TString>{initialPrefix},
+                          TDirectoryS3ListingStrategy{
+                              listerFactory, httpGateway, limit, allowLocalFiles},
+                          minParallelism,
+                          limit);
+                  return MakeFuture(std::move(ptr));
+              }) { }
+};
+
+// Batch strategy: lists several path prefixes in one call by feeding them all
+// to a TBFSDirectoryResolverIterator and collecting the result. Only
+// un-partitioned datasets are supported.
+class TSimpleS3BatchListingStrategy : public IS3BatchListingStrategy {
+public:
+    TSimpleS3BatchListingStrategy(
+        IS3ListerFactory::TPtr listerFactory,
+        IHTTPGateway::TPtr httpGateway,
+        size_t limit,
+        size_t minParallelism,
+        bool allowLocalFiles)
+        : ListerFactory(std::move(listerFactory))
+        , HttpGateway(std::move(httpGateway))
+        , Limit(limit)
+        , MinParallelism(minParallelism)
+        , AllowLocalFiles(allowLocalFiles) { }
+
+    // baseRequest    - template request for every listed prefix
+    // paths          - prefixes to start listing from
+    // patternFactory - computes the Pattern for a given prefix
+    // options        - must be UnPartitionedDataset, otherwise Y_ENSURE fails
+    NThreading::TFuture<NS3Lister::TListResult> List(
+        const NS3Lister::TListingRequest& baseRequest,
+        const std::vector<TString>& paths,
+        TPatternFactory patternFactory,
+        ES3ListingOptions options) override {
+        Y_ENSURE(
+            options == ES3ListingOptions::UnPartitionedDataset,
+            "This strategy only works for un partitioned datasets");
+
+        // The factory closure copies what it needs instead of capturing
+        // `this`, so it does not depend on the lifetime of this object.
+        auto strategy = TCollectingS3ListingStrategy(
+            Limit,
+            [listerFactory = ListerFactory,
+             httpGateway = HttpGateway,
+             limit = Limit,
+             minParallelism = MinParallelism,
+             allowLocalFiles = AllowLocalFiles,
+             paths,
+             patternFactory](
+                const NS3Lister::TListingRequest& listingRequest,
+                ES3ListingOptions listingOptions) {
+                auto listerPtr = std::make_shared<TBFSDirectoryResolverIterator>(
+                    listingRequest,
+                    // Per-prefix request: Prefix is replaced and Pattern is
+                    // recomputed for that prefix.
+                    [patternFactory](
+                        const NS3Lister::TListingRequest& defaultParams,
+                        const TString& pathPrefix) {
+                        NS3Lister::TListingRequest request(defaultParams);
+                        request.Prefix = pathPrefix;
+                        request.Pattern = patternFactory(defaultParams, pathPrefix);
+                        return request;
+                    },
+                    listingOptions,
+                    std::deque<TString>(paths.begin(), paths.end()),
+                    TDirectoryS3ListingStrategy{
+                        listerFactory, httpGateway, limit, allowLocalFiles},
+                    minParallelism,
+                    limit);
+                return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(listerPtr));
+            });
+
+        return strategy.List(baseRequest, options);
+    }
+
+private:
+    IS3ListerFactory::TPtr ListerFactory;
+    const IHTTPGateway::TPtr HttpGateway;
+    const size_t Limit;
+    const size_t MinParallelism;
+    const bool AllowLocalFiles;
+};
+
+/// IS3ListerFactory that caps the number of concurrently active listers with
+/// an async semaphore.
+///
+/// Make() acquires the semaphore first and only then creates the lister. The
+/// acquired slot is owned by the returned wrapper and is dropped as soon as
+/// the wrapped lister reports that it has nothing more to return.
+class TS3ParallelLimitedListerFactory : public IS3ListerFactory {
+public:
+    using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>;
+
+    /// @param maxParallelOps semaphore capacity; values below 1 are raised to 1.
+    explicit TS3ParallelLimitedListerFactory(size_t maxParallelOps = 1)
+        : Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { }
+
+    /// Returns a future that becomes ready once the semaphore is acquired and
+    /// the lister has been created.
+    TFuture<NS3Lister::IS3Lister::TPtr> Make(
+        const IHTTPGateway::TPtr& httpGateway,
+        const NS3Lister::TListingRequest& listingRequest,
+        const TMaybe<TString>& delimiter,
+        bool allowLocalFiles) override {
+        auto acquired = Semaphore->AcquireAsync();
+        return acquired.Apply(
+            [httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) {
+                // Take ownership of the slot before creating the lister, so the
+                // slot is handed back even if lister creation throws.
+                auto lock = std::make_unique<TAsyncSemaphore::TAutoRelease>(
+                    f.GetValue()->MakeAutoRelease());
+                auto lister = NS3Lister::MakeS3Lister(
+                    httpGateway, listingRequest, delimiter, allowLocalFiles);
+                return std::shared_ptr<NS3Lister::IS3Lister>(
+                    std::make_shared<TListerLockReleaseWrapper>(
+                        std::move(lister), std::move(lock)));
+            });
+    }
+
+private:
+    /// Forwards to the wrapped lister and holds the semaphore slot until the
+    /// lister is exhausted.
+    class TListerLockReleaseWrapper : public NS3Lister::IS3Lister {
+    public:
+        using TLockPtr = std::unique_ptr<TAsyncSemaphore::TAutoRelease>;
+
+        TListerLockReleaseWrapper(NS3Lister::IS3Lister::TPtr listerPtr, TLockPtr lock)
+            : ListerPtr(std::move(listerPtr))
+            , Lock(std::move(lock)) {
+            // Nothing to iterate over: do not keep the slot.
+            if (ListerPtr == nullptr) {
+                Lock.reset();
+            }
+        }
+
+        TFuture<NS3Lister::TListResult> Next() override { return ListerPtr->Next(); }
+
+        bool HasNext() override {
+            // A wrapper built around a null lister has no data by definition.
+            if (ListerPtr == nullptr) {
+                return false;
+            }
+            // Query the wrapped lister exactly once: HasNext() may advance the
+            // lister's internal state, so a second call could give a different
+            // answer or skip data.
+            const bool hasNext = ListerPtr->HasNext();
+            if (!hasNext) {
+                Lock.reset();
+            }
+            return hasNext;
+        }
+
+    private:
+        NS3Lister::IS3Lister::TPtr ListerPtr;
+        TLockPtr Lock;
+    };
+
+private:
+    const TAsyncSemaphore::TPtr Semaphore;
+};
+
+} // namespace
+
+/// Creates a TS3ParallelLimitedListerFactory constructed with `maxParallelOps`
+/// and returns it through the IS3ListerFactory interface.
+IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps) {
+    IS3ListerFactory::TPtr factory =
+        std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps);
+    return factory;
+}
+
+/// Assembles the composite listing strategy.
+///
+/// The composite holds, in order: a flat-file strategy limited by
+/// `maxFilesPerQueryFiles`, and a conditional strategy that uses the
+/// partitioned-dataset strategy for ES3ListingOptions::PartitionedDataset and
+/// the un-partitioned one for ES3ListingOptions::UnPartitionedDataset (both
+/// limited by `maxFilesPerQueryDirectory`).
+IS3ListingStrategy::TPtr MakeS3ListingStrategy(
+    const IHTTPGateway::TPtr& httpGateway,
+    const IS3ListerFactory::TPtr& listerFactory,
+    ui64 maxFilesPerQueryFiles,
+    ui64 maxFilesPerQueryDirectory,
+    ui64 minDesiredDirectoriesOfFilesPerQuery,
+    bool allowLocalFiles) {
+    auto flatFiles = std::make_shared<TFlatFileS3ListingStrategy>(
+        listerFactory, httpGateway, maxFilesPerQueryFiles, allowLocalFiles);
+
+    const auto isPartitioned = [](ES3ListingOptions options) {
+        return options == ES3ListingOptions::PartitionedDataset;
+    };
+    auto partitioned = std::make_shared<TPartitionedDatasetS3ListingStrategy>(
+        listerFactory, httpGateway, maxFilesPerQueryDirectory, allowLocalFiles);
+
+    const auto isUnPartitioned = [](ES3ListingOptions options) {
+        return options == ES3ListingOptions::UnPartitionedDataset;
+    };
+    auto unPartitioned = std::make_shared<TUnPartitionedDatasetS3ListingStrategy>(
+        listerFactory,
+        httpGateway,
+        maxFilesPerQueryDirectory,
+        minDesiredDirectoriesOfFilesPerQuery,
+        allowLocalFiles);
+
+    auto byDatasetKind = std::make_shared<TConditionalS3ListingStrategy>(
+        std::initializer_list<TConditionalS3ListingStrategy::TPair>{
+            {isPartitioned, std::move(partitioned)},
+            {isUnPartitioned, std::move(unPartitioned)},
+        });
+
+    return std::make_shared<TCompositeS3ListingStrategy>(
+        std::vector<std::shared_ptr<IS3ListingStrategy>>{
+            std::move(flatFiles), std::move(byDatasetKind)});
+}
+
+/// Creates a TSimpleS3BatchListingStrategy whose single limit is the larger of
+/// `maxFilesPerQueryFiles` and `maxFilesPerQueryDirectory`.
+IS3BatchListingStrategy::TPtr MakeS3BatchListingStrategy(
+    const IHTTPGateway::TPtr& httpGateway,
+    const IS3ListerFactory::TPtr& listerFactory,
+    ui64 maxFilesPerQueryFiles,
+    ui64 maxFilesPerQueryDirectory,
+    ui64 minDesiredDirectoriesOfFilesPerQuery,
+    bool allowLocalFiles) {
+    const ui64 maxSize = maxFilesPerQueryFiles < maxFilesPerQueryDirectory
+        ? maxFilesPerQueryDirectory
+        : maxFilesPerQueryFiles;
+
+    IS3BatchListingStrategy::TPtr strategy =
+        std::make_shared<TSimpleS3BatchListingStrategy>(
+            listerFactory,
+            httpGateway,
+            maxSize,
+            minDesiredDirectoriesOfFilesPerQuery,
+            allowLocalFiles);
+    return strategy;
+}
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h
new file mode 100644
index 0000000000..a77d4d27a0
--- /dev/null
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h
@@ -0,0 +1,78 @@
+#pragma once
+
+#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+
+#include <library/cpp/threading/future/future.h>
+
+#include <memory>
+#include <variant>
+#include <vector>
+
+namespace NYql {
+
+/// Factory interface that asynchronously produces NS3Lister::IS3Lister
+/// instances for a listing request.
+class IS3ListerFactory {
+public:
+ using TPtr = std::shared_ptr<IS3ListerFactory>;
+
+ /// Creates a lister for `listingRequest`.
+ ///
+ /// @param httpGateway     gateway handed to the created lister
+ /// @param listingRequest  request the lister is created for
+ /// @param delimiter       optional delimiter passed through to the lister
+ /// @param allowLocalFiles passed through to the lister
+ /// @return future holding the created lister
+ virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make(
+ const IHTTPGateway::TPtr& httpGateway,
+ const NS3Lister::TListingRequest& listingRequest,
+ const TMaybe<TString>& delimiter,
+ bool allowLocalFiles) = 0;
+
+ virtual ~IS3ListerFactory() = default;
+};
+
+/// Creates a lister factory that limits the number of parallel listing
+/// operations to `maxParallelOps` (the implementation treats 0 as 1).
+IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps);
+
+/// Option passed to listing strategies together with the listing request.
+/// Strategies may use it to choose how (or whether) to handle the request.
+enum class ES3ListingOptions : ui8 {
+ NoOptions = 0,
+ UnPartitionedDataset = 1,
+ PartitionedDataset = 2
+};
+
+/// Stream output for ES3ListingOptions; the exact text written is defined by
+/// the implementation (not visible in this header).
+IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option);
+
+/// Interface of a listing strategy: turns one listing request into a single
+/// asynchronous NS3Lister::TListResult.
+class IS3ListingStrategy {
+public:
+ using TPtr = std::shared_ptr<IS3ListingStrategy>;
+
+ /// @param listingRequest request to list
+ /// @param options        listing option for this request
+ /// @return future holding the listing result
+ virtual NThreading::TFuture<NS3Lister::TListResult> List(
+ const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) = 0;
+
+ virtual ~IS3ListingStrategy() = default;
+};
+
+/// Creates the default listing strategy.
+///
+/// @param httpGateway    gateway used by the underlying listers
+/// @param listerFactory  factory used to create the underlying listers
+/// @param maxFilesPerQueryFiles      limit given to the flat-file strategy
+/// @param maxFilesPerQueryDirectory  limit given to the partitioned and
+///                                   un-partitioned dataset strategies
+/// @param minDesiredDirectoriesOfFilesPerQuery passed to the un-partitioned
+///                                   dataset strategy as its minimal parallelism
+/// @param allowLocalFiles passed through to the underlying strategies
+IS3ListingStrategy::TPtr MakeS3ListingStrategy(
+ const IHTTPGateway::TPtr& httpGateway,
+ const IS3ListerFactory::TPtr& listerFactory,
+ ui64 maxFilesPerQueryFiles,
+ ui64 maxFilesPerQueryDirectory,
+ ui64 minDesiredDirectoriesOfFilesPerQuery,
+ bool allowLocalFiles);
+
+/// Interface of a batch listing strategy: lists several paths that share one
+/// base request and returns a single asynchronous NS3Lister::TListResult.
+class IS3BatchListingStrategy {
+public:
+    /// Callback that produces the pattern string for a given base request and
+    /// path prefix.
+    using TPatternFactory =
+        std::function<TString(const NS3Lister::TListingRequest& baseRequest, const TString& prefix)>;
+    using TPtr = std::shared_ptr<IS3BatchListingStrategy>;
+
+    /// @param baseRequest    request used as the template for every path
+    /// @param paths          paths to list
+    /// @param patternFactory callback computing the pattern for each path
+    /// @param options        listing option for this request
+    /// @return future holding the listing result
+    virtual NThreading::TFuture<NS3Lister::TListResult> List(
+        const NS3Lister::TListingRequest& baseRequest,
+        const std::vector<TString>& paths,
+        TPatternFactory patternFactory,
+        ES3ListingOptions options) = 0;
+    virtual ~IS3BatchListingStrategy() = default;
+};
+
+/// Creates the default batch listing strategy.
+///
+/// The implementation uses the larger of `maxFilesPerQueryFiles` and
+/// `maxFilesPerQueryDirectory` as the strategy's limit and passes
+/// `minDesiredDirectoriesOfFilesPerQuery` as its minimal parallelism.
+IS3BatchListingStrategy::TPtr MakeS3BatchListingStrategy(
+ const IHTTPGateway::TPtr& httpGateway,
+ const IS3ListerFactory::TPtr& listerFactory,
+ ui64 maxFilesPerQueryFiles,
+ ui64 maxFilesPerQueryDirectory,
+ ui64 minDesiredDirectoriesOfFilesPerQuery,
+ bool allowLocalFiles);
+
+
+} // namespace NYql \ No newline at end of file
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
new file mode 100644
index 0000000000..b03bf0e0cc
--- /dev/null
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp
@@ -0,0 +1,216 @@
+#include "yql_s3_listing_strategy.cpp"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql {
+
+/// IS3Lister test double that replays a fixed sequence of list results.
+///
+/// The call protocol is enforced with Y_ENSURE: HasNext() has to be called
+/// before every Next(), and Next() may be called only once per HasNext().
+class TMockS3Lister : public NS3Lister::IS3Lister {
+public:
+    explicit TMockS3Lister(std::vector<NS3Lister::TListResult> batches)
+        : Batches(std::move(batches)) { }
+
+    /// Returns the batch selected by the preceding HasNext() call.
+    TFuture<NS3Lister::TListResult> Next() override {
+        Y_ENSURE(!Consumed);
+        // Fail loudly rather than read past the end if Next() is called after
+        // HasNext() has returned false.
+        Y_ENSURE(static_cast<size_t>(Step) < Batches.size());
+        Consumed = true;
+        return MakeFuture(Batches[Step]);
+    }
+
+    /// Advances to the next batch and reports whether it exists.
+    bool HasNext() override {
+        Y_ENSURE(Consumed);
+        ++Step;
+        Consumed = false;
+        // Step is non-negative here (it starts at -1 and has just been
+        // incremented), so widening it to size_t keeps its value; the former
+        // ui16 cast would wrap around after 65535 steps.
+        return static_cast<size_t>(Step) < Batches.size();
+    }
+
+private:
+    std::vector<NS3Lister::TListResult> Batches;
+    i32 Step = -1;
+    bool Consumed = true;
+};
+
+/// IS3Lister test double with exactly one pending item: its Next() returns a
+/// future on which an exception with the configured message has been set.
+class TMockS3ExceptionLister : public NS3Lister::IS3Lister {
+public:
+    TMockS3ExceptionLister(TString exceptionMessage)
+        : ExceptionMessage(std::move(exceptionMessage)) { }
+
+    /// True until Next() has been called.
+    bool HasNext() override {
+        const bool pending = !Consumed;
+        return pending;
+    }
+
+    /// May be called once; a second call fails the Y_ENSURE check.
+    TFuture<NS3Lister::TListResult> Next() override {
+        Y_ENSURE(!Consumed);
+        Consumed = true;
+
+        auto failed = NewPromise<NS3Lister::TListResult>();
+        failed.SetException(ExceptionMessage);
+        return failed;
+    }
+
+private:
+    TString ExceptionMessage;
+    bool Consumed = false;
+};
+
+Y_UNIT_TEST_SUITE(TCollectingS3ListingStrategyTests) {
+
+// Asserts that two list results are equal.
+//
+// Both results must hold the same variant alternative. For TIssues the issue
+// count and the one-line string form are compared. For TListEntries the
+// objects are compared element-wise by Path, Size and MatchedGlobs, and the
+// directories element-wise by Path and MatchedGlobs.
+void UnitAssertListResultEquals(
+ const NS3Lister::TListResult& expected, const NS3Lister::TListResult& actual) {
+ // Same alternative first, so the std::get calls below cannot throw.
+ UNIT_ASSERT_VALUES_EQUAL(expected.index(), actual.index());
+ if (std::holds_alternative<TIssues>(expected)) {
+ const auto& expectedIssues = std::get<TIssues>(expected);
+ const auto& actualIssues = std::get<TIssues>(actual);
+ UNIT_ASSERT_VALUES_EQUAL(expectedIssues.Size(), actualIssues.Size());
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedIssues.ToOneLineString(), actualIssues.ToOneLineString());
+ return;
+ }
+
+ const auto& expectedEntries = std::get<NS3Lister::TListEntries>(expected);
+ const auto& actualEntries = std::get<NS3Lister::TListEntries>(actual);
+
+ // Sizes are checked before the element-wise loops so that indexing into
+ // the actual containers stays in range.
+ UNIT_ASSERT_VALUES_EQUAL(expectedEntries.Objects.size(), actualEntries.Objects.size());
+ for (size_t i = 0; i < expectedEntries.Objects.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Objects[i].Path, actualEntries.Objects[i].Path);
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Objects[i].Size, actualEntries.Objects[i].Size);
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Objects[i].MatchedGlobs.size(),
+ actualEntries.Objects[i].MatchedGlobs.size());
+ for (size_t j = 0; j < expectedEntries.Objects[i].MatchedGlobs.size(); ++j) {
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Objects[i].MatchedGlobs[j],
+ actualEntries.Objects[i].MatchedGlobs[j]);
+ }
+ }
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Directories.size(), actualEntries.Directories.size());
+ for (size_t i = 0; i < expectedEntries.Directories.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Directories[i].Path, actualEntries.Directories[i].Path);
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Directories[i].MatchedGlobs.size(),
+ actualEntries.Directories[i].MatchedGlobs.size());
+ for (size_t j = 0; j < expectedEntries.Directories[i].MatchedGlobs.size(); ++j) {
+ UNIT_ASSERT_VALUES_EQUAL(
+ expectedEntries.Directories[i].MatchedGlobs[j],
+ actualEntries.Directories[i].MatchedGlobs[j]);
+ }
+ }
+}
+
+Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) {
+    // The mocked lister yields two successful batches with one object each;
+    // the limit of 10 is not reached, so both objects are expected back in a
+    // single result.
+    std::vector<NS3Lister::TListResult> batches{
+        NS3Lister::TListEntries{
+            .Objects = std::vector<NS3Lister::TObjectListEntry>{
+                NS3Lister::TObjectListEntry{.Path = "a/a", .Size = 10}}},
+        NS3Lister::TListEntries{
+            .Objects = std::vector<NS3Lister::TObjectListEntry>{
+                NS3Lister::TObjectListEntry{.Path = "a/b", .Size = 15}}}};
+
+    auto strategy = TCollectingS3ListingStrategy{
+        10,
+        [batches](const NS3Lister::TListingRequest& listingRequest,
+                  ES3ListingOptions options) {
+            UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
+            UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+            return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
+                std::make_shared<TMockS3Lister>(batches)));
+        }};
+
+    auto actualResultFuture = strategy.List(
+        NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
+        ES3ListingOptions::NoOptions);
+
+    auto expectedResult = NS3Lister::TListResult{NS3Lister::TListEntries{
+        .Objects = std::vector<NS3Lister::TObjectListEntry>{
+            NS3Lister::TObjectListEntry{.Path = "a/a", .Size = 10},
+            NS3Lister::TObjectListEntry{.Path = "a/b", .Size = 15}}}};
+    const auto& actualResult = actualResultFuture.GetValue();
+    UnitAssertListResultEquals(expectedResult, actualResult);
+}
+
+Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) {
+    // Two objects arrive in total while the limit is 1, so the strategy is
+    // expected to answer with the limit-exceeded issue.
+    std::vector<NS3Lister::TListResult> batches{
+        NS3Lister::TListEntries{
+            .Objects = std::vector<NS3Lister::TObjectListEntry>{
+                NS3Lister::TObjectListEntry{.Path = "a/a", .Size = 10}}},
+        NS3Lister::TListEntries{
+            .Objects = std::vector<NS3Lister::TObjectListEntry>{
+                NS3Lister::TObjectListEntry{.Path = "a/b", .Size = 15}}}};
+
+    auto strategy = TCollectingS3ListingStrategy{
+        1,
+        [batches](const NS3Lister::TListingRequest& listingRequest,
+                  ES3ListingOptions options) {
+            UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
+            UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+            return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
+                std::make_shared<TMockS3Lister>(batches)));
+        }};
+
+    auto actualResultFuture = strategy.List(
+        NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
+        ES3ListingOptions::NoOptions);
+
+    auto expectedResult = NS3Lister::TListResult{TIssues{MakeLimitExceededIssue()}};
+    const auto& actualResult = actualResultFuture.GetValue();
+    UnitAssertListResultEquals(expectedResult, actualResult);
+}
+
+Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) {
+    // The first batch is a regular one-object listing, the second batch is an
+    // issue; the strategy is expected to return that issue as its result.
+    std::vector<NS3Lister::TListResult> batches{
+        NS3Lister::TListEntries{
+            .Objects = std::vector<NS3Lister::TObjectListEntry>{
+                NS3Lister::TObjectListEntry{.Path = "a/a", .Size = 10}}},
+        TIssues{TIssue("TEST_ISSUE")}};
+
+    auto strategy = TCollectingS3ListingStrategy{
+        1,
+        [batches](const NS3Lister::TListingRequest& listingRequest,
+                  ES3ListingOptions options) {
+            UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
+            UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+            return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
+                std::make_shared<TMockS3Lister>(batches)));
+        }};
+
+    auto actualResultFuture = strategy.List(
+        NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
+        ES3ListingOptions::NoOptions);
+
+    auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("TEST_ISSUE")}};
+    const auto& actualResult = actualResultFuture.GetValue();
+    UnitAssertListResultEquals(expectedResult, actualResult);
+}
+
+Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) {
+    // The lister's only Next() call produces a failed future; the strategy is
+    // expected to still complete with a value that carries the exception
+    // message as an issue.
+    const auto makeFailingLister = [](const NS3Lister::TListingRequest& listingRequest,
+                                      ES3ListingOptions options) {
+        UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT");
+        UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions);
+        return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(
+            std::make_shared<TMockS3ExceptionLister>("EXCEPTION MESSAGE")));
+    };
+    auto strategy = TCollectingS3ListingStrategy{10, makeFailingLister};
+
+    auto actualResultFuture = strategy.List(
+        NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"},
+        ES3ListingOptions::NoOptions);
+    UNIT_ASSERT(actualResultFuture.HasValue());
+
+    auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("EXCEPTION MESSAGE")}};
+    const auto& actualResult = actualResultFuture.GetValue();
+    UnitAssertListResultEquals(expectedResult, actualResult);
+}
+
+} // Y_UNIT_TEST_SUITE(TCollectingS3ListingStrategyTests)
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
index e62c23fab9..bd3e92d89a 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp
@@ -262,7 +262,9 @@ public:
return TStatus::Error;
}
- const auto maxFiles = State_->Configuration->MaxFilesPerQuery;
+ const auto maxFiles = std::max(
+ State_->Configuration->MaxFilesPerQuery,
+ State_->Configuration->MaxDirectoriesAndFilesPerQuery);
if (count > maxFiles) {
ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too many objects to read: " << count << ", but limit is " << maxFiles));
return TStatus::Error;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index ab10097afd..72d7a691a0 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -35,6 +35,8 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA
FileSizeLimit = config.HasFileSizeLimit() ? config.GetFileSizeLimit() : 2_GB;
MaxFilesPerQuery = config.HasMaxFilesPerQuery() ? config.GetMaxFilesPerQuery() : 7000;
MaxDiscoveryFilesPerQuery = config.HasMaxDiscoveryFilesPerQuery() ? config.GetMaxDiscoveryFilesPerQuery() : 9000;
+ MaxDirectoriesAndFilesPerQuery = config.HasMaxDirectoriesAndFilesPerQuery() ? config.GetMaxDirectoriesAndFilesPerQuery() : 9000;
+ MinDesiredDirectoriesOfFilesPerQuery = config.HasMinDesiredDirectoriesOfFilesPerQuery() ? config.GetMinDesiredDirectoriesOfFilesPerQuery() : 100;
MaxReadSizePerQuery = config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB;
MaxInflightListsPerQuery = config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 437715fd5a..86bb1ed5c9 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -41,6 +41,8 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher
std::unordered_map<TString, ui64> FormatSizeLimits;
ui64 MaxFilesPerQuery;
ui64 MaxDiscoveryFilesPerQuery;
+ ui64 MaxDirectoriesAndFilesPerQuery;
+ ui64 MinDesiredDirectoriesOfFilesPerQuery;
ui64 MaxReadSizePerQuery;
ui64 MaxInflightListsPerQuery;
bool AllowLocalFiles;