aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-31 17:38:27 +0300
committertesseract <tesseract@yandex-team.com>2023-08-31 18:10:45 +0300
commit346b2bad3d60db2a9b53006912f01331f1803d4f (patch)
tree637fceef5b208401698dc8aa67a25059efedaf1d
parent0a6cd0452320909717ba06b29a39e6af06e004ff (diff)
downloadydb-346b2bad3d60db2a9b53006912f01331f1803d4f.tar.gz
Proxy configuration for serverless DB
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp40
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp24
-rw-r--r--ydb/core/kafka_proxy/ut/metarequest_ut.cpp20
-rw-r--r--ydb/core/kafka_proxy/ut/ut_protocol.cpp121
-rw-r--r--ydb/core/protos/config.proto7
5 files changed, 176 insertions, 36 deletions
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 15e8a2a5fd8..2abfc027d86 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -6,6 +6,8 @@
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;
+static constexpr int ProxyNodeId = 1;
+
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
const ui64 correlationId,
const TMetadataRequestData* message) {
@@ -63,26 +65,42 @@ void TKafkaMetadataActor::AddTopicError(
}
void TKafkaMetadataActor::AddTopicResponse(TMetadataResponseData::TMetadataResponseTopic& topic, TEvLocationResponse* response) {
+ bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
+
topic.ErrorCode = NONE_ERROR;
topic.TopicId = TKafkaUuid(response->SchemeShardId, response->PathId);
+ if (withProxy) {
+ auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+ broker.NodeId = ProxyNodeId;
+ broker.Host = Context->Config.GetProxy().GetHostname();
+ broker.Port = Context->Config.GetProxy().GetPort();
+ Response->Brokers.emplace_back(std::move(broker));
+ }
+
topic.Partitions.reserve(response->Partitions.size());
for (const auto& part : response->Partitions) {
+ auto nodeId = withProxy ? ProxyNodeId : part.NodeId;
+
TMetadataResponseData::TMetadataResponseTopic::PartitionsMeta::ItemType responsePartition;
responsePartition.PartitionIndex = part.PartitionId;
responsePartition.ErrorCode = NONE_ERROR;
- responsePartition.LeaderId = part.NodeId;
+ responsePartition.LeaderId = nodeId;
responsePartition.LeaderEpoch = part.Generation;
- responsePartition.ReplicaNodes.push_back(part.NodeId);
- responsePartition.IsrNodes.push_back(part.NodeId);
- auto ins = AllClusterNodes.insert(part.NodeId);
- if (ins.second) {
- auto broker = TMetadataResponseData::TMetadataResponseBroker{};
- broker.NodeId = part.NodeId;
- broker.Host = part.Hostname;
- broker.Port = Context->Config.GetListeningPort();
- Response->Brokers.emplace_back(std::move(broker));
- }
+ responsePartition.ReplicaNodes.push_back(nodeId);
+ responsePartition.IsrNodes.push_back(nodeId);
+
topic.Partitions.emplace_back(std::move(responsePartition));
+
+ if (!withProxy) {
+ auto ins = AllClusterNodes.insert(part.NodeId);
+ if (ins.second) {
+ auto broker = TMetadataResponseData::TMetadataResponseBroker{};
+ broker.NodeId = part.NodeId;
+ broker.Host = part.Hostname;
+ broker.Port = Context->Config.GetListeningPort();
+ Response->Brokers.emplace_back(std::move(broker));
+ }
+ }
}
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
index 69f87290f44..5ab1b9fa937 100644
--- a/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_produce_actor.cpp
@@ -356,8 +356,10 @@ void TKafkaProduceActor::ProcessRequest(TPendingRequest::TPtr pendingRequest, co
switch (writer.first) {
case NOT_FOUND:
result.ErrorCode = EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION;
+ break;
case UNAUTHORIZED:
result.ErrorCode = EKafkaErrors::TOPIC_AUTHORIZATION_FAILED;
+ break;
default:
result.ErrorCode = EKafkaErrors::UNKNOWN_SERVER_ERROR;
}
@@ -451,14 +453,19 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
// We send the results in the order of receipt of the request
while (!PendingRequests.empty()) {
auto pendingRequest = PendingRequests.front();
- auto* request = pendingRequest->Request->Get()->Request;
- auto correlationId = pendingRequest->Request->Get()->CorrelationId;
- EKafkaErrors metricsErrorCode = EKafkaErrors::NONE_ERROR;
// We send the response by timeout. This is possible, for example, if the event was lost or the PartitionWrite died.
bool expired = expireTime > pendingRequest->StartTime;
- KAFKA_LOG_D("Send result for correlationId " << correlationId << ". Expired=" << expired);
+ if (!expired && !pendingRequest->WaitResultCookies.empty()) {
+ return;
+ }
+
+ auto* request = pendingRequest->Request->Get()->Request;
+ auto correlationId = pendingRequest->Request->Get()->CorrelationId;
+ EKafkaErrors metricsErrorCode = EKafkaErrors::NONE_ERROR;
+
+ KAFKA_LOG_D("Send result for correlation=" << correlationId << ". Expired=" << expired);
const auto topicsCount = request->TopicData.size();
auto response = std::make_shared<TProduceResponseData>();
@@ -479,13 +486,12 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
const auto& result = pendingRequest->Results[position++];
size_t recordsCount = partitionData.Records.has_value() ? partitionData.Records->Records.size() : 0;
partitionResponse.Index = partitionData.Index;
- if (expired || pendingRequest->WaitResultCookies.empty()) {
- SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
- } else if (EKafkaErrors::NONE_ERROR != result.ErrorCode ) {
- KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage);
+ if (EKafkaErrors::NONE_ERROR != result.ErrorCode) {
+ KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(result.ErrorCode) << ", ErrorMessage=" << result.ErrorMessage << ", #01");
partitionResponse.ErrorCode = result.ErrorCode;
metricsErrorCode = result.ErrorCode;
partitionResponse.ErrorMessage = result.ErrorMessage;
+
SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
} else {
auto* msg = result.Value->Get();
@@ -500,7 +506,7 @@ void TKafkaProduceActor::SendResults(const TActorContext& ctx) {
partitionResponse.BaseOffset = lastResult.GetSeqNo();
}
} else {
- KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason);
+ KAFKA_LOG_T("Partition result with error: ErrorCode=" << static_cast<int>(Convert(msg->GetError().Code)) << ", ErrorMessage=" << msg->GetError().Reason << ", #02");
SendMetrics(TStringBuilder() << topicData.Name, recordsCount, "failed_messages", ctx);
partitionResponse.ErrorCode = Convert(msg->GetError().Code);
metricsErrorCode = Convert(msg->GetError().Code);
diff --git a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
index 0f54284e8c9..9d3e7a70716 100644
--- a/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
+++ b/ydb/core/kafka_proxy/ut/metarequest_ut.cpp
@@ -17,8 +17,12 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
return res;
}
- auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics) {
+ auto GetEvent(NPersQueue::TTestServer& server, const TActorId& edgeActor, const TVector<TString>& topics, const TString& proxyHost = "") {
NKikimrConfig::TKafkaProxyConfig Config;
+ if (proxyHost) {
+ Config.MutableProxy()->SetHostname(proxyHost);
+ Config.MutableProxy()->SetPort(9097);
+ }
auto* runtime = server.CleverServer->GetRuntime();
auto request = GetMetadataRequest(topics);
@@ -84,6 +88,20 @@ Y_UNIT_TEST_SUITE(TMetadataActorTests) {
event = GetEvent(server, edgeId, {});
response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 0);
+
+ event = GetEvent(server, edgeId, {topicPath}, "proxy-host");
+ response = dynamic_cast<TMetadataResponseData*>(event->Response.get());
+ UNIT_ASSERT_VALUES_EQUAL(response->Topics.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(response->Brokers.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].NodeId, 1);
+ UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Host, "proxy-host");
+ UNIT_ASSERT_VALUES_EQUAL(response->Brokers[0].Port, 9097);
+
+ for(auto& t : response->Topics) {
+ for(auto& p : t.Partitions) {
+ UNIT_ASSERT_VALUES_EQUAL(p.LeaderId, 1);
+ }
+ }
}
};
diff --git a/ydb/core/kafka_proxy/ut/ut_protocol.cpp b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
index 280bc1cb287..4cb6e179972 100644
--- a/ydb/core/kafka_proxy/ut/ut_protocol.cpp
+++ b/ydb/core/kafka_proxy/ut/ut_protocol.cpp
@@ -237,23 +237,24 @@ void AssertMessageMeta(const NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent
UNIT_ASSERT_C(false, "Field " << field << " not found in message meta");
}
-TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
- TMaybe<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
+std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> Read(std::shared_ptr<NYdb::NTopic::IReadSession> reader) {
+ std::vector<NTopic::TReadSessionEvent::TDataReceivedEvent> result;
while (true) {
auto event = reader->GetEvent(true);
if (!event)
- return result;
+ break;
if (auto dataEvent = std::get_if<NTopic::TReadSessionEvent::TDataReceivedEvent>(&*event)) {
- result = *dataEvent;
+ result.push_back(*dataEvent);
break;
} else if (auto* lockEv = std::get_if<NTopic::TReadSessionEvent::TStartPartitionSessionEvent>(&*event)) {
lockEv->Confirm();
} else if (auto* releaseEv = std::get_if<NTopic::TReadSessionEvent::TStopPartitionSessionEvent>(&*event)) {
releaseEv->Confirm();
} else if (auto* closeSessionEvent = std::get_if<NTopic::TSessionClosedEvent>(&*event)) {
- return result;
+ break;
}
}
+ Cerr << ">>>>> Received messages " << result.size() << Endl;
return result;
}
@@ -317,6 +318,12 @@ public:
}
TProduceResponseData::TPtr Produce(const TString& topicName, ui32 partition, const TKafkaRecordBatch& batch) {
+ std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
+ msgs.emplace_back(partition, batch);
+ return Produce(topicName, msgs);
+ }
+
+ TProduceResponseData::TPtr Produce(const TString& topicName, const std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs) {
Cerr << ">>>>> TProduceRequestData\n";
TRequestHeaderData header = Header(NKafka::EApiKey::PRODUCE, 9);
@@ -324,9 +331,11 @@ public:
TProduceRequestData request;
request.TopicData.resize(1);
request.TopicData[0].Name = topicName;
- request.TopicData[0].PartitionData.resize(1);
- request.TopicData[0].PartitionData[0].Index = partition;
- request.TopicData[0].PartitionData[0].Records = batch;
+ request.TopicData[0].PartitionData.resize(msgs.size());
+ for(size_t i = 0 ; i < msgs.size(); ++i) {
+ request.TopicData[0].PartitionData[i].Index = msgs[i].first;
+ request.TopicData[0].PartitionData[i].Records = msgs[i].second;
+ }
return WriteAndRead<TProduceResponseData>(header, request);
}
@@ -391,7 +400,9 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
auto result =
pqClient
.CreateTopic(topicName,
- NYdb::NTopic::TCreateTopicSettings().BeginAddConsumer("consumer-0").EndAddConsumer())
+ NYdb::NTopic::TCreateTopicSettings()
+ .PartitioningSettings(10, 100)
+ .BeginAddConsumer("consumer-0").EndAddConsumer())
.ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
@@ -456,10 +467,10 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto m = Read(topicReader);
- UNIT_ASSERT(m);
+ UNIT_ASSERT_EQUAL(m.size(), 1);
- UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1);
- auto& m0 = m->GetMessages()[0];
+ UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+ auto& m0 = m[0].GetMessages()[0];
m0.Commit();
UNIT_ASSERT_STRINGS_EQUAL(m0.GetData(), value);
@@ -486,14 +497,94 @@ Y_UNIT_TEST_SUITE(KafkaProtocol) {
static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
auto m = Read(topicReader);
- UNIT_ASSERT(m);
+ UNIT_ASSERT_EQUAL(m.size(), 1);
- UNIT_ASSERT_EQUAL(m->GetMessages().size(), 1);
- auto& m0 = m->GetMessages()[0];
+ UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+ auto& m0 = m[0].GetMessages()[0];
m0.Commit();
}
{
+ // Check for few records
+
+ TKafkaRecordBatch batch;
+ batch.BaseOffset = 13;
+ batch.BaseSequence = 17;
+ batch.Magic = 2; // Current supported
+ batch.Records.resize(1);
+ batch.Records[0].Key = "record-key-0";
+ batch.Records[0].Value = "record-value-0";
+
+ std::vector<std::pair<ui32, TKafkaRecordBatch>> msgs;
+ msgs.emplace_back(0, batch);
+ msgs.emplace_back(1, batch);
+
+ auto msg = client.Produce("topic-0-test", msgs);
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].Index, 1);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[1].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::NONE_ERROR));
+
+ {
+ auto m = Read(topicReader);
+ UNIT_ASSERT_EQUAL(m.size(), 1);
+
+ UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+ m[0].GetMessages()[0].Commit();
+ }
+
+ {
+ auto m = Read(topicReader);
+ UNIT_ASSERT_EQUAL(m.size(), 1);
+
+ UNIT_ASSERT_EQUAL(m[0].GetMessages().size(), 1);
+ m[0].GetMessages()[0].Commit();
+ }
+ }
+
+ {
+ // Unknown topic
+
+ TKafkaRecordBatch batch;
+ batch.BaseOffset = 7;
+ batch.BaseSequence = 11;
+ batch.Magic = 2; // Current supported
+ batch.Records.resize(1);
+ batch.Records[0].Key = "record-key-1";
+ batch.Records[0].Value = "record-value-1";
+
+ auto msg = client.Produce("topic-0-test-not-exists", 0, batch);
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test-not-exists");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 0);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
+ }
+
+ {
+ // Unknown partition
+
+ TKafkaRecordBatch batch;
+ batch.BaseOffset = 7;
+ batch.BaseSequence = 11;
+ batch.Magic = 2; // Current supported
+ batch.Records.resize(1);
+ batch.Records[0].Key = "record-key-1";
+ batch.Records[0].Value = "record-value-1";
+
+ auto msg = client.Produce("topic-0-test", 10000, batch);
+
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].Name, "topic-0-test");
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].Index, 10000);
+ UNIT_ASSERT_VALUES_EQUAL(msg->Responses[0].PartitionResponses[0].ErrorCode,
+ static_cast<TKafkaInt16>(EKafkaErrors::UNKNOWN_TOPIC_OR_PARTITION));
+ }
+
+ {
// Check unknown ApiKey (must be last. close the session)
// expect no exception
client.UnknownApiKey();
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 122aa8077e1..fef6ab206a7 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1938,6 +1938,13 @@ message TKafkaProxyConfig {
optional uint64 MaxMessageSize = 4 [default = 268435456];
optional uint64 MaxInflightSize = 5 [default = 268435456];
optional uint64 PacketSize = 6 [default = 1500];
+
+ message TProxy {
+ optional string Hostname = 1;
+ optional int32 Port = 2 [default = 9092];
+ }
+
+ optional TProxy Proxy = 7;
}
message TAwsCompatibilityConfig {