author    Andrey Serebryanskiy <serebryanskiy@ydb.tech>  2025-07-22 21:00:33 +0300
committer GitHub <noreply@github.com>  2025-07-22 18:00:33 +0000
commit    dda968ffaa0f48643ff8e16d4fe956e6aab18387 (patch)
tree      da7b9c8a572fecf4ee060da73f38ae9f4d7538f5
parent    8ae2a630a8f04f35de5df97c0e8510115af4d3d5 (diff)
[Kafka API] Fix bug in Kafka transactions (#21310)
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp          | 158
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_produce_actor.h            |   9
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp      |  69
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_transaction_actor.h        |   2
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp  |  12
-rw-r--r--  ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h    |   3
-rw-r--r--  ydb/core/kafka_proxy/ut/kafka_test_client.h                  |   2
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_produce_actor.cpp                 |  59
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_protocol.cpp                      | 131
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp             |  14
-rw-r--r--  ydb/core/persqueue/partition.cpp                             |   1
-rw-r--r--  ydb/core/persqueue/partition_write.cpp                       |   2
-rw-r--r--  ydb/core/persqueue/pq_impl.cpp                               |  14
-rw-r--r--  ydb/core/persqueue/write_id.cpp                              |   6
-rw-r--r--  ydb/public/api/protos/draft/persqueue_error_codes.proto      |   3
15 files changed, 389 insertions(+), 96 deletions(-)
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 64f2a71152c..3bb17afef27 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -378,54 +378,7 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
for(const auto& topicData : r->TopicData) {
const TString& topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
for(const auto& partitionData : topicData.PartitionData) {
- const auto partitionId = partitionData.Index;
- TProducerInstanceId producerInstanceId{partitionData.Records->ProducerId, partitionData.Records->ProducerEpoch};
- TMaybe<TString> transactionalId;
- if (r->TransactionalId) {
- transactionalId.ConstructInPlace(r->TransactionalId->c_str());
- }
-
- auto writer = PartitionWriter({topicPath, static_cast<ui32>(partitionId)}, producerInstanceId, transactionalId, ctx);
- auto& result = pendingRequest->Results[position];
- if (OK == writer.first) {
- auto ownCookie = ++Cookie;
- auto& cookieInfo = Cookies[ownCookie];
- cookieInfo.TopicPath = topicPath;
- cookieInfo.PartitionId = partitionId;
- cookieInfo.Position = position;
- cookieInfo.Request = pendingRequest;
-
- pendingRequest->WaitAcceptingCookies.insert(ownCookie);
- pendingRequest->WaitResultCookies.insert(ownCookie);
-
- auto [error, ev] = Convert(transactionalId.GetOrElse(""), partitionData, *topicData.Name, ownCookie, ClientDC, ruPerRequest);
- if (error == EKafkaErrors::NONE_ERROR) {
- ruPerRequest = false;
- Send(writer.second, std::move(ev));
- result.ErrorCode = NONE_ERROR;
- } else {
- result.ErrorCode = error;
- }
- } else {
- switch (writer.first) {
- case NOT_FOUND:
- result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
- break;
- case UNAUTHORIZED:
- result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
- break;
- case PRODUCER_FENCED:
- result.ErrorCode = EKafkaErrors::PRODUCER_FENCED;
- break;
- default:
- result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
- }
- }
-
- if (result.ErrorCode != EKafkaErrors::NONE_ERROR) {
- KAFKA_LOG_ERROR("Write request failed with error " << result.ErrorCode << " and message " << result.ErrorMessage);
- }
-
+ SendWriteRequest(partitionData, *topicData.Name, pendingRequest, position, ruPerRequest, ctx);
++position;
}
}
@@ -465,6 +418,7 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvInitResult::TPtr request,
void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) {
auto r = request->Get();
auto cookie = r->Record.GetPartitionResponse().GetCookie();
+ KAFKA_LOG_T("Handling TEvPartitionWriter::TEvWriteResponse with cookie " << cookie);
auto it = Cookies.find(cookie);
if (it == Cookies.end()) {
@@ -478,7 +432,12 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque
partitionResult.Value = request;
cookieInfo.Request->WaitResultCookies.erase(cookie);
- if (!r->IsSuccess()) {
+ // A missing supportive partition means the write belonged to a transaction that has since ended, so the supportive partition was deleted.
+ // We are therefore writing within a new transaction and need to create a new partition writer (only a partition writer in the init state properly creates the supportive partition).
+ if (r->Record.GetErrorCode() == NPersQueue::NErrorCode::EErrorCode::KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION) {
+ RecreatePartitionWriterAndRetry(cookie, ctx);
+ return;
+ } else if (!r->IsSuccess()) {
auto wit = NonTransactionalWriters.find(cookieInfo.TopicPath);
if (wit != NonTransactionalWriters.end()) {
auto& partitions = wit->second;
@@ -488,6 +447,11 @@ void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr reque
partitions.erase(pit);
}
}
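+ // On failure, also drop the transactional writer so that the next produce request re-creates it from scratch.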
+ auto txnIt = TransactionalWriters.find({cookieInfo.TopicPath, cookieInfo.PartitionId});
+ if (txnIt != TransactionalWriters.end()) {
+ Send(txnIt->second.ActorId, new TEvents::TEvPoison());
+ TransactionalWriters.erase(txnIt);
+ }
}
if (cookieInfo.Request->WaitResultCookies.empty()) {
@@ -569,7 +533,11 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
partitionResponse.BaseOffset = writeResults.at(0).GetOffset();
}
} else {
- KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason << ", #02");
+ KAFKA_LOG_ERROR("Produce actor: Partition result with error: ErrorCode="
+ << static_cast<int>(Convert(msg->GetError().Code))
+ << ", ErrorMessage=" << msg->GetError().Reason
+ << ", Error from writer=" << static_cast<int>(msg->Record.GetErrorCode())
+ << ", #02");
SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
if (msg->Record.GetErrorCode() == NPersQueue::NErrorCode::KAFKA_INVALID_PRODUCER_EPOCH) {
@@ -641,6 +609,94 @@ void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx)
ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release()));
}
+void TKafkaProduceActor::RecreatePartitionWriterAndRetry(ui64 cookie, const TActorContext& ctx) {
+ auto it = Cookies.find(cookie);
+ if (it != Cookies.end()) {
+ auto& cookieInfo = it->second;
+ KAFKA_LOG_D("Transaction was committed. Retrying produce request as a part of next transaction");
+ auto txnIt = TransactionalWriters.find({cookieInfo.TopicPath, cookieInfo.PartitionId});
+ if (txnIt != TransactionalWriters.end()) {
+ Send(txnIt->second.ActorId, new TEvents::TEvPoison());
+ TransactionalWriters.erase(txnIt);
+ }
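+ // Locate the original partition data in the pending request and resend it via SendWriteRequest.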
+ for (const auto& topicData : cookieInfo.Request->Request->Get()->Request->TopicData) {
+ TString topicPath = NormalizePath(Context->DatabasePath, *topicData.Name);
+ if (topicPath == cookieInfo.TopicPath) {
+ for(const auto& partitionData : topicData.PartitionData) {
+ if (partitionData.Index == static_cast<int>(cookieInfo.PartitionId)) {
+ SendWriteRequest(partitionData, topicPath, cookieInfo.Request, cookieInfo.Position, cookieInfo.RuPerRequest, ctx);
+ break;
+ }
+ }
+ }
+ }
+
+ Cookies.erase(it);
+ }
+}
+
+void TKafkaProduceActor::SendWriteRequest(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& partitionData,
+ const TString& topicName,
+ TPendingRequest::TPtr pendingRequest,
+ size_t position,
+ bool& ruPerRequest,
+ const TActorContext& ctx
+ ) {
+ auto r = pendingRequest->Request->Get()->Request;
+ const TString& topicPath = NormalizePath(Context->DatabasePath, topicName);
+ const auto partitionId = partitionData.Index;
+ TProducerInstanceId producerInstanceId{partitionData.Records->ProducerId, partitionData.Records->ProducerEpoch};
+ TMaybe<TString> transactionalId;
+ if (r->TransactionalId) {
+ transactionalId.ConstructInPlace(r->TransactionalId->c_str());
+ }
+
+ auto writer = PartitionWriter({topicPath, static_cast<ui32>(partitionId)}, producerInstanceId, transactionalId, ctx);
+ auto& result = pendingRequest->Results[position];
+ if (OK == writer.first) {
+ auto ownCookie = ++Cookie;
+ auto& cookieInfo = Cookies[ownCookie];
+ cookieInfo.TopicPath = topicPath;
+ cookieInfo.PartitionId = partitionId;
+ cookieInfo.Position = position;
+ cookieInfo.RuPerRequest = ruPerRequest;
+ cookieInfo.Request = pendingRequest;
+
+ pendingRequest->WaitAcceptingCookies.insert(ownCookie);
+ pendingRequest->WaitResultCookies.insert(ownCookie);
+
+ auto [error, ev] = Convert(transactionalId.GetOrElse(""), partitionData, topicPath, ownCookie, ClientDC, ruPerRequest);
+ if (error == EKafkaErrors::NONE_ERROR) {
+ ruPerRequest = false;
+ KAFKA_LOG_T("Sending TEvPartitionWriter::TEvWriteRequest to " << writer.second << " with cookie " << ownCookie);
+ Send(writer.second, std::move(ev));
+ result.ErrorCode = NONE_ERROR;
+ } else {
+ result.ErrorCode = error;
+ }
+ } else {
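+ // Map the writer lookup status to the closest Kafka protocol error code.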
+ switch (writer.first) {
+ case NOT_FOUND:
+ result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+ break;
+ case UNAUTHORIZED:
+ result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
+ break;
+ case PRODUCER_FENCED:
+ result.ErrorCode = EKafkaErrors::PRODUCER_FENCED;
+ break;
+ default:
+ result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ }
+ }
+
+ if (result.ErrorCode != EKafkaErrors::NONE_ERROR) {
+ KAFKA_LOG_ERROR("Write request failed with error " << result.ErrorCode << " and message " << result.ErrorMessage);
+ }
+}
+
std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::PartitionWriter(const TTopicPartition& topicPartition, const TProducerInstanceId& producerInstanceId, const TMaybe<TString>& transactionalId, const TActorContext& ctx) {
auto it = Topics.find(topicPartition.TopicPath);
if (it == Topics.end()) {
@@ -709,7 +765,7 @@ std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::GetOrC
}
std::pair<TKafkaProduceActor::ETopicStatus, TActorId> TKafkaProduceActor::CreateTransactionalWriter(const TTopicPartition& topicPartition, const TTopicInfo& topicInfo, const TProducerInstanceId& producerInstanceId, const TString& transactionalId, const TActorContext& ctx) {
- KAFKA_LOG_D("Created transactional writer for producerId=" << producerInstanceId.Id << " and producerEpoch=" << producerInstanceId.Epoch << "for topic-partition " << topicPartition.TopicPath << ":" << topicPartition.PartitionId);
+ KAFKA_LOG_D("Created transactional writer for producerId=" << producerInstanceId.Id << " and producerEpoch=" << producerInstanceId.Epoch << " for topic-partition " << topicPartition.TopicPath << ":" << topicPartition.PartitionId);
auto* partition = topicInfo.PartitionChooser->GetPartition(topicPartition.PartitionId);
if (!partition) {
return { NOT_FOUND, TActorId{} };
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
index d6bd720ff52..d89a0c0ee0a 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.h
@@ -170,6 +170,7 @@ private:
TString TopicPath;
ui32 PartitionId;
size_t Position;
+ bool RuPerRequest;
TPendingRequest::TPtr Request;
};
@@ -196,6 +197,14 @@ private:
std::unordered_map<TString, std::unordered_map<ui32, TWriterInfo>> NonTransactionalWriters;
std::unordered_map<TTopicPartition, TWriterInfo, TTopicPartitionHashFn> TransactionalWriters;
+ void RecreatePartitionWriterAndRetry(ui64 cookie, const TActorContext& ctx);
+ void SendWriteRequest(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& partitionData,
+ const TString& topicName,
+ TPendingRequest::TPtr pendingRequest,
+ size_t position,
+ bool& ruPerRequest,
+ const TActorContext& ctx
+ );
void CleanWriter(const TTopicPartition& topicPartition, const TActorId& writerId);
std::pair<TKafkaProduceActor::ETopicStatus, TActorId> GetOrCreateNonTransactionalWriter(const TTopicPartition& topicPartition, const TTopicInfo& topicInfo, const TProducerInstanceId& producerInstanceId, const TActorContext& ctx);
std::pair<TKafkaProduceActor::ETopicStatus, TActorId> GetOrCreateTransactionalWriter(const TTopicPartition& topicPartition, const TTopicInfo& topicInfo, const TProducerInstanceId& producerInstanceId, const TString& transactionalId, const TActorContext& ctx);
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
index 4b04e384d9b..ff8d8e207e5 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.cpp
@@ -148,7 +148,7 @@ namespace NKafka {
void TTransactionActor::SendToKqpValidationRequests(const TActorContext& ctx) {
KAFKA_LOG_D("Sending select request to KQP for database " << DatabasePath);
Kqp->SendYqlRequest(
- GetYqlWithTablesNames(NKafkaTransactionSql::SELECT_FOR_VALIDATION),
+ GetYqlWithTablesNames(),
BuildSelectParams(),
++KqpCookie,
ctx,
@@ -212,19 +212,24 @@ namespace NKafka {
return NPersQueue::GetFullTopicPath(DatabasePath, topicName);
}
- TString TTransactionActor::GetYqlWithTablesNames(const TString& templateStr) {
+ TString TTransactionActor::GetYqlWithTablesNames() {
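+ // If the transaction commits no consumer offsets, the consumer state table is not queried, so the producer-only template is used.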
+ const TString& templateStr = OffsetsToCommit.empty() ? NKafkaTransactionSql::SELECT_FOR_VALIDATION_WITHOUT_CONSUMERS : NKafkaTransactionSql::SELECT_FOR_VALIDATION_WITH_CONSUMERS;
+
TString templateWithProducerStateTable = std::regex_replace(
templateStr.c_str(),
std::regex("<producer_state_table_name>"),
NKikimr::NGRpcProxy::V1::TTransactionalProducersInitManager::GetInstant()->GetStorageTablePath().c_str()
);
- TString templateWithConsumerStateTable = std::regex_replace(
- templateWithProducerStateTable.c_str(),
- std::regex("<consumer_state_table_name>"),
- NKikimr::NGRpcProxy::V1::TKafkaConsumerGroupsMetaInitManager::GetInstant()->GetStorageTablePath().c_str()
- );
- return templateWithConsumerStateTable;
+ if (!OffsetsToCommit.empty()) {
+ return std::regex_replace(
+ templateWithProducerStateTable.c_str(),
+ std::regex("<consumer_state_table_name>"),
+ NKikimr::NGRpcProxy::V1::TKafkaConsumerGroupsMetaInitManager::GetInstant()->GetStorageTablePath().c_str()
+ );
+ }
+
+ return templateWithProducerStateTable;
}
NYdb::TParams TTransactionActor::BuildSelectParams() {
@@ -232,18 +237,20 @@ namespace NKafka {
params.AddParam("$Database").Utf8(DatabasePath).Build();
params.AddParam("$TransactionalId").Utf8(TransactionalId).Build();
- // select unique consumer group names
- std::unordered_set<TString> uniqueConsumerGroups;
- for (auto& [partition, commitDetails] : OffsetsToCommit) {
- uniqueConsumerGroups.emplace(commitDetails.ConsumerName);
- }
-
- // add unique consumer group names to request as params
- auto& consumerGroupsParamBuilder = params.AddParam("$ConsumerGroups").BeginList();
- for (auto& consumerGroupName : uniqueConsumerGroups) {
- consumerGroupsParamBuilder.AddListItem().Utf8(consumerGroupName);
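+ // Bind $ConsumerGroups only when offsets are committed: the producer-only SELECT template does not declare this parameter.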
+ if (!OffsetsToCommit.empty()) {
+ // select unique consumer group names
+ std::unordered_set<TString> uniqueConsumerGroups;
+ for (auto& [partition, commitDetails] : OffsetsToCommit) {
+ uniqueConsumerGroups.emplace(commitDetails.ConsumerName);
+ }
+
+ // add unique consumer group names to request as params
+ auto& consumerGroupsParamBuilder = params.AddParam("$ConsumerGroups").BeginList();
+ for (auto& consumerGroupName : uniqueConsumerGroups) {
+ consumerGroupsParamBuilder.AddListItem().Utf8(consumerGroupName);
+ }
+ consumerGroupsParamBuilder.EndList().Build();
}
- consumerGroupsParamBuilder.EndList().Build();
return params.Build();
}
@@ -283,8 +290,10 @@ namespace NKafka {
void TTransactionActor::HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx) {
- // YDB should return exactly two result sets for two queries: to producer and consumer state tables
+ // YDB returns one result set (producer state only) or two (producer and consumer state tables), depending on whether the transaction commits consumer offsets
- if (response.Record.GetResponse().GetYdbResults().size() != 2) {
- TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected 2, got " << response.Record.GetResponse().GetYdbResults().size() << ".";
+ int resultsSize = response.Record.GetResponse().GetYdbResults().size();
+ int expectedResultsSize = OffsetsToCommit.empty() ? 1 : 2; // if no consumer offsets were committed in the transaction, we do not query the consumer table
+ if (expectedResultsSize != resultsSize) {
+ TString error = TStringBuilder() << "KQP returned wrong number of result sets on SELECT query. Expected " << expectedResultsSize << ", got " << resultsSize << ".";
KAFKA_LOG_W(error);
SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::BROKER_NOT_AVAILABLE, error);
Die(ctx);
@@ -309,15 +318,17 @@ namespace NKafka {
return;
}
- // parse and validate consumers
- std::unordered_map<TString, i32> consumerGenerationsByName = ParseConsumersGenerations(response);
- if (auto error = GetErrorInConsumersStates(consumerGenerationsByName)) {
- KAFKA_LOG_W(error);
- SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::PRODUCER_FENCED, error->data());
- Die(ctx);
- return;
+ if (!OffsetsToCommit.empty()) {
+ // parse and validate consumers
+ std::unordered_map<TString, i32> consumerGenerationsByName = ParseConsumersGenerations(response);
+ if (auto error = GetErrorInConsumersStates(consumerGenerationsByName)) {
+ KAFKA_LOG_W(error);
+ SendFailResponse<TEndTxnResponseData>(EndTxnRequestPtr, EKafkaErrors::PRODUCER_FENCED, error->data());
+ Die(ctx);
+ return;
+ }
}
-
+
KAFKA_LOG_D("Validated producer and consumers states. Everything is alright, adding kafka operations to transaction.");
auto kqpTxnId = response.Record.GetResponse().GetTxMeta().id();
// finally everything is valid and we can add kafka operations to transaction and attempt to commit
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
index 551d7e5c2c2..9aec3d5c516 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor.h
@@ -100,7 +100,7 @@ namespace NKafka {
template<class EventType>
bool ProducerInRequestIsValid(TMessagePtr<EventType> kafkaRequest);
TString GetFullTopicPath(const TString& topicName);
- TString GetYqlWithTablesNames(const TString& templateStr);
+ TString GetYqlWithTablesNames();
NYdb::TParams BuildSelectParams();
THolder<NKikimr::NKqp::TEvKqp::TEvQueryRequest> BuildAddKafkaOperationsRequest(const TString& kqpTransactionId);
void HandleSelectResponse(const NKqp::TEvKqp::TEvQueryResponse& response, const TActorContext& ctx);
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp
index 00d9bdb3a9b..1e1adfaf8a0 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.cpp
@@ -2,7 +2,17 @@
namespace NKafka::NKafkaTransactionSql {
- TString SELECT_FOR_VALIDATION = R"sql(
+ TString SELECT_FOR_VALIDATION_WITHOUT_CONSUMERS = R"sql(
+ --!syntax_v1
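+ -- Producer-only validation: used when the transaction commits no consumer offsets.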
+ DECLARE $Database AS Utf8;
+ DECLARE $TransactionalId AS Utf8;
+
+ SELECT * FROM `<producer_state_table_name>`
+ WHERE database = $Database
+ AND transactional_id = $TransactionalId;
+ )sql";
+
+ TString SELECT_FOR_VALIDATION_WITH_CONSUMERS = R"sql(
--!syntax_v1
DECLARE $Database AS Utf8;
DECLARE $TransactionalId AS Utf8;
diff --git a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h
index 8a116a90ce0..fdc192499c8 100644
--- a/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h
+++ b/ydb/core/kafka_proxy/actors/kafka_transaction_actor_sql.h
@@ -7,6 +7,7 @@ namespace NKafka::NKafkaTransactionSql {
constexpr ui32 PRODUCER_STATE_REQUEST_INDEX = 0;
constexpr ui32 CONSUMER_STATES_REQUEST_INDEX = 1;
- extern const TString SELECT_FOR_VALIDATION;
+ extern const TString SELECT_FOR_VALIDATION_WITHOUT_CONSUMERS;
+ extern const TString SELECT_FOR_VALIDATION_WITH_CONSUMERS;
} // namespace NKafka::NKafkaTransactionSql
diff --git a/ydb/core/kafka_proxy/ut/kafka_test_client.h b/ydb/core/kafka_proxy/ut/kafka_test_client.h
index b979461f2e9..3dd2da9919a 100644
--- a/ydb/core/kafka_proxy/ut/kafka_test_client.h
+++ b/ydb/core/kafka_proxy/ut/kafka_test_client.h
@@ -107,7 +107,7 @@ class TKafkaTestClient {
TMessagePtr<TListGroupsResponseData> ListGroups(TListGroupsRequestData request);
- TMessagePtr<TListGroupsResponseData> ListGroups(const std::vector<std::optional<TString>>& statesFilter);
+ TMessagePtr<TListGroupsResponseData> ListGroups(const std::vector<std::optional<TString>>& statesFilter = {});
TMessagePtr<TDescribeGroupsResponseData> DescribeGroups(TDescribeGroupsRequestData& request);
diff --git a/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp b/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp
index 712bc85262f..4af1821c025 100644
--- a/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_produce_actor.cpp
@@ -62,6 +62,7 @@ namespace {
Ctx->Runtime->DisableBreakOnStopCondition();
Ctx->Runtime->SetLogPriority(NKikimrServices::KAFKA_PROXY, NLog::PRI_TRACE);
Ctx->Runtime->SetLogPriority(NKikimrServices::PQ_WRITE_PROXY, NLog::PRI_TRACE);
+ Ctx->Runtime->SetLogPriority(NKikimrServices::PERSQUEUE, NLog::PRI_DEBUG);
TContext::TPtr kafkaContext = std::make_shared<TContext>(KafkaConfig);
kafkaContext->DatabasePath = "/Root/PQ";
ActorId = Ctx->Runtime->Register(CreateKafkaProduceActor(kafkaContext));
@@ -103,6 +104,7 @@ namespace {
void AssertCorrectOptsInPartitionWriter(const TActorId& writerId, const TProducerInstanceId& producerInstanceId, const TMaybe<TString>& transactionalId) {
NKikimr::NPQ::TPartitionWriter* writer = dynamic_cast<NKikimr::NPQ::TPartitionWriter*>(Ctx->Runtime->FindActor(writerId));
+ UNIT_ASSERT(writer);
const TPartitionWriterOpts& writerOpts = writer->GetOpts();
UNIT_ASSERT_VALUES_EQUAL(*writerOpts.KafkaProducerInstanceId, producerInstanceId);
@@ -112,6 +114,16 @@ namespace {
UNIT_ASSERT(transactionalId.Empty());
}
}
+
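+ // Builds a TEvResponse carrying the new error code, as the PQ tablet returns it when the supportive partition is already gone.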
+ THolder<TEvPersQueue::TEvResponse> CreateMissingSupPartitionErrorResponse(ui64 cookie) {
+ auto event = MakeHolder<TEvPersQueue::TEvResponse>();
+ NKikimrClient::TResponse record;
+ record.SetErrorReason("expected test error");
+ record.SetErrorCode(::NPersQueue::NErrorCode::EErrorCode::KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION);
+ record.MutablePartitionResponse()->SetCookie(cookie);
+ event->Record = record;
+ return event;
+ }
};
Y_UNIT_TEST_SUITE_F(ProduceActor, TProduceActorFixture) {
@@ -171,6 +183,53 @@ namespace {
UNIT_ASSERT_VALUES_EQUAL(poisonPillReceiver, firstPartitionWriterId);
}
+ Y_UNIT_TEST(OnProduceWithTransactionalId_andLostMessagesError_shouldRecreatePartitionWriterAndRetryProduce) {
+ i64 producerId = 1;
+ i32 producerEpoch = 2;
+ TActorId firstWriteRequestReceiver;
+ TActorId poisonPillReceiver;
+ TActorId secondWriteRequestReceiver;
+ int writeRequestsCounter = 0;
+ int poisonPillCounter = 0;
+ auto observer = [&](TAutoPtr<IEventHandle>& input) {
+ Cout << input->ToString() << Endl;
+ if (auto* event = input->CastAsLocal<TEvPartitionWriter::TEvWriteRequest>()) {
+ if (writeRequestsCounter == 0) {
+ firstWriteRequestReceiver = input->Recipient;
+ AssertCorrectOptsInPartitionWriter(firstWriteRequestReceiver, {producerId, producerEpoch}, TransactionalId);
+ } else if (writeRequestsCounter == 1) {
+ secondWriteRequestReceiver = input->Recipient;
+ AssertCorrectOptsInPartitionWriter(secondWriteRequestReceiver, {producerId, producerEpoch}, TransactionalId);
+ }
+ writeRequestsCounter++;
+ } else if (auto* event = input->CastAsLocal<TEvPersQueue::TEvRequest>()) {
+ if (event->Record.GetPartitionRequest().HasCmdReserveBytes()) {
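+ // Intercept the reserve-bytes request and reply with the missing-supportive-partition error, simulating an already committed transaction.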
+ Ctx->Runtime->Send(new IEventHandle(firstWriteRequestReceiver, input->Sender, CreateMissingSupPartitionErrorResponse(event->Record.GetPartitionRequest().GetCookie()).Release()));
+ return TTestActorRuntimeBase::EEventAction::DROP;
+ }
+ } else if (auto* event = input->CastAsLocal<TEvents::TEvPoison>()) {
+ if (poisonPillCounter == 0) { // only first poison pill goes to writer
+ poisonPillReceiver = input->Recipient;
+ poisonPillCounter++;
+ } // we are not interested in any subsequent ones
+ }
+
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+ Ctx->Runtime->SetObserverFunc(observer);
+
+ SendProduce(TransactionalId, producerId, producerEpoch);
+
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&writeRequestsCounter, &poisonPillCounter]() {
+ return writeRequestsCounter > 1 && poisonPillCounter > 0;
+ };
+ UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
+
+ UNIT_ASSERT_VALUES_UNEQUAL(firstWriteRequestReceiver, secondWriteRequestReceiver);
+ UNIT_ASSERT_VALUES_EQUAL(firstWriteRequestReceiver, poisonPillReceiver);
+ }
+
Y_UNIT_TEST(OnProduceWithoutTransactionalId_shouldNotKillOldWriter) {
i64 producerId = 0;
i32 producerEpoch = 0;
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index bbb41c85fee..53adc7be319 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -3367,6 +3367,137 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
}
}
+ Y_UNIT_TEST(Several_Subsequent_Transactions_Scenario) {
+ TInsecureTestServer testServer("1", false, true);
+ TKafkaTestClient kafkaClient(testServer.Port);
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ // use random values to avoid parallel execution problems
+ TString inputTopicName = TStringBuilder() << "input-topic-" << RandomNumber<ui64>();
+ TString outputTopicName = TStringBuilder() << "output-topic-" << RandomNumber<ui64>();
+ TString transactionalId = TStringBuilder() << "my-tx-producer-" << RandomNumber<ui64>();
+ TString consumerName = "my-consumer";
+
+ // create input and output topics
+ CreateTopic(pqClient, inputTopicName, 3, {consumerName});
+ CreateTopic(pqClient, outputTopicName, 3, {consumerName});
+
+ // produce data to input topic (to commit offsets in further steps)
+ auto inputProduceResponse = kafkaClient.Produce({inputTopicName, 0}, {{"key1", "val1"}, {"key2", "val2"}, {"key3", "val3"}});
+ UNIT_ASSERT_VALUES_EQUAL(inputProduceResponse->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // init producer id
+ auto initProducerIdResp = kafkaClient.InitProducerId(transactionalId, 30000);
+ UNIT_ASSERT_VALUES_EQUAL(initProducerIdResp->ErrorCode, EKafkaErrors::NONE_ERROR);
+ TProducerInstanceId producerInstanceId = {initProducerIdResp->ProducerId, initProducerIdResp->ProducerEpoch};
+
+ ui32 totalTxns = 3;
+ for (ui32 i = 0; i < totalTxns; ++i) {
+ // add partitions to txn
+ std::unordered_map<TString, std::vector<ui32>> topicPartitionsToAddToTxn;
+ topicPartitionsToAddToTxn[outputTopicName] = std::vector<ui32>{0, 1};
+ auto addPartsResponse = kafkaClient.AddPartitionsToTxn(transactionalId, producerInstanceId, topicPartitionsToAddToTxn);
+ UNIT_ASSERT_VALUES_EQUAL(addPartsResponse->Results[0].Results[0].ErrorCode, EKafkaErrors::NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(addPartsResponse->Results[0].Results[1].ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // produce data
+ // to part 0
+ auto out0ProduceResponse = kafkaClient.Produce({outputTopicName, 0}, {{std::to_string(i), "123"}}, i, producerInstanceId, transactionalId);
+ UNIT_ASSERT_VALUES_EQUAL_C(out0ProduceResponse->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR, TStringBuilder() << "Txn " << i + 1);
+ // to part 1
+ auto out1ProduceResponse = kafkaClient.Produce({outputTopicName, 1}, {{std::to_string(i + totalTxns), "987"}}, i, producerInstanceId, transactionalId);
+ UNIT_ASSERT_VALUES_EQUAL_C(out1ProduceResponse->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR, TStringBuilder() << "Txn " << i + 1);
+
+ // init consumer
+ std::vector<TString> topicsToSubscribe;
+ topicsToSubscribe.push_back(outputTopicName);
+ TString protocolName = "range";
+ auto consumerInfo = kafkaClient.JoinAndSyncGroupAndWaitPartitions(topicsToSubscribe, consumerName, 3, protocolName, 3, 15000);
+
+ // add offsets to txn
+ auto addOffsetsResponse = kafkaClient.AddOffsetsToTxn(transactionalId, producerInstanceId, consumerName);
+ UNIT_ASSERT_VALUES_EQUAL(addOffsetsResponse->ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // txn offset commit
+ std::unordered_map<TString, std::vector<std::pair<ui32, ui64>>> offsetsToCommit;
+ offsetsToCommit[inputTopicName] = std::vector<std::pair<ui32, ui64>>{{0, i + 1}};
+ auto txnOffsetCommitResponse = kafkaClient.TxnOffsetCommit(transactionalId, producerInstanceId, consumerName, consumerInfo.GenerationId, offsetsToCommit);
+ UNIT_ASSERT_VALUES_EQUAL(txnOffsetCommitResponse->Topics[0].Partitions[0].ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // end txn
+ auto endTxnResponse = kafkaClient.EndTxn(transactionalId, producerInstanceId, true);
+ UNIT_ASSERT_VALUES_EQUAL(endTxnResponse->ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // validate data is accessible in target topic
+ auto fetchResponse1 = kafkaClient.Fetch({{outputTopicName, {0, 1}}});
+ UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->Responses[0].Partitions[0].Records->Records.size(), i + 1);
+ UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->Responses[0].Partitions[1].Records->Records.size(), i + 1);
+ auto record1 = fetchResponse1->Responses[0].Partitions[0].Records->Records[i];
+ UNIT_ASSERT_VALUES_EQUAL(TString(record1.Key.value().data(), record1.Key.value().size()), std::to_string(i));
+ UNIT_ASSERT_VALUES_EQUAL(TString(record1.Value.value().data(), record1.Value.value().size()), "123");
+ auto record2 = fetchResponse1->Responses[0].Partitions[1].Records->Records[i];
+ UNIT_ASSERT_VALUES_EQUAL(TString(record2.Key.value().data(), record2.Key.value().size()), std::to_string(i + totalTxns));
+ UNIT_ASSERT_VALUES_EQUAL(TString(record2.Value.value().data(), record2.Value.value().size()), "987");
+
+ // validate consumer offset committed
+ std::map<TString, std::vector<i32>> topicsToPartitionsToFetch;
+ topicsToPartitionsToFetch[inputTopicName] = std::vector<i32>{0};
+ auto offsetFetchResponse = kafkaClient.OffsetFetch(consumerName, topicsToPartitionsToFetch);
+ UNIT_ASSERT_VALUES_EQUAL(offsetFetchResponse->ErrorCode, EKafkaErrors::NONE_ERROR);
+ UNIT_ASSERT_VALUES_EQUAL(offsetFetchResponse->Groups[0].Topics[0].Partitions[0].CommittedOffset, i + 1);
+ }
+ }
+
+ Y_UNIT_TEST(Several_Writes_In_One_Transaction) {
+ TInsecureTestServer testServer("1", false, true);
+ TKafkaTestClient kafkaClient(testServer.Port);
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ // use random values to avoid parallel execution problems
+ TString topicName = TStringBuilder() << "input-topic-" << RandomNumber<ui64>();
+ TString transactionalId = TStringBuilder() << "my-tx-producer-" << RandomNumber<ui64>();
+ TString consumerName = "my-consumer";
+
+ CreateTopic(pqClient, topicName, 1, {consumerName});
+
+ // init producer id
+ auto initProducerIdResp = kafkaClient.InitProducerId(transactionalId, 30000);
+ UNIT_ASSERT_VALUES_EQUAL(initProducerIdResp->ErrorCode, EKafkaErrors::NONE_ERROR);
+ TProducerInstanceId producerInstanceId = {initProducerIdResp->ProducerId, initProducerIdResp->ProducerEpoch};
+
+ // add partitions to txn
+ std::unordered_map<TString, std::vector<ui32>> topicPartitionsToAddToTxn;
+ topicPartitionsToAddToTxn[topicName] = std::vector<ui32>{0};
+ auto addPartsResponse = kafkaClient.AddPartitionsToTxn(transactionalId, producerInstanceId, topicPartitionsToAddToTxn);
+ UNIT_ASSERT_VALUES_EQUAL(addPartsResponse->Results[0].Results[0].ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // produce data
+ auto produceResponse0 = kafkaClient.Produce({topicName, 0}, {{"0", "123"}}, 0, producerInstanceId, transactionalId);
+ UNIT_ASSERT_VALUES_EQUAL(produceResponse0->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR);
+ auto produceResponse1 = kafkaClient.Produce({topicName, 0}, {{"1", "234"}}, 1, producerInstanceId, transactionalId);
+ UNIT_ASSERT_VALUES_EQUAL(produceResponse1->Responses[0].PartitionResponses[0].ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // init consumer
+ std::vector<TString> topicsToSubscribe;
+ topicsToSubscribe.push_back(topicName);
+ TString protocolName = "range";
+ auto consumerInfo = kafkaClient.JoinAndSyncGroupAndWaitPartitions(topicsToSubscribe, consumerName, 1, protocolName, 1, 15000);
+
+ // end txn
+ auto endTxnResponse = kafkaClient.EndTxn(transactionalId, producerInstanceId, true);
+ UNIT_ASSERT_VALUES_EQUAL(endTxnResponse->ErrorCode, EKafkaErrors::NONE_ERROR);
+
+ // validate data is accessible in target topic
+ auto fetchResponse1 = kafkaClient.Fetch({{topicName, {0}}});
+ UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(fetchResponse1->Responses[0].Partitions[0].Records->Records.size(), 2);
+ auto record1 = fetchResponse1->Responses[0].Partitions[0].Records->Records[0];
+ UNIT_ASSERT_VALUES_EQUAL(TString(record1.Key.value().data(), record1.Key.value().size()), "0");
+ UNIT_ASSERT_VALUES_EQUAL(TString(record1.Value.value().data(), record1.Value.value().size()), "123");
+ auto record2 = fetchResponse1->Responses[0].Partitions[0].Records->Records[1];
+ UNIT_ASSERT_VALUES_EQUAL(TString(record2.Key.value().data(), record2.Key.value().size()), "1");
+ UNIT_ASSERT_VALUES_EQUAL(TString(record2.Value.value().data(), record2.Value.value().size()), "234");
+ }
+
Y_UNIT_TEST(Commit_Transaction_After_timeout_should_return_producer_fenced) {
TInsecureTestServer testServer("1", false, true);
TKafkaTestClient kafkaClient(testServer.Port);
diff --git a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp
index efb9ac6b200..aac121a2fbf 100644
--- a/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_transaction_actor.cpp
@@ -15,7 +15,7 @@ namespace {
public:
TDummyKqpActor() : TActor<TDummyKqpActor>(&TDummyKqpActor::StateFunc) {}
- void SetValidationResponse(const TString& transactionalId, i64 producerId, i32 producerEpoch, const std::unordered_map<TString, i32>& consumerGenerations = {}) {
+ void SetValidationResponse(const TString& transactionalId, i64 producerId, i32 producerEpoch, const TMaybe<std::unordered_map<TString, i32>>& consumerGenerations = Nothing()) {
TransactionalIdToReturn = transactionalId;
ProducerIdToReturn = producerId;
ProducerEpochToReturn = producerEpoch;
@@ -81,8 +81,10 @@ namespace {
record.SetYdbStatus(Ydb::StatusIds::SUCCESS);
auto* producerState = record.MutableResponse()->AddYdbResults();
*producerState = CreateProducerStateResultsSet();
- auto* consumersState = record.MutableResponse()->AddYdbResults();
- *consumersState = CreateConsumersStatesResultSet();
+ if (ConsumerGenerationsToReturn.Defined()) {
+ auto* consumersState = record.MutableResponse()->AddYdbResults();
+ *consumersState = CreateConsumersStatesResultSet();
+ }
response->Record = record;
return response;
@@ -140,7 +142,7 @@ namespace {
" }\n"
"}\n";
- for (auto& [consumerName, generation] : ConsumerGenerationsToReturn) {
+ for (auto& [consumerName, generation] : *ConsumerGenerationsToReturn) {
builder <<
"rows {\n"
" items {\n"
@@ -159,7 +161,7 @@ namespace {
TString TransactionalIdToReturn = "";
i64 ProducerIdToReturn = 0;
i32 ProducerEpochToReturn = 0;
- std::unordered_map<TString, i32> ConsumerGenerationsToReturn = {};
+ TMaybe<std::unordered_map<TString, i32>> ConsumerGenerationsToReturn = Nothing();
bool ReturnSuccessOnCommit = true;
};
@@ -294,7 +296,7 @@ namespace {
// Arguments:
// 1. callback - function that will be called on receiving the response from KQP to the commit request
// 2. consumerGenerationsToReturnInValidationRequest - map of consumer name to its generation, to ensure proper validation of consumer states by the actor
- void AddObserverForAddOperationsRequest(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, std::unordered_map<TString, i32> consumerGenerationsToReturnInValidationRequest = {}) {
+ void AddObserverForAddOperationsRequest(std::function<void(const TEvKqp::TEvQueryRequest*)> callback, TMaybe<std::unordered_map<TString, i32>> consumerGenerationsToReturnInValidationRequest = Nothing()) {
DummyKqpActor->SetValidationResponse(TransactionalId, ProducerId, ProducerEpoch, consumerGenerationsToReturnInValidationRequest);
auto observer = [callback = std::move(callback), this](TAutoPtr<IEventHandle>& input) {
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index e2f5dce9f87..ac5d77c00b3 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -3474,6 +3474,7 @@ void TPartition::ScheduleReplyError(const ui64 dst,
NPersQueue::NErrorCode::EErrorCode errorCode,
const TString& error)
{
+ PQ_LOG_ERROR("Got error: " << error);
Replies.emplace_back(Tablet,
MakeReplyError(dst,
errorCode,
diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp
index a6e839941f6..52081e0a1ec 100644
--- a/ydb/core/persqueue/partition_write.cpp
+++ b/ydb/core/persqueue/partition_write.cpp
@@ -1212,7 +1212,7 @@ bool TPartition::ExecRequest(TWriteMsg& p, ProcessParameters& parameters, TEvKey
((sourceId.SeqNo() && *sourceId.SeqNo() >= p.Msg.SeqNo) || (p.InitialSeqNo && p.InitialSeqNo.value() >= p.Msg.SeqNo))
) {
if (poffset >= curOffset) {
- PQ_LOG_D("Already written message. Topic: '" << TopicName()
+ PQ_LOG_W("Already written message. Topic: '" << TopicName()
<< "' Partition: " << Partition << " SourceId: '" << EscapeC(p.Msg.SourceId)
<< "'. Message seqNo: " << p.Msg.SeqNo
<< ". Committed seqNo: " << sourceId.CommittedSeqNo()
diff --git a/ydb/core/persqueue/pq_impl.cpp b/ydb/core/persqueue/pq_impl.cpp
index 33f60d2c121..3b8b81dec2c 100644
--- a/ydb/core/persqueue/pq_impl.cpp
+++ b/ydb/core/persqueue/pq_impl.cpp
@@ -395,7 +395,7 @@ public:
bool HandleError(TEvPQ::TEvError *ev, const TActorContext& ctx)
{
- PQ_LOG_D("Answer error topic: '" << TopicName << "'" <<
+ PQ_LOG_ERROR("Answer error topic: '" << TopicName << "'" <<
" partition: " << Partition <<
" messageNo: " << MessageNo <<
" requestId: " << ReqId <<
@@ -2704,10 +2704,18 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
}
} else {
if (!req.GetNeedSupportivePartition()) {
+ // A missing supportive partition in a Kafka transaction means we have already committed and deleted the transaction for the current producerId + producerEpoch
+ NPersQueue::NErrorCode::EErrorCode errorCode = writeId.KafkaApiTransaction ?
+ NPersQueue::NErrorCode::KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION :
+ NPersQueue::NErrorCode::PRECONDITION_FAILED;
+ TString error = writeId.KafkaApiTransaction ?
+ "Kafka transaction and there is no supportive partition for current producerId and producerEpoch. It means GetOwnership request was not called from TPartitionWriter" :
+ "lost messages";
+
ReplyError(ctx,
responseCookie,
- NPersQueue::NErrorCode::PRECONDITION_FAILED,
- "lost messages");
+ errorCode,
+ error);
return;
}
diff --git a/ydb/core/persqueue/write_id.cpp b/ydb/core/persqueue/write_id.cpp
index fe7959baa5f..bb87e99211d 100644
--- a/ydb/core/persqueue/write_id.cpp
+++ b/ydb/core/persqueue/write_id.cpp
@@ -27,7 +27,11 @@ bool TWriteId::operator<(const TWriteId& rhs) const
void TWriteId::ToStream(IOutputStream& s) const
{
- s << '{' << NodeId << ", " << KeyId << '}';
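+ // Kafka-API transactional writes are identified by producer id and epoch rather than by node id and key id.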
+ if (KafkaApiTransaction) {
+ s << "KafkaTransactionWriteId{" << KafkaProducerInstanceId.Id << ", " << KafkaProducerInstanceId.Epoch << '}';
+ } else {
+ s << '{' << NodeId << ", " << KeyId << '}';
+ }
}
template <class T>
diff --git a/ydb/public/api/protos/draft/persqueue_error_codes.proto b/ydb/public/api/protos/draft/persqueue_error_codes.proto
index 9a32a6f00ba..7296cd932f3 100644
--- a/ydb/public/api/protos/draft/persqueue_error_codes.proto
+++ b/ydb/public/api/protos/draft/persqueue_error_codes.proto
@@ -54,7 +54,8 @@ enum EErrorCode {
KAFKA_INVALID_PRODUCER_EPOCH = 32;
KAFKA_OUT_OF_ORDER_SEQUENCE_NUMBER = 33;
- KAFKA_DUPLICATE_SEQUENCE_NUMBER = 34;
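+ // KAFKA_DUPLICATE_SEQUENCE_NUMBER is renumbered from 34 to 35 to make room for the new error code.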
+ KAFKA_TRANSACTION_MISSING_SUPPORTIVE_PARTITION = 34;
+ KAFKA_DUPLICATE_SEQUENCE_NUMBER = 35;
ERROR = 100;
}