aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspuchin <spuchin@ydb.tech>2023-01-31 17:47:24 +0300
committerspuchin <spuchin@ydb.tech>2023-01-31 17:47:24 +0300
commit6f3e374920f26d9fb38d142db6e1ea3b8e47650a (patch)
treed8444f03dd6bd869ecbb3402ef4f115770828d04
parentc691f11c04d73c324b901a2a933a90ba2d015fe0 (diff)
downloadydb-6f3e374920f26d9fb38d142db6e1ea3b8e47650a.tar.gz
Use data_executer with stream result for generic queries. ()
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp14
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp35
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h33
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp31
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp11
6 files changed, 66 insertions, 62 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 27fbc44d33e..e34e7f53ef7 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -37,6 +37,10 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_QUERY);
kqpRequest.MutableRequest()->SetKeepSession(false);
+ // TODO: Use tx control from request.
+ kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
+ kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+
switch (req.query_case()) {
case Ydb::Query::ExecuteQueryRequest::kQueryContent: {
NYql::TIssues issues;
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 801de6ac379..cf5791bc004 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -136,8 +136,9 @@ public:
}
TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters)
+ TKqpRequestCounters::TPtr counters, bool streamResult)
: TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter")
+ , StreamResult(streamResult)
{
YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED);
@@ -2213,8 +2214,11 @@ public:
if (channel.DstTask) {
FillEndpointDesc(*channelDesc.MutableDstEndpoint(), TasksGraph.GetTask(channel.DstTask));
+ } else if (StreamResult) {
+ auto proxy = GetOrCreateChannelProxy(channel);
+ ActorIdToProto(proxy->SelfId(), channelDesc.MutableDstEndpoint()->MutableActorId());
} else {
- // result channel
+ // For non-stream execution, collect results in executer and forward with response.
ActorIdToProto(SelfId(), channelDesc.MutableDstEndpoint()->MutableActorId());
}
@@ -2317,6 +2321,8 @@ private:
}
private:
+ bool StreamResult = false;
+
NTxProxy::TRequestControls RequestControls;
ui64 TxCoordinator = 0;
THashMap<ui64, TShardState> ShardStates;
@@ -2356,9 +2362,9 @@ private:
} // namespace
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken,
- TKqpRequestCounters::TPtr counters)
+ TKqpRequestCounters::TPtr counters, bool streamResult)
{
- return new TKqpDataExecuter(std::move(request), database, userToken, counters);
+ return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 9523883e756..79ff9fcdb41 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -150,10 +150,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.EraseLocks);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false);
}
- bool data = true; // `false` stands for Scan
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
for (auto& tx : request.Transactions) {
if (txsType) {
@@ -161,31 +160,23 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
YQL_ENSURE(*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA, "Cannot execute multiple non-data physical txs.");
} else {
txsType = tx.Body->GetType();
+ }
+ }
- switch (tx.Body->GetType()) {
- case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
- case NKqpProto::TKqpPhyTx::TYPE_DATA:
- data = true;
- break;
+ switch (*txsType) {
+ case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
+ case NKqpProto::TKqpPhyTx::TYPE_DATA:
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false);
- case NKqpProto::TKqpPhyTx::TYPE_SCAN:
- data = false;
- break;
+ case NKqpProto::TKqpPhyTx::TYPE_SCAN:
+ return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation);
- case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- // TODO: Use separate executer.
- data = false;
- break;
+ case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true);
- default:
- YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)tx.Body->GetType());
- }
- }
+ default:
+ YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
}
-
- return data
- ? CreateKqpDataExecuter(std::move(request), database, userToken, counters)
- : CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 6651813e1d2..543f87b5861 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -996,6 +996,35 @@ protected:
return true;
}
+ IActor* GetOrCreateChannelProxy(const NYql::NDq::TChannel& channel) {
+ IActor* proxy;
+
+ if (ResponseEv->TxResults[0].IsStream) {
+ if (!ResultChannelProxies.empty()) {
+ return ResultChannelProxies.begin()->second;
+ }
+
+ proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType,
+ ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), this->SelfId());
+ } else {
+ YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
+
+ auto channelIt = ResultChannelProxies.find(channel.Id);
+
+ if (channelIt != ResultChannelProxies.end()) {
+ return channelIt->second;
+ }
+
+ proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(),
+ channel.DstInputIndex, ResponseEv.get());
+ }
+
+ this->RegisterWithSameMailbox(proxy);
+ ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy));
+
+ return proxy;
+ }
+
protected:
void PassAway() override {
LOG_D("terminate execution.");
@@ -1072,6 +1101,7 @@ protected:
TActorId KqpShardsResolverId;
THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS)
THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData;
+ std::unordered_map<ui64, IActor*> ResultChannelProxies;
TVector<TProgressStat> LastStats;
@@ -1089,6 +1119,7 @@ protected:
ui64 LastTaskId = 0;
TString LastComputeActorId = "";
+
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
@@ -1098,7 +1129,7 @@ private:
IActor* CreateKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters);
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
- const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters);
+ const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation);
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index d61f577dddf..b996a6e3f9b 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -730,34 +730,6 @@ public:
}
}
- IActor* GetOrCreateChannelProxy(const TChannel& channel) {
- IActor* proxy;
-
- if (ResponseEv->TxResults[0].IsStream) {
- if (!ResultChannelProxies.empty()) {
- return ResultChannelProxies.begin()->second;
- }
-
- proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType,
- ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), SelfId());
- } else {
- YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
-
- auto channelIt = ResultChannelProxies.find(channel.Id);
-
- if (channelIt != ResultChannelProxies.end()) {
- return channelIt->second;
- }
-
- proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), SelfId(), channel.DstInputIndex, ResponseEv.get());
- }
-
- RegisterWithSameMailbox(proxy);
- ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy));
-
- return proxy;
- }
-
void FillChannelDesc(NYql::NDqProto::TChannel& channelDesc, const TChannel& channel) {
channelDesc.SetId(channel.Id);
channelDesc.SetSrcTaskId(channel.SrcTask);
@@ -776,9 +748,6 @@ public:
channelDesc.SetIsPersistent(IsCrossShardChannel(TasksGraph, channel));
channelDesc.SetInMemory(channel.InMemory);
}
-
-private:
- std::unordered_map<ui64, IActor*> ResultChannelProxies;
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
};
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index b9835c8e873..7052e5cee73 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -171,8 +171,7 @@ struct TKqpQueryState {
auto type = GetType();
return (
type == NKikimrKqp::QUERY_TYPE_SQL_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_AST_SCAN ||
- type == NKikimrKqp::QUERY_TYPE_SQL_QUERY // TODO: Switch to MVCC snapshots after moving to separate executer.
+ type == NKikimrKqp::QUERY_TYPE_AST_SCAN
);
}
@@ -999,8 +998,12 @@ public:
IKqpGateway::TExecPhysicalRequest PrepareGenericRequest(TKqpQueryState *queryState) {
auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc);
- YQL_ENSURE(queryState);
- request.Snapshot = queryState->TxCtx->GetSnapshot();
+ if (queryState) {
+ request.Snapshot = queryState->TxCtx->GetSnapshot();
+ request.IsolationLevel = *queryState->TxCtx->EffectiveIsolationLevel;
+ } else {
+ request.IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
+ }
return request;
}