#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# =============================================================================
__author__ = "Simon Nieland, Michael Hardinghaus, María López Díaz"
__copyright__ = (
"Copyright (c) 2024 Institute of Transport Research, German Aerospace Center"
)
__credits__ = [
"Simon Nieland",
"Michael Hardinghaus",
"Marius Lehne",
"María López Díaz",
]
__license__ = "MIT"
__version__ = "0.0.2"
__maintainer__ = "Simon Nieland"
__email__ = "Simon.Nieland@dlr.de"
__status__ = "Development"
# =============================================================================
""" Bikeability computes bike-friendliness of specific areas."""
# =============================================================================
import os
import sys
import geopandas
import math
from pyproj import CRS
from bikeability import settings
import logging as lg
import datetime as dt
import unicodedata as ud
from contextlib import redirect_stdout
from pathlib import Path
from sklearn.cluster import DBSCAN
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from geopandas import GeoDataFrame
import pandas as pd
import warnings
"""Utility functions for bikeability calculation"""
"""
@name : osm.py
@author : Simon Nieland
@date : 26.11.2023
@copyright : Institut fuer Verkehrsforschung, Deutsches Zentrum fuer Luft- und Raumfahrt
"""
project_path = os.path.abspath("../")
sys.path.append(project_path)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
# from geoalchemy2 import Geometry, WKTElement
[docs]
def project_gdf(
gdf: geopandas.GeoDataFrame,
geom_col: str = "geometry",
to_crs: str = None,
to_latlong: bool = False,
) -> geopandas.GeoDataFrame:
"""
Project a GeoDataFrame to the UTM zone appropriate for its geometries' centroid.
The simple calculation in this function works well for most latitudes, but
won't work for some far northern locations like Svalbard and parts of far
northern Norway.
:param geom_col: geometry column of the dataset
:param gdf: the gdf to be projected
:type gdf: Geopandas.GeoDataFrame
:param to_crs: CRS code. if not None,project GeodataFrame to CRS
:type to_crs: int
:param to_latlong: If True, projects to latlong instead of to UTM
:type to_latlong: bool
:return projected_gdf: A projected GeoDataFrame
:rtype projected_gdf: Geopandas.GeoDataFrame
"""
assert len(gdf) > 0, "You cannot project an empty GeoDataFrame."
# if gdf has no gdf_name attribute, create one now
if not hasattr(gdf, "gdf_name"):
gdf.gdf_name = "unnamed"
# if to_crs was passed-in, use this value to project the gdf
if to_crs is not None:
projected_gdf = gdf.to_crs(to_crs)
# if to_crs was not passed-in, calculate the centroid of the geometry to
# determine UTM zone
else:
if to_latlong:
# if to_latlong is True, project the gdf to latlong
latlong_crs = settings.default_crs
projected_gdf = gdf.to_crs(latlong_crs)
else:
# else, project the gdf to UTM
# if GeoDataFrame is already in UTM, just return it
if (gdf.crs is not None) and (gdf.crs.is_geographic is False):
return gdf
# calculate the centroid of the union of all the geometries in the
# GeoDataFrame
avg_longitude = gdf[geom_col].union_all().centroid.x
# calculate the UTM zone from this avg longitude and define the UTM
# CRS to project
utm_zone = int(math.floor((avg_longitude + 180) / 6.0) + 1)
utm_crs = f"+proj = utm + datum = WGS84 + ellps = WGS84 + zone = {utm_zone} + units = m + type = crs"
crs = CRS.from_proj4(utm_crs)
epsg = crs.to_epsg()
projected_gdf = gdf.to_crs(epsg)
projected_gdf.gdf_name = gdf.gdf_name
return projected_gdf
[docs]
def get_centroids(cluster: object) -> tuple:
"""
:param cluster: Point cluster
:return:
"""
centroid = (MultiPoint(cluster).centroid.x, MultiPoint(cluster).centroid.y)
return tuple(centroid)
[docs]
def cluster_intersections_to_crossroad(
nodes: geopandas.GeoDataFrame, verbose: int = 0
) -> geopandas.GeoDataFrame:
"""
Clusters nodes of the street network into real crossroads
:param nodes: nodes of the street network
:param verbose: degree of verbosity
:return: Geodataframe including real crossroads as Points
"""
nodes["y"] = nodes["geometry"].y
nodes["x"] = nodes["geometry"].x
coords = nodes[["y", "x"]].values
# clustering
if verbose > 1:
print(" performing clustering of intersections..")
db = DBSCAN(eps=40, min_samples=1, n_jobs=-1).fit(coords)
cluster_labels = db.labels_
num_clusters = len(set(cluster_labels))
if verbose > 1:
print(" Number of crossroads: {}\n".format(num_clusters))
clusters = pd.Series([coords[cluster_labels == n] for n in range(num_clusters)])
# get cluster centroids
centermost_points = clusters.map(get_centroids)
lats, lons = zip(*centermost_points)
rep_points = pd.DataFrame({"lon": lons, "lat": lats})
rs = rep_points
geometry = [Point(xy) for xy in zip(rs.lon, rs.lat)]
geo_df = GeoDataFrame(rs, crs=nodes.crs, geometry=geometry)
return geo_df
[docs]
def ts(style="datetime", template=None):
"""
Return current local timestamp as a string.
Parameters
----------
style : string {"datetime", "date", "time"}
format the timestamp with this built-in style
template : string
if not None, format the timestamp with this format string instead of
one of the built-in styles
Returns
-------
ts : string
local timestamp string
"""
if template is None:
if style == "datetime":
template = "{:%Y-%m-%d %H:%M:%S}"
elif style == "date":
template = "{:%Y-%m-%d}"
elif style == "time":
template = "{:%H:%M:%S}"
else: # pragma: no cover
msg = f"unrecognized timestamp style {style!r}"
raise ValueError(msg)
return template.format(dt.datetime.now().astimezone())
[docs]
def log(message, level=None, name=None, filename=None):
"""
Write a message to the logger.
This logs to file and/or prints to the console (terminal), depending on
the current configuration of settings.log_file and settings.log_console.
Parameters
----------
message : string
the message to log
level : int
one of Python's logger.level constants
name : string
name of the logger
filename : string
name of the log file, without file extension
Returns
-------
None
"""
if level is None:
level = settings.log_level
if name is None:
name = settings.log_name
if filename is None:
filename = settings.log_filename
# if logging to file is turned on
if settings.log_file:
# get the current logger (or create a new one, if none), then log
# message at requested level
logger = _get_logger(level=level, name=name, filename=filename)
if level == lg.DEBUG:
logger.debug(message)
elif level == lg.INFO:
logger.info(message)
elif level == lg.WARNING:
logger.warning(message)
elif level == lg.ERROR:
logger.error(message)
# if logging to console (terminal window) is turned on
if settings.log_console:
# prepend timestamp then convert to ASCII for Windows command prompts
message = f"{ts()} {message}"
message = (
ud.normalize("NFKD", message).encode("ascii", errors="replace").decode()
)
try:
# print explicitly to terminal in case Jupyter has captured stdout
if getattr(sys.stdout, "_original_stdstream_copy", None) is not None:
# redirect the Jupyter-captured pipe back to original
os.dup2(sys.stdout._original_stdstream_copy, sys.__stdout__.fileno())
sys.stdout._original_stdstream_copy = None
with redirect_stdout(sys.__stdout__):
print(message, file=sys.__stdout__, flush=True)
except OSError:
# handle pytest on Windows raising OSError from sys.__stdout__
print(message, flush=True) # noqa: T201
def _get_logger(level, name, filename):
"""
Create a logger or return the current one if already instantiated.
Parameters
----------
level : int
one of Python's logger.level constants
name : string
name of the logger
filename : string
name of the log file, without file extension
Returns
-------
logger : logging.logger
"""
logger = lg.getLogger(name)
# if a logger with this name is not already set up
if not getattr(logger, "handler_set", None):
# get today's date and construct a log filename
log_filename = Path(settings.logs_folder) / f'{filename}_{ts(style="date")}.log'
# if the logs folder does not already exist, create it
log_filename.parent.mkdir(parents=True, exist_ok=True)
# create file handler and log formatter and set them up
handler = lg.FileHandler(log_filename, encoding="utf-8")
formatter = lg.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(level)
logger.handler_set = True
return logger