aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-07-05 14:20:43 +0300
committerAlexander Rutkovsky <alexander.rutkovsky@gmail.com>2022-07-05 14:20:43 +0300
commitc3fe0cad2a593816dcc77b126e63b772c262e915 (patch)
treea096b0cbc7dd9ff28b83f170e785e2a44effc11e
parentbc3062704271d10240ecd6177321745bdbcb5b9d (diff)
downloadydb-c3fe0cad2a593816dcc77b126e63b772c262e915.tar.gz
BlobDepot work in progress KIKIMR-14867
ref:4c86ac8a78779c6eadb6d26d525d14601a92b7a9
-rw-r--r--ydb/core/base/logoblob.h12
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt2
-rw-r--r--ydb/core/blob_depot/agent.cpp (renamed from ydb/core/blob_depot/blob_depot_agent.cpp)9
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h9
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp76
-rw-r--r--ydb/core/blob_depot/agent/comm.cpp8
-rw-r--r--ydb/core/blob_depot/agent/request.cpp1
-rw-r--r--ydb/core/blob_depot/agent/storage_block.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_collect_garbage.cpp1
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp133
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h5
-rw-r--r--ydb/core/blob_depot/blocks.cpp129
-rw-r--r--ydb/core/blob_depot/data.cpp46
-rw-r--r--ydb/core/blob_depot/events.h9
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp138
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp8
-rw-r--r--ydb/core/blob_depot/schema.h10
-rw-r--r--ydb/core/blobstorage/dsproxy/group_sessions.cpp4
-rw-r--r--ydb/core/blobstorage/dsproxy/group_sessions.h6
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp26
-rw-r--r--ydb/core/mind/bscontroller/load_everything.cpp9
-rw-r--r--ydb/core/protos/blob_depot.proto11
23 files changed, 558 insertions, 98 deletions
diff --git a/ydb/core/base/logoblob.h b/ydb/core/base/logoblob.h
index 36987e7684d..ddcecd3b065 100644
--- a/ydb/core/base/logoblob.h
+++ b/ydb/core/base/logoblob.h
@@ -324,3 +324,15 @@ struct THash<NKikimr::TLogoBlobID> {
return x.Hash();
}
};
+
+template<>
+inline NKikimr::TLogoBlobID Min<NKikimr::TLogoBlobID>() noexcept {
+ return {};
+}
+
+template<>
+inline NKikimr::TLogoBlobID Max<NKikimr::TLogoBlobID>() noexcept {
+ return NKikimr::TLogoBlobID(Max<ui64>(), Max<ui32>(), Max<ui32>(), NKikimr::TLogoBlobID::MaxChannel,
+ NKikimr::TLogoBlobID::MaxBlobSize, NKikimr::TLogoBlobID::MaxCookie, NKikimr::TLogoBlobID::MaxPartId,
+ NKikimr::TLogoBlobID::MaxCrcMode);
+}
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index f30810879af..f393be16f28 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -16,7 +16,7 @@ target_link_libraries(ydb-core-blob_depot PUBLIC
)
target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot_agent.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp
diff --git a/ydb/core/blob_depot/blob_depot_agent.cpp b/ydb/core/blob_depot/agent.cpp
index 9a9a7cbab6e..fd94bdfd15a 100644
--- a/ydb/core/blob_depot/blob_depot_agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -82,9 +82,14 @@ namespace NKikimr::NBlobDepot {
const auto it = PipeServerToNode.find(pipeServerId);
Y_VERIFY(it != PipeServerToNode.end());
Y_VERIFY(it->second);
- const auto agentIt = Agents.find(*it->second);
+ TAgentInfo& agent = GetAgent(*it->second);
+ Y_VERIFY(agent.ConnectedAgent == pipeServerId);
+ return agent;
+ }
+
+ TBlobDepot::TAgentInfo& TBlobDepot::GetAgent(ui32 nodeId) {
+ const auto agentIt = Agents.find(nodeId);
Y_VERIFY(agentIt != Agents.end());
- Y_VERIFY(agentIt->second.ConnectedAgent == pipeServerId);
return agentIt->second;
}
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 7123f0293c6..e6bcc3a6a58 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -98,10 +98,13 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvBlobDepot::TEvPushNotify, Handle);
+
hFunc(TEvBlobDepot::TEvRegisterAgentResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvAllocateIdsResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvBlockResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvQueryBlocksResult, HandleTabletResponse);
+ hFunc(TEvBlobDepot::TEvCollectGarbageResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvCommitBlobSeqResult, HandleTabletResponse);
hFunc(TEvBlobDepot::TEvResolveResult, HandleTabletResponse);
@@ -188,6 +191,8 @@ namespace NKikimr::NBlobDepot {
void Issue(std::unique_ptr<IEventBase> ev, TRequestSender *sender, TRequestContext::TPtr context);
+ void Handle(TEvBlobDepot::TEvPushNotify::TPtr ev);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct TExecutingQueries {};
@@ -319,7 +324,9 @@ namespace NKikimr::NBlobDepot {
ui32 GetBlockForTablet(ui64 tabletId);
- void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp);
+ void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic issueTimestamp, TDuration timeToLive);
+
+ void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Reading
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
index a812a6e2ab1..a3b070799fd 100644
--- a/ydb/core/blob_depot/agent/blocks.cpp
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -5,14 +5,15 @@ namespace NKikimr::NBlobDepot {
class TBlobDepotAgent::TBlocksManager
: public TRequestSender
{
- struct TBlockInfo {
+ struct TBlock {
ui32 BlockedGeneration;
+ TDuration TimeToLive;
TMonotonic ExpirationTimestamp; // not valid after
bool RefreshInFlight = false;
TIntrusiveList<TQuery, TPendingBlockChecks> PendingBlockChecks;
};
- THashMap<ui64, TBlockInfo> Blocks;
+ THashMap<ui64, TBlock> Blocks;
public:
TBlocksManager(TBlobDepotAgent& agent)
@@ -33,16 +34,18 @@ namespace NKikimr::NBlobDepot {
NKikimrProto::EReplyStatus CheckBlockForTablet(ui64 tabletId, ui32 generation, TQuery *query,
ui32 *blockedGeneration) {
+ NKikimrProto::EReplyStatus status = NKikimrProto::UNKNOWN;
auto& block = Blocks[tabletId];
- const TMonotonic issueTime = TActivationContext::Monotonic();
+ const TMonotonic now = TActivationContext::Monotonic();
if (generation <= block.BlockedGeneration) {
- return NKikimrProto::RACE;
- } else if (issueTime < block.ExpirationTimestamp) {
+ status = NKikimrProto::RACE;
+ } else if (now < block.ExpirationTimestamp) {
if (blockedGeneration) {
*blockedGeneration = block.BlockedGeneration;
}
- return NKikimrProto::OK;
- } else if (!block.RefreshInFlight) {
+ status = NKikimrProto::OK;
+ }
+ if (status != NKikimrProto::RACE && now + block.TimeToLive / 2 >= block.ExpirationTimestamp && !block.RefreshInFlight) {
NKikimrBlobDepot::TEvQueryBlocks queryBlocks;
queryBlocks.AddTabletIds(tabletId);
Agent.Issue(std::move(queryBlocks), this, std::make_shared<TQueryBlockContext>(
@@ -50,14 +53,20 @@ namespace NKikimr::NBlobDepot {
block.RefreshInFlight = true;
block.PendingBlockChecks.PushBack(query);
}
- return NKikimrProto::UNKNOWN;
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (QueryId, query->GetQueryId()),
+ (TabletId, tabletId), (Generation, generation), (Status, status), (Now, now),
+ (ExpirationTimestamp, block.ExpirationTimestamp));
+ return status;
}
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
if (auto *p = std::get_if<TEvBlobDepot::TEvQueryBlocksResult*>(&response)) {
Handle(std::move(context), (*p)->Record);
} else if (std::holds_alternative<TTabletDisconnected>(response)) {
- IssueOnUpdateBlock(context, false);
+ auto& queryBlockContext = context->Obtain<TQueryBlockContext>();
+ if (const auto it = Blocks.find(queryBlockContext.TabletId); it != Blocks.end()) {
+ IssueOnUpdateBlock(it->second, false);
+ }
} else {
Y_FAIL("unexpected response type");
}
@@ -72,19 +81,18 @@ namespace NKikimr::NBlobDepot {
const ui32 newBlockedGeneration = msg.GetBlockedGenerations(0);
Y_VERIFY(block.BlockedGeneration <= newBlockedGeneration);
block.BlockedGeneration = newBlockedGeneration;
- block.ExpirationTimestamp = queryBlockContext.Timestamp + TDuration::MilliSeconds(msg.GetTimeToLiveMs());
- IssueOnUpdateBlock(context, true);
+ block.TimeToLive = TDuration::MilliSeconds(msg.GetTimeToLiveMs());
+ block.ExpirationTimestamp = queryBlockContext.Timestamp + block.TimeToLive;
+ block.RefreshInFlight = false;
+ IssueOnUpdateBlock(block, true);
}
- void IssueOnUpdateBlock(const TRequestContext::TPtr& context, bool success) {
- auto& queryBlockContext = context->Obtain<TQueryBlockContext>();
- auto& block = Blocks[queryBlockContext.TabletId];
- TIntrusiveList<TQuery, TPendingBlockChecks> temp;
- temp.Swap(block.PendingBlockChecks);
- for (auto it = temp.begin(); it != temp.end(); ) {
- const auto current = it++;
- current->OnUpdateBlock(success);
- }
+ void IssueOnUpdateBlock(TBlock& block, bool success) {
+ TIntrusiveList<TQuery, TPendingBlockChecks> pendingBlockChecks;
+ pendingBlockChecks.Append(block.PendingBlockChecks);
+ pendingBlockChecks.ForEach([success](TQuery *query) {
+ query->OnUpdateBlock(success);
+ });
}
ui32 GetBlockForTablet(ui64 tabletId) {
@@ -92,14 +100,22 @@ namespace NKikimr::NBlobDepot {
return it != Blocks.end() ? it->second.BlockedGeneration : 0;
}
- void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) {
+ void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) {
auto& block = Blocks[tabletId];
Y_VERIFY(block.BlockedGeneration <= blockedGeneration);
- if (block.BlockedGeneration < blockedGeneration) {
- block.BlockedGeneration = blockedGeneration;
- block.ExpirationTimestamp = expirationTimestamp;
- } else if (block.ExpirationTimestamp < expirationTimestamp) {
- block.ExpirationTimestamp = expirationTimestamp;
+ block.BlockedGeneration = blockedGeneration;
+ block.TimeToLive = timeToLive;
+ block.ExpirationTimestamp = timestamp + timeToLive;
+ }
+
+ void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) {
+ for (const auto& tablet : tablets) {
+ if (const auto it = Blocks.find(tablet.GetTabletId()); it != Blocks.end()) {
+ auto& block = it->second;
+ block.BlockedGeneration = tablet.GetBlockedGeneration();
+ block.ExpirationTimestamp = TMonotonic::Zero();
+ IssueOnUpdateBlock(block, true);
+ }
}
}
};
@@ -123,8 +139,12 @@ namespace NKikimr::NBlobDepot {
return BlocksManager->GetBlockForTablet(tabletId);
}
- void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic expirationTimestamp) {
- BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, expirationTimestamp);
+ void TBlobDepotAgent::SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive) {
+ BlocksManager->SetBlockForTablet(tabletId, blockedGeneration, timestamp, timeToLive);
+ }
+
+ void TBlobDepotAgent::OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets) {
+ BlocksManager->OnBlockedTablets(tablets);
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/comm.cpp b/ydb/core/blob_depot/agent/comm.cpp
index 4fbba6f39d0..fcd6c0c7022 100644
--- a/ydb/core/blob_depot/agent/comm.cpp
+++ b/ydb/core/blob_depot/agent/comm.cpp
@@ -137,4 +137,12 @@ namespace NKikimr::NBlobDepot {
RegisterRequest(id, sender, std::move(context), {}, true);
}
+ void TBlobDepotAgent::Handle(TEvBlobDepot::TEvPushNotify::TPtr ev) {
+ auto& msg = ev->Get()->Record;
+ OnBlockedTablets(msg.GetBlockedTablets());
+
+ auto [response, _] = TEvBlobDepot::MakeResponseFor(*ev, SelfId());
+ TActivationContext::Send(response.release());
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/request.cpp b/ydb/core/blob_depot/agent/request.cpp
index 0851d290778..8e73045be96 100644
--- a/ydb/core/blob_depot/agent/request.cpp
+++ b/ydb/core/blob_depot/agent/request.cpp
@@ -67,6 +67,7 @@ namespace NKikimr::NBlobDepot {
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvAllocateIdsResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvBlockResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvQueryBlocksResult::TPtr ev);
+ template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCollectGarbageResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvCommitBlobSeqResult::TPtr ev);
template void TBlobDepotAgent::HandleTabletResponse(TEvBlobDepot::TEvResolveResult::TPtr ev);
diff --git a/ydb/core/blob_depot/agent/storage_block.cpp b/ydb/core/blob_depot/agent/storage_block.cpp
index 9503615f912..65155971033 100644
--- a/ydb/core/blob_depot/agent/storage_block.cpp
+++ b/ydb/core/blob_depot/agent/storage_block.cpp
@@ -52,7 +52,7 @@ namespace NKikimr::NBlobDepot {
// update blocks cache
auto& blockContext = context->Obtain<TBlockContext>();
auto& query = *Event->Get<TEvBlobStorage::TEvBlock>();
- Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp +
+ Agent.SetBlockForTablet(query.TabletId, query.Generation, blockContext.Timestamp,
TDuration::MilliSeconds(msg.Record.GetTimeToLiveMs()));
EndWithSuccess(std::make_unique<TEvBlobStorage::TEvBlockResult>(NKikimrProto::OK));
}
diff --git a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
index fdac21f6ea2..0d29a5272ed 100644
--- a/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
+++ b/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
@@ -23,7 +23,6 @@ namespace NKikimr::NBlobDepot {
NumDoNotKeep = msg.DoNotKeep ? msg.DoNotKeep->size() : 0;
const auto status = Agent.CheckBlockForTablet(msg.TabletId, msg.RecordGeneration, this);
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA99, "CheckBlockForTablet", (Status, status));
if (status == NKikimrProto::OK) {
IssueCollectGarbage();
} else if (status != NKikimrProto::UNKNOWN) {
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index 4b1e45c74e5..9b5c37b4a7e 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -60,6 +60,8 @@ namespace NKikimr::NBlobDepot {
void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override {
if (std::holds_alternative<TTabletDisconnected>(response)) {
return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
+ Agent.HandleGetResult(context, **p);
} else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
HandleResolveResult(id, std::move(context), **p);
} else {
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 95049a2401b..57c5de67b9c 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -5,16 +5,141 @@ namespace NKikimr::NBlobDepot {
template<>
TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvRange>(std::unique_ptr<IEventHandle> ev) {
class TRangeQuery : public TQuery {
+ std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response;
+ ui32 ReadsInFlight = 0;
+ ui32 ResolvesInFlight = 0;
+
+ struct TExtraResolveContext : TRequestContext {
+ const size_t Index;
+
+ TExtraResolveContext(size_t index)
+ : Index(index)
+ {}
+ };
+
public:
using TQuery::TQuery;
void Initiate() override {
- EndWithError(NKikimrProto::ERROR, "not implemented");
+ auto& msg = *Event->Get<TEvBlobStorage::TEvRange>();
+ Response = std::make_unique<TEvBlobStorage::TEvRangeResult>(NKikimrProto::OK, msg.From, msg.To,
+ Agent.VirtualGroupId);
+
+ IssueResolve();
+ }
+
+ void IssueResolve() {
+ auto& msg = *Event->Get<TEvBlobStorage::TEvRange>();
+
+ TStringBuf from = msg.From.AsBinaryString();
+ TStringBuf to = msg.To.AsBinaryString();
+ const bool reverse = msg.To < msg.From;
+ if (reverse) {
+ std::swap(from, to);
+ }
+
+ NKikimrBlobDepot::TEvResolve resolve;
+ auto *item = resolve.AddItems();
+ item->SetBeginningKey(from.data(), from.size());
+ item->SetIncludeBeginning(true);
+ item->SetEndingKey(to.data(), to.size());
+ item->SetIncludeEnding(true);
+ item->SetReverse(reverse);
+
+ Agent.Issue(std::move(resolve), this, nullptr);
+ ++ResolvesInFlight;
+ }
+
+ void IssueResolve(TLogoBlobID id, size_t index) {
+ NKikimrBlobDepot::TEvResolve resolve;
+ auto *item = resolve.AddItems();
+ item->SetBeginningKey(id.GetRaw(), 3 * sizeof(ui64));
+ item->SetIncludeBeginning(true);
+ item->SetEndingKey(id.GetRaw(), 3 * sizeof(ui64));
+ item->SetIncludeEnding(true);
+
+ Agent.Issue(std::move(resolve), this, std::make_shared<TExtraResolveContext>(index));
+ ++ResolvesInFlight;
+ }
+
+ void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override {
+ if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
+ HandleResolveResult(id, std::move(context), (*p)->Record);
+ } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
+ Agent.HandleGetResult(context, **p);
+ } else if (std::holds_alternative<TTabletDisconnected>(response)) {
+ EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
+ } else {
+ Y_FAIL();
+ }
}
- void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse response) override {
- (void)response;
- Y_FAIL();
+ void HandleResolveResult(ui64 id, TRequestContext::TPtr context, NKikimrBlobDepot::TEvResolveResult& msg) {
+ auto& query = *Event->Get<TEvBlobStorage::TEvRange>();
+
+ --ResolvesInFlight;
+
+ if (msg.GetStatus() != NKikimrProto::OK && msg.GetStatus() != NKikimrProto::OVERRUN) {
+ return EndWithError(msg.GetStatus(), msg.GetErrorReason());
+ }
+
+ for (const auto& key : msg.GetResolvedKeys()) {
+ const TString& blobId = key.GetKey();
+ Y_VERIFY(blobId.size() == 3 * sizeof(ui64));
+ TLogoBlobID id(reinterpret_cast<const ui64*>(blobId.data()));
+
+ const size_t index = context
+ ? context->Obtain<TExtraResolveContext>().Index
+ : Response->Responses.size();
+ if (!context) {
+ Response->Responses.emplace_back(id, TString());
+ }
+
+ if (!query.IsIndexOnly) {
+ TString error;
+ if (!Agent.IssueRead(key.GetValueChain(), 0, 0, NKikimrBlobStorage::EGetHandleClass::FastRead,
+ query.MustRestoreFirst, this, index, true, &error)) {
+ return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: "
+ << error);
+ }
+ ++ReadsInFlight;
+ } else if (query.MustRestoreFirst) {
+ Y_FAIL("not implemented yet");
+ }
+ }
+
+ if (msg.GetStatus() == NKikimrProto::OVERRUN) {
+ Agent.RegisterRequest(id, this, std::move(context), {}, true);
+ } else if (msg.GetStatus() == NKikimrProto::OK) {
+ if (!ReadsInFlight && !ResolvesInFlight) {
+ EndWithSuccess(std::move(Response));
+ }
+ } else {
+ Y_UNREACHABLE();
+ }
+ }
+
+ void OnRead(ui64 tag, NKikimrProto::EReplyStatus status, TString dataOrErrorReason) override {
+ auto& item = Response->Responses[tag];
+ --ReadsInFlight;
+
+ switch (status) {
+ case NKikimrProto::OK:
+ item.Buffer = std::move(dataOrErrorReason);
+ break;
+
+ case NKikimrProto::NODATA:
+ IssueResolve(item.Id, tag);
+ break;
+
+ default:
+ return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# "
+ << item.Id << " Error# " << dataOrErrorReason);
+ }
+
+ if (!ReadsInFlight && !ResolvesInFlight) {
+ EndWithSuccess(std::move(Response));
+ }
}
};
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index e27b820ed13..ead3fca02b0 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -62,6 +62,7 @@ namespace NKikimr::NBlobDepot {
void OnAgentConnect(TAgentInfo& agent);
void Handle(TEvBlobDepot::TEvAllocateIds::TPtr ev);
TAgentInfo& GetAgent(const TActorId& pipeServerId);
+ TAgentInfo& GetAgent(ui32 nodeId);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -183,6 +184,8 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
+ bool CheckBlobForBarrier(TLogoBlobID id) const;
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Data operations
@@ -215,6 +218,8 @@ namespace NKikimr::NBlobDepot {
void DeleteKeys(const std::vector<TString>& keysToDelete);
void PutKey(TString key, TDataValue&& data);
void AddDataOnLoad(TString key, TString value);
+ std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState);
+ void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data);
void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 2d0377db14a..5fc4241f208 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -4,8 +4,23 @@
namespace NKikimr::NBlobDepot {
class TBlobDepot::TBlocksManager {
+ // wait duration before issuing blocks via storage
+ static constexpr TDuration AgentsWaitTime = TDuration::Seconds(1);
+
+ // TTL for block lease
+ static constexpr TDuration BlockLeaseTime = TDuration::Seconds(60);
+
+ struct TBlock {
+ struct TPerAgentInfo {
+ TMonotonic ExpirationTimestamp = TMonotonic::Zero();
+ };
+
+ ui32 BlockedGeneration = 0;
+ THashMap<ui32, TPerAgentInfo> PerAgentInfo;
+ };
+
TBlobDepot* const Self;
- THashMap<ui64, ui32> Blocks;
+ THashMap<ui64, TBlock> Blocks;
private:
class TTxUpdateBlock : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
@@ -14,7 +29,6 @@ namespace NKikimr::NBlobDepot {
const ui32 NodeId;
const TInstant Timestamp;
std::unique_ptr<IEventHandle> Response;
- bool RaceDetected;
public:
TTxUpdateBlock(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, TInstant timestamp,
@@ -28,9 +42,8 @@ namespace NKikimr::NBlobDepot {
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
- const auto [it, inserted] = Self->BlocksManager->Blocks.emplace(TabletId, BlockedGeneration);
- RaceDetected = !inserted && BlockedGeneration <= it->second;
- if (RaceDetected) {
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ if (BlockedGeneration <= block.BlockedGeneration) {
Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.SetStatus(NKikimrProto::ALREADY);
} else {
NIceDb::TNiceDb db(txc.DB);
@@ -44,10 +57,15 @@ namespace NKikimr::NBlobDepot {
}
void Complete(const TActorContext&) override {
- if (RaceDetected) {
+ if (Response->Get<TEvBlobDepot::TEvBlockResult>()->Record.GetStatus() != NKikimrProto::OK) {
TActivationContext::Send(Response.release());
} else {
- Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, std::move(Response));
+ // update block value in memory
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ Y_VERIFY(block.BlockedGeneration < BlockedGeneration);
+ block.BlockedGeneration = BlockedGeneration;
+
+ Self->BlocksManager->OnBlockCommitted(TabletId, BlockedGeneration, NodeId, std::move(Response));
}
}
};
@@ -56,21 +74,74 @@ namespace NKikimr::NBlobDepot {
TBlobDepot* const Self;
const ui64 TabletId;
const ui32 BlockedGeneration;
+ const ui32 NodeId;
std::unique_ptr<IEventHandle> Response;
ui32 BlocksPending = 0;
ui32 RetryCount = 0;
const ui64 IssuerGuid = RandomNumber<ui64>() | 1;
+ THashSet<ui32> NodesWaitingForPushResult;
public:
- TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration,
+ TBlockProcessorActor(TBlobDepot *self, ui64 tabletId, ui32 blockedGeneration, ui32 nodeId,
std::unique_ptr<IEventHandle> response)
: Self(self)
, TabletId(tabletId)
, BlockedGeneration(blockedGeneration)
+ , NodeId(nodeId)
, Response(std::move(response))
{}
void Bootstrap() {
+ IssueNotificationsToAgents();
+ Become(&TThis::StateFunc, AgentsWaitTime, new TEvents::TEvWakeup);
+ }
+
+ void IssueNotificationsToAgents() {
+ const TMonotonic now = TActivationContext::Monotonic();
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ for (const auto& [agentId, info] : block.PerAgentInfo) {
+ if (agentId == NodeId) {
+ // skip the origin agent
+ continue;
+ }
+ if (info.ExpirationTimestamp <= now) {
+ SendPushToAgent(agentId);
+ }
+ }
+ }
+
+ void SendPushToAgent(ui32 agentId) {
+ auto ev = std::make_unique<TEvBlobDepot::TEvPushNotify>();
+ auto *item = ev->Record.AddBlockedTablets();
+ item->SetTabletId(TabletId);
+ item->SetBlockedGeneration(BlockedGeneration);
+
+ TAgentInfo& agent = Self->GetAgent(agentId);
+ if (const auto& actorId = agent.ConnectedAgent) {
+ Send(*actorId, ev.release(), IEventHandle::FlagTrackDelivery, IssuerGuid);
+ }
+ NodesWaitingForPushResult.insert(agentId);
+ }
+
+ void Handle(TEvBlobDepot::TEvPushNotifyResult::TPtr ev) {
+ const ui32 agentId = ev->Sender.NodeId();
+ const size_t numErased = NodesWaitingForPushResult.erase(agentId);
+ Y_VERIFY(numErased == 1 && ev->Cookie == IssuerGuid);
+
+ // mark lease as successfully revoked one
+ auto& block = Self->BlocksManager->Blocks[TabletId];
+ block.PerAgentInfo.erase(agentId);
+
+ if (NodesWaitingForPushResult.empty()) {
+ Finish();
+ }
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr /*ev*/) {
+ // can't reach an agent to notify it about blocked generation change -- we can't do anything here
+ }
+
+ void IssueBlocksToStorage() {
TTabletStorageInfo *info = Self->Info();
for (const auto& [_, kind] : Self->ChannelKinds) {
for (const ui8 channel : kind.IndexToChannel) {
@@ -80,8 +151,6 @@ namespace NKikimr::NBlobDepot {
RetryCount += 2;
}
}
-
- Become(&TThis::StateFunc);
}
void SendBlock(ui32 groupId) {
@@ -93,8 +162,7 @@ namespace NKikimr::NBlobDepot {
switch (ev->Get()->Status) {
case NKikimrProto::OK:
if (!--BlocksPending) {
- TActivationContext::Send(Response.release());
- PassAway();
+ Finish();
}
break;
@@ -108,6 +176,7 @@ namespace NKikimr::NBlobDepot {
auto& r = Response->Get<TEvBlobDepot::TEvBlockResult>()->Record;
r.SetStatus(NKikimrProto::ERROR);
r.SetErrorReason(ev->Get()->ErrorReason);
+ Finish();
} else {
SendBlock(ev->Cookie);
}
@@ -115,8 +184,16 @@ namespace NKikimr::NBlobDepot {
}
}
+ void Finish() {
+ TActivationContext::Send(Response.release());
+ PassAway();
+ }
+
STRICT_STFUNC(StateFunc,
hFunc(TEvBlobStorage::TEvBlockResult, Handle);
+ hFunc(TEvBlobDepot::TEvPushNotifyResult, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ cFunc(TEvents::TSystem::Wakeup, IssueBlocksToStorage);
cFunc(TEvents::TSystem::Poison, PassAway);
)
};
@@ -127,17 +204,18 @@ namespace NKikimr::NBlobDepot {
{}
void AddBlockOnLoad(ui64 tabletId, ui32 generation) {
- Blocks.emplace(tabletId, generation);
+ Blocks[tabletId].BlockedGeneration = generation;
}
- void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration,
- std::unique_ptr<IEventHandle> response) {
- Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, std::move(response)));
+ void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, std::unique_ptr<IEventHandle> response) {
+ Self->RegisterWithSameMailbox(new TBlockProcessorActor(Self, tabletId, blockedGeneration, nodeId,
+ std::move(response)));
}
void Handle(TEvBlobDepot::TEvBlock::TPtr ev) {
const auto& record = ev->Get()->Record;
- auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK, std::nullopt);
+ auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId(), NKikimrProto::OK,
+ std::nullopt, BlockLeaseTime.MilliSeconds());
if (!record.HasTabletId() || !record.HasBlockedGeneration()) {
responseRecord->SetStatus(NKikimrProto::ERROR);
@@ -145,8 +223,8 @@ namespace NKikimr::NBlobDepot {
} else {
const ui64 tabletId = record.GetTabletId();
const ui32 blockedGeneration = record.GetBlockedGeneration();
- if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second) {
- responseRecord->SetStatus(NKikimrProto::ERROR);
+ if (const auto it = Blocks.find(tabletId); it != Blocks.end() && blockedGeneration <= it->second.BlockedGeneration) {
+ responseRecord->SetStatus(NKikimrProto::ALREADY);
} else {
TAgentInfo& agent = Self->GetAgent(ev->Recipient);
Self->Execute(std::make_unique<TTxUpdateBlock>(Self, tabletId, blockedGeneration,
@@ -158,13 +236,20 @@ namespace NKikimr::NBlobDepot {
}
void Handle(TEvBlobDepot::TEvQueryBlocks::TPtr ev) {
+ TAgentInfo& agent = Self->GetAgent(ev->Recipient);
+ const ui32 agentId = agent.ConnectedNodeId;
+ Y_VERIFY(agentId);
+
+ const TMonotonic now = TActivationContext::Monotonic();
+
const auto& record = ev->Get()->Record;
auto [response, responseRecord] = TEvBlobDepot::MakeResponseFor(*ev, Self->SelfId());
- responseRecord->SetTimeToLiveMs(15000); // FIXME
+ responseRecord->SetTimeToLiveMs(BlockLeaseTime.MilliSeconds());
for (const ui64 tabletId : record.GetTabletIds()) {
- const auto it = Blocks.find(tabletId);
- responseRecord->AddBlockedGenerations(it != Blocks.end() ? it->second : 0);
+ auto& block = Blocks[tabletId];
+ responseRecord->AddBlockedGenerations(block.BlockedGeneration);
+ block.PerAgentInfo[agentId].ExpirationTimestamp = now + BlockLeaseTime;
}
TActivationContext::Send(response.release());
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 688c3144d8f..6f74c9b27b1 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -14,6 +14,7 @@ namespace NKikimr::NBlobDepot {
};
std::map<TString, TDataValue, TCompareKey> Data;
+ std::set<TLogoBlobID> DataBlobIds;
public:
TDataManager(TBlobDepot *self)
@@ -75,6 +76,43 @@ namespace NKikimr::NBlobDepot {
.Public = proto.GetPublic(),
});
}
+
+ std::optional<TString> UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
+ if (const auto it = Data.find(key); it != Data.end()) {
+ TDataValue value = it->second;
+ value.KeepState = keepState;
+ return ToValueProto(value);
+ } else {
+ return std::nullopt;
+ }
+ }
+
+ void UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) {
+ for (const auto& [key, keepState] : data) {
+ auto& value = Data[std::move(key)];
+ Y_VERIFY_DEBUG(value.KeepState < keepState);
+ value.KeepState = keepState;
+ }
+ }
+
+ static TString ToValueProto(const TDataValue& value) {
+ NKikimrBlobDepot::TValue proto;
+ if (value.Meta) {
+ proto.SetMeta(value.Meta);
+ }
+ proto.MutableValueChain()->CopyFrom(value.ValueChain);
+ if (proto.GetKeepState() != value.KeepState) {
+ proto.SetKeepState(value.KeepState);
+ }
+ if (proto.GetPublic() != value.Public) {
+ proto.SetPublic(value.Public);
+ }
+
+ TString s;
+ const bool success = proto.SerializeToString(&s);
+ Y_VERIFY(success);
+ return s;
+ }
};
TBlobDepot::TDataManagerPtr TBlobDepot::CreateDataManager() {
@@ -102,4 +140,12 @@ namespace NKikimr::NBlobDepot {
DataManager->AddDataOnLoad(std::move(key), std::move(value));
}
+ std::optional<TString> TBlobDepot::UpdatesKeepState(TStringBuf key, NKikimrBlobDepot::EKeepState keepState) {
+ return DataManager->UpdatesKeepState(key, keepState);
+ }
+
+ void TBlobDepot::UpdateKeepState(const std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>>& data) {
+ DataManager->UpdateKeepState(data);
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 10edf3e4545..01212d9e5fa 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -14,9 +14,10 @@ namespace NKikimr {
EvRegisterAgentResult,
EvAllocateIds,
EvAllocateIdsResult,
+ EvPushNotify,
+ EvPushNotifyResult,
EvBlock,
EvBlockResult,
- EvPushNotify,
EvQueryBlocks,
EvQueryBlocksResult,
EvCollectGarbage,
@@ -56,9 +57,10 @@ namespace NKikimr {
BLOBDEPOT_EVENT_PB_NO_ARGS(EvRegisterAgentResult);
BLOBDEPOT_EVENT_PB(EvAllocateIds, ChannelKind);
BLOBDEPOT_EVENT_PB(EvAllocateIdsResult, ChannelKind, Generation);
- BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration);
- BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotify);
+ BLOBDEPOT_EVENT_PB_NO_ARGS(EvPushNotifyResult);
+ BLOBDEPOT_EVENT_PB(EvBlock, TabletId, BlockedGeneration);
+ BLOBDEPOT_EVENT_PB(EvBlockResult, Status, ErrorReason, TimeToLiveMs);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocks);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvQueryBlocksResult);
BLOBDEPOT_EVENT_PB_NO_ARGS(EvCollectGarbage);
@@ -74,6 +76,7 @@ namespace NKikimr {
template<> struct TResponseFor<TEvApplyConfig> { using Type = TEvApplyConfigResult; };
template<> struct TResponseFor<TEvRegisterAgent> { using Type = TEvRegisterAgentResult; };
template<> struct TResponseFor<TEvAllocateIds> { using Type = TEvAllocateIdsResult; };
+ template<> struct TResponseFor<TEvPushNotify> { using Type = TEvPushNotifyResult; };
template<> struct TResponseFor<TEvBlock> { using Type = TEvBlockResult; };
template<> struct TResponseFor<TEvQueryBlocks> { using Type = TEvQueryBlocksResult; };
template<> struct TResponseFor<TEvCollectGarbage> { using Type = TEvCollectGarbageResult; };
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 06e2d627236..2076729c162 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -3,16 +3,33 @@
namespace NKikimr::NBlobDepot {
+ namespace {
+ static ui64 GenStep(ui32 gen, ui32 step) {
+ return static_cast<ui64>(gen) << 32 | step;
+ }
+ }
+
class TBlobDepot::TGarbageCollectionManager {
- TBlobDepot *Self;
+ TBlobDepot* const Self;
+
+ struct TBarrier {
+ ui64 LastRecordGenStep = 0;
+ ui64 Soft = 0;
+ ui64 Hard = 0;
+ };
+
+ THashMap<std::pair<ui64, ui8>, TBarrier> Barriers;
private:
class TTxCollectGarbage : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ std::optional<TString> Error;
+
std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> Request;
- ui32 KeepIndex = 0;
- ui32 DoNotKeepIndex = 0;
+ int KeepIndex = 0;
+ int DoNotKeepIndex = 0;
ui32 NumKeysProcessed = 0;
std::vector<TString> KeysToDelete;
+ std::vector<std::pair<TString, NKikimrBlobDepot::EKeepState>> KeepStateUpdates;
bool Done = false;
static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
@@ -27,41 +44,82 @@ namespace NKikimr::NBlobDepot {
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
- Done = ProcessKeepFlags(txc) && ProcessDoNotKeepFlags(txc) && ApplyBarrier(txc);
+ Validate();
+ if (!Error) {
+ auto& record = Request->Get()->Record;
+ Done = ProcessFlags(txc, KeepIndex, record.GetKeep(), NKikimrBlobDepot::EKeepState::Keep)
+ && ProcessFlags(txc, DoNotKeepIndex, record.GetDoNotKeep(), NKikimrBlobDepot::EKeepState::DoNotKeep)
+ && ProcessBarrier(txc);
+ }
return true;
}
void Complete(const TActorContext&) override {
Self->DeleteKeys(KeysToDelete);
- if (Done) {
- auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(), NKikimrProto::OK, std::nullopt);
+ Self->UpdateKeepState(KeepStateUpdates);
+
+ if (Done || Error) {
+ if (Done) {
+ ApplyBarrier();
+ }
+ auto [response, _] = TEvBlobDepot::MakeResponseFor(*Request, Self->SelfId(),
+ Error ? NKikimrProto::ERROR : NKikimrProto::OK, std::move(Error));
TActivationContext::Send(response.release());
} else {
Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(Request), KeepIndex, DoNotKeepIndex));
}
}
- bool ProcessKeepFlags(TTransactionContext& /*txc*/) {
- const auto& record = Request->Get()->Record;
- while (KeepIndex < record.KeepSize() && NumKeysProcessed < MaxKeysToProcessAtOnce) {
- ++KeepIndex;
- ++NumKeysProcessed;
+ void Validate() {
+ // validate the command first
+ auto& record = Request->Get()->Record;
+ if (record.HasCollectGeneration() && record.HasCollectStep()) {
+ if (!record.HasTabletId() || !record.HasChannel() || record.GetChannel() > TLogoBlobID::MaxChannel) {
+ Error = "TabletId/Channel are either not set or invalid";
+ } else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) {
+ Error = "Generation/PerGenerationCounter are not set";
+ } else {
+ const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ auto& barriers = Self->GarbageCollectionManager->Barriers;
+ if (const auto it = barriers.find(key); it != barriers.end()) {
+ auto& barrier = it->second;
+ const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
+ const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
+ ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
+ if (recordGenStep < barrier.LastRecordGenStep) {
+ Error = "record generation:counter is obsolete";
+ } else if (recordGenStep == barrier.LastRecordGenStep) {
+ if (currentGenStep != collectGenStep) {
+ Error = "repeated command with different collect parameters received";
+ }
+ } else if (collectGenStep < currentGenStep) {
+ Error = "decreasing barrier";
+ }
+ }
+ }
+ } else if (record.HasCollectGeneration() || record.HasCollectStep()) {
+ Error = "CollectGeneration/CollectStep set incorrectly";
}
-
- return KeepIndex == record.KeepSize();
}
- bool ProcessDoNotKeepFlags(TTransactionContext& /*txc*/) {
- const auto& record = Request->Get()->Record;
- while (DoNotKeepIndex < record.DoNotKeepSize() && NumKeysProcessed < MaxKeysToProcessAtOnce) {
- ++DoNotKeepIndex;
- ++NumKeysProcessed;
+ bool ProcessFlags(TTransactionContext& txc, int& index,
+ const NProtoBuf::RepeatedPtrField<NKikimrProto::TLogoBlobID>& items,
+ NKikimrBlobDepot::EKeepState state) {
+ NIceDb::TNiceDb db(txc.DB);
+ for (; index < items.size() && NumKeysProcessed < MaxKeysToProcessAtOnce; ++index) {
+ const auto id = LogoBlobIDFromLogoBlobID(items[index]);
+ const TStringBuf key = id.AsBinaryString();
+ if (const auto& value = Self->UpdatesKeepState(key, state)) {
+ KeepStateUpdates.emplace_back(key, state);
+ db.Table<Schema::Data>().Key(TString(key)).Update<Schema::Data::Value>(*value);
+ ++NumKeysProcessed;
+ }
}
- return DoNotKeepIndex == record.DoNotKeepSize();
+ return index == items.size();
}
- bool ApplyBarrier(TTransactionContext& txc) {
+ bool ProcessBarrier(TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
const auto& record = Request->Get()->Record;
@@ -84,10 +142,39 @@ namespace NKikimr::NBlobDepot {
Self->ScanRange(first.AsBinaryString(), last.AsBinaryString(), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END,
processKey);
+
+ if (NumKeysProcessed == MaxKeysToProcessAtOnce) {
+ return false;
+ }
+
+ auto row = db.Table<Schema::Barriers>().Key(record.GetTabletId(), record.GetChannel());
+ const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
+ if (record.GetHard()) {
+ row.Update<Schema::Barriers::Hard>(collectGenStep);
+ } else {
+ row.Update<Schema::Barriers::Soft>(collectGenStep);
+ }
+ row.Update<Schema::Barriers::LastRecordGenStep>(GenStep(record.GetGeneration(), record.GetPerGenerationCounter()));
}
return true;
}
+
+ void ApplyBarrier() {
+ const auto& record = Request->Get()->Record;
+ if (record.HasCollectGeneration() && record.HasCollectStep()) {
+ const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ auto& barriers = Self->GarbageCollectionManager->Barriers;
+ auto& barrier = barriers[key];
+ const ui64 recordGenStep = GenStep(record.GetGeneration(), record.GetPerGenerationCounter());
+ const ui64 collectGenStep = GenStep(record.GetCollectGeneration(), record.GetCollectStep());
+ ui64& currentGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
+ Y_VERIFY(barrier.LastRecordGenStep <= recordGenStep);
+ barrier.LastRecordGenStep = recordGenStep;
+ Y_VERIFY(currentGenStep <= collectGenStep);
+ currentGenStep = collectGenStep;
+ }
+ }
};
public:
@@ -99,6 +186,13 @@ namespace NKikimr::NBlobDepot {
std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle> uniq(ev.Release());
Self->Execute(std::make_unique<TTxCollectGarbage>(Self, std::move(uniq)));
}
+
+ bool CheckBlobForBarrier(TLogoBlobID id) const {
+ const auto key = std::make_pair(id.TabletID(), id.Channel());
+ const auto it = Barriers.find(key);
+ const ui64 genstep = static_cast<ui64>(id.Generation()) << 32 | id.Step();
+ return it == Barriers.end() || genstep > Max(it->second.Soft, it->second.Hard);
+ }
};
TBlobDepot::TGarbageCollectionManagerPtr TBlobDepot::CreateGarbageCollectionManager() {
@@ -109,4 +203,8 @@ namespace NKikimr::NBlobDepot {
GarbageCollectionManager->Handle(ev);
}
+ bool TBlobDepot::CheckBlobForBarrier(TLogoBlobID id) const {
+ return GarbageCollectionManager->CheckBlobForBarrier(id);
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index ef6eb40bd69..318883bd169 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -29,6 +29,14 @@ namespace NKikimr::NBlobDepot {
const bool success = value.SerializeToString(&valueData);
Y_VERIFY(success);
+ const TString& key = item.GetKey();
+ if (key.size() == 3 * sizeof(ui64)) {
+ const TLogoBlobID id(reinterpret_cast<const ui64*>(key.data()));
+ if (!Self->CheckBlobForBarrier(id)) {
+ continue; // FIXME: report error somehow (?)
+ }
+ }
+
db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData);
UpdateQ.emplace_back(item.GetKey(), std::move(value));
}
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index 80d9c948dd6..f5b2f123bb2 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -38,15 +38,17 @@ namespace NKikimr::NBlobDepot {
struct Barriers : Table<3> {
struct TabletId : Column<1, NScheme::NTypeIds::Uint64> {};
struct Channel : Column<2, NScheme::NTypeIds::Uint8> {};
- struct SoftGenStep : Column<3, NScheme::NTypeIds::Uint64> {};
- struct HardGenStep : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct LastRecordGenStep : Column<3, NScheme::NTypeIds::Uint64> {};
+ struct Soft : Column<4, NScheme::NTypeIds::Uint64> {};
+ struct Hard : Column<5, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<TabletId, Channel>;
using TColumns = TableColumns<
TabletId,
Channel,
- SoftGenStep,
- HardGenStep
+ LastRecordGenStep,
+ Soft,
+ Hard
>;
};
diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.cpp b/ydb/core/blobstorage/dsproxy/group_sessions.cpp
index 27223501628..9971c4a4e72 100644
--- a/ydb/core/blobstorage/dsproxy/group_sessions.cpp
+++ b/ydb/core/blobstorage/dsproxy/group_sessions.cpp
@@ -85,7 +85,7 @@ TGroupSessions::TGroupSessions(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
auto& q = stateVDisk.Queues.GetQueue(queueId);
q.ActorId = queue;
q.FlowRecord = std::move(flowRecord);
- q.ExtraBlockChecksSupport = false;
+ q.ExtraBlockChecksSupport.reset();
}
}
}
@@ -126,7 +126,7 @@ void TGroupSessions::QueueConnectUpdate(ui32 orderNumber, NKikimrBlobStorage::EV
q.ExtraBlockChecksSupport = extraGroupChecksSupport;
} else {
ConnectedQueuesMask[orderNumber] &= ~(1 << queueId);
- q.ExtraBlockChecksSupport = false;
+ q.ExtraBlockChecksSupport.reset();
}
}
diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.h b/ydb/core/blobstorage/dsproxy/group_sessions.h
index 120ad2da4c9..d5005a7177d 100644
--- a/ydb/core/blobstorage/dsproxy/group_sessions.h
+++ b/ydb/core/blobstorage/dsproxy/group_sessions.h
@@ -21,7 +21,7 @@ namespace NKikimr {
struct TQueue {
TActorId ActorId;
TIntrusivePtr<NBackpressure::TFlowRecord> FlowRecord;
- bool ExtraBlockChecksSupport;
+ std::optional<bool> ExtraBlockChecksSupport;
};
TQueue PutTabletLog;
TQueue PutAsyncBlob;
@@ -80,12 +80,12 @@ namespace NKikimr {
{}
static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVPut& event) {
- Y_VERIFY(!event.Record.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport);
+ Y_VERIFY(!event.Record.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport.value_or(true));
}
static void ValidateEvent(TQueue& queue, const TEvBlobStorage::TEvVMultiPut& event) {
for (const auto& item : event.Record.GetItems()) {
- Y_VERIFY(!item.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport);
+ Y_VERIFY(!item.ExtraBlockChecksSize() || queue.ExtraBlockChecksSupport.value_or(true));
}
}
diff --git a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
index 14cdbf21f39..0c1cd18d596 100644
--- a/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/blob_depot.cpp
@@ -84,6 +84,32 @@ Y_UNIT_TEST_SUITE(BlobDepot) {
UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Buffer, data);
}
+ sender = env.Runtime->AllocateEdgeActor(3);
+ env.Runtime->WrapInActorContext(sender, [&] {
+ SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvRange(id.TabletID(), Min<TLogoBlobID>(),
+ Max<TLogoBlobID>(), false, TInstant::Max(), true));
+ });
+ {
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvRangeResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Id, id);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Buffer, TString());
+ }
+
+ sender = env.Runtime->AllocateEdgeActor(4);
+ env.Runtime->WrapInActorContext(sender, [&] {
+ SendToBSProxy(sender, groupId, new TEvBlobStorage::TEvRange(id.TabletID(), Min<TLogoBlobID>(),
+ Max<TLogoBlobID>(), false, TInstant::Max(), false));
+ });
+ {
+ auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvRangeResult>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Id, id);
+ UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Buffer, data);
+ }
+
env.Sim(TDuration::Seconds(20));
}
diff --git a/ydb/core/mind/bscontroller/load_everything.cpp b/ydb/core/mind/bscontroller/load_everything.cpp
index 529b0ebe384..e01099456ae 100644
--- a/ydb/core/mind/bscontroller/load_everything.cpp
+++ b/ydb/core/mind/bscontroller/load_everything.cpp
@@ -176,8 +176,9 @@ public:
const TBoxStoragePoolId storagePoolId = it->second;
groupToStoragePool.erase(it);
+ const bool isVirtualGroup = TGroupID(groups.GetKey()).ConfigurationType() == EGroupConfigurationType::Virtual;
const auto geomIt = geometry.find(groups.GetKey());
- Y_VERIFY(geomIt != geometry.end());
+ Y_VERIFY(isVirtualGroup || geomIt != geometry.end());
TGroupInfo& group = Self->AddGroup(groups.GetKey(),
groups.GetValue<T::Generation>(),
@@ -194,9 +195,9 @@ public:
groups.GetValueOrDefault<T::Down>(),
groups.GetValueOrDefault<T::SeenOperational>(),
storagePoolId,
- std::get<0>(geomIt->second),
- std::get<1>(geomIt->second),
- std::get<2>(geomIt->second));
+ isVirtualGroup ? 0 : std::get<0>(geomIt->second),
+ isVirtualGroup ? 0 : std::get<1>(geomIt->second),
+ isVirtualGroup ? 0 : std::get<2>(geomIt->second));
group.DecommitStatus = groups.GetValueOrDefault<T::DecommitStatus>();
#define OPTIONAL(NAME) \
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 273c1786f52..46ec82b9ec0 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -87,8 +87,15 @@ message TEvBlockResult {
optional uint32 TimeToLiveMs = 3;
}
-message TEvPushNotify { // BlobDepot -> Agent push notification (to take some action)
- repeated fixed64 UpdateBlocksForTabletIds = 1; // notify about some changed blocks
+message TEvPushNotify {
+ message TBlockedTablet {
+ optional fixed64 TabletId = 1;
+ optional uint32 BlockedGeneration = 2;
+ }
+ repeated TBlockedTablet BlockedTablets = 1;
+}
+
+message TEvPushNotifyResult {
}
message TEvQueryBlocks {