summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrey Serebryanskiy <[email protected]>2025-05-23 12:47:36 +0300
committerGitHub <[email protected]>2025-05-23 12:47:36 +0300
commit94d96d2b7d5d19d6483ad161c3188578aa2e2b4e (patch)
tree69fc09574913a869a5cb44248f0c2f22ef9da4d9
parenta536360eb0b96a2bb2730e42f523ae060a8a058c (diff)
Kafka api transactions fixes (#18639)
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_balancer_actor.h2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp8
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp66
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_transaction_actor.h25
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp4
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp9
-rw-r--r--ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp16
-rw-r--r--ydb/core/kafka_proxy/kafka_transactions_coordinator.h4
-rw-r--r--ydb/core/kafka_proxy/kqp_helper.cpp20
-rw-r--r--ydb/core/kafka_proxy/kqp_helper.h4
-rw-r--r--ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp60
-rw-r--r--ydb/core/testlib/test_client.cpp2
13 files changed, 112 insertions, 110 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 3f214455536..dba3b5713a2 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2783,7 +2783,7 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
);
setup->LocalServices.emplace_back(
- NKafka::MakeTransactionsServiceID(),
+ NKafka::MakeTransactionsServiceID(NodeId),
TActorSetupCmd(NKafka::CreateTransactionsCoordinator(),
TMailboxType::HTSwap, appData->UserPoolId
)
diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h
index 6a998b69b07..79d72ba8f61 100644
--- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h
@@ -138,7 +138,7 @@ public:
InstanceId = JoinGroupRequestData->GroupInstanceId.value_or("");
MemberId = JoinGroupRequestData->MemberId.value_or("");
- KAFKA_LOG_ERROR(TStringBuilder() << "JOIN_GROUP request. MemberId# " << MemberId);
+ KAFKA_LOG_D(TStringBuilder() << "JOIN_GROUP request. MemberId# " << MemberId);
if (JoinGroupRequestData->SessionTimeoutMs) {
SessionTimeoutMs = JoinGroupRequestData->SessionTimeoutMs;
diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
index 42530a8442d..d868a35d844 100644
--- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp
@@ -45,7 +45,7 @@ namespace NKafka {
TKafkaInitProducerIdActor::TKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TString& transactionalId)
: Context(context)
, CorrelationId(correlationId)
- , TransactionalId(std::move(transactionalId)) {
+ , TransactionalId(transactionalId) {
}
void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) {
@@ -201,9 +201,9 @@ namespace NKafka {
void TKafkaInitProducerIdActor::OnSuccessfullProducerStateUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr ev) {
auto producerState = ParseProducerState(ev).value();
- PersistedProducerState = std::move(producerState);
-
SendSaveTxnProducerStateRequest(producerState);
+
+ PersistedProducerState = std::move(producerState);
}
// requests to producer_state table
@@ -282,7 +282,7 @@ namespace NKafka {
void TKafkaInitProducerIdActor::SendSaveTxnProducerStateRequest(const TProducerState& producerState) {
KAFKA_LOG_D("Sending save txn producer state request");
- Send(NKafka::MakeTransactionsServiceID(), new TEvKafka::TEvSaveTxnProducerRequest(
+ Send(NKafka::MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvSaveTxnProducerRequest(
producerState.TransactionalId,
{
producerState.ProducerId,
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
index 28a09fefa2c..b54207e065f 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
@@ -12,7 +12,7 @@
if (!ProducerInRequestIsValid(ev->Get()->Request)) { \
auto& kafkaRequest = ev->Get()->Request; \
TString message = TStringBuilder() << "Recieved invalid request. Got: " \
- << "transactionalId=" << kafkaRequest->TransactionalId->c_str() \
+ << "transactionalId=" << *kafkaRequest->TransactionalId \
<< ", producerId=" << kafkaRequest->ProducerId \
<< ", producerEpoch=" << kafkaRequest->ProducerEpoch; \
SendFailResponse<ErrorResponseType>(ev, EKafkaErrors::UNKNOWN_SERVER_ERROR, message); \
@@ -27,7 +27,7 @@ namespace NKafka {
for (auto& topicInRequest : ev->Get()->Request->Topics) {
for (auto& partitionInRequest : topicInRequest.Partitions) {
- PartitionsInTxn.emplace(GetFullTopicPath(topicInRequest.Name->c_str()), static_cast<ui32>(partitionInRequest));
+ PartitionsInTxn.emplace(GetFullTopicPath(*topicInRequest.Name), static_cast<ui32>(partitionInRequest));
}
}
SendOkResponse<TAddPartitionsToTxnResponseData>(ev);
@@ -49,11 +49,11 @@ namespace NKafka {
// save offsets for future use
for (auto& topicInRequest : ev->Get()->Request->Topics) {
for (auto& partitionInRequest : topicInRequest.Partitions) {
- TTopicPartition topicPartition = {GetFullTopicPath(topicInRequest.Name->c_str()), static_cast<ui32>(partitionInRequest.PartitionIndex)};
+ TTopicPartition topicPartition = {GetFullTopicPath(*topicInRequest.Name), static_cast<ui32>(partitionInRequest.PartitionIndex)};
TPartitionCommit newCommit{
.Partition = partitionInRequest.PartitionIndex,
.Offset = partitionInRequest.CommittedOffset,
- .ConsumerName = ev->Get()->Request->GroupId->c_str(),
+ .ConsumerName = ev->Get()->Request->GroupId.value(),
.ConsumerGeneration = ev->Get()->Request->GenerationId
};
auto it = OffsetsToCommit.find(topicPartition);
@@ -87,6 +87,7 @@ namespace NKafka {
return; // we just ignore second and subsequent requests
} else if (txnAborted) {
SendOkResponse<TEndTxnResponseData>(ev);
+ Die(ctx);
} else {
CommitStarted = true;
EndTxnRequestPtr = std::move(ev);
@@ -106,6 +107,8 @@ namespace NKafka {
return;
}
+ KqpSessionId = ev->Get()->Record.GetResponse().GetSessionId();
+
SendToKqpValidationRequests(ctx);
}
@@ -122,6 +125,9 @@ namespace NKafka {
case EKafkaTxnKqpRequests::SELECT:
HandleSelectResponse(*ev->Get(), ctx);
break;
+ case EKafkaTxnKqpRequests::ADD_KAFKA_OPERATIONS_TO_TXN:
+ HandleAddKafkaOperationsResponse(ev->Get()->Record.GetResponse().GetTxMeta().id(), ctx);
+ break;
case EKafkaTxnKqpRequests::COMMIT:
HandleCommitResponse(ctx);
break;
@@ -133,7 +139,7 @@ namespace NKafka {
void TTransactionActor::StartKqpSession(const TActorContext& ctx) {
Kqp = std::make_unique<TKqpTxHelper>(DatabasePath);
KAFKA_LOG_D("Sending create session request to KQP for database " << DatabasePath);
- Kqp->SendCreateSessionRequest(ctx, KqpActorId);
+ Kqp->SendCreateSessionRequest(ctx);
}
void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) {
@@ -143,19 +149,18 @@ namespace NKafka {
BuildSelectParams(),
++KqpCookie,
ctx,
- false,
- KqpActorId
+ false
);
LastSentToKqpRequest = EKafkaTxnKqpRequests::SELECT;
}
- void TTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) {
- auto request = BuildCommitTxnRequestToKqp(kqpTransactionId);
+ void TTransactionActor::SendAddKafkaOperationsToTxRequest(const TString& kqpTransactionId) {
+ auto request = BuildAddKafkaOperationsRequest(kqpTransactionId);
- Send(KqpActorId, request.Release(), 0, ++KqpCookie);
+ Send(MakeKqpProxyID(SelfId().NodeId()), request.Release(), 0, ++KqpCookie);
- LastSentToKqpRequest = EKafkaTxnKqpRequests::COMMIT;
+ LastSentToKqpRequest = EKafkaTxnKqpRequests::ADD_KAFKA_OPERATIONS_TO_TXN;
}
// Response senders
@@ -185,13 +190,13 @@ namespace NKafka {
if (Kqp) {
Kqp->CloseKqpSession(ctx);
}
- Send(TxnCoordinatorActorId, new TEvKafka::TEvTransactionActorDied(TransactionalId, ProducerInstanceId));
+ Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvTransactionActorDied(TransactionalId, ProducerInstanceId));
TBase::Die(ctx);
}
template<class EventType>
bool TTransactionActor::ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest) {
- return kafkaRequest->TransactionalId->c_str() == TransactionalId
+ return *kafkaRequest->TransactionalId == TransactionalId
&& kafkaRequest->ProducerId == ProducerInstanceId.Id
&& kafkaRequest->ProducerEpoch == ProducerInstanceId.Epoch;
}
@@ -236,7 +241,7 @@ namespace NKafka {
return params.Build();
}
- THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) {
+ THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TTransactionActor::BuildAddKafkaOperationsRequest(const TString& kqpTransactionId) {
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
@@ -244,13 +249,11 @@ namespace NKafka {
ev->Record.MutableRequest()->SetDatabase(DatabasePath);
ev->Record.MutableRequest()->SetSessionId(KqpSessionId);
- ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true);
ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(kqpTransactionId);
ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
auto* kafkaApiOperations = ev->Record.MutableRequest()->MutableKafkaApiOperations();
- kafkaApiOperations->set_transactionalid(TransactionalId);
kafkaApiOperations->set_producerid(ProducerInstanceId.Id);
kafkaApiOperations->set_producerepoch(ProducerInstanceId.Epoch);
@@ -265,10 +268,9 @@ namespace NKafka {
offsetInRequest->set_topicpath(partition.TopicPath);
offsetInRequest->set_partitionid(partition.PartitionId);
offsetInRequest->set_consumername(offsetDetails.ConsumerName);
- offsetInRequest->set_consumergeneration(offsetDetails.ConsumerGeneration);
offsetInRequest->set_offset(offsetDetails.Offset);
}
-
+
return ev;
}
@@ -309,10 +311,16 @@ namespace NKafka {
return;
}
- KAFKA_LOG_D("Validated producer and consumers states. Everything is alright, sending commit");
+ KAFKA_LOG_D("Validated producer and consumers states. Everything is alright, adding kafka operations to transaction.");
auto kqpTxnId = response.Record.GetResponse().GetTxMeta().id();
- // finally everything is valid and we can attempt to commit
- SendCommitTxnRequest(kqpTxnId);
+ // finally everything is valid and we can add kafka operations to transaction and attempt to commit
+ SendAddKafkaOperationsToTxRequest(kqpTxnId);
+ }
+
+ void TTransactionActor::HandleAddKafkaOperationsResponse(const TString& kqpTransactionId, const TActorContext& ctx) {
+ KAFKA_LOG_D("Successfully added kafka operations to transaction. Committing transaction.");
+ Kqp->SetTxId(kqpTransactionId);
+ Kqp->CommitTx(++KqpCookie, ctx);
}
void TTransactionActor::HandleCommitResponse(const TActorContext& ctx) {
@@ -370,12 +378,12 @@ namespace NKafka {
}
}
- /**
- * Parses the response to extract consumer group generations.
- *
- * @param response The response object containing the result set from the YDB query.
- * @return A map where keys are consumer group names and values are their corresponding generations.
- */
+ /**
+ * Parses the response to extract consumer group generations.
+ *
+ * @param response The response object containing the result set from the YDB query.
+ * @return A map where keys are consumer group names and values are their corresponding generations.
+ */
std::unordered_map<TString, i32> TTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) {
std::unordered_map<TString, i32> generationByConsumerName;
@@ -425,8 +433,10 @@ namespace NKafka {
switch (request) {
case SELECT:
return "SELECT";
+ case ADD_KAFKA_OPERATIONS_TO_TXN:
+ return "ADD_KAFKA_OPERATIONS_TO_TXN";
case COMMIT:
- return "SELECT";
+ return "COMMIT";
case NO_REQUEST:
return "NO_REQUEST";
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
index 104548248a0..ca9c859461b 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
@@ -31,19 +31,18 @@ namespace NKafka {
// This request selects up-to-date producer and consumers states from relevant tables
// After this request a check will happen, that no transaction details has expired.
SELECT,
+ // This requests just adds all transaction details (partitions, offsets) to KQP
+ ADD_KAFKA_OPERATIONS_TO_TXN,
// This request sends to KQP a command to commit transaction
- // Both these requests happen in same transaction
- COMMIT
+ COMMIT
};
// we need to exlplicitly specify kqpActorId and txnCoordinatorActorId for unit tests
- TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) :
+ TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath) :
TBase(&TTransactionActor::StateFunc),
TransactionalId(transactionalId),
ProducerInstanceId({producerId, producerEpoch}),
- DatabasePath(DatabasePath),
- TxnCoordinatorActorId(txnCoordinatorActorId),
- KqpActorId(kqpActorId) {};
+ DatabasePath(DatabasePath) {};
TStringBuilder LogPrefix() const {
return TStringBuilder() << "KafkaTransactionActor{TransactionalId=" << TransactionalId << "; ProducerId=" << ProducerInstanceId.Id << "; ProducerEpoch=" << ProducerInstanceId.Epoch << "}: ";
@@ -62,7 +61,10 @@ namespace NKafka {
HFunc(TEvents::TEvPoison, Handle);
}
} catch (const yexception& y) {
- KAFKA_LOG_CRIT(TStringBuilder() << "Critical error happened. Reason: " << y.what());
+ KAFKA_LOG_CRIT("Critical error happened. Reason: " << y.what());
+ if (EndTxnRequestPtr) {
+ SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::UNKNOWN_SERVER_ERROR, y.what());
+ }
Die(ActorContext());
}
}
@@ -82,7 +84,7 @@ namespace NKafka {
// Transaction commit logic
void StartKqpSession(const TActorContext& ctx);
void SendToKqpValidationRequests(const TActorContext& ctx);
- void SendCommitTxnRequest(const TString& kqpTransactionId);
+ void SendAddKafkaOperationsToTxRequest(const TString& kqpTransactionId);
// Response senders
template<class ErrorResponseType, class EventType>
@@ -97,8 +99,9 @@ namespace NKafka {
TString GetFullTopicPath(const TString& topicName);
TString GetYqlWithTablesNames(const TString& templateStr);
NYdb::TParams BuildSelectParams();
- THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildCommitTxnRequestToKqp(const TString& kqpTransactionId);
+ THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildAddKafkaOperationsRequest(const TString& kqpTransactionId);
void HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx);
+ void HandleAddKafkaOperationsResponse(const TString& kqpTransactionId, const TActorContext& ctx);
void HandleCommitResponse(const TActorContext& ctx);
TMaybe<TString> GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev);
TMaybe<TProducerState> ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response);
@@ -112,8 +115,6 @@ namespace NKafka {
std::unordered_set<TTopicPartition, TTopicPartitionHashFn> PartitionsInTxn = {};
const TString TransactionalId;
const TProducerInstanceId ProducerInstanceId;
- // const i64 ProducerId;
- // const i16 ProducerEpoch;
// helper fields
const TString DatabasePath;
@@ -121,11 +122,9 @@ namespace NKafka {
// In case something goes off road, we can always send error back to client
TAutoPtr<TEventHandle<TEvKafka::TEvEndTxnRequest>> EndTxnRequestPtr;
bool CommitStarted = false;
- const TActorId TxnCoordinatorActorId;
// communication with KQP
std::unique_ptr<TKqpTxHelper> Kqp;
- TActorId KqpActorId;
TString KqpSessionId;
ui64 KqpCookie = 0;
EKafkaTxnKqpRequests LastSentToKqpRequest;
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp
index e4f18f3be3a..00d9bdb3a9b 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp
@@ -12,10 +12,10 @@ namespace NKafka::NKafkaTransactionSql {
WHERE database = $Database
AND transactional_id = $TransactionalId;
- SELECT consumer_group, MAX(generation) FROM `<consumer_state_table_name>`
+ SELECT consumer_group, MAX(generation) AS generation FROM `<consumer_state_table_name>`
VIEW PRIMARY KEY
WHERE database = $Database
- AND consumer_group IN COMPACT ($ConsumerGroups)
+ AND consumer_group IN COMPACT $ConsumerGroups
GROUP BY consumer_group;
)sql";
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 004ec9428b1..4e63e1717c8 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -51,7 +51,6 @@ public:
TEvPollerReady* InactivityEvent = nullptr;
const TActorId ListenerActorId;
- const TActorId KafkaTxnCoordinatorActorId = NKafka::MakeTransactionsServiceID();
TIntrusivePtr<TSocketDescriptor> Socket;
TSocketAddressType Address;
@@ -341,7 +340,7 @@ protected:
}
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddPartitionsToTxnRequestData>& message) {
- Send(MakeTransactionsServiceID(), new TEvKafka::TEvAddPartitionsToTxnRequest(
+ Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvAddPartitionsToTxnRequest(
header->CorrelationId,
message,
Context->ConnectionId,
@@ -350,7 +349,7 @@ protected:
}
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddOffsetsToTxnRequestData>& message) {
- Send(MakeTransactionsServiceID(), new TEvKafka::TEvAddOffsetsToTxnRequest(
+ Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvAddOffsetsToTxnRequest(
header->CorrelationId,
message,
Context->ConnectionId,
@@ -359,7 +358,7 @@ protected:
}
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TTxnOffsetCommitRequestData>& message) {
- Send(MakeTransactionsServiceID(), new TEvKafka::TEvTxnOffsetCommitRequest(
+ Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvTxnOffsetCommitRequest(
header->CorrelationId,
message,
Context->ConnectionId,
@@ -368,7 +367,7 @@ protected:
}
void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TEndTxnRequestData>& message) {
- Send(MakeTransactionsServiceID(), new TEvKafka::TEvEndTxnRequest(
+ Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvEndTxnRequest(
header->CorrelationId,
message,
Context->ConnectionId,
diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp
index 95eabbe719b..a66129071a7 100644
--- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp
+++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp
@@ -83,11 +83,11 @@ namespace NKafka {
template<class ErrorResponseType, class EventType>
void TTransactionsCoordinator::HandleTransactionalRequest(TAutoPtr<TEventHandle<EventType>>& evHandle, const TActorContext& ctx) {
EventType* ev = evHandle->Get();
- KAFKA_LOG_D("Received message for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey());
+ KAFKA_LOG_D("Received message for transactionalId " << *ev->Request->TransactionalId << " and ApiKey " << ev->Request->ApiKey());
// create helper struct to simplify methods interaction
auto txnRequest = TTransactionalRequest(
- ev->Request->TransactionalId->c_str(),
+ *ev->Request->TransactionalId,
TProducerInstanceId(ev->Request->ProducerId, ev->Request->ProducerEpoch),
ev->CorrelationId,
ev->ConnectionId
@@ -111,16 +111,16 @@ namespace NKafka {
EventType* ev = evHandle->Get();
TActorId txnActorId;
- if (TxnActorByTransactionalId.contains(ev->Request->TransactionalId->c_str())) {
- txnActorId = TxnActorByTransactionalId[ev->Request->TransactionalId->c_str()];
+ if (TxnActorByTransactionalId.contains(*ev->Request->TransactionalId)) {
+ txnActorId = TxnActorByTransactionalId[*ev->Request->TransactionalId];
} else {
- txnActorId = ctx.Register(new TTransactionActor(ev->Request->TransactionalId->c_str(), ev->Request->ProducerId, ev->Request->ProducerEpoch, ev->DatabasePath, NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ctx.SelfID));
- TxnActorByTransactionalId[ev->Request->TransactionalId->c_str()] = txnActorId;
- KAFKA_LOG_D("Registered TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey());
+ txnActorId = ctx.Register(new TTransactionActor(*ev->Request->TransactionalId, ev->Request->ProducerId, ev->Request->ProducerEpoch, ev->DatabasePath));
+ TxnActorByTransactionalId[*ev->Request->TransactionalId] = txnActorId;
+ KAFKA_LOG_D("Registered TTransactionActor with id " << txnActorId << " for transactionalId " << *ev->Request->TransactionalId << " and ApiKey " << ev->Request->ApiKey());
}
TAutoPtr<IEventHandle> tmpPtr = evHandle.Release();
ctx.Forward(tmpPtr, txnActorId);
- KAFKA_LOG_D("Forwarded message to TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey());
+ KAFKA_LOG_D("Forwarded message to TTransactionActor with id " << txnActorId << " for transactionalId " << *ev->Request->TransactionalId << " and ApiKey " << ev->Request->ApiKey());
};
void TTransactionsCoordinator::DeleteTransactionActor(const TString& transactionalId) {
diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h
index 965a2f0807a..5a4d596ba54 100644
--- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h
+++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h
@@ -82,9 +82,9 @@ namespace NKafka {
return new TTransactionsCoordinator();
};
- inline TActorId MakeTransactionsServiceID() {
+ inline TActorId MakeTransactionsServiceID(ui32 nodeId) {
static const char x[12] = "kafka_txns";
- return TActorId(0, TStringBuf(x, 12));
+ return TActorId(nodeId, TStringBuf(x, 12));
};
} // namespace NKafka \ No newline at end of file
diff --git a/ydb/core/kafka_proxy/kqp_helper.cpp b/ydb/core/kafka_proxy/kqp_helper.cpp
index 0ca2e3c1ec6..a592138b162 100644
--- a/ydb/core/kafka_proxy/kqp_helper.cpp
+++ b/ydb/core/kafka_proxy/kqp_helper.cpp
@@ -9,15 +9,9 @@ TKqpTxHelper::TKqpTxHelper(TString database)
: DataBase(database)
{}
-void TKqpTxHelper::SendCreateSessionRequest(const TActorContext& ctx, const TMaybe<TActorId>& kqpActorId) {
+void TKqpTxHelper::SendCreateSessionRequest(const TActorContext& ctx) {
auto ev = MakeCreateSessionRequest();
- TActorId actorId;
- if (!kqpActorId) {
- actorId = MakeKqpProxyID(ctx.SelfID.NodeId());
- } else {
- actorId = *kqpActorId;
- }
- ctx.Send(actorId, ev.Release(), 0, 0);
+ ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0);
}
void TKqpTxHelper::BeginTransaction(ui64 cookie, const TActorContext& ctx) {
@@ -68,7 +62,7 @@ void TKqpTxHelper::SendRequest(THolder<TEvKqp::TEvQueryRequest> request, ui64 co
ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), request.Release(), 0, cookie);
}
-void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit, const TMaybe<TActorId>& kqpActorId) {
+void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit) {
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
@@ -87,13 +81,7 @@ void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlPa
ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(sqlParams)));
- TActorId actorId;
- if (!kqpActorId) {
- actorId = MakeKqpProxyID(ctx.SelfID.NodeId());
- } else {
- actorId = *kqpActorId;
- }
- ctx.Send(actorId, ev.Release(), 0, cookie);
+ ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, cookie);
}
void TKqpTxHelper::CommitTx(ui64 cookie, const TActorContext& ctx) {
diff --git a/ydb/core/kafka_proxy/kqp_helper.h b/ydb/core/kafka_proxy/kqp_helper.h
index ea22719b8e1..42ce77bc05a 100644
--- a/ydb/core/kafka_proxy/kqp_helper.h
+++ b/ydb/core/kafka_proxy/kqp_helper.h
@@ -15,13 +15,13 @@ class TKqpTxHelper {
public:
TKqpTxHelper(TString database);
// kqp actor id can be changed for testing purposes
- void SendCreateSessionRequest(const TActorContext& ctx, const TMaybe<TActorId>& kqpActorId = {});
+ void SendCreateSessionRequest(const TActorContext& ctx);
void BeginTransaction(ui64 cookie, const TActorContext& ctx);
bool HandleCreateSessionResponse(TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx);
void CloseKqpSession(const TActorContext& ctx);
void SendRequest(THolder<TEvKqp::TEvQueryRequest> request, ui64 cookie, const TActorContext& ctx);
void CommitTx(ui64 cookie, const TActorContext& ctx);
- void SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit = false, const TMaybe<TActorId>& kqpActorId = {});
+ void SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit = false);
void SendInitTableRequest(const TActorContext& ctx, std::shared_ptr<NKikimr::NMetadata::IClassBehaviour> prepareManager);
void SetTxId(const TString& txId);
void ResetTxId();
diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp
index 0249adfb9a0..a240328ca23 100644
--- a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp
@@ -1,5 +1,6 @@
#include <ydb/core/kafka_proxy/actors/kafka_transaction_actor.h>
#include <ydb/core/kafka_proxy/kafka_events.h>
+#include <ydb/core/kafka_proxy/kafka_transactions_coordinator.h>
#include <library/cpp/testing/unittest/registar.h>
#include <util/generic/fwd.h>
@@ -45,7 +46,10 @@ namespace {
THolder<NKqp::TEvKqp::TEvQueryResponse> response;
if (ev->Get()->Record.GetRequest().GetTxControl().commit_tx()) {
Cout << "Sending response on commit from dummy kqp" << Endl;
- response = MakeResponseOnCommit();
+ response = MakeSimpleSuccessResponse();
+ } else if (ev->Get()->Record.GetRequest().HasKafkaApiOperations()) {
+ Cout << "Sending response on add kafka operations from dummy kqp" << Endl;
+ response = MakeSimpleSuccessResponse();
} else {
Cout << "Sending response on select from dummy kqp" << Endl;
response = MakeResponseOnSelectFromKqp();
@@ -59,7 +63,7 @@ namespace {
));
}
- THolder<NKqp::TEvKqp::TEvQueryResponse> MakeResponseOnCommit() {
+ THolder<NKqp::TEvKqp::TEvQueryResponse> MakeSimpleSuccessResponse() {
auto response = MakeHolder<NKqp::TEvKqp::TEvQueryResponse>();
NKikimrKqp::TEvQueryResponse record;
if (ReturnSuccessOnCommit) {
@@ -176,7 +180,6 @@ namespace {
};
struct TQueryRequestMatcher {
- bool CommitTx;
TVector<TTopicPartitions> TopicPartitions;
TVector<TConsumerCommitMatcher> ConsumerCommitMatchers;
};
@@ -210,13 +213,13 @@ namespace {
Ctx->Runtime->SetLogPriority(NKikimrServices::KAFKA_PROXY, NLog::PRI_DEBUG);
DummyKqpActor = new TDummyKqpActor();
KqpActorId = Ctx->Runtime->Register(DummyKqpActor);
+ Ctx->Runtime->RegisterService(MakeKqpProxyID(Ctx->Runtime->GetNodeId()), KqpActorId);
+ Ctx->Runtime->RegisterService(MakeTransactionsServiceID(Ctx->Runtime->GetNodeId()), Ctx->Edge);
ActorId = Ctx->Runtime->Register(new TTransactionActor(
TransactionalId,
ProducerId,
ProducerEpoch,
- Database,
- KqpActorId,
- Ctx->Edge
+ Database
));
DummyKqpActor->SetValidationResponse(TransactionalId, ProducerId, ProducerEpoch);
}
@@ -291,7 +294,7 @@ namespace {
// Arguments:
// 1. callback - function, that will be called on recieving the response from KQP om commit request
// 2. consumerGenerationsToReturnInValidationRequest - map of consumer name to its generation to ensure proper validation of consumer state by actor
- void AddObserverForCommitRequestToKqp(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, std::unordered_map<TString, i32> consumerGenerationsToReturnInValidationRequest = {}) {
+ void AddObserverForAddOperationsRequest(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, std::unordered_map<TString, i32> consumerGenerationsToReturnInValidationRequest = {}) {
DummyKqpActor->SetValidationResponse(TransactionalId, ProducerId, ProducerEpoch, consumerGenerationsToReturnInValidationRequest);
auto observer = [callback = std::move(callback), this](TAutoPtr<IEventHandle>& input) {
@@ -314,8 +317,7 @@ namespace {
void MatchQueryRequest(const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request, const TQueryRequestMatcher& matcher) {
UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetTxControl().begin_tx().has_serializable_read_write(), false);
- UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetTxControl().commit_tx(), matcher.CommitTx);
- UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetKafkaApiOperations().GetTransactionalId(), TransactionalId);
+ UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetAction(), NKikimrKqp::QUERY_ACTION_TOPIC);
UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetKafkaApiOperations().GetProducerId(), ProducerId);
UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetKafkaApiOperations().GetProducerEpoch(), ProducerEpoch);
@@ -364,7 +366,6 @@ namespace {
UNIT_ASSERT_VALUES_EQUAL(it->GetPartitionId(), partitionOffset.first);
UNIT_ASSERT_VALUES_EQUAL(it->GetOffset(), partitionOffset.second);
UNIT_ASSERT_VALUES_EQUAL(it->GetConsumerName(), consumerCommitMatcher.ConsumerName);
- UNIT_ASSERT_VALUES_EQUAL(it->GetConsumerGeneration(), consumerCommitMatcher.GenerationId);
}
}
}
@@ -379,8 +380,8 @@ namespace {
Y_UNIT_TEST(OnAddPartitionsAndEndTxn_shouldSendTxnToKqpWithSpecifiedPartitions) {
TVector<TTopicPartitions> topics = {{"topic1", {0, 1}}, {"topic2", {0}}};
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
- MatchQueryRequest(request, {true, topics, {}});
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ MatchQueryRequest(request, {topics, {}});
seenEvent = true;
});
@@ -419,11 +420,11 @@ namespace {
UNIT_ASSERT_EQUAL(message.Results[1].Results[0].ErrorCode, NKafka::EKafkaErrors::NONE_ERROR);
}
- Y_UNIT_TEST(OnDoubleAddPartitionsWithSamePartitionsAndEndTxn_shouldSendTxnToKqpWithOnceSpecifiedPartitions) {
+ Y_UNIT_TEST(OnDoubleAddPartitionsWithSamePartitionsAndEndTxn_shouldSendTxnToKqpKafkaOperationsWithOnceSpecifiedPartitions) {
TVector<TTopicPartitions> topics = {{"topic1", {0}}};
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
- MatchQueryRequest(request, {true, topics, {}});
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ MatchQueryRequest(request, {topics, {}});
seenEvent = true;
});
@@ -439,12 +440,12 @@ namespace {
UNIT_ASSERT(seenEvent);
}
- Y_UNIT_TEST(OnDoubleAddPartitionsWithDifferentPartitionsAndEndTxn_shouldSendTxnToKqpWithAllSpecifiedPartitions) {
+ Y_UNIT_TEST(OnDoubleAddPartitionsWithDifferentPartitionsAndEndTxn_shouldSendTxnToKqKafkaOperationspWithAllSpecifiedPartitions) {
TVector<TTopicPartitions> topics1 = {{"topic1", {0}}};
TVector<TTopicPartitions> topics2 = {{"topic2", {0}}};
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
- MatchQueryRequest(request, {true, {{"topic1", {0}}, {"topic2", {0}}}, {}});
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ MatchQueryRequest(request, {{{"topic1", {0}}, {"topic2", {0}}}, {}});
seenEvent = true;
});
@@ -469,8 +470,8 @@ namespace {
std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp;
consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGeneration;
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
- MatchQueryRequest(request, {true, {}, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}});
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ MatchQueryRequest(request, {{}, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}});
seenEvent = true;
}, consumerGenerationByNameToReturnFromKqp);
@@ -493,8 +494,8 @@ namespace {
std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp;
consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGenerationFromTable;
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
- MatchQueryRequest(request, {true, {}, {{consumerName, consumerGenerationFromTable, partitionOffsetsToCommitByTopic}}});
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ MatchQueryRequest(request, {{}, {{consumerName, consumerGenerationFromTable, partitionOffsetsToCommitByTopic}}});
seenEvent = true;
}, consumerGenerationByNameToReturnFromKqp);
@@ -522,9 +523,9 @@ namespace {
std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp;
consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGeneration;
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
// we validate that to KQP are sent offsets from second request
- MatchQueryRequest(request, {true, {}, {{consumerName, consumerGeneration, secondRequestOffsetsByTopic}}});
+ MatchQueryRequest(request, {{}, {{consumerName, consumerGeneration, secondRequestOffsetsByTopic}}});
seenEvent = true;
}, consumerGenerationByNameToReturnFromKqp);
@@ -569,7 +570,7 @@ namespace {
UNIT_ASSERT_EQUAL(message.Topics[1].Partitions[1].ErrorCode, NKafka::EKafkaErrors::NONE_ERROR);
}
- Y_UNIT_TEST(OnAddOffsetsToTxnAndAddPartitionsAndEndTxn_shouldSendToKqpCorrectTxn) {
+ Y_UNIT_TEST(OnAddOffsetsToTxnAndAddPartitionsAndEndTxn_shouldSendToKqpCorrectAddKafkaOperationsRequest) {
TVector<TTopicPartitions> topics = {{"topic3", {0}}};
TString consumerName = "my-consumer";
i32 consumerGeneration = 0;
@@ -579,8 +580,8 @@ namespace {
std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp;
consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGeneration;
bool seenEvent = false;
- AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
- MatchQueryRequest(request, {true, topics, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}});
+ AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) {
+ MatchQueryRequest(request, {topics, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}});
seenEvent = true;
}, consumerGenerationByNameToReturnFromKqp);
@@ -607,6 +608,11 @@ namespace {
const auto& result = static_cast<const NKafka::TEndTxnResponseData&>(*response->Response);
UNIT_ASSERT_VALUES_EQUAL(response->CorrelationId, correlationId);
UNIT_ASSERT_VALUES_EQUAL(result.ErrorCode, NKafka::EKafkaErrors::NONE_ERROR);
+ auto txnActorDiedEvent = Ctx->Runtime->GrabEdgeEvent<NKafka::TEvKafka::TEvTransactionActorDied>();
+ UNIT_ASSERT(txnActorDiedEvent != nullptr);
+ UNIT_ASSERT_VALUES_EQUAL(txnActorDiedEvent->TransactionalId, TransactionalId);
+ UNIT_ASSERT_VALUES_EQUAL(txnActorDiedEvent->ProducerState.Id, ProducerId);
+ UNIT_ASSERT_VALUES_EQUAL(txnActorDiedEvent->ProducerState.Epoch, ProducerEpoch);
}
Y_UNIT_TEST(OnEndTxnWithCommitAndAbortFromTxn_shouldReturnBROKER_NOT_AVAILABLE) {
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index f9c2f1fa224..bea993525be 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -1453,7 +1453,7 @@ namespace Tests {
Runtime->RegisterService(NKafka::MakeKafkaDiscoveryCacheID(), discoveryCacheId, nodeIdx);
TActorId kafkaTxnCoordinatorActorId = Runtime->Register(NKafka::CreateTransactionsCoordinator(), nodeIdx, userPoolId);
- Runtime->RegisterService(NKafka::MakeTransactionsServiceID(), kafkaTxnCoordinatorActorId, nodeIdx);
+ Runtime->RegisterService(NKafka::MakeTransactionsServiceID(Runtime->GetNodeId(nodeIdx)), kafkaTxnCoordinatorActorId, nodeIdx);
NKafka::TListenerSettings settings;
settings.Port = Settings->AppConfig->GetKafkaProxyConfig().GetListeningPort();