updates to backends.py

This commit is contained in:
Raymond Jessop 2025-04-18 13:50:09 -05:00
parent bbfdfec0dc
commit a5a7344d3b
2 changed files with 186 additions and 266 deletions

View File

@ -1,71 +1,104 @@
import logging
from time import sleep
from datetime import datetime, timedelta
import boto3
from botocore.vendored.requests.packages.urllib3.exceptions import ResponseError
from django.core.cache import cache
from django.core.mail.backends.base import BaseEmailBackend
from django.db.models import Count
from django.dispatch import Signal
from datetime import datetime, timedelta
from time import sleep
import sys
from django.core.exceptions import ImproperlyConfigured
from requests.exceptions import RequestException as ResponseError
from . import settings
from . import signals
from . import utils
from .models import BounceRecord
logger = settings.logger
cached_rate_limits = {}
recent_send_times = []
def dkim_sign(message, dkim_domain=None, dkim_key=None, dkim_selector=None, dkim_headers=None):
"""Return signed email message if dkim package and settings are available."""
"""Sign an email message with DKIM if the package and settings are available.
Args:
message (str): The email message as a string.
dkim_domain (str): DKIM domain for signing.
dkim_key (str): DKIM private key.
dkim_selector (str): DKIM selector.
dkim_headers (tuple): Headers to include in DKIM signing.
Returns:
str: The signed message or original message if signing fails.
"""
try:
import dkim
except ImportError:
pass
else:
if dkim_domain and dkim_key:
sig = dkim.sign(message,
dkim_selector,
dkim_domain,
dkim_key,
include_headers=dkim_headers)
message = sig + message
return message
logger.warning("DKIM package not installed, skipping signing")
return message
if not (dkim_domain and dkim_key):
logger.debug("DKIM domain or key missing, skipping signing")
return message
try:
sig = dkim.sign(
message,
dkim_selector,
dkim_domain,
dkim_key,
include_headers=dkim_headers
)
return sig + message
except Exception as e:
logger.error(f"DKIM signing failed: {e}")
return message
class SESBackend(BaseEmailBackend):
"""A Django Email backend that uses Amazon's Simple Email Service.
"""
def __init__(self, fail_silently=False, aws_access_key=None,
aws_secret_key=None, aws_region_name=None,
aws_region_endpoint=None, aws_auto_throttle=None,
dkim_domain=None, dkim_key=None, dkim_selector=None,
dkim_headers=None, **kwargs):
"""Django email backend for Amazon SES.
super(SESBackend, self).__init__(fail_silently=fail_silently, **kwargs)
Sends emails using AWS SES with support for DKIM signing and rate limiting.
"""
def __init__(self, fail_silently=False, aws_access_key=None, aws_secret_key=None,
aws_region_name=None, aws_region_endpoint=None, aws_auto_throttle=None,
dkim_domain=None, dkim_key=None, dkim_selector=None, dkim_headers=None, **kwargs):
"""Initialize SES backend with AWS credentials and settings.
Args:
fail_silently (bool): If True, silently ignore errors.
aws_access_key (str): AWS access key ID.
aws_secret_key (str): AWS secret access key.
aws_region_name (str): AWS region name.
aws_region_endpoint (str): AWS SES endpoint URL.
aws_auto_throttle (float): Throttling factor for SES rate limits.
dkim_domain (str): DKIM domain for signing.
dkim_key (str): DKIM private key.
dkim_selector (str): DKIM selector.
dkim_headers (tuple): Headers to include in DKIM signing.
Raises:
ImproperlyConfigured: If AWS credentials are missing.
"""
super().__init__(fail_silently=fail_silently, **kwargs)
self._access_key_id = aws_access_key or settings.ACCESS_KEY
self._access_key = aws_secret_key or settings.SECRET_KEY
self._region_name = aws_region_name if aws_region_name else settings.AWS_SES_REGION_NAME
self._endpoint_url = aws_region_endpoint if aws_region_endpoint else settings.AWS_SES_REGION_ENDPOINT_URL
self._region_name = aws_region_name or settings.AWS_SES_REGION_NAME
self._endpoint_url = aws_region_endpoint or settings.AWS_SES_REGION_ENDPOINT_URL
self._throttle = aws_auto_throttle or settings.AWS_SES_AUTO_THROTTLE
self.dkim_domain = dkim_domain or settings.DKIM_DOMAIN
self.dkim_key = dkim_key or settings.DKIM_PRIVATE_KEY
self.dkim_selector = dkim_selector or settings.DKIM_SELECTOR
self.dkim_headers = dkim_headers or settings.DKIM_HEADERS
if not (self._access_key_id and self._access_key):
raise ImproperlyConfigured("AWS SES credentials are required.")
self.connection = None
def open(self):
"""Create a connection to the AWS API server. This can be reused for
sending multiple emails.
"""Create a connection to the AWS SES API server.
Returns:
bool: True if a new connection was created, False otherwise.
"""
if self.connection:
return False
@ -78,264 +111,151 @@ class SESBackend(BaseEmailBackend):
region_name=self._region_name,
endpoint_url=self._endpoint_url,
)
except Exception:
return True
except Exception as e:
logger.error(f"Failed to connect to SES: {e}")
if not self.fail_silently:
raise
return True
return False
def close(self):
"""Close any open HTTP connections to the API server.
"""
"""Close the SES API connection."""
self.connection = None
def send_messages(self, email_messages):
"""Sends one or more EmailMessage objects and returns the number of
email messages sent and a list of filtered emails.
def get_rate_limit(self):
"""Retrieve and cache the SES maximum send rate.
Returns:
float: The maximum send rate per second.
Raises:
Exception: If no connection is available to fetch the rate limit.
"""
logger.info("1 --- start of send_messages")
list_of_response = []
num_sent = 0
not_sent_list = []
sent_message = {"Sent":""}
calling_func = ''
cache_key = f"ses_rate_limit_{self._access_key_id}"
rate_limit = cache.get(cache_key)
if rate_limit is not None:
logger.debug(f"Retrieved cached rate limit: {rate_limit}")
return rate_limit
logger.debug("Fetching new rate limit from AWS SES")
new_conn_created = self.open()
if not self.connection:
raise Exception("No connection to check SES rate limit.")
try:
fcount = 0
while sys._getframe(fcount).f_code.co_name in ['send_messages', 'send', 'mail_admins', 'send_mail', 'emit', 'handle']:
fcount +=1
calling_func = sys._getframe(fcount).f_code.co_name
except Exception as e:
logger.info("fcount:%s, called from exception = %s" % (fcount, e))
logger.info("called from %s current throttle:%s" % (calling_func, self._throttle))
logger.info("send_messages")
quota_dict = self.connection.get_send_quota()
rate_limit = float(quota_dict['MaxSendRate'])
cache.set(cache_key, rate_limit, timeout=3600) # Cache for 1 hour
return rate_limit
finally:
if new_conn_created:
self.close()
def send_messages(self, email_messages):
"""Send one or more EmailMessage objects.
Args:
email_messages (list): List of EmailMessage objects to send.
Returns:
tuple: (number of messages sent, dictionary with sent/not sent info)
"""
if not email_messages:
logger.info("no email messages returning")
list_of_response.append({'error':'no email messages returning'})
logger.debug("No email messages to send")
return 0, {"Sent": ""}
new_conn_created = self.open()
if new_conn_created:
logger.info("created a new connection")
if not self.connection:
# Failed silently
logger.info("no connection returning")
list_of_response.append({'error':'no connection returning'})
logger.info("DEBUGING EMAILS --- list_of_response:%s" % (list_of_response))
logger.info("DEBUGING EMAILS --- return %s" % (num_sent))
return num_sent
logger.error("Failed to establish SES connection")
return 0, {"Sent": ""}
num_sent, sent_message, list_of_response = 0, {"Sent": ""}, []
source = settings.AWS_SES_RETURN_PATH or settings.DEFAULT_FROM_EMAIL
not_sent_list = []
source = settings.AWS_SES_RETURN_PATH
logger.info("email_messages: %s" % email_messages)
for message in email_messages:
# SES Configuration sets. If the AWS_SES_CONFIGURATION_SET setting
# is not None, append the appropriate header to the message so that
# SES knows which configuration set it belongs to.
#
# If settings.AWS_SES_CONFIGURATION_SET is a callable, pass it the
# message object and dkim settings and expect it to return a string
# containing the SES Configuration Set name.
message.aws_ses_response = {'error':'not sent yet'}
logger.info("Sending signal(email_pre_send)")
logger.info("message to: %s, cc: %s, bcc: %s" % (message.to, message.cc, message.bcc))
message.aws_ses_response = {'error': 'not sent yet'}
signals.email_pre_send.send_robust(self.__class__, message=message)
# for log in dir(message):
# logger.info(log)
pre_filter_recipients = message.recipients()
logger.info("message.recipients() = %s" % message.recipients())
marketing = message.extra_headers.get("marketing","False")
message.to = utils.filter_recipients(message.to)
message.cc = utils.filter_recipients(message.cc)
message.bcc = utils.filter_recipients(message.bcc)
message.to = utils.filter_recipiants(message.to)
message.cc = utils.filter_recipiants(message.cc)
message.bcc = utils.filter_recipiants(message.bcc)
logger.info("message.recipients() after email_pre_send: %s" % message.recipients())
if not message.recipients():
logger.info("no recipients left after the filter")
list_of_response.append({'error':'no recipients left after the filter'})
message.aws_ses_response = {'error':'no recipients left after the filter'}
sent_message = {"Not Sent":"No recipients left after filters"}
logger.debug("No recipients after filtering")
message.aws_ses_response = {'error': 'no recipients left after filtering'}
list_of_response.append({'error': 'no recipients left after filtering'})
continue
#raise Exception('No emails left after filters!')
else:
for email in pre_filter_recipients:
if email not in message.recipients():
not_sent_list.append(email)
if (settings.AWS_SES_CONFIGURATION_SET
and 'X-SES-CONFIGURATION-SET' not in message.extra_headers):
not_sent_list.extend([email for email in pre_filter_recipients if email not in message.recipients()])
if settings.AWS_SES_CONFIGURATION_SET and 'X-SES-CONFIGURATION-SET' not in message.extra_headers:
if callable(settings.AWS_SES_CONFIGURATION_SET):
message.extra_headers[
'X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET(
message,
dkim_domain=self.dkim_domain,
dkim_key=self.dkim_key,
dkim_selector=self.dkim_selector,
dkim_headers=self.dkim_headers
)
message.extra_headers['X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET(
message, dkim_domain=self.dkim_domain, dkim_key=self.dkim_key,
dkim_selector=self.dkim_selector, dkim_headers=self.dkim_headers
)
else:
message.extra_headers[
'X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET
message.extra_headers['X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET
# Automatic throttling. Assumes that this is the only SES client
# currently operating. The AWS_SES_AUTO_THROTTLE setting is a
# factor to apply to the rate limit, with a default of 0.5 to stay
# well below the actual SES throttle.
# Set the setting to 0 or None to disable throttling.
if self._throttle:
global recent_send_times
logger.info("inside if _throttle recent_send_times:%s" % recent_send_times)
now = datetime.now()
logger.info("inside if _throttle now:%s" % now)
# Get and cache the current SES max-per-second rate limit
# returned by the SES API.
logger.info("inside if _throttle calling get_rate_limit")
rate_limit = self.get_rate_limit()
logger.info("send_messages.throttle rate_limit='{}'".format(rate_limit))
# Prune from recent_send_times anything more than a few seconds
# ago. Even though SES reports a maximum per-second, the way
# they enforce the limit may not be on a one-second window.
# To be safe, we use a two-second window (but allow 2 times the
# rate limit) and then also have a default rate limit factor of
# 0.5 so that we really limit the one-second amount in two
# seconds.
window = 2.0 # seconds
cache_key = "ses_recent_send_times"
recent_send_times = cache.get(cache_key, [])
window = 2.0
window_start = now - timedelta(seconds=window)
new_send_times = []
for time in recent_send_times:
if time > window_start:
new_send_times.append(time)
recent_send_times = new_send_times
recent_send_times = [t for t in recent_send_times if t > window_start]
# If the number of recent send times in the last 1/_throttle
# seconds exceeds the rate limit, add a delay.
# Since I'm not sure how Amazon determines at exactly what
# point to throttle, better be safe than sorry and let in, say,
# half of the allowed rate.
if len(new_send_times) > rate_limit * window * self._throttle:
# Sleep the remainder of the window period.
delta = now - new_send_times[0]
total_seconds = (delta.microseconds + (delta.seconds +
delta.days * 24 * 3600) * 10**6) / 10**6
rate_limit = self.get_rate_limit()
if len(recent_send_times) > rate_limit * window * self._throttle:
delta = now - recent_send_times[0]
total_seconds = delta.total_seconds()
delay = window - total_seconds
if delay > 0:
sleep(delay)
recent_send_times.append(now)
# end of throttling
cache.set(cache_key, recent_send_times, timeout=2)
try:
logger.info("Try to send raw email")
#logger.info('message.message().as_string() = %s' % message.message().as_string())
logger.info("source = %s" % source)
logger.info("message.from_email = %s" % message.from_email)
logger.info("message.recipients() = %s" % message.recipients())
logger.info("dkim_key = %s" % self.dkim_key)
logger.info("dkim_domain = %s" % self.dkim_domain)
logger.info("dkim_selector = %s" % self.dkim_selector)
logger.info("dkim_headers = %s" % str(self.dkim_headers))
response = self.connection.send_raw_email(
Source=source or message.from_email,
Destinations=message.recipients(),
# todo attachments?
RawMessage={'Data': dkim_sign(message.message().as_string(),
dkim_key=self.dkim_key,
dkim_domain=self.dkim_domain,
dkim_selector=self.dkim_selector,
dkim_headers=self.dkim_headers)}
RawMessage={'Data': dkim_sign(
message.message().as_string(),
dkim_key=self.dkim_key,
dkim_domain=self.dkim_domain,
dkim_selector=self.dkim_selector,
dkim_headers=self.dkim_headers
)}
)
list_of_response.append(response)
message.aws_ses_response = response
message.extra_headers['status'] = 200
message.extra_headers['message_id'] = response['MessageId']
message.extra_headers['request_id'] = response['ResponseMetadata']['RequestId']
message.extra_headers.update({
'status': 200,
'message_id': response['MessageId'],
'request_id': response['ResponseMetadata']['RequestId']
})
num_sent += 1
if 'X-SES-CONFIGURATION-SET' in message.extra_headers:
logger.info(
u"send_messages.sent from='{}' recipients='{}' message_id='{}' request_id='{}' "
u"ses-configuration-set='{}'".format(
message.from_email,
", ".join(message.recipients()),
message.extra_headers['message_id'],
message.extra_headers['request_id'],
message.extra_headers['X-SES-CONFIGURATION-SET']
))
else:
logger.info(u"send_messages.sent from='{}' recipients='{}' message_id='{}' request_id='{}'".format(
message.from_email,
", ".join(message.recipients()),
message.extra_headers['message_id'],
message.extra_headers['request_id']
))
logger.info(
f"Sent email from {message.from_email} to {', '.join(message.recipients())}, "
f"message_id={message.extra_headers['message_id']}, request_id={message.extra_headers['request_id']}"
)
list_of_response.append(response)
except ResponseError as err:
# Store failure information so to post process it if required
error_keys = ['status', 'reason', 'body', 'request_id',
'error_code', 'error_message']
for key in error_keys:
message.extra_headers[key] = getattr(err, key, None)
logger.error(f"Failed to send email: {err}")
message.extra_headers.update({
key: getattr(err, key, None) for key in ['status', 'reason', 'body', 'request_id', 'error_code', 'error_message']
})
list_of_response.append({'error': str(err)})
if not self.fail_silently:
raise
if not_sent_list:
sent_message.update({"Sent":"%s" % not_sent_list})
logger.info("new_conn_created: %s" % new_conn_created)
sent_message["Sent"] = ", ".join(not_sent_list)
if new_conn_created:
logger.info("closing new connection after send")
self.close()
logger.info("DEBUGING EMAILS --- list_of_response:%s" % (list_of_response))
logger.info("DEBUGING EMAILS --- return %s, %s" % (num_sent, sent_message))
return num_sent, sent_message
def get_rate_limit(self):
logger.info("getting rate limit")
if self._access_key_id in cached_rate_limits:
logger.info("returning cached rate limit %s" % cached_rate_limits[self._access_key_id])
return cached_rate_limits[self._access_key_id]
logger.info("creating AWS connection")
new_conn_created = self.open()
if not self.connection:
logger.info("AWS connection creation failed")
raise Exception(
"No connection is available to check current SES rate limit.")
try:
quota_dict = self.connection.get_send_quota()
logger.info("AWS quota dict %s" % quota_dict)
max_per_second = quota_dict['MaxSendRate']
ret = float(max_per_second)
cached_rate_limits[self._access_key_id] = ret
return ret
finally:
if new_conn_created:
self.close()
logger.debug(f"Sent {num_sent} messages, response: {list_of_response}")
return num_sent, sent_message

View File

@ -212,11 +212,11 @@ def receiver_email_pre_send(sender, message=None, **kwargs):
"""
pass
def filter_recipiants(recipiant_list):
def filter_recipients(recipiant_list):
"""
Filters a list of recipient email addresses to exclude invalid or blacklisted emails.
"""
logger.info("Starting filter_recipiants: %s", recipiant_list)
logger.info("Starting filter_recipients: %s", recipiant_list)
# Ensure recipient_list is a list
if not isinstance(recipiant_list, list):
@ -224,43 +224,43 @@ def filter_recipiants(recipiant_list):
recipiant_list = [recipiant_list]
if recipiant_list:
recipiant_list = filter_recipiants_with_unsubscribe(recipiant_list)
recipiant_list = filter_recipiants_with_complaint_records(recipiant_list)
recipiant_list = filter_recipiants_with_bounce_records(recipiant_list)
recipiant_list = filter_recipiants_with_validater_email_domain(recipiant_list)
recipiant_list = filter_recipients_with_unsubscribe(recipiant_list)
recipiant_list = filter_recipients_with_complaint_records(recipiant_list)
recipiant_list = filter_recipients_with_bounce_records(recipiant_list)
recipiant_list = filter_recipients_with_validater_email_domain(recipiant_list)
logger.info("Filtered recipient list: %s", recipiant_list)
return recipiant_list
def filter_recipiants_with_unsubscribe(recipiant_list):
def filter_recipients_with_unsubscribe(recipiant_list):
"""
Removes recipients who have unsubscribed.
"""
blacklist_emails = list(set([record.email for record in User.objects.filter(aws_ses__unsubscribe=True)]))
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
return filter_recipients_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
def filter_recipiants_with_complaint_records(recipiant_list):
def filter_recipients_with_complaint_records(recipiant_list):
"""
Removes recipients with complaint records.
"""
blacklist_emails = list(set([record.email for record in ComplaintRecord.objects.filter(email__isnull=False)]))
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
return filter_recipients_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
def filter_recipiants_with_bounce_records(recipiant_list):
def filter_recipients_with_bounce_records(recipiant_list):
"""
Removes recipients with bounce records exceeding SES_BOUNCE_LIMIT.
"""
blacklist_emails = list(set([record.email for record in BounceRecord.objects.filter(email__isnull=False)
.annotate(total=Count('email')).filter(total__gte=settings.SES_BOUNCE_LIMIT)]))
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
return filter_recipients_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
def filter_recipiants_with_blacklist(recipiant_list, blacklist_emails):
def filter_recipients_with_blacklist(recipiant_list, blacklist_emails):
"""
Filters out emails from a blacklist.
"""
return [email for email in recipiant_list if email not in blacklist_emails]
def filter_recipiants_with_validater_email_domain(recipiant_list):
def filter_recipients_with_validater_email_domain(recipiant_list):
"""
Validates email domains for new recipients.
"""