Source code for django_sorcery.db.query

"""sqlalchemy query related things."""
from collections import namedtuple
from functools import partial

import sqlalchemy as sa
import sqlalchemy.orm  # noqa
from django.db.models.constants import LOOKUP_SEP

from ..utils import lower
from . import meta


Operation = namedtuple("Operation", ["name", "args", "kwargs"])

# todo add transforms support - e.g. column__date__gt
LOOKUP_TO_EXPRESSION = {
    "contains": lambda column, value: column.contains(value),
    # "date"
    # "day"
    "endswith": lambda column, value: column.endswith(value),
    "exact": lambda column, value: column == value,
    "gt": lambda column, value: column > value,
    "gte": lambda column, value: column >= value,
    # "hour"
    "icontains": lambda column, value: sa.func.lower(column).contains(lower(value)),
    "iendswith": lambda column, value: sa.func.lower(column).endswith(lower(value)),
    "iexact": lambda column, value: sa.func.lower(column) == lower(value),
    "iin": lambda column, value: sa.func.lower(column).in_(lower(i) for i in value),
    "in": lambda column, value: column.in_(value),
    # "iregex"
    "isnull": lambda column, value: column == None if value else column != None,  # noqa
    "istartswith": lambda column, value: sa.func.lower(column).startswith(lower(value)),
    "lt": lambda column, value: column < value,
    "lte": lambda column, value: column <= value,
    # "minute"
    # "month"
    # "quarter"
    "range": lambda column, value: column.between(*value),
    # "regex"
    # "second"
    "startswith": lambda column, value: column.startswith(value),
    # "time"
    # "week"
    # "week_day"
    # "year"
}


[docs]class Query(sa.orm.Query): """A customized sqlalchemy query."""
[docs] def get(self, *args, **kwargs): """Return an instance based on the given primary key identifier, either as args or kwargs for composite keys. If no instance is found, returns ``None``. """ if kwargs: mapper = self._only_full_mapper_zero("get") pk = meta.model_info(mapper).primary_keys_from_dict(kwargs) if pk is not None: return super().get(pk) return None return super().get(*args, **kwargs)
[docs] def order_by(self, *criterion): """Standard SQLAlchemy ordering plus django-like expressions can be provided: For example:: MyModel.objects.order_by("-id") MyModel.objects.order_by("name") """ if all(isinstance(criteria, str) for criteria in criterion): model = self._only_full_mapper_zero("get") info = meta.model_info(model) new_criterion = [] for criteria in criterion: direction = sa.asc if criteria[0] == "-": criteria = criteria[1:] direction = sa.desc elif criteria[0] == "+": criteria = criteria[1:] col_info = info.primary_keys.get(criteria) or info.properties.get(criteria) new_criterion.append(direction(col_info.attribute)) criterion = new_criterion return super().order_by(*criterion)
[docs] def filter(self, *args, **kwargs): """Standard SQLAlchemy filtering plus django-like expressions can be provided: For example:: MyModel.objects.filter(MyModel.id == 5) MyModel.objects.filter(id=5) MyModel.objects.filter(id__gte=5) MyModel.objects.filter(relation__id__gte=5) """ args = args + tuple(self._lookup_to_expression(k, v) for k, v in kwargs.items()) return super().filter(*args)
def _lookup_to_expression(self, lookup, value): parts = lookup.split(LOOKUP_SEP) info = meta.model_info(self._only_full_mapper_zero("get")) props = dict(info.column_properties) lhs = None for i, part in enumerate(parts, 1): is_last = i == len(parts) if part in LOOKUP_TO_EXPRESSION: return LOOKUP_TO_EXPRESSION[part](lhs, value) elif part in info.relationships: rel = info.relationships[part] # directly comparing to model # e.g. .filter(relation=instance) if is_last: return rel.attribute == value lhs = rel.related_model props = dict(meta.model_info(lhs).column_properties) elif part in info.composites: comp = info.composites[part] props = comp.properties # directly comparing to composite # e.g. .filter(composite=instance) if is_last: return comp.attribute == value else: lhs = props[part].attribute return lhs == value
[docs]class QueryProperty: """A property class that returns a session scoped query object against the class when called. Used by the ``SQLAlchemy.queryproperty`` For example:: >>> class MyView(object): ... queryset = db.queryproperty(FooModel) You can even pass default filtering criteria if needed:: >>> class MyView(object): ... queryset = db.queryproperty(FooModel, to_be_deleted=False) In addition this pattern can be used to implement Django's ORM style model managers:: >>> class UserModel(db.Model): ... id = db.Column(db.Integer(), primary_key=True) ... username = db.Column(db.String()) ... is_active = db.Column(db.Boolean()) ... ... active = db.queryproperty(is_active=True) That can be used directly:: >>> UserModel.metadata.create_all(bind=db.engine) >>> db.add_all([ ... UserModel(id=1, username='foo', is_active=False), ... UserModel(id=2, username='bar', is_active=True), ... ]) >>> db.flush() >>> UserModel.objects.all() [UserModel(id=1, is_active=False, username='foo'), UserModel(id=2, is_active=True, username='bar')] >>> UserModel.active.all() [UserModel(id=2, is_active=True, username='bar')] This pattern is very useful when combined with Django style views:: >>> class MyView(object): ... queryset = UserModel.active >>> MyView().queryset.all() [UserModel(id=2, is_active=True, username='bar')] Additional filters/options can be applied as well:: >>> class MyView(object): ... queryset = UserModel.active.filter(UserModel.username == 'test') >>> MyView().queryset.all() [] """ def __init__(self, db, model=None, *args, **kwargs): self.db = db self.model = model self.ops = [] if args: self.ops.append(Operation("filter", args, {})) if kwargs: self.ops.append(Operation("filter_by", (), kwargs)) # sanity checks if self.model: assert isinstance(self.model, type) and issubclass( self.model, self.db.Model ), "{!r} is not SQLAlchemy model subclassing {!r}".format(model, self.db.Model) def __repr__(self): return "<{} db={!r}, model={!r}>".format(self.__class__.__name__, self.db, self.model.__name__) def _with_op(self, name, *args, **kwargs): prop = type(self)(self.db, self.model) prop.ops += self.ops prop.ops.append(Operation(name, args, kwargs)) return prop def __getattr__(self, item): if not hasattr(getattr(self.model, "query_class", Query), item): raise AttributeError("{!r} object has no attribute {!r}".format(self, item)) return partial(self._with_op, item) def __get__(self, instance, owner): model = self.model or (owner if issubclass(owner, self.db.Model) else None) if not model: raise AttributeError( "Cannot access {} when not bound to a model. " "You can explicitly instantiate descriptor with model class - `db.queryproperty(Model)`." "".format(self.__class__.__name__) ) try: mapper = sa.orm.class_mapper(model) except sa.orm.exc.UnmappedClassError: # when subclass references unmapped base class descriptor return query_class = getattr(model, "query_class", None) or self.db.query_class return self._apply_ops(query_class(mapper, session=self.db)) def _apply_ops(self, query): for op in self.ops: query = getattr(query, op.name)(*op.args, **op.kwargs) return query