aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArseny <90683415+what-the-fawk@users.noreply.github.com>2024-12-25 18:11:19 +0300
committerGitHub <noreply@github.com>2024-12-25 18:11:19 +0300
commit27d87203f75a7b5a4911238355f87360599cb5e1 (patch)
treee15b1661d86ab0036d234336492303d3df081bb2
parentdfda963d7c7cb23081af099ca3312689902e05f2 (diff)
downloadydb-27d87203f75a7b5a4911238355f87360599cb5e1.tar.gz
Http start query (#11759)
Co-authored-by: Arseny Bolotnikov <abolotnikov12@yandex-team.ru>
-rw-r--r--ydb/core/fq/libs/http_api_client/http_client.py22
-rw-r--r--ydb/core/public_http/fq_handlers.h76
-rw-r--r--ydb/core/public_http/http_service.cpp1
-rw-r--r--ydb/core/public_http/openapi/openapi.yaml25
-rw-r--r--ydb/tests/fq/http_api/test_http_api.py32
5 files changed, 150 insertions, 6 deletions
diff --git a/ydb/core/fq/libs/http_api_client/http_client.py b/ydb/core/fq/libs/http_api_client/http_client.py
index 88b9e781c5..e5770ce9ad 100644
--- a/ydb/core/fq/libs/http_api_client/http_client.py
+++ b/ydb/core/fq/libs/http_api_client/http_client.py
@@ -11,7 +11,7 @@ from urllib3.util.retry import Retry
from .query_results import YQResults
-MAX_RETRY_FOR_SESSION = 4
+MAX_RETRY_FOR_SESSION = 100
BACK_OFF_FACTOR = 0.3
TIME_BETWEEN_RETRIES = 1000
ERROR_CODES = (500, 502, 504)
@@ -150,6 +150,22 @@ class YQHttpClient(object):
self._validate_http_error(response, expected_code=expected_code)
return response.json()["id"]
+ def start_query(
+ self,
+ query_id: str,
+ request_id=None,
+ idempotency_key: str | None = None,
+ expected_code: int = 204,
+ ):
+ response = self.session.post(
+ self._compose_api_url(f"/api/fq/v1/queries/{query_id}/start"),
+ headers=self._build_headers(idempotency_key=idempotency_key, request_id=request_id),
+ params=self._build_params(),
+ )
+
+ self._validate_http_error(response, expected_code)
+ return response
+
def get_query_status(self, query_id, request_id=None, expected_code=200) -> Any:
response = self.session.get(
self._compose_api_url(f"/api/fq/v1/queries/{query_id}/status"),
@@ -272,9 +288,7 @@ class YQHttpClient(object):
return YQResults(result).results
- def get_query_all_result_sets(
- self, query_id: str, result_set_count: int, raw_format: bool = False
- ) -> Any:
+ def get_query_all_result_sets(self, query_id: str, result_set_count: int, raw_format: bool = False) -> Any:
result = []
for i in range(0, result_set_count):
r = self.get_query_result_set(query_id, result_set_index=i, raw_format=raw_format)
diff --git a/ydb/core/public_http/fq_handlers.h b/ydb/core/public_http/fq_handlers.h
index 6fd53b9701..1a30ba6b40 100644
--- a/ydb/core/public_http/fq_handlers.h
+++ b/ydb/core/public_http/fq_handlers.h
@@ -10,6 +10,8 @@
#include <ydb/library/services/services.pb.h>
#include <ydb/core/public_http/protos/fq.pb.h>
+#include <type_traits>
+
namespace NKikimr::NPublicHttp {
using namespace NActors;
@@ -249,6 +251,7 @@ void SetIdempotencyKey(T& dst, const TString& key) {
template <typename GrpcProtoRequestType, typename HttpProtoRequestType, typename GrpcProtoResultType, typename HttpProtoResultType, typename GrpcProtoResponseType>
class TGrpcCallWrapper : public TActorBootstrapped<TGrpcCallWrapper<GrpcProtoRequestType, HttpProtoRequestType, GrpcProtoResultType, HttpProtoResultType, GrpcProtoResponseType>> {
+protected:
THttpRequestContext RequestContext;
typedef std::function<std::unique_ptr<NGRpcService::TEvProxyRuntimeEvent>(TIntrusivePtr<NYdbGrpc::IRequestContextBase> ctx)> TGrpcProxyEventFactory;
@@ -278,6 +281,10 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
+ BootstrapWrapper(ctx);
+ }
+
+ virtual void BootstrapWrapper(const TActorContext& ctx) {
auto grpcRequest = std::make_unique<TGrpcProtoRequestType>();
if (Parse(*grpcRequest)) {
TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = new TGrpcRequestContextWrapper(RequestContext, std::move(grpcRequest), &SendReply);
@@ -354,7 +361,6 @@ public:
auto* httpResult = google::protobuf::Arena::CreateMessage<FQHttp::Error>(resp->GetArena());
FqConvert(typedResponse->operation(), *httpResult);
FqPackToJson(json, *httpResult, jsonSettings);
-
requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
return;
}
@@ -396,4 +402,72 @@ DECLARE_YQ_GRPC_ACTOR(GetQueryStatus, GetQueryStatus);
DECLARE_YQ_GRPC_ACTOR_WIHT_EMPTY_RESULT(StopQuery, ControlQuery);
DECLARE_YQ_GRPC_ACTOR(GetResultData, GetResultData);
+class TJsonStartQuery : public TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> {
+public:
+ typedef TGrpcCallWrapper<FederatedQuery::DescribeQueryRequest, FQHttp::GetQueryRequest, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse> TGrpcCallWrapperBase;
+
+ TJsonStartQuery(const THttpRequestContext& ctx)
+ : TGrpcCallWrapperBase(ctx, &NGRpcService::CreateFederatedQueryDescribeQueryRequestOperationCall)
+ {}
+
+ void BootstrapWrapper(const TActorContext& ctx) override {
+
+ auto describeRequest = std::make_unique<FederatedQuery::DescribeQueryRequest>();
+ if (!Parse(*describeRequest)) {
+ this->Die(ctx);
+ return;
+ }
+
+ TProtoStringType queryId = describeRequest->Getquery_id();
+ TIntrusivePtr<TGrpcRequestContextWrapper> requestContext = MakeIntrusive<TGrpcRequestContextWrapper>(
+ RequestContext,
+ std::move(describeRequest),
+ [query_id = std::move(queryId), actorSystem = TActivationContext::ActorSystem()](const THttpRequestContext& requestContext, const TJsonSettings& jsonSettings, NProtoBuf::Message* resp, ui32 status) {
+
+ Y_ABORT_UNLESS(resp);
+ Y_ABORT_UNLESS(resp->GetArena());
+
+ auto* typedResponse = static_cast<FederatedQuery::DescribeQueryResponse*>(resp);
+ if (!typedResponse->operation().result().template Is<FederatedQuery::DescribeQueryResult>()) {
+ TStringStream json;
+ auto httpResult = std::unique_ptr<FQHttp::Error>(new FQHttp::Error());
+ FqConvert(typedResponse->operation(), *httpResult);
+ FqPackToJson(json, *httpResult, jsonSettings);
+ requestContext.ResponseBadRequestJson(typedResponse->operation().status(), json.Str());
+ return;
+ }
+
+ std::unique_ptr<FederatedQuery::DescribeQueryResult> describeResult = std::unique_ptr<FederatedQuery::DescribeQueryResult>(new FederatedQuery::DescribeQueryResult());
+ if (!typedResponse->operation().result().UnpackTo(&*describeResult)) {
+ requestContext.ResponseBadRequest(Ydb::StatusIds::INTERNAL_ERROR, "Error in response unpack");
+ return;
+ }
+
+ // modify
+ auto modifyRequest = std::unique_ptr<FederatedQuery::ModifyQueryRequest>(new FederatedQuery::ModifyQueryRequest());
+
+ modifyRequest->set_query_id(query_id);
+ *modifyRequest->mutable_content() = describeResult->query().content();
+ modifyRequest->set_execute_mode(::FederatedQuery::ExecuteMode::RUN);
+ modifyRequest->set_state_load_mode(::FederatedQuery::StateLoadMode::STATE_LOAD_MODE_UNSPECIFIED);
+ modifyRequest->set_previous_revision(describeResult->query().meta().Getlast_job_query_revision());
+ modifyRequest->set_idempotency_key(requestContext.GetIdempotencyKey());
+
+ TIntrusivePtr<TGrpcRequestContextWrapper> requestContextModify = new TGrpcRequestContextWrapper(
+ requestContext,
+ std::move(modifyRequest),
+ TGrpcCallWrapper<FederatedQuery::ModifyQueryRequest, int, FederatedQuery::ModifyQueryResult, google::protobuf::Empty, FederatedQuery::ModifyQueryResponse>::SendReply
+ );
+
+ // new event -> new EventFactory
+ actorSystem->Send(NGRpcService::CreateGRpcRequestProxyId(), NGRpcService::CreateFederatedQueryModifyQueryRequestOperationCall(std::move(requestContextModify)).release());
+ });
+
+ ctx.Send(NGRpcService::CreateGRpcRequestProxyId(), EventFactory(std::move(requestContext)).release());
+ this->Die(ctx);
+ }
+};
+
+#undef TGrpcCallWrapperBase
+
} // namespace NKikimr::NPublicHttp
diff --git a/ydb/core/public_http/http_service.cpp b/ydb/core/public_http/http_service.cpp
index ae6d72209d..81fa9c66eb 100644
--- a/ydb/core/public_http/http_service.cpp
+++ b/ydb/core/public_http/http_service.cpp
@@ -57,6 +57,7 @@ namespace {
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/status", CreateHttpHandler<TJsonGetQueryStatus>());
Router.RegisterHandler(HTTP_METHOD_GET, "/api/fq/v1/queries/{query_id}/results/{result_set_index}", CreateHttpHandler<TJsonGetResultData>());
Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/stop", CreateHttpHandler<TJsonStopQuery>());
+ Router.RegisterHandler(HTTP_METHOD_POST, "/api/fq/v1/queries/{query_id}/start", CreateHttpHandler<TJsonStartQuery>());
}
void Bootstrap(const TActorContext& ctx) {
diff --git a/ydb/core/public_http/openapi/openapi.yaml b/ydb/core/public_http/openapi/openapi.yaml
index 6a490bebd3..358dfc1bbd 100644
--- a/ydb/core/public_http/openapi/openapi.yaml
+++ b/ydb/core/public_http/openapi/openapi.yaml
@@ -185,6 +185,31 @@ paths:
required: true
schema:
type: string
+ '/queries/{query_id}/start':
+ post:
+ responses:
+ '204':
+ description: No Content
+ '400':
+ description: Bad Request
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GenericError'
+ parameters:
+ - $ref: '#/components/parameters/Idempotency-Key'
+ - $ref: '#/components/parameters/Authorization'
+ - $ref: '#/components/parameters/x-request-id'
+ - $ref: '#/components/parameters/db'
+ - $ref: '#/components/parameters/project'
+ summary: start stopped query
+ operationId: start-query
+ parameters:
+ - name: query_id
+ in: path
+ required: true
+ schema:
+ type: string
'/queries/{query_id}/results/{result_set_index}':
parameters:
- name: query_id
diff --git a/ydb/tests/fq/http_api/test_http_api.py b/ydb/tests/fq/http_api/test_http_api.py
index 74e454afe5..08130fb11c 100644
--- a/ydb/tests/fq/http_api/test_http_api.py
+++ b/ydb/tests/fq/http_api/test_http_api.py
@@ -76,7 +76,7 @@ class TestHttpApi(TestBase):
assert len(query_id) == 20
status = client.get_query_status(query_id)
- assert status in ["FAILED", "RUNNING", "COMPLETED"]
+ assert status in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"]
wait_for_query_status(client, query_id, ["COMPLETED"])
query_json = client.get_query(query_id)
@@ -98,6 +98,14 @@ class TestHttpApi(TestBase):
response = client.stop_query(query_id)
assert response.status_code == 204
+ response = client.start_query(query_id)
+ assert response.status_code == 204
+
+ assert client.get_query_status(query_id) in ["STARTING", "RUNNING", "COMPLETED", "COMPLETING"]
+
+ response = client.stop_query(query_id)
+ assert response.status_code == 204
+
def test_empty_query(self):
with self.create_client() as client:
with pytest.raises(
@@ -228,6 +236,28 @@ class TestHttpApi(TestBase):
self.streaming_over_kikimr.compute_plane.start()
c.wait_query_status(query_id, fq.QueryMeta.ABORTED_BY_USER)
+ def test_restart_idempotency(self):
+ c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)
+ self.streaming_over_kikimr.compute_plane.stop()
+ query_id = c.create_query("select1", "select 1").result.query_id
+ c.wait_query_status(query_id, fq.QueryMeta.STARTING)
+
+ with self.create_client() as client:
+ response1 = client.stop_query(query_id, idempotency_key="Z")
+ assert response1.status_code == 204
+
+ response2 = client.start_query(query_id, idempotency_key="Z")
+ assert response2.status_code == 204
+
+ response2 = client.start_query(query_id, idempotency_key="Z")
+ assert response2.status_code == 204
+
+ response1 = client.stop_query(query_id, idempotency_key="Z")
+ assert response1.status_code == 204
+
+ self.streaming_over_kikimr.compute_plane.start()
+ c.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
def test_simple_streaming_query(self):
self.init_topics("simple_streaming_query", create_output=False)
c = FederatedQueryClient("my_folder", streaming_over_kikimr=self.streaming_over_kikimr)