# -*- coding: utf-8 -*-
import inspect
import itertools
import logging
import os
import sys
# from collections import defaultdict
from collections.abc import Iterable, Sequence, Generator
from numbers import Integral
from typing import Literal, NamedTuple, Optional, Union
try:
from osgeo import ogr, osr
ogr.UseExceptions()
except ImportError:
pass
import psycopg2
from shapely import wkb
from shapely.geometry import (
LinearRing,
LineString,
MultiLineString,
MultiPoint,
MultiPolygon,
Point,
Polygon,
)
from .geomutils import _geom_type_mapping, get_transform_func, _unchanged_geom, GeomObject
def _setup_logger(
name: Optional[str] = None, level: int = logging.INFO, show_pid: bool = False
) -> logging.Logger:
"""Setup the logging configuration for a Logger.
Return a ready-configured logging.Logger instance which will write
to 'stdout'.
Parameters
----------
name : `str`, optional
Name of the logging.Logger instance to get. Default is the filename
where the function is called.
level : `int`, default: ``logging.INFO``
Logging level to set to the returned logging.Logger instance.
show_pid : `bool`, default: ``False``
Show the process ID in the log records.
Returns
-------
`logging.Logger`
Ready-configured Logger
"""
if name is None:
name = os.path.basename(inspect.stack()[1].filename)
## Get logger.
logger = logging.getLogger(name)
## Remove existing handlers.
for handler in logger.handlers:
logger.removeHandler(handler)
if level is None:
logger.disabled = True
return logger
## Set basic logging configuration.
if show_pid:
logger.show_pid = True
pid = f"(PID: {os.getpid()}) "
else:
logger.show_pid = False
pid = ""
if level <= logging.DEBUG:
fmt = (
f"%(asctime)s - %(levelname)s - %(name)s {pid}in %(funcName)s "
"(l. %(lineno)d) - %(message)s"
)
else:
fmt = "%(asctime)s - %(levelname)s " f"- %(name)s {pid}- %(message)s"
formatter = logging.Formatter(fmt)
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
logger.addHandler(ch)
logger.setLevel(level)
return logger
def _update_logger(logger: logging.Logger, **kwargs) -> None:
"""Update the configuration of a logging.Logger instance.
Parameters
----------
logger : logging.Logger
Logger instance to be updated.
Keyword arguments:
Configuration parameters of the logging.Logger instance to update,
such as "level" or "show_pid". See function "_setup_logger".
"""
level = kwargs.get("level", logger.getEffectiveLevel())
if level is None:
logger.disabled = True
return
elif "level" in kwargs.keys():
logger.disabled = False
## Set basic logging configuration.
if not hasattr(logger, "show_pid"):
logger.show_pid = False
show_pid = kwargs.get("show_pid", logger.show_pid)
if show_pid:
pid = f"(PID: {os.getpid()}) "
else:
pid = ""
if level <= logging.DEBUG:
fmt = (
f"%(asctime)s - %(levelname)s - %(name)s {pid}in %(funcName)s "
"(l. %(lineno)d) - %(message)s"
)
else:
fmt = "%(asctime)s - %(levelname)s " f"- %(name)s {pid}- %(message)s"
formatter = logging.Formatter(fmt)
for handler in logger.handlers:
handler.setFormatter(formatter)
logger.setLevel(level)
[docs]class ConnectionParameters(NamedTuple):
"""Parameters to open a connection to a PostGIS database.
Instances of this class are intended to be used as parameter for
the `fetch_geoms_from_pg` function.
Attributes
----------
.. host : `str`
Database host address.
.. dbname : `str`
Database name.
.. user : `str`
User name used to authenticate.
.. password : `str`
Password used to authenticate.
.. port : `int`, default: ``5432``
Connection port number.
"""
#: Database host address.
host: str
#: Database name.
dbname: str
#: User name used to authenticate.
user: str
#: Password used to authenticate.
password: str
#: Connection port number.
port: int = 5432
[docs]class SchemaTableColumn(NamedTuple):
"""Location of a geometry column in a PostGIS database.
Instances of this class are intended to be used as parameter for
the `fetch_geoms_from_pg` function.
Attributes
----------
.. schema : `str`
Schema name of the PostGIS database, where the table
containing the geometrical features is located.
.. table : `str`
Table name, where the geometrical features can be found.
.. column : `str`
Column name, where the geometrical features can be found.
"""
#: Schema name of the PostGIS database, where the table containing
#: the geometrical features is located.
schema: str
#: Table name, where the geometrical features can be found.
table: str
#: Column name, where the geometrical features can be found.
column: str
[docs]def fetch_geoms_from_pg(
conn: Optional[psycopg2.extensions.connection] = None,
conn_params: Optional[ConnectionParameters] = None,
sql_query: Optional[str] = None,
geoms_col_loc: Optional[SchemaTableColumn] = None,
aoi: Optional[GeomObject] = None,
aoi_epsg: Optional[int] = None,
output_epsg: Optional[int] = None,
) -> Generator[GeomObject]:
"""Fetch geometrical features from a PostGIS database.
Generator function which connects or uses an existing connection to a
PostGIS database, and yields geometrical features from specified
geometry column (within a given area or not), or based on a
user-defined SQL query. If the connection to the database is opened by
the function, it will be closed automatically after the last
geometrical feature is yielded.
Parameters
----------
conn : `psycopg2.extensions.connection`, optional
Pre-opened connection to the PostGIS database.
conn_params : `ConnectionParameters`, optional
Parameters to open a connection to the PostGIS database.
sql_query : `str`, optional
SQL query to use to extract geometrical features from the PostGIS
database.
geoms_col_loc : `SchemaTableColumn`, optional
Geometry column location within the PostGIS database.
aoi : `.GeomObject`, optional
*Area of interest*, where the geometrical features lies.
aoi_epsg : `int`, optional
EPSG code of the *area of interest* geometry/ies.
output_epsg : `int`, optional
EPSG code of the yielded geometrical features. This parameter can
be used to transform the yielded geometries to a different Spatial
Reference System from the one used in the PostGIS database.
Yields
------
`.GeomObject`
Geometrical features from the PostGIS database.
Raises
------
ValueError
If both ``conn`` and ``conn_params`` parameters are not passed
an argument different from `None`.
ValueError
If both ``sql_query`` and ``geoms_col_loc`` parameters are not
passed an argument different from `None`.
Notes
-----
In the case where the ``sql_query`` parameter is given, the parameters
``geoms_col_loc``, ``aoi``, ``aoi_epsg`` and ``output_epsg`` will be
ignored, as SQL queries can include filtering and reprojection.
"""
if conn is None:
if conn_params is None:
raise ValueError("'conn' and 'conn_params' cannot both be passed None!")
conn = psycopg2.connect(**conn_params._asdict())
close_conn = True
else:
close_conn = False
cursor = conn.cursor()
if sql_query is None:
if geoms_col_loc is None:
raise ValueError(
"'sql_query' and 'geoms_col_loc' cannot both be passed None!"
)
if aoi is not None or output_epsg is not None:
cursor.execute(
f"SELECT Find_SRID('{geoms_col_loc.schema}', "
f"'{geoms_col_loc.table}', "
f"'{geoms_col_loc.column}');"
)
pg_epsg = int(cursor.fetchone()[0])
where_filter = f"WHERE {geoms_col_loc.column} IS NOT NULL"
if aoi is not None:
if aoi_epsg is not None and int(aoi_epsg) != pg_epsg:
transform_aoi = get_transform_func(aoi_epsg, pg_epsg)
aoi = transform_aoi(aoi)
spatial_filter = (
f" AND ST_Intersects({geoms_col_loc.column}, "
f"ST_GeomFromText('{aoi.wkt}', {pg_epsg}));"
)
else:
spatial_filter = ";"
if output_epsg is not None and int(output_epsg) != pg_epsg:
geoms_col_loc = geoms_col_loc._replace(
column=(f"ST_Transform({geoms_col_loc.column}, " f"{output_epsg})")
)
sql_query = (
f"SELECT ST_AsBinary({geoms_col_loc.column}) "
f"FROM {geoms_col_loc.schema}.{geoms_col_loc.table} "
f"{where_filter}{spatial_filter}"
)
cursor.execute(sql_query)
for row in cursor:
yield wkb.loads(row[0].tobytes())
if close_conn:
conn = None
def _get_layer_epsg(layer) -> Optional[int]:
"""Extract and return the EPSG code of an ogr.Layer. Return None
if not found.
"""
lyr_srs = layer.GetSpatialRef()
if lyr_srs is not None and lyr_srs.AutoIdentifyEPSG() == 0:
return int(lyr_srs.GetAuthorityCode(None))
else:
return None
#: Type for identifying layers.
LayerID = Union[str, int]
[docs]class LayerFilter(NamedTuple):
"""Filter for extraction of geometrical features from file.
Instances of this class are intended to be used as parameter for
the `extract_geoms_from_file` function, for filtering and choosing
the geometrical features to extract.
Attributes
----------
.. layer_id : `LayerID`, optional
Name or index of the layer the filter will be applied to. If set
to `None`, the filter will be applied on all layers.
.. aoi : `.GeomObject`, optional
*Area of interest*, where the geometrical features lies. All
features lying outside the *area of interest* will be filtered
out (not extracted).
.. aoi_epsg : `int`, optional
EPSG code of the *area of interest* geometry/ies. If set to
`None`, the same Spatial Reference System as the layer will be
used.
.. attr_filter : `str`, optional
Valid string representation of an attribute filter
(e.g. ``"attr_name = 'value'"``).
.. fids : sequence of `int`, optional
IDs of the features to extract from the layer. This parameter
will be ignored if either the ``aoi`` or the ``attr_filter``
parameters are specified by the user.
"""
#: Name or index of the layer the filter will be applied to. If set to
#: `None`, the filter will be applied on all layers.
layer_id: Optional[LayerID] = None
#: `.GeomObject`, optional: *Area of interest*, where the
#: geometrical features lies. All features lying outside the *area
#: of interest* will be filtered out (not extracted).
aoi: Optional[GeomObject] = None
#: EPSG code of the *area of interest* geometry/ies. If set to `None`,
#: the same Spatial Reference System as the layer will be used.
aoi_epsg: Optional[int] = None
#: Valid string representation of an attribute filter
#: (e.g. ``"attr_name = 'value'"``).
attr_filter: Optional[str] = None
#: IDs of the features to extract from the layer. This parameter will
#: be ignored if either the ``aoi`` or the ``attr_filter`` parameters
#: are specified by the user.
fids: Optional[Sequence[int]] = None
## Type for specifying an Iterable of geometrical features.
GeometryIterable = Iterable[GeomObject]
[docs]def write_geoms_to_file(
filename: str,
driver_name: str,
geoms_iter: GeometryIterable,
geoms_epsg: Optional[int] = None,
layer: Optional[LayerID] = None,
mode: Literal["update", "overwrite"] = "update",
) -> None:
"""Write multiple geometrical features to disk.
The function takes as input an iterable of geometrical features
and writes them to disk using one of the existing `GDAL/OGR drivers
<https://gdal.org/drivers/vector/index.html>`_.
Parameters
----------
filename : `str`
Path to the output file where the geometrical features will be
written to.
driver_name : `str`
Name of the GDAL/OGR driver to use for writing the file.
geoms_iter : iterable of `.GeomObject`
Iterable of the geometrical features to write.
geoms_epsg : `int`, optional
EPSG code of the input geometrical features. If the Spatial
Reference System of the input geometrical features is
specified and differs from that of the layer they will
written to (in case of an update, see``mode`` parameter), the
coordinates of the geometries will be reprojected to the
layer's Spatial Reference System. It is set to `None` as
default (no Spatial Reference System).
layer : `LayerID`, optional
Layer name/index on which to write the input geometries. In
case of a file update (see ``mode`` parameter), the index of
an existing layer can be passed as argument. If layer is set
to `None` (default), the geometrical features will be written,
in ``update`` mode, on the first layer available (at index 0),
if any. If no layer is available, as well as in ``overwrite``
mode, the layer parameter set to `None` will result in the
function writing the input geometries to a layer named
``default`` (if the driver supports named layers).
mode : {``"update"``, ``"overwrite"``}
If set to ``"update"``, the function will update an existing
file, or will create it if it does not exist. If set to
``"overwrite"``, the function will delete any file at the path
set to the ``filename`` parameter, and will create a new file
at this same location.
Returns
-------
`None`
"""
logger = _setup_logger()
try:
from osgeo import ogr, osr
ogr.UseExceptions()
except ImportError:
raise NotImplementedError(
"You must install GDAL/OGR and its Python bindings to call "
f"{inspect.stack()[0].function!r}!"
)
driver = ogr.GetDriverByName(driver_name)
if geoms_epsg is not None:
geoms_epsg = int(geoms_epsg)
srs = osr.SpatialReference()
srs.ImportFromEPSG(geoms_epsg)
else:
srs = None
geoms_iter = iter(geoms_iter)
# geoms_list = iter(geoms_iter)
# if not len(set(g.__class__ for g in geoms_list)) == 1:
# raise ValueError("Cannot process input geometries of different types!")
first_geom = next(geoms_iter)
geom_type = _geom_type_mapping[first_geom.geom_type]
geoms_iter = itertools.chain([first_geom], geoms_iter)
if not mode in ("update", "overwrite"):
raise ValueError(
"Wrong value for the 'mode' argument: must be either 'update' or "
"'overwrite'!"
)
if mode == "update":
_update_geoms_file(
geoms_iter, geom_type, geoms_epsg, srs, filename, driver, layer, logger
)
else:
_write_geoms_file(geoms_iter, geom_type, srs, filename, driver, layer, logger)
def _update_geoms_file(
geoms_iter, geom_type, geoms_epsg, srs, filename, driver, layer, logger
):
"""Update or a create a new file on disk and add input geometries."""
ds = driver.Open(filename, 1)
if ds is None:
_write_geoms_file(
geoms_iter, geom_type, srs, filename, driver, layer, logger
)
return
if layer is not None:
lyr_obj = ds.GetLayer(layer)
if lyr_obj is None and isinstance(layer, Integral):
raise ValueError(f"The layer with index {layer!r} does not exist.")
else:
layer = "default"
lyr_obj = ds.GetLayer()
transform_geom = _unchanged_geom
if lyr_obj is None:
lyr_obj = ds.CreateLayer(layer, srs=srs, geom_type=geom_type)
lyr_def = lyr_obj.GetLayerDefn()
else:
lyr_def = lyr_obj.GetLayerDefn()
lyr_epsg = _get_layer_epsg(lyr_obj)
if geoms_epsg is not None and lyr_epsg is not None and lyr_epsg != geoms_epsg:
logger.info(
f"The spatial reference system of the output file {filename!r}, "
f"layer {layer!r}, is different from that of the input geometry "
"features. The geometry features will be reprojected before being "
"added to the file."
)
transform_geom = get_transform_func(geoms_epsg, lyr_epsg)
else:
logger.info(
f"The spatial reference system of the output file {filename!r}, "
f"layer {layer!r}, could not be found or identified. Input geometry "
"features will be added to the file without transformation."
)
for geom in geoms_iter:
feature = ogr.Feature(lyr_def)
geom = transform_geom(geom)
feature.SetGeometry(ogr.CreateGeometryFromWkt(geom.wkt))
lyr_obj.CreateFeature(feature)
feature = None
ds = None
def _write_geoms_file(geoms_iter, geom_type, srs, filename, driver, layer, logger):
"""Delete any existing path at given, create a new file on disk
and add input geometries.
"""
if isinstance(layer, Integral):
raise TypeError(
"You cannot create a new layer by index, you must specify a layer name."
)
if os.path.exists(filename):
driver.DeleteDataSource(filename)
ds = driver.CreateDataSource(filename)
if layer is None:
layer = "default"
lyr_obj = ds.CreateLayer(layer, srs=srs, geom_type=geom_type)
lyr_def = lyr_obj.GetLayerDefn()
for geom in geoms_iter:
feature = ogr.Feature(lyr_def)
feature.SetGeometry(ogr.CreateGeometryFromWkt(geom.wkt))
lyr_obj.CreateFeature(feature)
feature = None
## Close the output file.
ds = None