aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/tools/python3/Lib/email/_header_value_parser.py
diff options
context:
space:
mode:
author: Alexander Smirnov <alex@ydb.tech> 2024-07-08 15:54:05 +0000
committer: Alexander Smirnov <alex@ydb.tech> 2024-07-08 15:54:05 +0000
commit: fc7be18c76af2e700641f3598c4856baeef1428e (patch)
tree: 11dbca45eb321c3a4dd08b12152acc6ef5dd3fa9 /contrib/tools/python3/Lib/email/_header_value_parser.py
parent: ec0e7ed6da6fb317741fd8468602949a1362eca5 (diff)
parent: c92cb9d3a19331916f0c274d80e67f02a62caa9b (diff)
download: ydb-fc7be18c76af2e700641f3598c4856baeef1428e.tar.gz
Merge branch 'rightlib' into mergelibs-240708-1553
Diffstat (limited to 'contrib/tools/python3/Lib/email/_header_value_parser.py')
-rw-r--r-- contrib/tools/python3/Lib/email/_header_value_parser.py | 102
1 files changed, 82 insertions, 20 deletions
diff --git a/contrib/tools/python3/Lib/email/_header_value_parser.py b/contrib/tools/python3/Lib/email/_header_value_parser.py
index e4a342d446..ab3c3031ef 100644
--- a/contrib/tools/python3/Lib/email/_header_value_parser.py
+++ b/contrib/tools/python3/Lib/email/_header_value_parser.py
@@ -566,12 +566,14 @@ class DisplayName(Phrase):
if res[0].token_type == 'cfws':
res.pop(0)
else:
- if res[0][0].token_type == 'cfws':
+ if (isinstance(res[0], TokenList) and
+ res[0][0].token_type == 'cfws'):
res[0] = TokenList(res[0][1:])
if res[-1].token_type == 'cfws':
res.pop()
else:
- if res[-1][-1].token_type == 'cfws':
+ if (isinstance(res[-1], TokenList) and
+ res[-1][-1].token_type == 'cfws'):
res[-1] = TokenList(res[-1][:-1])
return res.value
@@ -586,9 +588,13 @@ class DisplayName(Phrase):
quote = True
if len(self) != 0 and quote:
pre = post = ''
- if self[0].token_type=='cfws' or self[0][0].token_type=='cfws':
+ if (self[0].token_type == 'cfws' or
+ isinstance(self[0], TokenList) and
+ self[0][0].token_type == 'cfws'):
pre = ' '
- if self[-1].token_type=='cfws' or self[-1][-1].token_type=='cfws':
+ if (self[-1].token_type == 'cfws' or
+ isinstance(self[-1], TokenList) and
+ self[-1][-1].token_type == 'cfws'):
post = ' '
return pre+quote_string(self.display_name)+post
else:
@@ -950,6 +956,7 @@ class _InvalidEwError(errors.HeaderParseError):
DOT = ValueTerminal('.', 'dot')
ListSeparator = ValueTerminal(',', 'list-separator')
ListSeparator.as_ew_allowed = False
+ListSeparator.syntactic_break = False
RouteComponentMarker = ValueTerminal('@', 'route-component-marker')
#
@@ -1207,7 +1214,7 @@ def get_bare_quoted_string(value):
value is the text between the quote marks, with whitespace
preserved and quoted pairs decoded.
"""
- if value[0] != '"':
+ if not value or value[0] != '"':
raise errors.HeaderParseError(
"expected '\"' but found '{}'".format(value))
bare_quoted_string = BareQuotedString()
@@ -1448,7 +1455,7 @@ def get_local_part(value):
"""
local_part = LocalPart()
leader = None
- if value[0] in CFWS_LEADER:
+ if value and value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
raise errors.HeaderParseError(
@@ -1514,13 +1521,18 @@ def get_obs_local_part(value):
raise
token, value = get_cfws(value)
obs_local_part.append(token)
+ if not obs_local_part:
+ raise errors.HeaderParseError(
+ "expected obs-local-part but found '{}'".format(value))
if (obs_local_part[0].token_type == 'dot' or
obs_local_part[0].token_type=='cfws' and
+ len(obs_local_part) > 1 and
obs_local_part[1].token_type=='dot'):
obs_local_part.defects.append(errors.InvalidHeaderDefect(
"Invalid leading '.' in local part"))
if (obs_local_part[-1].token_type == 'dot' or
obs_local_part[-1].token_type=='cfws' and
+ len(obs_local_part) > 1 and
obs_local_part[-2].token_type=='dot'):
obs_local_part.defects.append(errors.InvalidHeaderDefect(
"Invalid trailing '.' in local part"))
@@ -1602,7 +1614,7 @@ def get_domain(value):
"""
domain = Domain()
leader = None
- if value[0] in CFWS_LEADER:
+ if value and value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
raise errors.HeaderParseError(
@@ -1678,6 +1690,8 @@ def get_obs_route(value):
if value[0] in CFWS_LEADER:
token, value = get_cfws(value)
obs_route.append(token)
+ if not value:
+ break
if value[0] == '@':
obs_route.append(RouteComponentMarker)
token, value = get_domain(value[1:])
@@ -1696,7 +1710,7 @@ def get_angle_addr(value):
"""
angle_addr = AngleAddr()
- if value[0] in CFWS_LEADER:
+ if value and value[0] in CFWS_LEADER:
token, value = get_cfws(value)
angle_addr.append(token)
if not value or value[0] != '<':
@@ -1706,7 +1720,7 @@ def get_angle_addr(value):
value = value[1:]
# Although it is not legal per RFC5322, SMTP uses '<>' in certain
# circumstances.
- if value[0] == '>':
+ if value and value[0] == '>':
angle_addr.append(ValueTerminal('>', 'angle-addr-end'))
angle_addr.defects.append(errors.InvalidHeaderDefect(
"null addr-spec in angle-addr"))
@@ -1758,6 +1772,9 @@ def get_name_addr(value):
name_addr = NameAddr()
# Both the optional display name and the angle-addr can start with cfws.
leader = None
+ if not value:
+ raise errors.HeaderParseError(
+ "expected name-addr but found '{}'".format(value))
if value[0] in CFWS_LEADER:
leader, value = get_cfws(value)
if not value:
@@ -1772,7 +1789,10 @@ def get_name_addr(value):
raise errors.HeaderParseError(
"expected name-addr but found '{}'".format(token))
if leader is not None:
- token[0][:0] = [leader]
+ if isinstance(token[0], TokenList):
+ token[0][:0] = [leader]
+ else:
+ token[:0] = [leader]
leader = None
name_addr.append(token)
token, value = get_angle_addr(value)
@@ -2765,11 +2785,15 @@ def _refold_parse_tree(parse_tree, *, policy):
# max_line_length 0/None means no limit, ie: infinitely long.
maxlen = policy.max_line_length or sys.maxsize
encoding = 'utf-8' if policy.utf8 else 'us-ascii'
- lines = ['']
- last_ew = None
+ lines = [''] # Folded lines to be output
+ leading_whitespace = '' # When we have whitespace between two encoded
+ # words, we may need to encode the whitespace
+ # at the beginning of the second word.
+ last_ew = None # Points to the last encoded character if there's an ew on
+ # the line
last_charset = None
wrap_as_ew_blocked = 0
- want_encoding = False
+ want_encoding = False # This is set to True if we need to encode this part
end_ew_not_allowed = Terminal('', 'wrap_as_ew_blocked')
parts = list(parse_tree)
while parts:
@@ -2793,10 +2817,12 @@ def _refold_parse_tree(parse_tree, *, policy):
# 'charset' property on the policy.
charset = 'utf-8'
want_encoding = True
+
if part.token_type == 'mime-parameters':
# Mime parameter folding (using RFC2231) is extra special.
_fold_mime_parameters(part, lines, maxlen, encoding)
continue
+
if want_encoding and not wrap_as_ew_blocked:
if not part.as_ew_allowed:
want_encoding = False
@@ -2819,7 +2845,9 @@ def _refold_parse_tree(parse_tree, *, policy):
if not hasattr(part, 'encode'):
# It's not a Terminal, do each piece individually.
parts = list(part) + parts
- else:
+ want_encoding = False
+ continue
+ elif part.as_ew_allowed:
# It's a terminal, wrap it as an encoded word, possibly
# combining it with previously encoded words if allowed.
if (last_ew is not None and
@@ -2828,21 +2856,44 @@ def _refold_parse_tree(parse_tree, *, policy):
last_charset == 'utf-8' and charset != 'us-ascii')):
last_ew = None
last_ew = _fold_as_ew(tstr, lines, maxlen, last_ew,
- part.ew_combine_allowed, charset)
+ part.ew_combine_allowed, charset, leading_whitespace)
+ # This whitespace has been added to the lines in _fold_as_ew()
+ # so clear it now.
+ leading_whitespace = ''
last_charset = charset
- want_encoding = False
- continue
+ want_encoding = False
+ continue
+ else:
+ # It's a terminal which should be kept non-encoded
+ # (e.g. a ListSeparator).
+ last_ew = None
+ want_encoding = False
+ # fall through
+
if len(tstr) <= maxlen - len(lines[-1]):
lines[-1] += tstr
continue
+
# This part is too long to fit. The RFC wants us to break at
# "major syntactic breaks", so unless we don't consider this
# to be one, check if it will fit on the next line by itself.
+ leading_whitespace = ''
if (part.syntactic_break and
len(tstr) + 1 <= maxlen):
newline = _steal_trailing_WSP_if_exists(lines)
if newline or part.startswith_fws():
+ # We're going to fold the data onto a new line here. Due to
+ # the way encoded strings handle continuation lines, we need to
+ # be prepared to encode any whitespace if the next line turns
+ # out to start with an encoded word.
lines.append(newline + tstr)
+
+ whitespace_accumulator = []
+ for char in lines[-1]:
+ if char not in WSP:
+ break
+ whitespace_accumulator.append(char)
+ leading_whitespace = ''.join(whitespace_accumulator)
last_ew = None
continue
if not hasattr(part, 'encode'):
@@ -2866,9 +2917,10 @@ def _refold_parse_tree(parse_tree, *, policy):
else:
# We can't fold it onto the next line either...
lines[-1] += tstr
+
return policy.linesep.join(lines) + policy.linesep
-def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset):
+def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset, leading_whitespace):
"""Fold string to_encode into lines as encoded word, combining if allowed.
Return the new value for last_ew, or None if ew_combine_allowed is False.
@@ -2883,7 +2935,7 @@ def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset):
to_encode = str(
get_unstructured(lines[-1][last_ew:] + to_encode))
lines[-1] = lines[-1][:last_ew]
- if to_encode[0] in WSP:
+ elif to_encode[0] in WSP:
# We're joining this to non-encoded text, so don't encode
# the leading blank.
leading_wsp = to_encode[0]
@@ -2891,6 +2943,7 @@ def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset):
if (len(lines[-1]) == maxlen):
lines.append(_steal_trailing_WSP_if_exists(lines))
lines[-1] += leading_wsp
+
trailing_wsp = ''
if to_encode[-1] in WSP:
# Likewise for the trailing space.
@@ -2910,11 +2963,20 @@ def _fold_as_ew(to_encode, lines, maxlen, last_ew, ew_combine_allowed, charset):
while to_encode:
remaining_space = maxlen - len(lines[-1])
- text_space = remaining_space - chrome_len
+ text_space = remaining_space - chrome_len - len(leading_whitespace)
if text_space <= 0:
lines.append(' ')
continue
+ # If we are at the start of a continuation line, prepend whitespace
+ # (we only want to do this when the line starts with an encoded word
+ # but if we're folding in this helper function, then we know that we
+ # are going to be writing out an encoded word.)
+ if len(lines) > 1 and len(lines[-1]) == 1 and leading_whitespace:
+ encoded_word = _ew.encode(leading_whitespace, charset=encode_as)
+ lines[-1] += encoded_word
+ leading_whitespace = ''
+
to_encode_word = to_encode[:text_space]
encoded_word = _ew.encode(to_encode_word, charset=encode_as)
excess = len(encoded_word) - remaining_space