Source code for sdssdb.utils.internals

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-03-28
# @Filename: internals.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

import sys
import time


__all__ = ('vacuum', 'vacuum_all', 'get_cluster_index', 'get_unclustered_tables',
           'get_row_count', 'is_table_locked')


[docs]def vacuum(database, table_name, analyze=True, verbose=False, schema=None): """Vacuums (and optionally analyses) a table. Parameters ---------- database : .PeeweeDatabaseConnection A PeeWee connection to the database to vacuum. table_name : str The table name. analyze : bool Whether to run ``ANALYZE`` when vacuumming. verbose : bool Whether to run in verbose mode. schema : str The schema to vacuum. If `None`, vacuums the entire database. """ def execute_sql(statement): tstart = time.time() # Change isolation level to allow executing commands such as VACUUM. connection = database.connection() original_isolation_level = connection.isolation_level connection.set_isolation_level(0) database.execute_sql(statement) tend = time.time() telapsed = tend - tstart if 'VEBOSE' in statement: print(f'Elapsed {telapsed:.1f} s') connection.set_isolation_level(original_isolation_level) assert database.is_connection_usable(), 'connection is not usable.' table_name = table_name if schema is None else schema + '.' + table_name statement = ('VACUUM' + (' VEBOSE' if verbose else '') + (' ANALYZE' if analyze else '') + ' ' + table_name) with database.atomic(): execute_sql(statement)
[docs]def vacuum_all(database, analyze=True, verbose=False, schema=None): """Vacuums all the tables in a database or schema. Parameters ---------- database : .PeeweeDatabaseConnection A PeeWee connection to the database to vacuum. analyze : bool Whether to run ``ANALYZE`` when vacuumming. verbose : bool Whether to run in verbose mode. schema : str The schema to vacuum. If `None`, vacuums the entire database. """ def execute_sql(statement): if 'VEBOSE' in statement: print(statement + ' ... ') else: sys.stdout.write(statement + ' ... ') sys.stdout.flush() tstart = time.time() # Change isolation level to allow executing commands such as VACUUM. connection = database.connection() original_isolation_level = connection.isolation_level connection.set_isolation_level(0) database.execute_sql(statement) tend = time.time() telapsed = tend - tstart if 'VEBOSE' in statement: print(f'Elapsed {telapsed:.1f} s') else: print(f'{telapsed:.1f} s') connection.set_isolation_level(original_isolation_level) assert database.is_connection_usable(), 'connection is not usable.' if schema is None: statement = 'VACUUM' + (' VEBOSE' if verbose else '') + (' ANALYZE' if analyze else '') with database.atomic(): execute_sql(statement) return tables = database.get_tables(schema=schema) for table in sorted(tables): table_name = table if schema is None else schema + '.' + table statement = ('VACUUM' + (' VEBOSE' if verbose else '') + (' ANALYZE' if analyze else '') + ' ' + table_name) with database.atomic(): execute_sql(statement)
[docs]def get_cluster_index(connection, table_name=None, schema=None): """Returns a tuple with the index on which a table has been clustered.""" table_name = table_name or '%%' schema = schema or '%%' with connection.atomic(): cursor = connection.execute_sql(f""" SELECT c.relname AS tablename, n.nspname AS schemaname, i.relname AS indexname FROM pg_index x JOIN pg_class c ON c.oid = x.indrelid JOIN pg_class i ON i.oid = x.indexrelid JOIN pg_namespace n ON n.oid = i.relnamespace WHERE x.indisclustered AND n.nspname LIKE '{schema}' AND c.relname LIKE '{table_name}'; """) return cursor.fetchall()
[docs]def get_unclustered_tables(connection, schema=None): """Lists tables not clustered.""" schema_like = schema or '%%' with connection.atomic(): table_names = connection.execute_sql(f""" SELECT tablename FROM pg_tables WHERE schemaname LIKE '{schema_like}'; """).fetchall() table_names = [table_name[0] for table_name in table_names] clustered = get_cluster_index(connection, schema=schema) if schema: clustered = [idx for idx in clustered if idx[1] == schema] unclustered = [table for table in table_names if table not in list(zip(*clustered))[0]] return unclustered
[docs]def get_row_count(connection, table_name, schema=None, approximate=True): """Returns the model row count. Parameters ---------- connection : .PeeweeDatabaseConnection The database connection. table : str The table name. schema : str The schema in which the table lives. approximate : bool If True, returns the approximate row count from the ``pg_class`` table (much faster). Otherwise calculates the exact count. """ if approximate: if schema is None: sql = ('SELECT reltuples AS approximate_row_count ' 'FROM pg_class WHERE relname = \'{table_name}\';') else: sql = ('SELECT reltuples AS approximate_row_count FROM pg_class ' 'JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace ' 'WHERE relname = \'{table_name}\' AND pg_namespace.nspname = \'{schema}\';') else: if schema is None: sql = 'SELECT count(*) FROM {table_name};' else: sql = 'SELECT count(*) FROM {schema}.{table_name};' with connection.atomic(): count = connection.execute_sql(sql.format(table_name=table_name, schema=schema)).fetchall() if len(count) == 0: raise ValueError('failed retrieving the row count. Check the table name and schema.') return count[0][0]
[docs]def is_table_locked(connection, table_name, schema=None): """Returns the locks for a table or `None` if no lock is present.""" schema = schema or '%%' sql = ('SELECT mode FROM pg_locks JOIN pg_class ' 'ON pg_class.oid = pg_locks.relation ' 'JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace ' f'WHERE pg_class.relname = {table_name!r} ' f'AND pg_namespace.nspname LIKE {schema!r};') with connection.atomic(): locks = connection.execute_sql(sql).fetchall() if not locks: return None return list(zip(*locks))[0]