aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-07-06 14:01:50 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-07-06 14:01:50 +0300
commit34362463f988a45a252ec6816453b3bea478152e (patch)
tree0f4173e5e896fb5a63f44b6371a6e2b869c390eb
parent99c29df361c9a4e7f9377ed58025dccf521d86c4 (diff)
downloadydb-34362463f988a45a252ec6816453b3bea478152e.tar.gz
BlobDepot work in progress KIKIMR-14867
ref:cf621fb462ea79d0b982a2a91bf5ff128f7a545a
-rw-r--r--ydb/core/blob_depot/agent.cpp14
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h17
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp36
-rw-r--r--ydb/core/blob_depot/agent/read.cpp4
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp18
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h53
-rw-r--r--ydb/core/blob_depot/blocks.cpp16
-rw-r--r--ydb/core/blob_depot/data.cpp57
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp44
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp41
-rw-r--r--ydb/core/blob_depot/types.h29
11 files changed, 169 insertions, 160 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index fd94bdfd15..ff78ad2e75 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -48,10 +48,10 @@ namespace NKikimr::NBlobDepot {
for (const auto& [k, v] : ChannelKinds) {
auto *proto = record->AddChannelKinds();
proto->SetChannelKind(k);
- for (const ui32 channel : v.IndexToChannel) {
+ for (const auto& [channel, groupId] : v.ChannelGroups) {
auto *cg = proto->AddChannelGroups();
cg->SetChannel(channel);
- cg->SetGroupId(Info()->Channels[channel].History.back().GroupID);
+ cg->SetGroupId(groupId);
}
}
@@ -94,20 +94,22 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::InitChannelKinds() {
+ TTabletStorageInfo *info = Info();
+ const ui32 generation = Executor()->Generation();
+
ui32 channel = 0;
for (const auto& profile : Config.GetChannelProfiles()) {
for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) {
if (channel >= 2) {
const auto kind = profile.GetChannelKind();
auto& p = ChannelKinds[kind];
- const ui32 indexWithinKind = p.IndexToChannel.size();
- p.IndexToChannel.push_back(channel);
- p.ChannelToIndex[indexWithinKind] = channel;
+ p.ChannelToIndex[channel] = p.ChannelGroups.size();
+ p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation));
}
}
}
for (auto& [k, v] : ChannelKinds) {
- v.NextBlobSeqId = TCGSI{v.IndexToChannel.front(), Executor()->Generation(), 1, 0}.ToBinary(v);
+ v.NextBlobSeqId = TBlobSeqId{v.ChannelGroups.front().first, generation, 1, 0}.ToBinary(v);
}
}
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index e6bcc3a6a5..95216de7d6 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -23,7 +23,7 @@ namespace NKikimr::NBlobDepot {
template<typename T>
T& Obtain() {
T *sp = static_cast<T*>(this);
- Y_VERIFY_DEBUG(sp == dynamic_cast<T*>(this));
+ Y_VERIFY_DEBUG(sp && sp == dynamic_cast<T*>(this));
return *sp;
}
@@ -253,7 +253,6 @@ namespace NKikimr::NBlobDepot {
};
const NKikimrBlobDepot::TChannelKind::E Kind;
- std::vector<std::pair<ui8, ui32>> ChannelGroups;
bool IdAllocInFlight = false;
std::deque<TAllocatedId> IdQ;
@@ -265,26 +264,26 @@ namespace NKikimr::NBlobDepot {
: Kind(kind)
{}
- std::optional<TCGSI> Allocate(TBlobDepotAgent& agent) {
+ std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent) {
if (IdQ.empty()) {
return std::nullopt;
}
auto& item = IdQ.front();
- auto cgsi = TCGSI::FromBinary(item.Generation, *this, item.Begin++);
+ auto blobSeqId = TBlobSeqId::FromBinary(item.Generation, *this, item.Begin++);
if (item.Begin == item.End) {
IdQ.pop_front();
agent.IssueAllocateIdsIfNeeded(*this);
}
- return cgsi;
+ return blobSeqId;
}
- std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TCGSI& cgsi, EBlobType type, ui32 part,
+ std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, ui32 part,
ui32 size) const {
- auto id = cgsi.MakeBlobId(agent.TabletId, type, part, size);
- const auto [channel, groupId] = ChannelGroups[ChannelToIndex[cgsi.Channel]];
- Y_VERIFY_DEBUG(channel == cgsi.Channel);
+ auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size);
+ const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]];
+ Y_VERIFY_DEBUG(channel == blobSeqId.Channel);
return {id, groupId};
}
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index fcd6c0c702..aa8abeff2b 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -27,25 +27,34 @@ namespace NKikimr::NBlobDepot {
(Msg, msg));
Registered = true;
BlobDepotGeneration = msg.GetGeneration();
- THashSet<NKikimrBlobDepot::TChannelKind::E> allKinds;
+
+ THashSet<NKikimrBlobDepot::TChannelKind::E> vanishedKinds;
for (const auto& [kind, _] : ChannelKinds) {
- allKinds.insert(kind);
+ vanishedKinds.insert(kind);
}
- for (const auto& kind : msg.GetChannelKinds()) {
- auto& v = ChannelKinds.emplace(kind.GetChannelKind(), kind.GetChannelKind()).first->second;
- allKinds.erase(v.Kind);
+
+ for (const auto& ch : msg.GetChannelKinds()) {
+ const NKikimrBlobDepot::TChannelKind::E kind = ch.GetChannelKind();
+ vanishedKinds.erase(kind);
+
+ auto [it, inserted] = ChannelKinds.try_emplace(kind, kind);
+ auto& v = it->second;
+
+ v.ChannelToIndex.fill(0);
v.ChannelGroups.clear();
- v.IndexToChannel.clear();
- for (const auto& channelGroup : kind.GetChannelGroups()) {
+
+ for (const auto& channelGroup : ch.GetChannelGroups()) {
const ui8 channel = channelGroup.GetChannel();
const ui32 groupId = channelGroup.GetGroupId();
+ v.ChannelToIndex[channel] = v.ChannelGroups.size();
v.ChannelGroups.emplace_back(channel, groupId);
- v.ChannelToIndex[channel] = v.IndexToChannel.size();
- v.IndexToChannel.push_back(channel);
}
+
IssueAllocateIdsIfNeeded(v);
}
- for (const auto& kind : allKinds) {
+
+ for (const NKikimrBlobDepot::TChannelKind::E kind : vanishedKinds) {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind));
ChannelKinds.erase(kind);
}
}
@@ -64,16 +73,17 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) {
+ STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
+ (Msg, msg));
+
auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>();
const auto it = ChannelKinds.find(allocateIdsContext.ChannelKind);
- Y_VERIFY(it != ChannelKinds.end());
+ Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind));
auto& kind = it->second;
Y_VERIFY(kind.IdAllocInFlight);
kind.IdAllocInFlight = false;
- STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId),
- (Msg, msg));
Y_VERIFY(msg.GetChannelKind() == allocateIdsContext.ChannelKind);
Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration);
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index b1fddaed5a..941a925832 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -67,13 +67,13 @@ namespace NKikimr::NBlobDepot {
const ui64 partSize = Min(size ? size : Max<ui64>(), partLen - offset);
- auto cgsi = TCGSI::FromProto(locator.GetBlobSeqId());
+ auto blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId());
if (vg) {
const bool composite = totalDataLen + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize;
const EBlobType type = composite ? EBlobType::VG_COMPOSITE_BLOB : EBlobType::VG_DATA_BLOB;
const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0);
- const auto id = cgsi.MakeBlobId(TabletId, type, 0, blobSize);
+ const auto id = blobSeqId.MakeBlobId(TabletId, type, 0, blobSize);
items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin),
static_cast<ui32>(partSize), outputOffset});
} else {
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 76646cc87d..87bbc0603b 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -7,7 +7,7 @@ namespace NKikimr::NBlobDepot {
class TPutQuery : public TQuery {
ui32 BlockChecksRemain = 3;
ui32 PutsInFlight = 0;
- TCGSI CGSI;
+ TBlobSeqId BlobSeqId;
ui32 GroupId;
ui64 TotalDataLen;
@@ -35,12 +35,14 @@ namespace NKikimr::NBlobDepot {
}
auto& kind = it->second;
- std::optional<TCGSI> cgsi = kind.Allocate(Agent);
- if (!cgsi) {
+ std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent);
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId),
+ (QueryId, GetQueryId()), (BlobSeqId, blobSeqId));
+ if (!blobSeqId) {
return kind.EnqueueQueryWaitingForId(this);
}
- CGSI = *cgsi;
+ BlobSeqId = *blobSeqId;
auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
const ui32 size = msg.Id.BlobSize();
@@ -64,15 +66,15 @@ namespace NKikimr::NBlobDepot {
// write single blob with footer
TString buffer = msg.Buffer;
buffer.append(footerData);
- std::tie(id, GroupId) = kind.MakeBlobId(Agent, CGSI, EBlobType::VG_COMPOSITE_BLOB, 0, buffer.size());
+ std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_COMPOSITE_BLOB, 0, buffer.size());
sendPut(id, buffer);
++PutsInFlight;
} else {
// write data blob and blob with footer
- std::tie(id, GroupId) = kind.MakeBlobId(Agent, CGSI, EBlobType::VG_DATA_BLOB, 0, msg.Buffer.size());
+ std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_DATA_BLOB, 0, msg.Buffer.size());
sendPut(id, msg.Buffer);
- std::tie(id, GroupId) = kind.MakeBlobId(Agent, CGSI, EBlobType::VG_FOOTER_BLOB, 0, footerData.size());
+ std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_FOOTER_BLOB, 0, footerData.size());
sendPut(id, TString(footerData));
PutsInFlight += 2;
@@ -117,7 +119,7 @@ namespace NKikimr::NBlobDepot {
item->SetKey(key.data(), key.size());
auto *locator = item->MutableBlobLocator();
locator->SetGroupId(GroupId);
- CGSI.ToProto(locator->MutableBlobSeqId());
+ BlobSeqId.ToProto(locator->MutableBlobSeqId());
locator->SetChecksum(0);
locator->SetTotalDataLen(TotalDataLen);
locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter));
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index ead3fca02b..b1ef30bf07 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -112,28 +112,32 @@ namespace NKikimr::NBlobDepot {
}
STFUNC(StateWork) {
- switch (const ui32 type = ev->GetTypeRewrite()) {
- cFunc(TEvents::TSystem::Poison, HandlePoison);
-
- hFunc(TEvBlobDepot::TEvApplyConfig, Handle);
- hFunc(TEvBlobDepot::TEvRegisterAgent, Handle);
- hFunc(TEvBlobDepot::TEvAllocateIds, Handle);
- hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle);
- hFunc(TEvBlobDepot::TEvResolve, Handle);
-
- hFunc(TEvBlobDepot::TEvBlock, Handle);
- hFunc(TEvBlobDepot::TEvQueryBlocks, Handle);
-
- hFunc(TEvBlobDepot::TEvCollectGarbage, Handle);
-
- hFunc(TEvTabletPipe::TEvServerConnected, Handle);
- hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
-
- default:
- if (!HandleDefaultEvents(ev, ctx)) {
- Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
- }
- break;
+ try {
+ switch (const ui32 type = ev->GetTypeRewrite()) {
+ cFunc(TEvents::TSystem::Poison, HandlePoison);
+
+ hFunc(TEvBlobDepot::TEvApplyConfig, Handle);
+ hFunc(TEvBlobDepot::TEvRegisterAgent, Handle);
+ hFunc(TEvBlobDepot::TEvAllocateIds, Handle);
+ hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle);
+ hFunc(TEvBlobDepot::TEvResolve, Handle);
+
+ hFunc(TEvBlobDepot::TEvBlock, Handle);
+ hFunc(TEvBlobDepot::TEvQueryBlocks, Handle);
+
+ hFunc(TEvBlobDepot::TEvCollectGarbage, Handle);
+
+ hFunc(TEvTabletPipe::TEvServerConnected, Handle);
+ hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
+
+ default:
+ if (!HandleDefaultEvents(ev, ctx)) {
+ Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
+ }
+ break;
+ }
+ } catch (...) {
+ Y_FAIL_S("unexpected exception# " << CurrentExceptionMessage());
}
}
@@ -215,11 +219,10 @@ namespace NKikimr::NBlobDepot {
std::optional<TDataValue> FindKey(TStringBuf key);
void ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end, TScanFlags flags,
const std::function<bool(TStringBuf, const TDataValue&)>& callback);
- void DeleteKeys(const std::vector<TString>& keysToDelete);
+ void DeleteKey(TStringBuf key);
void PutKey(TString key, TDataValue&& data);
void AddDataOnLoad(TString key, TString value);
- std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState);
- void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data);
+ std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState);
void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 5fc4241f20..2bcb4ce0a7 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -46,6 +46,11 @@ namespace NKikimr::NBlobDepot {
if (BlockedGeneration <= block.BlockedGeneration) {
Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY);
} else {
+ // update block value in memory
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ block.BlockedGeneration = BlockedGeneration;
+
+ // and persist it
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Blocks>().Key(TabletId).Update(
NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration),
@@ -60,11 +65,6 @@ namespace NKikimr::NBlobDepot {
if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) {
TActivationContext::Send(Response.release());
} else {
- // update block value in memory
- auto& block = Self->BlocksManager->Blocks[TabletId];
- Y_VERIFY(block.BlockedGeneration < BlockedGeneration);
- block.BlockedGeneration = BlockedGeneration;
-
Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response));
}
}
@@ -142,10 +142,10 @@ namespace NKikimr::NBlobDepot {
}
void IssueBlocksToStorage() {
- TTabletStorageInfo *info = Self->Info();
for (const auto& [_, kind] : Self->ChannelKinds) {
- for (const ui8 channel : kind.IndexToChannel) {
- const ui32 groupId = info->GroupFor(channel, Self->Executor()->Generation());
+ for (const auto& [channel, groupId] : kind.ChannelGroups) {
+ // FIXME: consider previous group generations (because agent can write in obsolete tablet generation)
+ // !!!!!!!!!!!
SendBlock(groupId);
++BlocksPending;
RetryCount += 2;
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 6f74c9b27b..1ecc15a913 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -39,29 +39,40 @@ namespace NKikimr::NBlobDepot {
: Data.lower_bound(*end);
if (flags & EScanFlags::REVERSE) {
- while (beginIt != endIt) {
+ if (beginIt != endIt) {
--endIt;
- if (!callback(endIt->first, endIt->second)) {
- break;
- }
+ do {
+ auto& current = *endIt--;
+ if (!callback(current.first, current.second)) {
+ break;
+ }
+ } while (beginIt != endIt);
}
} else {
while (beginIt != endIt) {
- if (!callback(beginIt->first, beginIt->second)) {
+ auto& current = *beginIt++;
+ if (!callback(current.first, current.second)) {
break;
}
- ++beginIt;
}
}
}
- void DeleteKeys(const std::vector<TString>& keysToDelete) {
- for (const TString& key : keysToDelete) {
- Data.erase(key);
- }
+ void DeleteKey(TStringBuf key) {
+ Data.erase(TString(key));
}
void PutKey(TString key, TDataValue&& data) {
+ auto getKeyString = [&] {
+ if (Self->Config.GetOperationMode() == NKikimrBlobDepot::VirtualGroup) {
+ return TLogoBlobID(reinterpret_cast<const ui64*>(key.data())).ToString();
+ } else {
+ return key;
+ }
+ };
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, EscapeC(getKeyString())),
+ (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
+
Data[std::move(key)] = std::move(data);
}
@@ -77,9 +88,9 @@ namespace NKikimr::NBlobDepot {
});
}
- std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
- if (const auto it = Data.find(key); it != Data.end()) {
- TDataValue value = it->second;
+ std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
+ auto& value = Data[TString(key)];
+ if (value.KeepState < keepState) {
value.KeepState = keepState;
return ToValueProto(value);
} else {
@@ -87,14 +98,6 @@ namespace NKikimr::NBlobDepot {
}
}
- void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) {
- for (const auto& [key, keepState] : data) {
- auto& value = Data[std::move(key)];
- Y_VERIFY_DEBUG(value.KeepState < keepState);
- value.KeepState = keepState;
- }
- }
-
static TString ToValueProto(const TDataValue& value) {
NKikimrBlobDepot::TValue proto;
if (value.Meta) {
@@ -128,8 +131,8 @@ namespace NKikimr::NBlobDepot {
return DataManager->ScanRange(begin, end, flags, callback);
}
- void TBlobDepot::DeleteKeys(const std::vector<TString>& keysToDelete) {
- DataManager->DeleteKeys(keysToDelete);
+ void TBlobDepot::DeleteKey(TStringBuf key) {
+ DataManager->DeleteKey(key);
}
void TBlobDepot::PutKey(TString key, TDataValue&& data) {
@@ -140,12 +143,8 @@ namespace NKikimr::NBlobDepot {
DataManager->AddDataOnLoad(std::move(key), std::move(value));
}
- std::optional<TString> TBlobDepot::UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
- return DataManager->UpdatesKeepState(key, keepState);
- }
-
- void TBlobDepot::UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) {
- DataManager->UpdateKeepState(data);
+ std::optional<TString> TBlobDepot::UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
+ return DataManager->UpdateKeepState(key, keepState);
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 2076729c16..3bd4da5c74 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -28,8 +28,6 @@ namespace NKikimr::NBlobDepot {
int KeepIndex = 0;
int DoNotKeepIndex = 0;
ui32 NumKeysProcessed = 0;
- std::vector<TString> KeysToDelete;
- std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>> KeepStateUpdates;
bool Done = false;
static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
@@ -55,13 +53,7 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
- Self->DeleteKeys(KeysToDelete);
- Self->UpdateKeepState(KeepStateUpdates);
-
if (Done || Error) {
- if (Done) {
- ApplyBarrier();
- }
auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(),
Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error));
TActivationContext::Send(response.release());
@@ -109,8 +101,7 @@ namespace NKikimr::NBlobDepot {
for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) {
const auto id = LogoBlobIDFromLogoBlobID(items[index]);
const TStringBuf key = id.AsBinaryString();
- if (const auto& value = Self->UpdatesKeepState(key, state)) {
- KeepStateUpdates.emplace_back(key, state);
+ if (const auto& value = Self->UpdateKeepState(key, state)) {
db.Table<Schema::Data>().Key(TString(key)).Update<Schema::Data::Value>(*value);
++NumKeysProcessed;
}
@@ -132,37 +123,24 @@ namespace NKikimr::NBlobDepot {
auto processKey = [&](TStringBuf key, const TDataValue& value) {
if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) {
+ const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data()));
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()),
+ (BlobId, id));
db.Table<Schema::Data>().Key(TString(key)).Delete();
- KeysToDelete.emplace_back(key);
+ Self->DeleteKey(key);
++NumKeysProcessed;
}
return NumKeysProcessed < MaxKeysToProcessAtOnce;
};
- Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
- processKey);
+ Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(),
+ EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, processKey);
if (NumKeysProcessed == MaxKeysToProcessAtOnce) {
return false;
}
- auto row = db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel());
- const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
- if (record.GetHard()) {
- row.Update<Schema::Barriers::Hard>(collectGenStep);
- } else {
- row.Update<Schema::Barriers::Soft>(collectGenStep);
- }
- row.Update<Schema::Barriers::LastRecordGenStep>(GenStep(record.GetGeneration(), record.GetPerGenerationCounter()));
- }
-
- return true;
- }
-
- void ApplyBarrier() {
- const auto& record = Request->Get()->Record;
- if (record.HasCollectGeneration() && record.HasCollectStep()) {
const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
auto& barriers = Self->GarbageCollectionManager->Barriers;
auto& barrier = barriers[key];
@@ -173,7 +151,15 @@ namespace NKikimr::NBlobDepot {
barrier.LastRecordGenStep = recordGenStep;
Y_VERIFY(currentGenStep <= collectGenStep);
currentGenStep = collectGenStep;
+
+ db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update(
+ NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep),
+ NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft),
+ NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard)
+ );
}
+
+ return true;
}
};
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 318883bd16..56b9db2849 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -6,7 +6,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) {
class TTxCommitBlobSeq : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> Request;
- std::vector<std::pair<TString, NKikimrBlobDepot::TValue>> UpdateQ;
+ std::unique_ptr<IEventHandle> Response;
public:
TTxCommitBlobSeq(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request)
@@ -17,7 +17,13 @@ namespace NKikimr::NBlobDepot {
bool Execute(TTransactionContext& txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);
+ NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord;
+ std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId());
+
for (const auto& item : Request->Get()->Record.GetItems()) {
+ auto *responseItem = responseRecord->AddItems();
+ responseItem->SetStatus(NKikimrProto::OK);
+
NKikimrBlobDepot::TValue value;
if (item.HasMeta()) {
value.SetMeta(item.GetMeta());
@@ -25,38 +31,35 @@ namespace NKikimr::NBlobDepot {
auto *chain = value.AddValueChain();
chain->MutableLocator()->CopyFrom(item.GetBlobLocator());
- TString valueData;
- const bool success = value.SerializeToString(&valueData);
- Y_VERIFY(success);
-
const TString& key = item.GetKey();
if (key.size() == 3 * sizeof(ui64)) {
const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data()));
if (!Self->CheckBlobForBarrier(id)) {
- continue; // FIXME: report error somehow (?)
+ responseItem->SetStatus(NKikimrProto::ERROR);
+ responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << id << " is being put beyond the barrier");
+ continue;
}
}
- db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData);
- UpdateQ.emplace_back(item.GetKey(), std::move(value));
- }
-
- return true;
- }
-
- void Complete(const TActorContext&) override {
- auto [response, record] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId());
- for (auto& [key, value] : UpdateQ) {
Self->PutKey(std::move(key), {
.Meta = value.GetMeta(),
.ValueChain = std::move(*value.MutableValueChain()),
.KeepState = value.GetKeepState(),
.Public = value.GetPublic(),
});
- auto *responseItem = record->AddItems();
- responseItem->SetStatus(NKikimrProto::OK);
+
+ TString valueData;
+ const bool success = value.SerializeToString(&valueData);
+ Y_VERIFY(success);
+
+ db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData);
}
- TActivationContext::Send(response.release());
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ TActivationContext::Send(Response.release());
}
};
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index 5841b36aba..c3e65dd220 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot {
struct TChannelKind {
std::array<ui8, 256> ChannelToIndex;
- std::vector<ui8> IndexToChannel;
+ std::vector<std::pair<ui8, ui32>> ChannelGroups;
};
#pragma pack(push, 1)
@@ -27,7 +27,7 @@ namespace NKikimr::NBlobDepot {
VG_GC_BLOB = 3, // garbage collection command
};
- struct TCGSI {
+ struct TBlobSeqId {
static constexpr ui32 IndexBits = 20;
static constexpr ui32 MaxIndex = (1 << IndexBits) - 1;
@@ -37,33 +37,38 @@ namespace NKikimr::NBlobDepot {
ui32 Index = 0;
auto AsTuple() const { return std::make_tuple(Channel, Generation, Step, Index); }
- friend bool operator ==(const TCGSI& x, const TCGSI& y) { return x.AsTuple() == y.AsTuple(); }
- friend bool operator !=(const TCGSI& x, const TCGSI& y) { return x.AsTuple() != y.AsTuple(); }
+ friend bool operator ==(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() == y.AsTuple(); }
+ friend bool operator !=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() != y.AsTuple(); }
+
+ TString ToString() const {
+ return TStringBuilder() << "{" << Channel << ":" << Generation << ":" << Step << ":" << Index << "}";
+ }
explicit operator bool() const {
- return *this != TCGSI();
+ return *this != TBlobSeqId();
}
ui64 ToBinary(const TChannelKind& kind) const {
Y_VERIFY_DEBUG(Index <= MaxIndex);
Y_VERIFY(Channel < kind.ChannelToIndex.size());
- return (static_cast<ui64>(Step) << IndexBits | Index) * kind.IndexToChannel.size() + kind.ChannelToIndex[Channel];
+ return (static_cast<ui64>(Step) << IndexBits | Index) * kind.ChannelGroups.size() + kind.ChannelToIndex[Channel];
}
- static TCGSI FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) {
+ static TBlobSeqId FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) {
static_assert(sizeof(long long) >= sizeof(ui64));
- auto res = std::lldiv(value, kind.IndexToChannel.size());
+ Y_VERIFY(!kind.ChannelGroups.empty());
+ auto res = std::lldiv(value, kind.ChannelGroups.size());
- return TCGSI{
- .Channel = kind.IndexToChannel[res.rem],
+ return TBlobSeqId{
+ .Channel = kind.ChannelGroups[res.rem].first,
.Generation = generation,
.Step = static_cast<ui32>(res.quot >> IndexBits),
.Index = static_cast<ui32>(res.quot) & MaxIndex
};
}
- static TCGSI FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) {
- return TCGSI{
+ static TBlobSeqId FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) {
+ return TBlobSeqId{
proto.GetChannel(),
proto.GetGeneration(),
proto.GetStep(),