diff options
author | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-28 16:34:28 +0300 |
---|---|---|
committer | Alexander Rutkovsky <alexander.rutkovsky@gmail.com> | 2022-06-28 16:34:28 +0300 |
commit | 247f5b950d58304bd60e296b4104780b995be97d (patch) | |
tree | ef696879621720bde24a7cef80396323dc6b0b10 | |
parent | 00850ef7df561dfea6516f639b432ebced561983 (diff) | |
download | ydb-247f5b950d58304bd60e296b4104780b995be97d.tar.gz |
BlobDepot work in progress KIKIMR-14867
ref:ee7ea658a40cb124bab5002c4ec850ea395750bc
-rw-r--r-- | ydb/core/blob_depot/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/agent_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/blocks.cpp | 35 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/comm.cpp | 47 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/garbage.cpp | 11 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/query.cpp | 6 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/request.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_collect_garbage.cpp | 73 | ||||
-rw-r--r-- | ydb/core/blob_depot/agent/storage_discover.cpp | 4 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_agent.cpp | 14 | ||||
-rw-r--r-- | ydb/core/blob_depot/blob_depot_tablet.h | 60 | ||||
-rw-r--r-- | ydb/core/blob_depot/blocks.cpp | 44 | ||||
-rw-r--r-- | ydb/core/blob_depot/events.h | 5 | ||||
-rw-r--r-- | ydb/core/blob_depot/garbage_collection.cpp | 60 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_apply_config.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_load.cpp | 2 | ||||
-rw-r--r-- | ydb/core/blob_depot/op_resolve.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/blob_depot.proto | 17 |
19 files changed, 283 insertions, 113 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index e0edc850baf..b603df3fb02 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -18,6 +18,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot_agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt index 29aca02aa67..1a57dd02a7d 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.txt @@ -18,6 +18,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/blocks.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/comm.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/garbage.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index ff0f9848e55..95620605c6d 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -48,6 +48,7 @@ namespace NKikimr::NBlobDepot { TEvBlobDepot::TEvAllocateIdsResult*, TEvBlobDepot::TEvBlockResult*, TEvBlobDepot::TEvQueryBlocksResult*, + TEvBlobDepot::TEvCollectGarbageResult*, TEvBlobDepot::TEvCommitBlobSeqResult*, TEvBlobDepot::TEvResolveResult*, @@ -159,12 +160,13 @@ namespace NKikimr::NBlobDepot { void OnDisconnect(); void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override; - void HandleRegisterAgentResult(TRequestContext::TPtr context, TEvBlobDepot::TEvRegisterAgentResult& msg); - void HandleAllocateIdsResult(TRequestContext::TPtr context, TEvBlobDepot::TEvAllocateIdsResult& msg); + void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvRegisterAgentResult& msg); + void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg); void Issue(NKikimrBlobDepot::TEvBlock msg, TRequestSender *sender, TRequestContext::TPtr context); void Issue(NKikimrBlobDepot::TEvResolve msg, TRequestSender *sender, TRequestContext::TPtr context); void Issue(NKikimrBlobDepot::TEvQueryBlocks msg, TRequestSender *sender, TRequestContext::TPtr context); + void Issue(NKikimrBlobDepot::TEvCollectGarbage msg, TRequestSender *sender, TRequestContext::TPtr context); void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context); diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp index 86b49bc342f..4e11044a5c7 100644 --- a/ydb/core/blob_depot/agent/blocks.cpp +++ b/ydb/core/blob_depot/agent/blocks.cpp @@ -54,27 +54,36 @@ namespace NKikimr::NBlobDepot { } void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { - auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); - auto& block = Blocks[queryBlockContext.TabletId]; - if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) { - auto& msg = **p; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC08, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), - (Msg, msg.Record), (TabletId, queryBlockContext.TabletId)); - Y_VERIFY(msg.Record.BlockedGenerationsSize() == 1); - const ui32 newBlockedGeneration = msg.Record.GetBlockedGenerations(0); - Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration); - block.BlockedGeneration = newBlockedGeneration; - block.ExpirationTimestamp = queryBlockContext.Timestamp + TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()); + Handle(std::move(context), (*p)->Record); + } else if (std::holds_alternative<TTabletDisconnected>(response)) { + IssueOnUpdateBlock(context, false); } else { - Y_VERIFY(std::holds_alternative<TTabletDisconnected>(response)); + Y_FAIL("unexpected response type"); } + } + + void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg) { + auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA01, "TEvQueryBlocksResult", (VirtualGroupId, Agent.VirtualGroupId), + (Msg, msg), (TabletId, queryBlockContext.TabletId)); + auto& block = Blocks[queryBlockContext.TabletId]; + Y_VERIFY(msg.BlockedGenerationsSize() == 1); + const ui32 newBlockedGeneration = msg.GetBlockedGenerations(0); + Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration); + block.BlockedGeneration = newBlockedGeneration; + block.ExpirationTimestamp = queryBlockContext.Timestamp + TDuration::MilliSeconds(msg.GetTimeToLiveMs()); + IssueOnUpdateBlock(context, true); + } + void IssueOnUpdateBlock(const TRequestContext::TPtr& context, bool success) { + auto& queryBlockContext = context->Obtain<TQueryBlockContext>(); + auto& block = Blocks[queryBlockContext.TabletId]; TIntrusiveList<TQuery, TPendingBlockChecks> temp; temp.Swap(block.PendingBlockChecks); for (auto it = temp.begin(); it != temp.end(); ) { const auto current = it++; - current->OnUpdateBlock(!std::holds_alternative<TTabletDisconnected>(response)); + current->OnUpdateBlock(success); } } diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp index 480504db430..6ed071777cd 100644 --- a/ydb/core/blob_depot/agent/comm.cpp +++ b/ydb/core/blob_depot/agent/comm.cpp @@ -3,12 +3,12 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC01, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "TEvClientConnected", (VirtualGroupId, VirtualGroupId), (Msg, ev->Get()->ToString())); } void TBlobDepotAgent::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC02, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA03, "TEvClientDestroyed", (VirtualGroupId, VirtualGroupId), (Msg, ev->Get()->ToString())); PipeId = {}; OnDisconnect(); @@ -24,12 +24,12 @@ namespace NKikimr::NBlobDepot { IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::Log); } - void TBlobDepotAgent::HandleRegisterAgentResult(TRequestContext::TPtr /*context*/, TEvBlobDepot::TEvRegisterAgentResult& msg) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC06, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg.Record)); + void TBlobDepotAgent::Handle(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvRegisterAgentResult& msg) { + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA04, "TEvRegisterAgentResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg)); Registered = true; - BlobDepotGeneration = msg.Record.GetGeneration(); - for (const auto& kind : msg.Record.GetChannelKinds()) { + BlobDepotGeneration = msg.GetGeneration(); + for (const auto& kind : msg.GetChannelKinds()) { auto& v = ChannelKinds[kind.GetChannelKind()]; v.ChannelGroups.clear(); v.IndexToChannel.clear(); @@ -46,7 +46,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::IssueAllocateIdsIfNeeded(NKikimrBlobDepot::TChannelKind::E channelKind) { auto& kind = ChannelKinds[channelKind]; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC09, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA05, "IssueAllocateIdsIfNeeded", (VirtualGroupId, VirtualGroupId), (ChannelKind, NKikimrBlobDepot::TChannelKind::E_Name(channelKind)), (IdAllocInFlight, kind.IdAllocInFlight), (IdQ.size, kind.IdQ.size()), (PreallocatedIdCount, kind.PreallocatedIdCount), (PipeId, PipeId)); @@ -58,20 +58,20 @@ namespace NKikimr::NBlobDepot { } } - void TBlobDepotAgent::HandleAllocateIdsResult(TRequestContext::TPtr context, TEvBlobDepot::TEvAllocateIdsResult& msg) { + void TBlobDepotAgent::Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvAllocateIdsResult& msg) { auto& allocateIdsContext = context->Obtain<TAllocateIdsContext>(); auto& kind = ChannelKinds[allocateIdsContext.ChannelKind]; Y_VERIFY(kind.IdAllocInFlight); kind.IdAllocInFlight = false; - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC07, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), - (Msg, msg.Record)); - Y_VERIFY(msg.Record.GetChannelKind() == allocateIdsContext.ChannelKind); - Y_VERIFY(msg.Record.GetGeneration() == BlobDepotGeneration); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA06, "TEvAllocateIdsResult", (VirtualGroupId, VirtualGroupId), + (Msg, msg)); + Y_VERIFY(msg.GetChannelKind() == allocateIdsContext.ChannelKind); + Y_VERIFY(msg.GetGeneration() == BlobDepotGeneration); - if (msg.Record.HasRangeBegin() && msg.Record.HasRangeEnd()) { - kind.IdQ.push_back({BlobDepotGeneration, msg.Record.GetRangeBegin(), msg.Record.GetRangeEnd()}); + if (msg.HasRangeBegin() && msg.HasRangeEnd()) { + kind.IdQ.push_back({BlobDepotGeneration, msg.GetRangeBegin(), msg.GetRangeEnd()}); // FIXME notify waiting requests about new ids @@ -83,7 +83,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::OnDisconnect() { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDAC04, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA07, "OnDisconnect", (VirtualGroupId, VirtualGroupId)); for (auto& [id, sender] : std::exchange(TabletRequestInFlight, {})) { sender->OnRequestComplete(id, TTabletDisconnected{}); @@ -97,14 +97,13 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) { - std::visit([&](auto&& item) { - using T = std::decay_t<decltype(item)>; - if constexpr (std::is_same_v<T, TEvBlobDepot::TEvRegisterAgentResult*>) { - HandleRegisterAgentResult(std::move(context), *item); - } else if constexpr (std::is_same_v<T, TEvBlobDepot::TEvAllocateIdsResult*>) { - HandleAllocateIdsResult(std::move(context), *item); + std::visit([&](auto&& response) { + using T = std::decay_t<decltype(response)>; + if constexpr (std::is_same_v<T, TEvBlobDepot::TEvRegisterAgentResult*> + || std::is_same_v<T, TEvBlobDepot::TEvAllocateIdsResult*>) { + Handle(std::move(context), response->Record); } else if constexpr (!std::is_same_v<T, TTabletDisconnected>) { - Y_FAIL(); + Y_FAIL_S("unexpected response received Type# " << TypeName<T>()); } }, response); } @@ -123,7 +122,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepotAgent::Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context) { const ui64 id = NextRequestId++; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDAC03, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA08, "Issue", (VirtualGroupId, VirtualGroupId), (Id, id), (Msg, ev->ToString())); NTabletPipe::SendData(SelfId(), PipeId, ev.release(), id); RegisterRequest(id, sender, std::move(context), true); } diff --git a/ydb/core/blob_depot/agent/garbage.cpp b/ydb/core/blob_depot/agent/garbage.cpp new file mode 100644 index 00000000000..27178294729 --- /dev/null +++ b/ydb/core/blob_depot/agent/garbage.cpp @@ -0,0 +1,11 @@ +#include "agent_impl.h" + +namespace NKikimr::NBlobDepot { + + void TBlobDepotAgent::Issue(NKikimrBlobDepot::TEvCollectGarbage msg, TRequestSender *sender, TRequestContext::TPtr context) { + auto ev = std::make_unique<TEvBlobDepot::TEvCollectGarbage>(); + msg.Swap(&ev->Record); + Issue(std::move(ev), sender, std::move(context)); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/query.cpp b/ydb/core/blob_depot/agent/query.cpp index 533a793077e..0eb0e862ce6 100644 --- a/ydb/core/blob_depot/agent/query.cpp +++ b/ydb/core/blob_depot/agent/query.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot { PendingEventQ.emplace_back(ev.Release()); } else { auto *query = CreateQuery(ev); - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "new query", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA09, "new query", (VirtualGroupId, VirtualGroupId), (QueryId, query->GetQueryId()), (Name, query->GetName())); if (!TabletId) { query->EndWithError(NKikimrProto::ERROR, "group is in error state"); @@ -30,7 +30,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithError(NKikimrProto::EReplyStatus status, const TString& errorReason) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA10, "query ends with error", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Status, status), (ErrorReason, errorReason)); std::unique_ptr<IEventBase> response; @@ -49,7 +49,7 @@ namespace NKikimr::NBlobDepot { } void TBlobDepotAgent::TQuery::EndWithSuccess(std::unique_ptr<IEventBase> response) { - STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA02, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_INFO, BLOB_DEPOT_AGENT, BDA11, "query ends with success", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Response, response->ToString())); Agent.SelfId().Send(Event->Sender, response.release(), 0, Event->Cookie); delete this; diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp index 46639c9b379..32266410f6f 100644 --- a/ydb/core/blob_depot/agent/request.cpp +++ b/ydb/core/blob_depot/agent/request.cpp @@ -43,7 +43,7 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleTabletResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA01, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA12, "HandleTabletResponse", (VirtualGroupId, VirtualGroupId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); auto *event = ev->Get(); HandleResponse(reinterpret_cast<TAutoPtr<IEventHandle>&>(ev), event, TabletRequestInFlight); @@ -58,7 +58,7 @@ namespace NKikimr::NBlobDepot { template<typename TEvent> void TBlobDepotAgent::HandleOtherResponse(TAutoPtr<TEventHandle<TEvent>> ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA02, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA13, "HandleOtherResponse", (VirtualGroupId, VirtualGroupId), (Id, ev->Cookie), (Type, TypeName<TEvent>())); auto *event = ev->Get(); HandleResponse(ev, event, OtherRequestInFlight); diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp index eeaa7de4368..fdac21f6ea2 100644 --- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp +++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp @@ -6,6 +6,12 @@ namespace NKikimr::NBlobDepot { TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvCollectGarbage>(std::unique_ptr<IEventHandle> ev) { class TCollectGarbageQuery : public TQuery { ui32 BlockChecksRemain = 3; + ui32 KeepIndex = 0; + ui32 NumKeep; + ui32 DoNotKeepIndex = 0; + ui32 NumDoNotKeep; + ui32 CounterShift = 0; + bool IsLast; public: using TQuery::TQuery; @@ -13,7 +19,11 @@ namespace NKikimr::NBlobDepot { void Initiate() override { auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); + NumKeep = msg.Keep ? msg.Keep->size() : 0; + NumDoNotKeep = msg.DoNotKeep ? msg.DoNotKeep->size() : 0; + const auto status = Agent.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (Status, status)); if (status == NKikimrProto::OK) { IssueCollectGarbage(); } else if (status != NKikimrProto::UNKNOWN) { @@ -24,11 +34,68 @@ namespace NKikimr::NBlobDepot { } void IssueCollectGarbage() { + auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); + NKikimrBlobDepot::TEvCollectGarbage record; + + ui32 numItemsIssued = 0; + + for (; KeepIndex < NumKeep && numItemsIssued < MaxCollectGarbageFlagsPerMessage; ++KeepIndex) { + LogoBlobIDFromLogoBlobID((*msg.Keep)[KeepIndex], record.AddKeep()); + ++numItemsIssued; + } + for (; DoNotKeepIndex < NumDoNotKeep && numItemsIssued < MaxCollectGarbageFlagsPerMessage; ++DoNotKeepIndex) { + LogoBlobIDFromLogoBlobID((*msg.DoNotKeep)[DoNotKeepIndex], record.AddDoNotKeep()); + ++numItemsIssued; + } + + IsLast = KeepIndex == NumKeep && DoNotKeepIndex == NumDoNotKeep; + + record.SetTabletId(msg.TabletId); + record.SetGeneration(msg.RecordGeneration); + record.SetPerGenerationCounter(msg.PerGenerationCounter + CounterShift); + record.SetChannel(msg.Channel); + + if (msg.Collect && IsLast) { + record.SetHard(msg.Hard); + record.SetCollectGeneration(msg.CollectGeneration); + record.SetCollectStep(msg.CollectStep); + } + + Agent.Issue(std::move(record), this, nullptr); + + ++CounterShift; + } + + void OnUpdateBlock(bool success) override { + if (success) { + Initiate(); + } else { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } } - void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override { - (void)response; - Y_FAIL(); + void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { + if (std::holds_alternative<TTabletDisconnected>(response)) { + EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); + } else if (auto *p = std::get_if<TEvBlobDepot::TEvCollectGarbageResult*>(&response)) { + HandleCollectGarbageResult(std::move(context), (*p)->Record); + } else { + Y_FAIL(); + } + } + + void HandleCollectGarbageResult(TRequestContext::TPtr /*context*/, NKikimrBlobDepot::TEvCollectGarbageResult& msg) { + if (!msg.HasStatus()) { + EndWithError(NKikimrProto::ERROR, "incorrect TEvCollectGarbageResult protobuf"); + } else if (const auto status = msg.GetStatus(); status != NKikimrProto::OK) { + EndWithError(status, msg.GetErrorReason()); + } else if (IsLast) { + auto& msg = *Event->Get<TEvBlobStorage::TEvCollectGarbage>(); + EndWithSuccess(std::make_unique<TEvBlobStorage::TEvCollectGarbageResult>(NKikimrProto::OK, + msg.TabletId, msg.RecordGeneration, msg.PerGenerationCounter, msg.Channel)); + } else { + IssueCollectGarbage(); + } } }; diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 61e22a3c578..45b8d6f34ad 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -68,7 +68,7 @@ namespace NKikimr::NBlobDepot { } void OnUpdateBlock(bool success) override { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD02, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA14, "OnUpdateBlock", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Success, success)); if (!success) { @@ -87,7 +87,7 @@ namespace NKikimr::NBlobDepot { } void HandleResolveResult(ui64 id, TRequestContext::TPtr context, TEvBlobDepot::TEvResolveResult& msg) { - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDASD01, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA15, "HandleResolveResult", (VirtualGroupId, Agent.VirtualGroupId), (QueryId, QueryId), (Msg, msg.Record)); const NKikimrProto::EReplyStatus status = msg.Record.GetStatus(); diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/blob_depot_agent.cpp index 6c2b769d338..22848df7e04 100644 --- a/ydb/core/blob_depot/blob_depot_agent.cpp +++ b/ydb/core/blob_depot/blob_depot_agent.cpp @@ -3,13 +3,13 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvTabletPipe::TEvServerConnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BD01, "TEvServerConnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvServerConnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); const auto [it, inserted] = PipeServerToNode.emplace(ev->Get()->ServerId, std::nullopt); Y_VERIFY(inserted); } void TBlobDepot::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BD02, "TEvServerDisconnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "TEvServerDisconnected", (TabletId, TabletID()), (Msg, ev->Get()->ToString())); const auto it = PipeServerToNode.find(ev->Get()->ServerId); Y_VERIFY(it != PipeServerToNode.end()); if (const auto& nodeId = it->second) { @@ -25,8 +25,7 @@ namespace NKikimr::NBlobDepot { PipeServerToNode.erase(it); } - void TBlobDepot::OnAgentDisconnect(TAgentInfo& agent) { - BlocksManager.OnAgentDisconnect(agent); + void TBlobDepot::OnAgentDisconnect(TAgentInfo& /*agent*/) { } void TBlobDepot::Handle(TEvBlobDepot::TEvRegisterAgent::TPtr ev) { @@ -36,7 +35,7 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(!it->second || *it->second == nodeId); it->second = nodeId; auto& agent = Agents[nodeId]; - STLOG(PRI_DEBUG, BLOB_DEPOT, BD03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, ev->Get()->Record), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "TEvRegisterAgent", (TabletId, TabletID()), (Msg, ev->Get()->Record), (NodeId, nodeId), (PipeServerId, it->first)); agent.ConnectedAgent = it->first; agent.ConnectedNodeId = nodeId; @@ -59,12 +58,11 @@ namespace NKikimr::NBlobDepot { TActivationContext::Send(response.release()); } - void TBlobDepot::OnAgentConnect(TAgentInfo& agent) { - BlocksManager.OnAgentConnect(agent); + void TBlobDepot::OnAgentConnect(TAgentInfo& /*agent*/) { } void TBlobDepot::Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BD04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT04, "TEvAllocateIds", (TabletId, TabletID()), (Msg, ev->Get()->Record), (PipeServerId, ev->Recipient)); auto [response, record] = TEvBlobDepot::MakeResponseFor(ev, SelfId(), ev->Get()->Record.GetChannelKind(), diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index d7b14fed2ae..e51876592d6 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -22,11 +22,12 @@ namespace NKikimr::NBlobDepot { TBlobDepot(TActorId tablet, TTabletStorageInfo *info) : TActor(&TThis::StateInit) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) - , BlocksManager(this) + , BlocksManager(CreateBlocksManager()) + , GarbageCollectionManager(CreateGarbageCollectionManager()) {} void HandlePoison() { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT02, "HandlePoison", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT09, "HandlePoison", (TabletId, TabletID())); Become(&TThis::StateZombie); Send(Tablet(), new TEvents::TEvPoison); } @@ -70,7 +71,7 @@ namespace NKikimr::NBlobDepot { } void OnActivateExecutor(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT03, "OnActivateExecutor", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "OnActivateExecutor", (TabletId, TabletID())); ExecuteTxInitSchema(); @@ -81,14 +82,14 @@ namespace NKikimr::NBlobDepot { } void OnDetach(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT04, "OnDetach", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT11, "OnDetach", (TabletId, TabletID())); // TODO: what does this callback mean PassAway(); } void OnTabletDead(TEvTablet::TEvTabletDead::TPtr& /*ev*/, const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT05, "OnTabletDead", (TabletId, TabletID())); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT12, "OnTabletDead", (TabletId, TabletID())); PassAway(); } @@ -118,8 +119,10 @@ namespace NKikimr::NBlobDepot { hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle); hFunc(TEvBlobDepot::TEvResolve, Handle); - hFunc(TEvBlobDepot::TEvBlock, BlocksManager.Handle); - hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager.Handle); + hFunc(TEvBlobDepot::TEvBlock, Handle); + hFunc(TEvBlobDepot::TEvQueryBlocks, Handle); + + hFunc(TEvBlobDepot::TEvCollectGarbage, Handle); hFunc(TEvTabletPipe::TEvServerConnected, Handle); hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); @@ -157,30 +160,43 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Blocks - class TBlocksManager { - class TImpl; - std::unique_ptr<TImpl> Impl; + class TBlocksManager; + struct TBlocksManagerDeleter { void operator ()(TBlocksManager *object) const; }; + using TBlocksManagerPtr = std::unique_ptr<TBlocksManager, TBlocksManagerDeleter>; + TBlocksManagerPtr BlocksManager; - public: - TBlocksManager(TBlobDepot *self); - ~TBlocksManager(); - void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration); - void OnAgentConnect(TAgentInfo& agent); - void OnAgentDisconnect(TAgentInfo& agent); + TBlocksManagerPtr CreateBlocksManager(); - void Handle(TEvBlobDepot::TEvBlock::TPtr ev); - void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); - }; + void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration); + + void Handle(TEvBlobDepot::TEvBlock::TPtr ev); + void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev); + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Garbage collection + + class TGarbageCollectionManager; + struct TGarbageCollectionManagerDeleter { void operator ()(TGarbageCollectionManager *object) const; }; + using TGarbageCollectionManagerPtr = std::unique_ptr<TGarbageCollectionManager, TGarbageCollectionManagerDeleter>; + TGarbageCollectionManagerPtr GarbageCollectionManager; + + TGarbageCollectionManagerPtr CreateGarbageCollectionManager(); - TBlocksManager BlocksManager; + void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Key operation - struct TKeyValue { + struct TDataValue { + TString Meta; + TCGSI Location; + ui32 Checksum; + ui64 TotalDataLen; + EKeepState KeepState; + bool Public; }; - std::map<TString, TKeyValue> Data; + std::map<TString, TDataValue> Data; void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev); diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp index 2e36788569e..512c1f1b3ef 100644 --- a/ydb/core/blob_depot/blocks.cpp +++ b/ydb/core/blob_depot/blocks.cpp @@ -3,7 +3,7 @@ namespace NKikimr::NBlobDepot { - class TBlobDepot::TBlocksManager::TImpl { + class TBlobDepot::TBlocksManager { TBlobDepot *Self; THashMap<ui64, ui32> Blocks; @@ -28,8 +28,7 @@ namespace NKikimr::NBlobDepot { {} bool Execute(TTransactionContext& txc, const TActorContext&) override { - TImpl& impl = *Self->BlocksManager.Impl; - const auto [it, inserted] = impl.Blocks.emplace(TabletId, BlockedGeneration); + const auto [it, inserted] = Self->BlocksManager->Blocks.emplace(TabletId, BlockedGeneration); RaceDetected = !inserted && BlockedGeneration <= it->second; if (RaceDetected) { Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::RACE); @@ -48,13 +47,13 @@ namespace NKikimr::NBlobDepot { if (RaceDetected) { TActivationContext::Send(Response.release()); } else { - Self->BlocksManager.Impl->OnBlockCommitted(TabletId, BlockedGeneration, std::move(Response)); + Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, std::move(Response)); } } }; public: - TImpl(TBlobDepot *self) + TBlocksManager(TBlobDepot *self) : Self(self) {} @@ -66,14 +65,6 @@ namespace NKikimr::NBlobDepot { (void)tabletId, (void)blockedGeneration, (void)response; } - void OnAgentConnect(TAgentInfo& agent) { - (void)agent; - } - - void OnAgentDisconnect(TAgentInfo& agent) { - (void)agent; - } - void Handle(TEvBlobDepot::TEvBlock::TPtr ev) { const auto& record = ev->Get()->Record; auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId(), NKikimrProto::OK, std::nullopt); @@ -113,31 +104,24 @@ namespace NKikimr::NBlobDepot { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TBlocksManager wrapper - TBlobDepot::TBlocksManager::TBlocksManager(TBlobDepot *self) - : Impl(std::make_unique<TImpl>(self)) - {} - - TBlobDepot::TBlocksManager::~TBlocksManager() - {} - - void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 generation) { - Impl->AddBlockOnLoad(tabletId, generation); + TBlobDepot::TBlocksManagerPtr TBlobDepot::CreateBlocksManager() { + return TBlocksManagerPtr(new TBlocksManager(this)); } - void TBlobDepot::TBlocksManager::OnAgentConnect(TAgentInfo& agent) { - Impl->OnAgentConnect(agent); + void TBlobDepot::TBlocksManagerDeleter::operator ()(TBlocksManager *object) const { + delete object; } - void TBlobDepot::TBlocksManager::OnAgentDisconnect(TAgentInfo& agent) { - Impl->OnAgentDisconnect(agent); + void TBlobDepot::AddBlockOnLoad(ui64 tabletId, ui32 generation) { + BlocksManager->AddBlockOnLoad(tabletId, generation); } - void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvBlock::TPtr ev) { - return Impl->Handle(ev); + void TBlobDepot::Handle(TEvBlobDepot::TEvBlock::TPtr ev) { + return BlocksManager->Handle(ev); } - void TBlobDepot::TBlocksManager::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { - return Impl->Handle(ev); + void TBlobDepot::Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) { + return BlocksManager->Handle(ev); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h index b64f893c6cb..8a5e2bf1617 100644 --- a/ydb/core/blob_depot/events.h +++ b/ydb/core/blob_depot/events.h @@ -19,6 +19,8 @@ namespace NKikimr { EvPushNotify, EvQueryBlocks, EvQueryBlocksResult, + EvCollectGarbage, + EvCollectGarbageResult, EvCommitBlobSeq, EvCommitBlobSeqResult, EvResolve, @@ -59,6 +61,8 @@ namespace NKikimr { BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks); BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult); + BLOBDEPOT_EVENT_PB_NO_ARGS(EvCollectGarbage); + BLOBDEPOT_EVENT_PB(EvCollectGarbageResult, Status, ErrorReason); BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeq); BLOBDEPOT_EVENT_PB_NO_ARGS(EvCommitBlobSeqResult); BLOBDEPOT_EVENT_PB_NO_ARGS(EvResolve); @@ -72,6 +76,7 @@ namespace NKikimr { template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; }; template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; }; template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; }; + template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; }; template<> struct TResponseFor<TEvCommitBlobSeq> { using Type = TEvCommitBlobSeqResult; }; template<> struct TResponseFor<TEvResolve> { using Type = TEvResolveResult; }; diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp new file mode 100644 index 00000000000..2d479f849ca --- /dev/null +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -0,0 +1,60 @@ +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + class TBlobDepot::TGarbageCollectionManager { + TBlobDepot *Self; + + public: + TGarbageCollectionManager(TBlobDepot *self) + : Self(self) + {} + + void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { + std::vector<std::pair<std::map<TString, TDataValue>::iterator, EKeepState>> updates; + + const auto& record = ev->Get()->Record; + auto processFlags = [&](const auto& items, EKeepState state) { + for (const NKikimrProto::TLogoBlobID& item : items) { + const TLogoBlobID id = LogoBlobIDFromLogoBlobID(item); + const TString key(reinterpret_cast<const char*>(id.GetRaw()), 3 * sizeof(ui64)); + if (const auto it = Self->Data.find(key); it == Self->Data.end()) { + if (state == EKeepState::Keep) { + STLOG(PRI_CRIT, BLOB_DEPOT, BDT05, "received Keep on nonexistent blob", + (TabletId, Self->TabletID()), (BlobId, id.ToString())); + return false; // we can't allow Keep on nonexistent blobs + } + } else if (it->second.KeepState < state) { + updates.emplace_back(it, state); + } + } + return true; + }; + + const bool success = processFlags(record.GetKeep(), EKeepState::Keep) && + processFlags(record.GetDoNotKeep(), EKeepState::DoNotKeep); + if (!success) { + auto [response, _] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId(), NKikimrProto::ERROR, + "missing key for Keep/DoNotKeep items"); + TActivationContext::Send(response.release()); + return; + } + + auto [response, _] = TEvBlobDepot::MakeResponseFor(ev, Self->SelfId(), NKikimrProto::OK, std::nullopt); + TActivationContext::Send(response.release()); + } + }; + + TBlobDepot::TGarbageCollectionManagerPtr TBlobDepot::CreateGarbageCollectionManager() { + return TGarbageCollectionManagerPtr(new TGarbageCollectionManager(this)); + } + + void TBlobDepot::TGarbageCollectionManagerDeleter::operator ()(TGarbageCollectionManager *object) const { + delete object; + } + + void TBlobDepot::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) { + GarbageCollectionManager->Handle(ev); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/op_apply_config.cpp b/ydb/core/blob_depot/op_apply_config.cpp index 9bda0223d06..57e161acea5 100644 --- a/ydb/core/blob_depot/op_apply_config.cpp +++ b/ydb/core/blob_depot/op_apply_config.cpp @@ -4,7 +4,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvApplyConfig::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT01, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT06, "TEvApplyConfig", (TabletId, TabletID()), (Msg, ev->Get()->Record)); class TTxApplyConfig : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<IEventHandle> Response; diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp index c028378065f..f1343174a46 100644 --- a/ydb/core/blob_depot/op_load.cpp +++ b/ydb/core/blob_depot/op_load.cpp @@ -37,7 +37,7 @@ namespace NKikimr::NBlobDepot { return false; } while (table.IsValid()) { - Self->BlocksManager.AddBlockOnLoad( + Self->AddBlockOnLoad( table.GetValue<Schema::Blocks::TabletId>(), table.GetValue<Schema::Blocks::BlockedGeneration>() ); diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp index 39b36197480..94e812a6204 100644 --- a/ydb/core/blob_depot/op_resolve.cpp +++ b/ydb/core/blob_depot/op_resolve.cpp @@ -3,7 +3,7 @@ namespace NKikimr::NBlobDepot { void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDR01, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()), + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT07, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()), (Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie)); // collect records if needed @@ -15,7 +15,7 @@ namespace NKikimr::NBlobDepot { response->Record.SetStatus(NKikimrProto::OVERRUN); } - STLOG(PRI_DEBUG, BLOB_DEPOT, BDR02, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record)); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT08, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record)); auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie); if (ev->InterconnectSession) { diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index e6f75da82db..0c9e35f28cc 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -86,6 +86,23 @@ message TEvQueryBlocksResult { optional uint32 TimeToLiveMs = 2; // TTL starting since sending TEvQueryBlocks at agent } +message TEvCollectGarbage { + repeated NKikimrProto.TLogoBlobID Keep = 1; + repeated NKikimrProto.TLogoBlobID DoNotKeep = 2; + optional uint64 TabletId = 3; + optional uint32 Generation = 4; + optional uint32 PerGenerationCounter = 5; + optional uint32 Channel = 6; + optional bool Hard = 7; + optional uint32 CollectGeneration = 8; + optional uint64 CollectStep = 9; +} + +message TEvCollectGarbageResult { + optional NKikimrProto.EReplyStatus Status = 1; + optional string ErrorReason = 2; +} + message TEvCommitBlobSeq { message TItem { optional TBlobLocator BlobLocator = 1; // GroupId and Generation are for validation purposes |