diff options
author | Semyon Danilov <senya@ydb.tech> | 2024-12-31 16:47:16 +0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-31 16:47:16 +0400 |
commit | acaff68f553828b0b8f93eefc71d89dcd7848e1f (patch) | |
tree | 4d94b67575fefecd22e6ebd64b7896ae29c9a729 | |
parent | bafd5812353f1c4f6ef5ab41b069d3eb765f1b31 (diff) | |
download | ydb-acaff68f553828b0b8f93eefc71d89dcd7848e1f.tar.gz |
Fix SIGSEGV in balancing actor (#13136)
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp | 24 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/balancing_actor.h | 4 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/deleter.cpp | 16 | ||||
-rw-r--r-- | ydb/core/blobstorage/vdisk/balance/sender.cpp | 40 |
4 files changed, 46 insertions, 38 deletions
diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp index b0d3585439..ee35f7a079 100644 --- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp +++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.cpp @@ -19,7 +19,7 @@ namespace NBalancing { : BatchSize(batchSize) {} - TConstArrayRef<T> GetNextBatch() { + TVector<T> GetNextBatch() { if (Empty()) { return {}; } @@ -27,7 +27,8 @@ namespace NBalancing { ui32 begin = CurPos; ui32 end = Min(begin + BatchSize, static_cast<ui32>(Data.size())); CurPos = end; - return TConstArrayRef<T>(Data.data() + begin, end - begin); + + return TVector<T>(std::make_move_iterator(Data.begin() + begin), std::make_move_iterator(Data.begin() + end)); } bool Empty() const { @@ -60,9 +61,9 @@ namespace NBalancing { } TBatchManager() = default; - TBatchManager(IActor* sender, IActor* deleter) - : SenderId(TlsActivationContext->Register(sender)) - , DeleterId(TlsActivationContext->Register(deleter)) + TBatchManager(const TActorId& sender, const TActorId& deleter) + : SenderId(sender) + , DeleterId(deleter) {} }; @@ -204,13 +205,13 @@ namespace NBalancing { /////////////////////////////////////////////////////////////////////////////////////////// void ContinueBalancing() { - Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Data.size(); - Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Data.size(); + Ctx->MonGroup.PlannedToSendOnMain() = SendOnMainParts.Size(); + Ctx->MonGroup.CandidatesToDelete() = TryDeleteParts.Size(); if (SendOnMainParts.Empty() && TryDeleteParts.Empty()) { // no more parts to send or delete STLOG(PRI_INFO, BS_VDISK_BALANCING, BSVB03, VDISKP(Ctx->VCtx, "Balancing completed")); - bool hasSomeWorkForNextEpoch = SendOnMainParts.Data.size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Data.size() >= Ctx->Cfg.MaxToDeletePerEpoch; + bool hasSomeWorkForNextEpoch = SendOnMainParts.Size() >= Ctx->Cfg.MaxToSendPerEpoch || TryDeleteParts.Size() >= Ctx->Cfg.MaxToDeletePerEpoch; Stop(hasSomeWorkForNextEpoch ? TDuration::Seconds(0) : Ctx->Cfg.TimeToSleepIfNothingToDo); return; } @@ -231,9 +232,12 @@ namespace NBalancing { (ConnectedVDisks, ConnectedVDisks.size()), (TotalVDisks, GInfo->GetTotalVDisksNum())); // register sender and deleter actors + IActor* sender = CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx); + IActor* deleter = CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx); + BatchManager = TBatchManager( - CreateSenderActor(SelfId(), SendOnMainParts.GetNextBatch(), QueueActorMapPtr, Ctx), - CreateDeleterActor(SelfId(), TryDeleteParts.GetNextBatch(), QueueActorMapPtr, Ctx) + RegisterWithSameMailbox(sender), + RegisterWithSameMailbox(deleter) ); } diff --git a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h index 9e35865a53..67dd4cec88 100644 --- a/ydb/core/blobstorage/vdisk/balance/balancing_actor.h +++ b/ydb/core/blobstorage/vdisk/balance/balancing_actor.h @@ -10,13 +10,13 @@ namespace NKikimr { namespace NBalancing { IActor* CreateSenderActor( TActorId notifyId, - TConstArrayRef<TPartInfo> parts, + TVector<TPartInfo>&& parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ); IActor* CreateDeleterActor( TActorId notifyId, - TConstArrayRef<TLogoBlobID> parts, + TVector<TLogoBlobID>&& parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ); diff --git a/ydb/core/blobstorage/vdisk/balance/deleter.cpp b/ydb/core/blobstorage/vdisk/balance/deleter.cpp index b6d5764363..1c0bcb5aa2 100644 --- a/ydb/core/blobstorage/vdisk/balance/deleter.cpp +++ b/ydb/core/blobstorage/vdisk/balance/deleter.cpp @@ -21,7 +21,7 @@ namespace { class TPartsRequester { private: const TActorId NotifyId; - TConstArrayRef<TLogoBlobID> Parts; + TVector<TLogoBlobID> Parts; TReplQuoter::TPtr Quoter; TIntrusivePtr<TBlobStorageGroupInfo> GInfo; TQueueActorMapPtr QueueActorMapPtr; @@ -32,14 +32,14 @@ namespace { ui32 Responses = 0; public: - TPartsRequester(TActorId notifyId, TConstArrayRef<TLogoBlobID> parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup) + TPartsRequester(TActorId notifyId, TVector<TLogoBlobID>&& parts, TReplQuoter::TPtr quoter, TIntrusivePtr<TBlobStorageGroupInfo> gInfo, TQueueActorMapPtr queueActorMapPtr, NMonGroup::TBalancingGroup& monGroup) : NotifyId(notifyId) - , Parts(parts) + , Parts(std::move(parts)) , Quoter(quoter) , GInfo(gInfo) , QueueActorMapPtr(queueActorMapPtr) , MonGroup(monGroup) - , Result(parts.size()) + , Result(Parts.size()) { } @@ -294,14 +294,14 @@ namespace { TDeleter() = default; TDeleter( TActorId notifyId, - TConstArrayRef<TLogoBlobID> parts, + TVector<TLogoBlobID>&& parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) : NotifyId(notifyId) , Ctx(ctx) , GInfo(ctx->GInfo) - , PartsRequester(SelfId(), parts, Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup) + , PartsRequester(SelfId(), std::move(parts), Ctx->VCtx->ReplNodeRequestQuoter, GInfo, queueActorMapPtr, Ctx->MonGroup) , PartsDeleter(ctx, GInfo) { } @@ -314,11 +314,11 @@ namespace { IActor* CreateDeleterActor( TActorId notifyId, - TConstArrayRef<TLogoBlobID> parts, + TVector<TLogoBlobID>&& parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) { - return new TDeleter(notifyId, parts, queueActorMapPtr, ctx); + return new TDeleter(notifyId, std::move(parts), queueActorMapPtr, ctx); } } // NBalancing diff --git a/ydb/core/blobstorage/vdisk/balance/sender.cpp b/ydb/core/blobstorage/vdisk/balance/sender.cpp index a455abb572..ee47372f74 100644 --- a/ydb/core/blobstorage/vdisk/balance/sender.cpp +++ b/ydb/core/blobstorage/vdisk/balance/sender.cpp @@ -19,7 +19,7 @@ namespace { class TReader { private: TPDiskCtxPtr PDiskCtx; - TConstArrayRef<TPartInfo> Parts; + TVector<TPartInfo> Parts; TReplQuoter::TPtr Quoter; const TBlobStorageGroupType GType; NMonGroup::TBalancingGroup& MonGroup; @@ -28,27 +28,28 @@ namespace { ui32 Responses = 0; public: - TReader(TPDiskCtxPtr pDiskCtx, TConstArrayRef<TPartInfo> parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup) + TReader(TPDiskCtxPtr pDiskCtx, TVector<TPartInfo>&& parts, TReplQuoter::TPtr replPDiskReadQuoter, TBlobStorageGroupType gType, NMonGroup::TBalancingGroup& monGroup) : PDiskCtx(pDiskCtx) - , Parts(parts) + , Parts(std::move(parts)) , Quoter(replPDiskReadQuoter) , GType(gType) , MonGroup(monGroup) - , Result(parts.size()) + , Result(Parts.size()) {} void SendReadRequests(const TActorId& selfId) { for (ui32 i = 0; i < Parts.size(); ++i) { - const auto& item = Parts[i]; + auto& item = Parts[i]; Result[i] = TPart{.Key=item.Key, .PartsMask=item.PartsMask}; std::visit(TOverloaded{ - [&](const TRope& data) { + [&](TRope&& data) { // part is already in memory, no need to read it from disk Y_DEBUG_ABORT_UNLESS(item.PartsMask.CountBits() == 1); - Result[i].PartsData = {data}; + Result[i].PartsData.reserve(1); + Result[i].PartsData.emplace_back(std::move(data)); ++Responses; }, - [&](const TDiskPart& diskPart) { + [&](TDiskPart&& diskPart) { auto ev = std::make_unique<NPDisk::TEvChunkRead>( PDiskCtx->Dsk->Owner, PDiskCtx->Dsk->OwnerRound, @@ -66,7 +67,7 @@ namespace { ); MonGroup.ReadFromHandoffBytes() += diskPart.Size; } - }, item.PartData); + }, std::move(item.PartData)); } } @@ -83,6 +84,8 @@ namespace { auto diskBlob = TDiskBlob(&data, localParts, GType, key); ui32 readSize = 0; + Result[i].PartsData.reserve(localParts.CountBits()); + for (ui8 partIdx = localParts.FirstPosition(); partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx)) { TRope result; result = diskBlob.GetPart(partIdx, &result); @@ -150,26 +153,27 @@ namespace { auto localParts = part.PartsMask; for (ui8 partIdx = localParts.FirstPosition(), i = 0; partIdx < localParts.GetSize(); partIdx = localParts.NextPosition(partIdx), ++i) { auto key = TLogoBlobID(part.Key, partIdx + 1); - auto& data = part.PartsData[i]; + auto&& data = std::move(part.PartsData[i]); + size_t dataSize = data.size(); auto vDiskId = GetMainReplicaVDiskId(*GInfo, key); if (Ctx->HugeBlobCtx->IsHugeBlob(GInfo->GetTopology().GType, part.Key, Ctx->MinHugeBlobInBytes)) { auto ev = std::make_unique<TEvBlobStorage::TEvVPut>( - key, data, vDiskId, + key, std::move(data), vDiskId, true, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob ); - SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), data.size()); + SendRequest(TVDiskIdShort(vDiskId), selfId, ev.release(), dataSize); } else { STLOG(PRI_DEBUG, BS_VDISK_BALANCING, BSVB11, VDISKP(Ctx->VCtx, "Add in multiput"), (LogoBlobId, key.ToString()), - (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, data.size())); + (To, GInfo->GetTopology().GetOrderNumber(TVDiskIdShort(vDiskId))), (DataSize, dataSize)); auto& ev = vDiskToEv[vDiskId]; if (!ev) { ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(vDiskId, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob, true, nullptr); } - ev->AddVPut(key, TRcBuf(data), nullptr, {}, NWilson::TTraceId()); + ev->AddVPut(key, TRcBuf(std::move(data)), nullptr, {}, NWilson::TTraceId()); } } } @@ -344,7 +348,7 @@ namespace { public: TSender( TActorId notifyId, - TConstArrayRef<TPartInfo> parts, + TVector<TPartInfo>&& parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) @@ -352,7 +356,7 @@ namespace { , QueueActorMapPtr(queueActorMapPtr) , Ctx(ctx) , GInfo(ctx->GInfo) - , Reader(Ctx->PDiskCtx, parts, ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup) + , Reader(Ctx->PDiskCtx, std::move(parts), ctx->VCtx->ReplPDiskReadQuoter, GInfo->GetTopology().GType, Ctx->MonGroup) , Sender(ctx, GInfo, queueActorMapPtr) {} @@ -364,11 +368,11 @@ namespace { IActor* CreateSenderActor( TActorId notifyId, - TConstArrayRef<TPartInfo> parts, + TVector<TPartInfo>&& parts, TQueueActorMapPtr queueActorMapPtr, std::shared_ptr<TBalancingCtx> ctx ) { - return new TSender(notifyId, parts, queueActorMapPtr, ctx); + return new TSender(notifyId, std::move(parts), queueActorMapPtr, ctx); } } // NBalancing |