summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/node_table_reader.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 03:37:03 +0300
committermax42 <[email protected]>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io/node_table_reader.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io/node_table_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/io/node_table_reader.cpp375
1 files changed, 375 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/node_table_reader.cpp b/yt/cpp/mapreduce/io/node_table_reader.cpp
new file mode 100644
index 00000000000..d39e1398a5a
--- /dev/null
+++ b/yt/cpp/mapreduce/io/node_table_reader.cpp
@@ -0,0 +1,375 @@
+#include "node_table_reader.h"
+
+#include <yt/cpp/mapreduce/common/node_builder.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/parser.h>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// YSON consumer that assembles table rows one at a time.
+//
+// Events that belong to a row are forwarded to a per-row TNodeBuilder while
+// Row_.Size accumulates a size figure for that row. A list item seen at
+// nesting depth zero is treated as a row boundary: the row built so far is
+// moved into *ResultRow_ and a fresh node builder is started.
+class TRowBuilder
+    : public ::NYson::TYsonConsumerBase
+{
+public:
+    // resultRow is the slot that receives finished rows; only the pointer is stored.
+    explicit TRowBuilder(TMaybe<TRowElement>* resultRow);
+
+    // Scalars and entity.
+    void OnStringScalar(TStringBuf value) override;
+    void OnInt64Scalar(i64 value) override;
+    void OnUint64Scalar(ui64 value) override;
+    void OnDoubleScalar(double value) override;
+    void OnBooleanScalar(bool value) override;
+    void OnEntity() override;
+
+    // Lists.
+    void OnBeginList() override;
+    void OnListItem() override;
+    void OnEndList() override;
+
+    // Maps.
+    void OnBeginMap() override;
+    void OnKeyedItem(TStringBuf key) override;
+    void OnEndMap() override;
+
+    // Attributes.
+    void OnBeginAttributes() override;
+    void OnEndAttributes() override;
+
+    // Publishes the row under construction, if any row was started.
+    void Finalize();
+
+private:
+    // Handles a row boundary (top-level list item).
+    void SaveResultRow();
+
+    THolder<TNodeBuilder> Builder_;   // builds Row_.Node; created in SaveResultRow()
+    TRowElement Row_;                 // row currently being assembled
+    int Depth_ = 0;                   // nesting depth of open lists/maps/attributes
+    bool Started_ = false;            // set once the first row boundary has been seen
+    TMaybe<TRowElement>* ResultRow_;  // destination for finished rows
+};
+
+// Stores the destination for finished rows. The node builder is not created
+// here; SaveResultRow() creates it on the first row boundary.
+TRowBuilder::TRowBuilder(TMaybe<TRowElement>* resultRow)
+    : ResultRow_{resultRow}
+{
+}
+
+// String value: adds the node, the string object and the payload bytes to
+// the row size, then forwards the event to the node builder.
+void TRowBuilder::OnStringScalar(TStringBuf value)
+{
+    const size_t footprint = sizeof(TNode) + sizeof(TString) + value.size();
+    Row_.Size += footprint;
+    Builder_->OnStringScalar(value);
+}
+
+// Signed integer value: adds one TNode to the row size and forwards the event.
+void TRowBuilder::OnInt64Scalar(i64 value)
+{
+    constexpr size_t nodeCost = sizeof(TNode);
+    Row_.Size += nodeCost;
+    Builder_->OnInt64Scalar(value);
+}
+
+// Unsigned integer value: adds one TNode to the row size and forwards the event.
+void TRowBuilder::OnUint64Scalar(ui64 value)
+{
+    constexpr size_t nodeCost = sizeof(TNode);
+    Row_.Size += nodeCost;
+    Builder_->OnUint64Scalar(value);
+}
+
+// Floating-point value: adds one TNode to the row size and forwards the event.
+void TRowBuilder::OnDoubleScalar(double value)
+{
+    constexpr size_t nodeCost = sizeof(TNode);
+    Row_.Size += nodeCost;
+    Builder_->OnDoubleScalar(value);
+}
+
+// Boolean value: adds one TNode to the row size and forwards the event.
+void TRowBuilder::OnBooleanScalar(bool value)
+{
+    constexpr size_t nodeCost = sizeof(TNode);
+    Row_.Size += nodeCost;
+    Builder_->OnBooleanScalar(value);
+}
+
+// Opens a list: one nesting level deeper, event forwarded to the node builder.
+void TRowBuilder::OnBeginList()
+{
+    Depth_ += 1;
+    Builder_->OnBeginList();
+}
+
+// Entity: adds one TNode to the row size and forwards the event.
+void TRowBuilder::OnEntity()
+{
+    constexpr size_t nodeCost = sizeof(TNode);
+    Row_.Size += nodeCost;
+    Builder_->OnEntity();
+}
+
+// A list item inside a nested container is forwarded to the node builder;
+// a list item at depth zero is a row boundary and is handled by SaveResultRow().
+void TRowBuilder::OnListItem()
+{
+    if (Depth_ != 0) {
+        Builder_->OnListItem();
+        return;
+    }
+    SaveResultRow();
+}
+
+// Closes a list: one nesting level up, event forwarded to the node builder.
+void TRowBuilder::OnEndList()
+{
+    Depth_ -= 1;
+    Builder_->OnEndList();
+}
+
+// Opens a map: one nesting level deeper, event forwarded to the node builder.
+void TRowBuilder::OnBeginMap()
+{
+    Depth_ += 1;
+    Builder_->OnBeginMap();
+}
+
+// Map key: adds the key's string object and its bytes to the row size
+// (no TNode is counted for the key itself), then forwards the event.
+void TRowBuilder::OnKeyedItem(TStringBuf key)
+{
+    const size_t keyCost = sizeof(TString) + key.size();
+    Row_.Size += keyCost;
+    Builder_->OnKeyedItem(key);
+}
+
+// Closes a map: one nesting level up, event forwarded to the node builder.
+void TRowBuilder::OnEndMap()
+{
+    Depth_ -= 1;
+    Builder_->OnEndMap();
+}
+
+// Opens an attribute block: it counts as a nesting level; event forwarded.
+void TRowBuilder::OnBeginAttributes()
+{
+    Depth_ += 1;
+    Builder_->OnBeginAttributes();
+}
+
+// Closes an attribute block: one nesting level up; event forwarded.
+void TRowBuilder::OnEndAttributes()
+{
+    Depth_ -= 1;
+    Builder_->OnEndAttributes();
+}
+
+// Row boundary handler. On every boundary except the first, the row built so
+// far is moved into *ResultRow_. In all cases Row_ is reset and a new node
+// builder targeting Row_.Node is installed for the upcoming row.
+void TRowBuilder::SaveResultRow()
+{
+    if (Started_) {
+        // A previous row is complete: hand it over.
+        *ResultRow_ = std::move(Row_);
+    } else {
+        // First boundary: no row has been built yet.
+        Started_ = true;
+    }
+
+    Row_.Reset();
+    Builder_.Reset(new TNodeBuilder(&Row_.Node));
+}
+
+// Publishes the row under construction into *ResultRow_.
+// Does nothing when no row boundary has been seen yet.
+void TRowBuilder::Finalize()
+{
+    if (!Started_) {
+        return;
+    }
+    *ResultRow_ = std::move(Row_);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Takes the raw input, sets up the row builder and parser, and immediately
+// advances to the first row so that the reader is positioned on construction.
+TNodeTableReader::TNodeTableReader(::TIntrusivePtr<TRawTableReader> input)
+    : Input_(std::move(input))
+{
+    // The parser must exist before the first Next() call.
+    PrepareParsing();
+
+    Next();
+}
+
+// Defined out of line in this translation unit; members are released by
+// their own destructors.
+TNodeTableReader::~TNodeTableReader() = default;
+
+// Runs one parser step. When Parse() reports false, the row builder is
+// finalized and the reader records that the last item has been reached.
+void TNodeTableReader::ParseListFragmentItem()
+{
+    const bool parsed = Parser_->Parse();
+    if (parsed) {
+        return;
+    }
+
+    Builder_->Finalize();
+    IsLast_ = true;
+}
+
+// Returns the current row.
+// Throws if the reader is not valid or if the row was already taken by MoveRow().
+const TNode& TNodeTableReader::GetRow() const
+{
+    CheckValidity();
+
+    if (Row_) {
+        return Row_->Node;
+    }
+    ythrow yexception() << "Row is moved";
+}
+
+// Moves the current row into *result and leaves the reader without a row
+// until the next advance.
+// Throws if the reader is not valid or if the row was already moved out.
+void TNodeTableReader::MoveRow(TNode* result)
+{
+    CheckValidity();
+
+    const bool hasRow = static_cast<bool>(Row_);
+    if (!hasRow) {
+        ythrow yexception() << "Row is moved";
+    }
+
+    *result = std::move(Row_->Node);
+    Row_.Clear();
+}
+
+// Reports the Valid_ flag; most accessors throw (via CheckValidity) when it is false.
+bool TNodeTableReader::IsValid() const
+{
+    return Valid_;
+}
+
+// Advances the reader via NextImpl(). Any std::exception is logged and then
+// rethrown unchanged to the caller.
+void TNodeTableReader::Next()
+{
+    try {
+        NextImpl();
+    } catch (const std::exception& failure) {
+        YT_LOG_ERROR("TNodeTableReader::Next failed: %v", failure.what());
+        throw;
+    }
+}
+
+// Advances the reader to the next data row, consuming control entries on the way.
+//
+// Entries whose node is null carry only attributes. The recognised ones
+// ("key_switch", "table_index", "row_index", "range_index", "tablet_index",
+// "end_of_stream") update reader state and are not exposed as rows.
+// The loop ends when a non-null row has been parsed, when a key switch
+// invalidates the reader, or when the input is exhausted (Finished_ is set
+// and Valid_ is cleared).
+// Parse errors go through OnStreamError(), which either re-prepares parsing
+// for a retry or rethrows the error.
+void TNodeTableReader::NextImpl()
+{
+    CheckValidity();
+
+    if (RowIndex_) {
+        ++*RowIndex_;
+    }
+
+    // At the beginning of the stream the parser does not return a finished row.
+    ParseFirstListFragmentItem();
+
+    while (true) {
+        if (IsLast_) {
+            Finished_ = true;
+            Valid_ = false;
+            break;
+        }
+
+        try {
+            ParseListFragmentItem();
+        } catch (const std::exception& ex) {
+            // After a successful retry the parser is recreated, so the
+            // "first item" step has to be repeated before continuing.
+            NeedParseFirst_ = true;
+            OnStreamError(std::current_exception(), ex.what());
+            ParseFirstListFragmentItem();
+            continue;
+        }
+
+        Row_ = std::move(*NextRow_);
+        // NOTE(review): Row_ has just been assigned a value, so this check
+        // looks unreachable if Row_ is a TMaybe - confirm before removing.
+        if (!Row_) {
+            throw yexception() << "No row in NextRow_";
+        }
+
+        // We successfully parsed one more row from the stream,
+        // so reset the retry count to its initial value.
+        Input_.ResetRetries();
+
+        if (!Row_->Node.IsNull()) {
+            // A real data row: stop here and expose it.
+            AtStart_ = false;
+            break;
+        }
+
+        // Control entry: apply its attributes to the reader state.
+        for (const auto& entry : Row_->Node.GetAttributes().AsMap()) {
+            if (entry.first == "key_switch") {
+                // A key switch seen before any data row does not invalidate the reader.
+                if (!AtStart_) {
+                    Valid_ = false;
+                }
+            } else if (entry.first == "table_index") {
+                TableIndex_ = static_cast<ui32>(entry.second.AsInt64());
+            } else if (entry.first == "row_index") {
+                RowIndex_ = static_cast<ui64>(entry.second.AsInt64());
+            } else if (entry.first == "range_index") {
+                RangeIndex_ = static_cast<ui32>(entry.second.AsInt64());
+            } else if (entry.first == "tablet_index") {
+                TabletIndex_ = entry.second.AsInt64();
+            } else if (entry.first == "end_of_stream") {
+                IsEndOfStream_ = true;
+            }
+        }
+
+        if (!Valid_) {
+            break;
+        }
+    }
+}
+
+// Performs the initial parser step that is required whenever NeedParseFirst_
+// is set. A failing step is reported to OnStreamError(); if that call returns
+// (retry accepted) the step is attempted again, otherwise the error
+// propagates out of this function.
+void TNodeTableReader::ParseFirstListFragmentItem()
+{
+    while (NeedParseFirst_) {
+        try {
+            ParseListFragmentItem();
+            // Clearing the flag ends the loop.
+            NeedParseFirst_ = false;
+        } catch (const std::exception& ex) {
+            OnStreamError(std::current_exception(), ex.what());
+        }
+    }
+}
+
+// Returns the table index most recently stored from a "table_index" attribute.
+// Throws if the reader is not valid.
+ui32 TNodeTableReader::GetTableIndex() const
+{
+    CheckValidity();
+
+    return TableIndex_;
+}
+
+// Returns the current range index, or 0 when none is set.
+// Throws if the reader is not valid.
+ui32 TNodeTableReader::GetRangeIndex() const
+{
+    CheckValidity();
+
+    return RangeIndex_.GetOrElse(0);
+}
+
+// Returns the current row index, or 0 when none is set.
+// Throws if the reader is not valid.
+ui64 TNodeTableReader::GetRowIndex() const
+{
+    CheckValidity();
+
+    return RowIndex_.GetOrElse(0UL);
+}
+
+// Returns the current tablet index, or 0 when none is set.
+// Throws if the reader is not valid.
+i64 TNodeTableReader::GetTabletIndex() const
+{
+    CheckValidity();
+
+    return TabletIndex_.GetOrElse(0L);
+}
+
+// Skips the remaining rows of the current key. Unless the input is finished,
+// the reader is then marked valid again.
+void TNodeTableReader::NextKey()
+{
+    // Drain rows until the reader becomes invalid.
+    while (Valid_) {
+        Next();
+    }
+
+    if (!Finished_) {
+        Valid_ = true;
+
+        // NextImpl() increments RowIndex_ before parsing; the index is stepped
+        // back here, presumably to compensate for that - confirm.
+        if (RowIndex_) {
+            *RowIndex_ -= 1;
+        }
+    }
+}
+
+// Forwards the byte counter reported by the underlying input.
+TMaybe<size_t> TNodeTableReader::GetReadByteCount() const
+{
+    return Input_.GetReadByteCount();
+}
+
+// True once an "end_of_stream" attribute has been seen in the input.
+bool TNodeTableReader::IsEndOfStream() const
+{
+    return IsEndOfStream_;
+}
+
+// True once NextImpl() has found that the last item was already parsed.
+bool TNodeTableReader::IsRawReaderExhausted() const
+{
+    return Finished_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// (Re)creates the parsing pipeline: clears the pending row slot, installs a
+// fresh row builder writing into it, and a fresh list parser that reads
+// Input_ and feeds that builder.
+void TNodeTableReader::PrepareParsing()
+{
+    NextRow_.Clear();
+
+    // The builder must be replaced first: the parser is given a pointer to it.
+    Builder_.Reset(new TRowBuilder(&NextRow_));
+    Parser_.Reset(new ::NYson::TYsonListParser(Builder_.Get(), &Input_));
+}
+
+// Handles a read/parse failure: logs it, remembers the exception and asks the
+// input to retry from the current range/row position. If the retry is
+// refused, the stored exception is rethrown; otherwise the cached indices are
+// cleared and the parsing pipeline is rebuilt.
+void TNodeTableReader::OnStreamError(std::exception_ptr exception, TString error)
+{
+    YT_LOG_ERROR("Read error: %v", error);
+    Exception_ = exception;
+
+    const bool retrying = Input_.Retry(RangeIndex_, RowIndex_);
+    if (!retrying) {
+        std::rethrow_exception(Exception_);
+    }
+
+    RowIndex_.Clear();
+    RangeIndex_.Clear();
+    PrepareParsing();
+}
+
+// Throws yexception when the reader is not in a valid state.
+void TNodeTableReader::CheckValidity() const
+{
+    if (Valid_) {
+        return;
+    }
+    ythrow yexception() << "Iterator is not valid";
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT