aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io')
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.cpp38
-rw-r--r--yt/cpp/mapreduce/io/counting_raw_reader.h31
-rw-r--r--yt/cpp/mapreduce/io/helpers.h130
-rw-r--r--yt/cpp/mapreduce/io/job_reader.cpp46
-rw-r--r--yt/cpp/mapreduce/io/job_reader.h38
-rw-r--r--yt/cpp/mapreduce/io/job_writer.cpp68
-rw-r--r--yt/cpp/mapreduce/io/job_writer.h43
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.cpp198
-rw-r--r--yt/cpp/mapreduce/io/lenval_table_reader.h67
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp375
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.h91
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.cpp72
-rw-r--r--yt/cpp/mapreduce/io/node_table_writer.h33
-rw-r--r--yt/cpp/mapreduce/io/proto_helpers.cpp101
-rw-r--r--yt/cpp/mapreduce/io/proto_helpers.h36
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.cpp305
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.h76
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.cpp184
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.h61
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.cpp232
-rw-r--r--yt/cpp/mapreduce/io/skiff_row_table_reader.h67
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp293
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.h65
-rw-r--r--yt/cpp/mapreduce/io/stream_raw_reader.cpp59
-rw-r--r--yt/cpp/mapreduce/io/stream_table_reader.h65
-rw-r--r--yt/cpp/mapreduce/io/ya.make33
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.cpp145
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_reader.h48
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_writer.cpp53
-rw-r--r--yt/cpp/mapreduce/io/yamr_table_writer.h31
30 files changed, 3084 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
new file mode 100644
index 0000000000..6a918bdddb
--- /dev/null
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp
@@ -0,0 +1,38 @@
+#include "counting_raw_reader.h"
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+bool TCountingRawTableReader::Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex)
+{
+ return Reader_->Retry(rangeIndex, rowIndex);
+}
+
+void TCountingRawTableReader::ResetRetries()
+{
+ Reader_->ResetRetries();
+}
+
+bool TCountingRawTableReader::HasRangeIndices() const
+{
+ return Reader_->HasRangeIndices();
+}
+
+size_t TCountingRawTableReader::GetReadByteCount() const
+{
+ return ReadByteCount_;
+}
+
+size_t TCountingRawTableReader::DoRead(void* buf, size_t len)
+{
+ auto readLen = Reader_->Read(buf, len);
+ ReadByteCount_ += readLen;
+ return readLen;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h
new file mode 100644
index 0000000000..3b6705c5e4
--- /dev/null
+++ b/yt/cpp/mapreduce/io/counting_raw_reader.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+namespace NDetail {
+
+class TCountingRawTableReader
+ final : public TRawTableReader
+{
+public:
+ TCountingRawTableReader(::TIntrusivePtr<TRawTableReader> reader)
+ : Reader_(std::move(reader))
+ { }
+
+ bool Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex) override;
+ void ResetRetries() override;
+ bool HasRangeIndices() const override;
+
+ size_t GetReadByteCount() const;
+
+protected:
+ size_t DoRead(void* buf, size_t len) override;
+
+private:
+ ::TIntrusivePtr<TRawTableReader> Reader_;
+ size_t ReadByteCount_ = 0;
+};
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h
new file mode 100644
index 0000000000..5dbbf20906
--- /dev/null
+++ b/yt/cpp/mapreduce/io/helpers.h
@@ -0,0 +1,130 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+#include <yt/cpp/mapreduce/interface/config.h>
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+template <class TOptions>
+struct TIOOptionsTraits;
+
+template <>
+struct TIOOptionsTraits<TFileReaderOptions>
+{
+ static constexpr const char* const ConfigName = "file_reader";
+};
+template <>
+struct TIOOptionsTraits<TFileWriterOptions>
+{
+ static constexpr const char* const ConfigName = "file_writer";
+};
+template <>
+struct TIOOptionsTraits<TTableReaderOptions>
+{
+ static constexpr const char* const ConfigName = "table_reader";
+};
+template <>
+struct TIOOptionsTraits<TTableWriterOptions>
+{
+ static constexpr const char* const ConfigName = "table_writer";
+};
+
+template <class TOptions>
+TNode FormIORequestParameters(
+ const TRichYPath& path,
+ const TOptions& options)
+{
+ auto params = PathToParamNode(path);
+ if (options.Config_) {
+ params[TIOOptionsTraits<TOptions>::ConfigName] = *options.Config_;
+ }
+ return params;
+}
+
+template <>
+inline TNode FormIORequestParameters(
+ const TRichYPath& path,
+ const TFileReaderOptions& options)
+{
+ auto params = PathToParamNode(path);
+ if (options.Config_) {
+ params[TIOOptionsTraits<TTableReaderOptions>::ConfigName] = *options.Config_;
+ }
+ if (options.Offset_) {
+ params["offset"] = *options.Offset_;
+ }
+ if (options.Length_) {
+ params["length"] = *options.Length_;
+ }
+ return params;
+}
+
+static void AddWriterOptionsToNode(const TWriterOptions& options, TNode* node)
+{
+ if (options.EnableEarlyFinish_) {
+ (*node)["enable_early_finish"] = *options.EnableEarlyFinish_;
+ }
+ if (options.UploadReplicationFactor_) {
+ (*node)["upload_replication_factor"] = *options.UploadReplicationFactor_;
+ }
+ if (options.MinUploadReplicationFactor_) {
+ (*node)["min_upload_replication_factor"] = *options.MinUploadReplicationFactor_;
+ }
+ if (options.DesiredChunkSize_) {
+ (*node)["desired_chunk_size"] = *options.DesiredChunkSize_;
+ }
+}
+
+template <>
+inline TNode FormIORequestParameters(
+ const TRichYPath& path,
+ const TFileWriterOptions& options)
+{
+ auto params = PathToParamNode(path);
+ TNode fileWriter = TNode::CreateMap();
+ if (options.Config_) {
+ fileWriter = *options.Config_;
+ }
+ if (options.WriterOptions_) {
+ AddWriterOptionsToNode(*options.WriterOptions_, &fileWriter);
+ }
+ if (fileWriter.Empty()) {
+ AddWriterOptionsToNode(
+ TWriterOptions()
+ .EnableEarlyFinish(true)
+ .UploadReplicationFactor(3)
+ .MinUploadReplicationFactor(2),
+ &fileWriter);
+ }
+ params[TIOOptionsTraits<TFileWriterOptions>::ConfigName] = fileWriter;
+ if (options.ComputeMD5_) {
+ params["compute_md5"] = *options.ComputeMD5_;
+ }
+ return params;
+}
+
+template <>
+inline TNode FormIORequestParameters(
+ const TRichYPath& path,
+ const TTableWriterOptions& options)
+{
+ auto params = PathToParamNode(path);
+ auto tableWriter = TConfig::Get()->TableWriter;
+ if (options.Config_) {
+ MergeNodes(tableWriter, *options.Config_);
+ }
+ if (options.WriterOptions_) {
+ AddWriterOptionsToNode(*options.WriterOptions_, &tableWriter);
+ }
+ if (!tableWriter.Empty()) {
+ params[TIOOptionsTraits<TTableWriterOptions>::ConfigName] = std::move(tableWriter);
+ }
+ return params;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+}
diff --git a/yt/cpp/mapreduce/io/job_reader.cpp b/yt/cpp/mapreduce/io/job_reader.cpp
new file mode 100644
index 0000000000..39056f00e2
--- /dev/null
+++ b/yt/cpp/mapreduce/io/job_reader.cpp
@@ -0,0 +1,46 @@
+#include "job_reader.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TJobReader::TJobReader(int fd)
+ : TJobReader(Duplicate(fd))
+{ }
+
+TJobReader::TJobReader(const TFile& file)
+ : FdFile_(file)
+ , FdInput_(FdFile_)
+ , BufferedInput_(&FdInput_, BUFFER_SIZE)
+{ }
+
+bool TJobReader::Retry(const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/)
+{
+ return false;
+}
+
+void TJobReader::ResetRetries()
+{ }
+
+bool TJobReader::HasRangeIndices() const
+{
+ return true;
+}
+
+size_t TJobReader::DoRead(void* buf, size_t len)
+{
+ return BufferedInput_.Read(buf, len);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TRawTableReaderPtr CreateRawJobReader(int fd)
+{
+ return ::MakeIntrusive<TJobReader>(fd);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/job_reader.h b/yt/cpp/mapreduce/io/job_reader.h
new file mode 100644
index 0000000000..ce62ec180f
--- /dev/null
+++ b/yt/cpp/mapreduce/io/job_reader.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <util/stream/buffered.h>
+#include <util/stream/file.h>
+#include <util/system/file.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TJobReader
+ : public TRawTableReader
+{
+public:
+ explicit TJobReader(int fd);
+ explicit TJobReader(const TFile& file);
+
+ virtual bool Retry( const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/) override;
+ virtual void ResetRetries() override;
+ virtual bool HasRangeIndices() const override;
+
+protected:
+ size_t DoRead(void* buf, size_t len) override;
+
+private:
+ TFile FdFile_;
+ TUnbufferedFileInput FdInput_;
+ TBufferedInput BufferedInput_;
+
+ static const size_t BUFFER_SIZE = 64 << 10;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp
new file mode 100644
index 0000000000..d08bb0a665
--- /dev/null
+++ b/yt/cpp/mapreduce/io/job_writer.cpp
@@ -0,0 +1,68 @@
+#include "job_writer.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <util/system/file.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TJobWriter::TStream::TStream(int fd)
+ : TStream(Duplicate(fd))
+{ }
+
+TJobWriter::TStream::TStream(const TFile& file)
+ : FdFile(file)
+ , FdOutput(FdFile)
+ , BufferedOutput(&FdOutput, BUFFER_SIZE)
+{ }
+
+TJobWriter::TStream::~TStream()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TJobWriter::TJobWriter(size_t outputTableCount)
+{
+ for (size_t i = 0; i < outputTableCount; ++i) {
+ Streams_.emplace_back(MakeHolder<TStream>(int(i * 3 + 1)));
+ }
+}
+
+TJobWriter::TJobWriter(const TVector<TFile>& fileList)
+{
+ for (const auto& f : fileList) {
+ Streams_.emplace_back(MakeHolder<TStream>(f));
+ }
+}
+
+size_t TJobWriter::GetStreamCount() const
+{
+ return Streams_.size();
+}
+
+IOutputStream* TJobWriter::GetStream(size_t tableIndex) const
+{
+ if (tableIndex >= Streams_.size()) {
+ ythrow TIOException() <<
+ "Table index " << tableIndex <<
+ " is out of range [0, " << Streams_.size() << ")";
+ }
+ return &Streams_[tableIndex]->BufferedOutput;
+}
+
+void TJobWriter::OnRowFinished(size_t)
+{ }
+
+////////////////////////////////////////////////////////////////////////////////
+
+THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount)
+{
+ return ::MakeHolder<TJobWriter>(outputTableCount);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/job_writer.h b/yt/cpp/mapreduce/io/job_writer.h
new file mode 100644
index 0000000000..9b24650640
--- /dev/null
+++ b/yt/cpp/mapreduce/io/job_writer.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/ptr.h>
+#include <util/stream/file.h>
+#include <util/stream/buffered.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TJobWriter
+ : public IProxyOutput
+{
+public:
+ explicit TJobWriter(size_t outputTableCount);
+ explicit TJobWriter(const TVector<TFile>& fileList);
+
+ size_t GetStreamCount() const override;
+ IOutputStream* GetStream(size_t tableIndex) const override;
+ void OnRowFinished(size_t tableIndex) override;
+
+private:
+ struct TStream {
+ TFile FdFile;
+ TUnbufferedFileOutput FdOutput;
+ TBufferedOutput BufferedOutput;
+
+ explicit TStream(int fd);
+ explicit TStream(const TFile& file);
+ ~TStream();
+
+ static const size_t BUFFER_SIZE = 1 << 20;
+ };
+
+ TVector<THolder<TStream>> Streams_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
new file mode 100644
index 0000000000..98274c7996
--- /dev/null
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp
@@ -0,0 +1,198 @@
+#include "lenval_table_reader.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <util/string/printf.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+const i32 CONTROL_ATTR_TABLE_INDEX = -1;
+const i32 CONTROL_ATTR_KEY_SWITCH = -2;
+const i32 CONTROL_ATTR_RANGE_INDEX = -3;
+const i32 CONTROL_ATTR_ROW_INDEX = -4;
+const i32 CONTROL_ATTR_END_OF_STREAM = -5;
+const i32 CONTROL_ATTR_TABLET_INDEX = -6;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLenvalTableReader::TLenvalTableReader(::TIntrusivePtr<TRawTableReader> input)
+ : Input_(std::move(input))
+{
+ TLenvalTableReader::Next();
+}
+
+TLenvalTableReader::~TLenvalTableReader()
+{ }
+
+void TLenvalTableReader::CheckValidity() const
+{
+ if (!IsValid()) {
+ ythrow yexception() << "Iterator is not valid";
+ }
+}
+
+bool TLenvalTableReader::IsValid() const
+{
+ return Valid_;
+}
+
+void TLenvalTableReader::Next()
+{
+ if (!RowTaken_) {
+ SkipRow();
+ }
+
+ CheckValidity();
+
+ if (RowIndex_) {
+ ++*RowIndex_;
+ }
+
+ while (true) {
+ try {
+ i32 value = 0;
+ if (!ReadInteger(&value, true)) {
+ return;
+ }
+
+ while (value < 0 && !IsEndOfStream_) {
+ switch (value) {
+ case CONTROL_ATTR_KEY_SWITCH:
+ if (!AtStart_) {
+ Valid_ = false;
+ return;
+ } else {
+ ReadInteger(&value);
+ }
+ break;
+
+ case CONTROL_ATTR_TABLE_INDEX: {
+ ui32 tmp = 0;
+ ReadInteger(&tmp);
+ TableIndex_ = tmp;
+ ReadInteger(&value);
+ break;
+ }
+ case CONTROL_ATTR_ROW_INDEX: {
+ ui64 tmp = 0;
+ ReadInteger(&tmp);
+ RowIndex_ = tmp;
+ ReadInteger(&value);
+ break;
+ }
+ case CONTROL_ATTR_RANGE_INDEX: {
+ ui32 tmp = 0;
+ ReadInteger(&tmp);
+ RangeIndex_ = tmp;
+ ReadInteger(&value);
+ break;
+ }
+ case CONTROL_ATTR_TABLET_INDEX: {
+ ui64 tmp = 0;
+ ReadInteger(&tmp);
+ TabletIndex_ = tmp;
+ ReadInteger(&value);
+ break;
+ }
+ case CONTROL_ATTR_END_OF_STREAM: {
+ IsEndOfStream_ = true;
+ break;
+ }
+ default:
+ ythrow yexception() <<
+ Sprintf("Invalid control integer %d in lenval stream", value);
+ }
+ }
+
+ Length_ = static_cast<ui32>(value);
+ RowTaken_ = false;
+ AtStart_ = false;
+ } catch (const std::exception& e) {
+ if (!PrepareRetry()) {
+ throw;
+ }
+ continue;
+ }
+ break;
+ }
+}
+
+bool TLenvalTableReader::Retry()
+{
+ if (PrepareRetry()) {
+ RowTaken_ = true;
+ Next();
+ return true;
+ }
+ return false;
+}
+
+void TLenvalTableReader::NextKey()
+{
+ while (Valid_) {
+ Next();
+ }
+
+ if (Finished_) {
+ return;
+ }
+
+ Valid_ = true;
+
+ if (RowIndex_) {
+ --*RowIndex_;
+ }
+
+ RowTaken_ = true;
+}
+
+ui32 TLenvalTableReader::GetTableIndex() const
+{
+ CheckValidity();
+ return TableIndex_;
+}
+
+ui32 TLenvalTableReader::GetRangeIndex() const
+{
+ CheckValidity();
+ return RangeIndex_.GetOrElse(0);
+}
+
+ui64 TLenvalTableReader::GetRowIndex() const
+{
+ CheckValidity();
+ return RowIndex_.GetOrElse(0UL);
+}
+
+TMaybe<size_t> TLenvalTableReader::GetReadByteCount() const
+{
+ return Input_.GetReadByteCount();
+}
+
+bool TLenvalTableReader::IsEndOfStream() const
+{
+ return IsEndOfStream_;
+}
+
+bool TLenvalTableReader::IsRawReaderExhausted() const
+{
+ return Finished_;
+}
+
+bool TLenvalTableReader::PrepareRetry()
+{
+ if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ RowIndex_.Clear();
+ RangeIndex_.Clear();
+ return true;
+ }
+ return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.h b/yt/cpp/mapreduce/io/lenval_table_reader.h
new file mode 100644
index 0000000000..990fe0b756
--- /dev/null
+++ b/yt/cpp/mapreduce/io/lenval_table_reader.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "counting_raw_reader.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TLenvalTableReader
+{
+public:
+ explicit TLenvalTableReader(::TIntrusivePtr<TRawTableReader> input);
+ virtual ~TLenvalTableReader();
+
+protected:
+ bool IsValid() const;
+ void Next();
+ ui32 GetTableIndex() const;
+ ui32 GetRangeIndex() const;
+ ui64 GetRowIndex() const;
+ void NextKey();
+ TMaybe<size_t> GetReadByteCount() const;
+ bool IsEndOfStream() const;
+ bool IsRawReaderExhausted() const;
+
+ void CheckValidity() const;
+
+ bool Retry();
+
+ template <class T>
+ bool ReadInteger(T* result, bool acceptEndOfStream = false)
+ {
+ size_t count = Input_.Load(result, sizeof(T));
+ if (acceptEndOfStream && count == 0) {
+ Finished_ = true;
+ Valid_ = false;
+ return false;
+ }
+ Y_ENSURE(count == sizeof(T), "Premature end of stream");
+ return true;
+ }
+
+ virtual void SkipRow() = 0;
+
+protected:
+ NDetail::TCountingRawTableReader Input_;
+
+ bool Valid_ = true;
+ bool Finished_ = false;
+ ui32 TableIndex_ = 0;
+ TMaybe<ui64> RowIndex_;
+ TMaybe<ui32> RangeIndex_;
+ TMaybe<ui64> TabletIndex_;
+ bool IsEndOfStream_ = false;
+ bool AtStart_ = true;
+ bool RowTaken_ = true;
+ ui32 Length_ = 0;
+
+private:
+ bool PrepareRetry();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
new file mode 100644
index 0000000000..d39e1398a5
--- /dev/null
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -0,0 +1,375 @@
+#include "node_table_reader.h"
+
+#include <yt/cpp/mapreduce/common/node_builder.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/parser.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRowBuilder
+ : public ::NYson::TYsonConsumerBase
+{
+public:
+ explicit TRowBuilder(TMaybe<TRowElement>* resultRow);
+
+ void OnStringScalar(TStringBuf value) override;
+ void OnInt64Scalar(i64 value) override;
+ void OnUint64Scalar(ui64 value) override;
+ void OnDoubleScalar(double value) override;
+ void OnBooleanScalar(bool value) override;
+ void OnBeginList() override;
+ void OnEntity() override;
+ void OnListItem() override;
+ void OnEndList() override;
+ void OnBeginMap() override;
+ void OnKeyedItem(TStringBuf key) override;
+ void OnEndMap() override;
+ void OnBeginAttributes() override;
+ void OnEndAttributes() override;
+
+ void Finalize();
+
+private:
+ THolder<TNodeBuilder> Builder_;
+ TRowElement Row_;
+ int Depth_ = 0;
+ bool Started_ = false;
+ TMaybe<TRowElement>* ResultRow_;
+
+ void SaveResultRow();
+};
+
+TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow)
+ : ResultRow_(resultRow)
+{ }
+
+void TRowBuilder::OnStringScalar(TStringBuf value)
+{
+ Row_.Size += sizeof(TNode) + sizeof(TString) + value.size();
+ Builder_->OnStringScalar(value);
+}
+
+void TRowBuilder::OnInt64Scalar(i64 value)
+{
+ Row_.Size += sizeof(TNode);
+ Builder_->OnInt64Scalar(value);
+}
+
+void TRowBuilder::OnUint64Scalar(ui64 value)
+{
+ Row_.Size += sizeof(TNode);
+ Builder_->OnUint64Scalar(value);
+}
+
+void TRowBuilder::OnDoubleScalar(double value)
+{
+ Row_.Size += sizeof(TNode);
+ Builder_->OnDoubleScalar(value);
+}
+
+void TRowBuilder::OnBooleanScalar(bool value)
+{
+ Row_.Size += sizeof(TNode);
+ Builder_->OnBooleanScalar(value);
+}
+
+void TRowBuilder::OnBeginList()
+{
+ ++Depth_;
+ Builder_->OnBeginList();
+}
+
+void TRowBuilder::OnEntity()
+{
+ Row_.Size += sizeof(TNode);
+ Builder_->OnEntity();
+}
+
+void TRowBuilder::OnListItem()
+{
+ if (Depth_ == 0) {
+ SaveResultRow();
+ } else {
+ Builder_->OnListItem();
+ }
+}
+
+void TRowBuilder::OnEndList()
+{
+ --Depth_;
+ Builder_->OnEndList();
+}
+
+void TRowBuilder::OnBeginMap()
+{
+ ++Depth_;
+ Builder_->OnBeginMap();
+}
+
+void TRowBuilder::OnKeyedItem(TStringBuf key)
+{
+ Row_.Size += sizeof(TString) + key.size();
+ Builder_->OnKeyedItem(key);
+}
+
+void TRowBuilder::OnEndMap()
+{
+ --Depth_;
+ Builder_->OnEndMap();
+}
+
+void TRowBuilder::OnBeginAttributes()
+{
+ ++Depth_;
+ Builder_->OnBeginAttributes();
+}
+
+void TRowBuilder::OnEndAttributes()
+{
+ --Depth_;
+ Builder_->OnEndAttributes();
+}
+
+void TRowBuilder::SaveResultRow()
+{
+ if (!Started_) {
+ Started_ = true;
+ } else {
+ *ResultRow_ = std::move(Row_);
+ }
+ Row_.Reset();
+ Builder_.Reset(new TNodeBuilder(&Row_.Node));
+}
+
+void TRowBuilder::Finalize()
+{
+ if (Started_) {
+ *ResultRow_ = std::move(Row_);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input)
+ : Input_(std::move(input))
+{
+ PrepareParsing();
+ Next();
+}
+
+TNodeTableReader::~TNodeTableReader()
+{
+}
+
+void TNodeTableReader::ParseListFragmentItem() {
+ if (!Parser_->Parse()) {
+ Builder_->Finalize();
+ IsLast_ = true;
+ }
+}
+
+const TNode& TNodeTableReader::GetRow() const
+{
+ CheckValidity();
+ if (!Row_) {
+ ythrow yexception() << "Row is moved";
+ }
+ return Row_->Node;
+}
+
+void TNodeTableReader::MoveRow(TNode* result)
+{
+ CheckValidity();
+ if (!Row_) {
+ ythrow yexception() << "Row is moved";
+ }
+ *result = std::move(Row_->Node);
+ Row_.Clear();
+}
+
+bool TNodeTableReader::IsValid() const
+{
+ return Valid_;
+}
+
+void TNodeTableReader::Next()
+{
+ try {
+ NextImpl();
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what());
+ throw;
+ }
+}
+
+void TNodeTableReader::NextImpl()
+{
+ CheckValidity();
+
+ if (RowIndex_) {
+ ++*RowIndex_;
+ }
+
+ // At the begin of stream parser doesn't return a finished row.
+ ParseFirstListFragmentItem();
+
+ while (true) {
+ if (IsLast_) {
+ Finished_ = true;
+ Valid_ = false;
+ break;
+ }
+
+ try {
+ ParseListFragmentItem();
+ } catch (std::exception& ex) {
+ NeedParseFirst_ = true;
+ OnStreamError(std::current_exception(), ex.what());
+ ParseFirstListFragmentItem();
+ continue;
+ }
+
+ Row_ = std::move(*NextRow_);
+ if (!Row_) {
+ throw yexception() << "No row in NextRow_";
+ }
+
+ // We successfully parsed one more row from the stream,
+ // so reset retry count to their initial value.
+ Input_.ResetRetries();
+
+ if (!Row_->Node.IsNull()) {
+ AtStart_ = false;
+ break;
+ }
+
+ for (auto& entry : Row_->Node.GetAttributes().AsMap()) {
+ if (entry.first == "key_switch") {
+ if (!AtStart_) {
+ Valid_ = false;
+ }
+ } else if (entry.first == "table_index") {
+ TableIndex_ = static_cast<ui32>(entry.second.AsInt64());
+ } else if (entry.first == "row_index") {
+ RowIndex_ = static_cast<ui64>(entry.second.AsInt64());
+ } else if (entry.first == "range_index") {
+ RangeIndex_ = static_cast<ui32>(entry.second.AsInt64());
+ } else if (entry.first == "tablet_index") {
+ TabletIndex_ = entry.second.AsInt64();
+ } else if (entry.first == "end_of_stream") {
+ IsEndOfStream_ = true;
+ }
+ }
+
+ if (!Valid_) {
+ break;
+ }
+ }
+}
+
+void TNodeTableReader::ParseFirstListFragmentItem()
+{
+ while (NeedParseFirst_) {
+ try {
+ ParseListFragmentItem();
+ NeedParseFirst_ = false;
+ break;
+ } catch (std::exception& ex) {
+ OnStreamError(std::current_exception(), ex.what());
+ }
+ }
+}
+
+ui32 TNodeTableReader::GetTableIndex() const
+{
+ CheckValidity();
+ return TableIndex_;
+}
+
+ui32 TNodeTableReader::GetRangeIndex() const
+{
+ CheckValidity();
+ return RangeIndex_.GetOrElse(0);
+}
+
+ui64 TNodeTableReader::GetRowIndex() const
+{
+ CheckValidity();
+ return RowIndex_.GetOrElse(0UL);
+}
+
+i64 TNodeTableReader::GetTabletIndex() const
+{
+ CheckValidity();
+ return TabletIndex_.GetOrElse(0L);
+}
+
+void TNodeTableReader::NextKey()
+{
+ while (Valid_) {
+ Next();
+ }
+
+ if (Finished_) {
+ return;
+ }
+
+ Valid_ = true;
+
+ if (RowIndex_) {
+ --*RowIndex_;
+ }
+}
+
+TMaybe<size_t> TNodeTableReader::GetReadByteCount() const
+{
+ return Input_.GetReadByteCount();
+}
+
+bool TNodeTableReader::IsEndOfStream() const
+{
+ return IsEndOfStream_;
+}
+
+bool TNodeTableReader::IsRawReaderExhausted() const
+{
+ return Finished_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TNodeTableReader::PrepareParsing()
+{
+ NextRow_.Clear();
+ Builder_.Reset(new TRowBuilder(&NextRow_));
+ Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_));
+}
+
+void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error)
+{
+ YT_LOG_ERROR("Read error: %v", error);
+ Exception_ = exception;
+ if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ RowIndex_.Clear();
+ RangeIndex_.Clear();
+ PrepareParsing();
+ } else {
+ std::rethrow_exception(Exception_);
+ }
+}
+
+void TNodeTableReader::CheckValidity() const
+{
+ if (!Valid_) {
+ ythrow yexception() << "Iterator is not valid";
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h
new file mode 100644
index 0000000000..4fe839eeb6
--- /dev/null
+++ b/yt/cpp/mapreduce/io/node_table_reader.h
@@ -0,0 +1,91 @@
+#pragma once
+
+#include "counting_raw_reader.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <library/cpp/yson/public.h>
+
+#include <util/stream/input.h>
+#include <util/generic/buffer.h>
+#include <util/system/event.h>
+#include <util/system/thread.h>
+
+#include <atomic>
+
+namespace NYT {
+
+class TRawTableReader;
+class TRowBuilder;
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct TRowElement
+{
+ TNode Node;
+ size_t Size = 0;
+
+ void Reset()
+ {
+ Node = TNode();
+ Size = 0;
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TNodeTableReader
+ : public INodeReaderImpl
+{
+public:
+ explicit TNodeTableReader(::TIntrusivePtr<TRawTableReader> input);
+ ~TNodeTableReader() override;
+
+ const TNode& GetRow() const override;
+ void MoveRow(TNode* result) override;
+
+ bool IsValid() const override;
+ void Next() override;
+ ui32 GetTableIndex() const override;
+ ui32 GetRangeIndex() const override;
+ ui64 GetRowIndex() const override;
+ i64 GetTabletIndex() const override;
+ void NextKey() override;
+ TMaybe<size_t> GetReadByteCount() const override;
+ bool IsEndOfStream() const override;
+ bool IsRawReaderExhausted() const override;
+
+private:
+ void NextImpl();
+ void OnStreamError(std::exception_ptr exception, TString error);
+ void CheckValidity() const;
+ void PrepareParsing();
+ void ParseListFragmentItem();
+ void ParseFirstListFragmentItem();
+
+private:
+ NDetail::TCountingRawTableReader Input_;
+
+ bool Valid_ = true;
+ bool Finished_ = false;
+ ui32 TableIndex_ = 0;
+ TMaybe<ui64> RowIndex_;
+ TMaybe<ui32> RangeIndex_;
+ TMaybe<i64> TabletIndex_;
+ bool IsEndOfStream_ = false;
+ bool AtStart_ = true;
+
+ TMaybe<TRowElement> Row_;
+ TMaybe<TRowElement> NextRow_;
+
+ THolder<TRowBuilder> Builder_;
+ THolder<::NYson::TYsonListParser> Parser_;
+
+ std::exception_ptr Exception_;
+ bool NeedParseFirst_ = true;
+ bool IsLast_ = false;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp
new file mode 100644
index 0000000000..dcb5a0f5b5
--- /dev/null
+++ b/yt/cpp/mapreduce/io/node_table_writer.cpp
@@ -0,0 +1,72 @@
+#include "node_table_writer.h"
+
+#include <yt/cpp/mapreduce/common/node_visitor.h>
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/writer.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format)
+ : Output_(std::move(output))
+{
+ for (size_t i = 0; i < Output_->GetStreamCount(); ++i) {
+ Writers_.push_back(
+ MakeHolder<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment));
+ }
+}
+
+TNodeTableWriter::~TNodeTableWriter()
+{ }
+
+size_t TNodeTableWriter::GetTableCount() const
+{
+ return Output_->GetStreamCount();
+}
+
+void TNodeTableWriter::FinishTable(size_t tableIndex) {
+ Output_->GetStream(tableIndex)->Finish();
+}
+
+void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex)
+{
+ if (row.HasAttributes()) {
+ ythrow TIOException() << "Row cannot have attributes";
+ }
+
+ static const TNode emptyMap = TNode::CreateMap();
+ const TNode* outRow = &emptyMap;
+ if (row.GetType() != TNode::Undefined) {
+ if (!row.IsMap()) {
+ ythrow TIOException() << "Row should be a map node";
+ } else {
+ outRow = &row;
+ }
+ }
+
+ auto* writer = Writers_[tableIndex].Get();
+ writer->OnListItem();
+
+ TNodeVisitor visitor(writer);
+ visitor.Visit(*outRow);
+
+ Output_->OnRowFinished(tableIndex);
+}
+
+void TNodeTableWriter::AddRow(TNode&& row, size_t tableIndex) {
+ AddRow(row, tableIndex);
+}
+
+void TNodeTableWriter::Abort()
+{
+ Output_->Abort();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h
new file mode 100644
index 0000000000..4bf8cb2fe7
--- /dev/null
+++ b/yt/cpp/mapreduce/io/node_table_writer.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+#include <library/cpp/yson/public.h>
+
+namespace NYT {
+
+class IProxyOutput;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TNodeTableWriter
+ : public INodeWriterImpl
+{
+public:
+ explicit TNodeTableWriter(THolder<IProxyOutput> output, ::NYson::EYsonFormat format = ::NYson::EYsonFormat::Binary);
+ ~TNodeTableWriter() override;
+
+ void AddRow(const TNode& row, size_t tableIndex) override;
+ void AddRow(TNode&& row, size_t tableIndex) override;
+
+ size_t GetTableCount() const override;
+ void FinishTable(size_t) override;
+ void Abort() override;
+
+private:
+ THolder<IProxyOutput> Output_;
+ TVector<THolder<::NYson::TYsonWriter>> Writers_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_helpers.cpp b/yt/cpp/mapreduce/io/proto_helpers.cpp
new file mode 100644
index 0000000000..2ffbfd8d89
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_helpers.cpp
@@ -0,0 +1,101 @@
+#include "proto_helpers.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+#include <yt/cpp/mapreduce/interface/fluent.h>
+
+#include <yt/yt_proto/yt/formats/extension.pb.h>
+
+#include <google/protobuf/descriptor.h>
+#include <google/protobuf/descriptor.pb.h>
+#include <google/protobuf/messagext.h>
+#include <google/protobuf/io/coded_stream.h>
+
+#include <util/stream/str.h>
+#include <util/stream/file.h>
+#include <util/folder/path.h>
+
+namespace NYT {
+
+using ::google::protobuf::Message;
+using ::google::protobuf::Descriptor;
+using ::google::protobuf::DescriptorPool;
+
+using ::google::protobuf::io::CodedInputStream;
+using ::google::protobuf::io::TCopyingInputStreamAdaptor;
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace {
+
+TVector<const Descriptor*> GetJobDescriptors(const TString& fileName)
+{
+ TVector<const Descriptor*> descriptors;
+ if (!TFsPath(fileName).Exists()) {
+ ythrow TIOException() <<
+ "Cannot load '" << fileName << "' file";
+ }
+
+ TIFStream input(fileName);
+ TString line;
+ while (input.ReadLine(line)) {
+ const auto* pool = DescriptorPool::generated_pool();
+ const auto* descriptor = pool->FindMessageTypeByName(line);
+ descriptors.push_back(descriptor);
+ }
+
+ return descriptors;
+}
+
+} // namespace
+
+////////////////////////////////////////////////////////////////////////////////
+
+TVector<const Descriptor*> GetJobInputDescriptors()
+{
+ return GetJobDescriptors("proto_input");
+}
+
+TVector<const Descriptor*> GetJobOutputDescriptors()
+{
+ return GetJobDescriptors("proto_output");
+}
+
+void ValidateProtoDescriptor(
+ const Message& row,
+ size_t tableIndex,
+ const TVector<const Descriptor*>& descriptors,
+ bool isRead)
+{
+ const char* direction = isRead ? "input" : "output";
+
+ if (tableIndex >= descriptors.size()) {
+ ythrow TIOException() <<
+ "Table index " << tableIndex <<
+ " is out of range [0, " << descriptors.size() <<
+ ") in " << direction;
+ }
+
+ if (row.GetDescriptor() != descriptors[tableIndex]) {
+ ythrow TIOException() <<
+ "Invalid row of type " << row.GetDescriptor()->full_name() <<
+ " at index " << tableIndex <<
+ ", row of type " << descriptors[tableIndex]->full_name() <<
+ " expected in " << direction;
+ }
+}
+
+void ParseFromArcadiaStream(IInputStream* stream, Message& row, ui32 length)
+{
+ TLengthLimitedInput input(stream, length);
+ TCopyingInputStreamAdaptor adaptor(&input);
+ CodedInputStream codedStream(&adaptor);
+ codedStream.SetTotalBytesLimit(length + 1);
+ bool parsedOk = row.ParseFromCodedStream(&codedStream);
+ Y_ENSURE(parsedOk, "Failed to parse protobuf message");
+
+ Y_ENSURE(input.Left() == 0);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_helpers.h b/yt/cpp/mapreduce/io/proto_helpers.h
new file mode 100644
index 0000000000..9d1ec0027c
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_helpers.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/node.h>
+
+namespace google {
+namespace protobuf {
+
+class Message;
+class Descriptor;
+
+}
+}
+
+class IInputStream;
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TVector<const ::google::protobuf::Descriptor*> GetJobInputDescriptors();
+TVector<const ::google::protobuf::Descriptor*> GetJobOutputDescriptors();
+
+void ValidateProtoDescriptor(
+ const ::google::protobuf::Message& row,
+ size_t tableIndex,
+ const TVector<const ::google::protobuf::Descriptor*>& descriptors,
+ bool isRead);
+
+void ParseFromArcadiaStream(
+ IInputStream* stream,
+ ::google::protobuf::Message& row,
+ ui32 size);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp
new file mode 100644
index 0000000000..28a4bc8719
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp
@@ -0,0 +1,305 @@
+#include "proto_table_reader.h"
+
+#include "node_table_reader.h"
+
+#include "proto_helpers.h"
+
+#include <yt/yt_proto/yt/formats/extension.pb.h>
+
+#include <util/string/escape.h>
+#include <util/string/printf.h>
+
+namespace NYT {
+
+using ::google::protobuf::Descriptor;
+using ::google::protobuf::FieldDescriptor;
+using ::google::protobuf::EnumValueDescriptor;
+
+const TString& GetFieldColumnName(const FieldDescriptor* fieldDesc) {
+ const auto& columnName = fieldDesc->options().GetExtension(column_name);
+ if (!columnName.empty()) {
+ return columnName;
+ }
+ const auto& keyColumnName = fieldDesc->options().GetExtension(key_column_name);
+ if (!keyColumnName.empty()) {
+ return keyColumnName;
+ }
+ return fieldDesc->name();
+}
+
+void ReadMessageFromNode(const TNode& node, Message* row)
+{
+ auto* descriptor = row->GetDescriptor();
+ auto* reflection = row->GetReflection();
+
+ int count = descriptor->field_count();
+ for (int i = 0; i < count; ++i) {
+ auto* fieldDesc = descriptor->field(i);
+
+ const auto& columnName = GetFieldColumnName(fieldDesc);
+
+ const auto& nodeMap = node.AsMap();
+ auto it = nodeMap.find(columnName);
+ if (it == nodeMap.end()) {
+ continue; // no such column
+ }
+ auto actualType = it->second.GetType();
+ if (actualType == TNode::Null) {
+ continue; // null field
+ }
+
+ auto checkType = [&columnName] (TNode::EType expected, TNode::EType actual) {
+ if (expected != actual) {
+ ythrow TNode::TTypeError() << "expected node type " << expected
+ << ", actual " << actual << " for node " << columnName.data();
+ }
+ };
+
+ switch (fieldDesc->type()) {
+ case FieldDescriptor::TYPE_STRING:
+ case FieldDescriptor::TYPE_BYTES:
+ checkType(TNode::String, actualType);
+ reflection->SetString(row, fieldDesc, it->second.AsString());
+ break;
+ case FieldDescriptor::TYPE_INT64:
+ case FieldDescriptor::TYPE_SINT64:
+ case FieldDescriptor::TYPE_SFIXED64:
+ checkType(TNode::Int64, actualType);
+ reflection->SetInt64(row, fieldDesc, it->second.AsInt64());
+ break;
+ case FieldDescriptor::TYPE_INT32:
+ case FieldDescriptor::TYPE_SINT32:
+ case FieldDescriptor::TYPE_SFIXED32:
+ checkType(TNode::Int64, actualType);
+ reflection->SetInt32(row, fieldDesc, it->second.AsInt64());
+ break;
+ case FieldDescriptor::TYPE_UINT64:
+ case FieldDescriptor::TYPE_FIXED64:
+ checkType(TNode::Uint64, actualType);
+ reflection->SetUInt64(row, fieldDesc, it->second.AsUint64());
+ break;
+ case FieldDescriptor::TYPE_UINT32:
+ case FieldDescriptor::TYPE_FIXED32:
+ checkType(TNode::Uint64, actualType);
+ reflection->SetUInt32(row, fieldDesc, it->second.AsUint64());
+ break;
+ case FieldDescriptor::TYPE_DOUBLE:
+ checkType(TNode::Double, actualType);
+ reflection->SetDouble(row, fieldDesc, it->second.AsDouble());
+ break;
+ case FieldDescriptor::TYPE_FLOAT:
+ checkType(TNode::Double, actualType);
+ reflection->SetFloat(row, fieldDesc, it->second.AsDouble());
+ break;
+ case FieldDescriptor::TYPE_BOOL:
+ checkType(TNode::Bool, actualType);
+ reflection->SetBool(row, fieldDesc, it->second.AsBool());
+ break;
+ case FieldDescriptor::TYPE_ENUM: {
+ TNode::EType columnType = TNode::String;
+ for (const auto& flag : fieldDesc->options().GetRepeatedExtension(flags)) {
+ if (flag == EWrapperFieldFlag::ENUM_INT) {
+ columnType = TNode::Int64;
+ break;
+ }
+ }
+ checkType(columnType, actualType);
+
+ const EnumValueDescriptor* valueDesc = nullptr;
+ TString stringValue;
+ if (columnType == TNode::String) {
+ const auto& value = it->second.AsString();
+ valueDesc = fieldDesc->enum_type()->FindValueByName(value);
+ stringValue = value;
+ } else if (columnType == TNode::Int64) {
+ const auto& value = it->second.AsInt64();
+ valueDesc = fieldDesc->enum_type()->FindValueByNumber(value);
+ stringValue = ToString(value);
+ } else {
+ Y_FAIL();
+ }
+
+ if (valueDesc == nullptr) {
+ ythrow yexception() << "Failed to parse value '" << EscapeC(stringValue) << "' as " << fieldDesc->enum_type()->full_name();
+ }
+
+ reflection->SetEnum(row, fieldDesc, valueDesc);
+
+ break;
+ }
+ case FieldDescriptor::TYPE_MESSAGE: {
+ checkType(TNode::String, actualType);
+ Message* message = reflection->MutableMessage(row, fieldDesc);
+ if (!message->ParseFromArray(it->second.AsString().data(), it->second.AsString().size())) {
+ ythrow yexception() << "Failed to parse protobuf message";
+ }
+ break;
+ }
+ default:
+ ythrow yexception() << "Incorrect protobuf type";
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TProtoTableReader::TProtoTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ TVector<const Descriptor*>&& descriptors)
+ : NodeReader_(new TNodeTableReader(std::move(input)))
+ , Descriptors_(std::move(descriptors))
+{ }
+
+TProtoTableReader::~TProtoTableReader()
+{ }
+
+void TProtoTableReader::ReadRow(Message* row)
+{
+ const auto& node = NodeReader_->GetRow();
+ ReadMessageFromNode(node, row);
+}
+
+bool TProtoTableReader::IsValid() const
+{
+ return NodeReader_->IsValid();
+}
+
+void TProtoTableReader::Next()
+{
+ NodeReader_->Next();
+}
+
+ui32 TProtoTableReader::GetTableIndex() const
+{
+ return NodeReader_->GetTableIndex();
+}
+
+ui32 TProtoTableReader::GetRangeIndex() const
+{
+ return NodeReader_->GetRangeIndex();
+}
+
+ui64 TProtoTableReader::GetRowIndex() const
+{
+ return NodeReader_->GetRowIndex();
+}
+
+void TProtoTableReader::NextKey()
+{
+ NodeReader_->NextKey();
+}
+
+TMaybe<size_t> TProtoTableReader::GetReadByteCount() const
+{
+ return NodeReader_->GetReadByteCount();
+}
+
+bool TProtoTableReader::IsEndOfStream() const
+{
+ return NodeReader_->IsEndOfStream();
+}
+
+bool TProtoTableReader::IsRawReaderExhausted() const
+{
+ return NodeReader_->IsRawReaderExhausted();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLenvalProtoTableReader::TLenvalProtoTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ TVector<const Descriptor*>&& descriptors)
+ : TLenvalTableReader(std::move(input))
+ , Descriptors_(std::move(descriptors))
+{ }
+
+TLenvalProtoTableReader::~TLenvalProtoTableReader()
+{ }
+
+void TLenvalProtoTableReader::ReadRow(Message* row)
+{
+ ValidateProtoDescriptor(*row, GetTableIndex(), Descriptors_, true);
+
+ while (true) {
+ try {
+ ParseFromArcadiaStream(&Input_, *row, Length_);
+ RowTaken_ = true;
+
+ // We successfully parsed one more row from the stream,
+ // so reset retry count to their initial value.
+ Input_.ResetRetries();
+
+ break;
+ } catch (const std::exception& ) {
+ if (!TLenvalTableReader::Retry()) {
+ throw;
+ }
+ }
+ }
+}
+
+bool TLenvalProtoTableReader::IsValid() const
+{
+ return TLenvalTableReader::IsValid();
+}
+
+void TLenvalProtoTableReader::Next()
+{
+ TLenvalTableReader::Next();
+}
+
+ui32 TLenvalProtoTableReader::GetTableIndex() const
+{
+ return TLenvalTableReader::GetTableIndex();
+}
+
+ui32 TLenvalProtoTableReader::GetRangeIndex() const
+{
+ return TLenvalTableReader::GetRangeIndex();
+}
+
+ui64 TLenvalProtoTableReader::GetRowIndex() const
+{
+ return TLenvalTableReader::GetRowIndex();
+}
+
+void TLenvalProtoTableReader::NextKey()
+{
+ TLenvalTableReader::NextKey();
+}
+
+TMaybe<size_t> TLenvalProtoTableReader::GetReadByteCount() const
+{
+ return TLenvalTableReader::GetReadByteCount();
+}
+
+bool TLenvalProtoTableReader::IsEndOfStream() const
+{
+ return TLenvalTableReader::IsEndOfStream();
+}
+
+bool TLenvalProtoTableReader::IsRawReaderExhausted() const
+{
+ return TLenvalTableReader::IsRawReaderExhausted();
+}
+
+void TLenvalProtoTableReader::SkipRow()
+{
+ while (true) {
+ try {
+ size_t skipped = Input_.Skip(Length_);
+ if (skipped != Length_) {
+ ythrow yexception() << "Premature end of stream";
+ }
+ break;
+ } catch (const std::exception& ) {
+ if (!TLenvalTableReader::Retry()) {
+ throw;
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h
new file mode 100644
index 0000000000..05a528b9c6
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_table_reader.h
@@ -0,0 +1,76 @@
+#pragma once
+
+#include "lenval_table_reader.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+
+class TRawTableReader;
+class TNodeTableReader;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TProtoTableReader
+ : public IProtoReaderImpl
+{
+public:
+ explicit TProtoTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ TVector<const ::google::protobuf::Descriptor*>&& descriptors);
+ ~TProtoTableReader() override;
+
+ void ReadRow(Message* row) override;
+
+ bool IsValid() const override;
+ void Next() override;
+ ui32 GetTableIndex() const override;
+ ui32 GetRangeIndex() const override;
+ ui64 GetRowIndex() const override;
+ void NextKey() override;
+ TMaybe<size_t> GetReadByteCount() const override;
+ bool IsEndOfStream() const override;
+ bool IsRawReaderExhausted() const override;
+
+private:
+ THolder<TNodeTableReader> NodeReader_;
+ TVector<const ::google::protobuf::Descriptor*> Descriptors_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TLenvalProtoTableReader
+ : public IProtoReaderImpl
+ , public TLenvalTableReader
+{
+public:
+ explicit TLenvalProtoTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ TVector<const ::google::protobuf::Descriptor*>&& descriptors);
+ ~TLenvalProtoTableReader() override;
+
+ void ReadRow(Message* row) override;
+
+ bool IsValid() const override;
+ void Next() override;
+ ui32 GetTableIndex() const override;
+ ui32 GetRangeIndex() const override;
+ ui64 GetRowIndex() const override;
+ void NextKey() override;
+ TMaybe<size_t> GetReadByteCount() const override;
+ bool IsEndOfStream() const override;
+ bool IsRawReaderExhausted() const override;
+
+protected:
+ void SkipRow() override;
+
+private:
+ TVector<const ::google::protobuf::Descriptor*> Descriptors_;
+};
+
+// Sometime useful outside mapreduce/yt
+void ReadMessageFromNode(const TNode& node, Message* row);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp
new file mode 100644
index 0000000000..1ce7811625
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp
@@ -0,0 +1,184 @@
+#include "proto_table_writer.h"
+
+#include "node_table_writer.h"
+#include "proto_helpers.h"
+
+#include <yt/cpp/mapreduce/common/node_builder.h>
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <yt/yt_proto/yt/formats/extension.pb.h>
+
+#include <google/protobuf/unknown_field_set.h>
+
+namespace NYT {
+
+using ::google::protobuf::Descriptor;
+using ::google::protobuf::FieldDescriptor;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TNode MakeNodeFromMessage(const Message& row)
+{
+ TNode node;
+ TNodeBuilder builder(&node);
+ builder.OnBeginMap();
+
+ auto* descriptor = row.GetDescriptor();
+ auto* reflection = row.GetReflection();
+
+ int count = descriptor->field_count();
+ for (int i = 0; i < count; ++i) {
+ auto* fieldDesc = descriptor->field(i);
+ if (fieldDesc->is_repeated()) {
+ Y_ENSURE(reflection->FieldSize(row, fieldDesc) == 0, "Storing repeated protobuf fields is not supported yet");
+ continue;
+ } else if (!reflection->HasField(row, fieldDesc)) {
+ continue;
+ }
+
+ TString columnName = fieldDesc->options().GetExtension(column_name);
+ if (columnName.empty()) {
+ const auto& keyColumnName = fieldDesc->options().GetExtension(key_column_name);
+ columnName = keyColumnName.empty() ? fieldDesc->name() : keyColumnName;
+ }
+
+ builder.OnKeyedItem(columnName);
+
+ switch (fieldDesc->type()) {
+ case FieldDescriptor::TYPE_STRING:
+ case FieldDescriptor::TYPE_BYTES:
+ builder.OnStringScalar(reflection->GetString(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_INT64:
+ case FieldDescriptor::TYPE_SINT64:
+ case FieldDescriptor::TYPE_SFIXED64:
+ builder.OnInt64Scalar(reflection->GetInt64(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_INT32:
+ case FieldDescriptor::TYPE_SINT32:
+ case FieldDescriptor::TYPE_SFIXED32:
+ builder.OnInt64Scalar(reflection->GetInt32(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_UINT64:
+ case FieldDescriptor::TYPE_FIXED64:
+ builder.OnUint64Scalar(reflection->GetUInt64(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_UINT32:
+ case FieldDescriptor::TYPE_FIXED32:
+ builder.OnUint64Scalar(reflection->GetUInt32(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_DOUBLE:
+ builder.OnDoubleScalar(reflection->GetDouble(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_FLOAT:
+ builder.OnDoubleScalar(reflection->GetFloat(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_BOOL:
+ builder.OnBooleanScalar(reflection->GetBool(row, fieldDesc));
+ break;
+ case FieldDescriptor::TYPE_ENUM:
+ builder.OnStringScalar(reflection->GetEnum(row, fieldDesc)->name());
+ break;
+ case FieldDescriptor::TYPE_MESSAGE:
+ builder.OnStringScalar(reflection->GetMessage(row, fieldDesc).SerializeAsString());
+ break;
+ default:
+ ythrow yexception() << "Invalid field type for column: " << columnName;
+ break;
+ }
+ }
+
+ builder.OnEndMap();
+ return node;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TProtoTableWriter::TProtoTableWriter(
+ THolder<IProxyOutput> output,
+ TVector<const Descriptor*>&& descriptors)
+ : NodeWriter_(new TNodeTableWriter(std::move(output)))
+ , Descriptors_(std::move(descriptors))
+{ }
+
+TProtoTableWriter::~TProtoTableWriter()
+{ }
+
+size_t TProtoTableWriter::GetTableCount() const
+{
+ return NodeWriter_->GetTableCount();
+}
+
+void TProtoTableWriter::FinishTable(size_t tableIndex)
+{
+ NodeWriter_->FinishTable(tableIndex);
+}
+
+void TProtoTableWriter::AddRow(const Message& row, size_t tableIndex)
+{
+ NodeWriter_->AddRow(MakeNodeFromMessage(row), tableIndex);
+}
+
+void TProtoTableWriter::AddRow(Message&& row, size_t tableIndex)
+{
+ TProtoTableWriter::AddRow(row, tableIndex);
+}
+
+
+void TProtoTableWriter::Abort()
+{
+ NodeWriter_->Abort();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TLenvalProtoTableWriter::TLenvalProtoTableWriter(
+ THolder<IProxyOutput> output,
+ TVector<const Descriptor*>&& descriptors)
+ : Output_(std::move(output))
+ , Descriptors_(std::move(descriptors))
+{ }
+
+TLenvalProtoTableWriter::~TLenvalProtoTableWriter()
+{ }
+
+size_t TLenvalProtoTableWriter::GetTableCount() const
+{
+ return Output_->GetStreamCount();
+}
+
+void TLenvalProtoTableWriter::FinishTable(size_t tableIndex)
+{
+ Output_->GetStream(tableIndex)->Finish();
+}
+
+void TLenvalProtoTableWriter::AddRow(const Message& row, size_t tableIndex)
+{
+ ValidateProtoDescriptor(row, tableIndex, Descriptors_, false);
+
+ Y_VERIFY(row.GetReflection()->GetUnknownFields(row).empty(),
+ "Message has unknown fields. This probably means bug in client code.\n"
+ "Message: %s", row.DebugString().data());
+
+ auto* stream = Output_->GetStream(tableIndex);
+ i32 size = row.ByteSize();
+ stream->Write(&size, sizeof(size));
+ bool serializedOk = row.SerializeToArcadiaStream(stream);
+ Y_ENSURE(serializedOk, "Failed to serialize protobuf message");
+ Output_->OnRowFinished(tableIndex);
+}
+
+void TLenvalProtoTableWriter::AddRow(Message&& row, size_t tableIndex)
+{
+ TLenvalProtoTableWriter::AddRow(row, tableIndex);
+}
+
+void TLenvalProtoTableWriter::Abort()
+{
+ Output_->Abort();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h
new file mode 100644
index 0000000000..a6df69e6ae
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_table_writer.h
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+
+class IProxyOutput;
+class TNodeTableWriter;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TProtoTableWriter
+ : public IProtoWriterImpl
+{
+public:
+ TProtoTableWriter(
+ THolder<IProxyOutput> output,
+ TVector<const ::google::protobuf::Descriptor*>&& descriptors);
+ ~TProtoTableWriter() override;
+
+ void AddRow(const Message& row, size_t tableIndex) override;
+ void AddRow(Message&& row, size_t tableIndex) override;
+
+ size_t GetTableCount() const override;
+ void FinishTable(size_t) override;
+ void Abort() override;
+
+private:
+ THolder<TNodeTableWriter> NodeWriter_;
+ TVector<const ::google::protobuf::Descriptor*> Descriptors_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TLenvalProtoTableWriter
+ : public IProtoWriterImpl
+{
+public:
+ TLenvalProtoTableWriter(
+ THolder<IProxyOutput> output,
+ TVector<const ::google::protobuf::Descriptor*>&& descriptors);
+ ~TLenvalProtoTableWriter() override;
+
+ void AddRow(const Message& row, size_t tableIndex) override;
+ void AddRow(Message&& row, size_t tableIndex) override;
+
+ size_t GetTableCount() const override;
+ void FinishTable(size_t) override;
+ void Abort() override;
+
+private:
+ THolder<IProxyOutput> Output_;
+ TVector<const ::google::protobuf::Descriptor*> Descriptors_;
+};
+
+// Sometime useful outside mapreduce/yt
+TNode MakeNodeFromMessage(const ::google::protobuf::Message& row);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
new file mode 100644
index 0000000000..8da3b2da31
--- /dev/null
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp
@@ -0,0 +1,232 @@
+#include "skiff_row_table_reader.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <yt/cpp/mapreduce/interface/skiff_row.h>
+
+#include <library/cpp/skiff/skiff.h>
+
+#include <library/cpp/yt/logging/logger.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TSkiffRowTableReader::TSkiffRowTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ const NSkiff::TSkiffSchemaPtr& schema,
+ TVector<ISkiffRowSkipperPtr>&& skippers,
+ NDetail::TCreateSkiffSchemaOptions&& options)
+ : Input_(std::move(input))
+ , BufferedInput_(&Input_)
+ , Parser_({schema, &BufferedInput_})
+ , Skippers_(std::move(skippers))
+ , Options_(std::move(options))
+{
+ Next();
+}
+
+TSkiffRowTableReader::~TSkiffRowTableReader()
+{ }
+
+bool TSkiffRowTableReader::Retry()
+{
+ if (PrepareRetry()) {
+ RowTaken_ = true;
+ Next();
+ return true;
+ }
+ return false;
+}
+
+bool TSkiffRowTableReader::PrepareRetry()
+{
+ if (Input_.Retry(RangeIndex_, RowIndex_)) {
+ RowIndex_.Clear();
+ RangeIndex_.Clear();
+ BufferedInput_ = TBufferedInput(&Input_);
+ Parser_.emplace(&BufferedInput_);
+ return true;
+ }
+ return false;
+}
+
+void TSkiffRowTableReader::ReadRow(const ISkiffRowParserPtr& parser)
+{
+ while (true) {
+ try {
+ parser->Parse(&Parser_.value());
+ RowTaken_ = true;
+
+ // We successfully parsed one more row from the stream,
+ // so reset retry count to their initial value.
+ Input_.ResetRetries();
+
+ break;
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("Read error during parsing: %v", ex.what());
+
+ if (!Retry()) {
+ throw;
+ }
+ }
+ }
+}
+
+bool TSkiffRowTableReader::IsValid() const
+{
+ return Valid_;
+}
+
+void TSkiffRowTableReader::SkipRow()
+{
+ CheckValidity();
+ while (true) {
+ try {
+ Skippers_[TableIndex_]->SkipRow(&Parser_.value());
+
+ break;
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("Read error during skipping row: %v", ex.what());
+
+ if (!Retry()) {
+ throw;
+ }
+ }
+ }
+}
+
+void TSkiffRowTableReader::CheckValidity() const {
+ if (!IsValid()) {
+ ythrow yexception() << "Iterator is not valid";
+ }
+}
+
+void TSkiffRowTableReader::Next()
+{
+ if (!RowTaken_) {
+ SkipRow();
+ }
+
+ CheckValidity();
+
+ if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
+ Finished_ = true;
+ Valid_ = false;
+ return;
+ }
+
+ if (AfterKeySwitch_) {
+ AfterKeySwitch_ = false;
+ return;
+ }
+
+ if (RowIndex_) {
+ ++*RowIndex_;
+ }
+
+ while (true) {
+ try {
+ auto tag = Parser_->ParseVariant16Tag();
+ if (tag == NSkiff::EndOfSequenceTag<ui16>()) {
+ IsEndOfStream_ = true;
+ break;
+ } else {
+ TableIndex_ = tag;
+ }
+
+ if (TableIndex_ >= Skippers_.size()) {
+ ythrow TIOException() <<
+ "Table index " << TableIndex_ <<
+ " is out of range [0, " << Skippers_.size() <<
+ ") in read";
+ }
+
+ if (Options_.HasKeySwitch_) {
+ auto keySwitch = Parser_->ParseBoolean();
+ if (keySwitch) {
+ AfterKeySwitch_ = true;
+ Valid_ = false;
+ }
+ }
+
+ auto tagRowIndex = Parser_->ParseVariant8Tag();
+ if (tagRowIndex == 1) {
+ RowIndex_ = Parser_->ParseInt64();
+ } else {
+ Y_ENSURE(tagRowIndex == 0, "Tag for row_index was expected to be 0 or 1, got " << tagRowIndex);
+ }
+
+ if (Options_.HasRangeIndex_) {
+ auto tagRangeIndex = Parser_->ParseVariant8Tag();
+ if (tagRangeIndex == 1) {
+ RangeIndex_ = Parser_->ParseInt64();
+ } else {
+ Y_ENSURE(tagRangeIndex == 0, "Tag for range_index was expected to be 0 or 1, got " << tagRangeIndex);
+ }
+ }
+
+ break;
+ } catch (const std::exception& ex) {
+ YT_LOG_ERROR("Read error: %v", ex.what());
+
+ if (!PrepareRetry()) {
+ throw;
+ }
+ }
+ }
+
+ RowTaken_ = false;
+}
+
+ui32 TSkiffRowTableReader::GetTableIndex() const
+{
+ CheckValidity();
+ return TableIndex_;
+}
+
+ui32 TSkiffRowTableReader::GetRangeIndex() const
+{
+ CheckValidity();
+ return RangeIndex_.GetOrElse(0);
+}
+
+ui64 TSkiffRowTableReader::GetRowIndex() const
+{
+ CheckValidity();
+ return RowIndex_.GetOrElse(0ULL);
+}
+
+void TSkiffRowTableReader::NextKey() {
+ while (Valid_) {
+ Next();
+ }
+
+ if (Finished_) {
+ return;
+ }
+
+ Valid_ = true;
+
+ if (RowIndex_) {
+ --*RowIndex_;
+ }
+
+ RowTaken_ = true;
+}
+
+TMaybe<size_t> TSkiffRowTableReader::GetReadByteCount() const {
+ return Input_.GetReadByteCount();
+}
+
+bool TSkiffRowTableReader::IsEndOfStream() const {
+ return IsEndOfStream_;
+}
+
+bool TSkiffRowTableReader::IsRawReaderExhausted() const {
+ return Finished_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
new file mode 100644
index 0000000000..368968266c
--- /dev/null
+++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h
@@ -0,0 +1,67 @@
+#pragma once
+
+#include "counting_raw_reader.h"
+
+#include <yt/cpp/mapreduce/client/skiff.h>
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <yt/cpp/mapreduce/skiff/unchecked_parser.h>
+
+#include <util/stream/buffered.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSkiffRowTableReader
+ : public ISkiffRowReaderImpl
+{
+public:
+ explicit TSkiffRowTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ const NSkiff::TSkiffSchemaPtr& schema,
+ TVector<ISkiffRowSkipperPtr>&& skippers,
+ NDetail::TCreateSkiffSchemaOptions&& options);
+
+ ~TSkiffRowTableReader() override;
+
+ void ReadRow(const ISkiffRowParserPtr& parser) override;
+
+ bool IsValid() const override;
+ void Next() override;
+ ui32 GetTableIndex() const override;
+ ui32 GetRangeIndex() const override;
+ ui64 GetRowIndex() const override;
+ void NextKey() override;
+ TMaybe<size_t> GetReadByteCount() const override;
+ bool IsEndOfStream() const override;
+ bool IsRawReaderExhausted() const override;
+
+private:
+ bool Retry();
+ void SkipRow();
+ void CheckValidity() const;
+ bool PrepareRetry();
+
+private:
+ NDetail::TCountingRawTableReader Input_;
+ TBufferedInput BufferedInput_;
+ std::optional<NSkiff::TCheckedInDebugSkiffParser> Parser_;
+ TVector<ISkiffRowSkipperPtr> Skippers_;
+ NDetail::TCreateSkiffSchemaOptions Options_;
+
+ bool RowTaken_ = true;
+ bool Valid_ = true;
+ bool Finished_ = false;
+ bool AfterKeySwitch_ = false;
+ bool IsEndOfStream_ = false;
+
+ TMaybe<ui64> RowIndex_;
+ TMaybe<ui32> RangeIndex_;
+ ui32 TableIndex_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
new file mode 100644
index 0000000000..51c20609f0
--- /dev/null
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
@@ -0,0 +1,293 @@
+#include "skiff_table_reader.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <yt/cpp/mapreduce/skiff/wire_type.h>
+#include <yt/cpp/mapreduce/skiff/skiff_schema.h>
+
+#include <util/string/cast.h>
+
+namespace NYT {
+namespace NDetail {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+enum EColumnType : i8
+{
+ Dense,
+ KeySwitch,
+ RangeIndex,
+ RowIndex
+};
+
+struct TSkiffColumnSchema
+{
+ EColumnType Type;
+ bool Required;
+ NSkiff::EWireType WireType;
+ TString Name;
+
+ TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name)
+ : Type(type)
+ , Required(required)
+ , WireType(wireType)
+ , Name(name)
+ { }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+
+struct TSkiffTableReader::TSkiffTableSchema
+{
+ TVector<TSkiffColumnSchema> Columns;
+};
+
+TSkiffTableReader::TSkiffTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ const NSkiff::TSkiffSchemaPtr& schema)
+ : Input_(std::move(input))
+ , BufferedInput_(&Input_)
+ , Parser_(&BufferedInput_)
+ , Schemas_(CreateSkiffTableSchemas(schema))
+{
+ Next();
+}
+
+TSkiffTableReader::~TSkiffTableReader() = default;
+
+const TNode& TSkiffTableReader::GetRow() const
+{
+ EnsureValidity();
+ Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
+ return Row_;
+}
+
+void TSkiffTableReader::MoveRow(TNode* result)
+{
+ EnsureValidity();
+ Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
+ *result = std::move(Row_);
+ Row_ = TNode();
+}
+
+bool TSkiffTableReader::IsValid() const
+{
+ return Valid_;
+}
+
+void TSkiffTableReader::Next()
+{
+ EnsureValidity();
+ if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
+ Finished_ = true;
+ Valid_ = false;
+ return;
+ }
+
+ if (AfterKeySwitch_) {
+ AfterKeySwitch_ = false;
+ return;
+ }
+
+ while (true) {
+ try {
+ ReadRow();
+ break;
+ } catch (const std::exception& exception) {
+ YT_LOG_ERROR("Read error: %v", exception.what());
+ if (!Input_.Retry(RangeIndex_, RowIndex_)) {
+ throw;
+ }
+ BufferedInput_ = TBufferedInput(&Input_);
+ Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_));
+ RangeIndex_.Clear();
+ RowIndex_.Clear();
+ }
+ }
+}
+
+ui32 TSkiffTableReader::GetTableIndex() const
+{
+ EnsureValidity();
+ return TableIndex_;
+}
+
+ui32 TSkiffTableReader::GetRangeIndex() const
+{
+ EnsureValidity();
+ return RangeIndex_.GetOrElse(0);
+}
+
+ui64 TSkiffTableReader::GetRowIndex() const
+{
+ EnsureValidity();
+ return RowIndex_.GetOrElse(0ULL);
+}
+
+void TSkiffTableReader::NextKey()
+{
+ while (Valid_) {
+ Next();
+ }
+
+ if (Finished_) {
+ return;
+ }
+
+ Valid_ = true;
+}
+
+TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const
+{
+ return Input_.GetReadByteCount();
+}
+
+bool TSkiffTableReader::IsRawReaderExhausted() const
+{
+ return Finished_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
+ const NSkiff::TSkiffSchemaPtr& schema)
+{
+ using NSkiff::EWireType;
+
+ constexpr auto keySwitchColumnName = "$key_switch";
+ constexpr auto rangeIndexColumnName = "$range_index";
+ constexpr auto rowIndexColumnName = "$row_index";
+
+ static const THashMap<TString, TSkiffColumnSchema> specialColumns = {
+ {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}},
+ {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}},
+ {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}},
+ };
+
+ Y_ENSURE(schema->GetWireType() == EWireType::Variant16,
+ "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'");
+ TVector<TSkiffTableSchema> result;
+ for (const auto& tableSchema : schema->GetChildren()) {
+ Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple,
+ "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
+ TVector<TSkiffColumnSchema> columns;
+ for (const auto& columnSchema : tableSchema->GetChildren()) {
+ if (columnSchema->GetName().StartsWith("$")) {
+ auto iter = specialColumns.find(columnSchema->GetName());
+ Y_ENSURE(iter != specialColumns.end(), "Unknown special column: " << columnSchema->GetName());
+ columns.push_back(iter->second);
+ } else {
+ auto wireType = columnSchema->GetWireType();
+ bool required = true;
+ if (wireType == EWireType::Variant8) {
+ const auto& children = columnSchema->GetChildren();
+ Y_ENSURE(
+ children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing &&
+ NSkiff::IsSimpleType(children[1]->GetWireType()),
+ "Expected schema of form 'variant8<nothing, simple-type>', got "
+ << NSkiff::GetShortDebugString(columnSchema));
+ wireType = children[1]->GetWireType();
+ required = false;
+ }
+ Y_ENSURE(NSkiff::IsSimpleType(wireType),
+ "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema));
+ columns.emplace_back(
+ EColumnType::Dense,
+ required,
+ wireType,
+ columnSchema->GetName());
+ }
+ }
+ result.push_back({std::move(columns)});
+ }
+ return result;
+}
+
+void TSkiffTableReader::ReadRow()
+{
+ if (Row_.IsUndefined()) {
+ Row_ = TNode::CreateMap();
+ } else {
+ Row_.AsMap().clear();
+ }
+
+ if (RowIndex_) {
+ ++*RowIndex_;
+ }
+
+ TableIndex_ = Parser_->ParseVariant16Tag();
+ Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size());
+ const auto& tableSchema = Schemas_[TableIndex_];
+
+ auto parse = [&](NSkiff::EWireType wireType) -> TNode {
+ switch (wireType) {
+ case NSkiff::EWireType::Int64:
+ return Parser_->ParseInt64();
+ case NSkiff::EWireType::Uint64:
+ return Parser_->ParseUint64();
+ case NSkiff::EWireType::Boolean:
+ return Parser_->ParseBoolean();
+ case NSkiff::EWireType::Double:
+ return Parser_->ParseDouble();
+ case NSkiff::EWireType::String32:
+ return Parser_->ParseString32();
+ case NSkiff::EWireType::Yson32:
+ return NodeFromYsonString(Parser_->ParseYson32());
+ case NSkiff::EWireType::Nothing:
+ return TNode::CreateEntity();
+ default:
+ Y_FAIL("Bad column wire type: '%s'", ::ToString(wireType).data());
+ }
+ };
+
+ for (const auto& columnSchema : tableSchema.Columns) {
+ if (!columnSchema.Required) {
+ auto tag = Parser_->ParseVariant8Tag();
+ if (tag == 0) {
+ if (columnSchema.Type == EColumnType::Dense) {
+ Row_[columnSchema.Name] = TNode::CreateEntity();
+ }
+ continue;
+ }
+ Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType
+ << ">' expected to be 0 or 1, got " << tag);
+ }
+ auto value = parse(columnSchema.WireType);
+ switch (columnSchema.Type) {
+ case EColumnType::Dense:
+ Row_[columnSchema.Name] = std::move(value);
+ break;
+ case EColumnType::KeySwitch:
+ if (value.AsBool()) {
+ AfterKeySwitch_ = true;
+ Valid_ = false;
+ }
+ break;
+ case EColumnType::RangeIndex:
+ RangeIndex_ = value.AsInt64();
+ break;
+ case EColumnType::RowIndex:
+ RowIndex_ = value.AsInt64();
+ break;
+ default:
+ Y_FAIL("Bad column type: %d", static_cast<int>(columnSchema.Type));
+ }
+ }
+
+ // We successfully parsed one more row from the stream,
+ // so reset retry count to their initial value.
+ Input_.ResetRetries();
+}
+
+void TSkiffTableReader::EnsureValidity() const
+{
+ Y_ENSURE(Valid_, "Iterator is not valid");
+}
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h
new file mode 100644
index 0000000000..95ece5f9c7
--- /dev/null
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include "counting_raw_reader.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+#include <yt/cpp/mapreduce/skiff/wire_type.h>
+#include <yt/cpp/mapreduce/skiff/unchecked_parser.h>
+
+#include <util/stream/buffered.h>
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSkiffTableReader
+ : public INodeReaderImpl
+{
+public:
+ TSkiffTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ const std::shared_ptr<NSkiff::TSkiffSchema>& schema);
+ ~TSkiffTableReader() override;
+
+ virtual const TNode& GetRow() const override;
+ virtual void MoveRow(TNode* row) override;
+
+ bool IsValid() const override;
+ void Next() override;
+ ui32 GetTableIndex() const override;
+ ui32 GetRangeIndex() const override;
+ ui64 GetRowIndex() const override;
+ void NextKey() override;
+ TMaybe<size_t> GetReadByteCount() const override;
+ bool IsRawReaderExhausted() const override;
+
+private:
+ struct TSkiffTableSchema;
+
+private:
+ void EnsureValidity() const;
+ void ReadRow();
+ static TVector<TSkiffTableSchema> CreateSkiffTableSchemas(const std::shared_ptr<NSkiff::TSkiffSchema>& schema);
+
+private:
+ NDetail::TCountingRawTableReader Input_;
+ TBufferedInput BufferedInput_;
+ std::optional<NSkiff::TUncheckedSkiffParser> Parser_;
+ TVector<TSkiffTableSchema> Schemas_;
+
+ TNode Row_;
+
+ bool Valid_ = true;
+ bool AfterKeySwitch_ = false;
+ bool Finished_ = false;
+ TMaybe<ui64> RangeIndex_;
+ TMaybe<ui64> RowIndex_;
+ ui32 TableIndex_ = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/stream_raw_reader.cpp b/yt/cpp/mapreduce/io/stream_raw_reader.cpp
new file mode 100644
index 0000000000..ec19b67d0b
--- /dev/null
+++ b/yt/cpp/mapreduce/io/stream_raw_reader.cpp
@@ -0,0 +1,59 @@
+#include "stream_table_reader.h"
+
+#include "node_table_reader.h"
+#include "proto_table_reader.h"
+#include "skiff_table_reader.h"
+#include "yamr_table_reader.h"
+
+#include <util/system/env.h>
+#include <util/string/type.h>
+
+namespace NYT {
+
+template <>
+TTableReaderPtr<TNode> CreateTableReader<TNode>(
+ IInputStream* stream, const TTableReaderOptions& /*options*/)
+{
+ auto impl = ::MakeIntrusive<TNodeTableReader>(
+ ::MakeIntrusive<NDetail::TInputStreamProxy>(stream));
+ return new TTableReader<TNode>(impl);
+}
+
+template <>
+TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>(
+ IInputStream* stream, const TTableReaderOptions& /*options*/)
+{
+ auto impl = ::MakeIntrusive<TYaMRTableReader>(
+ ::MakeIntrusive<NDetail::TInputStreamProxy>(stream));
+ return new TTableReader<TYaMRRow>(impl);
+}
+
+
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ IInputStream* stream,
+ const TTableReaderOptions& /* options */,
+ const ::google::protobuf::Descriptor* descriptor)
+{
+ return new TLenvalProtoTableReader(
+ ::MakeIntrusive<TInputStreamProxy>(stream),
+ {descriptor});
+}
+
+::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ IInputStream* stream,
+ const TTableReaderOptions& /* options */,
+ TVector<const ::google::protobuf::Descriptor*> descriptors)
+{
+ return new TLenvalProtoTableReader(
+ ::MakeIntrusive<TInputStreamProxy>(stream),
+ std::move(descriptors));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h
new file mode 100644
index 0000000000..d799c63cf4
--- /dev/null
+++ b/yt/cpp/mapreduce/io/stream_table_reader.h
@@ -0,0 +1,65 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TInputStreamProxy
+ : public TRawTableReader
+{
+public:
+ TInputStreamProxy(IInputStream* stream)
+ : Stream_(stream)
+ { }
+
+ bool Retry(const TMaybe<ui32>& /* rangeIndex */, const TMaybe<ui64>& /* rowIndex */) override
+ {
+ return false;
+ }
+
+ void ResetRetries() override
+ { }
+
+ bool HasRangeIndices() const override
+ {
+ return false;
+ }
+
+protected:
+ size_t DoRead(void* buf, size_t len) override
+ {
+ return Stream_->Read(buf, len);
+ }
+
+private:
+ IInputStream* Stream_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ IInputStream* stream,
+ const TTableReaderOptions& /* options */,
+ const ::google::protobuf::Descriptor* descriptor);
+
+::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader(
+ IInputStream* stream,
+ const TTableReaderOptions& /* options */,
+ TVector<const ::google::protobuf::Descriptor*> descriptors);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+
+template <>
+TTableReaderPtr<TNode> CreateTableReader<TNode>(
+ IInputStream* stream, const TTableReaderOptions& options);
+
+template <>
+TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>(
+ IInputStream* stream, const TTableReaderOptions& /*options*/);
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/ya.make b/yt/cpp/mapreduce/io/ya.make
new file mode 100644
index 0000000000..d355e86850
--- /dev/null
+++ b/yt/cpp/mapreduce/io/ya.make
@@ -0,0 +1,33 @@
+LIBRARY()
+
+INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
+
+SRCS(
+ counting_raw_reader.cpp
+ job_reader.cpp
+ job_writer.cpp
+ lenval_table_reader.cpp
+ node_table_reader.cpp
+ node_table_writer.cpp
+ proto_helpers.cpp
+ proto_table_reader.cpp
+ proto_table_writer.cpp
+ skiff_row_table_reader.cpp
+ skiff_table_reader.cpp
+ stream_raw_reader.cpp
+ yamr_table_reader.cpp
+ yamr_table_writer.cpp
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/yson
+ yt/cpp/mapreduce/common
+ yt/cpp/mapreduce/interface
+ yt/cpp/mapreduce/interface/logging
+ yt/yt_proto/yt/formats
+ library/cpp/yson/node
+ yt/cpp/mapreduce/skiff
+)
+
+END()
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
new file mode 100644
index 0000000000..6204738e10
--- /dev/null
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp
@@ -0,0 +1,145 @@
+#include "yamr_table_reader.h"
+
+#include <yt/cpp/mapreduce/common/helpers.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+
+////////////////////////////////////////////////////////////////////
+
+static void CheckedSkip(IInputStream* input, size_t byteCount)
+{
+ size_t skipped = input->Skip(byteCount);
+ Y_ENSURE(skipped == byteCount, "Premature end of YaMR stream");
+}
+
+////////////////////////////////////////////////////////////////////
+
+namespace NYT {
+
+using namespace NYT::NDetail::NRawClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+TYaMRTableReader::TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input)
+ : TLenvalTableReader(std::move(input))
+{ }
+
+TYaMRTableReader::~TYaMRTableReader()
+{ }
+
+const TYaMRRow& TYaMRTableReader::GetRow() const
+{
+ CheckValidity();
+ if (!RowTaken_) {
+ const_cast<TYaMRTableReader*>(this)->ReadRow();
+ }
+ return Row_;
+}
+
+bool TYaMRTableReader::IsValid() const
+{
+ return Valid_;
+}
+
+void TYaMRTableReader::Next()
+{
+ TLenvalTableReader::Next();
+}
+
+void TYaMRTableReader::NextKey()
+{
+ TLenvalTableReader::NextKey();
+}
+
+ui32 TYaMRTableReader::GetTableIndex() const
+{
+ return TLenvalTableReader::GetTableIndex();
+}
+
+ui32 TYaMRTableReader::GetRangeIndex() const
+{
+ return TLenvalTableReader::GetRangeIndex();
+}
+
+ui64 TYaMRTableReader::GetRowIndex() const
+{
+ return TLenvalTableReader::GetRowIndex();
+}
+
+TMaybe<size_t> TYaMRTableReader::GetReadByteCount() const
+{
+ return TLenvalTableReader::GetReadByteCount();
+}
+
+bool TYaMRTableReader::IsEndOfStream() const
+{
+ return TLenvalTableReader::IsEndOfStream();
+}
+
+bool TYaMRTableReader::IsRawReaderExhausted() const
+{
+ return TLenvalTableReader::IsRawReaderExhausted();
+}
+
+void TYaMRTableReader::ReadField(TString* result, i32 length)
+{
+ result->resize(length);
+ size_t count = Input_.Load(result->begin(), length);
+ Y_ENSURE(count == static_cast<size_t>(length), "Premature end of YaMR stream");
+}
+
+void TYaMRTableReader::ReadRow()
+{
+ while (true) {
+ try {
+ i32 value = static_cast<i32>(Length_);
+ ReadField(&Key_, value);
+ Row_.Key = Key_;
+
+ ReadInteger(&value);
+ ReadField(&SubKey_, value);
+ Row_.SubKey = SubKey_;
+
+ ReadInteger(&value);
+ ReadField(&Value_, value);
+ Row_.Value = Value_;
+
+ RowTaken_ = true;
+
+ // We successfully parsed one more row from the stream,
+ // so reset retry count to their initial value.
+ Input_.ResetRetries();
+
+ break;
+ } catch (const std::exception& ) {
+ if (!TLenvalTableReader::Retry()) {
+ throw;
+ }
+ }
+ }
+}
+
+void TYaMRTableReader::SkipRow()
+{
+ while (true) {
+ try {
+ i32 value = static_cast<i32>(Length_);
+ CheckedSkip(&Input_, value);
+
+ ReadInteger(&value);
+ CheckedSkip(&Input_, value);
+
+ ReadInteger(&value);
+ CheckedSkip(&Input_, value);
+ break;
+ } catch (const std::exception& ) {
+ if (!TLenvalTableReader::Retry()) {
+ throw;
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.h b/yt/cpp/mapreduce/io/yamr_table_reader.h
new file mode 100644
index 0000000000..39fdecfa71
--- /dev/null
+++ b/yt/cpp/mapreduce/io/yamr_table_reader.h
@@ -0,0 +1,48 @@
+#pragma once
+
+#include "lenval_table_reader.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+
+class TRawTableReader;
+struct TClientContext;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TYaMRTableReader
+ : public IYaMRReaderImpl
+ , public TLenvalTableReader
+{
+public:
+ explicit TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input);
+ ~TYaMRTableReader() override;
+
+ const TYaMRRow& GetRow() const override;
+
+ bool IsValid() const override;
+ void Next() override;
+ ui32 GetTableIndex() const override;
+ ui32 GetRangeIndex() const override;
+ ui64 GetRowIndex() const override;
+ void NextKey() override;
+ TMaybe<size_t> GetReadByteCount() const override;
+ bool IsEndOfStream() const override;
+ bool IsRawReaderExhausted() const override;
+
+private:
+ void ReadField(TString* result, i32 length);
+
+ void ReadRow();
+ void SkipRow() override;
+
+ TYaMRRow Row_;
+ TString Key_;
+ TString SubKey_;
+ TString Value_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp
new file mode 100644
index 0000000000..cce7ceb0f0
--- /dev/null
+++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp
@@ -0,0 +1,53 @@
+#include "yamr_table_writer.h"
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output)
+ : Output_(std::move(output))
+{ }
+
+TYaMRTableWriter::~TYaMRTableWriter()
+{ }
+
+size_t TYaMRTableWriter::GetTableCount() const
+{
+ return Output_->GetStreamCount();
+}
+
+void TYaMRTableWriter::FinishTable(size_t tableIndex) {
+ Output_->GetStream(tableIndex)->Finish();
+}
+
+void TYaMRTableWriter::AddRow(const TYaMRRow& row, size_t tableIndex)
+{
+ auto* stream = Output_->GetStream(tableIndex);
+
+ auto writeField = [&stream] (const TStringBuf& field) {
+ i32 length = static_cast<i32>(field.length());
+ stream->Write(&length, sizeof(length));
+ stream->Write(field.data(), field.length());
+ };
+
+ writeField(row.Key);
+ writeField(row.SubKey);
+ writeField(row.Value);
+
+ Output_->OnRowFinished(tableIndex);
+}
+
+void TYaMRTableWriter::AddRow(TYaMRRow&& row, size_t tableIndex) {
+ TYaMRTableWriter::AddRow(row, tableIndex);
+}
+
+void TYaMRTableWriter::Abort()
+{
+ Output_->Abort();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h
new file mode 100644
index 0000000000..cf88eaf287
--- /dev/null
+++ b/yt/cpp/mapreduce/io/yamr_table_writer.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/interface/io.h>
+
+namespace NYT {
+
+class IProxyOutput;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TYaMRTableWriter
+ : public IYaMRWriterImpl
+{
+public:
+ explicit TYaMRTableWriter(THolder<IProxyOutput> output);
+ ~TYaMRTableWriter() override;
+
+ void AddRow(const TYaMRRow& row, size_t tableIndex) override;
+ void AddRow(TYaMRRow&& row, size_t tableIndex) override;
+
+ size_t GetTableCount() const override;
+ void FinishTable(size_t) override;
+ void Abort() override;
+
+private:
+ THolder<IProxyOutput> Output_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT