aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/pyrsistent/py2/pyrsistent/_pdeque.py
blob: 5147b3fa6adbee0abc9006bc35ed5f1a23466abb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
from ._compat import Sequence, Hashable
from itertools import islice, chain
from numbers import Integral
from pyrsistent._plist import plist


class PDeque(object):
    """
    Persistent double ended queue (deque). Allows quick appends and pops in both ends. Implemented
    using two persistent lists.

    A maximum length can be specified to create a bounded queue.

    Fully supports the Sequence and Hashable protocols including indexing and slicing but
    if you need fast random access go for the PVector instead.

    Do not instantiate directly, instead use the factory functions :py:func:`dq` or :py:func:`pdeque` to
    create an instance.

    Some examples:

    >>> x = pdeque([1, 2, 3])
    >>> x.left
    1
    >>> x.right
    3
    >>> x[0] == x.left
    True
    >>> x[-1] == x.right
    True
    >>> x.pop()
    pdeque([1, 2])
    >>> x.pop() == x[:-1]
    True
    >>> x.popleft()
    pdeque([2, 3])
    >>> x.append(4)
    pdeque([1, 2, 3, 4])
    >>> x.appendleft(4)
    pdeque([4, 1, 2, 3])

    >>> y = pdeque([1, 2, 3], maxlen=3)
    >>> y.append(4)
    pdeque([2, 3, 4], maxlen=3)
    >>> y.appendleft(4)
    pdeque([4, 1, 2], maxlen=3)
    """
    __slots__ = ('_left_list', '_right_list', '_length', '_maxlen', '__weakref__')

    def __new__(cls, left_list, right_list, length, maxlen=None):
        instance = super(PDeque, cls).__new__(cls)
        instance._left_list = left_list
        instance._right_list = right_list
        instance._length = length

        if maxlen is not None:
            if not isinstance(maxlen, Integral):
                raise TypeError('An integer is required as maxlen')

            if maxlen < 0:
                raise ValueError("maxlen must be non-negative")

        instance._maxlen = maxlen
        return instance

    @property
    def right(self):
        """
        Rightmost element in dqueue.
        """
        return PDeque._tip_from_lists(self._right_list, self._left_list)

    @property
    def left(self):
        """
        Leftmost element in dqueue.
        """
        return PDeque._tip_from_lists(self._left_list, self._right_list)

    @staticmethod
    def _tip_from_lists(primary_list, secondary_list):
        if primary_list:
            return primary_list.first

        if secondary_list:
            return secondary_list[-1]

        raise IndexError('No elements in empty deque')

    def __iter__(self):
        return chain(self._left_list, self._right_list.reverse())

    def __repr__(self):
        return "pdeque({0}{1})".format(list(self),
                                       ', maxlen={0}'.format(self._maxlen) if self._maxlen is not None else '')
    __str__ = __repr__

    @property
    def maxlen(self):
        """
        Maximum length of the queue.
        """
        return self._maxlen

    def pop(self, count=1):
        """
        Return new deque with rightmost element removed. Popping the empty queue
        will return the empty queue. A optional count can be given to indicate the
        number of elements to pop. Popping with a negative index is the same as
        popleft. Executes in amortized O(k) where k is the number of elements to pop.

        >>> pdeque([1, 2]).pop()
        pdeque([1])
        >>> pdeque([1, 2]).pop(2)
        pdeque([])
        >>> pdeque([1, 2]).pop(-1)
        pdeque([2])
        """
        if count < 0:
            return self.popleft(-count)

        new_right_list, new_left_list = PDeque._pop_lists(self._right_list, self._left_list, count)
        return PDeque(new_left_list, new_right_list, max(self._length - count, 0), self._maxlen)

    def popleft(self, count=1):
        """
        Return new deque with leftmost element removed. Otherwise functionally
        equivalent to pop().

        >>> pdeque([1, 2]).popleft()
        pdeque([2])
        """
        if count < 0:
            return self.pop(-count)

        new_left_list, new_right_list = PDeque._pop_lists(self._left_list, self._right_list, count)
        return PDeque(new_left_list, new_right_list, max(self._length - count, 0), self._maxlen)

    @staticmethod
    def _pop_lists(primary_list, secondary_list, count):
        new_primary_list = primary_list
        new_secondary_list = secondary_list

        while count > 0 and (new_primary_list or new_secondary_list):
            count -= 1
            if new_primary_list.rest:
                new_primary_list = new_primary_list.rest
            elif new_primary_list:
                new_primary_list = new_secondary_list.reverse()
                new_secondary_list = plist()
            else:
                new_primary_list = new_secondary_list.reverse().rest
                new_secondary_list = plist()

        return new_primary_list, new_secondary_list

    def _is_empty(self):
        return not self._left_list and not self._right_list

    def __lt__(self, other):
        if not isinstance(other, PDeque):
            return NotImplemented

        return tuple(self) < tuple(other)

    def __eq__(self, other):
        if not isinstance(other, PDeque):
            return NotImplemented

        if tuple(self) == tuple(other):
            # Sanity check of the length value since it is redundant (there for performance)
            assert len(self) == len(other)
            return True

        return False

    def __hash__(self):
        return  hash(tuple(self))

    def __len__(self):
        return self._length

    def append(self, elem):
        """
        Return new deque with elem as the rightmost element.

        >>> pdeque([1, 2]).append(3)
        pdeque([1, 2, 3])
        """
        new_left_list, new_right_list, new_length = self._append(self._left_list, self._right_list, elem)
        return PDeque(new_left_list, new_right_list, new_length, self._maxlen)

    def appendleft(self, elem):
        """
        Return new deque with elem as the leftmost element.

        >>> pdeque([1, 2]).appendleft(3)
        pdeque([3, 1, 2])
        """
        new_right_list, new_left_list, new_length = self._append(self._right_list, self._left_list, elem)
        return PDeque(new_left_list, new_right_list, new_length, self._maxlen)

    def _append(self, primary_list, secondary_list, elem):
        if self._maxlen is not None and self._length == self._maxlen:
            if self._maxlen == 0:
                return primary_list, secondary_list, 0
            new_primary_list, new_secondary_list = PDeque._pop_lists(primary_list, secondary_list, 1)
            return new_primary_list, new_secondary_list.cons(elem), self._length

        return primary_list, secondary_list.cons(elem), self._length + 1

    @staticmethod
    def _extend_list(the_list, iterable):
        count = 0
        for elem in iterable:
            the_list = the_list.cons(elem)
            count += 1

        return the_list, count

    def _extend(self, primary_list, secondary_list, iterable):
        new_primary_list, extend_count = PDeque._extend_list(primary_list, iterable)
        new_secondary_list = secondary_list
        current_len = self._length + extend_count
        if self._maxlen is not None and current_len > self._maxlen:
            pop_len = current_len - self._maxlen
            new_secondary_list, new_primary_list = PDeque._pop_lists(new_secondary_list, new_primary_list, pop_len)
            extend_count -= pop_len

        return new_primary_list, new_secondary_list, extend_count

    def extend(self, iterable):
        """
        Return new deque with all elements of iterable appended to the right.

        >>> pdeque([1, 2]).extend([3, 4])
        pdeque([1, 2, 3, 4])
        """
        new_right_list, new_left_list, extend_count = self._extend(self._right_list, self._left_list, iterable)
        return PDeque(new_left_list, new_right_list, self._length + extend_count, self._maxlen)

    def extendleft(self, iterable):
        """
        Return new deque with all elements of iterable appended to the left.

        NB! The elements will be inserted in reverse order compared to the order in the iterable.

        >>> pdeque([1, 2]).extendleft([3, 4])
        pdeque([4, 3, 1, 2])
        """
        new_left_list, new_right_list, extend_count = self._extend(self._left_list, self._right_list, iterable)
        return PDeque(new_left_list, new_right_list, self._length + extend_count, self._maxlen)

    def count(self, elem):
        """
        Return the number of elements equal to elem present in the queue

        >>> pdeque([1, 2, 1]).count(1)
        2
        """
        return self._left_list.count(elem) + self._right_list.count(elem)

    def remove(self, elem):
        """
        Return new deque with first element from left equal to elem removed. If no such element is found
        a ValueError is raised.

        >>> pdeque([2, 1, 2]).remove(2)
        pdeque([1, 2])
        """
        try:
            return PDeque(self._left_list.remove(elem), self._right_list, self._length - 1)
        except ValueError:
            # Value not found in left list, try the right list
            try:
                # This is severely inefficient with a double reverse, should perhaps implement a remove_last()?
                return PDeque(self._left_list,
                               self._right_list.reverse().remove(elem).reverse(), self._length - 1)
            except ValueError:
                raise ValueError('{0} not found in PDeque'.format(elem))

    def reverse(self):
        """
        Return reversed deque.

        >>> pdeque([1, 2, 3]).reverse()
        pdeque([3, 2, 1])

        Also supports the standard python reverse function.

        >>> reversed(pdeque([1, 2, 3]))
        pdeque([3, 2, 1])
        """
        return PDeque(self._right_list, self._left_list, self._length)
    __reversed__ = reverse

    def rotate(self, steps):
        """
        Return deque with elements rotated steps steps.

        >>> x = pdeque([1, 2, 3])
        >>> x.rotate(1)
        pdeque([3, 1, 2])
        >>> x.rotate(-2)
        pdeque([3, 1, 2])
        """
        popped_deque = self.pop(steps)
        if steps >= 0:
            return popped_deque.extendleft(islice(self.reverse(), steps))

        return popped_deque.extend(islice(self, -steps))

    def __reduce__(self):
        # Pickling support
        return pdeque, (list(self), self._maxlen)

    def __getitem__(self, index):
        if isinstance(index, slice):
            if index.step is not None and index.step != 1:
                # Too difficult, no structural sharing possible
                return pdeque(tuple(self)[index], maxlen=self._maxlen)

            result = self
            if index.start is not None:
                result = result.popleft(index.start % self._length)
            if index.stop is not None:
                result = result.pop(self._length - (index.stop % self._length))

            return result

        if not isinstance(index, Integral):
            raise TypeError("'%s' object cannot be interpreted as an index" % type(index).__name__)

        if index >= 0:
            return self.popleft(index).left

        shifted = len(self) + index
        if shifted < 0:
            raise IndexError(
                "pdeque index {0} out of range {1}".format(index, len(self)),
            )
        return self.popleft(shifted).left

    index = Sequence.index

Sequence.register(PDeque)
Hashable.register(PDeque)


def pdeque(iterable=(), maxlen=None):
    """
    Return deque containing the elements of iterable. If maxlen is specified then
    len(iterable) - maxlen elements are discarded from the left to if len(iterable) > maxlen.

    >>> pdeque([1, 2, 3])
    pdeque([1, 2, 3])
    >>> pdeque([1, 2, 3, 4], maxlen=2)
    pdeque([3, 4], maxlen=2)
    """
    t = tuple(iterable)
    if maxlen is not None:
        t = t[-maxlen:]
    length = len(t)
    pivot = int(length / 2)
    left = plist(t[:pivot])
    right = plist(t[pivot:], reverse=True)
    return PDeque(left, right, length, maxlen)

def dq(*elements):
    """
    Return deque containing all arguments.

    >>> dq(1, 2, 3)
    pdeque([1, 2, 3])
    """
    return pdeque(elements)