diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-10 17:57:04 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-10 17:57:04 +0300 |
commit | 03c65f3b529302f6f685077470a89bf8ce31a1b7 (patch) | |
tree | b6ac57a8521053a6b7e7ceb528cafe7a1c939a79 | |
parent | 1bcf2dc16edab6ccb8d8f99967a89a639ce41a2a (diff) | |
download | ydb-03c65f3b529302f6f685077470a89bf8ce31a1b7.tar.gz |
Separate data and log channels KIKIMR-14867
ref:7d9947cb1fb037e129320ffe8bc8836f36d78d6f
-rw-r--r-- | ydb/core/blob_depot/agent/agent_comm.cpp | 54 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 60 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_agent.cpp | 43 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 12 | ||||
-rw-r--r-- | ydb/core/blob_depot/events.h | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_apply_config.cpp | 12 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_load.cpp | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 17 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 20 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot_config.proto | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp | 1 |
11 files changed, 160 insertions, 72 deletions
diff --git a/ydb/core/blob_depot/agent/agent_comm.cpp b/ydb/core/blob_depot/agent/agent_comm.cpp index 87cd4f95cb..cf884280e4 100644 --- a/ydb/core/blob_depot/agent/agent_comm.cpp +++ b/ydb/core/blob_depot/agent/agent_comm.cpp @@ -26,38 +26,60 @@ namespace NKikimr::NBlobDepot { (Msg, msg.Record)); Registered = true; BlobDepotGeneration = msg.Record.GetGeneration(); - auto& channelGroups = msg.Record.GetChannelGroups(); - BlobDepotChannelGroups = {channelGroups.begin(), channelGroups.end()}; + for (const auto& kind : msg.Record.GetChannelKinds()) { + auto& v = ChannelKinds[kind.GetChannelKind()]; + v.ChannelGroups.clear(); + v.IndexToChannel.clear(); + for (const auto& channelGroup : kind.GetChannelGroups()) { + const ui8 channel = channelGroup.GetChannel(); + const ui32 groupId = channelGroup.GetGroupId(); + v.ChannelGroups.emplace_back(channel, groupId); + v.ChannelToIndex[channel] = v.IndexToChannel.size(); + v.IndexToChannel.push_back(channel); + } + } } return true; }); - IssueAllocateIdsIfNeeded(); + IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Data); + IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log); } - void TBlobDepotAgent::IssueAllocateIdsIfNeeded() { + void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) { + auto& kind = ChannelKinds[channelKind]; + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), - (IdAllocInFlight, IdAllocInFlight), (IdQ.size, IdQ.size()), (PreallocatedIdCount, PreallocatedIdCount), - (PipeId, PipeId)); - if (!IdAllocInFlight && IdQ.size() < PreallocatedIdCount && PipeId) { + (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)), + (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()), + (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId)); + if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) { const ui64 id = NextRequestId++; - NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds, id); - IdAllocInFlight = true; + NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(channelKind), id); + kind.IdAllocInFlight = true; - RegisterRequest(id, nullptr, [this](IEventBase *ev) { - Y_VERIFY(IdAllocInFlight); - IdAllocInFlight = false; + RegisterRequest(id, nullptr, [this, channelKind](IEventBase *ev) { + auto& kind = ChannelKinds[channelKind]; + + Y_VERIFY(kind.IdAllocInFlight); + kind.IdAllocInFlight = false; if (ev) { auto& msg = *static_cast<TEvBlobDepot::TEvAllocateIdsResult*>(ev); STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), (Msg, msg.Record)); + Y_VERIFY(msg.Record.GetChannelKind() == channelKind); Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration); - IdQ.push_back(TAllocatedId{BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()}); - // FIXME notify waiting requests about new ids + if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) { + kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()}); + + // FIXME notify waiting requests about new ids - // ask for more ids if needed - IssueAllocateIdsIfNeeded(); + // ask for more ids if needed + IssueAllocateIdsIfNeeded(channelKind); + } else { + // no such channel allocated + } } return true; // request complete, remove from queue diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index e4caa77894..54d01187f8 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -115,12 +115,10 @@ namespace NKikimr::NBlobDepot { bool Registered = false; ui32 BlobDepotGeneration = 0; - std::vector<ui32> BlobDepotChannelGroups; void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev); void ConnectToBlobDepot(); - void IssueAllocateIdsIfNeeded(); void OnDisconnect(); void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback); @@ -131,34 +129,44 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - struct TAllocatedId { - ui32 Generation; - ui64 Begin; - ui64 End; - }; + struct TChannelKind + : NBlobDepot::TChannelKind + { + struct TAllocatedId { + ui32 Generation; + ui64 Begin; + ui64 End; + }; - bool IdAllocInFlight = false; - std::deque<TAllocatedId> IdQ; - static constexpr size_t PreallocatedIdCount = 2; + std::vector<std::pair<ui8, ui32>> ChannelGroups; - std::pair<TLogoBlobID, ui32> AllocateDataBlobId(ui32 size, ui32 type) { - if (IdQ.empty()) { - return {}; - } + bool IdAllocInFlight = false; + std::deque<TAllocatedId> IdQ; + static constexpr size_t PreallocatedIdCount = 2; + + std::pair<TLogoBlobID, ui32> Allocate(TBlobDepotAgent& agent, ui32 size, ui32 type) { + if (IdQ.empty()) { + return {}; + } - auto& item = IdQ.front(); - auto cgsi = TCGSI::FromBinary(BlobDepotGeneration, BlobDepotChannelGroups.size(), item.Begin++); - if (item.Begin == item.End) { - IdQ.pop_front(); - IssueAllocateIdsIfNeeded(); + auto& item = IdQ.front(); + auto cgsi = TCGSI::FromBinary(item.Generation, *this, item.Begin++); + if (item.Begin == item.End) { + IdQ.pop_front(); + } + static constexpr ui32 typeBits = 24 - TCGSI::IndexBits; + Y_VERIFY(type < (1 << typeBits)); + const ui32 cookie = cgsi.Index << typeBits | type; + const TLogoBlobID id(agent.TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie); + const auto [channel, groupId] = ChannelGroups[ChannelToIndex[cgsi.Channel]]; + Y_VERIFY_DEBUG(channel == cgsi.Channel); + return {id, groupId}; } - static constexpr ui32 typeBits = 24 - TCGSI::IndexBits; - Y_VERIFY(type < (1 << typeBits)); - const ui32 cookie = cgsi.Index << typeBits | type; - const TLogoBlobID id(TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie); - const ui32 groupId = BlobDepotChannelGroups[cgsi.Channel]; - return {id, groupId}; - } + }; + + THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; + + void IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp index 7b4e0c45ec..a9d7aeabf6 100644 --- a/ydb/core/blob_depot/blob_depot_agent.cpp +++ b/ydb/core/blob_depot/blob_depot_agent.cpp @@ -46,9 +46,14 @@ namespace NKikimr::NBlobDepot { auto response = std::make_unique<TEvBlobDepot::TEvRegisterAgentResult>(); auto& record = response->Record; record.SetGeneration(Executor()->Generation()); - for (const auto& channel : Info()->Channels) { - Y_VERIFY(channel.Channel == record.ChannelGroupsSize()); - record.AddChannelGroups(channel.History ? channel.History.back().GroupID : 0); + for (const auto& [k, v] : ChannelKinds) { + auto *proto = record.AddChannelKinds(); + proto->SetChannelKind(k); + for (const ui32 channel : v.IndexToChannel) { + auto *cg = proto->AddChannelGroups(); + cg->SetChannel(channel); + cg->SetGroupId(Info()->Channels[channel].History.back().GroupID); + } } SendResponseToAgent(*ev, std::move(response)); @@ -61,13 +66,15 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) { STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record), (PipeServerId, ev->Recipient)); - auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>(); + auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>(ev->Get()->Record.GetChannelKind(), + Executor()->Generation()); auto& record = response->Record; - const ui32 generation = Executor()->Generation(); - record.SetGeneration(generation); - record.SetRangeBegin(NextBlobSeqId); - NextBlobSeqId += PreallocatedIdCount; - record.SetRangeEnd(NextBlobSeqId); + if (const auto it = ChannelKinds.find(record.GetChannelKind()); it != ChannelKinds.end()) { + auto& nextBlobSeqId = it->second.NextBlobSeqId; + record.SetRangeBegin(nextBlobSeqId); + nextBlobSeqId += PreallocatedIdCount; + record.SetRangeEnd(nextBlobSeqId); + } SendResponseToAgent(*ev, std::move(response)); } @@ -89,4 +96,22 @@ namespace NKikimr::NBlobDepot { TActivationContext::Send(handle.release()); } + void TBlobDepot::InitChannelKinds() { + ui32 channel = 0; + for (const auto& profile : Config.GetChannelProfiles()) { + for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) { + if (channel >= 2) { + const auto kind = profile.GetChannelKind(); + auto& p = ChannelKinds[kind]; + const ui32 indexWithinKind = p.IndexToChannel.size(); + p.IndexToChannel.push_back(channel); + p.ChannelToIndex[indexWithinKind] = channel; + } + } + } + for (auto& [k, v] : ChannelKinds) { + v.NextBlobSeqId = TCGSI{v.IndexToChannel.front(), Executor()->Generation(), 1, 0}.ToBinary(v); + } + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 014ad61d42..9e2bdb0a73 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -45,7 +45,13 @@ namespace NKikimr::NBlobDepot { THashMap<TActorId, std::optional<ui32>> PipeServerToNode; THashMap<ui32, TAgentInfo> Agents; // NodeId -> Agent - ui64 NextBlobSeqId = 0; + struct TChannelKind + : NBlobDepot::TChannelKind + { + ui64 NextBlobSeqId = 0; + }; + + THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds; void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev); void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev); @@ -72,8 +78,6 @@ namespace NKikimr::NBlobDepot { for (auto&& ev : std::exchange(InitialEventsQ, {})) { TActivationContext::Send(ev.release()); } - - NextBlobSeqId = TCGSI{BaseDataChannel, Executor()->Generation(), 1, 0}.ToBinary(Info()->Channels.size()); } void OnDetach(const TActorContext&) override { @@ -90,6 +94,8 @@ namespace NKikimr::NBlobDepot { void SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response); + void InitChannelKinds(); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// STFUNC(StateInit) { diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 7c01329c50..57b8e21c72 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -52,8 +52,8 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId); BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId); BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult); - BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIds); - BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIdsResult); + BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind); + BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation); BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration); BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp index 57b77e0183..9bda0223d0 100644 --- a/ydb/core/blob_depot/op_apply_config.cpp +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -8,7 +8,6 @@ namespace NKikimr::NBlobDepot { class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<IEventHandle> Response; - const TActorId InterconnectSession; TString ConfigProtobuf; public: @@ -16,9 +15,11 @@ namespace NKikimr::NBlobDepot { TActorId interconnectSession) : TTransactionBase(self) , Response(std::move(response)) - , InterconnectSession(interconnectSession) { - const bool success = ev.Record.SerializeToString(&ConfigProtobuf); + if (interconnectSession) { + Response->Rewrite(TEvInterconnect::EvForward, interconnectSession); + } + const bool success = ev.Record.GetConfig().SerializeToString(&ConfigProtobuf); Y_VERIFY(success); } @@ -31,10 +32,11 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { + const bool wasEmpty = !Self->Config.ByteSizeLong(); const bool success = Self->Config.ParseFromString(ConfigProtobuf); Y_VERIFY(success); - if (InterconnectSession) { - Response->Rewrite(TEvInterconnect::EvForward, InterconnectSession); + if (wasEmpty) { + Self->InitChannelKinds(); } TActivationContext::Send(Response.release()); } diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index 240bc5ad6d..c028378065 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -57,6 +57,7 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { + Self->InitChannelKinds(); } }; diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 141696bd4f..caf347a6ce 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -6,6 +6,11 @@ namespace NKikimr::NBlobDepot { static constexpr ui32 BaseDataChannel = 2; + struct TChannelKind { + std::array<ui8, 256> ChannelToIndex; + std::vector<ui8> IndexToChannel; + }; + struct TCGSI { static constexpr ui32 IndexBits = 20; static constexpr ui32 MaxIndex = (1 << IndexBits) - 1; @@ -15,18 +20,18 @@ namespace NKikimr::NBlobDepot { ui32 Step; ui32 Index; - ui64 ToBinary(ui32 numChannels) const { - Y_VERIFY_DEBUG(numChannels > BaseDataChannel); + ui64 ToBinary(const TChannelKind& kind) const { Y_VERIFY_DEBUG(Index <= MaxIndex); - return (static_cast<ui64>(Step) << IndexBits | Index) * (numChannels - BaseDataChannel) + (Channel - BaseDataChannel); + Y_VERIFY(Channel < kind.ChannelToIndex.size()); + return (static_cast<ui64>(Step) << IndexBits | Index) * kind.IndexToChannel.size() + kind.ChannelToIndex[Channel]; } - static TCGSI FromBinary(ui32 generation, ui32 numChannels, ui64 value) { + static TCGSI FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) { static_assert(sizeof(long long) >= sizeof(ui64)); - auto res = std::lldiv(value, numChannels - BaseDataChannel); + auto res = std::lldiv(value, kind.IndexToChannel.size()); return TCGSI{ - .Channel = static_cast<ui32>(res.rem + BaseDataChannel), + .Channel = kind.IndexToChannel[res.rem], .Generation = generation, .Step = static_cast<ui32>(res.quot >> IndexBits), .Index = static_cast<ui32>(res.quot) & MaxIndex diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index a1cca5e67b..81a56285d2 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -38,17 +38,27 @@ message TEvRegisterAgent { } message TEvRegisterAgentResult { - repeated uint32 ChannelGroups = 1; - optional uint32 Generation = 2; + message TChannelGroupId { + optional uint32 Channel = 1; + optional uint32 GroupId = 2; + }; + message TChannelKind { + optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1; + repeated TChannelGroupId ChannelGroups = 2; + } + optional uint32 Generation = 1; + repeated TChannelKind ChannelKinds = 2; } message TEvAllocateIds { + optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1; } message TEvAllocateIdsResult { - optional uint32 Generation = 1; // executor generation, for validation purposes - optional uint64 RangeBegin = 2; // <Generation> <Step> <Channel> - optional uint64 RangeEnd = 3; + optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1; + optional uint32 Generation = 2; // executor generation, for validation purposes + optional uint64 RangeBegin = 3; // <Generation> <Step> <Channel> + optional uint64 RangeEnd = 4; } message TEvBlock { diff --git a/ydb/core/protos/blob_depot_config.proto b/ydb/core/protos/blob_depot_config.proto index 2f7f9268ca..6858a242c2 100644 --- a/ydb/core/protos/blob_depot_config.proto +++ b/ydb/core/protos/blob_depot_config.proto @@ -1,8 +1,16 @@ package NKikimrBlobDepot; +message TChannelKind { + enum E { + Data = 1; + Log = 2; + } +} + message TChannelProfile { optional string StoragePoolKind = 1; optional uint32 Count = 2; + optional TChannelKind.E ChannelKind = 3; } enum EOperationMode { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp index add3147d5a..a0aef187a7 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp @@ -68,6 +68,7 @@ namespace NKikimr::NSchemeShard { blobDepotInfo->BlobDepotTabletId = tabletId; Y_VERIFY(shard.TabletType == ETabletType::BlobDepot); auto event = std::make_unique<TEvBlobDepot::TEvApplyConfig>(static_cast<ui64>(OperationId.GetTxId())); + event->Record.MutableConfig()->CopyFrom(blobDepotInfo->Description.GetConfig()); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shardIdx, event.release()); txState->ShardsInProgress.insert(shardIdx); } |