aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/backup/common/checksum.h2
-rw-r--r--ydb/core/backup/common/encryption.cpp2
-rw-r--r--ydb/core/backup/common/encryption.h2
-rw-r--r--ydb/core/protos/datashard_backup.proto (renamed from ydb/core/protos/checksum.proto)4
-rw-r--r--ydb/core/protos/ya.make2
-rw-r--r--ydb/core/tx/datashard/datashard_impl.h10
-rw-r--r--ydb/core/tx/datashard/datashard_s3_download.h4
-rw-r--r--ydb/core/tx/datashard/datashard_s3_downloads.cpp7
-rw-r--r--ydb/core/tx/datashard/import_s3.cpp132
-rw-r--r--ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema20
10 files changed, 154 insertions, 31 deletions
diff --git a/ydb/core/backup/common/checksum.h b/ydb/core/backup/common/checksum.h
index a647324ff4..c25c9745be 100644
--- a/ydb/core/backup/common/checksum.h
+++ b/ydb/core/backup/common/checksum.h
@@ -1,6 +1,6 @@
#pragma once
-#include <ydb/core/protos/checksum.pb.h>
+#include <ydb/core/protos/datashard_backup.pb.h>
#include <util/generic/string.h>
diff --git a/ydb/core/backup/common/encryption.cpp b/ydb/core/backup/common/encryption.cpp
index 55624c4e2c..cef0dc8c85 100644
--- a/ydb/core/backup/common/encryption.cpp
+++ b/ydb/core/backup/common/encryption.cpp
@@ -826,6 +826,8 @@ TEncryptedFileDeserializer::TEncryptedFileDeserializer()
TEncryptedFileDeserializer::~TEncryptedFileDeserializer() = default;
+TEncryptedFileDeserializer& TEncryptedFileDeserializer::operator=(TEncryptedFileDeserializer&&) = default;
+
void TEncryptedFileDeserializer::AddData(TBuffer data, bool last) {
Impl->AddData(std::move(data), last);
}
diff --git a/ydb/core/backup/common/encryption.h b/ydb/core/backup/common/encryption.h
index 5e7ac585d6..9b78445e78 100644
--- a/ydb/core/backup/common/encryption.h
+++ b/ydb/core/backup/common/encryption.h
@@ -199,6 +199,8 @@ public:
TEncryptedFileDeserializer(TEncryptionKey key, TEncryptionIV expectedIV); // Decrypt file with key. Check that IV in header is equal to expectedIV
~TEncryptedFileDeserializer();
+ TEncryptedFileDeserializer& operator=(TEncryptedFileDeserializer&&);
+
// Adds buffer with input data.
void AddData(TBuffer data, bool last);
diff --git a/ydb/core/protos/checksum.proto b/ydb/core/protos/datashard_backup.proto
index d6f681c6bc..2516244aac 100644
--- a/ydb/core/protos/checksum.proto
+++ b/ydb/core/protos/datashard_backup.proto
@@ -17,3 +17,7 @@ message TChecksumState {
TSha256State Sha256State = 1;
}
}
+
+message TS3DownloadState {
+ optional bytes EncryptedDeserializerState = 1;
+}
diff --git a/ydb/core/protos/ya.make b/ydb/core/protos/ya.make
index 1fc65aca56..c519f7962a 100644
--- a/ydb/core/protos/ya.make
+++ b/ydb/core/protos/ya.make
@@ -31,7 +31,6 @@ SRCS(
bootstrapper.proto
change_exchange.proto
channel_purpose.proto
- checksum.proto
cms.proto
compaction.proto
compile_service_config.proto
@@ -68,6 +67,7 @@ SRCS(
database_basic_sausage_metainfo.proto
datashard_config.proto
datashard_load.proto
+ datashard_backup.proto
db_metadata_cache.proto
drivemodel.proto
export.proto
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index b3b70a2594..c0d4b61c19 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -52,7 +52,7 @@
#include <ydb/core/protos/tx.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/protos/subdomains.pb.h>
-#include <ydb/core/protos/checksum.pb.h>
+#include <ydb/core/protos/datashard_backup.pb.h>
#include <ydb/core/protos/counters_datashard.pb.h>
#include <ydb/core/protos/table_stats.pb.h>
@@ -792,6 +792,7 @@ class TDataShard
struct WrittenBytes : Column<5, NScheme::NTypeIds::Uint64> {};
struct WrittenRows : Column<6, NScheme::NTypeIds::Uint64> {};
struct ChecksumState : Column<7, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TChecksumState; };
+ struct DownloadState : Column<8, NScheme::NTypeIds::String> { using Type = NKikimrBackup::TS3DownloadState; };
using TKey = TableKey<TxId>;
using TColumns = TableColumns<
@@ -801,7 +802,8 @@ class TDataShard
ProcessedBytes,
WrittenBytes,
WrittenRows,
- ChecksumState
+ ChecksumState,
+ DownloadState
>;
};
@@ -3395,14 +3397,14 @@ protected:
}
for (const auto& pi : SysTablesPartOwners) {
ev->Record.AddSysTablesPartOwners(pi);
- }
+ }
}
ev->Record.MutableTableStats()->SetImmediateTxCompleted(TabletCounters->Cumulative()[COUNTER_PREPARE_IMMEDIATE].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_IMMEDIATE].Get());
ev->Record.MutableTableStats()->SetPlannedTxCompleted(TabletCounters->Cumulative()[COUNTER_PLANNED_TX_COMPLETE].Get());
ev->Record.MutableTableStats()->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_PREPARE_OVERLOADED].Get() + TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOADED].Get());
ev->Record.MutableTableStats()->SetTxRejectedBySpace(
- TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get()
+ TabletCounters->Cumulative()[COUNTER_PREPARE_OUT_OF_SPACE].Get()
+ TabletCounters->Cumulative()[COUNTER_PREPARE_DISK_SPACE_EXHAUSTED].Get()
+ TabletCounters->Cumulative()[COUNTER_WRITE_OUT_OF_SPACE].Get()
+ TabletCounters->Cumulative()[COUNTER_WRITE_DISK_SPACE_EXHAUSTED].Get()
diff --git a/ydb/core/tx/datashard/datashard_s3_download.h b/ydb/core/tx/datashard/datashard_s3_download.h
index d75e41cce1..63566699cf 100644
--- a/ydb/core/tx/datashard/datashard_s3_download.h
+++ b/ydb/core/tx/datashard/datashard_s3_download.h
@@ -1,6 +1,6 @@
#pragma once
-#include <ydb/core/protos/checksum.pb.h>
+#include <ydb/core/protos/datashard_backup.pb.h>
#include <util/generic/maybe.h>
@@ -13,6 +13,7 @@ struct TS3Download {
ui64 WrittenBytes = 0;
ui64 WrittenRows = 0;
NKikimrBackup::TChecksumState ChecksumState;
+ NKikimrBackup::TS3DownloadState DownloadState;
void Out(IOutputStream& out) const {
out << "{"
@@ -21,6 +22,7 @@ struct TS3Download {
<< " WrittenBytes: " << WrittenBytes
<< " WrittenRows: " << WrittenRows
<< " ChecksumState: " << ChecksumState.ShortDebugString()
+ << " DownloadState: " << DownloadState.ShortDebugString()
<< " }";
}
};
diff --git a/ydb/core/tx/datashard/datashard_s3_downloads.cpp b/ydb/core/tx/datashard/datashard_s3_downloads.cpp
index 49963761b9..d77b418360 100644
--- a/ydb/core/tx/datashard/datashard_s3_downloads.cpp
+++ b/ydb/core/tx/datashard/datashard_s3_downloads.cpp
@@ -28,6 +28,10 @@ bool TS3DownloadsManager::Load(NIceDb::TNiceDb& db) {
info.ChecksumState = rowset.GetValue<Schema::S3Downloads::ChecksumState>();
}
+ if (rowset.HaveValue<Schema::S3Downloads::DownloadState>()) {
+ info.DownloadState = rowset.GetValue<Schema::S3Downloads::DownloadState>();
+ }
+
if (!rowset.Next()) {
ready = false;
break;
@@ -61,7 +65,8 @@ const TS3Download& TS3DownloadsManager::Store(NIceDb::TNiceDb& db, ui64 txId, co
NIceDb::TUpdate<Schema::S3Downloads::ProcessedBytes>(newInfo.ProcessedBytes),
NIceDb::TUpdate<Schema::S3Downloads::WrittenBytes>(newInfo.WrittenBytes),
NIceDb::TUpdate<Schema::S3Downloads::WrittenRows>(newInfo.WrittenRows),
- NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState)
+ NIceDb::TUpdate<Schema::S3Downloads::ChecksumState>(newInfo.ChecksumState),
+ NIceDb::TUpdate<Schema::S3Downloads::DownloadState>(newInfo.DownloadState)
);
return info;
diff --git a/ydb/core/tx/datashard/import_s3.cpp b/ydb/core/tx/datashard/import_s3.cpp
index 9cc842303e..02dc4f42dd 100644
--- a/ydb/core/tx/datashard/import_s3.cpp
+++ b/ydb/core/tx/datashard/import_s3.cpp
@@ -7,6 +7,7 @@
#include "import_s3.h"
#include <ydb/core/backup/common/checksum.h>
+#include <ydb/core/backup/common/encryption.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/protos/datashard_config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
@@ -68,13 +69,17 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
public:
virtual ~IReadController() = default;
- virtual void Feed(TString&& portion) = 0;
- // Returns data (view to internal buffer) or error
+ virtual void Feed(TString&& portion, bool last) = 0;
+ // Returns data (points to internal buffer) or error
virtual EDataStatus TryGetData(TStringBuf& data, TString& error) = 0;
- // Clear internal buffer & makes it ready for another Feed() and TryGetData()
- virtual void Confirm() = 0;
+ // Clears internal buffer & makes it ready for another Feed() and TryGetData()
+ virtual void Confirm(NKikimrBackup::TS3DownloadState& state) = 0;
+ // Bytes that were read from S3 and put into controller. In terms of input bytes
virtual ui64 PendingBytes() const = 0;
+ // Bytes that controller has given to processing. In terms of input bytes
virtual ui64 ReadyBytes() const = 0;
+ virtual std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const = 0;
+ virtual bool RestoreFromState(const NKikimrBackup::TS3DownloadState& state, TString& error) = 0;
};
class TReadController: public IReadController {
@@ -87,7 +92,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
Buffer.Reserve(RangeSize);
}
- std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const {
+ std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const override {
Y_ABORT_UNLESS(contentLength > 0);
Y_ABORT_UNLESS(processedBytes < contentLength);
@@ -96,6 +101,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return std::make_pair(start, end);
}
+ bool RestoreFromState(const NKikimrBackup::TS3DownloadState&, TString&) override {
+ return true;
+ }
+
protected:
bool CanIncreaseBuffer(size_t size, size_t delta, TString& reason) const {
if ((size + delta) >= BufferSizeLimit) {
@@ -127,7 +136,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
public:
using TReadController::TReadController;
- void Feed(TString&& portion) override {
+ void Feed(TString&& portion, bool /* last */) override {
Buffer.Append(portion.data(), portion.size());
}
@@ -149,7 +158,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return READY_DATA;
}
- void Confirm() override {
+ void Confirm(NKikimrBackup::TS3DownloadState&) override {
Buffer.ChopHead(Pos);
Pos = 0;
}
@@ -183,7 +192,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
Buffer.Reserve(AppData()->ZstdBlockSizeForTest.GetOrElse(ZSTD_BLOCKSIZE_MAX));
}
- void Feed(TString&& portion) override {
+ void Feed(TString&& portion, bool /* last */) override {
Y_ABORT_UNLESS(Portion.Empty());
Portion.Assign(portion.data(), portion.size());
}
@@ -253,7 +262,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
return READY_DATA;
}
- void Confirm() override {
+ void Confirm(NKikimrBackup::TS3DownloadState&) override {
Buffer.ChopHead(ReadyOutputPos);
ReadyOutputPos = 0;
@@ -277,6 +286,86 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
ui64 ReadyOutputPos = 0;
};
+ class TEncryptionDeserializerController: public IReadController {
+ public:
+ TEncryptionDeserializerController(
+ NBackup::TEncryptionKey key,
+ NBackup::TEncryptionIV expectedIV,
+ THolder<IReadController> deserializedDataController)
+ : Deserializer(std::move(key), std::move(expectedIV))
+ , DataController(std::move(deserializedDataController))
+ {
+ }
+
+ void Feed(TString&& portion, bool last) override {
+ Last = last;
+ FeedUnprocessedBytes += portion.size();
+ Deserializer.AddData(TBuffer(portion.data(), portion.size()), last);
+ }
+
+ EDataStatus TryGetData(TStringBuf& data, TString& error) override {
+ try {
+ const ui64 processedBefore = Deserializer.GetProcessedInputBytes();
+ TMaybe<TBuffer> block = Deserializer.GetNextBlock(); // Returns at least one encrypted block
+ if (block) {
+ const ui64 processedAfter = Deserializer.GetProcessedInputBytes();
+ Y_ABORT_UNLESS(processedAfter - processedBefore <= FeedUnprocessedBytes);
+ // Data is read by blocks from encrypted file.
+ // Each block contains at least one row of data with '\n',
+ // so we will always get some data from DataController.
+ ReadyInputBytes += processedAfter - processedBefore;
+ DataController->Feed(TString(block->Data(), block->Size()), Last);
+ const EDataStatus status = DataController->TryGetData(data, error);
+ Y_ABORT_UNLESS(status == READY_DATA);
+ return status;
+ } else {
+ return NOT_ENOUGH_DATA;
+ }
+ } catch (const std::exception& ex) {
+ error = ex.what();
+ return ERROR;
+ }
+ }
+
+ void Confirm(NKikimrBackup::TS3DownloadState& state) override {
+ state.SetEncryptedDeserializerState(Deserializer.GetState());
+ FeedUnprocessedBytes -= ReadyInputBytes;
+ ReadyInputBytes = 0;
+ return DataController->Confirm(state);
+ }
+
+ ui64 PendingBytes() const override {
+ return FeedUnprocessedBytes;
+ }
+
+ ui64 ReadyBytes() const override {
+ return ReadyInputBytes;
+ }
+
+ std::pair<ui64, ui64> NextRange(ui64 contentLength, ui64 processedBytes) const override {
+ return DataController->NextRange(contentLength, processedBytes);
+ }
+
+ bool RestoreFromState(const NKikimrBackup::TS3DownloadState& state, TString& error) override {
+ if (const TString& deserializerState = state.GetEncryptedDeserializerState()) {
+ try {
+ Deserializer = NBackup::TEncryptedFileDeserializer::RestoreFromState(deserializerState);
+ } catch (const std::exception& ex) {
+ error = ex.what();
+ return false;
+ }
+ }
+ return DataController->RestoreFromState(state, error);
+ }
+
+ private:
+ bool Last = false;
+ ui64 FeedUnprocessedBytes = 0;
+ ui64 ReadyInputBytes = 0;
+ NBackup::TEncryptedFileDeserializer Deserializer;
+ THolder<IReadController> DataController;
+ };
+
class TUploadRowsRequestBuilder {
public:
void New(const TTableInfo& tableInfo, const NKikimrSchemeOp::TTableDescription& scheme) {
@@ -435,7 +524,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
const auto& info = ev->Get()->Info;
if (!info.DataETag) {
Send(DataShard, new TEvDataShard::TEvStoreS3DownloadInfo(TxId, {
- ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
+ ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState(), DownloadState
}));
return;
}
@@ -453,11 +542,18 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
}
ProcessedBytes = info.ProcessedBytes;
+ ReadBytes = ProcessedBytes;
WrittenBytes = info.WrittenBytes;
WrittenRows = info.WrittenRows;
+ DownloadState = info.DownloadState;
if (Checksum) {
Checksum->Continue(info.ChecksumState);
}
+ if (TString restoreErr; !Reader->RestoreFromState(DownloadState, restoreErr)) {
+ const TString error = TStringBuilder() << "Failed to restore reader state: " << restoreErr;
+ IMPORT_LOG_E(error);
+ return Finish(false, error);
+ }
if (!ContentLength || ProcessedBytes >= ContentLength) {
if (!CheckChecksum()) {
@@ -489,7 +585,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ", content-length# " << ContentLength
<< ", body-size# " << msg.Body.size());
- Reader->Feed(std::move(msg.Body));
+ ReadBytes += msg.Body.size();
+ Reader->Feed(std::move(msg.Body), ReadBytes >= ContentLength);
Process();
}
@@ -528,10 +625,10 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
TString error;
switch (Reader->TryGetData(data, error)) {
- case TReadController::READY_DATA:
+ case IReadController::READY_DATA:
break;
- case TReadController::NOT_ENOUGH_DATA:
+ case IReadController::NOT_ENOUGH_DATA:
if (SumWithSaturation(ProcessedBytes, Reader->PendingBytes()) < ContentLength) {
return GetObject(Settings.GetDataKey(DataFormat, CompressionCodec),
Reader->NextRange(ContentLength, ProcessedBytes));
@@ -559,7 +656,8 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
WrittenRows += std::exchange(PendingRows, 0);
}
- Reader->Confirm();
+ DownloadState.Clear();
+ Reader->Confirm(DownloadState);
UploadRows();
}
@@ -615,7 +713,7 @@ class TS3Downloader: public TActorBootstrapped<TS3Downloader> {
<< ", size# " << record->ByteSizeLong());
Send(DataShard, new TEvDataShard::TEvS3UploadRowsRequest(TxId, record, {
- ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState()
+ ETag, ProcessedBytes, WrittenBytes, WrittenRows, GetChecksumState(), DownloadState
}));
}
@@ -882,14 +980,16 @@ private:
TString ETag;
ui64 ContentLength = 0;
ui64 ProcessedBytes = 0;
+ ui64 ReadBytes = 0;
ui64 WrittenBytes = 0;
ui64 WrittenRows = 0;
ui64 PendingBytes = 0;
ui64 PendingRows = 0;
+ NKikimrBackup::TS3DownloadState DownloadState;
const ui32 ReadBatchSize;
const ui64 ReadBufferSizeLimit;
- THolder<TReadController> Reader;
+ THolder<IReadController> Reader;
TUploadRowsRequestBuilder RequestBuilder;
NBackup::IChecksum::TPtr Checksum;
diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
index 330d6a02d4..a0d80aca53 100644
--- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
+++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_datashard_/flat_datashard.schema
@@ -952,11 +952,6 @@
],
"ColumnsAdded": [
{
- "ColumnId": 7,
- "ColumnName": "ChecksumState",
- "ColumnType": "String"
- },
- {
"ColumnId": 1,
"ColumnName": "TxId",
"ColumnType": "Uint64"
@@ -985,19 +980,30 @@
"ColumnId": 6,
"ColumnName": "WrittenRows",
"ColumnType": "Uint64"
+ },
+ {
+ "ColumnId": 7,
+ "ColumnName": "ChecksumState",
+ "ColumnType": "String"
+ },
+ {
+ "ColumnId": 8,
+ "ColumnName": "DownloadState",
+ "ColumnType": "String"
}
],
"ColumnsDropped": [],
"ColumnFamilies": {
"0": {
"Columns": [
- 7,
1,
2,
3,
4,
5,
- 6
+ 6,
+ 7,
+ 8
],
"RoomID": 0,
"Codec": 0,