diff options
author | Pisarenko Grigoriy <grigoriypisar@ydb.tech> | 2025-02-28 18:00:43 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-28 16:00:43 +0300 |
commit | 1547cff981d4a39f2a007760ca202b1071a95cd3 (patch) | |
tree | 4a7b73f6078532ffd2c0250b51ab794d8215ac66 | |
parent | 45f8fc2ec6626a6735e7578d05997363242b71e7 (diff) | |
download | ydb-1547cff981d4a39f2a007760ca202b1071a95cd3.tar.gz |
YQ-3561 FQrun supported remote databases (#15016)
30 files changed, 969 insertions, 314 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp index 3975e59ccc..def090ba2b 100644 --- a/ydb/core/fq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp @@ -105,6 +105,13 @@ struct TEvPrivate { constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60); +TDuration GetPendingFetchPeriod(const NFq::NConfig::TPendingFetcherConfig& config) { + if (const auto periodMs = config.GetPendingFetchPeriodMs()) { + return TDuration::MilliSeconds(periodMs); + } + return TDuration::Seconds(1); +} + } // namespace class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> { @@ -170,6 +177,7 @@ public: , ServiceCounters(serviceCounters, "pending_fetcher") , GetTaskCounters("GetTask", ServiceCounters.Counters) , FailedStatusCodeCounters(MakeIntrusive<TStatusCodeByScopeCounters>("IntermediateFailedStatusCode", ServiceCounters.RootCounters->GetSubgroup("component", "QueryDiagnostic"))) + , PendingFetchPeriod(GetPendingFetchPeriod(config.GetPendingFetcher())) , CredentialsFactory(credentialsFactory) , S3Gateway(s3Gateway) , ConnectorClient(connectorClient) @@ -518,7 +526,7 @@ private: IModuleResolver::TPtr ModuleResolver; bool HasRunningRequest = false; - const TDuration PendingFetchPeriod = TDuration::Seconds(1); + const TDuration PendingFetchPeriod; TActorId DatabaseResolver; diff --git a/ydb/core/fq/libs/config/protos/pending_fetcher.proto b/ydb/core/fq/libs/config/protos/pending_fetcher.proto index 52f0dd1368..25f0203a6c 100644 --- a/ydb/core/fq/libs/config/protos/pending_fetcher.proto +++ b/ydb/core/fq/libs/config/protos/pending_fetcher.proto @@ -8,4 +8,5 @@ option java_package = "ru.yandex.kikimr.proto"; message TPendingFetcherConfig { bool Enabled = 1; + uint64 PendingFetchPeriodMs = 2; // 1s by default } diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp index 6e709b594e..77ce3d0bd6 100644 --- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp +++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp @@ -180,7 +180,7 @@ TLeaderElection::TLeaderElection( : Config(config) , CredentialsProviderFactory(credentialsProviderFactory) , YqSharedResources(yqSharedResources) - , YdbConnection(NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, yqSharedResources->UserSpaceYdbDriver)) + , YdbConnection(config.GetLocalMode() ? nullptr : NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, yqSharedResources->UserSpaceYdbDriver)) , TablePathPrefix(JoinPath(config.GetDatabase().GetDatabase(), config.GetCoordinationNodePath())) , CoordinationNodePath(JoinPath(TablePathPrefix, tenant)) , ParentId(parentId) diff --git a/ydb/tests/tools/fqrun/README.md b/ydb/tests/tools/fqrun/README.md index dcaee95ccc..14c0dc9b52 100644 --- a/ydb/tests/tools/fqrun/README.md +++ b/ydb/tests/tools/fqrun/README.md @@ -8,7 +8,7 @@ For profiling memory allocations build fqrun with ya make flags `-D PROFILE_MEMO * `flame_graph.sh` - script for collecting flame graphs in svg format, usage: ```(bash) - ./flame_graph.sh [graph collection time in seconds] + ./flame_graph.sh [graph collection time in seconds] [use sudo] ``` ## Examples diff --git a/ydb/tests/tools/fqrun/configuration/fq_config.conf b/ydb/tests/tools/fqrun/configuration/fq_config.conf index 576b0d394b..6d7c90f6b1 100644 --- a/ydb/tests/tools/fqrun/configuration/fq_config.conf +++ b/ydb/tests/tools/fqrun/configuration/fq_config.conf @@ -3,7 +3,6 @@ EnableDynamicNameservice: true EnableTaskCounters: true CheckpointCoordinator { - Enabled: true CheckpointingPeriodMillis: 30000 MaxInflight: 1 @@ -24,7 +23,7 @@ Common { MdbGateway: "https://mdb.api.cloud.yandex.net:443" MdbTransformHost: false ObjectStorageEndpoint: "https://storage.yandexcloud.net" - IdsPrefix: "kr" + IdsPrefix: "fr" QueryArtifactsCompressionMethod: "zstd_6" MonitoringEndpoint: "monitoring.api.cloud.yandex.net" KeepInternalErrors: true @@ -42,10 +41,12 @@ Common { ControlPlaneProxy { Enabled: true + RequestTimeout: "1m" } ControlPlaneStorage { Enabled: true + UseInMemory: true StatsMode: STATS_MODE_PROFILE DumpRawStatistics: true TasksBatchSize: 100 @@ -84,6 +85,9 @@ ControlPlaneStorage { OperationTimeoutSec: 60 CancelAfterSec: 60 } + + RetryPolicyMapping { + } } DbPool { @@ -201,10 +205,6 @@ Gateways { } } -Health { - Enabled: true -} - NodesManager { Enabled: true } @@ -223,7 +223,6 @@ PrivateProxy { } QuotasManager { - Enabled: true QuotaDescriptions { SubjectType: "cloud" MetricName: "yq.cpuPercent.count" @@ -244,10 +243,6 @@ QuotasManager { } RateLimiter { - Enabled: true - ControlPlaneEnabled: true - DataPlaneEnabled: true - Database { TablePrefix: "yq/rate_limiter" ClientTimeoutSec: 70 diff --git a/ydb/tests/tools/fqrun/flame_graph.sh b/ydb/tests/tools/fqrun/flame_graph.sh index b6f8fe6da4..9b002f6227 100755 --- a/ydb/tests/tools/fqrun/flame_graph.sh +++ b/ydb/tests/tools/fqrun/flame_graph.sh @@ -2,24 +2,11 @@ set -eux -function cleanup { - sudo rm ./profdata - rm ./profdata.txt -} -trap cleanup EXIT - -if [ $# -gt 1 ]; then +if [ $# -gt 2 ]; then echo "Too many arguments" exit -1 fi -fqrun_pid=$(pgrep -u $USER fqrun) - -sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $fqrun_pid -v -o profdata -- sleep ${1:-'30'} -sudo perf script -i profdata > profdata.txt - SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) -flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph/" - -${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg +$SCRIPT_DIR/../kqprun/flame_graph.sh ${1:-'30'} ${2:-''} fqrun diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp index 99f597d95c..3ebf36fe7c 100644 --- a/ydb/tests/tools/fqrun/fqrun.cpp +++ b/ydb/tests/tools/fqrun/fqrun.cpp @@ -20,27 +20,96 @@ namespace NFqRun { namespace { struct TExecutionOptions { + enum class EExecutionCase { + Stream, + AsyncStream + }; + TString Query; std::vector<FederatedQuery::ConnectionContent> Connections; std::vector<FederatedQuery::BindingContent> Bindings; + ui32 LoopCount = 1; + TDuration LoopDelay; + bool ContinueAfterFail = false; + + EExecutionCase ExecutionCase = EExecutionCase::Stream; + bool HasResults() const { return !Query.empty(); } - TRequestOptions GetQueryOptions() const { + TRequestOptions GetQueryOptions(ui64 queryId) const { return { - .Query = Query + .Query = Query, + .QueryId = queryId }; } void Validate(const TRunnerOptions& runnerOptions) const { - if (!Query && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) { + if (!Query && Connections.empty() && Bindings.empty() && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) { ythrow yexception() << "Nothing to execute and is not running as daemon"; } + ValidateAsyncOptions(runnerOptions.FqSettings.AsyncQueriesSettings); + ValidateTraceOpt(runnerOptions); + } + +private: + ui64 GetNumberOfQueries() const { + if (!Query) { + return 0; + } + return LoopCount ? LoopCount : std::numeric_limits<ui64>::max(); + } + + void ValidateAsyncOptions(const TAsyncQueriesSettings& asyncQueriesSettings) const { + if (asyncQueriesSettings.InFlightLimit && ExecutionCase != EExecutionCase::AsyncStream) { + ythrow yexception() << "In flight limit can not be used without async queries"; + } + + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + const auto numberOfQueries = GetNumberOfQueries(); + if (asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > numberOfQueries) { + Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << numberOfQueries << colors.Default() << Endl; + } + } + + void ValidateTraceOpt(const TRunnerOptions& runnerOptions) const { + if (runnerOptions.TraceOptAll && !runnerOptions.TraceOptIds.empty()) { + ythrow yexception() << "Trace opt ids can not be used with trace opt all flag"; + } + + const auto numberOfQueries = GetNumberOfQueries(); + for (auto id : runnerOptions.TraceOptIds) { + if (id >= numberOfQueries) { + ythrow yexception() << "Trace opt id " << id << " should be less than number of queries " << numberOfQueries; + } + } } }; +void RunArgumentQuery(ui64 queryId, const TExecutionOptions& executionOptions, TFqRunner& runner) { + NColorizer::TColors colors = NColorizer::AutoColors(Cout); + + switch (executionOptions.ExecutionCase) { + case TExecutionOptions::EExecutionCase::Stream: { + if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions(queryId))) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; + } + Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl; + if (!runner.FetchQueryResults()) { + ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed"; + } + break; + } + + case TExecutionOptions::EExecutionCase::AsyncStream: { + runner.ExecuteQueryAsync(executionOptions.GetQueryOptions(queryId)); + break; + } + } +} + void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& runner) { NColorizer::TColors colors = NColorizer::AutoColors(Cout); @@ -58,14 +127,31 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& ru } } - if (executionOptions.Query) { - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing query..." << colors.Default() << Endl; - if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions())) { - ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed"; + if (!executionOptions.Query) { + return; + } + + const size_t numberLoops = executionOptions.LoopCount; + for (size_t queryId = 0; queryId < numberLoops || numberLoops == 0; ++queryId) { + if (queryId > 0) { + Sleep(executionOptions.LoopDelay); + } + + const TInstant startTime = TInstant::Now(); + Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing query"; + if (numberLoops != 1) { + Cout << ", loop " << queryId; } - Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl; - if (!runner.FetchQueryResults()) { - ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed"; + Cout << "..." << colors.Default() << Endl; + + try { + RunArgumentQuery(queryId, executionOptions, runner); + } catch (const yexception& exception) { + if (executionOptions.ContinueAfterFail) { + Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl; + } else { + throw exception; + } } } @@ -111,9 +197,18 @@ void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions& } class TMain : public TMainBase { + using TBase = TMainBase; using EVerbose = TFqSetupSettings::EVerbose; protected: + void RegisterLogOptions(NLastGetopt::TOpts& options) override { + TBase::RegisterLogOptions(options); + + options.AddLongOption("log-fq", "FQ components log priority") + .RequiredArgument("priority") + .StoreMappedResultT<TString>(&FqLogPriority, GetLogPrioritiesMap("log-fq")); + } + void RegisterOptions(NLastGetopt::TOpts& options) override { options.SetTitle("FqRun -- tool to execute stream queries through FQ proxy"); options.AddHelpOption('h'); @@ -208,6 +303,18 @@ protected: // Outputs + options.AddLongOption('T', "trace-opt", "Print AST in the begin of each transformation") + .OptionalArgument("query-id") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + const auto value = option->CurVal(); + if (!value) { + RunnerOptions.TraceOptAll = true; + } else if (!RunnerOptions.TraceOptIds.emplace(FromString<ui64>(value)).second) { + ythrow yexception() << "Got duplicated trace opt index: " << value; + } + RunnerOptions.FqSettings.EnableTraceOpt = true; + }); + options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)") .RequiredArgument("file") .DefaultValue("-") @@ -226,6 +333,20 @@ protected: // Pipeline settings + TChoices<TExecutionOptions::EExecutionCase> executionCase({ + {"stream", TExecutionOptions::EExecutionCase::Stream}, + {"async-stream", TExecutionOptions::EExecutionCase::AsyncStream} + }); + options.AddLongOption('C', "execution-case", "Type of query for -p argument") + .RequiredArgument("query-type") + .Choices(executionCase.GetChoices()) + .StoreMappedResultT<TString>(&ExecutionOptions.ExecutionCase, executionCase); + + options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)") + .RequiredArgument("uint") + .DefaultValue(0) + .StoreResult(&RunnerOptions.FqSettings.AsyncQueriesSettings.InFlightLimit); + options.AddLongOption("verbose", TStringBuilder() << "Common verbose level (max level " << static_cast<ui32>(EVerbose::Max) - 1 << ")") .RequiredArgument("uint") .DefaultValue(static_cast<ui8>(EVerbose::Info)) @@ -233,6 +354,73 @@ protected: return static_cast<EVerbose>(std::min(value, static_cast<ui8>(EVerbose::Max))); }); + TChoices<TAsyncQueriesSettings::EVerbose> verbose({ + {"each-query", TAsyncQueriesSettings::EVerbose::EachQuery}, + {"final", TAsyncQueriesSettings::EVerbose::Final} + }); + options.AddLongOption("async-verbose", "Verbose type for async queries") + .RequiredArgument("type") + .DefaultValue("each-query") + .Choices(verbose.GetChoices()) + .StoreMappedResultT<TString>(&RunnerOptions.FqSettings.AsyncQueriesSettings.Verbose, verbose); + + options.AddLongOption("ping-period", "Query ping period in milliseconds") + .RequiredArgument("uint") + .DefaultValue(100) + .StoreMappedResultT<ui64>(&RunnerOptions.PingPeriod, &TDuration::MilliSeconds<ui64>); + + options.AddLongOption("loop-count", "Number of runs of the query (use 0 to start infinite loop)") + .RequiredArgument("uint") + .DefaultValue(ExecutionOptions.LoopCount) + .StoreResult(&ExecutionOptions.LoopCount); + + options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps") + .RequiredArgument("uint") + .DefaultValue(0) + .StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>); + + options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails") + .NoArgument() + .SetFlag(&ExecutionOptions.ContinueAfterFail); + + // Cluster settings + + options.AddLongOption("cp-storage", "Start real control plane storage instead of in memory (will use local database by default), token variable CP_STORAGE_TOKEN") + .OptionalArgument("database@endpoint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + RunnerOptions.FqSettings.EnableCpStorage = true; + if (const auto value = option->CurVal()) { + RunnerOptions.FqSettings.CpStorageDatabase = TExternalDatabase::Parse(value, "CP_STORAGE_TOKEN"); + } + }); + + options.AddLongOption("checkpoints", "Start checkpoint coordinator (will use local database by default), token variable CHECKPOINTS_TOKEN") + .OptionalArgument("database@endpoint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + RunnerOptions.FqSettings.EnableCheckpoints = true; + if (const auto value = option->CurVal()) { + RunnerOptions.FqSettings.CheckpointsDatabase = TExternalDatabase::Parse(value, "CHECKPOINTS_TOKEN"); + } + }); + + options.AddLongOption("quotas", "Start FQ quotas service and rate limiter (will be created local rate limiter by default), token variable QUOTAS_TOKEN") + .OptionalArgument("database@endpoint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + RunnerOptions.FqSettings.EnableQuotas = true; + if (const auto value = option->CurVal()) { + RunnerOptions.FqSettings.RateLimiterDatabase = TExternalDatabase::Parse(value, "QUOTAS_TOKEN"); + } + }); + + options.AddLongOption("row-dispatcher", TStringBuilder() << "Use real coordinator for row dispatcher (will use local database by default), token variable ROW_DISPATCHER_TOKEN") + .OptionalArgument("database@endpoint") + .Handler1([this](const NLastGetopt::TOptsParser* option) { + RunnerOptions.FqSettings.EnableRemoteRd = true; + if (const auto value = option->CurVal()) { + RunnerOptions.FqSettings.RowDispatcherDatabase = TExternalDatabase::Parse(value, "ROW_DISPATCHER_TOKEN"); + } + }); + RegisterKikimrOptions(options, RunnerOptions.FqSettings); } @@ -242,16 +430,17 @@ protected: RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE); RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get(); - auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways(); + auto& fqConfig = RunnerOptions.FqSettings.FqConfig; + auto& gatewayConfig = *fqConfig.mutable_gateways(); FillTokens(gatewayConfig.mutable_pq()); FillTokens(gatewayConfig.mutable_s3()); FillTokens(gatewayConfig.mutable_generic()); FillTokens(gatewayConfig.mutable_ydb()); FillTokens(gatewayConfig.mutable_solomon()); - auto& logConfig = RunnerOptions.FqSettings.LogConfig; - logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT); - FillLogConfig(logConfig); + fqConfig.MutablePendingFetcher()->SetPendingFetchPeriodMs(RunnerOptions.PingPeriod.MilliSeconds()); + + SetupLogsConfig(); if (!PqFilesMapping.empty()) { auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>(); @@ -301,15 +490,44 @@ private: } } + void SetupLogsConfig() { + auto& logConfig = RunnerOptions.FqSettings.LogConfig; + + logConfig.SetDefaultLevel(DefaultLogPriority.value_or(NActors::NLog::EPriority::PRI_CRIT)); + + if (FqLogPriority) { + std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> fqLogPriorities; + std::unordered_set<TString> prefixes = { + "FQ_", "YQ_", "STREAMS", "PUBLIC_HTTP" + }; + auto descriptor = NKikimrServices::EServiceKikimr_descriptor(); + for (int i = 0; i < descriptor->value_count(); ++i) { + const auto service = static_cast<NKikimrServices::EServiceKikimr>(descriptor->value(i)->number()); + const auto& servicceStr = NKikimrServices::EServiceKikimr_Name(service); + for (const auto& prefix : prefixes) { + if (servicceStr.StartsWith(prefix)) { + fqLogPriorities.emplace(service, *FqLogPriority); + break; + } + } + } + ModifyLogPriorities(fqLogPriorities, logConfig); + } + + ModifyLogPriorities(LogPriorities, logConfig); + } + private: TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; - std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping; struct TTopicSettings { bool CancelOnFileFinish = false; }; std::unordered_map<TString, TTopicSettings> TopicsSettings; + std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping; + + std::optional<NActors::NLog::EPriority> FqLogPriority; }; } // anonymous namespace diff --git a/ydb/tests/tools/fqrun/src/actors.cpp b/ydb/tests/tools/fqrun/src/actors.cpp new file mode 100644 index 0000000000..9ce3706c4d --- /dev/null +++ b/ydb/tests/tools/fqrun/src/actors.cpp @@ -0,0 +1,129 @@ +#include "actors.h" +#include "common.h" + +#include <ydb/library/actors/core/actor_bootstrapped.h> + +using namespace NKikimrRun; + +namespace NFqRun { + +namespace { + +class TRunQueryActor : public NActors::TActorBootstrapped<TRunQueryActor> { +public: + TRunQueryActor(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise) + : Scope(request.Event->Scope) + , UserSid(request.Event->User) + , Token(request.Event->Token) + , PingPeriod(request.PingPeriod) + , Request(std::move(request.Event)) + , Promise(promise) + {} + + void Bootstrap() { + Send(NFq::ControlPlaneProxyActorId(), std::move(Request)); + + Become(&TRunQueryActor::StateFunc); + } + + STRICT_STFUNC(StateFunc, + hFunc(NFq::TEvControlPlaneProxy::TEvCreateQueryResponse, Handle); + hFunc(NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse, Handle); + sFunc(NActors::TEvents::TEvWakeup, DescribeQuery); + ) + + void Handle(NFq::TEvControlPlaneProxy::TEvCreateQueryResponse::TPtr& ev) { + if (ev->Get()->Issues) { + Fail(FederatedQuery::QueryMeta::FAILED, {GroupIssues(NYql::TIssue("Failed to create request"), ev->Get()->Issues)}); + return; + } + + QueryId = ev->Get()->Result.query_id(); + DescribeQuery(); + } + + void Handle(NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse::TPtr& ev) { + if (ev->Get()->Issues) { + Fail(FederatedQuery::QueryMeta::FAILED, {GroupIssues(NYql::TIssue("Failed to describe request"), ev->Get()->Issues)}); + return; + } + + const auto& result = ev->Get()->Result.query(); + const auto status = result.meta().status(); + if (!IsFinalStatus(status)) { + Schedule(PingPeriod, new NActors::TEvents::TEvWakeup()); + return; + } + + TQueryResponse response; + response.Status = status; + NYql::IssuesFromMessage(result.issue(), response.Issues); + NYql::IssuesFromMessage(result.transient_issue(), response.TransientIssues); + Promise.SetValue(response); + PassAway(); + } + + void DescribeQuery() const { + FederatedQuery::DescribeQueryRequest request; + request.set_query_id(QueryId); + + Send(NFq::ControlPlaneProxyActorId(), new NFq::TEvControlPlaneProxy::TEvDescribeQueryRequest(Scope, request, UserSid, Token, TVector<TString>{})); + } + +private: + void Fail(FederatedQuery::QueryMeta::ComputeStatus status, NYql::TIssues issues) { + Promise.SetValue({ + .Status = status, + .Issues = std::move(issues) + }); + PassAway(); + } + +private: + const TString Scope; + const TString UserSid; + const TString Token; + const TDuration PingPeriod; + + std::unique_ptr<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest> Request; + NThreading::TPromise<TQueryResponse> Promise; + TString QueryId; +}; + +class TAsyncQueryRunnerActor : public TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse> { + using TBase = TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>; + +public: + TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) + : TBase(settings) + {} + +protected: + void RunQuery(TQueryRequest&& request, NThreading::TPromise<TQueryResponse> promise) override { + Register(new TRunQueryActor(std::move(request), promise)); + } +}; + +} // anonymous namespace + +bool TQueryResponse::IsSuccess() const { + return Status == FederatedQuery::QueryMeta::COMPLETED; +} + +TString TQueryResponse::GetStatus() const { + return FederatedQuery::QueryMeta::ComputeStatus_Name(Status); +} + +TString TQueryResponse::GetError() const { + NYql::TIssues issues = Issues; + if (TransientIssues) { + issues.AddIssue(GroupIssues(NYql::TIssue("Transient issues"), TransientIssues)); + } + return issues.ToString(); +} + +NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) { + return new TAsyncQueryRunnerActor(settings); +} + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/actors.h b/ydb/tests/tools/fqrun/src/actors.h new file mode 100644 index 0000000000..5f0be7d281 --- /dev/null +++ b/ydb/tests/tools/fqrun/src/actors.h @@ -0,0 +1,27 @@ +#pragma once + +#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/tests/tools/kqprun/runlib/actors.h> + +namespace NFqRun { + +struct TQueryRequest { + std::unique_ptr<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest> Event; + TDuration PingPeriod; +}; + +struct TQueryResponse { + FederatedQuery::QueryMeta::ComputeStatus Status; + NYql::TIssues Issues; + NYql::TIssues TransientIssues; + + bool IsSuccess() const; + TString GetStatus() const; + TString GetError() const; +}; + +NActors::IActor* CreateAsyncQueryRunnerActor(const NKikimrRun::TAsyncQueriesSettings& settings); + +} // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/common.cpp b/ydb/tests/tools/fqrun/src/common.cpp index 7d0fc18045..ea0aac6aa8 100644 --- a/ydb/tests/tools/fqrun/src/common.cpp +++ b/ydb/tests/tools/fqrun/src/common.cpp @@ -1,11 +1,50 @@ #include "common.h" +#include <util/system/env.h> + +#include <ydb/library/aclib/aclib.h> + namespace NFqRun { +TExternalDatabase TExternalDatabase::Parse(const TString& optionValue, const TString& tokenVar) { + TStringBuf database, endpoint; + TStringBuf(optionValue).Split('@', database, endpoint); + if (database.empty() || endpoint.empty()) { + ythrow yexception() << "Incorrect external database mapping, expected form database@endpoint"; + } + + TExternalDatabase result = { + .Endpoint = TString(endpoint), + .Database = TString(database), + .Token = GetEnv(tokenVar) + }; + + if (!result.Token) { + result.Token = GetEnv(YQL_TOKEN_VARIABLE); + if (!result.Token) { + result.Token = BUILTIN_ACL_ROOT; + } + } + + return result; +} + void SetupAcl(FederatedQuery::Acl* acl) { if (acl->visibility() == FederatedQuery::Acl::VISIBILITY_UNSPECIFIED) { acl->set_visibility(FederatedQuery::Acl::SCOPE); } } +NYql::TIssue GroupIssues(NYql::TIssue rootIssue, const NYql::TIssues& childrenIssues) { + for (const auto& issue : childrenIssues) { + rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue)); + } + return rootIssue; +} + +bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status) { + using EStatus = FederatedQuery::QueryMeta; + return IsIn({EStatus::FAILED, EStatus::COMPLETED, EStatus::ABORTED_BY_USER, EStatus::ABORTED_BY_SYSTEM}, status); +} + } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h index 5c139d83f5..f0960afd3f 100644 --- a/ydb/tests/tools/fqrun/src/common.h +++ b/ydb/tests/tools/fqrun/src/common.h @@ -14,6 +14,14 @@ namespace NFqRun { constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; constexpr i64 MAX_RESULT_SET_ROWS = 1000; +struct TExternalDatabase { + TString Endpoint; + TString Database; + TString Token; + + static TExternalDatabase Parse(const TString& optionValue, const TString& tokenVar); +}; + struct TFqSetupSettings : public NKikimrRun::TServerSettings { enum class EVerbose { None, @@ -24,6 +32,19 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings { }; bool EmulateS3 = false; + bool EnableTraceOpt = false; + + bool EnableQuotas = false; + std::optional<TExternalDatabase> RateLimiterDatabase; + + bool EnableCheckpoints = false; + std::optional<TExternalDatabase> CheckpointsDatabase; + + bool EnableCpStorage = false; + std::optional<TExternalDatabase> CpStorageDatabase; + + bool EnableRemoteRd = false; + std::optional<TExternalDatabase> RowDispatcherDatabase; EVerbose VerboseLevel = EVerbose::Info; @@ -33,19 +54,29 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings { NFq::NConfig::TConfig FqConfig; NKikimrConfig::TLogConfig LogConfig; std::optional<NKikimrConfig::TActorSystemConfig> ActorSystemConfig; + NKikimrRun::TAsyncQueriesSettings AsyncQueriesSettings; }; struct TRunnerOptions { + bool TraceOptAll = false; + std::unordered_set<ui64> TraceOptIds; + IOutputStream* ResultOutput = nullptr; NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson; + TDuration PingPeriod; TFqSetupSettings FqSettings; }; struct TRequestOptions { TString Query; + ui64 QueryId; }; void SetupAcl(FederatedQuery::Acl* acl); +NYql::TIssue GroupIssues(NYql::TIssue rootIssue, const NYql::TIssues& childrenIssues); + +bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status); + } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_runner.cpp b/ydb/tests/tools/fqrun/src/fq_runner.cpp index 33995c17fa..36a53bbcff 100644 --- a/ydb/tests/tools/fqrun/src/fq_runner.cpp +++ b/ydb/tests/tools/fqrun/src/fq_runner.cpp @@ -10,8 +10,6 @@ namespace NFqRun { class TFqRunner::TImpl { using EVerbose = TFqSetupSettings::EVerbose; - static constexpr TDuration REFRESH_PERIOD = TDuration::Seconds(1); - public: explicit TImpl(const TRunnerOptions& options) : Options(options) @@ -22,6 +20,8 @@ public: {} bool ExecuteStreamQuery(const TRequestOptions& query) { + StartTraceOpt(query.QueryId); + if (VerboseLevel >= EVerbose::QueriesText) { Cout << CoutColors.Cyan() << "Starting stream request:\n" << CoutColors.Default() << query.Query << Endl; } @@ -114,14 +114,26 @@ public: return true; } -private: - static bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status) { - using EStatus = FederatedQuery::QueryMeta; - return IsIn({EStatus::FAILED, EStatus::COMPLETED, EStatus::ABORTED_BY_USER, EStatus::ABORTED_BY_SYSTEM}, status); + void ExecuteQueryAsync(const TRequestOptions& query) const { + StartTraceOpt(query.QueryId); + + if (VerboseLevel >= EVerbose::QueriesText) { + Cout << CoutColors.Cyan() << "Starting async stream request:\n" << CoutColors.Default() << query.Query << Endl; + } + + FqSetup.QueryRequestAsync(query, Options.PingPeriod); + } + + void FinalizeRunner() const { + FqSetup.WaitAsyncQueries(); } +private: bool WaitStreamQuery() { StartTime = TInstant::Now(); + Y_DEFER { + TFqSetup::StopTraceOpt(); + }; while (true) { TExecutionMeta meta; @@ -141,7 +153,7 @@ private: return false; } - Sleep(REFRESH_PERIOD); + Sleep(Options.PingPeriod); } if (VerboseLevel >= EVerbose::Info) { @@ -160,6 +172,12 @@ private: return true; } + void StartTraceOpt(size_t queryId) const { + if (Options.TraceOptAll || Options.TraceOptIds.contains(queryId)) { + FqSetup.StartTraceOpt(); + } + } + private: const TRunnerOptions Options; const EVerbose VerboseLevel; @@ -198,4 +216,12 @@ bool TFqRunner::CreateBindings(const std::vector<FederatedQuery::BindingContent> return Impl->CreateBindings(bindings); } +void TFqRunner::ExecuteQueryAsync(const TRequestOptions& query) const { + Impl->ExecuteQueryAsync(query); +} + +void TFqRunner::FinalizeRunner() const { + Impl->FinalizeRunner(); +} + } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_runner.h b/ydb/tests/tools/fqrun/src/fq_runner.h index b6e85db142..153952a955 100644 --- a/ydb/tests/tools/fqrun/src/fq_runner.h +++ b/ydb/tests/tools/fqrun/src/fq_runner.h @@ -20,6 +20,10 @@ public: bool CreateBindings(const std::vector<FederatedQuery::BindingContent>& bindings) const; + void ExecuteQueryAsync(const TRequestOptions& query) const; + + void FinalizeRunner() const; + private: class TImpl; std::shared_ptr<TImpl> Impl; diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp index b3f38ee699..98d53ac997 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.cpp +++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp @@ -1,4 +1,5 @@ #include "fq_setup.h" +#include "actors.h" #include <library/cpp/colorizer/colors.h> #include <library/cpp/testing/unittest/tests_data.h> @@ -11,6 +12,8 @@ #include <ydb/library/grpc/server/actors/logger.h> #include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <yql/essentials/utils/log/log.h> + using namespace NKikimrRun; namespace NFqRun { @@ -105,15 +108,13 @@ private: const TString endpoint = TStringBuilder() << "localhost:" << grpcPort; const TString database = NKikimr::CanonizePath(Settings.DomainName); - const auto fillStorageConfig = [endpoint, database](NFq::NConfig::TYdbStorageConfig* config) { - config->SetEndpoint(endpoint); - config->SetDatabase(database); + const auto fillStorageConfig = [endpoint, database](NFq::NConfig::TYdbStorageConfig* config, std::optional<TExternalDatabase> externalDatabase = std::nullopt) { + config->SetEndpoint(externalDatabase ? externalDatabase->Endpoint : endpoint); + config->SetDatabase(externalDatabase ? externalDatabase->Database : database); + if (externalDatabase) { + config->SetToken(externalDatabase->Token); + } }; - fillStorageConfig(fqConfig.MutableControlPlaneStorage()->MutableStorage()); - fillStorageConfig(fqConfig.MutableDbPool()->MutableStorage()); - fillStorageConfig(fqConfig.MutableCheckpointCoordinator()->MutableStorage()); - fillStorageConfig(fqConfig.MutableRateLimiter()->MutableDatabase()); - fillStorageConfig(fqConfig.MutableRowDispatcher()->MutableCoordinator()->MutableDatabase()); auto* privateApiConfig = fqConfig.MutablePrivateApi(); privateApiConfig->SetTaskServiceEndpoint(endpoint); @@ -123,14 +124,37 @@ private: nodesMenagerConfig->SetPort(grpcPort); nodesMenagerConfig->SetHost("localhost"); - auto* healthConfig = fqConfig.MutableHealth(); - healthConfig->SetPort(grpcPort); - healthConfig->SetDatabase(database); - if (Settings.EmulateS3) { fqConfig.MutableCommon()->SetObjectStorageEndpoint("file://"); } + auto& cpStorage = *fqConfig.MutableControlPlaneStorage(); + cpStorage.SetUseInMemory(!Settings.EnableCpStorage); + fillStorageConfig(cpStorage.MutableStorage(), Settings.CpStorageDatabase); + fillStorageConfig(fqConfig.MutableDbPool()->MutableStorage(), Settings.CpStorageDatabase); + + auto& checkpoints = *fqConfig.MutableCheckpointCoordinator(); + checkpoints.SetEnabled(Settings.EnableCheckpoints); + if (Settings.EnableCheckpoints) { + fillStorageConfig(checkpoints.MutableStorage(), Settings.CheckpointsDatabase); + } + + fqConfig.MutableQuotasManager()->SetEnabled(Settings.EnableQuotas); + + auto& rateLimiter = *fqConfig.MutableRateLimiter(); + rateLimiter.SetEnabled(Settings.EnableQuotas); + rateLimiter.SetControlPlaneEnabled(Settings.EnableQuotas); + rateLimiter.SetDataPlaneEnabled(Settings.EnableQuotas); + if (Settings.EnableQuotas) { + fillStorageConfig(rateLimiter.MutableDatabase(), Settings.RateLimiterDatabase); + } + + auto& rowDispatcher = *fqConfig.MutableRowDispatcher()->MutableCoordinator(); + rowDispatcher.SetLocalMode(!Settings.EnableRemoteRd); + if (Settings.EnableRemoteRd) { + fillStorageConfig(rowDispatcher.MutableDatabase(), Settings.RowDispatcherDatabase); + } + return fqConfig; } @@ -155,11 +179,21 @@ private: YqSharedResources->Init(GetRuntime()->GetActorSystem(0)); } + void InitializeYqlLogger() { + if (!Settings.EnableTraceOpt) { + return; + } + + ModifyLogPriorities({{NKikimrServices::EServiceKikimr::YQL_PROXY, NActors::NLog::PRI_TRACE}}, Settings.LogConfig); + NYql::NLog::InitLogger(NActors::CreateNullBackend()); + } + public: explicit TImpl(const TFqSetupSettings& settings) : Settings(settings) { const ui32 grpcPort = Settings.GrpcPort ? Settings.GrpcPort : PortManager.GetPort(); + InitializeYqlLogger(); InitializeServer(grpcPort); InitializeFqProxy(grpcPort); @@ -179,15 +213,9 @@ public: } NFq::TEvControlPlaneProxy::TEvCreateQueryResponse::TPtr StreamRequest(const TRequestOptions& query) const { - FederatedQuery::CreateQueryRequest request; - request.set_execute_mode(FederatedQuery::ExecuteMode::RUN); - - auto& content = *request.mutable_content(); - content.set_type(FederatedQuery::QueryContent::STREAMING); - content.set_text(query.Query); - SetupAcl(content.mutable_acl()); - - return RunControlPlaneProxyRequest<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest, NFq::TEvControlPlaneProxy::TEvCreateQueryResponse>(request); + return RunControlPlaneProxyRequest<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest, NFq::TEvControlPlaneProxy::TEvCreateQueryResponse>( + GetStreamRequest(query) + ); } NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse::TPtr DescribeQuery(const TString& queryId) const { @@ -220,15 +248,69 @@ public: return RunControlPlaneProxyRequest<NFq::TEvControlPlaneProxy::TEvCreateBindingRequest, NFq::TEvControlPlaneProxy::TEvCreateBindingResponse>(request); } + void QueryRequestAsync(const TRequestOptions& query, TDuration pingPeriod) { + if (!AsyncQueryRunnerActorId) { + AsyncQueryRunnerActorId = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings.AsyncQueriesSettings), 0, GetRuntime()->GetAppData().UserPoolId); + } + + TQueryRequest request = { + .Event = GetControlPlaneRequest<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest>(GetStreamRequest(query)), + .PingPeriod = pingPeriod + }; + auto startPromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); + + return startPromise.GetFuture().GetValueSync(); + } + + void WaitAsyncQueries() const { + if (!AsyncQueryRunnerActorId) { + return; + } + + auto finalizePromise = NThreading::NewPromise(); + GetRuntime()->Send(*AsyncQueryRunnerActorId, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise)); + + return finalizePromise.GetFuture().GetValueSync(); + } + + void StartTraceOpt() const { + if (!Settings.EnableTraceOpt) { + ythrow yexception() << "Trace opt was disabled"; + } + + NYql::NLog::YqlLogger().ResetBackend(CreateLogBackend()); + } + + static void StopTraceOpt() { + NYql::NLog::YqlLogger().ResetBackend(NActors::CreateNullBackend()); + } + private: NActors::TTestActorRuntime* GetRuntime() const { return Server->GetRuntime(); } + static FederatedQuery::CreateQueryRequest GetStreamRequest(const TRequestOptions& query) { + FederatedQuery::CreateQueryRequest request; + request.set_execute_mode(FederatedQuery::ExecuteMode::RUN); + + auto& content = *request.mutable_content(); + content.set_type(FederatedQuery::QueryContent::STREAMING); + content.set_text(query.Query); + SetupAcl(content.mutable_acl()); + + return request; + } + + template <typename TRequest, typename TProto> + std::unique_ptr<TRequest> GetControlPlaneRequest(const TProto& request) const { + return std::make_unique<TRequest>("yandexcloud://fqrun", request, BUILTIN_ACL_ROOT, Settings.YqlToken ? Settings.YqlToken : "fqrun", TVector<TString>{}); + } + template <typename TRequest, typename TResponse, typename TProto> typename TResponse::TPtr RunControlPlaneProxyRequest(const TProto& request) const { - auto event = std::make_unique<TRequest>("yandexcloud://fqrun", request, BUILTIN_ACL_ROOT, Settings.YqlToken ? Settings.YqlToken : "fqrun", TVector<TString>{}); - return RunControlPlaneProxyRequest<TRequest, TResponse>(std::move(event)); + return RunControlPlaneProxyRequest<TRequest, TResponse>(GetControlPlaneRequest<TRequest>(request)); } template <typename TRequest, typename TResponse> @@ -242,13 +324,15 @@ private: } private: - const TFqSetupSettings Settings; + TFqSetupSettings Settings; const NColorizer::TColors CoutColors; NKikimr::Tests::TServer::TPtr Server; std::unique_ptr<NKikimr::Tests::TClient> Client; NFq::IYqSharedResources::TPtr YqSharedResources; TPortManager PortManager; + + std::optional<NActors::TActorId> AsyncQueryRunnerActorId; }; TFqSetup::TFqSetup(const TFqSetupSettings& settings) @@ -300,4 +384,20 @@ TRequestResult TFqSetup::CreateBinding(const FederatedQuery::BindingContent& bin return GetStatus(response->Get()->Issues); } +void TFqSetup::QueryRequestAsync(const TRequestOptions& query, TDuration pingPeriod) const { + Impl->QueryRequestAsync(query, pingPeriod); +} + +void TFqSetup::WaitAsyncQueries() const { + Impl->WaitAsyncQueries(); +} + +void TFqSetup::StartTraceOpt() const { + Impl->StartTraceOpt(); +} + +void TFqSetup::StopTraceOpt() { + TFqSetup::TImpl::StopTraceOpt(); +} + } // namespace NFqRun diff --git a/ydb/tests/tools/fqrun/src/fq_setup.h b/ydb/tests/tools/fqrun/src/fq_setup.h index 2ef7733bf0..560402b797 100644 --- a/ydb/tests/tools/fqrun/src/fq_setup.h +++ b/ydb/tests/tools/fqrun/src/fq_setup.h @@ -29,6 +29,14 @@ public: TRequestResult CreateBinding(const FederatedQuery::BindingContent& binding) const; + void QueryRequestAsync(const TRequestOptions& query, TDuration pingPeriod) const; + + void WaitAsyncQueries() const; + + void StartTraceOpt() const; + + static void StopTraceOpt(); + private: class TImpl; std::shared_ptr<TImpl> Impl; diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make index a5a54edf3a..bf88dd8dad 100644 --- a/ydb/tests/tools/fqrun/src/ya.make +++ b/ydb/tests/tools/fqrun/src/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + actors.cpp common.cpp fq_runner.cpp fq_setup.cpp @@ -14,6 +15,7 @@ PEERDIR( ydb/core/fq/libs/init ydb/core/fq/libs/mock ydb/core/testlib + ydb/library/actors/core ydb/library/folder_service/mock ydb/library/grpc/server/actors ydb/library/security diff --git a/ydb/tests/tools/kqprun/README.md b/ydb/tests/tools/kqprun/README.md index 10e2832941..225a58264a 100644 --- a/ydb/tests/tools/kqprun/README.md +++ b/ydb/tests/tools/kqprun/README.md @@ -4,6 +4,13 @@ Tool can be used to execute queries by using kikimr provider. For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`. +## Scripts + +* `flame_graph.sh` - script for collecting flame graphs in svg format, usage: + ```(bash) + ./flame_graph.sh [graph collection time in seconds] [use sudo] + ``` + ## Examples ### Queries diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh index 61749c81b2..25467038bf 100755 --- a/ydb/tests/tools/kqprun/flame_graph.sh +++ b/ydb/tests/tools/kqprun/flame_graph.sh @@ -2,11 +2,30 @@ set -eux -kqprun_pid=$(pgrep -u $USER kqprun) +if [ $# -gt 3 ]; then + echo "Too many arguments" + exit -1 +fi -sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $kqprun_pid -v -o profdata -- sleep ${1:-'30'} -sudo perf script -i profdata > profdata.txt +SUDO="" -flame_graph_tool="../../../../contrib/tools/flame-graph/" +if [ ${2:-''} ]; then + SUDO="sudo" +fi + +function cleanup { + $SUDO rm ./profdata + rm ./profdata.txt +} +trap cleanup EXIT + +TARGER_PID=$(pgrep -u $USER ${3:-'kqprun'}) + +$SUDO perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $TARGER_PID -v -o profdata -- sleep ${1:-'30'} +$SUDO perf script -i profdata > profdata.txt + +SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd) + +flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph/" ${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 3ec48c7d5b..87f147d762 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -404,6 +404,7 @@ class TMain : public TMainBase { inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE); + TDuration PingPeriod; TExecutionOptions ExecutionOptions; TRunnerOptions RunnerOptions; @@ -623,6 +624,11 @@ protected: .Choices(verbose.GetChoices()) .StoreMappedResultT<TString>(&RunnerOptions.YdbSettings.AsyncQueriesSettings.Verbose, verbose); + options.AddLongOption("ping-period", "Query ping period in milliseconds") + .RequiredArgument("uint") + .DefaultValue(1000) + .StoreMappedResultT<ui64>(&PingPeriod, &TDuration::MilliSeconds<ui64>); + TChoices<NKikimrKqp::EQueryAction> scriptAction({ {"execute", NKikimrKqp::QUERY_ACTION_EXECUTE}, {"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN} @@ -790,11 +796,13 @@ protected: RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry().Get(); auto& appConfig = RunnerOptions.YdbSettings.AppConfig; + auto& queryService = *appConfig.MutableQueryServiceConfig(); if (ExecutionOptions.ResultsRowsLimit) { - appConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); + queryService.SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit); } + queryService.SetProgressStatsPeriodMs(PingPeriod.MilliSeconds()); - FillLogConfig(*appConfig.MutableLogConfig()); + SetupLogsConfig(); if (EmulateYt) { const auto& fileStorageConfig = appConfig.GetQueryServiceConfig().GetFileStorage(); @@ -843,6 +851,14 @@ private: } } } + + void SetupLogsConfig() { + auto& logConfig = *RunnerOptions.YdbSettings.AppConfig.MutableLogConfig(); + if (DefaultLogPriority) { + logConfig.SetDefaultLevel(*DefaultLogPriority); + } + ModifyLogPriorities(LogPriorities, logConfig); + } }; } // anonymous namespace diff --git a/ydb/tests/tools/kqprun/runlib/actors.h b/ydb/tests/tools/kqprun/runlib/actors.h new file mode 100644 index 0000000000..9b9aebed9c --- /dev/null +++ b/ydb/tests/tools/kqprun/runlib/actors.h @@ -0,0 +1,189 @@ +#pragma once + +#include "settings.h" + +#include <library/cpp/colorizer/colors.h> +#include <library/cpp/threading/future/core/future.h> + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/events.h> +#include <ydb/library/actors/core/hfunc.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> + +#include <yql/essentials/public/issue/yql_issue.h> +#include <yql/essentials/public/issue/yql_issue_message.h> + +#include <queue> + +namespace NKikimrRun { + +struct TEvPrivate { + enum EEv : ui32 { + EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvAsyncQueryFinished, + EvFinalizeAsyncQueryRunner, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); + + template <typename TQueryRequest> + struct TEvStartAsyncQuery : public NActors::TEventLocal<TEvStartAsyncQuery<TQueryRequest>, EvStartAsyncQuery> { + TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise<void> startPromise) + : Request(std::move(request)) + , StartPromise(startPromise) + {} + + TQueryRequest Request; + NThreading::TPromise<void> StartPromise; + }; + + template <typename TQueryResponse> + struct TEvAsyncQueryFinished : public NActors::TEventLocal<TEvAsyncQueryFinished<TQueryResponse>, EvAsyncQueryFinished> { + TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result) + : RequestId(requestId) + , Result(std::move(result)) + {} + + const ui64 RequestId; + const TQueryResponse Result; + }; + + struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal<TEvFinalizeAsyncQueryRunner, EvFinalizeAsyncQueryRunner> { + explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise<void> finalizePromise) + : FinalizePromise(finalizePromise) + {} + + NThreading::TPromise<void> FinalizePromise; + }; +}; + +template <typename TQueryRequest, typename TQueryResponse> +class TAsyncQueryRunnerActorBase : public NActors::TActor<TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>> { + using TBase = NActors::TActor<TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>>; + + struct TRequestInfo { + TInstant StartTime; + NThreading::TFuture<TQueryResponse> RequestFuture; + }; + +public: + TAsyncQueryRunnerActorBase(const TAsyncQueriesSettings& settings) + : TBase(&TAsyncQueryRunnerActorBase::StateFunc) + , Settings_(settings) + { + RunningRequests_.reserve(Settings_.InFlightLimit); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvStartAsyncQuery<TQueryRequest>, Handle); + hFunc(TEvPrivate::TEvAsyncQueryFinished<TQueryResponse>, Handle); + hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle); + ) + + void Handle(TEvPrivate::TEvStartAsyncQuery<TQueryRequest>::TPtr& ev) { + DelayedRequests_.emplace(std::move(ev)); + StartDelayedRequests(); + } + + void Handle(TEvPrivate::TEvAsyncQueryFinished<TQueryResponse>::TPtr& ev) { + const ui64 requestId = ev->Get()->RequestId; + RequestsLatency_ += TInstant::Now() - RunningRequests_[requestId].StartTime; + RunningRequests_.erase(requestId); + + const auto& response = ev->Get()->Result; + + if (response.IsSuccess()) { + Completed_++; + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) { + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + } + } else { + Failed_++; + Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.GetStatus() << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.GetError() << CoutColors_.Default(); + } + + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime_ > TDuration::Seconds(1)) { + Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; + LastReportTime_ = TInstant::Now(); + } + + StartDelayedRequests(); + TryFinalize(); + } + + void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) { + FinalizePromise_ = ev->Get()->FinalizePromise; + if (!TryFinalize()) { + Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl; + } + } + +protected: + virtual void RunQuery(TQueryRequest&& request, NThreading::TPromise<TQueryResponse> promise) = 0; + +private: + void StartDelayedRequests() { + while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) { + auto request = std::move(DelayedRequests_.front()); + DelayedRequests_.pop(); + + auto promise = NThreading::NewPromise<TQueryResponse>(); + RunQuery(std::move(request->Get()->Request), promise); + RunningRequests_[RequestId_] = { + .StartTime = TInstant::Now(), + .RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) { + this->Send(this->SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); + }) + }; + + MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size()); + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) { + Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + } + + RequestId_++; + request->Get()->StartPromise.SetValue(); + } + } + + bool TryFinalize() { + if (!FinalizePromise_ || !RunningRequests_.empty()) { + return false; + } + + if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) { + Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; + } + + FinalizePromise_->SetValue(); + this->PassAway(); + return true; + } + + TString GetInfoString() const { + TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; + if (const auto amountRequests = Completed_ + Failed_) { + result << ", average latency: " << RequestsLatency_ / amountRequests; + } + return result; + } + +private: + const TAsyncQueriesSettings Settings_; + const TInstant StartTime_ = TInstant::Now(); + const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout); + + std::optional<NThreading::TPromise<void>> FinalizePromise_; + std::queue<typename TEvPrivate::TEvStartAsyncQuery<TQueryRequest>::TPtr> DelayedRequests_; + std::unordered_map<ui64, TRequestInfo> RunningRequests_; + TInstant LastReportTime_ = TInstant::Now(); + + ui64 RequestId_ = 1; + ui64 MaxInFlight_ = 0; + ui64 Completed_ = 0; + ui64 Failed_ = 0; + TDuration RequestsLatency_; +}; + +} // namespace NKikimrRun diff --git a/ydb/tests/tools/kqprun/runlib/application.cpp b/ydb/tests/tools/kqprun/runlib/application.cpp index 6b27760b5b..dac91ddb9d 100644 --- a/ydb/tests/tools/kqprun/runlib/application.cpp +++ b/ydb/tests/tools/kqprun/runlib/application.cpp @@ -18,6 +18,27 @@ namespace NKikimrRun { +namespace { + +#ifdef PROFILE_MEMORY_ALLOCATIONS +void InterruptHandler(int) { + NColorizer::TColors colors = NColorizer::AutoColors(Cerr); + + Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl; + TMainBase::FinishProfileMemoryAllocations(); + + abort(); +} +#endif + +} // anonymous namespace + +TMainBase::TMainBase() { +#ifdef PROFILE_MEMORY_ALLOCATIONS + signal(SIGINT, &InterruptHandler); +#endif +} + #ifdef PROFILE_MEMORY_ALLOCATIONS void TMainBase::FinishProfileMemoryAllocations() { if (ProfileAllocationsOutput) { @@ -103,7 +124,7 @@ void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) { .StoreMappedResultT<TString>(&DefaultLogPriority, GetLogPrioritiesMap("log-default")); options.AddLongOption("log", "Component log priority in format <component>=<priority> (e. g. KQP_YQL=trace)") - .RequiredArgument("component priority") + .RequiredArgument("component=priority") .Handler1([this, logPriority = GetLogPrioritiesMap("log")](const NLastGetopt::TOptsParser* option) { TStringBuf component; TStringBuf priority; @@ -119,13 +140,6 @@ void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) { }); } -void TMainBase::FillLogConfig(NKikimrConfig::TLogConfig& config) const { - if (DefaultLogPriority) { - config.SetDefaultLevel(*DefaultLogPriority); - } - ModifyLogPriorities(LogPriorities, config); -} - IOutputStream* TMainBase::GetDefaultOutput(const TString& file) { if (file == "-") { return &Cout; diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h index fd67eb2ac0..ebb32506f3 100644 --- a/ydb/tests/tools/kqprun/runlib/application.h +++ b/ydb/tests/tools/kqprun/runlib/application.h @@ -16,8 +16,10 @@ namespace NKikimrRun { class TMainBase : public TMainClassArgs { -#ifdef PROFILE_MEMORY_ALLOCATIONS public: + TMainBase(); + +#ifdef PROFILE_MEMORY_ALLOCATIONS static void FinishProfileMemoryAllocations(); #endif @@ -26,8 +28,6 @@ protected: virtual void RegisterLogOptions(NLastGetopt::TOpts& options); - void FillLogConfig(NKikimrConfig::TLogConfig& config) const; - static IOutputStream* GetDefaultOutput(const TString& file); TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry() const; @@ -36,12 +36,12 @@ protected: inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout); inline static IOutputStream* ProfileAllocationsOutput = nullptr; -private: - inline static std::vector<std::unique_ptr<TFileOutput>> FileHolders; - std::optional<NActors::NLog::EPriority> DefaultLogPriority; std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> LogPriorities; +private: + inline static std::vector<std::unique_ptr<TFileOutput>> FileHolders; + TString UdfsDirectory; TVector<TString> UdfsPaths; bool ExcludeLinkedUdfs; diff --git a/ydb/tests/tools/kqprun/runlib/settings.h b/ydb/tests/tools/kqprun/runlib/settings.h index c77caf1876..874ec2b21e 100644 --- a/ydb/tests/tools/kqprun/runlib/settings.h +++ b/ydb/tests/tools/kqprun/runlib/settings.h @@ -4,6 +4,16 @@ namespace NKikimrRun { +struct TAsyncQueriesSettings { + enum class EVerbose { + EachQuery, + Final, + }; + + ui64 InFlightLimit = 0; + EVerbose Verbose = EVerbose::EachQuery; +}; + struct TServerSettings { TString DomainName = "Root"; diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp index f8c37d7b67..1b5a6373e3 100644 --- a/ydb/tests/tools/kqprun/runlib/utils.cpp +++ b/ydb/tests/tools/kqprun/runlib/utils.cpp @@ -1,5 +1,4 @@ #include "utils.h" -#include "application.h" #include <library/cpp/colorizer/colors.h> #include <library/cpp/json/json_reader.h> @@ -50,17 +49,6 @@ void FloatingPointExceptionHandler(int) { abort(); } -#ifdef PROFILE_MEMORY_ALLOCATIONS -void InterruptHandler(int) { - NColorizer::TColors colors = NColorizer::AutoColors(Cerr); - - Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl; - TMainBase::FinishProfileMemoryAllocations(); - - abort(); -} -#endif - } // nonymous namespace @@ -263,10 +251,6 @@ void SetupSignalActions() { std::set_terminate(&TerminateHandler); signal(SIGSEGV, &SegmentationFaultHandler); signal(SIGFPE, &FloatingPointExceptionHandler); - -#ifdef PROFILE_MEMORY_ALLOCATIONS - signal(SIGINT, &InterruptHandler); -#endif } void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet) { diff --git a/ydb/tests/tools/kqprun/runlib/ya.make b/ydb/tests/tools/kqprun/runlib/ya.make index 39bd1328bf..cc92e6f903 100644 --- a/ydb/tests/tools/kqprun/runlib/ya.make +++ b/ydb/tests/tools/kqprun/runlib/ya.make @@ -9,6 +9,7 @@ PEERDIR( library/cpp/colorizer library/cpp/getopt library/cpp/json + library/cpp/threading/future ydb/core/base ydb/core/blob_depot ydb/core/fq/libs/compute/common diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index ab1ae17974..f869caa76b 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -6,6 +6,8 @@ #include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/workload_service/actors/actors.h> +using namespace NKikimrRun; + namespace NKqpRun { namespace { @@ -106,131 +108,18 @@ private: std::vector<ui64> ResultSetSizes_; }; -class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> { - using TBase = NActors::TActor<TAsyncQueryRunnerActor>; - - struct TRequestInfo { - TInstant StartTime; - NThreading::TFuture<TQueryResponse> RequestFuture; - }; +class TAsyncQueryRunnerActor : public TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse> { + using TBase = TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>; public: TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) - : TBase(&TAsyncQueryRunnerActor::StateFunc) - , Settings_(settings) - { - RunningRequests_.reserve(Settings_.InFlightLimit); - } - - STRICT_STFUNC(StateFunc, - hFunc(TEvPrivate::TEvStartAsyncQuery, Handle); - hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle); - hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle); - ) - - void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) { - DelayedRequests_.emplace(std::move(ev)); - StartDelayedRequests(); - } - - void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) { - const ui64 requestId = ev->Get()->RequestId; - RequestsLatency_ += TInstant::Now() - RunningRequests_[requestId].StartTime; - RunningRequests_.erase(requestId); - - const auto& response = ev->Get()->Result.Response->Get()->Record; - const auto status = response.GetYdbStatus(); - - if (status == Ydb::StatusIds::SUCCESS) { - Completed_++; - if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) { - Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; - } - } else { - Failed_++; - NYql::TIssues issues; - NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues); - Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default(); - } - - if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime_ > TDuration::Seconds(1)) { - Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl; - LastReportTime_ = TInstant::Now(); - } - - StartDelayedRequests(); - TryFinalize(); - } - - void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) { - FinalizePromise_ = ev->Get()->FinalizePromise; - if (!TryFinalize()) { - Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl; - } - } - -private: - void StartDelayedRequests() { - while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) { - auto request = std::move(DelayedRequests_.front()); - DelayedRequests_.pop(); - - auto promise = NThreading::NewPromise<TQueryResponse>(); - Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr)); - RunningRequests_[RequestId_] = { - .StartTime = TInstant::Now(), - .RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) { - Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue()))); - }) - }; - - MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size()); - if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) { - Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; - } - - RequestId_++; - request->Get()->StartPromise.SetValue(); - } - } - - bool TryFinalize() { - if (!FinalizePromise_ || !RunningRequests_.empty()) { - return false; - } - - if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) { - Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n"; - } - - FinalizePromise_->SetValue(); - PassAway(); - return true; - } + : TBase(settings) + {} - TString GetInfoString() const { - TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_; - if (const auto amountRequests = Completed_ + Failed_) { - result << ", average latency: " << RequestsLatency_ / amountRequests; - } - return result; +protected: + void RunQuery(TQueryRequest&& request, NThreading::TPromise<TQueryResponse> promise) override { + Register(CreateRunScriptActorMock(std::move(request), promise, nullptr)); } - -private: - const TAsyncQueriesSettings Settings_; - const TInstant StartTime_ = TInstant::Now(); - const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout); - - std::optional<NThreading::TPromise<void>> FinalizePromise_; - std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_; - std::unordered_map<ui64, TRequestInfo> RunningRequests_; - TInstant LastReportTime_ = TInstant::Now(); - - ui64 RequestId_ = 1; - ui64 MaxInFlight_ = 0; - ui64 Completed_ = 0; - ui64 Failed_ = 0; - TDuration RequestsLatency_; }; class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> { @@ -291,17 +180,6 @@ public: } } - void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) { - const auto nodeCount = ev->Get()->NodeCount; - if (nodeCount == Settings_.ExpectedNodeCount) { - HealthCheckStage_ = EHealthCheck::FetchDatabase; - DoHealthCheck(); - return; - } - - Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true); - } - void Handle(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) { const auto status = ev->Get()->Status; if (status == Ydb::StatusIds::SUCCESS) { @@ -325,7 +203,6 @@ public: STRICT_STFUNC(StateFunc, sFunc(NActors::TEvents::TEvWakeup, DoHealthCheck); - hFunc(TEvPrivate::TEvResourcesInfo, Handle); hFunc(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse, Handle); hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle); ) @@ -341,10 +218,14 @@ private: return; } - ResourceManager_->RequestClusterResourcesInfo( - [selfId = SelfId(), actorContext = ActorContext()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { - actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size())); - }); + const size_t nodeCount = ResourceManager_->GetClusterResources().size(); + if (nodeCount == static_cast<size_t>(Settings_.ExpectedNodeCount)) { + HealthCheckStage_ = EHealthCheck::FetchDatabase; + DoHealthCheck(); + return; + } + + Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true); } void FetchDatabase() { @@ -525,6 +406,20 @@ private: } // anonymous namespace +bool TQueryResponse::IsSuccess() const { + return GetStatus() == Ydb::StatusIds::SUCCESS; +} + +Ydb::StatusIds::StatusCode TQueryResponse::GetStatus() const { + return Response->Get()->Record.GetYdbStatus(); +} + +TString TQueryResponse::GetError() const { + NYql::TIssues issues; + NYql::IssuesFromMessage(Response->Get()->Record.GetResponse().GetQueryIssues(), issues); + return issues.ToString(); +} + NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback) { return new TRunScriptActorMock(std::move(request), promise, progressCallback); } diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h index 410794491b..4f555cc6a5 100644 --- a/ydb/tests/tools/kqprun/src/actors.h +++ b/ydb/tests/tools/kqprun/src/actors.h @@ -4,13 +4,17 @@ #include <ydb/core/kqp/common/events/events.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> - +#include <ydb/tests/tools/kqprun/runlib/actors.h> namespace NKqpRun { struct TQueryResponse { NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response; std::vector<Ydb::ResultSet> ResultSets; + + bool IsSuccess() const; + Ydb::StatusIds::StatusCode GetStatus() const; + TString GetError() const; }; struct TQueryRequest { @@ -35,61 +39,11 @@ struct TWaitResourcesSettings { TString Database; }; -struct TEvPrivate { - enum EEv : ui32 { - EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), - EvAsyncQueryFinished, - EvFinalizeAsyncQueryRunner, - - EvResourcesInfo, - - EvEnd - }; - - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); - - struct TEvStartAsyncQuery : public NActors::TEventLocal<TEvStartAsyncQuery, EvStartAsyncQuery> { - TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise<void> startPromise) - : Request(std::move(request)) - , StartPromise(startPromise) - {} - - TQueryRequest Request; - NThreading::TPromise<void> StartPromise; - }; - - struct TEvAsyncQueryFinished : public NActors::TEventLocal<TEvAsyncQueryFinished, EvAsyncQueryFinished> { - TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result) - : RequestId(requestId) - , Result(std::move(result)) - {} - - const ui64 RequestId; - const TQueryResponse Result; - }; - - struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal<TEvFinalizeAsyncQueryRunner, EvFinalizeAsyncQueryRunner> { - explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise<void> finalizePromise) - : FinalizePromise(finalizePromise) - {} - - NThreading::TPromise<void> FinalizePromise; - }; - - struct TEvResourcesInfo : public NActors::TEventLocal<TEvResourcesInfo, EvResourcesInfo> { - explicit TEvResourcesInfo(i32 nodeCount) - : NodeCount(nodeCount) - {} - - const i32 NodeCount; - }; -}; - using TProgressCallback = std::function<void(ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress)>; NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback); -NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings); +NActors::IActor* CreateAsyncQueryRunnerActor(const NKikimrRun::TAsyncQueriesSettings& settings); NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings); diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h index f401b8d8c8..057223d6d1 100644 --- a/ydb/tests/tools/kqprun/src/common.h +++ b/ydb/tests/tools/kqprun/src/common.h @@ -22,16 +22,6 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN"; constexpr ui64 DEFAULT_STORAGE_SIZE = 32_GB; constexpr TDuration TENANT_CREATION_TIMEOUT = TDuration::Seconds(30); -struct TAsyncQueriesSettings { - enum class EVerbose { - EachQuery, - Final, - }; - - ui64 InFlightLimit = 0; - EVerbose Verbose = EVerbose::EachQuery; -}; - struct TYdbSetupSettings : public NKikimrRun::TServerSettings { enum class EVerbose { None, @@ -68,7 +58,7 @@ struct TYdbSetupSettings : public NKikimrRun::TServerSettings { NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; TIntrusivePtr<NYql::IYtGateway> YtGateway; NKikimrConfig::TAppConfig AppConfig; - TAsyncQueriesSettings AsyncQueriesSettings; + NKikimrRun::TAsyncQueriesSettings AsyncQueriesSettings; }; diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index bbc432be16..f295846a35 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -173,6 +173,9 @@ public: private: bool WaitScriptExecutionOperation(ui64 queryId) { StartTime_ = TInstant::Now(); + Y_DEFER { + TYdbSetup::StopTraceOpt(); + }; TDuration getOperationPeriod = TDuration::Seconds(1); if (auto progressStatsPeriodMs = Options_.YdbSettings.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) { @@ -205,8 +208,6 @@ private: Sleep(getOperationPeriod); } - TYdbSetup::StopTraceOpt(); - PrintScriptAst(queryId, ExecutionMeta_.Ast); PrintScriptProgress(queryId, ExecutionMeta_.Plan); PrintScriptPlan(queryId, ExecutionMeta_.Plan); diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index d70b727c78..145d4eaf78 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -550,7 +550,7 @@ public: auto request = GetQueryRequest(query); auto startPromise = NThreading::NewPromise(); - GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); + GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise)); return startPromise.GetFuture().GetValueSync(); } @@ -561,7 +561,7 @@ public: } auto finalizePromise = NThreading::NewPromise(); - GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise)); + GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise)); return finalizePromise.GetFuture().GetValueSync(); } |