From bbfdfec0dc1db96fd1b09d77b8929326c85ad1e5 Mon Sep 17 00:00:00 2001 From: Raymond Jessop Date: Fri, 18 Apr 2025 13:02:28 -0500 Subject: [PATCH] working on cleaning up the project --- django_aws_ses/settings.py | 223 +++++++++++++++---------- django_aws_ses/utils.py | 332 ++++++++++++++++--------------------- 2 files changed, 276 insertions(+), 279 deletions(-) diff --git a/django_aws_ses/settings.py b/django_aws_ses/settings.py index e149aa0..f8e29b4 100644 --- a/django_aws_ses/settings.py +++ b/django_aws_ses/settings.py @@ -1,106 +1,153 @@ +import logging +import os from django.conf import settings from django.contrib.sites.models import Site -import logging +from django.core.exceptions import ImproperlyConfigured -from .models import ( - AwsSesSettings +from .models import AwsSesSettings + +# Define constants for default values +DEFAULTS = { + 'AWS_SES_REGION_NAME': 'us-east-1', + 'AWS_SES_REGION_ENDPOINT': 'email.us-east-1.amazonaws.com', + 'AWS_SES_AUTO_THROTTLE': 0.5, + 'AWS_SES_RETURN_PATH': None, + 'AWS_SES_CONFIGURATION_SET': None, + 'DKIM_SELECTOR': 'ses', + 'DKIM_HEADERS': ('From', 'To', 'Cc', 'Subject'), + 'VERIFY_BOUNCE_SIGNATURES': True, + 'BOUNCE_CERT_DOMAINS': ('amazonaws.com', 'amazon.com'), + 'SES_BOUNCE_LIMIT': 1, + 'SES_BACKEND_DEBUG': False, + 'SES_BACKEND_DEBUG_LOGFILE_FORMATTER': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', + 'DEFAULT_FROM_EMAIL': 'no_reply@example.com', + 'UNSUBSCRIBE_TEMPLATE': 'django_aws_ses/unsubscribe.html', + 'BASE_TEMPLATE': 'django_aws_ses/base.html', +} + +# Selectively export key settings +__all__ = ( + 'ACCESS_KEY', 'SECRET_KEY', 'AWS_SES_REGION_NAME', 'AWS_SES_REGION_ENDPOINT', + 'AWS_SES_AUTO_THROTTLE', 'AWS_SES_RETURN_PATH', 'AWS_SES_CONFIGURATION_SET', + 'DKIM_DOMAIN', 'DKIM_PRIVATE_KEY', 'DKIM_SELECTOR', 'DKIM_HEADERS', + 'TIME_ZONE', 'BASE_DIR', 'SES_BOUNCE_LIMIT', 'SES_BACKEND_DEBUG', + 'SES_BACKEND_DEBUG_LOGFILE_PATH', 'SES_BACKEND_DEBUG_LOGFILE_FORMATTER', + 'DEFAULT_FROM_EMAIL', 'HOME_URL', 'UNSUBSCRIBE_TEMPLATE', 'BASE_TEMPLATE', + 'VERIFY_BOUNCE_SIGNATURES', 'BOUNCE_CERT_DOMAINS', 'logger', +) + +def get_aws_ses_settings(): + """ + Retrieve AwsSesSettings from the database for the current site. + Returns None if the settings cannot be retrieved. + """ + try: + return AwsSesSettings.objects.get(site_id=settings.SITE_ID) + except (AwsSesSettings.DoesNotExist, AttributeError) as e: + logger.warning("Failed to retrieve AwsSesSettings: %s", e) + return None + +def configure_logger(debug, log_file_path, formatter): + """ + Configure the logger for the AWS SES app. + Sets up file logging if debug is enabled and a valid log file path is provided. + """ + logger = logging.getLogger('django_aws_ses') + logger.setLevel(logging.DEBUG if debug else logging.WARNING) + + if debug and log_file_path: + # Validate log file path + log_dir = os.path.dirname(log_file_path) + if log_dir and not os.path.exists(log_dir): + os.makedirs(log_dir, exist_ok=True) + try: + handler = logging.FileHandler(log_file_path) + handler.setLevel(logging.DEBUG) + formatter = logging.Formatter(formatter) + handler.setFormatter(formatter) + logger.addHandler(handler) + except OSError as e: + logger.error("Failed to configure log file %s: %s", log_file_path, e) + + return logger + +# Initialize logger early to capture any setup errors +BASE_DIR = getattr(settings, 'BASE_DIR', None) +if not BASE_DIR: + raise ImproperlyConfigured( + "BASE_DIR must be defined in Django settings and point to the project root directory." ) +# Temporary logger for setup phase +logger = logging.getLogger('django_aws_ses') -__all__ = ('ACCESS_KEY', 'SECRET_KEY', 'AWS_SES_REGION_NAME', - 'AWS_SES_REGION_ENDPOINT', 'AWS_SES_AUTO_THROTTLE', - 'AWS_SES_RETURN_PATH', 'DKIM_DOMAIN', 'DKIM_PRIVATE_KEY', - 'DKIM_SELECTOR', 'DKIM_HEADERS', 'TIME_ZONE', 'BASE_DIR', - 'BOUNCE_LIMIT','SES_BACKEND_DEBUG','SES_BACKEND_DEBUG_LOGFILE_PATH', - 'SES_BACKEND_DEBUG_LOGFILE_FORMATTER') +# Fetch AwsSesSettings from database +aws_ses_settings = get_aws_ses_settings() -aws_ses_Settings = None +# AWS Credentials +ACCESS_KEY = aws_ses_settings.access_key if aws_ses_settings else getattr( + settings, 'AWS_SES_ACCESS_KEY_ID', getattr(settings, 'AWS_ACCESS_KEY_ID', None) +) +SECRET_KEY = aws_ses_settings.secret_key if aws_ses_settings else getattr( + settings, 'AWS_SES_SECRET_ACCESS_KEY', getattr(settings, 'AWS_SECRET_ACCESS_KEY', None) +) -try: - aws_ses_Settings, c = AwsSesSettings.objects.get_or_create(site_id=settings.SITE_ID) -except Exception as e: - print("AwsSesSettings does not exist: error: %s" % e) - -ACCESS_KEY = aws_ses_Settings.access_key if aws_ses_Settings else None -if ACCESS_KEY is None: - ACCESS_KEY = getattr(settings, 'AWS_SES_ACCESS_KEY_ID',getattr(settings, 'AWS_ACCESS_KEY_ID', None)) - -SECRET_KEY = aws_ses_Settings.secret_key if aws_ses_Settings else None -if SECRET_KEY is None: - SECRET_KEY = getattr(settings, 'AWS_SES_SECRET_ACCESS_KEY',getattr(settings, 'AWS_SECRET_ACCESS_KEY', None)) - -AWS_SES_REGION_NAME = aws_ses_Settings.region_name if aws_ses_Settings else None -if AWS_SES_REGION_NAME is None: - AWS_SES_REGION_NAME = getattr(settings, 'AWS_SES_REGION_NAME',getattr(settings, 'AWS_DEFAULT_REGION', 'us-east-1')) - -AWS_SES_REGION_ENDPOINT = aws_ses_Settings.region_endpoint if aws_ses_Settings else None -if AWS_SES_REGION_ENDPOINT is None: - AWS_SES_REGION_ENDPOINT = getattr(settings, 'AWS_SES_REGION_ENDPOINT','email.us-east-1.amazonaws.com') - -BASE_DIR = getattr(settings, 'BASE_DIR', None) +# Validate credentials +if not (ACCESS_KEY and SECRET_KEY): + raise ImproperlyConfigured( + "AWS SES credentials (ACCESS_KEY and SECRET_KEY) must be provided via AwsSesSettings or Django settings." + ) -DEFAULT_FROM_EMAIL = "" +# AWS SES Configuration +AWS_SES_REGION_NAME = aws_ses_settings.region_name if aws_ses_settings else getattr( + settings, 'AWS_SES_REGION_NAME', getattr(settings, 'AWS_DEFAULT_REGION', DEFAULTS['AWS_SES_REGION_NAME']) +) +AWS_SES_REGION_ENDPOINT = aws_ses_settings.region_endpoint if aws_ses_settings else getattr( + settings, 'AWS_SES_REGION_ENDPOINT', DEFAULTS['AWS_SES_REGION_ENDPOINT'] +) +AWS_SES_AUTO_THROTTLE = getattr(settings, 'AWS_SES_AUTO_THROTTLE', DEFAULTS['AWS_SES_AUTO_THROTTLE']) +AWS_SES_RETURN_PATH = getattr(settings, 'AWS_SES_RETURN_PATH', DEFAULTS['AWS_SES_RETURN_PATH']) +AWS_SES_CONFIGURATION_SET = getattr(settings, 'AWS_SES_CONFIGURATION_SET', DEFAULTS['AWS_SES_CONFIGURATION_SET']) + +# DKIM Settings +DKIM_DOMAIN = getattr(settings, 'DKIM_DOMAIN', None) +DKIM_PRIVATE_KEY = getattr(settings, 'DKIM_PRIVATE_KEY', None) +DKIM_SELECTOR = getattr(settings, 'DKIM_SELECTOR', DEFAULTS['DKIM_SELECTOR']) +DKIM_HEADERS = getattr(settings, 'DKIM_HEADERS', DEFAULTS['DKIM_HEADERS']) + +# Email Settings try: site = Site.objects.get_current() - - DEFAULT_FROM_EMAIL = getattr(settings, 'DEFAULT_FROM_EMAIL', 'no_reply@%s' % site.domain) -except Exception as e: - print("Site Doesn't Exist, please configure Django sites") - print("Error is: %s" % e) - + DEFAULT_FROM_EMAIL = getattr(settings, 'DEFAULT_FROM_EMAIL', f"no-reply@{site.domain}") +except Site.DoesNotExist: + DEFAULT_FROM_EMAIL = getattr(settings, 'DEFAULT_FROM_EMAIL', DEFAULTS['DEFAULT_FROM_EMAIL']) + logger.warning( + "Django sites framework not configured. Using DEFAULT_FROM_EMAIL: %s. " + "Configure the Site model or set DEFAULT_FROM_EMAIL in settings.", DEFAULT_FROM_EMAIL + ) + HOME_URL = getattr(settings, 'HOME_URL', '') -if not BASE_DIR: - raise RuntimeError('No BASE_DIR defined in project settings, django_aws_ses requires BASE_DIR to be defined and pointed at your root directory. i.e. BASE_DIR = os.path.dirname(os.path.abspath(__file__))') +# Template Settings +UNSUBSCRIBE_TEMPLATE = getattr(settings, 'UNSUBSCRIBE_TEMPLATE', DEFAULTS['UNSUBSCRIBE_TEMPLATE']) +BASE_TEMPLATE = getattr(settings, 'BASE_TEMPLATE', DEFAULTS['BASE_TEMPLATE']) -UNSUBSCRIBE_TEMPLET = getattr(settings, 'UNSUBSCRIBE_TEMPLET', 'django_aws_ses/unsubscribe.html') -BASE_TEMPLET = getattr(settings, 'UNSUBSCRIBE_TEMPLET', 'django_aws_ses/base.html') - -AWS_SES_REGION_ENDPOINT_URL = getattr(settings, 'AWS_SES_REGION_ENDPOINT_URL','https://' + AWS_SES_REGION_ENDPOINT) - -AWS_SES_AUTO_THROTTLE = getattr(settings, 'AWS_SES_AUTO_THROTTLE', 0.5) -AWS_SES_RETURN_PATH = getattr(settings, 'AWS_SES_RETURN_PATH', None) +# Bounce and Verification Settings +VERIFY_BOUNCE_SIGNATURES = getattr(settings, 'AWS_SES_VERIFY_BOUNCE_SIGNATURES', DEFAULTS['VERIFY_BOUNCE_SIGNATURES']) +BOUNCE_CERT_DOMAINS = getattr(settings, 'AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS', DEFAULTS['BOUNCE_CERT_DOMAINS']) +SES_BOUNCE_LIMIT = getattr(settings, 'SES_BOUNCE_LIMIT', DEFAULTS['SES_BOUNCE_LIMIT']) -# if AWS_SES_RETURN_PATH is None: -# AWS_SES_RETURN_PATH = "CF Doors " - -AWS_SES_CONFIGURATION_SET = getattr(settings, 'AWS_SES_CONFIGURATION_SET', None) - -DKIM_DOMAIN = getattr(settings, "DKIM_DOMAIN", None) -DKIM_PRIVATE_KEY = getattr(settings, 'DKIM_PRIVATE_KEY', None) -DKIM_SELECTOR = getattr(settings, 'DKIM_SELECTOR', 'ses') -DKIM_HEADERS = getattr(settings, 'DKIM_HEADERS', ('From', 'To', 'Cc', 'Subject')) +# Debug Settings +SES_BACKEND_DEBUG = getattr(settings, 'SES_BACKEND_DEBUG', DEFAULTS['SES_BACKEND_DEBUG']) +SES_BACKEND_DEBUG_LOGFILE_PATH = getattr( + settings, 'SES_BACKEND_DEBUG_LOGFILE_PATH', os.path.join(BASE_DIR, 'aws_ses.log') +) +SES_BACKEND_DEBUG_LOGFILE_FORMATTER = getattr( + settings, 'SES_BACKEND_DEBUG_LOGFILE_FORMATTER', DEFAULTS['SES_BACKEND_DEBUG_LOGFILE_FORMATTER'] +) +# Timezone TIME_ZONE = settings.TIME_ZONE -VERIFY_BOUNCE_SIGNATURES = getattr(settings, 'AWS_SES_VERIFY_BOUNCE_SIGNATURES', True) - -# Domains that are trusted when retrieving the certificate -# used to sign bounce messages. -BOUNCE_CERT_DOMAINS = getattr(settings, 'AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS', ( - 'amazonaws.com', - 'amazon.com', -)) - -SES_BOUNCE_LIMIT = getattr(settings,'BOUNCE_LIMT', 1) - -SES_BACKEND_DEBUG = getattr(settings,'SES_BACKEND_DEBUG', False) - -SES_BACKEND_DEBUG_LOGFILE_PATH = getattr(settings,'SES_BACKEND_DEBUG_LOGFILE_PATH', '%s/aws_ses.log' % BASE_DIR) - -SES_BACKEND_DEBUG_LOGFILE_FORMATTER = getattr(settings,'SES_BACKEND_DEBUG_LOGFILE_FORMATTER', '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - -logger = logging.getLogger('django_aws_ses') -# logger.setLevel(logging.WARNING) -if SES_BACKEND_DEBUG: - logger.setLevel(logging.INFO) - # create a file handler - if SES_BACKEND_DEBUG_LOGFILE_PATH: - handler = logging.FileHandler(SES_BACKEND_DEBUG_LOGFILE_PATH) - handler.setLevel(logging.INFO) - # create a logging format - formatter = logging.Formatter(SES_BACKEND_DEBUG_LOGFILE_FORMATTER) - handler.setFormatter(formatter) - # add the handlers to the logger - logger.addHandler(handler) - #logger.info('something we are logging') +# Configure logger with final settings +logger = configure_logger(SES_BACKEND_DEBUG, SES_BACKEND_DEBUG_LOGFILE_PATH, SES_BACKEND_DEBUG_LOGFILE_FORMATTER) \ No newline at end of file diff --git a/django_aws_ses/utils.py b/django_aws_ses/utils.py index 758b041..0676f21 100644 --- a/django_aws_ses/utils.py +++ b/django_aws_ses/utils.py @@ -1,23 +1,18 @@ import base64 -import logging -import time import re import dns.resolver -from telnetlib import Telnet -from builtins import str as text -from builtins import bytes from io import StringIO -try: - from urllib.parse import urlparse -except ImportError: - from urlparse import urlparse +from urllib.parse import urlparse from django.core.exceptions import ImproperlyConfigured from django.utils.encoding import smart_str from django.dispatch.dispatcher import receiver from django.db.models import Count - -from django.contrib.auth import get_user_model # If used custom user model -User = get_user_model() +from django.contrib.auth import get_user_model +from cryptography import x509 +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.exceptions import InvalidSignature +import requests from . import settings from . import signals @@ -26,148 +21,146 @@ from .models import ( ComplaintRecord, BlackListedDomains, SendRecord, - ) +) +from django.contrib.auth import get_user_model +User = get_user_model() + +# Logger setup logger = settings.logger - class BounceMessageVerifier(object): """ - A utility class for validating bounce messages - - See: http://docs.amazonwebservices.com/sns/latest/gsg/SendMessageToHttp.verify.signature.html + Utility class for validating AWS SES/SNS bounce messages. + Verifies the message signature using the provided certificate. """ - def __init__(self, bounce_dict): - """ - Creates a new bounce message from the given dict. - """ + # Initialize with the bounce message dictionary self._data = bounce_dict self._verified = None + self._certificate = None def is_verified(self): """ - Verifies an SES bounce message. - + Verifies the signature of an SES bounce message. + Returns True if the signature is valid, False otherwise. """ if self._verified is None: + # Extract and decode the signature signature = self._data.get('Signature') if not signature: + logger.warning("No signature found in bounce message") self._verified = False return self._verified - # Decode the signature from base64 - signature = bytes(base64.b64decode(signature)) + try: + signature = base64.b64decode(signature) + except ValueError: + logger.warning("Invalid base64 signature") + self._verified = False + return self._verified - # Get the message to sign + # Get the message bytes to verify against sign_bytes = self._get_bytes_to_sign() if not sign_bytes: + logger.warning("Could not generate bytes to sign") self._verified = False return self._verified - if not self.certificate: + # Load the signing certificate + certificate = self.certificate + if not certificate: + logger.warning("No valid certificate available") self._verified = False return self._verified - # Extract the public key - pkey = self.certificate.get_pubkey() - - # Use the public key to verify the signature. - pkey.verify_init() - pkey.verify_update(sign_bytes) - verify_result = pkey.verify_final(signature) - - self._verified = verify_result == 1 + # Verify the signature using the certificate's public key + try: + public_key = certificate.public_key() + public_key.verify( + signature, + sign_bytes, + padding.PKCS1v15(), + hashes.SHA1() # AWS SNS uses SHA1 + ) + self._verified = True + except InvalidSignature: + logger.warning("Signature verification failed: Invalid signature") + self._verified = False + except Exception as e: + logger.warning("Signature verification failed: %s", e) + self._verified = False return self._verified @property def certificate(self): """ - Retrieves the certificate used to sign the bounce message. - - TODO: Cache the certificate based on the cert URL so we don't have to - retrieve it for each bounce message. *We would need to do it in a - secure way so that the cert couldn't be overwritten in the cache* + Fetches and loads the X.509 certificate used to sign the bounce message. + Returns None if the certificate cannot be loaded. """ - if not hasattr(self, '_certificate'): + if self._certificate is None: cert_url = self._get_cert_url() - # Only load certificates from a certain domain? - # Without some kind of trusted domain check, any old joe could - # craft a bounce message and sign it using his own certificate - # and we would happily load and verify it. - if not cert_url: - self._certificate = None - return self._certificate + logger.warning("No valid certificate URL") + return None + # Ensure requests is available try: import requests except ImportError: raise ImproperlyConfigured( "`requests` is required for bounce message verification. " - "Please consider installing the `django-ses` with the " - "`bounce` extra - e.g. `pip install django-ses[bounce]`." + "Install with `pip install requests`." ) + # Ensure cryptography is available try: - from M2Crypto import X509 + from cryptography import x509 + from cryptography.hazmat.backends import default_backend except ImportError: raise ImproperlyConfigured( - "`M2Crypto` is required for bounce message verification. " - "Please consider installing the `django-ses` with the " - "`bounce` extra - e.g. `pip install django-ses[bounce]`." + "`cryptography` is required for bounce message verification. " + "Install with `pip install cryptography`." ) - # We use requests because it verifies the https certificate - # when retrieving the signing certificate. If https was somehow - # hijacked then all bets are off. + # Fetch the certificate response = requests.get(cert_url) if response.status_code != 200: - logger.warning(u'Could not download certificate from %s: "%s"', cert_url, response.status_code) - self._certificate = None - return self._certificate + logger.warning("Failed to download certificate from %s: %s", cert_url, response.status_code) + return None - # Handle errors loading the certificate. - # If the certificate is invalid then return - # false as we couldn't verify the message. + # Load the certificate try: - self._certificate = X509.load_cert_string(response.content) - except X509.X509Error as e: - logger.warning(u'Could not load certificate from %s: "%s"', cert_url, e) - self._certificate = None + self._certificate = x509.load_pem_x509_certificate(response.content, default_backend()) + except Exception as e: + logger.warning("Failed to load certificate from %s: %s", cert_url, e) + return None return self._certificate def _get_cert_url(self): """ - Get the signing certificate URL. - Only accept urls that match the domains set in the - AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS setting. Sub-domains - are allowed. i.e. if amazonaws.com is in the trusted domains - then sns.us-east-1.amazonaws.com will match. + Retrieves the certificate URL from the message, ensuring it comes from a trusted domain. + Returns None if the URL is untrusted or invalid. """ cert_url = self._data.get('SigningCertURL') - if cert_url: - if cert_url.startswith('https://'): - url_obj = urlparse(cert_url) - for trusted_domain in settings.BOUNCE_CERT_DOMAINS: - parts = trusted_domain.split('.') - if url_obj.netloc.split('.')[-len(parts):] == parts: - return cert_url - logger.warning(u'Untrusted certificate URL: "%s"', cert_url) + if cert_url and cert_url.startswith('https://'): + url_obj = urlparse(cert_url) + for trusted_domain in settings.BOUNCE_CERT_DOMAINS: + parts = trusted_domain.split('.') + if url_obj.netloc.split('.')[-len(parts):] == parts: + return cert_url + logger.warning("Untrusted certificate URL: %s", cert_url) else: - logger.warning(u'No signing certificate URL: "%s"', cert_url) + logger.warning("No/invalid certificate URL: %s", cert_url) return None def _get_bytes_to_sign(self): """ - Creates the message used for signing SNS notifications. - This is used to verify the bounce message when it is received. + Constructs the message bytes to be signed for verification. + Returns None if the message type is unrecognized. """ - - # Depending on the message type the fields to add to the message - # differ so we handle that here. msg_type = self._data.get('Type') if msg_type == 'Notification': fields_to_sign = [ @@ -178,8 +171,7 @@ class BounceMessageVerifier(object): 'TopicArn', 'Type', ] - elif (msg_type == 'SubscriptionConfirmation' or - msg_type == 'UnsubscribeConfirmation'): + elif msg_type in ('SubscriptionConfirmation', 'UnsubscribeConfirmation'): fields_to_sign = [ 'Message', 'MessageId', @@ -190,172 +182,130 @@ class BounceMessageVerifier(object): 'Type', ] else: - # Unrecognized type - logger.warning(u'Unrecognized SNS message Type: "%s"', msg_type) + logger.warning("Unrecognized SNS message type: %s", msg_type) return None outbytes = StringIO() for field_name in fields_to_sign: - field_value = smart_str(self._data.get(field_name, ''), - errors="replace") + field_value = smart_str(self._data.get(field_name, ''), errors="replace") if field_value: - outbytes.write(text(field_name)) - outbytes.write(text("\n")) - outbytes.write(text(field_value)) - outbytes.write(text("\n")) - - response = outbytes.getvalue() - return bytes(response, 'utf-8') + outbytes.write(field_name) + outbytes.write("\n") + outbytes.write(field_value) + outbytes.write("\n") + return outbytes.getvalue().encode('utf-8') def verify_bounce_message(msg): """ - Verify an SES/SNS bounce notification message. + Verifies an SES/SNS bounce notification message. + Returns True if valid, False otherwise. """ verifier = BounceMessageVerifier(msg) return verifier.is_verified() @receiver(signals.email_pre_send) def receiver_email_pre_send(sender, message=None, **kwargs): - #logger.info("receiver_email_pre_send received signal") + """ + Signal receiver for pre-send email processing. + Currently a no-op. + """ pass def filter_recipiants(recipiant_list): - logger.info("Starting filter_recipiants: %s" % recipiant_list) - - if type(recipiant_list) != type([]): - logger.info("putting emails into a list") + """ + Filters a list of recipient email addresses to exclude invalid or blacklisted emails. + """ + logger.info("Starting filter_recipiants: %s", recipiant_list) + + # Ensure recipient_list is a list + if not isinstance(recipiant_list, list): + logger.info("Converting recipients to list") recipiant_list = [recipiant_list] - - if len(recipiant_list) > 0: - recipiant_list = filter_recipiants_with_unsubscribe(recipiant_list) - - if len(recipiant_list) > 0: + + if recipiant_list: + recipiant_list = filter_recipiants_with_unsubscribe(recipiant_list) recipiant_list = filter_recipiants_with_complaint_records(recipiant_list) - - if len(recipiant_list) > 0: recipiant_list = filter_recipiants_with_bounce_records(recipiant_list) - - if len(recipiant_list) > 0: recipiant_list = filter_recipiants_with_validater_email_domain(recipiant_list) - logger.info("recipiant list after filter_recipiants: %s" % recipiant_list) + logger.info("Filtered recipient list: %s", recipiant_list) return recipiant_list def filter_recipiants_with_unsubscribe(recipiant_list): """ - filter message recipiants so we don't send emails to any email that have Unsubscribude + Removes recipients who have unsubscribed. """ - #logger.info("unsubscribe filter running") - - #logger.info("message.recipients() befor blacklist_emails filter: %s" % recipiant_list) - blacklist_emails = list(set([record.email for record in User.objects.filter(aws_ses__unsubscribe=True)])) + blacklist_emails = list(set([record.email for record in User.objects.filter(aws_ses__unsubscribe=True)])) + return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list - if blacklist_emails: - return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) - else: - return recipiant_list - def filter_recipiants_with_complaint_records(recipiant_list): """ - filter message recipiants so we don't send emails to any email that have a ComplaintRecord + Removes recipients with complaint records. """ - #logger.info("complaint_records filter running") - - #logger.info("message.recipients() befor blacklist_emails filter: %s" % recipiant_list) - blacklist_emails = list(set([record.email for record in ComplaintRecord.objects.filter(email__isnull=False)])) - - if blacklist_emails: - return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) - else: - return recipiant_list + blacklist_emails = list(set([record.email for record in ComplaintRecord.objects.filter(email__isnull=False)])) + return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list def filter_recipiants_with_bounce_records(recipiant_list): """ - filter message recipiants so we dont send emails to any email that has more BounceRecord - the SES_BOUNCE_LIMIT + Removes recipients with bounce records exceeding SES_BOUNCE_LIMIT. """ - #logger.info("bounce_records filter running") - - #logger.info("message.recipients() befor blacklist_emails filter: %s" % recipiant_list) - - blacklist_emails = list(set([record.email for record in BounceRecord.objects.filter(email__isnull=False).annotate(total=Count('email')).filter(total__gte=settings.SES_BOUNCE_LIMIT)])) - - if blacklist_emails: - return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) - else: - return recipiant_list - + blacklist_emails = list(set([record.email for record in BounceRecord.objects.filter(email__isnull=False) + .annotate(total=Count('email')).filter(total__gte=settings.SES_BOUNCE_LIMIT)])) + return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list + def filter_recipiants_with_blacklist(recipiant_list, blacklist_emails): """ - filter message recipiants with a list of emails you dont want to email + Filters out emails from a blacklist. """ - filtered_recipiant_list = [email for email in recipiant_list if email not in blacklist_emails] - - return filtered_recipiant_list + return [email for email in recipiant_list if email not in blacklist_emails] def filter_recipiants_with_validater_email_domain(recipiant_list): - debug_flag = True - + """ + Validates email domains for new recipients. + """ sent_list = [e.destination for e in SendRecord.objects.filter(destination__in=recipiant_list).distinct("destination")] - test_list = [e for e in recipiant_list if e not in sent_list] for e in test_list: - if not validater_email_domain(e): - recipiant_list.remove(e) - + recipiant_list.remove(e) + return recipiant_list def validater_email_domain(email): - + """ + Checks if an email's domain has valid MX records and is not blacklisted. + """ if email.find("@") < 1: - return False domain = email.split("@")[-1] - - if BlackListedDomains.objects.filter(domain=domain).count() > 0: + + if BlackListedDomains.objects.filter(domain=domain).exists(): return False - - records = [] + try: records = dns.resolver.query(domain, 'MX') - except dns.resolver.NoNameservers as e: + return len(records) > 0 + except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.LifetimeTimeout): return False - - except dns.resolver.NoAnswer as e: - return False - - except dns.resolver.NXDOMAIN as e: - return False - - except dns.resolver.LifetimeTimeout as e: - return False - - if len(records) < 1: - return False - - return True def emailIsValid(email): - - resp = False + """ + Validates email format using regex. + """ regex = re.compile(r'([A-Za-z0-9]+[.\-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(.[A-Z|a-z]{2,})+') - if re.fullmatch(regex, email): - resp = True - - return resp + return bool(re.fullmatch(regex, email)) def validate_email(email): - + """ + Validates an email address for sending. + Checks format, bounce records, complaints, and domain validity. + """ if not emailIsValid(email): return False - if BounceRecord.objects.filter(email=email).count() >= settings.SES_BOUNCE_LIMIT: return False - - if ComplaintRecord.objects.filter(email=email).count() > 0: + if ComplaintRecord.objects.filter(email=email).exists(): return False - return validater_email_domain(email) \ No newline at end of file