diff options
author | fedor-miron <fedor-miron@ydb.tech> | 2023-12-06 21:10:28 +0300 |
---|---|---|
committer | fedor-miron <fedor-miron@ydb.tech> | 2023-12-06 23:46:58 +0300 |
commit | 40302d2a5a8b044cc96c75e956916c2a18ad4fdb (patch) | |
tree | abb17a18b8a12ffbd5a93215ffc72702d968b0a5 | |
parent | b2c749509aee1b26713f88e93aa7c7f7059e31e0 (diff) | |
download | ydb-40302d2a5a8b044cc96c75e956916c2a18ad4fdb.tar.gz |
YQL-9853: implement WalkFolders
30 files changed, 1266 insertions, 142 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 09f1cb27f4..42daf56f28 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -707,6 +707,15 @@ "Match": {"Type": "Callable", "Name": "AggregateFinalize"} }, { + "Name": "TCoFold", + "Base": "TCoInputBase", + "Match": {"Type": "Callable", "Name": "Fold"}, + "Children": [ + {"Index": 1, "Name": "State", "Type": "TExprBase"}, + {"Index": 2, "Name": "UpdateHandler", "Type": "TCoLambda"} + ] + }, + { "Name": "TCoFold1", "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "Fold1"}, @@ -2393,7 +2402,6 @@ "Base": "TCoInputBase", "Match": {"Type": "Callable", "Name": "WideFromBlocks"} }, - { "Name": "TCoPgSelect", "Base": "TCallable", @@ -2419,6 +2427,23 @@ {"Index": 1, "Name": "Type", "Type": "TCallable"}, {"Index": 2, "Name": "Lambda", "Type": "TCoLambda"} ] + }, + { + "Name": "TCoPickle", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "Pickle"}, + "Children": [ + {"Index": 0, "Name": "Value", "Type": "TExprBase"} + ] + }, + { + "Name": "TCoUnpickle", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "Unpickle"}, + "Children": [ + {"Index": 0, "Name": "Type", "Type": "TCallable"}, + {"Index": 1, "Name": "Buffer", "Type": "TExprBase"} + ] } ] } diff --git a/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py b/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py index 2bc809f4e7..2bc809f4e7 100644..100755 --- a/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py +++ b/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt index 21c6fedd14..656e2bf4a6 100644 --- a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt @@ -40,20 +40,22 @@ target_link_libraries(core-services-mounts.global PUBLIC library-yql-core ) target_sources(core-services-mounts.global PRIVATE - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp ) resources(core-services-mounts.global - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp INPUTS ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql + ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql KEYS /lib/yql/aggregate.yql /lib/yql/window.yql /lib/yql/id.yql /lib/yql/sqr.yql /lib/yql/core.yql + /lib/yql/walk_folders.yql ) diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt index 21c6fedd14..656e2bf4a6 100644 --- a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt @@ -40,20 +40,22 @@ target_link_libraries(core-services-mounts.global PUBLIC library-yql-core ) target_sources(core-services-mounts.global PRIVATE - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp ) resources(core-services-mounts.global - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp INPUTS ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql + ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql KEYS /lib/yql/aggregate.yql /lib/yql/window.yql /lib/yql/id.yql /lib/yql/sqr.yql /lib/yql/core.yql + /lib/yql/walk_folders.yql ) diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt index 83a9cd73e8..336f24123f 100644 --- a/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt @@ -42,20 +42,22 @@ target_link_libraries(core-services-mounts.global PUBLIC library-yql-core ) target_sources(core-services-mounts.global PRIVATE - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp ) resources(core-services-mounts.global - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp INPUTS ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql + ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql KEYS /lib/yql/aggregate.yql /lib/yql/window.yql /lib/yql/id.yql /lib/yql/sqr.yql /lib/yql/core.yql + /lib/yql/walk_folders.yql ) diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt index 83a9cd73e8..336f24123f 100644 --- a/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt @@ -42,20 +42,22 @@ target_link_libraries(core-services-mounts.global PUBLIC library-yql-core ) target_sources(core-services-mounts.global PRIVATE - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp ) resources(core-services-mounts.global - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp INPUTS ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql + ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql KEYS /lib/yql/aggregate.yql /lib/yql/window.yql /lib/yql/id.yql /lib/yql/sqr.yql /lib/yql/core.yql + /lib/yql/walk_folders.yql ) diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt index 21c6fedd14..656e2bf4a6 100644 --- a/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt @@ -40,20 +40,22 @@ target_link_libraries(core-services-mounts.global PUBLIC library-yql-core ) target_sources(core-services-mounts.global PRIVATE - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp ) resources(core-services-mounts.global - ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp + ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp INPUTS ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql + ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql KEYS /lib/yql/aggregate.yql /lib/yql/window.yql /lib/yql/id.yql /lib/yql/sqr.yql /lib/yql/core.yql + /lib/yql/walk_folders.yql ) diff --git a/ydb/library/yql/core/services/mounts/ya.make b/ydb/library/yql/core/services/mounts/ya.make index 5092ba058e..4a85b1b3e4 100644 --- a/ydb/library/yql/core/services/mounts/ya.make +++ b/ydb/library/yql/core/services/mounts/ya.make @@ -19,6 +19,7 @@ RESOURCE( ydb/library/yql/mount/lib/yql/id.yql /lib/yql/id.yql ydb/library/yql/mount/lib/yql/sqr.yql /lib/yql/sqr.yql ydb/library/yql/mount/lib/yql/core.yql /lib/yql/core.yql + ydb/library/yql/mount/lib/yql/walk_folders.yql /lib/yql/walk_folders.yql ) END() diff --git a/ydb/library/yql/core/services/mounts/yql_mounts.cpp b/ydb/library/yql/core/services/mounts/yql_mounts.cpp index cd6e8e2559..63151a99e8 100644 --- a/ydb/library/yql/core/services/mounts/yql_mounts.cpp +++ b/ydb/library/yql/core/services/mounts/yql_mounts.cpp @@ -105,6 +105,7 @@ namespace NYql { AddLibraryFromResource(userData, "/lib/yql/id.yql"); AddLibraryFromResource(userData, "/lib/yql/sqr.yql"); AddLibraryFromResource(userData, "/lib/yql/core.yql"); + AddLibraryFromResource(userData, "/lib/yql/walk_folders.yql"); } TUserDataTable GetYqlModuleResolverImpl( diff --git a/ydb/library/yql/mount/lib/yql/walk_folders.yql b/ydb/library/yql/mount/lib/yql/walk_folders.yql new file mode 100644 index 0000000000..1cf37bc31f --- /dev/null +++ b/ydb/library/yql/mount/lib/yql/walk_folders.yql @@ -0,0 +1,36 @@ +#library +( +(let AnyNodeDiveHandler + (lambda + '(nodes state attrList level) + '((Map (ListExtract nodes 'Path) (lambda '(path) '(path attrList))) state) + ) +) + +(let AttrListType (ListType (DataType 'String))) + +(let YtFolderNodeType + (StructType '('"Attributes" (DataType 'Yson)) '('"Path" (DataType 'String)) '('"Type" (DataType 'String))) +) + +(let TraverseLevelType (DataType 'Int32)) + +(let MakePrePostHandlersType + (lambda '(stateType) + (CallableType '() '(stateType) '((ListType YtFolderNodeType)) + '(stateType) '(TraverseLevelType)) + ) +) + +(let MakeResolveDiveHandlersType + (lambda '(stateType) + (CallableType '() '((TupleType (ListType (TupleType (DataType 'String) AttrListType)) stateType)) + '((ListType YtFolderNodeType)) '(stateType) '(AttrListType) '(TraverseLevelType)) + ) +) + +(export AnyNodeDiveHandler) +(export YtFolderNodeType) +(export MakePrePostHandlersType) +(export MakeResolveDiveHandlersType) +)
\ No newline at end of file diff --git a/ydb/library/yql/providers/yt/common/yql_configuration.h b/ydb/library/yql/providers/yt/common/yql_configuration.h index 4ffa3068dc..6d9ca0338c 100644 --- a/ydb/library/yql/providers/yt/common/yql_configuration.h +++ b/ydb/library/yql/providers/yt/common/yql_configuration.h @@ -64,4 +64,6 @@ constexpr ui64 DEFAULT_APPLY_STORED_CONSTRAINTS = 0ULL; constexpr bool DEFAULT_TABLE_CONTENT_LOCAL_EXEC = false; +constexpr ui32 DEFAULT_BATCH_LIST_FOLDER_CONCURRENCY = 5; + } // NYql diff --git a/ydb/library/yql/providers/yt/common/yql_names.h b/ydb/library/yql/providers/yt/common/yql_names.h index 1fece7c714..61fadf3495 100644 --- a/ydb/library/yql/providers/yt/common/yql_names.h +++ b/ydb/library/yql/providers/yt/common/yql_names.h @@ -49,6 +49,8 @@ const TStringBuf MrTableRangeName = "MrTableRange"; const TStringBuf MrTableRangeStrictName = "MrTableRangeStrict"; const TStringBuf MrTableConcatName = "MrTableConcat"; const TStringBuf MrFolderName = "MrFolder"; +const TStringBuf MrWalkFoldersName = "MrWalkFolders"; +const TStringBuf MrWalkFoldersImplName = "MrWalkFoldersImpl"; const TStringBuf MrRangeInputListInternal = "MrRangeInputListInternal"; // YT related names diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp index a107bc59f1..508aae116b 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp @@ -427,6 +427,7 @@ TYtConfiguration::TYtConfiguration() REGISTER_SETTING(*this, FileCacheTtl); REGISTER_SETTING(*this, _ImpersonationUser); REGISTER_SETTING(*this, InferSchemaMode).Parser([](const TString& v) { return FromString<EInferSchemaMode>(v); }); + REGISTER_SETTING(*this, BatchListFolderConcurrency).Lower(1); // Upper bound on concurrent batch folder list requests https://yt.yandex-team.ru/docs/api/commands#execute_batch REGISTER_SETTING(*this, JoinCommonUseMapMultiOut); REGISTER_SETTING(*this, _EnableYtPartitioning); REGISTER_SETTING(*this, UseAggPhases); diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index 1aa9c4c40c..9b5de8aa7c 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -101,6 +101,7 @@ struct TYtSettings { NCommon::TConfSetting<TDuration, false> FileCacheTtl; NCommon::TConfSetting<TString, false> _ImpersonationUser; NCommon::TConfSetting<EInferSchemaMode, false> InferSchemaMode; + NCommon::TConfSetting<ui32, false> BatchListFolderConcurrency; // Job runtime NCommon::TConfSetting<TString, true> Pool; diff --git a/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json index 3b7c224fd6..447d044684 100644 --- a/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json +++ b/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json @@ -457,6 +457,31 @@ {"Index": 0, "Name": "First", "Type": "TYtOutputOpBase"}, {"Index": 1, "Name": "Second", "Type": "TYtOutputOpBase"} ] + }, + { + "Name": "TYtWalkFolders", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "MrWalkFolders"}, + "Children": [ + {"Index": 0, "Name": "Prefix", "Type": "TCoAtom"}, + {"Index": 1, "Name": "Attributes", "Type": "TCoAtom"}, + {"Index": 2, "Name": "PickledUserState", "Type": "TExprBase"}, + {"Index": 3, "Name": "UserStateType", "Type": "TExprBase"}, + {"Index": 4, "Name": "PreHandler", "Type": "TExprBase"}, + {"Index": 5, "Name": "ResolveHandler", "Type": "TExprBase"}, + {"Index": 6, "Name": "DiveHandler", "Type": "TExprBase"}, + {"Index": 7, "Name": "PostHandler", "Type": "TExprBase"} + ] + }, + { + "Name": "TYtWalkFoldersImpl", + "Base": "TCallable", + "Match": {"Type": "Callable", "Name": "MrWalkFoldersImpl"}, + "Children": [ + {"Index": 0, "Name": "PickledUserState", "Type": "TExprBase"}, + {"Index": 1, "Name": "UserStateType", "Type": "TExprBase"}, + {"Index": 2, "Name": "ProcessStateKey", "Type": "TCoAtom"} + ] } - ] + ] } diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp index 99d0088803..e5fa5a3742 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp @@ -2,6 +2,7 @@ #include <yt/cpp/mapreduce/interface/error_codes.h> #include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h> namespace NYql::NNative { @@ -170,11 +171,11 @@ IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBat folderResult.SetSuccess(); for (const auto& folder : execCtx->Options_.Folders()) { - YQL_CLOG(INFO, ProviderYt) << "Executing list command with prefix: " << folder.Prefix; const auto cacheKey = std::accumulate(folder.AttrKeys.begin(), folder.AttrKeys.end(), folder.Prefix, [] (TString&& str, const TString& arg) { return str + "&" + arg; }); + YQL_CLOG(INFO, ProviderYt) << "Executing list command with prefix: " << folder.Prefix << " , cacheKey = " << cacheKey; auto maybeCached = MaybeGetFolderFromCache(entry, cacheKey); if (maybeCached) { @@ -217,7 +218,15 @@ IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBat }) ); } - batchList->ExecuteBatch(); + + TExecuteBatchOptions batchOptions; + if (batchRes.size() > 1) { + const size_t concurrency = execCtx->Options_.Config()->BatchListFolderConcurrency + .Get().GetOrElse(DEFAULT_BATCH_LIST_FOLDER_CONCURRENCY); + batchOptions.Concurrency(concurrency); + } + batchList->ExecuteBatch(batchOptions); + try { WaitExceptionOrAll(batchRes).Wait(); for (auto& res : batchRes) { diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h index 645fc2a4b2..c2eb8d1365 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h @@ -14,7 +14,7 @@ TMaybe<TFileLinkPtr> MaybeGetFilePtrFromCache(TTransactionCache::TEntry::TPtr en NYT::TAttributeFilter MakeAttrFilter(const TSet<TString>& attributes, bool isResolvingLink); -IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& path); +IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& prefix, const TString& name, const TVector<TString>& reqAttrKeys); const TTransactionCache::TEntry::TFolderCache::value_type& StoreResInCache(const TTransactionCache::TEntry::TPtr& entry, TVector<IYtGateway::TBatchFolderResult::TFolderItem>&& items, const TString& cacheKey); diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt index a55b537886..0ccf8f9454 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt @@ -89,6 +89,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt index a55b537886..0ccf8f9454 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt @@ -89,6 +89,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt index 885a4f2be9..1ddc770b11 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt @@ -90,6 +90,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt index 885a4f2be9..1ddc770b11 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt @@ -90,6 +90,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt index a55b537886..0ccf8f9454 100644 --- a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt @@ -89,6 +89,7 @@ target_sources(providers-yt-provider PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make index 8c79642442..3c132cc5f6 100644 --- a/ydb/library/yql/providers/yt/provider/ya.make +++ b/ydb/library/yql/providers/yt/provider/ya.make @@ -17,6 +17,7 @@ SRCS( yql_yt_helpers.cpp yql_yt_intent_determination.cpp yql_yt_io_discovery.cpp + yql_yt_io_discovery_walk_folders.cpp yql_yt_join_impl.cpp yql_yt_join_reorder.cpp yql_yt_key.cpp diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp index 0e7249c0dc..9f06686e92 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp @@ -3,11 +3,13 @@ #include "yql_yt_gateway.h" #include "yql_yt_op_settings.h" #include "yql_yt_helpers.h" +#include "yql_yt_io_discovery_walk_folders.h" #include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <ydb/library/yql/providers/yt/common/yql_names.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> #include <ydb/library/yql/providers/common/provider/yql_provider.h> +#include <ydb/library/yql/core/services/yql_eval_expr.h> #include <ydb/library/yql/core/yql_expr_optimize.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <ydb/library/yql/core/yql_expr_constraint.h> @@ -40,8 +42,11 @@ public: } TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - start"; + output = input; if (ctx.Step.IsDone(TExprStep::DiscoveryIO)) { + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, is done"; return TStatus::Ok; } @@ -175,67 +180,46 @@ public: return status; } - status = OptimizeExpr(output, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { - if (auto maybeRead = TMaybeNode<TYtRead>(node)) { - if (!maybeRead.DataSource()) { // Validates provider - return node; - } - auto read = maybeRead.Cast(); - auto ds = read.DataSource(); - if (!EnsureArgsCount(read.Ref(), 5, ctx)) { - return {}; - } - - TYtInputKeys keys; - if (!keys.Parse(read.Arg(2).Ref(), ctx)) { - return {}; - } - - if (keys.IsProcessed()) { - // Already processed - return node; - } - - if (keys.GetKeys().empty()) { - ctx.AddError(TIssue(ctx.GetPosition(read.Arg(2).Pos()), "The list of tables is empty")); - return {}; - } - - if (keys.GetType() != TYtKey::EType::TableScheme) { - auto cluster = TString{ds.Cluster().Value()}; - for (auto& key: keys.GetKeys()) { - auto keyPos = ctx.GetPosition(key.GetNode()->Pos()); - if (key.GetRange()) { - PendingRanges_.emplace(std::make_pair(cluster, *key.GetRange()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TTableRangeResult>())); - } - else if (key.GetFolder()) { - PendingFolders_.emplace(std::make_pair(cluster, *key.GetFolder()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TFolderResult>())); - } - else if (!key.IsAnonymous()) { - if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) { - paths.push_back(IYtGateway::TCanonizeReq() - .Cluster(cluster) - .Path(key.GetPath()) - .Pos(keyPos) - ); - } + status = VisitInputKeys(output, ctx, [this, &ctx, &paths] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr { + if (keys.GetType() != TYtKey::EType::TableScheme) { + const auto cluster = TString{readNode.DataSource().Cluster().Value()}; + for (auto&& key: keys.ExtractKeys()) { + auto keyPos = ctx.GetPosition(key.GetNode()->Pos()); + if (key.GetRange()) { + PendingRanges_.emplace(std::make_pair(cluster, *key.GetRange()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TTableRangeResult>())); + } + else if (key.GetFolder()) { + PendingFolders_.emplace(std::make_pair(cluster, *key.GetFolder()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TFolderResult>())); + } + else if (key.GetWalkFolderArgs()) { + return ctx.ChangeChild(readNode.Ref(), 2, InitializeWalkFolders(std::move(key), cluster, keyPos, ctx)); + } + else if (!key.IsAnonymous()) { + if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) { + paths.push_back(IYtGateway::TCanonizeReq() + .Cluster(cluster) + .Path(key.GetPath()) + .Pos(keyPos) + ); } } } - return node; } + return readNode.Ptr(); + }, /* visitChanges */ true); - return node; - }, ctx, TOptimizeExprSettings(nullptr)); - - if (status.Level != TStatus::Ok) { + if (status.Level == TStatus::Error) { PendingCanonizations_.clear(); PendingFolders_.clear(); PendingRanges_.clear(); + PendingWalkFolders_.clear(); + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level; return status; } - - if (PendingRanges_.empty() && PendingFolders_.empty() && PendingCanonizations_.empty()) { + + if (PendingRanges_.empty() && PendingFolders_.empty() + && PendingCanonizations_.empty() && PendingWalkFolders_.empty()) { + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level; return status; } @@ -333,16 +317,25 @@ public: x.second.second = result; } - AllFuture_ = NThreading::WaitExceptionOrAll(allFutures); + CanonizationRangesFoldersFuture_ = NThreading::WaitExceptionOrAll(allFutures); + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level; return TStatus::Async; } NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { Y_UNUSED(input); - return AllFuture_; + if (auto walkFoldersFuture = MaybeGetWalkFoldersFuture()) { + if (PendingCanonizations_.empty() && PendingRanges_.empty() && PendingFolders_.empty()) { + return walkFoldersFuture.GetRef(); + } + return NThreading::WaitExceptionOrAll(walkFoldersFuture.GetRef(), CanonizationRangesFoldersFuture_); + } + + return CanonizationRangesFoldersFuture_; } TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges start"; output = input; if (!PendingCanonizations_.empty()) { @@ -353,7 +346,7 @@ public: PendingCanonizations_.clear(); PendingRanges_.clear(); CanonizeFuture_ = {}; - AllFuture_ = {}; + CanonizationRangesFoldersFuture_ = {}; return TStatus::Error; } @@ -415,12 +408,12 @@ public: YQL_CLOG(INFO, ProviderYt) << "Got " << items.size() << " items for " << " GetFolder"; TVector<TExprBase> listItems; for (auto& item: items) { - listItems.push_back(BuildFolderListItemExpr(ctx, node->Pos(), item)); + listItems.push_back(BuildFolderListItemExpr(ctx, node->Pos(), item.Path, item.Type, item.Attributes)); } return BuildFolderTableResExpr(ctx, node->Pos(), read.World(), BuildFolderListExpr(ctx, node->Pos(), listItems).Ptr()).Ptr(); } - + if (keys.GetType() != TYtKey::EType::Table) { return node; } @@ -546,17 +539,26 @@ public: PendingRanges_.clear(); PendingFolders_.clear(); CanonizeFuture_ = {}; - AllFuture_ = {}; + CanonizationRangesFoldersFuture_ = {}; + if (!PendingWalkFolders_.empty() && !State_->Types->EvaluationInProgress) { + const auto walkFoldersStatus = RewriteWalkFoldersOnAsyncOrEvalChanges(output, ctx); + if (walkFoldersStatus != TStatus::Ok) { + return walkFoldersStatus; + } + } + + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery DoApplyAsyncChanges - finish"; return status; } void Rewind() final { + YQL_CLOG(INFO, ProviderYt) << "Rewinding YtIODiscovery"; PendingRanges_.clear(); PendingFolders_.clear(); PendingCanonizations_.clear(); CanonizeFuture_ = {}; - AllFuture_ = {}; + CanonizationRangesFoldersFuture_ = {}; } private: @@ -661,83 +663,169 @@ private: res->ChildRef(4) = Build<TCoNameValueTupleList>(ctx, read.Pos()).Add(readSettings).Done().Ptr(); return res; } - - TExprBase BuildFolderListItemExpr(TExprContext& ctx, NYql::TPositionHandle pos, const IYtGateway::TFolderResult::TFolderItem& folderItem) { - return Build<TCoAsStruct>(ctx, pos) - .Add() - .Add<TCoAtom>() - .Value("Path") - .Build() - .Add<TCoString>() - .Literal() - .Value(folderItem.Path) - .Build() - .Build() - .Build() - .Add() - .Add<TCoAtom>() - .Value("Type") - .Build() - .Add<TCoString>() - .Literal() - .Value(folderItem.Type) - .Build() - .Build() - .Build() - .Add() - .Add<TCoAtom>() - .Value("Attributes") - .Build() - .Add<TCoYson>() - .Literal() - .Value(folderItem.Attributes) - .Build() - .Build() + + [[nodiscard]] + TExprNode::TPtr InitializeWalkFolders(TYtKey&& key, const TString& cluster, TPosition pos, TExprContext& ctx) { + auto& args = key.GetWalkFolderArgs().GetRef(); + const auto instanceKey = args.StateKey; + + TWalkFoldersImpl walkFolders {State_->SessionId, cluster, State_->Configuration->Snapshot(), + pos, std::move(args), State_->Gateway}; + YQL_CLOG(INFO, ProviderYt) << "Initialized WalkFolders from " << cluster << ".`" + << args.InitialFolder.Prefix << "`" << " with root attributes cnt: " + << args.InitialFolder.Attributes.size(); + PendingWalkFolders_.emplace(instanceKey, std::move(walkFolders)); + + auto walkFoldersImplNode = Build<TYtWalkFoldersImpl>(ctx, key.GetNode()->Pos()) + .ProcessStateKey() + .Value(args.StateKey) .Build() - .Done(); + .PickledUserState(args.PickledUserState) + .UserStateType(args.UserStateType) + .Build() + .Value() + .Ptr(); + + return walkFoldersImplNode; } + + TStatus RewriteWalkFoldersOnAsyncOrEvalChanges(TExprNode::TPtr& output, TExprContext& ctx) { + Y_ENSURE(!PendingWalkFolders_.empty()); - TCoList BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TVector<TExprBase>& folderItems) { - return Build<TCoList>(ctx, pos) - .ListType<TCoListType>() - .ItemType<TCoStructType>() - .Add<TExprList>() - .Add<TCoAtom>() - .Value("Path") - .Build() - .Add<TCoDataType>() - .Type() - .Value("String") + auto currImplKey = PendingWalkFolders_.begin()->first; + + auto status = VisitInputKeys(output, ctx, [this, &ctx, currImplKey] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr { + if (keys.GetType() == TYtKey::EType::WalkFoldersImpl) { + YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges WalkFoldersImpl handling start"; + + auto parsedKey = keys.ExtractKeys().front(); + if (!parsedKey.GetWalkFolderImplArgs()) { + YQL_CLOG(INFO, ProviderYt) << "Failed to parse WalkFolderImpl args"; + return {}; + } + + const auto instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey; + if (instanceKey != currImplKey) { + return readNode.Ptr(); + } + + auto walkFoldersInstanceIt = PendingWalkFolders_.find(currImplKey); + YQL_ENSURE(!walkFoldersInstanceIt.IsEnd(), "Failed to find walkFoldersInstance with key: " << instanceKey); + + auto& walkFoldersImpl = walkFoldersInstanceIt->second; + + Y_ENSURE(walkFoldersImpl.GetAnyOpFuture().HasValue(), + "Called RewriteWalkFoldersOnAsyncChanges, but impl future is not ready"); + + auto userState = + walkFoldersImpl.GetNextStateExpr(ctx, std::move(parsedKey.GetWalkFolderImplArgs().GetRef())); + if (walkFoldersImpl.IsFinished()) { + YQL_CLOG(INFO, ProviderYt) << "Building result expr for WalkFolders with key: " << instanceKey; + PendingWalkFolders_.erase(currImplKey); + + auto type = Build<TCoStructType>(ctx, readNode.Pos()) + .Add<TExprList>() + .Add<TCoAtom>() + .Value("State") .Build() + .Add(parsedKey.GetWalkFolderImplArgs()->UserStateType) .Build() - .Build() - .Add<TExprList>() - .Add<TCoAtom>() - .Value("Type") - .Build() - .Add<TCoDataType>() - .Type() - .Value("String") + .DoBuild(); + + auto resList = Build<TCoList>(ctx, readNode.Pos()) + .ListType<TCoListType>() + .ItemType<TCoStructType>() + .InitFrom(type) .Build() .Build() - .Build() - .Add<TExprList>() - .Add<TCoAtom>() - .Value("Attributes") + .FreeArgs() + .Add<TCoAsStruct>() + .Add() + .Add<TCoAtom>() + .Value("State") + .Build() + .Add(userState) + .Build() + .Build() .Build() - .Add<TCoDataType>() - .Type() - .Value("Yson") + .DoBuild(); + + return Build<TCoCons>(ctx, readNode.Pos()) + .World(readNode.World()) + .Input<TCoAssumeColumnOrder>() + .Input(resList) + .ColumnOrder<TCoAtomList>() + .Add() + .Value("State") + .Build() .Build() .Build() - .Build() - .Build() - .Build() - .FreeArgs() - .Add(folderItems) - .Build() - .Build() - .Value(); + .Done() + .Ptr(); + } + + if (userState == parsedKey.GetWalkFolderImplArgs()->UserStateExpr) { + return readNode.Ptr(); + } + + YQL_CLOG(DEBUG, ProviderYt) << "State expr ast: " << ConvertToAst(*userState, ctx, {}).Root->ToString(); + + auto walkFoldersImplNode = ctx.ChangeChild(*parsedKey.GetNode(), 0, std::move(userState)); + return ctx.ChangeChild(readNode.Ref(), 2, std::move(walkFoldersImplNode)); + + return readNode.Ptr(); + } + return readNode.Ptr(); + }); + + if (status != TStatus::Error && !PendingWalkFolders_.empty()) { + YQL_CLOG(INFO, ProviderYt) << "Has pending WalkFolders, repeating. "; + status = TStatus::Repeat; + } else { + YQL_CLOG(INFO, ProviderYt) << "All WalkFolders instances are finished. "; + status = TStatus::Ok; + } + + YQL_CLOG(INFO, ProviderYt) << "WalkFolders next status: " << (TStatus::ELevel)status.Level; + return status; + } + + IGraphTransformer::TStatus VisitInputKeys(TExprNode::TPtr& output, + TExprContext& ctx, std::function<TExprNode::TPtr(TYtRead node, TYtInputKeys&&)> processKeys, bool visitChanges = false) { + TOptimizeExprSettings settings(nullptr); + settings.VisitChanges = visitChanges; + + const auto status = OptimizeExpr(output, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr { + if (auto maybeRead = TMaybeNode<TYtRead>(node)) { + if (!maybeRead.DataSource()) { // Validates provider + return node; + } + auto read = maybeRead.Cast(); + auto ds = read.DataSource(); + if (!EnsureArgsCount(read.Ref(), 5, ctx)) { + return {}; + } + + TYtInputKeys keys; + auto& keysNode = read.Arg(2).Ref(); + if (!keys.Parse(keysNode, ctx)) { + return {}; + } + + if (keys.IsProcessed()) { + // Already processed + return node; + } + + if (keys.GetKeys().empty()) { + ctx.AddError(TIssue(ctx.GetPosition(read.Arg(2).Pos()), "The list of tables is empty")); + return {}; + } + return processKeys(read, std::move(keys)); + } + return node; + }, ctx, settings); + return status; } TCoCons BuildFolderTableResExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TExprBase& world, const TExprNodePtr& folderList) { @@ -771,14 +859,22 @@ private: return res; } + TMaybe<NThreading::TFuture<void>> MaybeGetWalkFoldersFuture() const { + // inflight 1 + return !PendingWalkFolders_.empty() + ? MakeMaybe(PendingWalkFolders_.begin()->second.GetAnyOpFuture()) + : Nothing(); + } + private: TYtState::TPtr State_; THashMap<std::pair<TString, TYtKey::TRange>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TTableRangeResult>>> PendingRanges_; THashMap<std::pair<TString, TYtKey::TFolderList>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TFolderResult>>> PendingFolders_; THashMap<std::pair<TString, TString>, size_t> PendingCanonizations_; // cluster, original table path -> positions in canon result + THashMap<ui64, TWalkFoldersImpl> PendingWalkFolders_; NThreading::TFuture<IYtGateway::TCanonizePathsResult> CanonizeFuture_; - NThreading::TFuture<void> AllFuture_; + NThreading::TFuture<void> CanonizationRangesFoldersFuture_; THashMap<TString, TString> FolderFileToAlias_; }; diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp new file mode 100644 index 0000000000..c907d630a4 --- /dev/null +++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp @@ -0,0 +1,582 @@ +#include "yql_yt_io_discovery_walk_folders.h" + +#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h> +#include <ydb/library/yql/utils/log/log.h> + +#include <util/string/split.h> + +namespace NYql { +using namespace NNodes; + + +NNodes::TCoStructType +BuildFolderItemStructType(TExprContext& ctx, NYql::TPositionHandle pos) { + return Build<TCoStructType>(ctx, pos) + .Add<TExprList>() + .Add<TCoAtom>() + .Value("Path") + .Build() + .Add<TCoDataType>() + .Type() + .Value("String") + .Build() + .Build() + .Build() + .Add<TExprList>() + .Add<TCoAtom>() + .Value("Type") + .Build() + .Add<TCoDataType>() + .Type() + .Value("String") + .Build() + .Build() + .Build() + .Add<TExprList>() + .Add<TCoAtom>() + .Value("Attributes") + .Build() + .Add<TCoDataType>() + .Type() + .Value("Yson") + .Build() + .Build() + .Build() + .Done(); +} + +TCoList +BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TVector<NNodes::TExprBase>& folderItems) { + return Build<TCoList>(ctx, pos) + .ListType<TCoListType>() + .ItemType<TCoStructType>() + .InitFrom(BuildFolderItemStructType(ctx, pos)) + .Build() + .Build() + .FreeArgs() + .Add(folderItems) + .Build() + .Build() + .Value(); +} + +TExprBase +BuildFolderListItemExpr(TExprContext &ctx, NYql::TPositionHandle pos, + const TString &path, const TString &type, + const TString &attributesYson) { + return Build<TCoAsStruct>(ctx, pos) + .Add() + .Add<TCoAtom>() + .Value("Path") + .Build() + .Add<TCoString>() + .Literal() + .Value(path) + .Build() + .Build() + .Build() + .Add() + .Add<TCoAtom>() + .Value("Type") + .Build() + .Add<TCoString>() + .Literal() + .Value(type) + .Build() + .Build() + .Build() + .Add() + .Add<TCoAtom>() + .Value("Attributes") + .Build() + .Add<TCoYson>() + .Literal() + .Value(attributesYson) + .Build() + .Build() + .Build() + .Done(); +} + +TWalkFoldersImpl::TWalkFoldersImpl(const TString& sessionId, const TString& cluster, TYtSettings::TConstPtr config, + TPosition pos, TYtKey::TWalkFoldersArgs&& args, const IYtGateway::TPtr gateway): + Pos_(pos), SessionId_(sessionId), Cluster_(cluster), Config_(config), Gateway_(gateway) { + + PreHandler_ = args.PreHandler->IsCallable("Void") ? Nothing() : MakeMaybe(args.PreHandler); + ResolveHandler_ = args.ResolveHandler; + DiveHandler_ = args.DiveHandler; + PostHandler_ = args.PostHandler->IsCallable("Void") ? Nothing() : MakeMaybe(args.PostHandler); + + ProcessFoldersQueue_.emplace_back(TFolderQueueItem { + .Folder = args.InitialFolder, + }); + IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs folder { + std::move(args.InitialFolder.Prefix), + TSet<TString>(args.InitialFolder.Attributes.begin(), args.InitialFolder.Attributes.end()) + }; + DoFolderListOperation({folder}); +} + +TExprNode::TPtr TWalkFoldersImpl::GetNextStateExpr(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args) { + YQL_CLOG(INFO, ProviderYt) << "Current processing state: " << int(ProcessingState_); + switch (ProcessingState_) { + case WaitingListFolderOp: { + return AfterListFolderOp(ctx, args); + } + case PreHandling: { + return PreHandleVisitedInSingleFolder(ctx, args, ProcessFoldersQueue_.front()); + } + case ResolveHandling: { + return ResolveHandleInSingleFolder(ctx, args, ProcessFoldersQueue_.front()); + } + case AfterResolveHandling: { + return AfterResolveHandle(ctx, args, ProcessFoldersQueue_.front()); + } + case WaitingResolveLinkOp: { + return HandleAfterResolveFuture(ctx, args, ProcessFoldersQueue_.front()); + } + case DiveHandling: { + return DiveHandleInSingleFolder(ctx, args, ProcessFoldersQueue_.front()); + } + case AfterDiveHandling: { + return AfterDiveHandle(ctx, args, ProcessFoldersQueue_.front()); + } + case PostHandling: { + return PostHandleVisitedInSingleFolder(ctx, args, ProcessFoldersQueue_.front()); + } + case FinishingHandling: { + return BuildFinishedState(ctx, args); + } + case FinishedHandling: { + } + } + return args.UserStateExpr; +} + +void TWalkFoldersImpl::DoFolderListOperation(TVector<IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs>&& folders) { + YQL_CLOG(INFO, ProviderYt) << "Sending folder list batch with " << folders.size() << " items"; + auto options = IYtGateway::TBatchFolderOptions(SessionId_) + .Pos(Pos_) + .Cluster(Cluster_) + .Config(Config_) + .Folders(folders); + BatchFolderListFuture_ = Gateway_->GetFolders(std::move(options)); +} + +TExprNode::TPtr TWalkFoldersImpl::EvaluateNextUserStateExpr(TExprContext& ctx, const TExprNode::TPtr& userStateType, const TExprNode::TPtr userStateExpr, std::function<TExprNode::TPtr(const NNodes::TExprBase&)> nextStateFunc) { + + const auto userStateUnpickled = Build<TCoUnpickle>(ctx, PosHandle_) + .Type(userStateType) + .Buffer(userStateExpr) + .Build(); + + const auto nextUserStatePickled = Build<TCoPickle>(ctx, PosHandle_) + .Value(nextStateFunc(userStateUnpickled.Value())) + .Build() + .Value() + .Ptr(); + + ctx.Step.Repeat(TExprStep::ExprEval); + + YQL_CLOG(TRACE, ProviderYt) << "WalkFolders - next evaluate ast: " << ConvertToAst(*nextUserStatePickled, ctx, {}).Root->ToString(); + + return ctx.Builder(PosHandle_) + .Callable("EvaluateExpr") + .Add(0, nextUserStatePickled) + .Seal() + .Build(); +} + +TExprNode::TPtr TWalkFoldersImpl::AfterListFolderOp(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args) { + if (!BatchFolderListFuture_) { + YQL_CLOG(INFO, ProviderYt) << "Folder queue is empty, finishing WalkFolders with key: " << args.StateKey; + ProcessingState_ = FinishingHandling; + return GetNextStateExpr(ctx, args); + } else { + if (!BatchFolderListFuture_->HasValue()) { + YQL_CLOG(INFO, ProviderYt) << "Batch list future is not ready"; + return args.UserStateExpr; + } + + Y_ENSURE(!ProcessFoldersQueue_.empty(), "Got future result for Yt List but no folder in queue"); + auto folderListVal = BatchFolderListFuture_->ExtractValue(); + if (folderListVal.Success()) { + auto& folder = ProcessFoldersQueue_.front(); + YQL_CLOG(INFO, ProviderYt) << "Got " << folderListVal.Items.size() << " results for list op at `" << folder.Folder.Prefix << "`"; + folder.ItemsToPreHandle = std::move(folderListVal.Items); + folder.PreHandleItemsFetched = true; + ProcessingState_ = PreHandling; + } else { + folderListVal.ReportIssues(ctx.IssueManager); + } + + BatchFolderListFuture_ = Nothing(); + } + return PreHandleVisitedInSingleFolder(ctx, args, ProcessFoldersQueue_.front()); +} + +TExprNode::TPtr TWalkFoldersImpl::PreHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { + YQL_CLOG(INFO, ProviderYt) << "Processing preHandler at " << folder.Folder.Prefix + << " for WalkFolders with key: " << args.StateKey; + + if (!folder.PreHandleItemsFetched) { + YQL_CLOG(INFO, ProviderYt) << "Waiting for folder list: `" << folder.Folder.Prefix << "`"; + ProcessingState_ = WaitingListFolderOp; + return GetNextStateExpr(ctx, args); + } + + if (folder.ItemsToPreHandle.empty()) { + YQL_CLOG(INFO, ProviderYt) << "Items to preHandle are empty, skipping " << folder.Folder.Prefix; + ProcessFoldersQueue_.pop_front(); + ProcessingState_ = WaitingListFolderOp; + return GetNextStateExpr(ctx, args); + } + + TVector<TExprBase> folderListItems; + for (auto&& item : folder.ItemsToPreHandle) { + if (PreHandler_) { + folderListItems.push_back( + BuildFolderListItemExpr(ctx, PosHandle_, item.Path,item.Type, + NYT::NodeToYsonString(item.Attributes))); + } + + if (item.Type == "link") { + folder.LinksToResolveHandle.emplace_back(std::move(item)); + } else if (item.Type == "map_node") { + folder.ItemsToDiveHandle.emplace_back(std::move(item)); + } else { + folder.ItemsToPostHandle.emplace_back(std::move(item)); + } + } + + if (!PreHandler_) { + YQL_CLOG(INFO, ProviderYt) << "No preHandler defined, skipping for WalkFolders with key: " << args.StateKey; + ProcessingState_ = ResolveHandling; + return GetNextStateExpr(ctx, args); + } + + const auto folderListExpr = BuildFolderListExpr(ctx, PosHandle_, folderListItems); + + const auto makeNextUserState = [&] (const TExprBase& userStateUnpickled) { + return Build<TCoApply>(ctx, PosHandle_) + .Callable(PreHandler_.GetRef()) + .FreeArgs() + .Add(folderListExpr) + .Add(userStateUnpickled) + .Add<TCoInt64>() + .Literal() + .Value(ToString(folder.Level)) + .Build() + .Build() + .Build() + .Build() + .Value() + .Ptr(); + }; + + ProcessingState_ = ResolveHandling; + return EvaluateNextUserStateExpr(ctx, args.UserStateType, args.UserStateExpr, makeNextUserState); +} + +TExprNode::TPtr TWalkFoldersImpl::ResolveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { + YQL_CLOG(INFO, ProviderYt) << "Processing resolveHandler at " << folder.Folder.Prefix + << "for WalkFolders with key: " << args.StateKey; + ProcessingState_ = AfterResolveHandling; + return BuildDiveOrResolveHandlerEval(ctx, args, ResolveHandler_.GetRef(), folder.LinksToResolveHandle, folder.Folder.Attributes, folder.Level); +} + +TExprNode::TPtr TWalkFoldersImpl::BuildDiveOrResolveHandlerEval(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TExprNode::TPtr& handler, + const TVector<IYtGateway::TBatchFolderResult::TFolderItem>& res, const TVector<TString>& attributes, ui64 level) { + using namespace NNodes; + + TVector<TExprBase> items; + items.reserve(res.size()); + for (auto& link : res) { + auto itemsExpr = + BuildFolderListItemExpr(ctx, PosHandle_, + link.Path, link.Type, NYT::NodeToYsonString(link.Attributes)); + items.push_back(itemsExpr); + } + const auto itemsNode = BuildFolderListExpr(ctx, PosHandle_, items); + + TVector<TExprBase> attributeExprs; + for (auto& attr : attributes) { + const auto attributeExpr = Build<TCoString>(ctx, PosHandle_) + .Literal() + .Value(attr) + .Build() + .Build() + .Value(); + attributeExprs.push_back(attributeExpr); + } + + const auto userStateUnpickled = Build<TCoUnpickle>(ctx, PosHandle_) + .Type(args.UserStateType) + .Buffer(args.UserStateExpr) + .DoBuild(); + + const auto handlerResult = Build<TCoApply>(ctx, PosHandle_) + .Callable(handler) + .FreeArgs() + .Add(itemsNode) + .Add(userStateUnpickled) + .Add<TCoAsList>() + .Add(attributeExprs) + .Build() + .Add<TCoInt64>() + .Literal() + .Value(ToString(level)) + .Build() + .Build() + .Build() + .Build() + .Value() + .Ptr(); + + auto resolveHandlerResPickled = ctx.Builder(PosHandle_) + .Callable("StaticMap") + .Add(0, handlerResult) + .Lambda(1) + .Param("item") + .Callable("Pickle") + .Arg(0, "item") + .Seal() + .Seal() + .Seal() + .Build(); + + ctx.Step.Repeat(TExprStep::ExprEval); + return ctx.Builder(PosHandle_) + .Callable("EvaluateExpr") + .Add(0, resolveHandlerResPickled) + .Seal() + .Build(); +} + +void ParseNameAttributesPickledList(TStringBuf pickledTupleList, std::function<void(TString&&, TSet<TString>)> handleParsedNameAndAttrs) { + using namespace NKikimr::NMiniKQL; + + TScopedAlloc alloc(__LOCATION__); + TTypeEnvironment env(alloc); + TMemoryUsageInfo memInfo("Yt WalkFolders"); + THolderFactory holderFactory(alloc.Ref(), memInfo); + + TSmallVec<TType*> nodeToResolveWithAttrListTypes; + auto stringType = TDataType::Create(NUdf::TDataType<char*>::Id, env); + nodeToResolveWithAttrListTypes.push_back(stringType); + nodeToResolveWithAttrListTypes.push_back(TListType::Create(stringType, env)); + + auto nodeToResolveTuple = TTupleType::Create(2, nodeToResolveWithAttrListTypes.data(), env); + TValuePacker packer(false, TListType::Create(nodeToResolveTuple, env)); + auto parsedList = packer.Unpack(pickledTupleList, holderFactory); + + YQL_CLOG(INFO, ProviderYt) << "Parsing list with length: " << parsedList.GetListLength(); + + for (size_t i = 0; i < parsedList.GetListLength(); ++i) { + const auto requestedTuple = parsedList.GetElement(i); + const auto nameEl = requestedTuple.GetElement(0); + const auto name = nameEl.AsStringRef(); + YQL_CLOG(INFO, ProviderYt) << "Parsed dive or resolve item name: " << name; + + auto requestedAttrsVal = requestedTuple.GetElement(1); + TSet<TString> attrs; + for (size_t j = 0; j < requestedAttrsVal.GetListLength(); ++j) { + const auto attrEl = requestedAttrsVal.GetElement(j); + YQL_CLOG(INFO, ProviderYt) << "Parsed requested attribute: " << attrEl.AsStringRef(); + attrs.insert(TString(attrEl.AsStringRef())); + } + handleParsedNameAndAttrs(TString(name), attrs); + } +} + +TExprNode::TPtr TWalkFoldersImpl::AfterResolveHandle(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { + EnsureTupleSize(*args.UserStateExpr, 2, ctx); + YQL_CLOG(INFO, ProviderYt) << "After resolveHandler EvaluateExpr"; + + TCoString pickledLinksToResolve(args.UserStateExpr->Child(0)); + THashMap<TString, TSet<TString>> nameAndRequestedAttrs; + ParseNameAttributesPickledList(pickledLinksToResolve.Literal().StringValue(), + [&nameAndRequestedAttrs] (TString name, TSet<TString> attrs) { + nameAndRequestedAttrs[name] = std::move(attrs); + }); + + TVector<IYtGateway::TResolveOptions::TItemWithReqAttrs> links; + links.reserve(nameAndRequestedAttrs.size()); + for (auto&& linkToResolve : folder.LinksToResolveHandle) { + auto it = nameAndRequestedAttrs.find(linkToResolve.Path); + if (it == nameAndRequestedAttrs.end()) { + continue; + } + + IYtGateway::TResolveOptions::TItemWithReqAttrs link { + .Item = std::move(linkToResolve), + .AttrKeys = std::move(it->second), + }; + links.emplace_back(std::move(link)); + } + + if (links.empty()) { + YQL_CLOG(INFO, ProviderYt) << "Links to visit are empty"; + ProcessingState_ = DiveHandling; + return GetNextStateExpr(ctx, {.UserStateExpr = args.UserStateExpr->Child(1), .UserStateType = args.UserStateType, .StateKey = args.StateKey}); + } + + ProcessingState_ = WaitingResolveLinkOp; + auto options = IYtGateway::TResolveOptions(SessionId_) + .Pos(Pos_) + .Cluster(Cluster_) + .Config(Config_) + .Items(links); + BatchResolveFuture_ = Gateway_->ResolveLinks(std::move(options)); + + return args.UserStateExpr->Child(1); +} + +TExprNode::TPtr TWalkFoldersImpl::HandleAfterResolveFuture(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { + YQL_CLOG(INFO, ProviderYt) << "After resolve future result"; + + if (!BatchResolveFuture_) { + YQL_CLOG(WARN, ProviderYt) << "Resolve future not set"; + return nullptr; + } + if (!BatchResolveFuture_->HasValue() && !BatchResolveFuture_->HasException()) { + YQL_CLOG(INFO, ProviderYt) << "Batch resolve future is not ready"; + return args.UserStateExpr; + } + + auto res = BatchResolveFuture_->ExtractValue(); + BatchResolveFuture_ = Nothing(); + YQL_CLOG(INFO, ProviderYt) << "Added items to handle after batch resolve future completion"; + + for (auto&& node : res.Items) { + if (node.Type == "map_node") { + folder.ItemsToDiveHandle.emplace_back(std::move(node)); + } else { + folder.ItemsToPostHandle.emplace_back(std::move(node)); + } + } + + ProcessingState_ = DiveHandling; + return GetNextStateExpr(ctx, args); +} + +TExprNode::TPtr TWalkFoldersImpl::DiveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { + YQL_CLOG(INFO, ProviderYt) << "Processing diveHandler at " << folder.Folder.Prefix + << " for WalkFolders with key: " << args.StateKey; + ProcessingState_ = AfterDiveHandling; + return BuildDiveOrResolveHandlerEval(ctx, args, DiveHandler_.GetRef(), folder.ItemsToDiveHandle, folder.Folder.Attributes, folder.Level); +} + +TExprNode::TPtr TWalkFoldersImpl::AfterDiveHandle(TExprContext& ctx, TYtKey::TWalkFoldersImplArgs args, TFolderQueueItem& folder) { + using namespace NKikimr::NMiniKQL; + + EnsureTupleSize(*args.UserStateExpr, 2, ctx); + YQL_CLOG(INFO, ProviderYt) << "After diveHandler EvaluateExpr"; + + TVector<IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs> diveItems; + TCoString pickledLinksToResolve(args.UserStateExpr->Child(0)); + THashMap<TString, TSet<TString>> nameAndRequestedAttrs; + ParseNameAttributesPickledList(pickledLinksToResolve.Literal().StringValue(), + [&queue=ProcessFoldersQueue_, &diveItems, nextLevel = folder.Level + 1] (TString path, TSet<TString> attrs) { + diveItems.push_back({.Prefix = path, .AttrKeys = attrs}); + queue.push_back({ + .Folder = {.Prefix = std::move(path), .Attributes = TVector<TString>(attrs.begin(), attrs.end())}, + .Level = nextLevel, + }); + }); + + folder.ItemsToPostHandle.insert(folder.ItemsToPostHandle.end(), + std::make_move_iterator(folder.ItemsToDiveHandle.begin()), + std::make_move_iterator(folder.ItemsToDiveHandle.end())); + folder.ItemsToDiveHandle.clear(); + + args.UserStateExpr = args.UserStateExpr->Child(1); + ProcessingState_ = PostHandling; + + if (diveItems.empty()) { + YQL_CLOG(INFO, ProviderYt) << "Nodes to dive are empty"; + return GetNextStateExpr(ctx, args); + } + + auto options = IYtGateway::TBatchFolderOptions(SessionId_) + .Pos(Pos_) + .Cluster(Cluster_) + .Config(Config_) + .Folders(diveItems); + Y_ENSURE(!BatchFolderListFuture_, "Single inflight batch folder request allowed"); + BatchFolderListFuture_ = Gateway_->GetFolders(std::move(options)); + + return GetNextStateExpr(ctx, args); +} + +TExprNode::TPtr TWalkFoldersImpl::PostHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { + if (!PostHandler_) { + YQL_CLOG(INFO, ProviderYt) << "No postHandler defined, skipping for WalkFolders with key: " << args.StateKey; + ProcessingState_ = WaitingListFolderOp; + return args.UserStateExpr; + } + + YQL_CLOG(INFO, ProviderYt) << "Processing postHandler at " << folder.Folder.Prefix + << " for WalkFolders with key: " << args.StateKey; + + TVector<TExprBase> folderListItems; + for (auto&& item : folder.ItemsToPostHandle) { + folderListItems.push_back( + BuildFolderListItemExpr(ctx, + PosHandle_, + item.Path, + item.Type, + NYT::NodeToYsonString(item.Attributes))); + + } + + const auto folderListExpr = BuildFolderListExpr(ctx, PosHandle_, folderListItems); + + const auto makeNextUserState = [&] (const TExprBase& userStateUnpickled) { + return Build<TCoApply>(ctx, PosHandle_) + .Callable(PostHandler_.GetRef()) + .FreeArgs() + .Add(folderListExpr) + .Add(userStateUnpickled) + .Add<TCoInt64>() + .Literal() + .Value(ToString(folder.Level)) + .Build() + .Build() + .Build() + .Build() + .Value() + .Ptr(); + }; + + ProcessingState_ = WaitingListFolderOp; + + ProcessFoldersQueue_.pop_front(); + return EvaluateNextUserStateExpr(ctx, args.UserStateType, args.UserStateExpr, makeNextUserState); +} + + +TExprNode::TPtr TWalkFoldersImpl::BuildFinishedState(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args) { + // TODO: Dump large user state to file + // const auto dataLen = args.UserStateExpr->IsCallable("String") + // ? TCoString(args.UserStateExpr).Literal().StringValue().Size() + // : 0; + + const auto userStateUnpickled = Build<TCoUnpickle>(ctx, PosHandle_) + .Type(args.UserStateType) + .Buffer(args.UserStateExpr) + .DoBuild() + .Ptr(); + + ctx.Step.Repeat(TExprStep::ExprEval); + ProcessingState_ = FinishedHandling; + + return ctx.Builder(PosHandle_) + .Callable("EvaluateExpr") + .Add(0, userStateUnpickled) + .Seal() + .Build(); +} +} // namespace NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h new file mode 100644 index 0000000000..281d3a0b18 --- /dev/null +++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h @@ -0,0 +1,126 @@ +#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_key.h> + +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/threading/future/core/future.h> + +namespace NYql { +NNodes::TExprBase +BuildFolderListItemExpr(TExprContext &ctx, NYql::TPositionHandle pos, + const TString &path, const TString &type, + const TString &attributesYson); + +NNodes::TCoList +BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, + const TVector<NNodes::TExprBase>& folderItems); + +NNodes::TCoStructType +BuildFolderItemStructType(TExprContext& ctx, NYql::TPositionHandle pos); + +class TWalkFoldersImpl { +public: + TWalkFoldersImpl(const TString& sessionId, const TString& cluster, TYtSettings::TConstPtr config, + TPosition pos, TYtKey::TWalkFoldersArgs&& args, const IYtGateway::TPtr gateway); + + TExprNode::TPtr GetNextStateExpr(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args); + + enum EProcessingState { + WaitingListFolderOp, + PreHandling, + ResolveHandling, + AfterResolveHandling, + WaitingResolveLinkOp, + DiveHandling, + AfterDiveHandling, + PostHandling, + FinishingHandling, + FinishedHandling + }; + + EProcessingState GetProcessingState() const { + return ProcessingState_; + } + + bool IsFinished() const { + return ProcessingState_ == FinishedHandling; + } + + NThreading::TFuture<void> GetAnyOpFuture() const { + TVector<NThreading::TFuture<void>> futures; + if (BatchFolderListFuture_ && BatchFolderListFuture_->Initialized()) { + futures.push_back(BatchFolderListFuture_->IgnoreResult()); + } + if (BatchResolveFuture_&& BatchResolveFuture_->Initialized()) { + futures.push_back(BatchResolveFuture_->IgnoreResult()); + } + return NThreading::WaitAny(futures); + } + + TWalkFoldersImpl& operator=(const TWalkFoldersImpl&) = delete; + TWalkFoldersImpl(const TWalkFoldersImpl&) = delete; + + TWalkFoldersImpl(TWalkFoldersImpl&&) = default; + TWalkFoldersImpl& operator=(TWalkFoldersImpl&&) = default; + +private: + static constexpr size_t LARGE_USER_STATE = 8192; + + TPosition Pos_; + TPositionHandle PosHandle_; + + TMaybe<TExprNode::TPtr> PreHandler_; + TMaybe<TExprNode::TPtr> ResolveHandler_; + TMaybe<TExprNode::TPtr> DiveHandler_; + TMaybe<TExprNode::TPtr> PostHandler_; + + struct TFolderQueueItem { + TYtKey::TFolderList Folder; + + bool PreHandleItemsFetched = false; + + TVector<IYtGateway::TBatchFolderResult::TFolderItem> ItemsToPreHandle; + TVector<IYtGateway::TBatchFolderResult::TFolderItem> LinksToResolveHandle; + TVector<IYtGateway::TBatchFolderResult::TFolderItem> ItemsToDiveHandle; + TVector<IYtGateway::TBatchFolderResult::TFolderItem> ItemsToPostHandle; + + ui64 Level = 0; + }; + TDeque<TFolderQueueItem> ProcessFoldersQueue_; + + EProcessingState ProcessingState_ = WaitingListFolderOp; + + TString SessionId_; + TString Cluster_; + TYtSettings::TConstPtr Config_; + + IYtGateway::TPtr Gateway_; + + TMaybe<NThreading::TFuture<IYtGateway::TBatchFolderResult>> BatchFolderListFuture_; + TMaybe<NThreading::TFuture<IYtGateway::TBatchFolderResult>> BatchResolveFuture_; + + void DoFolderListOperation(TVector<IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs>&& folders); + + TExprNode::TPtr EvaluateNextUserStateExpr(TExprContext& ctx, const TExprNode::TPtr& userStateType, const TExprNode::TPtr userStateExpr, std::function<TExprNode::TPtr(const NNodes::TExprBase&)> nextStateFunc); + + TExprNode::TPtr AfterListFolderOp(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args); + + TExprNode::TPtr PreHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder); + + TExprNode::TPtr ResolveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder); + + TExprNode::TPtr BuildDiveOrResolveHandlerEval(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TExprNode::TPtr& handler, + const TVector<IYtGateway::TBatchFolderResult::TFolderItem>& res, const TVector<TString>& attributes, ui64 level); + + TExprNode::TPtr AfterResolveHandle(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder); + + TExprNode::TPtr HandleAfterResolveFuture(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder); + + TExprNode::TPtr DiveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder); + + TExprNode::TPtr AfterDiveHandle(TExprContext& ctx, TYtKey::TWalkFoldersImplArgs args, TFolderQueueItem& folder); + + TExprNode::TPtr PostHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder); + + TExprNode::TPtr BuildFinishedState(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args); +}; +} // namespace NYql diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp index a60603cb43..ab78d45824 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp @@ -1,6 +1,8 @@ #include "yql_yt_key.h" +#include <library/cpp/yson/node/node_io.h> #include <ydb/library/yql/providers/yt/common/yql_names.h> +#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> #include <util/string/builder.h> @@ -16,6 +18,8 @@ THashSet<TStringBuf> EXT_KEY_CALLABLES = { TStringBuf("Key"), TStringBuf("TempTable"), MrFolderName, + MrWalkFoldersName, + MrWalkFoldersImplName }; THashSet<TStringBuf> KEY_CALLABLES = { @@ -26,6 +30,7 @@ THashSet<TStringBuf> KEY_CALLABLES = { } bool TYtKey::Parse(const TExprNode& key, TExprContext& ctx) { + using namespace NNodes; if (!key.IsCallable(EXT_KEY_CALLABLES)) { ctx.AddError(TIssue(ctx.GetPosition(key.Pos()), TStringBuf("Expected key"))); return false; @@ -63,6 +68,47 @@ bool TYtKey::Parse(const TExprNode& key, TExprContext& ctx) { " and may have second tag - view"))); return false; } + else if (const auto maybeWalkFolders = TMaybeNode<TYtWalkFolders>(&key)) { + Type = EType::WalkFolders; + const auto walkFolders = maybeWalkFolders.Cast(); + + TFolderList initialListFolder; + initialListFolder.Prefix = walkFolders.Prefix(); + Split(TString(walkFolders.Attributes().StringValue()), ";", initialListFolder.Attributes); + + WalkFolderArgs = MakeMaybe(TWalkFoldersArgs{ + .InitialFolder = std::move(initialListFolder), + .PickledUserState = walkFolders.PickledUserState().Ptr(), + .UserStateType = walkFolders.UserStateType().Ptr(), + .PreHandler = walkFolders.PreHandler().Ptr(), + .ResolveHandler = walkFolders.ResolveHandler().Ptr(), + .DiveHandler = walkFolders.DiveHandler().Ptr(), + .PostHandler = walkFolders.PostHandler().Ptr(), + .StateKey = walkFolders.Ref().UniqueId(), + }); + + return true; + } + else if (const auto maybeWalkFolders = TMaybeNode<TYtWalkFoldersImpl>(&key)) { + Type = EType::WalkFoldersImpl; + const auto walkFolders = maybeWalkFolders.Cast(); + + ui64 stateKey; + if (!TryFromString(walkFolders.ProcessStateKey().StringValue(), stateKey)) { + ctx.AddError(TIssue(ctx.GetPosition(key.Pos()), + TStringBuilder() << MrWalkFoldersImplName << ": incorrect format of state map key")); + return false; + } + + WalkFolderImplArgs = MakeMaybe(TWalkFoldersImplArgs{ + .UserStateExpr = walkFolders.PickledUserState().Ptr(), + .UserStateType = walkFolders.UserStateType().Ptr(), + .StateKey = stateKey, + }); + + Type = EType::WalkFoldersImpl; + return true; + } auto tagName = key.Child(0)->Child(0)->Content(); if (tagName == TStringBuf("table")) { diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_key.h b/ydb/library/yql/providers/yt/provider/yql_yt_key.h index 14c17479ad..f1381900d7 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_key.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_key.h @@ -19,6 +19,8 @@ public: Table, TableScheme, Folder, + WalkFolders, + WalkFoldersImpl, }; struct TRange { @@ -44,6 +46,27 @@ public: && left.Attributes == right.Attributes; } }; + + struct TWalkFoldersArgs { + TFolderList InitialFolder; + + TExprNode::TPtr PickledUserState; + TExprNode::TPtr UserStateType; + + TExprNode::TPtr PreHandler; + TExprNode::TPtr ResolveHandler; + TExprNode::TPtr DiveHandler; + TExprNode::TPtr PostHandler; + + ui64 StateKey; + }; + + struct TWalkFoldersImplArgs { + TExprNode::TPtr UserStateExpr; + TExprNode::TPtr UserStateType; + + ui64 StateKey; + }; public: TYtKey() { @@ -81,6 +104,16 @@ public: return Folder; } + TMaybe<TWalkFoldersArgs>& GetWalkFolderArgs() { + YQL_ENSURE(Type != EType::Undefined); + return WalkFolderArgs; + } + + TMaybe<TWalkFoldersImplArgs>& GetWalkFolderImplArgs() { + YQL_ENSURE(Type != EType::Undefined); + return WalkFolderImplArgs; + } + bool Parse(const TExprNode& key, TExprContext& ctx); private: @@ -91,6 +124,8 @@ private: bool Anonymous = false; TMaybe<TRange> Range; TMaybe<TFolderList> Folder; + TMaybe<TWalkFoldersArgs> WalkFolderArgs; + TMaybe<TWalkFoldersImplArgs> WalkFolderImplArgs; }; class TYtInputKeys { @@ -106,6 +141,10 @@ public: return Keys; } + TVector<TYtKey>&& ExtractKeys() { + return std::move(Keys); + } + bool IsProcessed() const { return HasNonKeys; } diff --git a/ydb/library/yql/sql/v1/context.h b/ydb/library/yql/sql/v1/context.h index a1e531b587..5120d49f09 100644 --- a/ydb/library/yql/sql/v1/context.h +++ b/ydb/library/yql/sql/v1/context.h @@ -235,6 +235,7 @@ namespace NSQLTranslationV1 { TMap<TString, TNodePtr> UniversalAliases; THashSet<TString> Exports; THashMap<TString, TString> ImportModuleAliases; + THashMap<TString, TString> RequiredModules; TMap<TString, TString> SimpleUdfs; NSQLTranslation::TIncrementMonCounterFunction IncrementMonCounterFunction; TScopedStatePtr Scoped; diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp index 8d2147ca98..c0414b919e 100644 --- a/ydb/library/yql/sql/v1/query.cpp +++ b/ydb/library/yql/sql/v1/query.cpp @@ -241,7 +241,7 @@ public: } TCiString func(Func); - if (func != "object") { + if (func != "object" && func != "walkfolders") { for (auto& arg: Args) { if (arg.Expr->GetLabel()) { ctx.Error(Pos) << "Named arguments are not supported for table function " << to_upper(Func); @@ -480,6 +480,115 @@ public: folder = L(folder, Args.size() > 1 ? Args[1].Id.Build() : BuildQuotedAtom(Pos, "")); return folder; } + else if (func == "walkfolders") { + const size_t minPositionalArgs = 1; + const size_t maxPositionalArgs = 2; + + size_t positionalArgsCnt = 0; + for (const auto& arg : Args) { + if (!arg.Expr->GetLabel()) { + positionalArgsCnt++; + } else { + break; + } + } + if (positionalArgsCnt < minPositionalArgs || positionalArgsCnt > maxPositionalArgs) { + ctx.Error(Pos) << Func << " requires from " << minPositionalArgs + << " to " << maxPositionalArgs + << " positional arguments, but got: " << positionalArgsCnt; + return nullptr; + } + + constexpr auto walkFoldersModuleName = "walk_folders_module"; + ctx.RequiredModules.emplace(walkFoldersModuleName, "/lib/yql/walk_folders.yql"); + + auto& rootFolderArg = Args[0]; + if (rootFolderArg.HasAt) { + ctx.Error(Pos) << "Temporary tables are not supported here"; + return nullptr; + } + if (!rootFolderArg.View.empty()) { + ctx.Error(Pos) << Func << " doesn't supports views"; + return nullptr; + } + ExtractTableName(ctx, rootFolderArg); + + const auto initState = + positionalArgsCnt > 1 + ? Args[1].Expr + : Y("List", Y("ListType", Y("DataType", Q("String")))); + + TNodePtr rootAttributes; + TNodePtr preHandler; + TNodePtr resolveHandler; + TNodePtr diveHandler; + TNodePtr postHandler; + for (auto it = Args.begin() + positionalArgsCnt; it != Args.end(); ++it) { + auto& arg = *it; + const auto label = arg.Expr->GetLabel(); + if (label == "RootAttributes") { + ExtractTableName(ctx, arg); + rootAttributes = arg.Id.Build(); + } + else if (label == "PreHandler") { + preHandler = arg.Expr; + } + else if (label == "ResolveHandler") { + resolveHandler = arg.Expr; + } + else if (label == "DiveHandler") { + diveHandler = arg.Expr; + } + else if (label == "PostHandler") { + postHandler = arg.Expr; + } + else { + ctx.Warning(Pos, DEFAULT_ERROR) << "Unsupported named argument: " + << label << " in " << Func; + } + } + if (rootAttributes == nullptr) { + rootAttributes = BuildQuotedAtom(Pos, ""); + } + + if (preHandler != nullptr || postHandler != nullptr) { + const auto makePrePostHandlerType = BuildBind(Pos, walkFoldersModuleName, "MakePrePostHandlersType"); + const auto prePostHandlerType = Y("EvaluateType", Y("TypeHandle", Y("Apply", makePrePostHandlerType, Y("TypeOf", initState)))); + + if (preHandler != nullptr) { + preHandler = Y("Callable", prePostHandlerType, preHandler); + } + if (postHandler != nullptr) { + postHandler = Y("Callable", prePostHandlerType, postHandler); + } + } + if (preHandler == nullptr) { + preHandler = Y("Void"); + } + if (postHandler == nullptr) { + postHandler = Y("Void"); + } + + const auto makeResolveDiveHandlerType = BuildBind(Pos, walkFoldersModuleName, "MakeResolveDiveHandlersType"); + const auto resolveDiveHandlerType = Y("EvaluateType", Y("TypeHandle", Y("Apply", makeResolveDiveHandlerType, Y("TypeOf", initState)))); + if (resolveHandler == nullptr) { + resolveHandler = BuildBind(Pos, walkFoldersModuleName, "AnyNodeDiveHandler"); + } + if (diveHandler == nullptr) { + diveHandler = BuildBind(Pos, walkFoldersModuleName, "AnyNodeDiveHandler"); + } + + resolveHandler = Y("Callable", resolveDiveHandlerType, resolveHandler); + diveHandler = Y("Callable", resolveDiveHandlerType, diveHandler); + + const auto initStateType = Y("EvaluateType", Y("TypeHandle", Y("TypeOf", initState))); + const auto pickledInitState = Y("Pickle", initState); + + const auto initPath = rootFolderArg.Id.Build(); + + return Y("MrWalkFolders", initPath, rootAttributes, pickledInitState, initStateType, + preHandler, resolveHandler, diveHandler, postHandler); + } else if (func == "tables") { if (!Args.empty()) { ctx.Error(Pos) << Func << " doesn't accept arguments"; @@ -2619,6 +2728,12 @@ public: Nodes.insert(Nodes.end(), preparedNodes.begin(), preparedNodes.end()); } + decltype(Nodes) imports; + for (const auto& [alias, path]: ctx.RequiredModules) { + imports.push_back(Y("import", alias, BuildQuotedAtom(Pos, path))); + } + Nodes.insert(Nodes.begin(), std::make_move_iterator(imports.begin()), std::make_move_iterator(imports.end())); + for (const auto& symbol: ctx.Exports) { Add(Y("export", symbol)); } |