aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-03-15 11:54:23 +0300
committerdcherednik <dcherednik@ydb.tech>2023-03-15 11:54:23 +0300
commit37c8e6daa14894ac5c80794042203224f2804ba9 (patch)
treea89820d32322eef633764be4810c037c36541bdc
parent107caf8ebd8673ad7b867bd4f673b5d9922a689c (diff)
downloadydb-37c8e6daa14894ac5c80794042203224f2804ba9.tar.gz
Do not copy response in case of local execution.
-rw-r--r--ydb/core/grpc_services/base/base.h3
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp8
-rw-r--r--ydb/core/kqp/common/kqp.h13
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp27
4 files changed, 39 insertions, 12 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index a16444b08d7..07d5733d0e9 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -393,6 +393,8 @@ public:
virtual const TMaybe<TString> GetRequestType() const = 0;
// Implementation must be thread safe
virtual void SetClientLostAction(std::function<void()>&& cb) = 0;
+ // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety
+ virtual google::protobuf::Arena* GetArena() = 0;
};
// Request context
@@ -405,7 +407,6 @@ class IRequestCtx
public:
virtual google::protobuf::Message* GetRequestMut() = 0;
- virtual google::protobuf::Arena* GetArena() = 0;
virtual void SetRuHeader(ui64 ru) = 0;
virtual void AddServerHint(const TString& hint) = 0;
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 1f9cd9b5ee6..ad7ba11da1e 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -165,7 +165,7 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- const auto& record = ev->Get()->Record.GetRef();
+ auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
@@ -176,7 +176,11 @@ public:
try {
if (kqpResponse.GetYdbResults().size()) {
- queryResult->mutable_result_sets()->CopyFrom(kqpResponse.GetYdbResults());
+ Y_VERIFY_DEBUG(!kqpResponse.GetYdbResults().GetArena() ||
+ queryResult->mutable_result_sets()->GetArena() == kqpResponse.GetYdbResults().GetArena());
+ // https://protobuf.dev/reference/cpp/arenas/#swap
+ // Actualy will be copy in case pf remote execution
+ queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 1f03ca80f14..9f62f520410 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -377,6 +377,10 @@ struct TEvKqp {
return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId());
}
+ google::protobuf::Arena* GetArena() {
+ return RequestCtx ? RequestCtx->GetArena() : nullptr;
+ }
+
const TString& GetTraceId() const {
if (RequestCtx) {
if (!TraceId) {
@@ -575,14 +579,17 @@ struct TEvKqp {
}
void Realloc(std::shared_ptr<google::protobuf::Arena> arena) {
+ ReallocRef(arena.get());
+ Arena_ = arena;
+ }
+
+ void ReallocRef(google::protobuf::Arena* arena) {
// Allow realloc only if previous allocation was made using "normal" allocator
// and no data was writen. It prevents ineffective using of protobuf.
Y_ASSERT(!Protobuf_->GetArena());
Y_ASSERT(ByteSize() == 0);
delete Protobuf_;
- Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena.get());
- // Make sure arena is alive
- Arena_ = arena;
+ Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena);
}
bool ParseFromString(const TString& data) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index ef0721cc1ff..0f114edc8b1 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -239,8 +239,24 @@ struct TKqpQueryState {
ResetTimer();
return CpuTime;
}
+
+ // Returns nullptr in case of no local event
+ google::protobuf::Arena* GetArena() {
+ return RequestEv->GetArena();
+ }
};
+std::unique_ptr<TEvKqp::TEvQueryResponse> AllocQueryResponse(const std::shared_ptr<TKqpQueryState>& state) {
+ auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
+ if (auto reqArena = state->GetArena()) {
+ resEv->Record.ReallocRef(reqArena);
+ } else {
+ auto arena = std::make_shared<google::protobuf::Arena>();
+ resEv->Record.Realloc(arena);
+ }
+ return resEv;
+}
+
struct TKqpCleanupCtx {
std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
bool IsWaitingForWorkerToClose = false;
@@ -1529,9 +1545,8 @@ public:
}
void ReplySuccess() {
- auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
- std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena());
- resEv->Record.Realloc(arena);
+ YQL_ENSURE(QueryState);
+ auto resEv = AllocQueryResponse(QueryState);
auto *record = &resEv->Record.GetRef();
auto *response = record->MutableResponse();
@@ -1541,7 +1556,6 @@ public:
FillStats(record);
- YQL_ENSURE(QueryState);
if (QueryState->TxCtx) {
QueryState->TxCtx->OnEndQuery();
}
@@ -1591,6 +1605,7 @@ public:
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
// Result for scan query is sent directly to target actor.
+ Y_VERIFY(response->GetArena());
if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) {
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
@@ -1600,7 +1615,7 @@ public:
YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex));
auto g = QueryState->QueryData->TypeEnv().BindAllocator();
- auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get());
+ auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, response->GetArena());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
@@ -1611,7 +1626,7 @@ public:
fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
}
}
- auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), arena.get());
+ auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
if (useYdbResponseFormat) {
ConvertKqpQueryResultToDbResult(*finalResult, response->AddYdbResults());
} else {