aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorniksaveliev <nik@saveliev.me>2024-02-16 19:42:15 +0600
committerGitHub <noreply@github.com>2024-02-16 19:42:15 +0600
commitab6bbc49e15d417a9ebeb418ff379839efc94118 (patch)
treee5a1e0dfd5ebfd56d42c15be34c86d501975e098
parented91832c0151f100468559c80a48215809c0138d (diff)
downloadydb-ab6bbc49e15d417a9ebeb418ff379839efc94118.tar.gz
Fix kafka with enabled proxy (#2017)
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h5
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp12
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp5
3 files changed, 15 insertions, 7 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index ff6403c737..4c0186ec3a 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -10,6 +10,11 @@
namespace NKafka {
+static constexpr int ProxyNodeId = 1;
+static constexpr char UnderlayPrefix[] = "u-";
+
+static_assert(sizeof(UnderlayPrefix) == 3);
+
enum EAuthSteps {
WAIT_HANDSHAKE,
WAIT_AUTH,
diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
index c1f473f59f..de97d76722 100644
--- a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp
@@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) {
bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty();
if (withProxy) {
- SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx);
+ SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), NKafka::ProxyNodeId, ctx);
return;
}
@@ -54,6 +54,8 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p
response->Port = port;
response->NodeId = nodeId;
+ KAFKA_LOG_D("FIND_COORDINATOR response. Host#: " << host << ", Port#: " << port << ", NodeId# " << nodeId);
+
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
@@ -71,7 +73,9 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons
response->Coordinators.push_back(coordinator);
}
-
+
+ response->ErrorCode = error;
+
Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode)));
Die(ctx);
}
@@ -79,7 +83,11 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons
void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) {
auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId());
Y_ABORT_UNLESS(!iter.IsEnd());
+
auto host = (*ev->Get()->Nodes)[iter->second].Host;
+ if (host.StartsWith(UnderlayPrefix)) {
+ host = host.substr(sizeof(UnderlayPrefix) - 1);
+ }
KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host);
SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx);
}
diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
index 9a9029a9f2..b51d3b6bad 100644
--- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp
@@ -6,11 +6,6 @@
namespace NKafka {
using namespace NKikimr::NGRpcProxy::V1;
-static constexpr int ProxyNodeId = 1;
-static constexpr char UnderlayPrefix[] = "u-";
-
-static_assert(sizeof(UnderlayPrefix) == 3);
-
NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context,
const ui64 correlationId,
const TMessagePtr<TMetadataRequestData>& message) {