diff options
author | niksaveliev <nik@saveliev.me> | 2024-02-16 19:42:15 +0600 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-16 19:42:15 +0600 |
commit | ab6bbc49e15d417a9ebeb418ff379839efc94118 (patch) | |
tree | e5a1e0dfd5ebfd56d42c15be34c86d501975e098 | |
parent | ed91832c0151f100468559c80a48215809c0138d (diff) | |
download | ydb-ab6bbc49e15d417a9ebeb418ff379839efc94118.tar.gz |
Fix kafka with enabled proxy (#2017)
-rw-r--r-- | ydb/core/kafka_proxy/actors/actors.h | 5 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp | 5 |
3 files changed, 15 insertions, 7 deletions
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index ff6403c737..4c0186ec3a 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -10,6 +10,11 @@ namespace NKafka { +static constexpr int ProxyNodeId = 1; +static constexpr char UnderlayPrefix[] = "u-"; + +static_assert(sizeof(UnderlayPrefix) == 3); + enum EAuthSteps { WAIT_HANDSHAKE, WAIT_AUTH, diff --git a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp index c1f473f59f..de97d76722 100644 --- a/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_find_coordinator_actor.cpp @@ -25,7 +25,7 @@ void TKafkaFindCoordinatorActor::Bootstrap(const NActors::TActorContext& ctx) { bool withProxy = Context->Config.HasProxy() && !Context->Config.GetProxy().GetHostname().Empty(); if (withProxy) { - SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), -1, ctx); + SendResponseOkAndDie(Context->Config.GetProxy().GetHostname(), Context->Config.GetProxy().GetPort(), NKafka::ProxyNodeId, ctx); return; } @@ -54,6 +54,8 @@ void TKafkaFindCoordinatorActor::SendResponseOkAndDie(const TString& host, i32 p response->Port = port; response->NodeId = nodeId; + KAFKA_LOG_D("FIND_COORDINATOR response. Host#: " << host << ", Port#: " << port << ", NodeId# " << nodeId); + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); Die(ctx); } @@ -71,7 +73,9 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons response->Coordinators.push_back(coordinator); } - + + response->ErrorCode = error; + Send(Context->ConnectionId, new TEvKafka::TEvResponse(CorrelationId, response, static_cast<EKafkaErrors>(response->ErrorCode))); Die(ctx); } @@ -79,7 +83,11 @@ void TKafkaFindCoordinatorActor::SendResponseFailAndDie(EKafkaErrors error, cons void TKafkaFindCoordinatorActor::Handle(NKikimr::NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev, const NActors::TActorContext& ctx) { auto iter = ev->Get()->NodeIdsMapping->find(ctx.SelfID.NodeId()); Y_ABORT_UNLESS(!iter.IsEnd()); + auto host = (*ev->Get()->Nodes)[iter->second].Host; + if (host.StartsWith(UnderlayPrefix)) { + host = host.substr(sizeof(UnderlayPrefix) - 1); + } KAFKA_LOG_D("FIND_COORDINATOR incoming TEvGetAllNodesInfoResponse. Host#: " << host); SendResponseOkAndDie(host, Context->Config.GetListeningPort(), ctx.SelfID.NodeId(), ctx); } diff --git a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp index 9a9029a9f2..b51d3b6bad 100644 --- a/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_metadata_actor.cpp @@ -6,11 +6,6 @@ namespace NKafka { using namespace NKikimr::NGRpcProxy::V1; -static constexpr int ProxyNodeId = 1; -static constexpr char UnderlayPrefix[] = "u-"; - -static_assert(sizeof(UnderlayPrefix) == 3); - NActors::IActor* CreateKafkaMetadataActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TMetadataRequestData>& message) { |