diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-01-12 17:20:34 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-01-12 17:20:34 +0300 |
commit | 2057e7ab542becaaa93e24d46c134de5992f862c (patch) | |
tree | 98174fd98bbde6de8d01d231a4d62425ea45fbdd | |
parent | 8a6ebbf8374f2521f252d6055d422466ecf0f181 (diff) | |
download | ydb-2057e7ab542becaaa93e24d46c134de5992f862c.tar.gz |
(refactoring) MakeDataShardProposal
14 files changed, 44 insertions, 140 deletions
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp index 408042c5f5..c4342bf53e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_table.cpp @@ -273,16 +273,9 @@ public: LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Propose modify scheme on datashard " << datashardId << " txid: " << OperationId << " at schemeshard" << ssId); - auto seqNo = context.SS->StartRound(*txState); - TString txBody = context.SS->FillAlterTableTxBody(txState->TargetPathId, idx, seqNo); - THolder<TEvDataShard::TEvProposeTransaction> event = - MakeHolder<TEvDataShard::TEvProposeTransaction>(NKikimrTxDataShard::TX_KIND_SCHEME, - ui64(ssId), //owner schemeshard tablet id - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); - + const auto seqNo = context.SS->StartRound(*txState); + const auto txBody = context.SS->FillAlterTableTxBody(txState->TargetPathId, idx, seqNo); + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, txBody, context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_common.h b/ydb/core/tx/schemeshard/schemeshard__operation_common.h index 54bfb7f90b..e51b19a593 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_common.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation_common.h @@ -1085,22 +1085,13 @@ public: context.SS->FillSeqNo(tx, context.SS->StartRound(*txState)); FillNotice(pathId, tx, context); - TString txBody; - Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); - txState->ClearShardsInProgress(); Y_VERIFY(txState->Shards.size()); for (ui32 i = 0; i < txState->Shards.size(); ++i) { const auto& idx = txState->Shards[i].Idx; const auto datashardId = context.SS->ShardInfos[idx].TabletID; - - auto ev = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, context.SS->TabletID(), context.Ctx.SelfID, - ui64(OperationId.GetTxId()), txBody, - context.SS->SelectProcessingParams(pathId) - ); - + auto ev = context.SS->MakeDataShardProposal(pathId, OperationId, tx.SerializeAsString(), context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, ev.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp index 50c2561941..83d0925e0a 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_copy_table.cpp @@ -70,8 +70,6 @@ public: "CopyTable partition counts don't match"); const ui64 dstSchemaVersion = NEW_TABLE_ALTER_VERSION; - const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId; - for (ui32 i = 0; i < dstTableInfo->GetPartitions().size(); ++i) { TShardIdx srcShardIdx = srcTableInfo->GetPartitions()[i].ShardIdx; TTabletId srcDatashardId = context.SS->ShardInfos[srcShardIdx].TabletID; @@ -98,17 +96,11 @@ public: newShardTx.MutableReceiveSnapshot()->MutableTableId()->SetOwnerId(txState->TargetPathId.OwnerId); newShardTx.MutableReceiveSnapshot()->MutableTableId()->SetTableId(txState->TargetPathId.LocalPathId); newShardTx.MutableReceiveSnapshot()->AddReceiveFrom()->SetShard(ui64(srcDatashardId)); - TString txBody; - Y_PROTOBUF_SUPPRESS_NODISCARD newShardTx.SerializeToString(&txBody); - - auto dstEvent = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - subDomainPathId, - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); + + auto dstEvent = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, newShardTx.SerializeAsString(), context.Ctx); + if (const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId) { + dstEvent->Record.SetSubDomainPathId(subDomainPathId); + } context.OnComplete.BindMsgToPipe(OperationId, dstDatashardId, dstShardIdx, dstEvent.Release()); // Send "SendParts" transaction to source datashard @@ -119,15 +111,7 @@ public: oldShardTx.MutableSendSnapshot()->MutableTableId()->SetTableId(txState->SourcePathId.LocalPathId); oldShardTx.MutableSendSnapshot()->AddSendTo()->SetShard(ui64(dstDatashardId)); oldShardTx.SetReadOnly(true); - txBody.clear(); - Y_PROTOBUF_SUPPRESS_NODISCARD oldShardTx.SerializeToString(&txBody); - auto srcEvent = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); + auto srcEvent = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, oldShardTx.SerializeAsString(), context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, srcDatashardId, srcShardIdx, srcEvent.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp index 27ebd45312..1e10b1cb8e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_backup.cpp @@ -18,13 +18,12 @@ struct TBackup { } static void ProposeTx(const TOperationId& opId, TTxState& txState, TOperationContext& context) { - auto seqNo = context.SS->StartRound(txState); - const auto& processingParams = context.SS->SelectProcessingParams(txState.TargetPathId); - - Y_VERIFY(context.SS->Tables.contains(txState.TargetPathId)); - TTableInfo::TPtr table = context.SS->Tables.at(txState.TargetPathId); + const auto& pathId = txState.TargetPathId; + Y_VERIFY(context.SS->Tables.contains(pathId)); + TTableInfo::TPtr table = context.SS->Tables.at(pathId); NKikimrSchemeOp::TBackupTask backup = table->BackupSettings; + const auto seqNo = context.SS->StartRound(txState); for (ui32 i = 0; i < txState.Shards.size(); ++i) { auto idx = txState.Shards[i].Idx; auto datashardId = context.SS->ShardInfos[idx].TabletID; @@ -35,15 +34,8 @@ struct TBackup { << " txid " << opId << " at schemeshard " << context.SS->SelfTabletId()); - TString txBody = context.SS->FillBackupTxBody(txState.TargetPathId, backup, i, seqNo); - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(opId.GetTxId()), - txBody, - processingParams); - + const auto txBody = context.SS->FillBackupTxBody(pathId, backup, i, seqNo); + auto event = context.SS->MakeDataShardProposal(pathId, opId, txBody, context.Ctx); context.OnComplete.BindMsgToPipe(opId, datashardId, idx, event.Release()); backup.ClearTable(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp index ad2e7f2649..7b704f3b0e 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_restore.cpp @@ -19,13 +19,10 @@ struct TRestore { static void ProposeTx(const TOperationId& opId, TTxState& txState, TOperationContext& context) { const auto& pathId = txState.TargetPathId; - - const auto seqNo = context.SS->StartRound(txState); - const auto& processingParams = context.SS->SelectProcessingParams(pathId); - Y_VERIFY(context.SS->Tables.contains(pathId)); TTableInfo::TPtr table = context.SS->Tables.at(pathId); + const auto seqNo = context.SS->StartRound(txState); for (ui32 i = 0; i < txState.Shards.size(); ++i) { const auto& idx = txState.Shards[i].Idx; const auto& datashardId = context.SS->ShardInfos[idx].TabletID; @@ -43,15 +40,7 @@ struct TRestore { restore.SetTableId(pathId.LocalPathId); restore.SetShardNum(i); - auto ev = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(opId.GetTxId()), - tx.SerializeAsString(), - processingParams - ); - + auto ev = context.SS->MakeDataShardProposal(pathId, opId, tx.SerializeAsString(), context.Ctx); context.OnComplete.BindMsgToPipe(opId, datashardId, idx, ev.Release()); } } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp index f84bcc141a..c2317e4661 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_table.cpp @@ -193,8 +193,6 @@ public: txState->ClearShardsInProgress(); - const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId; - for (ui32 i = 0; i < txState->Shards.size(); ++i) { TShardIdx shardIdx = txState->Shards[i].Idx; TTabletId datashardId = context.SS->ShardInfos[shardIdx].TabletID; @@ -212,17 +210,10 @@ public: context.SS->FillSeqNo(tx, seqNo); context.SS->FillTableDescription(txState->TargetPathId, i, NEW_TABLE_ALTER_VERSION, tableDesc); - TString txBody; - Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); - - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - subDomainPathId, - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, tx.SerializeAsString(), context.Ctx); + if (const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId) { + event->Record.SetSubDomainPathId(subDomainPathId); + } LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " ProgressState" @@ -230,7 +221,7 @@ public: << " datashardId: " << datashardId << " message: " << event->Record.ShortDebugString()); - context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); + context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); } txState->UpdateShardsInProgress(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp index ab6a45402d..2331e78951 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_index.cpp @@ -112,14 +112,7 @@ public: auto idx = txState->Shards[i].Idx; auto datashardId = context.SS->ShardInfos[idx].TabletID; - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); - + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, txBody, context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp index e286f4e663..bfe04ca0a0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp @@ -134,14 +134,7 @@ public: auto idx = txState->Shards[i].Idx; auto datashardId = context.SS->ShardInfos[idx].TabletID; - THolder<TEvDataShard::TEvProposeTransaction> event = - MakeHolder<TEvDataShard::TEvProposeTransaction>(NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); - + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, txBody, context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp index f33792e2b6..fb7e28739f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_finalize_build_index.cpp @@ -80,9 +80,6 @@ public: context.SS->FillSeqNo(tx, seqNo); - TString txBody; - Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " ProgressState" << " SEND TFlatSchemeTransaction to datashard: " << datashardId @@ -91,16 +88,8 @@ public: << " seqNo: " << seqNo << " at schemeshard: " << ssId); - - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); - - context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, tx.SerializeAsString(), context.Ctx); + context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); } txState->UpdateShardsInProgress(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp index 87bcf55ebd..8e63d8d7a1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_initiate_build_index.cpp @@ -102,9 +102,6 @@ public: NKikimrTxDataShard::TFlatSchemeTransaction tx(txTemplate); context.SS->FillSeqNo(tx, seqNo); - TString txBody; - Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&txBody); - LOG_DEBUG_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " ProgressState" << " SEND TFlatSchemeTransaction to datashard: " << datashardId @@ -113,15 +110,8 @@ public: << " seqNo: " << seqNo << " at schemeshard: " << ssId); - - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), context.Ctx.SelfID, - ui64(OperationId.GetTxId()), txBody, - context.SS->SelectProcessingParams(txState->TargetPathId) - ); - - context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, tx.SerializeAsString(), context.Ctx); + context.OnComplete.BindMsgToPipe(OperationId, datashardId, shardIdx, event.Release()); } txState->UpdateShardsInProgress(); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp index aa110142af..02701c53b3 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_index.cpp @@ -142,14 +142,7 @@ public: auto idx = txState->Shards[i].Idx; auto datashardId = context.SS->ShardInfos[idx].TabletID; - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>( - NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); - + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, txBody, context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release()); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp index 6acb4eff55..245e48e54c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_move_table.cpp @@ -122,17 +122,11 @@ public: auto idx = txState->Shards[i].Idx; auto datashardId = context.SS->ShardInfos[idx].TabletID; - auto event = MakeHolder<TEvDataShard::TEvProposeTransaction>(NKikimrTxDataShard::TX_KIND_SCHEME, - context.SS->TabletID(), - context.Ctx.SelfID, - ui64(OperationId.GetTxId()), - txBody, - context.SS->SelectProcessingParams(txState->TargetPathId)); - + auto event = context.SS->MakeDataShardProposal(txState->TargetPathId, OperationId, txBody, context.Ctx); context.OnComplete.BindMsgToPipe(OperationId, datashardId, idx, event.Release()); } - txState->UpdateShardsInProgress(TTxState::ConfigureParts); + txState->UpdateShardsInProgress(TTxState::ConfigureParts); return false; } }; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 41c8264799..895952147e 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -214,6 +214,16 @@ struct TAttachOrder { } }; +THolder<TEvDataShard::TEvProposeTransaction> TSchemeShard::MakeDataShardProposal( + const TPathId& pathId, const TOperationId& opId, + const TString& body, const TActorContext& ctx) const +{ + return MakeHolder<TEvDataShard::TEvProposeTransaction>( + NKikimrTxDataShard::TX_KIND_SCHEME, TabletID(), ctx.SelfID, + ui64(opId.GetTxId()), body, SelectProcessingParams(pathId) + ); +} + TTxId TSchemeShard::GetCachedTxId(const TActorContext &ctx) { TTxId txId = InvalidTxId; if (CachedTxIds) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 4f9676fb31..59b8714237 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -327,6 +327,8 @@ public: TAutoPtr<TEvSchemeShard::TEvInitTenantSchemeShardResult> DelayedInitTenantReply; THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context); + THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId, + const TString& body, const TActorContext& ctx) const; TPathId RootPathId() const { return MakeLocalId(TPathElement::RootPathId); |