From c2bb823bd4c964db7cbf91e376b687a6235d9940 Mon Sep 17 00:00:00 2001 From: Raymond Jessop Date: Fri, 18 Apr 2025 14:26:17 -0500 Subject: [PATCH] addmin.py and models.py updates --- django_aws_ses/admin.py | 137 +++++++++++++-------- django_aws_ses/models.py | 259 +++++++++++++++++++++++---------------- 2 files changed, 243 insertions(+), 153 deletions(-) diff --git a/django_aws_ses/admin.py b/django_aws_ses/admin.py index 9f69ce7..a4d3d8c 100644 --- a/django_aws_ses/admin.py +++ b/django_aws_ses/admin.py @@ -8,77 +8,118 @@ from .models import ( SendRecord, UnknownRecord, BlackListedDomains, - ) +) -from . import settings - -#logger = settings.logger - -class AwsSesSettingsAdmin(admin.ModelAdmin): - model = AwsSesSettings - list_display = ('get_site', 'region_name') - - def get_site(self, obj): - return obj.site.domain - - get_site.short_description = 'domain' - get_site.admin_order_field = 'site__domain' - -admin.site.register(AwsSesSettings, AwsSesSettingsAdmin) - -class AwsSesUserAddonAdmin(admin.ModelAdmin): - model = AwsSesUserAddon - list_display = ('get_email', 'unsubscribe') - def get_email(self, obj): - return obj.user.email - - get_email.short_description = 'email' - get_email.admin_order_field = 'user__email' - -admin.site.register(AwsSesUserAddon, AwsSesUserAddonAdmin) - -class SESStatAdmin(admin.ModelAdmin): - model = SESStat - list_display = ('date', 'delivery_attempts', 'bounces', 'complaints', 'rejects') - -admin.site.register(SESStat, SESStatAdmin) class AdminEmailListFilter(admin.SimpleListFilter): + """Filter records by email address containing a search term.""" + title = 'email' + parameter_name = 'email' + + def lookups(self, request, model_admin): + return ( + (None, 'All'), + ) + def queryset(self, request, queryset): - #logger.info('self.value(): %s' % self.value()) - return queryset.filter(email__contains=self.value()) - + if self.value(): + return queryset.filter(email__icontains=self.value()) + return queryset + + +@admin.register(AwsSesSettings) +class AwsSesSettingsAdmin(admin.ModelAdmin): + """Admin interface for AWS SES settings.""" + model = AwsSesSettings + list_display = ('get_site', 'region_name') + list_display_links = ('get_site',) + search_fields = ('site__domain', 'region_name') + + def get_site(self, obj): + """Display the domain of the associated site.""" + return obj.site.domain + + get_site.short_description = 'Domain' + get_site.admin_order_field = 'site__domain' + + +@admin.register(AwsSesUserAddon) +class AwsSesUserAddonAdmin(admin.ModelAdmin): + """Admin interface for user-specific AWS SES settings.""" + model = AwsSesUserAddon + list_display = ('get_email', 'unsubscribe') + list_display_links = ('get_email',) + list_filter = ('unsubscribe',) + search_fields = ('user__email',) + + def get_email(self, obj): + """Display the user's email address.""" + return obj.user.email + + get_email.short_description = 'Email' + get_email.admin_order_field = 'user__email' + + +@admin.register(SESStat) +class SESStatAdmin(admin.ModelAdmin): + """Admin interface for SES statistics.""" + model = SESStat + list_display = ('date', 'delivery_attempts', 'bounces', 'complaints', 'rejects') + list_display_links = ('date',) + date_hierarchy = 'date' + ordering = ('-date',) + + +@admin.register(BounceRecord) class BounceRecordAdmin(admin.ModelAdmin): + """Admin interface for bounce records.""" model = BounceRecord list_display = ('email', 'bounce_type', 'bounce_sub_type', 'status', 'timestamp') - list_filter = ('email', 'bounce_type', 'bounce_sub_type', 'status', 'timestamp') + list_display_links = ('email',) + list_filter = (AdminEmailListFilter, 'bounce_type', 'bounce_sub_type', 'status', 'timestamp') + search_fields = ('email', 'diagnostic_code') + date_hierarchy = 'timestamp' -admin.site.register(BounceRecord, BounceRecordAdmin) +@admin.register(ComplaintRecord) class ComplaintRecordAdmin(admin.ModelAdmin): + """Admin interface for complaint records.""" model = ComplaintRecord list_display = ('email', 'sub_type', 'feedback_type', 'timestamp') - list_filter = ('email', 'sub_type', 'feedback_type', 'timestamp') + list_display_links = ('email',) + list_filter = (AdminEmailListFilter, 'sub_type', 'feedback_type', 'timestamp') + search_fields = ('email',) + date_hierarchy = 'timestamp' -admin.site.register(ComplaintRecord, ComplaintRecordAdmin) +@admin.register(SendRecord) class SendRecordAdmin(admin.ModelAdmin): + """Admin interface for send records.""" model = SendRecord list_display = ('source', 'destination', 'subject', 'timestamp', 'status') - list_filter = ('source', 'destination', 'subject', 'timestamp', 'status') + list_display_links = ('destination',) + list_filter = (AdminEmailListFilter, 'source', 'status', 'timestamp') + search_fields = ('source', 'destination', 'subject') + date_hierarchy = 'timestamp' -admin.site.register(SendRecord, SendRecordAdmin) +@admin.register(UnknownRecord) class UnknownRecordAdmin(admin.ModelAdmin): + """Admin interface for unknown SES event records.""" model = UnknownRecord - list_display = ('event_type', 'aws_data') - list_filter = ('event_type', 'aws_data') + list_display = ('event_type', 'timestamp') + list_display_links = ('event_type',) + list_filter = ('event_type', 'timestamp') + search_fields = ('event_type', 'aws_data') + date_hierarchy = 'timestamp' -admin.site.register(UnknownRecord, UnknownRecordAdmin) +@admin.register(BlackListedDomains) class BlackListedDomainsAdmin(admin.ModelAdmin): + """Admin interface for blacklisted domains.""" model = BlackListedDomains list_display = ('domain', 'timestamp') - list_filter = ('domain', 'timestamp') - -admin.site.register(BlackListedDomains, BlackListedDomainsAdmin) \ No newline at end of file + list_display_links = ('domain',) + list_filter = ('timestamp',) + search_fields = ('domain',) + date_hierarchy = 'timestamp' \ No newline at end of file diff --git a/django_aws_ses/models.py b/django_aws_ses/models.py index c3866b4..4f97600 100644 --- a/django_aws_ses/models.py +++ b/django_aws_ses/models.py @@ -1,154 +1,203 @@ import hashlib +import hmac import logging -import traceback from django.conf import settings +from django.contrib.auth import get_user_model +from django.contrib.sites.models import Site from django.db import models from django.db.models.signals import post_save from django.dispatch import receiver -from django.contrib.sites.models import Site -from django.contrib.auth import get_user_model # If used custom user model from django.urls import reverse -from django.utils.encoding import force_bytes, force_str -from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode +from django.utils.encoding import force_bytes +from django.utils.http import urlsafe_base64_encode User = get_user_model() +logger = logging.getLogger(__name__) + -class AwsSesSettings(models.Model): - site = models.OneToOneField(Site, on_delete=models.CASCADE) - access_key = models.CharField(max_length=255, blank=True, null=True,) - secret_key = models.CharField(max_length=255, blank=True, null=True,) - region_name = models.CharField(max_length=255, blank=True, null=True,) - region_endpoint = models.CharField(max_length=255, blank=True, null=True,) - - class Meta: - verbose_name = 'AWS SES Settings' - @receiver(post_save, sender=Site) def update_awsses_settings(sender, instance, created, **kwargs): + """Create or update AwsSesSettings when a Site is saved.""" try: if created: AwsSesSettings.objects.create(site=instance) instance.awssessettings.save() except Exception as e: - print("Exception saving site error:%s" % e) - track = traceback.format_exc() - print("Exception saving site track: %s" % (track)) - - -class AwsSesUserAddon(models.Model): - user = models.OneToOneField(User, related_name='aws_ses', on_delete=models.CASCADE) - unsubscribe = models.BooleanField(default=False) - - class Meta: - verbose_name = 'User Data' - - def get_email(self): - email_field = self.user.get_email_field_name() - email = getattr(self, email_field, '') or '' - return email - - def unsubscribe_hash_generator(self): - email = self.get_email() - string_to_hash = "%s%s" % (str(self.user.pk), email) - return hashlib.md5(string_to_hash.encode()).hexdigest() - - def check_unsubscribe_hash(self, hash): - test_hash = self.unsubscribe_hash_generator() - return hash == test_hash - - def unsubscribe_url_generator(self): - uuid = urlsafe_base64_encode(force_bytes(self.user.pk)) - hash = self.unsubscribe_hash_generator() - return reverse('django_aws_ses:aws_ses_unsubscribe', kwargs={"uuid":uuid, "hash":hash}) + logger.error(f"Failed to save AwsSesSettings for site {instance.id}: {e}") + @receiver(post_save, sender=User) def update_awsses_user(sender, instance, created, **kwargs): - if created: - AwsSesUserAddon.objects.create(user=instance) + """Create or update AwsSesUserAddon when a User is saved.""" try: + if created: + AwsSesUserAddon.objects.create(user=instance) instance.aws_ses.save() - except AwsSesUserAddon.DoesNotExist: - AwsSesUserAddon.objects.create(user=instance) + except Exception as e: + logger.error(f"Failed to save AwsSesUserAddon for user {instance.id}: {e}") + + +class AwsSesSettings(models.Model): + """AWS SES configuration settings for a site.""" + site = models.OneToOneField(Site, on_delete=models.CASCADE, related_name='awssessettings') + access_key = models.CharField(max_length=255, blank=True, null=True) + secret_key = models.CharField(max_length=255, blank=True, null=True) + region_name = models.CharField(max_length=255, blank=True, null=True) + region_endpoint = models.CharField(max_length=255, blank=True, null=True) + + class Meta: + verbose_name = 'AWS SES Settings' + verbose_name_plural = 'AWS SES Settings' + + def __str__(self): + return f"AWS SES Settings for {self.site.domain}" + + +class AwsSesUserAddon(models.Model): + """Additional AWS SES data for a user, including unsubscribe status.""" + user = models.OneToOneField(User, related_name='aws_ses', on_delete=models.CASCADE) + unsubscribe = models.BooleanField(default=False) + + class Meta: + verbose_name = 'AWS SES User Addon' + verbose_name_plural = 'AWS SES User Addons' + + def __str__(self): + return f"AWS SES Addon for {self.user.email}" + + def get_email(self): + """Get the user's email address.""" + email_field = self.user.get_email_field_name() + return getattr(self.user, email_field, '') or '' + + def unsubscribe_hash_generator(self): + """Generate a secure hash for unsubscribe verification.""" + email = self.get_email() + message = f"{self.user.pk}{email}".encode() + return hmac.new( + settings.SECRET_KEY.encode(), + message, + hashlib.sha256 + ).hexdigest() + + def check_unsubscribe_hash(self, hash_value): + """Verify an unsubscribe hash.""" + return hmac.compare_digest(self.unsubscribe_hash_generator(), hash_value) + + def unsubscribe_url_generator(self): + """Generate a secure unsubscribe URL.""" + uuid = urlsafe_base64_encode(force_bytes(str(self.user.pk))) + hash_value = self.unsubscribe_hash_generator() + return reverse('django_aws_ses:aws_ses_unsubscribe', kwargs={"uuid": uuid, "hash": hash_value}) + class SESStat(models.Model): + """Daily statistics for AWS SES email sending.""" date = models.DateField(unique=True, db_index=True) delivery_attempts = models.PositiveIntegerField() bounces = models.PositiveIntegerField() complaints = models.PositiveIntegerField() rejects = models.PositiveIntegerField() - + class Meta: - verbose_name = 'SES Stat' + verbose_name = 'SES Statistic' + verbose_name_plural = 'SES Statistics' ordering = ['-date'] - - def __unicode__(self): + + def __str__(self): return self.date.strftime("%Y-%m-%d") - + + class BounceRecord(models.Model): - timestamp = models.DateTimeField(auto_now_add=True) - email = models.EmailField() - bounce_type = models.CharField(max_length=255, blank=True, null=True,) - bounce_sub_type = models.CharField(max_length=255, blank=True, null=True,) - reporting_mta = models.CharField(max_length=255, blank=True, null=True,) - status = models.CharField(max_length=255, blank=True, null=True,) - action = models.CharField(max_length=255, blank=True, null=True,) - feedback_id = models.TextField(max_length=255, blank=True, null=True,) - diagnostic_code = models.CharField(max_length=2048, blank=True, null=True,) + """Record of an email bounce event from AWS SES.""" + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + email = models.EmailField(db_index=True) + bounce_type = models.CharField(max_length=255, blank=True, null=True) + bounce_sub_type = models.CharField(max_length=255, blank=True, null=True) + reporting_mta = models.CharField(max_length=255, blank=True, null=True) + status = models.CharField(max_length=255, blank=True, null=True) + action = models.CharField(max_length=255, blank=True, null=True) + feedback_id = models.TextField(blank=True, null=True) + diagnostic_code = models.CharField(max_length=2048, blank=True, null=True) cleared = models.BooleanField(default=False) - + class Meta: - indexes = [models.Index(fields=["email"]),] - + verbose_name = 'Bounce Record' + verbose_name_plural = 'Bounce Records' + indexes = [models.Index(fields=['email', 'timestamp'])] + def __str__(self): - return "email: %s, type: %s, sub_type: %s, status: %s, date: %s" % (self.email, self.bounce_type, self.bounce_sub_type, self.status, self.timestamp) - + return f"Bounce: {self.email} ({self.bounce_type}, {self.timestamp})" + + class ComplaintRecord(models.Model): - timestamp = models.DateTimeField(auto_now_add=True) - email = models.EmailField() - sub_type = models.CharField(max_length=255, blank=True, null=True,) - feedback_id = models.TextField(max_length=255, blank=True, null=True,) - feedback_type = models.CharField(max_length=255, blank=True, null=True,) - + """Record of an email complaint event from AWS SES.""" + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + email = models.EmailField(db_index=True) + sub_type = models.CharField(max_length=255, blank=True, null=True) + feedback_id = models.TextField(blank=True, null=True) + feedback_type = models.CharField(max_length=255, blank=True, null=True) + + class Meta: + verbose_name = 'Complaint Record' + verbose_name_plural = 'Complaint Records' + indexes = [models.Index(fields=['email', 'timestamp'])] + def __str__(self): - return "email: %s, sub_type: %s, feedback_type: %s, date: %s" % (self.email, self.bounce_sub_type, self.feedback_type, self.timestamp) + return f"Complaint: {self.email} ({self.feedback_type}, {self.timestamp})" + class SendRecord(models.Model): - + """Record of an email send or delivery event from AWS SES.""" SEND = 'Send' DELIVERED = 'Delivery' - STATUS_CHOICE = ( - (SEND, SEND), - (DELIVERED, DELIVERED), + STATUS_CHOICES = ( + (SEND, 'Send'), + (DELIVERED, 'Delivery'), ) - - timestamp = models.DateTimeField(auto_now_add=True) - source = models.EmailField() - destination = models.EmailField() - subject = models.TextField(max_length=255, blank=True, null=True,) - message_id = models.TextField(max_length=255, blank=True, null=True,) - aws_process_time = models.IntegerField() - smtp_response = models.CharField(max_length=255, blank=True, null=True,) - status = models.CharField(max_length=255, blank=True, null=True,) - - class Meta: - indexes = [models.Index(fields=["destination"]),] - - def __str__(self): - return "source: %s, destination: %s, subject: %s, date: %s" % (self.source, self.destination, self.subject, self.timestamp) - + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + source = models.EmailField() + destination = models.EmailField(db_index=True) + subject = models.TextField(max_length=998, blank=True, null=True) + message_id = models.TextField(max_length=255, blank=True, null=True) + aws_process_time = models.IntegerField(default=0) + smtp_response = models.CharField(max_length=255, blank=True, null=True) + status = models.CharField(max_length=20, choices=STATUS_CHOICES, blank=True, null=True) + + class Meta: + verbose_name = 'Send Record' + verbose_name_plural = 'Send Records' + indexes = [models.Index(fields=['destination', 'timestamp'])] + + def __str__(self): + return f"Send: {self.source} to {self.destination} ({self.status}, {self.timestamp})" + + class UnknownRecord(models.Model): - timestamp = models.DateTimeField(auto_now_add=True) - event_type = models.CharField(max_length=255, blank=True, null=True,) - aws_data = models.TextField(blank=True, null=True,) - + """Record of unrecognized AWS SES events.""" + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + event_type = models.CharField(max_length=255, blank=True, null=True) + aws_data = models.TextField(blank=True, null=True) + + class Meta: + verbose_name = 'Unknown Record' + verbose_name_plural = 'Unknown Records' + indexes = [models.Index(fields=['event_type', 'timestamp'])] + def __str__(self): - return "eventType: %s, timestamp: %s" % (self.eventType, self.timestamp) - + return f"Unknown Event: {self.event_type} ({self.timestamp})" + + class BlackListedDomains(models.Model): - domain = models.CharField(max_length=255, unique=True) - timestamp = models.DateTimeField(auto_now_add=True) - + """Domains blacklisted for email sending.""" + domain = models.CharField(max_length=255, unique=True, db_index=True) + timestamp = models.DateTimeField(auto_now_add=True, db_index=True) + + class Meta: + verbose_name = 'Blacklisted Domain' + verbose_name_plural = 'Blacklisted Domains' + def __str__(self): - return "%s, blocked: %s" % (self.domain, self.timestamp) \ No newline at end of file + return f"Blacklisted: {self.domain} ({self.timestamp})" \ No newline at end of file