diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-07-25 19:05:28 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-07-25 19:05:28 +0300 |
commit | 6288f21c1c76968f728a2951d4159b50bb8bbece (patch) | |
tree | d20a984fd1beac242959745860071294299e1947 | |
parent | 3935c4ab2dab91b36d5432b0394025fd59fbf656 (diff) | |
download | ydb-6288f21c1c76968f728a2951d4159b50bb8bbece.tar.gz |
(refactoring) Scheme tx helpers
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_common.cpp | 361 |
1 files changed, 140 insertions, 221 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp index b0438ef795..8a50b78adb 100644 --- a/ydb/core/tx/datashard/datashard_ut_common.cpp +++ b/ydb/core/tx/datashard/datashard_ut_common.cpp @@ -1096,113 +1096,132 @@ void InitRoot(Tests::TServer::TPtr server, } } -void CreateShardedTable(Tests::TServer::TPtr server, - TActorId sender, - const TString &root, - const TString &name, - const TShardedTableOptions &opts) +static THolder<TEvTxUserProxy::TEvProposeTransaction> SchemeTxTemplate( + NKikimrSchemeOp::EOperationType type, + const TString& workingDir = {}) { - auto &runtime = *server->GetRuntime(); - auto &settings = server->GetSettings(); + auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); - // Create table with four shards. - ui64 txId; - TAutoPtr<IEventHandle> handle; - { - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetWorkingDir(root); - - NKikimrSchemeOp::TTableDescription* desc = nullptr; - if (opts.Indexes_) { - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable); - desc = tx.MutableCreateIndexedTable()->MutableTableDescription(); - } else { - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable); - desc = tx.MutableCreateTable(); - } + auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetOperationType(type); - desc->SetName(name); + if (workingDir) { + tx.SetWorkingDir(workingDir); + } - for (const auto& column : opts.Columns_) { - auto col = desc->AddColumns(); - col->SetName(column.Name); - col->SetType(column.Type); - col->SetNotNull(column.NotNull); - if (column.IsKey) { - desc->AddKeyColumnNames(column.Name); - } - } + return request; +} - for (const auto& index : opts.Indexes_) { - auto* indexDesc = tx.MutableCreateIndexedTable()->MutableIndexDescription()->Add(); +static ui64 RunSchemeTx( + TTestActorRuntimeBase& runtime, + THolder<TEvTxUserProxy::TEvProposeTransaction>&& request, + TActorId sender = {}, + bool viaActorSystem = false, + TEvTxUserProxy::TEvProposeTransactionStatus::EStatus expectedStatus = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) +{ + if (!sender) { + sender = runtime.AllocateEdgeActor(); + } - indexDesc->SetName(index.Name); - indexDesc->SetType(index.Type); + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()), 0, viaActorSystem); + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), expectedStatus); - for (const auto& col : index.IndexColumns) { - indexDesc->AddKeyColumnNames(col); - } - for (const auto& col : index.DataColumns) { - indexDesc->AddDataColumnNames(col); - } - } + return ev->Get()->Record.GetTxId(); +} - desc->SetUniformPartitionsCount(opts.Shards_); +void CreateShardedTable( + Tests::TServer::TPtr server, + TActorId sender, + const TString &root, + const TString &name, + const TShardedTableOptions &opts) +{ + // Create table with four shards. + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateTable, root); - if (!opts.EnableOutOfOrder_) - desc->MutablePartitionConfig()->MutablePipelineConfig()->SetEnableOutOfOrder(false); + auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + NKikimrSchemeOp::TTableDescription* desc = nullptr; + if (opts.Indexes_) { + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable); + desc = tx.MutableCreateIndexedTable()->MutableTableDescription(); + } else { + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable); + desc = tx.MutableCreateTable(); + } - if (opts.Policy_) { - opts.Policy_->Serialize(*desc->MutablePartitionConfig()->MutableCompactionPolicy()); - } + UNIT_ASSERT(desc); + desc->SetName(name); - switch (opts.ShadowData_) { - case EShadowDataMode::Default: - break; - case EShadowDataMode::Enabled: - desc->MutablePartitionConfig()->SetShadowData(true); - break; + for (const auto& column : opts.Columns_) { + auto col = desc->AddColumns(); + col->SetName(column.Name); + col->SetType(column.Type); + col->SetNotNull(column.NotNull); + if (column.IsKey) { + desc->AddKeyColumnNames(column.Name); } + } - if (opts.Followers_ > 0) { - auto& followerGroup = *desc->MutablePartitionConfig()->AddFollowerGroups(); - followerGroup.SetFollowerCount(opts.Followers_); - followerGroup.SetAllowLeaderPromotion(opts.FollowerPromotion_); - } + for (const auto& index : opts.Indexes_) { + auto* indexDesc = tx.MutableCreateIndexedTable()->MutableIndexDescription()->Add(); - if (opts.ExternalStorage_) { - auto& family = *desc->MutablePartitionConfig()->AddColumnFamilies(); - family.SetStorage(NKikimrSchemeOp::ColumnStorageTest_1_2_1k); - } + indexDesc->SetName(index.Name); + indexDesc->SetType(index.Type); - if (opts.ExecutorCacheSize_) { - desc->MutablePartitionConfig()->SetExecutorCacheSize(*opts.ExecutorCacheSize_); + for (const auto& col : index.IndexColumns) { + indexDesc->AddKeyColumnNames(col); + } + for (const auto& col : index.DataColumns) { + indexDesc->AddDataColumnNames(col); } + } - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto reply = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(handle); - UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - txId = reply->Record.GetTxId(); + desc->SetUniformPartitionsCount(opts.Shards_); + + if (!opts.EnableOutOfOrder_) + desc->MutablePartitionConfig()->MutablePipelineConfig()->SetEnableOutOfOrder(false); + + if (opts.Policy_) { + opts.Policy_->Serialize(*desc->MutablePartitionConfig()->MutableCompactionPolicy()); } - { - auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(); - request->Record.SetTxId(txId); - auto tid = ChangeStateStorage(SchemeRoot, settings.Domain); - runtime.SendToPipe(tid, sender, request.Release(), 0, GetPipeConfigWithRetries()); - runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); + + switch (opts.ShadowData_) { + case EShadowDataMode::Default: + break; + case EShadowDataMode::Enabled: + desc->MutablePartitionConfig()->SetShadowData(true); + break; + } + + if (opts.Followers_ > 0) { + auto& followerGroup = *desc->MutablePartitionConfig()->AddFollowerGroups(); + followerGroup.SetFollowerCount(opts.Followers_); + followerGroup.SetAllowLeaderPromotion(opts.FollowerPromotion_); + } + + if (opts.ExternalStorage_) { + auto& family = *desc->MutablePartitionConfig()->AddColumnFamilies(); + family.SetStorage(NKikimrSchemeOp::ColumnStorageTest_1_2_1k); + } + + if (opts.ExecutorCacheSize_) { + desc->MutablePartitionConfig()->SetExecutorCacheSize(*opts.ExecutorCacheSize_); } + + WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender)); } -void CreateShardedTable(Tests::TServer::TPtr server, - TActorId sender, - const TString &root, - const TString &name, - ui64 shards, - bool enableOutOfOrder, - const NLocalDb::TCompactionPolicy* policy, - EShadowDataMode shadowData) +void CreateShardedTable( + Tests::TServer::TPtr server, + TActorId sender, + const TString &root, + const TString &name, + ui64 shards, + bool enableOutOfOrder, + const NLocalDb::TCompactionPolicy* policy, + EShadowDataMode shadowData) { auto opts = TShardedTableOptions() .Shards(shards) @@ -1219,22 +1238,12 @@ ui64 AsyncCreateCopyTable( const TString &name, const TString &from) { - auto &runtime = *server->GetRuntime(); - - // Create table with four shards. - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable); - tx.SetWorkingDir(root); - auto &desc = *tx.MutableCreateTable(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateTable, root); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableCreateTable(); desc.SetName(name); desc.SetCopyFromTable(from); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request), sender); } NKikimrScheme::TEvDescribeSchemeResult DescribeTable(Tests::TServer::TPtr server, @@ -1495,22 +1504,13 @@ ui64 AsyncSplitTable( ui64 sourceTablet, ui32 splitKey) { - auto &runtime = *server->GetRuntime(); - - // Create table with four shards. - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions); - auto &desc = *tx.MutableSplitMergeTablePartitions(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableSplitMergeTablePartitions(); desc.SetTablePath(path); desc.AddSourceTabletId(sourceTablet); desc.AddSplitBoundary()->MutableKeyPrefix()->AddTuple()->MutableOptional()->SetUint32(splitKey); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()), 0, /* viaActorSystem */ true); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request), sender, true); } ui64 AsyncMergeTable( @@ -1519,23 +1519,14 @@ ui64 AsyncMergeTable( const TString& path, const TVector<ui64>& sourceTabletIds) { - auto &runtime = *server->GetRuntime(); - - // Create table with four shards. - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions); - auto &desc = *tx.MutableSplitMergeTablePartitions(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableSplitMergeTablePartitions(); desc.SetTablePath(path); for (ui64 tabletId : sourceTabletIds) { desc.AddSourceTabletId(tabletId); } - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request), sender); } ui64 AsyncAlterAddExtraColumn( @@ -1543,25 +1534,14 @@ ui64 AsyncAlterAddExtraColumn( const TString& workingDir, const TString& name) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableAlterTable(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable(); desc.SetName(name); + auto& col = *desc.AddColumns(); + col.SetName("extra"); + col.SetType("Uint32"); - auto* col = desc.AddColumns(); - col->SetName("extra"); - col->SetType("Uint32"); - - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } ui64 AsyncAlterDropColumn( @@ -1570,24 +1550,13 @@ ui64 AsyncAlterDropColumn( const TString& name, const TString& colName) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableAlterTable(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable(); desc.SetName(name); + auto& col = *desc.AddDropColumns(); + col.SetName(colName); - auto* col = desc.AddDropColumns(); - col->SetName(colName); - - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } ui64 AsyncAlterAndDisableShadow( @@ -1596,15 +1565,8 @@ ui64 AsyncAlterAndDisableShadow( const TString& name, const NLocalDb::TCompactionPolicy* policy) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableAlterTable(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable(); desc.SetName(name); desc.MutablePartitionConfig()->SetShadowData(false); @@ -1612,10 +1574,7 @@ ui64 AsyncAlterAndDisableShadow( policy->Serialize(*desc.MutablePartitionConfig()->MutableCompactionPolicy()); } - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } ui64 AsyncAlterAddIndex( @@ -1683,22 +1642,12 @@ ui64 AsyncAlterDropIndex( const TString& tableName, const TString& indexName) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropIndex); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableDropIndex(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpDropIndex, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableDropIndex(); desc.SetTableName(tableName); desc.SetIndexName(indexName); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } ui64 AsyncAlterAddStream( @@ -1707,24 +1656,14 @@ ui64 AsyncAlterAddStream( const TString& tableName, const TShardedTableOptions::TCdcStream& streamDesc) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateCdcStream); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableCreateCdcStream(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateCdcStream, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableCreateCdcStream(); desc.SetTableName(tableName); desc.MutableStreamDescription()->SetName(streamDesc.Name); desc.MutableStreamDescription()->SetMode(streamDesc.Mode); desc.MutableStreamDescription()->SetFormat(streamDesc.Format); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } ui64 AsyncAlterDisableStream( @@ -1733,23 +1672,13 @@ ui64 AsyncAlterDisableStream( const TString& tableName, const TString& streamName) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableAlterCdcStream(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterCdcStream, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterCdcStream(); desc.SetTableName(tableName); desc.SetStreamName(streamName); desc.MutableDisable(); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } ui64 AsyncAlterDropStream( @@ -1758,22 +1687,12 @@ ui64 AsyncAlterDropStream( const TString& tableName, const TString& streamName) { - auto &runtime = *server->GetRuntime(); - auto sender = runtime.AllocateEdgeActor(); - - auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); - request->Record.SetExecTimeoutPeriod(Max<ui64>()); - auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme(); - tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropCdcStream); - tx.SetWorkingDir(workingDir); - auto &desc = *tx.MutableDropCdcStream(); + auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpDropCdcStream, workingDir); + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableDropCdcStream(); desc.SetTableName(tableName); desc.SetStreamName(streamName); - runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); - auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); - UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress); - return ev->Get()->Record.GetTxId(); + return RunSchemeTx(*server->GetRuntime(), std::move(request)); } void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId) { |