aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp
diff options
context:
space:
mode:
authornadya73 <nadya73@yandex-team.com>2024-07-02 23:10:50 +0300
committernadya73 <nadya73@yandex-team.com>2024-07-02 23:21:03 +0300
commit5ea9afc5ee7edc24efa5f45b3a15e184872b0854 (patch)
tree4ccc339d97575cba8b3ed47b6f0615326bdb5324 /yt/cpp
parent96b239778766d32d5158aca805e08199b3c0a743 (diff)
downloadydb-5ea9afc5ee7edc24efa5f45b3a15e184872b0854.tar.gz
[yt/cpp/mapreduce] YT-21595: Use gtest instead of ytest in all mapreduce tests
85671f0cf4f45b4f015fa2cc0d195b81c16c6e8a
Diffstat (limited to 'yt/cpp')
-rw-r--r--yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp199
-rw-r--r--yt/cpp/mapreduce/http/ut/http_ut.cpp94
-rw-r--r--yt/cpp/mapreduce/http/ut/simple_server.cpp90
-rw-r--r--yt/cpp/mapreduce/http/ut/simple_server.h35
-rw-r--r--yt/cpp/mapreduce/http/ut/ya.make14
-rw-r--r--yt/cpp/mapreduce/http/ya.make4
-rw-r--r--yt/cpp/mapreduce/interface/common_ut.cpp357
-rw-r--r--yt/cpp/mapreduce/interface/config_ut.cpp20
-rw-r--r--yt/cpp/mapreduce/interface/error_ut.cpp81
-rw-r--r--yt/cpp/mapreduce/interface/format_ut.cpp235
-rw-r--r--yt/cpp/mapreduce/interface/job_counters.cpp2
-rw-r--r--yt/cpp/mapreduce/interface/job_counters.h2
-rw-r--r--yt/cpp/mapreduce/interface/job_counters_ut.cpp103
-rw-r--r--yt/cpp/mapreduce/interface/job_statistics_ut.cpp257
-rw-r--r--yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp19
-rw-r--r--yt/cpp/mapreduce/interface/logging/ut/ya.make11
-rw-r--r--yt/cpp/mapreduce/interface/logging/ya.make4
-rw-r--r--yt/cpp/mapreduce/interface/operation_ut.cpp269
-rw-r--r--yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp271
-rw-r--r--yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp451
-rw-r--r--yt/cpp/mapreduce/interface/serialize_ut.cpp49
-rw-r--r--yt/cpp/mapreduce/interface/ut/common_ut.cpp353
-rw-r--r--yt/cpp/mapreduce/interface/ut/common_ut.h (renamed from yt/cpp/mapreduce/interface/common_ut.h)0
-rw-r--r--yt/cpp/mapreduce/interface/ut/config_ut.cpp17
-rw-r--r--yt/cpp/mapreduce/interface/ut/error_ut.cpp81
-rw-r--r--yt/cpp/mapreduce/interface/ut/format_ut.cpp232
-rw-r--r--yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp100
-rw-r--r--yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp254
-rw-r--r--yt/cpp/mapreduce/interface/ut/operation_ut.cpp272
-rw-r--r--yt/cpp/mapreduce/interface/ut/proto3_ut.proto (renamed from yt/cpp/mapreduce/interface/proto3_ut.proto)0
-rw-r--r--yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp270
-rw-r--r--yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.proto (renamed from yt/cpp/mapreduce/interface/protobuf_file_options_ut.proto)0
-rw-r--r--yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp444
-rw-r--r--yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.proto (renamed from yt/cpp/mapreduce/interface/protobuf_table_schema_ut.proto)0
-rw-r--r--yt/cpp/mapreduce/interface/ut/serialize_ut.cpp46
-rw-r--r--yt/cpp/mapreduce/interface/ut/ya.make5
-rw-r--r--yt/cpp/mapreduce/interface/ya.make4
-rw-r--r--yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp94
-rw-r--r--yt/cpp/mapreduce/io/ut/readers_ut.cpp232
-rw-r--r--yt/cpp/mapreduce/io/ut/ut_row.proto7
-rw-r--r--yt/cpp/mapreduce/io/ut/ya.make15
-rw-r--r--yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp185
-rw-r--r--yt/cpp/mapreduce/io/ya.make6
-rw-r--r--yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp124
-rw-r--r--yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp191
-rw-r--r--yt/cpp/mapreduce/raw_client/ut/ya.make12
-rw-r--r--yt/cpp/mapreduce/raw_client/ya.make4
-rw-r--r--yt/cpp/mapreduce/skiff/checked_parser.h1
48 files changed, 3416 insertions, 2100 deletions
diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
new file mode 100644
index 0000000000..90196246c5
--- /dev/null
+++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp
@@ -0,0 +1,199 @@
+#include "simple_server.h"
+
+#include <yt/cpp/mapreduce/http/http.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <library/cpp/threading/future/async.h>
+
+#include <library/cpp/http/io/stream.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/testing/common/network.h>
+
+#include <util/string/builder.h>
+#include <util/stream/tee.h>
+#include <util/system/thread.h>
+
+using namespace NYT;
+
+namespace {
+ void ParseFirstLine(const TString firstLine, TString& method, TString& host , ui64& port, TString& command)
+ {
+ size_t idx = firstLine.find_first_of(' ');
+ method = firstLine.substr(0, idx);
+ size_t idx2 = firstLine.find_first_of(':', idx + 1);
+ host = firstLine.substr(idx + 1, idx2 - idx - 1);
+ idx = firstLine.find_first_of('/', idx2 + 1);
+ port = std::atoi(firstLine.substr(idx2 + 1, idx - idx2 - 1).c_str());
+ idx2 = firstLine.find_first_of(' ', idx + 1);
+ command = firstLine.substr(idx, idx2 - idx);
+ }
+} // namespace
+
+THolder<TSimpleServer> CreateSimpleHttpServer()
+{
+ auto port = NTesting::GetFreePort();
+ return MakeHolder<TSimpleServer>(
+ port,
+ [] (IInputStream* input, IOutputStream* output) {
+ try {
+ while (true) {
+ THttpInput httpInput(input);
+ httpInput.ReadAll();
+
+ THttpOutput httpOutput(output);
+ httpOutput.EnableKeepAlive(true);
+ httpOutput << "HTTP/1.1 200 OK\r\n";
+ httpOutput << "\r\n";
+ for (size_t i = 0; i != 10000; ++i) {
+ httpOutput << "The grass was greener";
+ }
+ httpOutput.Flush();
+ }
+ } catch (const std::exception&) {
+ }
+ });
+}
+
+THolder<TSimpleServer> CreateProxyHttpServer()
+{
+ auto port = NTesting::GetFreePort();
+ return MakeHolder<TSimpleServer>(
+ port,
+ [] (IInputStream* input, IOutputStream* output) {
+ try {
+ while (true) {
+ THttpInput httpInput(input);
+ const TString inputStr = httpInput.FirstLine();
+ auto headers = httpInput.Headers();
+ TString method, command, host;
+ ui64 port;
+ ParseFirstLine(inputStr, method, host, port, command);
+
+ THttpRequest request;
+ const TString hostName = ::TStringBuilder() << host << ":" << port;
+ request.Connect(hostName);
+ auto header = THttpHeader(method, command);
+ request.StartRequest(header);
+ request.FinishRequest();
+ auto res = request.GetResponseStream();
+ THttpOutput httpOutput(output);
+ httpOutput.EnableKeepAlive(true);
+ auto strRes = res->ReadAll();
+ httpOutput << "HTTP/1.1 200 OK\r\n";
+ httpOutput << "\r\n";
+ httpOutput << strRes;
+ httpOutput.Flush();
+ }
+ } catch (const std::exception&) {
+ }
+ });
+}
+
+
+class TConnectionPoolConfigGuard
+{
+public:
+ TConnectionPoolConfigGuard(int newSize)
+ {
+ OldValue_ = TConfig::Get()->ConnectionPoolSize;
+ TConfig::Get()->ConnectionPoolSize = newSize;
+ }
+
+ ~TConnectionPoolConfigGuard()
+ {
+ TConfig::Get()->ConnectionPoolSize = OldValue_;
+ }
+
+private:
+ int OldValue_;
+};
+
+class TFuncThread
+ : public ISimpleThread
+{
+public:
+ using TFunc = std::function<void()>;
+
+public:
+ TFuncThread(const TFunc& func)
+ : Func_(func)
+ { }
+
+ void* ThreadProc() noexcept override {
+ Func_();
+ return nullptr;
+ }
+
+private:
+ TFunc Func_;
+};
+
+TEST(TConnectionPool, TestReleaseUnread)
+{
+ auto simpleServer = CreateSimpleHttpServer();
+
+ const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
+
+ for (size_t i = 0; i != 10; ++i) {
+ THttpRequest request;
+ request.Connect(hostName);
+ request.StartRequest(THttpHeader("GET", "foo"));
+ request.FinishRequest();
+ request.GetResponseStream();
+ }
+}
+
+TEST(TConnectionPool, TestProxy)
+{
+ auto simpleServer = CreateSimpleHttpServer();
+ auto simpleServer2 = CreateProxyHttpServer();
+
+ const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
+ const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort();
+
+ for (size_t i = 0; i != 10; ++i) {
+ THttpRequest request;
+ request.Connect(hostName2);
+ auto header = THttpHeader("GET", "foo");
+ header.SetProxyAddress(hostName2);
+ header.SetHostPort(hostName);
+ request.StartRequest(header);
+ request.FinishRequest();
+ request.GetResponseStream();
+ }
+}
+
+TEST(TConnectionPool, TestConcurrency)
+{
+ TConnectionPoolConfigGuard g(1);
+
+ auto simpleServer = CreateSimpleHttpServer();
+ const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort();
+ auto threadPool = CreateThreadPool(20);
+
+ const auto func = [&] {
+ for (int i = 0; i != 100; ++i) {
+ THttpRequest request;
+ request.Connect(hostName);
+ request.StartRequest(THttpHeader("GET", "foo"));
+ request.FinishRequest();
+ auto res = request.GetResponseStream();
+ res->ReadAll();
+ }
+ };
+
+ TVector<THolder<TFuncThread>> threads;
+ for (int i = 0; i != 10; ++i) {
+ threads.push_back(MakeHolder<TFuncThread>(func));
+ };
+
+ for (auto& t : threads) {
+ t->Start();
+ }
+ for (auto& t : threads) {
+ t->Join();
+ }
+}
diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp
new file mode 100644
index 0000000000..0c0ab3363e
--- /dev/null
+++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp
@@ -0,0 +1,94 @@
+#include "simple_server.h"
+
+#include <yt/cpp/mapreduce/http/http.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+#include <library/cpp/testing/common/network.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <util/system/byteorder.h>
+
+using namespace NYT;
+
+void WriteDataFrame(TStringBuf string, IOutputStream* stream)
+{
+ stream->Write("\x01");
+ ui32 size = string.Size();
+ auto littleEndianSize = HostToLittle(size);
+ stream->Write(&littleEndianSize, sizeof(littleEndianSize));
+ stream->Write(string);
+}
+
+THolder<TSimpleServer> CreateFramingEchoServer()
+{
+ auto port = NTesting::GetFreePort();
+ return MakeHolder<TSimpleServer>(
+ port,
+ [] (IInputStream* input, IOutputStream* output) {
+ try {
+ THttpInput httpInput(input);
+ if (!httpInput.Headers().FindHeader("X-YT-Accept-Framing")) {
+ FAIL() << "X-YT-Accept-Framing header not found";
+ }
+ auto input = httpInput.ReadAll();
+
+ THttpOutput httpOutput(output);
+ httpOutput << "HTTP/1.1 200 OK\r\n";
+ httpOutput << "X-YT-Framing: 1\r\n";
+ httpOutput << "\r\n";
+ httpOutput << "\x02\x02"; // Two KeepAlive frames.
+ WriteDataFrame("", &httpOutput);
+ WriteDataFrame(TStringBuf(input).SubString(0, 10), &httpOutput);
+ httpOutput << "\x02"; // KeepAlive.
+ WriteDataFrame("", &httpOutput);
+ WriteDataFrame(TStringBuf(input).SubString(10, std::string::npos), &httpOutput);
+ httpOutput << "\x02"; // KeepAlive.
+
+ httpOutput.Flush();
+ } catch (const std::exception& exc) {
+ }
+ });
+}
+
+TEST(THttpHeaderTest, AddParameter)
+{
+ THttpHeader header("POST", "/foo");
+ header.AddMutationId();
+
+ auto id1 = header.GetParameters()["mutation_id"].AsString();
+
+ header.AddMutationId();
+
+ auto id2 = header.GetParameters()["mutation_id"].AsString();
+
+ EXPECT_TRUE(id1 != id2);
+}
+
+TEST(TFramingTest, FramingSimple)
+{
+ auto server = CreateFramingEchoServer();
+
+ THttpRequest request;
+ request.Connect(server->GetAddress());
+ auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate"));
+ *requestStream << "Some funny data";
+ request.FinishRequest();
+ auto response = request.GetResponseStream()->ReadAll();
+ EXPECT_EQ(response, "Some funny data");
+}
+
+TEST(TFramingTest, FramingLarge)
+{
+ auto server = CreateFramingEchoServer();
+
+ THttpRequest request;
+ request.Connect(server->GetAddress());
+ auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate"));
+ auto data = TString(100000, 'x');
+ *requestStream << data;
+ request.FinishRequest();
+ auto response = request.GetResponseStream()->ReadAll();
+ EXPECT_EQ(response, data);
+}
diff --git a/yt/cpp/mapreduce/http/ut/simple_server.cpp b/yt/cpp/mapreduce/http/ut/simple_server.cpp
new file mode 100644
index 0000000000..fbc369ec20
--- /dev/null
+++ b/yt/cpp/mapreduce/http/ut/simple_server.cpp
@@ -0,0 +1,90 @@
+#include "simple_server.h"
+
+#include <util/network/pair.h>
+#include <util/network/poller.h>
+#include <util/network/sock.h>
+#include <util/string/builder.h>
+#include <util/system/thread.h>
+#include <util/thread/pool.h>
+
+TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler)
+ : Port_(port)
+{
+ auto listenSocket = MakeAtomicShared<TInetStreamSocket>();
+ TSockAddrInet addr((TIpHost)INADDR_ANY, Port_);
+ SetSockOpt(*listenSocket, SOL_SOCKET, SO_REUSEADDR, 1);
+ int ret = listenSocket->Bind(&addr);
+ Y_ENSURE_EX(ret == 0, TSystemError() << "Can not bind");
+
+ SOCKET socketPair[2];
+ ret = SocketPair(socketPair);
+ Y_ENSURE_EX(ret == 0, TSystemError() << "Can not create socket pair");
+
+ ret = listenSocket->Listen(10);
+ Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket");
+
+ SendFinishSocket_ = MakeHolder<TInetStreamSocket>(socketPair[1]);
+
+ ThreadPool_ = MakeHolder<TAdaptiveThreadPool>();
+ ThreadPool_->Start(1);
+
+ auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]);
+ ListenerThread_ = ThreadPool_->Run([listenSocket, receiveFinish, requestHandler] {
+ TSocketPoller socketPoller;
+ socketPoller.WaitRead(*receiveFinish, nullptr);
+ socketPoller.WaitRead(*listenSocket, (void*)1);
+
+ bool running = true;
+ while (running) {
+ void* cookies[2];
+ size_t cookieCount = socketPoller.WaitI(cookies, 2);
+ for (size_t i = 0; i != cookieCount; ++i) {
+ if (!cookies[i]) {
+ running = false;
+ } else {
+ TSockAddrInet addr;
+ TAtomicSharedPtr<TStreamSocket> socket = MakeAtomicShared<TInetStreamSocket>();
+ int ret = listenSocket->Accept(socket.Get(), &addr);
+ Y_ENSURE_EX(ret == 0, TSystemError() << "Can not accept connection");
+
+ SystemThreadFactory()->Run(
+ [socket, requestHandler] {
+ TStreamSocketInput input(socket.Get());
+ TStreamSocketOutput output(socket.Get());
+ requestHandler(&input, &output);
+ socket->Close();
+ });
+ }
+ }
+ }
+ });
+}
+
+TSimpleServer::~TSimpleServer()
+{
+ try {
+ if (ThreadPool_) {
+ Stop();
+ }
+ } catch (...) {
+ }
+}
+
+void TSimpleServer::Stop()
+{
+ // Just send something to indicate shutdown.
+ SendFinishSocket_->Send("X", 1);
+ ListenerThread_->Join();
+ ThreadPool_->Stop();
+ ThreadPool_.Destroy();
+}
+
+int TSimpleServer::GetPort() const
+{
+ return Port_;
+}
+
+TString TSimpleServer::GetAddress() const
+{
+ return TStringBuilder() << "localhost:" << Port_;
+}
diff --git a/yt/cpp/mapreduce/http/ut/simple_server.h b/yt/cpp/mapreduce/http/ut/simple_server.h
new file mode 100644
index 0000000000..f468ca55a6
--- /dev/null
+++ b/yt/cpp/mapreduce/http/ut/simple_server.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <util/generic/ptr.h>
+
+#include <util/stream/input.h>
+#include <util/stream/output.h>
+
+#include <util/thread/pool.h>
+
+#include <functional>
+
+class TInetStreamSocket;
+
+// Simple server listens on the specified port and launches
+// requestHandler in the separate thread for each incoming connection.
+class TSimpleServer
+{
+public:
+ using TRequestHandler = std::function<void(IInputStream* input, IOutputStream* output)>;
+
+public:
+ TSimpleServer(int port, TRequestHandler requestHandler);
+ ~TSimpleServer();
+
+ void Stop();
+
+ int GetPort() const;
+ TString GetAddress() const;
+
+private:
+ const int Port_;
+ THolder<IThreadPool> ThreadPool_;
+ THolder<IThreadFactory::IThread> ListenerThread_;
+ THolder<TInetStreamSocket> SendFinishSocket_;
+};
diff --git a/yt/cpp/mapreduce/http/ut/ya.make b/yt/cpp/mapreduce/http/ut/ya.make
new file mode 100644
index 0000000000..fdc632ac95
--- /dev/null
+++ b/yt/cpp/mapreduce/http/ut/ya.make
@@ -0,0 +1,14 @@
+GTEST()
+
+SRCS(
+ connection_pool_ut.cpp
+ http_ut.cpp
+ simple_server.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/http
+ library/cpp/testing/common
+)
+
+END()
diff --git a/yt/cpp/mapreduce/http/ya.make b/yt/cpp/mapreduce/http/ya.make
index ef81a4b64a..4db8e99bd7 100644
--- a/yt/cpp/mapreduce/http/ya.make
+++ b/yt/cpp/mapreduce/http/ya.make
@@ -27,3 +27,7 @@ PEERDIR(
)
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/cpp/mapreduce/interface/common_ut.cpp b/yt/cpp/mapreduce/interface/common_ut.cpp
deleted file mode 100644
index 05d7495ca2..0000000000
--- a/yt/cpp/mapreduce/interface/common_ut.cpp
+++ /dev/null
@@ -1,357 +0,0 @@
-#include "common_ut.h"
-
-#include "fluent.h"
-
-#include <yt/cpp/mapreduce/interface/common.h>
-
-#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <library/cpp/yson/node/node_io.h>
-#include <library/cpp/yson/node/node_builder.h>
-
-#include <util/generic/xrange.h>
-
-using namespace NYT;
-
-template <class T>
-TString SaveToString(const T& obj)
-{
- TString s;
- TStringOutput out(s);
- ::Save(&out, obj);
- return s;
-}
-
-template <class T>
-T LoadFromString(TStringBuf s)
-{
- TMemoryInput in(s);
- T obj;
- ::Load(&in, obj);
- return obj;
-}
-
-template <class T>
-T SaveLoad(const T& obj)
-{
- return LoadFromString<T>(SaveToString(obj));
-}
-
-Y_UNIT_TEST_SUITE(Common)
-{
- Y_UNIT_TEST(SortColumnsLegacy)
- {
- TSortColumns keys1("a", "b");
- UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b"}));
-
- keys1.Add("c", "d");
- UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"}));
-
- auto keys2 = TSortColumns(keys1).Add("e", "f");
- UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"}));
- UNIT_ASSERT((keys2.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f"}));
-
- auto keys3 = TSortColumns(keys1).Add("e").Add("f").Add("g");
- UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"}));
- UNIT_ASSERT((keys3.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f", "g"}));
- }
-
- Y_UNIT_TEST(SortColumn)
- {
- auto ascending = TSortColumn("a");
- UNIT_ASSERT_VALUES_EQUAL(ascending.Name(), "a");
- UNIT_ASSERT_VALUES_EQUAL(ascending.SortOrder(), ESortOrder::SO_ASCENDING);
- UNIT_ASSERT_VALUES_EQUAL(ascending, TSortColumn("a", ESortOrder::SO_ASCENDING));
- UNIT_ASSERT_VALUES_UNEQUAL(ascending, TSortColumn("a", ESortOrder::SO_DESCENDING));
-
- UNIT_ASSERT_NO_EXCEPTION(ascending.EnsureAscending());
- UNIT_ASSERT_VALUES_EQUAL(static_cast<TString>(ascending), "a");
- UNIT_ASSERT_VALUES_EQUAL(ascending, "a");
-
- auto another = ascending;
- UNIT_ASSERT_NO_EXCEPTION(another = "another");
- UNIT_ASSERT_VALUES_EQUAL(another.Name(), "another");
- UNIT_ASSERT_VALUES_EQUAL(another.SortOrder(), ESortOrder::SO_ASCENDING);
- UNIT_ASSERT_VALUES_EQUAL(another, TSortColumn("another", ESortOrder::SO_ASCENDING));
- UNIT_ASSERT_VALUES_UNEQUAL(another, TSortColumn("another", ESortOrder::SO_DESCENDING));
-
- auto ascendingNode = BuildYsonNodeFluently().Value(ascending);
- UNIT_ASSERT_VALUES_EQUAL(ascendingNode, TNode("a"));
-
- UNIT_ASSERT_VALUES_EQUAL(SaveLoad(ascending), ascending);
- UNIT_ASSERT_VALUES_UNEQUAL(SaveToString(ascending), SaveToString(TString("a")));
-
- auto descending = TSortColumn("a", ESortOrder::SO_DESCENDING);
- UNIT_ASSERT_VALUES_EQUAL(descending.Name(), "a");
- UNIT_ASSERT_VALUES_EQUAL(descending.SortOrder(), ESortOrder::SO_DESCENDING);
- UNIT_ASSERT_VALUES_EQUAL(descending, TSortColumn("a", ESortOrder::SO_DESCENDING));
- UNIT_ASSERT_VALUES_UNEQUAL(descending, TSortColumn("a", ESortOrder::SO_ASCENDING));
-
- UNIT_ASSERT_EXCEPTION(descending.EnsureAscending(), yexception);
- UNIT_ASSERT_EXCEPTION(static_cast<TString>(descending), yexception);
- UNIT_ASSERT_EXCEPTION(descending == "a", yexception);
- UNIT_ASSERT_EXCEPTION(descending = "a", yexception);
-
- auto descendingNode = BuildYsonNodeFluently().Value(descending);
- UNIT_ASSERT_VALUES_EQUAL(descendingNode, TNode()("name", "a")("sort_order", "descending"));
-
- UNIT_ASSERT_VALUES_EQUAL(SaveLoad(descending), descending);
- UNIT_ASSERT_VALUES_UNEQUAL(SaveToString(descending), SaveToString("a"));
-
- UNIT_ASSERT_VALUES_EQUAL(ToString(TSortColumn("blah")), "blah");
- UNIT_ASSERT_VALUES_EQUAL(ToString(TSortColumn("blah", ESortOrder::SO_DESCENDING)), "{\"name\"=\"blah\";\"sort_order\"=\"descending\"}");
- }
-
- Y_UNIT_TEST(SortColumns)
- {
- TSortColumns ascending("a", "b");
- UNIT_ASSERT(ascending.Parts_ == (TSortColumns{"a", "b"}));
- UNIT_ASSERT_NO_EXCEPTION(ascending.EnsureAscending());
- UNIT_ASSERT_VALUES_EQUAL(static_cast<TColumnNames>(ascending).Parts_, (TVector<TString>{"a", "b"}));
- UNIT_ASSERT_VALUES_EQUAL(ascending.GetNames(), (TVector<TString>{"a", "b"}));
-
- auto mixed = ascending;
- mixed.Add(TSortColumn("c", ESortOrder::SO_DESCENDING), "d");
- UNIT_ASSERT((mixed.Parts_ != TVector<TSortColumn>{"a", "b", "c", "d"}));
- UNIT_ASSERT((mixed.Parts_ == TVector<TSortColumn>{"a", "b", TSortColumn("c", ESortOrder::SO_DESCENDING), "d"}));
- UNIT_ASSERT_VALUES_EQUAL(mixed.GetNames(), (TVector<TString>{"a", "b", "c", "d"}));
- UNIT_ASSERT_EXCEPTION(mixed.EnsureAscending(), yexception);
- UNIT_ASSERT_EXCEPTION(static_cast<TColumnNames>(mixed), yexception);
- }
-
- Y_UNIT_TEST(KeyBound)
- {
- auto keyBound = TKeyBound(ERelation::Greater, TKey(7, "a", TNode()("x", "y")));
- UNIT_ASSERT_VALUES_EQUAL(keyBound.Relation(), ERelation::Greater);
- UNIT_ASSERT_EQUAL(keyBound.Key(), TKey(7, "a", TNode()("x", "y")));
-
- auto keyBound1 = TKeyBound().Relation(ERelation::Greater).Key(TKey(7, "a", TNode()("x", "y")));
- auto expectedNode = TNode()
- .Add(">")
- .Add(TNode().Add(7).Add("a").Add(TNode()("x", "y")));
-
- UNIT_ASSERT_VALUES_EQUAL(expectedNode, BuildYsonNodeFluently().Value(keyBound));
- UNIT_ASSERT_VALUES_EQUAL(expectedNode, BuildYsonNodeFluently().Value(keyBound1));
-
- keyBound.Relation(ERelation::LessOrEqual);
- keyBound.Key(TKey("A", 7));
- UNIT_ASSERT_VALUES_EQUAL(keyBound.Relation(), ERelation::LessOrEqual);
- UNIT_ASSERT_EQUAL(keyBound.Key(), TKey("A", 7));
-
- UNIT_ASSERT_VALUES_EQUAL(
- BuildYsonNodeFluently().Value(keyBound),
- TNode()
- .Add("<=")
- .Add(TNode().Add("A").Add(7)));
- }
-
- Y_UNIT_TEST(TTableSchema)
- {
- TTableSchema schema;
- schema
- .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING))
- .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64))
- .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64));
- auto checkSortBy = [](TTableSchema schema, const TVector<TString>& columns) {
- auto initialSchema = schema;
- schema.SortBy(columns);
- for (auto i: xrange(columns.size())) {
- UNIT_ASSERT_VALUES_EQUAL(schema.Columns()[i].Name(), columns[i]);
- UNIT_ASSERT_VALUES_EQUAL(schema.Columns()[i].SortOrder(), ESortOrder::SO_ASCENDING);
- }
- for (auto i: xrange(columns.size(), (size_t)initialSchema.Columns().size())) {
- UNIT_ASSERT_VALUES_EQUAL(schema.Columns()[i].SortOrder(), Nothing());
- }
- UNIT_ASSERT_VALUES_EQUAL(initialSchema.Columns().size(), schema.Columns().size());
- return schema;
- };
- auto newSchema = checkSortBy(schema, {"b"});
- UNIT_ASSERT_VALUES_EQUAL(newSchema.Columns()[1].Name(), TString("a"));
- UNIT_ASSERT_VALUES_EQUAL(newSchema.Columns()[2].Name(), TString("c"));
- checkSortBy(schema, {"b", "c"});
- checkSortBy(schema, {"c", "a"});
- UNIT_ASSERT_EXCEPTION(checkSortBy(schema, {"b", "b"}), yexception);
- UNIT_ASSERT_EXCEPTION(checkSortBy(schema, {"a", "junk"}), yexception);
- }
-
- Y_UNIT_TEST(TTableSchema_Decimal)
- {
- NYT::TTableSchema tableSchema;
-
- tableSchema.AddColumn("a", NTi::Decimal(35, 18));
- tableSchema.AddColumn("b", NTi::Optional(NTi::Decimal(35, 18)));
- tableSchema.AddColumn("c", NTi::List(NTi::Decimal(35, 18)));
-
- auto tableSchemaNode = tableSchema.ToNode();
- const auto& tableSchemaNodeList = tableSchemaNode.AsList();
-
- // There was a bug in the serialization of decimal type: https://github.com/ytsaurus/ytsaurus/issues/173
- {
- const auto& currentType = tableSchemaNodeList[0];
- UNIT_ASSERT_VALUES_EQUAL(currentType.ChildAsString("type"), "string");
- UNIT_ASSERT(currentType.ChildAsBool("required"));
- UNIT_ASSERT(currentType.HasKey("type_v3"));
- UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").ChildAsString("type_name"), "decimal");
- }
- {
- const auto& currentType = tableSchemaNodeList[1];
- UNIT_ASSERT_VALUES_EQUAL(currentType.ChildAsString("type"), "string");
- UNIT_ASSERT(!currentType.ChildAsBool("required"));
- UNIT_ASSERT(currentType.HasKey("type_v3"));
- UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").ChildAsString("type_name"), "optional");
- UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal");
- }
- {
- const auto& currentType = tableSchemaNodeList[2];
- UNIT_ASSERT_VALUES_EQUAL(currentType.ChildAsString("type"), "any");
- UNIT_ASSERT(currentType.ChildAsBool("required"));
- UNIT_ASSERT(currentType.HasKey("type_v3"));
- UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").ChildAsString("type_name"), "list");
- UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal");
- }
-
- UNIT_ASSERT_EQUAL(tableSchema, TTableSchema::FromNode(tableSchemaNode));
- }
-
- Y_UNIT_TEST(TColumnSchema_TypeV3)
- {
- {
- auto column = TColumnSchema().Type(NTi::Interval());
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), true);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_INTERVAL);
- }
- {
- auto column = TColumnSchema().Type(NTi::Optional(NTi::Date()));
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), false);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_DATE);
- }
- {
- auto column = TColumnSchema().Type(NTi::Interval64());
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), true);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_INTERVAL64);
- }
- {
- auto column = TColumnSchema().Type(NTi::Optional(NTi::Date32()));
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), false);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_DATE32);
- }
- {
- auto column = TColumnSchema().Type(NTi::Null());
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), false);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_NULL);
- }
- {
- auto column = TColumnSchema().Type(NTi::Optional(NTi::Null()));
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), false);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_ANY);
- }
- {
- auto column = TColumnSchema().Type(NTi::Decimal(35, 18));
- UNIT_ASSERT_VALUES_EQUAL(column.Required(), true);
- UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_STRING);
- }
- }
-
- Y_UNIT_TEST(ToTypeV3)
- {
- UNIT_ASSERT_VALUES_EQUAL(*ToTypeV3(VT_INT32, true), *NTi::Int32());
- UNIT_ASSERT_VALUES_EQUAL(*ToTypeV3(VT_UTF8, false), *NTi::Optional(NTi::Utf8()));
- }
-
- Y_UNIT_TEST(DeserializeColumn)
- {
- auto deserialize = [] (TStringBuf yson) {
- auto node = NodeFromYsonString(yson);
- TColumnSchema column;
- Deserialize(column, node);
- return column;
- };
-
- auto column = deserialize("{name=foo; type=int64; required=%false}");
- UNIT_ASSERT_VALUES_EQUAL(column.Name(), "foo");
- UNIT_ASSERT_VALUES_EQUAL(*column.TypeV3(), *NTi::Optional(NTi::Int64()));
-
- column = deserialize("{name=bar; type=utf8; required=%true; type_v3=utf8}");
- UNIT_ASSERT_VALUES_EQUAL(column.Name(), "bar");
- UNIT_ASSERT_VALUES_EQUAL(*column.TypeV3(), *NTi::Utf8());
- }
-
- Y_UNIT_TEST(ColumnSchemaEquality)
- {
- auto base = TColumnSchema()
- .Name("col")
- .TypeV3(NTi::Optional(NTi::List(NTi::String())))
- .SortOrder(ESortOrder::SO_ASCENDING)
- .Lock("lock")
- .Expression("x + 12")
- .Aggregate("sum")
- .Group("group");
-
- auto other = base;
- ASSERT_SERIALIZABLES_EQUAL(other, base);
- other.Name("other");
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
-
- other = base;
- other.TypeV3(NTi::List(NTi::String()));
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
-
- other = base;
- other.ResetSortOrder();
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
-
- other = base;
- other.Lock("lock1");
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
-
- other = base;
- other.Expression("x + 13");
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
-
- other = base;
- other.ResetAggregate();
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
-
- other = base;
- other.Group("group1");
- ASSERT_SERIALIZABLES_UNEQUAL(other, base);
- }
-
- Y_UNIT_TEST(TableSchemaEquality)
- {
- auto col1 = TColumnSchema()
- .Name("col1")
- .TypeV3(NTi::Optional(NTi::List(NTi::String())))
- .SortOrder(ESortOrder::SO_ASCENDING);
-
- auto col2 = TColumnSchema()
- .Name("col2")
- .TypeV3(NTi::Uint32());
-
- auto schema = TTableSchema()
- .AddColumn(col1)
- .AddColumn(col2)
- .Strict(true)
- .UniqueKeys(true);
-
- auto other = schema;
- ASSERT_SERIALIZABLES_EQUAL(other, schema);
-
- other.Strict(false);
- ASSERT_SERIALIZABLES_UNEQUAL(other, schema);
-
- other = schema;
- other.MutableColumns()[0].TypeV3(NTi::List(NTi::String()));
- ASSERT_SERIALIZABLES_UNEQUAL(other, schema);
-
- other = schema;
- other.MutableColumns().push_back(col1);
- ASSERT_SERIALIZABLES_UNEQUAL(other, schema);
-
- other = schema;
- other.UniqueKeys(false);
- ASSERT_SERIALIZABLES_UNEQUAL(other, schema);
- }
-}
diff --git a/yt/cpp/mapreduce/interface/config_ut.cpp b/yt/cpp/mapreduce/interface/config_ut.cpp
deleted file mode 100644
index e49ba02108..0000000000
--- a/yt/cpp/mapreduce/interface/config_ut.cpp
+++ /dev/null
@@ -1,20 +0,0 @@
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <yt/cpp/mapreduce/interface/config.h>
-
-using namespace NYT;
-
-Y_UNIT_TEST_SUITE(ConfigSuite)
-{
- Y_UNIT_TEST(TestReset) {
- // very limited test, checks only one config field
-
- auto origConfig = *TConfig::Get();
- TConfig::Get()->Reset();
- UNIT_ASSERT_VALUES_EQUAL(origConfig.Hosts, TConfig::Get()->Hosts);
-
- TConfig::Get()->Hosts = "hosts/fb867";
- TConfig::Get()->Reset();
- UNIT_ASSERT_VALUES_EQUAL(origConfig.Hosts, TConfig::Get()->Hosts);
- }
-}
diff --git a/yt/cpp/mapreduce/interface/error_ut.cpp b/yt/cpp/mapreduce/interface/error_ut.cpp
deleted file mode 100644
index 03f2751b23..0000000000
--- a/yt/cpp/mapreduce/interface/error_ut.cpp
+++ /dev/null
@@ -1,81 +0,0 @@
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <library/cpp/json/json_reader.h>
-
-#include <yt/cpp/mapreduce/interface/errors.h>
-#include <yt/cpp/mapreduce/common/helpers.h>
-
-using namespace NYT;
-
-template<>
-void Out<NYT::TNode>(IOutputStream& s, const NYT::TNode& node)
-{
- s << "TNode:" << NodeToYsonString(node);
-}
-
-Y_UNIT_TEST_SUITE(ErrorSuite)
-{
- Y_UNIT_TEST(TestParseJson)
- {
- // Scary real world error! Бу!
- const char* jsonText =
- R"""({)"""
- R"""("code":500,)"""
- R"""("message":"Error resolving path //home/user/link",)"""
- R"""("attributes":{)"""
- R"""("fid":18446484571700269066,)"""
- R"""("method":"Create",)"""
- R"""("tid":17558639495721339338,)"""
- R"""("datetime":"2017-04-07T13:38:56.474819Z",)"""
- R"""("pid":414529,)"""
- R"""("host":"build01-01g.yt.yandex.net"},)"""
- R"""("inner_errors":[{)"""
- R"""("code":1,)"""
- R"""("message":"Node //tt cannot have children",)"""
- R"""("attributes":{)"""
- R"""("fid":18446484571700269066,)"""
- R"""("tid":17558639495721339338,)"""
- R"""("datetime":"2017-04-07T13:38:56.474725Z",)"""
- R"""("pid":414529,)"""
- R"""("host":"build01-01g.yt.yandex.net"},)"""
- R"""("inner_errors":[]}]})""";
-
- NJson::TJsonValue jsonValue;
- ReadJsonFastTree(jsonText, &jsonValue, /*throwOnError=*/ true);
-
- TYtError error(jsonValue);
- UNIT_ASSERT_VALUES_EQUAL(error.GetCode(), 500);
- UNIT_ASSERT_VALUES_EQUAL(error.GetMessage(), R"""(Error resolving path //home/user/link)""");
- UNIT_ASSERT_VALUES_EQUAL(error.InnerErrors().size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(error.InnerErrors()[0].GetCode(), 1);
-
- UNIT_ASSERT_VALUES_EQUAL(error.HasAttributes(), true);
- UNIT_ASSERT_VALUES_EQUAL(error.GetAttributes().at("method"), TNode("Create"));
-
- UNIT_ASSERT_VALUES_EQUAL(error.GetAllErrorCodes(), TSet<int>({500, 1}));
- }
-
- Y_UNIT_TEST(TestGetYsonText) {
- const char* jsonText =
- R"""({)"""
- R"""("code":500,)"""
- R"""("message":"outer error",)"""
- R"""("attributes":{)"""
- R"""("method":"Create",)"""
- R"""("pid":414529},)"""
- R"""("inner_errors":[{)"""
- R"""("code":1,)"""
- R"""("message":"inner error",)"""
- R"""("attributes":{},)"""
- R"""("inner_errors":[])"""
- R"""(}]})""";
- TYtError error;
- error.ParseFrom(jsonText);
- TString ysonText = error.GetYsonText();
- TYtError error2(NodeFromYsonString(ysonText));
- UNIT_ASSERT_EQUAL(
- ysonText,
- R"""({"code"=500;"message"="outer error";"attributes"={"method"="Create";"pid"=414529};"inner_errors"=[{"code"=1;"message"="inner error"}]})""");
- UNIT_ASSERT_EQUAL(error2.GetYsonText(), ysonText);
- }
-}
diff --git a/yt/cpp/mapreduce/interface/format_ut.cpp b/yt/cpp/mapreduce/interface/format_ut.cpp
deleted file mode 100644
index 069c29087d..0000000000
--- a/yt/cpp/mapreduce/interface/format_ut.cpp
+++ /dev/null
@@ -1,235 +0,0 @@
-#include "common.h"
-#include "errors.h"
-#include "format.h"
-#include "common_ut.h"
-
-#include <yt/cpp/mapreduce/interface/proto3_ut.pb.h>
-#include <yt/cpp/mapreduce/interface/protobuf_table_schema_ut.pb.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NYT;
-
-static TNode GetColumns(const TFormat& format, int tableIndex = 0)
-{
- return format.Config.GetAttributes()["tables"][tableIndex]["columns"];
-}
-
-Y_UNIT_TEST_SUITE(ProtobufFormat)
-{
- Y_UNIT_TEST(TIntegral)
- {
- const auto format = TFormat::Protobuf<NUnitTesting::TIntegral>();
- auto columns = GetColumns(format);
-
- struct TColumn
- {
- TString Name;
- TString ProtoType;
- int FieldNumber;
- };
-
- auto expected = TVector<TColumn>{
- {"DoubleField", "double", 1},
- {"FloatField", "float", 2},
- {"Int32Field", "int32", 3},
- {"Int64Field", "int64", 4},
- {"Uint32Field", "uint32", 5},
- {"Uint64Field", "uint64", 6},
- {"Sint32Field", "sint32", 7},
- {"Sint64Field", "sint64", 8},
- {"Fixed32Field", "fixed32", 9},
- {"Fixed64Field", "fixed64", 10},
- {"Sfixed32Field", "sfixed32", 11},
- {"Sfixed64Field", "sfixed64", 12},
- {"BoolField", "bool", 13},
- {"EnumField", "enum_string", 14},
- };
-
- UNIT_ASSERT_VALUES_EQUAL(columns.Size(), expected.size());
- for (int i = 0; i < static_cast<int>(columns.Size()); ++i) {
- UNIT_ASSERT_VALUES_EQUAL(columns[i]["name"], expected[i].Name);
- UNIT_ASSERT_VALUES_EQUAL(columns[i]["proto_type"], expected[i].ProtoType);
- UNIT_ASSERT_VALUES_EQUAL(columns[i]["field_number"], expected[i].FieldNumber);
- }
- }
-
- Y_UNIT_TEST(TRowFieldSerializationOption)
- {
- const auto format = TFormat::Protobuf<NUnitTesting::TRowFieldSerializationOption>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "UrlRow_1");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1);
- const auto& fields = columns[0]["fields"];
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["name"], "Host");
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["proto_type"], "string");
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["field_number"], 1);
-
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["name"], "Path");
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["proto_type"], "string");
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["field_number"], 2);
-
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["name"], "HttpCode");
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["proto_type"], "sint32");
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["field_number"], 3);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[1]["name"], "UrlRow_2");
- UNIT_ASSERT_VALUES_EQUAL(columns[1]["proto_type"], "message");
- UNIT_ASSERT_VALUES_EQUAL(columns[1]["field_number"], 2);
- }
-
- Y_UNIT_TEST(Packed)
- {
- const auto format = TFormat::Protobuf<NUnitTesting::TPacked>();
- auto column = GetColumns(format)[0];
-
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "PackedListInt64");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["field_number"], 1);
- UNIT_ASSERT_VALUES_EQUAL(column["packed"], true);
- UNIT_ASSERT_VALUES_EQUAL(column["repeated"], true);
- }
-
- Y_UNIT_TEST(Cyclic)
- {
- UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TA>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TB>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TC>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TD>(), TApiUsageError);
-
- const auto format = TFormat::Protobuf<NUnitTesting::TCyclic::TE>();
- auto column = GetColumns(format)[0];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "d");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "message");
- UNIT_ASSERT_VALUES_EQUAL(column["field_number"], 1);
- }
-
- Y_UNIT_TEST(Map)
- {
- const auto format = TFormat::Protobuf<NUnitTesting::TWithMap>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 5);
- {
- const auto& column = columns[0];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDefault");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "message");
- }
- {
- const auto& column = columns[1];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapListOfStructsLegacy");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "message");
- }
- {
- const auto& column = columns[2];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapListOfStructs");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message");
- }
- {
- const auto& column = columns[3];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapOptionalDict");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message");
- }
- {
- const auto& column = columns[4];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDict");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message");
- }
- }
-
- Y_UNIT_TEST(Oneof)
- {
- const auto format = TFormat::Protobuf<NUnitTesting::TWithOneof>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 4);
- auto check = [] (const TNode& column, TStringBuf name, TStringBuf oneof2Name) {
- UNIT_ASSERT_VALUES_EQUAL(column["name"], name);
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 5);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "field");
-
- const auto& oneof2 = column["fields"][1];
- UNIT_ASSERT_VALUES_EQUAL(oneof2["name"], oneof2Name);
- UNIT_ASSERT_VALUES_EQUAL(oneof2["proto_type"], "oneof");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][0]["name"], "y2");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["name"], "z2");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["proto_type"], "structured_message");
- const auto& embeddedOneof = oneof2["fields"][1]["fields"][0];
- UNIT_ASSERT_VALUES_EQUAL(embeddedOneof["name"], "Oneof");
- UNIT_ASSERT_VALUES_EQUAL(embeddedOneof["fields"][0]["name"], "x");
- UNIT_ASSERT_VALUES_EQUAL(embeddedOneof["fields"][1]["name"], "y");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][2]["name"], "x2");
-
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "x1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][3]["name"], "y1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][4]["name"], "z1");
- };
-
- check(columns[0], "DefaultSeparateFields", "variant_field_name");
- check(columns[1], "NoDefault", "Oneof2");
-
- {
- const auto& column = columns[2];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "SerializationProtobuf");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 3);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "x1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["name"], "y1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "z1");
- }
- {
- const auto& column = columns[3];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "TopLevelOneof");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "oneof");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "MemberOfTopLevelOneof");
- }
- }
-}
-
-Y_UNIT_TEST_SUITE(Proto3)
-{
- Y_UNIT_TEST(TWithOptional)
- {
- const auto format = TFormat::Protobuf<NTestingProto3::TWithOptional>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "x");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1);
- }
-
- Y_UNIT_TEST(TWithOptionalMessage)
- {
- const auto format = TFormat::Protobuf<NTestingProto3::TWithOptionalMessage>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "x");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"].Size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"][0]["name"], "x");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"][0]["field_number"], 1);
- }
-}
diff --git a/yt/cpp/mapreduce/interface/job_counters.cpp b/yt/cpp/mapreduce/interface/job_counters.cpp
index 6d4a2a6fcb..717982e216 100644
--- a/yt/cpp/mapreduce/interface/job_counters.cpp
+++ b/yt/cpp/mapreduce/interface/job_counters.cpp
@@ -60,7 +60,7 @@ ui64 TJobCounter::GetValue(const TStringBuf key) const
////////////////////////////////////////////////////////////////////
-TJobCounters::TJobCounters(const NYT::TNode& counters)
+TJobCounters::TJobCounters(const TNode& counters)
: Total_(0)
{
if (!counters.IsMap()) {
diff --git a/yt/cpp/mapreduce/interface/job_counters.h b/yt/cpp/mapreduce/interface/job_counters.h
index 9257cc1ec1..573cbe4784 100644
--- a/yt/cpp/mapreduce/interface/job_counters.h
+++ b/yt/cpp/mapreduce/interface/job_counters.h
@@ -33,7 +33,7 @@ public:
///
/// Construct counter from counters node.
- TJobCounters(const NYT::TNode& counters);
+ TJobCounters(const TNode& counters);
const TJobCounter& GetAborted() const;
const TJobCounter& GetAbortedScheduled() const;
diff --git a/yt/cpp/mapreduce/interface/job_counters_ut.cpp b/yt/cpp/mapreduce/interface/job_counters_ut.cpp
deleted file mode 100644
index 56d3932b8f..0000000000
--- a/yt/cpp/mapreduce/interface/job_counters_ut.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-#include <yt/cpp/mapreduce/interface/job_counters.h>
-#include <yt/cpp/mapreduce/interface/operation.h>
-
-#include <library/cpp/yson/node/node_io.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NYT;
-
-Y_UNIT_TEST_SUITE(JobCounters)
-{
- Y_UNIT_TEST(Full)
- {
- const TString input = R"""(
- {
- "completed" = {
- "total" = 6;
- "non-interrupted" = 1;
- "interrupted" = {
- "whatever_interrupted" = 2;
- "whatever_else_interrupted" = 3;
- };
- };
- "aborted" = {
- "non_scheduled" = {
- "whatever_non_scheduled" = 4;
- "whatever_else_non_scheduled" = 5;
- };
- "scheduled" = {
- "whatever_scheduled" = 6;
- "whatever_else_scheduled" = 7;
- };
- "total" = 22;
- };
- "lost" = 8;
- "invalidated" = 9;
- "failed" = 10;
- "running" = 11;
- "suspended" = 12;
- "pending" = 13;
- "blocked" = 14;
- "total" = 105;
- })""";
-
- TJobCounters counters(NodeFromYsonString(input));
-
- UNIT_ASSERT_VALUES_EQUAL(counters.GetTotal(), 105);
-
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompleted().GetTotal(), 6);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedNonInterrupted().GetTotal(), 1);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetTotal(), 5);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAborted().GetTotal(), 22);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetTotal(), 9);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetTotal(), 13);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetLost().GetTotal(), 8);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetInvalidated().GetTotal(), 9);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetFailed().GetTotal(), 10);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetRunning().GetTotal(), 11);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetSuspended().GetTotal(), 12);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetPending().GetTotal(), 13);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetBlocked().GetTotal(), 14);
-
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetValue("whatever_interrupted"), 2);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetValue("whatever_else_interrupted"), 3);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetValue("whatever_non_scheduled"), 4);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetValue("whatever_else_non_scheduled"), 5);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetValue("whatever_scheduled"), 6);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetValue("whatever_else_scheduled"), 7);
-
- UNIT_ASSERT_EXCEPTION(counters.GetCompletedInterrupted().GetValue("Nothingness"), yexception);
- }
-
- Y_UNIT_TEST(Empty)
- {
- const TString input = "{}";
-
- TJobCounters counters(NodeFromYsonString(input));
-
- UNIT_ASSERT_VALUES_EQUAL(counters.GetTotal(), 0);
-
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompleted().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedNonInterrupted().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAborted().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetLost().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetInvalidated().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetFailed().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetRunning().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetSuspended().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetPending().GetTotal(), 0);
- UNIT_ASSERT_VALUES_EQUAL(counters.GetBlocked().GetTotal(), 0);
- }
-
- Y_UNIT_TEST(Broken)
- {
- UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode()), yexception, "TJobCounters");
- UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode(1)), yexception, "TJobCounters");
- UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode(1.0)), yexception, "TJobCounters");
- UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode("Whatever")), yexception, "TJobCounters");
- }
-}
diff --git a/yt/cpp/mapreduce/interface/job_statistics_ut.cpp b/yt/cpp/mapreduce/interface/job_statistics_ut.cpp
deleted file mode 100644
index 2603a4fbf0..0000000000
--- a/yt/cpp/mapreduce/interface/job_statistics_ut.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-#include <yt/cpp/mapreduce/interface/job_statistics.h>
-#include <yt/cpp/mapreduce/interface/operation.h>
-
-#include <library/cpp/yson/node/node_io.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NYT;
-
-Y_UNIT_TEST_SUITE(JobStatistics)
-{
- Y_UNIT_TEST(Simple)
- {
- const TString input = R"""(
- {
- "data" = {
- "output" = {
- "0" = {
- "uncompressed_data_size" = {
- "$" = {
- "completed" = {
- "simple_sort" = {
- "max" = 130;
- "count" = 1;
- "min" = 130;
- "sum" = 130;
- };
- "map" = {
- "max" = 42;
- "count" = 1;
- "min" = 42;
- "sum" = 42;
- };
- };
- "aborted" = {
- "simple_sort" = {
- "max" = 24;
- "count" = 1;
- "min" = 24;
- "sum" = 24;
- };
- };
- };
- };
- };
- };
- };
- })""";
-
- TJobStatistics stat(NodeFromYsonString(input));
-
- UNIT_ASSERT(stat.HasStatistics("data/output/0/uncompressed_data_size"));
- UNIT_ASSERT(!stat.HasStatistics("nonexistent-statistics"));
- UNIT_ASSERT_EXCEPTION_CONTAINS(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics");
-
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"});
-
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2);
-
- UNIT_ASSERT_VALUES_EQUAL(stat.JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), 24);
- UNIT_ASSERT_VALUES_EQUAL(stat.JobType({EJobType::Map}).JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), TMaybe<i64>());
- }
-
- Y_UNIT_TEST(TestOtherTypes)
- {
- const TString input = R"""(
- {
- "time" = {
- "exec" = {
- "$" = {
- "completed" = {
- "map" = {
- "max" = 2482468;
- "count" = 38;
- "min" = 578976;
- "sum" = 47987270;
- };
- };
- };
- };
- };
- })""";
-
- TJobStatistics stat(NodeFromYsonString(input));
-
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatisticsAs<TDuration>("time/exec").Max(), TDuration::MilliSeconds(2482468));
- }
-
- Y_UNIT_TEST(Custom)
- {
- const TString input = R"""(
- {
- "custom" = {
- "some" = {
- "path" = {
- "$" = {
- "completed" = {
- "map" = {
- "max" = -1;
- "count" = 1;
- "min" = -1;
- "sum" = -1;
- };
- };
- };
- };
- };
- "another" = {
- "path" = {
- "$" = {
- "completed" = {
- "map" = {
- "max" = 1001;
- "count" = 2;
- "min" = 1001;
- "sum" = 2002;
- };
- };
- };
- };
- };
- };
- })""";
-
- TJobStatistics stat(NodeFromYsonString(input));
-
- UNIT_ASSERT(stat.HasCustomStatistics("some/path"));
- UNIT_ASSERT(!stat.HasCustomStatistics("nonexistent-statistics"));
- UNIT_ASSERT_EXCEPTION_CONTAINS(stat.GetCustomStatistics("BLAH-BLAH"), yexception, "Statistics");
-
- const auto names = stat.GetCustomStatisticsNames();
- const THashSet<TString> expected = {"some/path", "another/path"};
- UNIT_ASSERT_VALUES_EQUAL(THashSet<TString>(names.begin(), names.end()), expected);
-
- UNIT_ASSERT_VALUES_EQUAL(stat.GetCustomStatistics("some/path").Max(), -1);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetCustomStatistics("another/path").Avg(), 1001);
- }
-
- Y_UNIT_TEST(TaskNames)
- {
- const TString input = R"""(
- {
- "data" = {
- "output" = {
- "0" = {
- "uncompressed_data_size" = {
- "$" = {
- "completed" = {
- "partition_map" = {
- "max" = 130;
- "count" = 1;
- "min" = 130;
- "sum" = 130;
- };
- "partition(0)" = {
- "max" = 42;
- "count" = 1;
- "min" = 42;
- "sum" = 42;
- };
- };
- "aborted" = {
- "simple_sort" = {
- "max" = 24;
- "count" = 1;
- "min" = 24;
- "sum" = 24;
- };
- };
- };
- };
- };
- };
- };
- })""";
-
- TJobStatistics stat(NodeFromYsonString(input));
-
- UNIT_ASSERT(stat.HasStatistics("data/output/0/uncompressed_data_size"));
- UNIT_ASSERT(!stat.HasStatistics("nonexistent-statistics"));
- UNIT_ASSERT_EXCEPTION_CONTAINS(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics");
-
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"});
-
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172);
- UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2);
-
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .JobState({EJobState::Aborted})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 24);
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .JobType({EJobType::Partition})
- .JobState({EJobState::Aborted})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- TMaybe<i64>());
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .TaskName({"partition(0)"})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 42);
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .TaskName({"partition"})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- TMaybe<i64>());
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .TaskName({"partition_map(0)"})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 130);
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .JobType({EJobType::Partition})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 42);
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .JobType({EJobType::PartitionMap})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 130);
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .TaskName({ETaskName::Partition0})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 42);
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .TaskName({ETaskName::Partition1})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- TMaybe<i64>());
- UNIT_ASSERT_VALUES_EQUAL(
- stat
- .TaskName({ETaskName::PartitionMap0})
- .GetStatistics("data/output/0/uncompressed_data_size")
- .Sum(),
- 130);
- }
-}
diff --git a/yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp b/yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp
new file mode 100644
index 0000000000..b79b2f707f
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp
@@ -0,0 +1,19 @@
+#include <yt/cpp/mapreduce/interface/logging/logger.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <util/string/cast.h>
+
+using namespace NYT;
+
+TEST(TLoggingTest, FromString) {
+ EXPECT_EQ(FromString("error"), ILogger::ELevel::ERROR);
+ EXPECT_EQ(FromString("warning"), ILogger::ELevel::ERROR);
+ EXPECT_EQ(FromString("info"), ILogger::ELevel::INFO);
+ EXPECT_EQ(FromString("debug"), ILogger::ELevel::DEBUG);
+ EXPECT_EQ(FromString("ERROR"), ILogger::ELevel::ERROR);
+ EXPECT_EQ(FromString("WARNING"), ILogger::ELevel::ERROR);
+ EXPECT_EQ(FromString("INFO"), ILogger::ELevel::INFO);
+ EXPECT_EQ(FromString("DEBUG"), ILogger::ELevel::DEBUG);
+ EXPECT_THROW(FromString<ILogger::ELevel>("no"), yexception);
+}
diff --git a/yt/cpp/mapreduce/interface/logging/ut/ya.make b/yt/cpp/mapreduce/interface/logging/ut/ya.make
new file mode 100644
index 0000000000..53d94509fb
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/logging/ut/ya.make
@@ -0,0 +1,11 @@
+GTEST()
+
+SRCS(
+ log_ut.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/interface/logging
+)
+
+END()
diff --git a/yt/cpp/mapreduce/interface/logging/ya.make b/yt/cpp/mapreduce/interface/logging/ya.make
index 8095bfe4ba..03407caf14 100644
--- a/yt/cpp/mapreduce/interface/logging/ya.make
+++ b/yt/cpp/mapreduce/interface/logging/ya.make
@@ -14,3 +14,7 @@ PEERDIR(
GENERATE_ENUM_SERIALIZATION(logger.h)
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/cpp/mapreduce/interface/operation_ut.cpp b/yt/cpp/mapreduce/interface/operation_ut.cpp
deleted file mode 100644
index 0fa62e1568..0000000000
--- a/yt/cpp/mapreduce/interface/operation_ut.cpp
+++ /dev/null
@@ -1,269 +0,0 @@
-#include <yt/cpp/mapreduce/interface/common_ut.h>
-#include <yt/cpp/mapreduce/interface/job_statistics.h>
-#include <yt/cpp/mapreduce/interface/operation.h>
-#include <yt/cpp/mapreduce/interface/protobuf_table_schema_ut.pb.h>
-
-#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
-
-#include <library/cpp/yson/node/node_io.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NYT;
-using namespace NYT::NUnitTesting;
-
-class TDummyInferenceContext
- : public IOperationPreparationContext
-{
-public:
- TDummyInferenceContext(int inputCount, int outputCount)
- : InputCount_(inputCount)
- , OutputCount_(outputCount)
- , InputSchemas_(inputCount)
- { }
-
- int GetInputCount() const override
- {
- return InputCount_;
- }
-
- int GetOutputCount() const override
- {
- return OutputCount_;
- }
-
- const TVector<TTableSchema>& GetInputSchemas() const override
- {
- return InputSchemas_;
- }
-
- const TTableSchema& GetInputSchema(int index) const override
- {
- return InputSchemas_[index];
- }
-
- TMaybe<TYPath> GetInputPath(int) const override
- {
- return Nothing();
- }
-
- TMaybe<TYPath> GetOutputPath(int) const override
- {
- return Nothing();
- }
-
-private:
- int InputCount_;
- int OutputCount_;
- TVector<TTableSchema> InputSchemas_;
-};
-
-Y_UNIT_TEST_SUITE(PrepareOperation)
-{
-
- Y_UNIT_TEST(BasicSchemas)
- {
- auto firstSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64));
- auto otherSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN));
- auto thirdSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING));
-
- TDummyInferenceContext context(3,7);
- TJobOperationPreparer builder(context);
-
- builder
- .OutputSchema(1, firstSchema)
- .BeginOutputGroup(TVector<int>{2, 5})
- .Schema(otherSchema)
- .EndOutputGroup()
- .BeginOutputGroup(3, 5)
- .Schema(thirdSchema)
- .EndOutputGroup()
- .BeginOutputGroup(TVector<int>{0, 6})
- .Schema(thirdSchema)
- .EndOutputGroup();
-
- UNIT_ASSERT_EXCEPTION(builder.OutputSchema(1, otherSchema), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(builder.BeginOutputGroup(3, 5).Schema(otherSchema), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(builder.BeginOutputGroup(TVector<int>{3,6,7}).Schema(otherSchema), TApiUsageError);
-
- builder.Finish();
- auto result = builder.GetOutputSchemas();
-
- ASSERT_SERIALIZABLES_EQUAL(result[0], thirdSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[1], firstSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[2], otherSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[3], thirdSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[4], thirdSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[5], otherSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[6], thirdSchema);
- }
-
- Y_UNIT_TEST(NoSchema)
- {
- auto schema = TTableSchema()
- .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64));
-
- TDummyInferenceContext context(3,4);
- TJobOperationPreparer builder(context);
-
- builder
- .OutputSchema(1, schema)
- .NoOutputSchema(0)
- .BeginOutputGroup(2, 4)
- .Schema(schema)
- .EndOutputGroup();
-
- UNIT_ASSERT_EXCEPTION(builder.OutputSchema(0, schema), TApiUsageError);
-
- builder.Finish();
- auto result = builder.GetOutputSchemas();
-
- UNIT_ASSERT(result[0].Empty());
-
- ASSERT_SERIALIZABLES_EQUAL(result[1], schema);
- ASSERT_SERIALIZABLES_EQUAL(result[2], schema);
- ASSERT_SERIALIZABLES_EQUAL(result[3], schema);
- }
-
- Y_UNIT_TEST(Descriptions)
- {
- auto urlRowSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("Host").Type(NTi::Optional(NTi::String())))
- .AddColumn(TColumnSchema().Name("Path").Type(NTi::Optional(NTi::String())))
- .AddColumn(TColumnSchema().Name("HttpCode").Type(NTi::Optional(NTi::Int32())));
-
- auto urlRowStruct = NTi::Struct({
- {"Host", NTi::Optional(NTi::String())},
- {"Path", NTi::Optional(NTi::String())},
- {"HttpCode", NTi::Optional(NTi::Int32())},
- });
-
- auto rowFieldSerializationOptionSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(NTi::Optional(urlRowStruct)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(NTi::Optional(NTi::String())));
-
- auto rowSerializedRepeatedFieldsSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("Ints").Type(NTi::List(NTi::Int64())))
- .AddColumn(TColumnSchema().Name("UrlRows").Type(NTi::List(urlRowStruct)));
-
- TDummyInferenceContext context(5,7);
- TJobOperationPreparer builder(context);
-
- builder
- .InputDescription<TUrlRow>(0)
- .BeginInputGroup(2, 3)
- .Description<TUrlRow>()
- .EndInputGroup()
- .BeginInputGroup(TVector<int>{1, 4})
- .Description<TRowSerializedRepeatedFields>()
- .EndInputGroup()
- .InputDescription<TUrlRow>(3);
-
- UNIT_ASSERT_EXCEPTION(builder.InputDescription<TUrlRow>(0), TApiUsageError);
-
- builder
- .OutputDescription<TUrlRow>(0, false)
- .OutputDescription<TRowFieldSerializationOption>(1)
- .BeginOutputGroup(2, 4)
- .Description<TUrlRow>()
- .EndOutputGroup()
- .BeginOutputGroup(TVector<int>{4,6})
- .Description<TRowSerializedRepeatedFields>()
- .EndOutputGroup()
- .OutputDescription<TUrlRow>(5, false);
-
- UNIT_ASSERT_EXCEPTION(builder.OutputDescription<TUrlRow>(0), TApiUsageError);
- UNIT_ASSERT_NO_EXCEPTION(builder.OutputSchema(0, urlRowSchema));
- UNIT_ASSERT_NO_EXCEPTION(builder.OutputSchema(5, urlRowSchema));
- UNIT_ASSERT_EXCEPTION(builder.OutputSchema(1, urlRowSchema), TApiUsageError);
-
- builder.Finish();
- auto result = builder.GetOutputSchemas();
-
- ASSERT_SERIALIZABLES_EQUAL(result[0], urlRowSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[1], rowFieldSerializationOptionSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[2], urlRowSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[3], urlRowSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[4], rowSerializedRepeatedFieldsSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[5], urlRowSchema);
- ASSERT_SERIALIZABLES_EQUAL(result[6], rowSerializedRepeatedFieldsSchema);
-
- auto expectedInputDescriptions = TVector<TMaybe<TTableStructure>>{
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
- };
- UNIT_ASSERT_EQUAL(expectedInputDescriptions, builder.GetInputDescriptions());
-
- auto expectedOutputDescriptions = TVector<TMaybe<TTableStructure>>{
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TRowFieldSerializationOption::descriptor()}},
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
- {TProtobufTableStructure{TUrlRow::descriptor()}},
- {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
- };
- UNIT_ASSERT_EQUAL(expectedOutputDescriptions, builder.GetOutputDescriptions());
- }
-
- Y_UNIT_TEST(InputColumns)
- {
- TDummyInferenceContext context(5, 1);
- TJobOperationPreparer builder(context);
- builder
- .InputColumnFilter(2, {"a", "b"})
- .BeginInputGroup(0, 2)
- .ColumnFilter({"b", "c"})
- .ColumnRenaming({{"b", "B"}, {"c", "C"}})
- .EndInputGroup()
- .InputColumnRenaming(3, {{"a", "AAA"}})
- .NoOutputSchema(0);
- builder.Finish();
-
- auto expectedRenamings = TVector<THashMap<TString, TString>>{
- {{"b", "B"}, {"c", "C"}},
- {{"b", "B"}, {"c", "C"}},
- {},
- {{"a", "AAA"}},
- {},
- };
- UNIT_ASSERT_EQUAL(builder.GetInputColumnRenamings(), expectedRenamings);
-
- auto expectedFilters = TVector<TMaybe<TVector<TString>>>{
- {{"b", "c"}},
- {{"b", "c"}},
- {{"a", "b"}},
- {},
- {},
- };
- UNIT_ASSERT_EQUAL(builder.GetInputColumnFilters(), expectedFilters);
- }
-
- Y_UNIT_TEST(Bug_r7349102)
- {
- auto firstSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64));
- auto otherSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN));
- auto thirdSchema = TTableSchema()
- .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING));
-
- TDummyInferenceContext context(3,1);
- TJobOperationPreparer builder(context);
-
- builder
- .InputDescription<TUrlRow>(0)
- .InputDescription<TUrlRow>(1)
- .InputDescription<TUrlRow>(2)
- .OutputDescription<TUrlRow>(0);
-
- builder.Finish();
- }
-
-} // Y_UNIT_TEST_SUITE(SchemaInference)
diff --git a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp b/yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp
deleted file mode 100644
index 5ffa9564d7..0000000000
--- a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp
+++ /dev/null
@@ -1,271 +0,0 @@
-#include "errors.h"
-#include "format.h"
-#include "common_ut.h"
-
-#include <yt/cpp/mapreduce/interface/protobuf_file_options_ut.pb.h>
-
-#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NYT;
-
-Y_UNIT_TEST_SUITE(ProtobufFileOptions)
-{
- NTi::TTypePtr GetUrlRowType(bool required)
- {
- static const NTi::TTypePtr structType = NTi::Struct({
- {"Host", ToTypeV3(EValueType::VT_STRING, false)},
- {"Path", ToTypeV3(EValueType::VT_STRING, false)},
- {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}});
- return required ? structType : NTi::TTypePtr(NTi::Optional(structType));
- }
-
- Y_UNIT_TEST(TRowFieldSerializationOption)
- {
- const auto schema = CreateTableSchema<NTestingFileOptions::TRowFieldSerializationOption>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false))));
- }
-
- Y_UNIT_TEST(TRowMixedSerializationOptions)
- {
- const auto schema = CreateTableSchema<NTestingFileOptions::TRowMixedSerializationOptions>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false))));
- }
-
- Y_UNIT_TEST(FieldSortOrder)
- {
- const auto schema = CreateTableSchema<NTestingFileOptions::TFieldSortOrder>();
-
- auto asInProtoFile = NTi::Optional(NTi::Struct({
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- {"z", NTi::Optional(NTi::Bool())},
- }));
- auto byFieldNumber = NTi::Optional(NTi::Struct({
- {"z", NTi::Optional(NTi::Bool())},
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- }));
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(asInProtoFile))
- .AddColumn(TColumnSchema().Name("EmbeddedAsInProtoFile").Type(asInProtoFile))
- .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber)));
- }
-
- Y_UNIT_TEST(Map)
- {
- const auto schema = CreateTableSchema<NTestingFileOptions::TWithMap>();
-
- auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) {
- return NTi::List(NTi::Struct({
- {"key", NTi::Optional(key)},
- {"value", NTi::Optional(value)},
- }));
- };
-
- auto embedded = NTi::Struct({
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- });
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("MapDefault")
- .Type(createKeyValueStruct(NTi::Int64(), embedded)))
- .AddColumn(TColumnSchema()
- .Name("MapDict")
- .Type(NTi::Dict(NTi::Int64(), embedded))));
- }
-
- Y_UNIT_TEST(Oneof)
- {
- const auto schema = CreateTableSchema<NTestingFileOptions::TWithOneof>();
-
- auto embedded = NTi::Struct({
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- });
-
- auto defaultVariantType = NTi::Optional(NTi::Struct({
- {"field", NTi::Optional(NTi::String())},
- {"Oneof2", NTi::Optional(NTi::Variant(NTi::Struct({
- {"y2", NTi::String()},
- {"z2", embedded},
- {"x2", NTi::Int64()},
- })))},
- {"x1", NTi::Optional(NTi::Int64())},
- {"y1", NTi::Optional(NTi::String())},
- {"z1", NTi::Optional(embedded)},
- }));
-
- auto noDefaultType = NTi::Optional(NTi::Struct({
- {"field", NTi::Optional(NTi::String())},
- {"y2", NTi::Optional(NTi::String())},
- {"z2", NTi::Optional(embedded)},
- {"x2", NTi::Optional(NTi::Int64())},
- {"x1", NTi::Optional(NTi::Int64())},
- {"y1", NTi::Optional(NTi::String())},
- {"z1", NTi::Optional(embedded)},
- }));
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("DefaultVariant")
- .Type(defaultVariantType)
- )
- .AddColumn(TColumnSchema()
- .Name("NoDefault")
- .Type(noDefaultType)
- )
- .AddColumn(TColumnSchema()
- .Name("SerializationProtobuf")
- .Type(NTi::Optional(NTi::Struct({
- {"x1", NTi::Optional(NTi::Int64())},
- {"y1", NTi::Optional(NTi::String())},
- {"z1", NTi::Optional(NTi::String())},
- })))
- )
- .AddColumn(TColumnSchema()
- .Name("MemberOfTopLevelOneof")
- .Type(NTi::Optional(NTi::Int64()))
- )
- );
- }
-}
-
-static TNode GetColumns(const TFormat& format, int tableIndex = 0)
-{
- return format.Config.GetAttributes()["tables"][tableIndex]["columns"];
-}
-
-Y_UNIT_TEST_SUITE(ProtobufFormatFileOptions)
-{
- Y_UNIT_TEST(TRowFieldSerializationOption)
- {
- const auto format = TFormat::Protobuf<NTestingFileOptions::TRowFieldSerializationOption>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "UrlRow_1");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "message");
- UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1);
-
- UNIT_ASSERT_VALUES_EQUAL(columns[1]["name"], "UrlRow_2");
- UNIT_ASSERT_VALUES_EQUAL(columns[1]["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(columns[1]["field_number"], 2);
- const auto& fields = columns[1]["fields"];
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["name"], "Host");
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["proto_type"], "string");
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["field_number"], 1);
-
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["name"], "Path");
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["proto_type"], "string");
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["field_number"], 2);
-
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["name"], "HttpCode");
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["proto_type"], "sint32");
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["field_number"], 3);
- }
-
- Y_UNIT_TEST(Map)
- {
- const auto format = TFormat::Protobuf<NTestingFileOptions::TWithMap>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 2);
- {
- const auto& column = columns[0];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDefault");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message");
- }
- {
- const auto& column = columns[1];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDict");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message");
- }
- }
-
- Y_UNIT_TEST(Oneof)
- {
- const auto format = TFormat::Protobuf<NTestingFileOptions::TWithOneof>();
- auto columns = GetColumns(format);
-
- UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 4);
-
- {
- const auto& column = columns[0];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "DefaultVariant");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 5);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "field");
-
- const auto& oneof2 = column["fields"][1];
- UNIT_ASSERT_VALUES_EQUAL(oneof2["name"], "Oneof2");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["proto_type"], "oneof");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][0]["name"], "y2");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["name"], "z2");
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["proto_type"], "structured_message");
- const auto& embeddedFields = oneof2["fields"][1]["fields"];
- UNIT_ASSERT_VALUES_EQUAL(embeddedFields[0]["name"], "x");
- UNIT_ASSERT_VALUES_EQUAL(embeddedFields[1]["name"], "y");
-
- UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][2]["name"], "x2");
-
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "x1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][3]["name"], "y1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][4]["name"], "z1");
- };
-
- {
- const auto& column = columns[1];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "NoDefault");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- const auto& fields = column["fields"];
- UNIT_ASSERT_VALUES_EQUAL(fields.Size(), 7);
-
- UNIT_ASSERT_VALUES_EQUAL(fields[0]["name"], "field");
-
- UNIT_ASSERT_VALUES_EQUAL(fields[1]["name"], "y2");
-
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["name"], "z2");
- UNIT_ASSERT_VALUES_EQUAL(fields[2]["proto_type"], "structured_message");
- const auto& embeddedFields = fields[2]["fields"];
- UNIT_ASSERT_VALUES_EQUAL(embeddedFields[0]["name"], "x");
- UNIT_ASSERT_VALUES_EQUAL(embeddedFields[1]["name"], "y");
-
- UNIT_ASSERT_VALUES_EQUAL(fields[3]["name"], "x2");
-
- UNIT_ASSERT_VALUES_EQUAL(fields[4]["name"], "x1");
- UNIT_ASSERT_VALUES_EQUAL(fields[5]["name"], "y1");
- UNIT_ASSERT_VALUES_EQUAL(fields[6]["name"], "z1");
- };
-
- {
- const auto& column = columns[2];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "SerializationProtobuf");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 3);
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "x1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["name"], "y1");
- UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "z1");
- }
- {
- const auto& column = columns[3];
- UNIT_ASSERT_VALUES_EQUAL(column["name"], "MemberOfTopLevelOneof");
- UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "int64");
- }
- }
-}
diff --git a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp b/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp
deleted file mode 100644
index 19a3d5163f..0000000000
--- a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp
+++ /dev/null
@@ -1,451 +0,0 @@
-#include "common.h"
-#include "errors.h"
-#include "common_ut.h"
-#include "util/generic/fwd.h"
-
-#include <yt/cpp/mapreduce/interface/protobuf_table_schema_ut.pb.h>
-#include <yt/cpp/mapreduce/interface/proto3_ut.pb.h>
-
-#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <algorithm>
-
-using namespace NYT;
-
-bool IsFieldPresent(const TTableSchema& schema, TStringBuf name)
-{
- for (const auto& field : schema.Columns()) {
- if (field.Name() == name) {
- return true;
- }
- }
- return false;
-}
-
-Y_UNIT_TEST_SUITE(ProtoSchemaTest_Simple)
-{
- Y_UNIT_TEST(TIntegral)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TIntegral>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
- .AddColumn(TColumnSchema().Name("FloatField").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
- .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
- .AddColumn(TColumnSchema().Name("Int64Field").Type(ToTypeV3(EValueType::VT_INT64, false)))
- .AddColumn(TColumnSchema().Name("Uint32Field").Type(ToTypeV3(EValueType::VT_UINT32, false)))
- .AddColumn(TColumnSchema().Name("Uint64Field").Type(ToTypeV3(EValueType::VT_UINT64, false)))
- .AddColumn(TColumnSchema().Name("Sint32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
- .AddColumn(TColumnSchema().Name("Sint64Field").Type(ToTypeV3(EValueType::VT_INT64, false)))
- .AddColumn(TColumnSchema().Name("Fixed32Field").Type(ToTypeV3(EValueType::VT_UINT32, false)))
- .AddColumn(TColumnSchema().Name("Fixed64Field").Type(ToTypeV3(EValueType::VT_UINT64, false)))
- .AddColumn(TColumnSchema().Name("Sfixed32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
- .AddColumn(TColumnSchema().Name("Sfixed64Field").Type(ToTypeV3(EValueType::VT_INT64, false)))
- .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false)))
- .AddColumn(TColumnSchema().Name("EnumField").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(TOneOf)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TOneOf>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
- .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
- .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false))));
- }
-
- Y_UNIT_TEST(TWithRequired)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TWithRequired>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("RequiredField").Type(ToTypeV3(EValueType::VT_STRING, true)))
- .AddColumn(TColumnSchema().Name("NotRequiredField").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(TAggregated)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TAggregated>();
-
- UNIT_ASSERT_VALUES_EQUAL(6, schema.Columns().size());
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("StringField").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("BytesField").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("NestedField").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("NestedRepeatedField").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("NestedOneOfField").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("NestedRecursiveField").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(TAliased)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TAliased>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false)))
- .AddColumn(TColumnSchema().Name("subkey").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
- .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(SortColumns)
- {
- const TSortColumns keys = {"key", "subkey"};
-
- const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys);
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("key")
- .Type(ToTypeV3(EValueType::VT_INT32, false))
- .SortOrder(ESortOrder::SO_ASCENDING))
- .AddColumn(TColumnSchema()
- .Name("subkey")
- .Type(ToTypeV3(EValueType::VT_DOUBLE, false))
- .SortOrder(ESortOrder::SO_ASCENDING))
- .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(SortColumnsReordered)
- {
- const TSortColumns keys = {"subkey"};
-
- const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys);
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("subkey")
- .Type(ToTypeV3(EValueType::VT_DOUBLE, false))
- .SortOrder(ESortOrder::SO_ASCENDING))
- .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false)))
- .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(SortColumnsInvalid)
- {
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TAliased>({"subkey", "subkey"}), yexception);
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TAliased>({"key", "junk"}), yexception);
- }
-
- Y_UNIT_TEST(KeepFieldsWithoutExtensionTrue)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, true);
- UNIT_ASSERT(IsFieldPresent(schema, "key"));
- UNIT_ASSERT(IsFieldPresent(schema, "subkey"));
- UNIT_ASSERT(IsFieldPresent(schema, "Data"));
- UNIT_ASSERT(schema.Strict());
- }
-
- Y_UNIT_TEST(KeepFieldsWithoutExtensionFalse)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, false);
- UNIT_ASSERT(IsFieldPresent(schema, "key"));
- UNIT_ASSERT(IsFieldPresent(schema, "subkey"));
- UNIT_ASSERT(!IsFieldPresent(schema, "Data"));
- UNIT_ASSERT(schema.Strict());
- }
-
- Y_UNIT_TEST(ProtobufTypeOption)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TWithTypeOptions>({});
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .Strict(false)
- .AddColumn(TColumnSchema().Name("ColorIntField").Type(ToTypeV3(EValueType::VT_INT64, false)))
- .AddColumn(TColumnSchema().Name("ColorStringField").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("AnyField").Type(ToTypeV3(EValueType::VT_ANY, false)))
- .AddColumn(TColumnSchema().Name("EmbeddedField").Type(
- NTi::Optional(NTi::Struct({
- {"ColorIntField", ToTypeV3(EValueType::VT_INT64, false)},
- {"ColorStringField", ToTypeV3(EValueType::VT_STRING, false)},
- {"AnyField", ToTypeV3(EValueType::VT_ANY, false)}}))))
- .AddColumn(TColumnSchema().Name("RepeatedEnumIntField").Type(NTi::List(NTi::Int64()))));
- }
-
- Y_UNIT_TEST(ProtobufTypeOption_TypeMismatch)
- {
- UNIT_ASSERT_EXCEPTION(
- CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumInt>({}),
- yexception);
- UNIT_ASSERT_EXCEPTION(
- CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumString>({}),
- yexception);
- UNIT_ASSERT_EXCEPTION(
- CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_Any>({}),
- yexception);
- UNIT_ASSERT_EXCEPTION(
- CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_OtherColumns>({}),
- yexception);
- }
-}
-
-Y_UNIT_TEST_SUITE(ProtoSchemaTest_Complex)
-{
- Y_UNIT_TEST(TRepeated)
- {
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TRepeated>(), yexception);
-
- const auto schema = CreateTableSchema<NUnitTesting::TRepeatedYtMode>();
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("Int32Field").Type(NTi::List(ToTypeV3(EValueType::VT_INT32, true)))));
- }
-
- Y_UNIT_TEST(TRepeatedOptionalList)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TOptionalList>();
- auto type = NTi::Optional(NTi::List(NTi::Int64()));
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("OptionalListInt64").TypeV3(type)));
- }
-
- NTi::TTypePtr GetUrlRowType(bool required)
- {
- static const NTi::TTypePtr structType = NTi::Struct({
- {"Host", ToTypeV3(EValueType::VT_STRING, false)},
- {"Path", ToTypeV3(EValueType::VT_STRING, false)},
- {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}});
- return required ? structType : NTi::TTypePtr(NTi::Optional(structType));
- }
-
- Y_UNIT_TEST(TRowFieldSerializationOption)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TRowFieldSerializationOption>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(TRowMessageSerializationOption)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TRowMessageSerializationOption>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false))));
- }
-
- Y_UNIT_TEST(TRowMixedSerializationOptions)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- NTi::TTypePtr GetUrlRowType_ColumnNames(bool required)
- {
- static const NTi::TTypePtr type = NTi::Struct({
- {"Host_ColumnName", ToTypeV3(EValueType::VT_STRING, false)},
- {"Path_KeyColumnName", ToTypeV3(EValueType::VT_STRING, false)},
- {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)},
- });
- return required ? type : NTi::TTypePtr(NTi::Optional(type));
- }
-
- Y_UNIT_TEST(TRowMixedSerializationOptions_ColumnNames)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions_ColumnNames>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType_ColumnNames(false)))
- .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(NoOptionInheritance)
- {
- auto deepestEmbedded = NTi::Optional(NTi::Struct({{"x", ToTypeV3(EValueType::VT_INT64, false)}}));
-
- const auto schema = CreateTableSchema<NUnitTesting::TNoOptionInheritance>();
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("EmbeddedYt_YtOption")
- .Type(NTi::Optional(NTi::Struct({{"embedded", deepestEmbedded}}))))
- .AddColumn(TColumnSchema().Name("EmbeddedYt_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("EmbeddedYt_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema()
- .Name("EmbeddedProtobuf_YtOption")
- .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}}))))
- .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema()
- .Name("Embedded_YtOption")
- .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}}))))
- .AddColumn(TColumnSchema().Name("Embedded_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
- .AddColumn(TColumnSchema().Name("Embedded_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false))));
- }
-
- Y_UNIT_TEST(Cyclic)
- {
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TA>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TB>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TC>(), TApiUsageError);
- UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TD>(), TApiUsageError);
-
- ASSERT_SERIALIZABLES_EQUAL(
- TTableSchema().AddColumn(
- TColumnSchema().Name("d").TypeV3(NTi::Optional(NTi::String()))),
- CreateTableSchema<NUnitTesting::TCyclic::TE>());
- }
-
- Y_UNIT_TEST(FieldSortOrder)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TFieldSortOrder>();
-
- auto byFieldNumber = NTi::Optional(NTi::Struct({
- {"z", NTi::Optional(NTi::Bool())},
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- }));
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(byFieldNumber))
- .AddColumn(TColumnSchema()
- .Name("EmbeddedAsInProtoFile")
- .Type(NTi::Optional(NTi::Struct({
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- {"z", NTi::Optional(NTi::Bool())},
- }))))
- .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber)));
- }
-
- Y_UNIT_TEST(Map)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TWithMap>();
-
- auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) {
- return NTi::List(NTi::Struct({
- {"key", NTi::Optional(key)},
- {"value", NTi::Optional(value)},
- }));
- };
-
- auto embedded = NTi::Struct({
- {"x", NTi::Optional(NTi::Int64())},
- {"y", NTi::Optional(NTi::String())},
- });
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("MapDefault")
- .Type(createKeyValueStruct(NTi::Int64(), NTi::String())))
- .AddColumn(TColumnSchema()
- .Name("MapListOfStructsLegacy")
- .Type(createKeyValueStruct(NTi::Int64(), NTi::String())))
- .AddColumn(TColumnSchema()
- .Name("MapListOfStructs")
- .Type(createKeyValueStruct(NTi::Int64(), embedded)))
- .AddColumn(TColumnSchema()
- .Name("MapOptionalDict")
- .Type(NTi::Optional(NTi::Dict(NTi::Int64(), embedded))))
- .AddColumn(TColumnSchema()
- .Name("MapDict")
- .Type(NTi::Dict(NTi::Int64(), embedded))));
- }
-
- Y_UNIT_TEST(Oneof)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TWithOneof>();
-
- auto embedded = NTi::Struct({
- {"Oneof", NTi::Optional(NTi::Variant(NTi::Struct({
- {"x", NTi::Int64()},
- {"y", NTi::String()},
- })))},
- });
-
- auto createType = [&] (TString oneof2Name) {
- return NTi::Optional(NTi::Struct({
- {"field", NTi::Optional(NTi::String())},
- {oneof2Name, NTi::Optional(NTi::Variant(NTi::Struct({
- {"x2", NTi::Int64()},
- {"y2", NTi::String()},
- {"z2", embedded},
- })))},
- {"y1", NTi::Optional(NTi::String())},
- {"z1", NTi::Optional(embedded)},
- {"x1", NTi::Optional(NTi::Int64())},
- }));
- };
-
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("DefaultSeparateFields")
- .Type(createType("variant_field_name")))
- .AddColumn(TColumnSchema()
- .Name("NoDefault")
- .Type(createType("Oneof2")))
- .AddColumn(TColumnSchema()
- .Name("SerializationProtobuf")
- .Type(NTi::Optional(NTi::Struct({
- {"y1", NTi::Optional(NTi::String())},
- {"x1", NTi::Optional(NTi::Int64())},
- {"z1", NTi::Optional(NTi::String())},
- }))))
- .AddColumn(TColumnSchema()
- .Name("TopLevelOneof")
- .Type(
- NTi::Optional(
- NTi::Variant(NTi::Struct({
- {"MemberOfTopLevelOneof", NTi::Int64()}
- }))
- )
- ))
- );
- }
-
- Y_UNIT_TEST(Embedded)
- {
- const auto schema = CreateTableSchema<NUnitTesting::TEmbeddingMessage>();
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .Strict(false)
- .AddColumn(TColumnSchema().Name("embedded2_num").Type(NTi::Optional(NTi::Uint64())))
- .AddColumn(TColumnSchema().Name("embedded2_struct").Type(NTi::Optional(NTi::Struct({
- {"float1", NTi::Optional(NTi::Double())},
- {"string1", NTi::Optional(NTi::String())},
- }))))
- .AddColumn(TColumnSchema().Name("embedded2_repeated").Type(NTi::List(NTi::String())))
- .AddColumn(TColumnSchema().Name("embedded_num").Type(NTi::Optional(NTi::Uint64())))
- .AddColumn(TColumnSchema().Name("embedded_extra_field").Type(NTi::Optional(NTi::String())))
- .AddColumn(TColumnSchema().Name("variant").Type(NTi::Optional(NTi::Variant(NTi::Struct({
- {"str_variant", NTi::String()},
- {"uint_variant", NTi::Uint64()},
- })))))
- .AddColumn(TColumnSchema().Name("num").Type(NTi::Optional(NTi::Uint64())))
- .AddColumn(TColumnSchema().Name("extra_field").Type(NTi::Optional(NTi::String())))
- );
- }
-}
-
-Y_UNIT_TEST_SUITE(ProtoSchemaTest_Proto3)
-{
- Y_UNIT_TEST(TWithOptional)
- {
- const auto schema = CreateTableSchema<NTestingProto3::TWithOptional>();
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("x").Type(NTi::Optional(NTi::Int64()))
- )
- );
- }
-
- Y_UNIT_TEST(TWithOptionalMessage)
- {
- const auto schema = CreateTableSchema<NTestingProto3::TWithOptionalMessage>();
- ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema()
- .AddColumn(TColumnSchema()
- .Name("x").Type(
- NTi::Optional(
- NTi::Struct({{"x", NTi::Optional(NTi::Int64())}})
- )
- )
- )
- );
- }
-}
diff --git a/yt/cpp/mapreduce/interface/serialize_ut.cpp b/yt/cpp/mapreduce/interface/serialize_ut.cpp
deleted file mode 100644
index 59d4501ee8..0000000000
--- a/yt/cpp/mapreduce/interface/serialize_ut.cpp
+++ /dev/null
@@ -1,49 +0,0 @@
-#include <yt/cpp/mapreduce/interface/serialize.h>
-#include <yt/cpp/mapreduce/interface/common.h>
-
-#include <library/cpp/yson/node/node_builder.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <util/generic/serialized_enum.h>
-
-using namespace NYT;
-
-Y_UNIT_TEST_SUITE(Serialization)
-{
- Y_UNIT_TEST(TableSchema)
- {
- auto schema = TTableSchema()
- .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING))
- .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64))
- .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64, true));
-
- auto schemaNode = schema.ToNode();
- UNIT_ASSERT(schemaNode.IsList());
- UNIT_ASSERT_VALUES_EQUAL(schemaNode.Size(), 3);
-
-
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["name"], "a");
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["type"], "string");
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["required"], false);
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["sort_order"], "ascending");
-
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[1]["name"], "b");
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[1]["type"], "uint64");
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[1]["required"], false);
-
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[2]["name"], "c");
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[2]["type"], "int64");
- UNIT_ASSERT_VALUES_EQUAL(schemaNode[2]["required"], true);
- }
-
- Y_UNIT_TEST(ValueTypeSerialization)
- {
- for (const auto value : GetEnumAllValues<EValueType>()) {
- TNode serialized = NYT::NDetail::ToString(value);
- EValueType deserialized;
- Deserialize(deserialized, serialized);
- UNIT_ASSERT_VALUES_EQUAL(value, deserialized);
- }
- }
-}
diff --git a/yt/cpp/mapreduce/interface/ut/common_ut.cpp b/yt/cpp/mapreduce/interface/ut/common_ut.cpp
new file mode 100644
index 0000000000..85122a97ec
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/common_ut.cpp
@@ -0,0 +1,353 @@
+#include "common_ut.h"
+
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
+
+#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/yson/node/node_io.h>
+#include <library/cpp/yson/node/node_builder.h>
+
+#include <util/generic/xrange.h>
+
+using namespace NYT;
+
+template <class T>
+TString SaveToString(const T& obj)
+{
+ TString s;
+ TStringOutput out(s);
+ ::Save(&out, obj);
+ return s;
+}
+
+template <class T>
+T LoadFromString(TStringBuf s)
+{
+ TMemoryInput in(s);
+ T obj;
+ ::Load(&in, obj);
+ return obj;
+}
+
+template <class T>
+T SaveLoad(const T& obj)
+{
+ return LoadFromString<T>(SaveToString(obj));
+}
+
+TEST(TCommonTest, SortColumnsLegacy)
+{
+ TSortColumns keys1("a", "b");
+ EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b"}));
+
+ keys1.Add("c", "d");
+ EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"}));
+
+ auto keys2 = TSortColumns(keys1).Add("e", "f");
+ EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"}));
+ EXPECT_TRUE((keys2.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f"}));
+
+ auto keys3 = TSortColumns(keys1).Add("e").Add("f").Add("g");
+ EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"}));
+ EXPECT_TRUE((keys3.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f", "g"}));
+}
+
+TEST(TCommonTest, SortColumn)
+{
+ auto ascending = TSortColumn("a");
+ EXPECT_EQ(ascending.Name(), "a");
+ EXPECT_EQ(ascending.SortOrder(), ESortOrder::SO_ASCENDING);
+ EXPECT_EQ(ascending, TSortColumn("a", ESortOrder::SO_ASCENDING));
+ EXPECT_NE(ascending, TSortColumn("a", ESortOrder::SO_DESCENDING));
+
+ EXPECT_NO_THROW(ascending.EnsureAscending());
+ EXPECT_EQ(static_cast<TString>(ascending), "a");
+ EXPECT_EQ(ascending, "a");
+
+ auto another = ascending;
+ EXPECT_NO_THROW(another = "another");
+ EXPECT_EQ(another.Name(), "another");
+ EXPECT_EQ(another.SortOrder(), ESortOrder::SO_ASCENDING);
+ EXPECT_EQ(another, TSortColumn("another", ESortOrder::SO_ASCENDING));
+ EXPECT_NE(another, TSortColumn("another", ESortOrder::SO_DESCENDING));
+
+ auto ascendingNode = BuildYsonNodeFluently().Value(ascending);
+ EXPECT_EQ(ascendingNode, TNode("a"));
+
+ EXPECT_EQ(SaveLoad(ascending), ascending);
+ EXPECT_NE(SaveToString(ascending), SaveToString(TString("a")));
+
+ auto descending = TSortColumn("a", ESortOrder::SO_DESCENDING);
+ EXPECT_EQ(descending.Name(), "a");
+ EXPECT_EQ(descending.SortOrder(), ESortOrder::SO_DESCENDING);
+ EXPECT_EQ(descending, TSortColumn("a", ESortOrder::SO_DESCENDING));
+ EXPECT_NE(descending, TSortColumn("a", ESortOrder::SO_ASCENDING));
+
+ EXPECT_THROW(descending.EnsureAscending(), yexception);
+ EXPECT_THROW(Y_UNUSED(static_cast<TString>(descending)), yexception);
+ EXPECT_THROW(Y_UNUSED(descending == "a"), yexception);
+ EXPECT_THROW(descending = "a", yexception);
+
+ auto descendingNode = BuildYsonNodeFluently().Value(descending);
+ EXPECT_EQ(descendingNode, TNode()("name", "a")("sort_order", "descending"));
+
+ EXPECT_EQ(SaveLoad(descending), descending);
+ EXPECT_NE(SaveToString(descending), SaveToString("a"));
+
+ EXPECT_EQ(ToString(TSortColumn("blah")), "blah");
+ EXPECT_EQ(ToString(TSortColumn("blah", ESortOrder::SO_DESCENDING)), "{\"name\"=\"blah\";\"sort_order\"=\"descending\"}");
+}
+
+TEST(TCommonTest, SortColumns)
+{
+ TSortColumns ascending("a", "b");
+ EXPECT_TRUE(ascending.Parts_ == (TSortColumns{"a", "b"}));
+ EXPECT_NO_THROW(ascending.EnsureAscending());
+ EXPECT_EQ(static_cast<TColumnNames>(ascending).Parts_, (TVector<TString>{"a", "b"}));
+ EXPECT_EQ(ascending.GetNames(), (TVector<TString>{"a", "b"}));
+
+ auto mixed = ascending;
+ mixed.Add(TSortColumn("c", ESortOrder::SO_DESCENDING), "d");
+ EXPECT_TRUE((mixed.Parts_ != TVector<TSortColumn>{"a", "b", "c", "d"}));
+ EXPECT_TRUE((mixed.Parts_ == TVector<TSortColumn>{"a", "b", TSortColumn("c", ESortOrder::SO_DESCENDING), "d"}));
+ EXPECT_EQ(mixed.GetNames(), (TVector<TString>{"a", "b", "c", "d"}));
+ EXPECT_THROW(mixed.EnsureAscending(), yexception);
+ EXPECT_THROW(Y_UNUSED(static_cast<TColumnNames>(mixed)), yexception);
+}
+
+TEST(TCommonTest, KeyBound)
+{
+ auto keyBound = TKeyBound(ERelation::Greater, TKey(7, "a", TNode()("x", "y")));
+ EXPECT_EQ(keyBound.Relation(), ERelation::Greater);
+ EXPECT_EQ(keyBound.Key(), TKey(7, "a", TNode()("x", "y")));
+
+ auto keyBound1 = TKeyBound().Relation(ERelation::Greater).Key(TKey(7, "a", TNode()("x", "y")));
+ auto expectedNode = TNode()
+ .Add(">")
+ .Add(TNode().Add(7).Add("a").Add(TNode()("x", "y")));
+
+ EXPECT_EQ(expectedNode, BuildYsonNodeFluently().Value(keyBound));
+ EXPECT_EQ(expectedNode, BuildYsonNodeFluently().Value(keyBound1));
+
+ keyBound.Relation(ERelation::LessOrEqual);
+ keyBound.Key(TKey("A", 7));
+ EXPECT_EQ(keyBound.Relation(), ERelation::LessOrEqual);
+ EXPECT_EQ(keyBound.Key(), TKey("A", 7));
+
+ EXPECT_EQ(
+ BuildYsonNodeFluently().Value(keyBound),
+ TNode()
+ .Add("<=")
+ .Add(TNode().Add("A").Add(7)));
+}
+
+TEST(TCommonTest, TTableSchema)
+{
+ TTableSchema schema;
+ schema
+ .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING))
+ .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64))
+ .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64));
+ auto checkSortBy = [](TTableSchema schema, const TVector<TString>& columns) {
+ auto initialSchema = schema;
+ schema.SortBy(columns);
+ for (auto i: xrange(columns.size())) {
+ EXPECT_EQ(schema.Columns()[i].Name(), columns[i]);
+ EXPECT_EQ(schema.Columns()[i].SortOrder(), ESortOrder::SO_ASCENDING);
+ }
+ for (auto i: xrange(columns.size(), (size_t)initialSchema.Columns().size())) {
+ EXPECT_EQ(schema.Columns()[i].SortOrder(), Nothing());
+ }
+ EXPECT_EQ(initialSchema.Columns().size(), schema.Columns().size());
+ return schema;
+ };
+ auto newSchema = checkSortBy(schema, {"b"});
+ EXPECT_EQ(newSchema.Columns()[1].Name(), TString("a"));
+ EXPECT_EQ(newSchema.Columns()[2].Name(), TString("c"));
+ checkSortBy(schema, {"b", "c"});
+ checkSortBy(schema, {"c", "a"});
+ EXPECT_THROW(checkSortBy(schema, {"b", "b"}), yexception);
+ EXPECT_THROW(checkSortBy(schema, {"a", "junk"}), yexception);
+}
+
+TEST(TCommonTest, TTableSchema_Decimal)
+{
+ NYT::TTableSchema tableSchema;
+
+ tableSchema.AddColumn("a", NTi::Decimal(35, 18));
+ tableSchema.AddColumn("b", NTi::Optional(NTi::Decimal(35, 18)));
+ tableSchema.AddColumn("c", NTi::List(NTi::Decimal(35, 18)));
+
+ auto tableSchemaNode = tableSchema.ToNode();
+ const auto& tableSchemaNodeList = tableSchemaNode.AsList();
+
+ // There was a bug in the serialization of decimal type: https://github.com/ytsaurus/ytsaurus/issues/173
+ {
+ const auto& currentType = tableSchemaNodeList[0];
+ EXPECT_EQ(currentType.ChildAsString("type"), "string");
+ EXPECT_TRUE(currentType.ChildAsBool("required"));
+ EXPECT_TRUE(currentType.HasKey("type_v3"));
+ EXPECT_EQ(currentType.At("type_v3").ChildAsString("type_name"), "decimal");
+ }
+ {
+ const auto& currentType = tableSchemaNodeList[1];
+ EXPECT_EQ(currentType.ChildAsString("type"), "string");
+ EXPECT_TRUE(!currentType.ChildAsBool("required"));
+ EXPECT_TRUE(currentType.HasKey("type_v3"));
+ EXPECT_EQ(currentType.At("type_v3").ChildAsString("type_name"), "optional");
+ EXPECT_EQ(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal");
+ }
+ {
+ const auto& currentType = tableSchemaNodeList[2];
+ EXPECT_EQ(currentType.ChildAsString("type"), "any");
+ EXPECT_TRUE(currentType.ChildAsBool("required"));
+ EXPECT_TRUE(currentType.HasKey("type_v3"));
+ EXPECT_EQ(currentType.At("type_v3").ChildAsString("type_name"), "list");
+ EXPECT_EQ(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal");
+ }
+
+ EXPECT_EQ(tableSchema, TTableSchema::FromNode(tableSchemaNode));
+}
+
+TEST(TCommonTest, TColumnSchema_TypeV3)
+{
+ {
+ auto column = TColumnSchema().Type(NTi::Interval());
+ EXPECT_EQ(column.Required(), true);
+ EXPECT_EQ(column.Type(), VT_INTERVAL);
+ }
+ {
+ auto column = TColumnSchema().Type(NTi::Optional(NTi::Date()));
+ EXPECT_EQ(column.Required(), false);
+ EXPECT_EQ(column.Type(), VT_DATE);
+ }
+ {
+ auto column = TColumnSchema().Type(NTi::Interval64());
+ EXPECT_EQ(column.Required(), true);
+ EXPECT_EQ(column.Type(), VT_INTERVAL64);
+ }
+ {
+ auto column = TColumnSchema().Type(NTi::Optional(NTi::Date32()));
+ EXPECT_EQ(column.Required(), false);
+ EXPECT_EQ(column.Type(), VT_DATE32);
+ }
+ {
+ auto column = TColumnSchema().Type(NTi::Null());
+ EXPECT_EQ(column.Required(), false);
+ EXPECT_EQ(column.Type(), VT_NULL);
+ }
+ {
+ auto column = TColumnSchema().Type(NTi::Optional(NTi::Null()));
+ EXPECT_EQ(column.Required(), false);
+ EXPECT_EQ(column.Type(), VT_ANY);
+ }
+ {
+ auto column = TColumnSchema().Type(NTi::Decimal(35, 18));
+ EXPECT_EQ(column.Required(), true);
+ EXPECT_EQ(column.Type(), VT_STRING);
+ }
+}
+
+TEST(TCommonTest, ToTypeV3)
+{
+ EXPECT_EQ(*ToTypeV3(VT_INT32, true), *NTi::Int32());
+ EXPECT_EQ(*ToTypeV3(VT_UTF8, false), *NTi::Optional(NTi::Utf8()));
+}
+
+TEST(TCommonTest, DeserializeColumn)
+{
+ auto deserialize = [] (TStringBuf yson) {
+ auto node = NodeFromYsonString(yson);
+ TColumnSchema column;
+ Deserialize(column, node);
+ return column;
+ };
+
+ auto column = deserialize("{name=foo; type=int64; required=%false}");
+ EXPECT_EQ(column.Name(), "foo");
+ EXPECT_EQ(*column.TypeV3(), *NTi::Optional(NTi::Int64()));
+
+ column = deserialize("{name=bar; type=utf8; required=%true; type_v3=utf8}");
+ EXPECT_EQ(column.Name(), "bar");
+ EXPECT_EQ(*column.TypeV3(), *NTi::Utf8());
+}
+
+TEST(TCommonTest, ColumnSchemaEquality)
+{
+ auto base = TColumnSchema()
+ .Name("col")
+ .TypeV3(NTi::Optional(NTi::List(NTi::String())))
+ .SortOrder(ESortOrder::SO_ASCENDING)
+ .Lock("lock")
+ .Expression("x + 12")
+ .Aggregate("sum")
+ .Group("group");
+
+ auto other = base;
+ ASSERT_SERIALIZABLES_EQ(other, base);
+ other.Name("other");
+ ASSERT_SERIALIZABLES_NE(other, base);
+
+ other = base;
+ other.TypeV3(NTi::List(NTi::String()));
+ ASSERT_SERIALIZABLES_NE(other, base);
+
+ other = base;
+ other.ResetSortOrder();
+ ASSERT_SERIALIZABLES_NE(other, base);
+
+ other = base;
+ other.Lock("lock1");
+ ASSERT_SERIALIZABLES_NE(other, base);
+
+ other = base;
+ other.Expression("x + 13");
+ ASSERT_SERIALIZABLES_NE(other, base);
+
+ other = base;
+ other.ResetAggregate();
+ ASSERT_SERIALIZABLES_NE(other, base);
+
+ other = base;
+ other.Group("group1");
+ ASSERT_SERIALIZABLES_NE(other, base);
+}
+
+TEST(TCommonTest, TableSchemaEquality)
+{
+ auto col1 = TColumnSchema()
+ .Name("col1")
+ .TypeV3(NTi::Optional(NTi::List(NTi::String())))
+ .SortOrder(ESortOrder::SO_ASCENDING);
+
+ auto col2 = TColumnSchema()
+ .Name("col2")
+ .TypeV3(NTi::Uint32());
+
+ auto schema = TTableSchema()
+ .AddColumn(col1)
+ .AddColumn(col2)
+ .Strict(true)
+ .UniqueKeys(true);
+
+ auto other = schema;
+ ASSERT_SERIALIZABLES_EQ(other, schema);
+
+ other.Strict(false);
+ ASSERT_SERIALIZABLES_NE(other, schema);
+
+ other = schema;
+ other.MutableColumns()[0].TypeV3(NTi::List(NTi::String()));
+ ASSERT_SERIALIZABLES_NE(other, schema);
+
+ other = schema;
+ other.MutableColumns().push_back(col1);
+ ASSERT_SERIALIZABLES_NE(other, schema);
+
+ other = schema;
+ other.UniqueKeys(false);
+ ASSERT_SERIALIZABLES_NE(other, schema);
+}
diff --git a/yt/cpp/mapreduce/interface/common_ut.h b/yt/cpp/mapreduce/interface/ut/common_ut.h
index 6f70f09bee..6f70f09bee 100644
--- a/yt/cpp/mapreduce/interface/common_ut.h
+++ b/yt/cpp/mapreduce/interface/ut/common_ut.h
diff --git a/yt/cpp/mapreduce/interface/ut/config_ut.cpp b/yt/cpp/mapreduce/interface/ut/config_ut.cpp
new file mode 100644
index 0000000000..780a57f3f2
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/config_ut.cpp
@@ -0,0 +1,17 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <yt/cpp/mapreduce/interface/config.h>
+
+using namespace NYT;
+
+TEST(TConfigTest, Reset) {
+ // Very limited test, checks only one config field.
+
+ auto origConfig = *TConfig::Get();
+ TConfig::Get()->Reset();
+ EXPECT_EQ(origConfig.Hosts, TConfig::Get()->Hosts);
+
+ TConfig::Get()->Hosts = "hosts/fb867";
+ TConfig::Get()->Reset();
+ EXPECT_EQ(origConfig.Hosts, TConfig::Get()->Hosts);
+}
diff --git a/yt/cpp/mapreduce/interface/ut/error_ut.cpp b/yt/cpp/mapreduce/interface/ut/error_ut.cpp
new file mode 100644
index 0000000000..4911f29d97
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/error_ut.cpp
@@ -0,0 +1,81 @@
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <library/cpp/json/json_reader.h>
+
+#include <yt/cpp/mapreduce/interface/errors.h>
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+#include <util/generic/set.h>
+
+using namespace NYT;
+
+template<>
+void Out<NYT::TNode>(IOutputStream& s, const NYT::TNode& node)
+{
+ s << "TNode:" << NodeToYsonString(node);
+}
+
+TEST(TErrorTest, ParseJson)
+{
+ // Scary real world error! Бу!
+ const char* jsonText =
+ R"""({)"""
+ R"""("code":500,)"""
+ R"""("message":"Error resolving path //home/user/link",)"""
+ R"""("attributes":{)"""
+ R"""("fid":18446484571700269066,)"""
+ R"""("method":"Create",)"""
+ R"""("tid":17558639495721339338,)"""
+ R"""("datetime":"2017-04-07T13:38:56.474819Z",)"""
+ R"""("pid":414529,)"""
+ R"""("host":"build01-01g.yt.yandex.net"},)"""
+ R"""("inner_errors":[{)"""
+ R"""("code":1,)"""
+ R"""("message":"Node //tt cannot have children",)"""
+ R"""("attributes":{)"""
+ R"""("fid":18446484571700269066,)"""
+ R"""("tid":17558639495721339338,)"""
+ R"""("datetime":"2017-04-07T13:38:56.474725Z",)"""
+ R"""("pid":414529,)"""
+ R"""("host":"build01-01g.yt.yandex.net"},)"""
+ R"""("inner_errors":[]}]})""";
+
+ NJson::TJsonValue jsonValue;
+ ReadJsonFastTree(jsonText, &jsonValue, /*throwOnError=*/ true);
+
+ TYtError error(jsonValue);
+ EXPECT_EQ(error.GetCode(), 500);
+ EXPECT_EQ(error.GetMessage(), R"""(Error resolving path //home/user/link)""");
+ EXPECT_EQ(error.InnerErrors().size(), 1u);
+ EXPECT_EQ(error.InnerErrors()[0].GetCode(), 1);
+
+ EXPECT_EQ(error.HasAttributes(), true);
+ EXPECT_EQ(error.GetAttributes().at("method"), TNode("Create"));
+
+ EXPECT_EQ(error.GetAllErrorCodes(), TSet<int>({500, 1}));
+}
+
+TEST(TErrorTest, GetYsonText) {
+ const char* jsonText =
+ R"""({)"""
+ R"""("code":500,)"""
+ R"""("message":"outer error",)"""
+ R"""("attributes":{)"""
+ R"""("method":"Create",)"""
+ R"""("pid":414529},)"""
+ R"""("inner_errors":[{)"""
+ R"""("code":1,)"""
+ R"""("message":"inner error",)"""
+ R"""("attributes":{},)"""
+ R"""("inner_errors":[])"""
+ R"""(}]})""";
+ TYtError error;
+ error.ParseFrom(jsonText);
+ TString ysonText = error.GetYsonText();
+ TYtError error2(NodeFromYsonString(ysonText));
+ EXPECT_EQ(
+ ysonText,
+ R"""({"code"=500;"message"="outer error";"attributes"={"method"="Create";"pid"=414529};"inner_errors"=[{"code"=1;"message"="inner error"}]})""");
+ EXPECT_EQ(error2.GetYsonText(), ysonText);
+}
diff --git a/yt/cpp/mapreduce/interface/ut/format_ut.cpp b/yt/cpp/mapreduce/interface/ut/format_ut.cpp
new file mode 100644
index 0000000000..83b860ab94
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/format_ut.cpp
@@ -0,0 +1,232 @@
+#include "common_ut.h"
+
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/format.h>
+
+#include <yt/cpp/mapreduce/interface/ut/proto3_ut.pb.h>
+#include <yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.pb.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+static TNode GetColumns(const TFormat& format, int tableIndex = 0)
+{
+ return format.Config.GetAttributes()["tables"][tableIndex]["columns"];
+}
+
+TEST(TProtobufFormatTest, TIntegral)
+{
+ const auto format = TFormat::Protobuf<NUnitTesting::TIntegral>();
+ auto columns = GetColumns(format);
+
+ struct TColumn
+ {
+ TString Name;
+ TString ProtoType;
+ int FieldNumber;
+ };
+
+ auto expected = TVector<TColumn>{
+ {"DoubleField", "double", 1},
+ {"FloatField", "float", 2},
+ {"Int32Field", "int32", 3},
+ {"Int64Field", "int64", 4},
+ {"Uint32Field", "uint32", 5},
+ {"Uint64Field", "uint64", 6},
+ {"Sint32Field", "sint32", 7},
+ {"Sint64Field", "sint64", 8},
+ {"Fixed32Field", "fixed32", 9},
+ {"Fixed64Field", "fixed64", 10},
+ {"Sfixed32Field", "sfixed32", 11},
+ {"Sfixed64Field", "sfixed64", 12},
+ {"BoolField", "bool", 13},
+ {"EnumField", "enum_string", 14},
+ };
+
+ EXPECT_EQ(columns.Size(), expected.size());
+ for (int i = 0; i < static_cast<int>(columns.Size()); ++i) {
+ EXPECT_EQ(columns[i]["name"], expected[i].Name);
+ EXPECT_EQ(columns[i]["proto_type"], expected[i].ProtoType);
+ EXPECT_EQ(columns[i]["field_number"], expected[i].FieldNumber);
+ }
+}
+
+TEST(TProtobufFormatTest, TRowFieldSerializationOption)
+{
+ const auto format = TFormat::Protobuf<NUnitTesting::TRowFieldSerializationOption>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns[0]["name"], "UrlRow_1");
+ EXPECT_EQ(columns[0]["proto_type"], "structured_message");
+ EXPECT_EQ(columns[0]["field_number"], 1);
+ const auto& fields = columns[0]["fields"];
+ EXPECT_EQ(fields[0]["name"], "Host");
+ EXPECT_EQ(fields[0]["proto_type"], "string");
+ EXPECT_EQ(fields[0]["field_number"], 1);
+
+ EXPECT_EQ(fields[1]["name"], "Path");
+ EXPECT_EQ(fields[1]["proto_type"], "string");
+ EXPECT_EQ(fields[1]["field_number"], 2);
+
+ EXPECT_EQ(fields[2]["name"], "HttpCode");
+ EXPECT_EQ(fields[2]["proto_type"], "sint32");
+ EXPECT_EQ(fields[2]["field_number"], 3);
+
+ EXPECT_EQ(columns[1]["name"], "UrlRow_2");
+ EXPECT_EQ(columns[1]["proto_type"], "message");
+ EXPECT_EQ(columns[1]["field_number"], 2);
+}
+
+
+TEST(TProtobufFormatTest, TPacked)
+{
+ const auto format = TFormat::Protobuf<NUnitTesting::TPacked>();
+ auto column = GetColumns(format)[0];
+
+ EXPECT_EQ(column["name"], "PackedListInt64");
+ EXPECT_EQ(column["proto_type"], "int64");
+ EXPECT_EQ(column["field_number"], 1);
+ EXPECT_EQ(column["packed"], true);
+ EXPECT_EQ(column["repeated"], true);
+}
+
+TEST(TProtobufFormatTest, TCyclic)
+{
+ EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic>(), TApiUsageError);
+ EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TA>(), TApiUsageError);
+ EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TB>(), TApiUsageError);
+ EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TC>(), TApiUsageError);
+ EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TD>(), TApiUsageError);
+
+ const auto format = TFormat::Protobuf<NUnitTesting::TCyclic::TE>();
+ auto column = GetColumns(format)[0];
+ EXPECT_EQ(column["name"], "d");
+ EXPECT_EQ(column["proto_type"], "message");
+ EXPECT_EQ(column["field_number"], 1);
+}
+
+TEST(TProtobufFormatTest, Map)
+{
+ const auto format = TFormat::Protobuf<NUnitTesting::TWithMap>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns.Size(), 5u);
+ {
+ const auto& column = columns[0];
+ EXPECT_EQ(column["name"], "MapDefault");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "message");
+ }
+ {
+ const auto& column = columns[1];
+ EXPECT_EQ(column["name"], "MapListOfStructsLegacy");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "message");
+ }
+ {
+ const auto& column = columns[2];
+ EXPECT_EQ(column["name"], "MapListOfStructs");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message");
+ }
+ {
+ const auto& column = columns[3];
+ EXPECT_EQ(column["name"], "MapOptionalDict");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message");
+ }
+ {
+ const auto& column = columns[4];
+ EXPECT_EQ(column["name"], "MapDict");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message");
+ }
+}
+
+
+TEST(TProtobufFormatTest, Oneof)
+{
+ const auto format = TFormat::Protobuf<NUnitTesting::TWithOneof>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns.Size(), 4u);
+ auto check = [] (const TNode& column, TStringBuf name, TStringBuf oneof2Name) {
+ EXPECT_EQ(column["name"], name);
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 5u);
+ EXPECT_EQ(column["fields"][0]["name"], "field");
+
+ const auto& oneof2 = column["fields"][1];
+ EXPECT_EQ(oneof2["name"], oneof2Name);
+ EXPECT_EQ(oneof2["proto_type"], "oneof");
+ EXPECT_EQ(oneof2["fields"][0]["name"], "y2");
+ EXPECT_EQ(oneof2["fields"][1]["name"], "z2");
+ EXPECT_EQ(oneof2["fields"][1]["proto_type"], "structured_message");
+ const auto& embeddedOneof = oneof2["fields"][1]["fields"][0];
+ EXPECT_EQ(embeddedOneof["name"], "Oneof");
+ EXPECT_EQ(embeddedOneof["fields"][0]["name"], "x");
+ EXPECT_EQ(embeddedOneof["fields"][1]["name"], "y");
+ EXPECT_EQ(oneof2["fields"][2]["name"], "x2");
+
+ EXPECT_EQ(column["fields"][2]["name"], "x1");
+ EXPECT_EQ(column["fields"][3]["name"], "y1");
+ EXPECT_EQ(column["fields"][4]["name"], "z1");
+ };
+
+ check(columns[0], "DefaultSeparateFields", "variant_field_name");
+ check(columns[1], "NoDefault", "Oneof2");
+
+ {
+ const auto& column = columns[2];
+ EXPECT_EQ(column["name"], "SerializationProtobuf");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 3u);
+ EXPECT_EQ(column["fields"][0]["name"], "x1");
+ EXPECT_EQ(column["fields"][1]["name"], "y1");
+ EXPECT_EQ(column["fields"][2]["name"], "z1");
+ }
+ {
+ const auto& column = columns[3];
+ EXPECT_EQ(column["name"], "TopLevelOneof");
+ EXPECT_EQ(column["proto_type"], "oneof");
+ EXPECT_EQ(column["fields"].Size(), 1u);
+ EXPECT_EQ(column["fields"][0]["name"], "MemberOfTopLevelOneof");
+ }
+}
+
+TEST(TProto3Test, TWithOptional)
+{
+ const auto format = TFormat::Protobuf<NTestingProto3::TWithOptional>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns[0]["name"], "x");
+ EXPECT_EQ(columns[0]["proto_type"], "int64");
+ EXPECT_EQ(columns[0]["field_number"], 1);
+}
+
+TEST(TProto3Test, TWithOptionalMessage)
+{
+ const auto format = TFormat::Protobuf<NTestingProto3::TWithOptionalMessage>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns[0]["name"], "x");
+ EXPECT_EQ(columns[0]["proto_type"], "structured_message");
+ EXPECT_EQ(columns[0]["field_number"], 1);
+
+ EXPECT_EQ(columns[0]["fields"].Size(), 1u);
+ EXPECT_EQ(columns[0]["fields"][0]["name"], "x");
+ EXPECT_EQ(columns[0]["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(columns[0]["fields"][0]["field_number"], 1);
+}
diff --git a/yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp b/yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp
new file mode 100644
index 0000000000..9972637aff
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp
@@ -0,0 +1,100 @@
+#include <yt/cpp/mapreduce/interface/job_counters.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+TEST(TJobCountersTest, Full)
+{
+ const TString input = R"""(
+ {
+ "completed" = {
+ "total" = 6;
+ "non-interrupted" = 1;
+ "interrupted" = {
+ "whatever_interrupted" = 2;
+ "whatever_else_interrupted" = 3;
+ };
+ };
+ "aborted" = {
+ "non_scheduled" = {
+ "whatever_non_scheduled" = 4;
+ "whatever_else_non_scheduled" = 5;
+ };
+ "scheduled" = {
+ "whatever_scheduled" = 6;
+ "whatever_else_scheduled" = 7;
+ };
+ "total" = 22;
+ };
+ "lost" = 8;
+ "invalidated" = 9;
+ "failed" = 10;
+ "running" = 11;
+ "suspended" = 12;
+ "pending" = 13;
+ "blocked" = 14;
+ "total" = 105;
+ })""";
+
+ TJobCounters counters(NodeFromYsonString(input));
+
+ EXPECT_EQ(counters.GetTotal(), 105u);
+
+ EXPECT_EQ(counters.GetCompleted().GetTotal(), 6u);
+ EXPECT_EQ(counters.GetCompletedNonInterrupted().GetTotal(), 1u);
+ EXPECT_EQ(counters.GetCompletedInterrupted().GetTotal(), 5u);
+ EXPECT_EQ(counters.GetAborted().GetTotal(), 22u);
+ EXPECT_EQ(counters.GetAbortedNonScheduled().GetTotal(), 9u);
+ EXPECT_EQ(counters.GetAbortedScheduled().GetTotal(), 13u);
+ EXPECT_EQ(counters.GetLost().GetTotal(), 8u);
+ EXPECT_EQ(counters.GetInvalidated().GetTotal(), 9u);
+ EXPECT_EQ(counters.GetFailed().GetTotal(), 10u);
+ EXPECT_EQ(counters.GetRunning().GetTotal(), 11u);
+ EXPECT_EQ(counters.GetSuspended().GetTotal(), 12u);
+ EXPECT_EQ(counters.GetPending().GetTotal(), 13u);
+ EXPECT_EQ(counters.GetBlocked().GetTotal(), 14u);
+
+ EXPECT_EQ(counters.GetCompletedInterrupted().GetValue("whatever_interrupted"), 2u);
+ EXPECT_EQ(counters.GetCompletedInterrupted().GetValue("whatever_else_interrupted"), 3u);
+ EXPECT_EQ(counters.GetAbortedNonScheduled().GetValue("whatever_non_scheduled"), 4u);
+ EXPECT_EQ(counters.GetAbortedNonScheduled().GetValue("whatever_else_non_scheduled"), 5u);
+ EXPECT_EQ(counters.GetAbortedScheduled().GetValue("whatever_scheduled"), 6u);
+ EXPECT_EQ(counters.GetAbortedScheduled().GetValue("whatever_else_scheduled"), 7u);
+
+ EXPECT_THROW(counters.GetCompletedInterrupted().GetValue("Nothingness"), yexception);
+}
+
+TEST(TJobCountersTest, Empty)
+{
+ const TString input = "{}";
+
+ TJobCounters counters(NodeFromYsonString(input));
+
+ EXPECT_EQ(counters.GetTotal(), 0u);
+
+ EXPECT_EQ(counters.GetCompleted().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetCompletedNonInterrupted().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetCompletedInterrupted().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetAborted().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetAbortedNonScheduled().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetAbortedScheduled().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetLost().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetInvalidated().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetFailed().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetRunning().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetSuspended().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetPending().GetTotal(), 0u);
+ EXPECT_EQ(counters.GetBlocked().GetTotal(), 0u);
+}
+
+TEST(TJobCountersTest, Broken)
+{
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode())), yexception, "TJobCounters");
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode(1))), yexception, "TJobCounters");
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode(1.0))), yexception, "TJobCounters");
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode("Whatever"))), yexception, "TJobCounters");
+}
diff --git a/yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp b/yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp
new file mode 100644
index 0000000000..90d40623c1
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp
@@ -0,0 +1,254 @@
+#include <yt/cpp/mapreduce/interface/job_statistics.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+TEST(TJobStatisticsTest, Simple)
+{
+ const TString input = R"""(
+ {
+ "data" = {
+ "output" = {
+ "0" = {
+ "uncompressed_data_size" = {
+ "$" = {
+ "completed" = {
+ "simple_sort" = {
+ "max" = 130;
+ "count" = 1;
+ "min" = 130;
+ "sum" = 130;
+ };
+ "map" = {
+ "max" = 42;
+ "count" = 1;
+ "min" = 42;
+ "sum" = 42;
+ };
+ };
+ "aborted" = {
+ "simple_sort" = {
+ "max" = 24;
+ "count" = 1;
+ "min" = 24;
+ "sum" = 24;
+ };
+ };
+ };
+ };
+ };
+ };
+ };
+ })""";
+
+ TJobStatistics stat(NodeFromYsonString(input));
+
+ EXPECT_TRUE(stat.HasStatistics("data/output/0/uncompressed_data_size"));
+ EXPECT_TRUE(!stat.HasStatistics("nonexistent-statistics"));
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics");
+
+ EXPECT_EQ(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"});
+
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2);
+
+ EXPECT_EQ(stat.JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), 24);
+ EXPECT_EQ(stat.JobType({EJobType::Map}).JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), TMaybe<i64>());
+}
+
+TEST(TJobStatisticsTest, OtherTypes)
+{
+ const TString input = R"""(
+ {
+ "time" = {
+ "exec" = {
+ "$" = {
+ "completed" = {
+ "map" = {
+ "max" = 2482468;
+ "count" = 38;
+ "min" = 578976;
+ "sum" = 47987270;
+ };
+ };
+ };
+ };
+ };
+ })""";
+
+ TJobStatistics stat(NodeFromYsonString(input));
+
+ EXPECT_EQ(stat.GetStatisticsAs<TDuration>("time/exec").Max(), TDuration::MilliSeconds(2482468));
+}
+
+TEST(TJobStatisticsTest, Custom)
+{
+ const TString input = R"""(
+ {
+ "custom" = {
+ "some" = {
+ "path" = {
+ "$" = {
+ "completed" = {
+ "map" = {
+ "max" = -1;
+ "count" = 1;
+ "min" = -1;
+ "sum" = -1;
+ };
+ };
+ };
+ };
+ };
+ "another" = {
+ "path" = {
+ "$" = {
+ "completed" = {
+ "map" = {
+ "max" = 1001;
+ "count" = 2;
+ "min" = 1001;
+ "sum" = 2002;
+ };
+ };
+ };
+ };
+ };
+ };
+ })""";
+
+ TJobStatistics stat(NodeFromYsonString(input));
+
+ EXPECT_TRUE(stat.HasCustomStatistics("some/path"));
+ EXPECT_TRUE(!stat.HasCustomStatistics("nonexistent-statistics"));
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(stat.GetCustomStatistics("BLAH-BLAH"), yexception, "Statistics");
+
+ const auto names = stat.GetCustomStatisticsNames();
+ const THashSet<TString> expected = {"some/path", "another/path"};
+ EXPECT_EQ(THashSet<TString>(names.begin(), names.end()), expected);
+
+ EXPECT_EQ(stat.GetCustomStatistics("some/path").Max(), -1);
+ EXPECT_EQ(stat.GetCustomStatistics("another/path").Avg(), 1001);
+}
+
+TEST(TJobStatisticsTest, TaskNames)
+{
+ const TString input = R"""(
+ {
+ "data" = {
+ "output" = {
+ "0" = {
+ "uncompressed_data_size" = {
+ "$" = {
+ "completed" = {
+ "partition_map" = {
+ "max" = 130;
+ "count" = 1;
+ "min" = 130;
+ "sum" = 130;
+ };
+ "partition(0)" = {
+ "max" = 42;
+ "count" = 1;
+ "min" = 42;
+ "sum" = 42;
+ };
+ };
+ "aborted" = {
+ "simple_sort" = {
+ "max" = 24;
+ "count" = 1;
+ "min" = 24;
+ "sum" = 24;
+ };
+ };
+ };
+ };
+ };
+ };
+ };
+ })""";
+
+ TJobStatistics stat(NodeFromYsonString(input));
+
+ EXPECT_TRUE(stat.HasStatistics("data/output/0/uncompressed_data_size"));
+ EXPECT_TRUE(!stat.HasStatistics("nonexistent-statistics"));
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics");
+
+ EXPECT_EQ(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"});
+
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172);
+ EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2);
+
+ EXPECT_EQ(
+ stat
+ .JobState({EJobState::Aborted})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 24);
+ EXPECT_EQ(
+ stat
+ .JobType({EJobType::Partition})
+ .JobState({EJobState::Aborted})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ TMaybe<i64>());
+ EXPECT_EQ(
+ stat
+ .TaskName({"partition(0)"})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 42);
+ EXPECT_EQ(
+ stat
+ .TaskName({"partition"})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ TMaybe<i64>());
+ EXPECT_EQ(
+ stat
+ .TaskName({"partition_map(0)"})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 130);
+ EXPECT_EQ(
+ stat
+ .JobType({EJobType::Partition})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 42);
+ EXPECT_EQ(
+ stat
+ .JobType({EJobType::PartitionMap})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 130);
+ EXPECT_EQ(
+ stat
+ .TaskName({ETaskName::Partition0})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 42);
+ EXPECT_EQ(
+ stat
+ .TaskName({ETaskName::Partition1})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ TMaybe<i64>());
+ EXPECT_EQ(
+ stat
+ .TaskName({ETaskName::PartitionMap0})
+ .GetStatistics("data/output/0/uncompressed_data_size")
+ .Sum(),
+ 130);
+}
diff --git a/yt/cpp/mapreduce/interface/ut/operation_ut.cpp b/yt/cpp/mapreduce/interface/ut/operation_ut.cpp
new file mode 100644
index 0000000000..81d03d0618
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/operation_ut.cpp
@@ -0,0 +1,272 @@
+#include "common_ut.h"
+
+#include <yt/cpp/mapreduce/interface/job_statistics.h>
+#include <yt/cpp/mapreduce/interface/operation.h>
+
+#include <yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.pb.h>
+
+#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+using namespace NYT::NUnitTesting;
+
+////////////////////////////////////////////////////////////////////
+
+class TDummyInferenceContext
+ : public IOperationPreparationContext
+{
+public:
+ TDummyInferenceContext(int inputCount, int outputCount)
+ : InputCount_(inputCount)
+ , OutputCount_(outputCount)
+ , InputSchemas_(inputCount)
+ { }
+
+ int GetInputCount() const override
+ {
+ return InputCount_;
+ }
+
+ int GetOutputCount() const override
+ {
+ return OutputCount_;
+ }
+
+ const TVector<TTableSchema>& GetInputSchemas() const override
+ {
+ return InputSchemas_;
+ }
+
+ const TTableSchema& GetInputSchema(int index) const override
+ {
+ return InputSchemas_[index];
+ }
+
+ TMaybe<TYPath> GetInputPath(int) const override
+ {
+ return Nothing();
+ }
+
+ TMaybe<TYPath> GetOutputPath(int) const override
+ {
+ return Nothing();
+ }
+
+private:
+ int InputCount_;
+ int OutputCount_;
+ TVector<TTableSchema> InputSchemas_;
+};
+
+////////////////////////////////////////////////////////////////////
+
+TEST(TPrepareOperationTest, BasicSchemas)
+{
+ auto firstSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64));
+ auto otherSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN));
+ auto thirdSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING));
+
+ TDummyInferenceContext context(3,7);
+ TJobOperationPreparer builder(context);
+
+ builder
+ .OutputSchema(1, firstSchema)
+ .BeginOutputGroup(TVector<int>{2, 5})
+ .Schema(otherSchema)
+ .EndOutputGroup()
+ .BeginOutputGroup(3, 5)
+ .Schema(thirdSchema)
+ .EndOutputGroup()
+ .BeginOutputGroup(TVector<int>{0, 6})
+ .Schema(thirdSchema)
+ .EndOutputGroup();
+
+ EXPECT_THROW(builder.OutputSchema(1, otherSchema), TApiUsageError);
+ EXPECT_THROW(builder.BeginOutputGroup(3, 5).Schema(otherSchema), TApiUsageError);
+ EXPECT_THROW(builder.BeginOutputGroup(TVector<int>{3,6,7}).Schema(otherSchema), TApiUsageError);
+
+ builder.Finish();
+ auto result = builder.GetOutputSchemas();
+
+ ASSERT_SERIALIZABLES_EQ(result[0], thirdSchema);
+ ASSERT_SERIALIZABLES_EQ(result[1], firstSchema);
+ ASSERT_SERIALIZABLES_EQ(result[2], otherSchema);
+ ASSERT_SERIALIZABLES_EQ(result[3], thirdSchema);
+ ASSERT_SERIALIZABLES_EQ(result[4], thirdSchema);
+ ASSERT_SERIALIZABLES_EQ(result[5], otherSchema);
+ ASSERT_SERIALIZABLES_EQ(result[6], thirdSchema);
+}
+
+TEST(TPrepareOperationTest, NoSchema)
+{
+ auto schema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64));
+
+ TDummyInferenceContext context(3,4);
+ TJobOperationPreparer builder(context);
+
+ builder
+ .OutputSchema(1, schema)
+ .NoOutputSchema(0)
+ .BeginOutputGroup(2, 4)
+ .Schema(schema)
+ .EndOutputGroup();
+
+ EXPECT_THROW(builder.OutputSchema(0, schema), TApiUsageError);
+
+ builder.Finish();
+ auto result = builder.GetOutputSchemas();
+
+ EXPECT_TRUE(result[0].Empty());
+
+ ASSERT_SERIALIZABLES_EQ(result[1], schema);
+ ASSERT_SERIALIZABLES_EQ(result[2], schema);
+ ASSERT_SERIALIZABLES_EQ(result[3], schema);
+}
+
+TEST(TPrepareOperationTest, Descriptions)
+{
+ auto urlRowSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("Host").Type(NTi::Optional(NTi::String())))
+ .AddColumn(TColumnSchema().Name("Path").Type(NTi::Optional(NTi::String())))
+ .AddColumn(TColumnSchema().Name("HttpCode").Type(NTi::Optional(NTi::Int32())));
+
+ auto urlRowStruct = NTi::Struct({
+ {"Host", NTi::Optional(NTi::String())},
+ {"Path", NTi::Optional(NTi::String())},
+ {"HttpCode", NTi::Optional(NTi::Int32())},
+ });
+
+ auto rowFieldSerializationOptionSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(NTi::Optional(urlRowStruct)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(NTi::Optional(NTi::String())));
+
+ auto rowSerializedRepeatedFieldsSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("Ints").Type(NTi::List(NTi::Int64())))
+ .AddColumn(TColumnSchema().Name("UrlRows").Type(NTi::List(urlRowStruct)));
+
+ TDummyInferenceContext context(5,7);
+ TJobOperationPreparer builder(context);
+
+ builder
+ .InputDescription<TUrlRow>(0)
+ .BeginInputGroup(2, 3)
+ .Description<TUrlRow>()
+ .EndInputGroup()
+ .BeginInputGroup(TVector<int>{1, 4})
+ .Description<TRowSerializedRepeatedFields>()
+ .EndInputGroup()
+ .InputDescription<TUrlRow>(3);
+
+ EXPECT_THROW(builder.InputDescription<TUrlRow>(0), TApiUsageError);
+
+ builder
+ .OutputDescription<TUrlRow>(0, false)
+ .OutputDescription<TRowFieldSerializationOption>(1)
+ .BeginOutputGroup(2, 4)
+ .Description<TUrlRow>()
+ .EndOutputGroup()
+ .BeginOutputGroup(TVector<int>{4,6})
+ .Description<TRowSerializedRepeatedFields>()
+ .EndOutputGroup()
+ .OutputDescription<TUrlRow>(5, false);
+
+ EXPECT_THROW(builder.OutputDescription<TUrlRow>(0), TApiUsageError);
+ EXPECT_NO_THROW(builder.OutputSchema(0, urlRowSchema));
+ EXPECT_NO_THROW(builder.OutputSchema(5, urlRowSchema));
+ EXPECT_THROW(builder.OutputSchema(1, urlRowSchema), TApiUsageError);
+
+ builder.Finish();
+ auto result = builder.GetOutputSchemas();
+
+ ASSERT_SERIALIZABLES_EQ(result[0], urlRowSchema);
+ ASSERT_SERIALIZABLES_EQ(result[1], rowFieldSerializationOptionSchema);
+ ASSERT_SERIALIZABLES_EQ(result[2], urlRowSchema);
+ ASSERT_SERIALIZABLES_EQ(result[3], urlRowSchema);
+ ASSERT_SERIALIZABLES_EQ(result[4], rowSerializedRepeatedFieldsSchema);
+ ASSERT_SERIALIZABLES_EQ(result[5], urlRowSchema);
+ ASSERT_SERIALIZABLES_EQ(result[6], rowSerializedRepeatedFieldsSchema);
+
+ auto expectedInputDescriptions = TVector<TMaybe<TTableStructure>>{
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
+ };
+ EXPECT_EQ(expectedInputDescriptions, builder.GetInputDescriptions());
+
+ auto expectedOutputDescriptions = TVector<TMaybe<TTableStructure>>{
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TRowFieldSerializationOption::descriptor()}},
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
+ {TProtobufTableStructure{TUrlRow::descriptor()}},
+ {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}},
+ };
+ EXPECT_EQ(expectedOutputDescriptions, builder.GetOutputDescriptions());
+}
+
+TEST(TPrepareOperationTest, InputColumns)
+{
+ TDummyInferenceContext context(5, 1);
+ TJobOperationPreparer builder(context);
+ builder
+ .InputColumnFilter(2, {"a", "b"})
+ .BeginInputGroup(0, 2)
+ .ColumnFilter({"b", "c"})
+ .ColumnRenaming({{"b", "B"}, {"c", "C"}})
+ .EndInputGroup()
+ .InputColumnRenaming(3, {{"a", "AAA"}})
+ .NoOutputSchema(0);
+ builder.Finish();
+
+ auto expectedRenamings = TVector<THashMap<TString, TString>>{
+ {{"b", "B"}, {"c", "C"}},
+ {{"b", "B"}, {"c", "C"}},
+ {},
+ {{"a", "AAA"}},
+ {},
+ };
+ EXPECT_EQ(builder.GetInputColumnRenamings(), expectedRenamings);
+
+ auto expectedFilters = TVector<TMaybe<TVector<TString>>>{
+ {{"b", "c"}},
+ {{"b", "c"}},
+ {{"a", "b"}},
+ {},
+ {},
+ };
+ EXPECT_EQ(builder.GetInputColumnFilters(), expectedFilters);
+}
+
+TEST(TPrepareOperationTest, Bug_r7349102)
+{
+ auto firstSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64));
+ auto otherSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN));
+ auto thirdSchema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING));
+
+ TDummyInferenceContext context(3,1);
+ TJobOperationPreparer builder(context);
+
+ builder
+ .InputDescription<TUrlRow>(0)
+ .InputDescription<TUrlRow>(1)
+ .InputDescription<TUrlRow>(2)
+ .OutputDescription<TUrlRow>(0);
+
+ builder.Finish();
+}
+
+////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/interface/proto3_ut.proto b/yt/cpp/mapreduce/interface/ut/proto3_ut.proto
index b24c13085b..b24c13085b 100644
--- a/yt/cpp/mapreduce/interface/proto3_ut.proto
+++ b/yt/cpp/mapreduce/interface/ut/proto3_ut.proto
diff --git a/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp
new file mode 100644
index 0000000000..abfe5bbfdc
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp
@@ -0,0 +1,270 @@
+#include "common_ut.h"
+
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/interface/format.h>
+
+#include <yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.pb.h>
+
+#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+namespace {
+
+NTi::TTypePtr GetUrlRowType(bool required)
+{
+ static const NTi::TTypePtr structType = NTi::Struct({
+ {"Host", ToTypeV3(EValueType::VT_STRING, false)},
+ {"Path", ToTypeV3(EValueType::VT_STRING, false)},
+ {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}});
+ return required ? structType : NTi::TTypePtr(NTi::Optional(structType));
+}
+
+} // namespace
+
+TEST(TProtobufFileOptionsTest, TRowFieldSerializationOption)
+{
+ const auto schema = CreateTableSchema<NTestingFileOptions::TRowFieldSerializationOption>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false))));
+}
+
+TEST(TProtobufFileOptionsTest, TRowMixedSerializationOptions)
+{
+ const auto schema = CreateTableSchema<NTestingFileOptions::TRowMixedSerializationOptions>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false))));
+}
+
+TEST(TProtobufFileOptionsTest, FieldSortOrder)
+{
+ const auto schema = CreateTableSchema<NTestingFileOptions::TFieldSortOrder>();
+
+ auto asInProtoFile = NTi::Optional(NTi::Struct({
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ {"z", NTi::Optional(NTi::Bool())},
+ }));
+ auto byFieldNumber = NTi::Optional(NTi::Struct({
+ {"z", NTi::Optional(NTi::Bool())},
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ }));
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(asInProtoFile))
+ .AddColumn(TColumnSchema().Name("EmbeddedAsInProtoFile").Type(asInProtoFile))
+ .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber)));
+}
+
+TEST(TProtobufFileOptionsTest, Map)
+{
+ const auto schema = CreateTableSchema<NTestingFileOptions::TWithMap>();
+
+ auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) {
+ return NTi::List(NTi::Struct({
+ {"key", NTi::Optional(key)},
+ {"value", NTi::Optional(value)},
+ }));
+ };
+
+ auto embedded = NTi::Struct({
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ });
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("MapDefault")
+ .Type(createKeyValueStruct(NTi::Int64(), embedded)))
+ .AddColumn(TColumnSchema()
+ .Name("MapDict")
+ .Type(NTi::Dict(NTi::Int64(), embedded))));
+}
+
+TEST(TProtobufFileOptionsTest, Oneof)
+{
+ const auto schema = CreateTableSchema<NTestingFileOptions::TWithOneof>();
+
+ auto embedded = NTi::Struct({
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ });
+
+ auto defaultVariantType = NTi::Optional(NTi::Struct({
+ {"field", NTi::Optional(NTi::String())},
+ {"Oneof2", NTi::Optional(NTi::Variant(NTi::Struct({
+ {"y2", NTi::String()},
+ {"z2", embedded},
+ {"x2", NTi::Int64()},
+ })))},
+ {"x1", NTi::Optional(NTi::Int64())},
+ {"y1", NTi::Optional(NTi::String())},
+ {"z1", NTi::Optional(embedded)},
+ }));
+
+ auto noDefaultType = NTi::Optional(NTi::Struct({
+ {"field", NTi::Optional(NTi::String())},
+ {"y2", NTi::Optional(NTi::String())},
+ {"z2", NTi::Optional(embedded)},
+ {"x2", NTi::Optional(NTi::Int64())},
+ {"x1", NTi::Optional(NTi::Int64())},
+ {"y1", NTi::Optional(NTi::String())},
+ {"z1", NTi::Optional(embedded)},
+ }));
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("DefaultVariant")
+ .Type(defaultVariantType)
+ )
+ .AddColumn(TColumnSchema()
+ .Name("NoDefault")
+ .Type(noDefaultType)
+ )
+ .AddColumn(TColumnSchema()
+ .Name("SerializationProtobuf")
+ .Type(NTi::Optional(NTi::Struct({
+ {"x1", NTi::Optional(NTi::Int64())},
+ {"y1", NTi::Optional(NTi::String())},
+ {"z1", NTi::Optional(NTi::String())},
+ })))
+ )
+ .AddColumn(TColumnSchema()
+ .Name("MemberOfTopLevelOneof")
+ .Type(NTi::Optional(NTi::Int64()))
+ )
+ );
+}
+
+static TNode GetColumns(const TFormat& format, int tableIndex = 0)
+{
+ return format.Config.GetAttributes()["tables"][tableIndex]["columns"];
+}
+
+TEST(TProtobufFormatFileOptionsTest, TRowFieldSerializationOption)
+{
+ const auto format = TFormat::Protobuf<NTestingFileOptions::TRowFieldSerializationOption>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns[0]["name"], "UrlRow_1");
+ EXPECT_EQ(columns[0]["proto_type"], "message");
+ EXPECT_EQ(columns[0]["field_number"], 1);
+
+ EXPECT_EQ(columns[1]["name"], "UrlRow_2");
+ EXPECT_EQ(columns[1]["proto_type"], "structured_message");
+ EXPECT_EQ(columns[1]["field_number"], 2);
+ const auto& fields = columns[1]["fields"];
+ EXPECT_EQ(fields[0]["name"], "Host");
+ EXPECT_EQ(fields[0]["proto_type"], "string");
+ EXPECT_EQ(fields[0]["field_number"], 1);
+
+ EXPECT_EQ(fields[1]["name"], "Path");
+ EXPECT_EQ(fields[1]["proto_type"], "string");
+ EXPECT_EQ(fields[1]["field_number"], 2);
+
+ EXPECT_EQ(fields[2]["name"], "HttpCode");
+ EXPECT_EQ(fields[2]["proto_type"], "sint32");
+ EXPECT_EQ(fields[2]["field_number"], 3);
+}
+
+TEST(TProtobufFormatFileOptionsTest, Map)
+{
+ const auto format = TFormat::Protobuf<NTestingFileOptions::TWithMap>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns.Size(), 2u);
+ {
+ const auto& column = columns[0];
+ EXPECT_EQ(column["name"], "MapDefault");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message");
+ }
+ {
+ const auto& column = columns[1];
+ EXPECT_EQ(column["name"], "MapDict");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 2u);
+ EXPECT_EQ(column["fields"][0]["proto_type"], "int64");
+ EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message");
+ }
+}
+
+TEST(TProtobufFormatFileOptionsTest, Oneof)
+{
+ const auto format = TFormat::Protobuf<NTestingFileOptions::TWithOneof>();
+ auto columns = GetColumns(format);
+
+ EXPECT_EQ(columns.Size(), 4u);
+
+ {
+ const auto& column = columns[0];
+ EXPECT_EQ(column["name"], "DefaultVariant");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 5u);
+ EXPECT_EQ(column["fields"][0]["name"], "field");
+
+ const auto& oneof2 = column["fields"][1];
+ EXPECT_EQ(oneof2["name"], "Oneof2");
+ EXPECT_EQ(oneof2["proto_type"], "oneof");
+ EXPECT_EQ(oneof2["fields"][0]["name"], "y2");
+ EXPECT_EQ(oneof2["fields"][1]["name"], "z2");
+ EXPECT_EQ(oneof2["fields"][1]["proto_type"], "structured_message");
+ const auto& embeddedFields = oneof2["fields"][1]["fields"];
+ EXPECT_EQ(embeddedFields[0]["name"], "x");
+ EXPECT_EQ(embeddedFields[1]["name"], "y");
+
+ EXPECT_EQ(oneof2["fields"][2]["name"], "x2");
+
+ EXPECT_EQ(column["fields"][2]["name"], "x1");
+ EXPECT_EQ(column["fields"][3]["name"], "y1");
+ EXPECT_EQ(column["fields"][4]["name"], "z1");
+ };
+
+ {
+ const auto& column = columns[1];
+ EXPECT_EQ(column["name"], "NoDefault");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ const auto& fields = column["fields"];
+ EXPECT_EQ(fields.Size(), 7u);
+
+ EXPECT_EQ(fields[0]["name"], "field");
+
+ EXPECT_EQ(fields[1]["name"], "y2");
+
+ EXPECT_EQ(fields[2]["name"], "z2");
+ EXPECT_EQ(fields[2]["proto_type"], "structured_message");
+ const auto& embeddedFields = fields[2]["fields"];
+ EXPECT_EQ(embeddedFields[0]["name"], "x");
+ EXPECT_EQ(embeddedFields[1]["name"], "y");
+
+ EXPECT_EQ(fields[3]["name"], "x2");
+
+ EXPECT_EQ(fields[4]["name"], "x1");
+ EXPECT_EQ(fields[5]["name"], "y1");
+ EXPECT_EQ(fields[6]["name"], "z1");
+ };
+
+ {
+ const auto& column = columns[2];
+ EXPECT_EQ(column["name"], "SerializationProtobuf");
+ EXPECT_EQ(column["proto_type"], "structured_message");
+ EXPECT_EQ(column["fields"].Size(), 3u);
+ EXPECT_EQ(column["fields"][0]["name"], "x1");
+ EXPECT_EQ(column["fields"][1]["name"], "y1");
+ EXPECT_EQ(column["fields"][2]["name"], "z1");
+ }
+ {
+ const auto& column = columns[3];
+ EXPECT_EQ(column["name"], "MemberOfTopLevelOneof");
+ EXPECT_EQ(column["proto_type"], "int64");
+ }
+}
diff --git a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.proto b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.proto
index 4804b2f60c..4804b2f60c 100644
--- a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.proto
+++ b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.proto
diff --git a/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp
new file mode 100644
index 0000000000..d7bee1e6d2
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp
@@ -0,0 +1,444 @@
+#include "common_ut.h"
+
+#include <yt/cpp/mapreduce/interface/common.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+
+#include <yt/cpp/mapreduce/interface/ut/proto3_ut.pb.h>
+#include <yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.pb.h>
+
+#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <util/generic/fwd.h>
+
+#include <algorithm>
+
+using namespace NYT;
+
+bool IsFieldPresent(const TTableSchema& schema, TStringBuf name)
+{
+ for (const auto& field : schema.Columns()) {
+ if (field.Name() == name) {
+ return true;
+ }
+ }
+ return false;
+}
+
+TEST(TProtoSchemaSimpleTest, TIntegral)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TIntegral>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
+ .AddColumn(TColumnSchema().Name("FloatField").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
+ .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
+ .AddColumn(TColumnSchema().Name("Int64Field").Type(ToTypeV3(EValueType::VT_INT64, false)))
+ .AddColumn(TColumnSchema().Name("Uint32Field").Type(ToTypeV3(EValueType::VT_UINT32, false)))
+ .AddColumn(TColumnSchema().Name("Uint64Field").Type(ToTypeV3(EValueType::VT_UINT64, false)))
+ .AddColumn(TColumnSchema().Name("Sint32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
+ .AddColumn(TColumnSchema().Name("Sint64Field").Type(ToTypeV3(EValueType::VT_INT64, false)))
+ .AddColumn(TColumnSchema().Name("Fixed32Field").Type(ToTypeV3(EValueType::VT_UINT32, false)))
+ .AddColumn(TColumnSchema().Name("Fixed64Field").Type(ToTypeV3(EValueType::VT_UINT64, false)))
+ .AddColumn(TColumnSchema().Name("Sfixed32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
+ .AddColumn(TColumnSchema().Name("Sfixed64Field").Type(ToTypeV3(EValueType::VT_INT64, false)))
+ .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false)))
+ .AddColumn(TColumnSchema().Name("EnumField").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, TOneOf)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TOneOf>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
+ .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false)))
+ .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, TWithRequired)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TWithRequired>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("RequiredField").Type(ToTypeV3(EValueType::VT_STRING, true)))
+ .AddColumn(TColumnSchema().Name("NotRequiredField").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, TAggregated)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TAggregated>();
+
+ EXPECT_EQ(6u, schema.Columns().size());
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("StringField").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("BytesField").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("NestedField").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("NestedRepeatedField").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("NestedOneOfField").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("NestedRecursiveField").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, TAliased)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TAliased>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false)))
+ .AddColumn(TColumnSchema().Name("subkey").Type(ToTypeV3(EValueType::VT_DOUBLE, false)))
+ .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, SortColumns)
+{
+ const TSortColumns keys = {"key", "subkey"};
+
+ const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys);
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("key")
+ .Type(ToTypeV3(EValueType::VT_INT32, false))
+ .SortOrder(ESortOrder::SO_ASCENDING))
+ .AddColumn(TColumnSchema()
+ .Name("subkey")
+ .Type(ToTypeV3(EValueType::VT_DOUBLE, false))
+ .SortOrder(ESortOrder::SO_ASCENDING))
+ .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, SortColumnsReordered)
+{
+ const TSortColumns keys = {"subkey"};
+
+ const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys);
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("subkey")
+ .Type(ToTypeV3(EValueType::VT_DOUBLE, false))
+ .SortOrder(ESortOrder::SO_ASCENDING))
+ .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false)))
+ .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaSimpleTest, SortColumnsInvalid)
+{
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TAliased>({"subkey", "subkey"}), yexception);
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TAliased>({"key", "junk"}), yexception);
+}
+
+TEST(TProtoSchemaSimpleTest, KeepFieldsWithoutExtensionTrue)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, true);
+ EXPECT_TRUE(IsFieldPresent(schema, "key"));
+ EXPECT_TRUE(IsFieldPresent(schema, "subkey"));
+ EXPECT_TRUE(IsFieldPresent(schema, "Data"));
+ EXPECT_TRUE(schema.Strict());
+}
+
+TEST(TProtoSchemaSimpleTest, KeepFieldsWithoutExtensionFalse)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, false);
+ EXPECT_TRUE(IsFieldPresent(schema, "key"));
+ EXPECT_TRUE(IsFieldPresent(schema, "subkey"));
+ EXPECT_TRUE(!IsFieldPresent(schema, "Data"));
+ EXPECT_TRUE(schema.Strict());
+}
+
+TEST(TProtoSchemaSimpleTest, ProtobufTypeOption)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TWithTypeOptions>({});
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .Strict(false)
+ .AddColumn(TColumnSchema().Name("ColorIntField").Type(ToTypeV3(EValueType::VT_INT64, false)))
+ .AddColumn(TColumnSchema().Name("ColorStringField").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("AnyField").Type(ToTypeV3(EValueType::VT_ANY, false)))
+ .AddColumn(TColumnSchema().Name("EmbeddedField").Type(
+ NTi::Optional(NTi::Struct({
+ {"ColorIntField", ToTypeV3(EValueType::VT_INT64, false)},
+ {"ColorStringField", ToTypeV3(EValueType::VT_STRING, false)},
+ {"AnyField", ToTypeV3(EValueType::VT_ANY, false)}}))))
+ .AddColumn(TColumnSchema().Name("RepeatedEnumIntField").Type(NTi::List(NTi::Int64()))));
+}
+
+TEST(TProtoSchemaSimpleTest, ProtobufTypeOption_TypeMismatch)
+{
+ EXPECT_THROW(
+ CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumInt>({}),
+ yexception);
+ EXPECT_THROW(
+ CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumString>({}),
+ yexception);
+ EXPECT_THROW(
+ CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_Any>({}),
+ yexception);
+ EXPECT_THROW(
+ CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_OtherColumns>({}),
+ yexception);
+}
+
+NTi::TTypePtr GetUrlRowType_ColumnNames(bool required)
+{
+ static const NTi::TTypePtr type = NTi::Struct({
+ {"Host_ColumnName", ToTypeV3(EValueType::VT_STRING, false)},
+ {"Path_KeyColumnName", ToTypeV3(EValueType::VT_STRING, false)},
+ {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)},
+ });
+ return required ? type : NTi::TTypePtr(NTi::Optional(type));
+}
+
+TEST(TProtoSchemaComplexTest, TRepeated)
+{
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TRepeated>(), yexception);
+
+ const auto schema = CreateTableSchema<NUnitTesting::TRepeatedYtMode>();
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("Int32Field").Type(NTi::List(ToTypeV3(EValueType::VT_INT32, true)))));
+}
+
+TEST(TProtoSchemaComplexTest, TRepeatedOptionalList)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TOptionalList>();
+ auto type = NTi::Optional(NTi::List(NTi::Int64()));
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("OptionalListInt64").TypeV3(type)));
+}
+
+NTi::TTypePtr GetUrlRowType(bool required)
+{
+ static const NTi::TTypePtr structType = NTi::Struct({
+ {"Host", ToTypeV3(EValueType::VT_STRING, false)},
+ {"Path", ToTypeV3(EValueType::VT_STRING, false)},
+ {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}});
+ return required ? structType : NTi::TTypePtr(NTi::Optional(structType));
+}
+
+TEST(TProtoSchemaComplexTest, TRowFieldSerializationOption)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TRowFieldSerializationOption>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaComplexTest, TRowMessageSerializationOption)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TRowMessageSerializationOption>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false))));
+}
+
+TEST(TProtoSchemaComplexTest, TRowMixedSerializationOptions)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaComplexTest, TRowMixedSerializationOptions_ColumnNames)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions_ColumnNames>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType_ColumnNames(false)))
+ .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaComplexTest, NoOptionInheritance)
+{
+ auto deepestEmbedded = NTi::Optional(NTi::Struct({{"x", ToTypeV3(EValueType::VT_INT64, false)}}));
+
+ const auto schema = CreateTableSchema<NUnitTesting::TNoOptionInheritance>();
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("EmbeddedYt_YtOption")
+ .Type(NTi::Optional(NTi::Struct({{"embedded", deepestEmbedded}}))))
+ .AddColumn(TColumnSchema().Name("EmbeddedYt_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("EmbeddedYt_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema()
+ .Name("EmbeddedProtobuf_YtOption")
+ .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}}))))
+ .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema()
+ .Name("Embedded_YtOption")
+ .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}}))))
+ .AddColumn(TColumnSchema().Name("Embedded_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false)))
+ .AddColumn(TColumnSchema().Name("Embedded_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false))));
+}
+
+TEST(TProtoSchemaComplexTest, Cyclic)
+{
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic>(), TApiUsageError);
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TA>(), TApiUsageError);
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TB>(), TApiUsageError);
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TC>(), TApiUsageError);
+ EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TD>(), TApiUsageError);
+
+ ASSERT_SERIALIZABLES_EQ(
+ TTableSchema().AddColumn(
+ TColumnSchema().Name("d").TypeV3(NTi::Optional(NTi::String()))),
+ CreateTableSchema<NUnitTesting::TCyclic::TE>());
+}
+
+TEST(TProtoSchemaComplexTest, FieldSortOrder)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TFieldSortOrder>();
+
+ auto byFieldNumber = NTi::Optional(NTi::Struct({
+ {"z", NTi::Optional(NTi::Bool())},
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ }));
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(byFieldNumber))
+ .AddColumn(TColumnSchema()
+ .Name("EmbeddedAsInProtoFile")
+ .Type(NTi::Optional(NTi::Struct({
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ {"z", NTi::Optional(NTi::Bool())},
+ }))))
+ .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber)));
+}
+
+TEST(TProtoSchemaComplexTest, Map)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TWithMap>();
+
+ auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) {
+ return NTi::List(NTi::Struct({
+ {"key", NTi::Optional(key)},
+ {"value", NTi::Optional(value)},
+ }));
+ };
+
+ auto embedded = NTi::Struct({
+ {"x", NTi::Optional(NTi::Int64())},
+ {"y", NTi::Optional(NTi::String())},
+ });
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("MapDefault")
+ .Type(createKeyValueStruct(NTi::Int64(), NTi::String())))
+ .AddColumn(TColumnSchema()
+ .Name("MapListOfStructsLegacy")
+ .Type(createKeyValueStruct(NTi::Int64(), NTi::String())))
+ .AddColumn(TColumnSchema()
+ .Name("MapListOfStructs")
+ .Type(createKeyValueStruct(NTi::Int64(), embedded)))
+ .AddColumn(TColumnSchema()
+ .Name("MapOptionalDict")
+ .Type(NTi::Optional(NTi::Dict(NTi::Int64(), embedded))))
+ .AddColumn(TColumnSchema()
+ .Name("MapDict")
+ .Type(NTi::Dict(NTi::Int64(), embedded))));
+}
+
+TEST(TProtoSchemaComplexTest, Oneof)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TWithOneof>();
+
+ auto embedded = NTi::Struct({
+ {"Oneof", NTi::Optional(NTi::Variant(NTi::Struct({
+ {"x", NTi::Int64()},
+ {"y", NTi::String()},
+ })))},
+ });
+
+ auto createType = [&] (TString oneof2Name) {
+ return NTi::Optional(NTi::Struct({
+ {"field", NTi::Optional(NTi::String())},
+ {oneof2Name, NTi::Optional(NTi::Variant(NTi::Struct({
+ {"x2", NTi::Int64()},
+ {"y2", NTi::String()},
+ {"z2", embedded},
+ })))},
+ {"y1", NTi::Optional(NTi::String())},
+ {"z1", NTi::Optional(embedded)},
+ {"x1", NTi::Optional(NTi::Int64())},
+ }));
+ };
+
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("DefaultSeparateFields")
+ .Type(createType("variant_field_name")))
+ .AddColumn(TColumnSchema()
+ .Name("NoDefault")
+ .Type(createType("Oneof2")))
+ .AddColumn(TColumnSchema()
+ .Name("SerializationProtobuf")
+ .Type(NTi::Optional(NTi::Struct({
+ {"y1", NTi::Optional(NTi::String())},
+ {"x1", NTi::Optional(NTi::Int64())},
+ {"z1", NTi::Optional(NTi::String())},
+ }))))
+ .AddColumn(TColumnSchema()
+ .Name("TopLevelOneof")
+ .Type(
+ NTi::Optional(
+ NTi::Variant(NTi::Struct({
+ {"MemberOfTopLevelOneof", NTi::Int64()}
+ }))
+ )
+ ))
+ );
+}
+
+TEST(TProtoSchemaComplexTest, Embedded)
+{
+ const auto schema = CreateTableSchema<NUnitTesting::TEmbeddingMessage>();
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .Strict(false)
+ .AddColumn(TColumnSchema().Name("embedded2_num").Type(NTi::Optional(NTi::Uint64())))
+ .AddColumn(TColumnSchema().Name("embedded2_struct").Type(NTi::Optional(NTi::Struct({
+ {"float1", NTi::Optional(NTi::Double())},
+ {"string1", NTi::Optional(NTi::String())},
+ }))))
+ .AddColumn(TColumnSchema().Name("embedded2_repeated").Type(NTi::List(NTi::String())))
+ .AddColumn(TColumnSchema().Name("embedded_num").Type(NTi::Optional(NTi::Uint64())))
+ .AddColumn(TColumnSchema().Name("embedded_extra_field").Type(NTi::Optional(NTi::String())))
+ .AddColumn(TColumnSchema().Name("variant").Type(NTi::Optional(NTi::Variant(NTi::Struct({
+ {"str_variant", NTi::String()},
+ {"uint_variant", NTi::Uint64()},
+ })))))
+ .AddColumn(TColumnSchema().Name("num").Type(NTi::Optional(NTi::Uint64())))
+ .AddColumn(TColumnSchema().Name("extra_field").Type(NTi::Optional(NTi::String())))
+ );
+}
+
+TEST(TProtoSchemaProto3Test, TWithOptional)
+{
+ const auto schema = CreateTableSchema<NTestingProto3::TWithOptional>();
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("x").Type(NTi::Optional(NTi::Int64()))
+ )
+ );
+}
+
+TEST(TProtoSchemaProto3Test, TWithOptionalMessage)
+{
+ const auto schema = CreateTableSchema<NTestingProto3::TWithOptionalMessage>();
+ ASSERT_SERIALIZABLES_EQ(schema, TTableSchema()
+ .AddColumn(TColumnSchema()
+ .Name("x").Type(
+ NTi::Optional(
+ NTi::Struct({{"x", NTi::Optional(NTi::Int64())}})
+ )
+ )
+ )
+ );
+}
diff --git a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.proto b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.proto
index da1e48f691..da1e48f691 100644
--- a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.proto
+++ b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.proto
diff --git a/yt/cpp/mapreduce/interface/ut/serialize_ut.cpp b/yt/cpp/mapreduce/interface/ut/serialize_ut.cpp
new file mode 100644
index 0000000000..0acec154d4
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/ut/serialize_ut.cpp
@@ -0,0 +1,46 @@
+#include <yt/cpp/mapreduce/interface/serialize.h>
+#include <yt/cpp/mapreduce/interface/common.h>
+
+#include <library/cpp/yson/node/node_builder.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+#include <util/generic/serialized_enum.h>
+
+using namespace NYT;
+
+TEST(TSerializationTest, TableSchema)
+{
+ auto schema = TTableSchema()
+ .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING))
+ .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64))
+ .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64, true));
+
+ auto schemaNode = schema.ToNode();
+ EXPECT_TRUE(schemaNode.IsList());
+ EXPECT_EQ(schemaNode.Size(), 3u);
+
+
+ EXPECT_EQ(schemaNode[0]["name"], "a");
+ EXPECT_EQ(schemaNode[0]["type"], "string");
+ EXPECT_EQ(schemaNode[0]["required"], false);
+ EXPECT_EQ(schemaNode[0]["sort_order"], "ascending");
+
+ EXPECT_EQ(schemaNode[1]["name"], "b");
+ EXPECT_EQ(schemaNode[1]["type"], "uint64");
+ EXPECT_EQ(schemaNode[1]["required"], false);
+
+ EXPECT_EQ(schemaNode[2]["name"], "c");
+ EXPECT_EQ(schemaNode[2]["type"], "int64");
+ EXPECT_EQ(schemaNode[2]["required"], true);
+}
+
+TEST(TSerializationTest, ValueTypeSerialization)
+{
+ for (const auto value : GetEnumAllValues<EValueType>()) {
+ TNode serialized = NYT::NDetail::ToString(value);
+ EValueType deserialized;
+ Deserialize(deserialized, serialized);
+ EXPECT_EQ(value, deserialized);
+ }
+}
diff --git a/yt/cpp/mapreduce/interface/ut/ya.make b/yt/cpp/mapreduce/interface/ut/ya.make
index 0219e6430c..9e92931b5d 100644
--- a/yt/cpp/mapreduce/interface/ut/ya.make
+++ b/yt/cpp/mapreduce/interface/ut/ya.make
@@ -1,4 +1,4 @@
-UNITTEST_FOR(yt/cpp/mapreduce/interface)
+GTEST()
SRCS(
common_ut.cpp
@@ -18,8 +18,9 @@ SRCS(
PEERDIR(
contrib/libs/protobuf
- library/cpp/testing/unittest
+ library/cpp/testing/gtest
yt/yt_proto/yt/formats
+ yt/cpp/mapreduce/interface
)
END()
diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make
index f9bc3e172c..a4cbd8951a 100644
--- a/yt/cpp/mapreduce/interface/ya.make
+++ b/yt/cpp/mapreduce/interface/ya.make
@@ -45,4 +45,6 @@ GENERATE_ENUM_SERIALIZATION(protobuf_format.h)
END()
-RECURSE_FOR_TESTS(ut)
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp
new file mode 100644
index 0000000000..3642aa1e45
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp
@@ -0,0 +1,94 @@
+#include <yt/cpp/mapreduce/io/node_table_reader.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+////////////////////////////////////////////////////////////////////
+
+class TStringRawTableReader
+ : public TRawTableReader
+{
+public:
+ TStringRawTableReader(const TString& string)
+ : String_(string)
+ , Stream_(String_)
+ { }
+
+ bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override
+ {
+ return false;
+ }
+
+ void ResetRetries() override
+ { }
+
+ bool HasRangeIndices() const override
+ {
+ return false;
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ return Stream_.Read(buf, len);
+ }
+
+private:
+ const TString String_;
+ TStringStream Stream_;
+};
+
+////////////////////////////////////////////////////////////////////
+
+class TEndOfStreamTest
+ : public ::testing::TestWithParam<bool>
+{ };
+
+TEST_P(TEndOfStreamTest, Eos)
+{
+ bool addEos = GetParam();
+ auto proxy = ::MakeIntrusive<TStringRawTableReader>(TString::Join(
+ "{a=13;b = \"string\"}; {c = {d=12}};",
+ "<key_switch=%true>#; {e = 42};",
+ addEos ? "<end_of_stream=%true>#" : ""
+ ));
+
+ TNodeTableReader reader(proxy);
+ TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))};
+ for (const auto& expectedRow : expectedRows) {
+ EXPECT_TRUE(reader.IsValid());
+ EXPECT_TRUE(!reader.IsEndOfStream());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+ EXPECT_EQ(reader.GetRow(), expectedRow);
+ reader.Next();
+ }
+
+ EXPECT_TRUE(!reader.IsValid());
+ EXPECT_TRUE(!reader.IsEndOfStream());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+
+ reader.NextKey();
+ reader.Next();
+ expectedRows = {TNode()("e", 42)};
+ for (const auto& expectedRow : expectedRows) {
+ EXPECT_TRUE(reader.IsValid());
+ EXPECT_TRUE(!reader.IsEndOfStream());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+ EXPECT_EQ(reader.GetRow(), expectedRow);
+ reader.Next();
+ }
+
+ EXPECT_TRUE(!reader.IsValid());
+ if (addEos) {
+ EXPECT_TRUE(reader.IsEndOfStream());
+ } else {
+ EXPECT_TRUE(!reader.IsEndOfStream());
+ }
+ EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+INSTANTIATE_TEST_SUITE_P(WithEos, TEndOfStreamTest, ::testing::Values(true));
+INSTANTIATE_TEST_SUITE_P(WithoutEos, TEndOfStreamTest, ::testing::Values(false));
+
+////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/ut/readers_ut.cpp b/yt/cpp/mapreduce/io/ut/readers_ut.cpp
new file mode 100644
index 0000000000..86d06629a2
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/readers_ut.cpp
@@ -0,0 +1,232 @@
+#include <yt/cpp/mapreduce/io/node_table_reader.h>
+#include <yt/cpp/mapreduce/io/proto_table_reader.h>
+#include <yt/cpp/mapreduce/io/skiff_table_reader.h>
+#include <yt/cpp/mapreduce/io/stream_table_reader.h>
+#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
+
+#include <yt/cpp/mapreduce/io/ut/ut_row.pb.h>
+
+#include <yt/cpp/mapreduce/skiff/checked_parser.h>
+#include <yt/cpp/mapreduce/skiff/skiff_schema.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+using namespace NYT::NDetail;
+using namespace NSkiff;
+
+////////////////////////////////////////////////////////////////////
+
+class TRetryEmulatingRawTableReader
+ : public TRawTableReader
+{
+public:
+ TRetryEmulatingRawTableReader(const TString& string)
+ : String_(string)
+ , Stream_(String_)
+ { }
+
+ bool Retry(
+ const TMaybe<ui32>& /*rangeIndex*/,
+ const TMaybe<ui64>& /*rowIndex*/,
+ const std::exception_ptr& /*error*/) override
+ {
+ if (RetriesLeft_ == 0) {
+ return false;
+ }
+ Stream_ = TStringStream(String_);
+ --RetriesLeft_;
+ return true;
+ }
+
+ void ResetRetries() override
+ {
+ RetriesLeft_ = 10;
+ }
+
+ bool HasRangeIndices() const override
+ {
+ return false;
+ }
+
+private:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ switch (DoReadCallCount_++) {
+ case 0:
+ return Stream_.Read(buf, std::min(len, String_.size() / 2));
+ case 1:
+ ythrow yexception() << "Just wanted to test you, first fail";
+ case 2:
+ ythrow yexception() << "Just wanted to test you, second fail";
+ default:
+ return Stream_.Read(buf, len);
+ }
+ }
+
+private:
+ const TString String_;
+ TStringStream Stream_;
+ int RetriesLeft_ = 10;
+ int DoReadCallCount_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////
+
+TEST(TReadersTest, YsonGood)
+{
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;b = \"string\"}; {c = {d=12}}");
+
+ TNodeTableReader reader(proxy);
+ TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))};
+ for (const auto& expectedRow : expectedRows) {
+ EXPECT_TRUE(reader.IsValid());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+ EXPECT_EQ(reader.GetRow(), expectedRow);
+ reader.Next();
+ }
+ EXPECT_TRUE(!reader.IsValid());
+ EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+TEST(TReadersTest, YsonBad)
+{
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;-b := \"string\"}; {c = {d=12}}");
+ EXPECT_THROW(TNodeTableReader(proxy).GetRow(), yexception);
+}
+
+TEST(TReadersTest, SkiffGood)
+{
+ const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\x06\x00\x00\x00""foobar" "\x01"
+ "\x00\x00" "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "\x03\x00\x00\x00""abc" "\x00";
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+ TSkiffSchemaPtr schema = CreateVariant16Schema({
+ CreateTupleSchema({
+ CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"),
+ CreateSimpleTypeSchema(EWireType::String32)->SetName("b"),
+ CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c")
+ })
+ });
+
+ TSkiffTableReader reader(proxy, schema);
+ TVector<TNode> expectedRows = {
+ TNode()("a", 100500)("b", "foobar")("c", true),
+ TNode()("a", -1)("b", "abc")("c", false),
+ };
+ for (const auto& expectedRow : expectedRows) {
+ EXPECT_TRUE(reader.IsValid());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+ EXPECT_EQ(reader.GetRow(), expectedRow);
+ reader.Next();
+ }
+ EXPECT_TRUE(!reader.IsValid());
+ EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+TEST(TReadersTest, SkiffExtraColumns)
+{
+ const char arr[] = "\x00\x00" "\x7B\x00\x00\x00\x00\x00\x00\x00";
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+ TSkiffSchemaPtr schema = CreateVariant16Schema({
+ CreateTupleSchema({
+ CreateSimpleTypeSchema(EWireType::Uint64)->SetName("$timestamp")
+ })
+ });
+
+ TSkiffTableReader reader(proxy, schema);
+ TVector<TNode> expectedRows = {
+ TNode()("$timestamp", 123u),
+ };
+ for (const auto& expectedRow : expectedRows) {
+ EXPECT_TRUE(reader.IsValid());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+ EXPECT_EQ(reader.GetRow(), expectedRow);
+ reader.Next();
+ }
+ EXPECT_TRUE(!reader.IsValid());
+ EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+TEST(TReadersTest, SkiffBad)
+{
+ const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\xFF\x00\x00\x00""foobar" "\x01";
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+ TSkiffSchemaPtr schema = CreateVariant16Schema({
+ CreateTupleSchema({
+ CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"),
+ CreateSimpleTypeSchema(EWireType::String32)->SetName("b"),
+ CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c")
+ })
+ });
+
+ EXPECT_THROW(TSkiffTableReader(proxy, schema).GetRow(), yexception);
+}
+
+TEST(TReadersTest, SkiffBadFormat)
+{
+ const char arr[] = "\x00\x00" "\x12" "\x23\x34\x00\x00";
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+ TSkiffSchemaPtr schema = CreateVariant16Schema({
+ CreateTupleSchema({
+ CreateVariant8Schema({
+ CreateSimpleTypeSchema(EWireType::Nothing),
+ CreateSimpleTypeSchema(EWireType::Int32)
+ })
+ })
+ });
+
+ EXPECT_THROW_MESSAGE_HAS_SUBSTR(
+ TSkiffTableReader(proxy, schema).GetRow(),
+ yexception,
+ "Tag for 'variant8<nothing,int32>' expected to be 0 or 1");
+}
+
+TEST(TReadersTest, ProtobufGood)
+{
+ using NYT::NTesting::TRow;
+
+ const char arr[] = "\x13\x00\x00\x00" "\x0A""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00"
+ "\x10\x00\x00\x00" "\x0A""\x03""abc" "\x10""\x1F" "\x19""\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+ TLenvalProtoTableReader reader(proxy, {TRow::descriptor()});
+ TRow row1, row2;
+ row1.SetString("foobar");
+ row1.SetInt32(15);
+ row1.SetFixed64(100500);
+
+ row2.SetString("abc");
+ row2.SetInt32(31);
+ row2.SetFixed64(-1);
+
+ TVector<TRow> expectedRows = {row1, row2};
+ for (const auto& expectedRow : expectedRows) {
+ TRow row;
+ EXPECT_TRUE(reader.IsValid());
+ EXPECT_TRUE(!reader.IsRawReaderExhausted());
+ reader.ReadRow(&row);
+ EXPECT_EQ(row.GetString(), expectedRow.GetString());
+ EXPECT_EQ(row.GetInt32(), expectedRow.GetInt32());
+ EXPECT_EQ(row.GetFixed64(), expectedRow.GetFixed64());
+ reader.Next();
+ }
+ EXPECT_TRUE(!reader.IsValid());
+ EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+TEST(TReadersTest, ProtobufBad)
+{
+ const char arr[] = "\x13\x00\x00\x00" "\x0F""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00";
+ auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+ NYT::NTesting::TRow row;
+ EXPECT_THROW(TLenvalProtoTableReader(proxy, { NYT::NTesting::TRow::descriptor() }).ReadRow(&row), yexception);
+}
+
+////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/ut/ut_row.proto b/yt/cpp/mapreduce/io/ut/ut_row.proto
new file mode 100644
index 0000000000..6a9649e4c6
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/ut_row.proto
@@ -0,0 +1,7 @@
+package NYT.NTesting;
+
+message TRow {
+ optional string String = 1;
+ optional int32 Int32 = 2;
+ optional fixed64 Fixed64 = 3;
+}
diff --git a/yt/cpp/mapreduce/io/ut/ya.make b/yt/cpp/mapreduce/io/ut/ya.make
new file mode 100644
index 0000000000..e7f9d018c3
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/ya.make
@@ -0,0 +1,15 @@
+GTEST()
+
+SRCS(
+ end_of_stream_ut.cpp
+ readers_ut.cpp
+ yamr_table_reader_ut.cpp
+
+ ut_row.proto
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/io
+)
+
+END()
diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
new file mode 100644
index 0000000000..d55a28b3e4
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
@@ -0,0 +1,185 @@
+#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+template <>
+void Out<std::tuple<TString, TString, TString>>(IOutputStream& out, const std::tuple<TString, TString, TString>& value) {
+ out << "{" << std::get<0>(value) << ", " << std::get<1>(value) << ", " << std::get<2>(value) << "}";
+}
+
+
+////////////////////////////////////////////////////////////////////
+
+class TRowCollection
+{
+public:
+ void AddRow(TStringBuf key, TStringBuf subkey, TStringBuf value)
+ {
+ TStringStream row;
+ auto appendLenval = [&] (TStringBuf value) {
+ ui32 size = value.size();
+ row.Write(&size, sizeof(size));
+ row.Write(value);
+ };
+ appendLenval(key);
+ appendLenval(subkey);
+ appendLenval(value);
+ RowList_.push_back(row.Str());
+ }
+
+ TString GetStreamDataStartFromRow(ui64 rowIndex) const
+ {
+ Y_ABORT_UNLESS(rowIndex < RowList_.size());
+ TStringStream ss;
+ ss.Write("\xFC\xFF\xFF\xFF");
+ ss.Write(&rowIndex, sizeof(rowIndex));
+ for (size_t i = rowIndex; i != RowList_.size(); ++i) {
+ ss.Write(RowList_[i]);
+ }
+ return ss.Str();
+ }
+
+ size_t ComputeTotalStreamSize() const {
+ return GetStreamDataStartFromRow(0).size();
+ }
+
+private:
+ TVector<TString> RowList_;
+};
+
+class TTestRawTableReader
+ : public TRawTableReader
+{
+public:
+ TTestRawTableReader(const TRowCollection& rowCollection)
+ : RowCollection_(rowCollection)
+ , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0))
+ , Input_(MakeHolder<TStringStream>(DataToRead_))
+ { }
+
+ TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint)
+ : RowCollection_(rowCollection)
+ , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint))
+ , Input_(MakeHolder<TStringStream>(DataToRead_))
+ , Broken_(true)
+ { }
+
+ size_t DoRead(void* buf, size_t size) override
+ {
+ Y_ABORT_UNLESS(Input_);
+ size_t res = Input_->Read(buf, size);
+ if (!res && Broken_) {
+ ythrow yexception() << "Stream is broken";
+ }
+ return res;
+ }
+
+ bool Retry(
+ const TMaybe<ui32>& /*rangeIndex*/,
+ const TMaybe<ui64>& rowIndex,
+ const std::exception_ptr& /*error*/) override
+ {
+ if (--Retries < 0) {
+ return false;
+ }
+ ui64 actualRowIndex = rowIndex ? *rowIndex : 0;
+ DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex);
+ Input_ = MakeHolder<TStringInput>(DataToRead_);
+ Broken_ = false;
+ return true;
+ }
+
+ void ResetRetries() override
+ { }
+
+ bool HasRangeIndices() const override
+ {
+ return false;
+ }
+
+private:
+ TRowCollection RowCollection_;
+ TString DataToRead_;
+ THolder<IInputStream> Input_;
+ bool Broken_ = false;
+ i32 Retries = 1;
+};
+
+TEST(TYamrTableReaderTest, TestReadRetry)
+{
+ const TVector<std::tuple<TString, TString, TString>> expectedResult = {
+ {"foo1", "bar1", "baz1"},
+ {"foo2", "bar2", "baz2"},
+ {"foo3", "bar3", "baz3"},
+ };
+
+ TRowCollection rowCollection;
+ for (const auto& row : expectedResult) {
+ rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row));
+ }
+
+ ssize_t streamSize = rowCollection.ComputeTotalStreamSize();
+
+ for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) {
+ ::TIntrusivePtr<TRawTableReader> rawReader;
+ if (breakPoint == -1) {
+ rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection);
+ } else {
+ rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint));
+ }
+
+ TYaMRTableReader tableReader(rawReader);
+ TVector<std::tuple<TString, TString, TString>> actualResult;
+ for (; tableReader.IsValid(); tableReader.Next()) {
+ EXPECT_TRUE(!tableReader.IsRawReaderExhausted());
+ auto row = tableReader.GetRow();
+ actualResult.emplace_back(row.Key, row.SubKey, row.Value);
+ }
+ EXPECT_TRUE(tableReader.IsRawReaderExhausted());
+ EXPECT_EQ(actualResult, expectedResult);
+ }
+}
+
+TEST(TYamrTableReaderTest, TestSkipRetry)
+{
+ const TVector<std::tuple<TString, TString, TString>> expectedResult = {
+ {"foo1", "bar1", "baz1"},
+ {"foo2", "bar2", "baz2"},
+ {"foo3", "bar3", "baz3"},
+ };
+
+ TRowCollection rowCollection;
+ for (const auto& row : expectedResult) {
+ rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row));
+ }
+
+ ssize_t streamSize = rowCollection.ComputeTotalStreamSize();
+
+ for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) {
+ try {
+ ::TIntrusivePtr<TRawTableReader> rawReader;
+ if (breakPoint == -1) {
+ rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection);
+ } else {
+ rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint));
+ }
+
+ TYaMRTableReader tableReader(rawReader);
+ ui32 rowCount = 0;
+ for (; tableReader.IsValid(); tableReader.Next()) {
+ EXPECT_TRUE(!tableReader.IsRawReaderExhausted());
+ ++rowCount;
+ }
+ EXPECT_TRUE(tableReader.IsRawReaderExhausted());
+ EXPECT_EQ(rowCount, 3u);
+ } catch (const std::exception& ex) {
+ Cerr << breakPoint << Endl;
+ Cerr << ex.what() << Endl;
+ throw;
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/ya.make b/yt/cpp/mapreduce/io/ya.make
index 0531a669b5..18a0cac464 100644
--- a/yt/cpp/mapreduce/io/ya.make
+++ b/yt/cpp/mapreduce/io/ya.make
@@ -22,13 +22,15 @@ SRCS(
PEERDIR(
contrib/libs/protobuf
library/cpp/yson
+ library/cpp/yson/node
yt/cpp/mapreduce/common
yt/cpp/mapreduce/interface
yt/cpp/mapreduce/interface/logging
- yt/yt_proto/yt/formats
- library/cpp/yson/node
yt/cpp/mapreduce/skiff
+ yt/yt_proto/yt/formats
yt/yt/core
)
END()
+
+RECURSE_FOR_TESTS(ut)
diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
new file mode 100644
index 0000000000..cee62cf1f7
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp
@@ -0,0 +1,124 @@
+#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
+
+#include <yt/cpp/mapreduce/http/context.h>
+#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/errors.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+using namespace NYT::NDetail;
+using namespace NYT::NDetail::NRawClient;
+
+
+class TTestRetryPolicy
+ : public IRequestRetryPolicy
+{
+private:
+ static constexpr int RetriableCode = 904;
+
+public:
+ void NotifyNewAttempt() override
+ { }
+
+ TMaybe<TDuration> OnGenericError(const std::exception& /*e*/) override
+ {
+ return TDuration::Seconds(42);
+ }
+
+ void OnIgnoredError(const TErrorResponse& /*e*/) override
+ { }
+
+ TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override
+ {
+ if (e.GetError().GetCode() == RetriableCode) {
+ return TDuration::Seconds(e.GetError().GetAttributes().at("retry_interval").AsUint64());
+ } else {
+ return Nothing();
+ }
+ }
+
+ TString GetAttemptDescription() const override
+ {
+ return "attempt";
+ }
+
+ static TNode GenerateRetriableError(TDuration retryDuration)
+ {
+ Y_ABORT_UNLESS(retryDuration - TDuration::Seconds(retryDuration.Seconds()) == TDuration::Zero());
+
+ return TNode()
+ ("code", RetriableCode)
+ ("attributes",
+ TNode()
+ ("retry_interval", retryDuration.Seconds()));
+ }
+};
+
+
+TString GetPathFromRequest(const TNode& params)
+{
+ return params.AsMap().at("parameters").AsMap().at("path").AsString();
+}
+
+TVector<TString> GetAllPathsFromRequestList(const TNode& requestList)
+{
+ TVector<TString> result;
+ for (const auto& request : requestList.AsList()) {
+ result.push_back(GetPathFromRequest(request)); }
+ return result;
+}
+
+
+TEST(TBatchRequestImplTest, ParseResponse) {
+ TClientContext context;
+ TRawBatchRequest batchRequest(context.Config);
+
+ EXPECT_EQ(batchRequest.BatchSize(), 0u);
+
+ auto get1 = batchRequest.Get(
+ TTransactionId(),
+ "//getOk",
+ TGetOptions());
+
+ auto get2 = batchRequest.Get(
+ TTransactionId(),
+ "//getError-3",
+ TGetOptions());
+
+ auto get3 = batchRequest.Get(
+ TTransactionId(),
+ "//getError-5",
+ TGetOptions());
+
+ EXPECT_EQ(batchRequest.BatchSize(), 3u);
+
+ auto testRetryPolicy = MakeIntrusive<TTestRetryPolicy>();
+ const TInstant now = TInstant::Seconds(100500);
+
+ TRawBatchRequest retryBatch(context.Config);
+ batchRequest.ParseResponse(
+ TNode()
+ .Add(TNode()("output", 5))
+ .Add(TNode()("error",
+ TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(3))))
+ .Add(TNode()("error",
+ TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(5)))),
+ "<no-request-id>",
+ testRetryPolicy,
+ &retryBatch,
+ now);
+
+ EXPECT_EQ(batchRequest.BatchSize(), 0u);
+ EXPECT_EQ(retryBatch.BatchSize(), 2u);
+
+ TNode retryParameterList;
+ TInstant nextTry;
+ retryBatch.FillParameterList(3, &retryParameterList, &nextTry);
+ EXPECT_EQ(
+ GetAllPathsFromRequestList(retryParameterList),
+ TVector<TString>({"//getError-3", "//getError-5"}));
+
+ EXPECT_EQ(nextTry, now + TDuration::Seconds(5));
+}
diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp
new file mode 100644
index 0000000000..d2c7022f02
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp
@@ -0,0 +1,191 @@
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+using namespace NYT::NDetail;
+using namespace NYT::NDetail::NRawClient;
+
+TEST(TOperationsApiParsingTest, ParseOperationAttributes)
+{
+ auto response = TStringBuf(R"""({
+ "id" = "1-2-3-4";
+ "authenticated_user" = "some-user";
+ "start_time" = "2018-01-01T00:00:00.0Z";
+ "weight" = 1.;
+ "state" = "completed";
+ "suspended" = %false;
+ "finish_time" = "2018-01-02T00:00:00.0Z";
+ "brief_progress" = {
+ "jobs" = {
+ "lost" = 0;
+ "pending" = 0;
+ "failed" = 1;
+ "aborted" = 0;
+ "total" = 84;
+ "running" = 0;
+ "completed" = 84;
+ };
+ };
+ "result" = {
+ "error" = {
+ "attributes" = {};
+ "code" = 0;
+ "message" = "";
+ };
+ };
+ "brief_spec" = {
+ "input_table_paths" = <
+ "count" = 1;
+ > [
+ "//some-input";
+ ];
+ "pool" = "some-pool";
+ "scheduling_info_per_pool_tree" = {
+ "physical" = {
+ "pool" = "some-pool";
+ };
+ };
+ "title" = "some-title";
+ "output_table_paths" = <
+ "count" = 1;
+ > [
+ "//some-output";
+ ];
+ "mapper" = {
+ "command" = "some-command";
+ };
+ };
+ "type" = "map";
+ "pool" = "some-pool";
+ "progress" = {
+ "build_time" = "2018-01-01T00:00:00.000000Z";
+ "job_statistics" = {
+ "data" = {
+ "input" = {
+ "row_count" = {
+ "$" = {
+ "failed" = {
+ "map" = {
+ "max" = 1;
+ "count" = 1;
+ "sum" = 1;
+ "min" = 1;
+ };
+ };
+ "completed" = {
+ "map" = {
+ "max" = 1;
+ "count" = 84;
+ "sum" = 84;
+ "min" = 1;
+ };
+ };
+ };
+ };
+ };
+ };
+ };
+ "total_job_counter" = {
+ "completed" = {
+ "total" = 3;
+ "non-interrupted" = 1;
+ "interrupted" = {
+ "whatever_interrupted" = 2;
+ };
+ };
+ "aborted" = {
+ "non_scheduled" = {
+ "whatever_non_scheduled" = 3;
+ };
+ "scheduled" = {
+ "whatever_scheduled" = 4;
+ };
+ "total" = 7;
+ };
+ "lost" = 5;
+ "invalidated" = 6;
+ "failed" = 7;
+ "running" = 8;
+ "suspended" = 9;
+ "pending" = 10;
+ "blocked" = 11;
+ "total" = 66;
+ };
+ };
+ "events" = [
+ {"state" = "pending"; "time" = "2018-01-01T00:00:00.000000Z";};
+ {"state" = "materializing"; "time" = "2018-01-02T00:00:00.000000Z";};
+ {"state" = "running"; "time" = "2018-01-03T00:00:00.000000Z";};
+ ];
+ })""");
+ auto attrs = ParseOperationAttributes(NodeFromYsonString(response));
+
+ EXPECT_TRUE(attrs.Id);
+ EXPECT_EQ(GetGuidAsString(*attrs.Id), "1-2-3-4");
+
+ EXPECT_TRUE(attrs.Type);
+ EXPECT_EQ(*attrs.Type, EOperationType::Map);
+
+ EXPECT_TRUE(attrs.State);
+ EXPECT_EQ(*attrs.State, "completed");
+
+ EXPECT_TRUE(attrs.BriefState);
+ EXPECT_EQ(*attrs.BriefState, EOperationBriefState::Completed);
+
+ EXPECT_TRUE(attrs.AuthenticatedUser);
+ EXPECT_EQ(*attrs.AuthenticatedUser, "some-user");
+
+ EXPECT_TRUE(attrs.StartTime);
+ EXPECT_TRUE(attrs.FinishTime);
+ EXPECT_EQ(*attrs.FinishTime - *attrs.StartTime, TDuration::Days(1));
+
+ EXPECT_TRUE(attrs.BriefProgress);
+ EXPECT_EQ(attrs.BriefProgress->Completed, 84u);
+ EXPECT_EQ(attrs.BriefProgress->Failed, 1u);
+
+ EXPECT_TRUE(attrs.BriefSpec);
+ EXPECT_EQ((*attrs.BriefSpec)["title"].AsString(), "some-title");
+
+ EXPECT_TRUE(attrs.Suspended);
+ EXPECT_EQ(*attrs.Suspended, false);
+
+ EXPECT_TRUE(attrs.Result);
+ EXPECT_TRUE(!attrs.Result->Error);
+
+ EXPECT_TRUE(attrs.Progress);
+ EXPECT_EQ(attrs.Progress->JobStatistics.JobState({}).GetStatistics("data/input/row_count").Sum(), 85u);
+ EXPECT_EQ(attrs.Progress->JobCounters.GetCompletedInterrupted().GetTotal(), 2u);
+ EXPECT_EQ(attrs.Progress->JobCounters.GetAbortedNonScheduled().GetTotal(), 3u);
+ EXPECT_EQ(attrs.Progress->JobCounters.GetAbortedScheduled().GetTotal(), 4u);
+ EXPECT_EQ(attrs.Progress->JobCounters.GetAborted().GetTotal(), 7u);
+ EXPECT_EQ(attrs.Progress->JobCounters.GetFailed().GetTotal(), 7u);
+ EXPECT_EQ(attrs.Progress->JobCounters.GetTotal(), 66u);
+ EXPECT_EQ(*attrs.Progress->BuildTime, TInstant::ParseIso8601("2018-01-01T00:00:00.000000Z"));
+
+ EXPECT_TRUE(attrs.Events);
+ EXPECT_EQ((*attrs.Events)[1].State, "materializing");
+ EXPECT_EQ((*attrs.Events)[1].Time, TInstant::ParseIso8601("2018-01-02T00:00:00.000000Z"));
+}
+
+TEST(TOperationsApiParsingTest, EmptyProgress)
+{
+ auto response = TStringBuf(R"""({
+ "id" = "1-2-3-4";
+ "brief_progress" = {};
+ "progress" = {};
+ })""");
+ auto attrs = ParseOperationAttributes(NodeFromYsonString(response));
+
+ EXPECT_TRUE(attrs.Id);
+ EXPECT_EQ(GetGuidAsString(*attrs.Id), "1-2-3-4");
+
+ EXPECT_TRUE(!attrs.BriefProgress);
+
+ EXPECT_TRUE(attrs.Progress);
+ EXPECT_EQ(attrs.Progress->JobStatistics.JobState({}).GetStatisticsNames(), TVector<TString>{});
+ EXPECT_EQ(attrs.Progress->JobCounters.GetTotal(), 0u);
+ EXPECT_TRUE(!attrs.Progress->BuildTime);
+}
diff --git a/yt/cpp/mapreduce/raw_client/ut/ya.make b/yt/cpp/mapreduce/raw_client/ut/ya.make
new file mode 100644
index 0000000000..248471152c
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/ut/ya.make
@@ -0,0 +1,12 @@
+GTEST()
+
+SRCS(
+ raw_batch_request_ut.cpp
+ raw_requests_ut.cpp
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/raw_client
+)
+
+END()
diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make
index 0d03aae80c..c201b86f05 100644
--- a/yt/cpp/mapreduce/raw_client/ya.make
+++ b/yt/cpp/mapreduce/raw_client/ya.make
@@ -17,3 +17,7 @@ PEERDIR(
)
END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/yt/cpp/mapreduce/skiff/checked_parser.h b/yt/cpp/mapreduce/skiff/checked_parser.h
new file mode 100644
index 0000000000..8fd9f90b0b
--- /dev/null
+++ b/yt/cpp/mapreduce/skiff/checked_parser.h
@@ -0,0 +1 @@
+#include <library/cpp/skiff/skiff.h>