From a5a7344d3b2a4ec61566bd6361c88cc20b6ab372 Mon Sep 17 00:00:00 2001 From: Raymond Jessop Date: Fri, 18 Apr 2025 13:50:09 -0500 Subject: [PATCH] updates to backends.py --- django_aws_ses/backends.py | 424 +++++++++++++++---------------------- django_aws_ses/utils.py | 28 +-- 2 files changed, 186 insertions(+), 266 deletions(-) diff --git a/django_aws_ses/backends.py b/django_aws_ses/backends.py index 9c2f915..7a9674c 100644 --- a/django_aws_ses/backends.py +++ b/django_aws_ses/backends.py @@ -1,71 +1,104 @@ import logging +from time import sleep +from datetime import datetime, timedelta import boto3 -from botocore.vendored.requests.packages.urllib3.exceptions import ResponseError +from django.core.cache import cache from django.core.mail.backends.base import BaseEmailBackend -from django.db.models import Count -from django.dispatch import Signal - -from datetime import datetime, timedelta -from time import sleep -import sys +from django.core.exceptions import ImproperlyConfigured +from requests.exceptions import RequestException as ResponseError from . import settings from . import signals from . import utils -from .models import BounceRecord logger = settings.logger -cached_rate_limits = {} -recent_send_times = [] - def dkim_sign(message, dkim_domain=None, dkim_key=None, dkim_selector=None, dkim_headers=None): - """Return signed email message if dkim package and settings are available.""" + """Sign an email message with DKIM if the package and settings are available. + + Args: + message (str): The email message as a string. + dkim_domain (str): DKIM domain for signing. + dkim_key (str): DKIM private key. + dkim_selector (str): DKIM selector. + dkim_headers (tuple): Headers to include in DKIM signing. + + Returns: + str: The signed message or original message if signing fails. + """ try: import dkim except ImportError: - pass - else: - if dkim_domain and dkim_key: - sig = dkim.sign(message, - dkim_selector, - dkim_domain, - dkim_key, - include_headers=dkim_headers) - message = sig + message - return message + logger.warning("DKIM package not installed, skipping signing") + return message + + if not (dkim_domain and dkim_key): + logger.debug("DKIM domain or key missing, skipping signing") + return message + + try: + sig = dkim.sign( + message, + dkim_selector, + dkim_domain, + dkim_key, + include_headers=dkim_headers + ) + return sig + message + except Exception as e: + logger.error(f"DKIM signing failed: {e}") + return message class SESBackend(BaseEmailBackend): - """A Django Email backend that uses Amazon's Simple Email Service. - """ - - - def __init__(self, fail_silently=False, aws_access_key=None, - aws_secret_key=None, aws_region_name=None, - aws_region_endpoint=None, aws_auto_throttle=None, - dkim_domain=None, dkim_key=None, dkim_selector=None, - dkim_headers=None, **kwargs): + """Django email backend for Amazon SES. - super(SESBackend, self).__init__(fail_silently=fail_silently, **kwargs) + Sends emails using AWS SES with support for DKIM signing and rate limiting. + """ + + def __init__(self, fail_silently=False, aws_access_key=None, aws_secret_key=None, + aws_region_name=None, aws_region_endpoint=None, aws_auto_throttle=None, + dkim_domain=None, dkim_key=None, dkim_selector=None, dkim_headers=None, **kwargs): + """Initialize SES backend with AWS credentials and settings. + + Args: + fail_silently (bool): If True, silently ignore errors. + aws_access_key (str): AWS access key ID. + aws_secret_key (str): AWS secret access key. + aws_region_name (str): AWS region name. + aws_region_endpoint (str): AWS SES endpoint URL. + aws_auto_throttle (float): Throttling factor for SES rate limits. + dkim_domain (str): DKIM domain for signing. + dkim_key (str): DKIM private key. + dkim_selector (str): DKIM selector. + dkim_headers (tuple): Headers to include in DKIM signing. + + Raises: + ImproperlyConfigured: If AWS credentials are missing. + """ + super().__init__(fail_silently=fail_silently, **kwargs) self._access_key_id = aws_access_key or settings.ACCESS_KEY self._access_key = aws_secret_key or settings.SECRET_KEY - self._region_name = aws_region_name if aws_region_name else settings.AWS_SES_REGION_NAME - self._endpoint_url = aws_region_endpoint if aws_region_endpoint else settings.AWS_SES_REGION_ENDPOINT_URL + self._region_name = aws_region_name or settings.AWS_SES_REGION_NAME + self._endpoint_url = aws_region_endpoint or settings.AWS_SES_REGION_ENDPOINT_URL self._throttle = aws_auto_throttle or settings.AWS_SES_AUTO_THROTTLE - self.dkim_domain = dkim_domain or settings.DKIM_DOMAIN self.dkim_key = dkim_key or settings.DKIM_PRIVATE_KEY self.dkim_selector = dkim_selector or settings.DKIM_SELECTOR self.dkim_headers = dkim_headers or settings.DKIM_HEADERS + if not (self._access_key_id and self._access_key): + raise ImproperlyConfigured("AWS SES credentials are required.") + self.connection = None def open(self): - """Create a connection to the AWS API server. This can be reused for - sending multiple emails. + """Create a connection to the AWS SES API server. + + Returns: + bool: True if a new connection was created, False otherwise. """ if self.connection: return False @@ -78,264 +111,151 @@ class SESBackend(BaseEmailBackend): region_name=self._region_name, endpoint_url=self._endpoint_url, ) - - except Exception: + return True + except Exception as e: + logger.error(f"Failed to connect to SES: {e}") if not self.fail_silently: raise - - return True + return False def close(self): - """Close any open HTTP connections to the API server. - """ + """Close the SES API connection.""" self.connection = None - def send_messages(self, email_messages): - """Sends one or more EmailMessage objects and returns the number of - email messages sent and a list of filtered emails. + def get_rate_limit(self): + """Retrieve and cache the SES maximum send rate. + + Returns: + float: The maximum send rate per second. + + Raises: + Exception: If no connection is available to fetch the rate limit. """ - logger.info("1 --- start of send_messages") - list_of_response = [] - num_sent = 0 - not_sent_list = [] - sent_message = {"Sent":""} - calling_func = '' + cache_key = f"ses_rate_limit_{self._access_key_id}" + rate_limit = cache.get(cache_key) + if rate_limit is not None: + logger.debug(f"Retrieved cached rate limit: {rate_limit}") + return rate_limit + + logger.debug("Fetching new rate limit from AWS SES") + new_conn_created = self.open() + if not self.connection: + raise Exception("No connection to check SES rate limit.") + try: - fcount = 0 - while sys._getframe(fcount).f_code.co_name in ['send_messages', 'send', 'mail_admins', 'send_mail', 'emit', 'handle']: - fcount +=1 - - calling_func = sys._getframe(fcount).f_code.co_name - - except Exception as e: - logger.info("fcount:%s, called from exception = %s" % (fcount, e)) - - logger.info("called from %s current throttle:%s" % (calling_func, self._throttle)) - - - - logger.info("send_messages") + quota_dict = self.connection.get_send_quota() + rate_limit = float(quota_dict['MaxSendRate']) + cache.set(cache_key, rate_limit, timeout=3600) # Cache for 1 hour + return rate_limit + finally: + if new_conn_created: + self.close() + + def send_messages(self, email_messages): + """Send one or more EmailMessage objects. + + Args: + email_messages (list): List of EmailMessage objects to send. + + Returns: + tuple: (number of messages sent, dictionary with sent/not sent info) + """ if not email_messages: - logger.info("no email messages returning") - list_of_response.append({'error':'no email messages returning'}) + logger.debug("No email messages to send") + return 0, {"Sent": ""} new_conn_created = self.open() - if new_conn_created: - logger.info("created a new connection") - if not self.connection: - # Failed silently - logger.info("no connection returning") - list_of_response.append({'error':'no connection returning'}) - logger.info("DEBUGING EMAILS --- list_of_response:%s" % (list_of_response)) - logger.info("DEBUGING EMAILS --- return %s" % (num_sent)) - return num_sent + logger.error("Failed to establish SES connection") + return 0, {"Sent": ""} + + num_sent, sent_message, list_of_response = 0, {"Sent": ""}, [] + source = settings.AWS_SES_RETURN_PATH or settings.DEFAULT_FROM_EMAIL + not_sent_list = [] - - - source = settings.AWS_SES_RETURN_PATH - - logger.info("email_messages: %s" % email_messages) - for message in email_messages: - # SES Configuration sets. If the AWS_SES_CONFIGURATION_SET setting - # is not None, append the appropriate header to the message so that - # SES knows which configuration set it belongs to. - # - # If settings.AWS_SES_CONFIGURATION_SET is a callable, pass it the - # message object and dkim settings and expect it to return a string - # containing the SES Configuration Set name. - message.aws_ses_response = {'error':'not sent yet'} - - logger.info("Sending signal(email_pre_send)") - logger.info("message to: %s, cc: %s, bcc: %s" % (message.to, message.cc, message.bcc)) + message.aws_ses_response = {'error': 'not sent yet'} signals.email_pre_send.send_robust(self.__class__, message=message) - - # for log in dir(message): - # logger.info(log) + pre_filter_recipients = message.recipients() - logger.info("message.recipients() = %s" % message.recipients()) - - marketing = message.extra_headers.get("marketing","False") + message.to = utils.filter_recipients(message.to) + message.cc = utils.filter_recipients(message.cc) + message.bcc = utils.filter_recipients(message.bcc) - message.to = utils.filter_recipiants(message.to) - message.cc = utils.filter_recipiants(message.cc) - message.bcc = utils.filter_recipiants(message.bcc) - - logger.info("message.recipients() after email_pre_send: %s" % message.recipients()) - - - - if not message.recipients(): - logger.info("no recipients left after the filter") - list_of_response.append({'error':'no recipients left after the filter'}) - message.aws_ses_response = {'error':'no recipients left after the filter'} - sent_message = {"Not Sent":"No recipients left after filters"} + logger.debug("No recipients after filtering") + message.aws_ses_response = {'error': 'no recipients left after filtering'} + list_of_response.append({'error': 'no recipients left after filtering'}) continue - - #raise Exception('No emails left after filters!') - else: - for email in pre_filter_recipients: - if email not in message.recipients(): - not_sent_list.append(email) - - if (settings.AWS_SES_CONFIGURATION_SET - and 'X-SES-CONFIGURATION-SET' not in message.extra_headers): + + not_sent_list.extend([email for email in pre_filter_recipients if email not in message.recipients()]) + + if settings.AWS_SES_CONFIGURATION_SET and 'X-SES-CONFIGURATION-SET' not in message.extra_headers: if callable(settings.AWS_SES_CONFIGURATION_SET): - message.extra_headers[ - 'X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET( - message, - dkim_domain=self.dkim_domain, - dkim_key=self.dkim_key, - dkim_selector=self.dkim_selector, - dkim_headers=self.dkim_headers - ) + message.extra_headers['X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET( + message, dkim_domain=self.dkim_domain, dkim_key=self.dkim_key, + dkim_selector=self.dkim_selector, dkim_headers=self.dkim_headers + ) else: - message.extra_headers[ - 'X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET + message.extra_headers['X-SES-CONFIGURATION-SET'] = settings.AWS_SES_CONFIGURATION_SET - # Automatic throttling. Assumes that this is the only SES client - # currently operating. The AWS_SES_AUTO_THROTTLE setting is a - # factor to apply to the rate limit, with a default of 0.5 to stay - # well below the actual SES throttle. - # Set the setting to 0 or None to disable throttling. if self._throttle: - global recent_send_times - logger.info("inside if _throttle recent_send_times:%s" % recent_send_times) - now = datetime.now() - logger.info("inside if _throttle now:%s" % now) - - # Get and cache the current SES max-per-second rate limit - # returned by the SES API. - logger.info("inside if _throttle calling get_rate_limit") - rate_limit = self.get_rate_limit() - logger.info("send_messages.throttle rate_limit='{}'".format(rate_limit)) - - # Prune from recent_send_times anything more than a few seconds - # ago. Even though SES reports a maximum per-second, the way - # they enforce the limit may not be on a one-second window. - # To be safe, we use a two-second window (but allow 2 times the - # rate limit) and then also have a default rate limit factor of - # 0.5 so that we really limit the one-second amount in two - # seconds. - window = 2.0 # seconds + cache_key = "ses_recent_send_times" + recent_send_times = cache.get(cache_key, []) + window = 2.0 window_start = now - timedelta(seconds=window) - new_send_times = [] - for time in recent_send_times: - if time > window_start: - new_send_times.append(time) - recent_send_times = new_send_times + recent_send_times = [t for t in recent_send_times if t > window_start] - # If the number of recent send times in the last 1/_throttle - # seconds exceeds the rate limit, add a delay. - # Since I'm not sure how Amazon determines at exactly what - # point to throttle, better be safe than sorry and let in, say, - # half of the allowed rate. - if len(new_send_times) > rate_limit * window * self._throttle: - # Sleep the remainder of the window period. - delta = now - new_send_times[0] - total_seconds = (delta.microseconds + (delta.seconds + - delta.days * 24 * 3600) * 10**6) / 10**6 + rate_limit = self.get_rate_limit() + if len(recent_send_times) > rate_limit * window * self._throttle: + delta = now - recent_send_times[0] + total_seconds = delta.total_seconds() delay = window - total_seconds if delay > 0: sleep(delay) recent_send_times.append(now) - # end of throttling + cache.set(cache_key, recent_send_times, timeout=2) try: - logger.info("Try to send raw email") - #logger.info('message.message().as_string() = %s' % message.message().as_string()) - logger.info("source = %s" % source) - logger.info("message.from_email = %s" % message.from_email) - logger.info("message.recipients() = %s" % message.recipients()) - - logger.info("dkim_key = %s" % self.dkim_key) - logger.info("dkim_domain = %s" % self.dkim_domain) - logger.info("dkim_selector = %s" % self.dkim_selector) - logger.info("dkim_headers = %s" % str(self.dkim_headers)) response = self.connection.send_raw_email( Source=source or message.from_email, Destinations=message.recipients(), - # todo attachments? - RawMessage={'Data': dkim_sign(message.message().as_string(), - dkim_key=self.dkim_key, - dkim_domain=self.dkim_domain, - dkim_selector=self.dkim_selector, - dkim_headers=self.dkim_headers)} + RawMessage={'Data': dkim_sign( + message.message().as_string(), + dkim_key=self.dkim_key, + dkim_domain=self.dkim_domain, + dkim_selector=self.dkim_selector, + dkim_headers=self.dkim_headers + )} ) - - list_of_response.append(response) - message.aws_ses_response = response - - message.extra_headers['status'] = 200 - message.extra_headers['message_id'] = response['MessageId'] - message.extra_headers['request_id'] = response['ResponseMetadata']['RequestId'] + message.extra_headers.update({ + 'status': 200, + 'message_id': response['MessageId'], + 'request_id': response['ResponseMetadata']['RequestId'] + }) num_sent += 1 - if 'X-SES-CONFIGURATION-SET' in message.extra_headers: - logger.info( - u"send_messages.sent from='{}' recipients='{}' message_id='{}' request_id='{}' " - u"ses-configuration-set='{}'".format( - message.from_email, - ", ".join(message.recipients()), - message.extra_headers['message_id'], - message.extra_headers['request_id'], - message.extra_headers['X-SES-CONFIGURATION-SET'] - )) - else: - logger.info(u"send_messages.sent from='{}' recipients='{}' message_id='{}' request_id='{}'".format( - message.from_email, - ", ".join(message.recipients()), - message.extra_headers['message_id'], - message.extra_headers['request_id'] - )) - + logger.info( + f"Sent email from {message.from_email} to {', '.join(message.recipients())}, " + f"message_id={message.extra_headers['message_id']}, request_id={message.extra_headers['request_id']}" + ) + list_of_response.append(response) except ResponseError as err: - # Store failure information so to post process it if required - error_keys = ['status', 'reason', 'body', 'request_id', - 'error_code', 'error_message'] - for key in error_keys: - message.extra_headers[key] = getattr(err, key, None) + logger.error(f"Failed to send email: {err}") + message.extra_headers.update({ + key: getattr(err, key, None) for key in ['status', 'reason', 'body', 'request_id', 'error_code', 'error_message'] + }) + list_of_response.append({'error': str(err)}) if not self.fail_silently: raise + if not_sent_list: - sent_message.update({"Sent":"%s" % not_sent_list}) - - logger.info("new_conn_created: %s" % new_conn_created) + sent_message["Sent"] = ", ".join(not_sent_list) + if new_conn_created: - logger.info("closing new connection after send") self.close() - - logger.info("DEBUGING EMAILS --- list_of_response:%s" % (list_of_response)) - logger.info("DEBUGING EMAILS --- return %s, %s" % (num_sent, sent_message)) - - - return num_sent, sent_message - def get_rate_limit(self): - logger.info("getting rate limit") - if self._access_key_id in cached_rate_limits: - logger.info("returning cached rate limit %s" % cached_rate_limits[self._access_key_id]) - return cached_rate_limits[self._access_key_id] - - logger.info("creating AWS connection") - new_conn_created = self.open() - if not self.connection: - logger.info("AWS connection creation failed") - raise Exception( - "No connection is available to check current SES rate limit.") - try: - quota_dict = self.connection.get_send_quota() - logger.info("AWS quota dict %s" % quota_dict) - max_per_second = quota_dict['MaxSendRate'] - ret = float(max_per_second) - cached_rate_limits[self._access_key_id] = ret - return ret - finally: - if new_conn_created: - self.close() - - \ No newline at end of file + logger.debug(f"Sent {num_sent} messages, response: {list_of_response}") + return num_sent, sent_message \ No newline at end of file diff --git a/django_aws_ses/utils.py b/django_aws_ses/utils.py index 0676f21..1133c34 100644 --- a/django_aws_ses/utils.py +++ b/django_aws_ses/utils.py @@ -212,11 +212,11 @@ def receiver_email_pre_send(sender, message=None, **kwargs): """ pass -def filter_recipiants(recipiant_list): +def filter_recipients(recipiant_list): """ Filters a list of recipient email addresses to exclude invalid or blacklisted emails. """ - logger.info("Starting filter_recipiants: %s", recipiant_list) + logger.info("Starting filter_recipients: %s", recipiant_list) # Ensure recipient_list is a list if not isinstance(recipiant_list, list): @@ -224,43 +224,43 @@ def filter_recipiants(recipiant_list): recipiant_list = [recipiant_list] if recipiant_list: - recipiant_list = filter_recipiants_with_unsubscribe(recipiant_list) - recipiant_list = filter_recipiants_with_complaint_records(recipiant_list) - recipiant_list = filter_recipiants_with_bounce_records(recipiant_list) - recipiant_list = filter_recipiants_with_validater_email_domain(recipiant_list) + recipiant_list = filter_recipients_with_unsubscribe(recipiant_list) + recipiant_list = filter_recipients_with_complaint_records(recipiant_list) + recipiant_list = filter_recipients_with_bounce_records(recipiant_list) + recipiant_list = filter_recipients_with_validater_email_domain(recipiant_list) logger.info("Filtered recipient list: %s", recipiant_list) return recipiant_list -def filter_recipiants_with_unsubscribe(recipiant_list): +def filter_recipients_with_unsubscribe(recipiant_list): """ Removes recipients who have unsubscribed. """ blacklist_emails = list(set([record.email for record in User.objects.filter(aws_ses__unsubscribe=True)])) - return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list + return filter_recipients_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list -def filter_recipiants_with_complaint_records(recipiant_list): +def filter_recipients_with_complaint_records(recipiant_list): """ Removes recipients with complaint records. """ blacklist_emails = list(set([record.email for record in ComplaintRecord.objects.filter(email__isnull=False)])) - return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list + return filter_recipients_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list -def filter_recipiants_with_bounce_records(recipiant_list): +def filter_recipients_with_bounce_records(recipiant_list): """ Removes recipients with bounce records exceeding SES_BOUNCE_LIMIT. """ blacklist_emails = list(set([record.email for record in BounceRecord.objects.filter(email__isnull=False) .annotate(total=Count('email')).filter(total__gte=settings.SES_BOUNCE_LIMIT)])) - return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list + return filter_recipients_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list -def filter_recipiants_with_blacklist(recipiant_list, blacklist_emails): +def filter_recipients_with_blacklist(recipiant_list, blacklist_emails): """ Filters out emails from a blacklist. """ return [email for email in recipiant_list if email not in blacklist_emails] -def filter_recipiants_with_validater_email_domain(recipiant_list): +def filter_recipients_with_validater_email_domain(recipiant_list): """ Validates email domains for new recipients. """