123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454 |
- import mimetypes
- from email import (
- charset as Charset, encoders as Encoders, generator, message_from_string,
- )
- from email.errors import InvalidHeaderDefect, NonASCIILocalPartDefect
- from email.header import Header
- from email.headerregistry import Address
- from email.message import Message
- from email.mime.base import MIMEBase
- from email.mime.message import MIMEMessage
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- from email.utils import formatdate, getaddresses, make_msgid, parseaddr
- from io import BytesIO, StringIO
- from pathlib import Path
-
- from django.conf import settings
- from django.core.mail.utils import DNS_NAME
- from django.utils.encoding import force_text
-
- # Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
- # some spam filters.
- utf8_charset = Charset.Charset('utf-8')
- utf8_charset.body_encoding = None # Python defaults to BASE64
- utf8_charset_qp = Charset.Charset('utf-8')
- utf8_charset_qp.body_encoding = Charset.QP
-
- # Default MIME type to use on attachments (if it is not explicitly given
- # and cannot be guessed).
- DEFAULT_ATTACHMENT_MIME_TYPE = 'application/octet-stream'
-
- RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998
-
-
- class BadHeaderError(ValueError):
- pass
-
-
- # Header names that contain structured address data (RFC #5322)
- ADDRESS_HEADERS = {
- 'from',
- 'sender',
- 'reply-to',
- 'to',
- 'cc',
- 'bcc',
- 'resent-from',
- 'resent-sender',
- 'resent-to',
- 'resent-cc',
- 'resent-bcc',
- }
-
-
- def forbid_multi_line_headers(name, val, encoding):
- """Forbid multi-line headers to prevent header injection."""
- encoding = encoding or settings.DEFAULT_CHARSET
- val = str(val) # val may be lazy
- if '\n' in val or '\r' in val:
- raise BadHeaderError("Header values can't contain newlines (got %r for header %r)" % (val, name))
- try:
- val.encode('ascii')
- except UnicodeEncodeError:
- if name.lower() in ADDRESS_HEADERS:
- val = ', '.join(sanitize_address(addr, encoding) for addr in getaddresses((val,)))
- else:
- val = Header(val, encoding).encode()
- else:
- if name.lower() == 'subject':
- val = Header(val).encode()
- return name, val
-
-
- def split_addr(addr, encoding):
- """
- Split the address into local part and domain and encode them.
-
- When non-ascii characters are present in the local part, it must be
- MIME-word encoded. The domain name must be idna-encoded if it contains
- non-ascii characters.
- """
- if '@' in addr:
- localpart, domain = addr.split('@', 1)
- # Try to get the simplest encoding - ascii if possible so that
- # to@example.com doesn't become =?utf-8?q?to?=@example.com. This
- # makes unit testing a bit easier and more readable.
- try:
- localpart.encode('ascii')
- except UnicodeEncodeError:
- localpart = Header(localpart, encoding).encode()
- domain = domain.encode('idna').decode('ascii')
- else:
- localpart = Header(addr, encoding).encode()
- domain = ''
- return (localpart, domain)
-
-
- def sanitize_address(addr, encoding):
- """
- Format a pair of (name, address) or an email address string.
- """
- if not isinstance(addr, tuple):
- addr = parseaddr(addr)
- nm, addr = addr
- localpart, domain = None, None
- nm = Header(nm, encoding).encode()
- try:
- addr.encode('ascii')
- except UnicodeEncodeError: # IDN or non-ascii in the local part
- localpart, domain = split_addr(addr, encoding)
-
- # An `email.headerregistry.Address` object is used since
- # email.utils.formataddr() naively encodes the name as ascii (see #25986).
- if localpart and domain:
- address = Address(nm, username=localpart, domain=domain)
- return str(address)
-
- try:
- address = Address(nm, addr_spec=addr)
- except (InvalidHeaderDefect, NonASCIILocalPartDefect):
- localpart, domain = split_addr(addr, encoding)
- address = Address(nm, username=localpart, domain=domain)
- return str(address)
-
-
- class MIMEMixin:
- def as_string(self, unixfrom=False, linesep='\n'):
- """Return the entire formatted message as a string.
- Optional `unixfrom' when True, means include the Unix From_ envelope
- header.
-
- This overrides the default as_string() implementation to not mangle
- lines that begin with 'From '. See bug #13433 for details.
- """
- fp = StringIO()
- g = generator.Generator(fp, mangle_from_=False)
- g.flatten(self, unixfrom=unixfrom, linesep=linesep)
- return fp.getvalue()
-
- def as_bytes(self, unixfrom=False, linesep='\n'):
- """Return the entire formatted message as bytes.
- Optional `unixfrom' when True, means include the Unix From_ envelope
- header.
-
- This overrides the default as_bytes() implementation to not mangle
- lines that begin with 'From '. See bug #13433 for details.
- """
- fp = BytesIO()
- g = generator.BytesGenerator(fp, mangle_from_=False)
- g.flatten(self, unixfrom=unixfrom, linesep=linesep)
- return fp.getvalue()
-
-
- class SafeMIMEMessage(MIMEMixin, MIMEMessage):
-
- def __setitem__(self, name, val):
- # message/rfc822 attachments must be ASCII
- name, val = forbid_multi_line_headers(name, val, 'ascii')
- MIMEMessage.__setitem__(self, name, val)
-
-
- class SafeMIMEText(MIMEMixin, MIMEText):
-
- def __init__(self, _text, _subtype='plain', _charset=None):
- self.encoding = _charset
- MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset)
-
- def __setitem__(self, name, val):
- name, val = forbid_multi_line_headers(name, val, self.encoding)
- MIMEText.__setitem__(self, name, val)
-
- def set_payload(self, payload, charset=None):
- if charset == 'utf-8' and not isinstance(charset, Charset.Charset):
- has_long_lines = any(
- len(l.encode()) > RFC5322_EMAIL_LINE_LENGTH_LIMIT
- for l in payload.splitlines()
- )
- # Quoted-Printable encoding has the side effect of shortening long
- # lines, if any (#22561).
- charset = utf8_charset_qp if has_long_lines else utf8_charset
- MIMEText.set_payload(self, payload, charset=charset)
-
-
- class SafeMIMEMultipart(MIMEMixin, MIMEMultipart):
-
- def __init__(self, _subtype='mixed', boundary=None, _subparts=None, encoding=None, **_params):
- self.encoding = encoding
- MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)
-
- def __setitem__(self, name, val):
- name, val = forbid_multi_line_headers(name, val, self.encoding)
- MIMEMultipart.__setitem__(self, name, val)
-
-
- class EmailMessage:
- """A container for email information."""
- content_subtype = 'plain'
- mixed_subtype = 'mixed'
- encoding = None # None => use settings default
-
- def __init__(self, subject='', body='', from_email=None, to=None, bcc=None,
- connection=None, attachments=None, headers=None, cc=None,
- reply_to=None):
- """
- Initialize a single email message (which can be sent to multiple
- recipients).
- """
- if to:
- if isinstance(to, str):
- raise TypeError('"to" argument must be a list or tuple')
- self.to = list(to)
- else:
- self.to = []
- if cc:
- if isinstance(cc, str):
- raise TypeError('"cc" argument must be a list or tuple')
- self.cc = list(cc)
- else:
- self.cc = []
- if bcc:
- if isinstance(bcc, str):
- raise TypeError('"bcc" argument must be a list or tuple')
- self.bcc = list(bcc)
- else:
- self.bcc = []
- if reply_to:
- if isinstance(reply_to, str):
- raise TypeError('"reply_to" argument must be a list or tuple')
- self.reply_to = list(reply_to)
- else:
- self.reply_to = []
- self.from_email = from_email or settings.DEFAULT_FROM_EMAIL
- self.subject = subject
- self.body = body or ''
- self.attachments = []
- if attachments:
- for attachment in attachments:
- if isinstance(attachment, MIMEBase):
- self.attach(attachment)
- else:
- self.attach(*attachment)
- self.extra_headers = headers or {}
- self.connection = connection
-
- def get_connection(self, fail_silently=False):
- from django.core.mail import get_connection
- if not self.connection:
- self.connection = get_connection(fail_silently=fail_silently)
- return self.connection
-
- def message(self):
- encoding = self.encoding or settings.DEFAULT_CHARSET
- msg = SafeMIMEText(self.body, self.content_subtype, encoding)
- msg = self._create_message(msg)
- msg['Subject'] = self.subject
- msg['From'] = self.extra_headers.get('From', self.from_email)
- self._set_list_header_if_not_empty(msg, 'To', self.to)
- self._set_list_header_if_not_empty(msg, 'Cc', self.cc)
- self._set_list_header_if_not_empty(msg, 'Reply-To', self.reply_to)
-
- # Email header names are case-insensitive (RFC 2045), so we have to
- # accommodate that when doing comparisons.
- header_names = [key.lower() for key in self.extra_headers]
- if 'date' not in header_names:
- # formatdate() uses stdlib methods to format the date, which use
- # the stdlib/OS concept of a timezone, however, Django sets the
- # TZ environment variable based on the TIME_ZONE setting which
- # will get picked up by formatdate().
- msg['Date'] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME)
- if 'message-id' not in header_names:
- # Use cached DNS_NAME for performance
- msg['Message-ID'] = make_msgid(domain=DNS_NAME)
- for name, value in self.extra_headers.items():
- if name.lower() != 'from': # From is already handled
- msg[name] = value
- return msg
-
- def recipients(self):
- """
- Return a list of all recipients of the email (includes direct
- addressees as well as Cc and Bcc entries).
- """
- return [email for email in (self.to + self.cc + self.bcc) if email]
-
- def send(self, fail_silently=False):
- """Send the email message."""
- if not self.recipients():
- # Don't bother creating the network connection if there's nobody to
- # send to.
- return 0
- return self.get_connection(fail_silently).send_messages([self])
-
- def attach(self, filename=None, content=None, mimetype=None):
- """
- Attach a file with the given filename and content. The filename can
- be omitted and the mimetype is guessed, if not provided.
-
- If the first parameter is a MIMEBase subclass, insert it directly
- into the resulting message attachments.
-
- For a text/* mimetype (guessed or specified), when a bytes object is
- specified as content, decode it as UTF-8. If that fails, set the
- mimetype to DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
- """
- if isinstance(filename, MIMEBase):
- assert content is None
- assert mimetype is None
- self.attachments.append(filename)
- else:
- assert content is not None
- mimetype = mimetype or mimetypes.guess_type(filename)[0] or DEFAULT_ATTACHMENT_MIME_TYPE
- basetype, subtype = mimetype.split('/', 1)
-
- if basetype == 'text':
- if isinstance(content, bytes):
- try:
- content = content.decode()
- except UnicodeDecodeError:
- # If mimetype suggests the file is text but it's
- # actually binary, read() raises a UnicodeDecodeError.
- mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
-
- self.attachments.append((filename, content, mimetype))
-
- def attach_file(self, path, mimetype=None):
- """
- Attach a file from the filesystem.
-
- Set the mimetype to DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified
- and cannot be guessed.
-
- For a text/* mimetype (guessed or specified), decode the file's content
- as UTF-8. If that fails, set the mimetype to
- DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
- """
- path = Path(path)
- with path.open('rb') as file:
- content = file.read()
- self.attach(path.name, content, mimetype)
-
- def _create_message(self, msg):
- return self._create_attachments(msg)
-
- def _create_attachments(self, msg):
- if self.attachments:
- encoding = self.encoding or settings.DEFAULT_CHARSET
- body_msg = msg
- msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding)
- if self.body or body_msg.is_multipart():
- msg.attach(body_msg)
- for attachment in self.attachments:
- if isinstance(attachment, MIMEBase):
- msg.attach(attachment)
- else:
- msg.attach(self._create_attachment(*attachment))
- return msg
-
- def _create_mime_attachment(self, content, mimetype):
- """
- Convert the content, mimetype pair into a MIME attachment object.
-
- If the mimetype is message/rfc822, content may be an
- email.Message or EmailMessage object, as well as a str.
- """
- basetype, subtype = mimetype.split('/', 1)
- if basetype == 'text':
- encoding = self.encoding or settings.DEFAULT_CHARSET
- attachment = SafeMIMEText(content, subtype, encoding)
- elif basetype == 'message' and subtype == 'rfc822':
- # Bug #18967: per RFC2046 s5.2.1, message/rfc822 attachments
- # must not be base64 encoded.
- if isinstance(content, EmailMessage):
- # convert content into an email.Message first
- content = content.message()
- elif not isinstance(content, Message):
- # For compatibility with existing code, parse the message
- # into an email.Message object if it is not one already.
- content = message_from_string(force_text(content))
-
- attachment = SafeMIMEMessage(content, subtype)
- else:
- # Encode non-text attachments with base64.
- attachment = MIMEBase(basetype, subtype)
- attachment.set_payload(content)
- Encoders.encode_base64(attachment)
- return attachment
-
- def _create_attachment(self, filename, content, mimetype=None):
- """
- Convert the filename, content, mimetype triple into a MIME attachment
- object.
- """
- attachment = self._create_mime_attachment(content, mimetype)
- if filename:
- try:
- filename.encode('ascii')
- except UnicodeEncodeError:
- filename = ('utf-8', '', filename)
- attachment.add_header('Content-Disposition', 'attachment', filename=filename)
- return attachment
-
- def _set_list_header_if_not_empty(self, msg, header, values):
- """
- Set msg's header, either from self.extra_headers, if present, or from
- the values argument.
- """
- if values:
- try:
- value = self.extra_headers[header]
- except KeyError:
- value = ', '.join(str(v) for v in values)
- msg[header] = value
-
-
- class EmailMultiAlternatives(EmailMessage):
- """
- A version of EmailMessage that makes it easy to send multipart/alternative
- messages. For example, including text and HTML versions of the text is
- made easier.
- """
- alternative_subtype = 'alternative'
-
- def __init__(self, subject='', body='', from_email=None, to=None, bcc=None,
- connection=None, attachments=None, headers=None, alternatives=None,
- cc=None, reply_to=None):
- """
- Initialize a single email message (which can be sent to multiple
- recipients).
- """
- super().__init__(
- subject, body, from_email, to, bcc, connection, attachments,
- headers, cc, reply_to,
- )
- self.alternatives = alternatives or []
-
- def attach_alternative(self, content, mimetype):
- """Attach an alternative content representation."""
- assert content is not None
- assert mimetype is not None
- self.alternatives.append((content, mimetype))
-
- def _create_message(self, msg):
- return self._create_attachments(self._create_alternatives(msg))
-
- def _create_alternatives(self, msg):
- encoding = self.encoding or settings.DEFAULT_CHARSET
- if self.alternatives:
- body_msg = msg
- msg = SafeMIMEMultipart(_subtype=self.alternative_subtype, encoding=encoding)
- if self.body:
- msg.attach(body_msg)
- for alternative in self.alternatives:
- msg.attach(self._create_mime_attachment(*alternative))
- return msg
|