summaryrefslogtreecommitdiffstats
path: root/yql/essentials/public/purecalc/common/worker.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2025-10-08 11:41:14 +0300
committervvvv <[email protected]>2025-10-08 12:20:42 +0300
commitd73f13cfdb331365ddad0da51ec36e0a3e4cf187 (patch)
tree88cab10170ce9aa3389be7f1a09247386dcf5ebd /yql/essentials/public/purecalc/common/worker.cpp
parentf377d8ad9e0741cd904c1d4934afdf24af517d93 (diff)
YQL-20086 public
commit_hash:68b0c2e9c2960587af7d57ecedcb38f4d05890b7
Diffstat (limited to 'yql/essentials/public/purecalc/common/worker.cpp')
-rw-r--r--yql/essentials/public/purecalc/common/worker.cpp124
1 files changed, 56 insertions, 68 deletions
diff --git a/yql/essentials/public/purecalc/common/worker.cpp b/yql/essentials/public/purecalc/common/worker.cpp
index d5561fc4f73..e6a32ff4118 100644
--- a/yql/essentials/public/purecalc/common/worker.cpp
+++ b/yql/essentials/public/purecalc/common/worker.cpp
@@ -46,15 +46,12 @@ TWorkerGraph::TWorkerGraph(
ui64 nativeYtTypeFlags,
TMaybe<ui64> deterministicTimeProviderSeed,
TLangVersion langver,
- bool insideEvaluation
-)
+ bool insideEvaluation)
: ScopedAlloc(__LOCATION__, NKikimr::TAlignedPagePoolCounters(), funcRegistry.SupportsSizedAllocators())
, Env(ScopedAlloc)
, FuncRegistry(funcRegistry)
, RandomProvider(CreateDefaultRandomProvider())
- , TimeProvider(deterministicTimeProviderSeed ?
- CreateDeterministicTimeProvider(*deterministicTimeProviderSeed) :
- CreateDefaultTimeProvider())
+ , TimeProvider(deterministicTimeProviderSeed ? CreateDeterministicTimeProvider(*deterministicTimeProviderSeed) : CreateDefaultTimeProvider())
, LLVMSettings(LLVMSettings)
, NativeYtTypeFlags(nativeYtTypeFlags)
{
@@ -124,19 +121,16 @@ TWorkerGraph::TWorkerGraph(
const THashSet<NKikimr::NMiniKQL::TInternName> selfCallableNames = {
Env.InternName(PurecalcInputCallableName),
- Env.InternName(PurecalcBlockInputCallableName)
- };
+ Env.InternName(PurecalcBlockInputCallableName)};
NKikimr::NMiniKQL::TExploringNodeVisitor explorer;
explorer.Walk(rootNode.GetNode(), Env.GetNodeStack());
auto compositeNodeFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory(
- {NKikimr::NMiniKQL::GetYqlFactory(), NYql::GetPgFactory()}
- );
+ {NKikimr::NMiniKQL::GetYqlFactory(), NYql::GetPgFactory()});
auto nodeFactory = [&](
- NKikimr::NMiniKQL::TCallable& callable, const NKikimr::NMiniKQL::TComputationNodeFactoryContext& ctx
- ) -> NKikimr::NMiniKQL::IComputationNode* {
+ NKikimr::NMiniKQL::TCallable& callable, const NKikimr::NMiniKQL::TComputationNodeFactoryContext& ctx) -> NKikimr::NMiniKQL::IComputationNode* {
if (selfCallableNames.contains(callable.GetType()->GetNameStr())) {
if (insideEvaluation) {
throw TErrorException(0) << "Inputs aren't available during evaluation";
@@ -147,8 +141,7 @@ TWorkerGraph::TWorkerGraph(
YQL_ENSURE(inputIndex < inputsCount, "Self index is out of range");
YQL_ENSURE(!SelfNodes[inputIndex], "Self can be called at most once with each index");
return SelfNodes[inputIndex] = new NKikimr::NMiniKQL::TExternalComputationNode(ctx.Mutables);
- }
- else {
+ } else {
return compositeNodeFactory(callable, ctx);
}
};
@@ -171,7 +164,7 @@ TWorkerGraph::TWorkerGraph(
ComputationPattern = NKikimr::NMiniKQL::MakeComputationPattern(
explorer,
rootNode,
- { rootNode.GetNode() },
+ {rootNode.GetNode()},
computationPatternOpts);
ComputationGraph = ComputationPattern->Clone(
@@ -206,12 +199,11 @@ TWorker<TBase>::TWorker(
NKikimr::NUdf::ICountersProvider* countersProvider,
ui64 nativeYtTypeFlags,
TMaybe<ui64> deterministicTimeProviderSeed,
- TLangVersion langver
-)
+ TLangVersion langver)
: WorkerFactory_(std::move(factory))
, Graph_(exprRoot, exprCtx, serializedProgram, funcRegistry, userData,
- inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType,
- LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed, langver, false)
+ inputTypes, originalInputTypes, rawInputTypes, outputType, rawOutputType,
+ LLVMSettings, countersProvider, nativeYtTypeFlags, deterministicTimeProviderSeed, langver, false)
{
}
@@ -404,9 +396,9 @@ NKikimr::NUdf::TUnboxedValue& TPullStreamWorker::GetOutput() {
}
void TPullStreamWorker::Release() {
- with_lock(GetScopedAlloc()) {
+ with_lock (GetScopedAlloc()) {
Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
- for (auto selfNode: Graph_.SelfNodes) {
+ for (auto selfNode : Graph_.SelfNodes) {
if (selfNode) {
selfNode->SetValue(Graph_.ComputationGraph->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
}
@@ -477,11 +469,11 @@ void TPullListWorker::ResetOutputIterator() {
}
void TPullListWorker::Release() {
- with_lock(GetScopedAlloc()) {
+ with_lock (GetScopedAlloc()) {
Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid();
- for (auto selfNode: Graph_.SelfNodes) {
+ for (auto selfNode : Graph_.SelfNodes) {
if (selfNode) {
selfNode->SetValue(Graph_.ComputationGraph->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
}
@@ -492,45 +484,45 @@ void TPullListWorker::Release() {
}
namespace {
- class TPushStream final: public NKikimr::NMiniKQL::TCustomListValue {
- private:
- mutable bool HasIterator_ = false;
- bool HasValue_ = false;
- bool IsFinished_ = false;
- NKikimr::NUdf::TUnboxedValue Value_ = NKikimr::NUdf::TUnboxedValue::Invalid();
-
- public:
- using TCustomListValue::TCustomListValue;
-
- public:
- void SetValue(NKikimr::NUdf::TUnboxedValue&& value) {
- Value_ = std::move(value);
- HasValue_ = true;
- }
+class TPushStream final: public NKikimr::NMiniKQL::TCustomListValue {
+private:
+ mutable bool HasIterator_ = false;
+ bool HasValue_ = false;
+ bool IsFinished_ = false;
+ NKikimr::NUdf::TUnboxedValue Value_ = NKikimr::NUdf::TUnboxedValue::Invalid();
+
+public:
+ using TCustomListValue::TCustomListValue;
+
+public:
+ void SetValue(NKikimr::NUdf::TUnboxedValue&& value) {
+ Value_ = std::move(value);
+ HasValue_ = true;
+ }
- void SetFinished() {
- IsFinished_ = true;
- }
+ void SetFinished() {
+ IsFinished_ = true;
+ }
- NKikimr::NUdf::TUnboxedValue GetListIterator() const override {
- YQL_ENSURE(!HasIterator_, "only one pass over input is supported");
- HasIterator_ = true;
- return NKikimr::NUdf::TUnboxedValuePod(const_cast<TPushStream*>(this));
- }
+ NKikimr::NUdf::TUnboxedValue GetListIterator() const override {
+ YQL_ENSURE(!HasIterator_, "only one pass over input is supported");
+ HasIterator_ = true;
+ return NKikimr::NUdf::TUnboxedValuePod(const_cast<TPushStream*>(this));
+ }
- NKikimr::NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) override {
- if (IsFinished_) {
- return NKikimr::NUdf::EFetchStatus::Finish;
- } else if (!HasValue_) {
- return NKikimr::NUdf::EFetchStatus::Yield;
- } else {
- result = std::move(Value_);
- HasValue_ = false;
- return NKikimr::NUdf::EFetchStatus::Ok;
- }
+ NKikimr::NUdf::EFetchStatus Fetch(NKikimr::NUdf::TUnboxedValue& result) override {
+ if (IsFinished_) {
+ return NKikimr::NUdf::EFetchStatus::Finish;
+ } else if (!HasValue_) {
+ return NKikimr::NUdf::EFetchStatus::Yield;
+ } else {
+ result = std::move(Value_);
+ HasValue_ = false;
+ return NKikimr::NUdf::EFetchStatus::Ok;
}
- };
-}
+ }
+};
+} // namespace
void TPushStreamWorker::FeedToConsumer() {
auto value = Graph_.ComputationGraph->GetValue();
@@ -608,7 +600,7 @@ void TPushStreamWorker::OnFinish() {
}
void TPushStreamWorker::Release() {
- with_lock(GetScopedAlloc()) {
+ with_lock (GetScopedAlloc()) {
Consumer_.Destroy();
if (SelfNode_) {
SelfNode_->SetValue(Graph_.ComputationGraph->GetContext(), NKikimr::NUdf::TUnboxedValue::Invalid());
@@ -619,16 +611,12 @@ void TPushStreamWorker::Release() {
TWorker<IPushStreamWorker>::Release();
}
-
namespace NYql {
- namespace NPureCalc {
- template
- class TWorker<IPullStreamWorker>;
+namespace NPureCalc {
+template class TWorker<IPullStreamWorker>;
- template
- class TWorker<IPullListWorker>;
+template class TWorker<IPullListWorker>;
- template
- class TWorker<IPushStreamWorker>;
- }
-}
+template class TWorker<IPushStreamWorker>;
+} // namespace NPureCalc
+} // namespace NYql