diff options
author | dave11ar <dave11ar@yandex-team.com> | 2024-09-20 02:59:18 +0300 |
---|---|---|
committer | dave11ar <dave11ar@yandex-team.com> | 2024-09-20 03:07:53 +0300 |
commit | e8df434071e6a63077b49df797d2fd05826eef26 (patch) | |
tree | a642971ef95bbda75edd72dddfe907173fc3e143 | |
parent | 452cef334fbdb920e8180aeac12ae1fe8dbd3158 (diff) | |
download | ydb-e8df434071e6a63077b49df797d2fd05826eef26.tar.gz |
YT-21274: Add dynamic tables versioned map reduce write
commit_hash:6ad6c8ea122b90ec0ea6a226b42fc82a47760de6
-rw-r--r-- | yt/yt/client/table_client/check_schema_compatibility.cpp | 9 | ||||
-rw-r--r-- | yt/yt/client/table_client/check_schema_compatibility.h | 1 | ||||
-rw-r--r-- | yt/yt/client/table_client/config.cpp | 2 | ||||
-rw-r--r-- | yt/yt/client/table_client/config.h | 3 | ||||
-rw-r--r-- | yt/yt/client/table_client/public.h | 2 | ||||
-rw-r--r-- | yt/yt/client/table_client/schema.cpp | 7 | ||||
-rw-r--r-- | yt/yt/client/table_client/versioned_io_options.cpp | 20 | ||||
-rw-r--r-- | yt/yt/client/table_client/versioned_io_options.h | 18 | ||||
-rw-r--r-- | yt/yt/client/ypath/rich.cpp | 6 | ||||
-rw-r--r-- | yt/yt/client/ypath/rich.h | 3 | ||||
-rw-r--r-- | yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto | 5 |
11 files changed, 75 insertions, 1 deletion
diff --git a/yt/yt/client/table_client/check_schema_compatibility.cpp b/yt/yt/client/table_client/check_schema_compatibility.cpp index a2297bf05d..d2fd29b903 100644 --- a/yt/yt/client/table_client/check_schema_compatibility.cpp +++ b/yt/yt/client/table_client/check_schema_compatibility.cpp @@ -2,6 +2,7 @@ #include "logical_type.h" #include "schema.h" #include "comparator.h" +#include "versioned_io_options.h" #include <yt/yt/client/complex_types/check_type_compatibility.h> @@ -27,6 +28,14 @@ std::pair<ESchemaCompatibility, TError> CheckTableSchemaCompatibilityImpl( for (const auto& inputColumn : inputSchema.Columns()) { if (!outputSchema.FindColumn(inputColumn.Name())) { + if (options.AllowTimestampColumns) { + if (auto originalColumnName = GetTimestampColumnOriginalNameOrNull(inputColumn.Name())) { + if (outputSchema.FindColumn(*originalColumnName)) { + continue; + } + } + } + return { ESchemaCompatibility::Incompatible, TError("Column %v is found in input schema but is missing in output schema", diff --git a/yt/yt/client/table_client/check_schema_compatibility.h b/yt/yt/client/table_client/check_schema_compatibility.h index 2fbb823ff0..78ebb088cc 100644 --- a/yt/yt/client/table_client/check_schema_compatibility.h +++ b/yt/yt/client/table_client/check_schema_compatibility.h @@ -11,6 +11,7 @@ struct TTableSchemaCompatibilityOptions bool IgnoreSortOrder = false; bool ForbidExtraComputedColumns = true; bool IgnoreStableNamesDifference = false; + bool AllowTimestampColumns = false; }; // Validates that values from table with inputSchema also match outputSchema. 
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp index a580572880..96ff8b5af6 100644 --- a/yt/yt/client/table_client/config.cpp +++ b/yt/yt/client/table_client/config.cpp @@ -467,6 +467,8 @@ void TChunkWriterOptions::Register(TRegistrar registrar) registrar.Parameter("schema_modification", &TThis::SchemaModification) .Default(ETableSchemaModification::None); + registrar.Parameter("versioned_write_options", &TThis::VersionedWriteOptions) + .Default(); registrar.Parameter("max_heavy_columns", &TThis::MaxHeavyColumns) .Default(0); diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h index e1bc37043d..a8accf974b 100644 --- a/yt/yt/client/table_client/config.h +++ b/yt/yt/client/table_client/config.h @@ -1,6 +1,7 @@ #pragma once #include "public.h" +#include "versioned_io_options.h" #include <yt/yt/client/chunk_client/config.h> @@ -433,6 +434,8 @@ public: ETableSchemaModification SchemaModification; + TVersionedWriteOptions VersionedWriteOptions; + EOptimizeFor OptimizeFor; std::optional<NChunkClient::EChunkFormat> ChunkFormat; NChunkClient::EChunkFormat GetEffectiveChunkFormat(bool versioned) const; diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index 45a241afff..e52eaecf48 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -47,6 +47,7 @@ class TColumnMetaExt; class TVersionedRowDigestExt; class TCompressionDictionaryExt; class TVersionedReadOptions; +class TVersionedWriteOptions; } // namespace NProto @@ -443,6 +444,7 @@ static_assert(sizeof(TDynamicTableKeyMask) * 8 == MaxKeyColumnCountInDynamicTabl using TUUComparerSignature = int(const TUnversionedValue*, const TUnversionedValue*, int); struct TVersionedReadOptions; +struct TVersionedWriteOptions; //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp 
index ac530ed758..f3762d0951 100644 --- a/yt/yt/client/table_client/schema.cpp +++ b/yt/yt/client/table_client/schema.cpp @@ -4,6 +4,7 @@ #include "comparator.h" #include "logical_type.h" #include "unversioned_row.h" +#include "versioned_io_options.h" #include <optional> @@ -518,7 +519,11 @@ TColumnStableName TTableSchema::TNameMapping::NameToStableName(TStringBuf name) auto* column = Schema_.FindColumn(name); if (!column) { if (Schema_.GetStrict()) { - THROW_ERROR_EXCEPTION("No column with name %Qv in strict schema", name); + if (auto originalColumnName = GetTimestampColumnOriginalNameOrNull(name); + !originalColumnName || !Schema_.FindColumn(*originalColumnName)) + { + THROW_ERROR_EXCEPTION("No column with name %Qv in strict schema", name); + } } return TColumnStableName(TString(name)); } diff --git a/yt/yt/client/table_client/versioned_io_options.cpp b/yt/yt/client/table_client/versioned_io_options.cpp index 69bd8fceff..7a067d8a96 100644 --- a/yt/yt/client/table_client/versioned_io_options.cpp +++ b/yt/yt/client/table_client/versioned_io_options.cpp @@ -12,6 +12,12 @@ void TVersionedReadOptions::Register(TRegistrar registrar) .Default(EVersionedIOMode::Default); } +void TVersionedWriteOptions::Register(TRegistrar registrar) +{ + registrar.Parameter("write_mode", &TThis::WriteMode) + .Default(EVersionedIOMode::Default); +} + void ToProto( NProto::TVersionedReadOptions* protoOptions, const TVersionedReadOptions& options) @@ -26,6 +32,20 @@ void FromProto( options->ReadMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.read_mode()); } +void ToProto( + NProto::TVersionedWriteOptions* protoOptions, + const NTableClient::TVersionedWriteOptions& options) +{ + protoOptions->set_write_mode(static_cast<i32>(options.WriteMode)); +} + +void FromProto( + NTableClient::TVersionedWriteOptions* options, + const NProto::TVersionedWriteOptions& protoOptions) +{ + options->WriteMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.write_mode()); +} + std::optional<TString> 
GetTimestampColumnOriginalNameOrNull(TStringBuf name) { auto prefixEnd = name.begin() + ssize(TimestampColumnPrefix); diff --git a/yt/yt/client/table_client/versioned_io_options.h b/yt/yt/client/table_client/versioned_io_options.h index 6e1f2f7a7b..219c97a064 100644 --- a/yt/yt/client/table_client/versioned_io_options.h +++ b/yt/yt/client/table_client/versioned_io_options.h @@ -23,6 +23,16 @@ struct TVersionedReadOptions static void Register(TRegistrar registrar); }; +struct TVersionedWriteOptions + : public NYTree::TYsonStructLite +{ + EVersionedIOMode WriteMode; + + REGISTER_YSON_STRUCT_LITE(TVersionedWriteOptions); + + static void Register(TRegistrar registrar); +}; + void ToProto( NProto::TVersionedReadOptions* protoOptions, const NTableClient::TVersionedReadOptions& options); @@ -31,6 +41,14 @@ void FromProto( NTableClient::TVersionedReadOptions* options, const NProto::TVersionedReadOptions& protoOptions); +void ToProto( + NProto::TVersionedWriteOptions* protoOptions, + const NTableClient::TVersionedWriteOptions& options); + +void FromProto( + NTableClient::TVersionedWriteOptions* options, + const NProto::TVersionedWriteOptions& protoOptions); + std::optional<TString> GetTimestampColumnOriginalNameOrNull(TStringBuf name); //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/yt/client/ypath/rich.cpp b/yt/yt/client/ypath/rich.cpp index b275c2ea04..ddd386df64 100644 --- a/yt/yt/client/ypath/rich.cpp +++ b/yt/yt/client/ypath/rich.cpp @@ -686,6 +686,11 @@ TVersionedReadOptions TRichYPath::GetVersionedReadOptions() const return GetAttribute(*this, "versioned_read_options", TVersionedReadOptions()); } +TVersionedWriteOptions TRichYPath::GetVersionedWriteOptions() const +{ + return GetAttribute(*this, "versioned_write_options", TVersionedWriteOptions()); +} + //////////////////////////////////////////////////////////////////////////////// TString ConvertToString(const TRichYPath& path, EYsonFormat ysonFormat) @@ -787,6 
+792,7 @@ const std::vector<TString>& GetWellKnownRichYPathAttributes() "create", "read_via_exec_node", "versioned_read_options", + "versioned_write_options", }; return WellKnownAttributes; } diff --git a/yt/yt/client/ypath/rich.h b/yt/yt/client/ypath/rich.h index 0e483b4838..b6d7d60ccf 100644 --- a/yt/yt/client/ypath/rich.h +++ b/yt/yt/client/ypath/rich.h @@ -170,6 +170,9 @@ public: // "versioned_read_options" NTableClient::TVersionedReadOptions GetVersionedReadOptions() const; + // "versioned_write_options" + NTableClient::TVersionedWriteOptions GetVersionedWriteOptions() const; + private: TYPath Path_; NYTree::IAttributeDictionaryPtr Attributes_; diff --git a/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto b/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto index 53e1ada19e..9172331d81 100644 --- a/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto +++ b/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto @@ -9,4 +9,9 @@ message TVersionedReadOptions required int32 read_mode = 1; // EVersionedIOMode } +message TVersionedWriteOptions +{ + required int32 write_mode = 1; // EVersionedIOMode +} + //////////////////////////////////////////////////////////////////////////////// |