summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOleg <[email protected]>2025-10-20 21:58:09 +0300
committerGitHub <[email protected]>2025-10-20 21:58:09 +0300
commit924d2e51e2100a86fb8707e2df5b5b92dca47304 (patch)
tree9f022b226bbdcf9b03aa1f6b25fb8843460c6c12
parent719bc6813fb2c0a6ad8d0c024d4928f3bc5bf9d9 (diff)
Removing tables from backup collection (#27068)
-rw-r--r--ydb/tests/functional/backup_collection/basic_user_scenarios.py169
1 file changed, 169 insertions, 0 deletions
diff --git a/ydb/tests/functional/backup_collection/basic_user_scenarios.py b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
index 7a0327c66a3..5f6a3b95a3b 100644
--- a/ydb/tests/functional/backup_collection/basic_user_scenarios.py
+++ b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
@@ -1514,3 +1514,172 @@ class TestFullCycleLocalBackupRestoreWSchemaChange(TestFullCycleLocalBackupResto
# cleanup exported data
if os.path.exists(export_dir):
shutil.rmtree(export_dir)
+
+
class TestIncrementalChainRestoreAfterDeletion(TestFullCycleLocalBackupRestore):
    """Restore an incremental backup chain after the source tables are dropped.

    Scenario: build a chain full -> inc1 -> inc2 -> inc3, export every
    snapshot, import only the snapshots up to inc2 into a fresh collection,
    drop the tables, run RESTORE, and verify the restored rows match the
    state captured right after inc2 (inc3 changes must NOT appear).
    """

    def _record_snapshot_and_rows(self, collection_src: str, t_orders: str, t_products: str,
                                  created_snapshots: list, snapshot_rows: dict) -> str:
        """Record the newest snapshot name and capture rows for orders/products.

        Mutates ``created_snapshots`` (appends the newest snapshot name) and
        ``snapshot_rows`` (maps that name to the captured rows of both
        tables).  Returns the newest snapshot name.
        """
        children = sorted(self.get_collection_children(collection_src))
        assert children, "No snapshots found after backup"
        newest = children[-1]
        created_snapshots.append(newest)
        snapshot_rows[newest] = {
            "orders": self._capture_snapshot(t_orders),
            "products": self._capture_snapshot(t_products),
        }
        return newest

    def _apply_sql_mutations(self, *sql_statements: str) -> None:
        """Execute each statement in its own committed transaction.

        Sleeps slightly over one second afterwards so the next backup gets a
        strictly later timestamp-based snapshot name (names have one-second
        resolution and must not collide).
        """
        with self.session_scope() as session:
            for stmt in sql_statements:
                session.transaction().execute(stmt, commit_tx=True)
        time.sleep(1.1)

    def _import_exported_snapshots_up_to(self, coll_restore: str, export_dir: str, target_ts: str) -> list:
        """Import exported snapshot directories whose timestamp part <= target_ts into the restore collection.

        Snapshot directory names start with a sortable timestamp prefix, so
        plain string comparison orders them chronologically.

        Returns the list of imported snapshot names.  Raises AssertionError
        when an import fails or the snapshots do not become visible in the
        scheme within the timeout.
        """
        all_dirs = sorted(
            entry for entry in os.listdir(export_dir)
            if os.path.isdir(os.path.join(export_dir, entry))
        )
        chosen = [d for d in all_dirs if d.split("_", 1)[0] <= target_ts]
        assert chosen, f"No exported snapshots with ts <= {target_ts} found in {export_dir}: {all_dirs}"

        # Endpoint is loop-invariant; build it once.
        endpoint = "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port
        for name in chosen:
            src = os.path.join(export_dir, name)
            dest_path = f"/Root/.backups/collections/{coll_restore}/{name}"
            # check_exit_code=False so we can attach stdout/stderr to the
            # assertion message instead of failing opaquely.
            r = yatest.common.execute(
                [
                    backup_bin(),
                    "--verbose",
                    "--endpoint",
                    endpoint,
                    "--database",
                    self.root_dir,
                    "tools",
                    "restore",
                    "--path",
                    dest_path,
                    "--input",
                    src,
                ],
                check_exit_code=False,
            )
            out = (r.std_out or b"").decode("utf-8", "ignore")
            err = (r.std_err or b"").decode("utf-8", "ignore")
            assert r.exit_code == 0, f"tools restore import failed for {name}: stdout={out} stderr={err}"

        # Poll the scheme until every imported snapshot is visible.
        deadline = time.time() + 60
        expected = set(chosen)
        while time.time() < deadline:
            kids = set(self.get_collection_children(coll_restore))
            if expected.issubset(kids):
                break
            time.sleep(1)
        else:
            raise AssertionError(
                f"Imported snapshots did not appear in collection {coll_restore} within timeout. Expected: {sorted(chosen)}"
            )

        return chosen

    def test_incremental_chain_restore_when_tables_deleted(self):
        """Create chain full -> inc1 -> inc2 -> inc3, export/import up to inc2, delete tables and restore."""
        # Setup: tables plus a uniquely named source collection.
        collection_src, t_orders, t_products = self._setup_test_collections()
        full_orders = f"/Root/{t_orders}"
        full_products = f"/Root/{t_products}"

        # Create incremental-enabled collection.
        create_collection_sql = f"""
        CREATE BACKUP COLLECTION `{collection_src}`
        ( TABLE `{full_orders}`, TABLE `{full_products}` )
        WITH ( STORAGE = 'cluster', INCREMENTAL_BACKUP_ENABLED = 'true' );
        """
        create_res = self._execute_yql(create_collection_sql)
        assert create_res.exit_code == 0, f"CREATE BACKUP COLLECTION failed: {getattr(create_res, 'std_err', None)}"
        self.wait_for_collection(collection_src, timeout_s=30)

        created_snapshots = []
        snapshot_rows = {}  # snapshot_name -> {"orders": rows, "products": rows}

        # Full backup (chain base).
        r = self._execute_yql(f"BACKUP `{collection_src}`;")
        assert r.exit_code == 0, f"FULL BACKUP 1 failed: {getattr(r, 'std_err', None)}"
        self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
        self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)

        # Mutate data and create incremental 1.
        self._apply_sql_mutations(
            'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (10, 1000, "inc1");',
            'PRAGMA TablePathPrefix("/Root"); DELETE FROM products WHERE id = 1;'
        )
        r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
        assert r.exit_code == 0, "INCREMENTAL 1 failed"
        self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)

        # Mutate data and create incremental 2 — this is the restore target.
        self._apply_sql_mutations(
            'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (20, 2000, "inc2");',
            'PRAGMA TablePathPrefix("/Root"); DELETE FROM orders WHERE id = 1;'
        )
        r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
        assert r.exit_code == 0, "INCREMENTAL 2 failed"
        snap_inc2 = self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)

        # Mutate data and create incremental 3 — must NOT be in the restored state.
        self._apply_sql_mutations(
            'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (30, 3000, "inc3");'
        )
        r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
        assert r.exit_code == 0, "INCREMENTAL 3 failed"
        self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)

        assert len(created_snapshots) >= 2, "Expected at least 1 full + incrementals"

        # Export backups; from here on, clean the exported directory up even
        # when an assertion fails (previously it leaked on failure).
        export_dir, exported_items = self._export_backups(collection_src)
        try:
            assert exported_items, "No exported snapshots found"
            exported_dirs = sorted(
                d for d in os.listdir(export_dir) if os.path.isdir(os.path.join(export_dir, d))
            )
            for s in created_snapshots:
                assert s in exported_dirs, f"Recorded snapshot {s} not found in exported dirs {exported_dirs}"

            # Create restore collection and import snapshots up to target (inc2).
            target_snap = snap_inc2
            target_ts = target_snap.split("_", 1)[0]

            coll_restore = f"coll_restore_incr_{int(time.time())}"
            create_restore_sql = f"""
            CREATE BACKUP COLLECTION `{coll_restore}`
            ( TABLE `{full_orders}`, TABLE `{full_products}` )
            WITH ( STORAGE = 'cluster' );
            """
            res = self._execute_yql(create_restore_sql)
            assert res.exit_code == 0, f"CREATE backup collection {coll_restore} failed"
            self.wait_for_collection(coll_restore, timeout_s=30)

            self._import_exported_snapshots_up_to(coll_restore, export_dir, target_ts)
            time.sleep(1)
            # Drop the source tables so RESTORE must recreate them from the chain.
            self._drop_tables([t_orders, t_products])

            # Run RESTORE from the imported (truncated) chain.
            res_restore = self._execute_yql(f"RESTORE `{coll_restore}`;")
            assert res_restore.exit_code == 0, f"RESTORE failed: {getattr(res_restore, 'std_err', None) or getattr(res_restore, 'std_out', None)}"

            # Verify restored data matches the snapshot taken at inc2.
            expected_orders = snapshot_rows[target_snap]["orders"]
            expected_products = snapshot_rows[target_snap]["products"]

            self._verify_restored_table_data(t_orders, expected_orders)
            self._verify_restored_table_data(t_products, expected_products)

            # The original collection may or may not survive the restore;
            # either outcome is acceptable, so only log what we observe.
            if not self.collection_exists(collection_src):
                logger.info("Starting collection %s not present (deleted) — OK", collection_src)
            else:
                # Lazy %-args, consistent with the branch above.
                logger.info(
                    "Starting collection %s is present and incremental backups appear enabled. "
                    "Expected: starting collection removed OR incremental backups disabled.",
                    collection_src,
                )
        finally:
            if os.path.exists(export_dir):
                shutil.rmtree(export_dir)