aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/python/Twisted/py3/twisted/mail/relaymanager.py
diff options
context:
space:
mode:
authorshmel1k <shmel1k@ydb.tech>2023-11-26 18:16:14 +0300
committershmel1k <shmel1k@ydb.tech>2023-11-26 18:43:30 +0300
commitb8cf9e88f4c5c64d9406af533d8948deb050d695 (patch)
tree218eb61fb3c3b96ec08b4d8cdfef383104a87d63 /contrib/python/Twisted/py3/twisted/mail/relaymanager.py
parent523f645a83a0ec97a0332dbc3863bb354c92a328 (diff)
downloadydb-b8cf9e88f4c5c64d9406af533d8948deb050d695.tar.gz
add kikimr_configure
Diffstat (limited to 'contrib/python/Twisted/py3/twisted/mail/relaymanager.py')
-rw-r--r--contrib/python/Twisted/py3/twisted/mail/relaymanager.py1135
1 files changed, 1135 insertions, 0 deletions
diff --git a/contrib/python/Twisted/py3/twisted/mail/relaymanager.py b/contrib/python/Twisted/py3/twisted/mail/relaymanager.py
new file mode 100644
index 0000000000..18cc287833
--- /dev/null
+++ b/contrib/python/Twisted/py3/twisted/mail/relaymanager.py
@@ -0,0 +1,1135 @@
+# -*- test-case-name: twisted.mail.test.test_mail -*-
+# Copyright (c) Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Infrastructure for relaying mail through a smart host.
+
+Traditional peer-to-peer email has been increasingly replaced by smart host
+configurations. Instead of sending mail directly to the recipient, a sender
+sends mail to a smart host. The smart host finds the mail exchange server for
+the recipient and sends on the message.
+"""
+
+import email.utils
+import os
+import pickle
+import time
+from typing import Type
+
+from twisted.application import internet
+from twisted.internet import protocol
+from twisted.internet.defer import Deferred, DeferredList
+from twisted.internet.error import DNSLookupError
+from twisted.internet.protocol import connectionDone
+from twisted.mail import bounce, relay, smtp
+from twisted.python import log
+from twisted.python.failure import Failure
+
+
+class ManagedRelayerMixin:
+ """
+ SMTP Relayer which notifies a manager
+
+ Notify the manager about successful mail, failed mail
+ and broken connections
+ """
+
+ def __init__(self, manager):
+ self.manager = manager
+
+ @property
+ def factory(self):
+ return self._factory
+
+ @factory.setter
+ def factory(self, value):
+ self._factory = value
+
+ def sentMail(self, code, resp, numOk, addresses, log):
+ """
+ called when e-mail has been sent
+
+ we will always get 0 or 1 addresses.
+ """
+ message = self.names[0]
+ if code in smtp.SUCCESS:
+ self.manager.notifySuccess(self.factory, message)
+ else:
+ self.manager.notifyFailure(self.factory, message)
+ del self.messages[0]
+ del self.names[0]
+
+ def connectionLost(self, reason: Failure = connectionDone) -> None:
+ """
+ called when connection is broken
+
+ notify manager we will try to send no more e-mail
+ """
+ self.manager.notifyDone(self.factory)
+
+
+class SMTPManagedRelayer(ManagedRelayerMixin, relay.SMTPRelayer): # type: ignore[misc]
+ """
+ An SMTP managed relayer.
+
+ This managed relayer is an SMTP client which is responsible for sending a
+ set of messages and keeping an attempt manager informed about its progress.
+
+ @type factory: L{SMTPManagedRelayerFactory}
+ @ivar factory: The factory that created this relayer. This must be set by
+ the factory.
+ """
+
+ def __init__(self, messages, manager, *args, **kw):
+ """
+ @type messages: L{list} of L{bytes}
+ @param messages: The base filenames of messages to be relayed.
+
+ @type manager: L{_AttemptManager}
+ @param manager: An attempt manager.
+
+ @type args: 1-L{tuple} of (0) L{bytes} or 2-L{tuple} of
+ (0) L{bytes}, (1) L{int}
+ @param args: Positional arguments for L{SMTPClient.__init__}
+
+ @type kw: L{dict}
+ @param kw: Keyword arguments for L{SMTPClient.__init__}
+ """
+ ManagedRelayerMixin.__init__(self, manager)
+ relay.SMTPRelayer.__init__(self, messages, *args, **kw)
+
+
+class ESMTPManagedRelayer(ManagedRelayerMixin, relay.ESMTPRelayer): # type: ignore[misc]
+ """
+ An ESMTP managed relayer.
+
+ This managed relayer is an ESMTP client which is responsible for sending a
+ set of messages and keeping an attempt manager informed about its progress.
+ """
+
+ def __init__(self, messages, manager, *args, **kw):
+ """
+ @type messages: L{list} of L{bytes}
+ @param messages: The base filenames of messages to be relayed.
+
+ @type manager: L{_AttemptManager}
+ @param manager: An attempt manager.
+
+ @type args: 3-L{tuple} of (0) L{bytes}, (1) L{None} or
+ L{ClientContextFactory
+ <twisted.internet.ssl.ClientContextFactory>}, (2) L{bytes} or
+ 4-L{tuple} of (0) L{bytes}, (1) L{None} or
+ L{ClientContextFactory
+ <twisted.internet.ssl.ClientContextFactory>}, (2) L{bytes},
+ (3) L{int}
+ @param args: Positional arguments for L{ESMTPClient.__init__}
+
+ @type kw: L{dict}
+ @param kw: Keyword arguments for L{ESMTPClient.__init__}
+ """
+ ManagedRelayerMixin.__init__(self, manager)
+ relay.ESMTPRelayer.__init__(self, messages, *args, **kw)
+
+
+class SMTPManagedRelayerFactory(protocol.ClientFactory):
+ """
+ A factory to create an L{SMTPManagedRelayer}.
+
+ This factory creates a managed relayer which relays a set of messages over
+ SMTP and informs an attempt manager of its progress.
+
+ @ivar messages: See L{__init__}
+ @ivar manager: See L{__init__}
+
+ @type protocol: callable which returns L{SMTPManagedRelayer}
+ @ivar protocol: A callable which returns a managed relayer for SMTP. See
+ L{SMTPManagedRelayer.__init__} for parameters to the callable.
+
+ @type pArgs: 1-L{tuple} of (0) L{bytes} or 2-L{tuple} of
+ (0) L{bytes}, (1), L{int}
+ @ivar pArgs: Positional arguments for L{SMTPClient.__init__}
+
+ @type pKwArgs: L{dict}
+ @ivar pKwArgs: Keyword arguments for L{SMTPClient.__init__}
+ """
+
+ protocol: "Type[protocol.Protocol]" = SMTPManagedRelayer
+
+ def __init__(self, messages, manager, *args, **kw):
+ """
+ @type messages: L{list} of L{bytes}
+ @param messages: The base filenames of messages to be relayed.
+
+ @type manager: L{_AttemptManager}
+ @param manager: An attempt manager.
+
+ @type args: 1-L{tuple} of (0) L{bytes} or 2-L{tuple} of
+ (0) L{bytes}, (1), L{int}
+ @param args: Positional arguments for L{SMTPClient.__init__}
+
+ @type kw: L{dict}
+ @param kw: Keyword arguments for L{SMTPClient.__init__}
+ """
+ self.messages = messages
+ self.manager = manager
+ self.pArgs = args
+ self.pKwArgs = kw
+
+ def buildProtocol(self, addr):
+ """
+ Create an L{SMTPManagedRelayer}.
+
+ @type addr: L{IAddress <twisted.internet.interfaces.IAddress>} provider
+ @param addr: The address of the SMTP server.
+
+ @rtype: L{SMTPManagedRelayer}
+ @return: A managed relayer for SMTP.
+ """
+ protocol = self.protocol(
+ self.messages, self.manager, *self.pArgs, **self.pKwArgs
+ )
+ protocol.factory = self
+ return protocol
+
+ def clientConnectionFailed(self, connector, reason):
+ """
+ Notify the attempt manager that a connection could not be established.
+
+ @type connector: L{IConnector <twisted.internet.interfaces.IConnector>}
+ provider
+ @param connector: A connector.
+
+ @type reason: L{Failure}
+ @param reason: The reason the connection attempt failed.
+ """
+ self.manager.notifyNoConnection(self)
+ self.manager.notifyDone(self)
+
+
+class ESMTPManagedRelayerFactory(SMTPManagedRelayerFactory):
+ """
+ A factory to create an L{ESMTPManagedRelayer}.
+
+ This factory creates a managed relayer which relays a set of messages over
+ ESMTP and informs an attempt manager of its progress.
+
+ @type protocol: callable which returns L{ESMTPManagedRelayer}
+ @ivar protocol: A callable which returns a managed relayer for ESMTP. See
+ L{ESMTPManagedRelayer.__init__} for parameters to the callable.
+
+ @ivar secret: See L{__init__}
+ @ivar contextFactory: See L{__init__}
+ """
+
+ protocol = ESMTPManagedRelayer
+
+ def __init__(self, messages, manager, secret, contextFactory, *args, **kw):
+ """
+ @type messages: L{list} of L{bytes}
+ @param messages: The base filenames of messages to be relayed.
+
+ @type manager: L{_AttemptManager}
+ @param manager: An attempt manager.
+
+ @type secret: L{bytes}
+ @param secret: A string for the authentication challenge response.
+
+ @type contextFactory: L{None} or
+ L{ClientContextFactory <twisted.internet.ssl.ClientContextFactory>}
+ @param contextFactory: An SSL context factory.
+
+ @type args: 1-L{tuple} of (0) L{bytes} or 2-L{tuple} of
+ (0) L{bytes}, (1), L{int}
+ @param args: Positional arguments for L{SMTPClient.__init__}
+
+ @param kw: Keyword arguments for L{SMTPClient.__init__}
+ """
+ self.secret = secret
+ self.contextFactory = contextFactory
+ SMTPManagedRelayerFactory.__init__(self, messages, manager, *args, **kw)
+
+ def buildProtocol(self, addr):
+ """
+ Create an L{ESMTPManagedRelayer}.
+
+ @type addr: L{IAddress <twisted.internet.interfaces.IAddress>} provider
+ @param addr: The address of the ESMTP server.
+
+ @rtype: L{ESMTPManagedRelayer}
+ @return: A managed relayer for ESMTP.
+ """
+ s = self.secret and self.secret(addr)
+ protocol = self.protocol(
+ self.messages,
+ self.manager,
+ s,
+ self.contextFactory,
+ *self.pArgs,
+ **self.pKwArgs,
+ )
+ protocol.factory = self
+ return protocol
+
+
+class Queue:
+ """
+ A queue for messages to be relayed.
+
+ @ivar directory: See L{__init__}
+
+ @type n: L{int}
+ @ivar n: A number used to form unique filenames.
+
+ @type waiting: L{dict} of L{bytes}
+ @ivar waiting: The base filenames of messages waiting to be relayed.
+
+ @type relayed: L{dict} of L{bytes}
+ @ivar relayed: The base filenames of messages in the process of being
+ relayed.
+
+ @type noisy: L{bool}
+ @ivar noisy: A flag which determines whether informational log messages
+ will be generated (C{True}) or not (C{False}).
+ """
+
+ noisy = True
+
+ def __init__(self, directory):
+ """
+ Initialize non-volatile state.
+
+ @type directory: L{bytes}
+ @param directory: The pathname of the directory holding messages in the
+ queue.
+ """
+ self.directory = directory
+ self._init()
+
+ def _init(self):
+ """
+ Initialize volatile state.
+ """
+ self.n = 0
+ self.waiting = {}
+ self.relayed = {}
+ self.readDirectory()
+
+ def __getstate__(self):
+ """
+ Create a representation of the non-volatile state of the queue.
+
+ @rtype: L{dict} mapping L{bytes} to L{object}
+ @return: The non-volatile state of the queue.
+ """
+ return {"directory": self.directory}
+
+ def __setstate__(self, state):
+ """
+ Restore the non-volatile state of the queue and recreate the volatile
+ state.
+
+ @type state: L{dict} mapping L{bytes} to L{object}
+ @param state: The non-volatile state of the queue.
+ """
+ self.__dict__.update(state)
+ self._init()
+
+ def readDirectory(self):
+ """
+ Scan the message directory for new messages.
+ """
+ for message in os.listdir(self.directory):
+ # Skip non data files
+ if message[-2:] != "-D":
+ continue
+ self.addMessage(message[:-2])
+
+ def getWaiting(self):
+ """
+ Return the base filenames of messages waiting to be relayed.
+
+ @rtype: L{list} of L{bytes}
+ @return: The base filenames of messages waiting to be relayed.
+ """
+ return self.waiting.keys()
+
+ def hasWaiting(self):
+ """
+ Return an indication of whether the queue has messages waiting to be
+ relayed.
+
+ @rtype: L{bool}
+ @return: C{True} if messages are waiting to be relayed. C{False}
+ otherwise.
+ """
+ return len(self.waiting) > 0
+
+ def getRelayed(self):
+ """
+ Return the base filenames of messages in the process of being relayed.
+
+ @rtype: L{list} of L{bytes}
+ @return: The base filenames of messages in the process of being
+ relayed.
+ """
+ return self.relayed.keys()
+
+ def setRelaying(self, message):
+ """
+ Mark a message as being relayed.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+ """
+ del self.waiting[message]
+ self.relayed[message] = 1
+
+ def setWaiting(self, message):
+ """
+ Mark a message as waiting to be relayed.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+ """
+ del self.relayed[message]
+ self.waiting[message] = 1
+
+ def addMessage(self, message):
+ """
+ Mark a message as waiting to be relayed unless it is in the process of
+ being relayed.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+ """
+ if message not in self.relayed:
+ self.waiting[message] = 1
+ if self.noisy:
+ log.msg("Set " + message + " waiting")
+
+ def done(self, message):
+ """
+ Remove a message from the queue.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+ """
+ message = os.path.basename(message)
+ os.remove(self.getPath(message) + "-D")
+ os.remove(self.getPath(message) + "-H")
+ del self.relayed[message]
+
+ def getPath(self, message):
+ """
+ Return the full base pathname of a message in the queue.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+
+ @rtype: L{bytes}
+ @return: The full base pathname of the message.
+ """
+ return os.path.join(self.directory, message)
+
+ def getEnvelope(self, message):
+ """
+ Get the envelope for a message.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+
+ @rtype: L{list} of two L{bytes}
+ @return: A list containing the origination and destination addresses
+ for the message.
+ """
+ with self.getEnvelopeFile(message) as f:
+ return pickle.load(f)
+
+ def getEnvelopeFile(self, message):
+ """
+ Return the envelope file for a message in the queue.
+
+ @type message: L{bytes}
+ @param message: The base filename of a message.
+
+ @rtype: file
+ @return: The envelope file for the message.
+ """
+ return open(os.path.join(self.directory, message + "-H"), "rb")
+
+ def createNewMessage(self):
+ """
+ Create a new message in the queue.
+
+ @rtype: 2-L{tuple} of (0) file, (1) L{FileMessage}
+ @return: The envelope file and a message receiver for a new message in
+ the queue.
+ """
+ fname = f"{os.getpid()}_{time.time()}_{self.n}_{id(self)}"
+ self.n = self.n + 1
+ headerFile = open(os.path.join(self.directory, fname + "-H"), "wb")
+ tempFilename = os.path.join(self.directory, fname + "-C")
+ finalFilename = os.path.join(self.directory, fname + "-D")
+ messageFile = open(tempFilename, "wb")
+
+ from twisted.mail.mail import FileMessage
+
+ return headerFile, FileMessage(messageFile, tempFilename, finalFilename)
+
+
+class _AttemptManager:
+ """
+ A manager for an attempt to relay a set of messages to a mail exchange
+ server.
+
+ @ivar manager: See L{__init__}
+
+ @type _completionDeferreds: L{list} of L{Deferred}
+ @ivar _completionDeferreds: Deferreds which are to be notified when the
+ attempt to relay is finished.
+ """
+
+ def __init__(self, manager, noisy=True, reactor=None):
+ """
+ @type manager: L{SmartHostSMTPRelayingManager}
+ @param manager: A smart host.
+
+ @type noisy: L{bool}
+ @param noisy: A flag which determines whether informational log
+ messages will be generated (L{True}) or not (L{False}).
+
+ @type reactor: L{IReactorTime
+ <twisted.internet.interfaces.IReactorTime>} provider
+ @param reactor: A reactor which will be used to schedule delayed calls.
+ """
+ self.manager = manager
+ self._completionDeferreds = []
+ self.noisy = noisy
+
+ if not reactor:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ def getCompletionDeferred(self):
+ """
+ Return a deferred which will fire when the attempt to relay is
+ finished.
+
+ @rtype: L{Deferred}
+ @return: A deferred which will fire when the attempt to relay is
+ finished.
+ """
+ self._completionDeferreds.append(Deferred())
+ return self._completionDeferreds[-1]
+
+ def _finish(self, relay, message):
+ """
+ Remove a message from the relay queue and from the smart host's list of
+ messages being relayed.
+
+ @type relay: L{SMTPManagedRelayerFactory}
+ @param relay: The factory for the relayer which sent the message.
+
+ @type message: L{bytes}
+ @param message: The path of the file holding the message.
+ """
+ self.manager.managed[relay].remove(os.path.basename(message))
+ self.manager.queue.done(message)
+
+ def notifySuccess(self, relay, message):
+ """
+ Remove a message from the relay queue after it has been successfully
+ sent.
+
+ @type relay: L{SMTPManagedRelayerFactory}
+ @param relay: The factory for the relayer which sent the message.
+
+ @type message: L{bytes}
+ @param message: The path of the file holding the message.
+ """
+ if self.noisy:
+ log.msg("success sending %s, removing from queue" % message)
+ self._finish(relay, message)
+
+ def notifyFailure(self, relay, message):
+ """
+ Generate a bounce message for a message which cannot be relayed.
+
+ @type relay: L{SMTPManagedRelayerFactory}
+ @param relay: The factory for the relayer responsible for the message.
+
+ @type message: L{bytes}
+ @param message: The path of the file holding the message.
+ """
+ if self.noisy:
+ log.msg("could not relay " + message)
+ # Moshe - Bounce E-mail here
+ # Be careful: if it's a bounced bounce, silently
+ # discard it
+ message = os.path.basename(message)
+ with self.manager.queue.getEnvelopeFile(message) as fp:
+ from_, to = pickle.load(fp)
+ from_, to, bounceMessage = bounce.generateBounce(
+ open(self.manager.queue.getPath(message) + "-D"), from_, to
+ )
+ fp, outgoingMessage = self.manager.queue.createNewMessage()
+ with fp:
+ pickle.dump([from_, to], fp)
+ for line in bounceMessage.splitlines():
+ outgoingMessage.lineReceived(line)
+ outgoingMessage.eomReceived()
+ self._finish(relay, self.manager.queue.getPath(message))
+
+ def notifyDone(self, relay):
+ """
+ When the connection is lost or cannot be established, prepare to
+ resend unsent messages and fire all deferred which are waiting for
+ the completion of the attempt to relay.
+
+ @type relay: L{SMTPManagedRelayerFactory}
+ @param relay: The factory for the relayer for the connection.
+ """
+ for message in self.manager.managed.get(relay, ()):
+ if self.noisy:
+ log.msg("Setting " + message + " waiting")
+ self.manager.queue.setWaiting(message)
+ try:
+ del self.manager.managed[relay]
+ except KeyError:
+ pass
+ notifications = self._completionDeferreds
+ self._completionDeferreds = None
+ for d in notifications:
+ d.callback(None)
+
+ def notifyNoConnection(self, relay):
+ """
+ When a connection to the mail exchange server cannot be established,
+ prepare to resend messages later.
+
+ @type relay: L{SMTPManagedRelayerFactory}
+ @param relay: The factory for the relayer meant to use the connection.
+ """
+ # Back off a bit
+ try:
+ msgs = self.manager.managed[relay]
+ except KeyError:
+ log.msg("notifyNoConnection passed unknown relay!")
+ return
+
+ if self.noisy:
+ log.msg("Backing off on delivery of " + str(msgs))
+
+ def setWaiting(queue, messages):
+ map(queue.setWaiting, messages)
+
+ self.reactor.callLater(30, setWaiting, self.manager.queue, msgs)
+ del self.manager.managed[relay]
+
+
+class SmartHostSMTPRelayingManager:
+ """
+ A smart host which uses SMTP managed relayers to send messages from the
+ relay queue.
+
+ L{checkState} must be called periodically at which time the state of the
+ relay queue is checked and new relayers are created as needed.
+
+ In order to relay a set of messages to a mail exchange server, a smart host
+ creates an attempt manager and a managed relayer factory for that set of
+ messages. When a connection is made with the mail exchange server, the
+ managed relayer factory creates a managed relayer to send the messages.
+ The managed relayer reports on its progress to the attempt manager which,
+ in turn, updates the smart host's relay queue and information about its
+ managed relayers.
+
+ @ivar queue: See L{__init__}.
+ @ivar maxConnections: See L{__init__}.
+ @ivar maxMessagesPerConnection: See L{__init__}.
+
+ @type fArgs: 3-L{tuple} of (0) L{list} of L{bytes},
+ (1) L{_AttemptManager}, (2) L{bytes} or 4-L{tuple} of (0) L{list}
+ of L{bytes}, (1) L{_AttemptManager}, (2) L{bytes}, (3) L{int}
+ @ivar fArgs: Positional arguments for
+ L{SMTPManagedRelayerFactory.__init__}.
+
+ @type fKwArgs: L{dict}
+ @ivar fKwArgs: Keyword arguments for L{SMTPManagedRelayerFactory.__init__}.
+
+ @type factory: callable which returns L{SMTPManagedRelayerFactory}
+ @ivar factory: A callable which creates a factory for creating a managed
+ relayer. See L{SMTPManagedRelayerFactory.__init__} for parameters to
+ the callable.
+
+ @type PORT: L{int}
+ @ivar PORT: The port over which to connect to the SMTP server.
+
+ @type mxcalc: L{None} or L{MXCalculator}
+ @ivar mxcalc: A resource for mail exchange host lookups.
+
+ @type managed: L{dict} mapping L{SMTPManagedRelayerFactory} to L{list} of
+ L{bytes}
+ @ivar managed: A mapping of factory for a managed relayer to
+ filenames of messages the managed relayer is responsible for.
+ """
+
+ factory: Type[protocol.ClientFactory] = SMTPManagedRelayerFactory
+
+ PORT = 25
+
+ mxcalc = None
+
+ def __init__(self, queue, maxConnections=2, maxMessagesPerConnection=10):
+ """
+ Initialize a smart host.
+
+ The default values specify connection limits appropriate for a
+ low-volume smart host.
+
+ @type queue: L{Queue}
+ @param queue: A relay queue.
+
+ @type maxConnections: L{int}
+ @param maxConnections: The maximum number of concurrent connections to
+ SMTP servers.
+
+ @type maxMessagesPerConnection: L{int}
+ @param maxMessagesPerConnection: The maximum number of messages for
+ which a relayer will be given responsibility.
+ """
+ self.maxConnections = maxConnections
+ self.maxMessagesPerConnection = maxMessagesPerConnection
+ self.managed = {} # SMTP clients we're managing
+ self.queue = queue
+ self.fArgs = ()
+ self.fKwArgs = {}
+
+ def __getstate__(self):
+ """
+ Create a representation of the non-volatile state of this object.
+
+ @rtype: L{dict} mapping L{bytes} to L{object}
+ @return: The non-volatile state of the queue.
+ """
+ dct = self.__dict__.copy()
+ del dct["managed"]
+ return dct
+
+ def __setstate__(self, state):
+ """
+ Restore the non-volatile state of this object and recreate the volatile
+ state.
+
+ @type state: L{dict} mapping L{bytes} to L{object}
+ @param state: The non-volatile state of the queue.
+ """
+ self.__dict__.update(state)
+ self.managed = {}
+
+ def checkState(self):
+ """
+ Check the state of the relay queue and, if possible, launch relayers to
+ handle waiting messages.
+
+ @rtype: L{None} or L{Deferred}
+ @return: No return value if no further messages can be relayed or a
+ deferred which fires when all of the SMTP connections initiated by
+ this call have disconnected.
+ """
+ self.queue.readDirectory()
+ if len(self.managed) >= self.maxConnections:
+ return
+ if not self.queue.hasWaiting():
+ return
+
+ return self._checkStateMX()
+
+ def _checkStateMX(self):
+ nextMessages = self.queue.getWaiting()
+ nextMessages.reverse()
+
+ exchanges = {}
+ for msg in nextMessages:
+ from_, to = self.queue.getEnvelope(msg)
+ name, addr = email.utils.parseaddr(to)
+ parts = addr.split("@", 1)
+ if len(parts) != 2:
+ log.err("Illegal message destination: " + to)
+ continue
+ domain = parts[1]
+
+ self.queue.setRelaying(msg)
+ exchanges.setdefault(domain, []).append(self.queue.getPath(msg))
+ if len(exchanges) >= (self.maxConnections - len(self.managed)):
+ break
+
+ if self.mxcalc is None:
+ self.mxcalc = MXCalculator()
+
+ relays = []
+ for domain, msgs in exchanges.iteritems():
+ manager = _AttemptManager(self, self.queue.noisy)
+ factory = self.factory(msgs, manager, *self.fArgs, **self.fKwArgs)
+ self.managed[factory] = map(os.path.basename, msgs)
+ relayAttemptDeferred = manager.getCompletionDeferred()
+ connectSetupDeferred = self.mxcalc.getMX(domain)
+ connectSetupDeferred.addCallback(lambda mx: str(mx.name))
+ connectSetupDeferred.addCallback(self._cbExchange, self.PORT, factory)
+ connectSetupDeferred.addErrback(
+ lambda err: (relayAttemptDeferred.errback(err), err)[1]
+ )
+ connectSetupDeferred.addErrback(self._ebExchange, factory, domain)
+ relays.append(relayAttemptDeferred)
+ return DeferredList(relays)
+
+ def _cbExchange(self, address, port, factory):
+ """
+ Initiate a connection with a mail exchange server.
+
+ This callback function runs after mail exchange server for the domain
+ has been looked up.
+
+ @type address: L{bytes}
+ @param address: The hostname of a mail exchange server.
+
+ @type port: L{int}
+ @param port: A port number.
+
+ @type factory: L{SMTPManagedRelayerFactory}
+ @param factory: A factory which can create a relayer for the mail
+ exchange server.
+ """
+ from twisted.internet import reactor
+
+ reactor.connectTCP(address, port, factory)
+
+ def _ebExchange(self, failure, factory, domain):
+ """
+ Prepare to resend messages later.
+
+ This errback function runs when no mail exchange server for the domain
+ can be found.
+
+ @type failure: L{Failure}
+ @param failure: The reason the mail exchange lookup failed.
+
+ @type factory: L{SMTPManagedRelayerFactory}
+ @param factory: A factory which can create a relayer for the mail
+ exchange server.
+
+ @type domain: L{bytes}
+ @param domain: A domain.
+ """
+ log.err("Error setting up managed relay factory for " + domain)
+ log.err(failure)
+
+ def setWaiting(queue, messages):
+ map(queue.setWaiting, messages)
+
+ from twisted.internet import reactor
+
+ reactor.callLater(30, setWaiting, self.queue, self.managed[factory])
+ del self.managed[factory]
+
+
+class SmartHostESMTPRelayingManager(SmartHostSMTPRelayingManager):
+ """
+ A smart host which uses ESMTP managed relayers to send messages from the
+ relay queue.
+
+ @type factory: callable which returns L{ESMTPManagedRelayerFactory}
+ @ivar factory: A callable which creates a factory for creating a managed
+ relayer. See L{ESMTPManagedRelayerFactory.__init__} for parameters to
+ the callable.
+ """
+
+ factory = ESMTPManagedRelayerFactory
+
+
+def _checkState(manager):
+ """
+ Prompt a relaying manager to check state.
+
+ @type manager: L{SmartHostSMTPRelayingManager}
+ @param manager: A relaying manager.
+ """
+ manager.checkState()
+
+
+def RelayStateHelper(manager, delay):
+ """
+ Set up a periodic call to prompt a relaying manager to check state.
+
+ @type manager: L{SmartHostSMTPRelayingManager}
+ @param manager: A relaying manager.
+
+ @type delay: L{float}
+ @param delay: The number of seconds between calls.
+
+ @rtype: L{TimerService <internet.TimerService>}
+ @return: A service which periodically reminds a relaying manager to check
+ state.
+ """
+ return internet.TimerService(delay, _checkState, manager)
+
+
+class CanonicalNameLoop(Exception):
+ """
+ An error indicating that when trying to look up a mail exchange host, a set
+ of canonical name records was found which form a cycle and resolution was
+ abandoned.
+ """
+
+
+class CanonicalNameChainTooLong(Exception):
+ """
+ An error indicating that when trying to look up a mail exchange host, too
+ many canonical name records which point to other canonical name records
+ were encountered and resolution was abandoned.
+ """
+
+
+class MXCalculator:
+ """
+ A utility for looking up mail exchange hosts and tracking whether they are
+ working or not.
+
+ @type clock: L{IReactorTime <twisted.internet.interfaces.IReactorTime>}
+ provider
+ @ivar clock: A reactor which will be used to schedule timeouts.
+
+ @type resolver: L{IResolver <twisted.internet.interfaces.IResolver>}
+ @ivar resolver: A resolver.
+
+ @type badMXs: L{dict} mapping L{bytes} to L{float}
+ @ivar badMXs: A mapping of non-functioning mail exchange hostname to time
+ at which another attempt at contacting it may be made.
+
+ @type timeOutBadMX: L{int}
+ @ivar timeOutBadMX: Period in seconds between attempts to contact a
+ non-functioning mail exchange host.
+
+ @type fallbackToDomain: L{bool}
+ @ivar fallbackToDomain: A flag indicating whether to attempt to use the
+ hostname directly when no mail exchange can be found (C{True}) or
+ not (C{False}).
+ """
+
+ timeOutBadMX = 60 * 60 # One hour
+ fallbackToDomain = True
+
+ def __init__(self, resolver=None, clock=None):
+ """
+ @type resolver: L{IResolver <twisted.internet.interfaces.IResolver>}
+ provider or L{None}
+ @param resolver: A resolver.
+
+ @type clock: L{IReactorTime <twisted.internet.interfaces.IReactorTime>}
+ provider or L{None}
+ @param clock: A reactor which will be used to schedule timeouts.
+ """
+ self.badMXs = {}
+ if resolver is None:
+ from twisted.names.client import createResolver
+
+ resolver = createResolver()
+ self.resolver = resolver
+ if clock is None:
+ from twisted.internet import reactor as clock
+ self.clock = clock
+
+ def markBad(self, mx):
+ """
+ Record that a mail exchange host is not currently functioning.
+
+ @type mx: L{bytes}
+ @param mx: The hostname of a mail exchange host.
+ """
+ self.badMXs[str(mx)] = self.clock.seconds() + self.timeOutBadMX
+
+ def markGood(self, mx):
+ """
+ Record that a mail exchange host is functioning.
+
+ @type mx: L{bytes}
+ @param mx: The hostname of a mail exchange host.
+ """
+ try:
+ del self.badMXs[mx]
+ except KeyError:
+ pass
+
+ def getMX(self, domain, maximumCanonicalChainLength=3):
+ """
+ Find the name of a host that acts as a mail exchange server
+ for a domain.
+
+ @type domain: L{bytes}
+ @param domain: A domain name.
+
+ @type maximumCanonicalChainLength: L{int}
+ @param maximumCanonicalChainLength: The maximum number of unique
+ canonical name records to follow while looking up the mail exchange
+ host.
+
+ @rtype: L{Deferred} which successfully fires with L{Record_MX}
+ @return: A deferred which succeeds with the MX record for the mail
+ exchange server for the domain or fails if none can be found.
+ """
+ mailExchangeDeferred = self.resolver.lookupMailExchange(domain)
+ mailExchangeDeferred.addCallback(self._filterRecords)
+ mailExchangeDeferred.addCallback(
+ self._cbMX, domain, maximumCanonicalChainLength
+ )
+ mailExchangeDeferred.addErrback(self._ebMX, domain)
+ return mailExchangeDeferred
+
+ def _filterRecords(self, records):
+ """
+ Organize the records of a DNS response by record name.
+
+ @type records: 3-L{tuple} of (0) L{list} of L{RRHeader
+ <twisted.names.dns.RRHeader>}, (1) L{list} of L{RRHeader
+ <twisted.names.dns.RRHeader>}, (2) L{list} of L{RRHeader
+ <twisted.names.dns.RRHeader>}
+ @param records: Answer resource records, authority resource records and
+ additional resource records.
+
+ @rtype: L{dict} mapping L{bytes} to L{list} of L{IRecord
+ <twisted.names.dns.IRecord>} provider
+ @return: A mapping of record name to record payload.
+ """
+ recordBag = {}
+ for answer in records[0]:
+ recordBag.setdefault(str(answer.name), []).append(answer.payload)
+ return recordBag
+
+ def _cbMX(self, answers, domain, cnamesLeft):
+ """
+ Try to find the mail exchange host for a domain from the given DNS
+ records.
+
+ This will attempt to resolve canonical name record results. It can
+ recognize loops and will give up on non-cyclic chains after a specified
+ number of lookups.
+
+ @type answers: L{dict} mapping L{bytes} to L{list} of L{IRecord
+ <twisted.names.dns.IRecord>} provider
+ @param answers: A mapping of record name to record payload.
+
+ @type domain: L{bytes}
+ @param domain: A domain name.
+
+ @type cnamesLeft: L{int}
+ @param cnamesLeft: The number of unique canonical name records
+ left to follow while looking up the mail exchange host.
+
+ @rtype: L{Record_MX <twisted.names.dns.Record_MX>} or L{Failure}
+ @return: An MX record for the mail exchange host or a failure if one
+ cannot be found.
+ """
+ # Do this import here so that relaymanager.py doesn't depend on
+ # twisted.names, only MXCalculator will.
+ from twisted.names import dns, error
+
+ seenAliases = set()
+ exchanges = []
+ # Examine the answers for the domain we asked about
+ pertinentRecords = answers.get(domain, [])
+ while pertinentRecords:
+ record = pertinentRecords.pop()
+
+ # If it's a CNAME, we'll need to do some more processing
+ if record.TYPE == dns.CNAME:
+ # Remember that this name was an alias.
+ seenAliases.add(domain)
+
+ canonicalName = str(record.name)
+ # See if we have some local records which might be relevant.
+ if canonicalName in answers:
+ # Make sure it isn't a loop contained entirely within the
+ # results we have here.
+ if canonicalName in seenAliases:
+ return Failure(CanonicalNameLoop(record))
+
+ pertinentRecords = answers[canonicalName]
+ exchanges = []
+ else:
+ if cnamesLeft:
+ # Request more information from the server.
+ return self.getMX(canonicalName, cnamesLeft - 1)
+ else:
+ # Give up.
+ return Failure(CanonicalNameChainTooLong(record))
+
+ # If it's an MX, collect it.
+ if record.TYPE == dns.MX:
+ exchanges.append((record.preference, record))
+
+ if exchanges:
+ exchanges.sort()
+ for preference, record in exchanges:
+ host = str(record.name)
+ if host not in self.badMXs:
+ return record
+ t = self.clock.seconds() - self.badMXs[host]
+ if t >= 0:
+ del self.badMXs[host]
+ return record
+ return exchanges[0][1]
+ else:
+ # Treat no answers the same as an error - jump to the errback to
+ # try to look up an A record. This provides behavior described as
+ # a special case in RFC 974 in the section headed I{Interpreting
+ # the List of MX RRs}.
+ return Failure(error.DNSNameError(f"No MX records for {domain!r}"))
+
+ def _ebMX(self, failure, domain):
+ """
+ Attempt to use the name of the domain directly when mail exchange
+ lookup fails.
+
+ @type failure: L{Failure}
+ @param failure: The reason for the lookup failure.
+
+ @type domain: L{bytes}
+ @param domain: The domain name.
+
+ @rtype: L{Record_MX <twisted.names.dns.Record_MX>} or L{Failure}
+ @return: An MX record for the domain or a failure if the fallback to
+ domain option is not in effect and an error, other than not
+ finding an MX record, occurred during lookup.
+
+ @raise IOError: When no MX record could be found and the fallback to
+ domain option is not in effect.
+
+ @raise DNSLookupError: When no MX record could be found and the
+ fallback to domain option is in effect but no address for the
+ domain could be found.
+ """
+ from twisted.names import dns, error
+
+ if self.fallbackToDomain:
+ failure.trap(error.DNSNameError)
+ log.msg(
+ "MX lookup failed; attempting to use hostname ({}) directly".format(
+ domain
+ )
+ )
+
+ # Alright, I admit, this is a bit icky.
+ d = self.resolver.getHostByName(domain)
+
+ def cbResolved(addr):
+ return dns.Record_MX(name=addr)
+
+ def ebResolved(err):
+ err.trap(error.DNSNameError)
+ raise DNSLookupError()
+
+ d.addCallbacks(cbResolved, ebResolved)
+ return d
+ elif failure.check(error.DNSNameError):
+ raise OSError(f"No MX found for {domain!r}")
+ return failure