diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-07-05 14:20:43 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-07-05 14:20:43 +0300 |
commit | c3fe0cad2a593816dcc77b126e63b772c262e915 (patch) | |
tree | a096b0cbc7dd9ff28b83f170e785e2a44effc11e | |
parent | bc3062704271d10240ecd6177321745bdbcb5b9d (diff) | |
download | ydb-c3fe0cad2a593816dcc77b126e63b772c262e915.tar.gz |
BlobDepot work in progress KIKIMR-14867
ref:4c86ac8a78779c6eadb6d26d525d14601a92b7a9
23 files changed, 558 insertions, 98 deletions
diff --git a/ydb/core/base/logoblob.h b/ydb/core/base/logoblob.h index 36987e7684d..ddcecd3b065 100644 --- a/ydb/core/base/logoblob.h +++ b/ydb/core/base/logoblob.h @@ -324,3 +324,15 @@ struct THash<NKikimr::TLogoBlobID> { return x.Hash(); } }; + +template<> +inline NKikimr::TLogoBlobID Min<NKikimr::TLogoBlobID>() noexcept { + return {}; +} + +template<> +inline NKikimr::TLogoBlobID Max<NKikimr::TLogoBlobID>() noexcept { + return NKikimr::TLogoBlobID(Max<ui64>(), Max<ui32>(), Max<ui32>(), NKikimr::TLogoBlobID::MaxChannel, + NKikimr::TLogoBlobID::MaxBlobSize, NKikimr::TLogoBlobID::MaxCookie, NKikimr::TLogoBlobID::MaxPartId, + NKikimr::TLogoBlobID::MaxCrcMode); +} diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index f30810879af..f393be16f28 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -16,7 +16,7 @@ target_link_libraries(ydb-core-blob_depot PUBLIC ) target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot_agent.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/agent.cpp index 9a9a7cbab6e..fd94bdfd15a 100644 --- a/ydb/core/blob_depot/blob_depot_agent.cpp +++ b/ydb/core/blob_depot/agent.cpp @@ -82,9 +82,14 @@ namespace NKikimr::NBlobDepot { const auto it = PipeServerToNode.find(pipeServerId); Y_VERIFY(it != PipeServerToNode.end()); Y_VERIFY(it->second); - const auto agentIt = Agents.find(*it->second); + TAgentInfo& agent = GetAgent(*it->second); + Y_VERIFY(agent.ConnectedAgent == pipeServerId); + return agent; + } + + TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(ui32 nodeId) { + const auto agentIt = Agents.find(nodeId); Y_VERIFY(agentIt != Agents.end()); - Y_VERIFY(agentIt->second.ConnectedAgent == pipeServerId); return agentIt->second; } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index 7123f0293c6..e6bcc3a6a58 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -98,10 +98,13 @@ namespace NKikimr::NBlobDepot { hFunc(TEvTabletPipe::TEvClientConnected, Handle); hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvBlobDepot::TEvPushNotify, Handle); + hFunc(TEvBlobDepot::TEvRegisterAgentResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvAllocateIdsResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvBlockResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvQueryBlocksResult, HandleTabletResponse); + hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse); hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse); @@ -188,6 +191,8 @@ namespace NKikimr::NBlobDepot { void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context); + void Handle(TEvBlobDepot::TEvPushNotify::TPtr ev); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// struct TExecutingQueries {}; @@ -319,7 +324,9 @@ namespace NKikimr::NBlobDepot { ui32 GetBlockForTablet(ui64 tabletId); - void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp); + void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic issueTimestamp, TDuration timeToLive); + + void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Reading diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index a812a6e2ab1..a3b070799fd 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -5,14 +5,15 @@ namespace NKikimr::NBlobDepot { class TBlobDepotAgent::TBlocksManager : public TRequestSender { - struct TBlockInfo { + struct TBlock { ui32 BlockedGeneration; + TDuration TimeToLive; TMonotonic ExpirationTimestamp; // not valid after bool RefreshInFlight = false; TIntrusiveList<TQuery, TPendingBlockChecks> PendingBlockChecks; }; - THashMap<ui64, TBlockInfo> Blocks; + THashMap<ui64, TBlock> Blocks; public: TBlocksManager(TBlobDepotAgent& agent) @@ -33,16 +34,18 @@ namespace NKikimr::NBlobDepot { NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, ui32 *blockedGeneration) { + NKikimrProto::EReplyStatus status = NKikimrProto::UNKNOWN; auto& block = Blocks[tabletId]; - const TMonotonic issueTime = TActivationContext::Monotonic(); + const TMonotonic now = TActivationContext::Monotonic(); if (generation <= block.BlockedGeneration) { - return NKikimrProto::RACE; - } else if (issueTime < block.ExpirationTimestamp) { + status = NKikimrProto::RACE; + } else if (now < block.ExpirationTimestamp) { if (blockedGeneration) { *blockedGeneration = block.BlockedGeneration; } - return NKikimrProto::OK; - } else if (!block.RefreshInFlight) { + status = NKikimrProto::OK; + } + if (status != NKikimrProto::RACE && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) { NKikimrBlobDepot::TEvQueryBlocks queryBlocks; queryBlocks.AddTabletIds(tabletId); Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>( @@ -50,14 +53,20 @@ namespace NKikimr::NBlobDepot { block.RefreshInFlight = true; block.PendingBlockChecks.PushBack(query); } - return NKikimrProto::UNKNOWN; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (QueryId, query->GetQueryId()), + (TabletId, tabletId), (Generation, generation), (Status, status), (Now, now), + (ExpirationTimestamp, block.ExpirationTimestamp)); + return status; } void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) { Handle(std::move(context), (*p)->Record); } else if (std::holds_alternative<TTabletDisconnected>(response)) { - IssueOnUpdateBlock(context, false); + auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); + if (const auto it = Blocks.find(queryBlockContext.TabletId); it != Blocks.end()) { + IssueOnUpdateBlock(it->second, false); + } } else { Y_FAIL("unexpected response type"); } @@ -72,19 +81,18 @@ namespace NKikimr::NBlobDepot { const ui32 newBlockedGeneration = msg.GetBlockedGenerations(0); Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration); block.BlockedGeneration = newBlockedGeneration; - block.ExpirationTimestamp = queryBlockContext.Timestamp + TDuration::MilliSeconds(msg.GetTimeToLiveMs()); - IssueOnUpdateBlock(context, true); + block.TimeToLive = TDuration::MilliSeconds(msg.GetTimeToLiveMs()); + block.ExpirationTimestamp = queryBlockContext.Timestamp + block.TimeToLive; + block.RefreshInFlight = false; + IssueOnUpdateBlock(block, true); } - void IssueOnUpdateBlock(const TRequestContext::TPtr& context, bool success) { - auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - auto& block = Blocks[queryBlockContext.TabletId]; - TIntrusiveList<TQuery, TPendingBlockChecks> temp; - temp.Swap(block.PendingBlockChecks); - for (auto it = temp.begin(); it != temp.end(); ) { - const auto current = it++; - current->OnUpdateBlock(success); - } + void IssueOnUpdateBlock(TBlock& block, bool success) { + TIntrusiveList<TQuery, TPendingBlockChecks> pendingBlockChecks; + pendingBlockChecks.Append(block.PendingBlockChecks); + pendingBlockChecks.ForEach([success](TQuery *query) { + query->OnUpdateBlock(success); + }); } ui32 GetBlockForTablet(ui64 tabletId) { @@ -92,14 +100,22 @@ namespace NKikimr::NBlobDepot { return it != Blocks.end() ? it->second.BlockedGeneration : 0; } - void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) { + void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) { auto& block = Blocks[tabletId]; Y_VERIFY(block.BlockedGeneration <= blockedGeneration); - if (block.BlockedGeneration < blockedGeneration) { - block.BlockedGeneration = blockedGeneration; - block.ExpirationTimestamp = expirationTimestamp; - } else if (block.ExpirationTimestamp < expirationTimestamp) { - block.ExpirationTimestamp = expirationTimestamp; + block.BlockedGeneration = blockedGeneration; + block.TimeToLive = timeToLive; + block.ExpirationTimestamp = timestamp + timeToLive; + } + + void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) { + for (const auto& tablet : tablets) { + if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) { + auto& block = it->second; + block.BlockedGeneration = tablet.GetBlockedGeneration(); + block.ExpirationTimestamp = TMonotonic::Zero(); + IssueOnUpdateBlock(block, true); + } } } }; @@ -123,8 +139,12 @@ namespace NKikimr::NBlobDepot { return BlocksManager->GetBlockForTablet(tabletId); } - void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) { - BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, expirationTimestamp); + void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) { + BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, timestamp, timeToLive); + } + + void TBlobDepotAgent::OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) { + BlocksManager->OnBlockedTablets(tablets); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 4fbba6f39d0..fcd6c0c7022 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -137,4 +137,12 @@ namespace NKikimr::NBlobDepot { RegisterRequest(id, sender, std::move(context), {}, true); } + void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) { + auto& msg = ev->Get()->Record; + OnBlockedTablets(msg.GetBlockedTablets()); + + auto [response, _] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); + TActivationContext::Send(response.release()); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 0851d290778..8e73045be96 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -67,6 +67,7 @@ namespace NKikimr::NBlobDepot { template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev); + template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev); template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev); diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp index 9503615f912..65155971033 100644 --- a/ydb/core/blob_depot/agent/storage_block.cpp +++ b/ydb/core/blob_depot/agent/storage_block.cpp @@ -52,7 +52,7 @@ namespace NKikimr::NBlobDepot { // update blocks cache auto& blockContext = context->Obtain<TBlockContext>(); auto& query = *Event->Get<TEvBlobStorage::TEvBlock>(); - Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp + + Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp, TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs())); EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK)); } diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp index fdac21f6ea2..0d29a5272ed 100644 --- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -23,7 +23,6 @@ namespace NKikimr::NBlobDepot { NumDoNotKeep = msg.DoNotKeep ? msg.DoNotKeep->size() : 0; const auto status = Agent.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (Status, status)); if (status == NKikimrProto::OK) { IssueCollectGarbage(); } else if (status != NKikimrProto::UNKNOWN) { diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 4b1e45c74e5..9b5c37b4a7e 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -60,6 +60,8 @@ namespace NKikimr::NBlobDepot { void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override { if (std::holds_alternative<TTabletDisconnected>(response)) { return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { + Agent.HandleGetResult(context, **p); } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { HandleResolveResult(id, std::move(context), **p); } else { diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 95049a2401b..57c5de67b9c 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -5,16 +5,141 @@ namespace NKikimr::NBlobDepot { template<> TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) { class TRangeQuery : public TQuery { + std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response; + ui32 ReadsInFlight = 0; + ui32 ResolvesInFlight = 0; + + struct TExtraResolveContext : TRequestContext { + const size_t Index; + + TExtraResolveContext(size_t index) + : Index(index) + {} + }; + public: using TQuery::TQuery; void Initiate() override { - EndWithError(NKikimrProto::ERROR, "not implemented"); + auto& msg = *Event->Get<TEvBlobStorage::TEvRange>(); + Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, msg.From, msg.To, + Agent.VirtualGroupId); + + IssueResolve(); + } + + void IssueResolve() { + auto& msg = *Event->Get<TEvBlobStorage::TEvRange>(); + + TStringBuf from = msg.From.AsBinaryString(); + TStringBuf to = msg.To.AsBinaryString(); + const bool reverse = msg.To < msg.From; + if (reverse) { + std::swap(from, to); + } + + NKikimrBlobDepot::TEvResolve resolve; + auto *item = resolve.AddItems(); + item->SetBeginningKey(from.data(), from.size()); + item->SetIncludeBeginning(true); + item->SetEndingKey(to.data(), to.size()); + item->SetIncludeEnding(true); + item->SetReverse(reverse); + + Agent.Issue(std::move(resolve), this, nullptr); + ++ResolvesInFlight; + } + + void IssueResolve(TLogoBlobID id, size_t index) { + NKikimrBlobDepot::TEvResolve resolve; + auto *item = resolve.AddItems(); + item->SetBeginningKey(id.GetRaw(), 3 * sizeof(ui64)); + item->SetIncludeBeginning(true); + item->SetEndingKey(id.GetRaw(), 3 * sizeof(ui64)); + item->SetIncludeEnding(true); + + Agent.Issue(std::move(resolve), this, std::make_shared<TExtraResolveContext>(index)); + ++ResolvesInFlight; + } + + void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override { + if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { + HandleResolveResult(id, std::move(context), (*p)->Record); + } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { + Agent.HandleGetResult(context, **p); + } else if (std::holds_alternative<TTabletDisconnected>(response)) { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } else { + Y_FAIL(); + } } - void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { - (void)response; - Y_FAIL(); + void HandleResolveResult(ui64 id, TRequestContext::TPtr context, NKikimrBlobDepot::TEvResolveResult& msg) { + auto& query = *Event->Get<TEvBlobStorage::TEvRange>(); + + --ResolvesInFlight; + + if (msg.GetStatus() != NKikimrProto::OK && msg.GetStatus() != NKikimrProto::OVERRUN) { + return EndWithError(msg.GetStatus(), msg.GetErrorReason()); + } + + for (const auto& key : msg.GetResolvedKeys()) { + const TString& blobId = key.GetKey(); + Y_VERIFY(blobId.size() == 3 * sizeof(ui64)); + TLogoBlobID id(reinterpret_cast<const ui64*>(blobId.data())); + + const size_t index = context + ? context->Obtain<TExtraResolveContext>().Index + : Response->Responses.size(); + if (!context) { + Response->Responses.emplace_back(id, TString()); + } + + if (!query.IsIndexOnly) { + TString error; + if (!Agent.IssueRead(key.GetValueChain(), 0, 0, NKikimrBlobStorage::EGetHandleClass::FastRead, + query.MustRestoreFirst, this, index, true, &error)) { + return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " + << error); + } + ++ReadsInFlight; + } else if (query.MustRestoreFirst) { + Y_FAIL("not implemented yet"); + } + } + + if (msg.GetStatus() == NKikimrProto::OVERRUN) { + Agent.RegisterRequest(id, this, std::move(context), {}, true); + } else if (msg.GetStatus() == NKikimrProto::OK) { + if (!ReadsInFlight && !ResolvesInFlight) { + EndWithSuccess(std::move(Response)); + } + } else { + Y_UNREACHABLE(); + } + } + + void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { + auto& item = Response->Responses[tag]; + --ReadsInFlight; + + switch (status) { + case NKikimrProto::OK: + item.Buffer = std::move(dataOrErrorReason); + break; + + case NKikimrProto::NODATA: + IssueResolve(item.Id, tag); + break; + + default: + return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# " + << item.Id << " Error# " << dataOrErrorReason); + } + + if (!ReadsInFlight && !ResolvesInFlight) { + EndWithSuccess(std::move(Response)); + } } }; diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index e27b820ed13..ead3fca02b0 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -62,6 +62,7 @@ namespace NKikimr::NBlobDepot { void OnAgentConnect(TAgentInfo& agent); void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev); TAgentInfo& GetAgent(const TActorId& pipeServerId); + TAgentInfo& GetAgent(ui32 nodeId); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -183,6 +184,8 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); + bool CheckBlobForBarrier(TLogoBlobID id) const; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Data operations @@ -215,6 +218,8 @@ namespace NKikimr::NBlobDepot { void DeleteKeys(const std::vector<TString>& keysToDelete); void PutKey(TString key, TDataValue&& data); void AddDataOnLoad(TString key, TString value); + std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState); + void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data); void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); void Handle(TEvBlobDepot::TEvResolve::TPtr ev); diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 2d0377db14a..5fc4241f208 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -4,8 +4,23 @@ namespace NKikimr::NBlobDepot { class TBlobDepot::TBlocksManager { + // wait duration before issuing blocks via storage + static constexpr TDuration AgentsWaitTime = TDuration::Seconds(1); + + // TTL for block lease + static constexpr TDuration BlockLeaseTime = TDuration::Seconds(60); + + struct TBlock { + struct TPerAgentInfo { + TMonotonic ExpirationTimestamp = TMonotonic::Zero(); + }; + + ui32 BlockedGeneration = 0; + THashMap<ui32, TPerAgentInfo> PerAgentInfo; + }; + TBlobDepot* const Self; - THashMap<ui64, ui32> Blocks; + THashMap<ui64, TBlock> Blocks; private: class TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { @@ -14,7 +29,6 @@ namespace NKikimr::NBlobDepot { const ui32 NodeId; const TInstant Timestamp; std::unique_ptr<IEventHandle> Response; - bool RaceDetected; public: TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp, @@ -28,9 +42,8 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { - const auto [it, inserted] = Self->BlocksManager->Blocks.emplace(TabletId, BlockedGeneration); - RaceDetected = !inserted && BlockedGeneration <= it->second; - if (RaceDetected) { + auto& block = Self->BlocksManager->Blocks[TabletId]; + if (BlockedGeneration <= block.BlockedGeneration) { Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY); } else { NIceDb::TNiceDb db(txc.DB); @@ -44,10 +57,15 @@ namespace NKikimr::NBlobDepot { } void Complete(const TActorContext&) override { - if (RaceDetected) { + if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) { TActivationContext::Send(Response.release()); } else { - Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, std::move(Response)); + // update block value in memory + auto& block = Self->BlocksManager->Blocks[TabletId]; + Y_VERIFY(block.BlockedGeneration < BlockedGeneration); + block.BlockedGeneration = BlockedGeneration; + + Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response)); } } }; @@ -56,21 +74,74 @@ namespace NKikimr::NBlobDepot { TBlobDepot* const Self; const ui64 TabletId; const ui32 BlockedGeneration; + const ui32 NodeId; std::unique_ptr<IEventHandle> Response; ui32 BlocksPending = 0; ui32 RetryCount = 0; const ui64 IssuerGuid = RandomNumber<ui64>() | 1; + THashSet<ui32> NodesWaitingForPushResult; public: - TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, + TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) : Self(self) , TabletId(tabletId) , BlockedGeneration(blockedGeneration) + , NodeId(nodeId) , Response(std::move(response)) {} void Bootstrap() { + IssueNotificationsToAgents(); + Become(&TThis::StateFunc, AgentsWaitTime, new TEvents::TEvWakeup); + } + + void IssueNotificationsToAgents() { + const TMonotonic now = TActivationContext::Monotonic(); + auto& block = Self->BlocksManager->Blocks[TabletId]; + for (const auto& [agentId, info] : block.PerAgentInfo) { + if (agentId == NodeId) { + // skip the origin agent + continue; + } + if (info.ExpirationTimestamp <= now) { + SendPushToAgent(agentId); + } + } + } + + void SendPushToAgent(ui32 agentId) { + auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>(); + auto *item = ev->Record.AddBlockedTablets(); + item->SetTabletId(TabletId); + item->SetBlockedGeneration(BlockedGeneration); + + TAgentInfo& agent = Self->GetAgent(agentId); + if (const auto& actorId = agent.ConnectedAgent) { + Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid); + } + NodesWaitingForPushResult.insert(agentId); + } + + void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) { + const ui32 agentId = ev->Sender.NodeId(); + const size_t numErased = NodesWaitingForPushResult.erase(agentId); + Y_VERIFY(numErased == 1 && ev->Cookie == IssuerGuid); + + // mark lease as successfully revoked one + auto& block = Self->BlocksManager->Blocks[TabletId]; + block.PerAgentInfo.erase(agentId); + + if (NodesWaitingForPushResult.empty()) { + Finish(); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) { + // can't reach an agent to notify it about blocked generation change -- we can't do anything here + } + + void IssueBlocksToStorage() { TTabletStorageInfo *info = Self->Info(); for (const auto& [_, kind] : Self->ChannelKinds) { for (const ui8 channel : kind.IndexToChannel) { @@ -80,8 +151,6 @@ namespace NKikimr::NBlobDepot { RetryCount += 2; } } - - Become(&TThis::StateFunc); } void SendBlock(ui32 groupId) { @@ -93,8 +162,7 @@ namespace NKikimr::NBlobDepot { switch (ev->Get()->Status) { case NKikimrProto::OK: if (!--BlocksPending) { - TActivationContext::Send(Response.release()); - PassAway(); + Finish(); } break; @@ -108,6 +176,7 @@ namespace NKikimr::NBlobDepot { auto& r = Response->Get<TEvBlobDepot::TEvBlockResult>()->Record; r.SetStatus(NKikimrProto::ERROR); r.SetErrorReason(ev->Get()->ErrorReason); + Finish(); } else { SendBlock(ev->Cookie); } @@ -115,8 +184,16 @@ namespace NKikimr::NBlobDepot { } } + void Finish() { + TActivationContext::Send(Response.release()); + PassAway(); + } + STRICT_STFUNC(StateFunc, hFunc(TEvBlobStorage::TEvBlockResult, Handle); + hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage); cFunc(TEvents::TSystem::Poison, PassAway); ) }; @@ -127,17 +204,18 @@ namespace NKikimr::NBlobDepot { {} void AddBlockOnLoad(ui64 tabletId, ui32 generation) { - Blocks.emplace(tabletId, generation); + Blocks[tabletId].BlockedGeneration = generation; } - void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, - std::unique_ptr<IEventHandle> response) { - Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, std::move(response))); + void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) { + Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId, + std::move(response))); } void Handle(TEvBlobDepot::TEvBlock::TPtr ev) { const auto& record = ev->Get()->Record; - auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK, std::nullopt); + auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK, + std::nullopt, BlockLeaseTime.MilliSeconds()); if (!record.HasTabletId() || !record.HasBlockedGeneration()) { responseRecord->SetStatus(NKikimrProto::ERROR); @@ -145,8 +223,8 @@ namespace NKikimr::NBlobDepot { } else { const ui64 tabletId = record.GetTabletId(); const ui32 blockedGeneration = record.GetBlockedGeneration(); - if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second) { - responseRecord->SetStatus(NKikimrProto::ERROR); + if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) { + responseRecord->SetStatus(NKikimrProto::ALREADY); } else { TAgentInfo& agent = Self->GetAgent(ev->Recipient); Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, @@ -158,13 +236,20 @@ namespace NKikimr::NBlobDepot { } void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { + TAgentInfo& agent = Self->GetAgent(ev->Recipient); + const ui32 agentId = agent.ConnectedNodeId; + Y_VERIFY(agentId); + + const TMonotonic now = TActivationContext::Monotonic(); + const auto& record = ev->Get()->Record; auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId()); - responseRecord->SetTimeToLiveMs(15000); // FIXME + responseRecord->SetTimeToLiveMs(BlockLeaseTime.MilliSeconds()); for (const ui64 tabletId : record.GetTabletIds()) { - const auto it = Blocks.find(tabletId); - responseRecord->AddBlockedGenerations(it != Blocks.end() ? it->second : 0); + auto& block = Blocks[tabletId]; + responseRecord->AddBlockedGenerations(block.BlockedGeneration); + block.PerAgentInfo[agentId].ExpirationTimestamp = now + BlockLeaseTime; } TActivationContext::Send(response.release()); diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 688c3144d8f..6f74c9b27b1 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot { }; std::map<TString, TDataValue, TCompareKey> Data; + std::set<TLogoBlobID> DataBlobIds; public: TDataManager(TBlobDepot *self) @@ -75,6 +76,43 @@ namespace NKikimr::NBlobDepot { .Public = proto.GetPublic(), }); } + + std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { + if (const auto it = Data.find(key); it != Data.end()) { + TDataValue value = it->second; + value.KeepState = keepState; + return ToValueProto(value); + } else { + return std::nullopt; + } + } + + void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) { + for (const auto& [key, keepState] : data) { + auto& value = Data[std::move(key)]; + Y_VERIFY_DEBUG(value.KeepState < keepState); + value.KeepState = keepState; + } + } + + static TString ToValueProto(const TDataValue& value) { + NKikimrBlobDepot::TValue proto; + if (value.Meta) { + proto.SetMeta(value.Meta); + } + proto.MutableValueChain()->CopyFrom(value.ValueChain); + if (proto.GetKeepState() != value.KeepState) { + proto.SetKeepState(value.KeepState); + } + if (proto.GetPublic() != value.Public) { + proto.SetPublic(value.Public); + } + + TString s; + const bool success = proto.SerializeToString(&s); + Y_VERIFY(success); + return s; + } }; TBlobDepot::TDataManagerPtr TBlobDepot::CreateDataManager() { @@ -102,4 +140,12 @@ namespace NKikimr::NBlobDepot { DataManager->AddDataOnLoad(std::move(key), std::move(value)); } + std::optional<TString> TBlobDepot::UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) { + return DataManager->UpdatesKeepState(key, keepState); + } + + void TBlobDepot::UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) { + DataManager->UpdateKeepState(data); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index 10edf3e4545..01212d9e5fa 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -14,9 +14,10 @@ namespace NKikimr { EvRegisterAgentResult, EvAllocateIds, EvAllocateIdsResult, + EvPushNotify, + EvPushNotifyResult, EvBlock, EvBlockResult, - EvPushNotify, EvQueryBlocks, EvQueryBlocksResult, EvCollectGarbage, @@ -56,9 +57,10 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult); BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind); BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation); - BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration); - BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult); + BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration); + BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason, TimeToLiveMs); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult); BLOBDEPOT_EVENT_PB_NO_ARGS(EvCollectGarbage); @@ -74,6 +76,7 @@ namespace NKikimr { template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; }; template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; }; template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; }; + template<> struct TResponseFor<TEvPushNotify> { using Type = TEvPushNotifyResult; }; template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; }; template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; }; template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; }; diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 06e2d627236..2076729c162 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -3,16 +3,33 @@ namespace NKikimr::NBlobDepot { + namespace { + static ui64 GenStep(ui32 gen, ui32 step) { + return static_cast<ui64>(gen) << 32 | step; + } + } + class TBlobDepot::TGarbageCollectionManager { - TBlobDepot *Self; + TBlobDepot* const Self; + + struct TBarrier { + ui64 LastRecordGenStep = 0; + ui64 Soft = 0; + ui64 Hard = 0; + }; + + THashMap<std::pair<ui64, ui8>, TBarrier> Barriers; private: class TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + std::optional<TString> Error; + std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request; - ui32 KeepIndex = 0; - ui32 DoNotKeepIndex = 0; + int KeepIndex = 0; + int DoNotKeepIndex = 0; ui32 NumKeysProcessed = 0; std::vector<TString> KeysToDelete; + std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>> KeepStateUpdates; bool Done = false; static constexpr ui32 MaxKeysToProcessAtOnce = 10'000; @@ -27,41 +44,82 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { - Done = ProcessKeepFlags(txc) && ProcessDoNotKeepFlags(txc) && ApplyBarrier(txc); + Validate(); + if (!Error) { + auto& record = Request->Get()->Record; + Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep) + && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep) + && ProcessBarrier(txc); + } return true; } void Complete(const TActorContext&) override { Self->DeleteKeys(KeysToDelete); - if (Done) { - auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), NKikimrProto::OK, std::nullopt); + Self->UpdateKeepState(KeepStateUpdates); + + if (Done || Error) { + if (Done) { + ApplyBarrier(); + } + auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), + Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error)); TActivationContext::Send(response.release()); } else { Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex)); } } - bool ProcessKeepFlags(TTransactionContext& /*txc*/) { - const auto& record = Request->Get()->Record; - while (KeepIndex < record.KeepSize() && NumKeysProcessed < MaxKeysToProcessAtOnce) { - ++KeepIndex; - ++NumKeysProcessed; + void Validate() { + // validate the command first + auto& record = Request->Get()->Record; + if (record.HasCollectGeneration() && record.HasCollectStep()) { + if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) { + Error = "TabletId/Channel are either not set or invalid"; + } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) { + Error = "Generation/PerGenerationCounter are not set"; + } else { + const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + auto& barriers = Self->GarbageCollectionManager->Barriers; + if (const auto it = barriers.find(key); it != barriers.end()) { + auto& barrier = it->second; + const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); + const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); + ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; + if (recordGenStep < barrier.LastRecordGenStep) { + Error = "record generation:counter is obsolete"; + } else if (recordGenStep == barrier.LastRecordGenStep) { + if (currentGenStep != collectGenStep) { + Error = "repeated command with different collect parameters received"; + } + } else if (collectGenStep < currentGenStep) { + Error = "decreasing barrier"; + } + } + } + } else if (record.HasCollectGeneration() || record.HasCollectStep()) { + Error = "CollectGeneration/CollectStep set incorrectly"; } - - return KeepIndex == record.KeepSize(); } - bool ProcessDoNotKeepFlags(TTransactionContext& /*txc*/) { - const auto& record = Request->Get()->Record; - while (DoNotKeepIndex < record.DoNotKeepSize() && NumKeysProcessed < MaxKeysToProcessAtOnce) { - ++DoNotKeepIndex; - ++NumKeysProcessed; + bool ProcessFlags(TTransactionContext& txc, int& index, + const NProtoBuf::RepeatedPtrField<NKikimrProto::TLogoBlobID>& items, + NKikimrBlobDepot::EKeepState state) { + NIceDb::TNiceDb db(txc.DB); + for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) { + const auto id = LogoBlobIDFromLogoBlobID(items[index]); + const TStringBuf key = id.AsBinaryString(); + if (const auto& value = Self->UpdatesKeepState(key, state)) { + KeepStateUpdates.emplace_back(key, state); + db.Table<Schema::Data>().Key(TString(key)).Update<Schema::Data::Value>(*value); + ++NumKeysProcessed; + } } - return DoNotKeepIndex == record.DoNotKeepSize(); + return index == items.size(); } - bool ApplyBarrier(TTransactionContext& txc) { + bool ProcessBarrier(TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const auto& record = Request->Get()->Record; @@ -84,10 +142,39 @@ namespace NKikimr::NBlobDepot { Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END, processKey); + + if (NumKeysProcessed == MaxKeysToProcessAtOnce) { + return false; + } + + auto row = db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel()); + const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); + if (record.GetHard()) { + row.Update<Schema::Barriers::Hard>(collectGenStep); + } else { + row.Update<Schema::Barriers::Soft>(collectGenStep); + } + row.Update<Schema::Barriers::LastRecordGenStep>(GenStep(record.GetGeneration(), record.GetPerGenerationCounter())); } return true; } + + void ApplyBarrier() { + const auto& record = Request->Get()->Record; + if (record.HasCollectGeneration() && record.HasCollectStep()) { + const auto key = std::make_pair(record.GetTabletId(), record.GetChannel()); + auto& barriers = Self->GarbageCollectionManager->Barriers; + auto& barrier = barriers[key]; + const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter()); + const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep()); + ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft; + Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep); + barrier.LastRecordGenStep = recordGenStep; + Y_VERIFY(currentGenStep <= collectGenStep); + currentGenStep = collectGenStep; + } + } }; public: @@ -99,6 +186,13 @@ namespace NKikimr::NBlobDepot { std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> uniq(ev.Release()); Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(uniq))); } + + bool CheckBlobForBarrier(TLogoBlobID id) const { + const auto key = std::make_pair(id.TabletID(), id.Channel()); + const auto it = Barriers.find(key); + const ui64 genstep = static_cast<ui64>(id.Generation()) << 32 | id.Step(); + return it == Barriers.end() || genstep > Max(it->second.Soft, it->second.Hard); + } }; TBlobDepot::TGarbageCollectionManagerPtr TBlobDepot::CreateGarbageCollectionManager() { @@ -109,4 +203,8 @@ namespace NKikimr::NBlobDepot { GarbageCollectionManager->Handle(ev); } + bool TBlobDepot::CheckBlobForBarrier(TLogoBlobID id) const { + return GarbageCollectionManager->CheckBlobForBarrier(id); + } + } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index ef6eb40bd69..318883bd169 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -29,6 +29,14 @@ namespace NKikimr::NBlobDepot { const bool success = value.SerializeToString(&valueData); Y_VERIFY(success); + const TString& key = item.GetKey(); + if (key.size() == 3 * sizeof(ui64)) { + const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data())); + if (!Self->CheckBlobForBarrier(id)) { + continue; // FIXME: report error somehow (?) + } + } + db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData); UpdateQ.emplace_back(item.GetKey(), std::move(value)); } diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index 80d9c948dd6..f5b2f123bb2 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -38,15 +38,17 @@ namespace NKikimr::NBlobDepot { struct Barriers : Table<3> { struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {}; struct Channel : Column<2, NScheme::NTypeIds::Uint8> {}; - struct SoftGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; - struct HardGenStep : Column<4, NScheme::NTypeIds::Uint64> {}; + struct LastRecordGenStep : Column<3, NScheme::NTypeIds::Uint64> {}; + struct Soft : Column<4, NScheme::NTypeIds::Uint64> {}; + struct Hard : Column<5, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<TabletId, Channel>; using TColumns = TableColumns< TabletId, Channel, - SoftGenStep, - HardGenStep + LastRecordGenStep, + Soft, + Hard >; }; diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.cpp b/ydb/core/blobstorage/dsproxy/group_sessions.cpp index 27223501628..9971c4a4e72 100644 --- a/ydb/core/blobstorage/dsproxy/group_sessions.cpp +++ b/ydb/core/blobstorage/dsproxy/group_sessions.cpp @@ -85,7 +85,7 @@ TGroupSessions::TGroupSessions(const TIntrusivePtr<TBlobStorageGroupInfo>& info, auto& q = stateVDisk.Queues.GetQueue(queueId); q.ActorId = queue; q.FlowRecord = std::move(flowRecord); - q.ExtraBlockChecksSupport = false; + q.ExtraBlockChecksSupport.reset(); } } } @@ -126,7 +126,7 @@ void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EV q.ExtraBlockChecksSupport = extraGroupChecksSupport; } else { ConnectedQueuesMask[orderNumber] &= ~(1 << queueId); - q.ExtraBlockChecksSupport = false; + q.ExtraBlockChecksSupport.reset(); } } diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.h b/ydb/core/blobstorage/dsproxy/group_sessions.h index 120ad2da4c9..d5005a7177d 100644 --- a/ydb/core/blobstorage/dsproxy/group_sessions.h +++ b/ydb/core/blobstorage/dsproxy/group_sessions.h @@ -21,7 +21,7 @@ namespace NKikimr { struct TQueue { TActorId ActorId; TIntrusivePtr<NBackpressure::TFlowRecord> FlowRecord; - bool ExtraBlockChecksSupport; + std::optional<bool> ExtraBlockChecksSupport; }; TQueue PutTabletLog; TQueue PutAsyncBlob; @@ -80,12 +80,12 @@ namespace NKikimr { {} static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVPut& event) { - Y_VERIFY(!event.Record.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport); + Y_VERIFY(!event.Record.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport.value_or(true)); } static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVMultiPut& event) { for (const auto& item : event.Record.GetItems()) { - Y_VERIFY(!item.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport); + Y_VERIFY(!item.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport.value_or(true)); } } diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp index 14cdbf21f39..0c1cd18d596 100644 --- a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp @@ -84,6 +84,32 @@ Y_UNIT_TEST_SUITE(BlobDepot) { UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Buffer, data); } + sender = env.Runtime->AllocateEdgeActor(3); + env.Runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvRange(id.TabletID(), Min<TLogoBlobID>(), + Max<TLogoBlobID>(), false, TInstant::Max(), true)); + }); + { + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvRangeResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Id, id); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Buffer, TString()); + } + + sender = env.Runtime->AllocateEdgeActor(4); + env.Runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvRange(id.TabletID(), Min<TLogoBlobID>(), + Max<TLogoBlobID>(), false, TInstant::Max(), false)); + }); + { + auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvRangeResult>(sender); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Id, id); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Buffer, data); + } + env.Sim(TDuration::Seconds(20)); } diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp index 529b0ebe384..e01099456ae 100644 --- a/ydb/core/mind/bscontroller/load_everything.cpp +++ b/ydb/core/mind/bscontroller/load_everything.cpp @@ -176,8 +176,9 @@ public: const TBoxStoragePoolId storagePoolId = it->second; groupToStoragePool.erase(it); + const bool isVirtualGroup = TGroupID(groups.GetKey()).ConfigurationType() == EGroupConfigurationType::Virtual; const auto geomIt = geometry.find(groups.GetKey()); - Y_VERIFY(geomIt != geometry.end()); + Y_VERIFY(isVirtualGroup || geomIt != geometry.end()); TGroupInfo& group = Self->AddGroup(groups.GetKey(), groups.GetValue<T::Generation>(), @@ -194,9 +195,9 @@ public: groups.GetValueOrDefault<T::Down>(), groups.GetValueOrDefault<T::SeenOperational>(), storagePoolId, - std::get<0>(geomIt->second), - std::get<1>(geomIt->second), - std::get<2>(geomIt->second)); + isVirtualGroup ? 0 : std::get<0>(geomIt->second), + isVirtualGroup ? 0 : std::get<1>(geomIt->second), + isVirtualGroup ? 0 : std::get<2>(geomIt->second)); group.DecommitStatus = groups.GetValueOrDefault<T::DecommitStatus>(); #define OPTIONAL(NAME) \ diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 273c1786f52..46ec82b9ec0 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -87,8 +87,15 @@ message TEvBlockResult { optional uint32 TimeToLiveMs = 3; } -message TEvPushNotify { // BlobDepot -> Agent push notification (to take some action) - repeated fixed64 UpdateBlocksForTabletIds = 1; // notify about some changed blocks +message TEvPushNotify { + message TBlockedTablet { + optional fixed64 TabletId = 1; + optional uint32 BlockedGeneration = 2; + } + repeated TBlockedTablet BlockedTablets = 1; +} + +message TEvPushNotifyResult { } message TEvQueryBlocks { |