aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorsavnik <savnik@yandex-team.com>2023-09-07 12:25:19 +0300
committersavnik <savnik@yandex-team.com>2023-09-07 13:46:09 +0300
commited69062caf0a4b6eca864b5b8ccee4312c12751b (patch)
tree94c2a9fbc58c59bf148f984fa13017049cf5e671
parentf51f48a9153ef1341700ad8ba322f01d60368b60 (diff)
downloadydb-ed69062caf0a4b6eca864b5b8ccee4312c12751b.tar.gz
Split kafka metrics to datastreams and datastreams_serverless
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/kafka_proxy/actors/actors.h2
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp3
-rw-r--r--ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h1
-rw-r--r--ydb/core/kafka_proxy/kafka_connection.cpp3
-rw-r--r--ydb/core/kafka_proxy/kafka_events.h6
-rw-r--r--ydb/core/kafka_proxy/kafka_metrics.cpp5
7 files changed, 14 insertions, 8 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index ef13aa7752..0edb122cda 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2640,7 +2640,7 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
TMailboxType::HTSwap, appData->UserPoolId)
);
- IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters->GetSubgroup("counters", "datastreams")}); // FIXME savnik: change to kafka_proxy
+ IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters});
setup->LocalServices.emplace_back(
NKafka::MakeKafkaMetricsServiceID(),
TActorSetupCmd(metricsActor,
diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h
index d08b620bc8..637e222c8a 100644
--- a/ydb/core/kafka_proxy/actors/actors.h
+++ b/ydb/core/kafka_proxy/actors/actors.h
@@ -36,6 +36,7 @@ struct TContext {
TString DatabaseId;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TString ClientDC;
+ bool IsServerless;
NKikimr::NPQ::TRlContext RlContext;
@@ -76,5 +77,4 @@ NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context);
NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message);
NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message);
-
} // namespace NKafka
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
index 6e0e8e353a..89ab305b5a 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp
@@ -99,7 +99,7 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri
responseToClient->ErrorMessage = "";
auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode);
- auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, errorMessage);
+ auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, IsServerless, errorMessage);
Send(Context->ConnectionId, authResult);
}
@@ -192,6 +192,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS
return;
}
Y_VERIFY(navigate->ResultSet.size() == 1);
+ IsServerless = navigate->ResultSet.front().DomainInfo->IsServerless();
for (const auto& attr : navigate->ResultSet.front().Attributes) {
if (attr.first == "folder_id") FolderId = attr.second;
if (attr.first == "cloud_id") CloudId = attr.second;
diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
index d240164baf..6a13e97ab4 100644
--- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
+++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h
@@ -92,6 +92,7 @@ private:
TString Coordinator;
TString ResourcePath;
TString CloudId;
+ bool IsServerless;
};
} // NKafka
diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp
index e933abb087..6108bfc632 100644
--- a/ydb/core/kafka_proxy/kafka_connection.cpp
+++ b/ydb/core/kafka_proxy/kafka_connection.cpp
@@ -319,6 +319,7 @@ protected:
Context->DatabaseId = event->DatabaseId;
Context->CloudId = event->CloudId;
Context->FolderId = event->FolderId;
+ Context->IsServerless = event->IsServerless;
KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID());
}
@@ -468,7 +469,7 @@ protected:
Step = HEADER_PROCESS;
break;
-
+
case HEADER_PROCESS:
Request->ApiKey = *(TKafkaInt16*)Request->Buffer->Data();
Request->ApiVersion = *(TKafkaVersion*)(Request->Buffer->Data() + sizeof(TKafkaInt16));
diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h
index e623360292..32d55f3a49 100644
--- a/ydb/core/kafka_proxy/kafka_events.h
+++ b/ydb/core/kafka_proxy/kafka_events.h
@@ -61,7 +61,7 @@ struct TEvKafka {
}
TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString databasePath, TString databaseId,
- TString folderId, TString cloudId, TString serviceAccountId, TString coordinator, TString resourcePath, TString error = "")
+ TString folderId, TString cloudId, TString serviceAccountId, TString coordinator, TString resourcePath, bool isServerless, TString error = "")
: AuthStep(authStep)
, UserToken(token)
, DatabasePath(databasePath)
@@ -71,6 +71,7 @@ struct TEvKafka {
, DatabaseId(databaseId)
, Coordinator(coordinator)
, ResourcePath(resourcePath)
+ , IsServerless(isServerless)
, Error(error)
, ClientResponse(std::move(clientResponse)) {
}
@@ -84,6 +85,7 @@ struct TEvKafka {
TString DatabaseId;
TString Coordinator;
TString ResourcePath;
+ bool IsServerless;
TString Error;
TString SaslMechanism;
@@ -97,7 +99,7 @@ struct TEvKafka {
SaslMechanism(saslMechanism),
ClientResponse(std::move(clientResponse))
{}
-
+
EAuthSteps AuthStep;
TString Error;
TString SaslMechanism;
diff --git a/ydb/core/kafka_proxy/kafka_metrics.cpp b/ydb/core/kafka_proxy/kafka_metrics.cpp
index b001d2e6c5..758f128d7b 100644
--- a/ydb/core/kafka_proxy/kafka_metrics.cpp
+++ b/ydb/core/kafka_proxy/kafka_metrics.cpp
@@ -4,9 +4,10 @@
namespace NKafka {
TVector<std::pair<TString, TString>> BuildLabels(const NKafka::TContext::TPtr context, const TString& method, const TString& topic, const TString& name, const TString& errorCode) {
- return {{"database", context->DatabasePath}, {"method", method}, {"cloud_id", context->CloudId},
+ return {{"counters", context->IsServerless ? "datastreams_serverless" : "datastreams"},
+ {"database", context->DatabasePath}, {"method", method}, {"cloud_id", context->CloudId},
{"folder_id", context->FolderId}, {"database_id", context->DatabaseId},
- {"topic", topic}, {"errorCode", errorCode}, {"name", name}};
+ {"topic", topic}, {"error_code", errorCode}, {"name", name}};
}
TActorId MakeKafkaMetricsServiceID() {