diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-07-02 23:10:50 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-07-02 23:21:03 +0300 |
commit | 5ea9afc5ee7edc24efa5f45b3a15e184872b0854 (patch) | |
tree | 4ccc339d97575cba8b3ed47b6f0615326bdb5324 /yt/cpp | |
parent | 96b239778766d32d5158aca805e08199b3c0a743 (diff) | |
download | ydb-5ea9afc5ee7edc24efa5f45b3a15e184872b0854.tar.gz |
[yt/cpp/mapreduce] YT-21595: Use gtest instead of ytest in all mapreduce tests
85671f0cf4f45b4f015fa2cc0d195b81c16c6e8a
Diffstat (limited to 'yt/cpp')
48 files changed, 3416 insertions, 2100 deletions
diff --git a/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp new file mode 100644 index 0000000000..90196246c5 --- /dev/null +++ b/yt/cpp/mapreduce/http/ut/connection_pool_ut.cpp @@ -0,0 +1,199 @@ +#include "simple_server.h" + +#include <yt/cpp/mapreduce/http/http.h> + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <library/cpp/threading/future/async.h> + +#include <library/cpp/http/io/stream.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/testing/common/network.h> + +#include <util/string/builder.h> +#include <util/stream/tee.h> +#include <util/system/thread.h> + +using namespace NYT; + +namespace { + void ParseFirstLine(const TString firstLine, TString& method, TString& host , ui64& port, TString& command) + { + size_t idx = firstLine.find_first_of(' '); + method = firstLine.substr(0, idx); + size_t idx2 = firstLine.find_first_of(':', idx + 1); + host = firstLine.substr(idx + 1, idx2 - idx - 1); + idx = firstLine.find_first_of('/', idx2 + 1); + port = std::atoi(firstLine.substr(idx2 + 1, idx - idx2 - 1).c_str()); + idx2 = firstLine.find_first_of(' ', idx + 1); + command = firstLine.substr(idx, idx2 - idx); + } +} // namespace + +THolder<TSimpleServer> CreateSimpleHttpServer() +{ + auto port = NTesting::GetFreePort(); + return MakeHolder<TSimpleServer>( + port, + [] (IInputStream* input, IOutputStream* output) { + try { + while (true) { + THttpInput httpInput(input); + httpInput.ReadAll(); + + THttpOutput httpOutput(output); + httpOutput.EnableKeepAlive(true); + httpOutput << "HTTP/1.1 200 OK\r\n"; + httpOutput << "\r\n"; + for (size_t i = 0; i != 10000; ++i) { + httpOutput << "The grass was greener"; + } + httpOutput.Flush(); + } + } catch (const std::exception&) { + } + }); +} + +THolder<TSimpleServer> CreateProxyHttpServer() +{ + auto port = NTesting::GetFreePort(); + return MakeHolder<TSimpleServer>( + port, + [] (IInputStream* input, IOutputStream* output) { + try { + while (true) { + THttpInput httpInput(input); + const TString inputStr = httpInput.FirstLine(); + auto headers = httpInput.Headers(); + TString method, command, host; + ui64 port; + ParseFirstLine(inputStr, method, host, port, command); + + THttpRequest request; + const TString hostName = ::TStringBuilder() << host << ":" << port; + request.Connect(hostName); + auto header = THttpHeader(method, command); + request.StartRequest(header); + request.FinishRequest(); + auto res = request.GetResponseStream(); + THttpOutput httpOutput(output); + httpOutput.EnableKeepAlive(true); + auto strRes = res->ReadAll(); + httpOutput << "HTTP/1.1 200 OK\r\n"; + httpOutput << "\r\n"; + httpOutput << strRes; + httpOutput.Flush(); + } + } catch (const std::exception&) { + } + }); +} + + +class TConnectionPoolConfigGuard +{ +public: + TConnectionPoolConfigGuard(int newSize) + { + OldValue_ = TConfig::Get()->ConnectionPoolSize; + TConfig::Get()->ConnectionPoolSize = newSize; + } + + ~TConnectionPoolConfigGuard() + { + TConfig::Get()->ConnectionPoolSize = OldValue_; + } + +private: + int OldValue_; +}; + +class TFuncThread + : public ISimpleThread +{ +public: + using TFunc = std::function<void()>; + +public: + TFuncThread(const TFunc& func) + : Func_(func) + { } + + void* ThreadProc() noexcept override { + Func_(); + return nullptr; + } + +private: + TFunc Func_; +}; + +TEST(TConnectionPool, TestReleaseUnread) +{ + auto simpleServer = CreateSimpleHttpServer(); + + const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort(); + + for (size_t i = 0; i != 10; ++i) { + THttpRequest request; + request.Connect(hostName); + request.StartRequest(THttpHeader("GET", "foo")); + request.FinishRequest(); + request.GetResponseStream(); + } +} + +TEST(TConnectionPool, TestProxy) +{ + auto simpleServer = CreateSimpleHttpServer(); + auto simpleServer2 = CreateProxyHttpServer(); + + const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort(); + const TString hostName2 = ::TStringBuilder() << "localhost:" << simpleServer2->GetPort(); + + for (size_t i = 0; i != 10; ++i) { + THttpRequest request; + request.Connect(hostName2); + auto header = THttpHeader("GET", "foo"); + header.SetProxyAddress(hostName2); + header.SetHostPort(hostName); + request.StartRequest(header); + request.FinishRequest(); + request.GetResponseStream(); + } +} + +TEST(TConnectionPool, TestConcurrency) +{ + TConnectionPoolConfigGuard g(1); + + auto simpleServer = CreateSimpleHttpServer(); + const TString hostName = ::TStringBuilder() << "localhost:" << simpleServer->GetPort(); + auto threadPool = CreateThreadPool(20); + + const auto func = [&] { + for (int i = 0; i != 100; ++i) { + THttpRequest request; + request.Connect(hostName); + request.StartRequest(THttpHeader("GET", "foo")); + request.FinishRequest(); + auto res = request.GetResponseStream(); + res->ReadAll(); + } + }; + + TVector<THolder<TFuncThread>> threads; + for (int i = 0; i != 10; ++i) { + threads.push_back(MakeHolder<TFuncThread>(func)); + }; + + for (auto& t : threads) { + t->Start(); + } + for (auto& t : threads) { + t->Join(); + } +} diff --git a/yt/cpp/mapreduce/http/ut/http_ut.cpp b/yt/cpp/mapreduce/http/ut/http_ut.cpp new file mode 100644 index 0000000000..0c0ab3363e --- /dev/null +++ b/yt/cpp/mapreduce/http/ut/http_ut.cpp @@ -0,0 +1,94 @@ +#include "simple_server.h" + +#include <yt/cpp/mapreduce/http/http.h> + +#include <yt/cpp/mapreduce/interface/config.h> + +#include <library/cpp/testing/common/network.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <util/system/byteorder.h> + +using namespace NYT; + +void WriteDataFrame(TStringBuf string, IOutputStream* stream) +{ + stream->Write("\x01"); + ui32 size = string.Size(); + auto littleEndianSize = HostToLittle(size); + stream->Write(&littleEndianSize, sizeof(littleEndianSize)); + stream->Write(string); +} + +THolder<TSimpleServer> CreateFramingEchoServer() +{ + auto port = NTesting::GetFreePort(); + return MakeHolder<TSimpleServer>( + port, + [] (IInputStream* input, IOutputStream* output) { + try { + THttpInput httpInput(input); + if (!httpInput.Headers().FindHeader("X-YT-Accept-Framing")) { + FAIL() << "X-YT-Accept-Framing header not found"; + } + auto input = httpInput.ReadAll(); + + THttpOutput httpOutput(output); + httpOutput << "HTTP/1.1 200 OK\r\n"; + httpOutput << "X-YT-Framing: 1\r\n"; + httpOutput << "\r\n"; + httpOutput << "\x02\x02"; // Two KeepAlive frames. + WriteDataFrame("", &httpOutput); + WriteDataFrame(TStringBuf(input).SubString(0, 10), &httpOutput); + httpOutput << "\x02"; // KeepAlive. + WriteDataFrame("", &httpOutput); + WriteDataFrame(TStringBuf(input).SubString(10, std::string::npos), &httpOutput); + httpOutput << "\x02"; // KeepAlive. + + httpOutput.Flush(); + } catch (const std::exception& exc) { + } + }); +} + +TEST(THttpHeaderTest, AddParameter) +{ + THttpHeader header("POST", "/foo"); + header.AddMutationId(); + + auto id1 = header.GetParameters()["mutation_id"].AsString(); + + header.AddMutationId(); + + auto id2 = header.GetParameters()["mutation_id"].AsString(); + + EXPECT_TRUE(id1 != id2); +} + +TEST(TFramingTest, FramingSimple) +{ + auto server = CreateFramingEchoServer(); + + THttpRequest request; + request.Connect(server->GetAddress()); + auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + *requestStream << "Some funny data"; + request.FinishRequest(); + auto response = request.GetResponseStream()->ReadAll(); + EXPECT_EQ(response, "Some funny data"); +} + +TEST(TFramingTest, FramingLarge) +{ + auto server = CreateFramingEchoServer(); + + THttpRequest request; + request.Connect(server->GetAddress()); + auto requestStream = request.StartRequest(THttpHeader("POST", "concatenate")); + auto data = TString(100000, 'x'); + *requestStream << data; + request.FinishRequest(); + auto response = request.GetResponseStream()->ReadAll(); + EXPECT_EQ(response, data); +} diff --git a/yt/cpp/mapreduce/http/ut/simple_server.cpp b/yt/cpp/mapreduce/http/ut/simple_server.cpp new file mode 100644 index 0000000000..fbc369ec20 --- /dev/null +++ b/yt/cpp/mapreduce/http/ut/simple_server.cpp @@ -0,0 +1,90 @@ +#include "simple_server.h" + +#include <util/network/pair.h> +#include <util/network/poller.h> +#include <util/network/sock.h> +#include <util/string/builder.h> +#include <util/system/thread.h> +#include <util/thread/pool.h> + +TSimpleServer::TSimpleServer(int port, TRequestHandler requestHandler) + : Port_(port) +{ + auto listenSocket = MakeAtomicShared<TInetStreamSocket>(); + TSockAddrInet addr((TIpHost)INADDR_ANY, Port_); + SetSockOpt(*listenSocket, SOL_SOCKET, SO_REUSEADDR, 1); + int ret = listenSocket->Bind(&addr); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not bind"); + + SOCKET socketPair[2]; + ret = SocketPair(socketPair); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not create socket pair"); + + ret = listenSocket->Listen(10); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not listen socket"); + + SendFinishSocket_ = MakeHolder<TInetStreamSocket>(socketPair[1]); + + ThreadPool_ = MakeHolder<TAdaptiveThreadPool>(); + ThreadPool_->Start(1); + + auto receiveFinish = MakeAtomicShared<TInetStreamSocket>(socketPair[0]); + ListenerThread_ = ThreadPool_->Run([listenSocket, receiveFinish, requestHandler] { + TSocketPoller socketPoller; + socketPoller.WaitRead(*receiveFinish, nullptr); + socketPoller.WaitRead(*listenSocket, (void*)1); + + bool running = true; + while (running) { + void* cookies[2]; + size_t cookieCount = socketPoller.WaitI(cookies, 2); + for (size_t i = 0; i != cookieCount; ++i) { + if (!cookies[i]) { + running = false; + } else { + TSockAddrInet addr; + TAtomicSharedPtr<TStreamSocket> socket = MakeAtomicShared<TInetStreamSocket>(); + int ret = listenSocket->Accept(socket.Get(), &addr); + Y_ENSURE_EX(ret == 0, TSystemError() << "Can not accept connection"); + + SystemThreadFactory()->Run( + [socket, requestHandler] { + TStreamSocketInput input(socket.Get()); + TStreamSocketOutput output(socket.Get()); + requestHandler(&input, &output); + socket->Close(); + }); + } + } + } + }); +} + +TSimpleServer::~TSimpleServer() +{ + try { + if (ThreadPool_) { + Stop(); + } + } catch (...) { + } +} + +void TSimpleServer::Stop() +{ + // Just send something to indicate shutdown. + SendFinishSocket_->Send("X", 1); + ListenerThread_->Join(); + ThreadPool_->Stop(); + ThreadPool_.Destroy(); +} + +int TSimpleServer::GetPort() const +{ + return Port_; +} + +TString TSimpleServer::GetAddress() const +{ + return TStringBuilder() << "localhost:" << Port_; +} diff --git a/yt/cpp/mapreduce/http/ut/simple_server.h b/yt/cpp/mapreduce/http/ut/simple_server.h new file mode 100644 index 0000000000..f468ca55a6 --- /dev/null +++ b/yt/cpp/mapreduce/http/ut/simple_server.h @@ -0,0 +1,35 @@ +#pragma once + +#include <util/generic/ptr.h> + +#include <util/stream/input.h> +#include <util/stream/output.h> + +#include <util/thread/pool.h> + +#include <functional> + +class TInetStreamSocket; + +// Simple server listens on the specified port and launches +// requestHandler in the separate thread for each incoming connection. +class TSimpleServer +{ +public: + using TRequestHandler = std::function<void(IInputStream* input, IOutputStream* output)>; + +public: + TSimpleServer(int port, TRequestHandler requestHandler); + ~TSimpleServer(); + + void Stop(); + + int GetPort() const; + TString GetAddress() const; + +private: + const int Port_; + THolder<IThreadPool> ThreadPool_; + THolder<IThreadFactory::IThread> ListenerThread_; + THolder<TInetStreamSocket> SendFinishSocket_; +}; diff --git a/yt/cpp/mapreduce/http/ut/ya.make b/yt/cpp/mapreduce/http/ut/ya.make new file mode 100644 index 0000000000..fdc632ac95 --- /dev/null +++ b/yt/cpp/mapreduce/http/ut/ya.make @@ -0,0 +1,14 @@ +GTEST() + +SRCS( + connection_pool_ut.cpp + http_ut.cpp + simple_server.cpp +) + +PEERDIR( + yt/cpp/mapreduce/http + library/cpp/testing/common +) + +END() diff --git a/yt/cpp/mapreduce/http/ya.make b/yt/cpp/mapreduce/http/ya.make index ef81a4b64a..4db8e99bd7 100644 --- a/yt/cpp/mapreduce/http/ya.make +++ b/yt/cpp/mapreduce/http/ya.make @@ -27,3 +27,7 @@ PEERDIR( ) END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/cpp/mapreduce/interface/common_ut.cpp b/yt/cpp/mapreduce/interface/common_ut.cpp deleted file mode 100644 index 05d7495ca2..0000000000 --- a/yt/cpp/mapreduce/interface/common_ut.cpp +++ /dev/null @@ -1,357 +0,0 @@ -#include "common_ut.h" - -#include "fluent.h" - -#include <yt/cpp/mapreduce/interface/common.h> - -#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include <library/cpp/yson/node/node_io.h> -#include <library/cpp/yson/node/node_builder.h> - -#include <util/generic/xrange.h> - -using namespace NYT; - -template <class T> -TString SaveToString(const T& obj) -{ - TString s; - TStringOutput out(s); - ::Save(&out, obj); - return s; -} - -template <class T> -T LoadFromString(TStringBuf s) -{ - TMemoryInput in(s); - T obj; - ::Load(&in, obj); - return obj; -} - -template <class T> -T SaveLoad(const T& obj) -{ - return LoadFromString<T>(SaveToString(obj)); -} - -Y_UNIT_TEST_SUITE(Common) -{ - Y_UNIT_TEST(SortColumnsLegacy) - { - TSortColumns keys1("a", "b"); - UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b"})); - - keys1.Add("c", "d"); - UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"})); - - auto keys2 = TSortColumns(keys1).Add("e", "f"); - UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"})); - UNIT_ASSERT((keys2.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f"})); - - auto keys3 = TSortColumns(keys1).Add("e").Add("f").Add("g"); - UNIT_ASSERT((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"})); - UNIT_ASSERT((keys3.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f", "g"})); - } - - Y_UNIT_TEST(SortColumn) - { - auto ascending = TSortColumn("a"); - UNIT_ASSERT_VALUES_EQUAL(ascending.Name(), "a"); - UNIT_ASSERT_VALUES_EQUAL(ascending.SortOrder(), ESortOrder::SO_ASCENDING); - UNIT_ASSERT_VALUES_EQUAL(ascending, TSortColumn("a", ESortOrder::SO_ASCENDING)); - UNIT_ASSERT_VALUES_UNEQUAL(ascending, TSortColumn("a", ESortOrder::SO_DESCENDING)); - - UNIT_ASSERT_NO_EXCEPTION(ascending.EnsureAscending()); - UNIT_ASSERT_VALUES_EQUAL(static_cast<TString>(ascending), "a"); - UNIT_ASSERT_VALUES_EQUAL(ascending, "a"); - - auto another = ascending; - UNIT_ASSERT_NO_EXCEPTION(another = "another"); - UNIT_ASSERT_VALUES_EQUAL(another.Name(), "another"); - UNIT_ASSERT_VALUES_EQUAL(another.SortOrder(), ESortOrder::SO_ASCENDING); - UNIT_ASSERT_VALUES_EQUAL(another, TSortColumn("another", ESortOrder::SO_ASCENDING)); - UNIT_ASSERT_VALUES_UNEQUAL(another, TSortColumn("another", ESortOrder::SO_DESCENDING)); - - auto ascendingNode = BuildYsonNodeFluently().Value(ascending); - UNIT_ASSERT_VALUES_EQUAL(ascendingNode, TNode("a")); - - UNIT_ASSERT_VALUES_EQUAL(SaveLoad(ascending), ascending); - UNIT_ASSERT_VALUES_UNEQUAL(SaveToString(ascending), SaveToString(TString("a"))); - - auto descending = TSortColumn("a", ESortOrder::SO_DESCENDING); - UNIT_ASSERT_VALUES_EQUAL(descending.Name(), "a"); - UNIT_ASSERT_VALUES_EQUAL(descending.SortOrder(), ESortOrder::SO_DESCENDING); - UNIT_ASSERT_VALUES_EQUAL(descending, TSortColumn("a", ESortOrder::SO_DESCENDING)); - UNIT_ASSERT_VALUES_UNEQUAL(descending, TSortColumn("a", ESortOrder::SO_ASCENDING)); - - UNIT_ASSERT_EXCEPTION(descending.EnsureAscending(), yexception); - UNIT_ASSERT_EXCEPTION(static_cast<TString>(descending), yexception); - UNIT_ASSERT_EXCEPTION(descending == "a", yexception); - UNIT_ASSERT_EXCEPTION(descending = "a", yexception); - - auto descendingNode = BuildYsonNodeFluently().Value(descending); - UNIT_ASSERT_VALUES_EQUAL(descendingNode, TNode()("name", "a")("sort_order", "descending")); - - UNIT_ASSERT_VALUES_EQUAL(SaveLoad(descending), descending); - UNIT_ASSERT_VALUES_UNEQUAL(SaveToString(descending), SaveToString("a")); - - UNIT_ASSERT_VALUES_EQUAL(ToString(TSortColumn("blah")), "blah"); - UNIT_ASSERT_VALUES_EQUAL(ToString(TSortColumn("blah", ESortOrder::SO_DESCENDING)), "{\"name\"=\"blah\";\"sort_order\"=\"descending\"}"); - } - - Y_UNIT_TEST(SortColumns) - { - TSortColumns ascending("a", "b"); - UNIT_ASSERT(ascending.Parts_ == (TSortColumns{"a", "b"})); - UNIT_ASSERT_NO_EXCEPTION(ascending.EnsureAscending()); - UNIT_ASSERT_VALUES_EQUAL(static_cast<TColumnNames>(ascending).Parts_, (TVector<TString>{"a", "b"})); - UNIT_ASSERT_VALUES_EQUAL(ascending.GetNames(), (TVector<TString>{"a", "b"})); - - auto mixed = ascending; - mixed.Add(TSortColumn("c", ESortOrder::SO_DESCENDING), "d"); - UNIT_ASSERT((mixed.Parts_ != TVector<TSortColumn>{"a", "b", "c", "d"})); - UNIT_ASSERT((mixed.Parts_ == TVector<TSortColumn>{"a", "b", TSortColumn("c", ESortOrder::SO_DESCENDING), "d"})); - UNIT_ASSERT_VALUES_EQUAL(mixed.GetNames(), (TVector<TString>{"a", "b", "c", "d"})); - UNIT_ASSERT_EXCEPTION(mixed.EnsureAscending(), yexception); - UNIT_ASSERT_EXCEPTION(static_cast<TColumnNames>(mixed), yexception); - } - - Y_UNIT_TEST(KeyBound) - { - auto keyBound = TKeyBound(ERelation::Greater, TKey(7, "a", TNode()("x", "y"))); - UNIT_ASSERT_VALUES_EQUAL(keyBound.Relation(), ERelation::Greater); - UNIT_ASSERT_EQUAL(keyBound.Key(), TKey(7, "a", TNode()("x", "y"))); - - auto keyBound1 = TKeyBound().Relation(ERelation::Greater).Key(TKey(7, "a", TNode()("x", "y"))); - auto expectedNode = TNode() - .Add(">") - .Add(TNode().Add(7).Add("a").Add(TNode()("x", "y"))); - - UNIT_ASSERT_VALUES_EQUAL(expectedNode, BuildYsonNodeFluently().Value(keyBound)); - UNIT_ASSERT_VALUES_EQUAL(expectedNode, BuildYsonNodeFluently().Value(keyBound1)); - - keyBound.Relation(ERelation::LessOrEqual); - keyBound.Key(TKey("A", 7)); - UNIT_ASSERT_VALUES_EQUAL(keyBound.Relation(), ERelation::LessOrEqual); - UNIT_ASSERT_EQUAL(keyBound.Key(), TKey("A", 7)); - - UNIT_ASSERT_VALUES_EQUAL( - BuildYsonNodeFluently().Value(keyBound), - TNode() - .Add("<=") - .Add(TNode().Add("A").Add(7))); - } - - Y_UNIT_TEST(TTableSchema) - { - TTableSchema schema; - schema - .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING)) - .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64)) - .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64)); - auto checkSortBy = [](TTableSchema schema, const TVector<TString>& columns) { - auto initialSchema = schema; - schema.SortBy(columns); - for (auto i: xrange(columns.size())) { - UNIT_ASSERT_VALUES_EQUAL(schema.Columns()[i].Name(), columns[i]); - UNIT_ASSERT_VALUES_EQUAL(schema.Columns()[i].SortOrder(), ESortOrder::SO_ASCENDING); - } - for (auto i: xrange(columns.size(), (size_t)initialSchema.Columns().size())) { - UNIT_ASSERT_VALUES_EQUAL(schema.Columns()[i].SortOrder(), Nothing()); - } - UNIT_ASSERT_VALUES_EQUAL(initialSchema.Columns().size(), schema.Columns().size()); - return schema; - }; - auto newSchema = checkSortBy(schema, {"b"}); - UNIT_ASSERT_VALUES_EQUAL(newSchema.Columns()[1].Name(), TString("a")); - UNIT_ASSERT_VALUES_EQUAL(newSchema.Columns()[2].Name(), TString("c")); - checkSortBy(schema, {"b", "c"}); - checkSortBy(schema, {"c", "a"}); - UNIT_ASSERT_EXCEPTION(checkSortBy(schema, {"b", "b"}), yexception); - UNIT_ASSERT_EXCEPTION(checkSortBy(schema, {"a", "junk"}), yexception); - } - - Y_UNIT_TEST(TTableSchema_Decimal) - { - NYT::TTableSchema tableSchema; - - tableSchema.AddColumn("a", NTi::Decimal(35, 18)); - tableSchema.AddColumn("b", NTi::Optional(NTi::Decimal(35, 18))); - tableSchema.AddColumn("c", NTi::List(NTi::Decimal(35, 18))); - - auto tableSchemaNode = tableSchema.ToNode(); - const auto& tableSchemaNodeList = tableSchemaNode.AsList(); - - // There was a bug in the serialization of decimal type: https://github.com/ytsaurus/ytsaurus/issues/173 - { - const auto& currentType = tableSchemaNodeList[0]; - UNIT_ASSERT_VALUES_EQUAL(currentType.ChildAsString("type"), "string"); - UNIT_ASSERT(currentType.ChildAsBool("required")); - UNIT_ASSERT(currentType.HasKey("type_v3")); - UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").ChildAsString("type_name"), "decimal"); - } - { - const auto& currentType = tableSchemaNodeList[1]; - UNIT_ASSERT_VALUES_EQUAL(currentType.ChildAsString("type"), "string"); - UNIT_ASSERT(!currentType.ChildAsBool("required")); - UNIT_ASSERT(currentType.HasKey("type_v3")); - UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").ChildAsString("type_name"), "optional"); - UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal"); - } - { - const auto& currentType = tableSchemaNodeList[2]; - UNIT_ASSERT_VALUES_EQUAL(currentType.ChildAsString("type"), "any"); - UNIT_ASSERT(currentType.ChildAsBool("required")); - UNIT_ASSERT(currentType.HasKey("type_v3")); - UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").ChildAsString("type_name"), "list"); - UNIT_ASSERT_VALUES_EQUAL(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal"); - } - - UNIT_ASSERT_EQUAL(tableSchema, TTableSchema::FromNode(tableSchemaNode)); - } - - Y_UNIT_TEST(TColumnSchema_TypeV3) - { - { - auto column = TColumnSchema().Type(NTi::Interval()); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), true); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_INTERVAL); - } - { - auto column = TColumnSchema().Type(NTi::Optional(NTi::Date())); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), false); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_DATE); - } - { - auto column = TColumnSchema().Type(NTi::Interval64()); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), true); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_INTERVAL64); - } - { - auto column = TColumnSchema().Type(NTi::Optional(NTi::Date32())); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), false); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_DATE32); - } - { - auto column = TColumnSchema().Type(NTi::Null()); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), false); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_NULL); - } - { - auto column = TColumnSchema().Type(NTi::Optional(NTi::Null())); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), false); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_ANY); - } - { - auto column = TColumnSchema().Type(NTi::Decimal(35, 18)); - UNIT_ASSERT_VALUES_EQUAL(column.Required(), true); - UNIT_ASSERT_VALUES_EQUAL(column.Type(), VT_STRING); - } - } - - Y_UNIT_TEST(ToTypeV3) - { - UNIT_ASSERT_VALUES_EQUAL(*ToTypeV3(VT_INT32, true), *NTi::Int32()); - UNIT_ASSERT_VALUES_EQUAL(*ToTypeV3(VT_UTF8, false), *NTi::Optional(NTi::Utf8())); - } - - Y_UNIT_TEST(DeserializeColumn) - { - auto deserialize = [] (TStringBuf yson) { - auto node = NodeFromYsonString(yson); - TColumnSchema column; - Deserialize(column, node); - return column; - }; - - auto column = deserialize("{name=foo; type=int64; required=%false}"); - UNIT_ASSERT_VALUES_EQUAL(column.Name(), "foo"); - UNIT_ASSERT_VALUES_EQUAL(*column.TypeV3(), *NTi::Optional(NTi::Int64())); - - column = deserialize("{name=bar; type=utf8; required=%true; type_v3=utf8}"); - UNIT_ASSERT_VALUES_EQUAL(column.Name(), "bar"); - UNIT_ASSERT_VALUES_EQUAL(*column.TypeV3(), *NTi::Utf8()); - } - - Y_UNIT_TEST(ColumnSchemaEquality) - { - auto base = TColumnSchema() - .Name("col") - .TypeV3(NTi::Optional(NTi::List(NTi::String()))) - .SortOrder(ESortOrder::SO_ASCENDING) - .Lock("lock") - .Expression("x + 12") - .Aggregate("sum") - .Group("group"); - - auto other = base; - ASSERT_SERIALIZABLES_EQUAL(other, base); - other.Name("other"); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - - other = base; - other.TypeV3(NTi::List(NTi::String())); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - - other = base; - other.ResetSortOrder(); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - - other = base; - other.Lock("lock1"); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - - other = base; - other.Expression("x + 13"); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - - other = base; - other.ResetAggregate(); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - - other = base; - other.Group("group1"); - ASSERT_SERIALIZABLES_UNEQUAL(other, base); - } - - Y_UNIT_TEST(TableSchemaEquality) - { - auto col1 = TColumnSchema() - .Name("col1") - .TypeV3(NTi::Optional(NTi::List(NTi::String()))) - .SortOrder(ESortOrder::SO_ASCENDING); - - auto col2 = TColumnSchema() - .Name("col2") - .TypeV3(NTi::Uint32()); - - auto schema = TTableSchema() - .AddColumn(col1) - .AddColumn(col2) - .Strict(true) - .UniqueKeys(true); - - auto other = schema; - ASSERT_SERIALIZABLES_EQUAL(other, schema); - - other.Strict(false); - ASSERT_SERIALIZABLES_UNEQUAL(other, schema); - - other = schema; - other.MutableColumns()[0].TypeV3(NTi::List(NTi::String())); - ASSERT_SERIALIZABLES_UNEQUAL(other, schema); - - other = schema; - other.MutableColumns().push_back(col1); - ASSERT_SERIALIZABLES_UNEQUAL(other, schema); - - other = schema; - other.UniqueKeys(false); - ASSERT_SERIALIZABLES_UNEQUAL(other, schema); - } -} diff --git a/yt/cpp/mapreduce/interface/config_ut.cpp b/yt/cpp/mapreduce/interface/config_ut.cpp deleted file mode 100644 index e49ba02108..0000000000 --- a/yt/cpp/mapreduce/interface/config_ut.cpp +++ /dev/null @@ -1,20 +0,0 @@ -#include <library/cpp/testing/unittest/registar.h> - -#include <yt/cpp/mapreduce/interface/config.h> - -using namespace NYT; - -Y_UNIT_TEST_SUITE(ConfigSuite) -{ - Y_UNIT_TEST(TestReset) { - // very limited test, checks only one config field - - auto origConfig = *TConfig::Get(); - TConfig::Get()->Reset(); - UNIT_ASSERT_VALUES_EQUAL(origConfig.Hosts, TConfig::Get()->Hosts); - - TConfig::Get()->Hosts = "hosts/fb867"; - TConfig::Get()->Reset(); - UNIT_ASSERT_VALUES_EQUAL(origConfig.Hosts, TConfig::Get()->Hosts); - } -} diff --git a/yt/cpp/mapreduce/interface/error_ut.cpp b/yt/cpp/mapreduce/interface/error_ut.cpp deleted file mode 100644 index 03f2751b23..0000000000 --- a/yt/cpp/mapreduce/interface/error_ut.cpp +++ /dev/null @@ -1,81 +0,0 @@ -#include <library/cpp/testing/unittest/registar.h> - -#include <library/cpp/json/json_reader.h> - -#include <yt/cpp/mapreduce/interface/errors.h> -#include <yt/cpp/mapreduce/common/helpers.h> - -using namespace NYT; - -template<> -void Out<NYT::TNode>(IOutputStream& s, const NYT::TNode& node) -{ - s << "TNode:" << NodeToYsonString(node); -} - -Y_UNIT_TEST_SUITE(ErrorSuite) -{ - Y_UNIT_TEST(TestParseJson) - { - // Scary real world error! Бу! - const char* jsonText = - R"""({)""" - R"""("code":500,)""" - R"""("message":"Error resolving path //home/user/link",)""" - R"""("attributes":{)""" - R"""("fid":18446484571700269066,)""" - R"""("method":"Create",)""" - R"""("tid":17558639495721339338,)""" - R"""("datetime":"2017-04-07T13:38:56.474819Z",)""" - R"""("pid":414529,)""" - R"""("host":"build01-01g.yt.yandex.net"},)""" - R"""("inner_errors":[{)""" - R"""("code":1,)""" - R"""("message":"Node //tt cannot have children",)""" - R"""("attributes":{)""" - R"""("fid":18446484571700269066,)""" - R"""("tid":17558639495721339338,)""" - R"""("datetime":"2017-04-07T13:38:56.474725Z",)""" - R"""("pid":414529,)""" - R"""("host":"build01-01g.yt.yandex.net"},)""" - R"""("inner_errors":[]}]})"""; - - NJson::TJsonValue jsonValue; - ReadJsonFastTree(jsonText, &jsonValue, /*throwOnError=*/ true); - - TYtError error(jsonValue); - UNIT_ASSERT_VALUES_EQUAL(error.GetCode(), 500); - UNIT_ASSERT_VALUES_EQUAL(error.GetMessage(), R"""(Error resolving path //home/user/link)"""); - UNIT_ASSERT_VALUES_EQUAL(error.InnerErrors().size(), 1); - UNIT_ASSERT_VALUES_EQUAL(error.InnerErrors()[0].GetCode(), 1); - - UNIT_ASSERT_VALUES_EQUAL(error.HasAttributes(), true); - UNIT_ASSERT_VALUES_EQUAL(error.GetAttributes().at("method"), TNode("Create")); - - UNIT_ASSERT_VALUES_EQUAL(error.GetAllErrorCodes(), TSet<int>({500, 1})); - } - - Y_UNIT_TEST(TestGetYsonText) { - const char* jsonText = - R"""({)""" - R"""("code":500,)""" - R"""("message":"outer error",)""" - R"""("attributes":{)""" - R"""("method":"Create",)""" - R"""("pid":414529},)""" - R"""("inner_errors":[{)""" - R"""("code":1,)""" - R"""("message":"inner error",)""" - R"""("attributes":{},)""" - R"""("inner_errors":[])""" - R"""(}]})"""; - TYtError error; - error.ParseFrom(jsonText); - TString ysonText = error.GetYsonText(); - TYtError error2(NodeFromYsonString(ysonText)); - UNIT_ASSERT_EQUAL( - ysonText, - R"""({"code"=500;"message"="outer error";"attributes"={"method"="Create";"pid"=414529};"inner_errors"=[{"code"=1;"message"="inner error"}]})"""); - UNIT_ASSERT_EQUAL(error2.GetYsonText(), ysonText); - } -} diff --git a/yt/cpp/mapreduce/interface/format_ut.cpp b/yt/cpp/mapreduce/interface/format_ut.cpp deleted file mode 100644 index 069c29087d..0000000000 --- a/yt/cpp/mapreduce/interface/format_ut.cpp +++ /dev/null @@ -1,235 +0,0 @@ -#include "common.h" -#include "errors.h" -#include "format.h" -#include "common_ut.h" - -#include <yt/cpp/mapreduce/interface/proto3_ut.pb.h> -#include <yt/cpp/mapreduce/interface/protobuf_table_schema_ut.pb.h> - -#include <library/cpp/testing/unittest/registar.h> - -using namespace NYT; - -static TNode GetColumns(const TFormat& format, int tableIndex = 0) -{ - return format.Config.GetAttributes()["tables"][tableIndex]["columns"]; -} - -Y_UNIT_TEST_SUITE(ProtobufFormat) -{ - Y_UNIT_TEST(TIntegral) - { - const auto format = TFormat::Protobuf<NUnitTesting::TIntegral>(); - auto columns = GetColumns(format); - - struct TColumn - { - TString Name; - TString ProtoType; - int FieldNumber; - }; - - auto expected = TVector<TColumn>{ - {"DoubleField", "double", 1}, - {"FloatField", "float", 2}, - {"Int32Field", "int32", 3}, - {"Int64Field", "int64", 4}, - {"Uint32Field", "uint32", 5}, - {"Uint64Field", "uint64", 6}, - {"Sint32Field", "sint32", 7}, - {"Sint64Field", "sint64", 8}, - {"Fixed32Field", "fixed32", 9}, - {"Fixed64Field", "fixed64", 10}, - {"Sfixed32Field", "sfixed32", 11}, - {"Sfixed64Field", "sfixed64", 12}, - {"BoolField", "bool", 13}, - {"EnumField", "enum_string", 14}, - }; - - UNIT_ASSERT_VALUES_EQUAL(columns.Size(), expected.size()); - for (int i = 0; i < static_cast<int>(columns.Size()); ++i) { - UNIT_ASSERT_VALUES_EQUAL(columns[i]["name"], expected[i].Name); - UNIT_ASSERT_VALUES_EQUAL(columns[i]["proto_type"], expected[i].ProtoType); - UNIT_ASSERT_VALUES_EQUAL(columns[i]["field_number"], expected[i].FieldNumber); - } - } - - Y_UNIT_TEST(TRowFieldSerializationOption) - { - const auto format = TFormat::Protobuf<NUnitTesting::TRowFieldSerializationOption>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "UrlRow_1"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1); - const auto& fields = columns[0]["fields"]; - UNIT_ASSERT_VALUES_EQUAL(fields[0]["name"], "Host"); - UNIT_ASSERT_VALUES_EQUAL(fields[0]["proto_type"], "string"); - UNIT_ASSERT_VALUES_EQUAL(fields[0]["field_number"], 1); - - UNIT_ASSERT_VALUES_EQUAL(fields[1]["name"], "Path"); - UNIT_ASSERT_VALUES_EQUAL(fields[1]["proto_type"], "string"); - UNIT_ASSERT_VALUES_EQUAL(fields[1]["field_number"], 2); - - UNIT_ASSERT_VALUES_EQUAL(fields[2]["name"], "HttpCode"); - UNIT_ASSERT_VALUES_EQUAL(fields[2]["proto_type"], "sint32"); - UNIT_ASSERT_VALUES_EQUAL(fields[2]["field_number"], 3); - - UNIT_ASSERT_VALUES_EQUAL(columns[1]["name"], "UrlRow_2"); - UNIT_ASSERT_VALUES_EQUAL(columns[1]["proto_type"], "message"); - UNIT_ASSERT_VALUES_EQUAL(columns[1]["field_number"], 2); - } - - Y_UNIT_TEST(Packed) - { - const auto format = TFormat::Protobuf<NUnitTesting::TPacked>(); - auto column = GetColumns(format)[0]; - - UNIT_ASSERT_VALUES_EQUAL(column["name"], "PackedListInt64"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["field_number"], 1); - UNIT_ASSERT_VALUES_EQUAL(column["packed"], true); - UNIT_ASSERT_VALUES_EQUAL(column["repeated"], true); - } - - Y_UNIT_TEST(Cyclic) - { - UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TA>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TB>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TC>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(TFormat::Protobuf<NUnitTesting::TCyclic::TD>(), TApiUsageError); - - const auto format = TFormat::Protobuf<NUnitTesting::TCyclic::TE>(); - auto column = GetColumns(format)[0]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "d"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "message"); - UNIT_ASSERT_VALUES_EQUAL(column["field_number"], 1); - } - - Y_UNIT_TEST(Map) - { - const auto format = TFormat::Protobuf<NUnitTesting::TWithMap>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 5); - { - const auto& column = columns[0]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDefault"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "message"); - } - { - const auto& column = columns[1]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapListOfStructsLegacy"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "message"); - } - { - const auto& column = columns[2]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapListOfStructs"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message"); - } - { - const auto& column = columns[3]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapOptionalDict"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message"); - } - { - const auto& column = columns[4]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDict"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message"); - } - } - - Y_UNIT_TEST(Oneof) - { - const auto format = TFormat::Protobuf<NUnitTesting::TWithOneof>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 4); - auto check = [] (const TNode& column, TStringBuf name, TStringBuf oneof2Name) { - UNIT_ASSERT_VALUES_EQUAL(column["name"], name); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 5); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "field"); - - const auto& oneof2 = column["fields"][1]; - UNIT_ASSERT_VALUES_EQUAL(oneof2["name"], oneof2Name); - UNIT_ASSERT_VALUES_EQUAL(oneof2["proto_type"], "oneof"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][0]["name"], "y2"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["name"], "z2"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["proto_type"], "structured_message"); - const auto& embeddedOneof = oneof2["fields"][1]["fields"][0]; - UNIT_ASSERT_VALUES_EQUAL(embeddedOneof["name"], "Oneof"); - UNIT_ASSERT_VALUES_EQUAL(embeddedOneof["fields"][0]["name"], "x"); - UNIT_ASSERT_VALUES_EQUAL(embeddedOneof["fields"][1]["name"], "y"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][2]["name"], "x2"); - - UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "x1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][3]["name"], "y1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][4]["name"], "z1"); - }; - - check(columns[0], "DefaultSeparateFields", "variant_field_name"); - check(columns[1], "NoDefault", "Oneof2"); - - { - const auto& column = columns[2]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "SerializationProtobuf"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 3); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "x1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["name"], "y1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "z1"); - } - { - const auto& column = columns[3]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "TopLevelOneof"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "oneof"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 1); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "MemberOfTopLevelOneof"); - } - } -} - -Y_UNIT_TEST_SUITE(Proto3) -{ - Y_UNIT_TEST(TWithOptional) - { - const auto format = TFormat::Protobuf<NTestingProto3::TWithOptional>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "x"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1); - } - - Y_UNIT_TEST(TWithOptionalMessage) - { - const auto format = TFormat::Protobuf<NTestingProto3::TWithOptionalMessage>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "x"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1); - - UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"].Size(), 1); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"][0]["name"], "x"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["fields"][0]["field_number"], 1); - } -} diff --git a/yt/cpp/mapreduce/interface/job_counters.cpp b/yt/cpp/mapreduce/interface/job_counters.cpp index 6d4a2a6fcb..717982e216 100644 --- a/yt/cpp/mapreduce/interface/job_counters.cpp +++ b/yt/cpp/mapreduce/interface/job_counters.cpp @@ -60,7 +60,7 @@ ui64 TJobCounter::GetValue(const TStringBuf key) const //////////////////////////////////////////////////////////////////// -TJobCounters::TJobCounters(const NYT::TNode& counters) +TJobCounters::TJobCounters(const TNode& counters) : Total_(0) { if (!counters.IsMap()) { diff --git a/yt/cpp/mapreduce/interface/job_counters.h b/yt/cpp/mapreduce/interface/job_counters.h index 9257cc1ec1..573cbe4784 100644 --- a/yt/cpp/mapreduce/interface/job_counters.h +++ b/yt/cpp/mapreduce/interface/job_counters.h @@ -33,7 +33,7 @@ public: /// /// Construct counter from counters node. - TJobCounters(const NYT::TNode& counters); + TJobCounters(const TNode& counters); const TJobCounter& GetAborted() const; const TJobCounter& GetAbortedScheduled() const; diff --git a/yt/cpp/mapreduce/interface/job_counters_ut.cpp b/yt/cpp/mapreduce/interface/job_counters_ut.cpp deleted file mode 100644 index 56d3932b8f..0000000000 --- a/yt/cpp/mapreduce/interface/job_counters_ut.cpp +++ /dev/null @@ -1,103 +0,0 @@ -#include <yt/cpp/mapreduce/interface/job_counters.h> -#include <yt/cpp/mapreduce/interface/operation.h> - -#include <library/cpp/yson/node/node_io.h> - -#include <library/cpp/testing/unittest/registar.h> - -using namespace NYT; - -Y_UNIT_TEST_SUITE(JobCounters) -{ - Y_UNIT_TEST(Full) - { - const TString input = R"""( - { - "completed" = { - "total" = 6; - "non-interrupted" = 1; - "interrupted" = { - "whatever_interrupted" = 2; - "whatever_else_interrupted" = 3; - }; - }; - "aborted" = { - "non_scheduled" = { - "whatever_non_scheduled" = 4; - "whatever_else_non_scheduled" = 5; - }; - "scheduled" = { - "whatever_scheduled" = 6; - "whatever_else_scheduled" = 7; - }; - "total" = 22; - }; - "lost" = 8; - "invalidated" = 9; - "failed" = 10; - "running" = 11; - "suspended" = 12; - "pending" = 13; - "blocked" = 14; - "total" = 105; - })"""; - - TJobCounters counters(NodeFromYsonString(input)); - - UNIT_ASSERT_VALUES_EQUAL(counters.GetTotal(), 105); - - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompleted().GetTotal(), 6); - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedNonInterrupted().GetTotal(), 1); - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetTotal(), 5); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAborted().GetTotal(), 22); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetTotal(), 9); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetTotal(), 13); - UNIT_ASSERT_VALUES_EQUAL(counters.GetLost().GetTotal(), 8); - UNIT_ASSERT_VALUES_EQUAL(counters.GetInvalidated().GetTotal(), 9); - UNIT_ASSERT_VALUES_EQUAL(counters.GetFailed().GetTotal(), 10); - UNIT_ASSERT_VALUES_EQUAL(counters.GetRunning().GetTotal(), 11); - UNIT_ASSERT_VALUES_EQUAL(counters.GetSuspended().GetTotal(), 12); - UNIT_ASSERT_VALUES_EQUAL(counters.GetPending().GetTotal(), 13); - UNIT_ASSERT_VALUES_EQUAL(counters.GetBlocked().GetTotal(), 14); - - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetValue("whatever_interrupted"), 2); - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetValue("whatever_else_interrupted"), 3); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetValue("whatever_non_scheduled"), 4); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetValue("whatever_else_non_scheduled"), 5); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetValue("whatever_scheduled"), 6); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetValue("whatever_else_scheduled"), 7); - - UNIT_ASSERT_EXCEPTION(counters.GetCompletedInterrupted().GetValue("Nothingness"), yexception); - } - - Y_UNIT_TEST(Empty) - { - const TString input = "{}"; - - TJobCounters counters(NodeFromYsonString(input)); - - UNIT_ASSERT_VALUES_EQUAL(counters.GetTotal(), 0); - - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompleted().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedNonInterrupted().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetCompletedInterrupted().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAborted().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedNonScheduled().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetAbortedScheduled().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetLost().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetInvalidated().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetFailed().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetRunning().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetSuspended().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetPending().GetTotal(), 0); - UNIT_ASSERT_VALUES_EQUAL(counters.GetBlocked().GetTotal(), 0); - } - - Y_UNIT_TEST(Broken) - { - UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode()), yexception, "TJobCounters"); - UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode(1)), yexception, "TJobCounters"); - UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode(1.0)), yexception, "TJobCounters"); - UNIT_ASSERT_EXCEPTION_CONTAINS(TJobCounters(TNode("Whatever")), yexception, "TJobCounters"); - } -} diff --git a/yt/cpp/mapreduce/interface/job_statistics_ut.cpp b/yt/cpp/mapreduce/interface/job_statistics_ut.cpp deleted file mode 100644 index 2603a4fbf0..0000000000 --- a/yt/cpp/mapreduce/interface/job_statistics_ut.cpp +++ /dev/null @@ -1,257 +0,0 @@ -#include <yt/cpp/mapreduce/interface/job_statistics.h> -#include <yt/cpp/mapreduce/interface/operation.h> - -#include <library/cpp/yson/node/node_io.h> - -#include <library/cpp/testing/unittest/registar.h> - -using namespace NYT; - -Y_UNIT_TEST_SUITE(JobStatistics) -{ - Y_UNIT_TEST(Simple) - { - const TString input = R"""( - { - "data" = { - "output" = { - "0" = { - "uncompressed_data_size" = { - "$" = { - "completed" = { - "simple_sort" = { - "max" = 130; - "count" = 1; - "min" = 130; - "sum" = 130; - }; - "map" = { - "max" = 42; - "count" = 1; - "min" = 42; - "sum" = 42; - }; - }; - "aborted" = { - "simple_sort" = { - "max" = 24; - "count" = 1; - "min" = 24; - "sum" = 24; - }; - }; - }; - }; - }; - }; - }; - })"""; - - TJobStatistics stat(NodeFromYsonString(input)); - - UNIT_ASSERT(stat.HasStatistics("data/output/0/uncompressed_data_size")); - UNIT_ASSERT(!stat.HasStatistics("nonexistent-statistics")); - UNIT_ASSERT_EXCEPTION_CONTAINS(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics"); - - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"}); - - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2); - - UNIT_ASSERT_VALUES_EQUAL(stat.JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), 24); - UNIT_ASSERT_VALUES_EQUAL(stat.JobType({EJobType::Map}).JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), TMaybe<i64>()); - } - - Y_UNIT_TEST(TestOtherTypes) - { - const TString input = R"""( - { - "time" = { - "exec" = { - "$" = { - "completed" = { - "map" = { - "max" = 2482468; - "count" = 38; - "min" = 578976; - "sum" = 47987270; - }; - }; - }; - }; - }; - })"""; - - TJobStatistics stat(NodeFromYsonString(input)); - - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatisticsAs<TDuration>("time/exec").Max(), TDuration::MilliSeconds(2482468)); - } - - Y_UNIT_TEST(Custom) - { - const TString input = R"""( - { - "custom" = { - "some" = { - "path" = { - "$" = { - "completed" = { - "map" = { - "max" = -1; - "count" = 1; - "min" = -1; - "sum" = -1; - }; - }; - }; - }; - }; - "another" = { - "path" = { - "$" = { - "completed" = { - "map" = { - "max" = 1001; - "count" = 2; - "min" = 1001; - "sum" = 2002; - }; - }; - }; - }; - }; - }; - })"""; - - TJobStatistics stat(NodeFromYsonString(input)); - - UNIT_ASSERT(stat.HasCustomStatistics("some/path")); - UNIT_ASSERT(!stat.HasCustomStatistics("nonexistent-statistics")); - UNIT_ASSERT_EXCEPTION_CONTAINS(stat.GetCustomStatistics("BLAH-BLAH"), yexception, "Statistics"); - - const auto names = stat.GetCustomStatisticsNames(); - const THashSet<TString> expected = {"some/path", "another/path"}; - UNIT_ASSERT_VALUES_EQUAL(THashSet<TString>(names.begin(), names.end()), expected); - - UNIT_ASSERT_VALUES_EQUAL(stat.GetCustomStatistics("some/path").Max(), -1); - UNIT_ASSERT_VALUES_EQUAL(stat.GetCustomStatistics("another/path").Avg(), 1001); - } - - Y_UNIT_TEST(TaskNames) - { - const TString input = R"""( - { - "data" = { - "output" = { - "0" = { - "uncompressed_data_size" = { - "$" = { - "completed" = { - "partition_map" = { - "max" = 130; - "count" = 1; - "min" = 130; - "sum" = 130; - }; - "partition(0)" = { - "max" = 42; - "count" = 1; - "min" = 42; - "sum" = 42; - }; - }; - "aborted" = { - "simple_sort" = { - "max" = 24; - "count" = 1; - "min" = 24; - "sum" = 24; - }; - }; - }; - }; - }; - }; - }; - })"""; - - TJobStatistics stat(NodeFromYsonString(input)); - - UNIT_ASSERT(stat.HasStatistics("data/output/0/uncompressed_data_size")); - UNIT_ASSERT(!stat.HasStatistics("nonexistent-statistics")); - UNIT_ASSERT_EXCEPTION_CONTAINS(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics"); - - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"}); - - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172); - UNIT_ASSERT_VALUES_EQUAL(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2); - - UNIT_ASSERT_VALUES_EQUAL( - stat - .JobState({EJobState::Aborted}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 24); - UNIT_ASSERT_VALUES_EQUAL( - stat - .JobType({EJobType::Partition}) - .JobState({EJobState::Aborted}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - TMaybe<i64>()); - UNIT_ASSERT_VALUES_EQUAL( - stat - .TaskName({"partition(0)"}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 42); - UNIT_ASSERT_VALUES_EQUAL( - stat - .TaskName({"partition"}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - TMaybe<i64>()); - UNIT_ASSERT_VALUES_EQUAL( - stat - .TaskName({"partition_map(0)"}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 130); - UNIT_ASSERT_VALUES_EQUAL( - stat - .JobType({EJobType::Partition}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 42); - UNIT_ASSERT_VALUES_EQUAL( - stat - .JobType({EJobType::PartitionMap}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 130); - UNIT_ASSERT_VALUES_EQUAL( - stat - .TaskName({ETaskName::Partition0}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 42); - UNIT_ASSERT_VALUES_EQUAL( - stat - .TaskName({ETaskName::Partition1}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - TMaybe<i64>()); - UNIT_ASSERT_VALUES_EQUAL( - stat - .TaskName({ETaskName::PartitionMap0}) - .GetStatistics("data/output/0/uncompressed_data_size") - .Sum(), - 130); - } -} diff --git a/yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp b/yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp new file mode 100644 index 0000000000..b79b2f707f --- /dev/null +++ b/yt/cpp/mapreduce/interface/logging/ut/log_ut.cpp @@ -0,0 +1,19 @@ +#include <yt/cpp/mapreduce/interface/logging/logger.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <util/string/cast.h> + +using namespace NYT; + +TEST(TLoggingTest, FromString) { + EXPECT_EQ(FromString("error"), ILogger::ELevel::ERROR); + EXPECT_EQ(FromString("warning"), ILogger::ELevel::ERROR); + EXPECT_EQ(FromString("info"), ILogger::ELevel::INFO); + EXPECT_EQ(FromString("debug"), ILogger::ELevel::DEBUG); + EXPECT_EQ(FromString("ERROR"), ILogger::ELevel::ERROR); + EXPECT_EQ(FromString("WARNING"), ILogger::ELevel::ERROR); + EXPECT_EQ(FromString("INFO"), ILogger::ELevel::INFO); + EXPECT_EQ(FromString("DEBUG"), ILogger::ELevel::DEBUG); + EXPECT_THROW(FromString<ILogger::ELevel>("no"), yexception); +} diff --git a/yt/cpp/mapreduce/interface/logging/ut/ya.make b/yt/cpp/mapreduce/interface/logging/ut/ya.make new file mode 100644 index 0000000000..53d94509fb --- /dev/null +++ b/yt/cpp/mapreduce/interface/logging/ut/ya.make @@ -0,0 +1,11 @@ +GTEST() + +SRCS( + log_ut.cpp +) + +PEERDIR( + yt/cpp/mapreduce/interface/logging +) + +END() diff --git a/yt/cpp/mapreduce/interface/logging/ya.make b/yt/cpp/mapreduce/interface/logging/ya.make index 8095bfe4ba..03407caf14 100644 --- a/yt/cpp/mapreduce/interface/logging/ya.make +++ b/yt/cpp/mapreduce/interface/logging/ya.make @@ -14,3 +14,7 @@ PEERDIR( GENERATE_ENUM_SERIALIZATION(logger.h) END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/cpp/mapreduce/interface/operation_ut.cpp b/yt/cpp/mapreduce/interface/operation_ut.cpp deleted file mode 100644 index 0fa62e1568..0000000000 --- a/yt/cpp/mapreduce/interface/operation_ut.cpp +++ /dev/null @@ -1,269 +0,0 @@ -#include <yt/cpp/mapreduce/interface/common_ut.h> -#include <yt/cpp/mapreduce/interface/job_statistics.h> -#include <yt/cpp/mapreduce/interface/operation.h> -#include <yt/cpp/mapreduce/interface/protobuf_table_schema_ut.pb.h> - -#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> - -#include <library/cpp/yson/node/node_io.h> - -#include <library/cpp/testing/unittest/registar.h> - -using namespace NYT; -using namespace NYT::NUnitTesting; - -class TDummyInferenceContext - : public IOperationPreparationContext -{ -public: - TDummyInferenceContext(int inputCount, int outputCount) - : InputCount_(inputCount) - , OutputCount_(outputCount) - , InputSchemas_(inputCount) - { } - - int GetInputCount() const override - { - return InputCount_; - } - - int GetOutputCount() const override - { - return OutputCount_; - } - - const TVector<TTableSchema>& GetInputSchemas() const override - { - return InputSchemas_; - } - - const TTableSchema& GetInputSchema(int index) const override - { - return InputSchemas_[index]; - } - - TMaybe<TYPath> GetInputPath(int) const override - { - return Nothing(); - } - - TMaybe<TYPath> GetOutputPath(int) const override - { - return Nothing(); - } - -private: - int InputCount_; - int OutputCount_; - TVector<TTableSchema> InputSchemas_; -}; - -Y_UNIT_TEST_SUITE(PrepareOperation) -{ - - Y_UNIT_TEST(BasicSchemas) - { - auto firstSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64)); - auto otherSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN)); - auto thirdSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING)); - - TDummyInferenceContext context(3,7); - TJobOperationPreparer builder(context); - - builder - .OutputSchema(1, firstSchema) - .BeginOutputGroup(TVector<int>{2, 5}) - .Schema(otherSchema) - .EndOutputGroup() - .BeginOutputGroup(3, 5) - .Schema(thirdSchema) - .EndOutputGroup() - .BeginOutputGroup(TVector<int>{0, 6}) - .Schema(thirdSchema) - .EndOutputGroup(); - - UNIT_ASSERT_EXCEPTION(builder.OutputSchema(1, otherSchema), TApiUsageError); - UNIT_ASSERT_EXCEPTION(builder.BeginOutputGroup(3, 5).Schema(otherSchema), TApiUsageError); - UNIT_ASSERT_EXCEPTION(builder.BeginOutputGroup(TVector<int>{3,6,7}).Schema(otherSchema), TApiUsageError); - - builder.Finish(); - auto result = builder.GetOutputSchemas(); - - ASSERT_SERIALIZABLES_EQUAL(result[0], thirdSchema); - ASSERT_SERIALIZABLES_EQUAL(result[1], firstSchema); - ASSERT_SERIALIZABLES_EQUAL(result[2], otherSchema); - ASSERT_SERIALIZABLES_EQUAL(result[3], thirdSchema); - ASSERT_SERIALIZABLES_EQUAL(result[4], thirdSchema); - ASSERT_SERIALIZABLES_EQUAL(result[5], otherSchema); - ASSERT_SERIALIZABLES_EQUAL(result[6], thirdSchema); - } - - Y_UNIT_TEST(NoSchema) - { - auto schema = TTableSchema() - .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64)); - - TDummyInferenceContext context(3,4); - TJobOperationPreparer builder(context); - - builder - .OutputSchema(1, schema) - .NoOutputSchema(0) - .BeginOutputGroup(2, 4) - .Schema(schema) - .EndOutputGroup(); - - UNIT_ASSERT_EXCEPTION(builder.OutputSchema(0, schema), TApiUsageError); - - builder.Finish(); - auto result = builder.GetOutputSchemas(); - - UNIT_ASSERT(result[0].Empty()); - - ASSERT_SERIALIZABLES_EQUAL(result[1], schema); - ASSERT_SERIALIZABLES_EQUAL(result[2], schema); - ASSERT_SERIALIZABLES_EQUAL(result[3], schema); - } - - Y_UNIT_TEST(Descriptions) - { - auto urlRowSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("Host").Type(NTi::Optional(NTi::String()))) - .AddColumn(TColumnSchema().Name("Path").Type(NTi::Optional(NTi::String()))) - .AddColumn(TColumnSchema().Name("HttpCode").Type(NTi::Optional(NTi::Int32()))); - - auto urlRowStruct = NTi::Struct({ - {"Host", NTi::Optional(NTi::String())}, - {"Path", NTi::Optional(NTi::String())}, - {"HttpCode", NTi::Optional(NTi::Int32())}, - }); - - auto rowFieldSerializationOptionSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(NTi::Optional(urlRowStruct))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(NTi::Optional(NTi::String()))); - - auto rowSerializedRepeatedFieldsSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("Ints").Type(NTi::List(NTi::Int64()))) - .AddColumn(TColumnSchema().Name("UrlRows").Type(NTi::List(urlRowStruct))); - - TDummyInferenceContext context(5,7); - TJobOperationPreparer builder(context); - - builder - .InputDescription<TUrlRow>(0) - .BeginInputGroup(2, 3) - .Description<TUrlRow>() - .EndInputGroup() - .BeginInputGroup(TVector<int>{1, 4}) - .Description<TRowSerializedRepeatedFields>() - .EndInputGroup() - .InputDescription<TUrlRow>(3); - - UNIT_ASSERT_EXCEPTION(builder.InputDescription<TUrlRow>(0), TApiUsageError); - - builder - .OutputDescription<TUrlRow>(0, false) - .OutputDescription<TRowFieldSerializationOption>(1) - .BeginOutputGroup(2, 4) - .Description<TUrlRow>() - .EndOutputGroup() - .BeginOutputGroup(TVector<int>{4,6}) - .Description<TRowSerializedRepeatedFields>() - .EndOutputGroup() - .OutputDescription<TUrlRow>(5, false); - - UNIT_ASSERT_EXCEPTION(builder.OutputDescription<TUrlRow>(0), TApiUsageError); - UNIT_ASSERT_NO_EXCEPTION(builder.OutputSchema(0, urlRowSchema)); - UNIT_ASSERT_NO_EXCEPTION(builder.OutputSchema(5, urlRowSchema)); - UNIT_ASSERT_EXCEPTION(builder.OutputSchema(1, urlRowSchema), TApiUsageError); - - builder.Finish(); - auto result = builder.GetOutputSchemas(); - - ASSERT_SERIALIZABLES_EQUAL(result[0], urlRowSchema); - ASSERT_SERIALIZABLES_EQUAL(result[1], rowFieldSerializationOptionSchema); - ASSERT_SERIALIZABLES_EQUAL(result[2], urlRowSchema); - ASSERT_SERIALIZABLES_EQUAL(result[3], urlRowSchema); - ASSERT_SERIALIZABLES_EQUAL(result[4], rowSerializedRepeatedFieldsSchema); - ASSERT_SERIALIZABLES_EQUAL(result[5], urlRowSchema); - ASSERT_SERIALIZABLES_EQUAL(result[6], rowSerializedRepeatedFieldsSchema); - - auto expectedInputDescriptions = TVector<TMaybe<TTableStructure>>{ - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, - }; - UNIT_ASSERT_EQUAL(expectedInputDescriptions, builder.GetInputDescriptions()); - - auto expectedOutputDescriptions = TVector<TMaybe<TTableStructure>>{ - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TRowFieldSerializationOption::descriptor()}}, - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, - {TProtobufTableStructure{TUrlRow::descriptor()}}, - {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, - }; - UNIT_ASSERT_EQUAL(expectedOutputDescriptions, builder.GetOutputDescriptions()); - } - - Y_UNIT_TEST(InputColumns) - { - TDummyInferenceContext context(5, 1); - TJobOperationPreparer builder(context); - builder - .InputColumnFilter(2, {"a", "b"}) - .BeginInputGroup(0, 2) - .ColumnFilter({"b", "c"}) - .ColumnRenaming({{"b", "B"}, {"c", "C"}}) - .EndInputGroup() - .InputColumnRenaming(3, {{"a", "AAA"}}) - .NoOutputSchema(0); - builder.Finish(); - - auto expectedRenamings = TVector<THashMap<TString, TString>>{ - {{"b", "B"}, {"c", "C"}}, - {{"b", "B"}, {"c", "C"}}, - {}, - {{"a", "AAA"}}, - {}, - }; - UNIT_ASSERT_EQUAL(builder.GetInputColumnRenamings(), expectedRenamings); - - auto expectedFilters = TVector<TMaybe<TVector<TString>>>{ - {{"b", "c"}}, - {{"b", "c"}}, - {{"a", "b"}}, - {}, - {}, - }; - UNIT_ASSERT_EQUAL(builder.GetInputColumnFilters(), expectedFilters); - } - - Y_UNIT_TEST(Bug_r7349102) - { - auto firstSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64)); - auto otherSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN)); - auto thirdSchema = TTableSchema() - .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING)); - - TDummyInferenceContext context(3,1); - TJobOperationPreparer builder(context); - - builder - .InputDescription<TUrlRow>(0) - .InputDescription<TUrlRow>(1) - .InputDescription<TUrlRow>(2) - .OutputDescription<TUrlRow>(0); - - builder.Finish(); - } - -} // Y_UNIT_TEST_SUITE(SchemaInference) diff --git a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp b/yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp deleted file mode 100644 index 5ffa9564d7..0000000000 --- a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.cpp +++ /dev/null @@ -1,271 +0,0 @@ -#include "errors.h" -#include "format.h" -#include "common_ut.h" - -#include <yt/cpp/mapreduce/interface/protobuf_file_options_ut.pb.h> - -#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> - -#include <library/cpp/testing/unittest/registar.h> - -using namespace NYT; - -Y_UNIT_TEST_SUITE(ProtobufFileOptions) -{ - NTi::TTypePtr GetUrlRowType(bool required) - { - static const NTi::TTypePtr structType = NTi::Struct({ - {"Host", ToTypeV3(EValueType::VT_STRING, false)}, - {"Path", ToTypeV3(EValueType::VT_STRING, false)}, - {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}}); - return required ? structType : NTi::TTypePtr(NTi::Optional(structType)); - } - - Y_UNIT_TEST(TRowFieldSerializationOption) - { - const auto schema = CreateTableSchema<NTestingFileOptions::TRowFieldSerializationOption>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false)))); - } - - Y_UNIT_TEST(TRowMixedSerializationOptions) - { - const auto schema = CreateTableSchema<NTestingFileOptions::TRowMixedSerializationOptions>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false)))); - } - - Y_UNIT_TEST(FieldSortOrder) - { - const auto schema = CreateTableSchema<NTestingFileOptions::TFieldSortOrder>(); - - auto asInProtoFile = NTi::Optional(NTi::Struct({ - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - {"z", NTi::Optional(NTi::Bool())}, - })); - auto byFieldNumber = NTi::Optional(NTi::Struct({ - {"z", NTi::Optional(NTi::Bool())}, - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - })); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(asInProtoFile)) - .AddColumn(TColumnSchema().Name("EmbeddedAsInProtoFile").Type(asInProtoFile)) - .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber))); - } - - Y_UNIT_TEST(Map) - { - const auto schema = CreateTableSchema<NTestingFileOptions::TWithMap>(); - - auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) { - return NTi::List(NTi::Struct({ - {"key", NTi::Optional(key)}, - {"value", NTi::Optional(value)}, - })); - }; - - auto embedded = NTi::Struct({ - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - }); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("MapDefault") - .Type(createKeyValueStruct(NTi::Int64(), embedded))) - .AddColumn(TColumnSchema() - .Name("MapDict") - .Type(NTi::Dict(NTi::Int64(), embedded)))); - } - - Y_UNIT_TEST(Oneof) - { - const auto schema = CreateTableSchema<NTestingFileOptions::TWithOneof>(); - - auto embedded = NTi::Struct({ - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - }); - - auto defaultVariantType = NTi::Optional(NTi::Struct({ - {"field", NTi::Optional(NTi::String())}, - {"Oneof2", NTi::Optional(NTi::Variant(NTi::Struct({ - {"y2", NTi::String()}, - {"z2", embedded}, - {"x2", NTi::Int64()}, - })))}, - {"x1", NTi::Optional(NTi::Int64())}, - {"y1", NTi::Optional(NTi::String())}, - {"z1", NTi::Optional(embedded)}, - })); - - auto noDefaultType = NTi::Optional(NTi::Struct({ - {"field", NTi::Optional(NTi::String())}, - {"y2", NTi::Optional(NTi::String())}, - {"z2", NTi::Optional(embedded)}, - {"x2", NTi::Optional(NTi::Int64())}, - {"x1", NTi::Optional(NTi::Int64())}, - {"y1", NTi::Optional(NTi::String())}, - {"z1", NTi::Optional(embedded)}, - })); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("DefaultVariant") - .Type(defaultVariantType) - ) - .AddColumn(TColumnSchema() - .Name("NoDefault") - .Type(noDefaultType) - ) - .AddColumn(TColumnSchema() - .Name("SerializationProtobuf") - .Type(NTi::Optional(NTi::Struct({ - {"x1", NTi::Optional(NTi::Int64())}, - {"y1", NTi::Optional(NTi::String())}, - {"z1", NTi::Optional(NTi::String())}, - }))) - ) - .AddColumn(TColumnSchema() - .Name("MemberOfTopLevelOneof") - .Type(NTi::Optional(NTi::Int64())) - ) - ); - } -} - -static TNode GetColumns(const TFormat& format, int tableIndex = 0) -{ - return format.Config.GetAttributes()["tables"][tableIndex]["columns"]; -} - -Y_UNIT_TEST_SUITE(ProtobufFormatFileOptions) -{ - Y_UNIT_TEST(TRowFieldSerializationOption) - { - const auto format = TFormat::Protobuf<NTestingFileOptions::TRowFieldSerializationOption>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns[0]["name"], "UrlRow_1"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["proto_type"], "message"); - UNIT_ASSERT_VALUES_EQUAL(columns[0]["field_number"], 1); - - UNIT_ASSERT_VALUES_EQUAL(columns[1]["name"], "UrlRow_2"); - UNIT_ASSERT_VALUES_EQUAL(columns[1]["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(columns[1]["field_number"], 2); - const auto& fields = columns[1]["fields"]; - UNIT_ASSERT_VALUES_EQUAL(fields[0]["name"], "Host"); - UNIT_ASSERT_VALUES_EQUAL(fields[0]["proto_type"], "string"); - UNIT_ASSERT_VALUES_EQUAL(fields[0]["field_number"], 1); - - UNIT_ASSERT_VALUES_EQUAL(fields[1]["name"], "Path"); - UNIT_ASSERT_VALUES_EQUAL(fields[1]["proto_type"], "string"); - UNIT_ASSERT_VALUES_EQUAL(fields[1]["field_number"], 2); - - UNIT_ASSERT_VALUES_EQUAL(fields[2]["name"], "HttpCode"); - UNIT_ASSERT_VALUES_EQUAL(fields[2]["proto_type"], "sint32"); - UNIT_ASSERT_VALUES_EQUAL(fields[2]["field_number"], 3); - } - - Y_UNIT_TEST(Map) - { - const auto format = TFormat::Protobuf<NTestingFileOptions::TWithMap>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 2); - { - const auto& column = columns[0]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDefault"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message"); - } - { - const auto& column = columns[1]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MapDict"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 2); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["proto_type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["proto_type"], "structured_message"); - } - } - - Y_UNIT_TEST(Oneof) - { - const auto format = TFormat::Protobuf<NTestingFileOptions::TWithOneof>(); - auto columns = GetColumns(format); - - UNIT_ASSERT_VALUES_EQUAL(columns.Size(), 4); - - { - const auto& column = columns[0]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "DefaultVariant"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 5); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "field"); - - const auto& oneof2 = column["fields"][1]; - UNIT_ASSERT_VALUES_EQUAL(oneof2["name"], "Oneof2"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["proto_type"], "oneof"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][0]["name"], "y2"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["name"], "z2"); - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][1]["proto_type"], "structured_message"); - const auto& embeddedFields = oneof2["fields"][1]["fields"]; - UNIT_ASSERT_VALUES_EQUAL(embeddedFields[0]["name"], "x"); - UNIT_ASSERT_VALUES_EQUAL(embeddedFields[1]["name"], "y"); - - UNIT_ASSERT_VALUES_EQUAL(oneof2["fields"][2]["name"], "x2"); - - UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "x1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][3]["name"], "y1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][4]["name"], "z1"); - }; - - { - const auto& column = columns[1]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "NoDefault"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - const auto& fields = column["fields"]; - UNIT_ASSERT_VALUES_EQUAL(fields.Size(), 7); - - UNIT_ASSERT_VALUES_EQUAL(fields[0]["name"], "field"); - - UNIT_ASSERT_VALUES_EQUAL(fields[1]["name"], "y2"); - - UNIT_ASSERT_VALUES_EQUAL(fields[2]["name"], "z2"); - UNIT_ASSERT_VALUES_EQUAL(fields[2]["proto_type"], "structured_message"); - const auto& embeddedFields = fields[2]["fields"]; - UNIT_ASSERT_VALUES_EQUAL(embeddedFields[0]["name"], "x"); - UNIT_ASSERT_VALUES_EQUAL(embeddedFields[1]["name"], "y"); - - UNIT_ASSERT_VALUES_EQUAL(fields[3]["name"], "x2"); - - UNIT_ASSERT_VALUES_EQUAL(fields[4]["name"], "x1"); - UNIT_ASSERT_VALUES_EQUAL(fields[5]["name"], "y1"); - UNIT_ASSERT_VALUES_EQUAL(fields[6]["name"], "z1"); - }; - - { - const auto& column = columns[2]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "SerializationProtobuf"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "structured_message"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"].Size(), 3); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][0]["name"], "x1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][1]["name"], "y1"); - UNIT_ASSERT_VALUES_EQUAL(column["fields"][2]["name"], "z1"); - } - { - const auto& column = columns[3]; - UNIT_ASSERT_VALUES_EQUAL(column["name"], "MemberOfTopLevelOneof"); - UNIT_ASSERT_VALUES_EQUAL(column["proto_type"], "int64"); - } - } -} diff --git a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp b/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp deleted file mode 100644 index 19a3d5163f..0000000000 --- a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.cpp +++ /dev/null @@ -1,451 +0,0 @@ -#include "common.h" -#include "errors.h" -#include "common_ut.h" -#include "util/generic/fwd.h" - -#include <yt/cpp/mapreduce/interface/protobuf_table_schema_ut.pb.h> -#include <yt/cpp/mapreduce/interface/proto3_ut.pb.h> - -#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include <algorithm> - -using namespace NYT; - -bool IsFieldPresent(const TTableSchema& schema, TStringBuf name) -{ - for (const auto& field : schema.Columns()) { - if (field.Name() == name) { - return true; - } - } - return false; -} - -Y_UNIT_TEST_SUITE(ProtoSchemaTest_Simple) -{ - Y_UNIT_TEST(TIntegral) - { - const auto schema = CreateTableSchema<NUnitTesting::TIntegral>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) - .AddColumn(TColumnSchema().Name("FloatField").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) - .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) - .AddColumn(TColumnSchema().Name("Int64Field").Type(ToTypeV3(EValueType::VT_INT64, false))) - .AddColumn(TColumnSchema().Name("Uint32Field").Type(ToTypeV3(EValueType::VT_UINT32, false))) - .AddColumn(TColumnSchema().Name("Uint64Field").Type(ToTypeV3(EValueType::VT_UINT64, false))) - .AddColumn(TColumnSchema().Name("Sint32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) - .AddColumn(TColumnSchema().Name("Sint64Field").Type(ToTypeV3(EValueType::VT_INT64, false))) - .AddColumn(TColumnSchema().Name("Fixed32Field").Type(ToTypeV3(EValueType::VT_UINT32, false))) - .AddColumn(TColumnSchema().Name("Fixed64Field").Type(ToTypeV3(EValueType::VT_UINT64, false))) - .AddColumn(TColumnSchema().Name("Sfixed32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) - .AddColumn(TColumnSchema().Name("Sfixed64Field").Type(ToTypeV3(EValueType::VT_INT64, false))) - .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false))) - .AddColumn(TColumnSchema().Name("EnumField").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(TOneOf) - { - const auto schema = CreateTableSchema<NUnitTesting::TOneOf>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) - .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) - .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false)))); - } - - Y_UNIT_TEST(TWithRequired) - { - const auto schema = CreateTableSchema<NUnitTesting::TWithRequired>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("RequiredField").Type(ToTypeV3(EValueType::VT_STRING, true))) - .AddColumn(TColumnSchema().Name("NotRequiredField").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(TAggregated) - { - const auto schema = CreateTableSchema<NUnitTesting::TAggregated>(); - - UNIT_ASSERT_VALUES_EQUAL(6, schema.Columns().size()); - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("StringField").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("BytesField").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("NestedField").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("NestedRepeatedField").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("NestedOneOfField").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("NestedRecursiveField").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(TAliased) - { - const auto schema = CreateTableSchema<NUnitTesting::TAliased>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false))) - .AddColumn(TColumnSchema().Name("subkey").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) - .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(SortColumns) - { - const TSortColumns keys = {"key", "subkey"}; - - const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("key") - .Type(ToTypeV3(EValueType::VT_INT32, false)) - .SortOrder(ESortOrder::SO_ASCENDING)) - .AddColumn(TColumnSchema() - .Name("subkey") - .Type(ToTypeV3(EValueType::VT_DOUBLE, false)) - .SortOrder(ESortOrder::SO_ASCENDING)) - .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(SortColumnsReordered) - { - const TSortColumns keys = {"subkey"}; - - const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("subkey") - .Type(ToTypeV3(EValueType::VT_DOUBLE, false)) - .SortOrder(ESortOrder::SO_ASCENDING)) - .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false))) - .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(SortColumnsInvalid) - { - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TAliased>({"subkey", "subkey"}), yexception); - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TAliased>({"key", "junk"}), yexception); - } - - Y_UNIT_TEST(KeepFieldsWithoutExtensionTrue) - { - const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, true); - UNIT_ASSERT(IsFieldPresent(schema, "key")); - UNIT_ASSERT(IsFieldPresent(schema, "subkey")); - UNIT_ASSERT(IsFieldPresent(schema, "Data")); - UNIT_ASSERT(schema.Strict()); - } - - Y_UNIT_TEST(KeepFieldsWithoutExtensionFalse) - { - const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, false); - UNIT_ASSERT(IsFieldPresent(schema, "key")); - UNIT_ASSERT(IsFieldPresent(schema, "subkey")); - UNIT_ASSERT(!IsFieldPresent(schema, "Data")); - UNIT_ASSERT(schema.Strict()); - } - - Y_UNIT_TEST(ProtobufTypeOption) - { - const auto schema = CreateTableSchema<NUnitTesting::TWithTypeOptions>({}); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .Strict(false) - .AddColumn(TColumnSchema().Name("ColorIntField").Type(ToTypeV3(EValueType::VT_INT64, false))) - .AddColumn(TColumnSchema().Name("ColorStringField").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("AnyField").Type(ToTypeV3(EValueType::VT_ANY, false))) - .AddColumn(TColumnSchema().Name("EmbeddedField").Type( - NTi::Optional(NTi::Struct({ - {"ColorIntField", ToTypeV3(EValueType::VT_INT64, false)}, - {"ColorStringField", ToTypeV3(EValueType::VT_STRING, false)}, - {"AnyField", ToTypeV3(EValueType::VT_ANY, false)}})))) - .AddColumn(TColumnSchema().Name("RepeatedEnumIntField").Type(NTi::List(NTi::Int64())))); - } - - Y_UNIT_TEST(ProtobufTypeOption_TypeMismatch) - { - UNIT_ASSERT_EXCEPTION( - CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumInt>({}), - yexception); - UNIT_ASSERT_EXCEPTION( - CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumString>({}), - yexception); - UNIT_ASSERT_EXCEPTION( - CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_Any>({}), - yexception); - UNIT_ASSERT_EXCEPTION( - CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_OtherColumns>({}), - yexception); - } -} - -Y_UNIT_TEST_SUITE(ProtoSchemaTest_Complex) -{ - Y_UNIT_TEST(TRepeated) - { - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TRepeated>(), yexception); - - const auto schema = CreateTableSchema<NUnitTesting::TRepeatedYtMode>(); - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("Int32Field").Type(NTi::List(ToTypeV3(EValueType::VT_INT32, true))))); - } - - Y_UNIT_TEST(TRepeatedOptionalList) - { - const auto schema = CreateTableSchema<NUnitTesting::TOptionalList>(); - auto type = NTi::Optional(NTi::List(NTi::Int64())); - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("OptionalListInt64").TypeV3(type))); - } - - NTi::TTypePtr GetUrlRowType(bool required) - { - static const NTi::TTypePtr structType = NTi::Struct({ - {"Host", ToTypeV3(EValueType::VT_STRING, false)}, - {"Path", ToTypeV3(EValueType::VT_STRING, false)}, - {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}}); - return required ? structType : NTi::TTypePtr(NTi::Optional(structType)); - } - - Y_UNIT_TEST(TRowFieldSerializationOption) - { - const auto schema = CreateTableSchema<NUnitTesting::TRowFieldSerializationOption>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(TRowMessageSerializationOption) - { - const auto schema = CreateTableSchema<NUnitTesting::TRowMessageSerializationOption>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false)))); - } - - Y_UNIT_TEST(TRowMixedSerializationOptions) - { - const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - NTi::TTypePtr GetUrlRowType_ColumnNames(bool required) - { - static const NTi::TTypePtr type = NTi::Struct({ - {"Host_ColumnName", ToTypeV3(EValueType::VT_STRING, false)}, - {"Path_KeyColumnName", ToTypeV3(EValueType::VT_STRING, false)}, - {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}, - }); - return required ? type : NTi::TTypePtr(NTi::Optional(type)); - } - - Y_UNIT_TEST(TRowMixedSerializationOptions_ColumnNames) - { - const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions_ColumnNames>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType_ColumnNames(false))) - .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(NoOptionInheritance) - { - auto deepestEmbedded = NTi::Optional(NTi::Struct({{"x", ToTypeV3(EValueType::VT_INT64, false)}})); - - const auto schema = CreateTableSchema<NUnitTesting::TNoOptionInheritance>(); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("EmbeddedYt_YtOption") - .Type(NTi::Optional(NTi::Struct({{"embedded", deepestEmbedded}})))) - .AddColumn(TColumnSchema().Name("EmbeddedYt_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("EmbeddedYt_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema() - .Name("EmbeddedProtobuf_YtOption") - .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}})))) - .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema() - .Name("Embedded_YtOption") - .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}})))) - .AddColumn(TColumnSchema().Name("Embedded_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false))) - .AddColumn(TColumnSchema().Name("Embedded_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false)))); - } - - Y_UNIT_TEST(Cyclic) - { - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TA>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TB>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TC>(), TApiUsageError); - UNIT_ASSERT_EXCEPTION(CreateTableSchema<NUnitTesting::TCyclic::TD>(), TApiUsageError); - - ASSERT_SERIALIZABLES_EQUAL( - TTableSchema().AddColumn( - TColumnSchema().Name("d").TypeV3(NTi::Optional(NTi::String()))), - CreateTableSchema<NUnitTesting::TCyclic::TE>()); - } - - Y_UNIT_TEST(FieldSortOrder) - { - const auto schema = CreateTableSchema<NUnitTesting::TFieldSortOrder>(); - - auto byFieldNumber = NTi::Optional(NTi::Struct({ - {"z", NTi::Optional(NTi::Bool())}, - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - })); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(byFieldNumber)) - .AddColumn(TColumnSchema() - .Name("EmbeddedAsInProtoFile") - .Type(NTi::Optional(NTi::Struct({ - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - {"z", NTi::Optional(NTi::Bool())}, - })))) - .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber))); - } - - Y_UNIT_TEST(Map) - { - const auto schema = CreateTableSchema<NUnitTesting::TWithMap>(); - - auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) { - return NTi::List(NTi::Struct({ - {"key", NTi::Optional(key)}, - {"value", NTi::Optional(value)}, - })); - }; - - auto embedded = NTi::Struct({ - {"x", NTi::Optional(NTi::Int64())}, - {"y", NTi::Optional(NTi::String())}, - }); - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("MapDefault") - .Type(createKeyValueStruct(NTi::Int64(), NTi::String()))) - .AddColumn(TColumnSchema() - .Name("MapListOfStructsLegacy") - .Type(createKeyValueStruct(NTi::Int64(), NTi::String()))) - .AddColumn(TColumnSchema() - .Name("MapListOfStructs") - .Type(createKeyValueStruct(NTi::Int64(), embedded))) - .AddColumn(TColumnSchema() - .Name("MapOptionalDict") - .Type(NTi::Optional(NTi::Dict(NTi::Int64(), embedded)))) - .AddColumn(TColumnSchema() - .Name("MapDict") - .Type(NTi::Dict(NTi::Int64(), embedded)))); - } - - Y_UNIT_TEST(Oneof) - { - const auto schema = CreateTableSchema<NUnitTesting::TWithOneof>(); - - auto embedded = NTi::Struct({ - {"Oneof", NTi::Optional(NTi::Variant(NTi::Struct({ - {"x", NTi::Int64()}, - {"y", NTi::String()}, - })))}, - }); - - auto createType = [&] (TString oneof2Name) { - return NTi::Optional(NTi::Struct({ - {"field", NTi::Optional(NTi::String())}, - {oneof2Name, NTi::Optional(NTi::Variant(NTi::Struct({ - {"x2", NTi::Int64()}, - {"y2", NTi::String()}, - {"z2", embedded}, - })))}, - {"y1", NTi::Optional(NTi::String())}, - {"z1", NTi::Optional(embedded)}, - {"x1", NTi::Optional(NTi::Int64())}, - })); - }; - - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("DefaultSeparateFields") - .Type(createType("variant_field_name"))) - .AddColumn(TColumnSchema() - .Name("NoDefault") - .Type(createType("Oneof2"))) - .AddColumn(TColumnSchema() - .Name("SerializationProtobuf") - .Type(NTi::Optional(NTi::Struct({ - {"y1", NTi::Optional(NTi::String())}, - {"x1", NTi::Optional(NTi::Int64())}, - {"z1", NTi::Optional(NTi::String())}, - })))) - .AddColumn(TColumnSchema() - .Name("TopLevelOneof") - .Type( - NTi::Optional( - NTi::Variant(NTi::Struct({ - {"MemberOfTopLevelOneof", NTi::Int64()} - })) - ) - )) - ); - } - - Y_UNIT_TEST(Embedded) - { - const auto schema = CreateTableSchema<NUnitTesting::TEmbeddingMessage>(); - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .Strict(false) - .AddColumn(TColumnSchema().Name("embedded2_num").Type(NTi::Optional(NTi::Uint64()))) - .AddColumn(TColumnSchema().Name("embedded2_struct").Type(NTi::Optional(NTi::Struct({ - {"float1", NTi::Optional(NTi::Double())}, - {"string1", NTi::Optional(NTi::String())}, - })))) - .AddColumn(TColumnSchema().Name("embedded2_repeated").Type(NTi::List(NTi::String()))) - .AddColumn(TColumnSchema().Name("embedded_num").Type(NTi::Optional(NTi::Uint64()))) - .AddColumn(TColumnSchema().Name("embedded_extra_field").Type(NTi::Optional(NTi::String()))) - .AddColumn(TColumnSchema().Name("variant").Type(NTi::Optional(NTi::Variant(NTi::Struct({ - {"str_variant", NTi::String()}, - {"uint_variant", NTi::Uint64()}, - }))))) - .AddColumn(TColumnSchema().Name("num").Type(NTi::Optional(NTi::Uint64()))) - .AddColumn(TColumnSchema().Name("extra_field").Type(NTi::Optional(NTi::String()))) - ); - } -} - -Y_UNIT_TEST_SUITE(ProtoSchemaTest_Proto3) -{ - Y_UNIT_TEST(TWithOptional) - { - const auto schema = CreateTableSchema<NTestingProto3::TWithOptional>(); - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("x").Type(NTi::Optional(NTi::Int64())) - ) - ); - } - - Y_UNIT_TEST(TWithOptionalMessage) - { - const auto schema = CreateTableSchema<NTestingProto3::TWithOptionalMessage>(); - ASSERT_SERIALIZABLES_EQUAL(schema, TTableSchema() - .AddColumn(TColumnSchema() - .Name("x").Type( - NTi::Optional( - NTi::Struct({{"x", NTi::Optional(NTi::Int64())}}) - ) - ) - ) - ); - } -} diff --git a/yt/cpp/mapreduce/interface/serialize_ut.cpp b/yt/cpp/mapreduce/interface/serialize_ut.cpp deleted file mode 100644 index 59d4501ee8..0000000000 --- a/yt/cpp/mapreduce/interface/serialize_ut.cpp +++ /dev/null @@ -1,49 +0,0 @@ -#include <yt/cpp/mapreduce/interface/serialize.h> -#include <yt/cpp/mapreduce/interface/common.h> - -#include <library/cpp/yson/node/node_builder.h> - -#include <library/cpp/testing/unittest/registar.h> - -#include <util/generic/serialized_enum.h> - -using namespace NYT; - -Y_UNIT_TEST_SUITE(Serialization) -{ - Y_UNIT_TEST(TableSchema) - { - auto schema = TTableSchema() - .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING)) - .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64)) - .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64, true)); - - auto schemaNode = schema.ToNode(); - UNIT_ASSERT(schemaNode.IsList()); - UNIT_ASSERT_VALUES_EQUAL(schemaNode.Size(), 3); - - - UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["name"], "a"); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["type"], "string"); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["required"], false); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[0]["sort_order"], "ascending"); - - UNIT_ASSERT_VALUES_EQUAL(schemaNode[1]["name"], "b"); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[1]["type"], "uint64"); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[1]["required"], false); - - UNIT_ASSERT_VALUES_EQUAL(schemaNode[2]["name"], "c"); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[2]["type"], "int64"); - UNIT_ASSERT_VALUES_EQUAL(schemaNode[2]["required"], true); - } - - Y_UNIT_TEST(ValueTypeSerialization) - { - for (const auto value : GetEnumAllValues<EValueType>()) { - TNode serialized = NYT::NDetail::ToString(value); - EValueType deserialized; - Deserialize(deserialized, serialized); - UNIT_ASSERT_VALUES_EQUAL(value, deserialized); - } - } -} diff --git a/yt/cpp/mapreduce/interface/ut/common_ut.cpp b/yt/cpp/mapreduce/interface/ut/common_ut.cpp new file mode 100644 index 0000000000..85122a97ec --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/common_ut.cpp @@ -0,0 +1,353 @@ +#include "common_ut.h" + +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/fluent.h> + +#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/yson/node/node_builder.h> + +#include <util/generic/xrange.h> + +using namespace NYT; + +template <class T> +TString SaveToString(const T& obj) +{ + TString s; + TStringOutput out(s); + ::Save(&out, obj); + return s; +} + +template <class T> +T LoadFromString(TStringBuf s) +{ + TMemoryInput in(s); + T obj; + ::Load(&in, obj); + return obj; +} + +template <class T> +T SaveLoad(const T& obj) +{ + return LoadFromString<T>(SaveToString(obj)); +} + +TEST(TCommonTest, SortColumnsLegacy) +{ + TSortColumns keys1("a", "b"); + EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b"})); + + keys1.Add("c", "d"); + EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"})); + + auto keys2 = TSortColumns(keys1).Add("e", "f"); + EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"})); + EXPECT_TRUE((keys2.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f"})); + + auto keys3 = TSortColumns(keys1).Add("e").Add("f").Add("g"); + EXPECT_TRUE((keys1.Parts_ == TSortColumns{"a", "b", "c", "d"})); + EXPECT_TRUE((keys3.Parts_ == TSortColumns{"a", "b", "c", "d", "e", "f", "g"})); +} + +TEST(TCommonTest, SortColumn) +{ + auto ascending = TSortColumn("a"); + EXPECT_EQ(ascending.Name(), "a"); + EXPECT_EQ(ascending.SortOrder(), ESortOrder::SO_ASCENDING); + EXPECT_EQ(ascending, TSortColumn("a", ESortOrder::SO_ASCENDING)); + EXPECT_NE(ascending, TSortColumn("a", ESortOrder::SO_DESCENDING)); + + EXPECT_NO_THROW(ascending.EnsureAscending()); + EXPECT_EQ(static_cast<TString>(ascending), "a"); + EXPECT_EQ(ascending, "a"); + + auto another = ascending; + EXPECT_NO_THROW(another = "another"); + EXPECT_EQ(another.Name(), "another"); + EXPECT_EQ(another.SortOrder(), ESortOrder::SO_ASCENDING); + EXPECT_EQ(another, TSortColumn("another", ESortOrder::SO_ASCENDING)); + EXPECT_NE(another, TSortColumn("another", ESortOrder::SO_DESCENDING)); + + auto ascendingNode = BuildYsonNodeFluently().Value(ascending); + EXPECT_EQ(ascendingNode, TNode("a")); + + EXPECT_EQ(SaveLoad(ascending), ascending); + EXPECT_NE(SaveToString(ascending), SaveToString(TString("a"))); + + auto descending = TSortColumn("a", ESortOrder::SO_DESCENDING); + EXPECT_EQ(descending.Name(), "a"); + EXPECT_EQ(descending.SortOrder(), ESortOrder::SO_DESCENDING); + EXPECT_EQ(descending, TSortColumn("a", ESortOrder::SO_DESCENDING)); + EXPECT_NE(descending, TSortColumn("a", ESortOrder::SO_ASCENDING)); + + EXPECT_THROW(descending.EnsureAscending(), yexception); + EXPECT_THROW(Y_UNUSED(static_cast<TString>(descending)), yexception); + EXPECT_THROW(Y_UNUSED(descending == "a"), yexception); + EXPECT_THROW(descending = "a", yexception); + + auto descendingNode = BuildYsonNodeFluently().Value(descending); + EXPECT_EQ(descendingNode, TNode()("name", "a")("sort_order", "descending")); + + EXPECT_EQ(SaveLoad(descending), descending); + EXPECT_NE(SaveToString(descending), SaveToString("a")); + + EXPECT_EQ(ToString(TSortColumn("blah")), "blah"); + EXPECT_EQ(ToString(TSortColumn("blah", ESortOrder::SO_DESCENDING)), "{\"name\"=\"blah\";\"sort_order\"=\"descending\"}"); +} + +TEST(TCommonTest, SortColumns) +{ + TSortColumns ascending("a", "b"); + EXPECT_TRUE(ascending.Parts_ == (TSortColumns{"a", "b"})); + EXPECT_NO_THROW(ascending.EnsureAscending()); + EXPECT_EQ(static_cast<TColumnNames>(ascending).Parts_, (TVector<TString>{"a", "b"})); + EXPECT_EQ(ascending.GetNames(), (TVector<TString>{"a", "b"})); + + auto mixed = ascending; + mixed.Add(TSortColumn("c", ESortOrder::SO_DESCENDING), "d"); + EXPECT_TRUE((mixed.Parts_ != TVector<TSortColumn>{"a", "b", "c", "d"})); + EXPECT_TRUE((mixed.Parts_ == TVector<TSortColumn>{"a", "b", TSortColumn("c", ESortOrder::SO_DESCENDING), "d"})); + EXPECT_EQ(mixed.GetNames(), (TVector<TString>{"a", "b", "c", "d"})); + EXPECT_THROW(mixed.EnsureAscending(), yexception); + EXPECT_THROW(Y_UNUSED(static_cast<TColumnNames>(mixed)), yexception); +} + +TEST(TCommonTest, KeyBound) +{ + auto keyBound = TKeyBound(ERelation::Greater, TKey(7, "a", TNode()("x", "y"))); + EXPECT_EQ(keyBound.Relation(), ERelation::Greater); + EXPECT_EQ(keyBound.Key(), TKey(7, "a", TNode()("x", "y"))); + + auto keyBound1 = TKeyBound().Relation(ERelation::Greater).Key(TKey(7, "a", TNode()("x", "y"))); + auto expectedNode = TNode() + .Add(">") + .Add(TNode().Add(7).Add("a").Add(TNode()("x", "y"))); + + EXPECT_EQ(expectedNode, BuildYsonNodeFluently().Value(keyBound)); + EXPECT_EQ(expectedNode, BuildYsonNodeFluently().Value(keyBound1)); + + keyBound.Relation(ERelation::LessOrEqual); + keyBound.Key(TKey("A", 7)); + EXPECT_EQ(keyBound.Relation(), ERelation::LessOrEqual); + EXPECT_EQ(keyBound.Key(), TKey("A", 7)); + + EXPECT_EQ( + BuildYsonNodeFluently().Value(keyBound), + TNode() + .Add("<=") + .Add(TNode().Add("A").Add(7))); +} + +TEST(TCommonTest, TTableSchema) +{ + TTableSchema schema; + schema + .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING)) + .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64)) + .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64)); + auto checkSortBy = [](TTableSchema schema, const TVector<TString>& columns) { + auto initialSchema = schema; + schema.SortBy(columns); + for (auto i: xrange(columns.size())) { + EXPECT_EQ(schema.Columns()[i].Name(), columns[i]); + EXPECT_EQ(schema.Columns()[i].SortOrder(), ESortOrder::SO_ASCENDING); + } + for (auto i: xrange(columns.size(), (size_t)initialSchema.Columns().size())) { + EXPECT_EQ(schema.Columns()[i].SortOrder(), Nothing()); + } + EXPECT_EQ(initialSchema.Columns().size(), schema.Columns().size()); + return schema; + }; + auto newSchema = checkSortBy(schema, {"b"}); + EXPECT_EQ(newSchema.Columns()[1].Name(), TString("a")); + EXPECT_EQ(newSchema.Columns()[2].Name(), TString("c")); + checkSortBy(schema, {"b", "c"}); + checkSortBy(schema, {"c", "a"}); + EXPECT_THROW(checkSortBy(schema, {"b", "b"}), yexception); + EXPECT_THROW(checkSortBy(schema, {"a", "junk"}), yexception); +} + +TEST(TCommonTest, TTableSchema_Decimal) +{ + NYT::TTableSchema tableSchema; + + tableSchema.AddColumn("a", NTi::Decimal(35, 18)); + tableSchema.AddColumn("b", NTi::Optional(NTi::Decimal(35, 18))); + tableSchema.AddColumn("c", NTi::List(NTi::Decimal(35, 18))); + + auto tableSchemaNode = tableSchema.ToNode(); + const auto& tableSchemaNodeList = tableSchemaNode.AsList(); + + // There was a bug in the serialization of decimal type: https://github.com/ytsaurus/ytsaurus/issues/173 + { + const auto& currentType = tableSchemaNodeList[0]; + EXPECT_EQ(currentType.ChildAsString("type"), "string"); + EXPECT_TRUE(currentType.ChildAsBool("required")); + EXPECT_TRUE(currentType.HasKey("type_v3")); + EXPECT_EQ(currentType.At("type_v3").ChildAsString("type_name"), "decimal"); + } + { + const auto& currentType = tableSchemaNodeList[1]; + EXPECT_EQ(currentType.ChildAsString("type"), "string"); + EXPECT_TRUE(!currentType.ChildAsBool("required")); + EXPECT_TRUE(currentType.HasKey("type_v3")); + EXPECT_EQ(currentType.At("type_v3").ChildAsString("type_name"), "optional"); + EXPECT_EQ(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal"); + } + { + const auto& currentType = tableSchemaNodeList[2]; + EXPECT_EQ(currentType.ChildAsString("type"), "any"); + EXPECT_TRUE(currentType.ChildAsBool("required")); + EXPECT_TRUE(currentType.HasKey("type_v3")); + EXPECT_EQ(currentType.At("type_v3").ChildAsString("type_name"), "list"); + EXPECT_EQ(currentType.At("type_v3").At("item").ChildAsString("type_name"), "decimal"); + } + + EXPECT_EQ(tableSchema, TTableSchema::FromNode(tableSchemaNode)); +} + +TEST(TCommonTest, TColumnSchema_TypeV3) +{ + { + auto column = TColumnSchema().Type(NTi::Interval()); + EXPECT_EQ(column.Required(), true); + EXPECT_EQ(column.Type(), VT_INTERVAL); + } + { + auto column = TColumnSchema().Type(NTi::Optional(NTi::Date())); + EXPECT_EQ(column.Required(), false); + EXPECT_EQ(column.Type(), VT_DATE); + } + { + auto column = TColumnSchema().Type(NTi::Interval64()); + EXPECT_EQ(column.Required(), true); + EXPECT_EQ(column.Type(), VT_INTERVAL64); + } + { + auto column = TColumnSchema().Type(NTi::Optional(NTi::Date32())); + EXPECT_EQ(column.Required(), false); + EXPECT_EQ(column.Type(), VT_DATE32); + } + { + auto column = TColumnSchema().Type(NTi::Null()); + EXPECT_EQ(column.Required(), false); + EXPECT_EQ(column.Type(), VT_NULL); + } + { + auto column = TColumnSchema().Type(NTi::Optional(NTi::Null())); + EXPECT_EQ(column.Required(), false); + EXPECT_EQ(column.Type(), VT_ANY); + } + { + auto column = TColumnSchema().Type(NTi::Decimal(35, 18)); + EXPECT_EQ(column.Required(), true); + EXPECT_EQ(column.Type(), VT_STRING); + } +} + +TEST(TCommonTest, ToTypeV3) +{ + EXPECT_EQ(*ToTypeV3(VT_INT32, true), *NTi::Int32()); + EXPECT_EQ(*ToTypeV3(VT_UTF8, false), *NTi::Optional(NTi::Utf8())); +} + +TEST(TCommonTest, DeserializeColumn) +{ + auto deserialize = [] (TStringBuf yson) { + auto node = NodeFromYsonString(yson); + TColumnSchema column; + Deserialize(column, node); + return column; + }; + + auto column = deserialize("{name=foo; type=int64; required=%false}"); + EXPECT_EQ(column.Name(), "foo"); + EXPECT_EQ(*column.TypeV3(), *NTi::Optional(NTi::Int64())); + + column = deserialize("{name=bar; type=utf8; required=%true; type_v3=utf8}"); + EXPECT_EQ(column.Name(), "bar"); + EXPECT_EQ(*column.TypeV3(), *NTi::Utf8()); +} + +TEST(TCommonTest, ColumnSchemaEquality) +{ + auto base = TColumnSchema() + .Name("col") + .TypeV3(NTi::Optional(NTi::List(NTi::String()))) + .SortOrder(ESortOrder::SO_ASCENDING) + .Lock("lock") + .Expression("x + 12") + .Aggregate("sum") + .Group("group"); + + auto other = base; + ASSERT_SERIALIZABLES_EQ(other, base); + other.Name("other"); + ASSERT_SERIALIZABLES_NE(other, base); + + other = base; + other.TypeV3(NTi::List(NTi::String())); + ASSERT_SERIALIZABLES_NE(other, base); + + other = base; + other.ResetSortOrder(); + ASSERT_SERIALIZABLES_NE(other, base); + + other = base; + other.Lock("lock1"); + ASSERT_SERIALIZABLES_NE(other, base); + + other = base; + other.Expression("x + 13"); + ASSERT_SERIALIZABLES_NE(other, base); + + other = base; + other.ResetAggregate(); + ASSERT_SERIALIZABLES_NE(other, base); + + other = base; + other.Group("group1"); + ASSERT_SERIALIZABLES_NE(other, base); +} + +TEST(TCommonTest, TableSchemaEquality) +{ + auto col1 = TColumnSchema() + .Name("col1") + .TypeV3(NTi::Optional(NTi::List(NTi::String()))) + .SortOrder(ESortOrder::SO_ASCENDING); + + auto col2 = TColumnSchema() + .Name("col2") + .TypeV3(NTi::Uint32()); + + auto schema = TTableSchema() + .AddColumn(col1) + .AddColumn(col2) + .Strict(true) + .UniqueKeys(true); + + auto other = schema; + ASSERT_SERIALIZABLES_EQ(other, schema); + + other.Strict(false); + ASSERT_SERIALIZABLES_NE(other, schema); + + other = schema; + other.MutableColumns()[0].TypeV3(NTi::List(NTi::String())); + ASSERT_SERIALIZABLES_NE(other, schema); + + other = schema; + other.MutableColumns().push_back(col1); + ASSERT_SERIALIZABLES_NE(other, schema); + + other = schema; + other.UniqueKeys(false); + ASSERT_SERIALIZABLES_NE(other, schema); +} diff --git a/yt/cpp/mapreduce/interface/common_ut.h b/yt/cpp/mapreduce/interface/ut/common_ut.h index 6f70f09bee..6f70f09bee 100644 --- a/yt/cpp/mapreduce/interface/common_ut.h +++ b/yt/cpp/mapreduce/interface/ut/common_ut.h diff --git a/yt/cpp/mapreduce/interface/ut/config_ut.cpp b/yt/cpp/mapreduce/interface/ut/config_ut.cpp new file mode 100644 index 0000000000..780a57f3f2 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/config_ut.cpp @@ -0,0 +1,17 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <yt/cpp/mapreduce/interface/config.h> + +using namespace NYT; + +TEST(TConfigTest, Reset) { + // Very limited test, checks only one config field. + + auto origConfig = *TConfig::Get(); + TConfig::Get()->Reset(); + EXPECT_EQ(origConfig.Hosts, TConfig::Get()->Hosts); + + TConfig::Get()->Hosts = "hosts/fb867"; + TConfig::Get()->Reset(); + EXPECT_EQ(origConfig.Hosts, TConfig::Get()->Hosts); +} diff --git a/yt/cpp/mapreduce/interface/ut/error_ut.cpp b/yt/cpp/mapreduce/interface/ut/error_ut.cpp new file mode 100644 index 0000000000..4911f29d97 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/error_ut.cpp @@ -0,0 +1,81 @@ +#include <library/cpp/testing/gtest/gtest.h> + +#include <library/cpp/json/json_reader.h> + +#include <yt/cpp/mapreduce/interface/errors.h> + +#include <yt/cpp/mapreduce/common/helpers.h> + +#include <util/generic/set.h> + +using namespace NYT; + +template<> +void Out<NYT::TNode>(IOutputStream& s, const NYT::TNode& node) +{ + s << "TNode:" << NodeToYsonString(node); +} + +TEST(TErrorTest, ParseJson) +{ + // Scary real world error! Бу! + const char* jsonText = + R"""({)""" + R"""("code":500,)""" + R"""("message":"Error resolving path //home/user/link",)""" + R"""("attributes":{)""" + R"""("fid":18446484571700269066,)""" + R"""("method":"Create",)""" + R"""("tid":17558639495721339338,)""" + R"""("datetime":"2017-04-07T13:38:56.474819Z",)""" + R"""("pid":414529,)""" + R"""("host":"build01-01g.yt.yandex.net"},)""" + R"""("inner_errors":[{)""" + R"""("code":1,)""" + R"""("message":"Node //tt cannot have children",)""" + R"""("attributes":{)""" + R"""("fid":18446484571700269066,)""" + R"""("tid":17558639495721339338,)""" + R"""("datetime":"2017-04-07T13:38:56.474725Z",)""" + R"""("pid":414529,)""" + R"""("host":"build01-01g.yt.yandex.net"},)""" + R"""("inner_errors":[]}]})"""; + + NJson::TJsonValue jsonValue; + ReadJsonFastTree(jsonText, &jsonValue, /*throwOnError=*/ true); + + TYtError error(jsonValue); + EXPECT_EQ(error.GetCode(), 500); + EXPECT_EQ(error.GetMessage(), R"""(Error resolving path //home/user/link)"""); + EXPECT_EQ(error.InnerErrors().size(), 1u); + EXPECT_EQ(error.InnerErrors()[0].GetCode(), 1); + + EXPECT_EQ(error.HasAttributes(), true); + EXPECT_EQ(error.GetAttributes().at("method"), TNode("Create")); + + EXPECT_EQ(error.GetAllErrorCodes(), TSet<int>({500, 1})); +} + +TEST(TErrorTest, GetYsonText) { + const char* jsonText = + R"""({)""" + R"""("code":500,)""" + R"""("message":"outer error",)""" + R"""("attributes":{)""" + R"""("method":"Create",)""" + R"""("pid":414529},)""" + R"""("inner_errors":[{)""" + R"""("code":1,)""" + R"""("message":"inner error",)""" + R"""("attributes":{},)""" + R"""("inner_errors":[])""" + R"""(}]})"""; + TYtError error; + error.ParseFrom(jsonText); + TString ysonText = error.GetYsonText(); + TYtError error2(NodeFromYsonString(ysonText)); + EXPECT_EQ( + ysonText, + R"""({"code"=500;"message"="outer error";"attributes"={"method"="Create";"pid"=414529};"inner_errors"=[{"code"=1;"message"="inner error"}]})"""); + EXPECT_EQ(error2.GetYsonText(), ysonText); +} diff --git a/yt/cpp/mapreduce/interface/ut/format_ut.cpp b/yt/cpp/mapreduce/interface/ut/format_ut.cpp new file mode 100644 index 0000000000..83b860ab94 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/format_ut.cpp @@ -0,0 +1,232 @@ +#include "common_ut.h" + +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/format.h> + +#include <yt/cpp/mapreduce/interface/ut/proto3_ut.pb.h> +#include <yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.pb.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +static TNode GetColumns(const TFormat& format, int tableIndex = 0) +{ + return format.Config.GetAttributes()["tables"][tableIndex]["columns"]; +} + +TEST(TProtobufFormatTest, TIntegral) +{ + const auto format = TFormat::Protobuf<NUnitTesting::TIntegral>(); + auto columns = GetColumns(format); + + struct TColumn + { + TString Name; + TString ProtoType; + int FieldNumber; + }; + + auto expected = TVector<TColumn>{ + {"DoubleField", "double", 1}, + {"FloatField", "float", 2}, + {"Int32Field", "int32", 3}, + {"Int64Field", "int64", 4}, + {"Uint32Field", "uint32", 5}, + {"Uint64Field", "uint64", 6}, + {"Sint32Field", "sint32", 7}, + {"Sint64Field", "sint64", 8}, + {"Fixed32Field", "fixed32", 9}, + {"Fixed64Field", "fixed64", 10}, + {"Sfixed32Field", "sfixed32", 11}, + {"Sfixed64Field", "sfixed64", 12}, + {"BoolField", "bool", 13}, + {"EnumField", "enum_string", 14}, + }; + + EXPECT_EQ(columns.Size(), expected.size()); + for (int i = 0; i < static_cast<int>(columns.Size()); ++i) { + EXPECT_EQ(columns[i]["name"], expected[i].Name); + EXPECT_EQ(columns[i]["proto_type"], expected[i].ProtoType); + EXPECT_EQ(columns[i]["field_number"], expected[i].FieldNumber); + } +} + +TEST(TProtobufFormatTest, TRowFieldSerializationOption) +{ + const auto format = TFormat::Protobuf<NUnitTesting::TRowFieldSerializationOption>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns[0]["name"], "UrlRow_1"); + EXPECT_EQ(columns[0]["proto_type"], "structured_message"); + EXPECT_EQ(columns[0]["field_number"], 1); + const auto& fields = columns[0]["fields"]; + EXPECT_EQ(fields[0]["name"], "Host"); + EXPECT_EQ(fields[0]["proto_type"], "string"); + EXPECT_EQ(fields[0]["field_number"], 1); + + EXPECT_EQ(fields[1]["name"], "Path"); + EXPECT_EQ(fields[1]["proto_type"], "string"); + EXPECT_EQ(fields[1]["field_number"], 2); + + EXPECT_EQ(fields[2]["name"], "HttpCode"); + EXPECT_EQ(fields[2]["proto_type"], "sint32"); + EXPECT_EQ(fields[2]["field_number"], 3); + + EXPECT_EQ(columns[1]["name"], "UrlRow_2"); + EXPECT_EQ(columns[1]["proto_type"], "message"); + EXPECT_EQ(columns[1]["field_number"], 2); +} + + +TEST(TProtobufFormatTest, TPacked) +{ + const auto format = TFormat::Protobuf<NUnitTesting::TPacked>(); + auto column = GetColumns(format)[0]; + + EXPECT_EQ(column["name"], "PackedListInt64"); + EXPECT_EQ(column["proto_type"], "int64"); + EXPECT_EQ(column["field_number"], 1); + EXPECT_EQ(column["packed"], true); + EXPECT_EQ(column["repeated"], true); +} + +TEST(TProtobufFormatTest, TCyclic) +{ + EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic>(), TApiUsageError); + EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TA>(), TApiUsageError); + EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TB>(), TApiUsageError); + EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TC>(), TApiUsageError); + EXPECT_THROW(TFormat::Protobuf<NUnitTesting::TCyclic::TD>(), TApiUsageError); + + const auto format = TFormat::Protobuf<NUnitTesting::TCyclic::TE>(); + auto column = GetColumns(format)[0]; + EXPECT_EQ(column["name"], "d"); + EXPECT_EQ(column["proto_type"], "message"); + EXPECT_EQ(column["field_number"], 1); +} + +TEST(TProtobufFormatTest, Map) +{ + const auto format = TFormat::Protobuf<NUnitTesting::TWithMap>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns.Size(), 5u); + { + const auto& column = columns[0]; + EXPECT_EQ(column["name"], "MapDefault"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "message"); + } + { + const auto& column = columns[1]; + EXPECT_EQ(column["name"], "MapListOfStructsLegacy"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "message"); + } + { + const auto& column = columns[2]; + EXPECT_EQ(column["name"], "MapListOfStructs"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message"); + } + { + const auto& column = columns[3]; + EXPECT_EQ(column["name"], "MapOptionalDict"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message"); + } + { + const auto& column = columns[4]; + EXPECT_EQ(column["name"], "MapDict"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message"); + } +} + + +TEST(TProtobufFormatTest, Oneof) +{ + const auto format = TFormat::Protobuf<NUnitTesting::TWithOneof>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns.Size(), 4u); + auto check = [] (const TNode& column, TStringBuf name, TStringBuf oneof2Name) { + EXPECT_EQ(column["name"], name); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 5u); + EXPECT_EQ(column["fields"][0]["name"], "field"); + + const auto& oneof2 = column["fields"][1]; + EXPECT_EQ(oneof2["name"], oneof2Name); + EXPECT_EQ(oneof2["proto_type"], "oneof"); + EXPECT_EQ(oneof2["fields"][0]["name"], "y2"); + EXPECT_EQ(oneof2["fields"][1]["name"], "z2"); + EXPECT_EQ(oneof2["fields"][1]["proto_type"], "structured_message"); + const auto& embeddedOneof = oneof2["fields"][1]["fields"][0]; + EXPECT_EQ(embeddedOneof["name"], "Oneof"); + EXPECT_EQ(embeddedOneof["fields"][0]["name"], "x"); + EXPECT_EQ(embeddedOneof["fields"][1]["name"], "y"); + EXPECT_EQ(oneof2["fields"][2]["name"], "x2"); + + EXPECT_EQ(column["fields"][2]["name"], "x1"); + EXPECT_EQ(column["fields"][3]["name"], "y1"); + EXPECT_EQ(column["fields"][4]["name"], "z1"); + }; + + check(columns[0], "DefaultSeparateFields", "variant_field_name"); + check(columns[1], "NoDefault", "Oneof2"); + + { + const auto& column = columns[2]; + EXPECT_EQ(column["name"], "SerializationProtobuf"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 3u); + EXPECT_EQ(column["fields"][0]["name"], "x1"); + EXPECT_EQ(column["fields"][1]["name"], "y1"); + EXPECT_EQ(column["fields"][2]["name"], "z1"); + } + { + const auto& column = columns[3]; + EXPECT_EQ(column["name"], "TopLevelOneof"); + EXPECT_EQ(column["proto_type"], "oneof"); + EXPECT_EQ(column["fields"].Size(), 1u); + EXPECT_EQ(column["fields"][0]["name"], "MemberOfTopLevelOneof"); + } +} + +TEST(TProto3Test, TWithOptional) +{ + const auto format = TFormat::Protobuf<NTestingProto3::TWithOptional>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns[0]["name"], "x"); + EXPECT_EQ(columns[0]["proto_type"], "int64"); + EXPECT_EQ(columns[0]["field_number"], 1); +} + +TEST(TProto3Test, TWithOptionalMessage) +{ + const auto format = TFormat::Protobuf<NTestingProto3::TWithOptionalMessage>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns[0]["name"], "x"); + EXPECT_EQ(columns[0]["proto_type"], "structured_message"); + EXPECT_EQ(columns[0]["field_number"], 1); + + EXPECT_EQ(columns[0]["fields"].Size(), 1u); + EXPECT_EQ(columns[0]["fields"][0]["name"], "x"); + EXPECT_EQ(columns[0]["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(columns[0]["fields"][0]["field_number"], 1); +} diff --git a/yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp b/yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp new file mode 100644 index 0000000000..9972637aff --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/job_counters_ut.cpp @@ -0,0 +1,100 @@ +#include <yt/cpp/mapreduce/interface/job_counters.h> +#include <yt/cpp/mapreduce/interface/operation.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +TEST(TJobCountersTest, Full) +{ + const TString input = R"""( + { + "completed" = { + "total" = 6; + "non-interrupted" = 1; + "interrupted" = { + "whatever_interrupted" = 2; + "whatever_else_interrupted" = 3; + }; + }; + "aborted" = { + "non_scheduled" = { + "whatever_non_scheduled" = 4; + "whatever_else_non_scheduled" = 5; + }; + "scheduled" = { + "whatever_scheduled" = 6; + "whatever_else_scheduled" = 7; + }; + "total" = 22; + }; + "lost" = 8; + "invalidated" = 9; + "failed" = 10; + "running" = 11; + "suspended" = 12; + "pending" = 13; + "blocked" = 14; + "total" = 105; + })"""; + + TJobCounters counters(NodeFromYsonString(input)); + + EXPECT_EQ(counters.GetTotal(), 105u); + + EXPECT_EQ(counters.GetCompleted().GetTotal(), 6u); + EXPECT_EQ(counters.GetCompletedNonInterrupted().GetTotal(), 1u); + EXPECT_EQ(counters.GetCompletedInterrupted().GetTotal(), 5u); + EXPECT_EQ(counters.GetAborted().GetTotal(), 22u); + EXPECT_EQ(counters.GetAbortedNonScheduled().GetTotal(), 9u); + EXPECT_EQ(counters.GetAbortedScheduled().GetTotal(), 13u); + EXPECT_EQ(counters.GetLost().GetTotal(), 8u); + EXPECT_EQ(counters.GetInvalidated().GetTotal(), 9u); + EXPECT_EQ(counters.GetFailed().GetTotal(), 10u); + EXPECT_EQ(counters.GetRunning().GetTotal(), 11u); + EXPECT_EQ(counters.GetSuspended().GetTotal(), 12u); + EXPECT_EQ(counters.GetPending().GetTotal(), 13u); + EXPECT_EQ(counters.GetBlocked().GetTotal(), 14u); + + EXPECT_EQ(counters.GetCompletedInterrupted().GetValue("whatever_interrupted"), 2u); + EXPECT_EQ(counters.GetCompletedInterrupted().GetValue("whatever_else_interrupted"), 3u); + EXPECT_EQ(counters.GetAbortedNonScheduled().GetValue("whatever_non_scheduled"), 4u); + EXPECT_EQ(counters.GetAbortedNonScheduled().GetValue("whatever_else_non_scheduled"), 5u); + EXPECT_EQ(counters.GetAbortedScheduled().GetValue("whatever_scheduled"), 6u); + EXPECT_EQ(counters.GetAbortedScheduled().GetValue("whatever_else_scheduled"), 7u); + + EXPECT_THROW(counters.GetCompletedInterrupted().GetValue("Nothingness"), yexception); +} + +TEST(TJobCountersTest, Empty) +{ + const TString input = "{}"; + + TJobCounters counters(NodeFromYsonString(input)); + + EXPECT_EQ(counters.GetTotal(), 0u); + + EXPECT_EQ(counters.GetCompleted().GetTotal(), 0u); + EXPECT_EQ(counters.GetCompletedNonInterrupted().GetTotal(), 0u); + EXPECT_EQ(counters.GetCompletedInterrupted().GetTotal(), 0u); + EXPECT_EQ(counters.GetAborted().GetTotal(), 0u); + EXPECT_EQ(counters.GetAbortedNonScheduled().GetTotal(), 0u); + EXPECT_EQ(counters.GetAbortedScheduled().GetTotal(), 0u); + EXPECT_EQ(counters.GetLost().GetTotal(), 0u); + EXPECT_EQ(counters.GetInvalidated().GetTotal(), 0u); + EXPECT_EQ(counters.GetFailed().GetTotal(), 0u); + EXPECT_EQ(counters.GetRunning().GetTotal(), 0u); + EXPECT_EQ(counters.GetSuspended().GetTotal(), 0u); + EXPECT_EQ(counters.GetPending().GetTotal(), 0u); + EXPECT_EQ(counters.GetBlocked().GetTotal(), 0u); +} + +TEST(TJobCountersTest, Broken) +{ + EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode())), yexception, "TJobCounters"); + EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode(1))), yexception, "TJobCounters"); + EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode(1.0))), yexception, "TJobCounters"); + EXPECT_THROW_MESSAGE_HAS_SUBSTR((TJobCounters(TNode("Whatever"))), yexception, "TJobCounters"); +} diff --git a/yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp b/yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp new file mode 100644 index 0000000000..90d40623c1 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/job_statistics_ut.cpp @@ -0,0 +1,254 @@ +#include <yt/cpp/mapreduce/interface/job_statistics.h> +#include <yt/cpp/mapreduce/interface/operation.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +TEST(TJobStatisticsTest, Simple) +{ + const TString input = R"""( + { + "data" = { + "output" = { + "0" = { + "uncompressed_data_size" = { + "$" = { + "completed" = { + "simple_sort" = { + "max" = 130; + "count" = 1; + "min" = 130; + "sum" = 130; + }; + "map" = { + "max" = 42; + "count" = 1; + "min" = 42; + "sum" = 42; + }; + }; + "aborted" = { + "simple_sort" = { + "max" = 24; + "count" = 1; + "min" = 24; + "sum" = 24; + }; + }; + }; + }; + }; + }; + }; + })"""; + + TJobStatistics stat(NodeFromYsonString(input)); + + EXPECT_TRUE(stat.HasStatistics("data/output/0/uncompressed_data_size")); + EXPECT_TRUE(!stat.HasStatistics("nonexistent-statistics")); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics"); + + EXPECT_EQ(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"}); + + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2); + + EXPECT_EQ(stat.JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), 24); + EXPECT_EQ(stat.JobType({EJobType::Map}).JobState({EJobState::Aborted}).GetStatistics("data/output/0/uncompressed_data_size").Sum(), TMaybe<i64>()); +} + +TEST(TJobStatisticsTest, OtherTypes) +{ + const TString input = R"""( + { + "time" = { + "exec" = { + "$" = { + "completed" = { + "map" = { + "max" = 2482468; + "count" = 38; + "min" = 578976; + "sum" = 47987270; + }; + }; + }; + }; + }; + })"""; + + TJobStatistics stat(NodeFromYsonString(input)); + + EXPECT_EQ(stat.GetStatisticsAs<TDuration>("time/exec").Max(), TDuration::MilliSeconds(2482468)); +} + +TEST(TJobStatisticsTest, Custom) +{ + const TString input = R"""( + { + "custom" = { + "some" = { + "path" = { + "$" = { + "completed" = { + "map" = { + "max" = -1; + "count" = 1; + "min" = -1; + "sum" = -1; + }; + }; + }; + }; + }; + "another" = { + "path" = { + "$" = { + "completed" = { + "map" = { + "max" = 1001; + "count" = 2; + "min" = 1001; + "sum" = 2002; + }; + }; + }; + }; + }; + }; + })"""; + + TJobStatistics stat(NodeFromYsonString(input)); + + EXPECT_TRUE(stat.HasCustomStatistics("some/path")); + EXPECT_TRUE(!stat.HasCustomStatistics("nonexistent-statistics")); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(stat.GetCustomStatistics("BLAH-BLAH"), yexception, "Statistics"); + + const auto names = stat.GetCustomStatisticsNames(); + const THashSet<TString> expected = {"some/path", "another/path"}; + EXPECT_EQ(THashSet<TString>(names.begin(), names.end()), expected); + + EXPECT_EQ(stat.GetCustomStatistics("some/path").Max(), -1); + EXPECT_EQ(stat.GetCustomStatistics("another/path").Avg(), 1001); +} + +TEST(TJobStatisticsTest, TaskNames) +{ + const TString input = R"""( + { + "data" = { + "output" = { + "0" = { + "uncompressed_data_size" = { + "$" = { + "completed" = { + "partition_map" = { + "max" = 130; + "count" = 1; + "min" = 130; + "sum" = 130; + }; + "partition(0)" = { + "max" = 42; + "count" = 1; + "min" = 42; + "sum" = 42; + }; + }; + "aborted" = { + "simple_sort" = { + "max" = 24; + "count" = 1; + "min" = 24; + "sum" = 24; + }; + }; + }; + }; + }; + }; + }; + })"""; + + TJobStatistics stat(NodeFromYsonString(input)); + + EXPECT_TRUE(stat.HasStatistics("data/output/0/uncompressed_data_size")); + EXPECT_TRUE(!stat.HasStatistics("nonexistent-statistics")); + EXPECT_THROW_MESSAGE_HAS_SUBSTR(stat.GetStatistics("BLAH-BLAH"), yexception, "Statistics"); + + EXPECT_EQ(stat.GetStatisticsNames(), TVector<TString>{"data/output/0/uncompressed_data_size"}); + + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Max(), 130); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Count(), 2); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Min(), 42); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Sum(), 172); + EXPECT_EQ(stat.GetStatistics("data/output/0/uncompressed_data_size").Avg(), 172 / 2); + + EXPECT_EQ( + stat + .JobState({EJobState::Aborted}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 24); + EXPECT_EQ( + stat + .JobType({EJobType::Partition}) + .JobState({EJobState::Aborted}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + TMaybe<i64>()); + EXPECT_EQ( + stat + .TaskName({"partition(0)"}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 42); + EXPECT_EQ( + stat + .TaskName({"partition"}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + TMaybe<i64>()); + EXPECT_EQ( + stat + .TaskName({"partition_map(0)"}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 130); + EXPECT_EQ( + stat + .JobType({EJobType::Partition}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 42); + EXPECT_EQ( + stat + .JobType({EJobType::PartitionMap}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 130); + EXPECT_EQ( + stat + .TaskName({ETaskName::Partition0}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 42); + EXPECT_EQ( + stat + .TaskName({ETaskName::Partition1}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + TMaybe<i64>()); + EXPECT_EQ( + stat + .TaskName({ETaskName::PartitionMap0}) + .GetStatistics("data/output/0/uncompressed_data_size") + .Sum(), + 130); +} diff --git a/yt/cpp/mapreduce/interface/ut/operation_ut.cpp b/yt/cpp/mapreduce/interface/ut/operation_ut.cpp new file mode 100644 index 0000000000..81d03d0618 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/operation_ut.cpp @@ -0,0 +1,272 @@ +#include "common_ut.h" + +#include <yt/cpp/mapreduce/interface/job_statistics.h> +#include <yt/cpp/mapreduce/interface/operation.h> + +#include <yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.pb.h> + +#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; +using namespace NYT::NUnitTesting; + +//////////////////////////////////////////////////////////////////// + +class TDummyInferenceContext + : public IOperationPreparationContext +{ +public: + TDummyInferenceContext(int inputCount, int outputCount) + : InputCount_(inputCount) + , OutputCount_(outputCount) + , InputSchemas_(inputCount) + { } + + int GetInputCount() const override + { + return InputCount_; + } + + int GetOutputCount() const override + { + return OutputCount_; + } + + const TVector<TTableSchema>& GetInputSchemas() const override + { + return InputSchemas_; + } + + const TTableSchema& GetInputSchema(int index) const override + { + return InputSchemas_[index]; + } + + TMaybe<TYPath> GetInputPath(int) const override + { + return Nothing(); + } + + TMaybe<TYPath> GetOutputPath(int) const override + { + return Nothing(); + } + +private: + int InputCount_; + int OutputCount_; + TVector<TTableSchema> InputSchemas_; +}; + +//////////////////////////////////////////////////////////////////// + +TEST(TPrepareOperationTest, BasicSchemas) +{ + auto firstSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64)); + auto otherSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN)); + auto thirdSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING)); + + TDummyInferenceContext context(3,7); + TJobOperationPreparer builder(context); + + builder + .OutputSchema(1, firstSchema) + .BeginOutputGroup(TVector<int>{2, 5}) + .Schema(otherSchema) + .EndOutputGroup() + .BeginOutputGroup(3, 5) + .Schema(thirdSchema) + .EndOutputGroup() + .BeginOutputGroup(TVector<int>{0, 6}) + .Schema(thirdSchema) + .EndOutputGroup(); + + EXPECT_THROW(builder.OutputSchema(1, otherSchema), TApiUsageError); + EXPECT_THROW(builder.BeginOutputGroup(3, 5).Schema(otherSchema), TApiUsageError); + EXPECT_THROW(builder.BeginOutputGroup(TVector<int>{3,6,7}).Schema(otherSchema), TApiUsageError); + + builder.Finish(); + auto result = builder.GetOutputSchemas(); + + ASSERT_SERIALIZABLES_EQ(result[0], thirdSchema); + ASSERT_SERIALIZABLES_EQ(result[1], firstSchema); + ASSERT_SERIALIZABLES_EQ(result[2], otherSchema); + ASSERT_SERIALIZABLES_EQ(result[3], thirdSchema); + ASSERT_SERIALIZABLES_EQ(result[4], thirdSchema); + ASSERT_SERIALIZABLES_EQ(result[5], otherSchema); + ASSERT_SERIALIZABLES_EQ(result[6], thirdSchema); +} + +TEST(TPrepareOperationTest, NoSchema) +{ + auto schema = TTableSchema() + .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64)); + + TDummyInferenceContext context(3,4); + TJobOperationPreparer builder(context); + + builder + .OutputSchema(1, schema) + .NoOutputSchema(0) + .BeginOutputGroup(2, 4) + .Schema(schema) + .EndOutputGroup(); + + EXPECT_THROW(builder.OutputSchema(0, schema), TApiUsageError); + + builder.Finish(); + auto result = builder.GetOutputSchemas(); + + EXPECT_TRUE(result[0].Empty()); + + ASSERT_SERIALIZABLES_EQ(result[1], schema); + ASSERT_SERIALIZABLES_EQ(result[2], schema); + ASSERT_SERIALIZABLES_EQ(result[3], schema); +} + +TEST(TPrepareOperationTest, Descriptions) +{ + auto urlRowSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("Host").Type(NTi::Optional(NTi::String()))) + .AddColumn(TColumnSchema().Name("Path").Type(NTi::Optional(NTi::String()))) + .AddColumn(TColumnSchema().Name("HttpCode").Type(NTi::Optional(NTi::Int32()))); + + auto urlRowStruct = NTi::Struct({ + {"Host", NTi::Optional(NTi::String())}, + {"Path", NTi::Optional(NTi::String())}, + {"HttpCode", NTi::Optional(NTi::Int32())}, + }); + + auto rowFieldSerializationOptionSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(NTi::Optional(urlRowStruct))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(NTi::Optional(NTi::String()))); + + auto rowSerializedRepeatedFieldsSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("Ints").Type(NTi::List(NTi::Int64()))) + .AddColumn(TColumnSchema().Name("UrlRows").Type(NTi::List(urlRowStruct))); + + TDummyInferenceContext context(5,7); + TJobOperationPreparer builder(context); + + builder + .InputDescription<TUrlRow>(0) + .BeginInputGroup(2, 3) + .Description<TUrlRow>() + .EndInputGroup() + .BeginInputGroup(TVector<int>{1, 4}) + .Description<TRowSerializedRepeatedFields>() + .EndInputGroup() + .InputDescription<TUrlRow>(3); + + EXPECT_THROW(builder.InputDescription<TUrlRow>(0), TApiUsageError); + + builder + .OutputDescription<TUrlRow>(0, false) + .OutputDescription<TRowFieldSerializationOption>(1) + .BeginOutputGroup(2, 4) + .Description<TUrlRow>() + .EndOutputGroup() + .BeginOutputGroup(TVector<int>{4,6}) + .Description<TRowSerializedRepeatedFields>() + .EndOutputGroup() + .OutputDescription<TUrlRow>(5, false); + + EXPECT_THROW(builder.OutputDescription<TUrlRow>(0), TApiUsageError); + EXPECT_NO_THROW(builder.OutputSchema(0, urlRowSchema)); + EXPECT_NO_THROW(builder.OutputSchema(5, urlRowSchema)); + EXPECT_THROW(builder.OutputSchema(1, urlRowSchema), TApiUsageError); + + builder.Finish(); + auto result = builder.GetOutputSchemas(); + + ASSERT_SERIALIZABLES_EQ(result[0], urlRowSchema); + ASSERT_SERIALIZABLES_EQ(result[1], rowFieldSerializationOptionSchema); + ASSERT_SERIALIZABLES_EQ(result[2], urlRowSchema); + ASSERT_SERIALIZABLES_EQ(result[3], urlRowSchema); + ASSERT_SERIALIZABLES_EQ(result[4], rowSerializedRepeatedFieldsSchema); + ASSERT_SERIALIZABLES_EQ(result[5], urlRowSchema); + ASSERT_SERIALIZABLES_EQ(result[6], rowSerializedRepeatedFieldsSchema); + + auto expectedInputDescriptions = TVector<TMaybe<TTableStructure>>{ + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, + }; + EXPECT_EQ(expectedInputDescriptions, builder.GetInputDescriptions()); + + auto expectedOutputDescriptions = TVector<TMaybe<TTableStructure>>{ + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TRowFieldSerializationOption::descriptor()}}, + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, + {TProtobufTableStructure{TUrlRow::descriptor()}}, + {TProtobufTableStructure{TRowSerializedRepeatedFields::descriptor()}}, + }; + EXPECT_EQ(expectedOutputDescriptions, builder.GetOutputDescriptions()); +} + +TEST(TPrepareOperationTest, InputColumns) +{ + TDummyInferenceContext context(5, 1); + TJobOperationPreparer builder(context); + builder + .InputColumnFilter(2, {"a", "b"}) + .BeginInputGroup(0, 2) + .ColumnFilter({"b", "c"}) + .ColumnRenaming({{"b", "B"}, {"c", "C"}}) + .EndInputGroup() + .InputColumnRenaming(3, {{"a", "AAA"}}) + .NoOutputSchema(0); + builder.Finish(); + + auto expectedRenamings = TVector<THashMap<TString, TString>>{ + {{"b", "B"}, {"c", "C"}}, + {{"b", "B"}, {"c", "C"}}, + {}, + {{"a", "AAA"}}, + {}, + }; + EXPECT_EQ(builder.GetInputColumnRenamings(), expectedRenamings); + + auto expectedFilters = TVector<TMaybe<TVector<TString>>>{ + {{"b", "c"}}, + {{"b", "c"}}, + {{"a", "b"}}, + {}, + {}, + }; + EXPECT_EQ(builder.GetInputColumnFilters(), expectedFilters); +} + +TEST(TPrepareOperationTest, Bug_r7349102) +{ + auto firstSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("some_column").Type(EValueType::VT_UINT64)); + auto otherSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("other_column").Type(EValueType::VT_BOOLEAN)); + auto thirdSchema = TTableSchema() + .AddColumn(TColumnSchema().Name("third_column").Type(EValueType::VT_STRING)); + + TDummyInferenceContext context(3,1); + TJobOperationPreparer builder(context); + + builder + .InputDescription<TUrlRow>(0) + .InputDescription<TUrlRow>(1) + .InputDescription<TUrlRow>(2) + .OutputDescription<TUrlRow>(0); + + builder.Finish(); +} + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/interface/proto3_ut.proto b/yt/cpp/mapreduce/interface/ut/proto3_ut.proto index b24c13085b..b24c13085b 100644 --- a/yt/cpp/mapreduce/interface/proto3_ut.proto +++ b/yt/cpp/mapreduce/interface/ut/proto3_ut.proto diff --git a/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp new file mode 100644 index 0000000000..abfe5bbfdc --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.cpp @@ -0,0 +1,270 @@ +#include "common_ut.h" + +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/interface/format.h> + +#include <yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.pb.h> + +#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +namespace { + +NTi::TTypePtr GetUrlRowType(bool required) +{ + static const NTi::TTypePtr structType = NTi::Struct({ + {"Host", ToTypeV3(EValueType::VT_STRING, false)}, + {"Path", ToTypeV3(EValueType::VT_STRING, false)}, + {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}}); + return required ? structType : NTi::TTypePtr(NTi::Optional(structType)); +} + +} // namespace + +TEST(TProtobufFileOptionsTest, TRowFieldSerializationOption) +{ + const auto schema = CreateTableSchema<NTestingFileOptions::TRowFieldSerializationOption>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false)))); +} + +TEST(TProtobufFileOptionsTest, TRowMixedSerializationOptions) +{ + const auto schema = CreateTableSchema<NTestingFileOptions::TRowMixedSerializationOptions>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false)))); +} + +TEST(TProtobufFileOptionsTest, FieldSortOrder) +{ + const auto schema = CreateTableSchema<NTestingFileOptions::TFieldSortOrder>(); + + auto asInProtoFile = NTi::Optional(NTi::Struct({ + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + {"z", NTi::Optional(NTi::Bool())}, + })); + auto byFieldNumber = NTi::Optional(NTi::Struct({ + {"z", NTi::Optional(NTi::Bool())}, + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + })); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(asInProtoFile)) + .AddColumn(TColumnSchema().Name("EmbeddedAsInProtoFile").Type(asInProtoFile)) + .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber))); +} + +TEST(TProtobufFileOptionsTest, Map) +{ + const auto schema = CreateTableSchema<NTestingFileOptions::TWithMap>(); + + auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) { + return NTi::List(NTi::Struct({ + {"key", NTi::Optional(key)}, + {"value", NTi::Optional(value)}, + })); + }; + + auto embedded = NTi::Struct({ + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + }); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("MapDefault") + .Type(createKeyValueStruct(NTi::Int64(), embedded))) + .AddColumn(TColumnSchema() + .Name("MapDict") + .Type(NTi::Dict(NTi::Int64(), embedded)))); +} + +TEST(TProtobufFileOptionsTest, Oneof) +{ + const auto schema = CreateTableSchema<NTestingFileOptions::TWithOneof>(); + + auto embedded = NTi::Struct({ + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + }); + + auto defaultVariantType = NTi::Optional(NTi::Struct({ + {"field", NTi::Optional(NTi::String())}, + {"Oneof2", NTi::Optional(NTi::Variant(NTi::Struct({ + {"y2", NTi::String()}, + {"z2", embedded}, + {"x2", NTi::Int64()}, + })))}, + {"x1", NTi::Optional(NTi::Int64())}, + {"y1", NTi::Optional(NTi::String())}, + {"z1", NTi::Optional(embedded)}, + })); + + auto noDefaultType = NTi::Optional(NTi::Struct({ + {"field", NTi::Optional(NTi::String())}, + {"y2", NTi::Optional(NTi::String())}, + {"z2", NTi::Optional(embedded)}, + {"x2", NTi::Optional(NTi::Int64())}, + {"x1", NTi::Optional(NTi::Int64())}, + {"y1", NTi::Optional(NTi::String())}, + {"z1", NTi::Optional(embedded)}, + })); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("DefaultVariant") + .Type(defaultVariantType) + ) + .AddColumn(TColumnSchema() + .Name("NoDefault") + .Type(noDefaultType) + ) + .AddColumn(TColumnSchema() + .Name("SerializationProtobuf") + .Type(NTi::Optional(NTi::Struct({ + {"x1", NTi::Optional(NTi::Int64())}, + {"y1", NTi::Optional(NTi::String())}, + {"z1", NTi::Optional(NTi::String())}, + }))) + ) + .AddColumn(TColumnSchema() + .Name("MemberOfTopLevelOneof") + .Type(NTi::Optional(NTi::Int64())) + ) + ); +} + +static TNode GetColumns(const TFormat& format, int tableIndex = 0) +{ + return format.Config.GetAttributes()["tables"][tableIndex]["columns"]; +} + +TEST(TProtobufFormatFileOptionsTest, TRowFieldSerializationOption) +{ + const auto format = TFormat::Protobuf<NTestingFileOptions::TRowFieldSerializationOption>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns[0]["name"], "UrlRow_1"); + EXPECT_EQ(columns[0]["proto_type"], "message"); + EXPECT_EQ(columns[0]["field_number"], 1); + + EXPECT_EQ(columns[1]["name"], "UrlRow_2"); + EXPECT_EQ(columns[1]["proto_type"], "structured_message"); + EXPECT_EQ(columns[1]["field_number"], 2); + const auto& fields = columns[1]["fields"]; + EXPECT_EQ(fields[0]["name"], "Host"); + EXPECT_EQ(fields[0]["proto_type"], "string"); + EXPECT_EQ(fields[0]["field_number"], 1); + + EXPECT_EQ(fields[1]["name"], "Path"); + EXPECT_EQ(fields[1]["proto_type"], "string"); + EXPECT_EQ(fields[1]["field_number"], 2); + + EXPECT_EQ(fields[2]["name"], "HttpCode"); + EXPECT_EQ(fields[2]["proto_type"], "sint32"); + EXPECT_EQ(fields[2]["field_number"], 3); +} + +TEST(TProtobufFormatFileOptionsTest, Map) +{ + const auto format = TFormat::Protobuf<NTestingFileOptions::TWithMap>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns.Size(), 2u); + { + const auto& column = columns[0]; + EXPECT_EQ(column["name"], "MapDefault"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message"); + } + { + const auto& column = columns[1]; + EXPECT_EQ(column["name"], "MapDict"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 2u); + EXPECT_EQ(column["fields"][0]["proto_type"], "int64"); + EXPECT_EQ(column["fields"][1]["proto_type"], "structured_message"); + } +} + +TEST(TProtobufFormatFileOptionsTest, Oneof) +{ + const auto format = TFormat::Protobuf<NTestingFileOptions::TWithOneof>(); + auto columns = GetColumns(format); + + EXPECT_EQ(columns.Size(), 4u); + + { + const auto& column = columns[0]; + EXPECT_EQ(column["name"], "DefaultVariant"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 5u); + EXPECT_EQ(column["fields"][0]["name"], "field"); + + const auto& oneof2 = column["fields"][1]; + EXPECT_EQ(oneof2["name"], "Oneof2"); + EXPECT_EQ(oneof2["proto_type"], "oneof"); + EXPECT_EQ(oneof2["fields"][0]["name"], "y2"); + EXPECT_EQ(oneof2["fields"][1]["name"], "z2"); + EXPECT_EQ(oneof2["fields"][1]["proto_type"], "structured_message"); + const auto& embeddedFields = oneof2["fields"][1]["fields"]; + EXPECT_EQ(embeddedFields[0]["name"], "x"); + EXPECT_EQ(embeddedFields[1]["name"], "y"); + + EXPECT_EQ(oneof2["fields"][2]["name"], "x2"); + + EXPECT_EQ(column["fields"][2]["name"], "x1"); + EXPECT_EQ(column["fields"][3]["name"], "y1"); + EXPECT_EQ(column["fields"][4]["name"], "z1"); + }; + + { + const auto& column = columns[1]; + EXPECT_EQ(column["name"], "NoDefault"); + EXPECT_EQ(column["proto_type"], "structured_message"); + const auto& fields = column["fields"]; + EXPECT_EQ(fields.Size(), 7u); + + EXPECT_EQ(fields[0]["name"], "field"); + + EXPECT_EQ(fields[1]["name"], "y2"); + + EXPECT_EQ(fields[2]["name"], "z2"); + EXPECT_EQ(fields[2]["proto_type"], "structured_message"); + const auto& embeddedFields = fields[2]["fields"]; + EXPECT_EQ(embeddedFields[0]["name"], "x"); + EXPECT_EQ(embeddedFields[1]["name"], "y"); + + EXPECT_EQ(fields[3]["name"], "x2"); + + EXPECT_EQ(fields[4]["name"], "x1"); + EXPECT_EQ(fields[5]["name"], "y1"); + EXPECT_EQ(fields[6]["name"], "z1"); + }; + + { + const auto& column = columns[2]; + EXPECT_EQ(column["name"], "SerializationProtobuf"); + EXPECT_EQ(column["proto_type"], "structured_message"); + EXPECT_EQ(column["fields"].Size(), 3u); + EXPECT_EQ(column["fields"][0]["name"], "x1"); + EXPECT_EQ(column["fields"][1]["name"], "y1"); + EXPECT_EQ(column["fields"][2]["name"], "z1"); + } + { + const auto& column = columns[3]; + EXPECT_EQ(column["name"], "MemberOfTopLevelOneof"); + EXPECT_EQ(column["proto_type"], "int64"); + } +} diff --git a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.proto b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.proto index 4804b2f60c..4804b2f60c 100644 --- a/yt/cpp/mapreduce/interface/protobuf_file_options_ut.proto +++ b/yt/cpp/mapreduce/interface/ut/protobuf_file_options_ut.proto diff --git a/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp new file mode 100644 index 0000000000..d7bee1e6d2 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.cpp @@ -0,0 +1,444 @@ +#include "common_ut.h" + +#include <yt/cpp/mapreduce/interface/common.h> +#include <yt/cpp/mapreduce/interface/errors.h> + +#include <yt/cpp/mapreduce/interface/ut/proto3_ut.pb.h> +#include <yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.pb.h> + +#include <yt/cpp/mapreduce/tests/yt_unittest_lib/yt_unittest_lib.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <util/generic/fwd.h> + +#include <algorithm> + +using namespace NYT; + +bool IsFieldPresent(const TTableSchema& schema, TStringBuf name) +{ + for (const auto& field : schema.Columns()) { + if (field.Name() == name) { + return true; + } + } + return false; +} + +TEST(TProtoSchemaSimpleTest, TIntegral) +{ + const auto schema = CreateTableSchema<NUnitTesting::TIntegral>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) + .AddColumn(TColumnSchema().Name("FloatField").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) + .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) + .AddColumn(TColumnSchema().Name("Int64Field").Type(ToTypeV3(EValueType::VT_INT64, false))) + .AddColumn(TColumnSchema().Name("Uint32Field").Type(ToTypeV3(EValueType::VT_UINT32, false))) + .AddColumn(TColumnSchema().Name("Uint64Field").Type(ToTypeV3(EValueType::VT_UINT64, false))) + .AddColumn(TColumnSchema().Name("Sint32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) + .AddColumn(TColumnSchema().Name("Sint64Field").Type(ToTypeV3(EValueType::VT_INT64, false))) + .AddColumn(TColumnSchema().Name("Fixed32Field").Type(ToTypeV3(EValueType::VT_UINT32, false))) + .AddColumn(TColumnSchema().Name("Fixed64Field").Type(ToTypeV3(EValueType::VT_UINT64, false))) + .AddColumn(TColumnSchema().Name("Sfixed32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) + .AddColumn(TColumnSchema().Name("Sfixed64Field").Type(ToTypeV3(EValueType::VT_INT64, false))) + .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false))) + .AddColumn(TColumnSchema().Name("EnumField").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaSimpleTest, TOneOf) +{ + const auto schema = CreateTableSchema<NUnitTesting::TOneOf>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("DoubleField").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) + .AddColumn(TColumnSchema().Name("Int32Field").Type(ToTypeV3(EValueType::VT_INT32, false))) + .AddColumn(TColumnSchema().Name("BoolField").Type(ToTypeV3(EValueType::VT_BOOLEAN, false)))); +} + +TEST(TProtoSchemaSimpleTest, TWithRequired) +{ + const auto schema = CreateTableSchema<NUnitTesting::TWithRequired>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("RequiredField").Type(ToTypeV3(EValueType::VT_STRING, true))) + .AddColumn(TColumnSchema().Name("NotRequiredField").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaSimpleTest, TAggregated) +{ + const auto schema = CreateTableSchema<NUnitTesting::TAggregated>(); + + EXPECT_EQ(6u, schema.Columns().size()); + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("StringField").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("BytesField").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("NestedField").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("NestedRepeatedField").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("NestedOneOfField").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("NestedRecursiveField").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaSimpleTest, TAliased) +{ + const auto schema = CreateTableSchema<NUnitTesting::TAliased>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false))) + .AddColumn(TColumnSchema().Name("subkey").Type(ToTypeV3(EValueType::VT_DOUBLE, false))) + .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaSimpleTest, SortColumns) +{ + const TSortColumns keys = {"key", "subkey"}; + + const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("key") + .Type(ToTypeV3(EValueType::VT_INT32, false)) + .SortOrder(ESortOrder::SO_ASCENDING)) + .AddColumn(TColumnSchema() + .Name("subkey") + .Type(ToTypeV3(EValueType::VT_DOUBLE, false)) + .SortOrder(ESortOrder::SO_ASCENDING)) + .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaSimpleTest, SortColumnsReordered) +{ + const TSortColumns keys = {"subkey"}; + + const auto schema = CreateTableSchema<NUnitTesting::TAliased>(keys); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("subkey") + .Type(ToTypeV3(EValueType::VT_DOUBLE, false)) + .SortOrder(ESortOrder::SO_ASCENDING)) + .AddColumn(TColumnSchema().Name("key").Type(ToTypeV3(EValueType::VT_INT32, false))) + .AddColumn(TColumnSchema().Name("Data").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaSimpleTest, SortColumnsInvalid) +{ + EXPECT_THROW(CreateTableSchema<NUnitTesting::TAliased>({"subkey", "subkey"}), yexception); + EXPECT_THROW(CreateTableSchema<NUnitTesting::TAliased>({"key", "junk"}), yexception); +} + +TEST(TProtoSchemaSimpleTest, KeepFieldsWithoutExtensionTrue) +{ + const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, true); + EXPECT_TRUE(IsFieldPresent(schema, "key")); + EXPECT_TRUE(IsFieldPresent(schema, "subkey")); + EXPECT_TRUE(IsFieldPresent(schema, "Data")); + EXPECT_TRUE(schema.Strict()); +} + +TEST(TProtoSchemaSimpleTest, KeepFieldsWithoutExtensionFalse) +{ + const auto schema = CreateTableSchema<NUnitTesting::TAliased>({}, false); + EXPECT_TRUE(IsFieldPresent(schema, "key")); + EXPECT_TRUE(IsFieldPresent(schema, "subkey")); + EXPECT_TRUE(!IsFieldPresent(schema, "Data")); + EXPECT_TRUE(schema.Strict()); +} + +TEST(TProtoSchemaSimpleTest, ProtobufTypeOption) +{ + const auto schema = CreateTableSchema<NUnitTesting::TWithTypeOptions>({}); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .Strict(false) + .AddColumn(TColumnSchema().Name("ColorIntField").Type(ToTypeV3(EValueType::VT_INT64, false))) + .AddColumn(TColumnSchema().Name("ColorStringField").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("AnyField").Type(ToTypeV3(EValueType::VT_ANY, false))) + .AddColumn(TColumnSchema().Name("EmbeddedField").Type( + NTi::Optional(NTi::Struct({ + {"ColorIntField", ToTypeV3(EValueType::VT_INT64, false)}, + {"ColorStringField", ToTypeV3(EValueType::VT_STRING, false)}, + {"AnyField", ToTypeV3(EValueType::VT_ANY, false)}})))) + .AddColumn(TColumnSchema().Name("RepeatedEnumIntField").Type(NTi::List(NTi::Int64())))); +} + +TEST(TProtoSchemaSimpleTest, ProtobufTypeOption_TypeMismatch) +{ + EXPECT_THROW( + CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumInt>({}), + yexception); + EXPECT_THROW( + CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_EnumString>({}), + yexception); + EXPECT_THROW( + CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_Any>({}), + yexception); + EXPECT_THROW( + CreateTableSchema<NUnitTesting::TWithTypeOptions_TypeMismatch_OtherColumns>({}), + yexception); +} + +NTi::TTypePtr GetUrlRowType_ColumnNames(bool required) +{ + static const NTi::TTypePtr type = NTi::Struct({ + {"Host_ColumnName", ToTypeV3(EValueType::VT_STRING, false)}, + {"Path_KeyColumnName", ToTypeV3(EValueType::VT_STRING, false)}, + {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}, + }); + return required ? type : NTi::TTypePtr(NTi::Optional(type)); +} + +TEST(TProtoSchemaComplexTest, TRepeated) +{ + EXPECT_THROW(CreateTableSchema<NUnitTesting::TRepeated>(), yexception); + + const auto schema = CreateTableSchema<NUnitTesting::TRepeatedYtMode>(); + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("Int32Field").Type(NTi::List(ToTypeV3(EValueType::VT_INT32, true))))); +} + +TEST(TProtoSchemaComplexTest, TRepeatedOptionalList) +{ + const auto schema = CreateTableSchema<NUnitTesting::TOptionalList>(); + auto type = NTi::Optional(NTi::List(NTi::Int64())); + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("OptionalListInt64").TypeV3(type))); +} + +NTi::TTypePtr GetUrlRowType(bool required) +{ + static const NTi::TTypePtr structType = NTi::Struct({ + {"Host", ToTypeV3(EValueType::VT_STRING, false)}, + {"Path", ToTypeV3(EValueType::VT_STRING, false)}, + {"HttpCode", ToTypeV3(EValueType::VT_INT32, false)}}); + return required ? structType : NTi::TTypePtr(NTi::Optional(structType)); +} + +TEST(TProtoSchemaComplexTest, TRowFieldSerializationOption) +{ + const auto schema = CreateTableSchema<NUnitTesting::TRowFieldSerializationOption>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaComplexTest, TRowMessageSerializationOption) +{ + const auto schema = CreateTableSchema<NUnitTesting::TRowMessageSerializationOption>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(GetUrlRowType(false)))); +} + +TEST(TProtoSchemaComplexTest, TRowMixedSerializationOptions) +{ + const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType(false))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaComplexTest, TRowMixedSerializationOptions_ColumnNames) +{ + const auto schema = CreateTableSchema<NUnitTesting::TRowMixedSerializationOptions_ColumnNames>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("UrlRow_1").Type(GetUrlRowType_ColumnNames(false))) + .AddColumn(TColumnSchema().Name("UrlRow_2").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaComplexTest, NoOptionInheritance) +{ + auto deepestEmbedded = NTi::Optional(NTi::Struct({{"x", ToTypeV3(EValueType::VT_INT64, false)}})); + + const auto schema = CreateTableSchema<NUnitTesting::TNoOptionInheritance>(); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("EmbeddedYt_YtOption") + .Type(NTi::Optional(NTi::Struct({{"embedded", deepestEmbedded}})))) + .AddColumn(TColumnSchema().Name("EmbeddedYt_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("EmbeddedYt_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema() + .Name("EmbeddedProtobuf_YtOption") + .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}})))) + .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("EmbeddedProtobuf_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema() + .Name("Embedded_YtOption") + .Type(NTi::Optional(NTi::Struct({{"embedded", ToTypeV3(EValueType::VT_STRING, false)}})))) + .AddColumn(TColumnSchema().Name("Embedded_ProtobufOption").Type(ToTypeV3(EValueType::VT_STRING, false))) + .AddColumn(TColumnSchema().Name("Embedded_NoOption").Type(ToTypeV3(EValueType::VT_STRING, false)))); +} + +TEST(TProtoSchemaComplexTest, Cyclic) +{ + EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic>(), TApiUsageError); + EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TA>(), TApiUsageError); + EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TB>(), TApiUsageError); + EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TC>(), TApiUsageError); + EXPECT_THROW(CreateTableSchema<NUnitTesting::TCyclic::TD>(), TApiUsageError); + + ASSERT_SERIALIZABLES_EQ( + TTableSchema().AddColumn( + TColumnSchema().Name("d").TypeV3(NTi::Optional(NTi::String()))), + CreateTableSchema<NUnitTesting::TCyclic::TE>()); +} + +TEST(TProtoSchemaComplexTest, FieldSortOrder) +{ + const auto schema = CreateTableSchema<NUnitTesting::TFieldSortOrder>(); + + auto byFieldNumber = NTi::Optional(NTi::Struct({ + {"z", NTi::Optional(NTi::Bool())}, + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + })); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema().Name("EmbeddedDefault").Type(byFieldNumber)) + .AddColumn(TColumnSchema() + .Name("EmbeddedAsInProtoFile") + .Type(NTi::Optional(NTi::Struct({ + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + {"z", NTi::Optional(NTi::Bool())}, + })))) + .AddColumn(TColumnSchema().Name("EmbeddedByFieldNumber").Type(byFieldNumber))); +} + +TEST(TProtoSchemaComplexTest, Map) +{ + const auto schema = CreateTableSchema<NUnitTesting::TWithMap>(); + + auto createKeyValueStruct = [] (NTi::TTypePtr key, NTi::TTypePtr value) { + return NTi::List(NTi::Struct({ + {"key", NTi::Optional(key)}, + {"value", NTi::Optional(value)}, + })); + }; + + auto embedded = NTi::Struct({ + {"x", NTi::Optional(NTi::Int64())}, + {"y", NTi::Optional(NTi::String())}, + }); + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("MapDefault") + .Type(createKeyValueStruct(NTi::Int64(), NTi::String()))) + .AddColumn(TColumnSchema() + .Name("MapListOfStructsLegacy") + .Type(createKeyValueStruct(NTi::Int64(), NTi::String()))) + .AddColumn(TColumnSchema() + .Name("MapListOfStructs") + .Type(createKeyValueStruct(NTi::Int64(), embedded))) + .AddColumn(TColumnSchema() + .Name("MapOptionalDict") + .Type(NTi::Optional(NTi::Dict(NTi::Int64(), embedded)))) + .AddColumn(TColumnSchema() + .Name("MapDict") + .Type(NTi::Dict(NTi::Int64(), embedded)))); +} + +TEST(TProtoSchemaComplexTest, Oneof) +{ + const auto schema = CreateTableSchema<NUnitTesting::TWithOneof>(); + + auto embedded = NTi::Struct({ + {"Oneof", NTi::Optional(NTi::Variant(NTi::Struct({ + {"x", NTi::Int64()}, + {"y", NTi::String()}, + })))}, + }); + + auto createType = [&] (TString oneof2Name) { + return NTi::Optional(NTi::Struct({ + {"field", NTi::Optional(NTi::String())}, + {oneof2Name, NTi::Optional(NTi::Variant(NTi::Struct({ + {"x2", NTi::Int64()}, + {"y2", NTi::String()}, + {"z2", embedded}, + })))}, + {"y1", NTi::Optional(NTi::String())}, + {"z1", NTi::Optional(embedded)}, + {"x1", NTi::Optional(NTi::Int64())}, + })); + }; + + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("DefaultSeparateFields") + .Type(createType("variant_field_name"))) + .AddColumn(TColumnSchema() + .Name("NoDefault") + .Type(createType("Oneof2"))) + .AddColumn(TColumnSchema() + .Name("SerializationProtobuf") + .Type(NTi::Optional(NTi::Struct({ + {"y1", NTi::Optional(NTi::String())}, + {"x1", NTi::Optional(NTi::Int64())}, + {"z1", NTi::Optional(NTi::String())}, + })))) + .AddColumn(TColumnSchema() + .Name("TopLevelOneof") + .Type( + NTi::Optional( + NTi::Variant(NTi::Struct({ + {"MemberOfTopLevelOneof", NTi::Int64()} + })) + ) + )) + ); +} + +TEST(TProtoSchemaComplexTest, Embedded) +{ + const auto schema = CreateTableSchema<NUnitTesting::TEmbeddingMessage>(); + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .Strict(false) + .AddColumn(TColumnSchema().Name("embedded2_num").Type(NTi::Optional(NTi::Uint64()))) + .AddColumn(TColumnSchema().Name("embedded2_struct").Type(NTi::Optional(NTi::Struct({ + {"float1", NTi::Optional(NTi::Double())}, + {"string1", NTi::Optional(NTi::String())}, + })))) + .AddColumn(TColumnSchema().Name("embedded2_repeated").Type(NTi::List(NTi::String()))) + .AddColumn(TColumnSchema().Name("embedded_num").Type(NTi::Optional(NTi::Uint64()))) + .AddColumn(TColumnSchema().Name("embedded_extra_field").Type(NTi::Optional(NTi::String()))) + .AddColumn(TColumnSchema().Name("variant").Type(NTi::Optional(NTi::Variant(NTi::Struct({ + {"str_variant", NTi::String()}, + {"uint_variant", NTi::Uint64()}, + }))))) + .AddColumn(TColumnSchema().Name("num").Type(NTi::Optional(NTi::Uint64()))) + .AddColumn(TColumnSchema().Name("extra_field").Type(NTi::Optional(NTi::String()))) + ); +} + +TEST(TProtoSchemaProto3Test, TWithOptional) +{ + const auto schema = CreateTableSchema<NTestingProto3::TWithOptional>(); + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("x").Type(NTi::Optional(NTi::Int64())) + ) + ); +} + +TEST(TProtoSchemaProto3Test, TWithOptionalMessage) +{ + const auto schema = CreateTableSchema<NTestingProto3::TWithOptionalMessage>(); + ASSERT_SERIALIZABLES_EQ(schema, TTableSchema() + .AddColumn(TColumnSchema() + .Name("x").Type( + NTi::Optional( + NTi::Struct({{"x", NTi::Optional(NTi::Int64())}}) + ) + ) + ) + ); +} diff --git a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.proto b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.proto index da1e48f691..da1e48f691 100644 --- a/yt/cpp/mapreduce/interface/protobuf_table_schema_ut.proto +++ b/yt/cpp/mapreduce/interface/ut/protobuf_table_schema_ut.proto diff --git a/yt/cpp/mapreduce/interface/ut/serialize_ut.cpp b/yt/cpp/mapreduce/interface/ut/serialize_ut.cpp new file mode 100644 index 0000000000..0acec154d4 --- /dev/null +++ b/yt/cpp/mapreduce/interface/ut/serialize_ut.cpp @@ -0,0 +1,46 @@ +#include <yt/cpp/mapreduce/interface/serialize.h> +#include <yt/cpp/mapreduce/interface/common.h> + +#include <library/cpp/yson/node/node_builder.h> + +#include <library/cpp/testing/gtest/gtest.h> + +#include <util/generic/serialized_enum.h> + +using namespace NYT; + +TEST(TSerializationTest, TableSchema) +{ + auto schema = TTableSchema() + .AddColumn(TColumnSchema().Name("a").Type(EValueType::VT_STRING).SortOrder(SO_ASCENDING)) + .AddColumn(TColumnSchema().Name("b").Type(EValueType::VT_UINT64)) + .AddColumn(TColumnSchema().Name("c").Type(EValueType::VT_INT64, true)); + + auto schemaNode = schema.ToNode(); + EXPECT_TRUE(schemaNode.IsList()); + EXPECT_EQ(schemaNode.Size(), 3u); + + + EXPECT_EQ(schemaNode[0]["name"], "a"); + EXPECT_EQ(schemaNode[0]["type"], "string"); + EXPECT_EQ(schemaNode[0]["required"], false); + EXPECT_EQ(schemaNode[0]["sort_order"], "ascending"); + + EXPECT_EQ(schemaNode[1]["name"], "b"); + EXPECT_EQ(schemaNode[1]["type"], "uint64"); + EXPECT_EQ(schemaNode[1]["required"], false); + + EXPECT_EQ(schemaNode[2]["name"], "c"); + EXPECT_EQ(schemaNode[2]["type"], "int64"); + EXPECT_EQ(schemaNode[2]["required"], true); +} + +TEST(TSerializationTest, ValueTypeSerialization) +{ + for (const auto value : GetEnumAllValues<EValueType>()) { + TNode serialized = NYT::NDetail::ToString(value); + EValueType deserialized; + Deserialize(deserialized, serialized); + EXPECT_EQ(value, deserialized); + } +} diff --git a/yt/cpp/mapreduce/interface/ut/ya.make b/yt/cpp/mapreduce/interface/ut/ya.make index 0219e6430c..9e92931b5d 100644 --- a/yt/cpp/mapreduce/interface/ut/ya.make +++ b/yt/cpp/mapreduce/interface/ut/ya.make @@ -1,4 +1,4 @@ -UNITTEST_FOR(yt/cpp/mapreduce/interface) +GTEST() SRCS( common_ut.cpp @@ -18,8 +18,9 @@ SRCS( PEERDIR( contrib/libs/protobuf - library/cpp/testing/unittest + library/cpp/testing/gtest yt/yt_proto/yt/formats + yt/cpp/mapreduce/interface ) END() diff --git a/yt/cpp/mapreduce/interface/ya.make b/yt/cpp/mapreduce/interface/ya.make index f9bc3e172c..a4cbd8951a 100644 --- a/yt/cpp/mapreduce/interface/ya.make +++ b/yt/cpp/mapreduce/interface/ya.make @@ -45,4 +45,6 @@ GENERATE_ENUM_SERIALIZATION(protobuf_format.h) END() -RECURSE_FOR_TESTS(ut) +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp new file mode 100644 index 0000000000..3642aa1e45 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp @@ -0,0 +1,94 @@ +#include <yt/cpp/mapreduce/io/node_table_reader.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +//////////////////////////////////////////////////////////////////// + +class TStringRawTableReader + : public TRawTableReader +{ +public: + TStringRawTableReader(const TString& string) + : String_(string) + , Stream_(String_) + { } + + bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override + { + return false; + } + + void ResetRetries() override + { } + + bool HasRangeIndices() const override + { + return false; + } + +private: + size_t DoRead(void* buf, size_t len) override + { + return Stream_.Read(buf, len); + } + +private: + const TString String_; + TStringStream Stream_; +}; + +//////////////////////////////////////////////////////////////////// + +class TEndOfStreamTest + : public ::testing::TestWithParam<bool> +{ }; + +TEST_P(TEndOfStreamTest, Eos) +{ + bool addEos = GetParam(); + auto proxy = ::MakeIntrusive<TStringRawTableReader>(TString::Join( + "{a=13;b = \"string\"}; {c = {d=12}};", + "<key_switch=%true>#; {e = 42};", + addEos ? "<end_of_stream=%true>#" : "" + )); + + TNodeTableReader reader(proxy); + TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))}; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsEndOfStream()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(!reader.IsEndOfStream()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + + reader.NextKey(); + reader.Next(); + expectedRows = {TNode()("e", 42)}; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsEndOfStream()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + + EXPECT_TRUE(!reader.IsValid()); + if (addEos) { + EXPECT_TRUE(reader.IsEndOfStream()); + } else { + EXPECT_TRUE(!reader.IsEndOfStream()); + } + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +INSTANTIATE_TEST_SUITE_P(WithEos, TEndOfStreamTest, ::testing::Values(true)); +INSTANTIATE_TEST_SUITE_P(WithoutEos, TEndOfStreamTest, ::testing::Values(false)); + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/ut/readers_ut.cpp b/yt/cpp/mapreduce/io/ut/readers_ut.cpp new file mode 100644 index 0000000000..86d06629a2 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/readers_ut.cpp @@ -0,0 +1,232 @@ +#include <yt/cpp/mapreduce/io/node_table_reader.h> +#include <yt/cpp/mapreduce/io/proto_table_reader.h> +#include <yt/cpp/mapreduce/io/skiff_table_reader.h> +#include <yt/cpp/mapreduce/io/stream_table_reader.h> +#include <yt/cpp/mapreduce/io/yamr_table_reader.h> + +#include <yt/cpp/mapreduce/io/ut/ut_row.pb.h> + +#include <yt/cpp/mapreduce/skiff/checked_parser.h> +#include <yt/cpp/mapreduce/skiff/skiff_schema.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; +using namespace NYT::NDetail; +using namespace NSkiff; + +//////////////////////////////////////////////////////////////////// + +class TRetryEmulatingRawTableReader + : public TRawTableReader +{ +public: + TRetryEmulatingRawTableReader(const TString& string) + : String_(string) + , Stream_(String_) + { } + + bool Retry( + const TMaybe<ui32>& /*rangeIndex*/, + const TMaybe<ui64>& /*rowIndex*/, + const std::exception_ptr& /*error*/) override + { + if (RetriesLeft_ == 0) { + return false; + } + Stream_ = TStringStream(String_); + --RetriesLeft_; + return true; + } + + void ResetRetries() override + { + RetriesLeft_ = 10; + } + + bool HasRangeIndices() const override + { + return false; + } + +private: + size_t DoRead(void* buf, size_t len) override + { + switch (DoReadCallCount_++) { + case 0: + return Stream_.Read(buf, std::min(len, String_.size() / 2)); + case 1: + ythrow yexception() << "Just wanted to test you, first fail"; + case 2: + ythrow yexception() << "Just wanted to test you, second fail"; + default: + return Stream_.Read(buf, len); + } + } + +private: + const TString String_; + TStringStream Stream_; + int RetriesLeft_ = 10; + int DoReadCallCount_ = 0; +}; + +//////////////////////////////////////////////////////////////////// + +TEST(TReadersTest, YsonGood) +{ + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;b = \"string\"}; {c = {d=12}}"); + + TNodeTableReader reader(proxy); + TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))}; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, YsonBad) +{ + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;-b := \"string\"}; {c = {d=12}}"); + EXPECT_THROW(TNodeTableReader(proxy).GetRow(), yexception); +} + +TEST(TReadersTest, SkiffGood) +{ + const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\x06\x00\x00\x00""foobar" "\x01" + "\x00\x00" "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "\x03\x00\x00\x00""abc" "\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"), + CreateSimpleTypeSchema(EWireType::String32)->SetName("b"), + CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c") + }) + }); + + TSkiffTableReader reader(proxy, schema); + TVector<TNode> expectedRows = { + TNode()("a", 100500)("b", "foobar")("c", true), + TNode()("a", -1)("b", "abc")("c", false), + }; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, SkiffExtraColumns) +{ + const char arr[] = "\x00\x00" "\x7B\x00\x00\x00\x00\x00\x00\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateSimpleTypeSchema(EWireType::Uint64)->SetName("$timestamp") + }) + }); + + TSkiffTableReader reader(proxy, schema); + TVector<TNode> expectedRows = { + TNode()("$timestamp", 123u), + }; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, SkiffBad) +{ + const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\xFF\x00\x00\x00""foobar" "\x01"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"), + CreateSimpleTypeSchema(EWireType::String32)->SetName("b"), + CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c") + }) + }); + + EXPECT_THROW(TSkiffTableReader(proxy, schema).GetRow(), yexception); +} + +TEST(TReadersTest, SkiffBadFormat) +{ + const char arr[] = "\x00\x00" "\x12" "\x23\x34\x00\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateVariant8Schema({ + CreateSimpleTypeSchema(EWireType::Nothing), + CreateSimpleTypeSchema(EWireType::Int32) + }) + }) + }); + + EXPECT_THROW_MESSAGE_HAS_SUBSTR( + TSkiffTableReader(proxy, schema).GetRow(), + yexception, + "Tag for 'variant8<nothing,int32>' expected to be 0 or 1"); +} + +TEST(TReadersTest, ProtobufGood) +{ + using NYT::NTesting::TRow; + + const char arr[] = "\x13\x00\x00\x00" "\x0A""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00" + "\x10\x00\x00\x00" "\x0A""\x03""abc" "\x10""\x1F" "\x19""\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TLenvalProtoTableReader reader(proxy, {TRow::descriptor()}); + TRow row1, row2; + row1.SetString("foobar"); + row1.SetInt32(15); + row1.SetFixed64(100500); + + row2.SetString("abc"); + row2.SetInt32(31); + row2.SetFixed64(-1); + + TVector<TRow> expectedRows = {row1, row2}; + for (const auto& expectedRow : expectedRows) { + TRow row; + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + reader.ReadRow(&row); + EXPECT_EQ(row.GetString(), expectedRow.GetString()); + EXPECT_EQ(row.GetInt32(), expectedRow.GetInt32()); + EXPECT_EQ(row.GetFixed64(), expectedRow.GetFixed64()); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, ProtobufBad) +{ + const char arr[] = "\x13\x00\x00\x00" "\x0F""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + NYT::NTesting::TRow row; + EXPECT_THROW(TLenvalProtoTableReader(proxy, { NYT::NTesting::TRow::descriptor() }).ReadRow(&row), yexception); +} + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/ut/ut_row.proto b/yt/cpp/mapreduce/io/ut/ut_row.proto new file mode 100644 index 0000000000..6a9649e4c6 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/ut_row.proto @@ -0,0 +1,7 @@ +package NYT.NTesting; + +message TRow { + optional string String = 1; + optional int32 Int32 = 2; + optional fixed64 Fixed64 = 3; +} diff --git a/yt/cpp/mapreduce/io/ut/ya.make b/yt/cpp/mapreduce/io/ut/ya.make new file mode 100644 index 0000000000..e7f9d018c3 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/ya.make @@ -0,0 +1,15 @@ +GTEST() + +SRCS( + end_of_stream_ut.cpp + readers_ut.cpp + yamr_table_reader_ut.cpp + + ut_row.proto +) + +PEERDIR( + yt/cpp/mapreduce/io +) + +END() diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp new file mode 100644 index 0000000000..d55a28b3e4 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp @@ -0,0 +1,185 @@ +#include <yt/cpp/mapreduce/io/yamr_table_reader.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +template <> +void Out<std::tuple<TString, TString, TString>>(IOutputStream& out, const std::tuple<TString, TString, TString>& value) { + out << "{" << std::get<0>(value) << ", " << std::get<1>(value) << ", " << std::get<2>(value) << "}"; +} + + +//////////////////////////////////////////////////////////////////// + +class TRowCollection +{ +public: + void AddRow(TStringBuf key, TStringBuf subkey, TStringBuf value) + { + TStringStream row; + auto appendLenval = [&] (TStringBuf value) { + ui32 size = value.size(); + row.Write(&size, sizeof(size)); + row.Write(value); + }; + appendLenval(key); + appendLenval(subkey); + appendLenval(value); + RowList_.push_back(row.Str()); + } + + TString GetStreamDataStartFromRow(ui64 rowIndex) const + { + Y_ABORT_UNLESS(rowIndex < RowList_.size()); + TStringStream ss; + ss.Write("\xFC\xFF\xFF\xFF"); + ss.Write(&rowIndex, sizeof(rowIndex)); + for (size_t i = rowIndex; i != RowList_.size(); ++i) { + ss.Write(RowList_[i]); + } + return ss.Str(); + } + + size_t ComputeTotalStreamSize() const { + return GetStreamDataStartFromRow(0).size(); + } + +private: + TVector<TString> RowList_; +}; + +class TTestRawTableReader + : public TRawTableReader +{ +public: + TTestRawTableReader(const TRowCollection& rowCollection) + : RowCollection_(rowCollection) + , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0)) + , Input_(MakeHolder<TStringStream>(DataToRead_)) + { } + + TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint) + : RowCollection_(rowCollection) + , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint)) + , Input_(MakeHolder<TStringStream>(DataToRead_)) + , Broken_(true) + { } + + size_t DoRead(void* buf, size_t size) override + { + Y_ABORT_UNLESS(Input_); + size_t res = Input_->Read(buf, size); + if (!res && Broken_) { + ythrow yexception() << "Stream is broken"; + } + return res; + } + + bool Retry( + const TMaybe<ui32>& /*rangeIndex*/, + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& /*error*/) override + { + if (--Retries < 0) { + return false; + } + ui64 actualRowIndex = rowIndex ? *rowIndex : 0; + DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex); + Input_ = MakeHolder<TStringInput>(DataToRead_); + Broken_ = false; + return true; + } + + void ResetRetries() override + { } + + bool HasRangeIndices() const override + { + return false; + } + +private: + TRowCollection RowCollection_; + TString DataToRead_; + THolder<IInputStream> Input_; + bool Broken_ = false; + i32 Retries = 1; +}; + +TEST(TYamrTableReaderTest, TestReadRetry) +{ + const TVector<std::tuple<TString, TString, TString>> expectedResult = { + {"foo1", "bar1", "baz1"}, + {"foo2", "bar2", "baz2"}, + {"foo3", "bar3", "baz3"}, + }; + + TRowCollection rowCollection; + for (const auto& row : expectedResult) { + rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row)); + } + + ssize_t streamSize = rowCollection.ComputeTotalStreamSize(); + + for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) { + ::TIntrusivePtr<TRawTableReader> rawReader; + if (breakPoint == -1) { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection); + } else { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint)); + } + + TYaMRTableReader tableReader(rawReader); + TVector<std::tuple<TString, TString, TString>> actualResult; + for (; tableReader.IsValid(); tableReader.Next()) { + EXPECT_TRUE(!tableReader.IsRawReaderExhausted()); + auto row = tableReader.GetRow(); + actualResult.emplace_back(row.Key, row.SubKey, row.Value); + } + EXPECT_TRUE(tableReader.IsRawReaderExhausted()); + EXPECT_EQ(actualResult, expectedResult); + } +} + +TEST(TYamrTableReaderTest, TestSkipRetry) +{ + const TVector<std::tuple<TString, TString, TString>> expectedResult = { + {"foo1", "bar1", "baz1"}, + {"foo2", "bar2", "baz2"}, + {"foo3", "bar3", "baz3"}, + }; + + TRowCollection rowCollection; + for (const auto& row : expectedResult) { + rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row)); + } + + ssize_t streamSize = rowCollection.ComputeTotalStreamSize(); + + for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) { + try { + ::TIntrusivePtr<TRawTableReader> rawReader; + if (breakPoint == -1) { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection); + } else { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint)); + } + + TYaMRTableReader tableReader(rawReader); + ui32 rowCount = 0; + for (; tableReader.IsValid(); tableReader.Next()) { + EXPECT_TRUE(!tableReader.IsRawReaderExhausted()); + ++rowCount; + } + EXPECT_TRUE(tableReader.IsRawReaderExhausted()); + EXPECT_EQ(rowCount, 3u); + } catch (const std::exception& ex) { + Cerr << breakPoint << Endl; + Cerr << ex.what() << Endl; + throw; + } + } +} + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/ya.make b/yt/cpp/mapreduce/io/ya.make index 0531a669b5..18a0cac464 100644 --- a/yt/cpp/mapreduce/io/ya.make +++ b/yt/cpp/mapreduce/io/ya.make @@ -22,13 +22,15 @@ SRCS( PEERDIR( contrib/libs/protobuf library/cpp/yson + library/cpp/yson/node yt/cpp/mapreduce/common yt/cpp/mapreduce/interface yt/cpp/mapreduce/interface/logging - yt/yt_proto/yt/formats - library/cpp/yson/node yt/cpp/mapreduce/skiff + yt/yt_proto/yt/formats yt/yt/core ) END() + +RECURSE_FOR_TESTS(ut) diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp new file mode 100644 index 0000000000..cee62cf1f7 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/ut/raw_batch_request_ut.cpp @@ -0,0 +1,124 @@ +#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h> + +#include <yt/cpp/mapreduce/http/context.h> +#include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/errors.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; +using namespace NYT::NDetail; +using namespace NYT::NDetail::NRawClient; + + +class TTestRetryPolicy + : public IRequestRetryPolicy +{ +private: + static constexpr int RetriableCode = 904; + +public: + void NotifyNewAttempt() override + { } + + TMaybe<TDuration> OnGenericError(const std::exception& /*e*/) override + { + return TDuration::Seconds(42); + } + + void OnIgnoredError(const TErrorResponse& /*e*/) override + { } + + TMaybe<TDuration> OnRetriableError(const TErrorResponse& e) override + { + if (e.GetError().GetCode() == RetriableCode) { + return TDuration::Seconds(e.GetError().GetAttributes().at("retry_interval").AsUint64()); + } else { + return Nothing(); + } + } + + TString GetAttemptDescription() const override + { + return "attempt"; + } + + static TNode GenerateRetriableError(TDuration retryDuration) + { + Y_ABORT_UNLESS(retryDuration - TDuration::Seconds(retryDuration.Seconds()) == TDuration::Zero()); + + return TNode() + ("code", RetriableCode) + ("attributes", + TNode() + ("retry_interval", retryDuration.Seconds())); + } +}; + + +TString GetPathFromRequest(const TNode& params) +{ + return params.AsMap().at("parameters").AsMap().at("path").AsString(); +} + +TVector<TString> GetAllPathsFromRequestList(const TNode& requestList) +{ + TVector<TString> result; + for (const auto& request : requestList.AsList()) { + result.push_back(GetPathFromRequest(request)); } + return result; +} + + +TEST(TBatchRequestImplTest, ParseResponse) { + TClientContext context; + TRawBatchRequest batchRequest(context.Config); + + EXPECT_EQ(batchRequest.BatchSize(), 0u); + + auto get1 = batchRequest.Get( + TTransactionId(), + "//getOk", + TGetOptions()); + + auto get2 = batchRequest.Get( + TTransactionId(), + "//getError-3", + TGetOptions()); + + auto get3 = batchRequest.Get( + TTransactionId(), + "//getError-5", + TGetOptions()); + + EXPECT_EQ(batchRequest.BatchSize(), 3u); + + auto testRetryPolicy = MakeIntrusive<TTestRetryPolicy>(); + const TInstant now = TInstant::Seconds(100500); + + TRawBatchRequest retryBatch(context.Config); + batchRequest.ParseResponse( + TNode() + .Add(TNode()("output", 5)) + .Add(TNode()("error", + TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(3)))) + .Add(TNode()("error", + TTestRetryPolicy::GenerateRetriableError(TDuration::Seconds(5)))), + "<no-request-id>", + testRetryPolicy, + &retryBatch, + now); + + EXPECT_EQ(batchRequest.BatchSize(), 0u); + EXPECT_EQ(retryBatch.BatchSize(), 2u); + + TNode retryParameterList; + TInstant nextTry; + retryBatch.FillParameterList(3, &retryParameterList, &nextTry); + EXPECT_EQ( + GetAllPathsFromRequestList(retryParameterList), + TVector<TString>({"//getError-3", "//getError-5"})); + + EXPECT_EQ(nextTry, now + TDuration::Seconds(5)); +} diff --git a/yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp b/yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp new file mode 100644 index 0000000000..d2c7022f02 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/ut/raw_requests_ut.cpp @@ -0,0 +1,191 @@ +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; +using namespace NYT::NDetail; +using namespace NYT::NDetail::NRawClient; + +TEST(TOperationsApiParsingTest, ParseOperationAttributes) +{ + auto response = TStringBuf(R"""({ + "id" = "1-2-3-4"; + "authenticated_user" = "some-user"; + "start_time" = "2018-01-01T00:00:00.0Z"; + "weight" = 1.; + "state" = "completed"; + "suspended" = %false; + "finish_time" = "2018-01-02T00:00:00.0Z"; + "brief_progress" = { + "jobs" = { + "lost" = 0; + "pending" = 0; + "failed" = 1; + "aborted" = 0; + "total" = 84; + "running" = 0; + "completed" = 84; + }; + }; + "result" = { + "error" = { + "attributes" = {}; + "code" = 0; + "message" = ""; + }; + }; + "brief_spec" = { + "input_table_paths" = < + "count" = 1; + > [ + "//some-input"; + ]; + "pool" = "some-pool"; + "scheduling_info_per_pool_tree" = { + "physical" = { + "pool" = "some-pool"; + }; + }; + "title" = "some-title"; + "output_table_paths" = < + "count" = 1; + > [ + "//some-output"; + ]; + "mapper" = { + "command" = "some-command"; + }; + }; + "type" = "map"; + "pool" = "some-pool"; + "progress" = { + "build_time" = "2018-01-01T00:00:00.000000Z"; + "job_statistics" = { + "data" = { + "input" = { + "row_count" = { + "$" = { + "failed" = { + "map" = { + "max" = 1; + "count" = 1; + "sum" = 1; + "min" = 1; + }; + }; + "completed" = { + "map" = { + "max" = 1; + "count" = 84; + "sum" = 84; + "min" = 1; + }; + }; + }; + }; + }; + }; + }; + "total_job_counter" = { + "completed" = { + "total" = 3; + "non-interrupted" = 1; + "interrupted" = { + "whatever_interrupted" = 2; + }; + }; + "aborted" = { + "non_scheduled" = { + "whatever_non_scheduled" = 3; + }; + "scheduled" = { + "whatever_scheduled" = 4; + }; + "total" = 7; + }; + "lost" = 5; + "invalidated" = 6; + "failed" = 7; + "running" = 8; + "suspended" = 9; + "pending" = 10; + "blocked" = 11; + "total" = 66; + }; + }; + "events" = [ + {"state" = "pending"; "time" = "2018-01-01T00:00:00.000000Z";}; + {"state" = "materializing"; "time" = "2018-01-02T00:00:00.000000Z";}; + {"state" = "running"; "time" = "2018-01-03T00:00:00.000000Z";}; + ]; + })"""); + auto attrs = ParseOperationAttributes(NodeFromYsonString(response)); + + EXPECT_TRUE(attrs.Id); + EXPECT_EQ(GetGuidAsString(*attrs.Id), "1-2-3-4"); + + EXPECT_TRUE(attrs.Type); + EXPECT_EQ(*attrs.Type, EOperationType::Map); + + EXPECT_TRUE(attrs.State); + EXPECT_EQ(*attrs.State, "completed"); + + EXPECT_TRUE(attrs.BriefState); + EXPECT_EQ(*attrs.BriefState, EOperationBriefState::Completed); + + EXPECT_TRUE(attrs.AuthenticatedUser); + EXPECT_EQ(*attrs.AuthenticatedUser, "some-user"); + + EXPECT_TRUE(attrs.StartTime); + EXPECT_TRUE(attrs.FinishTime); + EXPECT_EQ(*attrs.FinishTime - *attrs.StartTime, TDuration::Days(1)); + + EXPECT_TRUE(attrs.BriefProgress); + EXPECT_EQ(attrs.BriefProgress->Completed, 84u); + EXPECT_EQ(attrs.BriefProgress->Failed, 1u); + + EXPECT_TRUE(attrs.BriefSpec); + EXPECT_EQ((*attrs.BriefSpec)["title"].AsString(), "some-title"); + + EXPECT_TRUE(attrs.Suspended); + EXPECT_EQ(*attrs.Suspended, false); + + EXPECT_TRUE(attrs.Result); + EXPECT_TRUE(!attrs.Result->Error); + + EXPECT_TRUE(attrs.Progress); + EXPECT_EQ(attrs.Progress->JobStatistics.JobState({}).GetStatistics("data/input/row_count").Sum(), 85u); + EXPECT_EQ(attrs.Progress->JobCounters.GetCompletedInterrupted().GetTotal(), 2u); + EXPECT_EQ(attrs.Progress->JobCounters.GetAbortedNonScheduled().GetTotal(), 3u); + EXPECT_EQ(attrs.Progress->JobCounters.GetAbortedScheduled().GetTotal(), 4u); + EXPECT_EQ(attrs.Progress->JobCounters.GetAborted().GetTotal(), 7u); + EXPECT_EQ(attrs.Progress->JobCounters.GetFailed().GetTotal(), 7u); + EXPECT_EQ(attrs.Progress->JobCounters.GetTotal(), 66u); + EXPECT_EQ(*attrs.Progress->BuildTime, TInstant::ParseIso8601("2018-01-01T00:00:00.000000Z")); + + EXPECT_TRUE(attrs.Events); + EXPECT_EQ((*attrs.Events)[1].State, "materializing"); + EXPECT_EQ((*attrs.Events)[1].Time, TInstant::ParseIso8601("2018-01-02T00:00:00.000000Z")); +} + +TEST(TOperationsApiParsingTest, EmptyProgress) +{ + auto response = TStringBuf(R"""({ + "id" = "1-2-3-4"; + "brief_progress" = {}; + "progress" = {}; + })"""); + auto attrs = ParseOperationAttributes(NodeFromYsonString(response)); + + EXPECT_TRUE(attrs.Id); + EXPECT_EQ(GetGuidAsString(*attrs.Id), "1-2-3-4"); + + EXPECT_TRUE(!attrs.BriefProgress); + + EXPECT_TRUE(attrs.Progress); + EXPECT_EQ(attrs.Progress->JobStatistics.JobState({}).GetStatisticsNames(), TVector<TString>{}); + EXPECT_EQ(attrs.Progress->JobCounters.GetTotal(), 0u); + EXPECT_TRUE(!attrs.Progress->BuildTime); +} diff --git a/yt/cpp/mapreduce/raw_client/ut/ya.make b/yt/cpp/mapreduce/raw_client/ut/ya.make new file mode 100644 index 0000000000..248471152c --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/ut/ya.make @@ -0,0 +1,12 @@ +GTEST() + +SRCS( + raw_batch_request_ut.cpp + raw_requests_ut.cpp +) + +PEERDIR( + yt/cpp/mapreduce/raw_client +) + +END() diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make index 0d03aae80c..c201b86f05 100644 --- a/yt/cpp/mapreduce/raw_client/ya.make +++ b/yt/cpp/mapreduce/raw_client/ya.make @@ -17,3 +17,7 @@ PEERDIR( ) END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/yt/cpp/mapreduce/skiff/checked_parser.h b/yt/cpp/mapreduce/skiff/checked_parser.h new file mode 100644 index 0000000000..8fd9f90b0b --- /dev/null +++ b/yt/cpp/mapreduce/skiff/checked_parser.h @@ -0,0 +1 @@ +#include <library/cpp/skiff/skiff.h> |