aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordave11ar <dave11ar@yandex-team.com>2024-09-20 02:59:18 +0300
committerdave11ar <dave11ar@yandex-team.com>2024-09-20 03:07:53 +0300
commite8df434071e6a63077b49df797d2fd05826eef26 (patch)
treea642971ef95bbda75edd72dddfe907173fc3e143
parent452cef334fbdb920e8180aeac12ae1fe8dbd3158 (diff)
downloadydb-e8df434071e6a63077b49df797d2fd05826eef26.tar.gz
YT-21274: Add dynamic tables versioned map reduce write
commit_hash:6ad6c8ea122b90ec0ea6a226b42fc82a47760de6
-rw-r--r--yt/yt/client/table_client/check_schema_compatibility.cpp9
-rw-r--r--yt/yt/client/table_client/check_schema_compatibility.h1
-rw-r--r--yt/yt/client/table_client/config.cpp2
-rw-r--r--yt/yt/client/table_client/config.h3
-rw-r--r--yt/yt/client/table_client/public.h2
-rw-r--r--yt/yt/client/table_client/schema.cpp7
-rw-r--r--yt/yt/client/table_client/versioned_io_options.cpp20
-rw-r--r--yt/yt/client/table_client/versioned_io_options.h18
-rw-r--r--yt/yt/client/ypath/rich.cpp6
-rw-r--r--yt/yt/client/ypath/rich.h3
-rw-r--r--yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto5
11 files changed, 75 insertions, 1 deletions
diff --git a/yt/yt/client/table_client/check_schema_compatibility.cpp b/yt/yt/client/table_client/check_schema_compatibility.cpp
index a2297bf05d..d2fd29b903 100644
--- a/yt/yt/client/table_client/check_schema_compatibility.cpp
+++ b/yt/yt/client/table_client/check_schema_compatibility.cpp
@@ -2,6 +2,7 @@
#include "logical_type.h"
#include "schema.h"
#include "comparator.h"
+#include "versioned_io_options.h"
#include <yt/yt/client/complex_types/check_type_compatibility.h>
@@ -27,6 +28,14 @@ std::pair<ESchemaCompatibility, TError> CheckTableSchemaCompatibilityImpl(
for (const auto& inputColumn : inputSchema.Columns()) {
if (!outputSchema.FindColumn(inputColumn.Name())) {
+ if (options.AllowTimestampColumns) {
+ if (auto originalColumnName = GetTimestampColumnOriginalNameOrNull(inputColumn.Name())) {
+ if (outputSchema.FindColumn(*originalColumnName)) {
+ continue;
+ }
+ }
+ }
+
return {
ESchemaCompatibility::Incompatible,
TError("Column %v is found in input schema but is missing in output schema",
diff --git a/yt/yt/client/table_client/check_schema_compatibility.h b/yt/yt/client/table_client/check_schema_compatibility.h
index 2fbb823ff0..78ebb088cc 100644
--- a/yt/yt/client/table_client/check_schema_compatibility.h
+++ b/yt/yt/client/table_client/check_schema_compatibility.h
@@ -11,6 +11,7 @@ struct TTableSchemaCompatibilityOptions
bool IgnoreSortOrder = false;
bool ForbidExtraComputedColumns = true;
bool IgnoreStableNamesDifference = false;
+ bool AllowTimestampColumns = false;
};
// Validates that values from table with inputSchema also match outputSchema.
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp
index a580572880..96ff8b5af6 100644
--- a/yt/yt/client/table_client/config.cpp
+++ b/yt/yt/client/table_client/config.cpp
@@ -467,6 +467,8 @@ void TChunkWriterOptions::Register(TRegistrar registrar)
registrar.Parameter("schema_modification", &TThis::SchemaModification)
.Default(ETableSchemaModification::None);
+ registrar.Parameter("versioned_write_options", &TThis::VersionedWriteOptions)
+ .Default();
registrar.Parameter("max_heavy_columns", &TThis::MaxHeavyColumns)
.Default(0);
diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h
index e1bc37043d..a8accf974b 100644
--- a/yt/yt/client/table_client/config.h
+++ b/yt/yt/client/table_client/config.h
@@ -1,6 +1,7 @@
#pragma once
#include "public.h"
+#include "versioned_io_options.h"
#include <yt/yt/client/chunk_client/config.h>
@@ -433,6 +434,8 @@ public:
ETableSchemaModification SchemaModification;
+ TVersionedWriteOptions VersionedWriteOptions;
+
EOptimizeFor OptimizeFor;
std::optional<NChunkClient::EChunkFormat> ChunkFormat;
NChunkClient::EChunkFormat GetEffectiveChunkFormat(bool versioned) const;
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index 45a241afff..e52eaecf48 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -47,6 +47,7 @@ class TColumnMetaExt;
class TVersionedRowDigestExt;
class TCompressionDictionaryExt;
class TVersionedReadOptions;
+class TVersionedWriteOptions;
} // namespace NProto
@@ -443,6 +444,7 @@ static_assert(sizeof(TDynamicTableKeyMask) * 8 == MaxKeyColumnCountInDynamicTabl
using TUUComparerSignature = int(const TUnversionedValue*, const TUnversionedValue*, int);
struct TVersionedReadOptions;
+struct TVersionedWriteOptions;
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/table_client/schema.cpp b/yt/yt/client/table_client/schema.cpp
index ac530ed758..f3762d0951 100644
--- a/yt/yt/client/table_client/schema.cpp
+++ b/yt/yt/client/table_client/schema.cpp
@@ -4,6 +4,7 @@
#include "comparator.h"
#include "logical_type.h"
#include "unversioned_row.h"
+#include "versioned_io_options.h"
#include <optional>
@@ -518,7 +519,11 @@ TColumnStableName TTableSchema::TNameMapping::NameToStableName(TStringBuf name)
auto* column = Schema_.FindColumn(name);
if (!column) {
if (Schema_.GetStrict()) {
- THROW_ERROR_EXCEPTION("No column with name %Qv in strict schema", name);
+ if (auto originalColumnName = GetTimestampColumnOriginalNameOrNull(name);
+ !originalColumnName || !Schema_.FindColumn(*originalColumnName))
+ {
+ THROW_ERROR_EXCEPTION("No column with name %Qv in strict schema", name);
+ }
}
return TColumnStableName(TString(name));
}
diff --git a/yt/yt/client/table_client/versioned_io_options.cpp b/yt/yt/client/table_client/versioned_io_options.cpp
index 69bd8fceff..7a067d8a96 100644
--- a/yt/yt/client/table_client/versioned_io_options.cpp
+++ b/yt/yt/client/table_client/versioned_io_options.cpp
@@ -12,6 +12,12 @@ void TVersionedReadOptions::Register(TRegistrar registrar)
.Default(EVersionedIOMode::Default);
}
+void TVersionedWriteOptions::Register(TRegistrar registrar) // Declares the YSON-configurable parameters of TVersionedWriteOptions.
+{
+ registrar.Parameter("write_mode", &TThis::WriteMode) // Binds the YSON key "write_mode" to the WriteMode field.
+ .Default(EVersionedIOMode::Default); // Registered default value: EVersionedIOMode::Default.
+}
+
void ToProto(
NProto::TVersionedReadOptions* protoOptions,
const TVersionedReadOptions& options)
@@ -26,6 +32,20 @@ void FromProto(
options->ReadMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.read_mode());
}
+void ToProto( // Fills the protobuf message from the in-memory versioned write options.
+ NProto::TVersionedWriteOptions* protoOptions,
+ const NTableClient::TVersionedWriteOptions& options)
+{
+ protoOptions->set_write_mode(static_cast<i32>(options.WriteMode)); // The enum is written as its i32 value into write_mode.
+}
+
+void FromProto( // Restores the in-memory versioned write options from the protobuf message.
+ NTableClient::TVersionedWriteOptions* options,
+ const NProto::TVersionedWriteOptions& protoOptions)
+{
+ options->WriteMode = CheckedEnumCast<EVersionedIOMode>(protoOptions.write_mode()); // Integer -> EVersionedIOMode; NOTE(review): CheckedEnumCast presumably rejects unknown values - confirm.
+}
+
std::optional<TString> GetTimestampColumnOriginalNameOrNull(TStringBuf name)
{
auto prefixEnd = name.begin() + ssize(TimestampColumnPrefix);
diff --git a/yt/yt/client/table_client/versioned_io_options.h b/yt/yt/client/table_client/versioned_io_options.h
index 6e1f2f7a7b..219c97a064 100644
--- a/yt/yt/client/table_client/versioned_io_options.h
+++ b/yt/yt/client/table_client/versioned_io_options.h
@@ -23,6 +23,16 @@ struct TVersionedReadOptions
static void Register(TRegistrar registrar);
};
+struct TVersionedWriteOptions // Options controlling versioned writes; a YSON-configurable lite struct.
+ : public NYTree::TYsonStructLite
+{
+ EVersionedIOMode WriteMode; // Versioned write mode; its default is assigned in Register().
+
+ REGISTER_YSON_STRUCT_LITE(TVersionedWriteOptions);
+
+ static void Register(TRegistrar registrar); // Declares the YSON parameters of this struct.
+};
+
void ToProto(
NProto::TVersionedReadOptions* protoOptions,
const NTableClient::TVersionedReadOptions& options);
@@ -31,6 +41,14 @@ void FromProto(
NTableClient::TVersionedReadOptions* options,
const NProto::TVersionedReadOptions& protoOptions);
+void ToProto( // Serializes versioned write options into their protobuf message.
+ NProto::TVersionedWriteOptions* protoOptions,
+ const NTableClient::TVersionedWriteOptions& options);
+
+void FromProto( // Deserializes versioned write options from their protobuf message.
+ NTableClient::TVersionedWriteOptions* options,
+ const NProto::TVersionedWriteOptions& protoOptions);
+
std::optional<TString> GetTimestampColumnOriginalNameOrNull(TStringBuf name);
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/yt/client/ypath/rich.cpp b/yt/yt/client/ypath/rich.cpp
index b275c2ea04..ddd386df64 100644
--- a/yt/yt/client/ypath/rich.cpp
+++ b/yt/yt/client/ypath/rich.cpp
@@ -686,6 +686,11 @@ TVersionedReadOptions TRichYPath::GetVersionedReadOptions() const
return GetAttribute(*this, "versioned_read_options", TVersionedReadOptions());
}
+TVersionedWriteOptions TRichYPath::GetVersionedWriteOptions() const // Returns the "versioned_write_options" attribute of this path.
+{
+ return GetAttribute(*this, "versioned_write_options", TVersionedWriteOptions()); // NOTE(review): the default-constructed third argument is presumably the fallback when the attribute is absent - confirm against GetAttribute.
+}
+
////////////////////////////////////////////////////////////////////////////////
TString ConvertToString(const TRichYPath& path, EYsonFormat ysonFormat)
@@ -787,6 +792,7 @@ const std::vector<TString>& GetWellKnownRichYPathAttributes()
"create",
"read_via_exec_node",
"versioned_read_options",
+ "versioned_write_options",
};
return WellKnownAttributes;
}
diff --git a/yt/yt/client/ypath/rich.h b/yt/yt/client/ypath/rich.h
index 0e483b4838..b6d7d60ccf 100644
--- a/yt/yt/client/ypath/rich.h
+++ b/yt/yt/client/ypath/rich.h
@@ -170,6 +170,9 @@ public:
// "versioned_read_options"
NTableClient::TVersionedReadOptions GetVersionedReadOptions() const;
+ // "versioned_write_options"
+ NTableClient::TVersionedWriteOptions GetVersionedWriteOptions() const;
+
private:
TYPath Path_;
NYTree::IAttributeDictionaryPtr Attributes_;
diff --git a/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto b/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto
index 53e1ada19e..9172331d81 100644
--- a/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto
+++ b/yt/yt_proto/yt/client/table_client/proto/versioned_io_options.proto
@@ -9,4 +9,9 @@ message TVersionedReadOptions
required int32 read_mode = 1; // EVersionedIOMode
}
+message TVersionedWriteOptions // Protobuf counterpart of NTableClient::TVersionedWriteOptions.
+{
+ required int32 write_mode = 1; // EVersionedIOMode, stored as its integer value.
+}
+
////////////////////////////////////////////////////////////////////////////////