about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author fedor-miron <fedor-miron@ydb.tech> 2023-12-06 21:10:28 +0300
committer fedor-miron <fedor-miron@ydb.tech> 2023-12-06 23:46:58 +0300
commit 40302d2a5a8b044cc96c75e956916c2a18ad4fdb (patch)
tree abb17a18b8a12ffbd5a93215ffc72702d968b0a5
parent b2c749509aee1b26713f88e93aa7c7f7059e31e0 (diff)
download ydb-40302d2a5a8b044cc96c75e956916c2a18ad4fdb.tar.gz
YQL-9853: implement WalkFolders
-rw-r--r-- ydb/library/yql/core/expr_nodes/yql_expr_nodes.json 27
-rwxr-xr-x[-rw-r--r--] ydb/library/yql/core/expr_nodes_gen/gen/__main__.py 0
-rw-r--r-- ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt 6
-rw-r--r-- ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt 6
-rw-r--r-- ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt 6
-rw-r--r-- ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt 6
-rw-r--r-- ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt 6
-rw-r--r-- ydb/library/yql/core/services/mounts/ya.make 1
-rw-r--r-- ydb/library/yql/core/services/mounts/yql_mounts.cpp 1
-rw-r--r-- ydb/library/yql/mount/lib/yql/walk_folders.yql 36
-rw-r--r-- ydb/library/yql/providers/yt/common/yql_configuration.h 2
-rw-r--r-- ydb/library/yql/providers/yt/common/yql_names.h 2
-rw-r--r-- ydb/library/yql/providers/yt/common/yql_yt_settings.cpp 1
-rw-r--r-- ydb/library/yql/providers/yt/common/yql_yt_settings.h 1
-rw-r--r-- ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json 27
-rw-r--r-- ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp 13
-rw-r--r-- ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h 2
-rw-r--r-- ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt 1
-rw-r--r-- ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/library/yql/providers/yt/provider/ya.make 1
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp 348
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp 582
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h 126
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_key.cpp 46
-rw-r--r-- ydb/library/yql/providers/yt/provider/yql_yt_key.h 39
-rw-r--r-- ydb/library/yql/sql/v1/context.h 1
-rw-r--r-- ydb/library/yql/sql/v1/query.cpp 117
30 files changed, 1266 insertions, 142 deletions
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 09f1cb27f4..42daf56f28 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -707,6 +707,15 @@
"Match": {"Type": "Callable", "Name": "AggregateFinalize"}
},
{
+ "Name": "TCoFold",
+ "Base": "TCoInputBase",
+ "Match": {"Type": "Callable", "Name": "Fold"},
+ "Children": [
+ {"Index": 1, "Name": "State", "Type": "TExprBase"},
+ {"Index": 2, "Name": "UpdateHandler", "Type": "TCoLambda"}
+ ]
+ },
+ {
"Name": "TCoFold1",
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "Fold1"},
@@ -2393,7 +2402,6 @@
"Base": "TCoInputBase",
"Match": {"Type": "Callable", "Name": "WideFromBlocks"}
},
-
{
"Name": "TCoPgSelect",
"Base": "TCallable",
@@ -2419,6 +2427,23 @@
{"Index": 1, "Name": "Type", "Type": "TCallable"},
{"Index": 2, "Name": "Lambda", "Type": "TCoLambda"}
]
+ },
+ {
+ "Name": "TCoPickle",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "Pickle"},
+ "Children": [
+ {"Index": 0, "Name": "Value", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TCoUnpickle",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "Unpickle"},
+ "Children": [
+ {"Index": 0, "Name": "Type", "Type": "TCallable"},
+ {"Index": 1, "Name": "Buffer", "Type": "TExprBase"}
+ ]
}
]
}
diff --git a/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py b/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py
index 2bc809f4e7..2bc809f4e7 100644..100755
--- a/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py
+++ b/ydb/library/yql/core/expr_nodes_gen/gen/__main__.py
diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt
index 21c6fedd14..656e2bf4a6 100644
--- a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-arm64.txt
@@ -40,20 +40,22 @@ target_link_libraries(core-services-mounts.global PUBLIC
library-yql-core
)
target_sources(core-services-mounts.global PRIVATE
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
)
resources(core-services-mounts.global
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
INPUTS
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql
KEYS
/lib/yql/aggregate.yql
/lib/yql/window.yql
/lib/yql/id.yql
/lib/yql/sqr.yql
/lib/yql/core.yql
+ /lib/yql/walk_folders.yql
)
diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt
index 21c6fedd14..656e2bf4a6 100644
--- a/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/core/services/mounts/CMakeLists.darwin-x86_64.txt
@@ -40,20 +40,22 @@ target_link_libraries(core-services-mounts.global PUBLIC
library-yql-core
)
target_sources(core-services-mounts.global PRIVATE
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
)
resources(core-services-mounts.global
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
INPUTS
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql
KEYS
/lib/yql/aggregate.yql
/lib/yql/window.yql
/lib/yql/id.yql
/lib/yql/sqr.yql
/lib/yql/core.yql
+ /lib/yql/walk_folders.yql
)
diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt
index 83a9cd73e8..336f24123f 100644
--- a/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/core/services/mounts/CMakeLists.linux-aarch64.txt
@@ -42,20 +42,22 @@ target_link_libraries(core-services-mounts.global PUBLIC
library-yql-core
)
target_sources(core-services-mounts.global PRIVATE
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
)
resources(core-services-mounts.global
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
INPUTS
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql
KEYS
/lib/yql/aggregate.yql
/lib/yql/window.yql
/lib/yql/id.yql
/lib/yql/sqr.yql
/lib/yql/core.yql
+ /lib/yql/walk_folders.yql
)
diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt
index 83a9cd73e8..336f24123f 100644
--- a/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/core/services/mounts/CMakeLists.linux-x86_64.txt
@@ -42,20 +42,22 @@ target_link_libraries(core-services-mounts.global PUBLIC
library-yql-core
)
target_sources(core-services-mounts.global PRIVATE
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
)
resources(core-services-mounts.global
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
INPUTS
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql
KEYS
/lib/yql/aggregate.yql
/lib/yql/window.yql
/lib/yql/id.yql
/lib/yql/sqr.yql
/lib/yql/core.yql
+ /lib/yql/walk_folders.yql
)
diff --git a/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt b/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt
index 21c6fedd14..656e2bf4a6 100644
--- a/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/core/services/mounts/CMakeLists.windows-x86_64.txt
@@ -40,20 +40,22 @@ target_link_libraries(core-services-mounts.global PUBLIC
library-yql-core
)
target_sources(core-services-mounts.global PRIVATE
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
)
resources(core-services-mounts.global
- ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/bee1a30d03545744c170685330eaf0c3.cpp
+ ${CMAKE_BINARY_DIR}/ydb/library/yql/core/services/mounts/97eabb381e1d4dd4acd7e8000ea2563e.cpp
INPUTS
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/aggregate.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/window.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/id.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/sqr.yql
${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/core.yql
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/mount/lib/yql/walk_folders.yql
KEYS
/lib/yql/aggregate.yql
/lib/yql/window.yql
/lib/yql/id.yql
/lib/yql/sqr.yql
/lib/yql/core.yql
+ /lib/yql/walk_folders.yql
)
diff --git a/ydb/library/yql/core/services/mounts/ya.make b/ydb/library/yql/core/services/mounts/ya.make
index 5092ba058e..4a85b1b3e4 100644
--- a/ydb/library/yql/core/services/mounts/ya.make
+++ b/ydb/library/yql/core/services/mounts/ya.make
@@ -19,6 +19,7 @@ RESOURCE(
ydb/library/yql/mount/lib/yql/id.yql /lib/yql/id.yql
ydb/library/yql/mount/lib/yql/sqr.yql /lib/yql/sqr.yql
ydb/library/yql/mount/lib/yql/core.yql /lib/yql/core.yql
+ ydb/library/yql/mount/lib/yql/walk_folders.yql /lib/yql/walk_folders.yql
)
END()
diff --git a/ydb/library/yql/core/services/mounts/yql_mounts.cpp b/ydb/library/yql/core/services/mounts/yql_mounts.cpp
index cd6e8e2559..63151a99e8 100644
--- a/ydb/library/yql/core/services/mounts/yql_mounts.cpp
+++ b/ydb/library/yql/core/services/mounts/yql_mounts.cpp
@@ -105,6 +105,7 @@ namespace NYql {
AddLibraryFromResource(userData, "/lib/yql/id.yql");
AddLibraryFromResource(userData, "/lib/yql/sqr.yql");
AddLibraryFromResource(userData, "/lib/yql/core.yql");
+ AddLibraryFromResource(userData, "/lib/yql/walk_folders.yql");
}
TUserDataTable GetYqlModuleResolverImpl(
diff --git a/ydb/library/yql/mount/lib/yql/walk_folders.yql b/ydb/library/yql/mount/lib/yql/walk_folders.yql
new file mode 100644
index 0000000000..1cf37bc31f
--- /dev/null
+++ b/ydb/library/yql/mount/lib/yql/walk_folders.yql
@@ -0,0 +1,36 @@
+#library
+(
+(let AnyNodeDiveHandler
+ (lambda
+ '(nodes state attrList level)
+ '((Map (ListExtract nodes 'Path) (lambda '(path) '(path attrList))) state)
+ )
+)
+
+(let AttrListType (ListType (DataType 'String)))
+
+(let YtFolderNodeType
+ (StructType '('"Attributes" (DataType 'Yson)) '('"Path" (DataType 'String)) '('"Type" (DataType 'String)))
+)
+
+(let TraverseLevelType (DataType 'Int32))
+
+(let MakePrePostHandlersType
+ (lambda '(stateType)
+ (CallableType '() '(stateType) '((ListType YtFolderNodeType))
+ '(stateType) '(TraverseLevelType))
+ )
+)
+
+(let MakeResolveDiveHandlersType
+ (lambda '(stateType)
+ (CallableType '() '((TupleType (ListType (TupleType (DataType 'String) AttrListType)) stateType))
+ '((ListType YtFolderNodeType)) '(stateType) '(AttrListType) '(TraverseLevelType))
+ )
+)
+
+(export AnyNodeDiveHandler)
+(export YtFolderNodeType)
+(export MakePrePostHandlersType)
+(export MakeResolveDiveHandlersType)
+) \ No newline at end of file
diff --git a/ydb/library/yql/providers/yt/common/yql_configuration.h b/ydb/library/yql/providers/yt/common/yql_configuration.h
index 4ffa3068dc..6d9ca0338c 100644
--- a/ydb/library/yql/providers/yt/common/yql_configuration.h
+++ b/ydb/library/yql/providers/yt/common/yql_configuration.h
@@ -64,4 +64,6 @@ constexpr ui64 DEFAULT_APPLY_STORED_CONSTRAINTS = 0ULL;
constexpr bool DEFAULT_TABLE_CONTENT_LOCAL_EXEC = false;
+constexpr ui32 DEFAULT_BATCH_LIST_FOLDER_CONCURRENCY = 5;
+
} // NYql
diff --git a/ydb/library/yql/providers/yt/common/yql_names.h b/ydb/library/yql/providers/yt/common/yql_names.h
index 1fece7c714..61fadf3495 100644
--- a/ydb/library/yql/providers/yt/common/yql_names.h
+++ b/ydb/library/yql/providers/yt/common/yql_names.h
@@ -49,6 +49,8 @@ const TStringBuf MrTableRangeName = "MrTableRange";
const TStringBuf MrTableRangeStrictName = "MrTableRangeStrict";
const TStringBuf MrTableConcatName = "MrTableConcat";
const TStringBuf MrFolderName = "MrFolder";
+const TStringBuf MrWalkFoldersName = "MrWalkFolders";
+const TStringBuf MrWalkFoldersImplName = "MrWalkFoldersImpl";
const TStringBuf MrRangeInputListInternal = "MrRangeInputListInternal";
// YT related names
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
index a107bc59f1..508aae116b 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
@@ -427,6 +427,7 @@ TYtConfiguration::TYtConfiguration()
REGISTER_SETTING(*this, FileCacheTtl);
REGISTER_SETTING(*this, _ImpersonationUser);
REGISTER_SETTING(*this, InferSchemaMode).Parser([](const TString& v) { return FromString<EInferSchemaMode>(v); });
+ REGISTER_SETTING(*this, BatchListFolderConcurrency).Lower(1); // Upper bound on concurrent batch folder list requests https://yt.yandex-team.ru/docs/api/commands#execute_batch
REGISTER_SETTING(*this, JoinCommonUseMapMultiOut);
REGISTER_SETTING(*this, _EnableYtPartitioning);
REGISTER_SETTING(*this, UseAggPhases);
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
index 1aa9c4c40c..9b5de8aa7c 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
@@ -101,6 +101,7 @@ struct TYtSettings {
NCommon::TConfSetting<TDuration, false> FileCacheTtl;
NCommon::TConfSetting<TString, false> _ImpersonationUser;
NCommon::TConfSetting<EInferSchemaMode, false> InferSchemaMode;
+ NCommon::TConfSetting<ui32, false> BatchListFolderConcurrency;
// Job runtime
NCommon::TConfSetting<TString, true> Pool;
diff --git a/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json b/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
index 3b7c224fd6..447d044684 100644
--- a/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
+++ b/ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.json
@@ -457,6 +457,31 @@
{"Index": 0, "Name": "First", "Type": "TYtOutputOpBase"},
{"Index": 1, "Name": "Second", "Type": "TYtOutputOpBase"}
]
+ },
+ {
+ "Name": "TYtWalkFolders",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "MrWalkFolders"},
+ "Children": [
+ {"Index": 0, "Name": "Prefix", "Type": "TCoAtom"},
+ {"Index": 1, "Name": "Attributes", "Type": "TCoAtom"},
+ {"Index": 2, "Name": "PickledUserState", "Type": "TExprBase"},
+ {"Index": 3, "Name": "UserStateType", "Type": "TExprBase"},
+ {"Index": 4, "Name": "PreHandler", "Type": "TExprBase"},
+ {"Index": 5, "Name": "ResolveHandler", "Type": "TExprBase"},
+ {"Index": 6, "Name": "DiveHandler", "Type": "TExprBase"},
+ {"Index": 7, "Name": "PostHandler", "Type": "TExprBase"}
+ ]
+ },
+ {
+ "Name": "TYtWalkFoldersImpl",
+ "Base": "TCallable",
+ "Match": {"Type": "Callable", "Name": "MrWalkFoldersImpl"},
+ "Children": [
+ {"Index": 0, "Name": "PickledUserState", "Type": "TExprBase"},
+ {"Index": 1, "Name": "UserStateType", "Type": "TExprBase"},
+ {"Index": 2, "Name": "ProcessStateKey", "Type": "TCoAtom"}
+ ]
}
- ]
+ ]
}
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
index 99d0088803..e5fa5a3742 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
@@ -2,6 +2,7 @@
#include <yt/cpp/mapreduce/interface/error_codes.h>
#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
namespace NYql::NNative {
@@ -170,11 +171,11 @@ IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBat
folderResult.SetSuccess();
for (const auto& folder : execCtx->Options_.Folders()) {
- YQL_CLOG(INFO, ProviderYt) << "Executing list command with prefix: " << folder.Prefix;
const auto cacheKey = std::accumulate(folder.AttrKeys.begin(), folder.AttrKeys.end(), folder.Prefix,
[] (TString&& str, const TString& arg) {
return str + "&" + arg;
});
+ YQL_CLOG(INFO, ProviderYt) << "Executing list command with prefix: " << folder.Prefix << " , cacheKey = " << cacheKey;
auto maybeCached = MaybeGetFolderFromCache(entry, cacheKey);
if (maybeCached) {
@@ -217,7 +218,15 @@ IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBat
})
);
}
- batchList->ExecuteBatch();
+
+ TExecuteBatchOptions batchOptions;
+ if (batchRes.size() > 1) {
+ const size_t concurrency = execCtx->Options_.Config()->BatchListFolderConcurrency
+ .Get().GetOrElse(DEFAULT_BATCH_LIST_FOLDER_CONCURRENCY);
+ batchOptions.Concurrency(concurrency);
+ }
+ batchList->ExecuteBatch(batchOptions);
+
try {
WaitExceptionOrAll(batchRes).Wait();
for (auto& res : batchRes) {
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h
index 645fc2a4b2..c2eb8d1365 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h
@@ -14,7 +14,7 @@ TMaybe<TFileLinkPtr> MaybeGetFilePtrFromCache(TTransactionCache::TEntry::TPtr en
NYT::TAttributeFilter MakeAttrFilter(const TSet<TString>& attributes, bool isResolvingLink);
-IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& path);
+IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& prefix, const TString& name, const TVector<TString>& reqAttrKeys);
const TTransactionCache::TEntry::TFolderCache::value_type& StoreResInCache(const TTransactionCache::TEntry::TPtr& entry, TVector<IYtGateway::TBatchFolderResult::TFolderItem>&& items, const TString& cacheKey);
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt
index a55b537886..0ccf8f9454 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-arm64.txt
@@ -89,6 +89,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt
index a55b537886..0ccf8f9454 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.darwin-x86_64.txt
@@ -89,6 +89,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt
index 885a4f2be9..1ddc770b11 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-aarch64.txt
@@ -90,6 +90,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt
index 885a4f2be9..1ddc770b11 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.linux-x86_64.txt
@@ -90,6 +90,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
diff --git a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt
index a55b537886..0ccf8f9454 100644
--- a/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/yt/provider/CMakeLists.windows-x86_64.txt
@@ -89,6 +89,7 @@ target_sources(providers-yt-provider PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_intent_determination.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_impl.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_join_reorder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
diff --git a/ydb/library/yql/providers/yt/provider/ya.make b/ydb/library/yql/providers/yt/provider/ya.make
index 8c79642442..3c132cc5f6 100644
--- a/ydb/library/yql/providers/yt/provider/ya.make
+++ b/ydb/library/yql/providers/yt/provider/ya.make
@@ -17,6 +17,7 @@ SRCS(
yql_yt_helpers.cpp
yql_yt_intent_determination.cpp
yql_yt_io_discovery.cpp
+ yql_yt_io_discovery_walk_folders.cpp
yql_yt_join_impl.cpp
yql_yt_join_reorder.cpp
yql_yt_key.cpp
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
index 0e7249c0dc..9f06686e92 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
@@ -3,11 +3,13 @@
#include "yql_yt_gateway.h"
#include "yql_yt_op_settings.h"
#include "yql_yt_helpers.h"
+#include "yql_yt_io_discovery_walk_folders.h"
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/yt/common/yql_names.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
+#include <ydb/library/yql/core/services/yql_eval_expr.h>
#include <ydb/library/yql/core/yql_expr_optimize.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_expr_constraint.h>
@@ -40,8 +42,11 @@ public:
}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - start";
+
output = input;
if (ctx.Step.IsDone(TExprStep::DiscoveryIO)) {
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, is done";
return TStatus::Ok;
}
@@ -175,67 +180,46 @@ public:
return status;
}
- status = OptimizeExpr(output, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
- if (auto maybeRead = TMaybeNode<TYtRead>(node)) {
- if (!maybeRead.DataSource()) { // Validates provider
- return node;
- }
- auto read = maybeRead.Cast();
- auto ds = read.DataSource();
- if (!EnsureArgsCount(read.Ref(), 5, ctx)) {
- return {};
- }
-
- TYtInputKeys keys;
- if (!keys.Parse(read.Arg(2).Ref(), ctx)) {
- return {};
- }
-
- if (keys.IsProcessed()) {
- // Already processed
- return node;
- }
-
- if (keys.GetKeys().empty()) {
- ctx.AddError(TIssue(ctx.GetPosition(read.Arg(2).Pos()), "The list of tables is empty"));
- return {};
- }
-
- if (keys.GetType() != TYtKey::EType::TableScheme) {
- auto cluster = TString{ds.Cluster().Value()};
- for (auto& key: keys.GetKeys()) {
- auto keyPos = ctx.GetPosition(key.GetNode()->Pos());
- if (key.GetRange()) {
- PendingRanges_.emplace(std::make_pair(cluster, *key.GetRange()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TTableRangeResult>()));
- }
- else if (key.GetFolder()) {
- PendingFolders_.emplace(std::make_pair(cluster, *key.GetFolder()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TFolderResult>()));
- }
- else if (!key.IsAnonymous()) {
- if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) {
- paths.push_back(IYtGateway::TCanonizeReq()
- .Cluster(cluster)
- .Path(key.GetPath())
- .Pos(keyPos)
- );
- }
+ status = VisitInputKeys(output, ctx, [this, &ctx, &paths] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
+ if (keys.GetType() != TYtKey::EType::TableScheme) {
+ const auto cluster = TString{readNode.DataSource().Cluster().Value()};
+ for (auto&& key: keys.ExtractKeys()) {
+ auto keyPos = ctx.GetPosition(key.GetNode()->Pos());
+ if (key.GetRange()) {
+ PendingRanges_.emplace(std::make_pair(cluster, *key.GetRange()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TTableRangeResult>()));
+ }
+ else if (key.GetFolder()) {
+ PendingFolders_.emplace(std::make_pair(cluster, *key.GetFolder()), std::make_pair(keyPos, NThreading::TFuture<IYtGateway::TFolderResult>()));
+ }
+ else if (key.GetWalkFolderArgs()) {
+ return ctx.ChangeChild(readNode.Ref(), 2, InitializeWalkFolders(std::move(key), cluster, keyPos, ctx));
+ }
+ else if (!key.IsAnonymous()) {
+ if (PendingCanonizations_.insert(std::make_pair(std::make_pair(cluster, key.GetPath()), paths.size())).second) {
+ paths.push_back(IYtGateway::TCanonizeReq()
+ .Cluster(cluster)
+ .Path(key.GetPath())
+ .Pos(keyPos)
+ );
}
}
}
- return node;
}
+ return readNode.Ptr();
+ }, /* visitChanges */ true);
- return node;
- }, ctx, TOptimizeExprSettings(nullptr));
-
- if (status.Level != TStatus::Ok) {
+ if (status.Level == TStatus::Error) {
PendingCanonizations_.clear();
PendingFolders_.clear();
PendingRanges_.clear();
+ PendingWalkFolders_.clear();
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
return status;
}
-
- if (PendingRanges_.empty() && PendingFolders_.empty() && PendingCanonizations_.empty()) {
+
+ if (PendingRanges_.empty() && PendingFolders_.empty()
+ && PendingCanonizations_.empty() && PendingWalkFolders_.empty()) {
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
return status;
}
@@ -333,16 +317,25 @@ public:
x.second.second = result;
}
- AllFuture_ = NThreading::WaitExceptionOrAll(allFutures);
+ CanonizationRangesFoldersFuture_ = NThreading::WaitExceptionOrAll(allFutures);
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - finish, status: " << (TStatus::ELevel)status.Level;
return TStatus::Async;
}
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
Y_UNUSED(input);
- return AllFuture_;
+ if (auto walkFoldersFuture = MaybeGetWalkFoldersFuture()) {
+ if (PendingCanonizations_.empty() && PendingRanges_.empty() && PendingFolders_.empty()) {
+ return walkFoldersFuture.GetRef();
+ }
+ return NThreading::WaitExceptionOrAll(walkFoldersFuture.GetRef(), CanonizationRangesFoldersFuture_);
+ }
+
+ return CanonizationRangesFoldersFuture_;
}
TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges start";
output = input;
if (!PendingCanonizations_.empty()) {
@@ -353,7 +346,7 @@ public:
PendingCanonizations_.clear();
PendingRanges_.clear();
CanonizeFuture_ = {};
- AllFuture_ = {};
+ CanonizationRangesFoldersFuture_ = {};
return TStatus::Error;
}
@@ -415,12 +408,12 @@ public:
YQL_CLOG(INFO, ProviderYt) << "Got " << items.size() << " items for " << " GetFolder";
TVector<TExprBase> listItems;
for (auto& item: items) {
- listItems.push_back(BuildFolderListItemExpr(ctx, node->Pos(), item));
+ listItems.push_back(BuildFolderListItemExpr(ctx, node->Pos(), item.Path, item.Type, item.Attributes));
}
return BuildFolderTableResExpr(ctx, node->Pos(), read.World(), BuildFolderListExpr(ctx, node->Pos(), listItems).Ptr()).Ptr();
}
-
+
if (keys.GetType() != TYtKey::EType::Table) {
return node;
}
@@ -546,17 +539,26 @@ public:
PendingRanges_.clear();
PendingFolders_.clear();
CanonizeFuture_ = {};
- AllFuture_ = {};
+ CanonizationRangesFoldersFuture_ = {};
+ if (!PendingWalkFolders_.empty() && !State_->Types->EvaluationInProgress) {
+ const auto walkFoldersStatus = RewriteWalkFoldersOnAsyncOrEvalChanges(output, ctx);
+ if (walkFoldersStatus != TStatus::Ok) {
+ return walkFoldersStatus;
+ }
+ }
+
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery DoApplyAsyncChanges - finish";
return status;
}
void Rewind() final { // Resets the pending ranges/folders/canonizations and their futures
+ YQL_CLOG(INFO, ProviderYt) << "Rewinding YtIODiscovery";
PendingRanges_.clear();
PendingFolders_.clear();
PendingCanonizations_.clear();
CanonizeFuture_ = {};
- AllFuture_ = {};
+ CanonizationRangesFoldersFuture_ = {};
} // NOTE(review): PendingWalkFolders_ is not reset here although the error path of the transform clears it - confirm whether that state is meant to survive a rewind
private:
@@ -661,83 +663,169 @@ private:
res->ChildRef(4) = Build<TCoNameValueTupleList>(ctx, read.Pos()).Add(readSettings).Done().Ptr();
return res;
}
-
- TExprBase BuildFolderListItemExpr(TExprContext& ctx, NYql::TPositionHandle pos, const IYtGateway::TFolderResult::TFolderItem& folderItem) {
- return Build<TCoAsStruct>(ctx, pos)
- .Add()
- .Add<TCoAtom>()
- .Value("Path")
- .Build()
- .Add<TCoString>()
- .Literal()
- .Value(folderItem.Path)
- .Build()
- .Build()
- .Build()
- .Add()
- .Add<TCoAtom>()
- .Value("Type")
- .Build()
- .Add<TCoString>()
- .Literal()
- .Value(folderItem.Type)
- .Build()
- .Build()
- .Build()
- .Add()
- .Add<TCoAtom>()
- .Value("Attributes")
- .Build()
- .Add<TCoYson>()
- .Literal()
- .Value(folderItem.Attributes)
- .Build()
- .Build()
+
+ // Registers a new WalkFolders state machine for `key` in PendingWalkFolders_ (keyed by the
+ // StateKey of the parsed arguments) and returns the TYtWalkFoldersImpl node that replaces the
+ // original key in the expression graph.
+ [[nodiscard]]
+ TExprNode::TPtr InitializeWalkFolders(TYtKey&& key, const TString& cluster, TPosition pos, TExprContext& ctx) {
+ auto& args = key.GetWalkFolderArgs().GetRef();
+ const auto instanceKey = args.StateKey;
+
+ // Log before handing `args` over: the TWalkFoldersImpl constructor moves
+ // args.InitialFolder.Prefix out, so reading it afterwards would print a moved-from string.
+ YQL_CLOG(INFO, ProviderYt) << "Initialized WalkFolders from " << cluster << ".`"
+ << args.InitialFolder.Prefix << "`" << " with root attributes cnt: "
+ << args.InitialFolder.Attributes.size();
+ TWalkFoldersImpl walkFolders {State_->SessionId, cluster, State_->Configuration->Snapshot(),
+ pos, std::move(args), State_->Gateway};
+ PendingWalkFolders_.emplace(instanceKey, std::move(walkFolders));
+
+ // Only StateKey and the expression pointers are read from `args` below; the constructor
+ // copies the handlers and leaves these members intact.
+ auto walkFoldersImplNode = Build<TYtWalkFoldersImpl>(ctx, key.GetNode()->Pos())
+ .ProcessStateKey()
+ .Value(args.StateKey)
.Build()
- .Done();
+ .PickledUserState(args.PickledUserState)
+ .UserStateType(args.UserStateType)
+ .Build()
+ .Value()
+ .Ptr();
+
+ return walkFoldersImplNode;
}
+
+ // Advances the first pending WalkFolders state machine by one step and rewrites the matching
+ // WalkFoldersImpl read node: either with the next user state expression, or - once the
+ // instance is finished - with a one-row list holding the final `State` column.
+ // Returns Error when visiting the input keys failed, Repeat while instances are still
+ // pending, and Ok otherwise.
+ TStatus RewriteWalkFoldersOnAsyncOrEvalChanges(TExprNode::TPtr& output, TExprContext& ctx) {
+ Y_ENSURE(!PendingWalkFolders_.empty());
- TCoList BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TVector<TExprBase>& folderItems) {
- return Build<TCoList>(ctx, pos)
- .ListType<TCoListType>()
- .ItemType<TCoStructType>()
- .Add<TExprList>()
- .Add<TCoAtom>()
- .Value("Path")
- .Build()
- .Add<TCoDataType>()
- .Type()
- .Value("String")
+ auto currImplKey = PendingWalkFolders_.begin()->first;
+
+ auto status = VisitInputKeys(output, ctx, [this, &ctx, currImplKey] (TYtRead readNode, TYtInputKeys&& keys) -> TExprNode::TPtr {
+ if (keys.GetType() == TYtKey::EType::WalkFoldersImpl) {
+ YQL_CLOG(INFO, ProviderYt) << "YtIODiscovery - DoApplyAsyncChanges WalkFoldersImpl handling start";
+
+ auto parsedKey = keys.ExtractKeys().front();
+ if (!parsedKey.GetWalkFolderImplArgs()) {
+ YQL_CLOG(INFO, ProviderYt) << "Failed to parse WalkFolderImpl args";
+ return {};
+ }
+
+ // Only the instance selected above is processed in this pass; other instances are left untouched.
+ const auto instanceKey = parsedKey.GetWalkFolderImplArgs()->StateKey;
+ if (instanceKey != currImplKey) {
+ return readNode.Ptr();
+ }
+
+ auto walkFoldersInstanceIt = PendingWalkFolders_.find(currImplKey);
+ YQL_ENSURE(!walkFoldersInstanceIt.IsEnd(), "Failed to find walkFoldersInstance with key: " << instanceKey);
+
+ auto& walkFoldersImpl = walkFoldersInstanceIt->second;
+
+ Y_ENSURE(walkFoldersImpl.GetAnyOpFuture().HasValue(),
+ "Called RewriteWalkFoldersOnAsyncChanges, but impl future is not ready");
+
+ auto userState =
+ walkFoldersImpl.GetNextStateExpr(ctx, std::move(parsedKey.GetWalkFolderImplArgs().GetRef()));
+ if (walkFoldersImpl.IsFinished()) {
+ YQL_CLOG(INFO, ProviderYt) << "Building result expr for WalkFolders with key: " << instanceKey;
+ // walkFoldersImpl is a reference into the map and must not be used after this erase.
+ PendingWalkFolders_.erase(currImplKey);
+
+ auto type = Build<TCoStructType>(ctx, readNode.Pos())
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("State")
.Build()
+ .Add(parsedKey.GetWalkFolderImplArgs()->UserStateType)
.Build()
- .Build()
- .Add<TExprList>()
- .Add<TCoAtom>()
- .Value("Type")
- .Build()
- .Add<TCoDataType>()
- .Type()
- .Value("String")
+ .DoBuild();
+
+ auto resList = Build<TCoList>(ctx, readNode.Pos())
+ .ListType<TCoListType>()
+ .ItemType<TCoStructType>()
+ .InitFrom(type)
.Build()
.Build()
- .Build()
- .Add<TExprList>()
- .Add<TCoAtom>()
- .Value("Attributes")
+ .FreeArgs()
+ .Add<TCoAsStruct>()
+ .Add()
+ .Add<TCoAtom>()
+ .Value("State")
+ .Build()
+ .Add(userState)
+ .Build()
+ .Build()
.Build()
- .Add<TCoDataType>()
- .Type()
- .Value("Yson")
+ .DoBuild();
+
+ return Build<TCoCons>(ctx, readNode.Pos())
+ .World(readNode.World())
+ .Input<TCoAssumeColumnOrder>()
+ .Input(resList)
+ .ColumnOrder<TCoAtomList>()
+ .Add()
+ .Value("State")
+ .Build()
.Build()
.Build()
- .Build()
- .Build()
- .Build()
- .FreeArgs()
- .Add(folderItems)
- .Build()
- .Build()
- .Value();
+ .Done()
+ .Ptr();
+ }
+
+ // Same state expression as before: nothing to rewrite in this pass.
+ if (userState == parsedKey.GetWalkFolderImplArgs()->UserStateExpr) {
+ return readNode.Ptr();
+ }
+
+ YQL_CLOG(DEBUG, ProviderYt) << "State expr ast: " << ConvertToAst(*userState, ctx, {}).Root->ToString();
+
+ auto walkFoldersImplNode = ctx.ChangeChild(*parsedKey.GetNode(), 0, std::move(userState));
+ return ctx.ChangeChild(readNode.Ref(), 2, std::move(walkFoldersImplNode));
+ }
+ return readNode.Ptr();
+ });
+
+ // Propagate failures instead of masking them as "all finished".
+ if (status.Level == TStatus::Error) {
+ YQL_CLOG(INFO, ProviderYt) << "WalkFolders next status: " << (TStatus::ELevel)status.Level;
+ return status;
+ }
+
+ if (!PendingWalkFolders_.empty()) {
+ YQL_CLOG(INFO, ProviderYt) << "Has pending WalkFolders, repeating. ";
+ status = TStatus::Repeat;
+ } else {
+ YQL_CLOG(INFO, ProviderYt) << "All WalkFolders instances are finished. ";
+ status = TStatus::Ok;
+ }
+
+ YQL_CLOG(INFO, ProviderYt) << "WalkFolders next status: " << (TStatus::ELevel)status.Level;
+ return status;
+ }
+
+ IGraphTransformer::TStatus VisitInputKeys(TExprNode::TPtr& output, // Runs OptimizeExpr over `output` and calls `processKeys` for every TYtRead node with parsed, unprocessed, non-empty keys
+ TExprContext& ctx, std::function<TExprNode::TPtr(TYtRead node, TYtInputKeys&&)> processKeys, bool visitChanges = false) {
+ TOptimizeExprSettings settings(nullptr);
+ settings.VisitChanges = visitChanges;
+
+ const auto status = OptimizeExpr(output, output, [&](const TExprNode::TPtr& node, TExprContext& ctx) -> TExprNode::TPtr {
+ if (auto maybeRead = TMaybeNode<TYtRead>(node)) {
+ if (!maybeRead.DataSource()) { // Validates provider
+ return node;
+ }
+ auto read = maybeRead.Cast();
+ auto ds = read.DataSource();
+ if (!EnsureArgsCount(read.Ref(), 5, ctx)) {
+ return {}; // an empty pointer is returned on validation failure
+ }
+
+ TYtInputKeys keys;
+ auto& keysNode = read.Arg(2).Ref(); // the keys are parsed from argument 2 of the read node
+ if (!keys.Parse(keysNode, ctx)) {
+ return {};
+ }
+
+ if (keys.IsProcessed()) {
+ // Already processed
+ return node;
+ }
+
+ if (keys.GetKeys().empty()) {
+ ctx.AddError(TIssue(ctx.GetPosition(read.Arg(2).Pos()), "The list of tables is empty"));
+ return {};
+ }
+ return processKeys(read, std::move(keys)); // the callback decides what replaces the read node
+ }
+ return node; // every other node is left as is
+ }, ctx, settings);
+ return status;
}
TCoCons BuildFolderTableResExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TExprBase& world, const TExprNodePtr& folderList) {
@@ -771,14 +859,22 @@ private:
return res;
}
+ TMaybe<NThreading::TFuture<void>> MaybeGetWalkFoldersFuture() const {
+     // Only the first pending WalkFolders instance is consulted (one instance in flight).
+     if (PendingWalkFolders_.empty()) {
+         return Nothing();
+     }
+     const auto& firstPending = PendingWalkFolders_.begin()->second;
+     return MakeMaybe(firstPending.GetAnyOpFuture());
+ }
+
private:
TYtState::TPtr State_;
THashMap<std::pair<TString, TYtKey::TRange>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TTableRangeResult>>> PendingRanges_;
THashMap<std::pair<TString, TYtKey::TFolderList>, std::pair<TPosition, NThreading::TFuture<IYtGateway::TFolderResult>>> PendingFolders_;
THashMap<std::pair<TString, TString>, size_t> PendingCanonizations_; // cluster, original table path -> positions in canon result
+ THashMap<ui64, TWalkFoldersImpl> PendingWalkFolders_;
NThreading::TFuture<IYtGateway::TCanonizePathsResult> CanonizeFuture_;
- NThreading::TFuture<void> AllFuture_;
+ NThreading::TFuture<void> CanonizationRangesFoldersFuture_;
THashMap<TString, TString> FolderFileToAlias_;
};
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
new file mode 100644
index 0000000000..c907d630a4
--- /dev/null
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.cpp
@@ -0,0 +1,582 @@
+#include "yql_yt_io_discovery_walk_folders.h"
+
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack.h>
+#include <ydb/library/yql/utils/log/log.h>
+
+#include <util/string/split.h>
+
+namespace NYql {
+using namespace NNodes;
+
+
+NNodes::TCoStructType
+BuildFolderItemStructType(TExprContext& ctx, NYql::TPositionHandle pos) { // Builds the struct type node of one folder item: Path, Type and Attributes members
+ return Build<TCoStructType>(ctx, pos)
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("Path")
+ .Build()
+ .Add<TCoDataType>()
+ .Type()
+ .Value("String") // Path is a String
+ .Build()
+ .Build()
+ .Build()
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("Type")
+ .Build()
+ .Add<TCoDataType>()
+ .Type()
+ .Value("String") // Type is a String
+ .Build()
+ .Build()
+ .Build()
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("Attributes")
+ .Build()
+ .Add<TCoDataType>()
+ .Type()
+ .Value("Yson") // Attributes is a Yson
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+}
+
+TCoList
+BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TVector<NNodes::TExprBase>& folderItems) { // Builds a List node of folder item structs from `folderItems`
+ return Build<TCoList>(ctx, pos)
+ .ListType<TCoListType>()
+ .ItemType<TCoStructType>()
+ .InitFrom(BuildFolderItemStructType(ctx, pos)) // item type is the Path/Type/Attributes struct
+ .Build()
+ .Build()
+ .FreeArgs()
+ .Add(folderItems)
+ .Build()
+ .Build()
+ .Value();
+}
+
+TExprBase
+BuildFolderListItemExpr(TExprContext &ctx, NYql::TPositionHandle pos, // Builds an AsStruct node with Path, Type and Attributes members for one folder item
+ const TString &path, const TString &type,
+ const TString &attributesYson) {
+ return Build<TCoAsStruct>(ctx, pos)
+ .Add()
+ .Add<TCoAtom>()
+ .Value("Path")
+ .Build()
+ .Add<TCoString>()
+ .Literal()
+ .Value(path)
+ .Build()
+ .Build()
+ .Build()
+ .Add()
+ .Add<TCoAtom>()
+ .Value("Type")
+ .Build()
+ .Add<TCoString>()
+ .Literal()
+ .Value(type)
+ .Build()
+ .Build()
+ .Build()
+ .Add()
+ .Add<TCoAtom>()
+ .Value("Attributes")
+ .Build()
+ .Add<TCoYson>() // attributes are emitted as a Yson literal, the other two members as String literals
+ .Literal()
+ .Value(attributesYson)
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+}
+
+TWalkFoldersImpl::TWalkFoldersImpl(const TString& sessionId, const TString& cluster, TYtSettings::TConstPtr config, // Stores the handlers, queues the initial folder and issues the first folder list request
+ TPosition pos, TYtKey::TWalkFoldersArgs&& args, const IYtGateway::TPtr gateway):
+ Pos_(pos), SessionId_(sessionId), Cluster_(cluster), Config_(config), Gateway_(gateway) { // NOTE(review): PosHandle_ is not set here - confirm a default handle is intended
+
+ PreHandler_ = args.PreHandler->IsCallable("Void") ? Nothing() : MakeMaybe(args.PreHandler); // a Void callable means "no pre handler"
+ ResolveHandler_ = args.ResolveHandler;
+ DiveHandler_ = args.DiveHandler;
+ PostHandler_ = args.PostHandler->IsCallable("Void") ? Nothing() : MakeMaybe(args.PostHandler); // a Void callable means "no post handler"
+
+ ProcessFoldersQueue_.emplace_back(TFolderQueueItem {
+ .Folder = args.InitialFolder, // copied, because Prefix is moved out of args just below
+ });
+ IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs folder {
+ std::move(args.InitialFolder.Prefix), // leaves args.InitialFolder.Prefix moved-from for the caller
+ TSet<TString>(args.InitialFolder.Attributes.begin(), args.InitialFolder.Attributes.end())
+ };
+ DoFolderListOperation({folder});
+}
+
+TExprNode::TPtr TWalkFoldersImpl::GetNextStateExpr(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args) { // Dispatches on ProcessingState_ to the handler of the current step and returns its expression
+ YQL_CLOG(INFO, ProviderYt) << "Current processing state: " << int(ProcessingState_);
+ switch (ProcessingState_) {
+ case WaitingListFolderOp: {
+ return AfterListFolderOp(ctx, args);
+ }
+ case PreHandling: {
+ return PreHandleVisitedInSingleFolder(ctx, args, ProcessFoldersQueue_.front()); // per-folder steps operate on the front of the queue
+ }
+ case ResolveHandling: {
+ return ResolveHandleInSingleFolder(ctx, args, ProcessFoldersQueue_.front());
+ }
+ case AfterResolveHandling: {
+ return AfterResolveHandle(ctx, args, ProcessFoldersQueue_.front());
+ }
+ case WaitingResolveLinkOp: {
+ return HandleAfterResolveFuture(ctx, args, ProcessFoldersQueue_.front());
+ }
+ case DiveHandling: {
+ return DiveHandleInSingleFolder(ctx, args, ProcessFoldersQueue_.front());
+ }
+ case AfterDiveHandling: {
+ return AfterDiveHandle(ctx, args, ProcessFoldersQueue_.front());
+ }
+ case PostHandling: {
+ return PostHandleVisitedInSingleFolder(ctx, args, ProcessFoldersQueue_.front());
+ }
+ case FinishingHandling: {
+ return BuildFinishedState(ctx, args);
+ }
+ case FinishedHandling: { // nothing left to do: falls out of the switch
+ }
+ }
+ return args.UserStateExpr; // finished: the user state expression is returned unchanged
+}
+
+void TWalkFoldersImpl::DoFolderListOperation(TVector<IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs>&& folders) {
+    // Sends one batch folder-list request and remembers its future in BatchFolderListFuture_.
+    YQL_CLOG(INFO, ProviderYt) << "Sending folder list batch with " << folders.size() << " items";
+
+    IYtGateway::TBatchFolderOptions listRequest(SessionId_);
+    listRequest.Pos(Pos_);
+    listRequest.Cluster(Cluster_);
+    listRequest.Config(Config_);
+    listRequest.Folders(folders);
+
+    BatchFolderListFuture_ = Gateway_->GetFolders(std::move(listRequest));
+}
+
+TExprNode::TPtr TWalkFoldersImpl::EvaluateNextUserStateExpr(TExprContext& ctx, const TExprNode::TPtr& userStateType, const TExprNode::TPtr userStateExpr, std::function<TExprNode::TPtr(const NNodes::TExprBase&)> nextStateFunc) { // Builds EvaluateExpr(Pickle(nextStateFunc(Unpickle(type, state))))
+
+ const auto userStateUnpickled = Build<TCoUnpickle>(ctx, PosHandle_)
+ .Type(userStateType)
+ .Buffer(userStateExpr)
+ .Build();
+
+ const auto nextUserStatePickled = Build<TCoPickle>(ctx, PosHandle_)
+ .Value(nextStateFunc(userStateUnpickled.Value())) // the callback receives the unpickled state and returns the new one
+ .Build()
+ .Value()
+ .Ptr();
+
+ ctx.Step.Repeat(TExprStep::ExprEval); // requests the ExprEval step again
+
+ YQL_CLOG(TRACE, ProviderYt) << "WalkFolders - next evaluate ast: " << ConvertToAst(*nextUserStatePickled, ctx, {}).Root->ToString();
+
+ return ctx.Builder(PosHandle_)
+ .Callable("EvaluateExpr")
+ .Add(0, nextUserStatePickled)
+ .Seal()
+ .Build();
+}
+
+TExprNode::TPtr TWalkFoldersImpl::AfterListFolderOp(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args) { // Consumes the batch list result for the front folder, or finishes when no list request is in flight
+ if (!BatchFolderListFuture_) {
+ YQL_CLOG(INFO, ProviderYt) << "Folder queue is empty, finishing WalkFolders with key: " << args.StateKey;
+ ProcessingState_ = FinishingHandling;
+ return GetNextStateExpr(ctx, args);
+ } else {
+ if (!BatchFolderListFuture_->HasValue()) {
+ YQL_CLOG(INFO, ProviderYt) << "Batch list future is not ready";
+ return args.UserStateExpr; // state is unchanged until the future completes
+ }
+
+ Y_ENSURE(!ProcessFoldersQueue_.empty(), "Got future result for Yt List but no folder in queue");
+ auto folderListVal = BatchFolderListFuture_->ExtractValue();
+ if (folderListVal.Success()) {
+ auto& folder = ProcessFoldersQueue_.front();
+ YQL_CLOG(INFO, ProviderYt) << "Got " << folderListVal.Items.size() << " results for list op at `" << folder.Folder.Prefix << "`";
+ folder.ItemsToPreHandle = std::move(folderListVal.Items); // all items of the batch go to the front folder
+ folder.PreHandleItemsFetched = true;
+ ProcessingState_ = PreHandling;
+ } else {
+ folderListVal.ReportIssues(ctx.IssueManager); // on failure PreHandleItemsFetched stays false
+ }
+
+ BatchFolderListFuture_ = Nothing();
+ }
+ return PreHandleVisitedInSingleFolder(ctx, args, ProcessFoldersQueue_.front());
+}
+
+TExprNode::TPtr TWalkFoldersImpl::PreHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { // Sorts listed items into link/dive/post buckets and applies the pre handler, if any
+ YQL_CLOG(INFO, ProviderYt) << "Processing preHandler at " << folder.Folder.Prefix
+ << " for WalkFolders with key: " << args.StateKey;
+
+ if (!folder.PreHandleItemsFetched) {
+ YQL_CLOG(INFO, ProviderYt) << "Waiting for folder list: `" << folder.Folder.Prefix << "`";
+ ProcessingState_ = WaitingListFolderOp;
+ return GetNextStateExpr(ctx, args);
+ }
+
+ if (folder.ItemsToPreHandle.empty()) {
+ YQL_CLOG(INFO, ProviderYt) << "Items to preHandle are empty, skipping " << folder.Folder.Prefix;
+ ProcessFoldersQueue_.pop_front(); // `folder` refers to the popped element and is not used after this line
+ ProcessingState_ = WaitingListFolderOp;
+ return GetNextStateExpr(ctx, args);
+ }
+
+ TVector<TExprBase> folderListItems;
+ for (auto&& item : folder.ItemsToPreHandle) {
+ if (PreHandler_) {
+ folderListItems.push_back(
+ BuildFolderListItemExpr(ctx, PosHandle_, item.Path,item.Type,
+ NYT::NodeToYsonString(item.Attributes)));
+ }
+
+ if (item.Type == "link") { // links go to the resolve handler
+ folder.LinksToResolveHandle.emplace_back(std::move(item));
+ } else if (item.Type == "map_node") { // map nodes go to the dive handler
+ folder.ItemsToDiveHandle.emplace_back(std::move(item));
+ } else { // everything else goes straight to the post handler
+ folder.ItemsToPostHandle.emplace_back(std::move(item));
+ }
+ }
+
+ if (!PreHandler_) {
+ YQL_CLOG(INFO, ProviderYt) << "No preHandler defined, skipping for WalkFolders with key: " << args.StateKey;
+ ProcessingState_ = ResolveHandling;
+ return GetNextStateExpr(ctx, args);
+ }
+
+ const auto folderListExpr = BuildFolderListExpr(ctx, PosHandle_, folderListItems);
+
+ const auto makeNextUserState = [&] (const TExprBase& userStateUnpickled) { // Apply(preHandler, items, state, level)
+ return Build<TCoApply>(ctx, PosHandle_)
+ .Callable(PreHandler_.GetRef())
+ .FreeArgs()
+ .Add(folderListExpr)
+ .Add(userStateUnpickled)
+ .Add<TCoInt64>()
+ .Literal()
+ .Value(ToString(folder.Level))
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Value()
+ .Ptr();
+ };
+
+ ProcessingState_ = ResolveHandling;
+ return EvaluateNextUserStateExpr(ctx, args.UserStateType, args.UserStateExpr, makeNextUserState);
+}
+
+// Builds the evaluation of the resolve handler over the links collected for `folder`
+// and switches the state machine to AfterResolveHandling.
+TExprNode::TPtr TWalkFoldersImpl::ResolveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) {
+    // A space before "for" keeps the folder prefix and the following text apart in the log.
+    YQL_CLOG(INFO, ProviderYt) << "Processing resolveHandler at " << folder.Folder.Prefix
+                               << " for WalkFolders with key: " << args.StateKey;
+    ProcessingState_ = AfterResolveHandling;
+    return BuildDiveOrResolveHandlerEval(ctx, args, ResolveHandler_.GetRef(), folder.LinksToResolveHandle, folder.Folder.Attributes, folder.Level);
+}
+
+TExprNode::TPtr TWalkFoldersImpl::BuildDiveOrResolveHandlerEval(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TExprNode::TPtr& handler, // Builds EvaluateExpr over the pickled result of handler(items, state, attributes, level)
+ const TVector<IYtGateway::TBatchFolderResult::TFolderItem>& res, const TVector<TString>& attributes, ui64 level) {
+ using namespace NNodes;
+
+ TVector<TExprBase> items;
+ items.reserve(res.size());
+ for (auto& link : res) {
+ auto itemsExpr =
+ BuildFolderListItemExpr(ctx, PosHandle_,
+ link.Path, link.Type, NYT::NodeToYsonString(link.Attributes));
+ items.push_back(itemsExpr);
+ }
+ const auto itemsNode = BuildFolderListExpr(ctx, PosHandle_, items);
+
+ TVector<TExprBase> attributeExprs; // String literals, one per requested attribute name
+ for (auto& attr : attributes) {
+ const auto attributeExpr = Build<TCoString>(ctx, PosHandle_)
+ .Literal()
+ .Value(attr)
+ .Build()
+ .Build()
+ .Value();
+ attributeExprs.push_back(attributeExpr);
+ }
+
+ const auto userStateUnpickled = Build<TCoUnpickle>(ctx, PosHandle_)
+ .Type(args.UserStateType)
+ .Buffer(args.UserStateExpr)
+ .DoBuild();
+
+ const auto handlerResult = Build<TCoApply>(ctx, PosHandle_)
+ .Callable(handler)
+ .FreeArgs()
+ .Add(itemsNode)
+ .Add(userStateUnpickled)
+ .Add<TCoAsList>()
+ .Add(attributeExprs)
+ .Build()
+ .Add<TCoInt64>()
+ .Literal()
+ .Value(ToString(level))
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Value()
+ .Ptr();
+
+ auto resolveHandlerResPickled = ctx.Builder(PosHandle_) // StaticMap pickles every element of the handler result
+ .Callable("StaticMap")
+ .Add(0, handlerResult)
+ .Lambda(1)
+ .Param("item")
+ .Callable("Pickle")
+ .Arg(0, "item")
+ .Seal()
+ .Seal()
+ .Seal()
+ .Build();
+
+ ctx.Step.Repeat(TExprStep::ExprEval); // requests the ExprEval step again
+ return ctx.Builder(PosHandle_)
+ .Callable("EvaluateExpr")
+ .Add(0, resolveHandlerResPickled)
+ .Seal()
+ .Build();
+}
+
+// Unpacks a pickled List<Tuple<String, List<String>>> and calls `handleParsedNameAndAttrs`
+// once per tuple with the name and the set of requested attribute names.
+void ParseNameAttributesPickledList(TStringBuf pickledTupleList, std::function<void(TString&&, TSet<TString>)> handleParsedNameAndAttrs) {
+    using namespace NKikimr::NMiniKQL;
+
+    TScopedAlloc alloc(__LOCATION__);
+    TTypeEnvironment env(alloc);
+    TMemoryUsageInfo memInfo("Yt WalkFolders");
+    THolderFactory holderFactory(alloc.Ref(), memInfo);
+
+    // Describe the packed value: a list of (name, list of attribute names) tuples.
+    TSmallVec<TType*> nodeToResolveWithAttrListTypes;
+    auto stringType = TDataType::Create(NUdf::TDataType<char*>::Id, env);
+    nodeToResolveWithAttrListTypes.push_back(stringType);
+    nodeToResolveWithAttrListTypes.push_back(TListType::Create(stringType, env));
+
+    auto nodeToResolveTuple = TTupleType::Create(2, nodeToResolveWithAttrListTypes.data(), env);
+    TValuePacker packer(false, TListType::Create(nodeToResolveTuple, env));
+    auto parsedList = packer.Unpack(pickledTupleList, holderFactory);
+
+    YQL_CLOG(INFO, ProviderYt) << "Parsing list with length: " << parsedList.GetListLength();
+
+    for (size_t i = 0; i < parsedList.GetListLength(); ++i) {
+        const auto requestedTuple = parsedList.GetElement(i);
+        const auto nameEl = requestedTuple.GetElement(0);
+        const auto name = nameEl.AsStringRef();
+        YQL_CLOG(INFO, ProviderYt) << "Parsed dive or resolve item name: " << name;
+
+        auto requestedAttrsVal = requestedTuple.GetElement(1);
+        TSet<TString> attrs;
+        for (size_t j = 0; j < requestedAttrsVal.GetListLength(); ++j) {
+            const auto attrEl = requestedAttrsVal.GetElement(j);
+            YQL_CLOG(INFO, ProviderYt) << "Parsed requested attribute: " << attrEl.AsStringRef();
+            attrs.insert(TString(attrEl.AsStringRef()));
+        }
+        // `attrs` is not used after this call, so hand it over instead of copying the whole set.
+        handleParsedNameAndAttrs(TString(name), std::move(attrs));
+    }
+}
+
+TExprNode::TPtr TWalkFoldersImpl::AfterResolveHandle(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { // Reads the evaluated resolve handler result and starts resolving the links it selected
+ EnsureTupleSize(*args.UserStateExpr, 2, ctx); // NOTE(review): the result of this check is ignored - confirm whether a failure should stop processing
+ YQL_CLOG(INFO, ProviderYt) << "After resolveHandler EvaluateExpr";
+
+ TCoString pickledLinksToResolve(args.UserStateExpr->Child(0)); // child 0: pickled list of links, child 1: next user state
+ THashMap<TString, TSet<TString>> nameAndRequestedAttrs;
+ ParseNameAttributesPickledList(pickledLinksToResolve.Literal().StringValue(),
+ [&nameAndRequestedAttrs] (TString name, TSet<TString> attrs) {
+ nameAndRequestedAttrs[name] = std::move(attrs);
+ });
+
+ TVector<IYtGateway::TResolveOptions::TItemWithReqAttrs> links;
+ links.reserve(nameAndRequestedAttrs.size());
+ for (auto&& linkToResolve : folder.LinksToResolveHandle) {
+ auto it = nameAndRequestedAttrs.find(linkToResolve.Path);
+ if (it == nameAndRequestedAttrs.end()) {
+ continue; // links the handler did not return are skipped
+ }
+
+ IYtGateway::TResolveOptions::TItemWithReqAttrs link {
+ .Item = std::move(linkToResolve),
+ .AttrKeys = std::move(it->second),
+ };
+ links.emplace_back(std::move(link));
+ }
+
+ if (links.empty()) {
+ YQL_CLOG(INFO, ProviderYt) << "Links to visit are empty";
+ ProcessingState_ = DiveHandling;
+ return GetNextStateExpr(ctx, {.UserStateExpr = args.UserStateExpr->Child(1), .UserStateType = args.UserStateType, .StateKey = args.StateKey});
+ }
+
+ ProcessingState_ = WaitingResolveLinkOp;
+ auto options = IYtGateway::TResolveOptions(SessionId_)
+ .Pos(Pos_)
+ .Cluster(Cluster_)
+ .Config(Config_)
+ .Items(links);
+ BatchResolveFuture_ = Gateway_->ResolveLinks(std::move(options));
+
+ return args.UserStateExpr->Child(1); // the new user state, without the pickled link list
+}
+
+TExprNode::TPtr TWalkFoldersImpl::HandleAfterResolveFuture(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) { // Sorts resolved link targets into the dive/post buckets and moves on to DiveHandling
+ YQL_CLOG(INFO, ProviderYt) << "After resolve future result";
+
+ if (!BatchResolveFuture_) {
+ YQL_CLOG(WARN, ProviderYt) << "Resolve future not set";
+ return nullptr; // a null pointer is returned when no resolve request was issued
+ }
+ if (!BatchResolveFuture_->HasValue() && !BatchResolveFuture_->HasException()) {
+ YQL_CLOG(INFO, ProviderYt) << "Batch resolve future is not ready";
+ return args.UserStateExpr; // state is unchanged until the future completes
+ }
+
+ auto res = BatchResolveFuture_->ExtractValue();
+ BatchResolveFuture_ = Nothing();
+ YQL_CLOG(INFO, ProviderYt) << "Added items to handle after batch resolve future completion";
+
+ for (auto&& node : res.Items) {
+ if (node.Type == "map_node") { // map nodes become candidates for diving
+ folder.ItemsToDiveHandle.emplace_back(std::move(node));
+ } else {
+ folder.ItemsToPostHandle.emplace_back(std::move(node));
+ }
+ }
+
+ ProcessingState_ = DiveHandling;
+ return GetNextStateExpr(ctx, args);
+}
+
+TExprNode::TPtr TWalkFoldersImpl::DiveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) {
+    // Builds the evaluation of the dive handler over the map nodes collected for `folder`
+    // and switches the state machine to AfterDiveHandling.
+    const auto& currentFolder = folder.Folder;
+    YQL_CLOG(INFO, ProviderYt) << "Processing diveHandler at " << currentFolder.Prefix
+                               << " for WalkFolders with key: " << args.StateKey;
+
+    ProcessingState_ = AfterDiveHandling;
+
+    auto& diveHandler = DiveHandler_.GetRef();
+    return BuildDiveOrResolveHandlerEval(ctx, args, diveHandler, folder.ItemsToDiveHandle, currentFolder.Attributes, folder.Level);
+}
+
+TExprNode::TPtr TWalkFoldersImpl::AfterDiveHandle(TExprContext& ctx, TYtKey::TWalkFoldersImplArgs args, TFolderQueueItem& folder) { // Reads the evaluated dive handler result, queues the selected folders and requests their listing
+ using namespace NKikimr::NMiniKQL;
+
+ EnsureTupleSize(*args.UserStateExpr, 2, ctx); // NOTE(review): the result of this check is ignored - confirm whether a failure should stop processing
+ YQL_CLOG(INFO, ProviderYt) << "After diveHandler EvaluateExpr";
+
+ TVector<IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs> diveItems;
+ TCoString pickledLinksToResolve(args.UserStateExpr->Child(0)); // child 0: pickled list of folders to dive into, child 1: next user state
+ THashMap<TString, TSet<TString>> nameAndRequestedAttrs;
+ ParseNameAttributesPickledList(pickledLinksToResolve.Literal().StringValue(),
+ [&queue=ProcessFoldersQueue_, &diveItems, nextLevel = folder.Level + 1] (TString path, TSet<TString> attrs) {
+ diveItems.push_back({.Prefix = path, .AttrKeys = attrs});
+ queue.push_back({ // each selected folder is queued one level below the current folder
+ .Folder = {.Prefix = std::move(path), .Attributes = TVector<TString>(attrs.begin(), attrs.end())},
+ .Level = nextLevel,
+ });
+ });
+
+ folder.ItemsToPostHandle.insert(folder.ItemsToPostHandle.end(), // dive candidates are also passed to the post handler
+ std::make_move_iterator(folder.ItemsToDiveHandle.begin()),
+ std::make_move_iterator(folder.ItemsToDiveHandle.end()));
+ folder.ItemsToDiveHandle.clear();
+
+ args.UserStateExpr = args.UserStateExpr->Child(1); // continue with the new user state only
+ ProcessingState_ = PostHandling;
+
+ if (diveItems.empty()) {
+ YQL_CLOG(INFO, ProviderYt) << "Nodes to dive are empty";
+ return GetNextStateExpr(ctx, args);
+ }
+
+ auto options = IYtGateway::TBatchFolderOptions(SessionId_)
+ .Pos(Pos_)
+ .Cluster(Cluster_)
+ .Config(Config_)
+ .Folders(diveItems);
+ Y_ENSURE(!BatchFolderListFuture_, "Single inflight batch folder request allowed");
+ BatchFolderListFuture_ = Gateway_->GetFolders(std::move(options));
+
+ return GetNextStateExpr(ctx, args);
+}
+
+// Applies the post handler (if any) to the items collected for `folder`, removes the folder
+// from the processing queue and switches the state machine back to WaitingListFolderOp.
+// `folder` is expected to be the front element of ProcessFoldersQueue_ and must not be
+// touched after the pop_front() calls below.
+TExprNode::TPtr TWalkFoldersImpl::PostHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder) {
+    if (!PostHandler_) {
+        YQL_CLOG(INFO, ProviderYt) << "No postHandler defined, skipping for WalkFolders with key: " << args.StateKey;
+        // The folder is fully processed here as well; drop it so that the next list result
+        // is attributed to the next queued folder, exactly as in the handler path.
+        ProcessFoldersQueue_.pop_front();
+        ProcessingState_ = WaitingListFolderOp;
+        return args.UserStateExpr;
+    }
+
+    YQL_CLOG(INFO, ProviderYt) << "Processing postHandler at " << folder.Folder.Prefix
+                               << " for WalkFolders with key: " << args.StateKey;
+
+    TVector<TExprBase> folderListItems;
+    for (auto&& item : folder.ItemsToPostHandle) {
+        folderListItems.push_back(
+            BuildFolderListItemExpr(ctx,
+                PosHandle_,
+                item.Path,
+                item.Type,
+                NYT::NodeToYsonString(item.Attributes)));
+    }
+
+    const auto folderListExpr = BuildFolderListExpr(ctx, PosHandle_, folderListItems);
+
+    // The level is captured by value: the lambda must not depend on `folder`,
+    // which is destroyed by pop_front().
+    const auto makeNextUserState = [&, level = folder.Level] (const TExprBase& userStateUnpickled) {
+        return Build<TCoApply>(ctx, PosHandle_)
+            .Callable(PostHandler_.GetRef())
+            .FreeArgs()
+                .Add(folderListExpr)
+                .Add(userStateUnpickled)
+                .Add<TCoInt64>()
+                    .Literal()
+                        .Value(ToString(level))
+                    .Build()
+                .Build()
+            .Build()
+            .Build()
+            .Value()
+            .Ptr();
+    };
+
+    // Build the expression while the folder is still alive, then retire the folder.
+    auto nextUserState = EvaluateNextUserStateExpr(ctx, args.UserStateType, args.UserStateExpr, makeNextUserState);
+
+    ProcessingState_ = WaitingListFolderOp;
+    ProcessFoldersQueue_.pop_front();
+    return nextUserState;
+}
+
+
+TExprNode::TPtr TWalkFoldersImpl::BuildFinishedState(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args) { // Builds EvaluateExpr(Unpickle(type, state)) and marks the walk as finished
+ // TODO: Dump large user state to file
+ // const auto dataLen = args.UserStateExpr->IsCallable("String")
+ // ? TCoString(args.UserStateExpr).Literal().StringValue().Size()
+ // : 0;
+
+ const auto userStateUnpickled = Build<TCoUnpickle>(ctx, PosHandle_)
+ .Type(args.UserStateType)
+ .Buffer(args.UserStateExpr)
+ .DoBuild()
+ .Ptr();
+
+ ctx.Step.Repeat(TExprStep::ExprEval); // requests the ExprEval step again
+ ProcessingState_ = FinishedHandling; // IsFinished() reports true from now on
+
+ return ctx.Builder(PosHandle_)
+ .Callable("EvaluateExpr")
+ .Add(0, userStateUnpickled)
+ .Seal()
+ .Build();
+}
+} // namespace NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h
new file mode 100644
index 0000000000..281d3a0b18
--- /dev/null
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery_walk_folders.h
@@ -0,0 +1,126 @@
+#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_key.h>
+
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/threading/future/core/future.h>
+
+namespace NYql {
+NNodes::TExprBase
+BuildFolderListItemExpr(TExprContext &ctx, NYql::TPositionHandle pos,
+ const TString &path, const TString &type,
+ const TString &attributesYson);
+
+NNodes::TCoList
+BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos,
+ const TVector<NNodes::TExprBase>& folderItems);
+
+NNodes::TCoStructType
+BuildFolderItemStructType(TExprContext& ctx, NYql::TPositionHandle pos);
+
+class TWalkFoldersImpl { // State machine that walks folders step by step, producing the next user state expression on each step
+public:
+ TWalkFoldersImpl(const TString& sessionId, const TString& cluster, TYtSettings::TConstPtr config,
+ TPosition pos, TYtKey::TWalkFoldersArgs&& args, const IYtGateway::TPtr gateway);
+
+ TExprNode::TPtr GetNextStateExpr(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args); // runs the handler of the current processing state
+
+ enum EProcessingState { // steps of the walk; see GetNextStateExpr for the handler of each one
+ WaitingListFolderOp,
+ PreHandling,
+ ResolveHandling,
+ AfterResolveHandling,
+ WaitingResolveLinkOp,
+ DiveHandling,
+ AfterDiveHandling,
+ PostHandling,
+ FinishingHandling,
+ FinishedHandling
+ };
+
+ EProcessingState GetProcessingState() const {
+ return ProcessingState_;
+ }
+
+ bool IsFinished() const {
+ return ProcessingState_ == FinishedHandling;
+ }
+
+ NThreading::TFuture<void> GetAnyOpFuture() const { // WaitAny over the initialized list/resolve futures
+ TVector<NThreading::TFuture<void>> futures;
+ if (BatchFolderListFuture_ && BatchFolderListFuture_->Initialized()) {
+ futures.push_back(BatchFolderListFuture_->IgnoreResult());
+ }
+ if (BatchResolveFuture_&& BatchResolveFuture_->Initialized()) {
+ futures.push_back(BatchResolveFuture_->IgnoreResult());
+ }
+ return NThreading::WaitAny(futures);
+ }
+
+ TWalkFoldersImpl& operator=(const TWalkFoldersImpl&) = delete; // move-only type
+ TWalkFoldersImpl(const TWalkFoldersImpl&) = delete;
+
+ TWalkFoldersImpl(TWalkFoldersImpl&&) = default;
+ TWalkFoldersImpl& operator=(TWalkFoldersImpl&&) = default;
+
+private:
+ static constexpr size_t LARGE_USER_STATE = 8192;
+
+ TPosition Pos_; // position passed to gateway requests
+ TPositionHandle PosHandle_; // position used for the expression nodes built by this class
+
+ TMaybe<TExprNode::TPtr> PreHandler_; // empty when the pre handler argument is a Void callable
+ TMaybe<TExprNode::TPtr> ResolveHandler_;
+ TMaybe<TExprNode::TPtr> DiveHandler_;
+ TMaybe<TExprNode::TPtr> PostHandler_; // empty when the post handler argument is a Void callable
+
+ struct TFolderQueueItem { // one folder of the walk together with its items, bucketed by the handler that should see them
+ TYtKey::TFolderList Folder;
+
+ bool PreHandleItemsFetched = false; // set once the list result was stored in ItemsToPreHandle
+
+ TVector<IYtGateway::TBatchFolderResult::TFolderItem> ItemsToPreHandle;
+ TVector<IYtGateway::TBatchFolderResult::TFolderItem> LinksToResolveHandle;
+ TVector<IYtGateway::TBatchFolderResult::TFolderItem> ItemsToDiveHandle;
+ TVector<IYtGateway::TBatchFolderResult::TFolderItem> ItemsToPostHandle;
+
+ ui64 Level = 0; // depth of the folder; folders queued by the dive step get the parent's level + 1
+ };
+ TDeque<TFolderQueueItem> ProcessFoldersQueue_; // the front element is the folder currently being processed
+
+ EProcessingState ProcessingState_ = WaitingListFolderOp;
+
+ TString SessionId_;
+ TString Cluster_;
+ TYtSettings::TConstPtr Config_;
+
+ IYtGateway::TPtr Gateway_;
+
+ TMaybe<NThreading::TFuture<IYtGateway::TBatchFolderResult>> BatchFolderListFuture_; // in-flight folder list request, if any
+ TMaybe<NThreading::TFuture<IYtGateway::TBatchFolderResult>> BatchResolveFuture_; // in-flight link resolve request, if any
+
+ void DoFolderListOperation(TVector<IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs>&& folders);
+
+ TExprNode::TPtr EvaluateNextUserStateExpr(TExprContext& ctx, const TExprNode::TPtr& userStateType, const TExprNode::TPtr userStateExpr, std::function<TExprNode::TPtr(const NNodes::TExprBase&)> nextStateFunc);
+
+ TExprNode::TPtr AfterListFolderOp(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args);
+
+ TExprNode::TPtr PreHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr ResolveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr BuildDiveOrResolveHandlerEval(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TExprNode::TPtr& handler,
+ const TVector<IYtGateway::TBatchFolderResult::TFolderItem>& res, const TVector<TString>& attributes, ui64 level);
+
+ TExprNode::TPtr AfterResolveHandle(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr HandleAfterResolveFuture(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr DiveHandleInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr AfterDiveHandle(TExprContext& ctx, TYtKey::TWalkFoldersImplArgs args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr PostHandleVisitedInSingleFolder(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args, TFolderQueueItem& folder);
+
+ TExprNode::TPtr BuildFinishedState(TExprContext& ctx, const TYtKey::TWalkFoldersImplArgs& args);
+};
+} // namespace NYql
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
index a60603cb43..ab78d45824 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_key.cpp
@@ -1,6 +1,8 @@
#include "yql_yt_key.h"
+#include <library/cpp/yson/node/node_io.h>
#include <ydb/library/yql/providers/yt/common/yql_names.h>
+#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <util/string/builder.h>
@@ -16,6 +18,8 @@ THashSet<TStringBuf> EXT_KEY_CALLABLES = {
TStringBuf("Key"),
TStringBuf("TempTable"),
MrFolderName,
+ MrWalkFoldersName,
+ MrWalkFoldersImplName
};
THashSet<TStringBuf> KEY_CALLABLES = {
@@ -26,6 +30,7 @@ THashSet<TStringBuf> KEY_CALLABLES = {
}
bool TYtKey::Parse(const TExprNode& key, TExprContext& ctx) {
+ using namespace NNodes;
if (!key.IsCallable(EXT_KEY_CALLABLES)) {
ctx.AddError(TIssue(ctx.GetPosition(key.Pos()), TStringBuf("Expected key")));
return false;
@@ -63,6 +68,47 @@ bool TYtKey::Parse(const TExprNode& key, TExprContext& ctx) {
" and may have second tag - view")));
return false;
}
+ else if (const auto maybeWalkFolders = TMaybeNode<TYtWalkFolders>(&key)) { // Key is a TYtWalkFolders node: copy its children into WalkFolderArgs.
+ Type = EType::WalkFolders;
+ const auto walkFolders = maybeWalkFolders.Cast();
+
+ TFolderList initialListFolder; // Stored below as InitialFolder.
+ initialListFolder.Prefix = walkFolders.Prefix();
+ Split(TString(walkFolders.Attributes().StringValue()), ";", initialListFolder.Attributes); // The attribute list arrives as one ';'-separated string.
+
+ WalkFolderArgs = MakeMaybe(TWalkFoldersArgs{
+ .InitialFolder = std::move(initialListFolder),
+ .PickledUserState = walkFolders.PickledUserState().Ptr(),
+ .UserStateType = walkFolders.UserStateType().Ptr(),
+ .PreHandler = walkFolders.PreHandler().Ptr(),
+ .ResolveHandler = walkFolders.ResolveHandler().Ptr(),
+ .DiveHandler = walkFolders.DiveHandler().Ptr(),
+ .PostHandler = walkFolders.PostHandler().Ptr(),
+ .StateKey = walkFolders.Ref().UniqueId(), // The node's unique id serves as the state key.
+ });
+
+ return true; // This branch adds no errors to ctx.
+ }
+ else if (const auto maybeWalkFolders = TMaybeNode<TYtWalkFoldersImpl>(&key)) { // Key is a TYtWalkFoldersImpl node: copy its state into WalkFolderImplArgs.
+ Type = EType::WalkFoldersImpl;
+ const auto walkFolders = maybeWalkFolders.Cast();
+
+ ui64 stateKey; // Written by TryFromString; only read after a successful parse.
+ if (!TryFromString(walkFolders.ProcessStateKey().StringValue(), stateKey)) { // The state key is carried as text and must parse as ui64.
+ ctx.AddError(TIssue(ctx.GetPosition(key.Pos()),
+ TStringBuilder() << MrWalkFoldersImplName << ": incorrect format of state map key"));
+ return false; // Parsing failed; the error was added to ctx above.
+ }
+
+ WalkFolderImplArgs = MakeMaybe(TWalkFoldersImplArgs{
+ .UserStateExpr = walkFolders.PickledUserState().Ptr(), // Note the rename: the node's PickledUserState child becomes UserStateExpr.
+ .UserStateType = walkFolders.UserStateType().Ptr(),
+ .StateKey = stateKey,
+ });
+
+ Type = EType::WalkFoldersImpl; // NOTE(review): repeats the assignment at the top of this branch; looks redundant.
+ return true;
+ }
auto tagName = key.Child(0)->Child(0)->Content();
if (tagName == TStringBuf("table")) {
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_key.h b/ydb/library/yql/providers/yt/provider/yql_yt_key.h
index 14c17479ad..f1381900d7 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_key.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_key.h
@@ -19,6 +19,8 @@ public:
Table,
TableScheme,
Folder,
+ WalkFolders,
+ WalkFoldersImpl,
};
struct TRange {
@@ -44,6 +46,27 @@ public:
&& left.Attributes == right.Attributes;
}
};
+
+ struct TWalkFoldersArgs { // Values TYtKey::Parse copies out of a TYtWalkFolders key node.
+ TFolderList InitialFolder; // Prefix plus the attribute list split on ';'.
+ // NOTE: Parse fills this struct with designated initializers, so the field order below must not change.
+ TExprNode::TPtr PickledUserState; // The node's PickledUserState child.
+ TExprNode::TPtr UserStateType; // The node's UserStateType child.
+ // Handler children of the node, stored as-is.
+ TExprNode::TPtr PreHandler;
+ TExprNode::TPtr ResolveHandler;
+ TExprNode::TPtr DiveHandler;
+ TExprNode::TPtr PostHandler;
+
+ ui64 StateKey; // Parse sets this to the key node's UniqueId(); no default value.
+ };
+
+ struct TWalkFoldersImplArgs { // Values TYtKey::Parse copies out of a TYtWalkFoldersImpl key node.
+ TExprNode::TPtr UserStateExpr; // The node's PickledUserState child.
+ TExprNode::TPtr UserStateType; // The node's UserStateType child.
+ // NOTE: filled with designated initializers in Parse; keep the field order.
+ ui64 StateKey; // Parsed from the node's ProcessStateKey string; no default value.
+ };
public:
TYtKey() {
@@ -81,6 +104,16 @@ public:
return Folder;
}
+ TMaybe<TWalkFoldersArgs>& GetWalkFolderArgs() { // Mutable access to the stored WalkFolders arguments.
+ YQL_ENSURE(Type != EType::Undefined); // Only checks that Type was set; it does not check that Type is WalkFolders.
+ return WalkFolderArgs; // Returned by reference; presumably empty unless Parse saw a WalkFolders key — verify at call sites.
+ }
+
+ TMaybe<TWalkFoldersImplArgs>& GetWalkFolderImplArgs() { // Mutable access to the stored WalkFoldersImpl arguments.
+ YQL_ENSURE(Type != EType::Undefined); // Only checks that Type was set; it does not check that Type is WalkFoldersImpl.
+ return WalkFolderImplArgs; // Returned by reference; presumably empty unless Parse saw a WalkFoldersImpl key — verify at call sites.
+ }
+
bool Parse(const TExprNode& key, TExprContext& ctx);
private:
@@ -91,6 +124,8 @@ private:
bool Anonymous = false;
TMaybe<TRange> Range;
TMaybe<TFolderList> Folder;
+ TMaybe<TWalkFoldersArgs> WalkFolderArgs;
+ TMaybe<TWalkFoldersImplArgs> WalkFolderImplArgs;
};
class TYtInputKeys {
@@ -106,6 +141,10 @@ public:
return Keys;
}
+ TVector<TYtKey>&& ExtractKeys() { // Exposes Keys as an rvalue reference so a caller can move the vector out.
+ return std::move(Keys); // Only a cast: Keys is left untouched unless the caller actually moves from the result.
+ }
+
bool IsProcessed() const {
return HasNonKeys;
}
diff --git a/ydb/library/yql/sql/v1/context.h b/ydb/library/yql/sql/v1/context.h
index a1e531b587..5120d49f09 100644
--- a/ydb/library/yql/sql/v1/context.h
+++ b/ydb/library/yql/sql/v1/context.h
@@ -235,6 +235,7 @@ namespace NSQLTranslationV1 {
TMap<TString, TNodePtr> UniversalAliases;
THashSet<TString> Exports;
THashMap<TString, TString> ImportModuleAliases;
+ THashMap<TString, TString> RequiredModules;
TMap<TString, TString> SimpleUdfs;
NSQLTranslation::TIncrementMonCounterFunction IncrementMonCounterFunction;
TScopedStatePtr Scoped;
diff --git a/ydb/library/yql/sql/v1/query.cpp b/ydb/library/yql/sql/v1/query.cpp
index 8d2147ca98..c0414b919e 100644
--- a/ydb/library/yql/sql/v1/query.cpp
+++ b/ydb/library/yql/sql/v1/query.cpp
@@ -241,7 +241,7 @@ public:
}
TCiString func(Func);
- if (func != "object") {
+ if (func != "object" && func != "walkfolders") {
for (auto& arg: Args) {
if (arg.Expr->GetLabel()) {
ctx.Error(Pos) << "Named arguments are not supported for table function " << to_upper(Func);
@@ -480,6 +480,115 @@ public:
folder = L(folder, Args.size() > 1 ? Args[1].Id.Build() : BuildQuotedAtom(Pos, ""));
return folder;
}
+ else if (func == "walkfolders") { // WalkFolders(root[, initState], named handlers...): builds an MrWalkFolders node.
+ const size_t minPositionalArgs = 1;
+ const size_t maxPositionalArgs = 2;
+
+ size_t positionalArgsCnt = 0; // Number of leading arguments without a label.
+ for (const auto& arg : Args) {
+ if (!arg.Expr->GetLabel()) {
+ positionalArgsCnt++;
+ } else {
+ break; // Counting stops at the first labeled argument.
+ }
+ }
+ if (positionalArgsCnt < minPositionalArgs || positionalArgsCnt > maxPositionalArgs) {
+ ctx.Error(Pos) << Func << " requires from " << minPositionalArgs
+ << " to " << maxPositionalArgs
+ << " positional arguments, but got: " << positionalArgsCnt;
+ return nullptr;
+ }
+
+ constexpr auto walkFoldersModuleName = "walk_folders_module";
+ ctx.RequiredModules.emplace(walkFoldersModuleName, "/lib/yql/walk_folders.yql"); // Register the helper library under this alias; the BuildBind calls below refer to it.
+
+ auto& rootFolderArg = Args[0]; // Exists and is unlabeled: the count check above guarantees at least one positional argument.
+ if (rootFolderArg.HasAt) {
+ ctx.Error(Pos) << "Temporary tables are not supported here";
+ return nullptr;
+ }
+ if (!rootFolderArg.View.empty()) {
+ ctx.Error(Pos) << Func << " doesn't supports views";
+ return nullptr;
+ }
+ ExtractTableName(ctx, rootFolderArg);
+
+ const auto initState =
+ positionalArgsCnt > 1
+ ? Args[1].Expr
+ : Y("List", Y("ListType", Y("DataType", Q("String")))); // Default state when no second positional argument is given: a List node typed List<String> with no items.
+
+ TNodePtr rootAttributes;
+ TNodePtr preHandler;
+ TNodePtr resolveHandler;
+ TNodePtr diveHandler;
+ TNodePtr postHandler;
+ for (auto it = Args.begin() + positionalArgsCnt; it != Args.end(); ++it) { // Every argument after the positional ones is dispatched by its label.
+ auto& arg = *it;
+ const auto label = arg.Expr->GetLabel();
+ if (label == "RootAttributes") {
+ ExtractTableName(ctx, arg);
+ rootAttributes = arg.Id.Build();
+ }
+ else if (label == "PreHandler") {
+ preHandler = arg.Expr;
+ }
+ else if (label == "ResolveHandler") {
+ resolveHandler = arg.Expr;
+ }
+ else if (label == "DiveHandler") {
+ diveHandler = arg.Expr;
+ }
+ else if (label == "PostHandler") {
+ postHandler = arg.Expr;
+ }
+ else {
+ ctx.Warning(Pos, DEFAULT_ERROR) << "Unsupported named argument: " // Unknown labels only warn; the argument is otherwise ignored.
+ << label << " in " << Func;
+ }
+ }
+ if (rootAttributes == nullptr) {
+ rootAttributes = BuildQuotedAtom(Pos, ""); // No RootAttributes given: pass an empty atom.
+ }
+
+ if (preHandler != nullptr || postHandler != nullptr) { // Pre and post handlers share one callable type, built from the type of initState.
+ const auto makePrePostHandlerType = BuildBind(Pos, walkFoldersModuleName, "MakePrePostHandlersType");
+ const auto prePostHandlerType = Y("EvaluateType", Y("TypeHandle", Y("Apply", makePrePostHandlerType, Y("TypeOf", initState))));
+
+ if (preHandler != nullptr) {
+ preHandler = Y("Callable", prePostHandlerType, preHandler);
+ }
+ if (postHandler != nullptr) {
+ postHandler = Y("Callable", prePostHandlerType, postHandler);
+ }
+ }
+ if (preHandler == nullptr) {
+ preHandler = Y("Void"); // An absent pre handler is passed as Void.
+ }
+ if (postHandler == nullptr) {
+ postHandler = Y("Void"); // An absent post handler is passed as Void.
+ }
+
+ const auto makeResolveDiveHandlerType = BuildBind(Pos, walkFoldersModuleName, "MakeResolveDiveHandlersType");
+ const auto resolveDiveHandlerType = Y("EvaluateType", Y("TypeHandle", Y("Apply", makeResolveDiveHandlerType, Y("TypeOf", initState))));
+ if (resolveHandler == nullptr) {
+ resolveHandler = BuildBind(Pos, walkFoldersModuleName, "AnyNodeDiveHandler"); // NOTE(review): the resolve default binds the same "AnyNodeDiveHandler" symbol as the dive default — confirm this is intended.
+ }
+ if (diveHandler == nullptr) {
+ diveHandler = BuildBind(Pos, walkFoldersModuleName, "AnyNodeDiveHandler");
+ }
+
+ resolveHandler = Y("Callable", resolveDiveHandlerType, resolveHandler); // Unlike pre/post, resolve and dive handlers are always wrapped in a Callable (user-supplied or default).
+ diveHandler = Y("Callable", resolveDiveHandlerType, diveHandler);
+
+ const auto initStateType = Y("EvaluateType", Y("TypeHandle", Y("TypeOf", initState)));
+ const auto pickledInitState = Y("Pickle", initState); // The state is passed on in pickled form together with its type.
+
+ const auto initPath = rootFolderArg.Id.Build();
+
+ return Y("MrWalkFolders", initPath, rootAttributes, pickledInitState, initStateType,
+ preHandler, resolveHandler, diveHandler, postHandler);
+ }
else if (func == "tables") {
if (!Args.empty()) {
ctx.Error(Pos) << Func << " doesn't accept arguments";
@@ -2619,6 +2728,12 @@ public:
Nodes.insert(Nodes.end(), preparedNodes.begin(), preparedNodes.end());
}
+ decltype(Nodes) imports; // Collects one "import" node per entry of ctx.RequiredModules.
+ for (const auto& [alias, path]: ctx.RequiredModules) {
+ imports.push_back(Y("import", alias, BuildQuotedAtom(Pos, path)));
+ }
+ Nodes.insert(Nodes.begin(), std::make_move_iterator(imports.begin()), std::make_move_iterator(imports.end())); // Placed at the front so the imports precede every other node.
+
for (const auto& symbol: ctx.Exports) {
Add(Y("export", symbol));
}