working on cleaning up the project

This commit is contained in:
Raymond Jessop 2025-04-18 13:02:28 -05:00
parent 4a3297c250
commit bbfdfec0dc
2 changed files with 276 additions and 279 deletions

View File

@ -1,106 +1,153 @@
import logging
import os
from django.conf import settings
from django.contrib.sites.models import Site
import logging
from django.core.exceptions import ImproperlyConfigured
from .models import (
AwsSesSettings
from .models import AwsSesSettings
# Define constants for default values
DEFAULTS = {
'AWS_SES_REGION_NAME': 'us-east-1',
'AWS_SES_REGION_ENDPOINT': 'email.us-east-1.amazonaws.com',
'AWS_SES_AUTO_THROTTLE': 0.5,
'AWS_SES_RETURN_PATH': None,
'AWS_SES_CONFIGURATION_SET': None,
'DKIM_SELECTOR': 'ses',
'DKIM_HEADERS': ('From', 'To', 'Cc', 'Subject'),
'VERIFY_BOUNCE_SIGNATURES': True,
'BOUNCE_CERT_DOMAINS': ('amazonaws.com', 'amazon.com'),
'SES_BOUNCE_LIMIT': 1,
'SES_BACKEND_DEBUG': False,
'SES_BACKEND_DEBUG_LOGFILE_FORMATTER': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
'DEFAULT_FROM_EMAIL': 'no_reply@example.com',
'UNSUBSCRIBE_TEMPLATE': 'django_aws_ses/unsubscribe.html',
'BASE_TEMPLATE': 'django_aws_ses/base.html',
}
# Selectively export key settings
__all__ = (
'ACCESS_KEY', 'SECRET_KEY', 'AWS_SES_REGION_NAME', 'AWS_SES_REGION_ENDPOINT',
'AWS_SES_AUTO_THROTTLE', 'AWS_SES_RETURN_PATH', 'AWS_SES_CONFIGURATION_SET',
'DKIM_DOMAIN', 'DKIM_PRIVATE_KEY', 'DKIM_SELECTOR', 'DKIM_HEADERS',
'TIME_ZONE', 'BASE_DIR', 'SES_BOUNCE_LIMIT', 'SES_BACKEND_DEBUG',
'SES_BACKEND_DEBUG_LOGFILE_PATH', 'SES_BACKEND_DEBUG_LOGFILE_FORMATTER',
'DEFAULT_FROM_EMAIL', 'HOME_URL', 'UNSUBSCRIBE_TEMPLATE', 'BASE_TEMPLATE',
'VERIFY_BOUNCE_SIGNATURES', 'BOUNCE_CERT_DOMAINS', 'logger',
)
def get_aws_ses_settings():
"""
Retrieve AwsSesSettings from the database for the current site.
Returns None if the settings cannot be retrieved.
"""
try:
return AwsSesSettings.objects.get(site_id=settings.SITE_ID)
except (AwsSesSettings.DoesNotExist, AttributeError) as e:
logger.warning("Failed to retrieve AwsSesSettings: %s", e)
return None
def configure_logger(debug, log_file_path, formatter):
"""
Configure the logger for the AWS SES app.
Sets up file logging if debug is enabled and a valid log file path is provided.
"""
logger = logging.getLogger('django_aws_ses')
logger.setLevel(logging.DEBUG if debug else logging.WARNING)
if debug and log_file_path:
# Validate log file path
log_dir = os.path.dirname(log_file_path)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
try:
handler = logging.FileHandler(log_file_path)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter(formatter)
handler.setFormatter(formatter)
logger.addHandler(handler)
except OSError as e:
logger.error("Failed to configure log file %s: %s", log_file_path, e)
return logger
# Initialize logger early to capture any setup errors
BASE_DIR = getattr(settings, 'BASE_DIR', None)
if not BASE_DIR:
raise ImproperlyConfigured(
"BASE_DIR must be defined in Django settings and point to the project root directory."
)
# Temporary logger for setup phase
logger = logging.getLogger('django_aws_ses')
__all__ = ('ACCESS_KEY', 'SECRET_KEY', 'AWS_SES_REGION_NAME',
'AWS_SES_REGION_ENDPOINT', 'AWS_SES_AUTO_THROTTLE',
'AWS_SES_RETURN_PATH', 'DKIM_DOMAIN', 'DKIM_PRIVATE_KEY',
'DKIM_SELECTOR', 'DKIM_HEADERS', 'TIME_ZONE', 'BASE_DIR',
'BOUNCE_LIMIT','SES_BACKEND_DEBUG','SES_BACKEND_DEBUG_LOGFILE_PATH',
'SES_BACKEND_DEBUG_LOGFILE_FORMATTER')
# Fetch AwsSesSettings from database
aws_ses_settings = get_aws_ses_settings()
aws_ses_Settings = None
# AWS Credentials
ACCESS_KEY = aws_ses_settings.access_key if aws_ses_settings else getattr(
settings, 'AWS_SES_ACCESS_KEY_ID', getattr(settings, 'AWS_ACCESS_KEY_ID', None)
)
SECRET_KEY = aws_ses_settings.secret_key if aws_ses_settings else getattr(
settings, 'AWS_SES_SECRET_ACCESS_KEY', getattr(settings, 'AWS_SECRET_ACCESS_KEY', None)
)
try:
aws_ses_Settings, c = AwsSesSettings.objects.get_or_create(site_id=settings.SITE_ID)
except Exception as e:
print("AwsSesSettings does not exist: error: %s" % e)
ACCESS_KEY = aws_ses_Settings.access_key if aws_ses_Settings else None
if ACCESS_KEY is None:
ACCESS_KEY = getattr(settings, 'AWS_SES_ACCESS_KEY_ID',getattr(settings, 'AWS_ACCESS_KEY_ID', None))
SECRET_KEY = aws_ses_Settings.secret_key if aws_ses_Settings else None
if SECRET_KEY is None:
SECRET_KEY = getattr(settings, 'AWS_SES_SECRET_ACCESS_KEY',getattr(settings, 'AWS_SECRET_ACCESS_KEY', None))
AWS_SES_REGION_NAME = aws_ses_Settings.region_name if aws_ses_Settings else None
if AWS_SES_REGION_NAME is None:
AWS_SES_REGION_NAME = getattr(settings, 'AWS_SES_REGION_NAME',getattr(settings, 'AWS_DEFAULT_REGION', 'us-east-1'))
AWS_SES_REGION_ENDPOINT = aws_ses_Settings.region_endpoint if aws_ses_Settings else None
if AWS_SES_REGION_ENDPOINT is None:
AWS_SES_REGION_ENDPOINT = getattr(settings, 'AWS_SES_REGION_ENDPOINT','email.us-east-1.amazonaws.com')
BASE_DIR = getattr(settings, 'BASE_DIR', None)
# Validate credentials
if not (ACCESS_KEY and SECRET_KEY):
raise ImproperlyConfigured(
"AWS SES credentials (ACCESS_KEY and SECRET_KEY) must be provided via AwsSesSettings or Django settings."
)
DEFAULT_FROM_EMAIL = ""
# AWS SES Configuration
AWS_SES_REGION_NAME = aws_ses_settings.region_name if aws_ses_settings else getattr(
settings, 'AWS_SES_REGION_NAME', getattr(settings, 'AWS_DEFAULT_REGION', DEFAULTS['AWS_SES_REGION_NAME'])
)
AWS_SES_REGION_ENDPOINT = aws_ses_settings.region_endpoint if aws_ses_settings else getattr(
settings, 'AWS_SES_REGION_ENDPOINT', DEFAULTS['AWS_SES_REGION_ENDPOINT']
)
AWS_SES_AUTO_THROTTLE = getattr(settings, 'AWS_SES_AUTO_THROTTLE', DEFAULTS['AWS_SES_AUTO_THROTTLE'])
AWS_SES_RETURN_PATH = getattr(settings, 'AWS_SES_RETURN_PATH', DEFAULTS['AWS_SES_RETURN_PATH'])
AWS_SES_CONFIGURATION_SET = getattr(settings, 'AWS_SES_CONFIGURATION_SET', DEFAULTS['AWS_SES_CONFIGURATION_SET'])
# DKIM Settings
DKIM_DOMAIN = getattr(settings, 'DKIM_DOMAIN', None)
DKIM_PRIVATE_KEY = getattr(settings, 'DKIM_PRIVATE_KEY', None)
DKIM_SELECTOR = getattr(settings, 'DKIM_SELECTOR', DEFAULTS['DKIM_SELECTOR'])
DKIM_HEADERS = getattr(settings, 'DKIM_HEADERS', DEFAULTS['DKIM_HEADERS'])
# Email Settings
try:
site = Site.objects.get_current()
DEFAULT_FROM_EMAIL = getattr(settings, 'DEFAULT_FROM_EMAIL', 'no_reply@%s' % site.domain)
except Exception as e:
print("Site Doesn't Exist, please configure Django sites")
print("Error is: %s" % e)
DEFAULT_FROM_EMAIL = getattr(settings, 'DEFAULT_FROM_EMAIL', f"no-reply@{site.domain}")
except Site.DoesNotExist:
DEFAULT_FROM_EMAIL = getattr(settings, 'DEFAULT_FROM_EMAIL', DEFAULTS['DEFAULT_FROM_EMAIL'])
logger.warning(
"Django sites framework not configured. Using DEFAULT_FROM_EMAIL: %s. "
"Configure the Site model or set DEFAULT_FROM_EMAIL in settings.", DEFAULT_FROM_EMAIL
)
HOME_URL = getattr(settings, 'HOME_URL', '')
if not BASE_DIR:
raise RuntimeError('No BASE_DIR defined in project settings, django_aws_ses requires BASE_DIR to be defined and pointed at your root directory. i.e. BASE_DIR = os.path.dirname(os.path.abspath(__file__))')
# Template Settings
UNSUBSCRIBE_TEMPLATE = getattr(settings, 'UNSUBSCRIBE_TEMPLATE', DEFAULTS['UNSUBSCRIBE_TEMPLATE'])
BASE_TEMPLATE = getattr(settings, 'BASE_TEMPLATE', DEFAULTS['BASE_TEMPLATE'])
UNSUBSCRIBE_TEMPLET = getattr(settings, 'UNSUBSCRIBE_TEMPLET', 'django_aws_ses/unsubscribe.html')
BASE_TEMPLET = getattr(settings, 'UNSUBSCRIBE_TEMPLET', 'django_aws_ses/base.html')
AWS_SES_REGION_ENDPOINT_URL = getattr(settings, 'AWS_SES_REGION_ENDPOINT_URL','https://' + AWS_SES_REGION_ENDPOINT)
AWS_SES_AUTO_THROTTLE = getattr(settings, 'AWS_SES_AUTO_THROTTLE', 0.5)
AWS_SES_RETURN_PATH = getattr(settings, 'AWS_SES_RETURN_PATH', None)
# Bounce and Verification Settings
VERIFY_BOUNCE_SIGNATURES = getattr(settings, 'AWS_SES_VERIFY_BOUNCE_SIGNATURES', DEFAULTS['VERIFY_BOUNCE_SIGNATURES'])
BOUNCE_CERT_DOMAINS = getattr(settings, 'AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS', DEFAULTS['BOUNCE_CERT_DOMAINS'])
SES_BOUNCE_LIMIT = getattr(settings, 'SES_BOUNCE_LIMIT', DEFAULTS['SES_BOUNCE_LIMIT'])
# if AWS_SES_RETURN_PATH is None:
# AWS_SES_RETURN_PATH = "CF Doors <cdfbounced@zeeksgeeks.com>"
AWS_SES_CONFIGURATION_SET = getattr(settings, 'AWS_SES_CONFIGURATION_SET', None)
DKIM_DOMAIN = getattr(settings, "DKIM_DOMAIN", None)
DKIM_PRIVATE_KEY = getattr(settings, 'DKIM_PRIVATE_KEY', None)
DKIM_SELECTOR = getattr(settings, 'DKIM_SELECTOR', 'ses')
DKIM_HEADERS = getattr(settings, 'DKIM_HEADERS', ('From', 'To', 'Cc', 'Subject'))
# Debug Settings
SES_BACKEND_DEBUG = getattr(settings, 'SES_BACKEND_DEBUG', DEFAULTS['SES_BACKEND_DEBUG'])
SES_BACKEND_DEBUG_LOGFILE_PATH = getattr(
settings, 'SES_BACKEND_DEBUG_LOGFILE_PATH', os.path.join(BASE_DIR, 'aws_ses.log')
)
SES_BACKEND_DEBUG_LOGFILE_FORMATTER = getattr(
settings, 'SES_BACKEND_DEBUG_LOGFILE_FORMATTER', DEFAULTS['SES_BACKEND_DEBUG_LOGFILE_FORMATTER']
)
# Timezone
TIME_ZONE = settings.TIME_ZONE
VERIFY_BOUNCE_SIGNATURES = getattr(settings, 'AWS_SES_VERIFY_BOUNCE_SIGNATURES', True)
# Domains that are trusted when retrieving the certificate
# used to sign bounce messages.
BOUNCE_CERT_DOMAINS = getattr(settings, 'AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS', (
'amazonaws.com',
'amazon.com',
))
SES_BOUNCE_LIMIT = getattr(settings,'BOUNCE_LIMT', 1)
SES_BACKEND_DEBUG = getattr(settings,'SES_BACKEND_DEBUG', False)
SES_BACKEND_DEBUG_LOGFILE_PATH = getattr(settings,'SES_BACKEND_DEBUG_LOGFILE_PATH', '%s/aws_ses.log' % BASE_DIR)
SES_BACKEND_DEBUG_LOGFILE_FORMATTER = getattr(settings,'SES_BACKEND_DEBUG_LOGFILE_FORMATTER', '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('django_aws_ses')
# logger.setLevel(logging.WARNING)
if SES_BACKEND_DEBUG:
logger.setLevel(logging.INFO)
# create a file handler
if SES_BACKEND_DEBUG_LOGFILE_PATH:
handler = logging.FileHandler(SES_BACKEND_DEBUG_LOGFILE_PATH)
handler.setLevel(logging.INFO)
# create a logging format
formatter = logging.Formatter(SES_BACKEND_DEBUG_LOGFILE_FORMATTER)
handler.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(handler)
#logger.info('something we are logging')
# Configure logger with final settings
logger = configure_logger(SES_BACKEND_DEBUG, SES_BACKEND_DEBUG_LOGFILE_PATH, SES_BACKEND_DEBUG_LOGFILE_FORMATTER)

View File

@ -1,23 +1,18 @@
import base64
import logging
import time
import re
import dns.resolver
from telnetlib import Telnet
from builtins import str as text
from builtins import bytes
from io import StringIO
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from urllib.parse import urlparse
from django.core.exceptions import ImproperlyConfigured
from django.utils.encoding import smart_str
from django.dispatch.dispatcher import receiver
from django.db.models import Count
from django.contrib.auth import get_user_model # If used custom user model
User = get_user_model()
from django.contrib.auth import get_user_model
from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidSignature
import requests
from . import settings
from . import signals
@ -26,148 +21,146 @@ from .models import (
ComplaintRecord,
BlackListedDomains,
SendRecord,
)
)
from django.contrib.auth import get_user_model
User = get_user_model()
# Logger setup
logger = settings.logger
class BounceMessageVerifier(object):
"""
A utility class for validating bounce messages
See: http://docs.amazonwebservices.com/sns/latest/gsg/SendMessageToHttp.verify.signature.html
Utility class for validating AWS SES/SNS bounce messages.
Verifies the message signature using the provided certificate.
"""
def __init__(self, bounce_dict):
"""
Creates a new bounce message from the given dict.
"""
# Initialize with the bounce message dictionary
self._data = bounce_dict
self._verified = None
self._certificate = None
def is_verified(self):
"""
Verifies an SES bounce message.
Verifies the signature of an SES bounce message.
Returns True if the signature is valid, False otherwise.
"""
if self._verified is None:
# Extract and decode the signature
signature = self._data.get('Signature')
if not signature:
logger.warning("No signature found in bounce message")
self._verified = False
return self._verified
# Decode the signature from base64
signature = bytes(base64.b64decode(signature))
try:
signature = base64.b64decode(signature)
except ValueError:
logger.warning("Invalid base64 signature")
self._verified = False
return self._verified
# Get the message to sign
# Get the message bytes to verify against
sign_bytes = self._get_bytes_to_sign()
if not sign_bytes:
logger.warning("Could not generate bytes to sign")
self._verified = False
return self._verified
if not self.certificate:
# Load the signing certificate
certificate = self.certificate
if not certificate:
logger.warning("No valid certificate available")
self._verified = False
return self._verified
# Extract the public key
pkey = self.certificate.get_pubkey()
# Use the public key to verify the signature.
pkey.verify_init()
pkey.verify_update(sign_bytes)
verify_result = pkey.verify_final(signature)
self._verified = verify_result == 1
# Verify the signature using the certificate's public key
try:
public_key = certificate.public_key()
public_key.verify(
signature,
sign_bytes,
padding.PKCS1v15(),
hashes.SHA1() # AWS SNS uses SHA1
)
self._verified = True
except InvalidSignature:
logger.warning("Signature verification failed: Invalid signature")
self._verified = False
except Exception as e:
logger.warning("Signature verification failed: %s", e)
self._verified = False
return self._verified
@property
def certificate(self):
"""
Retrieves the certificate used to sign the bounce message.
TODO: Cache the certificate based on the cert URL so we don't have to
retrieve it for each bounce message. *We would need to do it in a
secure way so that the cert couldn't be overwritten in the cache*
Fetches and loads the X.509 certificate used to sign the bounce message.
Returns None if the certificate cannot be loaded.
"""
if not hasattr(self, '_certificate'):
if self._certificate is None:
cert_url = self._get_cert_url()
# Only load certificates from a certain domain?
# Without some kind of trusted domain check, any old joe could
# craft a bounce message and sign it using his own certificate
# and we would happily load and verify it.
if not cert_url:
self._certificate = None
return self._certificate
logger.warning("No valid certificate URL")
return None
# Ensure requests is available
try:
import requests
except ImportError:
raise ImproperlyConfigured(
"`requests` is required for bounce message verification. "
"Please consider installing the `django-ses` with the "
"`bounce` extra - e.g. `pip install django-ses[bounce]`."
"Install with `pip install requests`."
)
# Ensure cryptography is available
try:
from M2Crypto import X509
from cryptography import x509
from cryptography.hazmat.backends import default_backend
except ImportError:
raise ImproperlyConfigured(
"`M2Crypto` is required for bounce message verification. "
"Please consider installing the `django-ses` with the "
"`bounce` extra - e.g. `pip install django-ses[bounce]`."
"`cryptography` is required for bounce message verification. "
"Install with `pip install cryptography`."
)
# We use requests because it verifies the https certificate
# when retrieving the signing certificate. If https was somehow
# hijacked then all bets are off.
# Fetch the certificate
response = requests.get(cert_url)
if response.status_code != 200:
logger.warning(u'Could not download certificate from %s: "%s"', cert_url, response.status_code)
self._certificate = None
return self._certificate
logger.warning("Failed to download certificate from %s: %s", cert_url, response.status_code)
return None
# Handle errors loading the certificate.
# If the certificate is invalid then return
# false as we couldn't verify the message.
# Load the certificate
try:
self._certificate = X509.load_cert_string(response.content)
except X509.X509Error as e:
logger.warning(u'Could not load certificate from %s: "%s"', cert_url, e)
self._certificate = None
self._certificate = x509.load_pem_x509_certificate(response.content, default_backend())
except Exception as e:
logger.warning("Failed to load certificate from %s: %s", cert_url, e)
return None
return self._certificate
def _get_cert_url(self):
"""
Get the signing certificate URL.
Only accept urls that match the domains set in the
AWS_SNS_BOUNCE_CERT_TRUSTED_DOMAINS setting. Sub-domains
are allowed. i.e. if amazonaws.com is in the trusted domains
then sns.us-east-1.amazonaws.com will match.
Retrieves the certificate URL from the message, ensuring it comes from a trusted domain.
Returns None if the URL is untrusted or invalid.
"""
cert_url = self._data.get('SigningCertURL')
if cert_url:
if cert_url.startswith('https://'):
url_obj = urlparse(cert_url)
for trusted_domain in settings.BOUNCE_CERT_DOMAINS:
parts = trusted_domain.split('.')
if url_obj.netloc.split('.')[-len(parts):] == parts:
return cert_url
logger.warning(u'Untrusted certificate URL: "%s"', cert_url)
if cert_url and cert_url.startswith('https://'):
url_obj = urlparse(cert_url)
for trusted_domain in settings.BOUNCE_CERT_DOMAINS:
parts = trusted_domain.split('.')
if url_obj.netloc.split('.')[-len(parts):] == parts:
return cert_url
logger.warning("Untrusted certificate URL: %s", cert_url)
else:
logger.warning(u'No signing certificate URL: "%s"', cert_url)
logger.warning("No/invalid certificate URL: %s", cert_url)
return None
def _get_bytes_to_sign(self):
"""
Creates the message used for signing SNS notifications.
This is used to verify the bounce message when it is received.
Constructs the message bytes to be signed for verification.
Returns None if the message type is unrecognized.
"""
# Depending on the message type the fields to add to the message
# differ so we handle that here.
msg_type = self._data.get('Type')
if msg_type == 'Notification':
fields_to_sign = [
@ -178,8 +171,7 @@ class BounceMessageVerifier(object):
'TopicArn',
'Type',
]
elif (msg_type == 'SubscriptionConfirmation' or
msg_type == 'UnsubscribeConfirmation'):
elif msg_type in ('SubscriptionConfirmation', 'UnsubscribeConfirmation'):
fields_to_sign = [
'Message',
'MessageId',
@ -190,172 +182,130 @@ class BounceMessageVerifier(object):
'Type',
]
else:
# Unrecognized type
logger.warning(u'Unrecognized SNS message Type: "%s"', msg_type)
logger.warning("Unrecognized SNS message type: %s", msg_type)
return None
outbytes = StringIO()
for field_name in fields_to_sign:
field_value = smart_str(self._data.get(field_name, ''),
errors="replace")
field_value = smart_str(self._data.get(field_name, ''), errors="replace")
if field_value:
outbytes.write(text(field_name))
outbytes.write(text("\n"))
outbytes.write(text(field_value))
outbytes.write(text("\n"))
response = outbytes.getvalue()
return bytes(response, 'utf-8')
outbytes.write(field_name)
outbytes.write("\n")
outbytes.write(field_value)
outbytes.write("\n")
return outbytes.getvalue().encode('utf-8')
def verify_bounce_message(msg):
"""
Verify an SES/SNS bounce notification message.
Verifies an SES/SNS bounce notification message.
Returns True if valid, False otherwise.
"""
verifier = BounceMessageVerifier(msg)
return verifier.is_verified()
@receiver(signals.email_pre_send)
def receiver_email_pre_send(sender, message=None, **kwargs):
#logger.info("receiver_email_pre_send received signal")
"""
Signal receiver for pre-send email processing.
Currently a no-op.
"""
pass
def filter_recipiants(recipiant_list):
logger.info("Starting filter_recipiants: %s" % recipiant_list)
if type(recipiant_list) != type([]):
logger.info("putting emails into a list")
"""
Filters a list of recipient email addresses to exclude invalid or blacklisted emails.
"""
logger.info("Starting filter_recipiants: %s", recipiant_list)
# Ensure recipient_list is a list
if not isinstance(recipiant_list, list):
logger.info("Converting recipients to list")
recipiant_list = [recipiant_list]
if len(recipiant_list) > 0:
recipiant_list = filter_recipiants_with_unsubscribe(recipiant_list)
if len(recipiant_list) > 0:
if recipiant_list:
recipiant_list = filter_recipiants_with_unsubscribe(recipiant_list)
recipiant_list = filter_recipiants_with_complaint_records(recipiant_list)
if len(recipiant_list) > 0:
recipiant_list = filter_recipiants_with_bounce_records(recipiant_list)
if len(recipiant_list) > 0:
recipiant_list = filter_recipiants_with_validater_email_domain(recipiant_list)
logger.info("recipiant list after filter_recipiants: %s" % recipiant_list)
logger.info("Filtered recipient list: %s", recipiant_list)
return recipiant_list
def filter_recipiants_with_unsubscribe(recipiant_list):
"""
filter message recipiants so we don't send emails to any email that have Unsubscribude
Removes recipients who have unsubscribed.
"""
#logger.info("unsubscribe filter running")
#logger.info("message.recipients() befor blacklist_emails filter: %s" % recipiant_list)
blacklist_emails = list(set([record.email for record in User.objects.filter(aws_ses__unsubscribe=True)]))
blacklist_emails = list(set([record.email for record in User.objects.filter(aws_ses__unsubscribe=True)]))
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
if blacklist_emails:
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails)
else:
return recipiant_list
def filter_recipiants_with_complaint_records(recipiant_list):
"""
filter message recipiants so we don't send emails to any email that have a ComplaintRecord
Removes recipients with complaint records.
"""
#logger.info("complaint_records filter running")
#logger.info("message.recipients() befor blacklist_emails filter: %s" % recipiant_list)
blacklist_emails = list(set([record.email for record in ComplaintRecord.objects.filter(email__isnull=False)]))
if blacklist_emails:
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails)
else:
return recipiant_list
blacklist_emails = list(set([record.email for record in ComplaintRecord.objects.filter(email__isnull=False)]))
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
def filter_recipiants_with_bounce_records(recipiant_list):
"""
filter message recipiants so we dont send emails to any email that has more BounceRecord
the SES_BOUNCE_LIMIT
Removes recipients with bounce records exceeding SES_BOUNCE_LIMIT.
"""
#logger.info("bounce_records filter running")
#logger.info("message.recipients() befor blacklist_emails filter: %s" % recipiant_list)
blacklist_emails = list(set([record.email for record in BounceRecord.objects.filter(email__isnull=False).annotate(total=Count('email')).filter(total__gte=settings.SES_BOUNCE_LIMIT)]))
if blacklist_emails:
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails)
else:
return recipiant_list
blacklist_emails = list(set([record.email for record in BounceRecord.objects.filter(email__isnull=False)
.annotate(total=Count('email')).filter(total__gte=settings.SES_BOUNCE_LIMIT)]))
return filter_recipiants_with_blacklist(recipiant_list, blacklist_emails) if blacklist_emails else recipiant_list
def filter_recipiants_with_blacklist(recipiant_list, blacklist_emails):
"""
filter message recipiants with a list of emails you dont want to email
Filters out emails from a blacklist.
"""
filtered_recipiant_list = [email for email in recipiant_list if email not in blacklist_emails]
return filtered_recipiant_list
return [email for email in recipiant_list if email not in blacklist_emails]
def filter_recipiants_with_validater_email_domain(recipiant_list):
debug_flag = True
"""
Validates email domains for new recipients.
"""
sent_list = [e.destination for e in SendRecord.objects.filter(destination__in=recipiant_list).distinct("destination")]
test_list = [e for e in recipiant_list if e not in sent_list]
for e in test_list:
if not validater_email_domain(e):
recipiant_list.remove(e)
recipiant_list.remove(e)
return recipiant_list
def validater_email_domain(email):
"""
Checks if an email's domain has valid MX records and is not blacklisted.
"""
if email.find("@") < 1:
return False
domain = email.split("@")[-1]
if BlackListedDomains.objects.filter(domain=domain).count() > 0:
if BlackListedDomains.objects.filter(domain=domain).exists():
return False
records = []
try:
records = dns.resolver.query(domain, 'MX')
except dns.resolver.NoNameservers as e:
return len(records) > 0
except (dns.resolver.NoNameservers, dns.resolver.NoAnswer, dns.resolver.NXDOMAIN, dns.resolver.LifetimeTimeout):
return False
except dns.resolver.NoAnswer as e:
return False
except dns.resolver.NXDOMAIN as e:
return False
except dns.resolver.LifetimeTimeout as e:
return False
if len(records) < 1:
return False
return True
def emailIsValid(email):
resp = False
"""
Validates email format using regex.
"""
regex = re.compile(r'([A-Za-z0-9]+[.\-_])*[A-Za-z0-9]+@[A-Za-z0-9-]+(.[A-Z|a-z]{2,})+')
if re.fullmatch(regex, email):
resp = True
return resp
return bool(re.fullmatch(regex, email))
def validate_email(email):
"""
Validates an email address for sending.
Checks format, bounce records, complaints, and domain validity.
"""
if not emailIsValid(email):
return False
if BounceRecord.objects.filter(email=email).count() >= settings.SES_BOUNCE_LIMIT:
return False
if ComplaintRecord.objects.filter(email=email).count() > 0:
if ComplaintRecord.objects.filter(email=email).exists():
return False
return validater_email_domain(email)