author    | Oleg <[email protected]> | 2025-10-20 21:58:09 +0300
committer | GitHub <[email protected]>    | 2025-10-20 21:58:09 +0300
commit    | 924d2e51e2100a86fb8707e2df5b5b92dca47304 (patch)
tree      | 9f022b226bbdcf9b03aa1f6b25fb8843460c6c12
parent    | 719bc6813fb2c0a6ad8d0c024d4928f3bc5bf9d9 (diff)
Removing tables from backup collection (#27068)
-rw-r--r-- | ydb/tests/functional/backup_collection/basic_user_scenarios.py | 169
1 file changed, 169 insertions, 0 deletions
diff --git a/ydb/tests/functional/backup_collection/basic_user_scenarios.py b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
index 7a0327c66a3..5f6a3b95a3b 100644
--- a/ydb/tests/functional/backup_collection/basic_user_scenarios.py
+++ b/ydb/tests/functional/backup_collection/basic_user_scenarios.py
@@ -1514,3 +1514,172 @@ class TestFullCycleLocalBackupRestoreWSchemaChange(TestFullCycleLocalBackupResto
         # cleanup exported data
         if os.path.exists(export_dir):
             shutil.rmtree(export_dir)
+
+
+class TestIncrementalChainRestoreAfterDeletion(TestFullCycleLocalBackupRestore):
+    def _record_snapshot_and_rows(self, collection_src: str, t_orders: str, t_products: str,
+                                  created_snapshots: list, snapshot_rows: dict) -> str:
+        """Record the newest snapshot name and capture the current rows of orders/products."""
+        kids = sorted(self.get_collection_children(collection_src))
+        assert kids, "No snapshots found after backup"
+        last = kids[-1]
+        created_snapshots.append(last)
+        snapshot_rows[last] = {
+            "orders": self._capture_snapshot(t_orders),
+            "products": self._capture_snapshot(t_products),
+        }
+        return last
+
+    def _apply_sql_mutations(self, *sql_statements: str) -> None:
+        with self.session_scope() as session:
+            for s in sql_statements:
+                session.transaction().execute(s, commit_tx=True)
+        time.sleep(1.1)
+
+    def _import_exported_snapshots_up_to(self, coll_restore: str, export_dir: str, target_ts: str) -> list:
+        """Import exported snapshot directories whose timestamp part is <= target_ts into the restore collection."""
+        all_dirs = sorted([d for d in os.listdir(export_dir) if os.path.isdir(os.path.join(export_dir, d))])
+        chosen = [d for d in all_dirs if d.split("_", 1)[0] <= target_ts]
+        assert chosen, f"No exported snapshots with ts <= {target_ts} found in {export_dir}: {all_dirs}"
+
+        for name in chosen:
+            src = os.path.join(export_dir, name)
+            dest_path = f"/Root/.backups/collections/{coll_restore}/{name}"
+            r = yatest.common.execute(
+                [
+                    backup_bin(),
+                    "--verbose",
+                    "--endpoint",
+                    "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port,
+                    "--database",
+                    self.root_dir,
+                    "tools",
+                    "restore",
+                    "--path",
+                    dest_path,
+                    "--input",
+                    src,
+                ],
+                check_exit_code=False,
+            )
+            out = (r.std_out or b"").decode("utf-8", "ignore")
+            err = (r.std_err or b"").decode("utf-8", "ignore")
+            assert r.exit_code == 0, f"tools restore import failed for {name}: stdout={out} stderr={err}"
+
+        # wait for imported snapshots to appear in scheme
+        deadline = time.time() + 60
+        expected = set(chosen)
+        while time.time() < deadline:
+            kids = set(self.get_collection_children(coll_restore))
+            if expected.issubset(kids):
+                break
+            time.sleep(1)
+        else:
+            raise AssertionError(
+                f"Imported snapshots did not appear in collection {coll_restore} within timeout. "
+                f"Expected: {sorted(chosen)}"
+            )
+
+        return chosen
+
+    def test_incremental_chain_restore_when_tables_deleted(self):
+        """Create chain full -> inc1 -> inc2 -> inc3, export/import up to inc2, delete tables and restore."""
+        # Setup
+        collection_src, t_orders, t_products = self._setup_test_collections()
+        full_orders = f"/Root/{t_orders}"
+        full_products = f"/Root/{t_products}"
+
+        # Create incremental-enabled collection
+        create_collection_sql = f"""
+        CREATE BACKUP COLLECTION `{collection_src}`
+        ( TABLE `{full_orders}`, TABLE `{full_products}` )
+        WITH ( STORAGE = 'cluster', INCREMENTAL_BACKUP_ENABLED = 'true' );
+        """
+        create_res = self._execute_yql(create_collection_sql)
+        assert create_res.exit_code == 0, f"CREATE BACKUP COLLECTION failed: {getattr(create_res, 'std_err', None)}"
+        self.wait_for_collection(collection_src, timeout_s=30)
+
+        created_snapshots = []
+        snapshot_rows = {}  # snapshot_name -> {"orders": rows, "products": rows}
+
+        # Full backup
+        r = self._execute_yql(f"BACKUP `{collection_src}`;")
+        assert r.exit_code == 0, f"FULL BACKUP 1 failed: {getattr(r, 'std_err', None)}"
+        self.wait_for_collection_has_snapshot(collection_src, timeout_s=30)
+        self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
+
+        # change data and create incremental 1
+        self._apply_sql_mutations(
+            'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (10, 1000, "inc1");',
+            'PRAGMA TablePathPrefix("/Root"); DELETE FROM products WHERE id = 1;'
+        )
+        r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
+        assert r.exit_code == 0, "INCREMENTAL 1 failed"
+        self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
+
+        # change data and create incremental 2
+        self._apply_sql_mutations(
+            'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (20, 2000, "inc2");',
+            'PRAGMA TablePathPrefix("/Root"); DELETE FROM orders WHERE id = 1;'
+        )
+        r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
+        assert r.exit_code == 0, "INCREMENTAL 2 failed"
+        snap_inc2 = self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
+
+        # change data and create incremental 3
+        self._apply_sql_mutations(
+            'PRAGMA TablePathPrefix("/Root"); UPSERT INTO orders (id, number, txt) VALUES (30, 3000, "inc3");'
+        )
+        r = self._execute_yql(f"BACKUP `{collection_src}` INCREMENTAL;")
+        assert r.exit_code == 0, "INCREMENTAL 3 failed"
+        self._record_snapshot_and_rows(collection_src, t_orders, t_products, created_snapshots, snapshot_rows)
+
+        assert len(created_snapshots) >= 2, "Expected at least one full backup plus incrementals"
+
+        # Export backups
+        export_dir, exported_items = self._export_backups(collection_src)
+        assert exported_items, "No exported snapshots found"
+        exported_dirs = sorted([d for d in os.listdir(export_dir) if os.path.isdir(os.path.join(export_dir, d))])
+        for s in created_snapshots:
+            assert s in exported_dirs, f"Recorded snapshot {s} not found in exported dirs {exported_dirs}"
+
+        # Create restore collection and import snapshots up to target (choose inc2)
+        target_snap = snap_inc2
+        target_ts = target_snap.split("_", 1)[0]
+
+        coll_restore = f"coll_restore_incr_{int(time.time())}"
+        create_restore_sql = f"""
+        CREATE BACKUP COLLECTION `{coll_restore}`
+        ( TABLE `{full_orders}`, TABLE `{full_products}` )
+        WITH ( STORAGE = 'cluster' );
+        """
+        res = self._execute_yql(create_restore_sql)
+        assert res.exit_code == 0, f"CREATE BACKUP COLLECTION {coll_restore} failed"
+        self.wait_for_collection(coll_restore, timeout_s=30)
+
+        self._import_exported_snapshots_up_to(coll_restore, export_dir, target_ts)
+        time.sleep(1)
+        self._drop_tables([t_orders, t_products])
+
+        # Run RESTORE
+        res_restore = self._execute_yql(f"RESTORE `{coll_restore}`;")
+        assert res_restore.exit_code == 0, f"RESTORE failed: {getattr(res_restore, 'std_err', None) or getattr(res_restore, 'std_out', None)}"
+
+        # Verify restored data matches snapshot inc2
+        expected_orders = snapshot_rows[target_snap]["orders"]
+        expected_products = snapshot_rows[target_snap]["products"]
+
+        self._verify_restored_table_data(t_orders, expected_orders)
+        self._verify_restored_table_data(t_products, expected_products)
+
+        # Check whether the source collection is still present or removed (either is acceptable)
+        coll_present = self.collection_exists(collection_src)
+        if not coll_present:
+            logger.info("Source collection %s not present (deleted); OK", collection_src)
+        else:
+            logger.info(
+                f"Source collection {collection_src} is present and incremental backups appear enabled. "
+                "Expected: source collection removed OR incremental backups disabled."
+            )
+
+        if os.path.exists(export_dir):
+            shutil.rmtree(export_dir)
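For reference, the end-to-end YQL flow the new test exercises, condensed from the statements embedded in the diff above, is roughly the following. This is a sketch only: `demo_collection` and `demo_restore_collection` are illustrative placeholders, not identifiers from the test.

```sql
-- Collection with incremental backups enabled (names are illustrative).
CREATE BACKUP COLLECTION `demo_collection`
    ( TABLE `/Root/orders`, TABLE `/Root/products` )
    WITH ( STORAGE = 'cluster', INCREMENTAL_BACKUP_ENABLED = 'true' );

BACKUP `demo_collection`;               -- full snapshot
-- ...mutate rows between each of the following:
BACKUP `demo_collection` INCREMENTAL;   -- inc1
BACKUP `demo_collection` INCREMENTAL;   -- inc2
BACKUP `demo_collection` INCREMENTAL;   -- inc3

-- The test exports all snapshots, imports only full..inc2 into a second
-- collection via the `tools restore` call shown above, drops the source
-- tables, and then runs:
RESTORE `demo_restore_collection`;      -- should materialize the inc2 state
```

Note that the point-in-time selection happens entirely at import time: `_import_exported_snapshots_up_to` copies only snapshot directories whose timestamp prefix is <= the inc2 timestamp into the restore collection, so `RESTORE` has no newer increments to apply.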