diff options
| author | arkady-e1ppa <[email protected]> | 2024-10-15 17:23:48 +0300 | 
|---|---|---|
| committer | arkady-e1ppa <[email protected]> | 2024-10-15 17:35:30 +0300 | 
| commit | bd5483a72244773dcf2a54704e89540322816908 (patch) | |
| tree | 51d2c35d4d777daee1d123fdb1728bcb080501f3 | |
| parent | 337e125ccd63e222f75ff7290c8a1a1205024cde (diff) | |
YT-22307: Drop include dependency client -on-> ytlib
commit_hash:ccc25ebeed4973b3f8f133835e308fad69589169
| -rw-r--r-- | yt/yt/client/api/distributed_table_session.h | 2 | ||||
| -rw-r--r-- | yt/yt/client/chunk_client/public.h | 6 | ||||
| -rw-r--r-- | yt/yt/client/table_client/table_upload_options.cpp | 389 | ||||
| -rw-r--r-- | yt/yt/client/table_client/table_upload_options.h | 87 | ||||
| -rw-r--r-- | yt/yt/client/ya.make | 1 | 
5 files changed, 484 insertions, 1 deletions
diff --git a/yt/yt/client/api/distributed_table_session.h b/yt/yt/client/api/distributed_table_session.h index 1932284206e..c8c074c84c6 100644 --- a/yt/yt/client/api/distributed_table_session.h +++ b/yt/yt/client/api/distributed_table_session.h @@ -2,7 +2,7 @@  #include "public.h" -#include <yt/yt/ytlib/table_client/table_upload_options.h> +#include <yt/yt/client/table_client/table_upload_options.h>  #include <yt/yt/client/chunk_client/public.h> diff --git a/yt/yt/client/chunk_client/public.h b/yt/yt/client/chunk_client/public.h index 720aeed8060..5c93ee55fd6 100644 --- a/yt/yt/client/chunk_client/public.h +++ b/yt/yt/client/chunk_client/public.h @@ -91,6 +91,12 @@ YT_DEFINE_ERROR_ENUM(      ((ForbiddenErasureCodec)                 (762))  ); +DEFINE_ENUM(EUpdateMode, +    ((None)                     (0)) +    ((Append)                   (1)) +    ((Overwrite)                (2)) +); +  using TChunkId = NObjectClient::TObjectId;  extern const TChunkId NullChunkId; diff --git a/yt/yt/client/table_client/table_upload_options.cpp b/yt/yt/client/table_client/table_upload_options.cpp new file mode 100644 index 00000000000..6e3bddcf266 --- /dev/null +++ b/yt/yt/client/table_client/table_upload_options.cpp @@ -0,0 +1,389 @@ +#include "table_upload_options.h" +#include "helpers.h" + +#include <yt/yt/client/table_client/helpers.h> + +#include <yt/yt/client/ypath/rich.h> + +#include <yt/yt/core/ytree/helpers.h> + +namespace NYT::NTableClient { + +using namespace NChunkClient; +using namespace NCompression; +using namespace NCypressClient; +using namespace NYPath; +using namespace NYTree; + +//////////////////////////////////////////////////////////////////////////////// + +TEpochSchema::TEpochSchema(const TEpochSchema& other) +{ +    *this = other; +} + +TEpochSchema& TEpochSchema::operator=(const TEpochSchema& other) +{ +    TableSchema_ = other.TableSchema_; +    Revision_ += other.Revision_ + 1; +    return *this; +} + +TEpochSchema& TEpochSchema::operator=(TTableSchemaPtr schema) +{ +    Set(schema); +    return *this; +} + +const TTableSchema* TEpochSchema::operator->() const +{ +    return TableSchema_.Get(); +} + +const TTableSchemaPtr& TEpochSchema::operator*() const +{ +    return TableSchema_; +} + +const TTableSchemaPtr& TEpochSchema::Get() const +{ +    return TableSchema_; +} + +ui64 TEpochSchema::GetRevision() const +{ +    return Revision_; +} + +ui64 TEpochSchema::Set(const TTableSchemaPtr& schema) +{ +    TableSchema_ = schema; +    return ++Revision_; +} + +void TEpochSchema::Persist(const NPhoenix::TPersistenceContext& context) +{ +    using NYT::Persist; + +    Persist(context, Revision_); +    Persist<TNonNullableIntrusivePtrSerializer<>>(context, TableSchema_); +} + +ui64 TEpochSchema::Reset() +{ +    TableSchema_ = New<TTableSchema>(); +    return ++Revision_; +} + +//////////////////////////////////////////////////////////////////////////////// + +TTableSchemaPtr TTableUploadOptions::GetUploadSchema() const +{ +    switch (SchemaModification) { +        case ETableSchemaModification::None: +            return TableSchema.Get(); + +        case ETableSchemaModification::UnversionedUpdate: +            return TableSchema->ToUnversionedUpdate(/*sorted*/ true); + +        default: +            YT_ABORT(); +    } +} + +void TTableUploadOptions::Persist(const NPhoenix::TPersistenceContext& context) +{ +    using NYT::Persist; + +    Persist(context, UpdateMode); +    Persist(context, LockMode); +    // COMPAT(h0pless): NControllerAgent::ESnapshotVersion::AddChunkSchemas +    if (context.GetVersion() >= 301300) { +        Persist(context, TableSchema); +    } else { +        TTableSchemaPtr schema; +        Persist<TNonNullableIntrusivePtrSerializer<>>(context, schema); +        TableSchema.Set(schema); +    } +    Persist(context, SchemaId); +    Persist(context, SchemaModification); +    // COMPAT(dave11ar): NControllerAgent::ESnapshotVersion::VersionedMapReduceWrite +    if (context.GetVersion() >= 301602) { +        Persist(context, VersionedWriteOptions); +    } +    Persist(context, SchemaMode); +    Persist(context, OptimizeFor); +    // COMPAT(babenko): NControllerAgent::ESnapshotVersion::ChunkFormat +    if (context.GetVersion() >= 301103) { +        Persist(context, ChunkFormat); +    } +    Persist(context, CompressionCodec); +    Persist(context, ErasureCodec); +    Persist(context, EnableStripedErasure); +    Persist(context, SecurityTags); +    Persist(context, PartiallySorted); +} + +//////////////////////////////////////////////////////////////////////////////// + +static void ValidateSortColumnsEqual(const TSortColumns& sortColumns, const TTableSchema& schema) +{ +    if (sortColumns != schema.GetSortColumns()) { +        THROW_ERROR_EXCEPTION("YPath attribute \"sorted_by\" must be compatible with table schema for a \"strong\" schema mode") +            << TErrorAttribute("sort_columns", sortColumns) +            << TErrorAttribute("table_schema", schema); +    } +} + +static void ValidateAppendKeyColumns(const TSortColumns& sortColumns, const TTableSchema& schema, i64 rowCount) +{ +    ValidateSortColumns(sortColumns); + +    if (rowCount == 0) { +        return; +    } + +    auto tableSortColumns = schema.GetSortColumns(); +    bool areKeyColumnsCompatible = true; +    if (tableSortColumns.size() < sortColumns.size()) { +        areKeyColumnsCompatible = false; +    } else { +        for (int i = 0; i < std::ssize(sortColumns); ++i) { +            if (tableSortColumns[i] != sortColumns[i]) { +                areKeyColumnsCompatible = false; +                break; +            } +        } +    } + +    if (!areKeyColumnsCompatible) { +        THROW_ERROR_EXCEPTION("Sort columns mismatch while trying to append sorted data into a non-empty table") +            << TErrorAttribute("append_sort_columns", sortColumns) +            << TErrorAttribute("table_sort_columns", tableSortColumns); +    } +} + +const std::vector<TString>& GetTableUploadOptionsAttributeKeys() +{ +    static const std::vector<TString> Result{ +        "schema_mode", +        "optimize_for", +        "chunk_format", +        "compression_codec", +        "erasure_codec", +        "enable_striped_erasure", +        "dynamic" +    }; +    return Result; +} + +TTableUploadOptions GetTableUploadOptions( +    const TRichYPath& path, +    const IAttributeDictionary& cypressTableAttributes, +    const TTableSchemaPtr& schema, +    i64 rowCount) +{ +    auto schemaMode = cypressTableAttributes.Get<ETableSchemaMode>("schema_mode"); +    auto optimizeFor = cypressTableAttributes.Get<EOptimizeFor>("optimize_for"); +    auto chunkFormat = cypressTableAttributes.Find<EChunkFormat>("chunk_format"); +    auto compressionCodec = cypressTableAttributes.Get<NCompression::ECodec>("compression_codec"); +    auto erasureCodec = cypressTableAttributes.Get<NErasure::ECodec>("erasure_codec", NErasure::ECodec::None); +    auto enableStripedErasure = cypressTableAttributes.Get<bool>("enable_striped_erasure", false); +    auto dynamic = cypressTableAttributes.Get<bool>("dynamic"); + +    // Validate "optimize_for" and "chunk_format" compatibility. +    if (chunkFormat) { +        ValidateTableChunkFormatAndOptimizeFor(*chunkFormat, optimizeFor); +    } + +    // Some ypath attributes are not compatible with attribute "schema". +    if (path.GetAppend() && path.GetSchema()) { +        THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"schema\" are not compatible") +            << TErrorAttribute("path", path); +    } + +    if (!path.GetSortedBy().empty() && path.GetSchema()) { +        THROW_ERROR_EXCEPTION("YPath attributes \"sorted_by\" and \"schema\" are not compatible") +            << TErrorAttribute("path", path); +    } + +    // Dynamic tables have their own requirements as well. +    if (dynamic) { +        if (path.GetSchema()) { +            THROW_ERROR_EXCEPTION("YPath attribute \"schema\" cannot be set on a dynamic table") +                << TErrorAttribute("path", path); +        } + +        if (!path.GetSortedBy().empty()) { +            THROW_ERROR_EXCEPTION("YPath attribute \"sorted_by\" cannot be set on a dynamic table") +                << TErrorAttribute("path", path); +        } +    } + +    TTableUploadOptions result; +    // NB: Saving schema to make sure that if changes are applied to it the schema revision also changes. +    result.TableSchema = schema; +    auto pathSchema = path.GetSchema(); +    if (path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { +        ValidateSortColumnsEqual(path.GetSortedBy(), *schema); + +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Append; +        result.SchemaMode = ETableSchemaMode::Strong; +    } else if (path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { +        // Old behaviour. +        ValidateAppendKeyColumns(path.GetSortedBy(), *schema, rowCount); + +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Append; +        result.SchemaMode = ETableSchemaMode::Weak; +        result.TableSchema = TTableSchema::FromSortColumns(path.GetSortedBy()); +    } else if (path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { +        result.LockMode = (schema->IsSorted() && !dynamic) ? ELockMode::Exclusive : ELockMode::Shared; +        result.UpdateMode = EUpdateMode::Append; +        result.SchemaMode = ETableSchemaMode::Strong; +    } else if (path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { +        // Old behaviour - reset key columns if there were any. +        result.LockMode = ELockMode::Shared; +        result.UpdateMode = EUpdateMode::Append; +        result.SchemaMode = ETableSchemaMode::Weak; +        result.TableSchema.Reset(); +    } else if (!path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { +        ValidateSortColumnsEqual(path.GetSortedBy(), *schema); + +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Overwrite; +        result.SchemaMode = ETableSchemaMode::Strong; +    } else if (!path.GetAppend() && !path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Overwrite; +        result.SchemaMode = ETableSchemaMode::Weak; +        result.TableSchema = TTableSchema::FromSortColumns(path.GetSortedBy()); +    } else if (!path.GetAppend() && pathSchema && (schemaMode == ETableSchemaMode::Strong)) { +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Overwrite; +        result.SchemaMode = ETableSchemaMode::Strong; +        result.TableSchema = pathSchema; +    } else if (!path.GetAppend() && pathSchema && (schemaMode == ETableSchemaMode::Weak)) { +        // Change from Weak to Strong schema mode. +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Overwrite; +        result.SchemaMode = ETableSchemaMode::Strong; +        result.TableSchema = pathSchema; +    } else if (!path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Strong)) { +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Overwrite; +        result.SchemaMode = ETableSchemaMode::Strong; +    } else if (!path.GetAppend() && path.GetSortedBy().empty() && (schemaMode == ETableSchemaMode::Weak)) { +        result.LockMode = ELockMode::Exclusive; +        result.UpdateMode = EUpdateMode::Overwrite; +        result.SchemaMode = ETableSchemaMode::Weak; +        result.TableSchema.Reset(); +    } else { +        // Do not use YT_ABORT here, since this code is executed inside scheduler. +        THROW_ERROR_EXCEPTION("Failed to define upload parameters") +            << TErrorAttribute("path", path) +            << TErrorAttribute("schema_mode", schemaMode) +            << TErrorAttribute("schema", *schema); +    } + +    if (path.GetAppend() && path.GetOptimizeFor()) { +        THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"optimize_for\" are not compatible") +            << TErrorAttribute("path", path); +    } + +    result.OptimizeFor = path.GetOptimizeFor() ? *path.GetOptimizeFor() : optimizeFor; +    result.ChunkFormat = path.GetChunkFormat() ? *path.GetChunkFormat() : chunkFormat; + +    if (path.GetAppend() && path.GetCompressionCodec()) { +        THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"compression_codec\" are not compatible") +            << TErrorAttribute("path", path); +    } + +    if (path.GetCompressionCodec()) { +        result.CompressionCodec = *path.GetCompressionCodec(); +    } else { +        result.CompressionCodec = compressionCodec; +    } + +    if (path.GetAppend() && path.GetErasureCodec()) { +        THROW_ERROR_EXCEPTION("YPath attributes \"append\" and \"erasure_codec\" are not compatible") +            << TErrorAttribute("path", path); +    } + +    result.ErasureCodec = path.GetErasureCodec().value_or(erasureCodec); +    result.EnableStripedErasure = enableStripedErasure; + +    if (path.GetSchemaModification() == ETableSchemaModification::UnversionedUpdateUnsorted) { +        THROW_ERROR_EXCEPTION("YPath attribute \"schema_modification\" cannot have value %Qlv for output tables", +            path.GetSchemaModification()) +            << TErrorAttribute("path", path); +    } else if (!dynamic && path.GetSchemaModification() != ETableSchemaModification::None) { +        THROW_ERROR_EXCEPTION("YPath attribute \"schema_modification\" can have value %Qlv only for dynamic tables", +            path.GetSchemaModification()) +            << TErrorAttribute("path", path); +    } +    result.SchemaModification = path.GetSchemaModification(); + +    auto versionedWriteOptions = path.GetVersionedWriteOptions(); +    if (!dynamic && versionedWriteOptions.WriteMode != EVersionedIOMode::Default) { +        THROW_ERROR_EXCEPTION("YPath attribute \"versioned_write_options/write_mode\" can have value %Qlv only for dynamic tables", +            versionedWriteOptions.WriteMode) +            << TErrorAttribute("path", path); +    } +    if (versionedWriteOptions.WriteMode != EVersionedIOMode::Default && path.GetSchemaModification() != ETableSchemaModification::None) { +        THROW_ERROR_EXCEPTION("YPath attributes \"versioned_write_options/write_mode\" and \"schema_modification\"" +            "can not be set in non-trivial state together: \"versioned_write_options/write_mode\" is %Qlv, \"schema_modification\" is %Qlv", +            versionedWriteOptions.WriteMode, +            path.GetSchemaModification()) +            << TErrorAttribute("path", path); +    } +    result.VersionedWriteOptions = versionedWriteOptions; + +    if (!dynamic && path.GetPartiallySorted()) { +        THROW_ERROR_EXCEPTION("YPath attribute \"partially_sorted\" can be set only for dynamic tables") +            << TErrorAttribute("path", path); +    } +    result.PartiallySorted = path.GetPartiallySorted(); + +    result.SecurityTags = path.GetSecurityTags(); + +    return result; +} + +TTableUploadOptions GetFileUploadOptions( +    const TRichYPath& path, +    const IAttributeDictionary& cypressTableAttributes) +{ +    auto compressionCodec = cypressTableAttributes.Get<NCompression::ECodec>("compression_codec"); +    auto enableStripedErasure = cypressTableAttributes.Get<bool>("enable_striped_erasure", false); +    auto erasureCodec = cypressTableAttributes.Get<NErasure::ECodec>("erasure_codec", NErasure::ECodec::None); + +    TTableUploadOptions result; + +    if (path.GetAppend()) { +        THROW_ERROR_EXCEPTION("Attribute \"append\" is not supported for files") +            << TErrorAttribute("path", path); +    } + +    // NB(coteeq): Fill for sanity. They should not have impact on behaviour, because +    //             RichYPath's compression_codec & erasure_codec are disallowed in remote copy. +    // TODO(coteeq): Make it YT_VERIFY +    if (path.GetCompressionCodec() || path.GetErasureCodec()) { +        THROW_ERROR_EXCEPTION("\"compression_codec\" and \"erasure_codec\" are disallowed for files") +            << TErrorAttribute("path", path); +    } + +    result.CompressionCodec = compressionCodec; +    result.ErasureCodec = erasureCodec; +    result.EnableStripedErasure = enableStripedErasure; +    result.SecurityTags = path.GetSecurityTags(); +    result.LockMode = ELockMode::Exclusive; +    result.UpdateMode = EUpdateMode::Overwrite; + +    return result; +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/table_upload_options.h b/yt/yt/client/table_client/table_upload_options.h new file mode 100644 index 00000000000..91bfa5fc874 --- /dev/null +++ b/yt/yt/client/table_client/table_upload_options.h @@ -0,0 +1,87 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/chunk_client/public.h> + +#include <yt/yt/client/table_client/schema.h> +#include <yt/yt/client/table_client/versioned_io_options.h> + +#include <yt/yt/client/security_client/public.h> + +#include <yt/yt/library/erasure/public.h> + +#include <yt/yt/core/compression/public.h> + +#include <yt/yt/core/misc/phoenix.h> + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +class TEpochSchema +{ +public: +    TEpochSchema() = default; +    TEpochSchema(const TEpochSchema& other); +    TEpochSchema& operator=(const TEpochSchema& other); + +    TEpochSchema(const TEpochSchema&& other) = delete; +    TEpochSchema& operator=(TEpochSchema&& other) = delete; + +    TEpochSchema& operator=(TTableSchemaPtr schema); + +    const TTableSchema* operator->() const; +    const TTableSchemaPtr& operator*() const; + +    const TTableSchemaPtr& Get() const; +    ui64 GetRevision() const; + +    ui64 Set(const TTableSchemaPtr& schema); + +    void Persist(const NPhoenix::TPersistenceContext& context); + +    ui64 Reset(); + +private: +    TTableSchemaPtr TableSchema_ = New<TTableSchema>(); +    ui64 Revision_ = 0; +}; + +struct TTableUploadOptions +{ +    NChunkClient::EUpdateMode UpdateMode; +    NCypressClient::ELockMode LockMode; +    TEpochSchema TableSchema; +    TMasterTableSchemaId SchemaId; +    ETableSchemaModification SchemaModification; +    TVersionedWriteOptions VersionedWriteOptions; +    ETableSchemaMode SchemaMode; +    EOptimizeFor OptimizeFor; +    std::optional<NChunkClient::EChunkFormat> ChunkFormat; +    NCompression::ECodec CompressionCodec; +    NErasure::ECodec ErasureCodec; +    bool EnableStripedErasure; +    std::optional<std::vector<NSecurityClient::TSecurityTag>> SecurityTags; +    bool PartiallySorted; + +    TTableSchemaPtr GetUploadSchema() const; + +    void Persist(const NPhoenix::TPersistenceContext& context); +}; + +const std::vector<TString>& GetTableUploadOptionsAttributeKeys(); + +TTableUploadOptions GetTableUploadOptions( +    const NYPath::TRichYPath& path, +    const NYTree::IAttributeDictionary& cypressTableAttributes, +    const TTableSchemaPtr& schema, +    i64 rowCount); + +TTableUploadOptions GetFileUploadOptions( +    const NYPath::TRichYPath& path, +    const NYTree::IAttributeDictionary& cypressTableAttributes); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index 9e468fbf8d8..a09ba33382a 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -127,6 +127,7 @@ SRCS(      table_client/schemaless_buffered_dynamic_table_writer.cpp      table_client/schemaless_dynamic_table_writer.cpp      table_client/serialize.cpp +    table_client/table_upload_options.cpp      table_client/logical_type.cpp      table_client/merge_table_schemas.cpp      table_client/name_table.cpp  | 
