diff options
| author | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
|---|---|---|
| committer | max42 <[email protected]> | 2023-06-30 11:13:34 +0300 |
| commit | 3e1899838408bbad47622007aa382bc8a2b01f87 (patch) | |
| tree | 0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/client/skiff.cpp | |
| parent | 5463eb3f5e72a86f858a3d27c886470a724ede34 (diff) | |
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing
changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/client/skiff.cpp')
| -rw-r--r-- | yt/cpp/mapreduce/client/skiff.cpp | 396 |
1 files changed, 0 insertions, 396 deletions
diff --git a/yt/cpp/mapreduce/client/skiff.cpp b/yt/cpp/mapreduce/client/skiff.cpp deleted file mode 100644 index 67a0f960ae2..00000000000 --- a/yt/cpp/mapreduce/client/skiff.cpp +++ /dev/null @@ -1,396 +0,0 @@ -#include "skiff.h" - -#include <yt/cpp/mapreduce/common/retry_lib.h> - -#include <yt/cpp/mapreduce/http/retry_request.h> -#include <yt/cpp/mapreduce/http/requests.h> - -#include <yt/cpp/mapreduce/interface/config.h> -#include <yt/cpp/mapreduce/interface/common.h> -#include <yt/cpp/mapreduce/interface/serialize.h> - -#include <yt/cpp/mapreduce/interface/logging/yt_log.h> - -#include <library/cpp/yson/node/node_builder.h> -#include <library/cpp/yson/node/node_io.h> - -#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h> -#include <yt/cpp/mapreduce/raw_client/raw_requests.h> - -#include <yt/cpp/mapreduce/skiff/skiff_schema.h> - -#include <library/cpp/yson/consumer.h> -#include <library/cpp/yson/writer.h> - -#include <util/string/cast.h> -#include <util/stream/str.h> -#include <util/stream/file.h> -#include <util/folder/path.h> - -namespace NYT { -namespace NDetail { - -using namespace NRawClient; - -using ::ToString; - -//////////////////////////////////////////////////////////////////////////////// - -static NSkiff::TSkiffSchemaPtr ReadSkiffSchema(const TString& fileName) -{ - if (!TFsPath(fileName).Exists()) { - return nullptr; - } - TIFStream input(fileName); - NSkiff::TSkiffSchemaPtr schema; - Deserialize(schema, NodeFromYsonStream(&input)); - return schema; -} - -NSkiff::TSkiffSchemaPtr GetJobInputSkiffSchema() -{ - return ReadSkiffSchema("skiff_input"); -} - -NSkiff::EWireType ValueTypeToSkiffType(EValueType valueType) -{ - using NSkiff::EWireType; - switch (valueType) { - case VT_INT64: - case VT_INT32: - case VT_INT16: - case VT_INT8: - return EWireType::Int64; - - case VT_UINT64: - case VT_UINT32: - case VT_UINT16: - case VT_UINT8: - return EWireType::Uint64; - - case VT_DOUBLE: - case VT_FLOAT: - return EWireType::Double; - - case VT_BOOLEAN: - return EWireType::Boolean; - - case VT_STRING: - case VT_UTF8: - case VT_JSON: - return EWireType::String32; - - case VT_ANY: - return EWireType::Yson32; - - case VT_NULL: - case VT_VOID: - return EWireType::Nothing; - - case VT_DATE: - case VT_DATETIME: - case VT_TIMESTAMP: - return EWireType::Uint64; - - case VT_INTERVAL: - return EWireType::Int64; - }; - ythrow yexception() << "Cannot convert EValueType '" << valueType << "' to NSkiff::EWireType"; -} - -NSkiff::TSkiffSchemaPtr CreateSkiffSchema( - const TTableSchema& schema, - const TCreateSkiffSchemaOptions& options) -{ - using namespace NSkiff; - - Y_ENSURE(schema.Strict(), "Cannot create Skiff schema for non-strict table schema"); - TVector<TSkiffSchemaPtr> skiffColumns; - for (const auto& column: schema.Columns()) { - TSkiffSchemaPtr skiffColumn; - if (column.Type() == VT_ANY && *column.TypeV3() != *NTi::Optional(NTi::Yson())) { - // We ignore all complex types until YT-12717 is done. - return nullptr; - } - if (column.Required() || NTi::IsSingular(column.TypeV3()->GetTypeName())) { - skiffColumn = CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type())); - } else { - skiffColumn = CreateVariant8Schema({ - CreateSimpleTypeSchema(EWireType::Nothing), - CreateSimpleTypeSchema(ValueTypeToSkiffType(column.Type()))}); - } - if (options.RenameColumns_) { - auto maybeName = options.RenameColumns_->find(column.Name()); - skiffColumn->SetName(maybeName == options.RenameColumns_->end() ? column.Name() : maybeName->second); - } else { - skiffColumn->SetName(column.Name()); - } - skiffColumns.push_back(skiffColumn); - } - - if (options.HasKeySwitch_) { - skiffColumns.push_back( - CreateSimpleTypeSchema(EWireType::Boolean)->SetName("$key_switch")); - } - if (options.HasRangeIndex_) { - skiffColumns.push_back( - CreateVariant8Schema({ - CreateSimpleTypeSchema(EWireType::Nothing), - CreateSimpleTypeSchema(EWireType::Int64)}) - ->SetName("$range_index")); - } - - skiffColumns.push_back( - CreateVariant8Schema({ - CreateSimpleTypeSchema(EWireType::Nothing), - CreateSimpleTypeSchema(EWireType::Int64)}) - ->SetName("$row_index")); - - return CreateTupleSchema(std::move(skiffColumns)); -} - -NSkiff::TSkiffSchemaPtr CreateSkiffSchema( - const TNode& schemaNode, - const TCreateSkiffSchemaOptions& options) -{ - TTableSchema schema; - Deserialize(schema, schemaNode); - return CreateSkiffSchema(schema, options); -} - -void Serialize(const NSkiff::TSkiffSchemaPtr& schema, NYson::IYsonConsumer* consumer) -{ - consumer->OnBeginMap(); - if (schema->GetName().size() > 0) { - consumer->OnKeyedItem("name"); - consumer->OnStringScalar(schema->GetName()); - } - consumer->OnKeyedItem("wire_type"); - consumer->OnStringScalar(ToString(schema->GetWireType())); - if (schema->GetChildren().size() > 0) { - consumer->OnKeyedItem("children"); - consumer->OnBeginList(); - for (const auto& child : schema->GetChildren()) { - consumer->OnListItem(); - Serialize(child, consumer); - } - consumer->OnEndList(); - } - consumer->OnEndMap(); -} - -void Deserialize(NSkiff::TSkiffSchemaPtr& schema, const TNode& node) -{ - using namespace NSkiff; - - static auto createSchema = [](EWireType wireType, TVector<TSkiffSchemaPtr>&& children) -> TSkiffSchemaPtr { - switch (wireType) { - case EWireType::Tuple: - return CreateTupleSchema(std::move(children)); - case EWireType::Variant8: - return CreateVariant8Schema(std::move(children)); - case EWireType::Variant16: - return CreateVariant16Schema(std::move(children)); - case EWireType::RepeatedVariant8: - return CreateRepeatedVariant8Schema(std::move(children)); - case EWireType::RepeatedVariant16: - return CreateRepeatedVariant16Schema(std::move(children)); - default: - return CreateSimpleTypeSchema(wireType); - } - }; - - const auto& map = node.AsMap(); - const auto* wireTypePtr = map.FindPtr("wire_type"); - Y_ENSURE(wireTypePtr, "'wire_type' is a required key"); - auto wireType = FromString<NSkiff::EWireType>(wireTypePtr->AsString()); - - const auto* childrenPtr = map.FindPtr("children"); - Y_ENSURE(NSkiff::IsSimpleType(wireType) || childrenPtr, - "'children' key is required for complex node '" << wireType << "'"); - TVector<TSkiffSchemaPtr> children; - if (childrenPtr) { - for (const auto& childNode : childrenPtr->AsList()) { - TSkiffSchemaPtr childSchema; - Deserialize(childSchema, childNode); - children.push_back(std::move(childSchema)); - } - } - - schema = createSchema(wireType, std::move(children)); - - const auto* namePtr = map.FindPtr("name"); - if (namePtr) { - schema->SetName(namePtr->AsString()); - } -} - -TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) { - Y_ENSURE(schema->GetWireType() == NSkiff::EWireType::Variant16, - "Bad wire type for schema; expected 'variant16', got " << schema->GetWireType()); - - THashMap< - NSkiff::TSkiffSchemaPtr, - size_t, - NSkiff::TSkiffSchemaPtrHasher, - NSkiff::TSkiffSchemaPtrEqual> schemasMap; - size_t tableIndex = 0; - auto config = TNode("skiff"); - config.Attributes()["table_skiff_schemas"] = TNode::CreateList(); - - for (const auto& schemaChild : schema->GetChildren()) { - auto [iter, inserted] = schemasMap.emplace(schemaChild, tableIndex); - size_t currentIndex; - if (inserted) { - currentIndex = tableIndex; - ++tableIndex; - } else { - currentIndex = iter->second; - } - config.Attributes()["table_skiff_schemas"].Add("$" + ToString(currentIndex)); - } - - config.Attributes()["skiff_schema_registry"] = TNode::CreateMap(); - - for (const auto& [tableSchema, index] : schemasMap) { - TNode node; - TNodeBuilder nodeBuilder(&node); - Serialize(tableSchema, &nodeBuilder); - config.Attributes()["skiff_schema_registry"][ToString(index)] = std::move(node); - } - - return TFormat(config); -} - -NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary( - const TClientContext& context, - const IClientRetryPolicyPtr& clientRetryPolicy, - const TTransactionId& transactionId, - ENodeReaderFormat nodeReaderFormat, - const TVector<TRichYPath>& tablePaths, - const TCreateSkiffSchemaOptions& options) -{ - if (nodeReaderFormat == ENodeReaderFormat::Yson) { - return nullptr; - } - - for (const auto& path : tablePaths) { - if (path.Columns_) { - switch (nodeReaderFormat) { - case ENodeReaderFormat::Skiff: - ythrow TApiUsageError() << "Cannot use Skiff format with column selectors"; - case ENodeReaderFormat::Auto: - return nullptr; - default: - Y_FAIL("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat)); - } - } - } - - auto nodes = NRawClient::BatchTransform( - clientRetryPolicy->CreatePolicyForGenericRequest(), - context, - NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths), - [&] (TRawBatchRequest& batch, const TRichYPath& path) { - auto getOptions = TGetOptions() - .AttributeFilter( - TAttributeFilter() - .AddAttribute("schema") - .AddAttribute("dynamic") - .AddAttribute("type") - ); - return batch.Get(transactionId, path.Path_, getOptions); - }); - - TVector<NSkiff::TSkiffSchemaPtr> schemas; - for (size_t tableIndex = 0; tableIndex < nodes.size(); ++tableIndex) { - const auto& tablePath = tablePaths[tableIndex].Path_; - const auto& attributes = nodes[tableIndex].GetAttributes(); - Y_ENSURE_EX(attributes["type"] == TNode("table"), - TApiUsageError() << "Operation input path " << tablePath << " is not a table"); - bool dynamic = attributes["dynamic"].AsBool(); - bool strict = attributes["schema"].GetAttributes()["strict"].AsBool(); - switch (nodeReaderFormat) { - case ENodeReaderFormat::Skiff: - Y_ENSURE_EX(strict, - TApiUsageError() << "Cannot use skiff format for table with non-strict schema '" << tablePath << "'"); - Y_ENSURE_EX(!dynamic, - TApiUsageError() << "Cannot use skiff format for dynamic table '" << tablePath << "'"); - break; - case ENodeReaderFormat::Auto: - if (dynamic || !strict) { - YT_LOG_DEBUG("Cannot use skiff format for table '%v' as it is dynamic or has non-strict schema", - tablePath); - return nullptr; - } - break; - default: - Y_FAIL("Unexpected node reader format: %d", static_cast<int>(nodeReaderFormat)); - } - - NSkiff::TSkiffSchemaPtr curSkiffSchema; - if (tablePaths[tableIndex].RenameColumns_) { - auto customOptions = options; - customOptions.RenameColumns(*tablePaths[tableIndex].RenameColumns_); - curSkiffSchema = CreateSkiffSchema(attributes["schema"], customOptions); - } else { - curSkiffSchema = CreateSkiffSchema(attributes["schema"], options); - } - - if (!curSkiffSchema) { - return nullptr; - } - schemas.push_back(curSkiffSchema); - } - return NSkiff::CreateVariant16Schema(std::move(schemas)); -} - -//////////////////////////////////////////////////////////////////////////////// - -NSkiff::TSkiffSchemaPtr CreateSkiffSchema( - const TVector<NSkiff::TSkiffSchemaPtr>& tableSchemas, - const TCreateSkiffSchemaOptions& options -) { - constexpr auto KEY_SWITCH_COLUMN = "$key_switch"; - constexpr auto ROW_INDEX_COLUMN = "$row_index"; - constexpr auto RANGE_INDEX_COLUMN = "$range_index"; - - TVector<NSkiff::TSkiffSchemaPtr> schemas; - schemas.reserve(tableSchemas.size()); - - for (const auto& tableSchema : tableSchemas) { - Y_ENSURE(tableSchema->GetWireType() == NSkiff::EWireType::Tuple, - "Expected 'tuple' wire type for table schema, got '" << tableSchema->GetWireType() << "'"); - - const auto& children = tableSchema->GetChildren(); - NSkiff::TSkiffSchemaList columns; - - columns.reserve(children.size() + 3); - if (options.HasKeySwitch_) { - columns.push_back( - CreateSimpleTypeSchema(NSkiff::EWireType::Boolean)->SetName(KEY_SWITCH_COLUMN)); - } - columns.push_back( - NSkiff::CreateVariant8Schema({ - CreateSimpleTypeSchema(NSkiff::EWireType::Nothing), - CreateSimpleTypeSchema(NSkiff::EWireType::Int64)}) - ->SetName(ROW_INDEX_COLUMN)); - if (options.HasRangeIndex_) { - columns.push_back( - NSkiff::CreateVariant8Schema({ - CreateSimpleTypeSchema(NSkiff::EWireType::Nothing), - CreateSimpleTypeSchema(NSkiff::EWireType::Int64)}) - ->SetName(RANGE_INDEX_COLUMN)); - } - columns.insert(columns.end(), children.begin(), children.end()); - - schemas.push_back(NSkiff::CreateTupleSchema(columns)); - } - - return NSkiff::CreateVariant16Schema(schemas); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NDetail -} // namespace NYT |
