aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorfedor-miron <fedor-miron@yandex-team.com>2023-08-02 20:24:34 +0300
committerfedor-miron <fedor-miron@yandex-team.com>2023-08-02 20:24:34 +0300
commit660860fd294c53ccec52cc83a742b0bb966c524b (patch)
tree1cd344b6cd7a5a7667d2c57bb37342f7e2141925
parent6e81914eac95d3ff33e3325a4aaacdc571165f88 (diff)
downloadydb-660860fd294c53ccec52cc83a742b0bb966c524b.tar.gz
YQL-9853: add batch list for yt
-rw-r--r--ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp62
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h6
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/ut/ya.make19
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp196
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/ya.make5
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp312
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp229
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h27
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_gateway.h66
-rw-r--r--ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp257
14 files changed, 848 insertions, 335 deletions
diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
index e5b2f266ce..008d4e50bd 100644
--- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
+++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp
@@ -529,6 +529,7 @@ public:
}
}
+ TVector<TFolderResult::TFolderItem> items;
TFolderResult res;
res.SetSuccess();
@@ -545,15 +546,70 @@ public:
}
item.Attributes = NYT::NodeToYsonString(attrs);
- res.Items.push_back(item);
+ items.push_back(std::move(item));
}
-
+ res.ItemsOrFileLink = std::move(items);
return MakeFuture(res);
} catch (const yexception& e) {
return MakeFuture(NCommon::ResultFromException<TFolderResult>(e, pos));
}
}
+ TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) final {
+ TBatchFolderResult res;
+ res.SetSuccess();
+ for (auto&& [item, reqAttrs] : options.Items()) {
+ if (item.Type != "link") {
+ res.Items.push_back(item);
+ }
+ else {
+ if (item.Attributes.HasKey("broken") || item.Attributes["broken"].AsBool()) {
+ continue;
+ }
+ const TStringBuf targetPath = item.Attributes["target_path"].AsString();
+ const auto folder = targetPath.RBefore('/');
+ const auto folderContent = GetFolder(TFolderOptions(options.SessionId())
+ .Attributes(reqAttrs)
+ .Cluster(options.Cluster())
+ .Prefix(TString(folder))
+ .Config(options.Config())
+ .Pos(options.Pos())).GetValue();
+
+ if (std::holds_alternative<TFileLinkPtr>(folderContent.ItemsOrFileLink)) {
+ continue;
+ }
+ for (const auto& item: std::get<TVector<TFolderResult::TFolderItem>>(folderContent.ItemsOrFileLink)) {
+ if (item.Path == targetPath) {
+ res.Items.push_back({item.Path, item.Type, NYT::NodeFromYsonString(item.Attributes)});
+ break;
+ }
+ }
+ }
+ }
+ return MakeFuture(res);
+ }
+
+ TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) final {
+ TBatchFolderResult res;
+ res.SetSuccess();
+ for (const auto& folder : options.Folders()) {
+ TFolderOptions folderOptions(options.SessionId());
+ folderOptions.Attributes(folder.AttrKeys)
+ .Cluster(options.Cluster())
+ .Prefix(folder.Prefix)
+ .Config(options.Config())
+ .Pos(options.Pos());
+ const auto folderContent = GetFolder(TFolderOptions(std::move(folderOptions))).GetValue();
+ if (std::holds_alternative<TFileLinkPtr>(folderContent.ItemsOrFileLink)) {
+ continue;
+ }
+ for (const auto& item: std::get<TVector<TFolderResult::TFolderItem>>(folderContent.ItemsOrFileLink)) {
+ res.Items.push_back({item.Path, item.Type, NYT::NodeFromYsonString(item.Attributes)});
+ }
+ }
+ return MakeFuture(res);
+ }
+
TFuture<TResOrPullResult> ResOrPull(const TExprNode::TPtr& node, TExprContext& ctx, TResOrPullOptions&& options) final {
TResOrPullResult res;
auto nodePos = ctx.GetPosition(node->Pos());
@@ -1426,4 +1482,4 @@ IYtGateway::TPtr CreateYtFileGateway(const NFile::TYtFileServices::TPtr& service
return new NFile::TYtFileGateway(services, emulateOutputForMultirunPtr);
}
-} // NYql
+} // NYql \ No newline at end of file
diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
index b99912ee1e..dfe2376d08 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
+++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
@@ -54,7 +54,11 @@ public:
TString DefaultTmpFolder;
THashMap<std::tuple<TString, TString, TString>, std::vector<NYT::TRichYPath>> RangeCache;
THashMap<TString, std::pair<std::vector<TString>, std::vector<std::exception_ptr>>> PartialRangeCache;
- THashMap<TString, std::variant<std::vector<std::tuple<TString, TString, TString>>, TFileLinkPtr>> FolderCache;
+
+ using TFolderCache = THashMap<TString, std::vector<std::tuple<TString, TString, NYT::TNode>>>;
+ TFolderCache FolderCache;
+
+ THashMap<TString, TFileLinkPtr> FolderFilePtrCache;
TMutex Lock_;
diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt
index b0b5b9bfdb..92bdf4cee5 100644
--- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt
@@ -65,6 +65,7 @@ target_sources(yt-gateway-native PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt
index 80b7c65cdc..f377da4581 100644
--- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt
@@ -66,6 +66,7 @@ target_sources(yt-gateway-native PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt
index 80b7c65cdc..f377da4581 100644
--- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt
@@ -66,6 +66,7 @@ target_sources(yt-gateway-native PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt
index b0b5b9bfdb..92bdf4cee5 100644
--- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt
@@ -65,6 +65,7 @@ target_sources(yt-gateway-native PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp
diff --git a/ydb/library/yql/providers/yt/gateway/native/ut/ya.make b/ydb/library/yql/providers/yt/gateway/native/ut/ya.make
new file mode 100644
index 0000000000..42f056a7f3
--- /dev/null
+++ b/ydb/library/yql/providers/yt/gateway/native/ut/ya.make
@@ -0,0 +1,19 @@
+GTEST()
+
+SRCS(
+ yql_yt_native_folders_ut.cpp
+)
+
+PEERDIR(
+ ydb/library/yql/providers/yt/gateway/native
+ ydb/library/yql/providers/yt/gateway/file
+ ydb/library/yql/core/ut_common
+ library/cpp/testing/mock_server
+ library/cpp/testing/common
+ ydb/library/yql/public/udf/service/terminate_policy
+ ydb/library/yql/sql/pg
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp b/ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
new file mode 100644
index 0000000000..d83173e2d4
--- /dev/null
+++ b/ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
@@ -0,0 +1,196 @@
+#include <library/cpp/yson/node/node_io.h>
+#include <ydb/library/yql/core/ut_common/yql_ut_common.h>
+#include <library/cpp/testing/common/network.h>
+#include <library/cpp/testing/mock_server/server.h>
+#include <library/cpp/testing/gtest/gtest.h>
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h>
+#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h>
+
+namespace NYql {
+
+namespace {
+
+constexpr auto CYPRES_TX_ID = "\"123321\"";
+constexpr auto CYPRES_NODE_A_CONTENT = R"(
+[
+ {
+ output = [
+ <
+ "user_attributes" = {};
+ "type" = "table";
+ > "a";
+ <
+ "user_attributes" = {};
+ "type" = "table";
+ > "b";
+ <
+ "user_attributes" = {};
+ "target_path" = "//link_dest";
+ "broken" = %false;
+ "type" = "link";
+ > "link";
+ <
+ "user_attributes" = {};
+ "target_path" = "//link_broken_dest";
+ "broken" = %true;
+ "type" = "link";
+ > "link_broken";
+ ];
+ };
+]
+)";
+constexpr auto CYPRES_LINK_DEST = R"(
+[
+ {
+ "output" = <
+ "user_attributes" = {};
+ "type" = "table";
+ > #;
+ };
+]
+)";
+TVector<IYtGateway::TFolderResult::TFolderItem> EXPECTED_ITEMS {
+ {"test/a/a", "table", R"({"user_attributes"={}})"},
+ {"test/a/b", "table", R"({"user_attributes"={}})"},
+ {"test/a/link", "table", R"({"user_attributes"={}})"}
+};
+
+TGatewaysConfig MakeGatewaysConfig(size_t port)
+{
+ TGatewaysConfig config {};
+ auto* clusters = config.MutableYt()->MutableClusterMapping();
+ NYql::TYtClusterConfig cluster;
+ cluster.SetName("ut_cluster");
+ cluster.SetYTName("ut_cluster");
+ cluster.SetCluster("localhost:" + ToString(port));
+ clusters->Add(std::move(cluster));
+ return config;
+}
+
+class TYtReplier : public TRequestReplier {
+public:
+ bool DoReply(const TReplyParams& params) override {
+ const TParsedHttpFull parsed(params.Input.FirstLine());
+ Cout << parsed.Path << Endl;
+
+ HttpCodes code = HTTP_NOT_FOUND;
+ TString content;
+ if (parsed.Path == "/api/v3/start_tx") {
+ content = CYPRES_TX_ID;
+ code = HTTP_OK;
+ }
+ else if (parsed.Path == "/api/v3/ping_tx") {
+ code = HTTP_OK;
+ }
+ else if (parsed.Path == "/api/v3/execute_batch") {
+ auto executeBatchRes = HandleExecuteBatch(params.Input);
+ executeBatchRes.OutTo(params.Output);
+ return true;
+ }
+ THttpResponse resp(code);
+ resp.SetContent(content);
+ resp.OutTo(params.Output);
+
+ return true;
+ }
+ explicit TYtReplier(THashMap<TString, THashSet<TString>> requiredAttributes): requiredAttributes(requiredAttributes) {}
+
+private:
+ void CheckReqAttributes(TStringBuf path, const NYT::TNode& attributes) {
+ THashSet<TString> attributesSet;
+ for (const auto& attribute : attributes.AsList()) {
+ attributesSet.insert(attribute.AsString());
+ }
+ EXPECT_EQ(requiredAttributes[path], attributesSet);
+ }
+
+ THttpResponse HandleListCommand(const NYT::TNode& path, const NYT::TNode& attributes) {
+ CheckReqAttributes(path.AsString(), attributes);
+
+ THttpResponse resp{HTTP_OK};
+ if (path == "//test/a") {
+ resp.SetContent(CYPRES_NODE_A_CONTENT);
+ return resp;
+ }
+ return THttpResponse{HTTP_NOT_FOUND};
+ }
+
+ THttpResponse HandleGetCommand(const NYT::TNode& path, const NYT::TNode& attributes) {
+ CheckReqAttributes(path.AsString(), attributes);
+
+ THttpResponse resp{HTTP_OK};
+ if (path == "//link_dest") {
+ resp.SetContent(CYPRES_LINK_DEST);
+ return resp;
+ }
+
+ return THttpResponse{HTTP_NOT_FOUND};
+ }
+
+ THttpResponse HandleExecuteBatch(THttpInput& input) {
+ auto requestBody = input.ReadAll();
+ auto requestBodyNode = NYT::NodeFromYsonString(requestBody);
+ if (!requestBodyNode.HasKey("requests")) {
+ return THttpResponse{HTTP_INTERNAL_SERVER_ERROR};
+ }
+ auto& requests = requestBodyNode["requests"];
+ if (!requests.IsList()) {
+ return THttpResponse{HTTP_INTERNAL_SERVER_ERROR};
+ }
+ for (auto& request : requests.AsList()) {
+ auto& command = request["command"];
+ auto& parameters = request["parameters"];
+ if (command == "list") {
+ return HandleListCommand(parameters["path"], parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{});
+ }
+ if (command == "get") {
+ return HandleGetCommand(parameters["path"], parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{});
+ }
+ }
+ return THttpResponse{HTTP_NOT_FOUND};
+ }
+
+ THashMap<TString, THashSet<TString>> requiredAttributes;
+};
+
+TEST(YtNativeGateway, GetFolder) {
+ const auto port = NTesting::GetFreePort();
+ THashMap<TString, THashSet<TString>> requiredAttributes {
+ {"//test/a", {"type", "broken", "target_path", "user_attributes"}},
+ {"//link_dest", {"type", "user_attributes"}}
+ };
+ NMock::TMockServer mockServer{port, [&requiredAttributes] () {return new TYtReplier(requiredAttributes);}};
+
+ TYtNativeServices nativeServices;
+ auto gatewaysConfig = MakeGatewaysConfig(port);
+ nativeServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt());
+ nativeServices.FileStorage = CreateFileStorage(TFileStorageConfig{});
+
+ auto ytGateway = CreateYtNativeGateway(nativeServices);
+ auto ytState = MakeIntrusive<TYtState>();
+ ytState->Gateway = ytGateway;
+
+ InitializeYtGateway(ytGateway, ytState);
+
+ IYtGateway::TFolderOptions folderOptions{ytState->SessionId};
+ TYtSettings ytSettings {};
+ folderOptions.Cluster("ut_cluster")
+ .Config(std::make_shared<TYtSettings>(ytSettings))
+ .Prefix("//test/a")
+ .Attributes({"user_attributes"});
+ auto folderFuture = ytGateway->GetFolder(std::move(folderOptions));
+
+ folderFuture.Wait();
+ ytState->Gateway->CloseSession({ytState->SessionId});
+ auto folderRes = folderFuture.GetValue();
+ ASSERT_EQ(folderRes.Success(), true) << folderRes.Issues().ToString();
+ ASSERT_EQ(
+ folderRes.ItemsOrFileLink,
+ (std::variant<TVector<IYtGateway::TFolderResult::TFolderItem>, TFileLinkPtr>(EXPECTED_ITEMS)));
+}
+
+} // namespace
+
+} // namespace NYql
diff --git a/ydb/library/yql/providers/yt/gateway/native/ya.make b/ydb/library/yql/providers/yt/gateway/native/ya.make
index 27ac2da5f5..daf8fc46d3 100644
--- a/ydb/library/yql/providers/yt/gateway/native/ya.make
+++ b/ydb/library/yql/providers/yt/gateway/native/ya.make
@@ -4,6 +4,7 @@ SRCS(
yql_yt_exec_ctx.cpp
yql_yt_lambda_builder.cpp
yql_yt_native.cpp
+ yql_yt_native_folders.cpp
yql_yt_op_tracker.cpp
yql_yt_qb2.cpp
yql_yt_session.cpp
@@ -63,3 +64,7 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 1c9b3b3355..67b4a04d86 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -5,6 +5,7 @@
#include "yql_yt_session.h"
#include "yql_yt_spec.h"
#include "yql_yt_transform.h"
+#include "yql_yt_native_folders.h"
#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
#include <ydb/library/yql/providers/yt/lib/config_clusters/config_clusters.h>
@@ -134,22 +135,6 @@ TString ToColumnList(const TVector<T>& list) {
return (builder << ']');
}
-TString GetType(const NYT::TNode& attr) {
- if (!attr.HasKey("type")) {
- return "unknown";
- }
-
- return attr["type"].AsString();
-}
-
-TString GetAttrType(const NYT::TNode& node) {
- if (!node.HasAttributes()) {
- return "unknown";
- }
-
- return GetType(node.GetAttributes());
-}
-
const NYT::TJobBinaryConfig GetJobBinary(const NYT::TRawMapOperationSpec& spec) {
return spec.MapperSpec_.GetJobBinary();
}
@@ -474,6 +459,88 @@ public:
YQL_CLOG(INFO, ProviderYt) << "Server=" << options.Cluster() << ", Prefix=" << options.Prefix();
+ try {
+ TSession::TPtr session = GetSession(options.SessionId());
+
+ auto batchOptions = TBatchFolderOptions(options.SessionId())
+ .Cluster(options.Cluster())
+ .Pos(options.Pos())
+ .Config(options.Config())
+ .Folders({{options.Prefix(), options.Attributes()}});
+ auto execCtx = MakeExecCtx(std::move(batchOptions), session, options.Cluster(), nullptr, nullptr);
+
+ if (auto filePtr = MaybeGetFilePtrFromCache(execCtx->GetOrCreateEntry(), execCtx->Options_.Folders().front())) {
+ TFolderResult res;
+ res.SetSuccess();
+ res.ItemsOrFileLink = *filePtr;
+ return MakeFuture(res);
+ }
+
+ auto getFolderFuture = session->Queue_->Async([execCtx] () {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ return ExecGetFolder(execCtx);
+ });
+ auto resolvedLinksFuture = getFolderFuture.Apply([options, this, session] (const TFuture<TBatchFolderResult>& f) {
+ TVector<IYtGateway::TResolveOptions::TItemWithReqAttrs> resolveItems;
+ auto res = f.GetValue();
+ for (auto&& item : res.Items) {
+ IYtGateway::TResolveOptions::TItemWithReqAttrs resolveItem {
+ .Item = item,
+ .AttrKeys = options.Attributes()
+ };
+ resolveItems.push_back(std::move(resolveItem));
+ }
+
+ auto resolveOptions = TResolveOptions(options.SessionId())
+ .Cluster(options.Cluster())
+ .Pos(options.Pos())
+ .Config(options.Config())
+ .Items(resolveItems);
+ auto execCtx = MakeExecCtx(std::move(resolveOptions), session, options.Cluster(), nullptr, nullptr);
+ return ExecResolveLinks(execCtx);
+ });
+
+ return resolvedLinksFuture.Apply([execCtx] (const TFuture<TBatchFolderResult>& f) {
+ const ui32 countLimit = execCtx->Options_.Config()->FolderInlineItemsLimit.Get().GetOrElse(100);
+ const ui64 sizeLimit = execCtx->Options_.Config()->FolderInlineDataLimit.Get().GetOrElse(100_KB);
+
+ TFolderResult res;
+ res.SetSuccess();
+
+ auto resolveRes = f.GetValue();
+ TVector<TFolderResult::TFolderItem> items;
+ for (auto& batchItem : resolveRes.Items) {
+ TFolderResult::TFolderItem item {
+ .Path = std::move(batchItem.Path),
+ .Type = std::move(batchItem.Type),
+ .Attributes = NYT::NodeToYsonString(batchItem.Attributes)
+ };
+ items.emplace_back(std::move(item));
+ }
+ if (items.size() > countLimit) {
+ res.ItemsOrFileLink = SaveItemsToTempFile(execCtx, items);
+ return res;
+ }
+ ui64 total_size = std::accumulate(items.begin(), items.end(), 0, [] (ui64 size, const TFolderResult::TFolderItem& i) {
+ return size + i.Type.length() + i.Path.length() + i.Attributes.length();
+ });
+ if (total_size > sizeLimit) {
+ res.ItemsOrFileLink = SaveItemsToTempFile(execCtx, items);
+ return res;
+ }
+ res.ItemsOrFileLink = std::move(items);
+ return res;
+ });
+ } catch (...) {
+ return MakeFuture(ResultFromCurrentException<TFolderResult>(options.Pos()));
+ }
+ }
+
+ TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) final {
+ YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+
+ YQL_CLOG(INFO, ProviderYt) << "Server=" << options.Cluster();
+
auto pos = options.Pos();
try {
TSession::TPtr session = GetSession(options.SessionId());
@@ -486,7 +553,29 @@ public:
return ExecGetFolder(execCtx);
});
} catch (...) {
- return MakeFuture(ResultFromCurrentException<TFolderResult>(pos));
+ return MakeFuture(ResultFromCurrentException<TBatchFolderResult>(pos));
+ }
+ }
+
+
+ TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) final {
+ YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__);
+
+ YQL_CLOG(INFO, ProviderYt) << "Server=" << options.Cluster();
+
+ auto pos = options.Pos();
+ try {
+ TSession::TPtr session = GetSession(options.SessionId());
+
+ auto cluster = options.Cluster();
+ auto execCtx = MakeExecCtx(std::move(options), session, cluster, nullptr, nullptr);
+
+ return session->Queue_->Async([execCtx] () {
+ YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_);
+ return ExecResolveLinks(execCtx);
+ });
+ } catch (...) {
+ return MakeFuture(ResultFromCurrentException<TBatchFolderResult>(pos));
}
}
@@ -1829,193 +1918,6 @@ private:
return rangeRes;
}
- static TFolderResult ExecGetFolder(const TExecContext<TFolderOptions>::TPtr& execCtx)
- {
- try {
- auto entry = execCtx->GetOrCreateEntry();
-
- TFolderResult res;
- res.SetSuccess();
-
- TString prefix = execCtx->Options_.Prefix();
- TString cacheKey = prefix;
-
- TSet<TString> uniqueAttrs;
- for (const auto& attr : execCtx->Options_.Attributes()) {
- uniqueAttrs.insert(attr);
- }
- // Make key with sorted attrs
- std::for_each(uniqueAttrs.cbegin(), uniqueAttrs.cend(), [&cacheKey](const auto& attr) { cacheKey.append('&').append(attr); } );
-
- with_lock(entry->Lock_) {
- if (auto p = entry->FolderCache.FindPtr(cacheKey)) {
- if (auto file = std::get_if<TFileLinkPtr>(p)) {
- res.File = *file;
- YQL_CLOG(INFO, ProviderYt) << "Found folder in cache with key ('" << cacheKey << "') with tmp file " << res.File->GetPath().GetPath();
- } else {
- const auto& list = std::get<std::vector<std::tuple<TString, TString, TString>>>(*p);
- YQL_CLOG(INFO, ProviderYt) << "Found folder in cache with key ('" << cacheKey << "') with " << list.size() << " items";
- for (auto& el: list) {
- TFolderResult::TFolderItem item;
- std::tie(item.Type, item.Path, item.Attributes) = el;
- res.Items.push_back(std::move(item));
- }
- }
- return res;
- }
- }
-
- if (!prefix.empty() && !entry->Tx->Exists(prefix)) {
- YQL_CLOG(INFO, ProviderYt) << "Storing empty folder in cache with key ('" << cacheKey << "')";
- with_lock(entry->Lock_) {
- entry->FolderCache[cacheKey] = std::vector<std::tuple<TString, TString, TString>>{};
- }
- return res;
- }
-
-
- TSet<TString> uniqueAttrsWithSystemAttrs = uniqueAttrs;
- uniqueAttrsWithSystemAttrs.insert("type");
- uniqueAttrsWithSystemAttrs.insert("target_path");
- uniqueAttrsWithSystemAttrs.insert("broken");
-
- auto makeAttrFilter = [](const TSet<TString>& attrs) {
- NYT::TAttributeFilter ret;
- for (const auto& attr : attrs) {
- ret.AddAttribute(attr);
- }
- return ret;
- };
-
- auto nodeList = entry->Tx->List(prefix, TListOptions().AttributeFilter(makeAttrFilter(uniqueAttrsWithSystemAttrs)));
- res.Items.reserve(nodeList.size());
-
- uniqueAttrsWithSystemAttrs.erase("target_path");
- uniqueAttrsWithSystemAttrs.erase("broken");
-
- if (prefix) {
- prefix.append('/');
- }
-
- THolder<TOFStream> out;
- ui32 count = 0;
- ui64 size = 0;
- const ui32 countLimit = execCtx->Options_.Config()->FolderInlineItemsLimit.Get().GetOrElse(100);
- const ui64 sizeLimit = execCtx->Options_.Config()->FolderInlineDataLimit.Get().GetOrElse(100_KB);
- TString file;
- Y_DEFER {
- if (file) {
- NFs::Remove(file);
- }
- };
-
- auto writeItem = [&uniqueAttrs, &out, &res, &count, &file,
- countLimit, &size, sizeLimit, execCtx](const NYT::TNode& node, TString path) {
- auto type = GetAttrType(node);
- if (path.StartsWith(NYT::TConfig::Get()->Prefix)) {
- path = path.substr(NYT::TConfig::Get()->Prefix.size());
- }
- NYT::TNode retAttrs = NYT::TNode::CreateMap();
- auto& nodeAttrs = node.GetAttributes();
- for (const auto& attrName : uniqueAttrs) {
- if (attrName && nodeAttrs.HasKey(attrName)) {
- retAttrs[attrName] = nodeAttrs[attrName];
- }
- }
- auto attrs = NYT::NodeToYsonString(retAttrs);
-
- if (!out) {
- ++count;
- size += type.length() + path.length() + attrs.length();
- if (count <= countLimit && size <= sizeLimit) {
- TFolderResult::TFolderItem item;
- item.Type = std::move(type);
- item.Path = std::move(path);
- item.Attributes = std::move(attrs);
- res.Items.push_back(std::move(item));
- return;
- }
- file = execCtx->FileStorage_->GetTemp() / GetGuidAsString(execCtx->Session_->RandomProvider_->GenGuid());
- out = MakeHolder<TOFStream>(file);
- for (auto& item: res.Items) {
- ::SaveMany(out.Get(), item.Type, item.Path, item.Attributes);
- }
- res.Items.clear();
- YQL_CLOG(INFO, ProviderYt) << "Folder limit exceeded. Writing items to file " << file;
- }
-
- ::SaveMany(out.Get(), type, path, attrs);
- };
-
- auto batchGet = entry->Tx->CreateBatchRequest();
- TVector<TFuture<void>> batchRes;
-
- for (const auto& node : nodeList) {
- auto path = prefix + node.AsString();
- auto type = GetAttrType(node);
- if (type != "link") {
- writeItem(node, path);
- }
- else if (!node.GetAttributes()["broken"].AsBool()) {
- auto targetPath = node.GetAttributes()["target_path"].AsString();
- batchRes.push_back(
- batchGet->Get(targetPath, TGetOptions().AttributeFilter(makeAttrFilter(uniqueAttrsWithSystemAttrs)))
- .Apply([path, writeItem](const NThreading::TFuture<NYT::TNode>& f) {
- try {
- writeItem(f.GetValue(), path);
- } catch (...) {
- writeItem(NYT::TNode::CreateMap(), path);
- }
- })
- );
- }
- }
-
- if (!batchRes.empty()) {
- batchGet->ExecuteBatch();
- WaitExceptionOrAll(batchRes).GetValue();
- }
- if (out) {
- ::SaveSize(out.Get(), 0);
- out.Destroy();
- res.File = CreateFakeFileLink(file, TString(), true);
- file = {};
- }
- YQL_CLOG(INFO, ProviderYt) << "Folder items count=" << count << ", size=" << size;
-
- if (res.File) {
- YQL_CLOG(INFO, ProviderYt) << "Storing folder tmp file " << res.File->GetPath().GetPath() << " in cache with key ('" << cacheKey << "')";
- with_lock(entry->Lock_) {
- entry->FolderCache[cacheKey] = res.File;
- }
- } else {
- YQL_CLOG(INFO, ProviderYt) << "Storing folder with " << res.Items.size() << " items in cache with key ('" << cacheKey << "')";
- std::vector<std::tuple<TString, TString, TString>> cache;
- for (const auto& item: res.Items) {
- cache.emplace_back(item.Type, item.Path, item.Attributes);
- }
- with_lock(entry->Lock_) {
- entry->FolderCache[cacheKey] = std::move(cache);
- }
- }
-
- return res;
-
- } catch (const NYT::TErrorResponse &e) {
- if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NRpc::NoSuchMethod)) {
- TFolderResult res;
- TString errMsg = execCtx->Options_.Prefix() + " is not a folder.";
- TIssue rootIssue = YqlIssue(execCtx->Options_.Pos(), TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER, errMsg);
- res.SetStatus(TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER);
- res.AddIssue(rootIssue);
- return res;
- }
- return ResultFromCurrentException<TFolderResult>(execCtx->Options_.Pos());
- } catch (...) {
- return ResultFromCurrentException<TFolderResult>(execCtx->Options_.Pos());
- }
- }
-
static TFuture<void> ExecPublish(
const TExecContext<TPublishOptions>::TPtr& execCtx,
const TVector<TString>& src,
@@ -4905,4 +4807,4 @@ IYtGateway::TPtr CreateYtNativeGateway(const TYtNativeServices& services) {
return MakeIntrusive<NNative::TYtNativeGateway>(services);
}
-} // NYql
+} // NYql \ No newline at end of file
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
new file mode 100644
index 0000000000..f668633115
--- /dev/null
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp
@@ -0,0 +1,229 @@
+#include "yql_yt_native_folders.h"
+
+#include <yt/cpp/mapreduce/interface/error_codes.h>
+#include <ydb/library/yql/utils/log/log.h>
+#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
+
+namespace NYql::NNative {
+
+using namespace NYT;
+using namespace NCommon;
+using namespace NKikimr;
+using namespace NKikimr::NMiniKQL;
+using namespace NNodes;
+using namespace NThreading;
+
+TString GetType(const NYT::TNode& attr) {
+ if (!attr.HasKey("type")) {
+ return "unknown";
+ }
+
+ return attr["type"].AsString();
+}
+
+TString GetAttrType(const NYT::TNode& node) {
+ if (!node.HasAttributes()) {
+ return "unknown";
+ }
+
+ return GetType(node.GetAttributes());
+}
+
+TMaybe<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> MaybeGetFolderFromCache(TTransactionCache::TEntry::TPtr entry, TStringBuf cacheKey) {
+ TVector<IYtGateway::TBatchFolderResult::TFolderItem> items;
+ with_lock(entry->Lock_) {
+ const auto listPtr = entry->FolderCache.FindPtr(cacheKey);
+ if (listPtr) {
+ YQL_CLOG(INFO, ProviderYt) << "Found folder in cache with key ('" << cacheKey << "') with " << listPtr->size() << " items";
+ for (auto& el : *listPtr) {
+ IYtGateway::TBatchFolderResult::TFolderItem item;
+ std::tie(item.Type, item.Path, item.Attributes) = el;
+ items.emplace_back(std::move(item));
+ }
+ return items;
+ }
+ }
+ return {};
+}
+
+TMaybe<TFileLinkPtr> MaybeGetFilePtrFromCache(TTransactionCache::TEntry::TPtr entry, const IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs& folder) {
+ const auto cacheKey = std::accumulate(folder.AttrKeys.begin(), folder.AttrKeys.end(), folder.Prefix,
+ [] (TString&& str, const TString& arg) {
+ return str + "&" + arg;
+ });
+ with_lock(entry->Lock_) {
+ const auto filePtr = entry->FolderFilePtrCache.FindPtr(cacheKey);
+ if (filePtr) {
+ return filePtr->Get();
+ }
+ }
+ return {};
+}
+
+TAttributeFilter MakeAttrFilter(const TSet<TString>& attributes, bool isResolvingLink) {
+ NYT::TAttributeFilter filter;
+ for (const auto& attr : attributes) {
+ filter.AddAttribute(attr);
+ }
+ if (!isResolvingLink) {
+ filter.AddAttribute("target_path");
+ filter.AddAttribute("broken");
+ }
+ filter.AddAttribute("type");
+ return filter;
+}
+
+IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& path) {
+ IYtGateway::TBatchFolderResult::TFolderItem item;
+ item.Attributes = NYT::TNode::CreateMap();
+ for (const auto& attr: node.GetAttributes().AsMap()) {
+ if (attr.first == "type") {
+ continue;
+ }
+ item.Attributes[attr.first] = attr.second;
+ }
+ item.Type = GetAttrType(node);
+ item.Path = path.StartsWith(NYT::TConfig::Get()->Prefix)
+ ? path.substr(NYT::TConfig::Get()->Prefix.size())
+ : path;
+ return item;
+}
+
+const TTransactionCache::TEntry::TFolderCache::value_type& StoreResInCache(const TTransactionCache::TEntry::TPtr& entry, TVector<IYtGateway::TBatchFolderResult::TFolderItem>&& items, const TString& cacheKey) {
+ std::vector<std::tuple<TString, TString, NYT::TNode>> cache;
+ for (const auto& item : items) {
+ cache.emplace_back(std::move(item.Type), std::move(item.Path), std::move(item.Attributes));
+ }
+ with_lock(entry->Lock_) {
+ const auto [it, _] = entry->FolderCache.insert_or_assign(cacheKey, std::move(cache));
+ return *it;
+ }
+}
+
+TFileLinkPtr SaveItemsToTempFile(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx, const TVector<IYtGateway::TFolderResult::TFolderItem>& folderItems) {
+ const TString file = execCtx->FileStorage_->GetTemp() / GetGuidAsString(execCtx->Session_->RandomProvider_->GenGuid());
+ YQL_CLOG(INFO, ProviderYt) << "Folder limit exceeded. Writing items to file " << file;
+
+ auto out = MakeHolder<TOFStream>(file);
+ for (auto& item: folderItems) {
+ ::SaveMany(out.Get(), item.Type, item.Path, item.Attributes);
+ }
+ ::SaveSize(out.Get(), 0);
+ out.Destroy();
+ return CreateFakeFileLink(file, "", true);
+}
+
+IYtGateway::TBatchFolderResult ExecResolveLinks(const TExecContext<IYtGateway::TResolveOptions>::TPtr& execCtx) {
+ try {
+ auto batchGet = execCtx->GetEntry()->Tx->CreateBatchRequest();
+ TVector<TFuture<IYtGateway::TBatchFolderResult::TFolderItem>> batchRes;
+
+ for (const auto& [item, reqAttributes]: execCtx->Options_.Items()) {
+ if (item.Type != "link") {
+ batchRes.push_back(MakeFuture<IYtGateway::TBatchFolderResult::TFolderItem>(std::move(item)));
+ continue;
+ }
+ if (item.Attributes["broken"].AsBool()) {
+ continue;
+ }
+ const auto& targetPath = item.Attributes["target_path"].AsString();
+ const auto& path = item.Path;
+ const auto attrFilter = MakeAttrFilter(reqAttributes, /* isResolvingLink */ true);
+
+ batchRes.push_back(
+ batchGet->Get(targetPath, TGetOptions().AttributeFilter(attrFilter))
+ .Apply([path] (const auto& f) {
+ const auto linkNode = f.GetValue();
+ return MakeFolderItem(linkNode, path);
+ })
+ );
+ }
+ if (batchRes.empty()) {
+ return {};
+ }
+ IYtGateway::TBatchFolderResult res;
+ res.SetSuccess();
+ res.Items.reserve(batchRes.size());
+
+ batchGet->ExecuteBatch();
+ WaitAll(batchRes).Wait();
+ for (auto& f : batchRes) {
+ res.Items.emplace_back(f.ExtractValue());
+ }
+ return res;
+ }
+ catch (...) {
+ return ResultFromCurrentException<IYtGateway::TBatchFolderResult>(execCtx->Options_.Pos());
+ }
+}
+
+IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx) {
+ const auto entry = execCtx->GetOrCreateEntry();
+ auto batchList = entry->Tx->CreateBatchRequest();
+ TVector<TFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>> batchRes;
+
+ IYtGateway::TBatchFolderResult folderResult;
+ folderResult.SetSuccess();
+
+ for (const auto& folder : execCtx->Options_.Folders()) {
+ const auto cacheKey = std::accumulate(folder.AttrKeys.begin(), folder.AttrKeys.end(), folder.Prefix,
+ [] (TString&& str, const TString& arg) {
+ return str + "&" + arg;
+ });
+
+ auto maybeCached = MaybeGetFolderFromCache(entry, cacheKey);
+ if (maybeCached) {
+ batchRes.push_back(MakeFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>(std::move(*maybeCached)));
+ continue;
+ }
+
+ const auto attrFilter = MakeAttrFilter(folder.AttrKeys, /* isResolvingLink */ false);
+ batchRes.push_back(
+ batchList->List(folder.Prefix, TListOptions().AttributeFilter(attrFilter))
+ .Apply([&folder, cacheKey = std::move(cacheKey), &entry] (const TFuture<NYT::TNode::TListType>& f)
+ -> TFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> {
+ TVector<IYtGateway::TBatchFolderResult::TFolderItem> folderItems;
+ try {
+ auto nodeList = f.GetValue();
+ folderItems.reserve(nodeList.size());
+ for (const auto& node : nodeList) {
+ TStringBuilder path;
+ path << folder.Prefix << "/" << node.AsString();
+ folderItems.push_back(MakeFolderItem(node, path));
+ }
+ StoreResInCache(entry, std::move(folderItems), cacheKey);
+ return MakeFuture(std::move(folderItems));
+ }
+ catch (const NYT::TErrorResponse& e) {
+ if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NYTree::ResolveError)) {
+ // Return empty list on missing path
+ YQL_CLOG(INFO, ProviderYt) << "Storing empty folder in cache with key ('" << cacheKey << "')";
+ StoreResInCache(entry, {}, cacheKey);
+ return MakeFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>({});
+ }
+ return MakeErrorFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>(std::current_exception());
+ }
+ catch (...) {
+ return MakeErrorFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>(std::current_exception());
+ }
+ })
+ );
+ }
+ batchList->ExecuteBatch();
+ try {
+ WaitExceptionOrAll(batchRes).Wait();
+ for (auto& res : batchRes) {
+ const auto items = res.ExtractValue();
+ folderResult.Items.reserve(folderResult.Items.size() + items.size());
+ for (const auto& item : items) {
+ folderResult.Items.push_back(std::move(item));
+ }
+ }
+ }
+ catch (...) {
+ return ResultFromCurrentException<IYtGateway::TBatchFolderResult>(execCtx->Options_.Pos());
+ }
+ return folderResult;
+}
+
+} // NYql::NNative
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h
new file mode 100644
index 0000000000..645fc2a4b2
--- /dev/null
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h
@@ -0,0 +1,27 @@
+#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h>
+#include <ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h>
+#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h>
+
+namespace NYql::NNative {
+
+TString GetType(const NYT::TNode& attr);
+
+TString GetAttrType(const NYT::TNode& node);
+
+TMaybe<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> MaybeGetFolderFromCache(TTransactionCache::TEntry::TPtr entry, TStringBuf cacheKey);
+
+TMaybe<TFileLinkPtr> MaybeGetFilePtrFromCache(TTransactionCache::TEntry::TPtr entry, const IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs& folder);
+
+NYT::TAttributeFilter MakeAttrFilter(const TSet<TString>& attributes, bool isResolvingLink);
+
+IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& path);
+
+const TTransactionCache::TEntry::TFolderCache::value_type& StoreResInCache(const TTransactionCache::TEntry::TPtr& entry, TVector<IYtGateway::TBatchFolderResult::TFolderItem>&& items, const TString& cacheKey);
+
+TFileLinkPtr SaveItemsToTempFile(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx, const TVector<IYtGateway::TFolderResult::TFolderItem>& folderItems);
+
+IYtGateway::TBatchFolderResult ExecResolveLinks(const TExecContext<IYtGateway::TResolveOptions>::TPtr& execCtx);
+
+IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx);
+
+} // NYql::NNative
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
index 18fabc03f2..b01ee60a31 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h
@@ -263,19 +263,73 @@ public:
OPTION_FIELD(TString, Cluster)
OPTION_FIELD(TString, Prefix)
- OPTION_FIELD(TVector<TString>, Attributes)
+ OPTION_FIELD(TSet<TString>, Attributes)
OPTION_FIELD(TYtSettings::TConstPtr, Config)
OPTION_FIELD(TPosition, Pos)
};
+ struct TBatchFolderOptions : public TCommonOptions {
+ using TSelf = TBatchFolderOptions;
+
+ TBatchFolderOptions(const TString& sessionId)
+ : TCommonOptions(sessionId)
+ {
+ }
+
+ struct TFolderPrefixAttrs {
+ TString Prefix;
+ TSet<TString> AttrKeys;
+ };
+
+ OPTION_FIELD(TString, Cluster)
+ OPTION_FIELD(TVector<TFolderPrefixAttrs>, Folders)
+ OPTION_FIELD(TYtSettings::TConstPtr, Config)
+ OPTION_FIELD(TPosition, Pos)
+ };
+
+ struct TBatchFolderResult : public NCommon::TOperationResult {
+ struct TFolderItem {
+ TString Path;
+ TString Type;
+ NYT::TNode Attributes;
+
+ };
+ TVector<TFolderItem> Items;
+ };
+
+ struct TSerializedFolderItem {
+ };
+
struct TFolderResult : public NCommon::TOperationResult {
struct TFolderItem {
TString Path;
TString Type;
TString Attributes;
+
+ auto operator<=>(const TFolderItem&) const = default;
};
- TVector<TFolderItem> Items;
- TFileLinkPtr File;
+ std::variant<TVector<TFolderItem>, TFileLinkPtr> ItemsOrFileLink;
+ };
+
+ //////////////////////////////////////////////////////////////
+
+ struct TResolveOptions : public TCommonOptions {
+ using TSelf = TResolveOptions;
+
+ TResolveOptions(const TString& sessionId)
+ : TCommonOptions(sessionId)
+ {
+ }
+
+ struct TItemWithReqAttrs {
+ TBatchFolderResult::TFolderItem Item;
+ TSet<TString> AttrKeys;
+ };
+
+ OPTION_FIELD(TString, Cluster)
+ OPTION_FIELD(TVector<TItemWithReqAttrs>, Items)
+ OPTION_FIELD(TYtSettings::TConstPtr, Config)
+ OPTION_FIELD(TPosition, Pos)
};
//////////////////////////////////////////////////////////////
@@ -522,6 +576,10 @@ public:
virtual NThreading::TFuture<TFolderResult> GetFolder(TFolderOptions&& options) = 0;
+ virtual NThreading::TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) = 0;
+
+ virtual NThreading::TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) = 0;
+
virtual NThreading::TFuture<TResOrPullResult> ResOrPull(const TExprNode::TPtr& node, TExprContext& ctx, TResOrPullOptions&& options) = 0;
virtual NThreading::TFuture<TRunResult> Run(const TExprNode::TPtr& node, TExprContext& ctx, TRunOptions&& options) = 0;
@@ -556,4 +614,4 @@ public:
virtual TGetTablePartitionsResult GetTablePartitions(TGetTablePartitionsOptions&& options) = 0;
};
-}
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
index a83adf3349..08ce8f059e 100644
--- a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
+++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp
@@ -17,6 +17,7 @@
#include <ydb/library/yql/core/issue/protos/issue_id.pb.h>
#include <ydb/library/yql/core/issue/yql_issue.h>
+#include <library/cpp/yson/node/node_io.h>
#include <library/cpp/threading/future/future.h>
#include <util/generic/vector.h>
@@ -323,7 +324,7 @@ public:
IYtGateway::TFolderOptions(State_->SessionId)
.Cluster(cluster)
.Prefix(folder.Prefix)
- .Attributes(folder.Attributes)
+ .Attributes(TSet<TString>(folder.Attributes.begin(), folder.Attributes.end()))
.Config(State_->Configuration->Snapshot())
.Pos(x.second.first)
);
@@ -372,147 +373,50 @@ public:
}
if (keys.GetType() == TYtKey::EType::Folder) {
- auto p = PendingFolders_.FindPtr(std::make_pair(cluster, *keys.GetKeys().front().GetFolder()));
- YQL_ENSURE(p);
- auto& res = p->second.GetValue();
- res.ReportIssues(ctx.IssueManager);
- if (!res.Success()) {
- return {};
+ const auto res = FetchFolderResult(ctx, cluster, *keys.GetKeys().front().GetFolder());
+ if (!res) {
+ return node;
}
-
- if (res.File) {
+ if (auto file = std::get_if<TFileLinkPtr>(&res->ItemsOrFileLink)) {
TString alias;
- if (auto p = FolderFileToAlias_.FindPtr(res.File->GetPath().GetPath())) {
+ if (auto p = FolderFileToAlias_.FindPtr(file->Get()->GetPath().GetPath())) {
alias = *p;
} else {
alias = TString("_yql_folder").append(ToString(FolderFileToAlias_.size()));
- FolderFileToAlias_.emplace(res.File->GetPath().GetPath(), alias);
+ FolderFileToAlias_.emplace(file->Get()->GetPath().GetPath(), alias);
TUserDataBlock tmpBlock;
tmpBlock.Type = EUserDataType::PATH;
- tmpBlock.Data = res.File->GetPath().GetPath();
+ tmpBlock.Data = file->Get()->GetPath().GetPath();
tmpBlock.Usage.Set(EUserDataBlockUsage::Path);
- tmpBlock.FrozenFile = res.File;
+ tmpBlock.FrozenFile = file->Get();
State_->Types->UserDataStorage->AddUserDataBlock(alias, tmpBlock);
}
- return ctx.Builder(node->Pos())
- .Callable("Cons!")
- .World(0)
- .Callable(1, "AssumeColumnOrder")
- .Callable(0, "Collect")
- .Callable(0, "Apply")
- .Callable(0, "Udf")
- .Atom(0, "File.FolderListFromFile")
- .Seal()
- .Callable(1, "FilePath")
- .Atom(0, alias)
- .Seal()
- .Seal()
+ auto folderListFromFile = ctx.Builder(node->Pos())
+ .Callable("Collect")
+ .Callable(0, "Apply")
+ .Callable(0, "Udf")
+ .Atom(0, "File.FolderListFromFile")
.Seal()
- .List(1)
- .Atom(0, "Path")
- .Atom(1, "Type")
- .Atom(2, "Attributes")
+ .Callable(1, "FilePath")
+ .Atom(0, alias)
.Seal()
.Seal()
.Seal()
.Build();
+
+ return BuildFolderTableResExpr(ctx, node->Pos(), read.World(), folderListFromFile).Ptr();
}
+ auto items = std::get<TVector<IYtGateway::TFolderResult::TFolderItem>>(res->ItemsOrFileLink);
TVector<TExprBase> listItems;
- for (auto& item: res.Items) {
- listItems.push_back(Build<TCoAsStruct>(ctx, node->Pos())
- .Add()
- .Add<TCoAtom>()
- .Value("Path")
- .Build()
- .Add<TCoString>()
- .Literal()
- .Value(item.Path)
- .Build()
- .Build()
- .Build()
- .Add()
- .Add<TCoAtom>()
- .Value("Type")
- .Build()
- .Add<TCoString>()
- .Literal()
- .Value(item.Type)
- .Build()
- .Build()
- .Build()
- .Add()
- .Add<TCoAtom>()
- .Value("Attributes")
- .Build()
- .Add<TCoYson>()
- .Literal()
- .Value(item.Attributes)
- .Build()
- .Build()
- .Build()
- .Done()
- );
- }
-
- return Build<TCoCons>(ctx, node->Pos())
- .World(read.World())
- .Input<TCoAssumeColumnOrder>()
- .Input<TCoList>()
- .ListType<TCoListType>()
- .ItemType<TCoStructType>()
- .Add<TExprList>()
- .Add<TCoAtom>()
- .Value("Path")
- .Build()
- .Add<TCoDataType>()
- .Type()
- .Value("String")
- .Build()
- .Build()
- .Build()
- .Add<TExprList>()
- .Add<TCoAtom>()
- .Value("Type")
- .Build()
- .Add<TCoDataType>()
- .Type()
- .Value("String")
- .Build()
- .Build()
- .Build()
- .Add<TExprList>()
- .Add<TCoAtom>()
- .Value("Attributes")
- .Build()
- .Add<TCoDataType>()
- .Type()
- .Value("Yson")
- .Build()
- .Build()
- .Build()
- .Build()
- .Build()
- .FreeArgs()
- .Add(listItems)
- .Build()
- .Build()
- .ColumnOrder<TCoAtomList>()
- .Add()
- .Value("Path")
- .Build()
- .Add()
- .Value("Type")
- .Build()
- .Add()
- .Value("Attributes")
- .Build()
- .Build()
- .Build()
- .Done().Ptr();
+ for (auto& item: items) {
+ listItems.push_back(BuildFolderListItemExpr(ctx, node->Pos(), item));
+ }
+
+ return BuildFolderTableResExpr(ctx, node->Pos(), read.World(), BuildFolderListExpr(ctx, node->Pos(), listItems).Ptr()).Ptr();
}
if (keys.GetType() != TYtKey::EType::Table) {
@@ -756,6 +660,115 @@ private:
return res;
}
+ TExprBase BuildFolderListItemExpr(TExprContext& ctx, NYql::TPositionHandle pos, const IYtGateway::TFolderResult::TFolderItem& folderItem) {
+ return Build<TCoAsStruct>(ctx, pos)
+ .Add()
+ .Add<TCoAtom>()
+ .Value("Path")
+ .Build()
+ .Add<TCoString>()
+ .Literal()
+ .Value(folderItem.Path)
+ .Build()
+ .Build()
+ .Build()
+ .Add()
+ .Add<TCoAtom>()
+ .Value("Type")
+ .Build()
+ .Add<TCoString>()
+ .Literal()
+ .Value(folderItem.Type)
+ .Build()
+ .Build()
+ .Build()
+ .Add()
+ .Add<TCoAtom>()
+ .Value("Attributes")
+ .Build()
+ .Add<TCoYson>()
+ .Literal()
+ .Value(folderItem.Attributes)
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+ }
+
+ TCoList BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TVector<TExprBase>& folderItems) {
+ return Build<TCoList>(ctx, pos)
+ .ListType<TCoListType>()
+ .ItemType<TCoStructType>()
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("Path")
+ .Build()
+ .Add<TCoDataType>()
+ .Type()
+ .Value("String")
+ .Build()
+ .Build()
+ .Build()
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("Type")
+ .Build()
+ .Add<TCoDataType>()
+ .Type()
+ .Value("String")
+ .Build()
+ .Build()
+ .Build()
+ .Add<TExprList>()
+ .Add<TCoAtom>()
+ .Value("Attributes")
+ .Build()
+ .Add<TCoDataType>()
+ .Type()
+ .Value("Yson")
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .Build()
+ .FreeArgs()
+ .Add(folderItems)
+ .Build()
+ .Build()
+ .Value();
+ }
+
+ TCoCons BuildFolderTableResExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TExprBase& world, const TExprNodePtr& folderList) {
+ return Build<TCoCons>(ctx, pos)
+ .World(world)
+ .Input<TCoAssumeColumnOrder>()
+ .Input(folderList)
+ .ColumnOrder<TCoAtomList>()
+ .Add()
+ .Value("Path")
+ .Build()
+ .Add()
+ .Value("Type")
+ .Build()
+ .Add()
+ .Value("Attributes")
+ .Build()
+ .Build()
+ .Build()
+ .Done();
+ }
+
+ TMaybe<NYql::IYtGateway::TFolderResult> FetchFolderResult(TExprContext& ctx, const TString& cluster, const TYtKey::TFolderList& folder) {
+ auto p = PendingFolders_.FindPtr(std::make_pair(cluster, folder));
+ YQL_ENSURE(p);
+ auto res = p->second.GetValue();
+ res.ReportIssues(ctx.IssueManager);
+ if (!res.Success()) {
+ return {};
+ }
+ return res;
+ }
+
private:
TYtState::TPtr State_;
@@ -772,4 +785,4 @@ THolder<IGraphTransformer> CreateYtIODiscoveryTransformer(TYtState::TPtr state)
return THolder(new TYtIODiscoveryTransformer(state));
}
-}
+} \ No newline at end of file