diff options
author | alexvru <[email protected]> | 2022-07-11 19:16:15 +0300 |
---|---|---|
committer | alexvru <[email protected]> | 2022-07-11 19:16:15 +0300 |
commit | 4c2d2fdca34314469838fb1dd5eac1e711aaa8cb (patch) | |
tree | 3956bb93334aef5e5e5021366e9b2dc7980604a0 | |
parent | 5a88a82f28dde161dd304636e9a6ed585b2022eb (diff) |
BlobDepot work in progress
-rw-r--r-- | ydb/core/blob_depot/agent/agent.cpp | 18 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 36 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blob_mapping_cache.cpp | 141 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blob_mapping_cache.h | 41 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.cpp | 204 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.h | 48 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_block.cpp | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_collect_garbage.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_get.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_put.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 36 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.h | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/events.h | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_load.cpp | 3 | ||||
-rw-r--r-- | ydb/core/blob_depot/schema.h | 6 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 1 |
18 files changed, 305 insertions, 277 deletions
diff --git a/ydb/core/blob_depot/agent/agent.cpp b/ydb/core/blob_depot/agent/agent.cpp index 0fc3ea0fa4b..c4f56e58a02 100644 --- a/ydb/core/blob_depot/agent/agent.cpp +++ b/ydb/core/blob_depot/agent/agent.cpp @@ -1,8 +1,26 @@ #include "agent.h" #include "agent_impl.h" +#include "blocks.h" +#include "blob_mapping_cache.h" namespace NKikimr::NBlobDepot { + TBlobDepotAgent::TBlobDepotAgent(ui32 virtualGroupId) + : TActor(&TThis::StateFunc) + , TRequestSender(*this) + , VirtualGroupId(virtualGroupId) + , AgentInstanceId(RandomNumber<ui64>()) + , BlocksManagerPtr(new TBlocksManager(*this)) + , BlocksManager(*BlocksManagerPtr) + , BlobMappingCachePtr(new TBlobMappingCache(*this)) + , BlobMappingCache(*BlobMappingCachePtr) + { + Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual); + } + + TBlobDepotAgent::~TBlobDepotAgent() + {} + IActor *CreateBlobDepotAgent(ui32 virtualGroupId) { return new TBlobDepotAgent(virtualGroupId); } diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index d81e72cb045..dc3068976d9 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -81,16 +81,8 @@ namespace NKikimr::NBlobDepot { TActorId PipeId; public: - TBlobDepotAgent(ui32 virtualGroupId) - : TActor(&TThis::StateFunc) - , TRequestSender(*this) - , VirtualGroupId(virtualGroupId) - , AgentInstanceId(RandomNumber<ui64>()) - , BlocksManager(CreateBlocksManager()) - , BlobMappingCache(CreateBlobMappingCache()) - { - Y_VERIFY(TGroupID(VirtualGroupId).ConfigurationType() == EGroupConfigurationType::Virtual); - } + TBlobDepotAgent(ui32 virtualGroupId); + ~TBlobDepotAgent(); #define FORWARD_STORAGE_PROXY(TYPE) fFunc(TEvBlobStorage::TYPE, HandleStorageProxy); STRICT_STFUNC(StateFunc, @@ -292,19 +284,8 @@ namespace NKikimr::NBlobDepot { // Blocks class TBlocksManager; - using TBlocksManagerPtr = std::unique_ptr<TBlocksManager, std::function<void(TBlocksManager*)>>; - TBlocksManagerPtr BlocksManager; - - TBlocksManagerPtr CreateBlocksManager(); - - NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, - ui32 *blockedGeneration = nullptr); - - ui32 GetBlockForTablet(ui64 tabletId); - - void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic issueTimestamp, TDuration timeToLive); - - void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets); + std::unique_ptr<TBlocksManager> BlocksManagerPtr; + TBlocksManager& BlocksManager; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Reading @@ -320,13 +301,8 @@ namespace NKikimr::NBlobDepot { // Blob mapping cache class TBlobMappingCache; - using TBlobMappingCachePtr = std::unique_ptr<TBlobMappingCache, std::function<void(TBlobMappingCache*)>>; - TBlobMappingCachePtr BlobMappingCache; - - TBlobMappingCachePtr CreateBlobMappingCache(); - - void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg); - const TValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context); + std::unique_ptr<TBlobMappingCache> BlobMappingCachePtr; + TBlobMappingCache& BlobMappingCache; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Status flags diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index bc6a9ee2431..479cb057bdd 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -1,111 +1,70 @@ -#include "agent_impl.h" +#include "blob_mapping_cache.h" namespace NKikimr::NBlobDepot { - class TBlobDepotAgent::TBlobMappingCache - : public TRequestSender - { - struct TQueryWaitingForKey { - TQuery* const Query; - ui64 Id; - - TQueryWaitingForKey(TQuery *query, ui64 id) - : Query(query) - , Id(id) - {} - }; - - struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> { - TStringBuf Key; - std::optional<TValueChain> Values; - bool ResolveInFlight = false; - std::list<TQueryWaitingForKey> QueriesWaitingForKey; - }; - - THashMap<TString, TCachedKeyItem> Cache; - TIntrusiveList<TCachedKeyItem> Queue; - - public: - TBlobMappingCache(TBlobDepotAgent& agent) - : TRequestSender(agent) - {} - - void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg) { - for (const auto& item : msg.GetResolvedKeys()) { - TString key = item.GetKey(); - const auto [it, inserted] = Cache.try_emplace(std::move(key)); - auto& entry = it->second; - if (inserted) { - entry.Key = it->first; - } - entry.Values.emplace(item.GetValueChain()); - Queue.PushBack(&entry); - - entry.ResolveInFlight = false; - - for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { - Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values.value()}, Agent.OtherRequestInFlight); - } - } - } - - const TValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context) { + void TBlobDepotAgent::TBlobMappingCache::HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg) { + for (const auto& item : msg.GetResolvedKeys()) { + TString key = item.GetKey(); const auto [it, inserted] = Cache.try_emplace(std::move(key)); auto& entry = it->second; if (inserted) { entry.Key = it->first; } - if (entry.Values) { - return &entry.Values.value(); - } - if (!entry.ResolveInFlight) { - entry.ResolveInFlight = true; + entry.Values.emplace(item.GetValueChain()); + Queue.PushBack(&entry); - NKikimrBlobDepot::TEvResolve msg; - auto *item = msg.AddItems(); - item->SetBeginningKey(it->first); - item->SetEndingKey(it->first); - item->SetIncludeEnding(true); - Agent.Issue(std::move(msg), this, nullptr); - } - - const ui64 id = Agent.NextRequestId++; - auto queryIt = entry.QueriesWaitingForKey.emplace(entry.QueriesWaitingForKey.end(), query, id); - auto cancelCallback = [&entry, queryIt] { - entry.QueriesWaitingForKey.erase(queryIt); - }; - Agent.RegisterRequest(id, query, std::move(context), std::move(cancelCallback), false); + entry.ResolveInFlight = false; - return nullptr; + for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { + Agent.OnRequestComplete(item.Id, TKeyResolved{&entry.Values.value()}, Agent.OtherRequestInFlight); + } } + } - void ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response) { - if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { - HandleResolveResult((*p)->Record); - } else if (std::holds_alternative<TTabletDisconnected>(response)) { - for (auto& [key, entry] : Cache) { - if (entry.ResolveInFlight) { - for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { - Agent.OnRequestComplete(item.Id, response, Agent.OtherRequestInFlight); - } - } - } - } else { - Y_FAIL(); - } + const TValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context) { + const auto [it, inserted] = Cache.try_emplace(std::move(key)); + auto& entry = it->second; + if (inserted) { + entry.Key = it->first; + } + if (entry.Values) { + return &entry.Values.value(); + } + if (!entry.ResolveInFlight) { + entry.ResolveInFlight = true; + + NKikimrBlobDepot::TEvResolve msg; + auto *item = msg.AddItems(); + item->SetBeginningKey(it->first); + item->SetEndingKey(it->first); + item->SetIncludeEnding(true); + Agent.Issue(std::move(msg), this, nullptr); } - }; - TBlobDepotAgent::TBlobMappingCachePtr TBlobDepotAgent::CreateBlobMappingCache() { - return {new TBlobMappingCache{*this}, std::default_delete<TBlobMappingCache>{}}; - } + const ui64 id = Agent.NextRequestId++; + auto queryIt = entry.QueriesWaitingForKey.emplace(entry.QueriesWaitingForKey.end(), query, id); + auto cancelCallback = [&entry, queryIt] { + entry.QueriesWaitingForKey.erase(queryIt); + }; + Agent.RegisterRequest(id, query, std::move(context), std::move(cancelCallback), false); - void TBlobDepotAgent::HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg) { - BlobMappingCache->HandleResolveResult(msg); + return nullptr; } - const TValueChain *TBlobDepotAgent::ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context) { - return BlobMappingCache->ResolveKey(std::move(key), query, std::move(context)); + void TBlobDepotAgent::TBlobMappingCache::ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response) { + if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { + HandleResolveResult((*p)->Record); + } else if (std::holds_alternative<TTabletDisconnected>(response)) { + for (auto& [key, entry] : Cache) { + if (entry.ResolveInFlight) { + for (TQueryWaitingForKey& item : std::exchange(entry.QueriesWaitingForKey, {})) { + Agent.OnRequestComplete(item.Id, response, Agent.OtherRequestInFlight); + } + } + } + } else { + Y_FAIL(); + } } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h new file mode 100644 index 00000000000..d4e44e7c74f --- /dev/null +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h @@ -0,0 +1,41 @@ +#pragma once + +#include "defs.h" +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepotAgent::TBlobMappingCache + : public TRequestSender + { + struct TQueryWaitingForKey { + TQuery* const Query; + ui64 Id; + + TQueryWaitingForKey(TQuery *query, ui64 id) + : Query(query) + , Id(id) + {} + }; + + struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> { + TStringBuf Key; + std::optional<TValueChain> Values; + bool ResolveInFlight = false; + std::list<TQueryWaitingForKey> QueriesWaitingForKey; + }; + + THashMap<TString, TCachedKeyItem> Cache; + TIntrusiveList<TCachedKeyItem> Queue; + + public: + TBlobMappingCache(TBlobDepotAgent& agent) + : TRequestSender(agent) + {} + + void HandleResolveResult(const NKikimrBlobDepot::TEvResolveResult& msg); + const TValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context); + void ProcessResponse(ui64 /*tag*/, TRequestContext::TPtr /*context*/, TResponse response); + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index e700e2c3d74..852c6bc24c2 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -1,150 +1,102 @@ -#include "agent_impl.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { - class TBlobDepotAgent::TBlocksManager - : public TRequestSender - { - struct TBlock { - ui32 BlockedGeneration; - TDuration TimeToLive; - TMonotonic ExpirationTimestamp; // not valid after - bool RefreshInFlight = false; - TIntrusiveList<TQuery, TPendingBlockChecks> PendingBlockChecks; - }; - - THashMap<ui64, TBlock> Blocks; - - public: - TBlocksManager(TBlobDepotAgent& agent) - : TRequestSender(agent) - {} - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - - struct TQueryBlockContext : TRequestContext { - TMonotonic Timestamp; - ui64 TabletId; - - TQueryBlockContext(TMonotonic timestamp, ui64 tabletId) - : Timestamp(timestamp) - , TabletId(tabletId) - {} - }; - - NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, - ui32 *blockedGeneration) { - NKikimrProto::EReplyStatus status = NKikimrProto::UNKNOWN; - auto& block = Blocks[tabletId]; - const TMonotonic now = TActivationContext::Monotonic(); - if (generation <= block.BlockedGeneration) { - status = NKikimrProto::RACE; - } else if (now < block.ExpirationTimestamp) { - if (blockedGeneration) { - *blockedGeneration = block.BlockedGeneration; - } - status = NKikimrProto::OK; + NKikimrProto::EReplyStatus TBlobDepotAgent::TBlocksManager::CheckBlockForTablet(ui64 tabletId, ui32 generation, + TQuery *query, ui32 *blockedGeneration) { + NKikimrProto::EReplyStatus status = NKikimrProto::UNKNOWN; + auto& block = Blocks[tabletId]; + const TMonotonic now = TActivationContext::Monotonic(); + if (generation <= block.BlockedGeneration) { + status = NKikimrProto::ALREADY; + } else if (now < block.ExpirationTimestamp) { + if (blockedGeneration) { + *blockedGeneration = block.BlockedGeneration; } - if (status != NKikimrProto::RACE && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) { - NKikimrBlobDepot::TEvQueryBlocks queryBlocks; - queryBlocks.AddTabletIds(tabletId); - Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>( - TActivationContext::Monotonic(), tabletId)); - block.RefreshInFlight = true; - block.PendingBlockChecks.PushBack(query); - } - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (QueryId, query->GetQueryId()), - (TabletId, tabletId), (Generation, generation), (Status, status), (Now, now), - (ExpirationTimestamp, block.ExpirationTimestamp)); - return status; + status = NKikimrProto::OK; } - - void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { - if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) { - Handle(std::move(context), (*p)->Record); - } else if (std::holds_alternative<TTabletDisconnected>(response)) { - auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - if (const auto it = Blocks.find(queryBlockContext.TabletId); it != Blocks.end()) { - IssueOnUpdateBlock(it->second, false); - } - } else { - Y_FAIL("unexpected response type"); - } + if (status != NKikimrProto::ALREADY && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) { + NKikimrBlobDepot::TEvQueryBlocks queryBlocks; + queryBlocks.AddTabletIds(tabletId); + Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>( + TActivationContext::Monotonic(), tabletId)); + block.RefreshInFlight = true; + block.PendingBlockChecks.PushBack(query); } + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "CheckBlockForTablet", (QueryId, query->GetQueryId()), + (TabletId, tabletId), (Generation, generation), (Status, status), (Now, now), + (ExpirationTimestamp, block.ExpirationTimestamp)); + return status; + } - void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) { + void TBlobDepotAgent::TBlocksManager::ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) { + if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) { + Handle(std::move(context), (*p)->Record); + } else if (std::holds_alternative<TTabletDisconnected>(response)) { auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), - (Msg, msg), (TabletId, queryBlockContext.TabletId)); - auto& block = Blocks[queryBlockContext.TabletId]; - Y_VERIFY(msg.BlockedGenerationsSize() == 1); - const ui32 newBlockedGeneration = msg.GetBlockedGenerations(0); - Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration); - block.BlockedGeneration = newBlockedGeneration; - block.TimeToLive = TDuration::MilliSeconds(msg.GetTimeToLiveMs()); - block.ExpirationTimestamp = queryBlockContext.Timestamp + block.TimeToLive; - block.RefreshInFlight = false; - IssueOnUpdateBlock(block, true); + if (const auto it = Blocks.find(queryBlockContext.TabletId); it != Blocks.end()) { + IssueOnUpdateBlock(it->second, false); + } + } else { + Y_FAIL("unexpected response type"); } + } - void IssueOnUpdateBlock(TBlock& block, bool success) { - TIntrusiveList<TQuery, TPendingBlockChecks> pendingBlockChecks; - pendingBlockChecks.Append(block.PendingBlockChecks); - pendingBlockChecks.ForEach([success](TQuery *query) { - query->OnUpdateBlock(success); - }); - } + void TBlobDepotAgent::TBlocksManager::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) { + auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + (Msg, msg), (TabletId, queryBlockContext.TabletId)); + auto& block = Blocks[queryBlockContext.TabletId]; + Y_VERIFY(msg.BlockedGenerationsSize() == 1); + const ui32 newBlockedGeneration = msg.GetBlockedGenerations(0); + Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration); + block.BlockedGeneration = newBlockedGeneration; + block.TimeToLive = TDuration::MilliSeconds(msg.GetTimeToLiveMs()); + block.ExpirationTimestamp = queryBlockContext.Timestamp + block.TimeToLive; + block.RefreshInFlight = false; + IssueOnUpdateBlock(block, true); + } - ui32 GetBlockForTablet(ui64 tabletId) { - const auto it = Blocks.find(tabletId); - return it != Blocks.end() ? it->second.BlockedGeneration : 0; - } + void TBlobDepotAgent::TBlocksManager::IssueOnUpdateBlock(TBlock& block, bool success) { + TIntrusiveList<TQuery, TPendingBlockChecks> pendingBlockChecks; + pendingBlockChecks.Append(block.PendingBlockChecks); + pendingBlockChecks.ForEach([success](TQuery *query) { + query->OnUpdateBlock(success); + }); + } - void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) { - auto& block = Blocks[tabletId]; - Y_VERIFY(block.BlockedGeneration <= blockedGeneration); - block.BlockedGeneration = blockedGeneration; - block.TimeToLive = timeToLive; - block.ExpirationTimestamp = timestamp + timeToLive; + std::pair<ui32, ui64> TBlobDepotAgent::TBlocksManager::GetBlockForTablet(ui64 tabletId) { + if (const auto it = Blocks.find(tabletId); it != Blocks.end()) { + const auto& record = it->second; + return {record.BlockedGeneration, record.IssuerGuid}; + } else { + return {0, 0}; } + } + + void TBlobDepotAgent::TBlocksManager::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) { + auto& block = Blocks[tabletId]; + Y_VERIFY(block.BlockedGeneration <= blockedGeneration); + block.BlockedGeneration = blockedGeneration; + block.TimeToLive = timeToLive; + block.ExpirationTimestamp = timestamp + timeToLive; + } - void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) { - for (const auto& tablet : tablets) { - if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) { - auto& block = it->second; - block.BlockedGeneration = tablet.GetBlockedGeneration(); - block.ExpirationTimestamp = TMonotonic::Zero(); - IssueOnUpdateBlock(block, true); - } + void TBlobDepotAgent::TBlocksManager::OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) { + for (const auto& tablet : tablets) { + if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) { + auto& block = it->second; + block.BlockedGeneration = tablet.GetBlockedGeneration(); + block.ExpirationTimestamp = TMonotonic::Zero(); + IssueOnUpdateBlock(block, true); } } - }; - - TBlobDepotAgent::TBlocksManagerPtr TBlobDepotAgent::CreateBlocksManager() { - return {new TBlocksManager{*this}, std::default_delete<TBlocksManager>{}}; } - + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context) { auto ev = std::make_unique<TEvBlobDepot::TEvQueryBlocks>(); msg.Swap(&ev->Record); Issue(std::move(ev), sender, std::move(context)); } - NKikimrProto::EReplyStatus TBlobDepotAgent::CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, - ui32 *blockedGeneration) { - return BlocksManager->CheckBlockForTablet(tabletId, generation, query, blockedGeneration); - } - - ui32 TBlobDepotAgent::GetBlockForTablet(ui64 tabletId) { - return BlocksManager->GetBlockForTablet(tabletId); - } - - void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) { - BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, timestamp, timeToLive); - } - - void TBlobDepotAgent::OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) { - BlocksManager->OnBlockedTablets(tablets); - } - } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/blocks.h b/ydb/core/blob_depot/agent/blocks.h new file mode 100644 index 00000000000..ddf79bac885 --- /dev/null +++ b/ydb/core/blob_depot/agent/blocks.h @@ -0,0 +1,48 @@ +#pragma once + +#include "defs.h" +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepotAgent::TBlocksManager + : public TRequestSender + { + struct TBlock { + ui32 BlockedGeneration = 0; + ui64 IssuerGuid = 0; + TDuration TimeToLive; + TMonotonic ExpirationTimestamp; // not valid after + bool RefreshInFlight = false; + TIntrusiveList<TQuery, TPendingBlockChecks> PendingBlockChecks; + }; + + THashMap<ui64, TBlock> Blocks; + + public: + TBlocksManager(TBlobDepotAgent& agent) + : TRequestSender(agent) + {} + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + struct TQueryBlockContext : TRequestContext { + TMonotonic Timestamp; + ui64 TabletId; + + TQueryBlockContext(TMonotonic timestamp, ui64 tabletId) + : Timestamp(timestamp) + , TabletId(tabletId) + {} + }; + + NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query, ui32 *blockedGeneration); + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override; + void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg); + void IssueOnUpdateBlock(TBlock& block, bool success); + std::pair<ui32, ui64> GetBlockForTablet(ui64 tabletId); + void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive); + void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets); + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 6359c4e6045..bd09eddd8a2 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -1,4 +1,5 @@ #include "agent_impl.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { @@ -153,7 +154,7 @@ namespace NKikimr::NBlobDepot { auto [response, record] = TEvBlobDepot::MakeResponseFor(*ev, SelfId()); auto& msg = ev->Get()->Record; - OnBlockedTablets(msg.GetBlockedTablets()); + BlocksManager.OnBlockedTablets(msg.GetBlockedTablets()); for (const auto& item : msg.GetInvalidatedSteps()) { const ui8 channel = item.GetChannel(); diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp index 65155971033..defebab3370 100644 --- a/ydb/core/blob_depot/agent/storage_block.cpp +++ b/ydb/core/blob_depot/agent/storage_block.cpp @@ -1,4 +1,5 @@ #include "agent_impl.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { @@ -20,16 +21,18 @@ namespace NKikimr::NBlobDepot { auto& msg = *Event->Get<TEvBlobStorage::TEvBlock>(); // lookup existing blocks to try fail-fast - const ui32 blockedGeneration = Agent.GetBlockForTablet(msg.TabletId); - if (msg.Generation <= blockedGeneration) { + const auto& [blockedGeneration, issuerGuid] = Agent.BlocksManager.GetBlockForTablet(msg.TabletId); + if (msg.Generation < blockedGeneration || (msg.Generation == blockedGeneration && ( + msg.IssuerGuid != issuerGuid || !msg.IssuerGuid || !issuerGuid))) { // we don't consider ExpirationTimestamp here because blocked generation may only increase - return EndWithError(NKikimrProto::RACE, "block race detected"); + return EndWithError(NKikimrProto::ALREADY, "block race detected"); } // issue request to the tablet NKikimrBlobDepot::TEvBlock block; block.SetTabletId(msg.TabletId); block.SetBlockedGeneration(msg.Generation); + block.SetIssuerGuid(msg.IssuerGuid); Agent.Issue(std::move(block), this, std::make_shared<TBlockContext>(TActivationContext::Monotonic())); } @@ -52,7 +55,7 @@ namespace NKikimr::NBlobDepot { // update blocks cache auto& blockContext = context->Obtain<TBlockContext>(); auto& query = *Event->Get<TEvBlobStorage::TEvBlock>(); - Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp, + Agent.BlocksManager.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp, TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs())); EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK)); } diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp index 0d29a5272ed..0b0b6f27ee0 100644 --- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -1,4 +1,5 @@ #include "agent_impl.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { @@ -22,7 +23,7 @@ namespace NKikimr::NBlobDepot { NumKeep = msg.Keep ? msg.Keep->size() : 0; NumDoNotKeep = msg.DoNotKeep ? msg.DoNotKeep->size() : 0; - const auto status = Agent.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this); + const auto status = Agent.BlocksManager.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this, nullptr); if (status == NKikimrProto::OK) { IssueCollectGarbage(); } else if (status != NKikimrProto::UNKNOWN) { diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 5214042b837..5dfeafb4bd6 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -1,4 +1,6 @@ #include "agent_impl.h" +#include "blocks.h" +#include "blob_mapping_cache.h" namespace NKikimr::NBlobDepot { @@ -29,7 +31,7 @@ namespace NKikimr::NBlobDepot { IssueResolve(); if (msg.DiscoverBlockedGeneration) { - const auto status = Agent.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); + const auto status = Agent.BlocksManager.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); if (status == NKikimrProto::OK) { DoneWithBlockedGeneration = true; } else if (status != NKikimrProto::UNKNOWN) { @@ -77,7 +79,7 @@ namespace NKikimr::NBlobDepot { return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); } - const auto status = Agent.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); + const auto status = Agent.BlocksManager.CheckBlockForTablet(TabletId, Max<ui32>(), this, &BlockedGeneration); if (status == NKikimrProto::OK) { DoneWithBlockedGeneration = true; CheckIfDone(); @@ -92,7 +94,7 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Msg, msg.Record)); - Agent.HandleResolveResult(msg.Record); + Agent.BlobMappingCache.HandleResolveResult(msg.Record); const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); if (status != NKikimrProto::OK && status != NKikimrProto::OVERRUN) { @@ -130,6 +132,9 @@ namespace NKikimr::NBlobDepot { } void OnRead(ui64 /*tag*/, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA18, "OnRead", (VirtualGroupId, Agent.VirtualGroupId), + (QueryId, QueryId), (Status, status)); + if (status == NKikimrProto::OK) { Buffer = std::move(dataOrErrorReason); DoneWithData = true; diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index cce0d2bf09b..65882d78809 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -1,4 +1,5 @@ #include "agent_impl.h" +#include "blob_mapping_cache.h" namespace NKikimr::NBlobDepot { @@ -29,7 +30,8 @@ namespace NKikimr::NBlobDepot { for (ui32 i = 0; i < msg.QuerySize; ++i) { auto& query = msg.Queries[i]; TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64)); - if (const TValueChain *value = Agent.ResolveKey(blobId, this, std::make_shared<TResolveKeyContext>(i))) { + if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, + std::make_shared<TResolveKeyContext>(i))) { ProcessSingleResult(i, value); } diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp index fa98c2e5845..8a66913fbb0 100644 --- a/ydb/core/blob_depot/agent/storage_put.cpp +++ b/ydb/core/blob_depot/agent/storage_put.cpp @@ -1,4 +1,5 @@ #include "agent_impl.h" +#include "blocks.h" namespace NKikimr::NBlobDepot { @@ -24,7 +25,7 @@ namespace NKikimr::NBlobDepot { auto& msg = *Event->Get<TEvBlobStorage::TEvPut>(); // first step -- check blocks - const auto status = Agent.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this); + const auto status = Agent.BlocksManager.CheckBlockForTablet(msg.Id.TabletID(), msg.Id.Generation(), this, nullptr); if (status == NKikimrProto::OK) { IssuePuts(); } else if (status != NKikimrProto::UNKNOWN) { diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index f1c8245c4d3..0c7107ee1be 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -7,16 +7,18 @@ namespace NKikimr::NBlobDepot { const ui64 TabletId; const ui32 BlockedGeneration; const ui32 NodeId; + const ui64 IssuerGuid; const TInstant Timestamp; std::unique_ptr<IEventHandle> Response; public: - TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp, - std::unique_ptr<IEventHandle> response) + TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid, + TInstant timestamp, std::unique_ptr<IEventHandle> response) : TTransactionBase(self) , TabletId(tabletId) , BlockedGeneration(blockedGeneration) , NodeId(nodeId) + , IssuerGuid(issuerGuid) , Timestamp(timestamp) , Response(std::move(response)) {} @@ -29,11 +31,13 @@ namespace NKikimr::NBlobDepot { // update block value in memory auto& block = Self->BlocksManager->Blocks[TabletId]; block.BlockedGeneration = BlockedGeneration; + block.IssuerGuid = IssuerGuid; // and persist it NIceDb::TNiceDb db(txc.DB); db.Table<Schema::Blocks>().Key(TabletId).Update( NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(BlockedGeneration), + NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(IssuerGuid), NIceDb::TUpdate<Schema::Blocks::IssuedByNode>(NodeId), NIceDb::TUpdate<Schema::Blocks::IssueTimestamp>(Timestamp) ); @@ -45,7 +49,7 @@ namespace NKikimr::NBlobDepot { if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) { TActivationContext::Send(Response.release()); } else { - Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response)); + Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, IssuerGuid, std::move(Response)); } } }; @@ -55,19 +59,20 @@ namespace NKikimr::NBlobDepot { const ui64 TabletId; const ui32 BlockedGeneration; const ui32 NodeId; + const ui64 IssuerGuid; std::unique_ptr<IEventHandle> Response; ui32 BlocksPending = 0; ui32 RetryCount = 0; - const ui64 IssuerGuid = RandomNumber<ui64>() | 1; THashSet<ui32> NodesWaitingForPushResult; public: - TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, + TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid, std::unique_ptr<IEventHandle> response) : Self(self) , TabletId(tabletId) , BlockedGeneration(blockedGeneration) , NodeId(nodeId) + , IssuerGuid(issuerGuid) , Response(std::move(response)) {} @@ -181,12 +186,15 @@ namespace NKikimr::NBlobDepot { ) }; - void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation) { - Blocks[tabletId].BlockedGeneration = generation; + void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation, ui64 issuerGuid) { + auto& block = Blocks[tabletId]; + block.BlockedGeneration = generation; + block.IssuerGuid = issuerGuid; } - void TBlobDepot::TBlocksManager::OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) { - Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId, + void TBlobDepot::TBlocksManager::OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid, + std::unique_ptr<IEventHandle> response) { + Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId, issuerGuid, std::move(response))); } @@ -201,12 +209,14 @@ namespace NKikimr::NBlobDepot { } else { const ui64 tabletId = record.GetTabletId(); const ui32 blockedGeneration = record.GetBlockedGeneration(); - if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) { - responseRecord->SetStatus(NKikimrProto::ALREADY); - } else { + const ui64 issuerGuid = record.GetIssuerGuid(); + const auto it = Blocks.find(tabletId); + if (it == Blocks.end() || it->second.CanSetNewBlock(blockedGeneration, issuerGuid)) { TAgent& agent = Self->GetAgent(ev->Recipient); Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration, - agent.ConnectedNodeId, TActivationContext::Now(), std::move(response))); + agent.ConnectedNodeId, record.GetIssuerGuid(), TActivationContext::Now(), std::move(response))); + } else { + responseRecord->SetStatus(NKikimrProto::ALREADY); } } diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h index 244b7c9ae43..0d26393dfaa 100644 --- a/ydb/core/blob_depot/blocks.h +++ b/ydb/core/blob_depot/blocks.h @@ -18,7 +18,13 @@ namespace NKikimr::NBlobDepot { }; ui32 BlockedGeneration = 0; + ui64 IssuerGuid = 0; THashMap<ui32, TPerAgentInfo> PerAgentInfo; + + bool CanSetNewBlock(ui32 blockedGeneration, ui64 issuerGuid) const { + return BlockedGeneration < blockedGeneration || (BlockedGeneration == blockedGeneration && + (IssuerGuid == issuerGuid && IssuerGuid && issuerGuid)); + } }; TBlobDepot* const Self; @@ -33,8 +39,9 @@ namespace NKikimr::NBlobDepot { : Self(self) {} - void AddBlockOnLoad(ui64 tabletId, ui32 generation); - void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response); + void AddBlockOnLoad(ui64 tabletId, ui32 generation, ui64 issuerGuid); + void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid, + std::unique_ptr<IEventHandle> response); void Handle(TEvBlobDepot::TEvBlock::TPtr ev); void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index ce20efa2ec5..3203d8800c9 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -59,7 +59,7 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult); - BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration); + BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration, IssuerGuid); BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason, TimeToLiveMs); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult); diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index d2db397b991..c15641b18d3 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -46,7 +46,8 @@ namespace NKikimr::NBlobDepot { while (table.IsValid()) { Self->BlocksManager->AddBlockOnLoad( table.GetValue<Schema::Blocks::TabletId>(), - table.GetValue<Schema::Blocks::BlockedGeneration>() + table.GetValue<Schema::Blocks::BlockedGeneration>(), + table.GetValue<Schema::Blocks::IssuerGuid>() ); if (!table.Next()) { return false; diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h index beeba0585d0..057f91cad17 100644 --- a/ydb/core/blob_depot/schema.h +++ b/ydb/core/blob_depot/schema.h @@ -23,13 +23,15 @@ namespace NKikimr::NBlobDepot { struct Blocks : Table<2> { struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {}; struct BlockedGeneration : Column<2, NScheme::NTypeIds::Uint32> {}; - struct IssueTimestamp : Column<3, NScheme::NTypeIds::Uint64> { using Type = TInstant; }; - struct IssuedByNode : Column<4, NScheme::NTypeIds::Uint32> {}; + struct IssuerGuid : Column<3, NScheme::NTypeIds::Uint64> {}; + struct IssueTimestamp : Column<4, NScheme::NTypeIds::Uint64> { using Type = TInstant; }; + struct IssuedByNode : Column<5, NScheme::NTypeIds::Uint32> {}; using TKey = TableKey<TabletId>; using TColumns = TableColumns< TabletId, BlockedGeneration, + IssuerGuid, IssueTimestamp, IssuedByNode >; diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 15f9c568f51..6a911514b21 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -91,6 +91,7 @@ message TEvAllocateIdsResult { message TEvBlock { optional fixed64 TabletId = 1; optional uint32 BlockedGeneration = 2; + optional uint64 IssuerGuid = 3; } message TEvBlockResult { |