aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexvru <alexvru@ydb.tech>2022-09-24 20:51:05 +0300
committeralexvru <alexvru@ydb.tech>2022-09-24 20:51:05 +0300
commit5db281b96a25fdced40fb6e21a00c2c501ad0662 (patch)
treecaa2b0c412b82dd379ca684f9cb073631f16b97b
parent1e31d37252ec7af9d09375f1acd77dda07e08366 (diff)
downloadydb-5db281b96a25fdced40fb6e21a00c2c501ad0662.tar.gz
Add GroupOverseer component for testing
-rw-r--r--library/cpp/actors/core/actor.h2
-rw-r--r--library/cpp/actors/core/actorid.h2
-rw-r--r--ydb/core/blob_depot/CMakeLists.txt1
-rw-r--r--ydb/core/blob_depot/blob_depot_tablet.h11
-rw-r--r--ydb/core/blob_depot/defs.h1
-rw-r--r--ydb/core/blob_depot/testing.cpp27
-rw-r--r--ydb/core/blob_depot/testing.h16
-rw-r--r--ydb/core/blobstorage/CMakeLists.txt1
-rw-r--r--ydb/core/blobstorage/testing/CMakeLists.txt9
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt20
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/defs.h6
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp133
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_overseer.h29
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_state.cpp372
-rw-r--r--ydb/core/blobstorage/testing/group_overseer/group_state.h161
-rw-r--r--ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt1
-rw-r--r--ydb/core/blobstorage/ut_testshard/defs.h4
-rw-r--r--ydb/core/blobstorage/ut_testshard/env.h28
-rw-r--r--ydb/core/blobstorage/ut_testshard/main.cpp14
-rw-r--r--ydb/core/util/testactorsys.h10
-rw-r--r--ydb/core/viewer/json_browse.h11
22 files changed, 840 insertions, 20 deletions
diff --git a/library/cpp/actors/core/actor.h b/library/cpp/actors/core/actor.h
index 0a3b4110db0..267107da846 100644
--- a/library/cpp/actors/core/actor.h
+++ b/library/cpp/actors/core/actor.h
@@ -555,3 +555,5 @@ struct THash<NActors::TActorIdentity> {
return x.Hash();
}
};
+
+template<> struct std::hash<NActors::TActorIdentity> : THash<NActors::TActorIdentity> {};
diff --git a/library/cpp/actors/core/actorid.h b/library/cpp/actors/core/actorid.h
index d972b1a0ff1..a9cac589b2b 100644
--- a/library/cpp/actors/core/actorid.h
+++ b/library/cpp/actors/core/actorid.h
@@ -194,3 +194,5 @@ struct THash<NActors::TActorId> {
return x.Hash();
}
};
+
+template<> struct std::hash<NActors::TActorId> : THash<NActors::TActorId> {};
diff --git a/ydb/core/blob_depot/CMakeLists.txt b/ydb/core/blob_depot/CMakeLists.txt
index f92f8aca9a0..6461836196d 100644
--- a/ydb/core/blob_depot/CMakeLists.txt
+++ b/ydb/core/blob_depot/CMakeLists.txt
@@ -30,6 +30,7 @@ target_sources(ydb-core-blob_depot PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/given_id_range.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/group_metrics_exchange.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/mon_main.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/testing.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_apply_config.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_init_schema.cpp
${CMAKE_SOURCE_DIR}/ydb/core/blob_depot/op_load.cpp
diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h
index 46c7d75251b..42a87d1c96e 100644
--- a/ydb/core/blob_depot/blob_depot_tablet.h
+++ b/ydb/core/blob_depot/blob_depot_tablet.h
@@ -5,6 +5,12 @@
#include "types.h"
#include "schema.h"
+namespace NKikimr::NTesting {
+
+ class TGroupOverseer;
+
+} // NKikimr::NTesting
+
namespace NKikimr::NBlobDepot {
using NTabletFlatExecutor::TTabletExecutedFlat;
@@ -227,6 +233,11 @@ namespace NKikimr::NBlobDepot {
void DoGroupMetricsExchange();
void Handle(TEvBlobStorage::TEvControllerGroupMetricsExchange::TPtr ev);
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Validation
+
+ void Validate(NTesting::TGroupOverseer& overseer) const;
};
} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/defs.h b/ydb/core/blob_depot/defs.h
index 997866ecd23..807509fd456 100644
--- a/ydb/core/blob_depot/defs.h
+++ b/ydb/core/blob_depot/defs.h
@@ -5,6 +5,7 @@
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_sets.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo_partlayout.h>
+#include <ydb/core/blobstorage/testing/group_overseer/group_overseer.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
diff --git a/ydb/core/blob_depot/testing.cpp b/ydb/core/blob_depot/testing.cpp
new file mode 100644
index 00000000000..27cf6fe6e6f
--- /dev/null
+++ b/ydb/core/blob_depot/testing.cpp
@@ -0,0 +1,27 @@
+#include "testing.h"
+#include "blob_depot_tablet.h"
+
+namespace NKikimr::NBlobDepot {
+
+ bool IsBlobDepotActor(IActor *actor) {
+ return dynamic_cast<TBlobDepot*>(actor);
+ }
+
+ void ValidateBlobDepot(IActor *actor, NTesting::TGroupOverseer& overseer) {
+ if (auto *x = dynamic_cast<TBlobDepot*>(actor)) {
+ x->Validate(overseer);
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ void TBlobDepot::Validate(NTesting::TGroupOverseer& overseer) const {
+ Y_VERIFY(Config.HasVirtualGroupId());
+ (void)overseer;
+// overseer.EnumerateBlobs(Config.GetVirtualGroupId(), [&](TLogoBlobID id, NTesting::EBlobState state) {
+// Cerr << id.ToString() << Endl;
+// (void)state;
+// });
+ }
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blob_depot/testing.h b/ydb/core/blob_depot/testing.h
new file mode 100644
index 00000000000..04d5d87646a
--- /dev/null
+++ b/ydb/core/blob_depot/testing.h
@@ -0,0 +1,16 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NTesting {
+
+ class TGroupOverseer;
+
+} // NKikimr::NTesting
+
+namespace NKikimr::NBlobDepot {
+
+ bool IsBlobDepotActor(IActor *actor);
+ void ValidateBlobDepot(IActor *actor, NTesting::TGroupOverseer& overseer);
+
+} // NKikimr::NBlobDepot
diff --git a/ydb/core/blobstorage/CMakeLists.txt b/ydb/core/blobstorage/CMakeLists.txt
index c6dc5671666..77cb7c9da43 100644
--- a/ydb/core/blobstorage/CMakeLists.txt
+++ b/ydb/core/blobstorage/CMakeLists.txt
@@ -17,6 +17,7 @@ add_subdirectory(nodewarden)
add_subdirectory(other)
add_subdirectory(pdisk)
add_subdirectory(storagepoolmon)
+add_subdirectory(testing)
add_subdirectory(testload)
add_subdirectory(ut_blobstorage)
add_subdirectory(ut_group)
diff --git a/ydb/core/blobstorage/testing/CMakeLists.txt b/ydb/core/blobstorage/testing/CMakeLists.txt
new file mode 100644
index 00000000000..8fbce163859
--- /dev/null
+++ b/ydb/core/blobstorage/testing/CMakeLists.txt
@@ -0,0 +1,9 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(group_overseer)
diff --git a/ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt b/ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt
new file mode 100644
index 00000000000..32dd4aeaed9
--- /dev/null
+++ b/ydb/core/blobstorage/testing/group_overseer/CMakeLists.txt
@@ -0,0 +1,20 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(blobstorage-testing-group_overseer)
+target_link_libraries(blobstorage-testing-group_overseer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-core-base
+ contrib-libs-t1ha
+)
+target_sources(blobstorage-testing-group_overseer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/blobstorage/testing/group_overseer/group_state.cpp
+)
diff --git a/ydb/core/blobstorage/testing/group_overseer/defs.h b/ydb/core/blobstorage/testing/group_overseer/defs.h
new file mode 100644
index 00000000000..ed072023e02
--- /dev/null
+++ b/ydb/core/blobstorage/testing/group_overseer/defs.h
@@ -0,0 +1,6 @@
+#pragma once
+
+#include <ydb/core/base/blobstorage.h>
+#include <ydb/core/blobstorage/testing/group_overseer/group_overseer.h>
+
+#include <contrib/libs/t1ha/t1ha.h>
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp
new file mode 100644
index 00000000000..fe662e5249e
--- /dev/null
+++ b/ydb/core/blobstorage/testing/group_overseer/group_overseer.cpp
@@ -0,0 +1,133 @@
+#include "group_overseer.h"
+#include "group_state.h"
+
+namespace NKikimr::NTesting {
+
+ class TGroupOverseer::TImpl {
+ std::unordered_map<TActorId, ui32> OverseenServiceMap;
+ std::unordered_map<ui32, TGroupState> GroupStates;
+ std::unordered_map<TQueryId, ui32> QueryToGroup;
+
+ public:
+ void AddGroupToOversee(ui32 groupId) {
+ const TActorId proxyId = MakeBlobStorageProxyID(groupId);
+ OverseenServiceMap[proxyId] = groupId;
+ GroupStates.try_emplace(groupId);
+ }
+
+ void ExamineEvent(ui32 nodeId, IEventHandle& ev) {
+ switch (ev.GetTypeRewrite()) { // all of these events may alter storage state
+#define QUERY(EV) case TEvBlobStorage::EV: return ExamineResultEvent<TEvBlobStorage::T##EV>(nodeId, ev);
+ QUERY(EvBlockResult)
+ QUERY(EvPutResult)
+ QUERY(EvPatchResult)
+ QUERY(EvInplacePatchResult)
+ QUERY(EvCollectGarbageResult)
+ QUERY(EvGetResult)
+ QUERY(EvRangeResult)
+ QUERY(EvDiscoverResult)
+#undef QUERY
+ }
+ }
+
+ void ExamineEnqueue(ui32 nodeId, IEventHandle& ev) {
+ switch (ev.GetTypeRewrite()) { // all of these events may alter storage state
+#define RESULT(EV) case TEvBlobStorage::EV: return ExamineQueryEvent<TEvBlobStorage::T##EV>(nodeId, ev, TEvBlobStorage::EV##Result);
+ RESULT(EvBlock)
+ RESULT(EvPut)
+ RESULT(EvPatch)
+ RESULT(EvInplacePatch)
+ RESULT(EvCollectGarbage)
+ RESULT(EvGet)
+ RESULT(EvRange)
+ RESULT(EvDiscover)
+#undef RESULT
+ }
+ }
+
+ EBlobState GetBlobState(ui32 groupId, TLogoBlobID id) const {
+ if (const auto it = GroupStates.find(groupId); it != GroupStates.end()) {
+ return it->second.GetBlobState(id);
+ } else {
+ Y_FAIL_S("GroupId# " << groupId << " is not being overseen");
+ }
+ }
+
+ void EnumerateBlobs(ui32 groupId, const std::function<void(TLogoBlobID, EBlobState)>& callback) const {
+ if (const auto it = GroupStates.find(groupId); it != GroupStates.end()) {
+ return it->second.EnumerateBlobs(callback);
+ } else {
+ Y_FAIL();
+ }
+ }
+
+ private:
+ template<typename T>
+ void ExamineQueryEvent(ui32 nodeId, IEventHandle& ev, ui32 resultEventType) {
+ Y_VERIFY(ev.GetTypeRewrite() == T::EventType);
+
+ const auto it = OverseenServiceMap.find(ev.Recipient);
+ if (it == OverseenServiceMap.end()) {
+ return;
+ }
+ const ui32 groupId = it->second;
+
+ const TQueryId queryId{nodeId, resultEventType, ev.Sender, ev.Cookie};
+ const auto [_, inserted] = QueryToGroup.emplace(queryId, groupId);
+ if (inserted) {
+ GroupStates[groupId].ExamineQueryEvent(queryId, *ev.Get<T>());
+ }
+ }
+
+ template<typename T>
+ void ExamineResultEvent(ui32 nodeId, IEventHandle& ev) {
+ Y_VERIFY(ev.GetTypeRewrite() == T::EventType);
+
+ const TQueryId queryId{nodeId, T::EventType, ev.Recipient, ev.Cookie};
+ const auto it = QueryToGroup.find(queryId);
+ if (it == QueryToGroup.end()) {
+ return;
+ }
+ const ui32 groupId = it->second;
+ QueryToGroup.erase(it);
+
+ auto& msg = *ev.Get<T>();
+ if constexpr (T::EventType != TEvBlobStorage::EvBlockResult &&
+ T::EventType != TEvBlobStorage::EvInplacePatchResult &&
+ T::EventType != TEvBlobStorage::EvCollectGarbageResult &&
+ T::EventType != TEvBlobStorage::EvDiscoverResult) {
+ Y_VERIFY(groupId == msg.GroupId);
+ }
+
+ GroupStates[groupId].ExamineResultEvent(queryId, msg);
+ }
+ };
+
+ TGroupOverseer::TGroupOverseer()
+ : Impl(std::make_unique<TImpl>())
+ {}
+
+ TGroupOverseer::~TGroupOverseer()
+ {}
+
+ void TGroupOverseer::AddGroupToOversee(ui32 groupId) {
+ Impl->AddGroupToOversee(groupId);
+ }
+
+ void TGroupOverseer::ExamineEnqueue(ui32 nodeId, IEventHandle& ev) {
+ Impl->ExamineEnqueue(nodeId, ev);
+ }
+
+ void TGroupOverseer::ExamineEvent(ui32 nodeId, IEventHandle& ev) {
+ Impl->ExamineEvent(nodeId, ev);
+ }
+
+ EBlobState TGroupOverseer::GetBlobState(ui32 groupId, TLogoBlobID id) const {
+ return Impl->GetBlobState(groupId, id);
+ }
+
+ void TGroupOverseer::EnumerateBlobs(ui32 groupId, const std::function<void(TLogoBlobID, EBlobState)>& callback) const {
+ Impl->EnumerateBlobs(groupId, callback);
+ }
+
+} // NKikimr::NTesting
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_overseer.h b/ydb/core/blobstorage/testing/group_overseer/group_overseer.h
new file mode 100644
index 00000000000..7da4381d3f9
--- /dev/null
+++ b/ydb/core/blobstorage/testing/group_overseer/group_overseer.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NTesting {
+
+ enum class EBlobState {
+ NOT_WRITTEN,
+ POSSIBLY_WRITTEN, // write was in flight, but no confirmation received
+ CERTAINLY_WRITTEN, // write was confirmed
+ POSSIBLY_COLLECTED, // blob is beyond unconfirmed GC command
+ CERTAINLY_COLLECTED_OR_NEVER_WRITTEN, // blob is gonna be collected soon
+ };
+
+ class TGroupOverseer {
+ class TImpl;
+ std::unique_ptr<TImpl> Impl;
+
+ public:
+ TGroupOverseer();
+ ~TGroupOverseer();
+ void AddGroupToOversee(ui32 groupId);
+ void ExamineEnqueue(ui32 nodeId, IEventHandle& ev);
+ void ExamineEvent(ui32 nodeId, IEventHandle& ev);
+ EBlobState GetBlobState(ui32 groupId, TLogoBlobID id) const;
+ void EnumerateBlobs(ui32 groupId, const std::function<void(TLogoBlobID, EBlobState)>& callback) const;
+ };
+
+} // NKikimr::NTesting
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_state.cpp b/ydb/core/blobstorage/testing/group_overseer/group_state.cpp
new file mode 100644
index 00000000000..469017496bd
--- /dev/null
+++ b/ydb/core/blobstorage/testing/group_overseer/group_state.cpp
@@ -0,0 +1,372 @@
+#include "group_state.h"
+
+namespace NKikimr::NTesting {
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Block
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvBlock& msg) {
+ TBlockInfo& block = Blocks[msg.TabletId];
+ const auto inflightIt = block.InFlight.emplace(msg);
+ const bool inserted1 = block.QueryToInFlight.emplace(queryId, inflightIt).second;
+ Y_VERIFY(inserted1);
+ const bool inserted2 = BlockQueryToTabletId.emplace(queryId, msg.TabletId).second;
+ Y_VERIFY(inserted2);
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvBlockResult& msg) {
+ auto query = BlockQueryToTabletId.extract(queryId);
+ Y_VERIFY(query);
+
+ TBlockInfo& block = Blocks[query.mapped()];
+
+ auto queryToInFlight = block.QueryToInFlight.extract(queryId);
+ Y_VERIFY(queryToInFlight);
+ auto inFlight = block.InFlight.extract(queryToInFlight.mapped());
+ Y_VERIFY(inFlight);
+ TBlockedGeneration& gen = inFlight.value();
+
+ if (msg.Status == NKikimrProto::OK) {
+ if (!block.Confirmed || block.Confirmed->Generation < gen.Generation || gen.Same(*block.Confirmed)) {
+ block.Confirmed.emplace(gen);
+ } else {
+ Y_FAIL("incorrect successful block");
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Put
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvPut& msg) {
+ TBlobInfo& blob = *LookupBlob(msg.Id, true);
+
+ TBlobValueHash valueHash(msg);
+
+ if (blob.ValueHash) {
+ Y_VERIFY(*blob.ValueHash == valueHash);
+ } else {
+ blob.ValueHash.emplace(valueHash);
+ }
+
+ EConfidence isBlocked = IsBlocked(msg.Id.TabletID(), msg.Id.Generation());
+ for (const auto& [tabletId, generation] : msg.ExtraBlockChecks) {
+ isBlocked = Max(isBlocked, IsBlocked(tabletId, generation));
+ }
+
+ const EConfidence isCollected = IsCollected(msg.Id,
+ blob.ConfirmedKeep ? EConfidence::CONFIRMED :
+ blob.NumKeepsInFlight ? EConfidence::POSSIBLE :
+ EConfidence::SURELY_NOT,
+ blob.ConfirmedDoNotKeep ? EConfidence::CONFIRMED :
+ blob.NumDoNotKeepsInFlight ? EConfidence::POSSIBLE :
+ EConfidence::SURELY_NOT);
+
+ const bool inserted = blob.PutsInFlight.emplace(queryId, TBlobInfo::TQueryContext{
+ .IsBlocked = isBlocked == EConfidence::CONFIRMED, // if true, we can't get OK answer
+ .IsCollected = isCollected == EConfidence::CONFIRMED, // if true, this blob is being put beyond the barrier
+ }).second;
+ Y_VERIFY(inserted);
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvPutResult& msg) {
+ const auto it = Blobs.find(msg.Id);
+ if (msg.Status != NKikimrProto::OK && it == Blobs.end()) {
+ Y_VERIFY(GetBlobState(msg.Id) == EBlobState::CERTAINLY_COLLECTED_OR_NEVER_WRITTEN);
+ return; // it's okay to get ERROR for TEvPut when the blob is already collected and is not in the map
+ }
+ Y_VERIFY(it != Blobs.end());
+ TBlobInfo& blob = it->second;
+
+ auto putInFlight = blob.PutsInFlight.extract(queryId);
+ Y_VERIFY(putInFlight);
+
+ auto& v = putInFlight.mapped();
+ if (v.IsBlocked && (msg.Status != NKikimrProto::BLOCKED && msg.Status != NKikimrProto::ERROR)) {
+ Y_FAIL("incorrect TEvPut result status code -- was BLOCKED at the begin of the query");
+ }
+
+ if (v.IsCollected && msg.Status != NKikimrProto::ERROR) {
+ Y_FAIL("incorrect TEvPut result status code -- was already beyond the barrier at begin of the query");
+ }
+
+ if (msg.Status == NKikimrProto::OK) {
+ Y_VERIFY(blob.ValueHash);
+ blob.ConfirmedValue = true;
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Patch
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId&, const TEvBlobStorage::TEvPatch&) {
+ Y_FAIL("not implemented");
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId&, const TEvBlobStorage::TEvPatchResult&) {
+ Y_FAIL("not implemented");
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // InplacePatch
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId&, const TEvBlobStorage::TEvInplacePatch&) {
+ Y_FAIL("not implemented");
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId&, const TEvBlobStorage::TEvInplacePatchResult&) {
+ Y_FAIL("not implemented");
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // CollectGarbage
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvCollectGarbage& msg) {
+ auto processFlags = [&](TVector<TLogoBlobID> *vector, bool isKeepFlag) {
+ for (const TLogoBlobID& id : vector ? *vector : TVector<TLogoBlobID>()) {
+ TBlobInfo& blob = *LookupBlob(id, true);
+ ++(isKeepFlag ? blob.NumKeepsInFlight : blob.NumDoNotKeepsInFlight);
+ FlagsInFlight.emplace(queryId, std::make_tuple(isKeepFlag, id));
+ }
+ };
+
+ processFlags(msg.Keep.Get(), true);
+ processFlags(msg.DoNotKeep.Get(), true);
+
+ if (msg.Collect) {
+ const TBarrierId id(msg.TabletId, msg.Channel);
+ TBarrierInfo& barrier = Barriers[id];
+
+ auto inFlightIt = barrier.InFlight[msg.Hard].insert({
+ .Hard = msg.Hard,
+ .Value{
+ .RecordGeneration = msg.RecordGeneration,
+ .PerGenerationCounter = msg.PerGenerationCounter,
+ .CollectGeneration = msg.CollectGeneration,
+ .CollectStep = msg.CollectStep
+ }
+ });
+
+ const bool inserted = barrier.CollectsInFlight.emplace(queryId, inFlightIt).second;
+ Y_VERIFY(inserted);
+ }
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvCollectGarbageResult& msg) {
+ auto [begin, end] = FlagsInFlight.equal_range(queryId);
+ for (auto it = begin; it != end; ++it) {
+ const auto& [isKeepFlag, id] = it->second;
+ TBlobInfo& blob = *LookupBlob(id, true);
+ --(isKeepFlag ? blob.NumKeepsInFlight : blob.NumDoNotKeepsInFlight);
+ if (msg.Status == NKikimrProto::OK) {
+ (isKeepFlag ? blob.ConfirmedKeep : blob.ConfirmedDoNotKeep) = true;
+ }
+ }
+ FlagsInFlight.erase(begin, end);
+
+ const TBarrierId barrierId(msg.TabletId, msg.Channel);
+ TBarrierInfo& barrier = Barriers[barrierId];
+ if (auto query = barrier.CollectsInFlight.extract(queryId)) {
+ auto inFlightIt = query.mapped();
+ const bool hard = inFlightIt->Hard;
+ auto inFlight = barrier.InFlight[hard].extract(inFlightIt);
+ Y_VERIFY(inFlight);
+ if (msg.Status == NKikimrProto::OK) {
+ if (auto& dest = barrier.Confirmed[hard]) {
+ dest->Supersede(inFlight.value().Value);
+ } else {
+ dest.emplace(inFlight.value().Value);
+ }
+ }
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Get
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvGet& msg) {
+ (void)queryId, (void)msg;
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvGetResult& msg) {
+ (void)queryId, (void)msg;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Range
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvRange& msg) {
+ (void)queryId, (void)msg;
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvRangeResult& msg) {
+ (void)queryId, (void)msg;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Discover
+
+ template<>
+ void TGroupState::ExamineQueryEvent(const TQueryId& queryId, const TEvBlobStorage::TEvDiscover& msg) {
+ (void)queryId, (void)msg;
+ }
+
+ template<>
+ void TGroupState::ExamineResultEvent(const TQueryId& queryId, const TEvBlobStorage::TEvDiscoverResult& msg) {
+ (void)queryId, (void)msg;
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Common parts
+
+ TGroupState::EConfidence TGroupState::TBlockInfo::IsBlocked(ui32 generation) const {
+ if (Confirmed && generation <= Confirmed->Generation) {
+ return EConfidence::CONFIRMED;
+ } else if (InFlight.empty()) {
+ return EConfidence::SURELY_NOT;
+ } else {
+ const TBlockedGeneration& gen = *--InFlight.end();
+ return generation <= gen.Generation ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
+ }
+ }
+
+ TGroupState::EConfidence TGroupState::IsBlocked(ui64 tabletId, ui32 generation) const {
+ const auto it = Blocks.find(tabletId);
+ return it == Blocks.end() ? EConfidence::SURELY_NOT : it->second.IsBlocked(generation);
+ }
+
+ TGroupState::EConfidence TGroupState::IsCollected(TLogoBlobID id, EConfidence keep, EConfidence doNotKeep) const {
+ EConfidence result = EConfidence::SURELY_NOT;
+
+ const TBarrierId barrierId(id.TabletID(), id.Channel());
+ if (const auto it = Barriers.find(barrierId); it != Barriers.end()) {
+ const TBarrierInfo& barrier = it->second;
+ const auto genstep = std::make_tuple(id.Generation(), id.Step());
+
+ EConfidence isCollectedBySoftBarrier;
+ switch (keep) {
+ case EConfidence::SURELY_NOT:
+ isCollectedBySoftBarrier = EConfidence::CONFIRMED;
+ break;
+
+ case EConfidence::POSSIBLE:
+ switch (doNotKeep) {
+ case EConfidence::SURELY_NOT:
+ case EConfidence::POSSIBLE:
+ isCollectedBySoftBarrier = EConfidence::POSSIBLE;
+ break;
+
+ case EConfidence::CONFIRMED:
+ // this case should not occur
+ isCollectedBySoftBarrier = EConfidence::SURELY_NOT;
+ break;
+ }
+ break;
+
+ case EConfidence::CONFIRMED:
+ switch (doNotKeep) {
+ case EConfidence::SURELY_NOT:
+ isCollectedBySoftBarrier = EConfidence::SURELY_NOT;
+ break;
+
+ case EConfidence::POSSIBLE:
+ case EConfidence::CONFIRMED:
+ isCollectedBySoftBarrier = EConfidence::POSSIBLE;
+ break;
+ }
+ break;
+ }
+
+ auto getBarrierState = [&](const auto& confirmed, const auto& inflight) {
+ if (confirmed && genstep <= confirmed->GetCollectGenStep()) {
+ return EConfidence::CONFIRMED;
+ }
+ if (!inflight.empty()) {
+ const auto& most = *--inflight.end();
+ if (genstep <= most.Value.GetCollectGenStep()) {
+ return EConfidence::POSSIBLE;
+ }
+ }
+ return EConfidence::SURELY_NOT;
+ };
+
+ result = Max(getBarrierState(barrier.Confirmed[true], barrier.InFlight[true]),
+ Min(isCollectedBySoftBarrier, getBarrierState(barrier.Confirmed[false], barrier.InFlight[false])));
+ }
+
+ return result;
+ }
+
+ TGroupState::TBlobInfo *TGroupState::LookupBlob(TLogoBlobID id, bool create) const {
+ Y_VERIFY(id.BlobSize() != 0);
+ Y_VERIFY(id.PartId() == 0);
+
+ const TLogoBlobID min(id.TabletID(), id.Generation(), id.Step(), id.Channel(), 0, id.Cookie());
+ auto it = Blobs.lower_bound(min);
+ if (it != Blobs.end()) {
+ const TLogoBlobID& key = it->first;
+ if (key.TabletID() == id.TabletID() && key.Channel() == id.Channel() && key.Generation() == id.Generation() &&
+ key.Step() == id.Step() && key.Cookie() == id.Cookie()) {
+ Y_VERIFY(key == id);
+ } else if (create) {
+ it = const_cast<decltype(Blobs)&>(Blobs).try_emplace(it, id);
+ } else {
+ return nullptr;
+ }
+ return const_cast<TBlobInfo*>(&it->second);
+ }
+ return create ? &const_cast<decltype(Blobs)&>(Blobs)[id] : nullptr;
+ }
+
+ EBlobState TGroupState::GetBlobState(TLogoBlobID id, const TBlobInfo *blob) const {
+ if (!blob) {
+ blob = LookupBlob(id, false);
+ }
+ if (!blob) {
+ switch (IsCollected(id, EConfidence::SURELY_NOT, EConfidence::SURELY_NOT)) {
+ case EConfidence::SURELY_NOT: return EBlobState::NOT_WRITTEN;
+ case EConfidence::POSSIBLE: return EBlobState::NOT_WRITTEN;
+ case EConfidence::CONFIRMED: return EBlobState::CERTAINLY_COLLECTED_OR_NEVER_WRITTEN;
+ }
+ Y_UNREACHABLE();
+ }
+ if (!blob->ValueHash) {
+ return EBlobState::NOT_WRITTEN;
+ }
+ const EConfidence keep = blob->ConfirmedKeep ? EConfidence::CONFIRMED :
+ blob->NumKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
+ const EConfidence doNotKeep = blob->ConfirmedDoNotKeep ? EConfidence::CONFIRMED :
+ blob->NumDoNotKeepsInFlight ? EConfidence::POSSIBLE : EConfidence::SURELY_NOT;
+ switch (IsCollected(id, keep, doNotKeep)) {
+ case EConfidence::SURELY_NOT:
+ return blob->ConfirmedValue ? EBlobState::CERTAINLY_WRITTEN : EBlobState::POSSIBLY_WRITTEN;
+
+ case EConfidence::POSSIBLE:
+ return EBlobState::POSSIBLY_COLLECTED;
+
+ case EConfidence::CONFIRMED:
+ Y_FAIL(); // must never reach this point -- blob must be deleted from the map when this state occurs
+ }
+ }
+
+ void TGroupState::EnumerateBlobs(const std::function<void(TLogoBlobID, EBlobState)>& callback) const {
+ for (const auto& [id, blob] : Blobs) {
+ callback(id, GetBlobState(id, &blob));
+ }
+ }
+
+} // NKikimr::NTesting
diff --git a/ydb/core/blobstorage/testing/group_overseer/group_state.h b/ydb/core/blobstorage/testing/group_overseer/group_state.h
new file mode 100644
index 00000000000..f71655d4ddf
--- /dev/null
+++ b/ydb/core/blobstorage/testing/group_overseer/group_state.h
@@ -0,0 +1,161 @@
+#pragma once
+
+#include "defs.h"
+
+namespace NKikimr::NTesting {
+
+ using TQueryId = std::tuple<ui32, ui32, TActorId, ui64>; // nodeId, result type, sender, cookie
+ using TBarrierId = std::tuple<ui64, ui8>;
+
+} // NKikimr::NTesting
+
+template<>
+struct std::hash<NKikimr::NTesting::TQueryId> {
+ size_t operator ()(const NKikimr::NTesting::TQueryId& x) const {
+ return MultiHash(std::get<0>(x), std::get<1>(x), std::get<2>(x), std::get<3>(x));
+ }
+};
+
+template<>
+struct std::hash<NKikimr::NTesting::TBarrierId> {
+ size_t operator ()(const NKikimr::NTesting::TBarrierId& x) const {
+ return MultiHash(std::get<0>(x), std::get<1>(x));
+ }
+};
+
+namespace NKikimr::NTesting {
+
+ class TGroupState {
+
+ enum class EConfidence {
+ SURELY_NOT,
+ POSSIBLE,
+ CONFIRMED,
+ };
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Blocks
+
+ struct TBlockedGeneration {
+ ui32 Generation;
+ ui64 IssuerGuid;
+
+ TBlockedGeneration(const TEvBlobStorage::TEvBlock& msg)
+ : Generation(msg.Generation)
+ , IssuerGuid(msg.IssuerGuid)
+ {}
+
+ TBlockedGeneration(const TBlockedGeneration&) = default;
+
+ friend bool operator <(const TBlockedGeneration& x, const TBlockedGeneration& y) {
+ return x.Generation < y.Generation;
+ }
+
+ bool Same(const TBlockedGeneration& x) const {
+ return Generation == x.Generation && IssuerGuid == x.IssuerGuid && IssuerGuid;
+ }
+ };
+
+ struct TBlockInfo {
+ std::optional<TBlockedGeneration> Confirmed;
+ std::multiset<TBlockedGeneration> InFlight;
+ std::unordered_map<TQueryId, std::multiset<TBlockedGeneration>::iterator> QueryToInFlight;
+
+ EConfidence IsBlocked(ui32 generation) const;
+ };
+
+ std::unordered_map<ui64, TBlockInfo> Blocks;
+ std::unordered_map<TQueryId, ui64> BlockQueryToTabletId;
+
+ EConfidence IsBlocked(ui64 tabletId, ui32 generation) const;
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ struct TBarrierInfo {
+ struct TValue {
+ ui32 RecordGeneration;
+ ui32 PerGenerationCounter;
+ ui32 CollectGeneration;
+ ui32 CollectStep;
+
+ std::tuple<ui32, ui32> GetCollectGenStep() const { return {CollectGeneration, CollectStep}; }
+ friend bool operator <(const TValue& x, const TValue& y) { return x.GetCollectGenStep() < y.GetCollectGenStep(); }
+
+ void Supersede(const TValue& with) {
+ Y_VERIFY(std::tie(RecordGeneration, PerGenerationCounter) <= std::tie(with.RecordGeneration, with.PerGenerationCounter));
+ if (RecordGeneration == with.RecordGeneration && PerGenerationCounter == with.PerGenerationCounter) {
+ Y_VERIFY(GetCollectGenStep() == with.GetCollectGenStep());
+ } else {
+ Y_VERIFY(GetCollectGenStep() <= with.GetCollectGenStep());
+ }
+ *this = with;
+ }
+ };
+ std::optional<TValue> Confirmed[2]; // soft, hard
+
+ struct TInFlightCollect {
+ bool Hard;
+ TValue Value;
+
+ friend bool operator <(const TInFlightCollect& x, const TInFlightCollect& y) { return x.Value < y.Value; }
+ };
+
+ std::multiset<TInFlightCollect> InFlight[2]; // soft, hard
+ std::unordered_map<TQueryId, std::multiset<TInFlightCollect>::iterator> CollectsInFlight;
+ };
+
+ std::unordered_map<TBarrierId, TBarrierInfo> Barriers;
+ std::unordered_multimap<TQueryId, std::tuple<bool, TLogoBlobID>> FlagsInFlight;
+
+ EConfidence IsCollected(TLogoBlobID id, EConfidence keep, EConfidence doNotKeep) const;
+
+ struct TBlobValueHash {
+ ui64 Low;
+ ui64 High;
+
+ TBlobValueHash(const TEvBlobStorage::TEvPut& msg) {
+ uint64_t high;
+ TRope buffer(msg.Buffer);
+ const auto span = buffer.GetContiguousSpan();
+ Low = t1ha2_atonce128(&high, span.data(), span.size(), 1);
+ High = high;
+ }
+
+ TBlobValueHash(const TBlobValueHash&) = default;
+
+ friend bool operator ==(const TBlobValueHash& x, const TBlobValueHash& y) { return x.Low == y.Low && x.High == y.High; }
+ friend bool operator !=(const TBlobValueHash& x, const TBlobValueHash& y) { return !(x == y); }
+ };
+
+ struct TBlobInfo {
+ std::optional<TBlobValueHash> ValueHash; // nullopt if not written
+ bool ConfirmedValue = false;
+ bool ConfirmedKeep = false;
+ bool ConfirmedDoNotKeep = false;
+ ui32 NumKeepsInFlight = 0; // number of CollectGarbage requests in flight with Keep flag for this blob
+ ui32 NumDoNotKeepsInFlight = 0; // the same, but for DoNotKeep flag
+ EBlobState BlobState = EBlobState::NOT_WRITTEN;
+
+ struct TQueryContext {
+ bool IsBlocked = false; // was the request already blocked when the Put got issued?
+ bool IsCollected = false; // was the blob id under garbage barrier when the Put got issued?
+ };
+ std::unordered_map<TQueryId, TQueryContext> PutsInFlight;
+ };
+
+ std::map<TLogoBlobID, TBlobInfo> Blobs;
+
+ TBlobInfo *LookupBlob(TLogoBlobID id, bool create) const;
+
+ public:
+ template<typename T>
+ void ExamineQueryEvent(const TQueryId& queryId, const T& msg);
+
+ template<typename T>
+ void ExamineResultEvent(const TQueryId& queryId, const T& msg);
+
+ EBlobState GetBlobState(TLogoBlobID id, const TBlobInfo *blob = nullptr) const;
+ void EnumerateBlobs(const std::function<void(TLogoBlobID, EBlobState)>& callback) const;
+ };
+
+} // NKikimr::NTesting
diff --git a/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt b/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt
index 052d4f6ebef..ea1e178bd15 100644
--- a/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt
+++ b/ydb/core/blobstorage/ut_testshard/CMakeLists.darwin.txt
@@ -19,6 +19,7 @@ target_link_libraries(ydb-core-blobstorage-ut_testshard PUBLIC
blobstorage-dsproxy-mock
core-blobstorage-nodewarden
blobstorage-pdisk-mock
+ blobstorage-testing-group_overseer
blobstorage-vdisk-common
ydb-core-mind
core-mind-bscontroller
diff --git a/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt b/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt
index 89622e7eff4..86e157a4e4d 100644
--- a/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt
+++ b/ydb/core/blobstorage/ut_testshard/CMakeLists.linux.txt
@@ -21,6 +21,7 @@ target_link_libraries(ydb-core-blobstorage-ut_testshard PUBLIC
blobstorage-dsproxy-mock
core-blobstorage-nodewarden
blobstorage-pdisk-mock
+ blobstorage-testing-group_overseer
blobstorage-vdisk-common
ydb-core-mind
core-mind-bscontroller
diff --git a/ydb/core/blobstorage/ut_testshard/defs.h b/ydb/core/blobstorage/ut_testshard/defs.h
index 6cbaf4cbae5..26a9626097e 100644
--- a/ydb/core/blobstorage/ut_testshard/defs.h
+++ b/ydb/core/blobstorage/ut_testshard/defs.h
@@ -2,11 +2,13 @@
#include <ydb/core/base/hive.h>
#include <ydb/core/blob_depot/blob_depot.h>
+#include <ydb/core/blob_depot/testing.h>
#include <ydb/core/blobstorage/crypto/default.h>
#include <ydb/core/blobstorage/dsproxy/mock/dsproxy_mock.h>
#include <ydb/core/blobstorage/dsproxy/mock/model.h>
-#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
#include <ydb/core/blobstorage/nodewarden/node_warden.h>
+#include <ydb/core/blobstorage/pdisk/mock/pdisk_mock.h>
+#include <ydb/core/blobstorage/testing/group_overseer/group_overseer.h>
#include <ydb/core/mind/bscontroller/bsc.h>
#include <ydb/core/mind/bscontroller/types.h>
#include <ydb/core/mind/dynamic_nameserver.h>
diff --git a/ydb/core/blobstorage/ut_testshard/env.h b/ydb/core/blobstorage/ut_testshard/env.h
index f354de8e179..6a55403faf5 100644
--- a/ydb/core/blobstorage/ut_testshard/env.h
+++ b/ydb/core/blobstorage/ut_testshard/env.h
@@ -14,11 +14,11 @@ struct TEnvironmentSetup {
TIntrusivePtr<NFake::TProxyDS> Group0 = MakeIntrusive<NFake::TProxyDS>();
std::map<std::pair<ui32, ui32>, TIntrusivePtr<TPDiskMockState>> PDiskMockStates;
NKikimr::NTestShard::TTestShardContext::TPtr TestShardContext = NKikimr::NTestShard::TTestShardContext::Create();
+ NKikimr::NTesting::TGroupOverseer GroupOverseer;
struct TSettings {
const ui32 NodeCount = 8;
const bool Encryption = false;
- const std::function<void(TTestActorSystem&)> PrepareRuntime;
const ui32 ControllerNodeId = 1;
};
@@ -77,9 +77,16 @@ struct TEnvironmentSetup {
void Initialize() {
Runtime = std::make_unique<TTestActorSystem>(Settings.NodeCount, NLog::PRI_ERROR);
- if (Settings.PrepareRuntime) {
- Settings.PrepareRuntime(*Runtime);
- }
+
+ Runtime->FilterFunction = [this](ui32 nodeId, std::unique_ptr<IEventHandle>& ev) {
+ GroupOverseer.ExamineEvent(nodeId, *ev);
+ return true;
+ };
+ Runtime->FilterEnqueue = [this](ui32 nodeId, std::unique_ptr<IEventHandle>& ev, ISchedulerCookie*, TInstant) {
+ GroupOverseer.ExamineEnqueue(nodeId, *ev);
+ return true;
+ };
+
SetupLogging();
Runtime->Start();
auto *appData = Runtime->GetAppData();
@@ -339,8 +346,21 @@ struct TEnvironmentSetup {
prof->SetCount(32);
}
+ {
+ auto *cmd = request.AddCommand();
+ cmd->MutableQueryBaseConfig();
+ }
+
auto response = Invoke(request);
UNIT_ASSERT_C(response.GetSuccess(), response.GetErrorDescription());
+
+ for (const auto& status : response.GetStatus()) {
+ if (status.HasBaseConfig()) {
+ for (const auto& group : status.GetBaseConfig().GetGroup()) {
+ GroupOverseer.AddGroupToOversee(group.GetGroupId());
+ }
+ }
+ }
}
void Sim(TDuration delta = TDuration::Zero()) {
diff --git a/ydb/core/blobstorage/ut_testshard/main.cpp b/ydb/core/blobstorage/ut_testshard/main.cpp
index 73f1f262355..441283881a0 100644
--- a/ydb/core/blobstorage/ut_testshard/main.cpp
+++ b/ydb/core/blobstorage/ut_testshard/main.cpp
@@ -99,7 +99,19 @@ Y_UNIT_TEST_SUITE(BlobDepotWithTestShard) {
Cerr << "TabletId# " << tabletId << " configured" << Endl;
}
- env.Sim(TDuration::Seconds(5));
+ std::vector<IActor*> blobDepots;
+ env.Runtime->EnumActors([&](IActor *actor) {
+ if (NBlobDepot::IsBlobDepotActor(actor)) {
+ blobDepots.push_back(actor);
+ }
+ });
+
+ for (ui32 i = 0; i < 1000; ++i) {
+ for (IActor *actor : blobDepots) {
+ NBlobDepot::ValidateBlobDepot(actor, env.GroupOverseer);
+ }
+ env.Sim(TDuration::MilliSeconds(5));
+ }
}
}
diff --git a/ydb/core/util/testactorsys.h b/ydb/core/util/testactorsys.h
index a6b9f2ca913..ab12d5c7049 100644
--- a/ydb/core/util/testactorsys.h
+++ b/ydb/core/util/testactorsys.h
@@ -66,8 +66,8 @@ class TTestActorSystem {
ISchedulerCookie *Cookie;
ui32 NodeId;
- TScheduleItem(TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, ui32 nodeId)
- : Event(ev.Release())
+ TScheduleItem(std::unique_ptr<IEventHandle> ev, ISchedulerCookie *cookie, ui32 nodeId)
+ : Event(std::move(ev))
, Cookie(cookie)
, NodeId(nodeId)
{}
@@ -160,6 +160,7 @@ class TTestActorSystem {
public:
std::function<bool(ui32, std::unique_ptr<IEventHandle>&)> FilterFunction;
+ std::function<bool(ui32, std::unique_ptr<IEventHandle>&, ISchedulerCookie*, TInstant)> FilterEnqueue;
IOutputStream *LogStream = &Cerr;
public:
@@ -418,7 +419,10 @@ public:
if (ev && ev->HasEvent() && ev->GetTypeRewrite() == ev->Type && !EventName.count(ev->Type)) {
EventName.emplace(ev->Type, TypeName(*ev->GetBase()));
}
- ScheduleQ[ts].emplace_back(ev, cookie, nodeId);
+ std::unique_ptr<IEventHandle> evPtr(ev.Release());
+ if (!FilterEnqueue || FilterEnqueue(nodeId, evPtr, cookie, ts)) {
+ ScheduleQ[ts].emplace_back(std::move(evPtr), cookie, nodeId);
+ }
}
void Schedule(TDuration timeout, TAutoPtr<IEventHandle> ev, ISchedulerCookie *cookie, ui32 nodeId) {
diff --git a/ydb/core/viewer/json_browse.h b/ydb/core/viewer/json_browse.h
index 6a5532ed50e..b14019893a3 100644
--- a/ydb/core/viewer/json_browse.h
+++ b/ydb/core/viewer/json_browse.h
@@ -14,17 +14,6 @@
#include "viewer.h"
#include "wb_aggregate.h"
-namespace std {
-
-template <>
-struct hash<NActors::TActorId> {
- size_t operator ()(const NActors::TActorId& actorId) const {
- return actorId.Hash();
- }
-};
-
-}
-
namespace NKikimr {
namespace NViewer {