aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-12-04 12:09:29 +0300
committerabcdef <akotov@ydb.tech>2023-12-04 16:15:24 +0300
commit4a40f67198ed136416031878d22f0418886e4419 (patch)
tree94c69ddf2047792df45ef21d6c38a6cd247e2849
parent8a17f12ce8da549aecb623bc406523885c8dba4e (diff)
downloadydb-4a40f67198ed136416031878d22f0418886e4419.tar.gz
paths to the actors library
- исправил пути к библиотеке акторов - вернул тесты для `TPartitionWriterCacheActor`
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp7
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt4
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/services/persqueue_v1/ut/kqp_mock.cpp32
-rw-r--r--ydb/services/persqueue_v1/ut/kqp_mock.h19
-rw-r--r--ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp184
-rw-r--r--ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h80
-rw-r--r--ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp145
-rw-r--r--ydb/services/persqueue_v1/ut/pqtablet_mock.cpp222
-rw-r--r--ydb/services/persqueue_v1/ut/pqtablet_mock.h71
-rw-r--r--ydb/services/persqueue_v1/ut/ya.make6
14 files changed, 785 insertions, 1 deletions
diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
index a7cc60c1cf..3d72815667 100644
--- a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
@@ -215,7 +215,7 @@ auto TPartitionWriterCacheActor::GetPartitionWriter(const TString& sessionId, co
return p->second.get();
}
- if (Writers.size() >= MAX_TRANSACTIONS_COUNT) {
+ if (Writers.size() >= (1 + MAX_TRANSACTIONS_COUNT)) {
if (!TryDeleteOldestWriter(ctx)) {
return nullptr;
}
@@ -237,8 +237,13 @@ bool TPartitionWriterCacheActor::TryDeleteOldestWriter(const TActorContext& ctx)
auto oldest = Writers.end();
for (auto p = Writers.begin(); p != Writers.end(); ++p) {
+ auto& tx = p->first;
auto& writer = *p->second;
+ if ((tx.first == "") && (tx.second == "")) {
+ continue;
+ }
+
if ((writer.LastActivity < minLastActivity) && !writer.HasPendingRequests()) {
minLastActivity = writer.LastActivity;
oldest = p;
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt
index 3382a1ded5..b754157dc2 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-arm64.txt
@@ -52,6 +52,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
index bd2dfa11ff..b9ff3fbb03 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
@@ -53,6 +53,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
index f43d22e671..d135c0c3dc 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
@@ -56,6 +56,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
index de9ee61be0..fff4a470c7 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
@@ -57,6 +57,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
index 9971d1c24a..1700f10ce7 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
@@ -46,6 +46,10 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/demo_tx.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/kqp_mock.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
)
set_property(
TARGET
diff --git a/ydb/services/persqueue_v1/ut/kqp_mock.cpp b/ydb/services/persqueue_v1/ut/kqp_mock.cpp
new file mode 100644
index 0000000000..39291a9119
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/kqp_mock.cpp
@@ -0,0 +1,32 @@
+#include "kqp_mock.h"
+
+namespace NKikimr::NPersQueueTests {
+
+void TKqpProxyServiceMock::Bootstrap()
+{
+ Become(&TKqpProxyServiceMock::StateWork);
+}
+
+STFUNC(TKqpProxyServiceMock::StateWork)
+{
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKqp::TEvKqp::TEvQueryRequest, Handle);
+ }
+}
+
+void TKqpProxyServiceMock::Handle(NKqp::TEvKqp::TEvQueryRequest::TPtr& ev, const TActorContext& ctx)
+{
+ auto& event = *ev->Get();
+
+ Y_ABORT_UNLESS(event.HasAction());
+ Y_ABORT_UNLESS(event.GetAction() == NKikimrKqp::QUERY_ACTION_TOPIC);
+
+ auto queryResponse = std::make_unique<NKqp::TEvKqp::TEvQueryResponse>();
+ auto* response = queryResponse->Record.GetRef().MutableResponse();
+
+ response->MutableTopicOperations()->SetWriteId(NextWriteId++);
+
+ ctx.Send(ev->Sender, std::move(queryResponse));
+}
+
+}
diff --git a/ydb/services/persqueue_v1/ut/kqp_mock.h b/ydb/services/persqueue_v1/ut/kqp_mock.h
new file mode 100644
index 0000000000..57f9ed4564
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/kqp_mock.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NPersQueueTests {
+
+class TKqpProxyServiceMock : public TActorBootstrapped<TKqpProxyServiceMock> {
+public:
+ void Bootstrap();
+
+ STFUNC(StateWork);
+
+ void Handle(NKqp::TEvKqp::TEvQueryRequest::TPtr& ev, const TActorContext& ctx);
+
+ ui64 NextWriteId = 1;
+};
+
+}
diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
new file mode 100644
index 0000000000..b4c57e054c
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.cpp
@@ -0,0 +1,184 @@
+#include "partition_writer_cache_actor_fixture.h"
+#include "kqp_mock.h"
+
+#include <ydb/core/kqp/common/simple/services.h>
+#include <ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h>
+
+namespace NKikimr::NPersQueueTests {
+
+void TPartitionWriterCacheActorFixture::SetUp(NUnitTest::TTestContext&)
+{
+ SetupContext();
+ SetupKqpMock();
+ SetupPQTabletMock(PQTabletId);
+ SetupEventObserver();
+}
+
+void TPartitionWriterCacheActorFixture::TearDown(NUnitTest::TTestContext&)
+{
+ CleanupContext();
+}
+
+void TPartitionWriterCacheActorFixture::SetupContext()
+{
+ Ctx.ConstructInPlace();
+ Finalizer.ConstructInPlace(*Ctx);
+
+ Ctx->Prepare();
+}
+
+void TPartitionWriterCacheActorFixture::SetupKqpMock()
+{
+ for (ui32 node = 0; node < Ctx->Runtime->GetNodeCount(); ++node) {
+ Ctx->Runtime->RegisterService(NKqp::MakeKqpProxyID(Ctx->Runtime->GetNodeId(node)),
+ Ctx->Runtime->Register(new TKqpProxyServiceMock(), node),
+ node);
+ }
+}
+
+void TPartitionWriterCacheActorFixture::SetupPQTabletMock(ui64 tabletId)
+{
+ auto createPQTabletMock = [&](const NActors::TActorId& tablet, NKikimr::TTabletStorageInfo* info) -> IActor* {
+ PQTablet = new TPQTabletMock(tablet, info);
+ return PQTablet;
+ };
+
+ CreateTestBootstrapper(*Ctx->Runtime,
+ CreateTestTabletInfo(tabletId, NKikimrTabletBase::TTabletTypes::Dummy, TErasureType::ErasureNone),
+ createPQTabletMock);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+
+ Ctx->Runtime->DispatchEvents(options);
+}
+
+void TPartitionWriterCacheActorFixture::CleanupContext()
+{
+ Finalizer.Clear();
+ Ctx.Clear();
+}
+
+TActorId TPartitionWriterCacheActorFixture::CreatePartitionWriterCacheActor(const TCreatePartitionWriterCacheActorParams& params)
+{
+ using TPartitionWriterCacheActor = NKikimr::NGRpcProxy::V1::TPartitionWriterCacheActor;
+
+ NPQ::TPartitionWriterOpts options;
+ options.WithDeduplication(params.WithDeduplication);
+ options.WithDatabase(params.Database);
+
+ auto actor = std::make_unique<TPartitionWriterCacheActor>(Ctx->Edge,
+ params.Partition,
+ PQTabletId,
+ params.Generation,
+ params.SourceId,
+ options);
+ TActorId actorId = Ctx->Runtime->Register(actor.release());
+
+ auto event = Ctx->Runtime->GrabEdgeEvent<NPQ::TEvPartitionWriter::TEvInitResult>();
+ UNIT_ASSERT(event != nullptr);
+
+ return actorId;
+}
+
+auto TPartitionWriterCacheActorFixture::MakeTxId(const TString& sessionId, const TString& txId) -> TTxId
+{
+ return {sessionId, txId};
+}
+
+void TPartitionWriterCacheActorFixture::SetupEventObserver()
+{
+ auto observer = [this](TAutoPtr<IEventHandle>& ev) {
+ if (auto event = ev->CastAsLocal<NPQ::TEvPartitionWriter::TEvTxWriteRequest>(); event) {
+ CookieToTxId[event->Request->GetCookie()] = MakeTxId(event->SessionId, event->TxId);
+ } else if (auto event = ev->CastAsLocal<NPQ::TEvPartitionWriter::TEvWriteRequest>(); event) {
+ auto p = CookieToTxId.find(event->GetCookie());
+ UNIT_ASSERT(p != CookieToTxId.end());
+
+ if (!TxIdToPartitionWriter.contains(p->second)) {
+ TxIdToPartitionWriter[p->second] = ev->Recipient;
+ PartitionWriterToTxId[ev->Recipient] = p->second;
+
+ ++CreatePartitionWriterCount;
+ }
+ } else if (auto event = ev->CastAsLocal<TEvents::TEvPoisonPill>(); event) {
+ auto p = PartitionWriterToTxId.find(ev->Recipient);
+ if (p != PartitionWriterToTxId.end()) {
+ TxIdToPartitionWriter.erase(p->second);
+ PartitionWriterToTxId.erase(p);
+
+ ++DeletePartitionWriterCount;
+ }
+ }
+
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ Ctx->Runtime->SetObserverFunc(std::move(observer));
+}
+
+void TPartitionWriterCacheActorFixture::SendTxWriteRequest(const TActorId& recipient,
+ const TSendTxWriteRequestParams& params)
+{
+ auto write =
+ std::make_unique<NPQ::TEvPartitionWriter::TEvTxWriteRequest>(params.SessionId, params.TxId,
+ MakeHolder<NPQ::TEvPartitionWriter::TEvWriteRequest>(params.Cookie));
+ auto* w = write->Request->Record.MutablePartitionRequest()->AddCmdWrite();
+ Y_UNUSED(w);
+
+ Ctx->Runtime->Send(recipient, Ctx->Edge, write.release(), 0, true);
+}
+
+void TPartitionWriterCacheActorFixture::EnsurePartitionWriterExist(const TEnsurePartitionWriterExistParams& params)
+{
+ auto p = TxIdToPartitionWriter.find(MakeTxId(params.SessionId, params.TxId));
+ UNIT_ASSERT(p != TxIdToPartitionWriter.end());
+}
+
+void TPartitionWriterCacheActorFixture::EnsurePartitionWriterNotExist(const TEnsurePartitionWriterExistParams& params)
+{
+ auto p = TxIdToPartitionWriter.find(MakeTxId(params.SessionId, params.TxId));
+ UNIT_ASSERT(p == TxIdToPartitionWriter.end());
+}
+
+void TPartitionWriterCacheActorFixture::EnsureWriteSessionClosed(EErrorCode errorCode)
+{
+ while (true) {
+ auto event = Ctx->Runtime->GrabEdgeEvent<NPQ::TEvPartitionWriter::TEvWriteResponse>();
+ UNIT_ASSERT(event != nullptr);
+ if (!event->IsSuccess()) {
+ UNIT_ASSERT_VALUES_EQUAL((int)event->GetError().Code, (int)errorCode);
+ break;
+ }
+ }
+}
+
+void TPartitionWriterCacheActorFixture::WaitForPartitionWriterOps(const TWaitForPartitionWriterOpsParams& params)
+{
+ CreatePartitionWriterCount = 0;
+ DeletePartitionWriterCount = 0;
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [this, &params]() -> bool {
+ if (params.CreateCount && params.DeleteCount) {
+ return
+ (DeletePartitionWriterCount == *params.DeleteCount) &&
+ (CreatePartitionWriterCount == *params.CreateCount);
+ } else if (params.DeleteCount) {
+ return DeletePartitionWriterCount == *params.DeleteCount;
+ } else if (params.CreateCount) {
+ return CreatePartitionWriterCount == *params.CreateCount;
+ } else {
+ return false;
+ }
+ };
+
+ Ctx->Runtime->DispatchEvents(options);
+}
+
+void TPartitionWriterCacheActorFixture::AdvanceCurrentTime(TDuration d)
+{
+ Ctx->Runtime->AdvanceCurrentTime(d);
+}
+
+}
diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h
new file mode 100644
index 0000000000..cc1adc6a67
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_fixture.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include "pqtablet_mock.h"
+
+#include <ydb/core/persqueue/ut/common/pq_ut_common.h>
+#include <ydb/core/persqueue/writer/writer.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <optional>
+
+namespace NKikimr::NPersQueueTests {
+
+struct TCreatePartitionWriterCacheActorParams {
+ ui32 Partition = 0;
+ std::optional<ui32> Generation;
+ TString SourceId = "source_id";
+ bool WithDeduplication = true;
+ TString Database = "database";
+};
+
+struct TSendTxWriteRequestParams {
+ TString SessionId;
+ TString TxId;
+ ui64 Cookie;
+};
+
+struct TEnsurePartitionWriterExistParams {
+ TString SessionId;
+ TString TxId;
+};
+
+struct TWaitForPartitionWriterOpsParams {
+ TMaybe<size_t> CreateCount;
+ TMaybe<size_t> DeleteCount;
+};
+
+class TPartitionWriterCacheActorFixture : public NUnitTest::TBaseFixture {
+protected:
+ using EErrorCode = NPQ::TEvPartitionWriter::TEvWriteResponse::EErrorCode;
+ using TTxId = std::pair<TString, TString>;
+
+ void SetUp(NUnitTest::TTestContext&) override;
+ void TearDown(NUnitTest::TTestContext&) override;
+
+ void SetupContext();
+ void SetupKqpMock();
+ void SetupPQTabletMock(ui64 tabletId);
+ void SetupEventObserver();
+
+ void CleanupContext();
+
+ TActorId CreatePartitionWriterCacheActor(const TCreatePartitionWriterCacheActorParams& params = {});
+ void SendTxWriteRequest(const TActorId& recipient, const TSendTxWriteRequestParams& params);
+
+ void EnsurePartitionWriterExist(const TEnsurePartitionWriterExistParams& params);
+ void EnsurePartitionWriterNotExist(const TEnsurePartitionWriterExistParams& params);
+ void EnsureWriteSessionClosed(EErrorCode errorCode);
+
+ void WaitForPartitionWriterOps(const TWaitForPartitionWriterOpsParams& params);
+
+ void AdvanceCurrentTime(TDuration d);
+
+ static TTxId MakeTxId(const TString& sessionId, const TString& txId);
+
+ TMaybe<NPQ::TTestContext> Ctx;
+ TMaybe<NPQ::TFinalizer> Finalizer;
+
+ const ui64 PQTabletId = 12345;
+ TPQTabletMock* PQTablet = nullptr;
+
+ THashMap<ui64, TTxId> CookieToTxId;
+ THashMap<TTxId, TActorId> TxIdToPartitionWriter;
+ THashMap<TActorId, TTxId> PartitionWriterToTxId;
+
+ size_t CreatePartitionWriterCount = 0;
+ size_t DeletePartitionWriterCount = 0;
+};
+
+}
diff --git a/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
new file mode 100644
index 0000000000..8e30e0ee45
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/partition_writer_cache_actor_ut.cpp
@@ -0,0 +1,145 @@
+#include "partition_writer_cache_actor_fixture.h"
+
+#include <ydb/core/persqueue/writer/writer.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <memory>
+
+namespace NKikimr::NPersQueueTests {
+
+Y_UNIT_TEST_SUITE(TPartitionWriterCacheActorTests) {
+
+Y_UNIT_TEST_F(WriteReplyOrder, TPartitionWriterCacheActorFixture)
+{
+ TActorId partitionWriterCache = CreatePartitionWriterCacheActor();
+
+ //
+ // write operations within two transactions txId-A and txId-B
+ //
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=1});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=2});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=3});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=4});
+
+ //
+ // The answers from the PQ tablet come in a different order
+ //
+ PQTablet->AppendWriteReply(2);
+ PQTablet->AppendWriteReply(3);
+ PQTablet->AppendWriteReply(1);
+ PQTablet->AppendWriteReply(4);
+
+ //
+ // it is expected that the responses will come in the same order as the requests
+ //
+ ui64 expectCookie[2] = {1, 1};
+
+ while ((expectCookie[0] != 4) && (expectCookie[1] != 4)) {
+ TAutoPtr<IEventHandle> handle;
+ auto events =
+ Ctx->Runtime->GrabEdgeEvents<NPQ::TEvPartitionWriter::TEvWriteAccepted, NPQ::TEvPartitionWriter::TEvWriteResponse>(handle);
+
+ if (auto accepted = get<0>(events); accepted) {
+ UNIT_ASSERT_VALUES_EQUAL(accepted->SessionId, "sessionId");
+ UNIT_ASSERT((accepted->TxId == "txId-A") || (accepted->TxId == "txId-B"));
+ UNIT_ASSERT_VALUES_EQUAL(accepted->Cookie, expectCookie[0]);
+
+ ++expectCookie[0];
+ } else if (auto complete = get<1>(events); complete) {
+ UNIT_ASSERT_VALUES_EQUAL(complete->SessionId, "sessionId");
+ UNIT_ASSERT((accepted->TxId == "txId-A") || (accepted->TxId == "txId-B"));
+ UNIT_ASSERT(complete->IsSuccess());
+ UNIT_ASSERT_VALUES_EQUAL(complete->Record.GetPartitionResponse().GetCookie(), expectCookie[1]);
+
+ ++expectCookie[1];
+ } else {
+ UNIT_ASSERT(false);
+ }
+ }
+}
+
+Y_UNIT_TEST_F(DropOldWriter, TPartitionWriterCacheActorFixture)
+{
+ TActorId partitionWriterCache = CreatePartitionWriterCacheActor();
+
+ //
+ // no more than four actors writers work within one writing session
+ //
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=1});
+ WaitForPartitionWriterOps({.CreateCount=1});
+
+ AdvanceCurrentTime(TDuration::Seconds(1));
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=2});
+ WaitForPartitionWriterOps({.CreateCount=1});
+
+ //
+ // the transaction time of txId-A and txId-B is different
+ //
+
+ AdvanceCurrentTime(TDuration::Seconds(1));
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-C", .Cookie=3});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-D", .Cookie=4});
+ WaitForPartitionWriterOps({.CreateCount=2});
+
+ //
+ // a writer's actor has been created for each transaction
+ //
+ EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-A"});
+ EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-B"});
+ EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-C"});
+ EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-D"});
+
+ //
+ // a write operation within a new transaction should displace the oldest actor of the writer
+ //
+ AdvanceCurrentTime(TDuration::Seconds(1));
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-E", .Cookie=5});
+ WaitForPartitionWriterOps({.CreateCount=1, .DeleteCount=1});
+
+ //
+ // this is the new actor for the txId-E transaction
+ //
+ EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-E"});
+ EnsurePartitionWriterNotExist({.SessionId="sessionId", .TxId="txId-A"});
+
+ //
+ // a new write operation within the txId-A transaction should displace the writer actor
+ //
+ AdvanceCurrentTime(TDuration::Seconds(1));
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=6});
+ WaitForPartitionWriterOps({.CreateCount=1, .DeleteCount=1});
+
+ //
+ // now this is the writer actor for the txId-B transaction
+ //
+ EnsurePartitionWriterExist({.SessionId="sessionId", .TxId="txId-A"});
+ EnsurePartitionWriterNotExist({.SessionId="sessionId", .TxId="txId-B"});
+
+ AdvanceCurrentTime(TDuration::Seconds(1));
+
+ //
+ // we simulate the delay in processing write operations
+ //
+ PQTablet->AppendWriteReply(7);
+ PQTablet->AppendWriteReply(8);
+ PQTablet->AppendWriteReply(9);
+ PQTablet->AppendWriteReply(10);
+ PQTablet->AppendWriteReply(11);
+
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-A", .Cookie=7});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-C", .Cookie=8});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-D", .Cookie=9});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-E", .Cookie=10});
+ SendTxWriteRequest(partitionWriterCache, {.SessionId="sessionId", .TxId="txId-B", .Cookie=11});
+
+ //
+ // since there are 4 active transactions and an actor cannot be created for the txId-B transaction,
+ // the writing session will end with an error
+ //
+ EnsureWriteSessionClosed(EErrorCode::OverloadError);
+}
+
+}
+
+}
diff --git a/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp b/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
new file mode 100644
index 0000000000..8591b8245f
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/pqtablet_mock.cpp
@@ -0,0 +1,222 @@
+#include "pqtablet_mock.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NPersQueueTests {
+
+TMaybe<ui64> TPQTabletMock::GetPartitionRequestCookie() const
+{
+ Y_ABORT_UNLESS(Request);
+ Y_ABORT_UNLESS(Request->Record.HasPartitionRequest());
+
+ if (Request->Record.GetPartitionRequest().HasCookie()) {
+ return Request->Record.GetPartitionRequest().GetCookie();
+ }
+
+ return {};
+}
+
+void TPQTabletMock::PrepareGetOwnershipResponse()
+{
+ Response = std::make_unique<TEvPersQueue::TEvResponse>();
+ Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
+
+ auto* partition = Response->Record.MutablePartitionResponse();
+
+ if (auto cookie = GetPartitionRequestCookie()) {
+ partition->SetCookie(*cookie);
+ }
+
+ partition->MutableCmdGetOwnershipResult()->SetOwnerCookie(OwnerCookie);
+}
+
+void TPQTabletMock::PrepareGetMaxSeqNoResponse()
+{
+ Response = std::make_unique<TEvPersQueue::TEvResponse>();
+ Response->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ Response->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
+
+ auto* partition = Response->Record.MutablePartitionResponse();
+
+ if (auto cookie = GetPartitionRequestCookie()) {
+ partition->SetCookie(*cookie);
+ }
+
+ auto* sourceIdInfo = partition->MutableCmdGetMaxSeqNoResult()->AddSourceIdInfo();
+ sourceIdInfo->SetState(NKikimrPQ::TMessageGroupInfo::STATE_REGISTERED);
+ sourceIdInfo->SetSeqNo(MaxSeqNo);
+}
+
+auto TPQTabletMock::MakeReserveBytesResponse(ui64 cookie) -> TEvResponsePtr
+{
+ auto event = std::make_unique<TEvPersQueue::TEvResponse>();
+ event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ event->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
+
+ auto* partition = event->Record.MutablePartitionResponse();
+ partition->SetCookie(cookie);
+
+ //
+ // CmdWriteResultSize == 0
+ //
+
+ return event;
+}
+
+auto TPQTabletMock::MakeWriteResponse(ui64 cookie) -> TEvResponsePtr
+{
+ auto event = std::make_unique<TEvPersQueue::TEvResponse>();
+ event->Record.SetStatus(NMsgBusProxy::MSTATUS_OK);
+ event->Record.SetErrorCode(NPersQueue::NErrorCode::OK);
+
+ auto* partition = event->Record.MutablePartitionResponse();
+ partition->SetCookie(cookie);
+
+ //
+ // CmdWriteResultSize == 1
+ //
+ auto* w = partition->AddCmdWriteResult();
+ Y_UNUSED(w);
+
+ return event;
+}
+
+void TPQTabletMock::PrepareReserveBytesResponse(const TActorId& recipient)
+{
+ if (auto cookie = GetPartitionRequestCookie(); cookie) {
+ TReply reply;
+ reply.Response = MakeReserveBytesResponse(*cookie);
+ reply.Recipient = recipient;
+
+ CmdReserve.Replies[*cookie] = std::move(reply);
+ }
+}
+
+void TPQTabletMock::PrepareWriteResponse(const TActorId& recipient)
+{
+ if (auto cookie = GetPartitionRequestCookie(); cookie) {
+ TReply reply;
+ reply.Response = MakeWriteResponse(*cookie);
+ reply.Recipient = recipient;
+
+ CmdWrite.Replies[*cookie] = std::move(reply);
+ }
+}
+
+void TPQTabletMock::TrySendResponses(const TActorContext& ctx, TCommandReplies& cmd)
+{
+ size_t count = 0;
+ for (auto cookie : cmd.ExpectedRequests) {
+ count += cmd.Replies.contains(cookie);
+ }
+
+ if (count != cmd.ExpectedRequests.size()) {
+ return;
+ }
+
+ for (auto cookie : cmd.ExpectedRequests) {
+ auto p = cmd.Replies.find(cookie);
+ auto& reply = p->second;
+
+ ctx.Send(reply.Recipient, std::move(reply.Response));
+ cmd.Replies.erase(p);
+ }
+
+ for (auto& [_, reply] : cmd.Replies) {
+ ctx.Send(reply.Recipient, std::move(reply.Response));
+ }
+ cmd.Replies.clear();
+}
+
+TPQTabletMock::TPQTabletMock(const TActorId& tablet, TTabletStorageInfo* info) :
+ TActor(&TThis::StateBoot),
+ TTabletExecutedFlat(info, tablet, nullptr)
+{
+}
+
+void TPQTabletMock::OnDetach(const TActorContext &ctx)
+{
+ Die(ctx);
+}
+
+void TPQTabletMock::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx)
+{
+ Y_UNUSED(ev);
+
+ Die(ctx);
+}
+
+void TPQTabletMock::DefaultSignalTabletActive(const TActorContext &ctx)
+{
+ Y_UNUSED(ctx);
+}
+
+void TPQTabletMock::OnActivateExecutor(const TActorContext &ctx)
+{
+ Become(&TThis::StateWork);
+ SignalTabletActive(ctx);
+}
+
+STFUNC(TPQTabletMock::StateBoot)
+{
+ StateInitImpl(ev, SelfId());
+}
+
+STFUNC(TPQTabletMock::StateWork)
+{
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+
+ HFunc(TEvPersQueue::TEvRequest, Handle);
+ default:
+ HandleDefaultEvents(ev, SelfId());
+ }
+}
+
+void TPQTabletMock::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx)
+{
+ Y_UNUSED(ev);
+ Y_UNUSED(ctx);
+}
+
+void TPQTabletMock::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
+{
+ Y_UNUSED(ev);
+ Y_UNUSED(ctx);
+}
+
+void TPQTabletMock::Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx)
+{
+ Request.reset(ev->Release().Release());
+
+ Y_ABORT_UNLESS(Request->Record.HasPartitionRequest());
+ auto& partition = Request->Record.GetPartitionRequest();
+
+ if (partition.HasCmdGetOwnership()) {
+ PrepareGetOwnershipResponse();
+ UNIT_ASSERT(Response.get());
+ ctx.Send(ev->Sender, std::move(Response));
+ } else if (partition.HasCmdGetMaxSeqNo()) {
+ PrepareGetMaxSeqNoResponse();
+ UNIT_ASSERT(Response.get());
+ ctx.Send(ev->Sender, std::move(Response));
+ } else if (partition.HasCmdReserveBytes()) {
+ PrepareReserveBytesResponse(ev->Sender);
+ TrySendResponses(ctx, CmdReserve);
+ } else if (partition.CmdWriteSize()) {
+ PrepareWriteResponse(ev->Sender);
+ TrySendResponses(ctx, CmdWrite);
+ } else {
+ Y_ABORT_UNLESS(false);
+ }
+}
+
+void TPQTabletMock::AppendWriteReply(ui64 cookie)
+{
+ CmdReserve.ExpectedRequests.push_back(cookie);
+ CmdWrite.ExpectedRequests.push_back(cookie);
+}
+
+}
diff --git a/ydb/services/persqueue_v1/ut/pqtablet_mock.h b/ydb/services/persqueue_v1/ut/pqtablet_mock.h
new file mode 100644
index 0000000000..842a63e00d
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/pqtablet_mock.h
@@ -0,0 +1,71 @@
+#pragma once
+
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/persqueue/events/global.h>
+#include <ydb/core/tablet_flat/tablet_flat_executed.h>
+
+#include <ydb/public/lib/base/msgbus_status.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/actorid.h>
+
+#include <memory>
+
+namespace NKikimr::NPersQueueTests {
+
+class TPQTabletMock : public TActor<TPQTabletMock>, public NTabletFlatExecutor::TTabletExecutedFlat {
+public:
+ TPQTabletMock(const TActorId& tablet, TTabletStorageInfo* info);
+
+ void AppendWriteReply(ui64 cookie);
+
+private:
+ using TEvRequestPtr = std::unique_ptr<TEvPersQueue::TEvRequest>;
+ using TEvResponsePtr = std::unique_ptr<TEvPersQueue::TEvResponse>;
+
+ struct TReply {
+ TEvResponsePtr Response;
+ TActorId Recipient;
+ };
+
+ struct TCommandReplies {
+ TDeque<ui64> ExpectedRequests;
+ THashMap<ui64, TReply> Replies;
+ };
+
+ void OnDetach(const TActorContext &ctx) override;
+ void OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx) override;
+ void DefaultSignalTabletActive(const TActorContext &ctx) override;
+ void OnActivateExecutor(const TActorContext &ctx) override;
+
+ STFUNC(StateBoot);
+ STFUNC(StateWork);
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
+
+ void Handle(TEvPersQueue::TEvRequest::TPtr& ev, const TActorContext& ctx);
+
+ TMaybe<ui64> GetPartitionRequestCookie() const;
+
+ void PrepareGetOwnershipResponse();
+ void PrepareGetMaxSeqNoResponse();
+ void PrepareReserveBytesResponse(const TActorId& recipient);
+ void PrepareWriteResponse(const TActorId& recipient);
+
+ static TEvResponsePtr MakeReserveBytesResponse(ui64 cookie);
+ static TEvResponsePtr MakeWriteResponse(ui64 cookie);
+
+ void TrySendResponses(const TActorContext& ctx, TCommandReplies& cmd);
+
+ TEvRequestPtr Request;
+ TEvResponsePtr Response;
+
+ TString OwnerCookie = "owner-cookie";
+ ui64 MaxSeqNo = 0;
+
+ TCommandReplies CmdReserve;
+ TCommandReplies CmdWrite;
+};
+
+}
diff --git a/ydb/services/persqueue_v1/ut/ya.make b/ydb/services/persqueue_v1/ut/ya.make
index 78d0a7cbea..279e0e2d5c 100644
--- a/ydb/services/persqueue_v1/ut/ya.make
+++ b/ydb/services/persqueue_v1/ut/ya.make
@@ -28,6 +28,12 @@ SRCS(
functions_executor_wrapper.cpp
topic_service_ut.cpp
demo_tx.cpp
+
+ partition_writer_cache_actor_ut.cpp
+
+ pqtablet_mock.cpp
+ kqp_mock.cpp
+ partition_writer_cache_actor_fixture.cpp
)
PEERDIR(