diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/io/skiff_table_reader.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/io/skiff_table_reader.cpp')
-rw-r--r-- | yt/cpp/mapreduce/io/skiff_table_reader.cpp | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp new file mode 100644 index 0000000000..51c20609f0 --- /dev/null +++ b/yt/cpp/mapreduce/io/skiff_table_reader.cpp @@ -0,0 +1,293 @@ +#include "skiff_table_reader.h" + +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + +#include <library/cpp/yson/node/node_io.h> + +#include <yt/cpp/mapreduce/skiff/wire_type.h> +#include <yt/cpp/mapreduce/skiff/skiff_schema.h> + +#include <util/string/cast.h> + +namespace NYT { +namespace NDetail { +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +enum EColumnType : i8 +{ + Dense, + KeySwitch, + RangeIndex, + RowIndex +}; + +struct TSkiffColumnSchema +{ + EColumnType Type; + bool Required; + NSkiff::EWireType WireType; + TString Name; + + TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name) + : Type(type) + , Required(required) + , WireType(wireType) + , Name(name) + { } +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace + +struct TSkiffTableReader::TSkiffTableSchema +{ + TVector<TSkiffColumnSchema> Columns; +}; + +TSkiffTableReader::TSkiffTableReader( + ::TIntrusivePtr<TRawTableReader> input, + const NSkiff::TSkiffSchemaPtr& schema) + : Input_(std::move(input)) + , BufferedInput_(&Input_) + , Parser_(&BufferedInput_) + , Schemas_(CreateSkiffTableSchemas(schema)) +{ + Next(); +} + +TSkiffTableReader::~TSkiffTableReader() = default; + +const TNode& TSkiffTableReader::GetRow() const +{ + EnsureValidity(); + Y_ENSURE(!Row_.IsUndefined(), "Row is moved"); + return Row_; +} + +void TSkiffTableReader::MoveRow(TNode* result) +{ + EnsureValidity(); + Y_ENSURE(!Row_.IsUndefined(), "Row is moved"); + *result = std::move(Row_); + Row_ = TNode(); +} + +bool TSkiffTableReader::IsValid() const +{ + return Valid_; +} + +void TSkiffTableReader::Next() +{ + EnsureValidity(); + if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) { + Finished_ = true; + Valid_ = false; + return; + } + + if (AfterKeySwitch_) { + AfterKeySwitch_ = false; + return; + } + + while (true) { + try { + ReadRow(); + break; + } catch (const std::exception& exception) { + YT_LOG_ERROR("Read error: %v", exception.what()); + if (!Input_.Retry(RangeIndex_, RowIndex_)) { + throw; + } + BufferedInput_ = TBufferedInput(&Input_); + Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_)); + RangeIndex_.Clear(); + RowIndex_.Clear(); + } + } +} + +ui32 TSkiffTableReader::GetTableIndex() const +{ + EnsureValidity(); + return TableIndex_; +} + +ui32 TSkiffTableReader::GetRangeIndex() const +{ + EnsureValidity(); + return RangeIndex_.GetOrElse(0); +} + +ui64 TSkiffTableReader::GetRowIndex() const +{ + EnsureValidity(); + return RowIndex_.GetOrElse(0ULL); +} + +void TSkiffTableReader::NextKey() +{ + while (Valid_) { + Next(); + } + + if (Finished_) { + return; + } + + Valid_ = true; +} + +TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const +{ + return Input_.GetReadByteCount(); +} + +bool TSkiffTableReader::IsRawReaderExhausted() const +{ + return Finished_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas( + const NSkiff::TSkiffSchemaPtr& schema) +{ + using NSkiff::EWireType; + + constexpr auto keySwitchColumnName = "$key_switch"; + constexpr auto rangeIndexColumnName = "$range_index"; + constexpr auto rowIndexColumnName = "$row_index"; + + static const THashMap<TString, TSkiffColumnSchema> specialColumns = { + {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}}, + {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}}, + {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}}, + }; + + Y_ENSURE(schema->GetWireType() == EWireType::Variant16, + "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'"); + TVector<TSkiffTableSchema> result; + for (const auto& tableSchema : schema->GetChildren()) { + Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple, + "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'"); + TVector<TSkiffColumnSchema> columns; + for (const auto& columnSchema : tableSchema->GetChildren()) { + if (columnSchema->GetName().StartsWith("$")) { + auto iter = specialColumns.find(columnSchema->GetName()); + Y_ENSURE(iter != specialColumns.end(), "Unknown special column: " << columnSchema->GetName()); + columns.push_back(iter->second); + } else { + auto wireType = columnSchema->GetWireType(); + bool required = true; + if (wireType == EWireType::Variant8) { + const auto& children = columnSchema->GetChildren(); + Y_ENSURE( + children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing && + NSkiff::IsSimpleType(children[1]->GetWireType()), + "Expected schema of form 'variant8<nothing, simple-type>', got " + << NSkiff::GetShortDebugString(columnSchema)); + wireType = children[1]->GetWireType(); + required = false; + } + Y_ENSURE(NSkiff::IsSimpleType(wireType), + "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema)); + columns.emplace_back( + EColumnType::Dense, + required, + wireType, + columnSchema->GetName()); + } + } + result.push_back({std::move(columns)}); + } + return result; +} + +void TSkiffTableReader::ReadRow() +{ + if (Row_.IsUndefined()) { + Row_ = TNode::CreateMap(); + } else { + Row_.AsMap().clear(); + } + + if (RowIndex_) { + ++*RowIndex_; + } + + TableIndex_ = Parser_->ParseVariant16Tag(); + Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size()); + const auto& tableSchema = Schemas_[TableIndex_]; + + auto parse = [&](NSkiff::EWireType wireType) -> TNode { + switch (wireType) { + case NSkiff::EWireType::Int64: + return Parser_->ParseInt64(); + case NSkiff::EWireType::Uint64: + return Parser_->ParseUint64(); + case NSkiff::EWireType::Boolean: + return Parser_->ParseBoolean(); + case NSkiff::EWireType::Double: + return Parser_->ParseDouble(); + case NSkiff::EWireType::String32: + return Parser_->ParseString32(); + case NSkiff::EWireType::Yson32: + return NodeFromYsonString(Parser_->ParseYson32()); + case NSkiff::EWireType::Nothing: + return TNode::CreateEntity(); + default: + Y_FAIL("Bad column wire type: '%s'", ::ToString(wireType).data()); + } + }; + + for (const auto& columnSchema : tableSchema.Columns) { + if (!columnSchema.Required) { + auto tag = Parser_->ParseVariant8Tag(); + if (tag == 0) { + if (columnSchema.Type == EColumnType::Dense) { + Row_[columnSchema.Name] = TNode::CreateEntity(); + } + continue; + } + Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType + << ">' expected to be 0 or 1, got " << tag); + } + auto value = parse(columnSchema.WireType); + switch (columnSchema.Type) { + case EColumnType::Dense: + Row_[columnSchema.Name] = std::move(value); + break; + case EColumnType::KeySwitch: + if (value.AsBool()) { + AfterKeySwitch_ = true; + Valid_ = false; + } + break; + case EColumnType::RangeIndex: + RangeIndex_ = value.AsInt64(); + break; + case EColumnType::RowIndex: + RowIndex_ = value.AsInt64(); + break; + default: + Y_FAIL("Bad column type: %d", static_cast<int>(columnSchema.Type)); + } + } + + // We successfully parsed one more row from the stream, + // so reset retry count to their initial value. + Input_.ResetRetries(); +} + +void TSkiffTableReader::EnsureValidity() const +{ + Y_ENSURE(Valid_, "Iterator is not valid"); +} + +} // namespace NDetail +} // namespace NYT |