diff options
author | abcdef <akotov@ydb.tech> | 2023-08-30 10:13:54 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-08-30 10:35:19 +0300 |
commit | f0ee5fd09e79f0c4a3d7972f91c2d7eb17518cd8 (patch) | |
tree | 706dcdf676b9d490b4eb6084f887c5dd58c8f1ae | |
parent | d6dfde60c0bc97c0acffa53994186d90c6144fa9 (diff) | |
download | ydb-f0ee5fd09e79f0c4a3d7972f91c2d7eb17518cd8.tar.gz |
запись в топик в транзакции. изменения в SDK и KQP
29 files changed, 784 insertions, 141 deletions
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 49bd2cbac2..a73a6e9e0a 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -55,7 +55,7 @@ public: return RequestCtx ? false : Record.HasYdbStatus(); } - const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { + const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const { return Record.GetRequest().GetTopicOperations(); } diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 6a73fbac76..a6c26a6988 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2261,10 +2261,9 @@ private: } void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) { - auto lockTxId = Request.AcquireLocksTxId; - if (lockTxId.Defined() && *lockTxId == 0) { - lockTxId = TxId; - LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem()); + TMaybe<ui64> writeId; + if (Request.TopicOperations.HasWriteId()) { + writeId = Request.TopicOperations.GetWriteId(); } for (auto& tx : topicTxs) { @@ -2273,8 +2272,8 @@ private: auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>(); - if (lockTxId) { - transaction.SetLockTxId(*lockTxId); + if (writeId.Defined()) { + transaction.SetWriteId(*writeId); } transaction.SetImmediate(ImmediateTx); @@ -2286,7 +2285,7 @@ private: LOG_D("ExecuteTopicTabletTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity())); LOG_D("Executing KQP transaction on topic tablet: " << tabletId - << ", lockTxId: " << lockTxId); + << ", writeId: " << writeId); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.release(), tabletId, true), diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 95bdefab6d..ccd842f504 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -300,4 +300,5 @@ bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& respons return true; } + } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 92319c8971..4cc5e80927 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -230,7 +230,7 @@ public: return RequestEv->GetQuery(); } - const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const { + const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const { return RequestEv->GetTopicOperations(); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 70336809bf..75a6cd2ae4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -986,8 +986,8 @@ public: request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef(); } - if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasReadOperations()) { - if (!txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasReadOperations()) { + if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasOperations()) { + if (!txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasOperations()) { LOG_D("TExecPhysicalRequest, tx has commit locks"); request.LocksOp = ELocksOp::Commit; } else { @@ -1402,6 +1402,8 @@ public: bool replyQueryId = false; bool replyQueryParameters = false; + bool replyTopicOperations = false; + switch (QueryState->GetAction()) { case NKikimrKqp::QUERY_ACTION_PREPARE: replyQueryId = true; @@ -1417,6 +1419,10 @@ public: replyQueryParameters = true; break; + case NKikimrKqp::QUERY_ACTION_TOPIC: + replyTopicOperations = true; + break; + default: break; } @@ -1435,6 +1441,12 @@ public: response->SetPreparedQuery(queryId); } + if (replyTopicOperations) { + if (HasTopicWriteId()) { + response->MutableTopicOperations()->SetWriteId(GetTopicWriteId()); + } + } + // Result for scan query is sent directly to target actor. Y_VERIFY(response->GetArena()); if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) { @@ -1996,6 +2008,7 @@ public: hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps); + hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps); hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps); hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); @@ -2074,7 +2087,11 @@ private: ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message; } - ReplySuccess(); + if (HasTopicWriteOperations() && !HasTopicWriteId()) { + Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); + } else { + ReplySuccess(); + } } void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) { @@ -2083,6 +2100,27 @@ private: Counters->ReportSessionActorClosedRequest(Settings.DbCounters); } + void HandleTopicOps(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { + SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem())); + ReplySuccess(); + } + + bool HasTopicWriteOperations() const { + return QueryState->TxCtx->TopicOperations.HasWriteOperations(); + } + + bool HasTopicWriteId() const { + return QueryState->TxCtx->TopicOperations.HasWriteId(); + } + + ui64 GetTopicWriteId() const { + return QueryState->TxCtx->TopicOperations.GetWriteId(); + } + + void SetTopicWriteId(NLongTxService::TLockHandle handle) { + QueryState->TxCtx->TopicOperations.SetWriteId(std::move(handle)); + } + private: TActorId Owner; TString SessionId; diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp index 7afb30f2f9..5c6a96e1ec 100644 --- a/ydb/core/kqp/topics/kqp_topics.cpp +++ b/ydb/core/kqp/topics/kqp_topics.cpp @@ -201,6 +201,21 @@ bool TTopicOperations::HasWriteOperations() const return HasWriteOperations_; } +bool TTopicOperations::HasWriteId() const +{ + return WriteId_.GetLockId(); +} + +ui64 TTopicOperations::GetWriteId() const +{ + return WriteId_.GetLockId(); +} + +void TTopicOperations::SetWriteId(NLongTxService::TLockHandle handle) +{ + WriteId_ = std::move(handle); +} + bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const { for (auto& [_, value] : Operations_) { diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h index e285b1b06d..4d0164d512 100644 --- a/ydb/core/kqp/topics/kqp_topics.h +++ b/ydb/core/kqp/topics/kqp_topics.h @@ -4,6 +4,7 @@ #include <ydb/public/api/protos/ydb_topic.pb.h> #include <ydb/core/protos/pqconfig.pb.h> +#include <ydb/core/tx/long_tx_service/public/lock_handle.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <library/cpp/actors/core/actor.h> @@ -88,6 +89,9 @@ public: bool HasOperations() const; bool HasReadOperations() const; bool HasWriteOperations() const; + bool HasWriteId() const; + ui64 GetWriteId() const; + void SetWriteId(NLongTxService::TLockHandle handle); bool TabletHasReadOperations(ui64 tabletId) const; @@ -117,6 +121,7 @@ private: bool HasWriteOperations_ = false; TMaybe<TString> Consumer_; + NLongTxService::TLockHandle WriteId_; }; } diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 880a278046..3a7235e97f 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -1725,6 +1725,14 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p return; } + if (req.HasWriteId()) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Tablet " << TabletID() << + " Write in transaction." << + " Partition: " << req.GetPartition() << + ", WriteId: " << req.GetWriteId()); + } + for (ui32 i = 0; i < req.CmdWriteSize(); ++i) { const auto& cmd = req.GetCmdWrite(i); @@ -1882,6 +1890,14 @@ void TPersQueue::HandleReserveBytesRequest(const ui64 responseCookie, const TAct return; } + if (req.HasWriteId()) { + LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, + "Tablet " << TabletID() << + " Reserve bytes in transaction." << + " Partition: " << req.GetPartition() << + ", WriteId: " << req.GetWriteId()); + } + InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_RESERVE_BYTES); THolder<TEvPQ::TEvReserveBytes> event = MakeHolder<TEvPQ::TEvReserveBytes>(responseCookie, req.GetCmdReserveBytes().GetSize(), req.GetOwnerCookie(), req.GetMessageNo(), req.GetCmdReserveBytes().GetLastRequest()); @@ -2417,7 +2433,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " tx=" << event.GetTxId() << - ", lock_tx_id=" << txBody.GetLockTxId() << + ", write_id=" << txBody.GetWriteId() << ", path=" << operation.GetPath() << ", partition=" << operation.GetPartitionId() << ", consumer=" << operation.GetConsumer() << @@ -2426,7 +2442,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact ", is_write=" << isWriteOperation); } - if (TabletState != NKikimrPQ::ENormal) { + if ((TabletState != NKikimrPQ::ENormal) || txBody.HasWriteId()) { SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()), event.GetTxId(), ctx); diff --git a/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt index 29c28de57e..35aa50f7e6 100644 --- a/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,8 @@ target_link_libraries(core-persqueue-writer PUBLIC cpp-string_utils-base64 ydb-core-base core-persqueue-events + grpc_services-cancelation-protos + kqp-common-simple ydb-core-protos public-lib-base lib-deprecated-kicli diff --git a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt index 21840f9a4b..0dc0507f24 100644 --- a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt @@ -17,6 +17,8 @@ target_link_libraries(core-persqueue-writer PUBLIC cpp-string_utils-base64 ydb-core-base core-persqueue-events + grpc_services-cancelation-protos + kqp-common-simple ydb-core-protos public-lib-base lib-deprecated-kicli diff --git a/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt index 21840f9a4b..0dc0507f24 100644 --- a/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt @@ -17,6 +17,8 @@ target_link_libraries(core-persqueue-writer PUBLIC cpp-string_utils-base64 ydb-core-base core-persqueue-events + grpc_services-cancelation-protos + kqp-common-simple ydb-core-protos public-lib-base lib-deprecated-kicli diff --git a/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt index 29c28de57e..35aa50f7e6 100644 --- a/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt @@ -16,6 +16,8 @@ target_link_libraries(core-persqueue-writer PUBLIC cpp-string_utils-base64 ydb-core-base core-persqueue-events + grpc_services-cancelation-protos + kqp-common-simple ydb-core-protos public-lib-base lib-deprecated-kicli diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index 36d13fe700..edb24303e8 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -5,7 +5,10 @@ #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <ydb/core/base/path.h> #include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/kqp/common/events/events.h> +#include <ydb/core/kqp/common/simple/services.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/pq_rl_helpers.h> #include <ydb/public/lib/base/msgbus_status.h> @@ -31,10 +34,14 @@ namespace NKikimr::NPQ { static const ui64 WRITE_BLOCK_SIZE = 4_KB; TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const { - return TStringBuilder() << "Success {" + auto out = TStringBuilder() << "Success {" << " OwnerCookie: " << OwnerCookie - << " SourceIdInfo: " << SourceIdInfo.ShortDebugString() - << " }"; + << " SourceIdInfo: " << SourceIdInfo.ShortDebugString(); + if (WriteId != INVALID_WRITE_ID) { + out << " WriteId: " << WriteId; + } + out << " }"; + return out; } TString TEvPartitionWriter::TEvInitResult::TError::ToString() const { @@ -47,6 +54,8 @@ TString TEvPartitionWriter::TEvInitResult::TError::ToString() const { TString TEvPartitionWriter::TEvInitResult::ToString() const { auto out = TStringBuilder() << ToStringHeader() << " {"; + out << " SessionId: " << SessionId; + out << " TxId: " << TxId; if (IsSuccess()) { out << " " << GetResult().ToString(); } else { @@ -59,6 +68,8 @@ TString TEvPartitionWriter::TEvInitResult::ToString() const { TString TEvPartitionWriter::TEvWriteAccepted::ToString() const { return TStringBuilder() << ToStringHeader() << " {" + << " SessionId: " << SessionId + << " TxId: " << TxId << " Cookie: " << Cookie << " }"; } @@ -67,6 +78,8 @@ TString TEvPartitionWriter::TEvWriteResponse::DumpError() const { Y_VERIFY(!IsSuccess()); return TStringBuilder() << "Error {" + << " SessionId: " << SessionId + << " TxId: " << TxId << " Reason: " << GetError().Reason << " Response: " << Record.ShortDebugString() << " }"; @@ -75,6 +88,8 @@ TString TEvPartitionWriter::TEvWriteResponse::DumpError() const { TString TEvPartitionWriter::TEvWriteResponse::ToString() const { auto out = TStringBuilder() << ToStringHeader() << " {"; + out << " SessionId: " << SessionId; + out << " TxId: " << TxId; if (IsSuccess()) { out << " Success { Response: " << Record.ShortDebugString() << " }"; } else { @@ -163,7 +178,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl template <typename... Args> void SendInitResult(Args&&... args) { - Send(Client, new TEvPartitionWriter::TEvInitResult(std::forward<Args>(args)...)); + Send(Client, new TEvPartitionWriter::TEvInitResult(Opts.SessionId, Opts.TxId, std::forward<Args>(args)...)); } void InitResult(const TString& reason, NKikimrClient::TResponse&& response) { @@ -171,13 +186,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl BecomeZombie(TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError, "Init error"); } - void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo) { - SendInitResult(ownerCookie, sourceIdInfo); + void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo, ui64 writeId) { + SendInitResult(ownerCookie, sourceIdInfo, writeId); } template <typename... Args> void SendWriteResult(Args&&... args) { - Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::forward<Args>(args)...)); + Send(Client, new TEvPartitionWriter::TEvWriteResponse(Opts.SessionId, Opts.TxId, std::forward<Args>(args)...)); } void WriteResult(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode, const TString& reason, NKikimrClient::TResponse&& response) { @@ -191,7 +206,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl } void WriteAccepted(ui64 cookie) { - Send(Client, new TEvPartitionWriter::TEvWriteAccepted(cookie)); + Send(Client, new TEvPartitionWriter::TEvWriteAccepted(Opts.SessionId, Opts.TxId, cookie)); } void Disconnected(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode) { @@ -199,6 +214,77 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl BecomeZombie(errorCode, "Disconnected"); } + /// GetWriteId + + void GetWriteId(const TActorContext& ctx) { + auto ev = MakeWriteIdRequest(); + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); + Become(&TThis::StateGetWriteId); + } + + STATEFN(StateGetWriteId) { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleWriteId); + hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending); + default: + return StateBase(ev); + } + } + + void HandleWriteId(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { + Y_UNUSED(ctx); + + auto& record = ev->Get()->Record.GetRef(); + WriteId = record.GetResponse().GetTopicOperations().GetWriteId(); + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, + "SessionId: " << Opts.SessionId << + " TxId: " << Opts.TxId << + " WriteId: " << WriteId); + + GetOwnership(); + } + + THolder<NKqp::TEvKqp::TEvQueryRequest> MakeWriteIdRequest() { + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + + if (Opts.Token) { + ev->Record.SetUserToken(Opts.Token); + } + //ev->Record.SetRequestActorId(???); + + ev->Record.MutableRequest()->SetDatabase(CanonizePath(Opts.Database)); + ev->Record.MutableRequest()->SetSessionId(Opts.SessionId); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC); + + if (Opts.TraceId) { + ev->Record.SetTraceId(Opts.TraceId); + } + + if (Opts.RequestType) { + ev->Record.SetRequestType(Opts.RequestType); + } + + //ev->Record.MutableRequest()->SetCancelAfterMs(???); + //ev->Record.MutableRequest()->SetTimeoutMs(???); + + ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(Opts.TxId); + + auto* topics = ev->Record.MutableRequest()->MutableTopicOperations()->AddTopics(); + topics->set_path(Opts.TopicPath); + auto* partitions = topics->add_partitions(); + partitions->set_partition_id(PartitionId); + + return ev; + } + + void SetWriteId(NKikimrClient::TPersQueuePartitionRequest& request) { + if (WriteId != INVALID_WRITE_ID) { + request.SetWriteId(WriteId); + } + } + /// GetOwnership void GetOwnership() { @@ -298,7 +384,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl } } - InitResult(OwnerCookie, sourceIdInfo); + InitResult(OwnerCookie, sourceIdInfo, WriteId); Become(&TThis::StateWork); if (Pending) { @@ -399,6 +485,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl auto& request = *ev->Record.MutablePartitionRequest(); request.SetMessageNo(MessageNo++); + SetWriteId(request); + auto& cmd = *request.MutableCmdReserveBytes(); cmd.SetSize(it->second.ByteSize()); cmd.SetLastRequest(false); @@ -511,6 +599,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl auto& request = *ev->Record.MutablePartitionRequest(); request.SetMessageNo(MessageNo++); + SetWriteId(request); + if (!Opts.UseDeduplication) { request.SetPartition(PartitionId); } @@ -666,7 +756,7 @@ public: } } - void Bootstrap() { + void Bootstrap(const TActorContext& ctx) { NTabletPipe::TClientConfig config; config.RetryPolicy = { .RetryLimitCount = 6, @@ -677,7 +767,12 @@ public: }; PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config)); - GetOwnership(); + + if (Opts.Database && Opts.SessionId && Opts.TxId) { + GetWriteId(ctx); + } else { + GetOwnership(); + } } STATEFN(StateBase) { @@ -697,6 +792,7 @@ public: } private: + const TActorId Client; const ui64 TabletId; const ui32 PartitionId; @@ -731,6 +827,7 @@ private: TEvPartitionWriter::TEvWriteResponse::EErrors ErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError; + ui64 WriteId = INVALID_WRITE_ID; }; // TPartitionWriter IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h index fb2f8dd260..b53844cfbf 100644 --- a/ydb/core/persqueue/writer/writer.h +++ b/ydb/core/persqueue/writer/writer.h @@ -11,6 +11,8 @@ namespace NKikimr::NPQ { +constexpr ui64 INVALID_WRITE_ID = Max<ui64>(); + struct TEvPartitionWriter { enum EEv { EvInitResult = EventSpaceBegin(TKikimrEvents::ES_PQ_PARTITION_WRITER), @@ -30,24 +32,35 @@ struct TEvPartitionWriter { struct TSuccess { TString OwnerCookie; TSourceIdInfo SourceIdInfo; + ui64 WriteId = INVALID_WRITE_ID; + TString ToString() const; }; struct TError { TString Reason; NKikimrClient::TResponse Response; + TString ToString() const; }; + TString SessionId; + TString TxId; std::variant<TSuccess, TError> Result; - explicit TEvInitResult(const TString& ownerCookie, const TSourceIdInfo& sourceIdInfo) - : Result(TSuccess{ownerCookie, sourceIdInfo}) + TEvInitResult(const TString& sessionId, const TString& txId, + const TString& ownerCookie, const TSourceIdInfo& sourceIdInfo, ui64 writeId) + : SessionId(sessionId) + , TxId(txId) + , Result(TSuccess{ownerCookie, sourceIdInfo, writeId}) { } - explicit TEvInitResult(const TString& reason, NKikimrClient::TResponse&& response) - : Result(TError{reason, std::move(response)}) + TEvInitResult(const TString& sessionId, const TString& txId, + const TString& reason, NKikimrClient::TResponse&& response) + : SessionId(sessionId) + , TxId(txId) + , Result(TError{reason, std::move(response)}) { } @@ -67,10 +80,14 @@ struct TEvPartitionWriter { }; struct TEvWriteAccepted: public TEventLocal<TEvWriteAccepted, EvWriteAccepted> { + TString SessionId; + TString TxId; ui64 Cookie; - explicit TEvWriteAccepted(ui64 cookie) - : Cookie(cookie) + TEvWriteAccepted(const TString& sessionId, const TString& txId, ui64 cookie) + : SessionId(sessionId) + , TxId(txId) + , Cookie(cookie) { } @@ -95,18 +112,26 @@ struct TEvPartitionWriter { TString Reason; }; + TString SessionId; + TString TxId; std::variant<TSuccess, TError> Result; TEvWriteResponse() = default; - explicit TEvWriteResponse(NKikimrClient::TResponse&& response) - : Result(TSuccess{}) + TEvWriteResponse(const TString& sessionId, const TString& txId, + NKikimrClient::TResponse&& response) + : SessionId(sessionId) + , TxId(txId) + , Result(TSuccess{}) { Record = std::move(response); } - explicit TEvWriteResponse(const EErrors code, const TString& reason, NKikimrClient::TResponse&& response) - : Result(TError{code, reason}) + TEvWriteResponse(const TString& sessionId, const TString& txId, + const EErrors code, const TString& reason, NKikimrClient::TResponse&& response) + : SessionId(sessionId) + , TxId(txId) + , Result(TError{code, reason}) { Record = std::move(response); } @@ -126,6 +151,13 @@ struct TPartitionWriterOpts { bool CheckState = false; bool AutoRegister = false; bool UseDeduplication = true; + TString Database; + TString TopicPath; + TString Token; + TString SessionId; + TString TxId; + TString TraceId; + TString RequestType; std::optional<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode; TRlContext RlCtx; @@ -136,6 +168,13 @@ struct TPartitionWriterOpts { TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; } TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; } TPartitionWriterOpts& WithCheckRequestUnits(const NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode , const TRlContext& rlCtx) { MeteringMode = meteringMode; RlCtx = rlCtx; return *this; } + TPartitionWriterOpts& WithDatabase(const TString& value) { Database = value; return *this; } + TPartitionWriterOpts& WithTopicPath(const TString& value) { TopicPath = value; return *this; } + TPartitionWriterOpts& WithToken(const TString& value) { Token = value; return *this; } + TPartitionWriterOpts& WithSessionId(const TString& value) { SessionId = value; return *this; } + TPartitionWriterOpts& WithTxId(const TString& value) { TxId = value; return *this; } + TPartitionWriterOpts& WithTraceId(const TString& value) { TraceId = value; return *this; } + TPartitionWriterOpts& WithRequestType(const TString& value) { RequestType = value; return *this; } }; IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, const std::optional<ui32> expectedGeneration, const TString& sourceId, diff --git a/ydb/core/persqueue/writer/ya.make b/ydb/core/persqueue/writer/ya.make index 0ab2b4e9ab..d8848d37cc 100644 --- a/ydb/core/persqueue/writer/ya.make +++ b/ydb/core/persqueue/writer/ya.make @@ -12,6 +12,8 @@ PEERDIR( library/cpp/string_utils/base64 ydb/core/base ydb/core/persqueue/events + ydb/core/grpc_services/cancelation/protos + ydb/core/kqp/common/simple ydb/core/protos ydb/public/lib/base ydb/public/lib/deprecated/kicli diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 4fd27d332c..bf18c4358f 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -71,11 +71,15 @@ enum EQueryReplyFlags { QUERY_REPLY_FLAG_AST = 4; }; -message TTopicOperations { +message TTopicOperationsRequest { optional string Consumer = 1; repeated Ydb.Topic.UpdateOffsetsInTransactionRequest.TopicOffsets Topics = 2; } +message TTopicOperationsResponse { + optional int64 WriteId = 1; +}; + message TQueryRequest { optional bytes SessionId = 1; optional string Query = 2; @@ -98,7 +102,7 @@ message TQueryRequest { reserved 19; // (deprecated) StatsMode optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21; - optional TTopicOperations TopicOperations = 22; + optional TTopicOperationsRequest TopicOperations = 22; optional bool UsePublicResponseDataFormat = 23; map<string, Ydb.TypedValue> YdbParameters = 24; optional bool IsInternalCall = 25; @@ -256,6 +260,7 @@ message TQueryResponse { optional Ydb.Table.TransactionMeta TxMeta = 11; optional NKqpProto.TKqpStatsQuery QueryStats = 12; repeated Ydb.ResultSet YdbResults = 13; + optional TTopicOperationsResponse TopicOperations = 14; } message TEvQueryResponse { diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto index ba7c162d32..d3d09f1b3f 100644 --- a/ydb/core/protos/msgbus_pq.proto +++ b/ydb/core/protos/msgbus_pq.proto @@ -120,6 +120,7 @@ message TPersQueuePartitionRequest { optional string OwnerCookie = 3; //mandatory for write optional int64 MessageNo = 12; //mandatory for write optional int64 CmdWriteOffset = 13; //optional + optional int64 WriteId = 23; repeated TCmdWrite CmdWrite = 4; optional TCmdGetMaxSeqNo CmdGetMaxSeqNo = 5; diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 719d27f454..335cd494ea 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -861,7 +861,7 @@ message TDataTransaction { repeated uint64 SendingShards = 3; repeated uint64 ReceivingShards = 4; optional bool Immediate = 5; - optional fixed64 LockTxId = 6; + optional fixed64 WriteId = 6; } message TConfigTransaction { diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp index 0cfdf77356..593a036292 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp @@ -54,6 +54,11 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, EC Impl->WriteInternal(std::move(token), std::move(message)); } +void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message) +{ + Impl->WriteInternal(std::move(token), std::move(message)); +} + void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) { TWriteMessage message{data}; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h index a12c61b4b9..d4e11d3408 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h @@ -40,6 +40,8 @@ public: void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + NThreading::TFuture<void> WaitEvent() override; // Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration. diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp index ee236e67de..e86ca445fd 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp @@ -376,7 +376,8 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess with_lock(Lock) { CurrentBatch.Add( GetNextSeqNoImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize, - message.MessageMeta_ + message.MessageMeta_, + message.GetTxPtr() ); FlushWriteIfRequiredImpl(); @@ -392,6 +393,10 @@ void TWriteSessionImpl::Write(TContinuationToken&& token, TWriteMessage&& messag WriteInternal(std::move(token), std::move(message)); } +void TWriteSessionImpl::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message) +{ + WriteInternal(std::move(token), std::move(message)); +} TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStatus&& status) { Y_VERIFY(Lock.IsLocked()); @@ -1078,9 +1083,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() { (*Counters->MessagesInflight)++; if (!currMessage.MessageMeta.empty()) { OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(), - std::move(currMessage.MessageMeta)); + std::move(currMessage.MessageMeta), + currMessage.Tx); } else { - OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size()); + OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(), + currMessage.Tx); } } block.Data = std::move(CurrentBatch.Data); @@ -1161,6 +1168,10 @@ void TWriteSessionImpl::SendImpl() { auto* msgData = writeRequest->add_messages(); + if (message.Tx) { + writeRequest->mutable_tx()->set_id(message.Tx->GetId()); + writeRequest->mutable_tx()->set_session(message.Tx->GetSession().GetId()); + } msgData->set_seq_no(message.SeqNo + SeqNoShift); *msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds()); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h index e0a986c0af..e5ed606a66 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h @@ -174,14 +174,18 @@ private: TMaybe<ECodec> Codec; ui32 OriginalSize; // only for coded messages TVector<std::pair<TString, TString>> MessageMeta; + const NTable::TTransaction* Tx; + TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {}, - ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {}) + ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {}, + const NTable::TTransaction* tx = nullptr) : SeqNo(seqNo) , CreatedAt(createdAt) , DataRef(data) , Codec(codec) , OriginalSize(originalSize) , MessageMeta(messageMeta) + , Tx(tx) {} }; @@ -192,12 +196,14 @@ private: TInstant StartedAt = TInstant::Zero(); bool Acquired = false; bool FlushRequested = false; + void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize, - const TVector<std::pair<TString, TString>>& messageMeta) { + const TVector<std::pair<TString, TString>>& messageMeta, + const NTable::TTransaction* tx) { if (StartedAt == TInstant::Zero()) StartedAt = TInstant::Now(); CurrentSize += codec ? originalSize : data.size(); - Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta); + Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta, tx); Acquired = false; } @@ -267,17 +273,24 @@ private: TInstant CreatedAt; size_t Size; TVector<std::pair<TString, TString>> MessageMeta; - TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size) + const NTable::TTransaction* Tx; + + TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size, + const NTable::TTransaction* tx) : SeqNo(sequenceNumber) , CreatedAt(createdAt) , Size(size) + , Tx(tx) {} + TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size, - TVector<std::pair<TString, TString>>&& messageMeta) + TVector<std::pair<TString, TString>>&& messageMeta, + const NTable::TTransaction* tx) : SeqNo(sequenceNumber) , CreatedAt(createdAt) , Size(size) , MessageMeta(std::move(messageMeta)) + , Tx(tx) {} }; @@ -329,6 +342,8 @@ public: Y_FAIL("Do not use this method"); }; + void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override; + void WriteEncoded(TContinuationToken&&, TStringBuf, ECodec, ui32, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override { Y_UNUSED(seqNo); diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h index 5c8e2bfa73..0fecce9af0 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h +++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h @@ -1573,6 +1573,13 @@ public: //! Message metadata. Limited to 4096 characters overall (all keys and values combined). FLUENT_SETTING(TMessageMeta, MessageMeta); + //! Transaction id + FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx); + + const NTable::TTransaction* GetTxPtr() const + { + return Tx_ ? &Tx_->get() : nullptr; + } }; //! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks. @@ -1632,6 +1639,10 @@ public: virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; + //! Write single message that is already coded by codec. + //! continuationToken - a token earlier provided to client with ReadyToAccept event. + virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0; + //! Write single message that is already compressed by codec. Old method with only basic message options. virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize, TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0; diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp index ce9c3f88ef..dc2f938d60 100644 --- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp @@ -12,12 +12,23 @@ Y_UNIT_TEST_SUITE(TxUsage) { NKikimr::Tests::TServerSettings MakeServerSettings() { + auto loggerInitializer = [](TTestActorRuntime& runtime) { + runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG); + }; + auto settings = PQSettings(0); settings.SetDomainName("Root"); settings.SetEnableTopicServiceTx(true); settings.PQConfig.SetTopicsAreFirstClassCitizen(true); settings.PQConfig.SetRoot("/Root"); settings.PQConfig.SetDatabase("/Root"); + settings.SetLoggerInitializer(loggerInitializer); + return settings; } @@ -114,19 +125,21 @@ protected: void WriteMessage(const TString& data); -private: +protected: + const TDriver& GetDriver(); + TString GetTopicPath() const; + TString GetMessageGroupId() const; + +private: TString GetTopicName() const; TString GetConsumerName() const; - TString GetMessageGroupId() const; template<class E> E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx); template<class E> E ReadEvent(TTopicReadSessionPtr reader); - const TDriver& GetDriver(); - std::shared_ptr<TEnvironment> Env; TMaybe<TDriver> Driver; }; @@ -317,6 +330,34 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture) CommitTx(tx1, EStatus::ABORTED); } +Y_UNIT_TEST_F(WriteToTopic, TFixture) +{ + NTopic::TWriteSessionSettings options; + options.Path(GetTopicPath()); + options.MessageGroupId(GetMessageGroupId()); + + auto session = CreateSession(); + auto tx = BeginTx(session); + + auto writeMessages = [&](const TVector<TString>& messages) { + NTopic::TTopicClient client(GetDriver()); + auto session = client.CreateSimpleBlockingWriteSession(options); + + for (auto& message : messages) { + NTopic::TWriteMessage params(message); + params.Tx(tx); + UNIT_ASSERT(session->Write(std::move(params))); + } + + UNIT_ASSERT(session->Close()); + }; + + writeMessages({"a", "bb", "ccc", "dddd"}); + writeMessages({"eeeee", "ffffff", "ggggggg"}); + + CommitTx(tx, EStatus::ABORTED); +} + } } diff --git a/ydb/services/persqueue_v1/actors/partition_writer.cpp b/ydb/services/persqueue_v1/actors/partition_writer.cpp new file mode 100644 index 0000000000..4827405eb5 --- /dev/null +++ b/ydb/services/persqueue_v1/actors/partition_writer.cpp @@ -0,0 +1,109 @@ +namespace NKikimr::NGRpcProxy::V1 { + +const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; + +template<class TEvWrite> +TPartitionWriterImpl<TEvWrite>::TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight, + NKikimr::NPQ::TMultiCounter& bytesInflightTotal, + ui64& bytesInflight_, + ui64& bytesInflightTotal_) : + BytesInflight(bytesInflight), + BytesInflightTotal(bytesInflightTotal), + BytesInflight_(bytesInflight_), + BytesInflightTotal_(bytesInflightTotal_) +{ +} + +template<class TEvWrite> +void TPartitionWriterImpl<TEvWrite>::OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev) +{ + const auto& result = *ev->Get(); + Y_VERIFY(result.IsSuccess()); + OwnerCookie = result.GetResult().OwnerCookie; +} + +template<class TEvWrite> +ui64 TPartitionWriterImpl<TEvWrite>::OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev) +{ + Y_VERIFY(!SentRequests.empty()); + + auto request = std::move(SentRequests.front()); + Y_VERIFY(ev->Get()->Cookie == request->Cookie); + + SentRequests.pop_front(); + + ui64 size = request->ByteSize; + + AcceptedRequests.emplace_back(std::move(request)); + + return size; +} + +template<class TEvWrite> +auto TPartitionWriterImpl<TEvWrite>::OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev) -> TWriteRequestInfoPtr +{ + Y_VERIFY(!AcceptedRequests.empty()); + + auto request = std::move(AcceptedRequests.front()); + AcceptedRequests.pop_front(); + + const auto& resp = ev->Get()->Record.GetPartitionResponse(); + Y_VERIFY(resp.GetCookie() == request->Cookie); + + return request; +} + +template<class TEvWrite> +bool TPartitionWriterImpl<TEvWrite>::AnyRequests() const +{ + return !QuotedRequests.empty() || !SentRequests.empty() || !AcceptedRequests.empty(); +} + +template<class TEvWrite> +void TPartitionWriterImpl<TEvWrite>::AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx) +{ + if (!QuotedRequests.empty() && SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + SendRequest(std::move(request), ctx); + } else { + QuotedRequests.emplace_back(std::move(request)); + } +} + +template<class TEvWrite> +bool TPartitionWriterImpl<TEvWrite>::TrySendNextQuotedRequest(const TActorContext& ctx) +{ + if (QuotedRequests.empty()) { + return false; + } + + SendRequest(std::move(QuotedRequests.front()), ctx); + QuotedRequests.pop_front(); + + return true; +} + +template<class TEvWrite> +void TPartitionWriterImpl<TEvWrite>::SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx) +{ + Y_VERIFY(request->PartitionWriteRequest); + + i64 diff = 0; + for (const auto& w : request->UserWriteRequests) { + diff -= w->Request.ByteSize(); + } + + Y_VERIFY(-diff <= (i64)BytesInflight_); + diff += request->PartitionWriteRequest->Record.ByteSize(); + BytesInflight_ += diff; + BytesInflightTotal_ += diff; + + if (BytesInflight && BytesInflightTotal) { + BytesInflight.Inc(diff); + BytesInflightTotal.Inc(diff); + } + + ctx.Send(PartitionWriterActor, std::move(request->PartitionWriteRequest)); + SentRequests.emplace_back(std::move(request)); +} + +} diff --git a/ydb/services/persqueue_v1/actors/partition_writer.h b/ydb/services/persqueue_v1/actors/partition_writer.h new file mode 100644 index 0000000000..b47ddfaa82 --- /dev/null +++ b/ydb/services/persqueue_v1/actors/partition_writer.h @@ -0,0 +1,59 @@ +#pragma once + +#include <ydb/core/persqueue/writer/writer.h> +#include <ydb/services/persqueue_v1/actors/events.h> +#include <ydb/services/persqueue_v1/actors/write_request_info.h> + +namespace NKikimr::NGRpcProxy::V1 { + +template<class TEvWrite> +struct TPartitionWriterImpl { + using TWriteRequestInfoPtr = typename TWriteRequestInfoImpl<TEvWrite>::TPtr; + + TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight, + NKikimr::NPQ::TMultiCounter& bytesInflightTotal, + ui64& bytesInflight_, + ui64& bytesInflightTotal_); + + // + // from client + // + void OnEvWrite(TEvPQProxy::TEvTopicWrite::TPtr& ev, const TActorContext& ctx); + + // + // from partition writer actor + // + void OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev); + ui64 OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev); + TWriteRequestInfoPtr OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev); + + // + // from quoter + // + void OnEvWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); + + bool AnyRequests() const; + void AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx); + bool TrySendNextQuotedRequest(const TActorContext& ctx); + void SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx); + + TActorId PartitionWriterActor; + TString OwnerCookie; + + // Quoted, but not sent requests + TDeque<TWriteRequestInfoPtr> QuotedRequests; + // Requests that is sent to partition actor, but not accepted + TDeque<TWriteRequestInfoPtr> SentRequests; + // Accepted requests + TDeque<TWriteRequestInfoPtr> AcceptedRequests; + + NKikimr::NPQ::TMultiCounter& BytesInflight; + NKikimr::NPQ::TMultiCounter& BytesInflightTotal; + + ui64& BytesInflight_; + ui64& BytesInflightTotal_; +}; + +} + +#include "partition_writer.cpp" diff --git a/ydb/services/persqueue_v1/actors/write_request_info.h b/ydb/services/persqueue_v1/actors/write_request_info.h new file mode 100644 index 0000000000..dc21640cd2 --- /dev/null +++ b/ydb/services/persqueue_v1/actors/write_request_info.h @@ -0,0 +1,50 @@ +#pragma once + +namespace NKikimr::NGRpcProxy::V1 { + +template<class TEvWrite> +struct TWriteRequestInfoImpl : public TSimpleRefCount<TWriteRequestInfoImpl<TEvWrite>> { + using TPtr = TIntrusivePtr<TWriteRequestInfoImpl<TEvWrite>>; + + explicit TWriteRequestInfoImpl(ui64 cookie) + : PartitionWriteRequest(new NPQ::TEvPartitionWriter::TEvWriteRequest(cookie)) + , Cookie(cookie) + , ByteSize(0) + , RequiredQuota(0) + { + } + + std::pair<TString, TString> GetTransactionId() const; + + // Source requests from user (grpc session object) + std::deque<THolder<TEvWrite>> UserWriteRequests; + + // Partition write request + THolder<NPQ::TEvPartitionWriter::TEvWriteRequest> PartitionWriteRequest; + + // Formed write request's cookie + ui64 Cookie; + + // Formed write request's size + ui64 ByteSize; + + // Quota in term of RUs + ui64 RequiredQuota; +}; + +template<class TEvWrite> +std::pair<TString, TString> TWriteRequestInfoImpl<TEvWrite>::GetTransactionId() const +{ + Y_VERIFY(!UserWriteRequests.empty()); + + static constexpr bool UseMigrationProtocol = !std::is_same_v<TEvWrite, TEvPQProxy::TEvTopicWrite>; + + if constexpr (UseMigrationProtocol) { + return {"", ""}; + } else { + auto& request = UserWriteRequests.front()->Request.write_request(); + return {request.tx().session(), request.tx().id()}; + } +} + +} diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h index bc679a850a..ece2cd0992 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.h +++ b/ydb/services/persqueue_v1/actors/write_session_actor.h @@ -1,7 +1,9 @@ #pragma once #include "events.h" +#include "partition_writer.h" #include "persqueue_utils.h" +#include "write_request_info.h" #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -52,32 +54,8 @@ class TWriteSessionActor using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse; using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest; - struct TWriteRequestInfo: public TSimpleRefCount<TWriteRequestInfo> { - using TPtr = TIntrusivePtr<TWriteRequestInfo>; - - explicit TWriteRequestInfo(ui64 cookie) - : PartitionWriteRequest(new NPQ::TEvPartitionWriter::TEvWriteRequest(cookie)) - , Cookie(cookie) - , ByteSize(0) - , RequiredQuota(0) - { - } - - // Source requests from user (grpc session object) - std::deque<THolder<TEvWrite>> UserWriteRequests; - - // Partition write request - THolder<NPQ::TEvPartitionWriter::TEvWriteRequest> PartitionWriteRequest; - - // Formed write request's cookie - ui64 Cookie; - - // Formed write request's size - ui64 ByteSize; - - // Quota in term of RUs - ui64 RequiredQuota; - }; + using TWriteRequestInfo = TWriteRequestInfoImpl<TEvWrite>; + using TPartitionWriter = TPartitionWriterImpl<TEvWrite>; // Codec ID size in bytes static constexpr ui32 CODEC_ID_SIZE = 1; @@ -181,7 +159,7 @@ private: void MakeAndSentInitResponse(const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx); - void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx); + void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx); @@ -196,12 +174,17 @@ private: void CheckFinish(const NActors::TActorContext& ctx); void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx); - void SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); + void SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx); void SetupCounters(); void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId); private: + TPartitionWriter* FindPartitionWriter(const TString& sessionId, const TString& txId); + void InitPartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx); + bool AnyRequests() const; + std::unique_ptr<TEvStreamWriteRequest> Request; enum EState { @@ -218,7 +201,6 @@ private: EState State; TActorId SchemeCache; - TActorId Writer; TString PeerName; ui64 Cookie; @@ -242,15 +224,9 @@ private: THolder<TAclWrapper> ACL; // Future batch request to partition actor - typename TWriteRequestInfo::TPtr PendingRequest; + std::deque<typename TWriteRequestInfo::TPtr> PendingRequests; // Request that is waiting for quota typename TWriteRequestInfo::TPtr PendingQuotaRequest; - // Quoted, but not sent requests - std::deque<typename TWriteRequestInfo::TPtr> QuotedRequests; - // Requests that is sent to partition actor, but not accepted - std::deque<typename TWriteRequestInfo::TPtr> SentRequests; - // Accepted requests - std::deque<typename TWriteRequestInfo::TPtr> AcceptedRequests; bool WritesDone; @@ -321,6 +297,7 @@ private: TDeque<ui64> SeqNoInflight; + THashMap<std::pair<TString, TString>, TPartitionWriter> Writers; }; } diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index 0b06d0be56..a4f2a26abe 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -154,13 +154,10 @@ inline void FillChunkDataFromReq( *msgMeta = msg.metadata_items(); } -namespace NGRpcProxy { -namespace V1 { +namespace NGRpcProxy::V1 { using namespace Ydb::PersQueue::V1; -static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5; - static const ui32 MAX_BYTES_INFLIGHT = 1_MB; static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1); @@ -295,8 +292,9 @@ template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) { if (State == ES_DYING) return; - if (Writer) - ctx.Send(Writer, new TEvents::TEvPoisonPill()); + for (auto& [_, writer] : Writers) { + ctx.Send(writer.PartitionWriterActor, new TEvents::TEvPoisonPill()); + } if (SessionsActive) { SessionsActive.Dec(); @@ -339,7 +337,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CheckFinish(const TActorContext& CloseSession("out of order Writes done before initialization", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } - if (!PendingRequest && !PendingQuotaRequest && QuotedRequests.empty() && SentRequests.empty() && AcceptedRequests.empty()) { + if (PendingRequests.empty() && !PendingQuotaRequest && !AnyRequests()) { CloseSession("", PersQueue::ErrorCode::OK, ctx); return; } @@ -928,7 +926,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp GetOrProcessPartition(ctx); return; } else if (State == EState::ES_WAIT_TABLE_REQUEST_2) { - SourceIdUpdatesInflight--; if (!SourceIdUpdatesInflight) { LastSourceIdUpdate = ctx.Now(); @@ -1033,10 +1030,8 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti return; } - Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( - ctx.SelfID, {}, PartitionTabletId, Partition, ExpectedGeneration, - SourceId, TPartitionWriterOpts().WithDeduplication(UseDeduplication) - )); + InitPartitionWriter("", "", ctx); + State = ES_WAIT_WRITER_INIT; ui32 border = AppData(ctx)->PQConfig.GetWriteInitLatencyBigMs(); @@ -1146,11 +1141,21 @@ void TWriteSessionActor<UseMigrationProtocol>::MakeAndSentInitResponse( template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) { - if (State != ES_WAIT_WRITER_INIT) { - return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); + const auto& result = *ev->Get(); + + auto writer = FindPartitionWriter(result.SessionId, result.TxId); + Y_VERIFY(writer != nullptr); + + if (!result.SessionId && !result.TxId) { + if (State != ES_WAIT_WRITER_INIT) { + return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); + } + } else { + if (State != ES_INITED) { + return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx); + } } - const auto& result = *ev->Get(); if (!result.IsSuccess()) { const auto& error = result.GetError(); if (error.Response.HasErrorCode()) { @@ -1160,13 +1165,17 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } - OwnerCookie = result.GetResult().OwnerCookie; + writer->OnEvInitResult(ev); + const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo(); if (!UseDeduplication) { Y_VERIFY(maxSeqNo == 0); } - MakeAndSentInitResponse(maxSeqNo, ctx); + if (!result.SessionId && !result.TxId) { + OwnerCookie = result.GetResult().OwnerCookie; + MakeAndSentInitResponse(maxSeqNo, ctx); + } } template<bool UseMigrationProtocol> @@ -1175,18 +1184,21 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx); } - Y_VERIFY(!SentRequests.empty()); - auto writeRequest = std::move(SentRequests.front()); + auto* writer = FindPartitionWriter(ev->Get()->SessionId, ev->Get()->TxId); + Y_VERIFY(writer != nullptr); + + Y_VERIFY(!writer->SentRequests.empty()); + auto writeRequest = std::move(writer->SentRequests.front()); if (ev->Get()->Cookie != writeRequest->Cookie) { return CloseSession("out of order reserve bytes response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); } - SentRequests.pop_front(); + writer->SentRequests.pop_front(); ui64 diff = writeRequest->ByteSize; - AcceptedRequests.emplace_back(std::move(writeRequest)); + writer->AcceptedRequests.emplace_back(std::move(writeRequest)); BytesInflight_ -= diff; if (BytesInflight) { @@ -1201,22 +1213,21 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } - if (!IsQuotaRequired() && PendingRequest) { - SendRequest(std::move(PendingRequest), ctx); - } else if (!QuotedRequests.empty()) { - SendRequest(std::move(QuotedRequests.front()), ctx); - QuotedRequests.pop_front(); + if (!IsQuotaRequired() && !PendingRequests.empty()) { + SendRequest(*writer, std::move(PendingRequests.front()), ctx); + PendingRequests.pop_front(); + } else if (!writer->QuotedRequests.empty()) { + SendRequest(*writer, std::move(writer->QuotedRequests.front()), ctx); + writer->QuotedRequests.pop_front(); } } template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse( - const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx + const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx ) { - - auto writeRequest = std::move(AcceptedRequests.front()); - AcceptedRequests.pop_front(); - + auto writeRequest = std::move(writer.AcceptedRequests.front()); + writer.AcceptedRequests.pop_front(); auto addAckMigration = [this]( const TPersQueuePartitionResponse::TCmdWriteResult& res, @@ -1361,19 +1372,21 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T } } + auto* writer = FindPartitionWriter(result.SessionId, result.TxId); + Y_VERIFY(writer != nullptr); - if (AcceptedRequests.empty()) { + if (writer->AcceptedRequests.empty()) { CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx); return; } - const auto& writeRequest = AcceptedRequests.front(); + const auto& writeRequest = writer->AcceptedRequests.front(); const auto& resp = result.Record.GetPartitionResponse(); if (resp.GetCookie() != writeRequest->Cookie) { return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx); } - ProcessWriteResponse(resp, ctx); + ProcessWriteResponse(resp, *writer, ctx); } template<bool UseMigrationProtocol> @@ -1399,11 +1412,39 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDe template<bool UseMigrationProtocol> void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx) { - if (!PendingRequest) { - PendingRequest = new TWriteRequestInfo(++NextRequestCookie); + const auto& writeRequest = ev->Request.write_request(); + + if constexpr (!UseMigrationProtocol) { + if (writeRequest.has_tx() && !AppData(ctx)->FeatureFlags.GetEnableTopicServiceTx()) { + CloseSession("Disabled transaction support for TopicService.", + PersQueue::ErrorCode::ERROR, ctx); + return; + } } - auto& request = PendingRequest->PartitionWriteRequest->Record; + if (PendingRequests.empty()) { + PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie)); + } else if constexpr (!UseMigrationProtocol) { + Y_VERIFY(!PendingRequests.back()->UserWriteRequests.empty()); + + auto& last = PendingRequests.back()->UserWriteRequests.back()->Request.write_request(); + + if (writeRequest.has_tx()) { + if (last.has_tx()) { + if ((writeRequest.tx().session() != last.tx().session()) || + (writeRequest.tx().id() != last.tx().id())) { + PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie)); + } + } else { + PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie)); + } + } else if (last.has_tx()) { + PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie)); + } + } + + auto pendingRequest = PendingRequests.back(); + auto& request = pendingRequest->PartitionWriteRequest->Record; ui64 payloadSize = 0; auto addDataMigration = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) { @@ -1450,7 +1491,6 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& maxMessageMetadataSize = std::max(maxMessageMetadataSize, currMetadataSize); }; - const auto& writeRequest = ev->Request.write_request(); if constexpr (UseMigrationProtocol) { for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) { addDataMigration(writeRequest, messageIndex); @@ -1459,15 +1499,21 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) { addData(writeRequest, messageIndex); } + + if (writeRequest.has_tx()) { + InitPartitionWriter(writeRequest.tx().session(), writeRequest.tx().id(), ctx); + } } - PendingRequest->UserWriteRequests.push_back(std::move(ev)); - PendingRequest->ByteSize = request.ByteSize(); + pendingRequest->UserWriteRequests.push_back(std::move(ev)); + pendingRequest->ByteSize = request.ByteSize(); + auto msgMetaEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicMessageMeta(); if (!msgMetaEnabled && maxMessageMetadataSize > 0) { CloseSession("Message level metadata support is disabled on server size", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } + if (maxMessageMetadataSize > MAX_METADATA_SIZE_PER_MESSAGE) { CloseSession( TStringBuilder() << "Message level metadata size is limited to " << MAX_METADATA_SIZE_PER_MESSAGE @@ -1476,21 +1522,36 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& ); return; } + if (const auto ru = CalcRuConsumption(payloadSize)) { - PendingRequest->RequiredQuota += ru; - if (MaybeRequestQuota(PendingRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)) { - Y_VERIFY(!PendingQuotaRequest); - PendingQuotaRequest = std::move(PendingRequest); + pendingRequest->RequiredQuota += ru; + + if (!PendingQuotaRequest) { + if (MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx)) { + PendingQuotaRequest = std::move(PendingRequests.front()); + PendingRequests.pop_front(); + } } } else { - if (!PendingQuotaRequest && SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { - SendRequest(std::move(PendingRequest), ctx); + TPartitionWriter* writer = nullptr; + if constexpr (UseMigrationProtocol) { + Y_VERIFY(Writers.size() == 1); + writer = &Writers.begin()->second; + } else { + const auto& [sessionId, txId] = pendingRequest->GetTransactionId(); + writer = FindPartitionWriter(sessionId, txId); + Y_VERIFY(writer != nullptr); + } + + if (!PendingQuotaRequest && writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + SendRequest(*writer, std::move(PendingRequests.front()), ctx); + PendingRequests.pop_front(); } } } template<bool UseMigrationProtocol> -void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) { +void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) { Y_VERIFY(request->PartitionWriteRequest); i64 diff = 0; @@ -1507,8 +1568,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteReques BytesInflightTotal.Inc(diff); } - ctx.Send(Writer, std::move(request->PartitionWriteRequest)); - SentRequests.push_back(std::move(request)); + ctx.Send(writer.PartitionWriterActor, std::move(request->PartitionWriteRequest)); + writer.SentRequests.push_back(std::move(request)); } template<bool UseMigrationProtocol> @@ -1754,21 +1815,29 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr& case EWakeupTag::RecheckAcl: return RecheckACL(ctx); - case EWakeupTag::RlAllowed: + case EWakeupTag::RlAllowed: { if (auto counters = Request->GetCounters()) { counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota); } - if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { - SendRequest(std::move(PendingQuotaRequest), ctx); + const auto& [sessionId, txId] = PendingQuotaRequest->GetTransactionId(); + auto* writer = FindPartitionWriter(sessionId, txId); + Y_VERIFY(writer != nullptr); + + if (writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) { + SendRequest(*writer, std::move(PendingQuotaRequest), ctx); } else { - QuotedRequests.push_back(std::move(PendingQuotaRequest)); + writer->QuotedRequests.push_back(std::move(PendingQuotaRequest)); } - if (PendingQuotaRequest = std::move(PendingRequest)) { - Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)); + if (!PendingRequests.empty()) { + Y_VERIFY(MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx)); + PendingQuotaRequest = std::move(PendingRequests.front()); + PendingRequests.pop_front(); } + break; + } case EWakeupTag::RlNoResource: case EWakeupTag::RlInitNoResource: @@ -1803,6 +1872,74 @@ void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& c } } +template<bool UseMigrationProtocol> +auto TWriteSessionActor<UseMigrationProtocol>::FindPartitionWriter(const TString& sessionId, const TString& txId) -> TPartitionWriter* +{ + auto p = Writers.find(std::make_pair(sessionId, txId)); + if (p == Writers.end()) { + return nullptr; + } + return &p->second; } + +template<bool UseMigrationProtocol> +void TWriteSessionActor<UseMigrationProtocol>::InitPartitionWriter(const TString& sessionId, const TString& txId, + const TActorContext& ctx) +{ + auto key = std::make_pair(sessionId, txId); + if (auto p = Writers.find(key); p != Writers.end()) { + return; + } + + auto [p, _] = Writers.emplace(std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(BytesInflight, + BytesInflightTotal, + BytesInflight_, + BytesInflightTotal_)); + auto& writer = p->second; + + auto it = PartitionToTablet.find(Partition); + ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0; + + TPartitionWriterOpts opts; + opts.WithDeduplication(UseDeduplication); + if constexpr (!UseMigrationProtocol) { + if (sessionId && txId) { + Y_VERIFY(Request->GetDatabaseName()); + opts.WithDatabase(*Request->GetDatabaseName()); + opts.WithTopicPath(InitRequest.path()); + opts.WithSessionId(sessionId); + opts.WithTxId(txId); + if (Request->GetSerializedToken()) { + opts.WithToken(Request->GetSerializedToken()); + } + if (Request->GetTraceId()) { + opts.WithTraceId(*Request->GetTraceId()); + } + if (Request->GetRequestType()) { + opts.WithRequestType(*Request->GetRequestType()); + } + } + } + + TActorId actor = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter( + ctx.SelfID, {}, tabletId, Partition, ExpectedGeneration, + SourceId, opts + )); + writer.PartitionWriterActor = actor; +} + +template<bool UseMigrationProtocol> +bool TWriteSessionActor<UseMigrationProtocol>::AnyRequests() const +{ + for (auto& [_, writer] : Writers) { + if (writer.AnyRequests()) { + return true; + } + } + return false; +} + } } |