aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-08-30 10:13:54 +0300
committerabcdef <akotov@ydb.tech>2023-08-30 10:35:19 +0300
commitf0ee5fd09e79f0c4a3d7972f91c2d7eb17518cd8 (patch)
tree706dcdf676b9d490b4eb6084f887c5dd58c8f1ae
parentd6dfde60c0bc97c0acffa53994186d90c6144fa9 (diff)
downloadydb-f0ee5fd09e79f0c4a3d7972f91c2d7eb17518cd8.tar.gz
запись в топик в транзакции. изменения в SDK и KQP
-rw-r--r--ydb/core/kqp/common/events/query.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp13
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp1
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp44
-rw-r--r--ydb/core/kqp/topics/kqp_topics.cpp15
-rw-r--r--ydb/core/kqp/topics/kqp_topics.h5
-rw-r--r--ydb/core/persqueue/pq_impl.cpp20
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/persqueue/writer/writer.cpp119
-rw-r--r--ydb/core/persqueue/writer/writer.h59
-rw-r--r--ydb/core/persqueue/writer/ya.make2
-rw-r--r--ydb/core/protos/kqp.proto9
-rw-r--r--ydb/core/protos/msgbus_pq.proto1
-rw-r--r--ydb/core/protos/pqconfig.proto2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp17
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h25
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/topic.h11
-rw-r--r--ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp49
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer.cpp109
-rw-r--r--ydb/services/persqueue_v1/actors/partition_writer.h59
-rw-r--r--ydb/services/persqueue_v1/actors/write_request_info.h50
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.h49
-rw-r--r--ydb/services/persqueue_v1/actors/write_session_actor.ipp247
29 files changed, 784 insertions, 141 deletions
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index 49bd2cbac2..a73a6e9e0a 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -55,7 +55,7 @@ public:
return RequestCtx ? false : Record.HasYdbStatus();
}
- const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
+ const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
return Record.GetRequest().GetTopicOperations();
}
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 6a73fbac76..a6c26a6988 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2261,10 +2261,9 @@ private:
}
void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {
- auto lockTxId = Request.AcquireLocksTxId;
- if (lockTxId.Defined() && *lockTxId == 0) {
- lockTxId = TxId;
- LockHandle = TLockHandle(TxId, TActivationContext::ActorSystem());
+ TMaybe<ui64> writeId;
+ if (Request.TopicOperations.HasWriteId()) {
+ writeId = Request.TopicOperations.GetWriteId();
}
for (auto& tx : topicTxs) {
@@ -2273,8 +2272,8 @@ private:
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();
- if (lockTxId) {
- transaction.SetLockTxId(*lockTxId);
+ if (writeId.Defined()) {
+ transaction.SetWriteId(*writeId);
}
transaction.SetImmediate(ImmediateTx);
@@ -2286,7 +2285,7 @@ private:
LOG_D("ExecuteTopicTabletTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
LOG_D("Executing KQP transaction on topic tablet: " << tabletId
- << ", lockTxId: " << lockTxId);
+ << ", writeId: " << writeId);
Send(MakePipePeNodeCacheID(false),
new TEvPipeCache::TEvForward(ev.release(), tabletId, true),
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index 95bdefab6d..ccd842f504 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -300,4 +300,5 @@ bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& respons
return true;
}
+
}
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 92319c8971..4cc5e80927 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -230,7 +230,7 @@ public:
return RequestEv->GetQuery();
}
- const ::NKikimrKqp::TTopicOperations& GetTopicOperations() const {
+ const ::NKikimrKqp::TTopicOperationsRequest& GetTopicOperations() const {
return RequestEv->GetTopicOperations();
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 70336809bf..75a6cd2ae4 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -986,8 +986,8 @@ public:
request.PerShardKeysSizeLimitBytes = Config->_CommitPerShardKeysSizeLimitBytes.Get().GetRef();
}
- if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasReadOperations()) {
- if (!txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasReadOperations()) {
+ if (txCtx.Locks.HasLocks() || txCtx.TopicOperations.HasOperations()) {
+ if (!txCtx.GetSnapshot().IsValid() || txCtx.TxHasEffects() || txCtx.TopicOperations.HasOperations()) {
LOG_D("TExecPhysicalRequest, tx has commit locks");
request.LocksOp = ELocksOp::Commit;
} else {
@@ -1402,6 +1402,8 @@ public:
bool replyQueryId = false;
bool replyQueryParameters = false;
+ bool replyTopicOperations = false;
+
switch (QueryState->GetAction()) {
case NKikimrKqp::QUERY_ACTION_PREPARE:
replyQueryId = true;
@@ -1417,6 +1419,10 @@ public:
replyQueryParameters = true;
break;
+ case NKikimrKqp::QUERY_ACTION_TOPIC:
+ replyTopicOperations = true;
+ break;
+
default:
break;
}
@@ -1435,6 +1441,12 @@ public:
response->SetPreparedQuery(queryId);
}
+ if (replyTopicOperations) {
+ if (HasTopicWriteId()) {
+ response->MutableTopicOperations()->SetWriteId(GetTopicWriteId());
+ }
+ }
+
// Result for scan query is sent directly to target actor.
Y_VERIFY(response->GetArena());
if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) {
@@ -1996,6 +2008,7 @@ public:
hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps);
+ hFunc(TEvTxUserProxy::TEvAllocateTxIdResult, HandleTopicOps);
hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
@@ -2074,7 +2087,11 @@ private:
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
}
- ReplySuccess();
+ if (HasTopicWriteOperations() && !HasTopicWriteId()) {
+ Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
+ } else {
+ ReplySuccess();
+ }
}
void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) {
@@ -2083,6 +2100,27 @@ private:
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
}
+ void HandleTopicOps(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
+ SetTopicWriteId(NLongTxService::TLockHandle(ev->Get()->TxId, TActivationContext::ActorSystem()));
+ ReplySuccess();
+ }
+
+ bool HasTopicWriteOperations() const {
+ return QueryState->TxCtx->TopicOperations.HasWriteOperations();
+ }
+
+ bool HasTopicWriteId() const {
+ return QueryState->TxCtx->TopicOperations.HasWriteId();
+ }
+
+ ui64 GetTopicWriteId() const {
+ return QueryState->TxCtx->TopicOperations.GetWriteId();
+ }
+
+ void SetTopicWriteId(NLongTxService::TLockHandle handle) {
+ QueryState->TxCtx->TopicOperations.SetWriteId(std::move(handle));
+ }
+
private:
TActorId Owner;
TString SessionId;
diff --git a/ydb/core/kqp/topics/kqp_topics.cpp b/ydb/core/kqp/topics/kqp_topics.cpp
index 7afb30f2f9..5c6a96e1ec 100644
--- a/ydb/core/kqp/topics/kqp_topics.cpp
+++ b/ydb/core/kqp/topics/kqp_topics.cpp
@@ -201,6 +201,21 @@ bool TTopicOperations::HasWriteOperations() const
return HasWriteOperations_;
}
+bool TTopicOperations::HasWriteId() const
+{
+ return WriteId_.GetLockId();
+}
+
+ui64 TTopicOperations::GetWriteId() const
+{
+ return WriteId_.GetLockId();
+}
+
+void TTopicOperations::SetWriteId(NLongTxService::TLockHandle handle)
+{
+ WriteId_ = std::move(handle);
+}
+
bool TTopicOperations::TabletHasReadOperations(ui64 tabletId) const
{
for (auto& [_, value] : Operations_) {
diff --git a/ydb/core/kqp/topics/kqp_topics.h b/ydb/core/kqp/topics/kqp_topics.h
index e285b1b06d..4d0164d512 100644
--- a/ydb/core/kqp/topics/kqp_topics.h
+++ b/ydb/core/kqp/topics/kqp_topics.h
@@ -4,6 +4,7 @@
#include <ydb/public/api/protos/ydb_topic.pb.h>
#include <ydb/core/protos/pqconfig.pb.h>
+#include <ydb/core/tx/long_tx_service/public/lock_handle.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <library/cpp/actors/core/actor.h>
@@ -88,6 +89,9 @@ public:
bool HasOperations() const;
bool HasReadOperations() const;
bool HasWriteOperations() const;
+ bool HasWriteId() const;
+ ui64 GetWriteId() const;
+ void SetWriteId(NLongTxService::TLockHandle handle);
bool TabletHasReadOperations(ui64 tabletId) const;
@@ -117,6 +121,7 @@ private:
bool HasWriteOperations_ = false;
TMaybe<TString> Consumer_;
+ NLongTxService::TLockHandle WriteId_;
};
}
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 880a278046..3a7235e97f 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -1725,6 +1725,14 @@ void TPersQueue::HandleWriteRequest(const ui64 responseCookie, const TActorId& p
return;
}
+ if (req.HasWriteId()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Tablet " << TabletID() <<
+ " Write in transaction." <<
+ " Partition: " << req.GetPartition() <<
+ ", WriteId: " << req.GetWriteId());
+ }
+
for (ui32 i = 0; i < req.CmdWriteSize(); ++i) {
const auto& cmd = req.GetCmdWrite(i);
@@ -1882,6 +1890,14 @@ void TPersQueue::HandleReserveBytesRequest(const ui64 responseCookie, const TAct
return;
}
+ if (req.HasWriteId()) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
+ "Tablet " << TabletID() <<
+ " Reserve bytes in transaction." <<
+ " Partition: " << req.GetPartition() <<
+ ", WriteId: " << req.GetWriteId());
+ }
+
InitResponseBuilder(responseCookie, 1, COUNTER_LATENCY_PQ_RESERVE_BYTES);
THolder<TEvPQ::TEvReserveBytes> event = MakeHolder<TEvPQ::TEvReserveBytes>(responseCookie, req.GetCmdReserveBytes().GetSize(),
req.GetOwnerCookie(), req.GetMessageNo(), req.GetCmdReserveBytes().GetLastRequest());
@@ -2417,7 +2433,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
"Tablet " << TabletID() <<
" tx=" << event.GetTxId() <<
- ", lock_tx_id=" << txBody.GetLockTxId() <<
+ ", write_id=" << txBody.GetWriteId() <<
", path=" << operation.GetPath() <<
", partition=" << operation.GetPartitionId() <<
", consumer=" << operation.GetConsumer() <<
@@ -2426,7 +2442,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
", is_write=" << isWriteOperation);
}
- if (TabletState != NKikimrPQ::ENormal) {
+ if ((TabletState != NKikimrPQ::ENormal) || txBody.HasWriteId()) {
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
event.GetTxId(),
ctx);
diff --git a/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt
index 29c28de57e..35aa50f7e6 100644
--- a/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,8 @@ target_link_libraries(core-persqueue-writer PUBLIC
cpp-string_utils-base64
ydb-core-base
core-persqueue-events
+ grpc_services-cancelation-protos
+ kqp-common-simple
ydb-core-protos
public-lib-base
lib-deprecated-kicli
diff --git a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt
index 21840f9a4b..0dc0507f24 100644
--- a/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,8 @@ target_link_libraries(core-persqueue-writer PUBLIC
cpp-string_utils-base64
ydb-core-base
core-persqueue-events
+ grpc_services-cancelation-protos
+ kqp-common-simple
ydb-core-protos
public-lib-base
lib-deprecated-kicli
diff --git a/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt
index 21840f9a4b..0dc0507f24 100644
--- a/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,8 @@ target_link_libraries(core-persqueue-writer PUBLIC
cpp-string_utils-base64
ydb-core-base
core-persqueue-events
+ grpc_services-cancelation-protos
+ kqp-common-simple
ydb-core-protos
public-lib-base
lib-deprecated-kicli
diff --git a/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt
index 29c28de57e..35aa50f7e6 100644
--- a/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/writer/CMakeLists.windows-x86_64.txt
@@ -16,6 +16,8 @@ target_link_libraries(core-persqueue-writer PUBLIC
cpp-string_utils-base64
ydb-core-base
core-persqueue-events
+ grpc_services-cancelation-protos
+ kqp-common-simple
ydb-core-protos
public-lib-base
lib-deprecated-kicli
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index 36d13fe700..edb24303e8 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -5,7 +5,10 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <ydb/core/base/path.h>
#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/kqp/common/events/events.h>
+#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/pq_rl_helpers.h>
#include <ydb/public/lib/base/msgbus_status.h>
@@ -31,10 +34,14 @@ namespace NKikimr::NPQ {
static const ui64 WRITE_BLOCK_SIZE = 4_KB;
TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
- return TStringBuilder() << "Success {"
+ auto out = TStringBuilder() << "Success {"
<< " OwnerCookie: " << OwnerCookie
- << " SourceIdInfo: " << SourceIdInfo.ShortDebugString()
- << " }";
+ << " SourceIdInfo: " << SourceIdInfo.ShortDebugString();
+ if (WriteId != INVALID_WRITE_ID) {
+ out << " WriteId: " << WriteId;
+ }
+ out << " }";
+ return out;
}
TString TEvPartitionWriter::TEvInitResult::TError::ToString() const {
@@ -47,6 +54,8 @@ TString TEvPartitionWriter::TEvInitResult::TError::ToString() const {
TString TEvPartitionWriter::TEvInitResult::ToString() const {
auto out = TStringBuilder() << ToStringHeader() << " {";
+ out << " SessionId: " << SessionId;
+ out << " TxId: " << TxId;
if (IsSuccess()) {
out << " " << GetResult().ToString();
} else {
@@ -59,6 +68,8 @@ TString TEvPartitionWriter::TEvInitResult::ToString() const {
TString TEvPartitionWriter::TEvWriteAccepted::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
+ << " SessionId: " << SessionId
+ << " TxId: " << TxId
<< " Cookie: " << Cookie
<< " }";
}
@@ -67,6 +78,8 @@ TString TEvPartitionWriter::TEvWriteResponse::DumpError() const {
Y_VERIFY(!IsSuccess());
return TStringBuilder() << "Error {"
+ << " SessionId: " << SessionId
+ << " TxId: " << TxId
<< " Reason: " << GetError().Reason
<< " Response: " << Record.ShortDebugString()
<< " }";
@@ -75,6 +88,8 @@ TString TEvPartitionWriter::TEvWriteResponse::DumpError() const {
TString TEvPartitionWriter::TEvWriteResponse::ToString() const {
auto out = TStringBuilder() << ToStringHeader() << " {";
+ out << " SessionId: " << SessionId;
+ out << " TxId: " << TxId;
if (IsSuccess()) {
out << " Success { Response: " << Record.ShortDebugString() << " }";
} else {
@@ -163,7 +178,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
template <typename... Args>
void SendInitResult(Args&&... args) {
- Send(Client, new TEvPartitionWriter::TEvInitResult(std::forward<Args>(args)...));
+ Send(Client, new TEvPartitionWriter::TEvInitResult(Opts.SessionId, Opts.TxId, std::forward<Args>(args)...));
}
void InitResult(const TString& reason, NKikimrClient::TResponse&& response) {
@@ -171,13 +186,13 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
BecomeZombie(TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError, "Init error");
}
- void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo) {
- SendInitResult(ownerCookie, sourceIdInfo);
+ void InitResult(const TString& ownerCookie, const TEvPartitionWriter::TEvInitResult::TSourceIdInfo& sourceIdInfo, ui64 writeId) {
+ SendInitResult(ownerCookie, sourceIdInfo, writeId);
}
template <typename... Args>
void SendWriteResult(Args&&... args) {
- Send(Client, new TEvPartitionWriter::TEvWriteResponse(std::forward<Args>(args)...));
+ Send(Client, new TEvPartitionWriter::TEvWriteResponse(Opts.SessionId, Opts.TxId, std::forward<Args>(args)...));
}
void WriteResult(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode, const TString& reason, NKikimrClient::TResponse&& response) {
@@ -191,7 +206,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}
void WriteAccepted(ui64 cookie) {
- Send(Client, new TEvPartitionWriter::TEvWriteAccepted(cookie));
+ Send(Client, new TEvPartitionWriter::TEvWriteAccepted(Opts.SessionId, Opts.TxId, cookie));
}
void Disconnected(TEvPartitionWriter::TEvWriteResponse::EErrors errorCode) {
@@ -199,6 +214,77 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
BecomeZombie(errorCode, "Disconnected");
}
+ /// GetWriteId
+
+ void GetWriteId(const TActorContext& ctx) {
+ auto ev = MakeWriteIdRequest();
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+ Become(&TThis::StateGetWriteId);
+ }
+
+ STATEFN(StateGetWriteId) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, HandleWriteId);
+ hFunc(TEvPartitionWriter::TEvWriteRequest, HoldPending);
+ default:
+ return StateBase(ev);
+ }
+ }
+
+ void HandleWriteId(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+
+ auto& record = ev->Get()->Record.GetRef();
+ WriteId = record.GetResponse().GetTopicOperations().GetWriteId();
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY,
+ "SessionId: " << Opts.SessionId <<
+ " TxId: " << Opts.TxId <<
+ " WriteId: " << WriteId);
+
+ GetOwnership();
+ }
+
+ THolder<NKqp::TEvKqp::TEvQueryRequest> MakeWriteIdRequest() {
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+
+ if (Opts.Token) {
+ ev->Record.SetUserToken(Opts.Token);
+ }
+ //ev->Record.SetRequestActorId(???);
+
+ ev->Record.MutableRequest()->SetDatabase(CanonizePath(Opts.Database));
+ ev->Record.MutableRequest()->SetSessionId(Opts.SessionId);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC);
+
+ if (Opts.TraceId) {
+ ev->Record.SetTraceId(Opts.TraceId);
+ }
+
+ if (Opts.RequestType) {
+ ev->Record.SetRequestType(Opts.RequestType);
+ }
+
+ //ev->Record.MutableRequest()->SetCancelAfterMs(???);
+ //ev->Record.MutableRequest()->SetTimeoutMs(???);
+
+ ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(Opts.TxId);
+
+ auto* topics = ev->Record.MutableRequest()->MutableTopicOperations()->AddTopics();
+ topics->set_path(Opts.TopicPath);
+ auto* partitions = topics->add_partitions();
+ partitions->set_partition_id(PartitionId);
+
+ return ev;
+ }
+
+ void SetWriteId(NKikimrClient::TPersQueuePartitionRequest& request) {
+ if (WriteId != INVALID_WRITE_ID) {
+ request.SetWriteId(WriteId);
+ }
+ }
+
/// GetOwnership
void GetOwnership() {
@@ -298,7 +384,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}
}
- InitResult(OwnerCookie, sourceIdInfo);
+ InitResult(OwnerCookie, sourceIdInfo, WriteId);
Become(&TThis::StateWork);
if (Pending) {
@@ -399,6 +485,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
auto& request = *ev->Record.MutablePartitionRequest();
request.SetMessageNo(MessageNo++);
+ SetWriteId(request);
+
auto& cmd = *request.MutableCmdReserveBytes();
cmd.SetSize(it->second.ByteSize());
cmd.SetLastRequest(false);
@@ -511,6 +599,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
auto& request = *ev->Record.MutablePartitionRequest();
request.SetMessageNo(MessageNo++);
+ SetWriteId(request);
+
if (!Opts.UseDeduplication) {
request.SetPartition(PartitionId);
}
@@ -666,7 +756,7 @@ public:
}
}
- void Bootstrap() {
+ void Bootstrap(const TActorContext& ctx) {
NTabletPipe::TClientConfig config;
config.RetryPolicy = {
.RetryLimitCount = 6,
@@ -677,7 +767,12 @@ public:
};
PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, config));
- GetOwnership();
+
+ if (Opts.Database && Opts.SessionId && Opts.TxId) {
+ GetWriteId(ctx);
+ } else {
+ GetOwnership();
+ }
}
STATEFN(StateBase) {
@@ -697,6 +792,7 @@ public:
}
private:
+
const TActorId Client;
const ui64 TabletId;
const ui32 PartitionId;
@@ -731,6 +827,7 @@ private:
TEvPartitionWriter::TEvWriteResponse::EErrors ErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrors::InternalError;
+ ui64 WriteId = INVALID_WRITE_ID;
}; // TPartitionWriter
IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId,
diff --git a/ydb/core/persqueue/writer/writer.h b/ydb/core/persqueue/writer/writer.h
index fb2f8dd260..b53844cfbf 100644
--- a/ydb/core/persqueue/writer/writer.h
+++ b/ydb/core/persqueue/writer/writer.h
@@ -11,6 +11,8 @@
namespace NKikimr::NPQ {
+constexpr ui64 INVALID_WRITE_ID = Max<ui64>();
+
struct TEvPartitionWriter {
enum EEv {
EvInitResult = EventSpaceBegin(TKikimrEvents::ES_PQ_PARTITION_WRITER),
@@ -30,24 +32,35 @@ struct TEvPartitionWriter {
struct TSuccess {
TString OwnerCookie;
TSourceIdInfo SourceIdInfo;
+ ui64 WriteId = INVALID_WRITE_ID;
+
TString ToString() const;
};
struct TError {
TString Reason;
NKikimrClient::TResponse Response;
+
TString ToString() const;
};
+ TString SessionId;
+ TString TxId;
std::variant<TSuccess, TError> Result;
- explicit TEvInitResult(const TString& ownerCookie, const TSourceIdInfo& sourceIdInfo)
- : Result(TSuccess{ownerCookie, sourceIdInfo})
+ TEvInitResult(const TString& sessionId, const TString& txId,
+ const TString& ownerCookie, const TSourceIdInfo& sourceIdInfo, ui64 writeId)
+ : SessionId(sessionId)
+ , TxId(txId)
+ , Result(TSuccess{ownerCookie, sourceIdInfo, writeId})
{
}
- explicit TEvInitResult(const TString& reason, NKikimrClient::TResponse&& response)
- : Result(TError{reason, std::move(response)})
+ TEvInitResult(const TString& sessionId, const TString& txId,
+ const TString& reason, NKikimrClient::TResponse&& response)
+ : SessionId(sessionId)
+ , TxId(txId)
+ , Result(TError{reason, std::move(response)})
{
}
@@ -67,10 +80,14 @@ struct TEvPartitionWriter {
};
struct TEvWriteAccepted: public TEventLocal<TEvWriteAccepted, EvWriteAccepted> {
+ TString SessionId;
+ TString TxId;
ui64 Cookie;
- explicit TEvWriteAccepted(ui64 cookie)
- : Cookie(cookie)
+ TEvWriteAccepted(const TString& sessionId, const TString& txId, ui64 cookie)
+ : SessionId(sessionId)
+ , TxId(txId)
+ , Cookie(cookie)
{
}
@@ -95,18 +112,26 @@ struct TEvPartitionWriter {
TString Reason;
};
+ TString SessionId;
+ TString TxId;
std::variant<TSuccess, TError> Result;
TEvWriteResponse() = default;
- explicit TEvWriteResponse(NKikimrClient::TResponse&& response)
- : Result(TSuccess{})
+ TEvWriteResponse(const TString& sessionId, const TString& txId,
+ NKikimrClient::TResponse&& response)
+ : SessionId(sessionId)
+ , TxId(txId)
+ , Result(TSuccess{})
{
Record = std::move(response);
}
- explicit TEvWriteResponse(const EErrors code, const TString& reason, NKikimrClient::TResponse&& response)
- : Result(TError{code, reason})
+ TEvWriteResponse(const TString& sessionId, const TString& txId,
+ const EErrors code, const TString& reason, NKikimrClient::TResponse&& response)
+ : SessionId(sessionId)
+ , TxId(txId)
+ , Result(TError{code, reason})
{
Record = std::move(response);
}
@@ -126,6 +151,13 @@ struct TPartitionWriterOpts {
bool CheckState = false;
bool AutoRegister = false;
bool UseDeduplication = true;
+ TString Database;
+ TString TopicPath;
+ TString Token;
+ TString SessionId;
+ TString TxId;
+ TString TraceId;
+ TString RequestType;
std::optional<NKikimrPQ::TPQTabletConfig::EMeteringMode> MeteringMode;
TRlContext RlCtx;
@@ -136,6 +168,13 @@ struct TPartitionWriterOpts {
TPartitionWriterOpts& WithAutoRegister(bool value) { AutoRegister = value; return *this; }
TPartitionWriterOpts& WithDeduplication(bool value) { UseDeduplication = value; return *this; }
TPartitionWriterOpts& WithCheckRequestUnits(const NKikimrPQ::TPQTabletConfig::EMeteringMode meteringMode , const TRlContext& rlCtx) { MeteringMode = meteringMode; RlCtx = rlCtx; return *this; }
+ TPartitionWriterOpts& WithDatabase(const TString& value) { Database = value; return *this; }
+ TPartitionWriterOpts& WithTopicPath(const TString& value) { TopicPath = value; return *this; }
+ TPartitionWriterOpts& WithToken(const TString& value) { Token = value; return *this; }
+ TPartitionWriterOpts& WithSessionId(const TString& value) { SessionId = value; return *this; }
+ TPartitionWriterOpts& WithTxId(const TString& value) { TxId = value; return *this; }
+ TPartitionWriterOpts& WithTraceId(const TString& value) { TraceId = value; return *this; }
+ TPartitionWriterOpts& WithRequestType(const TString& value) { RequestType = value; return *this; }
};
IActor* CreatePartitionWriter(const TActorId& client, const std::optional<TString>& topicPath, ui64 tabletId, ui32 partitionId, const std::optional<ui32> expectedGeneration, const TString& sourceId,
diff --git a/ydb/core/persqueue/writer/ya.make b/ydb/core/persqueue/writer/ya.make
index 0ab2b4e9ab..d8848d37cc 100644
--- a/ydb/core/persqueue/writer/ya.make
+++ b/ydb/core/persqueue/writer/ya.make
@@ -12,6 +12,8 @@ PEERDIR(
library/cpp/string_utils/base64
ydb/core/base
ydb/core/persqueue/events
+ ydb/core/grpc_services/cancelation/protos
+ ydb/core/kqp/common/simple
ydb/core/protos
ydb/public/lib/base
ydb/public/lib/deprecated/kicli
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 4fd27d332c..bf18c4358f 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -71,11 +71,15 @@ enum EQueryReplyFlags {
QUERY_REPLY_FLAG_AST = 4;
};
-message TTopicOperations {
+message TTopicOperationsRequest {
optional string Consumer = 1;
repeated Ydb.Topic.UpdateOffsetsInTransactionRequest.TopicOffsets Topics = 2;
}
+message TTopicOperationsResponse {
+ optional int64 WriteId = 1;
+};
+
message TQueryRequest {
optional bytes SessionId = 1;
optional string Query = 2;
@@ -98,7 +102,7 @@ message TQueryRequest {
reserved 19; // (deprecated) StatsMode
optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated
optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21;
- optional TTopicOperations TopicOperations = 22;
+ optional TTopicOperationsRequest TopicOperations = 22;
optional bool UsePublicResponseDataFormat = 23;
map<string, Ydb.TypedValue> YdbParameters = 24;
optional bool IsInternalCall = 25;
@@ -256,6 +260,7 @@ message TQueryResponse {
optional Ydb.Table.TransactionMeta TxMeta = 11;
optional NKqpProto.TKqpStatsQuery QueryStats = 12;
repeated Ydb.ResultSet YdbResults = 13;
+ optional TTopicOperationsResponse TopicOperations = 14;
}
message TEvQueryResponse {
diff --git a/ydb/core/protos/msgbus_pq.proto b/ydb/core/protos/msgbus_pq.proto
index ba7c162d32..d3d09f1b3f 100644
--- a/ydb/core/protos/msgbus_pq.proto
+++ b/ydb/core/protos/msgbus_pq.proto
@@ -120,6 +120,7 @@ message TPersQueuePartitionRequest {
optional string OwnerCookie = 3; //mandatory for write
optional int64 MessageNo = 12; //mandatory for write
optional int64 CmdWriteOffset = 13; //optional
+ optional int64 WriteId = 23;
repeated TCmdWrite CmdWrite = 4;
optional TCmdGetMaxSeqNo CmdGetMaxSeqNo = 5;
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 719d27f454..335cd494ea 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -861,7 +861,7 @@ message TDataTransaction {
repeated uint64 SendingShards = 3;
repeated uint64 ReceivingShards = 4;
optional bool Immediate = 5;
- optional fixed64 LockTxId = 6;
+ optional fixed64 WriteId = 6;
}
message TConfigTransaction {
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
index 0cfdf77356..593a036292 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.cpp
@@ -54,6 +54,11 @@ void TWriteSession::WriteEncoded(TContinuationToken&& token, TStringBuf data, EC
Impl->WriteInternal(std::move(token), std::move(message));
}
+void TWriteSession::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message)
+{
+ Impl->WriteInternal(std::move(token), std::move(message));
+}
+
void TWriteSession::Write(TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
TMaybe<TInstant> createTimestamp) {
TWriteMessage message{data};
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
index a12c61b4b9..d4e11d3408 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session.h
@@ -40,6 +40,8 @@ public:
void Write(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+ void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+
NThreading::TFuture<void> WaitEvent() override;
// Empty maybe - block till all work is done. Otherwise block at most at closeTimeout duration.
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
index ee236e67de..e86ca445fd 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
@@ -376,7 +376,8 @@ void TWriteSessionImpl::WriteInternal(TContinuationToken&&, TWriteMessage&& mess
with_lock(Lock) {
CurrentBatch.Add(
GetNextSeqNoImpl(message.SeqNo_), createdAtValue, message.Data, message.Codec, message.OriginalSize,
- message.MessageMeta_
+ message.MessageMeta_,
+ message.GetTxPtr()
);
FlushWriteIfRequiredImpl();
@@ -392,6 +393,10 @@ void TWriteSessionImpl::Write(TContinuationToken&& token, TWriteMessage&& messag
WriteInternal(std::move(token), std::move(message));
}
+void TWriteSessionImpl::WriteEncoded(TContinuationToken&& token, TWriteMessage&& message)
+{
+ WriteInternal(std::move(token), std::move(message));
+}
TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStatus&& status) {
Y_VERIFY(Lock.IsLocked());
@@ -1078,9 +1083,11 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
(*Counters->MessagesInflight)++;
if (!currMessage.MessageMeta.empty()) {
OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(),
- std::move(currMessage.MessageMeta));
+ std::move(currMessage.MessageMeta),
+ currMessage.Tx);
} else {
- OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size());
+ OriginalMessagesToSend.emplace(sequenceNumber, createTs, datum.size(),
+ currMessage.Tx);
}
}
block.Data = std::move(CurrentBatch.Data);
@@ -1161,6 +1168,10 @@ void TWriteSessionImpl::SendImpl() {
auto* msgData = writeRequest->add_messages();
+ if (message.Tx) {
+ writeRequest->mutable_tx()->set_id(message.Tx->GetId());
+ writeRequest->mutable_tx()->set_session(message.Tx->GetSession().GetId());
+ }
msgData->set_seq_no(message.SeqNo + SeqNoShift);
*msgData->mutable_created_at() = ::google::protobuf::util::TimeUtil::MillisecondsToTimestamp(message.CreatedAt.MilliSeconds());
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
index e0a986c0af..e5ed606a66 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
@@ -174,14 +174,18 @@ private:
TMaybe<ECodec> Codec;
ui32 OriginalSize; // only for coded messages
TVector<std::pair<TString, TString>> MessageMeta;
+ const NTable::TTransaction* Tx;
+
TMessage(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec = {},
- ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {})
+ ui32 originalSize = 0, const TVector<std::pair<TString, TString>>& messageMeta = {},
+ const NTable::TTransaction* tx = nullptr)
: SeqNo(seqNo)
, CreatedAt(createdAt)
, DataRef(data)
, Codec(codec)
, OriginalSize(originalSize)
, MessageMeta(messageMeta)
+ , Tx(tx)
{}
};
@@ -192,12 +196,14 @@ private:
TInstant StartedAt = TInstant::Zero();
bool Acquired = false;
bool FlushRequested = false;
+
void Add(ui64 seqNo, const TInstant& createdAt, TStringBuf data, TMaybe<ECodec> codec, ui32 originalSize,
- const TVector<std::pair<TString, TString>>& messageMeta) {
+ const TVector<std::pair<TString, TString>>& messageMeta,
+ const NTable::TTransaction* tx) {
if (StartedAt == TInstant::Zero())
StartedAt = TInstant::Now();
CurrentSize += codec ? originalSize : data.size();
- Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta);
+ Messages.emplace_back(seqNo, createdAt, data, codec, originalSize, messageMeta, tx);
Acquired = false;
}
@@ -267,17 +273,24 @@ private:
TInstant CreatedAt;
size_t Size;
TVector<std::pair<TString, TString>> MessageMeta;
- TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size)
+ const NTable::TTransaction* Tx;
+
+ TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size,
+ const NTable::TTransaction* tx)
: SeqNo(sequenceNumber)
, CreatedAt(createdAt)
, Size(size)
+ , Tx(tx)
{}
+
TOriginalMessage(const ui64 sequenceNumber, const TInstant createdAt, const size_t size,
- TVector<std::pair<TString, TString>>&& messageMeta)
+ TVector<std::pair<TString, TString>>&& messageMeta,
+ const NTable::TTransaction* tx)
: SeqNo(sequenceNumber)
, CreatedAt(createdAt)
, Size(size)
, MessageMeta(std::move(messageMeta))
+ , Tx(tx)
{}
};
@@ -329,6 +342,8 @@ public:
Y_FAIL("Do not use this method");
};
+ void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& message) override;
+
void WriteEncoded(TContinuationToken&&, TStringBuf, ECodec, ui32,
TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) override {
Y_UNUSED(seqNo);
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/topic.h b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
index 5c8e2bfa73..0fecce9af0 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/topic.h
+++ b/ydb/public/sdk/cpp/client/ydb_topic/topic.h
@@ -1573,6 +1573,13 @@ public:
//! Message metadata. Limited to 4096 characters overall (all keys and values combined).
FLUENT_SETTING(TMessageMeta, MessageMeta);
+ //! Transaction id
+ FLUENT_SETTING_OPTIONAL(std::reference_wrapper<NTable::TTransaction>, Tx);
+
+ const NTable::TTransaction* GetTxPtr() const
+ {
+ return Tx_ ? &Tx_->get() : nullptr;
+ }
};
//! Simple write session. Does not need event handlers. Does not provide Events, ContinuationTokens, write Acks.
@@ -1632,6 +1639,10 @@ public:
virtual void Write(TContinuationToken&& continuationToken, TStringBuf data, TMaybe<ui64> seqNo = Nothing(),
TMaybe<TInstant> createTimestamp = Nothing()) = 0;
+ //! Write single message that is already coded by codec.
+ //! continuationToken - a token earlier provided to client with ReadyToAccept event.
+ virtual void WriteEncoded(TContinuationToken&& continuationToken, TWriteMessage&& params) = 0;
+
//! Write single message that is already compressed by codec. Old method with only basic message options.
virtual void WriteEncoded(TContinuationToken&& continuationToken, TStringBuf data, ECodec codec, ui32 originalSize,
TMaybe<ui64> seqNo = Nothing(), TMaybe<TInstant> createTimestamp = Nothing()) = 0;
diff --git a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
index ce9c3f88ef..dc2f938d60 100644
--- a/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
@@ -12,12 +12,23 @@ Y_UNIT_TEST_SUITE(TxUsage) {
NKikimr::Tests::TServerSettings MakeServerSettings()
{
+ auto loggerInitializer = [](TTestActorRuntime& runtime) {
+ runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_MIRRORER, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE, NActors::NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PERSQUEUE_CLUSTER_TRACKER, NActors::NLog::PRI_DEBUG);
+ };
+
auto settings = PQSettings(0);
settings.SetDomainName("Root");
settings.SetEnableTopicServiceTx(true);
settings.PQConfig.SetTopicsAreFirstClassCitizen(true);
settings.PQConfig.SetRoot("/Root");
settings.PQConfig.SetDatabase("/Root");
+ settings.SetLoggerInitializer(loggerInitializer);
+
return settings;
}
@@ -114,19 +125,21 @@ protected:
void WriteMessage(const TString& data);
-private:
+protected:
+ const TDriver& GetDriver();
+
TString GetTopicPath() const;
+ TString GetMessageGroupId() const;
+
+private:
TString GetTopicName() const;
TString GetConsumerName() const;
- TString GetMessageGroupId() const;
template<class E>
E ReadEvent(TTopicReadSessionPtr reader, NTable::TTransaction& tx);
template<class E>
E ReadEvent(TTopicReadSessionPtr reader);
- const TDriver& GetDriver();
-
std::shared_ptr<TEnvironment> Env;
TMaybe<TDriver> Driver;
};
@@ -317,6 +330,34 @@ Y_UNIT_TEST_F(TwoSessionOneConsumer, TFixture)
CommitTx(tx1, EStatus::ABORTED);
}
+Y_UNIT_TEST_F(WriteToTopic, TFixture)
+{
+ NTopic::TWriteSessionSettings options;
+ options.Path(GetTopicPath());
+ options.MessageGroupId(GetMessageGroupId());
+
+ auto session = CreateSession();
+ auto tx = BeginTx(session);
+
+ auto writeMessages = [&](const TVector<TString>& messages) {
+ NTopic::TTopicClient client(GetDriver());
+ auto session = client.CreateSimpleBlockingWriteSession(options);
+
+ for (auto& message : messages) {
+ NTopic::TWriteMessage params(message);
+ params.Tx(tx);
+ UNIT_ASSERT(session->Write(std::move(params)));
+ }
+
+ UNIT_ASSERT(session->Close());
+ };
+
+ writeMessages({"a", "bb", "ccc", "dddd"});
+ writeMessages({"eeeee", "ffffff", "ggggggg"});
+
+ CommitTx(tx, EStatus::ABORTED);
+}
+
}
}
diff --git a/ydb/services/persqueue_v1/actors/partition_writer.cpp b/ydb/services/persqueue_v1/actors/partition_writer.cpp
new file mode 100644
index 0000000000..4827405eb5
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/partition_writer.cpp
@@ -0,0 +1,109 @@
+namespace NKikimr::NGRpcProxy::V1 {
+
+const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
+
+template<class TEvWrite>
+TPartitionWriterImpl<TEvWrite>::TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight,
+ NKikimr::NPQ::TMultiCounter& bytesInflightTotal,
+ ui64& bytesInflight_,
+ ui64& bytesInflightTotal_) :
+ BytesInflight(bytesInflight),
+ BytesInflightTotal(bytesInflightTotal),
+ BytesInflight_(bytesInflight_),
+ BytesInflightTotal_(bytesInflightTotal_)
+{
+}
+
+template<class TEvWrite>
+void TPartitionWriterImpl<TEvWrite>::OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev)
+{
+ const auto& result = *ev->Get();
+ Y_VERIFY(result.IsSuccess());
+ OwnerCookie = result.GetResult().OwnerCookie;
+}
+
+template<class TEvWrite>
+ui64 TPartitionWriterImpl<TEvWrite>::OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev)
+{
+ Y_VERIFY(!SentRequests.empty());
+
+ auto request = std::move(SentRequests.front());
+ Y_VERIFY(ev->Get()->Cookie == request->Cookie);
+
+ SentRequests.pop_front();
+
+ ui64 size = request->ByteSize;
+
+ AcceptedRequests.emplace_back(std::move(request));
+
+ return size;
+}
+
+template<class TEvWrite>
+auto TPartitionWriterImpl<TEvWrite>::OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev) -> TWriteRequestInfoPtr
+{
+ Y_VERIFY(!AcceptedRequests.empty());
+
+ auto request = std::move(AcceptedRequests.front());
+ AcceptedRequests.pop_front();
+
+ const auto& resp = ev->Get()->Record.GetPartitionResponse();
+ Y_VERIFY(resp.GetCookie() == request->Cookie);
+
+ return request;
+}
+
+template<class TEvWrite>
+bool TPartitionWriterImpl<TEvWrite>::AnyRequests() const
+{
+ return !QuotedRequests.empty() || !SentRequests.empty() || !AcceptedRequests.empty();
+}
+
+template<class TEvWrite>
+void TPartitionWriterImpl<TEvWrite>::AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx)
+{
+ if (!QuotedRequests.empty() && SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ SendRequest(std::move(request), ctx);
+ } else {
+ QuotedRequests.emplace_back(std::move(request));
+ }
+}
+
+template<class TEvWrite>
+bool TPartitionWriterImpl<TEvWrite>::TrySendNextQuotedRequest(const TActorContext& ctx)
+{
+ if (QuotedRequests.empty()) {
+ return false;
+ }
+
+ SendRequest(std::move(QuotedRequests.front()), ctx);
+ QuotedRequests.pop_front();
+
+ return true;
+}
+
+template<class TEvWrite>
+void TPartitionWriterImpl<TEvWrite>::SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx)
+{
+ Y_VERIFY(request->PartitionWriteRequest);
+
+ i64 diff = 0;
+ for (const auto& w : request->UserWriteRequests) {
+ diff -= w->Request.ByteSize();
+ }
+
+ Y_VERIFY(-diff <= (i64)BytesInflight_);
+ diff += request->PartitionWriteRequest->Record.ByteSize();
+ BytesInflight_ += diff;
+ BytesInflightTotal_ += diff;
+
+ if (BytesInflight && BytesInflightTotal) {
+ BytesInflight.Inc(diff);
+ BytesInflightTotal.Inc(diff);
+ }
+
+ ctx.Send(PartitionWriterActor, std::move(request->PartitionWriteRequest));
+ SentRequests.emplace_back(std::move(request));
+}
+
+}
diff --git a/ydb/services/persqueue_v1/actors/partition_writer.h b/ydb/services/persqueue_v1/actors/partition_writer.h
new file mode 100644
index 0000000000..b47ddfaa82
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/partition_writer.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <ydb/core/persqueue/writer/writer.h>
+#include <ydb/services/persqueue_v1/actors/events.h>
+#include <ydb/services/persqueue_v1/actors/write_request_info.h>
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+template<class TEvWrite>
+struct TPartitionWriterImpl {
+ using TWriteRequestInfoPtr = typename TWriteRequestInfoImpl<TEvWrite>::TPtr;
+
+ TPartitionWriterImpl(NKikimr::NPQ::TMultiCounter& bytesInflight,
+ NKikimr::NPQ::TMultiCounter& bytesInflightTotal,
+ ui64& bytesInflight_,
+ ui64& bytesInflightTotal_);
+
+ //
+ // from client
+ //
+ void OnEvWrite(TEvPQProxy::TEvTopicWrite::TPtr& ev, const TActorContext& ctx);
+
+ //
+ // from partition writer actor
+ //
+ void OnEvInitResult(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev);
+ ui64 OnEvWriteAccepted(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev);
+ TWriteRequestInfoPtr OnEvWriteResponse(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev);
+
+ //
+ // from quoter
+ //
+ void OnEvWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
+
+ bool AnyRequests() const;
+ void AddWriteRequest(TWriteRequestInfoPtr request, const TActorContext& ctx);
+ bool TrySendNextQuotedRequest(const TActorContext& ctx);
+ void SendRequest(TWriteRequestInfoPtr request, const TActorContext& ctx);
+
+ TActorId PartitionWriterActor;
+ TString OwnerCookie;
+
+ // Quoted, but not sent requests
+ TDeque<TWriteRequestInfoPtr> QuotedRequests;
+ // Requests that is sent to partition actor, but not accepted
+ TDeque<TWriteRequestInfoPtr> SentRequests;
+ // Accepted requests
+ TDeque<TWriteRequestInfoPtr> AcceptedRequests;
+
+ NKikimr::NPQ::TMultiCounter& BytesInflight;
+ NKikimr::NPQ::TMultiCounter& BytesInflightTotal;
+
+ ui64& BytesInflight_;
+ ui64& BytesInflightTotal_;
+};
+
+}
+
+#include "partition_writer.cpp"
diff --git a/ydb/services/persqueue_v1/actors/write_request_info.h b/ydb/services/persqueue_v1/actors/write_request_info.h
new file mode 100644
index 0000000000..dc21640cd2
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/write_request_info.h
@@ -0,0 +1,50 @@
+#pragma once
+
+namespace NKikimr::NGRpcProxy::V1 {
+
+template<class TEvWrite>
+struct TWriteRequestInfoImpl : public TSimpleRefCount<TWriteRequestInfoImpl<TEvWrite>> {
+ using TPtr = TIntrusivePtr<TWriteRequestInfoImpl<TEvWrite>>;
+
+ explicit TWriteRequestInfoImpl(ui64 cookie)
+ : PartitionWriteRequest(new NPQ::TEvPartitionWriter::TEvWriteRequest(cookie))
+ , Cookie(cookie)
+ , ByteSize(0)
+ , RequiredQuota(0)
+ {
+ }
+
+ std::pair<TString, TString> GetTransactionId() const;
+
+ // Source requests from user (grpc session object)
+ std::deque<THolder<TEvWrite>> UserWriteRequests;
+
+ // Partition write request
+ THolder<NPQ::TEvPartitionWriter::TEvWriteRequest> PartitionWriteRequest;
+
+ // Formed write request's cookie
+ ui64 Cookie;
+
+ // Formed write request's size
+ ui64 ByteSize;
+
+ // Quota in term of RUs
+ ui64 RequiredQuota;
+};
+
+template<class TEvWrite>
+std::pair<TString, TString> TWriteRequestInfoImpl<TEvWrite>::GetTransactionId() const
+{
+ Y_VERIFY(!UserWriteRequests.empty());
+
+ static constexpr bool UseMigrationProtocol = !std::is_same_v<TEvWrite, TEvPQProxy::TEvTopicWrite>;
+
+ if constexpr (UseMigrationProtocol) {
+ return {"", ""};
+ } else {
+ auto& request = UserWriteRequests.front()->Request.write_request();
+ return {request.tx().session(), request.tx().id()};
+ }
+}
+
+}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.h b/ydb/services/persqueue_v1/actors/write_session_actor.h
index bc679a850a..ece2cd0992 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.h
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.h
@@ -1,7 +1,9 @@
#pragma once
#include "events.h"
+#include "partition_writer.h"
#include "persqueue_utils.h"
+#include "write_request_info.h"
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -52,32 +54,8 @@ class TWriteSessionActor
using TEvDescribeTopicsResponse = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsResponse;
using TEvDescribeTopicsRequest = NMsgBusProxy::NPqMetaCacheV2::TEvPqNewMetaCache::TEvDescribeTopicsRequest;
- struct TWriteRequestInfo: public TSimpleRefCount<TWriteRequestInfo> {
- using TPtr = TIntrusivePtr<TWriteRequestInfo>;
-
- explicit TWriteRequestInfo(ui64 cookie)
- : PartitionWriteRequest(new NPQ::TEvPartitionWriter::TEvWriteRequest(cookie))
- , Cookie(cookie)
- , ByteSize(0)
- , RequiredQuota(0)
- {
- }
-
- // Source requests from user (grpc session object)
- std::deque<THolder<TEvWrite>> UserWriteRequests;
-
- // Partition write request
- THolder<NPQ::TEvPartitionWriter::TEvWriteRequest> PartitionWriteRequest;
-
- // Formed write request's cookie
- ui64 Cookie;
-
- // Formed write request's size
- ui64 ByteSize;
-
- // Quota in term of RUs
- ui64 RequiredQuota;
- };
+ using TWriteRequestInfo = TWriteRequestInfoImpl<TEvWrite>;
+ using TPartitionWriter = TPartitionWriterImpl<TEvWrite>;
// Codec ID size in bytes
static constexpr ui32 CODEC_ID_SIZE = 1;
@@ -181,7 +159,7 @@ private:
void MakeAndSentInitResponse(const TMaybe<ui64>& maxSeqNo, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvWriteAccepted::TPtr& ev, const TActorContext& ctx);
- void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx);
+ void ProcessWriteResponse(const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvWriteResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::TEvPartitionWriter::TEvDisconnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const NActors::TActorContext& ctx);
@@ -196,12 +174,17 @@ private:
void CheckFinish(const NActors::TActorContext& ctx);
void PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx);
- void SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);
+ void SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx);
void SetupCounters();
void SetupCounters(const TString& cloudId, const TString& dbId, const TString& dbPath, const bool isServerless, const TString& folderId);
private:
+ TPartitionWriter* FindPartitionWriter(const TString& sessionId, const TString& txId);
+ void InitPartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx);
+ bool AnyRequests() const;
+
std::unique_ptr<TEvStreamWriteRequest> Request;
enum EState {
@@ -218,7 +201,6 @@ private:
EState State;
TActorId SchemeCache;
- TActorId Writer;
TString PeerName;
ui64 Cookie;
@@ -242,15 +224,9 @@ private:
THolder<TAclWrapper> ACL;
// Future batch request to partition actor
- typename TWriteRequestInfo::TPtr PendingRequest;
+ std::deque<typename TWriteRequestInfo::TPtr> PendingRequests;
// Request that is waiting for quota
typename TWriteRequestInfo::TPtr PendingQuotaRequest;
- // Quoted, but not sent requests
- std::deque<typename TWriteRequestInfo::TPtr> QuotedRequests;
- // Requests that is sent to partition actor, but not accepted
- std::deque<typename TWriteRequestInfo::TPtr> SentRequests;
- // Accepted requests
- std::deque<typename TWriteRequestInfo::TPtr> AcceptedRequests;
bool WritesDone;
@@ -321,6 +297,7 @@ private:
TDeque<ui64> SeqNoInflight;
+ THashMap<std::pair<TString, TString>, TPartitionWriter> Writers;
};
}
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index 0b06d0be56..a4f2a26abe 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -154,13 +154,10 @@ inline void FillChunkDataFromReq(
*msgMeta = msg.metadata_items();
}
-namespace NGRpcProxy {
-namespace V1 {
+namespace NGRpcProxy::V1 {
using namespace Ydb::PersQueue::V1;
-static const ui32 MAX_RESERVE_REQUESTS_INFLIGHT = 5;
-
static const ui32 MAX_BYTES_INFLIGHT = 1_MB;
static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
@@ -295,8 +292,9 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Die(const TActorContext& ctx) {
if (State == ES_DYING)
return;
- if (Writer)
- ctx.Send(Writer, new TEvents::TEvPoisonPill());
+ for (auto& [_, writer] : Writers) {
+ ctx.Send(writer.PartitionWriterActor, new TEvents::TEvPoisonPill());
+ }
if (SessionsActive) {
SessionsActive.Dec();
@@ -339,7 +337,7 @@ void TWriteSessionActor<UseMigrationProtocol>::CheckFinish(const TActorContext&
CloseSession("out of order Writes done before initialization", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
- if (!PendingRequest && !PendingQuotaRequest && QuotedRequests.empty() && SentRequests.empty() && AcceptedRequests.empty()) {
+ if (PendingRequests.empty() && !PendingQuotaRequest && !AnyRequests()) {
CloseSession("", PersQueue::ErrorCode::OK, ctx);
return;
}
@@ -928,7 +926,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NKqp::TEvKqp::TEvQueryResp
GetOrProcessPartition(ctx);
return;
} else if (State == EState::ES_WAIT_TABLE_REQUEST_2) {
-
SourceIdUpdatesInflight--;
if (!SourceIdUpdatesInflight) {
LastSourceIdUpdate = ctx.Now();
@@ -1033,10 +1030,8 @@ void TWriteSessionActor<UseMigrationProtocol>::ProceedPartition(const ui32 parti
return;
}
- Writer = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(
- ctx.SelfID, {}, PartitionTabletId, Partition, ExpectedGeneration,
- SourceId, TPartitionWriterOpts().WithDeduplication(UseDeduplication)
- ));
+ InitPartitionWriter("", "", ctx);
+
State = ES_WAIT_WRITER_INIT;
ui32 border = AppData(ctx)->PQConfig.GetWriteInitLatencyBigMs();
@@ -1146,11 +1141,21 @@ void TWriteSessionActor<UseMigrationProtocol>::MakeAndSentInitResponse(
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::TEvInitResult::TPtr& ev, const TActorContext& ctx) {
- if (State != ES_WAIT_WRITER_INIT) {
- return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
+ const auto& result = *ev->Get();
+
+ auto writer = FindPartitionWriter(result.SessionId, result.TxId);
+ Y_VERIFY(writer != nullptr);
+
+ if (!result.SessionId && !result.TxId) {
+ if (State != ES_WAIT_WRITER_INIT) {
+ return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
+ }
+ } else {
+ if (State != ES_INITED) {
+ return CloseSession("got init result but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
+ }
}
- const auto& result = *ev->Get();
if (!result.IsSuccess()) {
const auto& error = result.GetError();
if (error.Response.HasErrorCode()) {
@@ -1160,13 +1165,17 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
- OwnerCookie = result.GetResult().OwnerCookie;
+ writer->OnEvInitResult(ev);
+
const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
if (!UseDeduplication) {
Y_VERIFY(maxSeqNo == 0);
}
- MakeAndSentInitResponse(maxSeqNo, ctx);
+ if (!result.SessionId && !result.TxId) {
+ OwnerCookie = result.GetResult().OwnerCookie;
+ MakeAndSentInitResponse(maxSeqNo, ctx);
+ }
}
template<bool UseMigrationProtocol>
@@ -1175,18 +1184,21 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
return CloseSession("got write permission but not wait for it", PersQueue::ErrorCode::ERROR, ctx);
}
- Y_VERIFY(!SentRequests.empty());
- auto writeRequest = std::move(SentRequests.front());
+ auto* writer = FindPartitionWriter(ev->Get()->SessionId, ev->Get()->TxId);
+ Y_VERIFY(writer != nullptr);
+
+ Y_VERIFY(!writer->SentRequests.empty());
+ auto writeRequest = std::move(writer->SentRequests.front());
if (ev->Get()->Cookie != writeRequest->Cookie) {
return CloseSession("out of order reserve bytes response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
}
- SentRequests.pop_front();
+ writer->SentRequests.pop_front();
ui64 diff = writeRequest->ByteSize;
- AcceptedRequests.emplace_back(std::move(writeRequest));
+ writer->AcceptedRequests.emplace_back(std::move(writeRequest));
BytesInflight_ -= diff;
if (BytesInflight) {
@@ -1201,22 +1213,21 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
- if (!IsQuotaRequired() && PendingRequest) {
- SendRequest(std::move(PendingRequest), ctx);
- } else if (!QuotedRequests.empty()) {
- SendRequest(std::move(QuotedRequests.front()), ctx);
- QuotedRequests.pop_front();
+ if (!IsQuotaRequired() && !PendingRequests.empty()) {
+ SendRequest(*writer, std::move(PendingRequests.front()), ctx);
+ PendingRequests.pop_front();
+ } else if (!writer->QuotedRequests.empty()) {
+ SendRequest(*writer, std::move(writer->QuotedRequests.front()), ctx);
+ writer->QuotedRequests.pop_front();
}
}
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::ProcessWriteResponse(
- const NKikimrClient::TPersQueuePartitionResponse& response, const TActorContext& ctx
+ const NKikimrClient::TPersQueuePartitionResponse& response, TPartitionWriter& writer, const TActorContext& ctx
) {
-
- auto writeRequest = std::move(AcceptedRequests.front());
- AcceptedRequests.pop_front();
-
+ auto writeRequest = std::move(writer.AcceptedRequests.front());
+ writer.AcceptedRequests.pop_front();
auto addAckMigration = [this](
const TPersQueuePartitionResponse::TCmdWriteResult& res,
@@ -1361,19 +1372,21 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
}
}
+ auto* writer = FindPartitionWriter(result.SessionId, result.TxId);
+ Y_VERIFY(writer != nullptr);
- if (AcceptedRequests.empty()) {
+ if (writer->AcceptedRequests.empty()) {
CloseSession("got too many replies from server, internal error", PersQueue::ErrorCode::ERROR, ctx);
return;
}
- const auto& writeRequest = AcceptedRequests.front();
+ const auto& writeRequest = writer->AcceptedRequests.front();
const auto& resp = result.Record.GetPartitionResponse();
if (resp.GetCookie() != writeRequest->Cookie) {
return CloseSession("out of order write response from server, may be previous is lost", PersQueue::ErrorCode::ERROR, ctx);
}
- ProcessWriteResponse(resp, ctx);
+ ProcessWriteResponse(resp, *writer, ctx);
}
template<bool UseMigrationProtocol>
@@ -1399,11 +1412,39 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvTabletPipe::TEvClientDe
template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&& ev, const TActorContext& ctx) {
- if (!PendingRequest) {
- PendingRequest = new TWriteRequestInfo(++NextRequestCookie);
+ const auto& writeRequest = ev->Request.write_request();
+
+ if constexpr (!UseMigrationProtocol) {
+ if (writeRequest.has_tx() && !AppData(ctx)->FeatureFlags.GetEnableTopicServiceTx()) {
+ CloseSession("Disabled transaction support for TopicService.",
+ PersQueue::ErrorCode::ERROR, ctx);
+ return;
+ }
}
- auto& request = PendingRequest->PartitionWriteRequest->Record;
+ if (PendingRequests.empty()) {
+ PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie));
+ } else if constexpr (!UseMigrationProtocol) {
+ Y_VERIFY(!PendingRequests.back()->UserWriteRequests.empty());
+
+ auto& last = PendingRequests.back()->UserWriteRequests.back()->Request.write_request();
+
+ if (writeRequest.has_tx()) {
+ if (last.has_tx()) {
+ if ((writeRequest.tx().session() != last.tx().session()) ||
+ (writeRequest.tx().id() != last.tx().id())) {
+ PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie));
+ }
+ } else {
+ PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie));
+ }
+ } else if (last.has_tx()) {
+ PendingRequests.emplace_back(new TWriteRequestInfo(++NextRequestCookie));
+ }
+ }
+
+ auto pendingRequest = PendingRequests.back();
+ auto& request = pendingRequest->PartitionWriteRequest->Record;
ui64 payloadSize = 0;
auto addDataMigration = [&](const StreamingWriteClientMessage::WriteRequest& writeRequest, const i32 messageIndex) {
@@ -1450,7 +1491,6 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
maxMessageMetadataSize = std::max(maxMessageMetadataSize, currMetadataSize);
};
- const auto& writeRequest = ev->Request.write_request();
if constexpr (UseMigrationProtocol) {
for (i32 messageIndex = 0; messageIndex != writeRequest.sequence_numbers_size(); ++messageIndex) {
addDataMigration(writeRequest, messageIndex);
@@ -1459,15 +1499,21 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
for (i32 messageIndex = 0; messageIndex != writeRequest.messages_size(); ++messageIndex) {
addData(writeRequest, messageIndex);
}
+
+ if (writeRequest.has_tx()) {
+ InitPartitionWriter(writeRequest.tx().session(), writeRequest.tx().id(), ctx);
+ }
}
- PendingRequest->UserWriteRequests.push_back(std::move(ev));
- PendingRequest->ByteSize = request.ByteSize();
+ pendingRequest->UserWriteRequests.push_back(std::move(ev));
+ pendingRequest->ByteSize = request.ByteSize();
+
auto msgMetaEnabled = AppData(ctx)->FeatureFlags.GetEnableTopicMessageMeta();
if (!msgMetaEnabled && maxMessageMetadataSize > 0) {
CloseSession("Message level metadata support is disabled on server size", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
+
if (maxMessageMetadataSize > MAX_METADATA_SIZE_PER_MESSAGE) {
CloseSession(
TStringBuilder() << "Message level metadata size is limited to " << MAX_METADATA_SIZE_PER_MESSAGE
@@ -1476,21 +1522,36 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
);
return;
}
+
if (const auto ru = CalcRuConsumption(payloadSize)) {
- PendingRequest->RequiredQuota += ru;
- if (MaybeRequestQuota(PendingRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx)) {
- Y_VERIFY(!PendingQuotaRequest);
- PendingQuotaRequest = std::move(PendingRequest);
+ pendingRequest->RequiredQuota += ru;
+
+ if (!PendingQuotaRequest) {
+ if (MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx)) {
+ PendingQuotaRequest = std::move(PendingRequests.front());
+ PendingRequests.pop_front();
+ }
}
} else {
- if (!PendingQuotaRequest && SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
- SendRequest(std::move(PendingRequest), ctx);
+ TPartitionWriter* writer = nullptr;
+ if constexpr (UseMigrationProtocol) {
+ Y_VERIFY(Writers.size() == 1);
+ writer = &Writers.begin()->second;
+ } else {
+ const auto& [sessionId, txId] = pendingRequest->GetTransactionId();
+ writer = FindPartitionWriter(sessionId, txId);
+ Y_VERIFY(writer != nullptr);
+ }
+
+ if (!PendingQuotaRequest && writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ SendRequest(*writer, std::move(PendingRequests.front()), ctx);
+ PendingRequests.pop_front();
}
}
}
template<bool UseMigrationProtocol>
-void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) {
+void TWriteSessionActor<UseMigrationProtocol>::SendRequest(TPartitionWriter& writer, typename TWriteRequestInfo::TPtr&& request, const TActorContext& ctx) {
Y_VERIFY(request->PartitionWriteRequest);
i64 diff = 0;
@@ -1507,8 +1568,8 @@ void TWriteSessionActor<UseMigrationProtocol>::SendRequest(typename TWriteReques
BytesInflightTotal.Inc(diff);
}
- ctx.Send(Writer, std::move(request->PartitionWriteRequest));
- SentRequests.push_back(std::move(request));
+ ctx.Send(writer.PartitionWriterActor, std::move(request->PartitionWriteRequest));
+ writer.SentRequests.push_back(std::move(request));
}
template<bool UseMigrationProtocol>
@@ -1754,21 +1815,29 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(TEvents::TEvWakeup::TPtr&
case EWakeupTag::RecheckAcl:
return RecheckACL(ctx);
- case EWakeupTag::RlAllowed:
+ case EWakeupTag::RlAllowed: {
if (auto counters = Request->GetCounters()) {
counters->AddConsumedRequestUnits(PendingQuotaRequest->RequiredQuota);
}
- if (SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
- SendRequest(std::move(PendingQuotaRequest), ctx);
+ const auto& [sessionId, txId] = PendingQuotaRequest->GetTransactionId();
+ auto* writer = FindPartitionWriter(sessionId, txId);
+ Y_VERIFY(writer != nullptr);
+
+ if (writer->SentRequests.size() < MAX_RESERVE_REQUESTS_INFLIGHT) {
+ SendRequest(*writer, std::move(PendingQuotaRequest), ctx);
} else {
- QuotedRequests.push_back(std::move(PendingQuotaRequest));
+ writer->QuotedRequests.push_back(std::move(PendingQuotaRequest));
}
- if (PendingQuotaRequest = std::move(PendingRequest)) {
- Y_VERIFY(MaybeRequestQuota(PendingQuotaRequest->RequiredQuota, EWakeupTag::RlAllowed, ctx));
+ if (!PendingRequests.empty()) {
+ Y_VERIFY(MaybeRequestQuota(PendingRequests.front()->RequiredQuota, EWakeupTag::RlAllowed, ctx));
+ PendingQuotaRequest = std::move(PendingRequests.front());
+ PendingRequests.pop_front();
}
+
break;
+ }
case EWakeupTag::RlNoResource:
case EWakeupTag::RlInitNoResource:
@@ -1803,6 +1872,74 @@ void TWriteSessionActor<UseMigrationProtocol>::RecheckACL(const TActorContext& c
}
}
+template<bool UseMigrationProtocol>
+auto TWriteSessionActor<UseMigrationProtocol>::FindPartitionWriter(const TString& sessionId, const TString& txId) -> TPartitionWriter*
+{
+ auto p = Writers.find(std::make_pair(sessionId, txId));
+ if (p == Writers.end()) {
+ return nullptr;
+ }
+ return &p->second;
}
+
+template<bool UseMigrationProtocol>
+void TWriteSessionActor<UseMigrationProtocol>::InitPartitionWriter(const TString& sessionId, const TString& txId,
+ const TActorContext& ctx)
+{
+ auto key = std::make_pair(sessionId, txId);
+ if (auto p = Writers.find(key); p != Writers.end()) {
+ return;
+ }
+
+ auto [p, _] = Writers.emplace(std::piecewise_construct,
+ std::forward_as_tuple(key),
+ std::forward_as_tuple(BytesInflight,
+ BytesInflightTotal,
+ BytesInflight_,
+ BytesInflightTotal_));
+ auto& writer = p->second;
+
+ auto it = PartitionToTablet.find(Partition);
+ ui64 tabletId = it != PartitionToTablet.end() ? it->second : 0;
+
+ TPartitionWriterOpts opts;
+ opts.WithDeduplication(UseDeduplication);
+ if constexpr (!UseMigrationProtocol) {
+ if (sessionId && txId) {
+ Y_VERIFY(Request->GetDatabaseName());
+ opts.WithDatabase(*Request->GetDatabaseName());
+ opts.WithTopicPath(InitRequest.path());
+ opts.WithSessionId(sessionId);
+ opts.WithTxId(txId);
+ if (Request->GetSerializedToken()) {
+ opts.WithToken(Request->GetSerializedToken());
+ }
+ if (Request->GetTraceId()) {
+ opts.WithTraceId(*Request->GetTraceId());
+ }
+ if (Request->GetRequestType()) {
+ opts.WithRequestType(*Request->GetRequestType());
+ }
+ }
+ }
+
+ TActorId actor = ctx.RegisterWithSameMailbox(NPQ::CreatePartitionWriter(
+ ctx.SelfID, {}, tabletId, Partition, ExpectedGeneration,
+ SourceId, opts
+ ));
+ writer.PartitionWriterActor = actor;
+}
+
+template<bool UseMigrationProtocol>
+bool TWriteSessionActor<UseMigrationProtocol>::AnyRequests() const
+{
+ for (auto& [_, writer] : Writers) {
+ if (writer.AnyRequests()) {
+ return true;
+ }
+ }
+ return false;
+}
+
}
}