diff options
author | nadya73 <nadya73@yandex-team.com> | 2024-07-02 23:10:50 +0300 |
---|---|---|
committer | nadya73 <nadya73@yandex-team.com> | 2024-07-02 23:21:03 +0300 |
commit | 5ea9afc5ee7edc24efa5f45b3a15e184872b0854 (patch) | |
tree | 4ccc339d97575cba8b3ed47b6f0615326bdb5324 /yt/cpp/mapreduce/io | |
parent | 96b239778766d32d5158aca805e08199b3c0a743 (diff) | |
download | ydb-5ea9afc5ee7edc24efa5f45b3a15e184872b0854.tar.gz |
[yt/cpp/mapreduce] YT-21595: Use gtest instead of ytest in all mapreduce tests
85671f0cf4f45b4f015fa2cc0d195b81c16c6e8a
Diffstat (limited to 'yt/cpp/mapreduce/io')
-rw-r--r-- | yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp | 94 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/ut/readers_ut.cpp | 232 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/ut/ut_row.proto | 7 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/ut/ya.make | 15 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp | 185 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/ya.make | 6 |
6 files changed, 537 insertions, 2 deletions
diff --git a/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp new file mode 100644 index 0000000000..3642aa1e45 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp @@ -0,0 +1,94 @@ +#include <yt/cpp/mapreduce/io/node_table_reader.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +//////////////////////////////////////////////////////////////////// + +class TStringRawTableReader + : public TRawTableReader +{ +public: + TStringRawTableReader(const TString& string) + : String_(string) + , Stream_(String_) + { } + + bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override + { + return false; + } + + void ResetRetries() override + { } + + bool HasRangeIndices() const override + { + return false; + } + +private: + size_t DoRead(void* buf, size_t len) override + { + return Stream_.Read(buf, len); + } + +private: + const TString String_; + TStringStream Stream_; +}; + +//////////////////////////////////////////////////////////////////// + +class TEndOfStreamTest + : public ::testing::TestWithParam<bool> +{ }; + +TEST_P(TEndOfStreamTest, Eos) +{ + bool addEos = GetParam(); + auto proxy = ::MakeIntrusive<TStringRawTableReader>(TString::Join( + "{a=13;b = \"string\"}; {c = {d=12}};", + "<key_switch=%true>#; {e = 42};", + addEos ? "<end_of_stream=%true>#" : "" + )); + + TNodeTableReader reader(proxy); + TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))}; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsEndOfStream()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(!reader.IsEndOfStream()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + + reader.NextKey(); + reader.Next(); + expectedRows = {TNode()("e", 42)}; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsEndOfStream()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + + EXPECT_TRUE(!reader.IsValid()); + if (addEos) { + EXPECT_TRUE(reader.IsEndOfStream()); + } else { + EXPECT_TRUE(!reader.IsEndOfStream()); + } + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +INSTANTIATE_TEST_SUITE_P(WithEos, TEndOfStreamTest, ::testing::Values(true)); +INSTANTIATE_TEST_SUITE_P(WithoutEos, TEndOfStreamTest, ::testing::Values(false)); + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/ut/readers_ut.cpp b/yt/cpp/mapreduce/io/ut/readers_ut.cpp new file mode 100644 index 0000000000..86d06629a2 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/readers_ut.cpp @@ -0,0 +1,232 @@ +#include <yt/cpp/mapreduce/io/node_table_reader.h> +#include <yt/cpp/mapreduce/io/proto_table_reader.h> +#include <yt/cpp/mapreduce/io/skiff_table_reader.h> +#include <yt/cpp/mapreduce/io/stream_table_reader.h> +#include <yt/cpp/mapreduce/io/yamr_table_reader.h> + +#include <yt/cpp/mapreduce/io/ut/ut_row.pb.h> + +#include <yt/cpp/mapreduce/skiff/checked_parser.h> +#include <yt/cpp/mapreduce/skiff/skiff_schema.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; +using namespace NYT::NDetail; +using namespace NSkiff; + +//////////////////////////////////////////////////////////////////// + +class TRetryEmulatingRawTableReader + : public TRawTableReader +{ +public: + TRetryEmulatingRawTableReader(const TString& string) + : String_(string) + , Stream_(String_) + { } + + bool Retry( + const TMaybe<ui32>& /*rangeIndex*/, + const TMaybe<ui64>& /*rowIndex*/, + const std::exception_ptr& /*error*/) override + { + if (RetriesLeft_ == 0) { + return false; + } + Stream_ = TStringStream(String_); + --RetriesLeft_; + return true; + } + + void ResetRetries() override + { + RetriesLeft_ = 10; + } + + bool HasRangeIndices() const override + { + return false; + } + +private: + size_t DoRead(void* buf, size_t len) override + { + switch (DoReadCallCount_++) { + case 0: + return Stream_.Read(buf, std::min(len, String_.size() / 2)); + case 1: + ythrow yexception() << "Just wanted to test you, first fail"; + case 2: + ythrow yexception() << "Just wanted to test you, second fail"; + default: + return Stream_.Read(buf, len); + } + } + +private: + const TString String_; + TStringStream Stream_; + int RetriesLeft_ = 10; + int DoReadCallCount_ = 0; +}; + +//////////////////////////////////////////////////////////////////// + +TEST(TReadersTest, YsonGood) +{ + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;b = \"string\"}; {c = {d=12}}"); + + TNodeTableReader reader(proxy); + TVector<TNode> expectedRows = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))}; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, YsonBad) +{ + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;-b := \"string\"}; {c = {d=12}}"); + EXPECT_THROW(TNodeTableReader(proxy).GetRow(), yexception); +} + +TEST(TReadersTest, SkiffGood) +{ + const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\x06\x00\x00\x00""foobar" "\x01" + "\x00\x00" "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "\x03\x00\x00\x00""abc" "\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"), + CreateSimpleTypeSchema(EWireType::String32)->SetName("b"), + CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c") + }) + }); + + TSkiffTableReader reader(proxy, schema); + TVector<TNode> expectedRows = { + TNode()("a", 100500)("b", "foobar")("c", true), + TNode()("a", -1)("b", "abc")("c", false), + }; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, SkiffExtraColumns) +{ + const char arr[] = "\x00\x00" "\x7B\x00\x00\x00\x00\x00\x00\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateSimpleTypeSchema(EWireType::Uint64)->SetName("$timestamp") + }) + }); + + TSkiffTableReader reader(proxy, schema); + TVector<TNode> expectedRows = { + TNode()("$timestamp", 123u), + }; + for (const auto& expectedRow : expectedRows) { + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + EXPECT_EQ(reader.GetRow(), expectedRow); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, SkiffBad) +{ + const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\xFF\x00\x00\x00""foobar" "\x01"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"), + CreateSimpleTypeSchema(EWireType::String32)->SetName("b"), + CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c") + }) + }); + + EXPECT_THROW(TSkiffTableReader(proxy, schema).GetRow(), yexception); +} + +TEST(TReadersTest, SkiffBadFormat) +{ + const char arr[] = "\x00\x00" "\x12" "\x23\x34\x00\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TSkiffSchemaPtr schema = CreateVariant16Schema({ + CreateTupleSchema({ + CreateVariant8Schema({ + CreateSimpleTypeSchema(EWireType::Nothing), + CreateSimpleTypeSchema(EWireType::Int32) + }) + }) + }); + + EXPECT_THROW_MESSAGE_HAS_SUBSTR( + TSkiffTableReader(proxy, schema).GetRow(), + yexception, + "Tag for 'variant8<nothing,int32>' expected to be 0 or 1"); +} + +TEST(TReadersTest, ProtobufGood) +{ + using NYT::NTesting::TRow; + + const char arr[] = "\x13\x00\x00\x00" "\x0A""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00" + "\x10\x00\x00\x00" "\x0A""\x03""abc" "\x10""\x1F" "\x19""\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + TLenvalProtoTableReader reader(proxy, {TRow::descriptor()}); + TRow row1, row2; + row1.SetString("foobar"); + row1.SetInt32(15); + row1.SetFixed64(100500); + + row2.SetString("abc"); + row2.SetInt32(31); + row2.SetFixed64(-1); + + TVector<TRow> expectedRows = {row1, row2}; + for (const auto& expectedRow : expectedRows) { + TRow row; + EXPECT_TRUE(reader.IsValid()); + EXPECT_TRUE(!reader.IsRawReaderExhausted()); + reader.ReadRow(&row); + EXPECT_EQ(row.GetString(), expectedRow.GetString()); + EXPECT_EQ(row.GetInt32(), expectedRow.GetInt32()); + EXPECT_EQ(row.GetFixed64(), expectedRow.GetFixed64()); + reader.Next(); + } + EXPECT_TRUE(!reader.IsValid()); + EXPECT_TRUE(reader.IsRawReaderExhausted()); +} + +TEST(TReadersTest, ProtobufBad) +{ + const char arr[] = "\x13\x00\x00\x00" "\x0F""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00"; + auto proxy = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1)); + + NYT::NTesting::TRow row; + EXPECT_THROW(TLenvalProtoTableReader(proxy, { NYT::NTesting::TRow::descriptor() }).ReadRow(&row), yexception); +} + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/ut/ut_row.proto b/yt/cpp/mapreduce/io/ut/ut_row.proto new file mode 100644 index 0000000000..6a9649e4c6 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/ut_row.proto @@ -0,0 +1,7 @@ +package NYT.NTesting; + +message TRow { + optional string String = 1; + optional int32 Int32 = 2; + optional fixed64 Fixed64 = 3; +} diff --git a/yt/cpp/mapreduce/io/ut/ya.make b/yt/cpp/mapreduce/io/ut/ya.make new file mode 100644 index 0000000000..e7f9d018c3 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/ya.make @@ -0,0 +1,15 @@ +GTEST() + +SRCS( + end_of_stream_ut.cpp + readers_ut.cpp + yamr_table_reader_ut.cpp + + ut_row.proto +) + +PEERDIR( + yt/cpp/mapreduce/io +) + +END() diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp new file mode 100644 index 0000000000..d55a28b3e4 --- /dev/null +++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp @@ -0,0 +1,185 @@ +#include <yt/cpp/mapreduce/io/yamr_table_reader.h> + +#include <library/cpp/testing/gtest/gtest.h> + +using namespace NYT; + +template <> +void Out<std::tuple<TString, TString, TString>>(IOutputStream& out, const std::tuple<TString, TString, TString>& value) { + out << "{" << std::get<0>(value) << ", " << std::get<1>(value) << ", " << std::get<2>(value) << "}"; +} + + +//////////////////////////////////////////////////////////////////// + +class TRowCollection +{ +public: + void AddRow(TStringBuf key, TStringBuf subkey, TStringBuf value) + { + TStringStream row; + auto appendLenval = [&] (TStringBuf value) { + ui32 size = value.size(); + row.Write(&size, sizeof(size)); + row.Write(value); + }; + appendLenval(key); + appendLenval(subkey); + appendLenval(value); + RowList_.push_back(row.Str()); + } + + TString GetStreamDataStartFromRow(ui64 rowIndex) const + { + Y_ABORT_UNLESS(rowIndex < RowList_.size()); + TStringStream ss; + ss.Write("\xFC\xFF\xFF\xFF"); + ss.Write(&rowIndex, sizeof(rowIndex)); + for (size_t i = rowIndex; i != RowList_.size(); ++i) { + ss.Write(RowList_[i]); + } + return ss.Str(); + } + + size_t ComputeTotalStreamSize() const { + return GetStreamDataStartFromRow(0).size(); + } + +private: + TVector<TString> RowList_; +}; + +class TTestRawTableReader + : public TRawTableReader +{ +public: + TTestRawTableReader(const TRowCollection& rowCollection) + : RowCollection_(rowCollection) + , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0)) + , Input_(MakeHolder<TStringStream>(DataToRead_)) + { } + + TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint) + : RowCollection_(rowCollection) + , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint)) + , Input_(MakeHolder<TStringStream>(DataToRead_)) + , Broken_(true) + { } + + size_t DoRead(void* buf, size_t size) override + { + Y_ABORT_UNLESS(Input_); + size_t res = Input_->Read(buf, size); + if (!res && Broken_) { + ythrow yexception() << "Stream is broken"; + } + return res; + } + + bool Retry( + const TMaybe<ui32>& /*rangeIndex*/, + const TMaybe<ui64>& rowIndex, + const std::exception_ptr& /*error*/) override + { + if (--Retries < 0) { + return false; + } + ui64 actualRowIndex = rowIndex ? *rowIndex : 0; + DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex); + Input_ = MakeHolder<TStringInput>(DataToRead_); + Broken_ = false; + return true; + } + + void ResetRetries() override + { } + + bool HasRangeIndices() const override + { + return false; + } + +private: + TRowCollection RowCollection_; + TString DataToRead_; + THolder<IInputStream> Input_; + bool Broken_ = false; + i32 Retries = 1; +}; + +TEST(TYamrTableReaderTest, TestReadRetry) +{ + const TVector<std::tuple<TString, TString, TString>> expectedResult = { + {"foo1", "bar1", "baz1"}, + {"foo2", "bar2", "baz2"}, + {"foo3", "bar3", "baz3"}, + }; + + TRowCollection rowCollection; + for (const auto& row : expectedResult) { + rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row)); + } + + ssize_t streamSize = rowCollection.ComputeTotalStreamSize(); + + for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) { + ::TIntrusivePtr<TRawTableReader> rawReader; + if (breakPoint == -1) { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection); + } else { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint)); + } + + TYaMRTableReader tableReader(rawReader); + TVector<std::tuple<TString, TString, TString>> actualResult; + for (; tableReader.IsValid(); tableReader.Next()) { + EXPECT_TRUE(!tableReader.IsRawReaderExhausted()); + auto row = tableReader.GetRow(); + actualResult.emplace_back(row.Key, row.SubKey, row.Value); + } + EXPECT_TRUE(tableReader.IsRawReaderExhausted()); + EXPECT_EQ(actualResult, expectedResult); + } +} + +TEST(TYamrTableReaderTest, TestSkipRetry) +{ + const TVector<std::tuple<TString, TString, TString>> expectedResult = { + {"foo1", "bar1", "baz1"}, + {"foo2", "bar2", "baz2"}, + {"foo3", "bar3", "baz3"}, + }; + + TRowCollection rowCollection; + for (const auto& row : expectedResult) { + rowCollection.AddRow(std::get<0>(row), std::get<1>(row), std::get<2>(row)); + } + + ssize_t streamSize = rowCollection.ComputeTotalStreamSize(); + + for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) { + try { + ::TIntrusivePtr<TRawTableReader> rawReader; + if (breakPoint == -1) { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection); + } else { + rawReader = ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint)); + } + + TYaMRTableReader tableReader(rawReader); + ui32 rowCount = 0; + for (; tableReader.IsValid(); tableReader.Next()) { + EXPECT_TRUE(!tableReader.IsRawReaderExhausted()); + ++rowCount; + } + EXPECT_TRUE(tableReader.IsRawReaderExhausted()); + EXPECT_EQ(rowCount, 3u); + } catch (const std::exception& ex) { + Cerr << breakPoint << Endl; + Cerr << ex.what() << Endl; + throw; + } + } +} + +//////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/io/ya.make b/yt/cpp/mapreduce/io/ya.make index 0531a669b5..18a0cac464 100644 --- a/yt/cpp/mapreduce/io/ya.make +++ b/yt/cpp/mapreduce/io/ya.make @@ -22,13 +22,15 @@ SRCS( PEERDIR( contrib/libs/protobuf library/cpp/yson + library/cpp/yson/node yt/cpp/mapreduce/common yt/cpp/mapreduce/interface yt/cpp/mapreduce/interface/logging - yt/yt_proto/yt/formats - library/cpp/yson/node yt/cpp/mapreduce/skiff + yt/yt_proto/yt/formats yt/yt/core ) END() + +RECURSE_FOR_TESTS(ut) |