diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-03-15 11:54:23 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-03-15 11:54:23 +0300 |
commit | 37c8e6daa14894ac5c80794042203224f2804ba9 (patch) | |
tree | a89820d32322eef633764be4810c037c36541bdc | |
parent | 107caf8ebd8673ad7b867bd4f673b5d9922a689c (diff) | |
download | ydb-37c8e6daa14894ac5c80794042203224f2804ba9.tar.gz |
Do not copy response in case of local execution.
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 3 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_data_query.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 27 |
4 files changed, 39 insertions, 12 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index a16444b08d7..07d5733d0e9 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -393,6 +393,8 @@ public: virtual const TMaybe<TString> GetRequestType() const = 0; // Implementation must be thread safe virtual void SetClientLostAction(std::function<void()>&& cb) = 0; + // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety + virtual google::protobuf::Arena* GetArena() = 0; }; // Request context @@ -405,7 +407,6 @@ class IRequestCtx public: virtual google::protobuf::Message* GetRequestMut() = 0; - virtual google::protobuf::Arena* GetArena() = 0; virtual void SetRuHeader(ui64 ru) = 0; virtual void AddServerHint(const TString& hint) = 0; diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 1f9cd9b5ee6..ad7ba11da1e 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -165,7 +165,7 @@ public: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - const auto& record = ev->Get()->Record.GetRef(); + auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); AddServerHintsIfAny(record); @@ -176,7 +176,11 @@ public: try { if (kqpResponse.GetYdbResults().size()) { - queryResult->mutable_result_sets()->CopyFrom(kqpResponse.GetYdbResults()); + Y_VERIFY_DEBUG(!kqpResponse.GetYdbResults().GetArena() || + queryResult->mutable_result_sets()->GetArena() == kqpResponse.GetYdbResults().GetArena()); + // https://protobuf.dev/reference/cpp/arenas/#swap + // Actualy will be copy in case pf remote execution + queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults()); } else { NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); } diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 1f03ca80f14..9f62f520410 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -377,6 +377,10 @@ struct TEvKqp { return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId()); } + google::protobuf::Arena* GetArena() { + return RequestCtx ? RequestCtx->GetArena() : nullptr; + } + const TString& GetTraceId() const { if (RequestCtx) { if (!TraceId) { @@ -575,14 +579,17 @@ struct TEvKqp { } void Realloc(std::shared_ptr<google::protobuf::Arena> arena) { + ReallocRef(arena.get()); + Arena_ = arena; + } + + void ReallocRef(google::protobuf::Arena* arena) { // Allow realloc only if previous allocation was made using "normal" allocator // and no data was writen. It prevents ineffective using of protobuf. Y_ASSERT(!Protobuf_->GetArena()); Y_ASSERT(ByteSize() == 0); delete Protobuf_; - Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena.get()); - // Make sure arena is alive - Arena_ = arena; + Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena); } bool ParseFromString(const TString& data) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index ef0721cc1ff..0f114edc8b1 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -239,8 +239,24 @@ struct TKqpQueryState { ResetTimer(); return CpuTime; } + + // Returns nullptr in case of no local event + google::protobuf::Arena* GetArena() { + return RequestEv->GetArena(); + } }; +std::unique_ptr<TEvKqp::TEvQueryResponse> AllocQueryResponse(const std::shared_ptr<TKqpQueryState>& state) { + auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); + if (auto reqArena = state->GetArena()) { + resEv->Record.ReallocRef(reqArena); + } else { + auto arena = std::make_shared<google::protobuf::Arena>(); + resEv->Record.Realloc(arena); + } + return resEv; +} + struct TKqpCleanupCtx { std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; bool IsWaitingForWorkerToClose = false; @@ -1529,9 +1545,8 @@ public: } void ReplySuccess() { - auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); - std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena()); - resEv->Record.Realloc(arena); + YQL_ENSURE(QueryState); + auto resEv = AllocQueryResponse(QueryState); auto *record = &resEv->Record.GetRef(); auto *response = record->MutableResponse(); @@ -1541,7 +1556,6 @@ public: FillStats(record); - YQL_ENSURE(QueryState); if (QueryState->TxCtx) { QueryState->TxCtx->OnEndQuery(); } @@ -1591,6 +1605,7 @@ public: bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); // Result for scan query is sent directly to target actor. + Y_VERIFY(response->GetArena()); if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) { auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { @@ -1600,7 +1615,7 @@ public: YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex)); auto g = QueryState->QueryData->TypeEnv().BindAllocator(); - auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get()); + auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, response->GetArena()); std::optional<IDataProvider::TFillSettings> fillSettings; if (QueryState->PreparedQuery->ResultsSize()) { YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), "" @@ -1611,7 +1626,7 @@ public: fillSettings->RowsLimitPerWrite = result.GetRowsLimit(); } } - auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), arena.get()); + auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena()); if (useYdbResponseFormat) { ConvertKqpQueryResultToDbResult(*finalResult, response->AddYdbResults()); } else { |