diff options
| author | Nikolay Shestakov <[email protected]> | 2026-05-19 17:11:47 +0500 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-05-19 17:11:47 +0500 |
| commit | 6cfd6eccef2143f752356af9a2b00a5cc54fee89 (patch) | |
| tree | 26f67db02e0912b63180e0cd1330643a83bb3dfd | |
| parent | 774093255c06a377bcc214a7379839afec6e74a6 (diff) | |
Refactoring: move serialization and deserialization from http_req.cpp (#40332)
30 files changed, 880 insertions, 546 deletions
diff --git a/ydb/core/http_proxy/auth_actors.cpp b/ydb/core/http_proxy/auth_actors.cpp index 7a7d65729a4..0eb101ab26a 100644 --- a/ydb/core/http_proxy/auth_actors.cpp +++ b/ydb/core/http_proxy/auth_actors.cpp @@ -1,5 +1,7 @@ #include "auth_actors.h" +#include "http_req.h" + #include <ydb/core/base/path.h> #include <ydb/core/base/ticket_parser.h> #include <ydb/core/protos/config.pb.h> diff --git a/ydb/core/http_proxy/auth_actors.h b/ydb/core/http_proxy/auth_actors.h index 58b156e2118..d8f89e15c8c 100644 --- a/ydb/core/http_proxy/auth_actors.h +++ b/ydb/core/http_proxy/auth_actors.h @@ -1,13 +1,20 @@ #pragma once -#include "http_req.h" - #include <ydb/core/protos/serverless_proxy_config.pb.h> #include <ydb/library/actors/core/actorsystem_fwd.h> +#include <util/generic/ptr.h> + +namespace NKikimr::NSQS { + class TAwsRequestSignV4; +} + namespace NKikimr::NHttpProxy { - NActors::IActor* CreateAccessServiceActor(const NKikimrConfig::TServerlessProxyConfig& config); - NActors::IActor* CreateIamTokenServiceActor(const NKikimrConfig::TServerlessProxyConfig& config); - NActors::IActor* CreateIamAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature); +struct THttpRequestContext; + +NActors::IActor* CreateAccessServiceActor(const NKikimrConfig::TServerlessProxyConfig& config); +NActors::IActor* CreateIamTokenServiceActor(const NKikimrConfig::TServerlessProxyConfig& config); +NActors::IActor* CreateIamAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature); + } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/auth_factory.cpp b/ydb/core/http_proxy/auth_factory.cpp index 641581b99f5..d22f5b7cd30 100644 --- a/ydb/core/http_proxy/auth_factory.cpp +++ b/ydb/core/http_proxy/auth_factory.cpp @@ -1,17 +1,16 @@ -#include "auth_actors.h" #include "auth_factory.h" +#include "auth_actors.h" +#include "http_service.h" +#include "metrics_actor.h" + #include <ydb/core/base/feature_flags.h> -#include <ydb/core/http_proxy/http_service.h> -#include <ydb/core/http_proxy/metrics_actor.h> -#include <ydb/core/protos/config.pb.h> #include <ydb/library/actors/http/http_proxy.h> #include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/iam_private/iam.h> #include <ydb/public/sdk/cpp/src/client/types/core_facility/simple_core_facility.h> namespace NKikimr::NHttpProxy { - void TIamAuthFactory::InitTenantDiscovery( NActors::TActorSystemSetup::TLocalServices&, const TAppData&, @@ -23,7 +22,7 @@ void TIamAuthFactory::Initialize( NActors::TActorSystemSetup::TLocalServices& localServices, const TAppData& appData, const THttpConfig& httpConfig, - const NKikimrConfig::TGRpcConfig& grpcConfig) + const NKikimrConfig::TGRpcConfig& grpcConfig) { if (!httpConfig.GetEnabled()) { return; @@ -43,7 +42,7 @@ void TIamAuthFactory::Initialize( if (httpConfig.GetYandexCloudMode() && httpConfig.GetYandexCloudServiceRegion().empty()) { Cout << "HttpProxy: YandexCloudServiceRegion must not be empty" << Endl; } - IActor* actor = NKikimr::NHttpProxy::CreateAccessServiceActor(config); + NActors::IActor* actor = NKikimr::NHttpProxy::CreateAccessServiceActor(config); localServices.push_back(std::pair<TActorId, TActorSetupCmd>( NKikimr::NHttpProxy::MakeAccessServiceID(), TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId))); @@ -56,7 +55,7 @@ void TIamAuthFactory::Initialize( const NYdb::TCredentialsProviderFactoryPtr credentialsProviderFactory = jwtFilename.empty() ? NYdb::CreateInsecureCredentialsProviderFactory() : NYdb::CreateIamJwtFileCredentialsProviderFactoryPrivate( - {{.Endpoint = iamExternalEndpoint}, jwtFilename} ); + {{.Endpoint = iamExternalEndpoint}, jwtFilename}); const std::shared_ptr<NYdb::ICoreFacility> coreFacility = jwtFilename.empty() ? nullptr @@ -94,7 +93,7 @@ void TIamAuthFactory::Initialize( TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId))); } -NActors::IActor* TIamAuthFactory::CreateAuthActor(const TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const +NActors::IActor* TIamAuthFactory::CreateAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const { return CreateIamAuthActor(sender, context, std::move(signature)); } diff --git a/ydb/core/http_proxy/auth_factory.h b/ydb/core/http_proxy/auth_factory.h index c8acb2cc43e..dfc139a5099 100644 --- a/ydb/core/http_proxy/auth_factory.h +++ b/ydb/core/http_proxy/auth_factory.h @@ -1,12 +1,15 @@ #pragma once -#include "http_req.h" - #include <ydb/core/base/appdata.h> +#include <ydb/core/protos/config.pb.h> +#include <ydb/library/actors/core/actorsystem.h> #include <ydb/library/http_proxy/authorization/signature.h> +#include <util/generic/ptr.h> + namespace NKikimr::NHttpProxy { +struct THttpRequestContext; class IAuthFactory { public: @@ -18,7 +21,7 @@ public: const THttpConfig& config, const NKikimrConfig::TGRpcConfig& grpcConfig) = 0; - virtual NActors::IActor* CreateAuthActor(const TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const = 0; + virtual NActors::IActor* CreateAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const = 0; virtual ~IAuthFactory() = default; }; @@ -34,7 +37,7 @@ public: const THttpConfig& config, const NKikimrConfig::TGRpcConfig& grpcConfig) final; - NActors::IActor* CreateAuthActor(const TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const final; + NActors::IActor* CreateAuthActor(const NActors::TActorId sender, THttpRequestContext& context, THolder<NKikimr::NSQS::TAwsRequestSignV4>&& signature) const final; virtual void InitTenantDiscovery(NActors::TActorSystemSetup::TLocalServices&, const TAppData& appData, diff --git a/ydb/core/http_proxy/controller_base.cpp b/ydb/core/http_proxy/controller_base.cpp new file mode 100644 index 00000000000..7a9daa3d66d --- /dev/null +++ b/ydb/core/http_proxy/controller_base.cpp @@ -0,0 +1,50 @@ +#include "controller_base.h" + +namespace NKikimr::NHttpProxy { + + bool TBaseHttpController::Execute( + THttpRequestContext&& context, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature + ) const { + const auto& ctx = TlsActivationContext->AsActorContext(); + + const auto name = context.MethodName; + const auto contentType = context.ContentType; + const auto apiVersion = context.ApiVersion; + + auto proc = GetProcessor(name, context); + if (proc.has_value()) { + proc.value()->Execute(std::move(context), std::move(signature), ctx); + return true; + } else { + switch (proc.error()) { + case IHttpController::EError::MethodNotFound: + context.DoReply(MakeError(contentType, NYdb::EStatus::UNSUPPORTED, + TStringBuilder() << "Unknown method name " << name.Quote(), static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION))); + return false; + case IHttpController::EError::ServiceDisabled: + context.DoReply(MakeError(contentType, NYdb::EStatus::BAD_REQUEST, + TStringBuilder() << apiVersion << " is disabled", static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND))); + return false; + } + } + + return false; + } + + std::expected<IHttpRequestProcessor*, IHttpController::EError> TBaseHttpController::GetProcessor( + const TString& name, + const THttpRequestContext& context + ) const { + if (!IsEnabled(context.ServiceConfig.GetHttpConfig())) { + return std::unexpected(IHttpController::EError::ServiceDisabled); + } + + if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) { + return proc->second.get(); + } + + return std::unexpected(IHttpController::EError::MethodNotFound); + } + +} // namespace NKikimr::NHttpProxy
\ No newline at end of file diff --git a/ydb/core/http_proxy/controller_base.h b/ydb/core/http_proxy/controller_base.h new file mode 100644 index 00000000000..54bc80510a8 --- /dev/null +++ b/ydb/core/http_proxy/controller_base.h @@ -0,0 +1,27 @@ +#pragma once + +#include "http_req.h" + +namespace NKikimr::NHttpProxy { + + class TBaseHttpController: public IHttpController { + public: + bool Execute( + THttpRequestContext&& context, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature + ) const override; + + virtual bool IsEnabled(const NKikimrConfig::THttpProxyConfig&) const = 0; + + protected: + std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor( + const TString& name, + const THttpRequestContext& context + ) const; + + protected: + absl::flat_hash_map<TString, std::unique_ptr<IHttpRequestProcessor>> Name2Processor; + + }; + +} // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/controller_registry.cpp b/ydb/core/http_proxy/controller_registry.cpp new file mode 100644 index 00000000000..901c7172f06 --- /dev/null +++ b/ydb/core/http_proxy/controller_registry.cpp @@ -0,0 +1,62 @@ +#include "datastreams.h" +#include "http_req.h" +#include "sqs.h" +#include "ymq.h" + +#include <ydb/core/protos/config.pb.h> +#include <ydb/core/protos/serverless_proxy_config.pb.h> + +#include <util/generic/algorithm.h> + +namespace NKikimr::NHttpProxy { + + namespace { + + class TSqsControllerProxy: public IHttpController { + public: + THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override { + return GetSqsHttpController()->MakeError(contentType, Status, message, issueCode); + } + + bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const override { + return GetController(config)->IsPossible(apiVersion, config); + } + + bool Execute( + THttpRequestContext&& context, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature + ) const override { + return GetController(context.ServiceConfig)->Execute(std::move(context), std::move(signature)); + } + + private: + const IHttpController* GetController(const NKikimrConfig::TServerlessProxyConfig& config) const { + return config.GetHttpConfig().GetYmqEnabled() ? GetYmqHttpController() : GetSqsHttpController(); + } + }; + + TSqsControllerProxy SqsControllerProxyInstance; + + const std::array<const IHttpController*, 2> Controllers = { + &SqsControllerProxyInstance, + GetDataStreamsHttpController() + }; + + THttpControllerRegistry Instance; + + } // namespace + + const THttpControllerRegistry& GetHttpControllerRegistry() { + return Instance; + } + + const IHttpController* THttpControllerRegistry::GetController(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const { + for (const auto& controller : Controllers) { + if (controller->IsPossible(apiVersion, config)) { + return controller; + } + } + return nullptr; + } + +} // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/datastreams.cpp b/ydb/core/http_proxy/datastreams.cpp index d0562858e77..a96872a3a3b 100644 --- a/ydb/core/http_proxy/datastreams.cpp +++ b/ydb/core/http_proxy/datastreams.cpp @@ -1,8 +1,11 @@ +#include "datastreams.h" + #include "auth_factory.h" +#include "controller_base.h" #include "custom_metrics.h" +#include "datastreams_serialization.h" #include "exceptions_mapping.h" #include "http_req.h" -#include "json_proto_conversion.h" #include "utils.h" #include <ydb/core/base/appdata.h> @@ -86,9 +89,10 @@ namespace NKikimr::NHttpProxy { template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall, class TRpcEv> class THttpRequestProcessor : public TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>{ - using TProcessorBase = TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>; + using TProcessorBase = TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>; public: - THttpRequestProcessor(TString method, TProtoCall protoCall) : TProcessorBase(method, protoCall) + THttpRequestProcessor(TString method, TProtoCall protoCall) + : TProcessorBase(method, protoCall) { } @@ -276,6 +280,8 @@ namespace NKikimr::NHttpProxy { } void ReplyWithError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) { + auto exception = MapToException(status, Method, issueCode); + /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, @@ -284,7 +290,7 @@ namespace NKikimr::NHttpProxy { {"folder", HttpContext.FolderId}, {"database", HttpContext.DatabaseId}, {"stream", HttpContext.StreamName}, - {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second}, + {"code", TStringBuilder() << (int)exception.second}, {"name", "api.http.errors_per_second"}} }); @@ -297,37 +303,42 @@ namespace NKikimr::NHttpProxy { {"folder_id", HttpContext.FolderId}, {"database_id", HttpContext.DatabaseId}, {"topic", HttpContext.StreamName}, - {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second}, + {"code", TStringBuilder() << (int)exception.second}, {"name", "api.http.data_streams.response.count"}} }); - HttpContext.ResponseData.Status = status; - HttpContext.ResponseData.ErrorText = errorText; - ReplyToHttpContext(ctx, issueCode); + ReplyToHttpContext({ + .HttpCode = exception.second, + .ContentType = HttpContext.ContentType, + .Message = exception.first, + .Body = NDataStreams::Serialize(HttpContext.ContentType, { + .Exception = exception, + .ErrorText = errorText, + }) + }, status, issueCode, errorText); ctx.Send(AuthActor, new TEvents::TEvPoisonPill()); TBase::Die(ctx); } - void ReplyToHttpContext(const TActorContext& ctx, std::optional<size_t> issueCode = std::nullopt) { + void ReplyToHttpContext(THttpResponseData&& data, NYdb::EStatus status, std::optional<size_t> issueCode = std::nullopt, TStringBuf errorText = "OK") { + const TActorContext& ctx = TlsActivationContext->AsActorContext(); + ReportLatencyCounters(ctx); - LogHttpRequestResponse(ctx, issueCode); + LogHttpRequestResponse(ctx, status, issueCode, errorText); - if (issueCode.has_value()) { - HttpContext.DoReply(ctx, issueCode.value()); - } else { - HttpContext.DoReply(ctx); - } + HttpContext.DoReply(std::move(data)); } - void LogHttpRequestResponse(const TActorContext& ctx, const std::optional<size_t> issueCode) { - const int httpCode = issueCode ? MapToException(HttpContext.ResponseData.Status, Method, *issueCode).second : 200; + void LogHttpRequestResponse(const TActorContext& ctx, NYdb::EStatus status, const std::optional<size_t> issueCode, TStringBuf errorText) { + const int httpCode = issueCode ? MapToException(status, Method, *issueCode).second : 200; const bool isServerError = IsServerError(httpCode); auto priority = isServerError ? NActors::NLog::PRI_WARN : NActors::NLog::PRI_INFO; LOG_LOG_S_SAMPLED_BY(ctx, priority, NKikimrServices::HTTP_PROXY, NSqsTopic::SampleIdFromRequestId(HttpContext.RequestId), - "Request [" << HttpContext.RequestId << "] " << LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "Kinesis", HttpContext.StreamName, Method, {}, httpCode, HttpContext.ResponseData.ErrorText)); + "Request [" << HttpContext.RequestId << "] " << + LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "Kinesis", HttpContext.StreamName, Method, {}, httpCode, errorText)); } void ReportInputCounters(const TActorContext& ctx) { @@ -388,8 +399,6 @@ namespace NKikimr::NHttpProxy { void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev, const TActorContext& ctx) { if (ev->Get()->Status->IsSuccess()) { - ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body, - HttpContext.ContentType == MIME_CBOR); FillOutputCustomMetrics<TProtoResult>( *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); /* deprecated metric: */ ctx.Send(MakeMetricsServiceID(), @@ -408,7 +417,12 @@ namespace NKikimr::NHttpProxy { {"code", "200"}, {"name", "api.http.data_streams.response.count"}} }); - ReplyToHttpContext(ctx); + ReplyToHttpContext({ + .HttpCode = 200, + .ContentType = HttpContext.ContentType, + .Message = "", + .Body = NDataStreams::Serialize(HttpContext.ContentType, *ev->Get()->Message) + }, NYdb::EStatus::SUCCESS); } else { auto retryClass = NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus()); @@ -449,7 +463,7 @@ namespace NKikimr::NHttpProxy { void Bootstrap(const TActorContext& ctx) { StartTime = ctx.Now(); try { - HttpContext.RequestBodyToProto(&Request); + NDataStreams::Deserialize<TProtoRequest>(HttpContext.ContentType, Request, HttpContext.Request->Body); } catch (const NKikimr::NSQS::TSQSException& e) { NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; if (e.ErrorClass.ErrorCode == "MissingParameter") @@ -513,17 +527,18 @@ namespace NKikimr::NHttpProxy { }; - class TController : public IHttpController { + class TController : public TBaseHttpController { public: TController() { - #define DECLARE_DATASTREAMS_PROCESSOR(name) Name2Processor[#name] = MakeHolder<THttpRequestProcessor<\ - DataStreamsService, \ - name##Request, \ - name##Response, \ - name##Result, \ - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name), \ - NKikimr::NGRpcService::TEvDataStreams##name##Request>> \ - (#name, &Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name); + #define DECLARE_DATASTREAMS_PROCESSOR(name) Name2Processor[#name] = \ + std::make_unique<THttpRequestProcessor< \ + DataStreamsService, \ + name##Request, \ + name##Response, \ + name##Result, \ + decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name), \ + NKikimr::NGRpcService::TEvDataStreams##name##Request \ + >>(#name, &Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name) \ DECLARE_DATASTREAMS_PROCESSOR(PutRecords); DECLARE_DATASTREAMS_PROCESSOR(CreateStream); @@ -556,35 +571,37 @@ namespace NKikimr::NHttpProxy { DECLARE_DATASTREAMS_PROCESSOR(StopStreamEncryption); #undef DECLARE_DATASTREAMS_PROCESSOR - } - std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor( - const TString& name, - const THttpRequestContext& context - ) const override { - if (context.ApiVersion != "kinesisApi") { - return std::unexpected(IHttpController::EError::NotMyProtocol); - } - - if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) { - return std::expected<IHttpRequestProcessor*, IHttpController::EError>(proc->second.Get()); - } + THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override { + const auto exception = MapToException(Status, "", issueCode); + return { + .HttpCode = exception.second, + .ContentType = contentType, + .Message = exception.first, + .Body = NDataStreams::Serialize(contentType, NDataStreams::TErrorResponse{ + .Exception = exception, + .ErrorText = TString(message), + }) + }; + } - return std::unexpected(IHttpController::EError::MethodNotFound); + bool IsEnabled(const NKikimrConfig::THttpProxyConfig& config) const override { + return config.GetDataStreamsEnabled(); } - private: - THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor; + bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig&) const override { + return apiVersion == "kinesisApi"; + } }; + TController ControllerInstance; + } // namespace - std::shared_ptr<const IHttpController> CreateDataStreamsHttpController(const NKikimrConfig::TServerlessProxyConfig& config) { - if (config.GetHttpConfig().GetDataStreamsEnabled()) { - return std::make_shared<TController>(); - } - return {}; + const IHttpController* GetDataStreamsHttpController() { + return &ControllerInstance; } } // namespace NKikimr::NHttpProxy + diff --git a/ydb/core/http_proxy/datastreams.h b/ydb/core/http_proxy/datastreams.h index 5b5652dcfbb..b7f14455929 100644 --- a/ydb/core/http_proxy/datastreams.h +++ b/ydb/core/http_proxy/datastreams.h @@ -4,6 +4,6 @@ namespace NKikimr::NHttpProxy { -std::shared_ptr<const IHttpController> CreateDataStreamsHttpController(const NKikimrConfig::TServerlessProxyConfig& config); +const IHttpController* GetDataStreamsHttpController(); } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/datastreams_serialization.cpp b/ydb/core/http_proxy/datastreams_serialization.cpp new file mode 100644 index 00000000000..b6b5656a3b0 --- /dev/null +++ b/ydb/core/http_proxy/datastreams_serialization.cpp @@ -0,0 +1,48 @@ +#include "datastreams_serialization.h" + +namespace NKikimr::NHttpProxy::NDataStreams { + + template<> + void PrepareValue<Ydb::DataStreams::V1::ListStreamsRequest>(Ydb::DataStreams::V1::ListStreamsRequest& value) { + value.set_recurse(true); + } + + TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value) { + switch (mimeType) { + case MIME_CBOR: + return SerializeCbor(value); + case MIME_JSON: + [[fallthrough]]; + default: + return SerializeJson(value); + } + } + + TString Serialize(const MimeTypes mimeType, TErrorResponse&& value) { + NJson::TJsonValue json; + json.SetType(NJson::JSON_MAP); + json["message"] = value.ErrorText; + json["__type"] = value.Exception.first; + + switch (mimeType) { + case MIME_CBOR: + return SerializeCbor(json); + case MIME_JSON: + [[fallthrough]]; + default: + return SerializeJson(json); + } + } + +} // namespace NKikimr::NHttpProxy::NDataStreams + +namespace NKikimr::NHttpProxy { + + TString BuildError(MimeTypes mimeType, HttpCodes httpCode, const TString& errorName, const TString& errorText) { + return Serialize(mimeType, NDataStreams::TErrorResponse{ + .Exception = TException{errorName, httpCode}, + .ErrorText = errorText + }); + } + +} // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/datastreams_serialization.h b/ydb/core/http_proxy/datastreams_serialization.h new file mode 100644 index 00000000000..84cd9757206 --- /dev/null +++ b/ydb/core/http_proxy/datastreams_serialization.h @@ -0,0 +1,51 @@ +#pragma once + +#include "exceptions_mapping.h" +#include "json_proto_conversion.h" +#include "serialization.h" + +#include <ydb/library/http_proxy/error/error.h> +#include <ydb/public/api/protos/draft/datastreams.pb.h> + +namespace NKikimr::NHttpProxy::NDataStreams { + + template<typename TValue> + void PrepareValue(TValue& value) { + Y_UNUSED(value); + } + + template<> + void PrepareValue<Ydb::DataStreams::V1::ListStreamsRequest>(Ydb::DataStreams::V1::ListStreamsRequest& value); + + template<typename TValue> + void Deserialize(const MimeTypes mimeType, TValue& value, const TStringBuf& input) + requires std::is_base_of_v<NProtoBuf::Message, TValue> { + + if (input.empty()) { + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Empty body"; + } + + switch (mimeType) { + case MIME_CBOR: + PrepareValue(value); + DeserializeCbor(value, input); + break; + case MIME_JSON: + PrepareValue(value); + DeserializeJson(value, input); + break; + default: + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << + "Unknown ContentType"; + } + }; + + TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value); + + struct TErrorResponse { + TException Exception; + TString ErrorText; + }; + TString Serialize(const MimeTypes mimeType, TErrorResponse&& value); + +} // namespace NKikimr::NHttpProxy::NDataStreams diff --git a/ydb/core/http_proxy/discovery_actor.cpp b/ydb/core/http_proxy/discovery_actor.cpp index 317d593d542..ca7624294e0 100644 --- a/ydb/core/http_proxy/discovery_actor.cpp +++ b/ydb/core/http_proxy/discovery_actor.cpp @@ -5,7 +5,6 @@ #include <ydb/library/actors/core/events.h> #include <ydb/library/actors/core/hfunc.h> #include <ydb/library/actors/core/log.h> -#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h> #include <library/cpp/cache/cache.h> @@ -177,3 +176,4 @@ namespace NKikimr::NHttpProxy { return new TDiscoveryProxyActor(credentialsProvider, config); } } // namespace NKikimr::NHttpProxy + diff --git a/ydb/core/http_proxy/exceptions_mapping.cpp b/ydb/core/http_proxy/exceptions_mapping.cpp index 081207cd5d3..dc850d74520 100644 --- a/ydb/core/http_proxy/exceptions_mapping.cpp +++ b/ydb/core/http_proxy/exceptions_mapping.cpp @@ -174,3 +174,4 @@ TException UnsupportedExceptions(const TString& method, NYds::EErrorCodes issueC } } // namespace NKikimr::NHttpProxy + diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 8f8e0c0dc9a..ccaba669bae 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -1,75 +1,24 @@ -#include "auth_factory.h" -#include "custom_metrics.h" -#include "datastreams.h" -#include "exceptions_mapping.h" #include "http_req.h" -#include "json_proto_conversion.h" -#include "sqs.h" + +#include "auth_factory.h" #include "utils.h" -#include "ymq.h" -#include <ydb/core/base/appdata.h> -#include <ydb/core/base/path.h> -#include <ydb/core/grpc_caching/cached_grpc_request_actor.h> -#include <ydb/core/grpc_services/local_rpc/local_rpc.h> -#include <ydb/core/protos/serverless_proxy_config.pb.h> -#include <ydb/core/security/ticket_parser_impl.h> -#include <ydb/core/viewer/json/json.h> -#include <ydb/core/ymq/actor/auth_multi_factory.h> -#include <ydb/core/ymq/actor/serviceid.h> #include <ydb/library/actors/http/http_proxy.h> -#include <ydb/library/folder_service/events.h> -#include <ydb/library/folder_service/folder_service.h> -#include <ydb/library/grpc/actor_client/grpc_service_cache.h> #include <ydb/library/http_proxy/authorization/auth_helpers.h> #include <ydb/library/http_proxy/error/error.h> -#include <ydb/library/ycloud/api/access_service.h> -#include <ydb/library/ycloud/api/iam_token_service.h> -#include <ydb/library/ycloud/impl/access_service.h> -#include <ydb/library/ycloud/impl/iam_token_service.h> -#include <ydb/public/api/grpc/draft/ydb_sqs_topic_v1.grpc.pb.h> -#include <ydb/public/sdk/cpp/adapters/issue/issue.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/datastreams/datastreams.h> -#include <ydb/public/sdk/cpp/src/client/topic/impl/common.h> -#include <ydb/services/datastreams/datastreams_proxy.h> -#include <ydb/services/datastreams/next_token.h> -#include <ydb/services/datastreams/shard_iterator.h> -#include <ydb/services/lib/sharding/sharding.h> -#include <ydb/services/sqs_topic/queue_url/utils.h> -#include <ydb/services/sqs_topic/sqs_topic_proxy.h> -#include <ydb/services/sqs_topic/utils.h> -#include <ydb/services/ymq/grpc_service.h> -#include <ydb/services/ymq/rpc_params.h> -#include <ydb/services/ymq/utils.h> -#include <ydb/services/ymq/ymq_proxy.h> - -#include <yql/essentials/public/issue/yql_issue_message.h> #include <library/cpp/cgiparam/cgiparam.h> -#include <library/cpp/digest/old_crc/crc.h> #include <library/cpp/http/misc/parsed_request.h> #include <library/cpp/http/server/response.h> -#include <library/cpp/json/json_reader.h> -#include <library/cpp/json/json_writer.h> -#include <library/cpp/protobuf/json/json_output_create.h> -#include <library/cpp/protobuf/json/proto2json.h> -#include <library/cpp/protobuf/json/proto2json_printer.h> #include <library/cpp/uri/uri.h> -#include <util/generic/guid.h> -#include <util/stream/file.h> #include <util/string/ascii.h> -#include <util/string/cast.h> -#include <util/string/join.h> #include <util/string/vector.h> -#include <nlohmann/json.hpp> namespace NKikimr::NHttpProxy { using namespace google::protobuf; - using namespace Ydb::DataStreams::V1; - using namespace NYdb::NDataStreams::V1; constexpr TStringBuf IAM_HEADER = "x-yacloud-subjecttoken"; constexpr TStringBuf SECURITY_TOKEN_HEADER = "x-amz-security-token"; @@ -82,63 +31,24 @@ namespace NKikimr::NHttpProxy { constexpr TStringBuf REQUEST_CONTENT_TYPE_HEADER = "content-type"; constexpr TStringBuf CREDENTIAL_PARAM = "Credential"; - std::vector<std::shared_ptr<const IHttpController>> BuildControllers(const NKikimrConfig::TServerlessProxyConfig& config) { - auto controllers = std::vector<std::shared_ptr<const IHttpController>>{ - CreateSqsHttpController(config), - CreateYmqHttpController(config), - CreateDataStreamsHttpController(config) - } | std::views::filter([](const auto& controller) { return controller != nullptr; }); - - return std::vector<std::shared_ptr<const IHttpController>>(controllers.begin(), controllers.end()); - } - - THttpRequestProcessors::THttpRequestProcessors(const NKikimrConfig::TServerlessProxyConfig& config) - : Controllers(BuildControllers(config)) - { - } - - void SetApiVersionDisabledErrorText(THttpRequestContext& context) { - context.ResponseData.ErrorText = (TStringBuilder() << context.ApiVersion << " is disabled"); + THttpRequestProcessors::THttpRequestProcessors(const NKikimrConfig::TServerlessProxyConfig&) { } - bool THttpRequestProcessors::Execute(const TString& name, THttpRequestContext&& context, + bool THttpRequestProcessors::Execute(const TString&, THttpRequestContext&& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, - const TActorContext& ctx) { + const TActorContext&) { - for (const auto& controller : Controllers) { - auto proc = controller->GetProcessor(name, context); - if (proc.has_value()) { - proc.value()->Execute(std::move(context), std::move(signature), ctx); - return true; - } else { - switch (proc.error()) { - case IHttpController::EError::NotMyProtocol: - continue; - case IHttpController::EError::MethodNotFound: - context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED; - context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name.Quote(); - context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION)); - return false; - } - } + const auto* controller = GetHttpControllerRegistry().GetController(context.ApiVersion, context.ServiceConfig); + if (controller) { + return controller->Execute(std::move(context), std::move(signature)); } - if (name.empty()) { - context.ResponseData.Status = NYdb::EStatus::UNSUPPORTED; - context.ResponseData.ErrorText = TStringBuilder() << "Unknown method name " << name.Quote(); - context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::MISSING_ACTION)); - return false; - } - - context.ResponseData.IsYmq = context.ApiVersion == "AmazonSQS"; - if (context.ResponseData.IsYmq) { - context.ResponseData.UseYmqStatusCode = true; - context.ResponseData.YmqHttpCode = 400; - } else { - context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST; - } - SetApiVersionDisabledErrorText(context); - context.DoReply(ctx); + context.DoReply(THttpResponseData{ + .HttpCode = 404, + .ContentType = MimeTypes::MIME_TEXT, + .Message = "Not Found", + .Body = "Not Found" + }); return false; } @@ -196,110 +106,38 @@ namespace NKikimr::NHttpProxy { return signature; } - void THttpRequestContext::DoReply(const TActorContext& ctx, size_t issueCode) { - auto createResponse = [this](const auto& request, - TStringBuf status, - TStringBuf message, - TStringBuf contentType, - TStringBuf body) { - NHttp::THttpOutgoingResponsePtr response = - new NHttp::THttpOutgoingResponse(request, "HTTP", "1.1", status, message); - response->Set<&NHttp::THttpResponse::Connection>(request->GetConnection()); - response->Set(REQUEST_ID_HEADER_EXT, RequestId); - if (!contentType.empty() && !body.empty()) { - response->Set<&NHttp::THttpResponse::ContentType>(contentType); - if (!request->Endpoint->CompressContentTypes.empty()) { - contentType = NHttp::Trim(contentType.Before(';'), ' '); - if (Count(request->Endpoint->CompressContentTypes, contentType) != 0) { - response->EnableCompression(); - } - } - } + void THttpRequestContext::DoReply(THttpResponseData&& data) { + auto ctx = TlsActivationContext->AsActorContext(); + LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, + "reply with status: " << data.HttpCode << " message: " << data.Message); - if (response->IsNeedBody() || !body.empty()) { - if (request->Method == "HEAD") { - response->Set<&NHttp::THttpResponse::ContentLength>(ToString(body.size())); - } else { - response->SetBody(body); + NHttp::THttpOutgoingResponsePtr response = new NHttp::THttpOutgoingResponse( + Request, + "HTTP", + "1.1", + TStringBuilder() << data.HttpCode, + data.Message + ); + response->Set<&NHttp::THttpResponse::Connection>(Request->GetConnection()); + response->Set(REQUEST_ID_HEADER_EXT, RequestId); + if (!data.Body.empty()) { + const auto contentType = AsAwsContentType(data.ContentType); + response->Set<&NHttp::THttpResponse::ContentType>(contentType); + if (!Request->Endpoint->CompressContentTypes.empty()) { + TStringBuf buffer = contentType; + auto contentType = NHttp::Trim(buffer.Before(';'), ' '); + if (Count(Request->Endpoint->CompressContentTypes, contentType) != 0) { + response->EnableCompression(); } } - return response; - }; - auto strByMimeAws = [](MimeTypes contentType) { - switch (contentType) { - case MIME_JSON: - return "application/x-amz-json-1.1"; - case MIME_CBOR: - return "application/x-amz-cbor-1.1"; - default: - return strByMime(contentType); - } - }; - - if (ResponseData.Status == NYdb::EStatus::SUCCESS) { - LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "reply ok"); - } else { - LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, - "reply with status: " << ResponseData.Status << - " message: " << ResponseData.ErrorText); - ResponseData.Body.SetType(NJson::JSON_MAP); - ResponseData.Body["message"] = ResponseData.ErrorText; - if (ResponseData.UseYmqStatusCode) { - ResponseData.Body["__type"] = ResponseData.YmqStatusCode; - } else { - ResponseData.Body["__type"] = MapToException(ResponseData.Status, MethodName, issueCode).first; - } } - TString errorName; - ui32 httpCode; - if (ResponseData.UseYmqStatusCode) { - httpCode = ResponseData.YmqHttpCode; - errorName = ResponseData.YmqStatusCode; - } else { - std::tie(errorName, httpCode) = MapToException(ResponseData.Status, MethodName, issueCode); - } - auto response = createResponse( - Request, - TStringBuilder() << (ui32)httpCode, - errorName, - strByMimeAws(ContentType), - ResponseData.DumpBody(ContentType) - ); - - if (ResponseData.IsYmq && ServiceConfig.GetHttpConfig().GetYandexCloudMode()) { - // Send request attributes to the metering actor - auto reportRequestAttributes = MakeHolder<NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>(); - - auto& requestAttributes = reportRequestAttributes->Data; - - requestAttributes.HttpStatusCode = httpCode; - requestAttributes.IsFifo = ResponseData.YmqIsFifo; - requestAttributes.FolderId = FolderId; - requestAttributes.RequestSizeInBytes = Request->Size(); - requestAttributes.ResponseSizeInBytes = response->Size(); - requestAttributes.SourceAddress = SourceAddress; - requestAttributes.ResourceId = ResourceId; - requestAttributes.Action = NSQS::ActionFromString(MethodName); - for (const auto& [k, v] : ResponseData.QueueTags) { - requestAttributes.QueueTags[k] = v; + if (response->IsNeedBody() || !data.Body.empty()) { + if (Request->Method == "HEAD") { + response->Set<&NHttp::THttpResponse::ContentLength>(ToString(data.Body.size())); + } else { + response->SetBody(data.Body); } - - LOG_SP_DEBUG_S( - ctx, - NKikimrServices::HTTP_PROXY, - TStringBuilder() << "Send metering event." - << " HttpStatusCode: " << requestAttributes.HttpStatusCode - << " IsFifo: " << requestAttributes.IsFifo - << " FolderId: " << requestAttributes.FolderId - << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes - << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes - << " SourceAddress: " << requestAttributes.SourceAddress - << " ResourceId: " << requestAttributes.ResourceId - << " Action: " << requestAttributes.Action - ); - - ctx.Send(NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release()); } ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); @@ -331,7 +169,7 @@ namespace NKikimr::NHttpProxy { for (const auto& header : headers.Headers) { if (AsciiEqualsIgnoreCase(header.first, IAM_HEADER)) { IamToken = header.second; - } else if(AsciiEqualsIgnoreCase(header.first, SECURITY_TOKEN_HEADER)) { + } else if (AsciiEqualsIgnoreCase(header.first, SECURITY_TOKEN_HEADER)) { SecurityToken = header.second; } else if (AsciiEqualsIgnoreCase(header.first, AUTHORIZATION_HEADER)) { if (header.second.StartsWith("Bearer ")) { @@ -359,98 +197,25 @@ namespace NKikimr::NHttpProxy { RequestId = GenerateRequestId(sourceReqId); } - TString THttpResponseData::DumpBody(MimeTypes contentType) { - // according to https://json.nlohmann.me/features/binary_formats/cbor/#serialization - auto cborBinaryTagBySize = [](size_t size) -> ui8 { - if (size <= 23) { - return 0x40 + static_cast<ui32>(size); - } else if (size <= 255) { - return 0x58; - } else if (size <= 65536) { - return 0x59; - } - - return 0x5A; - }; + TString AsAwsContentType(MimeTypes contentType) { switch (contentType) { - case MIME_CBOR: { - bool gotData = false; - std::function<bool(int, nlohmann::json::parse_event_t, nlohmann::basic_json<>&)> bz = - [&gotData, &cborBinaryTagBySize](int, nlohmann::json::parse_event_t event, nlohmann::json& parsed) { - if (event == nlohmann::json::parse_event_t::key and parsed == nlohmann::json("Data")) { - gotData = true; - return true; - } - if (event == nlohmann::json::parse_event_t::value and gotData) { - gotData = false; - std::string data = parsed.get<std::string>(); - parsed = nlohmann::json::binary({data.begin(), data.end()}, - cborBinaryTagBySize(data.size())); - return true; - } - return true; - }; - - auto toCborStr = NJson::WriteJson(Body, false); - auto json = - nlohmann::json::parse(TStringBuf(toCborStr).begin(), TStringBuf(toCborStr).end(), bz, false); - auto toCbor = nlohmann::json::to_cbor(json); - return {(char*)&toCbor[0], toCbor.size()}; - } - default: { - case MIME_JSON: - return NJson::WriteJson(Body, false); - } - } - } - - void THttpRequestContext::RequestBodyToProto(NProtoBuf::Message* request) { - TStringBuf requestStr = Request->Body; - if (requestStr.empty()) { - throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << - "Empty body"; - } - - // recursive is default setting - if (auto listStreamsRequest = dynamic_cast<Ydb::DataStreams::V1::ListStreamsRequest*>(request)) { - listStreamsRequest->set_recurse(true); - } - - switch (ContentType) { - case MIME_CBOR: { - auto fromCbor = nlohmann::json::from_cbor(requestStr.begin(), requestStr.end(), - true, false, - nlohmann::json::cbor_tag_handler_t::ignore); - if (fromCbor.is_discarded()) { - throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << - "Can not parse request body from CBOR"; - } else { - NlohmannJsonToProto(fromCbor, request); - } - break; - } - case MIME_JSON: { - auto fromJson = nlohmann::json::parse(requestStr, nullptr, false); - if (fromJson.is_discarded()) { - throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << - "Can not parse request body from JSON"; - } else { - NlohmannJsonToProto(fromJson, request); - } - break; - } - default: - throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << - "Unknown ContentType"; + case MIME_JSON: + return "application/x-amz-json-1.1"; + case MIME_CBOR: + return "application/x-amz-cbor-1.1"; + default: + return strByMime(contentType); } } - } // namespace NKikimr::NHttpProxy template <> void Out<NKikimr::NHttpProxy::THttpResponseData>(IOutputStream& o, const NKikimr::NHttpProxy::THttpResponseData& p) { - TString s = TStringBuilder() << "NYdb status: " << std::to_string(static_cast<size_t>(p.Status)) << - ". Body: " << NJson::WriteJson(p.Body) << ". Error text: " << p.ErrorText; + TString s = TStringBuilder() << "NYdb status: " << p.HttpCode << + ". Content type: " << p.ContentType << + ". Message: " << p.Message << + ". Body: " << p.Body; + o.Write(s.data(), s.length()); } diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index 182dc46c462..fa3d5651f11 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -10,8 +10,6 @@ #include <ydb/services/datastreams/codes/datastreams_codes.h> #include <library/cpp/http/server/http.h> -#include <library/cpp/json/json_reader.h> -#include <library/cpp/json/json_value.h> #include <util/stream/output.h> #include <util/string/builder.h> @@ -47,19 +45,11 @@ private: ui32 UsedRetries{0}; }; - struct THttpResponseData { - bool IsYmq = false; - bool UseYmqStatusCode = false; - NYdb::EStatus Status{NYdb::EStatus::SUCCESS}; - NJson::TJsonValue Body; - TString ErrorText{"OK"}; - TString YmqStatusCode; - ui32 YmqHttpCode = 500; - bool YmqIsFifo = false; - THashMap<TString, TString> QueueTags; - - TString DumpBody(MimeTypes contentType); + ui32 HttpCode; + MimeTypes ContentType = MimeTypes::MIME_TEXT; + TString Message; + TString Body; }; struct THttpRequestContext { @@ -74,7 +64,6 @@ struct THttpRequestContext { NYdb::TDriver* Driver; std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider; - THttpResponseData ResponseData; TString ServiceAccountId; TString RequestId; TString DiscoveryEndpoint; @@ -98,9 +87,9 @@ struct THttpRequestContext { } THolder<NKikimr::NSQS::TAwsRequestSignV4> GetSignature(); - void DoReply(const TActorContext& ctx, size_t issueCode = ISSUE_CODE_GENERIC); void ParseHeaders(TStringBuf headers); - void RequestBodyToProto(NProtoBuf::Message* request); + + void DoReply(THttpResponseData&& data); }; class IHttpRequestProcessor { @@ -142,18 +131,29 @@ protected: class IHttpController { public: enum class EError { - NotMyProtocol, - MethodNotFound + MethodNotFound, + ServiceDisabled }; virtual ~IHttpController() = default; - virtual std::expected<IHttpRequestProcessor*, EError> GetProcessor( - const TString& name, - const THttpRequestContext& context + virtual bool Execute( + THttpRequestContext&& context, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature ) const = 0; + + virtual THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const = 0; + + virtual bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const = 0; +}; + +class THttpControllerRegistry { +public: + const IHttpController* GetController(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig& config) const; }; +const THttpControllerRegistry& GetHttpControllerRegistry(); + class THttpRequestProcessors { public: using TService = Ydb::DataStreams::V1::DataStreamsService; @@ -165,11 +165,10 @@ public: bool Execute(const TString& name, THttpRequestContext&& params, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, const TActorContext& ctx); - -private: - const std::vector<std::shared_ptr<const IHttpController>> Controllers; }; +TString AsAwsContentType(MimeTypes contentType); + } // namespace NKikimr::NHttpProxy template <> diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp index aa022488d43..95559991598 100644 --- a/ydb/core/http_proxy/http_service.cpp +++ b/ydb/core/http_proxy/http_service.cpp @@ -1,6 +1,7 @@ -#include "http_req.h" #include "http_service.h" +#include "http_req.h" + #include <ydb/core/protos/config.pb.h> #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/actors/core/events.h> @@ -9,13 +10,15 @@ #include <ydb/library/actors/http/http_proxy.h> #include <ydb/library/http_proxy/error/error.h> -#include <util/string/ascii.h> #include <util/stream/file.h> +#include <util/string/ascii.h> namespace NKikimr::NHttpProxy { using namespace NActors; + TString BuildError(MimeTypes mimeType, HttpCodes httpCode, const TString& errorName, const TString& errorText); + class THttpProxyActor : public NActors::TActorBootstrapped<THttpProxyActor> { using TBase = NActors::TActorBootstrapped<THttpProxyActor>; public: @@ -101,15 +104,18 @@ namespace NKikimr::NHttpProxy { " database [" << context.DatabasePath << "]" << " requestId: " << context.RequestId); + auto contentType = context.ContentType; try { auto signature = context.GetSignature(); auto methodName = context.MethodName; Processors->Execute(std::move(methodName), std::move(context), std::move(signature), ctx); } catch (const NKikimr::NSQS::TSQSException& e) { - context.ResponseData.Status = NYdb::EStatus::BAD_REQUEST; - context.ResponseData.ErrorText = e.what(); - context.DoReply(ctx, static_cast<size_t>(NYds::EErrorCodes::ACCESS_DENIED)); - return; + context.DoReply({ + .HttpCode = HTTP_BAD_REQUEST, + .ContentType = contentType, + .Message = "AccessDeniedException", + .Body = BuildError(contentType, HTTP_BAD_REQUEST, "AccessDeniedException", e.what()) + }); } } @@ -118,3 +124,4 @@ namespace NKikimr::NHttpProxy { } } // namespace NKikimr::NHttpProxy + diff --git a/ydb/core/http_proxy/serialization.cpp b/ydb/core/http_proxy/serialization.cpp new file mode 100644 index 00000000000..29bc1c77a25 --- /dev/null +++ b/ydb/core/http_proxy/serialization.cpp @@ -0,0 +1,91 @@ +#include "serialization.h" +#include "json_proto_conversion.h" + +#include <nlohmann/json.hpp> + + +namespace NKikimr::NHttpProxy { + +void DeserializeCbor(NProtoBuf::Message& message, const TStringBuf& input) { + auto fromCbor = nlohmann::json::from_cbor( + input.begin(), + input.end(), + true, // strict mode + false, // allow exceptions + nlohmann::json::cbor_tag_handler_t::ignore + ); + if (fromCbor.is_discarded()) { + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << + "Can not parse request body from CBOR"; + } else { + NlohmannJsonToProto(fromCbor, &message); + } +} + +void DeserializeJson(NProtoBuf::Message& message, const TStringBuf& input) { + auto fromJson = nlohmann::json::parse(input, nullptr, false); + if (fromJson.is_discarded()) { + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << + "Can not parse request body from JSON"; + } else { + NlohmannJsonToProto(fromJson, &message); + } +} + +TString SerializeCbor(const NProtoBuf::Message& message) { + NJson::TJsonValue result; + ProtoToJson(message, result, true); + return SerializeCbor(result); +} + +TString SerializeJson(const NProtoBuf::Message& message) { + NJson::TJsonValue result; + ProtoToJson(message, result, false); + return SerializeJson(result); +} + +TString SerializeCbor(const NJson::TJsonValue& message) { + // according to https://json.nlohmann.me/features/binary_formats/cbor/#serialization + auto cborBinaryTagBySize = [](size_t size) -> ui8 { + if (size <= 23) { + return 0x40 + static_cast<ui32>(size); + } else if (size <= 255) { + return 0x58; + } else if (size <= 65535) { + return 0x59; + } + + return 0x5A; + }; + + bool gotData = false; + std::function<bool(int, nlohmann::json::parse_event_t, nlohmann::basic_json<>&)> bz = + [&gotData, &cborBinaryTagBySize](int, nlohmann::json::parse_event_t event, nlohmann::json& parsed) { + if (event == nlohmann::json::parse_event_t::key and parsed == nlohmann::json("Data")) { + gotData = true; + return true; + } + if (event == nlohmann::json::parse_event_t::value and gotData) { + gotData = false; + std::string data = parsed.get<std::string>(); + parsed = nlohmann::json::binary({data.begin(), data.end()}, + cborBinaryTagBySize(data.size())); + return true; + } + return true; + }; + + auto toCborStr = NJson::WriteJson(message, false); + TStringBuf toCborBuf(toCborStr); + + auto json = nlohmann::json::parse(toCborBuf.begin(), toCborBuf.end(), bz, false); + auto toCbor = nlohmann::json::to_cbor(json); + + return {(char*)&toCbor[0], toCbor.size()}; +} + +TString SerializeJson(const NJson::TJsonValue& message) { + return NJson::WriteJson(message, false); +} + +} // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/serialization.h b/ydb/core/http_proxy/serialization.h new file mode 100644 index 00000000000..5196e4c02f7 --- /dev/null +++ b/ydb/core/http_proxy/serialization.h @@ -0,0 +1,18 @@ +#pragma once + +#include <contrib/libs/protobuf/src/google/protobuf/message.h> +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/mime/types/mime.h> + +namespace NKikimr::NHttpProxy { + +void DeserializeCbor(NProtoBuf::Message& message, const TStringBuf& input); +void DeserializeJson(NProtoBuf::Message& message, const TStringBuf& input); + +TString SerializeCbor(const NProtoBuf::Message& message); +TString SerializeJson(const NProtoBuf::Message& message); + +TString SerializeCbor(const NJson::TJsonValue& message); +TString SerializeJson(const NJson::TJsonValue& message); + +} // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/sqs.cpp b/ydb/core/http_proxy/sqs.cpp index 481ab381898..ba2b6642af6 100644 --- a/ydb/core/http_proxy/sqs.cpp +++ b/ydb/core/http_proxy/sqs.cpp @@ -1,8 +1,11 @@ +#include "sqs.h" + #include "auth_factory.h" +#include "controller_base.h" #include "custom_metrics.h" #include "exceptions_mapping.h" #include "http_req.h" -#include "json_proto_conversion.h" +#include "sqs_serialization.h" #include "utils.h" #include <ydb/core/base/appdata.h> @@ -26,8 +29,6 @@ #include <yql/essentials/public/issue/yql_issue_message.h> -#include <util/string/cast.h> - namespace NKikimr::NHttpProxy { namespace { @@ -51,7 +52,7 @@ namespace NKikimr::NHttpProxy { return std::unexpected(std::move(parsedQueueUrl).error()); } return parsedQueueUrl->Database; - } + } template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall, class TRpcEv> class TSqsTopicHttpRequestProcessor : public TBaseHttpRequestProcessor<TProtoService, TProtoRequest, TProtoResponse, TProtoResult, TProtoCall, TRpcEv>{ @@ -171,16 +172,25 @@ namespace NKikimr::NHttpProxy { } void ReplyWithYdbError(const TActorContext& ctx, NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) { - HttpContext.ResponseData.Status = status; - HttpContext.ResponseData.ErrorText = errorText; + const auto [errorName, httpCode] = MapToException(status, Method, issueCode); + ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, AddCommonLabels({ - {"code", TStringBuilder() << (int)MapToException(status, Method, issueCode).second}, + {"code", TStringBuilder() << (int)httpCode}, {"name", "api.sqs.response.count"}, })}); - ReplyToHttpContext(ctx, 0, issueCode); + + ReplyToHttpContext({ + .HttpCode = httpCode, + .ContentType = HttpContext.ContentType, + .Message = errorName, + .Body = NSQS::Serialize(HttpContext.ContentType, { + .StatusCode = errorName, + .ErrorText = errorText, + }) + }, errorText.size(), errorText); ctx.Send(AuthActor, new TEvents::TEvPoisonPill()); @@ -192,12 +202,7 @@ namespace NKikimr::NHttpProxy { ui32 httpStatusCode, const TString& ymqStatusCode, const TString& errorText) { - HttpContext.ResponseData.IsYmq = false; - HttpContext.ResponseData.UseYmqStatusCode = true; - HttpContext.ResponseData.Status = NYdb::EStatus::STATUS_UNDEFINED; - HttpContext.ResponseData.YmqHttpCode = httpStatusCode; - HttpContext.ResponseData.YmqStatusCode = ymqStatusCode; - HttpContext.ResponseData.ErrorText = errorText; + ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, @@ -205,32 +210,38 @@ namespace NKikimr::NHttpProxy { {"code", ToString(httpStatusCode)}, {"name", "api.sqs.response.count"}, })}); - ReplyToHttpContext(ctx, errorText.size(), std::nullopt); + + ReplyToHttpContext({ + .HttpCode = httpStatusCode, + .ContentType = HttpContext.ContentType, + .Message = ymqStatusCode, + .Body = NSQS::Serialize(HttpContext.ContentType, { + .StatusCode = ymqStatusCode, + .ErrorText = errorText, + }) + }, errorText.size(), errorText); ctx.Send(AuthActor, new TEvents::TEvPoisonPill()); TBase::Die(ctx); } - void ReplyToHttpContext(const TActorContext& ctx, size_t messageSize, std::optional<size_t> issueCode) { + void ReplyToHttpContext(THttpResponseData&& data, size_t messageSize, TStringBuf errorText = "") { + const TActorContext& ctx = TlsActivationContext->AsActorContext(); + ReportLatencyCounters(ctx); - ReportResponseSizeCounters(TStringBuilder() << HttpContext.ResponseData.YmqHttpCode, messageSize, ctx); - LogHttpRequestResponse(ctx); + ReportResponseSizeCounters(TStringBuilder() << data.HttpCode, messageSize, ctx); + LogHttpRequestResponse(ctx, data.HttpCode, errorText); - if (issueCode.has_value()) { - HttpContext.DoReply(ctx, issueCode.value()); - } else { - HttpContext.DoReply(ctx); - } + HttpContext.DoReply(std::move(data)); } - void LogHttpRequestResponse(const TActorContext& ctx) { - const int httpCode = HttpContext.ResponseData.UseYmqStatusCode ? HttpContext.ResponseData.YmqHttpCode : 200; + void LogHttpRequestResponse(const TActorContext& ctx, int httpCode, TStringBuf errorText) { const bool isServerError = IsServerError(httpCode); auto priority = isServerError ? NActors::NLog::PRI_WARN : NActors::NLog::PRI_INFO; LOG_LOG_S_SAMPLED_BY(ctx, priority, NKikimrServices::SQS, NSqsTopic::SampleIdFromRequestId(HttpContext.RequestId), - "Request [" << HttpContext.RequestId << "] " << LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "SqsTopic", TopicPath, Method, UserSid_, httpCode, HttpContext.ResponseData.ErrorText)); + "Request [" << HttpContext.RequestId << "] " << LogHttpRequestResponseCommonInfoString(HttpContext, StartTime, "SqsTopic", TopicPath, Method, UserSid_, httpCode, errorText)); } void ReportInputCounters(const TActorContext& ctx) { @@ -265,17 +276,23 @@ namespace NKikimr::NHttpProxy { void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev, const TActorContext& ctx) { if (ev->Get()->Status->IsSuccess()) { - ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body, - HttpContext.ContentType == MIME_CBOR); FillOutputCustomMetrics<TProtoResult>( - *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); + *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), + HttpContext, + ctx + ); ctx.Send(MakeMetricsServiceID(), new TEvServerlessProxy::TEvCounter{ 1, true, true, AddCommonLabels({ {"code", "200"}, {"name", "api.sqs.response.count"}})}); - ReplyToHttpContext(ctx, ev->Get()->Message->ByteSizeLong(), std::nullopt); + ReplyToHttpContext({ + .HttpCode = 200, + .ContentType = HttpContext.ContentType, + .Message = "", + .Body = NSQS::Serialize(HttpContext.ContentType, *ev->Get()->Message) + }, ev->Get()->Message->ByteSizeLong()); } else { auto retryClass = NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus()); @@ -299,8 +316,8 @@ namespace NKikimr::NHttpProxy { auto issues = ev->Get()->Status->GetIssues(); auto [error, errorCode] = issues.Empty() ? std::make_tuple( - NSQS::NErrors::INTERNAL_FAILURE.ErrorCode, - NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode) + NKikimr::NSQS::NErrors::INTERNAL_FAILURE.ErrorCode, + NKikimr::NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode) : NKikimr::NSQS::TErrorClass::GetErrorAndCode(issues.begin()->GetCode()); LOG_SP_DEBUG_S( @@ -313,7 +330,7 @@ namespace NKikimr::NHttpProxy { ctx, errorCode, error, - TString{!issues.Empty() ? issues.begin()->GetMessage() : NSQS::NErrors::INTERNAL_FAILURE.ErrorCode} + TString{!issues.Empty() ? issues.begin()->GetMessage() : NKikimr::NSQS::NErrors::INTERNAL_FAILURE.ErrorCode} ); } } @@ -343,7 +360,7 @@ namespace NKikimr::NHttpProxy { void Bootstrap(const TActorContext& ctx) { StartTime = ctx.Now(); try { - HttpContext.RequestBodyToProto(&Request); + NSQS::Deserialize<TProtoRequest>(HttpContext.ContentType, Request, HttpContext.Request->Body); } catch (const NKikimr::NSQS::TSQSException& e) { NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; if (e.ErrorClass.ErrorCode == "MissingParameter") @@ -388,9 +405,9 @@ namespace NKikimr::NHttpProxy { if (AppData(ctx)->EnforceUserTokenRequirement || AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { return ReplyWithMessageQueueError( ctx, - NSQS::NErrors::INCOMPLETE_SIGNATURE.HttpStatusCode, - NSQS::NErrors::INCOMPLETE_SIGNATURE.ErrorCode, - NSQS::NErrors::INCOMPLETE_SIGNATURE.DefaultMessage); + NKikimr::NSQS::NErrors::INCOMPLETE_SIGNATURE.HttpStatusCode, + NKikimr::NSQS::NErrors::INCOMPLETE_SIGNATURE.ErrorCode, + NKikimr::NSQS::NErrors::INCOMPLETE_SIGNATURE.DefaultMessage); } SendGrpcRequestNoDriver(ctx); } @@ -422,31 +439,33 @@ namespace NKikimr::NHttpProxy { }; }; - class TController : public IHttpController { + class TController : public TBaseHttpController { public: TController() { - #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = MakeHolder<TSqsTopicHttpRequestProcessor< \ - Ydb::SqsTopic::V1::SqsTopicService, \ - Ydb::Ymq::V1::name##Request, \ - Ydb::Ymq::V1::name##Response, \ - Ydb::Ymq::V1::name##Result, \ - decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \ - NKikimr::NGRpcService::TEvSqsTopic##name##Request>> \ - (#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name) + #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = \ + std::make_unique<TSqsTopicHttpRequestProcessor< \ + Ydb::SqsTopic::V1::SqsTopicService, \ + Ydb::Ymq::V1::name##Request, \ + Ydb::Ymq::V1::name##Response, \ + Ydb::Ymq::V1::name##Result, \ + decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \ + NKikimr::NGRpcService::TEvSqsTopic##name##Request \ + >>(#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name) \ DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(GetQueueUrl); DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN(ListQueues); #undef DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_UNKNOWN - #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = MakeHolder<TSqsTopicHttpRequestProcessor< \ - Ydb::SqsTopic::V1::SqsTopicService, \ - Ydb::Ymq::V1::name##Request, \ - Ydb::Ymq::V1::name##Response, \ - Ydb::Ymq::V1::name##Result, \ - decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \ - NKikimr::NGRpcService::TEvSqsTopic##name##Request>> \ - (#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name) + #define DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = \ + std::make_unique<TSqsTopicHttpRequestProcessor< \ + Ydb::SqsTopic::V1::SqsTopicService, \ + Ydb::Ymq::V1::name##Request, \ + Ydb::Ymq::V1::name##Response, \ + Ydb::Ymq::V1::name##Result, \ + decltype(&Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name), \ + NKikimr::NGRpcService::TEvSqsTopic##name##Request \ + >>(#name, &Ydb::SqsTopic::V1::SqsTopicService::Stub::AsyncSqsTopic##name) \ DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(CreateQueue); DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN(DeleteMessage); @@ -463,33 +482,36 @@ namespace NKikimr::NHttpProxy { #undef DECLARE_SQS_TOPIC_PROCESSOR_QUEUE_KNOWN } - - std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor( - const TString& name, - const THttpRequestContext& context - ) const override { - if (context.ApiVersion != "AmazonSQS") { - return std::unexpected(IHttpController::EError::NotMyProtocol); - } - if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) { - return std::expected<IHttpRequestProcessor*, IHttpController::EError>(proc->second.Get()); - } + THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override { + const auto [errorName, httpCode] = MapToException(Status, "", issueCode); + return { + .HttpCode = httpCode, + .ContentType = contentType, + .Message = errorName, + .Body = NSQS::Serialize(contentType, NSQS::TErrorResponse{ + .StatusCode = errorName, + .ErrorText = TString(message), + }) + }; + } + + bool IsEnabled(const NKikimrConfig::THttpProxyConfig& config) const override { + return config.GetSqsTopicEnabled(); + } - return std::unexpected(IHttpController::EError::MethodNotFound); + bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig&) const override { + return apiVersion == "AmazonSQS"; } - - private: - THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor; }; + TController ControllerInstance; + } // namespace - std::shared_ptr<const IHttpController> CreateSqsHttpController(const NKikimrConfig::TServerlessProxyConfig& config) { - if (config.GetHttpConfig().GetSqsTopicEnabled()) { - return std::make_shared<TController>(); - } - return {}; + const IHttpController* GetSqsHttpController() { + return &ControllerInstance; } } // namespace NKikimr::NHttpProxy + diff --git a/ydb/core/http_proxy/sqs.h b/ydb/core/http_proxy/sqs.h index 0d681501bd3..f777e7a0e6b 100644 --- a/ydb/core/http_proxy/sqs.h +++ b/ydb/core/http_proxy/sqs.h @@ -4,6 +4,6 @@ namespace NKikimr::NHttpProxy { -std::shared_ptr<const IHttpController> CreateSqsHttpController(const NKikimrConfig::TServerlessProxyConfig& config); +const IHttpController* GetSqsHttpController(); } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/sqs_serialization.cpp b/ydb/core/http_proxy/sqs_serialization.cpp new file mode 100644 index 00000000000..8086d19c9b1 --- /dev/null +++ b/ydb/core/http_proxy/sqs_serialization.cpp @@ -0,0 +1,36 @@ +#include "sqs_serialization.h" + +namespace NKikimr::NHttpProxy::NSQS { + + TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value) { + switch (mimeType) { + case MIME_XML: + // TODO implement + Y_ENSURE(false); + case MIME_CBOR: + return SerializeCbor(value); + case MIME_JSON: + [[fallthrough]]; + default: + return SerializeJson(value); + } + } + + + TString Serialize(const MimeTypes mimeType, TErrorResponse&& value) { + NJson::TJsonValue json; + json.SetType(NJson::JSON_MAP); + json["message"] = value.ErrorText; + json["__type"] = value.StatusCode; + + switch (mimeType) { + case MIME_CBOR: + return SerializeCbor(json); + case MIME_JSON: + [[fallthrough]]; + default: + return SerializeJson(json); + } + } + +} // namespace NKikimr::NHttpProxy::NSQS diff --git a/ydb/core/http_proxy/sqs_serialization.h b/ydb/core/http_proxy/sqs_serialization.h new file mode 100644 index 00000000000..75594f43126 --- /dev/null +++ b/ydb/core/http_proxy/sqs_serialization.h @@ -0,0 +1,45 @@ +#pragma once + +#include "serialization.h" + +#include <ydb/library/http_proxy/error/error.h> + +namespace NKikimr::NHttpProxy::NSQS { + + template<typename TValue> + void PrepareValue(TValue& value) { + Y_UNUSED(value); + } + + template<typename TValue> + void Deserialize(const MimeTypes mimeType, TValue& value, const TStringBuf& input) + requires std::is_base_of_v<NProtoBuf::Message, TValue> { + + if (input.empty()) { + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Empty body"; + } + + switch (mimeType) { + case MIME_CBOR: + PrepareValue(value); + DeserializeCbor(value, input); + break; + case MIME_JSON: + PrepareValue(value); + DeserializeJson(value, input); + break; + default: + throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << + "Unknown ContentType"; + } + }; + + TString Serialize(const MimeTypes mimeType, const NProtoBuf::Message& value); + + struct TErrorResponse { + TString StatusCode; + TString ErrorText; + }; + TString Serialize(const MimeTypes mimeType, TErrorResponse&& value); + +} // namespace NKikimr::NHttpProxy::NSQS diff --git a/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp b/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp index 337fbcb4ee3..0905db1a478 100644 --- a/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp +++ b/ydb/core/http_proxy/ut/datastreams_fixture/datastreams_fixture.cpp @@ -807,7 +807,7 @@ void THttpProxyTestMock::InitHttpServer(bool yandexCloudMode, bool enableSqsTopi config.SetTestMode(true); config.MutableHttpConfig()->SetPort(HttpServicePort); config.MutableHttpConfig()->SetYandexCloudMode(yandexCloudMode); - config.MutableHttpConfig()->SetYmqEnabled(true); + config.MutableHttpConfig()->SetYmqEnabled(!enableSqsTopic); config.MutableHttpConfig()->SetSqsTopicEnabled(enableSqsTopic); SqsTopicMode = enableSqsTopic; diff --git a/ydb/core/http_proxy/ut/kinesis_ut.cpp b/ydb/core/http_proxy/ut/kinesis_ut.cpp index a31f221a8a9..8471f456b18 100644 --- a/ydb/core/http_proxy/ut/kinesis_ut.cpp +++ b/ydb/core/http_proxy/ut/kinesis_ut.cpp @@ -190,22 +190,22 @@ Y_UNIT_TEST_SUITE(TestKinesisHttpProxy) { request = CreateDescribeStreamRequest(); request["StreamName"] = "teststream"; res = SendHttpRequest("/Root", ".", request, FormAuthorizationStr("ru-central-1")); - UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400); - UNIT_ASSERT_VALUES_EQUAL(res.Description, "MissingAction"); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 404); + UNIT_ASSERT_VALUES_EQUAL(res.Description, "Not Found"); } { request = CreateDescribeStreamRequest(); request["StreamName"] = "teststream"; res = SendHttpRequest("/Root", "", request, FormAuthorizationStr("ru-central-1")); - UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400); - UNIT_ASSERT_VALUES_EQUAL(res.Description, "MissingAction"); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 404); + UNIT_ASSERT_VALUES_EQUAL(res.Description, "Not Found"); } { request = CreateDescribeStreamRequest(); request["StreamName"] = "teststream"; res = SendHttpRequest("/Root", ".DescribeStream", request, FormAuthorizationStr("ru-central-1")); - UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 400); - UNIT_ASSERT_VALUES_EQUAL(res.Description, "MissingAction"); + UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, 404); + UNIT_ASSERT_VALUES_EQUAL(res.Description, "Not Found"); } } diff --git a/ydb/core/http_proxy/utils.cpp b/ydb/core/http_proxy/utils.cpp index 90496640dfc..2519b0556de 100644 --- a/ydb/core/http_proxy/utils.cpp +++ b/ydb/core/http_proxy/utils.cpp @@ -1,6 +1,7 @@ -#include "exceptions_mapping.h" #include "utils.h" +#include "http_req.h" + namespace NKikimr::NHttpProxy { TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode) { diff --git a/ydb/core/http_proxy/utils.h b/ydb/core/http_proxy/utils.h index 119d82d75d4..0ba7b49ba10 100644 --- a/ydb/core/http_proxy/utils.h +++ b/ydb/core/http_proxy/utils.h @@ -1,13 +1,17 @@ #pragma once #include "exceptions_mapping.h" -#include "http_req.h" + +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/types/status_codes.h> #include <util/datetime/base.h> namespace NKikimr::NHttpProxy { -TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode = ISSUE_CODE_ERROR); +struct THttpRequestContext; + +TException MapToException(NYdb::EStatus status, const TString& method, size_t issueCode); + TString LogHttpRequestResponseCommonInfoString(const THttpRequestContext& httpContext, TInstant startTime, TStringBuf api, TStringBuf topicPath, TStringBuf method, TStringBuf userSid, int httpCode, TStringBuf httpResponseMessage); } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/ya.make b/ydb/core/http_proxy/ya.make index 6b07d755c78..2ac9615485e 100644 --- a/ydb/core/http_proxy/ya.make +++ b/ydb/core/http_proxy/ya.make @@ -9,8 +9,14 @@ SRCS( auth_factory.h auth_actors.cpp auth_actors.h + controller_base.cpp + controller_base.h + controller_registry.cpp custom_metrics.h datastreams.cpp + datastreams.h + datastreams_serialization.cpp + datastreams_serialization.h discovery_actor.cpp discovery_actor.h events.h @@ -26,9 +32,16 @@ SRCS( json_proto_conversion.cpp metrics_actor.cpp metrics_actor.h + serialization.cpp + serialization.h sqs.cpp + sqs.h + sqs_serialization.cpp + sqs_serialization.h utils.cpp + utils.h ymq.cpp + ymq.h ) PEERDIR( diff --git a/ydb/core/http_proxy/ymq.cpp b/ydb/core/http_proxy/ymq.cpp index f57753f998f..63c774365c2 100644 --- a/ydb/core/http_proxy/ymq.cpp +++ b/ydb/core/http_proxy/ymq.cpp @@ -1,5 +1,9 @@ +#include "ymq.h" + +#include "controller_base.h" #include "http_req.h" -#include "json_proto_conversion.h" +#include "sqs_serialization.h" +#include "utils.h" #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/core/ymq/actor/auth_multi_factory.h> @@ -18,6 +22,9 @@ #include <yql/essentials/public/issue/yql_issue_message.h> +#include <expected> +#include <functional> + namespace NKikimr::NHttpProxy { namespace { @@ -144,10 +151,19 @@ namespace NKikimr::NHttpProxy { NYdb::EStatus status, const TString& errorText, size_t issueCode = ISSUE_CODE_GENERIC) { - HttpContext.ResponseData.Status = status; - HttpContext.ResponseData.ErrorText = errorText; - ReplyToHttpContext(ctx, issueCode); + const auto [errorName, httpCode] = MapToException(status, Method, issueCode); + + ReplyToHttpContext({ + .HttpCode = httpCode, + .ContentType = HttpContext.ContentType, + .Message = errorName, + .Body = NSQS::Serialize(HttpContext.ContentType, { + .StatusCode = errorName, + .ErrorText = errorText, + }) + }, {}); + ctx.Send(AuthActor, new TEvents::TEvPoisonPill()); @@ -159,28 +175,69 @@ namespace NKikimr::NHttpProxy { ui32 httpStatusCode, const TString& ymqStatusCode, const TString& errorText) { - HttpContext.ResponseData.IsYmq = true; - HttpContext.ResponseData.UseYmqStatusCode = true; - HttpContext.ResponseData.Status = NYdb::EStatus::STATUS_UNDEFINED; - HttpContext.ResponseData.YmqHttpCode = httpStatusCode; - HttpContext.ResponseData.YmqStatusCode = ymqStatusCode; - HttpContext.ResponseData.ErrorText = errorText; - ReplyToHttpContext(ctx); + ReplyToHttpContext({ + .HttpCode = httpStatusCode, + .ContentType = HttpContext.ContentType, + .Message = ymqStatusCode, + .Body = NSQS::Serialize(HttpContext.ContentType, { + .StatusCode = ymqStatusCode, + .ErrorText = errorText, + }) + }, {}); ctx.Send(AuthActor, new TEvents::TEvPoisonPill()); TBase::Die(ctx); } - void ReplyToHttpContext(const TActorContext& ctx, std::optional<size_t> issueCode = std::nullopt) { - if (issueCode.has_value()) { - HttpContext.DoReply(ctx, issueCode.value()); - } else { - HttpContext.DoReply(ctx); + void DoMetering(const THttpResponseData& data, THolder<THashMap<TString, TString>>&& queueTags, const TActorContext& ctx) { + if (HttpContext.ServiceConfig.GetHttpConfig().GetYandexCloudMode()) { + // Send request attributes to the metering actor + auto reportRequestAttributes = MakeHolder<::NKikimr::NSQS::TSqsEvents::TEvReportProcessedRequestAttributes>(); + + auto& requestAttributes = reportRequestAttributes->Data; + + requestAttributes.HttpStatusCode = data.HttpCode; + requestAttributes.IsFifo = IsFifo; + requestAttributes.FolderId = FolderId; + requestAttributes.RequestSizeInBytes = HttpContext.Request->Size(); + requestAttributes.ResponseSizeInBytes = data.Body.size(); + requestAttributes.SourceAddress = HttpContext.SourceAddress; + requestAttributes.ResourceId = ResourceId; + requestAttributes.Action = ::NKikimr::NSQS::ActionFromString(HttpContext.MethodName); + if (queueTags) { + for (auto&& [k, v] : *queueTags) { + requestAttributes.QueueTags[std::move(k)] = std::move(v); + } + } + + LOG_SP_DEBUG_S( + ctx, + NKikimrServices::HTTP_PROXY, + TStringBuilder() << "Send metering event." + << " HttpStatusCode: " << requestAttributes.HttpStatusCode + << " IsFifo: " << requestAttributes.IsFifo + << " FolderId: " << requestAttributes.FolderId + << " RequestSizeInBytes: " << requestAttributes.RequestSizeInBytes + << " ResponseSizeInBytes: " << requestAttributes.ResponseSizeInBytes + << " SourceAddress: " << requestAttributes.SourceAddress + << " ResourceId: " << requestAttributes.ResourceId + << " Action: " << requestAttributes.Action + ); + + ctx.Send(::NKikimr::NSQS::MakeSqsMeteringServiceID(), reportRequestAttributes.Release()); } } + void ReplyToHttpContext(THttpResponseData&& data, THolder<THashMap<TString, TString>>&& queueTags) { + const TActorContext& ctx = TlsActivationContext->AsActorContext(); + + DoMetering(data, std::move(queueTags), ctx); + + HttpContext.DoReply(std::move(data)); + } + void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev, const TActorContext& ctx) { if (ev->Get()->Status->IsSuccess()) { @@ -189,18 +246,15 @@ namespace NKikimr::NHttpProxy { NKikimrServices::HTTP_PROXY, "Got succesfult GRPC response." ); - ProtoToJson( - *ev->Get()->Message, - HttpContext.ResponseData.Body, - HttpContext.ContentType == MIME_CBOR + + ReplyToHttpContext({ + .HttpCode = 200, + .ContentType = HttpContext.ContentType, + .Message = "", + .Body = NSQS::Serialize(HttpContext.ContentType, *ev->Get()->Message) + }, + std::move(ev->Get()->QueueTags) ); - HttpContext.ResponseData.IsYmq = true; - HttpContext.ResponseData.UseYmqStatusCode = true; - HttpContext.ResponseData.YmqHttpCode = 200; - if (ev->Get()->QueueTags) { - HttpContext.ResponseData.QueueTags = std::move(*ev->Get()->QueueTags); - } - ReplyToHttpContext(ctx); } else { auto retryClass = NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus()); @@ -227,8 +281,8 @@ namespace NKikimr::NHttpProxy { auto issues = ev->Get()->Status->GetIssues(); auto errorAndCode = issues.Empty() ? std::make_tuple( - NSQS::NErrors::INTERNAL_FAILURE.ErrorCode, - NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode) + NKikimr::NSQS::NErrors::INTERNAL_FAILURE.ErrorCode, + NKikimr::NSQS::NErrors::INTERNAL_FAILURE.HttpStatusCode) : NKikimr::NSQS::TErrorClass::GetErrorAndCode(issues.begin()->GetCode()); LOG_SP_DEBUG_S( @@ -292,7 +346,7 @@ namespace NKikimr::NHttpProxy { PoolId = ctx.SelfID.PoolID(); StartTime = ctx.Now(); try { - HttpContext.RequestBodyToProto(&Request); + NSQS::Deserialize<TProtoRequest>(HttpContext.ContentType, Request, HttpContext.Request->Body); auto queueUrl = QueueUrlExtractor(Request); if (!queueUrl.empty()) { auto cloudIdAndResourceId = NKikimr::NYmq::CloudIdAndResourceIdFromQueueUrl(queueUrl); @@ -301,7 +355,7 @@ namespace NKikimr::NHttpProxy { } CloudId = cloudIdAndResourceId.first; HttpContext.ResourceId = ResourceId = cloudIdAndResourceId.second; - HttpContext.ResponseData.YmqIsFifo = AsciiHasSuffixIgnoreCase(queueUrl, ".fifo"); + IsFifo = AsciiHasSuffixIgnoreCase(queueUrl, ".fifo"); } } catch (const NKikimr::NSQS::TSQSException& e) { NYds::EErrorCodes issueCode = NYds::EErrorCodes::OK; @@ -337,10 +391,10 @@ namespace NKikimr::NHttpProxy { } else { auto requestHolder = MakeHolder<NKikimrClient::TSqsRequest>(); - NSQS::EAction action = NSQS::ActionFromString(Method); + NKikimr::NSQS::EAction action = NKikimr::NSQS::ActionFromString(Method); requestHolder->SetRequestId(HttpContext.RequestId); - NSQS::TAuthActorData data { + NKikimr::NSQS::TAuthActorData data { .SQSRequest = std::move(requestHolder), .UserSidCallback = [](const TString& userSid) { Y_UNUSED(userSid); }, .EnableQueueLeader = true, @@ -352,7 +406,7 @@ namespace NKikimr::NHttpProxy { .AWSSignature = std::move(HttpContext.GetSignature()), .IAMToken = HttpContext.IamToken, .FolderID = HttpContext.FolderId, - .RequestFormat = NSQS::TAuthActorData::Json, + .RequestFormat = NKikimr::NSQS::TAuthActorData::Json, .Requester = ctx.SelfID }; @@ -386,22 +440,25 @@ namespace NKikimr::NHttpProxy { TString CloudId; TString ResourceId; TString UserSid; + bool IsFifo = false; }; std::function<TString(TProtoRequest&)> QueueUrlExtractor; }; - class TController : public IHttpController { + class TController: public TBaseHttpController { public: TController() { - #define DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = MakeHolder<TYmqHttpRequestProcessor< \ - Ydb::Ymq::V1::YmqService, \ - Ydb::Ymq::V1::name##Request, \ - Ydb::Ymq::V1::name##Response, \ - Ydb::Ymq::V1::name##Result, \ - decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \ - NKikimr::NGRpcService::TEvYmq##name##Request>> \ - (#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request&){return "";}); + + #define DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(name) Name2Processor[#name] = \ + std::make_unique<TYmqHttpRequestProcessor< \ + Ydb::Ymq::V1::YmqService, \ + Ydb::Ymq::V1::name##Request, \ + Ydb::Ymq::V1::name##Response, \ + Ydb::Ymq::V1::name##Result, \ + decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \ + NKikimr::NGRpcService::TEvYmq##name##Request \ + >>(#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request&){return "";}) DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(GetQueueUrl); DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN(CreateQueue); @@ -409,14 +466,18 @@ namespace NKikimr::NHttpProxy { #undef DECLARE_YMQ_PROCESSOR_QUEUE_UNKNOWN - #define DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = MakeHolder<TYmqHttpRequestProcessor< \ - Ydb::Ymq::V1::YmqService, \ - Ydb::Ymq::V1::name##Request, \ - Ydb::Ymq::V1::name##Response, \ - Ydb::Ymq::V1::name##Result,\ - decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \ - NKikimr::NGRpcService::TEvYmq##name##Request>> \ - (#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request& request){return request.Getqueue_url();}); + #define DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(name) Name2Processor[#name] = \ + std::make_unique<TYmqHttpRequestProcessor< \ + Ydb::Ymq::V1::YmqService, \ + Ydb::Ymq::V1::name##Request, \ + Ydb::Ymq::V1::name##Response, \ + Ydb::Ymq::V1::name##Result, \ + decltype(&Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name), \ + NKikimr::NGRpcService::TEvYmq##name##Request \ + >>(#name, &Ydb::Ymq::V1::YmqService::Stub::AsyncYmq##name, [](Ydb::Ymq::V1::name##Request& request){ \ + return request.Getqueue_url(); \ + }) + DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(SendMessage); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ReceiveMessage); DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(GetQueueAttributes); @@ -434,34 +495,38 @@ namespace NKikimr::NHttpProxy { DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(UntagQueue); #undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN + } - std::expected<IHttpRequestProcessor*, IHttpController::EError> GetProcessor( - const TString& name, - const THttpRequestContext& context - ) const override { - if (context.ApiVersion != "AmazonSQS") { - return std::unexpected(IHttpController::EError::NotMyProtocol); - } + THttpResponseData MakeError(MimeTypes contentType, NYdb::EStatus Status, const TStringBuf message, size_t issueCode) const override { + const auto [errorName, httpCode] = MapToException(Status, "", issueCode); + return { + .HttpCode = httpCode, + .ContentType = contentType, + .Message = errorName, + .Body = NSQS::Serialize(contentType, NSQS::TErrorResponse{ + .StatusCode = errorName, + .ErrorText = TString(message), + }) + }; + } - if (auto proc = Name2Processor.find(name); proc != Name2Processor.end()) { - return std::expected<IHttpRequestProcessor*, IHttpController::EError>(proc->second.Get()); - } + bool IsEnabled(const NKikimrConfig::THttpProxyConfig& config) const override { + return config.GetYmqEnabled(); + } - return std::unexpected(IHttpController::EError::MethodNotFound); + bool IsPossible(const TStringBuf apiVersion, const NKikimrConfig::TServerlessProxyConfig&) const override { + return apiVersion == "AmazonSQS"; } + }; - private: - THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor; - }; + TController ControllerInstance; } // namespace - std::shared_ptr<const IHttpController> CreateYmqHttpController(const NKikimrConfig::TServerlessProxyConfig& config) { - if (config.GetHttpConfig().GetYmqEnabled()) { - return std::make_shared<TController>(); - } - return {}; + const IHttpController* GetYmqHttpController() { + return &ControllerInstance; } } // namespace NKikimr::NHttpProxy + diff --git a/ydb/core/http_proxy/ymq.h b/ydb/core/http_proxy/ymq.h index 67da4e08466..5eab412fd37 100644 --- a/ydb/core/http_proxy/ymq.h +++ b/ydb/core/http_proxy/ymq.h @@ -4,6 +4,6 @@ namespace NKikimr::NHttpProxy { -std::shared_ptr<const IHttpController> CreateYmqHttpController(const NKikimrConfig::TServerlessProxyConfig& config); +const IHttpController* GetYmqHttpController(); } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp b/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp index 60fadd1c2b7..6818a8cfc69 100644 --- a/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp +++ b/ydb/core/kqp/ut/discovery/kqp_discovery_ut.cpp @@ -1,6 +1,7 @@ #include <ydb/core/discovery/discovery.h> #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/public/api/grpc/ydb_discovery_v1.grpc.pb.h> #include <library/cpp/testing/unittest/registar.h> |
