Source code for sdssdb.utils.internals

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-03-28
# @Filename: internals.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import collections
import re
import sys
import time

from peewee import DoubleField, FloatField
from playhouse.reflection import PostgresqlMetadata, UnknownField


__all__ = ('vacuum', 'vacuum_all', 'get_cluster_index', 'get_unclustered_tables',
           'get_row_count', 'is_table_locked', 'get_database_columns')


def vacuum(database, table_name, analyze=True, verbose=False, schema=None):
    """Vacuums (and optionally analyses) a table.

    Parameters
    ----------
    database : .PeeweeDatabaseConnection
        A Peewee connection to the database to vacuum.
    table_name : str
        The table name.
    analyze : bool
        Whether to run ``ANALYZE`` when vacuuming.
    verbose : bool
        Whether to run in verbose mode.
    schema : str
        The schema in which the table lives. If `None`, the table name is
        used without schema qualification.

    """

    def execute_sql(statement):

        tstart = time.time()

        # Change isolation level to allow executing commands such as VACUUM.
        connection = database.connection()
        original_isolation_level = connection.isolation_level
        connection.set_isolation_level(0)

        database.execute_sql(statement)

        tend = time.time()
        telapsed = tend - tstart

        if 'VERBOSE' in statement:
            print(f'Elapsed {telapsed:.1f} s')

        connection.set_isolation_level(original_isolation_level)

    assert database.is_connection_usable(), 'connection is not usable.'

    table_name = table_name if schema is None else schema + '.' + table_name

    statement = ('VACUUM' + (' VERBOSE' if verbose else '') +
                 (' ANALYZE' if analyze else '') + ' ' + table_name)

    with database.atomic():
        execute_sql(statement)
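

# Example usage (an illustrative sketch, not part of the original module).
# It assumes ``database`` is an already-connected PeeweeDatabaseConnection
# and that ``catalogdb.my_table`` is a placeholder table name:
#
# >>> vacuum(database, 'my_table', analyze=True, verbose=True,
# ...        schema='catalogdb')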


def vacuum_all(database, analyze=True, verbose=False, schema=None):
    """Vacuums all the tables in a database or schema.

    Parameters
    ----------
    database : .PeeweeDatabaseConnection
        A Peewee connection to the database to vacuum.
    analyze : bool
        Whether to run ``ANALYZE`` when vacuuming.
    verbose : bool
        Whether to run in verbose mode.
    schema : str
        The schema to vacuum. If `None`, vacuums the entire database.

    """

    def execute_sql(statement):

        if 'VERBOSE' in statement:
            print(statement + ' ... ')
        else:
            sys.stdout.write(statement + ' ... ')
            sys.stdout.flush()

        tstart = time.time()

        # Change isolation level to allow executing commands such as VACUUM.
        connection = database.connection()
        original_isolation_level = connection.isolation_level
        connection.set_isolation_level(0)

        database.execute_sql(statement)

        tend = time.time()
        telapsed = tend - tstart

        if 'VERBOSE' in statement:
            print(f'Elapsed {telapsed:.1f} s')
        else:
            print(f'{telapsed:.1f} s')

        connection.set_isolation_level(original_isolation_level)

    assert database.is_connection_usable(), 'connection is not usable.'

    if schema is None:
        statement = ('VACUUM' + (' VERBOSE' if verbose else '') +
                     (' ANALYZE' if analyze else ''))
        with database.atomic():
            execute_sql(statement)
        return

    tables = database.get_tables(schema=schema)

    for table in sorted(tables):
        table_name = table if schema is None else schema + '.' + table
        statement = ('VACUUM' + (' VERBOSE' if verbose else '') +
                     (' ANALYZE' if analyze else '') + ' ' + table_name)
        with database.atomic():
            execute_sql(statement)
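

# Example usage (illustrative sketch; ``database`` is assumed to be a
# connected PeeweeDatabaseConnection). Vacuums and analyses every table in
# the hypothetical ``targetdb`` schema, printing the elapsed time per table:
#
# >>> vacuum_all(database, analyze=True, verbose=False, schema='targetdb')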


def get_cluster_index(connection, table_name=None, schema=None):
    """Returns a tuple with the index on which a table has been clustered."""

    table_name = table_name or '%%'
    schema = schema or '%%'

    with connection.atomic():
        cursor = connection.execute_sql(f"""
            SELECT
                c.relname AS tablename,
                n.nspname AS schemaname,
                i.relname AS indexname
            FROM pg_index x
            JOIN pg_class c ON c.oid = x.indrelid
            JOIN pg_class i ON i.oid = x.indexrelid
            JOIN pg_namespace n ON n.oid = i.relnamespace
            WHERE x.indisclustered
                AND n.nspname LIKE '{schema}'
                AND c.relname LIKE '{table_name}';
        """)

    return cursor.fetchall()
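

# Example (illustrative; names and the output line are placeholders that
# only show the row shape). Each returned row is a tuple of
# ``(tablename, schemaname, indexname)``:
#
# >>> get_cluster_index(database, table_name='my_table', schema='catalogdb')
# [('my_table', 'catalogdb', 'my_table_pkey')]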


def get_unclustered_tables(connection, schema=None):
    """Lists tables not clustered."""

    schema_like = schema or '%%'

    with connection.atomic():
        table_names = connection.execute_sql(f"""
            SELECT tablename FROM pg_tables WHERE schemaname LIKE '{schema_like}';
        """).fetchall()

    table_names = [table_name[0] for table_name in table_names]

    clustered = get_cluster_index(connection, schema=schema)
    if schema:
        clustered = [idx for idx in clustered if idx[1] == schema]

    # zip(*clustered) transposes the rows so that the first element is the
    # tuple of clustered table names (empty if nothing is clustered).
    clustered_tables = list(zip(*clustered))[0] if clustered else ()

    unclustered = [table for table in table_names
                   if table not in clustered_tables]

    return unclustered
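

# Example (illustrative; the schema name is a placeholder). Returns the
# names of the tables in ``catalogdb`` that are not clustered on any index:
#
# >>> get_unclustered_tables(database, schema='catalogdb')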


def get_row_count(connection, table_name, schema=None, approximate=True):
    """Returns the model row count.

    Parameters
    ----------
    connection : .PeeweeDatabaseConnection
        The database connection.
    table_name : str
        The table name.
    schema : str
        The schema in which the table lives.
    approximate : bool
        If `True`, returns the approximate row count from the ``pg_class``
        table (much faster). Otherwise calculates the exact count.

    """

    if approximate:
        if schema is None:
            sql = ('SELECT reltuples AS approximate_row_count '
                   f'FROM pg_class WHERE relname = {table_name!r};')
        else:
            sql = ('SELECT reltuples AS approximate_row_count FROM pg_class '
                   'JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace '
                   f'WHERE relname = {table_name!r} '
                   f'AND pg_namespace.nspname = {schema!r};')
    else:
        if schema is None:
            sql = f'SELECT count(*) FROM {table_name};'
        else:
            sql = f'SELECT count(*) FROM {schema}.{table_name};'

    with connection.atomic():
        count = connection.execute_sql(sql).fetchall()

    if len(count) == 0:
        raise ValueError(f'Failed retrieving the row count for table {table_name!r}. '
                         'Check the table name and schema.')

    return count[0][0]
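

# Example (illustrative; names and the output value are placeholders). The
# approximate count reads ``pg_class.reltuples``, which is only as fresh as
# the last VACUUM or ANALYZE, so it is returned as a float and may differ
# slightly from the exact count:
#
# >>> get_row_count(database, 'my_table', schema='catalogdb')
# 1664598.0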


def is_table_locked(connection, table_name, schema=None):
    """Returns the locks for a table or `None` if no lock is present."""

    schema = schema or '%%'

    sql = ('SELECT mode FROM pg_locks JOIN pg_class '
           'ON pg_class.oid = pg_locks.relation '
           'JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace '
           f'WHERE pg_class.relname = {table_name!r} '
           f'AND pg_namespace.nspname LIKE {schema!r};')

    with connection.atomic():
        locks = connection.execute_sql(sql).fetchall()

    if not locks:
        return None

    return list(zip(*locks))[0]
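

# Example (illustrative; the output shows the shape of the return value,
# a tuple of lock modes as reported by ``pg_locks.mode``):
#
# >>> is_table_locked(database, 'my_table', schema='catalogdb')
# ('AccessShareLock',)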


def get_database_columns(database, schema=None):
    """Returns database column metadata.

    Queries the ``pg_catalog`` schema to retrieve column information for
    all the tables in a schema.

    Parameters
    ----------
    database : .PeeweeDatabaseConnection
        The database connection.
    schema : str
        The schema to query. Defaults to the public schema.

    Returns
    -------
    metadata : `dict`
        A mapping keyed by the table name. For each table the list of
        primary keys is accessible via the key ``pk``, as well as the
        column metadata as ``columns``. Each column metadata is a named
        tuple with attributes ``name``, ``field_type`` (the Peewee column
        class), ``array_type``, and ``nullable``.

    """

    metadata = {}

    ColumnMetadata = collections.namedtuple('ColumnMetadata',
                                            ('name', 'field_type',
                                             'array_type', 'nullable'))

    schema = schema or 'public'

    # Get column type mapping.
    pg_metadata = PostgresqlMetadata(database)
    column_map = pg_metadata.column_map

    # Add array types for float4 (OID 1021) and float8 (OID 1022) columns.
    pg_metadata.array_types[1021] = FloatField
    pg_metadata.array_types[1022] = DoubleField

    # Get the mapping of OID to relation (table).
    relids = dict(database.execute_sql(
        """SELECT c.oid, c.relname FROM pg_catalog.pg_class c
           JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
           WHERE n.nspname = %s;""", (schema,)).fetchall())

    # Get columns and create a mapping of table_name to column
    # name and field type.
    attr_data = database.execute_sql(
        'SELECT attname, attrelid, atttypid, attnotnull '
        'FROM pg_catalog.pg_attribute a '
        'JOIN pg_catalog.pg_class c ON a.attrelid = c.oid '
        'JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid '
        'WHERE n.nspname = %s AND attnum > %s AND attisdropped IS FALSE AND '
        '      c.reltype > %s', (schema, 0, 0)).fetchall()

    attrs_map = collections.defaultdict(list)
    for col_name, relid, typeid, not_null in attr_data:
        field_type = column_map.get(typeid, UnknownField)
        array_type = pg_metadata.array_types.get(typeid, False)
        null = not not_null
        attrs_map[relids[relid]].append((col_name, field_type, array_type, null))

    # Get primary keys. Do not use information_schema.table_constraints
    # because that requires the user to have write access to the table.
    constraint_query = """
        SELECT conrelid::regclass::text AS table_name,
               pg_get_constraintdef(c.oid) AS cdef
        FROM pg_constraint c
        JOIN pg_namespace n ON n.oid = c.connamespace
        WHERE contype IN (%s) AND n.nspname = %s
        ORDER BY conrelid::regclass::text, contype DESC;
    """

    pks = database.execute_sql(constraint_query, ('pk', schema)).fetchall()

    pk_map = {}
    pk_re = re.compile(r'PRIMARY KEY \((.+)\)')
    for table_name, cdef in pks:
        cols = [col.strip() for col in pk_re.match(cdef).group(1).split(',')]
        pk_map[table_name.replace(schema + '.', '')] = cols

    # Compile the information in the metadata dictionary.
    for table_name in attrs_map:
        metadata[table_name] = {}
        metadata[table_name]['columns'] = [ColumnMetadata(*data)
                                           for data in attrs_map[table_name]]
        metadata[table_name]['pk'] = pk_map.get(table_name, None)

    return metadata
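

# Example (illustrative; table and column names are placeholders, and the
# output lines only show the shape of the returned structures):
#
# >>> columns = get_database_columns(database, schema='catalogdb')
# >>> columns['my_table']['pk']
# ['id']
# >>> columns['my_table']['columns'][0]
# ColumnMetadata(name='id', field_type=<class 'peewee.BigIntegerField'>, array_type=False, nullable=False)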