123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- from django.db.backends.base.introspection import (
- BaseDatabaseIntrospection, FieldInfo, TableInfo,
- )
- from django.db.models.indexes import Index
-
-
class DatabaseIntrospection(BaseDatabaseIntrospection):
    """
    Introspect tables, columns, sequences, relations and constraints by
    querying the PostgreSQL system catalogs.

    Every catalog query that looks a table up by name restricts the match
    with pg_catalog.pg_table_is_visible(), so all methods agree on which
    relation a bare table name refers to.
    """
    # Maps type codes to Django Field types.
    data_types_reverse = {
        16: 'BooleanField',
        17: 'BinaryField',
        20: 'BigIntegerField',
        21: 'SmallIntegerField',
        23: 'IntegerField',
        25: 'TextField',
        700: 'FloatField',
        701: 'FloatField',
        869: 'GenericIPAddressField',
        1042: 'CharField',  # blank-padded
        1043: 'CharField',
        1082: 'DateField',
        1083: 'TimeField',
        1114: 'DateTimeField',
        1184: 'DateTimeField',
        1186: 'DurationField',
        1266: 'TimeField',
        1700: 'DecimalField',
        2950: 'UUIDField',
    }

    # Table names that get_table_list() leaves out of its result.
    ignored_tables = []

    def get_field_type(self, data_type, description):
        """
        Return the Django field type name for a column.

        Integer and big integer columns whose default expression contains
        'nextval' are reported as AutoField and BigAutoField respectively;
        everything else uses the type resolved by the base class.
        """
        field_type = super().get_field_type(data_type, description)
        if description.default and 'nextval' in description.default:
            if field_type == 'IntegerField':
                return 'AutoField'
            elif field_type == 'BigIntegerField':
                return 'BigAutoField'
        return field_type

    def get_table_list(self, cursor):
        """Return a list of table and view names in the current database."""
        # The second column is the TableInfo type flag: 'p' for a partition
        # (only when the backend reports partition support), 'v' for views
        # and materialized views, 't' for everything else.
        cursor.execute("""
            SELECT c.relname,
            CASE WHEN {} THEN 'p' WHEN c.relkind IN ('m', 'v') THEN 'v' ELSE 't' END
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """.format('c.relispartition' if self.connection.features.supports_table_partitions else 'FALSE'))
        return [TableInfo(*row) for row in cursor.fetchall() if row[0] not in self.ignored_tables]

    def get_table_description(self, cursor, table_name):
        """
        Return a description of the table with the DB-API cursor.description
        interface.
        """
        # Query the pg_catalog tables as cursor.description does not reliably
        # return the nullable property and information_schema.columns does not
        # contain details of materialized views.
        cursor.execute("""
            SELECT
                a.attname AS column_name,
                NOT (a.attnotnull OR (t.typtype = 'd' AND t.typnotnull)) AS is_nullable,
                pg_get_expr(ad.adbin, ad.adrelid) AS column_default
            FROM pg_attribute a
            LEFT JOIN pg_attrdef ad ON a.attrelid = ad.adrelid AND a.attnum = ad.adnum
            JOIN pg_type t ON a.atttypid = t.oid
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            WHERE c.relkind IN ('f', 'm', 'p', 'r', 'v')
                AND c.relname = %s
                AND n.nspname NOT IN ('pg_catalog', 'pg_toast')
                AND pg_catalog.pg_table_is_visible(c.oid)
        """, [table_name])
        # column name -> (is_nullable, column_default)
        field_map = {line[0]: line[1:] for line in cursor.fetchall()}
        # Run a one-row SELECT only to populate cursor.description with the
        # per-column type metadata; the fetched row itself is never read.
        cursor.execute("SELECT * FROM %s LIMIT 1" % self.connection.ops.quote_name(table_name))
        return [
            FieldInfo(
                line.name,
                line.type_code,
                line.display_size,
                line.internal_size,
                line.precision,
                line.scale,
                *field_map[line.name],
            )
            for line in cursor.description
        ]

    def get_sequences(self, cursor, table_name, table_fields=()):
        """
        Return a list of {'name': ..., 'table': ..., 'column': ...} dicts, one
        per sequence that a column default of the given table depends on.

        table_fields is not used by this implementation.
        """
        # The table is matched through pg_table_is_visible() rather than a
        # hard-coded schema name so that the lookup resolves the same relation
        # as get_table_list() and get_table_description().
        cursor.execute("""
            SELECT s.relname as sequence_name, col.attname
            FROM pg_class s
                JOIN pg_namespace sn ON sn.oid = s.relnamespace
                JOIN pg_depend d ON d.refobjid = s.oid AND d.refclassid = 'pg_class'::regclass
                JOIN pg_attrdef ad ON ad.oid = d.objid AND d.classid = 'pg_attrdef'::regclass
                JOIN pg_attribute col ON col.attrelid = ad.adrelid AND col.attnum = ad.adnum
                JOIN pg_class tbl ON tbl.oid = ad.adrelid
            WHERE s.relkind = 'S'
                AND d.deptype in ('a', 'n')
                AND pg_catalog.pg_table_is_visible(tbl.oid)
                AND tbl.relname = %s
        """, [table_name])
        return [
            {'name': row[0], 'table': table_name, 'column': row[1]}
            for row in cursor.fetchall()
        ]

    def get_relations(self, cursor, table_name):
        """
        Return a dictionary of {field_name: (field_name_other_table, other_table)}
        representing all relationships to the given table.
        """
        # Only the first column of each foreign key (conkey[1] / confkey[1])
        # is inspected. The visibility check keeps a same-named table in
        # another schema from contributing its foreign keys.
        cursor.execute("""
            SELECT c2.relname, a1.attname, a2.attname
            FROM pg_constraint con
            LEFT JOIN pg_class c1 ON con.conrelid = c1.oid
            LEFT JOIN pg_class c2 ON con.confrelid = c2.oid
            LEFT JOIN pg_attribute a1 ON c1.oid = a1.attrelid AND a1.attnum = con.conkey[1]
            LEFT JOIN pg_attribute a2 ON c2.oid = a2.attrelid AND a2.attnum = con.confkey[1]
            WHERE c1.relname = %s
                AND con.contype = 'f'
                AND pg_catalog.pg_table_is_visible(c1.oid)
        """, [table_name])
        return {row[1]: (row[2], row[0]) for row in cursor.fetchall()}

    def get_key_columns(self, cursor, table_name):
        """
        Return the rows (column_name, referenced_table, referenced_column) for
        the FOREIGN KEY constraints of the given table, as fetched from the
        information_schema views.
        """
        cursor.execute("""
            SELECT kcu.column_name, ccu.table_name AS referenced_table, ccu.column_name AS referenced_column
            FROM information_schema.constraint_column_usage ccu
            LEFT JOIN information_schema.key_column_usage kcu
                ON ccu.constraint_catalog = kcu.constraint_catalog
                    AND ccu.constraint_schema = kcu.constraint_schema
                    AND ccu.constraint_name = kcu.constraint_name
            LEFT JOIN information_schema.table_constraints tc
                ON ccu.constraint_catalog = tc.constraint_catalog
                    AND ccu.constraint_schema = tc.constraint_schema
                    AND ccu.constraint_name = tc.constraint_name
            WHERE kcu.table_name = %s AND tc.constraint_type = 'FOREIGN KEY'
        """, [table_name])
        return cursor.fetchall()

    def get_constraints(self, cursor, table_name):
        """
        Retrieve any constraints or keys (unique, pk, fk, check, index) across
        one or more columns. Also retrieve the definition of expression-based
        indexes.

        Return a dictionary mapping each constraint or index name to a dict
        with the keys "columns", "primary_key", "unique", "foreign_key",
        "check", "index", "definition" and "options"; index entries
        additionally carry "orders" and "type".
        """
        constraints = {}
        # Loop over the key table, collecting things as constraints. The column
        # array must return column names in the same order in which they were
        # created.
        # The table is selected with pg_table_is_visible() instead of a fixed
        # schema name, matching how the table list and description resolve it.
        cursor.execute("""
            SELECT
                c.conname,
                array(
                    SELECT attname
                    FROM unnest(c.conkey) WITH ORDINALITY cols(colid, arridx)
                    JOIN pg_attribute AS ca ON cols.colid = ca.attnum
                    WHERE ca.attrelid = c.conrelid
                    ORDER BY cols.arridx
                ),
                c.contype,
                (SELECT fkc.relname || '.' || fka.attname
                FROM pg_attribute AS fka
                JOIN pg_class AS fkc ON fka.attrelid = fkc.oid
                WHERE fka.attrelid = c.confrelid AND fka.attnum = c.confkey[1]),
                cl.reloptions
            FROM pg_constraint AS c
            JOIN pg_class AS cl ON c.conrelid = cl.oid
            WHERE cl.relname = %s AND pg_catalog.pg_table_is_visible(cl.oid)
        """, [table_name])
        for constraint, columns, kind, used_cols, options in cursor.fetchall():
            constraints[constraint] = {
                "columns": columns,
                "primary_key": kind == "p",
                "unique": kind in ["p", "u"],
                # used_cols is "<table>.<column>" of the referenced column.
                "foreign_key": tuple(used_cols.split(".", 1)) if kind == "f" else None,
                "check": kind == "c",
                "index": False,
                "definition": None,
                "options": options,
            }
        # Now get indexes
        # Without the visibility check, indexes of every same-named table in
        # any schema would be merged into this table's result.
        cursor.execute("""
            SELECT
                indexname, array_agg(attname ORDER BY arridx), indisunique, indisprimary,
                array_agg(ordering ORDER BY arridx), amname, exprdef, s2.attoptions
            FROM (
                SELECT
                    c2.relname as indexname, idx.*, attr.attname, am.amname,
                    CASE
                        WHEN idx.indexprs IS NOT NULL THEN
                            pg_get_indexdef(idx.indexrelid)
                    END AS exprdef,
                    CASE am.amname
                        WHEN 'btree' THEN
                            CASE (option & 1)
                                WHEN 1 THEN 'DESC' ELSE 'ASC'
                            END
                    END as ordering,
                    c2.reloptions as attoptions
                FROM (
                    SELECT *
                    FROM pg_index i, unnest(i.indkey, i.indoption) WITH ORDINALITY koi(key, option, arridx)
                ) idx
                LEFT JOIN pg_class c ON idx.indrelid = c.oid
                LEFT JOIN pg_class c2 ON idx.indexrelid = c2.oid
                LEFT JOIN pg_am am ON c2.relam = am.oid
                LEFT JOIN pg_attribute attr ON attr.attrelid = c.oid AND attr.attnum = idx.key
                WHERE c.relname = %s AND pg_catalog.pg_table_is_visible(c.oid)
            ) s2
            GROUP BY indexname, indisunique, indisprimary, amname, exprdef, attoptions;
        """, [table_name])
        for index, columns, unique, primary, orders, type_, definition, options in cursor.fetchall():
            # Indexes that back a constraint collected above are not repeated.
            if index not in constraints:
                # A btree index with no storage options and no '_btree' name
                # suffix is reported with the generic Index.suffix type.
                basic_index = type_ == 'btree' and not index.endswith('_btree') and options is None
                constraints[index] = {
                    # array_agg() yields [None] when no column name or
                    # ordering was matched; report that as an empty list.
                    "columns": columns if columns != [None] else [],
                    "orders": orders if orders != [None] else [],
                    "primary_key": primary,
                    "unique": unique,
                    "foreign_key": None,
                    "check": False,
                    "index": True,
                    "type": Index.suffix if basic_index else type_,
                    "definition": definition,
                    "options": options,
                }
        return constraints
|