about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author ilnaz <ilnaz@ydb.tech> 2023-04-07 12:03:23 +0300
committer ilnaz <ilnaz@ydb.tech> 2023-04-07 12:03:23 +0300
commit a99d2644ac460383617c0eb6fb5a6b3cf7e43048 (patch)
tree a258a1b9c41331f30b5b92f4cad65a46b8e59099
parent 6304f31413c0cc8f2bcecc97ae6c76fccbb8051c (diff)
download ydb-a99d2644ac460383617c0eb6fb5a6b3cf7e43048.tar.gz
Restart backup operation after an aborted scan
-rw-r--r-- ydb/core/tx/datashard/backup_restore_common.h 11
-rw-r--r-- ydb/core/tx/datashard/backup_unit.cpp 33
-rw-r--r-- ydb/core/tx/datashard/datashard.cpp 17
-rw-r--r-- ydb/core/tx/datashard/datashard.h 4
-rw-r--r-- ydb/core/tx/datashard/datashard_impl.h 13
-rw-r--r-- ydb/core/tx/datashard/export_scan.cpp 18
-rw-r--r-- ydb/core/tx/datashard/export_scan.h 12
-rw-r--r-- ydb/core/tx/datashard/operation.h 5
-rw-r--r-- ydb/core/tx/datashard/restore_unit.cpp 4
-rw-r--r-- ydb/core/tx/schemeshard/ut_export.cpp 121
10 files changed, 178 insertions, 60 deletions
diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h
index 06571ca2256..5c4a440a1f3 100644
--- a/ydb/core/tx/datashard/backup_restore_common.h
+++ b/ydb/core/tx/datashard/backup_restore_common.h
@@ -22,7 +22,7 @@ protected:
virtual bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) = 0;
virtual bool HasResult(TOperation::TPtr op) const = 0;
- virtual void ProcessResult(TOperation::TPtr op, const TActorContext& ctx) = 0;
+ virtual bool ProcessResult(TOperation::TPtr op, const TActorContext& ctx) = 0;
virtual void Cancel(TActiveTransaction* tx, const TActorContext& ctx) = 0;
@@ -106,9 +106,14 @@ public:
LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "" << GetKind() << " complete"
<< " at " << DataShard.TabletID());
- ProcessResult(op, ctx);
- PersistResult(op, txc);
ResetWaiting(op);
+ if (ProcessResult(op, ctx)) {
+ PersistResult(op, txc);
+ } else {
+ Y_VERIFY_DEBUG(!HasResult(op));
+ op->SetWaitingForRestartFlag();
+ ctx.Schedule(TDuration::Seconds(1), new TDataShard::TEvPrivate::TEvRestartOperation(op->GetTxId()));
+ }
}
while (op->HasPendingInputEvents()) {
diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp
index 7ea23befc9f..0c00fc6cd4b 100644
--- a/ydb/core/tx/datashard/backup_unit.cpp
+++ b/ydb/core/tx/datashard/backup_unit.cpp
@@ -1,7 +1,7 @@
-#include "export_iface.h"
#include "backup_restore_common.h"
#include "backup_restore_traits.h"
#include "execution_unit_ctors.h"
+#include "export_iface.h"
#include "export_scan.h"
#include "export_s3.h"
@@ -17,7 +17,7 @@ protected:
}
bool IsWaiting(TOperation::TPtr op) const override {
- return op->IsWaitingForScan();
+ return op->IsWaitingForScan() || op->IsWaitingForRestart();
}
void SetWaiting(TOperation::TPtr op) override {
@@ -26,6 +26,7 @@ protected:
void ResetWaiting(TOperation::TPtr op) override {
op->ResetWaitingForScanFlag();
+ op->ResetWaitingForRestartFlag();
}
bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override {
@@ -110,20 +111,34 @@ protected:
return op->HasScanResult();
}
- void ProcessResult(TOperation::TPtr op, const TActorContext&) override {
+ bool ProcessResult(TOperation::TPtr op, const TActorContext&) override {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
auto* result = CheckedCast<TExportScanProduct*>(op->ScanResult().Get());
- auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId());
-
- schemeOp->Success = result->Success;
- schemeOp->Error = std::move(result->Error);
- schemeOp->BytesProcessed = result->BytesRead;
- schemeOp->RowsProcessed = result->RowsRead;
+ bool done = true;
+
+ switch (result->Outcome) {
+ case EExportOutcome::Success:
+ case EExportOutcome::Error:
+ if (auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId())) {
+ schemeOp->Success = result->Outcome == EExportOutcome::Success;
+ schemeOp->Error = std::move(result->Error);
+ schemeOp->BytesProcessed = result->BytesRead;
+ schemeOp->RowsProcessed = result->RowsRead;
+ } else {
+ Y_FAIL_S("Cannot find schema tx: " << op->GetTxId());
+ }
+ break;
+ case EExportOutcome::Aborted:
+ done = false;
+ break;
+ }
op->SetScanResult(nullptr);
tx->SetScanTask(0);
+
+ return done;
}
void Cancel(TActiveTransaction* tx, const TActorContext&) override {
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index a40d95de94f..3f1c5fab01e 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -3827,6 +3827,23 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncJobComplete::TPtr &ev, const TActorC
PlanQueue.Progress(ctx);
}
+void TDataShard::Handle(TEvPrivate::TEvRestartOperation::TPtr &ev, const TActorContext &ctx) {
+ const auto txId = ev->Get()->TxId;
+
+ if (auto op = Pipeline.FindOp(txId)) {
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Restart op: " << txId
+ << " at " << TabletID());
+
+ if (op->IsWaitingForRestart()) {
+ op->ResetWaitingForRestartFlag();
+ Pipeline.AddCandidateOp(op);
+ }
+ }
+
+ // Continue current Tx
+ PlanQueue.Progress(ctx);
+}
+
bool TDataShard::ReassignChannelsEnabled() const {
return true;
}
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 416ef9ff4c5..4b13c4c6584 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -124,8 +124,10 @@ namespace NDataShard {
WaitCompletion = 1ULL << 44,
// Waiting for global tx id allocation
WaitingForGlobalTxId = 1ULL << 45,
+ // Operation is waiting for restart
+ WaitingForRestart = 1ULL << 46,
- LastFlag = WaitCompletion,
+ LastFlag = WaitingForRestart,
PrivateFlagsMask = 0xFFFFFFFFFFFF0000ULL,
PreservedPrivateFlagsMask = ReadOnly | ProposeBlocker | NeedDiagnostics | GlobalReader
diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h
index 04124acae01..27168c0746c 100644
--- a/ydb/core/tx/datashard/datashard_impl.h
+++ b/ydb/core/tx/datashard/datashard_impl.h
@@ -284,6 +284,7 @@ class TDataShard
friend class TS3UploadsManager;
friend class TS3DownloadsManager;
friend class TS3Downloader;
+ template <typename T> friend class TBackupRestoreUnitBase;
friend struct TSetupSysLocks;
friend class TDataShardLocksDb;
@@ -334,6 +335,7 @@ class TDataShard
EvCdcStreamScanRegistered,
EvCdcStreamScanProgress,
EvCdcStreamScanContinue,
+ EvRestartOperation, // used to restart after an aborted scan (e.g. backup)
EvEnd
};
@@ -505,6 +507,15 @@ class TDataShard
};
struct TEvCdcStreamScanContinue : public TEventLocal<TEvCdcStreamScanContinue, EvCdcStreamScanContinue> {};
+
+ struct TEvRestartOperation : public TEventLocal<TEvRestartOperation, EvRestartOperation> {
+ explicit TEvRestartOperation(ui64 txId)
+ : TxId(txId)
+ {
+ }
+
+ const ui64 TxId;
+ };
};
struct Schema : NIceDb::Schema {
@@ -1194,6 +1205,7 @@ class TDataShard
void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAsyncJobComplete::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPrivate::TEvRestartOperation::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvCancelBackup::TPtr &ev, const TActorContext &ctx);
void Handle(TEvDataShard::TEvCancelRestore::TPtr &ev, const TActorContext &ctx);
@@ -2757,6 +2769,7 @@ protected:
HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle);
HFunc(TEvPrivate::TEvCdcStreamScanProgress, Handle);
HFunc(TEvPrivate::TEvAsyncJobComplete, Handle);
+ HFunc(TEvPrivate::TEvRestartOperation, Handle);
HFunc(TEvPrivate::TEvPeriodicWakeup, DoPeriodicTasks);
HFunc(TEvents::TEvUndelivered, Handle);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
diff --git a/ydb/core/tx/datashard/export_scan.cpp b/ydb/core/tx/datashard/export_scan.cpp
index 0538e88b856..51d9484aabc 100644
--- a/ydb/core/tx/datashard/export_scan.cpp
+++ b/ydb/core/tx/datashard/export_scan.cpp
@@ -217,15 +217,23 @@ public:
}
TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override {
- const bool success = (abort == EAbort::None) && Success;
-
+ auto outcome = EExportOutcome::Success;
if (abort != EAbort::None) {
- Error = "Aborted by scan host env";
- Send(Uploader, new TEvents::TEvPoisonPill());
+ outcome = EExportOutcome::Aborted;
+ } else if (!Success) {
+ outcome = EExportOutcome::Error;
}
PassAway();
- return new TExportScanProduct(success, Error, Stats->BytesRead, Stats->Rows);
+ return new TExportScanProduct(outcome, Error, Stats->BytesRead, Stats->Rows);
+ }
+
+ void PassAway() override {
+ if (const auto& actorId = std::exchange(Uploader, {})) {
+ Send(actorId, new TEvents::TEvPoisonPill());
+ }
+
+ IActorCallback::PassAway();
}
STATEFN(StateWork) {
diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h
index 9d983be5954..1873795255e 100644
--- a/ydb/core/tx/datashard/export_scan.h
+++ b/ydb/core/tx/datashard/export_scan.h
@@ -70,14 +70,20 @@ struct TEvExportScan {
}; // TEvExportScan
+enum class EExportOutcome {
+ Success,
+ Error,
+ Aborted,
+};
+
struct TExportScanProduct: public IDestructable {
- bool Success;
+ EExportOutcome Outcome;
TString Error;
ui64 BytesRead;
ui64 RowsRead;
- explicit TExportScanProduct(bool success, TString error, ui64 bytes, ui64 rows)
- : Success(success)
+ explicit TExportScanProduct(EExportOutcome outcome, TString error, ui64 bytes, ui64 rows)
+ : Outcome(outcome)
, Error(std::move(error))
, BytesRead(bytes)
, RowsRead(rows)
diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h
index 1569832efbb..c351891e574 100644
--- a/ydb/core/tx/datashard/operation.h
+++ b/ydb/core/tx/datashard/operation.h
@@ -342,6 +342,11 @@ public:
void ResetWaitingForSnapshotFlag() { ResetFlag(TTxFlags::WaitingForSnapshot); }
bool IsWaitingForSnapshot() const { return HasWaitingForSnapshotFlag(); }
+ bool HasWaitingForRestartFlag() const { return HasFlag(TTxFlags::WaitingForRestart); }
+ void SetWaitingForRestartFlag(bool val = true) { SetFlag(TTxFlags::WaitingForRestart, val); }
+ void ResetWaitingForRestartFlag() { ResetFlag(TTxFlags::WaitingForRestart); }
+ bool IsWaitingForRestart() const { return HasWaitingForRestartFlag(); }
+
bool HasResultSentFlag() const { return HasFlag(TTxFlags::ResultSent); }
void SetResultSentFlag(bool val = true) { SetFlag(TTxFlags::ResultSent, val); }
diff --git a/ydb/core/tx/datashard/restore_unit.cpp b/ydb/core/tx/datashard/restore_unit.cpp
index 6f80c11bcdf..03ca603f5ca 100644
--- a/ydb/core/tx/datashard/restore_unit.cpp
+++ b/ydb/core/tx/datashard/restore_unit.cpp
@@ -60,7 +60,7 @@ protected:
return op->HasAsyncJobResult();
}
- void ProcessResult(TOperation::TPtr op, const TActorContext&) override {
+ bool ProcessResult(TOperation::TPtr op, const TActorContext&) override {
TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
@@ -74,6 +74,8 @@ protected:
op->SetAsyncJobResult(nullptr);
tx->SetAsyncJobActor(TActorId());
+
+ return true;
}
void Cancel(TActiveTransaction* tx, const TActorContext& ctx) override {
diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp
index 0609751f617..86d98ab389d 100644
--- a/ydb/core/tx/schemeshard/ut_export.cpp
+++ b/ydb/core/tx/schemeshard/ut_export.cpp
@@ -1,3 +1,4 @@
+#include <ydb/core/tablet_flat/shared_cache_events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/tx/datashard/datashard.h>
@@ -157,6 +158,21 @@ namespace {
TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND);
}
+ void WriteRow(TTestActorRuntime& runtime, ui64 tabletId, const TString& key, const TString& value) {
+ NKikimrMiniKQL::TResult result;
+ TString error;
+ NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
+ (
+ (let key '( '('key (Utf8 '%s) ) ) )
+ (let row '( '('value (Utf8 '%s) ) ) )
+ (return (AsList (UpdateRow '__user__Table key row) ))
+ )
+ )", key.c_str(), value.c_str()), result, error);
+
+ UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
+ UNIT_ASSERT_VALUES_EQUAL(error, "");
+ }
+
} // anonymous
Y_UNIT_TEST_SUITE(TExportToS3Tests) {
@@ -667,21 +683,6 @@ partitioning_settings {
TTestEnv env(runtime);
ui64 txId = 100;
- auto writeRow = [&](ui64 tabletId, const char* key, const char* value) {
- NKikimrMiniKQL::TResult result;
- TString error;
- NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
- (
- (let key '( '('key (Utf8 '%s) ) ) )
- (let row '( '('value (Utf8 '%s) ) ) )
- (return (AsList (UpdateRow '__user__Table key row) ))
- )
- )", key, value), result, error);
-
- UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
- UNIT_ASSERT_VALUES_EQUAL(error, "");
- };
-
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
@@ -690,8 +691,8 @@ partitioning_settings {
)");
env.TestWaitNotification(runtime, txId);
- writeRow(TTestTxConfig::FakeHiveTablets, "a", "valueA");
- writeRow(TTestTxConfig::FakeHiveTablets, "b", "valueB");
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "a", "valueA");
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "b", "valueB");
runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE);
@@ -795,28 +796,9 @@ partitioning_settings {
Y_UNIT_TEST(ShouldExcludeBackupTableFromStats) {
TTestBasicRuntime runtime;
- TTestEnvOptions opts;
- opts.DisableStatsBatching(true);
-
- TTestEnv env(runtime, opts);
-
+ TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true));
ui64 txId = 100;
- auto writeRow = [&](ui64 tabletId, const TString& key, const TString& value) {
- NKikimrMiniKQL::TResult result;
- TString error;
- NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"(
- (
- (let key '( '('key (Utf8 '%s) ) ) )
- (let row '( '('value (Utf8 '%s) ) ) )
- (return (AsList (UpdateRow '__user__Table key row) ))
- )
- )", key.c_str(), value.c_str()), result, error);
-
- UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error);
- UNIT_ASSERT_VALUES_EQUAL(error, "");
- };
-
THashSet<ui64> statsCollected;
runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvDataShard::EvPeriodicTableStats) {
@@ -850,7 +832,7 @@ partitioning_settings {
env.TestWaitNotification(runtime, txId);
for (int i = 1; i < 500; ++i) {
- writeRow(TTestTxConfig::FakeHiveTablets, Sprintf("a%i", i), "value");
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets, Sprintf("a%i", i), "value");
}
// trigger memtable's compaction
@@ -933,4 +915,67 @@ partitioning_settings {
UNIT_ASSERT(item.has_start_time());
UNIT_ASSERT(item.has_end_time());
}
+
+ Y_UNIT_TEST(ShouldRestartOnScanErrors) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Utf8" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "a", "valueA");
+
+ TPortManager portManager;
+ const ui16 port = portManager.GetPort();
+
+ TS3Mock s3Mock({}, TS3Mock::TSettings(port));
+ UNIT_ASSERT(s3Mock.Start());
+
+ THolder<IEventHandle> injectResult;
+ auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ if (ev->GetTypeRewrite() == NSharedCache::EvResult) {
+ const auto* msg = ev->Get<NSharedCache::TEvResult>();
+ UNIT_ASSERT_VALUES_EQUAL(msg->Status, NKikimrProto::OK);
+
+ auto result = MakeHolder<NSharedCache::TEvResult>(msg->Origin, msg->Cookie, NKikimrProto::ERROR);
+ std::move(msg->Loaded.begin(), msg->Loaded.end(), std::back_inserter(result->Loaded));
+
+ injectResult = MakeHolder<IEventHandle>(ev->Recipient, ev->Sender, result.Release(), ev->Flags, ev->Cookie);
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ ExportToS3Settings {
+ endpoint: "localhost:%d"
+ scheme: HTTP
+ items {
+ source_path: "/MyRoot/Table"
+ destination_prefix: ""
+ }
+ }
+ )", port));
+
+ if (!injectResult) {
+ TDispatchOptions opts;
+ opts.FinalEvents.emplace_back([&injectResult](IEventHandle&) -> bool {
+ return bool(injectResult);
+ });
+ runtime.DispatchEvents(opts);
+ }
+
+ runtime.SetObserverFunc(prevObserver);
+ runtime.Send(injectResult.Release(), 0, true);
+
+ env.TestWaitNotification(runtime, txId);
+ TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS);
+ }
}