summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/node_table_reader.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 11:13:34 +0300
committermax42 <[email protected]>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/io/node_table_reader.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/io/node_table_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp375
1 files changed, 0 insertions, 375 deletions
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
deleted file mode 100644
index d39e1398a5a..00000000000
--- a/yt/cpp/mapreduce/io/node_table_reader.cpp
+++ /dev/null
@@ -1,375 +0,0 @@
-#include "node_table_reader.h"
-
-#include <yt/cpp/mapreduce/common/node_builder.h>
-#include <yt/cpp/mapreduce/common/wait_proxy.h>
-
-#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-
-#include <library/cpp/yson/parser.h>
-
-namespace NYT {
-
-////////////////////////////////////////////////////////////////////////////////
-
-class TRowBuilder
- : public ::NYson::TYsonConsumerBase
-{
-public:
- explicit TRowBuilder(TMaybe<TRowElement>* resultRow);
-
- void OnStringScalar(TStringBuf value) override;
- void OnInt64Scalar(i64 value) override;
- void OnUint64Scalar(ui64 value) override;
- void OnDoubleScalar(double value) override;
- void OnBooleanScalar(bool value) override;
- void OnBeginList() override;
- void OnEntity() override;
- void OnListItem() override;
- void OnEndList() override;
- void OnBeginMap() override;
- void OnKeyedItem(TStringBuf key) override;
- void OnEndMap() override;
- void OnBeginAttributes() override;
- void OnEndAttributes() override;
-
- void Finalize();
-
-private:
- THolder<TNodeBuilder> Builder_;
- TRowElement Row_;
- int Depth_ = 0;
- bool Started_ = false;
- TMaybe<TRowElement>* ResultRow_;
-
- void SaveResultRow();
-};
-
-TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow)
- : ResultRow_(resultRow)
-{ }
-
-void TRowBuilder::OnStringScalar(TStringBuf value)
-{
- Row_.Size += sizeof(TNode) + sizeof(TString) + value.size();
- Builder_->OnStringScalar(value);
-}
-
-void TRowBuilder::OnInt64Scalar(i64 value)
-{
- Row_.Size += sizeof(TNode);
- Builder_->OnInt64Scalar(value);
-}
-
-void TRowBuilder::OnUint64Scalar(ui64 value)
-{
- Row_.Size += sizeof(TNode);
- Builder_->OnUint64Scalar(value);
-}
-
-void TRowBuilder::OnDoubleScalar(double value)
-{
- Row_.Size += sizeof(TNode);
- Builder_->OnDoubleScalar(value);
-}
-
-void TRowBuilder::OnBooleanScalar(bool value)
-{
- Row_.Size += sizeof(TNode);
- Builder_->OnBooleanScalar(value);
-}
-
-void TRowBuilder::OnBeginList()
-{
- ++Depth_;
- Builder_->OnBeginList();
-}
-
-void TRowBuilder::OnEntity()
-{
- Row_.Size += sizeof(TNode);
- Builder_->OnEntity();
-}
-
-void TRowBuilder::OnListItem()
-{
- if (Depth_ == 0) {
- SaveResultRow();
- } else {
- Builder_->OnListItem();
- }
-}
-
-void TRowBuilder::OnEndList()
-{
- --Depth_;
- Builder_->OnEndList();
-}
-
-void TRowBuilder::OnBeginMap()
-{
- ++Depth_;
- Builder_->OnBeginMap();
-}
-
-void TRowBuilder::OnKeyedItem(TStringBuf key)
-{
- Row_.Size += sizeof(TString) + key.size();
- Builder_->OnKeyedItem(key);
-}
-
-void TRowBuilder::OnEndMap()
-{
- --Depth_;
- Builder_->OnEndMap();
-}
-
-void TRowBuilder::OnBeginAttributes()
-{
- ++Depth_;
- Builder_->OnBeginAttributes();
-}
-
-void TRowBuilder::OnEndAttributes()
-{
- --Depth_;
- Builder_->OnEndAttributes();
-}
-
-void TRowBuilder::SaveResultRow()
-{
- if (!Started_) {
- Started_ = true;
- } else {
- *ResultRow_ = std::move(Row_);
- }
- Row_.Reset();
- Builder_.Reset(new TNodeBuilder(&Row_.Node));
-}
-
-void TRowBuilder::Finalize()
-{
- if (Started_) {
- *ResultRow_ = std::move(Row_);
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input)
- : Input_(std::move(input))
-{
- PrepareParsing();
- Next();
-}
-
-TNodeTableReader::~TNodeTableReader()
-{
-}
-
-void TNodeTableReader::ParseListFragmentItem() {
- if (!Parser_->Parse()) {
- Builder_->Finalize();
- IsLast_ = true;
- }
-}
-
-const TNode& TNodeTableReader::GetRow() const
-{
- CheckValidity();
- if (!Row_) {
- ythrow yexception() << "Row is moved";
- }
- return Row_->Node;
-}
-
-void TNodeTableReader::MoveRow(TNode* result)
-{
- CheckValidity();
- if (!Row_) {
- ythrow yexception() << "Row is moved";
- }
- *result = std::move(Row_->Node);
- Row_.Clear();
-}
-
-bool TNodeTableReader::IsValid() const
-{
- return Valid_;
-}
-
-void TNodeTableReader::Next()
-{
- try {
- NextImpl();
- } catch (const std::exception& ex) {
- YT_LOG_ERROR("TNodeTableReader::Next failed: %v", ex.what());
- throw;
- }
-}
-
-void TNodeTableReader::NextImpl()
-{
- CheckValidity();
-
- if (RowIndex_) {
- ++*RowIndex_;
- }
-
- // At the begin of stream parser doesn't return a finished row.
- ParseFirstListFragmentItem();
-
- while (true) {
- if (IsLast_) {
- Finished_ = true;
- Valid_ = false;
- break;
- }
-
- try {
- ParseListFragmentItem();
- } catch (std::exception& ex) {
- NeedParseFirst_ = true;
- OnStreamError(std::current_exception(), ex.what());
- ParseFirstListFragmentItem();
- continue;
- }
-
- Row_ = std::move(*NextRow_);
- if (!Row_) {
- throw yexception() << "No row in NextRow_";
- }
-
- // We successfully parsed one more row from the stream,
- // so reset retry count to their initial value.
- Input_.ResetRetries();
-
- if (!Row_->Node.IsNull()) {
- AtStart_ = false;
- break;
- }
-
- for (auto& entry : Row_->Node.GetAttributes().AsMap()) {
- if (entry.first == "key_switch") {
- if (!AtStart_) {
- Valid_ = false;
- }
- } else if (entry.first == "table_index") {
- TableIndex_ = static_cast<ui32>(entry.second.AsInt64());
- } else if (entry.first == "row_index") {
- RowIndex_ = static_cast<ui64>(entry.second.AsInt64());
- } else if (entry.first == "range_index") {
- RangeIndex_ = static_cast<ui32>(entry.second.AsInt64());
- } else if (entry.first == "tablet_index") {
- TabletIndex_ = entry.second.AsInt64();
- } else if (entry.first == "end_of_stream") {
- IsEndOfStream_ = true;
- }
- }
-
- if (!Valid_) {
- break;
- }
- }
-}
-
-void TNodeTableReader::ParseFirstListFragmentItem()
-{
- while (NeedParseFirst_) {
- try {
- ParseListFragmentItem();
- NeedParseFirst_ = false;
- break;
- } catch (std::exception& ex) {
- OnStreamError(std::current_exception(), ex.what());
- }
- }
-}
-
-ui32 TNodeTableReader::GetTableIndex() const
-{
- CheckValidity();
- return TableIndex_;
-}
-
-ui32 TNodeTableReader::GetRangeIndex() const
-{
- CheckValidity();
- return RangeIndex_.GetOrElse(0);
-}
-
-ui64 TNodeTableReader::GetRowIndex() const
-{
- CheckValidity();
- return RowIndex_.GetOrElse(0UL);
-}
-
-i64 TNodeTableReader::GetTabletIndex() const
-{
- CheckValidity();
- return TabletIndex_.GetOrElse(0L);
-}
-
-void TNodeTableReader::NextKey()
-{
- while (Valid_) {
- Next();
- }
-
- if (Finished_) {
- return;
- }
-
- Valid_ = true;
-
- if (RowIndex_) {
- --*RowIndex_;
- }
-}
-
-TMaybe<size_t> TNodeTableReader::GetReadByteCount() const
-{
- return Input_.GetReadByteCount();
-}
-
-bool TNodeTableReader::IsEndOfStream() const
-{
- return IsEndOfStream_;
-}
-
-bool TNodeTableReader::IsRawReaderExhausted() const
-{
- return Finished_;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-void TNodeTableReader::PrepareParsing()
-{
- NextRow_.Clear();
- Builder_.Reset(new TRowBuilder(&NextRow_));
- Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_));
-}
-
-void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error)
-{
- YT_LOG_ERROR("Read error: %v", error);
- Exception_ = exception;
- if (Input_.Retry(RangeIndex_, RowIndex_)) {
- RowIndex_.Clear();
- RangeIndex_.Clear();
- PrepareParsing();
- } else {
- std::rethrow_exception(Exception_);
- }
-}
-
-void TNodeTableReader::CheckValidity() const
-{
- if (!Valid_) {
- ythrow yexception() << "Iterator is not valid";
- }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT