summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/core/testlib/test_client.cpp73
-rw-r--r--ydb/core/testlib/test_client.h4
-rw-r--r--ydb/tests/tools/kqprun/.gitignore2
-rw-r--r--ydb/tests/tools/kqprun/configuration/app_config.conf46
-rwxr-xr-xydb/tests/tools/kqprun/flame_graph.sh11
-rw-r--r--ydb/tests/tools/kqprun/kqprun.cpp177
-rw-r--r--ydb/tests/tools/kqprun/src/common.h6
-rw-r--r--ydb/tests/tools/kqprun/src/kqp_runner.cpp17
-rw-r--r--ydb/tests/tools/kqprun/src/ydb_setup.cpp28
9 files changed, 301 insertions, 63 deletions
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index bf73678f918..0e210a41261 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -338,11 +338,11 @@ namespace Tests {
}
}
- void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options) {
+ void TServer::EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId) {
GRpcServer.reset(new NYdbGrpc::TGRpcServer(options));
auto grpcService = new NGRpcProxy::TGRpcService();
- auto system(Runtime->GetAnyNodeActorSystem());
+ auto system(Runtime->GetActorSystem(grpcServiceNodeId));
if (Settings->Verbose) {
Cerr << "TServer::EnableGrpc on GrpcPort " << options.Port << ", node " << system->NodeId << Endl;
@@ -352,21 +352,23 @@ namespace Tests {
TVector<TActorId> grpcRequestProxies;
grpcRequestProxies.reserve(proxyCount);
- auto& appData = Runtime->GetAppData();
+ auto& appData = Runtime->GetAppData(grpcServiceNodeId);
NJaegerTracing::TSamplingThrottlingConfigurator tracingConfigurator(appData.TimeProvider, appData.RandomProvider);
for (size_t i = 0; i < proxyCount; ++i) {
auto grpcRequestProxy = NGRpcService::CreateGRpcRequestProxy(*Settings->AppConfig, tracingConfigurator.GetControl());
- auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled);
+ auto grpcRequestProxyId = system->Register(grpcRequestProxy, TMailboxType::ReadAsFilled, appData.UserPoolId);
system->RegisterLocalService(NGRpcService::CreateGRpcRequestProxyId(), grpcRequestProxyId);
grpcRequestProxies.push_back(grpcRequestProxyId);
}
system->Register(
- NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Settings->AppConfig->GetTracingConfig())
+ NConsole::CreateJaegerTracingConfigurator(std::move(tracingConfigurator), Settings->AppConfig->GetTracingConfig()),
+ TMailboxType::ReadAsFilled,
+ appData.UserPoolId
);
- auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled);
+ auto grpcMon = system->Register(NGRpcService::CreateGrpcMonService(), TMailboxType::ReadAsFilled, appData.UserPoolId);
system->RegisterLocalService(NGRpcService::GrpcMonServiceId(), grpcMon);
GRpcServerRootCounters = MakeIntrusive<::NMonitoring::TDynamicCounters>();
@@ -448,11 +450,12 @@ namespace Tests {
GRpcServer->Start();
}
- void TServer::EnableGRpc(ui16 port) {
+ void TServer::EnableGRpc(ui16 port, ui32 grpcServiceNodeId) {
EnableGRpc(NYdbGrpc::TServerOptions()
.SetHost("localhost")
.SetPort(port)
- .SetLogger(NYdbGrpc::CreateActorSystemLogger(*Runtime->GetAnyNodeActorSystem(), NKikimrServices::GRPC_SERVER))
+ .SetLogger(NYdbGrpc::CreateActorSystemLogger(*Runtime->GetActorSystem(grpcServiceNodeId), NKikimrServices::GRPC_SERVER)),
+ grpcServiceNodeId
);
}
@@ -796,7 +799,7 @@ namespace Tests {
NKikimr::NConfig::TConfigsDispatcherInitInfo {
.InitialConfig = initial,
});
- auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ auto aid = Runtime->Register(dispatcher, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
if (Settings->IsEnableMetadataProvider()) {
@@ -813,7 +816,7 @@ namespace Tests {
}
if (Settings->IsEnableExternalIndex()) {
auto* actor = NCSIndex::CreateService(NCSIndex::TConfig());
- const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx);
}
{
@@ -839,7 +842,7 @@ namespace Tests {
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
auto sysViewService = NSysView::CreateSysViewServiceForTests();
- TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx);
+ TActorId sysViewServiceId = Runtime->Register(sysViewService.Release(), nodeIdx, appData.UserPoolId);
Runtime->RegisterService(NSysView::MakeSysViewServiceID(Runtime->GetNodeId(nodeIdx)), sysViewServiceId, nodeIdx);
auto tenantPublisher = CreateTenantNodeEnumerationPublisher();
@@ -857,11 +860,13 @@ namespace Tests {
}
void TServer::SetupProxies(ui32 nodeIdx) {
+ const ui32 userPoolId = Runtime->GetAppData(nodeIdx).UserPoolId;
+
Runtime->SetTxAllocatorTabletIds({ChangeStateStorage(TxAllocator, Settings->Domain)});
{
if (Settings->AuthConfig.HasLdapAuthentication()) {
IActor* ldapAuthProvider = NKikimr::CreateLdapAuthProvider(Settings->AuthConfig.GetLdapAuthentication());
- TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx);
+ TActorId ldapAuthProviderId = Runtime->Register(ldapAuthProvider, nodeIdx, userPoolId);
Runtime->RegisterService(MakeLdapAuthProviderID(), ldapAuthProviderId, nodeIdx);
}
TTicketParserSettings ticketParserSettings {
@@ -873,19 +878,19 @@ namespace Tests {
}
};
IActor* ticketParser = Settings->CreateTicketParser(ticketParserSettings);
- TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx);
+ TActorId ticketParserId = Runtime->Register(ticketParser, nodeIdx, userPoolId);
Runtime->RegisterService(MakeTicketParserID(), ticketParserId, nodeIdx);
}
{
IActor* healthCheck = NHealthCheck::CreateHealthCheckService();
- TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx);
+ TActorId healthCheckId = Runtime->Register(healthCheck, nodeIdx, userPoolId);
Runtime->RegisterService(NHealthCheck::MakeHealthCheckID(), healthCheckId, nodeIdx);
}
{
const auto& appData = Runtime->GetAppData(nodeIdx);
IActor* metadataCache = CreateDatabaseMetadataCache(appData.TenantName, appData.Counters).release();
- TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx);
+ TActorId metadataCacheId = Runtime->Register(metadataCache, nodeIdx, userPoolId);
Runtime->RegisterService(MakeDatabaseMetadataCacheId(Runtime->GetNodeId(nodeIdx)), metadataCacheId, nodeIdx);
}
{
@@ -893,7 +898,7 @@ namespace Tests {
IActor* kqpRmService = NKqp::CreateKqpResourceManagerActor(
Settings->AppConfig->GetTableServiceConfig().GetResourceManager(), nullptr, {}, kqpProxySharedResources, Runtime->GetNodeId(nodeIdx));
- TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx);
+ TActorId kqpRmServiceId = Runtime->Register(kqpRmService, nodeIdx, userPoolId);
Runtime->RegisterService(NKqp::MakeKqpRmServiceID(Runtime->GetNodeId(nodeIdx)), kqpRmServiceId, nodeIdx);
if (!KqpLoggerScope) {
@@ -916,14 +921,14 @@ namespace Tests {
auto httpProxyActorId = NFq::MakeYqlAnalyticsHttpProxyId();
Runtime->RegisterService(
httpProxyActorId,
- Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx),
+ Runtime->Register(NHttp::CreateHttpProxy(), nodeIdx, userPoolId),
nodeIdx
);
auto databaseResolverActorId = NFq::MakeDatabaseResolverActorId();
Runtime->RegisterService(
databaseResolverActorId,
- Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx),
+ Runtime->Register(NFq::CreateDatabaseResolver(httpProxyActorId, Settings->CredentialsFactory), nodeIdx, userPoolId),
nodeIdx
);
@@ -957,13 +962,13 @@ namespace Tests {
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
nullptr, std::move(kqpProxySharedResources),
federatedQuerySetupFactory, Settings->S3ActorsFactory);
- TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
+ TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx, userPoolId);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);
IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService(
Settings->AppConfig->GetQueryServiceConfig(), federatedQuerySetupFactory, Settings->S3ActorsFactory
);
- TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx);
+ TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx, userPoolId);
Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx);
}
@@ -975,19 +980,19 @@ namespace Tests {
{
IActor* compileService = CreateMiniKQLCompileService(100000);
- TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId, TMailboxType::Revolving, 0);
+ TActorId compileServiceId = Runtime->Register(compileService, nodeIdx, userPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(MakeMiniKQLCompileServiceID(), compileServiceId, nodeIdx);
}
{
IActor* longTxService = NLongTxService::CreateLongTxService();
- TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx);
+ TActorId longTxServiceId = Runtime->Register(longTxService, nodeIdx, userPoolId);
Runtime->RegisterService(NLongTxService::MakeLongTxServiceID(Runtime->GetNodeId(nodeIdx)), longTxServiceId, nodeIdx);
}
{
IActor* sequenceProxy = NSequenceProxy::CreateSequenceProxy();
- TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx);
+ TActorId sequenceProxyId = Runtime->Register(sequenceProxy, nodeIdx, userPoolId);
Runtime->RegisterService(NSequenceProxy::MakeSequenceProxyServiceID(), sequenceProxyId, nodeIdx);
}
@@ -1000,7 +1005,7 @@ namespace Tests {
}
{
IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters());
- TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx);
+ TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx, userPoolId);
Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx);
}
{
@@ -1013,12 +1018,12 @@ namespace Tests {
{
IActor* pqClusterTracker = NPQ::NClusterTracker::CreateClusterTracker();
- TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx);
+ TActorId pqClusterTrackerId = Runtime->Register(pqClusterTracker, nodeIdx, userPoolId);
Runtime->RegisterService(NPQ::NClusterTracker::MakeClusterTrackerID(), pqClusterTrackerId, nodeIdx);
}
{
IActor* pqReadCacheService = NPQ::CreatePQDReadCacheService(Runtime->GetDynamicCounters());
- TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx);
+ TActorId readCacheId = Runtime->Register(pqReadCacheService, nodeIdx, userPoolId);
Runtime->RegisterService(NPQ::MakePQDReadCacheServiceActorId(), readCacheId, nodeIdx);
}
@@ -1028,7 +1033,7 @@ namespace Tests {
new ::NMonitoring::TDynamicCounters(), TDuration::Seconds(1)
);
- TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx);
+ TActorId pqMetaCacheId = Runtime->Register(pqMetaCache, nodeIdx, userPoolId);
Runtime->RegisterService(NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), pqMetaCacheId, nodeIdx);
}
}
@@ -1039,7 +1044,7 @@ namespace Tests {
try {
fileBackend = MakeHolder<TFileLogBackend>(Settings->MeteringFilePath);
auto meteringActor = NMetering::CreateMeteringWriter(std::move(fileBackend));
- TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx);
+ TActorId meteringId = Runtime->Register(meteringActor.Release(), nodeIdx, Runtime->GetAppData(nodeIdx).IOPoolId);
Runtime->RegisterService(NMetering::MakeMeteringServiceID(), meteringId, nodeIdx);
} catch (const TFileError &ex) {
@@ -1051,19 +1056,19 @@ namespace Tests {
{
IActor* kesusService = NKesus::CreateKesusProxyService();
- TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx);
+ TActorId kesusServiceId = Runtime->Register(kesusService, nodeIdx, userPoolId);
Runtime->RegisterService(NKesus::MakeKesusProxyServiceId(), kesusServiceId, nodeIdx);
}
{
IActor* netClassifier = NNetClassifier::CreateNetClassifier();
- TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx);
+ TActorId netClassifierId = Runtime->Register(netClassifier, nodeIdx, userPoolId);
Runtime->RegisterService(NNetClassifier::MakeNetClassifierID(), netClassifierId, nodeIdx);
}
{
IActor* actor = CreatePollerActor();
- TActorId actorId = Runtime->Register(actor, nodeIdx);
+ TActorId actorId = Runtime->Register(actor, nodeIdx, Runtime->GetAppData(nodeIdx).SystemPoolId);
Runtime->RegisterService(MakePollerActorId(), actorId, nodeIdx);
}
@@ -1075,11 +1080,11 @@ namespace Tests {
}
IActor* actor = NKafka::CreateKafkaListener(MakePollerActorId(), settings, Settings->AppConfig->GetKafkaProxyConfig());
- TActorId actorId = Runtime->Register(actor, nodeIdx);
+ TActorId actorId = Runtime->Register(actor, nodeIdx, userPoolId);
Runtime->RegisterService(TActorId{}, actorId, nodeIdx);
IActor* metricsActor = CreateKafkaMetricsActor(NKafka::TKafkaMetricsSettings{Runtime->GetAppData().Counters->GetSubgroup("counters", "kafka_proxy")});
- TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx);
+ TActorId metricsActorId = Runtime->Register(metricsActor, nodeIdx, userPoolId);
Runtime->RegisterService(NKafka::MakeKafkaMetricsServiceID(), metricsActorId, nodeIdx);
}
@@ -1206,7 +1211,7 @@ namespace Tests {
IActor* viewer = CreateViewer(*Settings->KikimrRunConfig);
SetupPQVirtualHandlers(dynamic_cast<IViewer*>(viewer));
SetupDBVirtualHandlers(dynamic_cast<IViewer*>(viewer));
- TActorId viewerId = Runtime->Register(viewer, nodeIdx);
+ TActorId viewerId = Runtime->Register(viewer, nodeIdx, Runtime->GetAppData(nodeIdx).BatchPoolId);
Runtime->RegisterService(MakeViewerID(nodeIdx), viewerId, nodeIdx);
}
}
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 7e476a05dab..db3a73415ed 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -295,8 +295,8 @@ namespace Tests {
TServer& operator =(TServer&& server) = default;
virtual ~TServer();
- void EnableGRpc(const NYdbGrpc::TServerOptions& options);
- void EnableGRpc(ui16 port);
+ void EnableGRpc(const NYdbGrpc::TServerOptions& options, ui32 grpcServiceNodeId = 0);
+ void EnableGRpc(ui16 port, ui32 grpcServiceNodeId = 0);
void SetupRootStoragePools(const TActorId sender) const;
void SetupDefaultProfiles();
diff --git a/ydb/tests/tools/kqprun/.gitignore b/ydb/tests/tools/kqprun/.gitignore
index 807aadd42e7..a4578942a67 100644
--- a/ydb/tests/tools/kqprun/.gitignore
+++ b/ydb/tests/tools/kqprun/.gitignore
@@ -6,3 +6,5 @@ udfs
*.sql
*.bin
*.txt
+*.svg
+*.old
diff --git a/ydb/tests/tools/kqprun/configuration/app_config.conf b/ydb/tests/tools/kqprun/configuration/app_config.conf
index b57e03854ae..1c9c56cebd2 100644
--- a/ydb/tests/tools/kqprun/configuration/app_config.conf
+++ b/ydb/tests/tools/kqprun/configuration/app_config.conf
@@ -1,3 +1,49 @@
+ActorSystemConfig {
+ Executor {
+ Type: BASIC
+ Threads: 1
+ SpinThreshold: 10
+ Name: "System"
+ }
+ Executor {
+ Type: BASIC
+ Threads: 6
+ SpinThreshold: 1
+ Name: "User"
+ }
+ Executor {
+ Type: BASIC
+ Threads: 1
+ SpinThreshold: 1
+ Name: "Batch"
+ }
+ Executor {
+ Type: IO
+ Threads: 1
+ Name: "IO"
+ }
+ Executor {
+ Type: BASIC
+ Threads: 2
+ SpinThreshold: 10
+ Name: "IC"
+ TimePerMailboxMicroSecs: 100
+ }
+ Scheduler {
+ Resolution: 64
+ SpinThreshold: 0
+ ProgressThreshold: 10000
+ }
+ SysExecutor: 0
+ UserExecutor: 1
+ IoExecutor: 3
+ BatchExecutor: 2
+ ServiceExecutor {
+ ServiceName: "Interconnect"
+ ExecutorId: 4
+ }
+}
+
ColumnShardConfig {
DisabledOnSchemeShard: false
}
diff --git a/ydb/tests/tools/kqprun/flame_graph.sh b/ydb/tests/tools/kqprun/flame_graph.sh
new file mode 100755
index 00000000000..ead8de30683
--- /dev/null
+++ b/ydb/tests/tools/kqprun/flame_graph.sh
@@ -0,0 +1,11 @@
+#!/usr/bin/env bash
+
+# For svg graph download https://github.com/brendangregg/FlameGraph
+# and run `FlameGraph/stackcollapse-perf.pl profdata.txt | FlameGraph/flamegraph.pl > profdata.svg`
+
+pid=$(pgrep -u $USER kqprun)
+
+echo "Target process id: ${pid}"
+
+sudo perf record -F 50 --call-graph dwarf -g --proc-map-timeout=10000 --pid $pid -v -o profdata -- sleep 30
+sudo perf script -i profdata > profdata.txt
diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp
index 3e4df13110a..88802d2aead 100644
--- a/ydb/tests/tools/kqprun/kqprun.cpp
+++ b/ydb/tests/tools/kqprun/kqprun.cpp
@@ -44,25 +44,29 @@ struct TExecutionOptions {
std::vector<TString> TraceIds;
std::vector<TString> PoolIds;
std::vector<TString> UserSIDs;
+ ui64 ResultsRowsLimit = 0;
const TString DefaultTraceId = "kqprun";
bool HasResults() const {
- if (ScriptQueries.empty()) {
- return false;
- }
-
- for (size_t i = 0; i < ExecutionCases.size(); ++i) {
+ for (size_t i = 0; i < ScriptQueries.size(); ++i) {
if (GetScriptQueryAction(i) != NKikimrKqp::EQueryAction::QUERY_ACTION_EXECUTE) {
continue;
}
- if (ExecutionCases[i] != EExecutionCase::AsyncQuery) {
+ if (GetExecutionCase(i) != EExecutionCase::AsyncQuery) {
return true;
}
}
return false;
}
+ bool HasExecutionCase(EExecutionCase executionCase) const {
+ if (ExecutionCases.empty()) {
+ return executionCase == EExecutionCase::GenericScript;
+ }
+ return std::find(ExecutionCases.begin(), ExecutionCases.end(), executionCase) != ExecutionCases.end();
+ }
+
EExecutionCase GetExecutionCase(size_t index) const {
return GetValue(index, ExecutionCases, EExecutionCase::GenericScript);
}
@@ -106,6 +110,113 @@ struct TExecutionOptions {
};
}
+ void Validate(const NKqpRun::TRunnerOptions& runnerOptions) const {
+ if (!SchemeQuery && ScriptQueries.empty() && !runnerOptions.YdbSettings.MonitoringEnabled && !runnerOptions.YdbSettings.GrpcEnabled) {
+ ythrow yexception() << "Nothing to execute and is not running as daemon";
+ }
+
+ ValidateOptionsSizes();
+ ValidateSchemeQueryOptions(runnerOptions);
+ ValidateScriptExecutionOptions(runnerOptions);
+ ValidateAsyncOptions(runnerOptions.YdbSettings.AsyncQueriesSettings);
+ ValidateTraceOpt(runnerOptions.TraceOptType);
+ }
+
+private:
+ void ValidateOptionsSizes() const {
+ const auto checker = [numberQueries = ScriptQueries.size()](size_t checkSize, const TString& optionName) {
+ if (checkSize > numberQueries) {
+ ythrow yexception() << "Too many " << optionName << ". Specified " << checkSize << ", when number of queries is " << numberQueries;
+ }
+ };
+
+ checker(ExecutionCases.size(), "execution cases");
+ checker(ScriptQueryActions.size(), "script query actions");
+ checker(Databases.size(), "databases");
+ checker(TraceIds.size(), "trace ids");
+ checker(PoolIds.size(), "pool ids");
+ checker(UserSIDs.size(), "user SIDs");
+ }
+
+ void ValidateSchemeQueryOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
+ if (SchemeQuery) {
+ return;
+ }
+ if (runnerOptions.SchemeQueryAstOutput) {
+ ythrow yexception() << "Scheme query AST output can not be used without scheme query";
+ }
+ }
+
+ void ValidateScriptExecutionOptions(const NKqpRun::TRunnerOptions& runnerOptions) const {
+ // Script specific
+ if (HasExecutionCase(EExecutionCase::GenericScript)) {
+ return;
+ }
+ if (ForgetExecution) {
+ ythrow yexception() << "Forget execution can not be used without generic script queries";
+ }
+ if (runnerOptions.ScriptCancelAfter) {
+ ythrow yexception() << "Cancel after can not be used without generic script queries";
+ }
+
+ // Script/Query specific
+ if (HasExecutionCase(EExecutionCase::GenericQuery)) {
+ return;
+ }
+ if (ResultsRowsLimit) {
+ ythrow yexception() << "Result rows limit can not be used without script queries";
+ }
+ if (runnerOptions.InProgressStatisticsOutputFile) {
+ ythrow yexception() << "Script statistics can not be used without script queries";
+ }
+
+ // Common specific
+ if (HasExecutionCase(EExecutionCase::YqlScript)) {
+ return;
+ }
+ if (runnerOptions.ScriptQueryAstOutput) {
+ ythrow yexception() << "Script query AST output can not be used without script/yql queries";
+ }
+ if (runnerOptions.ScriptQueryPlanOutput) {
+ ythrow yexception() << "Script query plan output can not be used without script/yql queries";
+ }
+ }
+
+ void ValidateAsyncOptions(const NKqpRun::TAsyncQueriesSettings& asyncQueriesSettings) const {
+ if (asyncQueriesSettings.InFlightLimit && !HasExecutionCase(EExecutionCase::AsyncQuery)) {
+ ythrow yexception() << "In flight limit can not be used without async queries";
+ }
+
+ NColorizer::TColors colors = NColorizer::AutoColors(Cout);
+ if (LoopCount && asyncQueriesSettings.InFlightLimit && asyncQueriesSettings.InFlightLimit > ScriptQueries.size() * LoopCount) {
+ Cout << colors.Red() << "Warning: inflight limit is " << asyncQueriesSettings.InFlightLimit << ", that is larger than max possible number of queries " << ScriptQueries.size() * LoopCount << colors.Default() << Endl;
+ }
+ }
+
+ void ValidateTraceOpt(NKqpRun::TRunnerOptions::ETraceOptType traceOptType) const {
+ switch (traceOptType) {
+ case NKqpRun::TRunnerOptions::ETraceOptType::Scheme: {
+ if (!SchemeQuery) {
+ ythrow yexception() << "Trace opt type scheme cannot be used without scheme query";
+ }
+ break;
+ }
+ case NKqpRun::TRunnerOptions::ETraceOptType::Script: {
+ if (ScriptQueries.empty()) {
+ ythrow yexception() << "Trace opt type script cannot be used without script queries";
+ }
+ }
+ case NKqpRun::TRunnerOptions::ETraceOptType::All: {
+ if (!SchemeQuery && ScriptQueries.empty()) {
+ ythrow yexception() << "Trace opt type all cannot be used without any queries";
+ }
+ }
+ case NKqpRun::TRunnerOptions::ETraceOptType::Disabled: {
+ break;
+ }
+ }
+ }
+
private:
template <typename TValue>
static TValue GetValue(size_t index, const std::vector<TValue>& values, TValue defaultValue) {
@@ -278,7 +389,6 @@ class TMain : public TMainClassArgs {
TVector<TString> UdfsPaths;
TString UdfsDirectory;
bool ExcludeLinkedUdfs = false;
- ui64 ResultsRowsLimit = 1000;
bool EmulateYt = false;
static TString LoadFile(const TString& file) {
@@ -415,8 +525,8 @@ protected:
.StoreMappedResultT<TString>(&RunnerOptions.ResultOutput, &GetDefaultOutput);
options.AddLongOption('L', "result-rows-limit", "Rows limit for script execution results")
.RequiredArgument("uint")
- .DefaultValue(ResultsRowsLimit)
- .StoreResult(&ResultsRowsLimit);
+ .DefaultValue(0)
+ .StoreResult(&ExecutionOptions.ResultsRowsLimit);
TChoices<NKqpRun::TRunnerOptions::EResultOutputFormat> resultFormat({
{"rows", NKqpRun::TRunnerOptions::EResultOutputFormat::RowsJson},
{"full-json", NKqpRun::TRunnerOptions::EResultOutputFormat::FullJson},
@@ -441,7 +551,12 @@ protected:
.StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryPlanOutput, &GetDefaultOutput);
options.AddLongOption("script-statistics", "File with script inprogress statistics")
.RequiredArgument("file")
- .StoreResult(&RunnerOptions.InProgressStatisticsOutputFile);
+ .StoreMappedResultT<TString>(&RunnerOptions.InProgressStatisticsOutputFile, [](const TString& file) {
+ if (file == "-") {
+ ythrow yexception() << "Script in progress statistics cannot be printed to stdout, please specify file name";
+ }
+ return file;
+ });
TChoices<NYdb::NConsoleClient::EDataFormat> planFormat({
{"pretty", NYdb::NConsoleClient::EDataFormat::Pretty},
{"table", NYdb::NConsoleClient::EDataFormat::PrettyTable},
@@ -453,6 +568,15 @@ protected:
.Choices(planFormat.GetChoices())
.StoreMappedResultT<TString>(&RunnerOptions.PlanOutputFormat, planFormat);
+ options.AddLongOption("script-timeline-file", "File with script query timline in svg format")
+ .RequiredArgument("file")
+ .StoreMappedResultT<TString>(&RunnerOptions.ScriptQueryTimelineFile, [](const TString& file) {
+ if (file == "-") {
+ ythrow yexception() << "Script timline cannot be printed to stdout, please specify file name";
+ }
+ return file;
+ });
+
// Pipeline settings
TChoices<TExecutionOptions::EExecutionCase> executionCase({
@@ -463,7 +587,6 @@ protected:
});
options.AddLongOption('C', "execution-case", "Type of query for -p argument")
.RequiredArgument("query-type")
- .DefaultValue("script")
.Choices(executionCase.GetChoices())
.Handler1([this, executionCase](const NLastGetopt::TOptsParser* option) {
TString choice(option->CurValOrDef());
@@ -489,13 +612,16 @@ protected:
});
options.AddLongOption('A', "script-action", "Script query execute action")
.RequiredArgument("script-action")
- .DefaultValue("execute")
.Choices(scriptAction.GetChoices())
.Handler1([this, scriptAction](const NLastGetopt::TOptsParser* option) {
TString choice(option->CurValOrDef());
ExecutionOptions.ScriptQueryActions.emplace_back(scriptAction(choice));
});
+ options.AddLongOption("timeout", "Reauests timeout in milliseconds")
+ .RequiredArgument("uint")
+ .StoreMappedResultT<ui64>(&RunnerOptions.YdbSettings.RequestsTimeout, &TDuration::MilliSeconds<ui64>);
+
options.AddLongOption("cancel-after", "Cancel script execution operation after specified delay in milliseconds")
.RequiredArgument("uint")
.StoreMappedResultT<ui64>(&RunnerOptions.ScriptCancelAfter, &TDuration::MilliSeconds<ui64>);
@@ -510,7 +636,7 @@ protected:
.StoreResult(&ExecutionOptions.LoopCount);
options.AddLongOption("loop-delay", "Delay in milliseconds between loop steps")
.RequiredArgument("uint")
- .DefaultValue(1000)
+ .DefaultValue(0)
.StoreMappedResultT<ui64>(&ExecutionOptions.LoopDelay, &TDuration::MilliSeconds<ui64>);
options.AddLongOption('D', "database", "Database path for -p queries")
@@ -576,6 +702,21 @@ protected:
.RequiredArgument("path")
.InsertTo(&RunnerOptions.YdbSettings.ServerlessTenants);
+ options.AddLongOption("storage-size", "Domain storage size in gigabytes")
+ .RequiredArgument("uint")
+ .DefaultValue(32)
+ .StoreMappedResultT<ui32>(&RunnerOptions.YdbSettings.DiskSize, [](ui32 diskSize) {
+ return static_cast<ui64>(diskSize) << 30;
+ });
+
+ options.AddLongOption("real-pdisks", "Use real PDisks instead of in memory PDisks (also disable disk mock)")
+ .NoArgument()
+ .SetFlag(&RunnerOptions.YdbSettings.UseRealPDisks);
+
+ options.AddLongOption("disable-disk-mock", "Disable disk mock on single node cluster")
+ .NoArgument()
+ .SetFlag(&RunnerOptions.YdbSettings.DisableDiskMock);
+
TChoices<std::function<void()>> backtrace({
{"heavy", &NKikimr::EnableYDBBacktraceFormat},
{"light", []() { SetFormatBackTraceFn(FormatBackTrace); }}
@@ -591,13 +732,17 @@ protected:
}
int DoRun(NLastGetopt::TOptsParseResult&&) override {
- if (!ExecutionOptions.SchemeQuery && ExecutionOptions.ScriptQueries.empty() && !RunnerOptions.YdbSettings.MonitoringEnabled && !RunnerOptions.YdbSettings.GrpcEnabled) {
- ythrow yexception() << "Nothing to execute";
+ ExecutionOptions.Validate(RunnerOptions);
+
+ if (RunnerOptions.YdbSettings.DisableDiskMock && RunnerOptions.YdbSettings.NodeCount + RunnerOptions.YdbSettings.SharedTenants.size() + RunnerOptions.YdbSettings.DedicatedTenants.size() > 1) {
+ ythrow yexception() << "Disable disk mock cannot be used for multi node clusters";
}
RunnerOptions.YdbSettings.YqlToken = YqlToken;
RunnerOptions.YdbSettings.FunctionRegistry = CreateFunctionRegistry(UdfsDirectory, UdfsPaths, ExcludeLinkedUdfs).Get();
- RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ResultsRowsLimit);
+ if (ExecutionOptions.ResultsRowsLimit) {
+ RunnerOptions.YdbSettings.AppConfig.MutableQueryServiceConfig()->SetScriptResultRowsLimit(ExecutionOptions.ResultsRowsLimit);
+ }
if (EmulateYt) {
const auto& fileStorageConfig = RunnerOptions.YdbSettings.AppConfig.GetQueryServiceConfig().GetFileStorage();
diff --git a/ydb/tests/tools/kqprun/src/common.h b/ydb/tests/tools/kqprun/src/common.h
index 8e588c3f620..0bd74b48701 100644
--- a/ydb/tests/tools/kqprun/src/common.h
+++ b/ydb/tests/tools/kqprun/src/common.h
@@ -32,6 +32,11 @@ struct TYdbSetupSettings {
std::unordered_set<TString> SharedTenants;
std::unordered_set<TString> ServerlessTenants;
TDuration InitializationTimeout = TDuration::Seconds(10);
+ TDuration RequestsTimeout;
+
+ bool DisableDiskMock = false;
+ bool UseRealPDisks = false;
+ ui64 DiskSize = 32_GB;
bool MonitoringEnabled = false;
ui16 MonitoringPortOffset = 0;
@@ -69,6 +74,7 @@ struct TRunnerOptions {
IOutputStream* SchemeQueryAstOutput = nullptr;
IOutputStream* ScriptQueryAstOutput = nullptr;
IOutputStream* ScriptQueryPlanOutput = nullptr;
+ TString ScriptQueryTimelineFile;
TString InProgressStatisticsOutputFile;
EResultOutputFormat ResultOutputFormat = EResultOutputFormat::RowsJson;
diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp
index 1a500f7d115..f58eba9ee99 100644
--- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp
+++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp
@@ -9,6 +9,7 @@
#include <ydb/public/lib/json_value/ydb_json_value.h>
#include <ydb/public/lib/ydb_cli/common/format.h>
+#include <ydb/public/lib/ydb_cli/common/plan2svg.h>
namespace NKqpRun {
@@ -159,8 +160,12 @@ public:
TYdbSetup::StopTraceOpt();
+ if (!meta.Plan) {
+ meta.Plan = ExecutionMeta_.Plan;
+ }
+
PrintScriptAst(meta.Ast);
- PrintScriptProgress(ExecutionMeta_.Plan);
+ PrintScriptProgress(meta.Plan);
PrintScriptPlan(meta.Plan);
PrintScriptFinish(meta, queryTypeStr);
@@ -262,6 +267,7 @@ private:
}
PrintScriptAst(ExecutionMeta_.Ast);
+ PrintScriptProgress(ExecutionMeta_.Plan);
PrintScriptPlan(ExecutionMeta_.Plan);
PrintScriptFinish(ExecutionMeta_, "Script");
@@ -354,6 +360,15 @@ private:
outputStream.Finish();
}
+ if (Options_.ScriptQueryTimelineFile) {
+ TFileOutput outputStream(Options_.ScriptQueryTimelineFile);
+
+ TPlanVisualizer planVisualizer;
+ planVisualizer.LoadPlans(plan);
+ outputStream.Write(planVisualizer.PrintSvg());
+
+ outputStream.Finish();
+ }
}
TProgressCallback GetProgressCallback() {
diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
index 4270d16b16f..d481ef25428 100644
--- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp
+++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/kqp/common/kqp_script_executions.h>
#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
+#include <ydb/core/testlib/basics/storage.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/library/yql/providers/s3/actors/yql_s3_actors_factory_impl.h>
@@ -121,6 +122,18 @@ private:
serverSettings.SetFrFactory(functionRegistryFactory);
}
+ void SetStorageSettings(NKikimr::Tests::TServerSettings& serverSettings) const {
+ const NKikimr::NFake::TStorage storage = {
+ .UseDisk = Settings_.UseRealPDisks,
+ .SectorSize = NKikimr::TTestStorageFactory::SECTOR_SIZE,
+ .ChunkSize = Settings_.UseRealPDisks ? NKikimr::TTestStorageFactory::CHUNK_SIZE : NKikimr::TTestStorageFactory::MEM_CHUNK_SIZE,
+ .DiskSize = Settings_.DiskSize
+ };
+
+ serverSettings.SetEnableMockOnSingleNode(!Settings_.DisableDiskMock && !Settings_.UseRealPDisks);
+ serverSettings.SetCustomDiskParams(storage);
+ }
+
NKikimr::Tests::TServerSettings GetServerSettings(ui32 grpcPort) {
const ui32 msgBusPort = PortManager_.GetPort();
@@ -147,6 +160,7 @@ private:
SetLoggerSettings(serverSettings);
SetFunctionRegistry(serverSettings);
+ SetStorageSettings(serverSettings);
if (Settings_.MonitoringEnabled) {
serverSettings.InitKikimrRunConfig();
@@ -307,7 +321,7 @@ public:
NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptRequest(const TRequestOptions& script) const {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
- FillScriptRequest(script, event->Record);
+ FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event->Record);
return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvScriptRequest, NKikimr::NKqp::TEvKqp::TEvScriptResponse>(std::move(event), script.Database);
}
@@ -315,7 +329,7 @@ public:
TQueryResponse QueryRequest(const TRequestOptions& query, TProgressCallback progressCallback) const {
auto request = GetQueryRequest(query);
auto promise = NThreading::NewPromise<TQueryResponse>();
- GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), 0, GetRuntime()->GetAppData().UserPoolId);
+ GetRuntime()->Register(CreateRunScriptActorMock(std::move(request), promise, progressCallback), request.TargetNode - GetRuntime()->GetFirstNodeId(), GetRuntime()->GetAppData().UserPoolId);
return promise.GetFuture().GetValueSync();
}
@@ -425,15 +439,9 @@ private:
request->SetCollectStats(Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL);
request->SetDatabase(GetDatabasePath(query.Database));
request->SetPoolId(query.PoolId);
- }
-
- void FillScriptRequest(const TRequestOptions& script, NKikimrKqp::TEvQueryRequest& event) const {
- FillQueryRequest(script, NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT, event);
- auto request = event.MutableRequest();
- if (script.Action == NKikimrKqp::QUERY_ACTION_EXECUTE) {
- request->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
- request->MutableTxControl()->set_commit_tx(true);
+ if (Settings_.RequestsTimeout) {
+ request->SetTimeoutMs(Settings_.RequestsTimeout.MilliSeconds());
}
}