aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorns-vasilev <ns-vasilev@yandex-team.com>2023-11-09 11:52:20 +0300
committerns-vasilev <ns-vasilev@yandex-team.com>2023-11-09 12:59:34 +0300
commitaa2006c826045bfe38a22ff24782f23c41cc2254 (patch)
tree78ef1b388f5d77045e1787b15094b52514c77563
parent2bf93ed892f5bd35bc74f0e2f1b34768eab59a92 (diff)
downloadydb-aa2006c826045bfe38a22ff24782f23c41cc2254.tar.gz
KIKIMR-19998: generic for oltp
fix fix fix fix fix debug
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp10
-rw-r--r--ydb/library/workload/kv_workload.cpp8
-rw-r--r--ydb/library/workload/workload_query_generator.h4
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.cpp69
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_workload.h3
5 files changed, 79 insertions, 15 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 03eda749d8..6016b88ab8 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -2,11 +2,12 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
-#include <ydb/library/ydb_issue/issue_helpers.h>
+#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
-#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/public/api/protos/ydb_query.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -210,6 +211,7 @@ private:
HFunc(TRpcServices::TEvGrpcNextReply, Handle);
HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle);
HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ HFunc(NKikimr::NGRpcService::TEvSubscribeGrpcCancel, Handle);
default:
UnexpectedEvent(__func__, ev);
}
@@ -276,6 +278,10 @@ private:
}
}
+ void Handle(NKikimr::NGRpcService::TEvSubscribeGrpcCancel::TPtr&, const TActorContext&) {
+ // Ignore event now
+ }
+
void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) {
switch ((EWakeupTag) ev->Get()->Tag) {
case EWakeupTag::ClientLostTag:
diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp
index 00c048c82c..95cb2f3541 100644
--- a/ydb/library/workload/kv_workload.cpp
+++ b/ydb/library/workload/kv_workload.cpp
@@ -356,7 +356,7 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() {
auto upsertQuery = Upsert(std::move(rows));
- upsertQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) {
+ auto callback = [](auto queryResult) {
if (queryResult.IsSuccess()) {
// Note: helps to keep old values too
if (RandomNumber<ui32>(1000) == 0) {
@@ -364,6 +364,8 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() {
}
}
};
+ upsertQuery.front().DataQueryResultCallback = callback;
+ upsertQuery.front().GenericQueryResultCallback = callback;
return upsertQuery;
} else { // read
@@ -394,7 +396,7 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() {
auto selectQuery = Select(std::move(rows));
if (checkRow) {
- selectQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) {
+ auto callback = [](auto queryResult) {
if (queryResult.IsSuccess()) {
TVector<TRow> readRows;
for (auto& resultSet : queryResult.GetResultSets()) {
@@ -407,6 +409,8 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() {
VerifyRows(lastRow, std::move(readRows), queryStatus);
}
};
+ selectQuery.front().DataQueryResultCallback = callback;
+ selectQuery.front().GenericQueryResultCallback = callback;
}
return selectQuery;
diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h
index 19815e95b1..bef3bd2130 100644
--- a/ydb/library/workload/workload_query_generator.h
+++ b/ydb/library/workload/workload_query_generator.h
@@ -1,7 +1,8 @@
#pragma once
-#include "ydb/public/sdk/cpp/client/ydb_table/table.h"
#include <ydb/public/sdk/cpp/client/ydb_params/params.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
#include <list>
@@ -31,6 +32,7 @@ struct TQueryInfo {
std::optional<std::function<void(NYdb::NTable::TReadRowsResult)>> ReadRowsResultCallback;
std::optional<std::function<void(NYdb::NTable::TDataQueryResult)>> DataQueryResultCallback;
+ std::optional<std::function<void(NYdb::NQuery::TExecuteQueryResult)>> GenericQueryResultCallback;
};
using TQueryInfoList = std::list<TQueryInfo>;
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
index a30f2736f4..15cf3262f0 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp
@@ -62,6 +62,7 @@ TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_l
, WindowSec(0)
, Quiet(false)
, PrintTimestamp(false)
+ , QueryExecuterType()
, WindowHist(60000, 2) // highestTrackableValue 60000ms = 60s, precision 2
, TotalHist(60000, 2)
, TotalRetries(0)
@@ -89,6 +90,8 @@ void TWorkloadCommand::Config(TConfig& config) {
.DefaultValue(800).StoreResult(&CancelAfterTimeoutMs);
config.Opts->AddLongOption("window", "Window duration in seconds.")
.DefaultValue(1).StoreResult(&WindowSec);
+ config.Opts->AddLongOption("executer", "Query executer type (data or generic).")
+ .DefaultValue("data").StoreResult(&QueryExecuterType);
}
void TWorkloadCommand::PrepareForRun(TConfig& config) {
@@ -104,23 +107,38 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) {
driverConfig.UseSecureConnection(config.CaCerts);
}
Driver = std::make_unique<NYdb::TDriver>(NYdb::TDriver(driverConfig));
- auto tableClientSettings = NTable::TClientSettings()
- .SessionPoolSettings(
- NTable::TSessionPoolSettings()
- .MaxActiveSessions(10+Threads));
- TableClient = std::make_unique<NTable::TTableClient>(*Driver, tableClientSettings);
+ if (QueryExecuterType == "data") {
+ auto tableClientSettings = NTable::TClientSettings()
+ .SessionPoolSettings(
+ NTable::TSessionPoolSettings()
+ .MaxActiveSessions(10+Threads));
+ TableClient = std::make_unique<NTable::TTableClient>(*Driver, tableClientSettings);
+ } else if (QueryExecuterType == "generic") {
+ auto queryClientSettings = NQuery::TClientSettings()
+ .SessionPoolSettings(
+ NQuery::TSessionPoolSettings()
+ .MaxActiveSessions(10+Threads));
+ QueryClient = std::make_unique<NQuery::TQueryClient>(*Driver, queryClientSettings);
+ } else {
+ Y_FAIL_S("Unexpected executor Type: " + QueryExecuterType);
+ }
}
void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, const int type) {
- auto querySettings = NYdb::NTable::TExecDataQuerySettings()
+ const auto dataQuerySettings = NYdb::NTable::TExecDataQuerySettings()
.KeepInQueryCache(true)
.OperationTimeout(TDuration::MilliSeconds(OperationTimeoutMs))
.ClientTimeout(TDuration::MilliSeconds(ClientTimeoutMs))
.CancelAfter(TDuration::MilliSeconds(CancelAfterTimeoutMs));
+ const auto genericQuerySettings = NYdb::NQuery::TExecuteQuerySettings()
+ .ClientTimeout(TDuration::MilliSeconds(ClientTimeoutMs));
int retryCount = -1;
-
NYdbWorkload::TQueryInfo queryInfo;
- auto runQuery = [this, &queryInfo, &querySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus {
+
+ auto runTableClient = [this, &queryInfo, &dataQuerySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus {
+ if (!TableClient) {
+ Y_FAIL_S("Only data query executer type supported.");
+ }
++retryCount;
if (queryInfo.AlterTable) {
auto result = TableClient->RetryOperationSync([&queryInfo](NTable::TSession session) {
@@ -137,7 +155,7 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co
} else {
auto result = session.ExecuteDataQuery(queryInfo.Query.c_str(),
NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
- queryInfo.Params, querySettings
+ queryInfo.Params, dataQuerySettings
).GetValueSync();
if (queryInfo.DataQueryResultCallback) {
queryInfo.DataQueryResultCallback.value()(result);
@@ -146,6 +164,37 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co
}
};
+ auto runQueryClient = [this, &queryInfo, &genericQuerySettings, &retryCount] (NYdb::NQuery::TSession session) -> NYdb::NQuery::TAsyncExecuteQueryResult {
+ if (!QueryClient) {
+ Y_FAIL_S("Only generic query executer type supported.");
+ }
+ ++retryCount;
+ if (queryInfo.AlterTable) {
+ Y_FAIL_S("Generic query doesn't support alter table.");
+ } else if (queryInfo.UseReadRows) {
+ Y_FAIL_S("Generic query doesn't support readrows.");
+ } else {
+ auto result = session.ExecuteQuery(queryInfo.Query.c_str(),
+ NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx(),
+ queryInfo.Params, genericQuerySettings
+ );
+ return result;
+ }
+ };
+
+ auto runQuery = [this, &runQueryClient, &runTableClient, &queryInfo]() -> NYdb::TStatus {
+ Y_ENSURE_BT(TableClient || QueryClient);
+ if (TableClient) {
+ return TableClient->RetryOperationSync(runTableClient);
+ } else {
+ auto result = QueryClient->RetryQuery(runQueryClient).GetValueSync();
+ if (queryInfo.GenericQueryResultCallback) {
+ queryInfo.GenericQueryResultCallback.value()(result);
+ }
+ return result;
+ }
+ };
+
while (Now() < StopTime) {
auto queryInfoList = workloadGen->GetWorkload(type);
if (queryInfoList.empty()) {
@@ -157,7 +206,7 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co
NYdbWorkload::TQueryInfoList::iterator it;
for (it = queryInfoList.begin(); it != queryInfoList.end(); ++it) {
queryInfo = *it;
- auto status = TableClient->RetryOperationSync(runQuery);
+ auto status = runQuery();
if (!status.IsSuccess()) {
TotalErrors++;
WindowErrors++;
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.h b/ydb/public/lib/ydb_cli/commands/ydb_workload.h
index 59145fc419..26500fab51 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_workload.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/public/lib/ydb_cli/commands/ydb_command.h>
+#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <library/cpp/histogram/hdr/histogram.h>
#include <util/datetime/base.h>
@@ -43,6 +44,7 @@ protected:
std::unique_ptr<NYdb::TDriver> Driver;
std::unique_ptr<NTable::TTableClient> TableClient;
+ std::unique_ptr<NQuery::TQueryClient> QueryClient;
size_t TotalSec;
size_t Threads;
@@ -52,6 +54,7 @@ protected:
unsigned int WindowSec;
bool Quiet;
bool PrintTimestamp;
+ TString QueryExecuterType;
TInstant StartTime;
TInstant StopTime;