aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-02-03 16:02:35 +0300
committerulya-sidorina <yulia@ydb.tech>2023-02-03 16:02:35 +0300
commit3a523cb63883287a45f6c2f5cc6b3f40e6316b05 (patch)
tree85b63a2aee75e4574a084baa08508f46cb85607d
parentcaf6f6406909fea82e873fe60b8b473c56f02d1c (diff)
downloadydb-3a523cb63883287a45f6c2f5cc6b3f40e6316b05.tar.gz
wrap allocator into shared_ptr
fix(dq_task_runner): wrap allocator into shared_ptr
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp9
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp17
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.h3
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h6
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp155
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h1
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp47
9 files changed, 151 insertions, 91 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 09bd872c4d0..fe0224adb34 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -266,6 +266,7 @@ public:
, InputIndex(args.InputIndex)
, TypeEnv(args.TypeEnv)
, HolderFactory(args.HolderFactory)
+ , Alloc(args.Alloc)
{
TableId = TTableId(
Settings.GetTable().GetTableId().GetOwnerId(),
@@ -287,6 +288,13 @@ public:
}
}
+ virtual ~TKqpReadActor() {
+ if (!Results.empty() && Alloc) {
+ TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc);
+ Results.clear();
+ }
+ }
+
STFUNC(ReadyState) {
Y_UNUSED(ctx);
try {
@@ -981,6 +989,7 @@ private:
const ui64 InputIndex;
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
};
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 564c1b01591..b4c2f0dc3a3 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -24,9 +24,9 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
public:
TKqpStreamLookupActor(ui64 inputIndex, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- NKikimrKqp::TKqpStreamLookupSettings&& settings)
+ std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings)
: InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv)
- , HolderFactory(holderFactory), TableId(MakeTableId(settings.GetTable()))
+ , HolderFactory(holderFactory), Alloc(alloc), TableId(MakeTableId(settings.GetTable()))
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, ImmediateTx(settings.GetImmediateTx())
@@ -36,6 +36,13 @@ public:
, RetryReadTimeout(RETRY_READ_TIMEOUT) {
};
+ virtual ~TKqpStreamLookupActor() {
+ if (Input.HasValue() && Alloc) {
+ TGuard<NMiniKQL::TScopedAlloc> allocGuard(*Alloc);
+ Input.Clear();
+ }
+ }
+
void Bootstrap() {
ResolveTable();
@@ -624,6 +631,7 @@ private:
const NActors::TActorId ComputeActorId;
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
const TTableId TableId;
IKqpGateway::TKqpSnapshot Snapshot;
const TMaybe<ui64> LockTxId;
@@ -647,8 +655,9 @@ private:
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
- const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings) {
- auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory,
+ const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
+ NKikimrKqp::TKqpStreamLookupSettings&& settings) {
+ auto actor = new TKqpStreamLookupActor(inputIndex, input, computeActorId, typeEnv, holderFactory, alloc,
std::move(settings));
return {actor, actor};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
index 59d8d83b875..51205c88baa 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
@@ -8,7 +8,8 @@ namespace NKqp {
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
- const NMiniKQL::THolderFactory& holderFactory, NKikimrKqp::TKqpStreamLookupSettings&& settings);
+ const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
+ NKikimrKqp::TKqpStreamLookupSettings&& settings);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
index dab7ce8f5c1..df6b8d5cfbd 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
@@ -8,7 +8,7 @@ void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory) {
factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [](NKikimrKqp::TKqpStreamLookupSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
return CreateStreamLookupActor(args.InputIndex, args.TransformInput, args.ComputeActorId, args.TypeEnv,
- args.HolderFactory, std::move(settings));
+ args.HolderFactory, args.Alloc, std::move(settings));
});
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 19c3c1d6122..b823da97c27 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -174,6 +174,7 @@ public:
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
::NMonitoring::TDynamicCounterPtr TaskCounters;
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
};
struct TSinkArguments {
@@ -201,6 +202,7 @@ public:
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc;
};
struct TOutputTransformArguments {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 64d3d11547a..d52073182e9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1524,7 +1524,8 @@ protected:
.ComputeActorId = this->SelfId(),
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
- .TaskCounters = TaskCounters
+ .TaskCounters = TaskCounters,
+ .Alloc = TaskRunner ? TaskRunner->GetAllocatorPtr() : nullptr
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create source " << inputDesc.GetSource().GetType() << ": " << ex.what();
@@ -1552,7 +1553,8 @@ protected:
.ComputeActorId = this->SelfId(),
.TypeEnv = typeEnv,
.HolderFactory = holderFactory,
- .ProgramBuilder = *transform.ProgramBuilder
+ .ProgramBuilder = *transform.ProgramBuilder,
+ .Alloc = TaskRunner->GetAllocatorPtr()
});
} catch (const std::exception& ex) {
throw yexception() << "Failed to create input transform " << inputDesc.GetTransform().GetType() << ": " << ex.what();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 79bb5cc96c2..383a5c0d164 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -218,6 +218,7 @@ public:
: Context(context)
, Settings(settings)
, LogFunc(logFunc)
+ , AllocatedHolder(std::make_optional<TAllocatedHolder>())
, CollectBasicStats(Settings.CollectBasicStats)
, CollectProfileStats(Settings.CollectProfileStats)
{
@@ -231,12 +232,15 @@ public:
}
if (!Context.Alloc) {
- SelfAlloc = std::make_unique<TScopedAlloc>(__LOCATION__, TAlignedPagePoolCounters(),
- Context.FuncRegistry->SupportsSizedAllocators());
+ SelfAlloc = std::shared_ptr<TScopedAlloc>(new TScopedAlloc(__LOCATION__, TAlignedPagePoolCounters(),
+ Context.FuncRegistry->SupportsSizedAllocators()), [](TScopedAlloc* ptr) {
+ ptr->Acquire();
+ delete ptr;
+ });
}
if (!Context.TypeEnv) {
- SelfTypeEnv = std::make_unique<TTypeEnvironment>(Context.Alloc ? *Context.Alloc : *SelfAlloc);
+ AllocatedHolder->SelfTypeEnv = std::make_unique<TTypeEnvironment>(Context.Alloc ? *Context.Alloc : *SelfAlloc);
}
if (SelfAlloc) {
@@ -247,6 +251,8 @@ public:
~TDqTaskRunner() {
if (SelfAlloc) {
SelfAlloc->Acquire();
+ AllocatedHolder.reset();
+ SelfAlloc->Release();
}
}
@@ -273,12 +279,12 @@ public:
return nullptr;
};
- if (Y_UNLIKELY(CollectProfileStats && !ProgramParsed.StatsRegistry)) {
- ProgramParsed.StatsRegistry = NMiniKQL::CreateDefaultStatsRegistry();
+ if (Y_UNLIKELY(CollectProfileStats && !AllocatedHolder->ProgramParsed.StatsRegistry)) {
+ AllocatedHolder->ProgramParsed.StatsRegistry = NMiniKQL::CreateDefaultStatsRegistry();
}
TComputationPatternOpts opts(alloc.Ref(), typeEnv, taskRunnerFactory,
Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi,
- ProgramParsed.StatsRegistry.Get());
+ AllocatedHolder->ProgramParsed.StatsRegistry.Get());
if (!SecureParamsProvider) {
SecureParamsProvider = MakeSimpleSecureParamsProvider(Settings.SecureParams);
@@ -414,20 +420,20 @@ public:
entry = CreateComputationPattern(task, program.GetRaw());
}
- ProgramParsed.PatternCacheEntry = entry;
+ AllocatedHolder->ProgramParsed.PatternCacheEntry = entry;
// clone pattern using TDqTaskRunner's alloc
auto opts = CreatePatternOpts(Alloc(), TypeEnv());
- ProgramParsed.CompGraph = ProgramParsed.GetPattern()->Clone(
+ AllocatedHolder->ProgramParsed.CompGraph = AllocatedHolder->ProgramParsed.GetPattern()->Clone(
opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &TypeEnv()));
- TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
+ TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator());
- auto paramNode = ProgramParsed.CompGraph->GetEntryPoint(entry->ProgramInputsCount, /* require */ false);
+ auto paramNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(entry->ProgramInputsCount, /* require */ false);
if (paramNode) {
// TODO: Remove serialized parameters that are used in OLAP program and not used in current program
- const auto& graphHolderFactory = ProgramParsed.CompGraph->GetHolderFactory();
+ const auto& graphHolderFactory = AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory();
NUdf::TUnboxedValue* structMembers;
auto paramsCount = entry->ParamsStruct->GetMembersCount();
auto paramsStructValue = graphHolderFactory.CreateDirectArrayHolder(paramsCount, structMembers);
@@ -454,7 +460,7 @@ public:
}
}
- paramNode->SetValue(ProgramParsed.CompGraph->GetContext(), std::move(paramsStructValue));
+ paramNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(), std::move(paramsStructValue));
} else {
/*
* This situation is ok, when there are OLAP parameters only. There is no parameter node
@@ -480,8 +486,8 @@ public:
LOG(TStringBuilder() << "Prepare task: " << TaskId);
auto startTime = TInstant::Now();
- auto& holderFactory = ProgramParsed.CompGraph->GetHolderFactory();
- TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
+ auto& holderFactory = AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory();
+ TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator());
auto& typeEnv = TypeEnv();
@@ -493,7 +499,7 @@ public:
TType** inputType = &entry->InputItemTypes[i];
if (inputDesc.HasTransform()) {
const auto& transformDesc = inputDesc.GetTransform();
- transform = &InputTransforms[i];
+ transform = &AllocatedHolder->InputTransforms[i];
Y_VERIFY(!transform->TransformInput);
Y_VERIFY(!transform->TransformOutput);
@@ -527,7 +533,7 @@ public:
if (inputDesc.HasSource()) {
auto source = CreateDqAsyncInputBuffer(i, *inputType,
memoryLimits.ChannelBufferSize, Settings.CollectProfileStats);
- auto [_, inserted] = Sources.emplace(i, source);
+ auto [_, inserted] = AllocatedHolder->Sources.emplace(i, source);
Y_VERIFY(inserted);
inputs.emplace_back(source);
} else {
@@ -536,21 +542,21 @@ public:
auto inputChannel = CreateDqInputChannel(channelId, *inputType,
memoryLimits.ChannelBufferSize, Settings.CollectProfileStats, typeEnv, holderFactory,
inputChannelDesc.GetTransportVersion());
- auto ret = InputChannels.emplace(channelId, inputChannel);
+ auto ret = AllocatedHolder->InputChannels.emplace(channelId, inputChannel);
YQL_ENSURE(ret.second, "task: " << TaskId << ", duplicated input channelId: " << channelId);
inputs.emplace_back(inputChannel);
}
}
- auto entryNode = ProgramParsed.CompGraph->GetEntryPoint(i, true);
+ auto entryNode = AllocatedHolder->ProgramParsed.CompGraph->GetEntryPoint(i, true);
if (transform) {
transform->TransformInput = DqBuildInputValue(inputDesc, transform->TransformInputType, std::move(inputs), holderFactory);
inputs.clear();
inputs.emplace_back(transform->TransformOutput);
- entryNode->SetValue(ProgramParsed.CompGraph->GetContext(),
+ entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
CreateInputUnionValue(std::move(inputs), holderFactory));
} else {
- entryNode->SetValue(ProgramParsed.CompGraph->GetContext(),
+ entryNode->SetValue(AllocatedHolder->ProgramParsed.CompGraph->GetContext(),
DqBuildInputValue(inputDesc, entry->InputItemTypes[i], std::move(inputs), holderFactory));
}
}
@@ -568,7 +574,7 @@ public:
TType** taskOutputType = &entry->OutputItemTypes[i];
if (outputDesc.HasTransform()) {
const auto& transformDesc = outputDesc.GetTransform();
- transform = &OutputTransforms[i];
+ transform = &AllocatedHolder->OutputTransforms[i];
Y_VERIFY(!transform->TransformInput);
Y_VERIFY(!transform->TransformOutput);
@@ -593,7 +599,7 @@ public:
if (outputDesc.HasSink()) {
auto sink = CreateDqAsyncOutputBuffer(i, *taskOutputType, memoryLimits.ChannelBufferSize,
Settings.CollectProfileStats);
- auto [_, inserted] = Sinks.emplace(i, sink);
+ auto [_, inserted] = AllocatedHolder->Sinks.emplace(i, sink);
Y_VERIFY(inserted);
outputs.emplace_back(sink);
} else {
@@ -615,7 +621,7 @@ public:
auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, typeEnv,
holderFactory, settings, LogFunc);
- auto ret = OutputChannels.emplace(channelId, outputChannel);
+ auto ret = AllocatedHolder->OutputChannels.emplace(channelId, outputChannel);
YQL_ENSURE(ret.second, "task: " << TaskId << ", duplicated output channelId: " << channelId);
outputs.emplace_back(outputChannel);
}
@@ -638,12 +644,12 @@ public:
}
if (outputConsumers.empty()) {
- Output = nullptr;
+ AllocatedHolder->Output = nullptr;
} else if (outputConsumers.size() == 1) {
- Output = std::move(outputConsumers[0]);
+ AllocatedHolder->Output = std::move(outputConsumers[0]);
} else {
auto guard = BindAllocator();
- Output = CreateOutputMultiConsumer(std::move(outputConsumers));
+ AllocatedHolder->Output = CreateOutputMultiConsumer(std::move(outputConsumers));
}
auto prepareTime = TInstant::Now() - startTime;
@@ -659,13 +665,13 @@ public:
if (Stats) {
Stats->BuildCpuTime += prepareTime;
- for (auto&[channelId, inputChannel] : InputChannels) {
+ for (auto&[channelId, inputChannel] : AllocatedHolder->InputChannels) {
Stats->InputChannels.emplace(channelId, inputChannel->GetStats());
}
- for (auto&[inputIndex, source] : Sources) {
+ for (auto&[inputIndex, source] : AllocatedHolder->Sources) {
Stats->Sources.emplace(inputIndex, source->GetStats());
}
- for (auto&[channelId, outputChannel] : OutputChannels) {
+ for (auto&[channelId, outputChannel] : AllocatedHolder->OutputChannels) {
Stats->OutputChannels.emplace(channelId, outputChannel->GetStats());
}
}
@@ -673,10 +679,10 @@ public:
ERunStatus Run() final {
LOG(TStringBuilder() << "Run task: " << TaskId);
- if (!ResultStream) {
+ if (!AllocatedHolder->ResultStream) {
auto guard = BindAllocator();
- TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
- ResultStream = ProgramParsed.CompGraph->GetValue();
+ TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator());
+ AllocatedHolder->ResultStream = AllocatedHolder->ProgramParsed.CompGraph->GetValue();
}
RunComputeTime = TDuration::Zero();
@@ -689,9 +695,9 @@ public:
if (Y_UNLIKELY(CollectProfileStats)) {
Stats->ComputeCpuTimeByRun->Collect(RunComputeTime.MilliSeconds());
- if (ProgramParsed.StatsRegistry) {
+ if (AllocatedHolder->ProgramParsed.StatsRegistry) {
Stats->MkqlStats.clear();
- ProgramParsed.StatsRegistry->ForEachStat([this](const TStatKey& key, i64 value) {
+ AllocatedHolder->ProgramParsed.StatsRegistry->ForEachStat([this](const TStatKey& key, i64 value) {
Stats->MkqlStats.emplace_back(TMkqlStat{key, value});
});
}
@@ -732,43 +738,43 @@ public:
}
IDqInputChannel::TPtr GetInputChannel(ui64 channelId) override {
- auto ptr = InputChannels.FindPtr(channelId);
+ auto ptr = AllocatedHolder->InputChannels.FindPtr(channelId);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have input channelId: " << channelId);
return *ptr;
}
IDqAsyncInputBuffer::TPtr GetSource(ui64 inputIndex) override {
- auto ptr = Sources.FindPtr(inputIndex);
+ auto ptr = AllocatedHolder->Sources.FindPtr(inputIndex);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have input index: " << inputIndex);
return *ptr;
}
IDqOutputChannel::TPtr GetOutputChannel(ui64 channelId) override {
- auto ptr = OutputChannels.FindPtr(channelId);
+ auto ptr = AllocatedHolder->OutputChannels.FindPtr(channelId);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have output channelId: " << channelId);
return *ptr;
}
IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override {
- auto ptr = Sinks.FindPtr(outputIndex);
+ auto ptr = AllocatedHolder->Sinks.FindPtr(outputIndex);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex);
return *ptr;
}
std::pair<NUdf::TUnboxedValue, IDqAsyncInputBuffer::TPtr> GetInputTransform(ui64 inputIndex) override {
- auto ptr = InputTransforms.FindPtr(inputIndex);
+ auto ptr = AllocatedHolder->InputTransforms.FindPtr(inputIndex);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have input index: " << inputIndex << " or such transform");
return {ptr->TransformInput, ptr->TransformOutput};
}
std::pair<IDqAsyncOutputBuffer::TPtr, IDqOutputConsumer::TPtr> GetOutputTransform(ui64 outputIndex) override {
- auto ptr = OutputTransforms.FindPtr(outputIndex);
+ auto ptr = AllocatedHolder->OutputTransforms.FindPtr(outputIndex);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have output index: " << outputIndex << " or such transform");
return {ptr->TransformInput, ptr->TransformOutput};
}
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit = {}) override {
- auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : SelfTypeEnv->BindAllocator();
+ auto guard = Context.TypeEnv ? Context.TypeEnv->BindAllocator() : AllocatedHolder->SelfTypeEnv->BindAllocator();
if (memoryLimit) {
guard.GetMutex()->SetLimit(*memoryLimit);
}
@@ -776,15 +782,19 @@ public:
}
bool IsAllocatorAttached() override {
- return Context.TypeEnv ? Context.TypeEnv->GetAllocator().IsAttached() : SelfTypeEnv->GetAllocator().IsAttached();
+ return Context.TypeEnv ? Context.TypeEnv->GetAllocator().IsAttached() : AllocatedHolder->SelfTypeEnv->GetAllocator().IsAttached();
}
const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const override {
- return Context.TypeEnv ? *Context.TypeEnv : *SelfTypeEnv;
+ return Context.TypeEnv ? *Context.TypeEnv : *AllocatedHolder->SelfTypeEnv;
}
const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const override {
- return ProgramParsed.CompGraph->GetHolderFactory();
+ return AllocatedHolder->ProgramParsed.CompGraph->GetHolderFactory();
+ }
+
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override {
+ return SelfAlloc;
}
const THashMap<TString, TString>& GetSecureParams() const override {
@@ -810,17 +820,17 @@ public:
}
TString Save() const override {
- return ProgramParsed.CompGraph->SaveGraphState();
+ return AllocatedHolder->ProgramParsed.CompGraph->SaveGraphState();
}
void Load(TStringBuf in) override {
- Y_VERIFY(!ResultStream);
- ProgramParsed.CompGraph->LoadGraphState(in);
+ Y_VERIFY(!AllocatedHolder->ResultStream);
+ AllocatedHolder->ProgramParsed.CompGraph->LoadGraphState(in);
}
private:
NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv() {
- return Context.TypeEnv ? *Context.TypeEnv : *SelfTypeEnv;
+ return Context.TypeEnv ? *Context.TypeEnv : *AllocatedHolder->SelfTypeEnv;
}
NKikimr::NMiniKQL::TScopedAlloc& Alloc() {
@@ -829,16 +839,16 @@ private:
void FinishImpl() {
LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
- Output->Finish();
+ AllocatedHolder->Output->Finish();
}
ERunStatus FetchAndDispatch() {
- if (!Output) {
+ if (!AllocatedHolder->Output) {
LOG("no consumers, Finish execution");
return ERunStatus::Finished;
}
- TBindTerminator term(ProgramParsed.CompGraph->GetTerminator());
+ TBindTerminator term(AllocatedHolder->ProgramParsed.CompGraph->GetTerminator());
auto startComputeTime = TInstant::Now();
Y_DEFER {
@@ -852,15 +862,15 @@ private:
};
auto guard = BindAllocator();
- if (Output->IsFinishing()) {
- if (Output->TryFinish()) {
+ if (AllocatedHolder->Output->IsFinishing()) {
+ if (AllocatedHolder->Output->TryFinish()) {
FinishImpl();
return ERunStatus::Finished;
} else {
return ERunStatus::PendingOutput;
}
}
- while (!Output->IsFull()) {
+ while (!AllocatedHolder->Output->IsFull()) {
if (Y_UNLIKELY(CollectProfileStats)) {
auto now = TInstant::Now();
StopWaitingOutput(now);
@@ -868,15 +878,15 @@ private:
}
NUdf::TUnboxedValue value;
- auto fetchStatus = ResultStream.Fetch(value);
+ auto fetchStatus = AllocatedHolder->ResultStream.Fetch(value);
switch (fetchStatus) {
case NUdf::EFetchStatus::Ok: {
- Output->Consume(std::move(value));
+ AllocatedHolder->Output->Consume(std::move(value));
break;
}
case NUdf::EFetchStatus::Finish: {
- if (!Output->TryFinish()) {
+ if (!AllocatedHolder->Output->TryFinish()) {
break;
}
FinishImpl();
@@ -898,8 +908,7 @@ private:
TLogFunc LogFunc;
std::unique_ptr<NUdf::ISecureParamsProvider> SecureParamsProvider;
- std::unique_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc
- std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> SelfTypeEnv; // if not set -> use Context.TypeEnv
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> SelfAlloc; // if not set -> use Context.Alloc
struct TInputTransformInfo {
NUdf::TUnboxedValue TransformInput;
@@ -922,16 +931,24 @@ private:
return PatternCacheEntry->Pattern.Get();
}
};
- TProgramParsed ProgramParsed;
-
- THashMap<ui64, IDqInputChannel::TPtr> InputChannels; // Channel id -> Channel
- THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources; // Input index -> Source
- THashMap<ui64, TInputTransformInfo> InputTransforms; // Output index -> Transform
- THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel
- THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink
- THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform
- IDqOutputConsumer::TPtr Output;
- NUdf::TUnboxedValue ResultStream;
+
+ struct TAllocatedHolder {
+ std::unique_ptr<NKikimr::NMiniKQL::TTypeEnvironment> SelfTypeEnv; // if not set -> use Context.TypeEnv
+
+ TProgramParsed ProgramParsed;
+
+ THashMap<ui64, IDqInputChannel::TPtr> InputChannels; // Channel id -> Channel
+ THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources; // Input index -> Source
+ THashMap<ui64, TInputTransformInfo> InputTransforms; // Output index -> Transform
+ THashMap<ui64, IDqOutputChannel::TPtr> OutputChannels; // Channel id -> Channel
+ THashMap<ui64, IDqAsyncOutputBuffer::TPtr> Sinks; // Output index -> Sink
+ THashMap<ui64, TOutputTransformInfo> OutputTransforms; // Output index -> Transform
+
+ IDqOutputConsumer::TPtr Output;
+ NUdf::TUnboxedValue ResultStream;
+ };
+
+ std::optional<TAllocatedHolder> AllocatedHolder;
NKikimr::NMiniKQL::TWatermark Watermark;
bool TaskHasEffects = false;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 262fe77413e..0e6960b126c 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -311,6 +311,7 @@ public:
virtual bool IsAllocatorAttached() = 0;
virtual const NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnv() const = 0;
virtual const NKikimr::NMiniKQL::THolderFactory& GetHolderFactory() const = 0;
+ virtual std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const = 0;
virtual const THashMap<TString, TString>& GetSecureParams() const = 0;
virtual const THashMap<TString, TString>& GetTaskParams() const = 0;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index fda87142148..f8efe2d3d36 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1244,10 +1244,9 @@ public:
const TString& traceId)
: TraceId(traceId)
, Task(task)
- , Alloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true)
- , TypeEnv(Alloc)
- , MemInfo("TDqTaskRunnerProxy")
- , HolderFactory(Alloc.Ref(), MemInfo)
+ , Alloc(new NKikimr::NMiniKQL::TScopedAlloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), true),
+ [](NKikimr::NMiniKQL::TScopedAlloc* ptr) { ptr->Acquire(); delete ptr; })
+ , AllocatedHolder(std::make_optional<TAllocatedHolder>(*Alloc, "TDqTaskRunnerProxy"))
, Running(true)
, Command(std::move(command))
, StderrReader(MakeHolder<TThread>([this] () { ReadStderr(); }))
@@ -1256,13 +1255,15 @@ public:
, TaskId(Task.GetId())
, StageId(stageId)
{
- Alloc.Release();
+ Alloc->Release();
StderrReader->Start();
InitTaskMeta();
}
~TTaskRunner() {
- Alloc.Acquire();
+ Alloc->Acquire();
+ AllocatedHolder.reset();
+ Alloc->Release();
Command->Kill();
Command->Wait(TDuration::Seconds(0));
}
@@ -1343,11 +1344,15 @@ public:
}
const NMiniKQL::TTypeEnvironment& GetTypeEnv() const override {
- return TypeEnv;
+ return AllocatedHolder->TypeEnv;
}
const NMiniKQL::THolderFactory& GetHolderFactory() const override {
- return HolderFactory;
+ return AllocatedHolder->HolderFactory;
+ }
+
+ std::shared_ptr<NMiniKQL::TScopedAlloc> GetAllocatorPtr() const {
+ return Alloc;
}
const THashMap<TString, TString>& GetSecureParams() const override {
@@ -1359,7 +1364,7 @@ public:
}
TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(TMaybe<ui64> memoryLimit) override {
- auto guard = TypeEnv.BindAllocator();
+ auto guard = AllocatedHolder->TypeEnv.BindAllocator();
if (memoryLimit) {
guard.GetMutex()->SetLimit(*memoryLimit);
}
@@ -1367,7 +1372,7 @@ public:
}
bool IsAllocatorAttached() override {
- return TypeEnv.GetAllocator().IsAttached();
+ return AllocatedHolder->TypeEnv.GetAllocator().IsAttached();
}
void Kill() override {
@@ -1435,10 +1440,20 @@ private:
THashMap<TString, TString> SecureParams;
THashMap<TString, TString> TaskParams;
- NKikimr::NMiniKQL::TScopedAlloc Alloc;
- NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
- NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo;
- NKikimr::NMiniKQL::THolderFactory HolderFactory;
+ std::shared_ptr <NKikimr::NMiniKQL::TScopedAlloc> Alloc;
+
+ struct TAllocatedHolder {
+ TAllocatedHolder(NKikimr::NMiniKQL::TScopedAlloc& alloc, const TStringBuf& memInfoTitle)
+ : TypeEnv(alloc)
+ , MemInfo(memInfoTitle)
+ , HolderFactory(alloc.Ref(), MemInfo) {}
+
+ NKikimr::NMiniKQL::TTypeEnvironment TypeEnv;
+ NKikimr::NMiniKQL::TMemoryUsageInfo MemInfo;
+ NKikimr::NMiniKQL::THolderFactory HolderFactory;
+ };
+
+ std::optional<TAllocatedHolder> AllocatedHolder;
std::atomic<bool> Running;
int Code = -1;
@@ -1565,6 +1580,10 @@ public:
return Delegate->GetHolderFactory();
}
+ std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> GetAllocatorPtr() const override {
+ return Delegate->GetAllocatorPtr();
+ }
+
const THashMap<TString, TString>& GetSecureParams() const override {
return Delegate->GetSecureParams();
}