aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-10 17:57:04 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-06-10 17:57:04 +0300
commit03c65f3b529302f6f685077470a89bf8ce31a1b7 (patch)
treeb6ac57a8521053a6b7e7ceb528cafe7a1c939a79
parent1bcf2dc16edab6ccb8d8f99967a89a639ce41a2a (diff)
downloadydb-03c65f3b529302f6f685077470a89bf8ce31a1b7.tar.gz
Separate data and log channels KIKIMR-14867
ref:7d9947cb1fb037e129320ffe8bc8836f36d78d6f
-rw-r--r--ydb/core/blob_depot/agent/agent_comm.cpp54
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h60
-rw-r--r--ydb/core/blob_depot/blob_depot_agent.cpp43
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h12
-rw-r--r--ydb/core/blob_depot/events.h4
-rw-r--r--ydb/core/blob_depot/op_apply_config.cpp12
-rw-r--r--ydb/core/blob_depot/op_load.cpp1
-rw-r--r--ydb/core/blob_depot/types.h17
-rw-r--r--ydb/core/protos/blob_depot.proto20
-rw-r--r--ydb/core/protos/blob_depot_config.proto8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp1
11 files changed, 160 insertions, 72 deletions
diff --git a/ydb/core/blob_depot/agent/agent_comm.cpp b/ydb/core/blob_depot/agent/agent_comm.cpp
index 87cd4f95cb..cf884280e4 100644
--- a/ydb/core/blob_depot/agent/agent_comm.cpp
+++ b/ydb/core/blob_depot/agent/agent_comm.cpp
@@ -26,38 +26,60 @@ namespace NKikimr::NBlobDepot {
(Msg, msg.Record));
Registered = true;
BlobDepotGeneration = msg.Record.GetGeneration();
- auto& channelGroups = msg.Record.GetChannelGroups();
- BlobDepotChannelGroups = {channelGroups.begin(), channelGroups.end()};
+ for (const auto& kind : msg.Record.GetChannelKinds()) {
+ auto& v = ChannelKinds[kind.GetChannelKind()];
+ v.ChannelGroups.clear();
+ v.IndexToChannel.clear();
+ for (const auto& channelGroup : kind.GetChannelGroups()) {
+ const ui8 channel = channelGroup.GetChannel();
+ const ui32 groupId = channelGroup.GetGroupId();
+ v.ChannelGroups.emplace_back(channel, groupId);
+ v.ChannelToIndex[channel] = v.IndexToChannel.size();
+ v.IndexToChannel.push_back(channel);
+ }
+ }
}
return true;
});
- IssueAllocateIdsIfNeeded();
+ IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Data);
+ IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log);
}
- void TBlobDepotAgent::IssueAllocateIdsIfNeeded() {
+ void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) {
+ auto& kind = ChannelKinds[channelKind];
+
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId),
- (IdAllocInFlight, IdAllocInFlight), (IdQ.size, IdQ.size()), (PreallocatedIdCount, PreallocatedIdCount),
- (PipeId, PipeId));
- if (!IdAllocInFlight && IdQ.size() < PreallocatedIdCount && PipeId) {
+ (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)),
+ (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()),
+ (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId));
+ if (!kind.IdAllocInFlight && kind.IdQ.size() < kind.PreallocatedIdCount && PipeId) {
const ui64 id = NextRequestId++;
- NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds, id);
- IdAllocInFlight = true;
+ NTabletPipe::SendData(SelfId(), PipeId, new TEvBlobDepot::TEvAllocateIds(channelKind), id);
+ kind.IdAllocInFlight = true;
- RegisterRequest(id, nullptr, [this](IEventBase *ev) {
- Y_VERIFY(IdAllocInFlight);
- IdAllocInFlight = false;
+ RegisterRequest(id, nullptr, [this, channelKind](IEventBase *ev) {
+ auto& kind = ChannelKinds[channelKind];
+
+ Y_VERIFY(kind.IdAllocInFlight);
+ kind.IdAllocInFlight = false;
if (ev) {
auto& msg = *static_cast<TEvBlobDepot::TEvAllocateIdsResult*>(ev);
STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
(Msg, msg.Record));
+ Y_VERIFY(msg.Record.GetChannelKind() == channelKind);
Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration);
- IdQ.push_back(TAllocatedId{BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()});
- // FIXME notify waiting requests about new ids
+ if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) {
+ kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()});
+
+ // FIXME notify waiting requests about new ids
- // ask for more ids if needed
- IssueAllocateIdsIfNeeded();
+ // ask for more ids if needed
+ IssueAllocateIdsIfNeeded(channelKind);
+ } else {
+ // no such channel allocated
+ }
}
return true; // request complete, remove from queue
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index e4caa77894..54d01187f8 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -115,12 +115,10 @@ namespace NKikimr::NBlobDepot {
bool Registered = false;
ui32 BlobDepotGeneration = 0;
- std::vector<ui32> BlobDepotChannelGroups;
void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev);
void ConnectToBlobDepot();
- void IssueAllocateIdsIfNeeded();
void OnDisconnect();
void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestCompleteCallback callback);
@@ -131,34 +129,44 @@ namespace NKikimr::NBlobDepot {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- struct TAllocatedId {
- ui32 Generation;
- ui64 Begin;
- ui64 End;
- };
+ struct TChannelKind
+ : NBlobDepot::TChannelKind
+ {
+ struct TAllocatedId {
+ ui32 Generation;
+ ui64 Begin;
+ ui64 End;
+ };
- bool IdAllocInFlight = false;
- std::deque<TAllocatedId> IdQ;
- static constexpr size_t PreallocatedIdCount = 2;
+ std::vector<std::pair<ui8, ui32>> ChannelGroups;
- std::pair<TLogoBlobID, ui32> AllocateDataBlobId(ui32 size, ui32 type) {
- if (IdQ.empty()) {
- return {};
- }
+ bool IdAllocInFlight = false;
+ std::deque<TAllocatedId> IdQ;
+ static constexpr size_t PreallocatedIdCount = 2;
+
+ std::pair<TLogoBlobID, ui32> Allocate(TBlobDepotAgent& agent, ui32 size, ui32 type) {
+ if (IdQ.empty()) {
+ return {};
+ }
- auto& item = IdQ.front();
- auto cgsi = TCGSI::FromBinary(BlobDepotGeneration, BlobDepotChannelGroups.size(), item.Begin++);
- if (item.Begin == item.End) {
- IdQ.pop_front();
- IssueAllocateIdsIfNeeded();
+ auto& item = IdQ.front();
+ auto cgsi = TCGSI::FromBinary(item.Generation, *this, item.Begin++);
+ if (item.Begin == item.End) {
+ IdQ.pop_front();
+ }
+ static constexpr ui32 typeBits = 24 - TCGSI::IndexBits;
+ Y_VERIFY(type < (1 << typeBits));
+ const ui32 cookie = cgsi.Index << typeBits | type;
+ const TLogoBlobID id(agent.TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie);
+ const auto [channel, groupId] = ChannelGroups[ChannelToIndex[cgsi.Channel]];
+ Y_VERIFY_DEBUG(channel == cgsi.Channel);
+ return {id, groupId};
}
- static constexpr ui32 typeBits = 24 - TCGSI::IndexBits;
- Y_VERIFY(type < (1 << typeBits));
- const ui32 cookie = cgsi.Index << typeBits | type;
- const TLogoBlobID id(TabletId, cgsi.Generation, cgsi.Step, cgsi.Channel, size, cookie);
- const ui32 groupId = BlobDepotChannelGroups[cgsi.Channel];
- return {id, groupId};
- }
+ };
+
+ THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
+
+ void IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp
index 7b4e0c45ec..a9d7aeabf6 100644
--- a/ydb/core/blob_depot/blob_depot_agent.cpp
+++ b/ydb/core/blob_depot/blob_depot_agent.cpp
@@ -46,9 +46,14 @@ namespace NKikimr::NBlobDepot {
auto response = std::make_unique<TEvBlobDepot::TEvRegisterAgentResult>();
auto& record = response->Record;
record.SetGeneration(Executor()->Generation());
- for (const auto& channel : Info()->Channels) {
- Y_VERIFY(channel.Channel == record.ChannelGroupsSize());
- record.AddChannelGroups(channel.History ? channel.History.back().GroupID : 0);
+ for (const auto& [k, v] : ChannelKinds) {
+ auto *proto = record.AddChannelKinds();
+ proto->SetChannelKind(k);
+ for (const ui32 channel : v.IndexToChannel) {
+ auto *cg = proto->AddChannelGroups();
+ cg->SetChannel(channel);
+ cg->SetGroupId(Info()->Channels[channel].History.back().GroupID);
+ }
}
SendResponseToAgent(*ev, std::move(response));
@@ -61,13 +66,15 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) {
STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record),
(PipeServerId, ev->Recipient));
- auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>();
+ auto response = std::make_unique<TEvBlobDepot::TEvAllocateIdsResult>(ev->Get()->Record.GetChannelKind(),
+ Executor()->Generation());
auto& record = response->Record;
- const ui32 generation = Executor()->Generation();
- record.SetGeneration(generation);
- record.SetRangeBegin(NextBlobSeqId);
- NextBlobSeqId += PreallocatedIdCount;
- record.SetRangeEnd(NextBlobSeqId);
+ if (const auto it = ChannelKinds.find(record.GetChannelKind()); it != ChannelKinds.end()) {
+ auto& nextBlobSeqId = it->second.NextBlobSeqId;
+ record.SetRangeBegin(nextBlobSeqId);
+ nextBlobSeqId += PreallocatedIdCount;
+ record.SetRangeEnd(nextBlobSeqId);
+ }
SendResponseToAgent(*ev, std::move(response));
}
@@ -89,4 +96,22 @@ namespace NKikimr::NBlobDepot {
TActivationContext::Send(handle.release());
}
+ void TBlobDepot::InitChannelKinds() {
+ ui32 channel = 0;
+ for (const auto& profile : Config.GetChannelProfiles()) {
+ for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) {
+ if (channel >= 2) {
+ const auto kind = profile.GetChannelKind();
+ auto& p = ChannelKinds[kind];
+ const ui32 indexWithinKind = p.IndexToChannel.size();
+ p.IndexToChannel.push_back(channel);
+ p.ChannelToIndex[indexWithinKind] = channel;
+ }
+ }
+ }
+ for (auto& [k, v] : ChannelKinds) {
+ v.NextBlobSeqId = TCGSI{v.IndexToChannel.front(), Executor()->Generation(), 1, 0}.ToBinary(v);
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 014ad61d42..9e2bdb0a73 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -45,7 +45,13 @@ namespace NKikimr::NBlobDepot {
THashMap<TActorId, std::optional<ui32>> PipeServerToNode;
THashMap<ui32, TAgentInfo> Agents; // NodeId -> Agent
- ui64 NextBlobSeqId = 0;
+ struct TChannelKind
+ : NBlobDepot::TChannelKind
+ {
+ ui64 NextBlobSeqId = 0;
+ };
+
+ THashMap<NKikimrBlobDepot::TChannelKind::E, TChannelKind> ChannelKinds;
void Handle(TEvTabletPipe::TEvServerConnected::TPtr ev);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev);
@@ -72,8 +78,6 @@ namespace NKikimr::NBlobDepot {
for (auto&& ev : std::exchange(InitialEventsQ, {})) {
TActivationContext::Send(ev.release());
}
-
- NextBlobSeqId = TCGSI{BaseDataChannel, Executor()->Generation(), 1, 0}.ToBinary(Info()->Channels.size());
}
void OnDetach(const TActorContext&) override {
@@ -90,6 +94,8 @@ namespace NKikimr::NBlobDepot {
void SendResponseToAgent(IEventHandle& request, std::unique_ptr<IEventBase> response);
+ void InitChannelKinds();
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
STFUNC(StateInit) {
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 7c01329c50..57b8e21c72 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -52,8 +52,8 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB(EvApplyConfigResult, TabletId, TxId);
BLOBDEPOT_EVENT_PB(EvRegisterAgent, VirtualGroupId);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult);
- BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIds);
- BLOBDEPOT_EVENT_PB_NO_ARGS(EvAllocateIdsResult);
+ BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind);
+ BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation);
BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration);
BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify);
diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp
index 57b77e0183..9bda0223d0 100644
--- a/ydb/core/blob_depot/op_apply_config.cpp
+++ b/ydb/core/blob_depot/op_apply_config.cpp
@@ -8,7 +8,6 @@ namespace NKikimr::NBlobDepot {
class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<IEventHandle> Response;
- const TActorId InterconnectSession;
TString ConfigProtobuf;
public:
@@ -16,9 +15,11 @@ namespace NKikimr::NBlobDepot {
TActorId interconnectSession)
: TTransactionBase(self)
, Response(std::move(response))
- , InterconnectSession(interconnectSession)
{
- const bool success = ev.Record.SerializeToString(&ConfigProtobuf);
+ if (interconnectSession) {
+ Response->Rewrite(TEvInterconnect::EvForward, interconnectSession);
+ }
+ const bool success = ev.Record.GetConfig().SerializeToString(&ConfigProtobuf);
Y_VERIFY(success);
}
@@ -31,10 +32,11 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
+ const bool wasEmpty = !Self->Config.ByteSizeLong();
const bool success = Self->Config.ParseFromString(ConfigProtobuf);
Y_VERIFY(success);
- if (InterconnectSession) {
- Response->Rewrite(TEvInterconnect::EvForward, InterconnectSession);
+ if (wasEmpty) {
+ Self->InitChannelKinds();
}
TActivationContext::Send(Response.release());
}
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index 240bc5ad6d..c028378065 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -57,6 +57,7 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
+ Self->InitChannelKinds();
}
};
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 141696bd4f..caf347a6ce 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -6,6 +6,11 @@ namespace NKikimr::NBlobDepot {
static constexpr ui32 BaseDataChannel = 2;
+ struct TChannelKind {
+ std::array<ui8, 256> ChannelToIndex;
+ std::vector<ui8> IndexToChannel;
+ };
+
struct TCGSI {
static constexpr ui32 IndexBits = 20;
static constexpr ui32 MaxIndex = (1 << IndexBits) - 1;
@@ -15,18 +20,18 @@ namespace NKikimr::NBlobDepot {
ui32 Step;
ui32 Index;
- ui64 ToBinary(ui32 numChannels) const {
- Y_VERIFY_DEBUG(numChannels > BaseDataChannel);
+ ui64 ToBinary(const TChannelKind& kind) const {
Y_VERIFY_DEBUG(Index <= MaxIndex);
- return (static_cast<ui64>(Step) << IndexBits | Index) * (numChannels - BaseDataChannel) + (Channel - BaseDataChannel);
+ Y_VERIFY(Channel < kind.ChannelToIndex.size());
+ return (static_cast<ui64>(Step) << IndexBits | Index) * kind.IndexToChannel.size() + kind.ChannelToIndex[Channel];
}
- static TCGSI FromBinary(ui32 generation, ui32 numChannels, ui64 value) {
+ static TCGSI FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) {
static_assert(sizeof(long long) >= sizeof(ui64));
- auto res = std::lldiv(value, numChannels - BaseDataChannel);
+ auto res = std::lldiv(value, kind.IndexToChannel.size());
return TCGSI{
- .Channel = static_cast<ui32>(res.rem + BaseDataChannel),
+ .Channel = kind.IndexToChannel[res.rem],
.Generation = generation,
.Step = static_cast<ui32>(res.quot >> IndexBits),
.Index = static_cast<ui32>(res.quot) & MaxIndex
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index a1cca5e67b..81a56285d2 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -38,17 +38,27 @@ message TEvRegisterAgent {
}
message TEvRegisterAgentResult {
- repeated uint32 ChannelGroups = 1;
- optional uint32 Generation = 2;
+ message TChannelGroupId {
+ optional uint32 Channel = 1;
+ optional uint32 GroupId = 2;
+ };
+ message TChannelKind {
+ optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1;
+ repeated TChannelGroupId ChannelGroups = 2;
+ }
+ optional uint32 Generation = 1;
+ repeated TChannelKind ChannelKinds = 2;
}
message TEvAllocateIds {
+ optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1;
}
message TEvAllocateIdsResult {
- optional uint32 Generation = 1; // executor generation, for validation purposes
- optional uint64 RangeBegin = 2; // <Generation> <Step> <Channel>
- optional uint64 RangeEnd = 3;
+ optional NKikimrBlobDepot.TChannelKind.E ChannelKind = 1;
+ optional uint32 Generation = 2; // executor generation, for validation purposes
+ optional uint64 RangeBegin = 3; // <Generation> <Step> <Channel>
+ optional uint64 RangeEnd = 4;
}
message TEvBlock {
diff --git a/ydb/core/protos/blob_depot_config.proto b/ydb/core/protos/blob_depot_config.proto
index 2f7f9268ca..6858a242c2 100644
--- a/ydb/core/protos/blob_depot_config.proto
+++ b/ydb/core/protos/blob_depot_config.proto
@@ -1,8 +1,16 @@
package NKikimrBlobDepot;
+message TChannelKind {
+ enum E {
+ Data = 1;
+ Log = 2;
+ }
+}
+
message TChannelProfile {
optional string StoragePoolKind = 1;
optional uint32 Count = 2;
+ optional TChannelKind.E ChannelKind = 3;
}
enum EOperationMode {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp
index add3147d5a..a0aef187a7 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_blob_depot.cpp
@@ -68,6 +68,7 @@ namespace NKikimr::NSchemeShard {
blobDepotInfo->BlobDepotTabletId = tabletId;
Y_VERIFY(shard.TabletType == ETabletType::BlobDepot);
auto event = std::make_unique<TEvBlobDepot::TEvApplyConfig>(static_cast<ui64>(OperationId.GetTxId()));
+ event->Record.MutableConfig()->CopyFrom(blobDepotInfo->Description.GetConfig());
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shardIdx, event.release());
txState->ShardsInProgress.insert(shardIdx);
}