diff options
author | Arseny <90683415+what-the-fawk@users.noreply.github.com> | 2024-12-25 18:11:19 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-25 18:11:19 +0300 |
commit | 27d87203f75a7b5a4911238355f87360599cb5e1 (patch) | |
tree | e15b1661d86ab0036d234336492303d3df081bb2 | |
parent | dfda963d7c7cb23081af099ca3312689902e05f2 (diff) | |
download | ydb-27d87203f75a7b5a4911238355f87360599cb5e1.tar.gz |
Http start query (#11759)
Co-authored-by: Arseny Bolotnikov <abolotnikov12@yandex-team.ru>
-rw-r--r-- | ydb/core/fq/libs/http_api_client/http_client.py | 22 | ||||
-rw-r--r-- | ydb/core/public_http/fq_handlers.h | 76 | ||||
-rw-r--r-- | ydb/core/public_http/http_service.cpp | 1 | ||||
-rw-r--r-- | ydb/core/public_http/openapi/openapi.yaml | 25 | ||||
-rw-r--r-- | ydb/tests/fq/http_api/test_http_api.py | 32 |
5 files changed, 150 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/http_api_client/http_client.py b/ydb/core/fq/libs/http_api_client/http_client.py index 88b9e781c5..e5770ce9ad 100644 --- a/ydb/core/fq/libs/http_api_client/http_client.py +++ b/ydb/core/fq/libs/http_api_client/http_client.py @@ -11,7 +11,7 @@ from urllib3.util.retry import Retry from .query_results import YQResults -MAX_RETRY_FOR_SESSION = 4 +MAX_RETRY_FOR_SESSION = 100 BACK_OFF_FACTOR = 0.3 TIME_BETWEEN_RETRIES = 1000 ERROR_CODES = (500, 502, 504) @@ -150,6 +150,22 @@ class YQHttpClient(object): self._validate_http_error(response, expected_code=expected_code) return response.json()["id"] + def start_query( + self, + query_id: str, + request_id=None, + idempotency_key: str | None = None, + expected_code: int = 204, + ): + response = self.session.post( + self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"), + headers=self._build_headers(idempotency_key=idempotency_key, request_id=request_id), + params=self._build_params(), + ) + + self._validate_http_error(response, expected_code) + return response + def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any: response = self.session.get( self._compose_api_url(f"/api/fq/v1/queries/{query_id}/status"), @@ -272,9 +288,7 @@ class YQHttpClient(object): return YQResults(result).results - def get_query_all_result_sets( - self, query_id: str, result_set_count: int, raw_format: bool = False - ) -> Any: + def get_query_all_result_sets(self, query_id: str, result_set_count: int, raw_format: bool = False) -> Any: result = [] for i in range(0, result_set_count): r = self.get_query_result_set(query_id, result_set_index=i, raw_format=raw_format) diff --git a/ydb/core/public_http/fq_handlers.h b/ydb/core/public_http/fq_handlers.h index 6fd53b9701..1a30ba6b40 100644 --- a/ydb/core/public_http/fq_handlers.h +++ b/ydb/core/public_http/fq_handlers.h @@ -10,6 +10,8 @@ #include <ydb/library/services/services.pb.h> #include <ydb/core/public_http/protos/fq.pb.h> +#include <type_traits> + namespace NKikimr::NPublicHttp { using namespace NActors; @@ -249,6 +251,7 @@ void SetIdempotencyKey(T& dst, const TString& key) { template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType> class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> { +protected: THttpRequestContext RequestContext; typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory; @@ -278,6 +281,10 @@ public: } void Bootstrap(const TActorContext& ctx) { + BootstrapWrapper(ctx); + } + + virtual void BootstrapWrapper(const TActorContext& ctx) { auto grpcRequest = std::make_unique<TGrpcProtoRequestType>(); if (Parse(*grpcRequest)) { TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply); @@ -354,7 +361,6 @@ public: auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena()); FqConvert(typedResponse->operation(), *httpResult); FqPackToJson(json, *httpResult, jsonSettings); - requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str()); return; } @@ -396,4 +402,72 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus); DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery); DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData); +class TJsonStartQuery : public TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> { +public: + typedef TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> TGrpcCallWrapperBase; + + TJsonStartQuery(const THttpRequestContext& ctx) + : TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall) + {} + + void BootstrapWrapper(const TActorContext& ctx) override { + + auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>(); + if (!Parse(*describeRequest)) { + this->Die(ctx); + return; + } + + TProtoStringType queryId = describeRequest->Getquery_id(); + TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = MakeIntrusive<TGrpcRequestContextWrapper>( + RequestContext, + std::move(describeRequest), + [query_id = std::move(queryId), actorSystem = TActivationContext::ActorSystem()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) { + + Y_ABORT_UNLESS(resp); + Y_ABORT_UNLESS(resp->GetArena()); + + auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp); + if (!typedResponse->operation().result().template Is<FederatedQuery::DescribeQueryResult>()) { + TStringStream json; + auto httpResult = std::unique_ptr<FQHttp::Error>(new FQHttp::Error()); + FqConvert(typedResponse->operation(), *httpResult); + FqPackToJson(json, *httpResult, jsonSettings); + requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str()); + return; + } + + std::unique_ptr<FederatedQuery::DescribeQueryResult> describeResult = std::unique_ptr<FederatedQuery::DescribeQueryResult>(new FederatedQuery::DescribeQueryResult()); + if (!typedResponse->operation().result().UnpackTo(&*describeResult)) { + requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Error in response unpack"); + return; + } + + // modify + auto modifyRequest = std::unique_ptr<FederatedQuery::ModifyQueryRequest>(new FederatedQuery::ModifyQueryRequest()); + + modifyRequest->set_query_id(query_id); + *modifyRequest->mutable_content() = describeResult->query().content(); + modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN); + modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED); + modifyRequest->set_previous_revision(describeResult->query().meta().Getlast_job_query_revision()); + modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey()); + + TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper( + requestContext, + std::move(modifyRequest), + TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply + ); + + // new event -> new EventFactory + actorSystem->Send(NGRpcService::CreateGRpcRequestProxyId(), NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall(std::move(requestContextModify)).release()); + }); + + ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(std::move(requestContext)).release()); + this->Die(ctx); + } +}; + +#undef TGrpcCallWrapperBase + } // namespace NKikimr::NPublicHttp diff --git a/ydb/core/public_http/http_service.cpp b/ydb/core/public_http/http_service.cpp index ae6d72209d..81fa9c66eb 100644 --- a/ydb/core/public_http/http_service.cpp +++ b/ydb/core/public_http/http_service.cpp @@ -57,6 +57,7 @@ namespace { Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>()); Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>()); Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>()); + Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TJsonStartQuery>()); } void Bootstrap(const TActorContext& ctx) { diff --git a/ydb/core/public_http/openapi/openapi.yaml b/ydb/core/public_http/openapi/openapi.yaml index 6a490bebd3..358dfc1bbd 100644 --- a/ydb/core/public_http/openapi/openapi.yaml +++ b/ydb/core/public_http/openapi/openapi.yaml @@ -185,6 +185,31 @@ paths: required: true
schema:
type: string
+ '/queries/{query_id}/start':
+ post:
+ responses:
+ '204':
+ description: No Content
+ '400':
+ description: Bad Request
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GenericError'
+ parameters:
+ - $ref: '#/components/parameters/Idempotency-Key'
+ - $ref: '#/components/parameters/Authorization'
+ - $ref: '#/components/parameters/x-request-id'
+ - $ref: '#/components/parameters/db'
+ - $ref: '#/components/parameters/project'
+ summary: start stopped query
+ operationId: start-query
+ parameters:
+ - name: query_id
+ in: path
+ required: true
+ schema:
+ type: string
'/queries/{query_id}/results/{result_set_index}':
parameters:
- name: query_id
diff --git a/ydb/tests/fq/http_api/test_http_api.py b/ydb/tests/fq/http_api/test_http_api.py index 74e454afe5..08130fb11c 100644 --- a/ydb/tests/fq/http_api/test_http_api.py +++ b/ydb/tests/fq/http_api/test_http_api.py @@ -76,7 +76,7 @@ class TestHttpApi(TestBase): assert len(query_id) == 20 status = client.get_query_status(query_id) - assert status in ["FAILED", "RUNNING", "COMPLETED"] + assert status in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"] wait_for_query_status(client, query_id, ["COMPLETED"]) query_json = client.get_query(query_id) @@ -98,6 +98,14 @@ class TestHttpApi(TestBase): response = client.stop_query(query_id) assert response.status_code == 204 + response = client.start_query(query_id) + assert response.status_code == 204 + + assert client.get_query_status(query_id) in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"] + + response = client.stop_query(query_id) + assert response.status_code == 204 + def test_empty_query(self): with self.create_client() as client: with pytest.raises( @@ -228,6 +236,28 @@ class TestHttpApi(TestBase): self.streaming_over_kikimr.compute_plane.start() c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER) + def test_restart_idempotency(self): + c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr) + self.streaming_over_kikimr.compute_plane.stop() + query_id = c.create_query("select1", "select 1").result.query_id + c.wait_query_status(query_id, fq.QueryMeta.STARTING) + + with self.create_client() as client: + response1 = client.stop_query(query_id, idempotency_key="Z") + assert response1.status_code == 204 + + response2 = client.start_query(query_id, idempotency_key="Z") + assert response2.status_code == 204 + + response2 = client.start_query(query_id, idempotency_key="Z") + assert response2.status_code == 204 + + response1 = client.stop_query(query_id, idempotency_key="Z") + assert response1.status_code == 204 + + self.streaming_over_kikimr.compute_plane.start() + c.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + def test_simple_streaming_query(self): self.init_topics("simple_streaming_query", create_output=False) c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr) |