author      tesseract <tesseract@yandex-team.com>  2023-08-31 17:38:27 +0300
committer   tesseract <tesseract@yandex-team.com>  2023-08-31 18:10:45 +0300
commit      346b2bad3d60db2a9b53006912f01331f1803d4f (patch)
tree        637fceef5b208401698dc8aa67a25059efedaf1d
parent      0a6cd0452320909717ba06b29a39e6af06e004ff (diff)
download    ydb-346b2bad3d60db2a9b53006912f01331f1803d4f.tar.gz
Proxy configuration for serverless DB
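When the new Proxy section of TKafkaProxyConfig is set, the Kafka metadata actor advertises the proxy endpoint as the single broker (node id 1) and reports it as the leader of every partition, so clients of a serverless database connect through the proxy instead of directly to cluster nodes. The change also adds missing break statements to the produce actor's writer-error switch and defers sending produce results until all awaited writes have completed or the request has expired.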
-rw-r--r--   ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp    40
-rw-r--r--   ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp     24
-rw-r--r--   ydb/core/kafka_proxy/ut/metarequest_ut.cpp              20
-rw-r--r--   ydb/core/kafka_proxy/ut/ut_protocol.cpp                121
-rw-r--r--   ydb/core/protos/config.proto                             7
5 files changed, 176 insertions, 36 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 15e8a2a5fd8..2abfc027d86 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -6,6 +6,8 @@ namespace NKafka {
 
 using namespace NKikimr::NGRpcProxy::V1;
 
+static constexpr int ProxyNodeId = 1;
+
 NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
                                           const ui64 correlationId,
                                           const TMetadataRequestData* message) {
@@ -63,26 +65,42 @@ void TKafkaMetadataActor::AddTopicError(
 }
 
 void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
+    bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
+
     topic.ErrorCode = NONE_ERROR;
     topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
 
+    if (withProxy) {
+        auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+        broker.NodeId = ProxyNodeId;
+        broker.Host = Context->Config.GetProxy().GetHostname();
+        broker.Port = Context->Config.GetProxy().GetPort();
+        Response->Brokers.emplace_back(std::move(broker));
+    }
+
     topic.Partitions.reserve(response->Partitions.size());
     for (const auto& part : response->Partitions) {
+        auto nodeId = withProxy ? ProxyNodeId : part.NodeId;
+
         TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
         responsePartition.PartitionIndex = part.PartitionId;
         responsePartition.ErrorCode = NONE_ERROR;
-        responsePartition.LeaderId = part.NodeId;
+        responsePartition.LeaderId = nodeId;
         responsePartition.LeaderEpoch = part.Generation;
-        responsePartition.ReplicaNodes.push_back(part.NodeId);
-        responsePartition.IsrNodes.push_back(part.NodeId);
-        auto ins = AllClusterNodes.insert(part.NodeId);
-        if (ins.second) {
-            auto broker = TMetadataResponseData::TMetadataResponseBroker{};
-            broker.NodeId = part.NodeId;
-            broker.Host = part.Hostname;
-            broker.Port = Context->Config.GetListeningPort();
-            Response->Brokers.emplace_back(std::move(broker));
-        }
+        responsePartition.ReplicaNodes.push_back(nodeId);
+        responsePartition.IsrNodes.push_back(nodeId);
+
+        topic.Partitions.emplace_back(std::move(responsePartition));
+
+        if (!withProxy) {
+            auto ins = AllClusterNodes.insert(part.NodeId);
+            if (ins.second) {
+                auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+                broker.NodeId = part.NodeId;
+                broker.Host = part.Hostname;
+                broker.Port = Context->Config.GetListeningPort();
+                Response->Brokers.emplace_back(std::move(broker));
+            }
+        }
     }
 }
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 69f87290f44..5ab1b9fa937 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -356,8 +356,10 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
             switch (writer.first) {
                 case NOT_FOUND:
                     result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+                    break;
                 case UNAUTHORIZED:
                     result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
+                    break;
                 default:
                     result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
             }
@@ -451,14 +453,19 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
     // We send the results in the order of receipt of the request
     while (!PendingRequests.empty()) {
         auto pendingRequest = PendingRequests.front();
-        auto* request = pendingRequest->Request->Get()->Request;
-        auto correlationId = pendingRequest->Request->Get()->CorrelationId;
-        EKafkaErrors metricsErrorCode = EKafkaErrors::NONE_ERROR;
 
         // We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
         bool expired = expireTime > pendingRequest->StartTime;
 
-        KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired);
+        if (!expired && !pendingRequest->WaitResultCookies.empty()) {
+            return;
+        }
+
+        auto* request = pendingRequest->Request->Get()->Request;
+        auto correlationId = pendingRequest->Request->Get()->CorrelationId;
+        EKafkaErrors metricsErrorCode = EKafkaErrors::NONE_ERROR;
+
+        KAFKA_LOG_D("Send result for correlation=" << correlationId << ". Expired=" << expired);
 
         const auto topicsCount = request->TopicData.size();
         auto response = std::make_shared<TProduceResponseData>();
@@ -479,13 +486,12 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
             const auto& result = pendingRequest->Results[position++];
             size_t recordsCount = partitionData.Records.has_value() ? partitionData.Records->Records.size() : 0;
             partitionResponse.Index = partitionData.Index;
-            if (expired || pendingRequest->WaitResultCookies.empty()) {
-                SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
-            } else if (EKafkaErrors::NONE_ERROR != result.ErrorCode ) {
-                KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage);
+            if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
+                KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage << ", #01");
                 partitionResponse.ErrorCode = result.ErrorCode;
                 metricsErrorCode = result.ErrorCode;
                 partitionResponse.ErrorMessage = result.ErrorMessage;
+                SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
             } else {
                 auto* msg = result.Value->Get();
@@ -500,7 +506,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
                     partitionResponse.BaseOffset = lastResult.GetSeqNo();
                 }
             } else {
-                KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason);
+                KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason << ", #02");
                 SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
                 partitionResponse.ErrorCode = Convert(msg->GetError().Code);
                 metricsErrorCode = Convert(msg->GetError().Code);
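The two break statements added in ProcessRequest above fix a switch fall-through: previously a NOT_FOUND result fell into the UNAUTHORIZED and default branches, so every failed writer was reported as UNKNOWN_SERVER_ERROR. A minimal self-contained illustration of the bug, with hypothetical names rather than the actual codebase types:

    #include <iostream>

    enum class EWriterState { NotFound, Unauthorized, Other };

    // Maps a writer state to a Kafka-style error name. Without the two break
    // statements, control falls through and the default assignment wins for
    // every state.
    const char* ErrorFor(EWriterState state) {
        const char* error = "NONE";
        switch (state) {
            case EWriterState::NotFound:
                error = "UNKNOWN_TOPIC_OR_PARTITION";
                break; // removing this line makes NotFound fall into Unauthorized
            case EWriterState::Unauthorized:
                error = "TOPIC_AUTHORIZATION_FAILED";
                break; // removing this line makes Unauthorized fall into default
            default:
                error = "UNKNOWN_SERVER_ERROR";
        }
        return error;
    }

    int main() {
        // Prints UNKNOWN_TOPIC_OR_PARTITION; with the fall-through bug the
        // equivalent code printed UNKNOWN_SERVER_ERROR for every state.
        std::cout << ErrorFor(EWriterState::NotFound) << std::endl;
    }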
diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
index 0f54284e8c9..9d3e7a70716 100644
--- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
+++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
@@ -17,8 +17,12 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
         return res;
     }
 
-    auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) {
+    auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics, const TString& proxyHost = "") {
         NKikimrConfig::TKafkaProxyConfig Config;
+        if (proxyHost) {
+            Config.MutableProxy()->SetHostname(proxyHost);
+            Config.MutableProxy()->SetPort(9097);
+        }
 
         auto* runtime = server.CleverServer->GetRuntime();
         auto request = GetMetadataRequest(topics);
@@ -84,6 +88,20 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
         event = GetEvent(server, edgeId, {});
         response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
         UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 0);
+
+        event = GetEvent(server, edgeId, {topicPath}, "proxy-host");
+        response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+        UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].NodeId, 1);
+        UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Host, "proxy-host");
+        UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, 9097);
+
+        for(auto& t : response->Topics) {
+            for(auto& p : t.Partitions) {
+                UNIT_ASSERT_VALUES_EQUAL(p.LeaderId, 1);
+            }
+        }
     }
 };
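The test above exercises the new setting through the generated protobuf API. A minimal sketch of enabling proxy mode the same way; MutableProxy, SetHostname and SetPort are the standard accessors generated from the TProxy message added in this commit, and the host and port values are arbitrary examples:

    #include <ydb/core/protos/config.pb.h>

    NKikimrConfig::TKafkaProxyConfig MakeProxyConfig() {
        NKikimrConfig::TKafkaProxyConfig config;
        // Hostname must be non-empty, otherwise the metadata actor keeps
        // advertising the real cluster nodes.
        config.MutableProxy()->SetHostname("proxy-host");
        config.MutableProxy()->SetPort(9097); // an omitted Port defaults to 9092
        return config;
    }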
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 280bc1cb287..4cb6e179972 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -237,23 +237,24 @@ void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent
     UNIT_ASSERT_C(false, "Field " << field << " not found in message meta");
 }
 
-TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
-    TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
+std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
+    std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
     while (true) {
         auto event = reader->GetEvent(true);
         if (!event)
-            return result;
+            break;
         if (auto dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
-            result = *dataEvent;
+            result.push_back(*dataEvent);
             break;
         } else if (auto* lockEv = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
             lockEv->Confirm();
         } else if (auto* releaseEv = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
             releaseEv->Confirm();
         } else if (auto* closeSessionEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) {
-            return result;
+            break;
         }
     }
+    Cerr << ">>>>> Received messages " << result.size() << Endl;
     return result;
 }
@@ -317,6 +318,12 @@ public:
     }
 
     TProduceResponseData::TPtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
+        std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
+        msgs.emplace_back(partition, batch);
+        return Produce(topicName, msgs);
+    }
+
+    TProduceResponseData::TPtr Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) {
         Cerr << ">>>>> TProduceRequestData\n";
 
         TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
@@ -324,9 +331,11 @@ public:
         TProduceRequestData request;
         request.TopicData.resize(1);
         request.TopicData[0].Name = topicName;
-        request.TopicData[0].PartitionData.resize(1);
-        request.TopicData[0].PartitionData[0].Index = partition;
-        request.TopicData[0].PartitionData[0].Records = batch;
+        request.TopicData[0].PartitionData.resize(msgs.size());
+        for(size_t i = 0 ; i < msgs.size(); ++i) {
+            request.TopicData[0].PartitionData[i].Index = msgs[i].first;
+            request.TopicData[0].PartitionData[i].Records = msgs[i].second;
+        }
 
         return WriteAndRead<TProduceResponseData>(header, request);
     }
@@ -391,7 +400,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
         auto result =
             pqClient
                 .CreateTopic(topicName,
-                             NYdb::NTopic::TCreateTopicSettings().BeginAddConsumer("consumer-0").EndAddConsumer())
+                             NYdb::NTopic::TCreateTopicSettings()
+                                 .PartitioningSettings(10, 100)
+                                 .BeginAddConsumer("consumer-0").EndAddConsumer())
                 .ExtractValueSync();
         UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
         UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
@@ -456,10 +467,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
                                      static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
 
             auto m = Read(topicReader);
-            UNIT_ASSERT(m);
+            UNIT_ASSERT_EQUAL(m.size(), 1);
 
-            UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1);
-            auto& m0 = m->GetMessages()[0];
+            UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+            auto& m0 = m[0].GetMessages()[0];
             m0.Commit();
 
             UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value);
@@ -486,14 +497,94 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
                                      static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
 
             auto m = Read(topicReader);
-            UNIT_ASSERT(m);
+            UNIT_ASSERT_EQUAL(m.size(), 1);
 
-            UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1);
-            auto& m0 = m->GetMessages()[0];
+            UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+            auto& m0 = m[0].GetMessages()[0];
             m0.Commit();
         }
 
         {
+            // Check for few records
+
+            TKafkaRecordBatch batch;
+            batch.BaseOffset = 13;
+            batch.BaseSequence = 17;
+            batch.Magic = 2; // Current supported
+            batch.Records.resize(1);
+            batch.Records[0].Key = "record-key-0";
+            batch.Records[0].Value = "record-value-0";
+
+            std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
+            msgs.emplace_back(0, batch);
+            msgs.emplace_back(1, batch);
+
+            auto msg = client.Produce("topic-0-test", msgs);
+
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test");
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+                                     static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].Index, 1);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].ErrorCode,
+                                     static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+            {
+                auto m = Read(topicReader);
+                UNIT_ASSERT_EQUAL(m.size(), 1);
+
+                UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+                m[0].GetMessages()[0].Commit();
+            }
+
+            {
+                auto m = Read(topicReader);
+                UNIT_ASSERT_EQUAL(m.size(), 1);
+
+                UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+                m[0].GetMessages()[0].Commit();
+            }
+        }
+
+        {
+            // Unknown topic
+
+            TKafkaRecordBatch batch;
+            batch.BaseOffset = 7;
+            batch.BaseSequence = 11;
+            batch.Magic = 2; // Current supported
+            batch.Records.resize(1);
+            batch.Records[0].Key = "record-key-1";
+            batch.Records[0].Value = "record-value-1";
+
+            auto msg = client.Produce("topic-0-test-not-exists", 0, batch);
+
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test-not-exists");
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+                                     static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
+        }
+
+        {
+            // Unknown partition
+
+            TKafkaRecordBatch batch;
+            batch.BaseOffset = 7;
+            batch.BaseSequence = 11;
+            batch.Magic = 2; // Current supported
+            batch.Records.resize(1);
+            batch.Records[0].Key = "record-key-1";
+            batch.Records[0].Value = "record-value-1";
+
+            auto msg = client.Produce("topic-0-test", 10000, batch);
+
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test");
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 10000);
+            UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+                                     static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
+        }
+
+        { // Check unknown ApiKey (must be last. close the session)
             // expect no exception
             client.UnknownApiKey();
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 122aa8077e1..fef6ab206a7 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1938,6 +1938,13 @@ message TKafkaProxyConfig {
     optional uint64 MaxMessageSize = 4 [default = 268435456];
     optional uint64 MaxInflightSize = 5 [default = 268435456];
     optional uint64 PacketSize = 6 [default = 1500];
+
+    message TProxy {
+        optional string Hostname = 1;
+        optional int32 Port = 2 [default = 9092];
+    }
+
+    optional TProxy Proxy = 7;
 }
 
 message TAwsCompatibilityConfig {
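A note on activation: the metadata actor enables proxy mode only when the Proxy section is present and its Hostname is non-empty, so declaring an empty section (Port alone defaults to 9092) changes nothing. A minimal sketch of that guard, with a hypothetical helper name:

    #include <ydb/core/protos/config.pb.h>

    // Mirrors the withProxy condition in TKafkaMetadataActor::AddTopicResponse.
    bool IsProxyMode(const NKikimrConfig::TKafkaProxyConfig& config) {
        return config.HasProxy() && !config.GetProxy().GetHostname().Empty();
    }

Because Kafka clients connect to whatever brokers the metadata response advertises, returning only the proxy endpoint under the fixed node id 1 routes all subsequent produce and fetch traffic through the proxy.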