diff options
| author | max42 <[email protected]> | 2023-07-29 00:02:16 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-07-29 00:02:16 +0300 |
| commit | 73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch) | |
| tree | 188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/io/node_table_reader.cpp | |
| parent | 528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff) | |
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears. in open-source cmake build.
Diff in open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/io/node_table_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.cpp | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp new file mode 100644 index 00000000000..d39e1398a5a --- /dev/null +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -0,0 +1,375 @@ +#include "node_table_reader.h" + +#include <yt/cpp/mapreduce/common/node_builder.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/parser.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TRowBuilder + : public ::NYson::TYsonConsumerBase +{ +public: + explicit TRowBuilder(TMaybe<TRowElement>* resultRow); + + void OnStringScalar(TStringBuf value) override; + void OnInt64Scalar(i64 value) override; + void OnUint64Scalar(ui64 value) override; + void OnDoubleScalar(double value) override; + void OnBooleanScalar(bool value) override; + void OnBeginList() override; + void OnEntity() override; + void OnListItem() override; + void OnEndList() override; + void OnBeginMap() override; + void OnKeyedItem(TStringBuf key) override; + void OnEndMap() override; + void OnBeginAttributes() override; + void OnEndAttributes() override; + + void Finalize(); + +private: + THolder<TNodeBuilder> Builder_; + TRowElement Row_; + int Depth_ = 0; + bool Started_ = false; + TMaybe<TRowElement>* ResultRow_; + + void SaveResultRow(); +}; + +TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow) + : ResultRow_(resultRow) +{ } + +void TRowBuilder::OnStringScalar(TStringBuf value) +{ + Row_.Size += sizeof(TNode) + sizeof(TString) + value.size(); + Builder_->OnStringScalar(value); +} + +void TRowBuilder::OnInt64Scalar(i64 value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnInt64Scalar(value); +} + +void TRowBuilder::OnUint64Scalar(ui64 value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnUint64Scalar(value); +} + +void TRowBuilder::OnDoubleScalar(double value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnDoubleScalar(value); +} + +void TRowBuilder::OnBooleanScalar(bool value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnBooleanScalar(value); +} + +void TRowBuilder::OnBeginList() +{ + ++Depth_; + Builder_->OnBeginList(); +} + +void TRowBuilder::OnEntity() +{ + Row_.Size += sizeof(TNode); + Builder_->OnEntity(); +} + +void TRowBuilder::OnListItem() +{ + if (Depth_ == 0) { + SaveResultRow(); + } else { + Builder_->OnListItem(); + } +} + +void TRowBuilder::OnEndList() +{ + --Depth_; + Builder_->OnEndList(); +} + +void TRowBuilder::OnBeginMap() +{ + ++Depth_; + Builder_->OnBeginMap(); +} + +void TRowBuilder::OnKeyedItem(TStringBuf key) +{ + Row_.Size += sizeof(TString) + key.size(); + Builder_->OnKeyedItem(key); +} + +void TRowBuilder::OnEndMap() +{ + --Depth_; + Builder_->OnEndMap(); +} + +void TRowBuilder::OnBeginAttributes() +{ + ++Depth_; + Builder_->OnBeginAttributes(); +} + +void TRowBuilder::OnEndAttributes() +{ + --Depth_; + Builder_->OnEndAttributes(); +} + +void TRowBuilder::SaveResultRow() +{ + if (!Started_) { + Started_ = true; + } else { + *ResultRow_ = std::move(Row_); + } + Row_.Reset(); + Builder_.Reset(new TNodeBuilder(&Row_.Node)); +} + +void TRowBuilder::Finalize() +{ + if (Started_) { + *ResultRow_ = std::move(Row_); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input) + : Input_(std::move(input)) +{ + PrepareParsing(); + Next(); +} + +TNodeTableReader::~TNodeTableReader() +{ +} + +void TNodeTableReader::ParseListFragmentItem() { + if (!Parser_->Parse()) { + Builder_->Finalize(); + IsLast_ = true; + } +} + +const TNode& TNodeTableReader::GetRow() const +{ + CheckValidity(); + if (!Row_) { + ythrow yexception() << "Row is moved"; + } + return Row_->Node; +} + +void TNodeTableReader::MoveRow(TNode* result) +{ + CheckValidity(); + if (!Row_) { + ythrow yexception() << "Row is moved"; + } + *result = std::move(Row_->Node); + Row_.Clear(); +} + +bool TNodeTableReader::IsValid() const +{ + return Valid_; +} + +void TNodeTableReader::Next() +{ + try { + NextImpl(); + } catch (const std::exception& ex) { + YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what()); + throw; + } +} + +void TNodeTableReader::NextImpl() +{ + CheckValidity(); + + if (RowIndex_) { + ++*RowIndex_; + } + + // At the begin of stream parser doesn't return a finished row. + ParseFirstListFragmentItem(); + + while (true) { + if (IsLast_) { + Finished_ = true; + Valid_ = false; + break; + } + + try { + ParseListFragmentItem(); + } catch (std::exception& ex) { + NeedParseFirst_ = true; + OnStreamError(std::current_exception(), ex.what()); + ParseFirstListFragmentItem(); + continue; + } + + Row_ = std::move(*NextRow_); + if (!Row_) { + throw yexception() << "No row in NextRow_"; + } + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); + + if (!Row_->Node.IsNull()) { + AtStart_ = false; + break; + } + + for (auto& entry : Row_->Node.GetAttributes().AsMap()) { + if (entry.first == "key_switch") { + if (!AtStart_) { + Valid_ = false; + } + } else if (entry.first == "table_index") { + TableIndex_ = static_cast<ui32>(entry.second.AsInt64()); + } else if (entry.first == "row_index") { + RowIndex_ = static_cast<ui64>(entry.second.AsInt64()); + } else if (entry.first == "range_index") { + RangeIndex_ = static_cast<ui32>(entry.second.AsInt64()); + } else if (entry.first == "tablet_index") { + TabletIndex_ = entry.second.AsInt64(); + } else if (entry.first == "end_of_stream") { + IsEndOfStream_ = true; + } + } + + if (!Valid_) { + break; + } + } +} + +void TNodeTableReader::ParseFirstListFragmentItem() +{ + while (NeedParseFirst_) { + try { + ParseListFragmentItem(); + NeedParseFirst_ = false; + break; + } catch (std::exception& ex) { + OnStreamError(std::current_exception(), ex.what()); + } + } +} + +ui32 TNodeTableReader::GetTableIndex() const +{ + CheckValidity(); + return TableIndex_; +} + +ui32 TNodeTableReader::GetRangeIndex() const +{ + CheckValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TNodeTableReader::GetRowIndex() const +{ + CheckValidity(); + return RowIndex_.GetOrElse(0UL); +} + +i64 TNodeTableReader::GetTabletIndex() const +{ + CheckValidity(); + return TabletIndex_.GetOrElse(0L); +} + +void TNodeTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; + + if (RowIndex_) { + --*RowIndex_; + } +} + +TMaybe<size_t> TNodeTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TNodeTableReader::IsEndOfStream() const +{ + return IsEndOfStream_; +} + +bool TNodeTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +//////////////////////////////////////////////////////////////////////////////// + +void TNodeTableReader::PrepareParsing() +{ + NextRow_.Clear(); + Builder_.Reset(new TRowBuilder(&NextRow_)); + Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_)); +} + +void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error) +{ + YT_LOG_ERROR("Read error: %v", error); + Exception_ = exception; + if (Input_.Retry(RangeIndex_, RowIndex_)) { + RowIndex_.Clear(); + RangeIndex_.Clear(); + PrepareParsing(); + } else { + std::rethrow_exception(Exception_); + } +} + +void TNodeTableReader::CheckValidity() const +{ + if (!Valid_) { + ythrow yexception() << "Iterator is not valid"; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |
