aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPisarenko Grigoriy <grigoriypisar@ydb.tech>2025-02-28 18:00:43 +0500
committerGitHub <noreply@github.com>2025-02-28 16:00:43 +0300
commit1547cff981d4a39f2a007760ca202b1071a95cd3 (patch)
tree4a7b73f6078532ffd2c0250b51ab794d8215ac66
parent45f8fc2ec6626a6735e7578d05997363242b71e7 (diff)
downloadydb-1547cff981d4a39f2a007760ca202b1071a95cd3.tar.gz
YQ-3561 FQrun supported remote databases (#15016)
-rw-r--r--ydb/core/fq/libs/actors/pending_fetcher.cpp10
-rw-r--r--ydb/core/fq/libs/config/protos/pending_fetcher.proto1
-rw-r--r--ydb/core/fq/libs/row_dispatcher/leader_election.cpp2
-rw-r--r--ydb/tests/tools/fqrun/README.md2
-rw-r--r--ydb/tests/tools/fqrun/configuration/fq_config.conf17
-rwxr-xr-xydb/tests/tools/fqrun/flame_graph.sh17
-rw-r--r--ydb/tests/tools/fqrun/fqrun.cpp248
-rw-r--r--ydb/tests/tools/fqrun/src/actors.cpp129
-rw-r--r--ydb/tests/tools/fqrun/src/actors.h27
-rw-r--r--ydb/tests/tools/fqrun/src/common.cpp39
-rw-r--r--ydb/tests/tools/fqrun/src/common.h31
-rw-r--r--ydb/tests/tools/fqrun/src/fq_runner.cpp40
-rw-r--r--ydb/tests/tools/fqrun/src/fq_runner.h4
-rw-r--r--ydb/tests/tools/fqrun/src/fq_setup.cpp148
-rw-r--r--ydb/tests/tools/fqrun/src/fq_setup.h8
-rw-r--r--ydb/tests/tools/fqrun/src/ya.make2
-rw-r--r--ydb/tests/tools/kqprun/README.md7
-rwxr-xr-xydb/tests/tools/kqprun/flame_graph.sh27
-rw-r--r--ydb/tests/tools/kqprun/kqprun.cpp20
-rw-r--r--ydb/tests/tools/kqprun/runlib/actors.h189
-rw-r--r--ydb/tests/tools/kqprun/runlib/application.cpp30
-rw-r--r--ydb/tests/tools/kqprun/runlib/application.h12
-rw-r--r--ydb/tests/tools/kqprun/runlib/settings.h10
-rw-r--r--ydb/tests/tools/kqprun/runlib/utils.cpp16
-rw-r--r--ydb/tests/tools/kqprun/runlib/ya.make1
-rw-r--r--ydb/tests/tools/kqprun/src/actors.cpp167
-rw-r--r--ydb/tests/tools/kqprun/src/actors.h58
-rw-r--r--ydb/tests/tools/kqprun/src/common.h12
-rw-r--r--ydb/tests/tools/kqprun/src/kqp_runner.cpp5
-rw-r--r--ydb/tests/tools/kqprun/src/ydb_setup.cpp4
30 files changed, 969 insertions, 314 deletions
diff --git a/ydb/core/fq/libs/actors/pending_fetcher.cpp b/ydb/core/fq/libs/actors/pending_fetcher.cpp
index 3975e59ccc..def090ba2b 100644
--- a/ydb/core/fq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/fq/libs/actors/pending_fetcher.cpp
@@ -105,6 +105,13 @@ struct TEvPrivate {
constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60);
+TDuration GetPendingFetchPeriod(const NFq::NConfig::TPendingFetcherConfig& config) {
+ if (const auto periodMs = config.GetPendingFetchPeriodMs()) {
+ return TDuration::MilliSeconds(periodMs);
+ }
+ return TDuration::Seconds(1);
+}
+
} // namespace
class TPendingFetcher : public NActors::TActorBootstrapped<TPendingFetcher> {
@@ -170,6 +177,7 @@ public:
, ServiceCounters(serviceCounters, "pending_fetcher")
, GetTaskCounters("GetTask", ServiceCounters.Counters)
, FailedStatusCodeCounters(MakeIntrusive<TStatusCodeByScopeCounters>("IntermediateFailedStatusCode", ServiceCounters.RootCounters->GetSubgroup("component", "QueryDiagnostic")))
+ , PendingFetchPeriod(GetPendingFetchPeriod(config.GetPendingFetcher()))
, CredentialsFactory(credentialsFactory)
, S3Gateway(s3Gateway)
, ConnectorClient(connectorClient)
@@ -518,7 +526,7 @@ private:
IModuleResolver::TPtr ModuleResolver;
bool HasRunningRequest = false;
- const TDuration PendingFetchPeriod = TDuration::Seconds(1);
+ const TDuration PendingFetchPeriod;
TActorId DatabaseResolver;
diff --git a/ydb/core/fq/libs/config/protos/pending_fetcher.proto b/ydb/core/fq/libs/config/protos/pending_fetcher.proto
index 52f0dd1368..25f0203a6c 100644
--- a/ydb/core/fq/libs/config/protos/pending_fetcher.proto
+++ b/ydb/core/fq/libs/config/protos/pending_fetcher.proto
@@ -8,4 +8,5 @@ option java_package = "ru.yandex.kikimr.proto";
message TPendingFetcherConfig {
bool Enabled = 1;
+ uint64 PendingFetchPeriodMs = 2; // 1s by default
}
diff --git a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
index 6e709b594e..77ce3d0bd6 100644
--- a/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
+++ b/ydb/core/fq/libs/row_dispatcher/leader_election.cpp
@@ -180,7 +180,7 @@ TLeaderElection::TLeaderElection(
: Config(config)
, CredentialsProviderFactory(credentialsProviderFactory)
, YqSharedResources(yqSharedResources)
- , YdbConnection(NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, yqSharedResources->UserSpaceYdbDriver))
+ , YdbConnection(config.GetLocalMode() ? nullptr : NewYdbConnection(config.GetDatabase(), credentialsProviderFactory, yqSharedResources->UserSpaceYdbDriver))
, TablePathPrefix(JoinPath(config.GetDatabase().GetDatabase(), config.GetCoordinationNodePath()))
, CoordinationNodePath(JoinPath(TablePathPrefix, tenant))
, ParentId(parentId)
diff --git a/ydb/tests/tools/fqrun/README.md b/ydb/tests/tools/fqrun/README.md
index dcaee95ccc..14c0dc9b52 100644
--- a/ydb/tests/tools/fqrun/README.md
+++ b/ydb/tests/tools/fqrun/README.md
@@ -8,7 +8,7 @@ For profiling memory allocations build fqrun with ya make flags `-D PROFILE_MEMO
* `flame_graph.sh` - script for collecting flame graphs in svg format, usage:
```(bash)
- ./flame_graph.sh [graph collection time in seconds]
+ ./flame_graph.sh [graph collection time in seconds] [use sudo]
```
## Examples
diff --git a/ydb/tests/tools/fqrun/configuration/fq_config.conf b/ydb/tests/tools/fqrun/configuration/fq_config.conf
index 576b0d394b..6d7c90f6b1 100644
--- a/ydb/tests/tools/fqrun/configuration/fq_config.conf
+++ b/ydb/tests/tools/fqrun/configuration/fq_config.conf
@@ -3,7 +3,6 @@ EnableDynamicNameservice: true
EnableTaskCounters: true
CheckpointCoordinator {
- Enabled: true
CheckpointingPeriodMillis: 30000
MaxInflight: 1
@@ -24,7 +23,7 @@ Common {
MdbGateway: "https://mdb.api.cloud.yandex.net:443"
MdbTransformHost: false
ObjectStorageEndpoint: "https://storage.yandexcloud.net"
- IdsPrefix: "kr"
+ IdsPrefix: "fr"
QueryArtifactsCompressionMethod: "zstd_6"
MonitoringEndpoint: "monitoring.api.cloud.yandex.net"
KeepInternalErrors: true
@@ -42,10 +41,12 @@ Common {
ControlPlaneProxy {
Enabled: true
+ RequestTimeout: "1m"
}
ControlPlaneStorage {
Enabled: true
+ UseInMemory: true
StatsMode: STATS_MODE_PROFILE
DumpRawStatistics: true
TasksBatchSize: 100
@@ -84,6 +85,9 @@ ControlPlaneStorage {
OperationTimeoutSec: 60
CancelAfterSec: 60
}
+
+ RetryPolicyMapping {
+ }
}
DbPool {
@@ -201,10 +205,6 @@ Gateways {
}
}
-Health {
- Enabled: true
-}
-
NodesManager {
Enabled: true
}
@@ -223,7 +223,6 @@ PrivateProxy {
}
QuotasManager {
- Enabled: true
QuotaDescriptions {
SubjectType: "cloud"
MetricName: "yq.cpuPercent.count"
@@ -244,10 +243,6 @@ QuotasManager {
}
RateLimiter {
- Enabled: true
- ControlPlaneEnabled: true
- DataPlaneEnabled: true
-
Database {
TablePrefix: "yq/rate_limiter"
ClientTimeoutSec: 70
diff --git a/ydb/tests/tools/fqrun/flame_graph.sh b/ydb/tests/tools/fqrun/flame_graph.sh
index b6f8fe6da4..9b002f6227 100755
--- a/ydb/tests/tools/fqrun/flame_graph.sh
+++ b/ydb/tests/tools/fqrun/flame_graph.sh
@@ -2,24 +2,11 @@
set -eux
-function cleanup {
- sudo rm ./profdata
- rm ./profdata.txt
-}
-trap cleanup EXIT
-
-if [ $# -gt 1 ]; then
+if [ $# -gt 2 ]; then
echo "Too many arguments"
exit -1
fi
-fqrun_pid=$(pgrep -u $USER fqrun)
-
-sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $fqrun_pid -v -o profdata -- sleep ${1:-'30'}
-sudo perf script -i profdata > profdata.txt
-
SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
-flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph/"
-
-${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg
+$SCRIPT_DIR/../kqprun/flame_graph.sh ${1:-'30'} ${2:-''} fqrun
diff --git a/ydb/tests/tools/fqrun/fqrun.cpp b/ydb/tests/tools/fqrun/fqrun.cpp
index 99f597d95c..3ebf36fe7c 100644
--- a/ydb/tests/tools/fqrun/fqrun.cpp
+++ b/ydb/tests/tools/fqrun/fqrun.cpp
@@ -20,27 +20,96 @@ namespace NFqRun {
namespace {
struct TExecutionOptions {
+ enum class EExecutionCase {
+ Stream,
+ AsyncStream
+ };
+
TString Query;
std::vector<FederatedQuery::ConnectionContent> Connections;
std::vector<FederatedQuery::BindingContent> Bindings;
+ ui32 LoopCount = 1;
+ TDuration LoopDelay;
+ bool ContinueAfterFail = false;
+
+ EExecutionCase ExecutionCase = EExecutionCase::Stream;
+
bool HasResults() const {
return !Query.empty();
}
- TRequestOptions GetQueryOptions() const {
+ TRequestOptions GetQueryOptions(ui64 queryId) const {
return {
- .Query = Query
+ .Query = Query,
+ .QueryId = queryId
};
}
void Validate(const TRunnerOptions& runnerOptions) const {
- if (!Query && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) {
+ if (!Query && Connections.empty() && Bindings.empty() && !runnerOptions.FqSettings.MonitoringEnabled && !runnerOptions.FqSettings.GrpcEnabled) {
ythrow yexception() << "Nothing to execute and is not running as daemon";
}
+ ValidateAsyncOptions(runnerOptions.FqSettings.AsyncQueriesSettings);
+ ValidateTraceOpt(runnerOptions);
+ }
+
+private:
+ ui64 GetNumberOfQueries() const {
+ if (!Query) {
+ return 0;
+ }
+ return LoopCount ? LoopCount : std::numeric_limits<ui64>::max();
+ }
+
+ void ValidateAsyncOptions(const TAsyncQueriesSettings& asyncQueriesSettings) const {
+ if (asyncQueriesSettings.InFlightLimit && ExecutionCase != EExecutionCase::AsyncStream) {
+ ythrow yexception() << "In flight limit can not be used without async queries";
+ }
+
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ const auto numberOfQueries = GetNumberOfQueries();
+ if (asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > numberOfQueries) {
+ Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << numberOfQueries << colors.Default() << Endl;
+ }
+ }
+
+ void ValidateTraceOpt(const TRunnerOptions& runnerOptions) const {
+ if (runnerOptions.TraceOptAll && !runnerOptions.TraceOptIds.empty()) {
+ ythrow yexception() << "Trace opt ids can not be used with trace opt all flag";
+ }
+
+ const auto numberOfQueries = GetNumberOfQueries();
+ for (auto id : runnerOptions.TraceOptIds) {
+ if (id >= numberOfQueries) {
+ ythrow yexception() << "Trace opt id " << id << " should be less than number of queries " << numberOfQueries;
+ }
+ }
}
};
+void RunArgumentQuery(ui64 queryId, const TExecutionOptions& executionOptions, TFqRunner& runner) {
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+
+ switch (executionOptions.ExecutionCase) {
+ case TExecutionOptions::EExecutionCase::Stream: {
+ if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions(queryId))) {
+ ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
+ }
+ Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl;
+ if (!runner.FetchQueryResults()) {
+ ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed";
+ }
+ break;
+ }
+
+ case TExecutionOptions::EExecutionCase::AsyncStream: {
+ runner.ExecuteQueryAsync(executionOptions.GetQueryOptions(queryId));
+ break;
+ }
+ }
+}
+
void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& runner) {
NColorizer::TColors colors = NColorizer::AutoColors(Cout);
@@ -58,14 +127,31 @@ void RunArgumentQueries(const TExecutionOptions& executionOptions, TFqRunner& ru
}
}
- if (executionOptions.Query) {
- Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Executing query..." << colors.Default() << Endl;
- if (!runner.ExecuteStreamQuery(executionOptions.GetQueryOptions())) {
- ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Query execution failed";
+ if (!executionOptions.Query) {
+ return;
+ }
+
+ const size_t numberLoops = executionOptions.LoopCount;
+ for (size_t queryId = 0; queryId < numberLoops || numberLoops == 0; ++queryId) {
+ if (queryId > 0) {
+ Sleep(executionOptions.LoopDelay);
+ }
+
+ const TInstant startTime = TInstant::Now();
+ Cout << colors.Yellow() << startTime.ToIsoStringLocal() << " Executing query";
+ if (numberLoops != 1) {
+ Cout << ", loop " << queryId;
}
- Cout << colors.Yellow() << TInstant::Now().ToIsoStringLocal() << " Fetching query results..." << colors.Default() << Endl;
- if (!runner.FetchQueryResults()) {
- ythrow yexception() << TInstant::Now().ToIsoStringLocal() << " Fetch query results failed";
+ Cout << "..." << colors.Default() << Endl;
+
+ try {
+ RunArgumentQuery(queryId, executionOptions, runner);
+ } catch (const yexception& exception) {
+ if (executionOptions.ContinueAfterFail) {
+ Cerr << colors.Red() << CurrentExceptionMessage() << colors.Default() << Endl;
+ } else {
+ throw exception;
+ }
}
}
@@ -111,9 +197,18 @@ void RunScript(const TExecutionOptions& executionOptions, const TRunnerOptions&
}
class TMain : public TMainBase {
+ using TBase = TMainBase;
using EVerbose = TFqSetupSettings::EVerbose;
protected:
+ void RegisterLogOptions(NLastGetopt::TOpts& options) override {
+ TBase::RegisterLogOptions(options);
+
+ options.AddLongOption("log-fq", "FQ components log priority")
+ .RequiredArgument("priority")
+ .StoreMappedResultT<TString>(&FqLogPriority, GetLogPrioritiesMap("log-fq"));
+ }
+
void RegisterOptions(NLastGetopt::TOpts& options) override {
options.SetTitle("FqRun -- tool to execute stream queries through FQ proxy");
options.AddHelpOption('h');
@@ -208,6 +303,18 @@ protected:
// Outputs
+ options.AddLongOption('T', "trace-opt", "Print AST in the begin of each transformation")
+ .OptionalArgument("query-id")
+ .Handler1([this](const NLastGetopt::TOptsParser* option) {
+ const auto value = option->CurVal();
+ if (!value) {
+ RunnerOptions.TraceOptAll = true;
+ } else if (!RunnerOptions.TraceOptIds.emplace(FromString<ui64>(value)).second) {
+ ythrow yexception() << "Got duplicated trace opt index: " << value;
+ }
+ RunnerOptions.FqSettings.EnableTraceOpt = true;
+ });
+
options.AddLongOption("result-file", "File with query results (use '-' to write in stdout)")
.RequiredArgument("file")
.DefaultValue("-")
@@ -226,6 +333,20 @@ protected:
// Pipeline settings
+ TChoices<TExecutionOptions::EExecutionCase> executionCase({
+ {"stream", TExecutionOptions::EExecutionCase::Stream},
+ {"async-stream", TExecutionOptions::EExecutionCase::AsyncStream}
+ });
+ options.AddLongOption('C', "execution-case", "Type of query for -p argument")
+ .RequiredArgument("query-type")
+ .Choices(executionCase.GetChoices())
+ .StoreMappedResultT<TString>(&ExecutionOptions.ExecutionCase, executionCase);
+
+ options.AddLongOption("inflight-limit", "In flight limit for async queries (use 0 for unlimited)")
+ .RequiredArgument("uint")
+ .DefaultValue(0)
+ .StoreResult(&RunnerOptions.FqSettings.AsyncQueriesSettings.InFlightLimit);
+
options.AddLongOption("verbose", TStringBuilder() << "Common verbose level (max level " << static_cast<ui32>(EVerbose::Max) - 1 << ")")
.RequiredArgument("uint")
.DefaultValue(static_cast<ui8>(EVerbose::Info))
@@ -233,6 +354,73 @@ protected:
return static_cast<EVerbose>(std::min(value, static_cast<ui8>(EVerbose::Max)));
});
+ TChoices<TAsyncQueriesSettings::EVerbose> verbose({
+ {"each-query", TAsyncQueriesSettings::EVerbose::EachQuery},
+ {"final", TAsyncQueriesSettings::EVerbose::Final}
+ });
+ options.AddLongOption("async-verbose", "Verbose type for async queries")
+ .RequiredArgument("type")
+ .DefaultValue("each-query")
+ .Choices(verbose.GetChoices())
+ .StoreMappedResultT<TString>(&RunnerOptions.FqSettings.AsyncQueriesSettings.Verbose, verbose);
+
+ options.AddLongOption("ping-period", "Query ping period in milliseconds")
+ .RequiredArgument("uint")
+ .DefaultValue(100)
+ .StoreMappedResultT<ui64>(&RunnerOptions.PingPeriod, &TDuration::MilliSeconds<ui64>);
+
+ options.AddLongOption("loop-count", "Number of runs of the query (use 0 to start infinite loop)")
+ .RequiredArgument("uint")
+ .DefaultValue(ExecutionOptions.LoopCount)
+ .StoreResult(&ExecutionOptions.LoopCount);
+
+ options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps")
+ .RequiredArgument("uint")
+ .DefaultValue(0)
+ .StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);
+
+ options.AddLongOption("continue-after-fail", "Don't not stop requests execution after fails")
+ .NoArgument()
+ .SetFlag(&ExecutionOptions.ContinueAfterFail);
+
+ // Cluster settings
+
+ options.AddLongOption("cp-storage", "Start real control plane storage instead of in memory (will use local database by default), token variable CP_STORAGE_TOKEN")
+ .OptionalArgument("database@endpoint")
+ .Handler1([this](const NLastGetopt::TOptsParser* option) {
+ RunnerOptions.FqSettings.EnableCpStorage = true;
+ if (const auto value = option->CurVal()) {
+ RunnerOptions.FqSettings.CpStorageDatabase = TExternalDatabase::Parse(value, "CP_STORAGE_TOKEN");
+ }
+ });
+
+ options.AddLongOption("checkpoints", "Start checkpoint coordinator (will use local database by default), token variable CHECKPOINTS_TOKEN")
+ .OptionalArgument("database@endpoint")
+ .Handler1([this](const NLastGetopt::TOptsParser* option) {
+ RunnerOptions.FqSettings.EnableCheckpoints = true;
+ if (const auto value = option->CurVal()) {
+ RunnerOptions.FqSettings.CheckpointsDatabase = TExternalDatabase::Parse(value, "CHECKPOINTS_TOKEN");
+ }
+ });
+
+ options.AddLongOption("quotas", "Start FQ quotas service and rate limiter (will be created local rate limiter by default), token variable QUOTAS_TOKEN")
+ .OptionalArgument("database@endpoint")
+ .Handler1([this](const NLastGetopt::TOptsParser* option) {
+ RunnerOptions.FqSettings.EnableQuotas = true;
+ if (const auto value = option->CurVal()) {
+ RunnerOptions.FqSettings.RateLimiterDatabase = TExternalDatabase::Parse(value, "QUOTAS_TOKEN");
+ }
+ });
+
+ options.AddLongOption("row-dispatcher", TStringBuilder() << "Use real coordinator for row dispatcher (will use local database by default), token variable ROW_DISPATCHER_TOKEN")
+ .OptionalArgument("database@endpoint")
+ .Handler1([this](const NLastGetopt::TOptsParser* option) {
+ RunnerOptions.FqSettings.EnableRemoteRd = true;
+ if (const auto value = option->CurVal()) {
+ RunnerOptions.FqSettings.RowDispatcherDatabase = TExternalDatabase::Parse(value, "ROW_DISPATCHER_TOKEN");
+ }
+ });
+
RegisterKikimrOptions(options, RunnerOptions.FqSettings);
}
@@ -242,16 +430,17 @@ protected:
RunnerOptions.FqSettings.YqlToken = GetEnv(YQL_TOKEN_VARIABLE);
RunnerOptions.FqSettings.FunctionRegistry = CreateFunctionRegistry().Get();
- auto& gatewayConfig = *RunnerOptions.FqSettings.FqConfig.mutable_gateways();
+ auto& fqConfig = RunnerOptions.FqSettings.FqConfig;
+ auto& gatewayConfig = *fqConfig.mutable_gateways();
FillTokens(gatewayConfig.mutable_pq());
FillTokens(gatewayConfig.mutable_s3());
FillTokens(gatewayConfig.mutable_generic());
FillTokens(gatewayConfig.mutable_ydb());
FillTokens(gatewayConfig.mutable_solomon());
- auto& logConfig = RunnerOptions.FqSettings.LogConfig;
- logConfig.SetDefaultLevel(NActors::NLog::EPriority::PRI_CRIT);
- FillLogConfig(logConfig);
+ fqConfig.MutablePendingFetcher()->SetPendingFetchPeriodMs(RunnerOptions.PingPeriod.MilliSeconds());
+
+ SetupLogsConfig();
if (!PqFilesMapping.empty()) {
auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>();
@@ -301,15 +490,44 @@ private:
}
}
+ void SetupLogsConfig() {
+ auto& logConfig = RunnerOptions.FqSettings.LogConfig;
+
+ logConfig.SetDefaultLevel(DefaultLogPriority.value_or(NActors::NLog::EPriority::PRI_CRIT));
+
+ if (FqLogPriority) {
+ std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> fqLogPriorities;
+ std::unordered_set<TString> prefixes = {
+ "FQ_", "YQ_", "STREAMS", "PUBLIC_HTTP"
+ };
+ auto descriptor = NKikimrServices::EServiceKikimr_descriptor();
+ for (int i = 0; i < descriptor->value_count(); ++i) {
+ const auto service = static_cast<NKikimrServices::EServiceKikimr>(descriptor->value(i)->number());
+ const auto& servicceStr = NKikimrServices::EServiceKikimr_Name(service);
+ for (const auto& prefix : prefixes) {
+ if (servicceStr.StartsWith(prefix)) {
+ fqLogPriorities.emplace(service, *FqLogPriority);
+ break;
+ }
+ }
+ }
+ ModifyLogPriorities(fqLogPriorities, logConfig);
+ }
+
+ ModifyLogPriorities(LogPriorities, logConfig);
+ }
+
private:
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;
- std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;
struct TTopicSettings {
bool CancelOnFileFinish = false;
};
std::unordered_map<TString, TTopicSettings> TopicsSettings;
+ std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;
+
+ std::optional<NActors::NLog::EPriority> FqLogPriority;
};
} // anonymous namespace
diff --git a/ydb/tests/tools/fqrun/src/actors.cpp b/ydb/tests/tools/fqrun/src/actors.cpp
new file mode 100644
index 0000000000..9ce3706c4d
--- /dev/null
+++ b/ydb/tests/tools/fqrun/src/actors.cpp
@@ -0,0 +1,129 @@
+#include "actors.h"
+#include "common.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+
+using namespace NKikimrRun;
+
+namespace NFqRun {
+
+namespace {
+
+class TRunQueryActor : public NActors::TActorBootstrapped<TRunQueryActor> {
+public:
+ TRunQueryActor(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise)
+ : Scope(request.Event->Scope)
+ , UserSid(request.Event->User)
+ , Token(request.Event->Token)
+ , PingPeriod(request.PingPeriod)
+ , Request(std::move(request.Event))
+ , Promise(promise)
+ {}
+
+ void Bootstrap() {
+ Send(NFq::ControlPlaneProxyActorId(), std::move(Request));
+
+ Become(&TRunQueryActor::StateFunc);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(NFq::TEvControlPlaneProxy::TEvCreateQueryResponse, Handle);
+ hFunc(NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse, Handle);
+ sFunc(NActors::TEvents::TEvWakeup, DescribeQuery);
+ )
+
+ void Handle(NFq::TEvControlPlaneProxy::TEvCreateQueryResponse::TPtr& ev) {
+ if (ev->Get()->Issues) {
+ Fail(FederatedQuery::QueryMeta::FAILED, {GroupIssues(NYql::TIssue("Failed to create request"), ev->Get()->Issues)});
+ return;
+ }
+
+ QueryId = ev->Get()->Result.query_id();
+ DescribeQuery();
+ }
+
+ void Handle(NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse::TPtr& ev) {
+ if (ev->Get()->Issues) {
+ Fail(FederatedQuery::QueryMeta::FAILED, {GroupIssues(NYql::TIssue("Failed to describe request"), ev->Get()->Issues)});
+ return;
+ }
+
+ const auto& result = ev->Get()->Result.query();
+ const auto status = result.meta().status();
+ if (!IsFinalStatus(status)) {
+ Schedule(PingPeriod, new NActors::TEvents::TEvWakeup());
+ return;
+ }
+
+ TQueryResponse response;
+ response.Status = status;
+ NYql::IssuesFromMessage(result.issue(), response.Issues);
+ NYql::IssuesFromMessage(result.transient_issue(), response.TransientIssues);
+ Promise.SetValue(response);
+ PassAway();
+ }
+
+ void DescribeQuery() const {
+ FederatedQuery::DescribeQueryRequest request;
+ request.set_query_id(QueryId);
+
+ Send(NFq::ControlPlaneProxyActorId(), new NFq::TEvControlPlaneProxy::TEvDescribeQueryRequest(Scope, request, UserSid, Token, TVector<TString>{}));
+ }
+
+private:
+ void Fail(FederatedQuery::QueryMeta::ComputeStatus status, NYql::TIssues issues) {
+ Promise.SetValue({
+ .Status = status,
+ .Issues = std::move(issues)
+ });
+ PassAway();
+ }
+
+private:
+ const TString Scope;
+ const TString UserSid;
+ const TString Token;
+ const TDuration PingPeriod;
+
+ std::unique_ptr<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest> Request;
+ NThreading::TPromise<TQueryResponse> Promise;
+ TString QueryId;
+};
+
+class TAsyncQueryRunnerActor : public TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse> {
+ using TBase = TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>;
+
+public:
+ TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings)
+ : TBase(settings)
+ {}
+
+protected:
+ void RunQuery(TQueryRequest&& request, NThreading::TPromise<TQueryResponse> promise) override {
+ Register(new TRunQueryActor(std::move(request), promise));
+ }
+};
+
+} // anonymous namespace
+
+bool TQueryResponse::IsSuccess() const {
+ return Status == FederatedQuery::QueryMeta::COMPLETED;
+}
+
+TString TQueryResponse::GetStatus() const {
+ return FederatedQuery::QueryMeta::ComputeStatus_Name(Status);
+}
+
+TString TQueryResponse::GetError() const {
+ NYql::TIssues issues = Issues;
+ if (TransientIssues) {
+ issues.AddIssue(GroupIssues(NYql::TIssue("Transient issues"), TransientIssues));
+ }
+ return issues.ToString();
+}
+
+NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings) {
+ return new TAsyncQueryRunnerActor(settings);
+}
+
+} // namespace NFqRun
diff --git a/ydb/tests/tools/fqrun/src/actors.h b/ydb/tests/tools/fqrun/src/actors.h
new file mode 100644
index 0000000000..5f0be7d281
--- /dev/null
+++ b/ydb/tests/tools/fqrun/src/actors.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/tests/tools/kqprun/runlib/actors.h>
+
+namespace NFqRun {
+
+struct TQueryRequest {
+ std::unique_ptr<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest> Event;
+ TDuration PingPeriod;
+};
+
+struct TQueryResponse {
+ FederatedQuery::QueryMeta::ComputeStatus Status;
+ NYql::TIssues Issues;
+ NYql::TIssues TransientIssues;
+
+ bool IsSuccess() const;
+ TString GetStatus() const;
+ TString GetError() const;
+};
+
+NActors::IActor* CreateAsyncQueryRunnerActor(const NKikimrRun::TAsyncQueriesSettings& settings);
+
+} // namespace NFqRun
diff --git a/ydb/tests/tools/fqrun/src/common.cpp b/ydb/tests/tools/fqrun/src/common.cpp
index 7d0fc18045..ea0aac6aa8 100644
--- a/ydb/tests/tools/fqrun/src/common.cpp
+++ b/ydb/tests/tools/fqrun/src/common.cpp
@@ -1,11 +1,50 @@
#include "common.h"
+#include <util/system/env.h>
+
+#include <ydb/library/aclib/aclib.h>
+
namespace NFqRun {
+TExternalDatabase TExternalDatabase::Parse(const TString& optionValue, const TString& tokenVar) {
+ TStringBuf database, endpoint;
+ TStringBuf(optionValue).Split('@', database, endpoint);
+ if (database.empty() || endpoint.empty()) {
+ ythrow yexception() << "Incorrect external database mapping, expected form database@endpoint";
+ }
+
+ TExternalDatabase result = {
+ .Endpoint = TString(endpoint),
+ .Database = TString(database),
+ .Token = GetEnv(tokenVar)
+ };
+
+ if (!result.Token) {
+ result.Token = GetEnv(YQL_TOKEN_VARIABLE);
+ if (!result.Token) {
+ result.Token = BUILTIN_ACL_ROOT;
+ }
+ }
+
+ return result;
+}
+
void SetupAcl(FederatedQuery::Acl* acl) {
if (acl->visibility() == FederatedQuery::Acl::VISIBILITY_UNSPECIFIED) {
acl->set_visibility(FederatedQuery::Acl::SCOPE);
}
}
+NYql::TIssue GroupIssues(NYql::TIssue rootIssue, const NYql::TIssues& childrenIssues) {
+ for (const auto& issue : childrenIssues) {
+ rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
+ }
+ return rootIssue;
+}
+
+bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status) {
+ using EStatus = FederatedQuery::QueryMeta;
+ return IsIn({EStatus::FAILED, EStatus::COMPLETED, EStatus::ABORTED_BY_USER, EStatus::ABORTED_BY_SYSTEM}, status);
+}
+
} // namespace NFqRun
diff --git a/ydb/tests/tools/fqrun/src/common.h b/ydb/tests/tools/fqrun/src/common.h
index 5c139d83f5..f0960afd3f 100644
--- a/ydb/tests/tools/fqrun/src/common.h
+++ b/ydb/tests/tools/fqrun/src/common.h
@@ -14,6 +14,14 @@ namespace NFqRun {
constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
constexpr i64 MAX_RESULT_SET_ROWS = 1000;
+struct TExternalDatabase {
+ TString Endpoint;
+ TString Database;
+ TString Token;
+
+ static TExternalDatabase Parse(const TString& optionValue, const TString& tokenVar);
+};
+
struct TFqSetupSettings : public NKikimrRun::TServerSettings {
enum class EVerbose {
None,
@@ -24,6 +32,19 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings {
};
bool EmulateS3 = false;
+ bool EnableTraceOpt = false;
+
+ bool EnableQuotas = false;
+ std::optional<TExternalDatabase> RateLimiterDatabase;
+
+ bool EnableCheckpoints = false;
+ std::optional<TExternalDatabase> CheckpointsDatabase;
+
+ bool EnableCpStorage = false;
+ std::optional<TExternalDatabase> CpStorageDatabase;
+
+ bool EnableRemoteRd = false;
+ std::optional<TExternalDatabase> RowDispatcherDatabase;
EVerbose VerboseLevel = EVerbose::Info;
@@ -33,19 +54,29 @@ struct TFqSetupSettings : public NKikimrRun::TServerSettings {
NFq::NConfig::TConfig FqConfig;
NKikimrConfig::TLogConfig LogConfig;
std::optional<NKikimrConfig::TActorSystemConfig> ActorSystemConfig;
+ NKikimrRun::TAsyncQueriesSettings AsyncQueriesSettings;
};
struct TRunnerOptions {
+ bool TraceOptAll = false;
+ std::unordered_set<ui64> TraceOptIds;
+
IOutputStream* ResultOutput = nullptr;
NKikimrRun::EResultOutputFormat ResultOutputFormat = NKikimrRun::EResultOutputFormat::RowsJson;
+ TDuration PingPeriod;
TFqSetupSettings FqSettings;
};
struct TRequestOptions {
TString Query;
+ ui64 QueryId;
};
void SetupAcl(FederatedQuery::Acl* acl);
+NYql::TIssue GroupIssues(NYql::TIssue rootIssue, const NYql::TIssues& childrenIssues);
+
+bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status);
+
} // namespace NFqRun
diff --git a/ydb/tests/tools/fqrun/src/fq_runner.cpp b/ydb/tests/tools/fqrun/src/fq_runner.cpp
index 33995c17fa..36a53bbcff 100644
--- a/ydb/tests/tools/fqrun/src/fq_runner.cpp
+++ b/ydb/tests/tools/fqrun/src/fq_runner.cpp
@@ -10,8 +10,6 @@ namespace NFqRun {
class TFqRunner::TImpl {
using EVerbose = TFqSetupSettings::EVerbose;
- static constexpr TDuration REFRESH_PERIOD = TDuration::Seconds(1);
-
public:
explicit TImpl(const TRunnerOptions& options)
: Options(options)
@@ -22,6 +20,8 @@ public:
{}
bool ExecuteStreamQuery(const TRequestOptions& query) {
+ StartTraceOpt(query.QueryId);
+
if (VerboseLevel >= EVerbose::QueriesText) {
Cout << CoutColors.Cyan() << "Starting stream request:\n" << CoutColors.Default() << query.Query << Endl;
}
@@ -114,14 +114,26 @@ public:
return true;
}
-private:
- static bool IsFinalStatus(FederatedQuery::QueryMeta::ComputeStatus status) {
- using EStatus = FederatedQuery::QueryMeta;
- return IsIn({EStatus::FAILED, EStatus::COMPLETED, EStatus::ABORTED_BY_USER, EStatus::ABORTED_BY_SYSTEM}, status);
+ void ExecuteQueryAsync(const TRequestOptions& query) const {
+ StartTraceOpt(query.QueryId);
+
+ if (VerboseLevel >= EVerbose::QueriesText) {
+ Cout << CoutColors.Cyan() << "Starting async stream request:\n" << CoutColors.Default() << query.Query << Endl;
+ }
+
+ FqSetup.QueryRequestAsync(query, Options.PingPeriod);
+ }
+
+ void FinalizeRunner() const {
+ FqSetup.WaitAsyncQueries();
}
+private:
bool WaitStreamQuery() {
StartTime = TInstant::Now();
+ Y_DEFER {
+ TFqSetup::StopTraceOpt();
+ };
while (true) {
TExecutionMeta meta;
@@ -141,7 +153,7 @@ private:
return false;
}
- Sleep(REFRESH_PERIOD);
+ Sleep(Options.PingPeriod);
}
if (VerboseLevel >= EVerbose::Info) {
@@ -160,6 +172,12 @@ private:
return true;
}
+ void StartTraceOpt(size_t queryId) const {
+ if (Options.TraceOptAll || Options.TraceOptIds.contains(queryId)) {
+ FqSetup.StartTraceOpt();
+ }
+ }
+
private:
const TRunnerOptions Options;
const EVerbose VerboseLevel;
@@ -198,4 +216,12 @@ bool TFqRunner::CreateBindings(const std::vector<FederatedQuery::BindingContent>
return Impl->CreateBindings(bindings);
}
+void TFqRunner::ExecuteQueryAsync(const TRequestOptions& query) const {
+ Impl->ExecuteQueryAsync(query);
+}
+
+void TFqRunner::FinalizeRunner() const {
+ Impl->FinalizeRunner();
+}
+
} // namespace NFqRun
diff --git a/ydb/tests/tools/fqrun/src/fq_runner.h b/ydb/tests/tools/fqrun/src/fq_runner.h
index b6e85db142..153952a955 100644
--- a/ydb/tests/tools/fqrun/src/fq_runner.h
+++ b/ydb/tests/tools/fqrun/src/fq_runner.h
@@ -20,6 +20,10 @@ public:
bool CreateBindings(const std::vector<FederatedQuery::BindingContent>& bindings) const;
+ void ExecuteQueryAsync(const TRequestOptions& query) const;
+
+ void FinalizeRunner() const;
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl;
diff --git a/ydb/tests/tools/fqrun/src/fq_setup.cpp b/ydb/tests/tools/fqrun/src/fq_setup.cpp
index b3f38ee699..98d53ac997 100644
--- a/ydb/tests/tools/fqrun/src/fq_setup.cpp
+++ b/ydb/tests/tools/fqrun/src/fq_setup.cpp
@@ -1,4 +1,5 @@
#include "fq_setup.h"
+#include "actors.h"
#include <library/cpp/colorizer/colors.h>
#include <library/cpp/testing/unittest/tests_data.h>
@@ -11,6 +12,8 @@
#include <ydb/library/grpc/server/actors/logger.h>
#include <ydb/library/security/ydb_credentials_provider_factory.h>
+#include <yql/essentials/utils/log/log.h>
+
using namespace NKikimrRun;
namespace NFqRun {
@@ -105,15 +108,13 @@ private:
const TString endpoint = TStringBuilder() << "localhost:" << grpcPort;
const TString database = NKikimr::CanonizePath(Settings.DomainName);
- const auto fillStorageConfig = [endpoint, database](NFq::NConfig::TYdbStorageConfig* config) {
- config->SetEndpoint(endpoint);
- config->SetDatabase(database);
+ const auto fillStorageConfig = [endpoint, database](NFq::NConfig::TYdbStorageConfig* config, std::optional<TExternalDatabase> externalDatabase = std::nullopt) {
+ config->SetEndpoint(externalDatabase ? externalDatabase->Endpoint : endpoint);
+ config->SetDatabase(externalDatabase ? externalDatabase->Database : database);
+ if (externalDatabase) {
+ config->SetToken(externalDatabase->Token);
+ }
};
- fillStorageConfig(fqConfig.MutableControlPlaneStorage()->MutableStorage());
- fillStorageConfig(fqConfig.MutableDbPool()->MutableStorage());
- fillStorageConfig(fqConfig.MutableCheckpointCoordinator()->MutableStorage());
- fillStorageConfig(fqConfig.MutableRateLimiter()->MutableDatabase());
- fillStorageConfig(fqConfig.MutableRowDispatcher()->MutableCoordinator()->MutableDatabase());
auto* privateApiConfig = fqConfig.MutablePrivateApi();
privateApiConfig->SetTaskServiceEndpoint(endpoint);
@@ -123,14 +124,37 @@ private:
nodesMenagerConfig->SetPort(grpcPort);
nodesMenagerConfig->SetHost("localhost");
- auto* healthConfig = fqConfig.MutableHealth();
- healthConfig->SetPort(grpcPort);
- healthConfig->SetDatabase(database);
-
if (Settings.EmulateS3) {
fqConfig.MutableCommon()->SetObjectStorageEndpoint("file://");
}
+ auto& cpStorage = *fqConfig.MutableControlPlaneStorage();
+ cpStorage.SetUseInMemory(!Settings.EnableCpStorage);
+ fillStorageConfig(cpStorage.MutableStorage(), Settings.CpStorageDatabase);
+ fillStorageConfig(fqConfig.MutableDbPool()->MutableStorage(), Settings.CpStorageDatabase);
+
+ auto& checkpoints = *fqConfig.MutableCheckpointCoordinator();
+ checkpoints.SetEnabled(Settings.EnableCheckpoints);
+ if (Settings.EnableCheckpoints) {
+ fillStorageConfig(checkpoints.MutableStorage(), Settings.CheckpointsDatabase);
+ }
+
+ fqConfig.MutableQuotasManager()->SetEnabled(Settings.EnableQuotas);
+
+ auto& rateLimiter = *fqConfig.MutableRateLimiter();
+ rateLimiter.SetEnabled(Settings.EnableQuotas);
+ rateLimiter.SetControlPlaneEnabled(Settings.EnableQuotas);
+ rateLimiter.SetDataPlaneEnabled(Settings.EnableQuotas);
+ if (Settings.EnableQuotas) {
+ fillStorageConfig(rateLimiter.MutableDatabase(), Settings.RateLimiterDatabase);
+ }
+
+ auto& rowDispatcher = *fqConfig.MutableRowDispatcher()->MutableCoordinator();
+ rowDispatcher.SetLocalMode(!Settings.EnableRemoteRd);
+ if (Settings.EnableRemoteRd) {
+ fillStorageConfig(rowDispatcher.MutableDatabase(), Settings.RowDispatcherDatabase);
+ }
+
return fqConfig;
}
@@ -155,11 +179,21 @@ private:
YqSharedResources->Init(GetRuntime()->GetActorSystem(0));
}
+ void InitializeYqlLogger() {
+ if (!Settings.EnableTraceOpt) {
+ return;
+ }
+
+ ModifyLogPriorities({{NKikimrServices::EServiceKikimr::YQL_PROXY, NActors::NLog::PRI_TRACE}}, Settings.LogConfig);
+ NYql::NLog::InitLogger(NActors::CreateNullBackend());
+ }
+
public:
explicit TImpl(const TFqSetupSettings& settings)
: Settings(settings)
{
const ui32 grpcPort = Settings.GrpcPort ? Settings.GrpcPort : PortManager.GetPort();
+ InitializeYqlLogger();
InitializeServer(grpcPort);
InitializeFqProxy(grpcPort);
@@ -179,15 +213,9 @@ public:
}
NFq::TEvControlPlaneProxy::TEvCreateQueryResponse::TPtr StreamRequest(const TRequestOptions& query) const {
- FederatedQuery::CreateQueryRequest request;
- request.set_execute_mode(FederatedQuery::ExecuteMode::RUN);
-
- auto& content = *request.mutable_content();
- content.set_type(FederatedQuery::QueryContent::STREAMING);
- content.set_text(query.Query);
- SetupAcl(content.mutable_acl());
-
- return RunControlPlaneProxyRequest<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest, NFq::TEvControlPlaneProxy::TEvCreateQueryResponse>(request);
+ return RunControlPlaneProxyRequest<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest, NFq::TEvControlPlaneProxy::TEvCreateQueryResponse>(
+ GetStreamRequest(query)
+ );
}
NFq::TEvControlPlaneProxy::TEvDescribeQueryResponse::TPtr DescribeQuery(const TString& queryId) const {
@@ -220,15 +248,69 @@ public:
return RunControlPlaneProxyRequest<NFq::TEvControlPlaneProxy::TEvCreateBindingRequest, NFq::TEvControlPlaneProxy::TEvCreateBindingResponse>(request);
}
+ void QueryRequestAsync(const TRequestOptions& query, TDuration pingPeriod) {
+ if (!AsyncQueryRunnerActorId) {
+ AsyncQueryRunnerActorId = GetRuntime()->Register(CreateAsyncQueryRunnerActor(Settings.AsyncQueriesSettings), 0, GetRuntime()->GetAppData().UserPoolId);
+ }
+
+ TQueryRequest request = {
+ .Event = GetControlPlaneRequest<NFq::TEvControlPlaneProxy::TEvCreateQueryRequest>(GetStreamRequest(query)),
+ .PingPeriod = pingPeriod
+ };
+ auto startPromise = NThreading::NewPromise();
+ GetRuntime()->Send(*AsyncQueryRunnerActorId, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise));
+
+ return startPromise.GetFuture().GetValueSync();
+ }
+
+ void WaitAsyncQueries() const {
+ if (!AsyncQueryRunnerActorId) {
+ return;
+ }
+
+ auto finalizePromise = NThreading::NewPromise();
+ GetRuntime()->Send(*AsyncQueryRunnerActorId, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise));
+
+ return finalizePromise.GetFuture().GetValueSync();
+ }
+
+ void StartTraceOpt() const {
+ if (!Settings.EnableTraceOpt) {
+ ythrow yexception() << "Trace opt was disabled";
+ }
+
+ NYql::NLog::YqlLogger().ResetBackend(CreateLogBackend());
+ }
+
+ static void StopTraceOpt() {
+ NYql::NLog::YqlLogger().ResetBackend(NActors::CreateNullBackend());
+ }
+
private:
NActors::TTestActorRuntime* GetRuntime() const {
return Server->GetRuntime();
}
+ static FederatedQuery::CreateQueryRequest GetStreamRequest(const TRequestOptions& query) {
+ FederatedQuery::CreateQueryRequest request;
+ request.set_execute_mode(FederatedQuery::ExecuteMode::RUN);
+
+ auto& content = *request.mutable_content();
+ content.set_type(FederatedQuery::QueryContent::STREAMING);
+ content.set_text(query.Query);
+ SetupAcl(content.mutable_acl());
+
+ return request;
+ }
+
+ template <typename TRequest, typename TProto>
+ std::unique_ptr<TRequest> GetControlPlaneRequest(const TProto& request) const {
+ return std::make_unique<TRequest>("yandexcloud://fqrun", request, BUILTIN_ACL_ROOT, Settings.YqlToken ? Settings.YqlToken : "fqrun", TVector<TString>{});
+ }
+
template <typename TRequest, typename TResponse, typename TProto>
typename TResponse::TPtr RunControlPlaneProxyRequest(const TProto& request) const {
- auto event = std::make_unique<TRequest>("yandexcloud://fqrun", request, BUILTIN_ACL_ROOT, Settings.YqlToken ? Settings.YqlToken : "fqrun", TVector<TString>{});
- return RunControlPlaneProxyRequest<TRequest, TResponse>(std::move(event));
+ return RunControlPlaneProxyRequest<TRequest, TResponse>(GetControlPlaneRequest<TRequest>(request));
}
template <typename TRequest, typename TResponse>
@@ -242,13 +324,15 @@ private:
}
private:
- const TFqSetupSettings Settings;
+ TFqSetupSettings Settings;
const NColorizer::TColors CoutColors;
NKikimr::Tests::TServer::TPtr Server;
std::unique_ptr<NKikimr::Tests::TClient> Client;
NFq::IYqSharedResources::TPtr YqSharedResources;
TPortManager PortManager;
+
+ std::optional<NActors::TActorId> AsyncQueryRunnerActorId;
};
TFqSetup::TFqSetup(const TFqSetupSettings& settings)
@@ -300,4 +384,20 @@ TRequestResult TFqSetup::CreateBinding(const FederatedQuery::BindingContent& bin
return GetStatus(response->Get()->Issues);
}
+void TFqSetup::QueryRequestAsync(const TRequestOptions& query, TDuration pingPeriod) const {
+ Impl->QueryRequestAsync(query, pingPeriod);
+}
+
+void TFqSetup::WaitAsyncQueries() const {
+ Impl->WaitAsyncQueries();
+}
+
+void TFqSetup::StartTraceOpt() const {
+ Impl->StartTraceOpt();
+}
+
+void TFqSetup::StopTraceOpt() {
+ TFqSetup::TImpl::StopTraceOpt();
+}
+
} // namespace NFqRun
diff --git a/ydb/tests/tools/fqrun/src/fq_setup.h b/ydb/tests/tools/fqrun/src/fq_setup.h
index 2ef7733bf0..560402b797 100644
--- a/ydb/tests/tools/fqrun/src/fq_setup.h
+++ b/ydb/tests/tools/fqrun/src/fq_setup.h
@@ -29,6 +29,14 @@ public:
TRequestResult CreateBinding(const FederatedQuery::BindingContent& binding) const;
+ void QueryRequestAsync(const TRequestOptions& query, TDuration pingPeriod) const;
+
+ void WaitAsyncQueries() const;
+
+ void StartTraceOpt() const;
+
+ static void StopTraceOpt();
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl;
diff --git a/ydb/tests/tools/fqrun/src/ya.make b/ydb/tests/tools/fqrun/src/ya.make
index a5a54edf3a..bf88dd8dad 100644
--- a/ydb/tests/tools/fqrun/src/ya.make
+++ b/ydb/tests/tools/fqrun/src/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ actors.cpp
common.cpp
fq_runner.cpp
fq_setup.cpp
@@ -14,6 +15,7 @@ PEERDIR(
ydb/core/fq/libs/init
ydb/core/fq/libs/mock
ydb/core/testlib
+ ydb/library/actors/core
ydb/library/folder_service/mock
ydb/library/grpc/server/actors
ydb/library/security
diff --git a/ydb/tests/tools/kqprun/README.md b/ydb/tests/tools/kqprun/README.md
index 10e2832941..225a58264a 100644
--- a/ydb/tests/tools/kqprun/README.md
+++ b/ydb/tests/tools/kqprun/README.md
@@ -4,6 +4,13 @@ Tool can be used to execute queries by using kikimr provider.
For profiling memory allocations build kqprun with ya make flag `-D PROFILE_MEMORY_ALLOCATIONS -D CXXFLAGS=-DPROFILE_MEMORY_ALLOCATIONS`.
+## Scripts
+
+* `flame_graph.sh` - script for collecting flame graphs in svg format, usage:
+ ```(bash)
+ ./flame_graph.sh [graph collection time in seconds] [use sudo]
+ ```
+
## Examples
### Queries
diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh
index 61749c81b2..25467038bf 100755
--- a/ydb/tests/tools/kqprun/flame_graph.sh
+++ b/ydb/tests/tools/kqprun/flame_graph.sh
@@ -2,11 +2,30 @@
set -eux
-kqprun_pid=$(pgrep -u $USER kqprun)
+if [ $# -gt 3 ]; then
+ echo "Too many arguments"
+ exit -1
+fi
-sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $kqprun_pid -v -o profdata -- sleep ${1:-'30'}
-sudo perf script -i profdata > profdata.txt
+SUDO=""
-flame_graph_tool="../../../../contrib/tools/flame-graph/"
+if [ ${2:-''} ]; then
+ SUDO="sudo"
+fi
+
+function cleanup {
+ $SUDO rm ./profdata
+ rm ./profdata.txt
+}
+trap cleanup EXIT
+
+TARGER_PID=$(pgrep -u $USER ${3:-'kqprun'})
+
+$SUDO perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $TARGER_PID -v -o profdata -- sleep ${1:-'30'}
+$SUDO perf script -i profdata > profdata.txt
+
+SCRIPT_DIR=$(cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd)
+
+flame_graph_tool="$SCRIPT_DIR/../../../../contrib/tools/flame-graph/"
${flame_graph_tool}/stackcollapse-perf.pl profdata.txt | ${flame_graph_tool}/flamegraph.pl > profdata.svg
diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp
index 3ec48c7d5b..87f147d762 100644
--- a/ydb/tests/tools/kqprun/kqprun.cpp
+++ b/ydb/tests/tools/kqprun/kqprun.cpp
@@ -404,6 +404,7 @@ class TMain : public TMainBase {
inline static const TString YqlToken = GetEnv(YQL_TOKEN_VARIABLE);
+ TDuration PingPeriod;
TExecutionOptions ExecutionOptions;
TRunnerOptions RunnerOptions;
@@ -623,6 +624,11 @@ protected:
.Choices(verbose.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.YdbSettings.AsyncQueriesSettings.Verbose, verbose);
+ options.AddLongOption("ping-period", "Query ping period in milliseconds")
+ .RequiredArgument("uint")
+ .DefaultValue(1000)
+ .StoreMappedResultT<ui64>(&PingPeriod, &TDuration::MilliSeconds<ui64>);
+
TChoices<NKikimrKqp::EQueryAction> scriptAction({
{"execute", NKikimrKqp::QUERY_ACTION_EXECUTE},
{"explain", NKikimrKqp::QUERY_ACTION_EXPLAIN}
@@ -790,11 +796,13 @@ protected:
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry().Get();
auto& appConfig = RunnerOptions.YdbSettings.AppConfig;
+ auto& queryService = *appConfig.MutableQueryServiceConfig();
if (ExecutionOptions.ResultsRowsLimit) {
- appConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
+ queryService.SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
}
+ queryService.SetProgressStatsPeriodMs(PingPeriod.MilliSeconds());
- FillLogConfig(*appConfig.MutableLogConfig());
+ SetupLogsConfig();
if (EmulateYt) {
const auto& fileStorageConfig = appConfig.GetQueryServiceConfig().GetFileStorage();
@@ -843,6 +851,14 @@ private:
}
}
}
+
+ void SetupLogsConfig() {
+ auto& logConfig = *RunnerOptions.YdbSettings.AppConfig.MutableLogConfig();
+ if (DefaultLogPriority) {
+ logConfig.SetDefaultLevel(*DefaultLogPriority);
+ }
+ ModifyLogPriorities(LogPriorities, logConfig);
+ }
};
} // anonymous namespace
diff --git a/ydb/tests/tools/kqprun/runlib/actors.h b/ydb/tests/tools/kqprun/runlib/actors.h
new file mode 100644
index 0000000000..9b9aebed9c
--- /dev/null
+++ b/ydb/tests/tools/kqprun/runlib/actors.h
@@ -0,0 +1,189 @@
+#pragma once
+
+#include "settings.h"
+
+#include <library/cpp/colorizer/colors.h>
+#include <library/cpp/threading/future/core/future.h>
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/events.h>
+#include <ydb/library/actors/core/hfunc.h>
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+
+#include <yql/essentials/public/issue/yql_issue.h>
+#include <yql/essentials/public/issue/yql_issue_message.h>
+
+#include <queue>
+
+namespace NKikimrRun {
+
+struct TEvPrivate {
+ enum EEv : ui32 {
+ EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
+ EvAsyncQueryFinished,
+ EvFinalizeAsyncQueryRunner,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
+
+ template <typename TQueryRequest>
+ struct TEvStartAsyncQuery : public NActors::TEventLocal<TEvStartAsyncQuery<TQueryRequest>, EvStartAsyncQuery> {
+ TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise<void> startPromise)
+ : Request(std::move(request))
+ , StartPromise(startPromise)
+ {}
+
+ TQueryRequest Request;
+ NThreading::TPromise<void> StartPromise;
+ };
+
+ template <typename TQueryResponse>
+ struct TEvAsyncQueryFinished : public NActors::TEventLocal<TEvAsyncQueryFinished<TQueryResponse>, EvAsyncQueryFinished> {
+ TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result)
+ : RequestId(requestId)
+ , Result(std::move(result))
+ {}
+
+ const ui64 RequestId;
+ const TQueryResponse Result;
+ };
+
+ struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal<TEvFinalizeAsyncQueryRunner, EvFinalizeAsyncQueryRunner> {
+ explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise<void> finalizePromise)
+ : FinalizePromise(finalizePromise)
+ {}
+
+ NThreading::TPromise<void> FinalizePromise;
+ };
+};
+
+template <typename TQueryRequest, typename TQueryResponse>
+class TAsyncQueryRunnerActorBase : public NActors::TActor<TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>> {
+ using TBase = NActors::TActor<TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>>;
+
+ struct TRequestInfo {
+ TInstant StartTime;
+ NThreading::TFuture<TQueryResponse> RequestFuture;
+ };
+
+public:
+ TAsyncQueryRunnerActorBase(const TAsyncQueriesSettings& settings)
+ : TBase(&TAsyncQueryRunnerActorBase::StateFunc)
+ , Settings_(settings)
+ {
+ RunningRequests_.reserve(Settings_.InFlightLimit);
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvStartAsyncQuery<TQueryRequest>, Handle);
+ hFunc(TEvPrivate::TEvAsyncQueryFinished<TQueryResponse>, Handle);
+ hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle);
+ )
+
+ void Handle(TEvPrivate::TEvStartAsyncQuery<TQueryRequest>::TPtr& ev) {
+ DelayedRequests_.emplace(std::move(ev));
+ StartDelayedRequests();
+ }
+
+ void Handle(TEvPrivate::TEvAsyncQueryFinished<TQueryResponse>::TPtr& ev) {
+ const ui64 requestId = ev->Get()->RequestId;
+ RequestsLatency_ += TInstant::Now() - RunningRequests_[requestId].StartTime;
+ RunningRequests_.erase(requestId);
+
+ const auto& response = ev->Get()->Result;
+
+ if (response.IsSuccess()) {
+ Completed_++;
+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
+ Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
+ }
+ } else {
+ Failed_++;
+ Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << response.GetStatus() << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << response.GetError() << CoutColors_.Default();
+ }
+
+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime_ > TDuration::Seconds(1)) {
+ Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
+ LastReportTime_ = TInstant::Now();
+ }
+
+ StartDelayedRequests();
+ TryFinalize();
+ }
+
+ void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) {
+ FinalizePromise_ = ev->Get()->FinalizePromise;
+ if (!TryFinalize()) {
+ Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl;
+ }
+ }
+
+protected:
+ virtual void RunQuery(TQueryRequest&& request, NThreading::TPromise<TQueryResponse> promise) = 0;
+
+private:
+ void StartDelayedRequests() {
+ while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) {
+ auto request = std::move(DelayedRequests_.front());
+ DelayedRequests_.pop();
+
+ auto promise = NThreading::NewPromise<TQueryResponse>();
+ RunQuery(std::move(request->Get()->Request), promise);
+ RunningRequests_[RequestId_] = {
+ .StartTime = TInstant::Now(),
+ .RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
+ this->Send(this->SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
+ })
+ };
+
+ MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
+ Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
+ }
+
+ RequestId_++;
+ request->Get()->StartPromise.SetValue();
+ }
+ }
+
+ bool TryFinalize() {
+ if (!FinalizePromise_ || !RunningRequests_.empty()) {
+ return false;
+ }
+
+ if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) {
+ Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
+ }
+
+ FinalizePromise_->SetValue();
+ this->PassAway();
+ return true;
+ }
+
+ TString GetInfoString() const {
+ TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
+ if (const auto amountRequests = Completed_ + Failed_) {
+ result << ", average latency: " << RequestsLatency_ / amountRequests;
+ }
+ return result;
+ }
+
+private:
+ const TAsyncQueriesSettings Settings_;
+ const TInstant StartTime_ = TInstant::Now();
+ const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
+
+ std::optional<NThreading::TPromise<void>> FinalizePromise_;
+ std::queue<typename TEvPrivate::TEvStartAsyncQuery<TQueryRequest>::TPtr> DelayedRequests_;
+ std::unordered_map<ui64, TRequestInfo> RunningRequests_;
+ TInstant LastReportTime_ = TInstant::Now();
+
+ ui64 RequestId_ = 1;
+ ui64 MaxInFlight_ = 0;
+ ui64 Completed_ = 0;
+ ui64 Failed_ = 0;
+ TDuration RequestsLatency_;
+};
+
+} // namespace NKikimrRun
diff --git a/ydb/tests/tools/kqprun/runlib/application.cpp b/ydb/tests/tools/kqprun/runlib/application.cpp
index 6b27760b5b..dac91ddb9d 100644
--- a/ydb/tests/tools/kqprun/runlib/application.cpp
+++ b/ydb/tests/tools/kqprun/runlib/application.cpp
@@ -18,6 +18,27 @@
namespace NKikimrRun {
+namespace {
+
+#ifdef PROFILE_MEMORY_ALLOCATIONS
+void InterruptHandler(int) {
+ NColorizer::TColors colors = NColorizer::AutoColors(Cerr);
+
+ Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl;
+ TMainBase::FinishProfileMemoryAllocations();
+
+ abort();
+}
+#endif
+
+} // anonymous namespace
+
+TMainBase::TMainBase() {
+#ifdef PROFILE_MEMORY_ALLOCATIONS
+ signal(SIGINT, &InterruptHandler);
+#endif
+}
+
#ifdef PROFILE_MEMORY_ALLOCATIONS
void TMainBase::FinishProfileMemoryAllocations() {
if (ProfileAllocationsOutput) {
@@ -103,7 +124,7 @@ void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) {
.StoreMappedResultT<TString>(&DefaultLogPriority, GetLogPrioritiesMap("log-default"));
options.AddLongOption("log", "Component log priority in format <component>=<priority> (e. g. KQP_YQL=trace)")
- .RequiredArgument("component priority")
+ .RequiredArgument("component=priority")
.Handler1([this, logPriority = GetLogPrioritiesMap("log")](const NLastGetopt::TOptsParser* option) {
TStringBuf component;
TStringBuf priority;
@@ -119,13 +140,6 @@ void TMainBase::RegisterLogOptions(NLastGetopt::TOpts& options) {
});
}
-void TMainBase::FillLogConfig(NKikimrConfig::TLogConfig& config) const {
- if (DefaultLogPriority) {
- config.SetDefaultLevel(*DefaultLogPriority);
- }
- ModifyLogPriorities(LogPriorities, config);
-}
-
IOutputStream* TMainBase::GetDefaultOutput(const TString& file) {
if (file == "-") {
return &Cout;
diff --git a/ydb/tests/tools/kqprun/runlib/application.h b/ydb/tests/tools/kqprun/runlib/application.h
index fd67eb2ac0..ebb32506f3 100644
--- a/ydb/tests/tools/kqprun/runlib/application.h
+++ b/ydb/tests/tools/kqprun/runlib/application.h
@@ -16,8 +16,10 @@
namespace NKikimrRun {
class TMainBase : public TMainClassArgs {
-#ifdef PROFILE_MEMORY_ALLOCATIONS
public:
+ TMainBase();
+
+#ifdef PROFILE_MEMORY_ALLOCATIONS
static void FinishProfileMemoryAllocations();
#endif
@@ -26,8 +28,6 @@ protected:
virtual void RegisterLogOptions(NLastGetopt::TOpts& options);
- void FillLogConfig(NKikimrConfig::TLogConfig& config) const;
-
static IOutputStream* GetDefaultOutput(const TString& file);
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> CreateFunctionRegistry() const;
@@ -36,12 +36,12 @@ protected:
inline static NColorizer::TColors CoutColors = NColorizer::AutoColors(Cout);
inline static IOutputStream* ProfileAllocationsOutput = nullptr;
-private:
- inline static std::vector<std::unique_ptr<TFileOutput>> FileHolders;
-
std::optional<NActors::NLog::EPriority> DefaultLogPriority;
std::unordered_map<NKikimrServices::EServiceKikimr, NActors::NLog::EPriority> LogPriorities;
+private:
+ inline static std::vector<std::unique_ptr<TFileOutput>> FileHolders;
+
TString UdfsDirectory;
TVector<TString> UdfsPaths;
bool ExcludeLinkedUdfs;
diff --git a/ydb/tests/tools/kqprun/runlib/settings.h b/ydb/tests/tools/kqprun/runlib/settings.h
index c77caf1876..874ec2b21e 100644
--- a/ydb/tests/tools/kqprun/runlib/settings.h
+++ b/ydb/tests/tools/kqprun/runlib/settings.h
@@ -4,6 +4,16 @@
namespace NKikimrRun {
+struct TAsyncQueriesSettings {
+ enum class EVerbose {
+ EachQuery,
+ Final,
+ };
+
+ ui64 InFlightLimit = 0;
+ EVerbose Verbose = EVerbose::EachQuery;
+};
+
struct TServerSettings {
TString DomainName = "Root";
diff --git a/ydb/tests/tools/kqprun/runlib/utils.cpp b/ydb/tests/tools/kqprun/runlib/utils.cpp
index f8c37d7b67..1b5a6373e3 100644
--- a/ydb/tests/tools/kqprun/runlib/utils.cpp
+++ b/ydb/tests/tools/kqprun/runlib/utils.cpp
@@ -1,5 +1,4 @@
#include "utils.h"
-#include "application.h"
#include <library/cpp/colorizer/colors.h>
#include <library/cpp/json/json_reader.h>
@@ -50,17 +49,6 @@ void FloatingPointExceptionHandler(int) {
abort();
}
-#ifdef PROFILE_MEMORY_ALLOCATIONS
-void InterruptHandler(int) {
- NColorizer::TColors colors = NColorizer::AutoColors(Cerr);
-
- Cout << colors.Red() << "Execution interrupted, finishing profile memory allocations..." << colors.Default() << Endl;
- TMainBase::FinishProfileMemoryAllocations();
-
- abort();
-}
-#endif
-
} // nonymous namespace
@@ -263,10 +251,6 @@ void SetupSignalActions() {
std::set_terminate(&TerminateHandler);
signal(SIGSEGV, &SegmentationFaultHandler);
signal(SIGFPE, &FloatingPointExceptionHandler);
-
-#ifdef PROFILE_MEMORY_ALLOCATIONS
- signal(SIGINT, &InterruptHandler);
-#endif
}
void PrintResultSet(EResultOutputFormat format, IOutputStream& output, const Ydb::ResultSet& resultSet) {
diff --git a/ydb/tests/tools/kqprun/runlib/ya.make b/ydb/tests/tools/kqprun/runlib/ya.make
index 39bd1328bf..cc92e6f903 100644
--- a/ydb/tests/tools/kqprun/runlib/ya.make
+++ b/ydb/tests/tools/kqprun/runlib/ya.make
@@ -9,6 +9,7 @@ PEERDIR(
library/cpp/colorizer
library/cpp/getopt
library/cpp/json
+ library/cpp/threading/future
ydb/core/base
ydb/core/blob_depot
ydb/core/fq/libs/compute/common
diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp
index ab1ae17974..f869caa76b 100644
--- a/ydb/tests/tools/kqprun/src/actors.cpp
+++ b/ydb/tests/tools/kqprun/src/actors.cpp
@@ -6,6 +6,8 @@
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/workload_service/actors/actors.h>
+using namespace NKikimrRun;
+
namespace NKqpRun {
namespace {
@@ -106,131 +108,18 @@ private:
std::vector<ui64> ResultSetSizes_;
};
-class TAsyncQueryRunnerActor : public NActors::TActor<TAsyncQueryRunnerActor> {
- using TBase = NActors::TActor<TAsyncQueryRunnerActor>;
-
- struct TRequestInfo {
- TInstant StartTime;
- NThreading::TFuture<TQueryResponse> RequestFuture;
- };
+class TAsyncQueryRunnerActor : public TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse> {
+ using TBase = TAsyncQueryRunnerActorBase<TQueryRequest, TQueryResponse>;
public:
TAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings)
- : TBase(&TAsyncQueryRunnerActor::StateFunc)
- , Settings_(settings)
- {
- RunningRequests_.reserve(Settings_.InFlightLimit);
- }
-
- STRICT_STFUNC(StateFunc,
- hFunc(TEvPrivate::TEvStartAsyncQuery, Handle);
- hFunc(TEvPrivate::TEvAsyncQueryFinished, Handle);
- hFunc(TEvPrivate::TEvFinalizeAsyncQueryRunner, Handle);
- )
-
- void Handle(TEvPrivate::TEvStartAsyncQuery::TPtr& ev) {
- DelayedRequests_.emplace(std::move(ev));
- StartDelayedRequests();
- }
-
- void Handle(TEvPrivate::TEvAsyncQueryFinished::TPtr& ev) {
- const ui64 requestId = ev->Get()->RequestId;
- RequestsLatency_ += TInstant::Now() - RunningRequests_[requestId].StartTime;
- RunningRequests_.erase(requestId);
-
- const auto& response = ev->Get()->Result.Response->Get()->Record;
- const auto status = response.GetYdbStatus();
-
- if (status == Ydb::StatusIds::SUCCESS) {
- Completed_++;
- if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
- Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " completed. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
- }
- } else {
- Failed_++;
- NYql::TIssues issues;
- NYql::IssuesFromMessage(response.GetResponse().GetQueryIssues(), issues);
- Cout << CoutColors_.Red() << TInstant::Now().ToIsoStringLocal() << " Request #" << requestId << " failed " << status << ". " << CoutColors_.Yellow() << GetInfoString() << "\n" << CoutColors_.Red() << "Issues:\n" << issues.ToString() << CoutColors_.Default();
- }
-
- if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final && TInstant::Now() - LastReportTime_ > TDuration::Seconds(1)) {
- Cout << CoutColors_.Green() << TInstant::Now().ToIsoStringLocal() << " Finished " << Failed_ + Completed_ << " requests. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << Endl;
- LastReportTime_ = TInstant::Now();
- }
-
- StartDelayedRequests();
- TryFinalize();
- }
-
- void Handle(TEvPrivate::TEvFinalizeAsyncQueryRunner::TPtr& ev) {
- FinalizePromise_ = ev->Get()->FinalizePromise;
- if (!TryFinalize()) {
- Cout << CoutColors_.Yellow() << TInstant::Now().ToIsoStringLocal() << " Waiting for " << DelayedRequests_.size() + RunningRequests_.size() << " async queries..." << CoutColors_.Default() << Endl;
- }
- }
-
-private:
- void StartDelayedRequests() {
- while (!DelayedRequests_.empty() && (!Settings_.InFlightLimit || RunningRequests_.size() < Settings_.InFlightLimit)) {
- auto request = std::move(DelayedRequests_.front());
- DelayedRequests_.pop();
-
- auto promise = NThreading::NewPromise<TQueryResponse>();
- Register(CreateRunScriptActorMock(std::move(request->Get()->Request), promise, nullptr));
- RunningRequests_[RequestId_] = {
- .StartTime = TInstant::Now(),
- .RequestFuture = promise.GetFuture().Subscribe([id = RequestId_, this](const NThreading::TFuture<TQueryResponse>& f) {
- Send(SelfId(), new TEvPrivate::TEvAsyncQueryFinished(id, std::move(f.GetValue())));
- })
- };
-
- MaxInFlight_ = std::max(MaxInFlight_, RunningRequests_.size());
- if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::EachQuery) {
- Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " Request #" << RequestId_ << " started. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
- }
-
- RequestId_++;
- request->Get()->StartPromise.SetValue();
- }
- }
-
- bool TryFinalize() {
- if (!FinalizePromise_ || !RunningRequests_.empty()) {
- return false;
- }
-
- if (Settings_.Verbose == TAsyncQueriesSettings::EVerbose::Final) {
- Cout << CoutColors_.Cyan() << TInstant::Now().ToIsoStringLocal() << " All async requests finished. " << CoutColors_.Yellow() << GetInfoString() << CoutColors_.Default() << "\n";
- }
-
- FinalizePromise_->SetValue();
- PassAway();
- return true;
- }
+ : TBase(settings)
+ {}
- TString GetInfoString() const {
- TStringBuilder result = TStringBuilder() << "completed: " << Completed_ << ", failed: " << Failed_ << ", in flight: " << RunningRequests_.size() << ", max in flight: " << MaxInFlight_ << ", spend time: " << TInstant::Now() - StartTime_;
- if (const auto amountRequests = Completed_ + Failed_) {
- result << ", average latency: " << RequestsLatency_ / amountRequests;
- }
- return result;
+protected:
+ void RunQuery(TQueryRequest&& request, NThreading::TPromise<TQueryResponse> promise) override {
+ Register(CreateRunScriptActorMock(std::move(request), promise, nullptr));
}
-
-private:
- const TAsyncQueriesSettings Settings_;
- const TInstant StartTime_ = TInstant::Now();
- const NColorizer::TColors CoutColors_ = NColorizer::AutoColors(Cout);
-
- std::optional<NThreading::TPromise<void>> FinalizePromise_;
- std::queue<TEvPrivate::TEvStartAsyncQuery::TPtr> DelayedRequests_;
- std::unordered_map<ui64, TRequestInfo> RunningRequests_;
- TInstant LastReportTime_ = TInstant::Now();
-
- ui64 RequestId_ = 1;
- ui64 MaxInFlight_ = 0;
- ui64 Completed_ = 0;
- ui64 Failed_ = 0;
- TDuration RequestsLatency_;
};
class TResourcesWaiterActor : public NActors::TActorBootstrapped<TResourcesWaiterActor> {
@@ -291,17 +180,6 @@ public:
}
}
- void Handle(TEvPrivate::TEvResourcesInfo::TPtr& ev) {
- const auto nodeCount = ev->Get()->NodeCount;
- if (nodeCount == Settings_.ExpectedNodeCount) {
- HealthCheckStage_ = EHealthCheck::FetchDatabase;
- DoHealthCheck();
- return;
- }
-
- Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true);
- }
-
void Handle(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse::TPtr& ev) {
const auto status = ev->Get()->Status;
if (status == Ydb::StatusIds::SUCCESS) {
@@ -325,7 +203,6 @@ public:
STRICT_STFUNC(StateFunc,
sFunc(NActors::TEvents::TEvWakeup, DoHealthCheck);
- hFunc(TEvPrivate::TEvResourcesInfo, Handle);
hFunc(NKikimr::NKqp::NWorkload::TEvFetchDatabaseResponse, Handle);
hFunc(NKikimr::NKqp::TEvKqp::TEvScriptResponse, Handle);
)
@@ -341,10 +218,14 @@ private:
return;
}
- ResourceManager_->RequestClusterResourcesInfo(
- [selfId = SelfId(), actorContext = ActorContext()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
- actorContext.Send(selfId, new TEvPrivate::TEvResourcesInfo(resources.size()));
- });
+ const size_t nodeCount = ResourceManager_->GetClusterResources().size();
+ if (nodeCount == static_cast<size_t>(Settings_.ExpectedNodeCount)) {
+ HealthCheckStage_ = EHealthCheck::FetchDatabase;
+ DoHealthCheck();
+ return;
+ }
+
+ Retry(TStringBuilder() << "invalid node count, got " << nodeCount << ", expected " << Settings_.ExpectedNodeCount, true);
}
void FetchDatabase() {
@@ -525,6 +406,20 @@ private:
} // anonymous namespace
+bool TQueryResponse::IsSuccess() const {
+ return GetStatus() == Ydb::StatusIds::SUCCESS;
+}
+
+Ydb::StatusIds::StatusCode TQueryResponse::GetStatus() const {
+ return Response->Get()->Record.GetYdbStatus();
+}
+
+TString TQueryResponse::GetError() const {
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(Response->Get()->Record.GetResponse().GetQueryIssues(), issues);
+ return issues.ToString();
+}
+
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback) {
return new TRunScriptActorMock(std::move(request), promise, progressCallback);
}
diff --git a/ydb/tests/tools/kqprun/src/actors.h b/ydb/tests/tools/kqprun/src/actors.h
index 410794491b..4f555cc6a5 100644
--- a/ydb/tests/tools/kqprun/src/actors.h
+++ b/ydb/tests/tools/kqprun/src/actors.h
@@ -4,13 +4,17 @@
#include <ydb/core/kqp/common/events/events.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
-
+#include <ydb/tests/tools/kqprun/runlib/actors.h>
namespace NKqpRun {
struct TQueryResponse {
NKikimr::NKqp::TEvKqp::TEvQueryResponse::TPtr Response;
std::vector<Ydb::ResultSet> ResultSets;
+
+ bool IsSuccess() const;
+ Ydb::StatusIds::StatusCode GetStatus() const;
+ TString GetError() const;
};
struct TQueryRequest {
@@ -35,61 +39,11 @@ struct TWaitResourcesSettings {
TString Database;
};
-struct TEvPrivate {
- enum EEv : ui32 {
- EvStartAsyncQuery = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
- EvAsyncQueryFinished,
- EvFinalizeAsyncQueryRunner,
-
- EvResourcesInfo,
-
- EvEnd
- };
-
- static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
-
- struct TEvStartAsyncQuery : public NActors::TEventLocal<TEvStartAsyncQuery, EvStartAsyncQuery> {
- TEvStartAsyncQuery(TQueryRequest request, NThreading::TPromise<void> startPromise)
- : Request(std::move(request))
- , StartPromise(startPromise)
- {}
-
- TQueryRequest Request;
- NThreading::TPromise<void> StartPromise;
- };
-
- struct TEvAsyncQueryFinished : public NActors::TEventLocal<TEvAsyncQueryFinished, EvAsyncQueryFinished> {
- TEvAsyncQueryFinished(ui64 requestId, TQueryResponse result)
- : RequestId(requestId)
- , Result(std::move(result))
- {}
-
- const ui64 RequestId;
- const TQueryResponse Result;
- };
-
- struct TEvFinalizeAsyncQueryRunner : public NActors::TEventLocal<TEvFinalizeAsyncQueryRunner, EvFinalizeAsyncQueryRunner> {
- explicit TEvFinalizeAsyncQueryRunner(NThreading::TPromise<void> finalizePromise)
- : FinalizePromise(finalizePromise)
- {}
-
- NThreading::TPromise<void> FinalizePromise;
- };
-
- struct TEvResourcesInfo : public NActors::TEventLocal<TEvResourcesInfo, EvResourcesInfo> {
- explicit TEvResourcesInfo(i32 nodeCount)
- : NodeCount(nodeCount)
- {}
-
- const i32 NodeCount;
- };
-};
-
using TProgressCallback = std::function<void(ui64 queryId, const NKikimrKqp::TEvExecuterProgress& executerProgress)>;
NActors::IActor* CreateRunScriptActorMock(TQueryRequest request, NThreading::TPromise<TQueryResponse> promise, TProgressCallback progressCallback);
-NActors::IActor* CreateAsyncQueryRunnerActor(const TAsyncQueriesSettings& settings);
+NActors::IActor* CreateAsyncQueryRunnerActor(const NKikimrRun::TAsyncQueriesSettings& settings);
NActors::IActor* CreateResourcesWaiterActor(NThreading::TPromise<void> promise, const TWaitResourcesSettings& settings);
diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h
index f401b8d8c8..057223d6d1 100644
--- a/ydb/tests/tools/kqprun/src/common.h
+++ b/ydb/tests/tools/kqprun/src/common.h
@@ -22,16 +22,6 @@ constexpr char YQL_TOKEN_VARIABLE[] = "YQL_TOKEN";
constexpr ui64 DEFAULT_STORAGE_SIZE = 32_GB;
constexpr TDuration TENANT_CREATION_TIMEOUT = TDuration::Seconds(30);
-struct TAsyncQueriesSettings {
- enum class EVerbose {
- EachQuery,
- Final,
- };
-
- ui64 InFlightLimit = 0;
- EVerbose Verbose = EVerbose::EachQuery;
-};
-
struct TYdbSetupSettings : public NKikimrRun::TServerSettings {
enum class EVerbose {
None,
@@ -68,7 +58,7 @@ struct TYdbSetupSettings : public NKikimrRun::TServerSettings {
NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory;
TIntrusivePtr<NYql::IYtGateway> YtGateway;
NKikimrConfig::TAppConfig AppConfig;
- TAsyncQueriesSettings AsyncQueriesSettings;
+ NKikimrRun::TAsyncQueriesSettings AsyncQueriesSettings;
};
diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp
index bbc432be16..f295846a35 100644
--- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp
+++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp
@@ -173,6 +173,9 @@ public:
private:
bool WaitScriptExecutionOperation(ui64 queryId) {
StartTime_ = TInstant::Now();
+ Y_DEFER {
+ TYdbSetup::StopTraceOpt();
+ };
TDuration getOperationPeriod = TDuration::Seconds(1);
if (auto progressStatsPeriodMs = Options_.YdbSettings.AppConfig.GetQueryServiceConfig().GetProgressStatsPeriodMs()) {
@@ -205,8 +208,6 @@ private:
Sleep(getOperationPeriod);
}
- TYdbSetup::StopTraceOpt();
-
PrintScriptAst(queryId, ExecutionMeta_.Ast);
PrintScriptProgress(queryId, ExecutionMeta_.Plan);
PrintScriptPlan(queryId, ExecutionMeta_.Plan);
diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
index d70b727c78..145d4eaf78 100644
--- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp
+++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
@@ -550,7 +550,7 @@ public:
auto request = GetQueryRequest(query);
auto startPromise = NThreading::NewPromise();
- GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise));
+ GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvStartAsyncQuery(std::move(request), startPromise));
return startPromise.GetFuture().GetValueSync();
}
@@ -561,7 +561,7 @@ public:
}
auto finalizePromise = NThreading::NewPromise();
- GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise));
+ GetRuntime()->Send(*AsyncQueryRunnerActorId_, GetRuntime()->AllocateEdgeActor(), new NKikimrRun::TEvPrivate::TEvFinalizeAsyncQueryRunner(finalizePromise));
return finalizePromise.GetFuture().GetValueSync();
}