diff options
author | uzhas <uzhas@ydb.tech> | 2022-09-19 19:34:13 +0300 |
---|---|---|
committer | uzhas <uzhas@ydb.tech> | 2022-09-19 19:34:13 +0300 |
commit | de8ed12807e529db73db4fe1349384868c5b3050 (patch) | |
tree | 9202ce2134ad1fd97b6392a468ec611e89d29180 | |
parent | 2e63107dc1d53825b8b603102e941c9b2753b26f (diff) | |
download | ydb-de8ed12807e529db73db4fe1349384868c5b3050.tar.gz |
fq public HTTP API lib, part1
-rw-r--r-- | ydb/core/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 1 | ||||
-rw-r--r-- | ydb/core/protos/services.proto | 1 | ||||
-rw-r--r-- | ydb/core/public_http/CMakeLists.txt | 34 | ||||
-rw-r--r-- | ydb/core/public_http/fq_handlers.h | 349 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.cpp | 75 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.h | 52 | ||||
-rw-r--r-- | ydb/core/public_http/http_handler.h | 10 | ||||
-rw-r--r-- | ydb/core/public_http/http_req.cpp | 182 | ||||
-rw-r--r-- | ydb/core/public_http/http_req.h | 61 | ||||
-rw-r--r-- | ydb/core/public_http/http_router.cpp | 86 | ||||
-rw-r--r-- | ydb/core/public_http/http_router.h | 48 | ||||
-rw-r--r-- | ydb/core/public_http/http_service.cpp | 122 | ||||
-rw-r--r-- | ydb/core/public_http/http_service.h | 20 |
14 files changed, 1042 insertions, 0 deletions
diff --git a/ydb/core/CMakeLists.txt b/ydb/core/CMakeLists.txt index e197eb407b..a22589679e 100644 --- a/ydb/core/CMakeLists.txt +++ b/ydb/core/CMakeLists.txt @@ -37,6 +37,7 @@ add_subdirectory(node_whiteboard) add_subdirectory(persqueue) add_subdirectory(pgproxy) add_subdirectory(protos) +add_subdirectory(public_http) add_subdirectory(quoter) add_subdirectory(scheme) add_subdirectory(scheme_types) diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 0173752ce3..c760e40b6e 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1626,6 +1626,7 @@ message TAppConfig { optional TSchemeShardConfig SchemeShardConfig = 54; optional TTracingConfig TracingConfig = 55; optional TFailureInjectionConfig FailureInjectionConfig = 56; + optional THttpProxyConfig PublicHttpConfig = 57; repeated TNamedConfig NamedConfigs = 100; optional string ClusterYamlConfig = 101; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index edfdc741af..8bfe3c992a 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -308,6 +308,7 @@ enum EServiceKikimr { FQ_QUOTA_SERVICE = 1154; YQ_RATE_LIMITER = 1155; FQ_QUOTA_PROXY = 1156; + PUBLIC_HTTP = 1157; // 1024 - 1099 is reserved for nbs diff --git a/ydb/core/public_http/CMakeLists.txt b/ydb/core/public_http/CMakeLists.txt new file mode 100644 index 0000000000..80ab96584f --- /dev/null +++ b/ydb/core/public_http/CMakeLists.txt @@ -0,0 +1,34 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb-core-public_http) +target_compile_options(ydb-core-public_http PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ydb-core-public_http PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + cpp-actors-http + cpp-protobuf-json + ydb-core-base + ydb-core-http_proxy + core-grpc_services-local_rpc + ydb-core-protos + core-viewer-json + core-viewer-protos + yq-libs-result_formatter + yql-public-issue +) +target_sources(ydb-core-public_http PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_req.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_router.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/public_http/http_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/public_http/grpc_request_context_wrapper.cpp +) diff --git a/ydb/core/public_http/fq_handlers.h b/ydb/core/public_http/fq_handlers.h new file mode 100644 index 0000000000..ff3aeaf2d8 --- /dev/null +++ b/ydb/core/public_http/fq_handlers.h @@ -0,0 +1,349 @@ +#pragma once + +#include "grpc_request_context_wrapper.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/protobuf/json/json2proto.h> +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/grpc_services/grpc_request_proxy.h> +#include <ydb/core/grpc_services/service_yq.h> +#include <ydb/core/viewer/protos/fq.pb.h> +#include <ydb/core/yq/libs/result_formatter/result_formatter.h> + +namespace NKikimr::NPublicHttp { + +using namespace NActors; + +using ::google::protobuf::Descriptor; +using ::google::protobuf::FieldDescriptor; +using ::google::protobuf::Reflection; +using ::google::protobuf::RepeatedField; +using ::google::protobuf::RepeatedPtrField; + +#define SIMPLE_COPY_FIELD(field) dst.set_##field(src.field()) +#define SIMPLE_COPY_RENAME_FIELD(srcField, dstField) dst.set_##dstField(src.srcField()) + +#define SIMPLE_COPY_MUTABLE_FIELD(field) *dst.mutable_##field() = src.field() +#define SIMPLE_COPY_MUTABLE_RENAME_FIELD(srcField, dstField) *dst.mutable_##dstField() = src.srcField() + +#define SIMPLE_COPY_REPEATABLE_FIELD(field) FqConvert(src.field(), *dst.mutable_##field()) +#define SIMPLE_COPY_REPEATABLE_RENAME_FIELD(srcField, dstField) FqConvert(src.srcField(), *dst.mutable_##dstField()) + +#define FQ_CONVERT_FIELD(field) FqConvert(src.field(), *dst.mutable_##field()) +#define FQ_CONVERT_RENAME_FIELD(srcField, dstField) FqConvert(src.srcField(), *dst.mutable_##dstField()) + +static constexpr auto APPLICATION_JSON = "application/json"sv; + +template <typename T> +void FqConvert(const T& src, T& dst) { + dst.CopyFrom(src); +} + +template <typename T> +void FqConvert(const T& src, ::google::protobuf::Empty& dst) { + Y_UNUSED(src); + Y_UNUSED(dst); +} + +template <typename T, typename U> +void FqConvert(const RepeatedPtrField<T>& src, RepeatedPtrField<U>& dst) { + dst.Reserve(src.size()); + for (auto& v : src) { + FqConvert(v, *dst.Add()); + } +} + +void FqConvert(const Ydb::Operations::Operation& src, FederatedQueryHttp::Error& dst) { + SIMPLE_COPY_FIELD(status); + SIMPLE_COPY_MUTABLE_FIELD(issues); +} + +#define FQ_CONVERT_QUERY_CONTENT(srcType, dstType) \ +void FqConvert(const srcType& src, dstType& dst) { \ + SIMPLE_COPY_FIELD(type); \ + SIMPLE_COPY_FIELD(name); \ + SIMPLE_COPY_FIELD(text); \ + SIMPLE_COPY_FIELD(description); \ +} + +FQ_CONVERT_QUERY_CONTENT(FederatedQueryHttp::CreateQueryRequest, YandexQuery::QueryContent); +FQ_CONVERT_QUERY_CONTENT(YandexQuery::QueryContent, FederatedQueryHttp::GetQueryResult); + +void FqConvert(const FederatedQueryHttp::CreateQueryRequest& src, YandexQuery::CreateQueryRequest& dst) { + FqConvert(src, *dst.mutable_content()); + + dst.set_execute_mode(YandexQuery::RUN); + + auto& content = *dst.mutable_content(); + if (content.type() == YandexQuery::QueryContent::QUERY_TYPE_UNSPECIFIED) { + content.set_type(YandexQuery::QueryContent::ANALYTICS); + } + + if (content.acl().visibility() == YandexQuery::Acl::VISIBILITY_UNSPECIFIED) { + content.mutable_acl()->set_visibility(YandexQuery::Acl::PRIVATE); + } + + content.set_automatic(true); +} + +void FqConvert(const YandexQuery::CreateQueryResult& src, FederatedQueryHttp::CreateQueryResult& dst) { + SIMPLE_COPY_RENAME_FIELD(query_id, id); +} + +void FqConvert(const YandexQuery::CommonMeta& src, FederatedQueryHttp::QueryMeta& dst) { + SIMPLE_COPY_MUTABLE_FIELD(created_at); +} + +void FqConvert(const YandexQuery::QueryMeta& src, FederatedQueryHttp::QueryMeta& dst) { + SIMPLE_COPY_MUTABLE_FIELD(submitted_at); + SIMPLE_COPY_MUTABLE_FIELD(finished_at); + FqConvert(src.common(), dst); +} + +FederatedQueryHttp::GetQueryResult::ComputeStatus RemapQueryStatus(YandexQuery::QueryMeta::ComputeStatus status) { + switch (status) { + case YandexQuery::QueryMeta::COMPLETED: + return FederatedQueryHttp::GetQueryResult::COMPLETED; + + case YandexQuery::QueryMeta::ABORTED_BY_USER: + [[fallthrough]]; + case YandexQuery::QueryMeta::ABORTED_BY_SYSTEM: + [[fallthrough]]; + case YandexQuery::QueryMeta::FAILED: + return FederatedQueryHttp::GetQueryResult::FAILED; + + default: + return FederatedQueryHttp::GetQueryResult::RUNNING; + } +} + +void FqConvert(const YandexQuery::ResultSetMeta& src, FederatedQueryHttp::ResultSetMeta& dst) { + SIMPLE_COPY_FIELD(rows_count); + SIMPLE_COPY_FIELD(truncated); +} + +void FqConvert(const YandexQuery::Query& src, FederatedQueryHttp::GetQueryResult& dst) { + FQ_CONVERT_FIELD(meta); + + FqConvert(src.content(), dst); + dst.set_id(src.meta().common().id()); + dst.set_status(RemapQueryStatus(src.meta().status())); + + for (const auto& result_meta : src.result_set_meta()) { + FqConvert(result_meta, *dst.mutable_result_sets()->Add()); + } + + SIMPLE_COPY_MUTABLE_RENAME_FIELD(issue, issues); + dst.mutable_issues()->MergeFrom(src.transient_issue()); +} + +void FqConvert(const FederatedQueryHttp::GetQueryRequest& src, YandexQuery::DescribeQueryRequest& dst) { + SIMPLE_COPY_FIELD(query_id); +} + +void FqConvert(const YandexQuery::DescribeQueryResult& src, FederatedQueryHttp::GetQueryResult& dst) { + FqConvert(src.query(), dst); +} + +void FqConvert(const FederatedQueryHttp::GetQueryStatusRequest& src, YandexQuery::GetQueryStatusRequest& dst) { + SIMPLE_COPY_FIELD(query_id); +} + +void FqConvert(const YandexQuery::GetQueryStatusResult& src, FederatedQueryHttp::GetQueryStatusResult& dst) { + dst.set_status(RemapQueryStatus(src.status())); +} + +void FqConvert(const FederatedQueryHttp::StopQueryRequest& src, YandexQuery::ControlQueryRequest& dst) { + SIMPLE_COPY_FIELD(query_id); + dst.set_action(YandexQuery::ABORT); +} + +void FqConvert(const FederatedQueryHttp::GetResultDataRequest& src, YandexQuery::GetResultDataRequest& dst) { + SIMPLE_COPY_FIELD(query_id); + SIMPLE_COPY_FIELD(result_set_index); + SIMPLE_COPY_FIELD(offset); + SIMPLE_COPY_FIELD(limit); + + if (!dst.limit()) { + dst.set_limit(100); + } +} + +void FqConvert(const YandexQuery::GetResultDataResult& src, FederatedQueryHttp::GetResultDataResult& dst) { + SIMPLE_COPY_MUTABLE_FIELD(result_set); +} + +template <typename T> +void FqPackToJson(TStringStream& json, const T& httpResult, const TJsonSettings& jsonSettings) { + TProtoToJson::ProtoToJson(json, httpResult, jsonSettings); +} + +void FqPackToJson(TStringStream& json, const FederatedQueryHttp::GetResultDataResult& httpResult, const TJsonSettings&) { + auto resultSet = NYdb::TResultSet(httpResult.result_set()); + NJson::TJsonValue v; + NYq::FormatResultSet(v, resultSet, true, true); + NJson::TJsonWriterConfig jsonWriterConfig; + jsonWriterConfig.WriteNanAsString = true; + NJson::WriteJson(&json, &v, jsonWriterConfig); +} + +template <typename T> +void SetIdempotencyKey(T& dst, const TString& key) { + Y_UNUSED(dst); + Y_UNUSED(key); + + if constexpr ( + std::is_same<T, YandexQuery::CreateQueryRequest>::value || + std::is_same<T, YandexQuery::ControlQueryRequest>::value) + { + dst.set_idempotency_key(key); + } +} + +template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType> +class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> { + THttpRequestContext RequestContext; + + typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory; + TGrpcProxyEventFactory EventFactory; + + NProtobufJson::TJson2ProtoConfig Json2ProtoConfig; + +public: + typedef GrpcProtoRequestType TGrpcProtoRequestType; + typedef HttpProtoRequestType THttpProtoRequestType; + typedef GrpcProtoResultType TGrpcProtoResultType; + typedef HttpProtoResultType THttpProtoResultType; + typedef GrpcProtoResponseType TGrpcProtoResponseType; + +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::VIEWER_HANDLER; + } + + TGrpcCallWrapper(const THttpRequestContext& requestContext, TGrpcProxyEventFactory eventFactory) + : RequestContext(requestContext) + , EventFactory(eventFactory) + { + Json2ProtoConfig = NProtobufJson::TJson2ProtoConfig() + .SetFieldNameMode(NProtobufJson::TJson2ProtoConfig::FieldNameCamelCase) + .SetMapAsObject(true); + } + + void Bootstrap(const TActorContext& ctx) { + auto grpcRequest = std::make_unique<TGrpcProtoRequestType>(); + if (Parse(*grpcRequest)) { + TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply); + ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(requestContext).release()); + } + + this->Die(ctx); + } + + bool Parse(TGrpcProtoRequestType& grpcRequest) { + const auto& httpRequest = *RequestContext.GetHttpRequest(); + try { + THttpProtoRequestType request; + if (httpRequest.Method == "POST"sv && RequestContext.GetContentType() == APPLICATION_JSON) { + NProtobufJson::Json2Proto(httpRequest.Body, request, Json2ProtoConfig); + } + + NHttp::TUrlParameters params(httpRequest.URL); + for (const auto& [name, value] : params.Parameters) { + SetProtoMessageField(request, name, value); + } + RequestContext.SetDb(TString(params.Get("db"))); + RequestContext.SetProject(TString(params.Get("project"))); + + // path params should overwrite query params in case of conflict + for (const auto& [name, value] : RequestContext.GetPathParams()) { + SetProtoMessageField(request, name, value); + } + FqConvert(request, grpcRequest); + SetIdempotencyKey(grpcRequest, RequestContext.GetIdempotencyKey()); + + return true; + } catch (const std::exception& e) { + ReplyError(TStringBuilder() << "Error in parsing: " << e.what()); + return false; + } + } + + static void SetProtoMessageField(THttpProtoRequestType& request, TStringBuf name, TStringBuf value) { + const Reflection* reflection = request.GetReflection(); + const Descriptor* descriptor = request.GetDescriptor(); + auto field = descriptor->FindFieldByLowercaseName(TString(name)); + if (!field) { + return; + } + + switch (field->cpp_type()) { + case FieldDescriptor::CPPTYPE_INT32: + return reflection->SetInt32(&request, field, FromString<i32>(value)); + case FieldDescriptor::CPPTYPE_INT64: + return reflection->SetInt64(&request, field, FromString<i64>(value)); + case FieldDescriptor::CPPTYPE_UINT32: + return reflection->SetUInt32(&request, field, FromString<ui32>(value)); + case FieldDescriptor::CPPTYPE_UINT64: + return reflection->SetUInt64(&request, field, FromString<ui64>(value)); + case FieldDescriptor::CPPTYPE_STRING: + return reflection->SetString(&request, field, TString(value)); + default: + break; + } + } + + void ReplyError(const TString& error) { + RequestContext.ResponseBadRequest(Ydb::StatusIds::BAD_REQUEST, error); + } + + static void SendReply(const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) { + Y_VERIFY(resp); + Y_VERIFY(resp->GetArena()); + Y_UNUSED(status); + auto* typedResponse = static_cast<TGrpcProtoResponseType*>(resp); + if (!typedResponse->operation().result().template Is<TGrpcProtoResultType>()) { + TStringStream json; + auto* httpResult = google::protobuf::Arena::CreateMessage<FederatedQueryHttp::Error>(resp->GetArena()); + FqConvert(typedResponse->operation(), *httpResult); + FqPackToJson(json, *httpResult, jsonSettings); + + requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str()); + return; + } + + auto* grpcResult = google::protobuf::Arena::CreateMessage<TGrpcProtoResultType>(resp->GetArena()); + typedResponse->operation().result().UnpackTo(grpcResult); + + if (THttpProtoResultType::descriptor()->full_name() == google::protobuf::Empty::descriptor()->full_name()) { + requestContext.ResponseNoContent(); + return; + } + + TStringStream json; + auto* httpResult = google::protobuf::Arena::CreateMessage<THttpProtoResultType>(resp->GetArena()); + FqConvert(*grpcResult, *httpResult); + FqPackToJson(json, *httpResult, jsonSettings); + + requestContext.ResponseOKJson(json.Str()); + } +}; + +#define DECLARE_YQ_GRPC_ACTOR_IMPL(action, internalAction, t1, t2, t3, t4, t5) \ +class TJson##action : public TGrpcCallWrapper<t1, t2, t3, t4, t5> { \ + typedef TGrpcCallWrapper<t1, t2, t3, t4, t5> TBase; \ +public: \ + explicit TJson##action(const THttpRequestContext& request) \ + : TBase(request, &NGRpcService::Create##internalAction##RequestOperationCall) {} \ +} + +#define DECLARE_YQ_GRPC_ACTOR(action, internalAction) DECLARE_YQ_GRPC_ACTOR_IMPL(action, internalAction, YandexQuery::internalAction##Request, FederatedQueryHttp::action##Request, YandexQuery::internalAction##Result, FederatedQueryHttp::action##Result, YandexQuery::internalAction##Response) +#define DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(action, internalAction) DECLARE_YQ_GRPC_ACTOR_IMPL(action, internalAction, YandexQuery::internalAction##Request, FederatedQueryHttp::action##Request, YandexQuery::internalAction##Result, ::google::protobuf::Empty, YandexQuery::internalAction##Response) + +DECLARE_YQ_GRPC_ACTOR(CreateQuery, CreateQuery); +DECLARE_YQ_GRPC_ACTOR(GetQuery, DescribeQuery); +DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus); +DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery); +DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData); + +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp new file mode 100644 index 0000000000..6b0fd7ce17 --- /dev/null +++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp @@ -0,0 +1,75 @@ +#include "grpc_request_context_wrapper.h" + +namespace NKikimr::NPublicHttp { + + TGrpcRequestContextWrapper::TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender) + : RequestContext(requestContext) + , LongProject("yandexcloud://" + requestContext.GetProject()) // todo: remove prefix + , Request(std::move(request)) + , ReplySender(std::move(replySender)) + , AuthState(true) + , DeadlineAt(TInstant::Max()) + { + JsonSettings.EnumAsNumbers = false; + JsonSettings.UI64AsString = true; + JsonSettings.EmptyRepeated = true; + } + + const NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequest() const { + return Request.get(); + } + + NGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() { + return AuthState; + } + + void TGrpcRequestContextWrapper::Reply(NProtoBuf::Message* resp, ui32 status) { + Y_UNUSED(resp); + Y_UNUSED(status); + Y_VERIFY(resp); + ReplySender(RequestContext, JsonSettings, resp, status); + } + + void TGrpcRequestContextWrapper::Reply(grpc::ByteBuffer* resp, ui32 status) { + Y_UNUSED(resp); + Y_UNUSED(status); + Y_VERIFY(false, "TGrpcRequestContextWrapper::Reply"); + } + + void TGrpcRequestContextWrapper::ReplyUnauthenticated(const TString& in) { + RequestContext.ResponseUnauthenticated(in); + } + + void TGrpcRequestContextWrapper::ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) { + auto errorMsg = TStringBuilder() << "Unexpected error. code: " << (int)code << ", msg: " << msg << ", details: " << details; + RequestContext.ResponseBadRequest(Ydb::StatusIds::BAD_REQUEST, errorMsg); + } + + TInstant TGrpcRequestContextWrapper::Deadline() const { + return DeadlineAt; + } + + TSet<TStringBuf> TGrpcRequestContextWrapper::GetPeerMetaKeys() const { + return {}; + } + + TVector<TStringBuf> TGrpcRequestContextWrapper::GetPeerMetaValues(TStringBuf key) const { + if (key == "x-ydb-database"sv) { + return { RequestContext.GetDb() }; + } + + if (key == "x-ydb-fq-project"sv) { + return { LongProject }; + } + + if (key == "x-ydb-auth-ticket"sv) { + return { RequestContext.GetToken() }; + } + + return { }; + } + + google::protobuf::Arena* TGrpcRequestContextWrapper::GetArena() { + return &Arena; + } +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h new file mode 100644 index 0000000000..70c1189872 --- /dev/null +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -0,0 +1,52 @@ +#pragma once + +#include "http_req.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/viewer/json/json.h> +#include <ydb/public/api/protos/yq.pb.h> +#include <library/cpp/actors/http/http_proxy.h> + +namespace NKikimr::NPublicHttp { + +typedef std::function<void(const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status)> TReplySender; + +class TGrpcRequestContextWrapper : public NGrpc::IRequestContextBase { +private: + THttpRequestContext RequestContext; + TString LongProject; + std::unique_ptr<NProtoBuf::Message> Request; + TReplySender ReplySender; + NGrpc::TAuthState AuthState; + google::protobuf::Arena Arena; + TJsonSettings JsonSettings; + TInstant DeadlineAt; + +public: + TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender); + virtual const NProtoBuf::Message* GetRequest() const; + virtual NGrpc::TAuthState& GetAuthState(); + virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0); + virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0); + virtual void ReplyUnauthenticated(const TString& in); + virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details); + virtual TInstant Deadline() const; + virtual TSet<TStringBuf> GetPeerMetaKeys() const; + virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const; + virtual grpc_compression_level GetCompressionLevel() const { return GRPC_COMPRESS_LEVEL_NONE; } + + virtual google::protobuf::Arena* GetArena(); + + virtual void AddTrailingMetadata(const TString&, const TString&) {} + + virtual void UseDatabase(const TString& ) {} + + virtual void SetNextReplyCallback(TOnNextReply&&) {} + virtual void FinishStreamingOk() {} + virtual TAsyncFinishResult GetFinishFuture() { return {}; } + virtual TString GetPeer() const { return {}; } + virtual bool SslServer() const { return false; } +}; + +} // namespace NKikimr::NPublicHttp + diff --git a/ydb/core/public_http/http_handler.h b/ydb/core/public_http/http_handler.h new file mode 100644 index 0000000000..7237862905 --- /dev/null +++ b/ydb/core/public_http/http_handler.h @@ -0,0 +1,10 @@ +#pragma once + +#include "http_req.h" +#include <library/cpp/actors/http/http_proxy.h> + +namespace NKikimr::NPublicHttp { + +using THttpHandler = std::function<NActors::IActor*(const THttpRequestContext& request)>; + +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_req.cpp b/ydb/core/public_http/http_req.cpp new file mode 100644 index 0000000000..68fec8e328 --- /dev/null +++ b/ydb/core/public_http/http_req.cpp @@ -0,0 +1,182 @@ +#include "http_req.h" + +#include <library/cpp/actors/http/http_proxy.h> +//#include <library/cpp/http/misc/parsed_request.h> +//#include <library/cpp/http/server/response.h> + +//#include <ydb/core/grpc_caching/cached_grpc_request_actor.h> +//#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/core/http_proxy/http_req.h> + +//#include <library/cpp/uri/uri.h> + +#include <util/generic/guid.h> +#include <util/string/ascii.h> +//#include <util/string/cast.h> + +namespace NKikimr::NPublicHttp { + constexpr TStringBuf AUTHORIZATION_HEADER = "authorization"; + constexpr TStringBuf REQUEST_ID_HEADER = "x-request-id"; + constexpr TStringBuf REQUEST_CONTENT_TYPE_HEADER = "content-type"; + constexpr TStringBuf REQUEST_FORWARDED_FOR = "x-forwarded-for"; + constexpr TStringBuf IDEMPOTENCY_KEY_HEADER = "idempotency-key"; + + constexpr TStringBuf APPLICATION_JSON = "application/json"; + + TString GenerateRequestId(const TString& sourceReqId) { + if (sourceReqId.empty()) { + return CreateGuidAsString(); + } + + return TStringBuilder() << CreateGuidAsString() << "-" << sourceReqId; + } + + TString HttpCodeFamily(TStringBuf code) { + if (code.Size() != 3) { + return "unknown"; + } + + return TStringBuilder() << code[0] << "xx"; + } + + THttpRequestContext::THttpRequestContext(TActorSystem* actorSystem, NHttp::THttpIncomingRequestPtr request, NActors::TActorId sender, TInstant startedAt, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) + : ActorSystem(actorSystem) + , Request(request) + , Sender(sender) + , StartedAt(startedAt) + , Counters(counters) + { + Y_VERIFY(ActorSystem); + Y_VERIFY(Request); + ParseHeaders(Request->Headers); + } + + void THttpRequestContext::SetPathParams(std::map<TString, TString> pathParams) { + PathParams = std::move(pathParams); + } + + const std::map<TString, TString>& THttpRequestContext::GetPathParams() const { + return PathParams; + } + + void THttpRequestContext::SetPathPattern(const TString& pathPattern) { + PathPattern = pathPattern; + } + + void THttpRequestContext::SetProject(const TString& project) { + Project = project; + } + + TString THttpRequestContext::GetProject() const { + return Project; + } + + void THttpRequestContext::SetDb(const TString& db) { + Db = db; + } + + TString THttpRequestContext::GetDb() const { + return Db; + } + + TString THttpRequestContext::GetToken() const { + return Token; + } + + NHttp::THttpIncomingRequestPtr THttpRequestContext::GetHttpRequest() const { + return Request; + } + + TString THttpRequestContext::GetContentType() const { + return ContentType; + } + + TString THttpRequestContext::GetIdempotencyKey() const { + return IdempotencyKey; + } + + void THttpRequestContext::ResponseBadRequest(Ydb::StatusIds::StatusCode status, const TString& errorText) const { + DoResponseBadRequest(status, errorText); + } + + void THttpRequestContext::ResponseBadRequestJson(Ydb::StatusIds::StatusCode status, const TString& json) const { + DoResponseBadRequest(status, json, APPLICATION_JSON); + } + + void THttpRequestContext::DoResponseBadRequest(Ydb::StatusIds::StatusCode status, const TString& errorText, TStringBuf contentType) const { + const NYdb::EStatus ydbStatus = static_cast<NYdb::EStatus>(status); + const TString httpCodeStr = ToString((int)NKikimr::NHttpProxy::StatusToHttpCode(ydbStatus)); + DoResponse(httpCodeStr, NKikimr::NHttpProxy::StatusToErrorType(ydbStatus), errorText, contentType); + } + + void THttpRequestContext::ResponseOK() const { + DoResponse("200", "OK"); + } + + void THttpRequestContext::ResponseOKJson(const TString& json) const { + DoResponse("200", "OK", json, APPLICATION_JSON); + } + + void THttpRequestContext::ResponseNotFound() const { + DoResponse("404", "Not Found"); + } + + void THttpRequestContext::ResponseNoContent() const { + DoResponse("204", "No Content"); + } + + void THttpRequestContext::ResponseUnauthenticated(const TString& message) const { + DoResponse("401", "Unauthorized", message); + } + + void THttpRequestContext::DoResponse(TStringBuf status, TStringBuf message, TStringBuf body, TStringBuf contentType) const { + auto res = Request->CreateResponse(status, message, contentType, body); + ActorSystem->Send(Sender, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(res)); + + const TDuration elapsed = TInstant::Now() - StartedAt; + LOG_INFO_S(*ActorSystem, NKikimrServices::PUBLIC_HTTP, + "HTTP response -> code: " << status << + ", method: " << Request->Method << + ", url: " << Request->URL << + ", content type: " << ContentType << + ", request body size: " << Request->Body.Size() << + ", response body size: " << body.Size() << + ", elapsed: " << elapsed << + ", from: " << Request->Address << + ", forwarded for: " << ForwardedFor << + ", request id: " << RequestId << + ", idempotency key: " << IdempotencyKey + ); + + auto group = Counters; + if (Db) { + group = group->GetSubgroup("db", Db); + } + if (Project) { + group = group->GetSubgroup("project", Project); + } + group = group->GetSubgroup("path_pattern", PathPattern)->GetSubgroup("method", TString(Request->Method)); + group->GetSubgroup("code", TString(status))->GetNamedCounter("name", "count", true)->Inc(); + group->GetSubgroup("code", HttpCodeFamily(status))->GetNamedCounter("name", "count", true)->Inc(); + } + + void THttpRequestContext::ParseHeaders(TStringBuf str) { + TString sourceReqId; + NHttp::THeaders headers(str); + for (const auto& header : headers.Headers) { + if (AsciiEqualsIgnoreCase(header.first, AUTHORIZATION_HEADER)) { + Token = header.second; + } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_ID_HEADER)) { + sourceReqId = header.second; + } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_CONTENT_TYPE_HEADER)) { + ContentType = header.second; + } else if (AsciiEqualsIgnoreCase(header.first, IDEMPOTENCY_KEY_HEADER)) { + IdempotencyKey = header.second; + } else if (AsciiEqualsIgnoreCase(header.first, REQUEST_FORWARDED_FOR)) { + ForwardedFor = header.second; + } + } + RequestId = GenerateRequestId(sourceReqId); + } + +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_req.h b/ydb/core/public_http/http_req.h new file mode 100644 index 0000000000..e7575a0f3d --- /dev/null +++ b/ydb/core/public_http/http_req.h @@ -0,0 +1,61 @@ +#pragma once + +#include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/http/http.h> +//#include <library/cpp/http/server/http.h> + +namespace NKikimr::NPublicHttp { + +using namespace NActors; + +class THttpRequestContext { +public: + THttpRequestContext(TActorSystem* actorSystem, NHttp::THttpIncomingRequestPtr request, NActors::TActorId sender, TInstant startedAt, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters); + THttpRequestContext(const THttpRequestContext&) = default; + THttpRequestContext(const THttpRequestContext&&) = default; + + void ResponseBadRequest(Ydb::StatusIds::StatusCode status, const TString& errorText) const; + void ResponseBadRequestJson(Ydb::StatusIds::StatusCode status, const TString& json) const; + void ResponseOK() const; + void ResponseOKJson(const TString& json) const; + void ResponseNotFound() const; + void ResponseNoContent() const; + void ResponseUnauthenticated(const TString& message) const; + + void SetPathParams(std::map<TString, TString> pathParams); + const std::map<TString, TString>& GetPathParams() const; + void SetPathPattern(const TString& pathPattern); + void SetProject(const TString& project); + TString GetProject() const; + void SetDb(const TString& db); + TString GetDb() const; + TString GetToken() const; + NHttp::THttpIncomingRequestPtr GetHttpRequest() const; + TString GetContentType() const; + TString GetIdempotencyKey() const; + +private: + void ParseHeaders(TStringBuf headers); + void DoResponse(TStringBuf status, TStringBuf message, TStringBuf body = "", TStringBuf contentType = "text/plain") const; + void DoResponseBadRequest(Ydb::StatusIds::StatusCode status, const TString& body, TStringBuf contentType = "text/plain") const; + +private: + TActorSystem* ActorSystem; + NHttp::THttpIncomingRequestPtr Request; + NActors::TActorId Sender; + TInstant StartedAt; + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + + TString ContentType; + TString ForwardedFor; + TString RequestId; + TString IdempotencyKey; + TString Token; + TString PathPattern; + std::map<TString, TString> PathParams; + TString Project; + TString Db; +}; + +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_router.cpp b/ydb/core/public_http/http_router.cpp new file mode 100644 index 0000000000..e3bb85939a --- /dev/null +++ b/ydb/core/public_http/http_router.cpp @@ -0,0 +1,86 @@ +#include "http_router.h" + +namespace NKikimr::NPublicHttp { + +namespace { + bool MatchPath(TStringBuf pathPattern, TStringBuf path, std::map<TString, TString>& pathParams) { + const char delim = '/'; + pathPattern.SkipPrefix("/"sv); + path.SkipPrefix("/"sv); + + TStringBuf pathPatternComponent = pathPattern.NextTok(delim); + TStringBuf pathComponent = path.NextTok(delim); + while (pathPatternComponent && pathComponent) { + if (pathPatternComponent.StartsWith('{') && pathPatternComponent.EndsWith('}')) { + TStringBuf paramName = pathPatternComponent.SubString(1, pathPatternComponent.Size() - 2); + pathParams.emplace(paramName, pathComponent); + } else { + if (pathPatternComponent != pathComponent) { + return false; + } + } + pathPatternComponent = pathPattern.NextTok(delim); + pathComponent = path.NextTok(delim); + } + + return !pathPattern && !path && !pathPatternComponent && !pathComponent; + } + + HTTP_METHOD ParseMethod(TStringBuf method) { + if (method == "GET"sv) { + return HTTP_METHOD_GET; + } + if (method == "POST"sv) { + return HTTP_METHOD_POST; + } + if (method == "PUT"sv) { + return HTTP_METHOD_PUT; + } + if (method == "DELETE"sv) { + return HTTP_METHOD_DELETE; + } + if (method == "OPTIONS"sv) { + return HTTP_METHOD_OPTIONS; + } + if (method == "HEAD"sv) { + return HTTP_METHOD_HEAD; + } + return HTTP_METHOD_UNDEFINED; + } +} + +void THttpRequestRouter::RegisterHandler(TStringBuf method, const TString& pathPattern, THttpHandler handler) { + RegisterHandler(ParseMethod(method), pathPattern, handler); +} + +void THttpRequestRouter::RegisterHandler(HTTP_METHOD method, const TString& pathPattern, THttpHandler handler) { + Data.emplace(std::make_pair(method, pathPattern), std::move(handler)); +} + +std::optional<THandlerWithParams> THttpRequestRouter::ResolveHandler(TStringBuf method, const TStringBuf& path) const { + return ResolveHandler(ParseMethod(method), path); +} + +std::optional<THandlerWithParams> THttpRequestRouter::ResolveHandler(HTTP_METHOD method, const TStringBuf& path) const { + auto it = Data.find(std::pair<HTTP_METHOD, TString>(method, path)); + if (it != Data.end()) { + return THandlerWithParams(it->first.second, it->second, {}); + } + + for (const auto& [k ,v] : Data) { + if (k.first != method) { + continue; + } + std::map<TString, TString> pathParams; + if (MatchPath(k.second, path, pathParams)) { + return THandlerWithParams(k.second, v, pathParams); + } + } + return {}; +} + +size_t THttpRequestRouter::GetSize() const { + return Data.size(); +} + +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_router.h b/ydb/core/public_http/http_router.h new file mode 100644 index 0000000000..f53e7a740d --- /dev/null +++ b/ydb/core/public_http/http_router.h @@ -0,0 +1,48 @@ +#pragma once + +#include "http_handler.h" + +namespace NKikimr::NPublicHttp { + +struct THandlerWithParams { + THandlerWithParams(TString pathPattern, THttpHandler handler, std::map<TString, TString> pathParams) + : PathPattern(std::move(pathPattern)) + , Handler(std::move(handler)) + , PathParams(std::move(pathParams)) + { + } + + THandlerWithParams() = default; + THandlerWithParams(THandlerWithParams&&) = default; + THandlerWithParams& operator=(const THandlerWithParams&) = default; + THandlerWithParams& operator=(const THandlerWithParams&&) = default; + + TString PathPattern; + THttpHandler Handler; + std::map<TString, TString> PathParams; +}; + +class THttpRequestRouter { +public: + void RegisterHandler(TStringBuf method, const TString& pathPattern, THttpHandler handler); + void RegisterHandler(HTTP_METHOD method, const TString& pathPattern, THttpHandler handler); + void RegisterGetHandler(const TString& pathPattern, THttpHandler handler) { + RegisterHandler(HTTP_METHOD_GET, pathPattern, handler); + } + + std::optional<THandlerWithParams> ResolveHandler(HTTP_METHOD method, const TStringBuf& path) const; + std::optional<THandlerWithParams> ResolveHandler(TStringBuf method, const TStringBuf& path) const; + size_t GetSize() const; + + template <typename F> + void ForEach(F f) const { + for (const auto& [p, handler] : Data) { + f(p.first, p.second, handler); + } + } + +private: + std::map<std::pair<HTTP_METHOD, TString>, THttpHandler> Data; +}; + +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_service.cpp b/ydb/core/public_http/http_service.cpp new file mode 100644 index 0000000000..c66eec67c6 --- /dev/null +++ b/ydb/core/public_http/http_service.cpp @@ -0,0 +1,122 @@ +#include "http_service.h" +#include "http_handler.h" +#include "http_req.h" +#include "http_router.h" +#include "fq_handlers.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/http/http_proxy.h> + +#include <util/stream/file.h> + +#include <ydb/library/http_proxy/error/error.h> + +namespace NKikimr::NPublicHttp { +namespace { + template <typename T> + THttpHandler CreateHttpHandler() { + return [](const THttpRequestContext& request) { + return new T(request); + }; + } +} + using namespace NActors; + + class TPublicHttpActor : public NActors::TActorBootstrapped<TPublicHttpActor> { + using TBase = NActors::TActorBootstrapped<TPublicHttpActor>; + private: + const NKikimrConfig::THttpProxyConfig Config; + const TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + THttpRequestRouter Router; + + public: + TPublicHttpActor(const NKikimrConfig::THttpProxyConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) + : Config(config) + , Counters(counters) + { + Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries", CreateHttpHandler<TJsonCreateQuery>()); + Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}", CreateHttpHandler<TJsonGetQuery>()); + Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>()); + Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>()); + Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>()); + } + + void Bootstrap(const TActorContext& ctx) { + TBase::Become(&TPublicHttpActor::StateWork); + THolder<NHttp::TEvHttpProxy::TEvAddListeningPort> ev = MakeHolder<NHttp::TEvHttpProxy::TEvAddListeningPort>(Config.GetPort()); + ev->Secure = Config.GetSecure(); + ev->CertificateFile = Config.GetCert(); + ev->PrivateKeyFile = Config.GetKey(); + + ctx.Send(new NActors::IEventHandle(MakePublicHttpServerID(), TActorId(), ev.Release(), 0, true)); + ctx.Send(MakePublicHttpServerID(), new NHttp::TEvHttpProxy::TEvRegisterHandler("/", MakePublicHttpID())); + } + + + private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + } + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr& ev, const TActorContext& ctx) { + auto httpRequest = ev->Get()->Request; + THttpRequestContext requestContext(ctx.ActorSystem(), httpRequest, ev->Sender, TInstant::Now(), Counters); + if (AsciiEqualsIgnoreCase(httpRequest->URL, "/ping")) { + requestContext.ResponseOK(); + return; + } + + /* + LOG_INFO_S(ctx, NKikimrServices::PUBLIC_HTTP, + "incoming request " << + " method [" << httpRequest->Method << "]" << + " url [" << httpRequest->URL << "]" << + " from [" << httpRequest->Address << "]"); + */ + + TStringBuf url = httpRequest->URL.Before('?'); + auto handlerWithParamsO = Router.ResolveHandler(httpRequest->Method, url); + if (!handlerWithParamsO) { + requestContext.ResponseNotFound(); + return; + } + requestContext.SetPathParams(std::move(handlerWithParamsO->PathParams)); + requestContext.SetPathPattern(handlerWithParamsO->PathPattern); + + try { + ctx.ExecutorThread.RegisterActor(handlerWithParamsO->Handler(requestContext)); + } catch (const std::exception& e) { + requestContext.ResponseBadRequest(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Error in request processing: " << e.what()); + return; + } + + } + }; + + NActors::IActor* CreatePublicHttp(const NKikimrConfig::THttpProxyConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters) { + return new TPublicHttpActor(config, counters); + } + + void Initialize(NActors::TActorSystemSetup::TLocalServices& localServices, const TAppData& appData, const NKikimrConfig::THttpProxyConfig& config) { + if (!config.GetPort()) { + return; + } + + + auto actor = NKikimr::NPublicHttp::CreatePublicHttp(config, appData.Counters->GetSubgroup("counters", "public_http")); + localServices.push_back(std::pair<TActorId, TActorSetupCmd>( + MakePublicHttpID(), + TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId))); + + actor = NHttp::CreateHttpProxy(); + localServices.push_back(std::pair<TActorId, TActorSetupCmd>( + MakePublicHttpServerID(), + TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId))); + + } +} // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_service.h b/ydb/core/public_http/http_service.h new file mode 100644 index 0000000000..c604bb744f --- /dev/null +++ b/ydb/core/public_http/http_service.h @@ -0,0 +1,20 @@ +#pragma once + +#include <ydb/core/base/appdata.h> +#include <ydb/core/protos/config.pb.h> +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NPublicHttp { + inline NActors::TActorId MakePublicHttpServerID() { + static const char x[12] = "pub_http_sr"; + return NActors::TActorId(0, TStringBuf(x, 12)); + } + + inline NActors::TActorId MakePublicHttpID() { + static const char x[12] = "public_http"; + return NActors::TActorId(0, TStringBuf(x, 12)); + } + + void Initialize(NActors::TActorSystemSetup::TLocalServices& localServices, const TAppData& appData, const NKikimrConfig::THttpProxyConfig& config); + +} // namespace NKikimr::NPublicHttp |