diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-07-06 14:01:50 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-07-06 14:01:50 +0300 |
commit | 34362463f988a45a252ec6816453b3bea478152e (patch) | |
tree | 0f4173e5e896fb5a63f44b6371a6e2b869c390eb | |
parent | 99c29df361c9a4e7f9377ed58025dccf521d86c4 (diff) | |
download | ydb-34362463f988a45a252ec6816453b3bea478152e.tar.gz |
BlobDepot work in progress KIKIMR-14867
ref:cf621fb462ea79d0b982a2a91bf5ff128f7a545a
-rw-r--r-- | ydb/core/blob_depot/agent.cpp | 14 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 17 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 36 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/read.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 53 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 16 | ||||
-rw-r--r-- | ydb/core/blob_depot/data.cpp | 57 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.cpp | 44 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_commit_blob_seq.cpp | 41 | ||||
-rw-r--r-- | ydb/core/blob_depot/types.h | 29 |
11 files changed, 169 insertions, 160 deletions
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp index fd94bdfd15..ff78ad2e75 100644 --- a/ydb/core/blob_depot/agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -48,10 +48,10 @@ namespace NKikimr::NBlobDepot { for (const auto& [k, v] : ChannelKinds) { auto *proto = record->AddChannelKinds(); proto->SetChannelKind(k); - for (const ui32 channel : v.IndexToChannel) { + for (const auto& [channel, groupId] : v.ChannelGroups) { auto *cg = proto->AddChannelGroups(); cg->SetChannel(channel); - cg->SetGroupId(Info()->Channels[channel].History.back().GroupID); + cg->SetGroupId(groupId); } } @@ -94,20 +94,22 @@ namespace NKikimr::NBlobDepot { } void TBlobDepot::InitChannelKinds() { + TTabletStorageInfo *info = Info(); + const ui32 generation = Executor()->Generation(); + ui32 channel = 0; for (const auto& profile : Config.GetChannelProfiles()) { for (ui32 i = 0, count = profile.GetCount(); i < count; ++i, ++channel) { if (channel >= 2) { const auto kind = profile.GetChannelKind(); auto& p = ChannelKinds[kind]; - const ui32 indexWithinKind = p.IndexToChannel.size(); - p.IndexToChannel.push_back(channel); - p.ChannelToIndex[indexWithinKind] = channel; + p.ChannelToIndex[channel] = p.ChannelGroups.size(); + p.ChannelGroups.emplace_back(channel, info->GroupFor(channel, generation)); } } } for (auto& [k, v] : ChannelKinds) { - v.NextBlobSeqId = TCGSI{v.IndexToChannel.front(), Executor()->Generation(), 1, 0}.ToBinary(v); + v.NextBlobSeqId = TBlobSeqId{v.ChannelGroups.front().first, generation, 1, 0}.ToBinary(v); } } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index e6bcc3a6a5..95216de7d6 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -23,7 +23,7 @@ namespace NKikimr::NBlobDepot { template<typename T> T& Obtain() { T *sp = static_cast<T*>(this); - Y_VERIFY_DEBUG(sp == dynamic_cast<T*>(this)); + Y_VERIFY_DEBUG(sp && sp == dynamic_cast<T*>(this)); return *sp; } @@ -253,7 +253,6 @@ namespace NKikimr::NBlobDepot { }; const NKikimrBlobDepot::TChannelKind::E Kind; - std::vector<std::pair<ui8, ui32>> ChannelGroups; bool IdAllocInFlight = false; std::deque<TAllocatedId> IdQ; @@ -265,26 +264,26 @@ namespace NKikimr::NBlobDepot { : Kind(kind) {} - std::optional<TCGSI> Allocate(TBlobDepotAgent& agent) { + std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent) { if (IdQ.empty()) { return std::nullopt; } auto& item = IdQ.front(); - auto cgsi = TCGSI::FromBinary(item.Generation, *this, item.Begin++); + auto blobSeqId = TBlobSeqId::FromBinary(item.Generation, *this, item.Begin++); if (item.Begin == item.End) { IdQ.pop_front(); agent.IssueAllocateIdsIfNeeded(*this); } - return cgsi; + return blobSeqId; } - std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TCGSI& cgsi, EBlobType type, ui32 part, + std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, ui32 size) const { - auto id = cgsi.MakeBlobId(agent.TabletId, type, part, size); - const auto [channel, groupId] = ChannelGroups[ChannelToIndex[cgsi.Channel]]; - Y_VERIFY_DEBUG(channel == cgsi.Channel); + auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size); + const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]]; + Y_VERIFY_DEBUG(channel == blobSeqId.Channel); return {id, groupId}; } diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index fcd6c0c702..aa8abeff2b 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -27,25 +27,34 @@ namespace NKikimr::NBlobDepot { (Msg, msg)); Registered = true; BlobDepotGeneration = msg.GetGeneration(); - THashSet<NKikimrBlobDepot::TChannelKind::E> allKinds; + + THashSet<NKikimrBlobDepot::TChannelKind::E> vanishedKinds; for (const auto& [kind, _] : ChannelKinds) { - allKinds.insert(kind); + vanishedKinds.insert(kind); } - for (const auto& kind : msg.GetChannelKinds()) { - auto& v = ChannelKinds.emplace(kind.GetChannelKind(), kind.GetChannelKind()).first->second; - allKinds.erase(v.Kind); + + for (const auto& ch : msg.GetChannelKinds()) { + const NKikimrBlobDepot::TChannelKind::E kind = ch.GetChannelKind(); + vanishedKinds.erase(kind); + + auto [it, inserted] = ChannelKinds.try_emplace(kind, kind); + auto& v = it->second; + + v.ChannelToIndex.fill(0); v.ChannelGroups.clear(); - v.IndexToChannel.clear(); - for (const auto& channelGroup : kind.GetChannelGroups()) { + + for (const auto& channelGroup : ch.GetChannelGroups()) { const ui8 channel = channelGroup.GetChannel(); const ui32 groupId = channelGroup.GetGroupId(); + v.ChannelToIndex[channel] = v.ChannelGroups.size(); v.ChannelGroups.emplace_back(channel, groupId); - v.ChannelToIndex[channel] = v.IndexToChannel.size(); - v.IndexToChannel.push_back(channel); } + IssueAllocateIdsIfNeeded(v); } - for (const auto& kind : allKinds) { + + for (const NKikimrBlobDepot::TChannelKind::E kind : vanishedKinds) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "kind vanished", (VirtualGroupId, VirtualGroupId), (Kind, kind)); ChannelKinds.erase(kind); } } @@ -64,16 +73,17 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg)); + auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>(); const auto it = ChannelKinds.find(allocateIdsContext.ChannelKind); - Y_VERIFY(it != ChannelKinds.end()); + Y_VERIFY_S(it != ChannelKinds.end(), "Kind# " << NKikimrBlobDepot::TChannelKind::E_Name(allocateIdsContext.ChannelKind)); auto& kind = it->second; Y_VERIFY(kind.IdAllocInFlight); kind.IdAllocInFlight = false; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg)); Y_VERIFY(msg.GetChannelKind() == allocateIdsContext.ChannelKind); Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration); diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index b1fddaed5a..941a925832 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -67,13 +67,13 @@ namespace NKikimr::NBlobDepot { const ui64 partSize = Min(size ? size : Max<ui64>(), partLen - offset); - auto cgsi = TCGSI::FromProto(locator.GetBlobSeqId()); + auto blobSeqId = TBlobSeqId::FromProto(locator.GetBlobSeqId()); if (vg) { const bool composite = totalDataLen + sizeof(TVirtualGroupBlobFooter) <= MaxBlobSize; const EBlobType type = composite ? EBlobType::VG_COMPOSITE_BLOB : EBlobType::VG_DATA_BLOB; const ui32 blobSize = totalDataLen + (composite ? sizeof(TVirtualGroupBlobFooter) : 0); - const auto id = cgsi.MakeBlobId(TabletId, type, 0, blobSize); + const auto id = blobSeqId.MakeBlobId(TabletId, type, 0, blobSize); items.push_back(TReadItem{locator.GetGroupId(), id, static_cast<ui32>(offset + begin), static_cast<ui32>(partSize), outputOffset}); } else { diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index 76646cc87d..87bbc0603b 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -7,7 +7,7 @@ namespace NKikimr::NBlobDepot { class TPutQuery : public TQuery { ui32 BlockChecksRemain = 3; ui32 PutsInFlight = 0; - TCGSI CGSI; + TBlobSeqId BlobSeqId; ui32 GroupId; ui64 TotalDataLen; @@ -35,12 +35,14 @@ namespace NKikimr::NBlobDepot { } auto& kind = it->second; - std::optional<TCGSI> cgsi = kind.Allocate(Agent); - if (!cgsi) { + std::optional<TBlobSeqId> blobSeqId = kind.Allocate(Agent); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "allocated BlobSeqId", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, GetQueryId()), (BlobSeqId, blobSeqId)); + if (!blobSeqId) { return kind.EnqueueQueryWaitingForId(this); } - CGSI = *cgsi; + BlobSeqId = *blobSeqId; auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); const ui32 size = msg.Id.BlobSize(); @@ -64,15 +66,15 @@ namespace NKikimr::NBlobDepot { // write single blob with footer TString buffer = msg.Buffer; buffer.append(footerData); - std::tie(id, GroupId) = kind.MakeBlobId(Agent, CGSI, EBlobType::VG_COMPOSITE_BLOB, 0, buffer.size()); + std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_COMPOSITE_BLOB, 0, buffer.size()); sendPut(id, buffer); ++PutsInFlight; } else { // write data blob and blob with footer - std::tie(id, GroupId) = kind.MakeBlobId(Agent, CGSI, EBlobType::VG_DATA_BLOB, 0, msg.Buffer.size()); + std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_DATA_BLOB, 0, msg.Buffer.size()); sendPut(id, msg.Buffer); - std::tie(id, GroupId) = kind.MakeBlobId(Agent, CGSI, EBlobType::VG_FOOTER_BLOB, 0, footerData.size()); + std::tie(id, GroupId) = kind.MakeBlobId(Agent, BlobSeqId, EBlobType::VG_FOOTER_BLOB, 0, footerData.size()); sendPut(id, TString(footerData)); PutsInFlight += 2; @@ -117,7 +119,7 @@ namespace NKikimr::NBlobDepot { item->SetKey(key.data(), key.size()); auto *locator = item->MutableBlobLocator(); locator->SetGroupId(GroupId); - CGSI.ToProto(locator->MutableBlobSeqId()); + BlobSeqId.ToProto(locator->MutableBlobSeqId()); locator->SetChecksum(0); locator->SetTotalDataLen(TotalDataLen); locator->SetFooterLen(sizeof(TVirtualGroupBlobFooter)); diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index ead3fca02b..b1ef30bf07 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -112,28 +112,32 @@ namespace NKikimr::NBlobDepot { } STFUNC(StateWork) { - switch (const ui32 type = ev->GetTypeRewrite()) { - cFunc(TEvents::TSystem::Poison, HandlePoison); - - hFunc(TEvBlobDepot::TEvApplyConfig, Handle); - hFunc(TEvBlobDepot::TEvRegisterAgent, Handle); - hFunc(TEvBlobDepot::TEvAllocateIds, Handle); - hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); - hFunc(TEvBlobDepot::TEvResolve, Handle); - - hFunc(TEvBlobDepot::TEvBlock, Handle); - hFunc(TEvBlobDepot::TEvQueryBlocks, Handle); - - hFunc(TEvBlobDepot::TEvCollectGarbage, Handle); - - hFunc(TEvTabletPipe::TEvServerConnected, Handle); - hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); - - default: - if (!HandleDefaultEvents(ev, ctx)) { - Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); - } - break; + try { + switch (const ui32 type = ev->GetTypeRewrite()) { + cFunc(TEvents::TSystem::Poison, HandlePoison); + + hFunc(TEvBlobDepot::TEvApplyConfig, Handle); + hFunc(TEvBlobDepot::TEvRegisterAgent, Handle); + hFunc(TEvBlobDepot::TEvAllocateIds, Handle); + hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); + hFunc(TEvBlobDepot::TEvResolve, Handle); + + hFunc(TEvBlobDepot::TEvBlock, Handle); + hFunc(TEvBlobDepot::TEvQueryBlocks, Handle); + + hFunc(TEvBlobDepot::TEvCollectGarbage, Handle); + + hFunc(TEvTabletPipe::TEvServerConnected, Handle); + hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); + + default: + if (!HandleDefaultEvents(ev, ctx)) { + Y_FAIL("unexpected event Type# 0x%08" PRIx32, type); + } + break; + } + } catch (...) { + Y_FAIL_S("unexpected exception# " << CurrentExceptionMessage()); } } @@ -215,11 +219,10 @@ namespace NKikimr::NBlobDepot { std::optional<TDataValue> FindKey(TStringBuf key); void ScanRange(const std::optional<TStringBuf>& begin, const std::optional<TStringBuf>& end, TScanFlags flags, const std::function<bool(TStringBuf, const TDataValue&)>& callback); - void DeleteKeys(const std::vector<TString>& keysToDelete); + void DeleteKey(TStringBuf key); void PutKey(TString key, TDataValue&& data); void AddDataOnLoad(TString key, TString value); - std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState); - void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data); + std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState); void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); void Handle(TEvBlobDepot::TEvResolve::TPtr ev); diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 5fc4241f20..2bcb4ce0a7 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -46,6 +46,11 @@ namespace NKikimr::NBlobDepot { if (BlockedGeneration <= block.BlockedGeneration) { Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY); } else { + // update block value in memory + auto& block = Self->BlocksManager->Blocks[TabletId]; + block.BlockedGeneration = BlockedGeneration; + + // and persist it NIceDb::TNiceDb db(txc.DB); db.Table<Schema::Blocks>().Key(TabletId).Update( NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration), @@ -60,11 +65,6 @@ namespace NKikimr::NBlobDepot { if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) { TActivationContext::Send(Response.release()); } else { - // update block value in memory - auto& block = Self->BlocksManager->Blocks[TabletId]; - Y_VERIFY(block.BlockedGeneration < BlockedGeneration); - block.BlockedGeneration = BlockedGeneration; - Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response)); } } @@ -142,10 +142,10 @@ namespace NKikimr::NBlobDepot { } void IssueBlocksToStorage() { - TTabletStorageInfo *info = Self->Info(); for (const auto& [_, kind] : Self->ChannelKinds) { - for (const ui8 channel : kind.IndexToChannel) { - const ui32 groupId = info->GroupFor(channel, Self->Executor()->Generation()); + for (const auto& [channel, groupId] : kind.ChannelGroups) { + // FIXME: consider previous group generations (because agent can write in obsolete tablet generation) + // !!!!!!!!!!! SendBlock(groupId); ++BlocksPending; RetryCount += 2; diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 6f74c9b27b..1ecc15a913 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -39,29 +39,40 @@ namespace NKikimr::NBlobDepot { : Data.lower_bound(*end); if (flags & EScanFlags::REVERSE) { - while (beginIt != endIt) { + if (beginIt != endIt) { --endIt; - if (!callback(endIt->first, endIt->second)) { - break; - } + do { + auto& current = *endIt--; + if (!callback(current.first, current.second)) { + break; + } + } while (beginIt != endIt); } } else { while (beginIt != endIt) { - if (!callback(beginIt->first, beginIt->second)) { + auto& current = *beginIt++; + if (!callback(current.first, current.second)) { break; } - ++beginIt; } } } - void DeleteKeys(const std::vector<TString>& keysToDelete) { - for (const TString& key : keysToDelete) { - Data.erase(key); - } + void DeleteKey(TStringBuf key) { + Data.erase(TString(key)); } void PutKey(TString key, TDataValue&& data) { + auto getKeyString = [&] { + if (Self->Config.GetOperationMode() == NKikimrBlobDepot::VirtualGroup) { + return TLogoBlobID(reinterpret_cast<const ui64*>(key.data())).ToString(); + } else { + return key; + } + }; + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "PutKey", (TabletId, Self->TabletID()), (Key, EscapeC(getKeyString())), + (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState))); + Data[std::move(key)] = std::move(data); } @@ -77,9 +88,9 @@ namespace NKikimr::NBlobDepot { }); } - std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { - if (const auto it = Data.find(key); it != Data.end()) { - TDataValue value = it->second; + std::optional<TString> UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { + auto& value = Data[TString(key)]; + if (value.KeepState < keepState) { value.KeepState = keepState; return ToValueProto(value); } else { @@ -87,14 +98,6 @@ namespace NKikimr::NBlobDepot { } } - void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) { - for (const auto& [key, keepState] : data) { - auto& value = Data[std::move(key)]; - Y_VERIFY_DEBUG(value.KeepState < keepState); - value.KeepState = keepState; - } - } - static TString ToValueProto(const TDataValue& value) { NKikimrBlobDepot::TValue proto; if (value.Meta) { @@ -128,8 +131,8 @@ namespace NKikimr::NBlobDepot { return DataManager->ScanRange(begin, end, flags, callback); } - void TBlobDepot::DeleteKeys(const std::vector<TString>& keysToDelete) { - DataManager->DeleteKeys(keysToDelete); + void TBlobDepot::DeleteKey(TStringBuf key) { + DataManager->DeleteKey(key); } void TBlobDepot::PutKey(TString key, TDataValue&& data) { @@ -140,12 +143,8 @@ namespace NKikimr::NBlobDepot { DataManager->AddDataOnLoad(std::move(key), std::move(value)); } - std::optional<TString> TBlobDepot::UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { - return DataManager->UpdatesKeepState(key, keepState); - } - - void TBlobDepot::UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) { - DataManager->UpdateKeepState(data); + std::optional<TString> TBlobDepot::UpdateKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { + return DataManager->UpdateKeepState(key, keepState); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 2076729c16..3bd4da5c74 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -28,8 +28,6 @@ namespace NKikimr::NBlobDepot { int KeepIndex = 0; int DoNotKeepIndex = 0; ui32 NumKeysProcessed = 0; - std::vector<TString> KeysToDelete; - std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>> KeepStateUpdates; bool Done = false; static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; @@ -55,13 +53,7 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { - Self->DeleteKeys(KeysToDelete); - Self->UpdateKeepState(KeepStateUpdates); - if (Done || Error) { - if (Done) { - ApplyBarrier(); - } auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error)); TActivationContext::Send(response.release()); @@ -109,8 +101,7 @@ namespace NKikimr::NBlobDepot { for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) { const auto id = LogoBlobIDFromLogoBlobID(items[index]); const TStringBuf key = id.AsBinaryString(); - if (const auto& value = Self->UpdatesKeepState(key, state)) { - KeepStateUpdates.emplace_back(key, state); + if (const auto& value = Self->UpdateKeepState(key, state)) { db.Table<Schema::Data>().Key(TString(key)).Update<Schema::Data::Value>(*value); ++NumKeysProcessed; } @@ -132,37 +123,24 @@ namespace NKikimr::NBlobDepot { auto processKey = [&](TStringBuf key, const TDataValue& value) { if (value.KeepState != NKikimrBlobDepot::EKeepState::Keep || hard) { + const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "DeleteKey", (TabletId, Self->TabletID()), + (BlobId, id)); db.Table<Schema::Data>().Key(TString(key)).Delete(); - KeysToDelete.emplace_back(key); + Self->DeleteKey(key); ++NumKeysProcessed; } return NumKeysProcessed < MaxKeysToProcessAtOnce; }; - Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, - processKey); + Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(), + EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, processKey); if (NumKeysProcessed == MaxKeysToProcessAtOnce) { return false; } - auto row = db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()); - const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); - if (record.GetHard()) { - row.Update<Schema::Barriers::Hard>(collectGenStep); - } else { - row.Update<Schema::Barriers::Soft>(collectGenStep); - } - row.Update<Schema::Barriers::LastRecordGenStep>(GenStep(record.GetGeneration(), record.GetPerGenerationCounter())); - } - - return true; - } - - void ApplyBarrier() { - const auto& record = Request->Get()->Record; - if (record.HasCollectGeneration() && record.HasCollectStep()) { const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); auto& barriers = Self->GarbageCollectionManager->Barriers; auto& barrier = barriers[key]; @@ -173,7 +151,15 @@ namespace NKikimr::NBlobDepot { barrier.LastRecordGenStep = recordGenStep; Y_VERIFY(currentGenStep <= collectGenStep); currentGenStep = collectGenStep; + + db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()).Update( + NIceDb::TUpdate<Schema::Barriers::LastRecordGenStep>(recordGenStep), + NIceDb::TUpdate<Schema::Barriers::Soft>(barrier.Soft), + NIceDb::TUpdate<Schema::Barriers::Hard>(barrier.Hard) + ); } + + return true; } }; diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index 318883bd16..56b9db2849 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev) { class TTxCommitBlobSeq : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> Request; - std::vector<std::pair<TString, NKikimrBlobDepot::TValue>> UpdateQ; + std::unique_ptr<IEventHandle> Response; public: TTxCommitBlobSeq(TBlobDepot *self, std::unique_ptr<TEvBlobDepot::TEvCommitBlobSeq::THandle> request) @@ -17,7 +17,13 @@ namespace NKikimr::NBlobDepot { bool Execute(TTransactionContext& txc, const TActorContext&) override { NIceDb::TNiceDb db(txc.DB); + NKikimrBlobDepot::TEvCommitBlobSeqResult *responseRecord; + std::tie(Response, responseRecord) = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId()); + for (const auto& item : Request->Get()->Record.GetItems()) { + auto *responseItem = responseRecord->AddItems(); + responseItem->SetStatus(NKikimrProto::OK); + NKikimrBlobDepot::TValue value; if (item.HasMeta()) { value.SetMeta(item.GetMeta()); @@ -25,38 +31,35 @@ namespace NKikimr::NBlobDepot { auto *chain = value.AddValueChain(); chain->MutableLocator()->CopyFrom(item.GetBlobLocator()); - TString valueData; - const bool success = value.SerializeToString(&valueData); - Y_VERIFY(success); - const TString& key = item.GetKey(); if (key.size() == 3 * sizeof(ui64)) { const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data())); if (!Self->CheckBlobForBarrier(id)) { - continue; // FIXME: report error somehow (?) + responseItem->SetStatus(NKikimrProto::ERROR); + responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << id << " is being put beyond the barrier"); + continue; } } - db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData); - UpdateQ.emplace_back(item.GetKey(), std::move(value)); - } - - return true; - } - - void Complete(const TActorContext&) override { - auto [response, record] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId()); - for (auto& [key, value] : UpdateQ) { Self->PutKey(std::move(key), { .Meta = value.GetMeta(), .ValueChain = std::move(*value.MutableValueChain()), .KeepState = value.GetKeepState(), .Public = value.GetPublic(), }); - auto *responseItem = record->AddItems(); - responseItem->SetStatus(NKikimrProto::OK); + + TString valueData; + const bool success = value.SerializeToString(&valueData); + Y_VERIFY(success); + + db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData); } - TActivationContext::Send(response.release()); + + return true; + } + + void Complete(const TActorContext&) override { + TActivationContext::Send(Response.release()); } }; diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index 5841b36aba..c3e65dd220 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { struct TChannelKind { std::array<ui8, 256> ChannelToIndex; - std::vector<ui8> IndexToChannel; + std::vector<std::pair<ui8, ui32>> ChannelGroups; }; #pragma pack(push, 1) @@ -27,7 +27,7 @@ namespace NKikimr::NBlobDepot { VG_GC_BLOB = 3, // garbage collection command }; - struct TCGSI { + struct TBlobSeqId { static constexpr ui32 IndexBits = 20; static constexpr ui32 MaxIndex = (1 << IndexBits) - 1; @@ -37,33 +37,38 @@ namespace NKikimr::NBlobDepot { ui32 Index = 0; auto AsTuple() const { return std::make_tuple(Channel, Generation, Step, Index); } - friend bool operator ==(const TCGSI& x, const TCGSI& y) { return x.AsTuple() == y.AsTuple(); } - friend bool operator !=(const TCGSI& x, const TCGSI& y) { return x.AsTuple() != y.AsTuple(); } + friend bool operator ==(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() == y.AsTuple(); } + friend bool operator !=(const TBlobSeqId& x, const TBlobSeqId& y) { return x.AsTuple() != y.AsTuple(); } + + TString ToString() const { + return TStringBuilder() << "{" << Channel << ":" << Generation << ":" << Step << ":" << Index << "}"; + } explicit operator bool() const { - return *this != TCGSI(); + return *this != TBlobSeqId(); } ui64 ToBinary(const TChannelKind& kind) const { Y_VERIFY_DEBUG(Index <= MaxIndex); Y_VERIFY(Channel < kind.ChannelToIndex.size()); - return (static_cast<ui64>(Step) << IndexBits | Index) * kind.IndexToChannel.size() + kind.ChannelToIndex[Channel]; + return (static_cast<ui64>(Step) << IndexBits | Index) * kind.ChannelGroups.size() + kind.ChannelToIndex[Channel]; } - static TCGSI FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) { + static TBlobSeqId FromBinary(ui32 generation, const TChannelKind& kind, ui64 value) { static_assert(sizeof(long long) >= sizeof(ui64)); - auto res = std::lldiv(value, kind.IndexToChannel.size()); + Y_VERIFY(!kind.ChannelGroups.empty()); + auto res = std::lldiv(value, kind.ChannelGroups.size()); - return TCGSI{ - .Channel = kind.IndexToChannel[res.rem], + return TBlobSeqId{ + .Channel = kind.ChannelGroups[res.rem].first, .Generation = generation, .Step = static_cast<ui32>(res.quot >> IndexBits), .Index = static_cast<ui32>(res.quot) & MaxIndex }; } - static TCGSI FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) { - return TCGSI{ + static TBlobSeqId FromProto(const NKikimrBlobDepot::TBlobSeqId& proto) { + return TBlobSeqId{ proto.GetChannel(), proto.GetGeneration(), proto.GetStep(), |