summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlia Shakhov <[email protected]>2025-01-10 19:14:34 +0300
committerGitHub <[email protected]>2025-01-10 16:14:34 +0000
commitd425a70c0e5b9a4e55b009c87d2acb1c2c813a12 (patch)
tree6c63bc6fd1bd578272d349c67a20ec1d3edd2f8e
parentde295440fc0bd1080345c32147a1512558d4b937 (diff)
Validate checksums on import from s3 (#12909)
-rw-r--r--ydb/apps/ydb/CHANGELOG.md1
-rw-r--r--ydb/core/backup/common/checksum.cpp (renamed from ydb/core/tx/datashard/export_checksum.cpp)16
-rw-r--r--ydb/core/backup/common/checksum.h21
-rw-r--r--ydb/core/backup/common/metadata.cpp54
-rw-r--r--ydb/core/backup/common/metadata.h59
-rw-r--r--ydb/core/backup/common/ya.make15
-rw-r--r--ydb/core/backup/ya.make1
-rw-r--r--ydb/core/protos/flat_scheme_op.proto2
-rw-r--r--ydb/core/tx/datashard/backup_restore_common.cpp29
-rw-r--r--ydb/core/tx/datashard/backup_restore_common.h52
-rw-r--r--ydb/core/tx/datashard/backup_restore_traits.cpp4
-rw-r--r--ydb/core/tx/datashard/backup_restore_traits.h2
-rw-r--r--ydb/core/tx/datashard/export_checksum.h20
-rw-r--r--ydb/core/tx/datashard/export_s3_buffer_raw.cpp2
-rw-r--r--ydb/core/tx/datashard/export_s3_buffer_raw.h4
-rw-r--r--ydb/core/tx/datashard/export_s3_uploader.cpp16
-rw-r--r--ydb/core/tx/datashard/export_scan.h2
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp86
-rw-r--r--ydb/core/tx/datashard/ya.make3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import.cpp3
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp178
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_schema.h2
-rw-r--r--ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp144
-rw-r--r--ydb/public/api/protos/ydb_import.proto3
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp4
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.h3
-rw-r--r--ydb/public/lib/ydb_cli/common/print_operation.cpp10
-rw-r--r--ydb/public/sdk/cpp/client/ydb_import/import.h1
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema8
32 files changed, 616 insertions, 139 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md
index 2ade1ddfc87..f27802c9e47 100644
--- a/ydb/apps/ydb/CHANGELOG.md
+++ b/ydb/apps/ydb/CHANGELOG.md
@@ -1,3 +1,4 @@
+* Added `--skip-checksum-validation` option to `ydb import s3` command to skip server-side checksum validation.
## 2.18.0 ##
diff --git a/ydb/core/tx/datashard/export_checksum.cpp b/ydb/core/backup/common/checksum.cpp
index 06f8ac34233..006ad48cd8b 100644
--- a/ydb/core/tx/datashard/export_checksum.cpp
+++ b/ydb/core/backup/common/checksum.cpp
@@ -1,12 +1,12 @@
-#include "export_checksum.h"
+#include "checksum.h"
#include <openssl/sha.h>
#include <util/string/hex.h>
-namespace NKikimr::NDataShard {
+namespace NKikimr::NBackup {
-class TSHA256 : public IExportChecksum {
+class TSHA256 : public IChecksum {
public:
TSHA256() {
SHA256_Init(&Context);
@@ -26,14 +26,18 @@ private:
SHA256_CTX Context;
};
-TString ComputeExportChecksum(TStringBuf data) {
- IExportChecksum::TPtr checksum(CreateExportChecksum());
+TString ComputeChecksum(TStringBuf data) {
+ IChecksum::TPtr checksum(CreateChecksum());
checksum->AddData(data);
return checksum->Serialize();
}
-IExportChecksum* CreateExportChecksum() {
+IChecksum* CreateChecksum() {
return new TSHA256();
}
+TString ChecksumKey(const TString& objKey) {
+ return objKey + ".sha256";
+}
+
} // NKikimr::NDataShard
diff --git a/ydb/core/backup/common/checksum.h b/ydb/core/backup/common/checksum.h
new file mode 100644
index 00000000000..4da5dbbe968
--- /dev/null
+++ b/ydb/core/backup/common/checksum.h
@@ -0,0 +1,21 @@
+#pragma once
+
+#include <util/generic/string.h>
+
+namespace NKikimr::NBackup {
+
+class IChecksum {
+public:
+ using TPtr = std::unique_ptr<IChecksum>;
+
+ virtual ~IChecksum() = default;
+
+ virtual void AddData(TStringBuf data) = 0;
+ virtual TString Serialize() = 0;
+};
+
+IChecksum* CreateChecksum();
+TString ComputeChecksum(TStringBuf data);
+TString ChecksumKey(const TString& objKey);
+
+} // NKikimr::NBackup
diff --git a/ydb/core/backup/common/metadata.cpp b/ydb/core/backup/common/metadata.cpp
new file mode 100644
index 00000000000..3b14d50e430
--- /dev/null
+++ b/ydb/core/backup/common/metadata.cpp
@@ -0,0 +1,54 @@
+#include "metadata.h"
+
+#include <library/cpp/json/json_writer.h>
+#include <library/cpp/json/json_reader.h>
+
+namespace NKikimr::NBackup {
+
+void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) {
+ FullBackups.emplace(fb->SnapshotVts, fb);
+}
+
+void TMetadata::SetVersion(ui64 version) {
+ Version = version;
+}
+
+bool TMetadata::HasVersion() const {
+ return Version.Defined();
+}
+
+ui64 TMetadata::GetVersion() const {
+ return *Version;
+}
+
+TString TMetadata::Serialize() const {
+ NJson::TJsonMap m;
+ m["version"] = *Version;
+
+ NJson::TJsonArray fullBackups;
+ for (auto &[tp, b] : FullBackups) {
+ NJson::TJsonMap backupMap;
+ NJson::TJsonArray vts;
+ vts.AppendValue(tp.Step);
+ vts.AppendValue(tp.TxId);
+ backupMap["snapshot_vts"] = std::move(vts);
+ fullBackups.AppendValue(std::move(backupMap));
+ }
+ m["full_backups"] = fullBackups;
+ return NJson::WriteJson(&m, false);
+}
+
+TMetadata TMetadata::Deserialize(const TString& metadata) {
+ NJson::TJsonValue json;
+ NJson::ReadJsonTree(metadata, &json);
+ const auto& value = json["version"];
+
+ TMetadata result;
+ if (value.IsUInteger()) {
+ result.Version = value.GetUIntegerSafe();
+ }
+
+ return result;
+}
+
+}
diff --git a/ydb/core/backup/common/metadata.h b/ydb/core/backup/common/metadata.h
new file mode 100644
index 00000000000..f92e5e4fd6e
--- /dev/null
+++ b/ydb/core/backup/common/metadata.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <ydb/core/base/row_version.h>
+
+#include <util/generic/map.h>
+#include <util/generic/maybe.h>
+#include <util/generic/ptr.h>
+#include <util/generic/string.h>
+
+namespace NKikimr::NBackup {
+
+using TVirtualTimestamp = TRowVersion;
+
+enum class EStorageType {
+ YT,
+ S3,
+};
+
+struct TLogMetadata : TSimpleRefCount<TLogMetadata> {
+ using TPtr = TIntrusivePtr<TLogMetadata>;
+
+ const TVirtualTimestamp StartVts;
+ TString ConsistencyKey;
+ EStorageType StorageType;
+ TString StoragePath;
+};
+
+struct TFullBackupMetadata : TSimpleRefCount<TFullBackupMetadata> {
+ using TPtr = TIntrusivePtr<TFullBackupMetadata>;
+
+ const TVirtualTimestamp SnapshotVts;
+ TString ConsistencyKey;
+ TLogMetadata::TPtr FollowingLog;
+ EStorageType StorageType;
+ TString StoragePath;
+};
+
+class TMetadata {
+public:
+ TMetadata() = default;
+ TMetadata(TVector<TFullBackupMetadata::TPtr>&& fullBackups, TVector<TLogMetadata::TPtr>&& logs);
+
+ void AddFullBackup(TFullBackupMetadata::TPtr fullBackup);
+ void AddLog(TLogMetadata::TPtr log);
+ void SetConsistencyKey(const TString& key);
+ void SetVersion(ui64 version);
+ bool HasVersion() const;
+ ui64 GetVersion() const;
+
+ TString Serialize() const;
+ static TMetadata Deserialize(const TString& metadata);
+private:
+ TString ConsistencyKey;
+ TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups;
+ TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs;
+ TMaybeFail<ui64> Version;
+};
+
+} // namespace NKikimr::NBackup
diff --git a/ydb/core/backup/common/ya.make b/ydb/core/backup/common/ya.make
new file mode 100644
index 00000000000..3d45df32f77
--- /dev/null
+++ b/ydb/core/backup/common/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ metadata.cpp
+ checksum.cpp
+)
+
+PEERDIR(
+ library/cpp/json
+ ydb/core/base
+)
+
+GENERATE_ENUM_SERIALIZATION(metadata.h)
+
+END()
diff --git a/ydb/core/backup/ya.make b/ydb/core/backup/ya.make
index 0edf32a6971..0a28151e73f 100644
--- a/ydb/core/backup/ya.make
+++ b/ydb/core/backup/ya.make
@@ -1,4 +1,5 @@
RECURSE(
+ common
controller
impl
)
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 8a8ac2946fe..beb279f261d 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1262,6 +1262,8 @@ message TRestoreTask {
oneof Settings {
TS3Settings S3Settings = 6;
}
+
+ optional bool ValidateChecksums = 7; // currently available for s3
}
message TPersQueueGroupAllocate {
diff --git a/ydb/core/tx/datashard/backup_restore_common.cpp b/ydb/core/tx/datashard/backup_restore_common.cpp
deleted file mode 100644
index 5daaf26e22f..00000000000
--- a/ydb/core/tx/datashard/backup_restore_common.cpp
+++ /dev/null
@@ -1,29 +0,0 @@
-#include "backup_restore_common.h"
-
-namespace NKikimr::NDataShard::NBackupRestore {
-
-void TMetadata::AddFullBackup(TFullBackupMetadata::TPtr fb) {
- FullBackups.emplace(fb->SnapshotVts, fb);
-}
-
-void TMetadata::SetVersion(ui64 version) {
- Version = version;
-}
-
-TString TMetadata::Serialize() const {
- NJson::TJsonMap m;
- m["version"] = Version;
- NJson::TJsonArray fullBackups;
- for (auto &[tp, _] : FullBackups) {
- NJson::TJsonMap backupMap;
- NJson::TJsonArray vts;
- vts.AppendValue(tp.Step);
- vts.AppendValue(tp.TxId);
- backupMap["snapshot_vts"] = std::move(vts);
- fullBackups.AppendValue(std::move(backupMap));
- }
- m["full_backups"] = fullBackups;
- return NJson::WriteJson(&m, false);
-}
-
-} // NKikimr::NDataShard::NBackupRestore
diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h
index d932f35de94..06d1970da3f 100644
--- a/ydb/core/tx/datashard/backup_restore_common.h
+++ b/ydb/core/tx/datashard/backup_restore_common.h
@@ -6,11 +6,8 @@
#include "datashard_pipeline.h"
#include "execution_unit.h"
-#include <ydb/core/base/row_version.h>
-
#include <util/generic/ptr.h>
#include <util/generic/string.h>
-#include <util/generic/vector.h>
namespace NKikimr {
namespace NDataShard {
@@ -139,54 +136,5 @@ public:
}; // TBackupRestoreUnitBase
-namespace NBackupRestore {
-
-using TVirtualTimestamp = TRowVersion;
-
-enum class EStorageType {
- YT,
- S3,
-};
-
-struct TLogMetadata : TSimpleRefCount<TLogMetadata> {
- using TPtr = TIntrusivePtr<TLogMetadata>;
-
- const TVirtualTimestamp StartVts;
- TString ConsistencyKey;
- EStorageType StorageType;
- TString StoragePath;
-};
-
-struct TFullBackupMetadata : TSimpleRefCount<TFullBackupMetadata> {
- using TPtr = TIntrusivePtr<TFullBackupMetadata>;
-
- const TVirtualTimestamp SnapshotVts;
- TString ConsistencyKey;
- TLogMetadata::TPtr FollowingLog;
- EStorageType StorageType;
- TString StoragePath;
-};
-
-class TMetadata {
-public:
- TMetadata() = default;
- TMetadata(TVector<TFullBackupMetadata::TPtr>&& fullBackups, TVector<TLogMetadata::TPtr>&& logs);
-
- void AddFullBackup(TFullBackupMetadata::TPtr fullBackup);
- void AddLog(TLogMetadata::TPtr log);
- void SetConsistencyKey(const TString& key);
- void SetVersion(ui64 version);
-
- TString Serialize() const;
- static TMetadata Deserialize(const TString& metadata);
-private:
- TString ConsistencyKey;
- TMap<TVirtualTimestamp, TFullBackupMetadata::TPtr> FullBackups;
- TMap<TVirtualTimestamp, TLogMetadata::TPtr> Logs;
- ui64 Version = 0;
-};
-
-} // NBackupRestore
-
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/backup_restore_traits.cpp b/ydb/core/tx/datashard/backup_restore_traits.cpp
index c7c8443c4e4..5ef966dcea3 100644
--- a/ydb/core/tx/datashard/backup_restore_traits.cpp
+++ b/ydb/core/tx/datashard/backup_restore_traits.cpp
@@ -89,10 +89,6 @@ TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec) {
return Sprintf("data_%02d%s", n, ext.c_str());
}
-TString ChecksumKey(const TString& objKey) {
- return objKey + ".sha256";
-}
-
} // NBackupRestoreTraits
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/backup_restore_traits.h b/ydb/core/tx/datashard/backup_restore_traits.h
index 539931a33e1..342943b9c32 100644
--- a/ydb/core/tx/datashard/backup_restore_traits.h
+++ b/ydb/core/tx/datashard/backup_restore_traits.h
@@ -35,8 +35,6 @@ TString SchemeKeySuffix();
TString MetadataKeySuffix();
TString DataKeySuffix(ui32 n, EDataFormat format, ECompressionCodec codec);
-TString ChecksumKey(const TString& objKey);
-
} // NBackupRestoreTraits
} // NDataShard
} // NKikimr
diff --git a/ydb/core/tx/datashard/export_checksum.h b/ydb/core/tx/datashard/export_checksum.h
deleted file mode 100644
index 0944c111f11..00000000000
--- a/ydb/core/tx/datashard/export_checksum.h
+++ /dev/null
@@ -1,20 +0,0 @@
-#pragma once
-
-#include <util/generic/string.h>
-
-namespace NKikimr::NDataShard {
-
-class IExportChecksum {
-public:
- using TPtr = std::unique_ptr<IExportChecksum>;
-
- virtual ~IExportChecksum() = default;
-
- virtual void AddData(TStringBuf data) = 0;
- virtual TString Serialize() = 0;
-};
-
-IExportChecksum* CreateExportChecksum();
-TString ComputeExportChecksum(TStringBuf data);
-
-} // NKikimr::NDataShard
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
index ad6d03789c2..2af4b591af0 100644
--- a/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.cpp
@@ -21,7 +21,7 @@ TS3BufferRaw::TS3BufferRaw(const TTagToColumn& columns, ui64 rowsLimit, ui64 byt
, BytesLimit(bytesLimit)
, Rows(0)
, BytesRead(0)
- , Checksum(enableChecksums ? CreateExportChecksum() : nullptr)
+ , Checksum(enableChecksums ? NBackup::CreateChecksum() : nullptr)
{
}
diff --git a/ydb/core/tx/datashard/export_s3_buffer_raw.h b/ydb/core/tx/datashard/export_s3_buffer_raw.h
index 816a6fa5864..6591242358d 100644
--- a/ydb/core/tx/datashard/export_s3_buffer_raw.h
+++ b/ydb/core/tx/datashard/export_s3_buffer_raw.h
@@ -4,6 +4,8 @@
#include "export_s3_buffer.h"
+#include <ydb/core/backup/common/checksum.h>
+
#include <util/generic/buffer.h>
namespace NKikimr {
@@ -42,7 +44,7 @@ protected:
ui64 BytesRead;
TBuffer Buffer;
- IExportChecksum::TPtr Checksum;
+ NBackup::IChecksum::TPtr Checksum;
TString ErrorString;
}; // TS3BufferRaw
diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp
index cedcec73c70..faf3c0e3be1 100644
--- a/ydb/core/tx/datashard/export_s3_uploader.cpp
+++ b/ydb/core/tx/datashard/export_s3_uploader.cpp
@@ -1,6 +1,5 @@
#ifndef KIKIMR_DISABLE_S3_OPS
-#include "backup_restore_common.h"
#include "datashard.h"
#include "export_common.h"
#include "export_s3.h"
@@ -9,6 +8,8 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/services/services.pb.h>
+#include <ydb/core/backup/common/checksum.h>
+#include <ydb/core/backup/common/metadata.h>
#include <ydb/core/wrappers/s3_storage_config.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/wrappers/events/common.h>
@@ -31,6 +32,7 @@
namespace NKikimr {
namespace NDataShard {
+using namespace NBackup;
using namespace NBackupRestoreTraits;
class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
@@ -183,7 +185,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
google::protobuf::TextFormat::PrintToString(Scheme.GetRef(), &Buffer);
if (EnableChecksums) {
- SchemeChecksum = ComputeExportChecksum(Buffer);
+ SchemeChecksum = NBackup::ComputeChecksum(Buffer);
}
auto request = Aws::S3::Model::PutObjectRequest()
@@ -202,7 +204,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
google::protobuf::TextFormat::PrintToString(Permissions.GetRef(), &Buffer);
if (EnableChecksums) {
- PermissionsChecksum = ComputeExportChecksum(Buffer);
+ PermissionsChecksum = NBackup::ComputeChecksum(Buffer);
}
auto request = Aws::S3::Model::PutObjectRequest()
@@ -217,7 +219,7 @@ class TS3Uploader: public TActorBootstrapped<TS3Uploader> {
Buffer = std::move(Metadata);
if (EnableChecksums) {
- MetadataChecksum = ComputeExportChecksum(Buffer);
+ MetadataChecksum = NBackup::ComputeChecksum(Buffer);
}
auto request = Aws::S3::Model::PutObjectRequest()
@@ -814,11 +816,11 @@ IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
? GenYdbPermissions(Task.GetTable())
: Nothing();
- NBackupRestore::TMetadata metadata;
+ NBackup::TMetadata metadata;
metadata.SetVersion(Task.GetEnableChecksums() ? 1 : 0);
- NBackupRestore::TFullBackupMetadata::TPtr backup = new NBackupRestore::TFullBackupMetadata{
- .SnapshotVts = NBackupRestore::TVirtualTimestamp(
+ NBackup::TFullBackupMetadata::TPtr backup = new NBackup::TFullBackupMetadata{
+ .SnapshotVts = NBackup::TVirtualTimestamp(
Task.GetSnapshotStep(),
Task.GetSnapshotTxId())
};
diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h
index 8c723448770..7699f2fca8e 100644
--- a/ydb/core/tx/datashard/export_scan.h
+++ b/ydb/core/tx/datashard/export_scan.h
@@ -2,8 +2,6 @@
#include "defs.h"
-#include "export_checksum.h"
-
#include <ydb/core/tablet_flat/flat_scan_iface.h>
#include <util/generic/ptr.h>
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index 655ee80172a..9897ebd8bf9 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -6,6 +6,7 @@
#include "import_common.h"
#include "import_s3.h"
+#include <ydb/core/backup/common/checksum.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/protos/datashard_config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -47,6 +48,9 @@ namespace {
namespace NKikimr {
namespace NDataShard {
+using namespace NBackup;
+using namespace NBackupRestoreTraits;
+
using namespace NResourceBroker;
using namespace NWrappers;
@@ -346,7 +350,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
Client = RegisterWithSameMailbox(CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()));
HeadObject(Settings.GetDataKey(DataFormat, CompressionCodec));
- Become(&TThis::StateWork);
+ Become(&TThis::StateDownloadData);
}
void HeadObject(const TString& key) {
@@ -409,7 +413,12 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
ETag = result.GetResult().GetETag();
ContentLength = result.GetResult().GetContentLength();
- Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(TxId));
+ if (Checksum) {
+ HeadObject(ChecksumKey(Settings.GetDataKey(DataFormat, ECompressionCodec::None)));
+ Become(&TThis::StateDownloadChecksum);
+ } else {
+ Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(TxId));
+ }
}
void Handle(TEvDataShard::TEvS3DownloadInfo::TPtr& ev) {
@@ -440,7 +449,9 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
WrittenRows = info.WrittenRows;
if (!ContentLength || ProcessedBytes >= ContentLength) {
- return Finish();
+ if (CheckChecksum()) {
+ return Finish();
+ }
}
Process();
@@ -470,6 +481,36 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
Process();
}
+ void HandleChecksum(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
+ IMPORT_LOG_D("HandleChecksum " << ev->Get()->ToString());
+
+ const auto& result = ev->Get()->Result;
+
+ if (!CheckResult(result, "HeadObject")) {
+ return;
+ }
+
+ const auto contentLength = result.GetResult().GetContentLength();
+ const auto checksumKey = ChecksumKey(Settings.GetDataKey(DataFormat, CompressionCodec));
+ GetObject(checksumKey, std::make_pair(0, contentLength - 1));
+ }
+
+ void HandleChecksum(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
+ IMPORT_LOG_D("HandleChecksum " << ev->Get()->ToString());
+
+ auto& msg = *ev->Get();
+ const auto& result = msg.Result;
+
+ if (!CheckResult(result, "GetObject")) {
+ return;
+ }
+
+ ExpectedChecksum = msg.Body.substr(0, msg.Body.find(' '));
+
+ Send(DataShard, new TEvDataShard::TEvGetS3DownloadInfo(TxId));
+ Become(&TThis::StateDownloadData);
+ }
+
void Process() {
TStringBuf data;
TString error;
@@ -492,6 +533,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ": " << error);
}
+ if (Checksum) {
+ Checksum->AddData(data);
+ }
+
RequestBuilder.New(TableInfo, Scheme);
TMemoryPool pool(256);
while (ProcessData(data, pool));
@@ -653,6 +698,26 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return true;
}
+ bool CheckChecksum() {
+ if (!Checksum) {
+ return true;
+ }
+
+ TString gotChecksum = Checksum->Serialize();
+ if (gotChecksum == ExpectedChecksum) {
+ return true;
+ }
+
+ const TString error = TStringBuilder() << "Checksum mismatch:"
+ << ": expected# " << ExpectedChecksum
+ << ", got# " << gotChecksum;
+
+ IMPORT_LOG_E(error);
+ Finish(false, error);
+
+ return false;
+ }
+
static bool ShouldRetry(const Aws::S3::S3Error& error) {
return error.ShouldRetry();
}
@@ -736,6 +801,7 @@ public:
, Retries(task.GetNumberOfRetries())
, ReadBatchSize(task.GetS3Settings().GetLimits().GetReadBatchSize())
, ReadBufferSizeLimit(AppData()->DataShardConfig.GetRestoreReadBufferSizeLimit())
+ , Checksum(task.GetValidateChecksums() ? CreateChecksum() : nullptr)
{
}
@@ -757,7 +823,7 @@ public:
}
}
- STATEFN(StateWork) {
+ STATEFN(StateDownloadData) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvExternalStorage::TEvHeadObjectResponse, Handle);
hFunc(TEvExternalStorage::TEvGetObjectResponse, Handle);
@@ -770,6 +836,16 @@ public:
}
}
+ STATEFN(StateDownloadChecksum) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum);
+ hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChecksum);
+
+ sFunc(TEvents::TEvWakeup, Restart);
+ sFunc(TEvents::TEvPoisonPill, NotifyDied);
+ }
+ }
+
private:
NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
const TActorId DataShard;
@@ -803,6 +879,8 @@ private:
THolder<TReadController> Reader;
TUploadRowsRequestBuilder RequestBuilder;
+ NBackup::IChecksum::TPtr Checksum;
+ TString ExpectedChecksum;
}; // TS3Downloader
IActor* CreateS3Downloader(const TActorId& dataShard, ui64 txId, const NKikimrSchemeOp::TRestoreTask& task, const TTableInfo& info) {
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index 5fe808d9d92..c2e3717c0ad 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -3,7 +3,6 @@ LIBRARY()
SRCS(
alter_cdc_stream_unit.cpp
alter_table_unit.cpp
- backup_restore_common.cpp
backup_restore_traits.cpp
backup_unit.cpp
build_and_wait_dependencies_unit.cpp
@@ -153,7 +152,6 @@ SRCS(
execution_unit.h
execution_unit_ctors.h
execution_unit_kind.h
- export_checksum.cpp
export_common.cpp
export_iface.cpp
export_iface.h
@@ -256,6 +254,7 @@ PEERDIR(
library/cpp/l1_distance
library/cpp/l2_distance
ydb/core/actorlib_impl
+ ydb/core/backup/common
ydb/core/base
ydb/core/change_exchange
ydb/core/engine
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index aa07665ed93..4bbc5aace97 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4433,6 +4433,10 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
item.Permissions = permissions;
}
+ if (rowset.HaveValue<Schema::ImportItems::Metadata>()) {
+ item.Metadata = NBackup::TMetadata::Deserialize(rowset.GetValue<Schema::ImportItems::Metadata>());
+ }
+
item.State = static_cast<TImportInfo::EState>(rowset.GetValue<Schema::ImportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ImportItems::WaitTxId>(InvalidTxId);
item.NextIndexIdx = rowset.GetValueOrDefault<Schema::ImportItems::NextIndexIdx>(0);
diff --git a/ydb/core/tx/schemeshard/schemeshard_import.cpp b/ydb/core/tx/schemeshard/schemeshard_import.cpp
index 968897b29bd..0ced40d7370 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import.cpp
@@ -179,6 +179,9 @@ void TSchemeShard::PersistImportItemScheme(NIceDb::TNiceDb& db, const TImportInf
NIceDb::TUpdate<Schema::ImportItems::Permissions>(item.Permissions->SerializeAsString())
);
}
+ db.Table<Schema::ImportItems>().Key(importInfo->Id, itemIdx).Update(
+ NIceDb::TUpdate<Schema::ImportItems::Metadata>(item.Metadata.Serialize())
+ );
}
void TSchemeShard::PersistImportItemDstPathId(NIceDb::TNiceDb& db, const TImportInfo::TPtr importInfo, ui32 itemIdx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
index ea5f379f22a..4368d0299d5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
@@ -168,6 +168,10 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> RestorePropose(
if (const auto region = importInfo->Settings.region()) {
restoreSettings.SetRegion(region);
}
+
+ if (item.Metadata.HasVersion()) {
+ task.SetValidateChecksums(item.Metadata.GetVersion() > 0 && !importInfo->Settings.skip_checksum_validation());
+ }
}
break;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
index c802f2a419b..ab722b2c5e5 100644
--- a/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_import_scheme_getter.cpp
@@ -2,6 +2,8 @@
#include "schemeshard_import_helpers.h"
#include "schemeshard_private.h"
+#include <ydb/core/backup/common/checksum.h>
+#include <ydb/core/backup/common/metadata.h>
#include <ydb/core/wrappers/s3_storage_config.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/public/api/protos/ydb_import.pb.h>
@@ -25,6 +27,11 @@ using namespace Aws;
// Downloads scheme-related objects from S3
class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
+ static TString MetadataKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
+ Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
+ return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/metadata.json";
+ }
+
static TString SchemeKeyFromSettings(const Ydb::Import::ImportFromS3Settings& settings, ui32 itemIdx) {
Y_ABORT_UNLESS(itemIdx < (ui32)settings.items_size());
return TStringBuilder() << settings.items(itemIdx).source_prefix() << "/scheme.pb";
@@ -42,6 +49,21 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Send(Client, new TEvExternalStorage::TEvHeadObjectRequest(request));
}
+ void HandleMetadata(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
+ const auto& result = ev->Get()->Result;
+
+ LOG_D("HandleMetadata TEvExternalStorage::TEvHeadObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "HeadObject")) {
+ return;
+ }
+
+ const auto contentLength = result.GetResult().GetContentLength();
+ GetObject(MetadataKey, std::make_pair(0, contentLength - 1));
+ }
+
void HandleScheme(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
const auto& result = ev->Get()->Result;
@@ -76,6 +98,21 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
GetObject(PermissionsKey, std::make_pair(0, contentLength - 1));
}
+ void HandleChecksum(TEvExternalStorage::TEvHeadObjectResponse::TPtr& ev) {
+ const auto& result = ev->Get()->Result;
+
+ LOG_D("HandleChecksum TEvExternalStorage::TEvHeadObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "HeadObject")) {
+ return;
+ }
+
+ const auto contentLength = result.GetResult().GetContentLength();
+ GetObject(ChecksumKey, std::make_pair(0, contentLength - 1));
+ }
+
void GetObject(const TString& key, const std::pair<ui64, ui64>& range) {
auto request = Model::GetObjectRequest()
.WithKey(key)
@@ -84,6 +121,44 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Send(Client, new TEvExternalStorage::TEvGetObjectRequest(request));
}
+ void HandleMetadata(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
+ const auto& msg = *ev->Get();
+ const auto& result = msg.Result;
+
+ LOG_D("HandleMetadata TEvExternalStorage::TEvGetObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "GetObject")) {
+ return;
+ }
+
+ Y_ABORT_UNLESS(ItemIdx < ImportInfo->Items.size());
+ auto& item = ImportInfo->Items.at(ItemIdx);
+
+ LOG_T("Trying to parse metadata"
+ << ": self# " << SelfId()
+ << ", body# " << SubstGlobalCopy(msg.Body, "\n", "\\n"));
+
+ item.Metadata = NBackup::TMetadata::Deserialize(msg.Body);
+
+ if (!item.Metadata.HasVersion()) {
+ return Reply(false, "Metadata is corrupted: no version");
+ }
+
+ NeedValidateChecksums = item.Metadata.GetVersion() > 0 && !SkipChecksumValidation;
+
+ auto nextStep = [this]() {
+ StartDownloadingScheme();
+ };
+
+ if (NeedValidateChecksums) {
+ StartValidatingChecksum(MetadataKey, msg.Body, nextStep);
+ } else {
+ nextStep();
+ }
+ }
+
void HandleScheme(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
const auto& msg = *ev->Get();
const auto& result = msg.Result;
@@ -107,10 +182,18 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
return Reply(false, "Cannot parse scheme");
}
- if (NeedDownloadPermissions) {
- StartDownloadingPermissions();
+ auto nextStep = [this]() {
+ if (NeedDownloadPermissions) {
+ StartDownloadingPermissions();
+ } else {
+ Reply();
+ }
+ };
+
+ if (NeedValidateChecksums) {
+ StartValidatingChecksum(SchemeKey, msg.Body, nextStep);
} else {
- Reply();
+ nextStep();
}
}
@@ -139,7 +222,37 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
}
item.Permissions = std::move(permissions);
- Reply();
+ auto nextStep = [this]() {
+ Reply();
+ };
+
+ if (NeedValidateChecksums) {
+ StartValidatingChecksum(PermissionsKey, msg.Body, nextStep);
+ } else {
+ nextStep();
+ }
+ }
+
+ void HandleChecksum(TEvExternalStorage::TEvGetObjectResponse::TPtr& ev) {
+ const auto& msg = *ev->Get();
+ const auto& result = msg.Result;
+
+ LOG_D("HandleChecksum TEvExternalStorage::TEvGetObjectResponse"
+ << ": self# " << SelfId()
+ << ", result# " << result);
+
+ if (!CheckResult(result, "GetObject")) {
+ return;
+ }
+
+ TString expectedChecksum = msg.Body.substr(0, msg.Body.find(' '));
+ if (expectedChecksum != Checksum) {
+ return Reply(false, TStringBuilder() << "Checksum mismatch for " << ChecksumKey
+ << " expected# " << expectedChecksum
+ << ", got# " << Checksum);
+ }
+
+ ChecksumValidatedCallback();
}
template <typename TResult>
@@ -189,6 +302,10 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
HeadObject(key);
}
+ void DownloadMetadata() {
+ Download(MetadataKey);
+ }
+
void DownloadScheme() {
Download(SchemeKey);
}
@@ -197,32 +314,64 @@ class TSchemeGetter: public TActorBootstrapped<TSchemeGetter> {
Download(PermissionsKey);
}
+ void DownloadChecksum() {
+ Download(ChecksumKey);
+ }
+
void ResetRetries() {
Attempt = 0;
}
+ void StartDownloadingScheme() {
+ ResetRetries();
+ DownloadScheme();
+ Become(&TThis::StateDownloadScheme);
+ }
+
void StartDownloadingPermissions() {
ResetRetries();
DownloadPermissions();
Become(&TThis::StateDownloadPermissions);
}
+ void StartValidatingChecksum(const TString& key, const TString& object, std::function<void()> checksumValidatedCallback) {
+ ChecksumKey = NBackup::ChecksumKey(key);
+ Checksum = NBackup::ComputeChecksum(object);
+ ChecksumValidatedCallback = checksumValidatedCallback;
+
+ ResetRetries();
+ DownloadChecksum();
+ Become(&TThis::StateDownloadChecksum);
+ }
+
public:
explicit TSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx)
: ExternalStorageConfig(new NWrappers::NExternalStorage::TS3ExternalStorageConfig(importInfo->Settings))
, ReplyTo(replyTo)
, ImportInfo(importInfo)
, ItemIdx(itemIdx)
+ , MetadataKey(MetadataKeyFromSettings(importInfo->Settings, itemIdx))
, SchemeKey(SchemeKeyFromSettings(importInfo->Settings, itemIdx))
, PermissionsKey(PermissionsKeyFromSettings(importInfo->Settings, itemIdx))
, Retries(importInfo->Settings.number_of_retries())
, NeedDownloadPermissions(!importInfo->Settings.no_acl())
+ , SkipChecksumValidation(importInfo->Settings.skip_checksum_validation())
{
}
void Bootstrap() {
- DownloadScheme();
- Become(&TThis::StateDownloadScheme);
+ DownloadMetadata();
+ Become(&TThis::StateDownloadMetadata);
+ }
+
+ STATEFN(StateDownloadMetadata) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleMetadata);
+ hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleMetadata);
+
+ sFunc(TEvents::TEvWakeup, DownloadMetadata);
+ sFunc(TEvents::TEvPoisonPill, PassAway);
+ }
}
STATEFN(StateDownloadScheme) {
@@ -245,12 +394,23 @@ public:
}
}
+ STATEFN(StateDownloadChecksum) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExternalStorage::TEvHeadObjectResponse, HandleChecksum);
+ hFunc(TEvExternalStorage::TEvGetObjectResponse, HandleChecksum);
+
+ sFunc(TEvents::TEvWakeup, DownloadChecksum);
+ sFunc(TEvents::TEvPoisonPill, PassAway);
+ }
+ }
+
private:
NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig;
const TActorId ReplyTo;
TImportInfo::TPtr ImportInfo;
const ui32 ItemIdx;
+ const TString MetadataKey;
const TString SchemeKey;
const TString PermissionsKey;
@@ -264,6 +424,12 @@ private:
TActorId Client;
+ const bool SkipChecksumValidation = false;
+ bool NeedValidateChecksums = true;
+
+ TString Checksum;
+ TString ChecksumKey;
+ std::function<void()> ChecksumValidatedCallback;
}; // TSchemeGetter
IActor* CreateSchemeGetter(const TActorId& replyTo, TImportInfo::TPtr importInfo, ui32 itemIdx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 765a4077158..81a4b4012fd 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -12,6 +12,7 @@
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/control/immediate_control_board_impl.h>
+#include <ydb/core/backup/common/metadata.h>
#include <ydb/core/base/feature_flags.h>
#include <ydb/core/base/table_vector_index.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
@@ -2822,6 +2823,7 @@ struct TImportInfo: public TSimpleRefCount<TImportInfo> {
TPathId DstPathId;
Ydb::Table::CreateTableRequest Scheme;
TMaybeFail<Ydb::Scheme::ModifyPermissionsRequest> Permissions;
+ NBackup::TMetadata Metadata;
EState State = EState::GetScheme;
ESubState SubState = ESubState::AllocateTxId;
diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h
index 76d29549ea3..cb90003e59b 100644
--- a/ydb/core/tx/schemeshard/schemeshard_schema.h
+++ b/ydb/core/tx/schemeshard/schemeshard_schema.h
@@ -1552,6 +1552,7 @@ struct Schema : NIceDb::Schema {
struct DstPathLocalId : Column<5, NScheme::NTypeIds::Uint64> { using Type = TLocalPathId; };
struct Scheme : Column<6, NScheme::NTypeIds::String> {};
struct Permissions : Column<11, NScheme::NTypeIds::String> {};
+ struct Metadata : Column<12, NScheme::NTypeIds::String> {};
struct State : Column<7, NScheme::NTypeIds::Byte> {};
struct WaitTxId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TTxId; };
@@ -1567,6 +1568,7 @@ struct Schema : NIceDb::Schema {
DstPathLocalId,
Scheme,
Permissions,
+ Metadata,
State,
WaitTxId,
NextIndexIdx,
diff --git a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
index 7d3a9c202df..77194e77eb4 100644
--- a/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
+++ b/ydb/core/tx/schemeshard/ut_restore/ut_restore.cpp
@@ -138,6 +138,7 @@ namespace {
};
struct TTestDataWithScheme {
+ TString Metadata;
TString Scheme;
TString Permissions;
TVector<TTestData> Data;
@@ -245,11 +246,13 @@ namespace {
TTestDataWithScheme GenerateTestData(
const TString& scheme,
const TVector<std::pair<TString, ui64>>& shardsConfig,
- const TString& permissions = "")
+ const TString& permissions = "",
+ const TString& metadata = "")
{
TTestDataWithScheme result;
result.Scheme = scheme;
result.Permissions = permissions;
+ result.Metadata = metadata;
for (const auto& [keyPrefix, count] : shardsConfig) {
result.Data.push_back(GenerateTestData(keyPrefix, count));
@@ -263,6 +266,12 @@ namespace {
for (const auto& [prefix, item] : data) {
result.emplace(prefix + "/scheme.pb", item.Scheme);
+ if (item.Metadata) {
+ result.emplace(prefix + "/metadata.json", item.Metadata);
+ } else {
+ result.emplace(prefix + "/metadata.json", R"({"version": 0})"); // without checksums
+ }
+
if (item.Permissions) {
result.emplace(prefix + "/permissions.pb", item.Permissions);
}
@@ -4341,6 +4350,139 @@ Y_UNIT_TEST_SUITE(TImportTests) {
NLs::HasNoRight("+R:bob")
});
}
+
+ Y_UNIT_TEST(CorruptedMetadata) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ const auto metadata = R"(
+ corrupted
+ )";
+
+ const auto data = GenerateTestData(R"(
+ columns {
+ name: "key"
+ type { optional_type { item { type_id: UTF8 } } }
+ }
+ columns {
+ name: "value"
+ type { optional_type { item { type_id: UTF8 } } }
+ }
+ primary_key: "key"
+ )", {{"a", 1}}, "", metadata);
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ImportFromS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_prefix: ""
+ destination_path: "/MyRoot/Table"
+ }
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+
+ auto desc = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ auto entry = desc.GetResponse().GetEntry();
+ UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED);
+ }
+
+ Y_UNIT_TEST(NoDataChecksums) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ const auto metadata = R"({
+ "version": 1
+ })";
+
+ const auto data = GenerateTestData(R"(
+ columns {
+ name: "key"
+ type { optional_type { item { type_id: UTF8 } } }
+ }
+ columns {
+ name: "value"
+ type { optional_type { item { type_id: UTF8 } } }
+ }
+ primary_key: "key"
+ )", {{"a", 1}}, "", metadata);
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ImportFromS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_prefix: ""
+ destination_path: "/MyRoot/Table"
+ }
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+
+ auto desc = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ auto entry = desc.GetResponse().GetEntry();
+ UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED);
+ }
+
+ Y_UNIT_TEST(SkipChecksumValidation) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ const auto metadata = R"(
+ corrupted
+ )";
+
+ const auto data = GenerateTestData(R"(
+ columns {
+ name: "key"
+ type { optional_type { item { type_id: UTF8 } } }
+ }
+ columns {
+ name: "value"
+ type { optional_type { item { type_id: UTF8 } } }
+ }
+ primary_key: "key"
+ )", {{"a", 1}}, "", metadata);
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock(ConvertTestData(data), TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ TestImport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ImportFromS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_prefix: ""
+ destination_path: "/MyRoot/Table"
+ }
+ skip_checksum_validation: true
+ }
+ )", port));
+ env.TestWaitNotification(runtime, txId);
+
+ auto desc = TestGetImport(runtime, txId, "/MyRoot", Ydb::StatusIds::CANCELLED);
+ auto entry = desc.GetResponse().GetEntry();
+ UNIT_ASSERT_VALUES_EQUAL(entry.GetProgress(), Ydb::Import::ImportProgress::PROGRESS_CANCELLED);
+ }
}
Y_UNIT_TEST_SUITE(TImportWithRebootsTests) {
diff --git a/ydb/public/api/protos/ydb_import.proto b/ydb/public/api/protos/ydb_import.proto
index 4717aa95319..2a575a1f438 100644
--- a/ydb/public/api/protos/ydb_import.proto
+++ b/ydb/public/api/protos/ydb_import.proto
@@ -72,6 +72,9 @@ message ImportFromS3Settings {
// Prevent importing of ACL and owner. If true, objects are created with empty ACL
// and their owner will be the user who started the import.
bool no_acl = 11;
+
+ // Skip checksum validation during import
+ bool skip_checksum_validation = 12;
}
message ImportFromS3Result {
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index 57c2764bebb..6834e0977e6 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -105,6 +105,9 @@ void TCommandImportFromS3::Config(TConfig& config) {
config.Opts->AddLongOption("no-acl", "Prevent importing of ACL and owner")
.RequiredArgument("BOOL").StoreTrue(&NoACL).DefaultValue("false");
+ config.Opts->AddLongOption("skip-checksum-validation", "Skip checksum validation during import")
+ .RequiredArgument("BOOL").StoreTrue(&SkipChecksumValidation).DefaultValue("false");
+
AddDeprecatedJsonOption(config);
AddOutputFormats(config, { EDataFormat::Pretty, EDataFormat::ProtoJsonBase64 });
config.Opts->MutuallyExclusive("json", "format");
@@ -146,6 +149,7 @@ int TCommandImportFromS3::Run(TConfig& config) {
settings.NumberOfRetries(NumberOfRetries);
settings.NoACL(NoACL);
+ settings.SkipChecksumValidation(SkipChecksumValidation);
#if defined(_win32_)
for (const auto& item : Items) {
settings.AppendItem({item.Source, item.Destination});
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index c396d4c5e37..5702f4bc970 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -38,7 +38,8 @@ private:
TString Description;
ui32 NumberOfRetries = 10;
bool UseVirtualAddressing = true;
- bool NoACL = true;
+ bool NoACL = false;
+ bool SkipChecksumValidation = false;
};
class TCommandImportFromFile : public TClientCommandTree {
diff --git a/ydb/public/lib/ydb_cli/common/print_operation.cpp b/ydb/public/lib/ydb_cli/common/print_operation.cpp
index 80f60145065..659d6004d9d 100644
--- a/ydb/public/lib/ydb_cli/common/print_operation.cpp
+++ b/ydb/public/lib/ydb_cli/common/print_operation.cpp
@@ -170,6 +170,16 @@ namespace {
}
}
+ if constexpr (std::is_same_v<NImport::TImportFromS3Response, T>) {
+ if (settings.NoACL_) {
+ freeText << "NoACL: " << settings.NoACL_ << Endl;
+ }
+
+ if (settings.SkipChecksumValidation_) {
+ freeText << "SkipChecksumValidation: " << settings.SkipChecksumValidation_ << Endl;
+ }
+ }
+
if (!status.GetIssues().Empty()) {
freeText << "Issues: " << Endl;
for (const auto& issue : status.GetIssues()) {
diff --git a/ydb/public/sdk/cpp/client/ydb_import/import.h b/ydb/public/sdk/cpp/client/ydb_import/import.h
index 8f479cfd905..162e973e0e3 100644
--- a/ydb/public/sdk/cpp/client/ydb_import/import.h
+++ b/ydb/public/sdk/cpp/client/ydb_import/import.h
@@ -42,6 +42,7 @@ struct TImportFromS3Settings : public TOperationRequestSettings<TImportFromS3Set
FLUENT_SETTING_OPTIONAL(TString, Description);
FLUENT_SETTING_OPTIONAL(ui32, NumberOfRetries);
FLUENT_SETTING_OPTIONAL(bool, NoACL);
+ FLUENT_SETTING_OPTIONAL(bool, SkipChecksumValidation);
};
class TImportFromS3Response : public TOperation {
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
index ed19c6e0a55..9965f0af5e7 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema
@@ -6168,6 +6168,11 @@
"ColumnId": 11,
"ColumnName": "Permissions",
"ColumnType": "String"
+ },
+ {
+ "ColumnId": 12,
+ "ColumnName": "Metadata",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
@@ -6184,7 +6189,8 @@
8,
9,
10,
- 11
+ 11,
+ 12
],
"RoomID": 0,
"Codec": 0,