diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-10 00:13:22 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-10 00:13:22 +0300 |
commit | 369c1c09940bf144ee472ea355968b6caec7435d (patch) | |
tree | 38a98af4d5c20638c2c2bb5bc0dbc0ec0648b71a | |
parent | a864adadcf849b49085a6c20aa414d535e25fcaf (diff) | |
download | ydb-369c1c09940bf144ee472ea355968b6caec7435d.tar.gz |
Compressed backups: public api, cpp sdk, cli KIKIMR-14803
ref:8a5142ea8d1f2e652c8d2c89ba32415fc774c522
-rw-r--r-- | ydb/core/grpc_services/rpc_export.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp | 6 | ||||
-rw-r--r-- | ydb/core/ydb_convert/CMakeLists.txt | 2 | ||||
-rw-r--r-- | ydb/core/ydb_convert/compression.cpp | 64 | ||||
-rw-r--r-- | ydb/core/ydb_convert/compression.h | 10 | ||||
-rw-r--r-- | ydb/core/ydb_convert/compression_ut.cpp | 39 | ||||
-rw-r--r-- | ydb/core/ydb_convert/ut/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/ut/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_export.proto | 5 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp | 11 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_export.h | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/print_operation.cpp | 3 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_export/export.cpp | 10 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_export/export.h | 7 |
14 files changed, 174 insertions, 11 deletions
diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp index 6b846bea5f..70243e8b40 100644 --- a/ydb/core/grpc_services/rpc_export.cpp +++ b/ydb/core/grpc_services/rpc_export.cpp @@ -6,6 +6,7 @@ #include <ydb/public/api/protos/ydb_export.pb.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> +#include <ydb/core/ydb_convert/compression.h> #include <library/cpp/actors/core/hfunc.h> @@ -42,11 +43,11 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, } auto& createExport = *ev->Record.MutableRequest(); - createExport.MutableOperationParams()->CopyFrom(request.operation_params()); - if (std::is_same_v<TEvRequest, TEvExportToYtRequest>) { - createExport.MutableExportToYtSettings()->CopyFrom(request.settings()); - } else if (std::is_same_v<TEvRequest, TEvExportToS3Request>) { - createExport.MutableExportToS3Settings()->CopyFrom(request.settings()); + *createExport.MutableOperationParams() = request.operation_params(); + if constexpr (std::is_same_v<TEvRequest, TEvExportToYtRequest>) { + *createExport.MutableExportToYtSettings() = request.settings(); + } else if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) { + *createExport.MutableExportToS3Settings() = request.settings(); } return ev.Release(); @@ -182,12 +183,22 @@ public: using TRpcOperationRequestActor<TDerived, TEvRequest, true>::TRpcOperationRequestActor; void Bootstrap(const TActorContext&) { - const auto& request = *this->GetProtoRequest(); + const auto& settings = this->GetProtoRequest()->settings(); - if (request.settings().items().empty()) { + if (settings.items().empty()) { return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set"); } + if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) { + if (settings.compression()) { + StatusIds::StatusCode status; + TString error; + if (!CheckCompression(settings.compression(), status, error)) { + return this->Reply(status, TIssuesIds::DEFAULT_ERROR, error); + } + } + } + ResolvePaths(ExtractPaths()); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 17223ef5a5..2f2176743e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -1,6 +1,8 @@ #include "schemeshard_export_flow_proposals.h" #include "schemeshard_path_describer.h" +#include <ydb/core/ydb_convert/compression.h> + #include <util/string/builder.h> #include <util/string/cast.h> @@ -147,6 +149,10 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( default: Y_FAIL("Unknown scheme"); } + + if (const auto compression = exportSettings.compression()) { + Y_VERIFY(FillCompression(*task.MutableCompression(), compression)); + } } break; } diff --git a/ydb/core/ydb_convert/CMakeLists.txt b/ydb/core/ydb_convert/CMakeLists.txt index c45dfba839..6299224adf 100644 --- a/ydb/core/ydb_convert/CMakeLists.txt +++ b/ydb/core/ydb_convert/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(ydb-core-ydb_convert PUBLIC ydb-core-engine ydb-core-protos ydb-core-scheme + ydb-core-util ydb-library-binary_json ydb-library-dynumber library-mkql_proto-protos @@ -27,6 +28,7 @@ target_link_libraries(ydb-core-ydb_convert PUBLIC ) target_sources(ydb-core-ydb_convert PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/column_families.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/compression.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_settings.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/table_description.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert.cpp diff --git a/ydb/core/ydb_convert/compression.cpp b/ydb/core/ydb_convert/compression.cpp new file mode 100644 index 0000000000..7cbfb3ec35 --- /dev/null +++ b/ydb/core/ydb_convert/compression.cpp @@ -0,0 +1,64 @@ +#include "compression.h" + +#include <ydb/core/util/yverify_stream.h> + +#include <util/string/builder.h> +#include <util/string/cast.h> + +namespace NKikimr { + +bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in, + Ydb::StatusIds::StatusCode& status, TString& error) +{ + auto returnError = [&](Ydb::StatusIds::StatusCode code, const TString& msg) { + status = code; + error = msg; + return false; + }; + + TStringBuf inBuf = in; + + if (inBuf.StartsWith("zstd")) { + Y_VERIFY(inBuf.SkipPrefix("zstd")); + + if (inBuf) { + if (inBuf.front() != '-') { + return returnError(Ydb::StatusIds::UNSUPPORTED, + TStringBuilder() << "Unsupported compression codec: " << in); + } + + int level; + if (!TryFromString(inBuf.Tail(1), level)) { + return returnError(Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Invalid compression level: " << inBuf.Tail(1)); + } + + out.SetLevel(level); + } + + out.SetCodec("zstd"); + } else { + return returnError(Ydb::StatusIds::UNSUPPORTED, + TStringBuilder() << "Unsupported compression codec: " << in); + } + + status = Ydb::StatusIds::SUCCESS; + return true; +} + +bool CheckCompression(const TString& in, Ydb::StatusIds::StatusCode& status, TString& error) { + NKikimrSchemeOp::TBackupTask::TCompressionOptions unused; + return FillCompression(unused, in, status, error); +} + +bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in) { + Ydb::StatusIds::StatusCode status; + TString error; + + const auto ret = FillCompression(out, in, status, error); + Y_VERIFY_S(ret, "Cannot parse compression: " << in); + + return ret; +} + +} // NKikimr diff --git a/ydb/core/ydb_convert/compression.h b/ydb/core/ydb_convert/compression.h new file mode 100644 index 0000000000..7cfd53abe0 --- /dev/null +++ b/ydb/core/ydb_convert/compression.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/core/protos/flat_scheme_op.pb.h> + +namespace NKikimr { + +bool CheckCompression(const TString& in, Ydb::StatusIds::StatusCode& status, TString& error); +bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in); + +} // NKikimr diff --git a/ydb/core/ydb_convert/compression_ut.cpp b/ydb/core/ydb_convert/compression_ut.cpp new file mode 100644 index 0000000000..9fd30549b1 --- /dev/null +++ b/ydb/core/ydb_convert/compression_ut.cpp @@ -0,0 +1,39 @@ +#include "compression.h" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(CompressionTests) { + void Check(const TString& compression, Ydb::StatusIds::StatusCode expectedStatus) { + Ydb::StatusIds::StatusCode status; + TString error; + CheckCompression(compression, status, error); + UNIT_ASSERT_VALUES_EQUAL_C(status, expectedStatus, error); + } + + void Fill(const TString& compression, const TString& expectedCodec, int expectedLevel) { + NKikimrSchemeOp::TBackupTask::TCompressionOptions proto; + FillCompression(proto, compression); + UNIT_ASSERT_VALUES_EQUAL(proto.GetCodec(), expectedCodec); + UNIT_ASSERT_VALUES_EQUAL(proto.GetLevel(), expectedLevel); + } + + Y_UNIT_TEST(Zstd) { + Check("zstd", Ydb::StatusIds::SUCCESS); + Check("zstd-1", Ydb::StatusIds::SUCCESS); + Check("zstd--1", Ydb::StatusIds::SUCCESS); + Check("zstd-foo", Ydb::StatusIds::BAD_REQUEST); + + Fill("zstd", "zstd", 0); + Fill("zstd-1", "zstd", 1); + Fill("zstd--1", "zstd", -1); + } + + Y_UNIT_TEST(Unsupported) { + Check("foo", Ydb::StatusIds::UNSUPPORTED); + Check("zstdfoo", Ydb::StatusIds::UNSUPPORTED); + } +} + +} // NKikimr diff --git a/ydb/core/ydb_convert/ut/CMakeLists.darwin.txt b/ydb/core/ydb_convert/ut/CMakeLists.darwin.txt index d21aef26fb..4a82ea56c5 100644 --- a/ydb/core/ydb_convert/ut/CMakeLists.darwin.txt +++ b/ydb/core/ydb_convert/ut/CMakeLists.darwin.txt @@ -29,6 +29,7 @@ target_link_options(ydb-core-ydb_convert-ut PRIVATE CoreFoundation ) target_sources(ydb-core-ydb_convert-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/compression_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert_ut.cpp ) add_test( diff --git a/ydb/core/ydb_convert/ut/CMakeLists.linux.txt b/ydb/core/ydb_convert/ut/CMakeLists.linux.txt index de2bbe219e..671a6a97b6 100644 --- a/ydb/core/ydb_convert/ut/CMakeLists.linux.txt +++ b/ydb/core/ydb_convert/ut/CMakeLists.linux.txt @@ -32,6 +32,7 @@ target_link_options(ydb-core-ydb_convert-ut PRIVATE -ldl ) target_sources(ydb-core-ydb_convert-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/compression_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/ydb_convert/ydb_convert_ut.cpp ) add_test( diff --git a/ydb/public/api/protos/ydb_export.proto b/ydb/public/api/protos/ydb_export.proto index 31f49667a9..ca1d796929 100644 --- a/ydb/public/api/protos/ydb_export.proto +++ b/ydb/public/api/protos/ydb_export.proto @@ -106,6 +106,11 @@ message ExportToS3Settings { string description = 7 [(length).le = 128]; uint32 number_of_retries = 8; StorageClass storage_class = 9; + + // Codec used to compress data. Codecs are available: + // - zstd. + // - zstd-N, where N is compression level, e.g. zstd-3. + string compression = 10; } message ExportToS3Result { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp index a82a8f8aa3..7be1701ef1 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp @@ -262,6 +262,13 @@ void TCommandExportToS3::Config(TConfig& config) { config.Opts->AddLongOption("retries", "Number of retries") .RequiredArgument("NUM").StoreResult(&NumberOfRetries).DefaultValue(NumberOfRetries); + config.Opts->AddLongOption("compression", TStringBuilder() + << "Codec used to compress data" << Endl + << " Available options:" << Endl + << " - zstd" << Endl + << " - zstd-N (N is compression level, e.g. zstd-3)" << Endl) + .RequiredArgument("STRING").StoreResult(&Compression); + AddDeprecatedJsonOption(config); AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 }); config.Opts->MutuallyExclusive("json", "format"); @@ -307,6 +314,10 @@ int TCommandExportToS3::Run(TConfig& config) { settings.NumberOfRetries(NumberOfRetries); + if (Compression) { + settings.Compression(Compression); + } + const TDriver driver = CreateDriver(config); TSchemeClient schemeClient(driver); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_export.h b/ydb/public/lib/ydb_cli/commands/ydb_service_export.h index 6f842321ec..56d54c216c 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_export.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_export.h @@ -70,6 +70,7 @@ private: TVector<TRegExMatch> ExclusionPatterns; TString Description; ui32 NumberOfRetries = 10; + TString Compression; }; } diff --git a/ydb/public/lib/ydb_cli/common/print_operation.cpp b/ydb/public/lib/ydb_cli/common/print_operation.cpp index 28a84375dc..98bbdeeea5 100644 --- a/ydb/public/lib/ydb_cli/common/print_operation.cpp +++ b/ydb/public/lib/ydb_cli/common/print_operation.cpp @@ -141,6 +141,9 @@ namespace { if constexpr (std::is_same_v<NExport::TExportToS3Response, T>) { freeText << "StorageClass: " << settings.StorageClass_ << Endl; + if (settings.Compression_) { + freeText << "Compression: " << *settings.Compression_ << Endl; + } } if (!status.GetIssues().Empty()) { diff --git a/ydb/public/sdk/cpp/client/ydb_export/export.cpp b/ydb/public/sdk/cpp/client/ydb_export/export.cpp index c391c033e1..f91398bdbd 100644 --- a/ydb/public/sdk/cpp/client/ydb_export/export.cpp +++ b/ydb/public/sdk/cpp/client/ydb_export/export.cpp @@ -13,6 +13,8 @@ #include <google/protobuf/repeated_field.h> #include <google/protobuf/timestamp.pb.h> +#include <util/stream/str.h> + namespace NYdb { namespace NExport { @@ -95,6 +97,10 @@ TExportToS3Response::TExportToS3Response(TStatus&& status, Ydb::Operations::Oper Metadata_.Settings.Description(metadata.settings().description()); Metadata_.Settings.NumberOfRetries(metadata.settings().number_of_retries()); + if (metadata.settings().compression()) { + Metadata_.Settings.Compression(metadata.settings().compression()); + } + // progress Metadata_.Progress = TProtoAccessor::FromProto(metadata.progress()); Metadata_.ItemsProgress = ItemsProgressFromProto(metadata.items_progress()); @@ -192,6 +198,10 @@ TFuture<TExportToS3Response> TExportClient::ExportToS3(const TExportToS3Settings request.mutable_settings()->set_number_of_retries(settings.NumberOfRetries_.GetRef()); } + if (settings.Compression_) { + request.mutable_settings()->set_compression(*settings.Compression_); + } + return Impl_->ExportToS3(std::move(request), settings); } diff --git a/ydb/public/sdk/cpp/client/ydb_export/export.h b/ydb/public/sdk/cpp/client/ydb_export/export.h index 38225e36aa..504f6ad9a6 100644 --- a/ydb/public/sdk/cpp/client/ydb_export/export.h +++ b/ydb/public/sdk/cpp/client/ydb_export/export.h @@ -89,6 +89,7 @@ struct TExportToS3Settings : public TOperationRequestSettings<TExportToS3Setting FLUENT_SETTING_VECTOR(TItem, Item); FLUENT_SETTING_OPTIONAL(TString, Description); FLUENT_SETTING_OPTIONAL(ui32, NumberOfRetries); + FLUENT_SETTING_OPTIONAL(TString, Compression); }; class TExportToS3Response : public TOperation { @@ -125,12 +126,10 @@ private: } // namespace NExport } // namespace NYdb -template<> -inline void Out<NYdb::NExport::TExportToYtResponse>(IOutputStream& o, const NYdb::NExport::TExportToYtResponse& x) { +Y_DECLARE_OUT_SPEC(inline, NYdb::NExport::TExportToYtResponse, o, x) { return x.Out(o); } -template<> -inline void Out<NYdb::NExport::TExportToS3Response>(IOutputStream& o, const NYdb::NExport::TExportToS3Response& x) { +Y_DECLARE_OUT_SPEC(inline, NYdb::NExport::TExportToS3Response, o, x) { return x.Out(o); } |