aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-07-25 13:40:32 +0300
committeralexvru <alexvru@ydb.tech>2022-07-25 13:40:32 +0300
commitd79a471fa6bf139887fd7087a16b8f3d3ec1c998 (patch)
tree9a93c169204a67109c86e5b32de1a188b0d761a7
parent6e02ac55fbb46b2dbcf36c2a76fe6bf28a031a3c (diff)
downloadydb-d79a471fa6bf139887fd7087a16b8f3d3ec1c998.tar.gz
BlobDepot work in progress
-rw-r--r--ydb/core/base/defs.h1
-rw-r--r--ydb/core/base/logoblob.h26
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt4
-rw-r--r--ydb/core/blob_depot/agent/storage_discover.cpp9
-rw-r--r--ydb/core/blob_depot/agent/storage_get.cpp2
-rw-r--r--ydb/core/blob_depot/agent/storage_put.cpp3
-rw-r--r--ydb/core/blob_depot/agent/storage_range.cpp15
-rw-r--r--ydb/core/blob_depot/blob_depot.cpp2
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h1
-rw-r--r--ydb/core/blob_depot/data.cpp91
-rw-r--r--ydb/core/blob_depot/data.h34
-rw-r--r--ydb/core/blob_depot/data_gc.cpp88
-rw-r--r--ydb/core/blob_depot/data_load.cpp162
-rw-r--r--ydb/core/blob_depot/data_resolve.cpp252
-rw-r--r--ydb/core/blob_depot/garbage_collection.cpp13
-rw-r--r--ydb/core/blob_depot/garbage_collection.h1
-rw-r--r--ydb/core/blob_depot/mon_main.cpp2
-rw-r--r--ydb/core/blob_depot/op_commit_blob_seq.cpp14
-rw-r--r--ydb/core/blob_depot/op_init_schema.cpp1
-rw-r--r--ydb/core/blob_depot/op_load.cpp63
-rw-r--r--ydb/core/blob_depot/op_resolve.cpp88
-rw-r--r--ydb/core/blob_depot/schema.h9
-rw-r--r--ydb/core/protos/blob_depot.proto5
23 files changed, 619 insertions, 267 deletions
diff --git a/ydb/core/base/defs.h b/ydb/core/base/defs.h
index bb4eb556895..e089d844a84 100644
--- a/ydb/core/base/defs.h
+++ b/ydb/core/base/defs.h
@@ -8,6 +8,7 @@
#include <ydb/core/debug/valgrind_check.h>
#include <util/generic/array_ref.h>
#include <util/generic/string.h>
+#include <util/system/byteorder.h>
namespace NKikimr {
// actorlib is organic part of kikimr so we emulate global import by this directive
diff --git a/ydb/core/base/logoblob.h b/ydb/core/base/logoblob.h
index ddcecd3b065..7d06bd5592a 100644
--- a/ydb/core/base/logoblob.h
+++ b/ydb/core/base/logoblob.h
@@ -14,6 +14,8 @@ namespace NKikimr {
static const ui32 MaxPartId = 15ul;
static const ui32 MaxCrcMode = 3ul;
+ static constexpr size_t BinarySize = 3 * sizeof(ui64);
+
TLogoBlobID()
{
Set(0, 0, 0, 0, 0, 0, 0, 0);
@@ -100,8 +102,28 @@ namespace NKikimr {
const ui64* GetRaw() const { return Raw.X; }
- TStringBuf AsBinaryString() const {
- return {reinterpret_cast<const char*>(GetRaw()), 3 * sizeof(ui64)};
+ void ToBinary(void *data) const {
+ ui64 *x = static_cast<ui64*>(data);
+ x[0] = HostToInet(Raw.X[0]);
+ x[1] = HostToInet(Raw.X[1]);
+ x[2] = HostToInet(Raw.X[2]);
+ }
+
+ TString AsBinaryString() const {
+ std::array<char, BinarySize> data;
+ ToBinary(data.data());
+ return TString(data.data(), data.size());
+ }
+
+ static TLogoBlobID FromBinary(const void *data) {
+ const ui64 *x = static_cast<const ui64*>(data);
+ ui64 arr[3] = {InetToHost(x[0]), InetToHost(x[1]), InetToHost(x[2])};
+ return TLogoBlobID(arr);
+ }
+
+ static TLogoBlobID FromBinary(const TString& data) {
+ Y_VERIFY(data.size() == BinarySize);
+ return FromBinary(data.data());
}
TString ToString() const;
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index 236ee49e557..9a868b3e026 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -19,12 +19,14 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_load.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/garbage_collection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/mon_main.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp
- ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_resolve.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_commit_blob_seq.cpp
)
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index 7f1a4c9dd44..0ec66d59100 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -49,9 +49,9 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TEvResolve resolve;
auto *item = resolve.AddItems();
- item->SetBeginningKey(from.GetRaw(), 3 * sizeof(ui64));
+ item->SetBeginningKey(from.AsBinaryString());
item->SetIncludeBeginning(true);
- item->SetEndingKey(to.GetRaw(), 3 * sizeof(ui64));
+ item->SetEndingKey(to.AsBinaryString());
item->SetIncludeEnding(true);
item->SetMaxKeys(1);
item->SetReverse(true);
@@ -103,10 +103,7 @@ namespace NKikimr::NBlobDepot {
if (status == NKikimrProto::OK) {
for (const auto& item : msg.Record.GetResolvedKeys()) {
- const TString& id = item.GetKey();
- Y_VERIFY(id.size() == 3 * sizeof(ui64));
- Y_VERIFY(!Id);
- Id = TLogoBlobID(reinterpret_cast<const ui64*>(id.data()));
+ Id = TLogoBlobID::FromBinary(item.GetKey());
Y_VERIFY(item.ValueChainSize() == 1);
if (ReadBody) {
TString error;
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index 13bc7ef8e94..f710bbfe0f7 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -35,7 +35,7 @@ namespace NKikimr::NBlobDepot {
response.Shift = query.Shift;
response.RequestedSize = query.Size;
- TString blobId(reinterpret_cast<const char*>(query.Id.GetRaw()), 3 * sizeof(ui64));
+ TString blobId = query.Id.AsBinaryString();
if (const TValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
std::make_shared<TResolveKeyContext>(i))) {
if (!ProcessSingleResult(i, value)) {
diff --git a/ydb/core/blob_depot/agent/storage_put.cpp b/ydb/core/blob_depot/agent/storage_put.cpp
index 325cb58d40a..c0bf7f2a084 100644
--- a/ydb/core/blob_depot/agent/storage_put.cpp
+++ b/ydb/core/blob_depot/agent/storage_put.cpp
@@ -142,10 +142,9 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TEvCommitBlobSeq request;
auto& msg = *Event->Get<TEvBlobStorage::TEvPut>();
- const TStringBuf key = msg.Id.AsBinaryString();
auto *item = request.AddItems();
- item->SetKey(key.data(), key.size());
+ item->SetKey(msg.Id.AsBinaryString());
auto *locator = item->MutableBlobLocator();
locator->SetGroupId(GroupId);
BlobSeqId.ToProto(locator->MutableBlobSeqId());
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 57c5de67b9c..af054706490 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -31,8 +31,8 @@ namespace NKikimr::NBlobDepot {
void IssueResolve() {
auto& msg = *Event->Get<TEvBlobStorage::TEvRange>();
- TStringBuf from = msg.From.AsBinaryString();
- TStringBuf to = msg.To.AsBinaryString();
+ TString from = msg.From.AsBinaryString();
+ TString to = msg.To.AsBinaryString();
const bool reverse = msg.To < msg.From;
if (reverse) {
std::swap(from, to);
@@ -40,9 +40,9 @@ namespace NKikimr::NBlobDepot {
NKikimrBlobDepot::TEvResolve resolve;
auto *item = resolve.AddItems();
- item->SetBeginningKey(from.data(), from.size());
+ item->SetBeginningKey(from);
item->SetIncludeBeginning(true);
- item->SetEndingKey(to.data(), to.size());
+ item->SetEndingKey(to);
item->SetIncludeEnding(true);
item->SetReverse(reverse);
@@ -53,9 +53,9 @@ namespace NKikimr::NBlobDepot {
void IssueResolve(TLogoBlobID id, size_t index) {
NKikimrBlobDepot::TEvResolve resolve;
auto *item = resolve.AddItems();
- item->SetBeginningKey(id.GetRaw(), 3 * sizeof(ui64));
+ item->SetBeginningKey(id.AsBinaryString());
item->SetIncludeBeginning(true);
- item->SetEndingKey(id.GetRaw(), 3 * sizeof(ui64));
+ item->SetEndingKey(id.AsBinaryString());
item->SetIncludeEnding(true);
Agent.Issue(std::move(resolve), this, std::make_shared<TExtraResolveContext>(index));
@@ -85,8 +85,7 @@ namespace NKikimr::NBlobDepot {
for (const auto& key : msg.GetResolvedKeys()) {
const TString& blobId = key.GetKey();
- Y_VERIFY(blobId.size() == 3 * sizeof(ui64));
- TLogoBlobID id(reinterpret_cast<const ui64*>(blobId.data()));
+ auto id = TLogoBlobID::FromBinary(blobId);
const size_t index = context
? context->Obtain<TExtraResolveContext>().Index
diff --git a/ydb/core/blob_depot/blob_depot.cpp b/ydb/core/blob_depot/blob_depot.cpp
index 260caa7647a..bba9917e1a0 100644
--- a/ydb/core/blob_depot/blob_depot.cpp
+++ b/ydb/core/blob_depot/blob_depot.cpp
@@ -26,7 +26,7 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvBlobDepot::TEvRegisterAgent, Handle);
hFunc(TEvBlobDepot::TEvAllocateIds, Handle);
hFunc(TEvBlobDepot::TEvCommitBlobSeq, Handle);
- hFunc(TEvBlobDepot::TEvResolve, Handle);
+ hFunc(TEvBlobDepot::TEvResolve, Data->Handle);
hFunc(TEvBlobDepot::TEvBlock, BlocksManager->Handle);
hFunc(TEvBlobDepot::TEvQueryBlocks, BlocksManager->Handle);
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index b465995502d..05372c9bde7 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -167,7 +167,6 @@ namespace NKikimr::NBlobDepot {
std::unique_ptr<TData> Data;
void Handle(TEvBlobDepot::TEvCommitBlobSeq::TPtr ev);
- void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Monitoring
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index c5750dc3ebc..aa43ca596ab 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -1,81 +1,9 @@
#include "data.h"
-#include "schema.h"
namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
- class TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- const ui8 Channel;
- const ui32 GroupId;
- const TGenStep IssuedGenStep;
- std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage;
-
- public:
- TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep,
- std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage)
- : TTransactionBase(self)
- , Channel(channel)
- , GroupId(groupId)
- , IssuedGenStep(issuedGenStep)
- , CollectGarbage(std::move(collectGarbage))
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
- NIceDb::TNiceDb db(txc.DB);
- db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::IssuedGenStep>(ui64(IssuedGenStep));
- return true;
- }
-
- void Complete(const TActorContext&) override {
- SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId);
- }
- };
-
- class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- const ui8 Channel;
- const ui32 GroupId;
- std::vector<TLogoBlobID> TrashDeleted;
- const TGenStep ConfirmedGenStep;
-
- static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
-
- public:
- TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep)
- : TTransactionBase(self)
- , Channel(channel)
- , GroupId(groupId)
- , TrashDeleted(std::move(trashDeleted))
- , ConfirmedGenStep(confirmedGenStep)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
- NIceDb::TNiceDb db(txc.DB);
-
- for (ui32 i = 0; i < TrashDeleted.size() && i < MaxKeysToProcessAtOnce; ++i) {
- db.Table<Schema::Trash>().Key(TKey(TrashDeleted[i]).MakeBinaryKey()).Delete();
- }
- if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) {
- TrashDeleted.clear();
- db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::ConfirmedGenStep>(ui64(ConfirmedGenStep));
- } else {
- std::vector<TLogoBlobID> temp;
- temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end());
- temp.swap(TrashDeleted);
- }
-
- return true;
- }
-
- void Complete(const TActorContext&) override {
- if (TrashDeleted.empty()) {
- Self->Data->OnCommitConfirmedGC(Channel, GroupId);
- } else { // resume transaction
- Self->Execute(std::make_unique<TTxConfirmGC>(Self, Channel, GroupId, std::move(TrashDeleted), ConfirmedGenStep));
- }
- }
- };
-
std::optional<TData::TValue> TData::FindKey(const TKey& key) {
const auto it = Data.find(key);
return it != Data.end() ? std::make_optional(it->second) : std::nullopt;
@@ -91,6 +19,10 @@ namespace NKikimr::NBlobDepot {
}
void TData::AddDataOnLoad(TKey key, TString value) {
+ if (Data.contains(key)) {
+ return; // we are racing with the offline load procedure -- skip this key, it is already new in memory
+ }
+
NKikimrBlobDepot::TValue proto;
const bool success = proto.ParseFromString(value);
Y_VERIFY(success);
@@ -118,8 +50,7 @@ namespace NKikimr::NBlobDepot {
}
void TData::PutKey(TKey key, TValue&& data) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (TabletId, Self->TabletID()), (Key, key.ToString(Self->Config)),
- (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
+ ui64 referencedBytes = 0;
EnumerateBlobsForValueChain(data.ValueChain, Self->TabletID(), [&](TLogoBlobID id) {
if (!RefCount[id]++) {
@@ -129,8 +60,13 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(inserted);
AccountBlob(id, 1);
}
+ referencedBytes += id.BlobSize();
});
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT10, "PutKey", (TabletId, Self->TabletID()), (Key, key),
+ (ValueChain.size, data.ValueChain.size()), (ReferencedBytes, referencedBytes),
+ (KeepState, NKikimrBlobDepot::EKeepState_Name(data.KeepState)));
+
Data[std::move(key)] = std::move(data);
}
@@ -292,8 +228,7 @@ namespace NKikimr::NBlobDepot {
(TrashInFlight.size, record.TrashInFlight.size()));
if (collect) {
- Self->Execute(std::make_unique<TTxIssueGC>(Self, record.Channel, record.GroupId, record.IssuedGenStep,
- std::move(ev)));
+ ExecuteIssueGC(record.Channel, record.GroupId, record.IssuedGenStep, std::move(ev));
} else {
SendToBSProxy(Self->SelfId(), record.GroupId, ev.release(), record.GroupId);
}
@@ -323,8 +258,8 @@ namespace NKikimr::NBlobDepot {
if (ev->Get()->Status == NKikimrProto::OK) {
Y_VERIFY(record.CollectGarbageRequestInFlight);
record.OnSuccessfulCollect(this);
- Self->Execute(std::make_unique<TTxConfirmGC>(Self, record.Channel, record.GroupId,
- std::exchange(record.TrashInFlight, {}), record.LastConfirmedGenStep));
+ ExecuteConfirmGC(record.Channel, record.GroupId, std::exchange(record.TrashInFlight, {}),
+ record.LastConfirmedGenStep);
} else {
record.ClearInFlight(this);
HandleTrash();
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index f9e4d6f340e..4816dced3ae 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -116,29 +116,28 @@ namespace NKikimr::NBlobDepot {
TString MakeBinaryKey() const {
if (Data.Type == BlobIdType) {
- return TString(GetBlobId().AsBinaryString());
+ return GetBlobId().AsBinaryString();
} else {
return TString(GetStringBuf());
}
}
- static TKey FromBinaryKey(const TStringBuf& key, const NKikimrBlobDepot::TBlobDepotConfig& config) {
+ static TKey FromBinaryKey(const TString& key, const NKikimrBlobDepot::TBlobDepotConfig& config) {
if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) {
- Y_VERIFY(key.size() == 3 * sizeof(ui64));
- return TKey(TLogoBlobID(reinterpret_cast<const ui64*>(key.data())));
+ return TKey(TLogoBlobID::FromBinary(key));
} else {
return TKey(key);
}
}
- TString ToString(const NKikimrBlobDepot::TBlobDepotConfig& config) const {
+ TString ToString() const {
TStringStream s;
- Output(s, config);
+ Output(s);
return s.Str();
}
- void Output(IOutputStream& s, const NKikimrBlobDepot::TBlobDepotConfig& config) const {
- if (config.GetOperationMode() == NKikimrBlobDepot::EOperationMode::VirtualGroup) {
+ void Output(IOutputStream& s) const {
+ if (Data.Type == BlobIdType) {
s << GetBlobId();
} else {
s << EscapeC(GetStringBuf());
@@ -255,16 +254,23 @@ namespace NKikimr::NBlobDepot {
void EnqueueForCollectionIfPossible(TData *self);
};
+ bool Loaded = false;
std::map<TKey, TValue> Data;
THashMap<TLogoBlobID, ui32> RefCount;
THashMap<std::tuple<ui64, ui8, ui32>, TRecordsPerChannelGroup> RecordsPerChannelGroup;
TIntrusiveList<TRecordsPerChannelGroup, TRecordWithTrash> RecordsWithTrash;
+ std::optional<TKey> LastLoadedKey; // keys are being loaded in ascending order
THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
class TTxIssueGC;
class TTxConfirmGC;
+ class TTxLoad;
+
+ class TTxLoadSpecificKeys;
+ class TTxResolve;
+
public:
TData(TBlobDepot *self)
: Self(self)
@@ -342,6 +348,18 @@ namespace NKikimr::NBlobDepot {
}
}
}
+
+ void StartLoad();
+ void OnLoadComplete();
+ bool IsLoaded() const { return Loaded; }
+
+ void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
+
+ private:
+ void ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage);
+
+ void ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep);
};
Y_DECLARE_OPERATORS_FOR_FLAGS(TBlobDepot::TData::TScanFlags)
diff --git a/ydb/core/blob_depot/data_gc.cpp b/ydb/core/blob_depot/data_gc.cpp
new file mode 100644
index 00000000000..4eff9208f76
--- /dev/null
+++ b/ydb/core/blob_depot/data_gc.cpp
@@ -0,0 +1,88 @@
+#include "data.h"
+#include "schema.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TData = TBlobDepot::TData;
+
+ class TData::TTxIssueGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const ui8 Channel;
+ const ui32 GroupId;
+ const TGenStep IssuedGenStep;
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> CollectGarbage;
+
+ public:
+ TTxIssueGC(TBlobDepot *self, ui8 channel, ui32 groupId, TGenStep issuedGenStep,
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage)
+ : TTransactionBase(self)
+ , Channel(channel)
+ , GroupId(groupId)
+ , IssuedGenStep(issuedGenStep)
+ , CollectGarbage(std::move(collectGarbage))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+ db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::IssuedGenStep>(ui64(IssuedGenStep));
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ SendToBSProxy(Self->SelfId(), GroupId, CollectGarbage.release(), GroupId);
+ }
+ };
+
+ void TData::ExecuteIssueGC(ui8 channel, ui32 groupId, TGenStep issuedGenStep,
+ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> collectGarbage) {
+ Self->Execute(std::make_unique<TTxIssueGC>(Self, channel, groupId, issuedGenStep, std::move(collectGarbage)));
+ }
+
+ class TData::TTxConfirmGC : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ const ui8 Channel;
+ const ui32 GroupId;
+ std::vector<TLogoBlobID> TrashDeleted;
+ const TGenStep ConfirmedGenStep;
+
+ static constexpr ui32 MaxKeysToProcessAtOnce = 10'000;
+
+ public:
+ TTxConfirmGC(TBlobDepot *self, ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep)
+ : TTransactionBase(self)
+ , Channel(channel)
+ , GroupId(groupId)
+ , TrashDeleted(std::move(trashDeleted))
+ , ConfirmedGenStep(confirmedGenStep)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+
+ for (ui32 i = 0; i < TrashDeleted.size() && i < MaxKeysToProcessAtOnce; ++i) {
+ db.Table<Schema::Trash>().Key(TKey(TrashDeleted[i]).MakeBinaryKey()).Delete();
+ }
+ if (TrashDeleted.size() <= MaxKeysToProcessAtOnce) {
+ TrashDeleted.clear();
+ db.Table<Schema::GC>().Key(Channel, GroupId).Update<Schema::GC::ConfirmedGenStep>(ui64(ConfirmedGenStep));
+ } else {
+ std::vector<TLogoBlobID> temp;
+ temp.insert(temp.end(), TrashDeleted.begin() + MaxKeysToProcessAtOnce, TrashDeleted.end());
+ temp.swap(TrashDeleted);
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ if (TrashDeleted.empty()) {
+ Self->Data->OnCommitConfirmedGC(Channel, GroupId);
+ } else { // resume transaction
+ Self->Data->ExecuteConfirmGC(Channel, GroupId, std::move(TrashDeleted), ConfirmedGenStep);
+ }
+ }
+ };
+
+ void TData::ExecuteConfirmGC(ui8 channel, ui32 groupId, std::vector<TLogoBlobID> trashDeleted, TGenStep confirmedGenStep) {
+ Self->Execute(std::make_unique<TTxConfirmGC>(Self, channel, groupId, std::move(trashDeleted), confirmedGenStep));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
new file mode 100644
index 00000000000..820573e9ce6
--- /dev/null
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -0,0 +1,162 @@
+#include "data.h"
+#include "schema.h"
+#include "garbage_collection.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TData = TBlobDepot::TData;
+
+ class TData::TTxLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ template<typename TTable_, typename TInProgressState_, typename TNextState_, bool Initial_>
+ struct TItem {
+ using TTable = TTable_;
+ using TInProgressState = TInProgressState_;
+ using TNextState = TNextState_;
+ static constexpr bool Initial = Initial_;
+ typename TTable::TKey::KeyValuesType Key;
+ };
+
+ struct TLoadFinished {};
+ struct TLoadGcInProgress : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, false> {};
+ struct TLoadGcBegin : TItem<Schema::GC, TLoadGcInProgress, TLoadFinished, true> {};
+ struct TLoadTrashInProgress : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, false> {};
+ struct TLoadTrashBegin : TItem<Schema::Trash, TLoadTrashInProgress, TLoadGcBegin, true> {};
+ struct TLoadDataInProgress : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, false> {};
+ struct TLoadDataBegin : TItem<Schema::Data, TLoadDataInProgress, TLoadTrashBegin, true> {};
+
+ using TLoadState = std::variant<
+ TLoadDataBegin,
+ TLoadDataInProgress,
+ TLoadTrashBegin,
+ TLoadTrashInProgress,
+ TLoadGcBegin,
+ TLoadGcInProgress,
+ TLoadFinished
+ >;
+
+ TLoadState LoadState;
+ std::unique_ptr<TTxLoad> SuccessorTx;
+
+ public:
+ TTxLoad(TBlobDepot *self, TLoadState loadState = TLoadDataBegin{})
+ : TTransactionBase(self)
+ , LoadState(std::move(loadState))
+ {}
+
+ TTxLoad(TTxLoad& predecessor)
+ : TTransactionBase(predecessor.Self)
+ , LoadState(std::move(predecessor.LoadState))
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT28, "TData::TTxLoad::Execute", (TabletId, Self->TabletID()));
+
+ NIceDb::TNiceDb db(txc.DB);
+ bool progress = false;
+ auto visitor = [&](auto& item) { return ExecuteLoad(db, item, progress); };
+
+ if (std::visit(visitor, LoadState)) {
+ // load finished
+ return true;
+ } else if (progress) {
+ // something was read, but not all
+ SuccessorTx = std::make_unique<TTxLoad>(*this);
+ return true;
+ } else {
+ // didn't read anything this time
+ return false;
+ }
+ }
+
+ template<typename T>
+ bool ExecuteLoad(NIceDb::TNiceDb& db, T& item, bool& progress) {
+ if constexpr (T::Initial) {
+ return LoadTable(db, db.Table<typename T::TTable>().All(), item, progress);
+ } else {
+ auto greaterOrEqual = [&](auto&&... key) { return db.Table<typename T::TTable>().GreaterOrEqual(key...); };
+ return LoadTable(db, std::apply(greaterOrEqual, item.Key), item, progress);
+ }
+ }
+
+ bool ExecuteLoad(NIceDb::TNiceDb&, TLoadFinished&, bool&) {
+ return true;
+ }
+
+ template<typename T, typename TItem>
+ bool LoadTable(NIceDb::TNiceDb& db, T&& table, TItem& item, bool& progress) {
+ if (!table.Precharge(TItem::TTable::PrechargeRows, TItem::TTable::PrechargeBytes)) {
+ return false;
+ }
+
+ for (auto rowset = table.Select();; rowset.Next()) {
+ if (!rowset.IsReady()) {
+ return false;
+ } else if (!rowset.IsValid()) {
+ break;
+ }
+
+ typename TItem::TTable::TKey::KeyValuesType key(rowset.GetKey());
+ bool processRow = true;
+ if constexpr (!TItem::Initial) {
+ processRow = item.Key < key;
+ }
+ if (processRow) {
+ progress = true;
+ typename TItem::TInProgressState state;
+ state.Key = std::move(key);
+ LoadState = std::move(state);
+ ProcessRow(rowset, static_cast<typename TItem::TTable*>(nullptr));
+ }
+ }
+
+ // table was read completely, advance to next state
+ LoadState = typename TItem::TNextState{};
+ return ExecuteLoad(db, std::get<typename TItem::TNextState>(LoadState), progress);
+ }
+
+ template<typename T>
+ void ProcessRow(T&& row, Schema::Data*) {
+ auto key = TData::TKey::FromBinaryKey(row.template GetValue<Schema::Data::Key>(), Self->Config);
+ Self->Data->AddDataOnLoad(key, row.template GetValue<Schema::Data::Value>());
+ Y_VERIFY(!Self->Data->LastLoadedKey || *Self->Data->LastLoadedKey < key);
+ Self->Data->LastLoadedKey = std::move(key);
+ }
+
+ template<typename T>
+ void ProcessRow(T&& row, Schema::Trash*) {
+ Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(row.template GetValue<Schema::Trash::BlobId>()));
+ }
+
+ template<typename T>
+ void ProcessRow(T&& row, Schema::GC*) {
+ Self->Data->AddGenStepOnLoad(
+ row.template GetValue<Schema::GC::Channel>(),
+ row.template GetValue<Schema::GC::GroupId>(),
+ TGenStep(row.template GetValueOrDefault<Schema::GC::IssuedGenStep>()),
+ TGenStep(row.template GetValueOrDefault<Schema::GC::ConfirmedGenStep>())
+ );
+ }
+
+ void Complete(const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxLoad::Complete", (TabletId, Self->TabletID()),
+ (SuccessorTx, bool(SuccessorTx)), (LoadState.index, LoadState.index()));
+
+ if (SuccessorTx) {
+ Self->Execute(std::move(SuccessorTx));
+ } else {
+ Self->Data->OnLoadComplete();
+ }
+ }
+ };
+
+ void TData::StartLoad() {
+ Self->Execute(std::make_unique<TTxLoad>(Self));
+ }
+
+ void TData::OnLoadComplete() {
+ Loaded = true;
+ Self->BarrierServer->OnDataLoaded();
+ HandleTrash();
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
new file mode 100644
index 00000000000..bdf96bb06b8
--- /dev/null
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -0,0 +1,252 @@
+#include "data.h"
+#include "schema.h"
+
+namespace NKikimr::NBlobDepot {
+
+ using TData = TBlobDepot::TData;
+
+ class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request;
+ int ItemIndex = 0;
+ std::optional<TKey> LastScannedKey;
+ ui32 NumKeysRead = 0; // number of keys already read for this item
+
+ // final state
+ std::deque<std::unique_ptr<TEvBlobDepot::TEvResolveResult>> Outbox;
+ std::unique_ptr<TTxResolve> SuccessorTx;
+
+ public:
+ TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request)
+ : TTransactionBase(self)
+ , Request(request.Release())
+ {}
+
+ TTxResolve(TTxResolve& predecessor)
+ : TTransactionBase(predecessor.Self)
+ , Request(std::move(predecessor.Request))
+ , ItemIndex(predecessor.ItemIndex)
+ , LastScannedKey(std::move(predecessor.LastScannedKey))
+ , NumKeysRead(predecessor.NumKeysRead)
+ {}
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (TabletId, Self->TabletID()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex),
+ (LastScannedKey, LastScannedKey));
+
+ if (Self->Data->Loaded) {
+ GenerateResponse();
+ return true;
+ }
+
+ bool progress = false; // have we made some progress during scan?
+ const auto& record = Request->Get()->Record;
+ const auto& items = record.GetItems();
+ for (; ItemIndex < items.size(); ++ItemIndex, LastScannedKey.reset(), NumKeysRead = 0) {
+ const auto& item = items[ItemIndex];
+
+ std::optional<TKey> begin = item.HasBeginningKey()
+ ? std::make_optional(TKey::FromBinaryKey(item.GetBeginningKey(), Self->Config))
+ : std::nullopt;
+
+ std::optional<TKey> end = item.HasEndingKey()
+ ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config))
+ : std::nullopt;
+
+ TScanFlags flags;
+ if (item.GetIncludeBeginning()) {
+ flags |= EScanFlags::INCLUDE_BEGIN;
+ }
+ if (item.GetIncludeEnding()) {
+ flags |= EScanFlags::INCLUDE_END;
+ }
+ if (item.GetReverse()) {
+ flags |= EScanFlags::REVERSE;
+ }
+
+ // adjust range according to actually generated data
+ if (LastScannedKey) {
+ if (flags & EScanFlags::REVERSE) { // reverse scan
+ end = *LastScannedKey;
+ flags &= ~EScanFlags::INCLUDE_END;
+ } else { // direct scan
+ begin = *LastScannedKey;
+ flags &= ~EScanFlags::INCLUDE_BEGIN;
+ }
+ }
+
+ if (end && Self->Data->LastLoadedKey && *end <= Self->Data->LastLoadedKey) {
+ // we have everything we need contained in memory, skip this item
+ continue;
+ } else if (Self->Data->LastLoadedKey && (!begin || *begin <= Self->Data->LastLoadedKey)) {
+ // we can scan only some part from memory -- do it
+ auto callback = [&](const TKey& key, const TValue&) {
+ LastScannedKey = key;
+ return ++NumKeysRead != item.GetMaxKeys();
+ };
+ Self->Data->ScanRange(begin ? &begin.value() : nullptr, &Self->Data->LastLoadedKey.value(),
+ flags | EScanFlags::INCLUDE_END, callback);
+
+ // adjust range beginning
+ begin = Self->Data->LastLoadedKey;
+ flags &= ~EScanFlags::INCLUDE_BEGIN;
+
+ // check if we have read all the keys requested
+ if (NumKeysRead == item.GetMaxKeys()) {
+ continue;
+ }
+ }
+
+ auto processRange = [&](auto&& table) {
+ for (auto rowset = table.Select();; rowset.Next()) {
+ if (!rowset.IsReady()) {
+ return false;
+ } else if (!rowset.IsValid()) {
+ // no more keys in our direction
+ return true;
+ }
+ auto key = TKey::FromBinaryKey(rowset.template GetValue<Schema::Data::Key>(), Self->Config);
+ if (key != LastScannedKey) {
+ LastScannedKey = key;
+ progress = true;
+ Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>());
+
+ const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key);
+ const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end);
+ if (matchBegin && matchEnd && ++NumKeysRead == item.GetMaxKeys()) {
+ // we have hit the MaxItems limit, exit
+ return true;
+ } else if (flags & EScanFlags::REVERSE ? !matchBegin : !matchEnd) {
+ // we have exceeded the opposite boundary, exit
+ return true;
+ }
+ }
+ }
+ };
+
+ auto applyEnd = [&](auto&& x) {
+ return end
+ ? processRange(x.LessOrEqual(end->MakeBinaryKey()))
+ : processRange(std::forward<std::decay_t<decltype(x)>>(x));
+ };
+ auto applyBegin = [&](auto&& x) {
+ return begin
+ ? applyEnd(x.GreaterOrEqual(begin->MakeBinaryKey()))
+ : applyEnd(std::forward<std::decay_t<decltype(x)>>(x));
+ };
+ auto applyReverse = [&](auto&& x) {
+ return item.GetReverse()
+ ? applyBegin(x.Reverse())
+ : applyBegin(std::forward<std::decay_t<decltype(x)>>(x));
+ };
+ if (applyReverse(db.Table<Schema::Data>())) {
+ continue; // all work done for this item
+ } else if (progress) {
+ // we have already done something, so let's finish this transaction and start a new one, continuing
+ // the job
+ SuccessorTx = std::make_unique<TTxResolve>(*this);
+ return true;
+ } else {
+ return false; // we'll have to restart this transaction to fetch some data
+ }
+ }
+
+ GenerateResponse();
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (TabletId, Self->TabletID()),
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, bool(SuccessorTx)),
+ (Outbox.size, Outbox.size()));
+
+ if (SuccessorTx) {
+ Self->Execute(std::move(SuccessorTx));
+ } else {
+ if (Outbox.empty()) {
+ Outbox.push_back(std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt));
+ }
+ for (auto& ev : Outbox) {
+ auto handle = std::make_unique<IEventHandle>(Request->Sender, Self->SelfId(), ev.release(), 0, Request->Cookie);
+ if (Request->InterconnectSession) {
+ handle->Rewrite(TEvInterconnect::EvForward, Request->InterconnectSession);
+ }
+ TActivationContext::Send(handle.release());
+ }
+ }
+ }
+
+ void GenerateResponse() {
+ size_t lastResponseSize;
+
+ for (const auto& item : Request->Get()->Record.GetItems()) {
+ std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt;
+
+ std::optional<TKey> begin = item.HasBeginningKey()
+ ? std::make_optional(TKey::FromBinaryKey(item.GetBeginningKey(), Self->Config))
+ : std::nullopt;
+
+ std::optional<TKey> end = item.HasEndingKey()
+ ? std::make_optional(TKey::FromBinaryKey(item.GetEndingKey(), Self->Config))
+ : std::nullopt;
+
+ TScanFlags flags;
+ if (item.GetIncludeBeginning()) {
+ flags |= EScanFlags::INCLUDE_BEGIN;
+ }
+ if (item.GetIncludeEnding()) {
+ flags |= EScanFlags::INCLUDE_END;
+ }
+ if (item.GetReverse()) {
+ flags |= EScanFlags::REVERSE;
+ }
+
+ ui64 count = item.GetMaxKeys();
+
+ auto callback = [&](const TKey& key, const TValue& value) {
+ IssueResponseItem(cookie, key, value, lastResponseSize);
+ return --count != 0;
+ };
+
+ Self->Data->ScanRange(begin ? &begin.value() : nullptr, end ? &end.value() : nullptr, flags, callback);
+ }
+ }
+
+ void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value, size_t& lastResponseSize) {
+ NKikimrBlobDepot::TEvResolveResult::TResolvedKey item;
+
+ if (cookie) {
+ item.SetCookie(*cookie);
+ }
+ item.SetKey(key.MakeBinaryKey());
+ item.MutableValueChain()->CopyFrom(value.ValueChain);
+ if (value.Meta) {
+ item.SetMeta(value.Meta.data(), value.Meta.size());
+ }
+
+ size_t itemSize = item.ByteSizeLong();
+ if (Outbox.empty() || lastResponseSize + itemSize > EventMaxByteSize) {
+ if (!Outbox.empty()) {
+ auto& lastEvent = Outbox.back();
+ lastEvent->Record.SetStatus(NKikimrProto::OVERRUN);
+ }
+ auto ev = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt);
+ lastResponseSize = ev->CalculateSerializedSize();
+ Outbox.push_back(std::move(ev));
+ }
+
+ auto& lastEvent = Outbox.back();
+ item.Swap(lastEvent->Record.AddResolvedKeys());
+ lastResponseSize += itemSize;
+ }
+ };
+
+ void TData::Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (TabletId, Self->TabletID()), (Msg, ev->Get()->ToString()),
+ (Sender, ev->Sender), (Cookie, ev->Cookie));
+ Self->Execute(std::make_unique<TTxResolve>(Self, ev));
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 6715a61b239..ec44bbcf8ef 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -30,6 +30,7 @@ namespace NKikimr::NBlobDepot {
{}
bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ Y_VERIFY(Self->Data->IsLoaded());
Validate();
if (!Error) {
auto& record = Request->Get()->Record;
@@ -124,7 +125,7 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT14, "DeleteKey", (TabletId, Self->TabletID()), (BlobId, id));
db.Table<Schema::Data>().Key(key.MakeBinaryKey()).Delete();
auto updateTrash = [&](TLogoBlobID id) {
- db.Table<Schema::Trash>().Key(TString(id.AsBinaryString())).Update();
+ db.Table<Schema::Trash>().Key(id.AsBinaryString()).Update();
};
Self->Data->DeleteKey(key, updateTrash, this);
++NumKeysProcessed;
@@ -185,7 +186,7 @@ namespace NKikimr::NBlobDepot {
const auto key = std::make_pair(record.GetTabletId(), record.GetChannel());
auto& barrier = Barriers[key];
barrier.ProcessingQ.emplace_back(ev.Release());
- if (barrier.ProcessingQ.size() == 1) {
+ if (Self->Data->IsLoaded() && barrier.ProcessingQ.size() == 1) {
Self->Execute(std::make_unique<TTxCollectGarbage>(Self, record.GetTabletId(), record.GetChannel()));
}
}
@@ -202,4 +203,12 @@ namespace NKikimr::NBlobDepot {
*underHard = it == Barriers.end() ? false : genStep <= it->second.Hard;
}
+ void TBlobDepot::TBarrierServer::OnDataLoaded() {
+ for (auto& [key, barrier] : Barriers) {
+ if (!barrier.ProcessingQ.empty()) {
+ Self->Execute(std::make_unique<TTxCollectGarbage>(Self, key.first, key.second));
+ }
+ }
+ }
+
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/garbage_collection.h b/ydb/core/blob_depot/garbage_collection.h
index 3164828fda3..e398fcd9f11 100644
--- a/ydb/core/blob_depot/garbage_collection.h
+++ b/ydb/core/blob_depot/garbage_collection.h
@@ -31,6 +31,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobDepot::TEvCollectGarbage::TPtr ev);
bool CheckBlobForBarrier(TLogoBlobID id) const;
void GetBlobBarrierRelation(TLogoBlobID id, bool *underSoft, bool *underHard) const;
+ void OnDataLoaded();
template<typename TCallback>
void Enumerate(TCallback&& callback) {
diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp
index 2eb4163dcc3..eb6fd40b165 100644
--- a/ydb/core/blob_depot/mon_main.cpp
+++ b/ydb/core/blob_depot/mon_main.cpp
@@ -114,7 +114,7 @@ namespace NKikimr::NBlobDepot {
Self->Data->ScanRange(nullptr, nullptr, 0, [&](const TData::TKey& key, const TData::TValue& value) {
TABLER() {
TABLED() {
- key.Output(Stream, Self->Config);
+ key.Output(Stream);
}
TABLED() {
bool first = true;
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index 469bd04b49d..0f1d3154123 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -58,23 +58,23 @@ namespace NKikimr::NBlobDepot {
if (!CheckKeyAgainstBarrier(key)) {
responseItem->SetStatus(NKikimrProto::ERROR);
- responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString(Self->Config)
+ responseItem->SetErrorReason(TStringBuilder() << "BlobId# " << key.ToString()
<< " is being put beyond the barrier");
continue;
}
+ TString valueData;
+ const bool success = value.SerializeToString(&valueData);
+ Y_VERIFY(success);
+
+ db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData);
+
Self->Data->PutKey(std::move(key), {
.Meta = value.GetMeta(),
.ValueChain = std::move(*value.MutableValueChain()),
.KeepState = value.GetKeepState(),
.Public = value.GetPublic(),
});
-
- TString valueData;
- const bool success = value.SerializeToString(&valueData);
- Y_VERIFY(success);
-
- db.Table<Schema::Data>().Key(item.GetKey()).Update<Schema::Data::Value>(valueData);
}
return true;
diff --git a/ydb/core/blob_depot/op_init_schema.cpp b/ydb/core/blob_depot/op_init_schema.cpp
index 6e2ba92d639..cf67bebb3f6 100644
--- a/ydb/core/blob_depot/op_init_schema.cpp
+++ b/ydb/core/blob_depot/op_init_schema.cpp
@@ -1,4 +1,5 @@
#include "blob_depot_tablet.h"
+#include "data.h"
#include "schema.h"
namespace NKikimr::NBlobDepot {
diff --git a/ydb/core/blob_depot/op_load.cpp b/ydb/core/blob_depot/op_load.cpp
index 6cf66a9f37d..e166186398e 100644
--- a/ydb/core/blob_depot/op_load.cpp
+++ b/ydb/core/blob_depot/op_load.cpp
@@ -76,68 +76,13 @@ namespace NKikimr::NBlobDepot {
}
}
- // Data
- {
- auto table = db.Table<Schema::Data>().Select();
- if (!table.IsReady()) {
- return false;
- }
- while (table.IsValid()) {
- Self->Data->AddDataOnLoad(
- TData::TKey::FromBinaryKey(table.GetValue<Schema::Data::Key>(), Self->Config),
- table.GetValue<Schema::Data::Value>()
- );
- if (!table.Next()) {
- return false;
- }
- }
- }
-
- // Trash
- {
- auto table = db.Table<Schema::Trash>().Select();
- if (!table.IsReady()) {
- return false;
- }
- while (table.IsValid()) {
- const TString& blobId = table.GetValue<Schema::Trash::BlobId>();
- Self->Data->AddTrashOnLoad(TLogoBlobID(reinterpret_cast<const ui64*>(blobId.data())));
- if (!table.Next()) {
- return false;
- }
- }
- }
-
- // GC
- {
- using T = Schema::GC;
- auto table = db.Table<T>().Select();
- if (!table.IsReady()) {
- return false;
- }
- while (table.IsValid()) {
- Self->Data->AddGenStepOnLoad(table.GetValue<T::Channel>(),
- table.GetValue<T::GroupId>(),
- TGenStep(table.GetValueOrDefault<T::IssuedGenStep>()),
- TGenStep(table.GetValueOrDefault<T::ConfirmedGenStep>()));
- if (!table.Next()) {
- return false;
- }
- }
- }
-
return true;
}
bool Precharge(NIceDb::TNiceDb& db) {
- auto config = db.Table<Schema::Config>().Select();
- auto blocks = db.Table<Schema::Blocks>().Select();
- auto barriers = db.Table<Schema::Barriers>().Select();
- auto data = db.Table<Schema::Data>().Select();
- auto trash = db.Table<Schema::Trash>().Select();
- auto confirmedGC = db.Table<Schema::GC>().Select();
- return config.IsReady() && blocks.IsReady() && barriers.IsReady() && data.IsReady() && trash.IsReady() &&
- confirmedGC.IsReady();
+ return db.Table<Schema::Config>().Precharge()
+ & db.Table<Schema::Blocks>().Precharge()
+ & db.Table<Schema::Barriers>().Precharge();
}
void Complete(const TActorContext&) override {
@@ -146,10 +91,10 @@ namespace NKikimr::NBlobDepot {
if (Configured) {
Self->InitChannelKinds();
- Self->Data->HandleTrash();
}
Self->OnLoadFinished();
+ Self->Data->StartLoad(); // we need at least Config to start correct loading of data
}
};
diff --git a/ydb/core/blob_depot/op_resolve.cpp b/ydb/core/blob_depot/op_resolve.cpp
deleted file mode 100644
index d60b0522c2d..00000000000
--- a/ydb/core/blob_depot/op_resolve.cpp
+++ /dev/null
@@ -1,88 +0,0 @@
-#include "blob_depot_tablet.h"
-#include "data.h"
-
-namespace NKikimr::NBlobDepot {
-
- void TBlobDepot::Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT21, "TEvResolve", (TabletId, TabletID()), (Msg, ev->Get()->ToString()),
- (Sender, ev->Sender), (Recipient, ev->Recipient), (Cookie, ev->Cookie));
-
- // collect records if needed
-
- auto response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt);
- ui32 messageSize = response->CalculateSerializedSize();
- auto sendMessage = [&](bool more) {
- if (more) {
- response->Record.SetStatus(NKikimrProto::OVERRUN);
- }
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "sending TEvResolveResult", (TabletId, TabletID()), (Msg, response->Record));
-
- auto handle = std::make_unique<IEventHandle>(ev->Sender, SelfId(), response.release(), 0, ev->Cookie);
- if (ev->InterconnectSession) {
- handle->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
- }
- TActivationContext::Send(handle.release());
-
- if (more) {
- response = std::make_unique<TEvBlobDepot::TEvResolveResult>(NKikimrProto::OK, std::nullopt);
- messageSize = response->CalculateSerializedSize();
- }
- };
-
- ui32 itemIndex = 0;
- for (const auto& item : ev->Get()->Record.GetItems()) {
- TData::TKey begin;
- TData::TKey end;
-
- if (item.HasBeginningKey()) {
- begin = TData::TKey::FromBinaryKey(item.GetBeginningKey(), Config);
- }
- if (item.HasEndingKey()) {
- end = TData::TKey::FromBinaryKey(item.GetEndingKey(), Config);
- }
-
- ui32 numItems = 0;
- auto addKey = [&](const TData::TKey& key, const TData::TValue& value) {
- NKikimrBlobDepot::TEvResolveResult::TResolvedKey resolvedKey;
- resolvedKey.SetItemIndex(itemIndex);
- resolvedKey.SetKey(key.MakeBinaryKey());
- resolvedKey.MutableValueChain()->CopyFrom(value.ValueChain);
-
- if (value.Meta) {
- resolvedKey.SetMeta(value.Meta.data(), value.Meta.size());
- }
-
- const ui32 keySize = resolvedKey.ByteSizeLong();
- if (messageSize + keySize > EventMaxByteSize) {
- sendMessage(true);
- }
-
- // put resolved key into the result
- resolvedKey.Swap(response->Record.AddResolvedKeys());
- messageSize += keySize;
- ++numItems;
-
- return !item.HasMaxKeys() || numItems != item.GetMaxKeys();
- };
-
- TData::TScanFlags flags;
- if (item.GetIncludeBeginning()) {
- flags |= TData::EScanFlags::INCLUDE_BEGIN;
- }
- if (item.GetIncludeEnding()) {
- flags |= TData::EScanFlags::INCLUDE_END;
- }
- if (item.GetReverse()) {
- flags |= TData::EScanFlags::REVERSE;
- }
-
- Data->ScanRange(item.HasBeginningKey() ? &begin : nullptr, item.HasEndingKey() ? &end : nullptr, flags, addKey);
-
- ++itemIndex;
- }
-
- sendMessage(false);
- }
-
-} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/schema.h b/ydb/core/blob_depot/schema.h
index 52a3f157503..1f168e96a37 100644
--- a/ydb/core/blob_depot/schema.h
+++ b/ydb/core/blob_depot/schema.h
@@ -68,6 +68,9 @@ namespace NKikimr::NBlobDepot {
Key,
Value
>;
+
+ static constexpr ui64 PrechargeRows = 10'000;
+ static constexpr ui64 PrechargeBytes = 1'000'000;
};
struct Trash : Table<5> {
@@ -75,6 +78,9 @@ namespace NKikimr::NBlobDepot {
using TKey = TableKey<BlobId>;
using TColumns = TableColumns<BlobId>;
+
+ static constexpr ui64 PrechargeRows = 10'000;
+ static constexpr ui64 PrechargeBytes = 1'000'000;
};
struct GC : Table<6> {
@@ -85,6 +91,9 @@ namespace NKikimr::NBlobDepot {
using TKey = TableKey<Channel, GroupId>;
using TColumns = TableColumns<Channel, GroupId, IssuedGenStep, ConfirmedGenStep>;
+
+ static constexpr ui64 PrechargeRows = 10'000;
+ static constexpr ui64 PrechargeBytes = 1'000'000;
};
using TTables = SchemaTables<
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 74b9f9d7dbc..cd49115f4ce 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -174,10 +174,11 @@ message TEvResolve {
optional bool IncludeBeginning = 2 [default = true];
optional bytes EndingKey = 3; // end with the key beyond the last one (if not set)
optional bool IncludeEnding = 4 [default = false];
- optional uint32 MaxKeys = 5 [default = 0];
+ optional uint32 MaxKeys = 5 [default = 0]; // zero or unset value means infinite amount
optional bool ReturnMeta = 6 [default = false];
optional bool ReturnOwners = 7 [default = false];
optional bool Reverse = 8 [default = false]; // reverse output
+ optional uint64 Cookie = 9; // request cookie to match response item
}
repeated TItem Items = 1;
@@ -185,7 +186,7 @@ message TEvResolve {
message TEvResolveResult {
message TResolvedKey {
- optional uint32 ItemIndex = 1;
+ optional uint64 Cookie = 1;
optional bytes Key = 2;
repeated TValueChain ValueChain = 3;
optional bytes Meta = 4;