addmin.py and models.py updates

This commit is contained in:
Raymond Jessop 2025-04-18 14:26:17 -05:00
parent 06fcb36121
commit c2bb823bd4
2 changed files with 243 additions and 153 deletions

View File

@ -8,77 +8,118 @@ from .models import (
SendRecord, SendRecord,
UnknownRecord, UnknownRecord,
BlackListedDomains, BlackListedDomains,
) )
from . import settings
#logger = settings.logger
class AwsSesSettingsAdmin(admin.ModelAdmin):
model = AwsSesSettings
list_display = ('get_site', 'region_name')
def get_site(self, obj):
return obj.site.domain
get_site.short_description = 'domain'
get_site.admin_order_field = 'site__domain'
admin.site.register(AwsSesSettings, AwsSesSettingsAdmin)
class AwsSesUserAddonAdmin(admin.ModelAdmin):
model = AwsSesUserAddon
list_display = ('get_email', 'unsubscribe')
def get_email(self, obj):
return obj.user.email
get_email.short_description = 'email'
get_email.admin_order_field = 'user__email'
admin.site.register(AwsSesUserAddon, AwsSesUserAddonAdmin)
class SESStatAdmin(admin.ModelAdmin):
model = SESStat
list_display = ('date', 'delivery_attempts', 'bounces', 'complaints', 'rejects')
admin.site.register(SESStat, SESStatAdmin)
class AdminEmailListFilter(admin.SimpleListFilter): class AdminEmailListFilter(admin.SimpleListFilter):
"""Filter records by email address containing a search term."""
title = 'email'
parameter_name = 'email'
def lookups(self, request, model_admin):
return (
(None, 'All'),
)
def queryset(self, request, queryset): def queryset(self, request, queryset):
#logger.info('self.value(): %s' % self.value()) if self.value():
return queryset.filter(email__contains=self.value()) return queryset.filter(email__icontains=self.value())
return queryset
@admin.register(AwsSesSettings)
class AwsSesSettingsAdmin(admin.ModelAdmin):
"""Admin interface for AWS SES settings."""
model = AwsSesSettings
list_display = ('get_site', 'region_name')
list_display_links = ('get_site',)
search_fields = ('site__domain', 'region_name')
def get_site(self, obj):
"""Display the domain of the associated site."""
return obj.site.domain
get_site.short_description = 'Domain'
get_site.admin_order_field = 'site__domain'
@admin.register(AwsSesUserAddon)
class AwsSesUserAddonAdmin(admin.ModelAdmin):
"""Admin interface for user-specific AWS SES settings."""
model = AwsSesUserAddon
list_display = ('get_email', 'unsubscribe')
list_display_links = ('get_email',)
list_filter = ('unsubscribe',)
search_fields = ('user__email',)
def get_email(self, obj):
"""Display the user's email address."""
return obj.user.email
get_email.short_description = 'Email'
get_email.admin_order_field = 'user__email'
@admin.register(SESStat)
class SESStatAdmin(admin.ModelAdmin):
"""Admin interface for SES statistics."""
model = SESStat
list_display = ('date', 'delivery_attempts', 'bounces', 'complaints', 'rejects')
list_display_links = ('date',)
date_hierarchy = 'date'
ordering = ('-date',)
@admin.register(BounceRecord)
class BounceRecordAdmin(admin.ModelAdmin): class BounceRecordAdmin(admin.ModelAdmin):
"""Admin interface for bounce records."""
model = BounceRecord model = BounceRecord
list_display = ('email', 'bounce_type', 'bounce_sub_type', 'status', 'timestamp') list_display = ('email', 'bounce_type', 'bounce_sub_type', 'status', 'timestamp')
list_filter = ('email', 'bounce_type', 'bounce_sub_type', 'status', 'timestamp') list_display_links = ('email',)
list_filter = (AdminEmailListFilter, 'bounce_type', 'bounce_sub_type', 'status', 'timestamp')
search_fields = ('email', 'diagnostic_code')
date_hierarchy = 'timestamp'
admin.site.register(BounceRecord, BounceRecordAdmin)
@admin.register(ComplaintRecord)
class ComplaintRecordAdmin(admin.ModelAdmin): class ComplaintRecordAdmin(admin.ModelAdmin):
"""Admin interface for complaint records."""
model = ComplaintRecord model = ComplaintRecord
list_display = ('email', 'sub_type', 'feedback_type', 'timestamp') list_display = ('email', 'sub_type', 'feedback_type', 'timestamp')
list_filter = ('email', 'sub_type', 'feedback_type', 'timestamp') list_display_links = ('email',)
list_filter = (AdminEmailListFilter, 'sub_type', 'feedback_type', 'timestamp')
search_fields = ('email',)
date_hierarchy = 'timestamp'
admin.site.register(ComplaintRecord, ComplaintRecordAdmin)
@admin.register(SendRecord)
class SendRecordAdmin(admin.ModelAdmin): class SendRecordAdmin(admin.ModelAdmin):
"""Admin interface for send records."""
model = SendRecord model = SendRecord
list_display = ('source', 'destination', 'subject', 'timestamp', 'status') list_display = ('source', 'destination', 'subject', 'timestamp', 'status')
list_filter = ('source', 'destination', 'subject', 'timestamp', 'status') list_display_links = ('destination',)
list_filter = (AdminEmailListFilter, 'source', 'status', 'timestamp')
search_fields = ('source', 'destination', 'subject')
date_hierarchy = 'timestamp'
admin.site.register(SendRecord, SendRecordAdmin)
@admin.register(UnknownRecord)
class UnknownRecordAdmin(admin.ModelAdmin): class UnknownRecordAdmin(admin.ModelAdmin):
"""Admin interface for unknown SES event records."""
model = UnknownRecord model = UnknownRecord
list_display = ('event_type', 'aws_data') list_display = ('event_type', 'timestamp')
list_filter = ('event_type', 'aws_data') list_display_links = ('event_type',)
list_filter = ('event_type', 'timestamp')
search_fields = ('event_type', 'aws_data')
date_hierarchy = 'timestamp'
admin.site.register(UnknownRecord, UnknownRecordAdmin)
@admin.register(BlackListedDomains)
class BlackListedDomainsAdmin(admin.ModelAdmin): class BlackListedDomainsAdmin(admin.ModelAdmin):
"""Admin interface for blacklisted domains."""
model = BlackListedDomains model = BlackListedDomains
list_display = ('domain', 'timestamp') list_display = ('domain', 'timestamp')
list_filter = ('domain', 'timestamp') list_display_links = ('domain',)
list_filter = ('timestamp',)
admin.site.register(BlackListedDomains, BlackListedDomainsAdmin) search_fields = ('domain',)
date_hierarchy = 'timestamp'

