aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-12 19:34:10 +0300
committerVladislav Kuznetsov <va.kuznecov@physics.msu.ru>2022-04-12 19:34:10 +0300
commit5ec07a4ecf15d30c6b6d8bdca12ad9c137ca71e3 (patch)
treededee2289f7258a0ed22b8d38ccaed2ae1bbcb18
parent1edfc13e1889542c6d5cf4cfaa9975181bf27686 (diff)
downloadydb-5ec07a4ecf15d30c6b6d8bdca12ad9c137ca71e3.tar.gz
SCAN requests in TSessionActor KIKIMR-14551
ref:a0d2893702c3be4fe723f014ba44c4121300e85a
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp31
-rw-r--r--ydb/core/kqp/host/kqp_host.h3
-rw-r--r--ydb/core/kqp/kqp.h12
-rw-r--r--ydb/core/kqp/kqp_compile_actor.cpp4
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp326
-rw-r--r--ydb/core/kqp/kqp_worker_actor.cpp4
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_provider.h3
7 files changed, 282 insertions, 101 deletions
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index 03539d916a..0c39b83092 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1372,6 +1372,20 @@ public:
});
}
+ IAsyncQueryResultPtr PrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& /*settings*/) override {
+ return CheckedProcessQuery(*ExprCtx,
+ [this, &query, isSql] (TExprContext& ctx) mutable {
+ return PrepareScanQueryInternal(query, isSql, ctx);
+ });
+ }
+
+ TQueryResult SyncPrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) override {
+ return CheckedSyncProcessQuery(
+ [this, &query, isSql, settings] () mutable {
+ return PrepareScanQuery(query, isSql, settings);
+ });
+ }
+
IAsyncQueryResultPtr ExecuteScanQuery(const TString& query, bool isSql, NKikimrMiniKQL::TParams&& parameters,
const NActors::TActorId& target, const IKikimrQueryExecutor::TExecuteSettings& settings) override
{
@@ -1754,10 +1768,7 @@ private:
}
IAsyncQueryResultPtr ExplainScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx) {
- auto prepareResult = isSql
- ? PrepareScanQueryInternal(query, ctx)
- : PrepareScanQueryAstInternal(query, ctx);
- return prepareResult;
+ return PrepareScanQueryInternal(query, isSql, ctx);
}
IAsyncQueryResultPtr PrepareDataQueryInternal(const TString& query, const TPrepareSettings& settings,
@@ -1815,6 +1826,14 @@ private:
SessionCtx, *ExecuteCtx);
}
+ IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, bool isSql, TExprContext& ctx,
+ EKikimrStatsMode statsMode = EKikimrStatsMode::None)
+ {
+ return isSql
+ ? PrepareScanQueryInternal(query, ctx, statsMode)
+ : PrepareScanQueryAstInternal(query, ctx);
+ }
+
IAsyncQueryResultPtr PrepareScanQueryInternal(const TString& query, TExprContext& ctx, EKikimrStatsMode statsMode = EKikimrStatsMode::None) {
SetupYqlTransformer(nullptr);
@@ -2069,9 +2088,7 @@ private:
NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target,
const IKikimrQueryExecutor::TExecuteSettings& settings, TExprContext& ctx)
{
- auto prepareResult = isSql
- ? PrepareScanQueryInternal(query, ctx, settings.StatsMode)
- : PrepareScanQueryAstInternal(query, ctx);
+ auto prepareResult = PrepareScanQueryInternal(query, isSql, ctx, settings.StatsMode);
if (!prepareResult) {
return nullptr;
diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h
index 11282d0ebe..721f04c8c4 100644
--- a/ydb/core/kqp/host/kqp_host.h
+++ b/ydb/core/kqp/host/kqp_host.h
@@ -94,6 +94,9 @@ public:
virtual TQueryResult SyncExecuteSchemeQuery(const TString& query, bool isSql) = 0;
/* Scan queries */
+ virtual IAsyncQueryResultPtr PrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) = 0;
+ virtual TQueryResult SyncPrepareScanQuery(const TString& query, bool isSql, const TPrepareSettings& settings) = 0;
+
virtual IAsyncQueryResultPtr ExecuteScanQuery(const TString& query, bool isSql,
NKikimrMiniKQL::TParams&& parameters, const NActors::TActorId& target,
const NYql::IKikimrQueryExecutor::TExecuteSettings& settings) = 0;
diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h
index 09f794591d..b2a9a35ef0 100644
--- a/ydb/core/kqp/kqp.h
+++ b/ydb/core/kqp/kqp.h
@@ -133,12 +133,15 @@ struct TKqpQueryId {
TString UserSid;
TString Text;
TKqpQuerySettings Settings;
+ bool Scan;
public:
- TKqpQueryId(const TString& cluster, const TString& database, const TString& text)
+ TKqpQueryId(const TString& cluster, const TString& database, const TString& text, bool scan)
: Cluster(cluster)
, Database(database)
- , Text(text) {}
+ , Text(text)
+ , Scan(scan)
+ {}
bool operator==(const TKqpQueryId& other) const {
return
@@ -146,7 +149,8 @@ public:
Database == other.Database &&
UserSid == other.UserSid &&
Text == other.Text &&
- Settings == other.Settings;
+ Settings == other.Settings &&
+ Scan == other.Scan;
}
bool operator!=(const TKqpQueryId& other) {
@@ -159,7 +163,7 @@ public:
bool operator>=(const TKqpQueryId&) = delete;
size_t GetHash() const noexcept {
- auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings);
+ auto tuple = std::make_tuple(Cluster, Database, UserSid, Text, Settings, Scan);
return THash<decltype(tuple)>()(tuple);
}
};
diff --git a/ydb/core/kqp/kqp_compile_actor.cpp b/ydb/core/kqp/kqp_compile_actor.cpp
index 25c6723cf6..ee471926fb 100644
--- a/ydb/core/kqp/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/kqp_compile_actor.cpp
@@ -105,7 +105,9 @@ public:
NCpuTime::TCpuTimer timer(CompileCpuTime);
- AsyncCompileResult = KqpHost->PrepareDataQuery(Query.Text, prepareSettings);
+ AsyncCompileResult = Query.Scan
+ ? KqpHost->PrepareScanQuery(Query.Text, true, prepareSettings)
+ : KqpHost->PrepareDataQuery(Query.Text, prepareSettings);
Continue(ctx);
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 6d5d8c3f26..bb9a72dfbd 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/kqp/executer/kqp_executer.h>
#include <ydb/core/kqp/host/kqp_host_impl.h>
#include <ydb/core/kqp/prepare/kqp_prepare.h>
+#include <ydb/core/kqp/prepare/kqp_query_plan.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
#include <ydb/core/kqp/rm/kqp_snapshot_manager.h>
@@ -61,9 +62,9 @@ struct TKqpQueryState {
TPreparedQueryConstPtr PreparedQuery;
TKqpCompileResult::TConstPtr CompileResult;
NKqpProto::TKqpStatsCompile CompileStats;
- TIntrusivePtr<TKikimrQueryContext> QueryCtx;
TIntrusivePtr<TKqpTransactionContext> TxCtx;
- TVector<TVector<NKikimrMiniKQL::TResult>> TxResults;
+ std::shared_ptr<TKikimrQueryContext> QueryCtx = std::make_shared<TKikimrQueryContext>();
+ TActorId RequestActorId;
ui64 CurrentTx = 0;
TString TraceId;
@@ -75,7 +76,6 @@ struct TKqpQueryState {
TString UserToken;
-
NLWTrace::TOrbit Orbit;
TString TxId; // User tx
@@ -254,30 +254,38 @@ public:
YQL_ENSURE(txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId, "Can't commit transaction - "
<< " there is no TxId in Query's TxControl, queryRequest: " << queryRequest.DebugString());
- LOG_D("queryRequest TxControl: " << txControl.DebugString());
QueryState->Commit = txControl.commit_tx();
const auto& txId = txControl.tx_id();
auto txCtx = FindTransaction(txId);
- YQL_ENSURE(txCtx, "Can't find txId: " << txId);
+ LOG_D("queryRequest TxControl: " << txControl.DebugString() << " txCtx: " << (void*)txCtx.Get());
+ if (!txCtx) {
+ std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
+ TStringBuilder() << "Transaction not found: " << QueryState->TxId)};
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+ ReplyQueryError(requestInfo, Ydb::StatusIds::NOT_FOUND, "", MessageFromIssues(issues));
+ return;
+ }
QueryState->TxCtx = std::move(txCtx);
QueryState->TxId = txId;
- ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true);
+ bool replied = ExecutePhyTx(/*query*/ nullptr, /*tx*/ nullptr, /*commit*/ true);
- Become(&TKqpSessionActor::ExecuteState);
+ if (!replied) {
+ Become(&TKqpSessionActor::ExecuteState);
+ }
}
static bool IsQueryTypeSupported(NKikimrKqp::EQueryType type) {
switch (type) {
case NKikimrKqp::QUERY_TYPE_SQL_DML:
case NKikimrKqp::QUERY_TYPE_PREPARED_DML:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
return true;
// should not be compiled. TODO: forward to request executer
// not supported yet
case NKikimrKqp::QUERY_TYPE_SQL_DDL:
- case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
case NKikimrKqp::QUERY_TYPE_AST_SCAN:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT:
case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING:
@@ -313,6 +321,7 @@ public:
<< " prepared: " << queryRequest.HasPreparedQuery()
<< " tx_control: " << queryRequest.HasTxControl()
<< " action: " << action
+ << " type: " << (queryRequest.HasType() ? queryRequest.GetType() : NKikimrKqp::QUERY_TYPE_UNDEFINED)
);
QueryState->Sender = ev->Sender;
@@ -322,6 +331,7 @@ public:
QueryState->UserToken = event.GetUserToken();
QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest);
QueryState->ParametersSize = queryRequest.GetParameters().ByteSize();
+ QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId());
switch (action) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
@@ -376,14 +386,15 @@ public:
TMaybe<TString> uid;
bool keepInCache = false;
+ bool scan = queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
switch (queryRequest.GetAction()) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery());
+ query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan);
keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache();
break;
case NKikimrKqp::QUERY_ACTION_PREPARE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery());
+ query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), scan);
keepInCache = true;
break;
@@ -423,7 +434,6 @@ public:
if (ReplyQueryCompileError(compileResult)) {
Cleanup();
StartIdleTimer();
- Become(&TThis::ReadyState);
} else {
FinalCleanup();
}
@@ -435,30 +445,32 @@ public:
YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1,
"Invalid compiled version: " << compiledVersion);
+ QueryState->CompileResult = compileResult;
+ QueryState->CompileStats.Swap(&ev->Get()->Stats);
+ QueryState->PreparedQuery = compileResult->PreparedQuery;
+ QueryState->Request.SetQuery(QueryState->PreparedQuery->GetText());
+
auto& queryRequest = QueryState->Request;
if (queryRequest.GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE) {
if (ReplyPrepareResult(compileResult)) {
Cleanup();
StartIdleTimer();
- Become(&TThis::ReadyState);
} else {
FinalCleanup();
}
return;
}
- QueryState->CompileResult = compileResult;
- QueryState->CompileStats.Swap(&ev->Get()->Stats);
- QueryState->PreparedQuery = compileResult->PreparedQuery;
- QueryState->Request.SetQuery(QueryState->PreparedQuery->GetText());
-
if (!PrepareQueryContext()) {
return;
}
Become(&TKqpSessionActor::ExecuteState);
- if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit,
+ if (queryRequest.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN) {
+ AcquirePersistentSnapshot();
+ return;
+ } else if (NeedSnapshot(*QueryState->TxCtx, *Config, /*rollback*/ false, QueryState->Commit,
&QueryState->PreparedQuery->GetPhysicalQuery(), /*preparedKql*/ nullptr)) {
AcquireMvccSnapshot();
return;
@@ -467,6 +479,29 @@ public:
ExecuteOrDefer();
}
+ void AcquirePersistentSnapshot() {
+ auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now();
+
+ auto* snapMgr = CreateKqpSnapshotManager(Settings.Database, timeout);
+ auto snapMgrActorId = TlsActivationContext->ExecutorThread.RegisterActor(snapMgr);
+
+ THashSet<TString> tablesSet;
+ const auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
+ for (const auto& phyTx: phyQuery.GetTransactions()) {
+ for (const auto& stage: phyTx.GetStages()) {
+ for (const auto& tableOp: stage.GetTableOps()) {
+ tablesSet.insert(tableOp.GetTable().GetPath());
+ }
+ }
+ }
+ TVector<TString> tables(tablesSet.begin(), tablesSet.end());
+
+ auto ev = std::make_unique<TEvKqpSnapshot::TEvCreateSnapshotRequest>(tables);
+ Send(snapMgrActorId, ev.release());
+
+ QueryState->TxCtx->SnapshotHandle.ManagingActor = snapMgrActorId;
+ }
+
void AcquireMvccSnapshot() {
LOG_D("AcquireMvccSnapshot");
auto timeout = QueryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now();
@@ -482,13 +517,11 @@ public:
auto *response = ev->Get();
if (response->Status != NKikimrIssues::TStatusIds::SUCCESS) {
+ // TODO
(void)response->Issues;
return;
}
-
- QueryState->TxCtx->SnapshotHandle = IKqpGateway::TKqpSnapshotHandle{
- .Snapshot = response->Snapshot,
- };
+ QueryState->TxCtx->SnapshotHandle.Snapshot = response->Snapshot;
// Can reply inside (in case of deferred-only transactions) and become ReadyState
ExecuteOrDefer();
@@ -566,31 +599,40 @@ public:
auto& queryRequest = QueryState->Request;
- YQL_ENSURE(queryRequest.HasTxControl());
- auto& txControl = queryRequest.GetTxControl();
-
- QueryState->Commit = txControl.commit_tx();
- switch (txControl.tx_selector_case()) {
- case Ydb::Table::TransactionControl::kTxId: {
- TString txId = txControl.tx_id();
- auto it = ExplicitTransactions.Find(txId);
- YQL_ENSURE(it != ExplicitTransactions.End());
- QueryState->TxCtx = *it;
- QueryState->TxId = txId;
- QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
- break;
+ if (queryRequest.HasTxControl()) {
+ auto& txControl = queryRequest.GetTxControl();
+
+ QueryState->Commit = txControl.commit_tx();
+ switch (txControl.tx_selector_case()) {
+ case Ydb::Table::TransactionControl::kTxId: {
+ TString txId = txControl.tx_id();
+ auto it = ExplicitTransactions.Find(txId);
+ if (it == ExplicitTransactions.End()) {
+ std::vector<TIssue> issues{YqlIssue(TPosition(), TIssuesIds::KIKIMR_TRANSACTION_NOT_FOUND,
+ TStringBuilder() << "Transaction not found: " << QueryState->TxId)};
+ ReplyQueryError(requestInfo, Ydb::StatusIds::BAD_REQUEST, "", MessageFromIssues(issues));
+ return false;
+ }
+ QueryState->TxCtx = *it;
+ QueryState->TxId = txId;
+ QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
+ break;
+ }
+ case Ydb::Table::TransactionControl::kBeginTx: {
+ BeginTx(txControl.begin_tx());
+ break;
+ }
+ case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
+ YQL_ENSURE(false);
}
- case Ydb::Table::TransactionControl::kBeginTx: {
- BeginTx(txControl.begin_tx());
- break;
- }
- case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET:
- YQL_ENSURE(false);
+ } else {
+ QueryState->TxCtx = MakeIntrusive<TKqpTransactionContext>(false);
+ QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
}
- QueryState->QueryCtx = MakeIntrusive<TKikimrQueryContext>();
- QueryState->QueryCtx->TimeProvider = TAppData::TimeProvider;
- QueryState->QueryCtx->RandomProvider = TAppData::RandomProvider;
+ auto& queryCtx = QueryState->QueryCtx;
+ queryCtx->TimeProvider = TAppData::TimeProvider;
+ queryCtx->RandomProvider = TAppData::RandomProvider;
const NKqpProto::TKqpPhyQuery& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
auto [success, issues] = ApplyTableOperations(QueryState->TxCtx.Get(), phyQuery);
@@ -603,18 +645,17 @@ public:
auto action = queryRequest.GetAction();
auto queryType = queryRequest.GetType();
- if (action == NKikimrKqp::QUERY_ACTION_EXECUTE) {
- YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_SQL_DML);
+ if (action == NKikimrKqp::QUERY_ACTION_EXECUTE && queryType == NKikimrKqp::QUERY_TYPE_SQL_DML) {
queryType = NKikimrKqp::QUERY_TYPE_PREPARED_DML;
action = NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED;
}
- YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED,
+ YQL_ENSURE(action == NKikimrKqp::QUERY_ACTION_EXECUTE || action == NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED,
"Unexpected query action, expected: QUERY_ACTION_EXECUTE_PREPARED, got: " << action);
- YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML,
+ YQL_ENSURE(queryType == NKikimrKqp::QUERY_TYPE_PREPARED_DML || queryType == NKikimrKqp::QUERY_TYPE_SQL_SCAN,
"Unexpected query type, expected: QUERY_TYPE_PREPARED_DML, got: " << queryType);
- ParseParameters(std::move(*QueryState->Request.MutableParameters()), QueryState->QueryCtx->Parameters);
+ ParseParameters(std::move(*QueryState->Request.MutableParameters()), queryCtx->Parameters);
return true;
}
@@ -659,8 +700,7 @@ public:
request.CancelAfter = cancelAt - now;
}
- auto& queryRequest = queryState->Request;
- EKikimrStatsMode statsMode = GetStatsModeInt(queryRequest, EKikimrStatsMode::Basic);
+ EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request, EKikimrStatsMode::Basic);
request.StatsMode = GetStatsMode(statsMode);
request.Snapshot = queryState->TxCtx->GetSnapshot();
@@ -677,13 +717,32 @@ public:
return request;
}
+ IKqpGateway::TExecPhysicalRequest PrepareScanRequest(TKqpQueryState *queryState) {
+ IKqpGateway::TExecPhysicalRequest request;
+
+ request.Timeout = queryState->QueryDeadlines.TimeoutAt - TAppData::TimeProvider->Now();
+ if (!request.Timeout) {
+ // TODO: Just cancel request.
+ request.Timeout = TDuration::MilliSeconds(1);
+ }
+ request.MaxComputeActors = Config->_KqpMaxComputeActors.Get().GetRef();
+ EKikimrStatsMode statsMode = GetStatsModeInt(queryState->Request, EKikimrStatsMode::Basic);
+ request.StatsMode = GetStatsMode(statsMode);
+ request.DisableLlvmForUdfStages = Config->DisableLlvmForUdfStages();
+ request.LlvmEnabled = Config->GetEnableLlvm() != EOptionalFlag::Disabled;
+ request.Snapshot = QueryState->TxCtx->GetSnapshot();
+
+ return request;
+ }
+
NKikimrMiniKQL::TParams* ValidateParameter(const TString& name, const NKikimrMiniKQL::TType& type) {
- YQL_ENSURE(QueryState->QueryCtx);
+ auto& queryCtx = QueryState->QueryCtx;
+ YQL_ENSURE(queryCtx);
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
- auto parameter = QueryState->QueryCtx->Parameters.FindPtr(name);
+ auto parameter = queryCtx->Parameters.FindPtr(name);
if (!parameter) {
if (type.GetKind() == NKikimrMiniKQL::ETypeKind::Optional) {
- auto& newParameter = QueryState->QueryCtx->Parameters[name];
+ auto& newParameter = queryCtx->Parameters[name];
newParameter.MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Optional);
*newParameter.MutableType()->MutableOptional()->MutableItem() = type.GetOptional().GetItem();
@@ -707,13 +766,14 @@ public:
ValidateParameter(paramDesc.GetName(), paramDesc.GetType());
}
- TKqpParamsMap paramsMap;
+ TKqpParamsMap paramsMap(QueryState->QueryCtx);
for (const auto& paramBinding : tx.GetParamBindings()) {
try {
+ auto& qCtx = QueryState->QueryCtx;
auto it = paramsMap.Values.emplace(paramBinding.GetName(),
- *GetParamValue(/*ensure*/ true, *QueryState->QueryCtx, QueryState->TxResults, paramBinding));
+ *GetParamValue(/*ensure*/ true, *qCtx, qCtx->TxResults, paramBinding));
YQL_ENSURE(it.second);
} catch (const yexception& ex) {
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
@@ -724,7 +784,7 @@ public:
return paramsMap;
}
- bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery& query) {
+ bool ShouldAcquireLocks(const NKqpProto::TKqpPhyQuery* query) {
auto& txCtx = *QueryState->TxCtx;
if (*txCtx.EffectiveIsolationLevel != NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE) {
@@ -739,7 +799,8 @@ public:
return true; // Acquire locks in read write tx
}
- for (auto& tx : query.GetTransactions()) {
+ YQL_ENSURE(query);
+ for (auto& tx : query->GetTransactions()) {
if (tx.GetHasEffects()) {
return true; // Acquire locks in read write tx
}
@@ -756,8 +817,8 @@ public:
return true;
}
- static TKqpParamsMap GetParamsRefMap(const TParamValueMap& map) {
- TKqpParamsMap paramsMap;
+ TKqpParamsMap GetParamsRefMap(const TParamValueMap& map) {
+ TKqpParamsMap paramsMap(QueryState->QueryCtx);
for (auto& [k, v] : map) {
auto res = paramsMap.Values.emplace(k, NYql::NDq::TMkqlValueRef(v));
YQL_ENSURE(res.second);
@@ -769,7 +830,8 @@ public:
TParamValueMap CreateKqpValueMap(const NKqpProto::TKqpPhyTx& tx) {
TParamValueMap paramsMap;
for (const auto& paramBinding : tx.GetParamBindings()) {
- auto paramValueRef = *GetParamValue(/*ensure*/ true, *QueryState->QueryCtx, QueryState->TxResults,
+ auto& qCtx = QueryState->QueryCtx;
+ auto paramValueRef = *GetParamValue(/*ensure*/ true, *qCtx, qCtx->TxResults,
paramBinding);
NKikimrMiniKQL::TParams param;
@@ -822,9 +884,11 @@ public:
}
}
- void ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit) {
+ bool ExecutePhyTx(const NKqpProto::TKqpPhyQuery* query, std::shared_ptr<const NKqpProto::TKqpPhyTx> tx, bool commit) {
auto& txCtx = *QueryState->TxCtx;
- auto request = PreparePhysicalRequest(QueryState.get());
+ auto request = (query && query->GetType() == NKqpProto::TKqpPhyQuery::TYPE_SCAN)
+ ? PrepareScanRequest(QueryState.get())
+ : PreparePhysicalRequest(QueryState.get());
LOG_D("ExecutePhyTx, tx: " << (void*)tx.get() << " commit: " << commit
<< " txCtx.DeferredEffects.size(): " << txCtx.DeferredEffects.Size());
@@ -834,6 +898,7 @@ public:
switch (tx->GetType()) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
+ case NKqpProto::TKqpPhyTx::TYPE_SCAN:
break;
default:
YQL_ENSURE(false, "Unexpected physical tx type in data query: " << (ui32)tx->GetType());
@@ -844,12 +909,12 @@ public:
YQL_ENSURE(commit);
if (txCtx.DeferredEffects.Empty() && !txCtx.Locks.HasLocks()) {
ReplySuccess();
- return;
+ return true;
}
}
if (commit) {
- Y_VERIFY_DEBUG(txCtx.DeferredEffects.Empty() || !txCtx.Locks.Broken());
+ YQL_ENSURE(txCtx.DeferredEffects.Empty() || !txCtx.Locks.Broken());
for (const auto& effect : txCtx.DeferredEffects) {
YQL_ENSURE(!effect.Node);
@@ -874,23 +939,24 @@ public:
request.Locks.emplace_back(lock.GetValueRef(txCtx.Locks.LockType));
}
}
- } else if (ShouldAcquireLocks(*query)) {
+ } else if (ShouldAcquireLocks(query)) {
request.AcquireLocksTxId = txCtx.Locks.GetLockTxId();
}
LWTRACK(KqpPhyQueryProposeTx, QueryState->Orbit, QueryState->CurrentTx, request.Transactions.size(),
request.Locks.size(), request.AcquireLocksTxId.Defined());
SendToExecuter(std::move(request));
+ return false;
}
void SendToExecuter(IKqpGateway::TExecPhysicalRequest&& request) {
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
(QueryState && QueryState->UserToken) ? TMaybe<TString>(QueryState->UserToken) : Nothing(),
RequestCounters);
- auto executerId = TlsActivationContext->ExecutorThread.RegisterActor(executerActor);
- LOG_D("Created new KQP executer: " << executerId);
+ ExecuterId = TlsActivationContext->ExecutorThread.RegisterActor(executerActor);
+ LOG_D("Created new KQP executer: " << ExecuterId);
- auto ev = std::make_unique<TEvTxUserProxy::TEvProposeKqpTransaction>(executerId);
+ auto ev = std::make_unique<TEvTxUserProxy::TEvProposeKqpTransaction>(ExecuterId);
Send(MakeTxProxyID(), ev.release());
}
@@ -905,6 +971,7 @@ public:
void HandleExecute(TEvKqpExecuter::TEvTxResponse::TPtr& ev) {
auto* response = ev->Get()->Record.MutableResponse();
LOG_D("TEvTxResponse, CurrentTx: " << QueryState->CurrentTx << " response: " << response->DebugString());
+ ExecuterId = TActorId{};
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
LOG_I("TEvTxResponse has non-success status, CurrentTx: " << QueryState->CurrentTx
@@ -914,6 +981,7 @@ public:
return;
}
+ YQL_ENSURE(QueryState);
LWTRACK(KqpPhyQueryTxResponse, QueryState->Orbit, QueryState->CurrentTx, response->GetResult().ResultsSize());
// save tx results
auto& txResult = *response->MutableResult();
@@ -923,12 +991,7 @@ public:
txResults[i].Swap(txResult.MutableResults(i));
}
- QueryState->TxResults.emplace_back(std::move(txResults));
-
- if (txResult.HasStats()) {
- auto* exec = QueryState->Stats.AddExecutions();
- exec->Swap(txResult.MutableStats());
- }
+ QueryState->QueryCtx->TxResults.emplace_back(std::move(txResults));
// locks merge
if (txResult.HasLocks()) {
@@ -943,6 +1006,22 @@ public:
}
}
+ bool scan = QueryState->Request.GetType() == NKikimrKqp::QUERY_TYPE_SQL_SCAN;
+ if (scan) {
+ if (QueryState->RequestActorId && txResult.HasStats()) {
+ auto statsEv = MakeHolder<TEvKqpExecuter::TEvStreamProfile>();
+ auto& record = statsEv->Record;
+
+ record.MutableProfile()->Swap(txResult.MutableStats());
+ Send(QueryState->RequestActorId, statsEv.Release());
+ }
+ } else {
+ if (txResult.HasStats()) {
+ auto* exec = QueryState->Stats.AddExecutions();
+ exec->Swap(txResult.MutableStats());
+ }
+ }
+
if (QueryState->PreparedQuery &&
QueryState->CurrentTx < QueryState->PreparedQuery->GetPhysicalQuery().TransactionsSize()) {
ExecuteOrDefer();
@@ -951,6 +1030,16 @@ public:
}
}
+ void HandleExecute(TEvKqpExecuter::TEvStreamData::TPtr& ev) {
+ YQL_ENSURE(QueryState && QueryState->RequestActorId);
+ LOG_D("TEvStreamData: " << ev->Get()->Record.DebugString());
+ TlsActivationContext->Send(ev->Forward(QueryState->RequestActorId));
+ }
+
+ void HandleExecute(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) {
+ TlsActivationContext->Send(ev->Forward(ExecuterId));
+ }
+
void HandleExecute(TEvKqp::TEvAbortExecution::TPtr& ev) {
auto& msg = ev->Get()->Record;
@@ -1045,10 +1134,35 @@ public:
}
}
+ // TODO: Remove? Is it actual for NewEngine?
+ void FillQueryProfile(const NKqpProto::TKqpStatsQuery& stats, NKikimrKqp::TQueryResponse& response) {
+ auto& kqlProfile = *response.MutableProfile()->AddKqlProfiles();
+ for (auto& execStats : stats.GetExecutions()) {
+ auto& txStats = *kqlProfile.AddMkqlProfiles()->MutableTxStats();
+
+ txStats.SetDurationUs(execStats.GetDurationUs());
+ for (auto& tableStats : execStats.GetTables()) {
+ auto& txTableStats = *txStats.AddTableAccessStats();
+
+ txTableStats.MutableTableInfo()->SetName(tableStats.GetTablePath());
+ if (tableStats.GetReadRows() > 0) {
+ txTableStats.MutableSelectRange()->SetRows(tableStats.GetReadRows());
+ txTableStats.MutableSelectRange()->SetBytes(tableStats.GetReadBytes());
+ }
+ if (tableStats.GetWriteRows() > 0) {
+ txTableStats.MutableUpdateRow()->SetCount(tableStats.GetWriteRows());
+ txTableStats.MutableUpdateRow()->SetBytes(tableStats.GetWriteBytes());
+ }
+ if (tableStats.GetEraseRows() > 0) {
+ txTableStats.MutableEraseRow()->SetCount(tableStats.GetEraseRows());
+ }
+ }
+ }
+ }
+
void FillStats(NKikimrKqp::TEvQueryResponse* record) {
auto *response = record->MutableResponse();
- auto* stats = response->MutableQueryStats();
- stats->Swap(&QueryState->Stats);
+ auto* stats = &QueryState->Stats;
stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds());
//stats->SetWorkerCpuTimeUs(QueryState->CpuTime.MicroSeconds());
@@ -1076,22 +1190,28 @@ public:
return resultsSize;
}
);
- }
- }
+ }
- void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
- Y_VERIFY(QueryState);
- if (QueryState->TxId) {
- response->MutableTxMeta()->set_id(QueryState->TxId);
+ bool reportStats = (GetStatsModeInt(queryRequest, EKikimrStatsMode::None) != EKikimrStatsMode::None);
+ if (reportStats) {
+ FillQueryProfile(*stats, *response);
+ response->SetQueryPlan(SerializeAnalyzePlan(*stats));
+
+ response->MutableQueryStats()->Swap(stats);
}
+ }
+ void FillTxInfo(NKikimrKqp::TQueryResponse* response) {
+ YQL_ENSURE(QueryState);
if (QueryState->Commit) {
RemoveTransaction(QueryState->TxId);
+ QueryState->TxId = "";
}
+ response->MutableTxMeta()->set_id(QueryState->TxId);
if (QueryState->TxCtx) {
auto txInfo = QueryState->TxCtx->GetInfo();
- LOG_N("txInfo"
+ LOG_I("txInfo"
<< " Status: " << txInfo.Status
<< " Kind: " << txInfo.Kind
<< " TotalDuration: " << txInfo.TotalDuration.SecondsFloat()*1e3
@@ -1163,7 +1283,7 @@ public:
auto txIndex = rb.GetTxResultBinding().GetTxIndex();
auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
- auto& txResults = QueryState->TxResults;
+ auto& txResults = QueryState->QueryCtx->TxResults;
YQL_ENSURE(txIndex < txResults.size());
YQL_ENSURE(resultIndex < txResults[txIndex].size());
@@ -1187,7 +1307,30 @@ public:
bool ReplyQueryCompileError(const TKqpCompileResult::TConstPtr& compileResult) {
auto responseEv = std::make_unique<TEvKqp::TEvQueryResponse>();
FillCompileStatus(compileResult, responseEv->Record);
- responseEv->Record.GetRef().SetConsumedRu(1);
+
+ auto& queryRequest = QueryState->Request;
+ TString txId = "";
+ if (queryRequest.HasTxControl()) {
+ auto& txControl = queryRequest.GetTxControl();
+
+ if (txControl.tx_selector_case() == Ydb::Table::TransactionControl::kTxId) {
+ txId = txControl.tx_id();
+ }
+ }
+
+ LOG_W("ReplyQueryCompileError, status" << compileResult->Status << " remove tx with tx_id: " << txId);
+ auto txCtx = FindTransaction(txId);
+ if (txCtx) {
+ txCtx->Invalidate();
+ AbortedTransactions.emplace_back(txCtx);
+ RemoveTransaction(txId);
+ }
+ txId = "";
+
+ auto* record = &responseEv->Record.GetRef();
+ FillTxInfo(record->MutableResponse());
+ record->SetConsumedRu(1);
+
return Reply(std::move(responseEv));
}
@@ -1218,7 +1361,7 @@ public:
auto& queryRequest = QueryState->Request;
auto queryDuration = TInstant::Now() - QueryState->StartTime;
- Y_VERIFY(Counters);
+ YQL_ENSURE(Counters);
Counters->ReportQueryLatency(Settings.DbCounters, queryRequest.GetAction(), queryDuration);
auto& record = responseEv->Record.GetRef();
@@ -1372,6 +1515,7 @@ public:
}
void EndCleanup(bool isFinal) {
+ LOG_D("EndCleanup, isFinal: " << isFinal);
if (isFinal) {
auto lifeSpan = TInstant::Now() - CreationTime;
Counters->ReportSessionActorFinished(Settings.DbCounters, lifeSpan);
@@ -1385,6 +1529,7 @@ public:
PassAway();
} else {
+ AbortedTransactions.clear();
CleanupCtx.reset();
StartIdleTimer();
Become(&TKqpSessionActor::ReadyState);
@@ -1412,6 +1557,8 @@ public:
}
}
+ LOG_I("Cleanup start, isFinal: " << isFinal << " CleanupCtx: " << bool{CleanupCtx}
+ << " AbortedTransactions.size(): " << AbortedTransactions.size());
QueryState.reset();
if (CleanupCtx) {
Become(&TKqpSessionActor::CleanupState);
@@ -1504,6 +1651,10 @@ public:
switch (ev->GetTypeRewrite()) {
hFunc(TEvKqp::TEvQueryRequest, HandleExecute);
hFunc(TEvKqpExecuter::TEvTxResponse, HandleExecute);
+
+ hFunc(TEvKqpExecuter::TEvStreamData, HandleExecute);
+ hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleExecute);
+
hFunc(TEvKqpExecuter::TEvExecuterProgress, HandleNoop);
hFunc(NYql::NDq::TEvDq::TEvAbortExecution, HandleExecute);
hFunc(TEvKqpSnapshot::TEvCreateSnapshotResponse, HandleExecute);
@@ -1568,6 +1719,7 @@ private:
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TKqpSettings::TConstPtr KqpSettings;
std::optional<TActorId> WorkerId;
+ TActorId ExecuterId;
std::unique_ptr<TKqpQueryState> QueryState;
std::unique_ptr<TKqpCleanupCtx> CleanupCtx;
diff --git a/ydb/core/kqp/kqp_worker_actor.cpp b/ydb/core/kqp/kqp_worker_actor.cpp
index 9cde5d9782..0efd2bb996 100644
--- a/ydb/core/kqp/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/kqp_worker_actor.cpp
@@ -852,12 +852,12 @@ private:
bool keepInCache = false;
switch (queryRequest.GetAction()) {
case NKikimrKqp::QUERY_ACTION_EXECUTE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery());
+ query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false);
keepInCache = queryRequest.GetQueryCachePolicy().keep_in_cache();
break;
case NKikimrKqp::QUERY_ACTION_PREPARE:
- query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery());
+ query = TKqpQueryId(Settings.Cluster, Settings.Database, queryRequest.GetQuery(), /*scan*/ false);
keepInCache = true;
break;
diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h
index 6903e6f193..357fed9460 100644
--- a/ydb/core/kqp/provider/yql_kikimr_provider.h
+++ b/ydb/core/kqp/provider/yql_kikimr_provider.h
@@ -108,6 +108,9 @@ struct TKikimrQueryContext : TThrRefBase {
THashMap<ui64, TIntrusivePtr<IKikimrQueryExecutor::TAsyncQueryResult>> InProgress;
TVector<ui64> ExecutionOrder;
+ // Used to store results of transactions in TKqpSessionActor
+ TVector<TVector<NKikimrMiniKQL::TResult>> TxResults;
+
NActors::TActorId ReplyTarget;
TMaybe<NKikimrKqp::TRlPath> RlPath;