diff options
author | savnik <savnik@yandex-team.com> | 2023-09-07 12:25:19 +0300 |
---|---|---|
committer | savnik <savnik@yandex-team.com> | 2023-09-07 13:46:09 +0300 |
commit | ed69062caf0a4b6eca864b5b8ccee4312c12751b (patch) | |
tree | 94c2a9fbc58c59bf148f984fa13017049cf5e671 | |
parent | f51f48a9153ef1341700ad8ba322f01d60368b60 (diff) | |
download | ydb-ed69062caf0a4b6eca864b5b8ccee4312c12751b.tar.gz |
Split kafka metrics to datastreams and datastreams_serverless
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/actors.h | 2 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h | 1 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_connection.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_events.h | 6 | ||||
-rw-r--r-- | ydb/core/kafka_proxy/kafka_metrics.cpp | 5 |
7 files changed, 14 insertions, 8 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index ef13aa7752..0edb122cda 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2640,7 +2640,7 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu TMailboxType::HTSwap, appData->UserPoolId) ); - IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters->GetSubgroup("counters", "datastreams")}); // FIXME savnik: change to kafka_proxy + IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{appData->Counters}); setup->LocalServices.emplace_back( NKafka::MakeKafkaMetricsServiceID(), TActorSetupCmd(metricsActor, diff --git a/ydb/core/kafka_proxy/actors/actors.h b/ydb/core/kafka_proxy/actors/actors.h index d08b620bc8..637e222c8a 100644 --- a/ydb/core/kafka_proxy/actors/actors.h +++ b/ydb/core/kafka_proxy/actors/actors.h @@ -36,6 +36,7 @@ struct TContext { TString DatabaseId; TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TString ClientDC; + bool IsServerless; NKikimr::NPQ::TRlContext RlContext; @@ -76,5 +77,4 @@ NActors::IActor* CreateKafkaProduceActor(const TContext::TPtr context); NActors::IActor* CreateKafkaSaslHandshakeActor(const TContext::TPtr context, const ui64 correlationId, const TMessagePtr<TSaslHandshakeRequestData>& message); NActors::IActor* CreateKafkaSaslAuthActor(const TContext::TPtr context, const ui64 correlationId, const NKikimr::NRawSocket::TSocketDescriptor::TSocketAddressType address, const TMessagePtr<TSaslAuthenticateRequestData>& message); - } // namespace NKafka diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp index 6e0e8e353a..89ab305b5a 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.cpp @@ -99,7 +99,7 @@ void TKafkaSaslAuthActor::SendResponseAndDie(EKafkaErrors errorCode, const TStri responseToClient->ErrorMessage = ""; auto evResponse = std::make_shared<TEvKafka::TEvResponse>(CorrelationId, responseToClient, errorCode); - auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, errorMessage); + auto authResult = new TEvKafka::TEvAuthResult(EAuthSteps::SUCCESS, evResponse, UserToken, DatabasePath, DatabaseId, FolderId, CloudId, ServiceAccountId, Coordinator, ResourcePath, IsServerless, errorMessage); Send(Context->ConnectionId, authResult); } @@ -192,6 +192,7 @@ void TKafkaSaslAuthActor::Handle(NKikimr::TEvTxProxySchemeCache::TEvNavigateKeyS return; } Y_VERIFY(navigate->ResultSet.size() == 1); + IsServerless = navigate->ResultSet.front().DomainInfo->IsServerless(); for (const auto& attr : navigate->ResultSet.front().Attributes) { if (attr.first == "folder_id") FolderId = attr.second; if (attr.first == "cloud_id") CloudId = attr.second; diff --git a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h index d240164baf..6a13e97ab4 100644 --- a/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h +++ b/ydb/core/kafka_proxy/actors/kafka_sasl_auth_actor.h @@ -92,6 +92,7 @@ private: TString Coordinator; TString ResourcePath; TString CloudId; + bool IsServerless; }; } // NKafka diff --git a/ydb/core/kafka_proxy/kafka_connection.cpp b/ydb/core/kafka_proxy/kafka_connection.cpp index e933abb087..6108bfc632 100644 --- a/ydb/core/kafka_proxy/kafka_connection.cpp +++ b/ydb/core/kafka_proxy/kafka_connection.cpp @@ -319,6 +319,7 @@ protected: Context->DatabaseId = event->DatabaseId; Context->CloudId = event->CloudId; Context->FolderId = event->FolderId; + Context->IsServerless = event->IsServerless; KAFKA_LOG_D("Authentificated successful. SID=" << Context->UserToken->GetUserSID()); } @@ -468,7 +469,7 @@ protected: Step = HEADER_PROCESS; break; - + case HEADER_PROCESS: Request->ApiKey = *(TKafkaInt16*)Request->Buffer->Data(); Request->ApiVersion = *(TKafkaVersion*)(Request->Buffer->Data() + sizeof(TKafkaInt16)); diff --git a/ydb/core/kafka_proxy/kafka_events.h b/ydb/core/kafka_proxy/kafka_events.h index e623360292..32d55f3a49 100644 --- a/ydb/core/kafka_proxy/kafka_events.h +++ b/ydb/core/kafka_proxy/kafka_events.h @@ -61,7 +61,7 @@ struct TEvKafka { } TEvAuthResult(EAuthSteps authStep, std::shared_ptr<TEvKafka::TEvResponse> clientResponse, TIntrusiveConstPtr<NACLib::TUserToken> token, TString databasePath, TString databaseId, - TString folderId, TString cloudId, TString serviceAccountId, TString coordinator, TString resourcePath, TString error = "") + TString folderId, TString cloudId, TString serviceAccountId, TString coordinator, TString resourcePath, bool isServerless, TString error = "") : AuthStep(authStep) , UserToken(token) , DatabasePath(databasePath) @@ -71,6 +71,7 @@ struct TEvKafka { , DatabaseId(databaseId) , Coordinator(coordinator) , ResourcePath(resourcePath) + , IsServerless(isServerless) , Error(error) , ClientResponse(std::move(clientResponse)) { } @@ -84,6 +85,7 @@ struct TEvKafka { TString DatabaseId; TString Coordinator; TString ResourcePath; + bool IsServerless; TString Error; TString SaslMechanism; @@ -97,7 +99,7 @@ struct TEvKafka { SaslMechanism(saslMechanism), ClientResponse(std::move(clientResponse)) {} - + EAuthSteps AuthStep; TString Error; TString SaslMechanism; diff --git a/ydb/core/kafka_proxy/kafka_metrics.cpp b/ydb/core/kafka_proxy/kafka_metrics.cpp index b001d2e6c5..758f128d7b 100644 --- a/ydb/core/kafka_proxy/kafka_metrics.cpp +++ b/ydb/core/kafka_proxy/kafka_metrics.cpp @@ -4,9 +4,10 @@ namespace NKafka { TVector<std::pair<TString, TString>> BuildLabels(const NKafka::TContext::TPtr context, const TString& method, const TString& topic, const TString& name, const TString& errorCode) { - return {{"database", context->DatabasePath}, {"method", method}, {"cloud_id", context->CloudId}, + return {{"counters", context->IsServerless ? "datastreams_serverless" : "datastreams"}, + {"database", context->DatabasePath}, {"method", method}, {"cloud_id", context->CloudId}, {"folder_id", context->FolderId}, {"database_id", context->DatabaseId}, - {"topic", topic}, {"errorCode", errorCode}, {"name", name}}; + {"topic", topic}, {"error_code", errorCode}, {"name", name}}; } TActorId MakeKafkaMetricsServiceID() { |