aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@ydb.tech>2025-04-23 22:49:11 +0300
committerGitHub <noreply@github.com>2025-04-23 19:49:11 +0000
commite93caa619a41b0b3c8a85e53a43ceb073ac0304e (patch)
tree050a4dae99e46e3c011099854311e651e5085366
parent26da61702746571afe6af2196434ac08ff3425d2 (diff)
downloadydb-e93caa619a41b0b3c8a85e53a43ceb073ac0304e.tar.gz
Support an emergency one-to-one split/merge in SchemeShard (#17642)
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__monitoring.cpp154
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp75
-rw-r--r--ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp47
-rw-r--r--ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp41
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
6 files changed, 313 insertions, 6 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index c6b048d3d9d..41176fb96b2 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1477,6 +1477,7 @@ message TSplitMergeTablePartitions {
repeated TSplitBoundary SplitBoundary = 5; // Points of split (there will be N+1 parts)
optional uint64 SchemeshardId = 6; // Only needed if TableId is used instead of path
optional uint64 TableOwnerId = 7;
+ optional bool AllowOneToOneSplitMerge = 8; // Allow a special 1-to-1 split/merge for emergencies
}
message TUserAttribute {
diff --git a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
index 76278ac524f..cc62a574274 100644
--- a/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__monitoring.cpp
@@ -5,14 +5,19 @@
#include <ydb/core/tx/datashard/range_ops.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
#include <library/cpp/html/pcdata/pcdata.h>
#include <util/string/cast.h>
static ui64 TryParseTabletId(TStringBuf tabletIdParam) {
- if (tabletIdParam.StartsWith("0x"))
- return IntFromString<ui64, 16>(tabletIdParam.substr(2));
- else
- return FromStringWithDefault<ui64>(tabletIdParam, ui64(NKikimr::NSchemeShard::InvalidTabletId));
+ ui64 tabletId = ui64(NKikimr::NSchemeShard::InvalidTabletId);
+ if (tabletIdParam.StartsWith("0x")) {
+ TryIntFromString<16>(tabletIdParam.substr(2), tabletId);
+ } else {
+ TryFromString(tabletIdParam, tabletId);
+ }
+ return tabletId;
}
namespace NKikimr {
@@ -79,6 +84,7 @@ struct TCgi {
static const TParam BuildIndexId;
static const TParam UpdateCoordinatorsConfig;
static const TParam UpdateCoordinatorsConfigDryRun;
+ static const TParam Action;
struct TPages {
static constexpr TStringBuf MainPage = "Main";
@@ -91,6 +97,10 @@ struct TCgi {
static constexpr TStringBuf ShardInfoByShardIdx = "ShardInfoByShardIdx";
static constexpr TStringBuf BuildIndexInfo = "BuildIndexInfo";
};
+
+ struct TActions {
+ static constexpr TStringBuf SplitOneToOne = "SplitOneToOne";
+ };
};
const TCgi::TParam TCgi::TabletID = TStringBuf("TabletID");
@@ -111,6 +121,7 @@ const TCgi::TParam TCgi::Page = TStringBuf("Page");
const TCgi::TParam TCgi::BuildIndexId = TStringBuf("BuildIndexId");
const TCgi::TParam TCgi::UpdateCoordinatorsConfig = TStringBuf("UpdateCoordinatorsConfig");
const TCgi::TParam TCgi::UpdateCoordinatorsConfigDryRun = TStringBuf("UpdateCoordinatorsConfigDryRun");
+const TCgi::TParam TCgi::Action = TStringBuf("Action");
class TUpdateCoordinatorsConfigActor : public TActorBootstrapped<TUpdateCoordinatorsConfigActor> {
@@ -231,6 +242,93 @@ private:
THashMap<ui64, const TItem*> InFlight;
};
+class TMonitoringShardSplitOneToOne : public TActorBootstrapped<TMonitoringShardSplitOneToOne> {
+public:
+ TMonitoringShardSplitOneToOne(NMon::TEvRemoteHttpInfo::TPtr&& ev, ui64 schemeShardId, const TPathId& pathId, TTabletId shardId)
+ : Ev(std::move(ev))
+ , SchemeShardId(schemeShardId)
+ , PathId(pathId)
+ , ShardId(shardId)
+ {}
+
+ void Bootstrap() {
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ Become(&TThis::StateWaitTxId);
+ }
+
+ STFUNC(StateWaitTxId) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, Handle);
+ }
+ }
+
+ void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ TxId = ev->Get()->TxId;
+
+ auto propose = MakeHolder<TEvSchemeShard::TEvModifySchemeTransaction>(TxId, SchemeShardId);
+
+ auto& modifyScheme = *propose->Record.AddTransaction();
+ modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
+ modifyScheme.SetInternal(true);
+
+ auto& info = *modifyScheme.MutableSplitMergeTablePartitions();
+ info.SetTableOwnerId(PathId.OwnerId);
+ info.SetTableLocalId(PathId.LocalPathId);
+ info.AddSourceTabletId(ui64(ShardId));
+ info.SetAllowOneToOneSplitMerge(true);
+
+ PipeCache = MakePipePerNodeCacheID(EPipePerNodeCache::Leader);
+ Send(PipeCache, new TEvPipeCache::TEvForward(propose.Release(), SchemeShardId, /* subscribe */ true));
+ Become(&TThis::StateWaitProposed);
+ }
+
+ STFUNC(StateWaitProposed) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvSchemeShard::TEvModifySchemeTransactionResult, Handle);
+ hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ }
+ }
+
+ void Handle(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev) {
+ TString text;
+ try {
+ NProtobufJson::Proto2Json(ev->Get()->Record, text, {
+ .EnumMode = NProtobufJson::TProto2JsonConfig::EnumName,
+ .FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense,
+ .MapAsObject = true,
+ });
+ } catch (const std::exception& e) {
+ Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
+ "HTTP/1.1 500 Internal Error\r\nConnection: Close\r\n\r\nUnexpected failure to serialize the response\r\n"));
+ PassAway();
+ }
+
+ Send(Ev->Sender, new NMon::TEvRemoteJsonInfoRes(text));
+ PassAway();
+ }
+
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr&) {
+ Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
+ TStringBuilder() << "HTTP/1.1 502 Bad Gateway\r\nConnection: Close\r\n\r\nSchemeShard tablet disconnected\r\n"));
+ PassAway();
+ }
+
+ void PassAway() override {
+ if (PipeCache) {
+ Send(PipeCache, new TEvPipeCache::TEvUnlink(0));
+ }
+ TActorBootstrapped::PassAway();
+ }
+
+private:
+ NMon::TEvRemoteHttpInfo::TPtr Ev;
+ ui64 SchemeShardId;
+ TPathId PathId;
+ TTabletId ShardId;
+ ui64 TxId = 0;
+ TActorId PipeCache;
+};
+
struct TSchemeShard::TTxMonitoring : public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
NMon::TEvRemoteHttpInfo::TPtr Ev;
TStringStream Answer;
@@ -242,11 +340,18 @@ public:
{
}
+ TTxType GetTxType() const override { return TXTYPE_MONITORING; }
+
bool Execute(NTabletFlatExecutor::TTransactionContext &txc, const TActorContext &ctx) override {
Y_UNUSED(txc);
const TCgiParameters& cgi = Ev->Get()->Cgi();
+ if (cgi.Has(TCgi::Action)) {
+ HandleAction(cgi.Get(TCgi::Action), cgi, ctx);
+ return true;
+ }
+
const TString page = cgi.Has(TCgi::Page) ? cgi.Get(TCgi::Page) : ToString(TCgi::TPages::MainPage);
if (page == TCgi::TPages::AdminRequest) {
@@ -311,7 +416,7 @@ public:
}
void Complete(const TActorContext &ctx) override {
- if (Answer) {
+ if (Ev && Answer) {
ctx.Send(Ev->Sender, new NMon::TEvRemoteHttpInfoRes(Answer.Str()));
}
}
@@ -1360,7 +1465,44 @@ private:
}
}
- TTxType GetTxType() const override { return TXTYPE_MONITORING; }
+private:
+ void SendBadRequest(const TString& details, const TActorContext& ctx) {
+ ctx.Send(Ev->Sender, new NMon::TEvRemoteBinaryInfoRes(
+ TStringBuilder() << "HTTP/1.1 400 Bad Request\r\nConnection: Close\r\n\r\n" << details << "\r\n"));
+ }
+
+private:
+ void HandleAction(const TString& action, const TCgiParameters& cgi, const TActorContext& ctx) {
+ if (Ev->Get()->Method != HTTP_METHOD_POST) {
+ SendBadRequest("Action requires a POST method", ctx);
+ return;
+ }
+
+ if (action == TCgi::TActions::SplitOneToOne) {
+ TTabletId tabletId = TTabletId(TryParseTabletId(cgi.Get(TCgi::ShardID)));
+ TShardIdx shardIdx = Self->GetShardIdx(tabletId);
+ if (!shardIdx) {
+ SendBadRequest("Cannot find the specified shard", ctx);
+ return;
+ }
+ auto* info = Self->ShardInfos.FindPtr(shardIdx);
+ if (!info) {
+ SendBadRequest("Cannot find the specified shard info", ctx);
+ return;
+ }
+ TPathId pathId = info->PathId;
+ auto* table = Self->Tables.FindPtr(pathId);
+ if (!table) {
+ SendBadRequest("Cannot find the specified shard's table", ctx);
+ return;
+ }
+
+ ctx.Register(new TMonitoringShardSplitOneToOne(std::move(Ev), Self->TabletID(), pathId, tabletId));
+ return;
+ }
+
+ SendBadRequest("Action not supported", ctx);
+ }
};
bool TSchemeShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext &ctx) {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
index 83e876b011b..ec0895f5629 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_split_merge.cpp
@@ -705,6 +705,75 @@ public:
return true;
}
+ bool AllocateDstForOneToOne(
+ const NKikimrSchemeOp::TSplitMergeTablePartitions& info,
+ TTxId txId,
+ const TPathId& pathId,
+ const TVector<ui64>& srcPartitionIdxs,
+ const TTableInfo::TCPtr tableInfo,
+ TTxState& op,
+ const TChannelsBindings& channels,
+ TString& errStr,
+ TOperationContext& context)
+ {
+ Y_UNUSED(errStr);
+
+ // 1 source shard is split/merged into 1 shard
+ Y_ABORT_UNLESS(srcPartitionIdxs.size() == 1);
+ Y_ABORT_UNLESS(info.SplitBoundarySize() == 0);
+
+ TString firstRangeBegin;
+ if (srcPartitionIdxs[0] != 0) {
+ // Take the end of previous shard
+ firstRangeBegin = tableInfo->GetPartitions()[srcPartitionIdxs[0]-1].EndOfRange;
+ } else {
+ TVector<TCell> firstKey;
+ ui32 keyColCount = 0;
+ for (const auto& col : tableInfo->Columns) {
+ if (col.second.IsKey()) {
+ ++keyColCount;
+ }
+ }
+ // Or start from (NULL, NULL, .., NULL)
+ firstKey.resize(keyColCount);
+ firstRangeBegin = TSerializedCellVec::Serialize(firstKey);
+ }
+
+ op.SplitDescription = std::make_shared<NKikimrTxDataShard::TSplitMergeDescription>();
+ // Fill src shards
+ TString prevRangeEnd = firstRangeBegin;
+ for (ui64 pi : srcPartitionIdxs) {
+ auto* srcRange = op.SplitDescription->AddSourceRanges();
+ auto shardIdx = tableInfo->GetPartitions()[pi].ShardIdx;
+ srcRange->SetShardIdx(ui64(shardIdx.GetLocalId()));
+ srcRange->SetTabletID(ui64(context.SS->ShardInfos[shardIdx].TabletID));
+ srcRange->SetKeyRangeBegin(prevRangeEnd);
+ TString rangeEnd = tableInfo->GetPartitions()[pi].EndOfRange;
+ srcRange->SetKeyRangeEnd(rangeEnd);
+ prevRangeEnd = rangeEnd;
+ }
+
+ // Fill dst shard
+ TShardInfo datashardInfo = TShardInfo::DataShardInfo(txId, pathId);
+ datashardInfo.BindedChannels = channels;
+
+ auto idx = context.SS->RegisterShardInfo(datashardInfo);
+
+ ui64 lastSrcPartition = srcPartitionIdxs.back();
+ TString lastRangeEnd = tableInfo->GetPartitions()[lastSrcPartition].EndOfRange;
+
+ TTxState::TShardOperation dstShardOp(idx, ETabletType::DataShard, TTxState::CreateParts);
+ dstShardOp.RangeEnd = lastRangeEnd;
+ op.Shards.push_back(dstShardOp);
+
+ auto* dstRange = op.SplitDescription->AddDestinationRanges();
+ dstRange->SetShardIdx(ui64(idx.GetLocalId()));
+ dstRange->SetKeyRangeBegin(firstRangeBegin);
+ dstRange->SetKeyRangeEnd(lastRangeEnd);
+
+ return true;
+ }
+
THolder<TProposeResponse> Propose(const TString&, TOperationContext& context) override {
const TTabletId ssId = context.SS->SelfTabletId();
@@ -942,6 +1011,12 @@ public:
setResultError(NKikimrScheme::StatusInvalidParameter, errStr);
return result;
}
+ } else if (srcPartitionIdxs.size() == 1 && dstCount == 1 && info.GetAllowOneToOneSplitMerge()) {
+ // This is one-to-one split/merge
+ if (!AllocateDstForOneToOne(info, OperationId.GetTxId(), path.Base()->PathId, srcPartitionIdxs, tableInfo, op, channelsBinding, errStr, context)) {
+ setResultError(NKikimrScheme::StatusInvalidParameter, errStr);
+ return result;
+ }
} else {
setResultError(NKikimrScheme::StatusInvalidParameter, "Invalid request: only 1->N or N->1 are supported");
return result;
diff --git a/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp b/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
index a4fdb37029f..5cbef442592 100644
--- a/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
+++ b/ydb/core/tx/schemeshard/ut_split_merge/ut_split_merge.cpp
@@ -87,6 +87,53 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitBySizeTest) {
}
+ Y_UNIT_TEST(ConcurrentSplitOneToOne) {
+ TTestBasicRuntime runtime;
+
+ TTestEnvOptions opts;
+ opts.EnableBackgroundCompaction(false);
+
+ TTestEnv env(runtime, opts);
+
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "Key" Type: "Utf8"}
+ Columns { Name: "Value" Type: "Utf8"}
+ KeyColumnNames: ["Key", "Value"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
+ {NLs::PartitionKeys({""})});
+
+ TVector<THolder<IEventHandle>> suppressed;
+ auto prevObserver = SetSuppressObserver(runtime, suppressed, TEvHive::TEvCreateTablet::EventType);
+
+ TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"(
+ SourceTabletId: 72075186233409546
+ AllowOneToOneSplitMerge: true
+ )");
+
+ RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
+
+ TestSplitTable(runtime, ++txId, "/MyRoot/Table", R"(
+ SourceTabletId: 72075186233409546
+ AllowOneToOneSplitMerge: true
+ )",
+ {NKikimrScheme::StatusMultipleModifications});
+
+ WaitForSuppressed(runtime, suppressed, 2, prevObserver);
+
+ RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
+
+ env.TestWaitNotification(runtime, {txId-1, txId});
+ env.TestWaitTabletDeletion(runtime, TTestTxConfig::FakeHiveTablets); //delete src
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
+ {NLs::PartitionKeys({""})});
+ }
+
Y_UNIT_TEST(Split10Shards) {
TTestBasicRuntime runtime;
diff --git a/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp b/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp
index d1b798a2db2..d7d0c545cd1 100644
--- a/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_split_merge_reboots/ut_split_merge_reboots.cpp
@@ -398,6 +398,47 @@ Y_UNIT_TEST_SUITE(TSchemeShardSplitTestReboots) {
});
}
+ Y_UNIT_TEST(SplitTableOneToOneWithReboots) {
+ TTestWithReboots t(true);
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key1" Type: "Utf8"}
+ Columns { Name: "key2" Type: "Uint32"}
+ Columns { Name: "Value" Type: "Utf8"}
+ KeyColumnNames: ["key1", "key2"]
+ SplitBoundary {
+ KeyPrefix {
+ Tuple { Optional { Text: "Jack" } }
+ }
+ })");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
+ {NLs::PartitionKeys({"Jack", ""})});
+ }
+
+ AsyncSplitTable(runtime, ++t.TxId, "/MyRoot/Table", R"(
+ SourceTabletId: 72075186233409546
+ AllowOneToOneSplitMerge: true
+ )");
+ AsyncSplitTable(runtime, ++t.TxId, "/MyRoot/Table", R"(
+ SourceTabletId: 72075186233409547
+ AllowOneToOneSplitMerge: true
+ )");
+
+ t.TestEnv->TestWaitNotification(runtime, {t.TxId-1, t.TxId});
+ t.TestEnv->TestWaitTabletDeletion(runtime, {72075186233409546, 72075186233409547}); //delete src
+
+ {
+ TInactiveZone inactive(activeZone);
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table", true),
+ {NLs::PartitionKeys({"Jack", ""})});
+ }
+ });
+ }
+
Y_UNIT_TEST(MergeTableWithReboots) { //+
TTestWithReboots t(true);
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index f622ad83d46..52af72a5976 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -277,6 +277,7 @@ PEERDIR(
library/cpp/deprecated/enum_codegen
library/cpp/html/pcdata
library/cpp/json
+ library/cpp/protobuf/json
ydb/core/actorlib_impl
ydb/core/audit
ydb/core/base