diff options
author | vvvv <[email protected]> | 2025-10-08 11:41:14 +0300 |
---|---|---|
committer | vvvv <[email protected]> | 2025-10-08 12:20:42 +0300 |
commit | d73f13cfdb331365ddad0da51ec36e0a3e4cf187 (patch) | |
tree | 88cab10170ce9aa3389be7f1a09247386dcf5ebd /yql/essentials/public/purecalc/common/worker.cpp | |
parent | f377d8ad9e0741cd904c1d4934afdf24af517d93 (diff) |
YQL-20086 public
commit_hash:68b0c2e9c2960587af7d57ecedcb38f4d05890b7
Diffstat (limited to 'yql/essentials/public/purecalc/common/worker.cpp')
-rw-r--r-- | yql/essentials/public/purecalc/common/worker.cpp | 124 |
1 files changed, 56 insertions, 68 deletions
diff --git a/yql/essentials/public/purecalc/common/worker.cpp b/yql/essentials/public/purecalc/common/worker.cpp index d5561fc4f73..e6a32ff4118 100644 --- a/yql/essentials/public/purecalc/common/worker.cpp +++ b/yql/essentials/public/purecalc/common/worker.cpp @@ -46,15 +46,12 @@ TWorkerGraph::TWorkerGraph( ui64 nativeYtTypeFlags, TMaybe<ui64> deterministicTimeProviderSeed, TLangVersion langver, - bool insideEvaluation -) + bool insideEvaluation) : ScopedAlloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators()) , Env(ScopedAlloc) , FuncRegistry(funcRegistry) , RandomProvider(CreateDefaultRandomProvider()) - , TimeProvider(deterministicTimeProviderSeed ? - CreateDeterministicTimeProvider(*deterministicTimeProviderSeed) : - CreateDefaultTimeProvider()) + , TimeProvider(deterministicTimeProviderSeed ? CreateDeterministicTimeProvider(*deterministicTimeProviderSeed) : CreateDefaultTimeProvider()) , LLVMSettings(LLVMSettings) , NativeYtTypeFlags(nativeYtTypeFlags) { @@ -124,19 +121,16 @@ TWorkerGraph::TWorkerGraph( const THashSet<NKikimr::NMiniKQL::TInternName> selfCallableNames = { Env.InternName(PurecalcInputCallableName), - Env.InternName(PurecalcBlockInputCallableName) - }; + Env.InternName(PurecalcBlockInputCallableName)}; NKikimr::NMiniKQL::TExploringNodeVisitor explorer; explorer.Walk(rootNode.GetNode(), Env.GetNodeStack()); auto compositeNodeFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory( - {NKikimr::NMiniKQL::GetYqlFactory(), NYql::GetPgFactory()} - ); + {NKikimr::NMiniKQL::GetYqlFactory(), NYql::GetPgFactory()}); auto nodeFactory = [&]( - NKikimr::NMiniKQL::TCallable& callable, const NKikimr::NMiniKQL::TComputationNodeFactoryContext& ctx - ) -> NKikimr::NMiniKQL::IComputationNode* { + NKikimr::NMiniKQL::TCallable& callable, const NKikimr::NMiniKQL::TComputationNodeFactoryContext& ctx) -> NKikimr::NMiniKQL::IComputationNode* { if (selfCallableNames.contains(callable.GetType()->GetNameStr())) { if (insideEvaluation) { throw TErrorException(0) << "Inputs aren't available during evaluation"; @@ -147,8 +141,7 @@ TWorkerGraph::TWorkerGraph( YQL_ENSURE(inputIndex < inputsCount, "Self index is out of range"); YQL_ENSURE(!SelfNodes[inputIndex], "Self can be called at most once with each index"); return SelfNodes[inputIndex] = new NKikimr::NMiniKQL::TExternalComputationNode(ctx.Mutables); - } - else { + } else { return compositeNodeFactory(callable, ctx); } }; @@ -171,7 +164,7 @@ TWorkerGraph::TWorkerGraph( ComputationPattern = NKikimr::NMiniKQL::MakeComputationPattern( explorer, rootNode, - { rootNode.GetNode() }, + {rootNode.GetNode()}, computationPatternOpts); ComputationGraph = ComputationPattern->Clone( @@ -206,12 +199,11 @@ TWorker<TBase>::TWorker( NKikimr::NUdf::ICountersProvider* countersProvider, ui64 nativeYtTypeFlags, TMaybe<ui64> deterministicTimeProviderSeed, - TLangVersion langver -) + TLangVersion langver) : WorkerFactory_(std::move(factory)) , Graph_(exprRoot, exprCtx, serializedProgram, funcRegistry, userData, - inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType, - LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed, langver, false) + inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType, + LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed, langver, false) { } @@ -404,9 +396,9 @@ NKikimr::NUdf::TUnboxedValue& TPullStreamWorker::GetOutput() { } void TPullStreamWorker::Release() { - with_lock(GetScopedAlloc()) { + with_lock (GetScopedAlloc()) { Output_ = NKikimr::NUdf::TUnboxedValue::Invalid(); - for (auto selfNode: Graph_.SelfNodes) { + for (auto selfNode : Graph_.SelfNodes) { if (selfNode) { selfNode->SetValue(Graph_.ComputationGraph->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid()); } @@ -477,11 +469,11 @@ void TPullListWorker::ResetOutputIterator() { } void TPullListWorker::Release() { - with_lock(GetScopedAlloc()) { + with_lock (GetScopedAlloc()) { Output_ = NKikimr::NUdf::TUnboxedValue::Invalid(); OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid(); - for (auto selfNode: Graph_.SelfNodes) { + for (auto selfNode : Graph_.SelfNodes) { if (selfNode) { selfNode->SetValue(Graph_.ComputationGraph->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid()); } @@ -492,45 +484,45 @@ void TPullListWorker::Release() { } namespace { - class TPushStream final: public NKikimr::NMiniKQL::TCustomListValue { - private: - mutable bool HasIterator_ = false; - bool HasValue_ = false; - bool IsFinished_ = false; - NKikimr::NUdf::TUnboxedValue Value_ = NKikimr::NUdf::TUnboxedValue::Invalid(); - - public: - using TCustomListValue::TCustomListValue; - - public: - void SetValue(NKikimr::NUdf::TUnboxedValue&& value) { - Value_ = std::move(value); - HasValue_ = true; - } +class TPushStream final: public NKikimr::NMiniKQL::TCustomListValue { +private: + mutable bool HasIterator_ = false; + bool HasValue_ = false; + bool IsFinished_ = false; + NKikimr::NUdf::TUnboxedValue Value_ = NKikimr::NUdf::TUnboxedValue::Invalid(); + +public: + using TCustomListValue::TCustomListValue; + +public: + void SetValue(NKikimr::NUdf::TUnboxedValue&& value) { + Value_ = std::move(value); + HasValue_ = true; + } - void SetFinished() { - IsFinished_ = true; - } + void SetFinished() { + IsFinished_ = true; + } - NKikimr::NUdf::TUnboxedValue GetListIterator() const override { - YQL_ENSURE(!HasIterator_, "only one pass over input is supported"); - HasIterator_ = true; - return NKikimr::NUdf::TUnboxedValuePod(const_cast<TPushStream*>(this)); - } + NKikimr::NUdf::TUnboxedValue GetListIterator() const override { + YQL_ENSURE(!HasIterator_, "only one pass over input is supported"); + HasIterator_ = true; + return NKikimr::NUdf::TUnboxedValuePod(const_cast<TPushStream*>(this)); + } - NKikimr::NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) override { - if (IsFinished_) { - return NKikimr::NUdf::EFetchStatus::Finish; - } else if (!HasValue_) { - return NKikimr::NUdf::EFetchStatus::Yield; - } else { - result = std::move(Value_); - HasValue_ = false; - return NKikimr::NUdf::EFetchStatus::Ok; - } + NKikimr::NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) override { + if (IsFinished_) { + return NKikimr::NUdf::EFetchStatus::Finish; + } else if (!HasValue_) { + return NKikimr::NUdf::EFetchStatus::Yield; + } else { + result = std::move(Value_); + HasValue_ = false; + return NKikimr::NUdf::EFetchStatus::Ok; } - }; -} + } +}; +} // namespace void TPushStreamWorker::FeedToConsumer() { auto value = Graph_.ComputationGraph->GetValue(); @@ -608,7 +600,7 @@ void TPushStreamWorker::OnFinish() { } void TPushStreamWorker::Release() { - with_lock(GetScopedAlloc()) { + with_lock (GetScopedAlloc()) { Consumer_.Destroy(); if (SelfNode_) { SelfNode_->SetValue(Graph_.ComputationGraph->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid()); @@ -619,16 +611,12 @@ void TPushStreamWorker::Release() { TWorker<IPushStreamWorker>::Release(); } - namespace NYql { - namespace NPureCalc { - template - class TWorker<IPullStreamWorker>; +namespace NPureCalc { +template class TWorker<IPullStreamWorker>; - template - class TWorker<IPullListWorker>; +template class TWorker<IPullListWorker>; - template - class TWorker<IPushStreamWorker>; - } -} +template class TWorker<IPushStreamWorker>; +} // namespace NPureCalc +} // namespace NYql |