diff options
author | hor911 <hor911@ydb.tech> | 2023-08-24 23:57:07 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-08-25 00:11:15 +0300 |
commit | 1f74f9e363b1517cf9797e88d21f05b828f3ae44 (patch) | |
tree | a90c2535459d40be46eee888982ec4bb027f30a5 | |
parent | a04249ef2ae43b56dcca4b5c8e1ba59525feb9be (diff) | |
download | ydb-1f74f9e363b1517cf9797e88d21f05b828f3ae44.tar.gz |
Fix S3 Source Multi-Range Processing
7 files changed, 57 insertions, 59 deletions
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index e6f82748d3d..90698008313 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -184,5 +184,5 @@ message TDqTask { map<string, string> SecureParams = 14; optional uint32 MetaId = 15; optional bool UseLlvm = 16; - repeated string ReadRanges = 17; + repeated bytes ReadRanges = 17; } diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 84cacc6185f..42a144aaf43 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -346,9 +346,9 @@ public: , PatternType(patternType) { for (size_t i = 0; i < paths.size(); ++i) { if (paths[i].IsDirectory) { - Directories.emplace_back(paths[i].Path, 0, i); + Directories.emplace_back(paths[i].Path, 0, paths[i].PathIndex); } else { - Objects.emplace_back(paths[i].Path, paths[i].Size, i); + Objects.emplace_back(paths[i].Path, paths[i].Size, paths[i].PathIndex); } } } @@ -677,7 +677,6 @@ public: ES3PatternVariant patternVariant, TPathList&& paths, bool addPathIndex, - ui64 startPathIndex, const NActors::TActorId& computeActorId, ui64 sizeLimit, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, @@ -699,7 +698,6 @@ public: , PatternVariant(patternVariant) , Paths(std::move(paths)) , AddPathIndex(addPathIndex) - , StartPathIndex(startPathIndex) , SizeLimit(sizeLimit) , Counters(counters) , TaskCounters(taskCounters) @@ -756,7 +754,7 @@ public: DownloadInflight++; const auto& [path, size, index] = ReadPathFromCache(); auto url = Url + path; - auto id = index + StartPathIndex; + auto id = index; const TString requestId = CreateGuidAsString(); LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]"); Gateway->Download( @@ -968,7 +966,6 @@ private: size_t CompletedFiles = 0; NActors::TActorId FileQueueActor; const bool AddPathIndex; - const ui64 StartPathIndex; const ui64 SizeLimit; ui64 IngressBytes = 0; TDuration CpuTime; @@ -2227,7 +2224,6 @@ public: ES3PatternVariant patternVariant, TPathList&& paths, bool addPathIndex, - ui64 startPathIndex, const TReadSpec::TPtr& readSpec, const NActors::TActorId& computeActorId, const IHTTPGateway::TRetryPolicy::TPtr& retryPolicy, @@ -2249,7 +2245,6 @@ public: , PatternVariant(patternVariant) , Paths(std::move(paths)) , AddPathIndex(addPathIndex) - , StartPathIndex(startPathIndex) , ReadSpec(readSpec) , Counters(std::move(counters)) , TaskCounters(std::move(taskCounters)) @@ -2363,7 +2358,7 @@ public: TxId, requestId, RetryPolicy); - auto pathIndex = objectPath.PathIndex + StartPathIndex; + auto pathIndex = objectPath.PathIndex; if (TaskCounters) { HttpInflightLimit->Add(Gateway->GetBuffersSizePerStream()); } @@ -2670,7 +2665,6 @@ private: bool IsObjectQueueEmpty = false; bool IsWaitingObjectQueueResponse = false; const bool AddPathIndex; - const ui64 StartPathIndex; size_t ListedFiles = 0; size_t CompletedFiles = 0; const TReadSpec::TPtr ReadSpec; @@ -2864,8 +2858,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( const IFunctionRegistry& functionRegistry = *holderFactory.GetFunctionRegistry(); TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList(params, taskParams, readRanges, paths, startPathIndex); + ReadPathsList(params, taskParams, readRanges, paths); const auto token = secureParams.Value(params.GetToken(), TString{}); const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token); @@ -3005,7 +2998,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( #undef SET_FLAG #undef SUPPORTED_FLAGS const auto actor = new TS3StreamReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, - std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId, retryPolicy, + std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy, cfg, counters, taskCounters, fileSizeLimit, memoryQuotaManager); return {actor, actor}; @@ -3015,7 +3008,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor( sizeLimit = FromString<ui64>(it->second); const auto actor = new TS3ReadActor(inputIndex, txId, std::move(gateway), holderFactory, params.GetUrl(), authToken, pathPattern, pathPatternVariant, - std::move(paths), addPathIndex, startPathIndex, computeActorId, sizeLimit, retryPolicy, + std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy, cfg, counters, taskCounters, fileSizeLimit); return {actor, actor}; } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 61700c459ac..0535de0f960 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -199,10 +199,10 @@ private: const auto& listEntries = std::get<NS3Lister::TListEntries>(listResult); for (auto& entry : listEntries.Objects) { - listedPaths.emplace_back(entry.Path, entry.Size, false); + listedPaths.emplace_back(entry.Path, entry.Size, false, dirIndex + i); } for (auto& path : listEntries.Directories) { - listedPaths.emplace_back(path.Path, 0, true); + listedPaths.emplace_back(path.Path, 0, true, dirIndex + i); } } @@ -358,7 +358,7 @@ private: } auto& pathList = pathsByExtraValues[extraValues]; - pathList.emplace_back(NS3Details::TPath{entry.Path, entry.Size, false}); + pathList.emplace_back(NS3Details::TPath{entry.Path, entry.Size, false, pathList.size()}); readSize += entry.Size; } for (auto& entry: listEntries.Directories) { @@ -369,7 +369,7 @@ private: } auto& pathList = pathsByExtraValues[extraValues]; - pathList.emplace_back(NS3Details::TPath{entry.Path, 0, true}); + pathList.emplace_back(NS3Details::TPath{entry.Path, 0, true, pathList.size()}); } YQL_CLOG(INFO, ProviderS3) << "Pattern " << req.S3Request.Pattern << " has " << listEntries.Size() << " items with total size " << readSize; diff --git a/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp b/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp index 911e8119264..4ae5a609ab3 100644 --- a/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp +++ b/ydb/library/yql/providers/s3/range_helpers/file_tree_builder_ut.cpp @@ -107,13 +107,13 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) { b.Save(&range); TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList({}, MakeParams(range), {}, paths, startPathIndex); + ReadPathsList({}, MakeParams(range), {}, paths); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 1); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "name"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 0); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 0); } Y_UNIT_TEST(DeserializesManySlashes) { @@ -124,13 +124,13 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) { b.Save(&range); TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList({}, MakeParams(range), {}, paths, startPathIndex); + ReadPathsList({}, MakeParams(range), {}, paths); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 1); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "a///b"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 42); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 0); } Y_UNIT_TEST(DeserializesTrailingSlash) { @@ -142,17 +142,18 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) { b.Save(&range); TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList({}, MakeParams(range), {}, paths, startPathIndex); + ReadPathsList({}, MakeParams(range), {}, paths); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 2); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "root/name/"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 0); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, true); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 0); UNIT_ASSERT_VALUES_EQUAL(paths[1].Path, "root/name//"); UNIT_ASSERT_VALUES_EQUAL(paths[1].Size, 3); UNIT_ASSERT_VALUES_EQUAL(paths[1].IsDirectory, true); + UNIT_ASSERT_VALUES_EQUAL(paths[1].PathIndex, 1); } Y_UNIT_TEST(DeserializesLeadingSlash) { @@ -164,18 +165,19 @@ Y_UNIT_TEST_SUITE(S3FileTreeBuilderTest) { b.Save(&range); TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList({}, MakeParams(range), {}, paths, startPathIndex); + ReadPathsList({}, MakeParams(range), {}, paths); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 2); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "/"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 42); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, true); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 0); UNIT_ASSERT_VALUES_EQUAL(paths[1].Path, "/root/name"); UNIT_ASSERT_VALUES_EQUAL(paths[1].Size, 3); UNIT_ASSERT_VALUES_EQUAL(paths[1].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[1].PathIndex, 1); } } diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp index 328d2eb5c3b..d30685a45fc 100644 --- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp +++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.cpp @@ -13,7 +13,7 @@ namespace NYql::NS3Details { -static void BuildPathsFromTree(const google::protobuf::RepeatedPtrField<NYql::NS3::TRange::TPath>& children, TPathList& paths, TString& currentPath, size_t currentDepth = 0) { +static void BuildPathsFromTree(const google::protobuf::RepeatedPtrField<NYql::NS3::TRange::TPath>& children, TPathList& paths, TString& currentPath, size_t currentDepth, ui64& nextPathIndex) { if (children.empty()) { return; } @@ -26,23 +26,28 @@ static void BuildPathsFromTree(const google::protobuf::RepeatedPtrField<NYql::NS if (path.GetRead()) { auto isDirectory = path.GetIsDirectory(); auto readPath = isDirectory ? currentPath + "/" : currentPath; - paths.emplace_back(TPath{readPath, path.GetSize(), path.GetIsDirectory()}); + paths.emplace_back(TPath{readPath, path.GetSize(), path.GetIsDirectory(), nextPathIndex++}); } - BuildPathsFromTree(path.GetChildren(), paths, currentPath, currentDepth + 1); + BuildPathsFromTree(path.GetChildren(), paths, currentPath, currentDepth + 1, nextPathIndex); currentPath.resize(prevSize); } } -void DecodeS3Range(const NS3::TSource& sourceDesc, const TString& data, TPathList& paths, ui64& startPathIndex) { +static void BuildPathsFromTree(const google::protobuf::RepeatedPtrField<NYql::NS3::TRange::TPath>& children, TPathList& paths, TString& currentPath, size_t currentDepth = 0) { + ui64 nextPathIndex = 0; + BuildPathsFromTree(children, paths, currentPath, currentDepth, nextPathIndex); +} + +void DecodeS3Range(const NS3::TSource& sourceDesc, const TString& data, TPathList& paths) { NS3::TRange range; TStringInput input(data); range.Load(&input); - startPathIndex = range.GetStartPathIndex(); + auto startPathIndex = range.GetStartPathIndex(); // Modern way if (range.PathsSize()) { TString buf; - return BuildPathsFromTree(range.GetPaths(), paths, buf); + return BuildPathsFromTree(range.GetPaths(), paths, buf, 0, startPathIndex); } std::unordered_map<TString, size_t> map(sourceDesc.GetDeprecatedPath().size()); @@ -54,38 +59,32 @@ void DecodeS3Range(const NS3::TSource& sourceDesc, const TString& data, TPathLis const auto& path = range.GetDeprecatedPath().Get(i); auto it = map.find(path); YQL_ENSURE(it != map.end()); - paths.emplace_back(TPath{path, it->second, false}); + paths.emplace_back(TPath{path, it->second, false, i + startPathIndex}); } } -void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths, ui64& startPathIndex) { +void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths) { if (!readRanges.empty()) { - bool firstReadRange = true; for (auto readRange : readRanges) { - ui64 rangeStartPathIndex; - DecodeS3Range(sourceDesc, readRange, paths, rangeStartPathIndex); - if (firstReadRange) { - startPathIndex = rangeStartPathIndex; - firstReadRange = false; - } else { - YQL_ENSURE(startPathIndex == rangeStartPathIndex); - } + DecodeS3Range(sourceDesc, readRange, paths); } } else if (const auto taskParamsIt = taskParams.find(S3ProviderName); taskParamsIt != taskParams.cend()) { - DecodeS3Range(sourceDesc, taskParamsIt->second, paths, startPathIndex); + DecodeS3Range(sourceDesc, taskParamsIt->second, paths); } else { for (auto i = 0; i < sourceDesc.GetDeprecatedPath().size(); ++i) { paths.emplace_back(TPath{ sourceDesc.GetDeprecatedPath().Get(i).GetPath(), sourceDesc.GetDeprecatedPath().Get(i).GetSize(), - false}); + false, + static_cast<ui64>(i)}); } } } void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded) { TFileTreeBuilder builder; - for (const auto& [path, size, isDirectory] : paths) { + for (const auto& [path, size, isDirectory, pathIndex] : paths) { + Y_UNUSED(pathIndex); builder.AddPath(path, size, isDirectory); } NS3::TRange range; diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h index 8d883d1c8ec..ab490d0cae5 100644 --- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h +++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader.h @@ -13,15 +13,17 @@ struct TPath { TString Path; size_t Size = 0; bool IsDirectory = false; + ui64 PathIndex = 0; - TPath(TString path, size_t size, bool isDirectory) + TPath(TString path, size_t size, bool isDirectory, ui64 pathIndex) : Path(std::move(path)) , Size(size) - , IsDirectory(isDirectory) { } + , IsDirectory(isDirectory) + , PathIndex(pathIndex) { } }; using TPathList = std::vector<TPath>; -void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths, ui64& startPathIndex); +void ReadPathsList(const NS3::TSource& sourceDesc, const THashMap<TString, TString>& taskParams, const TVector<TString>& readRanges, TPathList& paths); void PackPathsList(const TPathList& paths, TString& packed, bool& isTextEncoded); void UnpackPathsList(TStringBuf packed, bool isTextEncoded, TPathList& paths); diff --git a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp index e36381dead0..8e28d06e0aa 100644 --- a/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp +++ b/ydb/library/yql/providers/s3/range_helpers/path_list_reader_ut.cpp @@ -32,19 +32,19 @@ Y_UNIT_TEST_SUITE(PathListReaderTest) { } TPathList paths; - ui64 startPathIndex = 42; - ReadPathsList(src, {}, {}, paths, startPathIndex); + ReadPathsList(src, {}, {}, paths); - UNIT_ASSERT_VALUES_EQUAL(startPathIndex, 42); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 2); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "my/path"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 100500); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 0); UNIT_ASSERT_VALUES_EQUAL(paths[1].Path, "other/path"); UNIT_ASSERT_VALUES_EQUAL(paths[1].Size, 1); UNIT_ASSERT_VALUES_EQUAL(paths[1].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[1].PathIndex, 1); } Y_UNIT_TEST(ReadsFilesListFromParamsAndSourceSettings) { @@ -65,15 +65,14 @@ Y_UNIT_TEST_SUITE(PathListReaderTest) { range.AddDeprecatedPath("my/path"); TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList(src, MakeParams(range), {}, paths, startPathIndex); + ReadPathsList(src, MakeParams(range), {}, paths); - UNIT_ASSERT_VALUES_EQUAL(startPathIndex, 42); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 1); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "my/path"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 100500); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 42); } NYql::NS3::TRange::TPath* SetPath(NYql::NS3::TRange::TPath* path, const TString& name = {}, ui64 size = 0, bool read = false) { @@ -111,31 +110,34 @@ Y_UNIT_TEST_SUITE(PathListReaderTest) { } TPathList paths; - ui64 startPathIndex = 0; - ReadPathsList(src, MakeParams(range), {}, paths, startPathIndex); + ReadPathsList(src, MakeParams(range), {}, paths); - UNIT_ASSERT_VALUES_EQUAL(startPathIndex, 42); UNIT_ASSERT_VALUES_EQUAL(paths.size(), 5); UNIT_ASSERT_VALUES_EQUAL(paths[0].Path, "root"); UNIT_ASSERT_VALUES_EQUAL(paths[0].Size, 1); UNIT_ASSERT_VALUES_EQUAL(paths[0].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[0].PathIndex, 42); UNIT_ASSERT_VALUES_EQUAL(paths[1].Path, "root/folder/f1"); UNIT_ASSERT_VALUES_EQUAL(paths[1].Size, 42); UNIT_ASSERT_VALUES_EQUAL(paths[1].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[1].PathIndex, 43); UNIT_ASSERT_VALUES_EQUAL(paths[2].Path, "root/folder/f2"); UNIT_ASSERT_VALUES_EQUAL(paths[2].Size, 100500); UNIT_ASSERT_VALUES_EQUAL(paths[2].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[2].PathIndex, 44); UNIT_ASSERT_VALUES_EQUAL(paths[3].Path, "root/f3"); UNIT_ASSERT_VALUES_EQUAL(paths[3].Size, 0); UNIT_ASSERT_VALUES_EQUAL(paths[3].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[3].PathIndex, 45); UNIT_ASSERT_VALUES_EQUAL(paths[4].Path, "root2/f4"); UNIT_ASSERT_VALUES_EQUAL(paths[4].Size, 42); UNIT_ASSERT_VALUES_EQUAL(paths[4].IsDirectory, false); + UNIT_ASSERT_VALUES_EQUAL(paths[4].PathIndex, 46); } } |