diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/interface/common.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt `- the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/interface/common.cpp')
-rw-r--r-- | yt/cpp/mapreduce/interface/common.cpp | 664 |
1 files changed, 664 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/interface/common.cpp b/yt/cpp/mapreduce/interface/common.cpp new file mode 100644 index 0000000000..f6d60127ce --- /dev/null +++ b/yt/cpp/mapreduce/interface/common.cpp @@ -0,0 +1,664 @@ +#include "common.h" + +#include "errors.h" +#include "format.h" +#include "serialize.h" +#include "fluent.h" + +#include <yt/yt_proto/yt/formats/extension.pb.h> + +#include <library/cpp/yson/node/node_builder.h> +#include <library/cpp/yson/node/node_io.h> +#include <library/cpp/type_info/type.h> + +#include <util/generic/xrange.h> + +namespace NYT { + +using ::google::protobuf::FieldDescriptor; +using ::google::protobuf::Descriptor; + +//////////////////////////////////////////////////////////////////////////////// + +TSortColumn::TSortColumn(TStringBuf name, ESortOrder sortOrder) + : Name_(name) + , SortOrder_(sortOrder) +{ } + +TSortColumn::TSortColumn(const TString& name, ESortOrder sortOrder) + : TSortColumn(static_cast<TStringBuf>(name), sortOrder) +{ } + +TSortColumn::TSortColumn(const char* name, ESortOrder sortOrder) + : TSortColumn(static_cast<TStringBuf>(name), sortOrder) +{ } + +const TSortColumn& TSortColumn::EnsureAscending() const +{ + Y_ENSURE(SortOrder() == ESortOrder::SO_ASCENDING); + return *this; +} + +TNode TSortColumn::ToNode() const +{ + return BuildYsonNodeFluently().Value(*this); +} + +//////////////////////////////////////////////////////////////////////////////// +// Below lie backward compatibility methods. +//////////////////////////////////////////////////////////////////////////////// + +TSortColumn& TSortColumn::operator = (TStringBuf name) +{ + EnsureAscending(); + Name_ = name; + return *this; +} + +TSortColumn& TSortColumn::operator = (const TString& name) +{ + return (*this = static_cast<TStringBuf>(name)); +} + +TSortColumn& TSortColumn::operator = (const char* name) +{ + return (*this = static_cast<TStringBuf>(name)); +} + +bool TSortColumn::operator == (TStringBuf rhsName) const +{ + EnsureAscending(); + return Name_ == rhsName; +} + +bool TSortColumn::operator != (TStringBuf rhsName) const +{ + return !(*this == rhsName); +} + +bool TSortColumn::operator == (const TString& rhsName) const +{ + return *this == static_cast<TStringBuf>(rhsName); +} + +bool TSortColumn::operator != (const TString& rhsName) const +{ + return !(*this == rhsName); +} + +bool TSortColumn::operator == (const char* rhsName) const +{ + return *this == static_cast<TStringBuf>(rhsName); +} + +bool TSortColumn::operator != (const char* rhsName) const +{ + return !(*this == rhsName); +} + +TSortColumn::operator TStringBuf() const +{ + EnsureAscending(); + return Name_; +} + +TSortColumn::operator TString() const +{ + return TString(static_cast<TStringBuf>(*this)); +} + +TSortColumn::operator std::string() const +{ + EnsureAscending(); + return static_cast<std::string>(Name_); +} + +//////////////////////////////////////////////////////////////////////////////// + +TSortColumns::TSortColumns() +{ } + +TSortColumns::TSortColumns(const TVector<TString>& names) +{ + Parts_.assign(names.begin(), names.end()); +} + +TSortColumns::TSortColumns(const TColumnNames& names) + : TSortColumns(names.Parts_) +{ } + +TSortColumns::operator TColumnNames() const +{ + return TColumnNames(EnsureAscending().GetNames()); +} + +const TSortColumns& TSortColumns::EnsureAscending() const +{ + for (const auto& sortColumn : Parts_) { + sortColumn.EnsureAscending(); + } + return *this; +} + +TVector<TString> TSortColumns::GetNames() const +{ + TVector<TString> names; + names.reserve(Parts_.size()); + for (const auto& sortColumn : Parts_) { + names.push_back(sortColumn.Name()); + } + return names; +} + +//////////////////////////////////////////////////////////////////////////////// + +static NTi::TTypePtr OldTypeToTypeV3(EValueType type) +{ + switch (type) { + case VT_INT64: + return NTi::Int64(); + case VT_UINT64: + return NTi::Uint64(); + + case VT_DOUBLE: + return NTi::Double(); + + case VT_BOOLEAN: + return NTi::Bool(); + + case VT_STRING: + return NTi::String(); + + case VT_ANY: + return NTi::Yson(); + + case VT_INT8: + return NTi::Int8(); + case VT_INT16: + return NTi::Int16(); + case VT_INT32: + return NTi::Int32(); + + case VT_UINT8: + return NTi::Uint8(); + case VT_UINT16: + return NTi::Uint16(); + case VT_UINT32: + return NTi::Uint32(); + + case VT_UTF8: + return NTi::Utf8(); + + case VT_NULL: + return NTi::Null(); + + case VT_VOID: + return NTi::Void(); + + case VT_DATE: + return NTi::Date(); + case VT_DATETIME: + return NTi::Datetime(); + case VT_TIMESTAMP: + return NTi::Timestamp(); + case VT_INTERVAL: + return NTi::Interval(); + + case VT_FLOAT: + return NTi::Float(); + case VT_JSON: + return NTi::Json(); + } +} + +static std::pair<EValueType, bool> Simplify(const NTi::TTypePtr& type) +{ + using namespace NTi; + const auto typeName = type->GetTypeName(); + switch (typeName) { + case ETypeName::Bool: + return {VT_BOOLEAN, true}; + + case ETypeName::Int8: + return {VT_INT8, true}; + case ETypeName::Int16: + return {VT_INT16, true}; + case ETypeName::Int32: + return {VT_INT32, true}; + case ETypeName::Int64: + return {VT_INT64, true}; + + case ETypeName::Uint8: + return {VT_UINT8, true}; + case ETypeName::Uint16: + return {VT_UINT16, true}; + case ETypeName::Uint32: + return {VT_UINT32, true}; + case ETypeName::Uint64: + return {VT_UINT64, true}; + + case ETypeName::Float: + return {VT_FLOAT, true}; + case ETypeName::Double: + return {VT_DOUBLE, true}; + + case ETypeName::String: + return {VT_STRING, true}; + case ETypeName::Utf8: + return {VT_UTF8, true}; + + case ETypeName::Date: + return {VT_DATE, true}; + case ETypeName::Datetime: + return {VT_DATETIME, true}; + case ETypeName::Timestamp: + return {VT_TIMESTAMP, true}; + case ETypeName::Interval: + return {VT_INTERVAL, true}; + + case ETypeName::TzDate: + case ETypeName::TzDatetime: + case ETypeName::TzTimestamp: + break; + + case ETypeName::Json: + return {VT_JSON, true}; + case ETypeName::Decimal: + return {VT_STRING, true}; + case ETypeName::Uuid: + break; + case ETypeName::Yson: + return {VT_ANY, true}; + + case ETypeName::Void: + return {VT_VOID, false}; + case ETypeName::Null: + return {VT_NULL, false}; + + case ETypeName::Optional: + { + auto itemType = type->AsOptional()->GetItemType(); + if (itemType->IsPrimitive()) { + auto simplified = Simplify(itemType->AsPrimitive()); + if (simplified.second) { + simplified.second = false; + return simplified; + } + } + return {VT_ANY, false}; + } + case ETypeName::List: + return {VT_ANY, true}; + case ETypeName::Dict: + return {VT_ANY, true}; + case ETypeName::Struct: + return {VT_ANY, true}; + case ETypeName::Tuple: + return {VT_ANY, true}; + case ETypeName::Variant: + return {VT_ANY, true}; + case ETypeName::Tagged: + return Simplify(type->AsTagged()->GetItemType()); + } + ythrow TApiUsageError() << "Unsupported type: " << typeName; +} + +NTi::TTypePtr ToTypeV3(EValueType type, bool required) +{ + auto typeV3 = OldTypeToTypeV3(type); + if (!Simplify(typeV3).second) { + if (required) { + ythrow TApiUsageError() << "type: " << type << " cannot be required"; + } else { + return typeV3; + } + } + if (required) { + return typeV3; + } else { + return NTi::Optional(typeV3); + } +} + +TColumnSchema::TColumnSchema() + : TypeV3_(NTi::Optional(NTi::Int64())) +{ } + +EValueType TColumnSchema::Type() const +{ + return Simplify(TypeV3_).first; +} + +TColumnSchema& TColumnSchema::Type(EValueType type) & +{ + return Type(ToTypeV3(type, false)); +} + +TColumnSchema TColumnSchema::Type(EValueType type) && +{ + return Type(ToTypeV3(type, false)); +} + +TColumnSchema& TColumnSchema::Type(const NTi::TTypePtr& type) & +{ + Y_VERIFY(type.Get(), "Cannot create column schema with nullptr type"); + TypeV3_ = type; + return *this; +} + +TColumnSchema TColumnSchema::Type(const NTi::TTypePtr& type) && +{ + Y_VERIFY(type.Get(), "Cannot create column schema with nullptr type"); + TypeV3_ = type; + return *this; +} + +TColumnSchema& TColumnSchema::TypeV3(const NTi::TTypePtr& type) & +{ + return Type(type); +} + +TColumnSchema TColumnSchema::TypeV3(const NTi::TTypePtr& type) && +{ + return Type(type); +} + +NTi::TTypePtr TColumnSchema::TypeV3() const +{ + return TypeV3_; +} + +bool TColumnSchema::Required() const +{ + return Simplify(TypeV3_).second; +} + +TColumnSchema& TColumnSchema::Type(EValueType type, bool required) & +{ + return Type(ToTypeV3(type, required)); +} + +TColumnSchema TColumnSchema::Type(EValueType type, bool required) && +{ + return Type(ToTypeV3(type, required)); +} + +bool operator==(const TColumnSchema& lhs, const TColumnSchema& rhs) +{ + return + lhs.Name() == rhs.Name() && + NTi::NEq::TStrictlyEqual()(lhs.TypeV3(), rhs.TypeV3()) && + lhs.SortOrder() == rhs.SortOrder() && + lhs.Lock() == rhs.Lock() && + lhs.Expression() == rhs.Expression() && + lhs.Aggregate() == rhs.Aggregate() && + lhs.Group() == rhs.Group(); +} + +//////////////////////////////////////////////////////////////////////////////// + +bool TTableSchema::Empty() const +{ + return Columns_.empty(); +} + +TTableSchema& TTableSchema::AddColumn(const TString& name, EValueType type) & +{ + Columns_.push_back(TColumnSchema().Name(name).Type(type)); + return *this; +} + +TTableSchema TTableSchema::AddColumn(const TString& name, EValueType type) && +{ + return std::move(AddColumn(name, type)); +} + +TTableSchema& TTableSchema::AddColumn(const TString& name, EValueType type, ESortOrder sortOrder) & +{ + Columns_.push_back(TColumnSchema().Name(name).Type(type).SortOrder(sortOrder)); + return *this; +} + +TTableSchema TTableSchema::AddColumn(const TString& name, EValueType type, ESortOrder sortOrder) && +{ + return std::move(AddColumn(name, type, sortOrder)); +} + +TTableSchema& TTableSchema::AddColumn(const TString& name, const NTi::TTypePtr& type) & +{ + Columns_.push_back(TColumnSchema().Name(name).Type(type)); + return *this; +} + +TTableSchema TTableSchema::AddColumn(const TString& name, const NTi::TTypePtr& type) && +{ + return std::move(AddColumn(name, type)); +} + +TTableSchema& TTableSchema::AddColumn(const TString& name, const NTi::TTypePtr& type, ESortOrder sortOrder) & +{ + Columns_.push_back(TColumnSchema().Name(name).Type(type).SortOrder(sortOrder)); + return *this; +} + +TTableSchema TTableSchema::AddColumn(const TString& name, const NTi::TTypePtr& type, ESortOrder sortOrder) && +{ + return std::move(AddColumn(name, type, sortOrder)); +} + +TTableSchema& TTableSchema::SortBy(const TSortColumns& sortColumns) & +{ + Y_ENSURE(sortColumns.Parts_.size() <= Columns_.size()); + + THashMap<TString, ui64> sortColumnIndex; + for (auto i: xrange(sortColumns.Parts_.size())) { + Y_ENSURE(sortColumnIndex.emplace(sortColumns.Parts_[i].Name(), i).second, + "Key column name '" << sortColumns.Parts_[i].Name() << "' repeats in columns list"); + } + + TVector<TColumnSchema> newColumnsSorted(sortColumns.Parts_.size()); + TVector<TColumnSchema> newColumnsUnsorted; + for (auto& column : Columns_) { + auto it = sortColumnIndex.find(column.Name()); + if (it == sortColumnIndex.end()) { + column.ResetSortOrder(); + newColumnsUnsorted.push_back(std::move(column)); + } else { + auto index = it->second; + const auto& sortColumn = sortColumns.Parts_[index]; + column.SortOrder(sortColumn.SortOrder()); + newColumnsSorted[index] = std::move(column); + sortColumnIndex.erase(it); + } + } + + Y_ENSURE(sortColumnIndex.empty(), "Column name '" << sortColumnIndex.begin()->first + << "' not found in table schema"); + + newColumnsSorted.insert(newColumnsSorted.end(), newColumnsUnsorted.begin(), newColumnsUnsorted.end()); + Columns_ = std::move(newColumnsSorted); + + return *this; +} + +TTableSchema TTableSchema::SortBy(const TSortColumns& sortColumns) && +{ + return std::move(SortBy(sortColumns)); +} + +TVector<TColumnSchema>& TTableSchema::MutableColumns() +{ + return Columns_; +} + +TNode TTableSchema::ToNode() const +{ + TNode result; + TNodeBuilder builder(&result); + Serialize(*this, &builder); + return result; +} + +TTableSchema TTableSchema::FromNode(const TNode& node) +{ + TTableSchema schema; + Deserialize(schema, node); + return schema; +} + +bool operator==(const TTableSchema& lhs, const TTableSchema& rhs) +{ + return + lhs.Columns() == rhs.Columns() && + lhs.Strict() == rhs.Strict() && + lhs.UniqueKeys() == rhs.UniqueKeys(); +} + +//////////////////////////////////////////////////////////////////////////////// + +TKeyBound::TKeyBound(ERelation relation, TKey key) + : Relation_(relation) + , Key_(std::move(key)) +{ } + +//////////////////////////////////////////////////////////////////////////////// + +TTableSchema CreateTableSchema( + const Descriptor& messageDescriptor, + const TSortColumns& sortColumns, + bool keepFieldsWithoutExtension) +{ + auto result = CreateTableSchema(messageDescriptor, keepFieldsWithoutExtension); + if (!sortColumns.Parts_.empty()) { + result.SortBy(sortColumns.Parts_); + } + return result; +} + +TTableSchema CreateTableSchema(NTi::TTypePtr type) +{ + Y_VERIFY(type); + TTableSchema schema; + Deserialize(schema, NodeFromYsonString(NTi::NIo::AsYtSchema(type.Get()))); + return schema; +} + +//////////////////////////////////////////////////////////////////////////////// + +bool IsTrivial(const TReadLimit& readLimit) +{ + return !readLimit.Key_ && !readLimit.RowIndex_ && !readLimit.Offset_ && !readLimit.TabletIndex_ && !readLimit.KeyBound_; +} + +EValueType NodeTypeToValueType(TNode::EType nodeType) +{ + switch (nodeType) { + case TNode::EType::Int64: return VT_INT64; + case TNode::EType::Uint64: return VT_UINT64; + case TNode::EType::String: return VT_STRING; + case TNode::EType::Double: return VT_DOUBLE; + case TNode::EType::Bool: return VT_BOOLEAN; + default: + ythrow yexception() << "Cannot convert TNode type " << nodeType << " to EValueType"; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +const TVector<TReadRange>& GetRangesCompat(const TRichYPath& path) +{ + static const TVector<TReadRange> empty; + + const auto& maybeRanges = path.GetRanges(); + if (maybeRanges.Empty()) { + return empty; + } else if (maybeRanges->size() > 0) { + return *maybeRanges; + } else { + // If you see this exception, that means that caller of this function doesn't known what to do + // with RichYPath that has set range list, but the range list is empty. + // + // To avoid this exception caller must explicitly handle such case. + // NB. YT-17683 + ythrow TApiUsageError() << "Unsupported RichYPath: explicitly empty range list"; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +TString ToString(EValueType type) +{ + switch (type) { + case VT_INT8: + return "int8"; + case VT_INT16: + return "int16"; + case VT_INT32: + return "int32"; + case VT_INT64: + return "int64"; + + case VT_UINT8: + return "uint8"; + case VT_UINT16: + return "uint16"; + case VT_UINT32: + return "uint32"; + case VT_UINT64: + return "uint64"; + + case VT_DOUBLE: + return "double"; + + case VT_BOOLEAN: + return "boolean"; + + case VT_STRING: + return "string"; + case VT_UTF8: + return "utf8"; + + case VT_ANY: + return "any"; + + case VT_NULL: + return "null"; + case VT_VOID: + return "void"; + + case VT_DATE: + return "date"; + case VT_DATETIME: + return "datetime"; + case VT_TIMESTAMP: + return "timestamp"; + case VT_INTERVAL: + return "interval"; + + case VT_FLOAT: + return "float"; + + case VT_JSON: + return "json"; + } + ythrow yexception() << "Invalid value type " << static_cast<int>(type); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT + +template <> +void Out<NYT::TSortColumn>(IOutputStream& os, const NYT::TSortColumn& sortColumn) +{ + if (sortColumn.SortOrder() == NYT::ESortOrder::SO_ASCENDING) { + os << sortColumn.Name(); + } else { + os << NYT::BuildYsonStringFluently(NYson::EYsonFormat::Text).Value(sortColumn); + } +} |