summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes/mkql_queue.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2025-10-09 12:25:18 +0300
committervvvv <[email protected]>2025-10-09 12:57:17 +0300
commitcb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch)
tree7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/comp_nodes/mkql_queue.cpp
parentd58a8990d353b051c27e1069141117fdfde64358 (diff)
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_queue.cpp')
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_queue.cpp102
1 files changed, 58 insertions, 44 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_queue.cpp b/yql/essentials/minikql/comp_nodes/mkql_queue.cpp
index 79f2b43654d..371c62912ca 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_queue.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_queue.cpp
@@ -11,7 +11,7 @@ namespace NMiniKQL {
namespace {
-class TQueueResource : public TComputationValue<TQueueResource> {
+class TQueueResource: public TComputationValue<TQueueResource> {
public:
TQueueResource(TMemoryUsageInfo* memInfo, const TStringBuf& tag, TMaybe<ui64> capacity, ui64 initSize)
: TComputationValue(memInfo)
@@ -73,7 +73,8 @@ protected:
TQueueResourceUser::TQueueResourceUser(TStringBuf&& tag, IComputationNode* resource)
: Tag(tag)
, Resource(resource)
-{}
+{
+}
TSafeCircularBuffer<TUnboxedValue>& TQueueResourceUser::CheckAndGetBuffer(const TUnboxedValuePod& resource) const {
return GetResource(resource).GetBuffer();
@@ -89,8 +90,9 @@ TQueueResource& TQueueResourceUser::GetResource(const TUnboxedValuePod& resource
return *static_cast<TQueueResource*>(resource.GetResource());
}
-class TQueueCreateWrapper : public TMutableComputationNode<TQueueCreateWrapper> {
+class TQueueCreateWrapper: public TMutableComputationNode<TQueueCreateWrapper> {
typedef TMutableComputationNode<TQueueCreateWrapper> TBaseComputation;
+
public:
TQueueCreateWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TString& name, TMaybe<ui64> capacity, ui64 initSize)
: TBaseComputation(mutables)
@@ -98,7 +100,8 @@ public:
, Name(name)
, Capacity(capacity)
, InitSize(initSize)
- {}
+ {
+ }
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
return NUdf::TUnboxedValuePod(new TQueueResource(&ctx.HolderFactory.GetMemInfo(), Name, Capacity, InitSize));
@@ -115,14 +118,16 @@ private:
const ui64 InitSize;
};
-class TQueuePushWrapper : public TMutableComputationNode<TQueuePushWrapper>, public TQueueResourceUser {
+class TQueuePushWrapper: public TMutableComputationNode<TQueuePushWrapper>, public TQueueResourceUser {
typedef TMutableComputationNode<TQueuePushWrapper> TBaseComputation;
+
public:
TQueuePushWrapper(TComputationMutables& mutables, const TResourceType* resourceType, IComputationNode* resource, IComputationNode* value)
: TBaseComputation(mutables)
, TQueueResourceUser(resourceType->GetTag(), resource)
, Value(value)
- {}
+ {
+ }
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto resource = Resource->GetValue(ctx);
@@ -143,13 +148,15 @@ private:
IComputationNode* const Value;
};
-class TQueuePopWrapper : public TMutableComputationNode<TQueuePopWrapper>, public TQueueResourceUser {
+class TQueuePopWrapper: public TMutableComputationNode<TQueuePopWrapper>, public TQueueResourceUser {
typedef TMutableComputationNode<TQueuePopWrapper> TBaseComputation;
+
public:
TQueuePopWrapper(TComputationMutables& mutables, const TResourceType* resourceType, IComputationNode* resource)
: TBaseComputation(mutables)
, TQueueResourceUser(resourceType->GetTag(), resource)
- {}
+ {
+ }
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto resource = Resource->GetValue(ctx);
@@ -163,14 +170,17 @@ private:
}
};
-class TQueuePeekWrapper : public TMutableComputationNode<TQueuePeekWrapper>, public TQueueResourceUser {
+class TQueuePeekWrapper: public TMutableComputationNode<TQueuePeekWrapper>, public TQueueResourceUser {
typedef TMutableComputationNode<TQueuePeekWrapper> TBaseComputation;
+
public:
TQueuePeekWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& dependentNodes, const TResourceType* resourceType, IComputationNode* resource, IComputationNode* index)
: TBaseComputation(mutables)
, TQueueResourceUser(resourceType->GetTag(), resource)
- , Index(index), DependentNodes(std::move(dependentNodes))
- {}
+ , Index(index)
+ , DependentNodes(std::move(dependentNodes))
+ {
+ }
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto resource = Resource->GetValue(ctx);
@@ -190,12 +200,13 @@ private:
const TComputationNodePtrVector DependentNodes;
};
-class TQueueRangeWrapper : public TMutableComputationNode<TQueueRangeWrapper>, public TQueueResourceUser {
+class TQueueRangeWrapper: public TMutableComputationNode<TQueueRangeWrapper>, public TQueueResourceUser {
typedef TMutableComputationNode<TQueueRangeWrapper> TBaseComputation;
+
public:
- class TValue : public TComputationValue<TValue>, public TQueueResourceUser {
+ class TValue: public TComputationValue<TValue>, public TQueueResourceUser {
public:
- class TIterator : public TComputationValue<TIterator>, public TQueueResourceUser {
+ class TIterator: public TComputationValue<TIterator>, public TQueueResourceUser {
public:
TIterator(TMemoryUsageInfo* memInfo, TUnboxedValue queue, size_t begin, size_t end, ui64 generation, TStringBuf tag, IComputationNode* resource)
: TComputationValue<TIterator>(memInfo)
@@ -211,7 +222,7 @@ public:
private:
bool Next(NUdf::TUnboxedValue& value) override {
MKQL_ENSURE(Generation == Buffer.Generation(),
- "Queue generation changed while doing QueueRange: expected " << Generation << ", got: " << Buffer.Generation());
+ "Queue generation changed while doing QueueRange: expected " << Generation << ", got: " << Buffer.Generation());
if (Current >= End) {
return false;
}
@@ -278,7 +289,8 @@ public:
, Begin(begin)
, End(end)
, DependentNodes(std::move(dependentNodes))
- {}
+ {
+ }
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto queue = Resource->GetValue(ctx);
@@ -302,12 +314,11 @@ private:
const TComputationNodePtrVector DependentNodes;
};
-class TPreserveStreamValue : public TComputationValue<TPreserveStreamValue>, public TQueueResourceUser {
+class TPreserveStreamValue: public TComputationValue<TPreserveStreamValue>, public TQueueResourceUser {
public:
using TBase = TComputationValue<TPreserveStreamValue>;
- TPreserveStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream
- , NUdf::TUnboxedValue&& queue, TStringBuf tag, IComputationNode* resource, ui64 outpace)
+ TPreserveStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream, NUdf::TUnboxedValue&& queue, TStringBuf tag, IComputationNode* resource, ui64 outpace)
: TBase(memInfo)
, TQueueResourceUser(std::move(tag), resource)
, Stream(std::move(stream))
@@ -315,7 +326,8 @@ public:
, OutpaceGoal(outpace)
, Buffer(CheckAndGetBuffer(Queue))
, FrontIndex(Buffer.UsedSize())
- {}
+ {
+ }
private:
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) override {
@@ -332,23 +344,23 @@ private:
}
for (NUdf::TUnboxedValue item; State != EPreserveState::Emit && Outpace <= OutpaceGoal;) {
switch (Stream.Fetch(item)) {
- case NUdf::EFetchStatus::Yield:
- State = EPreserveState::Yield;
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Finish:
- State = EPreserveState::Emit;
- break;
- case NUdf::EFetchStatus::Ok:
- Buffer.PushBack(std::move(item));
- if (Buffer.IsUnbounded()) {
- UpdateBufferStats(Queue);
- }
- ++Outpace;
- if (Outpace > OutpaceGoal) {
- State = EPreserveState::GoOn;
- } else {
- State = EPreserveState::Feed;
- }
+ case NUdf::EFetchStatus::Yield:
+ State = EPreserveState::Yield;
+ return NUdf::EFetchStatus::Yield;
+ case NUdf::EFetchStatus::Finish:
+ State = EPreserveState::Emit;
+ break;
+ case NUdf::EFetchStatus::Ok:
+ Buffer.PushBack(std::move(item));
+ if (Buffer.IsUnbounded()) {
+ UpdateBufferStats(Queue);
+ }
+ ++Outpace;
+ if (Outpace > OutpaceGoal) {
+ State = EPreserveState::GoOn;
+ } else {
+ State = EPreserveState::Feed;
+ }
}
}
if (!Outpace) {
@@ -377,15 +389,17 @@ private:
ui64 Outpace = 0;
};
-class TPreserveStreamWrapper : public TMutableComputationNode<TPreserveStreamWrapper>, public TQueueResourceUser {
+class TPreserveStreamWrapper: public TMutableComputationNode<TPreserveStreamWrapper>, public TQueueResourceUser {
typedef TMutableComputationNode<TPreserveStreamWrapper> TBaseComputation;
+
public:
TPreserveStreamWrapper(TComputationMutables& mutables, IComputationNode* stream, const TResourceType* resourceType, IComputationNode* resource, ui64 outpace)
: TBaseComputation(mutables)
, TQueueResourceUser(resourceType->GetTag(), resource)
, Stream(stream)
, Outpace(outpace)
- {}
+ {
+ }
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
return ctx.HolderFactory.Create<TPreserveStreamValue>(Stream->GetValue(ctx), Resource->GetValue(ctx), Tag, Resource, Outpace);
@@ -401,8 +415,8 @@ private:
const ui64 Outpace;
};
-template<class T, class...Args>
-IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFactoryContext& ctx, unsigned reqArgs, Args...args) {
+template <class T, class... Args>
+IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFactoryContext& ctx, unsigned reqArgs, Args... args) {
TComputationNodePtrVector dependentNodes(callable.GetInputsCount() - reqArgs);
for (ui32 i = reqArgs; i < callable.GetInputsCount(); ++i) {
dependentNodes[i - reqArgs] = LocateNode(ctx.NodeLocator, callable, i);
@@ -410,7 +424,7 @@ IComputationNode* MakeNodeWithDeps(TCallable& callable, const TComputationNodeFa
return new T(ctx.Mutables, std::move(dependentNodes), std::forward<Args>(args)...);
}
-}
+} // namespace
IComputationNode* WrapQueueCreate(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
const unsigned reqArgs = 3;
@@ -481,5 +495,5 @@ IComputationNode* WrapPreserveStream(TCallable& callable, const TComputationNode
return new TPreserveStreamWrapper(ctx.Mutables, stream, resourceType, resource, outpace);
}
-}
-}
+} // namespace NMiniKQL
+} // namespace NKikimr