123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- from django.contrib.gis.db.backends.base.adapter import WKTAdapter
- from django.contrib.gis.db.backends.base.operations import (
- BaseSpatialOperations,
- )
- from django.contrib.gis.db.backends.utils import SpatialOperator
- from django.contrib.gis.db.models import aggregates
- from django.contrib.gis.geos.geometry import GEOSGeometryBase
- from django.contrib.gis.geos.prototypes.io import wkb_r
- from django.contrib.gis.measure import Distance
- from django.db.backends.mysql.operations import DatabaseOperations
- from django.utils.functional import cached_property
-
-
class MySQLOperations(BaseSpatialOperations, DatabaseOperations):
    """Spatial operations for the MySQL backend (also consulted for MariaDB).

    Several members branch on ``self.connection.mysql_is_mariadb`` and
    ``self.connection.mysql_version`` to pick function names and to decide
    which spatial functions are reported as unsupported.
    """

    mysql = True
    name = 'mysql'
    # Prefix prepended to the spatial function names built below.
    geom_func_prefix = 'ST_'

    Adapter = WKTAdapter

    @cached_property
    def select(self):
        """Return the ``<prefix>AsBinary(%s)`` template string."""
        prefix = self.geom_func_prefix
        return prefix + 'AsBinary(%s)'

    @cached_property
    def from_text(self):
        """Return the ``<prefix>GeomFromText`` function name."""
        prefix = self.geom_func_prefix
        return prefix + 'GeomFromText'

    @cached_property
    def gis_operators(self):
        """Map each supported lookup name to a ``SpatialOperator``.

        The equality function is named ``MBREqual`` on MariaDB and on
        versions below 5.7.6, and ``MBREquals`` otherwise.
        """
        conn = self.connection
        if conn.mysql_is_mariadb or conn.mysql_version < (5, 7, 6):
            mbr_equals = 'MBREqual'
        else:
            mbr_equals = 'MBREquals'

        lookup_funcs = (
            # The first three lookups exist for consistency w/PostGIS API.
            ('bbcontains', 'MBRContains'),
            ('bboverlaps', 'MBROverlaps'),
            ('contained', 'MBRWithin'),
            ('contains', 'MBRContains'),
            ('disjoint', 'MBRDisjoint'),
            ('equals', mbr_equals),
            ('exact', mbr_equals),
            ('intersects', 'MBRIntersects'),
            ('overlaps', 'MBROverlaps'),
            ('same_as', mbr_equals),
            ('touches', 'MBRTouches'),
            ('within', 'MBRWithin'),
        )
        # A separate SpatialOperator instance is created for every lookup.
        return {
            lookup: SpatialOperator(func=func_name)
            for lookup, func_name in lookup_funcs
        }

    disallowed_aggregates = (
        aggregates.Collect,
        aggregates.Extent,
        aggregates.Extent3D,
        aggregates.MakeLine,
        aggregates.Union,
    )

    @cached_property
    def unsupported_functions(self):
        """Return the set of spatial function names reported as unsupported.

        A fixed base set is extended depending on whether the connection is
        MariaDB and on the connection's version tuple.
        """
        always_unsupported = {
            'AsGML', 'AsKML', 'AsSVG', 'Azimuth', 'BoundingCircle',
            'ForcePolygonCW', 'ForceRHR', 'LineLocatePoint', 'MakeValid',
            'MemSize', 'Perimeter', 'PointOnSurface', 'Reverse', 'Scale',
            'SnapToGrid', 'Transform', 'Translate',
        }
        conn = self.connection
        if conn.mysql_is_mariadb:
            backend_specific = {'GeoHash', 'IsValid'}
            if conn.mysql_version < (10, 2, 4):
                backend_specific.add('AsGeoJSON')
        elif conn.mysql_version < (5, 7, 5):
            backend_specific = {'AsGeoJSON', 'GeoHash', 'IsValid'}
        else:
            backend_specific = set()
        return always_unsupported | backend_specific

    def geo_db_type(self, f):
        """Return the column type for field ``f``, i.e. its ``geom_type``."""
        return f.geom_type

    def get_distance(self, f, value, lookup_type):
        """Return a one-element list holding the distance query parameter.

        Only the first item of ``value`` is used. A ``Distance`` instance is
        converted to the attribute named after the field's units; any other
        value is passed through unchanged.

        Raises:
            ValueError: if a ``Distance`` is supplied and
                ``f.geodetic(self.connection)`` is true.
        """
        distance = value[0]
        if not isinstance(distance, Distance):
            return [distance]
        if f.geodetic(self.connection):
            raise ValueError(
                'Only numeric values of degree units are allowed on '
                'geodetic distance queries.'
            )
        unit_attr = Distance.unit_attname(f.units_name(self.connection))
        return [getattr(distance, unit_attr)]

    def get_geometry_converter(self, expression):
        """Build a converter turning a fetched value into a geometry object.

        The returned callable takes ``(value, expression, connection)`` and
        returns ``None`` for a ``None`` value. Otherwise it returns a
        ``GEOSGeometryBase`` read from the value, with the output field's
        SRID applied when that SRID is truthy.
        """
        read = wkb_r().read
        output_field = expression.output_field
        srid = output_field.srid
        # An SRID of -1 is treated the same as having no SRID.
        srid = None if srid == -1 else srid
        geom_class = output_field.geom_class

        def converter(value, expression, connection):
            if value is None:
                return None
            geom = GEOSGeometryBase(read(memoryview(value)), geom_class)
            if srid:
                geom.srid = srid
            return geom

        return converter
|