aboutsummaryrefslogtreecommitdiffstats
path: root/yt/yql
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2024-12-24 15:46:17 +0000
committerAlexander Smirnov <alex@ydb.tech>2024-12-24 15:46:17 +0000
commitc7decaf9230ddcb1ec2c42d1f50fb3998166c4ef (patch)
tree4efde4e4276bb0f24c314909403a1f6ed94c60d7 /yt/yql
parentcf344b64297e6a79d1e538be9f8f59afb06a2a97 (diff)
parentb821606f7bd364dc755d37b5bcb3559130675364 (diff)
downloadydb-c7decaf9230ddcb1ec2c42d1f50fb3998166c4ef.tar.gz
Merge branch 'rightlib' into merge-libs-241224-1545
Diffstat (limited to 'yt/yql')
-rw-r--r--yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp2
-rw-r--r--yt/yql/providers/yt/gateway/native/ut/ya.make6
-rw-r--r--yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp366
-rw-r--r--yt/yql/providers/yt/lib/ut_common/ya.make16
-rw-r--r--yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp55
-rw-r--r--yt/yql/providers/yt/lib/ut_common/yql_ut_common.h23
-rw-r--r--yt/yql/providers/yt/provider/ut/ya.make3
7 files changed, 464 insertions, 7 deletions
diff --git a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
index c8c2b61607..8b3019ffa3 100644
--- a/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
+++ b/yt/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
@@ -368,7 +368,7 @@ public:
LocalListeners_.reserve(Inputs_.size());
for (size_t i = 0; i < Inputs_.size(); ++i) {
auto& decoder = Settings_->Specs->Inputs[Settings_->OriginalIndexes[i]];
- bool native = decoder->NativeYtTypeFlags && !decoder->FieldsVec[i].ExplicitYson;
+ bool native = decoder->NativeYtTypeFlags;
LocalListeners_.emplace_back(std::make_shared<TLocalListener>(Listener_, Settings_->ColumnNameMapping, ptr, types, *Settings_->Pool, Settings_->PgBuilder, native, jobStats));
LocalListeners_.back()->Init(LocalListeners_.back());
}
diff --git a/yt/yql/providers/yt/gateway/native/ut/ya.make b/yt/yql/providers/yt/gateway/native/ut/ya.make
index 702a53d5dd..23f262c22b 100644
--- a/yt/yql/providers/yt/gateway/native/ut/ya.make
+++ b/yt/yql/providers/yt/gateway/native/ut/ya.make
@@ -1,5 +1,3 @@
-IF (NOT OPENSOURCE)
-
UNITTEST()
SRCS(
@@ -11,7 +9,7 @@ PEERDIR(
yt/yql/providers/yt/gateway/file
yt/yql/providers/yt/codec/codegen
yt/yql/providers/yt/comp_nodes/llvm14
- yql/essentials/core/ut_common
+ yt/yql/providers/yt/lib/ut_common
library/cpp/testing/mock_server
library/cpp/testing/common
yql/essentials/public/udf/service/terminate_policy
@@ -24,5 +22,3 @@ YQL_LAST_ABI_VERSION()
END()
-ENDIF()
-
diff --git a/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
new file mode 100644
index 0000000000..b08db52a50
--- /dev/null
+++ b/yt/yql/providers/yt/gateway/native/ut/yql_yt_native_folders_ut.cpp
@@ -0,0 +1,366 @@
+#include "library/cpp/testing/unittest/registar.h"
+#include <library/cpp/yson/node/node_io.h>
+#include <yt/yql/providers/yt/lib/ut_common/yql_ut_common.h>
+#include <library/cpp/testing/common/network.h>
+#include <library/cpp/testing/mock_server/server.h>
+#include <yt/yql/providers/yt/gateway/native/yql_yt_native.h>
+#include <yql/essentials/core/file_storage/proto/file_storage.pb.h>
+#include <yql/essentials/providers/common/proto/gateways_config.pb.h>
+#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
+
+namespace NYql {
+
+namespace {
+
+constexpr auto CYPRES_TX_ID = "\"9518f6d4-f0480586-41103e8-ca595920\"";
+constexpr auto CYPRES_NODE_A_CONTENT = R"(
+[
+ {
+ output = [
+ <
+ "user_attributes" = {};
+ "type" = "table";
+ > "a";
+ <
+ "user_attributes" = {};
+ "type" = "table";
+ > "b";
+ <
+ "user_attributes" = {};
+ "target_path" = "//link_dest";
+ "broken" = %false;
+ "type" = "link";
+ > "link";
+ <
+ "user_attributes" = {};
+ "target_path" = "//link_broken_dest";
+ "broken" = %true;
+ "type" = "link";
+ > "link_broken";
+ <
+ "user_attributes" = {};
+ "target_path" = "//link_access_denied";
+ "broken" = %false;
+ "type" = "link";
+ > "link_access_denied";
+ ];
+ };
+]
+)";
+
+constexpr auto CYPRES_NODE_W_LINK = R"(
+[
+ {
+ output = [
+ <
+ "target_path" = "//link_dest";
+ "broken" = %false;
+ "type" = "link";
+ > "link";
+ ];
+ }
+]
+)";
+
+constexpr auto CYPRES_LINK_DEST = R"(
+[
+ {
+ "output" = <
+ "user_attributes" = {};
+ "type" = "table";
+ > #;
+ };
+]
+)";
+
+constexpr auto CYPRES_ACCESS_ERROR = R"(
+[
+ {
+ "error" = {
+ "code" = 901;
+ "message" = "Access denied";
+ }
+ }
+]
+)";
+
+constexpr auto CYPRESS_BLACKBOX_ERROR = R"(
+[
+ {
+ "error" = {
+ "code" = 111;
+ "message" = "Blackbox rejected token";
+ }
+ }
+]
+)";
+
+TVector<IYtGateway::TFolderResult::TFolderItem> EXPECTED_ITEMS {
+ {"test/a/a", "table", R"({"user_attributes"={}})"},
+ {"test/a/b", "table", R"({"user_attributes"={}})"},
+ {"test/a/link", "table", R"({"user_attributes"={}})"},
+ {"test/a/link_access_denied", "unknown", "{}"}
+};
+
+TGatewaysConfig MakeGatewaysConfig(size_t port)
+{
+ TGatewaysConfig config {};
+ auto* clusters = config.MutableYt()->MutableClusterMapping();
+ NYql::TYtClusterConfig cluster;
+ cluster.SetName("ut_cluster");
+ cluster.SetYTName("ut_cluster");
+ cluster.SetCluster("localhost:" + ToString(port));
+ clusters->Add(std::move(cluster));
+ return config;
+}
+
+class TYtReplier : public TRequestReplier {
+public:
+ using THandler = std::function<THttpResponse(TStringBuf path, const NYT::TNode& attributes)>;
+
+ bool DoReply(const TReplyParams& params) override {
+ const TParsedHttpFull parsed(params.Input.FirstLine());
+ Cout << parsed.Path << Endl;
+
+ HttpCodes code = HTTP_NOT_FOUND;
+ TString content;
+ if (parsed.Path == "/api/v3/start_tx") {
+ content = CYPRES_TX_ID;
+ code = HTTP_OK;
+ }
+ else if (parsed.Path == "/api/v3/ping_tx") {
+ code = HTTP_OK;
+ }
+ else if (parsed.Path == "/api/v3/execute_batch") {
+ auto executeBatchRes = HandleExecuteBatch(params.Input);
+ executeBatchRes.OutTo(params.Output);
+ return true;
+ }
+ THttpResponse resp(code);
+ resp.SetContent(content);
+ resp.OutTo(params.Output);
+
+ return true;
+ }
+ explicit TYtReplier(THandler handleListCommand, THandler handleGetCommand, TMaybe<std::function<void(const NYT::TNode& request)>> assertion):
+ HandleListCommand_(handleListCommand), HandleGetCommand_(handleGetCommand) {
+ if (assertion) {
+ Assertion_ = assertion.GetRef();
+ }
+ }
+
+private:
+ THttpResponse HandleExecuteBatch(THttpInput& input) {
+ auto requestBody = input.ReadAll();
+ auto requestBodyNode = NYT::NodeFromYsonString(requestBody);
+ if (!requestBodyNode.HasKey("requests")) {
+ return THttpResponse{HTTP_INTERNAL_SERVER_ERROR};
+ }
+ auto& requests = requestBodyNode["requests"];
+ if (!requests.IsList()) {
+ return THttpResponse{HTTP_INTERNAL_SERVER_ERROR};
+ }
+ for (auto& request : requests.AsList()) {
+ Assertion_(request);
+
+ const auto& command = request["command"];
+ const auto& parameters = request["parameters"];
+ const auto& path = parameters["path"].AsString();
+ const auto& attributes = parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{};
+ if (command == "list") {
+ return HandleListCommand_(path, attributes);
+ }
+ if (command == "get") {
+ return HandleGetCommand_(path, attributes);
+ }
+ }
+ return THttpResponse{HTTP_NOT_FOUND};
+ }
+
+ std::function<void(const NYT::TNode& request)> Assertion_ = [] ([[maybe_unused]] auto _) {};
+ THandler HandleListCommand_;
+ THandler HandleGetCommand_;
+
+};
+
+Y_UNIT_TEST_SUITE(YtNativeGateway) {
+
+std::pair<TIntrusivePtr<TYtState>, IYtGateway::TPtr> InitTest(const NTesting::TPortHolder& port, TTypeAnnotationContext* types) {
+ TYtNativeServices nativeServices;
+ auto gatewaysConfig = MakeGatewaysConfig(port);
+ nativeServices.Config = std::make_shared<TYtGatewayConfig>(gatewaysConfig.GetYt());
+ nativeServices.FileStorage = CreateFileStorage(TFileStorageConfig{});
+
+ auto ytGateway = CreateYtNativeGateway(nativeServices);
+ auto ytState = MakeIntrusive<TYtState>(types);
+ ytState->Gateway = ytGateway;
+
+ InitializeYtGateway(ytGateway, ytState);
+ return {ytState, ytGateway};
+}
+
+IYtGateway::TFolderResult GetFolderResult(TYtReplier::THandler handleList, TYtReplier::THandler handleGet,
+TMaybe<std::function<void(const NYT::TNode& request)>> gatewayRequestAssertion, std::function<IYtGateway::TFolderOptions(TString)> makeFolderOptions) {
+ const auto port = NTesting::GetFreePort();
+ NMock::TMockServer mockServer{port,
+ [gatewayRequestAssertion, handleList, handleGet] () {return new TYtReplier(handleList, handleGet, gatewayRequestAssertion);}
+ };
+
+ TTypeAnnotationContext types;
+ auto [ytState, ytGateway] = InitTest(port, &types);
+
+ IYtGateway::TFolderOptions folderOptions = makeFolderOptions(ytState->SessionId);
+ auto folderFuture = ytGateway->GetFolder(std::move(folderOptions));
+
+ folderFuture.Wait();
+ ytState->Gateway->CloseSession({ytState->SessionId});
+ auto folderRes = folderFuture.GetValue();
+ return folderRes;
+}
+
+Y_UNIT_TEST(GetFolder) {
+ THashMap<TString, THashSet<TString>> requiredAttributes {
+ {"//test/a", {"type", "broken", "target_path", "user_attributes"}},
+ {"//link_dest", {"type", "user_attributes"}}
+ };
+ const auto checkRequiredAttributes = [&requiredAttributes] (const NYT::TNode& request) {
+ const auto& parameters = request["parameters"];
+ const auto path = parameters["path"].AsString();
+ const auto& attributes = parameters.HasKey("attributes") ? parameters["attributes"] : NYT::TNode{};
+
+ if (!requiredAttributes.contains(path)) {
+ return;
+ }
+
+ THashSet<TString> attributesSet;
+ for (const auto& attribute : attributes.AsList()) {
+ attributesSet.insert(attribute.AsString());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(requiredAttributes[path], attributesSet);
+ };
+
+ const auto handleGet = [] (TStringBuf path, const NYT::TNode& attributes) {
+ Y_UNUSED(attributes);
+ THttpResponse resp{HTTP_OK};
+ if (path == "//link_dest") {
+ resp.SetContent(CYPRES_LINK_DEST);
+ return resp;
+ }
+ if (path == "//link_access_denied") {
+ resp.SetContent(CYPRES_ACCESS_ERROR);
+ return resp;
+ }
+
+ return THttpResponse{HTTP_NOT_FOUND};
+ };
+
+ const auto handleList = [] (TStringBuf path, const NYT::TNode& attributes) {
+ Y_UNUSED(attributes);
+ THttpResponse resp{HTTP_OK};
+ if (path == "//test/a") {
+ resp.SetContent(CYPRES_NODE_A_CONTENT);
+ return resp;
+ }
+ return THttpResponse{HTTP_NOT_FOUND};
+ };
+
+ const auto makeFolderOptions = [] (const TString& sessionId) {
+ IYtGateway::TFolderOptions folderOptions{sessionId};
+ TYtSettings ytSettings {};
+ folderOptions.Cluster("ut_cluster")
+ .Config(std::make_shared<TYtSettings>(ytSettings))
+ .Prefix("//test/a")
+ .Attributes({"user_attributes"});
+ return folderOptions;
+ };
+
+ auto folderRes
+ = GetFolderResult(handleList, handleGet, checkRequiredAttributes, makeFolderOptions);
+
+ UNIT_ASSERT_EQUAL_C(folderRes.Success(), true, folderRes.Issues().ToString());
+ UNIT_ASSERT_EQUAL(
+ folderRes.ItemsOrFileLink,
+ (std::variant<TVector<IYtGateway::TFolderResult::TFolderItem>, TFileLinkPtr>(EXPECTED_ITEMS)));
+ }
+
+Y_UNIT_TEST(EmptyResolveIsNotError) {
+ const auto port = NTesting::GetFreePort();
+
+ const auto handleList = [] (TStringBuf path, const NYT::TNode& attributes) {
+ Y_UNUSED(path);
+ Y_UNUSED(attributes);
+
+ THttpResponse resp{HTTP_OK};
+ resp.SetContent(CYPRES_NODE_W_LINK);
+ return resp;
+ };
+
+ const auto handleGet = [] (TStringBuf path, const NYT::TNode& attributes) {
+ Y_UNUSED(path);
+ Y_UNUSED(attributes);
+
+ THttpResponse resp{HTTP_OK};
+ resp.SetContent(CYPRES_ACCESS_ERROR);
+ return resp;
+ };
+
+ const auto makeFolderOptions = [] (const TString& sessionId) {
+ IYtGateway::TFolderOptions folderOptions{sessionId};
+ TYtSettings ytSettings {};
+ folderOptions.Cluster("ut_cluster")
+ .Config(std::make_shared<TYtSettings>(ytSettings))
+ .Prefix("//test/a")
+ .Attributes({"user_attributes"});
+ return folderOptions;
+ };
+
+ auto folderRes
+ = GetFolderResult(handleList, handleGet, Nothing(), makeFolderOptions);
+
+ UNIT_ASSERT_EQUAL_C(folderRes.Success(), true, folderRes.Issues().ToString());
+}
+
+Y_UNIT_TEST(GetFolderException) {
+ const auto port = NTesting::GetFreePort();
+
+ const auto handleList = [] (TStringBuf path, const NYT::TNode& attributes) {
+ Y_UNUSED(path);
+ Y_UNUSED(attributes);
+
+ THttpResponse resp{HTTP_UNAUTHORIZED};
+ auto header = R"({"code":900,"message":"Authentication failed"})";
+ resp.AddHeader(THttpInputHeader("X-YT-Error", header));
+ resp.SetContent(CYPRESS_BLACKBOX_ERROR);
+ return resp;
+ };
+
+ const auto handleGet = [] (TStringBuf path, const NYT::TNode& attributes) {
+ Y_UNUSED(path);
+ Y_UNUSED(attributes);
+
+ THttpResponse resp{HTTP_OK};
+ resp.SetContent("");
+ return resp;
+ };
+
+ const auto makeFolderOptions = [] (const TString& sessionId) {
+ IYtGateway::TFolderOptions folderOptions{sessionId};
+ TYtSettings ytSettings {};
+ folderOptions.Cluster("ut_cluster")
+ .Config(std::make_shared<TYtSettings>(ytSettings))
+ .Prefix("//test/a")
+ .Attributes({"user_attributes"});
+ return folderOptions;
+ };
+
+ const auto folderRes
+ = GetFolderResult(handleList, handleGet, Nothing(), makeFolderOptions);
+
+ UNIT_ASSERT(!folderRes.Issues().Empty());
+ UNIT_ASSERT_STRING_CONTAINS(folderRes.Issues().ToString(), "Authentication failed");
+}
+}
+
+} // namespace
+
+} // namespace NYql
diff --git a/yt/yql/providers/yt/lib/ut_common/ya.make b/yt/yql/providers/yt/lib/ut_common/ya.make
new file mode 100644
index 0000000000..4084a3d770
--- /dev/null
+++ b/yt/yql/providers/yt/lib/ut_common/ya.make
@@ -0,0 +1,16 @@
+LIBRARY()
+
+SRCS(
+ yql_ut_common.cpp
+ yql_ut_common.h
+)
+
+PEERDIR(
+ yql/essentials/core
+ yql/essentials/core/expr_nodes
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
diff --git a/yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp
new file mode 100644
index 0000000000..cef3f2723c
--- /dev/null
+++ b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.cpp
@@ -0,0 +1,55 @@
+#include "yql_ut_common.h"
+
+#include <library/cpp/random_provider/random_provider.h>
+#include <library/cpp/time_provider/time_provider.h>
+
+#include <util/generic/guid.h>
+#include <util/system/user.h>
+#include <util/stream/file.h>
+
+namespace NYql {
+
+TTestTablesMapping::TTestTablesMapping()
+ : TmpInput()
+ , TmpInputAttr(TmpInput.Name() + ".attr")
+ , TmpOutput()
+ , TmpOutputAttr(TmpOutput.Name() + ".attr")
+{
+ {
+ TUnbufferedFileOutput tmpInput(TmpInput);
+ tmpInput << "{\"key\"=\"\";\"subkey\"=\"\";\"value\"=\"\"}" << Endl;
+ TUnbufferedFileOutput tmpInputAttr(TmpInputAttr);
+ tmpInputAttr << "{\"_yql_row_spec\" = {\"Type\" = [\"StructType\";["
+ << "[\"key\";[\"DataType\";\"String\"]];"
+ << "[\"subkey\";[\"DataType\";\"String\"]];"
+ << "[\"value\";[\"DataType\";\"String\"]]"
+ << "]]}}" << Endl;
+ }
+ insert(std::make_pair("yt.plato.Input", TmpInput.Name()));
+
+ {
+ TUnbufferedFileOutput tmpOutput(TmpOutput);
+ tmpOutput << "{\"key\"=\"\";\"subkey\"=\"\";\"value\"=\"\"}" << Endl;
+ TUnbufferedFileOutput tmpOutputAttr(TmpOutputAttr);
+ tmpOutputAttr << "{\"_yql_row_spec\" = {\"Type\" = [\"StructType\";["
+ << "[\"key\";[\"DataType\";\"String\"]];"
+ << "[\"subkey\";[\"DataType\";\"String\"]];"
+ << "[\"value\";[\"DataType\";\"String\"]]"
+ << "]]}}" << Endl;
+ }
+ insert(std::make_pair("yt.plato.Output", TmpOutput.Name()));
+}
+
+void InitializeYtGateway(IYtGateway::TPtr gateway, TYtState::TPtr ytState) {
+ ytState->SessionId = CreateGuidAsString();
+ gateway->OpenSession(
+ IYtGateway::TOpenSessionOptions(ytState->SessionId)
+ .UserName(GetUsername())
+ .ProgressWriter(&NullProgressWriter)
+ .OperationOptions(TYqlOperationOptions())
+ .RandomProvider(CreateDeterministicRandomProvider(1))
+ .TimeProvider(CreateDeterministicTimeProvider(10000000))
+ );
+}
+
+}
diff --git a/yt/yql/providers/yt/lib/ut_common/yql_ut_common.h b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.h
new file mode 100644
index 0000000000..ddee02690a
--- /dev/null
+++ b/yt/yql/providers/yt/lib/ut_common/yql_ut_common.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <yql/essentials/core/yql_expr_type_annotation.h>
+
+#include <yt/yql/providers/yt/gateway/file/yql_yt_file.h>
+#include <yt/yql/providers/yt/provider/yql_yt_provider.h>
+
+#include <util/system/tempfile.h>
+
+namespace NYql {
+
+struct TTestTablesMapping: public THashMap<TString, TString> {
+ TTempFileHandle TmpInput;
+ TTempFileHandle TmpInputAttr;
+ TTempFileHandle TmpOutput;
+ TTempFileHandle TmpOutputAttr;
+
+ TTestTablesMapping();
+};
+
+void InitializeYtGateway(IYtGateway::TPtr gateway, TYtState::TPtr ytState);
+
+}
diff --git a/yt/yql/providers/yt/provider/ut/ya.make b/yt/yql/providers/yt/provider/ut/ya.make
index 3b29f30999..888bfe2d25 100644
--- a/yt/yql/providers/yt/provider/ut/ya.make
+++ b/yt/yql/providers/yt/provider/ut/ya.make
@@ -17,7 +17,7 @@ PEERDIR(
yt/yql/providers/yt/gateway/file
yt/yql/providers/yt/codec/codegen
yt/yql/providers/yt/comp_nodes/llvm14
- yql/essentials/core/ut_common
+ yt/yql/providers/yt/lib/ut_common
yql/essentials/ast
yql/essentials/public/udf/service/terminate_policy
yql/essentials/core/services
@@ -38,3 +38,4 @@ YQL_LAST_ABI_VERSION()
END()
ENDIF()
+