Source code for django_sorcery.formsets.base
"""Helper functions for creating FormSet classes from SQLAlchemy models."""
from django.core.exceptions import ImproperlyConfigured
from django.forms import fields as djangofields
from django.forms.formsets import BaseFormSet, formset_factory
from django.forms.widgets import HiddenInput
from ..db import meta
from ..forms import ModelForm, modelform_factory
class BaseModelFormSet(BaseFormSet):
    """A ``FormSet`` for editing a queryset and/or adding new objects to it.

    The first ``initial_form_count()`` forms are bound to existing model
    instances taken from :meth:`get_queryset`; the remaining ("extra") forms
    create new instances.  Every form is constructed with the formset's
    ``session`` passed as a keyword argument.
    """

    # SQLAlchemy model class edited by this formset.
    model = None
    # Session used to query, track and delete instances; may also be supplied
    # per-instance through the ``session`` keyword argument of ``__init__``.
    session = None
    # Upper bound applied to the default query when no ``queryset`` is given.
    absolute_max = 2000

    # Set of fields that must be unique among forms of this set.
    unique_fields = set()

    def __init__(self, data=None, files=None, auto_id="id_%s", prefix=None, queryset=None, initial=None, **kwargs):
        """Initialise the formset.

        Args:
            data: Submitted form data, forwarded to ``BaseFormSet``.
            files: Submitted files, forwarded to ``BaseFormSet``.
            auto_id: Forwarded to ``BaseFormSet``.
            prefix: Forwarded to ``BaseFormSet``.
            queryset: Optional query (or sequence) of instances to edit.  When
                omitted, :meth:`get_queryset` queries ``self.model``.
            initial: Initial data used for the *extra* forms only.
            **kwargs: Remaining ``BaseFormSet`` arguments.  A ``session`` key,
                if present, overrides the class-level ``session``.
        """
        self.session = kwargs.pop("session", self.session)
        self.queryset = queryset
        self.initial_extra = initial
        kwargs.update({"data": data, "files": files, "auto_id": auto_id, "prefix": prefix})
        super().__init__(**kwargs)

    def initial_form_count(self):
        """Return the number of forms that are required in this FormSet.

        When data or files were submitted the count is delegated to the parent
        class; otherwise it is the number of objects in :meth:`get_queryset`.
        """
        return super().initial_form_count() if (self.data or self.files) else len(self.get_queryset())

    def _existing_object(self, pk):
        """Return the queryset instance identified by ``pk``, or ``None``.

        ``pk`` may be a scalar or a tuple; scalars are wrapped in a 1-tuple
        before the lookup.  The key -> instance mapping is built once from
        :meth:`get_queryset` and cached on ``self._object_dict``.
        """
        info = meta.model_info(self.model)
        if not isinstance(pk, tuple):
            pk = (pk,)
        if not hasattr(self, "_object_dict"):
            self._object_dict = {info.get_key(o): o for o in self.get_queryset()}
        return self._object_dict.get(pk)

    def _construct_form(self, i, **kwargs):
        """Build form number ``i``, attaching an instance or initial data.

        For initial forms of a bound formset the primary key values are read
        from the submitted data (``<form prefix>-<pk name>``) and used to look
        up the instance to edit.  For initial forms of an unbound formset the
        instance is taken from the queryset by position.  Extra forms receive
        the matching entry of ``initial_extra`` when one exists.
        """
        pk_required = i < self.initial_form_count()
        if pk_required:
            if self.is_bound:
                info = meta.model_info(self.model)
                pks = {}
                for name, pk_info in info.primary_keys.items():
                    pk_key = f"{self.add_prefix(i)}-{name}"
                    pk_val = self.data.get(pk_key)
                    value = None
                    if pk_val:
                        to_python = pk_info.column.type.python_type
                        try:
                            value = to_python(pk_val)
                        except (ArithmeticError, TypeError, ValueError):
                            # The submitted key cannot be converted to the
                            # column's Python type (the user may have tampered
                            # with the POST data).  Treat it like a missing
                            # key instead of failing while building the form.
                            value = None
                    pks[name] = value
                pk = info.primary_keys_from_dict(pks)
                kwargs["instance"] = self._existing_object(pk)
            else:
                kwargs["instance"] = self.get_queryset()[i]
        elif self.initial_extra:
            # Set initial values for extra forms
            try:
                kwargs["initial"] = self.initial_extra[i - self.initial_form_count()]
            except IndexError:
                # Fewer initial entries than extra forms: leave this one blank.
                pass
        kwargs["session"] = self.session
        return super()._construct_form(i, **kwargs)

    def add_fields(self, form, index):
        """Add hidden primary key fields to forms whose instance is in the session.

        One hidden ``Field`` per primary key attribute is added, initialised
        from the instance, before the parent class adds its own fields.
        """
        info = meta.model_info(self.model)
        if form.instance in self.session:
            for name in info.primary_keys:
                pk_field = djangofields.Field(initial=getattr(form.instance, name, None), widget=HiddenInput)
                form.fields[name] = pk_field
        super().add_fields(form, index)

    def get_queryset(self):
        """Returns a query for the model.

        The result is computed once and cached on ``self._queryset``.  The
        ``queryset`` given to ``__init__`` is used when present, otherwise
        ``self.model`` is queried through the session and sliced to
        ``absolute_max``.  If the result exposes ``all()`` it is called and its
        return value is what gets cached.
        """
        if not hasattr(self, "_queryset"):
            if self.queryset is not None:
                qs = self.queryset
            else:
                qs = self.session.query(self.model)[: self.absolute_max]
            if hasattr(qs, "all"):
                qs = qs.all()
            self._queryset = qs
        return self._queryset

    def delete_existing(self, obj, **kwargs):
        """Deletes an existing model instance."""
        self.session.delete(obj)

    def save(self, flush=True, **kwargs):
        """Save model instances for every form, adding and changing instances
        as necessary, and return the list of instances.

        Populates ``new_objects``, ``changed_objects`` and ``deleted_objects``
        as a side effect.  ``flush`` is forwarded to each form's ``save``.
        Deleted instances are not included in the returned list.
        """
        self.new_objects = []
        self.changed_objects = []
        self.deleted_objects = []
        saved_instances = []

        # Extra forms create new objects; untouched ones and ones marked for
        # deletion are skipped.
        for form in self.extra_forms:
            if not form.has_changed():
                continue
            if self.can_delete and self._should_delete_form(form):
                continue
            obj = form.save(flush=flush)
            self.new_objects.append(obj)
            saved_instances.append(obj)

        # Looked up once up front instead of on every iteration of the loop.
        forms_to_delete = self.deleted_forms
        for form in self.initial_forms:
            if form in forms_to_delete:
                self.deleted_objects.append(form.instance)
                self.delete_existing(form.instance)
            elif form.has_changed():
                self.changed_objects.append(form.instance)
                obj = form.save(flush=flush)
                saved_instances.append(obj)
        return saved_instances

    save.alters_data = True