View File

@ -1,154 +1,203 @@
import hashlib import hashlib
import hmac
import logging import logging
import traceback
from django.conf import settings from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.sites.models import Site
from django.db import models from django.db import models
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
from django.contrib.sites.models import Site
from django.contrib.auth import get_user_model # If used custom user model
from django.urls import reverse from django.urls import reverse
from django.utils.encoding import force_bytes, force_str from django.utils.encoding import force_bytes
from django.utils.http import urlsafe_base64_encode, urlsafe_base64_decode from django.utils.http import urlsafe_base64_encode
User = get_user_model() User = get_user_model()
logger = logging.getLogger(__name__)
class AwsSesSettings(models.Model):
site = models.OneToOneField(Site, on_delete=models.CASCADE)
access_key = models.CharField(max_length=255, blank=True, null=True,)
secret_key = models.CharField(max_length=255, blank=True, null=True,)
region_name = models.CharField(max_length=255, blank=True, null=True,)
region_endpoint = models.CharField(max_length=255, blank=True, null=True,)
class Meta:
verbose_name = 'AWS SES Settings'
@receiver(post_save, sender=Site) @receiver(post_save, sender=Site)
def update_awsses_settings(sender, instance, created, **kwargs): def update_awsses_settings(sender, instance, created, **kwargs):
"""Create or update AwsSesSettings when a Site is saved."""
try: try:
if created: if created:
AwsSesSettings.objects.create(site=instance) AwsSesSettings.objects.create(site=instance)
instance.awssessettings.save() instance.awssessettings.save()
except Exception as e: except Exception as e:
print("Exception saving site error:%s" % e) logger.error(f"Failed to save AwsSesSettings for site {instance.id}: {e}")
track = traceback.format_exc()
print("Exception saving site track: %s" % (track))
class AwsSesUserAddon(models.Model):
user = models.OneToOneField(User, related_name='aws_ses', on_delete=models.CASCADE)
unsubscribe = models.BooleanField(default=False)
class Meta:
verbose_name = 'User Data'
def get_email(self):
email_field = self.user.get_email_field_name()
email = getattr(self, email_field, '') or ''
return email
def unsubscribe_hash_generator(self):
email = self.get_email()
string_to_hash = "%s%s" % (str(self.user.pk), email)
return hashlib.md5(string_to_hash.encode()).hexdigest()
def check_unsubscribe_hash(self, hash):
test_hash = self.unsubscribe_hash_generator()
return hash == test_hash
def unsubscribe_url_generator(self):
uuid = urlsafe_base64_encode(force_bytes(self.user.pk))
hash = self.unsubscribe_hash_generator()
return reverse('django_aws_ses:aws_ses_unsubscribe', kwargs={"uuid":uuid, "hash":hash})
@receiver(post_save, sender=User) @receiver(post_save, sender=User)
def update_awsses_user(sender, instance, created, **kwargs): def update_awsses_user(sender, instance, created, **kwargs):
if created: """Create or update AwsSesUserAddon when a User is saved."""
AwsSesUserAddon.objects.create(user=instance)
try: try:
if created:
AwsSesUserAddon.objects.create(user=instance)
instance.aws_ses.save() instance.aws_ses.save()
except AwsSesUserAddon.DoesNotExist: except Exception as e:
AwsSesUserAddon.objects.create(user=instance) logger.error(f"Failed to save AwsSesUserAddon for user {instance.id}: {e}")
class AwsSesSettings(models.Model):
"""AWS SES configuration settings for a site."""
site = models.OneToOneField(Site, on_delete=models.CASCADE, related_name='awssessettings')
access_key = models.CharField(max_length=255, blank=True, null=True)
secret_key = models.CharField(max_length=255, blank=True, null=True)
region_name = models.CharField(max_length=255, blank=True, null=True)
region_endpoint = models.CharField(max_length=255, blank=True, null=True)
class Meta:
verbose_name = 'AWS SES Settings'
verbose_name_plural = 'AWS SES Settings'
def __str__(self):
return f"AWS SES Settings for {self.site.domain}"
class AwsSesUserAddon(models.Model):
"""Additional AWS SES data for a user, including unsubscribe status."""
user = models.OneToOneField(User, related_name='aws_ses', on_delete=models.CASCADE)
unsubscribe = models.BooleanField(default=False)
class Meta:
verbose_name = 'AWS SES User Addon'
verbose_name_plural = 'AWS SES User Addons'
def __str__(self):
return f"AWS SES Addon for {self.user.email}"
def get_email(self):
"""Get the user's email address."""
email_field = self.user.get_email_field_name()
return getattr(self.user, email_field, '') or ''
def unsubscribe_hash_generator(self):
"""Generate a secure hash for unsubscribe verification."""
email = self.get_email()
message = f"{self.user.pk}{email}".encode()
return hmac.new(
settings.SECRET_KEY.encode(),
message,
hashlib.sha256
).hexdigest()
def check_unsubscribe_hash(self, hash_value):
"""Verify an unsubscribe hash."""
return hmac.compare_digest(self.unsubscribe_hash_generator(), hash_value)
def unsubscribe_url_generator(self):
"""Generate a secure unsubscribe URL."""
uuid = urlsafe_base64_encode(force_bytes(str(self.user.pk)))
hash_value = self.unsubscribe_hash_generator()
return reverse('django_aws_ses:aws_ses_unsubscribe', kwargs={"uuid": uuid, "hash": hash_value})
class SESStat(models.Model): class SESStat(models.Model):
"""Daily statistics for AWS SES email sending."""
date = models.DateField(unique=True, db_index=True) date = models.DateField(unique=True, db_index=True)
delivery_attempts = models.PositiveIntegerField() delivery_attempts = models.PositiveIntegerField()
bounces = models.PositiveIntegerField() bounces = models.PositiveIntegerField()
complaints = models.PositiveIntegerField() complaints = models.PositiveIntegerField()
rejects = models.PositiveIntegerField() rejects = models.PositiveIntegerField()
class Meta: class Meta:
verbose_name = 'SES Stat' verbose_name = 'SES Statistic'
verbose_name_plural = 'SES Statistics'
ordering = ['-date'] ordering = ['-date']
def __unicode__(self): def __str__(self):
return self.date.strftime("%Y-%m-%d") return self.date.strftime("%Y-%m-%d")
class BounceRecord(models.Model): class BounceRecord(models.Model):
timestamp = models.DateTimeField(auto_now_add=True) """Record of an email bounce event from AWS SES."""
email = models.EmailField() timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
bounce_type = models.CharField(max_length=255, blank=True, null=True,) email = models.EmailField(db_index=True)
bounce_sub_type = models.CharField(max_length=255, blank=True, null=True,) bounce_type = models.CharField(max_length=255, blank=True, null=True)
reporting_mta = models.CharField(max_length=255, blank=True, null=True,) bounce_sub_type = models.CharField(max_length=255, blank=True, null=True)
status = models.CharField(max_length=255, blank=True, null=True,) reporting_mta = models.CharField(max_length=255, blank=True, null=True)
action = models.CharField(max_length=255, blank=True, null=True,) status = models.CharField(max_length=255, blank=True, null=True)
feedback_id = models.TextField(max_length=255, blank=True, null=True,) action = models.CharField(max_length=255, blank=True, null=True)
diagnostic_code = models.CharField(max_length=2048, blank=True, null=True,) feedback_id = models.TextField(blank=True, null=True)
diagnostic_code = models.CharField(max_length=2048, blank=True, null=True)
cleared = models.BooleanField(default=False) cleared = models.BooleanField(default=False)
class Meta: class Meta:
indexes = [models.Index(fields=["email"]),] verbose_name = 'Bounce Record'
verbose_name_plural = 'Bounce Records'
indexes = [models.Index(fields=['email', 'timestamp'])]
def __str__(self): def __str__(self):
return "email: %s, type: %s, sub_type: %s, status: %s, date: %s" % (self.email, self.bounce_type, self.bounce_sub_type, self.status, self.timestamp) return f"Bounce: {self.email} ({self.bounce_type}, {self.timestamp})"
class ComplaintRecord(models.Model): class ComplaintRecord(models.Model):
timestamp = models.DateTimeField(auto_now_add=True) """Record of an email complaint event from AWS SES."""
email = models.EmailField() timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
sub_type = models.CharField(max_length=255, blank=True, null=True,) email = models.EmailField(db_index=True)
feedback_id = models.TextField(max_length=255, blank=True, null=True,) sub_type = models.CharField(max_length=255, blank=True, null=True)
feedback_type = models.CharField(max_length=255, blank=True, null=True,) feedback_id = models.TextField(blank=True, null=True)
feedback_type = models.CharField(max_length=255, blank=True, null=True)
class Meta:
verbose_name = 'Complaint Record'
verbose_name_plural = 'Complaint Records'
indexes = [models.Index(fields=['email', 'timestamp'])]
def __str__(self): def __str__(self):
return "email: %s, sub_type: %s, feedback_type: %s, date: %s" % (self.email, self.bounce_sub_type, self.feedback_type, self.timestamp) return f"Complaint: {self.email} ({self.feedback_type}, {self.timestamp})"
class SendRecord(models.Model): class SendRecord(models.Model):
"""Record of an email send or delivery event from AWS SES."""
SEND = 'Send' SEND = 'Send'
DELIVERED = 'Delivery' DELIVERED = 'Delivery'
STATUS_CHOICE = ( STATUS_CHOICES = (
(SEND, SEND), (SEND, 'Send'),
(DELIVERED, DELIVERED), (DELIVERED, 'Delivery'),
) )
timestamp = models.DateTimeField(auto_now_add=True)
source = models.EmailField()
destination = models.EmailField()
subject = models.TextField(max_length=255, blank=True, null=True,)
message_id = models.TextField(max_length=255, blank=True, null=True,)
aws_process_time = models.IntegerField()
smtp_response = models.CharField(max_length=255, blank=True, null=True,)
status = models.CharField(max_length=255, blank=True, null=True,)
class Meta:
indexes = [models.Index(fields=["destination"]),]
def __str__(self):
return "source: %s, destination: %s, subject: %s, date: %s" % (self.source, self.destination, self.subject, self.timestamp)
timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
source = models.EmailField()
destination = models.EmailField(db_index=True)
subject = models.TextField(max_length=998, blank=True, null=True)
message_id = models.TextField(max_length=255, blank=True, null=True)
aws_process_time = models.IntegerField(default=0)
smtp_response = models.CharField(max_length=255, blank=True, null=True)
status = models.CharField(max_length=20, choices=STATUS_CHOICES, blank=True, null=True)
class Meta:
verbose_name = 'Send Record'
verbose_name_plural = 'Send Records'
indexes = [models.Index(fields=['destination', 'timestamp'])]
def __str__(self):
return f"Send: {self.source} to {self.destination} ({self.status}, {self.timestamp})"
class UnknownRecord(models.Model): class UnknownRecord(models.Model):
timestamp = models.DateTimeField(auto_now_add=True) """Record of unrecognized AWS SES events."""
event_type = models.CharField(max_length=255, blank=True, null=True,) timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
aws_data = models.TextField(blank=True, null=True,) event_type = models.CharField(max_length=255, blank=True, null=True)
aws_data = models.TextField(blank=True, null=True)
class Meta:
verbose_name = 'Unknown Record'
verbose_name_plural = 'Unknown Records'
indexes = [models.Index(fields=['event_type', 'timestamp'])]
def __str__(self): def __str__(self):
return "eventType: %s, timestamp: %s" % (self.eventType, self.timestamp) return f"Unknown Event: {self.event_type} ({self.timestamp})"
class BlackListedDomains(models.Model): class BlackListedDomains(models.Model):
domain = models.CharField(max_length=255, unique=True) """Domains blacklisted for email sending."""
timestamp = models.DateTimeField(auto_now_add=True) domain = models.CharField(max_length=255, unique=True, db_index=True)
timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
class Meta:
verbose_name = 'Blacklisted Domain'
verbose_name_plural = 'Blacklisted Domains'
def __str__(self): def __str__(self):
return "%s, blocked: %s" % (self.domain, self.timestamp) return f"Blacklisted: {self.domain} ({self.timestamp})"