Source code for django_sorcery.db.middleware

"""Django middleware support for sqlalchemy."""
import logging

from . import databases
from .signals import all_signals


before_middleware_request = all_signals.signal("before_middleware_request")
after_middleware_response = all_signals.signal("after_middleware_response")

logger = logging.getLogger(__name__)


[docs]class BaseMiddleware: """Base middleware implementation that supports unit of work per request for django.""" logger = logger def __init__(self, get_response=None): self.get_response = get_response def __call__(self, request): response = self.process_request(request) if response is not None: return self.return_response(request, response) response = self.get_response(request) return self.process_response(request, response)
[docs] def process_request(self, request): """Hook for adding arbitrary logic to request processing.""" before_middleware_request.send(self.__class__, middleware=self, request=request)
[docs] def process_response(self, request, response): """Commits or rollbacks scoped sessions depending on status code then removes them.""" if response.status_code >= 400: self.rollback(request=request, response=response) return self.return_response(request, response) if request.method not in {"PUT", "POST", "PATCH", "GET", "DELETE"}: self.rollback(request=request, response=response) return self.return_response(request, response) try: self.flush(request=request, response=response) self.commit(request=request, response=response) except Exception: self.logger.error("Error during flush or commit") self.rollback(request=request, response=response) self.return_response(request, response) raise return self.return_response(request, response)
[docs] def return_response(self, request, response): """Hook for adding arbitrary logic to response processing.""" self.remove(request=request, response=response) after_middleware_response.send(self.__class__, middleware=self, request=request, response=response) return response
[docs]class SQLAlchemyDBMiddleware(BaseMiddleware): """A base SQLAlchemy db middleware. Used by SQLAlchemy to provide a default middleware for a single db, it will first try to flush and if successfull, proceed with commit. If there are any errors during flush, will issue a rollback. """ db = None
[docs] def rollback(self, request, response): """Rolls back current scoped session.""" self.db.rollback()
[docs] def flush(self, request, response): """Flushes current scoped session.""" self.db.flush()
[docs] def commit(self, request, response): """Commits current scoped session.""" self.db.commit()
[docs] def remove(self, request, response): """Removes current scoped session.""" self.db.remove()
[docs]class SQLAlchemyMiddleware(SQLAlchemyDBMiddleware): """A sqlalchemy middleware that manages all the dbs configured and initialized. it will first try to flush all the configured and initialized SQLAlchemy instances and if successfull, proceed with commit. If there are any errors during flush, all transactions will be rolled back. """ db = databases