diff options
Diffstat (limited to 'contrib/python/pexpect/pexpect/expect.py')
-rw-r--r-- | contrib/python/pexpect/pexpect/expect.py | 510 |
1 files changed, 255 insertions, 255 deletions
diff --git a/contrib/python/pexpect/pexpect/expect.py b/contrib/python/pexpect/pexpect/expect.py index 00ad4f0748..d3409db9d7 100644 --- a/contrib/python/pexpect/pexpect/expect.py +++ b/contrib/python/pexpect/pexpect/expect.py @@ -1,27 +1,27 @@ -import time - -from .exceptions import EOF, TIMEOUT - -class Expecter(object): - def __init__(self, spawn, searcher, searchwindowsize=-1): - self.spawn = spawn - self.searcher = searcher +import time + +from .exceptions import EOF, TIMEOUT + +class Expecter(object): + def __init__(self, spawn, searcher, searchwindowsize=-1): + self.spawn = spawn + self.searcher = searcher # A value of -1 means to use the figure from spawn, which should # be None or a positive number. - if searchwindowsize == -1: - searchwindowsize = spawn.searchwindowsize - self.searchwindowsize = searchwindowsize + if searchwindowsize == -1: + searchwindowsize = spawn.searchwindowsize + self.searchwindowsize = searchwindowsize self.lookback = None if hasattr(searcher, 'longest_string'): self.lookback = searcher.longest_string def do_search(self, window, freshlen): - spawn = self.spawn - searcher = self.searcher + spawn = self.spawn + searcher = self.searcher if freshlen > len(window): freshlen = len(window) index = searcher.search(window, freshlen, self.searchwindowsize) - if index >= 0: + if index >= 0: spawn._buffer = spawn.buffer_type() spawn._buffer.write(window[searcher.end:]) spawn.before = spawn._before.getvalue()[ @@ -29,10 +29,10 @@ class Expecter(object): spawn._before = spawn.buffer_type() spawn._before.write(window[searcher.end:]) spawn.after = window[searcher.start:searcher.end] - spawn.match = searcher.match - spawn.match_index = index - # Found a match - return index + spawn.match = searcher.match + spawn.match_index = index + # Found a match + return index elif self.searchwindowsize or self.lookback: maintain = self.searchwindowsize or self.lookback if spawn._buffer.tell() > maintain: @@ -97,275 +97,275 @@ class Expecter(object): window = spawn._buffer.read() return self.do_search(window, freshlen) - def eof(self, err=None): - spawn = self.spawn - + def eof(self, err=None): + spawn = self.spawn + spawn.before = spawn._before.getvalue() spawn._buffer = spawn.buffer_type() spawn._before = spawn.buffer_type() - spawn.after = EOF - index = self.searcher.eof_index - if index >= 0: - spawn.match = EOF - spawn.match_index = index - return index - else: - spawn.match = None - spawn.match_index = None - msg = str(spawn) + spawn.after = EOF + index = self.searcher.eof_index + if index >= 0: + spawn.match = EOF + spawn.match_index = index + return index + else: + spawn.match = None + spawn.match_index = None + msg = str(spawn) msg += '\nsearcher: %s' % self.searcher - if err is not None: - msg = str(err) + '\n' + msg + if err is not None: + msg = str(err) + '\n' + msg exc = EOF(msg) exc.__cause__ = None # in Python 3.x we can use "raise exc from None" raise exc - def timeout(self, err=None): - spawn = self.spawn - + def timeout(self, err=None): + spawn = self.spawn + spawn.before = spawn._before.getvalue() - spawn.after = TIMEOUT - index = self.searcher.timeout_index - if index >= 0: - spawn.match = TIMEOUT - spawn.match_index = index - return index - else: - spawn.match = None - spawn.match_index = None - msg = str(spawn) + spawn.after = TIMEOUT + index = self.searcher.timeout_index + if index >= 0: + spawn.match = TIMEOUT + spawn.match_index = index + return index + else: + spawn.match = None + spawn.match_index = None + msg = str(spawn) msg += '\nsearcher: %s' % self.searcher - if err is not None: - msg = str(err) + '\n' + msg - + if err is not None: + msg = str(err) + '\n' + msg + exc = TIMEOUT(msg) exc.__cause__ = None # in Python 3.x we can use "raise exc from None" raise exc - def errored(self): - spawn = self.spawn + def errored(self): + spawn = self.spawn spawn.before = spawn._before.getvalue() - spawn.after = None - spawn.match = None - spawn.match_index = None - - def expect_loop(self, timeout=-1): - """Blocking expect""" - spawn = self.spawn - - if timeout is not None: - end_time = time.time() + timeout - - try: + spawn.after = None + spawn.match = None + spawn.match_index = None + + def expect_loop(self, timeout=-1): + """Blocking expect""" + spawn = self.spawn + + if timeout is not None: + end_time = time.time() + timeout + + try: idx = self.existing_data() if idx is not None: return idx - while True: - # No match at this point - if (timeout is not None) and (timeout < 0): - return self.timeout() - # Still have time left, so read more data - incoming = spawn.read_nonblocking(spawn.maxread, timeout) + while True: + # No match at this point + if (timeout is not None) and (timeout < 0): + return self.timeout() + # Still have time left, so read more data + incoming = spawn.read_nonblocking(spawn.maxread, timeout) if self.spawn.delayafterread is not None: time.sleep(self.spawn.delayafterread) idx = self.new_data(incoming) # Keep reading until exception or return. if idx is not None: return idx - if timeout is not None: - timeout = end_time - time.time() - except EOF as e: - return self.eof(e) - except TIMEOUT as e: - return self.timeout(e) - except: - self.errored() - raise - - -class searcher_string(object): - '''This is a plain string search helper for the spawn.expect_any() method. - This helper class is for speed. For more powerful regex patterns - see the helper class, searcher_re. - - Attributes: - - eof_index - index of EOF, or -1 - timeout_index - index of TIMEOUT, or -1 - - After a successful match by the search() method the following attributes - are available: - - start - index into the buffer, first byte of match - end - index into the buffer, first byte after match - match - the matching string itself - - ''' - - def __init__(self, strings): - '''This creates an instance of searcher_string. This argument 'strings' - may be a list; a sequence of strings; or the EOF or TIMEOUT types. ''' - - self.eof_index = -1 - self.timeout_index = -1 - self._strings = [] + if timeout is not None: + timeout = end_time - time.time() + except EOF as e: + return self.eof(e) + except TIMEOUT as e: + return self.timeout(e) + except: + self.errored() + raise + + +class searcher_string(object): + '''This is a plain string search helper for the spawn.expect_any() method. + This helper class is for speed. For more powerful regex patterns + see the helper class, searcher_re. + + Attributes: + + eof_index - index of EOF, or -1 + timeout_index - index of TIMEOUT, or -1 + + After a successful match by the search() method the following attributes + are available: + + start - index into the buffer, first byte of match + end - index into the buffer, first byte after match + match - the matching string itself + + ''' + + def __init__(self, strings): + '''This creates an instance of searcher_string. This argument 'strings' + may be a list; a sequence of strings; or the EOF or TIMEOUT types. ''' + + self.eof_index = -1 + self.timeout_index = -1 + self._strings = [] self.longest_string = 0 - for n, s in enumerate(strings): - if s is EOF: - self.eof_index = n - continue - if s is TIMEOUT: - self.timeout_index = n - continue - self._strings.append((n, s)) + for n, s in enumerate(strings): + if s is EOF: + self.eof_index = n + continue + if s is TIMEOUT: + self.timeout_index = n + continue + self._strings.append((n, s)) if len(s) > self.longest_string: self.longest_string = len(s) - - def __str__(self): - '''This returns a human-readable string that represents the state of - the object.''' - + + def __str__(self): + '''This returns a human-readable string that represents the state of + the object.''' + ss = [(ns[0], ' %d: %r' % ns) for ns in self._strings] - ss.append((-1, 'searcher_string:')) - if self.eof_index >= 0: - ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) - if self.timeout_index >= 0: - ss.append((self.timeout_index, - ' %d: TIMEOUT' % self.timeout_index)) - ss.sort() - ss = list(zip(*ss))[1] - return '\n'.join(ss) - - def search(self, buffer, freshlen, searchwindowsize=None): + ss.append((-1, 'searcher_string:')) + if self.eof_index >= 0: + ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) + if self.timeout_index >= 0: + ss.append((self.timeout_index, + ' %d: TIMEOUT' % self.timeout_index)) + ss.sort() + ss = list(zip(*ss))[1] + return '\n'.join(ss) + + def search(self, buffer, freshlen, searchwindowsize=None): '''This searches 'buffer' for the first occurrence of one of the search - strings. 'freshlen' must indicate the number of bytes at the end of - 'buffer' which have not been searched before. It helps to avoid - searching the same, possibly big, buffer over and over again. - - See class spawn for the 'searchwindowsize' argument. - - If there is a match this returns the index of that string, and sets - 'start', 'end' and 'match'. Otherwise, this returns -1. ''' - - first_match = None - - # 'freshlen' helps a lot here. Further optimizations could - # possibly include: - # - # using something like the Boyer-Moore Fast String Searching - # Algorithm; pre-compiling the search through a list of - # strings into something that can scan the input once to - # search for all N strings; realize that if we search for - # ['bar', 'baz'] and the input is '...foo' we need not bother - # rescanning until we've read three more bytes. - # - # Sadly, I don't know enough about this interesting topic. /grahn - - for index, s in self._strings: - if searchwindowsize is None: - # the match, if any, can only be in the fresh data, - # or at the very end of the old data - offset = -(freshlen + len(s)) - else: - # better obey searchwindowsize - offset = -searchwindowsize - n = buffer.find(s, offset) - if n >= 0 and (first_match is None or n < first_match): - first_match = n - best_index, best_match = index, s - if first_match is None: - return -1 - self.match = best_match - self.start = first_match - self.end = self.start + len(self.match) - return best_index - - -class searcher_re(object): - '''This is regular expression string search helper for the - spawn.expect_any() method. This helper class is for powerful - pattern matching. For speed, see the helper class, searcher_string. - - Attributes: - - eof_index - index of EOF, or -1 - timeout_index - index of TIMEOUT, or -1 - - After a successful match by the search() method the following attributes - are available: - - start - index into the buffer, first byte of match - end - index into the buffer, first byte after match + strings. 'freshlen' must indicate the number of bytes at the end of + 'buffer' which have not been searched before. It helps to avoid + searching the same, possibly big, buffer over and over again. + + See class spawn for the 'searchwindowsize' argument. + + If there is a match this returns the index of that string, and sets + 'start', 'end' and 'match'. Otherwise, this returns -1. ''' + + first_match = None + + # 'freshlen' helps a lot here. Further optimizations could + # possibly include: + # + # using something like the Boyer-Moore Fast String Searching + # Algorithm; pre-compiling the search through a list of + # strings into something that can scan the input once to + # search for all N strings; realize that if we search for + # ['bar', 'baz'] and the input is '...foo' we need not bother + # rescanning until we've read three more bytes. + # + # Sadly, I don't know enough about this interesting topic. /grahn + + for index, s in self._strings: + if searchwindowsize is None: + # the match, if any, can only be in the fresh data, + # or at the very end of the old data + offset = -(freshlen + len(s)) + else: + # better obey searchwindowsize + offset = -searchwindowsize + n = buffer.find(s, offset) + if n >= 0 and (first_match is None or n < first_match): + first_match = n + best_index, best_match = index, s + if first_match is None: + return -1 + self.match = best_match + self.start = first_match + self.end = self.start + len(self.match) + return best_index + + +class searcher_re(object): + '''This is regular expression string search helper for the + spawn.expect_any() method. This helper class is for powerful + pattern matching. For speed, see the helper class, searcher_string. + + Attributes: + + eof_index - index of EOF, or -1 + timeout_index - index of TIMEOUT, or -1 + + After a successful match by the search() method the following attributes + are available: + + start - index into the buffer, first byte of match + end - index into the buffer, first byte after match match - the re.match object returned by a successful re.search - - ''' - - def __init__(self, patterns): - '''This creates an instance that searches for 'patterns' Where - 'patterns' may be a list or other sequence of compiled regular - expressions, or the EOF or TIMEOUT types.''' - - self.eof_index = -1 - self.timeout_index = -1 - self._searches = [] + + ''' + + def __init__(self, patterns): + '''This creates an instance that searches for 'patterns' Where + 'patterns' may be a list or other sequence of compiled regular + expressions, or the EOF or TIMEOUT types.''' + + self.eof_index = -1 + self.timeout_index = -1 + self._searches = [] for n, s in enumerate(patterns): - if s is EOF: - self.eof_index = n - continue - if s is TIMEOUT: - self.timeout_index = n - continue - self._searches.append((n, s)) - - def __str__(self): - '''This returns a human-readable string that represents the state of - the object.''' - - #ss = [(n, ' %d: re.compile("%s")' % - # (n, repr(s.pattern))) for n, s in self._searches] - ss = list() - for n, s in self._searches: + if s is EOF: + self.eof_index = n + continue + if s is TIMEOUT: + self.timeout_index = n + continue + self._searches.append((n, s)) + + def __str__(self): + '''This returns a human-readable string that represents the state of + the object.''' + + #ss = [(n, ' %d: re.compile("%s")' % + # (n, repr(s.pattern))) for n, s in self._searches] + ss = list() + for n, s in self._searches: ss.append((n, ' %d: re.compile(%r)' % (n, s.pattern))) - ss.append((-1, 'searcher_re:')) - if self.eof_index >= 0: - ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) - if self.timeout_index >= 0: - ss.append((self.timeout_index, ' %d: TIMEOUT' % - self.timeout_index)) - ss.sort() - ss = list(zip(*ss))[1] - return '\n'.join(ss) - - def search(self, buffer, freshlen, searchwindowsize=None): + ss.append((-1, 'searcher_re:')) + if self.eof_index >= 0: + ss.append((self.eof_index, ' %d: EOF' % self.eof_index)) + if self.timeout_index >= 0: + ss.append((self.timeout_index, ' %d: TIMEOUT' % + self.timeout_index)) + ss.sort() + ss = list(zip(*ss))[1] + return '\n'.join(ss) + + def search(self, buffer, freshlen, searchwindowsize=None): '''This searches 'buffer' for the first occurrence of one of the regular - expressions. 'freshlen' must indicate the number of bytes at the end of - 'buffer' which have not been searched before. - - See class spawn for the 'searchwindowsize' argument. - - If there is a match this returns the index of that string, and sets - 'start', 'end' and 'match'. Otherwise, returns -1.''' - - first_match = None - # 'freshlen' doesn't help here -- we cannot predict the - # length of a match, and the re module provides no help. - if searchwindowsize is None: - searchstart = 0 - else: - searchstart = max(0, len(buffer) - searchwindowsize) - for index, s in self._searches: - match = s.search(buffer, searchstart) - if match is None: - continue - n = match.start() - if first_match is None or n < first_match: - first_match = n - the_match = match - best_index = index - if first_match is None: - return -1 - self.start = first_match - self.match = the_match - self.end = self.match.end() + expressions. 'freshlen' must indicate the number of bytes at the end of + 'buffer' which have not been searched before. + + See class spawn for the 'searchwindowsize' argument. + + If there is a match this returns the index of that string, and sets + 'start', 'end' and 'match'. Otherwise, returns -1.''' + + first_match = None + # 'freshlen' doesn't help here -- we cannot predict the + # length of a match, and the re module provides no help. + if searchwindowsize is None: + searchstart = 0 + else: + searchstart = max(0, len(buffer) - searchwindowsize) + for index, s in self._searches: + match = s.search(buffer, searchstart) + if match is None: + continue + n = match.start() + if first_match is None or n < first_match: + first_match = n + the_match = match + best_index = index + if first_match is None: + return -1 + self.start = first_match + self.match = the_match + self.end = self.match.end() return best_index |