#include "structured_table_formats.h"
#include "format_hints.h"
#include "skiff.h"
#include <yt/cpp/mapreduce/common/retry_lib.h>
#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
#include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
#include <yt/cpp/mapreduce/interface/common.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <library/cpp/type_info/type_info.h>
#include <library/cpp/yson/writer.h>
#include <memory>
namespace NYT {
////////////////////////////////////////////////////////////////////////////////
// Collapses the per-table formats into the single format shared by all tables.
//
// The first entry is taken as the reference. Every following entry must agree
// with it on: being defined at all, the "key_column_names" attribute, the
// presence of "subkey_column_names" and (when present) its value.
// Throws yexception on the first disagreement. Returns an empty TMaybe when
// |formats| is empty or when the tables carry no format.
TMaybe<TNode> GetCommonTableFormat(
    const TVector<TMaybe<TNode>>& formats)
{
    TMaybe<TNode> common;

    for (size_t index = 0; index != formats.size(); ++index) {
        const auto& candidate = formats[index];

        // The very first format becomes the reference for all comparisons.
        if (index == 0) {
            common = candidate;
            continue;
        }

        if (common.Defined() != candidate.Defined()) {
            ythrow yexception() << "Different formats of input tables";
        }

        // Both are undefined: nothing to compare.
        if (!common.Defined()) {
            continue;
        }

        const auto& commonAttrs = common.Get()->GetAttributes();
        const auto& candidateAttrs = candidate.Get()->GetAttributes();

        if (commonAttrs["key_column_names"] != candidateAttrs["key_column_names"]) {
            ythrow yexception() << "Different formats of input tables";
        }

        const bool commonHasSubkey = commonAttrs.HasKey("subkey_column_names");
        const bool candidateHasSubkey = candidateAttrs.HasKey("subkey_column_names");
        if (commonHasSubkey != candidateHasSubkey) {
            ythrow yexception() << "Different formats of input tables";
        }

        if (commonHasSubkey &&
            commonAttrs["subkey_column_names"] != candidateAttrs["subkey_column_names"])
        {
            ythrow yexception() << "Different formats of input tables";
        }
    }

    return common;
}
// Reads the "_format" attribute of the table at |path|.
//
// Returns an empty TMaybe when the attribute does not exist or when its value
// is anything other than "yamred_dsv". Otherwise returns the attribute node
// with "has_subkey" and "lenval" attributes forced to "true".
// Throws yexception if a "yamred_dsv" format lacks "key_column_names".
TMaybe<TNode> GetTableFormat(
    const IClientRetryPolicyPtr& retryPolicy,
    const TClientContext& context,
    const TTransactionId& transactionId,
    const TRichYPath& path)
{
    const auto attributePath = path.Path_ + "/@_format";

    const bool attributeExists = NDetail::NRawClient::Exists(
        retryPolicy->CreatePolicyForGenericRequest(),
        context,
        transactionId,
        attributePath);
    if (!attributeExists) {
        return Nothing();
    }

    TMaybe<TNode> format = NDetail::NRawClient::Get(
        retryPolicy->CreatePolicyForGenericRequest(),
        context,
        transactionId,
        attributePath);
    if (format.Get()->AsString() != "yamred_dsv") {
        return Nothing();
    }

    auto& attributes = format.Get()->Attributes();
    if (!attributes.HasKey("key_column_names")) {
        ythrow yexception() <<
            "Table '" << path.Path_ << "': attribute 'key_column_names' is missing";
    }

    attributes["has_subkey"] = "true";
    attributes["lenval"] = "true";
    return format;
}
// Fetches the format of every table in |inputs| (via GetTableFormat) and
// reduces them to one common format (via GetCommonTableFormat).
TMaybe<TNode> GetTableFormats(
    const IClientRetryPolicyPtr& clientRetryPolicy,
    const TClientContext& context,
    const TTransactionId& transactionId,
    const TVector<TRichYPath>& inputs)
{
    TVector<TMaybe<TNode>> perTableFormats;
    for (const auto& input : inputs) {
        perTableFormats.emplace_back(
            GetTableFormat(clientRetryPolicy, context, transactionId, input));
    }
    return GetCommonTableFormat(perTableFormats);
}
////////////////////////////////////////////////////////////////////////////////
namespace NDetail {
////////////////////////////////////////////////////////////////////////////////
// Builds a Skiff schema for |tables| unless the operation spec contains
// "input_query".
//
// With "input_query" in the spec the function returns nullptr, and throws
// TApiUsageError if Skiff was requested explicitly through |nodeReaderFormat|.
// Otherwise the decision is delegated to CreateSkiffSchemaIfNecessary with
// key switch and range index enabled.
NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
    const TClientContext& context,
    const IClientRetryPolicyPtr& clientRetryPolicy,
    const TTransactionId& transactionId,
    const TVector<TRichYPath>& tables,
    const TOperationOptions& options,
    ENodeReaderFormat nodeReaderFormat)
{
    const auto& spec = options.Spec_;
    const bool specHasInputQuery =
        spec.Defined() && spec->IsMap() && spec->HasKey("input_query");

    if (!specHasInputQuery) {
        return CreateSkiffSchemaIfNecessary(
            context,
            clientRetryPolicy,
            transactionId,
            nodeReaderFormat,
            tables,
            TCreateSkiffSchemaOptions()
                .HasKeySwitch(true)
                .HasRangeIndex(true));
    }

    Y_ENSURE_EX(nodeReaderFormat != ENodeReaderFormat::Skiff,
        TApiUsageError() << "Cannot use Skiff format for operations with 'input_query' in spec");
    return nullptr;
}
// Serializes |schema| through a YSON writer and returns the produced text.
TString CreateSkiffConfig(const NSkiff::TSkiffSchemaPtr& schema)
{
    TString serialized;
    TStringOutput output(serialized);
    ::NYson::TYsonWriter ysonWriter(&output);
    Serialize(schema, &ysonWriter);
    return serialized;
}
// Produces the proto config text: the full name of every descriptor in
// |descriptorList|, one per line, in list order.
TString CreateProtoConfig(const TVector<const ::google::protobuf::Descriptor*>& descriptorList)
{
    TString config;
    TStringOutput lines(config);
    for (const auto* messageDescriptor : descriptorList) {
        lines << messageDescriptor->full_name() << Endl;
    }
    return config;
}
////////////////////////////////////////////////////////////////////////////////
// Visitor producing a human-readable description of a table structure.
//
// - TUnspecifiedTableStructure -> "Unspecified"
// - TProtobufTableStructure    -> "<message full name> protobuf message",
//                                 or "<unknown> protobuf message" when the
//                                 descriptor pointer is null.
// Any other alternative is rejected at compile time.
struct TGetTableStructureDescriptionStringImpl {
    template<typename T>
    TString operator()(const T& description) {
        if constexpr (std::is_same_v<T, TUnspecifiedTableStructure>) {
            return "Unspecified";
        } else if constexpr (std::is_same_v<T, TProtobufTableStructure>) {
            TString res;
            // TStringOutput appends directly to |res|. A TStringStream built
            // from |res| would write into its own private copy, leaving |res|
            // (and therefore the returned value) empty.
            TStringOutput out(res);
            if (description.Descriptor) {
                out << description.Descriptor->full_name();
            } else {
                out << "<unknown>";
            }
            out << " protobuf message";
            return res;
        } else {
            static_assert(TDependentFalse<T>, "Unknown type");
        }
    }
};
// Returns a printable description of |tableStructure| by dispatching on the
// alternative currently held by the variant.
TString GetTableStructureDescriptionString(const TTableStructure& tableStructure)
{
    TGetTableStructureDescriptionStringImpl describer;
    return std::visit(describer, tableStructure);
}
////////////////////////////////////////////////////////////////////////////////
// Returns the path of |jobTable| for use in messages; tables without a path
// are rendered as "<intermediate-table>".
TString JobTablePathString(const TStructuredJobTable& jobTable)
{
    if (!jobTable.RichYPath) {
        return "<intermediate-table>";
    }
    return jobTable.RichYPath->Path_;
}
// Converts user-facing table paths into job tables, copying each entry's
// description and path as-is (no canonization is performed here).
TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList)
{
    TStructuredJobTableList jobTables;
    for (const auto& entry : tableList) {
        jobTables.emplace_back(TStructuredJobTable{entry.Description, entry.RichYPath});
    }
    return jobTables;
}
// Converts user-facing table paths into job tables, replacing every path with
// the result of NRawClient::CanonizeYPaths. Descriptions are kept unchanged
// and the output order matches |tableList|.
TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList)
{
    TVector<TRichYPath> rawPaths;
    rawPaths.reserve(tableList.size());
    for (const auto& entry : tableList) {
        rawPaths.emplace_back(entry.RichYPath);
    }

    const auto canonized = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, context, rawPaths);
    // One canonized path is expected per input table.
    Y_VERIFY(canonized.size() == tableList.size());

    TStructuredJobTableList jobTables;
    jobTables.reserve(tableList.size());
    size_t position = 0;
    for (const auto& canonicalPath : canonized) {
        jobTables.emplace_back(TStructuredJobTable{tableList[position].Description, canonicalPath});
        ++position;
    }
    return jobTables;
}
// Extracts the rich paths of |tableList|, filling in a schema for every path
// that does not already have one.
//
// Schema source, in priority order:
//   1. the non-empty entry of |jobSchemaInferenceResult| for that table;
//   2. the table description (GetTableSchema), if |inferSchemaFromDescriptions|.
// Paths that already carry a schema are left untouched.
// Every table must have a path; |jobSchemaInferenceResult|, when given, must
// have exactly one entry per table.
TVector<TRichYPath> GetPathList(
    const TStructuredJobTableList& tableList,
    const TMaybe<TVector<TTableSchema>>& jobSchemaInferenceResult,
    bool inferSchemaFromDescriptions)
{
    Y_VERIFY(!jobSchemaInferenceResult || tableList.size() == jobSchemaInferenceResult->size());

    const auto inferSchemaFor = [&] (const TStructuredJobTable& jobTable, ui32 index) -> TMaybe<TTableSchema> {
        if (jobSchemaInferenceResult) {
            const auto& inferredByJob = jobSchemaInferenceResult->at(index);
            if (!inferredByJob.Empty()) {
                return inferredByJob;
            }
        }
        if (!inferSchemaFromDescriptions) {
            return Nothing();
        }
        return GetTableSchema(jobTable.Description);
    };

    TVector<TRichYPath> paths;
    paths.reserve(tableList.size());

    size_t tableIndex = 0;
    for (const auto& table : tableList) {
        Y_VERIFY(table.RichYPath, "Cannot get path for intermediate table");

        TRichYPath path = *table.RichYPath;
        if (!path.Schema_) {
            auto inferred = inferSchemaFor(table, tableIndex);
            if (inferred) {
                path.Schema(std::move(*inferred));
            }
        }
        paths.emplace_back(std::move(path));
        ++tableIndex;
    }
    return paths;
}
// Returns the job's input or output row stream description depending on
// |direction|; any other direction value is a fatal error.
TStructuredRowStreamDescription GetJobStreamDescription(
    const IStructuredJob& job,
    EIODirection direction)
{
    if (direction == EIODirection::Input) {
        return job.GetInputRowStreamDescription();
    }
    if (direction == EIODirection::Output) {
        return job.GetOutputRowStreamDescription();
    }
    Y_FAIL("unreachable");
}
// Maps a direction to the suffix appended to job file names:
// "_input" or "_output". Any other value is a fatal error.
TString GetSuffix(EIODirection direction)
{
    if (direction == EIODirection::Input) {
        return "_input";
    }
    if (direction == EIODirection::Output) {
        return "_output";
    }
    Y_FAIL("unreachable");
}
// Maps a direction to the spec method name mentioned in error messages:
// "AddInput<>" or "AddOutput<>". Any other value is a fatal error.
TString GetAddIOMethodName(EIODirection direction)
{
    if (direction == EIODirection::Input) {
        return "AddInput<>";
    }
    if (direction == EIODirection::Output) {
        return "AddOutput<>";
    }
    Y_FAIL("unreachable");
}
////////////////////////////////////////////////////////////////////////////////
// Visitor selecting the TFormatBuilder member function that builds the format
// for a given row stream description alternative. The argument value itself is
// ignored; only its static type matters. Unknown alternatives fail to compile.
struct TFormatBuilder::TFormatSwitcher
{
    template <typename TStream>
    auto operator() (const TStream&) {
        constexpr bool isNode = std::is_same_v<TStream, TTNodeStructuredRowStream>;
        constexpr bool isYamr = std::is_same_v<TStream, TTYaMRRowStructuredRowStream>;
        constexpr bool isProtobuf = std::is_same_v<TStream, TProtobufStructuredRowStream>;
        constexpr bool isVoid = std::is_same_v<TStream, TVoidStructuredRowStream>;

        if constexpr (isNode) {
            return &TFormatBuilder::CreateNodeFormat;
        } else if constexpr (isYamr) {
            return &TFormatBuilder::CreateYamrFormat;
        } else if constexpr (isProtobuf) {
            return &TFormatBuilder::CreateProtobufFormat;
        } else if constexpr (isVoid) {
            return &TFormatBuilder::CreateVoidFormat;
        } else {
            static_assert(TDependentFalse<TStream>, "unknown stream description");
        }
    }
};
// Constructs a format builder that keeps its own copies of the retry policy,
// client context, transaction id and operation options.
// Arguments are taken by value; all except the transaction id are moved into
// the corresponding members, the transaction id is copied.
TFormatBuilder::TFormatBuilder(
IClientRetryPolicyPtr clientRetryPolicy,
TClientContext context,
TTransactionId transactionId,
TOperationOptions operationOptions)
: ClientRetryPolicy_(std::move(clientRetryPolicy))
, Context_(std::move(context))
, TransactionId_(transactionId)
, OperationOptions_(std::move(operationOptions))
{ }
// Builds the format (and optional auxiliary job file) for one direction of
// |job|. The concrete builder is chosen from the job's row stream description
// for that direction; all arguments are forwarded to it unchanged.
std::pair <TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateFormat(
    const IStructuredJob& job,
    const EIODirection& direction,
    const TStructuredJobTableList& structuredTableList,
    const TMaybe <TFormatHints>& formatHints,
    ENodeReaderFormat nodeReaderFormat,
    bool allowFormatFromTableAttribute)
{
    const auto streamDescription = GetJobStreamDescription(job, direction);
    // Pointer to the Create*Format member matching the stream kind.
    const auto creator = std::visit(TFormatSwitcher(), streamDescription);

    return (this->*creator)(
        job,
        direction,
        structuredTableList,
        formatHints,
        nodeReaderFormat,
        allowFormatFromTableAttribute);
}
// Builder for void row streams: ignores all arguments and returns a
// default-constructed format with no job file.
std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateVoidFormat(
    const IStructuredJob& /*job*/,
    const EIODirection& /*direction*/,
    const TStructuredJobTableList& /*structuredTableList*/,
    const TMaybe<TFormatHints>& /*formatHints*/,
    ENodeReaderFormat /*nodeReaderFormat*/,
    bool /*allowFormatFromTableAttribute*/)
{
    return std::make_pair(TFormat(), TMaybe<TSmallJobFile>());
}
// Builder for YaMR row streams.
//
// Every table must have an unspecified structure description, otherwise
// TApiUsageError is thrown. When |allowFormatFromTableAttribute| and the
// UseTableFormats_ operation option are both set, the common format read from
// the tables' attributes is used if there is one. In all other cases the
// default "yamr" format (lenval, has_subkey, enable_table_index) is returned.
// No job file is ever produced.
std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateYamrFormat(
    const IStructuredJob& job,
    const EIODirection& direction,
    const TStructuredJobTableList& structuredTableList,
    const TMaybe<TFormatHints>& /*formatHints*/,
    ENodeReaderFormat /*nodeReaderFormat*/,
    bool allowFormatFromTableAttribute)
{
    for (const auto& table : structuredTableList) {
        const bool isUnspecified = std::holds_alternative<TUnspecifiedTableStructure>(table.Description);
        if (isUnspecified) {
            continue;
        }
        ythrow TApiUsageError()
            << "cannot use " << direction << " table '" << JobTablePathString(table)
            << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
            << "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table";
    }

    const bool consultTableAttributes =
        allowFormatFromTableAttribute && OperationOptions_.UseTableFormats_;
    if (consultTableAttributes) {
        TVector<TRichYPath> tableList;
        for (const auto& table : structuredTableList) {
            Y_VERIFY(table.RichYPath, "Cannot use format from table for intermediate table");
            tableList.push_back(*table.RichYPath);
        }

        if (auto attributeFormat = GetTableFormats(ClientRetryPolicy_, Context_, TransactionId_, tableList)) {
            return {
                TFormat(*attributeFormat),
                Nothing()
            };
        }
    }

    // No usable format in table attributes: fall back to the default one.
    TNode defaultFormat("yamr");
    defaultFormat.Attributes() = TNode()
        ("lenval", true)
        ("has_subkey", true)
        ("enable_table_index", true);
    return {
        TFormat(defaultFormat),
        Nothing()
    };
}
// Builder for TNode row streams.
//
// Every table must have an unspecified structure description, otherwise
// TApiUsageError is thrown. Unless |nodeReaderFormat| is Yson, a Skiff schema
// is attempted for the tables (all of which must then have a path).
//
// Returns:
//   - with a Skiff schema: the Skiff format with |formatHints| applied, plus
//     a "skiff_input"/"skiff_output" job file holding the serialized schema;
//   - otherwise: binary YSON with |formatHints| applied and no job file.
std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
    const IStructuredJob& job,
    const EIODirection& direction,
    const TStructuredJobTableList& structuredTableList,
    const TMaybe<TFormatHints>& formatHints,
    ENodeReaderFormat nodeReaderFormat,
    bool /*allowFormatFromTableAttribute*/)
{
    for (const auto& table: structuredTableList) {
        if (!std::holds_alternative<TUnspecifiedTableStructure>(table.Description)) {
            ythrow TApiUsageError()
                << "cannot use " << direction << " table '" << JobTablePathString(table)
                << "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
                << "table has unsupported structure description; check AddInput<> / AddOutput<> for this table";
        }
    }

    NSkiff::TSkiffSchemaPtr skiffSchema = nullptr;
    if (nodeReaderFormat != ENodeReaderFormat::Yson) {
        TVector<TRichYPath> tableList;
        for (const auto& table: structuredTableList) {
            Y_VERIFY(table.RichYPath, "Cannot use skiff with temporary tables");
            tableList.emplace_back(*table.RichYPath);
        }
        skiffSchema = TryCreateSkiffSchema(
            Context_,
            ClientRetryPolicy_,
            TransactionId_,
            tableList,
            OperationOptions_,
            nodeReaderFormat);
    }

    if (skiffSchema) {
        auto format = CreateSkiffFormat(skiffSchema);
        NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints);
        // Return the format the hints were applied to. Building a second
        // format from the schema here would silently drop the hints.
        return {
            std::move(format),
            TSmallJobFile{
                TString("skiff") + GetSuffix(direction),
                CreateSkiffConfig(skiffSchema)
            }
        };
    } else {
        auto format = TFormat::YsonBinary();
        NYT::NDetail::ApplyFormatHints<TNode>(&format, formatHints);
        return {
            format,
            Nothing()
        };
    }
}
[[noreturn]] static void ThrowUnsupportedStructureDescription(
const EIODirection& direction,
const TStructuredJobTable& table,
const IStructuredJob& job)
{
ythrow TApiUsageError()
<< "cannot use " << direction << " table '" << JobTablePathString(table)
<< "' with job " << TJobFactory::Get()->GetJobName(&job) << "; "
<< "table has unsupported structure description; check " << GetAddIOMethodName(direction) << " for this table";
}
[[noreturn]] static void ThrowTypeDeriveFail(
const EIODirection& direction,
const IStructuredJob& job,
const TString& type)
{
ythrow TApiUsageError()
<< "Cannot derive exact " << type << " type for intermediate " << direction << " table for job "
<< TJobFactory::Get()->GetJobName(&job)
<< "; use one of TMapReduceOperationSpec::Hint* methods to specifiy intermediate table structure";
}
[[noreturn]] static void ThrowUnexpectedDifferentDescriptors(
const EIODirection& direction,
const TStructuredJobTable& table,
const IStructuredJob& job,
const TMaybe<TStringBuf> jobDescriptorName,
const TMaybe<TStringBuf> descriptorName)
{
ythrow TApiUsageError()
<< "Job " << TJobFactory::Get()->GetJobName(&job) << " expects "
<< jobDescriptorName << " as " << direction << ", but table " << JobTablePathString(table)
<< " is tagged with " << descriptorName;
}
// Builder for protobuf row streams.
//
// With Config->UseClientProtobuf set, returns binary YSON together with a
// "proto_input"/"proto_output" job file built from an empty descriptor list.
//
// Otherwise one descriptor is resolved per table:
//   - a table with a protobuf structure uses its own descriptor;
//   - a table with a path but a non-protobuf structure is an API usage error;
//   - a table left without a descriptor must be an intermediate one (no path)
//     and inherits the job's descriptor, failing if the job has none.
// When the job declares a descriptor, every table descriptor must be the very
// same descriptor object. The result is the protobuf format over all resolved
// descriptors plus the matching job file. The table list must not be empty.
std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateProtobufFormat(
    const IStructuredJob& job,
    const EIODirection& direction,
    const TStructuredJobTableList& structuredTableList,
    const TMaybe<TFormatHints>& /*formatHints*/,
    ENodeReaderFormat /*nodeReaderFormat*/,
    bool /*allowFormatFromTableAttribute*/)
{
    if (Context_.Config->UseClientProtobuf) {
        return {
            TFormat::YsonBinary(),
            TSmallJobFile{
                TString("proto") + GetSuffix(direction),
                CreateProtoConfig({}),
            },
        };
    }

    const auto streamDescription = GetJobStreamDescription(job, direction);
    const ::google::protobuf::Descriptor* const jobDescriptor =
        std::get<TProtobufStructuredRowStream>(streamDescription).Descriptor;

    Y_ENSURE(!structuredTableList.empty(),
        "empty " << direction << " tables for job " << TJobFactory::Get()->GetJobName(&job));

    TVector<const ::google::protobuf::Descriptor*> descriptorList;
    for (const auto& table : structuredTableList) {
        const auto* protobufStructure = std::get_if<TProtobufTableStructure>(&table.Description);

        const ::google::protobuf::Descriptor* descriptor =
            protobufStructure ? protobufStructure->Descriptor : nullptr;
        if (!protobufStructure && table.RichYPath) {
            ThrowUnsupportedStructureDescription(direction, table, job);
        }

        if (descriptor == nullptr) {
            // It must be intermediate table, because there is no proper way to add such table to spec
            // (AddInput requires to specify proper message).
            Y_VERIFY(!table.RichYPath, "Descriptors for all tables except intermediate must be known");
            if (!jobDescriptor) {
                ThrowTypeDeriveFail(direction, job, "protobuf");
            }
            descriptor = jobDescriptor;
        }

        const bool differsFromJob = jobDescriptor && descriptor != jobDescriptor;
        if (differsFromJob) {
            ThrowUnexpectedDifferentDescriptors(
                direction,
                table,
                job,
                jobDescriptor->full_name(),
                descriptor->full_name());
        }

        descriptorList.push_back(descriptor);
    }

    Y_VERIFY(!descriptorList.empty(), "Messages for proto format are unknown (empty ProtoDescriptors)");
    return {
        TFormat::Protobuf(descriptorList, Context_.Config->ProtobufFormatWithDescriptors),
        TSmallJobFile{
            TString("proto") + GetSuffix(direction),
            CreateProtoConfig(descriptorList)
        },
    };
}
////////////////////////////////////////////////////////////////////////////////
// Visitor deriving a table schema from a table structure description.
//
// - TUnspecifiedTableStructure -> no schema;
// - TProtobufTableStructure    -> schema created from the descriptor, or no
//                                 schema when the descriptor pointer is null.
// Any other alternative is rejected at compile time.
struct TGetTableSchemaImpl
{
    template <typename TStructure>
    TMaybe<TTableSchema> operator() (const TStructure& description) {
        constexpr bool isUnspecified = std::is_same_v<TStructure, TUnspecifiedTableStructure>;
        constexpr bool isProtobuf = std::is_same_v<TStructure, TProtobufTableStructure>;

        if constexpr (isUnspecified) {
            return Nothing();
        } else if constexpr (isProtobuf) {
            if (description.Descriptor) {
                return CreateTableSchema(*description.Descriptor);
            }
            return Nothing();
        } else {
            static_assert(TDependentFalse<TStructure>, "unknown type");
        }
    }
};
// Returns the schema implied by |tableStructure|, or an empty TMaybe when the
// structure carries no schema information.
TMaybe<TTableSchema> GetTableSchema(const TTableStructure& tableStructure)
{
    TGetTableSchemaImpl schemaGetter;
    return std::visit(schemaGetter, tableStructure);
}
////////////////////////////////////////////////////////////////////////////////
} // namespace NDetail
} // namespace NYT