aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/client/structured_table_formats.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
committermax42 <max42@yandex-team.com>2023-06-30 03:37:03 +0300
commitfac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a (patch)
treeb8cbc1deb00309c7f1a7ab6df520a76cf0b5c6d7 /yt/cpp/mapreduce/client/structured_table_formats.cpp
parent7bf166b1a7ed0af927f230022b245af618e998c1 (diff)
downloadydb-fac2bd72b4b31ec3238292caf8fb2a8aaa6d6c4a.tar.gz
YT-19324: move YT provider to ydb/library/yql
This commit is formed by the following script: https://paste.yandex-team.ru/6f92e4b8-efc5-4d34-948b-15ee2accd7e7/text. This commit has zero effect on all projects that depend on YQL. The summary of changes: - `yql/providers/yt -> ydb/library/yql/providers/yt` - the whole implementation of YT provider is moved into YDB code base for further export as a part of YT YQL plugin shared library; - `yql/providers/stat/{expr_nodes,uploader} -> ydb/library/yql/providers/stat/{expr_nodes,uploader}` - a small interface without implementation and the description of stat expr nodes; - `yql/core/extract_predicate/ut -> ydb/library/yql/core/extract_predicate/ut`; - `yql/core/{ut,ut_common} -> ydb/library/yql/core/{ut,ut_common}`; - `yql/core` is gone; - `yql/library/url_preprocessing -> ydb/library/yql/core/url_preprocessing`. **NB**: all new targets inside `ydb/` are under `IF (NOT CMAKE_EXPORT)` clause which disables them from open-source cmake generation and ya make build. They will be enabled in the subsequent commits.
Diffstat (limited to 'yt/cpp/mapreduce/client/structured_table_formats.cpp')
-rw-r--r--yt/cpp/mapreduce/client/structured_table_formats.cpp572
1 files changed, 572 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/client/structured_table_formats.cpp b/yt/cpp/mapreduce/client/structured_table_formats.cpp
new file mode 100644
index 0000000000..b6e82c6c15
--- /dev/null
+++ b/yt/cpp/mapreduce/client/structured_table_formats.cpp
@@ -0,0 +1,572 @@
+#include "structured_table_formats.h"
+
+#include "format_hints.h"
+#include "skiff.h"
+
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+
+#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
+
+#include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
+
+#include <yt/cpp/mapreduce/interface/common.h>
+
+#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
+
+#include <library/cpp/type_info/type_info.h>
+#include <library/cpp/yson/writer.h>
+
+#include <memory>
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Computes the single format shared by all input tables.
+//
+// The first entry of |formats| is the reference. Every later entry must be defined exactly
+// when the reference is and, when defined, must carry equal "key_column_names" and
+// "subkey_column_names" attributes; any mismatch is reported with yexception.
+// Returns the reference entry (an empty TMaybe when |formats| is empty).
+TMaybe<TNode> GetCommonTableFormat(
+    const TVector<TMaybe<TNode>>& formats)
+{
+    TMaybe<TNode> common;
+    if (formats.empty()) {
+        return common;
+    }
+    common = formats.front();
+
+    for (size_t index = 1; index < formats.size(); ++index) {
+        const auto& candidate = formats[index];
+
+        if (common.Defined() != candidate.Defined()) {
+            ythrow yexception() << "Different formats of input tables";
+        }
+
+        if (common.Defined()) {
+            const auto& commonAttrs = common.Get()->GetAttributes();
+            const auto& candidateAttrs = candidate.Get()->GetAttributes();
+
+            if (commonAttrs["key_column_names"] != candidateAttrs["key_column_names"]) {
+                ythrow yexception() << "Different formats of input tables";
+            }
+
+            // Subkey columns are optional, but must be present in both or in neither.
+            const bool commonHasSubkey = commonAttrs.HasKey("subkey_column_names");
+            const bool candidateHasSubkey = candidateAttrs.HasKey("subkey_column_names");
+            if (commonHasSubkey != candidateHasSubkey) {
+                ythrow yexception() << "Different formats of input tables";
+            }
+
+            if (commonHasSubkey &&
+                commonAttrs["subkey_column_names"] != candidateAttrs["subkey_column_names"])
+            {
+                ythrow yexception() << "Different formats of input tables";
+            }
+        }
+    }
+
+    return common;
+}
+
+// Reads the "_format" attribute of the table at |path|.
+//
+// Returns an empty TMaybe when the attribute does not exist or its value is not "yamred_dsv".
+// Otherwise returns the attribute node with "has_subkey" and "lenval" attributes set to "true".
+// Throws yexception if a "yamred_dsv" format has no "key_column_names" attribute.
+TMaybe<TNode> GetTableFormat(
+    const IClientRetryPolicyPtr& retryPolicy,
+    const TClientContext& context,
+    const TTransactionId& transactionId,
+    const TRichYPath& path)
+{
+    const auto formatPath = path.Path_ + "/@_format";
+
+    const bool formatExists = NDetail::NRawClient::Exists(
+        retryPolicy->CreatePolicyForGenericRequest(),
+        context,
+        transactionId,
+        formatPath);
+    if (!formatExists) {
+        return TMaybe<TNode>();
+    }
+
+    TMaybe<TNode> tableFormat = NDetail::NRawClient::Get(
+        retryPolicy->CreatePolicyForGenericRequest(),
+        context,
+        transactionId,
+        formatPath);
+    if (tableFormat.Get()->AsString() != "yamred_dsv") {
+        return TMaybe<TNode>();
+    }
+
+    auto& attributes = tableFormat.Get()->Attributes();
+    if (!attributes.HasKey("key_column_names")) {
+        ythrow yexception() <<
+            "Table '" << path.Path_ << "': attribute 'key_column_names' is missing";
+    }
+    attributes["has_subkey"] = "true";
+    attributes["lenval"] = "true";
+    return tableFormat;
+}
+
+// Fetches the format of every table in |inputs| via GetTableFormat and reduces them
+// to one common format with GetCommonTableFormat.
+TMaybe<TNode> GetTableFormats(
+    const IClientRetryPolicyPtr& clientRetryPolicy,
+    const TClientContext& context,
+    const TTransactionId& transactionId,
+    const TVector<TRichYPath>& inputs)
+{
+    TVector<TMaybe<TNode>> perTableFormats;
+    for (const auto& input : inputs) {
+        auto tableFormat = GetTableFormat(clientRetryPolicy, context, transactionId, input);
+        perTableFormats.push_back(tableFormat);
+    }
+
+    return GetCommonTableFormat(perTableFormats);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+namespace NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Creates a Skiff schema for |tables| unless the operation spec contains "input_query".
+//
+// With "input_query" present, nullptr is returned; explicitly requesting
+// ENodeReaderFormat::Skiff in that case is reported as TApiUsageError.
+// Otherwise the decision is delegated to CreateSkiffSchemaIfNecessary with
+// key switch and range index enabled.
+NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
+    const TClientContext& context,
+    const IClientRetryPolicyPtr& clientRetryPolicy,
+    const TTransactionId& transactionId,
+    const TVector<TRichYPath>& tables,
+    const TOperationOptions& options,
+    ENodeReaderFormat nodeReaderFormat)
+{
+    const auto& spec = options.Spec_;
+    if (spec.Defined() && spec->IsMap() && spec->HasKey("input_query")) {
+        Y_ENSURE_EX(nodeReaderFormat != ENodeReaderFormat::Skiff,
+            TApiUsageError() << "Cannot use Skiff format for operations with 'input_query' in spec");
+        return nullptr;
+    }
+
+    const auto schemaOptions = TCreateSkiffSchemaOptions()
+        .HasKeySwitch(true)
+        .HasRangeIndex(true);
+    return CreateSkiffSchemaIfNecessary(
+        context,
+        clientRetryPolicy,
+        transactionId,
+        nodeReaderFormat,
+        tables,
+        schemaOptions);
+}
+
+// Serializes |schema| through a YSON writer and returns the produced text.
+TString CreateSkiffConfig(const NSkiff::TSkiffSchemaPtr& schema)
+{
+    TString serialized;
+    TStringOutput output(serialized);
+    ::NYson::TYsonWriter ysonWriter(&output);
+    Serialize(schema, &ysonWriter);
+    return serialized;
+}
+
+// Builds the proto job config: the full names of the given message types, one per line.
+TString CreateProtoConfig(const TVector<const ::google::protobuf::Descriptor*>& descriptorList)
+{
+    TString config;
+    TStringOutput out(config);
+    for (const ::google::protobuf::Descriptor* messageType : descriptorList) {
+        out << messageType->full_name() << Endl;
+    }
+    return config;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Visitor producing a human readable description of a table structure alternative.
+//
+// - TUnspecifiedTableStructure -> "Unspecified"
+// - TProtobufTableStructure    -> "<message full name> protobuf message"
+//   ("<unknown> protobuf message" when the descriptor is null).
+// Any other alternative is rejected at compile time.
+struct TGetTableStructureDescriptionStringImpl {
+    template<typename T>
+    TString operator()(const T& description) {
+        if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) {
+            return "Unspecified";
+        } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) {
+            TString res;
+            // TStringOutput appends directly to |res|. A TStringStream constructed from |res|
+            // would write into its own copy and leave |res| empty.
+            TStringOutput out(res);
+            if (description.Descriptor) {
+                out << description.Descriptor->full_name();
+            } else {
+                out << "<unknown>";
+            }
+            out << " protobuf message";
+            return res;
+        } else {
+            static_assert(TDependentFalse<T>, "Unknown type");
+        }
+    }
+};
+
+// Returns a human readable description of whichever alternative |tableStructure| holds.
+TString GetTableStructureDescriptionString(const TTableStructure& tableStructure)
+{
+    TGetTableStructureDescriptionStringImpl describe;
+    return std::visit(describe, tableStructure);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Returns the path of |jobTable| for diagnostics, or the "<intermediate-table>"
+// placeholder when the table has no path.
+TString JobTablePathString(const TStructuredJobTable& jobTable)
+{
+    if (!jobTable.RichYPath) {
+        return "<intermediate-table>";
+    }
+    return jobTable.RichYPath->Path_;
+}
+
+// Converts user-supplied table paths to job tables, copying description and path as is.
+TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList)
+{
+    TStructuredJobTableList jobTables;
+    for (size_t index = 0; index != tableList.size(); ++index) {
+        const auto& source = tableList[index];
+        jobTables.push_back(TStructuredJobTable{source.Description, source.RichYPath});
+    }
+    return jobTables;
+}
+
+// Same as ToStructuredJobTableList, but every path is first canonized via
+// NRawClient::CanonizeYPaths (called without a retry policy).
+TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList)
+{
+    const size_t tableCount = tableList.size();
+
+    TVector<TRichYPath> rawPaths;
+    rawPaths.reserve(tableCount);
+    for (size_t index = 0; index != tableCount; ++index) {
+        rawPaths.emplace_back(tableList[index].RichYPath);
+    }
+
+    const auto canonized = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, context, rawPaths);
+    Y_VERIFY(canonized.size() == tableList.size());
+
+    // Descriptions are taken from the input, paths from the canonized list (same order).
+    TStructuredJobTableList jobTables;
+    jobTables.reserve(tableCount);
+    size_t index = 0;
+    for (const auto& canonizedPath : canonized) {
+        jobTables.emplace_back(TStructuredJobTable{tableList[index].Description, canonizedPath});
+        ++index;
+    }
+    return jobTables;
+}
+
+// Extracts rich paths from |tableList|, filling in a schema for paths that have none.
+//
+// For a path without schema, the schema comes from (in priority order):
+//   1. the non-empty entry of |jobSchemaInferenceResult| at the same index;
+//   2. the table description, when |inferSchemaFromDescriptions| is set.
+// Every table must have a path; |jobSchemaInferenceResult|, if present, must have one
+// entry per table.
+TVector<TRichYPath> GetPathList(
+    const TStructuredJobTableList& tableList,
+    const TMaybe<TVector<TTableSchema>>& jobSchemaInferenceResult,
+    bool inferSchemaFromDescriptions)
+{
+    Y_VERIFY(!jobSchemaInferenceResult || tableList.size() == jobSchemaInferenceResult->size());
+
+    const auto pickSchema = [&] (const TStructuredJobTable& jobTable, ui32 index) -> TMaybe<TTableSchema> {
+        const bool hasJobSchema =
+            jobSchemaInferenceResult && !jobSchemaInferenceResult->at(index).Empty();
+        if (hasJobSchema) {
+            return jobSchemaInferenceResult->at(index);
+        }
+        if (!inferSchemaFromDescriptions) {
+            return Nothing();
+        }
+        return GetTableSchema(jobTable.Description);
+    };
+
+    TVector<TRichYPath> paths;
+    paths.reserve(tableList.size());
+    size_t tableIndex = 0;
+    for (const auto& jobTable : tableList) {
+        Y_VERIFY(jobTable.RichYPath, "Cannot get path for intermediate table");
+        auto path = *jobTable.RichYPath;
+        if (!path.Schema_) {
+            auto inferred = pickSchema(jobTable, tableIndex);
+            if (inferred) {
+                path.Schema(std::move(*inferred));
+            }
+        }
+
+        paths.emplace_back(std::move(path));
+        ++tableIndex;
+    }
+    return paths;
+}
+
+
+// Returns the row stream description of |job| for the requested |direction|.
+// Any direction other than Input/Output aborts the program.
+TStructuredRowStreamDescription GetJobStreamDescription(
+    const IStructuredJob& job,
+    EIODirection direction)
+{
+    if (direction == EIODirection::Input) {
+        return job.GetInputRowStreamDescription();
+    }
+    if (direction == EIODirection::Output) {
+        return job.GetOutputRowStreamDescription();
+    }
+    Y_FAIL("unreachable");
+}
+
+// Returns the name suffix for |direction|: "_input" or "_output".
+TString GetSuffix(EIODirection direction)
+{
+    if (direction == EIODirection::Input) {
+        return "_input";
+    }
+    if (direction == EIODirection::Output) {
+        return "_output";
+    }
+    Y_FAIL("unreachable");
+}
+
+// Returns the method name to mention in diagnostics for |direction|:
+// "AddInput<>" or "AddOutput<>".
+TString GetAddIOMethodName(EIODirection direction)
+{
+    if (direction == EIODirection::Input) {
+        return "AddInput<>";
+    }
+    if (direction == EIODirection::Output) {
+        return "AddOutput<>";
+    }
+    Y_FAIL("unreachable");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Visitor mapping a row stream description alternative to the TFormatBuilder
+// member function that creates the format for it.
+// An unknown alternative is rejected at compile time.
+struct TFormatBuilder::TFormatSwitcher
+{
+    template <typename T>
+    auto operator() (const T& /*t*/) {
+        if constexpr (std::is_same_v<T, TVoidStructuredRowStream>) {
+            return &TFormatBuilder::CreateVoidFormat;
+        } else if constexpr (std::is_same_v<T, TTNodeStructuredRowStream>) {
+            return &TFormatBuilder::CreateNodeFormat;
+        } else if constexpr (std::is_same_v<T, TTYaMRRowStructuredRowStream>) {
+            return &TFormatBuilder::CreateYamrFormat;
+        } else if constexpr (std::is_same_v<T, TProtobufStructuredRowStream>) {
+            return &TFormatBuilder::CreateProtobufFormat;
+        } else {
+            static_assert(TDependentFalse<T>, "unknown stream description");
+        }
+    }
+};
+
+// Stores everything later format creation needs: retry policy, client context,
+// transaction id and operation options.
+TFormatBuilder::TFormatBuilder(
+    IClientRetryPolicyPtr clientRetryPolicy,
+    TClientContext context,
+    TTransactionId transactionId,
+    TOperationOptions operationOptions)
+    : ClientRetryPolicy_(std::move(clientRetryPolicy))
+    , Context_(std::move(context))
+    , TransactionId_(transactionId)
+    , OperationOptions_(std::move(operationOptions))
+{
+}
+
+// Creates the format (and optional auxiliary job file) for one direction of |job|.
+// The work is dispatched to the Create*Format method matching the job's row stream
+// description for |direction|; all arguments are forwarded unchanged.
+std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateFormat(
+    const IStructuredJob& job,
+    const EIODirection& direction,
+    const TStructuredJobTableList& structuredTableList,
+    const TMaybe<TFormatHints>& formatHints,
+    ENodeReaderFormat nodeReaderFormat,
+    bool allowFormatFromTableAttribute)
+{
+    const auto streamDescription = GetJobStreamDescription(job, direction);
+    const auto createMethod = std::visit(TFormatSwitcher(), streamDescription);
+    return (this->*createMethod)(
+        job,
+        direction,
+        structuredTableList,
+        formatHints,
+        nodeReaderFormat,
+        allowFormatFromTableAttribute);
+}
+
+// Format for a void row stream: a default-constructed TFormat and no job file.
+// All arguments are ignored.
+std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateVoidFormat(
+    const IStructuredJob& /*job*/,
+    const EIODirection& /*direction*/,
+    const TStructuredJobTableList& /*structuredTableList*/,
+    const TMaybe<TFormatHints>& /*formatHints*/,
+    ENodeReaderFormat /*nodeReaderFormat*/,
+    bool /*allowFormatFromTableAttribute*/)
+{
+    return {TFormat(), Nothing()};
+}
+
+// Creates the format for a YaMR row stream.
+//
+// Every table must have an unspecified structure, otherwise TApiUsageError is thrown.
+// When allowed by both |allowFormatFromTableAttribute| and the operation options, the
+// common format read from the tables' attributes is used if there is one; otherwise the
+// result is "yamr" with lenval, has_subkey and enable_table_index turned on.
+// No job file is produced.
+std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat(
+    const IStructuredJob& job,
+    const EIODirection& direction,
+    const TStructuredJobTableList& structuredTableList,
+    const TMaybe<TFormatHints>& /*formatHints*/,
+    ENodeReaderFormat /*nodeReaderFormat*/,
+    bool allowFormatFromTableAttribute)
+{
+    for (const auto& jobTable : structuredTableList) {
+        const bool isUnspecified = std::holds_alternative<TUnspecifiedTableStructure>(jobTable.Description);
+        if (isUnspecified) {
+            continue;
+        }
+        ythrow TApiUsageError()
+            << "cannot use " << direction << " table '" << JobTablePathString(jobTable)
+            << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
+            << "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table";
+    }
+
+    if (allowFormatFromTableAttribute && OperationOptions_.UseTableFormats_) {
+        TVector<TRichYPath> paths;
+        for (const auto& jobTable : structuredTableList) {
+            Y_VERIFY(jobTable.RichYPath, "Cannot use format from table for intermediate table");
+            paths.push_back(*jobTable.RichYPath);
+        }
+        const TMaybe<TNode> attributeFormat = GetTableFormats(ClientRetryPolicy_, Context_, TransactionId_, paths);
+        if (attributeFormat) {
+            return {
+                TFormat(*attributeFormat),
+                Nothing()
+            };
+        }
+    }
+
+    // No usable format in table attributes: fall back to the default yamr settings.
+    auto yamrNode = TNode("yamr");
+    yamrNode.Attributes() = TNode()
+        ("lenval", true)
+        ("has_subkey", true)
+        ("enable_table_index", true);
+    return {
+        TFormat(yamrNode),
+        Nothing()
+    };
+}
+
+// Creates the format for a TNode row stream.
+//
+// Every table must have an unspecified structure, otherwise TApiUsageError is thrown.
+// Unless |nodeReaderFormat| is Yson, a Skiff schema is requested for the tables (all of
+// which must then have a path). If a schema is obtained, the result is the Skiff format
+// plus a "skiff_input"/"skiff_output" job file holding the serialized schema; otherwise
+// the result is binary YSON without a job file.
+// |formatHints| are applied to the returned format in both cases.
+std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
+    const IStructuredJob& job,
+    const EIODirection& direction,
+    const TStructuredJobTableList& structuredTableList,
+    const TMaybe<TFormatHints>& formatHints,
+    ENodeReaderFormat nodeReaderFormat,
+    bool /*allowFormatFromTableAttribute*/)
+{
+    for (const auto& table: structuredTableList) {
+        if (!std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
+            ythrow TApiUsageError()
+                << "cannot use " << direction << " table '" << JobTablePathString(table)
+                << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
+                << "table has unsupported structure description; check AddInput<> / AddOutput<> for this table";
+        }
+    }
+    NSkiff::TSkiffSchemaPtr skiffSchema = nullptr;
+    if (nodeReaderFormat != ENodeReaderFormat::Yson) {
+        TVector<TRichYPath> tableList;
+        for (const auto& table: structuredTableList) {
+            Y_VERIFY(table.RichYPath, "Cannot use skiff with temporary tables");
+            tableList.emplace_back(*table.RichYPath);
+        }
+        skiffSchema = TryCreateSkiffSchema(
+            Context_,
+            ClientRetryPolicy_,
+            TransactionId_,
+            tableList,
+            OperationOptions_,
+            nodeReaderFormat);
+    }
+    if (skiffSchema) {
+        auto format = CreateSkiffFormat(skiffSchema);
+        NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints);
+        return {
+            // Return the format the hints were applied to; building a fresh Skiff format
+            // here would silently drop them.
+            std::move(format),
+            TSmallJobFile{
+                TString("skiff") + GetSuffix(direction),
+                CreateSkiffConfig(skiffSchema)
+            }
+        };
+    } else {
+        auto format = TFormat::YsonBinary();
+        NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints);
+        return {
+            std::move(format),
+            Nothing()
+        };
+    }
+}
+
+// Throws TApiUsageError saying that |table| has a structure description
+// which |job| cannot work with in the given |direction|.
+[[noreturn]] static void ThrowUnsupportedStructureDescription(
+    const EIODirection& direction,
+    const TStructuredJobTable& table,
+    const IStructuredJob& job)
+{
+    const auto tablePath = JobTablePathString(table);
+    const auto jobName = TJobFactory::Get()->GetJobName(&job);
+    const auto methodName = GetAddIOMethodName(direction);
+    ythrow TApiUsageError()
+        << "cannot use " << direction << " table '" << tablePath
+        << "' with job " << jobName << "; "
+        << "table has unsupported structure description; check " << methodName << " for this table";
+}
+
+// Throws TApiUsageError saying that the exact |type| of an intermediate table
+// could not be derived for |job| in the given |direction|.
+[[noreturn]] static void ThrowTypeDeriveFail(
+    const EIODirection& direction,
+    const IStructuredJob& job,
+    const TString& type)
+{
+    const auto jobName = TJobFactory::Get()->GetJobName(&job);
+    ythrow TApiUsageError()
+        << "Cannot derive exact " << type << " type for intermediate " << direction << " table for job "
+        << jobName
+        << "; use one of TMapReduceOperationSpec::Hint* methods to specifiy intermediate table structure";
+}
+
+// Throws TApiUsageError saying that |job| expects |jobDescriptorName| in the given
+// |direction| while |table| is tagged with |descriptorName|.
+[[noreturn]] static void ThrowUnexpectedDifferentDescriptors(
+    const EIODirection& direction,
+    const TStructuredJobTable& table,
+    const IStructuredJob& job,
+    const TMaybe<TStringBuf> jobDescriptorName,
+    const TMaybe<TStringBuf> descriptorName)
+{
+    const auto jobName = TJobFactory::Get()->GetJobName(&job);
+    const auto tablePath = JobTablePathString(table);
+    ythrow TApiUsageError()
+        << "Job " << jobName << " expects "
+        << jobDescriptorName << " as " << direction << ", but table " << tablePath
+        << " is tagged with " << descriptorName;
+}
+
+// Creates the format for a protobuf row stream.
+//
+// With Config->UseClientProtobuf set, the result is binary YSON plus a "proto_input"/
+// "proto_output" job file built from an empty descriptor list.
+// Otherwise one descriptor is collected per table:
+//   - a table with a protobuf structure contributes its own descriptor;
+//   - a table with a path and any other structure is a usage error;
+//   - a table left without a descriptor must have no path and takes the job's descriptor
+//     (usage error if the job has none).
+// If the job has a descriptor, every table descriptor must be that same descriptor.
+// The result is then the protobuf format for the collected descriptors plus a job file
+// listing their full names.
+std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateProtobufFormat(
+    const IStructuredJob& job,
+    const EIODirection& direction,
+    const TStructuredJobTableList& structuredTableList,
+    const TMaybe<TFormatHints>& /*formatHints*/,
+    ENodeReaderFormat /*nodeReaderFormat*/,
+    bool /*allowFormatFromTableAttribute*/)
+{
+    using TDescriptorPtr = const ::google::protobuf::Descriptor*;
+
+    if (Context_.Config->UseClientProtobuf) {
+        return {
+            TFormat::YsonBinary(),
+            TSmallJobFile{
+                TString("proto") + GetSuffix(direction),
+                CreateProtoConfig({}),
+            },
+        };
+    }
+
+    const TDescriptorPtr expectedDescriptor =
+        std::get<TProtobufStructuredRowStream>(GetJobStreamDescription(job, direction)).Descriptor;
+    Y_ENSURE(!structuredTableList.empty(),
+        "empty " << direction << " tables for job " << TJobFactory::Get()->GetJobName(&job));
+
+    TVector<TDescriptorPtr> collected;
+    for (const auto& jobTable : structuredTableList) {
+        const auto* protoStructure = std::get_if<TProtobufTableStructure>(&jobTable.Description);
+        if (!protoStructure && jobTable.RichYPath) {
+            ThrowUnsupportedStructureDescription(direction, jobTable, job);
+        }
+
+        TDescriptorPtr tableDescriptor = protoStructure ? protoStructure->Descriptor : nullptr;
+        if (!tableDescriptor) {
+            // It must be intermediate table, because there is no proper way to add such table to spec
+            // (AddInput requires to specify proper message).
+            Y_VERIFY(!jobTable.RichYPath, "Descriptors for all tables except intermediate must be known");
+            if (!expectedDescriptor) {
+                ThrowTypeDeriveFail(direction, job, "protobuf");
+            }
+            tableDescriptor = expectedDescriptor;
+        }
+
+        if (expectedDescriptor && tableDescriptor != expectedDescriptor) {
+            ThrowUnexpectedDifferentDescriptors(
+                direction,
+                jobTable,
+                job,
+                expectedDescriptor->full_name(),
+                tableDescriptor->full_name());
+        }
+        collected.push_back(tableDescriptor);
+    }
+    Y_VERIFY(!collected.empty(), "Messages for proto format are unknown (empty ProtoDescriptors)");
+
+    return {
+        TFormat::Protobuf(collected, Context_.Config->ProtobufFormatWithDescriptors),
+        TSmallJobFile{
+            TString("proto") + GetSuffix(direction),
+            CreateProtoConfig(collected)
+        },
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Visitor deriving a table schema from a table structure alternative.
+//
+// - TUnspecifiedTableStructure -> no schema;
+// - TProtobufTableStructure    -> schema created from the descriptor, or no schema
+//   when the descriptor is null.
+// Any other alternative is rejected at compile time.
+struct TGetTableSchemaImpl
+{
+    template <typename T>
+    TMaybe<TTableSchema> operator() (const T& description) {
+        if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) {
+            return Nothing();
+        } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) {
+            if (description.Descriptor) {
+                return CreateTableSchema(*description.Descriptor);
+            }
+            return Nothing();
+        } else {
+            static_assert(TDependentFalse<T>, "unknown type");
+        }
+    }
+};
+
+// Returns the schema implied by |tableStructure|, if it implies one.
+TMaybe<TTableSchema> GetTableSchema(const TTableStructure& tableStructure)
+{
+    TGetTableSchemaImpl deriveSchema;
+    return std::visit(deriveSchema, tableStructure);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NDetail
+} // namespace NYT