diff options
| author | Andrey Serebryanskiy <[email protected]> | 2025-05-23 12:47:36 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2025-05-23 12:47:36 +0300 |
| commit | 94d96d2b7d5d19d6483ad161c3188578aa2e2b4e (patch) | |
| tree | 69fc09574913a869a5cb44248f0c2f22ef9da4d9 | |
| parent | a536360eb0b96a2bb2730e42f523ae060a8a058c (diff) | |
Kafka api transactions fixes (#18639)
| -rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 2 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_balancer_actor.h | 2 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp | 8 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp | 66 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor.h | 25 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp | 4 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 9 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp | 16 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kafka_transactions_coordinator.h | 4 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kqp_helper.cpp | 20 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/kqp_helper.h | 4 | ||||
| -rw-r--r-- | ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp | 60 | ||||
| -rw-r--r-- | ydb/core/testlib/test_client.cpp | 2 |
13 files changed, 112 insertions, 110 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 3f214455536..dba3b5713a2 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2783,7 +2783,7 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu ); setup->LocalServices.emplace_back( - NKafka::MakeTransactionsServiceID(), + NKafka::MakeTransactionsServiceID(NodeId), TActorSetupCmd(NKafka::CreateTransactionsCoordinator(), TMailboxType::HTSwap, appData->UserPoolId ) diff --git a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h index 6a998b69b07..79d72ba8f61 100644 --- a/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_balancer_actor.h @@ -138,7 +138,7 @@ public: InstanceId = JoinGroupRequestData->GroupInstanceId.value_or(""); MemberId = JoinGroupRequestData->MemberId.value_or(""); - KAFKA_LOG_ERROR(TStringBuilder() << "JOIN_GROUP request. MemberId# " << MemberId); + KAFKA_LOG_D(TStringBuilder() << "JOIN_GROUP request. MemberId# " << MemberId); if (JoinGroupRequestData->SessionTimeoutMs) { SessionTimeoutMs = JoinGroupRequestData->SessionTimeoutMs; diff --git a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp index 42530a8442d..d868a35d844 100644 --- a/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_init_producer_id_actor.cpp @@ -45,7 +45,7 @@ namespace NKafka { TKafkaInitProducerIdActor::TKafkaInitProducerIdActor(const TContext::TPtr context, const ui64 correlationId, const TString& transactionalId) : Context(context) , CorrelationId(correlationId) - , TransactionalId(std::move(transactionalId)) { + , TransactionalId(transactionalId) { } void TKafkaInitProducerIdActor::Bootstrap(const NActors::TActorContext& ctx) { @@ -201,9 +201,9 @@ namespace NKafka { void TKafkaInitProducerIdActor::OnSuccessfullProducerStateUpdate(NKqp::TEvKqp::TEvQueryResponse::TPtr ev) { auto producerState = ParseProducerState(ev).value(); - PersistedProducerState = std::move(producerState); - SendSaveTxnProducerStateRequest(producerState); + + PersistedProducerState = std::move(producerState); } // requests to producer_state table @@ -282,7 +282,7 @@ namespace NKafka { void TKafkaInitProducerIdActor::SendSaveTxnProducerStateRequest(const TProducerState& producerState) { KAFKA_LOG_D("Sending save txn producer state request"); - Send(NKafka::MakeTransactionsServiceID(), new TEvKafka::TEvSaveTxnProducerRequest( + Send(NKafka::MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvSaveTxnProducerRequest( producerState.TransactionalId, { producerState.ProducerId, diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp index 28a09fefa2c..b54207e065f 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp @@ -12,7 +12,7 @@ if (!ProducerInRequestIsValid(ev->Get()->Request)) { \ auto& kafkaRequest = ev->Get()->Request; \ TString message = TStringBuilder() << "Recieved invalid request. Got: " \ - << "transactionalId=" << kafkaRequest->TransactionalId->c_str() \ + << "transactionalId=" << *kafkaRequest->TransactionalId \ << ", producerId=" << kafkaRequest->ProducerId \ << ", producerEpoch=" << kafkaRequest->ProducerEpoch; \ SendFailResponse<ErrorResponseType>(ev, EKafkaErrors::UNKNOWN_SERVER_ERROR, message); \ @@ -27,7 +27,7 @@ namespace NKafka { for (auto& topicInRequest : ev->Get()->Request->Topics) { for (auto& partitionInRequest : topicInRequest.Partitions) { - PartitionsInTxn.emplace(GetFullTopicPath(topicInRequest.Name->c_str()), static_cast<ui32>(partitionInRequest)); + PartitionsInTxn.emplace(GetFullTopicPath(*topicInRequest.Name), static_cast<ui32>(partitionInRequest)); } } SendOkResponse<TAddPartitionsToTxnResponseData>(ev); @@ -49,11 +49,11 @@ namespace NKafka { // save offsets for future use for (auto& topicInRequest : ev->Get()->Request->Topics) { for (auto& partitionInRequest : topicInRequest.Partitions) { - TTopicPartition topicPartition = {GetFullTopicPath(topicInRequest.Name->c_str()), static_cast<ui32>(partitionInRequest.PartitionIndex)}; + TTopicPartition topicPartition = {GetFullTopicPath(*topicInRequest.Name), static_cast<ui32>(partitionInRequest.PartitionIndex)}; TPartitionCommit newCommit{ .Partition = partitionInRequest.PartitionIndex, .Offset = partitionInRequest.CommittedOffset, - .ConsumerName = ev->Get()->Request->GroupId->c_str(), + .ConsumerName = ev->Get()->Request->GroupId.value(), .ConsumerGeneration = ev->Get()->Request->GenerationId }; auto it = OffsetsToCommit.find(topicPartition); @@ -87,6 +87,7 @@ namespace NKafka { return; // we just ignore second and subsequent requests } else if (txnAborted) { SendOkResponse<TEndTxnResponseData>(ev); + Die(ctx); } else { CommitStarted = true; EndTxnRequestPtr = std::move(ev); @@ -106,6 +107,8 @@ namespace NKafka { return; } + KqpSessionId = ev->Get()->Record.GetResponse().GetSessionId(); + SendToKqpValidationRequests(ctx); } @@ -122,6 +125,9 @@ namespace NKafka { case EKafkaTxnKqpRequests::SELECT: HandleSelectResponse(*ev->Get(), ctx); break; + case EKafkaTxnKqpRequests::ADD_KAFKA_OPERATIONS_TO_TXN: + HandleAddKafkaOperationsResponse(ev->Get()->Record.GetResponse().GetTxMeta().id(), ctx); + break; case EKafkaTxnKqpRequests::COMMIT: HandleCommitResponse(ctx); break; @@ -133,7 +139,7 @@ namespace NKafka { void TTransactionActor::StartKqpSession(const TActorContext& ctx) { Kqp = std::make_unique<TKqpTxHelper>(DatabasePath); KAFKA_LOG_D("Sending create session request to KQP for database " << DatabasePath); - Kqp->SendCreateSessionRequest(ctx, KqpActorId); + Kqp->SendCreateSessionRequest(ctx); } void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) { @@ -143,19 +149,18 @@ namespace NKafka { BuildSelectParams(), ++KqpCookie, ctx, - false, - KqpActorId + false ); LastSentToKqpRequest = EKafkaTxnKqpRequests::SELECT; } - void TTransactionActor::SendCommitTxnRequest(const TString& kqpTransactionId) { - auto request = BuildCommitTxnRequestToKqp(kqpTransactionId); + void TTransactionActor::SendAddKafkaOperationsToTxRequest(const TString& kqpTransactionId) { + auto request = BuildAddKafkaOperationsRequest(kqpTransactionId); - Send(KqpActorId, request.Release(), 0, ++KqpCookie); + Send(MakeKqpProxyID(SelfId().NodeId()), request.Release(), 0, ++KqpCookie); - LastSentToKqpRequest = EKafkaTxnKqpRequests::COMMIT; + LastSentToKqpRequest = EKafkaTxnKqpRequests::ADD_KAFKA_OPERATIONS_TO_TXN; } // Response senders @@ -185,13 +190,13 @@ namespace NKafka { if (Kqp) { Kqp->CloseKqpSession(ctx); } - Send(TxnCoordinatorActorId, new TEvKafka::TEvTransactionActorDied(TransactionalId, ProducerInstanceId)); + Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvTransactionActorDied(TransactionalId, ProducerInstanceId)); TBase::Die(ctx); } template<class EventType> bool TTransactionActor::ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest) { - return kafkaRequest->TransactionalId->c_str() == TransactionalId + return *kafkaRequest->TransactionalId == TransactionalId && kafkaRequest->ProducerId == ProducerInstanceId.Id && kafkaRequest->ProducerEpoch == ProducerInstanceId.Epoch; } @@ -236,7 +241,7 @@ namespace NKafka { return params.Build(); } - THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TTransactionActor::BuildCommitTxnRequestToKqp(const TString& kqpTransactionId) { + THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> TTransactionActor::BuildAddKafkaOperationsRequest(const TString& kqpTransactionId) { auto ev = MakeHolder<TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED); @@ -244,13 +249,11 @@ namespace NKafka { ev->Record.MutableRequest()->SetDatabase(DatabasePath); ev->Record.MutableRequest()->SetSessionId(KqpSessionId); - ev->Record.MutableRequest()->MutableTxControl()->set_commit_tx(true); ev->Record.MutableRequest()->MutableTxControl()->set_tx_id(kqpTransactionId); ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); auto* kafkaApiOperations = ev->Record.MutableRequest()->MutableKafkaApiOperations(); - kafkaApiOperations->set_transactionalid(TransactionalId); kafkaApiOperations->set_producerid(ProducerInstanceId.Id); kafkaApiOperations->set_producerepoch(ProducerInstanceId.Epoch); @@ -265,10 +268,9 @@ namespace NKafka { offsetInRequest->set_topicpath(partition.TopicPath); offsetInRequest->set_partitionid(partition.PartitionId); offsetInRequest->set_consumername(offsetDetails.ConsumerName); - offsetInRequest->set_consumergeneration(offsetDetails.ConsumerGeneration); offsetInRequest->set_offset(offsetDetails.Offset); } - + return ev; } @@ -309,10 +311,16 @@ namespace NKafka { return; } - KAFKA_LOG_D("Validated producer and consumers states. Everything is alright, sending commit"); + KAFKA_LOG_D("Validated producer and consumers states. Everything is alright, adding kafka operations to transaction."); auto kqpTxnId = response.Record.GetResponse().GetTxMeta().id(); - // finally everything is valid and we can attempt to commit - SendCommitTxnRequest(kqpTxnId); + // finally everything is valid and we can add kafka operations to transaction and attempt to commit + SendAddKafkaOperationsToTxRequest(kqpTxnId); + } + + void TTransactionActor::HandleAddKafkaOperationsResponse(const TString& kqpTransactionId, const TActorContext& ctx) { + KAFKA_LOG_D("Successfully added kafka operations to transaction. Committing transaction."); + Kqp->SetTxId(kqpTransactionId); + Kqp->CommitTx(++KqpCookie, ctx); } void TTransactionActor::HandleCommitResponse(const TActorContext& ctx) { @@ -370,12 +378,12 @@ namespace NKafka { } } - /** - * Parses the response to extract consumer group generations. - * - * @param response The response object containing the result set from the YDB query. - * @return A map where keys are consumer group names and values are their corresponding generations. - */ + /** + * Parses the response to extract consumer group generations. + * + * @param response The response object containing the result set from the YDB query. + * @return A map where keys are consumer group names and values are their corresponding generations. + */ std::unordered_map<TString, i32> TTransactionActor::ParseConsumersGenerations(const NKqp::TEvKqp::TEvQueryResponse& response) { std::unordered_map<TString, i32> generationByConsumerName; @@ -425,8 +433,10 @@ namespace NKafka { switch (request) { case SELECT: return "SELECT"; + case ADD_KAFKA_OPERATIONS_TO_TXN: + return "ADD_KAFKA_OPERATIONS_TO_TXN"; case COMMIT: - return "SELECT"; + return "COMMIT"; case NO_REQUEST: return "NO_REQUEST"; } diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h index 104548248a0..ca9c859461b 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h @@ -31,19 +31,18 @@ namespace NKafka { // This request selects up-to-date producer and consumers states from relevant tables // After this request a check will happen, that no transaction details has expired. SELECT, + // This requests just adds all transaction details (partitions, offsets) to KQP + ADD_KAFKA_OPERATIONS_TO_TXN, // This request sends to KQP a command to commit transaction - // Both these requests happen in same transaction - COMMIT + COMMIT }; // we need to exlplicitly specify kqpActorId and txnCoordinatorActorId for unit tests - TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath, const TActorId& kqpActorId, const TActorId& txnCoordinatorActorId) : + TTransactionActor(const TString& transactionalId, i64 producerId, i16 producerEpoch, const TString& DatabasePath) : TBase(&TTransactionActor::StateFunc), TransactionalId(transactionalId), ProducerInstanceId({producerId, producerEpoch}), - DatabasePath(DatabasePath), - TxnCoordinatorActorId(txnCoordinatorActorId), - KqpActorId(kqpActorId) {}; + DatabasePath(DatabasePath) {}; TStringBuilder LogPrefix() const { return TStringBuilder() << "KafkaTransactionActor{TransactionalId=" << TransactionalId << "; ProducerId=" << ProducerInstanceId.Id << "; ProducerEpoch=" << ProducerInstanceId.Epoch << "}: "; @@ -62,7 +61,10 @@ namespace NKafka { HFunc(TEvents::TEvPoison, Handle); } } catch (const yexception& y) { - KAFKA_LOG_CRIT(TStringBuilder() << "Critical error happened. Reason: " << y.what()); + KAFKA_LOG_CRIT("Critical error happened. Reason: " << y.what()); + if (EndTxnRequestPtr) { + SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::UNKNOWN_SERVER_ERROR, y.what()); + } Die(ActorContext()); } } @@ -82,7 +84,7 @@ namespace NKafka { // Transaction commit logic void StartKqpSession(const TActorContext& ctx); void SendToKqpValidationRequests(const TActorContext& ctx); - void SendCommitTxnRequest(const TString& kqpTransactionId); + void SendAddKafkaOperationsToTxRequest(const TString& kqpTransactionId); // Response senders template<class ErrorResponseType, class EventType> @@ -97,8 +99,9 @@ namespace NKafka { TString GetFullTopicPath(const TString& topicName); TString GetYqlWithTablesNames(const TString& templateStr); NYdb::TParams BuildSelectParams(); - THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildCommitTxnRequestToKqp(const TString& kqpTransactionId); + THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildAddKafkaOperationsRequest(const TString& kqpTransactionId); void HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx); + void HandleAddKafkaOperationsResponse(const TString& kqpTransactionId, const TActorContext& ctx); void HandleCommitResponse(const TActorContext& ctx); TMaybe<TString> GetErrorFromYdbResponse(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev); TMaybe<TProducerState> ParseProducerState(const NKqp::TEvKqp::TEvQueryResponse& response); @@ -112,8 +115,6 @@ namespace NKafka { std::unordered_set<TTopicPartition, TTopicPartitionHashFn> PartitionsInTxn = {}; const TString TransactionalId; const TProducerInstanceId ProducerInstanceId; - // const i64 ProducerId; - // const i16 ProducerEpoch; // helper fields const TString DatabasePath; @@ -121,11 +122,9 @@ namespace NKafka { // In case something goes off road, we can always send error back to client TAutoPtr<TEventHandle<TEvKafka::TEvEndTxnRequest>> EndTxnRequestPtr; bool CommitStarted = false; - const TActorId TxnCoordinatorActorId; // communication with KQP std::unique_ptr<TKqpTxHelper> Kqp; - TActorId KqpActorId; TString KqpSessionId; ui64 KqpCookie = 0; EKafkaTxnKqpRequests LastSentToKqpRequest; diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp index e4f18f3be3a..00d9bdb3a9b 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp @@ -12,10 +12,10 @@ namespace NKafka::NKafkaTransactionSql { WHERE database = $Database AND transactional_id = $TransactionalId; - SELECT consumer_group, MAX(generation) FROM `<consumer_state_table_name>` + SELECT consumer_group, MAX(generation) AS generation FROM `<consumer_state_table_name>` VIEW PRIMARY KEY WHERE database = $Database - AND consumer_group IN COMPACT ($ConsumerGroups) + AND consumer_group IN COMPACT $ConsumerGroups GROUP BY consumer_group; )sql"; diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 004ec9428b1..4e63e1717c8 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -51,7 +51,6 @@ public: TEvPollerReady* InactivityEvent = nullptr; const TActorId ListenerActorId; - const TActorId KafkaTxnCoordinatorActorId = NKafka::MakeTransactionsServiceID(); TIntrusivePtr<TSocketDescriptor> Socket; TSocketAddressType Address; @@ -341,7 +340,7 @@ protected: } void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddPartitionsToTxnRequestData>& message) { - Send(MakeTransactionsServiceID(), new TEvKafka::TEvAddPartitionsToTxnRequest( + Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvAddPartitionsToTxnRequest( header->CorrelationId, message, Context->ConnectionId, @@ -350,7 +349,7 @@ protected: } void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TAddOffsetsToTxnRequestData>& message) { - Send(MakeTransactionsServiceID(), new TEvKafka::TEvAddOffsetsToTxnRequest( + Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvAddOffsetsToTxnRequest( header->CorrelationId, message, Context->ConnectionId, @@ -359,7 +358,7 @@ protected: } void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TTxnOffsetCommitRequestData>& message) { - Send(MakeTransactionsServiceID(), new TEvKafka::TEvTxnOffsetCommitRequest( + Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvTxnOffsetCommitRequest( header->CorrelationId, message, Context->ConnectionId, @@ -368,7 +367,7 @@ protected: } void HandleMessage(const TRequestHeaderData* header, const TMessagePtr<TEndTxnRequestData>& message) { - Send(MakeTransactionsServiceID(), new TEvKafka::TEvEndTxnRequest( + Send(MakeTransactionsServiceID(SelfId().NodeId()), new TEvKafka::TEvEndTxnRequest( header->CorrelationId, message, Context->ConnectionId, diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp index 95eabbe719b..a66129071a7 100644 --- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp +++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.cpp @@ -83,11 +83,11 @@ namespace NKafka { template<class ErrorResponseType, class EventType> void TTransactionsCoordinator::HandleTransactionalRequest(TAutoPtr<TEventHandle<EventType>>& evHandle, const TActorContext& ctx) { EventType* ev = evHandle->Get(); - KAFKA_LOG_D("Received message for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); + KAFKA_LOG_D("Received message for transactionalId " << *ev->Request->TransactionalId << " and ApiKey " << ev->Request->ApiKey()); // create helper struct to simplify methods interaction auto txnRequest = TTransactionalRequest( - ev->Request->TransactionalId->c_str(), + *ev->Request->TransactionalId, TProducerInstanceId(ev->Request->ProducerId, ev->Request->ProducerEpoch), ev->CorrelationId, ev->ConnectionId @@ -111,16 +111,16 @@ namespace NKafka { EventType* ev = evHandle->Get(); TActorId txnActorId; - if (TxnActorByTransactionalId.contains(ev->Request->TransactionalId->c_str())) { - txnActorId = TxnActorByTransactionalId[ev->Request->TransactionalId->c_str()]; + if (TxnActorByTransactionalId.contains(*ev->Request->TransactionalId)) { + txnActorId = TxnActorByTransactionalId[*ev->Request->TransactionalId]; } else { - txnActorId = ctx.Register(new TTransactionActor(ev->Request->TransactionalId->c_str(), ev->Request->ProducerId, ev->Request->ProducerEpoch, ev->DatabasePath, NKikimr::NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ctx.SelfID)); - TxnActorByTransactionalId[ev->Request->TransactionalId->c_str()] = txnActorId; - KAFKA_LOG_D("Registered TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); + txnActorId = ctx.Register(new TTransactionActor(*ev->Request->TransactionalId, ev->Request->ProducerId, ev->Request->ProducerEpoch, ev->DatabasePath)); + TxnActorByTransactionalId[*ev->Request->TransactionalId] = txnActorId; + KAFKA_LOG_D("Registered TTransactionActor with id " << txnActorId << " for transactionalId " << *ev->Request->TransactionalId << " and ApiKey " << ev->Request->ApiKey()); } TAutoPtr<IEventHandle> tmpPtr = evHandle.Release(); ctx.Forward(tmpPtr, txnActorId); - KAFKA_LOG_D("Forwarded message to TTransactionActor with id " << txnActorId << " for transactionalId " << ev->Request->TransactionalId->c_str() << " and ApiKey " << ev->Request->ApiKey()); + KAFKA_LOG_D("Forwarded message to TTransactionActor with id " << txnActorId << " for transactionalId " << *ev->Request->TransactionalId << " and ApiKey " << ev->Request->ApiKey()); }; void TTransactionsCoordinator::DeleteTransactionActor(const TString& transactionalId) { diff --git a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h index 965a2f0807a..5a4d596ba54 100644 --- a/ydb/core/kafka_proxy/kafka_transactions_coordinator.h +++ b/ydb/core/kafka_proxy/kafka_transactions_coordinator.h @@ -82,9 +82,9 @@ namespace NKafka { return new TTransactionsCoordinator(); }; - inline TActorId MakeTransactionsServiceID() { + inline TActorId MakeTransactionsServiceID(ui32 nodeId) { static const char x[12] = "kafka_txns"; - return TActorId(0, TStringBuf(x, 12)); + return TActorId(nodeId, TStringBuf(x, 12)); }; } // namespace NKafka
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/kqp_helper.cpp b/ydb/core/kafka_proxy/kqp_helper.cpp index 0ca2e3c1ec6..a592138b162 100644 --- a/ydb/core/kafka_proxy/kqp_helper.cpp +++ b/ydb/core/kafka_proxy/kqp_helper.cpp @@ -9,15 +9,9 @@ TKqpTxHelper::TKqpTxHelper(TString database) : DataBase(database) {} -void TKqpTxHelper::SendCreateSessionRequest(const TActorContext& ctx, const TMaybe<TActorId>& kqpActorId) { +void TKqpTxHelper::SendCreateSessionRequest(const TActorContext& ctx) { auto ev = MakeCreateSessionRequest(); - TActorId actorId; - if (!kqpActorId) { - actorId = MakeKqpProxyID(ctx.SelfID.NodeId()); - } else { - actorId = *kqpActorId; - } - ctx.Send(actorId, ev.Release(), 0, 0); + ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0); } void TKqpTxHelper::BeginTransaction(ui64 cookie, const TActorContext& ctx) { @@ -68,7 +62,7 @@ void TKqpTxHelper::SendRequest(THolder<TEvKqp::TEvQueryRequest> request, ui64 co ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), request.Release(), 0, cookie); } -void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit, const TMaybe<TActorId>& kqpActorId) { +void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit) { auto ev = MakeHolder<TEvKqp::TEvQueryRequest>(); ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); @@ -87,13 +81,7 @@ void TKqpTxHelper::SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlPa ev->Record.MutableRequest()->MutableYdbParameters()->swap(*(NYdb::TProtoAccessor::GetProtoMapPtr(sqlParams))); - TActorId actorId; - if (!kqpActorId) { - actorId = MakeKqpProxyID(ctx.SelfID.NodeId()); - } else { - actorId = *kqpActorId; - } - ctx.Send(actorId, ev.Release(), 0, cookie); + ctx.Send(MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, cookie); } void TKqpTxHelper::CommitTx(ui64 cookie, const TActorContext& ctx) { diff --git a/ydb/core/kafka_proxy/kqp_helper.h b/ydb/core/kafka_proxy/kqp_helper.h index ea22719b8e1..42ce77bc05a 100644 --- a/ydb/core/kafka_proxy/kqp_helper.h +++ b/ydb/core/kafka_proxy/kqp_helper.h @@ -15,13 +15,13 @@ class TKqpTxHelper { public: TKqpTxHelper(TString database); // kqp actor id can be changed for testing purposes - void SendCreateSessionRequest(const TActorContext& ctx, const TMaybe<TActorId>& kqpActorId = {}); + void SendCreateSessionRequest(const TActorContext& ctx); void BeginTransaction(ui64 cookie, const TActorContext& ctx); bool HandleCreateSessionResponse(TEvKqp::TEvCreateSessionResponse::TPtr& ev, const TActorContext& ctx); void CloseKqpSession(const TActorContext& ctx); void SendRequest(THolder<TEvKqp::TEvQueryRequest> request, ui64 cookie, const TActorContext& ctx); void CommitTx(ui64 cookie, const TActorContext& ctx); - void SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit = false, const TMaybe<TActorId>& kqpActorId = {}); + void SendYqlRequest(const TString& yqlRequest, NYdb::TParams sqlParams, ui64 cookie, const TActorContext& ctx, bool commit = false); void SendInitTableRequest(const TActorContext& ctx, std::shared_ptr<NKikimr::NMetadata::IClassBehaviour> prepareManager); void SetTxId(const TString& txId); void ResetTxId(); diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp index 0249adfb9a0..a240328ca23 100644 --- a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp +++ b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp @@ -1,5 +1,6 @@ #include <ydb/core/kafka_proxy/actors/kafka_transaction_actor.h> #include <ydb/core/kafka_proxy/kafka_events.h> +#include <ydb/core/kafka_proxy/kafka_transactions_coordinator.h> #include <library/cpp/testing/unittest/registar.h> #include <util/generic/fwd.h> @@ -45,7 +46,10 @@ namespace { THolder<NKqp::TEvKqp::TEvQueryResponse> response; if (ev->Get()->Record.GetRequest().GetTxControl().commit_tx()) { Cout << "Sending response on commit from dummy kqp" << Endl; - response = MakeResponseOnCommit(); + response = MakeSimpleSuccessResponse(); + } else if (ev->Get()->Record.GetRequest().HasKafkaApiOperations()) { + Cout << "Sending response on add kafka operations from dummy kqp" << Endl; + response = MakeSimpleSuccessResponse(); } else { Cout << "Sending response on select from dummy kqp" << Endl; response = MakeResponseOnSelectFromKqp(); @@ -59,7 +63,7 @@ namespace { )); } - THolder<NKqp::TEvKqp::TEvQueryResponse> MakeResponseOnCommit() { + THolder<NKqp::TEvKqp::TEvQueryResponse> MakeSimpleSuccessResponse() { auto response = MakeHolder<NKqp::TEvKqp::TEvQueryResponse>(); NKikimrKqp::TEvQueryResponse record; if (ReturnSuccessOnCommit) { @@ -176,7 +180,6 @@ namespace { }; struct TQueryRequestMatcher { - bool CommitTx; TVector<TTopicPartitions> TopicPartitions; TVector<TConsumerCommitMatcher> ConsumerCommitMatchers; }; @@ -210,13 +213,13 @@ namespace { Ctx->Runtime->SetLogPriority(NKikimrServices::KAFKA_PROXY, NLog::PRI_DEBUG); DummyKqpActor = new TDummyKqpActor(); KqpActorId = Ctx->Runtime->Register(DummyKqpActor); + Ctx->Runtime->RegisterService(MakeKqpProxyID(Ctx->Runtime->GetNodeId()), KqpActorId); + Ctx->Runtime->RegisterService(MakeTransactionsServiceID(Ctx->Runtime->GetNodeId()), Ctx->Edge); ActorId = Ctx->Runtime->Register(new TTransactionActor( TransactionalId, ProducerId, ProducerEpoch, - Database, - KqpActorId, - Ctx->Edge + Database )); DummyKqpActor->SetValidationResponse(TransactionalId, ProducerId, ProducerEpoch); } @@ -291,7 +294,7 @@ namespace { // Arguments: // 1. callback - function, that will be called on recieving the response from KQP om commit request // 2. consumerGenerationsToReturnInValidationRequest - map of consumer name to its generation to ensure proper validation of consumer state by actor - void AddObserverForCommitRequestToKqp(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, std::unordered_map<TString, i32> consumerGenerationsToReturnInValidationRequest = {}) { + void AddObserverForAddOperationsRequest(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, std::unordered_map<TString, i32> consumerGenerationsToReturnInValidationRequest = {}) { DummyKqpActor->SetValidationResponse(TransactionalId, ProducerId, ProducerEpoch, consumerGenerationsToReturnInValidationRequest); auto observer = [callback = std::move(callback), this](TAutoPtr<IEventHandle>& input) { @@ -314,8 +317,7 @@ namespace { void MatchQueryRequest(const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request, const TQueryRequestMatcher& matcher) { UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetTxControl().begin_tx().has_serializable_read_write(), false); - UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetTxControl().commit_tx(), matcher.CommitTx); - UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetKafkaApiOperations().GetTransactionalId(), TransactionalId); + UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetAction(), NKikimrKqp::QUERY_ACTION_TOPIC); UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetKafkaApiOperations().GetProducerId(), ProducerId); UNIT_ASSERT_VALUES_EQUAL(request->Record.GetRequest().GetKafkaApiOperations().GetProducerEpoch(), ProducerEpoch); @@ -364,7 +366,6 @@ namespace { UNIT_ASSERT_VALUES_EQUAL(it->GetPartitionId(), partitionOffset.first); UNIT_ASSERT_VALUES_EQUAL(it->GetOffset(), partitionOffset.second); UNIT_ASSERT_VALUES_EQUAL(it->GetConsumerName(), consumerCommitMatcher.ConsumerName); - UNIT_ASSERT_VALUES_EQUAL(it->GetConsumerGeneration(), consumerCommitMatcher.GenerationId); } } } @@ -379,8 +380,8 @@ namespace { Y_UNIT_TEST(OnAddPartitionsAndEndTxn_shouldSendTxnToKqpWithSpecifiedPartitions) { TVector<TTopicPartitions> topics = {{"topic1", {0, 1}}, {"topic2", {0}}}; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { - MatchQueryRequest(request, {true, topics, {}}); + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + MatchQueryRequest(request, {topics, {}}); seenEvent = true; }); @@ -419,11 +420,11 @@ namespace { UNIT_ASSERT_EQUAL(message.Results[1].Results[0].ErrorCode, NKafka::EKafkaErrors::NONE_ERROR); } - Y_UNIT_TEST(OnDoubleAddPartitionsWithSamePartitionsAndEndTxn_shouldSendTxnToKqpWithOnceSpecifiedPartitions) { + Y_UNIT_TEST(OnDoubleAddPartitionsWithSamePartitionsAndEndTxn_shouldSendTxnToKqpKafkaOperationsWithOnceSpecifiedPartitions) { TVector<TTopicPartitions> topics = {{"topic1", {0}}}; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { - MatchQueryRequest(request, {true, topics, {}}); + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + MatchQueryRequest(request, {topics, {}}); seenEvent = true; }); @@ -439,12 +440,12 @@ namespace { UNIT_ASSERT(seenEvent); } - Y_UNIT_TEST(OnDoubleAddPartitionsWithDifferentPartitionsAndEndTxn_shouldSendTxnToKqpWithAllSpecifiedPartitions) { + Y_UNIT_TEST(OnDoubleAddPartitionsWithDifferentPartitionsAndEndTxn_shouldSendTxnToKqKafkaOperationspWithAllSpecifiedPartitions) { TVector<TTopicPartitions> topics1 = {{"topic1", {0}}}; TVector<TTopicPartitions> topics2 = {{"topic2", {0}}}; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { - MatchQueryRequest(request, {true, {{"topic1", {0}}, {"topic2", {0}}}, {}}); + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + MatchQueryRequest(request, {{{"topic1", {0}}, {"topic2", {0}}}, {}}); seenEvent = true; }); @@ -469,8 +470,8 @@ namespace { std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp; consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGeneration; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { - MatchQueryRequest(request, {true, {}, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}}); + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + MatchQueryRequest(request, {{}, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}}); seenEvent = true; }, consumerGenerationByNameToReturnFromKqp); @@ -493,8 +494,8 @@ namespace { std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp; consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGenerationFromTable; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { - MatchQueryRequest(request, {true, {}, {{consumerName, consumerGenerationFromTable, partitionOffsetsToCommitByTopic}}}); + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + MatchQueryRequest(request, {{}, {{consumerName, consumerGenerationFromTable, partitionOffsetsToCommitByTopic}}}); seenEvent = true; }, consumerGenerationByNameToReturnFromKqp); @@ -522,9 +523,9 @@ namespace { std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp; consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGeneration; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { // we validate that to KQP are sent offsets from second request - MatchQueryRequest(request, {true, {}, {{consumerName, consumerGeneration, secondRequestOffsetsByTopic}}}); + MatchQueryRequest(request, {{}, {{consumerName, consumerGeneration, secondRequestOffsetsByTopic}}}); seenEvent = true; }, consumerGenerationByNameToReturnFromKqp); @@ -569,7 +570,7 @@ namespace { UNIT_ASSERT_EQUAL(message.Topics[1].Partitions[1].ErrorCode, NKafka::EKafkaErrors::NONE_ERROR); } - Y_UNIT_TEST(OnAddOffsetsToTxnAndAddPartitionsAndEndTxn_shouldSendToKqpCorrectTxn) { + Y_UNIT_TEST(OnAddOffsetsToTxnAndAddPartitionsAndEndTxn_shouldSendToKqpCorrectAddKafkaOperationsRequest) { TVector<TTopicPartitions> topics = {{"topic3", {0}}}; TString consumerName = "my-consumer"; i32 consumerGeneration = 0; @@ -579,8 +580,8 @@ namespace { std::unordered_map<TString, i32> consumerGenerationByNameToReturnFromKqp; consumerGenerationByNameToReturnFromKqp[consumerName] = consumerGeneration; bool seenEvent = false; - AddObserverForCommitRequestToKqp([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { - MatchQueryRequest(request, {true, topics, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}}); + AddObserverForAddOperationsRequest([&](const NKikimr::NKqp::TEvKqp::TEvQueryRequest* request) { + MatchQueryRequest(request, {topics, {{consumerName, consumerGeneration, partitionOffsetsToCommitByTopic}}}); seenEvent = true; }, consumerGenerationByNameToReturnFromKqp); @@ -607,6 +608,11 @@ namespace { const auto& result = static_cast<const NKafka::TEndTxnResponseData&>(*response->Response); UNIT_ASSERT_VALUES_EQUAL(response->CorrelationId, correlationId); UNIT_ASSERT_VALUES_EQUAL(result.ErrorCode, NKafka::EKafkaErrors::NONE_ERROR); + auto txnActorDiedEvent = Ctx->Runtime->GrabEdgeEvent<NKafka::TEvKafka::TEvTransactionActorDied>(); + UNIT_ASSERT(txnActorDiedEvent != nullptr); + UNIT_ASSERT_VALUES_EQUAL(txnActorDiedEvent->TransactionalId, TransactionalId); + UNIT_ASSERT_VALUES_EQUAL(txnActorDiedEvent->ProducerState.Id, ProducerId); + UNIT_ASSERT_VALUES_EQUAL(txnActorDiedEvent->ProducerState.Epoch, ProducerEpoch); } Y_UNIT_TEST(OnEndTxnWithCommitAndAbortFromTxn_shouldReturnBROKER_NOT_AVAILABLE) { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index f9c2f1fa224..bea993525be 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1453,7 +1453,7 @@ namespace Tests { Runtime->RegisterService(NKafka::MakeKafkaDiscoveryCacheID(), discoveryCacheId, nodeIdx); TActorId kafkaTxnCoordinatorActorId = Runtime->Register(NKafka::CreateTransactionsCoordinator(), nodeIdx, userPoolId); - Runtime->RegisterService(NKafka::MakeTransactionsServiceID(), kafkaTxnCoordinatorActorId, nodeIdx); + Runtime->RegisterService(NKafka::MakeTransactionsServiceID(Runtime->GetNodeId(nodeIdx)), kafkaTxnCoordinatorActorId, nodeIdx); NKafka::TListenerSettings settings; settings.Port = Settings->AppConfig->GetKafkaProxyConfig().GetListeningPort(); |
