diff options
-rw-r--r-- | ydb/core/backup/common/checksum.h | 2 | ||||
-rw-r--r-- | ydb/core/backup/common/encryption.cpp | 2 | ||||
-rw-r--r-- | ydb/core/backup/common/encryption.h | 2 | ||||
-rw-r--r-- | ydb/core/protos/datashard_backup.proto (renamed from ydb/core/protos/checksum.proto) | 4 | ||||
-rw-r--r-- | ydb/core/protos/ya.make | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 10 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_s3_download.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_s3_downloads.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/import_s3.cpp | 132 | ||||
-rw-r--r-- | ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema | 20 |
10 files changed, 154 insertions, 31 deletions
diff --git a/ydb/core/backup/common/checksum.h b/ydb/core/backup/common/checksum.h index a647324ff4..c25c9745be 100644 --- a/ydb/core/backup/common/checksum.h +++ b/ydb/core/backup/common/checksum.h @@ -1,6 +1,6 @@ #pragma once -#include <ydb/core/protos/checksum.pb.h> +#include <ydb/core/protos/datashard_backup.pb.h> #include <util/generic/string.h> diff --git a/ydb/core/backup/common/encryption.cpp b/ydb/core/backup/common/encryption.cpp index 55624c4e2c..cef0dc8c85 100644 --- a/ydb/core/backup/common/encryption.cpp +++ b/ydb/core/backup/common/encryption.cpp @@ -826,6 +826,8 @@ TEncryptedFileDeserializer::TEncryptedFileDeserializer() TEncryptedFileDeserializer::~TEncryptedFileDeserializer() = default; +TEncryptedFileDeserializer& TEncryptedFileDeserializer::operator=(TEncryptedFileDeserializer&&) = default; + void TEncryptedFileDeserializer::AddData(TBuffer data, bool last) { Impl->AddData(std::move(data), last); } diff --git a/ydb/core/backup/common/encryption.h b/ydb/core/backup/common/encryption.h index 5e7ac585d6..9b78445e78 100644 --- a/ydb/core/backup/common/encryption.h +++ b/ydb/core/backup/common/encryption.h @@ -199,6 +199,8 @@ public: TEncryptedFileDeserializer(TEncryptionKey key, TEncryptionIV expectedIV); // Decrypt file with key. Check that IV in header is equal to expectedIV ~TEncryptedFileDeserializer(); + TEncryptedFileDeserializer& operator=(TEncryptedFileDeserializer&&); + // Adds buffer with input data. void AddData(TBuffer data, bool last); diff --git a/ydb/core/protos/checksum.proto b/ydb/core/protos/datashard_backup.proto index d6f681c6bc..2516244aac 100644 --- a/ydb/core/protos/checksum.proto +++ b/ydb/core/protos/datashard_backup.proto @@ -17,3 +17,7 @@ message TChecksumState { TSha256State Sha256State = 1; } } + +message TS3DownloadState { + optional bytes EncryptedDeserializerState = 1; +} diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make index 1fc65aca56..c519f7962a 100644 --- a/ydb/core/protos/ya.make +++ b/ydb/core/protos/ya.make @@ -31,7 +31,6 @@ SRCS( bootstrapper.proto change_exchange.proto channel_purpose.proto - checksum.proto cms.proto compaction.proto compile_service_config.proto @@ -68,6 +67,7 @@ SRCS( database_basic_sausage_metainfo.proto datashard_config.proto datashard_load.proto + datashard_backup.proto db_metadata_cache.proto drivemodel.proto export.proto diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index b3b70a2594..c0d4b61c19 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -52,7 +52,7 @@ #include <ydb/core/protos/tx.pb.h> #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/protos/subdomains.pb.h> -#include <ydb/core/protos/checksum.pb.h> +#include <ydb/core/protos/datashard_backup.pb.h> #include <ydb/core/protos/counters_datashard.pb.h> #include <ydb/core/protos/table_stats.pb.h> @@ -792,6 +792,7 @@ class TDataShard struct WrittenBytes : Column<5, NScheme::NTypeIds::Uint64> {}; struct WrittenRows : Column<6, NScheme::NTypeIds::Uint64> {}; struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TChecksumState; }; + struct DownloadState : Column<8, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TS3DownloadState; }; using TKey = TableKey<TxId>; using TColumns = TableColumns< @@ -801,7 +802,8 @@ class TDataShard ProcessedBytes, WrittenBytes, WrittenRows, - ChecksumState + ChecksumState, + DownloadState >; }; @@ -3395,14 +3397,14 @@ protected: } for (const auto& pi : SysTablesPartOwners) { ev->Record.AddSysTablesPartOwners(pi); - } + } } ev->Record.MutableTableStats()->SetImmediateTxCompleted(TabletCounters->Cumulative()[COUNTER_PREPARE_IMMEDIATE].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_IMMEDIATE].Get()); ev->Record.MutableTableStats()->SetPlannedTxCompleted(TabletCounters->Cumulative()[COUNTER_PLANNED_TX_COMPLETE].Get()); ev->Record.MutableTableStats()->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_PREPARE_OVERLOADED].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOADED].Get()); ev->Record.MutableTableStats()->SetTxRejectedBySpace( - TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get() + TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get() + TabletCounters->Cumulative()[COUNTER_PREPARE_DISK_SPACE_EXHAUSTED].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_OUT_OF_SPACE].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_DISK_SPACE_EXHAUSTED].Get() diff --git a/ydb/core/tx/datashard/datashard_s3_download.h b/ydb/core/tx/datashard/datashard_s3_download.h index d75e41cce1..63566699cf 100644 --- a/ydb/core/tx/datashard/datashard_s3_download.h +++ b/ydb/core/tx/datashard/datashard_s3_download.h @@ -1,6 +1,6 @@ #pragma once -#include <ydb/core/protos/checksum.pb.h> +#include <ydb/core/protos/datashard_backup.pb.h> #include <util/generic/maybe.h> @@ -13,6 +13,7 @@ struct TS3Download { ui64 WrittenBytes = 0; ui64 WrittenRows = 0; NKikimrBackup::TChecksumState ChecksumState; + NKikimrBackup::TS3DownloadState DownloadState; void Out(IOutputStream& out) const { out << "{" @@ -21,6 +22,7 @@ struct TS3Download { << " WrittenBytes: " << WrittenBytes << " WrittenRows: " << WrittenRows << " ChecksumState: " << ChecksumState.ShortDebugString() + << " DownloadState: " << DownloadState.ShortDebugString() << " }"; } }; diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.cpp b/ydb/core/tx/datashard/datashard_s3_downloads.cpp index 49963761b9..d77b418360 100644 --- a/ydb/core/tx/datashard/datashard_s3_downloads.cpp +++ b/ydb/core/tx/datashard/datashard_s3_downloads.cpp @@ -28,6 +28,10 @@ bool TS3DownloadsManager::Load(NIceDb::TNiceDb& db) { info.ChecksumState = rowset.GetValue<Schema::S3Downloads::ChecksumState>(); } + if (rowset.HaveValue<Schema::S3Downloads::DownloadState>()) { + info.DownloadState = rowset.GetValue<Schema::S3Downloads::DownloadState>(); + } + if (!rowset.Next()) { ready = false; break; @@ -61,7 +65,8 @@ const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, co NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes), NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes), NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows), - NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState) + NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState), + NIceDb::TUpdate<Schema::S3Downloads::DownloadState>(newInfo.DownloadState) ); return info; diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp index 9cc842303e..02dc4f42dd 100644 --- a/ydb/core/tx/datashard/import_s3.cpp +++ b/ydb/core/tx/datashard/import_s3.cpp @@ -7,6 +7,7 @@ #include "import_s3.h" #include <ydb/core/backup/common/checksum.h> +#include <ydb/core/backup/common/encryption.h> #include <ydb/core/base/appdata.h> #include <ydb/core/protos/datashard_config.pb.h> #include <ydb/core/protos/flat_scheme_op.pb.h> @@ -68,13 +69,17 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { public: virtual ~IReadController() = default; - virtual void Feed(TString&& portion) = 0; - // Returns data (view to internal buffer) or error + virtual void Feed(TString&& portion, bool last) = 0; + // Returns data (points to internal buffer) or error virtual EDataStatus TryGetData(TStringBuf& data, TString& error) = 0; - // Clear internal buffer & makes it ready for another Feed() and TryGetData() - virtual void Confirm() = 0; + // Clears internal buffer & makes it ready for another Feed() and TryGetData() + virtual void Confirm(NKikimrBackup::TS3DownloadState& state) = 0; + // Bytes that were read from S3 and put into controller. In terms of input bytes virtual ui64 PendingBytes() const = 0; + // Bytes that controller has given to processing. In terms of input bytes virtual ui64 ReadyBytes() const = 0; + virtual std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const = 0; + virtual bool RestoreFromState(const NKikimrBackup::TS3DownloadState& state, TString& error) = 0; }; class TReadController: public IReadController { @@ -87,7 +92,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { Buffer.Reserve(RangeSize); } - std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const { + std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const override { Y_ABORT_UNLESS(contentLength > 0); Y_ABORT_UNLESS(processedBytes < contentLength); @@ -96,6 +101,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return std::make_pair(start, end); } + bool RestoreFromState(const NKikimrBackup::TS3DownloadState&, TString&) override { + return true; + } + protected: bool CanIncreaseBuffer(size_t size, size_t delta, TString& reason) const { if ((size + delta) >= BufferSizeLimit) { @@ -127,7 +136,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { public: using TReadController::TReadController; - void Feed(TString&& portion) override { + void Feed(TString&& portion, bool /* last */) override { Buffer.Append(portion.data(), portion.size()); } @@ -149,7 +158,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return READY_DATA; } - void Confirm() override { + void Confirm(NKikimrBackup::TS3DownloadState&) override { Buffer.ChopHead(Pos); Pos = 0; } @@ -183,7 +192,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { Buffer.Reserve(AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX)); } - void Feed(TString&& portion) override { + void Feed(TString&& portion, bool /* last */) override { Y_ABORT_UNLESS(Portion.Empty()); Portion.Assign(portion.data(), portion.size()); } @@ -253,7 +262,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { return READY_DATA; } - void Confirm() override { + void Confirm(NKikimrBackup::TS3DownloadState&) override { Buffer.ChopHead(ReadyOutputPos); ReadyOutputPos = 0; @@ -277,6 +286,86 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { ui64 ReadyOutputPos = 0; }; + class TEncryptionDeserializerController: public IReadController { + public: + TEncryptionDeserializerController( + NBackup::TEncryptionKey key, + NBackup::TEncryptionIV expectedIV, + THolder<IReadController> deserializedDataController) + : Deserializer(std::move(key), std::move(expectedIV)) + , DataController(std::move(deserializedDataController)) + { + } + + void Feed(TString&& portion, bool last) override { + Last = last; + FeedUnprocessedBytes += portion.size(); + Deserializer.AddData(TBuffer(portion.data(), portion.size()), last); + } + + EDataStatus TryGetData(TStringBuf& data, TString& error) override { + try { + const ui64 processedBefore = Deserializer.GetProcessedInputBytes(); + TMaybe<TBuffer> block = Deserializer.GetNextBlock(); // Returns at least one encrypted block + if (block) { + const ui64 processedAfter = Deserializer.GetProcessedInputBytes(); + Y_ABORT_UNLESS(processedAfter - processedBefore <= FeedUnprocessedBytes); + // Data is read by blocks from encrypted file. + // Each block contains at least one row of data with '\n', + // so we will always get some data from DataController. + ReadyInputBytes += processedAfter - processedBefore; + DataController->Feed(TString(block->Data(), block->Size()), Last); + const EDataStatus status = DataController->TryGetData(data, error); + Y_ABORT_UNLESS(status == READY_DATA); + return status; + } else { + return NOT_ENOUGH_DATA; + } + } catch (const std::exception& ex) { + error = ex.what(); + return ERROR; + } + } + + void Confirm(NKikimrBackup::TS3DownloadState& state) override { + state.SetEncryptedDeserializerState(Deserializer.GetState()); + FeedUnprocessedBytes -= ReadyInputBytes; + ReadyInputBytes = 0; + return DataController->Confirm(state); + } + + ui64 PendingBytes() const override { + return FeedUnprocessedBytes; + } + + ui64 ReadyBytes() const override { + return ReadyInputBytes; + } + + std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const override { + return DataController->NextRange(contentLength, processedBytes); + } + + bool RestoreFromState(const NKikimrBackup::TS3DownloadState& state, TString& error) override { + if (const TString& deserializerState = state.GetEncryptedDeserializerState()) { + try { + Deserializer = NBackup::TEncryptedFileDeserializer::RestoreFromState(deserializerState); + } catch (const std::exception& ex) { + error = ex.what(); + return false; + } + } + return DataController->RestoreFromState(state, error); + } + + private: + bool Last = false; + ui64 FeedUnprocessedBytes = 0; + ui64 ReadyInputBytes = 0; + NBackup::TEncryptedFileDeserializer Deserializer; + THolder<IReadController> DataController; + }; + class TUploadRowsRequestBuilder { public: void New(const TTableInfo& tableInfo, const NKikimrSchemeOp::TTableDescription& scheme) { @@ -435,7 +524,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { const auto& info = ev->Get()->Info; if (!info.DataETag) { Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, { - ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState() + ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState(), DownloadState })); return; } @@ -453,11 +542,18 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { } ProcessedBytes = info.ProcessedBytes; + ReadBytes = ProcessedBytes; WrittenBytes = info.WrittenBytes; WrittenRows = info.WrittenRows; + DownloadState = info.DownloadState; if (Checksum) { Checksum->Continue(info.ChecksumState); } + if (TString restoreErr; !Reader->RestoreFromState(DownloadState, restoreErr)) { + const TString error = TStringBuilder() << "Failed to restore reader state: " << restoreErr; + IMPORT_LOG_E(error); + return Finish(false, error); + } if (!ContentLength || ProcessedBytes >= ContentLength) { if (!CheckChecksum()) { @@ -489,7 +585,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { << ", content-length# " << ContentLength << ", body-size# " << msg.Body.size()); - Reader->Feed(std::move(msg.Body)); + ReadBytes += msg.Body.size(); + Reader->Feed(std::move(msg.Body), ReadBytes >= ContentLength); Process(); } @@ -528,10 +625,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { TString error; switch (Reader->TryGetData(data, error)) { - case TReadController::READY_DATA: + case IReadController::READY_DATA: break; - case TReadController::NOT_ENOUGH_DATA: + case IReadController::NOT_ENOUGH_DATA: if (SumWithSaturation(ProcessedBytes, Reader->PendingBytes()) < ContentLength) { return GetObject(Settings.GetDataKey(DataFormat, CompressionCodec), Reader->NextRange(ContentLength, ProcessedBytes)); @@ -559,7 +656,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { WrittenRows += std::exchange(PendingRows, 0); } - Reader->Confirm(); + DownloadState.Clear(); + Reader->Confirm(DownloadState); UploadRows(); } @@ -615,7 +713,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> { << ", size# " << record->ByteSizeLong()); Send(DataShard, new TEvDataShard::TEvS3UploadRowsRequest(TxId, record, { - ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState() + ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState(), DownloadState })); } @@ -882,14 +980,16 @@ private: TString ETag; ui64 ContentLength = 0; ui64 ProcessedBytes = 0; + ui64 ReadBytes = 0; ui64 WrittenBytes = 0; ui64 WrittenRows = 0; ui64 PendingBytes = 0; ui64 PendingRows = 0; + NKikimrBackup::TS3DownloadState DownloadState; const ui32 ReadBatchSize; const ui64 ReadBufferSizeLimit; - THolder<TReadController> Reader; + THolder<IReadController> Reader; TUploadRowsRequestBuilder RequestBuilder; NBackup::IChecksum::TPtr Checksum; diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema index 330d6a02d4..a0d80aca53 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema @@ -952,11 +952,6 @@ ], "ColumnsAdded": [ { - "ColumnId": 7, - "ColumnName": "ChecksumState", - "ColumnType": "String" - }, - { "ColumnId": 1, "ColumnName": "TxId", "ColumnType": "Uint64" @@ -985,19 +980,30 @@ "ColumnId": 6, "ColumnName": "WrittenRows", "ColumnType": "Uint64" + }, + { + "ColumnId": 7, + "ColumnName": "ChecksumState", + "ColumnType": "String" + }, + { + "ColumnId": 8, + "ColumnName": "DownloadState", + "ColumnType": "String" } ], "ColumnsDropped": [], "ColumnFamilies": { "0": { "Columns": [ - 7, 1, 2, 3, 4, 5, - 6 + 6, + 7, + 8 ], "RoomID": 0, "Codec": 0, |