diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit was generated by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of the YT provider is moved into the YDB code base for further export as part of the YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under an `IF (NOT CMAKE_EXPORT)` clause, which excludes them from open-source CMake generation and from the ya make build. They will be enabled in subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io')
30 files changed, 3084 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.cpp b/yt/cpp/mapreduce/io/counting_raw_reader.cpp new file mode 100644 index 0000000000..6a918bdddb --- /dev/null +++ b/yt/cpp/mapreduce/io/counting_raw_reader.cpp @@ -0,0 +1,38 @@ +#include "counting_raw_reader.h" + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +bool TCountingRawTableReader::Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex) +{ + return Reader_->Retry(rangeIndex, rowIndex); +} + +void TCountingRawTableReader::ResetRetries() +{ + Reader_->ResetRetries(); +} + +bool TCountingRawTableReader::HasRangeIndices() const +{ + return Reader_->HasRangeIndices(); +} + +size_t TCountingRawTableReader::GetReadByteCount() const +{ + return ReadByteCount_; +} + +size_t TCountingRawTableReader::DoRead(void* buf, size_t len) +{ + auto readLen = Reader_->Read(buf, len); + ReadByteCount_ += readLen; + return readLen; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/counting_raw_reader.h b/yt/cpp/mapreduce/io/counting_raw_reader.h new file mode 100644 index 0000000000..3b6705c5e4 --- /dev/null +++ b/yt/cpp/mapreduce/io/counting_raw_reader.h @@ -0,0 +1,31 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { +namespace NDetail { + +class TCountingRawTableReader + final : public TRawTableReader +{ +public: + TCountingRawTableReader(::TIntrusivePtr<TRawTableReader> reader) + : Reader_(std::move(reader)) + { } + + bool Retry(const TMaybe<ui32>& rangeIndex, const TMaybe<ui64>& rowIndex) override; + void ResetRetries() override; + bool HasRangeIndices() const override; + + size_t GetReadByteCount() const; + +protected: + size_t DoRead(void* buf, size_t len) override; + +private: + ::TIntrusivePtr<TRawTableReader> Reader_; + size_t ReadByteCount_ = 0; +}; + +} // namespace NDetail +} // 
namespace NYT diff --git a/yt/cpp/mapreduce/io/helpers.h b/yt/cpp/mapreduce/io/helpers.h new file mode 100644 index 0000000000..5dbbf20906 --- /dev/null +++ b/yt/cpp/mapreduce/io/helpers.h @@ -0,0 +1,130 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> +#include <yt/cpp/mapreduce/interface/config.h> +#include <yt/cpp/mapreduce/common/helpers.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +template <class TOptions> +struct TIOOptionsTraits; + +template <> +struct TIOOptionsTraits<TFileReaderOptions> +{ + static constexpr const char* const ConfigName = "file_reader"; +}; +template <> +struct TIOOptionsTraits<TFileWriterOptions> +{ + static constexpr const char* const ConfigName = "file_writer"; +}; +template <> +struct TIOOptionsTraits<TTableReaderOptions> +{ + static constexpr const char* const ConfigName = "table_reader"; +}; +template <> +struct TIOOptionsTraits<TTableWriterOptions> +{ + static constexpr const char* const ConfigName = "table_writer"; +}; + +template <class TOptions> +TNode FormIORequestParameters( + const TRichYPath& path, + const TOptions& options) +{ + auto params = PathToParamNode(path); + if (options.Config_) { + params[TIOOptionsTraits<TOptions>::ConfigName] = *options.Config_; + } + return params; +} + +template <> +inline TNode FormIORequestParameters( + const TRichYPath& path, + const TFileReaderOptions& options) +{ + auto params = PathToParamNode(path); + if (options.Config_) { + params[TIOOptionsTraits<TTableReaderOptions>::ConfigName] = *options.Config_; + } + if (options.Offset_) { + params["offset"] = *options.Offset_; + } + if (options.Length_) { + params["length"] = *options.Length_; + } + return params; +} + +static void AddWriterOptionsToNode(const TWriterOptions& options, TNode* node) +{ + if (options.EnableEarlyFinish_) { + (*node)["enable_early_finish"] = *options.EnableEarlyFinish_; + } + if (options.UploadReplicationFactor_) { + 
(*node)["upload_replication_factor"] = *options.UploadReplicationFactor_; + } + if (options.MinUploadReplicationFactor_) { + (*node)["min_upload_replication_factor"] = *options.MinUploadReplicationFactor_; + } + if (options.DesiredChunkSize_) { + (*node)["desired_chunk_size"] = *options.DesiredChunkSize_; + } +} + +template <> +inline TNode FormIORequestParameters( + const TRichYPath& path, + const TFileWriterOptions& options) +{ + auto params = PathToParamNode(path); + TNode fileWriter = TNode::CreateMap(); + if (options.Config_) { + fileWriter = *options.Config_; + } + if (options.WriterOptions_) { + AddWriterOptionsToNode(*options.WriterOptions_, &fileWriter); + } + if (fileWriter.Empty()) { + AddWriterOptionsToNode( + TWriterOptions() + .EnableEarlyFinish(true) + .UploadReplicationFactor(3) + .MinUploadReplicationFactor(2), + &fileWriter); + } + params[TIOOptionsTraits<TFileWriterOptions>::ConfigName] = fileWriter; + if (options.ComputeMD5_) { + params["compute_md5"] = *options.ComputeMD5_; + } + return params; +} + +template <> +inline TNode FormIORequestParameters( + const TRichYPath& path, + const TTableWriterOptions& options) +{ + auto params = PathToParamNode(path); + auto tableWriter = TConfig::Get()->TableWriter; + if (options.Config_) { + MergeNodes(tableWriter, *options.Config_); + } + if (options.WriterOptions_) { + AddWriterOptionsToNode(*options.WriterOptions_, &tableWriter); + } + if (!tableWriter.Empty()) { + params[TIOOptionsTraits<TTableWriterOptions>::ConfigName] = std::move(tableWriter); + } + return params; +} + +//////////////////////////////////////////////////////////////////////////////// + +} diff --git a/yt/cpp/mapreduce/io/job_reader.cpp b/yt/cpp/mapreduce/io/job_reader.cpp new file mode 100644 index 0000000000..39056f00e2 --- /dev/null +++ b/yt/cpp/mapreduce/io/job_reader.cpp @@ -0,0 +1,46 @@ +#include "job_reader.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +namespace NYT { + 
+//////////////////////////////////////////////////////////////////////////////// + +TJobReader::TJobReader(int fd) + : TJobReader(Duplicate(fd)) +{ } + +TJobReader::TJobReader(const TFile& file) + : FdFile_(file) + , FdInput_(FdFile_) + , BufferedInput_(&FdInput_, BUFFER_SIZE) +{ } + +bool TJobReader::Retry(const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/) +{ + return false; +} + +void TJobReader::ResetRetries() +{ } + +bool TJobReader::HasRangeIndices() const +{ + return true; +} + +size_t TJobReader::DoRead(void* buf, size_t len) +{ + return BufferedInput_.Read(buf, len); +} + +//////////////////////////////////////////////////////////////////////////////// + +TRawTableReaderPtr CreateRawJobReader(int fd) +{ + return ::MakeIntrusive<TJobReader>(fd); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/job_reader.h b/yt/cpp/mapreduce/io/job_reader.h new file mode 100644 index 0000000000..ce62ec180f --- /dev/null +++ b/yt/cpp/mapreduce/io/job_reader.h @@ -0,0 +1,38 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <util/stream/buffered.h> +#include <util/stream/file.h> +#include <util/system/file.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TJobReader + : public TRawTableReader +{ +public: + explicit TJobReader(int fd); + explicit TJobReader(const TFile& file); + + virtual bool Retry( const TMaybe<ui32>& /*rangeIndex*/, const TMaybe<ui64>& /*rowIndex*/) override; + virtual void ResetRetries() override; + virtual bool HasRangeIndices() const override; + +protected: + size_t DoRead(void* buf, size_t len) override; + +private: + TFile FdFile_; + TUnbufferedFileInput FdInput_; + TBufferedInput BufferedInput_; + + static const size_t BUFFER_SIZE = 64 << 10; +}; + +//////////////////////////////////////////////////////////////////////////////// + + +} // namespace NYT 
diff --git a/yt/cpp/mapreduce/io/job_writer.cpp b/yt/cpp/mapreduce/io/job_writer.cpp new file mode 100644 index 0000000000..d08bb0a665 --- /dev/null +++ b/yt/cpp/mapreduce/io/job_writer.cpp @@ -0,0 +1,68 @@ +#include "job_writer.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <util/system/file.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TJobWriter::TStream::TStream(int fd) + : TStream(Duplicate(fd)) +{ } + +TJobWriter::TStream::TStream(const TFile& file) + : FdFile(file) + , FdOutput(FdFile) + , BufferedOutput(&FdOutput, BUFFER_SIZE) +{ } + +TJobWriter::TStream::~TStream() +{ +} + +//////////////////////////////////////////////////////////////////////////////// + +TJobWriter::TJobWriter(size_t outputTableCount) +{ + for (size_t i = 0; i < outputTableCount; ++i) { + Streams_.emplace_back(MakeHolder<TStream>(int(i * 3 + 1))); + } +} + +TJobWriter::TJobWriter(const TVector<TFile>& fileList) +{ + for (const auto& f : fileList) { + Streams_.emplace_back(MakeHolder<TStream>(f)); + } +} + +size_t TJobWriter::GetStreamCount() const +{ + return Streams_.size(); +} + +IOutputStream* TJobWriter::GetStream(size_t tableIndex) const +{ + if (tableIndex >= Streams_.size()) { + ythrow TIOException() << + "Table index " << tableIndex << + " is out of range [0, " << Streams_.size() << ")"; + } + return &Streams_[tableIndex]->BufferedOutput; +} + +void TJobWriter::OnRowFinished(size_t) +{ } + +//////////////////////////////////////////////////////////////////////////////// + +THolder<IProxyOutput> CreateRawJobWriter(size_t outputTableCount) +{ + return ::MakeHolder<TJobWriter>(outputTableCount); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/job_writer.h b/yt/cpp/mapreduce/io/job_writer.h new file mode 100644 index 0000000000..9b24650640 --- /dev/null +++ b/yt/cpp/mapreduce/io/job_writer.h @@ -0,0 +1,43 @@ 
+#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <util/generic/vector.h> +#include <util/generic/ptr.h> +#include <util/stream/file.h> +#include <util/stream/buffered.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TJobWriter + : public IProxyOutput +{ +public: + explicit TJobWriter(size_t outputTableCount); + explicit TJobWriter(const TVector<TFile>& fileList); + + size_t GetStreamCount() const override; + IOutputStream* GetStream(size_t tableIndex) const override; + void OnRowFinished(size_t tableIndex) override; + +private: + struct TStream { + TFile FdFile; + TUnbufferedFileOutput FdOutput; + TBufferedOutput BufferedOutput; + + explicit TStream(int fd); + explicit TStream(const TFile& file); + ~TStream(); + + static const size_t BUFFER_SIZE = 1 << 20; + }; + + TVector<THolder<TStream>> Streams_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.cpp b/yt/cpp/mapreduce/io/lenval_table_reader.cpp new file mode 100644 index 0000000000..98274c7996 --- /dev/null +++ b/yt/cpp/mapreduce/io/lenval_table_reader.cpp @@ -0,0 +1,198 @@ +#include "lenval_table_reader.h" + +#include <yt/cpp/mapreduce/common/helpers.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <util/string/printf.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +const i32 CONTROL_ATTR_TABLE_INDEX = -1; +const i32 CONTROL_ATTR_KEY_SWITCH = -2; +const i32 CONTROL_ATTR_RANGE_INDEX = -3; +const i32 CONTROL_ATTR_ROW_INDEX = -4; +const i32 CONTROL_ATTR_END_OF_STREAM = -5; +const i32 CONTROL_ATTR_TABLET_INDEX = -6; + +//////////////////////////////////////////////////////////////////////////////// + +TLenvalTableReader::TLenvalTableReader(::TIntrusivePtr<TRawTableReader> input) + : Input_(std::move(input)) +{ + 
TLenvalTableReader::Next(); +} + +TLenvalTableReader::~TLenvalTableReader() +{ } + +void TLenvalTableReader::CheckValidity() const +{ + if (!IsValid()) { + ythrow yexception() << "Iterator is not valid"; + } +} + +bool TLenvalTableReader::IsValid() const +{ + return Valid_; +} + +void TLenvalTableReader::Next() +{ + if (!RowTaken_) { + SkipRow(); + } + + CheckValidity(); + + if (RowIndex_) { + ++*RowIndex_; + } + + while (true) { + try { + i32 value = 0; + if (!ReadInteger(&value, true)) { + return; + } + + while (value < 0 && !IsEndOfStream_) { + switch (value) { + case CONTROL_ATTR_KEY_SWITCH: + if (!AtStart_) { + Valid_ = false; + return; + } else { + ReadInteger(&value); + } + break; + + case CONTROL_ATTR_TABLE_INDEX: { + ui32 tmp = 0; + ReadInteger(&tmp); + TableIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_ROW_INDEX: { + ui64 tmp = 0; + ReadInteger(&tmp); + RowIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_RANGE_INDEX: { + ui32 tmp = 0; + ReadInteger(&tmp); + RangeIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_TABLET_INDEX: { + ui64 tmp = 0; + ReadInteger(&tmp); + TabletIndex_ = tmp; + ReadInteger(&value); + break; + } + case CONTROL_ATTR_END_OF_STREAM: { + IsEndOfStream_ = true; + break; + } + default: + ythrow yexception() << + Sprintf("Invalid control integer %d in lenval stream", value); + } + } + + Length_ = static_cast<ui32>(value); + RowTaken_ = false; + AtStart_ = false; + } catch (const std::exception& e) { + if (!PrepareRetry()) { + throw; + } + continue; + } + break; + } +} + +bool TLenvalTableReader::Retry() +{ + if (PrepareRetry()) { + RowTaken_ = true; + Next(); + return true; + } + return false; +} + +void TLenvalTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; + + if (RowIndex_) { + --*RowIndex_; + } + + RowTaken_ = true; +} + +ui32 TLenvalTableReader::GetTableIndex() const +{ + CheckValidity(); + return 
TableIndex_; +} + +ui32 TLenvalTableReader::GetRangeIndex() const +{ + CheckValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TLenvalTableReader::GetRowIndex() const +{ + CheckValidity(); + return RowIndex_.GetOrElse(0UL); +} + +TMaybe<size_t> TLenvalTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TLenvalTableReader::IsEndOfStream() const +{ + return IsEndOfStream_; +} + +bool TLenvalTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +bool TLenvalTableReader::PrepareRetry() +{ + if (Input_.Retry(RangeIndex_, RowIndex_)) { + RowIndex_.Clear(); + RangeIndex_.Clear(); + return true; + } + return false; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/lenval_table_reader.h b/yt/cpp/mapreduce/io/lenval_table_reader.h new file mode 100644 index 0000000000..990fe0b756 --- /dev/null +++ b/yt/cpp/mapreduce/io/lenval_table_reader.h @@ -0,0 +1,67 @@ +#pragma once + +#include "counting_raw_reader.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TLenvalTableReader +{ +public: + explicit TLenvalTableReader(::TIntrusivePtr<TRawTableReader> input); + virtual ~TLenvalTableReader(); + +protected: + bool IsValid() const; + void Next(); + ui32 GetTableIndex() const; + ui32 GetRangeIndex() const; + ui64 GetRowIndex() const; + void NextKey(); + TMaybe<size_t> GetReadByteCount() const; + bool IsEndOfStream() const; + bool IsRawReaderExhausted() const; + + void CheckValidity() const; + + bool Retry(); + + template <class T> + bool ReadInteger(T* result, bool acceptEndOfStream = false) + { + size_t count = Input_.Load(result, sizeof(T)); + if (acceptEndOfStream && count == 0) { + Finished_ = true; + Valid_ = false; + return false; + } + Y_ENSURE(count == sizeof(T), "Premature end of stream"); + return true; + } + + virtual 
void SkipRow() = 0; + +protected: + NDetail::TCountingRawTableReader Input_; + + bool Valid_ = true; + bool Finished_ = false; + ui32 TableIndex_ = 0; + TMaybe<ui64> RowIndex_; + TMaybe<ui32> RangeIndex_; + TMaybe<ui64> TabletIndex_; + bool IsEndOfStream_ = false; + bool AtStart_ = true; + bool RowTaken_ = true; + ui32 Length_ = 0; + +private: + bool PrepareRetry(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp new file mode 100644 index 0000000000..d39e1398a5 --- /dev/null +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -0,0 +1,375 @@ +#include "node_table_reader.h" + +#include <yt/cpp/mapreduce/common/node_builder.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/parser.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TRowBuilder + : public ::NYson::TYsonConsumerBase +{ +public: + explicit TRowBuilder(TMaybe<TRowElement>* resultRow); + + void OnStringScalar(TStringBuf value) override; + void OnInt64Scalar(i64 value) override; + void OnUint64Scalar(ui64 value) override; + void OnDoubleScalar(double value) override; + void OnBooleanScalar(bool value) override; + void OnBeginList() override; + void OnEntity() override; + void OnListItem() override; + void OnEndList() override; + void OnBeginMap() override; + void OnKeyedItem(TStringBuf key) override; + void OnEndMap() override; + void OnBeginAttributes() override; + void OnEndAttributes() override; + + void Finalize(); + +private: + THolder<TNodeBuilder> Builder_; + TRowElement Row_; + int Depth_ = 0; + bool Started_ = false; + TMaybe<TRowElement>* ResultRow_; + + void SaveResultRow(); +}; + +TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow) + : ResultRow_(resultRow) +{ } + +void 
TRowBuilder::OnStringScalar(TStringBuf value) +{ + Row_.Size += sizeof(TNode) + sizeof(TString) + value.size(); + Builder_->OnStringScalar(value); +} + +void TRowBuilder::OnInt64Scalar(i64 value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnInt64Scalar(value); +} + +void TRowBuilder::OnUint64Scalar(ui64 value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnUint64Scalar(value); +} + +void TRowBuilder::OnDoubleScalar(double value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnDoubleScalar(value); +} + +void TRowBuilder::OnBooleanScalar(bool value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnBooleanScalar(value); +} + +void TRowBuilder::OnBeginList() +{ + ++Depth_; + Builder_->OnBeginList(); +} + +void TRowBuilder::OnEntity() +{ + Row_.Size += sizeof(TNode); + Builder_->OnEntity(); +} + +void TRowBuilder::OnListItem() +{ + if (Depth_ == 0) { + SaveResultRow(); + } else { + Builder_->OnListItem(); + } +} + +void TRowBuilder::OnEndList() +{ + --Depth_; + Builder_->OnEndList(); +} + +void TRowBuilder::OnBeginMap() +{ + ++Depth_; + Builder_->OnBeginMap(); +} + +void TRowBuilder::OnKeyedItem(TStringBuf key) +{ + Row_.Size += sizeof(TString) + key.size(); + Builder_->OnKeyedItem(key); +} + +void TRowBuilder::OnEndMap() +{ + --Depth_; + Builder_->OnEndMap(); +} + +void TRowBuilder::OnBeginAttributes() +{ + ++Depth_; + Builder_->OnBeginAttributes(); +} + +void TRowBuilder::OnEndAttributes() +{ + --Depth_; + Builder_->OnEndAttributes(); +} + +void TRowBuilder::SaveResultRow() +{ + if (!Started_) { + Started_ = true; + } else { + *ResultRow_ = std::move(Row_); + } + Row_.Reset(); + Builder_.Reset(new TNodeBuilder(&Row_.Node)); +} + +void TRowBuilder::Finalize() +{ + if (Started_) { + *ResultRow_ = std::move(Row_); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input) + : Input_(std::move(input)) +{ + PrepareParsing(); + Next(); +} + 
+TNodeTableReader::~TNodeTableReader() +{ +} + +void TNodeTableReader::ParseListFragmentItem() { + if (!Parser_->Parse()) { + Builder_->Finalize(); + IsLast_ = true; + } +} + +const TNode& TNodeTableReader::GetRow() const +{ + CheckValidity(); + if (!Row_) { + ythrow yexception() << "Row is moved"; + } + return Row_->Node; +} + +void TNodeTableReader::MoveRow(TNode* result) +{ + CheckValidity(); + if (!Row_) { + ythrow yexception() << "Row is moved"; + } + *result = std::move(Row_->Node); + Row_.Clear(); +} + +bool TNodeTableReader::IsValid() const +{ + return Valid_; +} + +void TNodeTableReader::Next() +{ + try { + NextImpl(); + } catch (const std::exception& ex) { + YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what()); + throw; + } +} + +void TNodeTableReader::NextImpl() +{ + CheckValidity(); + + if (RowIndex_) { + ++*RowIndex_; + } + + // At the begin of stream parser doesn't return a finished row. + ParseFirstListFragmentItem(); + + while (true) { + if (IsLast_) { + Finished_ = true; + Valid_ = false; + break; + } + + try { + ParseListFragmentItem(); + } catch (std::exception& ex) { + NeedParseFirst_ = true; + OnStreamError(std::current_exception(), ex.what()); + ParseFirstListFragmentItem(); + continue; + } + + Row_ = std::move(*NextRow_); + if (!Row_) { + throw yexception() << "No row in NextRow_"; + } + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. 
+ Input_.ResetRetries(); + + if (!Row_->Node.IsNull()) { + AtStart_ = false; + break; + } + + for (auto& entry : Row_->Node.GetAttributes().AsMap()) { + if (entry.first == "key_switch") { + if (!AtStart_) { + Valid_ = false; + } + } else if (entry.first == "table_index") { + TableIndex_ = static_cast<ui32>(entry.second.AsInt64()); + } else if (entry.first == "row_index") { + RowIndex_ = static_cast<ui64>(entry.second.AsInt64()); + } else if (entry.first == "range_index") { + RangeIndex_ = static_cast<ui32>(entry.second.AsInt64()); + } else if (entry.first == "tablet_index") { + TabletIndex_ = entry.second.AsInt64(); + } else if (entry.first == "end_of_stream") { + IsEndOfStream_ = true; + } + } + + if (!Valid_) { + break; + } + } +} + +void TNodeTableReader::ParseFirstListFragmentItem() +{ + while (NeedParseFirst_) { + try { + ParseListFragmentItem(); + NeedParseFirst_ = false; + break; + } catch (std::exception& ex) { + OnStreamError(std::current_exception(), ex.what()); + } + } +} + +ui32 TNodeTableReader::GetTableIndex() const +{ + CheckValidity(); + return TableIndex_; +} + +ui32 TNodeTableReader::GetRangeIndex() const +{ + CheckValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TNodeTableReader::GetRowIndex() const +{ + CheckValidity(); + return RowIndex_.GetOrElse(0UL); +} + +i64 TNodeTableReader::GetTabletIndex() const +{ + CheckValidity(); + return TabletIndex_.GetOrElse(0L); +} + +void TNodeTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; + + if (RowIndex_) { + --*RowIndex_; + } +} + +TMaybe<size_t> TNodeTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TNodeTableReader::IsEndOfStream() const +{ + return IsEndOfStream_; +} + +bool TNodeTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +//////////////////////////////////////////////////////////////////////////////// + +void TNodeTableReader::PrepareParsing() +{ + NextRow_.Clear(); 
+ Builder_.Reset(new TRowBuilder(&NextRow_)); + Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_)); +} + +void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error) +{ + YT_LOG_ERROR("Read error: %v", error); + Exception_ = exception; + if (Input_.Retry(RangeIndex_, RowIndex_)) { + RowIndex_.Clear(); + RangeIndex_.Clear(); + PrepareParsing(); + } else { + std::rethrow_exception(Exception_); + } +} + +void TNodeTableReader::CheckValidity() const +{ + if (!Valid_) { + ythrow yexception() << "Iterator is not valid"; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/node_table_reader.h b/yt/cpp/mapreduce/io/node_table_reader.h new file mode 100644 index 0000000000..4fe839eeb6 --- /dev/null +++ b/yt/cpp/mapreduce/io/node_table_reader.h @@ -0,0 +1,91 @@ +#pragma once + +#include "counting_raw_reader.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <library/cpp/yson/public.h> + +#include <util/stream/input.h> +#include <util/generic/buffer.h> +#include <util/system/event.h> +#include <util/system/thread.h> + +#include <atomic> + +namespace NYT { + +class TRawTableReader; +class TRowBuilder; + +//////////////////////////////////////////////////////////////////////////////// + +struct TRowElement +{ + TNode Node; + size_t Size = 0; + + void Reset() + { + Node = TNode(); + Size = 0; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TNodeTableReader + : public INodeReaderImpl +{ +public: + explicit TNodeTableReader(::TIntrusivePtr<TRawTableReader> input); + ~TNodeTableReader() override; + + const TNode& GetRow() const override; + void MoveRow(TNode* result) override; + + bool IsValid() const override; + void Next() override; + ui32 GetTableIndex() const override; + ui32 GetRangeIndex() const override; + ui64 GetRowIndex() const override; + i64 GetTabletIndex() const override; + 
void NextKey() override; + TMaybe<size_t> GetReadByteCount() const override; + bool IsEndOfStream() const override; + bool IsRawReaderExhausted() const override; + +private: + void NextImpl(); + void OnStreamError(std::exception_ptr exception, TString error); + void CheckValidity() const; + void PrepareParsing(); + void ParseListFragmentItem(); + void ParseFirstListFragmentItem(); + +private: + NDetail::TCountingRawTableReader Input_; + + bool Valid_ = true; + bool Finished_ = false; + ui32 TableIndex_ = 0; + TMaybe<ui64> RowIndex_; + TMaybe<ui32> RangeIndex_; + TMaybe<i64> TabletIndex_; + bool IsEndOfStream_ = false; + bool AtStart_ = true; + + TMaybe<TRowElement> Row_; + TMaybe<TRowElement> NextRow_; + + THolder<TRowBuilder> Builder_; + THolder<::NYson::TYsonListParser> Parser_; + + std::exception_ptr Exception_; + bool NeedParseFirst_ = true; + bool IsLast_ = false; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/node_table_writer.cpp b/yt/cpp/mapreduce/io/node_table_writer.cpp new file mode 100644 index 0000000000..dcb5a0f5b5 --- /dev/null +++ b/yt/cpp/mapreduce/io/node_table_writer.cpp @@ -0,0 +1,72 @@ +#include "node_table_writer.h" + +#include <yt/cpp/mapreduce/common/node_visitor.h> + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/writer.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TNodeTableWriter::TNodeTableWriter(THolder<IProxyOutput> output, NYson::EYsonFormat format) + : Output_(std::move(output)) +{ + for (size_t i = 0; i < Output_->GetStreamCount(); ++i) { + Writers_.push_back( + MakeHolder<::NYson::TYsonWriter>(Output_->GetStream(i), format, NYT::NYson::EYsonType::ListFragment)); + } +} + +TNodeTableWriter::~TNodeTableWriter() +{ } + +size_t TNodeTableWriter::GetTableCount() const +{ + return 
Output_->GetStreamCount(); +} + +void TNodeTableWriter::FinishTable(size_t tableIndex) { + Output_->GetStream(tableIndex)->Finish(); +} + +void TNodeTableWriter::AddRow(const TNode& row, size_t tableIndex) +{ + if (row.HasAttributes()) { + ythrow TIOException() << "Row cannot have attributes"; + } + + static const TNode emptyMap = TNode::CreateMap(); + const TNode* outRow = &emptyMap; + if (row.GetType() != TNode::Undefined) { + if (!row.IsMap()) { + ythrow TIOException() << "Row should be a map node"; + } else { + outRow = &row; + } + } + + auto* writer = Writers_[tableIndex].Get(); + writer->OnListItem(); + + TNodeVisitor visitor(writer); + visitor.Visit(*outRow); + + Output_->OnRowFinished(tableIndex); +} + +void TNodeTableWriter::AddRow(TNode&& row, size_t tableIndex) { + AddRow(row, tableIndex); +} + +void TNodeTableWriter::Abort() +{ + Output_->Abort(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/node_table_writer.h b/yt/cpp/mapreduce/io/node_table_writer.h new file mode 100644 index 0000000000..4bf8cb2fe7 --- /dev/null +++ b/yt/cpp/mapreduce/io/node_table_writer.h @@ -0,0 +1,33 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> +#include <library/cpp/yson/public.h> + +namespace NYT { + +class IProxyOutput; + +//////////////////////////////////////////////////////////////////////////////// + +class TNodeTableWriter + : public INodeWriterImpl +{ +public: + explicit TNodeTableWriter(THolder<IProxyOutput> output, ::NYson::EYsonFormat format = ::NYson::EYsonFormat::Binary); + ~TNodeTableWriter() override; + + void AddRow(const TNode& row, size_t tableIndex) override; + void AddRow(TNode&& row, size_t tableIndex) override; + + size_t GetTableCount() const override; + void FinishTable(size_t) override; + void Abort() override; + +private: + THolder<IProxyOutput> Output_; + TVector<THolder<::NYson::TYsonWriter>> Writers_; +}; + 
+//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_helpers.cpp b/yt/cpp/mapreduce/io/proto_helpers.cpp new file mode 100644 index 0000000000..2ffbfd8d89 --- /dev/null +++ b/yt/cpp/mapreduce/io/proto_helpers.cpp @@ -0,0 +1,101 @@ +#include "proto_helpers.h" + +#include <yt/cpp/mapreduce/interface/io.h> +#include <yt/cpp/mapreduce/interface/fluent.h> + +#include <yt/yt_proto/yt/formats/extension.pb.h> + +#include <google/protobuf/descriptor.h> +#include <google/protobuf/descriptor.pb.h> +#include <google/protobuf/messagext.h> +#include <google/protobuf/io/coded_stream.h> + +#include <util/stream/str.h> +#include <util/stream/file.h> +#include <util/folder/path.h> + +namespace NYT { + +using ::google::protobuf::Message; +using ::google::protobuf::Descriptor; +using ::google::protobuf::DescriptorPool; + +using ::google::protobuf::io::CodedInputStream; +using ::google::protobuf::io::TCopyingInputStreamAdaptor; + +//////////////////////////////////////////////////////////////////////////////// + +namespace { + +TVector<const Descriptor*> GetJobDescriptors(const TString& fileName) +{ + TVector<const Descriptor*> descriptors; + if (!TFsPath(fileName).Exists()) { + ythrow TIOException() << + "Cannot load '" << fileName << "' file"; + } + + TIFStream input(fileName); + TString line; + while (input.ReadLine(line)) { + const auto* pool = DescriptorPool::generated_pool(); + const auto* descriptor = pool->FindMessageTypeByName(line); + descriptors.push_back(descriptor); + } + + return descriptors; +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +TVector<const Descriptor*> GetJobInputDescriptors() +{ + return GetJobDescriptors("proto_input"); +} + +TVector<const Descriptor*> GetJobOutputDescriptors() +{ + return GetJobDescriptors("proto_output"); +} + +void ValidateProtoDescriptor( + const Message& row, + size_t tableIndex, + const 
TVector<const Descriptor*>& descriptors, + bool isRead) +{ + const char* direction = isRead ? "input" : "output"; + + if (tableIndex >= descriptors.size()) { + ythrow TIOException() << + "Table index " << tableIndex << + " is out of range [0, " << descriptors.size() << + ") in " << direction; + } + + if (row.GetDescriptor() != descriptors[tableIndex]) { + ythrow TIOException() << + "Invalid row of type " << row.GetDescriptor()->full_name() << + " at index " << tableIndex << + ", row of type " << descriptors[tableIndex]->full_name() << + " expected in " << direction; + } +} + +void ParseFromArcadiaStream(IInputStream* stream, Message& row, ui32 length) +{ + TLengthLimitedInput input(stream, length); + TCopyingInputStreamAdaptor adaptor(&input); + CodedInputStream codedStream(&adaptor); + codedStream.SetTotalBytesLimit(length + 1); + bool parsedOk = row.ParseFromCodedStream(&codedStream); + Y_ENSURE(parsedOk, "Failed to parse protobuf message"); + + Y_ENSURE(input.Left() == 0); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_helpers.h b/yt/cpp/mapreduce/io/proto_helpers.h new file mode 100644 index 0000000000..9d1ec0027c --- /dev/null +++ b/yt/cpp/mapreduce/io/proto_helpers.h @@ -0,0 +1,36 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/node.h> + +namespace google { +namespace protobuf { + +class Message; +class Descriptor; + +} +} + +class IInputStream; + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TVector<const ::google::protobuf::Descriptor*> GetJobInputDescriptors(); +TVector<const ::google::protobuf::Descriptor*> GetJobOutputDescriptors(); + +void ValidateProtoDescriptor( + const ::google::protobuf::Message& row, + size_t tableIndex, + const TVector<const ::google::protobuf::Descriptor*>& descriptors, + bool isRead); + +void ParseFromArcadiaStream( + IInputStream* stream, + 
::google::protobuf::Message& row, + ui32 size); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp new file mode 100644 index 0000000000..28a4bc8719 --- /dev/null +++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp @@ -0,0 +1,305 @@ +#include "proto_table_reader.h" + +#include "node_table_reader.h" + +#include "proto_helpers.h" + +#include <yt/yt_proto/yt/formats/extension.pb.h> + +#include <util/string/escape.h> +#include <util/string/printf.h> + +namespace NYT { + +using ::google::protobuf::Descriptor; +using ::google::protobuf::FieldDescriptor; +using ::google::protobuf::EnumValueDescriptor; + +const TString& GetFieldColumnName(const FieldDescriptor* fieldDesc) { + const auto& columnName = fieldDesc->options().GetExtension(column_name); + if (!columnName.empty()) { + return columnName; + } + const auto& keyColumnName = fieldDesc->options().GetExtension(key_column_name); + if (!keyColumnName.empty()) { + return keyColumnName; + } + return fieldDesc->name(); +} + +void ReadMessageFromNode(const TNode& node, Message* row) +{ + auto* descriptor = row->GetDescriptor(); + auto* reflection = row->GetReflection(); + + int count = descriptor->field_count(); + for (int i = 0; i < count; ++i) { + auto* fieldDesc = descriptor->field(i); + + const auto& columnName = GetFieldColumnName(fieldDesc); + + const auto& nodeMap = node.AsMap(); + auto it = nodeMap.find(columnName); + if (it == nodeMap.end()) { + continue; // no such column + } + auto actualType = it->second.GetType(); + if (actualType == TNode::Null) { + continue; // null field + } + + auto checkType = [&columnName] (TNode::EType expected, TNode::EType actual) { + if (expected != actual) { + ythrow TNode::TTypeError() << "expected node type " << expected + << ", actual " << actual << " for node " << columnName.data(); + } + }; + + switch (fieldDesc->type()) { + case 
FieldDescriptor::TYPE_STRING: + case FieldDescriptor::TYPE_BYTES: + checkType(TNode::String, actualType); + reflection->SetString(row, fieldDesc, it->second.AsString()); + break; + case FieldDescriptor::TYPE_INT64: + case FieldDescriptor::TYPE_SINT64: + case FieldDescriptor::TYPE_SFIXED64: + checkType(TNode::Int64, actualType); + reflection->SetInt64(row, fieldDesc, it->second.AsInt64()); + break; + case FieldDescriptor::TYPE_INT32: + case FieldDescriptor::TYPE_SINT32: + case FieldDescriptor::TYPE_SFIXED32: + checkType(TNode::Int64, actualType); + reflection->SetInt32(row, fieldDesc, it->second.AsInt64()); + break; + case FieldDescriptor::TYPE_UINT64: + case FieldDescriptor::TYPE_FIXED64: + checkType(TNode::Uint64, actualType); + reflection->SetUInt64(row, fieldDesc, it->second.AsUint64()); + break; + case FieldDescriptor::TYPE_UINT32: + case FieldDescriptor::TYPE_FIXED32: + checkType(TNode::Uint64, actualType); + reflection->SetUInt32(row, fieldDesc, it->second.AsUint64()); + break; + case FieldDescriptor::TYPE_DOUBLE: + checkType(TNode::Double, actualType); + reflection->SetDouble(row, fieldDesc, it->second.AsDouble()); + break; + case FieldDescriptor::TYPE_FLOAT: + checkType(TNode::Double, actualType); + reflection->SetFloat(row, fieldDesc, it->second.AsDouble()); + break; + case FieldDescriptor::TYPE_BOOL: + checkType(TNode::Bool, actualType); + reflection->SetBool(row, fieldDesc, it->second.AsBool()); + break; + case FieldDescriptor::TYPE_ENUM: { + TNode::EType columnType = TNode::String; + for (const auto& flag : fieldDesc->options().GetRepeatedExtension(flags)) { + if (flag == EWrapperFieldFlag::ENUM_INT) { + columnType = TNode::Int64; + break; + } + } + checkType(columnType, actualType); + + const EnumValueDescriptor* valueDesc = nullptr; + TString stringValue; + if (columnType == TNode::String) { + const auto& value = it->second.AsString(); + valueDesc = fieldDesc->enum_type()->FindValueByName(value); + stringValue = value; + } else if (columnType == 
TNode::Int64) { + const auto& value = it->second.AsInt64(); + valueDesc = fieldDesc->enum_type()->FindValueByNumber(value); + stringValue = ToString(value); + } else { + Y_FAIL(); + } + + if (valueDesc == nullptr) { + ythrow yexception() << "Failed to parse value '" << EscapeC(stringValue) << "' as " << fieldDesc->enum_type()->full_name(); + } + + reflection->SetEnum(row, fieldDesc, valueDesc); + + break; + } + case FieldDescriptor::TYPE_MESSAGE: { + checkType(TNode::String, actualType); + Message* message = reflection->MutableMessage(row, fieldDesc); + if (!message->ParseFromArray(it->second.AsString().data(), it->second.AsString().size())) { + ythrow yexception() << "Failed to parse protobuf message"; + } + break; + } + default: + ythrow yexception() << "Incorrect protobuf type"; + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TProtoTableReader::TProtoTableReader( + ::TIntrusivePtr<TRawTableReader> input, + TVector<const Descriptor*>&& descriptors) + : NodeReader_(new TNodeTableReader(std::move(input))) + , Descriptors_(std::move(descriptors)) +{ } + +TProtoTableReader::~TProtoTableReader() +{ } + +void TProtoTableReader::ReadRow(Message* row) +{ + const auto& node = NodeReader_->GetRow(); + ReadMessageFromNode(node, row); +} + +bool TProtoTableReader::IsValid() const +{ + return NodeReader_->IsValid(); +} + +void TProtoTableReader::Next() +{ + NodeReader_->Next(); +} + +ui32 TProtoTableReader::GetTableIndex() const +{ + return NodeReader_->GetTableIndex(); +} + +ui32 TProtoTableReader::GetRangeIndex() const +{ + return NodeReader_->GetRangeIndex(); +} + +ui64 TProtoTableReader::GetRowIndex() const +{ + return NodeReader_->GetRowIndex(); +} + +void TProtoTableReader::NextKey() +{ + NodeReader_->NextKey(); +} + +TMaybe<size_t> TProtoTableReader::GetReadByteCount() const +{ + return NodeReader_->GetReadByteCount(); +} + +bool TProtoTableReader::IsEndOfStream() const +{ + return NodeReader_->IsEndOfStream(); +} + 
+bool TProtoTableReader::IsRawReaderExhausted() const +{ + return NodeReader_->IsRawReaderExhausted(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TLenvalProtoTableReader::TLenvalProtoTableReader( + ::TIntrusivePtr<TRawTableReader> input, + TVector<const Descriptor*>&& descriptors) + : TLenvalTableReader(std::move(input)) + , Descriptors_(std::move(descriptors)) +{ } + +TLenvalProtoTableReader::~TLenvalProtoTableReader() +{ } + +void TLenvalProtoTableReader::ReadRow(Message* row) +{ + ValidateProtoDescriptor(*row, GetTableIndex(), Descriptors_, true); + + while (true) { + try { + ParseFromArcadiaStream(&Input_, *row, Length_); + RowTaken_ = true; + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); + + break; + } catch (const std::exception& ) { + if (!TLenvalTableReader::Retry()) { + throw; + } + } + } +} + +bool TLenvalProtoTableReader::IsValid() const +{ + return TLenvalTableReader::IsValid(); +} + +void TLenvalProtoTableReader::Next() +{ + TLenvalTableReader::Next(); +} + +ui32 TLenvalProtoTableReader::GetTableIndex() const +{ + return TLenvalTableReader::GetTableIndex(); +} + +ui32 TLenvalProtoTableReader::GetRangeIndex() const +{ + return TLenvalTableReader::GetRangeIndex(); +} + +ui64 TLenvalProtoTableReader::GetRowIndex() const +{ + return TLenvalTableReader::GetRowIndex(); +} + +void TLenvalProtoTableReader::NextKey() +{ + TLenvalTableReader::NextKey(); +} + +TMaybe<size_t> TLenvalProtoTableReader::GetReadByteCount() const +{ + return TLenvalTableReader::GetReadByteCount(); +} + +bool TLenvalProtoTableReader::IsEndOfStream() const +{ + return TLenvalTableReader::IsEndOfStream(); +} + +bool TLenvalProtoTableReader::IsRawReaderExhausted() const +{ + return TLenvalTableReader::IsRawReaderExhausted(); +} + +void TLenvalProtoTableReader::SkipRow() +{ + while (true) { + try { + size_t skipped = Input_.Skip(Length_); + if (skipped != 
Length_) { + ythrow yexception() << "Premature end of stream"; + } + break; + } catch (const std::exception& ) { + if (!TLenvalTableReader::Retry()) { + throw; + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_table_reader.h b/yt/cpp/mapreduce/io/proto_table_reader.h new file mode 100644 index 0000000000..05a528b9c6 --- /dev/null +++ b/yt/cpp/mapreduce/io/proto_table_reader.h @@ -0,0 +1,76 @@ +#pragma once + +#include "lenval_table_reader.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { + +class TRawTableReader; +class TNodeTableReader; + +//////////////////////////////////////////////////////////////////////////////// + +class TProtoTableReader + : public IProtoReaderImpl +{ +public: + explicit TProtoTableReader( + ::TIntrusivePtr<TRawTableReader> input, + TVector<const ::google::protobuf::Descriptor*>&& descriptors); + ~TProtoTableReader() override; + + void ReadRow(Message* row) override; + + bool IsValid() const override; + void Next() override; + ui32 GetTableIndex() const override; + ui32 GetRangeIndex() const override; + ui64 GetRowIndex() const override; + void NextKey() override; + TMaybe<size_t> GetReadByteCount() const override; + bool IsEndOfStream() const override; + bool IsRawReaderExhausted() const override; + +private: + THolder<TNodeTableReader> NodeReader_; + TVector<const ::google::protobuf::Descriptor*> Descriptors_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TLenvalProtoTableReader + : public IProtoReaderImpl + , public TLenvalTableReader +{ +public: + explicit TLenvalProtoTableReader( + ::TIntrusivePtr<TRawTableReader> input, + TVector<const ::google::protobuf::Descriptor*>&& descriptors); + ~TLenvalProtoTableReader() override; + + void ReadRow(Message* row) override; + + bool IsValid() const override; + void Next() override; + ui32 GetTableIndex() const override; 
+ ui32 GetRangeIndex() const override; + ui64 GetRowIndex() const override; + void NextKey() override; + TMaybe<size_t> GetReadByteCount() const override; + bool IsEndOfStream() const override; + bool IsRawReaderExhausted() const override; + +protected: + void SkipRow() override; + +private: + TVector<const ::google::protobuf::Descriptor*> Descriptors_; +}; + +// Sometime useful outside mapreduce/yt +void ReadMessageFromNode(const TNode& node, Message* row); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp new file mode 100644 index 0000000000..1ce7811625 --- /dev/null +++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp @@ -0,0 +1,184 @@ +#include "proto_table_writer.h" + +#include "node_table_writer.h" +#include "proto_helpers.h" + +#include <yt/cpp/mapreduce/common/node_builder.h> + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <yt/yt_proto/yt/formats/extension.pb.h> + +#include <google/protobuf/unknown_field_set.h> + +namespace NYT { + +using ::google::protobuf::Descriptor; +using ::google::protobuf::FieldDescriptor; + +//////////////////////////////////////////////////////////////////////////////// + +TNode MakeNodeFromMessage(const Message& row) +{ + TNode node; + TNodeBuilder builder(&node); + builder.OnBeginMap(); + + auto* descriptor = row.GetDescriptor(); + auto* reflection = row.GetReflection(); + + int count = descriptor->field_count(); + for (int i = 0; i < count; ++i) { + auto* fieldDesc = descriptor->field(i); + if (fieldDesc->is_repeated()) { + Y_ENSURE(reflection->FieldSize(row, fieldDesc) == 0, "Storing repeated protobuf fields is not supported yet"); + continue; + } else if (!reflection->HasField(row, fieldDesc)) { + continue; + } + + TString columnName = fieldDesc->options().GetExtension(column_name); + if (columnName.empty()) { + const auto& keyColumnName = 
fieldDesc->options().GetExtension(key_column_name); + columnName = keyColumnName.empty() ? fieldDesc->name() : keyColumnName; + } + + builder.OnKeyedItem(columnName); + + switch (fieldDesc->type()) { + case FieldDescriptor::TYPE_STRING: + case FieldDescriptor::TYPE_BYTES: + builder.OnStringScalar(reflection->GetString(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_INT64: + case FieldDescriptor::TYPE_SINT64: + case FieldDescriptor::TYPE_SFIXED64: + builder.OnInt64Scalar(reflection->GetInt64(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_INT32: + case FieldDescriptor::TYPE_SINT32: + case FieldDescriptor::TYPE_SFIXED32: + builder.OnInt64Scalar(reflection->GetInt32(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_UINT64: + case FieldDescriptor::TYPE_FIXED64: + builder.OnUint64Scalar(reflection->GetUInt64(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_UINT32: + case FieldDescriptor::TYPE_FIXED32: + builder.OnUint64Scalar(reflection->GetUInt32(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_DOUBLE: + builder.OnDoubleScalar(reflection->GetDouble(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_FLOAT: + builder.OnDoubleScalar(reflection->GetFloat(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_BOOL: + builder.OnBooleanScalar(reflection->GetBool(row, fieldDesc)); + break; + case FieldDescriptor::TYPE_ENUM: + builder.OnStringScalar(reflection->GetEnum(row, fieldDesc)->name()); + break; + case FieldDescriptor::TYPE_MESSAGE: + builder.OnStringScalar(reflection->GetMessage(row, fieldDesc).SerializeAsString()); + break; + default: + ythrow yexception() << "Invalid field type for column: " << columnName; + break; + } + } + + builder.OnEndMap(); + return node; +} + +//////////////////////////////////////////////////////////////////////////////// + +TProtoTableWriter::TProtoTableWriter( + THolder<IProxyOutput> output, + TVector<const Descriptor*>&& descriptors) + : NodeWriter_(new TNodeTableWriter(std::move(output))) + , 
Descriptors_(std::move(descriptors)) +{ } + +TProtoTableWriter::~TProtoTableWriter() +{ } + +size_t TProtoTableWriter::GetTableCount() const +{ + return NodeWriter_->GetTableCount(); +} + +void TProtoTableWriter::FinishTable(size_t tableIndex) +{ + NodeWriter_->FinishTable(tableIndex); +} + +void TProtoTableWriter::AddRow(const Message& row, size_t tableIndex) +{ + NodeWriter_->AddRow(MakeNodeFromMessage(row), tableIndex); +} + +void TProtoTableWriter::AddRow(Message&& row, size_t tableIndex) +{ + TProtoTableWriter::AddRow(row, tableIndex); +} + + +void TProtoTableWriter::Abort() +{ + NodeWriter_->Abort(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TLenvalProtoTableWriter::TLenvalProtoTableWriter( + THolder<IProxyOutput> output, + TVector<const Descriptor*>&& descriptors) + : Output_(std::move(output)) + , Descriptors_(std::move(descriptors)) +{ } + +TLenvalProtoTableWriter::~TLenvalProtoTableWriter() +{ } + +size_t TLenvalProtoTableWriter::GetTableCount() const +{ + return Output_->GetStreamCount(); +} + +void TLenvalProtoTableWriter::FinishTable(size_t tableIndex) +{ + Output_->GetStream(tableIndex)->Finish(); +} + +void TLenvalProtoTableWriter::AddRow(const Message& row, size_t tableIndex) +{ + ValidateProtoDescriptor(row, tableIndex, Descriptors_, false); + + Y_VERIFY(row.GetReflection()->GetUnknownFields(row).empty(), + "Message has unknown fields. 
This probably means bug in client code.\n" + "Message: %s", row.DebugString().data()); + + auto* stream = Output_->GetStream(tableIndex); + i32 size = row.ByteSize(); + stream->Write(&size, sizeof(size)); + bool serializedOk = row.SerializeToArcadiaStream(stream); + Y_ENSURE(serializedOk, "Failed to serialize protobuf message"); + Output_->OnRowFinished(tableIndex); +} + +void TLenvalProtoTableWriter::AddRow(Message&& row, size_t tableIndex) +{ + TLenvalProtoTableWriter::AddRow(row, tableIndex); +} + +void TLenvalProtoTableWriter::Abort() +{ + Output_->Abort(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h new file mode 100644 index 0000000000..a6df69e6ae --- /dev/null +++ b/yt/cpp/mapreduce/io/proto_table_writer.h @@ -0,0 +1,61 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { + +class IProxyOutput; +class TNodeTableWriter; + +//////////////////////////////////////////////////////////////////////////////// + +class TProtoTableWriter + : public IProtoWriterImpl +{ +public: + TProtoTableWriter( + THolder<IProxyOutput> output, + TVector<const ::google::protobuf::Descriptor*>&& descriptors); + ~TProtoTableWriter() override; + + void AddRow(const Message& row, size_t tableIndex) override; + void AddRow(Message&& row, size_t tableIndex) override; + + size_t GetTableCount() const override; + void FinishTable(size_t) override; + void Abort() override; + +private: + THolder<TNodeTableWriter> NodeWriter_; + TVector<const ::google::protobuf::Descriptor*> Descriptors_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TLenvalProtoTableWriter + : public IProtoWriterImpl +{ +public: + TLenvalProtoTableWriter( + THolder<IProxyOutput> output, + TVector<const ::google::protobuf::Descriptor*>&& descriptors); + ~TLenvalProtoTableWriter() override; 
+ + void AddRow(const Message& row, size_t tableIndex) override; + void AddRow(Message&& row, size_t tableIndex) override; + + size_t GetTableCount() const override; + void FinishTable(size_t) override; + void Abort() override; + +private: + THolder<IProxyOutput> Output_; + TVector<const ::google::protobuf::Descriptor*> Descriptors_; +}; + +// Sometime useful outside mapreduce/yt +TNode MakeNodeFromMessage(const ::google::protobuf::Message& row); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp new file mode 100644 index 0000000000..8da3b2da31 --- /dev/null +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.cpp @@ -0,0 +1,232 @@ +#include "skiff_row_table_reader.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <yt/cpp/mapreduce/interface/skiff_row.h> + +#include <library/cpp/skiff/skiff.h> + +#include <library/cpp/yt/logging/logger.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TSkiffRowTableReader::TSkiffRowTableReader( + ::TIntrusivePtr<TRawTableReader> input, + const NSkiff::TSkiffSchemaPtr& schema, + TVector<ISkiffRowSkipperPtr>&& skippers, + NDetail::TCreateSkiffSchemaOptions&& options) + : Input_(std::move(input)) + , BufferedInput_(&Input_) + , Parser_({schema, &BufferedInput_}) + , Skippers_(std::move(skippers)) + , Options_(std::move(options)) +{ + Next(); +} + +TSkiffRowTableReader::~TSkiffRowTableReader() +{ } + +bool TSkiffRowTableReader::Retry() +{ + if (PrepareRetry()) { + RowTaken_ = true; + Next(); + return true; + } + return false; +} + +bool TSkiffRowTableReader::PrepareRetry() +{ + if (Input_.Retry(RangeIndex_, RowIndex_)) { + RowIndex_.Clear(); + RangeIndex_.Clear(); + BufferedInput_ = TBufferedInput(&Input_); + Parser_.emplace(&BufferedInput_); + return true; + } + return false; +} + +void 
TSkiffRowTableReader::ReadRow(const ISkiffRowParserPtr& parser) +{ + while (true) { + try { + parser->Parse(&Parser_.value()); + RowTaken_ = true; + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); + + break; + } catch (const std::exception& ex) { + YT_LOG_ERROR("Read error during parsing: %v", ex.what()); + + if (!Retry()) { + throw; + } + } + } +} + +bool TSkiffRowTableReader::IsValid() const +{ + return Valid_; +} + +void TSkiffRowTableReader::SkipRow() +{ + CheckValidity(); + while (true) { + try { + Skippers_[TableIndex_]->SkipRow(&Parser_.value()); + + break; + } catch (const std::exception& ex) { + YT_LOG_ERROR("Read error during skipping row: %v", ex.what()); + + if (!Retry()) { + throw; + } + } + } +} + +void TSkiffRowTableReader::CheckValidity() const { + if (!IsValid()) { + ythrow yexception() << "Iterator is not valid"; + } +} + +void TSkiffRowTableReader::Next() +{ + if (!RowTaken_) { + SkipRow(); + } + + CheckValidity(); + + if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) { + Finished_ = true; + Valid_ = false; + return; + } + + if (AfterKeySwitch_) { + AfterKeySwitch_ = false; + return; + } + + if (RowIndex_) { + ++*RowIndex_; + } + + while (true) { + try { + auto tag = Parser_->ParseVariant16Tag(); + if (tag == NSkiff::EndOfSequenceTag<ui16>()) { + IsEndOfStream_ = true; + break; + } else { + TableIndex_ = tag; + } + + if (TableIndex_ >= Skippers_.size()) { + ythrow TIOException() << + "Table index " << TableIndex_ << + " is out of range [0, " << Skippers_.size() << + ") in read"; + } + + if (Options_.HasKeySwitch_) { + auto keySwitch = Parser_->ParseBoolean(); + if (keySwitch) { + AfterKeySwitch_ = true; + Valid_ = false; + } + } + + auto tagRowIndex = Parser_->ParseVariant8Tag(); + if (tagRowIndex == 1) { + RowIndex_ = Parser_->ParseInt64(); + } else { + Y_ENSURE(tagRowIndex == 0, "Tag for row_index was expected to be 0 or 1, got " << tagRowIndex); + } + + if 
(Options_.HasRangeIndex_) { + auto tagRangeIndex = Parser_->ParseVariant8Tag(); + if (tagRangeIndex == 1) { + RangeIndex_ = Parser_->ParseInt64(); + } else { + Y_ENSURE(tagRangeIndex == 0, "Tag for range_index was expected to be 0 or 1, got " << tagRangeIndex); + } + } + + break; + } catch (const std::exception& ex) { + YT_LOG_ERROR("Read error: %v", ex.what()); + + if (!PrepareRetry()) { + throw; + } + } + } + + RowTaken_ = false; +} + +ui32 TSkiffRowTableReader::GetTableIndex() const +{ + CheckValidity(); + return TableIndex_; +} + +ui32 TSkiffRowTableReader::GetRangeIndex() const +{ + CheckValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TSkiffRowTableReader::GetRowIndex() const +{ + CheckValidity(); + return RowIndex_.GetOrElse(0ULL); +} + +void TSkiffRowTableReader::NextKey() { + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; + + if (RowIndex_) { + --*RowIndex_; + } + + RowTaken_ = true; +} + +TMaybe<size_t> TSkiffRowTableReader::GetReadByteCount() const { + return Input_.GetReadByteCount(); +} + +bool TSkiffRowTableReader::IsEndOfStream() const { + return IsEndOfStream_; +} + +bool TSkiffRowTableReader::IsRawReaderExhausted() const { + return Finished_; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/skiff_row_table_reader.h b/yt/cpp/mapreduce/io/skiff_row_table_reader.h new file mode 100644 index 0000000000..368968266c --- /dev/null +++ b/yt/cpp/mapreduce/io/skiff_row_table_reader.h @@ -0,0 +1,67 @@ +#pragma once + +#include "counting_raw_reader.h" + +#include <yt/cpp/mapreduce/client/skiff.h> + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <yt/cpp/mapreduce/skiff/unchecked_parser.h> + +#include <util/stream/buffered.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TSkiffRowTableReader + : public ISkiffRowReaderImpl +{ +public: + explicit 
TSkiffRowTableReader( + ::TIntrusivePtr<TRawTableReader> input, + const NSkiff::TSkiffSchemaPtr& schema, + TVector<ISkiffRowSkipperPtr>&& skippers, + NDetail::TCreateSkiffSchemaOptions&& options); + + ~TSkiffRowTableReader() override; + + void ReadRow(const ISkiffRowParserPtr& parser) override; + + bool IsValid() const override; + void Next() override; + ui32 GetTableIndex() const override; + ui32 GetRangeIndex() const override; + ui64 GetRowIndex() const override; + void NextKey() override; + TMaybe<size_t> GetReadByteCount() const override; + bool IsEndOfStream() const override; + bool IsRawReaderExhausted() const override; + +private: + bool Retry(); + void SkipRow(); + void CheckValidity() const; + bool PrepareRetry(); + +private: + NDetail::TCountingRawTableReader Input_; + TBufferedInput BufferedInput_; + std::optional<NSkiff::TCheckedInDebugSkiffParser> Parser_; + TVector<ISkiffRowSkipperPtr> Skippers_; + NDetail::TCreateSkiffSchemaOptions Options_; + + bool RowTaken_ = true; + bool Valid_ = true; + bool Finished_ = false; + bool AfterKeySwitch_ = false; + bool IsEndOfStream_ = false; + + TMaybe<ui64> RowIndex_; + TMaybe<ui32> RangeIndex_; + ui32 TableIndex_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp new file mode 100644 index 0000000000..51c20609f0 --- /dev/null +++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp @@ -0,0 +1,293 @@ +#include "skiff_table_reader.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <yt/cpp/mapreduce/skiff/wire_type.h> +#include <yt/cpp/mapreduce/skiff/skiff_schema.h> + +#include <util/string/cast.h> + +namespace NYT { +namespace NDetail { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +enum EColumnType : i8 +{ + Dense, + KeySwitch, + 
RangeIndex, + RowIndex +}; + +struct TSkiffColumnSchema +{ + EColumnType Type; + bool Required; + NSkiff::EWireType WireType; + TString Name; + + TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name) + : Type(type) + , Required(required) + , WireType(wireType) + , Name(name) + { } +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace + +struct TSkiffTableReader::TSkiffTableSchema +{ + TVector<TSkiffColumnSchema> Columns; +}; + +TSkiffTableReader::TSkiffTableReader( + ::TIntrusivePtr<TRawTableReader> input, + const NSkiff::TSkiffSchemaPtr& schema) + : Input_(std::move(input)) + , BufferedInput_(&Input_) + , Parser_(&BufferedInput_) + , Schemas_(CreateSkiffTableSchemas(schema)) +{ + Next(); +} + +TSkiffTableReader::~TSkiffTableReader() = default; + +const TNode& TSkiffTableReader::GetRow() const +{ + EnsureValidity(); + Y_ENSURE(!Row_.IsUndefined(), "Row is moved"); + return Row_; +} + +void TSkiffTableReader::MoveRow(TNode* result) +{ + EnsureValidity(); + Y_ENSURE(!Row_.IsUndefined(), "Row is moved"); + *result = std::move(Row_); + Row_ = TNode(); +} + +bool TSkiffTableReader::IsValid() const +{ + return Valid_; +} + +void TSkiffTableReader::Next() +{ + EnsureValidity(); + if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) { + Finished_ = true; + Valid_ = false; + return; + } + + if (AfterKeySwitch_) { + AfterKeySwitch_ = false; + return; + } + + while (true) { + try { + ReadRow(); + break; + } catch (const std::exception& exception) { + YT_LOG_ERROR("Read error: %v", exception.what()); + if (!Input_.Retry(RangeIndex_, RowIndex_)) { + throw; + } + BufferedInput_ = TBufferedInput(&Input_); + Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_)); + RangeIndex_.Clear(); + RowIndex_.Clear(); + } + } +} + +ui32 TSkiffTableReader::GetTableIndex() const +{ + EnsureValidity(); + return TableIndex_; +} + +ui32 TSkiffTableReader::GetRangeIndex() const +{ + 
EnsureValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TSkiffTableReader::GetRowIndex() const +{ + EnsureValidity(); + return RowIndex_.GetOrElse(0ULL); +} + +void TSkiffTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; +} + +TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TSkiffTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas( + const NSkiff::TSkiffSchemaPtr& schema) +{ + using NSkiff::EWireType; + + constexpr auto keySwitchColumnName = "$key_switch"; + constexpr auto rangeIndexColumnName = "$range_index"; + constexpr auto rowIndexColumnName = "$row_index"; + + static const THashMap<TString, TSkiffColumnSchema> specialColumns = { + {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}}, + {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}}, + {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}}, + }; + + Y_ENSURE(schema->GetWireType() == EWireType::Variant16, + "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'"); + TVector<TSkiffTableSchema> result; + for (const auto& tableSchema : schema->GetChildren()) { + Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple, + "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'"); + TVector<TSkiffColumnSchema> columns; + for (const auto& columnSchema : tableSchema->GetChildren()) { + if (columnSchema->GetName().StartsWith("$")) { + auto iter = specialColumns.find(columnSchema->GetName()); + Y_ENSURE(iter != specialColumns.end(), "Unknown special column: " << columnSchema->GetName()); + 
columns.push_back(iter->second); + } else { + auto wireType = columnSchema->GetWireType(); + bool required = true; + if (wireType == EWireType::Variant8) { + const auto& children = columnSchema->GetChildren(); + Y_ENSURE( + children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing && + NSkiff::IsSimpleType(children[1]->GetWireType()), + "Expected schema of form 'variant8<nothing, simple-type>', got " + << NSkiff::GetShortDebugString(columnSchema)); + wireType = children[1]->GetWireType(); + required = false; + } + Y_ENSURE(NSkiff::IsSimpleType(wireType), + "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema)); + columns.emplace_back( + EColumnType::Dense, + required, + wireType, + columnSchema->GetName()); + } + } + result.push_back({std::move(columns)}); + } + return result; +} + +void TSkiffTableReader::ReadRow() +{ + if (Row_.IsUndefined()) { + Row_ = TNode::CreateMap(); + } else { + Row_.AsMap().clear(); + } + + if (RowIndex_) { + ++*RowIndex_; + } + + TableIndex_ = Parser_->ParseVariant16Tag(); + Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size()); + const auto& tableSchema = Schemas_[TableIndex_]; + + auto parse = [&](NSkiff::EWireType wireType) -> TNode { + switch (wireType) { + case NSkiff::EWireType::Int64: + return Parser_->ParseInt64(); + case NSkiff::EWireType::Uint64: + return Parser_->ParseUint64(); + case NSkiff::EWireType::Boolean: + return Parser_->ParseBoolean(); + case NSkiff::EWireType::Double: + return Parser_->ParseDouble(); + case NSkiff::EWireType::String32: + return Parser_->ParseString32(); + case NSkiff::EWireType::Yson32: + return NodeFromYsonString(Parser_->ParseYson32()); + case NSkiff::EWireType::Nothing: + return TNode::CreateEntity(); + default: + Y_FAIL("Bad column wire type: '%s'", ::ToString(wireType).data()); + } + }; + + for (const auto& columnSchema : tableSchema.Columns) { + if (!columnSchema.Required) { + 
auto tag = Parser_->ParseVariant8Tag(); + if (tag == 0) { + if (columnSchema.Type == EColumnType::Dense) { + Row_[columnSchema.Name] = TNode::CreateEntity(); + } + continue; + } + Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType + << ">' expected to be 0 or 1, got " << tag); + } + auto value = parse(columnSchema.WireType); + switch (columnSchema.Type) { + case EColumnType::Dense: + Row_[columnSchema.Name] = std::move(value); + break; + case EColumnType::KeySwitch: + if (value.AsBool()) { + AfterKeySwitch_ = true; + Valid_ = false; + } + break; + case EColumnType::RangeIndex: + RangeIndex_ = value.AsInt64(); + break; + case EColumnType::RowIndex: + RowIndex_ = value.AsInt64(); + break; + default: + Y_FAIL("Bad column type: %d", static_cast<int>(columnSchema.Type)); + } + } + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); +} + +void TSkiffTableReader::EnsureValidity() const +{ + Y_ENSURE(Valid_, "Iterator is not valid"); +} + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.h b/yt/cpp/mapreduce/io/skiff_table_reader.h new file mode 100644 index 0000000000..95ece5f9c7 --- /dev/null +++ b/yt/cpp/mapreduce/io/skiff_table_reader.h @@ -0,0 +1,65 @@ +#pragma once + +#include "counting_raw_reader.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +#include <yt/cpp/mapreduce/skiff/wire_type.h> +#include <yt/cpp/mapreduce/skiff/unchecked_parser.h> + +#include <util/stream/buffered.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TSkiffTableReader + : public INodeReaderImpl +{ +public: + TSkiffTableReader( + ::TIntrusivePtr<TRawTableReader> input, + const std::shared_ptr<NSkiff::TSkiffSchema>& schema); + ~TSkiffTableReader() override; + + virtual const TNode& GetRow() const override; + virtual void MoveRow(TNode* row) override; + + bool 
IsValid() const override; + void Next() override; + ui32 GetTableIndex() const override; + ui32 GetRangeIndex() const override; + ui64 GetRowIndex() const override; + void NextKey() override; + TMaybe<size_t> GetReadByteCount() const override; + bool IsRawReaderExhausted() const override; + +private: + struct TSkiffTableSchema; + +private: + void EnsureValidity() const; + void ReadRow(); + static TVector<TSkiffTableSchema> CreateSkiffTableSchemas(const std::shared_ptr<NSkiff::TSkiffSchema>& schema); + +private: + NDetail::TCountingRawTableReader Input_; + TBufferedInput BufferedInput_; + std::optional<NSkiff::TUncheckedSkiffParser> Parser_; + TVector<TSkiffTableSchema> Schemas_; + + TNode Row_; + + bool Valid_ = true; + bool AfterKeySwitch_ = false; + bool Finished_ = false; + TMaybe<ui64> RangeIndex_; + TMaybe<ui64> RowIndex_; + ui32 TableIndex_ = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/stream_raw_reader.cpp b/yt/cpp/mapreduce/io/stream_raw_reader.cpp new file mode 100644 index 0000000000..ec19b67d0b --- /dev/null +++ b/yt/cpp/mapreduce/io/stream_raw_reader.cpp @@ -0,0 +1,59 @@ +#include "stream_table_reader.h" + +#include "node_table_reader.h" +#include "proto_table_reader.h" +#include "skiff_table_reader.h" +#include "yamr_table_reader.h" + +#include <util/system/env.h> +#include <util/string/type.h> + +namespace NYT { + +template <> +TTableReaderPtr<TNode> CreateTableReader<TNode>( + IInputStream* stream, const TTableReaderOptions& /*options*/) +{ + auto impl = ::MakeIntrusive<TNodeTableReader>( + ::MakeIntrusive<NDetail::TInputStreamProxy>(stream)); + return new TTableReader<TNode>(impl); +} + +template <> +TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>( + IInputStream* stream, const TTableReaderOptions& /*options*/) +{ + auto impl = ::MakeIntrusive<TYaMRTableReader>( + ::MakeIntrusive<NDetail::TInputStreamProxy>(stream)); + 
return new TTableReader<TYaMRRow>(impl); +} + + +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader( + IInputStream* stream, + const TTableReaderOptions& /* options */, + const ::google::protobuf::Descriptor* descriptor) +{ + return new TLenvalProtoTableReader( + ::MakeIntrusive<TInputStreamProxy>(stream), + {descriptor}); +} + +::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader( + IInputStream* stream, + const TTableReaderOptions& /* options */, + TVector<const ::google::protobuf::Descriptor*> descriptors) +{ + return new TLenvalProtoTableReader( + ::MakeIntrusive<TInputStreamProxy>(stream), + std::move(descriptors)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/stream_table_reader.h b/yt/cpp/mapreduce/io/stream_table_reader.h new file mode 100644 index 0000000000..d799c63cf4 --- /dev/null +++ b/yt/cpp/mapreduce/io/stream_table_reader.h @@ -0,0 +1,65 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class TInputStreamProxy + : public TRawTableReader +{ +public: + TInputStreamProxy(IInputStream* stream) + : Stream_(stream) + { } + + bool Retry(const TMaybe<ui32>& /* rangeIndex */, const TMaybe<ui64>& /* rowIndex */) override + { + return false; + } + + void ResetRetries() override + { } + + bool HasRangeIndices() const override + { + return false; + } + +protected: + size_t DoRead(void* buf, size_t len) override + { + return Stream_->Read(buf, len); + } + +private: + IInputStream* Stream_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader( + IInputStream* stream, + const TTableReaderOptions& /* options */, + 
const ::google::protobuf::Descriptor* descriptor); + +::TIntrusivePtr<IProtoReaderImpl> CreateProtoReader( + IInputStream* stream, + const TTableReaderOptions& /* options */, + TVector<const ::google::protobuf::Descriptor*> descriptors); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail + +template <> +TTableReaderPtr<TNode> CreateTableReader<TNode>( + IInputStream* stream, const TTableReaderOptions& options); + +template <> +TTableReaderPtr<TYaMRRow> CreateTableReader<TYaMRRow>( + IInputStream* stream, const TTableReaderOptions& /*options*/); + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/ya.make b/yt/cpp/mapreduce/io/ya.make new file mode 100644 index 0000000000..d355e86850 --- /dev/null +++ b/yt/cpp/mapreduce/io/ya.make @@ -0,0 +1,33 @@ +LIBRARY() + +INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) + +SRCS( + counting_raw_reader.cpp + job_reader.cpp + job_writer.cpp + lenval_table_reader.cpp + node_table_reader.cpp + node_table_writer.cpp + proto_helpers.cpp + proto_table_reader.cpp + proto_table_writer.cpp + skiff_row_table_reader.cpp + skiff_table_reader.cpp + stream_raw_reader.cpp + yamr_table_reader.cpp + yamr_table_writer.cpp +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/yson + yt/cpp/mapreduce/common + yt/cpp/mapreduce/interface + yt/cpp/mapreduce/interface/logging + yt/yt_proto/yt/formats + library/cpp/yson/node + yt/cpp/mapreduce/skiff +) + +END() diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.cpp b/yt/cpp/mapreduce/io/yamr_table_reader.cpp new file mode 100644 index 0000000000..6204738e10 --- /dev/null +++ b/yt/cpp/mapreduce/io/yamr_table_reader.cpp @@ -0,0 +1,145 @@ +#include "yamr_table_reader.h" + +#include <yt/cpp/mapreduce/common/helpers.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +//////////////////////////////////////////////////////////////////// + +static void CheckedSkip(IInputStream* input, size_t 
byteCount) +{ + size_t skipped = input->Skip(byteCount); + Y_ENSURE(skipped == byteCount, "Premature end of YaMR stream"); +} + +//////////////////////////////////////////////////////////////////// + +namespace NYT { + +using namespace NYT::NDetail::NRawClient; + +//////////////////////////////////////////////////////////////////////////////// + +TYaMRTableReader::TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input) + : TLenvalTableReader(std::move(input)) +{ } + +TYaMRTableReader::~TYaMRTableReader() +{ } + +const TYaMRRow& TYaMRTableReader::GetRow() const +{ + CheckValidity(); + if (!RowTaken_) { + const_cast<TYaMRTableReader*>(this)->ReadRow(); + } + return Row_; +} + +bool TYaMRTableReader::IsValid() const +{ + return Valid_; +} + +void TYaMRTableReader::Next() +{ + TLenvalTableReader::Next(); +} + +void TYaMRTableReader::NextKey() +{ + TLenvalTableReader::NextKey(); +} + +ui32 TYaMRTableReader::GetTableIndex() const +{ + return TLenvalTableReader::GetTableIndex(); +} + +ui32 TYaMRTableReader::GetRangeIndex() const +{ + return TLenvalTableReader::GetRangeIndex(); +} + +ui64 TYaMRTableReader::GetRowIndex() const +{ + return TLenvalTableReader::GetRowIndex(); +} + +TMaybe<size_t> TYaMRTableReader::GetReadByteCount() const +{ + return TLenvalTableReader::GetReadByteCount(); +} + +bool TYaMRTableReader::IsEndOfStream() const +{ + return TLenvalTableReader::IsEndOfStream(); +} + +bool TYaMRTableReader::IsRawReaderExhausted() const +{ + return TLenvalTableReader::IsRawReaderExhausted(); +} + +void TYaMRTableReader::ReadField(TString* result, i32 length) +{ + result->resize(length); + size_t count = Input_.Load(result->begin(), length); + Y_ENSURE(count == static_cast<size_t>(length), "Premature end of YaMR stream"); +} + +void TYaMRTableReader::ReadRow() +{ + while (true) { + try { + i32 value = static_cast<i32>(Length_); + ReadField(&Key_, value); + Row_.Key = Key_; + + ReadInteger(&value); + ReadField(&SubKey_, value); + Row_.SubKey = SubKey_; + + 
ReadInteger(&value); + ReadField(&Value_, value); + Row_.Value = Value_; + + RowTaken_ = true; + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); + + break; + } catch (const std::exception& ) { + if (!TLenvalTableReader::Retry()) { + throw; + } + } + } +} + +void TYaMRTableReader::SkipRow() +{ + while (true) { + try { + i32 value = static_cast<i32>(Length_); + CheckedSkip(&Input_, value); + + ReadInteger(&value); + CheckedSkip(&Input_, value); + + ReadInteger(&value); + CheckedSkip(&Input_, value); + break; + } catch (const std::exception& ) { + if (!TLenvalTableReader::Retry()) { + throw; + } + } + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/yamr_table_reader.h b/yt/cpp/mapreduce/io/yamr_table_reader.h new file mode 100644 index 0000000000..39fdecfa71 --- /dev/null +++ b/yt/cpp/mapreduce/io/yamr_table_reader.h @@ -0,0 +1,48 @@ +#pragma once + +#include "lenval_table_reader.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { + +class TRawTableReader; +struct TClientContext; + +//////////////////////////////////////////////////////////////////////////////// + +class TYaMRTableReader + : public IYaMRReaderImpl + , public TLenvalTableReader +{ +public: + explicit TYaMRTableReader(::TIntrusivePtr<TRawTableReader> input); + ~TYaMRTableReader() override; + + const TYaMRRow& GetRow() const override; + + bool IsValid() const override; + void Next() override; + ui32 GetTableIndex() const override; + ui32 GetRangeIndex() const override; + ui64 GetRowIndex() const override; + void NextKey() override; + TMaybe<size_t> GetReadByteCount() const override; + bool IsEndOfStream() const override; + bool IsRawReaderExhausted() const override; + +private: + void ReadField(TString* result, i32 length); + + void ReadRow(); + void SkipRow() override; + + TYaMRRow Row_; + TString Key_; + 
TString SubKey_; + TString Value_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.cpp b/yt/cpp/mapreduce/io/yamr_table_writer.cpp new file mode 100644 index 0000000000..cce7ceb0f0 --- /dev/null +++ b/yt/cpp/mapreduce/io/yamr_table_writer.cpp @@ -0,0 +1,53 @@ +#include "yamr_table_writer.h" + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TYaMRTableWriter::TYaMRTableWriter(THolder<IProxyOutput> output) + : Output_(std::move(output)) +{ } + +TYaMRTableWriter::~TYaMRTableWriter() +{ } + +size_t TYaMRTableWriter::GetTableCount() const +{ + return Output_->GetStreamCount(); +} + +void TYaMRTableWriter::FinishTable(size_t tableIndex) { + Output_->GetStream(tableIndex)->Finish(); +} + +void TYaMRTableWriter::AddRow(const TYaMRRow& row, size_t tableIndex) +{ + auto* stream = Output_->GetStream(tableIndex); + + auto writeField = [&stream] (const TStringBuf& field) { + i32 length = static_cast<i32>(field.length()); + stream->Write(&length, sizeof(length)); + stream->Write(field.data(), field.length()); + }; + + writeField(row.Key); + writeField(row.SubKey); + writeField(row.Value); + + Output_->OnRowFinished(tableIndex); +} + +void TYaMRTableWriter::AddRow(TYaMRRow&& row, size_t tableIndex) { + TYaMRTableWriter::AddRow(row, tableIndex); +} + +void TYaMRTableWriter::Abort() +{ + Output_->Abort(); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/io/yamr_table_writer.h b/yt/cpp/mapreduce/io/yamr_table_writer.h new file mode 100644 index 0000000000..cf88eaf287 --- /dev/null +++ b/yt/cpp/mapreduce/io/yamr_table_writer.h @@ -0,0 +1,31 @@ +#pragma once + +#include <yt/cpp/mapreduce/interface/io.h> + +namespace NYT { + +class IProxyOutput; + 
+//////////////////////////////////////////////////////////////////////////////// + +class TYaMRTableWriter + : public IYaMRWriterImpl +{ +public: + explicit TYaMRTableWriter(THolder<IProxyOutput> output); + ~TYaMRTableWriter() override; + + void AddRow(const TYaMRRow& row, size_t tableIndex) override; + void AddRow(TYaMRRow&& row, size_t tableIndex) override; + + size_t GetTableCount() const override; + void FinishTable(size_t) override; + void Abort() override; + +private: + THolder<IProxyOutput> Output_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |