aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/ut
diff options
context:
space:
mode:
author nadya73 <nadya73@yandex-team.com> 2024-07-02 23:10:50 +0300
committer nadya73 <nadya73@yandex-team.com> 2024-07-02 23:21:03 +0300
commit 5ea9afc5ee7edc24efa5f45b3a15e184872b0854 (patch)
tree 4ccc339d97575cba8b3ed47b6f0615326bdb5324 /yt/cpp/mapreduce/io/ut
parent 96b239778766d32d5158aca805e08199b3c0a743 (diff)
download ydb-5ea9afc5ee7edc24efa5f45b3a15e184872b0854.tar.gz
[yt/cpp/mapreduce] YT-21595: Use gtest instead of ytest in all mapreduce tests
85671f0cf4f45b4f015fa2cc0d195b81c16c6e8a
Diffstat (limited to 'yt/cpp/mapreduce/io/ut')
-rw-r--r-- yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp | 94
-rw-r--r-- yt/cpp/mapreduce/io/ut/readers_ut.cpp | 232
-rw-r--r-- yt/cpp/mapreduce/io/ut/ut_row.proto | 7
-rw-r--r-- yt/cpp/mapreduce/io/ut/ya.make | 15
-rw-r--r-- yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp | 185
5 files changed, 533 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp
new file mode 100644
index 0000000000..3642aa1e45
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/end_of_stream_ut.cpp
@@ -0,0 +1,94 @@
+#include <yt/cpp/mapreduce/io/node_table_reader.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+////////////////////////////////////////////////////////////////////
+
+// Raw table reader that serves the bytes of an in-memory string; it never retries.
+class TStringRawTableReader
+    : public TRawTableReader
+{
+public:
+    // Stores a copy of `string`; `explicit` rules out accidental implicit conversion from TString.
+    explicit TStringRawTableReader(const TString& string)
+        : String_(string)
+        , Stream_(String_)
+    { }
+
+    // Retrying is not supported, so every request is declined.
+    bool Retry(const TMaybe<ui32>&, const TMaybe<ui64>&, const std::exception_ptr&) override
+    { return false; }
+
+    // No retry state is kept, hence nothing to reset.
+    void ResetRetries() override
+    { }
+
+    // Always returns false.
+    bool HasRangeIndices() const override
+    { return false; }
+
+private:
+    // Forwards the request to the string stream and returns its result.
+    size_t DoRead(void* buf, size_t len) override
+    { return Stream_.Read(buf, len); }
+
+private:
+    const TString String_;
+    TStringStream Stream_;
+};
+
+////////////////////////////////////////////////////////////////////
+
+// Fixture parameterized by a bool that the Eos test reads via GetParam().
+class TEndOfStreamTest : public ::testing::TestWithParam<bool>
+{ };
+
+TEST_P(TEndOfStreamTest, Eos)
+{
+    // The parameter decides whether the input ends with an explicit end_of_stream marker.
+    const bool withEosMarker = GetParam();
+    auto rawReader = ::MakeIntrusive<TStringRawTableReader>(TString::Join(
+        "{a=13;b = \"string\"}; {c = {d=12}};",
+        "<key_switch=%true>#; {e = 42};",
+        withEosMarker ? "<end_of_stream=%true>#" : ""
+    ));
+
+    TNodeTableReader reader(rawReader);
+
+    // Walks `rows`, checking each one against the reader's current row and advancing past it.
+    auto expectRows = [&reader] (const TVector<TNode>& rows) {
+        for (const auto& row : rows) {
+            EXPECT_TRUE(reader.IsValid());
+            EXPECT_FALSE(reader.IsEndOfStream());
+            EXPECT_FALSE(reader.IsRawReaderExhausted());
+            EXPECT_EQ(reader.GetRow(), row);
+            reader.Next();
+        }
+    };
+
+    // Rows that precede the key switch.
+    expectRows({TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))});
+
+    // After those rows the reader is expected to be invalid while input still remains.
+    EXPECT_FALSE(reader.IsValid());
+    EXPECT_FALSE(reader.IsEndOfStream());
+    EXPECT_FALSE(reader.IsRawReaderExhausted());
+
+    reader.NextKey();
+    reader.Next();
+
+    // The single row that follows the key switch.
+    expectRows({TNode()("e", 42)});
+
+    // End of stream is expected exactly when the marker was appended.
+    EXPECT_FALSE(reader.IsValid());
+    EXPECT_EQ(reader.IsEndOfStream(), withEosMarker);
+    EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+INSTANTIATE_TEST_SUITE_P(WithEos, TEndOfStreamTest, ::testing::Values(true)); // input ends with <end_of_stream=%true>#
+INSTANTIATE_TEST_SUITE_P(WithoutEos, TEndOfStreamTest, ::testing::Values(false)); // input carries no end_of_stream marker
+
+////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/ut/readers_ut.cpp b/yt/cpp/mapreduce/io/ut/readers_ut.cpp
new file mode 100644
index 0000000000..86d06629a2
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/readers_ut.cpp
@@ -0,0 +1,232 @@
+#include <yt/cpp/mapreduce/io/node_table_reader.h>
+#include <yt/cpp/mapreduce/io/proto_table_reader.h>
+#include <yt/cpp/mapreduce/io/skiff_table_reader.h>
+#include <yt/cpp/mapreduce/io/stream_table_reader.h>
+#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
+
+#include <yt/cpp/mapreduce/io/ut/ut_row.pb.h>
+
+#include <yt/cpp/mapreduce/skiff/checked_parser.h>
+#include <yt/cpp/mapreduce/skiff/skiff_schema.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+using namespace NYT::NDetail;
+using namespace NSkiff;
+
+////////////////////////////////////////////////////////////////////
+
+// Raw reader over an in-memory string: the first DoRead returns at most half the data, the next two throw.
+class TRetryEmulatingRawTableReader
+    : public TRawTableReader
+{
+public:
+    // Copies `string`; `explicit` prevents accidental implicit conversion from TString.
+    explicit TRetryEmulatingRawTableReader(const TString& string)
+        : String_(string)
+        , Stream_(String_)
+    { }
+
+    // Rebuilds the stream from the full data while retries remain; all arguments are ignored.
+    bool Retry(
+        const TMaybe<ui32>& /*rangeIndex*/,
+        const TMaybe<ui64>& /*rowIndex*/,
+        const std::exception_ptr& /*error*/) override
+    {
+        if (RetriesLeft_ == 0) {
+            return false;
+        }
+        Stream_ = TStringStream(String_);
+        --RetriesLeft_;
+        return true;
+    }
+
+    // Refills the retry budget.
+    void ResetRetries() override
+    { RetriesLeft_ = 10; }
+
+    bool HasRangeIndices() const override
+    { return false; }
+
+private:
+    // Call 0 is capped at half of the data, calls 1 and 2 throw, later calls read normally.
+    size_t DoRead(void* buf, size_t len) override
+    {
+        const int callIndex = DoReadCallCount_++;
+        if (callIndex == 1) {
+            ythrow yexception() << "Just wanted to test you, first fail";
+        }
+        if (callIndex == 2) {
+            ythrow yexception() << "Just wanted to test you, second fail";
+        }
+        const size_t limit = callIndex == 0 ? std::min(len, String_.size() / 2) : len;
+        return Stream_.Read(buf, limit);
+    }
+
+private:
+    const TString String_;
+    TStringStream Stream_;
+    int RetriesLeft_ = 10;
+    int DoReadCallCount_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////
+
+// Reads two YSON rows through the retry-emulating raw reader, then expects exhaustion.
+TEST(TReadersTest, YsonGood)
+{
+    const TVector<TNode> wanted = {TNode()("a", 13)("b", "string"), TNode()("c", TNode()("d", 12))};
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;b = \"string\"}; {c = {d=12}}");
+    TNodeTableReader reader(rawReader);
+    for (const auto& wantedRow : wanted) {
+        EXPECT_TRUE(reader.IsValid());
+        EXPECT_FALSE(reader.IsRawReaderExhausted());
+        EXPECT_EQ(reader.GetRow(), wantedRow);
+        reader.Next();
+    }
+    EXPECT_FALSE(reader.IsValid());
+    EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+TEST(TReadersTest, YsonBad) // the "-b :=" input is expected to make GetRow() throw yexception
+{
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>("{a=13;-b := \"string\"}; {c = {d=12}}");
+    EXPECT_THROW(TNodeTableReader(rawReader).GetRow(), yexception);
+}
+
+// Two Skiff-encoded rows must decode into the expected nodes, after which the input is exhausted.
+TEST(TReadersTest, SkiffGood)
+{
+    const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\x06\x00\x00\x00""foobar" "\x01"
+        "\x00\x00" "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF" "\x03\x00\x00\x00""abc" "\x00";
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+    TSkiffSchemaPtr schema = CreateVariant16Schema({
+        CreateTupleSchema({
+            CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"),
+            CreateSimpleTypeSchema(EWireType::String32)->SetName("b"),
+            CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c")
+        })
+    });
+
+    const TVector<TNode> wanted = {
+        TNode()("a", 100500)("b", "foobar")("c", true),
+        TNode()("a", -1)("b", "abc")("c", false),
+    };
+    TSkiffTableReader reader(rawReader, schema);
+    for (const auto& wantedRow : wanted) {
+        EXPECT_TRUE(reader.IsValid());
+        EXPECT_FALSE(reader.IsRawReaderExhausted());
+        EXPECT_EQ(reader.GetRow(), wantedRow);
+        reader.Next();
+    }
+    EXPECT_FALSE(reader.IsValid());
+    EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+// Decodes a single row whose only column is the uint64 "$timestamp".
+TEST(TReadersTest, SkiffExtraColumns)
+{
+    const char arr[] = "\x00\x00" "\x7B\x00\x00\x00\x00\x00\x00\x00";
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+    TSkiffSchemaPtr schema = CreateVariant16Schema({
+        CreateTupleSchema({
+            CreateSimpleTypeSchema(EWireType::Uint64)->SetName("$timestamp")
+        })
+    });
+
+    const TVector<TNode> wanted = {TNode()("$timestamp", 123u)};
+    TSkiffTableReader reader(rawReader, schema);
+
+    for (const auto& wantedRow : wanted) {
+        EXPECT_TRUE(reader.IsValid());
+        EXPECT_FALSE(reader.IsRawReaderExhausted());
+        EXPECT_EQ(reader.GetRow(), wantedRow);
+        reader.Next();
+    }
+    EXPECT_FALSE(reader.IsValid());
+    EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+// Same bytes as SkiffGood's first row except 0xFF (not 0x06) before "foobar"; GetRow() must throw.
+TEST(TReadersTest, SkiffBad)
+{
+    const char arr[] = "\x00\x00" "\x94\x88\x01\x00\x00\x00\x00\x00" "\xFF\x00\x00\x00""foobar" "\x01";
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+    TSkiffSchemaPtr schema = CreateVariant16Schema({
+        CreateTupleSchema({
+            CreateSimpleTypeSchema(EWireType::Int64)->SetName("a"),
+            CreateSimpleTypeSchema(EWireType::String32)->SetName("b"),
+            CreateSimpleTypeSchema(EWireType::Boolean)->SetName("c")
+        })
+    });
+
+    EXPECT_THROW(TSkiffTableReader(rawReader, schema).GetRow(), yexception);
+}
+
+// GetRow() must throw a yexception whose message names the tags allowed for the variant8.
+TEST(TReadersTest, SkiffBadFormat)
+{
+    const char arr[] = "\x00\x00" "\x12" "\x23\x34\x00\x00";
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+    TSkiffSchemaPtr schema = CreateVariant16Schema({
+        CreateTupleSchema({
+            CreateVariant8Schema({
+                CreateSimpleTypeSchema(EWireType::Nothing),
+                CreateSimpleTypeSchema(EWireType::Int32)
+            })
+        })
+    });
+
+    EXPECT_THROW_MESSAGE_HAS_SUBSTR(
+        TSkiffTableReader(rawReader, schema).GetRow(),
+        yexception,
+        "Tag for 'variant8<nothing,int32>' expected to be 0 or 1");
+}
+
+// Two length-prefixed TRow messages must be read back field by field, then the input is exhausted.
+TEST(TReadersTest, ProtobufGood)
+{
+    using NYT::NTesting::TRow;
+    const char arr[] = "\x13\x00\x00\x00" "\x0A""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00"
+        "\x10\x00\x00\x00" "\x0A""\x03""abc" "\x10""\x1F" "\x19""\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+
+    TRow first;
+    first.SetString("foobar");
+    first.SetInt32(15);
+    first.SetFixed64(100500);
+
+    TRow second;
+    second.SetString("abc");
+    second.SetInt32(31);
+    second.SetFixed64(-1);
+
+    TLenvalProtoTableReader reader(rawReader, {TRow::descriptor()});
+    for (const auto& wantedRow : TVector<TRow>{first, second}) {
+        EXPECT_TRUE(reader.IsValid());
+        EXPECT_FALSE(reader.IsRawReaderExhausted());
+        TRow actual;
+        reader.ReadRow(&actual);
+        EXPECT_EQ(actual.GetString(), wantedRow.GetString());
+        EXPECT_EQ(actual.GetInt32(), wantedRow.GetInt32());
+        EXPECT_EQ(actual.GetFixed64(), wantedRow.GetFixed64());
+        reader.Next();
+    }
+    EXPECT_FALSE(reader.IsValid());
+    EXPECT_TRUE(reader.IsRawReaderExhausted());
+}
+
+// Same bytes as ProtobufGood's first record except 0x0F (not 0x0A) after the size; ReadRow must throw.
+TEST(TReadersTest, ProtobufBad)
+{
+    const char arr[] = "\x13\x00\x00\x00" "\x0F""\x06""foobar" "\x10""\x0F" "\x19""\x94\x88\x01\x00\x00\x00\x00\x00";
+    auto rawReader = ::MakeIntrusive<TRetryEmulatingRawTableReader>(TString(arr, sizeof(arr) - 1));
+    NYT::NTesting::TRow row;
+    EXPECT_THROW(TLenvalProtoTableReader(rawReader, { NYT::NTesting::TRow::descriptor() }).ReadRow(&row), yexception);
+}
+
+////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/io/ut/ut_row.proto b/yt/cpp/mapreduce/io/ut/ut_row.proto
new file mode 100644
index 0000000000..6a9649e4c6
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/ut_row.proto
@@ -0,0 +1,7 @@
+package NYT.NTesting;
+
+message TRow { // Row message used by the protobuf reader tests in readers_ut.cpp.
+ optional string String = 1;
+ optional int32 Int32 = 2;
+ optional fixed64 Fixed64 = 3; // fixed-width unsigned 64-bit field
+}
diff --git a/yt/cpp/mapreduce/io/ut/ya.make b/yt/cpp/mapreduce/io/ut/ya.make
new file mode 100644
index 0000000000..e7f9d018c3
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/ya.make
@@ -0,0 +1,15 @@
+GTEST()
+
+SRCS(
+ end_of_stream_ut.cpp
+ readers_ut.cpp
+ yamr_table_reader_ut.cpp
+ # Protobuf message definition; readers_ut.cpp includes the generated ut_row.pb.h.
+ ut_row.proto
+)
+
+PEERDIR(
+ yt/cpp/mapreduce/io
+)
+
+END()
diff --git a/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
new file mode 100644
index 0000000000..d55a28b3e4
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ut/yamr_table_reader_ut.cpp
@@ -0,0 +1,185 @@
+#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
+
+#include <library/cpp/testing/gtest/gtest.h>
+
+using namespace NYT;
+
+// Stream output for the (key, subkey, value) triples compared by the tests: "{key, subkey, value}".
+template <>
+void Out<std::tuple<TString, TString, TString>>(IOutputStream& out, const std::tuple<TString, TString, TString>& value) {
+    const auto& [key, subkey, val] = value;
+    out << "{" << key << ", " << subkey << ", " << val << "}";
+}
+////////////////////////////////////////////////////////////////////
+
+// Collects rows as length-prefixed fields and renders the byte stream that starts at a given row.
+class TRowCollection
+{
+public:
+    // Appends one row: key, subkey and value, each written as a ui32 size followed by the bytes.
+    void AddRow(TStringBuf key, TStringBuf subkey, TStringBuf value)
+    {
+        TStringStream encoded;
+        for (const TStringBuf field : {key, subkey, value}) {
+            const ui32 length = field.size();
+            encoded.Write(&length, sizeof(length));
+            encoded.Write(field);
+        }
+        RowList_.push_back(encoded.Str());
+    }
+
+    // Returns bytes FC FF FF FF, the raw `rowIndex`, then rows rowIndex..end; aborts on a bad index.
+    TString GetStreamDataStartFromRow(ui64 rowIndex) const
+    {
+        Y_ABORT_UNLESS(rowIndex < RowList_.size());
+        TStringStream stream;
+        stream.Write("\xFC\xFF\xFF\xFF");
+        stream.Write(&rowIndex, sizeof(rowIndex));
+        for (size_t pos = rowIndex; pos != RowList_.size(); ++pos) {
+            stream.Write(RowList_[pos]);
+        }
+        return stream.Str();
+    }
+
+    // Size in bytes of the stream that starts at row 0.
+    size_t ComputeTotalStreamSize() const
+    { return GetStreamDataStartFromRow(0).size(); }
+
+private:
+    TVector<TString> RowList_;
+};
+
+// Raw reader over a TRowCollection that can break after `breakPoint` bytes and resume via Retry().
+class TTestRawTableReader
+    : public TRawTableReader
+{
+public:
+    // Healthy reader: serves the whole stream starting at row 0.
+    explicit TTestRawTableReader(const TRowCollection& rowCollection)
+        : RowCollection_(rowCollection)
+        , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0))
+        , Input_(MakeHolder<TStringStream>(DataToRead_))
+    { }
+
+    // Broken reader: serves only the first `breakPoint` bytes of the stream, then DoRead throws.
+    TTestRawTableReader(const TRowCollection& rowCollection, size_t breakPoint)
+        : RowCollection_(rowCollection)
+        , DataToRead_(RowCollection_.GetStreamDataStartFromRow(0).substr(0, breakPoint))
+        , Input_(MakeHolder<TStringStream>(DataToRead_))
+        , Broken_(true)
+    { }
+
+    // Reads from the current input; an empty read while the stream is marked broken raises yexception.
+    size_t DoRead(void* buf, size_t size) override
+    {
+        Y_ABORT_UNLESS(Input_);
+        const size_t bytesRead = Input_->Read(buf, size);
+        if (bytesRead == 0 && Broken_) {
+            ythrow yexception() << "Stream is broken";
+        }
+        return bytesRead;
+    }
+
+    // Rebuilds the input from `rowIndex` (row 0 when unset) and clears the broken flag; false once out of retries.
+    bool Retry(
+        const TMaybe<ui32>& /*rangeIndex*/,
+        const TMaybe<ui64>& rowIndex,
+        const std::exception_ptr& /*error*/) override
+    {
+        if (--Retries_ < 0) {
+            return false;
+        }
+        const ui64 actualRowIndex = rowIndex ? *rowIndex : 0;
+        DataToRead_ = RowCollection_.GetStreamDataStartFromRow(actualRowIndex);
+        Input_ = MakeHolder<TStringInput>(DataToRead_);
+        Broken_ = false;
+        return true;
+    }
+
+    void ResetRetries() override { } // the retry budget is never replenished
+    bool HasRangeIndices() const override { return false; }
+
+private:
+    TRowCollection RowCollection_;
+    TString DataToRead_;
+    THolder<IInputStream> Input_;
+    bool Broken_ = false;
+    i32 Retries_ = 1;
+};
+
+// Breaks the stream at every byte offset (plus one unbroken run) and expects the reader
+// to deliver all rows intact each time.
+TEST(TYamrTableReaderTest, TestReadRetry)
+{
+    const TVector<std::tuple<TString, TString, TString>> expectedResult = {
+        {"foo1", "bar1", "baz1"},
+        {"foo2", "bar2", "baz2"},
+        {"foo3", "bar3", "baz3"},
+    };
+
+    TRowCollection rowCollection;
+    for (const auto& [key, subkey, value] : expectedResult) {
+        rowCollection.AddRow(key, subkey, value);
+    }
+
+    const ssize_t streamSize = rowCollection.ComputeTotalStreamSize();
+    // breakPoint == -1 means an unbroken stream; the trace tags assertion failures with the iteration.
+    for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) {
+        SCOPED_TRACE(::testing::Message() << "breakPoint = " << breakPoint);
+        ::TIntrusivePtr<TRawTableReader> rawReader = breakPoint == -1
+            ? ::MakeIntrusive<TTestRawTableReader>(rowCollection)
+            : ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint));
+
+        TYaMRTableReader tableReader(rawReader);
+        TVector<std::tuple<TString, TString, TString>> actualResult;
+        for (; tableReader.IsValid(); tableReader.Next()) {
+            EXPECT_FALSE(tableReader.IsRawReaderExhausted());
+            auto row = tableReader.GetRow();
+            actualResult.emplace_back(row.Key, row.SubKey, row.Value);
+        }
+        EXPECT_TRUE(tableReader.IsRawReaderExhausted());
+        EXPECT_EQ(actualResult, expectedResult);
+    }
+}
+
+// Breaks the stream at every byte offset and only counts rows: Next() is called without GetRow().
+TEST(TYamrTableReaderTest, TestSkipRetry)
+{
+    const TVector<std::tuple<TString, TString, TString>> expectedResult = {
+        {"foo1", "bar1", "baz1"},
+        {"foo2", "bar2", "baz2"},
+        {"foo3", "bar3", "baz3"},
+    };
+
+    TRowCollection rowCollection;
+    for (const auto& [key, subkey, value] : expectedResult) {
+        rowCollection.AddRow(key, subkey, value);
+    }
+
+    const ssize_t streamSize = rowCollection.ComputeTotalStreamSize();
+    // breakPoint == -1 means an unbroken stream; the trace tags assertion failures with the iteration.
+    for (ssize_t breakPoint = -1; breakPoint < streamSize; ++breakPoint) {
+        SCOPED_TRACE(::testing::Message() << "breakPoint = " << breakPoint);
+        try {
+            ::TIntrusivePtr<TRawTableReader> rawReader = breakPoint == -1
+                ? ::MakeIntrusive<TTestRawTableReader>(rowCollection)
+                : ::MakeIntrusive<TTestRawTableReader>(rowCollection, static_cast<size_t>(breakPoint));
+
+            TYaMRTableReader tableReader(rawReader);
+            ui32 rowCount = 0;
+            for (; tableReader.IsValid(); tableReader.Next()) {
+                EXPECT_FALSE(tableReader.IsRawReaderExhausted());
+                ++rowCount;
+            }
+            EXPECT_TRUE(tableReader.IsRawReaderExhausted());
+            EXPECT_EQ(rowCount, 3u);
+        } catch (const std::exception& ex) {
+            // Exceptions bypass the EXPECT_* reporting, so log the position before rethrowing.
+            Cerr << breakPoint << Endl;
+            Cerr << ex.what() << Endl;
+            throw;
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////