aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-11-27 11:52:45 +0300
committerabcdef <akotov@ydb.tech>2023-11-27 12:17:01 +0300
commitd31477e5ed13679dd3b409b100623cc81a5e0964 (patch)
tree4f37b3cdf1c700eea8d415be0c238c611df49a99
parent8ff120094ddf6980a0ebd047056c133ce1c71130 (diff)
downloadydb-d31477e5ed13679dd3b409b100623cc81a5e0964.tar.gz
limit on the number of transactions per writing session
ограничение на число транзакций для одной сессии записи
-rw-r--r--ydb/core/persqueue/writer/writer.h20
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer.cpp112
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer.h59
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp285
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h81
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h18
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp221
-rw-r--r--ydb/services/persqueue_v1/actors/ya.make4
13 files changed, 543 insertions, 267 deletions
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h
index 57df28f26b..a44c98be5a 100644
--- a/ydb/core/persqueue/writer/writer.h
+++ b/ydb/core/persqueue/writer/writer.h
@@ -21,6 +21,8 @@ struct TEvPartitionWriter {
EvWriteResponse,
EvDisconnected,
+ EvTxWriteRequest,
+
EvEnd,
};
@@ -77,6 +79,10 @@ struct TEvPartitionWriter {
explicit TEvWriteRequest(ui64 cookie) {
Record.MutablePartitionRequest()->SetCookie(cookie);
}
+
+ ui64 GetCookie() const {
+ return Record.GetPartitionRequest().GetCookie();
+ }
};
struct TEvWriteAccepted: public TEventLocal<TEvWriteAccepted, EvWriteAccepted> {
@@ -101,6 +107,7 @@ struct TEvPartitionWriter {
PartitionNotLocal,
// Partitition restarted.
PartitionDisconnected,
+ OverloadError,
};
struct TSuccess {
@@ -144,6 +151,19 @@ struct TEvPartitionWriter {
struct TEvDisconnected: public TEventLocal<TEvDisconnected, EvDisconnected> {
};
+ struct TEvTxWriteRequest : public TEventLocal<TEvTxWriteRequest, EvTxWriteRequest> {
+ TEvTxWriteRequest(const TString& sessionId, const TString& txId, THolder<TEvWriteRequest>&& request) :
+ SessionId(sessionId),
+ TxId(txId),
+ Request(std::move(request))
+ {
+ }
+
+ TString SessionId;
+ TString TxId;
+ THolder<TEvWriteRequest> Request;
+ };
+
}; // TEvPartitionWriter
struct TPartitionWriterOpts {
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt
index bc403f982c..f9b4e65561 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-arm64.txt
@@ -39,4 +39,6 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt
index bc403f982c..f9b4e65561 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.darwin-x86_64.txt
@@ -39,4 +39,6 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
index 595cf29d92..c74bdef3b2 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-aarch64.txt
@@ -40,4 +40,6 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt
index 595cf29d92..c74bdef3b2 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.linux-x86_64.txt
@@ -40,4 +40,6 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt
index bc403f982c..f9b4e65561 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.windows-x86_64.txt
@@ -39,4 +39,6 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/update_offsets_in_transaction_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/partition_writer.cpp b/ydb/services/persqueue_v1/actors/partition_writer.cpp
index 26d378100b..777be691a9 100644
--- a/ydb/services/persqueue_v1/actors/partition_writer.cpp
+++ b/ydb/services/persqueue_v1/actors/partition_writer.cpp
@@ -1,109 +1,65 @@
+#include "partition_writer.h"
+
namespace NKikimr::NGRpcProxy::V1 {
const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
-template<class TEvWrite>
-TPartitionWriterImpl<TEvWrite>::TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight,
- NKikimr::NPQ::TMultiCounter& bytesInflightTotal,
- ui64& bytesInflight_,
- ui64& bytesInflightTotal_) :
- BytesInflight(bytesInflight),
- BytesInflightTotal(bytesInflightTotal),
- BytesInflight_(bytesInflight_),
- BytesInflightTotal_(bytesInflightTotal_)
-{
-}
-
-template<class TEvWrite>
-void TPartitionWriterImpl<TEvWrite>::OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev)
+void TPartitionWriter::OnEvInitResult(const NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev)
{
const auto& result = *ev->Get();
Y_ABORT_UNLESS(result.IsSuccess());
+
OwnerCookie = result.GetResult().OwnerCookie;
+ MaxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
}
-template<class TEvWrite>
-ui64 TPartitionWriterImpl<TEvWrite>::OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev)
+void TPartitionWriter::OnWriteRequest(THolder<NPQ::TEvPartitionWriter::TEvWriteRequest>&& ev,
+ const TActorContext& ctx)
{
- Y_ABORT_UNLESS(!SentRequests.empty());
-
- auto request = std::move(SentRequests.front());
- Y_ABORT_UNLESS(ev->Get()->Cookie == request->Cookie);
-
- SentRequests.pop_front();
-
- ui64 size = request->ByteSize;
+ Y_ABORT_UNLESS(ev->Record.HasPartitionRequest());
- AcceptedRequests.emplace_back(std::move(request));
-
- return size;
+ if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ SentRequests.push_back(ev->Record.GetPartitionRequest().GetCookie());
+ ctx.Send(Actor, ev.Release());
+ } else {
+ QuotedRequests.push_back(std::move(ev));
+ }
}
-template<class TEvWrite>
-auto TPartitionWriterImpl<TEvWrite>::OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev) -> TWriteRequestInfoPtr
+void TPartitionWriter::OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAccepted& ev, const TActorContext& ctx)
{
- Y_ABORT_UNLESS(!AcceptedRequests.empty());
-
- auto request = std::move(AcceptedRequests.front());
- AcceptedRequests.pop_front();
+ Y_ABORT_UNLESS(!SentRequests.empty());
+ Y_ABORT_UNLESS(ev.Cookie == SentRequests.front());
- const auto& resp = ev->Get()->Record.GetPartitionResponse();
- Y_ABORT_UNLESS(resp.GetCookie() == request->Cookie);
+ AcceptedRequests.push_back(SentRequests.front());
+ SentRequests.pop_front();
- return request;
-}
+ if (QuotedRequests.empty()) {
+ return;
+ }
-template<class TEvWrite>
-bool TPartitionWriterImpl<TEvWrite>::AnyRequests() const
-{
- return !QuotedRequests.empty() || !SentRequests.empty() || !AcceptedRequests.empty();
-}
+ if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ auto next = std::move(QuotedRequests.front());
+ QuotedRequests.pop_front();
-template<class TEvWrite>
-void TPartitionWriterImpl<TEvWrite>::AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx)
-{
- if (!QuotedRequests.empty() && SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
- SendRequest(std::move(request), ctx);
- } else {
- QuotedRequests.emplace_back(std::move(request));
+ SentRequests.push_back(next->Record.GetPartitionRequest().GetCookie());
+ ctx.Send(Actor, next.Release());
}
}
-template<class TEvWrite>
-bool TPartitionWriterImpl<TEvWrite>::TrySendNextQuotedRequest(const TActorContext& ctx)
+void TPartitionWriter::OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev)
{
- if (QuotedRequests.empty()) {
- return false;
- }
+ Y_ABORT_UNLESS(ev.IsSuccess());
- SendRequest(std::move(QuotedRequests.front()), ctx);
- QuotedRequests.pop_front();
+ Y_ABORT_UNLESS(!AcceptedRequests.empty());
+ Y_ABORT_UNLESS(ev.Record.GetPartitionResponse().GetCookie() == AcceptedRequests.front());
- return true;
+ AcceptedRequests.pop_front();
}
-template<class TEvWrite>
-void TPartitionWriterImpl<TEvWrite>::SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx)
+bool TPartitionWriter::HasPendingRequests() const
{
- Y_ABORT_UNLESS(request->PartitionWriteRequest);
-
- i64 diff = 0;
- for (const auto& w : request->UserWriteRequests) {
- diff -= w->Request.ByteSize();
- }
-
- Y_ABORT_UNLESS(-diff <= (i64)BytesInflight_);
- diff += request->PartitionWriteRequest->Record.ByteSize();
- BytesInflight_ += diff;
- BytesInflightTotal_ += diff;
-
- if (BytesInflight && BytesInflightTotal) {
- BytesInflight.Inc(diff);
- BytesInflightTotal.Inc(diff);
- }
-
- ctx.Send(PartitionWriterActor, std::move(request->PartitionWriteRequest));
- SentRequests.emplace_back(std::move(request));
+ return !QuotedRequests.empty() || !SentRequests.empty() || !AcceptedRequests.empty();
}
}
diff --git a/ydb/services/persqueue_v1/actors/partition_writer.h b/ydb/services/persqueue_v1/actors/partition_writer.h
index b47ddfaa82..78b8a41278 100644
--- a/ydb/services/persqueue_v1/actors/partition_writer.h
+++ b/ydb/services/persqueue_v1/actors/partition_writer.h
@@ -6,54 +6,27 @@
namespace NKikimr::NGRpcProxy::V1 {
-template<class TEvWrite>
-struct TPartitionWriterImpl {
- using TWriteRequestInfoPtr = typename TWriteRequestInfoImpl<TEvWrite>::TPtr;
-
- TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight,
- NKikimr::NPQ::TMultiCounter& bytesInflightTotal,
- ui64& bytesInflight_,
- ui64& bytesInflightTotal_);
-
- //
- // from client
- //
- void OnEvWrite(TEvPQProxy::TEvTopicWrite::TPtr& ev, const TActorContext& ctx);
-
- //
- // from partition writer actor
- //
- void OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev);
- ui64 OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev);
- TWriteRequestInfoPtr OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev);
-
- //
- // from quoter
- //
- void OnEvWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
-
- bool AnyRequests() const;
- void AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx);
- bool TrySendNextQuotedRequest(const TActorContext& ctx);
- void SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx);
-
- TActorId PartitionWriterActor;
+struct TPartitionWriter {
+ TPartitionWriter() = default;
+
+ void OnEvInitResult(const NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev);
+ void OnWriteRequest(THolder<NPQ::TEvPartitionWriter::TEvWriteRequest>&& ev, const TActorContext& ctx);
+ void OnWriteAccepted(const NPQ::TEvPartitionWriter::TEvWriteAccepted& ev, const TActorContext& ctx);
+ void OnWriteResponse(const NPQ::TEvPartitionWriter::TEvWriteResponse& ev);
+
+ bool HasPendingRequests() const;
+
+ TActorId Actor;
TString OwnerCookie;
+ ui64 MaxSeqNo = 0;
+ TInstant LastActivity;
// Quoted, but not sent requests
- TDeque<TWriteRequestInfoPtr> QuotedRequests;
+ TDeque<THolder<NPQ::TEvPartitionWriter::TEvWriteRequest>> QuotedRequests;
// Requests that is sent to partition actor, but not accepted
- TDeque<TWriteRequestInfoPtr> SentRequests;
+ TDeque<ui64> SentRequests;
// Accepted requests
- TDeque<TWriteRequestInfoPtr> AcceptedRequests;
-
- NKikimr::NPQ::TMultiCounter& BytesInflight;
- NKikimr::NPQ::TMultiCounter& BytesInflightTotal;
-
- ui64& BytesInflight_;
- ui64& BytesInflightTotal_;
+ TDeque<ui64> AcceptedRequests;
};
}
-
-#include "partition_writer.cpp"
diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
new file mode 100644
index 0000000000..a7cc60c1cf
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.cpp
@@ -0,0 +1,285 @@
+#include "partition_writer_cache_actor.h"
+#include <ydb/core/persqueue/writer/writer.h>
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+TPartitionWriterCacheActor::TPartitionWriterCacheActor(const TActorId& owner,
+ ui32 partition,
+ ui64 tabletId,
+ std::optional<ui32> expectedGeneration,
+ const TString& sourceId,
+ const NPQ::TPartitionWriterOpts& opts) :
+ Owner(owner),
+ Partition(partition),
+ TabletId(tabletId),
+ ExpectedGeneration(expectedGeneration),
+ SourceId(sourceId),
+ Opts(opts)
+{
+}
+
+void TPartitionWriterCacheActor::Bootstrap(const TActorContext& ctx)
+{
+ RegisterDefaultPartitionWriter(ctx);
+
+ this->Become(&TPartitionWriterCacheActor::StateWork);
+}
+
+void TPartitionWriterCacheActor::RegisterPartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx)
+{
+ std::pair<TString, TString> key(sessionId, txId);
+
+ auto writer = std::make_unique<TPartitionWriter>();
+ writer->Actor = CreatePartitionWriter(sessionId, txId, ctx);
+ writer->LastActivity = ctx.Now();
+
+ Writers.emplace(key, std::move(writer));
+}
+
+void TPartitionWriterCacheActor::RegisterDefaultPartitionWriter(const TActorContext& ctx)
+{
+ RegisterPartitionWriter("", "", ctx);
+}
+
+STFUNC(TPartitionWriterCacheActor::StateWork)
+{
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NPQ::TEvPartitionWriter::TEvTxWriteRequest, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvInitResult, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle);
+ HFunc(TEvents::TEvPoisonPill, Handle);
+ }
+}
+
+void TPartitionWriterCacheActor::ReplyError(const TString& sessionId, const TString& txId,
+ EErrorCode code, const TString& reason,
+ ui64 cookie,
+ const TActorContext& ctx)
+{
+ NKikimrClient::TResponse response;
+ response.MutablePartitionResponse()->SetCookie(cookie);
+
+ ctx.Send(Owner, new NPQ::TEvPartitionWriter::TEvWriteResponse(sessionId, txId,
+ code, reason,
+ std::move(response)));
+}
+
+void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx)
+{
+ auto& event = *ev->Get();
+
+ if (auto* writer = GetPartitionWriter(event.SessionId, event.TxId, ctx); writer) {
+ if (PendingWriteAccepted.Expected == Max<ui64>()) {
+ Y_ABORT_UNLESS(PendingWriteResponse.Expected == Max<ui64>());
+
+ PendingWriteAccepted.Expected = event.Request->GetCookie();
+ PendingWriteResponse.Expected = event.Request->GetCookie();
+ }
+
+ writer->LastActivity = ctx.Now();
+ writer->OnWriteRequest(std::move(event.Request), ctx);
+ } else {
+ ReplyError(event.SessionId, event.TxId,
+ EErrorCode::OverloadError, "limit of active transactions has been exceeded",
+ event.Request->GetCookie(),
+ ctx);
+ this->Become(&TPartitionWriterCacheActor::StateBroken);
+ }
+}
+
+void TPartitionWriterCacheActor::HandleOnBroken(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx)
+{
+ auto& event = *ev->Get();
+
+ ReplyError(event.SessionId, event.TxId,
+ EErrorCode::OverloadError, "limit of active transactions has been exceeded",
+ event.Request->GetCookie(),
+ ctx);
+}
+
+void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx)
+{
+ auto& result = *ev->Get();
+
+ auto key = std::make_pair(result.SessionId, result.TxId);
+ auto p = Writers.find(key);
+ Y_ABORT_UNLESS(p != Writers.end());
+
+ if (result.IsSuccess()) {
+ p->second->OnEvInitResult(ev);
+ }
+
+ if (!result.SessionId && !result.TxId) {
+ ctx.Send(Owner, ev->Release().Release());
+ }
+}
+
+template <class TEvent>
+void TPartitionWriterCacheActor::TryForwardToOwner(TEvent* event, TEventQueue<TEvent>& queue,
+ ui64 cookie,
+ const TActorContext& ctx)
+{
+ Y_ABORT_UNLESS(queue.Expected != Max<ui64>());
+
+ if (queue.Expected == cookie) {
+ ctx.Send(Owner, event);
+
+ ++queue.Expected;
+ for (auto p = queue.Events.find(queue.Expected); p != queue.Events.end(); ) {
+ ctx.Send(Owner, p->second.release());
+ queue.Events.erase(queue.Expected);
+
+ ++queue.Expected;
+ p = queue.Events.find(queue.Expected);
+ }
+ } else {
+ queue.Events.emplace(cookie, event);
+ }
+}
+
+void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx)
+{
+ const auto& result = *ev->Get();
+
+ auto key = std::make_pair(result.SessionId, result.TxId);
+ auto p = Writers.find(key);
+ Y_ABORT_UNLESS(p != Writers.end());
+
+ if (result.Cookie == p->second->SentRequests.front()) {
+ p->second->OnWriteAccepted(result, ctx);
+
+ TryForwardToOwner(ev->Release().Release(), PendingWriteAccepted,
+ result.Cookie,
+ ctx);
+ } else {
+ ReplyError(result.SessionId, result.TxId,
+ EErrorCode::InternalError, "out of order reserve bytes response from server, may be previous is lost",
+ p->second->SentRequests.front(),
+ ctx);
+ this->Become(&TPartitionWriterCacheActor::StateBroken);
+ }
+}
+
+void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx)
+{
+ const auto& result = *ev->Get();
+
+ auto key = std::make_pair(result.SessionId, result.TxId);
+ auto p = Writers.find(key);
+ Y_ABORT_UNLESS(p != Writers.end());
+
+ if (result.IsSuccess()) {
+ ui64 cookie = result.Record.GetPartitionResponse().GetCookie();
+ if (cookie == p->second->AcceptedRequests.front()) {
+ p->second->OnWriteResponse(result);
+
+ TryForwardToOwner(ev->Release().Release(), PendingWriteResponse,
+ cookie,
+ ctx);
+ } else {
+ ReplyError(result.SessionId, result.TxId,
+ EErrorCode::InternalError, "out of order write response from server, may be previous is lost",
+ p->second->AcceptedRequests.front(),
+ ctx);
+ this->Become(&TPartitionWriterCacheActor::StateBroken);
+ }
+ } else {
+ ctx.Send(Owner, ev->Release().Release());
+ }
+}
+
+void TPartitionWriterCacheActor::Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx)
+{
+ ctx.Send(Owner, ev->Release().Release());
+}
+
+void TPartitionWriterCacheActor::Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx)
+{
+ Y_UNUSED(ev);
+
+ for (auto& [_, writer] : Writers) {
+ ctx.Send(writer->Actor, new TEvents::TEvPoisonPill());
+ }
+}
+
+auto TPartitionWriterCacheActor::GetPartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx) -> TPartitionWriter*
+{
+ auto key = std::make_pair(sessionId, txId);
+
+ auto p = Writers.find(key);
+ if (p != Writers.end()) {
+ return p->second.get();
+ }
+
+ if (Writers.size() >= MAX_TRANSACTIONS_COUNT) {
+ if (!TryDeleteOldestWriter(ctx)) {
+ return nullptr;
+ }
+ }
+
+ RegisterPartitionWriter(sessionId, txId, ctx);
+
+ p = Writers.find(key);
+ Y_ABORT_UNLESS(p != Writers.end());
+
+ return p->second.get();
+}
+
+bool TPartitionWriterCacheActor::TryDeleteOldestWriter(const TActorContext& ctx)
+{
+ Y_ABORT_UNLESS(!Writers.empty());
+
+ auto minLastActivity = TInstant::Max();
+ auto oldest = Writers.end();
+
+ for (auto p = Writers.begin(); p != Writers.end(); ++p) {
+ auto& writer = *p->second;
+
+ if ((writer.LastActivity < minLastActivity) && !writer.HasPendingRequests()) {
+ minLastActivity = writer.LastActivity;
+ oldest = p;
+ }
+ }
+
+ if (minLastActivity == TInstant::Max()) {
+ return false;
+ }
+
+ ctx.Send(oldest->second->Actor, new TEvents::TEvPoisonPill());
+ Writers.erase(oldest);
+
+ return true;
+}
+
+TActorId TPartitionWriterCacheActor::CreatePartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx)
+{
+ NPQ::TPartitionWriterOpts opts = Opts;
+ if (sessionId && txId) {
+ opts.WithSessionId(sessionId);
+ opts.WithTxId(txId);
+ }
+
+ return ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(
+ ctx.SelfID, {}, TabletId, Partition, ExpectedGeneration,
+ SourceId, opts
+ ));
+}
+
+STFUNC(TPartitionWriterCacheActor::StateBroken)
+{
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NPQ::TEvPartitionWriter::TEvTxWriteRequest, HandleOnBroken);
+ HFunc(NPQ::TEvPartitionWriter::TEvInitResult, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvWriteAccepted, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvWriteResponse, Handle);
+ HFunc(NPQ::TEvPartitionWriter::TEvDisconnected, Handle);
+ HFunc(TEvents::TEvPoisonPill, Handle);
+ }
+}
+
+}
diff --git a/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h
new file mode 100644
index 0000000000..cad18c3d89
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/partition_writer_cache_actor.h
@@ -0,0 +1,81 @@
+#pragma once
+
+#include "events.h"
+#include "partition_writer.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+class TPartitionWriterCacheActor : public NActors::TActorBootstrapped<TPartitionWriterCacheActor> {
+public:
+ TPartitionWriterCacheActor(const TActorId& owner,
+ ui32 partition,
+ ui64 tabletId,
+ std::optional<ui32> expectedGeneration,
+ const TString& sourceId,
+ const NPQ::TPartitionWriterOpts& opts);
+
+ void Bootstrap(const TActorContext& ctx);
+
+private:
+ using TPartitionWriterPtr = std::unique_ptr<TPartitionWriter>;
+ using EErrorCode = NPQ::TEvPartitionWriter::TEvWriteResponse::EErrorCode;
+
+ template <class TEvent>
+ struct TEventQueue {
+ TEventQueue() :
+ Expected(Max<ui64>())
+ {
+ }
+
+ ui64 Expected;
+ THashMap<ui64, std::unique_ptr<TEvent>> Events;
+ };
+
+ static constexpr const size_t MAX_TRANSACTIONS_COUNT = 4;
+
+ STFUNC(StateWork);
+ STFUNC(StateBroken);
+
+ void Handle(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx);
+ void HandleOnBroken(NPQ::TEvPartitionWriter::TEvTxWriteRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx);
+ void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx);
+ void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
+
+ void ReplyError(const TString& sessionId, const TString& txId,
+ EErrorCode code, const TString& reason,
+ ui64 cookie,
+ const TActorContext& ctx);
+
+ TPartitionWriter* GetPartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx);
+ bool TryDeleteOldestWriter(const TActorContext& ctx);
+ void RegisterPartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx);
+ void RegisterDefaultPartitionWriter(const TActorContext& ctx);
+ TActorId CreatePartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx);
+
+ template <class TEvent>
+ void TryForwardToOwner(TEvent* event, TEventQueue<TEvent>& queue,
+ ui64 cookie,
+ const TActorContext& ctx);
+
+ TActorId Owner; // WriteSessionActor
+ ui32 Partition;
+ ui64 TabletId;
+ std::optional<ui32> ExpectedGeneration;
+ TString SourceId;
+ NPQ::TPartitionWriterOpts Opts;
+
+ THashMap<std::pair<TString, TString>, TPartitionWriterPtr> Writers;
+
+ TEventQueue<NPQ::TEvPartitionWriter::TEvWriteAccepted> PendingWriteAccepted;
+ TEventQueue<NPQ::TEvPartitionWriter::TEvWriteResponse> PendingWriteResponse;
+};
+
+}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index 089583362c..545126389c 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -4,6 +4,7 @@
#include "partition_writer.h"
#include "persqueue_utils.h"
#include "write_request_info.h"
+#include "partition_writer_cache_actor.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -55,7 +56,6 @@ class TWriteSessionActor
using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest;
using TWriteRequestInfo = TWriteRequestInfoImpl<TEvWrite>;
- using TPartitionWriter = TPartitionWriterImpl<TEvWrite>;
// Codec ID size in bytes
static constexpr ui32 CODEC_ID_SIZE = 1;
@@ -164,7 +164,7 @@ private:
void MakeAndSentInitResponse(const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx);
- void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx);
+ void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
@@ -179,16 +179,14 @@ private:
void CheckFinish(const NActors::TActorContext& ctx);
void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx);
- void SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);
+ void SendWriteRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);
void SetupCounters();
void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId);
private:
- TPartitionWriter* FindPartitionWriter(const TString& sessionId, const TString& txId);
- void InitPartitionWriter(const TString& sessionId, const TString& txId,
- const TActorContext& ctx);
- bool AnyRequests() const;
+ void CreatePartitionWriterCache(const TActorContext& ctx);
+ void DestroyPartitionWriterCache(const TActorContext& ctx);
std::unique_ptr<TEvStreamWriteRequest> Request;
@@ -232,6 +230,10 @@ private:
// Request that is waiting for quota
typename TWriteRequestInfo::TPtr PendingQuotaRequest;
+ // Requests that is sent to partition actor, but not accepted
+ std::deque<typename TWriteRequestInfo::TPtr> SentRequests;
+ // Accepted requests
+ std::deque<typename TWriteRequestInfo::TPtr> AcceptedRequests;
bool WritesDone;
@@ -301,7 +303,7 @@ private:
TDeque<ui64> SeqNoInflight;
- THashMap<std::pair<TString, TString>, TPartitionWriter> Writers;
+ TActorId PartitionWriterCache;
};
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 62c4f2bd15..bfd91ad055 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -293,9 +293,6 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
if (State == ES_DYING)
return;
- for (auto& [_, writer] : Writers) {
- ctx.Send(writer.PartitionWriterActor, new TEvents::TEvPoisonPill());
- }
if (SessionsActive) {
SessionsActive.Dec();
@@ -309,6 +306,8 @@ void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
ctx.Send(GetPQWriteServiceActorID(), new TEvPQProxy::TEvSessionDead(Cookie));
+ DestroyPartitionWriterCache(ctx);
+
if (State == ES_WAIT_SESSION) { // final die will be done later, on session discover
State = ES_DYING;
return;
@@ -338,7 +337,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CheckFinish(const TActorContext&
CloseSession("out of order Writes done before initialization", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
- if (PendingRequests.empty() && !PendingQuotaRequest && !AnyRequests()) {
+ if (PendingRequests.empty() && !PendingQuotaRequest && SentRequests.empty() && AcceptedRequests.empty()) {
CloseSession("", PersQueue::ErrorCode::OK, ctx);
return;
}
@@ -1019,7 +1018,7 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti
return;
}
- InitPartitionWriter("", "", ctx);
+ CreatePartitionWriterCache(ctx);
State = ES_WAIT_WRITER_INIT;
@@ -1036,6 +1035,52 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti
}
}
+template <bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::CreatePartitionWriterCache(const TActorContext& ctx)
+{
+ NPQ::TPartitionWriterOpts opts;
+
+ opts.WithDeduplication(UseDeduplication);
+ if constexpr (!UseMigrationProtocol) {
+ if (Request->GetDatabaseName()) {
+ opts.WithDatabase(*Request->GetDatabaseName());
+ }
+ opts.WithTopicPath(InitRequest.path());
+ if (Request->GetSerializedToken()) {
+ opts.WithToken(Request->GetSerializedToken());
+ }
+ if (Request->GetTraceId()) {
+ opts.WithTraceId(*Request->GetTraceId());
+ }
+ if (Request->GetRequestType()) {
+ opts.WithRequestType(*Request->GetRequestType());
+ }
+ }
+
+ auto it = PartitionToTablet.find(Partition);
+ ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0;
+
+ auto actor =
+ std::make_unique<TPartitionWriterCacheActor>(ctx.SelfID,
+ Partition,
+ tabletId,
+ ExpectedGeneration,
+ SourceId,
+ opts);
+
+ PartitionWriterCache = ctx.RegisterWithSameMailbox(actor.release());
+}
+
+template <bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::DestroyPartitionWriterCache(const TActorContext& ctx)
+{
+ if (PartitionWriterCache == TActorId()) {
+ return;
+ }
+
+ ctx.Send(PartitionWriterCache, new TEvents::TEvPoisonPill());
+}
+
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode errorCode, const NActors::TActorContext& ctx) {
@@ -1132,19 +1177,12 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) {
const auto& result = *ev->Get();
- auto writer = FindPartitionWriter(result.SessionId, result.TxId);
- Y_ABORT_UNLESS(writer != nullptr);
-
- if (!result.SessionId && !result.TxId) {
- if (State != ES_WAIT_WRITER_INIT) {
- return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
- }
- } else {
- if (State != ES_INITED) {
- return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
- }
+ if (State != ES_WAIT_WRITER_INIT) {
+ return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
}
+ Y_ABORT_UNLESS(!result.SessionId && !result.TxId);
+
if (!result.IsSuccess()) {
const auto& error = result.GetError();
if (error.Response.HasErrorCode()) {
@@ -1154,17 +1192,15 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
- writer->OnEvInitResult(ev);
+ OwnerCookie = result.GetResult().OwnerCookie;
const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
if (!UseDeduplication) {
Y_ABORT_UNLESS(maxSeqNo == 0);
}
- if (!result.SessionId && !result.TxId) {
- OwnerCookie = result.GetResult().OwnerCookie;
- MakeAndSentInitResponse(maxSeqNo, ctx);
- }
+ OwnerCookie = result.GetResult().OwnerCookie;
+ MakeAndSentInitResponse(maxSeqNo, ctx);
}
template<bool UseMigrationProtocol>
@@ -1173,21 +1209,18 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
}
- auto* writer = FindPartitionWriter(ev->Get()->SessionId, ev->Get()->TxId);
- Y_ABORT_UNLESS(writer != nullptr);
-
- Y_ABORT_UNLESS(!writer->SentRequests.empty());
- auto writeRequest = std::move(writer->SentRequests.front());
+ Y_ABORT_UNLESS(!SentRequests.empty());
+ auto writeRequest = std::move(SentRequests.front());
if (ev->Get()->Cookie != writeRequest->Cookie) {
return CloseSession("out of order reserve bytes response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
}
- writer->SentRequests.pop_front();
+ SentRequests.pop_front();
ui64 diff = writeRequest->ByteSize;
- writer->AcceptedRequests.emplace_back(std::move(writeRequest));
+ AcceptedRequests.emplace_back(std::move(writeRequest));
BytesInflight_ -= diff;
if (BytesInflight) {
@@ -1203,20 +1236,17 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
if (!IsQuotaRequired() && !PendingRequests.empty()) {
- SendRequest(*writer, std::move(PendingRequests.front()), ctx);
+ SendWriteRequest(std::move(PendingRequests.front()), ctx);
PendingRequests.pop_front();
- } else if (!writer->QuotedRequests.empty()) {
- SendRequest(*writer, std::move(writer->QuotedRequests.front()), ctx);
- writer->QuotedRequests.pop_front();
}
}
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse(
- const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx
+ const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx
) {
- auto writeRequest = std::move(writer.AcceptedRequests.front());
- writer.AcceptedRequests.pop_front();
+ auto writeRequest = std::move(AcceptedRequests.front());
+ AcceptedRequests.pop_front();
auto addAckMigration = [this](
const TPersQueuePartitionResponse::TCmdWriteResult& res,
@@ -1361,21 +1391,19 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
- auto* writer = FindPartitionWriter(result.SessionId, result.TxId);
- Y_ABORT_UNLESS(writer != nullptr);
-
- if (writer->AcceptedRequests.empty()) {
+ if (AcceptedRequests.empty()) {
CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
return;
}
- const auto& writeRequest = writer->AcceptedRequests.front();
+ const auto& writeRequest = AcceptedRequests.front();
const auto& resp = result.Record.GetPartitionResponse();
if (resp.GetCookie() != writeRequest->Cookie) {
return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
}
- ProcessWriteResponse(resp, *writer, ctx);
+
+ ProcessWriteResponse(resp, ctx);
}
template<bool UseMigrationProtocol>
@@ -1488,10 +1516,6 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) {
addData(writeRequest, messageIndex);
}
-
- if (writeRequest.has_tx()) {
- InitPartitionWriter(writeRequest.tx().session(), writeRequest.tx().id(), ctx);
- }
}
pendingRequest->UserWriteRequests.push_back(std::move(ev));
@@ -1522,25 +1546,16 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
}
}
} else {
- TPartitionWriter* writer = nullptr;
- if constexpr (UseMigrationProtocol) {
- Y_ABORT_UNLESS(Writers.size() == 1);
- writer = &Writers.begin()->second;
- } else {
- const auto& [sessionId, txId] = pendingRequest->GetTransactionId();
- writer = FindPartitionWriter(sessionId, txId);
- Y_ABORT_UNLESS(writer != nullptr);
- }
-
- if (!PendingQuotaRequest && writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
- SendRequest(*writer, std::move(PendingRequests.front()), ctx);
+ if (!PendingQuotaRequest) {
+ SendWriteRequest(std::move(PendingRequests.front()), ctx);
PendingRequests.pop_front();
}
}
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) {
+void TWriteSessionActor<UseMigrationProtocol>::SendWriteRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx)
+{
Y_ABORT_UNLESS(request->PartitionWriteRequest);
i64 diff = 0;
@@ -1550,6 +1565,7 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& wri
Y_ABORT_UNLESS(-diff <= (i64)BytesInflight_);
diff += request->PartitionWriteRequest->Record.ByteSize();
+
BytesInflight_ += diff;
BytesInflightTotal_ += diff;
if (BytesInflight && BytesInflightTotal) {
@@ -1557,8 +1573,14 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& wri
BytesInflightTotal.Inc(diff);
}
- ctx.Send(writer.PartitionWriterActor, std::move(request->PartitionWriteRequest));
- writer.SentRequests.push_back(std::move(request));
+ auto [sessionId, txId] = request->GetTransactionId();
+ auto event =
+ std::make_unique<NPQ::TEvPartitionWriter::TEvTxWriteRequest>(sessionId, txId,
+ std::move(request->PartitionWriteRequest));
+
+ ctx.Send(PartitionWriterCache, std::move(event));
+
+ SentRequests.push_back(std::move(request));
}
template<bool UseMigrationProtocol>
@@ -1809,15 +1831,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr&
counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota);
}
- const auto& [sessionId, txId] = PendingQuotaRequest->GetTransactionId();
- auto* writer = FindPartitionWriter(sessionId, txId);
- Y_ABORT_UNLESS(writer != nullptr);
-
- if (writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
- SendRequest(*writer, std::move(PendingQuotaRequest), ctx);
- } else {
- writer->QuotedRequests.push_back(std::move(PendingQuotaRequest));
- }
+ SendWriteRequest(std::move(PendingQuotaRequest), ctx);
if (!PendingRequests.empty()) {
Y_ABORT_UNLESS(MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx));
@@ -1861,74 +1875,5 @@ void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& c
}
}
-template<bool UseMigrationProtocol>
-auto TWriteSessionActor<UseMigrationProtocol>::FindPartitionWriter(const TString& sessionId, const TString& txId) -> TPartitionWriter*
-{
- auto p = Writers.find(std::make_pair(sessionId, txId));
- if (p == Writers.end()) {
- return nullptr;
- }
- return &p->second;
-}
-
-template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::InitPartitionWriter(const TString& sessionId, const TString& txId,
- const TActorContext& ctx)
-{
- auto key = std::make_pair(sessionId, txId);
- if (auto p = Writers.find(key); p != Writers.end()) {
- return;
- }
-
- auto [p, _] = Writers.emplace(std::piecewise_construct,
- std::forward_as_tuple(key),
- std::forward_as_tuple(BytesInflight,
- BytesInflightTotal,
- BytesInflight_,
- BytesInflightTotal_));
- auto& writer = p->second;
-
- auto it = PartitionToTablet.find(Partition);
- ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0;
-
- TPartitionWriterOpts opts;
- opts.WithDeduplication(UseDeduplication);
- if constexpr (!UseMigrationProtocol) {
- if (sessionId && txId) {
- Y_ABORT_UNLESS(Request->GetDatabaseName());
- opts.WithDatabase(*Request->GetDatabaseName());
- opts.WithTopicPath(InitRequest.path());
- opts.WithSessionId(sessionId);
- opts.WithTxId(txId);
- if (Request->GetSerializedToken()) {
- opts.WithToken(Request->GetSerializedToken());
- }
- if (Request->GetTraceId()) {
- opts.WithTraceId(*Request->GetTraceId());
- }
- if (Request->GetRequestType()) {
- opts.WithRequestType(*Request->GetRequestType());
- }
- }
- }
-
- TActorId actor = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(
- ctx.SelfID, {}, tabletId, Partition, ExpectedGeneration,
- SourceId, opts
- ));
- writer.PartitionWriterActor = actor;
-}
-
-template<bool UseMigrationProtocol>
-bool TWriteSessionActor<UseMigrationProtocol>::AnyRequests() const
-{
- for (auto& [_, writer] : Writers) {
- if (writer.AnyRequests()) {
- return true;
- }
- }
- return false;
-}
-
}
}
diff --git a/ydb/services/persqueue_v1/actors/ya.make b/ydb/services/persqueue_v1/actors/ya.make
index 1f694353f6..d15a62203c 100644
--- a/ydb/services/persqueue_v1/actors/ya.make
+++ b/ydb/services/persqueue_v1/actors/ya.make
@@ -42,6 +42,10 @@ SRCS(
schema_actors.h
schema_actors.cpp
update_offsets_in_transaction_actor.cpp
+ partition_writer.h
+ partition_writer.cpp
+ partition_writer_cache_actor.h
+ partition_writer_cache_actor.cpp
)
END()