aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortesseract <tesseract@yandex-team.com>2023-08-31 09:47:54 +0300
committertesseract <tesseract@yandex-team.com>2023-08-31 10:02:02 +0300
commit697f4769f1195d8d2e8b69ba560556f8404dec55 (patch)
tree76d1bc007da0a651c8ef9c6f5eec5e1baf8065a8
parent13004290f7ae5603115f0cd10c6a0e5d0b9c5be9 (diff)
downloadydb-697f4769f1195d8d2e8b69ba560556f8404dec55.tar.gz
Fail on bind error for Kafka port
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp12
-rw-r--r--ydb/core/kafka_proxy/kafka_listener.h2
-rw-r--r--ydb/core/raw_socket/sock_listener.cpp29
-rw-r--r--ydb/core/raw_socket/sock_listener.h8
-rw-r--r--ydb/core/testlib/test_client.cpp2
5 files changed, 34 insertions, 19 deletions
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index a783cb00d7e..061af4d3e0d 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -182,6 +182,7 @@ protected:
hFunc(TEvPollerReady, HandleAccepting);
hFunc(TEvPollerRegisterResult, HandleAccepting);
HFunc(TEvKafka::TEvResponse, Handle);
+ sFunc(NActors::TEvents::TEvPoison, PassAway);
default:
KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
}
@@ -499,16 +500,8 @@ protected:
}
}
}
- if (event->Get()->Write) {
- if (!FlushOutput()) {
- return;
- }
- }
- RequestPoller();
- }
- bool FlushOutput() {
- return true; // TODO
+ RequestPoller();
}
void HandleConnected(TEvPollerRegisterResult::TPtr ev) {
@@ -524,6 +517,7 @@ protected:
HFunc(TEvKafka::TEvResponse, Handle);
HFunc(TEvKafka::TEvAuthResult, Handle);
HFunc(TEvKafka::TEvHandshakeResult, Handle);
+ sFunc(NActors::TEvents::TEvPoison, PassAway);
default:
KAFKA_LOG_ERROR("TKafkaConnection: Unexpected " << ev.Get()->GetTypeName());
}
diff --git a/ydb/core/kafka_proxy/kafka_listener.h b/ydb/core/kafka_proxy/kafka_listener.h
index d5dde3ad416..10a82bf0944 100644
--- a/ydb/core/kafka_proxy/kafka_listener.h
+++ b/ydb/core/kafka_proxy/kafka_listener.h
@@ -13,7 +13,7 @@ inline NActors::IActor* CreateKafkaListener(const NActors::TActorId& poller, con
[=](TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address) {
return CreateKafkaConnection(socket, address, config);
},
- NKikimrServices::EServiceKikimr::KAFKA_PROXY);
+ NKikimrServices::EServiceKikimr::KAFKA_PROXY, EErrorAction::Abort);
}
} // namespace NKafka
diff --git a/ydb/core/raw_socket/sock_listener.cpp b/ydb/core/raw_socket/sock_listener.cpp
index ae2a51d93b7..c90ccf6ea3a 100644
--- a/ydb/core/raw_socket/sock_listener.cpp
+++ b/ydb/core/raw_socket/sock_listener.cpp
@@ -27,18 +27,22 @@ public:
TPollerToken::TPtr PollerToken;
THashSet<TActorId> Connections;
+ EErrorAction ErrorAction;
+
TSocketListener(const TActorId& poller, const TListenerSettings& settings, const TConnectionCreator& connectionCreator,
- NKikimrServices::EServiceKikimr service)
+ NKikimrServices::EServiceKikimr service, EErrorAction errorAction)
: Poller(poller)
, Settings(settings)
, ConnectionCreator(connectionCreator)
- , Service(service) {
+ , Service(service)
+ , ErrorAction(errorAction) {
}
STATEFN(StateWorking) {
switch (ev->GetTypeRewrite()) {
hFunc(NActors::TEvPollerRegisterResult, Handle);
hFunc(NActors::TEvPollerReady, Handle);
+ sFunc(TEvents::TEvPoison, PassAway);
}
}
@@ -69,13 +73,23 @@ public:
} else {
LOG_ERROR_S(*NActors::TlsActivationContext, Service, "Failed to bind " << bindAddress->ToString() << ". Error: " << strerror(-err));
}
- //abort();
- PassAway();
+
+ switch(ErrorAction) {
+ case EErrorAction::Abort:
+ Cerr << "Failed to set up listener on port " << Settings.Port
+ << " errno# " << -err << " (" << strerror(-err) << ")" << Endl;
+ exit(1);
+ break;
+
+ case EErrorAction::Ignore:
+ PassAway();
+ break;
+ }
}
void PassAway() override {
for (const NActors::TActorId& connection : Connections) {
- Send(connection, new NActors::TEvents::TEvPoisonPill());
+ Send(connection, new NActors::TEvents::TEvPoison());
}
}
@@ -105,8 +119,9 @@ public:
};
NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings,
- TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service) {
- return new TSocketListener(poller, settings, connectionCreator, service);
+ TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service,
+ EErrorAction errorAction) {
+ return new TSocketListener(poller, settings, connectionCreator, service, errorAction);
}
} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/raw_socket/sock_listener.h b/ydb/core/raw_socket/sock_listener.h
index 584f0568d12..d328eff004e 100644
--- a/ydb/core/raw_socket/sock_listener.h
+++ b/ydb/core/raw_socket/sock_listener.h
@@ -15,9 +15,15 @@ struct TListenerSettings {
TString SslCertificatePem;
};
+enum EErrorAction {
+ Ignore,
+ Abort
+};
+
using TConnectionCreator = std::function<NActors::IActor* (TIntrusivePtr<TSocketDescriptor> socket, TNetworkConfig::TSocketAddressType address)>;
NActors::IActor* CreateSocketListener(const NActors::TActorId& poller, const TListenerSettings& settings,
- TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service);
+ TConnectionCreator connectionCreator, NKikimrServices::EServiceKikimr service,
+ EErrorAction errorAction = EErrorAction::Ignore);
} // namespace NKikimr::NRawSocket
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 904f24f6b6a..4650b63bc9b 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -930,7 +930,7 @@ namespace Tests {
Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx);
}
- {
+ if (Settings->AppConfig.GetKafkaProxyConfig().GetEnableKafkaProxy()) {
NKafka::TListenerSettings settings;
settings.Port = Settings->AppConfig.GetKafkaProxyConfig().GetListeningPort();
if (Settings->AppConfig.GetKafkaProxyConfig().HasSslCertificate()) {