def modelformset_factory(
    model,
    form=ModelForm,
    formfield_callback=None,
    formset=BaseModelFormSet,
    extra=1,
    can_delete=False,
    can_order=False,
    max_num=None,
    fields=None,
    exclude=None,
    widgets=None,
    validate_max=False,
    localized_fields=None,
    labels=None,
    help_texts=None,
    error_messages=None,
    min_num=None,
    validate_min=False,
    field_classes=None,
    session=None,
):
    """Return a FormSet class for the given sqlalchemy model class.

    A model form class is first built with ``modelform_factory`` and then
    wrapped with Django's ``formset_factory``.  The returned class is a
    subclass of that formset named ``<Model>FormSet`` with ``model`` and
    ``session`` set as class attributes.

    Args:
        model: The model class the forms edit.
        form: Base form class handed to ``modelform_factory``.
        formset: Base formset class handed to ``formset_factory``.
        fields, exclude: Field selection.  An explicitly passed value wins;
            when left as ``None`` the corresponding attribute of
            ``form.Meta`` (if any) is used instead.
        formfield_callback, widgets, localized_fields, labels, help_texts,
        error_messages, field_classes, session: Forwarded unchanged to
            ``modelform_factory``.
        extra, min_num, max_num, can_order, can_delete, validate_min,
        validate_max: Forwarded unchanged to ``formset_factory``.
        session: Also stored on the returned formset class.

    Raises:
        ImproperlyConfigured: If neither ``fields`` nor ``exclude`` is given,
            either as an argument or on ``form.Meta``.
    """
    _meta = getattr(form, "Meta", None)
    # Explicit arguments take precedence; the base form's Meta is only used as
    # a fallback so that a caller-supplied value is never silently discarded.
    if fields is None:
        fields = getattr(_meta, "fields", None)
    if exclude is None:
        exclude = getattr(_meta, "exclude", None)
    if fields is None and exclude is None:
        raise ImproperlyConfigured(
            "Calling modelformset_factory without defining 'fields' or " "'exclude' explicitly is prohibited."
        )

    form = modelform_factory(
        model,
        form=form,
        fields=fields,
        exclude=exclude,
        formfield_callback=formfield_callback,
        widgets=widgets,
        localized_fields=localized_fields,
        labels=labels,
        help_texts=help_texts,
        error_messages=error_messages,
        field_classes=field_classes,
        session=session,
    )
    FormSet = formset_factory(
        form,
        formset,
        extra=extra,
        min_num=min_num,
        max_num=max_num,
        can_order=can_order,
        can_delete=can_delete,
        validate_min=validate_min,
        validate_max=validate_max,
    )
    class_name = f"{model.__name__}FormSet"
    # NOTE(review): the subclass is created through the generated form's
    # metaclass (``type(form)``) rather than plain ``type`` -- kept as-is;
    # confirm this is intentional.
    return type(form)(class_name, (FormSet,), {"model": model, "session": session})