diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-09-26 19:38:34 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-09-26 20:25:39 +0300 |
commit | fc4f4003cc908667bb437d819021d097e9fbeb63 (patch) | |
tree | f208767577468ff93a6e9986b480a9afeea95a5c | |
parent | aad09b45931dc341b45987e2e0dfd9245793a2e9 (diff) | |
download | ydb-fc4f4003cc908667bb437d819021d097e9fbeb63.tar.gz |
fix dependencies of RowsLimit in FetchScriptResultsRPC
-rw-r--r-- | ydb/core/kqp/common/events/script_executions.h | 11 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/kqp_event_ids.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 68 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp | 25 |
4 files changed, 89 insertions, 16 deletions
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 3627d9d6760..94ff1de9094 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -185,4 +185,15 @@ struct TEvSaveScriptResultFinished : public NActors::TEventLocal<TEvSaveScriptRe NYql::TIssues Issues; }; +struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetchScriptResultsQueryResponse, TKqpScriptExecutionEvents::EvFetchScriptResultsQueryResponse> { + TEvFetchScriptResultsQueryResponse(bool truncated, NKikimrKqp::TEvFetchScriptResultsResponse&& results) + : Truncated(truncated) + , Results(std::move(results)) + { + } + + bool Truncated; + NKikimrKqp::TEvFetchScriptResultsResponse Results; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index ea89513a22a..4aecb291a35 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -141,6 +141,7 @@ struct TKqpScriptExecutionEvents { EvSaveScriptResultFinished, EvCheckAliveRequest, EvCheckAliveResponse, + EvFetchScriptResultsQueryResponse, }; }; diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 7cfc0450eca..bb8331b308a 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -1887,8 +1887,7 @@ public: TGetScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit) : Database(database), ExecutionId(executionId), ResultSetIndex(resultSetIndex), Offset(offset), Limit(limit) { - Response = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>(); - Response->Record.SetResultSetIndex(ResultSetIndex); + Response.SetResultSetIndex(ResultSetIndex); } void OnRunQuery() override { @@ -2012,11 +2011,13 @@ public: Ydb::Query::Internal::ResultSetMeta meta; NProtobufJson::Json2Proto(*metaValue, meta); - *Response->Record.MutableResultSet()->mutable_columns() = meta.columns(); - Response->Record.MutableResultSet()->set_truncated(meta.truncated()); + *Response.MutableResultSet()->mutable_columns() = meta.columns(); + Response.MutableResultSet()->set_truncated(meta.truncated()); } { // rows + Truncated = ResultSets[1].Truncated(); + NYdb::TResultSetParser result(ResultSets[1]); while (result.TryNextRow()) { @@ -2027,7 +2028,7 @@ public: return; } - if (!Response->Record.MutableResultSet()->add_rows()->ParseFromString(*serializedRow)) { + if (!Response.MutableResultSet()->add_rows()->ParseFromString(*serializedRow)) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Result set row is corrupted"); return; } @@ -2038,14 +2039,14 @@ public: } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - Response->Record.SetStatus(status); + Response.SetStatus(status); if (status != Ydb::StatusIds::SUCCESS) { - Response->Record.MutableResultSet()->Clear(); + Response.MutableResultSet()->Clear(); } if (issues) { - NYql::IssuesToMessage(issues, Response->Record.MutableIssues()); + NYql::IssuesToMessage(issues, Response.MutableIssues()); } - Send(Owner, std::move(Response)); + Send(Owner, new TEvFetchScriptResultsQueryResponse(Truncated, std::move(Response))); } private: @@ -2054,7 +2055,8 @@ private: const i32 ResultSetIndex; const i64 Offset; const i64 Limit; - THolder<TEvKqp::TEvFetchScriptResultsResponse> Response; + bool Truncated = false; + NKikimrKqp::TEvFetchScriptResultsResponse Response; }; class TGetScriptExecutionResultActor : public TActorBootstrapped<TGetScriptExecutionResultActor> { @@ -2062,20 +2064,53 @@ public: TGetScriptExecutionResultActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit) : ReplyActorId(replyActorId), Database(database), ExecutionId(executionId), ResultSetIndex(resultSetIndex), Offset(offset), Limit(limit) { + Response = MakeHolder<TEvKqp::TEvFetchScriptResultsResponse>(); } - void Bootstrap() { + void CreateFetchScriptExecutionResultQuery() { Register(new TGetScriptExecutionResultQuery(Database, ExecutionId, ResultSetIndex, Offset, Limit)); + } + void Bootstrap() { + CreateFetchScriptExecutionResultQuery(); Become(&TGetScriptExecutionResultActor::StateFunc); } STRICT_STFUNC(StateFunc, - hFunc(TEvKqp::TEvFetchScriptResultsResponse, Handle); + hFunc(TEvFetchScriptResultsQueryResponse, Handle); ) - void Handle(TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) { - Send(ev->Forward(ReplyActorId)); + void RecordResults(NKikimrKqp::TEvFetchScriptResultsResponse&& results) { + if (!Response->Record.has_status()) { + Response->Record = std::move(results); + return; + } + + const auto& rows = results.GetResultSet().get_arr_rows(); + Response->Record.mutable_resultset()->mutable_rows()->Add(rows.begin(), rows.end()); + } + + void Handle(TEvFetchScriptResultsQueryResponse::TPtr& ev) { + if (ev->Get()->Results.GetStatus() != Ydb::StatusIds::SUCCESS) { + Response->Record = std::move(ev->Get()->Results); + Reply(); + return; + } + + i64 rowsCount = ev->Get()->Results.GetResultSet().rows_size(); + RecordResults(std::move(ev->Get()->Results)); + + if (ev->Get()->Truncated) { + Offset += rowsCount; + Limit -= rowsCount; + CreateFetchScriptExecutionResultQuery(); + } else { + Reply(); + } + } + + void Reply() { + Send(ReplyActorId, std::move(Response)); PassAway(); } @@ -2084,8 +2119,9 @@ private: const TString Database; const TString ExecutionId; const i32 ResultSetIndex; - const i64 Offset; - const i64 Limit; + i64 Offset; + i64 Limit; + THolder<TEvKqp::TEvFetchScriptResultsResponse> Response; }; } // anonymous namespace diff --git a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp index ea3a77a0d01..8df50dec2e4 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_scripts_ut.cpp @@ -693,6 +693,31 @@ Y_UNIT_TEST_SUITE(KqpQueryServiceScripts) { UNIT_ASSERT(results.GetResultSet().Truncated()); } + + Y_UNIT_TEST(TestFetchMoreThanLimit) { + constexpr size_t NUMER_BATCHES = 5; + constexpr size_t ROWS_LIMIT = 20; + + NKikimrConfig::TAppConfig appCfg; + appCfg.MutableTableServiceConfig()->MutableQueryLimits()->set_resultrowslimit(ROWS_LIMIT); + + auto kikimr = DefaultKikimrRunner({}, appCfg); + auto db = kikimr.GetQueryClient(); + auto scriptExecutionOperation = CreateScriptExecutionOperation(NUMER_BATCHES * ROWS_LIMIT + 1, db, kikimr.GetDriver()); + + auto settings = TFetchScriptResultsSettings(); + settings.RowsLimit(NUMER_BATCHES * ROWS_LIMIT); + + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Id(), 0, settings).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), NUMER_BATCHES * ROWS_LIMIT); + for (size_t i = 0; i < NUMER_BATCHES * ROWS_LIMIT; ++i) { + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), i); + } + } } } // namespace NKqp |