| author    | shadchin <shadchin@yandex-team.ru>                 | 2022-02-10 16:44:30 +0300 |
| committer | Daniil Cherednik <dcherednik@yandex-team.ru>        | 2022-02-10 16:44:30 +0300 |
| commit    | 2598ef1d0aee359b4b6d5fdd1758916d5907d04f (patch)    | |
| tree      | 012bb94d777798f1f56ac1cec429509766d05181 /contrib/tools/python3/src/Lib/email/_header_value_parser.py | |
| parent    | 6751af0b0c1b952fede40b19b71da8025b5d8bcf (diff)     | |
| download  | ydb-2598ef1d0aee359b4b6d5fdd1758916d5907d04f.tar.gz | |
Restoring authorship annotation for <shadchin@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/tools/python3/src/Lib/email/_header_value_parser.py')
-rw-r--r-- | contrib/tools/python3/src/Lib/email/_header_value_parser.py | 454 |
1 file changed, 227 insertions(+), 227 deletions(-)
diff --git a/contrib/tools/python3/src/Lib/email/_header_value_parser.py b/contrib/tools/python3/src/Lib/email/_header_value_parser.py index 51d355fbb0..a9dbc7f335 100644 --- a/contrib/tools/python3/src/Lib/email/_header_value_parser.py +++ b/contrib/tools/python3/src/Lib/email/_header_value_parser.py @@ -68,7 +68,7 @@ XXX: provide complete list of token types. """ import re -import sys +import sys import urllib # For urllib.parse.unquote from string import hexdigits from operator import itemgetter @@ -96,18 +96,18 @@ EXTENDED_ATTRIBUTE_ENDS = ATTRIBUTE_ENDS - set('%') def quote_string(value): return '"'+str(value).replace('\\', '\\\\').replace('"', r'\"')+'"' -# Match a RFC 2047 word, looks like =?utf-8?q?someword?= -rfc2047_matcher = re.compile(r''' - =\? # literal =? - [^?]* # charset - \? # literal ? - [qQbB] # literal 'q' or 'b', case insensitive - \? # literal ? - .*? # encoded word - \?= # literal ?= -''', re.VERBOSE | re.MULTILINE) - - +# Match a RFC 2047 word, looks like =?utf-8?q?someword?= +rfc2047_matcher = re.compile(r''' + =\? # literal =? + [^?]* # charset + \? # literal ? + [qQbB] # literal 'q' or 'b', case insensitive + \? # literal ? + .*? # encoded word + \?= # literal ?= +''', re.VERBOSE | re.MULTILINE) + + # # TokenList and its subclasses # @@ -509,11 +509,11 @@ class DotAtomText(TokenList): as_ew_allowed = True -class NoFoldLiteral(TokenList): - token_type = 'no-fold-literal' - as_ew_allowed = False - - +class NoFoldLiteral(TokenList): + token_type = 'no-fold-literal' + as_ew_allowed = False + + class AddrSpec(TokenList): token_type = 'addr-spec' @@ -561,8 +561,8 @@ class DisplayName(Phrase): @property def display_name(self): res = TokenList(self) - if len(res) == 0: - return res.value + if len(res) == 0: + return res.value if res[0].token_type == 'cfws': res.pop(0) else: @@ -584,7 +584,7 @@ class DisplayName(Phrase): for x in self: if x.token_type == 'quoted-string': quote = True - if len(self) != 0 and quote: + if len(self) != 0 and quote: pre = post = '' if self[0].token_type=='cfws' or self[0][0].token_type=='cfws': pre = ' ' @@ -730,7 +730,7 @@ class MimeParameters(TokenList): # to assume the RFC 2231 pieces can come in any order. However, we # output them in the order that we first see a given name, which gives # us a stable __str__. - params = {} # Using order preserving dict from Python 3.7+ + params = {} # Using order preserving dict from Python 3.7+ for token in self: if not token.token_type.endswith('parameter'): continue @@ -842,23 +842,23 @@ class HeaderLabel(TokenList): as_ew_allowed = False -class MsgID(TokenList): - token_type = 'msg-id' - as_ew_allowed = False - - def fold(self, policy): - # message-id tokens may not be folded. - return str(self) + policy.linesep - - -class MessageID(MsgID): - token_type = 'message-id' - - -class InvalidMessageID(MessageID): - token_type = 'invalid-message-id' - - +class MsgID(TokenList): + token_type = 'msg-id' + as_ew_allowed = False + + def fold(self, policy): + # message-id tokens may not be folded. 
+ return str(self) + policy.linesep + + +class MessageID(MsgID): + token_type = 'message-id' + + +class InvalidMessageID(MessageID): + token_type = 'invalid-message-id' + + class Header(TokenList): token_type = 'header' @@ -940,10 +940,10 @@ class EWWhiteSpaceTerminal(WhiteSpaceTerminal): return '' -class _InvalidEwError(errors.HeaderParseError): - """Invalid encoded word found while parsing headers.""" - - +class _InvalidEwError(errors.HeaderParseError): + """Invalid encoded word found while parsing headers.""" + + # XXX these need to become classes and used as instances so # that a program can't change them in a parse tree and screw # up other parse trees. Maybe should have tests for that, too. @@ -1048,10 +1048,10 @@ def get_encoded_word(value): raise errors.HeaderParseError( "expected encoded word but found {}".format(value)) remstr = ''.join(remainder) - if (len(remstr) > 1 and - remstr[0] in hexdigits and - remstr[1] in hexdigits and - tok.count('?') < 2): + if (len(remstr) > 1 and + remstr[0] in hexdigits and + remstr[1] in hexdigits and + tok.count('?') < 2): # The ? after the CTE was followed by an encoded word escape (=XX). rest, *remainder = remstr.split('?=', 1) tok = tok + '?=' + rest @@ -1062,8 +1062,8 @@ def get_encoded_word(value): value = ''.join(remainder) try: text, charset, lang, defects = _ew.decode('=?' + tok + '?=') - except (ValueError, KeyError): - raise _InvalidEwError( + except (ValueError, KeyError): + raise _InvalidEwError( "encoded word format invalid: '{}'".format(ew.cte)) ew.charset = charset ew.lang = lang @@ -1078,10 +1078,10 @@ def get_encoded_word(value): _validate_xtext(vtext) ew.append(vtext) text = ''.join(remainder) - # Encoded words should be followed by a WS - if value and value[0] not in WSP: - ew.defects.append(errors.InvalidHeaderDefect( - "missing trailing whitespace after encoded-word")) + # Encoded words should be followed by a WS + if value and value[0] not in WSP: + ew.defects.append(errors.InvalidHeaderDefect( + "missing trailing whitespace after encoded-word")) return ew, value def get_unstructured(value): @@ -1113,12 +1113,12 @@ def get_unstructured(value): token, value = get_fws(value) unstructured.append(token) continue - valid_ew = True + valid_ew = True if value.startswith('=?'): try: token, value = get_encoded_word(value) - except _InvalidEwError: - valid_ew = False + except _InvalidEwError: + valid_ew = False except errors.HeaderParseError: # XXX: Need to figure out how to register defects when # appropriate here. @@ -1137,14 +1137,14 @@ def get_unstructured(value): unstructured.append(token) continue tok, *remainder = _wsp_splitter(value, 1) - # Split in the middle of an atom if there is a rfc2047 encoded word - # which does not have WSP on both sides. The defect will be registered - # the next time through the loop. - # This needs to only be performed when the encoded word is valid; - # otherwise, performing it on an invalid encoded word can cause - # the parser to go in an infinite loop. - if valid_ew and rfc2047_matcher.search(tok): - tok, *remainder = value.partition('=?') + # Split in the middle of an atom if there is a rfc2047 encoded word + # which does not have WSP on both sides. The defect will be registered + # the next time through the loop. + # This needs to only be performed when the encoded word is valid; + # otherwise, performing it on an invalid encoded word can cause + # the parser to go in an infinite loop. 
+ if valid_ew and rfc2047_matcher.search(tok): + tok, *remainder = value.partition('=?') vtext = ValueTerminal(tok, 'vtext') _validate_xtext(vtext) unstructured.append(vtext) @@ -1211,28 +1211,28 @@ def get_bare_quoted_string(value): "expected '\"' but found '{}'".format(value)) bare_quoted_string = BareQuotedString() value = value[1:] - if value and value[0] == '"': + if value and value[0] == '"': token, value = get_qcontent(value) bare_quoted_string.append(token) while value and value[0] != '"': if value[0] in WSP: token, value = get_fws(value) elif value[:2] == '=?': - valid_ew = False + valid_ew = False try: token, value = get_encoded_word(value) bare_quoted_string.defects.append(errors.InvalidHeaderDefect( "encoded word inside quoted string")) - valid_ew = True + valid_ew = True except errors.HeaderParseError: token, value = get_qcontent(value) - # Collapse the whitespace between two encoded words that occur in a - # bare-quoted-string. - if valid_ew and len(bare_quoted_string) > 1: - if (bare_quoted_string[-1].token_type == 'fws' and - bare_quoted_string[-2].token_type == 'encoded-word'): - bare_quoted_string[-1] = EWWhiteSpaceTerminal( - bare_quoted_string[-1], 'fws') + # Collapse the whitespace between two encoded words that occur in a + # bare-quoted-string. + if valid_ew and len(bare_quoted_string) > 1: + if (bare_quoted_string[-1].token_type == 'fws' and + bare_quoted_string[-2].token_type == 'encoded-word'): + bare_quoted_string[-1] = EWWhiteSpaceTerminal( + bare_quoted_string[-1], 'fws') else: token, value = get_qcontent(value) bare_quoted_string.append(token) @@ -1389,9 +1389,9 @@ def get_word(value): leader, value = get_cfws(value) else: leader = None - if not value: - raise errors.HeaderParseError( - "Expected 'atom' or 'quoted-string' but found nothing.") + if not value: + raise errors.HeaderParseError( + "Expected 'atom' or 'quoted-string' but found nothing.") if value[0]=='"': token, value = get_quoted_string(value) elif value[0] in SPECIALS: @@ -1616,8 +1616,8 @@ def get_domain(value): token, value = get_dot_atom(value) except errors.HeaderParseError: token, value = get_atom(value) - if value and value[0] == '@': - raise errors.HeaderParseError('Invalid Domain') + if value and value[0] == '@': + raise errors.HeaderParseError('Invalid Domain') if leader is not None: token[:0] = [leader] domain.append(token) @@ -1641,7 +1641,7 @@ def get_addr_spec(value): addr_spec.append(token) if not value or value[0] != '@': addr_spec.defects.append(errors.InvalidHeaderDefect( - "addr-spec local part with no domain")) + "addr-spec local part with no domain")) return addr_spec, value addr_spec.append(ValueTerminal('@', 'address-at-symbol')) token, value = get_domain(value[1:]) @@ -2026,118 +2026,118 @@ def get_address_list(value): value = value[1:] return address_list, value - -def get_no_fold_literal(value): - """ no-fold-literal = "[" *dtext "]" - """ - no_fold_literal = NoFoldLiteral() - if not value: - raise errors.HeaderParseError( - "expected no-fold-literal but found '{}'".format(value)) - if value[0] != '[': - raise errors.HeaderParseError( - "expected '[' at the start of no-fold-literal " - "but found '{}'".format(value)) - no_fold_literal.append(ValueTerminal('[', 'no-fold-literal-start')) - value = value[1:] - token, value = get_dtext(value) - no_fold_literal.append(token) - if not value or value[0] != ']': - raise errors.HeaderParseError( - "expected ']' at the end of no-fold-literal " - "but found '{}'".format(value)) - no_fold_literal.append(ValueTerminal(']', 
'no-fold-literal-end')) - return no_fold_literal, value[1:] - -def get_msg_id(value): - """msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS] - id-left = dot-atom-text / obs-id-left - id-right = dot-atom-text / no-fold-literal / obs-id-right - no-fold-literal = "[" *dtext "]" - """ - msg_id = MsgID() - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) - msg_id.append(token) - if not value or value[0] != '<': - raise errors.HeaderParseError( - "expected msg-id but found '{}'".format(value)) - msg_id.append(ValueTerminal('<', 'msg-id-start')) - value = value[1:] - # Parse id-left. - try: - token, value = get_dot_atom_text(value) - except errors.HeaderParseError: - try: - # obs-id-left is same as local-part of add-spec. - token, value = get_obs_local_part(value) - msg_id.defects.append(errors.ObsoleteHeaderDefect( - "obsolete id-left in msg-id")) - except errors.HeaderParseError: - raise errors.HeaderParseError( - "expected dot-atom-text or obs-id-left" - " but found '{}'".format(value)) - msg_id.append(token) - if not value or value[0] != '@': - msg_id.defects.append(errors.InvalidHeaderDefect( - "msg-id with no id-right")) - # Even though there is no id-right, if the local part - # ends with `>` let's just parse it too and return - # along with the defect. - if value and value[0] == '>': - msg_id.append(ValueTerminal('>', 'msg-id-end')) - value = value[1:] - return msg_id, value - msg_id.append(ValueTerminal('@', 'address-at-symbol')) - value = value[1:] - # Parse id-right. - try: - token, value = get_dot_atom_text(value) - except errors.HeaderParseError: - try: - token, value = get_no_fold_literal(value) - except errors.HeaderParseError as e: - try: - token, value = get_domain(value) - msg_id.defects.append(errors.ObsoleteHeaderDefect( - "obsolete id-right in msg-id")) - except errors.HeaderParseError: - raise errors.HeaderParseError( - "expected dot-atom-text, no-fold-literal or obs-id-right" - " but found '{}'".format(value)) - msg_id.append(token) - if value and value[0] == '>': - value = value[1:] - else: - msg_id.defects.append(errors.InvalidHeaderDefect( - "missing trailing '>' on msg-id")) - msg_id.append(ValueTerminal('>', 'msg-id-end')) - if value and value[0] in CFWS_LEADER: - token, value = get_cfws(value) - msg_id.append(token) - return msg_id, value - - -def parse_message_id(value): - """message-id = "Message-ID:" msg-id CRLF - """ - message_id = MessageID() - try: - token, value = get_msg_id(value) - message_id.append(token) - except errors.HeaderParseError as ex: - token = get_unstructured(value) - message_id = InvalidMessageID(token) - message_id.defects.append( - errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex))) - else: - # Value after parsing a valid msg_id should be None. 
- if value: - message_id.defects.append(errors.InvalidHeaderDefect( - "Unexpected {!r}".format(value))) - - return message_id - + +def get_no_fold_literal(value): + """ no-fold-literal = "[" *dtext "]" + """ + no_fold_literal = NoFoldLiteral() + if not value: + raise errors.HeaderParseError( + "expected no-fold-literal but found '{}'".format(value)) + if value[0] != '[': + raise errors.HeaderParseError( + "expected '[' at the start of no-fold-literal " + "but found '{}'".format(value)) + no_fold_literal.append(ValueTerminal('[', 'no-fold-literal-start')) + value = value[1:] + token, value = get_dtext(value) + no_fold_literal.append(token) + if not value or value[0] != ']': + raise errors.HeaderParseError( + "expected ']' at the end of no-fold-literal " + "but found '{}'".format(value)) + no_fold_literal.append(ValueTerminal(']', 'no-fold-literal-end')) + return no_fold_literal, value[1:] + +def get_msg_id(value): + """msg-id = [CFWS] "<" id-left '@' id-right ">" [CFWS] + id-left = dot-atom-text / obs-id-left + id-right = dot-atom-text / no-fold-literal / obs-id-right + no-fold-literal = "[" *dtext "]" + """ + msg_id = MsgID() + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + msg_id.append(token) + if not value or value[0] != '<': + raise errors.HeaderParseError( + "expected msg-id but found '{}'".format(value)) + msg_id.append(ValueTerminal('<', 'msg-id-start')) + value = value[1:] + # Parse id-left. + try: + token, value = get_dot_atom_text(value) + except errors.HeaderParseError: + try: + # obs-id-left is same as local-part of add-spec. + token, value = get_obs_local_part(value) + msg_id.defects.append(errors.ObsoleteHeaderDefect( + "obsolete id-left in msg-id")) + except errors.HeaderParseError: + raise errors.HeaderParseError( + "expected dot-atom-text or obs-id-left" + " but found '{}'".format(value)) + msg_id.append(token) + if not value or value[0] != '@': + msg_id.defects.append(errors.InvalidHeaderDefect( + "msg-id with no id-right")) + # Even though there is no id-right, if the local part + # ends with `>` let's just parse it too and return + # along with the defect. + if value and value[0] == '>': + msg_id.append(ValueTerminal('>', 'msg-id-end')) + value = value[1:] + return msg_id, value + msg_id.append(ValueTerminal('@', 'address-at-symbol')) + value = value[1:] + # Parse id-right. 
+ try: + token, value = get_dot_atom_text(value) + except errors.HeaderParseError: + try: + token, value = get_no_fold_literal(value) + except errors.HeaderParseError as e: + try: + token, value = get_domain(value) + msg_id.defects.append(errors.ObsoleteHeaderDefect( + "obsolete id-right in msg-id")) + except errors.HeaderParseError: + raise errors.HeaderParseError( + "expected dot-atom-text, no-fold-literal or obs-id-right" + " but found '{}'".format(value)) + msg_id.append(token) + if value and value[0] == '>': + value = value[1:] + else: + msg_id.defects.append(errors.InvalidHeaderDefect( + "missing trailing '>' on msg-id")) + msg_id.append(ValueTerminal('>', 'msg-id-end')) + if value and value[0] in CFWS_LEADER: + token, value = get_cfws(value) + msg_id.append(token) + return msg_id, value + + +def parse_message_id(value): + """message-id = "Message-ID:" msg-id CRLF + """ + message_id = MessageID() + try: + token, value = get_msg_id(value) + message_id.append(token) + except errors.HeaderParseError as ex: + token = get_unstructured(value) + message_id = InvalidMessageID(token) + message_id.defects.append( + errors.InvalidHeaderDefect("Invalid msg-id: {!r}".format(ex))) + else: + # Value after parsing a valid msg_id should be None. + if value: + message_id.defects.append(errors.InvalidHeaderDefect( + "Unexpected {!r}".format(value))) + + return message_id + # # XXX: As I begin to add additional header parsers, I'm realizing we probably # have two level of parser routines: the get_XXX methods that get a token in @@ -2535,9 +2535,9 @@ def get_parameter(value): while value: if value[0] in WSP: token, value = get_fws(value) - elif value[0] == '"': - token = ValueTerminal('"', 'DQUOTE') - value = value[1:] + elif value[0] == '"': + token = ValueTerminal('"', 'DQUOTE') + value = value[1:] else: token, value = get_qcontent(value) v.append(token) @@ -2558,7 +2558,7 @@ def parse_mime_parameters(value): the formal RFC grammar, but it is more convenient for us for the set of parameters to be treated as its own TokenList. - This is 'parse' routine because it consumes the remaining value, but it + This is 'parse' routine because it consumes the remaining value, but it would never be called to parse a full header. Instead it is called to parse everything after the non-parameter value of a specific MIME header. @@ -2764,7 +2764,7 @@ def _refold_parse_tree(parse_tree, *, policy): """ # max_line_length 0/None means no limit, ie: infinitely long. - maxlen = policy.max_line_length or sys.maxsize + maxlen = policy.max_line_length or sys.maxsize encoding = 'utf-8' if policy.utf8 else 'us-ascii' lines = [''] last_ew = None @@ -2778,9 +2778,9 @@ def _refold_parse_tree(parse_tree, *, policy): wrap_as_ew_blocked -= 1 continue tstr = str(part) - if part.token_type == 'ptext' and set(tstr) & SPECIALS: - # Encode if tstr contains special characters. - want_encoding = True + if part.token_type == 'ptext' and set(tstr) & SPECIALS: + # Encode if tstr contains special characters. 
+ want_encoding = True try: tstr.encode(encoding) charset = encoding @@ -2802,7 +2802,7 @@ def _refold_parse_tree(parse_tree, *, policy): want_encoding = False last_ew = None if part.syntactic_break: - encoded_part = part.fold(policy=policy)[:-len(policy.linesep)] + encoded_part = part.fold(policy=policy)[:-len(policy.linesep)] if policy.linesep not in encoded_part: # It fits on a single line if len(encoded_part) > maxlen - len(lines[-1]): @@ -2837,7 +2837,7 @@ def _refold_parse_tree(parse_tree, *, policy): newline = _steal_trailing_WSP_if_exists(lines) if newline or part.startswith_fws(): lines.append(newline + tstr) - last_ew = None + last_ew = None continue if not hasattr(part, 'encode'): # It's not a terminal, try folding the subparts. @@ -2891,36 +2891,36 @@ def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset): trailing_wsp = to_encode[-1] to_encode = to_encode[:-1] new_last_ew = len(lines[-1]) if last_ew is None else last_ew - - encode_as = 'utf-8' if charset == 'us-ascii' else charset - - # The RFC2047 chrome takes up 7 characters plus the length - # of the charset name. - chrome_len = len(encode_as) + 7 - - if (chrome_len + 1) >= maxlen: - raise errors.HeaderParseError( - "max_line_length is too small to fit an encoded word") - + + encode_as = 'utf-8' if charset == 'us-ascii' else charset + + # The RFC2047 chrome takes up 7 characters plus the length + # of the charset name. + chrome_len = len(encode_as) + 7 + + if (chrome_len + 1) >= maxlen: + raise errors.HeaderParseError( + "max_line_length is too small to fit an encoded word") + while to_encode: remaining_space = maxlen - len(lines[-1]) - text_space = remaining_space - chrome_len + text_space = remaining_space - chrome_len if text_space <= 0: lines.append(' ') continue - - to_encode_word = to_encode[:text_space] - encoded_word = _ew.encode(to_encode_word, charset=encode_as) - excess = len(encoded_word) - remaining_space - while excess > 0: - # Since the chunk to encode is guaranteed to fit into less than 100 characters, - # shrinking it by one at a time shouldn't take long. - to_encode_word = to_encode_word[:-1] - encoded_word = _ew.encode(to_encode_word, charset=encode_as) - excess = len(encoded_word) - remaining_space - lines[-1] += encoded_word - to_encode = to_encode[len(to_encode_word):] - + + to_encode_word = to_encode[:text_space] + encoded_word = _ew.encode(to_encode_word, charset=encode_as) + excess = len(encoded_word) - remaining_space + while excess > 0: + # Since the chunk to encode is guaranteed to fit into less than 100 characters, + # shrinking it by one at a time shouldn't take long. + to_encode_word = to_encode_word[:-1] + encoded_word = _ew.encode(to_encode_word, charset=encode_as) + excess = len(encoded_word) - remaining_space + lines[-1] += encoded_word + to_encode = to_encode[len(to_encode_word):] + if to_encode: lines.append(' ') new_last_ew = len(lines[-1]) |
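The hunks above restore, among other things, the msg-id parsing helpers (`get_no_fold_literal`, `get_msg_id`, `parse_message_id`) and the RFC 2047 encoded-word handling in this file. For orientation only — this is not part of the commit — here is a minimal sketch of how the restored `parse_message_id` routine behaves, assuming a stock recent CPython (3.8-era stdlib or newer) whose `email._header_value_parser` matches the vendored copy:

```python
# Orientation sketch only -- not part of this commit.  Exercises the
# parse_message_id() routine shown in the hunks above.  Assumes a stock
# CPython (3.8+) whose email._header_value_parser matches the vendored file.
from email import _header_value_parser as parser

# A well-formed Message-ID value parses into a 'message-id' token list
# with no defects.
ok = parser.parse_message_id("<12345.67890@mail.example.com>")
print(ok.token_type)          # message-id
print(list(ok.all_defects))   # []

# A malformed value takes the error path shown in parse_message_id(): the
# whole value is re-parsed as unstructured text, wrapped in InvalidMessageID,
# and an InvalidHeaderDefect is recorded.
bad = parser.parse_message_id("no-angle-brackets@example.com")
print(bad.token_type)         # invalid-message-id
print([type(d).__name__ for d in bad.all_defects])   # ['InvalidHeaderDefect']
```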