123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- import json
-
- from psycopg2.extras import Json
-
- from django.contrib.postgres import forms, lookups
- from django.core import exceptions
- from django.db.models import (
- Field, TextField, Transform, lookups as builtin_lookups,
- )
- from django.utils.translation import gettext_lazy as _
-
- from .mixins import CheckFieldDefaultMixin
-
- __all__ = ['JSONField']
-
-
- class JsonAdapter(Json):
- """
- Customized psycopg2.extras.Json to allow for a custom encoder.
- """
- def __init__(self, adapted, dumps=None, encoder=None):
- self.encoder = encoder
- super().__init__(adapted, dumps=dumps)
-
- def dumps(self, obj):
- options = {'cls': self.encoder} if self.encoder else {}
- return json.dumps(obj, **options)
-
-
- class JSONField(CheckFieldDefaultMixin, Field):
- empty_strings_allowed = False
- description = _('A JSON object')
- default_error_messages = {
- 'invalid': _("Value must be valid JSON."),
- }
- _default_hint = ('dict', '{}')
-
- def __init__(self, verbose_name=None, name=None, encoder=None, **kwargs):
- if encoder and not callable(encoder):
- raise ValueError("The encoder parameter must be a callable object.")
- self.encoder = encoder
- super().__init__(verbose_name, name, **kwargs)
-
- def db_type(self, connection):
- return 'jsonb'
-
- def deconstruct(self):
- name, path, args, kwargs = super().deconstruct()
- if self.encoder is not None:
- kwargs['encoder'] = self.encoder
- return name, path, args, kwargs
-
- def get_transform(self, name):
- transform = super().get_transform(name)
- if transform:
- return transform
- return KeyTransformFactory(name)
-
- def get_prep_value(self, value):
- if value is not None:
- return JsonAdapter(value, encoder=self.encoder)
- return value
-
- def validate(self, value, model_instance):
- super().validate(value, model_instance)
- options = {'cls': self.encoder} if self.encoder else {}
- try:
- json.dumps(value, **options)
- except TypeError:
- raise exceptions.ValidationError(
- self.error_messages['invalid'],
- code='invalid',
- params={'value': value},
- )
-
- def value_to_string(self, obj):
- return self.value_from_object(obj)
-
- def formfield(self, **kwargs):
- return super().formfield(**{
- 'form_class': forms.JSONField,
- **kwargs,
- })
-
-
- JSONField.register_lookup(lookups.DataContains)
- JSONField.register_lookup(lookups.ContainedBy)
- JSONField.register_lookup(lookups.HasKey)
- JSONField.register_lookup(lookups.HasKeys)
- JSONField.register_lookup(lookups.HasAnyKeys)
- JSONField.register_lookup(lookups.JSONExact)
-
-
- class KeyTransform(Transform):
- operator = '->'
- nested_operator = '#>'
-
- def __init__(self, key_name, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.key_name = key_name
-
- def as_sql(self, compiler, connection):
- key_transforms = [self.key_name]
- previous = self.lhs
- while isinstance(previous, KeyTransform):
- key_transforms.insert(0, previous.key_name)
- previous = previous.lhs
- lhs, params = compiler.compile(previous)
- if len(key_transforms) > 1:
- return "(%s %s %%s)" % (lhs, self.nested_operator), [key_transforms] + params
- try:
- lookup = int(self.key_name)
- except ValueError:
- lookup = self.key_name
- return '(%s %s %%s)' % (lhs, self.operator), tuple(params) + (lookup,)
-
-
- class KeyTextTransform(KeyTransform):
- operator = '->>'
- nested_operator = '#>>'
- output_field = TextField()
-
-
- class KeyTransformTextLookupMixin:
- """
- Mixin for combining with a lookup expecting a text lhs from a JSONField
- key lookup. Make use of the ->> operator instead of casting key values to
- text and performing the lookup on the resulting representation.
- """
- def __init__(self, key_transform, *args, **kwargs):
- assert isinstance(key_transform, KeyTransform)
- key_text_transform = KeyTextTransform(
- key_transform.key_name, *key_transform.source_expressions, **key_transform.extra
- )
- super().__init__(key_text_transform, *args, **kwargs)
-
-
- class KeyTransformIExact(KeyTransformTextLookupMixin, builtin_lookups.IExact):
- pass
-
-
- class KeyTransformIContains(KeyTransformTextLookupMixin, builtin_lookups.IContains):
- pass
-
-
- class KeyTransformStartsWith(KeyTransformTextLookupMixin, builtin_lookups.StartsWith):
- pass
-
-
- class KeyTransformIStartsWith(KeyTransformTextLookupMixin, builtin_lookups.IStartsWith):
- pass
-
-
- class KeyTransformEndsWith(KeyTransformTextLookupMixin, builtin_lookups.EndsWith):
- pass
-
-
- class KeyTransformIEndsWith(KeyTransformTextLookupMixin, builtin_lookups.IEndsWith):
- pass
-
-
- class KeyTransformRegex(KeyTransformTextLookupMixin, builtin_lookups.Regex):
- pass
-
-
- class KeyTransformIRegex(KeyTransformTextLookupMixin, builtin_lookups.IRegex):
- pass
-
-
- KeyTransform.register_lookup(KeyTransformIExact)
- KeyTransform.register_lookup(KeyTransformIContains)
- KeyTransform.register_lookup(KeyTransformStartsWith)
- KeyTransform.register_lookup(KeyTransformIStartsWith)
- KeyTransform.register_lookup(KeyTransformEndsWith)
- KeyTransform.register_lookup(KeyTransformIEndsWith)
- KeyTransform.register_lookup(KeyTransformRegex)
- KeyTransform.register_lookup(KeyTransformIRegex)
-
-
- class KeyTransformFactory:
-
- def __init__(self, key_name):
- self.key_name = key_name
-
- def __call__(self, *args, **kwargs):
- return KeyTransform(self.key_name, *args, **kwargs)
|