diff options
author | fedor-miron <fedor-miron@yandex-team.com> | 2023-08-02 20:24:34 +0300 |
---|---|---|
committer | fedor-miron <fedor-miron@yandex-team.com> | 2023-08-02 20:24:34 +0300 |
commit | 660860fd294c53ccec52cc83a742b0bb966c524b (patch) | |
tree | 1cd344b6cd7a5a7667d2c57bb37342f7e2141925 | |
parent | 6e81914eac95d3ff33e3325a4aaacdc571165f88 (diff) | |
download | ydb-660860fd294c53ccec52cc83a742b0bb966c524b.tar.gz |
YQL-9853: add batch list for yt
14 files changed, 848 insertions, 335 deletions
diff --git a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp index e5b2f266ce..008d4e50bd 100644 --- a/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp +++ b/ydb/library/yql/providers/yt/gateway/file/yql_yt_file.cpp @@ -529,6 +529,7 @@ public: } } + TVector<TFolderResult::TFolderItem> items; TFolderResult res; res.SetSuccess(); @@ -545,15 +546,70 @@ public: } item.Attributes = NYT::NodeToYsonString(attrs); - res.Items.push_back(item); + items.push_back(std::move(item)); } - + res.ItemsOrFileLink = std::move(items); return MakeFuture(res); } catch (const yexception& e) { return MakeFuture(NCommon::ResultFromException<TFolderResult>(e, pos)); } } + TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) final { + TBatchFolderResult res; + res.SetSuccess(); + for (auto&& [item, reqAttrs] : options.Items()) { + if (item.Type != "link") { + res.Items.push_back(item); + } + else { + if (item.Attributes.HasKey("broken") || item.Attributes["broken"].AsBool()) { + continue; + } + const TStringBuf targetPath = item.Attributes["target_path"].AsString(); + const auto folder = targetPath.RBefore('/'); + const auto folderContent = GetFolder(TFolderOptions(options.SessionId()) + .Attributes(reqAttrs) + .Cluster(options.Cluster()) + .Prefix(TString(folder)) + .Config(options.Config()) + .Pos(options.Pos())).GetValue(); + + if (std::holds_alternative<TFileLinkPtr>(folderContent.ItemsOrFileLink)) { + continue; + } + for (const auto& item: std::get<TVector<TFolderResult::TFolderItem>>(folderContent.ItemsOrFileLink)) { + if (item.Path == targetPath) { + res.Items.push_back({item.Path, item.Type, NYT::NodeFromYsonString(item.Attributes)}); + break; + } + } + } + } + return MakeFuture(res); + } + + TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) final { + TBatchFolderResult res; + res.SetSuccess(); + for (const auto& folder : options.Folders()) { + TFolderOptions folderOptions(options.SessionId()); + folderOptions.Attributes(folder.AttrKeys) + .Cluster(options.Cluster()) + .Prefix(folder.Prefix) + .Config(options.Config()) + .Pos(options.Pos()); + const auto folderContent = GetFolder(TFolderOptions(std::move(folderOptions))).GetValue(); + if (std::holds_alternative<TFileLinkPtr>(folderContent.ItemsOrFileLink)) { + continue; + } + for (const auto& item: std::get<TVector<TFolderResult::TFolderItem>>(folderContent.ItemsOrFileLink)) { + res.Items.push_back({item.Path, item.Type, NYT::NodeFromYsonString(item.Attributes)}); + } + } + return MakeFuture(res); + } + TFuture<TResOrPullResult> ResOrPull(const TExprNode::TPtr& node, TExprContext& ctx, TResOrPullOptions&& options) final { TResOrPullResult res; auto nodePos = ctx.GetPosition(node->Pos()); @@ -1426,4 +1482,4 @@ IYtGateway::TPtr CreateYtFileGateway(const NFile::TYtFileServices::TPtr& service return new NFile::TYtFileGateway(services, emulateOutputForMultirunPtr); } -} // NYql +} // NYql
\ No newline at end of file diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h index b99912ee1e..dfe2376d08 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h @@ -54,7 +54,11 @@ public: TString DefaultTmpFolder; THashMap<std::tuple<TString, TString, TString>, std::vector<NYT::TRichYPath>> RangeCache; THashMap<TString, std::pair<std::vector<TString>, std::vector<std::exception_ptr>>> PartialRangeCache; - THashMap<TString, std::variant<std::vector<std::tuple<TString, TString, TString>>, TFileLinkPtr>> FolderCache; + + using TFolderCache = THashMap<TString, std::vector<std::tuple<TString, TString, NYT::TNode>>>; + TFolderCache FolderCache; + + THashMap<TString, TFileLinkPtr> FolderFilePtrCache; TMutex Lock_; diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt index b0b5b9bfdb..92bdf4cee5 100644 --- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.darwin-x86_64.txt @@ -65,6 +65,7 @@ target_sources(yt-gateway-native PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt index 80b7c65cdc..f377da4581 100644 --- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-aarch64.txt @@ -66,6 +66,7 @@ target_sources(yt-gateway-native PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt index 80b7c65cdc..f377da4581 100644 --- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.linux-x86_64.txt @@ -66,6 +66,7 @@ target_sources(yt-gateway-native PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp diff --git a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt index b0b5b9bfdb..92bdf4cee5 100644 --- a/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/providers/yt/gateway/native/CMakeLists.windows-x86_64.txt @@ -65,6 +65,7 @@ target_sources(yt-gateway-native PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_lambda_builder.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_op_tracker.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_qb2.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/yt/gateway/native/yql_yt_session.cpp diff --git a/ydb/library/yql/providers/yt/gateway/native/ut/ya.make b/ydb/library/yql/providers/yt/gateway/native/ut/ya.make new file mode 100644 index 0000000000..42f056a7f3 --- /dev/null +++ b/ydb/library/yql/providers/yt/gateway/native/ut/ya.make @@ -0,0 +1,19 @@ +GTEST() + +SRCS( + yql_yt_native_folders_ut.cpp +) + +PEERDIR( + ydb/library/yql/providers/yt/gateway/native + ydb/library/yql/providers/yt/gateway/file + ydb/library/yql/core/ut_common + library/cpp/testing/mock_server + library/cpp/testing/common + ydb/library/yql/public/udf/service/terminate_policy + ydb/library/yql/sql/pg +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp b/ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp new file mode 100644 index 0000000000..d83173e2d4 --- /dev/null +++ b/ydb/library/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp @@ -0,0 +1,196 @@ +#include <library/cpp/yson/node/node_io.h> +#include <ydb/library/yql/core/ut_common/yql_ut_common.h> +#include <library/cpp/testing/common/network.h> +#include <library/cpp/testing/mock_server/server.h> +#include <library/cpp/testing/gtest/gtest.h> +#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_native.h> +#include <ydb/library/yql/core/file_storage/proto/file_storage.pb.h> +#include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_provider.h> + +namespace NYql { + +namespace { + +constexpr auto CYPRES_TX_ID = "\"123321\""; +constexpr auto CYPRES_NODE_A_CONTENT = R"( +[ + { + output = [ + < + "user_attributes" = {}; + "type" = "table"; + > "a"; + < + "user_attributes" = {}; + "type" = "table"; + > "b"; + < + "user_attributes" = {}; + "target_path" = "//link_dest"; + "broken" = %false; + "type" = "link"; + > "link"; + < + "user_attributes" = {}; + "target_path" = "//link_broken_dest"; + "broken" = %true; + "type" = "link"; + > "link_broken"; + ]; + }; +] +)"; +constexpr auto CYPRES_LINK_DEST = R"( +[ + { + "output" = < + "user_attributes" = {}; + "type" = "table"; + > #; + }; +] +)"; +TVector<IYtGateway::TFolderResult::TFolderItem> EXPECTED_ITEMS { + {"test/a/a", "table", R"({"user_attributes"={}})"}, + {"test/a/b", "table", R"({"user_attributes"={}})"}, + {"test/a/link", "table", R"({"user_attributes"={}})"} +}; + +TGatewaysConfig MakeGatewaysConfig(size_t port) +{ + TGatewaysConfig config {}; + auto* clusters = config.MutableYt()->MutableClusterMapping(); + NYql::TYtClusterConfig cluster; + cluster.SetName("ut_cluster"); + cluster.SetYTName("ut_cluster"); + cluster.SetCluster("localhost:" + ToString(port)); + clusters->Add(std::move(cluster)); + return config; +} + +class TYtReplier : public TRequestReplier { +public: + bool DoReply(const TReplyParams& params) override { + const TParsedHttpFull parsed(params.Input.FirstLine()); + Cout << parsed.Path << Endl; + + HttpCodes code = HTTP_NOT_FOUND; + TString content; + if (parsed.Path == "/api/v3/start_tx") { + content = CYPRES_TX_ID; + code = HTTP_OK; + } + else if (parsed.Path == "/api/v3/ping_tx") { + code = HTTP_OK; + } + else if (parsed.Path == "/api/v3/execute_batch") { + auto executeBatchRes = HandleExecuteBatch(params.Input); + executeBatchRes.OutTo(params.Output); + return true; + } + THttpResponse resp(code); + resp.SetContent(content); + resp.OutTo(params.Output); + + return true; + } + explicit TYtReplier(THashMap<TString, THashSet<TString>> requiredAttributes): requiredAttributes(requiredAttributes) {} + +private: + void CheckReqAttributes(TStringBuf path, const NYT::TNode& attributes) { + THashSet<TString> attributesSet; + for (const auto& attribute : attributes.AsList()) { + attributesSet.insert(attribute.AsString()); + } + EXPECT_EQ(requiredAttributes[path], attributesSet); + } + + THttpResponse HandleListCommand(const NYT::TNode& path, const NYT::TNode& attributes) { + CheckReqAttributes(path.AsString(), attributes); + + THttpResponse resp{HTTP_OK}; + if (path == "//test/a") { + resp.SetContent(CYPRES_NODE_A_CONTENT); + return resp; + } + return THttpResponse{HTTP_NOT_FOUND}; + } + + THttpResponse HandleGetCommand(const NYT::TNode& path, const NYT::TNode& attributes) { + CheckReqAttributes(path.AsString(), attributes); + + THttpResponse resp{HTTP_OK}; + if (path == "//link_dest") { + resp.SetContent(CYPRES_LINK_DEST); + return resp; + } + + return THttpResponse{HTTP_NOT_FOUND}; + } + + THttpResponse HandleExecuteBatch(THttpInput& input) { + auto requestBody = input.ReadAll(); + auto requestBodyNode = NYT::NodeFromYsonString(requestBody); + if (!requestBodyNode.HasKey("requests")) { + return THttpResponse{HTTP_INTERNAL_SERVER_ERROR}; + } + auto& requests = requestBodyNode["requests"]; + if (!requests.IsList()) { + return THttpResponse{HTTP_INTERNAL_SERVER_ERROR}; + } + for (auto& request : requests.AsList()) { + auto& command = request["command"]; + auto& parameters = request["parameters"]; + if (command == "list") { + return HandleListCommand(parameters["path"], parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{}); + } + if (command == "get") { + return HandleGetCommand(parameters["path"], parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{}); + } + } + return THttpResponse{HTTP_NOT_FOUND}; + } + + THashMap<TString, THashSet<TString>> requiredAttributes; +}; + +TEST(YtNativeGateway, GetFolder) { + const auto port = NTesting::GetFreePort(); + THashMap<TString, THashSet<TString>> requiredAttributes { + {"//test/a", {"type", "broken", "target_path", "user_attributes"}}, + {"//link_dest", {"type", "user_attributes"}} + }; + NMock::TMockServer mockServer{port, [&requiredAttributes] () {return new TYtReplier(requiredAttributes);}}; + + TYtNativeServices nativeServices; + auto gatewaysConfig = MakeGatewaysConfig(port); + nativeServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt()); + nativeServices.FileStorage = CreateFileStorage(TFileStorageConfig{}); + + auto ytGateway = CreateYtNativeGateway(nativeServices); + auto ytState = MakeIntrusive<TYtState>(); + ytState->Gateway = ytGateway; + + InitializeYtGateway(ytGateway, ytState); + + IYtGateway::TFolderOptions folderOptions{ytState->SessionId}; + TYtSettings ytSettings {}; + folderOptions.Cluster("ut_cluster") + .Config(std::make_shared<TYtSettings>(ytSettings)) + .Prefix("//test/a") + .Attributes({"user_attributes"}); + auto folderFuture = ytGateway->GetFolder(std::move(folderOptions)); + + folderFuture.Wait(); + ytState->Gateway->CloseSession({ytState->SessionId}); + auto folderRes = folderFuture.GetValue(); + ASSERT_EQ(folderRes.Success(), true) << folderRes.Issues().ToString(); + ASSERT_EQ( + folderRes.ItemsOrFileLink, + (std::variant<TVector<IYtGateway::TFolderResult::TFolderItem>, TFileLinkPtr>(EXPECTED_ITEMS))); +} + +} // namespace + +} // namespace NYql diff --git a/ydb/library/yql/providers/yt/gateway/native/ya.make b/ydb/library/yql/providers/yt/gateway/native/ya.make index 27ac2da5f5..daf8fc46d3 100644 --- a/ydb/library/yql/providers/yt/gateway/native/ya.make +++ b/ydb/library/yql/providers/yt/gateway/native/ya.make @@ -4,6 +4,7 @@ SRCS( yql_yt_exec_ctx.cpp yql_yt_lambda_builder.cpp yql_yt_native.cpp + yql_yt_native_folders.cpp yql_yt_op_tracker.cpp yql_yt_qb2.cpp yql_yt_session.cpp @@ -63,3 +64,7 @@ PEERDIR( YQL_LAST_ABI_VERSION() END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp index 1c9b3b3355..67b4a04d86 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -5,6 +5,7 @@ #include "yql_yt_session.h" #include "yql_yt_spec.h" #include "yql_yt_transform.h" +#include "yql_yt_native_folders.h" #include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h> #include <ydb/library/yql/providers/yt/lib/config_clusters/config_clusters.h> @@ -134,22 +135,6 @@ TString ToColumnList(const TVector<T>& list) { return (builder << ']'); } -TString GetType(const NYT::TNode& attr) { - if (!attr.HasKey("type")) { - return "unknown"; - } - - return attr["type"].AsString(); -} - -TString GetAttrType(const NYT::TNode& node) { - if (!node.HasAttributes()) { - return "unknown"; - } - - return GetType(node.GetAttributes()); -} - const NYT::TJobBinaryConfig GetJobBinary(const NYT::TRawMapOperationSpec& spec) { return spec.MapperSpec_.GetJobBinary(); } @@ -474,6 +459,88 @@ public: YQL_CLOG(INFO, ProviderYt) << "Server=" << options.Cluster() << ", Prefix=" << options.Prefix(); + try { + TSession::TPtr session = GetSession(options.SessionId()); + + auto batchOptions = TBatchFolderOptions(options.SessionId()) + .Cluster(options.Cluster()) + .Pos(options.Pos()) + .Config(options.Config()) + .Folders({{options.Prefix(), options.Attributes()}}); + auto execCtx = MakeExecCtx(std::move(batchOptions), session, options.Cluster(), nullptr, nullptr); + + if (auto filePtr = MaybeGetFilePtrFromCache(execCtx->GetOrCreateEntry(), execCtx->Options_.Folders().front())) { + TFolderResult res; + res.SetSuccess(); + res.ItemsOrFileLink = *filePtr; + return MakeFuture(res); + } + + auto getFolderFuture = session->Queue_->Async([execCtx] () { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + return ExecGetFolder(execCtx); + }); + auto resolvedLinksFuture = getFolderFuture.Apply([options, this, session] (const TFuture<TBatchFolderResult>& f) { + TVector<IYtGateway::TResolveOptions::TItemWithReqAttrs> resolveItems; + auto res = f.GetValue(); + for (auto&& item : res.Items) { + IYtGateway::TResolveOptions::TItemWithReqAttrs resolveItem { + .Item = item, + .AttrKeys = options.Attributes() + }; + resolveItems.push_back(std::move(resolveItem)); + } + + auto resolveOptions = TResolveOptions(options.SessionId()) + .Cluster(options.Cluster()) + .Pos(options.Pos()) + .Config(options.Config()) + .Items(resolveItems); + auto execCtx = MakeExecCtx(std::move(resolveOptions), session, options.Cluster(), nullptr, nullptr); + return ExecResolveLinks(execCtx); + }); + + return resolvedLinksFuture.Apply([execCtx] (const TFuture<TBatchFolderResult>& f) { + const ui32 countLimit = execCtx->Options_.Config()->FolderInlineItemsLimit.Get().GetOrElse(100); + const ui64 sizeLimit = execCtx->Options_.Config()->FolderInlineDataLimit.Get().GetOrElse(100_KB); + + TFolderResult res; + res.SetSuccess(); + + auto resolveRes = f.GetValue(); + TVector<TFolderResult::TFolderItem> items; + for (auto& batchItem : resolveRes.Items) { + TFolderResult::TFolderItem item { + .Path = std::move(batchItem.Path), + .Type = std::move(batchItem.Type), + .Attributes = NYT::NodeToYsonString(batchItem.Attributes) + }; + items.emplace_back(std::move(item)); + } + if (items.size() > countLimit) { + res.ItemsOrFileLink = SaveItemsToTempFile(execCtx, items); + return res; + } + ui64 total_size = std::accumulate(items.begin(), items.end(), 0, [] (ui64 size, const TFolderResult::TFolderItem& i) { + return size + i.Type.length() + i.Path.length() + i.Attributes.length(); + }); + if (total_size > sizeLimit) { + res.ItemsOrFileLink = SaveItemsToTempFile(execCtx, items); + return res; + } + res.ItemsOrFileLink = std::move(items); + return res; + }); + } catch (...) { + return MakeFuture(ResultFromCurrentException<TFolderResult>(options.Pos())); + } + } + + TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) final { + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + + YQL_CLOG(INFO, ProviderYt) << "Server=" << options.Cluster(); + auto pos = options.Pos(); try { TSession::TPtr session = GetSession(options.SessionId()); @@ -486,7 +553,29 @@ public: return ExecGetFolder(execCtx); }); } catch (...) { - return MakeFuture(ResultFromCurrentException<TFolderResult>(pos)); + return MakeFuture(ResultFromCurrentException<TBatchFolderResult>(pos)); + } + } + + + TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) final { + YQL_LOG_CTX_SCOPE(TStringBuf("Gateway"), __FUNCTION__); + + YQL_CLOG(INFO, ProviderYt) << "Server=" << options.Cluster(); + + auto pos = options.Pos(); + try { + TSession::TPtr session = GetSession(options.SessionId()); + + auto cluster = options.Cluster(); + auto execCtx = MakeExecCtx(std::move(options), session, cluster, nullptr, nullptr); + + return session->Queue_->Async([execCtx] () { + YQL_LOG_CTX_ROOT_SESSION_SCOPE(execCtx->LogCtx_); + return ExecResolveLinks(execCtx); + }); + } catch (...) { + return MakeFuture(ResultFromCurrentException<TBatchFolderResult>(pos)); } } @@ -1829,193 +1918,6 @@ private: return rangeRes; } - static TFolderResult ExecGetFolder(const TExecContext<TFolderOptions>::TPtr& execCtx) - { - try { - auto entry = execCtx->GetOrCreateEntry(); - - TFolderResult res; - res.SetSuccess(); - - TString prefix = execCtx->Options_.Prefix(); - TString cacheKey = prefix; - - TSet<TString> uniqueAttrs; - for (const auto& attr : execCtx->Options_.Attributes()) { - uniqueAttrs.insert(attr); - } - // Make key with sorted attrs - std::for_each(uniqueAttrs.cbegin(), uniqueAttrs.cend(), [&cacheKey](const auto& attr) { cacheKey.append('&').append(attr); } ); - - with_lock(entry->Lock_) { - if (auto p = entry->FolderCache.FindPtr(cacheKey)) { - if (auto file = std::get_if<TFileLinkPtr>(p)) { - res.File = *file; - YQL_CLOG(INFO, ProviderYt) << "Found folder in cache with key ('" << cacheKey << "') with tmp file " << res.File->GetPath().GetPath(); - } else { - const auto& list = std::get<std::vector<std::tuple<TString, TString, TString>>>(*p); - YQL_CLOG(INFO, ProviderYt) << "Found folder in cache with key ('" << cacheKey << "') with " << list.size() << " items"; - for (auto& el: list) { - TFolderResult::TFolderItem item; - std::tie(item.Type, item.Path, item.Attributes) = el; - res.Items.push_back(std::move(item)); - } - } - return res; - } - } - - if (!prefix.empty() && !entry->Tx->Exists(prefix)) { - YQL_CLOG(INFO, ProviderYt) << "Storing empty folder in cache with key ('" << cacheKey << "')"; - with_lock(entry->Lock_) { - entry->FolderCache[cacheKey] = std::vector<std::tuple<TString, TString, TString>>{}; - } - return res; - } - - - TSet<TString> uniqueAttrsWithSystemAttrs = uniqueAttrs; - uniqueAttrsWithSystemAttrs.insert("type"); - uniqueAttrsWithSystemAttrs.insert("target_path"); - uniqueAttrsWithSystemAttrs.insert("broken"); - - auto makeAttrFilter = [](const TSet<TString>& attrs) { - NYT::TAttributeFilter ret; - for (const auto& attr : attrs) { - ret.AddAttribute(attr); - } - return ret; - }; - - auto nodeList = entry->Tx->List(prefix, TListOptions().AttributeFilter(makeAttrFilter(uniqueAttrsWithSystemAttrs))); - res.Items.reserve(nodeList.size()); - - uniqueAttrsWithSystemAttrs.erase("target_path"); - uniqueAttrsWithSystemAttrs.erase("broken"); - - if (prefix) { - prefix.append('/'); - } - - THolder<TOFStream> out; - ui32 count = 0; - ui64 size = 0; - const ui32 countLimit = execCtx->Options_.Config()->FolderInlineItemsLimit.Get().GetOrElse(100); - const ui64 sizeLimit = execCtx->Options_.Config()->FolderInlineDataLimit.Get().GetOrElse(100_KB); - TString file; - Y_DEFER { - if (file) { - NFs::Remove(file); - } - }; - - auto writeItem = [&uniqueAttrs, &out, &res, &count, &file, - countLimit, &size, sizeLimit, execCtx](const NYT::TNode& node, TString path) { - auto type = GetAttrType(node); - if (path.StartsWith(NYT::TConfig::Get()->Prefix)) { - path = path.substr(NYT::TConfig::Get()->Prefix.size()); - } - NYT::TNode retAttrs = NYT::TNode::CreateMap(); - auto& nodeAttrs = node.GetAttributes(); - for (const auto& attrName : uniqueAttrs) { - if (attrName && nodeAttrs.HasKey(attrName)) { - retAttrs[attrName] = nodeAttrs[attrName]; - } - } - auto attrs = NYT::NodeToYsonString(retAttrs); - - if (!out) { - ++count; - size += type.length() + path.length() + attrs.length(); - if (count <= countLimit && size <= sizeLimit) { - TFolderResult::TFolderItem item; - item.Type = std::move(type); - item.Path = std::move(path); - item.Attributes = std::move(attrs); - res.Items.push_back(std::move(item)); - return; - } - file = execCtx->FileStorage_->GetTemp() / GetGuidAsString(execCtx->Session_->RandomProvider_->GenGuid()); - out = MakeHolder<TOFStream>(file); - for (auto& item: res.Items) { - ::SaveMany(out.Get(), item.Type, item.Path, item.Attributes); - } - res.Items.clear(); - YQL_CLOG(INFO, ProviderYt) << "Folder limit exceeded. Writing items to file " << file; - } - - ::SaveMany(out.Get(), type, path, attrs); - }; - - auto batchGet = entry->Tx->CreateBatchRequest(); - TVector<TFuture<void>> batchRes; - - for (const auto& node : nodeList) { - auto path = prefix + node.AsString(); - auto type = GetAttrType(node); - if (type != "link") { - writeItem(node, path); - } - else if (!node.GetAttributes()["broken"].AsBool()) { - auto targetPath = node.GetAttributes()["target_path"].AsString(); - batchRes.push_back( - batchGet->Get(targetPath, TGetOptions().AttributeFilter(makeAttrFilter(uniqueAttrsWithSystemAttrs))) - .Apply([path, writeItem](const NThreading::TFuture<NYT::TNode>& f) { - try { - writeItem(f.GetValue(), path); - } catch (...) { - writeItem(NYT::TNode::CreateMap(), path); - } - }) - ); - } - } - - if (!batchRes.empty()) { - batchGet->ExecuteBatch(); - WaitExceptionOrAll(batchRes).GetValue(); - } - if (out) { - ::SaveSize(out.Get(), 0); - out.Destroy(); - res.File = CreateFakeFileLink(file, TString(), true); - file = {}; - } - YQL_CLOG(INFO, ProviderYt) << "Folder items count=" << count << ", size=" << size; - - if (res.File) { - YQL_CLOG(INFO, ProviderYt) << "Storing folder tmp file " << res.File->GetPath().GetPath() << " in cache with key ('" << cacheKey << "')"; - with_lock(entry->Lock_) { - entry->FolderCache[cacheKey] = res.File; - } - } else { - YQL_CLOG(INFO, ProviderYt) << "Storing folder with " << res.Items.size() << " items in cache with key ('" << cacheKey << "')"; - std::vector<std::tuple<TString, TString, TString>> cache; - for (const auto& item: res.Items) { - cache.emplace_back(item.Type, item.Path, item.Attributes); - } - with_lock(entry->Lock_) { - entry->FolderCache[cacheKey] = std::move(cache); - } - } - - return res; - - } catch (const NYT::TErrorResponse &e) { - if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NRpc::NoSuchMethod)) { - TFolderResult res; - TString errMsg = execCtx->Options_.Prefix() + " is not a folder."; - TIssue rootIssue = YqlIssue(execCtx->Options_.Pos(), TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER, errMsg); - res.SetStatus(TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER); - res.AddIssue(rootIssue); - return res; - } - return ResultFromCurrentException<TFolderResult>(execCtx->Options_.Pos()); - } catch (...) { - return ResultFromCurrentException<TFolderResult>(execCtx->Options_.Pos()); - } - } - static TFuture<void> ExecPublish( const TExecContext<TPublishOptions>::TPtr& execCtx, const TVector<TString>& src, @@ -4905,4 +4807,4 @@ IYtGateway::TPtr CreateYtNativeGateway(const TYtNativeServices& services) { return MakeIntrusive<NNative::TYtNativeGateway>(services); } -} // NYql +} // NYql
\ No newline at end of file diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp new file mode 100644 index 0000000000..f668633115 --- /dev/null +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.cpp @@ -0,0 +1,229 @@ +#include "yql_yt_native_folders.h" + +#include <yt/cpp/mapreduce/interface/error_codes.h> +#include <ydb/library/yql/utils/log/log.h> +#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h> + +namespace NYql::NNative { + +using namespace NYT; +using namespace NCommon; +using namespace NKikimr; +using namespace NKikimr::NMiniKQL; +using namespace NNodes; +using namespace NThreading; + +TString GetType(const NYT::TNode& attr) { + if (!attr.HasKey("type")) { + return "unknown"; + } + + return attr["type"].AsString(); +} + +TString GetAttrType(const NYT::TNode& node) { + if (!node.HasAttributes()) { + return "unknown"; + } + + return GetType(node.GetAttributes()); +} + +TMaybe<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> MaybeGetFolderFromCache(TTransactionCache::TEntry::TPtr entry, TStringBuf cacheKey) { + TVector<IYtGateway::TBatchFolderResult::TFolderItem> items; + with_lock(entry->Lock_) { + const auto listPtr = entry->FolderCache.FindPtr(cacheKey); + if (listPtr) { + YQL_CLOG(INFO, ProviderYt) << "Found folder in cache with key ('" << cacheKey << "') with " << listPtr->size() << " items"; + for (auto& el : *listPtr) { + IYtGateway::TBatchFolderResult::TFolderItem item; + std::tie(item.Type, item.Path, item.Attributes) = el; + items.emplace_back(std::move(item)); + } + return items; + } + } + return {}; +} + +TMaybe<TFileLinkPtr> MaybeGetFilePtrFromCache(TTransactionCache::TEntry::TPtr entry, const IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs& folder) { + const auto cacheKey = std::accumulate(folder.AttrKeys.begin(), folder.AttrKeys.end(), folder.Prefix, + [] (TString&& str, const TString& arg) { + return str + "&" + arg; + }); + with_lock(entry->Lock_) { + const auto filePtr = entry->FolderFilePtrCache.FindPtr(cacheKey); + if (filePtr) { + return filePtr->Get(); + } + } + return {}; +} + +TAttributeFilter MakeAttrFilter(const TSet<TString>& attributes, bool isResolvingLink) { + NYT::TAttributeFilter filter; + for (const auto& attr : attributes) { + filter.AddAttribute(attr); + } + if (!isResolvingLink) { + filter.AddAttribute("target_path"); + filter.AddAttribute("broken"); + } + filter.AddAttribute("type"); + return filter; +} + +IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& path) { + IYtGateway::TBatchFolderResult::TFolderItem item; + item.Attributes = NYT::TNode::CreateMap(); + for (const auto& attr: node.GetAttributes().AsMap()) { + if (attr.first == "type") { + continue; + } + item.Attributes[attr.first] = attr.second; + } + item.Type = GetAttrType(node); + item.Path = path.StartsWith(NYT::TConfig::Get()->Prefix) + ? path.substr(NYT::TConfig::Get()->Prefix.size()) + : path; + return item; +} + +const TTransactionCache::TEntry::TFolderCache::value_type& StoreResInCache(const TTransactionCache::TEntry::TPtr& entry, TVector<IYtGateway::TBatchFolderResult::TFolderItem>&& items, const TString& cacheKey) { + std::vector<std::tuple<TString, TString, NYT::TNode>> cache; + for (const auto& item : items) { + cache.emplace_back(std::move(item.Type), std::move(item.Path), std::move(item.Attributes)); + } + with_lock(entry->Lock_) { + const auto [it, _] = entry->FolderCache.insert_or_assign(cacheKey, std::move(cache)); + return *it; + } +} + +TFileLinkPtr SaveItemsToTempFile(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx, const TVector<IYtGateway::TFolderResult::TFolderItem>& folderItems) { + const TString file = execCtx->FileStorage_->GetTemp() / GetGuidAsString(execCtx->Session_->RandomProvider_->GenGuid()); + YQL_CLOG(INFO, ProviderYt) << "Folder limit exceeded. Writing items to file " << file; + + auto out = MakeHolder<TOFStream>(file); + for (auto& item: folderItems) { + ::SaveMany(out.Get(), item.Type, item.Path, item.Attributes); + } + ::SaveSize(out.Get(), 0); + out.Destroy(); + return CreateFakeFileLink(file, "", true); +} + +IYtGateway::TBatchFolderResult ExecResolveLinks(const TExecContext<IYtGateway::TResolveOptions>::TPtr& execCtx) { + try { + auto batchGet = execCtx->GetEntry()->Tx->CreateBatchRequest(); + TVector<TFuture<IYtGateway::TBatchFolderResult::TFolderItem>> batchRes; + + for (const auto& [item, reqAttributes]: execCtx->Options_.Items()) { + if (item.Type != "link") { + batchRes.push_back(MakeFuture<IYtGateway::TBatchFolderResult::TFolderItem>(std::move(item))); + continue; + } + if (item.Attributes["broken"].AsBool()) { + continue; + } + const auto& targetPath = item.Attributes["target_path"].AsString(); + const auto& path = item.Path; + const auto attrFilter = MakeAttrFilter(reqAttributes, /* isResolvingLink */ true); + + batchRes.push_back( + batchGet->Get(targetPath, TGetOptions().AttributeFilter(attrFilter)) + .Apply([path] (const auto& f) { + const auto linkNode = f.GetValue(); + return MakeFolderItem(linkNode, path); + }) + ); + } + if (batchRes.empty()) { + return {}; + } + IYtGateway::TBatchFolderResult res; + res.SetSuccess(); + res.Items.reserve(batchRes.size()); + + batchGet->ExecuteBatch(); + WaitAll(batchRes).Wait(); + for (auto& f : batchRes) { + res.Items.emplace_back(f.ExtractValue()); + } + return res; + } + catch (...) { + return ResultFromCurrentException<IYtGateway::TBatchFolderResult>(execCtx->Options_.Pos()); + } +} + +IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx) { + const auto entry = execCtx->GetOrCreateEntry(); + auto batchList = entry->Tx->CreateBatchRequest(); + TVector<TFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>> batchRes; + + IYtGateway::TBatchFolderResult folderResult; + folderResult.SetSuccess(); + + for (const auto& folder : execCtx->Options_.Folders()) { + const auto cacheKey = std::accumulate(folder.AttrKeys.begin(), folder.AttrKeys.end(), folder.Prefix, + [] (TString&& str, const TString& arg) { + return str + "&" + arg; + }); + + auto maybeCached = MaybeGetFolderFromCache(entry, cacheKey); + if (maybeCached) { + batchRes.push_back(MakeFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>(std::move(*maybeCached))); + continue; + } + + const auto attrFilter = MakeAttrFilter(folder.AttrKeys, /* isResolvingLink */ false); + batchRes.push_back( + batchList->List(folder.Prefix, TListOptions().AttributeFilter(attrFilter)) + .Apply([&folder, cacheKey = std::move(cacheKey), &entry] (const TFuture<NYT::TNode::TListType>& f) + -> TFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> { + TVector<IYtGateway::TBatchFolderResult::TFolderItem> folderItems; + try { + auto nodeList = f.GetValue(); + folderItems.reserve(nodeList.size()); + for (const auto& node : nodeList) { + TStringBuilder path; + path << folder.Prefix << "/" << node.AsString(); + folderItems.push_back(MakeFolderItem(node, path)); + } + StoreResInCache(entry, std::move(folderItems), cacheKey); + return MakeFuture(std::move(folderItems)); + } + catch (const NYT::TErrorResponse& e) { + if (e.GetError().ContainsErrorCode(NYT::NClusterErrorCodes::NYTree::ResolveError)) { + // Return empty list on missing path + YQL_CLOG(INFO, ProviderYt) << "Storing empty folder in cache with key ('" << cacheKey << "')"; + StoreResInCache(entry, {}, cacheKey); + return MakeFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>({}); + } + return MakeErrorFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>(std::current_exception()); + } + catch (...) { + return MakeErrorFuture<TVector<IYtGateway::TBatchFolderResult::TFolderItem>>(std::current_exception()); + } + }) + ); + } + batchList->ExecuteBatch(); + try { + WaitExceptionOrAll(batchRes).Wait(); + for (auto& res : batchRes) { + const auto items = res.ExtractValue(); + folderResult.Items.reserve(folderResult.Items.size() + items.size()); + for (const auto& item : items) { + folderResult.Items.push_back(std::move(item)); + } + } + } + catch (...) { + return ResultFromCurrentException<IYtGateway::TBatchFolderResult>(execCtx->Options_.Pos()); + } + return folderResult; +} + +} // NYql::NNative diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h new file mode 100644 index 0000000000..645fc2a4b2 --- /dev/null +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_native_folders.h @@ -0,0 +1,27 @@ +#include <ydb/library/yql/providers/yt/gateway/native/yql_yt_exec_ctx.h> +#include <ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h> +#include <ydb/library/yql/providers/yt/provider/yql_yt_gateway.h> + +namespace NYql::NNative { + +TString GetType(const NYT::TNode& attr); + +TString GetAttrType(const NYT::TNode& node); + +TMaybe<TVector<IYtGateway::TBatchFolderResult::TFolderItem>> MaybeGetFolderFromCache(TTransactionCache::TEntry::TPtr entry, TStringBuf cacheKey); + +TMaybe<TFileLinkPtr> MaybeGetFilePtrFromCache(TTransactionCache::TEntry::TPtr entry, const IYtGateway::TBatchFolderOptions::TFolderPrefixAttrs& folder); + +NYT::TAttributeFilter MakeAttrFilter(const TSet<TString>& attributes, bool isResolvingLink); + +IYtGateway::TBatchFolderResult::TFolderItem MakeFolderItem(const NYT::TNode& node, const TString& path); + +const TTransactionCache::TEntry::TFolderCache::value_type& StoreResInCache(const TTransactionCache::TEntry::TPtr& entry, TVector<IYtGateway::TBatchFolderResult::TFolderItem>&& items, const TString& cacheKey); + +TFileLinkPtr SaveItemsToTempFile(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx, const TVector<IYtGateway::TFolderResult::TFolderItem>& folderItems); + +IYtGateway::TBatchFolderResult ExecResolveLinks(const TExecContext<IYtGateway::TResolveOptions>::TPtr& execCtx); + +IYtGateway::TBatchFolderResult ExecGetFolder(const TExecContext<IYtGateway::TBatchFolderOptions>::TPtr& execCtx); + +} // NYql::NNative diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h index 18fabc03f2..b01ee60a31 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h +++ b/ydb/library/yql/providers/yt/provider/yql_yt_gateway.h @@ -263,19 +263,73 @@ public: OPTION_FIELD(TString, Cluster) OPTION_FIELD(TString, Prefix) - OPTION_FIELD(TVector<TString>, Attributes) + OPTION_FIELD(TSet<TString>, Attributes) OPTION_FIELD(TYtSettings::TConstPtr, Config) OPTION_FIELD(TPosition, Pos) }; + struct TBatchFolderOptions : public TCommonOptions { + using TSelf = TBatchFolderOptions; + + TBatchFolderOptions(const TString& sessionId) + : TCommonOptions(sessionId) + { + } + + struct TFolderPrefixAttrs { + TString Prefix; + TSet<TString> AttrKeys; + }; + + OPTION_FIELD(TString, Cluster) + OPTION_FIELD(TVector<TFolderPrefixAttrs>, Folders) + OPTION_FIELD(TYtSettings::TConstPtr, Config) + OPTION_FIELD(TPosition, Pos) + }; + + struct TBatchFolderResult : public NCommon::TOperationResult { + struct TFolderItem { + TString Path; + TString Type; + NYT::TNode Attributes; + + }; + TVector<TFolderItem> Items; + }; + + struct TSerializedFolderItem { + }; + struct TFolderResult : public NCommon::TOperationResult { struct TFolderItem { TString Path; TString Type; TString Attributes; + + auto operator<=>(const TFolderItem&) const = default; }; - TVector<TFolderItem> Items; - TFileLinkPtr File; + std::variant<TVector<TFolderItem>, TFileLinkPtr> ItemsOrFileLink; + }; + + ////////////////////////////////////////////////////////////// + + struct TResolveOptions : public TCommonOptions { + using TSelf = TResolveOptions; + + TResolveOptions(const TString& sessionId) + : TCommonOptions(sessionId) + { + } + + struct TItemWithReqAttrs { + TBatchFolderResult::TFolderItem Item; + TSet<TString> AttrKeys; + }; + + OPTION_FIELD(TString, Cluster) + OPTION_FIELD(TVector<TItemWithReqAttrs>, Items) + OPTION_FIELD(TYtSettings::TConstPtr, Config) + OPTION_FIELD(TPosition, Pos) }; ////////////////////////////////////////////////////////////// @@ -522,6 +576,10 @@ public: virtual NThreading::TFuture<TFolderResult> GetFolder(TFolderOptions&& options) = 0; + virtual NThreading::TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) = 0; + + virtual NThreading::TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) = 0; + virtual NThreading::TFuture<TResOrPullResult> ResOrPull(const TExprNode::TPtr& node, TExprContext& ctx, TResOrPullOptions&& options) = 0; virtual NThreading::TFuture<TRunResult> Run(const TExprNode::TPtr& node, TExprContext& ctx, TRunOptions&& options) = 0; @@ -556,4 +614,4 @@ public: virtual TGetTablePartitionsResult GetTablePartitions(TGetTablePartitionsOptions&& options) = 0; }; -} +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp index a83adf3349..08ce8f059e 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_io_discovery.cpp @@ -17,6 +17,7 @@ #include <ydb/library/yql/core/issue/protos/issue_id.pb.h> #include <ydb/library/yql/core/issue/yql_issue.h> +#include <library/cpp/yson/node/node_io.h> #include <library/cpp/threading/future/future.h> #include <util/generic/vector.h> @@ -323,7 +324,7 @@ public: IYtGateway::TFolderOptions(State_->SessionId) .Cluster(cluster) .Prefix(folder.Prefix) - .Attributes(folder.Attributes) + .Attributes(TSet<TString>(folder.Attributes.begin(), folder.Attributes.end())) .Config(State_->Configuration->Snapshot()) .Pos(x.second.first) ); @@ -372,147 +373,50 @@ public: } if (keys.GetType() == TYtKey::EType::Folder) { - auto p = PendingFolders_.FindPtr(std::make_pair(cluster, *keys.GetKeys().front().GetFolder())); - YQL_ENSURE(p); - auto& res = p->second.GetValue(); - res.ReportIssues(ctx.IssueManager); - if (!res.Success()) { - return {}; + const auto res = FetchFolderResult(ctx, cluster, *keys.GetKeys().front().GetFolder()); + if (!res) { + return node; } - - if (res.File) { + if (auto file = std::get_if<TFileLinkPtr>(&res->ItemsOrFileLink)) { TString alias; - if (auto p = FolderFileToAlias_.FindPtr(res.File->GetPath().GetPath())) { + if (auto p = FolderFileToAlias_.FindPtr(file->Get()->GetPath().GetPath())) { alias = *p; } else { alias = TString("_yql_folder").append(ToString(FolderFileToAlias_.size())); - FolderFileToAlias_.emplace(res.File->GetPath().GetPath(), alias); + FolderFileToAlias_.emplace(file->Get()->GetPath().GetPath(), alias); TUserDataBlock tmpBlock; tmpBlock.Type = EUserDataType::PATH; - tmpBlock.Data = res.File->GetPath().GetPath(); + tmpBlock.Data = file->Get()->GetPath().GetPath(); tmpBlock.Usage.Set(EUserDataBlockUsage::Path); - tmpBlock.FrozenFile = res.File; + tmpBlock.FrozenFile = file->Get(); State_->Types->UserDataStorage->AddUserDataBlock(alias, tmpBlock); } - return ctx.Builder(node->Pos()) - .Callable("Cons!") - .World(0) - .Callable(1, "AssumeColumnOrder") - .Callable(0, "Collect") - .Callable(0, "Apply") - .Callable(0, "Udf") - .Atom(0, "File.FolderListFromFile") - .Seal() - .Callable(1, "FilePath") - .Atom(0, alias) - .Seal() - .Seal() + auto folderListFromFile = ctx.Builder(node->Pos()) + .Callable("Collect") + .Callable(0, "Apply") + .Callable(0, "Udf") + .Atom(0, "File.FolderListFromFile") .Seal() - .List(1) - .Atom(0, "Path") - .Atom(1, "Type") - .Atom(2, "Attributes") + .Callable(1, "FilePath") + .Atom(0, alias) .Seal() .Seal() .Seal() .Build(); + + return BuildFolderTableResExpr(ctx, node->Pos(), read.World(), folderListFromFile).Ptr(); } + auto items = std::get<TVector<IYtGateway::TFolderResult::TFolderItem>>(res->ItemsOrFileLink); TVector<TExprBase> listItems; - for (auto& item: res.Items) { - listItems.push_back(Build<TCoAsStruct>(ctx, node->Pos()) - .Add() - .Add<TCoAtom>() - .Value("Path") - .Build() - .Add<TCoString>() - .Literal() - .Value(item.Path) - .Build() - .Build() - .Build() - .Add() - .Add<TCoAtom>() - .Value("Type") - .Build() - .Add<TCoString>() - .Literal() - .Value(item.Type) - .Build() - .Build() - .Build() - .Add() - .Add<TCoAtom>() - .Value("Attributes") - .Build() - .Add<TCoYson>() - .Literal() - .Value(item.Attributes) - .Build() - .Build() - .Build() - .Done() - ); - } - - return Build<TCoCons>(ctx, node->Pos()) - .World(read.World()) - .Input<TCoAssumeColumnOrder>() - .Input<TCoList>() - .ListType<TCoListType>() - .ItemType<TCoStructType>() - .Add<TExprList>() - .Add<TCoAtom>() - .Value("Path") - .Build() - .Add<TCoDataType>() - .Type() - .Value("String") - .Build() - .Build() - .Build() - .Add<TExprList>() - .Add<TCoAtom>() - .Value("Type") - .Build() - .Add<TCoDataType>() - .Type() - .Value("String") - .Build() - .Build() - .Build() - .Add<TExprList>() - .Add<TCoAtom>() - .Value("Attributes") - .Build() - .Add<TCoDataType>() - .Type() - .Value("Yson") - .Build() - .Build() - .Build() - .Build() - .Build() - .FreeArgs() - .Add(listItems) - .Build() - .Build() - .ColumnOrder<TCoAtomList>() - .Add() - .Value("Path") - .Build() - .Add() - .Value("Type") - .Build() - .Add() - .Value("Attributes") - .Build() - .Build() - .Build() - .Done().Ptr(); + for (auto& item: items) { + listItems.push_back(BuildFolderListItemExpr(ctx, node->Pos(), item)); + } + + return BuildFolderTableResExpr(ctx, node->Pos(), read.World(), BuildFolderListExpr(ctx, node->Pos(), listItems).Ptr()).Ptr(); } if (keys.GetType() != TYtKey::EType::Table) { @@ -756,6 +660,115 @@ private: return res; } + TExprBase BuildFolderListItemExpr(TExprContext& ctx, NYql::TPositionHandle pos, const IYtGateway::TFolderResult::TFolderItem& folderItem) { + return Build<TCoAsStruct>(ctx, pos) + .Add() + .Add<TCoAtom>() + .Value("Path") + .Build() + .Add<TCoString>() + .Literal() + .Value(folderItem.Path) + .Build() + .Build() + .Build() + .Add() + .Add<TCoAtom>() + .Value("Type") + .Build() + .Add<TCoString>() + .Literal() + .Value(folderItem.Type) + .Build() + .Build() + .Build() + .Add() + .Add<TCoAtom>() + .Value("Attributes") + .Build() + .Add<TCoYson>() + .Literal() + .Value(folderItem.Attributes) + .Build() + .Build() + .Build() + .Done(); + } + + TCoList BuildFolderListExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TVector<TExprBase>& folderItems) { + return Build<TCoList>(ctx, pos) + .ListType<TCoListType>() + .ItemType<TCoStructType>() + .Add<TExprList>() + .Add<TCoAtom>() + .Value("Path") + .Build() + .Add<TCoDataType>() + .Type() + .Value("String") + .Build() + .Build() + .Build() + .Add<TExprList>() + .Add<TCoAtom>() + .Value("Type") + .Build() + .Add<TCoDataType>() + .Type() + .Value("String") + .Build() + .Build() + .Build() + .Add<TExprList>() + .Add<TCoAtom>() + .Value("Attributes") + .Build() + .Add<TCoDataType>() + .Type() + .Value("Yson") + .Build() + .Build() + .Build() + .Build() + .Build() + .FreeArgs() + .Add(folderItems) + .Build() + .Build() + .Value(); + } + + TCoCons BuildFolderTableResExpr(TExprContext& ctx, NYql::TPositionHandle pos, const TExprBase& world, const TExprNodePtr& folderList) { + return Build<TCoCons>(ctx, pos) + .World(world) + .Input<TCoAssumeColumnOrder>() + .Input(folderList) + .ColumnOrder<TCoAtomList>() + .Add() + .Value("Path") + .Build() + .Add() + .Value("Type") + .Build() + .Add() + .Value("Attributes") + .Build() + .Build() + .Build() + .Done(); + } + + TMaybe<NYql::IYtGateway::TFolderResult> FetchFolderResult(TExprContext& ctx, const TString& cluster, const TYtKey::TFolderList& folder) { + auto p = PendingFolders_.FindPtr(std::make_pair(cluster, folder)); + YQL_ENSURE(p); + auto res = p->second.GetValue(); + res.ReportIssues(ctx.IssueManager); + if (!res.Success()) { + return {}; + } + return res; + } + private: TYtState::TPtr State_; @@ -772,4 +785,4 @@ THolder<IGraphTransformer> CreateYtIODiscoveryTransformer(TYtState::TPtr state) return THolder(new TYtIODiscoveryTransformer(state)); } -} +}
\ No newline at end of file |