diff options
author | gvit <gvit@ydb.tech> | 2023-07-02 19:19:56 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2023-07-02 19:19:56 +0300 |
commit | 995eb3873b3cee8fb2c969a27d8871d67909393d (patch) | |
tree | 0b80fc5749a13250c81810919b865598a4495d03 | |
parent | 46348e368ed43863ad93eb203f05b920103b60d1 (diff) | |
download | ydb-995eb3873b3cee8fb2c969a27d8871d67909393d.tar.gz |
fix use after free part 1
4 files changed, 18 insertions, 18 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index a2754c85b9..ceda9211b7 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -991,9 +991,9 @@ protected: IActor* proxy; if (txResult.IsStream) { proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType, - txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats.get(), this->SelfId()); + txResult.ColumnOrder, txResult.QueryResultIndex, Target, Stats, this->SelfId()); } else { - proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats.get(), this->SelfId(), + proxy = CreateResultDataChannelProxy(TxId, channel.Id, Stats, this->SelfId(), channel.DstInputIndex, ResponseEv.get()); } @@ -1070,7 +1070,7 @@ protected: const TString Database; const TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TKqpRequestCounters::TPtr Counters; - std::unique_ptr<TQueryExecutionStats> Stats; + std::shared_ptr<TQueryExecutionStats> Stats; TInstant StartTime; TMaybe<TInstant> Deadline; TMaybe<TInstant> CancelAt; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.h b/ydb/core/kqp/executer_actor/kqp_executer_stats.h index 5cae2ad4aa..f4bd4770ed 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.h @@ -25,8 +25,8 @@ public: // basic stats std::unordered_set<ui64> AffectedShards; ui32 TotalTasks = 0; - ui64 ResultBytes = 0; - ui64 ResultRows = 0; + std::atomic<ui64> ResultBytes = 0; + std::atomic<ui64> ResultRows = 0; TDuration ExecuterCpuTime; TInstant StartTs; diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp index 27901c2365..0ffcf1d7c6 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp @@ -33,11 +33,11 @@ public: return NKikimrServices::TActivity::KQP_RESULT_CHANNEL_PROXY; } - TResultCommonChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, TActorId executer) + TResultCommonChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer) : TActor(&TResultCommonChannelProxy::WorkState) , TxId(txId) , ChannelId(channelId) - , Stats(stats) + , Stats(std::move(stats)) , Executer(executer) {} protected: @@ -130,7 +130,7 @@ private: private: const ui64 TxId; const ui64 ChannelId; - TQueryExecutionStats* Stats; // owned by KqpExecuter + std::shared_ptr<TQueryExecutionStats> Stats; // owned by KqpExecuter const NActors::TActorId Executer; NActors::TActorId ComputeActor; }; @@ -138,9 +138,9 @@ private: class TResultStreamChannelProxy : public TResultCommonChannelProxy { public: TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats, + const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer) - : TResultCommonChannelProxy(txId, channelId, stats, executer) + : TResultCommonChannelProxy(txId, channelId, std::move(stats), executer) , ColumnOrder(columnOrder) , ItemType(itemType) , QueryResultIndex(queryResultIndex) @@ -179,9 +179,9 @@ private: class TResultDataChannelProxy : public TResultCommonChannelProxy { public: - TResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, TActorId executer, + TResultDataChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultReceiver) - : TResultCommonChannelProxy(txId, channelId, stats, executer) + : TResultCommonChannelProxy(txId, channelId, std::move(stats), executer) , InputIndex(inputIndex) , ResultReceiver(resultReceiver) {} @@ -212,7 +212,7 @@ private: } // anonymous namespace end NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, TQueryExecutionStats* stats, + const TVector<ui32>* columnOrder, ui32 queryResultIndex, TActorId target, std::shared_ptr<TQueryExecutionStats> stats, TActorId executer) { LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, @@ -221,11 +221,11 @@ NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKiki ); return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, queryResultIndex, target, - stats, executer); + std::move(stats), executer); } NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, - TQueryExecutionStats* stats, TActorId executer, + std::shared_ptr<TQueryExecutionStats> stats, TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultsReceiver) { LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, @@ -233,7 +233,7 @@ NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, ", channelId: " << channelId ); - return new TResultDataChannelProxy(txId, channelId, stats, executer, inputIndex, resultsReceiver); + return new TResultDataChannelProxy(txId, channelId, std::move(stats), executer, inputIndex, resultsReceiver); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h index 9589b1b267..ec7cf2b91e 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.h +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.h @@ -26,10 +26,10 @@ struct TQueryExecutionStats; struct TKqpExecuterTxResult; NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, TQueryExecutionStats* stats, + const TVector<ui32>* columnOrder, ui32 queryResultIndex, NActors::TActorId target, std::shared_ptr<TQueryExecutionStats> stats, NActors::TActorId executer); -NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TQueryExecutionStats* stats, +NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, std::shared_ptr<TQueryExecutionStats> stats, NActors::TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* receiver); } // namespace NKikimr::NKqp |