aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Ilia Shakhov <pixcc@ydb.tech> 2024-12-25 16:21:34 +0300
committer GitHub <noreply@github.com> 2024-12-25 16:21:34 +0300
commit 2edc9b5e6d5b07f320692cfe212e04f6e49f5ec5 (patch)
tree b2256dbbc2b8b1824476bccfe07ff2dfe60c8db5
parent c19b139582006a4e50993664b725209825e2336b (diff)
download ydb-2edc9b5e6d5b07f320692cfe212e04f6e49f5ec5.tar.gz
Add export checksums (#12728)
-rw-r--r-- ydb/core/protos/feature_flags.proto | 1
-rw-r--r-- ydb/core/protos/flat_scheme_op.proto | 1
-rw-r--r-- ydb/core/testlib/basics/feature_flags.h | 1
-rw-r--r-- ydb/core/tx/datashard/backup_restore_common.cpp | 6
-rw-r--r-- ydb/core/tx/datashard/backup_restore_common.h | 2
-rw-r--r-- ydb/core/tx/datashard/backup_restore_traits.cpp | 21
-rw-r--r-- ydb/core/tx/datashard/backup_restore_traits.h | 20
-rw-r--r-- ydb/core/tx/datashard/export_checksum.cpp | 39
-rw-r--r-- ydb/core/tx/datashard/export_checksum.h | 20
-rw-r--r-- ydb/core/tx/datashard/export_s3.h | 5
-rw-r--r-- ydb/core/tx/datashard/export_s3_buffer.h | 4
-rw-r--r-- ydb/core/tx/datashard/export_s3_buffer_raw.cpp | 26
-rw-r--r-- ydb/core/tx/datashard/export_s3_buffer_raw.h | 4
-rw-r--r-- ydb/core/tx/datashard/export_s3_buffer_zstd.cpp | 13
-rw-r--r-- ydb/core/tx/datashard/export_s3_uploader.cpp | 142
-rw-r--r-- ydb/core/tx/datashard/export_scan.h | 7
-rw-r--r-- ydb/core/tx/datashard/extstorage_usage_config.h | 14
-rw-r--r-- ydb/core/tx/datashard/ya.make | 1
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__init.cpp | 1
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_export.cpp | 3
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_export__create.cpp | 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp | 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_info_types.h | 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_schema.h | 5
-rw-r--r-- ydb/core/tx/schemeshard/ut_export/ut_export.cpp | 105
-rw-r--r-- ydb/core/tx/schemeshard/ut_helpers/test_env.cpp | 1
-rw-r--r-- ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema | 6
27 files changed, 398 insertions, 56 deletions
diff --git a/ydb/core/protos/feature_flags.proto b/ydb/core/protos/feature_flags.proto
index 1f8a704658..fd7947080a 100644
--- a/ydb/core/protos/feature_flags.proto
+++ b/ydb/core/protos/feature_flags.proto
@@ -184,4 +184,5 @@ message TFeatureFlags {
optional bool EnableDataShardInMemoryStateMigration = 159 [default = true];
optional bool EnableDataShardInMemoryStateMigrationAcrossGenerations = 160 [default = false];
optional bool DisableLocalDBEraseCache = 161 [default = false];
+ optional bool EnableExportChecksums = 162 [default = false];
}
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 4c52ff262e..e412b2b1f6 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1246,6 +1246,7 @@ message TBackupTask {
optional uint64 SnapshotStep = 14;
optional uint64 SnapshotTxId = 15;
+ optional bool EnableChecksums = 16; // currently available for s3
}
message TRestoreTask {
diff --git a/ydb/core/testlib/basics/feature_flags.h b/ydb/core/testlib/basics/feature_flags.h
index 5124e9ed3f..25df60794a 100644
--- a/ydb/core/testlib/basics/feature_flags.h
+++ b/ydb/core/testlib/basics/feature_flags.h
@@ -71,6 +71,7 @@ public:
FEATURE_FLAG_SETTER(EnableParameterizedDecimal)
FEATURE_FLAG_SETTER(EnableTopicAutopartitioningForCDC)
FEATURE_FLAG_SETTER(EnableFollowerStats)
+ FEATURE_FLAG_SETTER(EnableExportChecksums)
#undef FEATURE_FLAG_SETTER
};
diff --git a/ydb/core/tx/datashard/backup_restore_common.cpp b/ydb/core/tx/datashard/backup_restore_common.cpp
index dad590c13d..5daaf26e22 100644
--- a/ydb/core/tx/datashard/backup_restore_common.cpp
+++ b/ydb/core/tx/datashard/backup_restore_common.cpp
@@ -6,9 +6,13 @@ void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) {
FullBackups.emplace(fb->SnapshotVts, fb);
}
+// Stores the format version that Serialize() writes into the "version" field of the metadata JSON.
+void TMetadata::SetVersion(ui64 version) {
+ Version = version;
+}
+
TString TMetadata::Serialize() const {
NJson::TJsonMap m;
- m["version"] = 0;
+ m["version"] = Version;
NJson::TJsonArray fullBackups;
for (auto &[tp, _] : FullBackups) {
NJson::TJsonMap backupMap;
diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h
index 61da3f7e53..d932f35de9 100644
--- a/ydb/core/tx/datashard/backup_restore_common.h
+++ b/ydb/core/tx/datashard/backup_restore_common.h
@@ -175,6 +175,7 @@ public:
void AddFullBackup(TFullBackupMetadata::TPtr fullBackup);
void AddLog(TLogMetadata::TPtr log);
void SetConsistencyKey(const TString& key);
+ void SetVersion(ui64 version);
TString Serialize() const;
static TMetadata Deserialize(const TString& metadata);
@@ -182,6 +183,7 @@ private:
TString ConsistencyKey;
TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups;
TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs;
+ ui64 Version = 0;
};
} // NBackupRestore
diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp
index a55f5681d9..c7c8443c4e 100644
--- a/ydb/core/tx/datashard/backup_restore_traits.cpp
+++ b/ydb/core/tx/datashard/backup_restore_traits.cpp
@@ -72,6 +72,27 @@ TString DataFileExtension(EDataFormat format, ECompressionCodec codec) {
return Sprintf("%s%s", fit->second.c_str(), cit->second.c_str());
}
+// File name of the exported permissions object; TS3Settings appends it to the object key pattern after '/'.
+TString PermissionsKeySuffix() {
+ return "permissions.pb";
+}
+
+// File name of the exported scheme object; TS3Settings appends it to the object key pattern after '/'.
+TString SchemeKeySuffix() {
+ return "scheme.pb";
+}
+
+// File name of the exported metadata object; TS3Settings appends it to the object key pattern after '/'.
+TString MetadataKeySuffix() {
+ return "metadata.json";
+}
+
+// Builds the file name of a data object: "data_NN" (NN is `n`, zero-padded to at least
+// two digits) followed by the extension DataFileExtension() yields for `format` and `codec`.
+// Callers pass the shard number as `n`.
+TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec) {
+    const auto ext = DataFileExtension(format, codec);
+    // `n` is unsigned, so it is formatted with %u rather than %d.
+    return Sprintf("data_%02u%s", n, ext.c_str());
+}
+
+// Key of the checksum object that accompanies `objKey`: the same key with ".sha256" appended.
+TString ChecksumKey(const TString& objKey) {
+ return objKey + ".sha256";
+}
+
} // NBackupRestoreTraits
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h
index 036d5a6ea5..539931a33e 100644
--- a/ydb/core/tx/datashard/backup_restore_traits.h
+++ b/ydb/core/tx/datashard/backup_restore_traits.h
@@ -30,22 +30,12 @@ ECompressionCodec NextCompressionCodec(ECompressionCodec cur);
TString DataFileExtension(EDataFormat format, ECompressionCodec codec);
-inline TString SchemeKey(const TString& objKeyPattern) {
- return Sprintf("%s/scheme.pb", objKeyPattern.c_str());
-}
-
-inline TString PermissionsKey(const TString& objKeyPattern) {
- return Sprintf("%s/permissions.pb", objKeyPattern.c_str());
-}
+TString PermissionsKeySuffix();
+TString SchemeKeySuffix();
+TString MetadataKeySuffix();
+TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec);
-inline TString MetadataKey(const TString& objKeyPattern) {
- return Sprintf("%s/metadata.json", objKeyPattern.c_str());
-}
-
-inline TString DataKey(const TString& objKeyPattern, ui32 n, EDataFormat format, ECompressionCodec codec) {
- const auto ext = DataFileExtension(format, codec);
- return Sprintf("%s/data_%02d%s", objKeyPattern.c_str(), n, ext.c_str());
-}
+TString ChecksumKey(const TString& objKey);
} // NBackupRestoreTraits
} // NDataShard
diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/tx/datashard/export_checksum.cpp
new file mode 100644
index 0000000000..06f8ac3423
--- /dev/null
+++ b/ydb/core/tx/datashard/export_checksum.cpp
@@ -0,0 +1,39 @@
+#include "export_checksum.h"
+
+#include <openssl/sha.h>
+
+#include <util/string/hex.h>
+
+namespace NKikimr::NDataShard {
+
+// SHA-256 implementation of IExportChecksum built on the OpenSSL SHA256_* API.
+class TSHA256 final : public IExportChecksum {
+public:
+    TSHA256() {
+        SHA256_Init(&Context);
+    }
+
+    // Feeds the next chunk of data into the running hash.
+    void AddData(TStringBuf data) override {
+        SHA256_Update(&Context, data.data(), data.size());
+    }
+
+    // Returns the digest of everything added so far as a lower-case hex string.
+    TString Serialize() override {
+        // SHA256_Final() consumes the context it is given, so finalize a copy:
+        // Serialize() stays repeatable and AddData() may still be called afterwards.
+        SHA256_CTX finalized = Context;
+        unsigned char hash[SHA256_DIGEST_LENGTH];
+        SHA256_Final(hash, &finalized);
+        return to_lower(HexEncode(hash, SHA256_DIGEST_LENGTH));
+    }
+
+private:
+    SHA256_CTX Context;
+};
+
+// One-shot helper: hashes `data` with a freshly created export checksum
+// and returns its serialized form.
+TString ComputeExportChecksum(TStringBuf data) {
+    const IExportChecksum::TPtr hasher{CreateExportChecksum()};
+    hasher->AddData(data);
+    return hasher->Serialize();
+}
+
+// Allocates a new checksum calculator (a TSHA256 instance).
+// The object is created with `new`, so the caller owns it (e.g. by wrapping it in IExportChecksum::TPtr).
+IExportChecksum* CreateExportChecksum() {
+ return new TSHA256();
+}
+
+} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/export_checksum.h b/ydb/core/tx/datashard/export_checksum.h
new file mode 100644
index 0000000000..0944c111f1
--- /dev/null
+++ b/ydb/core/tx/datashard/export_checksum.h
@@ -0,0 +1,20 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+namespace NKikimr::NDataShard {
+
+// Interface of an incremental checksum calculator for exported objects.
+class IExportChecksum {
+public:
+ // Owning pointer type for checksum objects.
+ using TPtr = std::unique_ptr<IExportChecksum>;
+
+ virtual ~IExportChecksum() = default;
+
+ // Feeds the next chunk of data into the checksum.
+ virtual void AddData(TStringBuf data) = 0;
+ // Returns the textual form of the checksum of the data added so far.
+ virtual TString Serialize() = 0;
+};
+
+IExportChecksum* CreateExportChecksum();
+TString ComputeExportChecksum(TStringBuf data);
+
+} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h
index 3e3a651c8f..cf21cd2e90 100644
--- a/ydb/core/tx/datashard/export_s3.h
+++ b/ydb/core/tx/datashard/export_s3.h
@@ -31,9 +31,10 @@ public:
switch (CodecFromTask(Task)) {
case ECompressionCodec::None:
- return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes);
+ return CreateS3ExportBufferRaw(Columns, maxRows, maxBytes, Task.GetEnableChecksums());
case ECompressionCodec::Zstd:
- return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows, maxBytes, minBytes);
+ return CreateS3ExportBufferZstd(Task.GetCompression().GetLevel(), Columns, maxRows,
+ maxBytes, minBytes, Task.GetEnableChecksums());
case ECompressionCodec::Invalid:
Y_ABORT("unreachable");
}
diff --git a/ydb/core/tx/datashard/export_s3_buffer.h b/ydb/core/tx/datashard/export_s3_buffer.h
index 2d332a561d..130e0ba7c9 100644
--- a/ydb/core/tx/datashard/export_s3_buffer.h
+++ b/ydb/core/tx/datashard/export_s3_buffer.h
@@ -9,10 +9,10 @@ namespace NKikimr {
namespace NDataShard {
NExportScan::IBuffer* CreateS3ExportBufferRaw(
- const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes);
+ const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, bool enableChecksums);
NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
- const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes);
+ const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes, bool enableChecksums);
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
index e0647ee70a..ad6d03789c 100644
--- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
@@ -15,12 +15,13 @@
namespace NKikimr {
namespace NDataShard {
-TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit)
+TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
: Columns(columns)
, RowsLimit(rowsLimit)
, BytesLimit(bytesLimit)
, Rows(0)
, BytesRead(0)
+ , Checksum(enableChecksums ? CreateExportChecksum() : nullptr)
{
}
@@ -154,7 +155,17 @@ bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row, IOutputStream& out) {
bool TS3BufferRaw::Collect(const NTable::IScan::TRow& row) {
TBufferOutput out(Buffer);
ErrorString.clear();
- return Collect(row, out);
+
+ size_t beforeSize = Buffer.Size();
+ if (!Collect(row, out)) {
+ return false;
+ }
+
+ if (Checksum) {
+ TStringBuf data(Buffer.Data(), Buffer.Size());
+ Checksum->AddData(data.Tail(beforeSize));
+ }
+ return true;
}
IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats& stats) {
@@ -167,7 +178,12 @@ IEventBase* TS3BufferRaw::PrepareEvent(bool last, NExportScan::IBuffer::TStats&
}
stats.BytesSent = buffer->Size();
- return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
+
+ if (Checksum && last) {
+ return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last, Checksum->Serialize());
+ } else {
+ return new TEvExportScan::TEvBuffer<TBuffer>(std::move(*buffer), last);
+ }
}
void TS3BufferRaw::Clear() {
@@ -189,9 +205,9 @@ TMaybe<TBuffer> TS3BufferRaw::Flush(bool) {
}
NExportScan::IBuffer* CreateS3ExportBufferRaw(
- const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit)
+ const IExport::TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums)
{
- return new TS3BufferRaw(columns, rowsLimit, bytesLimit);
+ return new TS3BufferRaw(columns, rowsLimit, bytesLimit, enableChecksums);
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h
index 3573f4b6d5..816a6fa586 100644
--- a/ydb/core/tx/datashard/export_s3_buffer_raw.h
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h
@@ -14,7 +14,7 @@ class TS3BufferRaw: public NExportScan::IBuffer {
using TTagToIndex = THashMap<ui32, ui32>; // index in IScan::TRow
public:
- explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit);
+ explicit TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 bytesLimit, bool enableChecksums);
void ColumnsOrder(const TVector<ui32>& tags) override;
bool Collect(const NTable::IScan::TRow& row) override;
@@ -42,6 +42,8 @@ protected:
ui64 BytesRead;
TBuffer Buffer;
+ IExportChecksum::TPtr Checksum;
+
TString ErrorString;
}; // TS3BufferRaw
diff --git a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
index 7588d3b7a5..d1312b3984 100644
--- a/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
+++ b/ydb/core/tx/datashard/export_s3_buffer_zstd.cpp
@@ -49,8 +49,9 @@ class TS3BufferZstd: public TS3BufferRaw {
public:
explicit TS3BufferZstd(int compressionLevel,
- const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes)
- : TS3BufferRaw(columns, maxRows, maxBytes)
+ const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes,
+ bool enableChecksums)
+ : TS3BufferRaw(columns, maxRows, maxBytes, enableChecksums)
, CompressionLevel(compressionLevel)
, MinBytes(minBytes)
, Context(ZSTD_createCCtx())
@@ -67,6 +68,9 @@ public:
return false;
}
+ if (Checksum) {
+ Checksum->AddData(BufferRaw);
+ }
BytesRaw += BufferRaw.size();
auto input = ZSTD_inBuffer{BufferRaw.data(), BufferRaw.size(), 0};
@@ -122,9 +126,10 @@ private:
}; // TS3BufferZstd
NExportScan::IBuffer* CreateS3ExportBufferZstd(int compressionLevel,
- const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes)
+ const IExport::TTableColumns& columns, ui64 maxRows, ui64 maxBytes, ui64 minBytes,
+ bool enableChecksums)
{
- return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes);
+ return new TS3BufferZstd(compressionLevel, columns, maxRows, maxBytes, minBytes, enableChecksums);
}
} // NDataShard
diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp
index 9acd52bc7b..cedcec73c7 100644
--- a/ydb/core/tx/datashard/export_s3_uploader.cpp
+++ b/ydb/core/tx/datashard/export_s3_uploader.cpp
@@ -31,6 +31,8 @@
namespace NKikimr {
namespace NDataShard {
+using namespace NBackupRestoreTraits;
+
class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig;
using THttpResolverConfig = NKikimrConfig::TS3ProxyResolverConfig::THttpResolverConfig;
@@ -180,6 +182,9 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
}
google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);
+ if (EnableChecksums) {
+ SchemeChecksum = ComputeExportChecksum(Buffer);
+ }
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetSchemeKey());
@@ -196,6 +201,9 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
}
google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer);
+ if (EnableChecksums) {
+ PermissionsChecksum = ComputeExportChecksum(Buffer);
+ }
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetPermissionsKey());
@@ -208,6 +216,9 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
Y_ABORT_UNLESS(!MetadataUploaded);
Buffer = std::move(Metadata);
+ if (EnableChecksums) {
+ MetadataChecksum = ComputeExportChecksum(Buffer);
+ }
auto request = Aws::S3::Model::PutObjectRequest()
.WithKey(Settings.GetMetadataKey());
@@ -216,6 +227,20 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
this->Become(&TThis::StateUploadMetadata);
}
+    // Uploads `checksum` as a separate object stored under `checksumKey`.
+    // `objectKeySuffix` is the file name of the object the checksum belongs to;
+    // `checksumUploadedCallback` is invoked by HandleChecksum() after the storage
+    // reply has passed CheckResult().
+    void UploadChecksum(TString&& checksum, const TString& checksumKey, const TString& objectKeySuffix,
+        std::function<void()> checksumUploadedCallback)
+    {
+        // make checksum verifiable using sha256sum CLI
+        checksum += ' ' + objectKeySuffix;
+
+        auto request = Aws::S3::Model::PutObjectRequest()
+            .WithKey(checksumKey);
+        this->Send(Client, new TEvExternalStorage::TEvPutObjectRequest(request, std::move(checksum)));
+
+        // The callback is taken by value, so move it into the member instead of copying it again.
+        ChecksumUploadedCallback = std::move(checksumUploadedCallback);
+        this->Become(&TThis::StateUploadChecksum);
+    }
+
void HandleScheme(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;
@@ -227,13 +252,21 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
return;
}
- SchemeUploaded = true;
+ auto nextStep = [this]() {
+ SchemeUploaded = true;
- if (Scanner) {
- this->Send(Scanner, new TEvExportScan::TEvFeed());
- }
+ if (Scanner) {
+ this->Send(Scanner, new TEvExportScan::TEvFeed());
+ }
+ this->Become(&TThis::StateUploadData);
+ };
- this->Become(&TThis::StateUploadData);
+ if (EnableChecksums) {
+ TString checksumKey = ChecksumKey(Settings.GetSchemeKey());
+ UploadChecksum(std::move(SchemeChecksum), checksumKey, SchemeKeySuffix(), nextStep);
+ } else {
+ nextStep();
+ }
}
void HandlePermissions(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
@@ -247,9 +280,17 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
return;
}
- PermissionsUploaded = true;
+ auto nextStep = [this]() {
+ PermissionsUploaded = true;
+ UploadScheme();
+ };
- UploadScheme();
+ if (EnableChecksums) {
+ TString checksumKey = ChecksumKey(Settings.GetPermissionsKey());
+ UploadChecksum(std::move(PermissionsChecksum), checksumKey, PermissionsKeySuffix(), nextStep);
+ } else {
+ nextStep();
+ }
}
void HandleMetadata(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
@@ -263,9 +304,31 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
return;
}
- MetadataUploaded = true;
+ auto nextStep = [this]() {
+ MetadataUploaded = true;
+ UploadPermissions();
+ };
- UploadPermissions();
+ if (EnableChecksums) {
+ TString checksumKey = ChecksumKey(Settings.GetMetadataKey());
+ UploadChecksum(std::move(MetadataChecksum), checksumKey, MetadataKeySuffix(), nextStep);
+ } else {
+ nextStep();
+ }
+ }
+
+    // Handles the storage reply to a checksum upload: logs it and, if CheckResult()
+    // accepts the result, runs the continuation registered by UploadChecksum().
+    void HandleChecksum(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) {
+        const auto& putResult = ev->Get()->Result;
+
+        EXPORT_LOG_D("HandleChecksum TEvExternalStorage::TEvPutObjectResponse"
+            << ": self# " << this->SelfId()
+            << ", result# " << putResult);
+
+        const bool accepted = CheckResult(putResult, TStringBuf("PutObject (checksum)"));
+        if (accepted) {
+            ChecksumUploadedCallback();
+        }
+    }
void Handle(TEvExportScan::TEvReady::TPtr& ev) {
@@ -301,6 +364,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
Last = ev->Get()->Last;
MultiPart = MultiPart || !Last;
ev->Get()->Buffer.AsString(Buffer);
+ DataChecksum = std::move(ev->Get()->Checksum);
UploadData();
}
@@ -335,7 +399,18 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
return;
}
- Finish();
+ auto nextStep = [this]() {
+ Finish();
+ };
+
+ if (EnableChecksums) {
+ // checksum is always calculated before compression
+ TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None));
+ TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None);
+ UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep);
+ } else {
+ nextStep();
+ }
}
void Handle(TEvDataShard::TEvS3Upload::TPtr& ev) {
@@ -418,7 +493,18 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
Parts.push_back(result.GetResult().GetETag().c_str());
if (Last) {
- return Finish();
+ auto nextStep = [this]() {
+ Finish();
+ };
+
+ if (EnableChecksums) {
+ // checksum is always calculated before compression
+ TString checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None));
+ TString dataKeySuffix = DataKeySuffix(ShardNum, DataFormat, ECompressionCodec::None);
+ return UploadChecksum(std::move(DataChecksum), checksumKey, dataKeySuffix, nextStep);
+ } else {
+ return nextStep();
+ }
}
this->Send(Scanner, new TEvExportScan::TEvFeed());
@@ -542,6 +628,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
this->Send(DataShard, new TEvDataShard::TEvChangeS3UploadStatus(this->SelfId(), TxId,
TS3Upload::EStatus::Abort, *Error));
}
+ Become(&TThis::StateUploadData);
}
}
@@ -576,8 +663,9 @@ public:
TString&& metadata)
: ExternalStorageConfig(new TS3ExternalStorageConfig(task.GetS3Settings()))
, Settings(TS3Settings::FromBackupTask(task))
- , DataFormat(NBackupRestoreTraits::EDataFormat::Csv)
- , CompressionCodec(NBackupRestoreTraits::CodecFromTask(task))
+ , DataFormat(EDataFormat::Csv)
+ , CompressionCodec(CodecFromTask(task))
+ , ShardNum(task.GetShardNum())
, HttpResolverConfig(GetHttpResolverConfig(*GetS3StorageConfig()))
, DataShard(dataShard)
, TxId(txId)
@@ -587,9 +675,10 @@ public:
, Retries(task.GetNumberOfRetries())
, Attempt(0)
, Delay(TDuration::Minutes(1))
- , SchemeUploaded(task.GetShardNum() == 0 ? false : true)
- , MetadataUploaded(task.GetShardNum() == 0 ? false : true)
- , PermissionsUploaded(task.GetShardNum() == 0 ? false : true)
+ , SchemeUploaded(ShardNum == 0 ? false : true)
+ , MetadataUploaded(ShardNum == 0 ? false : true)
+ , PermissionsUploaded(ShardNum == 0 ? false : true)
+ , EnableChecksums(task.GetEnableChecksums())
{
}
@@ -647,6 +736,14 @@ public:
}
}
+ // State entered by UploadChecksum(): put-object responses go to HandleChecksum(),
+ // every other event is forwarded to StateBase().
+ STATEFN(StateUploadChecksum) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleChecksum);
+ default:
+ return StateBase(ev);
+ }
+ }
+
STATEFN(StateUploadData) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvBuffer, Handle);
@@ -665,8 +762,9 @@ public:
private:
NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
TS3Settings Settings;
- const NBackupRestoreTraits::EDataFormat DataFormat;
- const NBackupRestoreTraits::ECompressionCodec CompressionCodec;
+ const EDataFormat DataFormat;
+ const ECompressionCodec CompressionCodec;
+ const ui32 ShardNum;
bool ProxyResolved;
TMaybe<THttpResolverConfig> HttpResolverConfig;
@@ -698,6 +796,13 @@ private:
TVector<TString> Parts;
TMaybe<TString> Error;
+ bool EnableChecksums;
+ TString DataChecksum;
+ TString MetadataChecksum;
+ TString SchemeChecksum;
+ TString PermissionsChecksum;
+ std::function<void()> ChecksumUploadedCallback;
+
}; // TS3Uploader
IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
@@ -710,6 +815,7 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
: Nothing();
NBackupRestore::TMetadata metadata;
+ metadata.SetVersion(Task.GetEnableChecksums() ? 1 : 0);
NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{
.SnapshotVts = NBackupRestore::TVirtualTimestamp(
diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h
index 1873795255..8c72344877 100644
--- a/ydb/core/tx/datashard/export_scan.h
+++ b/ydb/core/tx/datashard/export_scan.h
@@ -2,6 +2,8 @@
#include "defs.h"
+#include "export_checksum.h"
+
#include <ydb/core/tablet_flat/flat_scan_iface.h>
#include <util/generic/ptr.h>
@@ -32,18 +34,21 @@ struct TEvExportScan {
struct TEvBuffer: public TEventLocal<TEvBuffer<TBuffer>, EvBuffer> {
TBuffer Buffer;
bool Last;
+ TString Checksum;
TEvBuffer() = default;
- explicit TEvBuffer(TBuffer&& buffer, bool last)
+ explicit TEvBuffer(TBuffer&& buffer, bool last, TString&& checksum = "")
: Buffer(std::move(buffer))
, Last(last)
+ , Checksum(std::move(checksum))
{
}
TString ToString() const override {
return TStringBuilder() << this->ToStringHeader() << " {"
<< " Last: " << Last
+ << " Checksum:" << Checksum
<< " }";
}
};
diff --git a/ydb/core/tx/datashard/extstorage_usage_config.h b/ydb/core/tx/datashard/extstorage_usage_config.h
index 873e1b04ac..a64cc757f3 100644
--- a/ydb/core/tx/datashard/extstorage_usage_config.h
+++ b/ydb/core/tx/datashard/extstorage_usage_config.h
@@ -43,21 +43,27 @@ public:
Aws::S3::Model::StorageClass GetStorageClass() const;
inline TString GetPermissionsKey() const {
- return NBackupRestoreTraits::PermissionsKey(ObjectKeyPattern);
+ return ObjectKeyPattern + '/' + NBackupRestoreTraits::PermissionsKeySuffix();
}
inline TString GetMetadataKey() const {
- return NBackupRestoreTraits::MetadataKey(ObjectKeyPattern);
+ return ObjectKeyPattern + '/' + NBackupRestoreTraits::MetadataKeySuffix();
}
inline TString GetSchemeKey() const {
- return NBackupRestoreTraits::SchemeKey(ObjectKeyPattern);
+ return ObjectKeyPattern + '/' + NBackupRestoreTraits::SchemeKeySuffix();
}
inline TString GetDataKey(
NBackupRestoreTraits::EDataFormat format,
NBackupRestoreTraits::ECompressionCodec codec) const {
- return NBackupRestoreTraits::DataKey(ObjectKeyPattern, Shard, format, codec);
+ return ObjectKeyPattern + '/' + NBackupRestoreTraits::DataKeySuffix(Shard, format, codec);
+ }
+
+ // Name of this shard's data object without the ObjectKeyPattern prefix
+ // (GetDataKey() returns the same name prefixed with ObjectKeyPattern and '/').
+ inline TString GetDataFile(
+ NBackupRestoreTraits::EDataFormat format,
+ NBackupRestoreTraits::ECompressionCodec codec) const {
+ return NBackupRestoreTraits::DataKeySuffix(Shard, format, codec);
+ }
}; // TS3Settings
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 3ec752d289..d73553229a 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -152,6 +152,7 @@ SRCS(
execution_unit.h
execution_unit_ctors.h
execution_unit_kind.h
+ export_checksum.cpp
export_common.cpp
export_iface.cpp
export_iface.h
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 10db6d9c49..aa07665ed9 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4279,6 +4279,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
exportInfo->StartTime = TInstant::Seconds(rowset.GetValueOrDefault<Schema::Exports::StartTime>());
exportInfo->EndTime = TInstant::Seconds(rowset.GetValueOrDefault<Schema::Exports::EndTime>());
+ exportInfo->EnableChecksums = rowset.GetValueOrDefault<Schema::Exports::EnableChecksums>(false);
Self->Exports[id] = exportInfo;
if (uid) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp
index a8ae8e67d9..ea6f881ee3 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp
@@ -153,7 +153,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T
NIceDb::TUpdate<Schema::Exports::Settings>(exportInfo->Settings),
NIceDb::TUpdate<Schema::Exports::DomainPathOwnerId>(exportInfo->DomainPathId.OwnerId),
NIceDb::TUpdate<Schema::Exports::DomainPathId>(exportInfo->DomainPathId.LocalPathId),
- NIceDb::TUpdate<Schema::Exports::Items>(exportInfo->Items.size())
+ NIceDb::TUpdate<Schema::Exports::Items>(exportInfo->Items.size()),
+ NIceDb::TUpdate<Schema::Exports::EnableChecksums>(exportInfo->EnableChecksums)
);
if (exportInfo->UserSID) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
index f65eabb43f..7f63a60a79 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp
@@ -118,7 +118,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}
exportInfo = new TExportInfo(id, uid, TExportInfo::EKind::S3, settings, domainPath.Base()->PathId, request.GetPeerName());
-
+ exportInfo->EnableChecksums = AppData()->FeatureFlags.GetEnableExportChecksums();
TString explain;
if (!FillItems(exportInfo, settings, explain)) {
return Reply(
diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
index d6fca52aec..ce03d1669a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
@@ -193,6 +193,8 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> BackupPropose(
if (const auto compression = exportSettings.compression()) {
Y_ABORT_UNLESS(FillCompression(*task.MutableCompression(), compression));
}
+
+ task.SetEnableChecksums(exportInfo->EnableChecksums);
}
break;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index f265463bc9..765a407715 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -2709,6 +2709,8 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
TInstant StartTime = TInstant::Zero();
TInstant EndTime = TInstant::Zero();
+ bool EnableChecksums = false;
+
explicit TExportInfo(
const ui64 id,
const TString& uid,
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 632616f6fc..76d29549ea 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1175,6 +1175,8 @@ struct Schema : NIceDb::Schema {
struct EndTime : Column<15, NScheme::NTypeIds::Uint64> {};
struct PeerName : Column<16, NScheme::NTypeIds::Utf8> {};
+ struct EnableChecksums : Column<17, NScheme::NTypeIds::Bool> {};
+
using TKey = TableKey<Id>;
using TColumns = TableColumns<
Id,
@@ -1192,7 +1194,8 @@ struct Schema : NIceDb::Schema {
UserSID,
StartTime,
EndTime,
- PeerName
+ PeerName,
+ EnableChecksums
>;
};
diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
index 47cfca6c2e..f09413afcf 100644
--- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp
@@ -2346,4 +2346,109 @@ partitioning_settings {
}
)"));
}
+
+ // Exports a one-row table to the S3 mock without compression and checks that a
+ // ".sha256" object with the expected contents exists for the data, metadata,
+ // scheme and permissions objects.
+ Y_UNIT_TEST(Checksums) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+
+ // presumably the four exported objects plus one checksum object for each of them
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8);
+
+ // Each checksum object holds "<hex digest> <file name>".
+ const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
+ UNIT_ASSERT(dataChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");
+
+ const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256");
+ UNIT_ASSERT(metadataChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json");
+
+ const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256");
+ UNIT_ASSERT(schemeChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb");
+
+ const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256");
+ UNIT_ASSERT(permissionsChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb");
+ }
+
+ // Same scenario as Checksums, but the export requests zstd compression.
+ // The data checksum object is still looked up as "data_00.csv.sha256" and is expected to
+ // hold the same digest as in the uncompressed test: the uploader builds the checksum key
+ // with ECompressionCodec::None and the zstd buffer hashes rows before compressing them.
+ Y_UNIT_TEST(ChecksumsWithCompression) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ compression: "zstd"
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+
+ // presumably the four exported objects plus one checksum object for each of them
+ UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8);
+
+ // Each checksum object holds "<hex digest> <file name>".
+ const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
+ UNIT_ASSERT(dataChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");
+
+ const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256");
+ UNIT_ASSERT(metadataChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json");
+
+ const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256");
+ UNIT_ASSERT(schemeChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb");
+
+ const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256");
+ UNIT_ASSERT(permissionsChecksum);
+ UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb");
+ }
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
index 16077cc065..e785ed85e0 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/test_env.cpp
@@ -548,6 +548,7 @@ NSchemeShardUT_Private::TTestEnv::TTestEnv(TTestActorRuntime& runtime, const TTe
app.SetEnableParameterizedDecimal(opts.EnableParameterizedDecimal_);
app.SetEnableTopicAutopartitioningForCDC(opts.EnableTopicAutopartitioningForCDC_);
app.SetEnableBackupService(opts.EnableBackupService_);
+ app.SetEnableExportChecksums(true);
app.ColumnShardConfig.SetDisabledOnSchemeShard(false);
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index cd58d3db75..ed19c6e0a5 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -3032,6 +3032,11 @@
],
"ColumnsAdded": [
{
+ "ColumnId": 17,
+ "ColumnName": "EnableChecksums",
+ "ColumnType": "Bool"
+ },
+ {
"ColumnId": 1,
"ColumnName": "Id",
"ColumnType": "Uint64"
@@ -3116,6 +3121,7 @@
"ColumnFamilies": {
"0": {
"Columns": [
+ 17,
1,
2,
3,