# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details). All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
from os import path
from dask_geopandas import from_geopandas
from geopandas import GeoDataFrame
from geopandas import overlay
from geopy.geocoders import ArcGIS
from numpy import append
from numpy import array
from pandas import concat
from scipy.spatial import Voronoi
from shapely.geometry import LineString
from shapely.geometry import MultiLineString
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from shapely.ops import cascaded_union
from shapely.ops import linemerge
from shapely.ops import polygonize
from dave_core.settings import dave_settings
[docs]
def multiline_coords(line_geometry):
"""
This function extracts the coordinates from a MultiLineString
INPUT:
**line_geometry** (Shapely MultiLinesString) - geometry in MultiLineString format
OUTPUT:
**line_coords** (list) - coordinates of the given MultiLineString
"""
merged_line = linemerge(line_geometry)
# sometimes line merge can not merge the lines correctly
line_coords = []
if isinstance(merged_line, MultiLineString):
for line in list(merged_line.geoms):
line_coords += line.coords[:]
else:
line_coords += merged_line.coords[:]
return line_coords
[docs]
def create_interim_area(areas):
"""
This function creats a interim area to combine not connected areas.
INPUT:
**areas** (GeoDataFrame) - all considered grid areas
OUTPUT:
**areas** (GeoDataFrame) - all considered grid areas extended with interim areas
"""
# check if there are diffrent grid areas
if len(areas) > 1:
# check for isolated areas
areas_iso = []
for i, area in areas.iterrows():
# check if the considered area adjoining an other one
areas_other = areas.drop([i])
distance = areas_other.geometry.apply(lambda x, area=area: area.geometry.distance(x))
if distance.min() > 0:
areas_iso.append((i, distance.idxmin()))
# if their are isolated areas, check for a connection on the highest grid level
if len(areas_iso) > 0:
for area_iso in areas_iso:
# filter areas
geom1 = areas.loc[area_iso[0]].geometry
geom2 = areas.loc[area_iso[1]].geometry
# define diffrence area
combined = cascaded_union([geom1, geom2])
convex_hull = combined.convex_hull
difference = convex_hull.difference(geom1)
difference = difference.difference(geom2)
# add difference area to areas
areas = concat(
[
areas,
GeoDataFrame({"name": "interim area", "geometry": [difference]}),
],
ignore_index=True,
)
return areas
[docs]
def voronoi(points, polygon_param=True):
"""
This function calculates the voronoi diagram for given points
INPUT:
**points** (GeoDataFrame) - all nodes for voronoi analysis (centroids)
**polygon_param** (bool, default True) - if True the centroid and dave name for each \
voronoi polygon will be searched
OUTPUT:
**voronoi polygons** (GeoDataFrame) - all voronoi areas for the given points
"""
# define points for voronoi centroids
points = points.reset_index(drop=True) # don't use inplace
voronoi_centroids = [[point.x, point.y] for i, point in points.geometry.items()]
voronoi_points = array(voronoi_centroids)
# maximum points of the considered area define, which limit the voronoi polygons
bound_points = MultiPoint(points.geometry).convex_hull.buffer(1).bounds
points_boundary = [
[bound_points[0], bound_points[1]],
[bound_points[0], bound_points[3]],
[bound_points[2], bound_points[1]],
[bound_points[2], bound_points[3]],
]
# append boundary points to avoid infinit polygons with relevant nodes
voronoi_points = append(voronoi_points, points_boundary, axis=0)
# carry out voronoi analysis
vor = Voronoi(voronoi_points)
# select finit lines and create LineStrings (regions with -1 are infinit)
lines = [LineString(vor.vertices[line]) for line in vor.ridge_vertices if -1 not in line]
# create polygons from the lines
polygons = array(list(polygonize(lines)))
# create GeoDataFrame with polygons
voronoi_polygons = GeoDataFrame(geometry=polygons, crs=dave_settings["crs_main"])
# search voronoi centroids and dave name
if polygon_param:
voronoi_polygons_geom_dask = from_geopandas(
voronoi_polygons.geometry, npartitions=dave_settings["cpu_number"]
)
voronoi_polygons["centroid"] = voronoi_polygons_geom_dask.apply(
lambda x: (
points[points.within(x)].iloc[0].geometry
if not points[points.within(x)].empty
else "fail"
),
meta=voronoi_polygons_geom_dask,
).compute()
voronoi_polygons["dave_name"] = voronoi_polygons_geom_dask.apply(
lambda x: (
points[points.within(x)].iloc[0].dave_name
if not points[points.within(x)].empty
else "fail"
),
meta=voronoi_polygons_geom_dask,
).compute()
return voronoi_polygons
[docs]
def adress_to_coords(adress, geolocator=None):
"""
This function request geocoordinates to a given adress.
INPUT:
**Adress** (string) - format: street_name housenummber postal_code city
example: 'Königstor 59 34119 Kassel'
OUTPUT:
**geocoordinates** (tuple) - geocoordinates for the adress in format (longitude, latitude)
"""
if not geolocator:
geolocator = ArcGIS(timeout=None)
if adress:
location = geolocator.geocode(adress)
return (location.longitude, location.latitude)
[docs]
def get_data_path(filename=None, dirname=None):
"""
This function returns the full os path for a given directory (and filename)
"""
data_path = (
path.join(dave_settings["dave_dir"], "datapool", dirname, filename)
if filename
else path.join(dave_settings["dave_dir"], "datapool", dirname)
)
return data_path
[docs]
def intersection_with_area(gdf, area, remove_columns=True, only_limit=True):
"""
This function intersects a given geodataframe with an area in consideration of mixed geometry
types at both input variables
INPUT:
**gdf** (GeoDataFrame) - Data to be intersect with an area
**area** (GeoDataFrame) - Considered Area
**remove_columns** (bool, default True) - If True the area parameters will deleted in the \
result
**only_limit** (bool, default True) - If True it will only considered if the data \
intersects the area instead of which part of the area they intersect if the area is \
split in multiple polygons
OUTPUT:
**gdf_over** (GeoDataFrame) - Data which intersetcs with considered area
"""
# reduce grid area geometries to one polygon
if only_limit:
area = GeoDataFrame(geometry=[area.geometry.unary_union], crs=dave_settings["crs_main"])
# check if geodataframe has mixed geometries
geom_types_gdf = set(map(type, gdf.geometry))
geom_types_area = set(map(type, area.geometry))
if len(geom_types_gdf) > 1:
# in this case the geodataframe has mixed geometrie information. A seperated consideration
# of overlay is necessary because the function can not handle mixed geometries
gdf_over = GeoDataFrame([])
for geom_type in geom_types_gdf:
# get indeces for geom type
gdf_geom_idx = [
row.name for i, row in gdf.iterrows() if isinstance(row.geometry, (geom_type))
]
# check for values in the target area
gdf_over_geom = overlay(gdf.loc[gdf_geom_idx], area, how="intersection")
gdf_over = concat([gdf_over, gdf_over_geom], ignore_index=True)
elif len(geom_types_area) > 1:
# in this case the geodataframe has mixed geometrie information. A seperated consideration
# of overlay is necessary because the function can not handle mixed geometries
gdf_over = GeoDataFrame([])
for geom_type in geom_types_area:
area_geom_idx = [
row.name for i, row in area.iterrows() if isinstance(row.geometry, (geom_type))
]
# check for values in the target area
gdf_over_geom = overlay(gdf, area.loc[area_geom_idx], how="intersection")
gdf_over = concat(
[gdf_over, gdf_over_geom], ignore_index=True
) # TODO: Problem ist das es hier Population_1 und _2 gibt, daher wirft er einen Fehler
else:
gdf_over = overlay(gdf, area, how="intersection")
# remove parameters from area
if (not gdf_over.empty) and (remove_columns):
remove_columns = area.keys().tolist()
remove_columns.remove("geometry")
gdf_over.drop(columns=remove_columns, inplace=True)
return gdf_over
def intersect_with_composition(gdf1, gdf2, gdf1_name=None, area=None):
"""
This function intersects two GeoDataFrames with each other and calculates the composition how gdf1 will splitted to gdf2
Hint: gdf1 and gdf2 must have "name" and "geometry" parameters
INPUT:
**gdf1** (GeoDataFrame) - Area with polygons to divide
**gdf2** (GeoDataFrame) - Area with polygons or nodes to which gdf1 should be divide
OPTIONAL:
**gdf1_name** (GeoDataFrame, default None) - Gdf1 parameter which includes the area name. \
Per default the first column will taken
**grid_area** (GeoDataFrame, default None) - definition of the consider area
"""
# reduce data to considered area
if area:
gdf1 = intersection_with_area(gdf1, area)
gdf2 = intersection_with_area(gdf2, area)
# voronoi tesselation in case gdf2 is a dataset of points
if isinstance(gdf2.geometry.iloc[0], Point):
gdf2["geometry"] = voronoi(gdf2, polygon_param=False).geometry
# define gdf1 name per default
if not gdf1_name:
gdf1_name = gdf1.keys()[0]
if gdf1_name == "name":
gdf1_name = "gdf1_name"
gdf1.rename(columns={"name": gdf1_name}, inplace=True)
# intersect data with voronoi regions
gdf_intersect = overlay(gdf1, gdf2) # !!! ggf. drop von centorid und dave_name an der stelle
# replace "nan" because "nan" is not equals to nan
gdf_intersect.fillna("None", inplace=True)
# calculate area percentage
gdf_intersect["area_percentage"] = gdf_intersect.geometry.area / gdf_intersect.apply(
lambda x: gdf1[(gdf1[gdf1_name] == x[gdf1_name])].iloc[0].geometry.area,
axis=1,
)
return gdf_intersect