diff options
author | dcherednik <dcherednik@ydb.tech> | 2022-10-26 14:13:07 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2022-10-26 14:13:07 +0300 |
commit | 71b018f828ad9f35f0b391747dd7eaa200793d1d (patch) | |
tree | a72222a98aa62b30d83638b0f7714c504cca229e | |
parent | 5e7625c7515775f88c42a8581489fb9efc993c3d (diff) | |
download | ydb-71b018f828ad9f35f0b391747dd7eaa200793d1d.tar.gz |
ydb format support for session actor.
-rw-r--r-- | library/cpp/grpc/server/grpc_request.h | 5 | ||||
-rw-r--r-- | library/cpp/grpc/server/grpc_request_base.h | 3 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 12 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_deferrable.h | 4 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_data_query.cpp | 14 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_yql_script.cpp | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_kqp_base.h | 38 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/kqp.h | 10 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_response.cpp | 31 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.cpp | 3 | ||||
-rw-r--r-- | ydb/core/kqp/proxy/kqp_proxy_service.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_params_ut.cpp | 4 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 4 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.cpp | 4 | ||||
-rw-r--r-- | ydb/core/public_http/grpc_request_context_wrapper.h | 1 | ||||
-rw-r--r-- | ydb/core/viewer/json_query.h | 2 |
19 files changed, 115 insertions, 50 deletions
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h index b79a0d94fab..a3a5c291f07 100644 --- a/library/cpp/grpc/server/grpc_request.h +++ b/library/cpp/grpc/server/grpc_request.h @@ -179,6 +179,11 @@ public: return Request_; } + NProtoBuf::Message* GetRequestMut() override { + return Request_; + } + + TAuthState& GetAuthState() override { return AuthState_; } diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h index 8827d028b34..105f9515d0a 100644 --- a/library/cpp/grpc/server/grpc_request_base.h +++ b/library/cpp/grpc/server/grpc_request_base.h @@ -53,6 +53,9 @@ public: //! Get pointer to the request's message. virtual const NProtoBuf::Message* GetRequest() const = 0; + //! Get mutable pointer to the request's message. + virtual NProtoBuf::Message* GetRequestMut() = 0; + //! Get current auth state virtual TAuthState& GetAuthState() = 0; diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 4bf9ed201ae..c98440185a7 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -380,6 +380,7 @@ class IRequestCtx : public virtual IRequestCtxBase { public: virtual void SetClientLostAction(std::function<void()>&& cb) = 0; virtual const google::protobuf::Message* GetRequest() const = 0; + virtual google::protobuf::Message* GetRequestMut() = 0; virtual google::protobuf::Arena* GetArena() = 0; virtual const TMaybe<TString> GetRequestType() const = 0; virtual void SetRuHeader(ui64 ru) = 0; @@ -1004,6 +1005,13 @@ public: return request; } + template <typename T> + static TRequest* GetProtoRequestMut(const T& req) { + auto request = dynamic_cast<TRequest*>(req->GetRequestMut()); + Y_VERIFY(request != nullptr, "Wrong using of TGRpcRequestWrapper"); + return request; + } + const TRequest* GetProtoRequest() const { return GetProtoRequest(this); } @@ -1094,6 +1102,10 @@ public: return Ctx_->GetRequest(); } + google::protobuf::Message* GetRequestMut() override { + return Ctx_->GetRequestMut(); + } + void SetRespHook(TRespHook&& hook) override { RespHook = std::move(hook); } diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index b39996b84f3..b70b9b420ca 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -137,6 +137,10 @@ public: return &Request; } + google::protobuf::Message* GetRequestMut() override { + return &Request; + } + void SetClientLostAction(std::function<void()>&&) override {} void AddServerHint(const TString&) override {} diff --git a/ydb/core/grpc_services/rpc_deferrable.h b/ydb/core/grpc_services/rpc_deferrable.h index 644e7b84c40..306f12620f4 100644 --- a/ydb/core/grpc_services/rpc_deferrable.h +++ b/ydb/core/grpc_services/rpc_deferrable.h @@ -47,6 +47,10 @@ public: return TRequest::GetProtoRequest(Request_); } + typename TRequest::TRequest* GetProtoRequestMut() { + return TRequest::GetProtoRequestMut(Request_); + } + Ydb::Operations::OperationParams::OperationMode GetOperationMode() const { return GetProtoRequest()->operation_params().operation_mode(); } diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 5f62ffefae6..dca4c52bcb9 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -57,6 +57,8 @@ public: SetAuthToken(ev, *Request_); SetDatabase(ev, *Request_); + ev->Record.MutableRequest()->SetUsePublicResponseDataFormat(true); + NYql::TIssues issues; if (CheckSession(req->session_id(), issues)) { ev->Record.MutableRequest()->SetSessionId(req->session_id()); @@ -77,9 +79,8 @@ public: if (req->parametersSize() != 0) { try { - NKikimrMiniKQL::TParams params; - ConvertYdbParamsToMiniKQLParams(req->parameters(), params); - ev->Record.MutableRequest()->MutableParameters()->CopyFrom(params); + ConvertYdbParamsToMiniKQLParams(req->parameters(), *ev->Record.MutableRequest()->MutableParameters()); + //ev->Record.MutableRequest()->MutableYdbParameters()->swap(*req->mutable_parameters()); } catch (const std::exception& ex) { auto issue = MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Failed to parse query parameters."); issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(NYql::ExceptionToIssue(ex))); @@ -220,11 +221,14 @@ public: if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { const auto& kqpResponse = record.GetResponse(); const auto& issueMessage = kqpResponse.GetQueryIssues(); - auto queryResult = TEvExecuteDataQueryRequest::AllocateResult<Ydb::Table::ExecuteQueryResult>(Request_); try { - ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); + if (kqpResponse.GetYdbResults().size()) { + queryResult->mutable_result_sets()->CopyFrom(kqpResponse.GetYdbResults()); + } else { + NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); + } ConvertQueryStats(kqpResponse, queryResult); if (kqpResponse.HasTxMeta()) { queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta()); diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 4bd8f86c9a5..1102709c57f 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -100,7 +100,7 @@ public: auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_); try { - ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); + NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); } catch (const std::exception& ex) { NYql::TIssues issues; issues.AddIssue(NYql::ExceptionToIssue(ex)); diff --git a/ydb/core/grpc_services/rpc_kqp_base.h b/ydb/core/grpc_services/rpc_kqp_base.h index 8f5787aa9f5..16da82b632e 100644 --- a/ydb/core/grpc_services/rpc_kqp_base.h +++ b/ydb/core/grpc_services/rpc_kqp_base.h @@ -74,44 +74,6 @@ inline bool CheckQuery(const TString& query, NYql::TIssues& issues) { void FillQueryStats(Ydb::TableStats::QueryStats& queryStats, const NKikimrKqp::TQueryResponse& kqpResponse); -inline void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) { - const auto& type = from.GetType(); - TStackVec<NKikimrMiniKQL::TType> columnTypes; - Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct); - for (const auto& member : type.GetStruct().GetMember()) { - if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) { - for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) { - auto columnMeta = to->add_columns(); - columnMeta->set_name(column.GetName()); - columnTypes.push_back(column.GetType()); - ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type()); - } - } - } - for (const auto& responseStruct : from.GetValue().GetStruct()) { - for (const auto& row : responseStruct.GetList()) { - auto newRow = to->add_rows(); - ui32 columnCount = static_cast<ui32>(row.StructSize()); - Y_ENSURE(columnCount == columnTypes.size()); - for (ui32 i = 0; i < columnCount; i++) { - const auto& column = row.GetStruct(i); - ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items()); - } - } - if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) { - to->set_truncated(responseStruct.GetBool()); - } - } -} - -template<typename TFrom, typename TTo> -inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) { - const auto& results = from.GetResults(); - for (const auto& result : results) { - ConvertKqpQueryResultToDbResult(result, to->add_result_sets()); - } -} - template <typename TDerived, typename TRequest> class TRpcKqpRequestActor : public TRpcOperationRequestActor<TDerived, TRequest> { using TBase = TRpcOperationRequestActor<TDerived, TRequest>; diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index bc854fe3bb7..03c8a61cdd3 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -214,7 +214,7 @@ private: auto result = response.mutable_result(); try { - ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set()); + NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set()); } catch (std::exception ex) { ReplyFinishStream(ex.what(), ctx); } diff --git a/ydb/core/kqp/kqp.h b/ydb/core/kqp/kqp.h index 9168308a314..e34a7c46114 100644 --- a/ydb/core/kqp/kqp.h +++ b/ydb/core/kqp/kqp.h @@ -16,6 +16,16 @@ namespace NKikimr { namespace NKqp { +void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to); + +template<typename TFrom, typename TTo> +inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) { + const auto& results = from.GetResults(); + for (const auto& result : results) { + ConvertKqpQueryResultToDbResult(result, to->add_result_sets()); + } +} + enum class ETableReadType { Other = 0, Scan = 1, diff --git a/ydb/core/kqp/kqp_response.cpp b/ydb/core/kqp/kqp_response.cpp index 8d96beaac4b..d98da6d1b8b 100644 --- a/ydb/core/kqp/kqp_response.cpp +++ b/ydb/core/kqp/kqp_response.cpp @@ -1,4 +1,5 @@ #include "kqp_impl.h" +#include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> @@ -51,6 +52,36 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) { } // namespace +void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) { + const auto& type = from.GetType(); + TStackVec<NKikimrMiniKQL::TType> columnTypes; + Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct); + for (const auto& member : type.GetStruct().GetMember()) { + if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) { + for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) { + auto columnMeta = to->add_columns(); + columnMeta->set_name(column.GetName()); + columnTypes.push_back(column.GetType()); + ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type()); + } + } + } + for (const auto& responseStruct : from.GetValue().GetStruct()) { + for (const auto& row : responseStruct.GetList()) { + auto newRow = to->add_rows(); + ui32 columnCount = static_cast<ui32>(row.StructSize()); + Y_ENSURE(columnCount == columnTypes.size()); + for (ui32 i = 0; i < columnCount; i++) { + const auto& column = row.GetStruct(i); + ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items()); + } + } + if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) { + to->set_truncated(responseStruct.GetBool()); + } + } +} + TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) { if (issue.GetSeverity() == TSeverityIds::S_FATAL) { return Ydb::StatusIds::INTERNAL_ERROR; diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 2fb94cabe65..70357b9320b 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -12,6 +12,7 @@ #include <ydb/core/kqp/provider/yql_kikimr_provider.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> #include <ydb/core/kqp/rm/kqp_snapshot_manager.h> +#include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/core/util/ulid.h> @@ -334,6 +335,16 @@ public: QueryState->StartTime = TInstant::Now(); QueryState->UserToken = event.GetUserToken(); QueryState->QueryDeadlines = GetQueryDeadlines(queryRequest); + + if (!queryRequest.HasParameters() && queryRequest.YdbParametersSize()) { + try { + ConvertYdbParamsToMiniKQLParams(queryRequest.GetYdbParameters(), *queryRequest.MutableParameters()); + } catch (const std::exception& ex) { + ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) + << "Failed to parse query parameters. "<< ex.what(); + } + } + QueryState->ParametersSize = queryRequest.GetParameters().ByteSize(); QueryState->RequestActorId = ActorIdFromProto(event.GetRequestActorId()); QueryState->KeepSession = Settings.LongSession || queryRequest.GetKeepSession(); @@ -1460,6 +1471,8 @@ public: response->SetPreparedQuery(queryId); } + bool useYdbResponseFormat = QueryState->Request.GetUsePublicResponseDataFormat(); + if (QueryState->PreparedQuery) { auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { @@ -1484,7 +1497,11 @@ public: auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {}, fillSettings.value_or(FillSettings), arena.get()); - response->AddResults()->Swap(protoRes); + if (useYdbResponseFormat) { + ConvertKqpQueryResultToDbResult(*protoRes, response->AddYdbResults()); + } else { + response->AddResults()->Swap(protoRes); + } } } diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.cpp b/ydb/core/kqp/proxy/kqp_proxy_service.cpp index 3ccce5cef4e..034b69f16ab 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy/kqp_proxy_service.cpp @@ -18,6 +18,7 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/core/node_whiteboard/node_whiteboard.h> +#include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/library/yql/utils/actor_log/log.h> #include <ydb/library/yql/core/services/mounts/yql_mounts.h> @@ -522,6 +523,8 @@ public: if (sessionInfo) { targetId = sessionInfo->WorkerId; } else { + // No local session. Disable public format due to compatibility. + request.SetUsePublicResponseDataFormat(false); targetId = TryGetSessionTargetActor(request.GetSessionId(), requestInfo, requestId); if (!targetId) { return; diff --git a/ydb/core/kqp/proxy/kqp_proxy_service.h b/ydb/core/kqp/proxy/kqp_proxy_service.h index e6d44a176ca..e8fec4eed61 100644 --- a/ydb/core/kqp/proxy/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy/kqp_proxy_service.h @@ -155,7 +155,8 @@ public: ReadySessions[1].push_back(sessionId); } - auto result = LocalSessions.emplace(sessionId, TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); + auto result = LocalSessions.emplace(sessionId, + TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos))); SessionsCountPerDatabase[database]++; Y_VERIFY(result.second, "Duplicate session id!"); TargetIdIndex.emplace(workerId, sessionId); diff --git a/ydb/core/kqp/ut/kqp_params_ut.cpp b/ydb/core/kqp/ut/kqp_params_ut.cpp index 021f97e2d71..d00d5ddbb54 100644 --- a/ydb/core/kqp/ut/kqp_params_ut.cpp +++ b/ydb/core/kqp/ut/kqp_params_ut.cpp @@ -274,8 +274,8 @@ Y_UNIT_TEST_SUITE(KqpParams) { UNIT_ASSERT_C(actual == expected1 || actual == expected2, "expected: " << expected1 << ", got: " << actual); } - Y_UNIT_TEST_NEW_ENGINE(InvalidJson) { - TKikimrRunner kikimr; + Y_UNIT_TEST_QUAD(InvalidJson, UseNewEngine, UseSessionActor) { + auto kikimr = KikimrRunnerEnableSessionActor(UseNewEngine && UseSessionActor); auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 8476ed6d0a0..143c37f74c1 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -102,6 +102,8 @@ message TQueryRequest { optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21; optional TTopicOperations TopicOperations = 22; + optional bool UsePublicResponseDataFormat = 23; + map<string, Ydb.TypedValue> YdbParameters = 24; } message TKqpPathIdProto { @@ -262,6 +264,7 @@ message TQueryResponse { repeated TParameterDescription QueryParameters = 10; optional Ydb.Table.TransactionMeta TxMeta = 11; optional NKqpProto.TKqpStatsQuery QueryStats = 12; + repeated Ydb.ResultSet YdbResults = 13; } message TEvQueryResponse { @@ -383,6 +386,7 @@ message TExecuterTxResult { optional NKikimrMiniKQL.TResult Locks = 4; reserved 5; // (deprecated) Stats optional NYql.NDqProto.TDqExecutionStats Stats = 6; + repeated Ydb.ResultSet YdbResults = 7; }; message TExecuterTxResponse { diff --git a/ydb/core/public_http/grpc_request_context_wrapper.cpp b/ydb/core/public_http/grpc_request_context_wrapper.cpp index 2b6913259e1..a7b0225c4ae 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.cpp +++ b/ydb/core/public_http/grpc_request_context_wrapper.cpp @@ -19,6 +19,10 @@ namespace NKikimr::NPublicHttp { return Request.get(); } + NProtoBuf::Message* TGrpcRequestContextWrapper::GetRequestMut() { + return Request.get(); + } + NGrpc::TAuthState& TGrpcRequestContextWrapper::GetAuthState() { return AuthState; } diff --git a/ydb/core/public_http/grpc_request_context_wrapper.h b/ydb/core/public_http/grpc_request_context_wrapper.h index 70c11898726..25605e2c20c 100644 --- a/ydb/core/public_http/grpc_request_context_wrapper.h +++ b/ydb/core/public_http/grpc_request_context_wrapper.h @@ -25,6 +25,7 @@ private: public: TGrpcRequestContextWrapper(const THttpRequestContext& requestContext, std::unique_ptr<NProtoBuf::Message> request, TReplySender replySender); virtual const NProtoBuf::Message* GetRequest() const; + virtual NProtoBuf::Message* GetRequestMut(); virtual NGrpc::TAuthState& GetAuthState(); virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0); virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0); diff --git a/ydb/core/viewer/json_query.h b/ydb/core/viewer/json_query.h index fc7566e71cf..abc35c8b60b 100644 --- a/ydb/core/viewer/json_query.h +++ b/ydb/core/viewer/json_query.h @@ -295,7 +295,7 @@ private: try { for (const auto& result : response.GetResults()) { Ydb::ResultSet resultSet; - NGRpcService::ConvertKqpQueryResultToDbResult(result, &resultSet); + NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet); ResultSets.emplace_back(std::move(resultSet)); } } |