diff options
author | spuchin <spuchin@ydb.tech> | 2023-01-31 17:47:24 +0300 |
---|---|---|
committer | spuchin <spuchin@ydb.tech> | 2023-01-31 17:47:24 +0300 |
commit | 6f3e374920f26d9fb38d142db6e1ea3b8e47650a (patch) | |
tree | d8444f03dd6bd869ecbb3402ef4f115770828d04 | |
parent | c691f11c04d73c324b901a2a933a90ba2d015fe0 (diff) | |
download | ydb-6f3e374920f26d9fb38d142db6e1ea3b8e47650a.tar.gz |
Use data_executer with stream result for generic queries. ()
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_data_executer.cpp | 14 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 33 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_scan_executer.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 11 |
6 files changed, 66 insertions, 62 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 27fbc44d33e..e34e7f53ef7 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -37,6 +37,10 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( kqpRequest.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_QUERY); kqpRequest.MutableRequest()->SetKeepSession(false); + // TODO: Use tx control from request. + kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); + kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); + switch (req.query_case()) { case Ydb::Query::ExecuteQueryRequest::kQueryContent: { NYql::TIssues issues; diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 801de6ac379..cf5791bc004 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -136,8 +136,9 @@ public: } TKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters) + TKqpRequestCounters::TPtr counters, bool streamResult) : TBase(std::move(request), database, userToken, counters, TWilsonKqp::DataExecuter, "DataExecuter") + , StreamResult(streamResult) { YQL_ENSURE(Request.IsolationLevel != NKikimrKqp::ISOLATION_LEVEL_UNDEFINED); @@ -2213,8 +2214,11 @@ public: if (channel.DstTask) { FillEndpointDesc(*channelDesc.MutableDstEndpoint(), TasksGraph.GetTask(channel.DstTask)); + } else if (StreamResult) { + auto proxy = GetOrCreateChannelProxy(channel); + ActorIdToProto(proxy->SelfId(), channelDesc.MutableDstEndpoint()->MutableActorId()); } else { - // result channel + // For non-stream execution, collect results in executer and forward with response. ActorIdToProto(SelfId(), channelDesc.MutableDstEndpoint()->MutableActorId()); } @@ -2317,6 +2321,8 @@ private: } private: + bool StreamResult = false; + NTxProxy::TRequestControls RequestControls; ui64 TxCoordinator = 0; THashMap<ui64, TShardState> ShardStates; @@ -2356,9 +2362,9 @@ private: } // namespace IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, - TKqpRequestCounters::TPtr counters) + TKqpRequestCounters::TPtr counters, bool streamResult) { - return new TKqpDataExecuter(std::move(request), database, userToken, counters); + return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 9523883e756..79ff9fcdb41 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -150,10 +150,9 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.EraseLocks); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false); } - bool data = true; // `false` stands for Scan TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; for (auto& tx : request.Transactions) { if (txsType) { @@ -161,31 +160,23 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt YQL_ENSURE(*txsType == NKqpProto::TKqpPhyTx::TYPE_DATA, "Cannot execute multiple non-data physical txs."); } else { txsType = tx.Body->GetType(); + } + } - switch (tx.Body->GetType()) { - case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: - case NKqpProto::TKqpPhyTx::TYPE_DATA: - data = true; - break; + switch (*txsType) { + case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: + case NKqpProto::TKqpPhyTx::TYPE_DATA: + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false); - case NKqpProto::TKqpPhyTx::TYPE_SCAN: - data = false; - break; + case NKqpProto::TKqpPhyTx::TYPE_SCAN: + return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation); - case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - // TODO: Use separate executer. - data = false; - break; + case NKqpProto::TKqpPhyTx::TYPE_GENERIC: + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true); - default: - YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)tx.Body->GetType()); - } - } + default: + YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); } - - return data - ? CreateKqpDataExecuter(std::move(request), database, userToken, counters) - : CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 6651813e1d2..543f87b5861 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -996,6 +996,35 @@ protected: return true; } + IActor* GetOrCreateChannelProxy(const NYql::NDq::TChannel& channel) { + IActor* proxy; + + if (ResponseEv->TxResults[0].IsStream) { + if (!ResultChannelProxies.empty()) { + return ResultChannelProxies.begin()->second; + } + + proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType, + ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), this->SelfId()); + } else { + YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize()); + + auto channelIt = ResultChannelProxies.find(channel.Id); + + if (channelIt != ResultChannelProxies.end()) { + return channelIt->second; + } + + proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(), + channel.DstInputIndex, ResponseEv.get()); + } + + this->RegisterWithSameMailbox(proxy); + ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy)); + + return proxy; + } + protected: void PassAway() override { LOG_D("terminate execution."); @@ -1072,6 +1101,7 @@ protected: TActorId KqpShardsResolverId; THashMap<TActorId, TProgressStat> PendingComputeActors; // Running compute actors (pure and DS) THashMap<TActorId, NYql::NDqProto::TComputeActorExtraData> ExtraData; + std::unordered_map<ui64, IActor*> ResultChannelProxies; TVector<TProgressStat> LastStats; @@ -1089,6 +1119,7 @@ protected: ui64 LastTaskId = 0; TString LastComputeActorId = ""; + private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; @@ -1098,7 +1129,7 @@ private: IActor* CreateKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters); IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, - const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters); + const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TMaybe<TString>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation); diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index d61f577dddf..b996a6e3f9b 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -730,34 +730,6 @@ public: } } - IActor* GetOrCreateChannelProxy(const TChannel& channel) { - IActor* proxy; - - if (ResponseEv->TxResults[0].IsStream) { - if (!ResultChannelProxies.empty()) { - return ResultChannelProxies.begin()->second; - } - - proxy = CreateResultStreamChannelProxy(TxId, channel.Id, ResponseEv->TxResults[0].MkqlItemType, - ResponseEv->TxResults[0].ColumnOrder, Target, Stats.get(), SelfId()); - } else { - YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize()); - - auto channelIt = ResultChannelProxies.find(channel.Id); - - if (channelIt != ResultChannelProxies.end()) { - return channelIt->second; - } - - proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), SelfId(), channel.DstInputIndex, ResponseEv.get()); - } - - RegisterWithSameMailbox(proxy); - ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy)); - - return proxy; - } - void FillChannelDesc(NYql::NDqProto::TChannel& channelDesc, const TChannel& channel) { channelDesc.SetId(channel.Id); channelDesc.SetSrcTaskId(channel.SrcTask); @@ -776,9 +748,6 @@ public: channelDesc.SetIsPersistent(IsCrossShardChannel(TasksGraph, channel)); channelDesc.SetInMemory(channel.InMemory); } - -private: - std::unordered_map<ui64, IActor*> ResultChannelProxies; const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; }; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b9835c8e873..7052e5cee73 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -171,8 +171,7 @@ struct TKqpQueryState { auto type = GetType(); return ( type == NKikimrKqp::QUERY_TYPE_SQL_SCAN || - type == NKikimrKqp::QUERY_TYPE_AST_SCAN || - type == NKikimrKqp::QUERY_TYPE_SQL_QUERY // TODO: Switch to MVCC snapshots after moving to separate executer. + type == NKikimrKqp::QUERY_TYPE_AST_SCAN ); } @@ -999,8 +998,12 @@ public: IKqpGateway::TExecPhysicalRequest PrepareGenericRequest(TKqpQueryState *queryState) { auto request = PrepareBaseRequest(queryState, queryState->TxCtx->TxAlloc); - YQL_ENSURE(queryState); - request.Snapshot = queryState->TxCtx->GetSnapshot(); + if (queryState) { + request.Snapshot = queryState->TxCtx->GetSnapshot(); + request.IsolationLevel = *queryState->TxCtx->EffectiveIsolationLevel; + } else { + request.IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE; + } return request; } |