Source code for geomcompare.geomrefdb

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import inspect
import logging
import multiprocessing as mp
import os
import sqlite3
import time
import uuid
from tempfile import NamedTemporaryFile
from typing import Optional, Literal
from collections.abc import Generator, Callable

import psycopg2
import pyproj
import rtree
import shapely.ops
from pyproj.exceptions import CRSError
from shapely import speedups, wkb

from .comparefunc import _geoms_always_match
from ._geomrefdb_abc import GeomRefDB
from .geomutils import _geom_type_mapping, get_transform_func, _to_2D, _unchanged_geom, GeomObject
from .io import _setup_logger, _update_logger, GeometryIterable
from ._misc import SharedIterator, split_iter_to_lists


[docs]class PostGISGeomRefDB(GeomRefDB): def __init__(self, PG_params, PG_schema, PG_table, PG_geoms_column): self.PG_params = PG_params self.PG_conn = psycopg2.connect(**PG_params) self.PG_schema = PG_schema self.PG_table = PG_table self.PG_geoms_column = PG_geoms_column def __del__(self): self.PG_conn = None def __getstate__(self): attrs = self.__dict__.copy() attrs["PG_conn"] = None return attrs def __setstate__(self, state): self.__dict__ = state self.PG_conn = psycopg2.connect(**self.PG_params) def get_PG_geoms_EPSG(self): PG_cursor = self.PG_conn.cursor() PG_cursor.execute( f"SELECT Find_SRID('{self.PG_schema}', '{self.PG_table}', " f"'{self.PG_geoms_column}')" ) res = PG_cursor.fetchone() PG_cursor = None return int(res[0])
[docs] def true_positives(self, geoms_iter, geoms_EPSG, same_geoms_func): self.logger = _setup_logger() self.logger.info("Searching true positive geometries...") PG_geoms_EPSG = self.get_PG_geoms_EPSG() transform = PG_geoms_EPSG != int(geoms_EPSG) path2table = ".".join([self.PG_schema, self.PG_table]) if transform: SQL_query_template = ( f"SELECT ST_AsBinary(ST_Transform({self.PG_geoms_column}," f"{geoms_EPSG})) " f"FROM {path2table} " f"WHERE ST_Intersects({self.PG_geoms_column}, " f"ST_Transform(ST_GeomFromText('{{geom.wkt}}', {geoms_EPSG}), " f"{PG_geoms_EPSG}))" ) else: SQL_query_template = ( f"SELECT ST_AsBinary({self.PG_geoms_column}) " f"FROM {path2table} " f"WHERE ST_Intersects({self.PG_geoms_column}, " f"ST_GeomFromText('{{geom.wkt}}', {geoms_EPSG}))" ) PG_cursor = self.PG_conn.cursor() for geom in geoms_iter: PG_cursor.execute(SQL_query_template.format(geom=geom)) for row in PG_cursor: PG_geom = wkb.loads(row[0].tobytes()) if same_geoms_func(geom, PG_geom): yield geom break PG_cursor = None logger.info("Done searching true positive geometries.")
[docs] def false_positives(self, geoms_iter, geoms_EPSG, same_geoms_func): logger = _setup_logger() logger.info("Searching false positive geometries...") PG_geoms_EPSG = self.get_PG_geoms_EPSG() transform = PG_geoms_EPSG != int(geoms_EPSG) path2table = ".".join([self.PG_schema, self.PG_table]) if transform: SQL_query_template = ( f"SELECT ST_AsBinary(ST_Transform({self.PG_geoms_column}," f"{geoms_EPSG})) " f"FROM {path2table} " f"WHERE ST_Intersects({self.PG_geoms_column}, " f"ST_Transform(ST_GeomFromText('{{geom.wkt}}', " f"{geoms_EPSG}), {PG_geoms_EPSG}))" ) else: SQL_query_template = ( f"SELECT ST_AsBinary({self.PG_geoms_column}) " f"FROM {path2table} " f"WHERE ST_Intersects({self.PG_geoms_column}, " f"ST_GeomFromText('{{geom.wkt}}', {geoms_EPSG}))" ) PG_cursor = self.PG_conn.cursor() for geom in geoms_iter: PG_cursor.execute(SQL_query_template.format(geom=geom)) false_positive = True for row in PG_cursor: PG_geom = wkb.loads(row[0].tobytes()) if same_geoms_func(geom, PG_geom): false_positive = False break if false_positive: yield geom PG_cursor = None logger.info("Done searching false positive geometries.")
[docs] def missing_geometries(self, geoms_iter, AOI_geom, geoms_EPSG, same_geoms_func): logger = _setup_logger() logger.info("Searching missing geometries...") PG_geoms_EPSG = self.get_PG_geoms_EPSG() transform = PG_geoms_EPSG != int(geoms_EPSG) path2table = ".".join([self.PG_schema, self.PG_table]) if transform: PG_crs = pyproj.CRS(f"EPSG:{PG_geoms_EPSG}") AOI_crs = pyproj.CRS(f"EPSG:{geoms_EPSG}") project = pyproj.Transformer.from_crs( AOI_crs, PG_crs, always_xy=True ).transform aoi_wkt = shapely.ops.transform(project, AOI_geom).wkt SQL_query = ( f"SELECT ST_AsBinary(ST_Transform({self.PG_geoms_column}," f"{geoms_EPSG})) " f"FROM {path2table} " f"WHERE ST_Intersects({self.PG_geoms_column}, " f"ST_GeomFromText('{aoi_wkt}', {PG_geoms_EPSG}))" ) else: SQL_query = ( f"SELECT ST_AsBinary({self.PG_geoms_column}) " f"FROM {path2table} " f"WHERE ST_Intersects({self.PG_geoms_column}, " f"ST_GeomFromText('{AOI_geom.wkt}', {PG_geoms_EPSG}))" ) PG_cursor = self.PG_conn.cursor() PG_cursor.execute(SQL_query) index = rtree.index.Index() for i, geom in enumerate(geoms_iter): index.insert(i, geom.bounds, obj=geom) for row in PG_cursor: PG_geom = wkb.loads(row[0].tobytes()) if not any( same_geoms_func(el.object, PG_geom) for el in index.intersection(PG_geom.bounds, objects=True) ): yield PG_geom PG_cursor = None logger.info("Done searching missing geometries.")
[docs]class RtreeGeomRefDB(GeomRefDB): def __init__(self, geoms_iter, geoms_EPSG): self.index = rtree.index.Index() for i, geom in enumerate(geoms_iter): self.index.insert(i, geom.bounds, obj=geom) self.EPSG = geoms_EPSG
[docs] def true_positives(self, geoms_iter, geoms_EPSG, same_geoms_func): logger = _setup_logger() logger.info("Searching true positive geometries...") transform = geoms_EPSG != self.EPSG if transform: geoms_crs = pyproj.CRS(f"EPSG:{geoms_EPSG}") ref_DB_crs = pyproj.CRS(f"EPSG:{self.EPSG}") project = pyproj.Transformer.from_crs( geoms_crs, ref_DB_crs, always_xy=True ).transform for geom in geoms_iter: geom_reproj = shapely.ops.transform(project, geom) if any( same_geoms_func(geom_reproj, el.object) for el in self.index.intersection(geom_reproj.bounds, objects=True) ): yield geom else: for geom in geoms_iter: if any( same_geoms_func(geom, el.object) for el in self.index.intersection(geom.bounds, objects=True) ): yield geom logger.info("Done searching true positive geometries.")
[docs] def false_positives(self, geoms_iter, geoms_EPSG, same_geoms_func): logger = _setup_logger() logger.info("Searching false positive geometries...") transform = geoms_EPSG != self.EPSG if transform: geoms_crs = pyproj.CRS(f"EPSG:{geoms_EPSG}") ref_DB_crs = pyproj.CRS(f"EPSG:{self.EPSG}") project = pyproj.Transformer.from_crs( geoms_crs, ref_DB_crs_crs, always_xy=True ).transform for geom in geoms_iter: geom_reproj = shapely.ops.transform(project, geom) if not any( same_geoms_func(geom_reproj, el.object) for el in self.index.intersection(geom_reproj.bounds, objects=True) ): yield geom else: for geom in geoms_iter: if not any( same_geoms_func(geom, el.object) for el in self.index.intersection(geom.bounds, objects=True) ): yield geom logger.info("Done searching false positive geometries.")
def intersecting_idx_geoms(self, poly=None, bounds=None): if poly is not None: for el in self.index.intersection(poly.bounds, objects=True): idx_geom = el.object if poly.intersects(idx_geom): yield idx_geom else: if bounds is None: bounds = self.index.bounds for el in self.index.intersection(bounds, objects=True): yield el.object
[docs] def missing_geometries(self, geoms_iter, AOI_geom, geoms_EPSG, same_geoms_func): logger = _setup_logger() logger.info("Searching missing geometries...") index = rtree.index.Index() for i, geom in enumerate(geoms_iter): index.insert(i, geom.bounds, obj=geom) transform = geoms_EPSG != self.EPSG if transform: geoms_crs = pyproj.CRS(f"EPSG:{geoms_EPSG}") ref_DB_crs = pyproj.CRS(f"EPSG:{self.EPSG}") project = pyproj.Transformer.from_crs( ref_DB_crs, geoms_crs, always_xy=True ).transform if AOI_geom is not None: project_AOI = pyproj.Transformer.from_crs( geoms_crs, ref_DB_crs, always_xy=True ).transform AOI_geom = shapely.ops.transform(project_AOI, AOI_geom) for ref_geom in self.intersecting_idx_geoms(poly=AOI_geom): ref_geom = shapely.ops.transform(project, ref_geom) if not any( same_geoms_func(el.object, ref_geom) for el in index.intersection(ref_geom.bounds, objects=True) ): yield ref_geom else: for ref_geom in self.intersecting_idx_geoms(poly=AOI_geom): if not any( same_geoms_func(el.object, ref_geom) for el in index.intersection(ref_geom.bounds, objects=True) ): yield ref_geom logger.info("Done searching missing geometries.")
#: Geometry types supported by the `SQLiteGeomRefDB` class. SpatialiteGeomType = Literal[ "Point", "LineString", "Polygon", "MultiPoint", "MultiLineString", "MultiPolygon", "GeometryCollection", ]
[docs]class SQLiteGeomRefDB(GeomRefDB): """Concrete implementation of the GeomRefDB ABC using SQLite. SQLiteGeomRefDB is a concrete implementation of the interface defined by the GeomRefDB abstract base class. It enables to load an existing (or create a new) SQLite database, where geometry datasets can be stored and can be compared (based on geometry similarity functions) with other geometrical features from an external dataset. Instances of this class can handle simultaneously multiple reference datasets, with various geometry types (see :attr:`supported_geom_types`) and spatial reference systems. Parameters ---------- filename : `str`, optional Path to an existing spatialite database. default_epsg : `int`, optional Default EPSG code of the geometrical features that will be added to the database. If specified, the EPSG code will be default value of the ``geoms_epsg`` parameter for any subsequent call of the :meth:`add_geometries` method. geoms_iter : iterable of `.GeomObject`, optional Iterable of the geometrical features to add to this `SQLiteGeomRefDB` instance. Such features can also be added later to the class instance with the :meth:`add_geometries` method. geoms_tab_name : `str`, optional Name of the table where the geometrical features are to be stored. If the ``geoms_iter`` parameter is not given, ``geoms_tab_name`` will be ignored. geom_type : `SpatialiteGeomType`, optional Geometry type of the geometrical features passed as argument to the ``geoms_iter`` parameter. geoms_epsg : `int`, optional EPSG code of the geometrical features passed as argument to the ``geoms_iter`` parameter. If specified, it overrides the ``default_epsg`` parameter during the instance construction. in_ram : bool, default: ``True`` Set to ``True`` to create/load the database in RAM for faster access. Set to ``False`` for larger-than-RAM databases. logger : `logging.Logger`, optional Logger instance to use for logging outputs. logger_name : `str`, optional Name of the `logging.Logger` object to create for logging outputs. This parameter will be ignored if a Logger instance is passed to the ``logger`` parameter. logging_level : `int`, default: ``logging.INFO`` Logging level of the logging output. For mor information, please see the documentation of the `logging` module. Raises ------ ValueError If ``in_ram=False`` and ``filename=None``. Notes ----- This class makes use of the spatialite extension of SQLite, and as such, spatialite must be installed and available in order to work with instances of this class. """ def __init__( self, filename: Optional[str] = None, default_epsg: Optional[int] = None, geoms_iter: Optional[GeometryIterable] = None, geoms_tab_name: Optional[str] = None, geom_type: Optional[SpatialiteGeomType] = None, geoms_epsg: Optional[int] = None, in_ram: bool = True, logger: Optional[logging.Logger] = None, logger_name: Optional[str] = None, logging_level: int = logging.INFO, ) -> None: if filename is not None: self._filename = os.path.abspath(filename) else: self._filename = None if default_epsg is not None: try: default_epsg = int(default_epsg) _ = pyproj.CRS(default_epsg) except (CRSError, ValueError, TypeError): raise ValueError("{!r} ('default_epsg') is not a valid EPSG code!") else: self._default_epsg = default_epsg else: self._default_epsg = None self._in_ram = in_ram if logger is not None: self._logger = logger else: if logger_name is None: logger_name = type(self).__name__ self._logger = _setup_logger(name=logger_name, level=logging_level) if filename is not None and not os.path.isfile(filename): new_db = True else: new_db = False if filename is None: if not in_ram: raise ValueError( "The 'filename' cannot be set to None if 'in_ram' is set to False!" ) else: self._conn = sqlite3.connect(":memory:") self._conn.enable_load_extension(True) self._conn.load_extension("mod_spatialite") cursor = self._conn.cursor() cursor.execute("SELECT InitSpatialMetaData();") self._conn.commit() self._logger.info("New database created in RAM.") elif in_ram: self._conn = sqlite3.connect(":memory:") self._conn.enable_load_extension(True) self._conn.load_extension("mod_spatialite") if not new_db: disk_conn = sqlite3.connect(filename) disk_conn.backup(self._conn) disk_conn.close() self._logger.info( f"Database file {filename!r} successfully loaded in RAM." ) else: cursor = conn.cursor() cursor.execute("SELECT InitSpatialMetaData();") self._conn.commit() self._logger.info( f"File {dp_path!r} does not exist, new database created in RAM." ) else: self._conn = sqlite3.connect(filename) self._conn.enable_load_extension(True) self._conn.load_extension("mod_spatialite") if new_db: cursor = conn.cursor() cursor.execute("SELECT InitSpatialMetaData();") self._conn.commit() self._logger.info(f"New database created at {filename!r}.") if geoms_iter is not None: try: self.add_geometries( geoms_iter, geom_type=geom_type, geoms_epsg=geoms_epsg, geoms_tab_name=geoms_tab_name, ) except Exception: if new_db and not in_ram: self._logger.error( "An error occurred while adding geometries to the database, " f"deleting file {filename!r}..." ) os.remove(filename) else: self._logger.error( "An error occurred while adding geometries to the database..." ) raise @classmethod @property def supported_geom_types(cls) -> list[SpatialiteGeomType]: """`list` of supported geometry types: Types supported by `SQLiteGeomRefDB`. """ return [ "Point", "LineString", "Polygon", "MultiPoint", "MultiLineString", "MultiPolygon", "GeometryCollection", ] @property def filename(self) -> Optional[str]: """Path of the opened database file. The attribute is set to `None` if a new database was created in RAM for this instance. """ return self._filename @property def in_ram(self) -> bool: """``True`` if the database is created/loaded in RAM. ``False`` if the instance is connected to database file on disk. """ return self._in_ram @property def default_epsg(self) -> int: """Default EPSG code of the geometrical features that are added to the database. """ return self._default_epsg @default_epsg.setter def default_epsg(self, value: int) -> None: self.logger.info(f"The 'default_epsg' is now set to {default_epsg}.") self._default_epsg = value @property def logger(self) -> logging.Logger: """Ready configured Logger instance used for logging outputs. """ return self._logger def __del__(self): """Close the connection to the SQLite database and do some cleanup if the instance has been pickled. """ self._conn.close() if hasattr(self, "db_tf") and os.path.isfile(self.db_tf): try: os.remove(self.db_tf) except PermissionError: pass def __getstate__(self): """Get state when pickling the instance. """ db_tf = NamedTemporaryFile(suffix=".db", delete=False) db_tf.close() _update_logger(self.logger, level=None) self.save_db(db_tf.name) _update_logger(self.logger, level=self.logger.getEffectiveLevel()) attrs = self.__dict__.copy() attrs["db_tf"] = db_tf.name attrs["_conn"] = None return attrs def __setstate__(self, state): """Set state when unpickling an instance. """ self.__dict__ = state if self.in_ram: disk_conn = sqlite3.connect(self.db_tf) self._conn = sqlite3.connect(":memory:") disk_conn.backup(self._conn) disk_conn.close() else: self._conn = sqlite3.connect(self.db_tf) self._conn.enable_load_extension(True) self._conn.load_extension("mod_spatialite")
[docs] def save_db(self, filename: str, overwrite: bool = True) -> None: """Save the internal SQLite database to disk. The function saves the internal SQLite database, together with all the geometrical features added with :meth:`add_geometries`, to disk. The path of the resulting output file can later be passed to the ``filename`` argument of the `SQLiteGeomRefDB` class' constructor to load the saved database with all its features. This function is useful only to save loaded-in-RAM databases, as the geometrical features added to a :class:`SQLiteGeomRefDB` instance, with an opened connections to databases that reside on disk, will be saved automatically even after the instance destruction. Parameters ---------- filename : `str` Path of the output database file. overwrite : `bool`, default: ``True`` ``True`` if the output file should overwrite any existing file at path ``filename``, else ``False``. Returns ------- `None` """ if os.path.isfile(filename): if overwrite: self.logger.info(f"File {filename!r} already exists. Removing file...") os.remove(filename) else: self.logger.info( f"File {filename!r} already exists. Saving database aborted." ) return disk_conn = sqlite3.connect(filename) self._conn.backup(disk_conn) disk_conn.close() self.logger.info("Database was saved successfully.")
[docs] def add_geometries( self, geoms_iter: GeometryIterable, geom_type: Optional[SpatialiteGeomType] = None, geoms_epsg: Optional[int] = None, geoms_tab_name: Optional[str] = None, ) -> None: """Add geometrical features to the internal SQLite database. The function adds geometrical features to the internal SQLite database, which can then be used as a "reference dataset" when running other public methods of the `SQLiteGeomRefDB` instance. Parameters ---------- geoms_iter : iterable of `.GeomObject` Iterable of the geometrical features to add to this `SQLiteGeomRefDB` instance. geom_type : `SpatialiteGeomType`, optional Geometry type of the input geometrical features. If the ``geom_type`` is not specified by the user, the function will assume that the input features have the same geometry type as the features already stored in the destination table. geoms_epsg : `int`, optional EPSG code of the input geometrical features. If the ``geoms_epsg`` is not specified by the user, the function will assume that the input features are in the same spatial reference system as the features already stored in the destination table. Also, if the input features are to be stored in a new table of the database and the ``geoms_epsg`` is omitted, the `SSQLiteGeomRefDB` instance will use the EPSG code stored in the :attr:`default_epsg` attribute (if set). geoms_tab_name : `str`, optional Name of the table where the input geometrical features are to be stored in the internal SQLite database. If no argument is passed to the ``geoms_tab_name`` parameter, the function will try to store the input geometrical features into a table named *default_table*. The *default_table* table will be created if it does not already exist in the database. Raises ------ ValueError If ``geom_type`` is not specified, in the case of a new database/table. ValueError If ``geoms_epsg`` is not specified, in the case of a new database/table and if the :attr:`default_epsg` attribute is not set. ValueError If the argument passed to the ``geom_type`` parameter does not match the geometry type of the features already stored in the destination table. Warnings -------- The *geometry type* must be the same for all input features as they are to be stored in the same *geometry column* of the same table, and spatialite does not allow *geometry columns* to have mixed *geometry types*. Returns ------- None """ self.logger.info("Adding geometries to the database...") db_info = self.db_geom_info() ## Coordinates of input geometries are not transformed by ## default. Return the input geometry unchanged. transform_geom = _unchanged_geom cursor = self._conn.cursor() if geom_type is not None and not geom_type in self.supported_geom_types: raise ValueError( f"{geom_type!r} is not a valid value for the 'geom_type' argument! " "Supported geometry types are: {}.".format( ", ".join([f"{gt!r}" for gt in self.supported_geom_types]) ) ) if geoms_epsg is not None: try: geoms_epsg = int(geoms_epsg) _ = pyproj.CRS(geoms_epsg) except (CRSError, ValueError, TypeError): raise ValueError(f"{geoms_epsg!r} is not a valid EPSG code!") if geoms_tab_name is None: geoms_tab_name = "default_table" tab_info = db_info.get(geoms_tab_name, None) if tab_info is None: new_tab = True else: new_tab = False if new_tab: if geom_type is None: raise ValueError( "'geom_type' cannot be passed None for new databases/tables, " "or if no existing geometry table name ('geoms_tab_name') " "was passed as parameter!" ) if geoms_epsg is None: if self.default_epsg is not None: geoms_epsg = self.default_epsg else: raise ValueError( "'geoms_epsg' cannot be passed None if no default EPSG has " "been set!" ) cursor.execute( f"CREATE TABLE {geoms_tab_name} " "(r_id INTEGER PRIMARY KEY AUTOINCREMENT);" ) cursor.execute( f"SELECT AddGeometryColumn ('{geoms_tab_name}', " f"'geometry', {geoms_epsg}, '{geom_type}', 'XY', 1);" ) cursor.execute( f"SELECT CreateSpatialIndex('{geoms_tab_name}', 'geometry');" ) self._conn.commit() else: # if existing table if geom_type is not None and geom_type != tab_info["geom_type"]: raise ValueError( f"The {geom_type!r} geometry type does not match the geometry type " f"of the {geoms_tab_name!r} table!" ) if geoms_epsg is None: geoms_epsg = tab_info["srid"] elif geoms_epsg != tab_info["srid"]: transform_geom = get_transform_func(geoms_epsg, tab_info["srid"]) geoms_epsg = tab_info["srid"] for geom in geoms_iter: cursor.execute( f"INSERT INTO {geoms_tab_name} (geometry) VALUES " f"(GeomFromText('{transform_geom(_to_2D(geom)).wkt}', " f"{geoms_epsg}));" ) self._conn.commit()
[docs] def get_geometries( self, aoi_geom: Optional[GeomObject] = None, aoi_epsg: Optional[int] = None, geoms_tab_name: Optional[str] = None, output_epsg: Optional[int] = None, ) -> Generator[GeomObject]: """Get geometrical features from the internal SQLite database. Generator function which yields geometrical features stored in the internal database. The user can specify the table, or define a limited area to yield the features from. In addition, the spatial reference system of the output geometries can also be specified. Parameters ---------- aoi_geom : `.GeomObject`, optional *Area of interest*, where the geometrical features lies. aoi_epsg : `int`, optional EPSG code of the *area of interest* geometry/ies. geoms_tab_name : `str`, optional Name of the table where the geometrical features are stored in the internal SQLite database. If no argument is passed to the ``geoms_tab_name`` parameter, the function will try to yield geometrical features from a table named *default_table*. output_epsg : `int`, optional EPSG code of the yielded geometrical features. This parameter can be used to transform the yielded geometries to a different Spatial Reference System from the one used in the internal database. Yields ------ `.GeomObject` Geometrical features from the internal SQLite database. Raises ------ ValueError If ``geoms_tab_name`` is not specified and no table named *default_table* exist in the database. """ query_kwargs = dict() ## Coordinates of input geometries are not transformed by ## default. Return the input geometry unchanged. transform_geom = _unchanged_geom if geoms_tab_name is None: geoms_tab_name = "default_table" db_info = self.db_geom_info() tab_info = db_info.get(geoms_tab_name, None) if tab_info is None: raise RuntimeError(f"No {geoms_tab_name!r} table was found in the database!") else: query_kwargs["table"] = geoms_tab_name tab_epsg = tab_info["srid"] ## Coordinates of output geometries are not transformed by ## default. Return the output geometry unchanged. transform_geom = _unchanged_geom if output_epsg is not None: try: output_epsg = int(output_epsg) _ = pyproj.CRS(output_epsg) except (CRSError, ValueError, TypeError): raise ValueError(f"{output_epsg!r} is not a valid EPSG code!") transform_geom = get_transform_func(tab_epsg, output_epsg) if aoi_geom is not None: if aoi_epsg is not None: try: aoi_epsg = int(aoi_epsg) _ = pyproj.CRS(aoi_epsg) except (CRSError, ValueError, TypeError): raise ValueError(f"{aoi_epsg!r} is not a valid EPSG code!") if aoi_epsg != tab_epsg: transform_aoi = get_transform_func(aoi_epsg, tab_epsg) aoi_geom = transform_aoi(aoi_geom) query_kwargs["aoiwkt"] = aoi_geom.wkt query_kwargs["epsg"] = tab_epsg query = self._get_spatial_query( spatial_index=aoi_geom is not None, only_within_aoi=aoi_geom is not None, ) cursor = self._conn.cursor() cursor.execute(query.format(**query_kwargs)) for row in cursor: yield transform_geom(wkb.loads(row[0]))
[docs] def db_geom_info( self, to_stdout: bool = False, count_features: bool = False ) -> Optional[dict]: """Get information on features stored in the internal SQLite database. Get information on the geometrical features such as the name of the table(s) where they are stored, their geometry type(s), spatial reference system(s) and the number of features per table. This information can be returned as `dict` instance, or printed to *stdout*. Parameters ---------- to_stdout : bool, default: ``False`` If set to ``False``, the information is returned as a `dict`. If set to ``True``, the information is written to *stdout*. count_features : bool, default: ``False`` If set to ``True``, the function will also return the number of features/rows per table. If set to ``False``, the features will not be counted. Returns ------- `dict` or `None` If ``to_stdout=False``, returns a `dict` which keys are the table name(s), and which values are information on the individual table(s). This information is itself structured as a `dict`, which key/value pairs indicate for each table the geometry type (key: *geom_type*), the spatial reference system (key: *srid*), and optionally (if ``count_features=True``) the features count (key: *count*). The function returns `None` if ``to_stdout=True``. """ cursor = self._conn.cursor() cursor.execute("SELECT * FROM geometry_columns") info = { tab[0]: {"geom_type": _geom_type_mapping[tab[2]], "srid": tab[4]} for tab in cursor.fetchall() } if count_features: for tab in info.keys(): cursor.execute(f"SELECT COUNT(*) FROM {tab};") info[tab]["count"] = cursor.fetchone()[0] if not to_stdout: return info elif not info: return None else: f1 = "TABLE" f1_width = max(max(len(tab_name) for tab_name in info.keys()), len(f1)) f2 = "GEOMETRY" f2_width = max(max(len(info[k]["geom_type"]) for k in info.keys()), len(f2)) f3 = "EPSG" f3_width = max(max(len(str(info[k]["srid"])) for k in info.keys()), len(f3)) line_tmp = f"{{f1:<{f1_width}}} | {{f2:<{f2_width}}} | {{f3:>{f3_width}}}" if count_features: f4 = "COUNT" f4_width = max(max(len(str(info[k]["count"])) for k in info.keys()), len(f4)) line_tmp += f" | {{f4:>{f4_width}}}" line_tmp += "\n" head = line_tmp.format(**locals()) line_width = len(head) - 1 table = f"\n{head}{'-' * line_width}\n" if count_features: for k, v in info.items(): table += line_tmp.format( f1=k, f2=v["geom_type"], f3=str(v["srid"]), f4=str(v["count"]) ) else: for k, v in info.items(): table += line_tmp.format(f1=k, f2=v["geom_type"], f3=str(v["srid"])) print(table)
@staticmethod def _get_spatial_query(spatial_index: bool = True, only_within_aoi: bool = False, within_aoi: bool = False) -> str: """Builds SQL spatial query templates for accessing the database's features. This function builds SQL spatial query template for fast access to the features stored in the database. It can create templates where the queries use the internal spatial index to search for features within a *search frame*, and/or where the features returned must lie within an *area of interest*. Parameters ---------- spatial_index : bool, default: ``True`` If set to ``True``, create a template for using the spatial index and search for features that lie within a *search frame* and/or an *area of interest*. If set to ``False``, create a template for returning all features of table, without filtering (overrinding both ``only_within_aoi`` and ``within_aoi``). only_within_aoi : bool, default: ``False`` If set to ``True``, create a template that will use the spatial index for searching for features that only intersect with an *area of interest*, without further filtering. If set to ``False``, create a template that may use the spatial index for searching for features lying within an *area of interest* and/or within a *search frame* within_aoi : bool, default: ``False`` If set to ``True``, create a template that will use the spatial index for searching for features that intersect with an *area of interest*. If set to ``False``, the created template may not filter out features based on a *area of interest* (depending on the value passed to the ``only_within_aoi`` parameter). Returns ------- `str` Template of the SQL query. """ query = "SELECT AsBinary(geometry) FROM {{table}}" if not spatial_index: return query.format() + ";" else: query += " {spatial_filter};" sf = " WHERE {{table}}.ROWID IN {aoi}{geom}" if only_within_aoi or within_aoi: aoi = ( "(SELECT ROWID " " FROM SpatialIndex " " WHERE f_table_name = '{table}' " " AND search_frame = GeomFromText('{aoiwkt}', {epsg})) " "AND Intersects(geometry, GeomFromText('{aoiwkt}', {epsg}))" ) else: aoi = "" if only_within_aoi: geom = "" return query.format(spatial_filter=sf.format(**locals())) else: geom = ( "(SELECT ROWID " " FROM SpatialIndex " " WHERE f_table_name = '{table}' " " AND search_frame = GeomFromText('{geomsfwkt}', {epsg}))" ) if within_aoi: aoi += " AND {table}.ROWID IN " return query.format(spatial_filter=sf.format(**locals())) def _geoms_generator( self, geoms_iter, transform_geom, geoms_match, get_search_frame, query, query_kwargs, matching_geoms=True, ) -> Generator[GeomObject]: """Yield (non-)matching features resulting from the SQL query. Function generator that formats a SQL query template (from :meth:`_get_spatial_query`), and executes the resulting query for each input feature. The feature(s) resulting from the queries is/are yielded by the function, depending on whether its/their geometries match the geometry of the input features or not. """ cursor = self._conn.cursor() if matching_geoms: for geom in geoms_iter: geom_reproj = transform_geom(_to_2D(geom)) search_frame = get_search_frame(geom_reproj) query_kwargs["geomsfwkt"] = search_frame.wkt cursor.execute(query.format(**query_kwargs)) if any(geoms_match(geom_reproj, wkb.loads(row[0])) for row in cursor): yield geom else: for geom in geoms_iter: geom_reproj = transform_geom(_to_2D(geom)) search_frame = get_search_frame(geom_reproj) query_kwargs["geomsfwkt"] = search_frame.wkt cursor.execute(query.format(**query_kwargs)) if not any( geoms_match(geom_reproj, wkb.loads(row[0])) for row in cursor ): yield geom
[docs] def true_positives( self, geoms_iter: GeometryIterable, aoi_geom: Optional[GeomObject] = None, geoms_epsg: Optional[int] = None, geoms_tab_name: Optional[str] = None, geoms_match: Callable[[GeomObject, GeomObject], bool] = None, get_search_frame: Callable[[GeomObject], GeomObject] = None, ncores: Optional[int] = None, ) -> Generator[GeomObject]: # , **kwargs): """Identidy *matching* **input** geometries. The function takes as input geometrical features, and searches for *reference features* in one table of the internal database which geometries are considered to *match* that of the *input features*. All *input features* that have a geometry that *matches* the geometry of at least one of the *reference features* will be yielded back by the function. Parameters ---------- geoms_iter : iterable of `.GeomObject` Iterable of input geometrical features to compare to the features of the internal SQLite database. aoi_geom : `.GeomObject`, optional *Area of interest*, within which the database's features must lie. geoms_epsg : `int`, optional EPSG code of the input geometrical features (including ``aoi_geom`` if specified). If the ``geoms_epsg`` is not specified by the user, the function will assume that the *input features* are in the same spatial reference system as the *reference features*. geoms_tab_name : `str`, optional Name of the table where database's features that will be used as reference are stored. If no argument is passed to the ``geoms_tab_name`` parameter, the function will search for *reference features* in a table named *default_table*. geoms_match : `callable`, optional Comparison function that takes two positional arguments: - ``gtest``: *input* geometry (`.GeomObject`) - ``gref``: *reference* geometry (`.GeomObject`) The function returns ``True`` if it finds that both geometries *match*, else returns ``False``. If this parameter is omitted, the *input* geometrical feature will always be considered as a *match* in the case where its *search frame* (see ``get_search_frame`` parameter) interesects with one of the feature from the database's table. get_search_frame : `callable`, optional Function that takes as single argument an *input* geometry (`.GeomObject`) and returns its *search frame* (`.GeomObject`). If this parameter is omitted, the *search frame* will be the same as the *input* geometry. ncores : `int`, optional Number of cores to use for running the function. If unspecified, the function will run in a single process Yields ------ `.GeomObject` *Matching input* geometrical features. Notes ----- If the *spatial reference system* of the *input* geometrical features is different from that of the database's features, the *input features*' coordinates are reprojected on-the-fly, before being compared to features stored in the database. If an *input feature* is considered to be a *match*, it is yielded back unchanged (its coordinates in the original *spatial reference system*). """ self.logger.info("Searching true positive geometries...") query_kwargs = dict() ## Set default behaviors. ## If no function is passed to the 'geoms_match' parameter, ## the "test geometry" will always match the "reference ## geometry". This way, the user can still use the spatial ## index of the internal database and make simple query against ## bounding boxes of the stored geometries, without further ## assessment of whether geometries match each others. if geoms_match is None: geoms_match = _geoms_always_match ## Default search frame is the input geometry itself. if get_search_frame is None: get_search_frame = _unchanged_geom ## Coordinates of input geometries are not transformed by ## default. Return the input geometry unchanged. transform_geom = _unchanged_geom if geoms_tab_name is None: geoms_tab_name = "default_table" db_info = self.db_geom_info() tab_info = db_info.get(geoms_tab_name, None) if tab_info is None: raise RuntimeError(f"No {geoms_tab_name!r} table was found in the database!") else: query_kwargs["table"] = geoms_tab_name tab_epsg = tab_info["srid"] if geoms_epsg is None: geoms_epsg = tab_epsg else: try: geoms_epsg = int(geoms_epsg) _ = pyproj.CRS(geoms_epsg) except (CRSError, ValueError, TypeError): raise ValueError(f"{geoms_epsg!r} is not a valid EPSG code!") if geoms_epsg != tab_epsg: transform_geom = get_transform_func(geoms_epsg, tab_epsg) if aoi_geom is not None: aoi_geom = transform_geom(aoi_geom) query_kwargs["epsg"] = tab_epsg if aoi_geom is not None: query_kwargs["aoiwkt"] = aoi_geom.wkt if ncores is not None: try: ncores = int(ncores) max_cores = mp.cpu_count() - 1 if ncores > max_cores: self.logger.info( f"Value {ncores} passed to the 'ncores' argument is too high " "and may result in performance penalty, setting it down to " f"{max_cores}." ) ncores = max_cores elif ncores < 2: ncores = None except (ValueError, TypeError): raise ValueError( f"{ncores!r} is not a valid value for the 'ncores' argument!" ) query = self._get_spatial_query(within_aoi=aoi_geom is not None) if ncores is not None: tp_gen = self._parallelized_method( ncores, geoms_iter, transform_geom, geoms_match, get_search_frame, query, query_kwargs, matching_geoms=True, method_name="_geoms_generator", ) else: tp_gen = self._geoms_generator( geoms_iter, transform_geom, geoms_match, get_search_frame, query, query_kwargs, matching_geoms=True, ) yield from tp_gen self.logger.info("Done searching true positive geometries.")
[docs] def false_positives( self, geoms_iter: GeometryIterable, aoi_geom: Optional[GeomObject] = None, geoms_epsg: Optional[int] = None, geoms_tab_name: Optional[str] = None, geoms_match: Callable[[GeomObject, GeomObject], bool] = None, get_search_frame: Callable[[GeomObject], GeomObject] = None, ncores: Optional[int] = None, ) -> Generator[GeomObject]: # , **kwargs): """Identify *non-matching* **input** geometries. The function takes as input geometrical features, and searches for *reference features* in one table of the internal database which geometries are considered to *match* that of the *input features*. All *input features* that **DO NOT** have a geometry that matches the geometry of any *reference features* will be yielded back by the function. Parameters ---------- geoms_iter : iterable of `.GeomObject` Iterable of input geometrical features to compare to the features of the internal SQLite database. aoi_geom : `.GeomObject`, optional *Area of interest*, within which the database's features must lie. geoms_epsg : `int`, optional EPSG code of the input geometrical features (including ``aoi_geom`` if specified). If the ``geoms_epsg`` is not specified by the user, the function will assume that the *input features* are in the same spatial reference system as the *reference features*. geoms_tab_name : `str`, optional Name of the table where database's features that will be used as reference are stored. If no argument is passed to the ``geoms_tab_name`` parameter, the function will search for *reference features* in a table named *default_table*. geoms_match : `callable`, optional Comparison function that takes two positional arguments: - ``gtest``: *input* geometry (`.GeomObject`) - ``gref``: *reference* geometry (`.GeomObject`) The function returns ``True`` if it finds that both geometries *match*, else returns ``False``. If this parameter is omitted, the *input* geometrical feature will always be considered as a *match* in the case where its *search frame* (see ``get_search_frame`` parameter) interesects with one of the features from the database's table. get_search_frame : `callable`, optional Function that takes as single argument an *input* geometry (`.GeomObject`) and returns its *search frame* (`.GeomObject`). If this parameter is omitted, the *search frame* will be the same as the *input* geometry. ncores : `int`, optional Number of cores to use for running the function. If unspecified, the function will run in a single process Yields ------ `.GeomObject` *Non-matching input* geometrical features. Notes ----- If the *spatial reference system* of the *input* geometrical features is different from that of the database's features, the *input features*' coordinates are reprojected on-the-fly, before being compared to features stored in the database. If an *input feature* is **NOT** considered to be a *match*, it is yielded back unchanged (its coordinates in the original *spatial reference system*). """ self.logger.info("Searching false positive geometries...") query_kwargs = dict() ## Set default behaviors. ## If no function is passed to the 'geoms_match' parameter, ## the "test geometry" will always match the "reference ## geometry". This way, the user can still use the spatial ## index of the internal database and make simple query against ## bounding boxes of the stored geometries, without further ## assessment of whether geometries match. if geoms_match is None: geoms_match = _geoms_always_match ## Default search frame is the input geometry itself. if get_search_frame is None: get_search_frame = _unchanged_geom ## Coordinates of input geometries are not transformed by ## default. Return the input geometry unchanged. transform_geom = _unchanged_geom if geoms_tab_name is None: geoms_tab_name = "default_table" db_info = self.db_geom_info() tab_info = db_info.get(geoms_tab_name, None) if tab_info is None: raise RuntimeError(f"No {geoms_tab_name!r} table was found in the database!") else: query_kwargs["table"] = geoms_tab_name tab_epsg = tab_info["srid"] if geoms_epsg is None: geoms_epsg = tab_epsg else: try: geoms_epsg = int(geoms_epsg) _ = pyproj.CRS(geoms_epsg) except (CRSError, ValueError): raise ValueError(f"{geoms_epsg!r} is not a valid EPSG code!") if geoms_epsg != tab_epsg: transform_geom = get_transform_func(geoms_epsg, tab_epsg) if aoi_geom is not None: aoi_geom = transform_geom(aoi_geom) query_kwargs["epsg"] = tab_epsg if aoi_geom is not None: query_kwargs["aoiwkt"] = aoi_geom.wkt if ncores is not None: try: ncores = int(ncores) max_cores = mp.cpu_count() - 1 if ncores > max_cores: self.logger.info( f"Value {ncores} passed to the 'ncores' argument is too high " "and may result in performance penalty, setting it down to " f"{max_cores}." ) ncores = max_cores elif ncores < 2: ncores = None except (ValueError, TypeError): raise ValueError( f"{ncores!r} is not a valid value for the 'ncores' argument!" ) query = self._get_spatial_query(within_aoi=aoi_geom is not None) if ncores is not None: fp_gen = self._parallelized_method( ncores, geoms_iter, transform_geom, geoms_match, get_search_frame, query, query_kwargs, matching_geoms=False, method_name="_geoms_generator", ) else: fp_gen = self._geoms_generator( geoms_iter, transform_geom, geoms_match, get_search_frame, query, query_kwargs, matching_geoms=False, ) yield from fp_gen self.logger.info("Done searching false positive geometries.")
[docs] def missing_geometries( self, geoms_iter: GeometryIterable, geom_type: Optional[SpatialiteGeomType] = None, aoi_geom: Optional[GeomObject] = None, geoms_epsg: Optional[int] = None, geoms_tab_name: Optional[str] = None, geoms_match: Callable[[GeomObject, GeomObject], bool] = None, get_search_frame: Callable[[GeomObject], GeomObject] = None, ncores: Optional[int] = None, ) -> Generator[GeomObject]: # , **kwargs): """Identify (missing) *non-matching* **reference** geometries. The function takes as input geometrical features, and searches for *reference features* in one table of the internal database which geometries are **NOT** considered to *match* the geometry of any feature from the input set. All *reference features* that **DO NOT** have a geometry that *matches* the geometry of any *input features* will be yielded by the function. Parameters ---------- geoms_iter : iterable of `.GeomObject` Iterable of input geometrical features to compare to the features of the internal SQLite database. geom_type : `SpatialiteGeomType`, optional Geometry type of the input geometrical features. If the ``geom_type`` is not specified by the user, the function will assume that the *input features* have the same ``geom_type`` as the *reference features*. aoi_geom : `.GeomObject`, optional *Area of interest*, within which the database's features must lie. geoms_epsg : `int`, optional EPSG code of the input geometrical features (including ``aoi_geom`` if specified). If the ``geoms_epsg`` is not specified by the user, the function will assume that the *input features* are in the same spatial reference system as the *reference features*. geoms_tab_name : `str`, optional Name of the table where database's features that will be used as reference are stored. If no argument is passed to the ``geoms_tab_name`` parameter, the function will search for *reference features* in a table named *default_table*. geoms_match : `callable`, optional Comparison function that takes two positional arguments: - ``gtest``: *input* geometry (`.GeomObject`) - ``gref``: *reference* geometry (`.GeomObject`) The function returns ``True`` if it finds that both geometries *match*, else returns ``False``. If this parameter is omitted, the *input* geometrical feature will always be considered as a *match* in the case where its *search frame* (see ``get_search_frame`` parameter) interesects with one of the features from the database's table. get_search_frame : `callable`, optional Function that takes as single argument an *input* geometry (`.GeomObject`) and returns its *search frame* (`.GeomObject`). If this parameter is omitted, the *search frame* will be the same as the *input* geometry. ncores : `int`, optional Number of cores to use for running the function. If unspecified, the function will run in a single process Yields ------ `.GeomObject` *Non-matching reference* geometrical features. Notes ----- If the *spatial reference system* of the *input* geometrical features is different from that of the database's features, the *input features*' coordinates are reprojected on-the-fly, before being compared to features stored in the database. """ self.logger.info("Searching missing geometries...") query_kwargs = dict() if geoms_tab_name is None: geoms_tab_name = "default_table" db_info = self.db_geom_info() tab_info = db_info.get(geoms_tab_name, None) if tab_info is None: raise RuntimeError( f"No {geoms_tab_name!r} table was found in the database!" ) if geom_type is None: self.logger.info( "No geometry type was passed to the 'geom_type' argument. Assuming " "that all input geometries have the same geometry type as the ones " f"stored in the {geoms_tab_name!r} table." ) geom_type = tab_info["geom_type"] query_kwargs["table"] = geoms_tab_name tab_epsg = tab_info["srid"] query_kwargs["epsg"] = tab_epsg if geoms_epsg is None: geoms_epsg = tab_epsg else: try: geoms_epsg = int(geoms_epsg) _ = pyproj.CRS(geoms_epsg) except (CRSError, ValueError, TypeError): raise ValueError(f"{geoms_epsg!r} is not a valid EPSG code!") if geoms_epsg != tab_epsg and aoi_geom is not None: transform_aoi = get_transform_func(geoms_epsg, tab_epsg) aoi_geom = transform_aoi(aoi_geom) if aoi_geom is not None: query_kwargs["aoiwkt"] = aoi_geom.wkt logger_conf = dict() if ncores is not None: try: ncores = int(ncores) max_cores = mp.cpu_count() - 1 if ncores > max_cores: self.logger.info( f"Value {ncores} passed to the 'ncores' argument is too high " "and may result in performance penalty, setting it down to " f"{max_cores}." ) ncores = max_cores elif ncores < 2: ncores = None except (ValueError, TypeError): raise ValueError( f"{ncores!r} is not a valid value for the 'ncores' argument!" ) if ncores is None: logger_conf.update( {"logger_name": str(uuid.uuid1()), "logging_level": None} ) else: logger_conf["logger"] = self.logger input_geoms_db = SQLiteGeomRefDB( geoms_iter=geoms_iter, geom_type=geom_type, geoms_epsg=geoms_epsg, **logger_conf, ) query = self._get_spatial_query( spatial_index=aoi_geom is not None, only_within_aoi=aoi_geom is not None, ) cursor = self._conn.cursor() cursor.execute(query.format(**query_kwargs)) geoms_iter = (wkb.loads(row[0]) for row in cursor) if ncores is not None: mg_gen = input_geoms_db._parallelized_method( ncores, geoms_iter, method_name="false_positives", geoms_epsg=tab_epsg, geoms_match=geoms_match, get_search_frame=get_search_frame, ) else: mg_gen = input_geoms_db.false_positives( geoms_iter, geoms_epsg=tab_epsg, geoms_match=geoms_match, get_search_frame=get_search_frame, ) yield from mg_gen del input_geoms_db self.logger.info("Done searching missing geometries.")
def _parallelized_method( self, ncores, iterable, *method_args, method_name="", **method_kwargs ): """Parallelize function generator methods. The function enables to parallelize (use of multiple cores) methods which take as input an iterable and return a generator. Parameters ---------- ncores : `int` Number of cores to use for parallelizing the method. iterable : `Iterable` Input iterable. Notes ----- The parallelized method can take any number of positional and keyword arguments, as long as the input iterable is their first positional argument. """ if not method_name: method_name = inspect.stack()[1][3] method_obj = getattr(self, method_name, None) if method_obj is None: raise ValueError( f"{method_name!r} is not a valid method name for " f"the class {cls.__name__!r}!" ) # Same logging configuration for the workers as for the parent process. logger_conf = { "name": self.logger.name, "level": self.logger.getEffectiveLevel(), } method_kwargs["logger_conf"] = logger_conf # Split input iterable into ncores lists of equal length. Note: the # use of lists is for compatibility with the mutliprocessing # module. iterables = split_iter_to_lists(iterable, ncores) # Object to collect the results from the methods running in parallele. shared_results_iter = SharedIterator() # Sentinel value for sharing status amongst processes and knowing # when all workers are done. nprocs_done = mp.Value("i", 0) procs = list() for i in range(ncores): procs.append( mp.Process( target=self._wrap_method_return, args=[ method_obj, iterables[i], shared_results_iter, nprocs_done, *method_args, ], kwargs=method_kwargs, ) ) try: for p in procs: p.start() self.logger.info(f"New process spawned (PID: {p.pid}).") ## Must check sentinel value, join cannot be called as the ## spawned processes use a queue object (see: ## https://docs.python.org/3/library/multiprocessing.html#programming-guidelines). while not nprocs_done.value == ncores: time.sleep(0.2) except KeyboardInterrupt: for p in procs: p.terminate() p.join() del iterables raise else: return shared_results_iter def _wrap_method_return( self, method_obj, iterable, shared_results_iter, nprocs_done, *method_args, logger_conf=None, # must be here!! **method_kwargs, ): """Wraps the generators returned by the parallelized method, and appends their elements to a shared object compatible with multiprocessing. """ if logger_conf is None: logger = _setup_logger(name=str(uuid.uuid1()), level=None) else: logger_conf["name"] = ( logger_conf.get("name", type(self).__name__) + f" (PID: {os.getpid()})" ) logger = _setup_logger(**logger_conf) logger.info("Start worker function.") res = list(method_obj(iterable, *method_args, **method_kwargs)) n_geoms = len(res) if n_geoms > 1: logger.info(f"{n_geoms} geometries found.") else: logger.info(f"{n_geoms} geometry found.") shared_results_iter.put_iter(res) with nprocs_done.get_lock(): nprocs_done.value += 1 logger.info("Worker function completed.")