diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-06-15 13:43:58 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-06-15 13:43:58 +0300 |
commit | 4b9c305c0b7baac37eb298e810e86d999f057e45 (patch) | |
tree | e2fcf75767c868a8914d9560d2b0cce2f7bd9078 | |
parent | ec54578fd0328cc795ce56f9abcacb7e979f09ed (diff) | |
download | ydb-4b9c305c0b7baac37eb298e810e86d999f057e45.tar.gz |
22-2: Compressed backups: public api, cpp sdk, cli KIKIMR-14803
merge from trunk: r9566079
REVIEW: 2630448
x-ydb-stable-ref: 1b5a0b97fe622ff831c16634b7f72fe4e8d3f587
-rw-r--r-- | ydb/core/grpc_services/rpc_export.cpp | 25 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp | 6 | ||||
-rw-r--r-- | ydb/core/ydb_convert/compression.cpp | 64 | ||||
-rw-r--r-- | ydb/core/ydb_convert/compression.h | 10 | ||||
-rw-r--r-- | ydb/core/ydb_convert/compression_ut.cpp | 39 | ||||
-rw-r--r-- | ydb/core/ydb_convert/ut/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/ydb_convert/ya.make | 2 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_export.proto | 5 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp | 11 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_export.h | 1 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/common/print_operation.cpp | 3 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_export/export.cpp | 10 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_export/export.h | 7 |
13 files changed, 173 insertions, 11 deletions
diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp index f0fa7be3df..403c000462 100644 --- a/ydb/core/grpc_services/rpc_export.cpp +++ b/ydb/core/grpc_services/rpc_export.cpp @@ -4,6 +4,7 @@ #include "rpc_operation_request_base.h" #include <ydb/core/tx/schemeshard/schemeshard_export.h> +#include <ydb/core/ydb_convert/compression.h> #include <library/cpp/actors/core/hfunc.h> @@ -35,11 +36,11 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, } auto& createExport = *ev->Record.MutableRequest(); - createExport.MutableOperationParams()->CopyFrom(request.operation_params()); - if (std::is_same_v<TEvRequest, TEvExportToYtRequest>) { - createExport.MutableExportToYtSettings()->CopyFrom(request.settings()); - } else if (std::is_same_v<TEvRequest, TEvExportToS3Request>) { - createExport.MutableExportToS3Settings()->CopyFrom(request.settings()); + *createExport.MutableOperationParams() = request.operation_params(); + if constexpr (std::is_same_v<TEvRequest, TEvExportToYtRequest>) { + *createExport.MutableExportToYtSettings() = request.settings(); + } else if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) { + *createExport.MutableExportToS3Settings() = request.settings(); } return ev.Release(); @@ -175,12 +176,22 @@ public: using TRpcOperationRequestActor<TDerived, TEvRequest, true>::TRpcOperationRequestActor; void Bootstrap(const TActorContext&) { - const auto& request = *this->Request->GetProtoRequest(); + const auto& settings = this->Request->GetProtoRequest()->settings(); - if (request.settings().items().empty()) { + if (settings.items().empty()) { return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set"); } + if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) { + if (settings.compression()) { + StatusIds::StatusCode status; + TString error; + if (!CheckCompression(settings.compression(), status, error)) { + return this->Reply(status, TIssuesIds::DEFAULT_ERROR, error); + } + } + } + ResolvePaths(ExtractPaths()); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 17223ef5a5..2f2176743e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -1,6 +1,8 @@ #include "schemeshard_export_flow_proposals.h" #include "schemeshard_path_describer.h" +#include <ydb/core/ydb_convert/compression.h> + #include <util/string/builder.h> #include <util/string/cast.h> @@ -147,6 +149,10 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose( default: Y_FAIL("Unknown scheme"); } + + if (const auto compression = exportSettings.compression()) { + Y_VERIFY(FillCompression(*task.MutableCompression(), compression)); + } } break; } diff --git a/ydb/core/ydb_convert/compression.cpp b/ydb/core/ydb_convert/compression.cpp new file mode 100644 index 0000000000..7cbfb3ec35 --- /dev/null +++ b/ydb/core/ydb_convert/compression.cpp @@ -0,0 +1,64 @@ +#include "compression.h" + +#include <ydb/core/util/yverify_stream.h> + +#include <util/string/builder.h> +#include <util/string/cast.h> + +namespace NKikimr { + +bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in, + Ydb::StatusIds::StatusCode& status, TString& error) +{ + auto returnError = [&](Ydb::StatusIds::StatusCode code, const TString& msg) { + status = code; + error = msg; + return false; + }; + + TStringBuf inBuf = in; + + if (inBuf.StartsWith("zstd")) { + Y_VERIFY(inBuf.SkipPrefix("zstd")); + + if (inBuf) { + if (inBuf.front() != '-') { + return returnError(Ydb::StatusIds::UNSUPPORTED, + TStringBuilder() << "Unsupported compression codec: " << in); + } + + int level; + if (!TryFromString(inBuf.Tail(1), level)) { + return returnError(Ydb::StatusIds::BAD_REQUEST, + TStringBuilder() << "Invalid compression level: " << inBuf.Tail(1)); + } + + out.SetLevel(level); + } + + out.SetCodec("zstd"); + } else { + return returnError(Ydb::StatusIds::UNSUPPORTED, + TStringBuilder() << "Unsupported compression codec: " << in); + } + + status = Ydb::StatusIds::SUCCESS; + return true; +} + +bool CheckCompression(const TString& in, Ydb::StatusIds::StatusCode& status, TString& error) { + NKikimrSchemeOp::TBackupTask::TCompressionOptions unused; + return FillCompression(unused, in, status, error); +} + +bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in) { + Ydb::StatusIds::StatusCode status; + TString error; + + const auto ret = FillCompression(out, in, status, error); + Y_VERIFY_S(ret, "Cannot parse compression: " << in); + + return ret; +} + +} // NKikimr diff --git a/ydb/core/ydb_convert/compression.h b/ydb/core/ydb_convert/compression.h new file mode 100644 index 0000000000..7cfd53abe0 --- /dev/null +++ b/ydb/core/ydb_convert/compression.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/core/protos/flat_scheme_op.pb.h> + +namespace NKikimr { + +bool CheckCompression(const TString& in, Ydb::StatusIds::StatusCode& status, TString& error); +bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in); + +} // NKikimr diff --git a/ydb/core/ydb_convert/compression_ut.cpp b/ydb/core/ydb_convert/compression_ut.cpp new file mode 100644 index 0000000000..9fd30549b1 --- /dev/null +++ b/ydb/core/ydb_convert/compression_ut.cpp @@ -0,0 +1,39 @@ +#include "compression.h" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr { + +Y_UNIT_TEST_SUITE(CompressionTests) { + void Check(const TString& compression, Ydb::StatusIds::StatusCode expectedStatus) { + Ydb::StatusIds::StatusCode status; + TString error; + CheckCompression(compression, status, error); + UNIT_ASSERT_VALUES_EQUAL_C(status, expectedStatus, error); + } + + void Fill(const TString& compression, const TString& expectedCodec, int expectedLevel) { + NKikimrSchemeOp::TBackupTask::TCompressionOptions proto; + FillCompression(proto, compression); + UNIT_ASSERT_VALUES_EQUAL(proto.GetCodec(), expectedCodec); + UNIT_ASSERT_VALUES_EQUAL(proto.GetLevel(), expectedLevel); + } + + Y_UNIT_TEST(Zstd) { + Check("zstd", Ydb::StatusIds::SUCCESS); + Check("zstd-1", Ydb::StatusIds::SUCCESS); + Check("zstd--1", Ydb::StatusIds::SUCCESS); + Check("zstd-foo", Ydb::StatusIds::BAD_REQUEST); + + Fill("zstd", "zstd", 0); + Fill("zstd-1", "zstd", 1); + Fill("zstd--1", "zstd", -1); + } + + Y_UNIT_TEST(Unsupported) { + Check("foo", Ydb::StatusIds::UNSUPPORTED); + Check("zstdfoo", Ydb::StatusIds::UNSUPPORTED); + } +} + +} // NKikimr diff --git a/ydb/core/ydb_convert/ut/ya.make b/ydb/core/ydb_convert/ut/ya.make index d71f89e815..e824e2f107 100644 --- a/ydb/core/ydb_convert/ut/ya.make +++ b/ydb/core/ydb_convert/ut/ya.make @@ -9,6 +9,7 @@ IF (SANITIZER_TYPE OR WITH_VALGRIND) ENDIF() SRCS( + compression_ut.cpp ydb_convert_ut.cpp ) diff --git a/ydb/core/ydb_convert/ya.make b/ydb/core/ydb_convert/ya.make index 230db48744..9a6715a1ec 100644 --- a/ydb/core/ydb_convert/ya.make +++ b/ydb/core/ydb_convert/ya.make @@ -4,6 +4,7 @@ OWNER(g:kikimr) SRCS( column_families.cpp + compression.cpp table_settings.cpp table_description.cpp ydb_convert.cpp @@ -14,6 +15,7 @@ PEERDIR( ydb/core/engine ydb/core/protos ydb/core/scheme + ydb/core/util ydb/library/binary_json ydb/library/dynumber ydb/library/mkql_proto/protos diff --git a/ydb/public/api/protos/ydb_export.proto b/ydb/public/api/protos/ydb_export.proto index 31f49667a9..ca1d796929 100644 --- a/ydb/public/api/protos/ydb_export.proto +++ b/ydb/public/api/protos/ydb_export.proto @@ -106,6 +106,11 @@ message ExportToS3Settings { string description = 7 [(length).le = 128]; uint32 number_of_retries = 8; StorageClass storage_class = 9; + + // Codec used to compress data. Codecs are available: + // - zstd. + // - zstd-N, where N is compression level, e.g. zstd-3. + string compression = 10; } message ExportToS3Result { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp index f997343dc8..9e6d766048 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp @@ -262,6 +262,13 @@ void TCommandExportToS3::Config(TConfig& config) { config.Opts->AddLongOption("retries", "Number of retries") .RequiredArgument("NUM").StoreResult(&NumberOfRetries).DefaultValue(NumberOfRetries); + config.Opts->AddLongOption("compression", TStringBuilder() + << "Codec used to compress data" << Endl + << " Available options:" << Endl + << " - zstd" << Endl + << " - zstd-N (N is compression level, e.g. zstd-3)" << Endl) + .RequiredArgument("STRING").StoreResult(&Compression); + AddJsonOption(config); AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 }); config.Opts->MutuallyExclusive("json", "format"); @@ -307,6 +314,10 @@ int TCommandExportToS3::Run(TConfig& config) { settings.NumberOfRetries(NumberOfRetries); + if (Compression) { + settings.Compression(Compression); + } + const TDriver driver = CreateDriver(config); TSchemeClient schemeClient(driver); diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_export.h b/ydb/public/lib/ydb_cli/commands/ydb_service_export.h index 6f842321ec..56d54c216c 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_export.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_export.h @@ -70,6 +70,7 @@ private: TVector<TRegExMatch> ExclusionPatterns; TString Description; ui32 NumberOfRetries = 10; + TString Compression; }; } diff --git a/ydb/public/lib/ydb_cli/common/print_operation.cpp b/ydb/public/lib/ydb_cli/common/print_operation.cpp index c254f854c9..233f7b6cda 100644 --- a/ydb/public/lib/ydb_cli/common/print_operation.cpp +++ b/ydb/public/lib/ydb_cli/common/print_operation.cpp @@ -141,6 +141,9 @@ namespace { if constexpr (std::is_same_v<NExport::TExportToS3Response, T>) { freeText << "StorageClass: " << settings.StorageClass_ << Endl; + if (settings.Compression_) { + freeText << "Compression: " << *settings.Compression_ << Endl; + } } if (!status.GetIssues().Empty()) { diff --git a/ydb/public/sdk/cpp/client/ydb_export/export.cpp b/ydb/public/sdk/cpp/client/ydb_export/export.cpp index c391c033e1..f91398bdbd 100644 --- a/ydb/public/sdk/cpp/client/ydb_export/export.cpp +++ b/ydb/public/sdk/cpp/client/ydb_export/export.cpp @@ -13,6 +13,8 @@ #include <google/protobuf/repeated_field.h> #include <google/protobuf/timestamp.pb.h> +#include <util/stream/str.h> + namespace NYdb { namespace NExport { @@ -95,6 +97,10 @@ TExportToS3Response::TExportToS3Response(TStatus&& status, Ydb::Operations::Oper Metadata_.Settings.Description(metadata.settings().description()); Metadata_.Settings.NumberOfRetries(metadata.settings().number_of_retries()); + if (metadata.settings().compression()) { + Metadata_.Settings.Compression(metadata.settings().compression()); + } + // progress Metadata_.Progress = TProtoAccessor::FromProto(metadata.progress()); Metadata_.ItemsProgress = ItemsProgressFromProto(metadata.items_progress()); @@ -192,6 +198,10 @@ TFuture<TExportToS3Response> TExportClient::ExportToS3(const TExportToS3Settings request.mutable_settings()->set_number_of_retries(settings.NumberOfRetries_.GetRef()); } + if (settings.Compression_) { + request.mutable_settings()->set_compression(*settings.Compression_); + } + return Impl_->ExportToS3(std::move(request), settings); } diff --git a/ydb/public/sdk/cpp/client/ydb_export/export.h b/ydb/public/sdk/cpp/client/ydb_export/export.h index 38225e36aa..504f6ad9a6 100644 --- a/ydb/public/sdk/cpp/client/ydb_export/export.h +++ b/ydb/public/sdk/cpp/client/ydb_export/export.h @@ -89,6 +89,7 @@ struct TExportToS3Settings : public TOperationRequestSettings<TExportToS3Setting FLUENT_SETTING_VECTOR(TItem, Item); FLUENT_SETTING_OPTIONAL(TString, Description); FLUENT_SETTING_OPTIONAL(ui32, NumberOfRetries); + FLUENT_SETTING_OPTIONAL(TString, Compression); }; class TExportToS3Response : public TOperation { @@ -125,12 +126,10 @@ private: } // namespace NExport } // namespace NYdb -template<> -inline void Out<NYdb::NExport::TExportToYtResponse>(IOutputStream& o, const NYdb::NExport::TExportToYtResponse& x) { +Y_DECLARE_OUT_SPEC(inline, NYdb::NExport::TExportToYtResponse, o, x) { return x.Out(o); } -template<> -inline void Out<NYdb::NExport::TExportToS3Response>(IOutputStream& o, const NYdb::NExport::TExportToS3Response& x) { +Y_DECLARE_OUT_SPEC(inline, NYdb::NExport::TExportToS3Response, o, x) { return x.Out(o); } |