diff options
author | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-12 19:34:10 +0300 |
---|---|---|
committer | Vladislav Kuznetsov <va.kuznecov@physics.msu.ru> | 2022-04-12 19:34:10 +0300 |
commit | 5ec07a4ecf15d30c6b6d8bdca12ad9c137ca71e3 (patch) | |
tree | dedee2289f7258a0ed22b8d38ccaed2ae1bbcb18 | |
parent | 1edfc13e1889542c6d5cf4cfaa9975181bf27686 (diff) | |
download | ydb-5ec07a4ecf15d30c6b6d8bdca12ad9c137ca71e3.tar.gz |
SCAN requests in TSessionActor KIKIMR-14551
ref:a0d2893702c3be4fe723f014ba44c4121300e85a
-rw-r--r-- | ydb/core/kqp/host/kqp_host.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/kqp.h | 12 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compile_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 326 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_worker_actor.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_provider.h | 3 |
7 files changed, 282 insertions, 101 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 03539d916a..0c39b83092 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1372,6 +1372,20 @@ public: }); } + IAsyncQueryResultPtr PrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& /*settings*/) override { + return CheckedProcessQuery(*ExprCtx, + [this, &query, isSql] (TExprContext& ctx) mutable { + return PrepareScanQueryInternal(query, isSql, ctx); + }); + } + + TQueryResult SyncPrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) override { + return CheckedSyncProcessQuery( + [this, &query, isSql, settings] () mutable { + return PrepareScanQuery(query, isSql, settings); + }); + } + IAsyncQueryResultPtr ExecuteScanQuery(const TString& query, bool isSql, NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target, const IKikimrQueryExecutor::TExecuteSettings& settings) override { @@ -1754,10 +1768,7 @@ private: } IAsyncQueryResultPtr ExplainScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx) { - auto prepareResult = isSql - ? PrepareScanQueryInternal(query, ctx) - : PrepareScanQueryAstInternal(query, ctx); - return prepareResult; + return PrepareScanQueryInternal(query, isSql, ctx); } IAsyncQueryResultPtr PrepareDataQueryInternal(const TString& query, const TPrepareSettings& settings, @@ -1815,6 +1826,14 @@ private: SessionCtx, *ExecuteCtx); } + IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx, + EKikimrStatsMode statsMode = EKikimrStatsMode::None) + { + return isSql + ? PrepareScanQueryInternal(query, ctx, statsMode) + : PrepareScanQueryAstInternal(query, ctx); + } + IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, TExprContext& ctx, EKikimrStatsMode statsMode = EKikimrStatsMode::None) { SetupYqlTransformer(nullptr); @@ -2069,9 +2088,7 @@ private: NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target, const IKikimrQueryExecutor::TExecuteSettings& settings, TExprContext& ctx) { - auto prepareResult = isSql - ? PrepareScanQueryInternal(query, ctx, settings.StatsMode) - : PrepareScanQueryAstInternal(query, ctx); + auto prepareResult = PrepareScanQueryInternal(query, isSql, ctx, settings.StatsMode); if (!prepareResult) { return nullptr; diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 11282d0ebe..721f04c8c4 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -94,6 +94,9 @@ public: virtual TQueryResult SyncExecuteSchemeQuery(const TString& query, bool isSql) = 0; /* Scan queries */ + virtual IAsyncQueryResultPtr PrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) = 0; + virtual TQueryResult SyncPrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) = 0; + virtual IAsyncQueryResultPtr ExecuteScanQuery(const TString& query, bool isSql, NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target, const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0; diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index 09f794591d..b2a9a35ef0 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -133,12 +133,15 @@ struct TKqpQueryId { TString UserSid; TString Text; TKqpQuerySettings Settings; + bool Scan; public: - TKqpQueryId(const TString& cluster, const TString& database, const TString& text) + TKqpQueryId(const TString& cluster, const TString& database, const TString& text, bool scan) : Cluster(cluster) , Database(database) - , Text(text) {} + , Text(text) + , Scan(scan) + {} bool operator==(const TKqpQueryId& other) const { return @@ -146,7 +149,8 @@ public: Database == other.Database && UserSid == other.UserSid && Text == other.Text && - Settings == other.Settings; + Settings == other.Settings && + Scan == other.Scan; } bool operator!=(const TKqpQueryId& other) { @@ -159,7 +163,7 @@ public: bool operator>=(const TKqpQueryId&) = delete; size_t GetHash() const noexcept { - auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings); + auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, Scan); return THash<decltype(tuple)>()(tuple); } }; diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp index 25c6723cf6..ee471926fb 100644 --- a/ydb/core/kqp/kqp_compile_actor.cpp +++ b/ydb/core/kqp/kqp_compile_actor.cpp @@ -105,7 +105,9 @@ public: NCpuTime::TCpuTimer timer(CompileCpuTime); - AsyncCompileResult = KqpHost->PrepareDataQuery(Query.Text, prepareSettings); + AsyncCompileResult = Query.Scan + ? KqpHost->PrepareScanQuery(Query.Text, true, prepareSettings) + : KqpHost->PrepareDataQuery(Query.Text, prepareSettings); Continue(ctx); diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 6d5d8c3f26..bb9a72dfbd 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -8,6 +8,7 @@ #include <ydb/core/kqp/executer/kqp_executer.h> #include <ydb/core/kqp/host/kqp_host_impl.h> #include <ydb/core/kqp/prepare/kqp_prepare.h> +#include <ydb/core/kqp/prepare/kqp_query_plan.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> #include <ydb/core/kqp/rm/kqp_snapshot_manager.h> @@ -61,9 +62,9 @@ struct TKqpQueryState { TPreparedQueryConstPtr PreparedQuery; TKqpCompileResult::TConstPtr CompileResult; NKqpProto::TKqpStatsCompile CompileStats; - TIntrusivePtr<TKikimrQueryContext> QueryCtx; TIntrusivePtr<TKqpTransactionContext> TxCtx; - TVector<TVector<NKikimrMiniKQL::TResult>> TxResults; + std::shared_ptr<TKikimrQueryContext> QueryCtx = std::make_shared<TKikimrQueryContext>(); + TActorId RequestActorId; ui64 CurrentTx = 0; TString TraceId; @@ -75,7 +76,6 @@ struct TKqpQueryState { TString UserToken; - NLWTrace::TOrbit Orbit; TString TxId; // User tx @@ -254,30 +254,38 @@ public: YQL_ENSURE(txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId, "Can't commit transaction - " << " there is no TxId in Query's TxControl, queryRequest: " << queryRequest.DebugString()); - LOG_D("queryRequest TxControl: " << txControl.DebugString()); QueryState->Commit = txControl.commit_tx(); const auto& txId = txControl.tx_id(); auto txCtx = FindTransaction(txId); - YQL_ENSURE(txCtx, "Can't find txId: " << txId); + LOG_D("queryRequest TxControl: " << txControl.DebugString() << " txCtx: " << (void*)txCtx.Get()); + if (!txCtx) { + std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, + TStringBuilder() << "Transaction not found: " << QueryState->TxId)}; + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues)); + return; + } QueryState->TxCtx = std::move(txCtx); QueryState->TxId = txId; - ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true); + bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true); - Become(&TKqpSessionActor::ExecuteState); + if (!replied) { + Become(&TKqpSessionActor::ExecuteState); + } } static bool IsQueryTypeSupported(NKikimrKqp::EQueryType type) { switch (type) { case NKikimrKqp::QUERY_TYPE_SQL_DML: case NKikimrKqp::QUERY_TYPE_PREPARED_DML: + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: return true; // should not be compiled. TODO: forward to request executer // not supported yet case NKikimrKqp::QUERY_TYPE_SQL_DDL: - case NKikimrKqp::QUERY_TYPE_SQL_SCAN: case NKikimrKqp::QUERY_TYPE_AST_SCAN: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: @@ -313,6 +321,7 @@ public: << " prepared: " << queryRequest.HasPreparedQuery() << " tx_control: " << queryRequest.HasTxControl() << " action: " << action + << " type: " << (queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED) ); QueryState->Sender = ev->Sender; @@ -322,6 +331,7 @@ public: QueryState->UserToken = event.GetUserToken(); QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest); QueryState->ParametersSize = queryRequest.GetParameters().ByteSize(); + QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId()); switch (action) { case NKikimrKqp::QUERY_ACTION_EXECUTE: @@ -376,14 +386,15 @@ public: TMaybe<TString> uid; bool keepInCache = false; + bool scan = queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; switch (queryRequest.GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery()); + query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan); keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache(); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery()); + query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan); keepInCache = true; break; @@ -423,7 +434,6 @@ public: if (ReplyQueryCompileError(compileResult)) { Cleanup(); StartIdleTimer(); - Become(&TThis::ReadyState); } else { FinalCleanup(); } @@ -435,30 +445,32 @@ public: YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, "Invalid compiled version: " << compiledVersion); + QueryState->CompileResult = compileResult; + QueryState->CompileStats.Swap(&ev->Get()->Stats); + QueryState->PreparedQuery = compileResult->PreparedQuery; + QueryState->Request.SetQuery(QueryState->PreparedQuery->GetText()); + auto& queryRequest = QueryState->Request; if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) { if (ReplyPrepareResult(compileResult)) { Cleanup(); StartIdleTimer(); - Become(&TThis::ReadyState); } else { FinalCleanup(); } return; } - QueryState->CompileResult = compileResult; - QueryState->CompileStats.Swap(&ev->Get()->Stats); - QueryState->PreparedQuery = compileResult->PreparedQuery; - QueryState->Request.SetQuery(QueryState->PreparedQuery->GetText()); - if (!PrepareQueryContext()) { return; } Become(&TKqpSessionActor::ExecuteState); - if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit, + if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN) { + AcquirePersistentSnapshot(); + return; + } else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit, &QueryState->PreparedQuery->GetPhysicalQuery(), /*preparedKql*/ nullptr)) { AcquireMvccSnapshot(); return; @@ -467,6 +479,29 @@ public: ExecuteOrDefer(); } + void AcquirePersistentSnapshot() { + auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now(); + + auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout); + auto snapMgrActorId = TlsActivationContext->ExecutorThread.RegisterActor(snapMgr); + + THashSet<TString> tablesSet; + const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); + for (const auto& phyTx: phyQuery.GetTransactions()) { + for (const auto& stage: phyTx.GetStages()) { + for (const auto& tableOp: stage.GetTableOps()) { + tablesSet.insert(tableOp.GetTable().GetPath()); + } + } + } + TVector<TString> tables(tablesSet.begin(), tablesSet.end()); + + auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(tables); + Send(snapMgrActorId, ev.release()); + + QueryState->TxCtx->SnapshotHandle.ManagingActor = snapMgrActorId; + } + void AcquireMvccSnapshot() { LOG_D("AcquireMvccSnapshot"); auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now(); @@ -482,13 +517,11 @@ public: auto *response = ev->Get(); if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) { + // TODO (void)response->Issues; return; } - - QueryState->TxCtx->SnapshotHandle = IKqpGateway::TKqpSnapshotHandle{ - .Snapshot = response->Snapshot, - }; + QueryState->TxCtx->SnapshotHandle.Snapshot = response->Snapshot; // Can reply inside (in case of deferred-only transactions) and become ReadyState ExecuteOrDefer(); @@ -566,31 +599,40 @@ public: auto& queryRequest = QueryState->Request; - YQL_ENSURE(queryRequest.HasTxControl()); - auto& txControl = queryRequest.GetTxControl(); - - QueryState->Commit = txControl.commit_tx(); - switch (txControl.tx_selector_case()) { - case Ydb::Table::TransactionControl::kTxId: { - TString txId = txControl.tx_id(); - auto it = ExplicitTransactions.Find(txId); - YQL_ENSURE(it != ExplicitTransactions.End()); - QueryState->TxCtx = *it; - QueryState->TxId = txId; - QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; - break; + if (queryRequest.HasTxControl()) { + auto& txControl = queryRequest.GetTxControl(); + + QueryState->Commit = txControl.commit_tx(); + switch (txControl.tx_selector_case()) { + case Ydb::Table::TransactionControl::kTxId: { + TString txId = txControl.tx_id(); + auto it = ExplicitTransactions.Find(txId); + if (it == ExplicitTransactions.End()) { + std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND, + TStringBuilder() << "Transaction not found: " << QueryState->TxId)}; + ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues)); + return false; + } + QueryState->TxCtx = *it; + QueryState->TxId = txId; + QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; + break; + } + case Ydb::Table::TransactionControl::kBeginTx: { + BeginTx(txControl.begin_tx()); + break; + } + case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: + YQL_ENSURE(false); } - case Ydb::Table::TransactionControl::kBeginTx: { - BeginTx(txControl.begin_tx()); - break; - } - case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: - YQL_ENSURE(false); + } else { + QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false); + QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; } - QueryState->QueryCtx = MakeIntrusive<TKikimrQueryContext>(); - QueryState->QueryCtx->TimeProvider = TAppData::TimeProvider; - QueryState->QueryCtx->RandomProvider = TAppData::RandomProvider; + auto& queryCtx = QueryState->QueryCtx; + queryCtx->TimeProvider = TAppData::TimeProvider; + queryCtx->RandomProvider = TAppData::RandomProvider; const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery); @@ -603,18 +645,17 @@ public: auto action = queryRequest.GetAction(); auto queryType = queryRequest.GetType(); - if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) { - YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_SQL_DML); + if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && queryType == NKikimrKqp::QUERY_TYPE_SQL_DML) { queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML; action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED; } - YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED, + YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED, "Unexpected query action, expected: QUERY_ACTION_EXECUTE_PREPARED, got: " << action); - YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML, + YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML || queryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN, "Unexpected query type, expected: QUERY_TYPE_PREPARED_DML, got: " << queryType); - ParseParameters(std::move(*QueryState->Request.MutableParameters()), QueryState->QueryCtx->Parameters); + ParseParameters(std::move(*QueryState->Request.MutableParameters()), queryCtx->Parameters); return true; } @@ -659,8 +700,7 @@ public: request.CancelAfter = cancelAt - now; } - auto& queryRequest = queryState->Request; - EKikimrStatsMode statsMode = GetStatsModeInt(queryRequest, EKikimrStatsMode::Basic); + EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request, EKikimrStatsMode::Basic); request.StatsMode = GetStatsMode(statsMode); request.Snapshot = queryState->TxCtx->GetSnapshot(); @@ -677,13 +717,32 @@ public: return request; } + IKqpGateway::TExecPhysicalRequest PrepareScanRequest(TKqpQueryState *queryState) { + IKqpGateway::TExecPhysicalRequest request; + + request.Timeout = queryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now(); + if (!request.Timeout) { + // TODO: Just cancel request. + request.Timeout = TDuration::MilliSeconds(1); + } + request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef(); + EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request, EKikimrStatsMode::Basic); + request.StatsMode = GetStatsMode(statsMode); + request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages(); + request.LlvmEnabled = Config->GetEnableLlvm() != EOptionalFlag::Disabled; + request.Snapshot = QueryState->TxCtx->GetSnapshot(); + + return request; + } + NKikimrMiniKQL::TParams* ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) { - YQL_ENSURE(QueryState->QueryCtx); + auto& queryCtx = QueryState->QueryCtx; + YQL_ENSURE(queryCtx); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - auto parameter = QueryState->QueryCtx->Parameters.FindPtr(name); + auto parameter = queryCtx->Parameters.FindPtr(name); if (!parameter) { if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) { - auto& newParameter = QueryState->QueryCtx->Parameters[name]; + auto& newParameter = queryCtx->Parameters[name]; newParameter.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Optional); *newParameter.MutableType()->MutableOptional()->MutableItem() = type.GetOptional().GetItem(); @@ -707,13 +766,14 @@ public: ValidateParameter(paramDesc.GetName(), paramDesc.GetType()); } - TKqpParamsMap paramsMap; + TKqpParamsMap paramsMap(QueryState->QueryCtx); for (const auto& paramBinding : tx.GetParamBindings()) { try { + auto& qCtx = QueryState->QueryCtx; auto it = paramsMap.Values.emplace(paramBinding.GetName(), - *GetParamValue(/*ensure*/ true, *QueryState->QueryCtx, QueryState->TxResults, paramBinding)); + *GetParamValue(/*ensure*/ true, *qCtx, qCtx->TxResults, paramBinding)); YQL_ENSURE(it.second); } catch (const yexception& ex) { auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); @@ -724,7 +784,7 @@ public: return paramsMap; } - bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery& query) { + bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query) { auto& txCtx = *QueryState->TxCtx; if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) { @@ -739,7 +799,8 @@ public: return true; // Acquire locks in read write tx } - for (auto& tx : query.GetTransactions()) { + YQL_ENSURE(query); + for (auto& tx : query->GetTransactions()) { if (tx.GetHasEffects()) { return true; // Acquire locks in read write tx } @@ -756,8 +817,8 @@ public: return true; } - static TKqpParamsMap GetParamsRefMap(const TParamValueMap& map) { - TKqpParamsMap paramsMap; + TKqpParamsMap GetParamsRefMap(const TParamValueMap& map) { + TKqpParamsMap paramsMap(QueryState->QueryCtx); for (auto& [k, v] : map) { auto res = paramsMap.Values.emplace(k, NYql::NDq::TMkqlValueRef(v)); YQL_ENSURE(res.second); @@ -769,7 +830,8 @@ public: TParamValueMap CreateKqpValueMap(const NKqpProto::TKqpPhyTx& tx) { TParamValueMap paramsMap; for (const auto& paramBinding : tx.GetParamBindings()) { - auto paramValueRef = *GetParamValue(/*ensure*/ true, *QueryState->QueryCtx, QueryState->TxResults, + auto& qCtx = QueryState->QueryCtx; + auto paramValueRef = *GetParamValue(/*ensure*/ true, *qCtx, qCtx->TxResults, paramBinding); NKikimrMiniKQL::TParams param; @@ -822,9 +884,11 @@ public: } } - void ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit) { + bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit) { auto& txCtx = *QueryState->TxCtx; - auto request = PreparePhysicalRequest(QueryState.get()); + auto request = (query && query->GetType() == NKqpProto::TKqpPhyQuery::TYPE_SCAN) + ? PrepareScanRequest(QueryState.get()) + : PreparePhysicalRequest(QueryState.get()); LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit << " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size()); @@ -834,6 +898,7 @@ public: switch (tx->GetType()) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: + case NKqpProto::TKqpPhyTx::TYPE_SCAN: break; default: YQL_ENSURE(false, "Unexpected physical tx type in data query: " << (ui32)tx->GetType()); @@ -844,12 +909,12 @@ public: YQL_ENSURE(commit); if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks()) { ReplySuccess(); - return; + return true; } } if (commit) { - Y_VERIFY_DEBUG(txCtx.DeferredEffects.Empty() || !txCtx.Locks.Broken()); + YQL_ENSURE(txCtx.DeferredEffects.Empty() || !txCtx.Locks.Broken()); for (const auto& effect : txCtx.DeferredEffects) { YQL_ENSURE(!effect.Node); @@ -874,23 +939,24 @@ public: request.Locks.emplace_back(lock.GetValueRef(txCtx.Locks.LockType)); } } - } else if (ShouldAcquireLocks(*query)) { + } else if (ShouldAcquireLocks(query)) { request.AcquireLocksTxId = txCtx.Locks.GetLockTxId(); } LWTRACK(KqpPhyQueryProposeTx, QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(), request.Locks.size(), request.AcquireLocksTxId.Defined()); SendToExecuter(std::move(request)); + return false; } void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request) { auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, (QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(), RequestCounters); - auto executerId = TlsActivationContext->ExecutorThread.RegisterActor(executerActor); - LOG_D("Created new KQP executer: " << executerId); + ExecuterId = TlsActivationContext->ExecutorThread.RegisterActor(executerActor); + LOG_D("Created new KQP executer: " << ExecuterId); - auto ev = std::make_unique<TEvTxUserProxy::TEvProposeKqpTransaction>(executerId); + auto ev = std::make_unique<TEvTxUserProxy::TEvProposeKqpTransaction>(ExecuterId); Send(MakeTxProxyID(), ev.release()); } @@ -905,6 +971,7 @@ public: void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) { auto* response = ev->Get()->Record.MutableResponse(); LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString()); + ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx @@ -914,6 +981,7 @@ public: return; } + YQL_ENSURE(QueryState); LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize()); // save tx results auto& txResult = *response->MutableResult(); @@ -923,12 +991,7 @@ public: txResults[i].Swap(txResult.MutableResults(i)); } - QueryState->TxResults.emplace_back(std::move(txResults)); - - if (txResult.HasStats()) { - auto* exec = QueryState->Stats.AddExecutions(); - exec->Swap(txResult.MutableStats()); - } + QueryState->QueryCtx->TxResults.emplace_back(std::move(txResults)); // locks merge if (txResult.HasLocks()) { @@ -943,6 +1006,22 @@ public: } } + bool scan = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN; + if (scan) { + if (QueryState->RequestActorId && txResult.HasStats()) { + auto statsEv = MakeHolder<TEvKqpExecuter::TEvStreamProfile>(); + auto& record = statsEv->Record; + + record.MutableProfile()->Swap(txResult.MutableStats()); + Send(QueryState->RequestActorId, statsEv.Release()); + } + } else { + if (txResult.HasStats()) { + auto* exec = QueryState->Stats.AddExecutions(); + exec->Swap(txResult.MutableStats()); + } + } + if (QueryState->PreparedQuery && QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) { ExecuteOrDefer(); @@ -951,6 +1030,16 @@ public: } } + void HandleExecute(TEvKqpExecuter::TEvStreamData::TPtr& ev) { + YQL_ENSURE(QueryState && QueryState->RequestActorId); + LOG_D("TEvStreamData: " << ev->Get()->Record.DebugString()); + TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId)); + } + + void HandleExecute(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) { + TlsActivationContext->Send(ev->Forward(ExecuterId)); + } + void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) { auto& msg = ev->Get()->Record; @@ -1045,10 +1134,35 @@ public: } } + // TODO: Remove? Is it actual for NewEngine? + void FillQueryProfile(const NKqpProto::TKqpStatsQuery& stats, NKikimrKqp::TQueryResponse& response) { + auto& kqlProfile = *response.MutableProfile()->AddKqlProfiles(); + for (auto& execStats : stats.GetExecutions()) { + auto& txStats = *kqlProfile.AddMkqlProfiles()->MutableTxStats(); + + txStats.SetDurationUs(execStats.GetDurationUs()); + for (auto& tableStats : execStats.GetTables()) { + auto& txTableStats = *txStats.AddTableAccessStats(); + + txTableStats.MutableTableInfo()->SetName(tableStats.GetTablePath()); + if (tableStats.GetReadRows() > 0) { + txTableStats.MutableSelectRange()->SetRows(tableStats.GetReadRows()); + txTableStats.MutableSelectRange()->SetBytes(tableStats.GetReadBytes()); + } + if (tableStats.GetWriteRows() > 0) { + txTableStats.MutableUpdateRow()->SetCount(tableStats.GetWriteRows()); + txTableStats.MutableUpdateRow()->SetBytes(tableStats.GetWriteBytes()); + } + if (tableStats.GetEraseRows() > 0) { + txTableStats.MutableEraseRow()->SetCount(tableStats.GetEraseRows()); + } + } + } + } + void FillStats(NKikimrKqp::TEvQueryResponse* record) { auto *response = record->MutableResponse(); - auto* stats = response->MutableQueryStats(); - stats->Swap(&QueryState->Stats); + auto* stats = &QueryState->Stats; stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds()); //stats->SetWorkerCpuTimeUs(QueryState->CpuTime.MicroSeconds()); @@ -1076,22 +1190,28 @@ public: return resultsSize; } ); - } - } + } - void FillTxInfo(NKikimrKqp::TQueryResponse* response) { - Y_VERIFY(QueryState); - if (QueryState->TxId) { - response->MutableTxMeta()->set_id(QueryState->TxId); + bool reportStats = (GetStatsModeInt(queryRequest, EKikimrStatsMode::None) != EKikimrStatsMode::None); + if (reportStats) { + FillQueryProfile(*stats, *response); + response->SetQueryPlan(SerializeAnalyzePlan(*stats)); + + response->MutableQueryStats()->Swap(stats); } + } + void FillTxInfo(NKikimrKqp::TQueryResponse* response) { + YQL_ENSURE(QueryState); if (QueryState->Commit) { RemoveTransaction(QueryState->TxId); + QueryState->TxId = ""; } + response->MutableTxMeta()->set_id(QueryState->TxId); if (QueryState->TxCtx) { auto txInfo = QueryState->TxCtx->GetInfo(); - LOG_N("txInfo" + LOG_I("txInfo" << " Status: " << txInfo.Status << " Kind: " << txInfo.Kind << " TotalDuration: " << txInfo.TotalDuration.SecondsFloat()*1e3 @@ -1163,7 +1283,7 @@ public: auto txIndex = rb.GetTxResultBinding().GetTxIndex(); auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); - auto& txResults = QueryState->TxResults; + auto& txResults = QueryState->QueryCtx->TxResults; YQL_ENSURE(txIndex < txResults.size()); YQL_ENSURE(resultIndex < txResults[txIndex].size()); @@ -1187,7 +1307,30 @@ public: bool ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) { auto responseEv = std::make_unique<TEvKqp::TEvQueryResponse>(); FillCompileStatus(compileResult, responseEv->Record); - responseEv->Record.GetRef().SetConsumedRu(1); + + auto& queryRequest = QueryState->Request; + TString txId = ""; + if (queryRequest.HasTxControl()) { + auto& txControl = queryRequest.GetTxControl(); + + if (txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId) { + txId = txControl.tx_id(); + } + } + + LOG_W("ReplyQueryCompileError, status" << compileResult->Status << " remove tx with tx_id: " << txId); + auto txCtx = FindTransaction(txId); + if (txCtx) { + txCtx->Invalidate(); + AbortedTransactions.emplace_back(txCtx); + RemoveTransaction(txId); + } + txId = ""; + + auto* record = &responseEv->Record.GetRef(); + FillTxInfo(record->MutableResponse()); + record->SetConsumedRu(1); + return Reply(std::move(responseEv)); } @@ -1218,7 +1361,7 @@ public: auto& queryRequest = QueryState->Request; auto queryDuration = TInstant::Now() - QueryState->StartTime; - Y_VERIFY(Counters); + YQL_ENSURE(Counters); Counters->ReportQueryLatency(Settings.DbCounters, queryRequest.GetAction(), queryDuration); auto& record = responseEv->Record.GetRef(); @@ -1372,6 +1515,7 @@ public: } void EndCleanup(bool isFinal) { + LOG_D("EndCleanup, isFinal: " << isFinal); if (isFinal) { auto lifeSpan = TInstant::Now() - CreationTime; Counters->ReportSessionActorFinished(Settings.DbCounters, lifeSpan); @@ -1385,6 +1529,7 @@ public: PassAway(); } else { + AbortedTransactions.clear(); CleanupCtx.reset(); StartIdleTimer(); Become(&TKqpSessionActor::ReadyState); @@ -1412,6 +1557,8 @@ public: } } + LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx} + << " AbortedTransactions.size(): " << AbortedTransactions.size()); QueryState.reset(); if (CleanupCtx) { Become(&TKqpSessionActor::CleanupState); @@ -1504,6 +1651,10 @@ public: switch (ev->GetTypeRewrite()) { hFunc(TEvKqp::TEvQueryRequest, HandleExecute); hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute); + + hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute); + hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute); + hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop); hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute); hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute); @@ -1568,6 +1719,7 @@ private: TIntrusivePtr<TModuleResolverState> ModuleResolverState; TKqpSettings::TConstPtr KqpSettings; std::optional<TActorId> WorkerId; + TActorId ExecuterId; std::unique_ptr<TKqpQueryState> QueryState; std::unique_ptr<TKqpCleanupCtx> CleanupCtx; diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp index 9cde5d9782..0efd2bb996 100644 --- a/ydb/core/kqp/kqp_worker_actor.cpp +++ b/ydb/core/kqp/kqp_worker_actor.cpp @@ -852,12 +852,12 @@ private: bool keepInCache = false; switch (queryRequest.GetAction()) { case NKikimrKqp::QUERY_ACTION_EXECUTE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery()); + query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false); keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache(); break; case NKikimrKqp::QUERY_ACTION_PREPARE: - query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery()); + query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false); keepInCache = true; break; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 6903e6f193..357fed9460 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -108,6 +108,9 @@ struct TKikimrQueryContext : TThrRefBase { THashMap<ui64, TIntrusivePtr<IKikimrQueryExecutor::TAsyncQueryResult>> InProgress; TVector<ui64> ExecutionOrder; + // Used to store results of transactions in TKqpSessionActor + TVector<TVector<NKikimrMiniKQL::TResult>> TxResults; + NActors::TActorId ReplyTarget; TMaybe<NKikimrKqp::TRlPath> RlPath; |