aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/skiff_table_reader.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io/skiff_table_reader.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io/skiff_table_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp293
1 files changed, 293 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
new file mode 100644
index 0000000000..51c20609f0
--- /dev/null
+++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
@@ -0,0 +1,293 @@
+#include "skiff_table_reader.h"
+
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+#include <yt/cpp/mapreduce/skiff/wire_type.h>
+#include <yt/cpp/mapreduce/skiff/skiff_schema.h>
+
+#include <util/string/cast.h>
+
+namespace NYT {
+namespace NDetail {
+namespace {
+
+////////////////////////////////////////////////////////////////////////////////
+
+enum EColumnType : i8
+{
+ Dense,
+ KeySwitch,
+ RangeIndex,
+ RowIndex
+};
+
+struct TSkiffColumnSchema
+{
+ EColumnType Type;
+ bool Required;
+ NSkiff::EWireType WireType;
+ TString Name;
+
+ TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name)
+ : Type(type)
+ , Required(required)
+ , WireType(wireType)
+ , Name(name)
+ { }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace
+
+struct TSkiffTableReader::TSkiffTableSchema
+{
+ TVector<TSkiffColumnSchema> Columns;
+};
+
+TSkiffTableReader::TSkiffTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ const NSkiff::TSkiffSchemaPtr& schema)
+ : Input_(std::move(input))
+ , BufferedInput_(&Input_)
+ , Parser_(&BufferedInput_)
+ , Schemas_(CreateSkiffTableSchemas(schema))
+{
+ Next();
+}
+
+TSkiffTableReader::~TSkiffTableReader() = default;
+
+const TNode& TSkiffTableReader::GetRow() const
+{
+ EnsureValidity();
+ Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
+ return Row_;
+}
+
+void TSkiffTableReader::MoveRow(TNode* result)
+{
+ EnsureValidity();
+ Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
+ *result = std::move(Row_);
+ Row_ = TNode();
+}
+
+bool TSkiffTableReader::IsValid() const
+{
+ return Valid_;
+}
+
+void TSkiffTableReader::Next()
+{
+ EnsureValidity();
+ if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
+ Finished_ = true;
+ Valid_ = false;
+ return;
+ }
+
+ if (AfterKeySwitch_) {
+ AfterKeySwitch_ = false;
+ return;
+ }
+
+ while (true) {
+ try {
+ ReadRow();
+ break;
+ } catch (const std::exception& exception) {
+ YT_LOG_ERROR("Read error: %v", exception.what());
+ if (!Input_.Retry(RangeIndex_, RowIndex_)) {
+ throw;
+ }
+ BufferedInput_ = TBufferedInput(&Input_);
+ Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_));
+ RangeIndex_.Clear();
+ RowIndex_.Clear();
+ }
+ }
+}
+
+ui32 TSkiffTableReader::GetTableIndex() const
+{
+ EnsureValidity();
+ return TableIndex_;
+}
+
+ui32 TSkiffTableReader::GetRangeIndex() const
+{
+ EnsureValidity();
+ return RangeIndex_.GetOrElse(0);
+}
+
+ui64 TSkiffTableReader::GetRowIndex() const
+{
+ EnsureValidity();
+ return RowIndex_.GetOrElse(0ULL);
+}
+
+void TSkiffTableReader::NextKey()
+{
+ while (Valid_) {
+ Next();
+ }
+
+ if (Finished_) {
+ return;
+ }
+
+ Valid_ = true;
+}
+
+TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const
+{
+ return Input_.GetReadByteCount();
+}
+
+bool TSkiffTableReader::IsRawReaderExhausted() const
+{
+ return Finished_;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
+ const NSkiff::TSkiffSchemaPtr& schema)
+{
+ using NSkiff::EWireType;
+
+ constexpr auto keySwitchColumnName = "$key_switch";
+ constexpr auto rangeIndexColumnName = "$range_index";
+ constexpr auto rowIndexColumnName = "$row_index";
+
+ static const THashMap<TString, TSkiffColumnSchema> specialColumns = {
+ {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}},
+ {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}},
+ {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}},
+ };
+
+ Y_ENSURE(schema->GetWireType() == EWireType::Variant16,
+ "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'");
+ TVector<TSkiffTableSchema> result;
+ for (const auto& tableSchema : schema->GetChildren()) {
+ Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple,
+ "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
+ TVector<TSkiffColumnSchema> columns;
+ for (const auto& columnSchema : tableSchema->GetChildren()) {
+ if (columnSchema->GetName().StartsWith("$")) {
+ auto iter = specialColumns.find(columnSchema->GetName());
+ Y_ENSURE(iter != specialColumns.end(), "Unknown special column: " << columnSchema->GetName());
+ columns.push_back(iter->second);
+ } else {
+ auto wireType = columnSchema->GetWireType();
+ bool required = true;
+ if (wireType == EWireType::Variant8) {
+ const auto& children = columnSchema->GetChildren();
+ Y_ENSURE(
+ children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing &&
+ NSkiff::IsSimpleType(children[1]->GetWireType()),
+ "Expected schema of form 'variant8<nothing, simple-type>', got "
+ << NSkiff::GetShortDebugString(columnSchema));
+ wireType = children[1]->GetWireType();
+ required = false;
+ }
+ Y_ENSURE(NSkiff::IsSimpleType(wireType),
+ "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema));
+ columns.emplace_back(
+ EColumnType::Dense,
+ required,
+ wireType,
+ columnSchema->GetName());
+ }
+ }
+ result.push_back({std::move(columns)});
+ }
+ return result;
+}
+
+void TSkiffTableReader::ReadRow()
+{
+ if (Row_.IsUndefined()) {
+ Row_ = TNode::CreateMap();
+ } else {
+ Row_.AsMap().clear();
+ }
+
+ if (RowIndex_) {
+ ++*RowIndex_;
+ }
+
+ TableIndex_ = Parser_->ParseVariant16Tag();
+ Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size());
+ const auto& tableSchema = Schemas_[TableIndex_];
+
+ auto parse = [&](NSkiff::EWireType wireType) -> TNode {
+ switch (wireType) {
+ case NSkiff::EWireType::Int64:
+ return Parser_->ParseInt64();
+ case NSkiff::EWireType::Uint64:
+ return Parser_->ParseUint64();
+ case NSkiff::EWireType::Boolean:
+ return Parser_->ParseBoolean();
+ case NSkiff::EWireType::Double:
+ return Parser_->ParseDouble();
+ case NSkiff::EWireType::String32:
+ return Parser_->ParseString32();
+ case NSkiff::EWireType::Yson32:
+ return NodeFromYsonString(Parser_->ParseYson32());
+ case NSkiff::EWireType::Nothing:
+ return TNode::CreateEntity();
+ default:
+ Y_FAIL("Bad column wire type: '%s'", ::ToString(wireType).data());
+ }
+ };
+
+ for (const auto& columnSchema : tableSchema.Columns) {
+ if (!columnSchema.Required) {
+ auto tag = Parser_->ParseVariant8Tag();
+ if (tag == 0) {
+ if (columnSchema.Type == EColumnType::Dense) {
+ Row_[columnSchema.Name] = TNode::CreateEntity();
+ }
+ continue;
+ }
+ Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType
+ << ">' expected to be 0 or 1, got " << tag);
+ }
+ auto value = parse(columnSchema.WireType);
+ switch (columnSchema.Type) {
+ case EColumnType::Dense:
+ Row_[columnSchema.Name] = std::move(value);
+ break;
+ case EColumnType::KeySwitch:
+ if (value.AsBool()) {
+ AfterKeySwitch_ = true;
+ Valid_ = false;
+ }
+ break;
+ case EColumnType::RangeIndex:
+ RangeIndex_ = value.AsInt64();
+ break;
+ case EColumnType::RowIndex:
+ RowIndex_ = value.AsInt64();
+ break;
+ default:
+ Y_FAIL("Bad column type: %d", static_cast<int>(columnSchema.Type));
+ }
+ }
+
+ // We successfully parsed one more row from the stream,
+ // so reset retry count to their initial value.
+ Input_.ResetRetries();
+}
+
+void TSkiffTableReader::EnsureValidity() const
+{
+ Y_ENSURE(Valid_, "Iterator is not valid");
+}
+
+} // namespace NDetail
+} // namespace NYT