|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- import threading
- from ctypes import POINTER, Structure, byref, c_byte, c_char_p, c_int, c_size_t
-
- from django.contrib.gis.geos.base import GEOSBase
- from django.contrib.gis.geos.libgeos import (
- GEOM_PTR, GEOSFuncFactory, geos_version_tuple,
- )
- from django.contrib.gis.geos.prototypes.errcheck import (
- check_geom, check_sized_string, check_string,
- )
- from django.contrib.gis.geos.prototypes.geom import c_uchar_p, geos_char_p
- from django.utils.encoding import force_bytes
-
-
- # ### The WKB/WKT Reader/Writer structures and pointers ###
- class WKTReader_st(Structure):
- pass
-
-
- class WKTWriter_st(Structure):
- pass
-
-
- class WKBReader_st(Structure):
- pass
-
-
- class WKBWriter_st(Structure):
- pass
-
-
- WKT_READ_PTR = POINTER(WKTReader_st)
- WKT_WRITE_PTR = POINTER(WKTWriter_st)
- WKB_READ_PTR = POINTER(WKBReader_st)
- WKB_WRITE_PTR = POINTER(WKBReader_st)
-
- # WKTReader routines
- wkt_reader_create = GEOSFuncFactory('GEOSWKTReader_create', restype=WKT_READ_PTR)
- wkt_reader_destroy = GEOSFuncFactory('GEOSWKTReader_destroy', argtypes=[WKT_READ_PTR])
-
- wkt_reader_read = GEOSFuncFactory(
- 'GEOSWKTReader_read', argtypes=[WKT_READ_PTR, c_char_p], restype=GEOM_PTR, errcheck=check_geom
- )
- # WKTWriter routines
- wkt_writer_create = GEOSFuncFactory('GEOSWKTWriter_create', restype=WKT_WRITE_PTR)
- wkt_writer_destroy = GEOSFuncFactory('GEOSWKTWriter_destroy', argtypes=[WKT_WRITE_PTR])
-
- wkt_writer_write = GEOSFuncFactory(
- 'GEOSWKTWriter_write', argtypes=[WKT_WRITE_PTR, GEOM_PTR], restype=geos_char_p, errcheck=check_string
- )
-
- wkt_writer_get_outdim = GEOSFuncFactory(
- 'GEOSWKTWriter_getOutputDimension', argtypes=[WKT_WRITE_PTR], restype=c_int
- )
- wkt_writer_set_outdim = GEOSFuncFactory(
- 'GEOSWKTWriter_setOutputDimension', argtypes=[WKT_WRITE_PTR, c_int]
- )
-
- wkt_writer_set_trim = GEOSFuncFactory('GEOSWKTWriter_setTrim', argtypes=[WKT_WRITE_PTR, c_byte])
- wkt_writer_set_precision = GEOSFuncFactory('GEOSWKTWriter_setRoundingPrecision', argtypes=[WKT_WRITE_PTR, c_int])
-
- # WKBReader routines
- wkb_reader_create = GEOSFuncFactory('GEOSWKBReader_create', restype=WKB_READ_PTR)
- wkb_reader_destroy = GEOSFuncFactory('GEOSWKBReader_destroy', argtypes=[WKB_READ_PTR])
-
-
- class WKBReadFunc(GEOSFuncFactory):
- # Although the function definitions take `const unsigned char *`
- # as their parameter, we use c_char_p here so the function may
- # take Python strings directly as parameters. Inside Python there
- # is not a difference between signed and unsigned characters, so
- # it is not a problem.
- argtypes = [WKB_READ_PTR, c_char_p, c_size_t]
- restype = GEOM_PTR
- errcheck = staticmethod(check_geom)
-
-
- wkb_reader_read = WKBReadFunc('GEOSWKBReader_read')
- wkb_reader_read_hex = WKBReadFunc('GEOSWKBReader_readHEX')
-
- # WKBWriter routines
- wkb_writer_create = GEOSFuncFactory('GEOSWKBWriter_create', restype=WKB_WRITE_PTR)
- wkb_writer_destroy = GEOSFuncFactory('GEOSWKBWriter_destroy', argtypes=[WKB_WRITE_PTR])
-
-
- # WKB Writing prototypes.
- class WKBWriteFunc(GEOSFuncFactory):
- argtypes = [WKB_WRITE_PTR, GEOM_PTR, POINTER(c_size_t)]
- restype = c_uchar_p
- errcheck = staticmethod(check_sized_string)
-
-
- wkb_writer_write = WKBWriteFunc('GEOSWKBWriter_write')
- wkb_writer_write_hex = WKBWriteFunc('GEOSWKBWriter_writeHEX')
-
-
- # WKBWriter property getter/setter prototypes.
- class WKBWriterGet(GEOSFuncFactory):
- argtypes = [WKB_WRITE_PTR]
- restype = c_int
-
-
- class WKBWriterSet(GEOSFuncFactory):
- argtypes = [WKB_WRITE_PTR, c_int]
-
-
- wkb_writer_get_byteorder = WKBWriterGet('GEOSWKBWriter_getByteOrder')
- wkb_writer_set_byteorder = WKBWriterSet('GEOSWKBWriter_setByteOrder')
- wkb_writer_get_outdim = WKBWriterGet('GEOSWKBWriter_getOutputDimension')
- wkb_writer_set_outdim = WKBWriterSet('GEOSWKBWriter_setOutputDimension')
- wkb_writer_get_include_srid = WKBWriterGet('GEOSWKBWriter_getIncludeSRID', restype=c_byte)
- wkb_writer_set_include_srid = WKBWriterSet('GEOSWKBWriter_setIncludeSRID', argtypes=[WKB_WRITE_PTR, c_byte])
-
-
- # ### Base I/O Class ###
- class IOBase(GEOSBase):
- "Base class for GEOS I/O objects."
- def __init__(self):
- # Getting the pointer with the constructor.
- self.ptr = self._constructor()
- # Loading the real destructor function at this point as doing it in
- # __del__ is too late (import error).
- self.destructor.func
-
- # ### Base WKB/WKT Reading and Writing objects ###
-
-
- # Non-public WKB/WKT reader classes for internal use because
- # their `read` methods return _pointers_ instead of GEOSGeometry
- # objects.
- class _WKTReader(IOBase):
- _constructor = wkt_reader_create
- ptr_type = WKT_READ_PTR
- destructor = wkt_reader_destroy
-
- def read(self, wkt):
- if not isinstance(wkt, (bytes, str)):
- raise TypeError
- return wkt_reader_read(self.ptr, force_bytes(wkt))
-
-
- class _WKBReader(IOBase):
- _constructor = wkb_reader_create
- ptr_type = WKB_READ_PTR
- destructor = wkb_reader_destroy
-
- def read(self, wkb):
- "Return a _pointer_ to C GEOS Geometry object from the given WKB."
- if isinstance(wkb, memoryview):
- wkb_s = bytes(wkb)
- return wkb_reader_read(self.ptr, wkb_s, len(wkb_s))
- elif isinstance(wkb, (bytes, str)):
- return wkb_reader_read_hex(self.ptr, wkb, len(wkb))
- else:
- raise TypeError
-
-
- # ### WKB/WKT Writer Classes ###
- class WKTWriter(IOBase):
- _constructor = wkt_writer_create
- ptr_type = WKT_WRITE_PTR
- destructor = wkt_writer_destroy
-
- _trim = False
- _precision = None
-
- def __init__(self, dim=2, trim=False, precision=None):
- super().__init__()
- if bool(trim) != self._trim:
- self.trim = trim
- if precision is not None:
- self.precision = precision
- self.outdim = dim
-
- def write(self, geom):
- "Return the WKT representation of the given geometry."
- return wkt_writer_write(self.ptr, geom.ptr)
-
- @property
- def outdim(self):
- return wkt_writer_get_outdim(self.ptr)
-
- @outdim.setter
- def outdim(self, new_dim):
- if new_dim not in (2, 3):
- raise ValueError('WKT output dimension must be 2 or 3')
- wkt_writer_set_outdim(self.ptr, new_dim)
-
- @property
- def trim(self):
- return self._trim
-
- @trim.setter
- def trim(self, flag):
- if bool(flag) != self._trim:
- self._trim = bool(flag)
- wkt_writer_set_trim(self.ptr, self._trim)
-
- @property
- def precision(self):
- return self._precision
-
- @precision.setter
- def precision(self, precision):
- if (not isinstance(precision, int) or precision < 0) and precision is not None:
- raise AttributeError('WKT output rounding precision must be non-negative integer or None.')
- if precision != self._precision:
- self._precision = precision
- wkt_writer_set_precision(self.ptr, -1 if precision is None else precision)
-
-
- class WKBWriter(IOBase):
- _constructor = wkb_writer_create
- ptr_type = WKB_WRITE_PTR
- destructor = wkb_writer_destroy
- geos_version = geos_version_tuple()
-
- def __init__(self, dim=2):
- super().__init__()
- self.outdim = dim
-
- def _handle_empty_point(self, geom):
- from django.contrib.gis.geos import Point
- if isinstance(geom, Point) and geom.empty:
- if self.srid:
- # PostGIS uses POINT(NaN NaN) for WKB representation of empty
- # points. Use it for EWKB as it's a PostGIS specific format.
- # https://trac.osgeo.org/postgis/ticket/3181
- geom = Point(float('NaN'), float('NaN'), srid=geom.srid)
- else:
- raise ValueError('Empty point is not representable in WKB.')
- return geom
-
- def write(self, geom):
- "Return the WKB representation of the given geometry."
- from django.contrib.gis.geos import Polygon
- geom = self._handle_empty_point(geom)
- wkb = wkb_writer_write(self.ptr, geom.ptr, byref(c_size_t()))
- if self.geos_version < (3, 6, 1) and isinstance(geom, Polygon) and geom.empty:
- # Fix GEOS output for empty polygon.
- # See https://trac.osgeo.org/geos/ticket/680.
- wkb = wkb[:-8] + b'\0' * 4
- return memoryview(wkb)
-
- def write_hex(self, geom):
- "Return the HEXEWKB representation of the given geometry."
- from django.contrib.gis.geos.polygon import Polygon
- geom = self._handle_empty_point(geom)
- wkb = wkb_writer_write_hex(self.ptr, geom.ptr, byref(c_size_t()))
- if self.geos_version < (3, 6, 1) and isinstance(geom, Polygon) and geom.empty:
- wkb = wkb[:-16] + b'0' * 8
- return wkb
-
- # ### WKBWriter Properties ###
-
- # Property for getting/setting the byteorder.
- def _get_byteorder(self):
- return wkb_writer_get_byteorder(self.ptr)
-
- def _set_byteorder(self, order):
- if order not in (0, 1):
- raise ValueError('Byte order parameter must be 0 (Big Endian) or 1 (Little Endian).')
- wkb_writer_set_byteorder(self.ptr, order)
-
- byteorder = property(_get_byteorder, _set_byteorder)
-
- # Property for getting/setting the output dimension.
- @property
- def outdim(self):
- return wkb_writer_get_outdim(self.ptr)
-
- @outdim.setter
- def outdim(self, new_dim):
- if new_dim not in (2, 3):
- raise ValueError('WKB output dimension must be 2 or 3')
- wkb_writer_set_outdim(self.ptr, new_dim)
-
- # Property for getting/setting the include srid flag.
- @property
- def srid(self):
- return bool(wkb_writer_get_include_srid(self.ptr))
-
- @srid.setter
- def srid(self, include):
- wkb_writer_set_include_srid(self.ptr, bool(include))
-
-
- # `ThreadLocalIO` object holds instances of the WKT and WKB reader/writer
- # objects that are local to the thread. The `GEOSGeometry` internals
- # access these instances by calling the module-level functions, defined
- # below.
- class ThreadLocalIO(threading.local):
- wkt_r = None
- wkt_w = None
- wkb_r = None
- wkb_w = None
- ewkb_w = None
-
-
- thread_context = ThreadLocalIO()
-
-
- # These module-level routines return the I/O object that is local to the
- # thread. If the I/O object does not exist yet it will be initialized.
- def wkt_r():
- thread_context.wkt_r = thread_context.wkt_r or _WKTReader()
- return thread_context.wkt_r
-
-
- def wkt_w(dim=2, trim=False, precision=None):
- if not thread_context.wkt_w:
- thread_context.wkt_w = WKTWriter(dim=dim, trim=trim, precision=precision)
- else:
- thread_context.wkt_w.outdim = dim
- thread_context.wkt_w.trim = trim
- thread_context.wkt_w.precision = precision
- return thread_context.wkt_w
-
-
- def wkb_r():
- thread_context.wkb_r = thread_context.wkb_r or _WKBReader()
- return thread_context.wkb_r
-
-
- def wkb_w(dim=2):
- if not thread_context.wkb_w:
- thread_context.wkb_w = WKBWriter(dim=dim)
- else:
- thread_context.wkb_w.outdim = dim
- return thread_context.wkb_w
-
-
- def ewkb_w(dim=2):
- if not thread_context.ewkb_w:
- thread_context.ewkb_w = WKBWriter(dim=dim)
- thread_context.ewkb_w.srid = True
- else:
- thread_context.ewkb_w.outdim = dim
- return thread_context.ewkb_w
|