diff options
author | hcpp <hcpp@ydb.tech> | 2023-07-07 19:01:02 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-07-07 19:01:02 +0300 |
commit | 6f6c640f81808c8000f6068a8ec5dbb5af5bce62 (patch) | |
tree | 582f8c1d95c4b076163adf386ab26c12d950b835 | |
parent | c27892c7c433378cff845aff00cc697efb57c198 (diff) | |
download | ydb-6f6c640f81808c8000f6068a8ec5dbb5af5bce62.tar.gz |
fetch token has been supported for yqv2 proxy
8 files changed, 24 insertions, 33 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h index 0b3957405df..ae24c0f3896 100644 --- a/ydb/core/fq/libs/compute/ydb/events/events.h +++ b/ydb/core/fq/libs/compute/ydb/events/events.h @@ -100,15 +100,15 @@ struct TEvYdbCompute { }; struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> { - TEvFetchScriptResultRequest(TString executionId, int64_t resultSetId, int64_t rowOffset) + TEvFetchScriptResultRequest(TString executionId, int64_t resultSetId, const TString& fetchToken) : ExecutionId(std::move(executionId)) , ResultSetId(resultSetId) - , RowOffset(rowOffset) + , FetchToken(fetchToken) {} TString ExecutionId; int64_t ResultSetId = 0; - int64_t RowOffset = 0; + TString FetchToken; }; struct TEvFetchScriptResultResponse : public NActors::TEventLocal<TEvFetchScriptResultResponse, EvFetchScriptResultResponse> { @@ -117,12 +117,14 @@ struct TEvYdbCompute { , Status(status) {} - explicit TEvFetchScriptResultResponse(NYdb::TResultSet resultSet) + explicit TEvFetchScriptResultResponse(NYdb::TResultSet resultSet, const TString& nextFetchToken) : ResultSet(std::move(resultSet)) + , NextFetchToken(nextFetchToken) , Status(NYdb::EStatus::SUCCESS) {} TMaybe<NYdb::TResultSet> ResultSet; + TString NextFetchToken; NYql::TIssues Issues; NYdb::EStatus Status; }; diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp index 603ddb62878..599fa1b5024 100644 --- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp @@ -116,8 +116,9 @@ public: } StartTime = TInstant::Now(); + FetchToken = response.NextFetchToken; const auto resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet); - if (response.ResultSet->RowsCount() == 0) { + if (response.ResultSet->RowsCount() == 0 || !FetchToken) { Fq::Private::PingTaskRequest pingTaskRequest; pingTaskRequest.mutable_result_id()->set_value(Params.ResultId); pingTaskRequest.set_result_set_count(1); @@ -160,7 +161,7 @@ public: auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT); fetchScriptResultCounters->InFly->Inc(); StartTime = TInstant::Now(); - Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset)); + Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, TString, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, FetchToken)); } Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { @@ -183,6 +184,7 @@ private: TCounters Counters; TInstant StartTime; int64_t Offset = 0; + TString FetchToken; }; std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams& params, diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp index 4c76f624c27..a75c639530b 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp @@ -78,14 +78,14 @@ public: void Handle(const TEvYdbCompute::TEvFetchScriptResultRequest::TPtr& ev) { NYdb::NQuery::TFetchScriptResultsSettings settings; - settings.RowsOffset(ev->Get()->RowOffset); + settings.FetchToken(ev->Get()->FetchToken); QueryClient ->FetchScriptResults(ev->Get()->ExecutionId, ev->Get()->ResultSetId, settings) .Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) { try { auto response = future.ExtractValueSync(); if (response.IsSuccess()) { - actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie); + actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.ExtractResultSet(), response.GetNextFetchToken()), 0, cookie); } else { actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie); } diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index 6b4b77dc46e..4151aa9eef7 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -50,17 +50,12 @@ public: return; } - if (req->rows_offset() < 0) { - Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows offset"); - return; - } - if (req->rows_limit() > MAX_ROWS_LIMIT) { Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Rows limit is too large. Values <= " << MAX_ROWS_LIMIT << " are allowed"); return; } - RowsOffset = req->rows_offset(); + RowsOffset = 0; if (!req->fetch_token().Empty()) { auto fetch_token = TryFromString<ui64>(req->fetch_token()); if (fetch_token) { diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 95bb3d9de18..e4f51c60704 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -510,7 +510,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - + CompareYson(R"([ ["1";"one"]; ["2";"two"]; @@ -550,10 +550,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { return; } - ValidatePlan(readyOp.Metadata().ExecStats.query_plan()); + ValidatePlan(readyOp.Metadata().ExecStats.query_plan()); } - - Y_UNIT_TEST(ExecuteScriptStatsBasic) { ExecuteScriptWithStatsMode(Ydb::Query::STATS_MODE_BASIC); } @@ -818,7 +816,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { sql.append("]);"); } } - + auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); @@ -840,7 +838,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL(results.GetStatus(), EStatus::BAD_REQUEST); } - Y_UNIT_TEST(EmptyNextPageToken) { + Y_UNIT_TEST(EmptyNextFetchToken) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); @@ -852,7 +850,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0, settings).ExtractValueSync(); UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - UNIT_ASSERT(results.NextPageToken().Empty()); + UNIT_ASSERT(results.GetNextFetchToken().Empty()); TResultSetParser resultSet(results.ExtractResultSet()); UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), NUMBER_OF_ROWS); @@ -881,7 +879,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), i * ROWS_LIMIT + j); } - settings.FetchToken(results.NextPageToken()); + settings.FetchToken(results.GetNextFetchToken()); } } } diff --git a/ydb/public/api/protos/draft/ydb_query.proto b/ydb/public/api/protos/draft/ydb_query.proto index ad87797041d..82888b65272 100644 --- a/ydb/public/api/protos/draft/ydb_query.proto +++ b/ydb/public/api/protos/draft/ydb_query.proto @@ -261,11 +261,7 @@ message FetchScriptResultsRequest { int64 result_set_id = 6; - oneof fetch { - string fetch_token = 2 [(Ydb.length).le = 1024]; - } - - int64 rows_offset = 3 [(Ydb.value) = ">= 0"]; + string fetch_token = 2 [(Ydb.length).le = 1024]; int64 rows_limit = 4 [(Ydb.value) = ">= 0"]; } diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp index 1c66094594b..2cb82ef1fda 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp @@ -91,7 +91,6 @@ public: if (settings.FetchToken_) { request.set_fetch_token(settings.FetchToken_); } - request.set_rows_offset(settings.RowsOffset_); request.set_rows_limit(settings.RowsLimit_); auto promise = NThreading::NewPromise<TFetchScriptResultsResult>(); diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h index 8fd44070ff7..97b1012b929 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h @@ -195,7 +195,6 @@ private: struct TFetchScriptResultsSettings : public TRequestSettings<TFetchScriptResultsSettings> { FLUENT_SETTING(TString, FetchToken); - FLUENT_SETTING_DEFAULT(ui64, RowsOffset, 0); FLUENT_SETTING_DEFAULT(ui64, RowsLimit, 1000); }; @@ -205,23 +204,23 @@ public: ui64 GetResultSetIndex() const { return ResultSetIndex_; } const TResultSet& GetResultSet() const { return *ResultSet_; } TResultSet ExtractResultSet() { return std::move(*ResultSet_); } - const TString& NextPageToken() const { return NextPageToken_; } + const TString& GetNextFetchToken() const { return NextFetchToken_; } explicit TFetchScriptResultsResult(TStatus&& status) : TStatus(std::move(status)) {} - TFetchScriptResultsResult(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, const TString& nextPageToken) + TFetchScriptResultsResult(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, const TString& nextFetchToken) : TStatus(std::move(status)) , ResultSet_(std::move(resultSet)) , ResultSetIndex_(resultSetIndex) - , NextPageToken_(nextPageToken) + , NextFetchToken_(nextFetchToken) {} private: TMaybe<TResultSet> ResultSet_; i64 ResultSetIndex_ = 0; - TString NextPageToken_; + TString NextFetchToken_; }; using TAsyncFetchScriptResultsResult = NThreading::TFuture<TFetchScriptResultsResult>; |