diff options
author | tesseract <tesseract@yandex-team.com> | 2023-08-31 09:47:54 +0300 |
---|---|---|
committer | tesseract <tesseract@yandex-team.com> | 2023-08-31 10:02:02 +0300 |
commit | 697f4769f1195d8d2e8b69ba560556f8404dec55 (patch) | |
tree | 76d1bc007da0a651c8ef9c6f5eec5e1baf8065a8 | |
parent | 13004290f7ae5603115f0cd10c6a0e5d0b9c5be9 (diff) | |
download | ydb-697f4769f1195d8d2e8b69ba560556f8404dec55.tar.gz |
Fail on bind error for Kafka port
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 12 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_listener.h | 2 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_listener.cpp | 29 | ||||
-rw-r--r-- | ydb/core/raw_socket/sock_listener.h | 8 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 2 |
5 files changed, 34 insertions, 19 deletions
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index a783cb00d7e..061af4d3e0d 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -182,6 +182,7 @@ protected: hFunc(TEvPollerReady, HandleAccepting); hFunc(TEvPollerRegisterResult, HandleAccepting); HFunc(TEvKafka::TEvResponse, Handle); + sFunc(NActors::TEvents::TEvPoison, PassAway); default: KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); } @@ -499,16 +500,8 @@ protected: } } } - if (event->Get()->Write) { - if (!FlushOutput()) { - return; - } - } - RequestPoller(); - } - bool FlushOutput() { - return true; // TODO + RequestPoller(); } void HandleConnected(TEvPollerRegisterResult::TPtr ev) { @@ -524,6 +517,7 @@ protected: HFunc(TEvKafka::TEvResponse, Handle); HFunc(TEvKafka::TEvAuthResult, Handle); HFunc(TEvKafka::TEvHandshakeResult, Handle); + sFunc(NActors::TEvents::TEvPoison, PassAway); default: KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName()); } diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h index d5dde3ad416..10a82bf0944 100644 --- a/ydb/core/kafka_proxy/kafka_listener.h +++ b/ydb/core/kafka_proxy/kafka_listener.h @@ -13,7 +13,7 @@ inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, con [=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) { return CreateKafkaConnection(socket, address, config); }, - NKikimrServices::EServiceKikimr::KAFKA_PROXY); + NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort); } } // namespace NKafka diff --git a/ydb/core/raw_socket/sock_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp index ae2a51d93b7..c90ccf6ea3a 100644 --- a/ydb/core/raw_socket/sock_listener.cpp +++ b/ydb/core/raw_socket/sock_listener.cpp @@ -27,18 +27,22 @@ public: TPollerToken::TPtr PollerToken; THashSet<TActorId> Connections; + EErrorAction ErrorAction; + TSocketListener(const TActorId& poller, const TListenerSettings& settings, const TConnectionCreator& connectionCreator, - NKikimrServices::EServiceKikimr service) + NKikimrServices::EServiceKikimr service, EErrorAction errorAction) : Poller(poller) , Settings(settings) , ConnectionCreator(connectionCreator) - , Service(service) { + , Service(service) + , ErrorAction(errorAction) { } STATEFN(StateWorking) { switch (ev->GetTypeRewrite()) { hFunc(NActors::TEvPollerRegisterResult, Handle); hFunc(NActors::TEvPollerReady, Handle); + sFunc(TEvents::TEvPoison, PassAway); } } @@ -69,13 +73,23 @@ public: } else { LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to bind " << bindAddress->ToString() << ". Error: " << strerror(-err)); } - //abort(); - PassAway(); + + switch(ErrorAction) { + case EErrorAction::Abort: + Cerr << "Failed to set up listener on port " << Settings.Port + << " errno# " << -err << " (" << strerror(-err) << ")" << Endl; + exit(1); + break; + + case EErrorAction::Ignore: + PassAway(); + break; + } } void PassAway() override { for (const NActors::TActorId& connection : Connections) { - Send(connection, new NActors::TEvents::TEvPoisonPill()); + Send(connection, new NActors::TEvents::TEvPoison()); } } @@ -105,8 +119,9 @@ public: }; NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings, - TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service) { - return new TSocketListener(poller, settings, connectionCreator, service); + TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service, + EErrorAction errorAction) { + return new TSocketListener(poller, settings, connectionCreator, service, errorAction); } } // namespace NKikimr::NRawSocket diff --git a/ydb/core/raw_socket/sock_listener.h b/ydb/core/raw_socket/sock_listener.h index 584f0568d12..d328eff004e 100644 --- a/ydb/core/raw_socket/sock_listener.h +++ b/ydb/core/raw_socket/sock_listener.h @@ -15,9 +15,15 @@ struct TListenerSettings { TString SslCertificatePem; }; +enum EErrorAction { + Ignore, + Abort +}; + using TConnectionCreator = std::function<NActors::IActor* (TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)>; NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings, - TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service); + TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service, + EErrorAction errorAction = EErrorAction::Ignore); } // namespace NKikimr::NRawSocket diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 904f24f6b6a..4650b63bc9b 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -930,7 +930,7 @@ namespace Tests { Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx); } - { + if (Settings->AppConfig.GetKafkaProxyConfig().GetEnableKafkaProxy()) { NKafka::TListenerSettings settings; settings.Port = Settings->AppConfig.GetKafkaProxyConfig().GetListeningPort(); if (Settings->AppConfig.GetKafkaProxyConfig().HasSslCertificate()) { |