diff options
author | alexvru <alexvru@ydb.tech> | 2023-02-01 12:40:07 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2023-02-01 12:40:07 +0300 |
commit | 959f19c2ac5e04e166d118381cb36ebfc660d25f (patch) | |
tree | e0f6488e261554a302d193ea2ca94eee3d81557a | |
parent | 72345589f0c36d577a467ab01854e20bb7e23a13 (diff) | |
download | ydb-959f19c2ac5e04e166d118381cb36ebfc660d25f.tar.gz |
Improve BlobDepot
30 files changed, 1217 insertions, 687 deletions
diff --git a/ydb/core/blob_depot/CMakeLists.darwin.txt b/ydb/core/blob_depot/CMakeLists.darwin.txt index e1ac07e603..601a05c08f 100644 --- a/ydb/core/blob_depot/CMakeLists.darwin.txt +++ b/ydb/core/blob_depot/CMakeLists.darwin.txt @@ -22,6 +22,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/coro_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp diff --git a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt index ae8c5e7a0d..e12fd5266e 100644 --- a/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blob_depot/CMakeLists.linux-aarch64.txt @@ -23,6 +23,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/coro_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp diff --git a/ydb/core/blob_depot/CMakeLists.linux.txt b/ydb/core/blob_depot/CMakeLists.linux.txt index ae8c5e7a0d..e12fd5266e 100644 --- a/ydb/core/blob_depot/CMakeLists.linux.txt +++ b/ydb/core/blob_depot/CMakeLists.linux.txt @@ -23,6 +23,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/assimilator.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/blocks.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/coro_tx.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_decommit.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/data_gc.cpp diff --git a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt index be98146c1b..c24e4325cf 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.darwin.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.darwin.txt @@ -26,6 +26,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/resolved_value.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt index e081b10637..ef14494980 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.linux-aarch64.txt @@ -27,6 +27,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/resolved_value.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp diff --git a/ydb/core/blob_depot/agent/CMakeLists.linux.txt b/ydb/core/blob_depot/agent/CMakeLists.linux.txt index e081b10637..ef14494980 100644 --- a/ydb/core/blob_depot/agent/CMakeLists.linux.txt +++ b/ydb/core/blob_depot/agent/CMakeLists.linux.txt @@ -27,6 +27,7 @@ target_sources(core-blob_depot-agent PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/read.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/request.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/resolved_value.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/status.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_put.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/agent/storage_get.cpp diff --git a/ydb/core/blob_depot/agent/agent_impl.h b/ydb/core/blob_depot/agent/agent_impl.h index fa79201822..f9b9bb4ef0 100644 --- a/ydb/core/blob_depot/agent/agent_impl.h +++ b/ydb/core/blob_depot/agent/agent_impl.h @@ -1,6 +1,7 @@ #pragma once #include "defs.h" +#include "resolved_value.h" namespace NKikimr::NBlobDepot { @@ -33,8 +34,43 @@ namespace NKikimr::NBlobDepot { struct TTabletDisconnected {}; struct TKeyResolved { - const TResolvedValueChain* ValueChain; - std::optional<TString> ErrorReason; + struct TSuccess { + const TResolvedValue *Value; + }; + struct TError { + TString ErrorReason; + }; + std::variant<TSuccess, TError> Outcome; + + TKeyResolved(const TResolvedValue *value) + : Outcome(TSuccess{value}) + {} + + static constexpr struct TResolutionError {} ResolutionError{}; + + TKeyResolved(TResolutionError, TString errorReason) + : Outcome(TError{std::move(errorReason)}) + {} + + bool Error() const { return std::holds_alternative<TError>(Outcome); } + bool Success() const { return std::holds_alternative<TSuccess>(Outcome); } + const TResolvedValue *GetResolvedValue() const { return std::get<TSuccess>(Outcome).Value; } + + void Output(IOutputStream& s) const { + if (auto *success = std::get_if<TSuccess>(&Outcome)) { + s << (success->Value ? success->Value->ToString() : "<no data>"); + } else if (auto *error = std::get_if<TError>(&Outcome)) { + s << "Error# '" << EscapeC(error->ErrorReason) << '\''; + } else { + Y_FAIL(); + } + } + + TString ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } }; class TRequestSender; @@ -297,6 +333,23 @@ namespace NKikimr::NBlobDepot { virtual void OnIdAllocated() {} virtual void OnDestroy(bool /*success*/) {} + protected: // reading logic + struct TReadContext; + struct TReadArg { + TResolvedValue Value; + NKikimrBlobStorage::EGetHandleClass GetHandleClass; + bool MustRestoreFirst = false; + ui64 Offset = 0; + ui64 Size = 0; + ui64 Tag = 0; + std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData; + TString Key; // the key we are reading -- this is used for retries when we are getting NODATA + }; + + bool IssueRead(TReadArg&& arg, TString& error); + void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg); + void HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg); + public: struct TDeleter { static void Destroy(TQuery *query) { delete query; } @@ -394,26 +447,6 @@ namespace NKikimr::NBlobDepot { TBlocksManager& BlocksManager; //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - // Reading - - struct TReadContext; - struct TReadArg { - const TResolvedValueChain& Values; - NKikimrBlobStorage::EGetHandleClass GetHandleClass; - bool MustRestoreFirst = false; - TQuery *Query = nullptr; - ui64 Offset = 0; - ui64 Size = 0; - ui64 Tag = 0; - std::optional<TEvBlobStorage::TEvGet::TReaderTabletData> ReaderTabletData; - }; - - bool IssueRead(const TReadArg& arg, TString& error); - static TString GetValueChainId(const TResolvedValueChain& valueChain); - - void HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg); - - //////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Blob mapping cache class TBlobMappingCache; @@ -425,6 +458,17 @@ namespace NKikimr::NBlobDepot { TStorageStatusFlags GetStorageStatusFlags() const; float GetApproximateFreeSpaceShare() const; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Logging + + TString PrettyKey(const TString& key) const { + if (VirtualGroupId) { + return TLogoBlobID::FromBinary(key).ToString(); + } else { + return EscapeC(key); + } + } }; #define BDEV_QUERY(MARKER, TEXT, ...) BDEV(MARKER, TEXT, (VG, Agent.VirtualGroupId), (BDT, Agent.TabletId), \ diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp index 49e9d467e5..77800af63a 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.cpp +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.cpp @@ -4,9 +4,11 @@ namespace NKikimr::NBlobDepot { struct TResolveContext : TRequestContext { TString Key; + bool MustRestoreFirst; - TResolveContext(TString key) + TResolveContext(TString key, bool mustRestoreFirst) : Key(std::move(key)) + , MustRestoreFirst(mustRestoreFirst) {} }; @@ -15,60 +17,104 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA28, "HandleResolveResult", (AgentId, Agent.LogId), (Cookie, tag), (Msg, msg)); - auto process = [&](TString key, const NKikimrBlobDepot::TEvResolveResult::TResolvedKey *item) { - const TStringBuf keyBuf = key; - TCachedKeyItem& entry = Cache.try_emplace(std::move(key), keyBuf).first->second; - if (item) { - Y_VERIFY(entry.Key == item->GetKey()); - entry.Values = item->GetValueChain(); + auto process = [&](const auto& item, bool nodata) { + // check if there is an error or no data attached + if (item.HasErrorReason() || item.GetValueChain().empty() || nodata) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA43, "HandleResolveResult error", (AgentId, Agent.LogId), (Item, item), + (NoData, nodata), (Key, Agent.PrettyKey(item.GetKey()))); + + const TString errorReason = item.HasErrorReason() ? item.GetErrorReason() : "no data attached to the key"; + const auto it = Cache.find(item.GetKey()); + if (it != Cache.end()) { + for (const auto& [id, mustRestoreFirst] : it->second.PendingQueries) { + Agent.OnRequestComplete(id, nodata ? TKeyResolved(nullptr) : + TKeyResolved(TKeyResolved::ResolutionError, errorReason), + Agent.OtherRequestInFlight); + } + Cache.erase(it); + } } else { - entry.Values.Clear(); - } - Queue.PushBack(&entry); - entry.ResolveInFlight = false; - for (const ui64 id : std::exchange(entry.PendingQueries, {})) { - Agent.OnRequestComplete(id, TKeyResolved{ - entry.Values.empty() ? nullptr : &entry.Values, - item && item->HasErrorReason() ? std::make_optional(item->GetErrorReason()) : std::nullopt - }, Agent.OtherRequestInFlight); + TString key = item.GetKey(); + TStringBuf keyBuf = key; + const auto [it, inserted] = Cache.try_emplace(std::move(key), keyBuf); + auto& [key_, entry] = *it; + + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA44, "HandleResolveResult success", (AgentId, Agent.LogId), (Item, item), + (NoData, nodata), (Key, Agent.PrettyKey(key_)), (CurrentValue, entry.Value)); + + // update value if it supersedes current one + if (TResolvedValue value(item); value.Supersedes(entry.Value)) { + entry.Value = std::move(value); + } + + // notify matching queries about this + for (auto it = entry.PendingQueries.begin(); it != entry.PendingQueries.end(); ) { + const auto& [id, mustRestoreFirst] = *it; + if (mustRestoreFirst <= entry.Value.ReliablyWritten) { + Agent.OnRequestComplete(id, TKeyResolved(&entry.Value), Agent.OtherRequestInFlight); + entry.PendingQueries.erase(it++); + } else { + ++it; + } + } + + if (context) { + auto& resolveContext = context->Obtain<TResolveContext>(); + if (resolveContext.MustRestoreFirst) { + --entry.MustRestoreFirstResolvePending; + } else { + --entry.OrdinaryResolvePending; + } + } + + Queue.PushBack(&entry); + while (Cache.size() > 1'000'000) { + auto& front = *Queue.Front(); + Cache.erase(front.Key); + } } }; - for (const auto& item : msg.GetResolvedKeys()) { - process(item.GetKey(), &item); - if (context && context->Obtain<TResolveContext>().Key == item.GetKey()) { - context.reset(); + if (!context) { + // aside requests -- just add received data to cache + for (const auto& item : msg.GetResolvedKeys()) { + process(item, false); + } + } else { + auto& resolveContext = context->Obtain<TResolveContext>(); + if (msg.ResolvedKeysSize() == 1) { + const auto& item = msg.GetResolvedKeys(0); + Y_VERIFY(item.GetKey() == resolveContext.Key); + process(item, false); + } else if (msg.ResolvedKeysSize() == 0) { + NKikimrBlobDepot::TEvResolveResult::TResolvedKey item; + item.SetKey(resolveContext.Key); + process(item, true); + } else { + Y_FAIL("unexpected resolve response"); } - } - if (context) { - process(context->Obtain<TResolveContext>().Key, nullptr); } } - const TResolvedValueChain *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, - TRequestContext::TPtr context, bool force) { + const TResolvedValue *TBlobDepotAgent::TBlobMappingCache::ResolveKey(TString key, TQuery *query, + TRequestContext::TPtr context, bool mustRestoreFirst) { + // obtain entry in the cache const TStringBuf keyBuf = key; - auto& entry = Cache.try_emplace(std::move(key), keyBuf).first->second; - if (!entry.Values.empty() && !force) { - return &entry.Values; - } - if (!entry.ResolveInFlight) { - entry.ResolveInFlight = true; + const auto [it, inserted] = Cache.try_emplace(std::move(key), keyBuf); + auto& entry = it->second; - NKikimrBlobDepot::TEvResolve msg; - auto *item = msg.AddItems(); - item->SetExactKey(TString(keyBuf)); - - if (Agent.VirtualGroupId) { - const auto& id = TLogoBlobID::FromBinary(keyBuf); - item->SetTabletId(id.TabletID()); - } + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA45, "ResolveKey", (AgentId, Agent.LogId), (Key, Agent.PrettyKey(it->first)), + (MustRestoreFirst, mustRestoreFirst), (Value, entry.Value), (OrdinaryResolvePending, entry.OrdinaryResolvePending), + (MustRestoreFirstResolvePending, entry.MustRestoreFirstResolvePending)); - Agent.Issue(std::move(msg), this, std::make_unique<TResolveContext>(TString(keyBuf))); + // return cached value if we have all conditions met for this query + if (!entry.Value.IsEmpty() && mustRestoreFirst <= entry.Value.ReliablyWritten) { + return &entry.Value; } + // register query-local request for the key const ui64 id = Agent.NextOtherRequestId++; - const bool inserted1 = entry.PendingQueries.emplace(id).second; + const bool inserted1 = entry.PendingQueries.emplace(id, mustRestoreFirst).second; Y_VERIFY(inserted1); auto cancelCallback = [&entry, id] { const size_t numErased = entry.PendingQueries.erase(id); @@ -76,6 +122,19 @@ namespace NKikimr::NBlobDepot { }; Agent.RegisterRequest(id, query, std::move(context), std::move(cancelCallback), false); + // see if we have to issue the query + if (!entry.MustRestoreFirstResolvePending && (mustRestoreFirst || !entry.OrdinaryResolvePending)) { + NKikimrBlobDepot::TEvResolve msg; + auto *item = msg.AddItems(); + item->SetExactKey(TString(keyBuf)); + if (mustRestoreFirst != item->GetMustRestoreFirst()) { + item->SetMustRestoreFirst(mustRestoreFirst); + } + + Agent.Issue(std::move(msg), this, std::make_unique<TResolveContext>(TString(keyBuf), mustRestoreFirst)); + ++(mustRestoreFirst ? entry.MustRestoreFirstResolvePending : entry.OrdinaryResolvePending); + } + return nullptr; } @@ -85,12 +144,18 @@ namespace NKikimr::NBlobDepot { } else if (std::holds_alternative<TTabletDisconnected>(response)) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA38, "TBlobMappingCache::TTabletDisconnected", (AgentId, Agent.LogId), (Cookie, tag)); - if (auto resolveContext = std::dynamic_pointer_cast<TResolveContext>(context)) { - if (const auto it = Cache.find(resolveContext->Key); it != Cache.end() && it->second.ResolveInFlight) { - for (const ui64 id : std::exchange(it->second.PendingQueries, {})) { - Agent.OnRequestComplete(id, response, Agent.OtherRequestInFlight); - } - it->second.ResolveInFlight = false; + auto& resolveContext = context->Obtain<TResolveContext>(); + if (const auto it = Cache.find(resolveContext.Key); it != Cache.end()) { + for (const auto& [id, mustRestoreFirst] : std::exchange(it->second.PendingQueries, {})) { + Agent.OnRequestComplete(id, TKeyResolved(TKeyResolved::ResolutionError, "BlobDepot tablet disconnected"), + Agent.OtherRequestInFlight); + } + if (!it->second.Value.IsEmpty()) { + --(resolveContext.MustRestoreFirst + ? it->second.MustRestoreFirstResolvePending + : it->second.OrdinaryResolvePending); + } else { + Cache.erase(it); } } } else { diff --git a/ydb/core/blob_depot/agent/blob_mapping_cache.h b/ydb/core/blob_depot/agent/blob_mapping_cache.h index f3d74a7030..57aaa3ff4c 100644 --- a/ydb/core/blob_depot/agent/blob_mapping_cache.h +++ b/ydb/core/blob_depot/agent/blob_mapping_cache.h @@ -9,13 +9,14 @@ namespace NKikimr::NBlobDepot { : public TRequestSender { struct TCachedKeyItem : TIntrusiveListItem<TCachedKeyItem> { - TStringBuf Key; - TResolvedValueChain Values; - bool ResolveInFlight = false; - THashSet<ui64> PendingQueries; + TStringBuf Key; // key buffer (view of key part of the Cache set) + TResolvedValue Value; // recently resolved value, if any + ui32 OrdinaryResolvePending = 0; + ui32 MustRestoreFirstResolvePending = 0; + THashMap<ui64, bool> PendingQueries; // a set of queries waiting for this blob TCachedKeyItem(TStringBuf key) - : Key(std::move(key)) + : Key(key) {} }; @@ -28,7 +29,9 @@ namespace NKikimr::NBlobDepot { {} void HandleResolveResult(ui64 tag, const NKikimrBlobDepot::TEvResolveResult& msg, TRequestContext::TPtr context); - const TResolvedValueChain *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context, bool force = false); + const TResolvedValue *ResolveKey(TString key, TQuery *query, TRequestContext::TPtr context, bool mustRestoreFirst); + + private: void ProcessResponse(ui64 tag, TRequestContext::TPtr /*context*/, TResponse response) override; }; diff --git a/ydb/core/blob_depot/agent/read.cpp b/ydb/core/blob_depot/agent/read.cpp index c4888b6d24..d367c21b56 100644 --- a/ydb/core/blob_depot/agent/read.cpp +++ b/ydb/core/blob_depot/agent/read.cpp @@ -2,27 +2,34 @@ namespace NKikimr::NBlobDepot { - struct TBlobDepotAgent::TReadContext : TRequestContext { - TQuery *Query; - const ui64 Tag; + struct TBlobDepotAgent::TQuery::TReadContext : TRequestContext { + TReadArg ReadArg; const ui64 Size; TString Buffer; bool Terminated = false; ui32 NumPartsPending = 0; + TLogoBlobID BlobWithoutData; - TReadContext(TQuery *query, ui64 tag, ui64 size) - : Query(query) - , Tag(tag) + TReadContext(TReadArg&& readArg, ui64 size) + : ReadArg(std::move(readArg)) , Size(size) {} - void EndWithError(NKikimrProto::EReplyStatus status, TString errorReason) { - Query->OnRead(Tag, status, errorReason); + void EndWithError(TQuery *query, NKikimrProto::EReplyStatus status, TString errorReason) { + query->OnRead(ReadArg.Tag, status, errorReason); Terminated = true; } - void EndWithSuccess() { - Query->OnRead(Tag, NKikimrProto::OK, std::move(Buffer)); + void Abort() { + Terminated = true; + } + + void EndWithSuccess(TQuery *query) { + query->OnRead(ReadArg.Tag, NKikimrProto::OK, std::move(Buffer)); + } + + ui64 GetTag() const { + return ReadArg.Tag; } struct TPartContext : TRequestContext { @@ -35,7 +42,10 @@ namespace NKikimr::NBlobDepot { }; }; - bool TBlobDepotAgent::IssueRead(const TReadArg& arg, TString& error) { + bool TBlobDepotAgent::TQuery::IssueRead(TReadArg&& arg, TString& error) { + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value)); + ui64 outputOffset = 0; struct TReadItem { @@ -50,16 +60,16 @@ namespace NKikimr::NBlobDepot { ui64 offset = arg.Offset; ui64 size = arg.Size; - for (const auto& value : arg.Values) { - const ui32 groupId = value.GetGroupId(); - const auto blobId = LogoBlobIDFromLogoBlobID(value.GetBlobId()); - const ui64 begin = value.GetSubrangeBegin(); - const ui64 end = value.HasSubrangeEnd() ? value.GetSubrangeEnd() : blobId.BlobSize(); + for (const auto& value : arg.Value.Chain) { + const ui32 groupId = value.GroupId; + const auto& blobId = value.BlobId; + const ui32 begin = value.SubrangeBegin; + const ui32 end = value.SubrangeEnd; if (end <= begin || blobId.BlobSize() < end) { error = "incorrect SubrangeBegin/SubrangeEnd pair"; - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, LogId), (TabletId, TabletId), - (Values, FormatList(arg.Values))); + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA24, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value)); return false; } @@ -90,14 +100,14 @@ namespace NKikimr::NBlobDepot { if (size) { error = "incorrect offset/size provided"; - STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, LogId), (TabletId, TabletId), - (Offset, arg.Offset), (Size, arg.Size), (Values, FormatList(arg.Values))); + STLOG(PRI_ERROR, BLOB_DEPOT_AGENT, BDA25, error, (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, arg.Tag), (Offset, arg.Offset), (Size, arg.Size), (Value, arg.Value)); return false; } - auto context = std::make_shared<TReadContext>(arg.Query, arg.Tag, outputOffset); + auto context = std::make_shared<TReadContext>(std::move(arg), outputOffset); if (!outputOffset) { - context->EndWithSuccess(); + context->EndWithSuccess(this); return true; } @@ -118,9 +128,15 @@ namespace NKikimr::NBlobDepot { partContext->Offsets.push_back(outputOffset); } - auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), arg.GetHandleClass, arg.MustRestoreFirst); - event->ReaderTabletData = arg.ReaderTabletData; - SendToProxy(groupId, std::move(event), arg.Query, std::move(partContext)); + // when the TEvGet query is sent to the underlying proxy, MustRestoreFirst must be cleared, or else it may + // lead to ERROR due to impossibility of writes; all MustRestoreFirst should be handled by the TEvResolve + // query + auto event = std::make_unique<TEvBlobStorage::TEvGet>(q, sz, TInstant::Max(), context->ReadArg.GetHandleClass, + context->ReadArg.MustRestoreFirst && groupId != Agent.DecommitGroupId); + event->ReaderTabletData = context->ReadArg.ReaderTabletData; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA39, "issuing TEvGet", (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, context->GetTag()), (GroupId, groupId), (Msg, *event)); + Agent.SendToProxy(groupId, std::move(event), this, std::move(partContext)); ++context->NumPartsPending; } @@ -129,35 +145,32 @@ namespace NKikimr::NBlobDepot { return true; } - TString TBlobDepotAgent::GetValueChainId(const TResolvedValueChain& valueChain) { - constexpr ui8 separator = 7; - TString s; - for (auto it = valueChain.begin(); it != valueChain.end(); ++it) { - if (it != valueChain.begin()) { - s += separator; - } - const bool success = it->AppendToString(&s); - Y_VERIFY_DEBUG(success); - } - return s; - } - - void TBlobDepotAgent::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) { + void TBlobDepotAgent::TQuery::HandleGetResult(const TRequestContext::TPtr& context, TEvBlobStorage::TEvGetResult& msg) { auto& partContext = context->Obtain<TReadContext::TPartContext>(); auto& readContext = *partContext.Read; + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA41, "HandleGetResult", (AgentId, Agent.LogId), (ReadId, readContext.GetTag()), + (Msg, msg)); if (readContext.Terminated) { return; // just ignore this read } if (msg.Status != NKikimrProto::OK) { - readContext.EndWithError(msg.Status, std::move(msg.ErrorReason)); + readContext.EndWithError(this, msg.Status, std::move(msg.ErrorReason)); } else { Y_VERIFY(msg.ResponseSz == partContext.Offsets.size()); for (ui32 i = 0; i < msg.ResponseSz; ++i) { auto& blob = msg.Responses[i]; - if (blob.Status != NKikimrProto::OK) { - return readContext.EndWithError(blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id); + if (blob.Status == NKikimrProto::NODATA) { + NKikimrBlobDepot::TEvResolve resolve; + auto *item = resolve.AddItems(); + item->SetExactKey(readContext.ReadArg.Key); + Agent.Issue(std::move(resolve), this, partContext.Read); + readContext.Abort(); + readContext.BlobWithoutData = blob.Id; + return; + } else if (blob.Status != NKikimrProto::OK) { + return readContext.EndWithError(this, blob.Status, TStringBuilder() << "failed to read BlobId# " << blob.Id); } auto& buffer = readContext.Buffer; @@ -177,8 +190,38 @@ namespace NKikimr::NBlobDepot { } if (!--readContext.NumPartsPending) { - readContext.EndWithSuccess(); + readContext.EndWithSuccess(this); + } + } + } + + void TBlobDepotAgent::TQuery::HandleResolveResult(const TRequestContext::TPtr& context, TEvBlobDepot::TEvResolveResult& msg) { + auto& readContext = context->Obtain<TReadContext>(); + STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA42, "HandleResolveResult", (AgentId, Agent.LogId), (QueryId, GetQueryId()), + (ReadId, readContext.GetTag()), (Msg, msg.Record)); + if (msg.Record.GetStatus() != NKikimrProto::OK) { + readContext.EndWithError(this, msg.Record.GetStatus(), msg.Record.GetErrorReason()); + } else if (msg.Record.ResolvedKeysSize() == 1) { + const auto& item = msg.Record.GetResolvedKeys(0); + if (TResolvedValue value(item); value.Supersedes(readContext.ReadArg.Value)) { // value chain has changed, we have to try again + readContext.ReadArg.Value = std::move(value); + TString error; + if (!IssueRead(std::move(readContext.ReadArg), error)) { + readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to restart read Error# " << error); + } + } else if (!item.GetReliablyWritten()) { // this was unassimilated value and we got NODATA for it + readContext.EndWithError(this, NKikimrProto::NODATA, {}); + } else { + Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " QueryId# " << GetQueryId() + << " ReadId# " << readContext.GetTag() << " BlobId# " << readContext.BlobWithoutData); + STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to read blob -- data is lost", (AgentId, Agent.LogId), + (QueryId, GetQueryId()), (ReadId, readContext.GetTag()), (BlobId, readContext.BlobWithoutData)); + readContext.EndWithError(this, NKikimrProto::ERROR, TStringBuilder() << "failed to read BlobId# " + << readContext.BlobWithoutData << ": data is lost"); } + } else { + Y_VERIFY(!msg.Record.ResolvedKeysSize()); + readContext.EndWithError(this, NKikimrProto::NODATA, {}); } } diff --git a/ydb/core/blob_depot/agent/resolved_value.cpp b/ydb/core/blob_depot/agent/resolved_value.cpp new file mode 100644 index 0000000000..a4564c5d75 --- /dev/null +++ b/ydb/core/blob_depot/agent/resolved_value.cpp @@ -0,0 +1,65 @@ +#include "resolved_value.h" + +namespace NKikimr::NBlobDepot { + + TResolvedValue::TLink::TLink(const NKikimrBlobDepot::TResolvedValueChain& link) + : BlobId(LogoBlobIDFromLogoBlobID(link.GetBlobId())) + , GroupId(link.GetGroupId()) + , SubrangeBegin(link.GetSubrangeBegin()) + , SubrangeEnd(link.HasSubrangeEnd() ? link.GetSubrangeEnd() : BlobId.BlobSize()) + { + Y_VERIFY_DEBUG(link.HasBlobId() && link.HasGroupId()); + } + + void TResolvedValue::TLink::Output(IOutputStream& s) const { + s << BlobId << '@' << GroupId << '{' << SubrangeBegin << '-' << SubrangeEnd - 1 << '}'; + } + + TString TResolvedValue::TLink::ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + + TResolvedValue::TResolvedValue(const NKikimrBlobDepot::TEvResolveResult::TResolvedKey& item) + : Defined(true) + , ReliablyWritten(item.GetReliablyWritten()) + , Version(item.GetValueVersion()) + , Chain([](auto& x) { return decltype(Chain)(x.begin(), x.end()); }(item.GetValueChain())) + {} + + bool TResolvedValue::Supersedes(const TResolvedValue& old) const { + Y_VERIFY(Defined); + if (!old.Defined) { + return true; + } else if (Version < old.Version) { + return false; + } else if (Version == old.Version) { + Y_VERIFY(Chain == old.Chain && ReliablyWritten == old.ReliablyWritten); + return false; + } else { + Y_VERIFY(old.ReliablyWritten <= ReliablyWritten); // item can't suddenly become unreliably written + return true; + } + } + + void TResolvedValue::Output(IOutputStream& s) const { + if (Defined) { + s << '{' << FormatList(Chain) << " Version# " << Version << " ReliablyWritten# " << ReliablyWritten << '}'; + } else { + s << "{}"; + } + } + + TString TResolvedValue::ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } + +} // NKikimr::NBlobDepot + +template<> +void Out<NKikimr::NBlobDepot::TResolvedValue::TLink>(IOutputStream& s, const NKikimr::NBlobDepot::TResolvedValue::TLink& x) { + x.Output(s); +} diff --git a/ydb/core/blob_depot/agent/resolved_value.h b/ydb/core/blob_depot/agent/resolved_value.h new file mode 100644 index 0000000000..a3194e65ec --- /dev/null +++ b/ydb/core/blob_depot/agent/resolved_value.h @@ -0,0 +1,47 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr::NBlobDepot { + + struct TResolvedValue { + struct TLink { + TLogoBlobID BlobId; + ui32 GroupId; + ui32 SubrangeBegin; + ui32 SubrangeEnd; + + TLink(const NKikimrBlobDepot::TResolvedValueChain& link); + + void Output(IOutputStream& s) const; + TString ToString() const; + + friend bool operator ==(const TLink& x, const TLink& y) { + return x.BlobId == y.BlobId && x.GroupId == y.GroupId && x.SubrangeBegin == y.SubrangeBegin && + x.SubrangeEnd == y.SubrangeEnd; + } + }; + + bool Defined = false; + bool ReliablyWritten = false; + ui32 Version = 0; + std::vector<TLink> Chain; + + TResolvedValue() = default; + TResolvedValue(const TResolvedValue&) = default; + TResolvedValue(TResolvedValue&&) = default; + TResolvedValue(const NKikimrBlobDepot::TEvResolveResult::TResolvedKey& item); + + TResolvedValue& operator =(const TResolvedValue&) = default; + TResolvedValue& operator =(TResolvedValue&&) = default; + + bool Supersedes(const TResolvedValue& old) const; + void Output(IOutputStream& s) const; + TString ToString() const; + + bool IsEmpty() const { // check if no data attached + return Chain.empty(); + } + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/agent/storage_discover.cpp b/ydb/core/blob_depot/agent/storage_discover.cpp index 27bfab0593..da5deaf459 100644 --- a/ydb/core/blob_depot/agent/storage_discover.cpp +++ b/ydb/core/blob_depot/agent/storage_discover.cpp @@ -16,10 +16,6 @@ namespace NKikimr::NBlobDepot { TString Buffer; ui32 BlockedGeneration = 0; - std::unordered_set<TString> ValueChainsWithNodata; - TString ValueChain; - bool IsUnassimilated = false; - NKikimrBlobDepot::TEvResolve Resolve; public: @@ -69,9 +65,13 @@ namespace NKikimr::NBlobDepot { if (std::holds_alternative<TTabletDisconnected>(response)) { return EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { - Agent.HandleGetResult(context, **p); + TQuery::HandleGetResult(context, **p); } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { - HandleResolveResult(id, std::move(context), **p); + if (context) { + TQuery::HandleResolveResult(std::move(context), **p); + } else { + HandleResolveResult(id, std::move(context), **p); + } } else { Y_FAIL(); } @@ -115,20 +115,18 @@ namespace NKikimr::NBlobDepot { // FIXME(alexvru): hypothetically this can be considered normal and we may continue scan return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "empty ValueChain"); } - ValueChain = GetValueChainId(item.GetValueChain()); - IsUnassimilated = item.ValueChainSize() == 1 && item.GetValueChain(0).GetGroupId() == Agent.DecommitGroupId && - LogoBlobIDFromLogoBlobID(item.GetValueChain(0).GetBlobId()) == Id; TReadArg arg{ - item.GetValueChain(), + item, NKikimrBlobStorage::Discover, true, - this, 0, 0, 0, - {}}; + {}, + item.GetKey(), + }; TString error; - if (!Agent.IssueRead(arg, error)) { + if (!IssueRead(std::move(arg), error)) { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " << error); } @@ -152,29 +150,26 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA20, "OnRead", (AgentId, Agent.LogId), (QueryId, GetQueryId()), (Status, status)); - if (status == NKikimrProto::OK) { - Buffer = std::move(dataOrErrorReason); - DoneWithData = true; - CheckIfDone(); - } else if (status == NKikimrProto::NODATA) { - if (ValueChainsWithNodata.insert(std::exchange(ValueChain, {})).second) { - // this may indicate a data race between locator and key value, we have to restart our resolution query - IssueResolve(); - } else if (IsUnassimilated) { + switch (status) { + case NKikimrProto::OK: + Buffer = std::move(dataOrErrorReason); + DoneWithData = true; + CheckIfDone(); + break; + + case NKikimrProto::NODATA: { // we are reading blob from the original group and it may be partially written -- it is totally // okay to have some; we need to advance to the next readable blob auto *range = Resolve.MutableItems(0)->MutableKeyRange(); range->SetEndingKey(Id.AsBinaryString()); range->ClearIncludeEnding(); IssueResolve(); - } else { - Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << Id); - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA39, "failed to Discover blob -- data is lost", - (AgentId, Agent.LogId), (BlobId, Id)); - status = NKikimrProto::ERROR; + break; } - } else { - EndWithError(status, dataOrErrorReason); + + default: + EndWithError(status, std::move(dataOrErrorReason)); + break; } } diff --git a/ydb/core/blob_depot/agent/storage_get.cpp b/ydb/core/blob_depot/agent/storage_get.cpp index a923a5ec53..4e97c34738 100644 --- a/ydb/core/blob_depot/agent/storage_get.cpp +++ b/ydb/core/blob_depot/agent/storage_get.cpp @@ -9,8 +9,6 @@ namespace NKikimr::NBlobDepot { class TGetQuery : public TBlobStorageQuery<TEvBlobStorage::TEvGet> { std::unique_ptr<TEvBlobStorage::TEvGetResult> Response; ui32 AnswersRemain; - std::vector<TString> ValueChainsInFlight; - std::unordered_set<std::tuple<ui64, TString>> ValueChainsWithNodata; struct TResolveKeyContext : TRequestContext { ui32 QueryIdx; @@ -35,7 +33,6 @@ namespace NKikimr::NBlobDepot { Response = std::make_unique<TEvBlobStorage::TEvGetResult>(NKikimrProto::OK, Request.QuerySize, Agent.VirtualGroupId); AnswersRemain = Request.QuerySize; - ValueChainsInFlight.resize(Request.QuerySize); if (Request.ReaderTabletData) { auto status = Agent.BlocksManager.CheckBlockForTablet(Request.ReaderTabletData->Id, Request.ReaderTabletData->Generation, this, nullptr); @@ -54,9 +51,9 @@ namespace NKikimr::NBlobDepot { response.RequestedSize = query.Size; TString blobId = query.Id.AsBinaryString(); - if (const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey(blobId, this, - std::make_shared<TResolveKeyContext>(i))) { - if (!ProcessSingleResult(i, value, std::nullopt)) { + if (const TResolvedValue *value = Agent.BlobMappingCache.ResolveKey(blobId, this, + std::make_shared<TResolveKeyContext>(i), Request.MustRestoreFirst)) { + if (!ProcessSingleResult(i, value)) { return; // error occured } } else { @@ -68,52 +65,35 @@ namespace NKikimr::NBlobDepot { CheckAndFinish(); } - bool ProcessSingleResult(ui32 queryIdx, const TResolvedValueChain *value, const std::optional<TString>& errorReason) { + bool ProcessSingleResult(ui32 queryIdx, const TKeyResolved& result) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA27, "ProcessSingleResult", (AgentId, Agent.LogId), - (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Value, value), (ErrorReason, errorReason)); + (QueryId, GetQueryId()), (QueryIdx, queryIdx), (Result, result)); auto& r = Response->Responses[queryIdx]; Y_VERIFY(r.Status == NKikimrProto::UNKNOWN); - if (errorReason) { + if (result.Error()) { r.Status = NKikimrProto::ERROR; --AnswersRemain; - } else if (!value || value->empty()) { + } else if (const TResolvedValue *value = result.GetResolvedValue(); !value) { r.Status = NKikimrProto::NODATA; --AnswersRemain; } else if (Request.IsIndexOnly) { r.Status = NKikimrProto::OK; --AnswersRemain; } else { - ValueChainsInFlight[queryIdx] = GetValueChainId(*value); + Y_VERIFY(Request.MustRestoreFirst <= value->ReliablyWritten); TReadArg arg{ *value, Request.GetHandleClass, Request.MustRestoreFirst, - this, Request.Queries[queryIdx].Shift, Request.Queries[queryIdx].Size, queryIdx, - Request.ReaderTabletData}; - TString error; - auto makeValueChain = [&] { - TStringStream str; - str << '['; - for (int i = 0; i < value->size(); ++i) { - const auto& item = value->at(i); - if (i != 0) { - str << ' '; - } - const auto blobId = LogoBlobIDFromLogoBlobID(item.GetBlobId()); - const ui64 subrangeBegin = item.GetSubrangeBegin(); - const ui64 subrangeEnd = item.HasSubrangeEnd() ? item.GetSubrangeEnd() : blobId.BlobSize(); - str << blobId << '@' << item.GetGroupId() << '{' << subrangeBegin << '-' << (subrangeEnd - 1) << '}'; - } - str << ']'; - return str.Str(); + Request.ReaderTabletData, + Request.Queries[queryIdx].Id.AsBinaryString(), }; - STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA34, "IssueRead", (AgentId, Agent.LogId), - (Offset, arg.Offset), (Size, arg.Size), (ValueChain, makeValueChain()), (Tag, arg.Tag)); - const bool success = Agent.IssueRead(arg, error); + TString error; + const bool success = IssueRead(std::move(arg), error); if (!success) { EndWithError(NKikimrProto::ERROR, std::move(error)); return false; @@ -127,24 +107,6 @@ namespace NKikimr::NBlobDepot { (Tag, tag), (Status, status), (Buffer.size, status == NKikimrProto::OK ? buffer.size() : 0), (ErrorReason, status != NKikimrProto::OK ? buffer : "")); - if (status == NKikimrProto::NODATA) { // we have to retry this read, this may be a race between blob movement - const auto& q = Request.Queries[tag]; - if (ValueChainsWithNodata.emplace(tag, std::exchange(ValueChainsInFlight[tag], {})).second) { // real race - const TResolvedValueChain *value = Agent.BlobMappingCache.ResolveKey( - q.Id.AsBinaryString(), - this, - std::make_shared<TResolveKeyContext>(tag), - true); - Y_VERIFY(!value); - return; - } else { - Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << q.Id); - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA41, "failed to Get blob -- data is lost", - (AgentId, Agent.LogId), (BlobId, q.Id)); - status = NKikimrProto::ERROR; - } - } - auto& resp = Response->Responses[tag]; Y_VERIFY(resp.Status == NKikimrProto::UNKNOWN); resp.Status = status; @@ -195,10 +157,12 @@ namespace NKikimr::NBlobDepot { void ProcessResponse(ui64 /*id*/, TRequestContext::TPtr context, TResponse response) override { if (auto *p = std::get_if<TKeyResolved>(&response)) { - ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, p->ValueChain, p->ErrorReason); + ProcessSingleResult(context->Obtain<TResolveKeyContext>().QueryIdx, *p); CheckAndFinish(); } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { - Agent.HandleGetResult(context, **p); + TQuery::HandleGetResult(context, **p); + } else if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { + TQuery::HandleResolveResult(context, **p); } else if (std::holds_alternative<TTabletDisconnected>(response)) { if (auto *resolveContext = dynamic_cast<TResolveKeyContext*>(context.get())) { STLOG(PRI_DEBUG, BLOB_DEPOT_AGENT, BDA26, "TTabletDisconnected", (AgentId, Agent.LogId), diff --git a/ydb/core/blob_depot/agent/storage_range.cpp b/ydb/core/blob_depot/agent/storage_range.cpp index 719a09f757..70e50a56f1 100644 --- a/ydb/core/blob_depot/agent/storage_range.cpp +++ b/ydb/core/blob_depot/agent/storage_range.cpp @@ -7,7 +7,6 @@ namespace NKikimr::NBlobDepot { class TRangeQuery : public TBlobStorageQuery<TEvBlobStorage::TEvRange> { struct TRead { TLogoBlobID Id; - TString ValueChain; }; std::unique_ptr<TEvBlobStorage::TEvRangeResult> Response; @@ -15,7 +14,6 @@ namespace NKikimr::NBlobDepot { ui32 ResolvesInFlight = 0; std::map<TLogoBlobID, TString> FoundBlobs; std::vector<TRead> Reads; - std::unordered_set<std::tuple<ui64, TString>> ValueChainsWithNodata; // processed value chains with this status bool Reverse = false; bool Finished = false; @@ -68,9 +66,13 @@ namespace NKikimr::NBlobDepot { void ProcessResponse(ui64 id, TRequestContext::TPtr context, TResponse response) override { if (auto *p = std::get_if<TEvBlobDepot::TEvResolveResult*>(&response)) { - HandleResolveResult(id, std::move(context), (*p)->Record); + if (context) { + TQuery::HandleResolveResult(std::move(context), **p); + } else { + HandleResolveResult(id, std::move(context), (*p)->Record); + } } else if (auto *p = std::get_if<TEvBlobStorage::TEvGetResult*>(&response)) { - Agent.HandleGetResult(context, **p); + TQuery::HandleGetResult(context, **p); } else if (std::holds_alternative<TTabletDisconnected>(response)) { EndWithError(NKikimrProto::ERROR, "BlobDepot tablet disconnected"); } else { @@ -96,25 +98,24 @@ namespace NKikimr::NBlobDepot { FoundBlobs.try_emplace(id); } else if (key.ValueChainSize()) { const ui64 tag = key.HasCookie() ? key.GetCookie() : Reads.size(); - TString valueChain = GetValueChainId(key.GetValueChain()); if (tag == Reads.size()) { - Reads.push_back(TRead{id, std::move(valueChain)}); + Reads.push_back(TRead{id}); } else { Y_VERIFY(Reads[tag].Id == id); - Reads[tag].ValueChain = std::move(valueChain); } TReadArg arg{ - key.GetValueChain(), + key, NKikimrBlobStorage::EGetHandleClass::FastRead, Request.MustRestoreFirst, - this, 0, 0, tag, - {}}; + {}, + key.GetKey(), + }; ++ReadsInFlight; TString error; - if (!Agent.IssueRead(arg, error)) { + if (!IssueRead(std::move(arg), error)) { return EndWithError(NKikimrProto::ERROR, TStringBuilder() << "failed to read discovered blob: " << error); } @@ -133,7 +134,6 @@ namespace NKikimr::NBlobDepot { Y_VERIFY(tag < Reads.size()); TRead& read = Reads[tag]; - Y_VERIFY(read.ValueChain); switch (status) { case NKikimrProto::OK: { @@ -144,15 +144,8 @@ namespace NKikimr::NBlobDepot { } case NKikimrProto::NODATA: - if (ValueChainsWithNodata.emplace(tag, std::exchange(read.ValueChain, {})).second) { // real race - IssueResolve(tag); - } else { - Y_VERIFY_DEBUG_S(false, "data is lost AgentId# " << Agent.LogId << " BlobId# " << read.Id); - STLOG(PRI_CRIT, BLOB_DEPOT_AGENT, BDA40, "failed to ReadRange blob -- data is lost", - (AgentId, Agent.LogId), (BlobId, read.Id)); - return EndWithError(status, TStringBuilder() << "failed to retrieve BlobId# " - << read.Id << " data is lost"); - } + // this blob has just vanished since we found it in index -- may be it was partially written and + // now gone; it's okay to have this situation, not a data loss break; default: diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index 2a31d58e96..41e91e49e2 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -314,8 +314,8 @@ namespace NKikimr::NBlobDepot { } }; - Self->Data->ScanRange(LastScannedKey ? std::make_optional<TData::TKey>(*LastScannedKey) : std::nullopt, - std::optional<TData::TKey>(), {}, callback); + TData::TScanRange r{LastScannedKey ? TData::TKey(*LastScannedKey) : TData::TKey::Min(), TData::TKey::Max()}; + Self->Data->ScanRange(r, nullptr, nullptr, callback); STLOG(PRI_DEBUG, BLOB_DEPOT, BDT56, "ScanDataForCopying step", (Id, Self->GetLogId()), (LastScannedKey, LastScannedKey), (ScanQ.size, ScanQ.size()), (TotalSize, TotalSize), diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index d2efc2bff7..c13e342ab2 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -15,6 +15,8 @@ namespace NKikimr::NBlobDepot { using NTabletFlatExecutor::TTabletExecutedFlat; + struct TToken {}; + class TBlobDepot : public TActor<TBlobDepot> , public TTabletExecutedFlat @@ -50,7 +52,6 @@ namespace NKikimr::NBlobDepot { static constexpr TDuration ExpirationTimeout = TDuration::Minutes(1); - struct TToken {}; std::shared_ptr<TToken> Token = std::make_shared<TToken>(); struct TAgent { diff --git a/ydb/core/blob_depot/closed_interval_set.h b/ydb/core/blob_depot/closed_interval_set.h index df28dfaadb..b8338b37fc 100644 --- a/ydb/core/blob_depot/closed_interval_set.h +++ b/ydb/core/blob_depot/closed_interval_set.h @@ -102,6 +102,37 @@ namespace NKikimr { return *this; } + static std::optional<std::pair<T, T>> PartialSubtractFromRange(T myLeft, T myRight, const TClosedIntervalSet& other) { + for (auto otherIt = other.Intervals.begin(); otherIt != other.Intervals.end(); ) { + if (myRight < otherIt->Left) { + break; + } else if (otherIt->Right < myLeft) { + ++otherIt; + if (otherIt != other.Intervals.end() && otherIt->Right < myLeft) { + otherIt = other.Intervals.lower_bound(TByRight{myLeft}); + } + } else if (otherIt->Left <= myLeft) { + if (myRight <= otherIt->Right) { + return std::nullopt; + } else { + myLeft = otherIt->Right; + ++otherIt; + } + } else if (myRight <= otherIt->Right) { + myRight = otherIt->Left; + break; + } else { + if (otherIt->Left < otherIt->Right) { + myRight = otherIt->Left; + break; + } + ++otherIt; + } + } + + return std::make_pair(std::move(myLeft), std::move(myRight)); + } + // returns the first subrange of the full subtraction result std::optional<std::pair<T, T>> PartialSubtract(const TClosedIntervalSet& other) const { if (auto myIt = Intervals.begin(); myIt != Intervals.end()) { @@ -161,6 +192,79 @@ namespace NKikimr { } return true; } + + template<typename TCallback> + void EnumInRange(const T& left, const T& right, bool reverse, TCallback&& callback) const { + if (reverse) { + const T *cursor = &right; + for (auto it = Intervals.upper_bound(TByLeft{right}); it != Intervals.begin(); ) { + --it; + if (it->Right < *cursor) { + if (it->Right < left) { + callback(left, *cursor, false); + return; + } + if (!callback(it->Right, *cursor, false)) { + return; + } + cursor = &it->Right; + } + if (it->Left <= left) { + callback(left, *cursor, true); + return; + } + if (!callback(it->Left, *cursor, true)) { + return; + } + cursor = &it->Left; + } + if (left < *cursor) { + callback(left, *cursor, false); + } + } else { + const T *cursor = &left; + for (auto it = Intervals.lower_bound(TByRight{left}); it != Intervals.end(); ++it) { + if (*cursor < it->Left) { + if (right < it->Left) { + callback(*cursor, right, false); + return; + } + if (!callback(*cursor, it->Left, false)) { + return; + } + cursor = &it->Left; + } + if (right <= it->Right) { + callback(*cursor, right, true); + return; + } + if (!callback(*cursor, it->Right, true)) { + return; + } + cursor = &it->Right; + } + if (*cursor < right) { + callback(*cursor, right, false); + } + } + } + + void Output(IOutputStream& s) const { + s << '{'; + for (auto it = Intervals.begin(); it != Intervals.end(); ++it) { + if (it != Intervals.begin()) { + s << ' '; + } + s << it->Left << '-' << it->Right; + } + s << '}'; + } + + TString ToString() const { + TStringStream s; + Output(s); + return s.Str(); + } }; } // NKikimr diff --git a/ydb/core/blob_depot/closed_interval_set_ut.cpp b/ydb/core/blob_depot/closed_interval_set_ut.cpp index cb6d70a555..484074efa7 100644 --- a/ydb/core/blob_depot/closed_interval_set_ut.cpp +++ b/ydb/core/blob_depot/closed_interval_set_ut.cpp @@ -19,7 +19,7 @@ TString ToString(const T& ivs) { ui64 Convert(const T& ivs) { ui64 res = 0; ivs([&](ui8 first, ui8 last) { - const ui64 mask = ((ui64(1) << (last - first + 1)) - 1) << first; + const ui64 mask = (ui64(1) << last + 1) - (ui64(1) << first); UNIT_ASSERT_VALUES_EQUAL_C(res & mask, 0, ToString(ivs)); res |= mask; return true; @@ -52,7 +52,7 @@ Y_UNIT_TEST_SUITE(ClosedIntervalSet) { for (ui32 end = begin; end <= 12; ++end) { T x = Make(i); x |= {begin, end}; - UNIT_ASSERT_VALUES_EQUAL(Convert(x), i | ((1 << (end - begin + 1)) - 1) << begin); + UNIT_ASSERT_VALUES_EQUAL(Convert(x), i | ((1 << end + 1) - (1 << begin))); } } } @@ -103,4 +103,52 @@ Y_UNIT_TEST_SUITE(ClosedIntervalSet) { } } + Y_UNIT_TEST(EnumInRange) { + for (ui32 i = 0; i < 4096; ++i) { + T x = Make(i); + for (ui32 j = 0; j <= 12; ++j) { + for (ui32 k = j; k <= 12; ++k) { + ui8 expectedLeft = j; + ui32 res = 0; + x.EnumInRange(j, k, false, [&](ui8 left, ui8 right, bool inside) { + UNIT_ASSERT_VALUES_EQUAL(left, expectedLeft); + UNIT_ASSERT(left <= right); + expectedLeft = right; + if (inside) { + const ui32 mask = (1 << right + 1) - (1 << left); + res |= mask; + } + return true; + }); + UNIT_ASSERT_VALUES_EQUAL(expectedLeft, k); + UNIT_ASSERT_VALUES_EQUAL(res, i & ((1 << k + 1) - (1 << j))); + } + } + } + } + + Y_UNIT_TEST(EnumInRangeReverse) { + for (ui32 i = 0; i < 4096; ++i) { + T x = Make(i); + for (ui32 j = 0; j <= 12; ++j) { + for (ui32 k = j; k <= 12; ++k) { + ui8 expectedRight = k; + ui32 res = 0; + x.EnumInRange(j, k, true, [&](ui8 left, ui8 right, bool inside) { + UNIT_ASSERT_VALUES_EQUAL(right, expectedRight); + UNIT_ASSERT(left <= right); + expectedRight = left; + if (inside) { + const ui32 mask = (1 << right + 1) - (1 << left); + res |= mask; + } + return true; + }); + UNIT_ASSERT_VALUES_EQUAL(expectedRight, j); + UNIT_ASSERT_VALUES_EQUAL(res, i & ((1 << k + 1) - (1 << j))); + } + } + } + } + } diff --git a/ydb/core/blob_depot/coro_tx.cpp b/ydb/core/blob_depot/coro_tx.cpp new file mode 100644 index 0000000000..319bcb8573 --- /dev/null +++ b/ydb/core/blob_depot/coro_tx.cpp @@ -0,0 +1,200 @@ +#include "coro_tx.h" + +namespace NKikimr::NBlobDepot { + + thread_local TCoroTx *TCoroTx::Current = nullptr; + + enum class EOutcome { + UNSET, + FINISH_TX, + RESTART_TX, + RUN_SUCCESSOR_TX, + END_CORO + }; + +#ifndef NDEBUG + static constexpr ui64 StackSentinel = 0x8E0CDBFD41F04520; + static constexpr size_t NumStackSentinels = 8; +#endif + + class TCoroTx::TContext : public ITrampoLine { + TMappedAllocation Stack; + TExceptionSafeContext Context; + TExceptionSafeContext *BackContext = nullptr; + + EOutcome Outcome = EOutcome::UNSET; + + std::weak_ptr<TToken> Token; + std::function<void()> Body; + + bool Finished = false; + + public: + TContext(const std::weak_ptr<TToken>& token, std::function<void()>&& body) + : Stack(65536) + , Context({this, TArrayRef(Stack.Begin(), Stack.End())}) + , Token(token) + , Body(std::move(body)) + { +#ifndef NDEBUG + char *p; +# if STACK_GROW_DOWN + p = Stack.Begin(); +# else + p = Stack.End() - sizeof(StackSentinel) * NumStackSentinels; +# endif + for (size_t i = 0; i < NumStackSentinels; ++i) { + memcpy(p + i * sizeof(StackSentinel), &StackSentinel, sizeof(StackSentinel)); + } +#endif + } + + ~TContext() { + if (!Finished) { + Finished = true; + Resume(); + } + } + + EOutcome Resume() { + Outcome = EOutcome::UNSET; + + TExceptionSafeContext returnContext; + Y_VERIFY(!BackContext); + BackContext = &returnContext; + returnContext.SwitchTo(&Context); + Y_VERIFY(BackContext == &returnContext); + BackContext = nullptr; + + // validate stack +#ifndef NDEBUG + char *p; +# if STACK_GROW_DOWN + p = Stack.Begin(); +# else + p = Stack.End() - sizeof(StackSentinel) * NumStackSentinels; +# endif + for (size_t i = 0; i < NumStackSentinels; ++i) { + ui64 temp; + memcpy(&temp, p + i * sizeof(StackSentinel), sizeof(StackSentinel)); + Y_VERIFY(StackSentinel == temp); + } +#endif + + Y_VERIFY(Outcome != EOutcome::UNSET); + return Outcome; + } + + void Return(EOutcome outcome) { + Y_VERIFY(Outcome == EOutcome::UNSET); + Outcome = outcome; + Y_VERIFY(BackContext); + Context.SwitchTo(BackContext); + if (Token.expired() || Finished) { + throw TExDead(); + } + } + + private: + void DoRun() override { + if (!Token.expired()) { + try { + Body(); + } catch (const TExDead&) { + // just do nothing + } + } + Finished = true; + Return(EOutcome::END_CORO); + } + }; + + TCoroTx::TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body) + : TTransactionBase(self) + , Context(std::make_unique<TContext>(token, std::move(body))) + {} + + TCoroTx::TCoroTx(TCoroTx& predecessor) + : TTransactionBase(predecessor.Self) + , Context(std::move(predecessor.Context)) + {} + + TCoroTx::~TCoroTx() + {} + + bool TCoroTx::Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) { + // prepare environment + Y_VERIFY(TxContext == nullptr && Current == nullptr); + TxContext = &txc; + Current = this; + + Y_VERIFY(Context); + const EOutcome outcome = Context->Resume(); + + // clear environment back + Y_VERIFY(TxContext == &txc && Current == this); + TxContext = nullptr; + Current = nullptr; + + switch (outcome) { + case EOutcome::FINISH_TX: + return true; + + case EOutcome::RESTART_TX: + return false; + + default: + Y_FAIL(); + } + } + + void TCoroTx::Complete(const TActorContext&) { + // prepare environment + Y_VERIFY(TxContext == nullptr && Current == nullptr); + Current = this; + + Y_VERIFY(Context); + const EOutcome outcome = Context->Resume(); + + // clear environment back + Y_VERIFY(TxContext == nullptr && Current == this); + Current = nullptr; + + switch (outcome) { + case EOutcome::RUN_SUCCESSOR_TX: + Self->Execute(std::make_unique<TCoroTx>(*this)); + break; + + case EOutcome::END_CORO: + break; + + default: + Y_FAIL(); + } + } + + TCoroTx *TCoroTx::CurrentTx() { + return Current; + } + + NTabletFlatExecutor::TTransactionContext *TCoroTx::GetTxc() { + Y_VERIFY(Current->TxContext); + return Current->TxContext; + } + + void TCoroTx::FinishTx() { + Y_VERIFY(Current); + Current->Context->Return(EOutcome::FINISH_TX); + } + + void TCoroTx::RestartTx() { + Y_VERIFY(Current); + Current->Context->Return(EOutcome::RESTART_TX); + } + + void TCoroTx::RunSuccessorTx() { + Y_VERIFY(Current); + Current->Context->Return(EOutcome::RUN_SUCCESSOR_TX); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/coro_tx.h b/ydb/core/blob_depot/coro_tx.h new file mode 100644 index 0000000000..f731ff88dd --- /dev/null +++ b/ydb/core/blob_depot/coro_tx.h @@ -0,0 +1,33 @@ +#pragma once + +#include "defs.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + struct TExDead {}; + + class TCoroTx : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { + class TContext; + std::unique_ptr<TContext> Context; + TTransactionContext *TxContext = nullptr; + static thread_local TCoroTx *Current; + + public: + TCoroTx(TBlobDepot *self, const std::weak_ptr<TToken>& token, std::function<void()> body); + TCoroTx(TCoroTx& predecessor); + ~TCoroTx(); + + private: + bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext&) override; + void Complete(const TActorContext&) override; + + public: + static NTabletFlatExecutor::TTransactionContext *GetTxc(); + static TCoroTx *CurrentTx(); // obtain pointer to current tx + static void FinishTx(); // finish this transaction; function returns on Complete() entry + static void RestartTx(); // restart transaction; function returns on next Execute() entry + static void RunSuccessorTx(); // restart in new transaction -- called after FinishTx() + }; + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/data.cpp b/ydb/core/blob_depot/data.cpp index 52edebf5f0..0ce533696a 100644 --- a/ydb/core/blob_depot/data.cpp +++ b/ydb/core/blob_depot/data.cpp @@ -110,7 +110,7 @@ namespace NKikimr::NBlobDepot { #ifndef NDEBUG Y_VERIFY(outcome != EUpdateOutcome::NO_CHANGE || !value.Changed(originalValue)); - Y_VERIFY(value.ValueVersion == originalValue.ValueVersion + 1 || IsSameValueChain(value.ValueChain, originalValue.ValueChain)); + Y_VERIFY(inserted || value.ValueVersion == originalValue.ValueVersion + 1 || IsSameValueChain(value.ValueChain, originalValue.ValueChain)); #endif if ((underSoft && value.KeepState != EKeepState::Keep) || underHard) { @@ -329,7 +329,7 @@ namespace NKikimr::NBlobDepot { return it->second; } - void TData::AddDataOnLoad(TKey key, TString value, bool uncertainWrite) { + TData::TValue *TData::AddDataOnLoad(TKey key, TString value, bool uncertainWrite) { Y_VERIFY_S(!IsKeyLoaded(key), "Id# " << Self->GetLogId() << " Key# " << key.ToString()); NKikimrBlobDepot::TValue proto; @@ -351,6 +351,8 @@ namespace NKikimr::NBlobDepot { } ValidateRecords(); + + return &it->second; } bool TData::AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, @@ -491,11 +493,14 @@ namespace NKikimr::NBlobDepot { const TData::TKey last(TLogoBlobID(tabletId, current.Generation(), current.Step(), channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode)); + // find keys we have to delete bool finished = true; - Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_END, [&](auto& key, auto& value) { + TScanRange r{first, last, TData::EScanFlags::INCLUDE_END}; + std::vector<TKey> keysToDelete; + Self->Data->ScanRange(r, nullptr, nullptr, [&](auto& key, auto& value) { if (value.KeepState != EKeepState::Keep || hard) { if (maxItems) { - Self->Data->DeleteKey(key, txc, cookie); + keysToDelete.push_back(key); --maxItems; } else { finished = false; @@ -505,6 +510,11 @@ namespace NKikimr::NBlobDepot { return true; }); + // delete selected keys + for (const TKey& key : keysToDelete) { + DeleteKey(key, txc, cookie); + } + return finished; } @@ -664,3 +674,8 @@ namespace NKikimr::NBlobDepot { } } // NKikimr::NBlobDepot + +template<> +void Out<NKikimr::NBlobDepot::TBlobDepot::TData::TKey>(IOutputStream& s, const NKikimr::NBlobDepot::TBlobDepot::TData::TKey& x) { + x.Output(s); +} diff --git a/ydb/core/blob_depot/data.h b/ydb/core/blob_depot/data.h index 3475175645..7231cc8226 100644 --- a/ydb/core/blob_depot/data.h +++ b/ydb/core/blob_depot/data.h @@ -372,6 +372,15 @@ namespace NKikimr::NBlobDepot { Y_DECLARE_FLAGS(TScanFlags, EScanFlags) + struct TScanRange { + TKey Begin; + TKey End; + TScanFlags Flags = {}; + ui64 MaxKeys = 0; + ui32 PrechargeRows = 0; + ui64 PrechargeBytes = 0; + }; + private: struct TRecordWithTrash {}; @@ -413,13 +422,6 @@ namespace NKikimr::NBlobDepot { THashMultiMap<void*, TLogoBlobID> InFlightTrash; // being committed, but not yet confirmed - struct TResolveDecommitContext { - TEvBlobDepot::TEvResolve::TPtr Ev; // original resolve request - TActorId ReturnAfterLoadingKeys; - std::deque<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs = {}; - std::vector<TKey> ResolutionErrors = {}; - }; - class TTxIssueGC; class TTxConfirmGC; @@ -442,39 +444,157 @@ namespace NKikimr::NBlobDepot { ui64 LastCollectCmdId = 0; std::unordered_map<ui64, TCollectCmd> CollectCmds; + struct TLoadRangeFromDB { + TData* const Data; + const TScanRange& Range; + bool* const Progress; + bool Processing = true; + std::optional<TKey> LastProcessedKey = {}; + + static constexpr struct TReverse {} Reverse{}; + static constexpr struct TLeftBound {} LeftBound{}; + static constexpr struct TRightBound {} RightBound{}; + + template<typename TCallback> + bool operator ()(NTabletFlatExecutor::TTransactionContext& txc, const TKey& left, const TKey& right, TCallback&& callback) { + auto table = NIceDb::TNiceDb(txc.DB).Table<Schema::Data>(); + return Range.Flags & EScanFlags::REVERSE + ? Load(Reverse, table.Reverse(), left, right, std::forward<TCallback>(callback)) + : Load(Reverse, std::move(table), left, right, std::forward<TCallback>(callback)); + } + + template<typename TTable, typename TCallback> + bool Load(TReverse, TTable&& table, const TKey& left, const TKey& right, TCallback&& callback) { + return left != TKey::Min() + ? Load(LeftBound, table.GreaterOrEqual(left.MakeBinaryKey()), left, right, std::forward<TCallback>(callback)) + : right != TKey::Max() + ? Load(RightBound, table.LessOrEqual(right.MakeBinaryKey()), left, right, std::forward<TCallback>(callback)) + : Load(RightBound, table.All(), left, right, std::forward<TCallback>(callback)); + } + + template<typename TTable, typename TCallback> + bool Load(TLeftBound, TTable&& table, const TKey& left, const TKey& right, TCallback&& callback) { + return right != TKey::Max() + ? Load(RightBound, table.LessOrEqual(right.MakeBinaryKey()), left, right, std::forward<TCallback>(callback)) + : Load(RightBound, std::forward<TTable>(table), left, right, std::forward<TCallback>(callback)); + } + + template<typename TTable, typename TCallback> + bool Load(TRightBound, TTable&& table, const TKey& left, const TKey& right, TCallback&& callback) { + if ((Range.PrechargeRows || Range.PrechargeBytes) && !table.Precharge(Range.PrechargeRows, Range.PrechargeBytes)) { + return false; + } + auto rowset = table.Select(); + if (!rowset.IsReady()) { + return false; + } + while (rowset.IsValid()) { + TKey key = TKey::FromBinaryKey(rowset.GetKey(), Data->Self->Config); + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "ScanRange.Load", (Id, Data->Self->GetLogId()), (Left, left), + (Right, right), (Key, key)); + LastProcessedKey.emplace(key); + if (left < key && key < right) { + TValue* const value = Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(), + rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>()); + if (Processing) { + // we should not feed keys out of range when we are processing prefetched data outside the range + if (Range.Flags & EScanFlags::REVERSE) { + Processing = Range.Flags & EScanFlags::INCLUDE_BEGIN ? Range.Begin <= key : Range.Begin < key; + } else { + Processing = Range.Flags & EScanFlags::INCLUDE_END ? key <= Range.End : key < Range.End; + } + } + Processing = Processing && callback(std::move(key), *value); + *Progress = true; + } else { + Y_VERIFY_DEBUG(key == left || key == right); + } + if (!rowset.Next()) { + return false; // we break iteration anyway, because we can't read more data + } + } + if (!LastProcessedKey || (Range.Flags & EScanFlags::REVERSE ? left < *LastProcessedKey : *LastProcessedKey < right)) { + LastProcessedKey.emplace(Range.Flags & EScanFlags::REVERSE ? left : right); + } + return Processing; + }; + }; + public: TData(TBlobDepot *self); ~TData(); - template<typename TCallback, typename T> - bool ScanRange(const T& begin, const T& end, TScanFlags flags, TCallback&& callback) { - auto beginIt = !begin ? Data.begin() - : flags & EScanFlags::INCLUDE_BEGIN ? Data.lower_bound(*begin) - : Data.upper_bound(*begin); - - auto endIt = !end ? Data.end() - : flags & EScanFlags::INCLUDE_END ? Data.upper_bound(*end) - : Data.lower_bound(*end); - - if (flags & EScanFlags::REVERSE) { - if (beginIt != endIt) { - --endIt; - do { - auto& current = *endIt--; - if (!callback(current.first, current.second)) { - return false; - } - } while (beginIt != endIt); + template<typename TCallback> + bool ScanRange(TScanRange& range, NTabletFlatExecutor::TTransactionContext *txc, bool *progress, TCallback&& callback) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "ScanRange", (Id, Self->GetLogId()), (Begin, range.Begin), (End, range.End), + (Flags, range.Flags), (MaxKeys, range.MaxKeys)); + + const bool reverse = range.Flags & EScanFlags::REVERSE; + TLoadRangeFromDB loader{this, range, progress}; + + bool res = true; + + auto issue = [&](TKey&& key, const TValue& value) { + Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_BEGIN ? range.Begin <= key : range.Begin < key); + Y_VERIFY_DEBUG(range.Flags & EScanFlags::INCLUDE_END ? key <= range.End : key < range.End); + + if (!callback(key, value) || (range.MaxKeys && !--range.MaxKeys)) { + return false; // scan aborted by user or finished scanning the required range + } else { + // remove already scanned items from the range query + return true; } - } else { - while (beginIt != endIt) { - auto& current = *beginIt++; - if (!callback(current.first, current.second)) { - return false; + }; + + const auto& from = reverse ? TKey::Min() : range.Begin; + const auto& to = reverse ? range.End : TKey::Max(); + LoadedKeys.EnumInRange(from, to, reverse, [&](const TKey& left, const TKey& right, bool isRangeLoaded) { + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "ScanRange.Step", (Id, Self->GetLogId()), (Left, left), (Right, right), + (IsRangeLoaded, isRangeLoaded), (From, from), (To, to)); + if (!isRangeLoaded) { + // we have to load range (left, right), not including both ends + Y_VERIFY(txc && progress); + if (!loader(*txc, left, right, issue)) { + res = !loader.Processing; + return false; // break the iteration + } + } else if (reverse) { + for (auto it = Data.upper_bound(right); it != Data.begin(); ) { + const auto& [key, value] = *--it; + if (key < left) { + break; + } else if (range.Flags & EScanFlags::INCLUDE_BEGIN ? key < range.Begin : key <= range.Begin) { + return false; // just left the left side of the range + } else if ((key != range.End || range.Flags & EScanFlags::INCLUDE_END) && !issue(TKey(key), value)) { + return false; // enough keys processed + } } + } else { + // we have a range of loaded keys in the interval [left, right], including both ends -- load + // data from memory + for (auto it = Data.lower_bound(left); it != Data.end() && it->first <= right; ++it) { + const auto& [key, value] = *it; + if (range.Flags & EScanFlags::INCLUDE_END ? range.End < key : range.End <= key) { + return false; // just left the right side of the range + } else if ((key != range.Begin || range.Flags & EScanFlags::INCLUDE_BEGIN) && !issue(TKey(key), value)) { + return false; // enough keys processed + } + } + } + return true; + }); + + if (loader.LastProcessedKey) { + if (reverse) { + LoadedKeys.AddRange(std::make_tuple(*loader.LastProcessedKey, range.End)); + } else { + LoadedKeys.AddRange(std::make_tuple(range.Begin, *loader.LastProcessedKey)); } + (reverse ? range.End : range.Begin) = std::move(*loader.LastProcessedKey); + range.Flags.RemoveFlags(reverse ? EScanFlags::INCLUDE_END : EScanFlags::INCLUDE_BEGIN); } - return true; + + return res; } template<typename TCallback> @@ -505,7 +625,7 @@ namespace NKikimr::NBlobDepot { TRecordsPerChannelGroup& GetRecordsPerChannelGroup(TLogoBlobID id); TRecordsPerChannelGroup& GetRecordsPerChannelGroup(ui8 channel, ui32 groupId); - void AddDataOnLoad(TKey key, TString value, bool uncertainWrite); + TValue *AddDataOnLoad(TKey key, TString value, bool uncertainWrite); bool AddDataOnDecommit(const TEvBlobStorage::TEvAssimilateResult::TBlob& blob, NTabletFlatExecutor::TTransactionContext& txc, void *cookie); void AddTrashOnLoad(TLogoBlobID id); @@ -546,10 +666,13 @@ namespace NKikimr::NBlobDepot { } void StartLoad(); + bool LoadTrash(NTabletFlatExecutor::TTransactionContext& txc, TString& from, bool& progress); void OnLoadComplete(); bool IsLoaded() const { return Loaded; } bool IsKeyLoaded(const TKey& key) const { return Loaded || LoadedKeys[key]; } + bool EnsureKeyLoaded(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// class TResolveDecommitActor; @@ -560,7 +683,7 @@ namespace NKikimr::NBlobDepot { ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep = false, bool doNotKeep = false); class TTxResolve; - void ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context); + void ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, THashSet<TLogoBlobID>&& resolutionErrors = {}); void Handle(TEvBlobDepot::TEvResolve::TPtr ev); @@ -578,7 +701,7 @@ namespace NKikimr::NBlobDepot { void EndCommittingBlobSeqId(TAgent& agent, TBlobSeqId blobSeqId); //////////////////////////////////////////////////////////////////////////////////////////////////////////////// - + TMonotonic LastRecordsValidationTimestamp; void ValidateRecords(); diff --git a/ydb/core/blob_depot/data_decommit.cpp b/ydb/core/blob_depot/data_decommit.cpp index 15a45ecb8a..4a90a0b60c 100644 --- a/ydb/core/blob_depot/data_decommit.cpp +++ b/ydb/core/blob_depot/data_decommit.cpp @@ -1,4 +1,5 @@ #include "data.h" +#include "coro_tx.h" namespace NKikimr::NBlobDepot { @@ -11,7 +12,8 @@ namespace NKikimr::NBlobDepot { TBlobDepot* const Self; std::weak_ptr<TToken> Token; - TResolveDecommitContext Context; + std::vector<TEvBlobStorage::TEvAssimilateResult::TBlob> DecommitBlobs; + THashSet<TLogoBlobID> ResolutionErrors; TEvBlobDepot::TEvResolve::TPtr Ev; ui32 RangesInFlight = 0; @@ -41,24 +43,11 @@ namespace NKikimr::NBlobDepot { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT42, "TResolveDecommitActor::Bootstrap", (Id, Self->GetLogId()), (Sender, Ev->Sender), (Cookie, Ev->Cookie)); - if (!Self->Data->Loaded) { - // issue request to a resolver to just load keys we are looking for - TResolveDecommitContext context; - context.ReturnAfterLoadingKeys = SelfId(); - Self->Data->ExecuteTxResolve(Ev, std::move(context)); - } else { - Handle(Ev); - } - + Self->Execute(std::make_unique<TCoroTx>(Self, Token, std::bind(&TThis::TxPrepare, this))); Become(&TThis::StateFunc); } - void Handle(TEvBlobDepot::TEvResolve::TPtr ev) { - Ev = ev; // store event back - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT46, "keys loaded", (Id, Self->GetLogId()), - (Sender, Ev->Sender), (Cookie, Ev->Cookie)); - + void TxPrepare() { for (const auto& item : Ev->Get()->Record.GetItems()) { switch (item.GetKeyDesignatorCase()) { case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: { @@ -85,15 +74,17 @@ namespace NKikimr::NBlobDepot { // adjust minId to skip already assimilated items in range query if (minId < Self->Data->LastAssimilatedBlobId) { if (item.GetMustRestoreFirst()) { - ScanRangeAndIssueGets(TKey(minId), TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN); + InvokeOtherActor(*this, &TThis::ScanRangeAndIssueGets, TKey(minId), + TKey(*Self->Data->LastAssimilatedBlobId), EScanFlags::INCLUDE_BEGIN); } minId = *Self->Data->LastAssimilatedBlobId; } // issue scan query - IssueRange(tabletId, minId, maxId, item.GetMustRestoreFirst()); + InvokeOtherActor(*this, &TThis::IssueRange, tabletId, minId, maxId, item.GetMustRestoreFirst()); } else if (item.GetMustRestoreFirst()) { - ScanRangeAndIssueGets(TKey(minId), TKey(maxId), EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END); + InvokeOtherActor(*this, &TThis::ScanRangeAndIssueGets, TKey(minId), TKey(maxId), + EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END); } break; @@ -101,11 +92,14 @@ namespace NKikimr::NBlobDepot { case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: { TData::TKey key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config); + while (!Self->Data->EnsureKeyLoaded(key, *TCoroTx::GetTxc())) { + TCoroTx::RestartTx(); + } const TValue *value = Self->Data->FindKey(key); const bool doGet = (!value && key.GetBlobId() < Self->Data->LastAssimilatedBlobId) // value not yet assimilated || (value && value->GoingToAssimilate && item.GetMustRestoreFirst()); // value has no local data yet if (doGet) { - IssueGet(key.GetBlobId(), item.GetMustRestoreFirst()); + InvokeOtherActor(*this, &TThis::IssueGet, key.GetBlobId(), item.GetMustRestoreFirst()); } break; } @@ -116,16 +110,29 @@ namespace NKikimr::NBlobDepot { } } + TCoroTx::FinishTx(); CheckIfDone(); } void ScanRangeAndIssueGets(TKey from, TKey to, TScanFlags flags) { - Self->Data->ScanRange(&from, &to, flags, [&](const TKey& key, const TValue& value) { + bool progress = false; + + auto callback = [&](const TKey& key, const TValue& value) { if (value.GoingToAssimilate) { IssueGet(key.GetBlobId(), true /*mustRestoreFirst*/); } return true; - }); + }; + + TScanRange r{from, to, flags}; + while (!Self->Data->ScanRange(r, TCoroTx::GetTxc(), &progress, callback)) { + if (std::exchange(progress, false)) { + TCoroTx::FinishTx(); + TCoroTx::RunSuccessorTx(); + } else { + TCoroTx::RestartTx(); + } + } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -153,7 +160,7 @@ namespace NKikimr::NBlobDepot { IssueGet(r.Id, true /*mustRestoreFirst*/); } } else { - Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); + DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); } } } else { @@ -206,7 +213,7 @@ namespace NKikimr::NBlobDepot { if (r.Buffer) { // wasn't index read IssuePut(TKey(r.Id), std::move(r.Buffer), r.Keep, r.DoNotKeep); } else { - Context.DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); + DecommitBlobs.push_back({r.Id, r.Keep, r.DoNotKeep}); } } else if (r.Status == NKikimrProto::NODATA) { Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(r.Id), @@ -214,7 +221,7 @@ namespace NKikimr::NBlobDepot { ++PutsInFlight; } else { // mark this specific key as unresolvable - Context.ResolutionErrors.emplace_back(r.Id); + ResolutionErrors.emplace(r.Id); } } @@ -264,7 +271,7 @@ namespace NKikimr::NBlobDepot { TEvPrivate::EvTxComplete, SelfId(), 0, keep, doNotKeep); if (msg.Status != NKikimrProto::OK) { // do not reply OK to this item - Context.ResolutionErrors.emplace_back(msg.Id); + ResolutionErrors.insert(msg.Id); } } @@ -281,10 +288,26 @@ namespace NKikimr::NBlobDepot { void FinishWithSuccess() { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT92, "request succeeded", (Id, Self->GetLogId()), (Sender, Ev->Sender), - (Cookie, Ev->Cookie), (ResolutionErrors.size, Context.ResolutionErrors.size()), - (DecommitBlobs.size, Context.DecommitBlobs.size())); - std::sort(Context.ResolutionErrors.begin(), Context.ResolutionErrors.end()); - Self->Data->ExecuteTxResolve(Ev, std::move(Context)); + (Cookie, Ev->Cookie), (ResolutionErrors.size, ResolutionErrors.size()), + (DecommitBlobs.size, DecommitBlobs.size())); + + Self->Execute(std::make_unique<TCoroTx>(Self, Token, [self = Self, decommitBlobs = std::move(DecommitBlobs), + ev = Ev, resolutionErrors = std::move(ResolutionErrors)]() mutable { + ui32 numItemsProcessed = 0; + for (const auto& blob : decommitBlobs) { + if (numItemsProcessed == 10'000) { + TCoroTx::FinishTx(); + self->Data->CommitTrash(TCoroTx::CurrentTx()); + numItemsProcessed = 0; + TCoroTx::RunSuccessorTx(); + } + numItemsProcessed += self->Data->AddDataOnDecommit(blob, *TCoroTx::GetTxc(), TCoroTx::CurrentTx()); + } + TCoroTx::FinishTx(); + self->Data->CommitTrash(TCoroTx::CurrentTx()); + self->Data->ExecuteTxResolve(ev, std::move(resolutionErrors)); + })); + PassAway(); } @@ -302,7 +325,6 @@ namespace NKikimr::NBlobDepot { } switch (const ui32 type = ev->GetTypeRewrite()) { - hFunc(TEvBlobDepot::TEvResolve, Handle); hFunc(TEvBlobStorage::TEvGetResult, Handle); hFunc(TEvBlobStorage::TEvRangeResult, Handle); hFunc(TEvBlobStorage::TEvPutResult, Handle); diff --git a/ydb/core/blob_depot/data_load.cpp b/ydb/core/blob_depot/data_load.cpp index 010ae46e2c..3165c16873 100644 --- a/ydb/core/blob_depot/data_load.cpp +++ b/ydb/core/blob_depot/data_load.cpp @@ -1,167 +1,103 @@ #include "data.h" #include "schema.h" #include "garbage_collection.h" +#include "coro_tx.h" namespace NKikimr::NBlobDepot { using TData = TBlobDepot::TData; - class TData::TTxDataLoad : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { - std::optional<TString> LastTrashKey; - bool TrashLoaded = false; - bool SuccessorTx = true; - - public: - TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_DATA_LOAD; } - - TTxDataLoad(TBlobDepot *self) - : TTransactionBase(self) - {} - - TTxDataLoad(TTxDataLoad& predecessor) - : TTransactionBase(predecessor.Self) - , LastTrashKey(std::move(predecessor.LastTrashKey)) - , TrashLoaded(predecessor.TrashLoaded) - {} - - bool Execute(TTransactionContext& txc, const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT28, "TData::TTxDataLoad::Execute", (Id, Self->GetLogId())); - - NIceDb::TNiceDb db(txc.DB); + void TData::StartLoad() { + Self->Execute(std::make_unique<TCoroTx>(Self, Self->Token, [&] { bool progress = false; - auto load = [&](auto t, auto& lastKey, auto callback) { - auto table = t.GreaterOrEqual(lastKey.value_or(TString())); - static constexpr ui64 PrechargeRows = 10'000; - static constexpr ui64 PrechargeBytes = 1'000'000; - if (!table.Precharge(PrechargeRows, PrechargeBytes)) { - return false; - } - auto rows = table.Select(); - if (!rows.IsReady()) { - return false; - } - while (rows.IsValid()) { - if (auto key = rows.GetKey(); key != lastKey) { - callback(key, rows); - lastKey.emplace(std::move(key)); - progress = true; - } - if (!rows.Next()) { - return false; - } - } - lastKey.reset(); - return true; - }; - - if (!TrashLoaded) { - auto addTrash = [this](const auto& key, const auto& /*rows*/) { - Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(key)); - }; - if (!load(db.Table<Schema::Trash>(), LastTrashKey, addTrash)) { - return progress; - } - TrashLoaded = true; - } - - for (;;) { - // calculate a set of keys we need to load - TClosedIntervalSet<TKey> needed; - needed |= {TKey::Min(), TKey::Max()}; - const auto interval = needed.PartialSubtract(Self->Data->LoadedKeys); - if (!interval) { - break; - } - - const auto& [first, last] = *interval; - - auto makeNeeded = [&] { - TStringStream s("{"); - bool flag = true; - needed([&](const TKey& first, const TKey& last) { - s << (std::exchange(flag, false) ? "" : "-"); - first.Output(s); - last.Output(s << '-'); - return true; - }); - s << '}'; - return s.Str(); - }; - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT83, "TTxDataLoad iteration", (Id, Self->GetLogId()), (Needed, makeNeeded()), - (FirstKey, first), (LastKey, last)); + TString trash; + bool trashLoaded = false; - bool status = true; - std::optional<TKey> lastKey; // the actual last processed key + TScanRange r{ + .Begin = TKey::Min(), + .End = TKey::Max(), + .PrechargeRows = 10'000, + .PrechargeBytes = 1'000'000, + }; - auto table = db.Table<Schema::Data>().GreaterOrEqual(first.MakeBinaryKey()); - static constexpr ui64 PrechargeRows = 10'000; - static constexpr ui64 PrechargeBytes = 1'000'000; - if (!table.Precharge(PrechargeRows, PrechargeBytes)) { - status = false; - } else if (auto rows = table.Select(); !rows.IsReady()) { - status = false; + while (!(trashLoaded = LoadTrash(*TCoroTx::GetTxc(), trash, progress)) || + !ScanRange(r, TCoroTx::GetTxc(), &progress, [](const TKey&, const TValue&) { return true; })) { + if (std::exchange(progress, false)) { + TCoroTx::FinishTx(); + TCoroTx::RunSuccessorTx(); } else { - for (;;) { - if (!rows.IsValid()) { // finished reading the range - lastKey.emplace(last); - break; - } - auto key = TKey::FromBinaryKey(rows.GetKey(), Self->Config); - lastKey.emplace(key); - if (last <= key) { // stop iteration -- we are getting out of range - break; - } else if (first < key && !Self->Data->IsKeyLoaded(key)) { - Self->Data->AddDataOnLoad(std::move(key), rows.template GetValue<Schema::Data::Value>(), - rows.template GetValueOrDefault<Schema::Data::UncertainWrite>()); - progress = true; - } - if (!rows.Next()) { - status = false; - break; - } - } - } - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT84, "TTxDataLoad iteration complete", (Id, Self->GetLogId()), - (FirstKey, first), (LastKey, lastKey), (Status, status), (Progress, progress)); - - if (lastKey) { - Self->Data->LoadedKeys |= {first, *lastKey}; - } - if (!status) { - return progress; + TCoroTx::RestartTx(); } } - SuccessorTx = false; // everything loaded - return true; - } - - void Complete(const TActorContext&) override { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT29, "TData::TTxDataLoad::Complete", (Id, Self->GetLogId()), - (TrashLoaded, TrashLoaded), (SuccessorTx, SuccessorTx)); + TCoroTx::FinishTx(); + Self->Data->OnLoadComplete(); + })); + } - if (SuccessorTx) { - Self->Execute(std::make_unique<TTxDataLoad>(*this)); - } else { - Self->Data->OnLoadComplete(); + bool TData::LoadTrash(NTabletFlatExecutor::TTransactionContext& txc, TString& from, bool& progress) { + NIceDb::TNiceDb db(txc.DB); + auto table = db.Table<Schema::Trash>().GreaterOrEqual(from); + static constexpr ui64 PrechargeRows = 10'000; + static constexpr ui64 PrechargeBytes = 1'000'000; + if (!table.Precharge(PrechargeRows, PrechargeBytes)) { + return false; + } + auto rows = table.Select(); + if (!rows.IsReady()) { + return false; + } + while (rows.IsValid()) { + if (auto key = rows.GetKey(); key != from) { + Self->Data->AddTrashOnLoad(TLogoBlobID::FromBinary(key)); + from = std::move(key); + progress = true; + } + if (!rows.Next()) { + return false; } } - }; - - void TData::StartLoad() { - Self->Execute(std::make_unique<TTxDataLoad>(Self)); + return true; } void TData::OnLoadComplete() { - Loaded = true; + Self->Data->LoadedKeys([&](const TKey& left, const TKey& right) { + // verify that LoadedKeys == {Min, Max} exactly + Y_VERIFY_S(left == TKey::Min() && right == TKey::Max() && !Loaded, "Id# " << Self->GetLogId() + << " Left# " << left.ToString() + << " Right# " << right.ToString() + << " Loaded# " << Loaded + << " LoadedKeys# " << LoadedKeys.ToString()); + Loaded = true; + return true; + }); + Y_VERIFY(Loaded); Self->OnDataLoadComplete(); for (auto& [key, record] : RecordsPerChannelGroup) { record.CollectIfPossible(this); } } + bool TData::EnsureKeyLoaded(const TKey& key, NTabletFlatExecutor::TTransactionContext& txc) { + if (IsKeyLoaded(key)) { + return true; + } + + NIceDb::TNiceDb db(txc.DB); + using Table = Schema::Data; + auto row = db.Table<Table>().Key(key.MakeBinaryKey()).Select(); + if (!row.IsReady()) { + return false; + } else { + if (row.IsValid()) { + AddDataOnLoad(key, row.GetValue<Table::Value>(), row.GetValueOrDefault<Table::UncertainWrite>()); + } + Self->Data->LoadedKeys |= {key, key}; + return true; + } + } + void TBlobDepot::StartDataLoad() { Data->StartLoad(); } diff --git a/ydb/core/blob_depot/data_resolve.cpp b/ydb/core/blob_depot/data_resolve.cpp index 6570d5b71f..68856636ed 100644 --- a/ydb/core/blob_depot/data_resolve.cpp +++ b/ydb/core/blob_depot/data_resolve.cpp @@ -83,149 +83,108 @@ namespace NKikimr::NBlobDepot { class TData::TTxResolve : public NTabletFlatExecutor::TTransactionBase<TBlobDepot> { std::unique_ptr<TEvBlobDepot::TEvResolve::THandle> Request; - TResolveDecommitContext ResolveDecommitContext; - - bool KeysLoaded = false; - int ItemIndex = 0; - std::optional<TKey> LastScannedKey; - ui32 NumKeysRead = 0; // number of keys already read for this item - - // final state + THashSet<TLogoBlobID> ResolutionErrors; + size_t ItemIndex = 0; + std::optional<TScanRange> Range; TResolveResultAccumulator Result; - bool SuccessorTx = false; std::deque<TKey> Uncertainties; + bool SuccessorTx = false; + public: TTxType GetTxType() const override { return NKikimrBlobDepot::TXTYPE_RESOLVE; } - TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request, - TResolveDecommitContext&& resolveDecommitContext = {}) + TTxResolve(TBlobDepot *self, TEvBlobDepot::TEvResolve::TPtr request, THashSet<TLogoBlobID>&& resolutionErrors) : TTransactionBase(self) , Request(request.Release()) - , ResolveDecommitContext(std::move(resolveDecommitContext)) + , ResolutionErrors(std::move(resolutionErrors)) , Result(*Request) {} TTxResolve(TTxResolve& predecessor) : TTransactionBase(predecessor.Self) , Request(std::move(predecessor.Request)) - , ResolveDecommitContext(std::move(predecessor.ResolveDecommitContext)) - , KeysLoaded(predecessor.KeysLoaded) + , ResolutionErrors(std::move(predecessor.ResolutionErrors)) , ItemIndex(predecessor.ItemIndex) - , LastScannedKey(std::move(predecessor.LastScannedKey)) - , NumKeysRead(predecessor.NumKeysRead) + , Range(std::move(predecessor.Range)) , Result(std::move(predecessor.Result)) + , Uncertainties(std::move(predecessor.Uncertainties)) {} - bool GetScanParams(const NKikimrBlobDepot::TEvResolve::TItem& item, std::optional<TKey> *begin, - std::optional<TKey> *end, TScanFlags *flags, ui64 *maxKeys) { - switch (item.GetKeyDesignatorCase()) { - case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: { - const auto& range = item.GetKeyRange(); - *flags = TScanFlags() - | (range.GetIncludeBeginning() ? EScanFlags::INCLUDE_BEGIN : TScanFlags()) - | (range.GetIncludeEnding() ? EScanFlags::INCLUDE_END : TScanFlags()) - | (range.GetReverse() ? EScanFlags::REVERSE : TScanFlags()); - if (range.HasBeginningKey()) { - begin->emplace(TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config)); - } else { - begin->reset(); - } - if (range.HasEndingKey()) { - end->emplace(TKey::FromBinaryKey(range.GetEndingKey(), Self->Config)); - } else { - end->reset(); - } - *maxKeys = range.GetMaxKeys(); - return true; - } - - case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: - begin->emplace(TKey::FromBinaryKey(item.GetExactKey(), Self->Config)); - end->emplace(begin->value()); - *flags = EScanFlags::INCLUDE_BEGIN | EScanFlags::INCLUDE_END; - *maxKeys = 1; - return true; - - case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET: - return false; - } - - return false; - } - bool Execute(TTransactionContext& txc, const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT22, "TTxResolve::Execute", (Id, Self->GetLogId()), - (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), - (LastScannedKey, LastScannedKey), (DecommitBlobs.size, ResolveDecommitContext.DecommitBlobs.size())); + (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex)); bool progress = false; - if (!KeysLoaded && !LoadKeys(txc, progress)) { - return progress; - } else if (SuccessorTx) { - return true; - } else { - KeysLoaded = true; - } - - if (ResolveDecommitContext.ReturnAfterLoadingKeys) { - return true; - } - - ui32 numItemsRemain = 10'000; - - while (!ResolveDecommitContext.DecommitBlobs.empty()) { - if (!numItemsRemain) { - SuccessorTx = true; - return true; - } - - const auto& blob = ResolveDecommitContext.DecommitBlobs.front(); - if (Self->Data->LastAssimilatedBlobId < blob.Id) { - numItemsRemain -= Self->Data->AddDataOnDecommit(blob, txc, this); - } - ResolveDecommitContext.DecommitBlobs.pop_front(); - } const auto& record = Request->Get()->Record; - for (const auto& item : record.GetItems()) { + for (; ItemIndex < record.ItemsSize(); ++ItemIndex) { + const auto& item = record.GetItems(ItemIndex); std::optional<ui64> cookie = item.HasCookie() ? std::make_optional(item.GetCookie()) : std::nullopt; - + std::optional<bool> status; switch (item.GetKeyDesignatorCase()) { - case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: { - std::optional<TKey> begin; - std::optional<TKey> end; - TScanFlags flags; - ui64 maxKeys; - const bool success = GetScanParams(item, &begin, &end, &flags, &maxKeys); - Y_VERIFY(success); - Self->Data->ScanRange(begin, end, flags, [&](const TKey& key, const TValue& value) { - IssueResponseItem(cookie, key, value); - return --maxKeys != 0; - }); + case NKikimrBlobDepot::TEvResolve::TItem::kKeyRange: + status = ProcessKeyRange(item.GetKeyRange(), txc, progress, cookie, item.GetMustRestoreFirst()); break; - } - case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: { - const auto key = TKey::FromBinaryKey(item.GetExactKey(), Self->Config); - const TValue *value = Self->Data->FindKey(key); - const auto& errors = ResolveDecommitContext.ResolutionErrors; - if (value || std::binary_search(errors.begin(), errors.end(), key)) { - static const TValue zeroValue; - IssueResponseItem(cookie, key, value ? *value : zeroValue); - } + case NKikimrBlobDepot::TEvResolve::TItem::kExactKey: + status = ProcessExactKey(item.GetExactKey(), txc, progress, cookie, item.GetMustRestoreFirst()); break; - } case NKikimrBlobDepot::TEvResolve::TItem::KEYDESIGNATOR_NOT_SET: Y_VERIFY_DEBUG(false, "incorrect query field"); break; } + if (status) { + return *status; + } } return true; } + std::optional<bool> ProcessKeyRange(const NKikimrBlobDepot::TEvResolve::TKeyRange& range, + TTransactionContext& txc, bool& progress, const std::optional<ui64>& cookie, bool mustRestoreFirst) { + if (!Range) { + Range.emplace(); + Range->Begin = range.HasBeginningKey() + ? TKey::FromBinaryKey(range.GetBeginningKey(), Self->Config) + : TKey::Min(); + Range->End = range.HasEndingKey() + ? TKey::FromBinaryKey(range.GetEndingKey(), Self->Config) + : TKey::Max(); + Range->Flags = TScanFlags() + | (range.GetIncludeBeginning() ? EScanFlags::INCLUDE_BEGIN : TScanFlags()) + | (range.GetIncludeEnding() ? EScanFlags::INCLUDE_END : TScanFlags()) + | (range.GetReverse() ? EScanFlags::REVERSE : TScanFlags()); + Range->MaxKeys = range.GetMaxKeys(); + } + auto callback = [&](const TKey& key, const TValue& value) { + IssueResponseItem(cookie, key, value, mustRestoreFirst); + return true; + }; + if (Self->Data->ScanRange(*Range, &txc, &progress, callback)) { + Range.reset(); + return std::nullopt; + } else { + return SuccessorTx = progress; + } + } + + std::optional<bool> ProcessExactKey(const TString& exactKey, TTransactionContext& txc, bool& progress, + const std::optional<ui64>& cookie, bool mustRestoreFirst) { + const auto key = TKey::FromBinaryKey(exactKey, Self->Config); + if (!Self->Data->EnsureKeyLoaded(key, txc)) { + return SuccessorTx = progress; + } + const TValue *value = Self->Data->FindKey(key); + if (value || (!ResolutionErrors.empty() && ResolutionErrors.contains(key.GetBlobId()))) { + static const TValue zero; + IssueResponseItem(cookie, key, value ? *value : zero, mustRestoreFirst); + } + return std::nullopt; + } + void Complete(const TActorContext&) override { STLOG(PRI_DEBUG, BLOB_DEPOT, BDT30, "TTxResolve::Complete", (Id, Self->GetLogId()), (Sender, Request->Sender), (Cookie, Request->Cookie), (SuccessorTx, SuccessorTx), @@ -235,8 +194,6 @@ namespace NKikimr::NBlobDepot { if (SuccessorTx) { Self->Execute(std::make_unique<TTxResolve>(*this)); - } else if (const TActorId recipient = ResolveDecommitContext.ReturnAfterLoadingKeys) { - TActivationContext::Send(Request->Forward(recipient)); } else if (Uncertainties.empty()) { Result.Send(NKikimrProto::OK, std::nullopt); } else { @@ -244,132 +201,7 @@ namespace NKikimr::NBlobDepot { } } - bool LoadKeys(TTransactionContext& txc, bool& progress) { - NIceDb::TNiceDb db(txc.DB); - - if (Self->Data->Loaded) { - return true; - } - - const auto& record = Request->Get()->Record; - const auto& items = record.GetItems(); - for (; ItemIndex < items.size(); ++ItemIndex, LastScannedKey.reset(), NumKeysRead = 0) { - const auto& item = items[ItemIndex]; - - std::optional<TKey> begin; - std::optional<TKey> end; - TScanFlags flags; - ui64 maxKeys; - const bool success = GetScanParams(item, &begin, &end, &flags, &maxKeys); - Y_VERIFY_DEBUG(success); - - // adjust range according to actually generated data - if (LastScannedKey) { - if (flags & EScanFlags::REVERSE) { // reverse scan - end = *LastScannedKey; - flags &= ~EScanFlags::INCLUDE_END; - } else { // direct scan - begin = *LastScannedKey; - flags &= ~EScanFlags::INCLUDE_BEGIN; - } - } - - TClosedIntervalSet<TKey> needed; - needed |= {begin.value_or(TKey::Min()), end.value_or(TKey::Max())}; - needed -= Self->Data->LoadedKeys; - if (!needed) { - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT76, "TTxResolve: skipping subrange", (Id, Self->GetLogId()), - (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex)); - continue; - } - - std::optional<TKey> boundary; - - auto processRange = [&](auto table) { - bool done = false; - for (auto rowset = table.Select();; rowset.Next()) { - if (!rowset.IsReady()) { - return done; - } else if (!rowset.IsValid()) { - // no more keys in our direction -- we have scanned full requested range - boundary.emplace(flags & EScanFlags::REVERSE - ? begin.value_or(TKey::Min()) - : end.value_or(TKey::Max())); - return true; - } - auto key = TKey::FromBinaryKey(rowset.template GetValue<Schema::Data::Key>(), Self->Config); - boundary.emplace(key); - if (key != LastScannedKey) { - LastScannedKey = key; - progress = true; - - const bool isKeyLoaded = Self->Data->IsKeyLoaded(key); - - STLOG(PRI_DEBUG, BLOB_DEPOT, BDT85, "TTxResolve: loading key from database", (Id, Self->GetLogId()), - (Sender, Request->Sender), (Cookie, Request->Cookie), (ItemIndex, ItemIndex), (Key, key), - (IsKeyLoaded, isKeyLoaded)); - - if (!isKeyLoaded) { - Self->Data->AddDataOnLoad(key, rowset.template GetValue<Schema::Data::Value>(), - rowset.template GetValueOrDefault<Schema::Data::UncertainWrite>()); - } - - const bool matchBegin = !begin || (flags & EScanFlags::INCLUDE_BEGIN ? *begin <= key : *begin < key); - const bool matchEnd = !end || (flags & EScanFlags::INCLUDE_END ? key <= *end : key < *end); - if (matchBegin && matchEnd && ResolveDecommitContext.DecommitBlobs.empty() && ++NumKeysRead == maxKeys) { - // we have hit the MaxItems limit, exit - done = true; - } - if (flags & EScanFlags::REVERSE ? !matchEnd : !matchBegin) { - // we have exceeded our range - done = true; - } - } - } - }; - - auto applyEnd = [&](auto&& x) { - return end - ? processRange(x.LessOrEqual(end->MakeBinaryKey())) - : processRange(std::forward<std::decay_t<decltype(x)>>(x)); - }; - auto applyBegin = [&](auto&& x) { - return begin - ? applyEnd(x.GreaterOrEqual(begin->MakeBinaryKey())) - : applyEnd(std::forward<std::decay_t<decltype(x)>>(x)); - }; - auto applyReverse = [&](auto&& x) { - return flags & EScanFlags::REVERSE - ? applyBegin(x.Reverse()) - : applyBegin(std::forward<std::decay_t<decltype(x)>>(x)); - }; - - const bool status = applyReverse(db.Table<Schema::Data>()); - - if (boundary) { - if (flags & EScanFlags::REVERSE) { - Self->Data->LoadedKeys |= {*boundary, end.value_or(TKey::Max())}; - } else { - Self->Data->LoadedKeys |= {begin.value_or(TKey::Min()), *boundary}; - } - } - - if (status) { - continue; // all work done for this item - } else if (progress) { - // we have already done something, so let's finish this transaction and start a new one, continuing - // the job - SuccessorTx = true; - return true; - } else { - return false; // we'll have to restart this transaction to fetch some data - } - } - - return true; - } - - void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value) { + void IssueResponseItem(std::optional<ui64> cookie, const TKey& key, const TValue& value, bool reliablyWritten) { NKikimrBlobDepot::TEvResolveResult::TResolvedKey item; if (cookie) { @@ -381,6 +213,7 @@ namespace NKikimr::NBlobDepot { auto *out = item.AddValueChain(); out->SetGroupId(Self->Config.GetVirtualGroupId()); LogoBlobIDFromLogoBlobID(key.GetBlobId(), out->MutableBlobId()); + item.SetReliablyWritten(reliablyWritten); } else { EnumerateBlobsForValueChain(value.ValueChain, Self->TabletID(), [&](const TLogoBlobID& id, ui32 begin, ui32 end) { if (begin != end) { @@ -395,13 +228,13 @@ namespace NKikimr::NBlobDepot { } } }); + item.SetReliablyWritten(true); } if (value.Meta) { item.SetMeta(value.Meta.data(), value.Meta.size()); } - const auto& errors = ResolveDecommitContext.ResolutionErrors; - if (std::binary_search(errors.begin(), errors.end(), key)) { + if (!ResolutionErrors.empty() && ResolutionErrors.contains(key.GetBlobId())) { item.SetErrorReason("item resolution error"); item.ClearValueChain(); } else if (!item.ValueChainSize()) { @@ -423,12 +256,12 @@ namespace NKikimr::NBlobDepot { if (Self->Config.GetIsDecommittingGroup() && Self->DecommitState <= EDecommitState::BlobsFinished) { Self->RegisterWithSameMailbox(CreateResolveDecommitActor(ev)); } else { - Self->Execute(std::make_unique<TTxResolve>(Self, ev)); + ExecuteTxResolve(ev); } } - void TData::ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, TResolveDecommitContext&& context) { - Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(context))); + void TData::ExecuteTxResolve(TEvBlobDepot::TEvResolve::TPtr ev, THashSet<TLogoBlobID>&& resolutionErrors) { + Self->Execute(std::make_unique<TTxResolve>(Self, ev, std::move(resolutionErrors))); } } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/garbage_collection.cpp b/ydb/core/blob_depot/garbage_collection.cpp index 4aa554fa66..60ea091567 100644 --- a/ydb/core/blob_depot/garbage_collection.cpp +++ b/ydb/core/blob_depot/garbage_collection.cpp @@ -273,8 +273,9 @@ namespace NKikimr::NBlobDepot { const TData::TKey last(TLogoBlobID(tabletId, barrierGenStep.Generation(), barrierGenStep.Step(), channel, TLogoBlobID::MaxBlobSize, TLogoBlobID::MaxCookie, TLogoBlobID::MaxPartId, TLogoBlobID::MaxCrcMode)); - Self->Data->ScanRange(&first, &last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END, - [&](const TData::TKey& key, const TData::TValue& value) { + + TData::TScanRange r{first, last, TData::EScanFlags::INCLUDE_BEGIN | TData::EScanFlags::INCLUDE_END}; + Self->Data->ScanRange(r, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) { // there must be no blobs under the hard barrier and no blobs with mode other than Keep under the soft one Y_VERIFY_S(!hard && value.KeepState == NKikimrBlobDepot::EKeepState::Keep, "Key# " << key.ToString() << " Value# " << value.ToString()); @@ -282,7 +283,8 @@ namespace NKikimr::NBlobDepot { }); } # if 0 - Self->Data->ScanRange(nullptr, nullptr, {}, [&](const TData::TKey& key, const TData::TValue& value) { + TData::TScanRange r{TData::TKey::Min(), TData::TKey::Max()}; + Self->Data->ScanRange(r, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) { bool underSoft, underHard; Self->BarrierServer->GetBlobBarrierRelation(key.GetBlobId(), &underSoft, &underHard); Y_VERIFY(!underHard && (!underSoft || value.KeepState == NKikimrBlobDepot::EKeepState::Keep)); diff --git a/ydb/core/blob_depot/op_commit_blob_seq.cpp b/ydb/core/blob_depot/op_commit_blob_seq.cpp index e062d00f30..2373af001c 100644 --- a/ydb/core/blob_depot/op_commit_blob_seq.cpp +++ b/ydb/core/blob_depot/op_commit_blob_seq.cpp @@ -133,25 +133,13 @@ namespace NKikimr::NBlobDepot { if (Self->Data->IsLoaded()) { return true; } - bool success = true; for (const auto& item : Request->Get()->Record.GetItems()) { auto key = TData::TKey::FromBinaryKey(item.GetKey(), Self->Config); - if (Self->Data->IsKeyLoaded(key)) { - continue; - } - using Table = Schema::Data; - auto row = db.Table<Table>().Key(item.GetKey()).Select(); - if (!row.IsReady()) { - success = false; - } else { - Self->Data->LoadedKeys |= {key, key}; - if (row.IsValid()) { - Self->Data->AddDataOnLoad(std::move(key), row.GetValue<Table::Value>(), - row.GetValueOrDefault<Table::UncertainWrite>()); - } + if (!Self->Data->EnsureKeyLoaded(key, txc)) { + return false; } } - return success; + return true; } bool CheckKeyAgainstBarrier(const TData::TKey& key, TString *error) { diff --git a/ydb/core/blob_depot/types.h b/ydb/core/blob_depot/types.h index ac33792de6..ba9a3f0148 100644 --- a/ydb/core/blob_depot/types.h +++ b/ydb/core/blob_depot/types.h @@ -151,7 +151,6 @@ namespace NKikimr::NBlobDepot { }; using TValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TValueChain>; - using TResolvedValueChain = NProtoBuf::RepeatedPtrField<NKikimrBlobDepot::TResolvedValueChain>; template<typename TCallback> void EnumerateBlobsForValueChain(const TValueChain& valueChain, ui64 tabletId, TCallback&& callback) { diff --git a/ydb/core/protos/blob_depot.proto b/ydb/core/protos/blob_depot.proto index 488858ccc1..2efe99467e 100644 --- a/ydb/core/protos/blob_depot.proto +++ b/ydb/core/protos/blob_depot.proto @@ -222,6 +222,7 @@ message TEvResolveResult { repeated uint64 Owners = 5; optional string ErrorReason = 6; // if set, this means value wasn't resolved due to error optional uint32 ValueVersion = 7; // ValueChain version, gets increased every time value is changed + optional bool ReliablyWritten = 8; // MustRestoreFirst was either set, or the blob is in local storage } optional NKikimrProto.EReplyStatus Status = 1; // OVERRUN means there are more messages on the way optional string ErrorReason = 2; |