diff options
author | abcdef <akotov@ydb.tech> | 2023-11-27 11:52:45 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-11-27 12:17:01 +0300 |
commit | d31477e5ed13679dd3b409b100623cc81a5e0964 (patch) | |
tree | 4f37b3cdf1c700eea8d415be0c238c611df49a99 | |
parent | 8ff120094ddf6980a0ebd047056c133ce1c71130 (diff) | |
download | ydb-d31477e5ed13679dd3b409b100623cc81a5e0964.tar.gz |
limit on the number of transactions per writing session
ограничение на число транзакций для одной сессии записи
13 files changed, 543 insertions, 267 deletions
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index 57df28f26b..a44c98be5a 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -21,6 +21,8 @@ struct TEvPartitionWriter { EvWriteResponse, EvDisconnected, + EvTxWriteRequest, + EvEnd, }; @@ -77,6 +79,10 @@ struct TEvPartitionWriter { explicit TEvWriteRequest(ui64 cookie) { Record.MutablePartitionRequest()->SetCookie(cookie); } + + ui64 GetCookie() const { + return Record.GetPartitionRequest().GetCookie(); + } }; struct TEvWriteAccepted: public TEventLocal<TEvWriteAccepted, EvWriteAccepted> { @@ -101,6 +107,7 @@ struct TEvPartitionWriter { PartitionNotLocal, // Partitition restarted. PartitionDisconnected, + OverloadError, }; struct TSuccess { @@ -144,6 +151,19 @@ struct TEvPartitionWriter { struct TEvDisconnected: public TEventLocal<TEvDisconnected, EvDisconnected> { }; + struct TEvTxWriteRequest : public TEventLocal<TEvTxWriteRequest, EvTxWriteRequest> { + TEvTxWriteRequest(const TString& sessionId, const TString& txId, THolder<TEvWriteRequest>&& request) : + SessionId(sessionId), + TxId(txId), + Request(std::move(request)) + { + } + + TString SessionId; + TString TxId; + THolder<TEvWriteRequest> Request; + }; + }; // TEvPartitionWriter struct TPartitionWriterOpts { diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt index bc403f982c..f9b4e65561 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt @@ -39,4 +39,6 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt index bc403f982c..f9b4e65561 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt @@ -39,4 +39,6 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt index 595cf29d92..c74bdef3b2 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt @@ -40,4 +40,6 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt index 595cf29d92..c74bdef3b2 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt @@ -40,4 +40,6 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt index bc403f982c..f9b4e65561 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt @@ -39,4 +39,6 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/partition_writer.cpp b/ydb/services/persqueue_v1/actors/partition_writer.cpp index 26d378100b..777be691a9 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer.cpp +++ b/ydb/services/persqueue_v1/actors/partition_writer.cpp @@ -1,109 +1,65 @@ +#include "partition_writer.h" + namespace NKikimr::NGRpcProxy::V1 { const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; -template<class TEvWrite> -TPartitionWriterImpl<TEvWrite>::TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight, - NKikimr::NPQ::TMultiCounter& bytesInflightTotal, - ui64& bytesInflight_, - ui64& bytesInflightTotal_) : - BytesInflight(bytesInflight), - BytesInflightTotal(bytesInflightTotal), - BytesInflight_(bytesInflight_), - BytesInflightTotal_(bytesInflightTotal_) -{ -} - -template<class TEvWrite> -void TPartitionWriterImpl<TEvWrite>::OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev) +void TPartitionWriter::OnEvInitResult(const NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev) { const auto& result = *ev->Get(); Y_ABORT_UNLESS(result.IsSuccess()); + OwnerCookie = result.GetResult().OwnerCookie; + MaxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo(); } -template<class TEvWrite> -ui64 TPartitionWriterImpl<TEvWrite>::OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev) +void TPartitionWriter::OnWriteRequest(THolder<NPQ::TEvPartitionWriter::TEvWriteRequest>&& ev, + const TActorContext& ctx) { - Y_ABORT_UNLESS(!SentRequests.empty()); - - auto request = std::move(SentRequests.front()); - Y_ABORT_UNLESS(ev->Get()->Cookie == request->Cookie); - - SentRequests.pop_front(); - - ui64 size = request->ByteSize; + Y_ABORT_UNLESS(ev->Record.HasPartitionRequest()); - AcceptedRequests.emplace_back(std::move(request)); - - return size; + if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + SentRequests.push_back(ev->Record.GetPartitionRequest().GetCookie()); + ctx.Send(Actor, ev.Release()); + } else { + QuotedRequests.push_back(std::move(ev)); + } } -template<class TEvWrite> -auto TPartitionWriterImpl<TEvWrite>::OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev) -> TWriteRequestInfoPtr +void TPartitionWriter::OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAccepted& ev, const TActorContext& ctx) { - Y_ABORT_UNLESS(!AcceptedRequests.empty()); - - auto request = std::move(AcceptedRequests.front()); - AcceptedRequests.pop_front(); + Y_ABORT_UNLESS(!SentRequests.empty()); + Y_ABORT_UNLESS(ev.Cookie == SentRequests.front()); - const auto& resp = ev->Get()->Record.GetPartitionResponse(); - Y_ABORT_UNLESS(resp.GetCookie() == request->Cookie); + AcceptedRequests.push_back(SentRequests.front()); + SentRequests.pop_front(); - return request; -} + if (QuotedRequests.empty()) { + return; + } -template<class TEvWrite> -bool TPartitionWriterImpl<TEvWrite>::AnyRequests() const -{ - return !QuotedRequests.empty() || !SentRequests.empty() || !AcceptedRequests.empty(); -} + if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + auto next = std::move(QuotedRequests.front()); + QuotedRequests.pop_front(); -template<class TEvWrite> -void TPartitionWriterImpl<TEvWrite>::AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx) -{ - if (!QuotedRequests.empty() && SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { - SendRequest(std::move(request), ctx); - } else { - QuotedRequests.emplace_back(std::move(request)); + SentRequests.push_back(next->Record.GetPartitionRequest().GetCookie()); + ctx.Send(Actor, next.Release()); } } -template<class TEvWrite> -bool TPartitionWriterImpl<TEvWrite>::TrySendNextQuotedRequest(const TActorContext& ctx) +void TPartitionWriter::OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev) { - if (QuotedRequests.empty()) { - return false; - } + Y_ABORT_UNLESS(ev.IsSuccess()); - SendRequest(std::move(QuotedRequests.front()), ctx); - QuotedRequests.pop_front(); + Y_ABORT_UNLESS(!AcceptedRequests.empty()); + Y_ABORT_UNLESS(ev.Record.GetPartitionResponse().GetCookie() == AcceptedRequests.front()); - return true; + AcceptedRequests.pop_front(); } -template<class TEvWrite> -void TPartitionWriterImpl<TEvWrite>::SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx) +bool TPartitionWriter::HasPendingRequests() const { - Y_ABORT_UNLESS(request->PartitionWriteRequest); - - i64 diff = 0; - for (const auto& w : request->UserWriteRequests) { - diff -= w->Request.ByteSize(); - } - - Y_ABORT_UNLESS(-diff <= (i64)BytesInflight_); - diff += request->PartitionWriteRequest->Record.ByteSize(); - BytesInflight_ += diff; - BytesInflightTotal_ += diff; - - if (BytesInflight && BytesInflightTotal) { - BytesInflight.Inc(diff); - BytesInflightTotal.Inc(diff); - } - - ctx.Send(PartitionWriterActor, std::move(request->PartitionWriteRequest)); - SentRequests.emplace_back(std::move(request)); + return !QuotedRequests.empty() || !SentRequests.empty() || !AcceptedRequests.empty(); } } diff --git a/ydb/services/persqueue_v1/actors/partition_writer.h b/ydb/services/persqueue_v1/actors/partition_writer.h index b47ddfaa82..78b8a41278 100644 --- a/ydb/services/persqueue_v1/actors/partition_writer.h +++ b/ydb/services/persqueue_v1/actors/partition_writer.h @@ -6,54 +6,27 @@ namespace NKikimr::NGRpcProxy::V1 { -template<class TEvWrite> -struct TPartitionWriterImpl { - using TWriteRequestInfoPtr = typename TWriteRequestInfoImpl<TEvWrite>::TPtr; - - TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight, - NKikimr::NPQ::TMultiCounter& bytesInflightTotal, - ui64& bytesInflight_, - ui64& bytesInflightTotal_); - - // - // from client - // - void OnEvWrite(TEvPQProxy::TEvTopicWrite::TPtr& ev, const TActorContext& ctx); - - // - // from partition writer actor - // - void OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev); - ui64 OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev); - TWriteRequestInfoPtr OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev); - - // - // from quoter - // - void OnEvWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); - - bool AnyRequests() const; - void AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx); - bool TrySendNextQuotedRequest(const TActorContext& ctx); - void SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx); - - TActorId PartitionWriterActor; +struct TPartitionWriter { + TPartitionWriter() = default; + + void OnEvInitResult(const NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev); + void OnWriteRequest(THolder<NPQ::TEvPartitionWriter::TEvWriteRequest>&& ev, const TActorContext& ctx); + void OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAccepted& ev, const TActorContext& ctx); + void OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev); + + bool HasPendingRequests() const; + + TActorId Actor; TString OwnerCookie; + ui64 MaxSeqNo = 0; + TInstant LastActivity; // Quoted, but not sent requests - TDeque<TWriteRequestInfoPtr> QuotedRequests; + TDeque<THolder<NPQ::TEvPartitionWriter::TEvWriteRequest>> QuotedRequests; // Requests that is sent to partition actor, but not accepted - TDeque<TWriteRequestInfoPtr> SentRequests; + TDeque<ui64> SentRequests; // Accepted requests - TDeque<TWriteRequestInfoPtr> AcceptedRequests; - - NKikimr::NPQ::TMultiCounter& BytesInflight; - NKikimr::NPQ::TMultiCounter& BytesInflightTotal; - - ui64& BytesInflight_; - ui64& BytesInflightTotal_; + TDeque<ui64> AcceptedRequests; }; } - -#include "partition_writer.cpp" diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp new file mode 100644 index 0000000000..a7cc60c1cf --- /dev/null +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp @@ -0,0 +1,285 @@ +#include "partition_writer_cache_actor.h" +#include <ydb/core/persqueue/writer/writer.h> + +namespace NKikimr::NGRpcProxy::V1 { + +TPartitionWriterCacheActor::TPartitionWriterCacheActor(const TActorId& owner, + ui32 partition, + ui64 tabletId, + std::optional<ui32> expectedGeneration, + const TString& sourceId, + const NPQ::TPartitionWriterOpts& opts) : + Owner(owner), + Partition(partition), + TabletId(tabletId), + ExpectedGeneration(expectedGeneration), + SourceId(sourceId), + Opts(opts) +{ +} + +void TPartitionWriterCacheActor::Bootstrap(const TActorContext& ctx) +{ + RegisterDefaultPartitionWriter(ctx); + + this->Become(&TPartitionWriterCacheActor::StateWork); +} + +void TPartitionWriterCacheActor::RegisterPartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx) +{ + std::pair<TString, TString> key(sessionId, txId); + + auto writer = std::make_unique<TPartitionWriter>(); + writer->Actor = CreatePartitionWriter(sessionId, txId, ctx); + writer->LastActivity = ctx.Now(); + + Writers.emplace(key, std::move(writer)); +} + +void TPartitionWriterCacheActor::RegisterDefaultPartitionWriter(const TActorContext& ctx) +{ + RegisterPartitionWriter("", "", ctx); +} + +STFUNC(TPartitionWriterCacheActor::StateWork) +{ + switch (ev->GetTypeRewrite()) { + HFunc(NPQ::TEvPartitionWriter::TEvTxWriteRequest, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvInitResult, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle); + HFunc(TEvents::TEvPoisonPill, Handle); + } +} + +void TPartitionWriterCacheActor::ReplyError(const TString& sessionId, const TString& txId, + EErrorCode code, const TString& reason, + ui64 cookie, + const TActorContext& ctx) +{ + NKikimrClient::TResponse response; + response.MutablePartitionResponse()->SetCookie(cookie); + + ctx.Send(Owner, new NPQ::TEvPartitionWriter::TEvWriteResponse(sessionId, txId, + code, reason, + std::move(response))); +} + +void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx) +{ + auto& event = *ev->Get(); + + if (auto* writer = GetPartitionWriter(event.SessionId, event.TxId, ctx); writer) { + if (PendingWriteAccepted.Expected == Max<ui64>()) { + Y_ABORT_UNLESS(PendingWriteResponse.Expected == Max<ui64>()); + + PendingWriteAccepted.Expected = event.Request->GetCookie(); + PendingWriteResponse.Expected = event.Request->GetCookie(); + } + + writer->LastActivity = ctx.Now(); + writer->OnWriteRequest(std::move(event.Request), ctx); + } else { + ReplyError(event.SessionId, event.TxId, + EErrorCode::OverloadError, "limit of active transactions has been exceeded", + event.Request->GetCookie(), + ctx); + this->Become(&TPartitionWriterCacheActor::StateBroken); + } +} + +void TPartitionWriterCacheActor::HandleOnBroken(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx) +{ + auto& event = *ev->Get(); + + ReplyError(event.SessionId, event.TxId, + EErrorCode::OverloadError, "limit of active transactions has been exceeded", + event.Request->GetCookie(), + ctx); +} + +void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) +{ + auto& result = *ev->Get(); + + auto key = std::make_pair(result.SessionId, result.TxId); + auto p = Writers.find(key); + Y_ABORT_UNLESS(p != Writers.end()); + + if (result.IsSuccess()) { + p->second->OnEvInitResult(ev); + } + + if (!result.SessionId && !result.TxId) { + ctx.Send(Owner, ev->Release().Release()); + } +} + +template <class TEvent> +void TPartitionWriterCacheActor::TryForwardToOwner(TEvent* event, TEventQueue<TEvent>& queue, + ui64 cookie, + const TActorContext& ctx) +{ + Y_ABORT_UNLESS(queue.Expected != Max<ui64>()); + + if (queue.Expected == cookie) { + ctx.Send(Owner, event); + + ++queue.Expected; + for (auto p = queue.Events.find(queue.Expected); p != queue.Events.end(); ) { + ctx.Send(Owner, p->second.release()); + queue.Events.erase(queue.Expected); + + ++queue.Expected; + p = queue.Events.find(queue.Expected); + } + } else { + queue.Events.emplace(cookie, event); + } +} + +void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx) +{ + const auto& result = *ev->Get(); + + auto key = std::make_pair(result.SessionId, result.TxId); + auto p = Writers.find(key); + Y_ABORT_UNLESS(p != Writers.end()); + + if (result.Cookie == p->second->SentRequests.front()) { + p->second->OnWriteAccepted(result, ctx); + + TryForwardToOwner(ev->Release().Release(), PendingWriteAccepted, + result.Cookie, + ctx); + } else { + ReplyError(result.SessionId, result.TxId, + EErrorCode::InternalError, "out of order reserve bytes response from server, may be previous is lost", + p->second->SentRequests.front(), + ctx); + this->Become(&TPartitionWriterCacheActor::StateBroken); + } +} + +void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx) +{ + const auto& result = *ev->Get(); + + auto key = std::make_pair(result.SessionId, result.TxId); + auto p = Writers.find(key); + Y_ABORT_UNLESS(p != Writers.end()); + + if (result.IsSuccess()) { + ui64 cookie = result.Record.GetPartitionResponse().GetCookie(); + if (cookie == p->second->AcceptedRequests.front()) { + p->second->OnWriteResponse(result); + + TryForwardToOwner(ev->Release().Release(), PendingWriteResponse, + cookie, + ctx); + } else { + ReplyError(result.SessionId, result.TxId, + EErrorCode::InternalError, "out of order write response from server, may be previous is lost", + p->second->AcceptedRequests.front(), + ctx); + this->Become(&TPartitionWriterCacheActor::StateBroken); + } + } else { + ctx.Send(Owner, ev->Release().Release()); + } +} + +void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx) +{ + ctx.Send(Owner, ev->Release().Release()); +} + +void TPartitionWriterCacheActor::Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx) +{ + Y_UNUSED(ev); + + for (auto& [_, writer] : Writers) { + ctx.Send(writer->Actor, new TEvents::TEvPoisonPill()); + } +} + +auto TPartitionWriterCacheActor::GetPartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx) -> TPartitionWriter* +{ + auto key = std::make_pair(sessionId, txId); + + auto p = Writers.find(key); + if (p != Writers.end()) { + return p->second.get(); + } + + if (Writers.size() >= MAX_TRANSACTIONS_COUNT) { + if (!TryDeleteOldestWriter(ctx)) { + return nullptr; + } + } + + RegisterPartitionWriter(sessionId, txId, ctx); + + p = Writers.find(key); + Y_ABORT_UNLESS(p != Writers.end()); + + return p->second.get(); +} + +bool TPartitionWriterCacheActor::TryDeleteOldestWriter(const TActorContext& ctx) +{ + Y_ABORT_UNLESS(!Writers.empty()); + + auto minLastActivity = TInstant::Max(); + auto oldest = Writers.end(); + + for (auto p = Writers.begin(); p != Writers.end(); ++p) { + auto& writer = *p->second; + + if ((writer.LastActivity < minLastActivity) && !writer.HasPendingRequests()) { + minLastActivity = writer.LastActivity; + oldest = p; + } + } + + if (minLastActivity == TInstant::Max()) { + return false; + } + + ctx.Send(oldest->second->Actor, new TEvents::TEvPoisonPill()); + Writers.erase(oldest); + + return true; +} + +TActorId TPartitionWriterCacheActor::CreatePartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx) +{ + NPQ::TPartitionWriterOpts opts = Opts; + if (sessionId && txId) { + opts.WithSessionId(sessionId); + opts.WithTxId(txId); + } + + return ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( + ctx.SelfID, {}, TabletId, Partition, ExpectedGeneration, + SourceId, opts + )); +} + +STFUNC(TPartitionWriterCacheActor::StateBroken) +{ + switch (ev->GetTypeRewrite()) { + HFunc(NPQ::TEvPartitionWriter::TEvTxWriteRequest, HandleOnBroken); + HFunc(NPQ::TEvPartitionWriter::TEvInitResult, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle); + HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle); + HFunc(TEvents::TEvPoisonPill, Handle); + } +} + +} diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h new file mode 100644 index 0000000000..cad18c3d89 --- /dev/null +++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h @@ -0,0 +1,81 @@ +#pragma once + +#include "events.h" +#include "partition_writer.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NGRpcProxy::V1 { + +class TPartitionWriterCacheActor : public NActors::TActorBootstrapped<TPartitionWriterCacheActor> { +public: + TPartitionWriterCacheActor(const TActorId& owner, + ui32 partition, + ui64 tabletId, + std::optional<ui32> expectedGeneration, + const TString& sourceId, + const NPQ::TPartitionWriterOpts& opts); + + void Bootstrap(const TActorContext& ctx); + +private: + using TPartitionWriterPtr = std::unique_ptr<TPartitionWriter>; + using EErrorCode = NPQ::TEvPartitionWriter::TEvWriteResponse::EErrorCode; + + template <class TEvent> + struct TEventQueue { + TEventQueue() : + Expected(Max<ui64>()) + { + } + + ui64 Expected; + THashMap<ui64, std::unique_ptr<TEvent>> Events; + }; + + static constexpr const size_t MAX_TRANSACTIONS_COUNT = 4; + + STFUNC(StateWork); + STFUNC(StateBroken); + + void Handle(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx); + void HandleOnBroken(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx); + void Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx); + void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx); + void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx); + void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx); + void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx); + + void ReplyError(const TString& sessionId, const TString& txId, + EErrorCode code, const TString& reason, + ui64 cookie, + const TActorContext& ctx); + + TPartitionWriter* GetPartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx); + bool TryDeleteOldestWriter(const TActorContext& ctx); + void RegisterPartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx); + void RegisterDefaultPartitionWriter(const TActorContext& ctx); + TActorId CreatePartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx); + + template <class TEvent> + void TryForwardToOwner(TEvent* event, TEventQueue<TEvent>& queue, + ui64 cookie, + const TActorContext& ctx); + + TActorId Owner; // WriteSessionActor + ui32 Partition; + ui64 TabletId; + std::optional<ui32> ExpectedGeneration; + TString SourceId; + NPQ::TPartitionWriterOpts Opts; + + THashMap<std::pair<TString, TString>, TPartitionWriterPtr> Writers; + + TEventQueue<NPQ::TEvPartitionWriter::TEvWriteAccepted> PendingWriteAccepted; + TEventQueue<NPQ::TEvPartitionWriter::TEvWriteResponse> PendingWriteResponse; +}; + +} diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index 089583362c..545126389c 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -4,6 +4,7 @@ #include "partition_writer.h" #include "persqueue_utils.h" #include "write_request_info.h" +#include "partition_writer_cache_actor.h" #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -55,7 +56,6 @@ class TWriteSessionActor using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest; using TWriteRequestInfo = TWriteRequestInfoImpl<TEvWrite>; - using TPartitionWriter = TPartitionWriterImpl<TEvWrite>; // Codec ID size in bytes static constexpr ui32 CODEC_ID_SIZE = 1; @@ -164,7 +164,7 @@ private: void MakeAndSentInitResponse(const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx); - void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx); + void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx); @@ -179,16 +179,14 @@ private: void CheckFinish(const NActors::TActorContext& ctx); void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx); - void SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); + void SendWriteRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); void SetupCounters(); void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId); private: - TPartitionWriter* FindPartitionWriter(const TString& sessionId, const TString& txId); - void InitPartitionWriter(const TString& sessionId, const TString& txId, - const TActorContext& ctx); - bool AnyRequests() const; + void CreatePartitionWriterCache(const TActorContext& ctx); + void DestroyPartitionWriterCache(const TActorContext& ctx); std::unique_ptr<TEvStreamWriteRequest> Request; @@ -232,6 +230,10 @@ private: // Request that is waiting for quota typename TWriteRequestInfo::TPtr PendingQuotaRequest; + // Requests that is sent to partition actor, but not accepted + std::deque<typename TWriteRequestInfo::TPtr> SentRequests; + // Accepted requests + std::deque<typename TWriteRequestInfo::TPtr> AcceptedRequests; bool WritesDone; @@ -301,7 +303,7 @@ private: TDeque<ui64> SeqNoInflight; - THashMap<std::pair<TString, TString>, TPartitionWriter> Writers; + TActorId PartitionWriterCache; }; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 62c4f2bd15..bfd91ad055 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -293,9 +293,6 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { if (State == ES_DYING) return; - for (auto& [_, writer] : Writers) { - ctx.Send(writer.PartitionWriterActor, new TEvents::TEvPoisonPill()); - } if (SessionsActive) { SessionsActive.Dec(); @@ -309,6 +306,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { ctx.Send(GetPQWriteServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie)); + DestroyPartitionWriterCache(ctx); + if (State == ES_WAIT_SESSION) { // final die will be done later, on session discover State = ES_DYING; return; @@ -338,7 +337,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CheckFinish(const TActorContext& CloseSession("out of order Writes done before initialization", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } - if (PendingRequests.empty() && !PendingQuotaRequest && !AnyRequests()) { + if (PendingRequests.empty() && !PendingQuotaRequest && SentRequests.empty() && AcceptedRequests.empty()) { CloseSession("", PersQueue::ErrorCode::OK, ctx); return; } @@ -1019,7 +1018,7 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti return; } - InitPartitionWriter("", "", ctx); + CreatePartitionWriterCache(ctx); State = ES_WAIT_WRITER_INIT; @@ -1036,6 +1035,52 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti } } +template <bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::CreatePartitionWriterCache(const TActorContext& ctx) +{ + NPQ::TPartitionWriterOpts opts; + + opts.WithDeduplication(UseDeduplication); + if constexpr (!UseMigrationProtocol) { + if (Request->GetDatabaseName()) { + opts.WithDatabase(*Request->GetDatabaseName()); + } + opts.WithTopicPath(InitRequest.path()); + if (Request->GetSerializedToken()) { + opts.WithToken(Request->GetSerializedToken()); + } + if (Request->GetTraceId()) { + opts.WithTraceId(*Request->GetTraceId()); + } + if (Request->GetRequestType()) { + opts.WithRequestType(*Request->GetRequestType()); + } + } + + auto it = PartitionToTablet.find(Partition); + ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0; + + auto actor = + std::make_unique<TPartitionWriterCacheActor>(ctx.SelfID, + Partition, + tabletId, + ExpectedGeneration, + SourceId, + opts); + + PartitionWriterCache = ctx.RegisterWithSameMailbox(actor.release()); +} + +template <bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::DestroyPartitionWriterCache(const TActorContext& ctx) +{ + if (PartitionWriterCache == TActorId()) { + return; + } + + ctx.Send(PartitionWriterCache, new TEvents::TEvPoisonPill()); +} + template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) { @@ -1132,19 +1177,12 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) { const auto& result = *ev->Get(); - auto writer = FindPartitionWriter(result.SessionId, result.TxId); - Y_ABORT_UNLESS(writer != nullptr); - - if (!result.SessionId && !result.TxId) { - if (State != ES_WAIT_WRITER_INIT) { - return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); - } - } else { - if (State != ES_INITED) { - return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); - } + if (State != ES_WAIT_WRITER_INIT) { + return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); } + Y_ABORT_UNLESS(!result.SessionId && !result.TxId); + if (!result.IsSuccess()) { const auto& error = result.GetError(); if (error.Response.HasErrorCode()) { @@ -1154,17 +1192,15 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } - writer->OnEvInitResult(ev); + OwnerCookie = result.GetResult().OwnerCookie; const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo(); if (!UseDeduplication) { Y_ABORT_UNLESS(maxSeqNo == 0); } - if (!result.SessionId && !result.TxId) { - OwnerCookie = result.GetResult().OwnerCookie; - MakeAndSentInitResponse(maxSeqNo, ctx); - } + OwnerCookie = result.GetResult().OwnerCookie; + MakeAndSentInitResponse(maxSeqNo, ctx); } template<bool UseMigrationProtocol> @@ -1173,21 +1209,18 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx); } - auto* writer = FindPartitionWriter(ev->Get()->SessionId, ev->Get()->TxId); - Y_ABORT_UNLESS(writer != nullptr); - - Y_ABORT_UNLESS(!writer->SentRequests.empty()); - auto writeRequest = std::move(writer->SentRequests.front()); + Y_ABORT_UNLESS(!SentRequests.empty()); + auto writeRequest = std::move(SentRequests.front()); if (ev->Get()->Cookie != writeRequest->Cookie) { return CloseSession("out of order reserve bytes response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); } - writer->SentRequests.pop_front(); + SentRequests.pop_front(); ui64 diff = writeRequest->ByteSize; - writer->AcceptedRequests.emplace_back(std::move(writeRequest)); + AcceptedRequests.emplace_back(std::move(writeRequest)); BytesInflight_ -= diff; if (BytesInflight) { @@ -1203,20 +1236,17 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } if (!IsQuotaRequired() && !PendingRequests.empty()) { - SendRequest(*writer, std::move(PendingRequests.front()), ctx); + SendWriteRequest(std::move(PendingRequests.front()), ctx); PendingRequests.pop_front(); - } else if (!writer->QuotedRequests.empty()) { - SendRequest(*writer, std::move(writer->QuotedRequests.front()), ctx); - writer->QuotedRequests.pop_front(); } } template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse( - const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx + const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx ) { - auto writeRequest = std::move(writer.AcceptedRequests.front()); - writer.AcceptedRequests.pop_front(); + auto writeRequest = std::move(AcceptedRequests.front()); + AcceptedRequests.pop_front(); auto addAckMigration = [this]( const TPersQueuePartitionResponse::TCmdWriteResult& res, @@ -1361,21 +1391,19 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } - auto* writer = FindPartitionWriter(result.SessionId, result.TxId); - Y_ABORT_UNLESS(writer != nullptr); - - if (writer->AcceptedRequests.empty()) { + if (AcceptedRequests.empty()) { CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx); return; } - const auto& writeRequest = writer->AcceptedRequests.front(); + const auto& writeRequest = AcceptedRequests.front(); const auto& resp = result.Record.GetPartitionResponse(); if (resp.GetCookie() != writeRequest->Cookie) { return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); } - ProcessWriteResponse(resp, *writer, ctx); + + ProcessWriteResponse(resp, ctx); } template<bool UseMigrationProtocol> @@ -1488,10 +1516,6 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) { addData(writeRequest, messageIndex); } - - if (writeRequest.has_tx()) { - InitPartitionWriter(writeRequest.tx().session(), writeRequest.tx().id(), ctx); - } } pendingRequest->UserWriteRequests.push_back(std::move(ev)); @@ -1522,25 +1546,16 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& } } } else { - TPartitionWriter* writer = nullptr; - if constexpr (UseMigrationProtocol) { - Y_ABORT_UNLESS(Writers.size() == 1); - writer = &Writers.begin()->second; - } else { - const auto& [sessionId, txId] = pendingRequest->GetTransactionId(); - writer = FindPartitionWriter(sessionId, txId); - Y_ABORT_UNLESS(writer != nullptr); - } - - if (!PendingQuotaRequest && writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { - SendRequest(*writer, std::move(PendingRequests.front()), ctx); + if (!PendingQuotaRequest) { + SendWriteRequest(std::move(PendingRequests.front()), ctx); PendingRequests.pop_front(); } } } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) { +void TWriteSessionActor<UseMigrationProtocol>::SendWriteRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) +{ Y_ABORT_UNLESS(request->PartitionWriteRequest); i64 diff = 0; @@ -1550,6 +1565,7 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& wri Y_ABORT_UNLESS(-diff <= (i64)BytesInflight_); diff += request->PartitionWriteRequest->Record.ByteSize(); + BytesInflight_ += diff; BytesInflightTotal_ += diff; if (BytesInflight && BytesInflightTotal) { @@ -1557,8 +1573,14 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& wri BytesInflightTotal.Inc(diff); } - ctx.Send(writer.PartitionWriterActor, std::move(request->PartitionWriteRequest)); - writer.SentRequests.push_back(std::move(request)); + auto [sessionId, txId] = request->GetTransactionId(); + auto event = + std::make_unique<NPQ::TEvPartitionWriter::TEvTxWriteRequest>(sessionId, txId, + std::move(request->PartitionWriteRequest)); + + ctx.Send(PartitionWriterCache, std::move(event)); + + SentRequests.push_back(std::move(request)); } template<bool UseMigrationProtocol> @@ -1809,15 +1831,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota); } - const auto& [sessionId, txId] = PendingQuotaRequest->GetTransactionId(); - auto* writer = FindPartitionWriter(sessionId, txId); - Y_ABORT_UNLESS(writer != nullptr); - - if (writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { - SendRequest(*writer, std::move(PendingQuotaRequest), ctx); - } else { - writer->QuotedRequests.push_back(std::move(PendingQuotaRequest)); - } + SendWriteRequest(std::move(PendingQuotaRequest), ctx); if (!PendingRequests.empty()) { Y_ABORT_UNLESS(MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx)); @@ -1861,74 +1875,5 @@ void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& c } } -template<bool UseMigrationProtocol> -auto TWriteSessionActor<UseMigrationProtocol>::FindPartitionWriter(const TString& sessionId, const TString& txId) -> TPartitionWriter* -{ - auto p = Writers.find(std::make_pair(sessionId, txId)); - if (p == Writers.end()) { - return nullptr; - } - return &p->second; -} - -template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::InitPartitionWriter(const TString& sessionId, const TString& txId, - const TActorContext& ctx) -{ - auto key = std::make_pair(sessionId, txId); - if (auto p = Writers.find(key); p != Writers.end()) { - return; - } - - auto [p, _] = Writers.emplace(std::piecewise_construct, - std::forward_as_tuple(key), - std::forward_as_tuple(BytesInflight, - BytesInflightTotal, - BytesInflight_, - BytesInflightTotal_)); - auto& writer = p->second; - - auto it = PartitionToTablet.find(Partition); - ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0; - - TPartitionWriterOpts opts; - opts.WithDeduplication(UseDeduplication); - if constexpr (!UseMigrationProtocol) { - if (sessionId && txId) { - Y_ABORT_UNLESS(Request->GetDatabaseName()); - opts.WithDatabase(*Request->GetDatabaseName()); - opts.WithTopicPath(InitRequest.path()); - opts.WithSessionId(sessionId); - opts.WithTxId(txId); - if (Request->GetSerializedToken()) { - opts.WithToken(Request->GetSerializedToken()); - } - if (Request->GetTraceId()) { - opts.WithTraceId(*Request->GetTraceId()); - } - if (Request->GetRequestType()) { - opts.WithRequestType(*Request->GetRequestType()); - } - } - } - - TActorId actor = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( - ctx.SelfID, {}, tabletId, Partition, ExpectedGeneration, - SourceId, opts - )); - writer.PartitionWriterActor = actor; -} - -template<bool UseMigrationProtocol> -bool TWriteSessionActor<UseMigrationProtocol>::AnyRequests() const -{ - for (auto& [_, writer] : Writers) { - if (writer.AnyRequests()) { - return true; - } - } - return false; -} - } } diff --git a/ydb/services/persqueue_v1/actors/ya.make b/ydb/services/persqueue_v1/actors/ya.make index 1f694353f6..d15a62203c 100644 --- a/ydb/services/persqueue_v1/actors/ya.make +++ b/ydb/services/persqueue_v1/actors/ya.make @@ -42,6 +42,10 @@ SRCS( schema_actors.h schema_actors.cpp update_offsets_in_transaction_actor.cpp + partition_writer.h + partition_writer.cpp + partition_writer_cache_actor.h + partition_writer_cache_actor.cpp ) END() |