about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Ilnaz Nizametdinov <i.nizametdinov@gmail.com> 2022-06-15 13:43:58 +0300
committer Daniil Cherednik <dan.cherednik@gmail.com> 2022-06-15 13:43:58 +0300
commit 4b9c305c0b7baac37eb298e810e86d999f057e45 (patch)
tree e2fcf75767c868a8914d9560d2b0cce2f7bd9078
parent ec54578fd0328cc795ce56f9abcacb7e979f09ed (diff)
download ydb-4b9c305c0b7baac37eb298e810e86d999f057e45.tar.gz
22-2: Compressed backups: public api, cpp sdk, cli KIKIMR-14803
merge from trunk: r9566079 REVIEW: 2630448 x-ydb-stable-ref: 1b5a0b97fe622ff831c16634b7f72fe4e8d3f587
-rw-r--r-- ydb/core/grpc_services/rpc_export.cpp 25
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp 6
-rw-r--r-- ydb/core/ydb_convert/compression.cpp 64
-rw-r--r-- ydb/core/ydb_convert/compression.h 10
-rw-r--r-- ydb/core/ydb_convert/compression_ut.cpp 39
-rw-r--r-- ydb/core/ydb_convert/ut/ya.make 1
-rw-r--r-- ydb/core/ydb_convert/ya.make 2
-rw-r--r-- ydb/public/api/protos/ydb_export.proto 5
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp 11
-rw-r--r-- ydb/public/lib/ydb_cli/commands/ydb_service_export.h 1
-rw-r--r-- ydb/public/lib/ydb_cli/common/print_operation.cpp 3
-rw-r--r-- ydb/public/sdk/cpp/client/ydb_export/export.cpp 10
-rw-r--r-- ydb/public/sdk/cpp/client/ydb_export/export.h 7
13 files changed, 173 insertions, 11 deletions
diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp
index f0fa7be3df..403c000462 100644
--- a/ydb/core/grpc_services/rpc_export.cpp
+++ b/ydb/core/grpc_services/rpc_export.cpp
@@ -4,6 +4,7 @@
#include "rpc_operation_request_base.h"
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
+#include <ydb/core/ydb_convert/compression.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -35,11 +36,11 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
}
auto& createExport = *ev->Record.MutableRequest();
- createExport.MutableOperationParams()->CopyFrom(request.operation_params());
- if (std::is_same_v<TEvRequest, TEvExportToYtRequest>) {
- createExport.MutableExportToYtSettings()->CopyFrom(request.settings());
- } else if (std::is_same_v<TEvRequest, TEvExportToS3Request>) {
- createExport.MutableExportToS3Settings()->CopyFrom(request.settings());
+ *createExport.MutableOperationParams() = request.operation_params();
+ if constexpr (std::is_same_v<TEvRequest, TEvExportToYtRequest>) {
+ *createExport.MutableExportToYtSettings() = request.settings();
+ } else if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) {
+ *createExport.MutableExportToS3Settings() = request.settings();
}
return ev.Release();
@@ -175,12 +176,22 @@ public:
using TRpcOperationRequestActor<TDerived, TEvRequest, true>::TRpcOperationRequestActor;
void Bootstrap(const TActorContext&) {
- const auto& request = *this->Request->GetProtoRequest();
+ const auto& settings = this->Request->GetProtoRequest()->settings();
- if (request.settings().items().empty()) {
+ if (settings.items().empty()) {
return this->Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Items are not set");
}
+ if constexpr (std::is_same_v<TEvRequest, TEvExportToS3Request>) {
+ if (settings.compression()) {
+ StatusIds::StatusCode status;
+ TString error;
+ if (!CheckCompression(settings.compression(), status, error)) {
+ return this->Reply(status, TIssuesIds::DEFAULT_ERROR, error);
+ }
+ }
+ }
+
ResolvePaths(ExtractPaths());
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
index 17223ef5a5..2f2176743e 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
@@ -1,6 +1,8 @@
#include "schemeshard_export_flow_proposals.h"
#include "schemeshard_path_describer.h"
+#include <ydb/core/ydb_convert/compression.h>
+
#include <util/string/builder.h>
#include <util/string/cast.h>
@@ -147,6 +149,10 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
default:
Y_FAIL("Unknown scheme");
}
+
+ if (const auto compression = exportSettings.compression()) {
+ Y_VERIFY(FillCompression(*task.MutableCompression(), compression));
+ }
}
break;
}
diff --git a/ydb/core/ydb_convert/compression.cpp b/ydb/core/ydb_convert/compression.cpp
new file mode 100644
index 0000000000..7cbfb3ec35
--- /dev/null
+++ b/ydb/core/ydb_convert/compression.cpp
@@ -0,0 +1,64 @@
+#include "compression.h"
+
+#include <ydb/core/util/yverify_stream.h>
+
+#include <util/string/builder.h>
+#include <util/string/cast.h>
+
+namespace NKikimr {
+
+bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in, // Parses `in` ("zstd" or "zstd-<level>") into `out`.
+ Ydb::StatusIds::StatusCode& status, TString& error) // Returns true with status = SUCCESS, or false with `status`/`error` describing the failure.
+{
+ auto returnError = [&](Ydb::StatusIds::StatusCode code, const TString& msg) { // Helper: store the failure in the out-params and yield false.
+ status = code;
+ error = msg;
+ return false;
+ };
+
+ TStringBuf inBuf = in; // Working view; `in` itself is kept intact for error messages.
+
+ if (inBuf.StartsWith("zstd")) {
+ Y_VERIFY(inBuf.SkipPrefix("zstd")); // Expected to succeed: the same prefix was matched on the line above.
+
+ if (inBuf) { // Something follows the codec name; it has to be "-<level>".
+ if (inBuf.front() != '-') {
+ return returnError(Ydb::StatusIds::UNSUPPORTED, // No '-' separator (e.g. "zstdfoo"): reported as an unknown codec.
+ TStringBuilder() << "Unsupported compression codec: " << in);
+ }
+
+ int level;
+ if (!TryFromString(inBuf.Tail(1), level)) { // Tail(1) is the text after the '-'; a further '-' acts as a sign ("zstd--1" gives -1).
+ return returnError(Ydb::StatusIds::BAD_REQUEST,
+ TStringBuilder() << "Invalid compression level: " << inBuf.Tail(1));
+ }
+
+ out.SetLevel(level); // Stored as parsed; no range check is applied in this function.
+ }
+
+ out.SetCodec("zstd"); // `out` is written only after parsing succeeded; every error return happens before any Set* call.
+ } else {
+ return returnError(Ydb::StatusIds::UNSUPPORTED, // "zstd" is the only codec prefix recognised here.
+ TStringBuilder() << "Unsupported compression codec: " << in);
+ }
+
+ status = Ydb::StatusIds::SUCCESS;
+ return true;
+}
+
+bool CheckCompression(const TString& in, Ydb::StatusIds::StatusCode& status, TString& error) { // Validates a compression string; same return value and status/error reporting as the four-argument FillCompression.
+ NKikimrSchemeOp::TBackupTask::TCompressionOptions unused; // Scratch target for the parser; its contents are discarded.
+ return FillCompression(unused, in, status, error);
+}
+
+bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in) { // Overload without status/error out-params; the parse result is asserted with Y_VERIFY_S.
+ Ydb::StatusIds::StatusCode status; // Filled by the four-argument overload but not surfaced to the caller.
+ TString error;
+
+ const auto ret = FillCompression(out, in, status, error);
+ Y_VERIFY_S(ret, "Cannot parse compression: " << in); // Message carries only the raw input, not `error`. NOTE(review): presumably aborts on failure - confirm macro semantics.
+
+ return ret;
+}
+
+} // NKikimr
diff --git a/ydb/core/ydb_convert/compression.h b/ydb/core/ydb_convert/compression.h
new file mode 100644
index 0000000000..7cfd53abe0
--- /dev/null
+++ b/ydb/core/ydb_convert/compression.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+namespace NKikimr {
+
+bool CheckCompression(const TString& in, Ydb::StatusIds::StatusCode& status, TString& error); // Validates `in`; on failure returns false and fills `status`/`error`.
+bool FillCompression(NKikimrSchemeOp::TBackupTask::TCompressionOptions& out, const TString& in); // Parses `in` into `out`; reports no status, so callers should validate with CheckCompression first.
+
+} // NKikimr
diff --git a/ydb/core/ydb_convert/compression_ut.cpp b/ydb/core/ydb_convert/compression_ut.cpp
new file mode 100644
index 0000000000..9fd30549b1
--- /dev/null
+++ b/ydb/core/ydb_convert/compression_ut.cpp
@@ -0,0 +1,39 @@
+#include "compression.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr {
+
+Y_UNIT_TEST_SUITE(CompressionTests) {
+ void Check(const TString& compression, Ydb::StatusIds::StatusCode expectedStatus) { // Asserts the status CheckCompression reports for `compression`.
+ Ydb::StatusIds::StatusCode status;
+ TString error;
+ CheckCompression(compression, status, error); // The bool result is ignored; only `status` is compared below.
+ UNIT_ASSERT_VALUES_EQUAL_C(status, expectedStatus, error); // `error` is attached as the assertion comment.
+ }
+
+ void Fill(const TString& compression, const TString& expectedCodec, int expectedLevel) { // Asserts the codec and level FillCompression stores for `compression`.
+ NKikimrSchemeOp::TBackupTask::TCompressionOptions proto;
+ FillCompression(proto, compression);
+ UNIT_ASSERT_VALUES_EQUAL(proto.GetCodec(), expectedCodec);
+ UNIT_ASSERT_VALUES_EQUAL(proto.GetLevel(), expectedLevel);
+ }
+
+ Y_UNIT_TEST(Zstd) {
+ Check("zstd", Ydb::StatusIds::SUCCESS);
+ Check("zstd-1", Ydb::StatusIds::SUCCESS);
+ Check("zstd--1", Ydb::StatusIds::SUCCESS); // The second '-' is read as a minus sign.
+ Check("zstd-foo", Ydb::StatusIds::BAD_REQUEST); // Known codec, but the level is not a number.
+
+ Fill("zstd", "zstd", 0); // No level in the input: GetLevel() is expected to read 0.
+ Fill("zstd-1", "zstd", 1);
+ Fill("zstd--1", "zstd", -1);
+ }
+
+ Y_UNIT_TEST(Unsupported) {
+ Check("foo", Ydb::StatusIds::UNSUPPORTED);
+ Check("zstdfoo", Ydb::StatusIds::UNSUPPORTED); // "zstd" prefix without a '-' separator counts as an unknown codec.
+ }
+}
+
+} // NKikimr
diff --git a/ydb/core/ydb_convert/ut/ya.make b/ydb/core/ydb_convert/ut/ya.make
index d71f89e815..e824e2f107 100644
--- a/ydb/core/ydb_convert/ut/ya.make
+++ b/ydb/core/ydb_convert/ut/ya.make
@@ -9,6 +9,7 @@ IF (SANITIZER_TYPE OR WITH_VALGRIND)
ENDIF()
SRCS(
+ compression_ut.cpp
ydb_convert_ut.cpp
)
diff --git a/ydb/core/ydb_convert/ya.make b/ydb/core/ydb_convert/ya.make
index 230db48744..9a6715a1ec 100644
--- a/ydb/core/ydb_convert/ya.make
+++ b/ydb/core/ydb_convert/ya.make
@@ -4,6 +4,7 @@ OWNER(g:kikimr)
SRCS(
column_families.cpp
+ compression.cpp
table_settings.cpp
table_description.cpp
ydb_convert.cpp
@@ -14,6 +15,7 @@ PEERDIR(
ydb/core/engine
ydb/core/protos
ydb/core/scheme
+ ydb/core/util
ydb/library/binary_json
ydb/library/dynumber
ydb/library/mkql_proto/protos
diff --git a/ydb/public/api/protos/ydb_export.proto b/ydb/public/api/protos/ydb_export.proto
index 31f49667a9..ca1d796929 100644
--- a/ydb/public/api/protos/ydb_export.proto
+++ b/ydb/public/api/protos/ydb_export.proto
@@ -106,6 +106,11 @@ message ExportToS3Settings {
string description = 7 [(length).le = 128];
uint32 number_of_retries = 8;
StorageClass storage_class = 9;
+
+ // Codec used to compress data. Available codecs:
+ // - zstd.
+ // - zstd-N, where N is the compression level, e.g. zstd-3.
+ string compression = 10;
}
message ExportToS3Result {
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp
index f997343dc8..9e6d766048 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_export.cpp
@@ -262,6 +262,13 @@ void TCommandExportToS3::Config(TConfig& config) {
config.Opts->AddLongOption("retries", "Number of retries")
.RequiredArgument("NUM").StoreResult(&NumberOfRetries).DefaultValue(NumberOfRetries);
+ config.Opts->AddLongOption("compression", TStringBuilder()
+ << "Codec used to compress data" << Endl
+ << " Available options:" << Endl
+ << " - zstd" << Endl
+ << " - zstd-N (N is compression level, e.g. zstd-3)" << Endl)
+ .RequiredArgument("STRING").StoreResult(&Compression);
+
AddJsonOption(config);
AddFormats(config, { EOutputFormat::Pretty, EOutputFormat::ProtoJsonBase64 });
config.Opts->MutuallyExclusive("json", "format");
@@ -307,6 +314,10 @@ int TCommandExportToS3::Run(TConfig& config) {
settings.NumberOfRetries(NumberOfRetries);
+ if (Compression) {
+ settings.Compression(Compression);
+ }
+
const TDriver driver = CreateDriver(config);
TSchemeClient schemeClient(driver);
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_export.h b/ydb/public/lib/ydb_cli/commands/ydb_service_export.h
index 6f842321ec..56d54c216c 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_export.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_export.h
@@ -70,6 +70,7 @@ private:
TVector<TRegExMatch> ExclusionPatterns;
TString Description;
ui32 NumberOfRetries = 10;
+ TString Compression;
};
}
diff --git a/ydb/public/lib/ydb_cli/common/print_operation.cpp b/ydb/public/lib/ydb_cli/common/print_operation.cpp
index c254f854c9..233f7b6cda 100644
--- a/ydb/public/lib/ydb_cli/common/print_operation.cpp
+++ b/ydb/public/lib/ydb_cli/common/print_operation.cpp
@@ -141,6 +141,9 @@ namespace {
if constexpr (std::is_same_v<NExport::TExportToS3Response, T>) {
freeText << "StorageClass: " << settings.StorageClass_ << Endl;
+ if (settings.Compression_) {
+ freeText << "Compression: " << *settings.Compression_ << Endl;
+ }
}
if (!status.GetIssues().Empty()) {
diff --git a/ydb/public/sdk/cpp/client/ydb_export/export.cpp b/ydb/public/sdk/cpp/client/ydb_export/export.cpp
index c391c033e1..f91398bdbd 100644
--- a/ydb/public/sdk/cpp/client/ydb_export/export.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_export/export.cpp
@@ -13,6 +13,8 @@
#include <google/protobuf/repeated_field.h>
#include <google/protobuf/timestamp.pb.h>
+#include <util/stream/str.h>
+
namespace NYdb {
namespace NExport {
@@ -95,6 +97,10 @@ TExportToS3Response::TExportToS3Response(TStatus&& status, Ydb::Operations::Oper
Metadata_.Settings.Description(metadata.settings().description());
Metadata_.Settings.NumberOfRetries(metadata.settings().number_of_retries());
+ if (metadata.settings().compression()) {
+ Metadata_.Settings.Compression(metadata.settings().compression());
+ }
+
// progress
Metadata_.Progress = TProtoAccessor::FromProto(metadata.progress());
Metadata_.ItemsProgress = ItemsProgressFromProto(metadata.items_progress());
@@ -192,6 +198,10 @@ TFuture<TExportToS3Response> TExportClient::ExportToS3(const TExportToS3Settings
request.mutable_settings()->set_number_of_retries(settings.NumberOfRetries_.GetRef());
}
+ if (settings.Compression_) {
+ request.mutable_settings()->set_compression(*settings.Compression_);
+ }
+
return Impl_->ExportToS3(std::move(request), settings);
}
diff --git a/ydb/public/sdk/cpp/client/ydb_export/export.h b/ydb/public/sdk/cpp/client/ydb_export/export.h
index 38225e36aa..504f6ad9a6 100644
--- a/ydb/public/sdk/cpp/client/ydb_export/export.h
+++ b/ydb/public/sdk/cpp/client/ydb_export/export.h
@@ -89,6 +89,7 @@ struct TExportToS3Settings : public TOperationRequestSettings<TExportToS3Setting
FLUENT_SETTING_VECTOR(TItem, Item);
FLUENT_SETTING_OPTIONAL(TString, Description);
FLUENT_SETTING_OPTIONAL(ui32, NumberOfRetries);
+ FLUENT_SETTING_OPTIONAL(TString, Compression);
};
class TExportToS3Response : public TOperation {
@@ -125,12 +126,10 @@ private:
} // namespace NExport
} // namespace NYdb
-template<>
-inline void Out<NYdb::NExport::TExportToYtResponse>(IOutputStream& o, const NYdb::NExport::TExportToYtResponse& x) {
+Y_DECLARE_OUT_SPEC(inline, NYdb::NExport::TExportToYtResponse, o, x) {
return x.Out(o);
}
-template<>
-inline void Out<NYdb::NExport::TExportToS3Response>(IOutputStream& o, const NYdb::NExport::TExportToS3Response& x) {
+Y_DECLARE_OUT_SPEC(inline, NYdb::NExport::TExportToS3Response, o, x) {
return x.Out(o);
}