author     tesseract <tesseract@yandex-team.com>    2023-08-03 15:57:36 +0300
committer  tesseract <tesseract@yandex-team.com>    2023-08-03 15:57:36 +0300
commit     1a304629e0b0acbf460271ad8c6101d003884380 (patch)
tree       9ed27ec418c61d47f6b83d63ac195d6cbd3668d2
parent     4c75b5ed26e3369316d8a9b06950527b94ad7c1d (diff)
download   ydb-1a304629e0b0acbf460271ad8c6101d003884380.tar.gz
Kafka write protocol
-rw-r--r--  ydb/core/base/events.h                                       3
-rw-r--r--  ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt            1
-rw-r--r--  ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt            1
-rw-r--r--  ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt             1
-rw-r--r--  ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt           1
-rw-r--r--  ydb/core/kafka_proxy/kafka.h                               140
-rw-r--r--  ydb/core/kafka_proxy/kafka_connection.cpp                  109
-rw-r--r--  ydb/core/kafka_proxy/kafka_events.h                         51
-rw-r--r--  ydb/core/kafka_proxy/kafka_messages.h                       24
-rw-r--r--  ydb/core/kafka_proxy/kafka_messages_int.h                    1
-rw-r--r--  ydb/core/kafka_proxy/kafka_produce_actor.cpp               526
-rw-r--r--  ydb/core/kafka_proxy/kafka_produce_actor.h                 186
-rw-r--r--  ydb/core/kafka_proxy/ya.make                                 2
-rw-r--r--  ydb/core/persqueue/blob.h                                    2
-rw-r--r--  ydb/core/persqueue/writer/writer.cpp                         9
-rw-r--r--  ydb/library/services/services.proto                          1
-rw-r--r--  ydb/services/persqueue_v1/actors/write_session_actor.ipp    11
17 files changed, 1030 insertions, 39 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index f81a444a7f7..fcc5dad2041 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -163,7 +163,8 @@ struct TKikimrEvents : TEvents {
ES_CONVEYOR,
ES_KQP_SCAN_EXCHANGE,
ES_IC_NODE_CACHE,
- ES_DATA_OPERATIONS
+ ES_DATA_OPERATIONS,
+ ES_KAFKA
};
};
diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
index 09418d69986..f5901e67f90 100644
--- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt
@@ -22,5 +22,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
index d4beb652817..ce6bfbd67a3 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt
@@ -23,5 +23,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
index d4beb652817..ce6bfbd67a3 100644
--- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt
@@ -23,5 +23,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
index 09418d69986..f5901e67f90 100644
--- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt
@@ -22,5 +22,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp
)
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index 7297003527b..70c41082116 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -157,6 +157,144 @@ enum ESizeFormat {
} // namespace NPrivate
+// see https://kafka.apache.org/11/protocol.html#protocol_error_codes
+enum EKafkaErrors {
+
+ UNKNOWN_SERVER_ERROR = -1, // The server experienced an unexpected error when processing the request.
+ NONE_ERROR = 0,
+ OFFSET_OUT_OF_RANGE, // The requested offset is not within the range of offsets maintained by the server.
+ CORRUPT_MESSAGE, // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
+ UNKNOWN_TOPIC_OR_PARTITION, // This server does not host this topic-partition.
+ INVALID_FETCH_SIZE, // The requested fetch size is invalid.
+ LEADER_NOT_AVAILABLE, // There is no leader for this topic-partition as we are in the middle of a leadership election.
+ NOT_LEADER_OR_FOLLOWER, // For requests intended only for the leader, this error indicates that the broker is not the current leader.
+ // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.
+ REQUEST_TIMED_OUT, // The request timed out.
+ BROKER_NOT_AVAILABLE, // The broker is not available.
+ REPLICA_NOT_AVAILABLE, // The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests
+ // intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.
+ MESSAGE_TOO_LARGE, // The request included a message larger than the max message size the server will accept.
+ STALE_CONTROLLER_EPOCH, // The controller moved to another broker.
+ OFFSET_METADATA_TOO_LARGE, // The metadata field of the offset request was too large.
+ NETWORK_EXCEPTION, // The server disconnected before a response was received.
+ COORDINATOR_LOAD_IN_PROGRESS, // The coordinator is loading and hence can't process requests.
+ COORDINATOR_NOT_AVAILABLE, // The coordinator is not available.
+ NOT_COORDINATOR, // This is not the correct coordinator.
+ INVALID_TOPIC_EXCEPTION, // The request attempted to perform an operation on an invalid topic.
+ RECORD_LIST_TOO_LARGE, // The request included message batch larger than the configured segment size on the server.
+ NOT_ENOUGH_REPLICAS, // Messages are rejected since there are fewer in-sync replicas than required.
+ NOT_ENOUGH_REPLICAS_AFTER_APPEND, // Messages are written to the log, but to fewer in-sync replicas than required.
+ INVALID_REQUIRED_ACKS, // Produce request specified an invalid value for required acks.
+ ILLEGAL_GENERATION, // Specified group generation id is not valid.
+ INCONSISTENT_GROUP_PROTOCOL, // The group member's supported protocols are incompatible with those of existing members
+ // or first group member tried to join with empty protocol type or empty protocol list.
+ INVALID_GROUP_ID, // The configured groupId is invalid.
+ UNKNOWN_MEMBER_ID, // The coordinator is not aware of this member.
+ INVALID_SESSION_TIMEOUT, // The session timeout is not within the range allowed by the broker
+ // (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).
+ REBALANCE_IN_PROGRESS, // The group is rebalancing, so a rejoin is needed.
+ INVALID_COMMIT_OFFSET_SIZE, // The committing offset data size is not valid.
+ TOPIC_AUTHORIZATION_FAILED, // Topic authorization failed.
+ GROUP_AUTHORIZATION_FAILED, // Group authorization failed.
+ CLUSTER_AUTHORIZATION_FAILED, // Cluster authorization failed.
+ INVALID_TIMESTAMP, // The timestamp of the message is out of acceptable range.
+ UNSUPPORTED_SASL_MECHANISM, // The broker does not support the requested SASL mechanism.
+ ILLEGAL_SASL_STATE, // Request is not valid given the current SASL state.
+ UNSUPPORTED_VERSION, // The version of API is not supported.
+ TOPIC_ALREADY_EXISTS, // Topic with this name already exists.
+ INVALID_PARTITIONS, // Number of partitions is below 1.
+ INVALID_REPLICATION_FACTOR, // Replication factor is below 1 or larger than the number of available brokers.
+ INVALID_REPLICA_ASSIGNMENT, // Replica assignment is invalid.
+ INVALID_CONFIG, // Configuration is invalid.
+ NOT_CONTROLLER, // This is not the correct controller for this cluster.
+ INVALID_REQUEST, // This most likely occurs because of a request being malformed by the
+ // client library or the message was sent to an incompatible broker. See the broker logs
+ // for more details.
+ UNSUPPORTED_FOR_MESSAGE_FORMAT, // The message format version on the broker does not support the request.
+ POLICY_VIOLATION, // Request parameters do not satisfy the configured policy.
+ OUT_OF_ORDER_SEQUENCE_NUMBER, // The broker received an out of order sequence number.
+ DUPLICATE_SEQUENCE_NUMBER, // The broker received a duplicate sequence number.
+ INVALID_PRODUCER_EPOCH, // Producer attempted to produce with an old epoch.
+ INVALID_TXN_STATE, // The producer attempted a transactional operation in an invalid state.
+ INVALID_PRODUCER_ID_MAPPING, // The producer attempted to use a producer id which is not currently assigned to
+ // its transactional id.
+ INVALID_TRANSACTION_TIMEOUT, // The transaction timeout is larger than the maximum value allowed by
+ // the broker (as configured by transaction.max.timeout.ms).
+ CONCURRENT_TRANSACTIONS, // The producer attempted to update a transaction
+ // while another concurrent operation on the same transaction was ongoing.
+ TRANSACTION_COORDINATOR_FENCED, // Indicates that the transaction coordinator sending a WriteTxnMarker
+ // is no longer the current coordinator for a given producer.
+ TRANSACTIONAL_ID_AUTHORIZATION_FAILED, // Transactional Id authorization failed.
+ SECURITY_DISABLED, // Security features are disabled.
+ OPERATION_NOT_ATTEMPTED, // The broker did not attempt to execute this operation. This may happen for
+ // batched RPCs where some operations in the batch failed, causing the broker to respond without
+ // trying the rest.
+ KAFKA_STORAGE_ERROR, // Disk error when trying to access log file on the disk.
+ LOG_DIR_NOT_FOUND, // The user-specified log directory is not found in the broker config.
+ SASL_AUTHENTICATION_FAILED, // SASL Authentication failed.
+ UNKNOWN_PRODUCER_ID, // This exception is raised by the broker if it could not locate the producer metadata
+ // associated with the producerId in question. This could happen if, for instance, the producer's records
+ // were deleted because their retention time had elapsed. Once the last records of the producerId are
+ // removed, the producer's metadata is removed from the broker, and future appends by the producer will
+ // return this exception.
+ REASSIGNMENT_IN_PROGRESS, // A partition reassignment is in progress.
+ DELEGATION_TOKEN_AUTH_DISABLED, // Delegation Token feature is not enabled.
+ DELEGATION_TOKEN_NOT_FOUND, // Delegation Token is not found on server.
+ DELEGATION_TOKEN_OWNER_MISMATCH, // Specified Principal is not valid Owner/Renewer.
+ DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL
+ // channels and on delegation token authenticated channels.
+ DELEGATION_TOKEN_AUTHORIZATION_FAILED, // Delegation Token authorization failed.
+ DELEGATION_TOKEN_EXPIRED, // Delegation Token is expired.
+ INVALID_PRINCIPAL_TYPE, // Supplied principalType is not supported.
+ NON_EMPTY_GROUP, // The group is not empty.
+ GROUP_ID_NOT_FOUND, // The group id does not exist.
+ FETCH_SESSION_ID_NOT_FOUND, // The fetch session ID was not found.
+ INVALID_FETCH_SESSION_EPOCH, // The fetch session epoch is invalid.
+ LISTENER_NOT_FOUND, // There is no listener on the leader broker that matches the listener on which
+ // metadata request was processed.
+ TOPIC_DELETION_DISABLED, // Topic deletion is disabled.
+ FENCED_LEADER_EPOCH, // The leader epoch in the request is older than the epoch on the broker.
+ UNKNOWN_LEADER_EPOCH, // The leader epoch in the request is newer than the epoch on the broker.
+ UNSUPPORTED_COMPRESSION_TYPE, // The requesting client does not support the compression type of given partition.
+ STALE_BROKER_EPOCH, // Broker epoch has changed.
+ OFFSET_NOT_AVAILABLE, // The leader high watermark has not caught up from a recent leader
+ // election so the offsets cannot be guaranteed to be monotonically increasing.
+ MEMBER_ID_REQUIRED, // The group member needs to have a valid member id before actually entering a consumer group.
+ PREFERRED_LEADER_NOT_AVAILABLE, // The preferred leader was not available.
+ GROUP_MAX_SIZE_REACHED, // The consumer group has reached its max size.
+ FENCED_INSTANCE_ID, // The broker rejected this static consumer since
+ // another consumer with the same group.instance.id has registered with a different member.id.
+ ELIGIBLE_LEADERS_NOT_AVAILABLE, // Eligible topic partition leaders are not available.
+ ELECTION_NOT_NEEDED, // Leader election not needed for topic partition.
+ NO_REASSIGNMENT_IN_PROGRESS, // No partition reassignment is in progress.
+ GROUP_SUBSCRIBED_TO_TOPIC, // Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.
+ INVALID_RECORD, // This record has failed the validation on broker and hence will be rejected.
+ UNSTABLE_OFFSET_COMMIT, // There are unstable offsets that need to be cleared.
+ THROTTLING_QUOTA_EXCEEDED, // The throttling quota has been exceeded.
+ PRODUCER_FENCED, // There is a newer producer with the same transactionalId
+ // which fences the current one.
+ RESOURCE_NOT_FOUND, // A request illegally referred to a resource that does not exist.
+ DUPLICATE_RESOURCE, // A request illegally referred to the same resource twice.
+ UNACCEPTABLE_CREDENTIAL, // Requested credential would not meet criteria for acceptability.
+ INCONSISTENT_VOTER_SET, // Indicates that either the sender or the recipient of a
+ // voter-only request is not one of the expected voters.
+ INVALID_UPDATE_VERSION, // The given update version was invalid.
+ FEATURE_UPDATE_FAILED, // Unable to update finalized features due to an unexpected server error.
+ PRINCIPAL_DESERIALIZATION_FAILURE, // Request principal deserialization failed during forwarding.
+ // This indicates an internal error on the broker cluster security setup.
+ SNAPSHOT_NOT_FOUND, // Requested snapshot was not found
+ POSITION_OUT_OF_RANGE, // Requested position is not greater than or equal to zero, and less than the size of the snapshot.
+ UNKNOWN_TOPIC_ID, // This server does not host this topic ID.
+ DUPLICATE_BROKER_REGISTRATION, // This broker ID is already in use.
+ BROKER_ID_NOT_REGISTERED, // The given broker ID was not registered.
+ INCONSISTENT_TOPIC_ID, // The log's topic ID did not match the topic ID in the request
+ INCONSISTENT_CLUSTER_ID, // The clusterId in the request does not match that found on the server
+ TRANSACTIONAL_ID_NOT_FOUND, // The transactionalId could not be found
+ FETCH_SESSION_TOPIC_ID_ERROR, // The fetch session encountered inconsistent topic ID usage
+ INELIGIBLE_REPLICA, // The new ISR contains at least one ineligible replica.
+ NEW_LEADER_ELECTED // The AlterPartition request successfully updated the partition state but the leader has changed.
+};
+
template <typename T>
void NormalizeNumber(T& value) {
#ifndef WORDS_BIGENDIAN
@@ -274,6 +412,8 @@ public:
class TApiMessage: public TMessage {
public:
+ using TPtr = std::shared_ptr<TApiMessage>;
+
~TApiMessage() = default;
virtual i16 ApiKey() const = 0;
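
The EKafkaErrors enum added above relies on implicit sequential numbering, so its values have to stay in lockstep with the error-code table on the Kafka protocol page referenced in its comment; per the protocol, each value is serialized as the 16-bit error_code field of a response. A minimal sanity-check sketch (not part of this patch, shown only to illustrate that invariant):

    // Hypothetical compile-time checks; the expected numbers come from the official
    // Kafka protocol error-code table linked in the enum's comment.
    static_assert(NKafka::UNKNOWN_TOPIC_OR_PARTITION == 3, "must match Kafka error code 3");
    static_assert(NKafka::REQUEST_TIMED_OUT == 7, "must match Kafka error code 7");
    static_assert(NKafka::UNSUPPORTED_VERSION == 35, "must match Kafka error code 35");
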
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 6d1fea98466..d9553d591b8 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -1,8 +1,11 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <ydb/core/raw_socket/sock_config.h>
+#include <ydb/core/util/address_classifier.h>
#include "kafka_connection.h"
+#include "kafka_events.h"
#include "kafka_messages.h"
+#include "kafka_produce_actor.h"
#include "kafka_log_impl.h"
#include <strstream>
@@ -12,6 +15,7 @@
namespace NKafka {
using namespace NActors;
+using namespace NKikimr;
char Hex(const unsigned char c) {
return c < 10 ? '0' + c : 'A' + c - 10;
@@ -82,6 +86,9 @@ public:
bool ConnectionEstablished = false;
bool CloseConnection = false;
+ NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier;
+ TString ClientDC;
+
Msg Request;
enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS };
@@ -91,6 +98,10 @@ public:
size_t InflightSize;
+ TActorId ProduceActorId;
+
+ std::unordered_map<ui64, Msg> PendingRequests;
+
TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address,
const NKikimrConfig::TKafkaProxyConfig& config)
: Socket(std::move(socket))
@@ -103,33 +114,48 @@ public:
IsSslSupported = IsSslSupported && Socket->IsSslSupported();
}
- void Bootstrap() {
+ void Bootstrap(const TActorContext& ctx) {
Become(&TKafkaConnection::StateAccepting);
Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false));
- KAFKA_LOG_D("incoming connection opened");
+ KAFKA_LOG_I("incoming connection opened " << Address);
+
+ ProduceActorId = ctx.RegisterWithSameMailbox(new TKafkaProduceActor(SelfId(), ClientDC));
+
OnAccept();
}
void PassAway() override {
+ KAFKA_LOG_D("PassAway");
+
if (ConnectionEstablished) {
ConnectionEstablished = false;
}
+ if (ProduceActorId) {
+ Send(ProduceActorId, new TEvents::TEvPoison());
+ }
Shutdown();
TBase::PassAway();
}
protected:
+ void LogEvent(IEventHandle& ev) {
+ KAFKA_LOG_T("Event: " << ev.GetTypeName());
+ }
+
void SetNonBlock() noexcept {
Socket->SetNonBlock();
}
void Shutdown() {
+ KAFKA_LOG_D("Shutdown");
+
if (Socket) {
Socket->Shutdown();
}
}
ssize_t SocketSend(const void* data, size_t size) {
+ KAFKA_LOG_T("SocketSend Size=" << size);
return Socket->Send(data, size);
}
@@ -146,7 +172,17 @@ protected:
}
TString LogPrefix() const {
- return TStringBuilder() << "(#" << GetRawSocket() << "," << Address->ToString() << ") ";
+ TStringBuilder sb;
+ sb << "TKafkaConnection " << SelfId() << "(#" << GetRawSocket() << "," << Address->ToString() << ") State: ";
+ auto stateFunc = CurrentStateFunc();
+ if (stateFunc == &TKafkaConnection::StateConnected) {
+ sb << "Connected ";
+ } else if (stateFunc == &TKafkaConnection::StateAccepting) {
+ sb << "Accepting ";
+ } else {
+ sb << "Unknown ";
+ }
+ return sb;
}
void OnAccept() {
@@ -165,9 +201,13 @@ protected:
}
STATEFN(StateAccepting) {
+ LogEvent(*ev.Get());
switch (ev->GetTypeRewrite()) {
hFunc(TEvPollerReady, HandleAccepting);
hFunc(TEvPollerRegisterResult, HandleAccepting);
+ hFunc(TEvKafka::TEvResponse, Handle);
+ default:
+ KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
}
}
@@ -177,29 +217,17 @@ protected:
InflightSize -= messageSize;
}
- void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message, size_t messageSize) {
- TProduceResponseData response;
- response.Responses.resize(message->TopicData.size());
- int i = 0;
- for (auto& data : message->TopicData) {
- response.Responses[i].Name = data.Name;
- response.Responses[i].PartitionResponses.resize(data.PartitionData.size());
- int j = 0;
- for (auto& p : data.PartitionData) {
- response.Responses[i].PartitionResponses[j].Index = p.Index;
- response.Responses[i].PartitionResponses[j].BaseOffset = 40;
-
- ++j;
- }
- ++i;
- }
-
- Reply(header, &response);
+ void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, size_t /*messageSize*/) {
+ PendingRequests[header->CorrelationId] = std::move(Request);
+ Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message));
+ }
- InflightSize -= messageSize;
+ void Handle(TEvKafka::TEvResponse::TPtr response) {
+ auto r = response->Get();
+ Reply(r->Cookie, r->Response.get());
}
- void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/, size_t messageSize) {
+ void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* /*message*/, size_t messageSize) {
TInitProducerIdResponseData response;
response.ProducerEpoch = 1;
response.ProducerId = 1;
@@ -211,7 +239,7 @@ protected:
InflightSize -= messageSize;
}
- void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* message, size_t messageSize) {
+ void HandleMessage(const TRequestHeaderData* header, const TMetadataRequestData* message, size_t messageSize) {
TMetadataResponseData response;
response.ThrottleTimeMs = 0;
response.ClusterId = "cluster-ahjgk";
@@ -266,8 +294,24 @@ protected:
}
}
+ void Reply(const ui64 cookie, const TApiMessage* response) {
+ auto it = PendingRequests.find(cookie);
+ if (it == PendingRequests.end()) {
+ KAFKA_LOG_ERROR("Unexpected cookie " << cookie);
+ return;
+ }
+
+ auto& request = it->second;
+ Reply(&request.Header, response);
+
+ InflightSize -= request.ExpectedSize;
+
+ PendingRequests.erase(it);
+
+ DoRead();
+ }
+
void Reply(const TRequestHeaderData* header, const TApiMessage* reply) {
- // TODO improve allocation
TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion);
TKafkaVersion version = header->RequestApiVersion;
@@ -284,9 +328,13 @@ protected:
reply->Write(writable, version);
buffer.flush();
+
+ KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size);
}
void DoRead() {
+ KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step));
+
for (;;) {
while (Demand) {
ssize_t received = 0;
@@ -296,16 +344,13 @@ protected:
} else if (-res == EINTR) {
continue;
} else if (!res) {
- KAFKA_LOG_ERROR("connection closed");
+ KAFKA_LOG_I("connection closed");
return PassAway();
} else if (res < 0) {
- KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res));
+ KAFKA_LOG_I("connection closed - error in recv: " << strerror(-res));
return PassAway();
}
received = res;
- if (!received) {
- return;
- }
Request.Size += received;
Demand.Buffer += received;
@@ -408,9 +453,13 @@ protected:
}
STATEFN(StateConnected) {
+ LogEvent(*ev.Get());
switch (ev->GetTypeRewrite()) {
hFunc(TEvPollerReady, HandleConnected);
hFunc(TEvPollerRegisterResult, HandleConnected);
+ hFunc(TEvKafka::TEvResponse, Handle);
+ default:
+ KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
}
}
};
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
new file mode 100644
index 00000000000..f674fe98041
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -0,0 +1,51 @@
+#pragma once
+
+#include <library/cpp/actors/core/event_local.h>
+#include <ydb/core/base/events.h>
+
+#include "kafka_messages.h"
+
+using namespace NActors;
+
+namespace NKafka {
+
+struct TEvKafka {
+ enum EEv {
+ EvRequest = EventSpaceBegin(NKikimr::TKikimrEvents::TKikimrEvents::ES_KAFKA),
+ EvProduceRequest,
+ EvWakeup,
+ EvResponse = EvRequest + 256,
+ EvInternalEvents = EvResponse + 256,
+ EvEnd
+ };
+
+ static_assert(
+ EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::TKikimrEvents::ES_KAFKA),
+ "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_KAFKA)");
+
+
+ struct TEvProduceRequest : public TEventLocal<TEvProduceRequest, EvProduceRequest> {
+ TEvProduceRequest(const ui64 cookie, const TProduceRequestData* request)
+ : Cookie(cookie)
+ , Request(request)
+ {}
+
+ ui64 Cookie;
+ const TProduceRequestData* Request;
+ };
+
+ struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> {
+ TEvResponse(const ui64 cookie, const TApiMessage::TPtr response)
+ : Cookie(cookie)
+ , Response(std::move(response)) {
+ }
+
+ ui64 Cookie;
+ const TApiMessage::TPtr Response;
+ };
+
+ struct TEvWakeup : public TEventLocal<TEvWakeup, EvWakeup> {
+ };
+};
+
+} // namespace NKafka
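
The event space above keeps all Kafka events inside the ES_KAFKA range reserved in ydb/core/base/events.h: request events start at EventSpaceBegin(ES_KAFKA), response events at EvRequest + 256, internal events at EvResponse + 256, and the static_assert guards the upper bound. A hedged sketch of how a further internal event could be declared in that reserved range (the event name and payload are hypothetical, not part of this patch):

    // Hypothetical internal event placed in the EvInternalEvents range.
    struct TEvTopicExpired : public TEventLocal<TEvTopicExpired, TEvKafka::EvInternalEvents> {
        explicit TEvTopicExpired(const TString& topicPath)
            : TopicPath(topicPath)
        {}

        TString TopicPath;
    };
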
diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h
index a118fc0128d..28c8d2388d2 100644
--- a/ydb/core/kafka_proxy/kafka_messages.h
+++ b/ydb/core/kafka_proxy/kafka_messages.h
@@ -30,6 +30,8 @@ enum EApiKey {
class TRequestHeaderData : public TApiMessage {
public:
+ typedef std::shared_ptr<TRequestHeaderData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 2};
static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
@@ -109,6 +111,8 @@ public:
class TResponseHeaderData : public TApiMessage {
public:
+ typedef std::shared_ptr<TResponseHeaderData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 1};
static constexpr TKafkaVersions FlexibleVersions = {1, Max<TKafkaVersion>()};
@@ -143,6 +147,8 @@ public:
class TProduceRequestData : public TApiMessage {
public:
+ typedef std::shared_ptr<TProduceRequestData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 9};
static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()};
@@ -317,6 +323,8 @@ public:
class TProduceResponseData : public TApiMessage {
public:
+ typedef std::shared_ptr<TProduceResponseData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 9};
static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()};
@@ -585,6 +593,8 @@ public:
class TFetchRequestData : public TApiMessage {
public:
+ typedef std::shared_ptr<TFetchRequestData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 13};
static constexpr TKafkaVersions FlexibleVersions = {12, Max<TKafkaVersion>()};
@@ -1005,6 +1015,8 @@ public:
class TFetchResponseData : public TApiMessage {
public:
+ typedef std::shared_ptr<TFetchResponseData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 13};
static constexpr TKafkaVersions FlexibleVersions = {12, Max<TKafkaVersion>()};
@@ -1518,6 +1530,8 @@ public:
class TMetadataRequestData : public TApiMessage {
public:
+ typedef std::shared_ptr<TMetadataRequestData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 12};
static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()};
@@ -1645,6 +1659,8 @@ public:
class TMetadataResponseData : public TApiMessage {
public:
+ typedef std::shared_ptr<TMetadataResponseData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 12};
static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()};
@@ -2066,6 +2082,8 @@ public:
class TApiVersionsRequestData : public TApiMessage {
public:
+ typedef std::shared_ptr<TApiVersionsRequestData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 3};
static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
@@ -2115,6 +2133,8 @@ public:
class TApiVersionsResponseData : public TApiMessage {
public:
+ typedef std::shared_ptr<TApiVersionsResponseData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 3};
static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()};
@@ -2432,6 +2452,8 @@ public:
class TInitProducerIdRequestData : public TApiMessage {
public:
+ typedef std::shared_ptr<TInitProducerIdRequestData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 4};
static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
@@ -2511,6 +2533,8 @@ public:
class TInitProducerIdResponseData : public TApiMessage {
public:
+ typedef std::shared_ptr<TInitProducerIdResponseData> TPtr;
+
struct MessageMeta {
static constexpr TKafkaVersions PresentVersions = {0, 4};
static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()};
diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h
index 69cbaea98ef..95172b82289 100644
--- a/ydb/core/kafka_proxy/kafka_messages_int.h
+++ b/ydb/core/kafka_proxy/kafka_messages_int.h
@@ -5,6 +5,7 @@
#include <vector>
#include <util/generic/array_ref.h>
+#include <util/generic/ptr.h>
#include <util/generic/yexception.h>
#include <contrib/libs/cxxsupp/libcxx/include/type_traits>
diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/kafka_produce_actor.cpp
new file mode 100644
index 00000000000..8a3ff23d281
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_produce_actor.cpp
@@ -0,0 +1,526 @@
+#include "kafka_produce_actor.h"
+
+#include <contrib/libs/protobuf/src/google/protobuf/util/time_util.h>
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/protos/grpc_pq_old.pb.h>
+
+namespace NKafka {
+
+static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1);
+static constexpr TDuration TOPIC_NOT_FOUND_EXPIRATION_INTERVAL = TDuration::Seconds(15);
+static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = TDuration::Seconds(30);
+static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5);
+
+TString TKafkaProduceActor::LogPrefix() {
+ TStringBuilder sb;
+ sb << "TKafkaProduceActor " << SelfId() << " State: ";
+ auto stateFunc = CurrentStateFunc();
+ if (stateFunc == &TKafkaProduceActor::StateInit) {
+ sb << "Init ";
+ } else if (stateFunc == &TKafkaProduceActor::StateWork) {
+ sb << "Work ";
+ } else if (stateFunc == &TKafkaProduceActor::StateAccepting) {
+ sb << "Accepting ";
+ } else {
+ sb << "Unknown ";
+ }
+ return sb;
+}
+
+void TKafkaProduceActor::LogEvent(IEventHandle& ev) {
+ KAFKA_LOG_T("Received event: " << ev.GetTypeName());
+}
+
+void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) {
+ Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup());
+ Become(&TKafkaProduceActor::StateWork);
+}
+
+void TKafkaProduceActor::Handle(TEvKafka::TEvWakeup::TPtr /*request*/, const TActorContext& ctx) {
+ KAFKA_LOG_D("Wakeup");
+
+ SendResults(ctx);
+ CleanTopics(ctx);
+ CleanWriters(ctx);
+
+ Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup());
+
+ KAFKA_LOG_T("Wakeup was completed successfully");
+}
+
+void TKafkaProduceActor::PassAway() {
+ KAFKA_LOG_D("PassAway");
+
+ for(const auto& [_, partitionWriters] : Writers) {
+ for(const auto& [_, w] : partitionWriters) {
+ Send(w.ActorId, new TEvents::TEvPoison());
+ }
+ }
+
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
+
+ TActorBootstrapped::PassAway();
+
+ KAFKA_LOG_T("PassAway was completed successfully");
+}
+
+void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) {
+ const auto expired = ctx.Now() - TOPIC_NOT_FOUND_EXPIRATION_INTERVAL;
+
+ std::map<TString, TTopicInfo> newTopics;
+ for(auto& [topicPath, topicInfo] : Topics) {
+ if (!topicInfo.NotFound || topicInfo.NotFoundTime > expired) {
+ newTopics[topicPath] = std::move(topicInfo);
+ }
+ }
+ Topics = std::move(newTopics);
+}
+
+void TKafkaProduceActor::CleanWriters(const TActorContext& ctx) {
+ KAFKA_LOG_D("CleanWriters");
+ const auto expired = ctx.Now() - WRITER_EXPIRATION_INTERVAL;
+
+ for (auto& [topicPath, partitionWriters] : Writers) {
+ std::unordered_map<ui32, TWriterInfo> newPartitionWriters;
+ for (const auto& [partitionId, writerInfo] : partitionWriters) {
+ if (writerInfo.LastAccessed > expired) {
+ newPartitionWriters[partitionId] = writerInfo;
+ } else {
+ TStringBuilder sb;
+ sb << "Destroying inactive PartitionWriter. Topic='" << topicPath << "', Partition=" << partitionId;
+ KAFKA_LOG_D(sb);
+ Send(writerInfo.ActorId, new TEvents::TEvPoison());
+ }
+ }
+ partitionWriters = std::move(newPartitionWriters);
+ }
+
+ KAFKA_LOG_T("CleanWriters was completed successfully");
+}
+
+void TKafkaProduceActor::EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& /*ctx*/) {
+ Requests.push_back(request);
+}
+
+void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ auto* navigate = ev.Get()->Get()->Request.Get();
+ for (auto& info : navigate->ResultSet) {
+ if (NSchemeCache::TSchemeCacheNavigate::EStatus::Ok == info.Status) {
+ auto topicPath = "/" + NKikimr::JoinPath(info.Path);
+ KAFKA_LOG_D("Received topic '" << topicPath << "' description");
+ TopicsForInitialization.erase(topicPath);
+
+ auto& topic = Topics[topicPath];
+ for(auto& p : info.PQGroupInfo->Description.GetPartitions()) {
+ topic.partitions[p.GetPartitionId()] = p.GetTabletId();
+ }
+
+ auto pathId = info.TableId.PathId;
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId));
+ }
+ }
+
+ for(auto& topicPath : TopicsForInitialization) {
+ KAFKA_LOG_D("Topic '" << topicPath << "' not found");
+ auto& topicInfo = Topics[topicPath];
+ topicInfo.NotFound = true;
+ topicInfo.NotFoundTime = ctx.Now();
+ }
+
+ TopicsForInitialization.clear();
+
+ Become(&TKafkaProduceActor::StateWork);
+
+ KAFKA_LOG_T("HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr) was completed successfully");
+
+ ProcessRequests(ctx);
+}
+
+void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev, const TActorContext& ctx) {
+ auto& path = ev->Get()->Path;
+ KAFKA_LOG_I("Topic '" << path << "' was deleted");
+
+ auto it = Writers.find(path);
+ if (it != Writers.end()) {
+ for(auto& [_, writer] : it->second) {
+ Send(writer.ActorId, new TEvents::TEvPoison());
+ }
+ Writers.erase(it);
+ }
+
+ auto& topicInfo = Topics[path];
+ topicInfo.NotFound = true;
+ topicInfo.NotFoundTime = ctx.Now();
+ topicInfo.partitions.clear();
+}
+
+void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& /*ctx*/) {
+ auto* e = ev->Get();
+ auto& path = e->Path;
+ KAFKA_LOG_I("Topic '" << path << "' was updated");
+
+ auto& topic = Topics[path];
+ topic.partitions.clear();
+ for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) {
+ topic.NotFound = false;
+ topic.partitions[p.GetPartitionId()] = p.GetTabletId();
+ }
+}
+
+void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx) {
+ Requests.push_back(request);
+ ProcessRequests(ctx);
+}
+
+void TKafkaProduceActor::ProcessRequests(const TActorContext& ctx) {
+ if (&TKafkaProduceActor::StateWork != CurrentStateFunc()) {
+ KAFKA_LOG_ERROR("Unexpected state");
+ return;
+ }
+
+ if (Requests.empty()) {
+ return;
+ }
+
+ if (EnqueueInitialization()) {
+ PendingRequests.push_back({Requests.front()});
+ Requests.pop_front();
+
+ ProcessRequest(PendingRequests.back(), ctx);
+ } else {
+ ProcessInitializationRequests(ctx);
+ }
+}
+
+size_t TKafkaProduceActor::EnqueueInitialization() {
+ size_t canProcess = 0;
+ bool requireInitialization = false;
+
+ for(const auto& e : Requests) {
+ auto* r = e->Get()->Request;
+ for(const auto& topicData : r->TopicData) {
+ const auto& topicPath = *topicData.Name;
+ if (!Topics.contains(topicPath)) {
+ requireInitialization = true;
+ TopicsForInitialization.insert(topicPath);
+ }
+ }
+ if (!requireInitialization) {
+ ++canProcess;
+ }
+ }
+
+ return canProcess;
+}
+
+THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data,
+ const TString& topicName,
+ ui64 cookie,
+ const TString& clientDC) {
+ auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>();
+ auto& request = ev->Record;
+
+ const auto& batch = data.Records;
+ const TString sourceId = TStringBuilder() << batch->ProducerId;
+
+ auto* partitionRequest = request.MutablePartitionRequest();
+ partitionRequest->SetTopic(topicName);
+ partitionRequest->SetPartition(data.Index);
+ // partitionRequest->SetCmdWriteOffset();
+ partitionRequest->SetCookie(cookie);
+ // partitionRequest->SetPutUnitsSize(); TODO
+
+ for (const auto& record : batch->Records) {
+ if (!record.Value) {
+ continue;
+ }
+
+ NKikimrPQClient::TDataChunk proto;
+ for(auto& h : record.Headers) {
+ auto res = proto.AddMessageMeta();
+ res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size());
+ res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size());
+ }
+
+ {
+ auto res = proto.AddMessageMeta();
+ res->set_key("__key");
+ res->set_value(static_cast<const char*>(record.Key->data()), record.Key->size());
+ }
+
+ proto.SetData(static_cast<const void*>(record.Value->data()), record.Value->size());
+
+ TString str;
+ bool res = proto.SerializeToString(&str);
+ Y_VERIFY(res);
+
+ auto w = partitionRequest->AddCmdWrite();
+
+ w->SetSourceId(sourceId);
+ w->SetSeqNo(batch->BaseOffset + record.OffsetDelta);
+ w->SetData(str);
+ w->SetCreateTimeMS(batch->BaseTimestamp + record.TimestampDelta);
+ w->SetDisableDeduplication(true);
+ w->SetUncompressedSize(record.Value->size());
+ w->SetClientDC(clientDC);
+ w->SetIgnoreQuotaDeadline(true);
+ }
+
+ return ev;
+}
+
+size_t PartsCount(const TProduceRequestData* r) {
+ size_t result = 0;
+ for(const auto& topicData : r->TopicData) {
+ result += topicData.PartitionData.size();
+ }
+ return result;
+}
+
+void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx) {
+ auto* r = pendingRequest.Request->Get()->Request;
+
+ pendingRequest.Results.resize(PartsCount(r));
+ pendingRequest.StartTime = ctx.Now();
+
+ size_t position = 0;
+ for(const auto& topicData : r->TopicData) {
+ const TString& topicPath = *topicData.Name;
+ for(const auto& partitionData : topicData.PartitionData) {
+ const auto partitionId = partitionData.Index;
+
+ auto writerId = PartitionWriter(topicPath, partitionId, ctx);
+ if (writerId) {
+ auto ownCookie = ++Cookie;
+ auto& cookieInfo = Cookies[ownCookie];
+ cookieInfo.TopicPath = topicPath;
+ cookieInfo.PartitionId = partitionId;
+ cookieInfo.Position = position;
+ cookieInfo.Request = &pendingRequest;
+
+ pendingRequest.WaitAcceptingCookies.insert(ownCookie);
+ pendingRequest.WaitResultCookies.insert(ownCookie);
+
+ auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC);
+
+ Send(writerId, std::move(ev));
+ } else {
+ auto& result = pendingRequest.Results[position];
+ result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+ }
+
+ ++position;
+ }
+ }
+
+ if (pendingRequest.WaitResultCookies.empty()) {
+ // All requests were for unknown topics, or the request was empty
+ SendResults(ctx);
+ } else {
+ Become(&TKafkaProduceActor::StateAccepting);
+ }
+}
+
+void TKafkaProduceActor::HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) {
+ auto r = request->Get();
+ auto cookie = r->Cookie;
+
+ auto it = Cookies.find(cookie);
+ if (it == Cookies.end()) {
+ KAFKA_LOG_W("Received TEvWriteAccepted with unexpected cookie " << cookie);
+ return;
+ }
+
+ auto& cookieInfo = it->second;
+ auto& expectedCookies = cookieInfo.Request->WaitAcceptingCookies;
+ expectedCookies.erase(cookie);
+
+ if (expectedCookies.empty()) {
+ Become(&TKafkaProduceActor::StateWork);
+ ProcessRequests(ctx);
+ }
+}
+
+void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& /*ctx*/) {
+ KAFKA_LOG_D("Init " << request->Get()->ToString());
+}
+
+void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) {
+ auto r = request->Get();
+ auto cookie = r->Record.GetPartitionResponse().GetCookie();
+
+ auto it = Cookies.find(cookie);
+ if (it == Cookies.end()) {
+ KAFKA_LOG_W("Received TEvWriteResponse with unexpected cookie " << cookie);
+ return;
+ }
+
+ auto& cookieInfo = it->second;
+ auto& partitionResult = cookieInfo.Request->Results[cookieInfo.Position];
+ partitionResult.ErrorCode = EKafkaErrors::NONE_ERROR;
+ partitionResult.Value = request;
+ cookieInfo.Request->WaitResultCookies.erase(cookie);
+
+ Cookies.erase(it);
+
+ if (!r->IsSuccess()) {
+ auto wit = Writers.find(cookieInfo.TopicPath);
+ if (wit != Writers.end()) {
+ auto& partitions = wit->second;
+ auto pit = partitions.find(cookieInfo.PartitionId);
+ if (pit != partitions.end()) {
+ Send(pit->second.ActorId, new TEvents::TEvPoison());
+ partitions.erase(pit);
+ }
+ }
+ }
+
+ if (cookieInfo.Request->WaitResultCookies.empty()) {
+ SendResults(ctx);
+ }
+}
+
+void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
+ auto expireTime = ctx.Now() - REQUEST_EXPIRATION_INTERVAL;
+
+ KAFKA_LOG_T("Sending results. QueueSize=" << PendingRequests.size() << ", ExpirationTime=" << expireTime);
+
+ // We send the results in the order of receipt of the request
+ while (!PendingRequests.empty()) {
+ auto& pendingRequest = PendingRequests.front();
+
+ // We also send a response on timeout. This can happen, for example, if an event was lost or the PartitionWriter died.
+ bool expired = expireTime > pendingRequest.StartTime;
+
+ if (!expired && !pendingRequest.WaitResultCookies.empty()) {
+ return;
+ }
+
+ auto* r = pendingRequest.Request->Get()->Request;
+ auto cookie = pendingRequest.Request->Get()->Cookie;
+
+ KAFKA_LOG_D("Send result for cookie " << cookie << ". Expired=" << expired);
+
+ const auto topicsCount = r->TopicData.size();
+ auto response = std::make_shared<TProduceResponseData>();
+ response->Responses.resize(topicsCount);
+
+ size_t position = 0;
+
+ for(size_t i = 0; i < topicsCount; ++i) {
+ const auto& topicData = r->TopicData[i];
+ const auto partitionCount = topicData.PartitionData.size();
+
+ auto& topicResponse = response->Responses[i];
+ topicResponse.Name = topicData.Name;
+ topicResponse.PartitionResponses.resize(partitionCount);
+
+ for(size_t j = 0; j < partitionCount; ++j) {
+ const auto& partitionData = topicData.PartitionData[j];
+ auto& partitionResponse = topicResponse.PartitionResponses[j];
+ const auto& result = pendingRequest.Results[position++];
+
+ partitionResponse.Index = partitionData.Index;
+
+ if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
+ partitionResponse.ErrorCode = result.ErrorCode;
+ partitionResponse.ErrorMessage = result.ErrorMessage;
+ } else {
+ auto* msg = result.Value->Get();
+ if (msg->IsSuccess()) {
+ partitionResponse.ErrorCode = EKafkaErrors::NONE_ERROR;
+ auto& writeResults = msg->Record.GetPartitionResponse().GetCmdWriteResult();
+
+ if (!writeResults.empty()) {
+ auto& lastResult = writeResults.at(writeResults.size() - 1);
+ partitionResponse.LogAppendTimeMs = lastResult.GetWriteTimestampMS();
+ partitionResponse.BaseOffset = lastResult.GetSeqNo();
+ }
+ } else {
+ partitionResponse.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
+ partitionResponse.ErrorMessage = msg->GetError().Reason;
+ }
+ }
+ }
+ }
+
+ Send(Client, new TEvKafka::TEvResponse(cookie, response));
+
+ if (!pendingRequest.WaitAcceptingCookies.empty()) {
+ if (!expired) {
+ TStringBuilder sb;
+ sb << "All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:";
+ for(auto cookie : pendingRequest.WaitAcceptingCookies) {
+ sb << " " << cookie;
+ }
+ KAFKA_LOG_W(sb);
+ }
+ if (&TKafkaProduceActor::StateAccepting == CurrentStateFunc()) {
+ Become(&TKafkaProduceActor::StateWork);
+ }
+ }
+
+ PendingRequests.pop_front();
+ }
+}
+
+void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) {
+ if (TopicsForInitialization.empty()) {
+ return;
+ }
+
+ Become(&TKafkaProduceActor::StateInit);
+
+ auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+
+ for(auto& topicPath : TopicsForInitialization) {
+ KAFKA_LOG_D("Describe topic '" << topicPath << "'");
+ NSchemeCache::TSchemeCacheNavigate::TEntry entry;
+ entry.Path = NKikimr::SplitPath(topicPath);
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
+ entry.SyncVersion = true;
+
+ request->ResultSet.emplace_back(entry);
+ }
+
+ ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release()));
+}
+
+TActorId TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) {
+ auto& partitionWriters = Writers[topicPath];
+ auto itp = partitionWriters.find(partitionId);
+ if (itp != partitionWriters.end()) {
+ auto& writerInfo = itp->second;
+ writerInfo.LastAccessed = ctx.Now();
+ return writerInfo.ActorId;
+ }
+
+ auto it = Topics.find(topicPath);
+ if (it == Topics.end()) {
+ KAFKA_LOG_ERROR("Internal error: topic '" << topicPath << "' isn't initialized");
+ return TActorId{};
+ }
+
+ auto& topicInfo = it->second;
+ if (topicInfo.NotFound) {
+ return TActorId{};
+ }
+
+ auto& partitions = topicInfo.partitions;
+ auto pit = partitions.find(partitionId);
+ if (pit == partitions.end()) {
+ return TActorId{};
+ }
+
+ auto tabletId = pit->second;
+ auto* writerActor = CreatePartitionWriter(SelfId(), tabletId, partitionId, SourceId,
+ TPartitionWriterOpts().WithDeduplication(false));
+
+ auto& writerInfo = partitionWriters[partitionId];
+ writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor);
+ writerInfo.LastAccessed = ctx.Now();
+ return writerInfo.ActorId;
+}
+
+} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.h b/ydb/core/kafka_proxy/kafka_produce_actor.h
new file mode 100644
index 00000000000..c618da54940
--- /dev/null
+++ b/ydb/core/kafka_proxy/kafka_produce_actor.h
@@ -0,0 +1,186 @@
+#pragma once
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/persqueue/writer/writer.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include "kafka_events.h"
+
+namespace NKafka {
+
+using namespace NKikimr;
+using namespace NKikimr::NPQ;
+using namespace NKikimrClient;
+
+//
+// This actor handles write (Produce) requests.
+// Each request can carry data for several topics and, within each topic, for several partitions.
+// When a request arrives for a topic that is not yet known, the actor switches to the Init state until it
+// has received descriptions of all the topics needed to process the request.
+//
+// Requests are processed in parallel, but the write order is guaranteed to be preserved.
+// Responses are also returned in the order in which the requests were received.
+//
+// When processing of a request begins, the actor enters the Accepting state. In this state it waits for
+// every TPartitionWriter to confirm acceptance of the request (TEvWriteAccepted). After that,
+// the actor switches back to the Work state. This guarantees the write order within each partition.
+//
+class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor> {
+ struct TPendingRequest;
+public:
+ TKafkaProduceActor(const TActorId& client, const TString& clientDC)
+ : Client(client)
+ , ClientDC(clientDC) {
+ }
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KAFKA_PRODUCE_ACTOR; }
+
+private:
+ void PassAway() override;
+
+ // Handlers for many StateFunc
+ void Handle(TEvKafka::TEvWakeup::TPtr request, const TActorContext& ctx);
+ void Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx);
+ void Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& ctx);
+ void EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx);
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
+
+ // StateInit - describe topics
+ void HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+
+ STATEFN(StateInit) {
+ LogEvent(*ev.Get());
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleInit);
+
+ HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest);
+ HFunc(TEvPartitionWriter::TEvInitResult, Handle);
+ HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
+
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+
+ HFunc(TEvKafka::TEvWakeup, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+ // StateWork - processing messages
+ void Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx);
+
+ STATEFN(StateWork) {
+ LogEvent(*ev.Get());
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvKafka::TEvProduceRequest, Handle);
+ HFunc(TEvPartitionWriter::TEvInitResult, Handle);
+ HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
+
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+
+ HFunc(TEvKafka::TEvWakeup, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+ // StateAccepting - enqueue ProduceRequest parts to PartitionWriters
+ // This guarantees that responses are sent in the same order as the requests were received
+ void HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx);
+
+ STATEFN(StateAccepting) {
+ LogEvent(*ev.Get());
+ switch (ev->GetTypeRewrite()) {
+ HFunc(TEvPartitionWriter::TEvWriteAccepted, HandleAccepting);
+
+ HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest);
+ HFunc(TEvPartitionWriter::TEvInitResult, Handle);
+ HFunc(TEvPartitionWriter::TEvWriteResponse, Handle);
+
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle);
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+
+ HFunc(TEvKafka::TEvWakeup, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
+ }
+ }
+
+
+ // Logic
+ void ProcessRequests(const TActorContext& ctx);
+ void ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx);
+
+ void SendResults(const TActorContext& ctx);
+
+ size_t EnqueueInitialization();
+ void ProcessInitializationRequests(const TActorContext& ctx);
+ void CleanTopics(const TActorContext& ctx);
+ void CleanWriters(const TActorContext& ctx);
+
+ TActorId PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx);
+
+ TString LogPrefix();
+ void LogEvent(IEventHandle& ev);
+
+private:
+ const TActorId Client;
+ TString SourceId;
+
+ TString ClientDC;
+
+ ui64 Cookie = 0;
+ TDeque<TEvKafka::TEvProduceRequest::TPtr> Requests;
+
+ struct TPendingRequest {
+ TPendingRequest(TEvKafka::TEvProduceRequest::TPtr request)
+ : Request(request) {
+ }
+
+ TEvKafka::TEvProduceRequest::TPtr Request;
+
+ struct TPartitionResult {
+ EKafkaErrors ErrorCode = EKafkaErrors::REQUEST_TIMED_OUT;
+ TString ErrorMessage;
+ TEvPartitionWriter::TEvWriteResponse::TPtr Value;
+ };
+ std::vector<TPartitionResult> Results;
+
+ std::set<ui64> WaitAcceptingCookies;
+ std::set<ui64> WaitResultCookies;
+
+ TInstant StartTime;
+ };
+ TDeque<TPendingRequest> PendingRequests;
+
+ struct TCookieInfo {
+ TString TopicPath;
+ ui32 PartitionId;
+ size_t Position;
+
+ TPendingRequest* Request;
+ };
+ std::map<ui64, TCookieInfo> Cookies;
+
+ std::set<TString> TopicsForInitialization;
+
+ struct TTopicInfo {
+ bool NotFound = false;
+ TInstant NotFoundTime;
+
+ // partitionId -> tabletId
+ std::unordered_map<ui32, ui64> partitions;
+ };
+ std::map<TString, TTopicInfo> Topics;
+
+ struct TWriterInfo {
+ TActorId ActorId;
+ TInstant LastAccessed;
+ };
+ // TopicPath -> PartitionId -> TPartitionWriter
+ std::unordered_map<TString, std::unordered_map<ui32, TWriterInfo>> Writers;
+};
+
+}
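
Taken together with the connection changes earlier in this patch, the interaction contract of TKafkaProduceActor boils down to the following sketch; every name here comes from the diff above, and the snippet is only an illustration of the flow, not new API:

    // Inside TKafkaConnection (kafka_connection.cpp in this patch): one produce actor
    // is registered per connection...
    ProduceActorId = ctx.RegisterWithSameMailbox(new TKafkaProduceActor(SelfId(), ClientDC));

    // ...each Produce request is forwarded with its correlation id as the cookie...
    Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message));

    // ...and the actor eventually answers with a TEvKafka::TEvResponse carrying the same
    // cookie, which the connection uses to find the pending request, release its inflight
    // budget, and reply to the client in the original order.
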
diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make
index eec08b69054..ce30eca927e 100644
--- a/ydb/core/kafka_proxy/ya.make
+++ b/ydb/core/kafka_proxy/ya.make
@@ -11,6 +11,8 @@ SRCS(
kafka_messages.h
kafka_messages_int.cpp
kafka_messages_int.h
+ kafka_produce_actor.cpp
+ kafka_produce_actor.h
kafka_proxy.h
kafka_records.cpp
)
diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h
index 0410a6b7d01..4b99f0b8000 100644
--- a/ydb/core/persqueue/blob.h
+++ b/ydb/core/persqueue/blob.h
@@ -88,7 +88,7 @@ struct TClientBlob {
return !PartData || PartData->PartNo + 1 == PartData->TotalParts;
}
- static const ui32 OVERHEAD = sizeof(ui32)/*totalSize*/ + sizeof(ui64)/*SeqNo*/ + sizeof(ui16) /*SourceId*/ + sizeof(ui64) /*WriteTimestamp*/ + sizeof(ui64) /*CreateTimestamp*/;
+ static constexpr ui32 OVERHEAD = sizeof(ui32)/*totalSize*/ + sizeof(ui64)/*SeqNo*/ + sizeof(ui16) /*SourceId*/ + sizeof(ui64) /*WriteTimestamp*/ + sizeof(ui64) /*CreateTimestamp*/;
void SerializeTo(TBuffer& buffer) const;
static TClientBlob Deserialize(const char *data, ui32 size);
diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp
index df0f0925ee2..57495befe2b 100644
--- a/ydb/core/persqueue/writer/writer.cpp
+++ b/ydb/core/persqueue/writer/writer.cpp
@@ -123,6 +123,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
}
void BecomeZombie(const TString& error) {
+ SendError(error);
+ Become(&TThis::StateZombie);
+ }
+
+ void SendError(const TString& error) {
for (auto cookie : std::exchange(PendingWrite, {})) {
SendWriteResult(error, MakeResponse(cookie));
}
@@ -132,8 +137,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
for (const auto& [cookie, _] : std::exchange(Pending, {})) {
SendWriteResult(error, MakeResponse(cookie));
}
-
- Become(&TThis::StateZombie);
}
template <typename... Args>
@@ -453,7 +456,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> {
if (PipeClient) {
NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient);
}
-
+ SendError("Unexpected termination");
TActorBootstrapped::PassAway();
}
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index f9b74e17657..dcbf04d226a 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -996,5 +996,6 @@ message TActivity {
KQP_SCAN_FETCH_ACTOR = 614;
COLUMNSHARD_CONVEYOR = 615;
PERSQUEUE_READ_QUOTER = 616;
+ KAFKA_PRODUCE_ACTOR = 617;
};
};
diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
index f8fe09e9bef..810d1c98fe9 100644
--- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp
+++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp
@@ -1417,6 +1417,8 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
ui64 maxMessageMetadataSize = 0;
auto addData = [&](const Topic::StreamWriteMessage::WriteRequest& writeRequest, const i32 messageIndex) {
+ const auto& msg = writeRequest.messages(messageIndex);
+
auto w = request.MutablePartitionRequest()->AddCmdWrite();
w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex));
if (UseDeduplication) {
@@ -1424,14 +1426,15 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>&
} else {
w->SetDisableDeduplication(true);
}
- w->SetSeqNo(writeRequest.messages(messageIndex).seq_no());
+ w->SetSeqNo(msg.seq_no());
SeqNoInflight.push_back(w->GetSeqNo());
- w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at()));
- w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size());
+ w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(msg.created_at()));
+ w->SetUncompressedSize(msg.uncompressed_size());
w->SetClientDC(ClientDC);
w->SetIgnoreQuotaDeadline(true);
+
payloadSize += w->GetData().size() + w->GetSourceId().size();
- const auto& msg = writeRequest.messages(messageIndex);
+
ui64 currMetadataSize = 0;
for (const auto& metaItem : msg.metadata_items()) {
currMetadataSize += metaItem.key().size() + metaItem.value().size();