aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-07-25 19:05:28 +0300
committerilnaz <ilnaz@ydb.tech>2022-07-25 19:05:28 +0300
commit6288f21c1c76968f728a2951d4159b50bb8bbece (patch)
treed20a984fd1beac242959745860071294299e1947
parent3935c4ab2dab91b36d5432b0394025fd59fbf656 (diff)
downloadydb-6288f21c1c76968f728a2951d4159b50bb8bbece.tar.gz
(refactoring) Scheme tx helpers
-rw-r--r--ydb/core/tx/datashard/datashard_ut_common.cpp361
1 files changed, 140 insertions, 221 deletions
diff --git a/ydb/core/tx/datashard/datashard_ut_common.cpp b/ydb/core/tx/datashard/datashard_ut_common.cpp
index b0438ef795..8a50b78adb 100644
--- a/ydb/core/tx/datashard/datashard_ut_common.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_common.cpp
@@ -1096,113 +1096,132 @@ void InitRoot(Tests::TServer::TPtr server,
}
}
-void CreateShardedTable(Tests::TServer::TPtr server,
- TActorId sender,
- const TString &root,
- const TString &name,
- const TShardedTableOptions &opts)
+static THolder<TEvTxUserProxy::TEvProposeTransaction> SchemeTxTemplate(
+ NKikimrSchemeOp::EOperationType type,
+ const TString& workingDir = {})
{
- auto &runtime = *server->GetRuntime();
- auto &settings = server->GetSettings();
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
- // Create table with four shards.
- ui64 txId;
- TAutoPtr<IEventHandle> handle;
- {
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetWorkingDir(root);
-
- NKikimrSchemeOp::TTableDescription* desc = nullptr;
- if (opts.Indexes_) {
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
- desc = tx.MutableCreateIndexedTable()->MutableTableDescription();
- } else {
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
- desc = tx.MutableCreateTable();
- }
+ auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme();
+ tx.SetOperationType(type);
- desc->SetName(name);
+ if (workingDir) {
+ tx.SetWorkingDir(workingDir);
+ }
- for (const auto& column : opts.Columns_) {
- auto col = desc->AddColumns();
- col->SetName(column.Name);
- col->SetType(column.Type);
- col->SetNotNull(column.NotNull);
- if (column.IsKey) {
- desc->AddKeyColumnNames(column.Name);
- }
- }
+ return request;
+}
- for (const auto& index : opts.Indexes_) {
- auto* indexDesc = tx.MutableCreateIndexedTable()->MutableIndexDescription()->Add();
+static ui64 RunSchemeTx(
+ TTestActorRuntimeBase& runtime,
+ THolder<TEvTxUserProxy::TEvProposeTransaction>&& request,
+ TActorId sender = {},
+ bool viaActorSystem = false,
+ TEvTxUserProxy::TEvProposeTransactionStatus::EStatus expectedStatus = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress)
+{
+ if (!sender) {
+ sender = runtime.AllocateEdgeActor();
+ }
- indexDesc->SetName(index.Name);
- indexDesc->SetType(index.Type);
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()), 0, viaActorSystem);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), expectedStatus);
- for (const auto& col : index.IndexColumns) {
- indexDesc->AddKeyColumnNames(col);
- }
- for (const auto& col : index.DataColumns) {
- indexDesc->AddDataColumnNames(col);
- }
- }
+ return ev->Get()->Record.GetTxId();
+}
- desc->SetUniformPartitionsCount(opts.Shards_);
+void CreateShardedTable(
+ Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &root,
+ const TString &name,
+ const TShardedTableOptions &opts)
+{
+ // Create table with four shards.
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateTable, root);
- if (!opts.EnableOutOfOrder_)
- desc->MutablePartitionConfig()->MutablePipelineConfig()->SetEnableOutOfOrder(false);
+ auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme();
+ NKikimrSchemeOp::TTableDescription* desc = nullptr;
+ if (opts.Indexes_) {
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateIndexedTable);
+ desc = tx.MutableCreateIndexedTable()->MutableTableDescription();
+ } else {
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
+ desc = tx.MutableCreateTable();
+ }
- if (opts.Policy_) {
- opts.Policy_->Serialize(*desc->MutablePartitionConfig()->MutableCompactionPolicy());
- }
+ UNIT_ASSERT(desc);
+ desc->SetName(name);
- switch (opts.ShadowData_) {
- case EShadowDataMode::Default:
- break;
- case EShadowDataMode::Enabled:
- desc->MutablePartitionConfig()->SetShadowData(true);
- break;
+ for (const auto& column : opts.Columns_) {
+ auto col = desc->AddColumns();
+ col->SetName(column.Name);
+ col->SetType(column.Type);
+ col->SetNotNull(column.NotNull);
+ if (column.IsKey) {
+ desc->AddKeyColumnNames(column.Name);
}
+ }
- if (opts.Followers_ > 0) {
- auto& followerGroup = *desc->MutablePartitionConfig()->AddFollowerGroups();
- followerGroup.SetFollowerCount(opts.Followers_);
- followerGroup.SetAllowLeaderPromotion(opts.FollowerPromotion_);
- }
+ for (const auto& index : opts.Indexes_) {
+ auto* indexDesc = tx.MutableCreateIndexedTable()->MutableIndexDescription()->Add();
- if (opts.ExternalStorage_) {
- auto& family = *desc->MutablePartitionConfig()->AddColumnFamilies();
- family.SetStorage(NKikimrSchemeOp::ColumnStorageTest_1_2_1k);
- }
+ indexDesc->SetName(index.Name);
+ indexDesc->SetType(index.Type);
- if (opts.ExecutorCacheSize_) {
- desc->MutablePartitionConfig()->SetExecutorCacheSize(*opts.ExecutorCacheSize_);
+ for (const auto& col : index.IndexColumns) {
+ indexDesc->AddKeyColumnNames(col);
+ }
+ for (const auto& col : index.DataColumns) {
+ indexDesc->AddDataColumnNames(col);
}
+ }
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto reply = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(handle);
- UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- txId = reply->Record.GetTxId();
+ desc->SetUniformPartitionsCount(opts.Shards_);
+
+ if (!opts.EnableOutOfOrder_)
+ desc->MutablePartitionConfig()->MutablePipelineConfig()->SetEnableOutOfOrder(false);
+
+ if (opts.Policy_) {
+ opts.Policy_->Serialize(*desc->MutablePartitionConfig()->MutableCompactionPolicy());
}
- {
- auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
- request->Record.SetTxId(txId);
- auto tid = ChangeStateStorage(SchemeRoot, settings.Domain);
- runtime.SendToPipe(tid, sender, request.Release(), 0, GetPipeConfigWithRetries());
- runtime.GrabEdgeEventRethrow<TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
+
+ switch (opts.ShadowData_) {
+ case EShadowDataMode::Default:
+ break;
+ case EShadowDataMode::Enabled:
+ desc->MutablePartitionConfig()->SetShadowData(true);
+ break;
+ }
+
+ if (opts.Followers_ > 0) {
+ auto& followerGroup = *desc->MutablePartitionConfig()->AddFollowerGroups();
+ followerGroup.SetFollowerCount(opts.Followers_);
+ followerGroup.SetAllowLeaderPromotion(opts.FollowerPromotion_);
+ }
+
+ if (opts.ExternalStorage_) {
+ auto& family = *desc->MutablePartitionConfig()->AddColumnFamilies();
+ family.SetStorage(NKikimrSchemeOp::ColumnStorageTest_1_2_1k);
+ }
+
+ if (opts.ExecutorCacheSize_) {
+ desc->MutablePartitionConfig()->SetExecutorCacheSize(*opts.ExecutorCacheSize_);
}
+
+ WaitTxNotification(server, sender, RunSchemeTx(*server->GetRuntime(), std::move(request), sender));
}
-void CreateShardedTable(Tests::TServer::TPtr server,
- TActorId sender,
- const TString &root,
- const TString &name,
- ui64 shards,
- bool enableOutOfOrder,
- const NLocalDb::TCompactionPolicy* policy,
- EShadowDataMode shadowData)
+void CreateShardedTable(
+ Tests::TServer::TPtr server,
+ TActorId sender,
+ const TString &root,
+ const TString &name,
+ ui64 shards,
+ bool enableOutOfOrder,
+ const NLocalDb::TCompactionPolicy* policy,
+ EShadowDataMode shadowData)
{
auto opts = TShardedTableOptions()
.Shards(shards)
@@ -1219,22 +1238,12 @@ ui64 AsyncCreateCopyTable(
const TString &name,
const TString &from)
{
- auto &runtime = *server->GetRuntime();
-
- // Create table with four shards.
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateTable);
- tx.SetWorkingDir(root);
- auto &desc = *tx.MutableCreateTable();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateTable, root);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableCreateTable();
desc.SetName(name);
desc.SetCopyFromTable(from);
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request), sender);
}
NKikimrScheme::TEvDescribeSchemeResult DescribeTable(Tests::TServer::TPtr server,
@@ -1495,22 +1504,13 @@ ui64 AsyncSplitTable(
ui64 sourceTablet,
ui32 splitKey)
{
- auto &runtime = *server->GetRuntime();
-
- // Create table with four shards.
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
- auto &desc = *tx.MutableSplitMergeTablePartitions();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableSplitMergeTablePartitions();
desc.SetTablePath(path);
desc.AddSourceTabletId(sourceTablet);
desc.AddSplitBoundary()->MutableKeyPrefix()->AddTuple()->MutableOptional()->SetUint32(splitKey);
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()), 0, /* viaActorSystem */ true);
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request), sender, true);
}
ui64 AsyncMergeTable(
@@ -1519,23 +1519,14 @@ ui64 AsyncMergeTable(
const TString& path,
const TVector<ui64>& sourceTabletIds)
{
- auto &runtime = *server->GetRuntime();
-
- // Create table with four shards.
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
- auto &desc = *tx.MutableSplitMergeTablePartitions();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableSplitMergeTablePartitions();
desc.SetTablePath(path);
for (ui64 tabletId : sourceTabletIds) {
desc.AddSourceTabletId(tabletId);
}
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request), sender);
}
ui64 AsyncAlterAddExtraColumn(
@@ -1543,25 +1534,14 @@ ui64 AsyncAlterAddExtraColumn(
const TString& workingDir,
const TString& name)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableAlterTable();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable();
desc.SetName(name);
+ auto& col = *desc.AddColumns();
+ col.SetName("extra");
+ col.SetType("Uint32");
- auto* col = desc.AddColumns();
- col->SetName("extra");
- col->SetType("Uint32");
-
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
ui64 AsyncAlterDropColumn(
@@ -1570,24 +1550,13 @@ ui64 AsyncAlterDropColumn(
const TString& name,
const TString& colName)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableAlterTable();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable();
desc.SetName(name);
+ auto& col = *desc.AddDropColumns();
+ col.SetName(colName);
- auto* col = desc.AddDropColumns();
- col->SetName(colName);
-
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
ui64 AsyncAlterAndDisableShadow(
@@ -1596,15 +1565,8 @@ ui64 AsyncAlterAndDisableShadow(
const TString& name,
const NLocalDb::TCompactionPolicy* policy)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterTable);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableAlterTable();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterTable, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterTable();
desc.SetName(name);
desc.MutablePartitionConfig()->SetShadowData(false);
@@ -1612,10 +1574,7 @@ ui64 AsyncAlterAndDisableShadow(
policy->Serialize(*desc.MutablePartitionConfig()->MutableCompactionPolicy());
}
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
ui64 AsyncAlterAddIndex(
@@ -1683,22 +1642,12 @@ ui64 AsyncAlterDropIndex(
const TString& tableName,
const TString& indexName)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropIndex);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableDropIndex();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpDropIndex, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableDropIndex();
desc.SetTableName(tableName);
desc.SetIndexName(indexName);
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
ui64 AsyncAlterAddStream(
@@ -1707,24 +1656,14 @@ ui64 AsyncAlterAddStream(
const TString& tableName,
const TShardedTableOptions::TCdcStream& streamDesc)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateCdcStream);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableCreateCdcStream();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpCreateCdcStream, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableCreateCdcStream();
desc.SetTableName(tableName);
desc.MutableStreamDescription()->SetName(streamDesc.Name);
desc.MutableStreamDescription()->SetMode(streamDesc.Mode);
desc.MutableStreamDescription()->SetFormat(streamDesc.Format);
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
ui64 AsyncAlterDisableStream(
@@ -1733,23 +1672,13 @@ ui64 AsyncAlterDisableStream(
const TString& tableName,
const TString& streamName)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterCdcStream);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableAlterCdcStream();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpAlterCdcStream, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableAlterCdcStream();
desc.SetTableName(tableName);
desc.SetStreamName(streamName);
desc.MutableDisable();
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
ui64 AsyncAlterDropStream(
@@ -1758,22 +1687,12 @@ ui64 AsyncAlterDropStream(
const TString& tableName,
const TString& streamName)
{
- auto &runtime = *server->GetRuntime();
- auto sender = runtime.AllocateEdgeActor();
-
- auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
- request->Record.SetExecTimeoutPeriod(Max<ui64>());
- auto &tx = *request->Record.MutableTransaction()->MutableModifyScheme();
- tx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropCdcStream);
- tx.SetWorkingDir(workingDir);
- auto &desc = *tx.MutableDropCdcStream();
+ auto request = SchemeTxTemplate(NKikimrSchemeOp::ESchemeOpDropCdcStream, workingDir);
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableDropCdcStream();
desc.SetTableName(tableName);
desc.SetStreamName(streamName);
- runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
- auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
- UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress);
- return ev->Get()->Record.GetTxId();
+ return RunSchemeTx(*server->GetRuntime(), std::move(request));
}
void WaitTxNotification(Tests::TServer::TPtr server, TActorId sender, ui64 txId) {