diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-11-07 13:47:33 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-11-07 14:26:18 +0300 |
commit | f572d47965e5e152dc9fae06edc4bb8f790f7196 (patch) | |
tree | 1a51147520481073dba79cf486b5d62e2423fa4a | |
parent | 5b1ec821c18014b06f3a78e4bc3b00f9ff4d59ee (diff) | |
download | ydb-f572d47965e5e152dc9fae06edc4bb8f790f7196.tar.gz |
KIKIMR-19287: Send to sys view
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 56 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 9 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 110 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 53 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_limits_ut.cpp | 56 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_stats_ut.cpp | 181 |
8 files changed, 382 insertions, 93 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index a14cb64d96..e09aab4a57 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -211,6 +211,36 @@ public: return Stats->CollectStatsByLongTasks && HasOlapTable; } + void FillResponseStats(Ydb::StatusIds::StatusCode status) { + auto& response = *ResponseEv->Record.MutableResponse(); + + response.SetStatus(status); + + if (Stats) { + ReportEventElapsedTime(); + + Stats->FinishTs = TInstant::Now(); + Stats->Finish(); + + if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) { + for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { + const auto& tx = Request.Transactions[txId].Body; + auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); + response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); + } + } + + if (LogStatsByLongTasks()) { + const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); + if (!txPlansWithStats.empty()) { + LOG_N("Full stats: " << txPlansWithStats); + } + } + + Stats.reset(); + } + } + void Finalize() { if (LocksBroken) { TString message = "Transaction locks invalidated."; @@ -221,7 +251,7 @@ public: auto& response = *ResponseEv->Record.MutableResponse(); - response.SetStatus(Ydb::StatusIds::SUCCESS); + FillResponseStats(Ydb::StatusIds::SUCCESS); Counters->TxProxyMon->ReportStatusOK->Inc(); auto addLocks = [&](const NYql::NDqProto::TExtraInputData& data) { @@ -249,30 +279,6 @@ public: BuildLocks(*response.MutableResult()->MutableLocks(), Locks); } - if (Stats) { - ReportEventElapsedTime(); - - Stats->FinishTs = TInstant::Now(); - Stats->Finish(); - - if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) { - for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) { - const auto& tx = Request.Transactions[txId].Body; - auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats()); - response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats); - } - } - - if (LogStatsByLongTasks()) { - const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats(); - if (!txPlansWithStats.empty()) { - LOG_N("Full stats: " << txPlansWithStats); - } - } - - Stats.reset(); - } - auto resultSize = ResponseEv->GetByteSize(); if (resultSize > (int)ReplySizeLimit) { TString message = TStringBuilder() << "Query result size limit exceeded. (" diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index b478dd040b..b613d0b796 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -1427,12 +1427,17 @@ protected: ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status)); } + static_cast<TDerived*>(this)->FillResponseStats(Ydb::StatusIds::TIMEOUT); + // TEvAbortExecution can come from either ComputeActor or SessionActor (== Target). - // If it have come from SessionActor there is no need to send new TEvAbortExecution back if (abortSender != Target) { auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded"); this->Send(Target, abortEv.Release()); } + + LOG_E("Sending timeout response to: " << Target); + this->Send(Target, ResponseEv.release()); + Request.Transactions.crop(0); TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message); this->PassAway(); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index cb18ace2cf..02208b7041 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -292,10 +292,11 @@ private: } public: - void Finalize() { + + void FillResponseStats(Ydb::StatusIds::StatusCode status) { auto& response = *ResponseEv->Record.MutableResponse(); - response.SetStatus(Ydb::StatusIds::SUCCESS); + response.SetStatus(status); if (Stats) { ReportEventElapsedTime(); @@ -316,6 +317,10 @@ public: } } } + } + + void Finalize() { + FillResponseStats(Ydb::StatusIds::SUCCESS); LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize()); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 07047d3931..6566a651c9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1027,7 +1027,7 @@ public: void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) { auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(); - const TString requestType = QueryState ? QueryState->GetRequestType() : TString(); + const TString requestType = QueryState ? QueryState->GetRequestType() : TString(); bool temporary = GetTemporaryTableInfo(tx).has_value(); auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken, @@ -1247,14 +1247,15 @@ public: LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId); if (ExecuterId) { - auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded"); - Send(ExecuterId, abortEv.Release()); + auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>( + msg.GetStatusCode(), + "Request timeout exceeded"); + Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery); } // Do not shortcut in case of CancelAfter event. We can send this status only in case of RO TX. if (msg.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED) { const auto& issues = ev->Get()->GetIssues(); - ExecuterId = TActorId{}; ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues)); return; } @@ -1299,8 +1300,8 @@ public: } } - void FillStats(NKikimrKqp::TEvQueryResponse* record) { - auto *response = record->MutableResponse(); + void FillSystemViewQueryStats(NKikimrKqp::TEvQueryResponse* record) { + YQL_ENSURE(QueryState); auto* stats = &QueryState->Stats; stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds()); @@ -1309,20 +1310,35 @@ public: stats->MutableCompilation()->Swap(&QueryState->CompileStats); } - auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId); - YQL_ENSURE(QueryState); if (IsExecuteAction(QueryState->GetAction())) { auto ru = NRuCalc::CalcRequestUnit(*stats); - record->SetConsumedRu(ru); + + if (record != nullptr) { + record->SetConsumedRu(ru); + } auto now = TInstant::Now(); auto queryDuration = now - QueryState->StartTime; CollectSystemViewQueryStats(stats, queryDuration, QueryState->GetDatabase(), ru); + } + } + + void FillStats(NKikimrKqp::TEvQueryResponse* record) { + YQL_ENSURE(QueryState); + + FillSystemViewQueryStats(record); + + auto *response = record->MutableResponse(); + auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId); + + if (IsExecuteAction(QueryState->GetAction())) { + auto queryDuration = TDuration::MicroSeconds(QueryState->Stats.GetDurationUs()); SlowLogQuery(TlsActivationContext->AsActorContext(), Config.Get(), requestInfo, queryDuration, record->GetYdbStatus(), QueryState->UserToken, QueryState->ParametersSize, record, [this]() { return this->QueryState->ExtractQueryText(); }); } + auto* stats = &QueryState->Stats; if (QueryState->ReportStats()) { if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { response->SetQueryPlan(SerializeAnalyzePlan(*stats)); @@ -1778,6 +1794,61 @@ public: } } + void HandleWaitStats(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { + // outdated response from dead executer. + // it this case we should just ignore the event. + if (ExecuterId != ev->Sender) { + return; + } + + auto* ptr = ev->Get(); + auto* response = ptr->Record.MutableResponse(); + + LOG_D("TEvTxResponse at WaitStats, CurrentTx: " << QueryState->CurrentTx + << "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0) + << " response.status: " << response->GetStatus()); + + ExecuterId = TActorId{}; + + YQL_ENSURE(QueryState); + + auto& executerResults = *response->MutableResult(); + if (executerResults.HasStats()) { + auto* exec = QueryState->Stats.AddExecutions(); + exec->Swap(executerResults.MutableStats()); + } + + Become(&TKqpSessionActor::ExecuteState); + + FillSystemViewQueryStats(nullptr); + Cleanup(false); + } + + void HandleNoop(TEvents::TEvUndelivered::TPtr& ev) { + // outdated TEvUndelivered from another executer. + // it this case we should just ignore the event. + Y_ENSURE(ExecuterId != ev->Sender); + } + + void HandleWaitStats(TEvents::TEvUndelivered::TPtr& ev) { + // outdated TEvUndelivered from another executer. + // it this case we should just ignore the event. + if (ExecuterId != ev->Sender) { + return; + } + + LOG_D("TEvUndelivered at WaitStats, CurrentTx: " << QueryState->CurrentTx); + + ExecuterId = TActorId{}; + + YQL_ENSURE(QueryState); + + Become(&TKqpSessionActor::ExecuteState); + + FillSystemViewQueryStats(nullptr); + Cleanup(false); + } + void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { if (ev->Sender != ExecuterId) { return; @@ -1882,6 +1953,15 @@ public: FillTxInfo(response); + if (ExecuterId && CurrentStateFunc() == &TThis::ExecuteState && ydbStatus == Ydb::StatusIds::TIMEOUT) { + Become(&TKqpSessionActor::WaitStatsState); + return; + } + + ExecuterId = TActorId{}; + if (CurrentStateFunc() == &TThis::CompileState && ydbStatus == Ydb::StatusIds::TIMEOUT) { + FillSystemViewQueryStats(nullptr); + } Cleanup(IsFatalError(ydbStatus)); } @@ -1929,6 +2009,7 @@ public: hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); + hFunc(TEvents::TEvUndelivered, HandleNoop); // message from KQP proxy in case of our reply just after kqp proxy timer tick hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); default: @@ -1957,6 +2038,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(TEvents::TEvUndelivered, HandleNoop); default: UnexpectedEvent("CompileState", ev); } @@ -1988,6 +2070,7 @@ public: // forgotten messages from previous aborted request hFunc(TEvKqp::TEvCompileResponse, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); + hFunc(TEvents::TEvUndelivered, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvQueryResponse, ForwardResponse); @@ -2018,6 +2101,7 @@ public: hFunc(TEvKqp::TEvCompileResponse, HandleNoop); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop); + hFunc(TEvents::TEvUndelivered, HandleNoop); // always come from WorkerActor hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup); @@ -2033,6 +2117,14 @@ public: STATEFN(FinalCleanupState) { switch (ev->GetTypeRewrite()) { hFunc(TEvents::TEvGone, HandleFinalCleanup); + hFunc(TEvents::TEvUndelivered, HandleNoop); + } + } + + STATEFN(WaitStatsState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvUndelivered, HandleWaitStats) + hFunc(TEvKqpExecuter::TEvTxResponse, HandleWaitStats); } } diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index ae5a996481..f05c5f6be1 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -599,6 +599,59 @@ void FillProfile(NYdb::NTable::TScanQueryPart& streamPart, NYson::TYsonWriter& w Y_UNUSED(profileIndex); } +void CreateLargeTable(TKikimrRunner& kikimr, ui32 rowsPerShard, ui32 keyTextSize, + ui32 dataTextSize, ui32 batchSizeRows, ui32 fillShardsCount, ui32 largeTableKeysPerShard) +{ + kikimr.GetTestClient().CreateTable("/Root", R"( + Name: "LargeTable" + Columns { Name: "Key", Type: "Uint64" } + Columns { Name: "KeyText", Type: "String" } + Columns { Name: "Data", Type: "Int64" } + Columns { Name: "DataText", Type: "String" } + KeyColumnNames: ["Key", "KeyText"], + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 1000000 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 2000000 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 3000000 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 4000000 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5000000 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 6000000 } } } } + SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 7000000 } } } } + )"); + + auto client = kikimr.GetTableClient(); + + for (ui32 shardIdx = 0; shardIdx < fillShardsCount; ++shardIdx) { + ui32 rowIndex = 0; + while (rowIndex < rowsPerShard) { + + auto rowsBuilder = NYdb::TValueBuilder(); + rowsBuilder.BeginList(); + for (ui32 i = 0; i < batchSizeRows; ++i) { + rowsBuilder.AddListItem() + .BeginStruct() + .AddMember("Key") + .OptionalUint64(shardIdx * largeTableKeysPerShard + rowIndex) + .AddMember("KeyText") + .OptionalString(TString(keyTextSize, '0' + (i + shardIdx) % 10)) + .AddMember("Data") + .OptionalInt64(rowIndex) + .AddMember("DataText") + .OptionalString(TString(dataTextSize, '0' + (i + shardIdx + 1) % 10)) + .EndStruct(); + + ++rowIndex; + if (rowIndex == rowsPerShard) { + break; + } + } + rowsBuilder.EndList(); + + auto result = client.BulkUpsert("/Root/LargeTable", rowsBuilder.Build()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + } +} + void PrintResultSet(const NYdb::TResultSet& resultSet, NYson::TYsonWriter& writer) { auto columns = resultSet.GetColumnsMeta(); diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 1560d3913c..4189100e29 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -226,6 +226,9 @@ TString ReformatYson(const TString& yson); void CompareYson(const TString& expected, const TString& actual); void CompareYson(const TString& expected, const NKikimrMiniKQL::TResult& actual); +void CreateLargeTable(TKikimrRunner& kikimr, ui32 rowsPerShard, ui32 keyTextSize, + ui32 dataTextSize, ui32 batchSizeRows = 100, ui32 fillShardsCount = 8, ui32 largeTableKeysPerShard = 1000000); + bool HasIssue(const NYql::TIssues& issues, ui32 code, std::function<bool(const NYql::TIssue& issue)> predicate = {}); diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp index ded8fe7c99..67537bcee6 100644 --- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp @@ -10,68 +10,12 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; -static const ui32 LargeTableShards = 8; -static const ui32 LargeTableKeysPerShard = 1000000; - namespace { bool IsRetryable(const EStatus& status) { return status == EStatus::OVERLOADED; } } -static void CreateLargeTable(TKikimrRunner& kikimr, ui32 rowsPerShard, ui32 keyTextSize, - ui32 dataTextSize, ui32 batchSizeRows = 100, ui32 fillShardsCount = LargeTableShards) -{ - kikimr.GetTestClient().CreateTable("/Root", R"( - Name: "LargeTable" - Columns { Name: "Key", Type: "Uint64" } - Columns { Name: "KeyText", Type: "String" } - Columns { Name: "Data", Type: "Int64" } - Columns { Name: "DataText", Type: "String" } - KeyColumnNames: ["Key", "KeyText"], - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 1000000 } } } } - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 2000000 } } } } - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 3000000 } } } } - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 4000000 } } } } - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5000000 } } } } - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 6000000 } } } } - SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 7000000 } } } } - )"); - - auto client = kikimr.GetTableClient(); - - for (ui32 shardIdx = 0; shardIdx < fillShardsCount; ++shardIdx) { - ui32 rowIndex = 0; - while (rowIndex < rowsPerShard) { - - auto rowsBuilder = TValueBuilder(); - rowsBuilder.BeginList(); - for (ui32 i = 0; i < batchSizeRows; ++i) { - rowsBuilder.AddListItem() - .BeginStruct() - .AddMember("Key") - .OptionalUint64(shardIdx * LargeTableKeysPerShard + rowIndex) - .AddMember("KeyText") - .OptionalString(TString(keyTextSize, '0' + (i + shardIdx) % 10)) - .AddMember("Data") - .OptionalInt64(rowIndex) - .AddMember("DataText") - .OptionalString(TString(dataTextSize, '0' + (i + shardIdx + 1) % 10)) - .EndStruct(); - - ++rowIndex; - if (rowIndex == rowsPerShard) { - break; - } - } - rowsBuilder.EndList(); - - auto result = client.BulkUpsert("/Root/LargeTable", rowsBuilder.Build()).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - } - } -} - Y_UNIT_TEST_SUITE(KqpLimits) { Y_UNIT_TEST(DatashardProgramSize) { auto app = NKikimrConfig::TAppConfig(); diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index 64c62774a0..ed26e48f62 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -437,6 +437,187 @@ Y_UNIT_TEST(StreamLookupStats) { }); } +Y_UNIT_TEST(SysViewTimeout) { + TKikimrRunner kikimr; + CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1); + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + TStringStream request; + request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration"; + + auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 rowsCount = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + auto value = parser.ColumnParser("QueryText").GetOptionalUtf8(); + UNIT_ASSERT(value); + rowsCount++; + } + } + } + UNIT_ASSERT(rowsCount == 1); + } + + auto settings = TStreamExecScanQuerySettings(); + settings.ClientTimeout(TDuration::MilliSeconds(50)); + + TStringStream request; + request << R"( + SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "22222"; + )"; + + auto result = db.StreamExecuteScanQuery(request.Str(), settings).GetValueSync(); + + if (result.IsSuccess()) { + try { + auto yson = StreamResultToYson(result, true); + UNIT_ASSERT(false); + } catch (const TStreamReadError& ex) { + UNIT_ASSERT_VALUES_EQUAL(ex.Status, NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } catch (const std::exception& ex) { + UNIT_ASSERT_C(false, "unknown exception during the test"); + } + } else { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED); + } + + { + TStringStream request; + request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration"; + + auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 queryCount = 0; + ui64 rowsCount = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + auto value = parser.ColumnParser("QueryText").GetOptionalUtf8(); + UNIT_ASSERT(value); + if (*value == request.Str()) { + queryCount++; + } + rowsCount++; + } + } + } + + UNIT_ASSERT(queryCount == 1); + UNIT_ASSERT(rowsCount == 2); + } +} + +Y_UNIT_TEST(SysViewCancelled) { + TKikimrRunner kikimr; + CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1); + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + { + TStringStream request; + request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration"; + + auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 rowsCount = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + auto value = parser.ColumnParser("QueryText").GetOptionalUtf8(); + UNIT_ASSERT(value); + rowsCount++; + } + } + } + UNIT_ASSERT(rowsCount == 1); + } + + auto prepareResult = session.PrepareDataQuery(Q_(R"( + SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "33333"; + )")).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), NYdb::EStatus::SUCCESS, prepareResult.GetIssues().ToString()); + auto dataQuery = prepareResult.GetQuery(); + + auto settings = TExecDataQuerySettings(); + settings.CancelAfter(TDuration::MilliSeconds(100)); + + auto result = dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync(); + + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::CANCELLED); + + { + TStringStream request; + request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration"; + + auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + + ui64 queryCount = 0; + ui64 rowsCount = 0; + for (;;) { + auto streamPart = it.ReadNext().GetValueSync(); + if (!streamPart.IsSuccess()) { + UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString()); + break; + } + + if (streamPart.HasResultSet()) { + auto resultSet = streamPart.ExtractResultSet(); + + NYdb::TResultSetParser parser(resultSet); + while (parser.TryNextRow()) { + auto value = parser.ColumnParser("QueryText").GetOptionalUtf8(); + UNIT_ASSERT(value); + if (*value == request.Str()) { + queryCount++; + } + rowsCount++; + } + } + } + + UNIT_ASSERT(queryCount == 1); + UNIT_ASSERT(rowsCount == 2); + } +} + } // suite } // namespace NKqp |