aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-07-02 19:19:56 +0300
committergvit <gvit@ydb.tech>2023-07-02 19:19:56 +0300
commit995eb3873b3cee8fb2c969a27d8871d67909393d (patch)
tree0b80fc5749a13250c81810919b865598a4495d03
parent46348e368ed43863ad93eb203f05b920103b60d1 (diff)
downloadydb-995eb3873b3cee8fb2c969a27d8871d67909393d.tar.gz
fix use after free part 1
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_stats.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.cpp22
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.h4
4 files changed, 18 insertions, 18 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index a2754c85b9..ceda9211b7 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -991,9 +991,9 @@ protected:
IActor* proxy;
if (txResult.IsStream) {
proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
- txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats.get(), this->SelfId());
+ txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats, this->SelfId());
} else {
- proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(),
+ proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats, this->SelfId(),
channel.DstInputIndex, ResponseEv.get());
}
@@ -1070,7 +1070,7 @@ protected:
const TString Database;
const TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TKqpRequestCounters::TPtr Counters;
- std::unique_ptr<TQueryExecutionStats> Stats;
+ std::shared_ptr<TQueryExecutionStats> Stats;
TInstant StartTime;
TMaybe<TInstant> Deadline;
TMaybe<TInstant> CancelAt;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
index 5cae2ad4aa..f4bd4770ed 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h
@@ -25,8 +25,8 @@ public:
// basic stats
std::unordered_set<ui64> AffectedShards;
ui32 TotalTasks = 0;
- ui64 ResultBytes = 0;
- ui64 ResultRows = 0;
+ std::atomic<ui64> ResultBytes = 0;
+ std::atomic<ui64> ResultRows = 0;
TDuration ExecuterCpuTime;
TInstant StartTs;
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
index 27901c2365..0ffcf1d7c6 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
@@ -33,11 +33,11 @@ public:
return NKikimrServices::TActivity::KQP_RESULT_CHANNEL_PROXY;
}
- TResultCommonChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, TActorId executer)
+ TResultCommonChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer)
: TActor(&TResultCommonChannelProxy::WorkState)
, TxId(txId)
, ChannelId(channelId)
- , Stats(stats)
+ , Stats(std::move(stats))
, Executer(executer) {}
protected:
@@ -130,7 +130,7 @@ private:
private:
const ui64 TxId;
const ui64 ChannelId;
- TQueryExecutionStats* Stats; // owned by KqpExecuter
+ std::shared_ptr<TQueryExecutionStats> Stats; // owned by KqpExecuter
const NActors::TActorId Executer;
NActors::TActorId ComputeActor;
};
@@ -138,9 +138,9 @@ private:
class TResultStreamChannelProxy : public TResultCommonChannelProxy {
public:
TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats,
+ const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, std::shared_ptr<TQueryExecutionStats> stats,
TActorId executer)
- : TResultCommonChannelProxy(txId, channelId, stats, executer)
+ : TResultCommonChannelProxy(txId, channelId, std::move(stats), executer)
, ColumnOrder(columnOrder)
, ItemType(itemType)
, QueryResultIndex(queryResultIndex)
@@ -179,9 +179,9 @@ private:
class TResultDataChannelProxy : public TResultCommonChannelProxy {
public:
- TResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, TActorId executer,
+ TResultDataChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer,
ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultReceiver)
- : TResultCommonChannelProxy(txId, channelId, stats, executer)
+ : TResultCommonChannelProxy(txId, channelId, std::move(stats), executer)
, InputIndex(inputIndex)
, ResultReceiver(resultReceiver) {}
@@ -212,7 +212,7 @@ private:
} // anonymous namespace end
NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats,
+ const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, std::shared_ptr<TQueryExecutionStats> stats,
TActorId executer)
{
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
@@ -221,11 +221,11 @@ NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKiki
);
return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target,
- stats, executer);
+ std::move(stats), executer);
}
NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
- TQueryExecutionStats* stats, TActorId executer,
+ std::shared_ptr<TQueryExecutionStats> stats, TActorId executer,
ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultsReceiver)
{
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
@@ -233,7 +233,7 @@ NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
", channelId: " << channelId
);
- return new TResultDataChannelProxy(txId, channelId, stats, executer, inputIndex, resultsReceiver);
+ return new TResultDataChannelProxy(txId, channelId, std::move(stats), executer, inputIndex, resultsReceiver);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h
index 9589b1b267..ec7cf2b91e 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.h
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h
@@ -26,10 +26,10 @@ struct TQueryExecutionStats;
struct TKqpExecuterTxResult;
NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, TQueryExecutionStats* stats,
+ const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, std::shared_ptr<TQueryExecutionStats> stats,
NActors::TActorId executer);
-NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats,
+NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats,
NActors::TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* receiver);
} // namespace NKikimr::NKqp