#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-03-28
# @Filename: internals.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
import collections
import re
import sys
import time
from peewee import DoubleField, FloatField
from playhouse.reflection import PostgresqlMetadata, UnknownField
__all__ = (
"vacuum",
"vacuum_all",
"get_cluster_index",
"get_unclustered_tables",
"get_row_count",
"is_table_locked",
"get_database_columns",
)
[docs]
def vacuum(database, table_name, analyze=True, verbose=False, schema=None):
"""Vacuums (and optionally analyses) a table.
Parameters
----------
database : .PeeweeDatabaseConnection
A PeeWee connection to the database to vacuum.
table_name : str
The table name.
analyze : bool
Whether to run ``ANALYZE`` when vacuumming.
verbose : bool
Whether to run in verbose mode.
schema : str
The schema to vacuum. If `None`, vacuums the entire database.
"""
def execute_sql(statement):
tstart = time.time()
# Change isolation level to allow executing commands such as VACUUM.
connection = database.connection()
original_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
database.execute_sql(statement)
tend = time.time()
telapsed = tend - tstart
if "VEBOSE" in statement:
print(f"Elapsed {telapsed:.1f} s")
connection.set_isolation_level(original_isolation_level)
assert database.is_connection_usable(), "connection is not usable."
table_name = table_name if schema is None else schema + "." + table_name
statement = (
"VACUUM"
+ (" VEBOSE" if verbose else "")
+ (" ANALYZE" if analyze else "")
+ " "
+ table_name
)
with database.atomic():
execute_sql(statement)
[docs]
def vacuum_all(database, analyze=True, verbose=False, schema=None):
"""Vacuums all the tables in a database or schema.
Parameters
----------
database : .PeeweeDatabaseConnection
A PeeWee connection to the database to vacuum.
analyze : bool
Whether to run ``ANALYZE`` when vacuumming.
verbose : bool
Whether to run in verbose mode.
schema : str
The schema to vacuum. If `None`, vacuums the entire database.
"""
def execute_sql(statement):
if "VEBOSE" in statement:
print(statement + " ... ")
else:
sys.stdout.write(statement + " ... ")
sys.stdout.flush()
tstart = time.time()
# Change isolation level to allow executing commands such as VACUUM.
connection = database.connection()
original_isolation_level = connection.isolation_level
connection.set_isolation_level(0)
database.execute_sql(statement)
tend = time.time()
telapsed = tend - tstart
if "VEBOSE" in statement:
print(f"Elapsed {telapsed:.1f} s")
else:
print(f"{telapsed:.1f} s")
connection.set_isolation_level(original_isolation_level)
assert database.is_connection_usable(), "connection is not usable."
if schema is None:
statement = "VACUUM" + (" VEBOSE" if verbose else "") + (" ANALYZE" if analyze else "")
with database.atomic():
execute_sql(statement)
return
tables = database.get_tables(schema=schema)
for table in sorted(tables):
table_name = table if schema is None else schema + "." + table
statement = (
"VACUUM"
+ (" VEBOSE" if verbose else "")
+ (" ANALYZE" if analyze else "")
+ " "
+ table_name
)
with database.atomic():
execute_sql(statement)
[docs]
def get_cluster_index(connection, table_name=None, schema=None):
"""Returns a tuple with the index on which a table has been clustered."""
table_name = table_name or "%%"
schema = schema or "%%"
with connection.atomic():
cursor = connection.execute_sql(f"""
SELECT
c.relname AS tablename,
n.nspname AS schemaname,
i.relname AS indexname
FROM pg_index x
JOIN pg_class c ON c.oid = x.indrelid
JOIN pg_class i ON i.oid = x.indexrelid
JOIN pg_namespace n ON n.oid = i.relnamespace
WHERE
x.indisclustered AND
n.nspname LIKE '{schema}' AND
c.relname LIKE '{table_name}';
""")
return cursor.fetchall()
[docs]
def get_unclustered_tables(connection, schema=None):
"""Lists tables not clustered."""
schema_like = schema or "%%"
with connection.atomic():
table_names = connection.execute_sql(f"""
SELECT
tablename
FROM pg_tables
WHERE
schemaname LIKE '{schema_like}';
""").fetchall()
table_names = [table_name[0] for table_name in table_names]
clustered = get_cluster_index(connection, schema=schema)
if schema:
clustered = [idx for idx in clustered if idx[1] == schema]
unclustered = [table for table in table_names if table not in list(zip(*clustered))[0]]
return unclustered
[docs]
def get_row_count(connection, table_name, schema=None, approximate=True):
"""Returns the model row count.
Parameters
----------
connection : .PeeweeDatabaseConnection
The database connection.
table : str
The table name.
schema : str
The schema in which the table lives.
approximate : bool
If True, returns the approximate row count from the ``pg_class``
table (much faster). Otherwise calculates the exact count.
"""
if approximate:
if schema is None:
sql = (
"SELECT reltuples AS approximate_row_count "
f"FROM pg_class WHERE relname = {table_name!r};"
)
else:
sql = (
"SELECT reltuples AS approximate_row_count FROM pg_class "
"JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace "
f"WHERE relname = {table_name!r} AND pg_namespace.nspname = {schema!r};"
)
else:
if schema is None:
sql = f"SELECT count(*) FROM {table_name};"
else:
sql = f"SELECT count(*) FROM {schema}.{table_name};"
with connection.atomic():
count = connection.execute_sql(sql.format(table_name=table_name, schema=schema)).fetchall()
if len(count) == 0:
raise ValueError(
f"failed retrieving the row count for table {table_name!r}"
"Check the table name and schema."
)
return count[0][0]
[docs]
def is_table_locked(connection, table_name, schema=None):
"""Returns the locks for a table or `None` if no lock is present."""
schema = schema or "%%"
sql = (
"SELECT mode FROM pg_locks JOIN pg_class "
"ON pg_class.oid = pg_locks.relation "
"JOIN pg_namespace ON pg_namespace.oid = pg_class.relnamespace "
f"WHERE pg_class.relname = {table_name!r} "
f"AND pg_namespace.nspname LIKE {schema!r};"
)
with connection.atomic():
locks = connection.execute_sql(sql).fetchall()
if not locks:
return None
return list(zip(*locks))[0]
[docs]
def get_database_columns(database, schema=None):
"""Returns database column metadata.
Queries the ``pg_catalog`` schema to retrieve column information
for all the tables in a schema.
Parameters
----------
database : .PeeweeDatabaseConnection
The database connection.
schema : str
The schema to query. Defaults to the public schema.
Returns
-------
metadata : `dict`
A mapping keyed by the table name. For each table the list of
primary keys is accessible via the key ``pk``, as well as the column
metadata as ``columns``. Each column metadata is a named tuple with
attributes ``name``, ``field_type`` (the Peewee column class),
``array_type``, and ``nullable``.
"""
metadata = {}
ColumnMetadata = collections.namedtuple(
"ColumnMetadata", ("name", "field_type", "array_type", "nullable")
)
schema = schema or "public"
# Get column type mapping.
pg_metadata = PostgresqlMetadata(database)
column_map = pg_metadata.column_map
# Add array type for double
pg_metadata.array_types[1021] = FloatField
pg_metadata.array_types[1022] = DoubleField
# Get the mapping of oid to relation (table)
relids = dict(
database.execute_sql(
"""SELECT c.oid, c.relname from pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = %s;""",
(schema,),
).fetchall()
)
# Get columns and create a mapping of table_name to column
# name and field type.
attr_data = database.execute_sql(
"SELECT attname, attrelid, atttypid, attnotnull "
"FROM pg_catalog.pg_attribute a "
"JOIN pg_catalog.pg_class c ON a.attrelid = c.oid "
"JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid "
"WHERE n.nspname = %s AND attnum > %s AND attisdropped IS FALSE AND "
" c.reltype > %s",
(schema, 0, 0),
).fetchall()
attrs_map = collections.defaultdict(list)
for col_name, relid, typeid, not_null in attr_data:
field_type = column_map.get(typeid, UnknownField)
array_type = pg_metadata.array_types.get(typeid, False)
null = not not_null
attrs_map[relids[relid]].append((col_name, field_type, array_type, null))
# Get primary keys. Do not use information_schema.table_constraints
# because that requires the user to have write access to the table.
constraint_query = """
SELECT conrelid::regclass::text AS table_name,
pg_get_constraintdef(c.oid) AS cdef
FROM pg_constraint c
JOIN pg_namespace n ON n.oid = c.connamespace
WHERE contype IN (%s) AND n.nspname = %s
ORDER BY conrelid::regclass::text, contype DESC;
"""
pks = database.execute_sql(constraint_query, ("pk", schema)).fetchall()
pk_map = {}
pk_re = re.compile(r"PRIMARY KEY \((.+)\)")
for table_name, cdef in pks:
cols = list(map(lambda x: x.strip(), pk_re.match(cdef).group(1).split(",")))
pk_map[table_name.replace(schema + ".", "")] = cols
# Compile the information in the metadata dictionary.
for table_name in attrs_map:
metadata[table_name] = {}
metadata[table_name]["columns"] = [ColumnMetadata(*data) for data in attrs_map[table_name]]
metadata[table_name]["pk"] = pk_map.get(table_name, None)
return metadata