diff options
author | Andrey Serebryanskiy <serebryanskiy@ydb.tech> | 2025-07-22 21:00:33 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-07-22 18:00:33 +0000 |
commit | dda968ffaa0f48643ff8e16d4fe956e6aab18387 (patch) | |
tree | da7b9c8a572fecf4ee060da73f38ae9f4d7538f5 | |
parent | 8ae2a630a8f04f35de5df97c0e8510115af4d3d5 (diff) | |
download | ydb-dda968ffaa0f48643ff8e16d4fe956e6aab18387.tar.gz |
[Kafka API] Fix bug in kafka transactions (#21310)
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp | 158 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_produce_actor.h | 9 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp | 69 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor.h | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h | 3 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/kafka_test_client.h | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_produce_actor.cpp | 59 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 131 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp | 14 | ||||
-rw-r--r-- | ydb/core/persqueue/partition.cpp | 1 | ||||
-rw-r--r-- | ydb/core/persqueue/partition_write.cpp | 2 | ||||
-rw-r--r-- | ydb/core/persqueue/pq_impl.cpp | 14 | ||||
-rw-r--r-- | ydb/core/persqueue/write_id.cpp | 6 | ||||
-rw-r--r-- | ydb/public/api/protos/draft/persqueue_error_codes.proto | 3 |
15 files changed, 389 insertions, 96 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp index 64f2a71152c..3bb17afef27 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp @@ -378,54 +378,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co for(const auto& topicData : r->TopicData) { const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); for(const auto& partitionData : topicData.PartitionData) { - const auto partitionId = partitionData.Index; - TProducerInstanceId producerInstanceId{partitionData.Records->ProducerId, partitionData.Records->ProducerEpoch}; - TMaybe<TString> transactionalId; - if (r->TransactionalId) { - transactionalId.ConstructInPlace(r->TransactionalId->c_str()); - } - - auto writer = PartitionWriter({topicPath, static_cast<ui32>(partitionId)}, producerInstanceId, transactionalId, ctx); - auto& result = pendingRequest->Results[position]; - if (OK == writer.first) { - auto ownCookie = ++Cookie; - auto& cookieInfo = Cookies[ownCookie]; - cookieInfo.TopicPath = topicPath; - cookieInfo.PartitionId = partitionId; - cookieInfo.Position = position; - cookieInfo.Request = pendingRequest; - - pendingRequest->WaitAcceptingCookies.insert(ownCookie); - pendingRequest->WaitResultCookies.insert(ownCookie); - - auto [error, ev] = Convert(transactionalId.GetOrElse(""), partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest); - if (error == EKafkaErrors::NONE_ERROR) { - ruPerRequest = false; - Send(writer.second, std::move(ev)); - result.ErrorCode = NONE_ERROR; - } else { - result.ErrorCode = error; - } - } else { - switch (writer.first) { - case NOT_FOUND: - result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; - break; - case UNAUTHORIZED: - result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; - break; - case PRODUCER_FENCED: - result.ErrorCode = EKafkaErrors::PRODUCER_FENCED; - break; - default: - result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR; - } - } - - if (result.ErrorCode != EKafkaErrors::NONE_ERROR) { - KAFKA_LOG_ERROR("Write request failed with error " << result.ErrorCode << " and message " << result.ErrorMessage); - } - + SendWriteRequest(partitionData, *topicData.Name, pendingRequest, position, ruPerRequest, ctx); ++position; } } @@ -465,6 +418,7 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvInitResult::TPtr request, void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) { auto r = request->Get(); auto cookie = r->Record.GetPartitionResponse().GetCookie(); + KAFKA_LOG_T("Handling TEvPartitionWriter::TEvWriteResponse with cookie " << cookie); auto it = Cookies.find(cookie); if (it == Cookies.end()) { @@ -478,7 +432,12 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque partitionResult.Value = request; cookieInfo.Request->WaitResultCookies.erase(cookie); - if (!r->IsSuccess()) { + // Missing supportive partition means that we wrote in transaction and that transaction ended, thus suppprtive partition was deleted + // it means that we are writing in a new transaction and need to create a new partition writer (cause only partition writer in init state properly creates supportive partition) + if (r->Record.GetErrorCode() == NPersQueue::NErrorCode::EErrorCode::KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION) { + RecreatePartitionWriterAndRetry(cookie, ctx); + return; + } else if (!r->IsSuccess()) { auto wit = NonTransactionalWriters.find(cookieInfo.TopicPath); if (wit != NonTransactionalWriters.end()) { auto& partitions = wit->second; @@ -488,6 +447,11 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque partitions.erase(pit); } } + auto txnIt = TransactionalWriters.find({cookieInfo.TopicPath, cookieInfo.PartitionId}); + if (txnIt != TransactionalWriters.end()) { + Send(txnIt->second.ActorId, new TEvents::TEvPoison()); + TransactionalWriters.erase(txnIt); + } } if (cookieInfo.Request->WaitResultCookies.empty()) { @@ -569,7 +533,11 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) { partitionResponse.BaseOffset = writeResults.at(0).GetOffset(); } } else { - KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason << ", #02"); + KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" + << static_cast<int>(Convert(msg->GetError().Code)) + << ", ErrorMessage=" << msg->GetError().Reason + << ", Error from writer=" << static_cast<int>(msg->Record.GetErrorCode()) + << ", #02"); SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx); if (msg->Record.GetErrorCode() == NPersQueue::NErrorCode::KAFKA_INVALID_PRODUCER_EPOCH) { @@ -641,6 +609,94 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release())); } +void TKafkaProduceActor::RecreatePartitionWriterAndRetry(ui64 cookie, const TActorContext& ctx) { + auto it = Cookies.find(cookie); + if (it != Cookies.end()) { + auto& cookieInfo = it->second; + KAFKA_LOG_D("Transaction was committed. Retrying produce request as a part of next transaction"); + auto txnIt = TransactionalWriters.find({cookieInfo.TopicPath, cookieInfo.PartitionId}); + if (txnIt != TransactionalWriters.end()) { + Send(txnIt->second.ActorId, new TEvents::TEvPoison()); + TransactionalWriters.erase(txnIt); + } + TProduceRequestData::TProduceRequestData::TTopicProduceData::TPartitionProduceData partitionData; + for (const auto& topicData : cookieInfo.Request->Request->Get()->Request->TopicData) { + TString topicPath = NormalizePath(Context->DatabasePath, *topicData.Name); + if (topicPath == cookieInfo.TopicPath) { + for(const auto& partitionData : topicData.PartitionData) { + if (partitionData.Index == static_cast<int>(cookieInfo.PartitionId)) { + SendWriteRequest(partitionData, topicPath, cookieInfo.Request, cookieInfo.Position, cookieInfo.RuPerRequest, ctx); + break; + } + } + } + } + + + Cookies.erase(it); + } +} + +void TKafkaProduceActor::SendWriteRequest(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& partitionData, + const TString& topicName, + TPendingRequest::TPtr pendingRequest, + size_t position, + bool& ruPerRequest, + const TActorContext& ctx + ) { + auto r = pendingRequest->Request->Get()->Request; + const TString& topicPath = NormalizePath(Context->DatabasePath, topicName); + const auto partitionId = partitionData.Index; + TProducerInstanceId producerInstanceId{partitionData.Records->ProducerId, partitionData.Records->ProducerEpoch}; + TMaybe<TString> transactionalId; + if (r->TransactionalId) { + transactionalId.ConstructInPlace(r->TransactionalId->c_str()); + } + + auto writer = PartitionWriter({topicPath, static_cast<ui32>(partitionId)}, producerInstanceId, transactionalId, ctx); + auto& result = pendingRequest->Results[position]; + if (OK == writer.first) { + auto ownCookie = ++Cookie; + auto& cookieInfo = Cookies[ownCookie]; + cookieInfo.TopicPath = topicPath; + cookieInfo.PartitionId = partitionId; + cookieInfo.Position = position; + cookieInfo.RuPerRequest = ruPerRequest; + cookieInfo.Request = pendingRequest; + + pendingRequest->WaitAcceptingCookies.insert(ownCookie); + pendingRequest->WaitResultCookies.insert(ownCookie); + + auto [error, ev] = Convert(transactionalId.GetOrElse(""), partitionData, topicPath, ownCookie, ClientDC, ruPerRequest); + if (error == EKafkaErrors::NONE_ERROR) { + ruPerRequest = false; + KAFKA_LOG_T("Sending TEvPartitionWriter::TEvWriteRequest to " << writer.second << " with cookie " << ownCookie); + Send(writer.second, std::move(ev)); + result.ErrorCode = NONE_ERROR; + } else { + result.ErrorCode = error; + } + } else { + switch (writer.first) { + case NOT_FOUND: + result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; + break; + case UNAUTHORIZED: + result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED; + break; + case PRODUCER_FENCED: + result.ErrorCode = EKafkaErrors::PRODUCER_FENCED; + break; + default: + result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR; + } + } + + if (result.ErrorCode != EKafkaErrors::NONE_ERROR) { + KAFKA_LOG_ERROR("Write request failed with error " << result.ErrorCode << " and message " << result.ErrorMessage); + } +} + std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::PartitionWriter(const TTopicPartition& topicPartition, const TProducerInstanceId& producerInstanceId, const TMaybe<TString>& transactionalId, const TActorContext& ctx) { auto it = Topics.find(topicPartition.TopicPath); if (it == Topics.end()) { @@ -709,7 +765,7 @@ std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::GetOrC } std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::CreateTransactionalWriter(const TTopicPartition& topicPartition, const TTopicInfo& topicInfo, const TProducerInstanceId& producerInstanceId, const TString& transactionalId, const TActorContext& ctx) { - KAFKA_LOG_D("Created transactional writer for producerId=" << producerInstanceId.Id << " and producerEpoch=" << producerInstanceId.Epoch << "for topic-partition " << topicPartition.TopicPath << ":" << topicPartition.PartitionId); + KAFKA_LOG_D("Created transactional writer for producerId=" << producerInstanceId.Id << " and producerEpoch=" << producerInstanceId.Epoch << " for topic-partition " << topicPartition.TopicPath << ":" << topicPartition.PartitionId); auto* partition = topicInfo.PartitionChooser->GetPartition(topicPartition.PartitionId); if (!partition) { return { NOT_FOUND, TActorId{} }; diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h index d6bd720ff52..d89a0c0ee0a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h @@ -170,6 +170,7 @@ private: TString TopicPath; ui32 PartitionId; size_t Position; + bool RuPerRequest; TPendingRequest::TPtr Request; }; @@ -196,6 +197,14 @@ private: std::unordered_map<TString, std::unordered_map<ui32, TWriterInfo>> NonTransactionalWriters; std::unordered_map<TTopicPartition, TWriterInfo, TTopicPartitionHashFn> TransactionalWriters; + void RecreatePartitionWriterAndRetry(ui64 cookie, const TActorContext& ctx); + void SendWriteRequest(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& partitionData, + const TString& topicName, + TPendingRequest::TPtr pendingRequest, + size_t position, + bool& ruPerRequest, + const TActorContext& ctx + ); void CleanWriter(const TTopicPartition& topicPartition, const TActorId& writerId); std::pair<TKafkaProduceActor::ETopicStatus, TActorId> GetOrCreateNonTransactionalWriter(const TTopicPartition& topicPartition, const TTopicInfo& topicInfo, const TProducerInstanceId& producerInstanceId, const TActorContext& ctx); std::pair<TKafkaProduceActor::ETopicStatus, TActorId> GetOrCreateTransactionalWriter(const TTopicPartition& topicPartition, const TTopicInfo& topicInfo, const TProducerInstanceId& producerInstanceId, const TString& transactionalId, const TActorContext& ctx); diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp index 4b04e384d9b..ff8d8e207e5 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp @@ -148,7 +148,7 @@ namespace NKafka { void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) { KAFKA_LOG_D("Sending select request to KQP for database " << DatabasePath); Kqp->SendYqlRequest( - GetYqlWithTablesNames(NKafkaTransactionSql::SELECT_FOR_VALIDATION), + GetYqlWithTablesNames(), BuildSelectParams(), ++KqpCookie, ctx, @@ -212,19 +212,24 @@ namespace NKafka { return NPersQueue::GetFullTopicPath(DatabasePath, topicName); } - TString TTransactionActor::GetYqlWithTablesNames(const TString& templateStr) { + TString TTransactionActor::GetYqlWithTablesNames() { + const TString& templateStr = OffsetsToCommit.empty() ? NKafkaTransactionSql::SELECT_FOR_VALIDATION_WITHOUT_CONSUMERS : NKafkaTransactionSql::SELECT_FOR_VALIDATION_WITH_CONSUMERS; + TString templateWithProducerStateTable = std::regex_replace( templateStr.c_str(), std::regex("<producer_state_table_name>"), NKikimr::NGRpcProxy::V1::TTransactionalProducersInitManager::GetInstant()->GetStorageTablePath().c_str() ); - TString templateWithConsumerStateTable = std::regex_replace( - templateWithProducerStateTable.c_str(), - std::regex("<consumer_state_table_name>"), - NKikimr::NGRpcProxy::V1::TKafkaConsumerGroupsMetaInitManager::GetInstant()->GetStorageTablePath().c_str() - ); - return templateWithConsumerStateTable; + if (!OffsetsToCommit.empty()) { + return std::regex_replace( + templateWithProducerStateTable.c_str(), + std::regex("<consumer_state_table_name>"), + NKikimr::NGRpcProxy::V1::TKafkaConsumerGroupsMetaInitManager::GetInstant()->GetStorageTablePath().c_str() + ); + } + + return templateWithProducerStateTable; } NYdb::TParams TTransactionActor::BuildSelectParams() { @@ -232,18 +237,20 @@ namespace NKafka { params.AddParam("$Database").Utf8(DatabasePath).Build(); params.AddParam("$TransactionalId").Utf8(TransactionalId).Build(); - // select unique consumer group names - std::unordered_set<TString> uniqueConsumerGroups; - for (auto& [partition, commitDetails] : OffsetsToCommit) { - uniqueConsumerGroups.emplace(commitDetails.ConsumerName); - } - - // add unique consumer group names to request as params - auto& consumerGroupsParamBuilder = params.AddParam("$ConsumerGroups").BeginList(); - for (auto& consumerGroupName : uniqueConsumerGroups) { - consumerGroupsParamBuilder.AddListItem().Utf8(consumerGroupName); + if (!OffsetsToCommit.empty()) { + // select unique consumer group names + std::unordered_set<TString> uniqueConsumerGroups; + for (auto& [partition, commitDetails] : OffsetsToCommit) { + uniqueConsumerGroups.emplace(commitDetails.ConsumerName); + } + + // add unique consumer group names to request as params + auto& consumerGroupsParamBuilder = params.AddParam("$ConsumerGroups").BeginList(); + for (auto& consumerGroupName : uniqueConsumerGroups) { + consumerGroupsParamBuilder.AddListItem().Utf8(consumerGroupName); + } + consumerGroupsParamBuilder.EndList().Build(); } - consumerGroupsParamBuilder.EndList().Build(); return params.Build(); } @@ -283,8 +290,10 @@ namespace NKafka { void TTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) { // YDB should return exactly two result sets for two queries: to producer and consumer state tables - if (response.Record.GetResponse().GetYdbResults().size() != 2) { - TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected 2, got " << response.Record.GetResponse().GetYdbResults().size() << "."; + int resultsSize = response.Record.GetResponse().GetYdbResults().size(); + int expectedResultsSize = OffsetsToCommit.empty() ? 1 : 2; // if there were no consumer in transactions we do not send request to consumer table + if (expectedResultsSize != resultsSize) { + TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected " << expectedResultsSize << ", got " << resultsSize << "."; KAFKA_LOG_W(error); SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::BROKER_NOT_AVAILABLE, error); Die(ctx); @@ -309,15 +318,17 @@ namespace NKafka { return; } - // parse and validate consumers - std::unordered_map<TString, i32> consumerGenerationsByName = ParseConsumersGenerations(response); - if (auto error = GetErrorInConsumersStates(consumerGenerationsByName)) { - KAFKA_LOG_W(error); - SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::PRODUCER_FENCED, error->data()); - Die(ctx); - return; + if (!OffsetsToCommit.empty()) { + // parse and validate consumers + std::unordered_map<TString, i32> consumerGenerationsByName = ParseConsumersGenerations(response); + if (auto error = GetErrorInConsumersStates(consumerGenerationsByName)) { + KAFKA_LOG_W(error); + SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::PRODUCER_FENCED, error->data()); + Die(ctx); + return; + } } - + KAFKA_LOG_D("Validated producer and consumers states. Everything is alright, adding kafka operations to transaction."); auto kqpTxnId = response.Record.GetResponse().GetTxMeta().id(); // finally everything is valid and we can add kafka operations to transaction and attempt to commit diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h index 551d7e5c2c2..9aec3d5c516 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h @@ -100,7 +100,7 @@ namespace NKafka { template<class EventType> bool ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest); TString GetFullTopicPath(const TString& topicName); - TString GetYqlWithTablesNames(const TString& templateStr); + TString GetYqlWithTablesNames(); NYdb::TParams BuildSelectParams(); THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildAddKafkaOperationsRequest(const TString& kqpTransactionId); void HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx); diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp index 00d9bdb3a9b..1e1adfaf8a0 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp @@ -2,7 +2,17 @@ namespace NKafka::NKafkaTransactionSql { - TString SELECT_FOR_VALIDATION = R"sql( + TString SELECT_FOR_VALIDATION_WITHOUT_CONSUMERS = R"sql( + --!syntax_v1 + DECLARE $Database AS Utf8; + DECLARE $TransactionalId AS Utf8; + + SELECT * FROM `<producer_state_table_name>` + WHERE database = $Database + AND transactional_id = $TransactionalId; + )sql"; + + TString SELECT_FOR_VALIDATION_WITH_CONSUMERS = R"sql( --!syntax_v1 DECLARE $Database AS Utf8; DECLARE $TransactionalId AS Utf8; diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h index 8a116a90ce0..fdc192499c8 100644 --- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h +++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h @@ -7,6 +7,7 @@ namespace NKafka::NKafkaTransactionSql { constexpr ui32 PRODUCER_STATE_REQUEST_INDEX = 0; constexpr ui32 CONSUMER_STATES_REQUEST_INDEX = 1; - extern const TString SELECT_FOR_VALIDATION; + extern const TString SELECT_FOR_VALIDATION_WITHOUT_CONSUMERS; + extern const TString SELECT_FOR_VALIDATION_WITH_CONSUMERS; } // namespace NKafka::NKafkaTransactionSql diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h index b979461f2e9..3dd2da9919a 100644 --- a/ydb/core/kafka_proxy/ut/kafka_test_client.h +++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h @@ -107,7 +107,7 @@ class TKafkaTestClient { TMessagePtr<TListGroupsResponseData> ListGroups(TListGroupsRequestData request); - TMessagePtr<TListGroupsResponseData> ListGroups(const std::vector<std::optional<TString>>& statesFilter); + TMessagePtr<TListGroupsResponseData> ListGroups(const std::vector<std::optional<TString>>& statesFilter = {}); TMessagePtr<TDescribeGroupsResponseData> DescribeGroups(TDescribeGroupsRequestData& request); diff --git a/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp b/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp index 712bc85262f..4af1821c025 100644 --- a/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp +++ b/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp @@ -62,6 +62,7 @@ namespace { Ctx->Runtime->DisableBreakOnStopCondition(); Ctx->Runtime->SetLogPriority(NKikimrServices::KAFKA_PROXY, NLog::PRI_TRACE); Ctx->Runtime->SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NLog::PRI_TRACE); + Ctx->Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG); TContext::TPtr kafkaContext = std::make_shared<TContext>(KafkaConfig); kafkaContext->DatabasePath = "/Root/PQ"; ActorId = Ctx->Runtime->Register(CreateKafkaProduceActor(kafkaContext)); @@ -103,6 +104,7 @@ namespace { void AssertCorrectOptsInPartitionWriter(const TActorId& writerId, const TProducerInstanceId& producerInstanceId, const TMaybe<TString>& transactionalId) { NKikimr::NPQ::TPartitionWriter* writer = dynamic_cast<NKikimr::NPQ::TPartitionWriter*>(Ctx->Runtime->FindActor(writerId)); + UNIT_ASSERT(writer); const TPartitionWriterOpts& writerOpts = writer->GetOpts(); UNIT_ASSERT_VALUES_EQUAL(*writerOpts.KafkaProducerInstanceId, producerInstanceId); @@ -112,6 +114,16 @@ namespace { UNIT_ASSERT(transactionalId.Empty()); } } + + THolder<TEvPersQueue::TEvResponse> CreateMissingSupPartitionErrorResponse(ui64 cookie) { + auto event = MakeHolder<TEvPersQueue::TEvResponse>(); + NKikimrClient::TResponse record; + record.SetErrorReason("expected test error"); + record.SetErrorCode(::NPersQueue::NErrorCode::EErrorCode::KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION); + record.MutablePartitionResponse()->SetCookie(cookie); + event->Record = record; + return event; + } }; Y_UNIT_TEST_SUITE_F(ProduceActor, TProduceActorFixture) { @@ -171,6 +183,53 @@ namespace { UNIT_ASSERT_VALUES_EQUAL(poisonPillReceiver, firstPartitionWriterId); } + Y_UNIT_TEST(OnProduceWithTransactionalId_andLostMessagesError_shouldRecreatePartitionWriterAndRetryProduce) { + i64 producerId = 1; + i32 producerEpoch = 2; + TActorId firstWriteRequestReceiver; + TActorId poisonPillReceiver; + TActorId secondWriteRequestReceiver; + int writeRequestsCounter = 0; + int poisonPillCounter = 0; + auto observer = [&](TAutoPtr<IEventHandle>& input) { + Cout << input->ToString() << Endl; + if (auto* event = input->CastAsLocal<TEvPartitionWriter::TEvWriteRequest>()) { + if (writeRequestsCounter == 0) { + firstWriteRequestReceiver = input->Recipient; + AssertCorrectOptsInPartitionWriter(firstWriteRequestReceiver, {producerId, producerEpoch}, TransactionalId); + } else if (writeRequestsCounter == 1) { + secondWriteRequestReceiver = input->Recipient; + AssertCorrectOptsInPartitionWriter(secondWriteRequestReceiver, {producerId, producerEpoch}, TransactionalId); + } + writeRequestsCounter++; + } else if (auto* event = input->CastAsLocal<TEvPersQueue::TEvRequest>()) { + if (event->Record.GetPartitionRequest().HasCmdReserveBytes()) { + Ctx->Runtime->Send(new IEventHandle(firstWriteRequestReceiver, input->Sender, CreateMissingSupPartitionErrorResponse(event->Record.GetPartitionRequest().GetCookie()).Release())); + return TTestActorRuntimeBase::EEventAction::DROP; + } + } else if (auto* event = input->CastAsLocal<TEvents::TEvPoison>()) { + if (poisonPillCounter == 0) { // only first poison pill goes to writer + poisonPillReceiver = input->Recipient; + poisonPillCounter++; + } // we are not interested in all subsequent + } + + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + Ctx->Runtime->SetObserverFunc(observer); + + SendProduce(TransactionalId, producerId, producerEpoch); + + TDispatchOptions options; + options.CustomFinalCondition = [&writeRequestsCounter, &poisonPillCounter]() { + return writeRequestsCounter > 1 && poisonPillCounter > 0; + }; + UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options)); + + UNIT_ASSERT_VALUES_UNEQUAL(firstWriteRequestReceiver, secondWriteRequestReceiver); + UNIT_ASSERT_VALUES_EQUAL(firstWriteRequestReceiver, poisonPillReceiver); + } + Y_UNIT_TEST(OnProduceWithoutTransactionalId_shouldNotKillOldWriter) { i64 producerId = 0; i32 producerEpoch = 0; diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp index bbb41c85fee..53adc7be319 100644 --- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -3367,6 +3367,137 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) { } } + Y_UNIT_TEST(Several_Subsequent_Transactions_Scenario) { + TInsecureTestServer testServer("1", false, true); + TKafkaTestClient kafkaClient(testServer.Port); + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + // use random values to avoid parallel execution problems + TString inputTopicName = TStringBuilder() << "input-topic-" << RandomNumber<ui64>(); + TString outputTopicName = TStringBuilder() << "output-topic-" << RandomNumber<ui64>(); + TString transactionalId = TStringBuilder() << "my-tx-producer-" << RandomNumber<ui64>(); + TString consumerName = "my-consumer"; + + // create input and output topics + CreateTopic(pqClient, inputTopicName, 3, {consumerName}); + CreateTopic(pqClient, outputTopicName, 3, {consumerName}); + + // produce data to input topic (to commit offsets in further steps) + auto inputProduceResponse = kafkaClient.Produce({inputTopicName, 0}, {{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}}); + UNIT_ASSERT_VALUES_EQUAL(inputProduceResponse->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR); + + // init producer id + auto initProducerIdResp = kafkaClient.InitProducerId(transactionalId, 30000); + UNIT_ASSERT_VALUES_EQUAL(initProducerIdResp->ErrorCode, EKafkaErrors::NONE_ERROR); + TProducerInstanceId producerInstanceId = {initProducerIdResp->ProducerId, initProducerIdResp->ProducerEpoch}; + + ui32 totalTxns = 3; + for (ui32 i = 0; i < totalTxns; ++i) { + // add partitions to txn + std::unordered_map<TString, std::vector<ui32>> topicPartitionsToAddToTxn; + topicPartitionsToAddToTxn[outputTopicName] = std::vector<ui32>{0, 1}; + auto addPartsResponse = kafkaClient.AddPartitionsToTxn(transactionalId, producerInstanceId, topicPartitionsToAddToTxn); + UNIT_ASSERT_VALUES_EQUAL(addPartsResponse->Results[0].Results[0].ErrorCode, EKafkaErrors::NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(addPartsResponse->Results[0].Results[1].ErrorCode, EKafkaErrors::NONE_ERROR); + + // produce data + // to part 0 + auto out0ProduceResponse = kafkaClient.Produce({outputTopicName, 0}, {{std::to_string(i), "123"}}, i, producerInstanceId, transactionalId); + UNIT_ASSERT_VALUES_EQUAL_C(out0ProduceResponse->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR, TStringBuilder() << "Txn " << i + 1); + // to part 1 + auto out1ProduceResponse = kafkaClient.Produce({outputTopicName, 1}, {{std::to_string(i + totalTxns), "987"}}, i, producerInstanceId, transactionalId); + UNIT_ASSERT_VALUES_EQUAL_C(out1ProduceResponse->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR, TStringBuilder() << "Txn " << i + 1); + + // init consumer + std::vector<TString> topicsToSubscribe; + topicsToSubscribe.push_back(outputTopicName); + TString protocolName = "range"; + auto consumerInfo = kafkaClient.JoinAndSyncGroupAndWaitPartitions(topicsToSubscribe, consumerName, 3, protocolName, 3, 15000); + + // add offsets to txn + auto addOffsetsResponse = kafkaClient.AddOffsetsToTxn(transactionalId, producerInstanceId, consumerName); + UNIT_ASSERT_VALUES_EQUAL(addOffsetsResponse->ErrorCode, EKafkaErrors::NONE_ERROR); + + // txn offset commit + std::unordered_map<TString, std::vector<std::pair<ui32, ui64>>> offsetsToCommit; + offsetsToCommit[inputTopicName] = std::vector<std::pair<ui32, ui64>>{{0, i + 1}}; + auto txnOffsetCommitResponse = kafkaClient.TxnOffsetCommit(transactionalId, producerInstanceId, consumerName, consumerInfo.GenerationId, offsetsToCommit); + UNIT_ASSERT_VALUES_EQUAL(txnOffsetCommitResponse->Topics[0].Partitions[0].ErrorCode, EKafkaErrors::NONE_ERROR); + + // end txn + auto endTxnResponse = kafkaClient.EndTxn(transactionalId, producerInstanceId, true); + UNIT_ASSERT_VALUES_EQUAL(endTxnResponse->ErrorCode, EKafkaErrors::NONE_ERROR); + + // validate data is accessible in target topic + auto fetchResponse1 = kafkaClient.Fetch({{outputTopicName, {0, 1}}}); + UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->Responses[0].Partitions[0].Records->Records.size(), i + 1); + UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->Responses[0].Partitions[1].Records->Records.size(), i + 1); + auto record1 = fetchResponse1->Responses[0].Partitions[0].Records->Records[i]; + UNIT_ASSERT_VALUES_EQUAL(TString(record1.Key.value().data(), record1.Key.value().size()), std::to_string(i)); + UNIT_ASSERT_VALUES_EQUAL(TString(record1.Value.value().data(), record1.Value.value().size()), "123"); + auto record2 = fetchResponse1->Responses[0].Partitions[1].Records->Records[i]; + UNIT_ASSERT_VALUES_EQUAL(TString(record2.Key.value().data(), record2.Key.value().size()), std::to_string(i + totalTxns)); + UNIT_ASSERT_VALUES_EQUAL(TString(record2.Value.value().data(), record2.Value.value().size()), "987"); + + // validate consumer offset committed + std::map<TString, std::vector<i32>> topicsToPartitionsToFetch; + topicsToPartitionsToFetch[inputTopicName] = std::vector<i32>{0}; + auto offsetFetchResponse = kafkaClient.OffsetFetch(consumerName, topicsToPartitionsToFetch); + UNIT_ASSERT_VALUES_EQUAL(offsetFetchResponse->ErrorCode, EKafkaErrors::NONE_ERROR); + UNIT_ASSERT_VALUES_EQUAL(offsetFetchResponse->Groups[0].Topics[0].Partitions[0].CommittedOffset, i + 1); + } + } + + Y_UNIT_TEST(Several_Writes_In_One_Transaction) { + TInsecureTestServer testServer("1", false, true); + TKafkaTestClient kafkaClient(testServer.Port); + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + // use random values to avoid parallel execution problems + TString topicName = TStringBuilder() << "input-topic-" << RandomNumber<ui64>(); + TString transactionalId = TStringBuilder() << "my-tx-producer-" << RandomNumber<ui64>(); + TString consumerName = "my-consumer"; + + CreateTopic(pqClient, topicName, 1, {consumerName}); + + // init producer id + auto initProducerIdResp = kafkaClient.InitProducerId(transactionalId, 30000); + UNIT_ASSERT_VALUES_EQUAL(initProducerIdResp->ErrorCode, EKafkaErrors::NONE_ERROR); + TProducerInstanceId producerInstanceId = {initProducerIdResp->ProducerId, initProducerIdResp->ProducerEpoch}; + + // add partitions to txn + std::unordered_map<TString, std::vector<ui32>> topicPartitionsToAddToTxn; + topicPartitionsToAddToTxn[topicName] = std::vector<ui32>{0}; + auto addPartsResponse = kafkaClient.AddPartitionsToTxn(transactionalId, producerInstanceId, topicPartitionsToAddToTxn); + UNIT_ASSERT_VALUES_EQUAL(addPartsResponse->Results[0].Results[0].ErrorCode, EKafkaErrors::NONE_ERROR); + + // produce data + auto produceResponse0 = kafkaClient.Produce({topicName, 0}, {{"0", "123"}}, 0, producerInstanceId, transactionalId); + UNIT_ASSERT_VALUES_EQUAL(produceResponse0->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR); + auto produceResponse1 = kafkaClient.Produce({topicName, 0}, {{"1", "234"}}, 1, producerInstanceId, transactionalId); + UNIT_ASSERT_VALUES_EQUAL(produceResponse1->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR); + + // init consumer + std::vector<TString> topicsToSubscribe; + topicsToSubscribe.push_back(topicName); + TString protocolName = "range"; + auto consumerInfo = kafkaClient.JoinAndSyncGroupAndWaitPartitions(topicsToSubscribe, consumerName, 1, protocolName, 1, 15000); + + // end txn + auto endTxnResponse = kafkaClient.EndTxn(transactionalId, producerInstanceId, true); + UNIT_ASSERT_VALUES_EQUAL(endTxnResponse->ErrorCode, EKafkaErrors::NONE_ERROR); + + // validate data is accessible in target topic + auto fetchResponse1 = kafkaClient.Fetch({{topicName, {0}}}); + UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->Responses[0].Partitions[0].Records->Records.size(), 2); + auto record1 = fetchResponse1->Responses[0].Partitions[0].Records->Records[0]; + UNIT_ASSERT_VALUES_EQUAL(TString(record1.Key.value().data(), record1.Key.value().size()), "0"); + UNIT_ASSERT_VALUES_EQUAL(TString(record1.Value.value().data(), record1.Value.value().size()), "123"); + auto record2 = fetchResponse1->Responses[0].Partitions[0].Records->Records[1]; + UNIT_ASSERT_VALUES_EQUAL(TString(record2.Key.value().data(), record2.Key.value().size()), "1"); + UNIT_ASSERT_VALUES_EQUAL(TString(record2.Value.value().data(), record2.Value.value().size()), "234"); + } + Y_UNIT_TEST(Commit_Transaction_After_timeout_should_return_producer_fenced) { TInsecureTestServer testServer("1", false, true); TKafkaTestClient kafkaClient(testServer.Port); diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp index efb9ac6b200..aac121a2fbf 100644 --- a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp +++ b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp @@ -15,7 +15,7 @@ namespace { public: TDummyKqpActor() : TActor<TDummyKqpActor>(&TDummyKqpActor::StateFunc) {} - void SetValidationResponse(const TString& transactionalId, i64 producerId, i32 producerEpoch, const std::unordered_map<TString, i32>& consumerGenerations = {}) { + void SetValidationResponse(const TString& transactionalId, i64 producerId, i32 producerEpoch, const TMaybe<std::unordered_map<TString, i32>>& consumerGenerations = Nothing()) { TransactionalIdToReturn = transactionalId; ProducerIdToReturn = producerId; ProducerEpochToReturn = producerEpoch; @@ -81,8 +81,10 @@ namespace { record.SetYdbStatus(Ydb::StatusIds::SUCCESS); auto* producerState = record.MutableResponse()->AddYdbResults(); *producerState = CreateProducerStateResultsSet(); - auto* consumersState = record.MutableResponse()->AddYdbResults(); - *consumersState = CreateConsumersStatesResultSet(); + if (ConsumerGenerationsToReturn.Defined()) { + auto* consumersState = record.MutableResponse()->AddYdbResults(); + *consumersState = CreateConsumersStatesResultSet(); + } response->Record = record; return response; @@ -140,7 +142,7 @@ namespace { " }\n" "}\n"; - for (auto& [consumerName, generation] : ConsumerGenerationsToReturn) { + for (auto& [consumerName, generation] : *ConsumerGenerationsToReturn) { builder << "rows {\n" " items {\n" @@ -159,7 +161,7 @@ namespace { TString TransactionalIdToReturn = ""; i64 ProducerIdToReturn = 0; i32 ProducerEpochToReturn = 0; - std::unordered_map<TString, i32> ConsumerGenerationsToReturn = {}; + TMaybe<std::unordered_map<TString, i32>> ConsumerGenerationsToReturn = Nothing(); bool ReturnSuccessOnCommit = true; }; @@ -294,7 +296,7 @@ namespace { // Arguments: // 1. callback - function, that will be called on recieving the response from KQP om commit request // 2. consumerGenerationsToReturnInValidationRequest - map of consumer name to its generation to ensure proper validation of consumer state by actor - void AddObserverForAddOperationsRequest(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, std::unordered_map<TString, i32> consumerGenerationsToReturnInValidationRequest = {}) { + void AddObserverForAddOperationsRequest(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, TMaybe<std::unordered_map<TString, i32>> consumerGenerationsToReturnInValidationRequest = Nothing()) { DummyKqpActor->SetValidationResponse(TransactionalId, ProducerId, ProducerEpoch, consumerGenerationsToReturnInValidationRequest); auto observer = [callback = std::move(callback), this](TAutoPtr<IEventHandle>& input) { diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index e2f5dce9f87..ac5d77c00b3 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -3474,6 +3474,7 @@ void TPartition::ScheduleReplyError(const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error) { + PQ_LOG_ERROR("Got error: " << error); Replies.emplace_back(Tablet, MakeReplyError(dst, errorCode, diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index a6e839941f6..52081e0a1ec 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1212,7 +1212,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey ((sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) || (p.InitialSeqNo && p.InitialSeqNo.value() >= p.Msg.SeqNo)) ) { if (poffset >= curOffset) { - PQ_LOG_D("Already written message. Topic: '" << TopicName() + PQ_LOG_W("Already written message. Topic: '" << TopicName() << "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId) << "'. Message seqNo: " << p.Msg.SeqNo << ". Committed seqNo: " << sourceId.CommittedSeqNo() diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp index 33f60d2c121..3b8b81dec2c 100644 --- a/ydb/core/persqueue/pq_impl.cpp +++ b/ydb/core/persqueue/pq_impl.cpp @@ -395,7 +395,7 @@ public: bool HandleError(TEvPQ::TEvError *ev, const TActorContext& ctx) { - PQ_LOG_D("Answer error topic: '" << TopicName << "'" << + PQ_LOG_ERROR("Answer error topic: '" << TopicName << "'" << " partition: " << Partition << " messageNo: " << MessageNo << " requestId: " << ReqId << @@ -2704,10 +2704,18 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie, } } else { if (!req.GetNeedSupportivePartition()) { + // missing supportivce partition in kafka transaction means that we already committed and deleted transaction for current producerId + producerEpoch + NPersQueue::NErrorCode::EErrorCode errorCode = writeId.KafkaApiTransaction ? + NPersQueue::NErrorCode::KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION : + NPersQueue::NErrorCode::PRECONDITION_FAILED; + TString error = writeId.KafkaApiTransaction ? + "Kafka transaction and there is no supportive partition for current producerId and producerEpoch. It means GetOwnership request was not called from TPartitionWriter" : + "lost messages"; + ReplyError(ctx, responseCookie, - NPersQueue::NErrorCode::PRECONDITION_FAILED, - "lost messages"); + errorCode, + error); return; } diff --git a/ydb/core/persqueue/write_id.cpp b/ydb/core/persqueue/write_id.cpp index fe7959baa5f..bb87e99211d 100644 --- a/ydb/core/persqueue/write_id.cpp +++ b/ydb/core/persqueue/write_id.cpp @@ -27,7 +27,11 @@ bool TWriteId::operator<(const TWriteId& rhs) const void TWriteId::ToStream(IOutputStream& s) const { - s << '{' << NodeId << ", " << KeyId << '}'; + if (KafkaApiTransaction) { + s << "KafkaTransactionWriteId{" << KafkaProducerInstanceId.Id << ", " << KafkaProducerInstanceId.Epoch << '}'; + } else { + s << '{' << NodeId << ", " << KeyId << '}'; + } } template <class T> diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto index 9a32a6f00ba..7296cd932f3 100644 --- a/ydb/public/api/protos/draft/persqueue_error_codes.proto +++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto @@ -54,7 +54,8 @@ enum EErrorCode { KAFKA_INVALID_PRODUCER_EPOCH = 32; KAFKA_OUT_OF_ORDER_SEQUENCE_NUMBER = 33; - KAFKA_DUPLICATE_SEQUENCE_NUMBER = 34; + KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION = 34; + KAFKA_DUPLICATE_SEQUENCE_NUMBER = 35; ERROR = 100; } |