diff options
author | mokhotskii <[email protected]> | 2022-07-11 14:13:11 +0300 |
---|---|---|
committer | mokhotskii <[email protected]> | 2022-07-11 14:13:11 +0300 |
commit | 6a76a0b79c7482c76d839ca78c627bb53d8a222b (patch) | |
tree | 5f3813ab4d8a4efd72bda714f9076abe0a914233 | |
parent | 285021ab1aac39e84b269d9bacd4deee69cf63fc (diff) |
Refactor http proxy code
Refactor http proxy
- introduce HttpRequestContext::ContentType field
- switch to builtin MimeType and related functions
- move json-proto convertion to a separate file
- other minor transitions of code and readability improvements
-rw-r--r-- | ydb/core/http_proxy/CMakeLists.txt | 8 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 396 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.h | 45 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_service.cpp | 152 | ||||
-rw-r--r-- | ydb/core/http_proxy/json_proto_conversion.h | 279 |
5 files changed, 477 insertions, 403 deletions
diff --git a/ydb/core/http_proxy/CMakeLists.txt b/ydb/core/http_proxy/CMakeLists.txt index bfa39ff495e..d9162f71943 100644 --- a/ydb/core/http_proxy/CMakeLists.txt +++ b/ydb/core/http_proxy/CMakeLists.txt @@ -29,10 +29,10 @@ target_link_libraries(ydb-core-http_proxy PUBLIC ydb-services-datastreams ) target_sources(ydb-core-http_proxy PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/metrics_actor.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/discovery_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/grpc_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_req.cpp ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/http_service.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/auth_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/http_proxy/metrics_actor.cpp ) diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 5c5569980fa..3026a2f8bfb 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -1,33 +1,30 @@ #include "events.h" #include "http_req.h" #include "auth_factory.h" +#include "json_proto_conversion.h" -#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> -#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> +#include <library/cpp/actors/http/http_proxy.h> +#include <library/cpp/cgiparam/cgiparam.h> +#include <library/cpp/http/misc/parsed_request.h> +#include <library/cpp/http/server/response.h> + +#include <ydb/core/base/appdata.h> #include <ydb/core/grpc_caching/cached_grpc_request_actor.h> #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/core/protos/serverless_proxy_config.pb.h> -#include <ydb/services/datastreams/shard_iterator.h> -#include <ydb/services/datastreams/next_token.h> -#include <ydb/services/datastreams/datastreams_proxy.h> - #include <ydb/core/viewer/json/json.h> -#include <ydb/core/base/appdata.h> - -#include <ydb/library/naming_conventions/naming_conventions.h> -#include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/http_proxy/authorization/auth_helpers.h> #include <ydb/library/http_proxy/error/error.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> + +#include <ydb/services/datastreams/datastreams_proxy.h> +#include <ydb/services/datastreams/next_token.h> +#include <ydb/services/datastreams/shard_iterator.h> #include <ydb/services/lib/sharding/sharding.h> -#include <library/cpp/cgiparam/cgiparam.h> -#include <library/cpp/http/misc/parsed_request.h> -#include <library/cpp/http/server/response.h> -#include <library/cpp/json/json_reader.h> -#include <library/cpp/json/json_writer.h> -#include <library/cpp/protobuf/json/json_output_create.h> -#include <library/cpp/protobuf/json/proto2json.h> -#include <library/cpp/protobuf/json/proto2json_printer.h> #include <library/cpp/uri/uri.h> @@ -38,6 +35,7 @@ #include <util/string/join.h> #include <util/string/vector.h> + namespace NKikimr::NHttpProxy { using namespace google::protobuf; @@ -176,269 +174,9 @@ namespace NKikimr::NHttpProxy { constexpr TStringBuf REQUEST_DATE_HEADER = "x-amz-date"; constexpr TStringBuf REQUEST_FORWARDED_FOR = "x-forwarded-for"; constexpr TStringBuf REQUEST_TARGET_HEADER = "x-amz-target"; + constexpr TStringBuf REQUEST_CONTENT_TYPE_HEADER = "content-type"; static const TString CREDENTIAL_PARAM = "credential"; - namespace { - class TYdsProtoToJsonPrinter : public NProtobufJson::TProto2JsonPrinter { - public: - TYdsProtoToJsonPrinter(const google::protobuf::Reflection* reflection, const NProtobufJson::TProto2JsonConfig& config) - : NProtobufJson::TProto2JsonPrinter(config) - , ProtoReflection(reflection) - {} - - protected: - template <bool InMapContext> - void PrintDoubleValue(const TStringBuf& key, double value, - NProtobufJson::IJsonOutput& json) { - if constexpr(InMapContext) { - json.WriteKey(key).Write(value); - } else { - json.Write(value); - } - } - - void PrintField(const NProtoBuf::Message& proto, const NProtoBuf::FieldDescriptor& field, - NProtobufJson::IJsonOutput& json, TStringBuf key = {}) override - { - if (field.options().HasExtension(FieldTransformer)) { - if (field.options().GetExtension(FieldTransformer) == TRANSFORM_BASE64) { - Y_ENSURE(field.cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_STRING, - "Base64 is only supported for strings"); - - if (!key) { - key = MakeKey(field); - } - - if (field.is_repeated()) { - for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { - PrintStringValue<false>(field, TStringBuf(), Base64Encode(proto.GetReflection()->GetRepeatedString(proto, &field, i)), json); - } - } else { - PrintStringValue<true>(field, key, Base64Encode(proto.GetReflection()->GetString(proto, &field)), json); - } - return; - } - - if (field.options().GetExtension(FieldTransformer) == TRANSFORM_DOUBLE_S_TO_INT_MS) { - Y_ENSURE(field.cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_INT64, - "Double S to Int MS is only supported for int64 timestamps"); - - if (!key) { - key = MakeKey(field); - } - - if (field.is_repeated()) { - for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { - double value = proto.GetReflection()->GetRepeatedInt64(proto, &field, i) / 1000.0; - PrintDoubleValue<false>(TStringBuf(), value, json); - } - } else { - double value = proto.GetReflection()->GetInt64(proto, &field) / 1000.0; - PrintDoubleValue<true>(key, value, json); - } - return; - } - - if (field.options().GetExtension(FieldTransformer) == TRANSFORM_EMPTY_TO_NOTHING) { - Y_ENSURE(field.cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_STRING, - "Empty to nothing is only supported for strings"); - - if (!key) { - key = MakeKey(field); - } - - if (field.is_repeated()) { - for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { - auto value = proto.GetReflection()->GetRepeatedString(proto, &field, i); - if (!value.empty()) { - PrintStringValue<false>(field, TStringBuf(), proto.GetReflection()->GetRepeatedString(proto, &field, i), json); - } - } - } else { - auto value = proto.GetReflection()->GetString(proto, &field); - if (!value.empty()) { - PrintStringValue<true>(field, key, proto.GetReflection()->GetString(proto, &field), json); - } - } - return; - } - } else { - return NProtobufJson::TProto2JsonPrinter::PrintField(proto, field, json, key); - } - } - - private: - const google::protobuf::Reflection* ProtoReflection = nullptr; - }; - - TString ProxyFieldNameConverter(const google::protobuf::FieldDescriptor& descriptor) { - return NNaming::SnakeToCamelCase(descriptor.name()); - } - - void ProtoToJson(const Message& resp, NJson::TJsonValue& value) { - auto config = NProtobufJson::TProto2JsonConfig() - .SetFormatOutput(false) - .SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) - .SetNameGenerator(ProxyFieldNameConverter) - .SetEnumMode(NProtobufJson::TProto2JsonConfig::EnumName); - TYdsProtoToJsonPrinter printer(resp.GetReflection(), config); - printer.Print(resp, *NProtobufJson::CreateJsonMapOutput(value)); - } - - void JsonToProto(Message* message, const NJson::TJsonValue& jsonValue, ui32 depth = 0) { - Y_ENSURE(depth < 100, "Too deep map"); - Y_ENSURE(jsonValue.IsMap(), "Top level of json value is not a map"); - auto* desc = message->GetDescriptor(); - auto* reflection = message->GetReflection(); - for (const auto& [key, value] : jsonValue.GetMap()) { - auto* fieldDescriptor = desc->FindFieldByName(NNaming::CamelToSnakeCase(key)); - Y_ENSURE(fieldDescriptor, "Unexpected json key: " + key); - auto transformer = Ydb::DataStreams::V1::TRANSFORM_NONE; - if (fieldDescriptor->options().HasExtension(FieldTransformer)) { - transformer = fieldDescriptor->options().GetExtension(FieldTransformer); - } - - if (value.IsArray()) { - Y_ENSURE(fieldDescriptor->is_repeated()); - for (auto& elem : value.GetArray()) { - switch (transformer) { - case Ydb::DataStreams::V1::TRANSFORM_BASE64: { - Y_ENSURE(fieldDescriptor->cpp_type() == - google::protobuf::FieldDescriptor::CPPTYPE_STRING, - "Base64 transformer is only applicable to strings"); - reflection->AddString(message, fieldDescriptor, Base64Decode(value.GetString())); - break; - } - case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: { - reflection->SetInt64(message, fieldDescriptor, value.GetDouble() * 1000); - break; - } - case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING: - case Ydb::DataStreams::V1::TRANSFORM_NONE: { - switch (fieldDescriptor->cpp_type()) { - case google::protobuf::FieldDescriptor::CPPTYPE_INT32: - reflection->AddInt32(message, fieldDescriptor, value.GetInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_INT64: - reflection->AddInt64(message, fieldDescriptor, value.GetInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: - reflection->AddUInt32(message, fieldDescriptor, value.GetUInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: - reflection->AddUInt64(message, fieldDescriptor, value.GetUInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: - reflection->AddDouble(message, fieldDescriptor, value.GetDouble()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: - reflection->AddFloat(message, fieldDescriptor, value.GetDouble()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: - reflection->AddFloat(message, fieldDescriptor, value.GetBoolean()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: - { - const EnumValueDescriptor* enumValueDescriptor = fieldDescriptor->enum_type()->FindValueByName(value.GetString()); - i32 number{0}; - if (enumValueDescriptor == nullptr && TryFromString(value.GetString(), number)) { - enumValueDescriptor = fieldDescriptor->enum_type()->FindValueByNumber(number); - } - if (enumValueDescriptor != nullptr) { - reflection->AddEnum(message, fieldDescriptor, enumValueDescriptor); - } - } - break; - case google::protobuf::FieldDescriptor::CPPTYPE_STRING: - reflection->AddString(message, fieldDescriptor, value.GetString()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { - Message *msg = reflection->AddMessage(message, fieldDescriptor); - JsonToProto(msg, elem, depth + 1); - break; - } - default: - Y_ENSURE(false, "Unexpected type"); - } - break; - } - default: - Y_ENSURE(false, "Unknown transformer type"); - } - } - } else { - switch (transformer) { - case Ydb::DataStreams::V1::TRANSFORM_BASE64: { - Y_ENSURE(fieldDescriptor->cpp_type() == - google::protobuf::FieldDescriptor::CPPTYPE_STRING, - "Base64 transformer is applicable only to strings"); - reflection->SetString(message, fieldDescriptor, Base64Decode(value.GetString())); - break; - } - case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: { - reflection->SetInt64(message, fieldDescriptor, value.GetDouble() * 1000); - break; - } - case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING: - case Ydb::DataStreams::V1::TRANSFORM_NONE: { - switch (fieldDescriptor->cpp_type()) { - case google::protobuf::FieldDescriptor::CPPTYPE_INT32: - reflection->SetInt32(message, fieldDescriptor, value.GetInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_INT64: - reflection->SetInt64(message, fieldDescriptor, value.GetInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: - reflection->SetUInt32(message, fieldDescriptor, value.GetUInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: - reflection->SetUInt64(message, fieldDescriptor, value.GetUInteger()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: - reflection->SetDouble(message, fieldDescriptor, value.GetDouble()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: - reflection->SetFloat(message, fieldDescriptor, value.GetDouble()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: - reflection->SetBool(message, fieldDescriptor, value.GetBoolean()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: - { - const EnumValueDescriptor* enumValueDescriptor = fieldDescriptor->enum_type()->FindValueByName(value.GetString()); - i32 number{0}; - if (enumValueDescriptor == nullptr && TryFromString(value.GetString(), number)) { - enumValueDescriptor = fieldDescriptor->enum_type()->FindValueByNumber(number); - } - if (enumValueDescriptor != nullptr) { - reflection->SetEnum(message, fieldDescriptor, enumValueDescriptor); - } - } - break; - case google::protobuf::FieldDescriptor::CPPTYPE_STRING: - reflection->SetString(message, fieldDescriptor, value.GetString()); - break; - case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { - auto *msg = reflection->MutableMessage(message, fieldDescriptor); - JsonToProto(msg, value, depth + 1); - break; - } - default: - Y_ENSURE(false, "Unexpected type"); - } - break; - } - default: - Y_ENSURE(false, "Unexpected transformer"); - } - } - } - } - - } - - - template<class TProtoRequest> void FillInputCustomMetrics(const TProtoRequest& request, const THttpRequestContext& httpContext, const TActorContext& ctx) { Y_UNUSED(request, httpContext, ctx); @@ -810,10 +548,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev, const TActorContext& ctx) { - // convert grpc result to protobuf - // return http response; if (ev->Get()->Status->IsSuccess()) { - ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.ResponseBody); + ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body); FillOutputCustomMetrics<TProtoResult>( *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); ReportLatencyCounters(ctx); @@ -856,7 +592,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") void Bootstrap(const TActorContext& ctx) { StartTime = ctx.Now(); try { - JsonToProto(&Request, HttpContext.RequestBody); + JsonToProto(HttpContext.RequestData.Body, &Request); } catch (std::exception& e) { LOG_SP_WARN_S(ctx, NKikimrServices::HTTP_PROXY, "got new request with incorrect json from [" << SourceAddress << "] " << @@ -912,10 +648,6 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") TActorId AuthActor; }; - - - - private: TString Method; @@ -965,12 +697,15 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") #undef DECLARE_PROCESSOR } - bool THttpRequestProcessors::Execute(const TString& name, THttpRequestContext&& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, const TActorContext& ctx) { + bool THttpRequestProcessors::Execute(const TString& name, THttpRequestContext&& context, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, + const TActorContext& ctx) { // TODO: To be removed by CLOUD-79086 if (name == "RegisterStreamConsumer" || name == "DeregisterStreamConsumer" || name == "ListStreamConsumers") { - context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, TStringBuilder() << "Unsupported method name " << name, ctx); + context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, + TStringBuilder() << "Unsupported method name " << name, ctx); return false; } @@ -978,7 +713,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") proc->second->Execute(std::move(context), std::move(signature), ctx); return true; } - context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, TStringBuilder() << "Unknown method name " << name, ctx); + context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, + TStringBuilder() << "Unknown method name " << name, ctx); return false; } @@ -991,8 +727,61 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") } } - void THttpRequestContext::ParseHeaders(TStringBuf str) - { + THttpRequestContext::THttpRequestContext( + const NKikimrConfig::TServerlessProxyConfig& config, + NHttp::THttpIncomingRequestPtr request, + NActors::TActorId sender, + NYdb::TDriver* driver, + std::shared_ptr<NYdb::ICredentialsProvider> serviceAccountCredentialsProvider) + : ServiceConfig(config) + , Request(request) + , Sender(sender) + , Driver(driver) + , ServiceAccountCredentialsProvider(serviceAccountCredentialsProvider) { + char address[INET6_ADDRSTRLEN]; + if (inet_ntop(AF_INET6, &(Request->Address), address, INET6_ADDRSTRLEN) == nullptr) { + SourceAddress = "unknown"; + } else { + SourceAddress = address; + } + + DatabaseName = Request->URL; + if (DatabaseName == "/") { + DatabaseName = ""; + } + } + + void THttpRequestContext::SendBadRequest(NYdb::EStatus status, const TString& errorText, + const TActorContext& ctx) { + ResponseData.Body.SetType(NJson::JSON_MAP); + ResponseData.Body["message"] = errorText; + ResponseData.Body["__type"] = StatusToErrorType(status); + + LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, + "reply with status: " << status << " message: " << errorText); + auto res = Request->CreateResponse( + TStringBuilder() << (int)StatusToHttpCode(status), + StatusToErrorType(status), + strByMime(ContentType), + ResponseData.DumpBody(ContentType) + ); + ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(res)); + } + + void THttpRequestContext::DoReply(const TActorContext& ctx) { + if (ResponseData.Status == NYdb::EStatus::SUCCESS) { + LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "reply ok"); + auto res = Request->CreateResponseOK( + ResponseData.DumpBody(ContentType), + strByMime(ContentType) + ); + ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(res)); + } else { + SendBadRequest(ResponseData.Status, ResponseData.ErrorText, ctx); + } + } + + void THttpRequestContext::ParseHeaders(TStringBuf str) { TString sourceReqId; NHttp::THeaders headers(str); for (const auto& header : headers.Headers) { @@ -1011,11 +800,34 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") TVector<TString> parts = SplitString(requestTarget, "."); ApiVersion = parts[0]; MethodName = parts[1]; + } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_CONTENT_TYPE_HEADER)) { + ContentType = mimeByStr(header.second); } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_DATE_HEADER)) { } } RequestId = GenerateRequestId(sourceReqId); } + std::optional<NJson::TJsonValue> THttpRequestData::Parse(MimeTypes contentType, const TStringBuf& body) { + auto requestJsonStr = body; + + switch (contentType) { + case MIME_JSON: { + NJson::TJsonValue requestBody; + auto fromJson = NJson::ReadJsonTree(requestJsonStr, &requestBody); + return fromJson ? std::optional(requestBody) : std::nullopt; + } + default: + return std::nullopt; + } + } + TString THttpResponseData::DumpBody(MimeTypes contentType) { + switch (contentType) { + case MIME_JSON: + default: { + return NJson::WriteJson(Body, false); + } + } + } } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index 9b1f01af3e4..0fc93910c4d 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -48,19 +48,31 @@ private: struct THttpResponseData { NYdb::EStatus Status = NYdb::EStatus::SUCCESS; - NJson::TJsonValue ResponseBody; + NJson::TJsonValue Body; TString ErrorText; + + TString DumpBody(MimeTypes contentType); +}; + +struct THttpRequestData { + NJson::TJsonValue Body; + std::optional<NJson::TJsonValue> Parse(MimeTypes contentType, const TStringBuf& body); }; struct THttpRequestContext { - THttpRequestContext( const NKikimrConfig::TServerlessProxyConfig& config, - std::shared_ptr<NYdb::ICredentialsProvider> serviceAccountCredentialsProvider) - : ServiceConfig(config) - , ServiceAccountCredentialsProvider(serviceAccountCredentialsProvider) - {} + THttpRequestContext(const NKikimrConfig::TServerlessProxyConfig& config, + NHttp::THttpIncomingRequestPtr request, + NActors::TActorId sender, + NYdb::TDriver* driver, + std::shared_ptr<NYdb::ICredentialsProvider> serviceAccountCredentialsProvider); + const NKikimrConfig::TServerlessProxyConfig& ServiceConfig; + NHttp::THttpIncomingRequestPtr Request; + NActors::TActorId Sender; + NYdb::TDriver* Driver; + std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider; THttpResponseData ResponseData; - NJson::TJsonValue RequestBody; + THttpRequestData RequestData; TString ServiceAccountId; TString RequestId; TString DiscoveryEndpoint; @@ -72,18 +84,11 @@ struct THttpRequestContext { TString SourceAddress; TString MethodName; TString ApiVersion; - - NHttp::THttpIncomingRequestPtr Request; - NActors::TActorId Sender; - + MimeTypes ContentType; TString IamToken; TString SerializedUserToken; - const NKikimrConfig::TServerlessProxyConfig& ServiceConfig; - NYdb::TDriver* Driver; - std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider; - TStringBuilder LogPrefix() const - { + TStringBuilder LogPrefix() const { return TStringBuilder() << "http request [" << MethodName << "] requestId [" << RequestId << "]"; } @@ -97,7 +102,9 @@ public: virtual ~IHttpRequestProcessor() = default; virtual const TString& Name() const = 0; - virtual void Execute(THttpRequestContext&& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, const TActorContext& ctx) = 0; + virtual void Execute(THttpRequestContext&& context, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, + const TActorContext& ctx) = 0; }; class THttpRequestProcessors { @@ -107,7 +114,9 @@ public: public: void Initialize(); - bool Execute(const TString& name, THttpRequestContext&& params, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, const TActorContext& ctx); + bool Execute(const TString& name, THttpRequestContext&& params, + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, + const TActorContext& ctx); private: THashMap<TString, THolder<IHttpRequestProcessor>> Name2Processor; diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp index b1b880d015f..6ac0abe6a34 100644 --- a/ydb/core/http_proxy/http_service.cpp +++ b/ydb/core/http_proxy/http_service.cpp @@ -14,6 +14,7 @@ #include <util/stream/file.h> + namespace NKikimr::NHttpProxy { using namespace NActors; @@ -21,43 +22,10 @@ namespace NKikimr::NHttpProxy { class THttpProxyActor : public NActors::TActorBootstrapped<THttpProxyActor> { using TBase = NActors::TActorBootstrapped<THttpProxyActor>; public: - explicit THttpProxyActor(const THttpProxyConfig& cfg) - : Config(cfg.Config) - { - ServiceAccountCredentialsProvider = cfg.CredentialsProvider; - Processors = MakeHolder<THttpRequestProcessors>(); - Processors->Initialize(); - if (cfg.UseSDK) { - auto config = NYdb::TDriverConfig().SetNetworkThreadsNum(1) - .SetGRpcKeepAlivePermitWithoutCalls(true) - .SetGRpcKeepAliveTimeout(TDuration::Seconds(90)) - .SetDiscoveryMode(NYdb::EDiscoveryMode::Async); - if (Config.GetCaCert()) { - config.UseSecureConnection(TFileInput(Config.GetCaCert()).ReadAll()); - } - Driver = MakeHolder<NYdb::TDriver>(std::move(config)); - } - } - - void Bootstrap(const TActorContext& ctx) { - TBase::Become(&THttpProxyActor::StateWork); - const auto& config = Config.GetHttpConfig(); - THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> ev = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(config.GetPort()); - ev->Secure = config.GetSecure(); - ev->CertificateFile = config.GetCert(); - ev->PrivateKeyFile = config.GetKey(); - - ctx.Send(new NActors::IEventHandle(MakeHttpServerServiceID(), TActorId(), - ev.Release(), 0, true)); - ctx.Send(MakeHttpServerServiceID(), new NHttp::TEvHttpProxy::TEvRegisterHandler("/", MakeHttpProxyID())); - } - - TStringBuilder LogPrefix() const { - return TStringBuilder() << "proxy service:"; - } - - void Initialize(); + explicit THttpProxyActor(const THttpProxyConfig& cfg); + void Bootstrap(const TActorContext& ctx); + TStringBuilder LogPrefix() const; private: STFUNC(StateWork) { @@ -74,32 +42,41 @@ namespace NKikimr::NHttpProxy { std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider; }; - void THttpRequestContext::SendBadRequest(NYdb::EStatus status, const TString& errorText, - const TActorContext& ctx) { - NJson::TJsonValue value; - value.SetType(NJson::JSON_MAP); - value["message"] = errorText; - value["__type"] = StatusToErrorType(status); - - LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "reply with status: " << status << " message: " << errorText); - auto res = Request->CreateResponse( - TStringBuilder() << (int)StatusToHttpCode(status), - StatusToErrorType(status), - strByMime(MIME_JSON), - NJson::WriteJson(value, false) - ); - ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(res)); + THttpProxyActor::THttpProxyActor(const THttpProxyConfig& cfg) + : Config(cfg.Config) + { + ServiceAccountCredentialsProvider = cfg.CredentialsProvider; + Processors = MakeHolder<THttpRequestProcessors>(); + Processors->Initialize(); + if (cfg.UseSDK) { + auto config = NYdb::TDriverConfig().SetNetworkThreadsNum(1) + .SetGRpcKeepAlivePermitWithoutCalls(true) + .SetGRpcKeepAliveTimeout(TDuration::Seconds(90)) + .SetDiscoveryMode(NYdb::EDiscoveryMode::Async); + if (Config.GetCaCert()) { + config.UseSecureConnection(TFileInput(Config.GetCaCert()).ReadAll()); + } + Driver = MakeHolder<NYdb::TDriver>(std::move(config)); + } } - void THttpRequestContext::DoReply(const TActorContext& ctx) { - if (ResponseData.Status == NYdb::EStatus::SUCCESS) { - LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, "reply ok"); - auto res = Request->CreateResponseOK(NJson::WriteJson(ResponseData.ResponseBody, false), - strByMime(MIME_JSON)); - ctx.Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(res)); - } else { - SendBadRequest(ResponseData.Status, ResponseData.ErrorText, ctx); - } + TStringBuilder THttpProxyActor::LogPrefix() const { + return TStringBuilder() << "proxy service:"; + } + + void THttpProxyActor::Bootstrap(const TActorContext& ctx) { + TBase::Become(&THttpProxyActor::StateWork); + const auto& config = Config.GetHttpConfig(); + THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> ev = + MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(config.GetPort()); + ev->Secure = config.GetSecure(); + ev->CertificateFile = config.GetCert(); + ev->PrivateKeyFile = config.GetKey(); + + ctx.Send(new NActors::IEventHandle(MakeHttpServerServiceID(), TActorId(), + ev.Release(), 0, true)); + ctx.Send(MakeHttpServerServiceID(), + new NHttp::TEvHttpProxy::TEvRegisterHandler("/", MakeHttpProxyID())); } void THttpProxyActor::Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev, @@ -110,44 +87,42 @@ namespace NKikimr::NHttpProxy { return; } - THttpRequestContext context(Config, ServiceAccountCredentialsProvider); - context.Request = ev->Get()->Request; - context.Sender = ev->Sender; - context.ParseHeaders(context.Request->Headers); - - char address[INET6_ADDRSTRLEN]; - if (inet_ntop(AF_INET6, &(context.Request->Address), address, INET6_ADDRSTRLEN) != nullptr) { - context.SourceAddress = address; - } else { - context.SourceAddress = "unknown"; - } - - context.Driver = Driver.Get(); - context.DatabaseName = context.Request->URL; + THttpRequestContext context(Config, + ev->Get()->Request, + ev->Sender, + Driver.Get(), + ServiceAccountCredentialsProvider); - if (context.DatabaseName == "/") { - context.DatabaseName = ""; - } + context.ParseHeaders(context.Request->Headers); - LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, " incoming request from [" << context.SourceAddress << "] request [" << context.MethodName << "] url [" << context.Request->URL - << "] database [" << context.DatabaseName << "] requestId: " << context.RequestId); + LOG_SP_INFO_S(ctx, NKikimrServices::HTTP_PROXY, + " incoming request from [" << context.SourceAddress << "]" << + " request [" << context.MethodName << "]" << + " url [" << context.Request->URL << "]" << + " database [" << context.DatabaseName << "]" << + " requestId: " << context.RequestId); - if (!NJson::ReadJsonTree(context.Request->Body, &context.RequestBody)) { - context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, "Couldn't parse json from http request body", ctx); + const auto requestBody = context.RequestData.Parse(context.ContentType, context.Request->Body); + if (requestBody) { + context.RequestData.Body = requestBody.value(); + } else { + context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, "Can not parse request body", ctx); return; } + THolder<NKikimr::NSQS::TAwsRequestSignV4> signature; if (context.IamToken.empty()) { try { - TString fullRequest = TString(context.Request->Method) + " " - + context.Request->URL + " " + context.Request->Protocol - + "/" + context.Request->Version + "\r\n" - + context.Request->Headers - + context.Request->Content; + const TString fullRequest = TString(context.Request->Method) + " " + + context.Request->URL + " " + context.Request->Protocol + + "/" + context.Request->Version + "\r\n" + + context.Request->Headers + + context.Request->Content; signature = MakeHolder<NKikimr::NSQS::TAwsRequestSignV4>(fullRequest); } catch(NKikimr::NSQS::TSQSException& e) { - context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, TStringBuilder() << "Malformed signature: " << e.what(), ctx); + context.SendBadRequest(NYdb::EStatus::BAD_REQUEST, + TStringBuilder() << "Malformed signature: " << e.what(), ctx); return; } } @@ -155,9 +130,8 @@ namespace NKikimr::NHttpProxy { Processors->Execute(context.MethodName, std::move(context), std::move(signature), ctx); } - NActors::IActor* CreateHttpProxy(const THttpProxyConfig& config) { return new THttpProxyActor(config); } -} +} // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h new file mode 100644 index 00000000000..5f9be4972eb --- /dev/null +++ b/ydb/core/http_proxy/json_proto_conversion.h @@ -0,0 +1,279 @@ +#pragma once + +#include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> +#include <library/cpp/protobuf/json/json_output_create.h> +#include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/protobuf/json/proto2json_printer.h> +#include <ydb/library/naming_conventions/naming_conventions.h> + + +namespace NKikimr::NHttpProxy { + +TString ProxyFieldNameConverter(const google::protobuf::FieldDescriptor& descriptor) { + return NNaming::SnakeToCamelCase(descriptor.name()); +} + +class TYdsProtoToJsonPrinter : public NProtobufJson::TProto2JsonPrinter { +public: + TYdsProtoToJsonPrinter(const google::protobuf::Reflection* reflection, + const NProtobufJson::TProto2JsonConfig& config) + : NProtobufJson::TProto2JsonPrinter(config) + , ProtoReflection(reflection) + {} + +protected: + template <bool InMapContext> + void PrintDoubleValue(const TStringBuf& key, double value, + NProtobufJson::IJsonOutput& json) { + if constexpr(InMapContext) { + json.WriteKey(key).Write(value); + } else { + json.Write(value); + } + } + + void PrintField(const NProtoBuf::Message& proto, const NProtoBuf::FieldDescriptor& field, + NProtobufJson::IJsonOutput& json, TStringBuf key = {}) override { + if (field.options().HasExtension(Ydb::DataStreams::V1::FieldTransformer)) { + if (field.options().GetExtension(Ydb::DataStreams::V1::FieldTransformer) == + Ydb::DataStreams::V1::TRANSFORM_BASE64) { + Y_ENSURE(field.cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_STRING, + "Base64 is only supported for strings"); + if (!key) { + key = MakeKey(field); + } + + if (field.is_repeated()) { + for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { + PrintStringValue<false>(field, TStringBuf(), + Base64Encode(proto.GetReflection()->GetRepeatedString(proto, &field, i)), json); + } + } else { + PrintStringValue<true>(field, key, + Base64Encode(proto.GetReflection()->GetString(proto, &field)), json); + } + return; + } + + if (field.options().GetExtension(Ydb::DataStreams::V1::FieldTransformer) == + Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS) { + Y_ENSURE(field.cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_INT64, + "Double S to Int MS is only supported for int64 timestamps"); + + if (!key) { + key = MakeKey(field); + } + + if (field.is_repeated()) { + for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { + double value = proto.GetReflection()->GetRepeatedInt64(proto, &field, i) / 1000.0; + PrintDoubleValue<false>(TStringBuf(), value, json); + } + } else { + double value = proto.GetReflection()->GetInt64(proto, &field) / 1000.0; + PrintDoubleValue<true>(key, value, json); + } + return; + } + + if (field.options().GetExtension(Ydb::DataStreams::V1::FieldTransformer) == + Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING) { + Y_ENSURE(field.cpp_type() == google::protobuf::FieldDescriptor::CPPTYPE_STRING, + "Empty to nothing is only supported for strings"); + + if (!key) { + key = MakeKey(field); + } + + if (field.is_repeated()) { + for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { + auto value = proto.GetReflection()->GetRepeatedString(proto, &field, i); + if (!value.empty()) { + PrintStringValue<false>(field, TStringBuf(), + proto.GetReflection()->GetRepeatedString(proto, &field, i), json); + } + } + } else { + auto value = proto.GetReflection()->GetString(proto, &field); + if (!value.empty()) { + PrintStringValue<true>(field, key, + proto.GetReflection()->GetString(proto, &field), json); + } + } + return; + } + } else { + return NProtobufJson::TProto2JsonPrinter::PrintField(proto, field, json, key); + } + } + +private: + const google::protobuf::Reflection* ProtoReflection = nullptr; +}; + +void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) { + auto config = NProtobufJson::TProto2JsonConfig() + .SetFormatOutput(false) + .SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) + .SetNameGenerator(ProxyFieldNameConverter) + .SetEnumMode(NProtobufJson::TProto2JsonConfig::EnumName); + TYdsProtoToJsonPrinter printer(resp.GetReflection(), config); + printer.Print(resp, *NProtobufJson::CreateJsonMapOutput(value)); +} + +void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { + Y_ENSURE(depth < 101, "Json depth is > 100"); + Y_ENSURE(jsonValue.IsMap(), "Top level of json value is not a map"); + auto* desc = message->GetDescriptor(); + auto* reflection = message->GetReflection(); + for (const auto& [key, value] : jsonValue.GetMap()) { + auto* fieldDescriptor = desc->FindFieldByName(NNaming::CamelToSnakeCase(key)); + Y_ENSURE(fieldDescriptor, "Unexpected json key: " + key); + auto transformer = Ydb::DataStreams::V1::TRANSFORM_NONE; + if (fieldDescriptor->options().HasExtension(Ydb::DataStreams::V1::FieldTransformer)) { + transformer = fieldDescriptor->options().GetExtension(Ydb::DataStreams::V1::FieldTransformer); + } + + if (value.IsArray()) { + Y_ENSURE(fieldDescriptor->is_repeated()); + for (auto& elem : value.GetArray()) { + switch (transformer) { + case Ydb::DataStreams::V1::TRANSFORM_BASE64: { + Y_ENSURE(fieldDescriptor->cpp_type() == + google::protobuf::FieldDescriptor::CPPTYPE_STRING, + "Base64 transformer is only applicable to strings"); + reflection->AddString(message, fieldDescriptor, Base64Decode(value.GetString())); + break; + } + case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: { + reflection->SetInt64(message, fieldDescriptor, value.GetDouble() * 1000); + break; + } + case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING: + case Ydb::DataStreams::V1::TRANSFORM_NONE: { + switch (fieldDescriptor->cpp_type()) { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: + reflection->AddInt32(message, fieldDescriptor, value.GetInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: + reflection->AddInt64(message, fieldDescriptor, value.GetInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: + reflection->AddUInt32(message, fieldDescriptor, value.GetUInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: + reflection->AddUInt64(message, fieldDescriptor, value.GetUInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: + reflection->AddDouble(message, fieldDescriptor, value.GetDouble()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: + reflection->AddFloat(message, fieldDescriptor, value.GetDouble()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: + reflection->AddFloat(message, fieldDescriptor, value.GetBoolean()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: + { + const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByName(value.GetString()); + i32 number{0}; + if (enumValueDescriptor == nullptr && + TryFromString(value.GetString(), number)) { + enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByNumber(number); + } + if (enumValueDescriptor != nullptr) { + reflection->AddEnum(message, fieldDescriptor, enumValueDescriptor); + } + } + break; + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: + reflection->AddString(message, fieldDescriptor, value.GetString()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + NProtoBuf::Message *msg = reflection->AddMessage(message, fieldDescriptor); + JsonToProto(elem, msg, depth + 1); + break; + } + default: + Y_ENSURE(false, "Unexpected type"); + } + break; + } + default: + Y_ENSURE(false, "Unknown transformer type"); + } + } + } else { + switch (transformer) { + case Ydb::DataStreams::V1::TRANSFORM_BASE64: { + Y_ENSURE(fieldDescriptor->cpp_type() == + google::protobuf::FieldDescriptor::CPPTYPE_STRING, + "Base64 transformer is applicable only to strings"); + reflection->SetString(message, fieldDescriptor, Base64Decode(value.GetString())); + break; + } + case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: { + reflection->SetInt64(message, fieldDescriptor, value.GetDouble() * 1000); + break; + } + case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING: + case Ydb::DataStreams::V1::TRANSFORM_NONE: { + switch (fieldDescriptor->cpp_type()) { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: + reflection->SetInt32(message, fieldDescriptor, value.GetInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: + reflection->SetInt64(message, fieldDescriptor, value.GetInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: + reflection->SetUInt32(message, fieldDescriptor, value.GetUInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: + reflection->SetUInt64(message, fieldDescriptor, value.GetUInteger()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: + reflection->SetDouble(message, fieldDescriptor, value.GetDouble()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: + reflection->SetFloat(message, fieldDescriptor, value.GetDouble()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: + reflection->SetBool(message, fieldDescriptor, value.GetBoolean()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: { + const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByName(value.GetString()); + i32 number{0}; + if (enumValueDescriptor == nullptr && + TryFromString(value.GetString(), number)) { + enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByNumber(number); + } + if (enumValueDescriptor != nullptr) { + reflection->SetEnum(message, fieldDescriptor, enumValueDescriptor); + } + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: + reflection->SetString(message, fieldDescriptor, value.GetString()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + auto *msg = reflection->MutableMessage(message, fieldDescriptor); + JsonToProto(value, msg, depth + 1); + break; + } + default: + Y_ENSURE(false, "Unexpected type"); + } + break; + } + default: Y_ENSURE(false, "Unexpected transformer"); + } + } + } +} + +} // namespace NKikimr::NHttpProxy |