aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSemyon Danilov <senya@ydb.tech>2024-12-31 16:47:16 +0400
committerGitHub <noreply@github.com>2024-12-31 16:47:16 +0400
commitacaff68f553828b0b8f93eefc71d89dcd7848e1f (patch)
tree4d94b67575fefecd22e6ebd64b7896ae29c9a729
parentbafd5812353f1c4f6ef5ab41b069d3eb765f1b31 (diff)
downloadydb-acaff68f553828b0b8f93eefc71d89dcd7848e1f.tar.gz
Fix SIGSEGV in balancing actor (#13136)
-rw-r--r--ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp24
-rw-r--r--ydb/core/blobstorage/vdisk/balance/balancing_actor.h4
-rw-r--r--ydb/core/blobstorage/vdisk/balance/deleter.cpp16
-rw-r--r--ydb/core/blobstorage/vdisk/balance/sender.cpp40
4 files changed, 46 insertions, 38 deletions
diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
index b0d3585439..ee35f7a079 100644
--- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
+++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp
@@ -19,7 +19,7 @@ namespace NBalancing {
: BatchSize(batchSize)
{}
- TConstArrayRef<T> GetNextBatch() {
+ TVector<T> GetNextBatch() {
if (Empty()) {
return {};
}
@@ -27,7 +27,8 @@ namespace NBalancing {
ui32 begin = CurPos;
ui32 end = Min(begin + BatchSize, static_cast<ui32>(Data.size()));
CurPos = end;
- return TConstArrayRef<T>(Data.data() + begin, end - begin);
+
+ return TVector<T>(std::make_move_iterator(Data.begin() + begin), std::make_move_iterator(Data.begin() + end));
}
bool Empty() const {
@@ -60,9 +61,9 @@ namespace NBalancing {
}
TBatchManager() = default;
- TBatchManager(IActor* sender, IActor* deleter)
- : SenderId(TlsActivationContext->Register(sender))
- , DeleterId(TlsActivationContext->Register(deleter))
+ TBatchManager(const TActorId& sender, const TActorId& deleter)
+ : SenderId(sender)
+ , DeleterId(deleter)
{}
};
@@ -204,13 +205,13 @@ namespace NBalancing {
///////////////////////////////////////////////////////////////////////////////////////////
void ContinueBalancing() {
- Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size();
- Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size();
+ Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Size();
+ Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Size();
if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) {
// no more parts to send or delete
STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed"));
- bool hasSomeWorkForNextEpoch = SendOnMainParts.Data.size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Data.size() >= Ctx->Cfg.MaxToDeletePerEpoch;
+ bool hasSomeWorkForNextEpoch = SendOnMainParts.Size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Size() >= Ctx->Cfg.MaxToDeletePerEpoch;
Stop(hasSomeWorkForNextEpoch ? TDuration::Seconds(0) : Ctx->Cfg.TimeToSleepIfNothingToDo);
return;
}
@@ -231,9 +232,12 @@ namespace NBalancing {
(ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum()));
// register sender and deleter actors
+ IActor* sender = CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx);
+ IActor* deleter = CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx);
+
BatchManager = TBatchManager(
- CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx),
- CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx)
+ RegisterWithSameMailbox(sender),
+ RegisterWithSameMailbox(deleter)
);
}
diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h
index 9e35865a53..67dd4cec88 100644
--- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h
+++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h
@@ -10,13 +10,13 @@ namespace NKikimr {
namespace NBalancing {
IActor* CreateSenderActor(
TActorId notifyId,
- TConstArrayRef<TPartInfo> parts,
+ TVector<TPartInfo>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
);
IActor* CreateDeleterActor(
TActorId notifyId,
- TConstArrayRef<TLogoBlobID> parts,
+ TVector<TLogoBlobID>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
);
diff --git a/ydb/core/blobstorage/vdisk/balance/deleter.cpp b/ydb/core/blobstorage/vdisk/balance/deleter.cpp
index b6d5764363..1c0bcb5aa2 100644
--- a/ydb/core/blobstorage/vdisk/balance/deleter.cpp
+++ b/ydb/core/blobstorage/vdisk/balance/deleter.cpp
@@ -21,7 +21,7 @@ namespace {
class TPartsRequester {
private:
const TActorId NotifyId;
- TConstArrayRef<TLogoBlobID> Parts;
+ TVector<TLogoBlobID> Parts;
TReplQuoter::TPtr Quoter;
TIntrusivePtr<TBlobStorageGroupInfo> GInfo;
TQueueActorMapPtr QueueActorMapPtr;
@@ -32,14 +32,14 @@ namespace {
ui32 Responses = 0;
public:
- TPartsRequester(TActorId notifyId, TConstArrayRef<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup)
+ TPartsRequester(TActorId notifyId, TVector<TLogoBlobID>&& parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup)
: NotifyId(notifyId)
- , Parts(parts)
+ , Parts(std::move(parts))
, Quoter(quoter)
, GInfo(gInfo)
, QueueActorMapPtr(queueActorMapPtr)
, MonGroup(monGroup)
- , Result(parts.size())
+ , Result(Parts.size())
{
}
@@ -294,14 +294,14 @@ namespace {
TDeleter() = default;
TDeleter(
TActorId notifyId,
- TConstArrayRef<TLogoBlobID> parts,
+ TVector<TLogoBlobID>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
)
: NotifyId(notifyId)
, Ctx(ctx)
, GInfo(ctx->GInfo)
- , PartsRequester(SelfId(), parts, Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup)
+ , PartsRequester(SelfId(), std::move(parts), Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup)
, PartsDeleter(ctx, GInfo)
{
}
@@ -314,11 +314,11 @@ namespace {
IActor* CreateDeleterActor(
TActorId notifyId,
- TConstArrayRef<TLogoBlobID> parts,
+ TVector<TLogoBlobID>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
) {
- return new TDeleter(notifyId, parts, queueActorMapPtr, ctx);
+ return new TDeleter(notifyId, std::move(parts), queueActorMapPtr, ctx);
}
} // NBalancing
diff --git a/ydb/core/blobstorage/vdisk/balance/sender.cpp b/ydb/core/blobstorage/vdisk/balance/sender.cpp
index a455abb572..ee47372f74 100644
--- a/ydb/core/blobstorage/vdisk/balance/sender.cpp
+++ b/ydb/core/blobstorage/vdisk/balance/sender.cpp
@@ -19,7 +19,7 @@ namespace {
class TReader {
private:
TPDiskCtxPtr PDiskCtx;
- TConstArrayRef<TPartInfo> Parts;
+ TVector<TPartInfo> Parts;
TReplQuoter::TPtr Quoter;
const TBlobStorageGroupType GType;
NMonGroup::TBalancingGroup& MonGroup;
@@ -28,27 +28,28 @@ namespace {
ui32 Responses = 0;
public:
- TReader(TPDiskCtxPtr pDiskCtx, TConstArrayRef<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup)
+ TReader(TPDiskCtxPtr pDiskCtx, TVector<TPartInfo>&& parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup)
: PDiskCtx(pDiskCtx)
- , Parts(parts)
+ , Parts(std::move(parts))
, Quoter(replPDiskReadQuoter)
, GType(gType)
, MonGroup(monGroup)
- , Result(parts.size())
+ , Result(Parts.size())
{}
void SendReadRequests(const TActorId& selfId) {
for (ui32 i = 0; i < Parts.size(); ++i) {
- const auto& item = Parts[i];
+ auto& item = Parts[i];
Result[i] = TPart{.Key=item.Key, .PartsMask=item.PartsMask};
std::visit(TOverloaded{
- [&](const TRope& data) {
+ [&](TRope&& data) {
// part is already in memory, no need to read it from disk
Y_DEBUG_ABORT_UNLESS(item.PartsMask.CountBits() == 1);
- Result[i].PartsData = {data};
+ Result[i].PartsData.reserve(1);
+ Result[i].PartsData.emplace_back(std::move(data));
++Responses;
},
- [&](const TDiskPart& diskPart) {
+ [&](TDiskPart&& diskPart) {
auto ev = std::make_unique<NPDisk::TEvChunkRead>(
PDiskCtx->Dsk->Owner,
PDiskCtx->Dsk->OwnerRound,
@@ -66,7 +67,7 @@ namespace {
);
MonGroup.ReadFromHandoffBytes() += diskPart.Size;
}
- }, item.PartData);
+ }, std::move(item.PartData));
}
}
@@ -83,6 +84,8 @@ namespace {
auto diskBlob = TDiskBlob(&data, localParts, GType, key);
ui32 readSize = 0;
+ Result[i].PartsData.reserve(localParts.CountBits());
+
for (ui8 partIdx = localParts.FirstPosition(); partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx)) {
TRope result;
result = diskBlob.GetPart(partIdx, &result);
@@ -150,26 +153,27 @@ namespace {
auto localParts = part.PartsMask;
for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) {
auto key = TLogoBlobID(part.Key, partIdx + 1);
- auto& data = part.PartsData[i];
+ auto&& data = std::move(part.PartsData[i]);
+ size_t dataSize = data.size();
auto vDiskId = GetMainReplicaVDiskId(*GInfo, key);
if (Ctx->HugeBlobCtx->IsHugeBlob(GInfo->GetTopology().GType, part.Key, Ctx->MinHugeBlobInBytes)) {
auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(
- key, data, vDiskId,
+ key, std::move(data), vDiskId,
true, nullptr,
TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob
);
- SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), data.size());
+ SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), dataSize);
} else {
STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Add in multiput"), (LogoBlobId, key.ToString()),
- (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size()));
+ (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, dataSize));
auto& ev = vDiskToEv[vDiskId];
if (!ev) {
ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob, true, nullptr);
}
- ev->AddVPut(key, TRcBuf(data), nullptr, {}, NWilson::TTraceId());
+ ev->AddVPut(key, TRcBuf(std::move(data)), nullptr, {}, NWilson::TTraceId());
}
}
}
@@ -344,7 +348,7 @@ namespace {
public:
TSender(
TActorId notifyId,
- TConstArrayRef<TPartInfo> parts,
+ TVector<TPartInfo>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
)
@@ -352,7 +356,7 @@ namespace {
, QueueActorMapPtr(queueActorMapPtr)
, Ctx(ctx)
, GInfo(ctx->GInfo)
- , Reader(Ctx->PDiskCtx, parts, ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup)
+ , Reader(Ctx->PDiskCtx, std::move(parts), ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup)
, Sender(ctx, GInfo, queueActorMapPtr)
{}
@@ -364,11 +368,11 @@ namespace {
IActor* CreateSenderActor(
TActorId notifyId,
- TConstArrayRef<TPartInfo> parts,
+ TVector<TPartInfo>&& parts,
TQueueActorMapPtr queueActorMapPtr,
std::shared_ptr<TBalancingCtx> ctx
) {
- return new TSender(notifyId, parts, queueActorMapPtr, ctx);
+ return new TSender(notifyId, std::move(parts), queueActorMapPtr, ctx);
}
} // NBalancing