aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-11-07 13:47:33 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-11-07 14:26:18 +0300
commitf572d47965e5e152dc9fae06edc4bb8f790f7196 (patch)
tree1a51147520481073dba79cf486b5d62e2423fa4a
parent5b1ec821c18014b06f3a78e4bc3b00f9ff4d59ee (diff)
downloadydb-f572d47965e5e152dc9fae06edc4bb8f790f7196.tar.gz
KIKIMR-19287: Send to sys view
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp56
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h7
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp9
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp110
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp53
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h3
-rw-r--r--ydb/core/kqp/ut/query/kqp_limits_ut.cpp56
-rw-r--r--ydb/core/kqp/ut/query/kqp_stats_ut.cpp181
8 files changed, 382 insertions, 93 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index a14cb64d96..e09aab4a57 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -211,6 +211,36 @@ public:
return Stats->CollectStatsByLongTasks && HasOlapTable;
}
+ void FillResponseStats(Ydb::StatusIds::StatusCode status) {
+ auto& response = *ResponseEv->Record.MutableResponse();
+
+ response.SetStatus(status);
+
+ if (Stats) {
+ ReportEventElapsedTime();
+
+ Stats->FinishTs = TInstant::Now();
+ Stats->Finish();
+
+ if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) {
+ for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
+ const auto& tx = Request.Transactions[txId].Body;
+ auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
+ response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
+ }
+ }
+
+ if (LogStatsByLongTasks()) {
+ const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
+ if (!txPlansWithStats.empty()) {
+ LOG_N("Full stats: " << txPlansWithStats);
+ }
+ }
+
+ Stats.reset();
+ }
+ }
+
void Finalize() {
if (LocksBroken) {
TString message = "Transaction locks invalidated.";
@@ -221,7 +251,7 @@ public:
auto& response = *ResponseEv->Record.MutableResponse();
- response.SetStatus(Ydb::StatusIds::SUCCESS);
+ FillResponseStats(Ydb::StatusIds::SUCCESS);
Counters->TxProxyMon->ReportStatusOK->Inc();
auto addLocks = [&](const NYql::NDqProto::TExtraInputData& data) {
@@ -249,30 +279,6 @@ public:
BuildLocks(*response.MutableResult()->MutableLocks(), Locks);
}
- if (Stats) {
- ReportEventElapsedTime();
-
- Stats->FinishTs = TInstant::Now();
- Stats->Finish();
-
- if (LogStatsByLongTasks() || CollectFullStats(Request.StatsMode)) {
- for (ui32 txId = 0; txId < Request.Transactions.size(); ++txId) {
- const auto& tx = Request.Transactions[txId].Body;
- auto planWithStats = AddExecStatsToTxPlan(tx->GetPlan(), response.GetResult().GetStats());
- response.MutableResult()->MutableStats()->AddTxPlansWithStats(planWithStats);
- }
- }
-
- if (LogStatsByLongTasks()) {
- const auto& txPlansWithStats = response.GetResult().GetStats().GetTxPlansWithStats();
- if (!txPlansWithStats.empty()) {
- LOG_N("Full stats: " << txPlansWithStats);
- }
- }
-
- Stats.reset();
- }
-
auto resultSize = ResponseEv->GetByteSize();
if (resultSize > (int)ReplySizeLimit) {
TString message = TStringBuilder() << "Query result size limit exceeded. ("
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index b478dd040b..b613d0b796 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1427,12 +1427,17 @@ protected:
ExecuterSpan.EndError(TStringBuilder() << NYql::NDqProto::StatusIds_StatusCode_Name(status));
}
+ static_cast<TDerived*>(this)->FillResponseStats(Ydb::StatusIds::TIMEOUT);
+
// TEvAbortExecution can come from either ComputeActor or SessionActor (== Target).
- // If it have come from SessionActor there is no need to send new TEvAbortExecution back
if (abortSender != Target) {
auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(status, "Request timeout exceeded");
this->Send(Target, abortEv.Release());
}
+
+ LOG_E("Sending timeout response to: " << Target);
+ this->Send(Target, ResponseEv.release());
+
Request.Transactions.crop(0);
TerminateComputeActors(Ydb::StatusIds::TIMEOUT, message);
this->PassAway();
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index cb18ace2cf..02208b7041 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -292,10 +292,11 @@ private:
}
public:
- void Finalize() {
+
+ void FillResponseStats(Ydb::StatusIds::StatusCode status) {
auto& response = *ResponseEv->Record.MutableResponse();
- response.SetStatus(Ydb::StatusIds::SUCCESS);
+ response.SetStatus(status);
if (Stats) {
ReportEventElapsedTime();
@@ -316,6 +317,10 @@ public:
}
}
}
+ }
+
+ void Finalize() {
+ FillResponseStats(Ydb::StatusIds::SUCCESS);
LWTRACK(KqpScanExecuterFinalize, ResponseEv->Orbit, TxId, LastTaskId, LastComputeActorId, ResponseEv->ResultsSize());
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 07047d3931..6566a651c9 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1027,7 +1027,7 @@ public:
void SendToSchemeExecuter(const TKqpPhyTxHolder::TConstPtr& tx) {
auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>();
- const TString requestType = QueryState ? QueryState->GetRequestType() : TString();
+ const TString requestType = QueryState ? QueryState->GetRequestType() : TString();
bool temporary = GetTemporaryTableInfo(tx).has_value();
auto executerActor = CreateKqpSchemeExecuter(tx, SelfId(), requestType, Settings.Database, userToken,
@@ -1247,14 +1247,15 @@ public:
LOG_I(logMsg << ", status: " << NYql::NDqProto::StatusIds_StatusCode_Name(msg.GetStatusCode()) << " send to: " << ExecuterId);
if (ExecuterId) {
- auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(msg.GetStatusCode(), "Request timeout exceeded");
- Send(ExecuterId, abortEv.Release());
+ auto abortEv = MakeHolder<TEvKqp::TEvAbortExecution>(
+ msg.GetStatusCode(),
+ "Request timeout exceeded");
+ Send(ExecuterId, abortEv.Release(), IEventHandle::FlagTrackDelivery);
}
// Do not shortcut in case of CancelAfter event. We can send this status only in case of RO TX.
if (msg.GetStatusCode() != NYql::NDqProto::StatusIds::CANCELLED) {
const auto& issues = ev->Get()->GetIssues();
- ExecuterId = TActorId{};
ReplyQueryError(NYql::NDq::DqStatusToYdbStatus(msg.GetStatusCode()), logMsg, MessageFromIssues(issues));
return;
}
@@ -1299,8 +1300,8 @@ public:
}
}
- void FillStats(NKikimrKqp::TEvQueryResponse* record) {
- auto *response = record->MutableResponse();
+ void FillSystemViewQueryStats(NKikimrKqp::TEvQueryResponse* record) {
+ YQL_ENSURE(QueryState);
auto* stats = &QueryState->Stats;
stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
@@ -1309,20 +1310,35 @@ public:
stats->MutableCompilation()->Swap(&QueryState->CompileStats);
}
- auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId);
- YQL_ENSURE(QueryState);
if (IsExecuteAction(QueryState->GetAction())) {
auto ru = NRuCalc::CalcRequestUnit(*stats);
- record->SetConsumedRu(ru);
+
+ if (record != nullptr) {
+ record->SetConsumedRu(ru);
+ }
auto now = TInstant::Now();
auto queryDuration = now - QueryState->StartTime;
CollectSystemViewQueryStats(stats, queryDuration, QueryState->GetDatabase(), ru);
+ }
+ }
+
+ void FillStats(NKikimrKqp::TEvQueryResponse* record) {
+ YQL_ENSURE(QueryState);
+
+ FillSystemViewQueryStats(record);
+
+ auto *response = record->MutableResponse();
+ auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId);
+
+ if (IsExecuteAction(QueryState->GetAction())) {
+ auto queryDuration = TDuration::MicroSeconds(QueryState->Stats.GetDurationUs());
SlowLogQuery(TlsActivationContext->AsActorContext(), Config.Get(), requestInfo, queryDuration,
record->GetYdbStatus(), QueryState->UserToken, QueryState->ParametersSize, record,
[this]() { return this->QueryState->ExtractQueryText(); });
}
+ auto* stats = &QueryState->Stats;
if (QueryState->ReportStats()) {
if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) {
response->SetQueryPlan(SerializeAnalyzePlan(*stats));
@@ -1778,6 +1794,61 @@ public:
}
}
+ void HandleWaitStats(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
+ // outdated response from dead executer.
+ // it this case we should just ignore the event.
+ if (ExecuterId != ev->Sender) {
+ return;
+ }
+
+ auto* ptr = ev->Get();
+ auto* response = ptr->Record.MutableResponse();
+
+ LOG_D("TEvTxResponse at WaitStats, CurrentTx: " << QueryState->CurrentTx
+ << "/" << (QueryState->PreparedQuery ? QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize() : 0)
+ << " response.status: " << response->GetStatus());
+
+ ExecuterId = TActorId{};
+
+ YQL_ENSURE(QueryState);
+
+ auto& executerResults = *response->MutableResult();
+ if (executerResults.HasStats()) {
+ auto* exec = QueryState->Stats.AddExecutions();
+ exec->Swap(executerResults.MutableStats());
+ }
+
+ Become(&TKqpSessionActor::ExecuteState);
+
+ FillSystemViewQueryStats(nullptr);
+ Cleanup(false);
+ }
+
+ void HandleNoop(TEvents::TEvUndelivered::TPtr& ev) {
+ // outdated TEvUndelivered from another executer.
+ // it this case we should just ignore the event.
+ Y_ENSURE(ExecuterId != ev->Sender);
+ }
+
+ void HandleWaitStats(TEvents::TEvUndelivered::TPtr& ev) {
+ // outdated TEvUndelivered from another executer.
+ // it this case we should just ignore the event.
+ if (ExecuterId != ev->Sender) {
+ return;
+ }
+
+ LOG_D("TEvUndelivered at WaitStats, CurrentTx: " << QueryState->CurrentTx);
+
+ ExecuterId = TActorId{};
+
+ YQL_ENSURE(QueryState);
+
+ Become(&TKqpSessionActor::ExecuteState);
+
+ FillSystemViewQueryStats(nullptr);
+ Cleanup(false);
+ }
+
void HandleCleanup(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
if (ev->Sender != ExecuterId) {
return;
@@ -1882,6 +1953,15 @@ public:
FillTxInfo(response);
+ if (ExecuterId && CurrentStateFunc() == &TThis::ExecuteState && ydbStatus == Ydb::StatusIds::TIMEOUT) {
+ Become(&TKqpSessionActor::WaitStatsState);
+ return;
+ }
+
+ ExecuterId = TActorId{};
+ if (CurrentStateFunc() == &TThis::CompileState && ydbStatus == Ydb::StatusIds::TIMEOUT) {
+ FillSystemViewQueryStats(nullptr);
+ }
Cleanup(IsFatalError(ydbStatus));
}
@@ -1929,6 +2009,7 @@ public:
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
+ hFunc(TEvents::TEvUndelivered, HandleNoop);
// message from KQP proxy in case of our reply just after kqp proxy timer tick
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
default:
@@ -1957,6 +2038,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqpExecuter::TEvTxResponse, HandleNoop);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvents::TEvUndelivered, HandleNoop);
default:
UnexpectedEvent("CompileState", ev);
}
@@ -1988,6 +2070,7 @@ public:
// forgotten messages from previous aborted request
hFunc(TEvKqp::TEvCompileResponse, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
+ hFunc(TEvents::TEvUndelivered, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvQueryResponse, ForwardResponse);
@@ -2018,6 +2101,7 @@ public:
hFunc(TEvKqp::TEvCompileResponse, HandleNoop);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleNoop);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleNoop);
+ hFunc(TEvents::TEvUndelivered, HandleNoop);
// always come from WorkerActor
hFunc(TEvKqp::TEvCloseSessionResponse, HandleCleanup);
@@ -2033,6 +2117,14 @@ public:
STATEFN(FinalCleanupState) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvGone, HandleFinalCleanup);
+ hFunc(TEvents::TEvUndelivered, HandleNoop);
+ }
+ }
+
+ STATEFN(WaitStatsState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvents::TEvUndelivered, HandleWaitStats)
+ hFunc(TEvKqpExecuter::TEvTxResponse, HandleWaitStats);
}
}
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index ae5a996481..f05c5f6be1 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -599,6 +599,59 @@ void FillProfile(NYdb::NTable::TScanQueryPart& streamPart, NYson::TYsonWriter& w
Y_UNUSED(profileIndex);
}
+void CreateLargeTable(TKikimrRunner& kikimr, ui32 rowsPerShard, ui32 keyTextSize,
+ ui32 dataTextSize, ui32 batchSizeRows, ui32 fillShardsCount, ui32 largeTableKeysPerShard)
+{
+ kikimr.GetTestClient().CreateTable("/Root", R"(
+ Name: "LargeTable"
+ Columns { Name: "Key", Type: "Uint64" }
+ Columns { Name: "KeyText", Type: "String" }
+ Columns { Name: "Data", Type: "Int64" }
+ Columns { Name: "DataText", Type: "String" }
+ KeyColumnNames: ["Key", "KeyText"],
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 1000000 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 2000000 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 3000000 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 4000000 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5000000 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 6000000 } } } }
+ SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 7000000 } } } }
+ )");
+
+ auto client = kikimr.GetTableClient();
+
+ for (ui32 shardIdx = 0; shardIdx < fillShardsCount; ++shardIdx) {
+ ui32 rowIndex = 0;
+ while (rowIndex < rowsPerShard) {
+
+ auto rowsBuilder = NYdb::TValueBuilder();
+ rowsBuilder.BeginList();
+ for (ui32 i = 0; i < batchSizeRows; ++i) {
+ rowsBuilder.AddListItem()
+ .BeginStruct()
+ .AddMember("Key")
+ .OptionalUint64(shardIdx * largeTableKeysPerShard + rowIndex)
+ .AddMember("KeyText")
+ .OptionalString(TString(keyTextSize, '0' + (i + shardIdx) % 10))
+ .AddMember("Data")
+ .OptionalInt64(rowIndex)
+ .AddMember("DataText")
+ .OptionalString(TString(dataTextSize, '0' + (i + shardIdx + 1) % 10))
+ .EndStruct();
+
+ ++rowIndex;
+ if (rowIndex == rowsPerShard) {
+ break;
+ }
+ }
+ rowsBuilder.EndList();
+
+ auto result = client.BulkUpsert("/Root/LargeTable", rowsBuilder.Build()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+}
+
void PrintResultSet(const NYdb::TResultSet& resultSet, NYson::TYsonWriter& writer) {
auto columns = resultSet.GetColumnsMeta();
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 1560d3913c..4189100e29 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -226,6 +226,9 @@ TString ReformatYson(const TString& yson);
void CompareYson(const TString& expected, const TString& actual);
void CompareYson(const TString& expected, const NKikimrMiniKQL::TResult& actual);
+void CreateLargeTable(TKikimrRunner& kikimr, ui32 rowsPerShard, ui32 keyTextSize,
+ ui32 dataTextSize, ui32 batchSizeRows = 100, ui32 fillShardsCount = 8, ui32 largeTableKeysPerShard = 1000000);
+
bool HasIssue(const NYql::TIssues& issues, ui32 code,
std::function<bool(const NYql::TIssue& issue)> predicate = {});
diff --git a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
index ded8fe7c99..67537bcee6 100644
--- a/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_limits_ut.cpp
@@ -10,68 +10,12 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;
-static const ui32 LargeTableShards = 8;
-static const ui32 LargeTableKeysPerShard = 1000000;
-
namespace {
bool IsRetryable(const EStatus& status) {
return status == EStatus::OVERLOADED;
}
}
-static void CreateLargeTable(TKikimrRunner& kikimr, ui32 rowsPerShard, ui32 keyTextSize,
- ui32 dataTextSize, ui32 batchSizeRows = 100, ui32 fillShardsCount = LargeTableShards)
-{
- kikimr.GetTestClient().CreateTable("/Root", R"(
- Name: "LargeTable"
- Columns { Name: "Key", Type: "Uint64" }
- Columns { Name: "KeyText", Type: "String" }
- Columns { Name: "Data", Type: "Int64" }
- Columns { Name: "DataText", Type: "String" }
- KeyColumnNames: ["Key", "KeyText"],
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 1000000 } } } }
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 2000000 } } } }
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 3000000 } } } }
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 4000000 } } } }
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 5000000 } } } }
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 6000000 } } } }
- SplitBoundary { KeyPrefix { Tuple { Optional { Uint64: 7000000 } } } }
- )");
-
- auto client = kikimr.GetTableClient();
-
- for (ui32 shardIdx = 0; shardIdx < fillShardsCount; ++shardIdx) {
- ui32 rowIndex = 0;
- while (rowIndex < rowsPerShard) {
-
- auto rowsBuilder = TValueBuilder();
- rowsBuilder.BeginList();
- for (ui32 i = 0; i < batchSizeRows; ++i) {
- rowsBuilder.AddListItem()
- .BeginStruct()
- .AddMember("Key")
- .OptionalUint64(shardIdx * LargeTableKeysPerShard + rowIndex)
- .AddMember("KeyText")
- .OptionalString(TString(keyTextSize, '0' + (i + shardIdx) % 10))
- .AddMember("Data")
- .OptionalInt64(rowIndex)
- .AddMember("DataText")
- .OptionalString(TString(dataTextSize, '0' + (i + shardIdx + 1) % 10))
- .EndStruct();
-
- ++rowIndex;
- if (rowIndex == rowsPerShard) {
- break;
- }
- }
- rowsBuilder.EndList();
-
- auto result = client.BulkUpsert("/Root/LargeTable", rowsBuilder.Build()).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
- }
- }
-}
-
Y_UNIT_TEST_SUITE(KqpLimits) {
Y_UNIT_TEST(DatashardProgramSize) {
auto app = NKikimrConfig::TAppConfig();
diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
index 64c62774a0..ed26e48f62 100644
--- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp
@@ -437,6 +437,187 @@ Y_UNIT_TEST(StreamLookupStats) {
});
}
+Y_UNIT_TEST(SysViewTimeout) {
+ TKikimrRunner kikimr;
+ CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1);
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ TStringStream request;
+ request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration";
+
+ auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 rowsCount = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+
+ NYdb::TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ auto value = parser.ColumnParser("QueryText").GetOptionalUtf8();
+ UNIT_ASSERT(value);
+ rowsCount++;
+ }
+ }
+ }
+ UNIT_ASSERT(rowsCount == 1);
+ }
+
+ auto settings = TStreamExecScanQuerySettings();
+ settings.ClientTimeout(TDuration::MilliSeconds(50));
+
+ TStringStream request;
+ request << R"(
+ SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "22222";
+ )";
+
+ auto result = db.StreamExecuteScanQuery(request.Str(), settings).GetValueSync();
+
+ if (result.IsSuccess()) {
+ try {
+ auto yson = StreamResultToYson(result, true);
+ UNIT_ASSERT(false);
+ } catch (const TStreamReadError& ex) {
+ UNIT_ASSERT_VALUES_EQUAL(ex.Status, NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ } catch (const std::exception& ex) {
+ UNIT_ASSERT_C(false, "unknown exception during the test");
+ }
+ } else {
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED);
+ }
+
+ {
+ TStringStream request;
+ request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration";
+
+ auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 queryCount = 0;
+ ui64 rowsCount = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+
+ NYdb::TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ auto value = parser.ColumnParser("QueryText").GetOptionalUtf8();
+ UNIT_ASSERT(value);
+ if (*value == request.Str()) {
+ queryCount++;
+ }
+ rowsCount++;
+ }
+ }
+ }
+
+ UNIT_ASSERT(queryCount == 1);
+ UNIT_ASSERT(rowsCount == 2);
+ }
+}
+
+Y_UNIT_TEST(SysViewCancelled) {
+ TKikimrRunner kikimr;
+ CreateLargeTable(kikimr, 500000, 10, 100, 5000, 1);
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ TStringStream request;
+ request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration";
+
+ auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 rowsCount = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+
+ NYdb::TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ auto value = parser.ColumnParser("QueryText").GetOptionalUtf8();
+ UNIT_ASSERT(value);
+ rowsCount++;
+ }
+ }
+ }
+ UNIT_ASSERT(rowsCount == 1);
+ }
+
+ auto prepareResult = session.PrepareDataQuery(Q_(R"(
+ SELECT COUNT(*) FROM `/Root/LargeTable` WHERE SUBSTRING(DataText, 50, 5) = "33333";
+ )")).GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(prepareResult.GetStatus(), NYdb::EStatus::SUCCESS, prepareResult.GetIssues().ToString());
+ auto dataQuery = prepareResult.GetQuery();
+
+ auto settings = TExecDataQuerySettings();
+ settings.CancelAfter(TDuration::MilliSeconds(100));
+
+ auto result = dataQuery.Execute(TTxControl::BeginTx().CommitTx(), settings).GetValueSync();
+
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NYdb::EStatus::CANCELLED);
+
+ {
+ TStringStream request;
+ request << "SELECT * FROM `/Root/.sys/top_queries_by_read_bytes_one_hour` ORDER BY Duration";
+
+ auto it = db.StreamExecuteScanQuery(request.Str()).GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+
+ ui64 queryCount = 0;
+ ui64 rowsCount = 0;
+ for (;;) {
+ auto streamPart = it.ReadNext().GetValueSync();
+ if (!streamPart.IsSuccess()) {
+ UNIT_ASSERT_C(streamPart.EOS(), streamPart.GetIssues().ToString());
+ break;
+ }
+
+ if (streamPart.HasResultSet()) {
+ auto resultSet = streamPart.ExtractResultSet();
+
+ NYdb::TResultSetParser parser(resultSet);
+ while (parser.TryNextRow()) {
+ auto value = parser.ColumnParser("QueryText").GetOptionalUtf8();
+ UNIT_ASSERT(value);
+ if (*value == request.Str()) {
+ queryCount++;
+ }
+ rowsCount++;
+ }
+ }
+ }
+
+ UNIT_ASSERT(queryCount == 1);
+ UNIT_ASSERT(rowsCount == 2);
+ }
+}
+
} // suite
} // namespace NKqp