diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-02-22 14:36:34 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-02-22 14:36:34 +0300 |
commit | 3d9a59583ae857ecc45c709288c5d0e9972fe029 (patch) | |
tree | be62aee52e222ab63d1314bf2d22937fe94dc818 | |
parent | d7c89e8978179791514520aca563a81093f35013 (diff) | |
download | ydb-3d9a59583ae857ecc45c709288c5d0e9972fe029.tar.gz |
Поддержка большого числа файлов в s3
35 files changed, 2650 insertions, 547 deletions
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index bb3133b869..97930d951c 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -175,6 +175,17 @@ void Init( if (const ui64 dataInflight = s3readConfig.GetDataInflight()) { readActorFactoryCfg.DataInflight = dataInflight; } + for (auto& formatSizeLimit: protoConfig.GetGateways().GetS3().GetFormatSizeLimit()) { + if (formatSizeLimit.GetName()) { // ignore unnamed limits + readActorFactoryCfg.FormatSizeLimits.emplace( + formatSizeLimit.GetName(), formatSizeLimit.GetFileSizeLimit()); + } + } + if (protoConfig.GetGateways().GetS3().HasFileSizeLimit()) { + readActorFactoryCfg.FileSizeLimit = + protoConfig.GetGateways().GetS3().GetFileSizeLimit(); + } + RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, !protoConfig.GetReadActorsFactoryConfig().GetPqReadActorFactoryConfig().GetCookieCommitMode()); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg, diff --git a/ydb/library/yql/providers/common/proto/gateways_config.proto b/ydb/library/yql/providers/common/proto/gateways_config.proto index 0a9612c687..2726661e48 100644 --- a/ydb/library/yql/providers/common/proto/gateways_config.proto +++ b/ydb/library/yql/providers/common/proto/gateways_config.proto @@ -381,6 +381,8 @@ message TS3GatewayConfig { optional uint64 MaxReadSizePerQuery = 4; optional uint64 MaxDiscoveryFilesPerQuery = 5; optional uint64 MaxInflightListsPerQuery = 7; + optional uint64 MaxDirectoriesAndFilesPerQuery = 8; + optional uint64 MinDesiredDirectoriesOfFilesPerQuery = 9; repeated TAttr DefaultSettings = 100; } diff --git a/ydb/library/yql/providers/s3/CMakeLists.txt b/ydb/library/yql/providers/s3/CMakeLists.txt index bcd41e3119..26b1666177 100644 --- a/ydb/library/yql/providers/s3/CMakeLists.txt +++ b/ydb/library/yql/providers/s3/CMakeLists.txt @@ -10,6 +10,7 @@ add_subdirectory(actors) add_subdirectory(common) add_subdirectory(compressors) add_subdirectory(expr_nodes) +add_subdirectory(object_listers) add_subdirectory(path_generator) add_subdirectory(proto) add_subdirectory(provider) diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt index 30b8f4cc17..aa908c5da1 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt @@ -37,6 +37,7 @@ target_link_libraries(providers-s3-actors PUBLIC common-arrow-interface providers-s3-common providers-s3-compressors + providers-s3-object_listers providers-s3-proto clickhouse_client_udf providers-s3-range_helpers diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt index 6971d9cdc8..387fada651 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux-aarch64.txt @@ -38,6 +38,7 @@ target_link_libraries(providers-s3-actors PUBLIC common-arrow-interface providers-s3-common providers-s3-compressors + providers-s3-object_listers providers-s3-proto clickhouse_client_udf providers-s3-range_helpers diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt index 6971d9cdc8..387fada651 100644 --- a/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt @@ -38,6 +38,7 @@ target_link_libraries(providers-s3-actors PUBLIC common-arrow-interface providers-s3-common providers-s3-compressors + providers-s3-object_listers providers-s3-proto clickhouse_client_udf providers-s3-range_helpers diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 101ee0fee5..4976a9c1e6 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -52,6 +52,7 @@ #include <ydb/library/yql/providers/s3/common/util.h> #include <ydb/library/yql/providers/s3/compressors/factory.h> +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h> #include <ydb/library/yql/providers/s3/proto/range.pb.h> #include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> #include <ydb/library/yql/providers/s3/serializations/serialization_interval.h> @@ -108,6 +109,9 @@ namespace NYql::NDq { using namespace ::NActors; using namespace ::NYql::NS3Details; +using ::NYql::NS3Lister::ES3PatternVariant; +using ::NYql::NS3Lister::ES3PatternType; + namespace { constexpr TDuration MEMORY_USAGE_REPORT_PERIOD = TDuration::Seconds(10); @@ -120,6 +124,15 @@ struct TS3ReadError : public yexception { using yexception::yexception; }; +struct TObjectPath { + TString Path; + size_t Size; + size_t PathIndex; + + TObjectPath(TString path, size_t size, size_t pathIndex) + : Path(std::move(path)), Size(size), PathIndex(pathIndex) { } +}; + struct TEvPrivate { // Event ids enum EEv : ui32 { @@ -138,6 +151,8 @@ struct TEvPrivate { EvPause, EvContinue, EvFutureResolved, + EvObjectPathBatch, + EvObjectPathReadError, EvEnd }; @@ -146,15 +161,20 @@ struct TEvPrivate { // Events struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> { - TEvReadResult(IHTTPGateway::TContent&& result, const TString& requestId, size_t pathInd) + TEvReadResult( + IHTTPGateway::TContent&& result, + const TString& requestId, + size_t pathInd, + TString path) : Result(std::move(result)) , RequestId(requestId) , PathIndex(pathInd) - {} + , Path(std::move(path)) { } IHTTPGateway::TContent Result; const TString RequestId; const size_t PathIndex; + const TString Path; }; struct TEvDataPart : public TEventLocal<TEvDataPart, EvDataPart> { @@ -188,15 +208,20 @@ struct TEvPrivate { }; struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> { - TEvReadError(TIssues&& error, const TString& requestId, size_t pathInd = std::numeric_limits<size_t>::max()) + TEvReadError( + TIssues&& error, + const TString& requestId, + size_t pathInd, + TString path) : Error(std::move(error)) , RequestId(requestId) , PathIndex(pathInd) - {} + , Path(std::move(path)) { } const TIssues Error; const TString RequestId; const size_t PathIndex; + const TString Path; }; struct TEvRetryEventFunc : public NActors::TEventLocal<TEvRetryEventFunc, EvRetry> { @@ -230,10 +255,393 @@ struct TEvPrivate { struct TEvContinue : public NActors::TEventLocal<TEvContinue, EvContinue> { }; + struct TEvObjectPathBatch : + public NActors::TEventLocal<TEvObjectPathBatch, EvObjectPathBatch> { + std::vector<TObjectPath> ObjectPaths; + bool NoMoreFiles = false; + TEvObjectPathBatch( + std::vector<TObjectPath> objectPaths, bool noMoreFiles) + : ObjectPaths(std::move(objectPaths)), NoMoreFiles(noMoreFiles) { } + }; + + struct TEvObjectPathReadError : + public NActors::TEventLocal<TEvObjectPathReadError, EvObjectPathReadError> { + TIssues Issues; + TEvObjectPathReadError(TIssues issues) : Issues(std::move(issues)) { } + }; }; using namespace NKikimr::NMiniKQL; +class TS3FileQueueActor : public TActorBootstrapped<TS3FileQueueActor> { +public: + static constexpr char ActorName[] = "YQ_S3_FILE_QUEUE_ACTOR"; + + struct TEvPrivatePrivate { + enum { + EvGetNextFile = EventSpaceBegin(TEvents::ES_PRIVATE), + EvNextListingChunkReceived, + EvEnd + }; + static_assert( + EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE), + "expected EvEnd <= EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvGetNextFile : public TEventLocal<TEvGetNextFile, EvGetNextFile> { + size_t RequestedAmount = 1; + TEvGetNextFile(size_t requestedAmount) : RequestedAmount(requestedAmount){}; + }; + struct TEvNextListingChunkReceived : + public TEventLocal<TEvNextListingChunkReceived, EvNextListingChunkReceived> { + NS3Lister::TListResult ListingResult; + TEvNextListingChunkReceived(NS3Lister::TListResult listingResult) + : ListingResult(std::move(listingResult)){}; + }; + }; + using TBase = TActorBootstrapped<TS3FileQueueActor>; + + TS3FileQueueActor( + TTxId txId, + TPathList paths, + size_t prefetchSize, + ui64 fileSizeLimit, + IHTTPGateway::TPtr gateway, + TString url, + TString token, + TString pattern, + ES3PatternVariant patternVariant, + ES3PatternType patternType) + : TxId(std::move(txId)) + , PrefetchSize(prefetchSize) + , FileSizeLimit(fileSizeLimit) + , MaybeIssues(Nothing()) + , Gateway(std::move(gateway)) + , Url(std::move(url)) + , Token(std::move(token)) + , Pattern(std::move(pattern)) + , PatternVariant(patternVariant) + , PatternType(patternType) { + for (size_t i = 0; i < paths.size(); ++i) { + if (paths[i].IsDirectory) { + Directories.emplace_back(paths[i].Path, 0, i); + } else { + Objects.emplace_back(paths[i].Path, paths[i].Size, i); + } + } + } + + void Bootstrap() { + if (Directories.empty()) { + LOG_I("TS3FileQueueActor", "Bootstrap there is no directories to list"); + Become(&TS3FileQueueActor::NoMoreDirectoriesState); + } else { + LOG_I("TS3FileQueueActor", "Bootstrap there are directories to list"); + TryPreFetch(); + Become(&TS3FileQueueActor::ThereAreDirectoriesToListState); + } + } + + STATEFN(ThereAreDirectoriesToListState) { + try { + switch (const auto etype = ev->GetTypeRewrite()) { + hFunc(TEvPrivatePrivate::TEvGetNextFile, HandleGetNextFile); + hFunc(TEvPrivatePrivate::TEvNextListingChunkReceived, HandleNextListingChunkReceived); + cFunc(TEvents::TSystem::Poison, PassAway); + default: + MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}}; + TransitToErrorState(); + break; + } + } catch (const std::exception& e) { + MaybeIssues = TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}}; + TransitToErrorState(); + } + } + + void HandleGetNextFile(TEvPrivatePrivate::TEvGetNextFile::TPtr& ev) { + auto requestAmount = ev->Get()->RequestedAmount; + LOG_D("TS3FileQueueActor", "HandleGetNextFile requestAmount:" << requestAmount); + if (Objects.size() > requestAmount) { + LOG_D("TS3FileQueueActor", "HandleGetNextFile sending right away"); + SendObjects(ev->Sender, requestAmount); + TryPreFetch(); + } else { + LOG_D("TS3FileQueueActor", "HandleGetNextFile have not enough objects cached. Start fetching"); + RequestQueue.emplace_back(ev->Sender, requestAmount); + TryFetch(); + } + } + + void HandleNextListingChunkReceived(TEvPrivatePrivate::TEvNextListingChunkReceived::TPtr& ev) { + Y_ENSURE(FetchingInProgress()); + ListingFuture = Nothing(); + LOG_D("TS3FileQueueActor", "HandleNextListingChunkReceived"); + if (SaveRetrievedResults(ev->Get()->ListingResult)) { + AnswerPendingRequests(); + if (RequestQueue.empty()) { + LOG_D("TS3FileQueueActor", "HandleNextListingChunkReceived RequestQueue is empty. Trying to prefetch"); + TryPreFetch(); + } else { + LOG_D("TS3FileQueueActor", "HandleNextListingChunkReceived RequestQueue is not empty. Fetching more objects"); + TryFetch(); + } + } else { + TransitToErrorState(); + } + } + + bool SaveRetrievedResults(const NS3Lister::TListResult& listingResult) { + LOG_T("TS3FileQueueActor", "SaveRetrievedResults"); + if (std::holds_alternative<TIssues>(listingResult)) { + MaybeIssues = std::get<TIssues>(listingResult); + return false; + } + + auto listingChunk = std::get<NS3Lister::TListEntries>(listingResult); + LOG_D("TS3FileQueueActor", "SaveRetrievedResults saving: " << listingChunk.Objects.size() << " entries"); + Y_ENSURE(listingChunk.Directories.empty()); + for (auto& object: listingChunk.Objects) { + if (object.Path.EndsWith('/')) { + // skip 'directories' + continue; + } + if (object.Size > FileSizeLimit) { + auto errorMessage = TStringBuilder() + << "Size of object " << object.Path << " = " + << object.Size + << " and exceeds limit = " << FileSizeLimit; + LOG_E("TS3FileQueueActor", errorMessage); + MaybeIssues = TIssues{TIssue{errorMessage}}; + return false; + } + LOG_T("TS3FileQueueActor", "SaveRetrievedResults adding path: " << object.Path); + Objects.emplace_back(object.Path, object.Size, CurrentDirectoryPathIndex); + } + return true; + } + + void AnswerPendingRequests() { + while (!RequestQueue.empty()) { + auto requestToFulfil = std::find_if( + RequestQueue.begin(), + RequestQueue.end(), + [this](auto& val) { return val.second <= Objects.size(); }); + + if (requestToFulfil != RequestQueue.end()) { + auto [actorId, requestedAmount] = *requestToFulfil; + LOG_T( + "TS3FileQueueActor", + "AnswerPendingRequests responding to " + << requestToFulfil->first << " with " << requestToFulfil->second + << " items"); + SendObjects(actorId, requestedAmount); + RequestQueue.erase(requestToFulfil); + } else { + LOG_T( + "TS3FileQueueActor", + "AnswerPendingRequests no more pending requests to fulfil"); + break; + } + } + } + + bool FetchingInProgress() const { return ListingFuture.Defined(); } + + void TransitToNoMoreDirectoriesToListState() { + LOG_I("TS3FileQueueActor", "TransitToNoMoreDirectoriesToListState no more directories to list"); + for (auto& [requestorId, size]: RequestQueue) { + SendObjects(requestorId, size); + } + RequestQueue.clear(); + Become(&TS3FileQueueActor::NoMoreDirectoriesState); + } + + void TransitToErrorState() { + Y_ENSURE(MaybeIssues.Defined()); + LOG_I("TS3FileQueueActor", "TransitToErrorState an error occurred sending "); + for (auto& [requestorId, _]: RequestQueue) { + Send( + requestorId, + std::make_unique<TEvPrivate::TEvObjectPathReadError>(*MaybeIssues)); + } + RequestQueue.clear(); + Objects.clear(); + Directories.clear(); + Become(&TS3FileQueueActor::AnErrorOccurredState); + } + + STATEFN(NoMoreDirectoriesState) { + try { + switch (const auto etype = ev->GetTypeRewrite()) { + hFunc(TEvPrivatePrivate::TEvGetNextFile, HandleGetNextFileForEmptyState); + cFunc(TEvents::TSystem::Poison, PassAway); + default: + MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}}; + TransitToErrorState(); + break; + } + } catch (const std::exception& e) { + MaybeIssues = TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}}; + TransitToErrorState(); + } + } + + void HandleGetNextFileForEmptyState(TEvPrivatePrivate::TEvGetNextFile::TPtr& ev) { + LOG_D("TS3FileQueueActor", "HandleGetNextFileForEmptyState Giving away rest of Objects"); + SendObjects(ev->Sender, ev->Get()->RequestedAmount); + } + + STATEFN(AnErrorOccurredState) { + try { + switch (const auto etype = ev->GetTypeRewrite()) { + hFunc(TEvPrivatePrivate::TEvGetNextFile, HandleGetNextFileForErrorState); + cFunc(TEvents::TSystem::Poison, PassAway); + default: + MaybeIssues = TIssues{TIssue{TStringBuilder() << "An event with unknown type has been received: '" << etype << "'"}}; + break; + } + } catch (const std::exception& e) { + MaybeIssues = TIssues{TIssue{TStringBuilder() << "An unknown exception has occurred: '" << e.what() << "'"}}; + } + } + + void HandleGetNextFileForErrorState(TEvPrivatePrivate::TEvGetNextFile::TPtr& ev) { + LOG_D( + "TS3FileQueueActor", + "HandleGetNextFileForErrorState Giving away rest of Objects"); + Send(ev->Sender, std::make_unique<TEvPrivate::TEvObjectPathReadError>(*MaybeIssues)); + } + + void PassAway() override { + if (!MaybeIssues.Defined()) { + for (auto& [requestorId, size]: RequestQueue) { + SendObjects(requestorId, size); + } + } else { + for (auto& [requestorId, _]: RequestQueue) { + Send( + requestorId, + std::make_unique<TEvPrivate::TEvObjectPathReadError>(*MaybeIssues)); + } + } + + RequestQueue.clear(); + Objects.clear(); + Directories.clear(); + TBase::PassAway(); + } + +private: + void SendObjects(const TActorId& recipient, size_t amount) { + Y_ENSURE(!MaybeIssues.Defined()); + size_t correctedAmount = std::min(amount, Objects.size()); + std::vector<TObjectPath> result; + if (correctedAmount != 0) { + result.reserve(correctedAmount); + for (size_t i = 0; i < correctedAmount; ++i) { + result.push_back(Objects.back()); + Objects.pop_back(); + } + } + + LOG_T( + "TS3FileQueueActor", + "SendObjects amount: " << amount << " correctedAmount: " << correctedAmount + << " result size: " << result.size()); + + Send( + recipient, + std::make_unique<TEvPrivate::TEvObjectPathBatch>( + std::move(result), HasNoMoreItems())); + } + bool HasNoMoreItems() const { + return !(MaybeLister.Defined() && (*MaybeLister)->HasNext()) && + Directories.empty() && Objects.empty(); + } + + bool TryPreFetch () { + if (Objects.size() < PrefetchSize) { + return TryFetch(); + } + return false; + } + bool TryFetch() { + if (FetchingInProgress()) { + LOG_D("TS3FileQueueActor", "TryFetch fetching already in progress"); + return true; + } + + if (MaybeLister.Defined() && (*MaybeLister)->HasNext()) { + LOG_D("TS3FileQueueActor", "TryFetch fetching from current lister"); + Fetch(); + return true; + } + + if (!Directories.empty()) { + LOG_D("TS3FileQueueActor", "TryFetch fetching from new lister"); + + auto [path, size, pathIndex] = Directories.back(); + Directories.pop_back(); + CurrentDirectoryPathIndex = pathIndex; + MaybeLister = NS3Lister::MakeS3Lister( + Gateway, + NS3Lister::TListingRequest{ + Url, + Token, + PatternVariant == ES3PatternVariant::PathPattern + ? Pattern + : TStringBuilder{} << path << Pattern, + PatternType, + path}, + Nothing(), + false); + Fetch(); + return true; + } + + LOG_D("TS3FileQueueActor", "TryFetch couldn't start fetching"); + MaybeLister = Nothing(); + TransitToNoMoreDirectoriesToListState(); + return false; + } + void Fetch() { + Y_ENSURE(!ListingFuture.Defined()); + Y_ENSURE(MaybeLister.Defined()); + NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); + ListingFuture = + (*MaybeLister) + ->Next() + .Subscribe([actorSystem, selfId = SelfId()]( + const NThreading::TFuture<NS3Lister::TListResult>& future) { + actorSystem->Send( + selfId, + new TEvPrivatePrivate::TEvNextListingChunkReceived( + future.GetValue())); + }); + } + +private: + const TTxId TxId; + + std::vector<TObjectPath> Objects; + std::vector<TObjectPath> Directories; + + size_t PrefetchSize; + ui64 FileSizeLimit; + TMaybe<NS3Lister::IS3Lister::TPtr> MaybeLister = Nothing(); + TMaybe<NThreading::TFuture<NS3Lister::TListResult>> ListingFuture; + size_t CurrentDirectoryPathIndex = 0; + std::deque<std::pair<TActorId, size_t>> RequestQueue; + TMaybe<TIssues> MaybeIssues; + + const IHTTPGateway::TPtr Gateway; + const TString Url; + const TString Token; + const TString Pattern; + const ES3PatternVariant PatternVariant; + const ES3PatternType PatternType; +}; + class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqComputeActorAsyncInput { public: TS3ReadActor(ui64 inputIndex, @@ -242,6 +650,8 @@ public: const THolderFactory& holderFactory, const TString& url, const TString& token, + const TString& pattern, + ES3PatternVariant patternVariant, TPathList&& paths, bool addPathIndex, ui64 startPathIndex, @@ -250,8 +660,9 @@ public: const IRetryPolicy<long>::TPtr& retryPolicy, const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, - ::NMonitoring::TDynamicCounterPtr taskCounters - ) : ReadActorFactoryCfg(readActorFactoryCfg) + ::NMonitoring::TDynamicCounterPtr taskCounters, + ui64 fileSizeLimit) + : ReadActorFactoryCfg(readActorFactoryCfg) , Gateway(std::move(gateway)) , HolderFactory(holderFactory) , InputIndex(inputIndex) @@ -261,13 +672,15 @@ public: , ActorSystem(TActivationContext::ActorSystem()) , Url(url) , Token(token) + , Pattern(pattern) + , PatternVariant(patternVariant) , Paths(std::move(paths)) , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) , SizeLimit(sizeLimit) , Counters(counters) , TaskCounters(taskCounters) - { + , FileSizeLimit(fileSizeLimit) { if (Counters) { QueueDataSize = Counters->GetCounter("QueueDataSize"); QueueDataLimit = Counters->GetCounter("QueueDataLimit"); @@ -283,14 +696,23 @@ public: void Bootstrap() { LOG_D("TS3ReadActor", "Bootstrap" << ", InputIndex: " << InputIndex); + FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{ + TxId, + std::move(Paths), + ReadActorFactoryCfg.MaxInflight * 2, + FileSizeLimit, + Gateway, + Url, + Token, + Pattern, + PatternVariant, + ES3PatternType::Wildcard}); + SendPathRequest(); Become(&TS3ReadActor::StateFunc); - while (TryStartDownload()) { - - } } bool TryStartDownload() { - if (CurrentPathIndex >= Paths.size()) { + if (ObjectPathCache.empty()) { // no path is pending return false; } @@ -303,19 +725,37 @@ public: return false; } - StartDownload(CurrentPathIndex++); + StartDownload(); return true; } - void StartDownload(size_t index) { + void StartDownload() { DownloadInflight++; - const TPath& path = Paths[index]; - auto url = Url + path.Path; + const auto& [path, size, index] = ReadPathFromCache(); + auto url = Url + path; auto id = index + StartPathIndex; const TString requestId = CreateGuidAsString(); LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]"); - Gateway->Download(url, MakeHeaders(Token, requestId), 0U, std::min(path.Size, SizeLimit), - std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id), {}, RetryPolicy); + Gateway->Download(url, MakeHeaders(Token, requestId), 0U, std::min(size, SizeLimit), + std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id, path), {}, RetryPolicy); + } + + TObjectPath ReadPathFromCache() { + Y_ENSURE(!ObjectPathCache.empty()); + auto object = ObjectPathCache.back(); + ObjectPathCache.pop_back(); + if (ObjectPathCache.empty() && !IsObjectQueueEmpty) { + SendPathRequest(); + } + return object; + } + void SendPathRequest() { + Y_ENSURE(!IsWaitingObjectQueueResponse); + Send( + FileQueueActor, + std::make_unique<TS3FileQueueActor::TEvPrivatePrivate::TEvGetNextFile>( + ReadActorFactoryCfg.MaxInflight)); + IsWaitingObjectQueueResponse = true; } static constexpr char ActorName[] = "S3_READ_ACTOR"; @@ -333,15 +773,35 @@ private: STRICT_STFUNC(StateFunc, hFunc(TEvPrivate::TEvReadResult, Handle); hFunc(TEvPrivate::TEvReadError, Handle); + hFunc(TEvPrivate::TEvObjectPathBatch, HandleObjectPathBatch); + hFunc(TEvPrivate::TEvObjectPathReadError, HandleObjectPathReadError); ) - static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd) { + void HandleObjectPathBatch(TEvPrivate::TEvObjectPathBatch::TPtr& objectPathBatch) { + Y_ENSURE(IsWaitingObjectQueueResponse); + IsWaitingObjectQueueResponse = false; + ListedFiles += objectPathBatch->Get()->ObjectPaths.size(); + IsObjectQueueEmpty = objectPathBatch->Get()->NoMoreFiles; + ObjectPathCache.insert( + ObjectPathCache.end(), + std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()), + std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end())); + while (TryStartDownload()) {} + } + void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) { + IsObjectQueueEmpty = true; + LOG_E("TS3ReadActor", "Error while object listing, details: TEvObjectPathReadError: " << result->Get()->Issues.ToOneLineString()); + auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", TIssues{result->Get()->Issues}); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, issues, NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + } + + static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, const TString& requestId, IHTTPGateway::TResult&& result, size_t pathInd, const TString path) { switch (result.index()) { case 0U: - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)), requestId, pathInd))); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)), requestId, pathInd, path))); return; case 1U: - actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)), requestId, pathInd))); + actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)), requestId, pathInd, path))); return; default: break; @@ -381,18 +841,21 @@ private: } while (!Blocks.empty() && freeSpace > 0LL); } - if (Blocks.empty() && IsDoneCounter == Paths.size()) { + if (LastFileWasProcessed()) { finished = true; ContainerCache.Clear(); } return total; } + bool LastFileWasProcessed() const { + return Blocks.empty() && (ListedFiles == CompletedFiles) && IsObjectQueueEmpty; + } void Handle(TEvPrivate::TEvReadResult::TPtr& result) { - ++IsDoneCounter; + ++CompletedFiles; const auto id = result->Get()->PathIndex; - const auto path = Paths[id - StartPathIndex].Path; + const auto path = result->Get()->Path; const auto httpCode = result->Get()->Result.HttpResponseCode; const auto requestId = result->Get()->RequestId; IngressBytes += result->Get()->Result.size(); @@ -424,10 +887,10 @@ private: } void Handle(TEvPrivate::TEvReadError::TPtr& result) { - ++IsDoneCounter; + ++CompletedFiles; auto id = result->Get()->PathIndex; const auto requestId = result->Get()->RequestId; - const auto path = Paths[id - StartPathIndex].Path; + const auto path = result->Get()->Path; LOG_W("TS3ReadActor", "Error while reading file " << path << ", details: ID: " << id << ", TEvReadError: " << result->Get()->Error.ToOneLineString() << ", request id: [" << requestId << "]"); auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while reading file " << path << " with request id [" << requestId << "]", TIssues{result->Get()->Error}); Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); @@ -449,6 +912,7 @@ private: QueueTotalDataSize = 0; ContainerCache.Clear(); + Send(FileQueueActor, new NActors::TEvents::TEvPoison()); TActorBootstrapped<TS3ReadActor>::PassAway(); } @@ -461,8 +925,6 @@ private: } private: - size_t IsDoneCounter = 0U; - const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const IHTTPGateway::TPtr Gateway; const THolderFactory& HolderFactory; @@ -477,7 +939,15 @@ private: const TString Url; const TString Token; - const TPathList Paths; + const TString Pattern; + const ES3PatternVariant PatternVariant; + TPathList Paths; + std::vector<TObjectPath> ObjectPathCache; + bool IsObjectQueueEmpty = false; + bool IsWaitingObjectQueueResponse = false; + size_t ListedFiles = 0; + size_t CompletedFiles = 0; + NActors::TActorId FileQueueActor; const bool AddPathIndex; const ui64 StartPathIndex; const ui64 SizeLimit; @@ -495,9 +965,9 @@ private: ::NMonitoring::TDynamicCounters::TCounterPtr TaskQueueDataLimit; ::NMonitoring::TDynamicCounterPtr Counters; ::NMonitoring::TDynamicCounterPtr TaskCounters; - ui64 CurrentPathIndex = 0; ui64 QueueTotalDataSize = 0; ui64 DownloadInflight = 0; + const ui64 FileSizeLimit; }; struct TReadSpec { @@ -1184,6 +1654,8 @@ public: const THolderFactory& holderFactory, const TString& url, const TString& token, + const TString& pattern, + ES3PatternVariant patternVariant, TPathList&& paths, bool addPathIndex, ui64 startPathIndex, @@ -1194,7 +1666,8 @@ public: IArrowReader::TPtr arrowReader, const TS3ReadActorFactoryConfig& readActorFactoryCfg, ::NMonitoring::TDynamicCounterPtr counters, - ::NMonitoring::TDynamicCounterPtr taskCounters + ::NMonitoring::TDynamicCounterPtr taskCounters, + ui64 fileSizeLimit ) : ReadActorFactoryCfg(readActorFactoryCfg) , Gateway(std::move(gateway)) , HolderFactory(holderFactory) @@ -1204,16 +1677,17 @@ public: , RetryPolicy(retryPolicy) , Url(url) , Token(token) + , Pattern(pattern) + , PatternVariant(patternVariant) , Paths(std::move(paths)) , AddPathIndex(addPathIndex) , StartPathIndex(startPathIndex) , ReadSpec(readSpec) - , Count(Paths.size()) , MaxBlocksInFly(maxBlocksInFly) - , ArrowReader(arrowReader) - , Counters(counters) - , TaskCounters(taskCounters) - { + , ArrowReader(std::move(arrowReader)) + , Counters(std::move(counters)) + , TaskCounters(std::move(taskCounters)) + , FileSizeLimit(fileSizeLimit) { if (Counters) { QueueDataSize = Counters->GetCounter("QueueDataSize"); QueueDataLimit = Counters->GetCounter("QueueDataLimit"); @@ -1235,17 +1709,27 @@ public: void Bootstrap() { LOG_D("TS3StreamReadActor", "Bootstrap"); + FileQueueActor = RegisterWithSameMailbox(new TS3FileQueueActor{ + TxId, + std::move(Paths), + ReadActorFactoryCfg.MaxInflight * 2, + FileSizeLimit, + Gateway, + Url, + Token, + Pattern, + PatternVariant, + ES3PatternType::Wildcard}); + SendPathRequest(); Become(&TS3StreamReadActor::StateFunc); - while (TryRegisterCoro()) { - - } } bool TryRegisterCoro() { - if (CurrentPathIndex >= Paths.size()) { + if (ObjectPathCache.empty()) { // no path is pending return false; } + if (QueueTotalDataSize > ReadActorFactoryCfg.DataInflight) { // too large data inflight return false; @@ -1254,23 +1738,70 @@ public: // too large download inflight return false; } - RegisterCoro(CurrentPathIndex++); + RegisterCoro(); return true; } - void RegisterCoro(size_t index) { + void RegisterCoro() { DownloadInflight++; - const TPath& path = Paths[index]; + const auto& objectPath = ReadPathFromCache(); const TString requestId = CreateGuidAsString(); - auto stuff = std::make_shared<TRetryStuff>(Gateway, Url + path.Path, MakeHeaders(Token, requestId), path.Size, TxId, requestId, RetryPolicy); - auto pathIndex = index + StartPathIndex; + auto stuff = std::make_shared<TRetryStuff>( + Gateway, + Url + objectPath.Path, + MakeHeaders(Token, requestId), + objectPath.Size, + TxId, + requestId, + RetryPolicy); + auto pathIndex = objectPath.PathIndex + StartPathIndex; RetryStuffForFile.emplace(pathIndex, stuff); if (TaskCounters) { HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream()); } + LOG_D( + "TS3StreamReadActor", + "RegisterCoro with path " << objectPath.Path << " with pathIndex " + << pathIndex); ::NMonitoring::TDynamicCounters::TCounterPtr inflightCounter; - auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, TxId, ComputeActorId, stuff, ReadSpec, pathIndex, path.Path, Url, MaxBlocksInFly, ArrowReader, ReadActorFactoryCfg, DeferredQueueSize, HttpInflightSize, HttpDataRps); - CoroActors.insert(RegisterWithSameMailbox(std::make_unique<TS3ReadCoroActor>(std::move(impl), std::move(stuff), pathIndex, impl->IsDownloadNeeded(), impl->HttpInflightSize).release())); + auto impl = MakeHolder<TS3ReadCoroImpl>( + InputIndex, + TxId, + ComputeActorId, + stuff, + ReadSpec, + pathIndex, + objectPath.Path, + Url, + MaxBlocksInFly, + ArrowReader, + ReadActorFactoryCfg, + DeferredQueueSize, + HttpInflightSize, + HttpDataRps); + auto isDownloadNeeded = impl->IsDownloadNeeded(); + const auto& httpInflightSize = impl->HttpInflightSize; + CoroActors.insert(RegisterWithSameMailbox(new TS3ReadCoroActor( + std::move(impl), std::move(stuff), pathIndex, isDownloadNeeded, httpInflightSize))); + } + + TObjectPath ReadPathFromCache() { + Y_ENSURE(!ObjectPathCache.empty()); + auto object = ObjectPathCache.back(); + ObjectPathCache.pop_back(); + if (ObjectPathCache.empty() && !IsObjectQueueEmpty) { + SendPathRequest(); + } + return object; + } + void SendPathRequest() { + Y_ENSURE(!IsWaitingObjectQueueResponse); + LOG_D("TS3StreamReadActor", "SendPathRequest " << ReadActorFactoryCfg.MaxInflight); + Send( + FileQueueActor, + std::make_unique<TS3FileQueueActor::TEvPrivatePrivate::TEvGetNextFile>( + ReadActorFactoryCfg.MaxInflight)); + IsWaitingObjectQueueResponse = true; } static constexpr char ActorName[] = "S3_STREAM_READ_ACTOR"; @@ -1381,7 +1912,7 @@ private: MaybeContinue(); - finished = Blocks.empty() && !Count; + finished = LastFileWasProcessed(); if (finished) { ContainerCache.Clear(); ArrowTupleContainerCache.Clear(); @@ -1416,6 +1947,7 @@ private: for (const auto actorId : CoroActors) { Send(actorId, new NActors::TEvents::TEvPoison()); } + Send(FileQueueActor, new NActors::TEvents::TEvPoison()); ContainerCache.Clear(); ArrowTupleContainerCache.Clear(); @@ -1468,8 +2000,33 @@ private: hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock); hFunc(TEvPrivate::TEvNextRecordBatch, HandleNextRecordBatch); hFunc(TEvPrivate::TEvFileFinished, HandleFileFinished); + hFunc(TEvPrivate::TEvObjectPathBatch, HandleObjectPathBatch); + hFunc(TEvPrivate::TEvObjectPathReadError, HandleObjectPathReadError); ) + void HandleObjectPathBatch(TEvPrivate::TEvObjectPathBatch::TPtr& objectPathBatch) { + Y_ENSURE(IsWaitingObjectQueueResponse); + IsWaitingObjectQueueResponse = false; + ListedFiles += objectPathBatch->Get()->ObjectPaths.size(); + IsObjectQueueEmpty = objectPathBatch->Get()->NoMoreFiles; + + ObjectPathCache.insert( + ObjectPathCache.end(), + std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.begin()), + std::make_move_iterator(objectPathBatch->Get()->ObjectPaths.end())); + LOG_W( + "TS3StreamReadActor", + "HandleObjectPathBatch " << ObjectPathCache.size() << " IsObjectQueueEmpty " + << IsObjectQueueEmpty << " MaxInflight " << ReadActorFactoryCfg.MaxInflight); + while (TryRegisterCoro()) {} + } + void HandleObjectPathReadError(TEvPrivate::TEvObjectPathReadError::TPtr& result) { + IsObjectQueueEmpty = true; + LOG_W("TS3StreamReadActor", "Error while object listing, details: TEvObjectPathReadError: " << result->Get()->Issues.ToOneLineString()); + auto issues = NS3Util::AddParentIssue(TStringBuilder{} << "Error while object listing", TIssues{result->Get()->Issues}); + Send(ComputeActorId, new TEvAsyncInputError(InputIndex, std::move(issues), NYql::NDqProto::StatusIds::EXTERNAL_ERROR)); + } + void HandleRetry(TEvPrivate::TEvRetryEventFunc::TPtr& retry) { return retry->Get()->Functor(); } @@ -1514,14 +2071,13 @@ private: CoroActors.erase(ev->Sender); IngressBytes += ev->Get()->IngressDelta; RetryStuffForFile.erase(ev->Get()->PathIndex); - Y_VERIFY(Count); - --Count; if (TaskCounters) { HttpInflightLimit->Sub(Gateway->GetBuffersSizePerStream()); } DownloadInflight--; - if (CurrentPathIndex < Paths.size()) { + CompletedFiles++; + if (!ObjectPathCache.empty()) { TryRegisterCoro(); } else { /* @@ -1529,12 +2085,16 @@ private: then we need to pass the information to Compute Actor that the download of all data is finished in this place */ - if (Blocks.empty() && Count == 0) { + if (LastFileWasProcessed()) { Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived(InputIndex)); } } } + bool LastFileWasProcessed() const { + return Blocks.empty() && (ListedFiles == CompletedFiles) && IsObjectQueueEmpty; + } + const TS3ReadActorFactoryConfig ReadActorFactoryCfg; const IHTTPGateway::TPtr Gateway; THashMap<size_t, TRetryStuff::TPtr> RetryStuffForFile; @@ -1550,16 +2110,21 @@ private: const TString Url; const TString Token; - const TPathList Paths; + const TString Pattern; + const ES3PatternVariant PatternVariant; + TPathList Paths; + std::vector<TObjectPath> ObjectPathCache; + bool IsObjectQueueEmpty = false; + bool IsWaitingObjectQueueResponse = false; const bool AddPathIndex; const ui64 StartPathIndex; + size_t ListedFiles = 0; + size_t CompletedFiles = 0; const TReadSpec::TPtr ReadSpec; std::deque<TReadyBlock> Blocks; - ui32 Count; const std::size_t MaxBlocksInFly; IArrowReader::TPtr ArrowReader; ui64 IngressBytes = 0; - size_t CurrentPathIndex = 0; mutable TInstant LastMemoryReport = TInstant::Now(); ui64 QueueTotalDataSize = 0; ::NMonitoring::TDynamicCounters::TCounterPtr QueueDataSize; @@ -1577,7 +2142,9 @@ private: ::NMonitoring::TDynamicCounterPtr TaskCounters; ui64 DownloadInflight = 0; std::set<NActors::TActorId> CoroActors; + NActors::TActorId FileQueueActor; bool Paused = false; + const ui64 FileSizeLimit; }; using namespace NKikimr::NMiniKQL; @@ -1741,6 +2308,38 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo(); const auto& settings = params.GetSettings(); + TString pathPattern = "*"; + ES3PatternVariant pathPatternVariant = ES3PatternVariant::FilePattern; + auto hasDirectories = std::find_if(paths.begin(), paths.end(), [](const TPath& a) { + return a.IsDirectory; + }) != paths.end(); + if (hasDirectories) { + auto pathPatternValue = settings.find("pathpattern"); + if (pathPatternValue == settings.cend()) { + ythrow yexception() << "'pathpattern' must be configured for directory listing"; + } + pathPattern = pathPatternValue->second; + + auto pathPatternVariantValue = settings.find("pathpatternvariant"); + if (pathPatternVariantValue == settings.cend()) { + ythrow yexception() + << "'pathpatternvariant' must be configured for directory listing"; + } + auto maybePathPatternVariant = + NS3Lister::DeserializePatternVariant(pathPatternVariantValue->second); + if (maybePathPatternVariant.Empty()) { + ythrow yexception() + << "Unknown 'pathpatternvariant': " << pathPatternVariantValue->second; + } + pathPatternVariant = *maybePathPatternVariant; + } + ui64 fileSizeLimit = cfg.FileSizeLimit; + if (params.HasFormat()) { + if (auto it = cfg.FormatSizeLimits.find(params.GetFormat()); it != cfg.FormatSizeLimits.end()) { + fileSizeLimit = it->second; + } + } + bool addPathIndex = false; if (auto it = settings.find("addPathIndex"); it != settings.cend()) { addPathIndex = FromString<bool>(it->second); @@ -1840,9 +2439,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( std::size_t maxBlocksInFly = 2; if (const auto it = settings.find("fileReadBlocksInFly"); settings.cend() != it) maxBlocksInFly = FromString<ui64>(it->second); - const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, + const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, - maxBlocksInFly, arrowReader, cfg, counters, taskCounters); + maxBlocksInFly, arrowReader, cfg, counters, taskCounters, fileSizeLimit); return {actor, actor}; } else { @@ -1850,9 +2449,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( if (const auto it = settings.find("sizeLimit"); settings.cend() != it) sizeLimit = FromString<ui64>(it->second); - const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, + const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy, - cfg, counters, taskCounters); + cfg, counters, taskCounters, fileSizeLimit); return {actor, actor}; } } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h index 60b1c0d4ff..00f5434248 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.h @@ -17,6 +17,8 @@ struct TS3ReadActorFactoryConfig { ui64 RowsInBatch = 1000; ui64 MaxInflight = 20; ui64 DataInflight = 200_MB; + ui64 FileSizeLimit = 2_GB; + std::unordered_map<TString, ui64> FormatSizeLimits; }; void RegisterS3ReadActorFactory( diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt new file mode 100644 index 0000000000..7188435f83 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.darwin.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-object_listers) +target_link_libraries(providers-s3-object_listers PUBLIC + contrib-libs-cxxsupp + yutil + contrib-libs-re2 + cpp-xml-document + providers-common-http_gateway + library-yql-utils + yql-utils-threading +) +target_sources(providers-s3-object_listers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp +) diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..2b11109c92 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-object_listers) +target_link_libraries(providers-s3-object_listers PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-re2 + cpp-xml-document + providers-common-http_gateway + library-yql-utils + yql-utils-threading +) +target_sources(providers-s3-object_listers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp +) diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt new file mode 100644 index 0000000000..2b11109c92 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.linux.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-s3-object_listers) +target_link_libraries(providers-s3-object_listers PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + contrib-libs-re2 + cpp-xml-document + providers-common-http_gateway + library-yql-utils + yql-utils-threading +) +target_sources(providers-s3-object_listers PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp +) diff --git a/ydb/library/yql/providers/s3/object_listers/CMakeLists.txt b/ydb/library/yql/providers/s3/object_listers/CMakeLists.txt new file mode 100644 index 0000000000..5bb4faffb4 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h new file mode 100644 index 0000000000..2fd32c50f3 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h @@ -0,0 +1,102 @@ +#pragma once + +#include "yql_s3_list.h" + +#include <library/cpp/threading/future/future.h> + +#include <memory> +#include <variant> +#include <vector> + +namespace NYql { + +enum class EAggregationAction : ui8 { Proceed = 0, Stop = 1 }; + +template<typename State, typename Increment> +using TAggregationOperation = + std::function<EAggregationAction(State& state, Increment&& increment)>; + +template<typename State> +using TAggregationErrorHandler = + std::function<EAggregationAction(State& state, const std::exception& exception)>; + +namespace NDetails { + +template<typename State> +struct TAggregationState { + State Acc; + TAggregationOperation<State, NS3Lister::TListResult> IncrementHandler; + TAggregationErrorHandler<State> ErrorHandler; + std::shared_ptr<NS3Lister::IS3Lister> Lister; + NThreading::TPromise<State> Promise; + + TAggregationState( + State&& acc, + TAggregationOperation<State, NS3Lister::TListResult>&& incrementHandler, + TAggregationErrorHandler<State>&& errorHandler, + std::shared_ptr<NS3Lister::IS3Lister> lister, + NThreading::TPromise<State> promise) + : Acc(std::move(acc)) + , IncrementHandler(std::move(incrementHandler)) + , ErrorHandler(std::move(errorHandler)) + , Lister(std::move(lister)) + , Promise(std::move(promise)) { } +}; + +template<typename State> +void SubscribeCallback( + const NThreading::TFuture<NS3Lister::TListResult>& strategyListingResult, + std::shared_ptr<TAggregationState<State>> state) { + EAggregationAction nextAction; + + try { + NS3Lister::TListResult listingResult = strategyListingResult.GetValue(); + nextAction = state->IncrementHandler(state->Acc, std::move(listingResult)); + } catch (const std::exception& e) { + nextAction = state->ErrorHandler(state->Acc, e); + } + + if (nextAction == EAggregationAction::Proceed && state->Lister->HasNext()) { + auto nextFuture = state->Lister->Next(); + nextFuture.Subscribe( + [state = std::move(state)]( + const NThreading::TFuture<NS3Lister::TListResult>& listingResult) mutable { + SubscribeCallback(listingResult, std::move(state)); + }); + } else { + state->Promise.SetValue(std::move(state->Acc)); + } +} + +} // namespace + +template<typename State> +NThreading::TFuture<State> AccumulateWithEarlyStop( + std::shared_ptr<NS3Lister::IS3Lister> lister, + typename std::remove_reference<State>::type&& initialState, + TAggregationOperation<State, NS3Lister::TListResult>&& incrementHandler, + TAggregationErrorHandler<State>&& errorHandler) { + auto state = std::make_shared<NDetails::TAggregationState<State>>( + std::move(initialState), + std::move(incrementHandler), + std::move(errorHandler), + std::move(lister), + NThreading::NewPromise<State>()); + + auto promise = state->Promise; + + if (state->Lister->HasNext()) { + auto nextFuture = state->Lister->Next(); + nextFuture.Subscribe( + [state = std::move(state)]( + const NThreading::TFuture<NS3Lister::TListResult>& listingResult) mutable { + SubscribeCallback(listingResult, std::move(state)); + }); + } else { + state->Promise.SetValue(std::move(state->Acc)); + } + + return promise.GetFuture(); +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp new file mode 100644 index 0000000000..f2437741f5 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -0,0 +1,443 @@ +#include "yql_s3_list.h" +#include "yql_s3_path.h" + +#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> +#include <ydb/library/yql/providers/s3/common/util.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/url_builder.h> +#include <ydb/library/yql/utils/yql_panic.h> + +#include <contrib/libs/re2/re2/re2.h> + +#ifdef THROW +#undef THROW +#endif +#include <library/cpp/threading/future/async_semaphore.h> +#include <library/cpp/xml/document/xml-document.h> +#include <util/folder/iterator.h> +#include <util/generic/guid.h> +#include <util/string/builder.h> + +#include <deque> +#include <utility> + +namespace NYql::NS3Lister { + +TMaybe<TString> SerializePatternVariant(ES3PatternVariant variant) { + switch (variant) { + case ES3PatternVariant::PathPattern: + return "path_pattern"; + case ES3PatternVariant::FilePattern: + return "file_pattern"; + default: + return Nothing(); + } +} + +TMaybe<ES3PatternVariant> DeserializePatternVariant(const TString& variant) { + if (variant == "path_pattern") { + return ES3PatternVariant::PathPattern; + } + + if (variant == "file_pattern") { + return ES3PatternVariant::FilePattern; + } + + return Nothing(); +} + +namespace { + +using namespace NThreading; + +using TResultFilter = + std::function<bool(const TString& path, std::vector<TString>& matchedGlobs)>; + +TResultFilter MakeFilterRegexp(const TString& regex) { + auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options()); + YQL_ENSURE(re->ok()); + + const size_t numGroups = re->NumberOfCapturingGroups(); + YQL_CLOG(DEBUG, ProviderS3) + << "Got regex: '" << regex << "' with " << numGroups << " capture groups "; + + auto groups = std::make_shared<std::vector<std::string>>(numGroups); + auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups); + auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups); + + for (size_t i = 0; i < numGroups; ++i) { + (*reArgs)[i] = &(*groups)[i]; + (*reArgsPtr)[i] = &(*reArgs)[i]; + } + + return [groups, + reArgs, + reArgsPtr, + re](const TString& path, std::vector<TString>& matchedGlobs) { + matchedGlobs.clear(); + bool matched = + re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size()); + if (matched) { + matchedGlobs.reserve(groups->size()); + for (auto& group : *groups) { + matchedGlobs.push_back(ToString(group)); + } + } + return matched; + }; +} + +TResultFilter MakeFilterWildcard(const TString& pattern) { + auto regexPatternPrefix = pattern.substr(0, NS3::GetFirstWildcardPos(pattern)); + if (regexPatternPrefix == pattern) { + // just match for equality + return [pattern](const TString& path, std::vector<TString>& matchedGlobs) { + matchedGlobs.clear(); + return path == pattern; + }; + } + + const auto regex = NS3::RegexFromWildcards(pattern); + YQL_CLOG(DEBUG, ProviderS3) << "Got prefix: '" << regexPatternPrefix << "', regex: '" + << regex << "' from original pattern '" << pattern << "'"; + + return MakeFilterRegexp(regex); +} + +TResultFilter MakeFilter(const TString& pattern, ES3PatternType patternType) { + switch (patternType) { + case ES3PatternType::Wildcard: + return MakeFilterWildcard(pattern); + case ES3PatternType::Regexp: + return MakeFilterRegexp(pattern); + default: + ythrow yexception() << "Unknown 'patternType': " << int(patternType); + } +} + +struct TS3ListObjectV2Response { + bool IsTruncated = false; + ui64 MaxKeys = 0U; + ui64 KeyCount = 0U; + TMaybe<TString> ContinuationToken; + std::vector<TObjectListEntry> Contents; + std::vector<TString> CommonPrefixes; +}; + +TS3ListObjectV2Response ParseListObjectV2Response( + const NXml::TDocument& xml, const TString& requestId = "UnspecifiedRequestId") { + if (const auto& root = xml.Root(); root.Name() == "Error") { + const auto& code = root.Node("Code", true).Value<TString>(); + const auto& message = root.Node("Message", true).Value<TString>(); + ythrow yexception() << message << ", error: code: " << code << ", request id: [" + << requestId << "]"; + } else if (root.Name() != "ListBucketResult") { + ythrow yexception() << "Unexpected response '" << root.Name() + << "' on discovery, request id: [" << requestId << "]"; + } else { + const NXml::TNamespacesForXPath nss( + 1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); + + TS3ListObjectV2Response result; + auto continuationTokenNode = root.Node("s3:NextContinuationToken", true, nss); + result.ContinuationToken = + (!continuationTokenNode.IsNull()) + ? TMaybe<TString>(continuationTokenNode.Value<TString>()) + : Nothing(); + result.IsTruncated = root.Node("s3:IsTruncated", false, nss).Value<bool>(); + result.MaxKeys = root.Node("s3:MaxKeys", false, nss).Value<size_t>(); + result.KeyCount = root.Node("s3:KeyCount", false, nss).Value<size_t>(); + + const auto& prefixes = root.XPath("s3:CommonPrefixes/s3:Prefix", true, nss); + for (const auto& prefix : prefixes) { + auto prefixString = prefix.Value<TString>(); + result.CommonPrefixes.push_back(prefixString); + } + const auto& contents = root.XPath("s3:Contents", true, nss); + for (const auto& content : contents) { + auto& newContent = result.Contents.emplace_back(); + newContent.Path = content.Node("s3:Key", false, nss).Value<TString>(); + newContent.Size = content.Node("s3:Size", false, nss).Value<ui64>(); + } + return result; + } +} + +class TLocalS3Lister : public IS3Lister { +public: + TLocalS3Lister(const TListingRequest& listingRequest, const TMaybe<TString>& delimiter) + : ListingRequest(listingRequest) { + Y_ENSURE(!delimiter.Defined(), "delimiter is not supported for local files"); + Filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType); + } + + TFuture<TListResult> Next() override { + Y_ENSURE(IsFirst, "Should not be called more than once"); + Y_ENSURE(ListingRequest.Url.substr(0, 7) == "file://"); + IsFirst = false; + auto promise = NewPromise<TListResult>(); + try { + auto fullPath = ListingRequest.Url.substr(7); + for (const auto& e : TPathSplit(fullPath)) { + if (e == "..") { + promise.SetException( + "Security violation: trying access parent directory in path"); + } + } + + auto output = TListEntries{}; + for (const auto& entry : TDirIterator(fullPath)) { + if (entry.fts_type != FTS_F) { + continue; + } + + auto filename = TString(entry.fts_path + ListingRequest.Url.size() - 7); + TVector<TString> matches; + if (Filter(filename, matches)) { + auto& object = output.Objects.emplace_back(); + object.Path = filename; + object.Size = entry.fts_statp->st_size; + object.MatchedGlobs.swap(matches); + } + } + promise.SetValue(std::move(output)); + } catch (const std::exception& ex) { + promise.SetException(std::current_exception()); + } + return promise.GetFuture(); + } + + bool HasNext() override { return IsFirst; } + +private: + const TListingRequest ListingRequest; + TResultFilter Filter; + bool IsFirst = true; +}; + +class TS3Lister : public IS3Lister { +public: + struct TListingContext { + // Filter + const TResultFilter Filter; + // Result processing + NThreading::TPromise<TListResult> Promise; + NThreading::TPromise<TMaybe<TListingContext>> NextRequestPromise; + const std::shared_ptr<TListEntries> Output; + // HTTP control + const IHTTPGateway::TWeakPtr GatewayWeak; + const IRetryPolicy<long>::TPtr RetryPolicy; + const TString RequestId; + const TListingRequest ListingRequest; + const TMaybe<TString> Delimiter; + const TMaybe<TString> ContinuationToken; + const ui64 MaxKeys; + }; + + TS3Lister( + IHTTPGateway::TPtr httpGateway, + const TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + size_t maxFilesPerQuery = 1000) + : MaxFilesPerQuery(maxFilesPerQuery) { + Y_ENSURE( + listingRequest.Url.substr(0, 7) != "file://", + "This lister does not support reading local files"); + + auto filter = MakeFilter(listingRequest.Pattern, listingRequest.PatternType); + + auto ctx = TListingContext{ + filter, + NewPromise<TListResult>(), + NewPromise<TMaybe<TListingContext>>(), + std::make_shared<TListEntries>(), + IHTTPGateway::TWeakPtr(httpGateway), + GetHTTPDefaultRetryPolicy(), + CreateGuidAsString(), + listingRequest, + delimiter, + Nothing(), + MaxFilesPerQuery}; + YQL_CLOG(TRACE, ProviderS3) + << "[TS3Lister] Got URL: '" << ctx.ListingRequest.Url + << "' with path prefix '" << ctx.ListingRequest.Prefix + << "' capture pattern '" << ctx.ListingRequest.Pattern << "' and delimiter '" + << ctx.Delimiter.GetOrElse("NO_DELIMITER") << "'"; + + auto promise = NewPromise<TMaybe<TListingContext>>(); + promise.SetValue(TMaybe<TListingContext>(ctx)); + NextRequestCtx = promise; + } + + ~TS3Lister() override = default; + +private: + static void SubmitRequestIntoGateway(TListingContext& ctx) { + IHTTPGateway::THeaders headers; + if (!ctx.ListingRequest.Token.empty()) { + headers.emplace_back("X-YaCloud-SubjectToken:" + ctx.ListingRequest.Token); + } + headers.emplace_back(TString{"X-Request-ID:"} + ctx.RequestId); + + TUrlBuilder urlBuilder(ctx.ListingRequest.Url); + urlBuilder.AddUrlParam("list-type", "2") + .AddUrlParam("prefix", ctx.ListingRequest.Prefix) + .AddUrlParam("max-keys", TStringBuilder() << ctx.MaxKeys); + + if (ctx.ContinuationToken.Defined()) { + urlBuilder.AddUrlParam("continuation-token", *ctx.ContinuationToken); + } + if (ctx.Delimiter.Defined()) { + urlBuilder.AddUrlParam("delimiter", *ctx.Delimiter); + } + + auto gateway = ctx.GatewayWeak.lock(); + if (!gateway) { + ythrow yexception() << "Gateway disappeared"; + } + + auto retryPolicy = ctx.RetryPolicy; + gateway->Download( + urlBuilder.Build(), + headers, + 0U, + 0U, + CallbackFactoryMethod(std::move(ctx)), + /*data=*/"", + retryPolicy); + } + static IHTTPGateway::TOnResult CallbackFactoryMethod(TListingContext&& listingContext) { + return [c = std::move(listingContext)](IHTTPGateway::TResult&& result) { + OnDiscovery(c, std::move(result)); + }; + } + + static void OnDiscovery(TListingContext ctx, IHTTPGateway::TResult&& result) try { + auto gateway = ctx.GatewayWeak.lock(); + if (!gateway) { + ythrow yexception() << "Gateway disappeared"; + } + switch (result.index()) { + case 0U: { + auto xmlString = + std::get<IHTTPGateway::TContent>(std::move(result)).Extract(); + const NXml::TDocument xml(xmlString, NXml::TDocument::String); + auto parsedResponse = ParseListObjectV2Response(xml, ctx.RequestId); + YQL_CLOG(DEBUG, ProviderS3) + << "Listing of " << ctx.ListingRequest.Url + << ctx.ListingRequest.Prefix << ": have " << ctx.Output->Size() + << " entries, got another " << parsedResponse.KeyCount + << " entries, request id: [" << ctx.RequestId << "]"; + + for (const auto& content : parsedResponse.Contents) { + if (content.Path.EndsWith('/')) { + // skip 'directories' + continue; + } + TVector<TString> matchedGlobs; + if (ctx.Filter(content.Path, matchedGlobs)) { + auto& object = ctx.Output->Objects.emplace_back(); + object.Path = content.Path; + object.Size = content.Size; + object.MatchedGlobs.swap(matchedGlobs); + } + } + for (const auto& prefix : parsedResponse.CommonPrefixes) { + auto& directory = ctx.Output->Directories.emplace_back(); + directory.Path = prefix; + directory.MatchedRegexp = ctx.Filter(prefix, directory.MatchedGlobs); + } + + if (parsedResponse.IsTruncated) { + YQL_CLOG(DEBUG, ProviderS3) << "Listing of " << ctx.ListingRequest.Url + << ctx.ListingRequest.Prefix + << ": got truncated flag, will continue"; + + auto newCtx = TListingContext{ + ctx.Filter, + NewPromise<TListResult>(), + NewPromise<TMaybe<TListingContext>>(), + std::make_shared<TListEntries>(), + ctx.GatewayWeak, + GetHTTPDefaultRetryPolicy(), + CreateGuidAsString(), + ctx.ListingRequest, + ctx.Delimiter, + parsedResponse.ContinuationToken, + parsedResponse.MaxKeys}; + + ctx.NextRequestPromise.SetValue(TMaybe<TListingContext>(newCtx)); + } else { + ctx.NextRequestPromise.SetValue(Nothing()); + } + ctx.Promise.SetValue(std::move(*ctx.Output)); + break; + } + case 1U: { + auto issues = std::get<TIssues>(std::move(result)); + issues = NS3Util::AddParentIssue( + TStringBuilder{} << "request id: [" << ctx.RequestId << "]", + std::move(issues)); + YQL_CLOG(INFO, ProviderS3) + << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix + << ": got error from http gateway: " << issues.ToString(true); + ctx.Promise.SetValue(std::move(issues)); + ctx.NextRequestPromise.SetValue(Nothing()); + break; + } + default: + ythrow yexception() << "Undefined variant index: " << result.index() + << ", request id: [" << ctx.RequestId << "]"; + } + } catch (const std::exception& ex) { + YQL_CLOG(INFO, ProviderS3) + << "Listing of " << ctx.ListingRequest.Url << ctx.ListingRequest.Prefix + << " : got exception: " << ex.what(); + ctx.Promise.SetException(std::current_exception()); + ctx.NextRequestPromise.SetValue(Nothing()); + } + +public: + TFuture<TListResult> Next() override { + auto maybeRequestCtx = NextRequestCtx.GetValueSync(); + + Y_ENSURE(maybeRequestCtx.Defined()); + auto result = maybeRequestCtx->Promise; + + NextRequestCtx = maybeRequestCtx->NextRequestPromise; + + SubmitRequestIntoGateway(*maybeRequestCtx); + return result.GetFuture(); + } + + bool HasNext() override { + // User should process future returned from `Next()` method call before asking if more data is available + // If returned DATA from `Next()` was NOT PROCESSED than this method might BLOCK. + NextRequestCtx.Wait(); + return NextRequestCtx.HasValue() && NextRequestCtx.GetValue().Defined(); + } + +private: + const size_t MaxFilesPerQuery; + TFuture<TMaybe<TListingContext>> NextRequestCtx; +}; + +} // namespace + +IS3Lister::TPtr MakeS3Lister( + const IHTTPGateway::TPtr& httpGateway, + const TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + bool allowLocalFiles) { + if (listingRequest.Url.substr(0, 7) != "file://") { + return std::make_shared<TS3Lister>(httpGateway, listingRequest, delimiter); + } + + if (!allowLocalFiles) { + ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access " + << listingRequest.Url; + } + return std::make_shared<TLocalS3Lister>(listingRequest, delimiter); +} + +} // namespace NYql::NS3Lister diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h new file mode 100644 index 0000000000..5134481949 --- /dev/null +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -0,0 +1,100 @@ +#pragma once + +#include <library/cpp/threading/future/future.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> + +#include <memory> +#include <variant> +#include <vector> + +namespace NYql { + +template<typename T> +class TIterator { +public: + virtual T Next() = 0; + + virtual bool HasNext() = 0; + + virtual ~TIterator() = default; +}; + +namespace NS3Lister { + +enum class ES3PatternVariant { FilePattern, PathPattern }; + +TMaybe<TString> SerializePatternVariant(ES3PatternVariant variant); + +TMaybe<ES3PatternVariant> DeserializePatternVariant(const TString& variant); + +enum class ES3PatternType { + /** + * Pattern may include following wildcard expressions: + * * - any (possibly empty) sequence of characters + * ? - single character + * {variant1, variant2} - list of alternatives + */ + Wildcard, + /** + * Pattern should be valid RE2 regex + */ + Regexp +}; + +struct TObjectListEntry { + TString Path; + ui64 Size = 0; + std::vector<TString> MatchedGlobs; +}; + +struct TDirectoryListEntry { + TString Path; + bool MatchedRegexp = false; + std::vector<TString> MatchedGlobs; +}; + +class TListEntries { +public: + [[nodiscard]] size_t Size() const { return Objects.size() + Directories.size(); } + [[nodiscard]] bool Empty() const { return Objects.empty() && Directories.empty(); } + + TListEntries& operator+=(TListEntries&& other) { + Objects.insert( + Objects.end(), + std::make_move_iterator(other.Objects.begin()), + std::make_move_iterator(other.Objects.end())); + Directories.insert( + Directories.end(), + std::make_move_iterator(other.Directories.begin()), + std::make_move_iterator(other.Directories.end())); + return *this; + } + +public: + std::vector<TObjectListEntry> Objects; + std::vector<TDirectoryListEntry> Directories; +}; + +using TListResult = std::variant<TListEntries, TIssues>; + +struct TListingRequest { + TString Url; + TString Token; + TString Pattern; + ES3PatternType PatternType = ES3PatternType::Wildcard; + TString Prefix; +}; + +class IS3Lister : public TIterator<NThreading::TFuture<TListResult>> { +public: + using TPtr = std::shared_ptr<IS3Lister>; +}; + +IS3Lister::TPtr MakeS3Lister( + const IHTTPGateway::TPtr& httpGateway, + const TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + bool allowLocalFiles); + +} // namespace NS3Lister +} // namespace NYql
\ No newline at end of file diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp index e746aeedde..e746aeedde 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.cpp diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.h index c39f476f88..c39f476f88 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_path.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_path.h diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp index f2ecaa8487..1b452a2683 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_path_ut.cpp @@ -1,4 +1,4 @@ -#include <ydb/library/yql/providers/s3/provider/yql_s3_path.h> +#include "yql_s3_path.h" #include <library/cpp/testing/unittest/registar.h> diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt index 83a19eed84..73c17ce754 100644 --- a/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/s3/provider/CMakeLists.darwin.txt @@ -45,6 +45,7 @@ target_link_libraries(providers-s3-provider PUBLIC providers-result-expr_nodes providers-s3-common providers-s3-expr_nodes + providers-s3-object_listers providers-s3-path_generator providers-s3-proto providers-s3-range_helpers @@ -60,10 +61,9 @@ target_sources(providers-s3-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_exec.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt index f75614e737..50fe0185f0 100644 --- a/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/s3/provider/CMakeLists.linux-aarch64.txt @@ -46,6 +46,7 @@ target_link_libraries(providers-s3-provider PUBLIC providers-result-expr_nodes providers-s3-common providers-s3-expr_nodes + providers-s3-object_listers providers-s3-path_generator providers-s3-proto providers-s3-range_helpers @@ -61,10 +62,9 @@ target_sources(providers-s3-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_exec.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp diff --git a/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt index f75614e737..50fe0185f0 100644 --- a/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/provider/CMakeLists.linux.txt @@ -46,6 +46,7 @@ target_link_libraries(providers-s3-provider PUBLIC providers-result-expr_nodes providers-s3-common providers-s3-expr_nodes + providers-s3-object_listers providers-s3-path_generator providers-s3-proto providers-s3-range_helpers @@ -61,10 +62,9 @@ target_sources(providers-s3-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_exec.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_provider_impl.cpp diff --git a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt index c158b5a01c..9647ab029a 100644 --- a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt +++ b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.darwin.txt @@ -8,9 +8,6 @@ add_executable(ydb-library-yql-providers-s3-provider-ut) -target_compile_options(ydb-library-yql-providers-s3-provider-ut PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_include_directories(ydb-library-yql-providers-s3-provider-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider ) @@ -21,9 +18,6 @@ target_link_libraries(ydb-library-yql-providers-s3-provider-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main providers-s3-provider - udf-service-exception_policy - yql-sql-pg_dummy - yql-dq-opt ) target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE -Wl,-no_deduplicate @@ -34,7 +28,7 @@ target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE CoreFoundation ) target_sources(ydb-library-yql-providers-s3-provider-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp ) set_property( TARGET diff --git a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt index 2b7084c413..27e5bf0cea 100644 --- a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux-aarch64.txt @@ -8,9 +8,6 @@ add_executable(ydb-library-yql-providers-s3-provider-ut) -target_compile_options(ydb-library-yql-providers-s3-provider-ut PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_include_directories(ydb-library-yql-providers-s3-provider-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider ) @@ -21,9 +18,6 @@ target_link_libraries(ydb-library-yql-providers-s3-provider-ut PUBLIC library-cpp-lfalloc cpp-testing-unittest_main providers-s3-provider - udf-service-exception_policy - yql-sql-pg_dummy - yql-dq-opt ) target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE -ldl @@ -36,7 +30,7 @@ target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE -ldl ) target_sources(ydb-library-yql-providers-s3-provider-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp ) set_property( TARGET diff --git a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt index fe6d83f738..1cdb1de1d3 100644 --- a/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt +++ b/ydb/library/yql/providers/s3/provider/ut/CMakeLists.linux.txt @@ -8,9 +8,6 @@ add_executable(ydb-library-yql-providers-s3-provider-ut) -target_compile_options(ydb-library-yql-providers-s3-provider-ut PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_include_directories(ydb-library-yql-providers-s3-provider-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider ) @@ -23,9 +20,6 @@ target_link_libraries(ydb-library-yql-providers-s3-provider-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main providers-s3-provider - udf-service-exception_policy - yql-sql-pg_dummy - yql-dq-opt ) target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE -ldl @@ -38,7 +32,7 @@ target_link_options(ydb-library-yql-providers-s3-provider-ut PRIVATE -ldl ) target_sources(ydb-library-yql-providers-s3-provider-ut PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_path_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp ) set_property( TARGET diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp index d9628877a3..cfc5293dac 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp @@ -1,4 +1,3 @@ -#include "yql_s3_path.h" #include "yql_s3_provider_impl.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> @@ -8,6 +7,7 @@ #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/provider/yql_data_provider_impl.h> +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h> #include <ydb/library/yql/utils/log/log.h> diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp index cb708b15fe..0e1c9a218c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp @@ -441,6 +441,22 @@ public: return true; } + if (name == "pathpattern"sv) { + TStringBuf unused; + if (!ExtractSettingValue(setting.Tail(), "path_pattern"sv, format, {}, ctx, unused)) { + return false; + } + return true; + } + + if (name == "pathpatternvariant"sv) { + TStringBuf unused; + if (!ExtractSettingValue(setting.Tail(), "path_pattern_variant"sv, format, {}, ctx, unused)) { + return false; + } + return true; + } + YQL_ENSURE(name == "projection"sv); haveProjection = true; if (!EnsureAtom(setting.Tail(), ctx)) { @@ -457,7 +473,7 @@ public: if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings), { "compression"sv, "partitionedby"sv, "projection"sv, "data.interval.unit"sv, "data.datetime.formatname"sv, "data.datetime.format"sv, "data.timestamp.formatname"sv, "data.timestamp.format"sv, - "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv }, validator, ctx)) + "readmaxbytes"sv, "csvdelimiter"sv, "directories"sv, "filepattern"sv, "pathpattern"sv, "pathpatternvariant"sv }, validator, ctx)) { return TStatus::Error; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 976f3e3ff2..d37b8de521 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -1,9 +1,9 @@ -#include "yql_s3_list.h" -#include "yql_s3_path.h" #include "yql_s3_provider_impl.h" +#include "yql_s3_listing_strategy.h" #include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h> #include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h> +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h> #include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h> #include <ydb/library/yql/providers/s3/range_helpers/path_list_reader.h> #include <ydb/library/yql/core/yql_expr_optimize.h> @@ -53,18 +53,18 @@ bool FindFilePattern(const TExprNode& settings, TExprContext& ctx, TString& file using namespace NPathGenerator; struct TListRequest { - TString Token; - TString Url; - TString Pattern; // can contain capturing groups - TMaybe<TString> PathPrefix; // set iff Pattern is regex (not glob pattern) + NS3Lister::TListingRequest S3Request; + TString FilePattern; + ES3ListingOptions Options = ES3ListingOptions::NoOptions; TVector<IPathGenerator::TColumnWithValue> ColumnValues; }; bool operator<(const TListRequest& a, const TListRequest& b) { - return std::tie(a.Token, a.Url, a.Pattern) < std::tie(b.Token, b.Url, b.Pattern); + return std::tie(a.S3Request.Token, a.S3Request.Url, a.S3Request.Pattern) < + std::tie(b.S3Request.Token, b.S3Request.Url, b.S3Request.Pattern); } -using TPendingRequests = TMap<TListRequest, NThreading::TFuture<IS3Lister::TListResult>>; +using TPendingRequests = TMap<TListRequest, NThreading::TFuture<NS3Lister::TListResult>>; struct TGeneratedColumnsConfig { TVector<TString> Columns; @@ -76,8 +76,22 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { public: TS3IODiscoveryTransformer(TS3State::TPtr state, IHTTPGateway::TPtr gateway) : State_(std::move(state)) - , Lister_(IS3Lister::Make(gateway, State_->Configuration->MaxDiscoveryFilesPerQuery, State_->Configuration->MaxInflightListsPerQuery, State_->Configuration->AllowLocalFiles)) - {} + , ListerFactory_( + MakeS3ListerFactory(State_->Configuration->MaxInflightListsPerQuery)) + , ListingStrategy_(MakeS3ListingStrategy( + gateway, + ListerFactory_, + State_->Configuration->MaxDiscoveryFilesPerQuery, + State_->Configuration->MaxDirectoriesAndFilesPerQuery, + State_->Configuration->MaxDirectoriesAndFilesPerQuery, + State_->Configuration->AllowLocalFiles)) + , ListingBatchStrategy_(MakeS3BatchListingStrategy( + gateway, + ListerFactory_, + State_->Configuration->MaxDiscoveryFilesPerQuery, + State_->Configuration->MaxDirectoriesAndFilesPerQuery, + State_->Configuration->MaxDirectoriesAndFilesPerQuery, + State_->Configuration->AllowLocalFiles)) { } void Rewind() final { PendingRequests_.clear(); @@ -114,7 +128,7 @@ private: return false; }); - TVector<NThreading::TFuture<IS3Lister::TListResult>> futures; + TVector<NThreading::TFuture<NS3Lister::TListResult>> futures; for (const auto& n : nodes) { try { if (auto maybeDqSource = TMaybeNode<TDqSourceWrap>(n)) { @@ -167,17 +181,21 @@ private: YQL_ENSURE(it != pendingRequests.end()); YQL_ENSURE(it->second.HasValue()); - const IS3Lister::TListResult& listResult = it->second.GetValue(); + const NS3Lister::TListResult& listResult = it->second.GetValue(); if (listResult.index() == 1) { const auto& issues = std::get<TIssues>(listResult); - YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.Url << req.Pattern << " error " << issues.ToString(); + YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern << " error " << issues.ToString(); std::for_each(issues.begin(), issues.end(), std::bind(&TExprContext::AddError, std::ref(ctx), std::placeholders::_1)); return TStatus::Error; } - const auto& listEntries = std::get<IS3Lister::TListEntries>(listResult); - for (auto& entry : listEntries) { - listedPaths.emplace_back(entry.Path, entry.Size, false); + const auto& listEntries = std::get<NS3Lister::TListEntries>(listResult); + for (auto& entry: listEntries.Objects) { + listedPaths.emplace_back( + NS3Details::TPath{entry.Path, entry.Size, false}); + } + for (auto& path: listEntries.Directories) { + listedPaths.emplace_back(NS3Details::TPath{path.Path, 0, false}); } } @@ -250,6 +268,15 @@ private: return TStatus::Ok; } + struct TExtraColumnValue { + TString Name; + TMaybe<NUdf::EDataSlot> Type; + TString Value; + bool operator<(const TExtraColumnValue& other) const { + return std::tie(Name, Type, Value) < std::tie(other.Name, other.Type, other.Value); + } + }; + TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { // Raise errors if any AllFuture_.GetValue(); @@ -277,15 +304,6 @@ private: size_t readSize = 0; TExprNode::TListType pathNodes; - struct TExtraColumnValue { - TString Name; - TMaybe<NUdf::EDataSlot> Type; - TString Value; - bool operator<(const TExtraColumnValue& other) const { - return std::tie(Name, Type, Value) < std::tie(other.Name, other.Type, other.Value); - } - }; - TMap<TMaybe<TVector<TExtraColumnValue>>, NS3Details::TPathList> pathsByExtraValues; const TGeneratedColumnsConfig* generatedColumnsConfig = nullptr; if (auto it = genColumnsByNode.find(node); it != genColumnsByNode.end()) { @@ -293,58 +311,55 @@ private: } const bool assumeDirectories = generatedColumnsConfig && generatedColumnsConfig->Generator; + bool needsListingOnActors = false; for (auto& req : requests) { auto it = pendingRequests.find(req); YQL_ENSURE(it != pendingRequests.end()); YQL_ENSURE(it->second.HasValue()); - const IS3Lister::TListResult& listResult = it->second.GetValue(); + const NS3Lister::TListResult& listResult = it->second.GetValue(); if (listResult.index() == 1) { const auto& issues = std::get<TIssues>(listResult); - YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.Url << req.Pattern << " error " << issues.ToString(); + YQL_CLOG(INFO, ProviderS3) << "Discovery " << req.S3Request.Url << req.S3Request.Pattern << " error " << issues.ToString(); std::for_each(issues.begin(), issues.end(), std::bind(&TExprContext::AddError, std::ref(ctx), std::placeholders::_1)); return TStatus::Error; } - const auto& listEntries = std::get<IS3Lister::TListEntries>(listResult); - if (listEntries.empty() && !NS3::HasWildcards(req.Pattern)) { + const auto& listEntries = std::get<NS3Lister::TListEntries>(listResult); + if (listEntries.Size() == 0 && !NS3::HasWildcards(req.S3Request.Pattern)) { // request to list particular files that are missing ctx.AddError(TIssue(ctx.GetPosition(object.Pos()), - TStringBuilder() << "Object " << req.Pattern << " doesn't exist.")); + TStringBuilder() << "Object " << req.S3Request.Pattern << " doesn't exist.")); return TStatus::Error; } - for (auto& entry : listEntries) { + if (!listEntries.Directories.empty()) { + needsListingOnActors = true; + } + + for (auto& entry: listEntries.Objects) { TMaybe<TVector<TExtraColumnValue>> extraValues; if (generatedColumnsConfig) { - extraValues = TVector<TExtraColumnValue>{}; - if (!req.ColumnValues.empty()) { - // explicit partitioning - YQL_ENSURE(req.ColumnValues.size() == generatedColumnsConfig->Columns.size()); - for (auto& cv : req.ColumnValues) { - TExtraColumnValue value; - value.Name = cv.Name; - value.Type = cv.Type; - value.Value = cv.Value; - extraValues->push_back(std::move(value)); - } - } else { - YQL_ENSURE(entry.MatchedGlobs.size() == generatedColumnsConfig->Columns.size()); - for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) { - TExtraColumnValue value; - value.Name = generatedColumnsConfig->Columns[i]; - value.Value = entry.MatchedGlobs[i]; - extraValues->push_back(std::move(value)); - } - } + extraValues = ExtractExtraColumnValues( + req, generatedColumnsConfig, entry.MatchedGlobs); } auto& pathList = pathsByExtraValues[extraValues]; - pathList.emplace_back(entry.Path, entry.Size, false); + pathList.emplace_back(NS3Details::TPath{entry.Path, entry.Size, false}); readSize += entry.Size; } + for (auto& entry: listEntries.Directories) { + TMaybe<TVector<TExtraColumnValue>> extraValues; + if (generatedColumnsConfig) { + extraValues = ExtractExtraColumnValues( + req, generatedColumnsConfig, entry.MatchedGlobs); + } + + auto& pathList = pathsByExtraValues[extraValues]; + pathList.emplace_back(NS3Details::TPath{entry.Path, 0, true}); + } - YQL_CLOG(INFO, ProviderS3) << "Object " << req.Pattern << " has " << listEntries.size() << " items with total size " << readSize; + YQL_CLOG(INFO, ProviderS3) << "Object " << req.S3Request.Pattern << " has " << listEntries.Objects.size() << " items with total size " << readSize; } for (const auto& [extraValues, pathList] : pathsByExtraValues) { @@ -438,6 +453,44 @@ private: if (assumeDirectories) { settings.push_back(ctx.NewList(settingsPos, { ctx.NewAtom(settingsPos, "directories", TNodeFlags::Default) })); } + if (needsListingOnActors) { + TString pathPattern; + NS3Lister::ES3PatternVariant pathPatternVariant; + if (requests[0].Options == ES3ListingOptions::UnPartitionedDataset) { + pathPattern = requests[0].S3Request.Pattern; + pathPatternVariant = NS3Lister::ES3PatternVariant::PathPattern; + } else if (requests[0].Options == ES3ListingOptions::PartitionedDataset) { + pathPattern = requests[0].FilePattern; + pathPatternVariant = NS3Lister::ES3PatternVariant::FilePattern; + } else { + ctx.AddError(TIssue( + ctx.GetPosition(read.Pos()), + TStringBuilder() + << "Unknown listing option " << int(requests[0].Options))); + return TStatus::Error; + } + auto maybeSerializedVariant = NS3Lister::SerializePatternVariant(pathPatternVariant); + if (!maybeSerializedVariant.Defined()) { + ctx.AddError(TIssue( + ctx.GetPosition(read.Pos()), + TStringBuilder() << "Couldn't serialize path pattern variant " + << int(pathPatternVariant))); + return TStatus::Error; + } + + settings.push_back(ctx.NewList( + settingsPos, + { + ctx.NewAtom(settingsPos, "pathpattern"), + ctx.NewAtom(settingsPos, pathPattern), + })); + settings.push_back(ctx.NewList( + settingsPos, + { + ctx.NewAtom(settingsPos, "pathpatternvariant"), + ctx.NewAtom(settingsPos, *maybeSerializedVariant), + })); + } TExprNode::TPtr s3Object; s3Object = Build<TS3Object>(ctx, object.Pos()) @@ -465,6 +518,33 @@ private: return RemapExpr(input, output, replaces, ctx, TOptimizeExprSettings(nullptr)); } + static TVector<TExtraColumnValue> ExtractExtraColumnValues( + const TListRequest& req, + const TGeneratedColumnsConfig* generatedColumnsConfig, + const std::vector<TString>& matchedGlobs) { + auto extraValues = TVector<TExtraColumnValue>{}; + if (!req.ColumnValues.empty()) { + // explicit partitioning + YQL_ENSURE(req.ColumnValues.size() == generatedColumnsConfig->Columns.size()); + for (auto& cv: req.ColumnValues) { + TExtraColumnValue value; + value.Name = cv.Name; + value.Type = cv.Type; + value.Value = cv.Value; + extraValues.push_back(std::move(value)); + } + } else { + YQL_ENSURE(matchedGlobs.size() == generatedColumnsConfig->Columns.size()); + for (size_t i = 0; i < generatedColumnsConfig->Columns.size(); ++i) { + TExtraColumnValue value; + value.Name = generatedColumnsConfig->Columns[i]; + value.Value = matchedGlobs[i]; + extraValues.push_back(std::move(value)); + } + } + return extraValues; + } + static bool ValidateProjection(TPositionHandle pos, const TPathGeneratorPtr& generator, const TVector<TString>& partitionedBy, TExprContext& ctx) { const TSet<TString> partitionedBySet(partitionedBy.begin(), partitionedBy.end()); TSet<TString> projectionSet; @@ -485,7 +565,7 @@ private: return true; } - bool LaunchListsForNode(const TDqSourceWrap& source, TVector<NThreading::TFuture<IS3Lister::TListResult>>& futures, TExprContext& ctx) { + bool LaunchListsForNode(const TDqSourceWrap& source, TVector<NThreading::TFuture<NS3Lister::TListResult>>& futures, TExprContext& ctx) { TS3DataSource dataSource = source.DataSource().Maybe<TS3DataSource>().Cast(); const auto& connect = State_->Configuration->Clusters.at(dataSource.Cluster().StringValue()); const auto& token = State_->Configuration->Tokens.at(dataSource.Cluster().StringValue()); @@ -508,24 +588,35 @@ private: NS3Details::TPathList directories; NS3Details::UnpackPathsList(path.Data().Literal().Value(), FromString<bool>(path.IsText().Literal().Value()), directories); - TListRequest req; - req.Token = tokenStr; - req.Url = url; - for (const auto& directory : directories) { - req.Pattern = NS3::NormalizePath(TStringBuilder() << directory.Path << "/" << effectiveFilePattern); - RequestsByNode_[source.Raw()].push_back(req); - if (PendingRequests_.find(req) == PendingRequests_.end()) { - auto future = Lister_->List(req.Token, req.Url, req.Pattern); - PendingRequests_[req] = future; - futures.push_back(std::move(future)); - } + auto req = TListRequest{.S3Request{ + .Url = url, + .Token = tokenStr, + .PatternType = NS3Lister::ES3PatternType::Wildcard}}; + RequestsByNode_[source.Raw()].push_back(req); + + std::vector<TString> paths; + for (const auto& prefixObject: directories) { + paths.emplace_back(prefixObject.Path); } + + auto future = ListingBatchStrategy_->List( + req.S3Request, + paths, + [effectiveFilePattern]( + const NS3Lister::TListingRequest& baseRequest, const TString& prefix) { + Y_UNUSED(baseRequest); + return NS3::NormalizePath( + TStringBuilder() << prefix << "/" << effectiveFilePattern); + }, + ES3ListingOptions::UnPartitionedDataset); + PendingRequests_[req] = future; + futures.push_back(std::move(future)); } return true; } - bool LaunchListsForNode(const TS3Read& read, TVector<NThreading::TFuture<IS3Lister::TListResult>>& futures, TExprContext& ctx) { + bool LaunchListsForNode(const TS3Read& read, TVector<NThreading::TFuture<NS3Lister::TListResult>>& futures, TExprContext& ctx) { const auto& settings = *read.Ref().Child(4); // schema is required @@ -618,9 +709,9 @@ private: // each path in CONCAT() can generate multiple list requests for explicit partitioning TVector<TListRequest> reqs; - TListRequest req; - req.Token = tokenStr; - req.Url = url; + auto req = TListRequest{ + .S3Request{.Url = url, .Token = tokenStr}, + .FilePattern = effectiveFilePattern}; if (partitionedBy.empty()) { if (path.empty()) { @@ -628,7 +719,7 @@ private: return false; } if (path.EndsWith("/")) { - req.Pattern = path + effectiveFilePattern; + req.S3Request.Pattern = path + effectiveFilePattern; } else { // treat paths as regular wildcard patterns if (filePattern) { @@ -636,9 +727,13 @@ private: return false; } - req.Pattern = path; + req.S3Request.Pattern = path; } - req.Pattern = NS3::NormalizePath(req.Pattern); + req.S3Request.Pattern = NS3::NormalizePath(req.S3Request.Pattern); + req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard; + req.S3Request.Prefix = req.S3Request.Pattern.substr( + 0, NS3::GetFirstWildcardPos(req.S3Request.Pattern)); + req.Options = ES3ListingOptions::UnPartitionedDataset; reqs.push_back(req); } else { if (NS3::HasWildcards(path)) { @@ -647,14 +742,14 @@ private: } if (!config.Generator) { // Hive-style partitioning - req.PathPrefix = path; + req.S3Request.Prefix = path; if (!path.empty()) { - req.PathPrefix = NS3::NormalizePath(TStringBuilder() << path << "/"); - if (req.PathPrefix == "/") { - req.PathPrefix = ""; + req.S3Request.Prefix = NS3::NormalizePath(TStringBuilder() << path << "/"); + if (req.S3Request.Prefix == "/") { + req.S3Request.Prefix = ""; } } - TString pp = *req.PathPrefix; + TString pp = req.S3Request.Prefix; if (!pp.empty() && pp.back() == '/') { pp.pop_back(); } @@ -668,14 +763,20 @@ private: generated << NS3::EscapeRegex(col) << "=(.*?)"; } generated << '/' << NS3::RegexFromWildcards(effectiveFilePattern); - req.Pattern = generated; + req.S3Request.Pattern = generated; + req.S3Request.PatternType = NS3Lister::ES3PatternType::Regexp; + req.Options = ES3ListingOptions::PartitionedDataset; reqs.push_back(req); } else { for (auto& rule : config.Generator->GetRules()) { YQL_ENSURE(rule.ColumnValues.size() == config.Columns.size()); req.ColumnValues.assign(rule.ColumnValues.begin(), rule.ColumnValues.end()); // Pattern will be directory path - req.Pattern = NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path); + req.S3Request.Pattern = NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path); + req.S3Request.PatternType = NS3Lister::ES3PatternType::Wildcard; + req.S3Request.Prefix = req.S3Request.Pattern.substr( + 0, NS3::GetFirstWildcardPos(req.S3Request.Pattern)); + req.Options = ES3ListingOptions::PartitionedDataset; reqs.push_back(req); } } @@ -684,16 +785,16 @@ private: for (auto& req : reqs) { RequestsByNode_[read.Raw()].push_back(req); if (PendingRequests_.find(req) == PendingRequests_.end()) { - NThreading::TFuture<IS3Lister::TListResult> future; + NThreading::TFuture<NS3Lister::TListResult> future; if (config.Generator) { // postpone actual directory listing (will do it after path pruning) - IS3Lister::TListEntries entries(1); - entries.back().Path = req.Pattern; - future = NThreading::MakeFuture<IS3Lister::TListResult>(std::move(entries)); + NS3Lister::TListEntries entries{ + std::vector<NS3Lister::TObjectListEntry>{1}, + std::vector<NS3Lister::TDirectoryListEntry>{}}; + entries.Objects.back().Path = req.S3Request.Pattern; + future = NThreading::MakeFuture<NS3Lister::TListResult>(std::move(entries)); } else { - future = req.PathPrefix.Defined() ? - Lister_->ListRegex(req.Token, req.Url, req.Pattern, *req.PathPrefix) : - Lister_->List(req.Token, req.Url, req.Pattern); + future = ListingStrategy_->List(req.S3Request, req.Options); } PendingRequests_[req] = future; futures.push_back(std::move(future)); @@ -705,7 +806,9 @@ private: } const TS3State::TPtr State_; - const IS3Lister::TPtr Lister_; + const IS3ListerFactory::TPtr ListerFactory_; + const IS3ListingStrategy::TPtr ListingStrategy_; + const IS3BatchListingStrategy::TPtr ListingBatchStrategy_; TPendingRequests PendingRequests_; TNodeMap<TVector<TListRequest>> RequestsByNode_; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp deleted file mode 100644 index 4fc30924b1..0000000000 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.cpp +++ /dev/null @@ -1,330 +0,0 @@ -#include "yql_s3_list.h" -#include "yql_s3_path.h" - -#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h> -#include <ydb/library/yql/providers/s3/common/util.h> -#include <ydb/library/yql/utils/log/log.h> -#include <ydb/library/yql/utils/url_builder.h> -#include <ydb/library/yql/utils/yql_panic.h> - -#include <contrib/libs/re2/re2/re2.h> - -#ifdef THROW -#undef THROW -#endif -#include <library/cpp/threading/future/async_semaphore.h> -#include <library/cpp/xml/document/xml-document.h> -#include <util/folder/iterator.h> -#include <util/generic/guid.h> -#include <util/string/builder.h> - -namespace NYql { - -namespace { - -using namespace NThreading; - -class TS3Lister : public IS3Lister { -public: - explicit TS3Lister(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, bool allowLocalFiles) - : Gateway(httpGateway) - , MaxFilesPerQuery(maxFilesPerQuery) - , AllowLocalFiles(allowLocalFiles) - {} -private: - using TResultFilter = std::function<bool (const TString& path, TVector<TString>& matchedGlobs)>; - - static TResultFilter MakeFilter(const TString& pattern, const TMaybe<TString>& regexPatternPrefix, TString& prefix) { - const bool isRegex = regexPatternPrefix.Defined(); - prefix = isRegex ? *regexPatternPrefix : pattern.substr(0, NS3::GetFirstWildcardPos(pattern)); - if (!isRegex && prefix == pattern) { - // just match for equality - return [pattern](const TString& path, TVector<TString>& matchedGlobs) { - matchedGlobs.clear(); - return path == pattern; - }; - } - - const auto regex = isRegex ? pattern : NS3::RegexFromWildcards(pattern); - auto re = std::make_shared<RE2>(re2::StringPiece(regex), RE2::Options()); - YQL_ENSURE(re->ok()); - - const size_t numGroups = re->NumberOfCapturingGroups(); - YQL_CLOG(INFO, ProviderS3) << "Got prefix: '" << prefix << "', regex: '" << regex - << "' with " << numGroups << " capture groups from original pattern '" << pattern << "'"; - - auto groups = std::make_shared<std::vector<std::string>>(numGroups); - auto reArgs = std::make_shared<std::vector<re2::RE2::Arg>>(numGroups); - auto reArgsPtr = std::make_shared<std::vector<re2::RE2::Arg*>>(numGroups); - - for (size_t i = 0; i < numGroups; ++i) { - (*reArgs)[i] = &(*groups)[i]; - (*reArgsPtr)[i] = &(*reArgs)[i]; - } - - return [groups, reArgs, reArgsPtr, re](const TString& path, TVector<TString>& matchedGlobs) { - matchedGlobs.clear(); - bool matched = re2::RE2::FullMatchN(path, *re, reArgsPtr->data(), reArgsPtr->size()); - if (matched) { - matchedGlobs.reserve(groups->size()); - for (auto& group : *groups) { - matchedGlobs.push_back(ToString(group)); - } - } - return matched; - }; - } - - static void OnDiscovery( - const IHTTPGateway::TWeakPtr& gatewayWeak, - IHTTPGateway::TResult&& result, - NThreading::TPromise<IS3Lister::TListResult> promise, - const std::shared_ptr<IS3Lister::TListEntries>& output, - const IRetryPolicy<long>::TPtr& retryPolicy, - const TResultFilter& filter, - const TString& token, - const TString& urlStr, - const TString& prefix, - const TString& requestId, - ui64 maxDiscoveryFilesPerQuery) - try { - auto gateway = gatewayWeak.lock(); - if (!gateway) { - ythrow yexception() << "Gateway disappeared"; - } - switch (result.index()) { - case 0U: { - const NXml::TDocument xml(std::get<IHTTPGateway::TContent>(std::move(result)).Extract(), NXml::TDocument::String); - if (const auto& root = xml.Root(); root.Name() == "Error") { - const auto& code = root.Node("Code", true).Value<TString>(); - const auto& message = root.Node("Message", true).Value<TString>(); - ythrow yexception() << message << ", error: code: " << code << " [" << urlStr << prefix << "], request id: [" << requestId << "]"; - } else if (root.Name() != "ListBucketResult") { - ythrow yexception() << "Unexpected response '" << root.Name() << "' on discovery, request id: [" << requestId << "]"; - } else if ( - const NXml::TNamespacesForXPath nss(1U, {"s3", "http://s3.amazonaws.com/doc/2006-03-01/"}); - root.Node("s3:KeyCount", false, nss).Value<unsigned>() > 0U) - { - const auto& contents = root.XPath("s3:Contents", false, nss); - YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << ": have " << output->size() << " entries, got another " << contents.size() << " entries, request id: [" << requestId << "]"; - if (maxDiscoveryFilesPerQuery && output->size() + contents.size() > maxDiscoveryFilesPerQuery) { - ythrow yexception() << "Over " << maxDiscoveryFilesPerQuery << " files discovered in '" << urlStr << prefix << "', request id: [" << requestId << "]"; - } - - for (const auto& content : contents) { - TString path = content.Node("s3:Key", false, nss).Value<TString>(); - if (path.EndsWith('/')) { - // skip 'directories' - continue; - } - TVector<TString> matchedGlobs; - if (filter(path, matchedGlobs)) { - output->emplace_back(); - output->back().Path = path; - output->back().Size = content.Node("s3:Size", false, nss).Value<ui64>(); - output->back().MatchedGlobs.swap(matchedGlobs); - } - } - - if (root.Node("s3:IsTruncated", false, nss).Value<bool>()) { - YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << ": got truncated flag, will continue"; - const auto& next = root.Node("s3:NextContinuationToken", false, nss).Value<TString>(); - const auto& maxKeys = root.Node("s3:MaxKeys", false, nss).Value<TString>(); - - IHTTPGateway::THeaders headers; - if (!token.empty()) { - headers.emplace_back("X-YaCloud-SubjectToken:" + token); - } - - TString requestId = CreateGuidAsString(); - headers.emplace_back(TString{"X-Request-ID:"} + requestId); - - TUrlBuilder urlBuilder(urlStr); - auto url = urlBuilder.AddUrlParam("list-type", "2") - .AddUrlParam("prefix", prefix) - .AddUrlParam("continuation-token", next) - .AddUrlParam("max-keys", maxKeys) - .Build(); - - return gateway->Download( - url, - std::move(headers), - 0U, - 0U, - std::bind(&OnDiscovery, - IHTTPGateway::TWeakPtr(gateway), - std::placeholders::_1, - promise, - output, - retryPolicy, - filter, - token, - urlStr, - prefix, - requestId, - maxDiscoveryFilesPerQuery), - /*data=*/"", - retryPolicy); - } - } - promise.SetValue(std::move(*output)); - break; - } - case 1U: { - auto issues = std::get<TIssues>(std::move(result)); - issues = NS3Util::AddParentIssue(TStringBuilder{} << "request id: [" << requestId << "]", std::move(issues)); - YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << ": got error from http gateway: " << issues.ToString(true); - promise.SetValue(std::move(issues)); - break; - } - default: - ythrow yexception() << "Undefined variant index: " << result.index() << ", request id: [" << requestId << "]"; - } - } catch (const std::exception& ex) { - YQL_CLOG(INFO, ProviderS3) << "Listing of " << urlStr << prefix << " : got exception: " << ex.what(); - promise.SetException(std::current_exception()); - } - - TFuture<TListResult> DoList(const TString& token, const TString& urlStr, const TString& pattern, const TMaybe<TString>& pathPrefix) { - TString prefix; - TResultFilter filter = MakeFilter(pattern, pathPrefix, prefix); - auto promise = NewPromise<IS3Lister::TListResult>(); - auto future = promise.GetFuture(); - - if (urlStr.substr(0, 7) == "file://") { - try { - if (!AllowLocalFiles) { - ythrow yexception() << "Using local files as DataSource isn't allowed, but trying access " << urlStr; - } - auto fullPath = urlStr.substr(7); - for (const auto &e: TPathSplit(fullPath)) { - if (e == "..") { - ythrow yexception() << "Security violation: trying access parent directory in path"; - } - } - - IS3Lister::TListEntries output; - - for (const auto& entry: TDirIterator(fullPath)) { - if (entry.fts_type != FTS_F) { - continue; - } - - auto filename = TString(entry.fts_path + urlStr.size() - 7); - TVector<TString> matches; - if (filter(filename, matches)) { - output.emplace_back(); - output.back().Path = filename; - output.back().Size = entry.fts_statp->st_size; - output.back().MatchedGlobs.swap(matches); - } - } - - promise.SetValue(std::move(output)); - } catch (const std::exception& ex) { - promise.SetException(std::current_exception()); - } - return future; - } - - - const auto retryPolicy = GetHTTPDefaultRetryPolicy(); - TUrlBuilder urlBuilder(urlStr); - const auto url = urlBuilder - .AddUrlParam("list-type", "2") - .AddUrlParam("prefix", prefix) - .Build(); - - IHTTPGateway::THeaders headers; - if (!token.empty()) { - headers.emplace_back("X-YaCloud-SubjectToken:" + token); - } - - TString requestId = CreateGuidAsString(); - headers.emplace_back(TString{"X-Request-ID:"} + requestId); - - Gateway->Download( - url, - std::move(headers), - 0U, - 0U, - std::bind(&OnDiscovery, - IHTTPGateway::TWeakPtr(Gateway), - std::placeholders::_1, - promise, - std::make_shared<IS3Lister::TListEntries>(), - retryPolicy, - filter, - token, - urlStr, - prefix, - requestId, - MaxFilesPerQuery), - /*data=*/"", - retryPolicy); - return future; - } - - NThreading::TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override { - YQL_CLOG(INFO, ProviderS3) << "Enumerating items using glob pattern " << url << pattern; - return DoList(token, url, pattern, {}); - } - - NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override { - YQL_CLOG(INFO, ProviderS3) << "Enumerating items using RE2 pattern " << url << pattern; - return DoList(token, url, pattern, pathPrefix); - } - - const IHTTPGateway::TPtr Gateway; - const ui64 MaxFilesPerQuery; - const bool AllowLocalFiles; -}; - -class TS3ParallelLimitedLister : public IS3Lister { -public: - explicit TS3ParallelLimitedLister(const IS3Lister::TPtr& lister, size_t maxParallelOps = 1) - : Lister(lister) - , Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) - {} - -private: - TFuture<TListResult> DoList(const TString& token, const TString& url, const TString& pattern, const TMaybe<TString>& pathPrefix) { - auto promise = NewPromise<TListResult>(); - auto future = promise.GetFuture(); - auto acquired = Semaphore->AcquireAsync(); - acquired.Subscribe([lister = Lister, promise, token, url, pattern, pathPrefix](const auto& f) { - auto lock = std::make_shared<TAsyncSemaphore::TAutoRelease>(f.GetValue()->MakeAutoRelease()); - TFuture<TListResult> listFuture = pathPrefix.Defined() ? - lister->ListRegex(token, url, pattern, *pathPrefix) : - lister->List(token, url, pattern); - listFuture.Subscribe([promise, lock](const auto& f) mutable { - try { - promise.SetValue(f.GetValue()); - } catch (...) { - promise.SetException(std::current_exception()); - } - }); - }); - return future; - } - - TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) override { - return DoList(token, url, pattern, {}); - } - - TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) override { - return DoList(token, url, pattern, pathPrefix); - } - - const IS3Lister::TPtr Lister; - const TAsyncSemaphore::TPtr Semaphore; -}; - -} - -IS3Lister::TPtr IS3Lister::Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery, bool allowLocalFiles) { - auto lister = IS3Lister::TPtr(new TS3Lister(httpGateway, maxFilesPerQuery, allowLocalFiles)); - return IS3Lister::TPtr(new TS3ParallelLimitedLister(lister, maxInflightListsPerQuery)); -} - -} diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_list.h b/ydb/library/yql/providers/s3/provider/yql_s3_list.h deleted file mode 100644 index 04f5dd1a74..0000000000 --- a/ydb/library/yql/providers/s3/provider/yql_s3_list.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> - -#include <library/cpp/threading/future/future.h> - -#include <variant> -#include <vector> -#include <memory> - -namespace NYql { - -class IS3Lister { -public: - using TPtr = std::shared_ptr<IS3Lister>; - - struct TListEntry { - TString Path; - ui64 Size = 0; - std::vector<TString> MatchedGlobs; - }; - - using TListEntries = std::vector<TListEntry>; - using TListResult = std::variant<TListEntries, TIssues>; - - virtual ~IS3Lister() = default; - // List all S3 objects matching wildcard pattern. - // Pattern may include following wildcard expressions: - // * - any (possibly empty) sequence of characters - // ? - single character - // {variant1, variant2} - list of alternatives - virtual NThreading::TFuture<TListResult> List(const TString& token, const TString& url, const TString& pattern) = 0; - - // pattern should be valid RE2 regex - // pathPrefix is a "constant" path prefix - virtual NThreading::TFuture<TListResult> ListRegex(const TString& token, const TString& url, const TString& pattern, const TString& pathPrefix) = 0; - - static TPtr Make(const IHTTPGateway::TPtr& httpGateway, ui64 maxFilesPerQuery, ui64 maxInflightListsPerQuery, bool allowLocalFiles); -}; - -}
\ No newline at end of file diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp new file mode 100644 index 0000000000..1d8d4f4463 --- /dev/null +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.cpp @@ -0,0 +1,724 @@ +#include "yql_s3_listing_strategy.h" + +#include <ydb/library/yql/providers/s3/common/util.h> +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_future_algorithms.h> +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_path.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/utils/url_builder.h> + +#ifdef THROW +#undef THROW +#endif +#include <library/cpp/threading/future/async_semaphore.h> +#include <library/cpp/xml/document/xml-document.h> + +#include <util/generic/overloaded.h> +#include <deque> +#include <utility> + +namespace NYql { + +IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option) { + switch (option) { + case ES3ListingOptions::NoOptions: + stream << "[NoOptions]"; + break; + case ES3ListingOptions::PartitionedDataset: + stream << "[PartitionedDataset]"; + break; + case ES3ListingOptions::UnPartitionedDataset: + stream << "[UnPartitionedDataset]"; + break; + default: + ythrow yexception() << "Undefined option: " << int(option); + } + return stream; +} + +namespace { + +using namespace NThreading; + +TIssue MakeLimitExceededIssue() { + auto issue = TIssue("Limit exceeded"); + issue.SetCode(0U, ESeverity::TSeverityIds_ESeverityId_S_WARNING); + return issue; +} + +bool IsRecoverableIssue(const TIssues& issues) { + if (issues.Size() != 1) { + return false; + } + + return issues.begin()->GetSeverity() >= ESeverity::TSeverityIds_ESeverityId_S_WARNING; +} + +class TCollectingS3ListingStrategy : public IS3ListingStrategy { +public: + using TListerFactoryMethod = std::function<TFuture<NS3Lister::IS3Lister::TPtr>( + const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options)>; + + TCollectingS3ListingStrategy(size_t limit, TListerFactoryMethod&& listerFactoryMethod) + : Limit(limit) + , ListerFactoryMethod(std::move(listerFactoryMethod)) { } + + TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) override { + Y_UNUSED(options); + auto futureLister = ListerFactoryMethod(listingRequest, options); + return futureLister.Apply([this, listingRequest, options]( + const TFuture<NS3Lister::IS3Lister::TPtr>& lister) { + try { + return DoListCallback(lister.GetValue(), options); + } catch (...) { + return MakeErrorFuture<NS3Lister::TListResult>(std::current_exception()); + } + }); + } + +private: + static inline auto MakeNewListingChunkHandler( + NS3Lister::TListResult& state, size_t limit) { + return [&state, limit](NS3Lister::TListEntries&& chunkEntries) { + auto& stateEntries = std::get<NS3Lister::TListEntries>(state); + if (stateEntries.Size() + chunkEntries.Size() > limit) { + YQL_CLOG(INFO, ProviderS3) + << "[TCollectingS3ListingStrategy] Collected " + << stateEntries.Size() + chunkEntries.Size() + << " object paths which is more than limit " << limit; + state = TIssues{MakeLimitExceededIssue()}; + return EAggregationAction::Stop; + } + stateEntries += std::move(chunkEntries); + return EAggregationAction::Proceed; + }; + } + static inline auto MakeIssuesHandler(NS3Lister::TListResult& state) { + return [&state](const TIssues& issues) { + state = issues; + return EAggregationAction::Stop; + }; + } + static inline EAggregationAction ExceptionHandler( + NS3Lister::TListResult& state, const std::exception& exception) { + state = TIssues{TIssue{exception.what()}}; + return EAggregationAction::Stop; + } + TFuture<NS3Lister::TListResult> DoListCallback( + NS3Lister::IS3Lister::TPtr lister, ES3ListingOptions options) const { + Y_UNUSED(options); + return NYql::AccumulateWithEarlyStop<NS3Lister::TListResult>( + std::move(lister), + NS3Lister::TListResult{}, + [limit = Limit](NS3Lister::TListResult& state, NS3Lister::TListResult&& chunk) { + return std::visit( + TOverloaded{ + std::move(MakeNewListingChunkHandler(state, limit)), + std::move(MakeIssuesHandler(state))}, + std::move(chunk)); + }, + ExceptionHandler); + } + +private: + const size_t Limit; + const TListerFactoryMethod ListerFactoryMethod; +}; + +class TFlatFileS3ListingStrategy : public TCollectingS3ListingStrategy { +public: + TFlatFileS3ListingStrategy( + const IS3ListerFactory::TPtr& listerFactory, + const IHTTPGateway::TPtr& httpGateway, + size_t limit, + bool allowLocalFiles) + : TCollectingS3ListingStrategy( + limit, + [allowLocalFiles, httpGateway, listerFactory]( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + Y_UNUSED(options); + return listerFactory->Make( + httpGateway, listingRequest, Nothing(), allowLocalFiles); + }) { } +}; + +class TDirectoryS3ListingStrategy : public TCollectingS3ListingStrategy { +public: + TDirectoryS3ListingStrategy( + const IS3ListerFactory::TPtr& listerFactory, + const IHTTPGateway::TPtr& httpGateway, + size_t limit, + bool allowLocalFiles) + : TCollectingS3ListingStrategy( + limit, + [allowLocalFiles, httpGateway, listerFactory]( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + Y_UNUSED(options); + return listerFactory->Make( + httpGateway, listingRequest, "/", allowLocalFiles); + }) { } +}; + +class TCompositeS3ListingStrategy : public IS3ListingStrategy { +public: + using TStrategyContainer = std::vector<std::shared_ptr<IS3ListingStrategy>>; + using TStrategyContainerPtr = std::shared_ptr<TStrategyContainer>; + + class TStrategyListIterator : public NS3Lister::IS3Lister { + public: + TStrategyListIterator( + NS3Lister::TListingRequest listingRequest, + ES3ListingOptions options, + TStrategyContainerPtr strategies) + : ListingRequest(std::move(listingRequest)) + , Options(options) + , Strategies(std::move(strategies)) + , Iterator(Strategies->cbegin()) + , End(Strategies->end()) { } + + TFuture<NS3Lister::TListResult> Next() override { + return (*Iterator)->List(ListingRequest, Options); + } + + bool HasNext() override { + if (!IsFirst) { + Iterator++; + } else { + IsFirst = false; + } + return Iterator != End; + } + + private: + const NS3Lister::TListingRequest ListingRequest; + const ES3ListingOptions Options; + const TStrategyContainerPtr Strategies; + TStrategyContainer::const_iterator Iterator; + const TStrategyContainer::const_iterator End; + bool IsFirst = true; + }; + + class AggregationState { + public: + NS3Lister::TListResult Result; + bool Set = false; + }; + + explicit TCompositeS3ListingStrategy(TStrategyContainer&& strategies) + : Strategies(std::make_shared<TStrategyContainer>(std::move(strategies))) { } + + TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) override { + auto strategyListIterator = + std::make_unique<TStrategyListIterator>(listingRequest, options, Strategies); + + return NYql::AccumulateWithEarlyStop<AggregationState>( + std::move(strategyListIterator), + AggregationState{NS3Lister::TListEntries{}}, + [](AggregationState& state, NS3Lister::TListResult&& chunk) { + auto newChunkHandler = + [&state](NS3Lister::TListEntries&& chunkEntries) mutable { + YQL_CLOG(INFO, ProviderS3) + << "[TCompositeS3ListingStrategy] Strategy successfully listed paths. Returning result: " + << chunkEntries.Objects.size() << " objects, " + << chunkEntries.Directories.size() << " path prefixes"; + std::get<NS3Lister::TListEntries>(state.Result) = + chunkEntries; + state.Set = true; + return EAggregationAction::Stop; + }; + auto errorHandler = [&state](const TIssues& issues) mutable { + if (IsRecoverableIssue(issues)) { + YQL_CLOG(INFO, ProviderS3) + << "[TCompositeS3ListingStrategy] Strategy failed " + << " to list paths. Trying next one... "; + return EAggregationAction::Proceed; + } + + state.Result = issues; + state.Set = true; + return EAggregationAction::Stop; + }; + return std::visit( + TOverloaded{ + std::move(newChunkHandler), + std::move(errorHandler), + }, + std::move(chunk)); + }, + [](AggregationState& state, const std::exception& exception) { + state.Result = TIssues{TIssue{exception.what()}}; + state.Set = true; + return EAggregationAction::Stop; + }) + .Apply([](const TFuture<AggregationState>& state) { + auto& result = state.GetValue(); + return result.Set ? result.Result + : TIssues{TIssue("No more strategy to test")}; + }); + } + +private: + const TStrategyContainerPtr Strategies; +}; + +class TConditionalS3ListingStrategy : public IS3ListingStrategy { +public: + using TStrategyCondition = std::function<bool(ES3ListingOptions options)>; + using TPair = std::pair<TStrategyCondition, std::shared_ptr<IS3ListingStrategy>>; + + TConditionalS3ListingStrategy(std::initializer_list<TPair> list) + : Strategies(list.begin(), list.end()) { } + + TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) override { + auto strategyIt = std::find_if( + Strategies.begin(), Strategies.end(), [options](const TPair& record) { + auto [condition, strategy] = record; + return condition(options); + }); + + if (strategyIt == Strategies.end()) { + auto issue = TIssues{TIssue("No strategy matched listing request")}; + return MakeFuture(NS3Lister::TListResult{issue}); + } + + return strategyIt->second->List(listingRequest, options); + } + +private: + const std::vector<TPair> Strategies; +}; + +class TPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { +public: + class PartitionedDirectoryResolverIterator : public NS3Lister::IS3Lister { + public: + PartitionedDirectoryResolverIterator( + NS3Lister::TListingRequest defaultParams, + const TString& basePrefix, + ES3ListingOptions options, + TDirectoryS3ListingStrategy directoryListingStrategy) + : DefaultParams(std::move(defaultParams)) + , Options(options) + , DirectoryPrefixQueue(std::make_shared<std::deque<TString>>()) + , DirectoryListingStrategy(std::move(directoryListingStrategy)) { + DirectoryPrefixQueue->push_front(basePrefix); + } + + /** + * For each directory in listing queue: + * 1) List path + * 2) If files are matched against regexp - remember them (extract extra columns) + * 3) If file is not matched against regexp - filter it out + * 3) If directory is matched against regexp - remember them (extract extra columns) + * 4) If directory is not matched against regexp - list this directory recursively + * 5) if there is no directory to list - algorithm ends + */ + TFuture<NS3Lister::TListResult> Next() override { + First = false; + NextDirectoryListeningChunk = GetNextPrefixLister().Apply( + [queue = DirectoryPrefixQueue]( + const TFuture<NS3Lister::TListResult>& future) -> NS3Lister::TListResult { + try { + auto& nextBatch = future.GetValue(); + if (std::holds_alternative<TIssues>(nextBatch)) { + return std::get<TIssues>(nextBatch); + } + + auto& listingResult = std::get<NS3Lister::TListEntries>(nextBatch); + auto result = NS3Lister::TListEntries{}; + result.Objects.insert( + result.Objects.begin(), + listingResult.Objects.cbegin(), + listingResult.Objects.cend()); + for (auto& directoryPrefix : listingResult.Directories) { + if (directoryPrefix.MatchedGlobs.empty()) { + // We need to list until extra columns are extracted + queue->push_back(directoryPrefix.Path); + } else { + result.Directories.push_back(directoryPrefix); + } + } + + return NS3Lister::TListResult{result}; + } catch (std::exception& e) { + return TIssues{TIssue{e.what()}}; + } + }); + return NextDirectoryListeningChunk; + } + + bool HasNext() override { + if (!First) { + NextDirectoryListeningChunk.Wait(); + } + return !DirectoryPrefixQueue->empty(); + } + + private: + TFuture<NS3Lister::TListResult> GetNextPrefixLister() { + if (DirectoryPrefixQueue->empty()) { + return MakeFuture( + NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}}); + } + auto prefix = DirectoryPrefixQueue->front(); + DirectoryPrefixQueue->pop_front(); + + auto request = NS3Lister::TListingRequest(DefaultParams); + request.Prefix = prefix; + + return DirectoryListingStrategy.List(request, Options); + } + + private: + const NS3Lister::TListingRequest DefaultParams; + const ES3ListingOptions Options; + std::shared_ptr<std::deque<TString>> DirectoryPrefixQueue; + std::vector<NS3Lister::TObjectListEntry> Objects; + TDirectoryS3ListingStrategy DirectoryListingStrategy; + TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk; + bool First = true; + }; + + TPartitionedDatasetS3ListingStrategy( + const IS3ListerFactory::TPtr& listerFactory, + const IHTTPGateway::TPtr& httpGateway, + size_t limit, + bool allowLocalFiles) + : TCollectingS3ListingStrategy( + limit, + [listerFactory, httpGateway, limit, allowLocalFiles]( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + new PartitionedDirectoryResolverIterator{ + listingRequest, + listingRequest.Prefix, + options, + TDirectoryS3ListingStrategy{ + listerFactory, httpGateway, limit, allowLocalFiles}}); + return MakeFuture(std::move(ptr)); + }) { } +}; + +class TBFSDirectoryResolverIterator : public NS3Lister::IS3Lister { +public: + using TListingRequestFactory = std::function<NS3Lister::TListingRequest( + const NS3Lister::TListingRequest& defaultParams, const TString& pathPrefix)>; + + TBFSDirectoryResolverIterator( + NS3Lister::TListingRequest defaultParams, + TListingRequestFactory listingRequestFactory, + ES3ListingOptions options, + std::deque<TString> initialPathPrefixes, + TDirectoryS3ListingStrategy directoryListingStrategy, + size_t minParallelism, + size_t limit) + : DefaultParams(std::move(defaultParams)) + , Options(options) + , ListingRequestFactory(std::move(listingRequestFactory)) + , DirectoryPrefixQueue(std::move(initialPathPrefixes)) + , DirectoryListingStrategy(std::move(directoryListingStrategy)) + , MinParallelism(minParallelism) + , Limit(limit) { } + + TFuture<NS3Lister::TListResult> Next() override { + First = false; + if (DirectoryPrefixQueue.empty()) { + return MakeFuture(NS3Lister::TListResult{TIssues{TIssue{"No path to list"}}}); + } + auto sourcePrefix = DirectoryPrefixQueue.front(); + DirectoryPrefixQueue.pop_front(); + NextDirectoryListeningChunk = + GetPrefixLister(sourcePrefix) + .Apply( + [this, sourcePrefix](const TFuture<NS3Lister::TListResult>& future) + -> NS3Lister::TListResult { + try { + auto& nextBatch = future.GetValue(); + if (std::holds_alternative<TIssues>(nextBatch)) { + return std::get<TIssues>(nextBatch); + } + + auto& listingResult = + std::get<NS3Lister::TListEntries>(nextBatch); + auto result = NS3Lister::TListEntries{}; + auto currentListingTotalSize = ReturnedSize + + DirectoryPrefixQueue.size() + + listingResult.Size(); + if (currentListingTotalSize > Limit) { + // Stop listing + result.Directories.push_back({.Path = sourcePrefix}); + for (auto& directoryPrefix : DirectoryPrefixQueue) { + result.Directories.push_back({.Path = directoryPrefix}); + } + DirectoryPrefixQueue.clear(); + } else { + result.Objects.insert( + result.Objects.end(), + std::make_move_iterator(listingResult.Objects.begin()), + std::make_move_iterator(listingResult.Objects.end())); + if (currentListingTotalSize < MinParallelism) { + for (auto& directoryPrefix : listingResult.Directories) { + DirectoryPrefixQueue.push_back(directoryPrefix.Path); + } + } else { + for (auto& directoryPrefix : listingResult.Directories) { + result.Directories.push_back( + {.Path = directoryPrefix.Path}); + } + for (auto& directoryPrefix : DirectoryPrefixQueue) { + result.Directories.push_back( + {.Path = directoryPrefix}); + } + DirectoryPrefixQueue.clear(); + } + } + ReturnedSize += result.Size(); + return NS3Lister::TListResult{result}; + } catch (std::exception& e) { + return TIssues{TIssue{e.what()}}; + } + }); + return NextDirectoryListeningChunk; + } + + bool HasNext() override { + if (!First) { + NextDirectoryListeningChunk.Wait(); + if (NextDirectoryListeningChunk.HasException()) { + return false; + } + NextDirectoryListeningChunk.GetValue(); + } + return !DirectoryPrefixQueue.empty(); + } + +private: + TFuture<NS3Lister::TListResult> GetPrefixLister(const TString& prefix) { + const auto& listingRequest = ListingRequestFactory(DefaultParams, prefix); + auto listResult = DirectoryListingStrategy.List(listingRequest, Options); + return listResult; + } + +private: + const NS3Lister::TListingRequest DefaultParams; + const ES3ListingOptions Options; + TListingRequestFactory ListingRequestFactory; + std::deque<TString> DirectoryPrefixQueue; + std::vector<NS3Lister::TObjectListEntry> Objects; + TDirectoryS3ListingStrategy DirectoryListingStrategy; + TFuture<NS3Lister::TListResult> NextDirectoryListeningChunk; + bool First = true; + size_t ReturnedSize = 0; + const size_t MinParallelism; + const size_t Limit; +}; + +class TUnPartitionedDatasetS3ListingStrategy : public TCollectingS3ListingStrategy { +public: + TUnPartitionedDatasetS3ListingStrategy( + const IS3ListerFactory::TPtr& listerFactory, + const IHTTPGateway::TPtr& httpGateway, + size_t limit, + size_t minParallelism, + bool allowLocalFiles) + : TCollectingS3ListingStrategy( + limit, + [listerFactory, httpGateway, limit, minParallelism, allowLocalFiles]( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + auto ptr = std::shared_ptr<NS3Lister::IS3Lister>( + new TBFSDirectoryResolverIterator{ + listingRequest, + [](const NS3Lister::TListingRequest& defaultParams, + const TString& pathPrefix) { + NS3Lister::TListingRequest request(defaultParams); + request.Prefix = pathPrefix; + return request; + }, + options, + std::deque<TString>{ + (!listingRequest.Prefix.empty()) + ? listingRequest.Prefix + : listingRequest.Pattern.substr( + 0, NS3::GetFirstWildcardPos(listingRequest.Pattern))}, + TDirectoryS3ListingStrategy{ + listerFactory, httpGateway, limit, allowLocalFiles}, + minParallelism, + limit}); + return MakeFuture(std::move(ptr)); + }) { } +}; + +class TSimpleS3BatchListingStrategy : public IS3BatchListingStrategy { +public: + TSimpleS3BatchListingStrategy( + IS3ListerFactory::TPtr listerFactory, + IHTTPGateway::TPtr httpGateway, + size_t limit, + size_t minParallelism, + bool allowLocalFiles) + : ListerFactory(std::move(listerFactory)) + , HttpGateway(std::move(httpGateway)) + , Limit(limit) + , MinParallelism(minParallelism) + , AllowLocalFiles(allowLocalFiles) { } + + NThreading::TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& baseRequest, + const std::vector<TString>& paths, + TPatternFactory patternFactory, + ES3ListingOptions options) override { + Y_ENSURE( + options == ES3ListingOptions::UnPartitionedDataset, + "This strategy only works for un partitioned datasets"); + + auto strategy = TCollectingS3ListingStrategy( + Limit, + [this, paths, patternFactory]( + const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + auto listerPtr = std::make_shared<TBFSDirectoryResolverIterator>( + listingRequest, + [patternFactory]( + const NS3Lister::TListingRequest& defaultParams, + const TString& pathPrefix) { + NS3Lister::TListingRequest request(defaultParams); + request.Prefix = pathPrefix; + request.Pattern = patternFactory(defaultParams, pathPrefix); + return request; + }, + options, + std::deque<TString>(paths.begin(), paths.end()), + TDirectoryS3ListingStrategy{ + ListerFactory, HttpGateway, Limit, AllowLocalFiles}, + MinParallelism, + Limit); + return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>(listerPtr)); + }); + + return strategy.List(baseRequest, options); + }; + +private: + IS3ListerFactory::TPtr ListerFactory; + const IHTTPGateway::TPtr HttpGateway; + const size_t Limit; + const size_t MinParallelism; + const bool AllowLocalFiles; +}; + +class TS3ParallelLimitedListerFactory : public IS3ListerFactory { +public: + using TPtr = std::shared_ptr<TS3ParallelLimitedListerFactory>; + + explicit TS3ParallelLimitedListerFactory(size_t maxParallelOps = 1) + : Semaphore(TAsyncSemaphore::Make(std::max<size_t>(1, maxParallelOps))) { } + + TFuture<NS3Lister::IS3Lister::TPtr> Make( + const IHTTPGateway::TPtr& httpGateway, + const NS3Lister::TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + bool allowLocalFiles) override { + auto acquired = Semaphore->AcquireAsync(); + return acquired.Apply( + [httpGateway, listingRequest, delimiter, allowLocalFiles](const auto& f) { + return std::shared_ptr<NS3Lister::IS3Lister>(new TListerLockReleaseWrapper{ + NS3Lister::MakeS3Lister( + httpGateway, listingRequest, delimiter, allowLocalFiles), + std::make_unique<TAsyncSemaphore::TAutoRelease>( + f.GetValue()->MakeAutoRelease())}); + }); + } + +private: + class TListerLockReleaseWrapper : public NS3Lister::IS3Lister { + public: + using TLockPtr = std::unique_ptr<TAsyncSemaphore::TAutoRelease>; + + TListerLockReleaseWrapper(NS3Lister::IS3Lister::TPtr listerPtr, TLockPtr lock) + : ListerPtr(std::move(listerPtr)) + , Lock(std::move(lock)) { + if (ListerPtr == nullptr) { + Lock.reset(); + } + } + + TFuture<NS3Lister::TListResult> Next() override { return ListerPtr->Next(); } + bool HasNext() override { + auto hasNext = ListerPtr->HasNext(); + if (!hasNext) { + Lock.reset(); + } + return ListerPtr->HasNext(); + } + + private: + NS3Lister::IS3Lister::TPtr ListerPtr; + TLockPtr Lock; + }; + +private: + const TAsyncSemaphore::TPtr Semaphore; +}; + +} // namespace + +IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps) { + return std::make_shared<TS3ParallelLimitedListerFactory>(maxParallelOps); +} + +IS3ListingStrategy::TPtr MakeS3ListingStrategy( + const IHTTPGateway::TPtr& httpGateway, + const IS3ListerFactory::TPtr& listerFactory, + ui64 maxFilesPerQueryFiles, + ui64 maxFilesPerQueryDirectory, + ui64 minDesiredDirectoriesOfFilesPerQuery, + bool allowLocalFiles) { + return std::make_shared<TCompositeS3ListingStrategy>( + std::vector<std::shared_ptr<IS3ListingStrategy>>{ + std::make_shared<TFlatFileS3ListingStrategy>( + listerFactory, httpGateway, maxFilesPerQueryFiles, allowLocalFiles), + std::make_shared<TConditionalS3ListingStrategy>( + std::initializer_list<TConditionalS3ListingStrategy::TPair>{ + {[](ES3ListingOptions options) { + return options == ES3ListingOptions::PartitionedDataset; + }, + std::make_shared<TPartitionedDatasetS3ListingStrategy>( + listerFactory, httpGateway, maxFilesPerQueryDirectory, allowLocalFiles)}, + {[](ES3ListingOptions options) { + return options == ES3ListingOptions::UnPartitionedDataset; + }, + std::make_shared<TUnPartitionedDatasetS3ListingStrategy>( + listerFactory, + httpGateway, + maxFilesPerQueryDirectory, + minDesiredDirectoriesOfFilesPerQuery, + allowLocalFiles)}, + })}); +} + +IS3BatchListingStrategy::TPtr MakeS3BatchListingStrategy( + const IHTTPGateway::TPtr& httpGateway, + const IS3ListerFactory::TPtr& listerFactory, + ui64 maxFilesPerQueryFiles, + ui64 maxFilesPerQueryDirectory, + ui64 minDesiredDirectoriesOfFilesPerQuery, + bool allowLocalFiles) { + auto maxSize = std::max(maxFilesPerQueryFiles, maxFilesPerQueryDirectory); + return std::make_shared<TSimpleS3BatchListingStrategy>( + listerFactory, + httpGateway, + maxSize, + minDesiredDirectoriesOfFilesPerQuery, + allowLocalFiles); +} + +} // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h new file mode 100644 index 0000000000..a77d4d27a0 --- /dev/null +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy.h @@ -0,0 +1,78 @@ +#pragma once + +#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> + +#include <library/cpp/threading/future/future.h> + +#include <memory> +#include <variant> +#include <vector> + +namespace NYql { + +class IS3ListerFactory { +public: + using TPtr = std::shared_ptr<IS3ListerFactory>; + + virtual NThreading::TFuture<NS3Lister::IS3Lister::TPtr> Make( + const IHTTPGateway::TPtr& httpGateway, + const NS3Lister::TListingRequest& listingRequest, + const TMaybe<TString>& delimiter, + bool allowLocalFiles) = 0; + + virtual ~IS3ListerFactory() = default; +}; + +IS3ListerFactory::TPtr MakeS3ListerFactory(size_t maxParallelOps); + +enum class ES3ListingOptions : ui8 { + NoOptions = 0, + UnPartitionedDataset = 1, + PartitionedDataset = 2 +}; + +IOutputStream& operator<<(IOutputStream& stream, ES3ListingOptions option); + +class IS3ListingStrategy { +public: + using TPtr = std::shared_ptr<IS3ListingStrategy>; + + virtual NThreading::TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& listingRequest, ES3ListingOptions options) = 0; + + virtual ~IS3ListingStrategy() = default; +}; + +IS3ListingStrategy::TPtr MakeS3ListingStrategy( + const IHTTPGateway::TPtr& httpGateway, + const IS3ListerFactory::TPtr& listerFactory, + ui64 maxFilesPerQueryFiles, + ui64 maxFilesPerQueryDirectory, + ui64 minDesiredDirectoriesOfFilesPerQuery, + bool allowLocalFiles); + +class IS3BatchListingStrategy { +public: + using TPatternFactory = + std::function<TString(const NS3Lister::TListingRequest& baseRequest, const TString& prefix)>; + using TPtr = std::shared_ptr<IS3BatchListingStrategy>; + + virtual NThreading::TFuture<NS3Lister::TListResult> List( + const NS3Lister::TListingRequest& baseRequest, + const std::vector<TString>& paths, + TPatternFactory prefixFactory, + ES3ListingOptions options) = 0; + virtual ~IS3BatchListingStrategy() = default; +}; + +IS3BatchListingStrategy::TPtr MakeS3BatchListingStrategy( + const IHTTPGateway::TPtr& httpGateway, + const IS3ListerFactory::TPtr& listerFactory, + ui64 maxFilesPerQueryFiles, + ui64 maxFilesPerQueryDirectory, + ui64 minDesiredDirectoriesOfFilesPerQuery, + bool allowLocalFiles); + + +} // namespace NYql
\ No newline at end of file diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp new file mode 100644 index 0000000000..b03bf0e0cc --- /dev/null +++ b/ydb/library/yql/providers/s3/provider/yql_s3_listing_strategy_ut.cpp @@ -0,0 +1,216 @@ +#include "yql_s3_listing_strategy.cpp" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql { + +class TMockS3Lister : public NS3Lister::IS3Lister { +public: + TMockS3Lister(std::vector<NS3Lister::TListResult> batches) + : Batches(std::move(batches)) { } + + TFuture<NS3Lister::TListResult> Next() override { + Y_ENSURE(!Consumed); + Consumed = true; + return MakeFuture(Batches[Step]); + } + bool HasNext() override { + Y_ENSURE(Consumed); + ++Step; + Consumed = false; + return static_cast<ui16>(Step) < Batches.size(); + } + +private: + std::vector<NS3Lister::TListResult> Batches; + i32 Step = -1; + bool Consumed = true; +}; + +class TMockS3ExceptionLister : public NS3Lister::IS3Lister { +public: + TMockS3ExceptionLister(TString exceptionMessage) + : ExceptionMessage(std::move(exceptionMessage)) { } + + TFuture<NS3Lister::TListResult> Next() override { + Y_ENSURE(!Consumed); + Consumed = true; + auto promise = NewPromise<NS3Lister::TListResult>(); + promise.SetException(ExceptionMessage); + return promise; + } + bool HasNext() override { return !Consumed; } + +private: + TString ExceptionMessage; + bool Consumed = false; +}; + +Y_UNIT_TEST_SUITE(TCollectingS3ListingStrategyTests) { + +void UnitAssertListResultEquals( + const NS3Lister::TListResult& expected, const NS3Lister::TListResult& actual) { + UNIT_ASSERT_VALUES_EQUAL(expected.index(), actual.index()); + if (std::holds_alternative<TIssues>(expected)) { + const auto& expectedIssues = std::get<TIssues>(expected); + const auto& actualIssues = std::get<TIssues>(actual); + UNIT_ASSERT_VALUES_EQUAL(expectedIssues.Size(), actualIssues.Size()); + UNIT_ASSERT_VALUES_EQUAL( + expectedIssues.ToOneLineString(), actualIssues.ToOneLineString()); + return; + } + + const auto& expectedEntries = std::get<NS3Lister::TListEntries>(expected); + const auto& actualEntries = std::get<NS3Lister::TListEntries>(actual); + + UNIT_ASSERT_VALUES_EQUAL(expectedEntries.Objects.size(), actualEntries.Objects.size()); + for (size_t i = 0; i < expectedEntries.Objects.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Objects[i].Path, actualEntries.Objects[i].Path); + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Objects[i].Size, actualEntries.Objects[i].Size); + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Objects[i].MatchedGlobs.size(), + actualEntries.Objects[i].MatchedGlobs.size()); + for (size_t j = 0; j < expectedEntries.Objects[i].MatchedGlobs.size(); ++j) { + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Objects[i].MatchedGlobs[j], + actualEntries.Objects[i].MatchedGlobs[j]); + } + } + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Directories.size(), actualEntries.Directories.size()); + for (size_t i = 0; i < expectedEntries.Directories.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Directories[i].Path, actualEntries.Directories[i].Path); + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Directories[i].MatchedGlobs.size(), + actualEntries.Directories[i].MatchedGlobs.size()); + for (size_t j = 0; j < expectedEntries.Directories[i].MatchedGlobs.size(); ++j) { + UNIT_ASSERT_VALUES_EQUAL( + expectedEntries.Directories[i].MatchedGlobs[j], + actualEntries.Directories[i].MatchedGlobs[j]); + } + } +} + +Y_UNIT_TEST(IfNoIssuesOccursShouldReturnCollectedPaths) { + auto strategy = TCollectingS3ListingStrategy{ + 10, + [](const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); + UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( + std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{ + NS3Lister::TListEntries{ + .Objects = + std::vector<NS3Lister::TObjectListEntry>{NS3Lister::TObjectListEntry{ + .Path = "a/a", + .Size = 10, + }}}, + NS3Lister::TListEntries{ + .Objects = std::vector<NS3Lister::TObjectListEntry>{ + NS3Lister::TObjectListEntry{ + .Path = "a/b", + .Size = 15, + }}}}))); + }}; + + auto actualResultFuture = strategy.List( + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, + ES3ListingOptions::NoOptions); + auto expectedResult = NS3Lister::TListResult{NS3Lister::TListEntries{ + .Objects = std::vector<NS3Lister::TObjectListEntry>{ + NS3Lister::TObjectListEntry{ + .Path = "a/a", + .Size = 10, + }, + NS3Lister::TObjectListEntry{ + .Path = "a/b", + .Size = 15, + }}}}; + const auto& actualResult = actualResultFuture.GetValue(); + UnitAssertListResultEquals(expectedResult, actualResult); +} + +Y_UNIT_TEST(IfThereAreMoreRecordsThanSpecifiedByLimitShouldReturnError) { + auto strategy = TCollectingS3ListingStrategy{ + 1, + [](const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); + UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( + std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{ + NS3Lister::TListEntries{ + .Objects = + std::vector<NS3Lister::TObjectListEntry>{NS3Lister::TObjectListEntry{ + .Path = "a/a", + .Size = 10, + }}}, + NS3Lister::TListEntries{ + .Objects = std::vector<NS3Lister::TObjectListEntry>{ + NS3Lister::TObjectListEntry{ + .Path = "a/b", + .Size = 15, + }}}}))); + }}; + + auto actualResultFuture = strategy.List( + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, + ES3ListingOptions::NoOptions); + auto expectedResult = NS3Lister::TListResult{TIssues{MakeLimitExceededIssue()}}; + const auto& actualResult = actualResultFuture.GetValue(); + UnitAssertListResultEquals(expectedResult, actualResult); +} + +Y_UNIT_TEST(IfAnyIterationReturnIssueThanWholeStrategyShouldReturnIt) { + auto strategy = TCollectingS3ListingStrategy{ + 1, + [](const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); + UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( + std::make_shared<TMockS3Lister>(std::vector<NS3Lister::TListResult>{ + NS3Lister::TListEntries{ + .Objects = + std::vector<NS3Lister::TObjectListEntry>{NS3Lister::TObjectListEntry{ + .Path = "a/a", + .Size = 10, + }}}, + TIssues{TIssue("TEST_ISSUE")}}))); + }}; + + auto actualResultFuture = strategy.List( + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, + ES3ListingOptions::NoOptions); + auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("TEST_ISSUE")}}; + const auto& actualResult = actualResultFuture.GetValue(); + UnitAssertListResultEquals(expectedResult, actualResult); +} + +Y_UNIT_TEST(IfExceptionIsReturnedFromIteratorThanItShouldCovertItToIssue) { + auto strategy = TCollectingS3ListingStrategy{ + 10, + [](const NS3Lister::TListingRequest& listingRequest, + ES3ListingOptions options) { + UNIT_ASSERT_VALUES_EQUAL(listingRequest.Prefix, "TEST_INPUT"); + UNIT_ASSERT_VALUES_EQUAL(options, ES3ListingOptions::NoOptions); + return MakeFuture(std::static_pointer_cast<NS3Lister::IS3Lister>( + std::make_shared<TMockS3ExceptionLister>("EXCEPTION MESSAGE"))); + }}; + + auto actualResultFuture = strategy.List( + NS3Lister::TListingRequest{.Prefix = "TEST_INPUT"}, + ES3ListingOptions::NoOptions); + UNIT_ASSERT(actualResultFuture.HasValue()); + auto expectedResult = NS3Lister::TListResult{TIssues{TIssue("EXCEPTION MESSAGE")}}; + const auto& actualResult = actualResultFuture.GetValue(); + UnitAssertListResultEquals(expectedResult, actualResult); +} + +} // Y_UNIT_TEST_SUITE(TPathTests2) + +} // namespace NYql diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp index e62c23fab9..bd3e92d89a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_logical_opt.cpp @@ -262,7 +262,9 @@ public: return TStatus::Error; } - const auto maxFiles = State_->Configuration->MaxFilesPerQuery; + const auto maxFiles = std::max( + State_->Configuration->MaxFilesPerQuery, + State_->Configuration->MaxDirectoriesAndFilesPerQuery); if (count > maxFiles) { ctx.AddError(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder() << "Too many objects to read: " << count << ", but limit is " << maxFiles)); return TStatus::Error; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index ab10097afd..72d7a691a0 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -35,6 +35,8 @@ void TS3Configuration::Init(const TS3GatewayConfig& config, TIntrusivePtr<TTypeA FileSizeLimit = config.HasFileSizeLimit() ? config.GetFileSizeLimit() : 2_GB; MaxFilesPerQuery = config.HasMaxFilesPerQuery() ? config.GetMaxFilesPerQuery() : 7000; MaxDiscoveryFilesPerQuery = config.HasMaxDiscoveryFilesPerQuery() ? config.GetMaxDiscoveryFilesPerQuery() : 9000; + MaxDirectoriesAndFilesPerQuery = config.HasMaxDirectoriesAndFilesPerQuery() ? config.GetMaxDirectoriesAndFilesPerQuery() : 9000; + MinDesiredDirectoriesOfFilesPerQuery = config.HasMinDesiredDirectoriesOfFilesPerQuery() ? config.GetMinDesiredDirectoriesOfFilesPerQuery() : 100; MaxReadSizePerQuery = config.HasMaxReadSizePerQuery() ? config.GetMaxReadSizePerQuery() : 4_GB; MaxInflightListsPerQuery = config.HasMaxInflightListsPerQuery() ? config.GetMaxInflightListsPerQuery() : 1; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 437715fd5a..86bb1ed5c9 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -41,6 +41,8 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher std::unordered_map<TString, ui64> FormatSizeLimits; ui64 MaxFilesPerQuery; ui64 MaxDiscoveryFilesPerQuery; + ui64 MaxDirectoriesAndFilesPerQuery; + ui64 MinDesiredDirectoriesOfFilesPerQuery; ui64 MaxReadSizePerQuery; ui64 MaxInflightListsPerQuery; bool AllowLocalFiles; |