diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2025-04-23 22:49:11 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-23 19:49:11 +0000 |
commit | e93caa619a41b0b3c8a85e53a43ceb073ac0304e (patch) | |
tree | 050a4dae99e46e3c011099854311e651e5085366 | |
parent | 26da61702746571afe6af2196434ac08ff3425d2 (diff) | |
download | ydb-e93caa619a41b0b3c8a85e53a43ceb073ac0304e.tar.gz |
Support an emergency one-to-one split/merge in SchemeShard (#17642)
6 files changed, 313 insertions, 6 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index c6b048d3d9d..41176fb96b2 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1477,6 +1477,7 @@ message TSplitMergeTablePartitions { repeated TSplitBoundary SplitBoundary = 5; // Points of split (there will be N+1 parts) optional uint64 SchemeshardId = 6; // Only needed if TableId is used instead of path optional uint64 TableOwnerId = 7; + optional bool AllowOneToOneSplitMerge = 8; // Allow a special 1-to-1 split/merge for emergencies } message TUserAttribute { diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp index 76278ac524f..cc62a574274 100644 --- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp @@ -5,14 +5,19 @@ #include <ydb/core/tx/datashard/range_ops.h> #include <ydb/core/tx/tx_proxy/proxy.h> +#include <library/cpp/protobuf/json/proto2json.h> + #include <library/cpp/html/pcdata/pcdata.h> #include <util/string/cast.h> static ui64 TryParseTabletId(TStringBuf tabletIdParam) { - if (tabletIdParam.StartsWith("0x")) - return IntFromString<ui64, 16>(tabletIdParam.substr(2)); - else - return FromStringWithDefault<ui64>(tabletIdParam, ui64(NKikimr::NSchemeShard::InvalidTabletId)); + ui64 tabletId = ui64(NKikimr::NSchemeShard::InvalidTabletId); + if (tabletIdParam.StartsWith("0x")) { + TryIntFromString<16>(tabletIdParam.substr(2), tabletId); + } else { + TryFromString(tabletIdParam, tabletId); + } + return tabletId; } namespace NKikimr { @@ -79,6 +84,7 @@ struct TCgi { static const TParam BuildIndexId; static const TParam UpdateCoordinatorsConfig; static const TParam UpdateCoordinatorsConfigDryRun; + static const TParam Action; struct TPages { static constexpr TStringBuf MainPage = "Main"; @@ -91,6 +97,10 @@ struct TCgi { static constexpr TStringBuf ShardInfoByShardIdx = "ShardInfoByShardIdx"; static constexpr TStringBuf BuildIndexInfo = "BuildIndexInfo"; }; + + struct TActions { + static constexpr TStringBuf SplitOneToOne = "SplitOneToOne"; + }; }; const TCgi::TParam TCgi::TabletID = TStringBuf("TabletID"); @@ -111,6 +121,7 @@ const TCgi::TParam TCgi::Page = TStringBuf("Page"); const TCgi::TParam TCgi::BuildIndexId = TStringBuf("BuildIndexId"); const TCgi::TParam TCgi::UpdateCoordinatorsConfig = TStringBuf("UpdateCoordinatorsConfig"); const TCgi::TParam TCgi::UpdateCoordinatorsConfigDryRun = TStringBuf("UpdateCoordinatorsConfigDryRun"); +const TCgi::TParam TCgi::Action = TStringBuf("Action"); class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordinatorsConfigActor> { @@ -231,6 +242,93 @@ private: THashMap<ui64, const TItem*> InFlight; }; +class TMonitoringShardSplitOneToOne : public TActorBootstrapped<TMonitoringShardSplitOneToOne> { +public: + TMonitoringShardSplitOneToOne(NMon::TEvRemoteHttpInfo::TPtr&& ev, ui64 schemeShardId, const TPathId& pathId, TTabletId shardId) + : Ev(std::move(ev)) + , SchemeShardId(schemeShardId) + , PathId(pathId) + , ShardId(shardId) + {} + + void Bootstrap() { + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + Become(&TThis::StateWaitTxId); + } + + STFUNC(StateWaitTxId) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle); + } + } + + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + TxId = ev->Get()->TxId; + + auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId); + + auto& modifyScheme = *propose->Record.AddTransaction(); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions); + modifyScheme.SetInternal(true); + + auto& info = *modifyScheme.MutableSplitMergeTablePartitions(); + info.SetTableOwnerId(PathId.OwnerId); + info.SetTableLocalId(PathId.LocalPathId); + info.AddSourceTabletId(ui64(ShardId)); + info.SetAllowOneToOneSplitMerge(true); + + PipeCache = MakePipePerNodeCacheID(EPipePerNodeCache::Leader); + Send(PipeCache, new TEvPipeCache::TEvForward(propose.Release(), SchemeShardId, /* subscribe */ true)); + Become(&TThis::StateWaitProposed); + } + + STFUNC(StateWaitProposed) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle); + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + } + } + + void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) { + TString text; + try { + NProtobufJson::Proto2Json(ev->Get()->Record, text, { + .EnumMode = NProtobufJson::TProto2JsonConfig::EnumName, + .FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense, + .MapAsObject = true, + }); + } catch (const std::exception& e) { + Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes( + "HTTP/1.1 500 Internal Error\r\nConnection: Close\r\n\r\nUnexpected failure to serialize the response\r\n")); + PassAway(); + } + + Send(Ev->Sender, new NMon::TEvRemoteJsonInfoRes(text)); + PassAway(); + } + + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) { + Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes( + TStringBuilder() << "HTTP/1.1 502 Bad Gateway\r\nConnection: Close\r\n\r\nSchemeShard tablet disconnected\r\n")); + PassAway(); + } + + void PassAway() override { + if (PipeCache) { + Send(PipeCache, new TEvPipeCache::TEvUnlink(0)); + } + TActorBootstrapped::PassAway(); + } + +private: + NMon::TEvRemoteHttpInfo::TPtr Ev; + ui64 SchemeShardId; + TPathId PathId; + TTabletId ShardId; + ui64 TxId = 0; + TActorId PipeCache; +}; + struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBase<TSchemeShard> { NMon::TEvRemoteHttpInfo::TPtr Ev; TStringStream Answer; @@ -242,11 +340,18 @@ public: { } + TTxType GetTxType() const override { return TXTYPE_MONITORING; } + bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override { Y_UNUSED(txc); const TCgiParameters& cgi = Ev->Get()->Cgi(); + if (cgi.Has(TCgi::Action)) { + HandleAction(cgi.Get(TCgi::Action), cgi, ctx); + return true; + } + const TString page = cgi.Has(TCgi::Page) ? cgi.Get(TCgi::Page) : ToString(TCgi::TPages::MainPage); if (page == TCgi::TPages::AdminRequest) { @@ -311,7 +416,7 @@ public: } void Complete(const TActorContext &ctx) override { - if (Answer) { + if (Ev && Answer) { ctx.Send(Ev->Sender, new NMon::TEvRemoteHttpInfoRes(Answer.Str())); } } @@ -1360,7 +1465,44 @@ private: } } - TTxType GetTxType() const override { return TXTYPE_MONITORING; } +private: + void SendBadRequest(const TString& details, const TActorContext& ctx) { + ctx.Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes( + TStringBuilder() << "HTTP/1.1 400 Bad Request\r\nConnection: Close\r\n\r\n" << details << "\r\n")); + } + +private: + void HandleAction(const TString& action, const TCgiParameters& cgi, const TActorContext& ctx) { + if (Ev->Get()->Method != HTTP_METHOD_POST) { + SendBadRequest("Action requires a POST method", ctx); + return; + } + + if (action == TCgi::TActions::SplitOneToOne) { + TTabletId tabletId = TTabletId(TryParseTabletId(cgi.Get(TCgi::ShardID))); + TShardIdx shardIdx = Self->GetShardIdx(tabletId); + if (!shardIdx) { + SendBadRequest("Cannot find the specified shard", ctx); + return; + } + auto* info = Self->ShardInfos.FindPtr(shardIdx); + if (!info) { + SendBadRequest("Cannot find the specified shard info", ctx); + return; + } + TPathId pathId = info->PathId; + auto* table = Self->Tables.FindPtr(pathId); + if (!table) { + SendBadRequest("Cannot find the specified shard's table", ctx); + return; + } + + ctx.Register(new TMonitoringShardSplitOneToOne(std::move(Ev), Self->TabletID(), pathId, tabletId)); + return; + } + + SendBadRequest("Action not supported", ctx); + } }; bool TSchemeShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp index 83e876b011b..ec0895f5629 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp @@ -705,6 +705,75 @@ public: return true; } + bool AllocateDstForOneToOne( + const NKikimrSchemeOp::TSplitMergeTablePartitions& info, + TTxId txId, + const TPathId& pathId, + const TVector<ui64>& srcPartitionIdxs, + const TTableInfo::TCPtr tableInfo, + TTxState& op, + const TChannelsBindings& channels, + TString& errStr, + TOperationContext& context) + { + Y_UNUSED(errStr); + + // 1 source shard is split/merged into 1 shard + Y_ABORT_UNLESS(srcPartitionIdxs.size() == 1); + Y_ABORT_UNLESS(info.SplitBoundarySize() == 0); + + TString firstRangeBegin; + if (srcPartitionIdxs[0] != 0) { + // Take the end of previous shard + firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange; + } else { + TVector<TCell> firstKey; + ui32 keyColCount = 0; + for (const auto& col : tableInfo->Columns) { + if (col.second.IsKey()) { + ++keyColCount; + } + } + // Or start from (NULL, NULL, .., NULL) + firstKey.resize(keyColCount); + firstRangeBegin = TSerializedCellVec::Serialize(firstKey); + } + + op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>(); + // Fill src shards + TString prevRangeEnd = firstRangeBegin; + for (ui64 pi : srcPartitionIdxs) { + auto* srcRange = op.SplitDescription->AddSourceRanges(); + auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx; + srcRange->SetShardIdx(ui64(shardIdx.GetLocalId())); + srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID)); + srcRange->SetKeyRangeBegin(prevRangeEnd); + TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange; + srcRange->SetKeyRangeEnd(rangeEnd); + prevRangeEnd = rangeEnd; + } + + // Fill dst shard + TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId); + datashardInfo.BindedChannels = channels; + + auto idx = context.SS->RegisterShardInfo(datashardInfo); + + ui64 lastSrcPartition = srcPartitionIdxs.back(); + TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange; + + TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts); + dstShardOp.RangeEnd = lastRangeEnd; + op.Shards.push_back(dstShardOp); + + auto* dstRange = op.SplitDescription->AddDestinationRanges(); + dstRange->SetShardIdx(ui64(idx.GetLocalId())); + dstRange->SetKeyRangeBegin(firstRangeBegin); + dstRange->SetKeyRangeEnd(lastRangeEnd); + + return true; + } + THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override { const TTabletId ssId = context.SS->SelfTabletId(); @@ -942,6 +1011,12 @@ public: setResultError(NKikimrScheme::StatusInvalidParameter, errStr); return result; } + } else if (srcPartitionIdxs.size() == 1 && dstCount == 1 && info.GetAllowOneToOneSplitMerge()) { + // This is one-to-one split/merge + if (!AllocateDstForOneToOne(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) { + setResultError(NKikimrScheme::StatusInvalidParameter, errStr); + return result; + } } else { setResultError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported"); return result; diff --git a/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp b/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp index a4fdb37029f..5cbef442592 100644 --- a/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp +++ b/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp @@ -87,6 +87,53 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) { } + Y_UNIT_TEST(ConcurrentSplitOneToOne) { + TTestBasicRuntime runtime; + + TTestEnvOptions opts; + opts.EnableBackgroundCompaction(false); + + TTestEnv env(runtime, opts); + + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "Key" Type: "Utf8"} + Columns { Name: "Value" Type: "Utf8"} + KeyColumnNames: ["Key", "Value"] + )"); + env.TestWaitNotification(runtime, txId); + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true), + {NLs::PartitionKeys({""})}); + + TVector<THolder<IEventHandle>> suppressed; + auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvHive::TEvCreateTablet::EventType); + + TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"( + SourceTabletId: 72075186233409546 + AllowOneToOneSplitMerge: true + )"); + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + + TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"( + SourceTabletId: 72075186233409546 + AllowOneToOneSplitMerge: true + )", + {NKikimrScheme::StatusMultipleModifications}); + + WaitForSuppressed(runtime, suppressed, 2, prevObserver); + + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + + env.TestWaitNotification(runtime, {txId-1, txId}); + env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets); //delete src + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true), + {NLs::PartitionKeys({""})}); + } + Y_UNIT_TEST(Split10Shards) { TTestBasicRuntime runtime; diff --git a/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp b/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp index d1b798a2db2..d7d0c545cd1 100644 --- a/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp @@ -398,6 +398,47 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitTestReboots) { }); } + Y_UNIT_TEST(SplitTableOneToOneWithReboots) { + TTestWithReboots t(true); + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key1" Type: "Utf8"} + Columns { Name: "key2" Type: "Uint32"} + Columns { Name: "Value" Type: "Utf8"} + KeyColumnNames: ["key1", "key2"] + SplitBoundary { + KeyPrefix { + Tuple { Optional { Text: "Jack" } } + } + })"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true), + {NLs::PartitionKeys({"Jack", ""})}); + } + + AsyncSplitTable(runtime, ++t.TxId, "/MyRoot/Table", R"( + SourceTabletId: 72075186233409546 + AllowOneToOneSplitMerge: true + )"); + AsyncSplitTable(runtime, ++t.TxId, "/MyRoot/Table", R"( + SourceTabletId: 72075186233409547 + AllowOneToOneSplitMerge: true + )"); + + t.TestEnv->TestWaitNotification(runtime, {t.TxId-1, t.TxId}); + t.TestEnv->TestWaitTabletDeletion(runtime, {72075186233409546, 72075186233409547}); //delete src + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true), + {NLs::PartitionKeys({"Jack", ""})}); + } + }); + } + Y_UNIT_TEST(MergeTableWithReboots) { //+ TTestWithReboots t(true); t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index f622ad83d46..52af72a5976 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -277,6 +277,7 @@ PEERDIR( library/cpp/deprecated/enum_codegen library/cpp/html/pcdata library/cpp/json + library/cpp/protobuf/json ydb/core/actorlib_impl ydb/core/audit ydb/core/base |