diff options
author | ns-vasilev <ns-vasilev@yandex-team.com> | 2023-11-09 11:52:20 +0300 |
---|---|---|
committer | ns-vasilev <ns-vasilev@yandex-team.com> | 2023-11-09 12:59:34 +0300 |
commit | aa2006c826045bfe38a22ff24782f23c41cc2254 (patch) | |
tree | 78ef1b388f5d77045e1787b15094b52514c77563 | |
parent | 2bf93ed892f5bd35bc74f0e2f1b34768eab59a92 (diff) | |
download | ydb-aa2006c826045bfe38a22ff24782f23c41cc2254.tar.gz |
KIKIMR-19998: generic for oltp
fix
fix
fix
fix
fix
debug
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 10 | ||||
-rw-r--r-- | ydb/library/workload/kv_workload.cpp | 8 | ||||
-rw-r--r-- | ydb/library/workload/workload_query_generator.h | 4 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.cpp | 69 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_workload.h | 3 |
5 files changed, 79 insertions, 15 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 03eda749d8..6016b88ab8 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -2,11 +2,12 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> -#include <ydb/library/ydb_issue/issue_helpers.h> +#include <ydb/core/grpc_services/audit_dml_operations.h> #include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/cancelation/cancelation_event.h> #include <ydb/core/grpc_services/rpc_kqp_base.h> -#include <ydb/core/grpc_services/audit_dml_operations.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/public/api/protos/ydb_query.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -210,6 +211,7 @@ private: HFunc(TRpcServices::TEvGrpcNextReply, Handle); HFunc(NKqp::TEvKqpExecuter::TEvStreamData, Handle); HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + HFunc(NKikimr::NGRpcService::TEvSubscribeGrpcCancel, Handle); default: UnexpectedEvent(__func__, ev); } @@ -276,6 +278,10 @@ private: } } + void Handle(NKikimr::NGRpcService::TEvSubscribeGrpcCancel::TPtr&, const TActorContext&) { + // Ignore event now + } + void Handle(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx) { switch ((EWakeupTag) ev->Get()->Tag) { case EWakeupTag::ClientLostTag: diff --git a/ydb/library/workload/kv_workload.cpp b/ydb/library/workload/kv_workload.cpp index 00c048c82c..95cb2f3541 100644 --- a/ydb/library/workload/kv_workload.cpp +++ b/ydb/library/workload/kv_workload.cpp @@ -356,7 +356,7 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() { auto upsertQuery = Upsert(std::move(rows)); - upsertQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) { + auto callback = [](auto queryResult) { if (queryResult.IsSuccess()) { // Note: helps to keep old values too if (RandomNumber<ui32>(1000) == 0) { @@ -364,6 +364,8 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() { } } }; + upsertQuery.front().DataQueryResultCallback = callback; + upsertQuery.front().GenericQueryResultCallback = callback; return upsertQuery; } else { // read @@ -394,7 +396,7 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() { auto selectQuery = Select(std::move(rows)); if (checkRow) { - selectQuery.front().DataQueryResultCallback = [](NYdb::NTable::TDataQueryResult queryResult) { + auto callback = [](auto queryResult) { if (queryResult.IsSuccess()) { TVector<TRow> readRows; for (auto& resultSet : queryResult.GetResultSets()) { @@ -407,6 +409,8 @@ TQueryInfoList TKvWorkloadGenerator::Mixed() { VerifyRows(lastRow, std::move(readRows), queryStatus); } }; + selectQuery.front().DataQueryResultCallback = callback; + selectQuery.front().GenericQueryResultCallback = callback; } return selectQuery; diff --git a/ydb/library/workload/workload_query_generator.h b/ydb/library/workload/workload_query_generator.h index 19815e95b1..bef3bd2130 100644 --- a/ydb/library/workload/workload_query_generator.h +++ b/ydb/library/workload/workload_query_generator.h @@ -1,7 +1,8 @@ #pragma once -#include "ydb/public/sdk/cpp/client/ydb_table/table.h" #include <ydb/public/sdk/cpp/client/ydb_params/params.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/ydb_value/value.h> #include <list> @@ -31,6 +32,7 @@ struct TQueryInfo { std::optional<std::function<void(NYdb::NTable::TReadRowsResult)>> ReadRowsResultCallback; std::optional<std::function<void(NYdb::NTable::TDataQueryResult)>> DataQueryResultCallback; + std::optional<std::function<void(NYdb::NQuery::TExecuteQueryResult)>> GenericQueryResultCallback; }; using TQueryInfoList = std::list<TQueryInfo>; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp index a30f2736f4..15cf3262f0 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.cpp @@ -62,6 +62,7 @@ TWorkloadCommand::TWorkloadCommand(const TString& name, const std::initializer_l , WindowSec(0) , Quiet(false) , PrintTimestamp(false) + , QueryExecuterType() , WindowHist(60000, 2) // highestTrackableValue 60000ms = 60s, precision 2 , TotalHist(60000, 2) , TotalRetries(0) @@ -89,6 +90,8 @@ void TWorkloadCommand::Config(TConfig& config) { .DefaultValue(800).StoreResult(&CancelAfterTimeoutMs); config.Opts->AddLongOption("window", "Window duration in seconds.") .DefaultValue(1).StoreResult(&WindowSec); + config.Opts->AddLongOption("executer", "Query executer type (data or generic).") + .DefaultValue("data").StoreResult(&QueryExecuterType); } void TWorkloadCommand::PrepareForRun(TConfig& config) { @@ -104,23 +107,38 @@ void TWorkloadCommand::PrepareForRun(TConfig& config) { driverConfig.UseSecureConnection(config.CaCerts); } Driver = std::make_unique<NYdb::TDriver>(NYdb::TDriver(driverConfig)); - auto tableClientSettings = NTable::TClientSettings() - .SessionPoolSettings( - NTable::TSessionPoolSettings() - .MaxActiveSessions(10+Threads)); - TableClient = std::make_unique<NTable::TTableClient>(*Driver, tableClientSettings); + if (QueryExecuterType == "data") { + auto tableClientSettings = NTable::TClientSettings() + .SessionPoolSettings( + NTable::TSessionPoolSettings() + .MaxActiveSessions(10+Threads)); + TableClient = std::make_unique<NTable::TTableClient>(*Driver, tableClientSettings); + } else if (QueryExecuterType == "generic") { + auto queryClientSettings = NQuery::TClientSettings() + .SessionPoolSettings( + NQuery::TSessionPoolSettings() + .MaxActiveSessions(10+Threads)); + QueryClient = std::make_unique<NQuery::TQueryClient>(*Driver, queryClientSettings); + } else { + Y_FAIL_S("Unexpected executor Type: " + QueryExecuterType); + } } void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, const int type) { - auto querySettings = NYdb::NTable::TExecDataQuerySettings() + const auto dataQuerySettings = NYdb::NTable::TExecDataQuerySettings() .KeepInQueryCache(true) .OperationTimeout(TDuration::MilliSeconds(OperationTimeoutMs)) .ClientTimeout(TDuration::MilliSeconds(ClientTimeoutMs)) .CancelAfter(TDuration::MilliSeconds(CancelAfterTimeoutMs)); + const auto genericQuerySettings = NYdb::NQuery::TExecuteQuerySettings() + .ClientTimeout(TDuration::MilliSeconds(ClientTimeoutMs)); int retryCount = -1; - NYdbWorkload::TQueryInfo queryInfo; - auto runQuery = [this, &queryInfo, &querySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus { + + auto runTableClient = [this, &queryInfo, &dataQuerySettings, &retryCount] (NYdb::NTable::TSession session) -> NYdb::TStatus { + if (!TableClient) { + Y_FAIL_S("Only data query executer type supported."); + } ++retryCount; if (queryInfo.AlterTable) { auto result = TableClient->RetryOperationSync([&queryInfo](NTable::TSession session) { @@ -137,7 +155,7 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co } else { auto result = session.ExecuteDataQuery(queryInfo.Query.c_str(), NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), - queryInfo.Params, querySettings + queryInfo.Params, dataQuerySettings ).GetValueSync(); if (queryInfo.DataQueryResultCallback) { queryInfo.DataQueryResultCallback.value()(result); @@ -146,6 +164,37 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co } }; + auto runQueryClient = [this, &queryInfo, &genericQuerySettings, &retryCount] (NYdb::NQuery::TSession session) -> NYdb::NQuery::TAsyncExecuteQueryResult { + if (!QueryClient) { + Y_FAIL_S("Only generic query executer type supported."); + } + ++retryCount; + if (queryInfo.AlterTable) { + Y_FAIL_S("Generic query doesn't support alter table."); + } else if (queryInfo.UseReadRows) { + Y_FAIL_S("Generic query doesn't support readrows."); + } else { + auto result = session.ExecuteQuery(queryInfo.Query.c_str(), + NYdb::NQuery::TTxControl::BeginTx(NYdb::NQuery::TTxSettings::SerializableRW()).CommitTx(), + queryInfo.Params, genericQuerySettings + ); + return result; + } + }; + + auto runQuery = [this, &runQueryClient, &runTableClient, &queryInfo]() -> NYdb::TStatus { + Y_ENSURE_BT(TableClient || QueryClient); + if (TableClient) { + return TableClient->RetryOperationSync(runTableClient); + } else { + auto result = QueryClient->RetryQuery(runQueryClient).GetValueSync(); + if (queryInfo.GenericQueryResultCallback) { + queryInfo.GenericQueryResultCallback.value()(result); + } + return result; + } + }; + while (Now() < StopTime) { auto queryInfoList = workloadGen->GetWorkload(type); if (queryInfoList.empty()) { @@ -157,7 +206,7 @@ void TWorkloadCommand::WorkerFn(int taskId, TWorkloadQueryGenPtr workloadGen, co NYdbWorkload::TQueryInfoList::iterator it; for (it = queryInfoList.begin(); it != queryInfoList.end(); ++it) { queryInfo = *it; - auto status = TableClient->RetryOperationSync(runQuery); + auto status = runQuery(); if (!status.IsSuccess()) { TotalErrors++; WindowErrors++; diff --git a/ydb/public/lib/ydb_cli/commands/ydb_workload.h b/ydb/public/lib/ydb_cli/commands/ydb_workload.h index 59145fc419..26500fab51 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_workload.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_workload.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/public/lib/ydb_cli/commands/ydb_command.h> +#include <ydb/public/sdk/cpp/client/ydb_query/client.h> #include <library/cpp/histogram/hdr/histogram.h> #include <util/datetime/base.h> @@ -43,6 +44,7 @@ protected: std::unique_ptr<NYdb::TDriver> Driver; std::unique_ptr<NTable::TTableClient> TableClient; + std::unique_ptr<NQuery::TQueryClient> QueryClient; size_t TotalSec; size_t Threads; @@ -52,6 +54,7 @@ protected: unsigned int WindowSec; bool Quiet; bool PrintTimestamp; + TString QueryExecuterType; TInstant StartTime; TInstant StopTime; |