aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-03-15 21:52:00 +0300
committergvit <gvit@ydb.tech>2023-03-15 21:52:00 +0300
commit375bf331910edc492b41e0bae27b5ed1dd334ac4 (patch)
tree25d0ca91ac499e6598d3ad7db668934c46282825
parented5086c2dbf543b595a317eb344d87a72fa0aa94 (diff)
downloadydb-375bf331910edc492b41e0bae27b5ed1dd334ac4.tar.gz
Revert commit rXXXXXX
-rw-r--r--ydb/core/grpc_services/base/base.h3
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp8
-rw-r--r--ydb/core/kqp/common/kqp.h13
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp27
4 files changed, 12 insertions, 39 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 07d5733d0e9..a16444b08d7 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -393,8 +393,6 @@ public:
virtual const TMaybe<TString> GetRequestType() const = 0;
// Implementation must be thread safe
virtual void SetClientLostAction(std::function<void()>&& cb) = 0;
- // Allocation is thread safe. https://protobuf.dev/reference/cpp/arenas/#thread-safety
- virtual google::protobuf::Arena* GetArena() = 0;
};
// Request context
@@ -407,6 +405,7 @@ class IRequestCtx
public:
virtual google::protobuf::Message* GetRequestMut() = 0;
+ virtual google::protobuf::Arena* GetArena() = 0;
virtual void SetRuHeader(ui64 ru) = 0;
virtual void AddServerHint(const TString& hint) = 0;
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index ad7ba11da1e..1f9cd9b5ee6 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -165,7 +165,7 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
- auto& record = ev->Get()->Record.GetRef();
+ const auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
@@ -176,11 +176,7 @@ public:
try {
if (kqpResponse.GetYdbResults().size()) {
- Y_VERIFY_DEBUG(!kqpResponse.GetYdbResults().GetArena() ||
- queryResult->mutable_result_sets()->GetArena() == kqpResponse.GetYdbResults().GetArena());
- // https://protobuf.dev/reference/cpp/arenas/#swap
- // Actualy will be copy in case pf remote execution
- queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
+ queryResult->mutable_result_sets()->CopyFrom(kqpResponse.GetYdbResults());
} else {
NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 9f62f520410..1f03ca80f14 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -377,10 +377,6 @@ struct TEvKqp {
return RequestCtx ? RequestActorId : ActorIdFromProto(Record.GetRequestActorId());
}
- google::protobuf::Arena* GetArena() {
- return RequestCtx ? RequestCtx->GetArena() : nullptr;
- }
-
const TString& GetTraceId() const {
if (RequestCtx) {
if (!TraceId) {
@@ -579,17 +575,14 @@ struct TEvKqp {
}
void Realloc(std::shared_ptr<google::protobuf::Arena> arena) {
- ReallocRef(arena.get());
- Arena_ = arena;
- }
-
- void ReallocRef(google::protobuf::Arena* arena) {
// Allow realloc only if previous allocation was made using "normal" allocator
// and no data was writen. It prevents ineffective using of protobuf.
Y_ASSERT(!Protobuf_->GetArena());
Y_ASSERT(ByteSize() == 0);
delete Protobuf_;
- Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena);
+ Protobuf_ = google::protobuf::Arena::CreateMessage<TProto>(arena.get());
+ // Make sure arena is alive
+ Arena_ = arena;
}
bool ParseFromString(const TString& data) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 0f114edc8b1..ef0721cc1ff 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -239,24 +239,8 @@ struct TKqpQueryState {
ResetTimer();
return CpuTime;
}
-
- // Returns nullptr in case of no local event
- google::protobuf::Arena* GetArena() {
- return RequestEv->GetArena();
- }
};
-std::unique_ptr<TEvKqp::TEvQueryResponse> AllocQueryResponse(const std::shared_ptr<TKqpQueryState>& state) {
- auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
- if (auto reqArena = state->GetArena()) {
- resEv->Record.ReallocRef(reqArena);
- } else {
- auto arena = std::make_shared<google::protobuf::Arena>();
- resEv->Record.Realloc(arena);
- }
- return resEv;
-}
-
struct TKqpCleanupCtx {
std::deque<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
bool IsWaitingForWorkerToClose = false;
@@ -1545,8 +1529,9 @@ public:
}
void ReplySuccess() {
- YQL_ENSURE(QueryState);
- auto resEv = AllocQueryResponse(QueryState);
+ auto resEv = std::make_unique<TEvKqp::TEvQueryResponse>();
+ std::shared_ptr<google::protobuf::Arena> arena(new google::protobuf::Arena());
+ resEv->Record.Realloc(arena);
auto *record = &resEv->Record.GetRef();
auto *response = record->MutableResponse();
@@ -1556,6 +1541,7 @@ public:
FillStats(record);
+ YQL_ENSURE(QueryState);
if (QueryState->TxCtx) {
QueryState->TxCtx->OnEndQuery();
}
@@ -1605,7 +1591,6 @@ public:
bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
// Result for scan query is sent directly to target actor.
- Y_VERIFY(response->GetArena());
if (QueryState->PreparedQuery && !QueryState->IsStreamResult()) {
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
@@ -1615,7 +1600,7 @@ public:
YQL_ENSURE(QueryState->QueryData->HasResult(txIndex, resultIndex));
auto g = QueryState->QueryData->TypeEnv().BindAllocator();
- auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, response->GetArena());
+ auto* protoRes = QueryState->QueryData->GetMkqlTxResult(txIndex, resultIndex, arena.get());
std::optional<IDataProvider::TFillSettings> fillSettings;
if (QueryState->PreparedQuery->ResultsSize()) {
YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
@@ -1626,7 +1611,7 @@ public:
fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
}
}
- auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
+ auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), arena.get());
if (useYdbResponseFormat) {
ConvertKqpQueryResultToDbResult(*finalResult, response->AddYdbResults());
} else {