diff options
| author | vvvv <[email protected]> | 2025-10-09 12:25:18 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2025-10-09 12:57:17 +0300 |
| commit | cb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch) | |
| tree | 7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/comp_nodes/mkql_queue.cpp | |
| parent | d58a8990d353b051c27e1069141117fdfde64358 (diff) | |
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_queue.cpp')
| -rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_queue.cpp | 102 |
1 files changed, 58 insertions, 44 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_queue.cpp b/yql/essentials/minikql/comp_nodes/mkql_queue.cpp index 79f2b43654d..371c62912ca 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_queue.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_queue.cpp @@ -11,7 +11,7 @@ namespace NMiniKQL { namespace { -class TQueueResource : public TComputationValue<TQueueResource> { +class TQueueResource: public TComputationValue<TQueueResource> { public: TQueueResource(TMemoryUsageInfo* memInfo, const TStringBuf& tag, TMaybe<ui64> capacity, ui64 initSize) : TComputationValue(memInfo) @@ -73,7 +73,8 @@ protected: TQueueResourceUser::TQueueResourceUser(TStringBuf&& tag, IComputationNode* resource) : Tag(tag) , Resource(resource) -{} +{ +} TSafeCircularBuffer<TUnboxedValue>& TQueueResourceUser::CheckAndGetBuffer(const TUnboxedValuePod& resource) const { return GetResource(resource).GetBuffer(); @@ -89,8 +90,9 @@ TQueueResource& TQueueResourceUser::GetResource(const TUnboxedValuePod& resource return *static_cast<TQueueResource*>(resource.GetResource()); } -class TQueueCreateWrapper : public TMutableComputationNode<TQueueCreateWrapper> { +class TQueueCreateWrapper: public TMutableComputationNode<TQueueCreateWrapper> { typedef TMutableComputationNode<TQueueCreateWrapper> TBaseComputation; + public: TQueueCreateWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TString& name, TMaybe<ui64> capacity, ui64 initSize) : TBaseComputation(mutables) @@ -98,7 +100,8 @@ public: , Name(name) , Capacity(capacity) , InitSize(initSize) - {} + { + } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { return NUdf::TUnboxedValuePod(new TQueueResource(&ctx.HolderFactory.GetMemInfo(), Name, Capacity, InitSize)); @@ -115,14 +118,16 @@ private: const ui64 InitSize; }; -class TQueuePushWrapper : public TMutableComputationNode<TQueuePushWrapper>, public TQueueResourceUser { +class TQueuePushWrapper: public TMutableComputationNode<TQueuePushWrapper>, public TQueueResourceUser { typedef TMutableComputationNode<TQueuePushWrapper> TBaseComputation; + public: TQueuePushWrapper(TComputationMutables& mutables, const TResourceType* resourceType, IComputationNode* resource, IComputationNode* value) : TBaseComputation(mutables) , TQueueResourceUser(resourceType->GetTag(), resource) , Value(value) - {} + { + } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto resource = Resource->GetValue(ctx); @@ -143,13 +148,15 @@ private: IComputationNode* const Value; }; -class TQueuePopWrapper : public TMutableComputationNode<TQueuePopWrapper>, public TQueueResourceUser { +class TQueuePopWrapper: public TMutableComputationNode<TQueuePopWrapper>, public TQueueResourceUser { typedef TMutableComputationNode<TQueuePopWrapper> TBaseComputation; + public: TQueuePopWrapper(TComputationMutables& mutables, const TResourceType* resourceType, IComputationNode* resource) : TBaseComputation(mutables) , TQueueResourceUser(resourceType->GetTag(), resource) - {} + { + } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto resource = Resource->GetValue(ctx); @@ -163,14 +170,17 @@ private: } }; -class TQueuePeekWrapper : public TMutableComputationNode<TQueuePeekWrapper>, public TQueueResourceUser { +class TQueuePeekWrapper: public TMutableComputationNode<TQueuePeekWrapper>, public TQueueResourceUser { typedef TMutableComputationNode<TQueuePeekWrapper> TBaseComputation; + public: TQueuePeekWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TResourceType* resourceType, IComputationNode* resource, IComputationNode* index) : TBaseComputation(mutables) , TQueueResourceUser(resourceType->GetTag(), resource) - , Index(index), DependentNodes(std::move(dependentNodes)) - {} + , Index(index) + , DependentNodes(std::move(dependentNodes)) + { + } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto resource = Resource->GetValue(ctx); @@ -190,12 +200,13 @@ private: const TComputationNodePtrVector DependentNodes; }; -class TQueueRangeWrapper : public TMutableComputationNode<TQueueRangeWrapper>, public TQueueResourceUser { +class TQueueRangeWrapper: public TMutableComputationNode<TQueueRangeWrapper>, public TQueueResourceUser { typedef TMutableComputationNode<TQueueRangeWrapper> TBaseComputation; + public: - class TValue : public TComputationValue<TValue>, public TQueueResourceUser { + class TValue: public TComputationValue<TValue>, public TQueueResourceUser { public: - class TIterator : public TComputationValue<TIterator>, public TQueueResourceUser { + class TIterator: public TComputationValue<TIterator>, public TQueueResourceUser { public: TIterator(TMemoryUsageInfo* memInfo, TUnboxedValue queue, size_t begin, size_t end, ui64 generation, TStringBuf tag, IComputationNode* resource) : TComputationValue<TIterator>(memInfo) @@ -211,7 +222,7 @@ public: private: bool Next(NUdf::TUnboxedValue& value) override { MKQL_ENSURE(Generation == Buffer.Generation(), - "Queue generation changed while doing QueueRange: expected " << Generation << ", got: " << Buffer.Generation()); + "Queue generation changed while doing QueueRange: expected " << Generation << ", got: " << Buffer.Generation()); if (Current >= End) { return false; } @@ -278,7 +289,8 @@ public: , Begin(begin) , End(end) , DependentNodes(std::move(dependentNodes)) - {} + { + } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { auto queue = Resource->GetValue(ctx); @@ -302,12 +314,11 @@ private: const TComputationNodePtrVector DependentNodes; }; -class TPreserveStreamValue : public TComputationValue<TPreserveStreamValue>, public TQueueResourceUser { +class TPreserveStreamValue: public TComputationValue<TPreserveStreamValue>, public TQueueResourceUser { public: using TBase = TComputationValue<TPreserveStreamValue>; - TPreserveStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream - , NUdf::TUnboxedValue&& queue, TStringBuf tag, IComputationNode* resource, ui64 outpace) + TPreserveStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream, NUdf::TUnboxedValue&& queue, TStringBuf tag, IComputationNode* resource, ui64 outpace) : TBase(memInfo) , TQueueResourceUser(std::move(tag), resource) , Stream(std::move(stream)) @@ -315,7 +326,8 @@ public: , OutpaceGoal(outpace) , Buffer(CheckAndGetBuffer(Queue)) , FrontIndex(Buffer.UsedSize()) - {} + { + } private: NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) override { @@ -332,23 +344,23 @@ private: } for (NUdf::TUnboxedValue item; State != EPreserveState::Emit && Outpace <= OutpaceGoal;) { switch (Stream.Fetch(item)) { - case NUdf::EFetchStatus::Yield: - State = EPreserveState::Yield; - return NUdf::EFetchStatus::Yield; - case NUdf::EFetchStatus::Finish: - State = EPreserveState::Emit; - break; - case NUdf::EFetchStatus::Ok: - Buffer.PushBack(std::move(item)); - if (Buffer.IsUnbounded()) { - UpdateBufferStats(Queue); - } - ++Outpace; - if (Outpace > OutpaceGoal) { - State = EPreserveState::GoOn; - } else { - State = EPreserveState::Feed; - } + case NUdf::EFetchStatus::Yield: + State = EPreserveState::Yield; + return NUdf::EFetchStatus::Yield; + case NUdf::EFetchStatus::Finish: + State = EPreserveState::Emit; + break; + case NUdf::EFetchStatus::Ok: + Buffer.PushBack(std::move(item)); + if (Buffer.IsUnbounded()) { + UpdateBufferStats(Queue); + } + ++Outpace; + if (Outpace > OutpaceGoal) { + State = EPreserveState::GoOn; + } else { + State = EPreserveState::Feed; + } } } if (!Outpace) { @@ -377,15 +389,17 @@ private: ui64 Outpace = 0; }; -class TPreserveStreamWrapper : public TMutableComputationNode<TPreserveStreamWrapper>, public TQueueResourceUser { +class TPreserveStreamWrapper: public TMutableComputationNode<TPreserveStreamWrapper>, public TQueueResourceUser { typedef TMutableComputationNode<TPreserveStreamWrapper> TBaseComputation; + public: TPreserveStreamWrapper(TComputationMutables& mutables, IComputationNode* stream, const TResourceType* resourceType, IComputationNode* resource, ui64 outpace) : TBaseComputation(mutables) , TQueueResourceUser(resourceType->GetTag(), resource) , Stream(stream) , Outpace(outpace) - {} + { + } NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { return ctx.HolderFactory.Create<TPreserveStreamValue>(Stream->GetValue(ctx), Resource->GetValue(ctx), Tag, Resource, Outpace); @@ -401,8 +415,8 @@ private: const ui64 Outpace; }; -template<class T, class...Args> -IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFactoryContext& ctx, unsigned reqArgs, Args...args) { +template <class T, class... Args> +IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFactoryContext& ctx, unsigned reqArgs, Args... args) { TComputationNodePtrVector dependentNodes(callable.GetInputsCount() - reqArgs); for (ui32 i = reqArgs; i < callable.GetInputsCount(); ++i) { dependentNodes[i - reqArgs] = LocateNode(ctx.NodeLocator, callable, i); @@ -410,7 +424,7 @@ IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFa return new T(ctx.Mutables, std::move(dependentNodes), std::forward<Args>(args)...); } -} +} // namespace IComputationNode* WrapQueueCreate(TCallable& callable, const TComputationNodeFactoryContext& ctx) { const unsigned reqArgs = 3; @@ -481,5 +495,5 @@ IComputationNode* WrapPreserveStream(TCallable& callable, const TComputationNode return new TPreserveStreamWrapper(ctx.Mutables, stream, resourceType, resource, outpace); } -} -} +} // namespace NMiniKQL +} // namespace NKikimr |
