diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-04-07 12:03:23 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-04-07 12:03:23 +0300 |
commit | a99d2644ac460383617c0eb6fb5a6b3cf7e43048 (patch) | |
tree | a258a1b9c41331f30b5b92f4cad65a46b8e59099 | |
parent | 6304f31413c0cc8f2bcecc97ae6c76fccbb8051c (diff) | |
download | ydb-a99d2644ac460383617c0eb6fb5a6b3cf7e43048.tar.gz |
Restart backup operation after an aborted scan
-rw-r--r-- | ydb/core/tx/datashard/backup_restore_common.h | 11 | ||||
-rw-r--r-- | ydb/core/tx/datashard/backup_unit.cpp | 33 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_impl.h | 13 | ||||
-rw-r--r-- | ydb/core/tx/datashard/export_scan.cpp | 18 | ||||
-rw-r--r-- | ydb/core/tx/datashard/export_scan.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/operation.h | 5 | ||||
-rw-r--r-- | ydb/core/tx/datashard/restore_unit.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_export.cpp | 121 |
10 files changed, 178 insertions, 60 deletions
diff --git a/ydb/core/tx/datashard/backup_restore_common.h b/ydb/core/tx/datashard/backup_restore_common.h index 06571ca2256..5c4a440a1f3 100644 --- a/ydb/core/tx/datashard/backup_restore_common.h +++ b/ydb/core/tx/datashard/backup_restore_common.h @@ -22,7 +22,7 @@ protected: virtual bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) = 0; virtual bool HasResult(TOperation::TPtr op) const = 0; - virtual void ProcessResult(TOperation::TPtr op, const TActorContext& ctx) = 0; + virtual bool ProcessResult(TOperation::TPtr op, const TActorContext& ctx) = 0; virtual void Cancel(TActiveTransaction* tx, const TActorContext& ctx) = 0; @@ -106,9 +106,14 @@ public: LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "" << GetKind() << " complete" << " at " << DataShard.TabletID()); - ProcessResult(op, ctx); - PersistResult(op, txc); ResetWaiting(op); + if (ProcessResult(op, ctx)) { + PersistResult(op, txc); + } else { + Y_VERIFY_DEBUG(!HasResult(op)); + op->SetWaitingForRestartFlag(); + ctx.Schedule(TDuration::Seconds(1), new TDataShard::TEvPrivate::TEvRestartOperation(op->GetTxId())); + } } while (op->HasPendingInputEvents()) { diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp index 7ea23befc9f..0c00fc6cd4b 100644 --- a/ydb/core/tx/datashard/backup_unit.cpp +++ b/ydb/core/tx/datashard/backup_unit.cpp @@ -1,7 +1,7 @@ -#include "export_iface.h" #include "backup_restore_common.h" #include "backup_restore_traits.h" #include "execution_unit_ctors.h" +#include "export_iface.h" #include "export_scan.h" #include "export_s3.h" @@ -17,7 +17,7 @@ protected: } bool IsWaiting(TOperation::TPtr op) const override { - return op->IsWaitingForScan(); + return op->IsWaitingForScan() || op->IsWaitingForRestart(); } void SetWaiting(TOperation::TPtr op) override { @@ -26,6 +26,7 @@ protected: void ResetWaiting(TOperation::TPtr op) override { op->ResetWaitingForScanFlag(); + op->ResetWaitingForRestartFlag(); } bool Run(TOperation::TPtr op, TTransactionContext& txc, const TActorContext& ctx) override { @@ -110,20 +111,34 @@ protected: return op->HasScanResult(); } - void ProcessResult(TOperation::TPtr op, const TActorContext&) override { + bool ProcessResult(TOperation::TPtr op, const TActorContext&) override { TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); auto* result = CheckedCast<TExportScanProduct*>(op->ScanResult().Get()); - auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId()); - - schemeOp->Success = result->Success; - schemeOp->Error = std::move(result->Error); - schemeOp->BytesProcessed = result->BytesRead; - schemeOp->RowsProcessed = result->RowsRead; + bool done = true; + + switch (result->Outcome) { + case EExportOutcome::Success: + case EExportOutcome::Error: + if (auto* schemeOp = DataShard.FindSchemaTx(op->GetTxId())) { + schemeOp->Success = result->Outcome == EExportOutcome::Success; + schemeOp->Error = std::move(result->Error); + schemeOp->BytesProcessed = result->BytesRead; + schemeOp->RowsProcessed = result->RowsRead; + } else { + Y_FAIL_S("Cannot find schema tx: " << op->GetTxId()); + } + break; + case EExportOutcome::Aborted: + done = false; + break; + } op->SetScanResult(nullptr); tx->SetScanTask(0); + + return done; } void Cancel(TActiveTransaction* tx, const TActorContext&) override { diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp index a40d95de94f..3f1c5fab01e 100644 --- a/ydb/core/tx/datashard/datashard.cpp +++ b/ydb/core/tx/datashard/datashard.cpp @@ -3827,6 +3827,23 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncJobComplete::TPtr &ev, const TActorC PlanQueue.Progress(ctx); } +void TDataShard::Handle(TEvPrivate::TEvRestartOperation::TPtr &ev, const TActorContext &ctx) { + const auto txId = ev->Get()->TxId; + + if (auto op = Pipeline.FindOp(txId)) { + LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Restart op: " << txId + << " at " << TabletID()); + + if (op->IsWaitingForRestart()) { + op->ResetWaitingForRestartFlag(); + Pipeline.AddCandidateOp(op); + } + } + + // Continue current Tx + PlanQueue.Progress(ctx); +} + bool TDataShard::ReassignChannelsEnabled() const { return true; } diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 416ef9ff4c5..4b13c4c6584 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -124,8 +124,10 @@ namespace NDataShard { WaitCompletion = 1ULL << 44, // Waiting for global tx id allocation WaitingForGlobalTxId = 1ULL << 45, + // Operation is waiting for restart + WaitingForRestart = 1ULL << 46, - LastFlag = WaitCompletion, + LastFlag = WaitingForRestart, PrivateFlagsMask = 0xFFFFFFFFFFFF0000ULL, PreservedPrivateFlagsMask = ReadOnly | ProposeBlocker | NeedDiagnostics | GlobalReader diff --git a/ydb/core/tx/datashard/datashard_impl.h b/ydb/core/tx/datashard/datashard_impl.h index 04124acae01..27168c0746c 100644 --- a/ydb/core/tx/datashard/datashard_impl.h +++ b/ydb/core/tx/datashard/datashard_impl.h @@ -284,6 +284,7 @@ class TDataShard friend class TS3UploadsManager; friend class TS3DownloadsManager; friend class TS3Downloader; + template <typename T> friend class TBackupRestoreUnitBase; friend struct TSetupSysLocks; friend class TDataShardLocksDb; @@ -334,6 +335,7 @@ class TDataShard EvCdcStreamScanRegistered, EvCdcStreamScanProgress, EvCdcStreamScanContinue, + EvRestartOperation, // used to restart after an aborted scan (e.g. backup) EvEnd }; @@ -505,6 +507,15 @@ class TDataShard }; struct TEvCdcStreamScanContinue : public TEventLocal<TEvCdcStreamScanContinue, EvCdcStreamScanContinue> {}; + + struct TEvRestartOperation : public TEventLocal<TEvRestartOperation, EvRestartOperation> { + explicit TEvRestartOperation(ui64 txId) + : TxId(txId) + { + } + + const ui64 TxId; + }; }; struct Schema : NIceDb::Schema { @@ -1194,6 +1205,7 @@ class TDataShard void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvAsyncJobComplete::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvRestartOperation::TPtr& ev, const TActorContext& ctx); void Handle(TEvDataShard::TEvCancelBackup::TPtr &ev, const TActorContext &ctx); void Handle(TEvDataShard::TEvCancelRestore::TPtr &ev, const TActorContext &ctx); @@ -2757,6 +2769,7 @@ protected: HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle); HFunc(TEvPrivate::TEvCdcStreamScanProgress, Handle); HFunc(TEvPrivate::TEvAsyncJobComplete, Handle); + HFunc(TEvPrivate::TEvRestartOperation, Handle); HFunc(TEvPrivate::TEvPeriodicWakeup, DoPeriodicTasks); HFunc(TEvents::TEvUndelivered, Handle); IgnoreFunc(TEvInterconnect::TEvNodeConnected); diff --git a/ydb/core/tx/datashard/export_scan.cpp b/ydb/core/tx/datashard/export_scan.cpp index 0538e88b856..51d9484aabc 100644 --- a/ydb/core/tx/datashard/export_scan.cpp +++ b/ydb/core/tx/datashard/export_scan.cpp @@ -217,15 +217,23 @@ public: } TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override { - const bool success = (abort == EAbort::None) && Success; - + auto outcome = EExportOutcome::Success; if (abort != EAbort::None) { - Error = "Aborted by scan host env"; - Send(Uploader, new TEvents::TEvPoisonPill()); + outcome = EExportOutcome::Aborted; + } else if (!Success) { + outcome = EExportOutcome::Error; } PassAway(); - return new TExportScanProduct(success, Error, Stats->BytesRead, Stats->Rows); + return new TExportScanProduct(outcome, Error, Stats->BytesRead, Stats->Rows); + } + + void PassAway() override { + if (const auto& actorId = std::exchange(Uploader, {})) { + Send(actorId, new TEvents::TEvPoisonPill()); + } + + IActorCallback::PassAway(); } STATEFN(StateWork) { diff --git a/ydb/core/tx/datashard/export_scan.h b/ydb/core/tx/datashard/export_scan.h index 9d983be5954..1873795255e 100644 --- a/ydb/core/tx/datashard/export_scan.h +++ b/ydb/core/tx/datashard/export_scan.h @@ -70,14 +70,20 @@ struct TEvExportScan { }; // TEvExportScan +enum class EExportOutcome { + Success, + Error, + Aborted, +}; + struct TExportScanProduct: public IDestructable { - bool Success; + EExportOutcome Outcome; TString Error; ui64 BytesRead; ui64 RowsRead; - explicit TExportScanProduct(bool success, TString error, ui64 bytes, ui64 rows) - : Success(success) + explicit TExportScanProduct(EExportOutcome outcome, TString error, ui64 bytes, ui64 rows) + : Outcome(outcome) , Error(std::move(error)) , BytesRead(bytes) , RowsRead(rows) diff --git a/ydb/core/tx/datashard/operation.h b/ydb/core/tx/datashard/operation.h index 1569832efbb..c351891e574 100644 --- a/ydb/core/tx/datashard/operation.h +++ b/ydb/core/tx/datashard/operation.h @@ -342,6 +342,11 @@ public: void ResetWaitingForSnapshotFlag() { ResetFlag(TTxFlags::WaitingForSnapshot); } bool IsWaitingForSnapshot() const { return HasWaitingForSnapshotFlag(); } + bool HasWaitingForRestartFlag() const { return HasFlag(TTxFlags::WaitingForRestart); } + void SetWaitingForRestartFlag(bool val = true) { SetFlag(TTxFlags::WaitingForRestart, val); } + void ResetWaitingForRestartFlag() { ResetFlag(TTxFlags::WaitingForRestart); } + bool IsWaitingForRestart() const { return HasWaitingForRestartFlag(); } + bool HasResultSentFlag() const { return HasFlag(TTxFlags::ResultSent); } void SetResultSentFlag(bool val = true) { SetFlag(TTxFlags::ResultSent, val); } diff --git a/ydb/core/tx/datashard/restore_unit.cpp b/ydb/core/tx/datashard/restore_unit.cpp index 6f80c11bcdf..03ca603f5ca 100644 --- a/ydb/core/tx/datashard/restore_unit.cpp +++ b/ydb/core/tx/datashard/restore_unit.cpp @@ -60,7 +60,7 @@ protected: return op->HasAsyncJobResult(); } - void ProcessResult(TOperation::TPtr op, const TActorContext&) override { + bool ProcessResult(TOperation::TPtr op, const TActorContext&) override { TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get()); Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind()); @@ -74,6 +74,8 @@ protected: op->SetAsyncJobResult(nullptr); tx->SetAsyncJobActor(TActorId()); + + return true; } void Cancel(TActiveTransaction* tx, const TActorContext& ctx) override { diff --git a/ydb/core/tx/schemeshard/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export.cpp index 0609751f617..86d98ab389d 100644 --- a/ydb/core/tx/schemeshard/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export.cpp @@ -1,3 +1,4 @@ +#include <ydb/core/tablet_flat/shared_cache_events.h> #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> #include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h> #include <ydb/core/tx/datashard/datashard.h> @@ -157,6 +158,21 @@ namespace { TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::NOT_FOUND); } + void WriteRow(TTestActorRuntime& runtime, ui64 tabletId, const TString& key, const TString& value) { + NKikimrMiniKQL::TResult result; + TString error; + NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( + ( + (let key '( '('key (Utf8 '%s) ) ) ) + (let row '( '('value (Utf8 '%s) ) ) ) + (return (AsList (UpdateRow '__user__Table key row) )) + ) + )", key.c_str(), value.c_str()), result, error); + + UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); + UNIT_ASSERT_VALUES_EQUAL(error, ""); + } + } // anonymous Y_UNIT_TEST_SUITE(TExportToS3Tests) { @@ -667,21 +683,6 @@ partitioning_settings { TTestEnv env(runtime); ui64 txId = 100; - auto writeRow = [&](ui64 tabletId, const char* key, const char* value) { - NKikimrMiniKQL::TResult result; - TString error; - NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( - ( - (let key '( '('key (Utf8 '%s) ) ) ) - (let row '( '('value (Utf8 '%s) ) ) ) - (return (AsList (UpdateRow '__user__Table key row) )) - ) - )", key, value), result, error); - - UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); - UNIT_ASSERT_VALUES_EQUAL(error, ""); - }; - TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" Columns { Name: "key" Type: "Utf8" } @@ -690,8 +691,8 @@ partitioning_settings { )"); env.TestWaitNotification(runtime, txId); - writeRow(TTestTxConfig::FakeHiveTablets, "a", "valueA"); - writeRow(TTestTxConfig::FakeHiveTablets, "b", "valueB"); + WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "a", "valueA"); + WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "b", "valueB"); runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NActors::NLog::PRI_TRACE); runtime.SetLogPriority(NKikimrServices::DATASHARD_BACKUP, NActors::NLog::PRI_TRACE); @@ -795,28 +796,9 @@ partitioning_settings { Y_UNIT_TEST(ShouldExcludeBackupTableFromStats) { TTestBasicRuntime runtime; - TTestEnvOptions opts; - opts.DisableStatsBatching(true); - - TTestEnv env(runtime, opts); - + TTestEnv env(runtime, TTestEnvOptions().DisableStatsBatching(true)); ui64 txId = 100; - auto writeRow = [&](ui64 tabletId, const TString& key, const TString& value) { - NKikimrMiniKQL::TResult result; - TString error; - NKikimrProto::EReplyStatus status = LocalMiniKQL(runtime, tabletId, Sprintf(R"( - ( - (let key '( '('key (Utf8 '%s) ) ) ) - (let row '( '('value (Utf8 '%s) ) ) ) - (return (AsList (UpdateRow '__user__Table key row) )) - ) - )", key.c_str(), value.c_str()), result, error); - - UNIT_ASSERT_VALUES_EQUAL_C(status, NKikimrProto::EReplyStatus::OK, error); - UNIT_ASSERT_VALUES_EQUAL(error, ""); - }; - THashSet<ui64> statsCollected; runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { if (ev->GetTypeRewrite() == TEvDataShard::EvPeriodicTableStats) { @@ -850,7 +832,7 @@ partitioning_settings { env.TestWaitNotification(runtime, txId); for (int i = 1; i < 500; ++i) { - writeRow(TTestTxConfig::FakeHiveTablets, Sprintf("a%i", i), "value"); + WriteRow(runtime, TTestTxConfig::FakeHiveTablets, Sprintf("a%i", i), "value"); } // trigger memtable's compaction @@ -933,4 +915,67 @@ partitioning_settings { UNIT_ASSERT(item.has_start_time()); UNIT_ASSERT(item.has_end_time()); } + + Y_UNIT_TEST(ShouldRestartOnScanErrors) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + WriteRow(runtime, TTestTxConfig::FakeHiveTablets, "a", "valueA"); + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + THolder<IEventHandle> injectResult; + auto prevObserver = runtime.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) { + if (ev->GetTypeRewrite() == NSharedCache::EvResult) { + const auto* msg = ev->Get<NSharedCache::TEvResult>(); + UNIT_ASSERT_VALUES_EQUAL(msg->Status, NKikimrProto::OK); + + auto result = MakeHolder<NSharedCache::TEvResult>(msg->Origin, msg->Cookie, NKikimrProto::ERROR); + std::move(msg->Loaded.begin(), msg->Loaded.end(), std::back_inserter(result->Loaded)); + + injectResult = MakeHolder<IEventHandle>(ev->Recipient, ev->Sender, result.Release(), ev->Flags, ev->Cookie); + return TTestActorRuntime::EEventAction::DROP; + } + + return TTestActorRuntime::EEventAction::PROCESS; + }); + + TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port)); + + if (!injectResult) { + TDispatchOptions opts; + opts.FinalEvents.emplace_back([&injectResult](IEventHandle&) -> bool { + return bool(injectResult); + }); + runtime.DispatchEvents(opts); + } + + runtime.SetObserverFunc(prevObserver); + runtime.Send(injectResult.Release(), 0, true); + + env.TestWaitNotification(runtime, txId); + TestGetExport(runtime, txId, "/MyRoot", Ydb::StatusIds::SUCCESS); + } } |