aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2023-09-26 19:38:34 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2023-09-26 20:25:39 +0300
commitfc4f4003cc908667bb437d819021d097e9fbeb63 (patch)
treef208767577468ff93a6e9986b480a9afeea95a5c
parentaad09b45931dc341b45987e2e0dfd9245793a2e9 (diff)
downloadydb-fc4f4003cc908667bb437d819021d097e9fbeb63.tar.gz
fix dependencies of RowsLimit in FetchScriptResultsRPC
-rw-r--r--ydb/core/kqp/common/events/script_executions.h11
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp68
-rw-r--r--ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp25
4 files changed, 89 insertions, 16 deletions
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index 3627d9d6760..94ff1de9094 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -185,4 +185,15 @@ struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptRe
NYql::TIssues Issues;
};
+struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetchScriptResultsQueryResponse, TKqpScriptExecutionEvents::EvFetchScriptResultsQueryResponse> {
+ TEvFetchScriptResultsQueryResponse(bool truncated, NKikimrKqp::TEvFetchScriptResultsResponse&& results)
+ : Truncated(truncated)
+ , Results(std::move(results))
+ {
+ }
+
+ bool Truncated;
+ NKikimrKqp::TEvFetchScriptResultsResponse Results;
+};
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index ea89513a22a..4aecb291a35 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -141,6 +141,7 @@ struct TKqpScriptExecutionEvents {
EvSaveScriptResultFinished,
EvCheckAliveRequest,
EvCheckAliveResponse,
+ EvFetchScriptResultsQueryResponse,
};
};
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 7cfc0450eca..bb8331b308a 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -1887,8 +1887,7 @@ public:
TGetScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit)
: Database(database), ExecutionId(executionId), ResultSetIndex(resultSetIndex), Offset(offset), Limit(limit)
{
- Response = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>();
- Response->Record.SetResultSetIndex(ResultSetIndex);
+ Response.SetResultSetIndex(ResultSetIndex);
}
void OnRunQuery() override {
@@ -2012,11 +2011,13 @@ public:
Ydb::Query::Internal::ResultSetMeta meta;
NProtobufJson::Json2Proto(*metaValue, meta);
- *Response->Record.MutableResultSet()->mutable_columns() = meta.columns();
- Response->Record.MutableResultSet()->set_truncated(meta.truncated());
+ *Response.MutableResultSet()->mutable_columns() = meta.columns();
+ Response.MutableResultSet()->set_truncated(meta.truncated());
}
{ // rows
+ Truncated = ResultSets[1].Truncated();
+
NYdb::TResultSetParser result(ResultSets[1]);
while (result.TryNextRow()) {
@@ -2027,7 +2028,7 @@ public:
return;
}
- if (!Response->Record.MutableResultSet()->add_rows()->ParseFromString(*serializedRow)) {
+ if (!Response.MutableResultSet()->add_rows()->ParseFromString(*serializedRow)) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is corrupted");
return;
}
@@ -2038,14 +2039,14 @@ public:
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- Response->Record.SetStatus(status);
+ Response.SetStatus(status);
if (status != Ydb::StatusIds::SUCCESS) {
- Response->Record.MutableResultSet()->Clear();
+ Response.MutableResultSet()->Clear();
}
if (issues) {
- NYql::IssuesToMessage(issues, Response->Record.MutableIssues());
+ NYql::IssuesToMessage(issues, Response.MutableIssues());
}
- Send(Owner, std::move(Response));
+ Send(Owner, new TEvFetchScriptResultsQueryResponse(Truncated, std::move(Response)));
}
private:
@@ -2054,7 +2055,8 @@ private:
const i32 ResultSetIndex;
const i64 Offset;
const i64 Limit;
- THolder<TEvKqp::TEvFetchScriptResultsResponse> Response;
+ bool Truncated = false;
+ NKikimrKqp::TEvFetchScriptResultsResponse Response;
};
class TGetScriptExecutionResultActor : public TActorBootstrapped<TGetScriptExecutionResultActor> {
@@ -2062,20 +2064,53 @@ public:
TGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit)
: ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), ResultSetIndex(resultSetIndex), Offset(offset), Limit(limit)
{
+ Response = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>();
}
- void Bootstrap() {
+ void CreateFetchScriptExecutionResultQuery() {
Register(new TGetScriptExecutionResultQuery(Database, ExecutionId, ResultSetIndex, Offset, Limit));
+ }
+ void Bootstrap() {
+ CreateFetchScriptExecutionResultQuery();
Become(&TGetScriptExecutionResultActor::StateFunc);
}
STRICT_STFUNC(StateFunc,
- hFunc(TEvKqp::TEvFetchScriptResultsResponse, Handle);
+ hFunc(TEvFetchScriptResultsQueryResponse, Handle);
)
- void Handle(TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
- Send(ev->Forward(ReplyActorId));
+ void RecordResults(NKikimrKqp::TEvFetchScriptResultsResponse&& results) {
+ if (!Response->Record.has_status()) {
+ Response->Record = std::move(results);
+ return;
+ }
+
+ const auto& rows = results.GetResultSet().get_arr_rows();
+ Response->Record.mutable_resultset()->mutable_rows()->Add(rows.begin(), rows.end());
+ }
+
+ void Handle(TEvFetchScriptResultsQueryResponse::TPtr& ev) {
+ if (ev->Get()->Results.GetStatus() != Ydb::StatusIds::SUCCESS) {
+ Response->Record = std::move(ev->Get()->Results);
+ Reply();
+ return;
+ }
+
+ i64 rowsCount = ev->Get()->Results.GetResultSet().rows_size();
+ RecordResults(std::move(ev->Get()->Results));
+
+ if (ev->Get()->Truncated) {
+ Offset += rowsCount;
+ Limit -= rowsCount;
+ CreateFetchScriptExecutionResultQuery();
+ } else {
+ Reply();
+ }
+ }
+
+ void Reply() {
+ Send(ReplyActorId, std::move(Response));
PassAway();
}
@@ -2084,8 +2119,9 @@ private:
const TString Database;
const TString ExecutionId;
const i32 ResultSetIndex;
- const i64 Offset;
- const i64 Limit;
+ i64 Offset;
+ i64 Limit;
+ THolder<TEvKqp::TEvFetchScriptResultsResponse> Response;
};
} // anonymous namespace
diff --git a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
index ea3a77a0d01..8df50dec2e4 100644
--- a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp
@@ -693,6 +693,31 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) {
UNIT_ASSERT(results.GetResultSet().Truncated());
}
+
+ Y_UNIT_TEST(TestFetchMoreThanLimit) {
+ constexpr size_t NUMER_BATCHES = 5;
+ constexpr size_t ROWS_LIMIT = 20;
+
+ NKikimrConfig::TAppConfig appCfg;
+ appCfg.MutableTableServiceConfig()->MutableQueryLimits()->set_resultrowslimit(ROWS_LIMIT);
+
+ auto kikimr = DefaultKikimrRunner({}, appCfg);
+ auto db = kikimr.GetQueryClient();
+ auto scriptExecutionOperation = CreateScriptExecutionOperation(NUMER_BATCHES * ROWS_LIMIT + 1, db, kikimr.GetDriver());
+
+ auto settings = TFetchScriptResultsSettings();
+ settings.RowsLimit(NUMER_BATCHES * ROWS_LIMIT);
+
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0, settings).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), NUMER_BATCHES * ROWS_LIMIT);
+ for (size_t i = 0; i < NUMER_BATCHES * ROWS_LIMIT; ++i) {
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), i);
+ }
+ }
}
} // namespace NKqp