# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details). All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
from dask_geopandas import from_geopandas
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from pandas import concat
from shapely.geometry import LineString
from shapely.geometry import Point
from shapely.geometry import Polygon
from dave_core.datapool.osm_request import osm_request
from dave_core.settings import dave_settings
from dave_core.toolbox import intersection_with_area
[docs]
def get_osm_data(grid_data, key, border, target_geom):
"""
This function requests data from osm and filter it
INPUT:
**grid_data** (string) - DAVE data dictionary
**key** (string) - name of the object type which should be considered
**border** (geometrie) - border for the data consideration
**target_geom** (geometrie) - geometry of the considerd target
"""
data, meta_data = osm_request(data_type=key, area=border)
# add meta data
if f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys():
grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data
# check if there are data
if not data.empty:
# filter data parameters which are relevant for the grid modeling
data = data.filter(dave_settings["osm_tags"][key][3])
data.rename(columns={"id": "osm_id"}, inplace=True)
# consider only data which are linestring elements and within considered area
data_dask = from_geopandas(data.geometry, npartitions=dave_settings["cpu_number"])
data = data[
(data_dask.apply(lambda x: isinstance(x, LineString), meta=data_dask).compute())
& (data_dask.intersects(target_geom).compute())
]
data.set_crs(dave_settings["crs_main"], inplace=True)
return data
[docs]
def from_osm(
grid_data,
pbar,
roads,
buildings,
landuse,
railways,
waterways,
target_geom,
progress_step=None,
):
"""
This function searches for data on OpenStreetMap (OSM) and filters the relevant paramerters
for grid modeling
target = geometry of the considerd target
"""
# count object types to consider for progress bar
objects_list = [roads, buildings, landuse, railways, waterways]
objects_con = len([x for x in objects_list if x is True])
if objects_con == 0:
# update progress
pbar.update(progress_step)
# add a buffer to target to get a bigger view for some geographical informations
target_geom_buff = target_geom.buffer(dave_settings["osm_area_buffer"])
# create border for osm query
border = target_geom.convex_hull
border_buffer = target_geom_buff.convex_hull
# search relevant road informations in the target area
if roads:
roads = get_osm_data(grid_data, "road", border_buffer, target_geom_buff)
grid_data.roads.roads = concat([grid_data.roads.roads, roads], ignore_index=True)
# update progress
pbar.update(progress_step / objects_con)
# search landuse informations in the target area
if landuse:
# request landuse information
landuse = get_osm_data(grid_data, "landuse", border_buffer, target_geom_buff)
# request some leisure place information which are relevant as landuse area
leisure = get_osm_data(grid_data, "leisure", border_buffer, target_geom_buff)
# request some natural place information which are relevant as landuse area
natural = get_osm_data(
grid_data, "natural", border.buffer(0.01), target_geom
) # !!! Fehler landuse attribute
# natural parameter in landuse umbenennen und zu landuse hinzufügen?
landuse = concat([landuse, leisure, natural], ignore_index=True)
# check if there are data for landuse
if not landuse.empty:
# convert geometry to polygon
for _, land in landuse.iterrows():
if isinstance(land.geometry, LineString):
# A LinearRing must have at least 3 coordinate tuples
if len(land.geometry.coords[:]) >= 3:
landuse.at[land.name, "geometry"] = Polygon(land.geometry)
else:
landuse.drop([land.name], inplace=True)
elif isinstance(land.geometry, Point):
# delet landuse if geometry is a point
landuse.drop([land.name], inplace=True)
# intersect landuses with the target area
area = grid_data.area.rename(columns={"name": "bundesland"})
# filter landuses which are within the grid area
landuse = intersection_with_area(
landuse, area
) # !!! duplicated with intersection before?
# calculate polygon area in km²
landuse_3035 = landuse.to_crs(dave_settings["crs_meter"])
landuse["area_km2"] = landuse_3035.area / 1e06
# write landuse into grid_data
grid_data.landuse = concat([grid_data.landuse, landuse], ignore_index=True)
grid_data.landuse.set_crs(dave_settings["crs_main"], inplace=True)
# update progress
pbar.update(progress_step / objects_con)
# search building informations in the target area
if buildings:
buildings = get_osm_data(grid_data, "building", border, target_geom)
# check if there are data for buildings
if not buildings.empty:
# create building categories
residential = dave_settings["buildings_residential"]
commercial = dave_settings["buildings_commercial"]
# improve building tag with landuse parameter
if landuse if isinstance(landuse, bool) else not landuse.empty:
landuse_retail = landuse[landuse.landuse == "retail"].geometry.unary_union
landuse_industrial = landuse[landuse.landuse == "industrial"].geometry.unary_union
landuse_commercial = landuse[landuse.landuse == "commercial"].geometry.unary_union
for i, building in buildings.iterrows():
if building.building not in commercial:
if not landuse_retail is None and building.geometry.intersects(
landuse_retail
):
buildings.at[i, "building"] = "retail"
elif not landuse_industrial is None and building.geometry.intersects(
landuse_industrial
):
buildings.at[i, "building"] = "industrial"
elif not landuse_commercial is None and building.geometry.intersects(
landuse_commercial
):
buildings.at[i, "building"] = "commercial"
# write buildings into grid_data
grid_data.buildings.residential = concat(
[
grid_data.buildings.residential,
buildings[buildings.building.isin(residential)],
],
ignore_index=True,
)
grid_data.buildings.commercial = concat(
[
grid_data.buildings.commercial,
buildings[buildings.building.isin(commercial)],
],
ignore_index=True,
)
grid_data.buildings.other = concat(
[
grid_data.buildings.other,
buildings[~buildings.building.isin(residential + commercial)],
],
ignore_index=True,
)
# update progress
pbar.update(progress_step / objects_con)
# search railway informations in the target area
if railways:
railways = get_osm_data(grid_data, "railway", border_buffer, target_geom_buff)
grid_data.railways = concat([grid_data.railways, railways], ignore_index=True)
# update progress
pbar.update(progress_step / objects_con)
# search waterway informations in the target area
if waterways:
waterways = get_osm_data(grid_data, "waterway", border_buffer, target_geom_buff)
grid_data.waterways = concat([grid_data.waterways, waterways], ignore_index=True)
# update progress
pbar.update(progress_step / objects_con)
[docs]
def road_junctions(roads, grid_data):
"""
This function searches junctions for the relevant roads in the target area
"""
roads_3035 = roads.to_crs(dave_settings["crs_meter"])
if not roads_3035.empty:
junction_points = []
while len(roads_3035) > 1:
# considered line
line_geometry = roads_3035.iloc[0].geometry
# check considered line surrounding for possible intersectionpoints with other lines
lines_cross = roads_3035[roads_3035.geometry.crosses(line_geometry.buffer(1))]
if not lines_cross.empty:
# find line intersections between considered line and other lines
line_junctions = line_geometry.intersection(lines_cross.geometry.unary_union)
if line_junctions.geom_type == "Point":
junction_points.append(line_junctions)
elif line_junctions.geom_type == "MultiPoint":
for point in line_junctions.geoms:
junction_points.append(point)
# set new roads quantity for the next iterationstep
roads_3035 = roads_3035.iloc[1:, :]
roads_3035.reset_index(drop=True, inplace=True)
# delet duplicates
junctions = GeoSeries(junction_points).drop_duplicates()
# write road junctions into grid_data
junctions.set_crs(dave_settings["crs_meter"], inplace=True)
junctions = junctions.to_crs(dave_settings["crs_main"])
road_junctions = GeoDataFrame(
{"node_type": "road_junction", "source": "dave internal", "geometry": junctions},
crs="EPSG:4326",
)
grid_data.roads.road_junctions = concat(
[grid_data.roads.road_junctions, road_junctions], ignore_index=True
)
grid_data.roads.road_junctions.set_geometry("geometry", inplace=True)