diff options
author | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
---|---|---|
committer | max42 <max42@yandex-team.com> | 2023-06-30 03:37:03 +0300 |
commit | fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch) | |
tree | b8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/structured_table_formats.cpp | |
parent | 7bf166b1a7ed0af927f230022b245af618e998c1 (diff) | |
download | ydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz |
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text.
This commit has zero effect on all projects that depend on YQL.
The summary of changes:
- `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of the YT provider is moved into the YDB code base for further export as a part of the YT YQL plugin shared library;
- `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes;
- `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`;
- `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`;
- `yql/core` is gone;
- `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`.
**NB**: all new targets inside `ydb/` are placed under an `IF (NOT CMAKE_EXPORT)` clause, which excludes them from open-source CMake generation and from the ya make build. They will be enabled in subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/structured_table_formats.cpp')
-rw-r--r-- | yt/cpp/mapreduce/client/structured_table_formats.cpp | 572 |
1 files changed, 572 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp new file mode 100644 index 0000000000..b6e82c6c15 --- /dev/null +++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp @@ -0,0 +1,572 @@ +#include "structured_table_formats.h" + +#include "format_hints.h" +#include "skiff.h" + +#include <yt/cpp/mapreduce/common/retry_lib.h> + +#include <yt/cpp/mapreduce/io/yamr_table_reader.h> + +#include <yt/cpp/mapreduce/library/table_schema/protobuf.h> + +#include <yt/cpp/mapreduce/interface/common.h> + +#include <yt/cpp/mapreduce/raw_client/raw_requests.h> + +#include <library/cpp/type_info/type_info.h> +#include <library/cpp/yson/writer.h> + +#include <memory> + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +TMaybe<TNode> GetCommonTableFormat( + const TVector<TMaybe<TNode>>& formats) +{ + TMaybe<TNode> result; + bool start = true; + for (auto& format : formats) { + if (start) { + result = format; + start = false; + continue; + } + + if (result.Defined() != format.Defined()) { + ythrow yexception() << "Different formats of input tables"; + } + + if (!result.Defined()) { + continue; + } + + auto& resultAttrs = result.Get()->GetAttributes(); + auto& formatAttrs = format.Get()->GetAttributes(); + + if (resultAttrs["key_column_names"] != formatAttrs["key_column_names"]) { + ythrow yexception() << "Different formats of input tables"; + } + + bool hasSubkeyColumns = resultAttrs.HasKey("subkey_column_names"); + if (hasSubkeyColumns != formatAttrs.HasKey("subkey_column_names")) { + ythrow yexception() << "Different formats of input tables"; + } + + if (hasSubkeyColumns && + resultAttrs["subkey_column_names"] != formatAttrs["subkey_column_names"]) + { + ythrow yexception() << "Different formats of input tables"; + } + } + + return result; +} + +TMaybe<TNode> GetTableFormat( + const IClientRetryPolicyPtr& retryPolicy, + const TClientContext& context, + const 
TTransactionId& transactionId, + const TRichYPath& path) +{ + auto formatPath = path.Path_ + "/@_format"; + if (!NDetail::NRawClient::Exists(retryPolicy->CreatePolicyForGenericRequest(), context, transactionId, formatPath)) { + return TMaybe<TNode>(); + } + TMaybe<TNode> format = NDetail::NRawClient::Get(retryPolicy->CreatePolicyForGenericRequest(), context, transactionId, formatPath); + if (format.Get()->AsString() != "yamred_dsv") { + return TMaybe<TNode>(); + } + auto& formatAttrs = format.Get()->Attributes(); + if (!formatAttrs.HasKey("key_column_names")) { + ythrow yexception() << + "Table '" << path.Path_ << "': attribute 'key_column_names' is missing"; + } + formatAttrs["has_subkey"] = "true"; + formatAttrs["lenval"] = "true"; + return format; +} + +TMaybe<TNode> GetTableFormats( + const IClientRetryPolicyPtr& clientRetryPolicy, + const TClientContext& context, + const TTransactionId& transactionId, + const TVector<TRichYPath>& inputs) +{ + TVector<TMaybe<TNode>> formats; + for (auto& table : inputs) { + formats.push_back(GetTableFormat(clientRetryPolicy, context, transactionId, table)); + } + + return GetCommonTableFormat(formats); +} + +//////////////////////////////////////////////////////////////////////////////// + +namespace NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema( + const TClientContext& context, + const IClientRetryPolicyPtr& clientRetryPolicy, + const TTransactionId& transactionId, + const TVector<TRichYPath>& tables, + const TOperationOptions& options, + ENodeReaderFormat nodeReaderFormat) +{ + bool hasInputQuery = options.Spec_.Defined() && options.Spec_->IsMap() && options.Spec_->HasKey("input_query"); + if (hasInputQuery) { + Y_ENSURE_EX(nodeReaderFormat != ENodeReaderFormat::Skiff, + TApiUsageError() << "Cannot use Skiff format for operations with 'input_query' in spec"); + return nullptr; + } + return CreateSkiffSchemaIfNecessary( + context, + 
clientRetryPolicy, + transactionId, + nodeReaderFormat, + tables, + TCreateSkiffSchemaOptions() + .HasKeySwitch(true) + .HasRangeIndex(true)); +} + +TString CreateSkiffConfig(const NSkiff::TSkiffSchemaPtr& schema) +{ + TString result; + TStringOutput stream(result); + ::NYson::TYsonWriter writer(&stream); + Serialize(schema, &writer); + return result; +} + +TString CreateProtoConfig(const TVector<const ::google::protobuf::Descriptor*>& descriptorList) +{ + TString result; + TStringOutput messageTypeList(result); + for (const auto& descriptor : descriptorList) { + messageTypeList << descriptor->full_name() << Endl; + } + return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TGetTableStructureDescriptionStringImpl { + template<typename T> + TString operator()(const T& description) { + if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) { + return "Unspecified"; + } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) { + TString res; + TStringStream out(res); + if (description.Descriptor) { + out << description.Descriptor->full_name(); + } else { + out << "<unknown>"; + } + out << " protobuf message"; + return res; + } else { + static_assert(TDependentFalse<T>, "Unknown type"); + } + } +}; + +TString GetTableStructureDescriptionString(const TTableStructure& tableStructure) +{ + return std::visit(TGetTableStructureDescriptionStringImpl(), tableStructure); +} + +//////////////////////////////////////////////////////////////////////////////// + +TString JobTablePathString(const TStructuredJobTable& jobTable) +{ + if (jobTable.RichYPath) { + return jobTable.RichYPath->Path_; + } else { + return "<intermediate-table>"; + } +} + +TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList) +{ + TStructuredJobTableList result; + for (const auto& table : tableList) { + result.push_back(TStructuredJobTable{table.Description, table.RichYPath}); + } + return result; 
+} + +TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList) +{ + TVector<TRichYPath> toCanonize; + toCanonize.reserve(tableList.size()); + for (const auto& table : tableList) { + toCanonize.emplace_back(table.RichYPath); + } + const auto canonized = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, context, toCanonize); + Y_VERIFY(canonized.size() == tableList.size()); + + TStructuredJobTableList result; + result.reserve(tableList.size()); + for (size_t i = 0; i != canonized.size(); ++i) { + result.emplace_back(TStructuredJobTable{tableList[i].Description, canonized[i]}); + } + return result; +} + +TVector<TRichYPath> GetPathList( + const TStructuredJobTableList& tableList, + const TMaybe<TVector<TTableSchema>>& jobSchemaInferenceResult, + bool inferSchemaFromDescriptions) +{ + Y_VERIFY(!jobSchemaInferenceResult || tableList.size() == jobSchemaInferenceResult->size()); + + auto maybeInferSchema = [&] (const TStructuredJobTable& table, ui32 tableIndex) -> TMaybe<TTableSchema> { + if (jobSchemaInferenceResult && !jobSchemaInferenceResult->at(tableIndex).Empty()) { + return jobSchemaInferenceResult->at(tableIndex); + } + if (inferSchemaFromDescriptions) { + return GetTableSchema(table.Description); + } + return Nothing(); + }; + + TVector<TRichYPath> result; + result.reserve(tableList.size()); + for (size_t tableIndex = 0; tableIndex != tableList.size(); ++tableIndex) { + const auto& table = tableList[tableIndex]; + Y_VERIFY(table.RichYPath, "Cannot get path for intermediate table"); + auto richYPath = *table.RichYPath; + if (!richYPath.Schema_) { + if (auto schema = maybeInferSchema(table, tableIndex)) { + richYPath.Schema(std::move(*schema)); + } + } + + result.emplace_back(std::move(richYPath)); + } + return result; +} + + +TStructuredRowStreamDescription GetJobStreamDescription( + const IStructuredJob& job, + EIODirection direction) +{ + switch (direction) { + case 
EIODirection::Input: + return job.GetInputRowStreamDescription(); + case EIODirection::Output: + return job.GetOutputRowStreamDescription(); + default: + Y_FAIL("unreachable"); + } +} + +TString GetSuffix(EIODirection direction) +{ + switch (direction) { + case EIODirection::Input: + return "_input"; + case EIODirection::Output: + return "_output"; + } + Y_FAIL("unreachable"); +} + +TString GetAddIOMethodName(EIODirection direction) +{ + switch (direction) { + case EIODirection::Input: + return "AddInput<>"; + case EIODirection::Output: + return "AddOutput<>"; + } + Y_FAIL("unreachable"); +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TFormatBuilder::TFormatSwitcher +{ + template <typename T> + auto operator() (const T& /*t*/) { + if constexpr (std::is_same_v<T, TTNodeStructuredRowStream>) { + return &TFormatBuilder::CreateNodeFormat; + } else if constexpr (std::is_same_v<T, TTYaMRRowStructuredRowStream>) { + return &TFormatBuilder::CreateYamrFormat; + } else if constexpr (std::is_same_v<T, TProtobufStructuredRowStream>) { + return &TFormatBuilder::CreateProtobufFormat; + } else if constexpr (std::is_same_v<T, TVoidStructuredRowStream>) { + return &TFormatBuilder::CreateVoidFormat; + } else { + static_assert(TDependentFalse<T>, "unknown stream description"); + } + } +}; + +TFormatBuilder::TFormatBuilder( + IClientRetryPolicyPtr clientRetryPolicy, + TClientContext context, + TTransactionId transactionId, + TOperationOptions operationOptions) + : ClientRetryPolicy_(std::move(clientRetryPolicy)) + , Context_(std::move(context)) + , TransactionId_(transactionId) + , OperationOptions_(std::move(operationOptions)) +{ } + +std::pair <TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateFormat( + const IStructuredJob& job, + const EIODirection& direction, + const TStructuredJobTableList& structuredTableList, + const TMaybe <TFormatHints>& formatHints, + ENodeReaderFormat nodeReaderFormat, + bool 
allowFormatFromTableAttribute) +{ + auto jobStreamDescription = GetJobStreamDescription(job, direction); + auto method = std::visit(TFormatSwitcher(), jobStreamDescription); + return (this->*method)( + job, + direction, + structuredTableList, + formatHints, + nodeReaderFormat, + allowFormatFromTableAttribute); +} + +std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateVoidFormat( + const IStructuredJob& /*job*/, + const EIODirection& /*direction*/, + const TStructuredJobTableList& /*structuredTableList*/, + const TMaybe<TFormatHints>& /*formatHints*/, + ENodeReaderFormat /*nodeReaderFormat*/, + bool /*allowFormatFromTableAttribute*/) +{ + return { + TFormat(), + Nothing() + }; +} + +std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat( + const IStructuredJob& job, + const EIODirection& direction, + const TStructuredJobTableList& structuredTableList, + const TMaybe<TFormatHints>& /*formatHints*/, + ENodeReaderFormat /*nodeReaderFormat*/, + bool allowFormatFromTableAttribute) +{ + for (const auto& table: structuredTableList) { + if (!std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) { + ythrow TApiUsageError() + << "cannot use " << direction << " table '" << JobTablePathString(table) + << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; " + << "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table"; + } + } + TMaybe<TNode> formatFromTableAttributes; + if (allowFormatFromTableAttribute && OperationOptions_.UseTableFormats_) { + TVector<TRichYPath> tableList; + for (const auto& table: structuredTableList) { + Y_VERIFY(table.RichYPath, "Cannot use format from table for intermediate table"); + tableList.push_back(*table.RichYPath); + } + formatFromTableAttributes = GetTableFormats(ClientRetryPolicy_, Context_, TransactionId_, tableList); + } + if (formatFromTableAttributes) { + return { + TFormat(*formatFromTableAttributes), + Nothing() + }; + } else { + 
auto formatNode = TNode("yamr"); + formatNode.Attributes() = TNode() + ("lenval", true) + ("has_subkey", true) + ("enable_table_index", true); + return { + TFormat(formatNode), + Nothing() + }; + } +} + +std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat( + const IStructuredJob& job, + const EIODirection& direction, + const TStructuredJobTableList& structuredTableList, + const TMaybe<TFormatHints>& formatHints, + ENodeReaderFormat nodeReaderFormat, + bool /*allowFormatFromTableAttribute*/) +{ + for (const auto& table: structuredTableList) { + if (!std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) { + ythrow TApiUsageError() + << "cannot use " << direction << " table '" << JobTablePathString(table) + << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; " + << "table has unsupported structure description; check AddInput<> / AddOutput<> for this table"; + } + } + NSkiff::TSkiffSchemaPtr skiffSchema = nullptr; + if (nodeReaderFormat != ENodeReaderFormat::Yson) { + TVector<TRichYPath> tableList; + for (const auto& table: structuredTableList) { + Y_VERIFY(table.RichYPath, "Cannot use skiff with temporary tables"); + tableList.emplace_back(*table.RichYPath); + } + skiffSchema = TryCreateSkiffSchema( + Context_, + ClientRetryPolicy_, + TransactionId_, + tableList, + OperationOptions_, + nodeReaderFormat); + } + if (skiffSchema) { + auto format = CreateSkiffFormat(skiffSchema); + NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints); + return { + CreateSkiffFormat(skiffSchema), + TSmallJobFile{ + TString("skiff") + GetSuffix(direction), + CreateSkiffConfig(skiffSchema) + } + }; + } else { + auto format = TFormat::YsonBinary(); + NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints); + return { + format, + Nothing() + }; + } +} + +[[noreturn]] static void ThrowUnsupportedStructureDescription( + const EIODirection& direction, + const TStructuredJobTable& table, + const IStructuredJob& job) +{ + ythrow 
TApiUsageError() + << "cannot use " << direction << " table '" << JobTablePathString(table) + << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; " + << "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table"; +} + +[[noreturn]] static void ThrowTypeDeriveFail( + const EIODirection& direction, + const IStructuredJob& job, + const TString& type) +{ + ythrow TApiUsageError() + << "Cannot derive exact " << type << " type for intermediate " << direction << " table for job " + << TJobFactory::Get()->GetJobName(&job) + << "; use one of TMapReduceOperationSpec::Hint* methods to specifiy intermediate table structure"; +} + +[[noreturn]] static void ThrowUnexpectedDifferentDescriptors( + const EIODirection& direction, + const TStructuredJobTable& table, + const IStructuredJob& job, + const TMaybe<TStringBuf> jobDescriptorName, + const TMaybe<TStringBuf> descriptorName) +{ + ythrow TApiUsageError() + << "Job " << TJobFactory::Get()->GetJobName(&job) << " expects " + << jobDescriptorName << " as " << direction << ", but table " << JobTablePathString(table) + << " is tagged with " << descriptorName; +} + +std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateProtobufFormat( + const IStructuredJob& job, + const EIODirection& direction, + const TStructuredJobTableList& structuredTableList, + const TMaybe<TFormatHints>& /*formatHints*/, + ENodeReaderFormat /*nodeReaderFormat*/, + bool /*allowFormatFromTableAttribute*/) +{ + if (Context_.Config->UseClientProtobuf) { + return { + TFormat::YsonBinary(), + TSmallJobFile{ + TString("proto") + GetSuffix(direction), + CreateProtoConfig({}), + }, + }; + } + const ::google::protobuf::Descriptor* const jobDescriptor = + std::get<TProtobufStructuredRowStream>(GetJobStreamDescription(job, direction)).Descriptor; + Y_ENSURE(!structuredTableList.empty(), + "empty " << direction << " tables for job " << TJobFactory::Get()->GetJobName(&job)); + + TVector<const 
::google::protobuf::Descriptor*> descriptorList; + for (const auto& table : structuredTableList) { + const ::google::protobuf::Descriptor* descriptor = nullptr; + if (std::holds_alternative<TProtobufTableStructure>(table.Description)) { + descriptor = std::get<TProtobufTableStructure>(table.Description).Descriptor; + } else if (table.RichYPath) { + ThrowUnsupportedStructureDescription(direction, table, job); + } + if (!descriptor) { + // It must be intermediate table, because there is no proper way to add such table to spec + // (AddInput requires to specify proper message). + Y_VERIFY(!table.RichYPath, "Descriptors for all tables except intermediate must be known"); + if (jobDescriptor) { + descriptor = jobDescriptor; + } else { + ThrowTypeDeriveFail(direction, job, "protobuf"); + } + } + if (jobDescriptor && descriptor != jobDescriptor) { + ThrowUnexpectedDifferentDescriptors( + direction, + table, + job, + jobDescriptor->full_name(), + descriptor->full_name()); + } + descriptorList.push_back(descriptor); + } + Y_VERIFY(!descriptorList.empty(), "Messages for proto format are unknown (empty ProtoDescriptors)"); + return { + TFormat::Protobuf(descriptorList, Context_.Config->ProtobufFormatWithDescriptors), + TSmallJobFile{ + TString("proto") + GetSuffix(direction), + CreateProtoConfig(descriptorList) + }, + }; +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TGetTableSchemaImpl +{ + template <typename T> + TMaybe<TTableSchema> operator() (const T& description) { + if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) { + return Nothing(); + } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) { + if (!description.Descriptor) { + return Nothing(); + } + return CreateTableSchema(*description.Descriptor); + } else { + static_assert(TDependentFalse<T>, "unknown type"); + } + } +}; + +TMaybe<TTableSchema> GetTableSchema(const TTableStructure& tableStructure) +{ + return std::visit(TGetTableSchemaImpl(), 
tableStructure); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NDetail +} // namespace NYT |