diff options
author | abcdef <akotov@ydb.tech> | 2023-12-04 12:09:29 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-12-04 16:15:24 +0300 |
commit | 4a40f67198ed136416031878d22f0418886e4419 (patch) | |
tree | 94c69ddf2047792df45ef21d6c38a6cd247e2849 | |
parent | 8a17f12ce8da549aecb623bc406523885c8dba4e (diff) | |
download | ydb-4a40f67198ed136416031878d22f0418886e4419.tar.gz |
paths to the actors library
- исправил пути к библиотеке акторов
- вернул тесты для `TPartitionWriterCacheActor`
14 files changed, 785 insertions, 1 deletions
diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp index a7cc60c1cf..3d72815667 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp @@ -215,7 +215,7 @@ auto TPartitionWriterCacheActor::GetPartitionWriter(const TString& sessionId, co return p->second.get(); } - if (Writers.size() >= MAX_TRANSACTIONS_COUNT) { + if (Writers.size() >= (1 + MAX_TRANSACTIONS_COUNT)) { if (!TryDeleteOldestWriter(ctx)) { return nullptr; } @@ -237,8 +237,13 @@ bool TPartitionWriterCacheActor::TryDeleteOldestWriter(const TActorContext& ctx) auto oldest = Writers.end(); for (auto p = Writers.begin(); p != Writers.end(); ++p) { + auto& tx = p->first; auto& writer = *p->second; + if ((tx.first == "") && (tx.second == "")) { + continue; + } + if ((writer.LastActivity < minLastActivity) && !writer.HasPendingRequests()) { minLastActivity = writer.LastActivity; oldest = p; diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt index 3382a1ded5..b754157dc2 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt @@ -52,6 +52,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt index bd2dfa11ff..b9ff3fbb03 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt @@ -53,6 +53,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt index f43d22e671..d135c0c3dc 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt @@ -56,6 +56,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt index de9ee61be0..fff4a470c7 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt @@ -57,6 +57,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt index 9971d1c24a..1700f10ce7 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt @@ -46,6 +46,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp ) set_property( TARGET diff --git a/ydb/services/persqueue_v1/ut/kqp_mock.cpp b/ydb/services/persqueue_v1/ut/kqp_mock.cpp new file mode 100644 index 0000000000..39291a9119 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/kqp_mock.cpp @@ -0,0 +1,32 @@ +#include "kqp_mock.h" + +namespace NKikimr::NPersQueueTests { + +void TKqpProxyServiceMock::Bootstrap() +{ + Become(&TKqpProxyServiceMock::StateWork); +} + +STFUNC(TKqpProxyServiceMock::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryRequest, Handle); + } +} + +void TKqpProxyServiceMock::Handle(NKqp::TEvKqp::TEvQueryRequest::TPtr& ev, const TActorContext& ctx) +{ + auto& event = *ev->Get(); + + Y_ABORT_UNLESS(event.HasAction()); + Y_ABORT_UNLESS(event.GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC); + + auto queryResponse = std::make_unique<NKqp::TEvKqp::TEvQueryResponse>(); + auto* response = queryResponse->Record.GetRef().MutableResponse(); + + response->MutableTopicOperations()->SetWriteId(NextWriteId++); + + ctx.Send(ev->Sender, std::move(queryResponse)); +} + +} diff --git a/ydb/services/persqueue_v1/ut/kqp_mock.h b/ydb/services/persqueue_v1/ut/kqp_mock.h new file mode 100644 index 0000000000..57f9ed4564 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/kqp_mock.h @@ -0,0 +1,19 @@ +#pragma once + +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/library/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NPersQueueTests { + +class TKqpProxyServiceMock : public TActorBootstrapped<TKqpProxyServiceMock> { +public: + void Bootstrap(); + + STFUNC(StateWork); + + void Handle(NKqp::TEvKqp::TEvQueryRequest::TPtr& ev, const TActorContext& ctx); + + ui64 NextWriteId = 1; +}; + +} diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp new file mode 100644 index 0000000000..b4c57e054c --- /dev/null +++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp @@ -0,0 +1,184 @@ +#include "partition_writer_cache_actor_fixture.h" +#include "kqp_mock.h" + +#include <ydb/core/kqp/common/simple/services.h> +#include <ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h> + +namespace NKikimr::NPersQueueTests { + +void TPartitionWriterCacheActorFixture::SetUp(NUnitTest::TTestContext&) +{ + SetupContext(); + SetupKqpMock(); + SetupPQTabletMock(PQTabletId); + SetupEventObserver(); +} + +void TPartitionWriterCacheActorFixture::TearDown(NUnitTest::TTestContext&) +{ + CleanupContext(); +} + +void TPartitionWriterCacheActorFixture::SetupContext() +{ + Ctx.ConstructInPlace(); + Finalizer.ConstructInPlace(*Ctx); + + Ctx->Prepare(); +} + +void TPartitionWriterCacheActorFixture::SetupKqpMock() +{ + for (ui32 node = 0; node < Ctx->Runtime->GetNodeCount(); ++node) { + Ctx->Runtime->RegisterService(NKqp::MakeKqpProxyID(Ctx->Runtime->GetNodeId(node)), + Ctx->Runtime->Register(new TKqpProxyServiceMock(), node), + node); + } +} + +void TPartitionWriterCacheActorFixture::SetupPQTabletMock(ui64 tabletId) +{ + auto createPQTabletMock = [&](const NActors::TActorId& tablet, NKikimr::TTabletStorageInfo* info) -> IActor* { + PQTablet = new TPQTabletMock(tablet, info); + return PQTablet; + }; + + CreateTestBootstrapper(*Ctx->Runtime, + CreateTestTabletInfo(tabletId, NKikimrTabletBase::TTabletTypes::Dummy, TErasureType::ErasureNone), + createPQTabletMock); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + + Ctx->Runtime->DispatchEvents(options); +} + +void TPartitionWriterCacheActorFixture::CleanupContext() +{ + Finalizer.Clear(); + Ctx.Clear(); +} + +TActorId TPartitionWriterCacheActorFixture::CreatePartitionWriterCacheActor(const TCreatePartitionWriterCacheActorParams& params) +{ + using TPartitionWriterCacheActor = NKikimr::NGRpcProxy::V1::TPartitionWriterCacheActor; + + NPQ::TPartitionWriterOpts options; + options.WithDeduplication(params.WithDeduplication); + options.WithDatabase(params.Database); + + auto actor = std::make_unique<TPartitionWriterCacheActor>(Ctx->Edge, + params.Partition, + PQTabletId, + params.Generation, + params.SourceId, + options); + TActorId actorId = Ctx->Runtime->Register(actor.release()); + + auto event = Ctx->Runtime->GrabEdgeEvent<NPQ::TEvPartitionWriter::TEvInitResult>(); + UNIT_ASSERT(event != nullptr); + + return actorId; +} + +auto TPartitionWriterCacheActorFixture::MakeTxId(const TString& sessionId, const TString& txId) -> TTxId +{ + return {sessionId, txId}; +} + +void TPartitionWriterCacheActorFixture::SetupEventObserver() +{ + auto observer = [this](TAutoPtr<IEventHandle>& ev) { + if (auto event = ev->CastAsLocal<NPQ::TEvPartitionWriter::TEvTxWriteRequest>(); event) { + CookieToTxId[event->Request->GetCookie()] = MakeTxId(event->SessionId, event->TxId); + } else if (auto event = ev->CastAsLocal<NPQ::TEvPartitionWriter::TEvWriteRequest>(); event) { + auto p = CookieToTxId.find(event->GetCookie()); + UNIT_ASSERT(p != CookieToTxId.end()); + + if (!TxIdToPartitionWriter.contains(p->second)) { + TxIdToPartitionWriter[p->second] = ev->Recipient; + PartitionWriterToTxId[ev->Recipient] = p->second; + + ++CreatePartitionWriterCount; + } + } else if (auto event = ev->CastAsLocal<TEvents::TEvPoisonPill>(); event) { + auto p = PartitionWriterToTxId.find(ev->Recipient); + if (p != PartitionWriterToTxId.end()) { + TxIdToPartitionWriter.erase(p->second); + PartitionWriterToTxId.erase(p); + + ++DeletePartitionWriterCount; + } + } + + return TTestActorRuntime::EEventAction::PROCESS; + }; + + Ctx->Runtime->SetObserverFunc(std::move(observer)); +} + +void TPartitionWriterCacheActorFixture::SendTxWriteRequest(const TActorId& recipient, + const TSendTxWriteRequestParams& params) +{ + auto write = + std::make_unique<NPQ::TEvPartitionWriter::TEvTxWriteRequest>(params.SessionId, params.TxId, + MakeHolder<NPQ::TEvPartitionWriter::TEvWriteRequest>(params.Cookie)); + auto* w = write->Request->Record.MutablePartitionRequest()->AddCmdWrite(); + Y_UNUSED(w); + + Ctx->Runtime->Send(recipient, Ctx->Edge, write.release(), 0, true); +} + +void TPartitionWriterCacheActorFixture::EnsurePartitionWriterExist(const TEnsurePartitionWriterExistParams& params) +{ + auto p = TxIdToPartitionWriter.find(MakeTxId(params.SessionId, params.TxId)); + UNIT_ASSERT(p != TxIdToPartitionWriter.end()); +} + +void TPartitionWriterCacheActorFixture::EnsurePartitionWriterNotExist(const TEnsurePartitionWriterExistParams& params) +{ + auto p = TxIdToPartitionWriter.find(MakeTxId(params.SessionId, params.TxId)); + UNIT_ASSERT(p == TxIdToPartitionWriter.end()); +} + +void TPartitionWriterCacheActorFixture::EnsureWriteSessionClosed(EErrorCode errorCode) +{ + while (true) { + auto event = Ctx->Runtime->GrabEdgeEvent<NPQ::TEvPartitionWriter::TEvWriteResponse>(); + UNIT_ASSERT(event != nullptr); + if (!event->IsSuccess()) { + UNIT_ASSERT_VALUES_EQUAL((int)event->GetError().Code, (int)errorCode); + break; + } + } +} + +void TPartitionWriterCacheActorFixture::WaitForPartitionWriterOps(const TWaitForPartitionWriterOpsParams& params) +{ + CreatePartitionWriterCount = 0; + DeletePartitionWriterCount = 0; + + TDispatchOptions options; + options.CustomFinalCondition = [this, ¶ms]() -> bool { + if (params.CreateCount && params.DeleteCount) { + return + (DeletePartitionWriterCount == *params.DeleteCount) && + (CreatePartitionWriterCount == *params.CreateCount); + } else if (params.DeleteCount) { + return DeletePartitionWriterCount == *params.DeleteCount; + } else if (params.CreateCount) { + return CreatePartitionWriterCount == *params.CreateCount; + } else { + return false; + } + }; + + Ctx->Runtime->DispatchEvents(options); +} + +void TPartitionWriterCacheActorFixture::AdvanceCurrentTime(TDuration d) +{ + Ctx->Runtime->AdvanceCurrentTime(d); +} + +} diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h new file mode 100644 index 0000000000..cc1adc6a67 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h @@ -0,0 +1,80 @@ +#pragma once + +#include "pqtablet_mock.h" + +#include <ydb/core/persqueue/ut/common/pq_ut_common.h> +#include <ydb/core/persqueue/writer/writer.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <optional> + +namespace NKikimr::NPersQueueTests { + +struct TCreatePartitionWriterCacheActorParams { + ui32 Partition = 0; + std::optional<ui32> Generation; + TString SourceId = "source_id"; + bool WithDeduplication = true; + TString Database = "database"; +}; + +struct TSendTxWriteRequestParams { + TString SessionId; + TString TxId; + ui64 Cookie; +}; + +struct TEnsurePartitionWriterExistParams { + TString SessionId; + TString TxId; +}; + +struct TWaitForPartitionWriterOpsParams { + TMaybe<size_t> CreateCount; + TMaybe<size_t> DeleteCount; +}; + +class TPartitionWriterCacheActorFixture : public NUnitTest::TBaseFixture { +protected: + using EErrorCode = NPQ::TEvPartitionWriter::TEvWriteResponse::EErrorCode; + using TTxId = std::pair<TString, TString>; + + void SetUp(NUnitTest::TTestContext&) override; + void TearDown(NUnitTest::TTestContext&) override; + + void SetupContext(); + void SetupKqpMock(); + void SetupPQTabletMock(ui64 tabletId); + void SetupEventObserver(); + + void CleanupContext(); + + TActorId CreatePartitionWriterCacheActor(const TCreatePartitionWriterCacheActorParams& params = {}); + void SendTxWriteRequest(const TActorId& recipient, const TSendTxWriteRequestParams& params); + + void EnsurePartitionWriterExist(const TEnsurePartitionWriterExistParams& params); + void EnsurePartitionWriterNotExist(const TEnsurePartitionWriterExistParams& params); + void EnsureWriteSessionClosed(EErrorCode errorCode); + + void WaitForPartitionWriterOps(const TWaitForPartitionWriterOpsParams& params); + + void AdvanceCurrentTime(TDuration d); + + static TTxId MakeTxId(const TString& sessionId, const TString& txId); + + TMaybe<NPQ::TTestContext> Ctx; + TMaybe<NPQ::TFinalizer> Finalizer; + + const ui64 PQTabletId = 12345; + TPQTabletMock* PQTablet = nullptr; + + THashMap<ui64, TTxId> CookieToTxId; + THashMap<TTxId, TActorId> TxIdToPartitionWriter; + THashMap<TActorId, TTxId> PartitionWriterToTxId; + + size_t CreatePartitionWriterCount = 0; + size_t DeletePartitionWriterCount = 0; +}; + +} diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp new file mode 100644 index 0000000000..8e30e0ee45 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp @@ -0,0 +1,145 @@ +#include "partition_writer_cache_actor_fixture.h" + +#include <ydb/core/persqueue/writer/writer.h> + +#include <library/cpp/testing/unittest/registar.h> + +#include <memory> + +namespace NKikimr::NPersQueueTests { + +Y_UNIT_TEST_SUITE(TPartitionWriterCacheActorTests) { + +Y_UNIT_TEST_F(WriteReplyOrder, TPartitionWriterCacheActorFixture) +{ + TActorId partitionWriterCache = CreatePartitionWriterCacheActor(); + + // + // write operations within two transactions txId-A and txId-B + // + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=1}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=2}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=3}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=4}); + + // + // The answers from the PQ tablet come in a different order + // + PQTablet->AppendWriteReply(2); + PQTablet->AppendWriteReply(3); + PQTablet->AppendWriteReply(1); + PQTablet->AppendWriteReply(4); + + // + // it is expected that the responses will come in the same order as the requests + // + ui64 expectCookie[2] = {1, 1}; + + while ((expectCookie[0] != 4) && (expectCookie[1] != 4)) { + TAutoPtr<IEventHandle> handle; + auto events = + Ctx->Runtime->GrabEdgeEvents<NPQ::TEvPartitionWriter::TEvWriteAccepted, NPQ::TEvPartitionWriter::TEvWriteResponse>(handle); + + if (auto accepted = get<0>(events); accepted) { + UNIT_ASSERT_VALUES_EQUAL(accepted->SessionId, "sessionId"); + UNIT_ASSERT((accepted->TxId == "txId-A") || (accepted->TxId == "txId-B")); + UNIT_ASSERT_VALUES_EQUAL(accepted->Cookie, expectCookie[0]); + + ++expectCookie[0]; + } else if (auto complete = get<1>(events); complete) { + UNIT_ASSERT_VALUES_EQUAL(complete->SessionId, "sessionId"); + UNIT_ASSERT((accepted->TxId == "txId-A") || (accepted->TxId == "txId-B")); + UNIT_ASSERT(complete->IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(complete->Record.GetPartitionResponse().GetCookie(), expectCookie[1]); + + ++expectCookie[1]; + } else { + UNIT_ASSERT(false); + } + } +} + +Y_UNIT_TEST_F(DropOldWriter, TPartitionWriterCacheActorFixture) +{ + TActorId partitionWriterCache = CreatePartitionWriterCacheActor(); + + // + // no more than four actors writers work within one writing session + // + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=1}); + WaitForPartitionWriterOps({.CreateCount=1}); + + AdvanceCurrentTime(TDuration::Seconds(1)); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=2}); + WaitForPartitionWriterOps({.CreateCount=1}); + + // + // the transaction time of txId-A and txId-B is different + // + + AdvanceCurrentTime(TDuration::Seconds(1)); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-C", .Cookie=3}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-D", .Cookie=4}); + WaitForPartitionWriterOps({.CreateCount=2}); + + // + // a writer's actor has been created for each transaction + // + EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-A"}); + EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-B"}); + EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-C"}); + EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-D"}); + + // + // a write operation within a new transaction should displace the oldest actor of the writer + // + AdvanceCurrentTime(TDuration::Seconds(1)); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-E", .Cookie=5}); + WaitForPartitionWriterOps({.CreateCount=1, .DeleteCount=1}); + + // + // this is the new actor for the txId-E transaction + // + EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-E"}); + EnsurePartitionWriterNotExist({.SessionId="sessionId", .TxId="txId-A"}); + + // + // a new write operation within the txId-A transaction should displace the writer actor + // + AdvanceCurrentTime(TDuration::Seconds(1)); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=6}); + WaitForPartitionWriterOps({.CreateCount=1, .DeleteCount=1}); + + // + // now this is the writer actor for the txId-B transaction + // + EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-A"}); + EnsurePartitionWriterNotExist({.SessionId="sessionId", .TxId="txId-B"}); + + AdvanceCurrentTime(TDuration::Seconds(1)); + + // + // we simulate the delay in processing write operations + // + PQTablet->AppendWriteReply(7); + PQTablet->AppendWriteReply(8); + PQTablet->AppendWriteReply(9); + PQTablet->AppendWriteReply(10); + PQTablet->AppendWriteReply(11); + + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=7}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-C", .Cookie=8}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-D", .Cookie=9}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-E", .Cookie=10}); + SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=11}); + + // + // since there are 4 active transactions and an actor cannot be created for the txId-B transaction, + // the writing session will end with an error + // + EnsureWriteSessionClosed(EErrorCode::OverloadError); +} + +} + +} diff --git a/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp b/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp new file mode 100644 index 0000000000..8591b8245f --- /dev/null +++ b/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp @@ -0,0 +1,222 @@ +#include "pqtablet_mock.h" + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NPersQueueTests { + +TMaybe<ui64> TPQTabletMock::GetPartitionRequestCookie() const +{ + Y_ABORT_UNLESS(Request); + Y_ABORT_UNLESS(Request->Record.HasPartitionRequest()); + + if (Request->Record.GetPartitionRequest().HasCookie()) { + return Request->Record.GetPartitionRequest().GetCookie(); + } + + return {}; +} + +void TPQTabletMock::PrepareGetOwnershipResponse() +{ + Response = std::make_unique<TEvPersQueue::TEvResponse>(); + Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); + + auto* partition = Response->Record.MutablePartitionResponse(); + + if (auto cookie = GetPartitionRequestCookie()) { + partition->SetCookie(*cookie); + } + + partition->MutableCmdGetOwnershipResult()->SetOwnerCookie(OwnerCookie); +} + +void TPQTabletMock::PrepareGetMaxSeqNoResponse() +{ + Response = std::make_unique<TEvPersQueue::TEvResponse>(); + Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK); + + auto* partition = Response->Record.MutablePartitionResponse(); + + if (auto cookie = GetPartitionRequestCookie()) { + partition->SetCookie(*cookie); + } + + auto* sourceIdInfo = partition->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo(); + sourceIdInfo->SetState(NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED); + sourceIdInfo->SetSeqNo(MaxSeqNo); +} + +auto TPQTabletMock::MakeReserveBytesResponse(ui64 cookie) -> TEvResponsePtr +{ + auto event = std::make_unique<TEvPersQueue::TEvResponse>(); + event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + event->Record.SetErrorCode(NPersQueue::NErrorCode::OK); + + auto* partition = event->Record.MutablePartitionResponse(); + partition->SetCookie(cookie); + + // + // CmdWriteResultSize == 0 + // + + return event; +} + +auto TPQTabletMock::MakeWriteResponse(ui64 cookie) -> TEvResponsePtr +{ + auto event = std::make_unique<TEvPersQueue::TEvResponse>(); + event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK); + event->Record.SetErrorCode(NPersQueue::NErrorCode::OK); + + auto* partition = event->Record.MutablePartitionResponse(); + partition->SetCookie(cookie); + + // + // CmdWriteResultSize == 1 + // + auto* w = partition->AddCmdWriteResult(); + Y_UNUSED(w); + + return event; +} + +void TPQTabletMock::PrepareReserveBytesResponse(const TActorId& recipient) +{ + if (auto cookie = GetPartitionRequestCookie(); cookie) { + TReply reply; + reply.Response = MakeReserveBytesResponse(*cookie); + reply.Recipient = recipient; + + CmdReserve.Replies[*cookie] = std::move(reply); + } +} + +void TPQTabletMock::PrepareWriteResponse(const TActorId& recipient) +{ + if (auto cookie = GetPartitionRequestCookie(); cookie) { + TReply reply; + reply.Response = MakeWriteResponse(*cookie); + reply.Recipient = recipient; + + CmdWrite.Replies[*cookie] = std::move(reply); + } +} + +void TPQTabletMock::TrySendResponses(const TActorContext& ctx, TCommandReplies& cmd) +{ + size_t count = 0; + for (auto cookie : cmd.ExpectedRequests) { + count += cmd.Replies.contains(cookie); + } + + if (count != cmd.ExpectedRequests.size()) { + return; + } + + for (auto cookie : cmd.ExpectedRequests) { + auto p = cmd.Replies.find(cookie); + auto& reply = p->second; + + ctx.Send(reply.Recipient, std::move(reply.Response)); + cmd.Replies.erase(p); + } + + for (auto& [_, reply] : cmd.Replies) { + ctx.Send(reply.Recipient, std::move(reply.Response)); + } + cmd.Replies.clear(); +} + +TPQTabletMock::TPQTabletMock(const TActorId& tablet, TTabletStorageInfo* info) : + TActor(&TThis::StateBoot), + TTabletExecutedFlat(info, tablet, nullptr) +{ +} + +void TPQTabletMock::OnDetach(const TActorContext &ctx) +{ + Die(ctx); +} + +void TPQTabletMock::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx) +{ + Y_UNUSED(ev); + + Die(ctx); +} + +void TPQTabletMock::DefaultSignalTabletActive(const TActorContext &ctx) +{ + Y_UNUSED(ctx); +} + +void TPQTabletMock::OnActivateExecutor(const TActorContext &ctx) +{ + Become(&TThis::StateWork); + SignalTabletActive(ctx); +} + +STFUNC(TPQTabletMock::StateBoot) +{ + StateInitImpl(ev, SelfId()); +} + +STFUNC(TPQTabletMock::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(TEvTabletPipe::TEvClientConnected, Handle); + HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + + HFunc(TEvPersQueue::TEvRequest, Handle); + default: + HandleDefaultEvents(ev, SelfId()); + } +} + +void TPQTabletMock::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) +{ + Y_UNUSED(ev); + Y_UNUSED(ctx); +} + +void TPQTabletMock::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) +{ + Y_UNUSED(ev); + Y_UNUSED(ctx); +} + +void TPQTabletMock::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx) +{ + Request.reset(ev->Release().Release()); + + Y_ABORT_UNLESS(Request->Record.HasPartitionRequest()); + auto& partition = Request->Record.GetPartitionRequest(); + + if (partition.HasCmdGetOwnership()) { + PrepareGetOwnershipResponse(); + UNIT_ASSERT(Response.get()); + ctx.Send(ev->Sender, std::move(Response)); + } else if (partition.HasCmdGetMaxSeqNo()) { + PrepareGetMaxSeqNoResponse(); + UNIT_ASSERT(Response.get()); + ctx.Send(ev->Sender, std::move(Response)); + } else if (partition.HasCmdReserveBytes()) { + PrepareReserveBytesResponse(ev->Sender); + TrySendResponses(ctx, CmdReserve); + } else if (partition.CmdWriteSize()) { + PrepareWriteResponse(ev->Sender); + TrySendResponses(ctx, CmdWrite); + } else { + Y_ABORT_UNLESS(false); + } +} + +void TPQTabletMock::AppendWriteReply(ui64 cookie) +{ + CmdReserve.ExpectedRequests.push_back(cookie); + CmdWrite.ExpectedRequests.push_back(cookie); +} + +} diff --git a/ydb/services/persqueue_v1/ut/pqtablet_mock.h b/ydb/services/persqueue_v1/ut/pqtablet_mock.h new file mode 100644 index 0000000000..842a63e00d --- /dev/null +++ b/ydb/services/persqueue_v1/ut/pqtablet_mock.h @@ -0,0 +1,71 @@ +#pragma once + +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/persqueue/events/global.h> +#include <ydb/core/tablet_flat/tablet_flat_executed.h> + +#include <ydb/public/lib/base/msgbus_status.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/actorid.h> + +#include <memory> + +namespace NKikimr::NPersQueueTests { + +class TPQTabletMock : public TActor<TPQTabletMock>, public NTabletFlatExecutor::TTabletExecutedFlat { +public: + TPQTabletMock(const TActorId& tablet, TTabletStorageInfo* info); + + void AppendWriteReply(ui64 cookie); + +private: + using TEvRequestPtr = std::unique_ptr<TEvPersQueue::TEvRequest>; + using TEvResponsePtr = std::unique_ptr<TEvPersQueue::TEvResponse>; + + struct TReply { + TEvResponsePtr Response; + TActorId Recipient; + }; + + struct TCommandReplies { + TDeque<ui64> ExpectedRequests; + THashMap<ui64, TReply> Replies; + }; + + void OnDetach(const TActorContext &ctx) override; + void OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx) override; + void DefaultSignalTabletActive(const TActorContext &ctx) override; + void OnActivateExecutor(const TActorContext &ctx) override; + + STFUNC(StateBoot); + STFUNC(StateWork); + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); + + void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx); + + TMaybe<ui64> GetPartitionRequestCookie() const; + + void PrepareGetOwnershipResponse(); + void PrepareGetMaxSeqNoResponse(); + void PrepareReserveBytesResponse(const TActorId& recipient); + void PrepareWriteResponse(const TActorId& recipient); + + static TEvResponsePtr MakeReserveBytesResponse(ui64 cookie); + static TEvResponsePtr MakeWriteResponse(ui64 cookie); + + void TrySendResponses(const TActorContext& ctx, TCommandReplies& cmd); + + TEvRequestPtr Request; + TEvResponsePtr Response; + + TString OwnerCookie = "owner-cookie"; + ui64 MaxSeqNo = 0; + + TCommandReplies CmdReserve; + TCommandReplies CmdWrite; +}; + +} diff --git a/ydb/services/persqueue_v1/ut/ya.make b/ydb/services/persqueue_v1/ut/ya.make index 78d0a7cbea..279e0e2d5c 100644 --- a/ydb/services/persqueue_v1/ut/ya.make +++ b/ydb/services/persqueue_v1/ut/ya.make @@ -28,6 +28,12 @@ SRCS( functions_executor_wrapper.cpp topic_service_ut.cpp demo_tx.cpp + + partition_writer_cache_actor_ut.cpp + + pqtablet_mock.cpp + kqp_mock.cpp + partition_writer_cache_actor_fixture.cpp ) PEERDIR( |