summaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/skiff_table_reader.cpp
diff options
context:
space:
mode:
authormax42 <[email protected]>2023-06-30 11:13:34 +0300
committermax42 <[email protected]>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/io/skiff_table_reader.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/io/skiff_table_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/io/skiff_table_reader.cpp293
1 files changed, 0 insertions, 293 deletions
diff --git a/yt/cpp/mapreduce/io/skiff_table_reader.cpp b/yt/cpp/mapreduce/io/skiff_table_reader.cpp
deleted file mode 100644
index 51c20609f08..00000000000
--- a/yt/cpp/mapreduce/io/skiff_table_reader.cpp
+++ /dev/null
@@ -1,293 +0,0 @@
-#include "skiff_table_reader.h"
-
-#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
-
-#include <library/cpp/yson/node/node_io.h>
-
-#include <yt/cpp/mapreduce/skiff/wire_type.h>
-#include <yt/cpp/mapreduce/skiff/skiff_schema.h>
-
-#include <util/string/cast.h>
-
-namespace NYT {
-namespace NDetail {
-namespace {
-
-////////////////////////////////////////////////////////////////////////////////
-
-enum EColumnType : i8
-{
- Dense,
- KeySwitch,
- RangeIndex,
- RowIndex
-};
-
-struct TSkiffColumnSchema
-{
- EColumnType Type;
- bool Required;
- NSkiff::EWireType WireType;
- TString Name;
-
- TSkiffColumnSchema(EColumnType type, bool required, NSkiff::EWireType wireType, const TString& name)
- : Type(type)
- , Required(required)
- , WireType(wireType)
- , Name(name)
- { }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace
-
-struct TSkiffTableReader::TSkiffTableSchema
-{
- TVector<TSkiffColumnSchema> Columns;
-};
-
-TSkiffTableReader::TSkiffTableReader(
- ::TIntrusivePtr<TRawTableReader> input,
- const NSkiff::TSkiffSchemaPtr& schema)
- : Input_(std::move(input))
- , BufferedInput_(&Input_)
- , Parser_(&BufferedInput_)
- , Schemas_(CreateSkiffTableSchemas(schema))
-{
- Next();
-}
-
-TSkiffTableReader::~TSkiffTableReader() = default;
-
-const TNode& TSkiffTableReader::GetRow() const
-{
- EnsureValidity();
- Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
- return Row_;
-}
-
-void TSkiffTableReader::MoveRow(TNode* result)
-{
- EnsureValidity();
- Y_ENSURE(!Row_.IsUndefined(), "Row is moved");
- *result = std::move(Row_);
- Row_ = TNode();
-}
-
-bool TSkiffTableReader::IsValid() const
-{
- return Valid_;
-}
-
-void TSkiffTableReader::Next()
-{
- EnsureValidity();
- if (Y_UNLIKELY(Finished_ || !Parser_->HasMoreData())) {
- Finished_ = true;
- Valid_ = false;
- return;
- }
-
- if (AfterKeySwitch_) {
- AfterKeySwitch_ = false;
- return;
- }
-
- while (true) {
- try {
- ReadRow();
- break;
- } catch (const std::exception& exception) {
- YT_LOG_ERROR("Read error: %v", exception.what());
- if (!Input_.Retry(RangeIndex_, RowIndex_)) {
- throw;
- }
- BufferedInput_ = TBufferedInput(&Input_);
- Parser_.emplace(NSkiff::TUncheckedSkiffParser(&BufferedInput_));
- RangeIndex_.Clear();
- RowIndex_.Clear();
- }
- }
-}
-
-ui32 TSkiffTableReader::GetTableIndex() const
-{
- EnsureValidity();
- return TableIndex_;
-}
-
-ui32 TSkiffTableReader::GetRangeIndex() const
-{
- EnsureValidity();
- return RangeIndex_.GetOrElse(0);
-}
-
-ui64 TSkiffTableReader::GetRowIndex() const
-{
- EnsureValidity();
- return RowIndex_.GetOrElse(0ULL);
-}
-
-void TSkiffTableReader::NextKey()
-{
- while (Valid_) {
- Next();
- }
-
- if (Finished_) {
- return;
- }
-
- Valid_ = true;
-}
-
-TMaybe<size_t> TSkiffTableReader::GetReadByteCount() const
-{
- return Input_.GetReadByteCount();
-}
-
-bool TSkiffTableReader::IsRawReaderExhausted() const
-{
- return Finished_;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TVector<TSkiffTableReader::TSkiffTableSchema> TSkiffTableReader::CreateSkiffTableSchemas(
- const NSkiff::TSkiffSchemaPtr& schema)
-{
- using NSkiff::EWireType;
-
- constexpr auto keySwitchColumnName = "$key_switch";
- constexpr auto rangeIndexColumnName = "$range_index";
- constexpr auto rowIndexColumnName = "$row_index";
-
- static const THashMap<TString, TSkiffColumnSchema> specialColumns = {
- {keySwitchColumnName, {EColumnType::KeySwitch, true, EWireType::Boolean, keySwitchColumnName}},
- {rangeIndexColumnName, {EColumnType::RangeIndex, false, EWireType::Int64, rangeIndexColumnName}},
- {rowIndexColumnName, {EColumnType::RowIndex, false, EWireType::Int64, rowIndexColumnName}},
- };
-
- Y_ENSURE(schema->GetWireType() == EWireType::Variant16,
- "Expected 'variant16' wire type for schema, got '" << schema->GetWireType() << "'");
- TVector<TSkiffTableSchema> result;
- for (const auto& tableSchema : schema->GetChildren()) {
- Y_ENSURE(tableSchema->GetWireType() == EWireType::Tuple,
- "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'");
- TVector<TSkiffColumnSchema> columns;
- for (const auto& columnSchema : tableSchema->GetChildren()) {
- if (columnSchema->GetName().StartsWith("$")) {
- auto iter = specialColumns.find(columnSchema->GetName());
- Y_ENSURE(iter != specialColumns.end(), "Unknown special column: " << columnSchema->GetName());
- columns.push_back(iter->second);
- } else {
- auto wireType = columnSchema->GetWireType();
- bool required = true;
- if (wireType == EWireType::Variant8) {
- const auto& children = columnSchema->GetChildren();
- Y_ENSURE(
- children.size() == 2 && children[0]->GetWireType() == EWireType::Nothing &&
- NSkiff::IsSimpleType(children[1]->GetWireType()),
- "Expected schema of form 'variant8<nothing, simple-type>', got "
- << NSkiff::GetShortDebugString(columnSchema));
- wireType = children[1]->GetWireType();
- required = false;
- }
- Y_ENSURE(NSkiff::IsSimpleType(wireType),
- "Expected column schema to be of simple type, got " << NSkiff::GetShortDebugString(columnSchema));
- columns.emplace_back(
- EColumnType::Dense,
- required,
- wireType,
- columnSchema->GetName());
- }
- }
- result.push_back({std::move(columns)});
- }
- return result;
-}
-
-void TSkiffTableReader::ReadRow()
-{
- if (Row_.IsUndefined()) {
- Row_ = TNode::CreateMap();
- } else {
- Row_.AsMap().clear();
- }
-
- if (RowIndex_) {
- ++*RowIndex_;
- }
-
- TableIndex_ = Parser_->ParseVariant16Tag();
- Y_ENSURE(TableIndex_ < Schemas_.size(), "Table index out of range: " << TableIndex_ << " >= " << Schemas_.size());
- const auto& tableSchema = Schemas_[TableIndex_];
-
- auto parse = [&](NSkiff::EWireType wireType) -> TNode {
- switch (wireType) {
- case NSkiff::EWireType::Int64:
- return Parser_->ParseInt64();
- case NSkiff::EWireType::Uint64:
- return Parser_->ParseUint64();
- case NSkiff::EWireType::Boolean:
- return Parser_->ParseBoolean();
- case NSkiff::EWireType::Double:
- return Parser_->ParseDouble();
- case NSkiff::EWireType::String32:
- return Parser_->ParseString32();
- case NSkiff::EWireType::Yson32:
- return NodeFromYsonString(Parser_->ParseYson32());
- case NSkiff::EWireType::Nothing:
- return TNode::CreateEntity();
- default:
- Y_FAIL("Bad column wire type: '%s'", ::ToString(wireType).data());
- }
- };
-
- for (const auto& columnSchema : tableSchema.Columns) {
- if (!columnSchema.Required) {
- auto tag = Parser_->ParseVariant8Tag();
- if (tag == 0) {
- if (columnSchema.Type == EColumnType::Dense) {
- Row_[columnSchema.Name] = TNode::CreateEntity();
- }
- continue;
- }
- Y_ENSURE(tag == 1, "Tag for 'variant8<nothing," << columnSchema.WireType
- << ">' expected to be 0 or 1, got " << tag);
- }
- auto value = parse(columnSchema.WireType);
- switch (columnSchema.Type) {
- case EColumnType::Dense:
- Row_[columnSchema.Name] = std::move(value);
- break;
- case EColumnType::KeySwitch:
- if (value.AsBool()) {
- AfterKeySwitch_ = true;
- Valid_ = false;
- }
- break;
- case EColumnType::RangeIndex:
- RangeIndex_ = value.AsInt64();
- break;
- case EColumnType::RowIndex:
- RowIndex_ = value.AsInt64();
- break;
- default:
- Y_FAIL("Bad column type: %d", static_cast<int>(columnSchema.Type));
- }
- }
-
- // We successfully parsed one more row from the stream,
- // so reset retry count to their initial value.
- Input_.ResetRetries();
-}
-
-void TSkiffTableReader::EnsureValidity() const
-{
- Y_ENSURE(Valid_, "Iterator is not valid");
-}
-
-} // namespace NDetail
-} // namespace NYT