diff options
author | Alexander Smirnov <alex@ydb.tech> | 2024-12-24 22:01:20 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2024-12-24 22:01:20 +0000 |
commit | bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf (patch) | |
tree | 79878ca309f9f7fada064f9b78b4223af4635f28 /yt/yql | |
parent | be43a4691ebdd4dbe260a8d77df4cd8423b14c05 (diff) | |
parent | e6bd80ded127cd064560f7ea471974b602770cb1 (diff) | |
download | ydb-bd0e2de0b1035962a4d5b9e847eaa6508fad7fcf.tar.gz |
Merge branch 'PR'
Diffstat (limited to 'yt/yql')
-rw-r--r-- | yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp | 2 | ||||
-rw-r--r-- | yt/yql/providers/yt/gateway/native/ut/ya.make | 6 | ||||
-rw-r--r-- | yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp | 366 | ||||
-rw-r--r-- | yt/yql/providers/yt/lib/ut_common/ya.make | 16 | ||||
-rw-r--r-- | yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp | 55 | ||||
-rw-r--r-- | yt/yql/providers/yt/lib/ut_common/yql_ut_common.h | 23 | ||||
-rw-r--r-- | yt/yql/providers/yt/provider/ut/ya.make | 3 |
7 files changed, 464 insertions, 7 deletions
diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index c8c2b61607..8b3019ffa3 100644 --- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -368,7 +368,7 @@ public: LocalListeners_.reserve(Inputs_.size()); for (size_t i = 0; i < Inputs_.size(); ++i) { auto& decoder = Settings_->Specs->Inputs[Settings_->OriginalIndexes[i]]; - bool native = decoder->NativeYtTypeFlags && !decoder->FieldsVec[i].ExplicitYson; + bool native = decoder->NativeYtTypeFlags; LocalListeners_.emplace_back(std::make_shared<TLocalListener>(Listener_, Settings_->ColumnNameMapping, ptr, types, *Settings_->Pool, Settings_->PgBuilder, native, jobStats)); LocalListeners_.back()->Init(LocalListeners_.back()); } diff --git a/yt/yql/providers/yt/gateway/native/ut/ya.make b/yt/yql/providers/yt/gateway/native/ut/ya.make index 702a53d5dd..23f262c22b 100644 --- a/yt/yql/providers/yt/gateway/native/ut/ya.make +++ b/yt/yql/providers/yt/gateway/native/ut/ya.make @@ -1,5 +1,3 @@ -IF (NOT OPENSOURCE) - UNITTEST() SRCS( @@ -11,7 +9,7 @@ PEERDIR( yt/yql/providers/yt/gateway/file yt/yql/providers/yt/codec/codegen yt/yql/providers/yt/comp_nodes/llvm14 - yql/essentials/core/ut_common + yt/yql/providers/yt/lib/ut_common library/cpp/testing/mock_server library/cpp/testing/common yql/essentials/public/udf/service/terminate_policy @@ -24,5 +22,3 @@ YQL_LAST_ABI_VERSION() END() -ENDIF() - diff --git a/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp new file mode 100644 index 0000000000..b08db52a50 --- /dev/null +++ b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp @@ -0,0 +1,366 @@ +#include "library/cpp/testing/unittest/registar.h" +#include <library/cpp/yson/node/node_io.h> +#include <yt/yql/providers/yt/lib/ut_common/yql_ut_common.h> +#include <library/cpp/testing/common/network.h> +#include <library/cpp/testing/mock_server/server.h> +#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h> +#include <yql/essentials/core/file_storage/proto/file_storage.pb.h> +#include <yql/essentials/providers/common/proto/gateways_config.pb.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> + +namespace NYql { + +namespace { + +constexpr auto CYPRES_TX_ID = "\"9518f6d4-f0480586-41103e8-ca595920\""; +constexpr auto CYPRES_NODE_A_CONTENT = R"( +[ + { + output = [ + < + "user_attributes" = {}; + "type" = "table"; + > "a"; + < + "user_attributes" = {}; + "type" = "table"; + > "b"; + < + "user_attributes" = {}; + "target_path" = "//link_dest"; + "broken" = %false; + "type" = "link"; + > "link"; + < + "user_attributes" = {}; + "target_path" = "//link_broken_dest"; + "broken" = %true; + "type" = "link"; + > "link_broken"; + < + "user_attributes" = {}; + "target_path" = "//link_access_denied"; + "broken" = %false; + "type" = "link"; + > "link_access_denied"; + ]; + }; +] +)"; + +constexpr auto CYPRES_NODE_W_LINK = R"( +[ + { + output = [ + < + "target_path" = "//link_dest"; + "broken" = %false; + "type" = "link"; + > "link"; + ]; + } +] +)"; + +constexpr auto CYPRES_LINK_DEST = R"( +[ + { + "output" = < + "user_attributes" = {}; + "type" = "table"; + > #; + }; +] +)"; + +constexpr auto CYPRES_ACCESS_ERROR = R"( +[ + { + "error" = { + "code" = 901; + "message" = "Access denied"; + } + } +] +)"; + +constexpr auto CYPRESS_BLACKBOX_ERROR = R"( +[ + { + "error" = { + "code" = 111; + "message" = "Blackbox rejected token"; + } + } +] +)"; + +TVector<IYtGateway::TFolderResult::TFolderItem> EXPECTED_ITEMS { + {"test/a/a", "table", R"({"user_attributes"={}})"}, + {"test/a/b", "table", R"({"user_attributes"={}})"}, + {"test/a/link", "table", R"({"user_attributes"={}})"}, + {"test/a/link_access_denied", "unknown", "{}"} +}; + +TGatewaysConfig MakeGatewaysConfig(size_t port) +{ + TGatewaysConfig config {}; + auto* clusters = config.MutableYt()->MutableClusterMapping(); + NYql::TYtClusterConfig cluster; + cluster.SetName("ut_cluster"); + cluster.SetYTName("ut_cluster"); + cluster.SetCluster("localhost:" + ToString(port)); + clusters->Add(std::move(cluster)); + return config; +} + +class TYtReplier : public TRequestReplier { +public: + using THandler = std::function<THttpResponse(TStringBuf path, const NYT::TNode& attributes)>; + + bool DoReply(const TReplyParams& params) override { + const TParsedHttpFull parsed(params.Input.FirstLine()); + Cout << parsed.Path << Endl; + + HttpCodes code = HTTP_NOT_FOUND; + TString content; + if (parsed.Path == "/api/v3/start_tx") { + content = CYPRES_TX_ID; + code = HTTP_OK; + } + else if (parsed.Path == "/api/v3/ping_tx") { + code = HTTP_OK; + } + else if (parsed.Path == "/api/v3/execute_batch") { + auto executeBatchRes = HandleExecuteBatch(params.Input); + executeBatchRes.OutTo(params.Output); + return true; + } + THttpResponse resp(code); + resp.SetContent(content); + resp.OutTo(params.Output); + + return true; + } + explicit TYtReplier(THandler handleListCommand, THandler handleGetCommand, TMaybe<std::function<void(const NYT::TNode& request)>> assertion): + HandleListCommand_(handleListCommand), HandleGetCommand_(handleGetCommand) { + if (assertion) { + Assertion_ = assertion.GetRef(); + } + } + +private: + THttpResponse HandleExecuteBatch(THttpInput& input) { + auto requestBody = input.ReadAll(); + auto requestBodyNode = NYT::NodeFromYsonString(requestBody); + if (!requestBodyNode.HasKey("requests")) { + return THttpResponse{HTTP_INTERNAL_SERVER_ERROR}; + } + auto& requests = requestBodyNode["requests"]; + if (!requests.IsList()) { + return THttpResponse{HTTP_INTERNAL_SERVER_ERROR}; + } + for (auto& request : requests.AsList()) { + Assertion_(request); + + const auto& command = request["command"]; + const auto& parameters = request["parameters"]; + const auto& path = parameters["path"].AsString(); + const auto& attributes = parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{}; + if (command == "list") { + return HandleListCommand_(path, attributes); + } + if (command == "get") { + return HandleGetCommand_(path, attributes); + } + } + return THttpResponse{HTTP_NOT_FOUND}; + } + + std::function<void(const NYT::TNode& request)> Assertion_ = [] ([[maybe_unused]] auto _) {}; + THandler HandleListCommand_; + THandler HandleGetCommand_; + +}; + +Y_UNIT_TEST_SUITE(YtNativeGateway) { + +std::pair<TIntrusivePtr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TPortHolder& port, TTypeAnnotationContext* types) { + TYtNativeServices nativeServices; + auto gatewaysConfig = MakeGatewaysConfig(port); + nativeServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt()); + nativeServices.FileStorage = CreateFileStorage(TFileStorageConfig{}); + + auto ytGateway = CreateYtNativeGateway(nativeServices); + auto ytState = MakeIntrusive<TYtState>(types); + ytState->Gateway = ytGateway; + + InitializeYtGateway(ytGateway, ytState); + return {ytState, ytGateway}; +} + +IYtGateway::TFolderResult GetFolderResult(TYtReplier::THandler handleList, TYtReplier::THandler handleGet, +TMaybe<std::function<void(const NYT::TNode& request)>> gatewayRequestAssertion, std::function<IYtGateway::TFolderOptions(TString)> makeFolderOptions) { + const auto port = NTesting::GetFreePort(); + NMock::TMockServer mockServer{port, + [gatewayRequestAssertion, handleList, handleGet] () {return new TYtReplier(handleList, handleGet, gatewayRequestAssertion);} + }; + + TTypeAnnotationContext types; + auto [ytState, ytGateway] = InitTest(port, &types); + + IYtGateway::TFolderOptions folderOptions = makeFolderOptions(ytState->SessionId); + auto folderFuture = ytGateway->GetFolder(std::move(folderOptions)); + + folderFuture.Wait(); + ytState->Gateway->CloseSession({ytState->SessionId}); + auto folderRes = folderFuture.GetValue(); + return folderRes; +} + +Y_UNIT_TEST(GetFolder) { + THashMap<TString, THashSet<TString>> requiredAttributes { + {"//test/a", {"type", "broken", "target_path", "user_attributes"}}, + {"//link_dest", {"type", "user_attributes"}} + }; + const auto checkRequiredAttributes = [&requiredAttributes] (const NYT::TNode& request) { + const auto& parameters = request["parameters"]; + const auto path = parameters["path"].AsString(); + const auto& attributes = parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{}; + + if (!requiredAttributes.contains(path)) { + return; + } + + THashSet<TString> attributesSet; + for (const auto& attribute : attributes.AsList()) { + attributesSet.insert(attribute.AsString()); + } + UNIT_ASSERT_VALUES_EQUAL(requiredAttributes[path], attributesSet); + }; + + const auto handleGet = [] (TStringBuf path, const NYT::TNode& attributes) { + Y_UNUSED(attributes); + THttpResponse resp{HTTP_OK}; + if (path == "//link_dest") { + resp.SetContent(CYPRES_LINK_DEST); + return resp; + } + if (path == "//link_access_denied") { + resp.SetContent(CYPRES_ACCESS_ERROR); + return resp; + } + + return THttpResponse{HTTP_NOT_FOUND}; + }; + + const auto handleList = [] (TStringBuf path, const NYT::TNode& attributes) { + Y_UNUSED(attributes); + THttpResponse resp{HTTP_OK}; + if (path == "//test/a") { + resp.SetContent(CYPRES_NODE_A_CONTENT); + return resp; + } + return THttpResponse{HTTP_NOT_FOUND}; + }; + + const auto makeFolderOptions = [] (const TString& sessionId) { + IYtGateway::TFolderOptions folderOptions{sessionId}; + TYtSettings ytSettings {}; + folderOptions.Cluster("ut_cluster") + .Config(std::make_shared<TYtSettings>(ytSettings)) + .Prefix("//test/a") + .Attributes({"user_attributes"}); + return folderOptions; + }; + + auto folderRes + = GetFolderResult(handleList, handleGet, checkRequiredAttributes, makeFolderOptions); + + UNIT_ASSERT_EQUAL_C(folderRes.Success(), true, folderRes.Issues().ToString()); + UNIT_ASSERT_EQUAL( + folderRes.ItemsOrFileLink, + (std::variant<TVector<IYtGateway::TFolderResult::TFolderItem>, TFileLinkPtr>(EXPECTED_ITEMS))); + } + +Y_UNIT_TEST(EmptyResolveIsNotError) { + const auto port = NTesting::GetFreePort(); + + const auto handleList = [] (TStringBuf path, const NYT::TNode& attributes) { + Y_UNUSED(path); + Y_UNUSED(attributes); + + THttpResponse resp{HTTP_OK}; + resp.SetContent(CYPRES_NODE_W_LINK); + return resp; + }; + + const auto handleGet = [] (TStringBuf path, const NYT::TNode& attributes) { + Y_UNUSED(path); + Y_UNUSED(attributes); + + THttpResponse resp{HTTP_OK}; + resp.SetContent(CYPRES_ACCESS_ERROR); + return resp; + }; + + const auto makeFolderOptions = [] (const TString& sessionId) { + IYtGateway::TFolderOptions folderOptions{sessionId}; + TYtSettings ytSettings {}; + folderOptions.Cluster("ut_cluster") + .Config(std::make_shared<TYtSettings>(ytSettings)) + .Prefix("//test/a") + .Attributes({"user_attributes"}); + return folderOptions; + }; + + auto folderRes + = GetFolderResult(handleList, handleGet, Nothing(), makeFolderOptions); + + UNIT_ASSERT_EQUAL_C(folderRes.Success(), true, folderRes.Issues().ToString()); +} + +Y_UNIT_TEST(GetFolderException) { + const auto port = NTesting::GetFreePort(); + + const auto handleList = [] (TStringBuf path, const NYT::TNode& attributes) { + Y_UNUSED(path); + Y_UNUSED(attributes); + + THttpResponse resp{HTTP_UNAUTHORIZED}; + auto header = R"({"code":900,"message":"Authentication failed"})"; + resp.AddHeader(THttpInputHeader("X-YT-Error", header)); + resp.SetContent(CYPRESS_BLACKBOX_ERROR); + return resp; + }; + + const auto handleGet = [] (TStringBuf path, const NYT::TNode& attributes) { + Y_UNUSED(path); + Y_UNUSED(attributes); + + THttpResponse resp{HTTP_OK}; + resp.SetContent(""); + return resp; + }; + + const auto makeFolderOptions = [] (const TString& sessionId) { + IYtGateway::TFolderOptions folderOptions{sessionId}; + TYtSettings ytSettings {}; + folderOptions.Cluster("ut_cluster") + .Config(std::make_shared<TYtSettings>(ytSettings)) + .Prefix("//test/a") + .Attributes({"user_attributes"}); + return folderOptions; + }; + + const auto folderRes + = GetFolderResult(handleList, handleGet, Nothing(), makeFolderOptions); + + UNIT_ASSERT(!folderRes.Issues().Empty()); + UNIT_ASSERT_STRING_CONTAINS(folderRes.Issues().ToString(), "Authentication failed"); +} +} + +} // namespace + +} // namespace NYql diff --git a/yt/yql/providers/yt/lib/ut_common/ya.make b/yt/yql/providers/yt/lib/ut_common/ya.make new file mode 100644 index 0000000000..4084a3d770 --- /dev/null +++ b/yt/yql/providers/yt/lib/ut_common/ya.make @@ -0,0 +1,16 @@ +LIBRARY() + +SRCS( + yql_ut_common.cpp + yql_ut_common.h +) + +PEERDIR( + yql/essentials/core + yql/essentials/core/expr_nodes +) + +YQL_LAST_ABI_VERSION() + +END() + diff --git a/yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp new file mode 100644 index 0000000000..cef3f2723c --- /dev/null +++ b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp @@ -0,0 +1,55 @@ +#include "yql_ut_common.h" + +#include <library/cpp/random_provider/random_provider.h> +#include <library/cpp/time_provider/time_provider.h> + +#include <util/generic/guid.h> +#include <util/system/user.h> +#include <util/stream/file.h> + +namespace NYql { + +TTestTablesMapping::TTestTablesMapping() + : TmpInput() + , TmpInputAttr(TmpInput.Name() + ".attr") + , TmpOutput() + , TmpOutputAttr(TmpOutput.Name() + ".attr") +{ + { + TUnbufferedFileOutput tmpInput(TmpInput); + tmpInput << "{\"key\"=\"\";\"subkey\"=\"\";\"value\"=\"\"}" << Endl; + TUnbufferedFileOutput tmpInputAttr(TmpInputAttr); + tmpInputAttr << "{\"_yql_row_spec\" = {\"Type\" = [\"StructType\";[" + << "[\"key\";[\"DataType\";\"String\"]];" + << "[\"subkey\";[\"DataType\";\"String\"]];" + << "[\"value\";[\"DataType\";\"String\"]]" + << "]]}}" << Endl; + } + insert(std::make_pair("yt.plato.Input", TmpInput.Name())); + + { + TUnbufferedFileOutput tmpOutput(TmpOutput); + tmpOutput << "{\"key\"=\"\";\"subkey\"=\"\";\"value\"=\"\"}" << Endl; + TUnbufferedFileOutput tmpOutputAttr(TmpOutputAttr); + tmpOutputAttr << "{\"_yql_row_spec\" = {\"Type\" = [\"StructType\";[" + << "[\"key\";[\"DataType\";\"String\"]];" + << "[\"subkey\";[\"DataType\";\"String\"]];" + << "[\"value\";[\"DataType\";\"String\"]]" + << "]]}}" << Endl; + } + insert(std::make_pair("yt.plato.Output", TmpOutput.Name())); +} + +void InitializeYtGateway(IYtGateway::TPtr gateway, TYtState::TPtr ytState) { + ytState->SessionId = CreateGuidAsString(); + gateway->OpenSession( + IYtGateway::TOpenSessionOptions(ytState->SessionId) + .UserName(GetUsername()) + .ProgressWriter(&NullProgressWriter) + .OperationOptions(TYqlOperationOptions()) + .RandomProvider(CreateDeterministicRandomProvider(1)) + .TimeProvider(CreateDeterministicTimeProvider(10000000)) + ); +} + +} diff --git a/yt/yql/providers/yt/lib/ut_common/yql_ut_common.h b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.h new file mode 100644 index 0000000000..ddee02690a --- /dev/null +++ b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.h @@ -0,0 +1,23 @@ +#pragma once + +#include <yql/essentials/core/yql_expr_type_annotation.h> + +#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h> +#include <yt/yql/providers/yt/provider/yql_yt_provider.h> + +#include <util/system/tempfile.h> + +namespace NYql { + +struct TTestTablesMapping: public THashMap<TString, TString> { + TTempFileHandle TmpInput; + TTempFileHandle TmpInputAttr; + TTempFileHandle TmpOutput; + TTempFileHandle TmpOutputAttr; + + TTestTablesMapping(); +}; + +void InitializeYtGateway(IYtGateway::TPtr gateway, TYtState::TPtr ytState); + +} diff --git a/yt/yql/providers/yt/provider/ut/ya.make b/yt/yql/providers/yt/provider/ut/ya.make index 3b29f30999..888bfe2d25 100644 --- a/yt/yql/providers/yt/provider/ut/ya.make +++ b/yt/yql/providers/yt/provider/ut/ya.make @@ -17,7 +17,7 @@ PEERDIR( yt/yql/providers/yt/gateway/file yt/yql/providers/yt/codec/codegen yt/yql/providers/yt/comp_nodes/llvm14 - yql/essentials/core/ut_common + yt/yql/providers/yt/lib/ut_common yql/essentials/ast yql/essentials/public/udf/service/terminate_policy yql/essentials/core/services @@ -38,3 +38,4 @@ YQL_LAST_ABI_VERSION() END() ENDIF() + |