aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author alexvru <alexvru@ydb.tech> 2023-02-01 12:40:07 +0300
committer alexvru <alexvru@ydb.tech> 2023-02-01 12:40:07 +0300
commit 959f19c2ac5e04e166d118381cb36ebfc660d25f (patch)
tree e0f6488e261554a302d193ea2ca94eee3d81557a
parent 72345589f0c36d577a467ab01854e20bb7e23a13 (diff)
download ydb-959f19c2ac5e04e166d118381cb36ebfc660d25f.tar.gz
Improve BlobDepot
-rw-r--r-- ydb/core/blob_depot/CMakeLists.darwin.txt 1
-rw-r--r-- ydb/core/blob_depot/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/blob_depot/CMakeLists.linux.txt 1
-rw-r--r-- ydb/core/blob_depot/agent/CMakeLists.darwin.txt 1
-rw-r--r-- ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/blob_depot/agent/CMakeLists.linux.txt 1
-rw-r--r-- ydb/core/blob_depot/agent/agent_impl.h 88
-rw-r--r-- ydb/core/blob_depot/agent/blob_mapping_cache.cpp 159
-rw-r--r-- ydb/core/blob_depot/agent/blob_mapping_cache.h 15
-rw-r--r-- ydb/core/blob_depot/agent/read.cpp 129
-rw-r--r-- ydb/core/blob_depot/agent/resolved_value.cpp 65
-rw-r--r-- ydb/core/blob_depot/agent/resolved_value.h 47
-rw-r--r-- ydb/core/blob_depot/agent/storage_discover.cpp 53
-rw-r--r-- ydb/core/blob_depot/agent/storage_get.cpp 68
-rw-r--r-- ydb/core/blob_depot/agent/storage_range.cpp 35
-rw-r--r-- ydb/core/blob_depot/assimilator.cpp 4
-rw-r--r-- ydb/core/blob_depot/blob_depot_tablet.h 3
-rw-r--r-- ydb/core/blob_depot/closed_interval_set.h 104
-rw-r--r-- ydb/core/blob_depot/closed_interval_set_ut.cpp 52
-rw-r--r-- ydb/core/blob_depot/coro_tx.cpp 200
-rw-r--r-- ydb/core/blob_depot/coro_tx.h 33
-rw-r--r-- ydb/core/blob_depot/data.cpp 23
-rw-r--r-- ydb/core/blob_depot/data.h 193
-rw-r--r-- ydb/core/blob_depot/data_decommit.cpp 84
-rw-r--r-- ydb/core/blob_depot/data_load.cpp 208
-rw-r--r-- ydb/core/blob_depot/data_resolve.cpp 307
-rw-r--r-- ydb/core/blob_depot/garbage_collection.cpp 8
-rw-r--r-- ydb/core/blob_depot/op_commit_blob_seq.cpp 18
-rw-r--r-- ydb/core/blob_depot/types.h 1
-rw-r--r-- ydb/core/protos/blob_depot.proto 1
30 files changed, 1217 insertions, 687 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.darwin.txt b/ydb/core/blob_depot/CMakeLists.darwin.txt
index e1ac07e603..601a05c08f 100644
--- a/ydb/core/blob_depot/CMakeLists.darwin.txt
+++ b/ydb/core/blob_depot/CMakeLists.darwin.txt
@@ -22,6 +22,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/coro_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
diff --git a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt
index ae8c5e7a0d..e12fd5266e 100644
--- a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt
@@ -23,6 +23,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/coro_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
diff --git a/ydb/core/blob_depot/CMakeLists.linux.txt b/ydb/core/blob_depot/CMakeLists.linux.txt
index ae8c5e7a0d..e12fd5266e 100644
--- a/ydb/core/blob_depot/CMakeLists.linux.txt
+++ b/ydb/core/blob_depot/CMakeLists.linux.txt
@@ -23,6 +23,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/coro_tx.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp
diff --git a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt
index be98146c1b..c24e4325cf 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt
@@ -26,6 +26,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/resolved_value.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp
diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt
index e081b10637..ef14494980 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt
@@ -27,6 +27,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/resolved_value.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp
diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux.txt b/ydb/core/blob_depot/agent/CMakeLists.linux.txt
index e081b10637..ef14494980 100644
--- a/ydb/core/blob_depot/agent/CMakeLists.linux.txt
+++ b/ydb/core/blob_depot/agent/CMakeLists.linux.txt
@@ -27,6 +27,7 @@ target_sources(core-blob_depot-agent PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/resolved_value.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp
diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h
index fa79201822..f9b9bb4ef0 100644
--- a/ydb/core/blob_depot/agent/agent_impl.h
+++ b/ydb/core/blob_depot/agent/agent_impl.h
@@ -1,6 +1,7 @@
#pragma once
#include "defs.h"
+#include "resolved_value.h"
namespace NKikimr::NBlobDepot {
@@ -33,8 +34,43 @@ namespace NKikimr::NBlobDepot {
struct TTabletDisconnected {};
struct TKeyResolved {
- const TResolvedValueChain* ValueChain;
- std::optional<TString> ErrorReason;
+ struct TSuccess {
+ const TResolvedValue *Value;
+ };
+ struct TError {
+ TString ErrorReason;
+ };
+ std::variant<TSuccess, TError> Outcome;
+
+ TKeyResolved(const TResolvedValue *value)
+ : Outcome(TSuccess{value})
+ {}
+
+ static constexpr struct TResolutionError {} ResolutionError{};
+
+ TKeyResolved(TResolutionError, TString errorReason)
+ : Outcome(TError{std::move(errorReason)})
+ {}
+
+ bool Error() const { return std::holds_alternative<TError>(Outcome); }
+ bool Success() const { return std::holds_alternative<TSuccess>(Outcome); }
+ const TResolvedValue *GetResolvedValue() const { return std::get<TSuccess>(Outcome).Value; }
+
+ void Output(IOutputStream& s) const {
+ if (auto *success = std::get_if<TSuccess>(&Outcome)) {
+ s << (success->Value ? success->Value->ToString() : "<no data>");
+ } else if (auto *error = std::get_if<TError>(&Outcome)) {
+ s << "Error# '" << EscapeC(error->ErrorReason) << '\'';
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ TString ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
};
class TRequestSender;
@@ -297,6 +333,23 @@ namespace NKikimr::NBlobDepot {
virtual void OnIdAllocated() {}
virtual void OnDestroy(bool /*success*/) {}
+ protected: // reading logic
+ struct TReadContext;
+ struct TReadArg {
+ TResolvedValue Value;
+ NKikimrBlobStorage::EGetHandleClass GetHandleClass;
+ bool MustRestoreFirst = false;
+ ui64 Offset = 0;
+ ui64 Size = 0;
+ ui64 Tag = 0;
+ std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData;
+ TString Key; // the key we are reading -- this is used for retries when we are getting NODATA
+ };
+
+ bool IssueRead(TReadArg&& arg, TString& error);
+ void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg);
+ void HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg);
+
public:
struct TDeleter {
static void Destroy(TQuery *query) { delete query; }
@@ -394,26 +447,6 @@ namespace NKikimr::NBlobDepot {
TBlocksManager& BlocksManager;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
- // Reading
-
- struct TReadContext;
- struct TReadArg {
- const TResolvedValueChain& Values;
- NKikimrBlobStorage::EGetHandleClass GetHandleClass;
- bool MustRestoreFirst = false;
- TQuery *Query = nullptr;
- ui64 Offset = 0;
- ui64 Size = 0;
- ui64 Tag = 0;
- std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData;
- };
-
- bool IssueRead(const TReadArg& arg, TString& error);
- static TString GetValueChainId(const TResolvedValueChain& valueChain);
-
- void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg);
-
- ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Blob mapping cache
class TBlobMappingCache;
@@ -425,6 +458,17 @@ namespace NKikimr::NBlobDepot {
TStorageStatusFlags GetStorageStatusFlags() const;
float GetApproximateFreeSpaceShare() const;
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Logging
+
+ TString PrettyKey(const TString& key) const {
+ if (VirtualGroupId) {
+ return TLogoBlobID::FromBinary(key).ToString();
+ } else {
+ return EscapeC(key);
+ }
+ }
};
#define BDEV_QUERY(MARKER, TEXT, ...) BDEV(MARKER, TEXT, (VG, Agent.VirtualGroupId), (BDT, Agent.TabletId), \
diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
index 49e9d467e5..77800af63a 100644
--- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
+++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp
@@ -4,9 +4,11 @@ namespace NKikimr::NBlobDepot {
struct TResolveContext : TRequestContext {
TString Key;
+ bool MustRestoreFirst;
- TResolveContext(TString key)
+ TResolveContext(TString key, bool mustRestoreFirst)
: Key(std::move(key))
+ , MustRestoreFirst(mustRestoreFirst)
{}
};
@@ -15,60 +17,104 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA28, "HandleResolveResult", (AgentId, Agent.LogId),
(Cookie, tag), (Msg, msg));
- auto process = [&](TString key, const NKikimrBlobDepot::TEvResolveResult::TResolvedKey *item) {
- const TStringBuf keyBuf = key;
- TCachedKeyItem& entry = Cache.try_emplace(std::move(key), keyBuf).first->second;
- if (item) {
- Y_VERIFY(entry.Key == item->GetKey());
- entry.Values = item->GetValueChain();
+ auto process = [&](const auto& item, bool nodata) {
+ // check if there is an error or no data attached
+ if (item.HasErrorReason() || item.GetValueChain().empty() || nodata) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA43, "HandleResolveResult error", (AgentId, Agent.LogId), (Item, item),
+ (NoData, nodata), (Key, Agent.PrettyKey(item.GetKey())));
+
+ const TString errorReason = item.HasErrorReason() ? item.GetErrorReason() : "no data attached to the key";
+ const auto it = Cache.find(item.GetKey());
+ if (it != Cache.end()) {
+ for (const auto& [id, mustRestoreFirst] : it->second.PendingQueries) {
+ Agent.OnRequestComplete(id, nodata ? TKeyResolved(nullptr) :
+ TKeyResolved(TKeyResolved::ResolutionError, errorReason),
+ Agent.OtherRequestInFlight);
+ }
+ Cache.erase(it);
+ }
} else {
- entry.Values.Clear();
- }
- Queue.PushBack(&entry);
- entry.ResolveInFlight = false;
- for (const ui64 id : std::exchange(entry.PendingQueries, {})) {
- Agent.OnRequestComplete(id, TKeyResolved{
- entry.Values.empty() ? nullptr : &entry.Values,
- item && item->HasErrorReason() ? std::make_optional(item->GetErrorReason()) : std::nullopt
- }, Agent.OtherRequestInFlight);
+ TString key = item.GetKey();
+ TStringBuf keyBuf = key;
+ const auto [it, inserted] = Cache.try_emplace(std::move(key), keyBuf);
+ auto& [key_, entry] = *it;
+
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA44, "HandleResolveResult success", (AgentId, Agent.LogId), (Item, item),
+ (NoData, nodata), (Key, Agent.PrettyKey(key_)), (CurrentValue, entry.Value));
+
+ // update value if it supersedes current one
+ if (TResolvedValue value(item); value.Supersedes(entry.Value)) {
+ entry.Value = std::move(value);
+ }
+
+ // notify matching queries about this
+ for (auto it = entry.PendingQueries.begin(); it != entry.PendingQueries.end(); ) {
+ const auto& [id, mustRestoreFirst] = *it;
+ if (mustRestoreFirst <= entry.Value.ReliablyWritten) {
+ Agent.OnRequestComplete(id, TKeyResolved(&entry.Value), Agent.OtherRequestInFlight);
+ entry.PendingQueries.erase(it++);
+ } else {
+ ++it;
+ }
+ }
+
+ if (context) {
+ auto& resolveContext = context->Obtain<TResolveContext>();
+ if (resolveContext.MustRestoreFirst) {
+ --entry.MustRestoreFirstResolvePending;
+ } else {
+ --entry.OrdinaryResolvePending;
+ }
+ }
+
+ Queue.PushBack(&entry);
+ while (Cache.size() > 1'000'000) {
+ auto& front = *Queue.Front();
+ Cache.erase(front.Key);
+ }
}
};
- for (const auto& item : msg.GetResolvedKeys()) {
- process(item.GetKey(), &item);
- if (context && context->Obtain<TResolveContext>().Key == item.GetKey()) {
- context.reset();
+ if (!context) {
+ // aside requests -- just add received data to cache
+ for (const auto& item : msg.GetResolvedKeys()) {
+ process(item, false);
+ }
+ } else {
+ auto& resolveContext = context->Obtain<TResolveContext>();
+ if (msg.ResolvedKeysSize() == 1) {
+ const auto& item = msg.GetResolvedKeys(0);
+ Y_VERIFY(item.GetKey() == resolveContext.Key);
+ process(item, false);
+ } else if (msg.ResolvedKeysSize() == 0) {
+ NKikimrBlobDepot::TEvResolveResult::TResolvedKey item;
+ item.SetKey(resolveContext.Key);
+ process(item, true);
+ } else {
+ Y_FAIL("unexpected resolve response");
}
- }
- if (context) {
- process(context->Obtain<TResolveContext>().Key, nullptr);
}
}
- const TResolvedValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query,
- TRequestContext::TPtr context, bool force) {
+ const TResolvedValue *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query,
+ TRequestContext::TPtr context, bool mustRestoreFirst) {
+ // obtain entry in the cache
const TStringBuf keyBuf = key;
- auto& entry = Cache.try_emplace(std::move(key), keyBuf).first->second;
- if (!entry.Values.empty() && !force) {
- return &entry.Values;
- }
- if (!entry.ResolveInFlight) {
- entry.ResolveInFlight = true;
+ const auto [it, inserted] = Cache.try_emplace(std::move(key), keyBuf);
+ auto& entry = it->second;
- NKikimrBlobDepot::TEvResolve msg;
- auto *item = msg.AddItems();
- item->SetExactKey(TString(keyBuf));
-
- if (Agent.VirtualGroupId) {
- const auto& id = TLogoBlobID::FromBinary(keyBuf);
- item->SetTabletId(id.TabletID());
- }
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA45, "ResolveKey", (AgentId, Agent.LogId), (Key, Agent.PrettyKey(it->first)),
+ (MustRestoreFirst, mustRestoreFirst), (Value, entry.Value), (OrdinaryResolvePending, entry.OrdinaryResolvePending),
+ (MustRestoreFirstResolvePending, entry.MustRestoreFirstResolvePending));
- Agent.Issue(std::move(msg), this, std::make_unique<TResolveContext>(TString(keyBuf)));
+ // return cached value if we have all conditions met for this query
+ if (!entry.Value.IsEmpty() && mustRestoreFirst <= entry.Value.ReliablyWritten) {
+ return &entry.Value;
}
+ // register query-local request for the key
const ui64 id = Agent.NextOtherRequestId++;
- const bool inserted1 = entry.PendingQueries.emplace(id).second;
+ const bool inserted1 = entry.PendingQueries.emplace(id, mustRestoreFirst).second;
Y_VERIFY(inserted1);
auto cancelCallback = [&entry, id] {
const size_t numErased = entry.PendingQueries.erase(id);
@@ -76,6 +122,19 @@ namespace NKikimr::NBlobDepot {
};
Agent.RegisterRequest(id, query, std::move(context), std::move(cancelCallback), false);
+ // see if we have to issue the query
+ if (!entry.MustRestoreFirstResolvePending && (mustRestoreFirst || !entry.OrdinaryResolvePending)) {
+ NKikimrBlobDepot::TEvResolve msg;
+ auto *item = msg.AddItems();
+ item->SetExactKey(TString(keyBuf));
+ if (mustRestoreFirst != item->GetMustRestoreFirst()) {
+ item->SetMustRestoreFirst(mustRestoreFirst);
+ }
+
+ Agent.Issue(std::move(msg), this, std::make_unique<TResolveContext>(TString(keyBuf), mustRestoreFirst));
+ ++(mustRestoreFirst ? entry.MustRestoreFirstResolvePending : entry.OrdinaryResolvePending);
+ }
+
return nullptr;
}
@@ -85,12 +144,18 @@ namespace NKikimr::NBlobDepot {
} else if (std::holds_alternative<TTabletDisconnected>(response)) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA38, "TBlobMappingCache::TTabletDisconnected",
(AgentId, Agent.LogId), (Cookie, tag));
- if (auto resolveContext = std::dynamic_pointer_cast<TResolveContext>(context)) {
- if (const auto it = Cache.find(resolveContext->Key); it != Cache.end() && it->second.ResolveInFlight) {
- for (const ui64 id : std::exchange(it->second.PendingQueries, {})) {
- Agent.OnRequestComplete(id, response, Agent.OtherRequestInFlight);
- }
- it->second.ResolveInFlight = false;
+ auto& resolveContext = context->Obtain<TResolveContext>();
+ if (const auto it = Cache.find(resolveContext.Key); it != Cache.end()) {
+ for (const auto& [id, mustRestoreFirst] : std::exchange(it->second.PendingQueries, {})) {
+ Agent.OnRequestComplete(id, TKeyResolved(TKeyResolved::ResolutionError, "BlobDepot tablet disconnected"),
+ Agent.OtherRequestInFlight);
+ }
+ if (!it->second.Value.IsEmpty()) {
+ --(resolveContext.MustRestoreFirst
+ ? it->second.MustRestoreFirstResolvePending
+ : it->second.OrdinaryResolvePending);
+ } else {
+ Cache.erase(it);
}
}
} else {
diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h
index f3d74a7030..57aaa3ff4c 100644
--- a/ydb/core/blob_depot/agent/blob_mapping_cache.h
+++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h
@@ -9,13 +9,14 @@ namespace NKikimr::NBlobDepot {
: public TRequestSender
{
struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> {
- TStringBuf Key;
- TResolvedValueChain Values;
- bool ResolveInFlight = false;
- THashSet<ui64> PendingQueries;
+ TStringBuf Key; // key buffer (view of key part of the Cache set)
+ TResolvedValue Value; // recently resolved value, if any
+ ui32 OrdinaryResolvePending = 0;
+ ui32 MustRestoreFirstResolvePending = 0;
+ THashMap<ui64, bool> PendingQueries; // a set of queries waiting for this blob
TCachedKeyItem(TStringBuf key)
- : Key(std::move(key))
+ : Key(key)
{}
};
@@ -28,7 +29,9 @@ namespace NKikimr::NBlobDepot {
{}
void HandleResolveResult(ui64 tag, const NKikimrBlobDepot::TEvResolveResult& msg, TRequestContext::TPtr context);
- const TResolvedValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context, bool force = false);
+ const TResolvedValue *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context, bool mustRestoreFirst);
+
+ private:
void ProcessResponse(ui64 tag, TRequestContext::TPtr /*context*/, TResponse response) override;
};
diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp
index c4888b6d24..d367c21b56 100644
--- a/ydb/core/blob_depot/agent/read.cpp
+++ b/ydb/core/blob_depot/agent/read.cpp
@@ -2,27 +2,34 @@
namespace NKikimr::NBlobDepot {
- struct TBlobDepotAgent::TReadContext : TRequestContext {
- TQuery *Query;
- const ui64 Tag;
+ struct TBlobDepotAgent::TQuery::TReadContext : TRequestContext {
+ TReadArg ReadArg;
const ui64 Size;
TString Buffer;
bool Terminated = false;
ui32 NumPartsPending = 0;
+ TLogoBlobID BlobWithoutData;
- TReadContext(TQuery *query, ui64 tag, ui64 size)
- : Query(query)
- , Tag(tag)
+ TReadContext(TReadArg&& readArg, ui64 size)
+ : ReadArg(std::move(readArg))
, Size(size)
{}
- void EndWithError(NKikimrProto::EReplyStatus status, TString errorReason) {
- Query->OnRead(Tag, status, errorReason);
+ void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) {
+ query->OnRead(ReadArg.Tag, status, errorReason);
Terminated = true;
}
- void EndWithSuccess() {
- Query->OnRead(Tag, NKikimrProto::OK, std::move(Buffer));
+ void Abort() {
+ Terminated = true;
+ }
+
+ void EndWithSuccess(TQuery *query) {
+ query->OnRead(ReadArg.Tag, NKikimrProto::OK, std::move(Buffer));
+ }
+
+ ui64 GetTag() const {
+ return ReadArg.Tag;
}
struct TPartContext : TRequestContext {
@@ -35,7 +42,10 @@ namespace NKikimr::NBlobDepot {
};
};
- bool TBlobDepotAgent::IssueRead(const TReadArg& arg, TString& error) {
+ bool TBlobDepotAgent::TQuery::IssueRead(TReadArg&& arg, TString& error) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value));
+
ui64 outputOffset = 0;
struct TReadItem {
@@ -50,16 +60,16 @@ namespace NKikimr::NBlobDepot {
ui64 offset = arg.Offset;
ui64 size = arg.Size;
- for (const auto& value : arg.Values) {
- const ui32 groupId = value.GetGroupId();
- const auto blobId = LogoBlobIDFromLogoBlobID(value.GetBlobId());
- const ui64 begin = value.GetSubrangeBegin();
- const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : blobId.BlobSize();
+ for (const auto& value : arg.Value.Chain) {
+ const ui32 groupId = value.GroupId;
+ const auto& blobId = value.BlobId;
+ const ui32 begin = value.SubrangeBegin;
+ const ui32 end = value.SubrangeEnd;
if (end <= begin || blobId.BlobSize() < end) {
error = "incorrect SubrangeBegin/SubrangeEnd pair";
- STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, LogId), (TabletId, TabletId),
- (Values, FormatList(arg.Values)));
+ STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value));
return false;
}
@@ -90,14 +100,14 @@ namespace NKikimr::NBlobDepot {
if (size) {
error = "incorrect offset/size provided";
- STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, LogId), (TabletId, TabletId),
- (Offset, arg.Offset), (Size, arg.Size), (Values, FormatList(arg.Values)));
+ STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value));
return false;
}
- auto context = std::make_shared<TReadContext>(arg.Query, arg.Tag, outputOffset);
+ auto context = std::make_shared<TReadContext>(std::move(arg), outputOffset);
if (!outputOffset) {
- context->EndWithSuccess();
+ context->EndWithSuccess(this);
return true;
}
@@ -118,9 +128,15 @@ namespace NKikimr::NBlobDepot {
partContext->Offsets.push_back(outputOffset);
}
- auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), arg.GetHandleClass, arg.MustRestoreFirst);
- event->ReaderTabletData = arg.ReaderTabletData;
- SendToProxy(groupId, std::move(event), arg.Query, std::move(partContext));
+ // when the TEvGet query is sent to the underlying proxy, MustRestoreFirst must be cleared, or else it may
+ // lead to ERROR due to impossibility of writes; all MustRestoreFirst should be handled by the TEvResolve
+ // query
+ auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), context->ReadArg.GetHandleClass,
+ context->ReadArg.MustRestoreFirst && groupId != Agent.DecommitGroupId);
+ event->ReaderTabletData = context->ReadArg.ReaderTabletData;
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA39, "issuing TEvGet", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, context->GetTag()), (GroupId, groupId), (Msg, *event));
+ Agent.SendToProxy(groupId, std::move(event), this, std::move(partContext));
++context->NumPartsPending;
}
@@ -129,35 +145,32 @@ namespace NKikimr::NBlobDepot {
return true;
}
- TString TBlobDepotAgent::GetValueChainId(const TResolvedValueChain& valueChain) {
- constexpr ui8 separator = 7;
- TString s;
- for (auto it = valueChain.begin(); it != valueChain.end(); ++it) {
- if (it != valueChain.begin()) {
- s += separator;
- }
- const bool success = it->AppendToString(&s);
- Y_VERIFY_DEBUG(success);
- }
- return s;
- }
-
- void TBlobDepotAgent::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) {
+ void TBlobDepotAgent::TQuery::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) {
auto& partContext = context->Obtain<TReadContext::TPartContext>();
auto& readContext = *partContext.Read;
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA41, "HandleGetResult", (AgentId, Agent.LogId), (ReadId, readContext.GetTag()),
+ (Msg, msg));
if (readContext.Terminated) {
return; // just ignore this read
}
if (msg.Status != NKikimrProto::OK) {
- readContext.EndWithError(msg.Status, std::move(msg.ErrorReason));
+ readContext.EndWithError(this, msg.Status, std::move(msg.ErrorReason));
} else {
Y_VERIFY(msg.ResponseSz == partContext.Offsets.size());
for (ui32 i = 0; i < msg.ResponseSz; ++i) {
auto& blob = msg.Responses[i];
- if (blob.Status != NKikimrProto::OK) {
- return readContext.EndWithError(blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id);
+ if (blob.Status == NKikimrProto::NODATA) {
+ NKikimrBlobDepot::TEvResolve resolve;
+ auto *item = resolve.AddItems();
+ item->SetExactKey(readContext.ReadArg.Key);
+ Agent.Issue(std::move(resolve), this, partContext.Read);
+ readContext.Abort();
+ readContext.BlobWithoutData = blob.Id;
+ return;
+ } else if (blob.Status != NKikimrProto::OK) {
+ return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id);
}
auto& buffer = readContext.Buffer;
@@ -177,8 +190,38 @@ namespace NKikimr::NBlobDepot {
}
if (!--readContext.NumPartsPending) {
- readContext.EndWithSuccess();
+ readContext.EndWithSuccess(this);
+ }
+ }
+ }
+
+ void TBlobDepotAgent::TQuery::HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg) {
+ auto& readContext = context->Obtain<TReadContext>();
+ STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA42, "HandleResolveResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()),
+ (ReadId, readContext.GetTag()), (Msg, msg.Record));
+ if (msg.Record.GetStatus() != NKikimrProto::OK) {
+ readContext.EndWithError(this, msg.Record.GetStatus(), msg.Record.GetErrorReason());
+ } else if (msg.Record.ResolvedKeysSize() == 1) {
+ const auto& item = msg.Record.GetResolvedKeys(0);
+ if (TResolvedValue value(item); value.Supersedes(readContext.ReadArg.Value)) { // value chain has changed, we have to try again
+ readContext.ReadArg.Value = std::move(value);
+ TString error;
+ if (!IssueRead(std::move(readContext.ReadArg), error)) {
+ readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to restart read Error# " << error);
+ }
+ } else if (!item.GetReliablyWritten()) { // this was unassimilated value and we got NODATA for it
+ readContext.EndWithError(this, NKikimrProto::NODATA, {});
+ } else {
+ Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " QueryId# " << GetQueryId()
+ << " ReadId# " << readContext.GetTag() << " BlobId# " << readContext.BlobWithoutData);
+ STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to read blob -- data is lost", (AgentId, Agent.LogId),
+ (QueryId, GetQueryId()), (ReadId, readContext.GetTag()), (BlobId, readContext.BlobWithoutData));
+ readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to read BlobId# "
+ << readContext.BlobWithoutData << ": data is lost");
}
+ } else {
+ Y_VERIFY(!msg.Record.ResolvedKeysSize());
+ readContext.EndWithError(this, NKikimrProto::NODATA, {});
}
}
diff --git a/ydb/core/blob_depot/agent/resolved_value.cpp b/ydb/core/blob_depot/agent/resolved_value.cpp
new file mode 100644
index 0000000000..a4564c5d75
--- /dev/null
+++ b/ydb/core/blob_depot/agent/resolved_value.cpp
@@ -0,0 +1,65 @@
+#include "resolved_value.h"
+
+namespace NKikimr::NBlobDepot {
+
+ TResolvedValue::TLink::TLink(const NKikimrBlobDepot::TResolvedValueChain& link)
+ : BlobId(LogoBlobIDFromLogoBlobID(link.GetBlobId()))
+ , GroupId(link.GetGroupId())
+ , SubrangeBegin(link.GetSubrangeBegin())
+ , SubrangeEnd(link.HasSubrangeEnd() ? link.GetSubrangeEnd() : BlobId.BlobSize())
+ {
+ Y_VERIFY_DEBUG(link.HasBlobId() && link.HasGroupId());
+ }
+
+ void TResolvedValue::TLink::Output(IOutputStream& s) const {
+ s << BlobId << '@' << GroupId << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}';
+ }
+
+ TString TResolvedValue::TLink::ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+
+ TResolvedValue::TResolvedValue(const NKikimrBlobDepot::TEvResolveResult::TResolvedKey& item)
+ : Defined(true)
+ , ReliablyWritten(item.GetReliablyWritten())
+ , Version(item.GetValueVersion())
+ , Chain([](auto& x) { return decltype(Chain)(x.begin(), x.end()); }(item.GetValueChain()))
+ {}
+
+ bool TResolvedValue::Supersedes(const TResolvedValue& old) const {
+ Y_VERIFY(Defined);
+ if (!old.Defined) {
+ return true;
+ } else if (Version < old.Version) {
+ return false;
+ } else if (Version == old.Version) {
+ Y_VERIFY(Chain == old.Chain && ReliablyWritten == old.ReliablyWritten);
+ return false;
+ } else {
+ Y_VERIFY(old.ReliablyWritten <= ReliablyWritten); // item can't suddenly become unreliably written
+ return true;
+ }
+ }
+
+ void TResolvedValue::Output(IOutputStream& s) const {
+ if (Defined) {
+ s << '{' << FormatList(Chain) << " Version# " << Version << " ReliablyWritten# " << ReliablyWritten << '}';
+ } else {
+ s << "{}";
+ }
+ }
+
+ TString TResolvedValue::ToString() const {
+ TStringStream s;
+ Output(s);
+ return s.Str();
+ }
+
+} // NKikimr::NBlobDepot
+
+template<>
+void Out<NKikimr::NBlobDepot::TResolvedValue::TLink>(IOutputStream& s, const NKikimr::NBlobDepot::TResolvedValue::TLink& x) {
+ x.Output(s);
+}
diff --git a/ydb/core/blob_depot/agent/resolved_value.h b/ydb/core/blob_depot/agent/resolved_value.h
new file mode 100644
index 0000000000..a3194e65ec
--- /dev/null
+++ b/ydb/core/blob_depot/agent/resolved_value.h
@@ -0,0 +1,47 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NBlobDepot {
+
+ struct TResolvedValue {
+ struct TLink {
+ TLogoBlobID BlobId;
+ ui32 GroupId;
+ ui32 SubrangeBegin;
+ ui32 SubrangeEnd;
+
+ TLink(const NKikimrBlobDepot::TResolvedValueChain& link);
+
+ void Output(IOutputStream& s) const;
+ TString ToString() const;
+
+ friend bool operator ==(const TLink& x, const TLink& y) {
+ return x.BlobId == y.BlobId && x.GroupId == y.GroupId && x.SubrangeBegin == y.SubrangeBegin &&
+ x.SubrangeEnd == y.SubrangeEnd;
+ }
+ };
+
+ bool Defined = false;
+ bool ReliablyWritten = false;
+ ui32 Version = 0;
+ std::vector<TLink> Chain;
+
+ TResolvedValue() = default;
+ TResolvedValue(const TResolvedValue&) = default;
+ TResolvedValue(TResolvedValue&&) = default;
+ TResolvedValue(const NKikimrBlobDepot::TEvResolveResult::TResolvedKey& item);
+
+ TResolvedValue& operator =(const TResolvedValue&) = default;
+ TResolvedValue& operator =(TResolvedValue&&) = default;
+
+ bool Supersedes(const TResolvedValue& old) const;
+ void Output(IOutputStream& s) const;
+ TString ToString() const;
+
+ bool IsEmpty() const { // check if no data attached
+ return Chain.empty();
+ }
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp
index 27bfab0593..da5deaf459 100644
--- a/ydb/core/blob_depot/agent/storage_discover.cpp
+++ b/ydb/core/blob_depot/agent/storage_discover.cpp
@@ -16,10 +16,6 @@ namespace NKikimr::NBlobDepot {
TString Buffer;
ui32 BlockedGeneration = 0;
- std::unordered_set<TString> ValueChainsWithNodata;
- TString ValueChain;
- bool IsUnassimilated = false;
-
NKikimrBlobDepot::TEvResolve Resolve;
public:
@@ -69,9 +65,13 @@ namespace NKikimr::NBlobDepot {
if (std::holds_alternative<TTabletDisconnected>(response)) {
return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
} else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
- Agent.HandleGetResult(context, **p);
+ TQuery::HandleGetResult(context, **p);
} else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
- HandleResolveResult(id, std::move(context), **p);
+ if (context) {
+ TQuery::HandleResolveResult(std::move(context), **p);
+ } else {
+ HandleResolveResult(id, std::move(context), **p);
+ }
} else {
Y_FAIL();
}
@@ -115,20 +115,18 @@ namespace NKikimr::NBlobDepot {
// FIXME(alexvru): hypothetically this can be considered normal and we may continue scan
return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "empty ValueChain");
}
- ValueChain = GetValueChainId(item.GetValueChain());
- IsUnassimilated = item.ValueChainSize() == 1 && item.GetValueChain(0).GetGroupId() == Agent.DecommitGroupId &&
- LogoBlobIDFromLogoBlobID(item.GetValueChain(0).GetBlobId()) == Id;
TReadArg arg{
- item.GetValueChain(),
+ item,
NKikimrBlobStorage::Discover,
true,
- this,
0,
0,
0,
- {}};
+ {},
+ item.GetKey(),
+ };
TString error;
- if (!Agent.IssueRead(arg, error)) {
+ if (!IssueRead(std::move(arg), error)) {
return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: "
<< error);
}
@@ -152,29 +150,26 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA20, "OnRead", (AgentId, Agent.LogId),
(QueryId, GetQueryId()), (Status, status));
- if (status == NKikimrProto::OK) {
- Buffer = std::move(dataOrErrorReason);
- DoneWithData = true;
- CheckIfDone();
- } else if (status == NKikimrProto::NODATA) {
- if (ValueChainsWithNodata.insert(std::exchange(ValueChain, {})).second) {
- // this may indicate a data race between locator and key value, we have to restart our resolution query
- IssueResolve();
- } else if (IsUnassimilated) {
+ switch (status) {
+ case NKikimrProto::OK:
+ Buffer = std::move(dataOrErrorReason);
+ DoneWithData = true;
+ CheckIfDone();
+ break;
+
+ case NKikimrProto::NODATA: {
// we are reading blob from the original group and it may be partially written -- it is totally
// okay to have some; we need to advance to the next readable blob
auto *range = Resolve.MutableItems(0)->MutableKeyRange();
range->SetEndingKey(Id.AsBinaryString());
range->ClearIncludeEnding();
IssueResolve();
- } else {
- Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << Id);
- STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA39, "failed to Discover blob -- data is lost",
- (AgentId, Agent.LogId), (BlobId, Id));
- status = NKikimrProto::ERROR;
+ break;
}
- } else {
- EndWithError(status, dataOrErrorReason);
+
+ default:
+ EndWithError(status, std::move(dataOrErrorReason));
+ break;
}
}
diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp
index a923a5ec53..4e97c34738 100644
--- a/ydb/core/blob_depot/agent/storage_get.cpp
+++ b/ydb/core/blob_depot/agent/storage_get.cpp
@@ -9,8 +9,6 @@ namespace NKikimr::NBlobDepot {
class TGetQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGet> {
std::unique_ptr<TEvBlobStorage::TEvGetResult> Response;
ui32 AnswersRemain;
- std::vector<TString> ValueChainsInFlight;
- std::unordered_set<std::tuple<ui64, TString>> ValueChainsWithNodata;
struct TResolveKeyContext : TRequestContext {
ui32 QueryIdx;
@@ -35,7 +33,6 @@ namespace NKikimr::NBlobDepot {
Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize,
Agent.VirtualGroupId);
AnswersRemain = Request.QuerySize;
- ValueChainsInFlight.resize(Request.QuerySize);
if (Request.ReaderTabletData) {
auto status = Agent.BlocksManager.CheckBlockForTablet(Request.ReaderTabletData->Id, Request.ReaderTabletData->Generation, this, nullptr);
@@ -54,9 +51,9 @@ namespace NKikimr::NBlobDepot {
response.RequestedSize = query.Size;
TString blobId = query.Id.AsBinaryString();
- if (const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
- std::make_shared<TResolveKeyContext>(i))) {
- if (!ProcessSingleResult(i, value, std::nullopt)) {
+ if (const TResolvedValue *value = Agent.BlobMappingCache.ResolveKey(blobId, this,
+ std::make_shared<TResolveKeyContext>(i), Request.MustRestoreFirst)) {
+ if (!ProcessSingleResult(i, value)) {
return; // error occured
}
} else {
@@ -68,52 +65,35 @@ namespace NKikimr::NBlobDepot {
CheckAndFinish();
}
- bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value, const std::optional<TString>& errorReason) {
+ bool ProcessSingleResult(ui32 queryIdx, const TKeyResolved& result) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (AgentId, Agent.LogId),
- (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value), (ErrorReason, errorReason));
+ (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Result, result));
auto& r = Response->Responses[queryIdx];
Y_VERIFY(r.Status == NKikimrProto::UNKNOWN);
- if (errorReason) {
+ if (result.Error()) {
r.Status = NKikimrProto::ERROR;
--AnswersRemain;
- } else if (!value || value->empty()) {
+ } else if (const TResolvedValue *value = result.GetResolvedValue(); !value) {
r.Status = NKikimrProto::NODATA;
--AnswersRemain;
} else if (Request.IsIndexOnly) {
r.Status = NKikimrProto::OK;
--AnswersRemain;
} else {
- ValueChainsInFlight[queryIdx] = GetValueChainId(*value);
+ Y_VERIFY(Request.MustRestoreFirst <= value->ReliablyWritten);
TReadArg arg{
*value,
Request.GetHandleClass,
Request.MustRestoreFirst,
- this,
Request.Queries[queryIdx].Shift,
Request.Queries[queryIdx].Size,
queryIdx,
- Request.ReaderTabletData};
- TString error;
- auto makeValueChain = [&] {
- TStringStream str;
- str << '[';
- for (int i = 0; i < value->size(); ++i) {
- const auto& item = value->at(i);
- if (i != 0) {
- str << ' ';
- }
- const auto blobId = LogoBlobIDFromLogoBlobID(item.GetBlobId());
- const ui64 subrangeBegin = item.GetSubrangeBegin();
- const ui64 subrangeEnd = item.HasSubrangeEnd() ? item.GetSubrangeEnd() : blobId.BlobSize();
- str << blobId << '@' << item.GetGroupId() << '{' << subrangeBegin << '-' << (subrangeEnd - 1) << '}';
- }
- str << ']';
- return str.Str();
+ Request.ReaderTabletData,
+ Request.Queries[queryIdx].Id.AsBinaryString(),
};
- STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId),
- (Offset, arg.Offset), (Size, arg.Size), (ValueChain, makeValueChain()), (Tag, arg.Tag));
- const bool success = Agent.IssueRead(arg, error);
+ TString error;
+ const bool success = IssueRead(std::move(arg), error);
if (!success) {
EndWithError(NKikimrProto::ERROR, std::move(error));
return false;
@@ -127,24 +107,6 @@ namespace NKikimr::NBlobDepot {
(Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0),
(ErrorReason, status != NKikimrProto::OK ? buffer : ""));
- if (status == NKikimrProto::NODATA) { // we have to retry this read, this may be a race between blob movement
- const auto& q = Request.Queries[tag];
- if (ValueChainsWithNodata.emplace(tag, std::exchange(ValueChainsInFlight[tag], {})).second) { // real race
- const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(
- q.Id.AsBinaryString(),
- this,
- std::make_shared<TResolveKeyContext>(tag),
- true);
- Y_VERIFY(!value);
- return;
- } else {
- Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << q.Id);
- STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA41, "failed to Get blob -- data is lost",
- (AgentId, Agent.LogId), (BlobId, q.Id));
- status = NKikimrProto::ERROR;
- }
- }
-
auto& resp = Response->Responses[tag];
Y_VERIFY(resp.Status == NKikimrProto::UNKNOWN);
resp.Status = status;
@@ -195,10 +157,12 @@ namespace NKikimr::NBlobDepot {
void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override {
if (auto *p = std::get_if<TKeyResolved>(&response)) {
- ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, p->ValueChain, p->ErrorReason);
+ ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, *p);
CheckAndFinish();
} else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
- Agent.HandleGetResult(context, **p);
+ TQuery::HandleGetResult(context, **p);
+ } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
+ TQuery::HandleResolveResult(context, **p);
} else if (std::holds_alternative<TTabletDisconnected>(response)) {
if (auto *resolveContext = dynamic_cast<TResolveKeyContext*>(context.get())) {
STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "TTabletDisconnected", (AgentId, Agent.LogId),
diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp
index 719a09f757..70e50a56f1 100644
--- a/ydb/core/blob_depot/agent/storage_range.cpp
+++ b/ydb/core/blob_depot/agent/storage_range.cpp
@@ -7,7 +7,6 @@ namespace NKikimr::NBlobDepot {
class TRangeQuery : public TBlobStorageQuery<TEvBlobStorage::TEvRange> {
struct TRead {
TLogoBlobID Id;
- TString ValueChain;
};
std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response;
@@ -15,7 +14,6 @@ namespace NKikimr::NBlobDepot {
ui32 ResolvesInFlight = 0;
std::map<TLogoBlobID, TString> FoundBlobs;
std::vector<TRead> Reads;
- std::unordered_set<std::tuple<ui64, TString>> ValueChainsWithNodata; // processed value chains with this status
bool Reverse = false;
bool Finished = false;
@@ -68,9 +66,13 @@ namespace NKikimr::NBlobDepot {
void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override {
if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) {
- HandleResolveResult(id, std::move(context), (*p)->Record);
+ if (context) {
+ TQuery::HandleResolveResult(std::move(context), **p);
+ } else {
+ HandleResolveResult(id, std::move(context), (*p)->Record);
+ }
} else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) {
- Agent.HandleGetResult(context, **p);
+ TQuery::HandleGetResult(context, **p);
} else if (std::holds_alternative<TTabletDisconnected>(response)) {
EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected");
} else {
@@ -96,25 +98,24 @@ namespace NKikimr::NBlobDepot {
FoundBlobs.try_emplace(id);
} else if (key.ValueChainSize()) {
const ui64 tag = key.HasCookie() ? key.GetCookie() : Reads.size();
- TString valueChain = GetValueChainId(key.GetValueChain());
if (tag == Reads.size()) {
- Reads.push_back(TRead{id, std::move(valueChain)});
+ Reads.push_back(TRead{id});
} else {
Y_VERIFY(Reads[tag].Id == id);
- Reads[tag].ValueChain = std::move(valueChain);
}
TReadArg arg{
- key.GetValueChain(),
+ key,
NKikimrBlobStorage::EGetHandleClass::FastRead,
Request.MustRestoreFirst,
- this,
0,
0,
tag,
- {}};
+ {},
+ key.GetKey(),
+ };
++ReadsInFlight;
TString error;
- if (!Agent.IssueRead(arg, error)) {
+ if (!IssueRead(std::move(arg), error)) {
return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: "
<< error);
}
@@ -133,7 +134,6 @@ namespace NKikimr::NBlobDepot {
Y_VERIFY(tag < Reads.size());
TRead& read = Reads[tag];
- Y_VERIFY(read.ValueChain);
switch (status) {
case NKikimrProto::OK: {
@@ -144,15 +144,8 @@ namespace NKikimr::NBlobDepot {
}
case NKikimrProto::NODATA:
- if (ValueChainsWithNodata.emplace(tag, std::exchange(read.ValueChain, {})).second) { // real race
- IssueResolve(tag);
- } else {
- Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << read.Id);
- STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to ReadRange blob -- data is lost",
- (AgentId, Agent.LogId), (BlobId, read.Id));
- return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# "
- << read.Id << " data is lost");
- }
+ // this blob has just vanished since we found it in index -- may be it was partially written and
+ // now gone; it's okay to have this situation, not a data loss
break;
default:
diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp
index 2a31d58e96..41e91e49e2 100644
--- a/ydb/core/blob_depot/assimilator.cpp
+++ b/ydb/core/blob_depot/assimilator.cpp
@@ -314,8 +314,8 @@ namespace NKikimr::NBlobDepot {
}
};
- Self->Data->ScanRange(LastScannedKey ? std::make_optional<TData::TKey>(*LastScannedKey) : std::nullopt,
- std::optional<TData::TKey>(), {}, callback);
+ TData::TScanRange r{LastScannedKey ? TData::TKey(*LastScannedKey) : TData::TKey::Min(), TData::TKey::Max()};
+ Self->Data->ScanRange(r, nullptr, nullptr, callback);
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT56, "ScanDataForCopying step", (Id, Self->GetLogId()),
(LastScannedKey, LastScannedKey), (ScanQ.size, ScanQ.size()), (TotalSize, TotalSize),
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index d2efc2bff7..c13e342ab2 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -15,6 +15,8 @@ namespace NKikimr::NBlobDepot {
using NTabletFlatExecutor::TTabletExecutedFlat;
+ struct TToken {};
+
class TBlobDepot
: public TActor<TBlobDepot>
, public TTabletExecutedFlat
@@ -50,7 +52,6 @@ namespace NKikimr::NBlobDepot {
static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1);
- struct TToken {};
std::shared_ptr<TToken> Token = std::make_shared<TToken>();
struct TAgent {
diff --git a/ydb/core/blob_depot/closed_interval_set.h b/ydb/core/blob_depot/closed_interval_set.h
index df28dfaadb..b8338b37fc 100644
--- a/ydb/core/blob_depot/closed_interval_set.h
+++ b/ydb/core/blob_depot/closed_interval_set.h
@@ -102,6 +102,37 @@ namespace NKikimr {
return *this;
}
+ // Subtracts the intervals of `other` from the range [myLeft, myRight] and returns the first remaining
+ // subrange as a (left, right) pair. Returns std::nullopt when a single interval of `other` covers the
+ // whole (possibly already trimmed) range.
+ static std::optional<std::pair<T, T>> PartialSubtractFromRange(T myLeft, T myRight, const TClosedIntervalSet& other) {
+ for (auto otherIt = other.Intervals.begin(); otherIt != other.Intervals.end(); ) {
+ if (myRight < otherIt->Left) {
+ // this interval starts after the range ends -- nothing more to subtract
+ break;
+ } else if (otherIt->Right < myLeft) {
+ // interval lies entirely to the left of the range: step over it, and if the next one is also to
+ // the left, jump ahead with a lookup keyed by the right bound instead of stepping one by one
+ ++otherIt;
+ if (otherIt != other.Intervals.end() && otherIt->Right < myLeft) {
+ otherIt = other.Intervals.lower_bound(TByRight{myLeft});
+ }
+ } else if (otherIt->Left <= myLeft) {
+ // interval covers the left end of the range
+ if (myRight <= otherIt->Right) {
+ return std::nullopt;
+ } else {
+ // trim the covered prefix and continue with the next interval
+ myLeft = otherIt->Right;
+ ++otherIt;
+ }
+ } else if (myRight <= otherIt->Right) {
+ // interval covers the right end of the range: trim the suffix and finish
+ myRight = otherIt->Left;
+ break;
+ } else {
+ // interval lies strictly inside the range: a non-degenerate interval ends the first subrange,
+ // while a single-point interval (Left == Right) is skipped
+ if (otherIt->Left < otherIt->Right) {
+ myRight = otherIt->Left;
+ break;
+ }
+ ++otherIt;
+ }
+ }
+
+ return std::make_pair(std::move(myLeft), std::move(myRight));
+ }
+
// returns the first subrange of the full subtraction result
std::optional<std::pair<T, T>> PartialSubtract(const TClosedIntervalSet& other) const {
if (auto myIt = Intervals.begin(); myIt != Intervals.end()) {
@@ -161,6 +192,79 @@ namespace NKikimr {
}
return true;
}
+
+ // Splits [left, right] into consecutive pieces and reports each one through callback(pieceLeft, pieceRight,
+ // inside), where `inside` is true for pieces taken from an interval of this set and false for the gaps
+ // between them. Pieces are reported from left to right, or from right to left when `reverse` is set;
+ // adjacent pieces share their boundary point. A false result from the callback stops the enumeration;
+ // the result is ignored for calls that are immediately followed by a return, and for the trailing gap.
+ template<typename TCallback>
+ void EnumInRange(const T& left, const T& right, bool reverse, TCallback&& callback) const {
+ if (reverse) {
+ // cursor is the right edge of the part of the range not yet reported
+ const T *cursor = &right;
+ for (auto it = Intervals.upper_bound(TByLeft{right}); it != Intervals.begin(); ) {
+ --it;
+ if (it->Right < *cursor) {
+ // there is a gap between this interval and the cursor
+ if (it->Right < left) {
+ // interval lies entirely to the left of the range: the rest of the range is one gap
+ callback(left, *cursor, false);
+ return;
+ }
+ if (!callback(it->Right, *cursor, false)) {
+ return;
+ }
+ cursor = &it->Right;
+ }
+ if (it->Left <= left) {
+ // interval reaches the left end of the range -- this is the last piece
+ callback(left, *cursor, true);
+ return;
+ }
+ if (!callback(it->Left, *cursor, true)) {
+ return;
+ }
+ cursor = &it->Left;
+ }
+ if (left < *cursor) {
+ // no more intervals: report the remaining gap
+ callback(left, *cursor, false);
+ }
+ } else {
+ // cursor is the left edge of the part of the range not yet reported
+ const T *cursor = &left;
+ for (auto it = Intervals.lower_bound(TByRight{left}); it != Intervals.end(); ++it) {
+ if (*cursor < it->Left) {
+ // there is a gap between the cursor and this interval
+ if (right < it->Left) {
+ // interval lies entirely to the right of the range: the rest of the range is one gap
+ callback(*cursor, right, false);
+ return;
+ }
+ if (!callback(*cursor, it->Left, false)) {
+ return;
+ }
+ cursor = &it->Left;
+ }
+ if (right <= it->Right) {
+ // interval reaches the right end of the range -- this is the last piece
+ callback(*cursor, right, true);
+ return;
+ }
+ if (!callback(*cursor, it->Right, true)) {
+ return;
+ }
+ cursor = &it->Right;
+ }
+ if (*cursor < right) {
+ // no more intervals: report the remaining gap
+ callback(*cursor, right, false);
+ }
+ }
+ }
+
+ // Print the set as "{L1-R1 L2-R2 ...}" with intervals in container order, separated by single spaces.
+ void Output(IOutputStream& s) const {
+     bool needSeparator = false;
+     s << '{';
+     for (const auto& interval : Intervals) {
+         if (needSeparator) {
+             s << ' ';
+         }
+         needSeparator = true;
+         s << interval.Left << '-' << interval.Right;
+     }
+     s << '}';
+ }
+
+ // Textual form of the set, exactly as produced by Output().
+ TString ToString() const {
+     TStringStream stream;
+     Output(stream);
+     return stream.Str();
+ }
};
} // NKikimr
diff --git a/ydb/core/blob_depot/closed_interval_set_ut.cpp b/ydb/core/blob_depot/closed_interval_set_ut.cpp
index cb6d70a555..484074efa7 100644
--- a/ydb/core/blob_depot/closed_interval_set_ut.cpp
+++ b/ydb/core/blob_depot/closed_interval_set_ut.cpp
@@ -19,7 +19,7 @@ TString ToString(const T& ivs) {
ui64 Convert(const T& ivs) {
ui64 res = 0;
ivs([&](ui8 first, ui8 last) {
- const ui64 mask = ((ui64(1) << (last - first + 1)) - 1) << first;
+ const ui64 mask = (ui64(1) << last + 1) - (ui64(1) << first);
UNIT_ASSERT_VALUES_EQUAL_C(res & mask, 0, ToString(ivs));
res |= mask;
return true;
@@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(ClosedIntervalSet) {
for (ui32 end = begin; end <= 12; ++end) {
T x = Make(i);
x |= {begin, end};
- UNIT_ASSERT_VALUES_EQUAL(Convert(x), i | ((1 << (end - begin + 1)) - 1) << begin);
+ UNIT_ASSERT_VALUES_EQUAL(Convert(x), i | ((1 << end + 1) - (1 << begin)));
}
}
}
@@ -103,4 +103,52 @@ Y_UNIT_TEST_SUITE(ClosedIntervalSet) {
}
}
+ // Forward enumeration: for every 12-bit set and every range [j, k], the reported pieces must tile the range
+ // from left to right, and the pieces flagged as inside must cover exactly the set bits within the range.
+ Y_UNIT_TEST(EnumInRange) {
+     for (ui32 i = 0; i < 4096; ++i) {
+         T x = Make(i);
+         for (ui32 j = 0; j <= 12; ++j) {
+             for (ui32 k = j; k <= 12; ++k) {
+                 const ui32 expected = i & ((1 << (k + 1)) - (1 << j));
+                 ui8 expectedLeft = j;
+                 ui32 res = 0;
+                 auto visit = [&](ui8 left, ui8 right, bool inside) {
+                     UNIT_ASSERT_VALUES_EQUAL(left, expectedLeft);
+                     UNIT_ASSERT(left <= right);
+                     expectedLeft = right;
+                     if (inside) {
+                         res |= (1 << (right + 1)) - (1 << left);
+                     }
+                     return true;
+                 };
+                 x.EnumInRange(j, k, false, visit);
+                 UNIT_ASSERT_VALUES_EQUAL(expectedLeft, k);
+                 UNIT_ASSERT_VALUES_EQUAL(res, expected);
+             }
+         }
+     }
+ }
+
+ // Reverse enumeration: for every 12-bit set and every range [j, k], the reported pieces must tile the range
+ // from right to left, and the pieces flagged as inside must cover exactly the set bits within the range.
+ Y_UNIT_TEST(EnumInRangeReverse) {
+     for (ui32 i = 0; i < 4096; ++i) {
+         T x = Make(i);
+         for (ui32 j = 0; j <= 12; ++j) {
+             for (ui32 k = j; k <= 12; ++k) {
+                 const ui32 expected = i & ((1 << (k + 1)) - (1 << j));
+                 ui8 expectedRight = k;
+                 ui32 res = 0;
+                 auto visit = [&](ui8 left, ui8 right, bool inside) {
+                     UNIT_ASSERT_VALUES_EQUAL(right, expectedRight);
+                     UNIT_ASSERT(left <= right);
+                     expectedRight = left;
+                     if (inside) {
+                         res |= (1 << (right + 1)) - (1 << left);
+                     }
+                     return true;
+                 };
+                 x.EnumInRange(j, k, true, visit);
+                 UNIT_ASSERT_VALUES_EQUAL(expectedRight, j);
+                 UNIT_ASSERT_VALUES_EQUAL(res, expected);
+             }
+         }
+     }
+ }
+
}
diff --git a/ydb/core/blob_depot/coro_tx.cpp b/ydb/core/blob_depot/coro_tx.cpp
new file mode 100644
index 0000000000..319bcb8573
--- /dev/null
+++ b/ydb/core/blob_depot/coro_tx.cpp
@@ -0,0 +1,200 @@
+#include "coro_tx.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // transaction whose coroutine is being resumed on this thread; set by Execute()/Complete() for the duration
+ // of the resume and nullptr otherwise
+ thread_local TCoroTx *TCoroTx::Current = nullptr;
+
+ // Reason the coroutine handed control back to the transaction; produced by TContext::Return() and consumed
+ // by TCoroTx::Execute()/Complete().
+ enum class EOutcome {
+ // no outcome reported yet; Resume() resets to this value before switching into the coroutine
+ UNSET,
+ // Execute() returns true
+ FINISH_TX,
+ // Execute() returns false
+ RESTART_TX,
+ // Complete() schedules a new TCoroTx that takes over the coroutine context
+ RUN_SUCCESSOR_TX,
+ // the coroutine body has ended; Complete() does nothing more
+ END_CORO
+ };
+
+// debug-only stack overflow detection: TContext writes NumStackSentinels copies of StackSentinel at one end of
+// the coroutine stack on construction and verifies they are intact after every resume
+#ifndef NDEBUG
+ static constexpr ui64 StackSentinel = 0x8E0CDBFD41F04520;
+ static constexpr size_t NumStackSentinels = 8;
+#endif
+
+ // Coroutine that runs the transaction body on its own stack. Resume() switches into the coroutine and returns
+ // the outcome it reports; Return() is called from inside the body to hand control back with that outcome.
+ // Destroying an unfinished context resumes it one last time so that Return() throws TExDead and the body's
+ // stack is unwound.
+ class TCoroTx::TContext : public ITrampoLine {
+     TMappedAllocation Stack; // memory used as the coroutine stack
+     TExceptionSafeContext Context; // execution context of the coroutine
+     TExceptionSafeContext *BackContext = nullptr; // context to switch back to; set only while resumed
+
+     EOutcome Outcome = EOutcome::UNSET; // value passed from Return() to the pending Resume()
+
+     std::weak_ptr<TToken> Token; // when expired, the body is not started / is unwound on its next resume
+     std::function<void()> Body;
+
+     bool Finished = false; // the body has ended or the context is being destroyed
+
+ public:
+     TContext(const std::weak_ptr<TToken>& token, std::function<void()>&& body)
+         : Stack(65536)
+         , Context({this, TArrayRef(Stack.Begin(), Stack.End())})
+         , Token(token)
+         , Body(std::move(body))
+     {
+#ifndef NDEBUG
+         // fill the far end of the stack with sentinels; Resume() verifies them after every switch
+         char *p;
+# if STACK_GROW_DOWN
+         p = Stack.Begin();
+# else
+         p = Stack.End() - sizeof(StackSentinel) * NumStackSentinels;
+# endif
+         for (size_t i = 0; i < NumStackSentinels; ++i) {
+             memcpy(p + i * sizeof(StackSentinel), &StackSentinel, sizeof(StackSentinel));
+         }
+#endif
+     }
+
+     ~TContext() {
+         if (!Finished) {
+             // resume the coroutine one last time: a suspended body gets TExDead thrown out of Return(), and
+             // a body that has never been started is skipped by DoRun()
+             Finished = true;
+             Resume();
+         }
+     }
+
+     // Switch into the coroutine and run it until it calls Return(); gives back the reported outcome.
+     EOutcome Resume() {
+         Outcome = EOutcome::UNSET;
+
+         TExceptionSafeContext returnContext;
+         Y_VERIFY(!BackContext);
+         BackContext = &returnContext;
+         returnContext.SwitchTo(&Context);
+         Y_VERIFY(BackContext == &returnContext);
+         BackContext = nullptr;
+
+         // validate stack
+#ifndef NDEBUG
+         char *p;
+# if STACK_GROW_DOWN
+         p = Stack.Begin();
+# else
+         p = Stack.End() - sizeof(StackSentinel) * NumStackSentinels;
+# endif
+         for (size_t i = 0; i < NumStackSentinels; ++i) {
+             ui64 temp;
+             memcpy(&temp, p + i * sizeof(StackSentinel), sizeof(StackSentinel));
+             Y_VERIFY(StackSentinel == temp);
+         }
+#endif
+
+         Y_VERIFY(Outcome != EOutcome::UNSET);
+         return Outcome;
+     }
+
+     // Called from inside the coroutine: report `outcome` to the pending Resume() and suspend. When the
+     // coroutine is resumed again, throws TExDead if the token has expired or the context is being destroyed.
+     void Return(EOutcome outcome) {
+         Y_VERIFY(Outcome == EOutcome::UNSET);
+         Outcome = outcome;
+         Y_VERIFY(BackContext);
+         Context.SwitchTo(BackContext);
+         if (Token.expired() || Finished) {
+             throw TExDead();
+         }
+     }
+
+ private:
+     // Coroutine entry point.
+     void DoRun() override {
+         // Finished is already set here only when the destructor resumes a coroutine that has never been
+         // started; the body must not begin executing in that case, because no transaction is current and
+         // the context is about to go away
+         if (!Finished && !Token.expired()) {
+             try {
+                 Body();
+             } catch (const TExDead&) {
+                 // just do nothing
+             }
+         }
+         Finished = true;
+         Return(EOutcome::END_CORO);
+     }
+ };
+
+ // Create a transaction with a fresh coroutine context that will run `body`.
+ TCoroTx::TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body)
+ : TTransactionBase(self)
+ , Context(std::make_unique<TContext>(token, std::move(body)))
+ {}
+
+ // Create a successor transaction: takes over the coroutine context of `predecessor`, leaving it without one.
+ TCoroTx::TCoroTx(TCoroTx& predecessor)
+ : TTransactionBase(predecessor.Self)
+ , Context(std::move(predecessor.Context))
+ {}
+
+ // defined here, where TContext is a complete type, as required by the std::unique_ptr<TContext> member
+ TCoroTx::~TCoroTx()
+ {}
+
+ // Resume the coroutine with `txc` published as the current transaction context. Returns true when the
+ // coroutine reports FINISH_TX and false when it reports RESTART_TX; any other outcome is a failure.
+ bool TCoroTx::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) {
+ // prepare environment
+ Y_VERIFY(TxContext == nullptr && Current == nullptr);
+ TxContext = &txc;
+ Current = this;
+
+ Y_VERIFY(Context);
+ const EOutcome outcome = Context->Resume();
+
+ // clear environment back
+ Y_VERIFY(TxContext == &txc && Current == this);
+ TxContext = nullptr;
+ Current = nullptr;
+
+ switch (outcome) {
+ case EOutcome::FINISH_TX:
+ return true;
+
+ case EOutcome::RESTART_TX:
+ return false;
+
+ default:
+ Y_FAIL();
+ }
+ }
+
+ // Resume the coroutine after the transaction has been executed; no transaction context is available at this
+ // point. On RUN_SUCCESSOR_TX a new TCoroTx taking over the coroutine context is scheduled, on END_CORO
+ // nothing more is done; any other outcome is a failure.
+ void TCoroTx::Complete(const TActorContext&) {
+ // prepare environment
+ Y_VERIFY(TxContext == nullptr && Current == nullptr);
+ Current = this;
+
+ Y_VERIFY(Context);
+ const EOutcome outcome = Context->Resume();
+
+ // clear environment back
+ Y_VERIFY(TxContext == nullptr && Current == this);
+ Current = nullptr;
+
+ switch (outcome) {
+ case EOutcome::RUN_SUCCESSOR_TX:
+ Self->Execute(std::make_unique<TCoroTx>(*this));
+ break;
+
+ case EOutcome::END_CORO:
+ break;
+
+ default:
+ Y_FAIL();
+ }
+ }
+
+ // Transaction whose coroutine is currently being resumed on this thread, or nullptr when there is none.
+ TCoroTx *TCoroTx::CurrentTx() {
+ return Current;
+ }
+
+ // Transaction context of the current transaction. May only be called from a coroutine body while it is
+ // being resumed from Execute(): there must be a current transaction and it must have a context set.
+ NTabletFlatExecutor::TTransactionContext *TCoroTx::GetTxc() {
+     // verify Current first, as FinishTx()/RestartTx()/RunSuccessorTx() do, instead of dereferencing nullptr
+     Y_VERIFY(Current);
+     Y_VERIFY(Current->TxContext);
+     return Current->TxContext;
+ }
+
+ // Suspend the coroutine reporting FINISH_TX, which makes Execute() return true.
+ void TCoroTx::FinishTx() {
+ Y_VERIFY(Current);
+ Current->Context->Return(EOutcome::FINISH_TX);
+ }
+
+ // Suspend the coroutine reporting RESTART_TX, which makes Execute() return false.
+ void TCoroTx::RestartTx() {
+ Y_VERIFY(Current);
+ Current->Context->Return(EOutcome::RESTART_TX);
+ }
+
+ // Suspend the coroutine reporting RUN_SUCCESSOR_TX; only Complete() accepts this outcome, and it responds by
+ // scheduling a new transaction that takes over the coroutine.
+ void TCoroTx::RunSuccessorTx() {
+ Y_VERIFY(Current);
+ Current->Context->Return(EOutcome::RUN_SUCCESSOR_TX);
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/coro_tx.h b/ydb/core/blob_depot/coro_tx.h
new file mode 100644
index 0000000000..f731ff88dd
--- /dev/null
+++ b/ydb/core/blob_depot/coro_tx.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "defs.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ // thrown inside a coroutine body when it is resumed after its token has expired or while its context is
+ // being destroyed; caught by the coroutine entry point in coro_tx.cpp
+ struct TExDead {};
+
+ // Tablet transaction whose logic is written as a single function (`body`) running in a coroutine. Execute()
+ // and Complete() resume the coroutine; the body hands control back through the static FinishTx(),
+ // RestartTx() and RunSuccessorTx() calls.
+ class TCoroTx : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
+ class TContext;
+ std::unique_ptr<TContext> Context; // coroutine state; moved to the successor transaction, if any
+ TTransactionContext *TxContext = nullptr; // set only while Execute() is resuming the coroutine
+ static thread_local TCoroTx *Current; // transaction being resumed on this thread, if any
+
+ public:
+ TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body);
+ TCoroTx(TCoroTx& predecessor); // takes over the coroutine context of the predecessor
+ ~TCoroTx();
+
+ private:
+ bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) override;
+ void Complete(const TActorContext&) override;
+
+ public:
+ // the functions below operate on the transaction currently being resumed on the calling thread
+ static NTabletFlatExecutor::TTransactionContext *GetTxc();
+ static TCoroTx *CurrentTx(); // obtain pointer to current tx
+ static void FinishTx(); // finish this transaction; function returns on Complete() entry
+ static void RestartTx(); // restart transaction; function returns on next Execute() entry
+ static void RunSuccessorTx(); // restart in new transaction -- called after FinishTx()
+ };
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp
index 52edebf5f0..0ce533696a 100644
--- a/ydb/core/blob_depot/data.cpp
+++ b/ydb/core/blob_depot/data.cpp
@@ -110,7 +110,7 @@ namespace NKikimr::NBlobDepot {
#ifndef NDEBUG
Y_VERIFY(outcome != EUpdateOutcome::NO_CHANGE || !value.Changed(originalValue));
- Y_VERIFY(value.ValueVersion == originalValue.ValueVersion + 1 || IsSameValueChain(value.ValueChain, originalValue.ValueChain));
+ Y_VERIFY(inserted || value.ValueVersion == originalValue.ValueVersion + 1 || IsSameValueChain(value.ValueChain, originalValue.ValueChain));
#endif
if ((underSoft && value.KeepState != EKeepState::Keep) || underHard) {
@@ -329,7 +329,7 @@ namespace NKikimr::NBlobDepot {
return it->second;
}
- void TData::AddDataOnLoad(TKey key, TString value, bool uncertainWrite) {
+ TData::TValue *TData::AddDataOnLoad(TKey key, TString value, bool uncertainWrite) {
Y_VERIFY_S(!IsKeyLoaded(key), "Id# " << Self->GetLogId() << " Key# " << key.ToString());
NKikimrBlobDepot::TValue proto;
@@ -351,6 +351,8 @@ namespace NKikimr::NBlobDepot {
}
ValidateRecords();
+
+ return &it->second;
}
bool TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
@@ -491,11 +493,14 @@ namespace NKikimr::NBlobDepot {
const TData::TKey last(TLogoBlobID(tabletId, current.Generation(), current.Step(), channel,
TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode));
+ // find keys we have to delete
bool finished = true;
- Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_END, [&](auto& key, auto& value) {
+ TScanRange r{first, last, TData::EScanFlags::INCLUDE_END};
+ std::vector<TKey> keysToDelete;
+ Self->Data->ScanRange(r, nullptr, nullptr, [&](auto& key, auto& value) {
if (value.KeepState != EKeepState::Keep || hard) {
if (maxItems) {
- Self->Data->DeleteKey(key, txc, cookie);
+ keysToDelete.push_back(key);
--maxItems;
} else {
finished = false;
@@ -505,6 +510,11 @@ namespace NKikimr::NBlobDepot {
return true;
});
+ // delete selected keys
+ for (const TKey& key : keysToDelete) {
+ DeleteKey(key, txc, cookie);
+ }
+
return finished;
}
@@ -664,3 +674,8 @@ namespace NKikimr::NBlobDepot {
}
} // NKikimr::NBlobDepot
+
+// Stream output hook for TData::TKey: forwards to the key's own Output() method.
+template<>
+void Out<NKikimr::NBlobDepot::TBlobDepot::TData::TKey>(IOutputStream& stream,
+        const NKikimr::NBlobDepot::TBlobDepot::TData::TKey& key) {
+    key.Output(stream);
+}
diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h
index 3475175645..7231cc8226 100644
--- a/ydb/core/blob_depot/data.h
+++ b/ydb/core/blob_depot/data.h
@@ -372,6 +372,15 @@ namespace NKikimr::NBlobDepot {
Y_DECLARE_FLAGS(TScanFlags, EScanFlags)
+ // Description of a key range for ScanRange(). ScanRange() updates the structure in place as it advances
+ // (range bound, INCLUDE_* flag, remaining MaxKeys), so the same object can be passed again to continue.
+ struct TScanRange {
+ TKey Begin;
+ TKey End;
+ TScanFlags Flags = {}; // INCLUDE_BEGIN / INCLUDE_END / REVERSE
+ ui64 MaxKeys = 0; // maximum number of keys to report; zero means no limit
+ ui32 PrechargeRows = 0; // passed to the table precharge call when loading from the database;
+ ui64 PrechargeBytes = 0; // precharge is skipped when both values are zero
+ };
+
private:
struct TRecordWithTrash {};
@@ -413,13 +422,6 @@ namespace NKikimr::NBlobDepot {
THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed
- struct TResolveDecommitContext {
- TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request
- TActorId ReturnAfterLoadingKeys;
- std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {};
- std::vector<TKey> ResolutionErrors = {};
- };
-
class TTxIssueGC;
class TTxConfirmGC;
@@ -442,39 +444,157 @@ namespace NKikimr::NBlobDepot {
ui64 LastCollectCmdId = 0;
std::unordered_map<ui64, TCollectCmd> CollectCmds;
+ // Helper for ScanRange(): reads the rows of Schema::Data lying strictly between two keys from the local
+ // database, registers each of them through AddDataOnLoad() and feeds them to the scan callback.
+ struct TLoadRangeFromDB {
+ TData* const Data;
+ const TScanRange& Range; // the range being scanned; used for direction, bounds and precharge settings
+ bool* const Progress; // set to true whenever a row has been added to the in-memory data
+ bool Processing = true; // rows are passed to the callback only while this is true
+ std::optional<TKey> LastProcessedKey = {}; // how far the database has been read so far
+
+ // tag types selecting the Load() overload; each overload is one stage of building the table query, named
+ // after the stage that has just been completed
+ static constexpr struct TReverse {} Reverse{};
+ static constexpr struct TLeftBound {} LeftBound{};
+ static constexpr struct TRightBound {} RightBound{};
+
+ // Entry point: choose the iteration direction from Range.Flags and pass on to the bound-setting stages.
+ // A false result means either that the data is not available yet or that iteration should stop;
+ // ScanRange() tells these apart by checking Processing.
+ template<typename TCallback>
+ bool operator ()(NTabletFlatExecutor::TTransactionContext& txc, const TKey& left, const TKey& right, TCallback&& callback) {
+ auto table = NIceDb::TNiceDb(txc.DB).Table<Schema::Data>();
+ return Range.Flags & EScanFlags::REVERSE
+ ? Load(Reverse, table.Reverse(), left, right, std::forward<TCallback>(callback))
+ : Load(Reverse, std::move(table), left, right, std::forward<TCallback>(callback));
+ }
+
+ // Direction is chosen: apply the lower bound unless `left` is the minimum key; when it is, apply the upper
+ // bound right away or select the whole table.
+ template<typename TTable, typename TCallback>
+ bool Load(TReverse, TTable&& table, const TKey& left, const TKey& right, TCallback&& callback) {
+ return left != TKey::Min()
+ ? Load(LeftBound, table.GreaterOrEqual(left.MakeBinaryKey()), left, right, std::forward<TCallback>(callback))
+ : right != TKey::Max()
+ ? Load(RightBound, table.LessOrEqual(right.MakeBinaryKey()), left, right, std::forward<TCallback>(callback))
+ : Load(RightBound, table.All(), left, right, std::forward<TCallback>(callback));
+ }
+
+ // Lower bound is applied: apply the upper bound unless `right` is the maximum key.
+ template<typename TTable, typename TCallback>
+ bool Load(TLeftBound, TTable&& table, const TKey& left, const TKey& right, TCallback&& callback) {
+ return right != TKey::Max()
+ ? Load(RightBound, table.LessOrEqual(right.MakeBinaryKey()), left, right, std::forward<TCallback>(callback))
+ : Load(RightBound, std::forward<TTable>(table), left, right, std::forward<TCallback>(callback));
+ }
+
+ // Query is fully built: precharge (if requested), then read the rows.
+ template<typename TTable, typename TCallback>
+ bool Load(TRightBound, TTable&& table, const TKey& left, const TKey& right, TCallback&& callback) {
+ if ((Range.PrechargeRows || Range.PrechargeBytes) && !table.Precharge(Range.PrechargeRows, Range.PrechargeBytes)) {
+ return false;
+ }
+ auto rowset = table.Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+ while (rowset.IsValid()) {
+ TKey key = TKey::FromBinaryKey(rowset.GetKey(), Data->Self->Config);
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "ScanRange.Load", (Id, Data->Self->GetLogId()), (Left, left),
+ (Right, right), (Key, key));
+ LastProcessedKey.emplace(key);
+ // the query bounds are inclusive, while only keys strictly inside (left, right) are to be loaded
+ if (left < key && key < right) {
+ TValue* const value = Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(),
+ rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>());
+ if (Processing) {
+ // we should not feed keys out of range when we are processing prefetched data outside the range
+ if (Range.Flags & EScanFlags::REVERSE) {
+ Processing = Range.Flags & EScanFlags::INCLUDE_BEGIN ? Range.Begin <= key : Range.Begin < key;
+ } else {
+ Processing = Range.Flags & EScanFlags::INCLUDE_END ? key <= Range.End : key < Range.End;
+ }
+ }
+ // the callback is not invoked once Processing has become false; rows keep being loaded regardless
+ Processing = Processing && callback(std::move(key), *value);
+ *Progress = true;
+ } else {
+ Y_VERIFY_DEBUG(key == left || key == right);
+ }
+ if (!rowset.Next()) {
+ return false; // we break iteration anyway, because we can't read more data
+ }
+ }
+ // the rowset has been read to its end: unless the far bound (left for reverse scans, right otherwise)
+ // was itself seen as the last key, record that bound as the point reached
+ if (!LastProcessedKey || (Range.Flags & EScanFlags::REVERSE ? left < *LastProcessedKey : *LastProcessedKey < right)) {
+ LastProcessedKey.emplace(Range.Flags & EScanFlags::REVERSE ? left : right);
+ }
+ return Processing;
+ };
+ };
+
public:
TData(TBlobDepot *self);
~TData();
- template<typename TCallback, typename T>
- bool ScanRange(const T& begin, const T& end, TScanFlags flags, TCallback&& callback) {
- auto beginIt = !begin ? Data.begin()
- : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin)
- : Data.upper_bound(*begin);
-
- auto endIt = !end ? Data.end()
- : flags & EScanFlags::INCLUDE_END ? Data.upper_bound(*end)
- : Data.lower_bound(*end);
-
- if (flags & EScanFlags::REVERSE) {
- if (beginIt != endIt) {
- --endIt;
- do {
- auto& current = *endIt--;
- if (!callback(current.first, current.second)) {
- return false;
- }
- } while (beginIt != endIt);
+ template<typename TCallback>
+ bool ScanRange(TScanRange& range, NTabletFlatExecutor::TTransactionContext *txc, bool *progress, TCallback&& callback) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "ScanRange", (Id, Self->GetLogId()), (Begin, range.Begin), (End, range.End),
+ (Flags, range.Flags), (MaxKeys, range.MaxKeys));
+
+ const bool reverse = range.Flags & EScanFlags::REVERSE;
+ TLoadRangeFromDB loader{this, range, progress};
+
+ bool res = true;
+
+ auto issue = [&](TKey&& key, const TValue& value) {
+ Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_BEGIN ? range.Begin <= key : range.Begin < key);
+ Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_END ? key <= range.End : key < range.End);
+
+ if (!callback(key, value) || (range.MaxKeys && !--range.MaxKeys)) {
+ return false; // scan aborted by user or finished scanning the required range
+ } else {
+ // remove already scanned items from the range query
+ return true;
}
- } else {
- while (beginIt != endIt) {
- auto& current = *beginIt++;
- if (!callback(current.first, current.second)) {
- return false;
+ };
+
+ const auto& from = reverse ? TKey::Min() : range.Begin;
+ const auto& to = reverse ? range.End : TKey::Max();
+ LoadedKeys.EnumInRange(from, to, reverse, [&](const TKey& left, const TKey& right, bool isRangeLoaded) {
+ STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "ScanRange.Step", (Id, Self->GetLogId()), (Left, left), (Right, right),
+ (IsRangeLoaded, isRangeLoaded), (From, from), (To, to));
+ if (!isRangeLoaded) {
+ // we have to load range (left, right), not including both ends
+ Y_VERIFY(txc && progress);
+ if (!loader(*txc, left, right, issue)) {
+ res = !loader.Processing;
+ return false; // break the iteration
+ }
+ } else if (reverse) {
+ for (auto it = Data.upper_bound(right); it != Data.begin(); ) {
+ const auto& [key, value] = *--it;
+ if (key < left) {
+ break;
+ } else if (range.Flags & EScanFlags::INCLUDE_BEGIN ? key < range.Begin : key <= range.Begin) {
+ return false; // just left the left side of the range
+ } else if ((key != range.End || range.Flags & EScanFlags::INCLUDE_END) && !issue(TKey(key), value)) {
+ return false; // enough keys processed
+ }
}
+ } else {
+ // we have a range of loaded keys in the interval [left, right], including both ends -- load
+ // data from memory
+ for (auto it = Data.lower_bound(left); it != Data.end() && it->first <= right; ++it) {
+ const auto& [key, value] = *it;
+ if (range.Flags & EScanFlags::INCLUDE_END ? range.End < key : range.End <= key) {
+ return false; // just left the right side of the range
+ } else if ((key != range.Begin || range.Flags & EScanFlags::INCLUDE_BEGIN) && !issue(TKey(key), value)) {
+ return false; // enough keys processed
+ }
+ }
+ }
+ return true;
+ });
+
+ if (loader.LastProcessedKey) {
+ if (reverse) {
+ LoadedKeys.AddRange(std::make_tuple(*loader.LastProcessedKey, range.End));
+ } else {
+ LoadedKeys.AddRange(std::make_tuple(range.Begin, *loader.LastProcessedKey));
}
+ (reverse ? range.End : range.Begin) = std::move(*loader.LastProcessedKey);
+ range.Flags.RemoveFlags(reverse ? EScanFlags::INCLUDE_END : EScanFlags::INCLUDE_BEGIN);
}
- return true;
+
+ return res;
}
template<typename TCallback>
@@ -505,7 +625,7 @@ namespace NKikimr::NBlobDepot {
TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id);
TRecordsPerChannelGroup& GetRecordsPerChannelGroup(ui8 channel, ui32 groupId);
- void AddDataOnLoad(TKey key, TString value, bool uncertainWrite);
+ TValue *AddDataOnLoad(TKey key, TString value, bool uncertainWrite);
bool AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob,
NTabletFlatExecutor::TTransactionContext& txc, void *cookie);
void AddTrashOnLoad(TLogoBlobID id);
@@ -546,10 +666,13 @@ namespace NKikimr::NBlobDepot {
}
void StartLoad();
+ bool LoadTrash(NTabletFlatExecutor::TTransactionContext& txc, TString& from, bool& progress);
void OnLoadComplete();
bool IsLoaded() const { return Loaded; }
bool IsKeyLoaded(const TKey& key) const { return Loaded || LoadedKeys[key]; }
+ bool EnsureKeyLoaded(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc);
+
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
class TResolveDecommitActor;
@@ -560,7 +683,7 @@ namespace NKikimr::NBlobDepot {
ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep = false, bool doNotKeep = false);
class TTxResolve;
- void ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context);
+ void ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, THashSet<TLogoBlobID>&& resolutionErrors = {});
void Handle(TEvBlobDepot::TEvResolve::TPtr ev);
@@ -578,7 +701,7 @@ namespace NKikimr::NBlobDepot {
void EndCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
+
TMonotonic LastRecordsValidationTimestamp;
void ValidateRecords();
diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp
index 15a45ecb8a..4a90a0b60c 100644
--- a/ydb/core/blob_depot/data_decommit.cpp
+++ b/ydb/core/blob_depot/data_decommit.cpp
@@ -1,4 +1,5 @@
#include "data.h"
+#include "coro_tx.h"
namespace NKikimr::NBlobDepot {
@@ -11,7 +12,8 @@ namespace NKikimr::NBlobDepot {
TBlobDepot* const Self;
std::weak_ptr<TToken> Token;
- TResolveDecommitContext Context;
+ std::vector<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs;
+ THashSet<TLogoBlobID> ResolutionErrors;
TEvBlobDepot::TEvResolve::TPtr Ev;
ui32 RangesInFlight = 0;
@@ -41,24 +43,11 @@ namespace NKikimr::NBlobDepot {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()),
(Sender, Ev->Sender), (Cookie, Ev->Cookie));
- if (!Self->Data->Loaded) {
- // issue request to a resolver to just load keys we are looking for
- TResolveDecommitContext context;
- context.ReturnAfterLoadingKeys = SelfId();
- Self->Data->ExecuteTxResolve(Ev, std::move(context));
- } else {
- Handle(Ev);
- }
-
+ Self->Execute(std::make_unique<TCoroTx>(Self, Token, std::bind(&TThis::TxPrepare, this)));
Become(&TThis::StateFunc);
}
- void Handle(TEvBlobDepot::TEvResolve::TPtr ev) {
- Ev = ev; // store event back
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "keys loaded", (Id, Self->GetLogId()),
- (Sender, Ev->Sender), (Cookie, Ev->Cookie));
-
+ void TxPrepare() {
for (const auto& item : Ev->Get()->Record.GetItems()) {
switch (item.GetKeyDesignatorCase()) {
case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
@@ -85,15 +74,17 @@ namespace NKikimr::NBlobDepot {
// adjust minId to skip already assimilated items in range query
if (minId < Self->Data->LastAssimilatedBlobId) {
if (item.GetMustRestoreFirst()) {
- ScanRangeAndIssueGets(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN);
+ InvokeOtherActor(*this, &TThis::ScanRangeAndIssueGets, TKey(minId),
+ TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN);
}
minId = *Self->Data->LastAssimilatedBlobId;
}
// issue scan query
- IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst());
+ InvokeOtherActor(*this, &TThis::IssueRange, tabletId, minId, maxId, item.GetMustRestoreFirst());
} else if (item.GetMustRestoreFirst()) {
- ScanRangeAndIssueGets(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END);
+ InvokeOtherActor(*this, &TThis::ScanRangeAndIssueGets, TKey(minId), TKey(maxId),
+ EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END);
}
break;
@@ -101,11 +92,14 @@ namespace NKikimr::NBlobDepot {
case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: {
TData::TKey key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config);
+ while (!Self->Data->EnsureKeyLoaded(key, *TCoroTx::GetTxc())) {
+ TCoroTx::RestartTx();
+ }
const TValue *value = Self->Data->FindKey(key);
const bool doGet = (!value && key.GetBlobId() < Self->Data->LastAssimilatedBlobId) // value not yet assimilated
|| (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet
if (doGet) {
- IssueGet(key.GetBlobId(), item.GetMustRestoreFirst());
+ InvokeOtherActor(*this, &TThis::IssueGet, key.GetBlobId(), item.GetMustRestoreFirst());
}
break;
}
@@ -116,16 +110,29 @@ namespace NKikimr::NBlobDepot {
}
}
+ TCoroTx::FinishTx();
CheckIfDone();
}
void ScanRangeAndIssueGets(TKey from, TKey to, TScanFlags flags) {
- Self->Data->ScanRange(&from, &to, flags, [&](const TKey& key, const TValue& value) {
+ bool progress = false; // handed to ScanRange by pointer; read and cleared with std::exchange in the loop below
+
+ auto callback = [&](const TKey& key, const TValue& value) { // issues a restoring get for each scanned value flagged GoingToAssimilate; never stops the scan
if (value.GoingToAssimilate) {
IssueGet(key.GetBlobId(), true /*mustRestoreFirst*/);
}
return true;
- });
+ };
+
+ TScanRange r{from, to, flags};
+ while (!Self->Data->ScanRange(r, TCoroTx::GetTxc(), &progress, callback)) { // ScanRange returned false: call it again on the same TScanRange until it returns true
+ if (std::exchange(progress, false)) { // progress was reported: finish the current tx and go on in a successor tx
+ TCoroTx::FinishTx();
+ TCoroTx::RunSuccessorTx();
+ } else { // nothing was reported as progress: restart the current tx
+ TCoroTx::RestartTx();
+ }
+ }
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -153,7 +160,7 @@ namespace NKikimr::NBlobDepot {
IssueGet(r.Id, true /*mustRestoreFirst*/);
}
} else {
- Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
+ DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
}
}
} else {
@@ -206,7 +213,7 @@ namespace NKikimr::NBlobDepot {
if (r.Buffer) { // wasn't index read
IssuePut(TKey(r.Id), std::move(r.Buffer), r.Keep, r.DoNotKeep);
} else {
- Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
+ DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep});
}
} else if (r.Status == NKikimrProto::NODATA) {
Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(r.Id),
@@ -214,7 +221,7 @@ namespace NKikimr::NBlobDepot {
++PutsInFlight;
} else {
// mark this specific key as unresolvable
- Context.ResolutionErrors.emplace_back(r.Id);
+ ResolutionErrors.emplace(r.Id);
}
}
@@ -264,7 +271,7 @@ namespace NKikimr::NBlobDepot {
TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep);
if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item
- Context.ResolutionErrors.emplace_back(msg.Id);
+ ResolutionErrors.insert(msg.Id);
}
}
@@ -281,10 +288,26 @@ namespace NKikimr::NBlobDepot {
void FinishWithSuccess() {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT92, "request succeeded", (Id, Self->GetLogId()), (Sender, Ev->Sender),
- (Cookie, Ev->Cookie), (ResolutionErrors.size, Context.ResolutionErrors.size()),
- (DecommitBlobs.size, Context.DecommitBlobs.size()));
- std::sort(Context.ResolutionErrors.begin(), Context.ResolutionErrors.end());
- Self->Data->ExecuteTxResolve(Ev, std::move(Context));
+ (Cookie, Ev->Cookie), (ResolutionErrors.size, ResolutionErrors.size()),
+ (DecommitBlobs.size, DecommitBlobs.size()));
+
+ Self->Execute(std::make_unique<TCoroTx>(Self, Token, [self = Self, decommitBlobs = std::move(DecommitBlobs), // the closure captures everything it uses by value; PassAway() is called right after Execute()
+ ev = Ev, resolutionErrors = std::move(ResolutionErrors)]() mutable {
+ ui32 numItemsProcessed = 0;
+ for (const auto& blob : decommitBlobs) {
+ if (numItemsProcessed == 10'000) { // batch limit reached: finish this tx, commit trash, continue in a successor tx
+ TCoroTx::FinishTx();
+ self->Data->CommitTrash(TCoroTx::CurrentTx());
+ numItemsProcessed = 0;
+ TCoroTx::RunSuccessorTx();
+ }
+ numItemsProcessed += self->Data->AddDataOnDecommit(blob, *TCoroTx::GetTxc(), TCoroTx::CurrentTx()); // AddDataOnDecommit returns bool, so only calls returning true are counted; NOTE(review): the replaced TTxResolve code called it only when LastAssimilatedBlobId < blob.Id -- confirm that check now lives elsewhere
+ }
+ TCoroTx::FinishTx();
+ self->Data->CommitTrash(TCoroTx::CurrentTx());
+ self->Data->ExecuteTxResolve(ev, std::move(resolutionErrors)); // hand the request over to TTxResolve together with the collected resolution errors
+ }));
+
PassAway();
}
@@ -302,7 +325,6 @@ namespace NKikimr::NBlobDepot {
}
switch (const ui32 type = ev->GetTypeRewrite()) {
- hFunc(TEvBlobDepot::TEvResolve, Handle);
hFunc(TEvBlobStorage::TEvGetResult, Handle);
hFunc(TEvBlobStorage::TEvRangeResult, Handle);
hFunc(TEvBlobStorage::TEvPutResult, Handle);
diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp
index 010ae46e2c..3165c16873 100644
--- a/ydb/core/blob_depot/data_load.cpp
+++ b/ydb/core/blob_depot/data_load.cpp
@@ -1,167 +1,103 @@
#include "data.h"
#include "schema.h"
#include "garbage_collection.h"
+#include "coro_tx.h"
namespace NKikimr::NBlobDepot {
using TData = TBlobDepot::TData;
- class TData::TTxDataLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
- std::optional<TString> LastTrashKey;
- bool TrashLoaded = false;
- bool SuccessorTx = true;
-
- public:
- TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DATA_LOAD; }
-
- TTxDataLoad(TBlobDepot *self)
- : TTransactionBase(self)
- {}
-
- TTxDataLoad(TTxDataLoad& predecessor)
- : TTransactionBase(predecessor.Self)
- , LastTrashKey(std::move(predecessor.LastTrashKey))
- , TrashLoaded(predecessor.TrashLoaded)
- {}
-
- bool Execute(TTransactionContext& txc, const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT28, "TData::TTxDataLoad::Execute", (Id, Self->GetLogId()));
-
- NIceDb::TNiceDb db(txc.DB);
+ void TData::StartLoad() { // loads the whole trash table and the whole [TKey::Min(), TKey::Max()] key range inside a TCoroTx, then calls OnLoadComplete()
+ Self->Execute(std::make_unique<TCoroTx>(Self, Self->Token, [&] {
bool progress = false;
- auto load = [&](auto t, auto& lastKey, auto callback) {
- auto table = t.GreaterOrEqual(lastKey.value_or(TString()));
- static constexpr ui64 PrechargeRows = 10'000;
- static constexpr ui64 PrechargeBytes = 1'000'000;
- if (!table.Precharge(PrechargeRows, PrechargeBytes)) {
- return false;
- }
- auto rows = table.Select();
- if (!rows.IsReady()) {
- return false;
- }
- while (rows.IsValid()) {
- if (auto key = rows.GetKey(); key != lastKey) {
- callback(key, rows);
- lastKey.emplace(std::move(key));
- progress = true;
- }
- if (!rows.Next()) {
- return false;
- }
- }
- lastKey.reset();
- return true;
- };
-
- if (!TrashLoaded) {
- auto addTrash = [this](const auto& key, const auto& /*rows*/) {
- Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(key));
- };
- if (!load(db.Table<Schema::Trash>(), LastTrashKey, addTrash)) {
- return progress;
- }
- TrashLoaded = true;
- }
-
- for (;;) {
- // calculate a set of keys we need to load
- TClosedIntervalSet<TKey> needed;
- needed |= {TKey::Min(), TKey::Max()};
- const auto interval = needed.PartialSubtract(Self->Data->LoadedKeys);
- if (!interval) {
- break;
- }
-
- const auto& [first, last] = *interval;
-
- auto makeNeeded = [&] {
- TStringStream s("{");
- bool flag = true;
- needed([&](const TKey& first, const TKey& last) {
- s << (std::exchange(flag, false) ? "" : "-");
- first.Output(s);
- last.Output(s << '-');
- return true;
- });
- s << '}';
- return s.Str();
- };
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "TTxDataLoad iteration", (Id, Self->GetLogId()), (Needed, makeNeeded()),
- (FirstKey, first), (LastKey, last));
+ TString trash; // resume position inside the trash table, updated by LoadTrash
+ bool trashLoaded = false; // becomes true once LoadTrash has returned true; LoadTrash is not called again afterwards
- bool status = true;
- std::optional<TKey> lastKey; // the actual last processed key
+ TScanRange r{
+ .Begin = TKey::Min(),
+ .End = TKey::Max(),
+ .PrechargeRows = 10'000,
+ .PrechargeBytes = 1'000'000,
+ };
- auto table = db.Table<Schema::Data>().GreaterOrEqual(first.MakeBinaryKey());
- static constexpr ui64 PrechargeRows = 10'000;
- static constexpr ui64 PrechargeBytes = 1'000'000;
- if (!table.Precharge(PrechargeRows, PrechargeBytes)) {
- status = false;
- } else if (auto rows = table.Select(); !rows.IsReady()) {
- status = false;
+ while (!(trashLoaded = trashLoaded || LoadTrash(*TCoroTx::GetTxc(), trash, progress)) || // short-circuit: skip the trash table query once it has been loaded completely
+ !ScanRange(r, TCoroTx::GetTxc(), &progress, [](const TKey&, const TValue&) { return true; })) {
+ if (std::exchange(progress, false)) { // something was loaded: finish this tx and continue in a successor tx
+ TCoroTx::FinishTx();
+ TCoroTx::RunSuccessorTx();
} else {
- for (;;) {
- if (!rows.IsValid()) { // finished reading the range
- lastKey.emplace(last);
- break;
- }
- auto key = TKey::FromBinaryKey(rows.GetKey(), Self->Config);
- lastKey.emplace(key);
- if (last <= key) { // stop iteration -- we are getting out of range
- break;
- } else if (first < key && !Self->Data->IsKeyLoaded(key)) {
- Self->Data->AddDataOnLoad(std::move(key), rows.template GetValue<Schema::Data::Value>(),
- rows.template GetValueOrDefault<Schema::Data::UncertainWrite>());
- progress = true;
- }
- if (!rows.Next()) {
- status = false;
- break;
- }
- }
- }
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT84, "TTxDataLoad iteration complete", (Id, Self->GetLogId()),
- (FirstKey, first), (LastKey, lastKey), (Status, status), (Progress, progress));
-
- if (lastKey) {
- Self->Data->LoadedKeys |= {first, *lastKey};
- }
- if (!status) {
- return progress;
+ TCoroTx::RestartTx(); // nothing was loaded in this attempt: restart the current tx
}
}
- SuccessorTx = false; // everything loaded
- return true;
- }
-
- void Complete(const TActorContext&) override {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxDataLoad::Complete", (Id, Self->GetLogId()),
- (TrashLoaded, TrashLoaded), (SuccessorTx, SuccessorTx));
+ TCoroTx::FinishTx();
+ Self->Data->OnLoadComplete();
+ }));
+ }
- if (SuccessorTx) {
- Self->Execute(std::make_unique<TTxDataLoad>(*this));
- } else {
- Self->Data->OnLoadComplete();
+ bool TData::LoadTrash(NTabletFlatExecutor::TTransactionContext& txc, TString& from, bool& progress) { // reads Schema::Trash rows with key >= from; returns true after the rowset is exhausted, false when Precharge/Select/Next did not succeed
+ NIceDb::TNiceDb db(txc.DB);
+ auto table = db.Table<Schema::Trash>().GreaterOrEqual(from);
+ static constexpr ui64 PrechargeRows = 10'000;
+ static constexpr ui64 PrechargeBytes = 1'000'000;
+ if (!table.Precharge(PrechargeRows, PrechargeBytes)) {
+ return false;
+ }
+ auto rows = table.Select();
+ if (!rows.IsReady()) {
+ return false;
+ }
+ while (rows.IsValid()) {
+ if (auto key = rows.GetKey(); key != from) { // the row equal to 'from' is skipped: the query is GreaterOrEqual, so a resumed call starts on the last key already handled
+ Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(key));
+ from = std::move(key); // remember the position for the next call
+ progress = true; // tell the caller that at least one new row was consumed
+ }
+ if (!rows.Next()) {
+ return false;
+ }
}
- };
-
- void TData::StartLoad() {
- Self->Execute(std::make_unique<TTxDataLoad>(Self));
+ return true;
}
void TData::OnLoadComplete() {
- Loaded = true;
+ Self->Data->LoadedKeys([&](const TKey& left, const TKey& right) { // presumably invoked once per interval stored in LoadedKeys -- confirm against TClosedIntervalSet
+ // verify that LoadedKeys == {Min, Max} exactly
+ Y_VERIFY_S(left == TKey::Min() && right == TKey::Max() && !Loaded, "Id# " << Self->GetLogId() // !Loaded makes a second invocation of this callback fail, since the first one sets Loaded below
+ << " Left# " << left.ToString()
+ << " Right# " << right.ToString()
+ << " Loaded# " << Loaded
+ << " LoadedKeys# " << LoadedKeys.ToString());
+ Loaded = true;
+ return true;
+ });
+ Y_VERIFY(Loaded); // fails when the callback above never set Loaded (e.g. it was not invoked at all)
Self->OnDataLoadComplete();
for (auto& [key, record] : RecordsPerChannelGroup) {
record.CollectIfPossible(this);
}
}
+ bool TData::EnsureKeyLoaded(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc) { // makes sure 'key' counts as loaded, reading its row from Schema::Data when needed; returns false when the row is not ready
+ if (IsKeyLoaded(key)) { // already covered (full load finished or key inside LoadedKeys): no database access
+ return true;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+ using Table = Schema::Data;
+ auto row = db.Table<Table>().Key(key.MakeBinaryKey()).Select();
+ if (!row.IsReady()) {
+ return false; // nothing is recorded in this case; the callers visible in this patch retry (RestartTx loop) or return from Execute
+ } else {
+ if (row.IsValid()) {
+ AddDataOnLoad(key, row.GetValue<Table::Value>(), row.GetValueOrDefault<Table::UncertainWrite>());
+ }
+ Self->Data->LoadedKeys |= {key, key}; // the single-key interval is marked loaded even when no row exists for the key
+ return true;
+ }
+ }
+
void TBlobDepot::StartDataLoad() {
Data->StartLoad();
}
diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp
index 6570d5b71f..68856636ed 100644
--- a/ydb/core/blob_depot/data_resolve.cpp
+++ b/ydb/core/blob_depot/data_resolve.cpp
@@ -83,149 +83,108 @@ namespace NKikimr::NBlobDepot {
class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> {
std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request;
- TResolveDecommitContext ResolveDecommitContext;
-
- bool KeysLoaded = false;
- int ItemIndex = 0;
- std::optional<TKey> LastScannedKey;
- ui32 NumKeysRead = 0; // number of keys already read for this item
-
- // final state
+ THashSet<TLogoBlobID> ResolutionErrors;
+ size_t ItemIndex = 0;
+ std::optional<TScanRange> Range;
TResolveResultAccumulator Result;
- bool SuccessorTx = false;
std::deque<TKey> Uncertainties;
+ bool SuccessorTx = false;
+
public:
TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_RESOLVE; }
- TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request,
- TResolveDecommitContext&& resolveDecommitContext = {})
+ TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request, THashSet<TLogoBlobID>&& resolutionErrors) // first tx for a request; takes ownership of the event handle and of the set of blob ids to be answered with an error reason
: TTransactionBase(self)
, Request(request.Release())
- , ResolveDecommitContext(std::move(resolveDecommitContext))
+ , ResolutionErrors(std::move(resolutionErrors))
, Result(*Request)
{}
TTxResolve(TTxResolve& predecessor)
: TTransactionBase(predecessor.Self)
, Request(std::move(predecessor.Request))
- , ResolveDecommitContext(std::move(predecessor.ResolveDecommitContext))
- , KeysLoaded(predecessor.KeysLoaded)
+ , ResolutionErrors(std::move(predecessor.ResolutionErrors)) // successor tx: state is moved out of the predecessor, which is left moved-from
, ItemIndex(predecessor.ItemIndex)
- , LastScannedKey(std::move(predecessor.LastScannedKey))
- , NumKeysRead(predecessor.NumKeysRead)
+ , Range(std::move(predecessor.Range)) // an unfinished scan range, if any, continues in the successor
, Result(std::move(predecessor.Result))
+ , Uncertainties(std::move(predecessor.Uncertainties)) // keep what the predecessor has accumulated so far
{}
- bool GetScanParams(const NKikimrBlobDepot::TEvResolve::TItem& item, std::optional<TKey> *begin,
- std::optional<TKey> *end, TScanFlags *flags, ui64 *maxKeys) {
- switch (item.GetKeyDesignatorCase()) {
- case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
- const auto& range = item.GetKeyRange();
- *flags = TScanFlags()
- | (range.GetIncludeBeginning() ? EScanFlags::INCLUDE_BEGIN : TScanFlags())
- | (range.GetIncludeEnding() ? EScanFlags::INCLUDE_END : TScanFlags())
- | (range.GetReverse() ? EScanFlags::REVERSE : TScanFlags());
- if (range.HasBeginningKey()) {
- begin->emplace(TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config));
- } else {
- begin->reset();
- }
- if (range.HasEndingKey()) {
- end->emplace(TKey::FromBinaryKey(range.GetEndingKey(), Self->Config));
- } else {
- end->reset();
- }
- *maxKeys = range.GetMaxKeys();
- return true;
- }
-
- case NKikimrBlobDepot::TEvResolve::TItem::kExactKey:
- begin->emplace(TKey::FromBinaryKey(item.GetExactKey(), Self->Config));
- end->emplace(begin->value());
- *flags = EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END;
- *maxKeys = 1;
- return true;
-
- case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET:
- return false;
- }
-
- return false;
- }
-
bool Execute(TTransactionContext& txc, const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (Id, Self->GetLogId()),
- (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex),
- (LastScannedKey, LastScannedKey), (DecommitBlobs.size, ResolveDecommitContext.DecommitBlobs.size()));
+ (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex));
bool progress = false;
- if (!KeysLoaded && !LoadKeys(txc, progress)) {
- return progress;
- } else if (SuccessorTx) {
- return true;
- } else {
- KeysLoaded = true;
- }
-
- if (ResolveDecommitContext.ReturnAfterLoadingKeys) {
- return true;
- }
-
- ui32 numItemsRemain = 10'000;
-
- while (!ResolveDecommitContext.DecommitBlobs.empty()) {
- if (!numItemsRemain) {
- SuccessorTx = true;
- return true;
- }
-
- const auto& blob = ResolveDecommitContext.DecommitBlobs.front();
- if (Self->Data->LastAssimilatedBlobId < blob.Id) {
- numItemsRemain -= Self->Data->AddDataOnDecommit(blob, txc, this);
- }
- ResolveDecommitContext.DecommitBlobs.pop_front();
- }
const auto& record = Request->Get()->Record;
- for (const auto& item : record.GetItems()) {
+ for (; ItemIndex < record.ItemsSize(); ++ItemIndex) { // ItemIndex is a member copied into the successor tx, so processing resumes at the interrupted item
+ const auto& item = record.GetItems(ItemIndex);
std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt;
-
+ std::optional<bool> status; // empty: the item was fully processed; otherwise the value Execute has to return right now
switch (item.GetKeyDesignatorCase()) {
- case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: {
- std::optional<TKey> begin;
- std::optional<TKey> end;
- TScanFlags flags;
- ui64 maxKeys;
- const bool success = GetScanParams(item, &begin, &end, &flags, &maxKeys);
- Y_VERIFY(success);
- Self->Data->ScanRange(begin, end, flags, [&](const TKey& key, const TValue& value) {
- IssueResponseItem(cookie, key, value);
- return --maxKeys != 0;
- });
+ case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange:
+ status = ProcessKeyRange(item.GetKeyRange(), txc, progress, cookie, item.GetMustRestoreFirst());
break;
- }
- case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: {
- const auto key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config);
- const TValue *value = Self->Data->FindKey(key);
- const auto& errors = ResolveDecommitContext.ResolutionErrors;
- if (value || std::binary_search(errors.begin(), errors.end(), key)) {
- static const TValue zeroValue;
- IssueResponseItem(cookie, key, value ? *value : zeroValue);
- }
+ case NKikimrBlobDepot::TEvResolve::TItem::kExactKey:
+ status = ProcessExactKey(item.GetExactKey(), txc, progress, cookie, item.GetMustRestoreFirst());
break;
- }
case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET:
Y_VERIFY_DEBUG(false, "incorrect query field");
break;
}
+ if (status) { // the item could not be completed in this tx; ItemIndex is left pointing at it
+ return *status;
+ }
}
return true;
}
+ std::optional<bool> ProcessKeyRange(const NKikimrBlobDepot::TEvResolve::TKeyRange& range, // returns nullopt when the range item is done, otherwise the value Execute must return
+ TTransactionContext& txc, bool& progress, const std::optional<ui64>& cookie, bool mustRestoreFirst) {
+ if (!Range) { // first visit of this item: build the scan range from the request; a range carried over from a predecessor tx is reused as is
+ Range.emplace();
+ Range->Begin = range.HasBeginningKey()
+ ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config)
+ : TKey::Min();
+ Range->End = range.HasEndingKey()
+ ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config)
+ : TKey::Max();
+ Range->Flags = TScanFlags()
+ | (range.GetIncludeBeginning() ? EScanFlags::INCLUDE_BEGIN : TScanFlags())
+ | (range.GetIncludeEnding() ? EScanFlags::INCLUDE_END : TScanFlags())
+ | (range.GetReverse() ? EScanFlags::REVERSE : TScanFlags());
+ Range->MaxKeys = range.GetMaxKeys(); // NOTE(review): the callback below never stops the scan, so the limit is presumably enforced inside ScanRange -- confirm
+ }
+ auto callback = [&](const TKey& key, const TValue& value) { // emits a response item for every key reported by ScanRange
+ IssueResponseItem(cookie, key, value, mustRestoreFirst);
+ return true;
+ };
+ if (Self->Data->ScanRange(*Range, &txc, &progress, callback)) {
+ Range.reset(); // item finished; the next range item starts from a fresh TScanRange
+ return std::nullopt;
+ } else {
+ return SuccessorTx = progress; // true: Complete() schedules a successor tx; false: Execute returns false
+ }
+ }
+
+ std::optional<bool> ProcessExactKey(const TString& exactKey, TTransactionContext& txc, bool& progress, // returns nullopt when the item is done, otherwise the value Execute must return
+ const std::optional<ui64>& cookie, bool mustRestoreFirst) {
+ const auto key = TKey::FromBinaryKey(exactKey, Self->Config);
+ if (!Self->Data->EnsureKeyLoaded(key, txc)) { // the key's row was not ready in this tx
+ return SuccessorTx = progress;
+ }
+ const TValue *value = Self->Data->FindKey(key);
+ if (value || (!ResolutionErrors.empty() && ResolutionErrors.contains(key.GetBlobId()))) { // answer when the key exists, or when its blob id is listed as a resolution error
+ static const TValue zero; // default-constructed stand-in used when the key has no stored value
+ IssueResponseItem(cookie, key, value ? *value : zero, mustRestoreFirst);
+ }
+ return std::nullopt;
+ }
+
void Complete(const TActorContext&) override {
STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (Id, Self->GetLogId()),
(Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, SuccessorTx),
@@ -235,8 +194,6 @@ namespace NKikimr::NBlobDepot {
if (SuccessorTx) {
Self->Execute(std::make_unique<TTxResolve>(*this));
- } else if (const TActorId recipient = ResolveDecommitContext.ReturnAfterLoadingKeys) {
- TActivationContext::Send(Request->Forward(recipient));
} else if (Uncertainties.empty()) {
Result.Send(NKikimrProto::OK, std::nullopt);
} else {
@@ -244,132 +201,7 @@ namespace NKikimr::NBlobDepot {
}
}
- bool LoadKeys(TTransactionContext& txc, bool& progress) {
- NIceDb::TNiceDb db(txc.DB);
-
- if (Self->Data->Loaded) {
- return true;
- }
-
- const auto& record = Request->Get()->Record;
- const auto& items = record.GetItems();
- for (; ItemIndex < items.size(); ++ItemIndex, LastScannedKey.reset(), NumKeysRead = 0) {
- const auto& item = items[ItemIndex];
-
- std::optional<TKey> begin;
- std::optional<TKey> end;
- TScanFlags flags;
- ui64 maxKeys;
- const bool success = GetScanParams(item, &begin, &end, &flags, &maxKeys);
- Y_VERIFY_DEBUG(success);
-
- // adjust range according to actually generated data
- if (LastScannedKey) {
- if (flags & EScanFlags::REVERSE) { // reverse scan
- end = *LastScannedKey;
- flags &= ~EScanFlags::INCLUDE_END;
- } else { // direct scan
- begin = *LastScannedKey;
- flags &= ~EScanFlags::INCLUDE_BEGIN;
- }
- }
-
- TClosedIntervalSet<TKey> needed;
- needed |= {begin.value_or(TKey::Min()), end.value_or(TKey::Max())};
- needed -= Self->Data->LoadedKeys;
- if (!needed) {
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "TTxResolve: skipping subrange", (Id, Self->GetLogId()),
- (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex));
- continue;
- }
-
- std::optional<TKey> boundary;
-
- auto processRange = [&](auto table) {
- bool done = false;
- for (auto rowset = table.Select();; rowset.Next()) {
- if (!rowset.IsReady()) {
- return done;
- } else if (!rowset.IsValid()) {
- // no more keys in our direction -- we have scanned full requested range
- boundary.emplace(flags & EScanFlags::REVERSE
- ? begin.value_or(TKey::Min())
- : end.value_or(TKey::Max()));
- return true;
- }
- auto key = TKey::FromBinaryKey(rowset.template GetValue<Schema::Data::Key>(), Self->Config);
- boundary.emplace(key);
- if (key != LastScannedKey) {
- LastScannedKey = key;
- progress = true;
-
- const bool isKeyLoaded = Self->Data->IsKeyLoaded(key);
-
- STLOG(PRI_DEBUG, BLOB_DEPOT, BDT85, "TTxResolve: loading key from database", (Id, Self->GetLogId()),
- (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), (Key, key),
- (IsKeyLoaded, isKeyLoaded));
-
- if (!isKeyLoaded) {
- Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(),
- rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>());
- }
-
- const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key);
- const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end);
- if (matchBegin && matchEnd && ResolveDecommitContext.DecommitBlobs.empty() && ++NumKeysRead == maxKeys) {
- // we have hit the MaxItems limit, exit
- done = true;
- }
- if (flags & EScanFlags::REVERSE ? !matchEnd : !matchBegin) {
- // we have exceeded our range
- done = true;
- }
- }
- }
- };
-
- auto applyEnd = [&](auto&& x) {
- return end
- ? processRange(x.LessOrEqual(end->MakeBinaryKey()))
- : processRange(std::forward<std::decay_t<decltype(x)>>(x));
- };
- auto applyBegin = [&](auto&& x) {
- return begin
- ? applyEnd(x.GreaterOrEqual(begin->MakeBinaryKey()))
- : applyEnd(std::forward<std::decay_t<decltype(x)>>(x));
- };
- auto applyReverse = [&](auto&& x) {
- return flags & EScanFlags::REVERSE
- ? applyBegin(x.Reverse())
- : applyBegin(std::forward<std::decay_t<decltype(x)>>(x));
- };
-
- const bool status = applyReverse(db.Table<Schema::Data>());
-
- if (boundary) {
- if (flags & EScanFlags::REVERSE) {
- Self->Data->LoadedKeys |= {*boundary, end.value_or(TKey::Max())};
- } else {
- Self->Data->LoadedKeys |= {begin.value_or(TKey::Min()), *boundary};
- }
- }
-
- if (status) {
- continue; // all work done for this item
- } else if (progress) {
- // we have already done something, so let's finish this transaction and start a new one, continuing
- // the job
- SuccessorTx = true;
- return true;
- } else {
- return false; // we'll have to restart this transaction to fetch some data
- }
- }
-
- return true;
- }
-
- void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value) {
+ void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value, bool reliablyWritten) {
NKikimrBlobDepot::TEvResolveResult::TResolvedKey item;
if (cookie) {
@@ -381,6 +213,7 @@ namespace NKikimr::NBlobDepot {
auto *out = item.AddValueChain();
out->SetGroupId(Self->Config.GetVirtualGroupId());
LogoBlobIDFromLogoBlobID(key.GetBlobId(), out->MutableBlobId());
+ item.SetReliablyWritten(reliablyWritten);
} else {
EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) {
if (begin != end) {
@@ -395,13 +228,13 @@ namespace NKikimr::NBlobDepot {
}
}
});
+ item.SetReliablyWritten(true);
}
if (value.Meta) {
item.SetMeta(value.Meta.data(), value.Meta.size());
}
- const auto& errors = ResolveDecommitContext.ResolutionErrors;
- if (std::binary_search(errors.begin(), errors.end(), key)) {
+ if (!ResolutionErrors.empty() && ResolutionErrors.contains(key.GetBlobId())) {
item.SetErrorReason("item resolution error");
item.ClearValueChain();
} else if (!item.ValueChainSize()) {
@@ -423,12 +256,12 @@ namespace NKikimr::NBlobDepot {
if (Self->Config.GetIsDecommittingGroup() && Self->DecommitState <= EDecommitState::BlobsFinished) {
Self->RegisterWithSameMailbox(CreateResolveDecommitActor(ev));
} else {
- Self->Execute(std::make_unique<TTxResolve>(Self, ev));
+ ExecuteTxResolve(ev);
}
}
- void TData::ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context) {
- Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context)));
+ void TData::ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, THashSet<TLogoBlobID>&& resolutionErrors) { // starts a TTxResolve for 'ev'; the declaration in data.h defaults resolutionErrors to an empty set
+ Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(resolutionErrors)));
}
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp
index 4aa554fa66..60ea091567 100644
--- a/ydb/core/blob_depot/garbage_collection.cpp
+++ b/ydb/core/blob_depot/garbage_collection.cpp
@@ -273,8 +273,9 @@ namespace NKikimr::NBlobDepot {
const TData::TKey last(TLogoBlobID(tabletId, barrierGenStep.Generation(), barrierGenStep.Step(),
channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId,
TLogoBlobID::MaxCrcMode));
- Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END,
- [&](const TData::TKey& key, const TData::TValue& value) {
+
+ TData::TScanRange r{first, last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END};
+ Self->Data->ScanRange(r, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) {
// there must be no blobs under the hard barrier and no blobs with mode other than Keep under the soft one
Y_VERIFY_S(!hard && value.KeepState == NKikimrBlobDepot::EKeepState::Keep, "Key# " << key.ToString()
<< " Value# " << value.ToString());
@@ -282,7 +283,8 @@ namespace NKikimr::NBlobDepot {
});
}
# if 0
- Self->Data->ScanRange(nullptr, nullptr, {}, [&](const TData::TKey& key, const TData::TValue& value) {
+ TData::TScanRange r{TData::TKey::Min(), TData::TKey::Max()};
+ Self->Data->ScanRange(r, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) {
bool underSoft, underHard;
Self->BarrierServer->GetBlobBarrierRelation(key.GetBlobId(), &underSoft, &underHard);
Y_VERIFY(!underHard && (!underSoft || value.KeepState == NKikimrBlobDepot::EKeepState::Keep));
diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp
index e062d00f30..2373af001c 100644
--- a/ydb/core/blob_depot/op_commit_blob_seq.cpp
+++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp
@@ -133,25 +133,13 @@ namespace NKikimr::NBlobDepot {
if (Self->Data->IsLoaded()) {
return true;
}
- bool success = true;
for (const auto& item : Request->Get()->Record.GetItems()) {
auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config);
- if (Self->Data->IsKeyLoaded(key)) {
- continue;
- }
- using Table = Schema::Data;
- auto row = db.Table<Table>().Key(item.GetKey()).Select();
- if (!row.IsReady()) {
- success = false;
- } else {
- Self->Data->LoadedKeys |= {key, key};
- if (row.IsValid()) {
- Self->Data->AddDataOnLoad(std::move(key), row.GetValue<Table::Value>(),
- row.GetValueOrDefault<Table::UncertainWrite>());
- }
+ if (!Self->Data->EnsureKeyLoaded(key, txc)) {
+ return false;
}
}
- return success;
+ return true;
}
bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) {
diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h
index ac33792de6..ba9a3f0148 100644
--- a/ydb/core/blob_depot/types.h
+++ b/ydb/core/blob_depot/types.h
@@ -151,7 +151,6 @@ namespace NKikimr::NBlobDepot {
};
using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>;
- using TResolvedValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>;
template<typename TCallback>
void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) {
diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto
index 488858ccc1..2efe99467e 100644
--- a/ydb/core/protos/blob_depot.proto
+++ b/ydb/core/protos/blob_depot.proto
@@ -222,6 +222,7 @@ message TEvResolveResult {
repeated uint64 Owners = 5;
optional string ErrorReason = 6; // if set, this means value wasn't resolved due to error
optional uint32 ValueVersion = 7; // ValueChain version, gets increased every time value is changed
+ optional bool ReliablyWritten = 8; // MustRestoreFirst was either set, or the blob is in local storage
}
optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way
optional string ErrorReason = 2;