1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
#include "mkql_replicate.h"
#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/computation/mkql_custom_list.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
namespace NKikimr {
namespace NMiniKQL {
namespace {
class TIterableWrapper : public TMutableComputationNode<TIterableWrapper> {
typedef TMutableComputationNode<TIterableWrapper> TBaseComputation;
public:
class TValue : public TCustomListValue {
public:
class TIterator : public TComputationValue<TIterator> {
public:
TIterator(TMemoryUsageInfo* memInfo, const NUdf::TUnboxedValue& stream)
: TComputationValue<TIterator>(memInfo)
, Stream(stream)
{}
private:
bool Next(NUdf::TUnboxedValue& value) override {
auto status = Stream.Fetch(value);
MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
return status != NUdf::EFetchStatus::Finish;
}
bool Skip() override {
NUdf::TUnboxedValue value;
auto status = Stream.Fetch(value);
MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
return status != NUdf::EFetchStatus::Finish;
}
NUdf::TUnboxedValue Stream;
};
TValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx, IComputationNode* stream, IComputationExternalNode* arg)
: TCustomListValue(memInfo)
, Ctx(ctx)
, Stream(stream)
, Arg(arg)
{
}
private:
NUdf::TUnboxedValue GetListIterator() const override {
auto stream = NewStream();
return Ctx.HolderFactory.Create<TIterator>(stream);
}
bool HasFastListLength() const override {
return Length.Defined();
}
ui64 GetListLength() const override {
if (!Length) {
auto stream = NewStream();
NUdf::TUnboxedValue item;
ui64 n = 0;
for (;;) {
auto status = stream.Fetch(item);
MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
if (status == NUdf::EFetchStatus::Finish) {
break;
}
++n;
}
Length = n;
}
return *Length;
}
ui64 GetEstimatedListLength() const override {
return GetListLength();
}
bool HasListItems() const override {
if (!HasItems) {
if (Length) {
HasItems = *Length > 0;
} else {
auto stream = NewStream();
NUdf::TUnboxedValue item;
auto status = stream.Fetch(item);
MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Yield is not supported");
HasItems = (status != NUdf::EFetchStatus::Finish);
}
}
return *HasItems;
}
NUdf::TUnboxedValue NewStream() const {
Arg->SetValue(Ctx, NUdf::TUnboxedValue());
return Stream->GetValue(Ctx);
}
TComputationContext& Ctx;
IComputationNode* const Stream;
IComputationExternalNode* const Arg;
mutable TMaybe<ui64> Length;
mutable TMaybe<bool> HasItems;
};
TIterableWrapper(TComputationMutables& mutables, IComputationNode* stream, IComputationExternalNode* arg)
: TBaseComputation(mutables)
, Stream(stream)
, Arg(arg)
{
}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
return ctx.HolderFactory.Create<TValue>(ctx, Stream, Arg);
}
private:
void RegisterDependencies() const final {
DependsOn(Stream);
Arg->AddDependence(Stream);
}
IComputationNode* const Stream;
IComputationExternalNode* const Arg;
};
}
IComputationNode* WrapIterable(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args");
const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
const auto arg = LocateExternalNode(ctx.NodeLocator, callable, 1);
return new TIterableWrapper(ctx.Mutables, stream, arg);
}
}
}
|