diff options
| author | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 03:37:03 +0300 |
| commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
| tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io/node_table_reader.cpp | |
| parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of the YT provider is moved into the YDB code base for further export as a part of the YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are placed under an `IF (NOT CMAKE_EXPORT)` clause, which excludes them from open-source cmake generation and from the ya make build. They will be enabled in subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io/node_table_reader.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/io/node_table_reader.cpp | 375 |
1 files changed, 375 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp new file mode 100644 index 00000000000..d39e1398a5a --- /dev/null +++ b/yt/cpp/mapreduce/io/node_table_reader.cpp @@ -0,0 +1,375 @@ +#include "node_table_reader.h" + +#include <yt/cpp/mapreduce/common/node_builder.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/parser.h> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class TRowBuilder + : public ::NYson::TYsonConsumerBase +{ +public: + explicit TRowBuilder(TMaybe<TRowElement>* resultRow); + + void OnStringScalar(TStringBuf value) override; + void OnInt64Scalar(i64 value) override; + void OnUint64Scalar(ui64 value) override; + void OnDoubleScalar(double value) override; + void OnBooleanScalar(bool value) override; + void OnBeginList() override; + void OnEntity() override; + void OnListItem() override; + void OnEndList() override; + void OnBeginMap() override; + void OnKeyedItem(TStringBuf key) override; + void OnEndMap() override; + void OnBeginAttributes() override; + void OnEndAttributes() override; + + void Finalize(); + +private: + THolder<TNodeBuilder> Builder_; + TRowElement Row_; + int Depth_ = 0; + bool Started_ = false; + TMaybe<TRowElement>* ResultRow_; + + void SaveResultRow(); +}; + +TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow) + : ResultRow_(resultRow) +{ } + +void TRowBuilder::OnStringScalar(TStringBuf value) +{ + Row_.Size += sizeof(TNode) + sizeof(TString) + value.size(); + Builder_->OnStringScalar(value); +} + +void TRowBuilder::OnInt64Scalar(i64 value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnInt64Scalar(value); +} + +void TRowBuilder::OnUint64Scalar(ui64 value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnUint64Scalar(value); +} + +void TRowBuilder::OnDoubleScalar(double value) +{ + Row_.Size += sizeof(TNode); + 
Builder_->OnDoubleScalar(value); +} + +void TRowBuilder::OnBooleanScalar(bool value) +{ + Row_.Size += sizeof(TNode); + Builder_->OnBooleanScalar(value); +} + +void TRowBuilder::OnBeginList() +{ + ++Depth_; + Builder_->OnBeginList(); +} + +void TRowBuilder::OnEntity() +{ + Row_.Size += sizeof(TNode); + Builder_->OnEntity(); +} + +void TRowBuilder::OnListItem() +{ + if (Depth_ == 0) { + SaveResultRow(); + } else { + Builder_->OnListItem(); + } +} + +void TRowBuilder::OnEndList() +{ + --Depth_; + Builder_->OnEndList(); +} + +void TRowBuilder::OnBeginMap() +{ + ++Depth_; + Builder_->OnBeginMap(); +} + +void TRowBuilder::OnKeyedItem(TStringBuf key) +{ + Row_.Size += sizeof(TString) + key.size(); + Builder_->OnKeyedItem(key); +} + +void TRowBuilder::OnEndMap() +{ + --Depth_; + Builder_->OnEndMap(); +} + +void TRowBuilder::OnBeginAttributes() +{ + ++Depth_; + Builder_->OnBeginAttributes(); +} + +void TRowBuilder::OnEndAttributes() +{ + --Depth_; + Builder_->OnEndAttributes(); +} + +void TRowBuilder::SaveResultRow() +{ + if (!Started_) { + Started_ = true; + } else { + *ResultRow_ = std::move(Row_); + } + Row_.Reset(); + Builder_.Reset(new TNodeBuilder(&Row_.Node)); +} + +void TRowBuilder::Finalize() +{ + if (Started_) { + *ResultRow_ = std::move(Row_); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input) + : Input_(std::move(input)) +{ + PrepareParsing(); + Next(); +} + +TNodeTableReader::~TNodeTableReader() +{ +} + +void TNodeTableReader::ParseListFragmentItem() { + if (!Parser_->Parse()) { + Builder_->Finalize(); + IsLast_ = true; + } +} + +const TNode& TNodeTableReader::GetRow() const +{ + CheckValidity(); + if (!Row_) { + ythrow yexception() << "Row is moved"; + } + return Row_->Node; +} + +void TNodeTableReader::MoveRow(TNode* result) +{ + CheckValidity(); + if (!Row_) { + ythrow yexception() << "Row is moved"; + } + *result = std::move(Row_->Node); + 
Row_.Clear(); +} + +bool TNodeTableReader::IsValid() const +{ + return Valid_; +} + +void TNodeTableReader::Next() +{ + try { + NextImpl(); + } catch (const std::exception& ex) { + YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what()); + throw; + } +} + +void TNodeTableReader::NextImpl() +{ + CheckValidity(); + + if (RowIndex_) { + ++*RowIndex_; + } + + // At the begin of stream parser doesn't return a finished row. + ParseFirstListFragmentItem(); + + while (true) { + if (IsLast_) { + Finished_ = true; + Valid_ = false; + break; + } + + try { + ParseListFragmentItem(); + } catch (std::exception& ex) { + NeedParseFirst_ = true; + OnStreamError(std::current_exception(), ex.what()); + ParseFirstListFragmentItem(); + continue; + } + + Row_ = std::move(*NextRow_); + if (!Row_) { + throw yexception() << "No row in NextRow_"; + } + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); + + if (!Row_->Node.IsNull()) { + AtStart_ = false; + break; + } + + for (auto& entry : Row_->Node.GetAttributes().AsMap()) { + if (entry.first == "key_switch") { + if (!AtStart_) { + Valid_ = false; + } + } else if (entry.first == "table_index") { + TableIndex_ = static_cast<ui32>(entry.second.AsInt64()); + } else if (entry.first == "row_index") { + RowIndex_ = static_cast<ui64>(entry.second.AsInt64()); + } else if (entry.first == "range_index") { + RangeIndex_ = static_cast<ui32>(entry.second.AsInt64()); + } else if (entry.first == "tablet_index") { + TabletIndex_ = entry.second.AsInt64(); + } else if (entry.first == "end_of_stream") { + IsEndOfStream_ = true; + } + } + + if (!Valid_) { + break; + } + } +} + +void TNodeTableReader::ParseFirstListFragmentItem() +{ + while (NeedParseFirst_) { + try { + ParseListFragmentItem(); + NeedParseFirst_ = false; + break; + } catch (std::exception& ex) { + OnStreamError(std::current_exception(), ex.what()); + } + } +} + +ui32 TNodeTableReader::GetTableIndex() const 
+{ + CheckValidity(); + return TableIndex_; +} + +ui32 TNodeTableReader::GetRangeIndex() const +{ + CheckValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TNodeTableReader::GetRowIndex() const +{ + CheckValidity(); + return RowIndex_.GetOrElse(0UL); +} + +i64 TNodeTableReader::GetTabletIndex() const +{ + CheckValidity(); + return TabletIndex_.GetOrElse(0L); +} + +void TNodeTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; + + if (RowIndex_) { + --*RowIndex_; + } +} + +TMaybe<size_t> TNodeTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TNodeTableReader::IsEndOfStream() const +{ + return IsEndOfStream_; +} + +bool TNodeTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +//////////////////////////////////////////////////////////////////////////////// + +void TNodeTableReader::PrepareParsing() +{ + NextRow_.Clear(); + Builder_.Reset(new TRowBuilder(&NextRow_)); + Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_)); +} + +void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error) +{ + YT_LOG_ERROR("Read error: %v", error); + Exception_ = exception; + if (Input_.Retry(RangeIndex_, RowIndex_)) { + RowIndex_.Clear(); + RangeIndex_.Clear(); + PrepareParsing(); + } else { + std::rethrow_exception(Exception_); + } +} + +void TNodeTableReader::CheckValidity() const +{ + if (!Valid_) { + ythrow yexception() << "Iterator is not valid"; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT |
