aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-08-13 23:14:43 +0300
committeralexvru <alexvru@ydb.tech>2022-08-13 23:14:43 +0300
commit77b7e132d724f31ae97a0438cc39fa87b39d968b (patch)
treebda2e13eb7b34828d69218f17855950b02be98bc
parent167bbfedbd3a21f0da22ca76f6c81772c6acb9ec (diff)
downloadydb-77b7e132d724f31ae97a0438cc39fa87b39d968b.tar.gz
BlobDepot work in progress: support assimilation through ds proxy
-rw-r--r--ydb/core/base/blobstorage.cpp5
-rw-r--r--ydb/core/base/blobstorage.h167
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt2
-rw-r--r--ydb/core/blob_depot/agent.cpp4
-rw-r--r--ydb/core/blob_depot/agent/CMakeLists.txt1
-rw-r--r--ydb/core/blob_depot/agent/agent_impl.h3
-rw-r--r--ydb/core/blob_depot/agent/blocks.cpp2
-rw-r--r--ydb/core/blob_depot/agent/blocks.h2
-rw-r--r--ydb/core/blob_depot/agent/channel_kind.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_assimilate.cpp26
-rw-r--r--ydb/core/blob_depot/assimilator.cpp276
-rw-r--r--ydb/core/blob_depot/assimilator.h34
-rw-r--r--ydb/core/blob_depot/assimilator_copier.cpp31
-rw-r--r--ydb/core/blob_depot/assimilator_copier.h19
-rw-r--r--ydb/core/blob_depot/assimilator_fetch_machine.cpp382
-rw-r--r--ydb/core/blob_depot/assimilator_fetch_machine.h135
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp5
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h140
-rw-r--r--ydb/core/blob_depot/blocks.cpp15
-rw-r--r--ydb/core/blob_depot/blocks.h4
-rw-r--r--ydb/core/blob_depot/data.cpp3
-rw-r--r--ydb/core/blob_depot/data.h2
-rw-r--r--ydb/core/blob_depot/events.h2
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp74
-rw-r--r--ydb/core/blob_depot/garbage_collection.h9
-rw-r--r--ydb/core/blob_depot/op_load.cpp15
-rw-r--r--ydb/core/blob_depot/types.h2
-rw-r--r--ydb/core/blobstorage/backpressure/queue.cpp1
-rw-r--r--ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp48
-rw-r--r--ydb/core/blobstorage/dsproxy/CMakeLists.txt1
-rw-r--r--ydb/core/blobstorage/dsproxy/defs.h1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy.h9
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp428
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_impl.h4
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_mon.h3
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h1
-rw-r--r--ydb/core/blobstorage/dsproxy/dsproxy_request.cpp8
-rw-r--r--ydb/core/blobstorage/dsproxy/group_sessions.h5
-rw-r--r--ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp5
-rw-r--r--ydb/core/blobstorage/ut_blobstorage/assimilation.cpp17
-rw-r--r--ydb/core/blobstorage/vdisk/common/vdisk_events.h17
-rw-r--r--ydb/core/blobstorage/vdisk/query/assimilation.cpp12
-rw-r--r--ydb/core/blobstorage/vdisk/query/assimilation.h2
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp16
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h9
-rw-r--r--ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp6
-rw-r--r--ydb/core/protos/blobstorage.proto1
-rw-r--r--ydb/core/protos/services.proto2
50 files changed, 991 insertions, 971 deletions
diff --git a/ydb/core/base/blobstorage.cpp b/ydb/core/base/blobstorage.cpp
index 8ce42d7ea9..b18215f508 100644
--- a/ydb/core/base/blobstorage.cpp
+++ b/ydb/core/base/blobstorage.cpp
@@ -115,6 +115,11 @@ std::unique_ptr<TEvBlobStorage::TEvStatusResult> TEvBlobStorage::TEvStatus::Make
return res;
}
+// Builds a TEvAssimilateResult that carries only the given status and error reason.
+// The groupId argument is accepted but not used by this implementation.
+std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> TEvBlobStorage::TEvAssimilate::MakeErrorResponse(
+ NKikimrProto::EReplyStatus status, const TString& errorReason, ui32 /*groupId*/) {
+ return std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, errorReason);
+}
+
};
template<>
diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h
index cad14e35e8..50b464aaf5 100644
--- a/ydb/core/base/blobstorage.h
+++ b/ydb/core/base/blobstorage.h
@@ -459,6 +459,7 @@ struct TEvBlobStorage {
EvVBaldSyncLog,
EvPatch,
EvInplacePatch,
+ EvAssimilate,
//
EvPutResult = EvPut + 512, /// 268 632 576
@@ -472,6 +473,7 @@ struct TEvBlobStorage {
EvVBaldSyncLogResult,
EvPatchResult,
EvInplacePatchResult,
+ EvAssimilateResult,
// proxy <-> vdisk interface
EvVPut = EvPut + 2 * 512, /// 268 633 088
@@ -868,6 +870,7 @@ struct TEvBlobStorage {
struct TEvStatusResult;
struct TEvPatchResult;
struct TEvInplacePatchResult;
+ struct TEvAssimilateResult;
struct TEvPut : public TEventLocal<TEvPut, EvPut> {
enum ETactic {
@@ -1994,6 +1997,170 @@ struct TEvBlobStorage {
}
};
+ // Local request event for assimilation. Carries three optional "skip up to" markers (blocks, barriers,
+ // blobs) and a restart counter.
+ // NOTE(review): presumably each marker names the last item the requester has already received, so that
+ // the reply resumes after it -- confirm against the DS proxy implementation.
+ struct TEvAssimilate : TEventLocal<TEvAssimilate, EvAssimilate> {
+ // Tablet id marker for blocks; unset when no marker is supplied.
+ std::optional<ui64> SkipBlocksUpTo;
+ // (tablet id, channel) marker for barriers; unset when no marker is supplied.
+ std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
+ // Blob id marker for blobs; unset when no marker is supplied.
+ std::optional<TLogoBlobID> SkipBlobsUpTo;
+ ui32 RestartCounter = 0;
+
+ // Stores the three markers as given; RestartCounter keeps its default of 0.
+ TEvAssimilate(std::optional<ui64> skipBlocksUpTo, std::optional<std::tuple<ui64, ui8>> skipBarriersUpTo,
+ std::optional<TLogoBlobID> skipBlobsUpTo)
+ : SkipBlocksUpTo(skipBlocksUpTo)
+ , SkipBarriersUpTo(skipBarriersUpTo)
+ , SkipBlobsUpTo(skipBlobsUpTo)
+ {}
+
+ // Same output as ToString(); the isFull flag is ignored.
+ TString Print(bool /*isFull*/) const {
+ return ToString();
+ }
+
+ // Renders only the markers that are set, separated by single spaces, inside "TEvAssimilate {...}".
+ TString ToString() const {
+ TStringStream str;
+ str << "TEvAssimilate {";
+ // prefix is empty for the first printed field and becomes " " afterwards
+ const char *prefix = "";
+ if (SkipBlocksUpTo) {
+ str << std::exchange(prefix, " ") << "SkipBlocksUpTo# " << *SkipBlocksUpTo;
+ }
+ if (SkipBarriersUpTo) {
+ str << std::exchange(prefix, " " ) << "SkipBarriersUpTo# " << std::get<0>(*SkipBarriersUpTo)
+ << ":" << int(std::get<1>(*SkipBarriersUpTo));
+ }
+ if (SkipBlobsUpTo) {
+ str << std::exchange(prefix, " " ) << "SkipBlobsUpTo# ";
+ SkipBlobsUpTo->Out(str);
+ }
+ str << "}";
+ return str.Str();
+ }
+
+ // Reports the in-memory size of the event object itself.
+ ui32 CalculateSize() const {
+ return sizeof(*this);
+ }
+
+ // Creates an error reply for this request; defined in blobstorage.cpp.
+ std::unique_ptr<TEvAssimilateResult> MakeErrorResponse(NKikimrProto::EReplyStatus status, const TString& errorReason,
+ ui32 groupId);
+ };
+
+ // Local reply event for TEvAssimilate. Carries a status, an optional error reason and three lists of
+ // assimilated items: blocks, barriers and blobs.
+ struct TEvAssimilateResult : TEventLocal<TEvAssimilateResult, EvAssimilateResult> {
+ // A single block record: tablet id and its blocked generation.
+ struct TBlock {
+ ui64 TabletId;
+ ui32 BlockedGeneration;
+
+ TString ToString() const {
+ TStringStream str;
+ Output(str);
+ return str.Str();
+ }
+
+ // Prints "{TabletId=>BlockedGeneration}".
+ void Output(IOutputStream& s) const {
+ s << "{" << TabletId << "=>" << BlockedGeneration << "}";
+ }
+ };
+
+ // A single barrier record for a (tablet, channel) pair with separate soft and hard values.
+ struct TBarrier {
+ struct TValue {
+ ui32 RecordGeneration;
+ ui32 PerGenerationCounter;
+ ui32 CollectGeneration;
+ ui32 CollectStep;
+
+ // Prints "{RecordGeneration:PerGenerationCounter=>CollectGeneration:CollectStep}" unless every
+ // field is zero, in which case nothing is printed. All four fields take part in the check, so a
+ // value with only CollectStep set is still printed.
+ void Output(IOutputStream& s) const {
+ if (RecordGeneration || PerGenerationCounter || CollectGeneration || CollectStep) {
+ s << "{" << RecordGeneration << ":" << PerGenerationCounter << "=>" << CollectGeneration
+ << ":" << CollectStep << "}";
+ }
+ }
+ };
+
+ ui64 TabletId;
+ ui8 Channel;
+ TValue Soft;
+ TValue Hard;
+
+ TString ToString() const {
+ TStringStream str;
+ Output(str);
+ return str.Str();
+ }
+
+ // Prints "{TabletId:Channel=>soft<Soft>/hard<Hard>}"; Channel is printed as a number.
+ void Output(IOutputStream& s) const {
+ s << "{" << TabletId << ":" << int(Channel) << "=>soft";
+ Soft.Output(s);
+ s << "/hard";
+ Hard.Output(s);
+ s << "}";
+ }
+ };
+
+ // A single blob record: blob id plus Keep / DoNotKeep flags.
+ struct TBlob {
+ TLogoBlobID Id;
+ bool Keep;
+ bool DoNotKeep;
+
+ TString ToString() const {
+ TStringStream str;
+ Output(str);
+ return str.Str();
+ }
+
+ // Prints the blob id followed by "k" when Keep is set and "d" when DoNotKeep is set.
+ void Output(IOutputStream& s) const {
+ Id.Out(s);
+ if (Keep) {
+ s << "k";
+ }
+ if (DoNotKeep) {
+ s << "d";
+ }
+ }
+ };
+
+ NKikimrProto::EReplyStatus Status;
+ TString ErrorReason;
+ std::deque<TBlock> Blocks;
+ std::deque<TBarrier> Barriers;
+ std::deque<TBlob> Blobs;
+
+ // Creates a result with the given status and error reason; the item lists start empty.
+ TEvAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason = {})
+ : Status(status)
+ , ErrorReason(std::move(errorReason))
+ {}
+
+ // Renders the event as text. With isFull every item of each list is printed; otherwise only the
+ // list sizes are shown.
+ TString Print(bool isFull) const {
+ TStringStream str;
+ str << "TEvAssimilateResult {"
+ << "Status# " << NKikimrProto::EReplyStatus_Name(Status)
+ << " ErrorReason# '" << ErrorReason << "'";
+
+ // shared printer for the three lists; items are separated by single spaces
+ auto out = [&](const char *name, auto& container) {
+ str << " " << name << "# ";
+ if (isFull) {
+ str << "[";
+ for (auto it = container.begin(); it != container.end(); ++it) {
+ if (it != container.begin()) {
+ str << " ";
+ }
+ it->Output(str);
+ }
+ str << "]";
+ } else {
+ str << "size=" << container.size();
+ }
+ };
+
+ out("Blocks", Blocks);
+ out("Barriers", Barriers);
+ out("Blobs", Blobs);
+
+ str << "}";
+
+ return str.Str();
+ }
+
+ // Short form: equivalent to Print(false).
+ TString ToString() const {
+ return Print(false);
+ }
+ };
+
struct TEvConfigureProxy;
struct TEvUpdateGroupInfo;
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index 330f795866..8c20c98b6e 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -19,8 +19,6 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blob_depot.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator_copier.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator_fetch_machine.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
diff --git a/ydb/core/blob_depot/agent.cpp b/ydb/core/blob_depot/agent.cpp
index 272df7ae92..792491b5b4 100644
--- a/ydb/core/blob_depot/agent.cpp
+++ b/ydb/core/blob_depot/agent.cpp
@@ -118,10 +118,10 @@ namespace NKikimr::NBlobDepot {
std::make_heap(Channels.begin(), Channels.end(), TChannelCompare());
}
- std::pair<ui8, ui64> PickChannelBlobSeq() {
+ std::tuple<ui8, ui64> PickChannelBlobSeq() {
std::pop_heap(Channels.begin(), Channels.end(), TChannelCompare());
TChannelInfo *channel = Channels.back();
- auto res = std::make_pair(channel->Index, channel->NextBlobSeqId++);
+ auto res = std::make_tuple(channel->Index, channel->NextBlobSeqId++);
std::push_heap(Channels.begin(), Channels.end(), TChannelCompare());
Size += 4 << 20; // assume each written blob of this size in a first approximation
return res;
diff --git a/ydb/core/blob_depot/agent/CMakeLists.txt b/ydb/core/blob_depot/agent/CMakeLists.txt
index 8ebb4a51ef..86891c8205 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.txt
@@ -35,4 +35,5 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_collect_garbage.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_status.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_patch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_assimilate.cpp
)
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index 25e1b93702..40c4fe2283 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -13,6 +13,7 @@ namespace NKikimr::NBlobDepot {
XX(EvCollectGarbage) \
XX(EvStatus) \
XX(EvPatch) \
+ XX(EvAssimilate) \
// END
class TBlobDepotAgent;
@@ -270,7 +271,7 @@ namespace NKikimr::NBlobDepot {
void IssueGivenIdRange(const NKikimrBlobDepot::TGivenIdRange& proto);
ui32 GetNumAvailableItems() const;
std::optional<TBlobSeqId> Allocate(TBlobDepotAgent& agent);
- std::pair<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type,
+ std::tuple<TLogoBlobID, ui32> MakeBlobId(TBlobDepotAgent& agent, const TBlobSeqId& blobSeqId, EBlobType type,
ui32 part, ui32 size) const;
void Trim(ui8 channel, ui32 generation, ui32 invalidatedStep);
void RebuildHeap();
diff --git a/ydb/core/blob_depot/agent/blocks.cpp b/ydb/core/blob_depot/agent/blocks.cpp
index 06efcfbff0..864b5a1266 100644
--- a/ydb/core/blob_depot/agent/blocks.cpp
+++ b/ydb/core/blob_depot/agent/blocks.cpp
@@ -67,7 +67,7 @@ namespace NKikimr::NBlobDepot {
});
}
- std::pair<ui32, ui64> TBlobDepotAgent::TBlocksManager::GetBlockForTablet(ui64 tabletId) {
+ std::tuple<ui32, ui64> TBlobDepotAgent::TBlocksManager::GetBlockForTablet(ui64 tabletId) {
if (const auto it = Blocks.find(tabletId); it != Blocks.end()) {
const auto& record = it->second;
return {record.BlockedGeneration, record.IssuerGuid};
diff --git a/ydb/core/blob_depot/agent/blocks.h b/ydb/core/blob_depot/agent/blocks.h
index ddf79bac88..551465fbde 100644
--- a/ydb/core/blob_depot/agent/blocks.h
+++ b/ydb/core/blob_depot/agent/blocks.h
@@ -40,7 +40,7 @@ namespace NKikimr::NBlobDepot {
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override;
void Handle(TRequestContext::TPtr context, NKikimrBlobDepot::TEvQueryBlocksResult& msg);
void IssueOnUpdateBlock(TBlock& block, bool success);
- std::pair<ui32, ui64> GetBlockForTablet(ui64 tabletId);
+ std::tuple<ui32, ui64> GetBlockForTablet(ui64 tabletId);
void SetBlockForTablet(ui64 tabletId, ui32 blockedGeneration, TMonotonic timestamp, TDuration timeToLive);
void OnBlockedTablets(const NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TEvPushNotify::TBlockedTablet>& tablets);
};
diff --git a/ydb/core/blob_depot/agent/channel_kind.cpp b/ydb/core/blob_depot/agent/channel_kind.cpp
index a5be21c0b6..544b6d5244 100644
--- a/ydb/core/blob_depot/agent/channel_kind.cpp
+++ b/ydb/core/blob_depot/agent/channel_kind.cpp
@@ -55,7 +55,7 @@ namespace NKikimr::NBlobDepot {
return TBlobSeqId::FromSequentalNumber(channel, agent.BlobDepotGeneration, value);
}
- std::pair<TLogoBlobID, ui32> TBlobDepotAgent::TChannelKind::MakeBlobId(TBlobDepotAgent& agent,
+ std::tuple<TLogoBlobID, ui32> TBlobDepotAgent::TChannelKind::MakeBlobId(TBlobDepotAgent& agent,
const TBlobSeqId& blobSeqId, EBlobType type, ui32 part, ui32 size) const {
auto id = blobSeqId.MakeBlobId(agent.TabletId, type, part, size);
const auto [channel, groupId] = ChannelGroups[ChannelToIndex[blobSeqId.Channel]];
diff --git a/ydb/core/blob_depot/agent/storage_assimilate.cpp b/ydb/core/blob_depot/agent/storage_assimilate.cpp
new file mode 100644
index 0000000000..05964b1ca0
--- /dev/null
+++ b/ydb/core/blob_depot/agent/storage_assimilate.cpp
@@ -0,0 +1,26 @@
+#include "agent_impl.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Query factory specialization for TEvAssimilate: the created query does no processing of its own and
+ // simply forwards the original event to Agent.ProxyId.
+ template<>
+ TBlobDepotAgent::TQuery *TBlobDepotAgent::CreateQuery<TEvBlobStorage::EvAssimilate>(std::unique_ptr<IEventHandle> ev) {
+ class TAssimilateQuery : public TQuery {
+ public:
+ using TQuery::TQuery;
+
+ // Forwards the event to the proxy, asserts that the send succeeded, then destroys this query object.
+ void Initiate() override {
+ Y_VERIFY(Agent.ProxyId);
+ const bool sent = TActivationContext::Send(Event->Forward(Agent.ProxyId));
+ Y_VERIFY(sent);
+ // the query is finished once the event is forwarded; nothing may touch *this after this line
+ delete this;
+ }
+
+ // The query deletes itself in Initiate(), so no response is ever expected here.
+ void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr /*context*/, TResponse /*response*/) override {
+ Y_FAIL();
+ }
+ };
+
+ return new TAssimilateQuery(*this, std::move(ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index ef3c2b9913..7f7e218cdb 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -1,204 +1,127 @@
#include "blob_depot_tablet.h"
-#include "assimilator_fetch_machine.h"
-#include "assimilator_copier.h"
-#include "schema.h"
+#include "assimilator.h"
#include "blocks.h"
#include "garbage_collection.h"
#include "data.h"
namespace NKikimr::NBlobDepot {
- class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> {
+ using TAssimilator = TBlobDepot::TGroupAssimilator;
+
+ struct TStateBits {
enum {
- EvReconnectToController = EventSpaceBegin(TEvents::ES_PRIVATE),
+ Blocks = 1,
+ Barriers = 2,
+ Blobs = 4,
};
+ };
- const ui32 GroupId;
- const NKikimrBlobDepot::TBlobDepotConfig Config;
- const ui64 TabletId;
- std::optional<TString> AssimilatorState;
- TActorId BlobDepotId;
- TIntrusivePtr<TBlobStorageGroupInfo> Info;
-
- public:
- TGroupAssimilator(const NKikimrBlobDepot::TBlobDepotConfig& config, ui64 tabletId, std::optional<TString> assimilatorState)
- : GroupId(config.GetDecommitGroupId())
- , Config(config)
- , TabletId(tabletId)
- , AssimilatorState(std::move(assimilatorState))
- {
- Y_VERIFY(Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup);
- }
-
- void Bootstrap(TActorId parentId) {
- BlobDepotId = parentId;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT31, "TGroupAssimilator::Bootstrap", (GroupId, GroupId));
- QueryGroupConfiguration();
- }
-
- void PassAway() override {
- FetchMachine->OnPassAway();
- TActorBootstrapped::PassAway();
+ void TAssimilator::Bootstrap() {
+ if (Token.expired()) {
+ return PassAway();
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // BSC interaction
-
- TActorId ControllerPipeId;
-
- void QueryGroupConfiguration() {
- TGroupID groupId(GroupId);
- if (groupId.ConfigurationType() != EGroupConfigurationType::Dynamic) {
- AbortWithError("group configuration type is not dynamic");
+ const std::optional<TString>& state = Self->AssimilatorState;
+ if (state) {
+ TStringInput stream(*state);
+ ui8 stateBits;
+ Load(&stream, stateBits);
+ if (stateBits & TStateBits::Blocks) {
+ Load(&stream, SkipBlocksUpTo.emplace());
}
-
- const ui64 controllerId = MakeBSControllerID(groupId.AvailabilityDomainID());
- ControllerPipeId = Register(NTabletPipe::CreateClient(SelfId(), controllerId));
- Become(&TThis::StateQueryController);
- }
-
- STRICT_STFUNC(StateQueryController,
- cFunc(TEvents::TSystem::Poison, PassAway);
-
- cFunc(EvReconnectToController, QueryGroupConfiguration);
- hFunc(TEvTabletPipe::TEvClientConnected, Handle);
- hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
- hFunc(TEvBlobStorage::TEvControllerNodeServiceSetUpdate, Handle);
- );
-
- void Handle(TEvTabletPipe::TEvClientConnected::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT00, "TGroupAssimilator::TEvClientConnected", (GroupId, GroupId));
- Y_VERIFY(ev->Get()->ClientId == ControllerPipeId);
- if (ev->Get()->Status == NKikimrProto::OK) {
- NTabletPipe::SendData(SelfId(), ControllerPipeId, new TEvBlobStorage::TEvControllerGetGroup(0, GroupId));
- } else {
- Reconnect();
+ if (stateBits & TStateBits::Barriers) {
+ Load(&stream, SkipBarriersUpTo.emplace());
}
- }
-
- void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT32, "TGroupAssimilator::TEvClientDestroyed", (GroupId, GroupId));
- Y_VERIFY(ev->Get()->ClientId == ControllerPipeId);
- Reconnect();
- }
-
- void Handle(TEvBlobStorage::TEvControllerNodeServiceSetUpdate::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT33, "TGroupAssimilator::TEvControllerNodeServiceSetUpdate", (GroupId, GroupId),
- (Msg, ev->Get()->Record));
- NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId);
-
- auto& record = ev->Get()->Record;
- if (record.HasStatus() && record.GetStatus() == NKikimrProto::OK && record.HasServiceSet()) {
- const auto& ss = record.GetServiceSet();
- for (const auto& group : ss.GetGroups()) {
- if (group.GetGroupID() == GroupId) {
- if (group.GetEntityStatus() == NKikimrBlobStorage::EEntityStatus::DESTROY) {
- return AbortWithError("the group being decommitted was destroyed");
- } else if (!group.HasBlobDepotId() || group.GetBlobDepotId() != TabletId) {
- return AbortWithError("inconsistent decommission state");
- } else {
- Info = TBlobStorageGroupInfo::Parse(group, nullptr, nullptr);
- StartAssimilation();
- return;
- }
- }
- }
+ if (stateBits & TStateBits::Blobs) {
+ Load(&stream, SkipBlobsUpTo.emplace());
}
-
- // retry operation in some time
- Reconnect();
- }
-
- void Reconnect() {
- NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeId);
- TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(EvReconnectToController, 0,
- SelfId(), {}, nullptr, 0));
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- std::unique_ptr<TGroupAssimilatorFetchMachine> FetchMachine;
-
- void StartAssimilation() {
- Become(&TThis::StateAssimilate);
- FetchMachine = std::make_unique<TGroupAssimilatorFetchMachine>(SelfId(), Info, BlobDepotId,
- std::move(AssimilatorState));
}
- void StateAssimilate(STFUNC_SIG) {
- Y_UNUSED(ctx);
+ SendRequest();
+ Become(&TThis::StateFunc);
+ }
- switch (ev->GetTypeRewrite()) {
- cFunc(TEvents::TSystem::Poison, PassAway);
- IgnoreFunc(TEvTabletPipe::TEvClientDestroyed);
+ void TAssimilator::PassAway() {
+ // No assimilator-specific cleanup is needed, but the base implementation must still run: Bootstrap()
+ // and StateFunc() call PassAway() when the tablet token has expired and rely on it to stop this actor.
+ TActorBootstrapped::PassAway();
+ }
- default:
- return FetchMachine->Handle(ev);
- }
+ STATEFN(TAssimilator::StateFunc) {
+ if (Token.expired()) {
+ return PassAway();
}
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- void AbortWithError(TString error) {
- STLOG(PRI_ERROR, BLOB_DEPOT, BDT34, "failed to assimilate group", (GroupId, GroupId), (Error, error));
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Gone, 0, BlobDepotId, SelfId(), nullptr, 0));
- PassAway();
- }
- };
+ switch (const ui32 type = ev->GetTypeRewrite()) {
+ hFunc(TEvBlobStorage::TEvAssimilateResult, Handle);
- void TBlobDepot::StartGroupAssimilator() {
- if (!RunningGroupAssimilator && Config.HasDecommitGroupId()) {
- CopierId = RegisterWithSameMailbox(new TGroupAssimilatorCopierActor(this));
- RunningGroupAssimilator = Register(new TGroupAssimilator(Config, TabletID(), AssimilatorState));
+ default:
+ Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
+ STLOG(PRI_CRIT, BLOB_DEPOT, BDTxx, "unexpected event", (Id, Self->GetLogId()), (Type, type));
+ break;
}
}
- void TBlobDepot::HandleGone(TAutoPtr<IEventHandle> ev) {
- if (ev->Sender == RunningGroupAssimilator) {
- RunningGroupAssimilator = {};
- } else {
- Y_FAIL("unexpected event");
- }
+ void TAssimilator::SendRequest() {
+ SendToBSProxy(SelfId(), Self->Config.GetDecommitGroupId(), new TEvBlobStorage::TEvAssimilate(SkipBlocksUpTo,
+ SkipBarriersUpTo, SkipBlobsUpTo));
}
- void TBlobDepot::Handle(TEvAssimilatedData::TPtr ev) {
+ void TAssimilator::Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev) {
class TTxPutAssimilatedData : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- std::unique_ptr<TEvAssimilatedData> Ev;
+ TAssimilator *Self;
+ std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> Ev;
bool UnblockRegisterActorQ = false;
public:
- TTxPutAssimilatedData(TBlobDepot *self, TEvAssimilatedData::TPtr ev)
- : TTransactionBase(self)
+ TTxPutAssimilatedData(TAssimilator *self, TEvBlobStorage::TEvAssimilateResult::TPtr ev)
+ : TTransactionBase(self->Self)
+ , Self(self)
, Ev(ev->Release().Release())
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
NIceDb::TNiceDb db(txc.DB);
+ const bool done = Ev->Blocks.empty() && Ev->Barriers.empty() && Ev->Blobs.empty();
+ const bool blocksFinished = Ev->Blocks.empty() || !Ev->Barriers.empty() || !Ev->Blobs.empty() || done;
+ const bool barriersFinished = Ev->Barriers.empty() || !Ev->Blobs.empty() || done;
+
+ if (const auto& blocks = Ev->Blocks; !blocks.empty()) {
+ Self->SkipBlocksUpTo = blocks.back().TabletId;
+ }
+ if (const auto& barriers = Ev->Barriers; !barriers.empty()) {
+ Self->SkipBarriersUpTo = {barriers.back().TabletId, barriers.back().Channel};
+ }
+ if (const auto& blobs = Ev->Blobs; !blobs.empty()) {
+ Self->SkipBlobsUpTo = blobs.back().Id;
+ }
+
for (const auto& block : Ev->Blocks) {
- Self->BlocksManager->AddBlockOnDecommit(block, txc);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated block", (Id, Self->Self->GetLogId()), (Block, block));
+ Self->Self->BlocksManager->AddBlockOnDecommit(block, txc);
}
for (const auto& barrier : Ev->Barriers) {
- Self->BarrierServer->AddBarrierOnDecommit(barrier, txc);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated barrier", (Id, Self->Self->GetLogId()), (Barrier, barrier));
+ Self->Self->BarrierServer->AddBarrierOnDecommit(barrier, txc);
}
for (const auto& blob : Ev->Blobs) {
- Self->Data->AddDataOnDecommit(blob, txc);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDTxx, "assimilated blob", (Id, Self->Self->GetLogId()), (Blob, blob));
+ Self->Self->Data->AddDataOnDecommit(blob, txc);
}
- if (Ev->BlocksFinished && Self->DecommitState < EDecommitState::BlocksFinished) {
- Self->DecommitState = EDecommitState::BlocksFinished;
+ auto& decommitState = Self->Self->DecommitState;
+ if (blocksFinished && decommitState < EDecommitState::BlocksFinished) {
+ decommitState = EDecommitState::BlocksFinished;
UnblockRegisterActorQ = true;
}
- if (Ev->BarriersFinished && Self->DecommitState < EDecommitState::BarriersFinished) {
- Self->DecommitState = EDecommitState::BarriersFinished;
+ if (barriersFinished && decommitState < EDecommitState::BarriersFinished) {
+ decommitState = EDecommitState::BarriersFinished;
}
- if (Ev->BlobsFinished && Self->DecommitState < EDecommitState::BlobsFinished) {
- Self->DecommitState = EDecommitState::BlobsFinished;
+ if (done && decommitState < EDecommitState::BlobsFinished) {
+ decommitState = EDecommitState::BlobsFinished;
}
db.Table<Schema::Config>().Key(Schema::Config::Key::Value).Update(
- NIceDb::TUpdate<Schema::Config::DecommitState>(Self->DecommitState),
- NIceDb::TUpdate<Schema::Config::AssimilatorState>(Ev->AssimilatorState)
+ NIceDb::TUpdate<Schema::Config::DecommitState>(decommitState),
+ NIceDb::TUpdate<Schema::Config::AssimilatorState>(Self->SerializeAssimilatorState())
);
return true;
@@ -206,32 +129,49 @@ namespace NKikimr::NBlobDepot {
void Complete(const TActorContext&) override {
if (UnblockRegisterActorQ) {
- STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->GetLogId()),
- (DecommitGroupId, Self->Config.GetDecommitGroupId()));
- Self->ProcessRegisterAgentQ();
+ STLOG(PRI_INFO, BLOB_DEPOT, BDT35, "blocks assimilation complete", (Id, Self->Self->GetLogId()),
+ (DecommitGroupId, Self->Self->Config.GetDecommitGroupId()));
+ Self->Self->ProcessRegisterAgentQ();
}
- if (EDecommitState::BlobsFinished <= Self->DecommitState) {
- // finished metadata replication, time to kill the assimilator
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, Self->RunningGroupAssimilator,
- Self->SelfId(), nullptr, 0));
- Self->RunningGroupAssimilator = {};
+ if (EDecommitState::BlobsFinished <= Self->Self->DecommitState) {
+ // finished metadata replication
} else {
- TActivationContext::Send(new IEventHandle(TEvPrivate::EvAssimilatedDataConfirm, 0,
- Self->RunningGroupAssimilator, Self->SelfId(), nullptr, 0)); // ask assimilator to resume process
+ Self->SendRequest();
}
}
};
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT47, "received TEvAssimilatedData", (Id, GetLogId()),
- (Blocks.size, ev->Get()->Blocks.size()), (Barriers.size, ev->Get()->Barriers.size()),
- (Blobs.size, ev->Get()->Blobs.size()), (BlocksFinished, ev->Get()->BlocksFinished),
- (BarriersFinished, ev->Get()->BarriersFinished), (BlobsFinished, ev->Get()->BlobsFinished));
- Execute(std::make_unique<TTxPutAssimilatedData>(this, ev));
+ Self->Execute(std::make_unique<TTxPutAssimilatedData>(this, ev));
}
- void TBlobDepot::ProcessAssimilatedData(TEvAssimilatedData& msg) {
- (void)msg;
+ // Serializes the current skip markers into the string stored as Schema::Config::AssimilatorState.
+ // Layout: one ui8 bit mask (TStateBits) followed by the present markers in the order blocks, barriers,
+ // blobs. This is the same order in which Bootstrap() loads them, so every written marker must have its
+ // bit set in the mask.
+ TString TAssimilator::SerializeAssimilatorState() const {
+ TStringStream stream;
+
+ const ui8 stateBits = (SkipBlocksUpTo ? TStateBits::Blocks : 0)
+ | (SkipBarriersUpTo ? TStateBits::Barriers : 0)
+ | (SkipBlobsUpTo ? TStateBits::Blobs : 0);
+
+ Save(&stream, stateBits);
+
+ if (SkipBlocksUpTo) {
+ Save(&stream, *SkipBlocksUpTo);
+ }
+ if (SkipBarriersUpTo) {
+ Save(&stream, *SkipBarriersUpTo);
+ }
+ if (SkipBlobsUpTo) {
+ Save(&stream, *SkipBlobsUpTo);
+ }
+
+ return stream.Str();
+ }
+
+ // Registers a TGroupAssimilator actor in the tablet's own mailbox when the config names a group to
+ // decommit. Asserts that no assimilator has been registered before.
+ void TBlobDepot::StartGroupAssimilator() {
+ if (Config.HasDecommitGroupId()) {
+ Y_VERIFY(!GroupAssimilatorId);
+ GroupAssimilatorId = RegisterWithSameMailbox(new TGroupAssimilator(this));
+ }
+ }
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h
new file mode 100644
index 0000000000..bab6d171bf
--- /dev/null
+++ b/ydb/core/blob_depot/assimilator.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // Actor that drives assimilation for a BlobDepot tablet: it sends TEvAssimilate requests and handles
+ // the TEvAssimilateResult replies, keeping track of how far each item kind has progressed.
+ class TBlobDepot::TGroupAssimilator : public TActorBootstrapped<TGroupAssimilator> {
+ // Weak reference to the tablet's token; Bootstrap() and StateFunc() check Token.expired() first and
+ // bail out without touching Self when it has expired.
+ std::weak_ptr<TToken> Token;
+ TBlobDepot *Self;
+
+ // Progress markers passed to every TEvAssimilate request and persisted via SerializeAssimilatorState().
+ std::optional<ui64> SkipBlocksUpTo;
+ std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
+ std::optional<TLogoBlobID> SkipBlobsUpTo;
+
+ public:
+ // Captures the tablet pointer and its token; the tablet must be configured in VirtualGroup mode.
+ TGroupAssimilator(TBlobDepot *self)
+ : Token(self->Token)
+ , Self(self)
+ {
+ Y_VERIFY(Self->Config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup);
+ }
+
+ void Bootstrap();
+ void PassAway() override;
+ STATEFN(StateFunc);
+
+ private:
+ // Sends a TEvAssimilate request carrying the current skip markers.
+ void SendRequest();
+ // Handles one reply by executing a tablet transaction built from it.
+ void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
+ // Packs the skip markers into the string stored as the assimilator state.
+ TString SerializeAssimilatorState() const;
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator_copier.cpp b/ydb/core/blob_depot/assimilator_copier.cpp
deleted file mode 100644
index 11456969b0..0000000000
--- a/ydb/core/blob_depot/assimilator_copier.cpp
+++ /dev/null
@@ -1,31 +0,0 @@
-#include "assimilator_copier.h"
-
-namespace NKikimr::NBlobDepot {
-
- using TCopierActor = TBlobDepot::TGroupAssimilatorCopierActor;
-
- TCopierActor::TGroupAssimilatorCopierActor(TBlobDepot *self)
- : Self(self)
- , Token(self->Token)
- {}
-
- void TCopierActor::Bootstrap() {
- Become(&TThis::StateFunc);
- }
-
- STATEFN(TCopierActor::StateFunc) {
- if (Token.expired()) {
- return; // BlobDepot tablet is already dead, we can't access its internals
- }
-
- switch (const ui32 type = ev->GetTypeRewrite()) {
- cFunc(TEvents::TSystem::Poison, PassAway);
-
- default:
- Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
- STLOG(PRI_CRIT, BLOB_DEPOT, BDT46, "unexpected event", (Id, Self->GetLogId()), (Type, type));
- break;
- }
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator_copier.h b/ydb/core/blob_depot/assimilator_copier.h
deleted file mode 100644
index 6f4926b75c..0000000000
--- a/ydb/core/blob_depot/assimilator_copier.h
+++ /dev/null
@@ -1,19 +0,0 @@
-#pragma once
-
-#include "defs.h"
-
-#include "blob_depot_tablet.h"
-
-namespace NKikimr::NBlobDepot {
-
- class TBlobDepot::TGroupAssimilatorCopierActor : public TActorBootstrapped<TGroupAssimilatorCopierActor> {
- TBlobDepot* const Self;
- std::weak_ptr<TBlobDepot::TToken> Token;
-
- public:
- TGroupAssimilatorCopierActor(TBlobDepot *self);
- void Bootstrap();
- STATEFN(StateFunc);
- };
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.cpp b/ydb/core/blob_depot/assimilator_fetch_machine.cpp
deleted file mode 100644
index 2edf40b598..0000000000
--- a/ydb/core/blob_depot/assimilator_fetch_machine.cpp
+++ /dev/null
@@ -1,382 +0,0 @@
-#include "assimilator_fetch_machine.h"
-
-namespace NKikimr::NBlobDepot {
-
- using TFetchMachine = TBlobDepot::TGroupAssimilatorFetchMachine;
-
- struct TStateMask {
- enum {
- Block = 1,
- Barrier = 2,
- Blob = 4,
- };
- };
-
- TFetchMachine::TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info,
- TActorId blobDepotId, const std::optional<TString>& assimilatorState)
- : Self(self)
- , Info(std::move(info))
- , BlobDepotId(blobDepotId)
- {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT38, "TGroupAssimilatorFetchMachine start", (GroupId, Info->GroupID));
-
- PerDiskState.resize(Info->GetTotalVDisksNum());
- for (ui32 i = 0; i < PerDiskState.size(); ++i) {
- const TActorId actorId = Info->GetActorId(i);
- const ui32 nodeId = actorId.NodeId();
- TNodeInfo& node = Nodes[nodeId];
- node.OrderNumbers.push_back(i);
- }
-
- for (const auto& [nodeId, node] : Nodes) {
- if (nodeId != Self.NodeId()) {
- TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
- TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0));
- } else {
- for (const ui32 orderNumber : node.OrderNumbers) {
- IssueAssimilateCmdToVDisk(orderNumber);
- }
- }
- }
-
- if (assimilatorState) {
- TStringInput stream(*assimilatorState);
- ui8 mask = 0;
- Load(&stream, mask);
- if (mask & TStateMask::Block) {
- Load(&stream, LastProcessedBlock.emplace());
- }
- if (mask & TStateMask::Barrier) {
- Load(&stream, LastProcessedBarrier.emplace());
- }
- if (mask & TStateMask::Blob) {
- Load(&stream, LastProcessedBlob.emplace());
- }
- }
- }
-
- void TFetchMachine::Handle(TAutoPtr<IEventHandle>& ev) {
- switch (const ui32 type = ev->GetTypeRewrite()) {
- hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle);
- hFunc(TEvents::TEvUndelivered, Handle);
- hFunc(TEvInterconnect::TEvNodeConnected, Handle);
- hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
- cFunc(TEvPrivate::EvAssimilatedDataConfirm, HandleAssimilateDataConfirm);
-
- default:
- Y_VERIFY_DEBUG(false, "unexpected event Type# %08" PRIx32, type);
- }
- }
-
- void TFetchMachine::OnPassAway() {
- for (const auto& [nodeId, node] : Nodes) {
- if (nodeId != Self.NodeId()) {
- TActivationContext::Send(new IEventHandle(TEvents::TSystem::Unsubscribe, 0,
- TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0));
- }
- }
- }
-
- void TFetchMachine::IssueAssimilateCmdToVDisk(ui32 orderNumber) {
- const TActorId actorId = Info->GetActorId(orderNumber);
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT39, "IssueAssimilateCmdToVDisk", (GroupId, Info->GroupID),
- (OrderNumber, orderNumber), (ActorId, actorId));
-
- TPerDiskState& state = PerDiskState[orderNumber];
- Y_VERIFY(!state.Finished);
- Y_VERIFY(!state.RequestInFlight);
- state.RequestInFlight = true;
-
- auto maxOpt = [](const auto& x, const auto& y) { return !y ? *x : !x ? *y : *x < *y ? *y : *x; };
-
- auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(Info->GetVDiskId(orderNumber));
- auto& record = ev->Record;
- if (state.LastBlock || LastProcessedBlock) {
- record.SetSkipBlocksUpTo(maxOpt(state.LastBlock, LastProcessedBlock));
- }
- if (state.LastBarrier || LastProcessedBarrier) {
- auto *x = record.MutableSkipBarriersUpTo();
- const auto& [tabletId, channel] = maxOpt(state.LastBarrier, LastProcessedBarrier);
- x->SetTabletId(tabletId);
- x->SetChannel(channel);
- }
- if (state.LastBlob || LastProcessedBlob) {
- LogoBlobIDFromLogoBlobID(maxOpt(state.LastBlob, LastProcessedBlob), record.MutableSkipBlobsUpTo());
- }
-
- const ui64 id = ++LastRequestId;
- Self.Send(actorId, ev.release(), IEventHandle::FlagTrackDelivery, id);
-
- const auto [it, inserted] = RequestsInFlight.emplace(id, TRequestInFlight{orderNumber});
- Y_VERIFY(inserted);
-
- const ui32 nodeId = actorId.NodeId();
- Nodes[nodeId].RequestsInFlight.insert(&*it);
- }
-
- void TFetchMachine::Handle(TEvInterconnect::TEvNodeConnected::TPtr ev) {
- const ui32 nodeId = ev->Get()->NodeId;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT40, "NodeConnected", (GroupId, Info->GroupID), (NodeId, nodeId));
- for (const ui32 orderNumber : Nodes[nodeId].OrderNumbers) {
- if (auto& state = PerDiskState[orderNumber]; !state.Finished) {
- IssueAssimilateCmdToVDisk(orderNumber);
- }
- }
- }
-
- void TFetchMachine::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev) {
- const ui32 nodeId = ev->Get()->NodeId;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT41, "NodeDisconnected", (GroupId, Info->GroupID), (NodeId, nodeId));
- for (const auto *kv : std::exchange(Nodes[nodeId].RequestsInFlight, {})) {
- TPerDiskState& state = PerDiskState[kv->second.OrderNumber];
- Y_VERIFY(state.RequestInFlight);
- state.RequestInFlight = false;
- const size_t num = RequestsInFlight.erase(kv->first);
- Y_VERIFY(num == 1);
- }
- Merge();
- TActivationContext::Send(new IEventHandle(TEvInterconnect::EvConnectNode, 0,
- TActivationContext::InterconnectProxy(nodeId), Self, nullptr, 0));
- }
-
- void TFetchMachine::Handle(TEvents::TEvUndelivered::TPtr ev) {
- if (ev->Get()->SourceType == TEvBlobStorage::EvVAssimilate) {
- // TODO: undelivery may be caused by moving VDisk actor out, handle it
- EndRequest(ev->Cookie);
- Merge();
- }
- }
-
- ui32 TFetchMachine::EndRequest(ui64 id) {
- const auto it = RequestsInFlight.find(id);
- Y_VERIFY(it != RequestsInFlight.end());
- const ui32 orderNumber = it->second.OrderNumber;
- TPerDiskState& state = PerDiskState[orderNumber];
- Y_VERIFY(state.RequestInFlight);
- state.RequestInFlight = false;
- const TActorId actorId = Info->GetActorId(orderNumber);
- const ui32 nodeId = actorId.NodeId();
- TNodeInfo& node = Nodes[nodeId];
- const size_t num = node.RequestsInFlight.erase(&*it);
- Y_VERIFY(num == 1);
- RequestsInFlight.erase(it);
- return orderNumber;
- }
-
- void TFetchMachine::Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) {
- const ui32 orderNumber = EndRequest(ev->Cookie);
- const auto& record = ev->Get()->Record;
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "EvVAssimilate", (GroupId, Info->GroupID), (Id, ev->Cookie),
- (OrderNumber, orderNumber), (Status, record.GetStatus()), (Blocks.size, record.BlocksSize()),
- (Barriers.size, record.BarriersSize()), (Blobs.size, record.BlobsSize()));
-
- TPerDiskState& state = PerDiskState[orderNumber];
-
- const bool wasExhausted = state.Exhausted();
-
- for (const auto& item : record.GetBlocks()) {
- const ui64 tabletId = item.GetTabletId();
- state.LastBlock.emplace(tabletId);
- if (!LastProcessedBlock || *LastProcessedBlock <= *state.LastBlock) {
- state.Blocks.emplace_back(item);
- }
- }
- for (const auto& item : record.GetBarriers()) {
- const ui64 tabletId = item.GetTabletId();
- const ui8 channel = item.GetChannel();
- state.LastBarrier.emplace(tabletId, channel);
- if (!LastProcessedBarrier || *LastProcessedBarrier <= *state.LastBarrier) {
- state.Barriers.emplace_back(item);
- }
- }
- ui64 raw[3] = {0, 0, 0};
- for (const auto& item : record.GetBlobs()) {
- if (item.HasRawX1()) {
- raw[0] = item.GetRawX1();
- } else if (item.HasDiffX1()) {
- raw[0] += item.GetDiffX1();
- }
- if (item.HasRawX2()) {
- raw[1] = item.GetRawX2();
- } else if (item.HasDiffX2()) {
- raw[1] += item.GetDiffX2();
- }
- if (item.HasRawX3()) {
- raw[2] = item.GetRawX3();
- } else if (item.HasDiffX3()) {
- raw[2] += item.GetDiffX3();
- }
- const TLogoBlobID id(raw);
- state.LastBlob.emplace(id);
- if (!LastProcessedBlob || *LastProcessedBlob <= *state.LastBlob) {
- state.Blobs.emplace_back(item, id);
- }
- }
-
- if (wasExhausted && !state.Exhausted()) {
- Heap.push_back(&state);
- std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare());
- }
-
- if (record.BlocksSize() + record.BarriersSize() + record.BlobsSize() == 0 && record.GetStatus() == NKikimrProto::OK) {
- state.Finished = true;
- } else if (state.Exhausted()) { // still no records; for example, when all were skipped
- return IssueAssimilateCmdToVDisk(orderNumber);
- }
-
- Merge();
- }
-
- void TFetchMachine::Merge() {
- if (AssimilateDataInFlight) {
- return;
- }
-
- auto ev = std::make_unique<TEvAssimilatedData>();
-
- const TBlobStorageGroupInfo::TTopology *top = &Info->GetTopology();
- TBlobStorageGroupInfo::TGroupVDisks mergeableDisks(top);
- for (ui32 i = 0; i < PerDiskState.size(); ++i) {
- TPerDiskState& state = PerDiskState[i];
- if (!state.Exhausted() || state.Finished) {
- mergeableDisks |= {top, top->GetVDiskId(i)};
- } else if (state.RequestInFlight) {
- return;
- }
- }
-
- static constexpr ui64 MaxBlock = Max<ui64>();
- static constexpr std::tuple<ui64, ui8> MaxBarrier = std::make_tuple(Max<ui64>(), Max<ui8>());
- static const TLogoBlobID MaxBlob = Max<TLogoBlobID>();
-
- bool quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(mergeableDisks);
- while (quorumCorrect) {
- if (Heap.empty()) {
- LastProcessedBlock.emplace(MaxBlock);
- LastProcessedBarrier.emplace(MaxBarrier);
- LastProcessedBlob.emplace(MaxBlob);
- break;
- }
-
- std::optional<TBlock> block;
- std::optional<TBarrier> barrier;
- std::optional<TBlob> blob;
- TSubgroupPartLayout layout;
-
- auto callback = [&](auto&& value, ui32 orderNumber) {
- using T = std::decay_t<decltype(value)>;
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT37, "AssimilatedItem", (GroupId, Info->GroupID), (OrderNumber, orderNumber),
- (Value, value));
- if constexpr (std::is_same_v<T, TBlock>) {
- Y_VERIFY(!LastProcessedBlock || *LastProcessedBlock < value.TabletId);
- if (block) {
- block->Merge(value);
- } else {
- block.emplace(std::move(value));
- }
- } else if constexpr (std::is_same_v<T, TBarrier>) {
- Y_VERIFY(!LastProcessedBarrier || *LastProcessedBarrier < std::make_tuple(value.TabletId, value.Channel));
- if (barrier) {
- barrier->Merge(value);
- } else {
- barrier.emplace(std::move(value));
- }
- } else if constexpr (std::is_same_v<T, TBlob>) {
- Y_VERIFY(!LastProcessedBlob || *LastProcessedBlob < value.Id);
- if (blob) {
- blob->Merge(value);
- } else {
- blob.emplace(std::move(value));
- }
-
- auto local = TIngress(value.Ingress).LocalParts(top->GType);
- for (ui8 partIdx = local.FirstPosition(); partIdx != local.GetSize(); partIdx = local.NextPosition(partIdx)) {
- layout.AddItem(top->GetIdxInSubgroup(top->GetVDiskId(orderNumber), blob->Id.Hash()), partIdx, top->GType);
- }
- } else {
- static_assert(TDependentFalse<T>, "incorrect case");
- }
- };
-
- TPerDiskState& head = *Heap.front();
- auto key = head.FirstKey();
- while (!Heap.empty() && Heap.front()->FirstKey() == key) {
- std::pop_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare());
- const ui32 orderNumber = Heap.back() - PerDiskState.data();
- TPerDiskState& item = PerDiskState[orderNumber];
- item.PopFirstItem(std::bind(callback, std::placeholders::_1, orderNumber));
- if (item.Exhausted()) {
- if (!item.Finished) {
- // data not yet received -- ask for it
- IssueAssimilateCmdToVDisk(orderNumber);
- // mark disk temporarily unavailable
- mergeableDisks -= {top, top->GetVDiskId(orderNumber)};
- quorumCorrect = Info->GetQuorumChecker().CheckQuorumForGroup(mergeableDisks);
- }
- // remove item from the heap -- it has no valid data to process
- Heap.pop_back();
- } else {
- // more items to do
- std::push_heap(Heap.begin(), Heap.end(), TPerDiskState::THeapCompare());
- }
- }
-
- if (block) {
- LastProcessedBlock.emplace(block->TabletId);
- ev->Blocks.push_back(std::move(*block));
- } else if (barrier) {
- LastProcessedBlock.emplace(MaxBlock);
- LastProcessedBarrier.emplace(barrier->TabletId, barrier->Channel);
- ev->Barriers.push_back(std::move(*barrier));
- } else if (blob) {
- blob->Keep = TIngress(blob->Ingress).KeepUnconditionally(TIngress::IngressMode(top->GType));
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT48, "assimilated blob", (GroupId, Info->GroupID), (Id, blob->Id),
- (Layout, layout.ToString(top->GType)), (Keep, blob->Keep));
- LastProcessedBlock.emplace(MaxBlock);
- LastProcessedBarrier.emplace(MaxBarrier);
- LastProcessedBlob.emplace(blob->Id);
- ev->Blobs.push_back(std::move(*blob));
- } else {
- Y_FAIL();
- }
-
- if (ev->Blocks.size() + ev->Barriers.size() + ev->Blobs.size() == 10'000) {
- break;
- }
- }
-
- ev->BlocksFinished = LastProcessedBlock == MaxBlock;
- ev->BarriersFinished = LastProcessedBarrier == MaxBarrier;
- ev->BlobsFinished = LastProcessedBlob == MaxBlob;
- Y_VERIFY(ev->BlocksFinished >= ev->BarriersFinished && ev->BarriersFinished >= ev->BlobsFinished);
-
- // store the assimilator state
- TStringOutput stream(ev->AssimilatorState);
-
- Save(&stream, ui8((LastProcessedBlock ? TStateMask::Block : 0)
- | (LastProcessedBarrier ? TStateMask::Barrier : 0)
- | (LastProcessedBlob ? TStateMask::Blob : 0)));
- if (LastProcessedBlock) {
- Save(&stream, *LastProcessedBlock);
- }
- if (LastProcessedBarrier) {
- Save(&stream, *LastProcessedBarrier);
- }
- if (LastProcessedBlob) {
- Save(&stream, *LastProcessedBlob);
- }
-
- Self.Send(BlobDepotId, ev.release());
-
- AssimilateDataInFlight = true;
- }
-
- void TFetchMachine::HandleAssimilateDataConfirm() {
- Y_VERIFY(AssimilateDataInFlight);
- AssimilateDataInFlight = false;
- Merge();
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/assimilator_fetch_machine.h b/ydb/core/blob_depot/assimilator_fetch_machine.h
deleted file mode 100644
index 225c751d9e..0000000000
--- a/ydb/core/blob_depot/assimilator_fetch_machine.h
+++ /dev/null
@@ -1,135 +0,0 @@
-#pragma once
-
-#include "defs.h"
-#include "blob_depot_tablet.h"
-
-namespace NKikimr::NBlobDepot {
-
- class TBlobDepot::TGroupAssimilatorFetchMachine {
- TActorIdentity Self;
- TIntrusivePtr<TBlobStorageGroupInfo> Info;
- TActorId BlobDepotId;
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Data processing
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- struct TPerDiskState {
- std::deque<TBlock> Blocks;
- std::optional<ui64> LastBlock;
- std::deque<TBarrier> Barriers;
- std::optional<std::tuple<ui64, ui8>> LastBarrier;
- std::deque<TBlob> Blobs;
- std::optional<TLogoBlobID> LastBlob;
- bool Finished = false;
- bool RequestInFlight = false;
-
- ui64 FirstBlock() const {
- Y_VERIFY_DEBUG(!Blocks.empty());
- return Blocks.front().TabletId;
- }
-
- std::tuple<ui64, ui8> FirstBarrier() const {
- Y_VERIFY_DEBUG(!Barriers.empty());
- const auto& barrier = Barriers.front();
- return {barrier.TabletId, barrier.Channel};
- }
-
- TLogoBlobID FirstBlob() const {
- Y_VERIFY_DEBUG(!Blobs.empty());
- return Blobs.front().Id;
- }
-
- std::variant<ui64, std::tuple<ui64, ui8>, TLogoBlobID> FirstKey() const {
- if (!Blocks.empty()) {
- return FirstBlock();
- } else if (!Barriers.empty()) {
- return FirstBarrier();
- } else if (!Blobs.empty()) {
- return FirstBlob();
- } else {
- Y_FAIL();
- }
- }
-
- template<typename T>
- void PopFirstItem(T&& callback) {
- if (!Blocks.empty()) {
- callback(Blocks.front());
- Blocks.pop_front();
- } else if (!Barriers.empty()) {
- callback(Barriers.front());
- Barriers.pop_front();
- } else if (!Blobs.empty()) {
- callback(Blobs.front());
- Blobs.pop_front();
- } else {
- Y_FAIL();
- }
- }
-
- struct THeapCompare {
- bool operator ()(const TPerDiskState *x, const TPerDiskState *y) const {
- if (!x->Blocks.empty() && !y->Blocks.empty()) {
- return x->FirstBlock() > y->FirstBlock();
- } else if (x->Blocks.empty() != y->Blocks.empty()) {
- return x->Blocks.empty() > y->Blocks.empty();
- } else if (!x->Barriers.empty() && !y->Barriers.empty()) {
- return x->FirstBarrier() > y->FirstBarrier();
- } else if (x->Barriers.empty() != y->Barriers.empty()) {
- return x->Barriers.empty() > y->Barriers.empty();
- } else if (!x->Blobs.empty() && !y->Blobs.empty()) {
- return x->FirstBlob() > y->FirstBlob();
- } else {
- return x->Barriers.empty() > y->Barriers.empty();
- }
- }
- };
-
- bool Exhausted() const {
- return Blocks.empty() && Barriers.empty() && Blobs.empty();
- }
- };
-
- std::vector<TPerDiskState> PerDiskState;
- std::vector<TPerDiskState*> Heap;
- std::optional<ui64> LastProcessedBlock;
- std::optional<std::tuple<ui64, ui8>> LastProcessedBarrier;
- std::optional<TLogoBlobID> LastProcessedBlob;
-
- struct TRequestInFlight {
- ui32 OrderNumber;
- };
-
- using TRequestsInFlight = THashMap<ui64, TRequestInFlight>;
-
- struct TNodeInfo {
- std::vector<ui32> OrderNumbers;
- THashSet<TRequestsInFlight::value_type*> RequestsInFlight;
- };
-
- ui32 LastRequestId = 0;
- TRequestsInFlight RequestsInFlight;
- THashMap<ui32, TNodeInfo> Nodes;
-
- bool AssimilateDataInFlight = false;
-
- public:
- TGroupAssimilatorFetchMachine(TActorIdentity self, TIntrusivePtr<TBlobStorageGroupInfo> info,
- TActorId blobDepotId, const std::optional<TString>& assimilatorState);
- void Handle(TAutoPtr<IEventHandle>& ev);
- void OnPassAway();
-
- private:
- void IssueAssimilateCmdToVDisk(ui32 orderNumber);
- void Handle(TEvInterconnect::TEvNodeConnected::TPtr ev);
- void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr ev);
- void Handle(TEvents::TEvUndelivered::TPtr ev);
- ui32 EndRequest(ui64 id);
- void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev);
- void HandleAssimilateDataConfirm();
- void Merge();
- void MergeDone();
- };
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index e48168aac8..f1ca62ccd2 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -46,9 +46,6 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvTabletPipe::TEvServerConnected, Handle);
hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
- fFunc(TEvents::TSystem::Gone, HandleGone);
- hFunc(TEvAssimilatedData, Handle);
-
default:
if (!HandleDefaultEvents(ev, ctx)) {
Y_FAIL("unexpected event Type# 0x%08" PRIx32, type);
@@ -61,7 +58,7 @@ namespace NKikimr::NBlobDepot {
}
void TBlobDepot::PassAway() {
- for (const TActorId& actorId : {RunningGroupAssimilator, CopierId}) {
+ for (const TActorId& actorId : {GroupAssimilatorId}) {
if (actorId) {
TActivationContext::Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, SelfId(), nullptr, 0));
}
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index e9e7be234e..c3529148b1 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -16,140 +16,9 @@ namespace NKikimr::NBlobDepot {
struct TEvPrivate {
enum {
EvCheckExpiredAgents = EventSpaceBegin(TEvents::ES_PRIVATE),
- EvAssimilatedData,
- EvAssimilatedDataConfirm,
};
};
- struct TBlock {
- ui64 TabletId;
- ui32 BlockedGeneration;
- ui64 IssuerGuid = 0;
-
- TBlock() = default;
-
- TBlock(const NKikimrBlobStorage::TEvVAssimilateResult::TBlock& item)
- : TabletId(item.GetTabletId())
- , BlockedGeneration(item.GetBlockedGeneration())
- {}
-
- template<typename T>
- static TBlock FromRow(T&& row) {
- TBlock block;
- block.TabletId = row.template GetValue<Schema::Blocks::TabletId>();
- block.BlockedGeneration = row.template GetValue<Schema::Blocks::BlockedGeneration>();
- block.IssuerGuid = row.template GetValue<Schema::Blocks::IssuerGuid>();
- return block;
- }
-
- void Merge(TBlock& other) {
- Y_VERIFY_DEBUG(other.TabletId == TabletId);
- BlockedGeneration = Max(BlockedGeneration, other.BlockedGeneration);
- }
-
- TString ToString() const {
- return TStringBuilder() << "{" << TabletId << ":" << BlockedGeneration << "}";
- }
- };
-
- struct TBarrier {
- struct TValue {
- TGenStep GenCtr;
- TGenStep Collect;
-
- TValue() = default;
-
- TValue(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier::TValue& value)
- : GenCtr(value.GetRecordGeneration(), value.GetPerGenerationCounter())
- , Collect(value.GetCollectGeneration(), value.GetCollectStep())
- {}
-
- void Merge(TValue& other) {
- if (GenCtr < other.GenCtr) {
- *this = other;
- }
- }
-
- TString ToString() const {
- return TStringBuilder() << "{" << GenCtr.ToString() << "=>" << Collect.ToString() << "}";
- }
- };
-
- ui64 TabletId;
- ui8 Channel;
- TValue Hard;
- TValue Soft;
-
- TBarrier() = default;
-
- TBarrier(const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier& item)
- : TabletId(item.GetTabletId())
- , Channel(item.GetChannel())
- , Hard(item.HasHard() ? TValue(item.GetHard()) : TValue())
- , Soft(item.HasSoft() ? TValue(item.GetSoft()) : TValue())
- {}
-
- template<typename TRow>
- static TBarrier FromRow(TRow&& row) {
- using T = Schema::Barriers;
- TBarrier barrier;
- barrier.TabletId = row.template GetValue<T::TabletId>();
- barrier.Channel = row.template GetValue<T::Channel>();
- if (row.template HaveValue<T::HardGenCtr>() && row.template HaveValue<T::Hard>()) {
- barrier.Hard.GenCtr = TGenStep(row.template GetValue<T::HardGenCtr>());
- barrier.Hard.Collect = TGenStep(row.template GetValue<T::Hard>());
- }
- if (row.template HaveValue<T::SoftGenCtr>() && row.template HaveValue<T::Soft>()) {
- barrier.Soft.GenCtr = TGenStep(row.template GetValue<T::SoftGenCtr>());
- barrier.Soft.Collect = TGenStep(row.template GetValue<T::Soft>());
- }
- return barrier;
- }
-
- void Merge(TBarrier& other) {
- Y_VERIFY_DEBUG(TabletId == other.TabletId && Channel == other.Channel);
- Hard.Merge(other.Hard);
- Soft.Merge(other.Soft);
- }
-
- TString ToString() const {
- return TStringBuilder() << "{" << TabletId << ":" << int(Channel) << "@" << Hard.ToString() << "/"
- << Soft.ToString() << "}";
- }
- };
-
- struct TBlob {
- TLogoBlobID Id;
- ui64 Ingress;
- bool Keep = false;
-
- TBlob() = default;
-
- TBlob(const NKikimrBlobStorage::TEvVAssimilateResult::TBlob& item, const TLogoBlobID& id)
- : Id(id)
- , Ingress(item.GetIngress())
- {}
-
- void Merge(TBlob& other) {
- Y_VERIFY_DEBUG(Id == other.Id);
- Ingress |= other.Ingress;
- }
-
- TString ToString() const {
- return TStringBuilder() << "{" << Id.ToString() << "/" << Ingress << "}";
- }
- };
-
- struct TEvAssimilatedData : TEventLocal<TEvAssimilatedData, TEvPrivate::EvAssimilatedData> {
- std::deque<TBlock> Blocks;
- bool BlocksFinished = false;
- std::deque<TBarrier> Barriers;
- bool BarriersFinished = false;
- std::deque<TBlob> Blobs;
- bool BlobsFinished = false;
- TString AssimilatorState;
- };
-
public:
TBlobDepot(TActorId tablet, TTabletStorageInfo *info);
~TBlobDepot();
@@ -337,20 +206,13 @@ namespace NKikimr::NBlobDepot {
struct TToken {};
std::shared_ptr<TToken> Token = std::make_shared<TToken>();
- TActorId RunningGroupAssimilator;
- TActorId CopierId;
+ TActorId GroupAssimilatorId;
EDecommitState DecommitState = EDecommitState::Default;
std::optional<TString> AssimilatorState;
class TGroupAssimilator;
- class TGroupAssimilatorFetchMachine;
-
- class TGroupAssimilatorCopierActor;
void StartGroupAssimilator();
- void HandleGone(TAutoPtr<IEventHandle> ev);
- void Handle(TEvAssimilatedData::TPtr ev);
- void ProcessAssimilatedData(TEvAssimilatedData& msg);
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/blocks.cpp b/ydb/core/blob_depot/blocks.cpp
index 5cd5dfb685..a2604fdd80 100644
--- a/ydb/core/blob_depot/blocks.cpp
+++ b/ydb/core/blob_depot/blocks.cpp
@@ -190,20 +190,21 @@ namespace NKikimr::NBlobDepot {
)
};
- void TBlobDepot::TBlocksManager::AddBlockOnLoad(const TBlobDepot::TBlock& block) {
- Blocks[block.TabletId] = {
- .BlockedGeneration = block.BlockedGeneration,
- .IssuerGuid = block.IssuerGuid,
+ void TBlobDepot::TBlocksManager::AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration, ui64 issuerGuid) {
+ Blocks[tabletId] = {
+ .BlockedGeneration = blockedGeneration,
+ .IssuerGuid = issuerGuid,
};
}
- void TBlobDepot::TBlocksManager::AddBlockOnDecommit(const TBlobDepot::TBlock& block, NTabletFlatExecutor::TTransactionContext& txc) {
- AddBlockOnLoad(block);
+ void TBlobDepot::TBlocksManager::AddBlockOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlock& block,
+ NTabletFlatExecutor::TTransactionContext& txc) {
+ AddBlockOnLoad(block.TabletId, block.BlockedGeneration, 0);
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::Blocks>().Key(block.TabletId).Update(
NIceDb::TUpdate<Schema::Blocks::BlockedGeneration>(block.BlockedGeneration),
- NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(block.IssuerGuid)
+ NIceDb::TUpdate<Schema::Blocks::IssuerGuid>(0)
);
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT44, "adding block through decommission", (Id, Self->GetLogId()), (Block, block));
diff --git a/ydb/core/blob_depot/blocks.h b/ydb/core/blob_depot/blocks.h
index 5e0ed55324..658fafba12 100644
--- a/ydb/core/blob_depot/blocks.h
+++ b/ydb/core/blob_depot/blocks.h
@@ -39,8 +39,8 @@ namespace NKikimr::NBlobDepot {
: Self(self)
{}
- void AddBlockOnLoad(const TBlobDepot::TBlock& block);
- void AddBlockOnDecommit(const TBlobDepot::TBlock& block, NTabletFlatExecutor::TTransactionContext& txc);
+ void AddBlockOnLoad(ui64 tabletId, ui32 blockedGeneration, ui64 issuerGuid);
+ void AddBlockOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlock& block, NTabletFlatExecutor::TTransactionContext& txc);
void OnBlockCommitted(ui64 tabletId, ui32 blockedGeneration, ui32 nodeId, ui64 issuerGuid,
std::unique_ptr<IEventHandle> response);
void Handle(TEvBlobDepot::TEvBlock::TPtr ev);
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index fc69cd33e3..937daf5abe 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -25,7 +25,8 @@ namespace NKikimr::NBlobDepot {
PutKey(std::move(key), TValue(std::move(proto)));
}
- void TData::AddDataOnDecommit(const TBlobDepot::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc) {
+ void TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
+ NTabletFlatExecutor::TTransactionContext& txc) {
bool underSoft, underHard;
Self->BarrierServer->GetBlobBarrierRelation(blob.Id, &underSoft, &underHard);
if (underHard || (underSoft && !blob.Keep)) {
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 797e6a5ac8..e6f83d2056 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -335,7 +335,7 @@ namespace NKikimr::NBlobDepot {
TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id);
void AddDataOnLoad(TKey key, TString value);
- void AddDataOnDecommit(const TBlobDepot::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc);
+ void AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc);
void AddTrashOnLoad(TLogoBlobID id);
void AddGenStepOnLoad(ui8 channel, ui32 groupId, TGenStep issuedGenStep, TGenStep confirmedGenStep);
diff --git a/ydb/core/blob_depot/events.h b/ydb/core/blob_depot/events.h
index 773520084f..e961ca960e 100644
--- a/ydb/core/blob_depot/events.h
+++ b/ydb/core/blob_depot/events.h
@@ -90,7 +90,7 @@ namespace NKikimr {
if (ev.InterconnectSession) {
handle->Rewrite(TEvInterconnect::EvForward, ev.InterconnectSession);
}
- return std::make_pair(std::move(handle), record);
+ return std::make_tuple(std::move(handle), record);
}
};
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 8c378e712e..34f057652c 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -21,7 +21,7 @@ namespace NKikimr::NBlobDepot {
public:
TTxCollectGarbage(TBlobDepot *self, ui64 tabletId, ui8 channel, ui32 keepIndex = 0, ui32 doNotKeepIndex = 0)
: TTransactionBase(self)
- , Barrier(Self->BarrierServer->Barriers[std::make_pair(tabletId, channel)])
+ , Barrier(Self->BarrierServer->Barriers[std::make_tuple(tabletId, channel)])
, Request(Barrier.ProcessingQ.front())
, TabletId(tabletId)
, Channel(channel)
@@ -67,7 +67,7 @@ namespace NKikimr::NBlobDepot {
} else if (!record.HasGeneration() || !record.HasPerGenerationCounter()) {
Error = "Generation/PerGenerationCounter are not set";
} else {
- const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel());
auto& barriers = Self->BarrierServer->Barriers;
if (const auto it = barriers.find(key); it != barriers.end()) {
// extract existing barrier record
@@ -146,7 +146,7 @@ namespace NKikimr::NBlobDepot {
return false;
}
- const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel());
auto& barrier = Self->BarrierServer->Barriers[key];
TGenStep& barrierGenCtr = record.GetHard() ? barrier.HardGenCtr : barrier.SoftGenCtr;
TGenStep& barrierGenStep = record.GetHard() ? barrier.Hard : barrier.Soft;
@@ -175,45 +175,50 @@ namespace NKikimr::NBlobDepot {
}
};
- void TBlobDepot::TBarrierServer::AddBarrierOnLoad(const TBlobDepot::TBarrier& barrier) {
- Barriers[std::make_pair(barrier.TabletId, barrier.Channel)] = {
- .SoftGenCtr = barrier.Soft.GenCtr,
- .Soft = barrier.Soft.Collect,
- .HardGenCtr = barrier.Hard.GenCtr,
- .Hard = barrier.Hard.Collect,
+ void TBlobDepot::TBarrierServer::AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft,
+ TGenStep hardGenCtr, TGenStep hard) {
+ Barriers[std::make_tuple(tabletId, channel)] = {
+ .SoftGenCtr = softGenCtr,
+ .Soft = soft,
+ .HardGenCtr = hardGenCtr,
+ .Hard = hard,
};
}
- void TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TBlobDepot::TBarrier& barrier,
+ void TBlobDepot::TBarrierServer::AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier,
NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
- const auto key = std::make_pair(barrier.TabletId, barrier.Channel);
+ const auto key = std::make_tuple(barrier.TabletId, barrier.Channel);
auto& current = Barriers[key];
#define DO(TYPE) \
- if (current.TYPE##GenCtr < barrier.TYPE.GenCtr) { \
- if (current.TYPE <= barrier.TYPE.Collect) { \
- current.TYPE##GenCtr = barrier.TYPE.GenCtr; \
- current.TYPE = barrier.TYPE.Collect; \
- db.Table<Schema::Barriers>().Key(barrier.TabletId, barrier.Channel).Update( \
- NIceDb::TUpdate<Schema::Barriers::TYPE##GenCtr>(ui64(barrier.TYPE.GenCtr)), \
- NIceDb::TUpdate<Schema::Barriers::TYPE>(ui64(barrier.TYPE.Collect)) \
- ); \
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replacing " #TYPE " barrier through decommission", \
- (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \
- (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \
- (Barrier, barrier)); \
- } else { \
- STLOG(PRI_ERROR, BLOB_DEPOT, BDT36, "decreasing " #TYPE " barrier through decommission", \
+ { \
+ const TGenStep barrierGenCtr(barrier.TYPE.RecordGeneration, barrier.TYPE.PerGenerationCounter); \
+ const TGenStep barrierCollect(barrier.TYPE.CollectGeneration, barrier.TYPE.CollectStep); \
+ if (current.TYPE##GenCtr < barrierGenCtr) { \
+ if (current.TYPE <= barrierCollect) { \
+ current.TYPE##GenCtr = barrierGenCtr; \
+ current.TYPE = barrierCollect; \
+ db.Table<Schema::Barriers>().Key(barrier.TabletId, barrier.Channel).Update( \
+ NIceDb::TUpdate<Schema::Barriers::TYPE##GenCtr>(ui64(barrierGenCtr)), \
+ NIceDb::TUpdate<Schema::Barriers::TYPE>(ui64(barrierCollect)) \
+ ); \
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT45, "replacing " #TYPE " barrier through decommission", \
+ (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \
+ (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \
+ (Barrier, barrier)); \
+ } else { \
+ STLOG(PRI_ERROR, BLOB_DEPOT, BDT36, "decreasing " #TYPE " barrier through decommission", \
+ (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \
+ (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \
+ (Barrier, barrier)); \
+ } \
+ } else if (current.TYPE##GenCtr == barrierGenCtr && current.TYPE != barrierCollect) { \
+ STLOG(PRI_ERROR, BLOB_DEPOT, BDT43, "barrier value mismatch through decommission", \
(TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \
(GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \
(Barrier, barrier)); \
} \
- } else if (current.TYPE##GenCtr == barrier.TYPE.GenCtr && current.TYPE != barrier.TYPE.Collect) { \
- STLOG(PRI_ERROR, BLOB_DEPOT, BDT43, "barrier value mismatch through decommission", \
- (TabletId, barrier.TabletId), (Channel, int(barrier.Channel)), \
- (GenCtr, current.TYPE##GenCtr), (Collect, current.TYPE), \
- (Barrier, barrier)); \
}
DO(Hard)
@@ -222,7 +227,7 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::TBarrierServer::Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev) {
const auto& record = ev->Get()->Record;
- const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
+ const auto key = std::make_tuple(record.GetTabletId(), record.GetChannel());
auto& barrier = Barriers[key];
barrier.ProcessingQ.emplace_back(ev.Release());
if (Self->Data->IsLoaded() && barrier.ProcessingQ.size() == 1) {
@@ -231,12 +236,12 @@ namespace NKikimr::NBlobDepot {
}
bool TBlobDepot::TBarrierServer::CheckBlobForBarrier(TLogoBlobID id) const {
- const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel()));
+ const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel()));
return it == Barriers.end() || TGenStep(id) > Max(it->second.Soft, it->second.Hard);
}
void TBlobDepot::TBarrierServer::GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const {
- const auto it = Barriers.find(std::make_pair(id.TabletID(), id.Channel()));
+ const auto it = Barriers.find(std::make_tuple(id.TabletID(), id.Channel()));
const TGenStep genStep(id);
*underSoft = it == Barriers.end() ? false : genStep <= it->second.Soft;
*underHard = it == Barriers.end() ? false : genStep <= it->second.Hard;
@@ -245,7 +250,8 @@ namespace NKikimr::NBlobDepot {
void TBlobDepot::TBarrierServer::OnDataLoaded() {
for (auto& [key, barrier] : Barriers) {
if (!barrier.ProcessingQ.empty()) {
- Self->Execute(std::make_unique<TTxCollectGarbage>(Self, key.first, key.second));
+ const auto& [tabletId, channel] = key;
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, tabletId, channel));
}
}
}
diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h
index ebaa1fb589..d12c4abb57 100644
--- a/ydb/core/blob_depot/garbage_collection.h
+++ b/ydb/core/blob_depot/garbage_collection.h
@@ -16,7 +16,7 @@ namespace NKikimr::NBlobDepot {
std::deque<std::unique_ptr<TEvBlobDepot::TEvCollectGarbage::THandle>> ProcessingQ;
};
- THashMap<std::pair<ui64, ui8>, TBarrier> Barriers;
+ THashMap<std::tuple<ui64, ui8>, TBarrier> Barriers;
private:
class TTxCollectGarbage;
@@ -26,8 +26,8 @@ namespace NKikimr::NBlobDepot {
: Self(self)
{}
- void AddBarrierOnLoad(const TBlobDepot::TBarrier& barrier);
- void AddBarrierOnDecommit(const TBlobDepot::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc);
+ void AddBarrierOnLoad(ui64 tabletId, ui8 channel, TGenStep softGenCtr, TGenStep soft, TGenStep hardGenCtr, TGenStep hard);
+ void AddBarrierOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBarrier& barrier, NTabletFlatExecutor::TTransactionContext& txc);
void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
bool CheckBlobForBarrier(TLogoBlobID id) const;
void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const;
@@ -36,7 +36,8 @@ namespace NKikimr::NBlobDepot {
template<typename TCallback>
void Enumerate(TCallback&& callback) {
for (const auto& [key, value] : Barriers) {
- callback(key.first, key.second, value.SoftGenCtr, value.Soft, value.HardGenCtr, value.Hard);
+ const auto& [tabletId, channel] = key;
+ callback(tabletId, channel, value.SoftGenCtr, value.Soft, value.HardGenCtr, value.Hard);
}
}
};
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index 10cd84db33..60701db398 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -46,7 +46,11 @@ namespace NKikimr::NBlobDepot {
return false;
}
while (table.IsValid()) {
- Self->BlocksManager->AddBlockOnLoad(TBlock::FromRow(table));
+ Self->BlocksManager->AddBlockOnLoad(
+ table.GetValue<Schema::Blocks::TabletId>(),
+ table.GetValue<Schema::Blocks::BlockedGeneration>(),
+ table.GetValue<Schema::Blocks::IssuerGuid>()
+ );
if (!table.Next()) {
return false;
}
@@ -60,7 +64,14 @@ namespace NKikimr::NBlobDepot {
return false;
}
while (table.IsValid()) {
- Self->BarrierServer->AddBarrierOnLoad(TBarrier::FromRow(table));
+ Self->BarrierServer->AddBarrierOnLoad(
+ table.GetValue<Schema::Barriers::TabletId>(),
+ table.GetValue<Schema::Barriers::Channel>(),
+ TGenStep(table.GetValue<Schema::Barriers::SoftGenCtr>()),
+ TGenStep(table.GetValue<Schema::Barriers::Soft>()),
+ TGenStep(table.GetValue<Schema::Barriers::HardGenCtr>()),
+ TGenStep(table.GetValue<Schema::Barriers::Hard>())
+ );
if (!table.Next()) {
return false;
}
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index fae3b54999..3eb87d2b05 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -8,7 +8,7 @@ namespace NKikimr::NBlobDepot {
struct TChannelKind {
std::array<ui8, 256> ChannelToIndex;
- std::vector<std::pair<ui8, ui32>> ChannelGroups;
+ std::vector<std::tuple<ui8, ui32>> ChannelGroups;
};
#pragma pack(push, 1)
diff --git a/ydb/core/blobstorage/backpressure/queue.cpp b/ydb/core/blobstorage/backpressure/queue.cpp
index e768d2d841..45f7d9166d 100644
--- a/ydb/core/blobstorage/backpressure/queue.cpp
+++ b/ydb/core/blobstorage/backpressure/queue.cpp
@@ -155,6 +155,7 @@ void TBlobStorageQueue::SendToVDisk(const TActorContext& ctx, const TActorId& re
TYPE_CASE(TEvBlobStorage::TEvVCollectGarbage)
TYPE_CASE(TEvBlobStorage::TEvVGetBarrier)
TYPE_CASE(TEvBlobStorage::TEvVStatus)
+ TYPE_CASE(TEvBlobStorage::TEvVAssimilate)
#undef TYPE_CASE
default:
return Sprintf("0x%08" PRIx32, item.Event.GetType());
diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
index 43625bd5ff..1156f3180e 100644
--- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
+++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp
@@ -234,6 +234,7 @@ private:
case TEvBlobStorage::EvVGetResult:
case TEvBlobStorage::EvVStatusResult:
+ case TEvBlobStorage::EvVAssimilateResult:
expected = InterconnectChannel;
break;
@@ -458,6 +459,7 @@ private:
ctx.Send(BlobStorageProxy, new TEvProxyQueueState(VDiskId, QueueId, false, false));
Queue.DrainQueue(status, TStringBuilder() << "BS_QUEUE: " << errorReason, ctx);
DrainStatus(status, ctx);
+ DrainAssimilate(status, errorReason, ctx);
break;
}
State = EState::INITIAL;
@@ -668,6 +670,48 @@ private:
ctx.Send(sender, response.release(), 0, cookie);
}
+ THashMap<ui64, std::pair<TActorId, ui64>> AssimilateRequests;
+ ui64 NextAssimilateRequestCookie = 1;
+
+ void Handle(TEvBlobStorage::TEvVAssimilate::TPtr& ev, const TActorContext& ctx) {
+ if (IsReady()) {
+ const ui64 id = NextAssimilateRequestCookie++;
+ ctx.Send(RemoteVDisk, ev->Release().Release(), IEventHandle::MakeFlags(InterconnectChannel,
+ IEventHandle::FlagTrackDelivery), id, std::move(ev->TraceId));
+ AssimilateRequests.emplace(id, std::make_pair(ev->Sender, ev->Cookie));
+ } else {
+ ctx.Send(ev->Sender, new TEvBlobStorage::TEvVAssimilateResult(NKikimrProto::NOTREADY, "no connection",
+ VDiskId), 0, ev->Cookie);
+ }
+ }
+
+ void DrainAssimilate(NKikimrProto::EReplyStatus status, TString errorReason, const TActorContext& ctx) {
+ for (const auto& [id, ep] : std::exchange(AssimilateRequests, {})) {
+ const auto& [sender, cookie] = ep;
+ ctx.Send(sender, new TEvBlobStorage::TEvVAssimilateResult(status, errorReason, VDiskId), 0, cookie);
+ }
+ }
+
+ void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr& ev, const TActorContext& ctx) {
+ if (!CheckReply(ev, ctx)) {
+ return;
+ }
+ if (const auto it = AssimilateRequests.find(ev->Cookie); it != AssimilateRequests.end()) {
+ const auto& [sender, cookie] = it->second;
+ ctx.Send(sender, ev->Release().Release(), 0, cookie, std::move(ev->TraceId));
+ AssimilateRequests.erase(it);
+ } else {
+ const TString& message = TStringBuilder() << "unexpected TEvVAssimilateResult received"
+ << " Cookie# " << ev->Cookie
+ << " Sender# " << ev->Sender
+ << " Msg# " << ev->Get()->ToString()
+ << " VDiskId# " << VDiskId;
+ Y_VERIFY_DEBUG(false, "%s", message.data());
+ QLOG_CRIT_S("BSQ39", message);
+ }
+ ResetWatchdogTimer(ctx.Now());
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// REQUEST TIME TRACKING AND REPORTING SYSTEM
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////0
@@ -825,6 +869,8 @@ private:
XX(TEvBlobStorage::EvVReadyNotify, EvVReadyNotify) \
XX(TEvBlobStorage::EvVStatus, EvVStatus) \
XX(TEvBlobStorage::EvVStatusResult, EvVStatusResult) \
+ XX(TEvBlobStorage::EvVAssimilate, EvVAssimilate) \
+ XX(TEvBlobStorage::EvVAssimilateResult, EvVAssimilateResult) \
XX(TEvBlobStorage::EvRequestProxyQueueState, EvRequestProxyQueueState) \
XX(TEvBlobStorage::EvVWindowChange, EvVWindowChange) \
XX(TEvInterconnect::EvNodeConnected, EvNodeConnected) \
@@ -896,6 +942,8 @@ private:
HFunc(TEvBlobStorage::TEvVStatus, Handle)
HFunc(TEvBlobStorage::TEvVStatusResult, Handle)
+ HFunc(TEvBlobStorage::TEvVAssimilate, Handle)
+ HFunc(TEvBlobStorage::TEvVAssimilateResult, Handle)
HFunc(TEvRequestProxyQueueState, Handle)
diff --git a/ydb/core/blobstorage/dsproxy/CMakeLists.txt b/ydb/core/blobstorage/dsproxy/CMakeLists.txt
index ec25b93606..277b5f44e6 100644
--- a/ydb/core/blobstorage/dsproxy/CMakeLists.txt
+++ b/ydb/core/blobstorage/dsproxy/CMakeLists.txt
@@ -28,6 +28,7 @@ target_sources(core-blobstorage-dsproxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_stat.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
diff --git a/ydb/core/blobstorage/dsproxy/defs.h b/ydb/core/blobstorage/dsproxy/defs.h
index f2c837b4d6..0daa207e3b 100644
--- a/ydb/core/blobstorage/dsproxy/defs.h
+++ b/ydb/core/blobstorage/dsproxy/defs.h
@@ -20,5 +20,6 @@
#include <util/generic/deque.h>
#include <util/generic/hash_set.h>
#include <util/generic/map.h>
+#include <util/generic/overloaded.h>
#include <util/string/escape.h>
#include <library/cpp/monlib/service/pages/templates.h>
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h
index 5f1fd27e53..80095da114 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy.h
@@ -290,6 +290,7 @@ public:
CHECK(TEvVCollectGarbageResult);
CHECK(TEvVGetBarrierResult);
CHECK(TEvVStatusResult);
+ CHECK(TEvVAssimilateResult);
#undef CHECK
case TEvBlobStorage::EvProxySessionsState: {
@@ -331,7 +332,7 @@ public:
template<typename T>
void SendToQueue(std::unique_ptr<T> event, ui64 cookie, bool timeStatsEnabled = false) {
- if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus>) {
+ if constexpr (!std::is_same_v<T, TEvBlobStorage::TEvVStatus> && !std::is_same_v<T, TEvBlobStorage::TEvVAssimilate>) {
event->MessageRelevanceTracker = MessageRelevanceTracker;
}
const TActorId queueId = GroupQueues->Send(*this, Info->GetTopology(), std::move(event), cookie, Span.GetTraceId(),
@@ -445,6 +446,7 @@ public:
XX(CollectGarbage)
XX(Status)
XX(Patch)
+ XX(Assimilate)
default:
Y_FAIL();
#undef XX
@@ -631,6 +633,11 @@ IActor* CreateBlobStorageGroupStatusRequest(const TIntrusivePtr<TBlobStorageGrou
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvStatus *ev,
ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
+IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
+ const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
+ const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev,
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters);
+
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon);
IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
new file mode 100644
index 0000000000..d8fe73077d
--- /dev/null
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_assimilate.cpp
@@ -0,0 +1,428 @@
+#include "dsproxy.h"
+
+namespace NKikimr {
+
+class TBlobStorageGroupAssimilateRequest : public TBlobStorageGroupRequestActor<TBlobStorageGroupAssimilateRequest> {
+ std::optional<ui64> SkipBlocksUpTo;
+ std::optional<std::tuple<ui64, ui8>> SkipBarriersUpTo;
+ std::optional<TLogoBlobID> SkipBlobsUpTo;
+
+ struct TBlock : TEvBlobStorage::TEvAssimilateResult::TBlock {
+ TBlock(const TBlock&) = default;
+
+ TBlock(ui64 key, const NKikimrBlobStorage::TEvVAssimilateResult::TBlock& msg) {
+ Y_VERIFY_DEBUG(msg.HasBlockedGeneration());
+ TabletId = key;
+ BlockedGeneration = msg.GetBlockedGeneration();
+ }
+
+ bool Merge(const TBlock& other) {
+ if (TabletId == other.TabletId) {
+ BlockedGeneration = Max(BlockedGeneration, other.BlockedGeneration);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool GoesBeforeThan(const TBlock& other) const {
+ return TabletId < other.TabletId;
+ }
+ };
+
+ struct TBarrier : TEvBlobStorage::TEvAssimilateResult::TBarrier {
+ TBarrier(const TBarrier&) = default;
+
+ TBarrier(std::tuple<ui64, ui8> key, const NKikimrBlobStorage::TEvVAssimilateResult::TBarrier& msg) {
+ std::tie(TabletId, Channel) = key;
+
+ auto parseValue = [](auto& value, const auto& pb) {
+ Y_VERIFY_DEBUG(pb.HasRecordGeneration() && pb.HasPerGenerationCounter() && pb.HasCollectGeneration() &&
+ pb.HasCollectStep());
+ value = {
+ .RecordGeneration = pb.GetRecordGeneration(),
+ .PerGenerationCounter = pb.GetPerGenerationCounter(),
+ .CollectGeneration = pb.GetCollectGeneration(),
+ .CollectStep = pb.GetCollectStep(),
+ };
+ };
+ if (msg.HasSoft()) {
+ parseValue(Soft, msg.GetSoft());
+ }
+ if (msg.HasHard()) {
+ parseValue(Hard, msg.GetHard());
+ }
+ }
+
+ bool Merge(const TBarrier& other) {
+ if (TabletId == other.TabletId && Channel == other.Channel) {
+ Merge(Soft, other.Soft);
+ Merge(Hard, other.Hard);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ static void Merge(TValue& to, const TValue& from) {
+ if (std::tie(to.RecordGeneration, to.PerGenerationCounter) < std::tie(from.RecordGeneration, from.PerGenerationCounter)) {
+ to = from;
+ }
+ }
+
+ bool GoesBeforeThan(const TBarrier& other) const {
+ return std::tie(TabletId, Channel) < std::tie(other.TabletId, other.Channel);
+ }
+ };
+
+ struct TBlob : TEvBlobStorage::TEvAssimilateResult::TBlob {
+ ui64 Ingress;
+
+ TBlob(const TBlob&) = default;
+
+ TBlob(TLogoBlobID key, const NKikimrBlobStorage::TEvVAssimilateResult::TBlob& msg) {
+ Y_VERIFY_DEBUG(msg.HasIngress());
+ Id = key;
+ Ingress = msg.GetIngress();
+ Keep = DoNotKeep = false;
+ }
+
+ bool Merge(const TBlob& other) {
+ if (Id == other.Id) {
+ Ingress |= other.Ingress;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ bool GoesBeforeThan(const TBlob& other) const {
+ return Id < other.Id;
+ }
+ };
+
+ using TItemVariant = std::variant<TBlock, TBarrier, TBlob>;
+
+ struct TPerVDiskInfo {
+ std::optional<ui64> LastProcessedBlock;
+ std::optional<std::tuple<ui64, ui8>> LastProcessedBarrier;
+ std::optional<TLogoBlobID> LastProcessedBlob;
+
+ std::deque<TBlock> Blocks;
+ std::deque<TBarrier> Barriers;
+ std::deque<TBlob> Blobs;
+
+ struct TFinished {
+ friend bool operator <(const TFinished&, const TFinished&) { return false; }
+ };
+
+ std::variant<TBlock*, TBarrier*, TBlob*, TFinished> Next;
+
+ void PushDataFromMessage(const NKikimrBlobStorage::TEvVAssimilateResult& msg,
+ const TBlobStorageGroupAssimilateRequest& self) {
+ Y_VERIFY(Blocks.empty() && Barriers.empty() && Blobs.empty());
+
+ std::array<ui64, 3> context = {0, 0, 0};
+
+ auto getKey = [&](const auto& item) {
+ using T = std::decay_t<decltype(item)>;
+ if constexpr (std::is_same_v<T, NKikimrBlobStorage::TEvVAssimilateResult::TBlock>) {
+ Y_VERIFY_DEBUG(item.HasTabletId());
+ return ui64(item.GetTabletId());
+ } else if constexpr (std::is_same_v<T, NKikimrBlobStorage::TEvVAssimilateResult::TBarrier>) {
+ Y_VERIFY_DEBUG(item.HasTabletId() && item.HasChannel());
+ return std::tuple<ui64, ui8>(item.GetTabletId(), item.GetChannel());
+ } else if constexpr (std::is_same_v<T, NKikimrBlobStorage::TEvVAssimilateResult::TBlob>) {
+ if (item.HasRawX1()) {
+ context[0] = item.GetRawX1();
+ } else if (item.HasDiffX1()) {
+ context[0] += item.GetDiffX1();
+ }
+ if (item.HasRawX2()) {
+ context[1] = item.GetRawX2();
+ } else if (item.HasDiffX2()) {
+ context[1] += item.GetDiffX2();
+ }
+ if (item.HasRawX3()) {
+ context[2] = item.GetRawX3();
+ } else if (item.HasDiffX3()) {
+ context[2] += item.GetDiffX3();
+ }
+ return TLogoBlobID(context.data());
+ } else {
+ static_assert(TDependentFalse<T>, "unsupported protobuf");
+ }
+ };
+
+ auto processItems = [&](const auto& items, auto& lastProcessed, auto& field, const auto& skipUpTo) {
+ for (const auto& item : items) {
+ const auto key = getKey(item);
+ Y_VERIFY(lastProcessed < key);
+ lastProcessed.emplace(key);
+ if (skipUpTo < key) {
+ field.emplace_back(key, item);
+ }
+ }
+ };
+
+ processItems(msg.GetBlocks(), LastProcessedBlock, Blocks, self.SkipBlocksUpTo);
+ processItems(msg.GetBarriers(), LastProcessedBarrier, Barriers, self.SkipBarriersUpTo);
+ processItems(msg.GetBlobs(), LastProcessedBlob, Blobs, self.SkipBlobsUpTo);
+
+ AdjustNext();
+ }
+
+ void Consume() {
+ if (!Blocks.empty()) {
+ Blocks.pop_front();
+ } else if (!Barriers.empty()) {
+ Barriers.pop_front();
+ } else if (!Blobs.empty()) {
+ Blobs.pop_front();
+ } else {
+ Y_UNREACHABLE();
+ }
+ }
+
+ void AdjustNext() {
+ if (!Blocks.empty()) {
+ Next.emplace<0>(&Blocks.front());
+ } else if (!Barriers.empty()) {
+ Next.emplace<1>(&Barriers.front());
+ } else if (!Blobs.empty()) {
+ Next.emplace<2>(&Blobs.front());
+ } else {
+ Next.emplace<3>();
+ }
+ }
+
+ TItemVariant BeginMerge() const {
+ return std::visit([](auto value) -> TItemVariant {
+ if constexpr (std::is_same_v<decltype(value), TFinished>) {
+ Y_FAIL();
+ } else {
+ return TItemVariant(std::in_place_type<std::decay_t<decltype(*value)>>, *value);
+ }
+ }, Next);
+ }
+
+ bool Merge(TItemVariant *to) const {
+ return std::visit([to](auto value) -> bool {
+ if constexpr (std::is_same_v<decltype(value), TFinished>) {
+ Y_FAIL();
+ } else if (auto *toItem = std::get_if<std::decay_t<decltype(*value)>>(to)) {
+ return toItem->Merge(*value);
+ } else {
+ return false;
+ }
+ }, Next);
+ }
+
+ bool GoesBeforeThan(const TPerVDiskInfo& other) const {
+ return Next.index() != other.Next.index()
+ ? Next.index() < other.Next.index()
+ : std::visit([&other](auto&& left) {
+ using T = std::decay_t<decltype(left)>;
+ const auto& right = std::get<T>(other.Next);
+ if constexpr (std::is_same_v<T, TFinished>) {
+ return false; // always equal
+ } else {
+ return left->GoesBeforeThan(*right);
+ }
+ }, Next);
+ }
+
+ bool Finished() const {
+ return Next.index() == 3;
+ }
+
+ bool HasItemsToMerge() const {
+ return !Blocks.empty() || !Barriers.empty() || !Blobs.empty();
+ }
+
+ struct TCompare {
+ bool operator ()(const TPerVDiskInfo *x, const TPerVDiskInfo *y) const {
+ return y->GoesBeforeThan(*x);
+ }
+ };
+ };
+
+ std::vector<TPerVDiskInfo> PerVDiskInfo;
+ std::vector<TPerVDiskInfo*> Heap;
+
+ std::unique_ptr<TEvBlobStorage::TEvAssimilateResult> Result;
+ ui32 RequestsInFlight = 0;
+
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType(){
+ return NKikimrServices::TActivity::BS_GROUP_ASSIMILATE;
+ }
+
+ static const auto& ActiveCounter(const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon) {
+ return mon->ActiveAssimilate;
+ }
+
+ TBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
+ const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
+ const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev, ui64 cookie,
+ NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters)
+ : TBlobStorageGroupRequestActor(info, state, mon, source, cookie, std::move(traceId),
+ NKikimrServices::BS_PROXY_ASSIMILATE, false, {}, now, storagePoolCounters, ev->RestartCounter,
+ "DSProxy.Assimilate")
+ , SkipBlocksUpTo(ev->SkipBlocksUpTo)
+ , SkipBarriersUpTo(ev->SkipBarriersUpTo)
+ , SkipBlobsUpTo(ev->SkipBlobsUpTo)
+ , PerVDiskInfo(info->GetTotalVDisksNum())
+ , Result(new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::OK, {}))
+ {
+ Heap.reserve(PerVDiskInfo.size());
+ }
+
+ void Bootstrap() {
+ Become(&TThis::StateWork);
+
+ for (ui32 i = 0; i < PerVDiskInfo.size(); ++i) {
+ Request(i);
+ }
+ }
+
+ STATEFN(StateWork) {
+ if (ProcessEvent(ev)) {
+ return;
+ }
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvBlobStorage::TEvVAssimilateResult, Handle);
+ }
+ }
+
+ void Request(ui32 orderNumber) {
+ TPerVDiskInfo& info = PerVDiskInfo[orderNumber];
+ Y_VERIFY(!info.Finished());
+
+ auto maxOpt = [](const auto& x, const auto& y) { return !x ? y : !y ? x : *x < *y ? y : x; };
+
+ SendToQueue(std::make_unique<TEvBlobStorage::TEvVAssimilate>(Info->GetVDiskId(orderNumber),
+ maxOpt(SkipBlocksUpTo, info.LastProcessedBlock),
+ maxOpt(SkipBarriersUpTo, info.LastProcessedBarrier),
+ maxOpt(SkipBlobsUpTo, info.LastProcessedBlob)), 0);
+
+ ++RequestsInFlight;
+ }
+
+ void Handle(TEvBlobStorage::TEvVAssimilateResult::TPtr ev) {
+ --RequestsInFlight;
+
+ const auto& record = ev->Get()->Record;
+ const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID());
+ const ui32 orderNumber = Info->GetOrderNumber(vdiskId);
+ Y_VERIFY(orderNumber < PerVDiskInfo.size());
+
+ if (record.GetStatus() == NKikimrProto::OK) {
+ auto& info = PerVDiskInfo[orderNumber];
+ info.PushDataFromMessage(record, *this);
+ if (info.HasItemsToMerge()) {
+ Heap.push_back(&info);
+ std::push_heap(Heap.begin(), Heap.end(), TPerVDiskInfo::TCompare());
+ } else if (!info.Finished()) {
+ Request(orderNumber);
+ }
+ }
+
+ if (!RequestsInFlight) {
+ Merge();
+ }
+ }
+
+ void Merge() {
+ std::vector<ui32> requests;
+
+ const TBlobStorageGroupInfo::TTopology *top = &Info->GetTopology();
+ TBlobStorageGroupInfo::TGroupVDisks disksWithData(top);
+ for (ui32 i = 0; i < PerVDiskInfo.size(); ++i) {
+ TPerVDiskInfo& info = PerVDiskInfo[i];
+ if (info.HasItemsToMerge() || info.Finished()) {
+ disksWithData += {top, top->GetVDiskId(i)};
+ }
+ }
+ if (!Info->GetQuorumChecker().CheckQuorumForGroup(disksWithData)) {
+ if (Result->Blocks.empty() && Result->Barriers.empty() && Result->Blobs.empty()) {
+ // we didn't get any data, so reply with ERROR to prevent confusing blob depot with 'all finished' situation
+ ReplyAndDie(NKikimrProto::ERROR);
+ } else {
+ // answer with what we have already collected
+ SendResponseAndDie(std::move(Result));
+ }
+ return;
+ }
+ while (requests.empty()) {
+ if (Heap.empty()) {
+ SendResponseAndDie(std::move(Result));
+ return;
+ }
+
+ TPerVDiskInfo *heapItem = Heap.front();
+ auto item = heapItem->BeginMerge();
+
+ for (;;) {
+ std::pop_heap(Heap.begin(), Heap.end(), TPerVDiskInfo::TCompare());
+ heapItem->Consume();
+ if (heapItem->HasItemsToMerge()) {
+ heapItem->AdjustNext();
+ std::push_heap(Heap.begin(), Heap.end(), TPerVDiskInfo::TCompare());
+ } else {
+ Heap.pop_back();
+ const ui32 orderNumber = heapItem - PerVDiskInfo.data();
+ requests.push_back(orderNumber);
+ }
+
+ heapItem = Heap.empty() ? nullptr : Heap.front();
+ if (!heapItem || !heapItem->Merge(&item)) {
+ break;
+ }
+ }
+
+ std::visit(TOverloaded{
+ [&](TBlock& block) {
+ SkipBlocksUpTo.emplace(block.TabletId);
+ Result->Blocks.push_back(block);
+ },
+ [&](TBarrier& barrier) {
+ SkipBarriersUpTo.emplace(barrier.TabletId, barrier.Channel);
+ Result->Barriers.push_back(barrier);
+ },
+ [&](TBlob& blob) {
+ SkipBlobsUpTo.emplace(blob.Id);
+ Result->Blobs.push_back(blob);
+ }
+ }, item);
+ }
+
+ if (Result->Blocks.size() + Result->Barriers.size() + Result->Blobs.size() >= 10'000) {
+ SendResponseAndDie(std::move(Result));
+ } else {
+ for (const ui32 orderNumber : requests) {
+ Request(orderNumber);
+ }
+ }
+ }
+
+ std::unique_ptr<IEventBase> RestartQuery(ui32 counter) {
+ ++*Mon->NodeMon->RestartAssimilate;
+ auto ev = std::make_unique<TEvBlobStorage::TEvAssimilate>(SkipBlocksUpTo, SkipBarriersUpTo, SkipBlobsUpTo);
+ ev->RestartCounter = counter;
+ return ev;
+ }
+
+ void ReplyAndDie(NKikimrProto::EReplyStatus status) {
+ SendResponseAndDie(std::make_unique<TEvBlobStorage::TEvAssimilateResult>(status, ErrorReason));
+ }
+};
+
+IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorageGroupInfo>& info,
+ const TIntrusivePtr<TGroupQueues>& state, const TActorId& source,
+ const TIntrusivePtr<TBlobStorageGroupProxyMon>& mon, TEvBlobStorage::TEvAssimilate *ev,
+ ui64 cookie, NWilson::TTraceId traceId, TInstant now, TIntrusivePtr<TStoragePoolCounters>& storagePoolCounters) {
+ return new TBlobStorageGroupAssimilateRequest(info, state, source, mon, ev, cookie, std::move(traceId), now, storagePoolCounters);
+}
+
+} // NKikimr
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
index e03405a42c..47d60411f3 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h
@@ -143,6 +143,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
Mon->EventStatus->Inc();
} else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvPatch>) {
Mon->EventPatch->Inc();
+ } else if constexpr (std::is_same_v<TEvent, TEvBlobStorage::TEvAssimilate>) {
+ Mon->EventAssimilate->Inc();
}
}
@@ -255,6 +257,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
void HandleNormal(TEvBlobStorage::TEvRange::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvCollectGarbage::TPtr &ev);
void HandleNormal(TEvBlobStorage::TEvStatus::TPtr &ev);
+ void HandleNormal(TEvBlobStorage::TEvAssimilate::TPtr &ev);
void Handle(TEvBlobStorage::TEvBunchOfEvents::TPtr ev);
void Handle(TEvDeathNote::TPtr ev);
@@ -362,6 +365,7 @@ public:
hFunc(TEvBlobStorage::TEvCollectGarbage, HANDLER); \
hFunc(TEvBlobStorage::TEvStatus, HANDLER); \
hFunc(TEvBlobStorage::TEvPatch, HANDLER); \
+ hFunc(TEvBlobStorage::TEvAssimilate, HANDLER); \
/**/
STFUNC(StateUnconfigured) {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
index 30440593be..6b617a6b0f 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.cpp
@@ -47,6 +47,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
EventStopPutBatching = EventGroup->GetCounter("EvStopPutBatching", true);
EventStopGetBatching = EventGroup->GetCounter("EvStopGetBatching", true);
EventPatch = EventGroup->GetCounter("EvPatch", true);
+ EventAssimilate = EventGroup->GetCounter("EvAssimilate", true);
PutsSentViaPutBatching = EventGroup->GetCounter("PutsSentViaPutBatching", true);
PutBatchesSent = EventGroup->GetCounter("PutBatchesSent", true);
@@ -72,6 +73,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
ActiveCollectGarbage = ActiveRequestsGroup->GetCounter("ActiveCollectGarbage");
ActiveStatus = ActiveRequestsGroup->GetCounter("ActiveStatus");
ActivePatch = ActiveRequestsGroup->GetCounter("ActivePatch");
+ ActiveAssimilate = ActiveRequestsGroup->GetCounter("ActiveAssimilate");
// special patch counters
VPatchContinueFailed = ActiveRequestsGroup->GetCounter("VPatchContinueFailed");
@@ -101,6 +103,7 @@ TBlobStorageGroupProxyMon::TBlobStorageGroupProxyMon(const TIntrusivePtr<::NMoni
RespStatCollectGarbage.emplace(respStatGroup->GetSubgroup("request", "collectGarbage"));
RespStatStatus.emplace(respStatGroup->GetSubgroup("request", "status"));
RespStatPatch.emplace(respStatGroup->GetSubgroup("request", "patch"));
+ RespStatAssimilate.emplace(respStatGroup->GetSubgroup("request", "assimilate"));
}
void TBlobStorageGroupProxyMon::BecomeFull() {
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h
index 59e3007572..5d1a9aa84c 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_mon.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_mon.h
@@ -212,6 +212,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr EventStopPutBatching;
::NMonitoring::TDynamicCounters::TCounterPtr EventStopGetBatching;
::NMonitoring::TDynamicCounters::TCounterPtr EventPatch;
+ ::NMonitoring::TDynamicCounters::TCounterPtr EventAssimilate;
::NMonitoring::TDynamicCounters::TCounterPtr PutsSentViaPutBatching;
::NMonitoring::TDynamicCounters::TCounterPtr PutBatchesSent;
@@ -231,6 +232,7 @@ public:
::NMonitoring::TDynamicCounters::TCounterPtr ActiveMultiCollect;
::NMonitoring::TDynamicCounters::TCounterPtr ActiveStatus;
::NMonitoring::TDynamicCounters::TCounterPtr ActivePatch;
+ ::NMonitoring::TDynamicCounters::TCounterPtr ActiveAssimilate;
std::optional<TResponseStatusGroup> RespStatPut;
std::optional<TResponseStatusGroup> RespStatGet;
@@ -240,6 +242,7 @@ public:
std::optional<TResponseStatusGroup> RespStatCollectGarbage;
std::optional<TResponseStatusGroup> RespStatStatus;
std::optional<TResponseStatusGroup> RespStatPatch;
+ std::optional<TResponseStatusGroup> RespStatAssimilate;
// special patch counters
::NMonitoring::TDynamicCounters::TCounterPtr VPatchContinueFailed;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
index 527a10ae86..c0930e7a0e 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.cpp
@@ -58,6 +58,7 @@ TDsProxyNodeMon::TDsProxyNodeMon(TIntrusivePtr<::NMonitoring::TDynamicCounters>
RestartCollectGarbage = group->GetCounter("EvCollectGarbage", true);
RestartIndexRestoreGet = group->GetCounter("EvIndexRestoreGet", true);
RestartStatus = group->GetCounter("EvStatus", true);
+ RestartAssimilate = group->GetCounter("EvAssimilate", true);
}
{
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
index c5fab00d3d..2681b86681 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_nodemon.h
@@ -75,6 +75,7 @@ struct TDsProxyNodeMon : public TThrRefBase {
::NMonitoring::TDynamicCounters::TCounterPtr RestartIndexRestoreGet;
::NMonitoring::TDynamicCounters::TCounterPtr RestartStatus;
::NMonitoring::TDynamicCounters::TCounterPtr RestartPatch;
+ ::NMonitoring::TDynamicCounters::TCounterPtr RestartAssimilate;
std::array<::NMonitoring::TDynamicCounters::TCounterPtr, 4> RestartHisto;
diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
index 72a6b09104..87b4d72b5d 100644
--- a/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
+++ b/ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
@@ -231,6 +231,14 @@ namespace NKikimr {
ActiveRequests.insert(reqID);
}
+ void TBlobStorageGroupProxy::HandleNormal(TEvBlobStorage::TEvAssimilate::TPtr &ev) {
+ EnsureMonitoring(true);
+ Mon->EventAssimilate->Inc();
+ const TActorId reqID = Register(CreateBlobStorageGroupAssimilateRequest(Info, Sessions->GroupQueues, ev->Sender,
+ Mon, ev->Get(), ev->Cookie, std::move(ev->TraceId), TActivationContext::Now(), StoragePoolCounters));
+ ActiveRequests.insert(reqID);
+ }
+
void TBlobStorageGroupProxy::Handle(TEvDeathNote::TPtr ev) {
const bool wasEmpty = ResponsivenessTracker.IsEmpty();
for (const auto &item : ev->Get()->Responsiveness) {
diff --git a/ydb/core/blobstorage/dsproxy/group_sessions.h b/ydb/core/blobstorage/dsproxy/group_sessions.h
index b8dc8c69da..b5f33d2f4e 100644
--- a/ydb/core/blobstorage/dsproxy/group_sessions.h
+++ b/ydb/core/blobstorage/dsproxy/group_sessions.h
@@ -53,6 +53,10 @@ namespace NKikimr {
return NKikimrBlobStorage::EVDiskQueueId::GetFastRead;
}
+ static NKikimrBlobStorage::EVDiskQueueId VDiskQueueId(const TEvBlobStorage::TEvVAssimilate&) {
+ return NKikimrBlobStorage::EVDiskQueueId::GetAsyncRead;
+ }
+
TQueue& GetQueue(NKikimrBlobStorage::EVDiskQueueId queueId) {
switch (queueId) {
case NKikimrBlobStorage::EVDiskQueueId::PutTabletLog: return PutTabletLog;
@@ -179,6 +183,7 @@ namespace NKikimr {
}
void SetUpSubmitTimestamp(TEvBlobStorage::TEvVStatus& /*event*/) {}
+ void SetUpSubmitTimestamp(TEvBlobStorage::TEvVAssimilate& /*event*/) {}
template<typename TEvent>
TActorId Send(const IActor& actor, const TBlobStorageGroupInfo::TTopology& topology, std::unique_ptr<TEvent> event,
diff --git a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
index c0e6d8f15c..b9404a73b1 100644
--- a/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
+++ b/ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.cpp
@@ -49,6 +49,11 @@ namespace NKikimr {
ev->Cookie);
}
+ void Handle(TEvBlobStorage::TEvAssimilate::TPtr& ev) {
+ STLOG(PRI_DEBUG, BS_PROXY, BSPM09, "TEvAssimilate", (Msg, ev->Get()->ToString()));
+ Send(ev->Sender, new TEvBlobStorage::TEvAssimilateResult(NKikimrProto::ERROR, "not implemented"));
+ }
+
void HandlePoison(TEvents::TEvPoisonPill::TPtr& ev) {
STLOG(PRI_DEBUG, BS_PROXY, BSPM08, "TEvPoisonPill");
Send(ev->Sender, new TEvents::TEvPoisonTaken);
diff --git a/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp b/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp
index 20d2058c30..29a83a61c6 100644
--- a/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp
+++ b/ydb/core/blobstorage/ut_blobstorage/assimilation.cpp
@@ -92,21 +92,8 @@ Y_UNIT_TEST_SUITE(VDiskAssimilation) {
for (;;) {
const TActorId vdiskId = info->GetActorId(i);
const TActorId client = runtime->AllocateEdgeActor(vdiskId.NodeId(), __FILE__, __LINE__);
- auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(info->GetVDiskId(i));
- {
- auto& record = ev->Record;
- if (lastBlock) {
- record.SetSkipBlocksUpTo(*lastBlock);
- }
- if (lastBarrier) {
- auto *x = record.MutableSkipBarriersUpTo();
- x->SetTabletId(lastBarrier->first);
- x->SetChannel(lastBarrier->second);
- }
- if (lastBlob) {
- LogoBlobIDFromLogoBlobID(*lastBlob, record.MutableSkipBlobsUpTo());
- }
- }
+ auto ev = std::make_unique<TEvBlobStorage::TEvVAssimilate>(info->GetVDiskId(i), lastBlock, lastBarrier,
+ lastBlob);
runtime->Send(new IEventHandle(vdiskId, client, ev.release()), vdiskId.NodeId());
auto res = env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVAssimilateResult>(client);
const auto& record = res->Get()->Record;
diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
index 305e4ad722..3393f2ac30 100644
--- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h
+++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h
@@ -2512,8 +2512,20 @@ namespace NKikimr {
struct TEvBlobStorage::TEvVAssimilate : TEventPB<TEvVAssimilate, NKikimrBlobStorage::TEvVAssimilate, EvVAssimilate> {
TEvVAssimilate() = default;
- TEvVAssimilate(const TVDiskID& vdiskId) {
+ TEvVAssimilate(const TVDiskID& vdiskId, std::optional<ui64> skipBlocksUpTo,
+ std::optional<std::tuple<ui64, ui8>> skipBarriersUpTo, std::optional<TLogoBlobID> skipBlobsUpTo) {
VDiskIDFromVDiskID(vdiskId, Record.MutableVDiskID());
+ if (skipBlocksUpTo) {
+ Record.SetSkipBlocksUpTo(*skipBlocksUpTo);
+ }
+ if (skipBarriersUpTo) {
+ auto *barrier = Record.MutableSkipBarriersUpTo();
+ barrier->SetTabletId(std::get<0>(*skipBarriersUpTo));
+ barrier->SetChannel(std::get<1>(*skipBarriersUpTo));
+ }
+ if (skipBlobsUpTo) {
+ LogoBlobIDFromLogoBlobID(*skipBlobsUpTo, Record.MutableSkipBlobsUpTo());
+ }
}
};
@@ -2521,11 +2533,12 @@ namespace NKikimr {
{
TEvVAssimilateResult() = default;
- TEvVAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason) {
+ TEvVAssimilateResult(NKikimrProto::EReplyStatus status, TString errorReason, TVDiskID vdiskId) {
Record.SetStatus(status);
if (status != NKikimrProto::OK) {
Record.SetErrorReason(errorReason);
}
+ VDiskIDFromVDiskID(vdiskId, Record.MutableVDiskID());
}
};
diff --git a/ydb/core/blobstorage/vdisk/query/assimilation.cpp b/ydb/core/blobstorage/vdisk/query/assimilation.cpp
index 6038d4edcf..07ef96c24d 100644
--- a/ydb/core/blobstorage/vdisk/query/assimilation.cpp
+++ b/ydb/core/blobstorage/vdisk/query/assimilation.cpp
@@ -15,16 +15,16 @@ namespace NKikimr {
std::optional<TKeyBarrier> SkipBarriersUpTo;
std::optional<TKeyLogoBlob> SkipBlobsUpTo;
ui64 LastRawBlobId[3] = {0, 0, 0};
- size_t RecordSize = CountUnsigned(NKikimrBlobStorage::TEvVAssimilateResult::kStatusFieldNumber, NKikimrProto::OK);
+ size_t RecordSize;
static constexpr TDuration MaxQuantumTime = TDuration::MilliSeconds(10);
static constexpr size_t MaxResultSize = 5'000'000; // may overshoot a little
public:
- TAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev)
+ TAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev, TVDiskID vdiskId)
: Snap(std::move(snap))
, Ev(ev.Release())
- , Result(std::make_unique<TEvBlobStorage::TEvVAssimilateResult>(NKikimrProto::OK, TString()))
+ , Result(std::make_unique<TEvBlobStorage::TEvVAssimilateResult>(NKikimrProto::OK, TString(), vdiskId))
{
const auto& record = Ev->Get()->Record;
if (record.HasSkipBlocksUpTo()) {
@@ -37,6 +37,8 @@ namespace NKikimr {
if (record.HasSkipBlobsUpTo()) {
SkipBlobsUpTo.emplace(LogoBlobIDFromLogoBlobID(record.GetSkipBlobsUpTo()));
}
+
+ RecordSize = Result->Record.ByteSizeLong();
}
void Bootstrap(const TActorId& parentId) {
@@ -257,8 +259,8 @@ namespace NKikimr {
}
};
- IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev) {
- return new TAssimilationActor(std::move(snap), ev);
+ IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev, TVDiskID vdiskId) {
+ return new TAssimilationActor(std::move(snap), ev, vdiskId);
}
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/query/assimilation.h b/ydb/core/blobstorage/vdisk/query/assimilation.h
index 8d838cb750..7cda79fe39 100644
--- a/ydb/core/blobstorage/vdisk/query/assimilation.h
+++ b/ydb/core/blobstorage/vdisk/query/assimilation.h
@@ -7,6 +7,6 @@
namespace NKikimr {
- IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev);
+ IActor *CreateAssimilationActor(THullDsSnap&& snap, TEvBlobStorage::TEvVAssimilate::TPtr& ev, TVDiskID vdiskId);
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
index aae8764608..64df734d78 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp
@@ -1215,9 +1215,21 @@ namespace NKikimr {
// ASSIMILATION
////////////////////////////////////////////////////////////////////////
+ void ReplyError(NKikimrProto::EReplyStatus status, const TString& errorReason, TEvBlobStorage::TEvVAssimilate::TPtr &ev,
+ const TActorContext &ctx, const TInstant &now) {
+ using namespace NErrBuilder;
+ std::unique_ptr<IEventBase> res(ErroneousResult(VCtx, status, errorReason, ev, now, SkeletonFrontIDPtr,
+ SelfVDiskId, Db->GetVDiskIncarnationGuid(), GInfo));
+ SendVDiskResponse(ctx, ev->Sender, res.release(), ev->Cookie);
+ }
+
void Handle(TEvBlobStorage::TEvVAssimilate::TPtr& ev, const TActorContext& ctx) {
- const TActorId actorId = RunInBatchPool(ctx, CreateAssimilationActor(Hull->GetIndexSnapshot(), ev));
- ActiveActors.insert(actorId);
+ if (!SelfVDiskId.SameDisk(ev->Get()->Record.GetVDiskID())) {
+ ReplyError(NKikimrProto::RACE, "group generation mismatch", ev, ctx, TAppData::TimeProvider->Now());
+ } else {
+ const TActorId actorId = RunInBatchPool(ctx, CreateAssimilationActor(Hull->GetIndexSnapshot(), ev, SelfVDiskId));
+ ActiveActors.insert(actorId);
+ }
}
////////////////////////////////////////////////////////////////////////
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h
index f209e77fca..f014441a74 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonerr.h
@@ -439,6 +439,15 @@ namespace NKikimr {
return std::make_unique<TEvBlobStorage::TEvVSyncGuidResult>(status, vdiskID, now, counterPtr, nullptr,
ev->GetChannel());
}
+
+ static inline std::unique_ptr<IEventBase>
+ ErroneousResult(const TVDiskContextPtr& /*vctx*/, const NKikimrProto::EReplyStatus status, const TString& errorReason,
+ TEvBlobStorage::TEvVAssimilate::TPtr& /*ev*/, const TInstant& /*now*/,
+ const TActorIDPtr &/*skeletonFrontIDPtr*/, const TVDiskID &vdiskID, ui64 /*vdiskIncarnationGuid*/,
+ const TIntrusivePtr<TBlobStorageGroupInfo>& /*groupInfo*/)
+ {
+ return std::make_unique<TEvBlobStorage::TEvVAssimilateResult>(status, errorReason, vdiskID);
+ }
} // NErrBuilder
} // NKikimr
diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
index fa9fcbd38b..a50f694cef 100644
--- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
+++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp
@@ -1342,12 +1342,6 @@ namespace NKikimr {
ctx.Send(ev->Sender, new TEvBlobStorage::TEvVStatusResult(status, ev->Get()->Record.GetVDiskID()),
flags, ev->Cookie);
}
-
- void Reply(TEvBlobStorage::TEvVAssimilate::TPtr& ev, const TActorContext& ctx,
- NKikimrProto::EReplyStatus status, const TString& errorReason, TInstant /*now*/) {
- const ui32 flags = IEventHandle::MakeFlags(ev->GetChannel(), 0);
- ctx.Send(ev->Sender, new TEvBlobStorage::TEvVAssimilateResult(status, errorReason), flags, ev->Cookie);
- }
// FIXME: don't forget about counters
diff --git a/ydb/core/protos/blobstorage.proto b/ydb/core/protos/blobstorage.proto
index 2d498cd2af..d0009a446e 100644
--- a/ydb/core/protos/blobstorage.proto
+++ b/ydb/core/protos/blobstorage.proto
@@ -616,6 +616,7 @@ message TEvVAssimilateResult {
optional NKikimrProto.EReplyStatus Status = 1;
optional string ErrorReason = 2;
+ optional NKikimrBlobStorage.TVDiskID VDiskID = 3;
repeated TBlock Blocks = 10;
repeated TBarrier Barriers = 11;
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 8f27f228f1..a04b2f2289 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -50,6 +50,7 @@ enum EServiceKikimr {
BS_VDISK_SCRUB = 344;
BS_VDISK_PATCH = 345;
BS_VDISK_DEFRAG = 346;
+ BS_PROXY_ASSIMILATE = 347;
// DATASHARD section //
TX_DATASHARD = 290; //
@@ -903,5 +904,6 @@ message TActivity {
BS_STORAGE_STATS_ACTOR = 573;
DS_LOAD_ACTOR = 574;
PQ_META_CACHE = 575;
+ BS_GROUP_ASSIMILATE = 576;
};
};