author | tesseract <tesseract@yandex-team.com> | 2023-08-03 15:57:36 +0300
---|---|---
committer | tesseract <tesseract@yandex-team.com> | 2023-08-03 15:57:36 +0300
commit | 1a304629e0b0acbf460271ad8c6101d003884380 (patch) |
tree | 9ed27ec418c61d47f6b83d63ac195d6cbd3668d2 |
parent | 4c75b5ed26e3369316d8a9b06950527b94ad7c1d (diff) |
download | ydb-1a304629e0b0acbf460271ad8c6101d003884380.tar.gz |
Kafka write protocol
-rw-r--r-- | ydb/core/base/events.h | 3
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt | 1
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt | 1
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt | 1
-rw-r--r-- | ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt | 1
-rw-r--r-- | ydb/core/kafka_proxy/kafka.h | 140
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 109
-rw-r--r-- | ydb/core/kafka_proxy/kafka_events.h | 51
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages.h | 24
-rw-r--r-- | ydb/core/kafka_proxy/kafka_messages_int.h | 1
-rw-r--r-- | ydb/core/kafka_proxy/kafka_produce_actor.cpp | 526
-rw-r--r-- | ydb/core/kafka_proxy/kafka_produce_actor.h | 186
-rw-r--r-- | ydb/core/kafka_proxy/ya.make | 2
-rw-r--r-- | ydb/core/persqueue/blob.h | 2
-rw-r--r-- | ydb/core/persqueue/writer/writer.cpp | 9
-rw-r--r-- | ydb/library/services/services.proto | 1
-rw-r--r-- | ydb/services/persqueue_v1/actors/write_session_actor.ipp | 11 |
17 files changed, 1030 insertions(+), 39 deletions(-)
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f81a444a7f7..fcc5dad2041 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -163,7 +163,8 @@ struct TKikimrEvents : TEvents { ES_CONVEYOR, ES_KQP_SCAN_EXCHANGE, ES_IC_NODE_CACHE, - ES_DATA_OPERATIONS + ES_DATA_OPERATIONS, + ES_KAFKA }; }; diff --git a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt index 09418d69986..f5901e67f90 100644 --- a/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.darwin-x86_64.txt @@ -22,5 +22,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt index d4beb652817..ce6bfbd67a3 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-aarch64.txt @@ -23,5 +23,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt index d4beb652817..ce6bfbd67a3 100644 --- a/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.linux-x86_64.txt @@ -23,5 +23,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt index 09418d69986..f5901e67f90 100644 --- a/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/CMakeLists.windows-x86_64.txt @@ -22,5 +22,6 @@ target_sources(ydb-core-kafka_proxy PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_connection.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_messages_int.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_produce_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/kafka_records.cpp ) diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h index 7297003527b..70c41082116 100644 --- a/ydb/core/kafka_proxy/kafka.h +++ b/ydb/core/kafka_proxy/kafka.h @@ -157,6 +157,144 @@ enum ESizeFormat { } // namespace NPrivate +// see https://kafka.apache.org/11/protocol.html#protocol_error_codes +enum EKafkaErrors { + + UNKNOWN_SERVER_ERROR = -1, // The server experienced an unexpected error when processing the request. 
+    NONE_ERROR = 0,
+    OFFSET_OUT_OF_RANGE, // The requested offset is not within the range of offsets maintained by the server.,
+    CORRUPT_MESSAGE, // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
+    UNKNOWN_TOPIC_OR_PARTITION, // This server does not host this topic-partition.
+    INVALID_FETCH_SIZE, // The requested fetch size is invalid.
+    LEADER_NOT_AVAILABLE, // There is no leader for this topic-partition as we are in the middle of a leadership election.
+    NOT_LEADER_OR_FOLLOWER, // For requests intended only for the leader, this error indicates that the broker is not the current leader.
+        // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.
+    REQUEST_TIMED_OUT, // The request timed out.
+    BROKER_NOT_AVAILABLE, // The broker is not available.
+    REPLICA_NOT_AVAILABLE, // The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests
+        // intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.
+    MESSAGE_TOO_LARGE, // The request included a message larger than the max message size the server will accept.
+    STALE_CONTROLLER_EPOCH, // The controller moved to another broker.
+    OFFSET_METADATA_TOO_LARGE, // The metadata field of the offset request was too large.
+    NETWORK_EXCEPTION, // The server disconnected before a response was received.
+    COORDINATOR_LOAD_IN_PROGRESS, // The coordinator is loading and hence can't process requests.
+    COORDINATOR_NOT_AVAILABLE, // The coordinator is not available.
+    NOT_COORDINATOR, // This is not the correct coordinator.
+    INVALID_TOPIC_EXCEPTION, // The request attempted to perform an operation on an invalid topic.
+    RECORD_LIST_TOO_LARGE, // The request included message batch larger than the configured segment size on the server.
+    NOT_ENOUGH_REPLICAS, // Messages are rejected since there are fewer in-sync replicas than required.
+    NOT_ENOUGH_REPLICAS_AFTER_APPEND, // Messages are written to the log, but to fewer in-sync replicas than required.
+    INVALID_REQUIRED_ACKS, // Produce request specified an invalid value for required acks.
+    ILLEGAL_GENERATION, // Specified group generation id is not valid.
+    INCONSISTENT_GROUP_PROTOCOL, // The group member's supported protocols are incompatible with those of existing members
+        // or first group member tried to join with empty protocol type or empty protocol list.
+    INVALID_GROUP_ID, // The configured groupId is invalid.
+    UNKNOWN_MEMBER_ID, // The coordinator is not aware of this member.
+    INVALID_SESSION_TIMEOUT, // The session timeout is not within the range allowed by the broker
+        // (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).
+    REBALANCE_IN_PROGRESS, // The group is rebalancing, so a rejoin is needed.
+    INVALID_COMMIT_OFFSET_SIZE, // The committing offset data size is not valid.
+    TOPIC_AUTHORIZATION_FAILED, // Topic authorization failed.
+    GROUP_AUTHORIZATION_FAILED, // Group authorization failed.
+    CLUSTER_AUTHORIZATION_FAILED, // Cluster authorization failed.
+    INVALID_TIMESTAMP, // The timestamp of the message is out of acceptable range.
+    UNSUPPORTED_SASL_MECHANISM, // The broker does not support the requested SASL mechanism.
+    ILLEGAL_SASL_STATE, // Request is not valid given the current SASL state.
+    UNSUPPORTED_VERSION, // The version of API is not supported.
+    TOPIC_ALREADY_EXISTS, // Topic with this name already exists.
+    INVALID_PARTITIONS, // Number of partitions is below 1.
+    INVALID_REPLICATION_FACTOR, // Replication factor is below 1 or larger than the number of available brokers.
+    INVALID_REPLICA_ASSIGNMENT, // Replica assignment is invalid.
+    INVALID_CONFIG, // Configuration is invalid.
+    NOT_CONTROLLER, // This is not the correct controller for this cluster.
+    INVALID_REQUEST, // This most likely occurs because of a request being malformed by the
+        // client library or the message was sent to an incompatible broker. See the broker logs
+        // for more details.
+    UNSUPPORTED_FOR_MESSAGE_FORMAT, // The message format version on the broker does not support the request.
+    POLICY_VIOLATION, // Request parameters do not satisfy the configured policy.
+    OUT_OF_ORDER_SEQUENCE_NUMBER, // The broker received an out of order sequence number.
+    DUPLICATE_SEQUENCE_NUMBER, // The broker received a duplicate sequence number.
+    INVALID_PRODUCER_EPOCH, // Producer attempted to produce with an old epoch.
+    INVALID_TXN_STATE, // The producer attempted a transactional operation in an invalid state.
+    INVALID_PRODUCER_ID_MAPPING, // The producer attempted to use a producer id which is not currently assigned to
+        // its transactional id.
+    INVALID_TRANSACTION_TIMEOUT, // The transaction timeout is larger than the maximum value allowed by
+        // the broker (as configured by transaction.max.timeout.ms).
+    CONCURRENT_TRANSACTIONS, // The producer attempted to update a transaction
+        // while another concurrent operation on the same transaction was ongoing.
+    TRANSACTION_COORDINATOR_FENCED, // Indicates that the transaction coordinator sending a WriteTxnMarker
+        // is no longer the current coordinator for a given producer.
+    TRANSACTIONAL_ID_AUTHORIZATION_FAILED, // Transactional Id authorization failed.
+    SECURITY_DISABLED, // Security features are disabled.
+    OPERATION_NOT_ATTEMPTED, // The broker did not attempt to execute this operation. This may happen for
+        // batched RPCs where some operations in the batch failed, causing the broker to respond without
+        // trying the rest.
+    KAFKA_STORAGE_ERROR, // Disk error when trying to access log file on the disk.
+    LOG_DIR_NOT_FOUND, // The user-specified log directory is not found in the broker config.
+    SASL_AUTHENTICATION_FAILED, // SASL Authentication failed.
+    UNKNOWN_PRODUCER_ID, // This exception is raised by the broker if it could not locate the producer metadata
+        // associated with the producerId in question. This could happen if, for instance, the producer's records
+        // were deleted because their retention time had elapsed. Once the last records of the producerId are
+        // removed, the producer's metadata is removed from the broker, and future appends by the producer will
+        // return this exception.
+    REASSIGNMENT_IN_PROGRESS, // A partition reassignment is in progress.
+    DELEGATION_TOKEN_AUTH_DISABLED, // Delegation Token feature is not enabled.
+    DELEGATION_TOKEN_NOT_FOUND, // Delegation Token is not found on server.
+    DELEGATION_TOKEN_OWNER_MISMATCH, // Specified Principal is not valid Owner/Renewer.
+    DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL
+        // channels and on delegation token authenticated channels.
+    DELEGATION_TOKEN_AUTHORIZATION_FAILED, // Delegation Token authorization failed.
+    DELEGATION_TOKEN_EXPIRED, // Delegation Token is expired.
+    INVALID_PRINCIPAL_TYPE, // Supplied principalType is not supported.
+    NON_EMPTY_GROUP, // The group is not empty.
+    GROUP_ID_NOT_FOUND, // The group id does not exist.
+    FETCH_SESSION_ID_NOT_FOUND, // The fetch session ID was not found.
+    INVALID_FETCH_SESSION_EPOCH, // The fetch session epoch is invalid.
+    LISTENER_NOT_FOUND, // There is no listener on the leader broker that matches the listener on which
+        // metadata request was processed.
+    TOPIC_DELETION_DISABLED, // Topic deletion is disabled.
+    FENCED_LEADER_EPOCH, // The leader epoch in the request is older than the epoch on the broker.
+    UNKNOWN_LEADER_EPOCH, // The leader epoch in the request is newer than the epoch on the broker.
+    UNSUPPORTED_COMPRESSION_TYPE, // The requesting client does not support the compression type of given partition.
+    STALE_BROKER_EPOCH, // Broker epoch has changed.
+    OFFSET_NOT_AVAILABLE, // The leader high watermark has not caught up from a recent leader
+        // election so the offsets cannot be guaranteed to be monotonically increasing.
+    MEMBER_ID_REQUIRED, // The group member needs to have a valid member id before actually entering a consumer group.
+    PREFERRED_LEADER_NOT_AVAILABLE, // The preferred leader was not available.
+    GROUP_MAX_SIZE_REACHED, // The consumer group has reached its max size.
+    FENCED_INSTANCE_ID, // The broker rejected this static consumer since
+        // another consumer with the same group.instance.id has registered with a different member.id.
+    ELIGIBLE_LEADERS_NOT_AVAILABLE, // Eligible topic partition leaders are not available.
+    ELECTION_NOT_NEEDED, // Leader election not needed for topic partition.
+    NO_REASSIGNMENT_IN_PROGRESS, // No partition reassignment is in progress.
+    GROUP_SUBSCRIBED_TO_TOPIC, // Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.
+    INVALID_RECORD, // This record has failed the validation on broker and hence will be rejected.
+    UNSTABLE_OFFSET_COMMIT, // There are unstable offsets that need to be cleared.
+    THROTTLING_QUOTA_EXCEEDED, // The throttling quota has been exceeded.
+    PRODUCER_FENCED, // There is a newer producer with the same transactionalId
+        // which fences the current one.
+    RESOURCE_NOT_FOUND, // A request illegally referred to a resource that does not exist.
+    DUPLICATE_RESOURCE, // A request illegally referred to the same resource twice.
+    UNACCEPTABLE_CREDENTIAL, // Requested credential would not meet criteria for acceptability.
+    INCONSISTENT_VOTER_SET, // Indicates that the either the sender or recipient of a
+        // voter-only request is not one of the expected voters
+    INVALID_UPDATE_VERSION, // The given update version was invalid.
+    FEATURE_UPDATE_FAILED, // Unable to update finalized features due to an unexpected server error.
+    PRINCIPAL_DESERIALIZATION_FAILURE, // Request principal deserialization failed during forwarding.
+        // This indicates an internal error on the broker cluster security setup.
+    SNAPSHOT_NOT_FOUND, // Requested snapshot was not found
+    POSITION_OUT_OF_RANGE, // Requested position is not greater than or equal to zero, and less than the size of the snapshot.
+    UNKNOWN_TOPIC_ID, // This server does not host this topic ID.
+    DUPLICATE_BROKER_REGISTRATION, // This broker ID is already in use.
+    BROKER_ID_NOT_REGISTERED, // The given broker ID was not registered.
+ INCONSISTENT_TOPIC_ID, // The log's topic ID did not match the topic ID in the request + INCONSISTENT_CLUSTER_ID, // The clusterId in the request does not match that found on the server + TRANSACTIONAL_ID_NOT_FOUND, // The transactionalId could not be found + FETCH_SESSION_TOPIC_ID_ERROR, // The fetch session encountered inconsistent topic ID usage + INELIGIBLE_REPLICA, // The new ISR contains at least one ineligible replica. + NEW_LEADER_ELECTED // The AlterPartition request successfully updated the partition state but the leader has changed. +}; + template <typename T> void NormalizeNumber(T& value) { #ifndef WORDS_BIGENDIAN @@ -274,6 +412,8 @@ public: class TApiMessage: public TMessage { public: + using TPtr = std::shared_ptr<TApiMessage>; + ~TApiMessage() = default; virtual i16 ApiKey() const = 0; diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 6d1fea98466..d9553d591b8 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -1,8 +1,11 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <ydb/core/raw_socket/sock_config.h> +#include <ydb/core/util/address_classifier.h> #include "kafka_connection.h" +#include "kafka_events.h" #include "kafka_messages.h" +#include "kafka_produce_actor.h" #include "kafka_log_impl.h" #include <strstream> @@ -12,6 +15,7 @@ namespace NKafka { using namespace NActors; +using namespace NKikimr; char Hex(const unsigned char c) { return c < 10 ? '0' + c : 'A' + c - 10; @@ -82,6 +86,9 @@ public: bool ConnectionEstablished = false; bool CloseConnection = false; + NAddressClassifier::TLabeledAddressClassifier::TConstPtr DatacenterClassifier; + TString ClientDC; + Msg Request; enum EReadSteps { SIZE_READ, SIZE_PREPARE, INFLIGTH_CHECK, MESSAGE_READ, MESSAGE_PROCESS }; @@ -91,6 +98,10 @@ public: size_t InflightSize; + TActorId ProduceActorId; + + std::unordered_map<ui64, Msg> PendingRequests; + TKafkaConnection(TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address, const NKikimrConfig::TKafkaProxyConfig& config) : Socket(std::move(socket)) @@ -103,33 +114,48 @@ public: IsSslSupported = IsSslSupported && Socket->IsSslSupported(); } - void Bootstrap() { + void Bootstrap(const TActorContext& ctx) { Become(&TKafkaConnection::StateAccepting); Schedule(InactivityTimeout, InactivityEvent = new TEvPollerReady(nullptr, false, false)); - KAFKA_LOG_D("incoming connection opened"); + KAFKA_LOG_I("incoming connection opened " << Address); + + ProduceActorId = ctx.RegisterWithSameMailbox(new TKafkaProduceActor(SelfId(), ClientDC)); + OnAccept(); } void PassAway() override { + KAFKA_LOG_D("PassAway"); + if (ConnectionEstablished) { ConnectionEstablished = false; } + if (ProduceActorId) { + Send(ProduceActorId, new TEvents::TEvPoison()); + } Shutdown(); TBase::PassAway(); } protected: + void LogEvent(IEventHandle& ev) { + KAFKA_LOG_T("Event: " << ev.GetTypeName()); + } + void SetNonBlock() noexcept { Socket->SetNonBlock(); } void Shutdown() { + KAFKA_LOG_D("Shutdown"); + if (Socket) { Socket->Shutdown(); } } ssize_t SocketSend(const void* data, size_t size) { + KAFKA_LOG_T("SocketSend Size=" << size); return Socket->Send(data, size); } @@ -146,7 +172,17 @@ protected: } TString LogPrefix() const { - return TStringBuilder() << "(#" << GetRawSocket() << "," << Address->ToString() << ") "; + TStringBuilder sb; + sb << "TKafkaConnection " << SelfId() << "(#" << GetRawSocket() << "," << Address->ToString() << ") State: "; + auto stateFunc 
= CurrentStateFunc(); + if (stateFunc == &TKafkaConnection::StateConnected) { + sb << "Connected "; + } else if (stateFunc == &TKafkaConnection::StateAccepting) { + sb << "Accepting "; + } else { + sb << "Unknown "; + } + return sb; } void OnAccept() { @@ -165,9 +201,13 @@ protected: } STATEFN(StateAccepting) { + LogEvent(*ev.Get()); switch (ev->GetTypeRewrite()) { hFunc(TEvPollerReady, HandleAccepting); hFunc(TEvPollerRegisterResult, HandleAccepting); + hFunc(TEvKafka::TEvResponse, Handle); + default: + KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); } } @@ -177,29 +217,17 @@ protected: InflightSize -= messageSize; } - void HandleMessage(TRequestHeaderData* header, TProduceRequestData* message, size_t messageSize) { - TProduceResponseData response; - response.Responses.resize(message->TopicData.size()); - int i = 0; - for (auto& data : message->TopicData) { - response.Responses[i].Name = data.Name; - response.Responses[i].PartitionResponses.resize(data.PartitionData.size()); - int j = 0; - for (auto& p : data.PartitionData) { - response.Responses[i].PartitionResponses[j].Index = p.Index; - response.Responses[i].PartitionResponses[j].BaseOffset = 40; - - ++j; - } - ++i; - } - - Reply(header, &response); + void HandleMessage(const TRequestHeaderData* header, const TProduceRequestData* message, size_t /*messageSize*/) { + PendingRequests[header->CorrelationId] = std::move(Request); + Send(ProduceActorId, new TEvKafka::TEvProduceRequest(header->CorrelationId, message)); + } - InflightSize -= messageSize; + void Handle(TEvKafka::TEvResponse::TPtr response) { + auto r = response->Get(); + Reply(r->Cookie, r->Response.get()); } - void HandleMessage(TRequestHeaderData* header, TInitProducerIdRequestData* /*message*/, size_t messageSize) { + void HandleMessage(const TRequestHeaderData* header, const TInitProducerIdRequestData* /*message*/, size_t messageSize) { TInitProducerIdResponseData response; response.ProducerEpoch = 1; response.ProducerId = 1; @@ -211,7 +239,7 @@ protected: InflightSize -= messageSize; } - void HandleMessage(TRequestHeaderData* header, TMetadataRequestData* message, size_t messageSize) { + void HandleMessage(const TRequestHeaderData* header, const TMetadataRequestData* message, size_t messageSize) { TMetadataResponseData response; response.ThrottleTimeMs = 0; response.ClusterId = "cluster-ahjgk"; @@ -266,8 +294,24 @@ protected: } } + void Reply(const ui64 cookie, const TApiMessage* response) { + auto it = PendingRequests.find(cookie); + if (it == PendingRequests.end()) { + KAFKA_LOG_ERROR("Unexpected cookie " << cookie); + return; + } + + auto& request = it->second; + Reply(&request.Header, response); + + InflightSize -= request.ExpectedSize; + + PendingRequests.erase(it); + + DoRead(); + } + void Reply(const TRequestHeaderData* header, const TApiMessage* reply) { - // TODO improve allocation TKafkaVersion headerVersion = ResponseHeaderVersion(header->RequestApiKey, header->RequestApiVersion); TKafkaVersion version = header->RequestApiVersion; @@ -284,9 +328,13 @@ protected: reply->Write(writable, version); buffer.flush(); + + KAFKA_LOG_D("Sent reply: ApiKey=" << header->RequestApiKey << ", Version=" << version << ", Correlation=" << responseHeader.CorrelationId << ", Size=" << size); } void DoRead() { + KAFKA_LOG_T("DoRead: Demand=" << Demand.Length << ", Step=" << static_cast<i32>(Step)); + for (;;) { while (Demand) { ssize_t received = 0; @@ -296,16 +344,13 @@ protected: } else if (-res == EINTR) { continue; } else if (!res) { - 
KAFKA_LOG_ERROR("connection closed"); + KAFKA_LOG_I("connection closed"); return PassAway(); } else if (res < 0) { - KAFKA_LOG_ERROR("connection closed - error in recv: " << strerror(-res)); + KAFKA_LOG_I("connection closed - error in recv: " << strerror(-res)); return PassAway(); } received = res; - if (!received) { - return; - } Request.Size += received; Demand.Buffer += received; @@ -408,9 +453,13 @@ protected: } STATEFN(StateConnected) { + LogEvent(*ev.Get()); switch (ev->GetTypeRewrite()) { hFunc(TEvPollerReady, HandleConnected); hFunc(TEvPollerRegisterResult, HandleConnected); + hFunc(TEvKafka::TEvResponse, Handle); + default: + KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); } } }; diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h new file mode 100644 index 00000000000..f674fe98041 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -0,0 +1,51 @@ +#pragma once + +#include <library/cpp/actors/core/event_local.h> +#include <ydb/core/base/events.h> + +#include "kafka_messages.h" + +using namespace NActors; + +namespace NKafka { + +struct TEvKafka { + enum EEv { + EvRequest = EventSpaceBegin(NKikimr::TKikimrEvents::TKikimrEvents::ES_KAFKA), + EvProduceRequest, + EvWakeup, + EvResponse = EvRequest + 256, + EvInternalEvents = EvResponse + 256, + EvEnd + }; + + static_assert( + EvEnd < EventSpaceEnd(NKikimr::TKikimrEvents::TKikimrEvents::ES_KAFKA), + "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_KAFKA)"); + + + struct TEvProduceRequest : public TEventLocal<TEvProduceRequest, EvProduceRequest> { + TEvProduceRequest(const ui64 cookie, const TProduceRequestData* request) + : Cookie(cookie) + , Request(request) + {} + + ui64 Cookie; + const TProduceRequestData* Request; + }; + + struct TEvResponse : public TEventLocal<TEvResponse, EvResponse> { + TEvResponse(const ui64 cookie, const TApiMessage::TPtr response) + : Cookie(cookie) + , Response(std::move(response)) { + } + + ui64 Cookie; + const TApiMessage::TPtr Response; + }; + + struct TEvWakeup : public TEventLocal<TEvWakeup, EvWakeup> { + }; +}; + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_messages.h b/ydb/core/kafka_proxy/kafka_messages.h index a118fc0128d..28c8d2388d2 100644 --- a/ydb/core/kafka_proxy/kafka_messages.h +++ b/ydb/core/kafka_proxy/kafka_messages.h @@ -30,6 +30,8 @@ enum EApiKey { class TRequestHeaderData : public TApiMessage { public: + typedef std::shared_ptr<TRequestHeaderData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 2}; static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; @@ -109,6 +111,8 @@ public: class TResponseHeaderData : public TApiMessage { public: + typedef std::shared_ptr<TResponseHeaderData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 1}; static constexpr TKafkaVersions FlexibleVersions = {1, Max<TKafkaVersion>()}; @@ -143,6 +147,8 @@ public: class TProduceRequestData : public TApiMessage { public: + typedef std::shared_ptr<TProduceRequestData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 9}; static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()}; @@ -317,6 +323,8 @@ public: class TProduceResponseData : public TApiMessage { public: + typedef std::shared_ptr<TProduceResponseData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 9}; static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()}; @@ -585,6 
+593,8 @@ public: class TFetchRequestData : public TApiMessage { public: + typedef std::shared_ptr<TFetchRequestData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 13}; static constexpr TKafkaVersions FlexibleVersions = {12, Max<TKafkaVersion>()}; @@ -1005,6 +1015,8 @@ public: class TFetchResponseData : public TApiMessage { public: + typedef std::shared_ptr<TFetchResponseData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 13}; static constexpr TKafkaVersions FlexibleVersions = {12, Max<TKafkaVersion>()}; @@ -1518,6 +1530,8 @@ public: class TMetadataRequestData : public TApiMessage { public: + typedef std::shared_ptr<TMetadataRequestData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 12}; static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()}; @@ -1645,6 +1659,8 @@ public: class TMetadataResponseData : public TApiMessage { public: + typedef std::shared_ptr<TMetadataResponseData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 12}; static constexpr TKafkaVersions FlexibleVersions = {9, Max<TKafkaVersion>()}; @@ -2066,6 +2082,8 @@ public: class TApiVersionsRequestData : public TApiMessage { public: + typedef std::shared_ptr<TApiVersionsRequestData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 3}; static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; @@ -2115,6 +2133,8 @@ public: class TApiVersionsResponseData : public TApiMessage { public: + typedef std::shared_ptr<TApiVersionsResponseData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 3}; static constexpr TKafkaVersions FlexibleVersions = {3, Max<TKafkaVersion>()}; @@ -2432,6 +2452,8 @@ public: class TInitProducerIdRequestData : public TApiMessage { public: + typedef std::shared_ptr<TInitProducerIdRequestData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 4}; static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; @@ -2511,6 +2533,8 @@ public: class TInitProducerIdResponseData : public TApiMessage { public: + typedef std::shared_ptr<TInitProducerIdResponseData> TPtr; + struct MessageMeta { static constexpr TKafkaVersions PresentVersions = {0, 4}; static constexpr TKafkaVersions FlexibleVersions = {2, Max<TKafkaVersion>()}; diff --git a/ydb/core/kafka_proxy/kafka_messages_int.h b/ydb/core/kafka_proxy/kafka_messages_int.h index 69cbaea98ef..95172b82289 100644 --- a/ydb/core/kafka_proxy/kafka_messages_int.h +++ b/ydb/core/kafka_proxy/kafka_messages_int.h @@ -5,6 +5,7 @@ #include <vector> #include <util/generic/array_ref.h> +#include <util/generic/ptr.h> #include <util/generic/yexception.h> #include <contrib/libs/cxxsupp/libcxx/include/type_traits> diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/kafka_produce_actor.cpp new file mode 100644 index 00000000000..8a3ff23d281 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_produce_actor.cpp @@ -0,0 +1,526 @@ +#include "kafka_produce_actor.h" + +#include <contrib/libs/protobuf/src/google/protobuf/util/time_util.h> + +#include <ydb/core/base/path.h> +#include <ydb/core/protos/grpc_pq_old.pb.h> + +namespace NKafka { + +static constexpr TDuration WAKEUP_INTERVAL = TDuration::Seconds(1); +static constexpr TDuration TOPIC_NOT_FOUND_EXPIRATION_INTERVAL = TDuration::Seconds(15); +static constexpr TDuration REQUEST_EXPIRATION_INTERVAL = 
TDuration::Seconds(30); +static constexpr TDuration WRITER_EXPIRATION_INTERVAL = TDuration::Minutes(5); + +TString TKafkaProduceActor::LogPrefix() { + TStringBuilder sb; + sb << "TKafkaProduceActor " << SelfId() << " State: "; + auto stateFunc = CurrentStateFunc(); + if (stateFunc == &TKafkaProduceActor::StateInit) { + sb << "Init "; + } else if (stateFunc == &TKafkaProduceActor::StateWork) { + sb << "Work "; + } else if (stateFunc == &TKafkaProduceActor::StateAccepting) { + sb << "Accepting "; + } else { + sb << "Unknown "; + } + return sb; +} + +void TKafkaProduceActor::LogEvent(IEventHandle& ev) { + KAFKA_LOG_T("Received event: " << ev.GetTypeName()); +} + +void TKafkaProduceActor::Bootstrap(const NActors::TActorContext& /*ctx*/) { + Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup()); + Become(&TKafkaProduceActor::StateWork); +} + +void TKafkaProduceActor::Handle(TEvKafka::TEvWakeup::TPtr /*request*/, const TActorContext& ctx) { + KAFKA_LOG_D("Wakeup"); + + SendResults(ctx); + CleanTopics(ctx); + CleanWriters(ctx); + + Schedule(WAKEUP_INTERVAL, new TEvKafka::TEvWakeup()); + + KAFKA_LOG_T("Wakeup was completed successfully"); +} + +void TKafkaProduceActor::PassAway() { + KAFKA_LOG_D("PassAway"); + + for(const auto& [_, partitionWriters] : Writers) { + for(const auto& [_, w] : partitionWriters) { + Send(w.ActorId, new TEvents::TEvPoison()); + } + } + + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove()); + + TActorBootstrapped::PassAway(); + + KAFKA_LOG_T("PassAway was completed successfully"); +} + +void TKafkaProduceActor::CleanTopics(const TActorContext& ctx) { + const auto expired = ctx.Now() - TOPIC_NOT_FOUND_EXPIRATION_INTERVAL; + + std::map<TString, TTopicInfo> newTopics; + for(auto& [topicPath, topicInfo] : Topics) { + if (!topicInfo.NotFound || topicInfo.NotFoundTime > expired) { + newTopics[topicPath] = std::move(topicInfo); + } + } + Topics = std::move(newTopics); +} + +void TKafkaProduceActor::CleanWriters(const TActorContext& ctx) { + KAFKA_LOG_D("CleanWriters"); + const auto expired = ctx.Now() - WRITER_EXPIRATION_INTERVAL; + + for (auto& [topicPath, partitionWriters] : Writers) { + std::unordered_map<ui32, TWriterInfo> newPartitionWriters; + for (const auto& [partitionId, writerInfo] : partitionWriters) { + if (writerInfo.LastAccessed > expired) { + newPartitionWriters[partitionId] = writerInfo; + } else { + TStringBuilder sb; + sb << "Destroing inactive PartitionWriter. 
Topic='" << topicPath << "', Partition=" << partitionId; + KAFKA_LOG_D(sb); + Send(writerInfo.ActorId, new TEvents::TEvPoison()); + } + } + partitionWriters = std::move(newPartitionWriters); + } + + KAFKA_LOG_T("CleanWriters was completed successfully"); +} + +void TKafkaProduceActor::EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& /*ctx*/) { + Requests.push_back(request); +} + +void TKafkaProduceActor::HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + auto* navigate = ev.Get()->Get()->Request.Get(); + for (auto& info : navigate->ResultSet) { + if (NSchemeCache::TSchemeCacheNavigate::EStatus::Ok == info.Status) { + auto topicPath = "/" + NKikimr::JoinPath(info.Path); + KAFKA_LOG_D("Received topic '" << topicPath << "' description"); + TopicsForInitialization.erase(topicPath); + + auto& topic = Topics[topicPath]; + for(auto& p : info.PQGroupInfo->Description.GetPartitions()) { + topic.partitions[p.GetPartitionId()] = p.GetTabletId(); + } + + auto pathId = info.TableId.PathId; + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(pathId)); + } + } + + for(auto& topicPath : TopicsForInitialization) { + KAFKA_LOG_D("Topic '" << topicPath << "' not found"); + auto& topicInfo = Topics[topicPath]; + topicInfo.NotFound = true; + topicInfo.NotFoundTime = ctx.Now(); + } + + TopicsForInitialization.clear(); + + Become(&TKafkaProduceActor::StateWork); + + KAFKA_LOG_T("HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr) was completed successfully"); + + ProcessRequests(ctx); +} + +void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev, const TActorContext& ctx) { + auto& path = ev->Get()->Path; + KAFKA_LOG_I("Topic '" << path << "' was deleted"); + + auto it = Writers.find(path); + if (it != Writers.end()) { + for(auto& [_, writer] : it->second) { + Send(writer.ActorId, new TEvents::TEvPoison()); + } + Writers.erase(it); + } + + auto& topicInfo = Topics[path]; + topicInfo.NotFound = true; + topicInfo.NotFoundTime = ctx.Now(); + topicInfo.partitions.clear(); +} + +void TKafkaProduceActor::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& /*ctx*/) { + auto* e = ev->Get(); + auto& path = e->Path; + KAFKA_LOG_I("Topic '" << path << "' was updated"); + + auto& topic = Topics[path]; + topic.partitions.clear(); + for (auto& p : e->Result->GetPathDescription().GetPersQueueGroup().GetPartitions()) { + topic.NotFound = false; + topic.partitions[p.GetPartitionId()] = p.GetTabletId(); + } +} + +void TKafkaProduceActor::Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx) { + Requests.push_back(request); + ProcessRequests(ctx); +} + +void TKafkaProduceActor::ProcessRequests(const TActorContext& ctx) { + if (&TKafkaProduceActor::StateWork != CurrentStateFunc()) { + KAFKA_LOG_ERROR("Unexpected state"); + return; + } + + if (Requests.empty()) { + return; + } + + if (EnqueueInitialization()) { + PendingRequests.push_back({Requests.front()}); + Requests.pop_front(); + + ProcessRequest(PendingRequests.back(), ctx); + } else { + ProcessInitializationRequests(ctx); + } +} + +size_t TKafkaProduceActor::EnqueueInitialization() { + size_t canProcess = 0; + bool requireInitialization = false; + + for(const auto& e : Requests) { + auto* r = e->Get()->Request; + for(const auto& topicData : r->TopicData) { + const auto& topicPath = *topicData.Name; + if (!Topics.contains(topicPath)) { + requireInitialization = true; + 
TopicsForInitialization.insert(topicPath); + } + } + if (!requireInitialization) { + ++canProcess; + } + } + + return canProcess; +} + +THolder<TEvPartitionWriter::TEvWriteRequest> Convert(const TProduceRequestData::TTopicProduceData::TPartitionProduceData& data, + const TString& topicName, + ui64 cookie, + const TString& clientDC) { + auto ev = MakeHolder<TEvPartitionWriter::TEvWriteRequest>(); + auto& request = ev->Record; + + const auto& batch = data.Records; + const TString sourceId = TStringBuilder() << batch->ProducerId; + + auto* partitionRequest = request.MutablePartitionRequest(); + partitionRequest->SetTopic(topicName); + partitionRequest->SetPartition(data.Index); + // partitionRequest->SetCmdWriteOffset(); + partitionRequest->SetCookie(cookie); + // partitionRequest->SetPutUnitsSize(); TODO + + for (const auto& record : batch->Records) { + if (!record.Value) { + continue; + } + + NKikimrPQClient::TDataChunk proto; + for(auto& h : record.Headers) { + auto res = proto.AddMessageMeta(); + res->set_key(static_cast<const char*>(h.Key->data()), h.Key->size()); + res->set_value(static_cast<const char*>(h.Value->data()), h.Value->size()); + } + + { + auto res = proto.AddMessageMeta(); + res->set_key("__key"); + res->set_value(static_cast<const char*>(record.Key->data()), record.Key->size()); + } + + proto.SetData(static_cast<const void*>(record.Value->data()), record.Value->size()); + + TString str; + bool res = proto.SerializeToString(&str); + Y_VERIFY(res); + + auto w = partitionRequest->AddCmdWrite(); + + w->SetSourceId(sourceId); + w->SetSeqNo(batch->BaseOffset + record.OffsetDelta); + w->SetData(str); + w->SetCreateTimeMS(batch->BaseTimestamp + record.TimestampDelta); + w->SetDisableDeduplication(true); + w->SetUncompressedSize(record.Value->size()); + w->SetClientDC(clientDC); + w->SetIgnoreQuotaDeadline(true); + } + + return ev; +} + +size_t PartsCount(const TProduceRequestData* r) { + size_t result = 0; + for(const auto& topicData : r->TopicData) { + result += topicData.PartitionData.size(); + } + return result; +} + +void TKafkaProduceActor::ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx) { + auto* r = pendingRequest.Request->Get()->Request; + + pendingRequest.Results.resize(PartsCount(r)); + pendingRequest.StartTime = ctx.Now(); + + size_t position = 0; + for(const auto& topicData : r->TopicData) { + const TString& topicPath = *topicData.Name; + for(const auto& partitionData : topicData.PartitionData) { + const auto partitionId = partitionData.Index; + + auto writerId = PartitionWriter(topicPath, partitionId, ctx); + if (writerId) { + auto ownCookie = ++Cookie; + auto& cookieInfo = Cookies[ownCookie]; + cookieInfo.TopicPath = topicPath; + cookieInfo.PartitionId = partitionId; + cookieInfo.Position = position; + cookieInfo.Request = &pendingRequest; + + pendingRequest.WaitAcceptingCookies.insert(ownCookie); + pendingRequest.WaitResultCookies.insert(ownCookie); + + auto ev = Convert(partitionData, *topicData.Name, ownCookie, ClientDC); + + Send(writerId, std::move(ev)); + } else { + auto& result = pendingRequest.Results[position]; + result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION; + } + + ++position; + } + } + + if (pendingRequest.WaitResultCookies.empty()) { + // All request for unknown topic or empty request + SendResults(ctx); + } else { + Become(&TKafkaProduceActor::StateAccepting); + } +} + +void TKafkaProduceActor::HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx) { + auto r = 
request->Get(); + auto cookie = r->Cookie; + + auto it = Cookies.find(cookie); + if (it == Cookies.end()) { + KAFKA_LOG_W("Received TEvWriteAccepted with unexpected cookie " << cookie); + return; + } + + auto& cookieInfo = it->second; + auto& expectedCookies = cookieInfo.Request->WaitAcceptingCookies; + expectedCookies.erase(cookie); + + if (expectedCookies.empty()) { + Become(&TKafkaProduceActor::StateWork); + ProcessRequests(ctx); + } +} + +void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& /*ctx*/) { + KAFKA_LOG_D("Init " << request->Get()->ToString()); +} + +void TKafkaProduceActor::Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx) { + auto r = request->Get(); + auto cookie = r->Record.GetPartitionResponse().GetCookie(); + + auto it = Cookies.find(cookie); + if (it == Cookies.end()) { + KAFKA_LOG_W("Received TEvWriteResponse with unexpected cookie " << cookie); + return; + } + + auto& cookieInfo = it->second; + auto& partitionResult = cookieInfo.Request->Results[cookieInfo.Position]; + partitionResult.ErrorCode = EKafkaErrors::NONE_ERROR; + partitionResult.Value = request; + cookieInfo.Request->WaitResultCookies.erase(cookie); + + Cookies.erase(it); + + if (!r->IsSuccess()) { + auto wit = Writers.find(cookieInfo.TopicPath); + if (wit != Writers.end()) { + auto& partitions = wit->second; + auto pit = partitions.find(cookieInfo.PartitionId); + if (pit != partitions.end()) { + Send(pit->second.ActorId, new TEvents::TEvPoison()); + partitions.erase(pit); + } + } + } + + if (cookieInfo.Request->WaitResultCookies.empty()) { + SendResults(ctx); + } +} + +void TKafkaProduceActor::SendResults(const TActorContext& ctx) { + auto expireTime = ctx.Now() - REQUEST_EXPIRATION_INTERVAL; + + KAFKA_LOG_T("Sending results. QueueSize= " << PendingRequests.size() << ", ExpirationTime=" << expireTime); + + // We send the results in the order of receipt of the request + while (!PendingRequests.empty()) { + auto& pendingRequest = PendingRequests.front(); + + // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died. + bool expired = expireTime > pendingRequest.StartTime; + + if (!expired && !pendingRequest.WaitResultCookies.empty()) { + return; + } + + auto* r = pendingRequest.Request->Get()->Request; + auto cookie = pendingRequest.Request->Get()->Cookie; + + KAFKA_LOG_D("Send result for cookie " << cookie << ". 
Expired=" << expired); + + const auto topicsCount = r->TopicData.size(); + auto response = std::make_shared<TProduceResponseData>(); + response->Responses.resize(topicsCount); + + size_t position = 0; + + for(size_t i = 0; i < topicsCount; ++i) { + const auto& topicData = r->TopicData[i]; + const auto partitionCount = topicData.PartitionData.size(); + + auto& topicResponse = response->Responses[i]; + topicResponse.Name = topicData.Name; + topicResponse.PartitionResponses.resize(partitionCount); + + for(size_t j = 0; j < partitionCount; ++j) { + const auto& partitionData = topicData.PartitionData[j]; + auto& partitionResponse = topicResponse.PartitionResponses[j]; + const auto& result = pendingRequest.Results[position++]; + + partitionResponse.Index = partitionData.Index; + + if (EKafkaErrors::NONE_ERROR != result.ErrorCode) { + partitionResponse.ErrorCode = result.ErrorCode; + partitionResponse.ErrorMessage = result.ErrorMessage; + } else { + auto* msg = result.Value->Get(); + if (msg->IsSuccess()) { + partitionResponse.ErrorCode = EKafkaErrors::NONE_ERROR; + auto& writeResults = msg->Record.GetPartitionResponse().GetCmdWriteResult(); + + if (!writeResults.empty()) { + auto& lastResult = writeResults.at(writeResults.size() - 1); + partitionResponse.LogAppendTimeMs = lastResult.GetWriteTimestampMS(); + partitionResponse.BaseOffset = lastResult.GetSeqNo(); + } + } else { + partitionResponse.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR; + partitionResponse.ErrorMessage = msg->GetError().Reason; + } + } + } + } + + Send(Client, new TEvKafka::TEvResponse(cookie, response)); + + if (!pendingRequest.WaitAcceptingCookies.empty()) { + if (!expired) { + TStringBuilder sb; + sb << "All TEvWriteResponse were received, but not all TEvWriteAccepted. Unreceived cookies:"; + for(auto cookie : pendingRequest.WaitAcceptingCookies) { + sb << " " << cookie; + } + KAFKA_LOG_W(sb); + } + if (&TKafkaProduceActor::StateAccepting == CurrentStateFunc()) { + Become(&TKafkaProduceActor::StateWork); + } + } + + PendingRequests.pop_front(); + } +} + +void TKafkaProduceActor::ProcessInitializationRequests(const TActorContext& ctx) { + if (TopicsForInitialization.empty()) { + return; + } + + Become(&TKafkaProduceActor::StateInit); + + auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); + + for(auto& topicPath : TopicsForInitialization) { + KAFKA_LOG_D("Describe topic '" << topicPath << "'"); + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.Path = NKikimr::SplitPath(topicPath); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; + entry.SyncVersion = true; + + request->ResultSet.emplace_back(entry); + } + + ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(request.release())); +} + +TActorId TKafkaProduceActor::PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx) { + auto& partitionWriters = Writers[topicPath]; + auto itp = partitionWriters.find(partitionId); + if (itp != partitionWriters.end()) { + auto& writerInfo = itp->second; + writerInfo.LastAccessed = ctx.Now(); + return writerInfo.ActorId; + } + + auto it = Topics.find(topicPath); + if (it == Topics.end()) { + KAFKA_LOG_ERROR("Internal error: topic '" << topicPath << "' isn`t initialized"); + return TActorId{}; + } + + auto& topicInfo = it->second; + if (topicInfo.NotFound) { + return TActorId{}; + } + + auto& partitions = topicInfo.partitions; + auto pit = partitions.find(partitionId); + if (pit == partitions.end()) { + return TActorId{}; + } + + auto 
tabletId = pit->second; + auto* writerActor = CreatePartitionWriter(SelfId(), tabletId, partitionId, SourceId, + TPartitionWriterOpts().WithDeduplication(false)); + + auto& writerInfo = partitionWriters[partitionId]; + writerInfo.ActorId = ctx.RegisterWithSameMailbox(writerActor); + writerInfo.LastAccessed = ctx.Now(); + return writerInfo.ActorId; +} + +} // namespace NKafka diff --git a/ydb/core/kafka_proxy/kafka_produce_actor.h b/ydb/core/kafka_proxy/kafka_produce_actor.h new file mode 100644 index 00000000000..c618da54940 --- /dev/null +++ b/ydb/core/kafka_proxy/kafka_produce_actor.h @@ -0,0 +1,186 @@ +#pragma once + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/persqueue/writer/writer.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include "kafka_events.h" + +namespace NKafka { + +using namespace NKikimr; +using namespace NKikimr::NPQ; +using namespace NKikimrClient; + +// +// This actor handles write requests. +// Each request can contain data for writing to several topics, and in each topic to several partitions. +// When a request to write to an unknown topic arrives, the actor changes the state to Init until it receives +// information about all the topics needed to process the request. +// +// Requests are processed in parallel, but it is guaranteed that the recording order will be preserved. +// The order of responses to requests is also guaranteed. +// +// When the request begins to be processed, the actor enters the Accepting state. In this state, responses +// are expected from all TPartitionWriters confirming acceptance of the request (TEvWriteAccepted). After that, +// the actor switches back to the Work state. This guarantees the order of writing to each partition. 
+// +class TKafkaProduceActor: public NActors::TActorBootstrapped<TKafkaProduceActor> { + struct TPendingRequest; +public: + TKafkaProduceActor(const TActorId& client, const TString& clientDC) + : Client(client) + , ClientDC(clientDC) { + } + + void Bootstrap(const NActors::TActorContext& ctx); + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KAFKA_PRODUCE_ACTOR; } + +private: + void PassAway() override; + + // Handlers for many StateFunc + void Handle(TEvKafka::TEvWakeup::TPtr request, const TActorContext& ctx); + void Handle(TEvPartitionWriter::TEvWriteResponse::TPtr request, const TActorContext& ctx); + void Handle(TEvPartitionWriter::TEvInitResult::TPtr request, const TActorContext& ctx); + void EnqueueRequest(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx); + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); + + // StateInit - describe topics + void HandleInit(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + + STATEFN(StateInit) { + LogEvent(*ev.Get()); + switch (ev->GetTypeRewrite()) { + HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleInit); + + HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest); + HFunc(TEvPartitionWriter::TEvInitResult, Handle); + HFunc(TEvPartitionWriter::TEvWriteResponse, Handle); + + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + + HFunc(TEvKafka::TEvWakeup, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + // StateWork - processing messages + void Handle(TEvKafka::TEvProduceRequest::TPtr request, const TActorContext& ctx); + + STATEFN(StateWork) { + LogEvent(*ev.Get()); + switch (ev->GetTypeRewrite()) { + HFunc(TEvKafka::TEvProduceRequest, Handle); + HFunc(TEvPartitionWriter::TEvInitResult, Handle); + HFunc(TEvPartitionWriter::TEvWriteResponse, Handle); + + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + + HFunc(TEvKafka::TEvWakeup, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + // StateAccepting - enqueue ProduceRequest parts to PartitionWriters + // This guarantees the order of responses according order of request + void HandleAccepting(TEvPartitionWriter::TEvWriteAccepted::TPtr request, const TActorContext& ctx); + + STATEFN(StateAccepting) { + LogEvent(*ev.Get()); + switch (ev->GetTypeRewrite()) { + HFunc(TEvPartitionWriter::TEvWriteAccepted, HandleAccepting); + + HFunc(TEvKafka::TEvProduceRequest, EnqueueRequest); + HFunc(TEvPartitionWriter::TEvInitResult, Handle); + HFunc(TEvPartitionWriter::TEvWriteResponse, Handle); + + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyDeleted, Handle); + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + + HFunc(TEvKafka::TEvWakeup, Handle); + sFunc(TEvents::TEvPoison, PassAway); + } + } + + + // Logic + void ProcessRequests(const TActorContext& ctx); + void ProcessRequest(TPendingRequest& pendingRequest, const TActorContext& ctx); + + void SendResults(const TActorContext& ctx); + + size_t EnqueueInitialization(); + void ProcessInitializationRequests(const TActorContext& ctx); + void CleanTopics(const TActorContext& ctx); + void CleanWriters(const TActorContext& ctx); + + TActorId PartitionWriter(const TString& topicPath, ui32 partitionId, const TActorContext& ctx); 
+ + TString LogPrefix(); + void LogEvent(IEventHandle& ev); + +private: + const TActorId Client; + TString SourceId; + + TString ClientDC; + + ui64 Cookie = 0; + TDeque<TEvKafka::TEvProduceRequest::TPtr> Requests; + + struct TPendingRequest { + TPendingRequest(TEvKafka::TEvProduceRequest::TPtr request) + : Request(request) { + } + + TEvKafka::TEvProduceRequest::TPtr Request; + + struct TPartitionResult { + EKafkaErrors ErrorCode = EKafkaErrors::REQUEST_TIMED_OUT; + TString ErrorMessage; + TEvPartitionWriter::TEvWriteResponse::TPtr Value; + }; + std::vector<TPartitionResult> Results; + + std::set<ui64> WaitAcceptingCookies; + std::set<ui64> WaitResultCookies; + + TInstant StartTime; + }; + TDeque<TPendingRequest> PendingRequests; + + struct TCookieInfo { + TString TopicPath; + ui32 PartitionId; + size_t Position; + + TPendingRequest* Request; + }; + std::map<ui64, TCookieInfo> Cookies; + + std::set<TString> TopicsForInitialization; + + struct TTopicInfo { + bool NotFound = false; + TInstant NotFoundTime; + + // partitioId -> tabletId + std::unordered_map<ui32, ui64> partitions; + }; + std::map<TString, TTopicInfo> Topics; + + struct TWriterInfo { + TActorId ActorId; + TInstant LastAccessed; + }; + // TopicPath -> PartitionId -> TPartitionWriter + std::unordered_map<TString, std::unordered_map<ui32, TWriterInfo>> Writers; +}; + +} diff --git a/ydb/core/kafka_proxy/ya.make b/ydb/core/kafka_proxy/ya.make index eec08b69054..ce30eca927e 100644 --- a/ydb/core/kafka_proxy/ya.make +++ b/ydb/core/kafka_proxy/ya.make @@ -11,6 +11,8 @@ SRCS( kafka_messages.h kafka_messages_int.cpp kafka_messages_int.h + kafka_produce_actor.cpp + kafka_produce_actor.h kafka_proxy.h kafka_records.cpp ) diff --git a/ydb/core/persqueue/blob.h b/ydb/core/persqueue/blob.h index 0410a6b7d01..4b99f0b8000 100644 --- a/ydb/core/persqueue/blob.h +++ b/ydb/core/persqueue/blob.h @@ -88,7 +88,7 @@ struct TClientBlob { return !PartData || PartData->PartNo + 1 == PartData->TotalParts; } - static const ui32 OVERHEAD = sizeof(ui32)/*totalSize*/ + sizeof(ui64)/*SeqNo*/ + sizeof(ui16) /*SourceId*/ + sizeof(ui64) /*WriteTimestamp*/ + sizeof(ui64) /*CreateTimestamp*/; + static constexpr ui32 OVERHEAD = sizeof(ui32)/*totalSize*/ + sizeof(ui64)/*SeqNo*/ + sizeof(ui16) /*SourceId*/ + sizeof(ui64) /*WriteTimestamp*/ + sizeof(ui64) /*CreateTimestamp*/; void SerializeTo(TBuffer& buffer) const; static TClientBlob Deserialize(const char *data, ui32 size); diff --git a/ydb/core/persqueue/writer/writer.cpp b/ydb/core/persqueue/writer/writer.cpp index df0f0925ee2..57495befe2b 100644 --- a/ydb/core/persqueue/writer/writer.cpp +++ b/ydb/core/persqueue/writer/writer.cpp @@ -123,6 +123,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { } void BecomeZombie(const TString& error) { + SendError(error); + Become(&TThis::StateZombie); + } + + void SendError(const TString& error) { for (auto cookie : std::exchange(PendingWrite, {})) { SendWriteResult(error, MakeResponse(cookie)); } @@ -132,8 +137,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { for (const auto& [cookie, _] : std::exchange(Pending, {})) { SendWriteResult(error, MakeResponse(cookie)); } - - Become(&TThis::StateZombie); } template <typename... 
Args> @@ -453,7 +456,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter> { if (PipeClient) { NTabletPipe::CloseAndForgetClient(SelfId(), PipeClient); } - + SendError("Unexpected termination"); TActorBootstrapped::PassAway(); } diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto index f9b74e17657..dcbf04d226a 100644 --- a/ydb/library/services/services.proto +++ b/ydb/library/services/services.proto @@ -996,5 +996,6 @@ message TActivity { KQP_SCAN_FETCH_ACTOR = 614; COLUMNSHARD_CONVEYOR = 615; PERSQUEUE_READ_QUOTER = 616; + KAFKA_PRODUCE_ACTOR = 617; }; }; diff --git a/ydb/services/persqueue_v1/actors/write_session_actor.ipp b/ydb/services/persqueue_v1/actors/write_session_actor.ipp index f8fe09e9bef..810d1c98fe9 100644 --- a/ydb/services/persqueue_v1/actors/write_session_actor.ipp +++ b/ydb/services/persqueue_v1/actors/write_session_actor.ipp @@ -1417,6 +1417,8 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& ui64 maxMessageMetadataSize = 0; auto addData = [&](const Topic::StreamWriteMessage::WriteRequest& writeRequest, const i32 messageIndex) { + const auto& msg = writeRequest.messages(messageIndex); + auto w = request.MutablePartitionRequest()->AddCmdWrite(); w->SetData(GetSerializedData(InitMeta, writeRequest, messageIndex)); if (UseDeduplication) { @@ -1424,14 +1426,15 @@ void TWriteSessionActor<UseMigrationProtocol>::PrepareRequest(THolder<TEvWrite>& } else { w->SetDisableDeduplication(true); } - w->SetSeqNo(writeRequest.messages(messageIndex).seq_no()); + w->SetSeqNo(msg.seq_no()); SeqNoInflight.push_back(w->GetSeqNo()); - w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(writeRequest.messages(messageIndex).created_at())); - w->SetUncompressedSize(writeRequest.messages(messageIndex).uncompressed_size()); + w->SetCreateTimeMS(::google::protobuf::util::TimeUtil::TimestampToMilliseconds(msg.created_at())); + w->SetUncompressedSize(msg.uncompressed_size()); w->SetClientDC(ClientDC); w->SetIgnoreQuotaDeadline(true); + payloadSize += w->GetData().size() + w->GetSourceId().size(); - const auto& msg = writeRequest.messages(messageIndex); + ui64 currMetadataSize = 0; for (const auto& metaItem : msg.metadata_items()) { currMetadataSize += metaItem.key().size() + metaItem.value().size(); |