diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-14 10:39:00 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-14 11:51:29 +0300 |
commit | 0677981702178f1a002820ddcc517389022d197a (patch) | |
tree | a662b3125f064c5c3c0ce1a494c9c6604aa8cfd9 | |
parent | 3e43e9be67fe0ce6bba135c59c9f7054abab3db6 (diff) | |
download | ydb-0677981702178f1a002820ddcc517389022d197a.tar.gz |
Test for Kafka
-rw-r--r-- | ydb/core/kafka_proxy/kafka.h | 266 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ut_protocol.cpp | 294 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/ut/ya.make | 3 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_listener.cpp | 5 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 19 |
10 files changed, 458 insertions, 135 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h index 70c4108211..df14f752d8 100644 --- a/ydb/core/kafka_proxy/kafka.h +++ b/ydb/core/kafka_proxy/kafka.h @@ -160,139 +160,139 @@ enum ESizeFormat { // see https://kafka.apache.org/11/protocol.html#protocol_error_codes enum EKafkaErrors { - UNKNOWN_SERVER_ERROR = -1, // The server experienced an unexpected error when processing the request. - NONE_ERROR = 0, - OFFSET_OUT_OF_RANGE, // The requested offset is not within the range of offsets maintained by the server., - CORRUPT_MESSAGE, // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt. - UNKNOWN_TOPIC_OR_PARTITION, // This server does not host this topic-partition. - INVALID_FETCH_SIZE, // The requested fetch size is invalid. - LEADER_NOT_AVAILABLE, // There is no leader for this topic-partition as we are in the middle of a leadership election. - NOT_LEADER_OR_FOLLOWER, // For requests intended only for the leader, this error indicates that the broker is not the current leader. - // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. - REQUEST_TIMED_OUT, // The request timed out. - BROKER_NOT_AVAILABLE, // The broker is not available. - REPLICA_NOT_AVAILABLE, // The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests - // intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition. - MESSAGE_TOO_LARGE, // The request included a message larger than the max message size the server will accept. - STALE_CONTROLLER_EPOCH, // The controller moved to another broker. - OFFSET_METADATA_TOO_LARGE, // The metadata field of the offset request was too large. - NETWORK_EXCEPTION, // The server disconnected before a response was received. - COORDINATOR_LOAD_IN_PROGRESS, // The coordinator is loading and hence can't process requests. - COORDINATOR_NOT_AVAILABLE, // The coordinator is not available. - NOT_COORDINATOR, // This is not the correct coordinator. - INVALID_TOPIC_EXCEPTION, // The request attempted to perform an operation on an invalid topic. - RECORD_LIST_TOO_LARGE, // The request included message batch larger than the configured segment size on the server. - NOT_ENOUGH_REPLICAS, // Messages are rejected since there are fewer in-sync replicas than required. - NOT_ENOUGH_REPLICAS_AFTER_APPEND, // Messages are written to the log, but to fewer in-sync replicas than required. - INVALID_REQUIRED_ACKS, // Produce request specified an invalid value for required acks. - ILLEGAL_GENERATION, // Specified group generation id is not valid. - INCONSISTENT_GROUP_PROTOCOL, // The group member's supported protocols are incompatible with those of existing members - // or first group member tried to join with empty protocol type or empty protocol list. - INVALID_GROUP_ID, // The configured groupId is invalid. - UNKNOWN_MEMBER_ID, // The coordinator is not aware of this member. - INVALID_SESSION_TIMEOUT, // The session timeout is not within the range allowed by the broker - // (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms). - REBALANCE_IN_PROGRESS, // The group is rebalancing, so a rejoin is needed. - INVALID_COMMIT_OFFSET_SIZE, // The committing offset data size is not valid. - TOPIC_AUTHORIZATION_FAILED, // Topic authorization failed. - GROUP_AUTHORIZATION_FAILED, // Group authorization failed. - CLUSTER_AUTHORIZATION_FAILED, // Cluster authorization failed. - INVALID_TIMESTAMP, // The timestamp of the message is out of acceptable range. - UNSUPPORTED_SASL_MECHANISM, // The broker does not support the requested SASL mechanism. - ILLEGAL_SASL_STATE, // Request is not valid given the current SASL state. - UNSUPPORTED_VERSION, // The version of API is not supported. - TOPIC_ALREADY_EXISTS, // Topic with this name already exists. - INVALID_PARTITIONS, // Number of partitions is below 1. - INVALID_REPLICATION_FACTOR, // Replication factor is below 1 or larger than the number of available brokers. - INVALID_REPLICA_ASSIGNMENT, // Replica assignment is invalid. - INVALID_CONFIG, // Configuration is invalid. - NOT_CONTROLLER, // This is not the correct controller for this cluster. - INVALID_REQUEST, // This most likely occurs because of a request being malformed by the - // client library or the message was sent to an incompatible broker. See the broker logs - // for more details. - UNSUPPORTED_FOR_MESSAGE_FORMAT, // The message format version on the broker does not support the request. - POLICY_VIOLATION, // Request parameters do not satisfy the configured policy. - OUT_OF_ORDER_SEQUENCE_NUMBER, // The broker received an out of order sequence number. - DUPLICATE_SEQUENCE_NUMBER, // The broker received a duplicate sequence number. - INVALID_PRODUCER_EPOCH, // Producer attempted to produce with an old epoch. - INVALID_TXN_STATE, // The producer attempted a transactional operation in an invalid state. - INVALID_PRODUCER_ID_MAPPING, // The producer attempted to use a producer id which is not currently assigned to - // its transactional id. - INVALID_TRANSACTION_TIMEOUT, // The transaction timeout is larger than the maximum value allowed by - // the broker (as configured by transaction.max.timeout.ms). - CONCURRENT_TRANSACTIONS, // The producer attempted to update a transaction - // while another concurrent operation on the same transaction was ongoing. - TRANSACTION_COORDINATOR_FENCED, // Indicates that the transaction coordinator sending a WriteTxnMarker - // is no longer the current coordinator for a given producer. - TRANSACTIONAL_ID_AUTHORIZATION_FAILED, // Transactional Id authorization failed. - SECURITY_DISABLED, // Security features are disabled. - OPERATION_NOT_ATTEMPTED, // The broker did not attempt to execute this operation. This may happen for - // batched RPCs where some operations in the batch failed, causing the broker to respond without - // trying the rest. - KAFKA_STORAGE_ERROR, // Disk error when trying to access log file on the disk. - LOG_DIR_NOT_FOUND, // The user-specified log directory is not found in the broker config. - SASL_AUTHENTICATION_FAILED, // SASL Authentication failed. - UNKNOWN_PRODUCER_ID, // This exception is raised by the broker if it could not locate the producer metadata - // associated with the producerId in question. This could happen if, for instance, the producer's records - // were deleted because their retention time had elapsed. Once the last records of the producerId are - // removed, the producer's metadata is removed from the broker, and future appends by the producer will - // return this exception. - REASSIGNMENT_IN_PROGRESS, // A partition reassignment is in progress. - DELEGATION_TOKEN_AUTH_DISABLED, // Delegation Token feature is not enabled. - DELEGATION_TOKEN_NOT_FOUND, // Delegation Token is not found on server. - DELEGATION_TOKEN_OWNER_MISMATCH, // Specified Principal is not valid Owner/Renewer. - DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL - // channels and on delegation token authenticated channels. - DELEGATION_TOKEN_AUTHORIZATION_FAILED, // Delegation Token authorization failed. - DELEGATION_TOKEN_EXPIRED, // Delegation Token is expired. - INVALID_PRINCIPAL_TYPE, // Supplied principalType is not supported. - NON_EMPTY_GROUP, // The group is not empty. - GROUP_ID_NOT_FOUND, // The group id does not exist. - FETCH_SESSION_ID_NOT_FOUND, // The fetch session ID was not found. - INVALID_FETCH_SESSION_EPOCH, // The fetch session epoch is invalid. - LISTENER_NOT_FOUND, // There is no listener on the leader broker that matches the listener on which - // metadata request was processed. - TOPIC_DELETION_DISABLED, // Topic deletion is disabled. - FENCED_LEADER_EPOCH, // The leader epoch in the request is older than the epoch on the broker. - UNKNOWN_LEADER_EPOCH, // The leader epoch in the request is newer than the epoch on the broker. - UNSUPPORTED_COMPRESSION_TYPE, // The requesting client does not support the compression type of given partition. - STALE_BROKER_EPOCH, // Broker epoch has changed. - OFFSET_NOT_AVAILABLE, // The leader high watermark has not caught up from a recent leader - // election so the offsets cannot be guaranteed to be monotonically increasing. - MEMBER_ID_REQUIRED, // The group member needs to have a valid member id before actually entering a consumer group. - PREFERRED_LEADER_NOT_AVAILABLE, // The preferred leader was not available. - GROUP_MAX_SIZE_REACHED, // The consumer group has reached its max size. - FENCED_INSTANCE_ID, // The broker rejected this static consumer since - // another consumer with the same group.instance.id has registered with a different member.id. - ELIGIBLE_LEADERS_NOT_AVAILABLE, // Eligible topic partition leaders are not available. - ELECTION_NOT_NEEDED, // Leader election not needed for topic partition. - NO_REASSIGNMENT_IN_PROGRESS, // No partition reassignment is in progress. - GROUP_SUBSCRIBED_TO_TOPIC, // Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it. - INVALID_RECORD, // This record has failed the validation on broker and hence will be rejected. - UNSTABLE_OFFSET_COMMIT, // There are unstable offsets that need to be cleared. - THROTTLING_QUOTA_EXCEEDED, // The throttling quota has been exceeded. - PRODUCER_FENCED, // There is a newer producer with the same transactionalId - // which fences the current one. - RESOURCE_NOT_FOUND, // A request illegally referred to a resource that does not exist. - DUPLICATE_RESOURCE, // A request illegally referred to the same resource twice. - UNACCEPTABLE_CREDENTIAL, // Requested credential would not meet criteria for acceptability. - INCONSISTENT_VOTER_SET, // Indicates that the either the sender or recipient of a - // voter-only request is not one of the expected voters - INVALID_UPDATE_VERSION, // The given update version was invalid. - FEATURE_UPDATE_FAILED, // Unable to update finalized features due to an unexpected server error. - PRINCIPAL_DESERIALIZATION_FAILURE, // Request principal deserialization failed during forwarding. - // This indicates an internal error on the broker cluster security setup. - SNAPSHOT_NOT_FOUND, // Requested snapshot was not found - POSITION_OUT_OF_RANGE, // Requested position is not greater than or equal to zero, and less than the size of the snapshot. - UNKNOWN_TOPIC_ID, // This server does not host this topic ID. - DUPLICATE_BROKER_REGISTRATION, // This broker ID is already in use. - BROKER_ID_NOT_REGISTERED, // The given broker ID was not registered. - INCONSISTENT_TOPIC_ID, // The log's topic ID did not match the topic ID in the request - INCONSISTENT_CLUSTER_ID, // The clusterId in the request does not match that found on the server - TRANSACTIONAL_ID_NOT_FOUND, // The transactionalId could not be found - FETCH_SESSION_TOPIC_ID_ERROR, // The fetch session encountered inconsistent topic ID usage - INELIGIBLE_REPLICA, // The new ISR contains at least one ineligible replica. - NEW_LEADER_ELECTED // The AlterPartition request successfully updated the partition state but the leader has changed. + UNKNOWN_SERVER_ERROR = -1, // The server experienced an unexpected error when processing the request. + NONE_ERROR = 0, + OFFSET_OUT_OF_RANGE = 1, // The requested offset is not within the range of offsets maintained by the server., + CORRUPT_MESSAGE = 2, // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt. + UNKNOWN_TOPIC_OR_PARTITION = 3, // This server does not host this topic-partition. + INVALID_FETCH_SIZE = 4, // The requested fetch size is invalid. + LEADER_NOT_AVAILABLE = 5, // There is no leader for this topic-partition as we are in the middle of a leadership election. + NOT_LEADER_OR_FOLLOWER = 6, // For requests intended only for the leader, this error indicates that the broker is not the current leader. + // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition. + REQUEST_TIMED_OUT = 7, // The request timed out. + BROKER_NOT_AVAILABLE = 8, // The broker is not available. + REPLICA_NOT_AVAILABLE = 9, // The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests + // intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition. + MESSAGE_TOO_LARGE = 10, // The request included a message larger than the max message size the server will accept. + STALE_CONTROLLER_EPOCH = 11, // The controller moved to another broker. + OFFSET_METADATA_TOO_LARGE = 12, // The metadata field of the offset request was too large. + NETWORK_EXCEPTION = 13, // The server disconnected before a response was received. + COORDINATOR_LOAD_IN_PROGRESS = 14, // The coordinator is loading and hence can't process requests. + COORDINATOR_NOT_AVAILABLE = 15, // The coordinator is not available. + NOT_COORDINATOR = 16, // This is not the correct coordinator. + INVALID_TOPIC_EXCEPTION = 17, // The request attempted to perform an operation on an invalid topic. + RECORD_LIST_TOO_LARGE = 18, // The request included message batch larger than the configured segment size on the server. + NOT_ENOUGH_REPLICAS = 19, // Messages are rejected since there are fewer in-sync replicas than required. + NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20, // Messages are written to the log, but to fewer in-sync replicas than required. + INVALID_REQUIRED_ACKS = 21, // Produce request specified an invalid value for required acks. + ILLEGAL_GENERATION = 22, // Specified group generation id is not valid. + INCONSISTENT_GROUP_PROTOCOL = 23, // The group member's supported protocols are incompatible with those of existing members + // or first group member tried to join with empty protocol type or empty protocol list. + INVALID_GROUP_ID = 24, // The configured groupId is invalid. + UNKNOWN_MEMBER_ID = 25, // The coordinator is not aware of this member. + INVALID_SESSION_TIMEOUT = 26, // The session timeout is not within the range allowed by the broker + // (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms). + REBALANCE_IN_PROGRESS = 27, // The group is rebalancing, so a rejoin is needed. + INVALID_COMMIT_OFFSET_SIZE = 28, // The committing offset data size is not valid. + TOPIC_AUTHORIZATION_FAILED = 29, // Topic authorization failed. + GROUP_AUTHORIZATION_FAILED = 30, // Group authorization failed. + CLUSTER_AUTHORIZATION_FAILED = 31, // Cluster authorization failed. + INVALID_TIMESTAMP = 32, // The timestamp of the message is out of acceptable range. + UNSUPPORTED_SASL_MECHANISM = 33, // The broker does not support the requested SASL mechanism. + ILLEGAL_SASL_STATE = 34, // Request is not valid given the current SASL state. + UNSUPPORTED_VERSION = 35, // The version of API is not supported. + TOPIC_ALREADY_EXISTS = 36, // Topic with this name already exists. + INVALID_PARTITIONS = 37, // Number of partitions is below 1. + INVALID_REPLICATION_FACTOR = 38, // Replication factor is below 1 or larger than the number of available brokers. + INVALID_REPLICA_ASSIGNMENT = 39, // Replica assignment is invalid. + INVALID_CONFIG = 40, // Configuration is invalid. + NOT_CONTROLLER = 41, // This is not the correct controller for this cluster. + INVALID_REQUEST = 42, // This most likely occurs because of a request being malformed by the + // client library or the message was sent to an incompatible broker. See the broker logs + // for more details. + UNSUPPORTED_FOR_MESSAGE_FORMAT = 43, // The message format version on the broker does not support the request. + POLICY_VIOLATION = 44, // Request parameters do not satisfy the configured policy. + OUT_OF_ORDER_SEQUENCE_NUMBER = 45, // The broker received an out of order sequence number. + DUPLICATE_SEQUENCE_NUMBER = 46, // The broker received a duplicate sequence number. + INVALID_PRODUCER_EPOCH = 47, // Producer attempted to produce with an old epoch. + INVALID_TXN_STATE = 48, // The producer attempted a transactional operation in an invalid state. + INVALID_PRODUCER_ID_MAPPING = 49, // The producer attempted to use a producer id which is not currently assigned to + // its transactional id. + INVALID_TRANSACTION_TIMEOUT = 50, // The transaction timeout is larger than the maximum value allowed by + // the broker (as configured by transaction.max.timeout.ms). + CONCURRENT_TRANSACTIONS = 51, // The producer attempted to update a transaction + // while another concurrent operation on the same transaction was ongoing. + TRANSACTION_COORDINATOR_FENCED = 52, // Indicates that the transaction coordinator sending a WriteTxnMarker + // is no longer the current coordinator for a given producer. + TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53, // Transactional Id authorization failed. + SECURITY_DISABLED = 54, // Security features are disabled. + OPERATION_NOT_ATTEMPTED = 55, // The broker did not attempt to execute this operation. This may happen for + // batched RPCs where some operations in the batch failed, causing the broker to respond without + // trying the rest. + KAFKA_STORAGE_ERROR = 56, // Disk error when trying to access log file on the disk. + LOG_DIR_NOT_FOUND = 57, // The user-specified log directory is not found in the broker config. + SASL_AUTHENTICATION_FAILED = 58, // SASL Authentication failed. + UNKNOWN_PRODUCER_ID = 59, // This exception is raised by the broker if it could not locate the producer metadata + // associated with the producerId in question. This could happen if, for instance, the producer's records + // were deleted because their retention time had elapsed. Once the last records of the producerId are + // removed, the producer's metadata is removed from the broker, and future appends by the producer will + // return this exception. + REASSIGNMENT_IN_PROGRESS = 60, // A partition reassignment is in progress. + DELEGATION_TOKEN_AUTH_DISABLED = 61, // Delegation Token feature is not enabled. + DELEGATION_TOKEN_NOT_FOUND = 62, // Delegation Token is not found on server. + DELEGATION_TOKEN_OWNER_MISMATCH = 63, // Specified Principal is not valid Owner/Renewer. + DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL + // channels and on delegation token authenticated channels. + DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65, // Delegation Token authorization failed. + DELEGATION_TOKEN_EXPIRED = 66, // Delegation Token is expired. + INVALID_PRINCIPAL_TYPE = 67, // Supplied principalType is not supported. + NON_EMPTY_GROUP = 68, // The group is not empty. + GROUP_ID_NOT_FOUND = 69, // The group id does not exist. + FETCH_SESSION_ID_NOT_FOUND = 70, // The fetch session ID was not found. + INVALID_FETCH_SESSION_EPOCH = 71, // The fetch session epoch is invalid. + LISTENER_NOT_FOUND = 72, // There is no listener on the leader broker that matches the listener on which + // metadata request was processed. + TOPIC_DELETION_DISABLED = 73, // Topic deletion is disabled. + FENCED_LEADER_EPOCH = 74, // The leader epoch in the request is older than the epoch on the broker. + UNKNOWN_LEADER_EPOCH = 75, // The leader epoch in the request is newer than the epoch on the broker. + UNSUPPORTED_COMPRESSION_TYPE = 76, // The requesting client does not support the compression type of given partition. + STALE_BROKER_EPOCH = 77, // Broker epoch has changed. + OFFSET_NOT_AVAILABLE = 78, // The leader high watermark has not caught up from a recent leader + // election so the offsets cannot be guaranteed to be monotonically increasing. + MEMBER_ID_REQUIRED = 79, // The group member needs to have a valid member id before actually entering a consumer group. + PREFERRED_LEADER_NOT_AVAILABLE = 80, // The preferred leader was not available. + GROUP_MAX_SIZE_REACHED = 81, // The consumer group has reached its max size. + FENCED_INSTANCE_ID = 82, // The broker rejected this static consumer since + // another consumer with the same group.instance.id has registered with a different member.id. + ELIGIBLE_LEADERS_NOT_AVAILABLE = 83, // Eligible topic partition leaders are not available. + ELECTION_NOT_NEEDED = 84, // Leader election not needed for topic partition. + NO_REASSIGNMENT_IN_PROGRESS = 85, // No partition reassignment is in progress. + GROUP_SUBSCRIBED_TO_TOPIC = 86, // Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it. + INVALID_RECORD = 87, // This record has failed the validation on broker and hence will be rejected. + UNSTABLE_OFFSET_COMMIT = 88, // There are unstable offsets that need to be cleared. + THROTTLING_QUOTA_EXCEEDED = 89, // The throttling quota has been exceeded. + PRODUCER_FENCED = 90, // There is a newer producer with the same transactionalId + // which fences the current one. + RESOURCE_NOT_FOUND = 91, // A request illegally referred to a resource that does not exist. + DUPLICATE_RESOURCE = 92, // A request illegally referred to the same resource twice. + UNACCEPTABLE_CREDENTIAL = 93, // Requested credential would not meet criteria for acceptability. + INCONSISTENT_VOTER_SET = 94, // Indicates that the either the sender or recipient of a + // voter-only request is not one of the expected voters + INVALID_UPDATE_VERSION = 95, // The given update version was invalid. + FEATURE_UPDATE_FAILED = 96, // Unable to update finalized features due to an unexpected server error. + PRINCIPAL_DESERIALIZATION_FAILURE = 97, // Request principal deserialization failed during forwarding. + // This indicates an internal error on the broker cluster security setup. + SNAPSHOT_NOT_FOUND = 98, // Requested snapshot was not found + POSITION_OUT_OF_RANGE = 99, // Requested position is not greater than or equal to zero, and less than the size of the snapshot. + UNKNOWN_TOPIC_ID = 100, // This server does not host this topic ID. + DUPLICATE_BROKER_REGISTRATION = 101, // This broker ID is already in use. + BROKER_ID_NOT_REGISTERED = 102, // The given broker ID was not registered. + INCONSISTENT_TOPIC_ID = 103, // The log's topic ID did not match the topic ID in the request + INCONSISTENT_CLUSTER_ID = 104, // The clusterId in the request does not match that found on the server + TRANSACTIONAL_ID_NOT_FOUND = 105, // The transactionalId could not be found + FETCH_SESSION_TOPIC_ID_ERROR = 106, // The fetch session encountered inconsistent topic ID usage + INELIGIBLE_REPLICA = 107, // The new ISR contains at least one ineligible replica. + NEW_LEADER_ELECTED = 108 // The AlterPartition request successfully updated the partition state but the leader has changed. }; template <typename T> diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index 66edeed807..e5aeb2f5ba 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -47,7 +47,7 @@ public: using TBase = TActorBootstrapped<TKafkaConnection>; struct Msg { - using TPtr=std::shared_ptr<Msg>; + using TPtr = std::shared_ptr<Msg>; size_t Size = 0; TKafkaInt32 ExpectedSize = 0; diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt index 47bf8e6799..b8df216cda 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt @@ -33,6 +33,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE ) target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt index 63df13dbe7..a4ad9bc7be 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt @@ -36,6 +36,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE ) target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt index 70e7385f09..a7c8cc6172 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt @@ -37,6 +37,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE ) target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt index 38f370cfe1..38fe365d4f 100644 --- a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC ) target_sources(ydb-core-kafka_proxy-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp ) diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp new file mode 100644 index 0000000000..8b6df0c716 --- /dev/null +++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp @@ -0,0 +1,294 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include "../kafka_messages.h" + +#include <ydb/services/ydb/ydb_common_ut.h> +#include <ydb/services/ydb/ydb_keys_ut.h> + +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> +#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.grpc.pb.h> + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/digest/md5/md5.h> + +#include <util/system/tempfile.h> + +#include <random> + + +using namespace NKafka; +using namespace NYdb; +using namespace NYdb::NTable; + +static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin"; +static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin"; +static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; + +static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud"; +static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder"; + +struct WithSslAndAuth : TKikimrTestSettings { + static constexpr bool SSL = true; + static constexpr bool AUTH = true; +}; +using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>; + +char Hex0(const unsigned char c) { + return c < 10 ? '0' + c : 'A' + c - 10; +} + +void Print(const TBuffer& buffer) { + TStringBuilder sb; + for (size_t i = 0; i < buffer.Size(); ++i) { + char c = buffer.Data()[i]; + if (i > 0) { + sb << ", "; + } + sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F); + } + Cerr << ">>>>> Packet sent: " << sb << Endl; +} + + +template<class TKikimr, bool secure> +class TTestServer { +public: + TIpPort Port; + + TTestServer() { + TPortManager portManager; + Port = 9090; // portManager.GetTcpPort(); + + NKikimrConfig::TAppConfig appConfig; + appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true); + appConfig.MutablePQConfig()->SetEnabled(true); + // NOTE(shmel1k@): KIKIMR-14221 + appConfig.MutablePQConfig()->SetCheckACL(false); + appConfig.MutablePQConfig()->SetRequireCredentialsInNewProtocol(false); + + auto cst = appConfig.MutablePQConfig()->AddClientServiceType(); + cst->SetName("data-transfer"); + cst = appConfig.MutablePQConfig()->AddClientServiceType(); + cst->SetName("data-transfer2"); + + appConfig.MutableKafkaProxyConfig()->SetEnableKafkaProxy(true); + appConfig.MutableKafkaProxyConfig()->SetListeningPort(Port); + appConfig.MutableKafkaProxyConfig()->SetMaxMessageSize(1024); + appConfig.MutableKafkaProxyConfig()->SetMaxInflightSize(2048); + + + appConfig.MutablePQConfig()->MutableQuotingConfig()->SetEnableQuoting(true); + appConfig.MutablePQConfig()->MutableQuotingConfig()->SetQuotaWaitDurationMs(300); + appConfig.MutablePQConfig()->MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true); + appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true); + appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetFlushIntervalSec(1); + appConfig.MutablePQConfig()->AddClientServiceType()->SetName("data-streams"); + appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER); + appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_X); + appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_Y); + + appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(128); + appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(512); + appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(1_KB); + + auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); + limit->SetMinPeriodSeconds(0); + limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds()); + limit->SetMinStorageMegabytes(0); + limit->SetMaxStorageMegabytes(0); + + limit = appConfig.MutablePQConfig()->AddValidRetentionLimits(); + limit->SetMinPeriodSeconds(0); + limit->SetMaxPeriodSeconds(TDuration::Days(7).Seconds()); + limit->SetMinStorageMegabytes(50_KB); + limit->SetMaxStorageMegabytes(1_MB); + + MeteringFile = MakeHolder<TTempFileHandle>(); + appConfig.MutableMeteringConfig()->SetMeteringFilePath(MeteringFile->Name()); + + if (secure) { + appConfig.MutablePQConfig()->SetRequireCredentialsInNewProtocol(true); + } + KikimrServer = std::make_unique<TKikimr>(std::move(appConfig)); + KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_DEBUG); + + ui16 grpc = KikimrServer->GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driverConfig = TDriverConfig().SetEndpoint(location).SetLog(CreateLogBackend("cerr", TLOG_DEBUG)); + if (secure) { + driverConfig.UseSecureConnection(TString(NYdbSslTestData::CaCrt)); + } else { + driverConfig.SetDatabase("/Root/"); + } + + Driver = std::make_unique<TDriver>(std::move(driverConfig)); + + { + NYdb::NScheme::TSchemeClient schemeClient(*Driver); + NYdb::NScheme::TPermissions permissions("user@builtin", {"ydb.generic.read", "ydb.generic.write"}); + + auto result = schemeClient.ModifyPermissions("/Root", + NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions) + ).ExtractValueSync(); + Cerr << result.GetIssues().ToString() << "\n"; + UNIT_ASSERT(result.IsSuccess()); + } + + TClient client(*(KikimrServer->ServerSettings)); + UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, + client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID}, + {"cloud_id", DEFAULT_CLOUD_ID}, + {"database_id", "root"}})); + + //auto status = client.CreateUser("/Root", "ouruser", "ourUserPassword"); + //UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK); + } + +public: + std::unique_ptr<TKikimr> KikimrServer; + std::unique_ptr<TDriver> Driver; + THolder<TTempFileHandle> MeteringFile; +}; + +using TInsecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchema, false>; +using TSecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>; + +void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) { + TWritableBuf sb(nullptr, request->Size(version)); + TKafkaWritable writable(sb); + request->Write(writable, version); + so.Write(sb.Data(), sb.Size()); + + Print(sb.GetBuffer()); +} + +void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) { + TKafkaVersion version = header->RequestApiVersion; + TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version); + + TKafkaInt32 size = header->Size(headerVersion) + request->Size(version); + Cerr << ">>>>> Size=" << size << Endl; + NKafka::NormalizeNumber(size); + so.Write(&size, sizeof(size)); + + Write(so, header, headerVersion); + Write(so, request, version); + + so.Flush(); +} + +std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestHeader) { + TKafkaInt32 size; + + si.Read(&size, sizeof(size)); + NKafka::NormalizeNumber(size); + + TBuffer buffer; + buffer.Resize(size); + si.Load(buffer.Data(), size); + + TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion); + + TKafkaReadable readable(buffer); + + TResponseHeaderData header; + header.Read(readable, headerVersion); + + UNIT_ASSERT_VALUES_EQUAL(header.CorrelationId, requestHeader->CorrelationId); + + auto response = CreateResponse(requestHeader->RequestApiKey); + response->Read(readable, requestHeader->RequestApiVersion); + + return response; +} + +Y_UNIT_TEST_SUITE(KafkaProtocol) { + Y_UNIT_TEST(ProduceScenario) { + TInsecureTestServer testServer; + + TString topicName = "topic-0-test"; + + { + NYdb::NTopic::TTopicClient pqClient(*testServer.Driver); + auto result = pqClient.CreateTopic(topicName).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + } + + TNetworkAddress addr("localhost", testServer.Port); + TSocket s(addr); + TSocketOutput so(s); + TSocketInput si(s); + + { + Cerr << ">>>>> ApiVersionsRequest\n"; + + TRequestHeaderData header; + header.RequestApiKey = NKafka::EApiKey::API_VERSIONS; + header.RequestApiVersion = 2; + header.CorrelationId = 0; + header.ClientId = "test"; + + TApiVersionsRequestData request; + request.ClientSoftwareName = "SuperTest"; + request.ClientSoftwareVersion = "3100.7.13"; + + Write(so, &header, &request); + + auto response = Read(si, &header); + auto* msg = dynamic_cast<TApiVersionsResponseData*>(response.get()); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 6u); + } + + { + Cerr << ">>>>> SaslHandshakeRequest\n"; + + TRequestHeaderData header; + header.RequestApiKey = NKafka::EApiKey::SASL_HANDSHAKE; + header.RequestApiVersion = 1; + header.CorrelationId = 1; + header.ClientId = "test"; + + TSaslHandshakeRequestData request; + request.Mechanism = "PLAIN"; + + Write(so, &header, &request); + + auto response = Read(si, &header); + auto* msg = dynamic_cast<TSaslHandshakeResponseData*>(response.get()); + + UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u); + UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN"); + } + + { + Cerr << ">>>>> SaslAuthenticateRequestData"; + char authBytes[] = "ignored\0ourUser@/Root\0ourUserPassword"; + + TRequestHeaderData header; + header.RequestApiKey = NKafka::EApiKey::SASL_AUTHENTICATE; + header.RequestApiVersion = 2; + header.CorrelationId = 2; + header.ClientId = "test"; + + TSaslAuthenticateRequestData request; + request.AuthBytes = TKafkaRawBytes(authBytes, sizeof(authBytes) - 1); + + Write(so, &header, &request); + + auto response = Read(si, &header); + Y_UNUSED(response); + //auto* msg = dynamic_cast<TSaslAuthenticateResponseData*>(response.get()); + + //UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR)); + } + } +}
\ No newline at end of file diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make index 556f5132e7..91266be62c 100644 --- a/ydb/core/kafka_proxy/ut/ya.make +++ b/ydb/core/kafka_proxy/ut/ya.make @@ -1,7 +1,10 @@ UNITTEST_FOR(ydb/core/kafka_proxy) +#SIZE(medium) + SRCS( ut_kafka_functions.cpp + ut_protocol.cpp ut_serialization.cpp metarequest_ut.cpp ) diff --git a/ydb/core/raw_socket/sock_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp index 2354c9f33d..ae2a51d93b 100644 --- a/ydb/core/raw_socket/sock_listener.cpp +++ b/ydb/core/raw_socket/sock_listener.cpp @@ -63,9 +63,12 @@ public: Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId())); Become(&TThis::StateWorking); return; + } else { + LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to listen on " << bindAddress->ToString() << ". Error: " << strerror(-err)); } + } else { + LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to bind " << bindAddress->ToString() << ". Error: " << strerror(-err)); } - LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to listen on " << bindAddress->ToString()); //abort(); PassAway(); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 45b39945e6..c1041f5c0a 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -45,6 +45,7 @@ #include <ydb/core/security/ticket_parser.h> #include <ydb/core/base/user_registry.h> #include <ydb/core/health_check/health_check.h> +#include <ydb/core/kafka_proxy/kafka_listener.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/proxy_service/kqp_proxy_service.h> @@ -909,6 +910,24 @@ namespace Tests { Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx); } + { + IActor* actor = CreatePollerActor(); + TActorId actorId = Runtime->Register(actor, nodeIdx); + Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx); + } + + { + NKafka::TListenerSettings settings; + settings.Port = Settings->AppConfig.GetKafkaProxyConfig().GetListeningPort(); + if (Settings->AppConfig.GetKafkaProxyConfig().HasSslCertificate()) { + settings.SslCertificatePem = Settings->AppConfig.GetKafkaProxyConfig().GetSslCertificate(); + } + + IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig.GetKafkaProxyConfig()); + TActorId actorId = Runtime->Register(actor, nodeIdx); + Runtime->RegisterService(TActorId{}, actorId, nodeIdx); + } + if (Settings->EnableYq) { NFq::NConfig::TConfig protoConfig; protoConfig.SetEnabled(true); |