diff options
author | gvit <gvit@ydb.tech> | 2023-03-15 21:52:00 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-03-15 21:52:00 +0300 |
commit | 375bf331910edc492b41e0bae27b5ed1dd334ac4 (patch) | |
tree | 25d0ca91ac499e6598d3ad7db668934c46282825 | |
parent | ed5086c2dbf543b595a317eb344d87a72fa0aa94 (diff) | |
download | ydb-375bf331910edc492b41e0bae27b5ed1dd334ac4.tar.gz |
Revert commit rXXXXXX
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 3 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_data_query.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/common/kqp.h | 13 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 27 |
4 files changed, 12 insertions, 39 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 07d5733d0e9..a16444b08d7 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -393,8 +393,6 @@ public: virtual const TMaybe<TString> GetRequestType() const = 0; // Implementation must be thread safe virtual void SetClientLostAction(std::function<void()>&& cb) = 0; - // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety - virtual google::protobuf::Arena* GetArena() = 0; }; // Request context @@ -407,6 +405,7 @@ class IRequestCtx public: virtual google::protobuf::Message* GetRequestMut() = 0; + virtual google::protobuf::Arena* GetArena() = 0; virtual void SetRuHeader(ui64 ru) = 0; virtual void AddServerHint(const TString& hint) = 0; diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index ad7ba11da1e..1f9cd9b5ee6 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -165,7 +165,7 @@ public: } void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) { - auto& record = ev->Get()->Record.GetRef(); + const auto& record = ev->Get()->Record.GetRef(); SetCost(record.GetConsumedRu()); AddServerHintsIfAny(record); @@ -176,11 +176,7 @@ public: try { if (kqpResponse.GetYdbResults().size()) { - Y_VERIFY_DEBUG(!kqpResponse.GetYdbResults().GetArena() || - queryResult->mutable_result_sets()->GetArena() == kqpResponse.GetYdbResults().GetArena()); - // https://protobuf.dev/reference/cpp/arenas/#swap - // Actualy will be copy in case pf remote execution - queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults()); + queryResult->mutable_result_sets()->CopyFrom(kqpResponse.GetYdbResults()); } else { NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); } diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 9f62f520410..1f03ca80f14 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -377,10 +377,6 @@ struct TEvKqp { return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId()); } - google::protobuf::Arena* GetArena() { - return RequestCtx ? RequestCtx->GetArena() : nullptr; - } - const TString& GetTraceId() const { if (RequestCtx) { if (!TraceId) { @@ -579,17 +575,14 @@ struct TEvKqp { } void Realloc(std::shared_ptr<google::protobuf::Arena> arena) { - ReallocRef(arena.get()); - Arena_ = arena; - } - - void ReallocRef(google::protobuf::Arena* arena) { // Allow realloc only if previous allocation was made using "normal" allocator // and no data was writen. It prevents ineffective using of protobuf. Y_ASSERT(!Protobuf_->GetArena()); Y_ASSERT(ByteSize() == 0); delete Protobuf_; - Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena); + Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena.get()); + // Make sure arena is alive + Arena_ = arena; } bool ParseFromString(const TString& data) { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 0f114edc8b1..ef0721cc1ff 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -239,24 +239,8 @@ struct TKqpQueryState { ResetTimer(); return CpuTime; } - - // Returns nullptr in case of no local event - google::protobuf::Arena* GetArena() { - return RequestEv->GetArena(); - } }; -std::unique_ptr<TEvKqp::TEvQueryResponse> AllocQueryResponse(const std::shared_ptr<TKqpQueryState>& state) { - auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); - if (auto reqArena = state->GetArena()) { - resEv->Record.ReallocRef(reqArena); - } else { - auto arena = std::make_shared<google::protobuf::Arena>(); - resEv->Record.Realloc(arena); - } - return resEv; -} - struct TKqpCleanupCtx { std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; bool IsWaitingForWorkerToClose = false; @@ -1545,8 +1529,9 @@ public: } void ReplySuccess() { - YQL_ENSURE(QueryState); - auto resEv = AllocQueryResponse(QueryState); + auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>(); + std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena()); + resEv->Record.Realloc(arena); auto *record = &resEv->Record.GetRef(); auto *response = record->MutableResponse(); @@ -1556,6 +1541,7 @@ public: FillStats(record); + YQL_ENSURE(QueryState); if (QueryState->TxCtx) { QueryState->TxCtx->OnEndQuery(); } @@ -1605,7 +1591,6 @@ public: bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); // Result for scan query is sent directly to target actor. - Y_VERIFY(response->GetArena()); if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) { auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { @@ -1615,7 +1600,7 @@ public: YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex)); auto g = QueryState->QueryData->TypeEnv().BindAllocator(); - auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, response->GetArena()); + auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get()); std::optional<IDataProvider::TFillSettings> fillSettings; if (QueryState->PreparedQuery->ResultsSize()) { YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), "" @@ -1626,7 +1611,7 @@ public: fillSettings->RowsLimitPerWrite = result.GetRowsLimit(); } } - auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena()); + auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), arena.get()); if (useYdbResponseFormat) { ConvertKqpQueryResultToDbResult(*finalResult, response->AddYdbResults()); } else { |