diff options
author | alexvru <alexvru@ydb.tech> | 2022-09-24 20:51:05 +0300 |
---|---|---|
committer | alexvru <alexvru@ydb.tech> | 2022-09-24 20:51:05 +0300 |
commit | 5db281b96a25fdced40fb6e21a00c2c501ad0662 (patch) | |
tree | caa2b0c412b82dd379ca684f9cb073631f16b97b | |
parent | 1e31d37252ec7af9d09375f1acd77dda07e08366 (diff) | |
download | ydb-5db281b96a25fdced40fb6e21a00c2c501ad0662.tar.gz |
Add GroupOverseer component for testing
22 files changed, 840 insertions, 20 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h index 0a3b4110db0..267107da846 100644 --- a/library/cpp/actors/core/actor.h +++ b/library/cpp/actors/core/actor.h @@ -555,3 +555,5 @@ struct THash<NActors::TActorIdentity> { return x.Hash(); } }; + +template<> struct std::hash<NActors::TActorIdentity> : THash<NActors::TActorIdentity> {}; diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h index d972b1a0ff1..a9cac589b2b 100644 --- a/library/cpp/actors/core/actorid.h +++ b/library/cpp/actors/core/actorid.h @@ -194,3 +194,5 @@ struct THash<NActors::TActorId> { return x.Hash(); } }; + +template<> struct std::hash<NActors::TActorId> : THash<NActors::TActorId> {}; diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt index f92f8aca9a0..6461836196d 100644 --- a/ydb/core/blob_depot/CMakeLists.txt +++ b/ydb/core/blob_depot/CMakeLists.txt @@ -30,6 +30,7 @@ target_sources(ydb-core-blob_depot PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/group_metrics_exchange.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/mon_main.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/testing.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 46c7d75251b..42a87d1c96e 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -5,6 +5,12 @@ #include "types.h" #include "schema.h" +namespace NKikimr::NTesting { + + class TGroupOverseer; + +} // NKikimr::NTesting + namespace NKikimr::NBlobDepot { using NTabletFlatExecutor::TTabletExecutedFlat; @@ -227,6 +233,11 @@ namespace NKikimr::NBlobDepot { void DoGroupMetricsExchange(); void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev); 
+ + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Validation + + void Validate(NTesting::TGroupOverseer& overseer) const; }; } // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h index 997866ecd23..807509fd456 100644 --- a/ydb/core/blob_depot/defs.h +++ b/ydb/core/blob_depot/defs.h @@ -5,6 +5,7 @@ #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_sets.h> #include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h> +#include <ydb/core/blobstorage/testing/group_overseer/group_overseer.h> #include <ydb/core/blobstorage/vdisk/common/vdisk_events.h> #include <ydb/core/engine/minikql/flat_local_tx_factory.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> diff --git a/ydb/core/blob_depot/testing.cpp b/ydb/core/blob_depot/testing.cpp new file mode 100644 index 00000000000..27cf6fe6e6f --- /dev/null +++ b/ydb/core/blob_depot/testing.cpp @@ -0,0 +1,27 @@ +#include "testing.h" +#include "blob_depot_tablet.h" + +namespace NKikimr::NBlobDepot { + + bool IsBlobDepotActor(IActor *actor) { + return dynamic_cast<TBlobDepot*>(actor); + } + + void ValidateBlobDepot(IActor *actor, NTesting::TGroupOverseer& overseer) { + if (auto *x = dynamic_cast<TBlobDepot*>(actor)) { + x->Validate(overseer); + } else { + Y_FAIL(); + } + } + + void TBlobDepot::Validate(NTesting::TGroupOverseer& overseer) const { + Y_VERIFY(Config.HasVirtualGroupId()); + (void)overseer; +// overseer.EnumerateBlobs(Config.GetVirtualGroupId(), [&](TLogoBlobID id, NTesting::EBlobState state) { +// Cerr << id.ToString() << Endl; +// (void)state; +// }); + } + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blob_depot/testing.h b/ydb/core/blob_depot/testing.h new file mode 100644 index 00000000000..04d5d87646a --- /dev/null +++ b/ydb/core/blob_depot/testing.h @@ -0,0 +1,16 @@ +#pragma once + +#include "defs.h" + 
+namespace NKikimr::NTesting { + + class TGroupOverseer; + +} // NKikimr::NTesting + +namespace NKikimr::NBlobDepot { + + bool IsBlobDepotActor(IActor *actor); + void ValidateBlobDepot(IActor *actor, NTesting::TGroupOverseer& overseer); + +} // NKikimr::NBlobDepot diff --git a/ydb/core/blobstorage/CMakeLists.txt b/ydb/core/blobstorage/CMakeLists.txt index c6dc5671666..77cb7c9da43 100644 --- a/ydb/core/blobstorage/CMakeLists.txt +++ b/ydb/core/blobstorage/CMakeLists.txt @@ -17,6 +17,7 @@ add_subdirectory(nodewarden) add_subdirectory(other) add_subdirectory(pdisk) add_subdirectory(storagepoolmon) +add_subdirectory(testing) add_subdirectory(testload) add_subdirectory(ut_blobstorage) add_subdirectory(ut_group) diff --git a/ydb/core/blobstorage/testing/CMakeLists.txt b/ydb/core/blobstorage/testing/CMakeLists.txt new file mode 100644 index 00000000000..8fbce163859 --- /dev/null +++ b/ydb/core/blobstorage/testing/CMakeLists.txt @@ -0,0 +1,9 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(group_overseer) diff --git a/ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt b/ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt new file mode 100644 index 00000000000..32dd4aeaed9 --- /dev/null +++ b/ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt @@ -0,0 +1,20 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(blobstorage-testing-group_overseer) +target_link_libraries(blobstorage-testing-group_overseer PUBLIC + contrib-libs-cxxsupp + yutil + ydb-core-base + contrib-libs-t1ha +) +target_sources(blobstorage-testing-group_overseer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/testing/group_overseer/group_state.cpp +) diff --git a/ydb/core/blobstorage/testing/group_overseer/defs.h b/ydb/core/blobstorage/testing/group_overseer/defs.h new file mode 100644 index 00000000000..ed072023e02 --- /dev/null +++ b/ydb/core/blobstorage/testing/group_overseer/defs.h @@ -0,0 +1,6 @@ +#pragma once + +#include <ydb/core/base/blobstorage.h> +#include <ydb/core/blobstorage/testing/group_overseer/group_overseer.h> + +#include <contrib/libs/t1ha/t1ha.h> diff --git a/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp new file mode 100644 index 00000000000..fe662e5249e --- /dev/null +++ b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp @@ -0,0 +1,133 @@ +#include "group_overseer.h" +#include "group_state.h" + +namespace NKikimr::NTesting { + + class TGroupOverseer::TImpl { + std::unordered_map<TActorId, ui32> OverseenServiceMap; + std::unordered_map<ui32, TGroupState> GroupStates; + std::unordered_map<TQueryId, ui32> QueryToGroup; + + public: + void AddGroupToOversee(ui32 groupId) { + const TActorId proxyId = MakeBlobStorageProxyID(groupId); + OverseenServiceMap[proxyId] = groupId; + GroupStates.try_emplace(groupId); + } + + void ExamineEvent(ui32 nodeId, IEventHandle& ev) { + switch (ev.GetTypeRewrite()) { // all of these events may alter storage state +#define QUERY(EV) case TEvBlobStorage::EV: return ExamineResultEvent<TEvBlobStorage::T##EV>(nodeId, ev); + QUERY(EvBlockResult) + 
QUERY(EvPutResult)
+                QUERY(EvPatchResult)
+                QUERY(EvInplacePatchResult)
+                QUERY(EvCollectGarbageResult)
+                QUERY(EvGetResult)
+                QUERY(EvRangeResult)
+                QUERY(EvDiscoverResult)
+#undef QUERY
+            }
+        }
+
+        // Dispatches a request event to ExamineQueryEvent<>, handing over the type of the matching result event
+        // (TEvBlobStorage::Ev*Result) so the reply can later be paired with this request. Other events are ignored.
+        void ExamineEnqueue(ui32 nodeId, IEventHandle& ev) {
+            switch (ev.GetTypeRewrite()) { // all of these events may alter storage state
+#define RESULT(EV) case TEvBlobStorage::EV: return ExamineQueryEvent<TEvBlobStorage::T##EV>(nodeId, ev, TEvBlobStorage::EV##Result);
+                RESULT(EvBlock)
+                RESULT(EvPut)
+                RESULT(EvPatch)
+                RESULT(EvInplacePatch)
+                RESULT(EvCollectGarbage)
+                RESULT(EvGet)
+                RESULT(EvRange)
+                RESULT(EvDiscover)
+#undef RESULT
+            }
+        }
+
+        // Returns the tracked state of blob 'id' in group 'groupId'; aborts when the group is not being overseen.
+        EBlobState GetBlobState(ui32 groupId, TLogoBlobID id) const {
+            if (const auto it = GroupStates.find(groupId); it != GroupStates.end()) {
+                return it->second.GetBlobState(id);
+            } else {
+                Y_FAIL_S("GroupId# " << groupId << " is not being overseen");
+            }
+        }
+
+        // Invokes 'callback' for every blob tracked in group 'groupId'; aborts when the group is not being overseen.
+        void EnumerateBlobs(ui32 groupId, const std::function<void(TLogoBlobID, EBlobState)>& callback) const {
+            if (const auto it = GroupStates.find(groupId); it != GroupStates.end()) {
+                return it->second.EnumerateBlobs(callback);
+            } else {
+                // name the offending group in the abort message, exactly as GetBlobState does
+                Y_FAIL_S("GroupId# " << groupId << " is not being overseen");
+            }
+        }
+
+    private:
+        // Registers a request of type T sent to one of the overseen group proxies and forwards it to the group state.
+        template<typename T>
+        void ExamineQueryEvent(ui32 nodeId, IEventHandle& ev, ui32 resultEventType) {
+            Y_VERIFY(ev.GetTypeRewrite() == T::EventType);
+
+            // requests addressed to anything but an overseen proxy are of no interest
+            const auto it = OverseenServiceMap.find(ev.Recipient);
+            if (it == OverseenServiceMap.end()) {
+                return;
+            }
+            const ui32 groupId = it->second;
+
+            // the key is built from the request's sender and cookie plus the expected result type; the result
+            // handler rebuilds the same key from the reply's recipient and cookie
+            const TQueryId queryId{nodeId, resultEventType, ev.Sender, ev.Cookie};
+            const auto [_, inserted] = QueryToGroup.emplace(queryId, groupId);
+            if (inserted) { // an event whose key is already registered is not forwarded again
+                GroupStates[groupId].ExamineQueryEvent(queryId, *ev.Get<T>());
+            }
+        }
+
+        // Pairs a result of type T with the request registered by ExamineQueryEvent.
+        template<typename T>
+        void ExamineResultEvent(ui32 nodeId, IEventHandle& ev) {
+            Y_VERIFY(ev.GetTypeRewrite() == T::EventType);
+
+            const TQueryId queryId{nodeId, T::EventType, ev.Recipient, ev.Cookie};
+            const auto it = QueryToGroup.find(queryId);
+            if (it == 
QueryToGroup.end()) { + return; + } + const ui32 groupId = it->second; + QueryToGroup.erase(it); + + auto& msg = *ev.Get<T>(); + if constexpr (T::EventType != TEvBlobStorage::EvBlockResult && + T::EventType != TEvBlobStorage::EvInplacePatchResult && + T::EventType != TEvBlobStorage::EvCollectGarbageResult && + T::EventType != TEvBlobStorage::EvDiscoverResult) { + Y_VERIFY(groupId == msg.GroupId); + } + + GroupStates[groupId].ExamineResultEvent(queryId, msg); + } + }; + + TGroupOverseer::TGroupOverseer() + : Impl(std::make_unique<TImpl>()) + {} + + TGroupOverseer::~TGroupOverseer() + {} + + void TGroupOverseer::AddGroupToOversee(ui32 groupId) { + Impl->AddGroupToOversee(groupId); + } + + void TGroupOverseer::ExamineEnqueue(ui32 nodeId, IEventHandle& ev) { + Impl->ExamineEnqueue(nodeId, ev); + } + + void TGroupOverseer::ExamineEvent(ui32 nodeId, IEventHandle& ev) { + Impl->ExamineEvent(nodeId, ev); + } + + EBlobState TGroupOverseer::GetBlobState(ui32 groupId, TLogoBlobID id) const { + return Impl->GetBlobState(groupId, id); + } + + void TGroupOverseer::EnumerateBlobs(ui32 groupId, const std::function<void(TLogoBlobID, EBlobState)>& callback) const { + Impl->EnumerateBlobs(groupId, callback); + } + +} // NKikimr::NTesting diff --git a/ydb/core/blobstorage/testing/group_overseer/group_overseer.h b/ydb/core/blobstorage/testing/group_overseer/group_overseer.h new file mode 100644 index 00000000000..7da4381d3f9 --- /dev/null +++ b/ydb/core/blobstorage/testing/group_overseer/group_overseer.h @@ -0,0 +1,29 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr::NTesting { + + enum class EBlobState { + NOT_WRITTEN, + POSSIBLY_WRITTEN, // write was in flight, but no confirmation received + CERTAINLY_WRITTEN, // write was confirmed + POSSIBLY_COLLECTED, // blob is beyond unconfirmed GC command + CERTAINLY_COLLECTED_OR_NEVER_WRITTEN, // blob is gonna be collected soon + }; + + class TGroupOverseer { + class TImpl; + std::unique_ptr<TImpl> Impl; + + public: + 
TGroupOverseer(); + ~TGroupOverseer(); + void AddGroupToOversee(ui32 groupId); + void ExamineEnqueue(ui32 nodeId, IEventHandle& ev); + void ExamineEvent(ui32 nodeId, IEventHandle& ev); + EBlobState GetBlobState(ui32 groupId, TLogoBlobID id) const; + void EnumerateBlobs(ui32 groupId, const std::function<void(TLogoBlobID, EBlobState)>& callback) const; + }; + +} // NKikimr::NTesting diff --git a/ydb/core/blobstorage/testing/group_overseer/group_state.cpp b/ydb/core/blobstorage/testing/group_overseer/group_state.cpp new file mode 100644 index 00000000000..469017496bd --- /dev/null +++ b/ydb/core/blobstorage/testing/group_overseer/group_state.cpp @@ -0,0 +1,372 @@ +#include "group_state.h" + +namespace NKikimr::NTesting { + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Block + + template<> + void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvBlock& msg) { + TBlockInfo& block = Blocks[msg.TabletId]; + const auto inflightIt = block.InFlight.emplace(msg); + const bool inserted1 = block.QueryToInFlight.emplace(queryId, inflightIt).second; + Y_VERIFY(inserted1); + const bool inserted2 = BlockQueryToTabletId.emplace(queryId, msg.TabletId).second; + Y_VERIFY(inserted2); + } + + template<> + void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvBlockResult& msg) { + auto query = BlockQueryToTabletId.extract(queryId); + Y_VERIFY(query); + + TBlockInfo& block = Blocks[query.mapped()]; + + auto queryToInFlight = block.QueryToInFlight.extract(queryId); + Y_VERIFY(queryToInFlight); + auto inFlight = block.InFlight.extract(queryToInFlight.mapped()); + Y_VERIFY(inFlight); + TBlockedGeneration& gen = inFlight.value(); + + if (msg.Status == NKikimrProto::OK) { + if (!block.Confirmed || block.Confirmed->Generation < gen.Generation || gen.Same(*block.Confirmed)) { + block.Confirmed.emplace(gen); + } else { + Y_FAIL("incorrect successful 
block"); + } + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Put + + template<> + void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvPut& msg) { + TBlobInfo& blob = *LookupBlob(msg.Id, true); + + TBlobValueHash valueHash(msg); + + if (blob.ValueHash) { + Y_VERIFY(*blob.ValueHash == valueHash); + } else { + blob.ValueHash.emplace(valueHash); + } + + EConfidence isBlocked = IsBlocked(msg.Id.TabletID(), msg.Id.Generation()); + for (const auto& [tabletId, generation] : msg.ExtraBlockChecks) { + isBlocked = Max(isBlocked, IsBlocked(tabletId, generation)); + } + + const EConfidence isCollected = IsCollected(msg.Id, + blob.ConfirmedKeep ? EConfidence::CONFIRMED : + blob.NumKeepsInFlight ? EConfidence::POSSIBLE : + EConfidence::SURELY_NOT, + blob.ConfirmedDoNotKeep ? EConfidence::CONFIRMED : + blob.NumDoNotKeepsInFlight ? EConfidence::POSSIBLE : + EConfidence::SURELY_NOT); + + const bool inserted = blob.PutsInFlight.emplace(queryId, TBlobInfo::TQueryContext{ + .IsBlocked = isBlocked == EConfidence::CONFIRMED, // if true, we can't get OK answer + .IsCollected = isCollected == EConfidence::CONFIRMED, // if true, this blob is being put beyond the barrier + }).second; + Y_VERIFY(inserted); + } + + template<> + void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvPutResult& msg) { + const auto it = Blobs.find(msg.Id); + if (msg.Status != NKikimrProto::OK && it == Blobs.end()) { + Y_VERIFY(GetBlobState(msg.Id) == EBlobState::CERTAINLY_COLLECTED_OR_NEVER_WRITTEN); + return; // it's okay to get ERROR for TEvPut when the blob is already collected and is not in the map + } + Y_VERIFY(it != Blobs.end()); + TBlobInfo& blob = it->second; + + auto putInFlight = blob.PutsInFlight.extract(queryId); + Y_VERIFY(putInFlight); + + auto& v = putInFlight.mapped(); + if (v.IsBlocked && (msg.Status != NKikimrProto::BLOCKED && msg.Status != 
NKikimrProto::ERROR)) {
+            Y_FAIL("incorrect TEvPut result status code -- was BLOCKED at the begin of the query");
+        }
+
+        if (v.IsCollected && msg.Status != NKikimrProto::ERROR) {
+            Y_FAIL("incorrect TEvPut result status code -- was already beyond the barrier at begin of the query");
+        }
+
+        if (msg.Status == NKikimrProto::OK) {
+            Y_VERIFY(blob.ValueHash);
+            blob.ConfirmedValue = true;
+        }
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // Patch
+
+    template<>
+    void TGroupState::ExamineQueryEvent(const TQueryId&, const TEvBlobStorage::TEvPatch&) {
+        Y_FAIL("not implemented");
+    }
+
+    template<>
+    void TGroupState::ExamineResultEvent(const TQueryId&, const TEvBlobStorage::TEvPatchResult&) {
+        Y_FAIL("not implemented");
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // InplacePatch
+
+    template<>
+    void TGroupState::ExamineQueryEvent(const TQueryId&, const TEvBlobStorage::TEvInplacePatch&) {
+        Y_FAIL("not implemented");
+    }
+
+    template<>
+    void TGroupState::ExamineResultEvent(const TQueryId&, const TEvBlobStorage::TEvInplacePatchResult&) {
+        Y_FAIL("not implemented");
+    }
+
+    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+    // CollectGarbage
+
+    // Registers an in-flight TEvCollectGarbage request: every Keep/DoNotKeep flag it carries is counted against
+    // the affected blob and remembered under queryId; when msg.Collect is set, the requested barrier value is
+    // additionally recorded as in flight for the (TabletId, Channel) barrier.
+    template<>
+    void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvCollectGarbage& msg) {
+        auto processFlags = [&](const TVector<TLogoBlobID> *flags, bool isKeepFlag) {
+            if (!flags) {
+                return; // the request carries no flags of this kind
+            }
+            // walk the request's own vector in place -- no temporary copy is needed
+            for (const TLogoBlobID& id : *flags) {
+                TBlobInfo& blob = *LookupBlob(id, true);
+                ++(isKeepFlag ? blob.NumKeepsInFlight : blob.NumDoNotKeepsInFlight);
+                FlagsInFlight.emplace(queryId, std::make_tuple(isKeepFlag, id));
+            }
+        };
+
+        processFlags(msg.Keep.Get(), true);
+        // DoNotKeep flags have their own counter; registering them with isKeepFlag == true would account them as
+        // Keep flags and leave NumDoNotKeepsInFlight/ConfirmedDoNotKeep permanently untouched
+        processFlags(msg.DoNotKeep.Get(), false);
+
+        if (msg.Collect) {
+            const TBarrierId id(msg.TabletId, msg.Channel);
+            TBarrierInfo& barrier = Barriers[id];
+
+            // soft and hard barriers are tracked separately, indexed by msg.Hard
+            auto inFlightIt = barrier.InFlight[msg.Hard].insert({
+                .Hard = msg.Hard,
+                .Value{
+                    .RecordGeneration = msg.RecordGeneration,
+                    .PerGenerationCounter = msg.PerGenerationCounter,
+                    .CollectGeneration = msg.CollectGeneration,
+                    .CollectStep = msg.CollectStep
+                }
+            });
+
+            // remember which in-flight entry belongs to this query so the result handler can extract it
+            const bool inserted = barrier.CollectsInFlight.emplace(queryId, inFlightIt).second;
+            Y_VERIFY(inserted);
+        }
+    }
+
+    template<>
+    void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvCollectGarbageResult& msg) {
+        auto [begin, end] = FlagsInFlight.equal_range(queryId);
+        for (auto it = begin; it != end; ++it) {
+            const auto& [isKeepFlag, id] = it->second;
+            TBlobInfo& blob = *LookupBlob(id, true);
+            --(isKeepFlag ? blob.NumKeepsInFlight : blob.NumDoNotKeepsInFlight);
+            if (msg.Status == NKikimrProto::OK) {
+                (isKeepFlag ? 
blob.ConfirmedKeep : blob.ConfirmedDoNotKeep) = true; + } + } + FlagsInFlight.erase(begin, end); + + const TBarrierId barrierId(msg.TabletId, msg.Channel); + TBarrierInfo& barrier = Barriers[barrierId]; + if (auto query = barrier.CollectsInFlight.extract(queryId)) { + auto inFlightIt = query.mapped(); + const bool hard = inFlightIt->Hard; + auto inFlight = barrier.InFlight[hard].extract(inFlightIt); + Y_VERIFY(inFlight); + if (msg.Status == NKikimrProto::OK) { + if (auto& dest = barrier.Confirmed[hard]) { + dest->Supersede(inFlight.value().Value); + } else { + dest.emplace(inFlight.value().Value); + } + } + } + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Get + + template<> + void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvGet& msg) { + (void)queryId, (void)msg; + } + + template<> + void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvGetResult& msg) { + (void)queryId, (void)msg; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Range + + template<> + void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvRange& msg) { + (void)queryId, (void)msg; + } + + template<> + void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvRangeResult& msg) { + (void)queryId, (void)msg; + } + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Discover + + template<> + void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvDiscover& msg) { + (void)queryId, (void)msg; + } + + template<> + void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvDiscoverResult& msg) { + (void)queryId, (void)msg; + } + + 
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // Common parts + + TGroupState::EConfidence TGroupState::TBlockInfo::IsBlocked(ui32 generation) const { + if (Confirmed && generation <= Confirmed->Generation) { + return EConfidence::CONFIRMED; + } else if (InFlight.empty()) { + return EConfidence::SURELY_NOT; + } else { + const TBlockedGeneration& gen = *--InFlight.end(); + return generation <= gen.Generation ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT; + } + } + + TGroupState::EConfidence TGroupState::IsBlocked(ui64 tabletId, ui32 generation) const { + const auto it = Blocks.find(tabletId); + return it == Blocks.end() ? EConfidence::SURELY_NOT : it->second.IsBlocked(generation); + } + + TGroupState::EConfidence TGroupState::IsCollected(TLogoBlobID id, EConfidence keep, EConfidence doNotKeep) const { + EConfidence result = EConfidence::SURELY_NOT; + + const TBarrierId barrierId(id.TabletID(), id.Channel()); + if (const auto it = Barriers.find(barrierId); it != Barriers.end()) { + const TBarrierInfo& barrier = it->second; + const auto genstep = std::make_tuple(id.Generation(), id.Step()); + + EConfidence isCollectedBySoftBarrier; + switch (keep) { + case EConfidence::SURELY_NOT: + isCollectedBySoftBarrier = EConfidence::CONFIRMED; + break; + + case EConfidence::POSSIBLE: + switch (doNotKeep) { + case EConfidence::SURELY_NOT: + case EConfidence::POSSIBLE: + isCollectedBySoftBarrier = EConfidence::POSSIBLE; + break; + + case EConfidence::CONFIRMED: + // this case should not occur + isCollectedBySoftBarrier = EConfidence::SURELY_NOT; + break; + } + break; + + case EConfidence::CONFIRMED: + switch (doNotKeep) { + case EConfidence::SURELY_NOT: + isCollectedBySoftBarrier = EConfidence::SURELY_NOT; + break; + + case EConfidence::POSSIBLE: + case EConfidence::CONFIRMED: + isCollectedBySoftBarrier = EConfidence::POSSIBLE; + break; + } + break; + } + + auto getBarrierState = [&](const auto& 
confirmed, const auto& inflight) { + if (confirmed && genstep <= confirmed->GetCollectGenStep()) { + return EConfidence::CONFIRMED; + } + if (!inflight.empty()) { + const auto& most = *--inflight.end(); + if (genstep <= most.Value.GetCollectGenStep()) { + return EConfidence::POSSIBLE; + } + } + return EConfidence::SURELY_NOT; + }; + + result = Max(getBarrierState(barrier.Confirmed[true], barrier.InFlight[true]), + Min(isCollectedBySoftBarrier, getBarrierState(barrier.Confirmed[false], barrier.InFlight[false]))); + } + + return result; + } + + TGroupState::TBlobInfo *TGroupState::LookupBlob(TLogoBlobID id, bool create) const { + Y_VERIFY(id.BlobSize() != 0); + Y_VERIFY(id.PartId() == 0); + + const TLogoBlobID min(id.TabletID(), id.Generation(), id.Step(), id.Channel(), 0, id.Cookie()); + auto it = Blobs.lower_bound(min); + if (it != Blobs.end()) { + const TLogoBlobID& key = it->first; + if (key.TabletID() == id.TabletID() && key.Channel() == id.Channel() && key.Generation() == id.Generation() && + key.Step() == id.Step() && key.Cookie() == id.Cookie()) { + Y_VERIFY(key == id); + } else if (create) { + it = const_cast<decltype(Blobs)&>(Blobs).try_emplace(it, id); + } else { + return nullptr; + } + return const_cast<TBlobInfo*>(&it->second); + } + return create ? &const_cast<decltype(Blobs)&>(Blobs)[id] : nullptr; + } + + EBlobState TGroupState::GetBlobState(TLogoBlobID id, const TBlobInfo *blob) const { + if (!blob) { + blob = LookupBlob(id, false); + } + if (!blob) { + switch (IsCollected(id, EConfidence::SURELY_NOT, EConfidence::SURELY_NOT)) { + case EConfidence::SURELY_NOT: return EBlobState::NOT_WRITTEN; + case EConfidence::POSSIBLE: return EBlobState::NOT_WRITTEN; + case EConfidence::CONFIRMED: return EBlobState::CERTAINLY_COLLECTED_OR_NEVER_WRITTEN; + } + Y_UNREACHABLE(); + } + if (!blob->ValueHash) { + return EBlobState::NOT_WRITTEN; + } + const EConfidence keep = blob->ConfirmedKeep ? EConfidence::CONFIRMED : + blob->NumKeepsInFlight ? 
EConfidence::POSSIBLE : EConfidence::SURELY_NOT; + const EConfidence doNotKeep = blob->ConfirmedDoNotKeep ? EConfidence::CONFIRMED : + blob->NumDoNotKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT; + switch (IsCollected(id, keep, doNotKeep)) { + case EConfidence::SURELY_NOT: + return blob->ConfirmedValue ? EBlobState::CERTAINLY_WRITTEN : EBlobState::POSSIBLY_WRITTEN; + + case EConfidence::POSSIBLE: + return EBlobState::POSSIBLY_COLLECTED; + + case EConfidence::CONFIRMED: + Y_FAIL(); // must never reach this point -- blob must be deleted from the map when this state occurs + } + } + + void TGroupState::EnumerateBlobs(const std::function<void(TLogoBlobID, EBlobState)>& callback) const { + for (const auto& [id, blob] : Blobs) { + callback(id, GetBlobState(id, &blob)); + } + } + +} // NKikimr::NTesting diff --git a/ydb/core/blobstorage/testing/group_overseer/group_state.h b/ydb/core/blobstorage/testing/group_overseer/group_state.h new file mode 100644 index 00000000000..f71655d4ddf --- /dev/null +++ b/ydb/core/blobstorage/testing/group_overseer/group_state.h @@ -0,0 +1,161 @@ +#pragma once + +#include "defs.h" + +namespace NKikimr::NTesting { + + using TQueryId = std::tuple<ui32, ui32, TActorId, ui64>; // nodeId, result type, sender, cookie + using TBarrierId = std::tuple<ui64, ui8>; + +} // NKikimr::NTesting + +template<> +struct std::hash<NKikimr::NTesting::TQueryId> { + size_t operator ()(const NKikimr::NTesting::TQueryId& x) const { + return MultiHash(std::get<0>(x), std::get<1>(x), std::get<2>(x), std::get<3>(x)); + } +}; + +template<> +struct std::hash<NKikimr::NTesting::TBarrierId> { + size_t operator ()(const NKikimr::NTesting::TBarrierId& x) const { + return MultiHash(std::get<0>(x), std::get<1>(x)); + } +}; + +namespace NKikimr::NTesting { + + class TGroupState { + + enum class EConfidence { + SURELY_NOT, + POSSIBLE, + CONFIRMED, + }; + + 
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+        // Blocks: blocked-generation bookkeeping (a confirmed value plus the requests still in flight)
+
+        struct TBlockedGeneration {
+            ui32 Generation;
+            ui64 IssuerGuid;
+
+            TBlockedGeneration(const TEvBlobStorage::TEvBlock& msg) // captures the two fields of a block request
+                : Generation(msg.Generation)
+                , IssuerGuid(msg.IssuerGuid)
+            {}
+
+            TBlockedGeneration(const TBlockedGeneration&) = default;
+
+            friend bool operator <(const TBlockedGeneration& x, const TBlockedGeneration& y) { // ordered by Generation only; IssuerGuid is ignored
+                return x.Generation < y.Generation;
+            }
+
+            bool Same(const TBlockedGeneration& x) const { // equal generation and issuer guid, and the guid is nonzero
+                return Generation == x.Generation && IssuerGuid == x.IssuerGuid && IssuerGuid;
+            }
+        };
+
+        struct TBlockInfo {
+            std::optional<TBlockedGeneration> Confirmed; // empty until some value is confirmed
+            std::multiset<TBlockedGeneration> InFlight; // sorted by Generation (see operator < above)
+            std::unordered_map<TQueryId, std::multiset<TBlockedGeneration>::iterator> QueryToInFlight; // query id -> its element in InFlight
+
+            EConfidence IsBlocked(ui32 generation) const;
+        };
+
+        std::unordered_map<ui64, TBlockInfo> Blocks; // presumably keyed by tablet id (cf. IsBlocked(tabletId, ...)) -- confirm
+        std::unordered_map<TQueryId, ui64> BlockQueryToTabletId;
+
+        EConfidence IsBlocked(ui64 tabletId, ui32 generation) const;
+
+        ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+        struct TBarrierInfo {
+            struct TValue {
+                ui32 RecordGeneration;
+                ui32 PerGenerationCounter;
+                ui32 CollectGeneration;
+                ui32 CollectStep;
+
+                std::tuple<ui32, ui32> GetCollectGenStep() const { return {CollectGeneration, CollectStep}; }
+                friend bool operator <(const TValue& x, const TValue& y) { return x.GetCollectGenStep() < y.GetCollectGenStep(); } // ordered by (CollectGeneration, CollectStep) only
+
+                void Supersede(const TValue& with) { // replaces *this with `with` after verifying the record never goes backwards
+                    Y_VERIFY(std::tie(RecordGeneration, PerGenerationCounter) <= std::tie(with.RecordGeneration, with.PerGenerationCounter));
+                    if (RecordGeneration == with.RecordGeneration && PerGenerationCounter == with.PerGenerationCounter) {
+                        Y_VERIFY(GetCollectGenStep() == with.GetCollectGenStep()); // same record id => identical barrier
+                    } else {
+                        Y_VERIFY(GetCollectGenStep() <= with.GetCollectGenStep()); // newer record id => barrier must not decrease
+                    }
+                    *this = with;
+                }
+            };
+            
std::optional<TValue> Confirmed[2]; // soft, hard (IsCollected() indexes these arrays with a bool: [false] and [true])
+
+            struct TInFlightCollect {
+                bool Hard;
+                TValue Value;
+
+                friend bool operator <(const TInFlightCollect& x, const TInFlightCollect& y) { return x.Value < y.Value; } // ordered by Value only; Hard is ignored
+            };
+
+            std::multiset<TInFlightCollect> InFlight[2]; // soft, hard
+            std::unordered_map<TQueryId, std::multiset<TInFlightCollect>::iterator> CollectsInFlight; // query id -> its element in one of the InFlight sets
+        };
+
+        std::unordered_map<TBarrierId, TBarrierInfo> Barriers;
+        std::unordered_multimap<TQueryId, std::tuple<bool, TLogoBlobID>> FlagsInFlight;
+
+        EConfidence IsCollected(TLogoBlobID id, EConfidence keep, EConfidence doNotKeep) const;
+
+        struct TBlobValueHash {
+            ui64 Low;
+            ui64 High;
+
+            TBlobValueHash(const TEvBlobStorage::TEvPut& msg) { // hashes the payload of a Put request
+                uint64_t high;
+                TRope buffer(msg.Buffer); // local rope built from the message buffer; GetContiguousSpan() is called on this local, not on msg
+                const auto span = buffer.GetContiguousSpan();
+                Low = t1ha2_atonce128(&high, span.data(), span.size(), 1); // return value goes to Low; `high` is filled through the pointer; the trailing 1 is presumably the seed
+                High = high;
+            }
+
+            TBlobValueHash(const TBlobValueHash&) = default;
+
+            friend bool operator ==(const TBlobValueHash& x, const TBlobValueHash& y) { return x.Low == y.Low && x.High == y.High; }
+            friend bool operator !=(const TBlobValueHash& x, const TBlobValueHash& y) { return !(x == y); }
+        };
+
+        struct TBlobInfo {
+            std::optional<TBlobValueHash> ValueHash; // nullopt if not written
+            bool ConfirmedValue = false;
+            bool ConfirmedKeep = false;
+            bool ConfirmedDoNotKeep = false;
+            ui32 NumKeepsInFlight = 0; // number of CollectGarbage requests in flight with Keep flag for this blob
+            ui32 NumDoNotKeepsInFlight = 0; // the same, but for DoNotKeep flag
+            EBlobState BlobState = EBlobState::NOT_WRITTEN;
+
+            struct TQueryContext {
+                bool IsBlocked = false; // was the request already blocked when the Put got issued?
+                bool IsCollected = false; // was the blob id under garbage barrier when the Put got issued?
+            };
+            std::unordered_map<TQueryId, TQueryContext> PutsInFlight; // one context per query id
+        };
+
+        std::map<TLogoBlobID, TBlobInfo> Blobs; // ordered map: LookupBlob() relies on lower_bound()
+
+        TBlobInfo *LookupBlob(TLogoBlobID id, bool create) const; // const, yet inserts via const_cast when create is true
+
+    public:
+        template<typename T>
+        void ExamineQueryEvent(const TQueryId& queryId, const T& msg);
+
+        template<typename T>
+        void ExamineResultEvent(const TQueryId& queryId, const T& msg);
+
+        EBlobState GetBlobState(TLogoBlobID id, const TBlobInfo *blob = nullptr) const; // a null blob makes the method look the entry up itself
+        void EnumerateBlobs(const std::function<void(TLogoBlobID, EBlobState)>& callback) const;
+    };
+
+} // NKikimr::NTesting
diff --git a/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt
index 052d4f6ebef..ea1e178bd15 100644
--- a/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt
+++ b/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt
@@ -19,6 +19,7 @@ target_link_libraries(ydb-core-blobstorage-ut_testshard PUBLIC
   blobstorage-dsproxy-mock
   core-blobstorage-nodewarden
   blobstorage-pdisk-mock
+  blobstorage-testing-group_overseer
   blobstorage-vdisk-common
   ydb-core-mind
   core-mind-bscontroller
diff --git a/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt
index 89622e7eff4..86e157a4e4d 100644
--- a/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt
+++ b/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-blobstorage-ut_testshard PUBLIC
   blobstorage-dsproxy-mock
   core-blobstorage-nodewarden
   blobstorage-pdisk-mock
+  blobstorage-testing-group_overseer
   blobstorage-vdisk-common
   ydb-core-mind
   core-mind-bscontroller
diff --git a/ydb/core/blobstorage/ut_testshard/defs.h b/ydb/core/blobstorage/ut_testshard/defs.h
index 6cbaf4cbae5..26a9626097e 100644
--- a/ydb/core/blobstorage/ut_testshard/defs.h
+++ b/ydb/core/blobstorage/ut_testshard/defs.h
@@ -2,11 +2,13 @@
 #include <ydb/core/base/hive.h>
 #include <ydb/core/blob_depot/blob_depot.h>
+#include
<ydb/core/blob_depot/testing.h>
 #include <ydb/core/blobstorage/crypto/default.h>
 #include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h>
 #include <ydb/core/blobstorage/dsproxy/mock/model.h>
-#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
 #include <ydb/core/blobstorage/nodewarden/node_warden.h>
+#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
+#include <ydb/core/blobstorage/testing/group_overseer/group_overseer.h>
 #include <ydb/core/mind/bscontroller/bsc.h>
 #include <ydb/core/mind/bscontroller/types.h>
 #include <ydb/core/mind/dynamic_nameserver.h>
diff --git a/ydb/core/blobstorage/ut_testshard/env.h b/ydb/core/blobstorage/ut_testshard/env.h
index f354de8e179..6a55403faf5 100644
--- a/ydb/core/blobstorage/ut_testshard/env.h
+++ b/ydb/core/blobstorage/ut_testshard/env.h
@@ -14,11 +14,11 @@ struct TEnvironmentSetup {
     TIntrusivePtr<NFake::TProxyDS> Group0 = MakeIntrusive<NFake::TProxyDS>();
     std::map<std::pair<ui32, ui32>, TIntrusivePtr<TPDiskMockState>> PDiskMockStates;
     NKikimr::NTestShard::TTestShardContext::TPtr TestShardContext = NKikimr::NTestShard::TTestShardContext::Create();
+    NKikimr::NTesting::TGroupOverseer GroupOverseer; // fed from both runtime filters installed in Initialize()

     struct TSettings {
         const ui32 NodeCount = 8;
         const bool Encryption = false;
-        const std::function<void(TTestActorSystem&)> PrepareRuntime;
         const ui32 ControllerNodeId = 1;
     };

@@ -77,9 +77,16 @@ struct TEnvironmentSetup {

     void Initialize() {
         Runtime = std::make_unique<TTestActorSystem>(Settings.NodeCount, NLog::PRI_ERROR);
-        if (Settings.PrepareRuntime) {
-            Settings.PrepareRuntime(*Runtime);
-        }
+
+        Runtime->FilterFunction = [this](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) { // captures this: the lambda must not outlive the TEnvironmentSetup
+            GroupOverseer.ExamineEvent(nodeId, *ev);
+            return true; // the overseer only observes; the filter always answers true
+        };
+        Runtime->FilterEnqueue = [this](ui32 nodeId, std::unique_ptr<IEventHandle>& ev, ISchedulerCookie*, TInstant) { // cookie and timestamp are ignored
+            GroupOverseer.ExamineEnqueue(nodeId, *ev); // NOTE(review): TTestActorSystem::Schedule() null-checks ev before calling this filter; here it is dereferenced unconditionally -- confirm ev is never null
+            return true; // true makes Schedule() enqueue the event
+        };
+
         SetupLogging();
         Runtime->Start();
         auto *appData = Runtime->GetAppData();
@@ -339,8 +346,21 @@ struct
TEnvironmentSetup {
             prof->SetCount(32);
         }

+        {
+            auto *cmd = request.AddCommand();
+            cmd->MutableQueryBaseConfig(); // extra command in the same request: query the base config
+        }
+
         auto response = Invoke(request);
         UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription());
+
+        for (const auto& status : response.GetStatus()) {
+            if (status.HasBaseConfig()) { // only statuses that carry a base config are inspected
+                for (const auto& group : status.GetBaseConfig().GetGroup()) {
+                    GroupOverseer.AddGroupToOversee(group.GetGroupId()); // register every group listed in the base config
+                }
+            }
+        }
     }

     void Sim(TDuration delta = TDuration::Zero()) {
diff --git a/ydb/core/blobstorage/ut_testshard/main.cpp b/ydb/core/blobstorage/ut_testshard/main.cpp
index 73f1f262355..441283881a0 100644
--- a/ydb/core/blobstorage/ut_testshard/main.cpp
+++ b/ydb/core/blobstorage/ut_testshard/main.cpp
@@ -99,7 +99,19 @@ Y_UNIT_TEST_SUITE(BlobDepotWithTestShard) {
             Cerr << "TabletId# " << tabletId << " configured" << Endl;
         }

-        env.Sim(TDuration::Seconds(5));
+        std::vector<IActor*> blobDepots; // raw pointers collected once; NOTE(review): reused across all 1000 steps below -- confirm these actors cannot be destroyed meanwhile
+        env.Runtime->EnumActors([&](IActor *actor) {
+            if (NBlobDepot::IsBlobDepotActor(actor)) {
+                blobDepots.push_back(actor);
+            }
+        });
+
+        for (ui32 i = 0; i < 1000; ++i) {
+            for (IActor *actor : blobDepots) {
+                NBlobDepot::ValidateBlobDepot(actor, env.GroupOverseer); // validation runs before every simulation step
+            }
+            env.Sim(TDuration::MilliSeconds(5)); // 1000 steps x 5 ms: the same 5 s of simulated time as the removed single Sim() call
+        }
     }

 }
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index a6b9f2ca913..ab12d5c7049 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -66,8 +66,8 @@ class TTestActorSystem {
         ISchedulerCookie *Cookie;
         ui32 NodeId;

-        TScheduleItem(TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, ui32 nodeId)
-            : Event(ev.Release())
+        TScheduleItem(std::unique_ptr<IEventHandle> ev, ISchedulerCookie *cookie, ui32 nodeId) // now takes ownership as unique_ptr
+            : Event(std::move(ev))
             , Cookie(cookie)
             , NodeId(nodeId)
         {}
@@ -160,6 +160,7 @@ class TTestActorSystem {

 public:
     std::function<bool(ui32, std::unique_ptr<IEventHandle>&)> FilterFunction;
+    std::function<bool(ui32, std::unique_ptr<IEventHandle>&, ISchedulerCookie*, TInstant)> FilterEnqueue; // optional; consulted by Schedule() before an event is queued
IOutputStream *LogStream = &Cerr;

 public:
@@ -418,7 +419,10 @@ public:
         if (ev && ev->HasEvent() && ev->GetTypeRewrite() == ev->Type && !EventName.count(ev->Type)) {
             EventName.emplace(ev->Type, TypeName(*ev->GetBase()));
         }
-        ScheduleQ[ts].emplace_back(ev, cookie, nodeId);
+        std::unique_ptr<IEventHandle> evPtr(ev.Release()); // take ownership out of the TAutoPtr so the filter can receive a unique_ptr reference
+        if (!FilterEnqueue || FilterEnqueue(nodeId, evPtr, cookie, ts)) { // no filter installed, or the filter returned true; otherwise the event is not queued
+            ScheduleQ[ts].emplace_back(std::move(evPtr), cookie, nodeId);
+        }
     }

     void Schedule(TDuration timeout, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, ui32 nodeId) {
diff --git a/ydb/core/viewer/json_browse.h b/ydb/core/viewer/json_browse.h
index 6a5532ed50e..b14019893a3 100644
--- a/ydb/core/viewer/json_browse.h
+++ b/ydb/core/viewer/json_browse.h
@@ -14,17 +14,6 @@
 #include "viewer.h"
 #include "wb_aggregate.h"

-namespace std {
-
-template <>
-struct hash<NActors::TActorId> {
-    size_t operator ()(const NActors::TActorId& actorId) const {
-        return actorId.Hash();
-    }
-};
-
-}
-
 namespace NKikimr {
 namespace NViewer {
 |