author     tesseract <tesseract@yandex-team.com>    2023-08-14 10:39:00 +0300
committer  tesseract <tesseract@yandex-team.com>    2023-08-14 11:51:29 +0300
commit     0677981702178f1a002820ddcc517389022d197a (patch)
tree       a662b3125f064c5c3c0ce1a494c9c6604aa8cfd9
parent     3e43e9be67fe0ce6bba135c59c9f7054abab3db6 (diff)
download   ydb-0677981702178f1a002820ddcc517389022d197a.tar.gz
Test for Kafka
-rw-r--r--  ydb/core/kafka_proxy/kafka.h                              266
-rw-r--r--  ydb/core/kafka_proxy/kafka_connection.cpp                   2
-rw-r--r--  ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt        1
-rw-r--r--  ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt        1
-rw-r--r--  ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt         1
-rw-r--r--  ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt       1
-rw-r--r--  ydb/core/kafka_proxy/ut/ut_protocol.cpp                   294
-rw-r--r--  ydb/core/kafka_proxy/ut/ya.make                             3
-rw-r--r--  ydb/core/raw_socket/sock_listener.cpp                       5
-rw-r--r--  ydb/core/testlib/test_client.cpp                           19
10 files changed, 458 insertions, 135 deletions
diff --git a/ydb/core/kafka_proxy/kafka.h b/ydb/core/kafka_proxy/kafka.h
index 70c4108211..df14f752d8 100644
--- a/ydb/core/kafka_proxy/kafka.h
+++ b/ydb/core/kafka_proxy/kafka.h
@@ -160,139 +160,139 @@ enum ESizeFormat {
// see https://kafka.apache.org/11/protocol.html#protocol_error_codes
enum EKafkaErrors {
- UNKNOWN_SERVER_ERROR = -1, // The server experienced an unexpected error when processing the request.
- NONE_ERROR = 0,
- OFFSET_OUT_OF_RANGE, // The requested offset is not within the range of offsets maintained by the server.,
- CORRUPT_MESSAGE, // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
- UNKNOWN_TOPIC_OR_PARTITION, // This server does not host this topic-partition.
- INVALID_FETCH_SIZE, // The requested fetch size is invalid.
- LEADER_NOT_AVAILABLE, // There is no leader for this topic-partition as we are in the middle of a leadership election.
- NOT_LEADER_OR_FOLLOWER, // For requests intended only for the leader, this error indicates that the broker is not the current leader.
- // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.
- REQUEST_TIMED_OUT, // The request timed out.
- BROKER_NOT_AVAILABLE, // The broker is not available.
- REPLICA_NOT_AVAILABLE, // The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests
- // intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.
- MESSAGE_TOO_LARGE, // The request included a message larger than the max message size the server will accept.
- STALE_CONTROLLER_EPOCH, // The controller moved to another broker.
- OFFSET_METADATA_TOO_LARGE, // The metadata field of the offset request was too large.
- NETWORK_EXCEPTION, // The server disconnected before a response was received.
- COORDINATOR_LOAD_IN_PROGRESS, // The coordinator is loading and hence can't process requests.
- COORDINATOR_NOT_AVAILABLE, // The coordinator is not available.
- NOT_COORDINATOR, // This is not the correct coordinator.
- INVALID_TOPIC_EXCEPTION, // The request attempted to perform an operation on an invalid topic.
- RECORD_LIST_TOO_LARGE, // The request included message batch larger than the configured segment size on the server.
- NOT_ENOUGH_REPLICAS, // Messages are rejected since there are fewer in-sync replicas than required.
- NOT_ENOUGH_REPLICAS_AFTER_APPEND, // Messages are written to the log, but to fewer in-sync replicas than required.
- INVALID_REQUIRED_ACKS, // Produce request specified an invalid value for required acks.
- ILLEGAL_GENERATION, // Specified group generation id is not valid.
- INCONSISTENT_GROUP_PROTOCOL, // The group member's supported protocols are incompatible with those of existing members
- // or first group member tried to join with empty protocol type or empty protocol list.
- INVALID_GROUP_ID, // The configured groupId is invalid.
- UNKNOWN_MEMBER_ID, // The coordinator is not aware of this member.
- INVALID_SESSION_TIMEOUT, // The session timeout is not within the range allowed by the broker
- // (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).
- REBALANCE_IN_PROGRESS, // The group is rebalancing, so a rejoin is needed.
- INVALID_COMMIT_OFFSET_SIZE, // The committing offset data size is not valid.
- TOPIC_AUTHORIZATION_FAILED, // Topic authorization failed.
- GROUP_AUTHORIZATION_FAILED, // Group authorization failed.
- CLUSTER_AUTHORIZATION_FAILED, // Cluster authorization failed.
- INVALID_TIMESTAMP, // The timestamp of the message is out of acceptable range.
- UNSUPPORTED_SASL_MECHANISM, // The broker does not support the requested SASL mechanism.
- ILLEGAL_SASL_STATE, // Request is not valid given the current SASL state.
- UNSUPPORTED_VERSION, // The version of API is not supported.
- TOPIC_ALREADY_EXISTS, // Topic with this name already exists.
- INVALID_PARTITIONS, // Number of partitions is below 1.
- INVALID_REPLICATION_FACTOR, // Replication factor is below 1 or larger than the number of available brokers.
- INVALID_REPLICA_ASSIGNMENT, // Replica assignment is invalid.
- INVALID_CONFIG, // Configuration is invalid.
- NOT_CONTROLLER, // This is not the correct controller for this cluster.
- INVALID_REQUEST, // This most likely occurs because of a request being malformed by the
- // client library or the message was sent to an incompatible broker. See the broker logs
- // for more details.
- UNSUPPORTED_FOR_MESSAGE_FORMAT, // The message format version on the broker does not support the request.
- POLICY_VIOLATION, // Request parameters do not satisfy the configured policy.
- OUT_OF_ORDER_SEQUENCE_NUMBER, // The broker received an out of order sequence number.
- DUPLICATE_SEQUENCE_NUMBER, // The broker received a duplicate sequence number.
- INVALID_PRODUCER_EPOCH, // Producer attempted to produce with an old epoch.
- INVALID_TXN_STATE, // The producer attempted a transactional operation in an invalid state.
- INVALID_PRODUCER_ID_MAPPING, // The producer attempted to use a producer id which is not currently assigned to
- // its transactional id.
- INVALID_TRANSACTION_TIMEOUT, // The transaction timeout is larger than the maximum value allowed by
- // the broker (as configured by transaction.max.timeout.ms).
- CONCURRENT_TRANSACTIONS, // The producer attempted to update a transaction
- // while another concurrent operation on the same transaction was ongoing.
- TRANSACTION_COORDINATOR_FENCED, // Indicates that the transaction coordinator sending a WriteTxnMarker
- // is no longer the current coordinator for a given producer.
- TRANSACTIONAL_ID_AUTHORIZATION_FAILED, // Transactional Id authorization failed.
- SECURITY_DISABLED, // Security features are disabled.
- OPERATION_NOT_ATTEMPTED, // The broker did not attempt to execute this operation. This may happen for
- // batched RPCs where some operations in the batch failed, causing the broker to respond without
- // trying the rest.
- KAFKA_STORAGE_ERROR, // Disk error when trying to access log file on the disk.
- LOG_DIR_NOT_FOUND, // The user-specified log directory is not found in the broker config.
- SASL_AUTHENTICATION_FAILED, // SASL Authentication failed.
- UNKNOWN_PRODUCER_ID, // This exception is raised by the broker if it could not locate the producer metadata
- // associated with the producerId in question. This could happen if, for instance, the producer's records
- // were deleted because their retention time had elapsed. Once the last records of the producerId are
- // removed, the producer's metadata is removed from the broker, and future appends by the producer will
- // return this exception.
- REASSIGNMENT_IN_PROGRESS, // A partition reassignment is in progress.
- DELEGATION_TOKEN_AUTH_DISABLED, // Delegation Token feature is not enabled.
- DELEGATION_TOKEN_NOT_FOUND, // Delegation Token is not found on server.
- DELEGATION_TOKEN_OWNER_MISMATCH, // Specified Principal is not valid Owner/Renewer.
- DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL
- // channels and on delegation token authenticated channels.
- DELEGATION_TOKEN_AUTHORIZATION_FAILED, // Delegation Token authorization failed.
- DELEGATION_TOKEN_EXPIRED, // Delegation Token is expired.
- INVALID_PRINCIPAL_TYPE, // Supplied principalType is not supported.
- NON_EMPTY_GROUP, // The group is not empty.
- GROUP_ID_NOT_FOUND, // The group id does not exist.
- FETCH_SESSION_ID_NOT_FOUND, // The fetch session ID was not found.
- INVALID_FETCH_SESSION_EPOCH, // The fetch session epoch is invalid.
- LISTENER_NOT_FOUND, // There is no listener on the leader broker that matches the listener on which
- // metadata request was processed.
- TOPIC_DELETION_DISABLED, // Topic deletion is disabled.
- FENCED_LEADER_EPOCH, // The leader epoch in the request is older than the epoch on the broker.
- UNKNOWN_LEADER_EPOCH, // The leader epoch in the request is newer than the epoch on the broker.
- UNSUPPORTED_COMPRESSION_TYPE, // The requesting client does not support the compression type of given partition.
- STALE_BROKER_EPOCH, // Broker epoch has changed.
- OFFSET_NOT_AVAILABLE, // The leader high watermark has not caught up from a recent leader
- // election so the offsets cannot be guaranteed to be monotonically increasing.
- MEMBER_ID_REQUIRED, // The group member needs to have a valid member id before actually entering a consumer group.
- PREFERRED_LEADER_NOT_AVAILABLE, // The preferred leader was not available.
- GROUP_MAX_SIZE_REACHED, // The consumer group has reached its max size.
- FENCED_INSTANCE_ID, // The broker rejected this static consumer since
- // another consumer with the same group.instance.id has registered with a different member.id.
- ELIGIBLE_LEADERS_NOT_AVAILABLE, // Eligible topic partition leaders are not available.
- ELECTION_NOT_NEEDED, // Leader election not needed for topic partition.
- NO_REASSIGNMENT_IN_PROGRESS, // No partition reassignment is in progress.
- GROUP_SUBSCRIBED_TO_TOPIC, // Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.
- INVALID_RECORD, // This record has failed the validation on broker and hence will be rejected.
- UNSTABLE_OFFSET_COMMIT, // There are unstable offsets that need to be cleared.
- THROTTLING_QUOTA_EXCEEDED, // The throttling quota has been exceeded.
- PRODUCER_FENCED, // There is a newer producer with the same transactionalId
- // which fences the current one.
- RESOURCE_NOT_FOUND, // A request illegally referred to a resource that does not exist.
- DUPLICATE_RESOURCE, // A request illegally referred to the same resource twice.
- UNACCEPTABLE_CREDENTIAL, // Requested credential would not meet criteria for acceptability.
- INCONSISTENT_VOTER_SET, // Indicates that the either the sender or recipient of a
- // voter-only request is not one of the expected voters
- INVALID_UPDATE_VERSION, // The given update version was invalid.
- FEATURE_UPDATE_FAILED, // Unable to update finalized features due to an unexpected server error.
- PRINCIPAL_DESERIALIZATION_FAILURE, // Request principal deserialization failed during forwarding.
- // This indicates an internal error on the broker cluster security setup.
- SNAPSHOT_NOT_FOUND, // Requested snapshot was not found
- POSITION_OUT_OF_RANGE, // Requested position is not greater than or equal to zero, and less than the size of the snapshot.
- UNKNOWN_TOPIC_ID, // This server does not host this topic ID.
- DUPLICATE_BROKER_REGISTRATION, // This broker ID is already in use.
- BROKER_ID_NOT_REGISTERED, // The given broker ID was not registered.
- INCONSISTENT_TOPIC_ID, // The log's topic ID did not match the topic ID in the request
- INCONSISTENT_CLUSTER_ID, // The clusterId in the request does not match that found on the server
- TRANSACTIONAL_ID_NOT_FOUND, // The transactionalId could not be found
- FETCH_SESSION_TOPIC_ID_ERROR, // The fetch session encountered inconsistent topic ID usage
- INELIGIBLE_REPLICA, // The new ISR contains at least one ineligible replica.
- NEW_LEADER_ELECTED // The AlterPartition request successfully updated the partition state but the leader has changed.
+ UNKNOWN_SERVER_ERROR = -1, // The server experienced an unexpected error when processing the request.
+ NONE_ERROR = 0,
+    OFFSET_OUT_OF_RANGE = 1, // The requested offset is not within the range of offsets maintained by the server.
+ CORRUPT_MESSAGE = 2, // This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
+ UNKNOWN_TOPIC_OR_PARTITION = 3, // This server does not host this topic-partition.
+ INVALID_FETCH_SIZE = 4, // The requested fetch size is invalid.
+ LEADER_NOT_AVAILABLE = 5, // There is no leader for this topic-partition as we are in the middle of a leadership election.
+ NOT_LEADER_OR_FOLLOWER = 6, // For requests intended only for the leader, this error indicates that the broker is not the current leader.
+ // For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.
+ REQUEST_TIMED_OUT = 7, // The request timed out.
+ BROKER_NOT_AVAILABLE = 8, // The broker is not available.
+ REPLICA_NOT_AVAILABLE = 9, // The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests
+ // intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.
+ MESSAGE_TOO_LARGE = 10, // The request included a message larger than the max message size the server will accept.
+ STALE_CONTROLLER_EPOCH = 11, // The controller moved to another broker.
+ OFFSET_METADATA_TOO_LARGE = 12, // The metadata field of the offset request was too large.
+ NETWORK_EXCEPTION = 13, // The server disconnected before a response was received.
+ COORDINATOR_LOAD_IN_PROGRESS = 14, // The coordinator is loading and hence can't process requests.
+ COORDINATOR_NOT_AVAILABLE = 15, // The coordinator is not available.
+ NOT_COORDINATOR = 16, // This is not the correct coordinator.
+ INVALID_TOPIC_EXCEPTION = 17, // The request attempted to perform an operation on an invalid topic.
+ RECORD_LIST_TOO_LARGE = 18, // The request included message batch larger than the configured segment size on the server.
+ NOT_ENOUGH_REPLICAS = 19, // Messages are rejected since there are fewer in-sync replicas than required.
+ NOT_ENOUGH_REPLICAS_AFTER_APPEND = 20, // Messages are written to the log, but to fewer in-sync replicas than required.
+ INVALID_REQUIRED_ACKS = 21, // Produce request specified an invalid value for required acks.
+ ILLEGAL_GENERATION = 22, // Specified group generation id is not valid.
+ INCONSISTENT_GROUP_PROTOCOL = 23, // The group member's supported protocols are incompatible with those of existing members
+ // or first group member tried to join with empty protocol type or empty protocol list.
+ INVALID_GROUP_ID = 24, // The configured groupId is invalid.
+ UNKNOWN_MEMBER_ID = 25, // The coordinator is not aware of this member.
+ INVALID_SESSION_TIMEOUT = 26, // The session timeout is not within the range allowed by the broker
+ // (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).
+ REBALANCE_IN_PROGRESS = 27, // The group is rebalancing, so a rejoin is needed.
+ INVALID_COMMIT_OFFSET_SIZE = 28, // The committing offset data size is not valid.
+ TOPIC_AUTHORIZATION_FAILED = 29, // Topic authorization failed.
+ GROUP_AUTHORIZATION_FAILED = 30, // Group authorization failed.
+ CLUSTER_AUTHORIZATION_FAILED = 31, // Cluster authorization failed.
+ INVALID_TIMESTAMP = 32, // The timestamp of the message is out of acceptable range.
+ UNSUPPORTED_SASL_MECHANISM = 33, // The broker does not support the requested SASL mechanism.
+ ILLEGAL_SASL_STATE = 34, // Request is not valid given the current SASL state.
+ UNSUPPORTED_VERSION = 35, // The version of API is not supported.
+ TOPIC_ALREADY_EXISTS = 36, // Topic with this name already exists.
+ INVALID_PARTITIONS = 37, // Number of partitions is below 1.
+ INVALID_REPLICATION_FACTOR = 38, // Replication factor is below 1 or larger than the number of available brokers.
+ INVALID_REPLICA_ASSIGNMENT = 39, // Replica assignment is invalid.
+ INVALID_CONFIG = 40, // Configuration is invalid.
+ NOT_CONTROLLER = 41, // This is not the correct controller for this cluster.
+ INVALID_REQUEST = 42, // This most likely occurs because of a request being malformed by the
+ // client library or the message was sent to an incompatible broker. See the broker logs
+ // for more details.
+ UNSUPPORTED_FOR_MESSAGE_FORMAT = 43, // The message format version on the broker does not support the request.
+ POLICY_VIOLATION = 44, // Request parameters do not satisfy the configured policy.
+ OUT_OF_ORDER_SEQUENCE_NUMBER = 45, // The broker received an out of order sequence number.
+ DUPLICATE_SEQUENCE_NUMBER = 46, // The broker received a duplicate sequence number.
+ INVALID_PRODUCER_EPOCH = 47, // Producer attempted to produce with an old epoch.
+ INVALID_TXN_STATE = 48, // The producer attempted a transactional operation in an invalid state.
+ INVALID_PRODUCER_ID_MAPPING = 49, // The producer attempted to use a producer id which is not currently assigned to
+ // its transactional id.
+ INVALID_TRANSACTION_TIMEOUT = 50, // The transaction timeout is larger than the maximum value allowed by
+ // the broker (as configured by transaction.max.timeout.ms).
+ CONCURRENT_TRANSACTIONS = 51, // The producer attempted to update a transaction
+ // while another concurrent operation on the same transaction was ongoing.
+ TRANSACTION_COORDINATOR_FENCED = 52, // Indicates that the transaction coordinator sending a WriteTxnMarker
+ // is no longer the current coordinator for a given producer.
+ TRANSACTIONAL_ID_AUTHORIZATION_FAILED = 53, // Transactional Id authorization failed.
+ SECURITY_DISABLED = 54, // Security features are disabled.
+ OPERATION_NOT_ATTEMPTED = 55, // The broker did not attempt to execute this operation. This may happen for
+ // batched RPCs where some operations in the batch failed, causing the broker to respond without
+ // trying the rest.
+ KAFKA_STORAGE_ERROR = 56, // Disk error when trying to access log file on the disk.
+ LOG_DIR_NOT_FOUND = 57, // The user-specified log directory is not found in the broker config.
+ SASL_AUTHENTICATION_FAILED = 58, // SASL Authentication failed.
+ UNKNOWN_PRODUCER_ID = 59, // This exception is raised by the broker if it could not locate the producer metadata
+ // associated with the producerId in question. This could happen if, for instance, the producer's records
+ // were deleted because their retention time had elapsed. Once the last records of the producerId are
+ // removed, the producer's metadata is removed from the broker, and future appends by the producer will
+ // return this exception.
+ REASSIGNMENT_IN_PROGRESS = 60, // A partition reassignment is in progress.
+ DELEGATION_TOKEN_AUTH_DISABLED = 61, // Delegation Token feature is not enabled.
+ DELEGATION_TOKEN_NOT_FOUND = 62, // Delegation Token is not found on server.
+ DELEGATION_TOKEN_OWNER_MISMATCH = 63, // Specified Principal is not valid Owner/Renewer.
+ DELEGATION_TOKEN_REQUEST_NOT_ALLOWED = 64, // Delegation Token requests are not allowed on PLAINTEXT/1-way SSL
+ // channels and on delegation token authenticated channels.
+ DELEGATION_TOKEN_AUTHORIZATION_FAILED = 65, // Delegation Token authorization failed.
+ DELEGATION_TOKEN_EXPIRED = 66, // Delegation Token is expired.
+ INVALID_PRINCIPAL_TYPE = 67, // Supplied principalType is not supported.
+ NON_EMPTY_GROUP = 68, // The group is not empty.
+ GROUP_ID_NOT_FOUND = 69, // The group id does not exist.
+ FETCH_SESSION_ID_NOT_FOUND = 70, // The fetch session ID was not found.
+ INVALID_FETCH_SESSION_EPOCH = 71, // The fetch session epoch is invalid.
+ LISTENER_NOT_FOUND = 72, // There is no listener on the leader broker that matches the listener on which
+ // metadata request was processed.
+ TOPIC_DELETION_DISABLED = 73, // Topic deletion is disabled.
+ FENCED_LEADER_EPOCH = 74, // The leader epoch in the request is older than the epoch on the broker.
+ UNKNOWN_LEADER_EPOCH = 75, // The leader epoch in the request is newer than the epoch on the broker.
+ UNSUPPORTED_COMPRESSION_TYPE = 76, // The requesting client does not support the compression type of given partition.
+ STALE_BROKER_EPOCH = 77, // Broker epoch has changed.
+ OFFSET_NOT_AVAILABLE = 78, // The leader high watermark has not caught up from a recent leader
+ // election so the offsets cannot be guaranteed to be monotonically increasing.
+ MEMBER_ID_REQUIRED = 79, // The group member needs to have a valid member id before actually entering a consumer group.
+ PREFERRED_LEADER_NOT_AVAILABLE = 80, // The preferred leader was not available.
+ GROUP_MAX_SIZE_REACHED = 81, // The consumer group has reached its max size.
+ FENCED_INSTANCE_ID = 82, // The broker rejected this static consumer since
+ // another consumer with the same group.instance.id has registered with a different member.id.
+ ELIGIBLE_LEADERS_NOT_AVAILABLE = 83, // Eligible topic partition leaders are not available.
+ ELECTION_NOT_NEEDED = 84, // Leader election not needed for topic partition.
+ NO_REASSIGNMENT_IN_PROGRESS = 85, // No partition reassignment is in progress.
+ GROUP_SUBSCRIBED_TO_TOPIC = 86, // Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.
+ INVALID_RECORD = 87, // This record has failed the validation on broker and hence will be rejected.
+ UNSTABLE_OFFSET_COMMIT = 88, // There are unstable offsets that need to be cleared.
+ THROTTLING_QUOTA_EXCEEDED = 89, // The throttling quota has been exceeded.
+ PRODUCER_FENCED = 90, // There is a newer producer with the same transactionalId
+ // which fences the current one.
+ RESOURCE_NOT_FOUND = 91, // A request illegally referred to a resource that does not exist.
+ DUPLICATE_RESOURCE = 92, // A request illegally referred to the same resource twice.
+ UNACCEPTABLE_CREDENTIAL = 93, // Requested credential would not meet criteria for acceptability.
+    INCONSISTENT_VOTER_SET = 94, // Indicates that either the sender or recipient of a
+    // voter-only request is not one of the expected voters.
+ INVALID_UPDATE_VERSION = 95, // The given update version was invalid.
+ FEATURE_UPDATE_FAILED = 96, // Unable to update finalized features due to an unexpected server error.
+ PRINCIPAL_DESERIALIZATION_FAILURE = 97, // Request principal deserialization failed during forwarding.
+ // This indicates an internal error on the broker cluster security setup.
+ SNAPSHOT_NOT_FOUND = 98, // Requested snapshot was not found
+ POSITION_OUT_OF_RANGE = 99, // Requested position is not greater than or equal to zero, and less than the size of the snapshot.
+ UNKNOWN_TOPIC_ID = 100, // This server does not host this topic ID.
+ DUPLICATE_BROKER_REGISTRATION = 101, // This broker ID is already in use.
+ BROKER_ID_NOT_REGISTERED = 102, // The given broker ID was not registered.
+ INCONSISTENT_TOPIC_ID = 103, // The log's topic ID did not match the topic ID in the request
+ INCONSISTENT_CLUSTER_ID = 104, // The clusterId in the request does not match that found on the server
+ TRANSACTIONAL_ID_NOT_FOUND = 105, // The transactionalId could not be found
+ FETCH_SESSION_TOPIC_ID_ERROR = 106, // The fetch session encountered inconsistent topic ID usage
+ INELIGIBLE_REPLICA = 107, // The new ISR contains at least one ineligible replica.
+ NEW_LEADER_ELECTED = 108 // The AlterPartition request successfully updated the partition state but the leader has changed.
};
template <typename T>
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index 66edeed807..e5aeb2f5ba 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -47,7 +47,7 @@ public:
using TBase = TActorBootstrapped<TKafkaConnection>;
struct Msg {
- using TPtr=std::shared_ptr<Msg>;
+ using TPtr = std::shared_ptr<Msg>;
size_t Size = 0;
TKafkaInt32 ExpectedSize = 0;
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
index 47bf8e6799..b8df216cda 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.darwin-x86_64.txt
@@ -33,6 +33,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE
)
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
index 63df13dbe7..a4ad9bc7be 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-aarch64.txt
@@ -36,6 +36,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE
)
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
index 70e7385f09..a7c8cc6172 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.linux-x86_64.txt
@@ -37,6 +37,7 @@ target_link_options(ydb-core-kafka_proxy-ut PRIVATE
)
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
diff --git a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
index 38f370cfe1..38fe365d4f 100644
--- a/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kafka_proxy/ut/CMakeLists.windows-x86_64.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-kafka_proxy-ut PUBLIC
)
target_sources(ydb-core-kafka_proxy-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_kafka_functions.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_protocol.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/ut_serialization.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
)
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
new file mode 100644
index 0000000000..8b6df0c716
--- /dev/null
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -0,0 +1,294 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include "../kafka_messages.h"
+
+#include <ydb/services/ydb/ydb_common_ut.h>
+#include <ydb/services/ydb/ydb_keys_ut.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
+#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.grpc.pb.h>
+
+#include <library/cpp/json/json_reader.h>
+#include <library/cpp/digest/md5/md5.h>
+
+#include <util/system/tempfile.h>
+
+#include <random>
+
+
+using namespace NKafka;
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin";
+static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin";
+static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin";
+
+static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud";
+static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder";
+
+struct WithSslAndAuth : TKikimrTestSettings {
+ static constexpr bool SSL = true;
+ static constexpr bool AUTH = true;
+};
+using TKikimrWithGrpcAndRootSchemaSecure = NYdb::TBasicKikimrWithGrpcAndRootSchema<WithSslAndAuth>;
+
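+// Debug helpers: Hex0 maps a 4-bit value to its hex digit, Print dumps a serialized packet as hex bytes to stderr.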
+char Hex0(const unsigned char c) {
+ return c < 10 ? '0' + c : 'A' + c - 10;
+}
+
+void Print(const TBuffer& buffer) {
+ TStringBuilder sb;
+ for (size_t i = 0; i < buffer.Size(); ++i) {
+ char c = buffer.Data()[i];
+ if (i > 0) {
+ sb << ", ";
+ }
+ sb << "0x" << Hex0((c & 0xF0) >> 4) << Hex0(c & 0x0F);
+ }
+ Cerr << ">>>>> Packet sent: " << sb << Endl;
+}
+
+
+template<class TKikimr, bool secure>
+class TTestServer {
+public:
+ TIpPort Port;
+
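+    // Boots a single-node Kikimr test server with the Kafka proxy enabled on Port and topics configured as first-class citizens.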
+ TTestServer() {
+ TPortManager portManager;
+ Port = 9090; // portManager.GetTcpPort();
+
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutablePQConfig()->SetTopicsAreFirstClassCitizen(true);
+ appConfig.MutablePQConfig()->SetEnabled(true);
+ // NOTE(shmel1k@): KIKIMR-14221
+ appConfig.MutablePQConfig()->SetCheckACL(false);
+ appConfig.MutablePQConfig()->SetRequireCredentialsInNewProtocol(false);
+
+ auto cst = appConfig.MutablePQConfig()->AddClientServiceType();
+ cst->SetName("data-transfer");
+ cst = appConfig.MutablePQConfig()->AddClientServiceType();
+ cst->SetName("data-transfer2");
+
+ appConfig.MutableKafkaProxyConfig()->SetEnableKafkaProxy(true);
+ appConfig.MutableKafkaProxyConfig()->SetListeningPort(Port);
+ appConfig.MutableKafkaProxyConfig()->SetMaxMessageSize(1024);
+ appConfig.MutableKafkaProxyConfig()->SetMaxInflightSize(2048);
+
+
+ appConfig.MutablePQConfig()->MutableQuotingConfig()->SetEnableQuoting(true);
+ appConfig.MutablePQConfig()->MutableQuotingConfig()->SetQuotaWaitDurationMs(300);
+ appConfig.MutablePQConfig()->MutableQuotingConfig()->SetPartitionReadQuotaIsTwiceWriteQuota(true);
+ appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetEnabled(true);
+ appConfig.MutablePQConfig()->MutableBillingMeteringConfig()->SetFlushIntervalSec(1);
+ appConfig.MutablePQConfig()->AddClientServiceType()->SetName("data-streams");
+ appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER);
+ appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_X);
+ appConfig.MutablePQConfig()->AddNonChargeableUser(NON_CHARGEABLE_USER_Y);
+
+ appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(128);
+ appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(512);
+ appConfig.MutablePQConfig()->AddValidWriteSpeedLimitsKbPerSec(1_KB);
+
+ auto limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
+ limit->SetMinPeriodSeconds(0);
+ limit->SetMaxPeriodSeconds(TDuration::Days(1).Seconds());
+ limit->SetMinStorageMegabytes(0);
+ limit->SetMaxStorageMegabytes(0);
+
+ limit = appConfig.MutablePQConfig()->AddValidRetentionLimits();
+ limit->SetMinPeriodSeconds(0);
+ limit->SetMaxPeriodSeconds(TDuration::Days(7).Seconds());
+ limit->SetMinStorageMegabytes(50_KB);
+ limit->SetMaxStorageMegabytes(1_MB);
+
+ MeteringFile = MakeHolder<TTempFileHandle>();
+ appConfig.MutableMeteringConfig()->SetMeteringFilePath(MeteringFile->Name());
+
+ if (secure) {
+ appConfig.MutablePQConfig()->SetRequireCredentialsInNewProtocol(true);
+ }
+ KikimrServer = std::make_unique<TKikimr>(std::move(appConfig));
+ KikimrServer->GetRuntime()->SetLogPriority(NKikimrServices::KAFKA_PROXY, NActors::NLog::PRI_DEBUG);
+
+ ui16 grpc = KikimrServer->GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driverConfig = TDriverConfig().SetEndpoint(location).SetLog(CreateLogBackend("cerr", TLOG_DEBUG));
+ if (secure) {
+ driverConfig.UseSecureConnection(TString(NYdbSslTestData::CaCrt));
+ } else {
+ driverConfig.SetDatabase("/Root/");
+ }
+
+ Driver = std::make_unique<TDriver>(std::move(driverConfig));
+
+ {
+ NYdb::NScheme::TSchemeClient schemeClient(*Driver);
+ NYdb::NScheme::TPermissions permissions("user@builtin", {"ydb.generic.read", "ydb.generic.write"});
+
+ auto result = schemeClient.ModifyPermissions("/Root",
+ NYdb::NScheme::TModifyPermissionsSettings().AddGrantPermissions(permissions)
+ ).ExtractValueSync();
+ Cerr << result.GetIssues().ToString() << "\n";
+ UNIT_ASSERT(result.IsSuccess());
+ }
+
+ TClient client(*(KikimrServer->ServerSettings));
+ UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
+ client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID},
+ {"cloud_id", DEFAULT_CLOUD_ID},
+ {"database_id", "root"}}));
+
+ //auto status = client.CreateUser("/Root", "ouruser", "ourUserPassword");
+ //UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::MSTATUS_OK);
+ }
+
+public:
+ std::unique_ptr<TKikimr> KikimrServer;
+ std::unique_ptr<TDriver> Driver;
+ THolder<TTempFileHandle> MeteringFile;
+};
+
+using TInsecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchema, false>;
+using TSecureTestServer = TTestServer<TKikimrWithGrpcAndRootSchemaSecure, true>;
+
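+// Serialize a single Kafka message for the given protocol version and write the raw bytes to the socket.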
+void Write(TSocketOutput& so, TApiMessage* request, TKafkaVersion version) {
+ TWritableBuf sb(nullptr, request->Size(version));
+ TKafkaWritable writable(sb);
+ request->Write(writable, version);
+ so.Write(sb.Data(), sb.Size());
+
+ Print(sb.GetBuffer());
+}
+
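+// Frame a complete request: a 4-byte size prefix (passed through NKafka::NormalizeNumber), then the request header, then the message body.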
+void Write(TSocketOutput& so, TRequestHeaderData* header, TApiMessage* request) {
+ TKafkaVersion version = header->RequestApiVersion;
+ TKafkaVersion headerVersion = RequestHeaderVersion(request->ApiKey(), version);
+
+ TKafkaInt32 size = header->Size(headerVersion) + request->Size(version);
+ Cerr << ">>>>> Size=" << size << Endl;
+ NKafka::NormalizeNumber(size);
+ so.Write(&size, sizeof(size));
+
+ Write(so, header, headerVersion);
+ Write(so, request, version);
+
+ so.Flush();
+}
+
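+// Read one response: the size prefix, the response header (whose correlation id must match the request), then the body for the request's API key.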
+std::unique_ptr<TApiMessage> Read(TSocketInput& si, TRequestHeaderData* requestHeader) {
+ TKafkaInt32 size;
+
+ si.Read(&size, sizeof(size));
+ NKafka::NormalizeNumber(size);
+
+ TBuffer buffer;
+ buffer.Resize(size);
+ si.Load(buffer.Data(), size);
+
+ TKafkaVersion headerVersion = ResponseHeaderVersion(requestHeader->RequestApiKey, requestHeader->RequestApiVersion);
+
+ TKafkaReadable readable(buffer);
+
+ TResponseHeaderData header;
+ header.Read(readable, headerVersion);
+
+ UNIT_ASSERT_VALUES_EQUAL(header.CorrelationId, requestHeader->CorrelationId);
+
+ auto response = CreateResponse(requestHeader->RequestApiKey);
+ response->Read(readable, requestHeader->RequestApiVersion);
+
+ return response;
+}
+
+Y_UNIT_TEST_SUITE(KafkaProtocol) {
+ Y_UNIT_TEST(ProduceScenario) {
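+        // Walks the initial Kafka handshake over a raw socket: create a topic, then issue ApiVersions, SaslHandshake and SaslAuthenticate requests and check the responses.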
+ TInsecureTestServer testServer;
+
+ TString topicName = "topic-0-test";
+
+ {
+ NYdb::NTopic::TTopicClient pqClient(*testServer.Driver);
+ auto result = pqClient.CreateTopic(topicName).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ }
+
+ TNetworkAddress addr("localhost", testServer.Port);
+ TSocket s(addr);
+ TSocketOutput so(s);
+ TSocketInput si(s);
+
+ {
+ Cerr << ">>>>> ApiVersionsRequest\n";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = NKafka::EApiKey::API_VERSIONS;
+ header.RequestApiVersion = 2;
+ header.CorrelationId = 0;
+ header.ClientId = "test";
+
+ TApiVersionsRequestData request;
+ request.ClientSoftwareName = "SuperTest";
+ request.ClientSoftwareVersion = "3100.7.13";
+
+ Write(so, &header, &request);
+
+ auto response = Read(si, &header);
+ auto* msg = dynamic_cast<TApiVersionsResponseData*>(response.get());
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->ApiKeys.size(), 6u);
+ }
+
+ {
+ Cerr << ">>>>> SaslHandshakeRequest\n";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = NKafka::EApiKey::SASL_HANDSHAKE;
+ header.RequestApiVersion = 1;
+ header.CorrelationId = 1;
+ header.ClientId = "test";
+
+ TSaslHandshakeRequestData request;
+ request.Mechanism = "PLAIN";
+
+ Write(so, &header, &request);
+
+ auto response = Read(si, &header);
+ auto* msg = dynamic_cast<TSaslHandshakeResponseData*>(response.get());
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Mechanisms.size(), 1u);
+ UNIT_ASSERT_VALUES_EQUAL(*msg->Mechanisms[0], "PLAIN");
+ }
+
+ {
+            Cerr << ">>>>> SaslAuthenticateRequestData\n";
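+            // SASL PLAIN payload: authorization id, user name and password separated by NUL bytes (RFC 4616); the user name here carries the database path.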
+ char authBytes[] = "ignored\0ourUser@/Root\0ourUserPassword";
+
+ TRequestHeaderData header;
+ header.RequestApiKey = NKafka::EApiKey::SASL_AUTHENTICATE;
+ header.RequestApiVersion = 2;
+ header.CorrelationId = 2;
+ header.ClientId = "test";
+
+ TSaslAuthenticateRequestData request;
+ request.AuthBytes = TKafkaRawBytes(authBytes, sizeof(authBytes) - 1);
+
+ Write(so, &header, &request);
+
+ auto response = Read(si, &header);
+ Y_UNUSED(response);
+ //auto* msg = dynamic_cast<TSaslAuthenticateResponseData*>(response.get());
+
+ //UNIT_ASSERT_VALUES_EQUAL(msg->ErrorCode, static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ }
+ }
+}
\ No newline at end of file
diff --git a/ydb/core/kafka_proxy/ut/ya.make b/ydb/core/kafka_proxy/ut/ya.make
index 556f5132e7..91266be62c 100644
--- a/ydb/core/kafka_proxy/ut/ya.make
+++ b/ydb/core/kafka_proxy/ut/ya.make
@@ -1,7 +1,10 @@
UNITTEST_FOR(ydb/core/kafka_proxy)
+#SIZE(medium)
+
SRCS(
ut_kafka_functions.cpp
+ ut_protocol.cpp
ut_serialization.cpp
metarequest_ut.cpp
)
diff --git a/ydb/core/raw_socket/sock_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp
index 2354c9f33d..ae2a51d93b 100644
--- a/ydb/core/raw_socket/sock_listener.cpp
+++ b/ydb/core/raw_socket/sock_listener.cpp
@@ -63,9 +63,12 @@ public:
Send(Poller, new NActors::TEvPollerRegister(Socket, SelfId(), SelfId()));
Become(&TThis::StateWorking);
return;
+ } else {
+ LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to listen on " << bindAddress->ToString() << ". Error: " << strerror(-err));
}
+ } else {
+ LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to bind " << bindAddress->ToString() << ". Error: " << strerror(-err));
}
- LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to listen on " << bindAddress->ToString());
//abort();
PassAway();
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 45b39945e6..c1041f5c0a 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -45,6 +45,7 @@
#include <ydb/core/security/ticket_parser.h>
#include <ydb/core/base/user_registry.h>
#include <ydb/core/health_check/health_check.h>
+#include <ydb/core/kafka_proxy/kafka_listener.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/proxy_service/kqp_proxy_service.h>
@@ -909,6 +910,24 @@ namespace Tests {
Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx);
}
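+        // Register a socket poller actor; the Kafka listener registered below uses it for socket readiness notifications.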
+ {
+ IActor* actor = CreatePollerActor();
+ TActorId actorId = Runtime->Register(actor, nodeIdx);
+ Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx);
+ }
+
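+        // Start the Kafka proxy listener on the port from KafkaProxyConfig, passing the SSL certificate through when one is configured.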
+ {
+ NKafka::TListenerSettings settings;
+ settings.Port = Settings->AppConfig.GetKafkaProxyConfig().GetListeningPort();
+ if (Settings->AppConfig.GetKafkaProxyConfig().HasSslCertificate()) {
+ settings.SslCertificatePem = Settings->AppConfig.GetKafkaProxyConfig().GetSslCertificate();
+ }
+
+ IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig.GetKafkaProxyConfig());
+ TActorId actorId = Runtime->Register(actor, nodeIdx);
+ Runtime->RegisterService(TActorId{}, actorId, nodeIdx);
+ }
+
if (Settings->EnableYq) {
NFq::NConfig::TConfig protoConfig;
protoConfig.SetEnabled(true);