aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-07-07 19:01:02 +0300
committerhcpp <hcpp@ydb.tech>2023-07-07 19:01:02 +0300
commit6f6c640f81808c8000f6068a8ec5dbb5af5bce62 (patch)
tree582f8c1d95c4b076163adf386ab26c12d950b835
parentc27892c7c433378cff845aff00cc697efb57c198 (diff)
downloadydb-6f6c640f81808c8000f6068a8ec5dbb5af5bce62.tar.gz
fetch token has been supported for yqv2 proxy
-rw-r--r--ydb/core/fq/libs/compute/ydb/events/events.h10
-rw-r--r--ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp6
-rw-r--r--ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp4
-rw-r--r--ydb/core/grpc_services/query/rpc_fetch_script_results.cpp7
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp14
-rw-r--r--ydb/public/api/protos/draft/ydb_query.proto6
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp1
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/query.h9
8 files changed, 24 insertions, 33 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/events/events.h b/ydb/core/fq/libs/compute/ydb/events/events.h
index 0b3957405df..ae24c0f3896 100644
--- a/ydb/core/fq/libs/compute/ydb/events/events.h
+++ b/ydb/core/fq/libs/compute/ydb/events/events.h
@@ -100,15 +100,15 @@ struct TEvYdbCompute {
};
struct TEvFetchScriptResultRequest : public NActors::TEventLocal<TEvFetchScriptResultRequest, EvFetchScriptResultRequest> {
- TEvFetchScriptResultRequest(TString executionId, int64_t resultSetId, int64_t rowOffset)
+ TEvFetchScriptResultRequest(TString executionId, int64_t resultSetId, const TString& fetchToken)
: ExecutionId(std::move(executionId))
, ResultSetId(resultSetId)
- , RowOffset(rowOffset)
+ , FetchToken(fetchToken)
{}
TString ExecutionId;
int64_t ResultSetId = 0;
- int64_t RowOffset = 0;
+ TString FetchToken;
};
struct TEvFetchScriptResultResponse : public NActors::TEventLocal<TEvFetchScriptResultResponse, EvFetchScriptResultResponse> {
@@ -117,12 +117,14 @@ struct TEvYdbCompute {
, Status(status)
{}
- explicit TEvFetchScriptResultResponse(NYdb::TResultSet resultSet)
+ explicit TEvFetchScriptResultResponse(NYdb::TResultSet resultSet, const TString& nextFetchToken)
: ResultSet(std::move(resultSet))
+ , NextFetchToken(nextFetchToken)
, Status(NYdb::EStatus::SUCCESS)
{}
TMaybe<NYdb::TResultSet> ResultSet;
+ TString NextFetchToken;
NYql::TIssues Issues;
NYdb::EStatus Status;
};
diff --git a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
index 603ddb62878..599fa1b5024 100644
--- a/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
@@ -116,8 +116,9 @@ public:
}
StartTime = TInstant::Now();
+ FetchToken = response.NextFetchToken;
const auto resultSetProto = NYdb::TProtoAccessor::GetProto(*response.ResultSet);
- if (response.ResultSet->RowsCount() == 0) {
+ if (response.ResultSet->RowsCount() == 0 || !FetchToken) {
Fq::Private::PingTaskRequest pingTaskRequest;
pingTaskRequest.mutable_result_id()->set_value(Params.ResultId);
pingTaskRequest.set_result_set_count(1);
@@ -160,7 +161,7 @@ public:
auto fetchScriptResultCounters = Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT);
fetchScriptResultCounters->InFly->Inc();
StartTime = TInstant::Now();
- Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, TString, int64_t, int64_t>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, Offset));
+ Register(new TRetryActor<TEvYdbCompute::TEvFetchScriptResultRequest, TEvYdbCompute::TEvFetchScriptResultResponse, TString, int64_t, TString>(Counters.GetCounters(ERequestType::RT_FETCH_SCRIPT_RESULT), SelfId(), Connector, ExecutionId, 0, FetchToken));
}
Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) {
@@ -183,6 +184,7 @@ private:
TCounters Counters;
TInstant StartTime;
int64_t Offset = 0;
+ TString FetchToken;
};
std::unique_ptr<NActors::IActor> CreateResultWriterActor(const TRunActorParams& params,
diff --git a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
index 4c76f624c27..a75c639530b 100644
--- a/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
+++ b/ydb/core/fq/libs/compute/ydb/ydb_connector_actor.cpp
@@ -78,14 +78,14 @@ public:
void Handle(const TEvYdbCompute::TEvFetchScriptResultRequest::TPtr& ev) {
NYdb::NQuery::TFetchScriptResultsSettings settings;
- settings.RowsOffset(ev->Get()->RowOffset);
+ settings.FetchToken(ev->Get()->FetchToken);
QueryClient
->FetchScriptResults(ev->Get()->ExecutionId, ev->Get()->ResultSetId, settings)
.Apply([actorSystem = NActors::TActivationContext::ActorSystem(), recipient = ev->Sender, cookie = ev->Cookie](auto future) {
try {
auto response = future.ExtractValueSync();
if (response.IsSuccess()) {
- actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.ExtractResultSet()), 0, cookie);
+ actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.ExtractResultSet(), response.GetNextFetchToken()), 0, cookie);
} else {
actorSystem->Send(recipient, new TEvYdbCompute::TEvFetchScriptResultResponse(response.GetIssues(), response.GetStatus()), 0, cookie);
}
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
index 6b4b77dc46e..4151aa9eef7 100644
--- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
+++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
@@ -50,17 +50,12 @@ public:
return;
}
- if (req->rows_offset() < 0) {
- Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows offset");
- return;
- }
-
if (req->rows_limit() > MAX_ROWS_LIMIT) {
Reply(Ydb::StatusIds::BAD_REQUEST, TStringBuilder() << "Rows limit is too large. Values <= " << MAX_ROWS_LIMIT << " are allowed");
return;
}
- RowsOffset = req->rows_offset();
+ RowsOffset = 0;
if (!req->fetch_token().Empty()) {
auto fetch_token = TryFromString<ui64>(req->fetch_token());
if (fetch_token) {
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 95bb3d9de18..e4f51c60704 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -510,7 +510,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
-
+
CompareYson(R"([
["1";"one"];
["2";"two"];
@@ -550,10 +550,8 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
return;
}
- ValidatePlan(readyOp.Metadata().ExecStats.query_plan());
+ ValidatePlan(readyOp.Metadata().ExecStats.query_plan());
}
-
-
Y_UNIT_TEST(ExecuteScriptStatsBasic) {
ExecuteScriptWithStatsMode(Ydb::Query::STATS_MODE_BASIC);
}
@@ -818,7 +816,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
sql.append("]);");
}
}
-
+
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
@@ -840,7 +838,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(results.GetStatus(), EStatus::BAD_REQUEST);
}
- Y_UNIT_TEST(EmptyNextPageToken) {
+ Y_UNIT_TEST(EmptyNextFetchToken) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
@@ -852,7 +850,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0, settings).ExtractValueSync();
UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
- UNIT_ASSERT(results.NextPageToken().Empty());
+ UNIT_ASSERT(results.GetNextFetchToken().Empty());
TResultSetParser resultSet(results.ExtractResultSet());
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), NUMBER_OF_ROWS);
@@ -881,7 +879,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), i * ROWS_LIMIT + j);
}
- settings.FetchToken(results.NextPageToken());
+ settings.FetchToken(results.GetNextFetchToken());
}
}
}
diff --git a/ydb/public/api/protos/draft/ydb_query.proto b/ydb/public/api/protos/draft/ydb_query.proto
index ad87797041d..82888b65272 100644
--- a/ydb/public/api/protos/draft/ydb_query.proto
+++ b/ydb/public/api/protos/draft/ydb_query.proto
@@ -261,11 +261,7 @@ message FetchScriptResultsRequest {
int64 result_set_id = 6;
- oneof fetch {
- string fetch_token = 2 [(Ydb.length).le = 1024];
- }
-
- int64 rows_offset = 3 [(Ydb.value) = ">= 0"];
+ string fetch_token = 2 [(Ydb.length).le = 1024];
int64 rows_limit = 4 [(Ydb.value) = ">= 0"];
}
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
index 1c66094594b..2cb82ef1fda 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
@@ -91,7 +91,6 @@ public:
if (settings.FetchToken_) {
request.set_fetch_token(settings.FetchToken_);
}
- request.set_rows_offset(settings.RowsOffset_);
request.set_rows_limit(settings.RowsLimit_);
auto promise = NThreading::NewPromise<TFetchScriptResultsResult>();
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
index 8fd44070ff7..97b1012b929 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/query.h
@@ -195,7 +195,6 @@ private:
struct TFetchScriptResultsSettings : public TRequestSettings<TFetchScriptResultsSettings> {
FLUENT_SETTING(TString, FetchToken);
- FLUENT_SETTING_DEFAULT(ui64, RowsOffset, 0);
FLUENT_SETTING_DEFAULT(ui64, RowsLimit, 1000);
};
@@ -205,23 +204,23 @@ public:
ui64 GetResultSetIndex() const { return ResultSetIndex_; }
const TResultSet& GetResultSet() const { return *ResultSet_; }
TResultSet ExtractResultSet() { return std::move(*ResultSet_); }
- const TString& NextPageToken() const { return NextPageToken_; }
+ const TString& GetNextFetchToken() const { return NextFetchToken_; }
explicit TFetchScriptResultsResult(TStatus&& status)
: TStatus(std::move(status))
{}
- TFetchScriptResultsResult(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, const TString& nextPageToken)
+ TFetchScriptResultsResult(TStatus&& status, TResultSet&& resultSet, i64 resultSetIndex, const TString& nextFetchToken)
: TStatus(std::move(status))
, ResultSet_(std::move(resultSet))
, ResultSetIndex_(resultSetIndex)
- , NextPageToken_(nextPageToken)
+ , NextFetchToken_(nextFetchToken)
{}
private:
TMaybe<TResultSet> ResultSet_;
i64 ResultSetIndex_ = 0;
- TString NextPageToken_;
+ TString NextFetchToken_;
};
using TAsyncFetchScriptResultsResult = NThreading::TFuture<TFetchScriptResultsResult>;