# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details). All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
import functools
import operator
from dask_geopandas import from_geopandas
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from pandas import Series
from pandas import concat
from shapely.geometry import LineString
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from shapely.ops import nearest_points
from dave_core.datapool.oep_request import oep_request
from dave_core.geography.geo_utils import nearest_road_points
from dave_core.progressbar import create_tqdm
from dave_core.settings import dave_settings
from dave_core.toolbox import intersection_with_area
from dave_core.toolbox import related_sub
[docs]
def connect_grid_nodes(road_course, road_points, start_node, end_node):
"""
This function builds lines to connect grid nodes with each other along road courses
"""
# get considered grid node pair
start_point = Point(start_node)
end_point = Point(end_node)
# find nearest points to them
start_nearest = nearest_points(start_point, road_points)[1]
end_nearest = nearest_points(end_point, road_points)[1]
# find road index
start_index = road_course.index((start_nearest.x, start_nearest.y))
end_index = road_course.index((end_nearest.x, end_nearest.y))
# check if start_nearest between start and end point
if abs(end_point.distance(start_nearest)) > abs(end_point.distance(start_point)):
start_index += 1
# check if end_nearest is between start and end point
if abs(start_point.distance(end_nearest)) > abs(start_point.distance(end_point)):
end_index -= 1
# add points [start_point, points to follow the road course, end point]
line_points = (
[start_node] + [road_course[k] for k in range(start_index, end_index + 1)] + [end_node]
)
# create a lineString and return them
return LineString(line_points)
[docs]
def search_line_connections(road_geometry, all_nodes):
road_course = road_geometry.coords[:]
# change road direction to become a uniformly road style
if road_course[0] > road_course[len(road_course) - 1]:
road_course = road_course[::-1]
road_points = MultiPoint(road_course)
# find nodes on the considered road and sort them by their longitude to find start point
grid_nodes = sorted(
[node.coords[:][0] for node in all_nodes if road_geometry.distance(node) < 1e-10]
)
if grid_nodes: # check if their are grid nodes on the considered road
# sort nodes by their nearest neighbor
grid_nodes_sort = [grid_nodes[0]] # start node
node_index = 0
while len(grid_nodes) > 1: # sort nodes by their sequenz along the road
start_node = Point(grid_nodes.pop(node_index))
grid_nodes_points = MultiPoint(grid_nodes)
next_node = nearest_points(start_node, grid_nodes_points)[1]
grid_nodes_sort.append(next_node.coords[:][0])
node_index = grid_nodes.index(next_node.coords[:][0])
# build lines to connect all grid nodes with each other
return [
connect_grid_nodes(
road_course,
road_points,
start_node=grid_nodes_sort[j],
end_node=grid_nodes_sort[j + 1],
)
for j in range(len(grid_nodes_sort) - 1)
]
else:
return []
[docs]
def line_connections(grid_data):
"""
This function creates the line connections between the building lines (Points on the roads)
and the road junctions
"""
# define relevant nodes
nearest_building_point = GeoSeries(
grid_data.lv_data.lv_nodes[
grid_data.lv_data.lv_nodes.node_type == "grid_connection"
].geometry
)
all_nodes = concat([nearest_building_point, grid_data.roads.road_junctions]).drop_duplicates()
# search line connections
line_connect = GeoSeries(
functools.reduce(
operator.iadd,
grid_data.roads.roads.geometry.apply(
lambda x: search_line_connections(x, all_nodes)
).to_list(),
[],
),
crs=dave_settings["crs_main"],
) # Todo: TypeError: Non geometry data passed to GeoSeries constructor, received data of dtype 'object'
# calculate line length
line_connections_3035 = line_connect.to_crs(dave_settings["crs_meter"])
lines_gdf = GeoDataFrame(
{
"geometry": line_connect,
"line_type": "line_connections",
"length_km": line_connections_3035.length / 1000,
"voltage_kv": 0.4,
"voltage_level": 7,
"source": "dave internal",
},
crs=dave_settings["crs_main"],
)
grid_data.lv_data.lv_lines = concat([grid_data.lv_data.lv_lines, lines_gdf], ignore_index=True)
[docs]
def create_lv_topology(grid_data):
"""
This function creates a dictonary with all relevant geographical
informations for the target area
INPUT:
**grid_data** (attrdict) - all Informations about the grid
OUTPUT:
Writes data in the DaVe dataset
"""
# set progress bar for lv topology
pbar = create_tqdm(desc="create low voltage topology")
# --- create substations
# create mv/lv substations
if grid_data.components_power.substations.mv_lv.empty:
mvlv_substations, meta_data = oep_request(table="ego_dp_mvlv_substation")
# add meta data
if (
bool(meta_data)
and f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys()
):
grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data
mvlv_substations.rename(
columns={
"version": "ego_version",
"mvlv_subst_id": "ego_subst_id",
},
inplace=True,
)
# change wrong crs from oep
mvlv_substations.crs = dave_settings["crs_main"]
# filter trafos which are within the grid area
mvlv_substations = intersection_with_area(mvlv_substations, grid_data.area)
if not mvlv_substations.empty:
mvlv_substations["voltage_level"] = 6
# add dave name
mvlv_substations.reset_index(drop=True, inplace=True)
mvlv_substations.insert(
0,
"dave_name",
Series([f"substation_6_{x}" for x in mvlv_substations.index]),
)
# add ehv substations to grid data
grid_data.components_power.substations.mv_lv = concat(
[
grid_data.components_power.substations.mv_lv,
mvlv_substations,
],
ignore_index=True,
)
else:
mvlv_substations = grid_data.components_power.substations.mv_lv.copy()
# update progress
pbar.update(5)
# --- create lv nodes
# shortest way between building centroid and road for relevant buildings (building connections)
buildings_rel = concat(
[grid_data.buildings.residential, grid_data.buildings.commercial],
ignore_index=True,
)
buildings_rel_3035 = buildings_rel.to_crs(dave_settings["crs_meter"])
centroids = buildings_rel_3035.reset_index(drop=True).centroid
centroids = centroids.to_crs(dave_settings["crs_main"])
# filter roads which are not connected to other roads and roads which build small isolated road structures
roads = grid_data.roads.roads
roads_geom_dask = from_geopandas(roads.geometry, npartitions=dave_settings["cpu_number"])
roads_filter = roads[
roads_geom_dask.distance(grid_data.roads.road_junctions.unary_union).compute() < 1e-8
]
nearest_building_points = nearest_road_points(
points=centroids,
roads=roads_filter.geometry,
)
building_connections = concat([centroids, nearest_building_points], axis=1)
building_connections.columns = ["building_centroid", "nearest_point"]
# delet duplicates in nearest road points
building_nearest = GeoSeries(building_connections.nearest_point)
building_nearest.drop_duplicates(inplace=True)
# add lv nodes to grid data
building_nodes_df = GeoDataFrame(
{
"geometry": building_connections.building_centroid,
"node_type": "building_connection",
"voltage_level": 7,
"voltage_kv": 0.4,
"source": "dave internal",
}
)
building_nodes_df = concat(
[
building_nodes_df,
GeoDataFrame(
{
"geometry": building_nearest,
"node_type": "grid_connection",
"voltage_level": 7,
"voltage_kv": 0.4,
"source": "dave internal",
}
),
],
ignore_index=True,
)
# search for the substations where the lv nodes are within
sub_infos = building_nodes_df.geometry.apply(lambda x: related_sub(x, mvlv_substations))
building_nodes_df["ego_subst_id"] = sub_infos.apply(lambda x: x[0])
building_nodes_df["subst_dave_name"] = sub_infos.apply(lambda x: x[1])
building_nodes_df["subst_name"] = sub_infos.apply(lambda x: x[2])
# update progress
pbar.update(5)
# add dave name
building_nodes_df.reset_index(drop=True, inplace=True)
building_nodes_df.insert(
0,
"dave_name",
Series([f"node_7_{x}" for x in building_nodes_df.index]),
)
# add lv nodes to grid data
grid_data.lv_data.lv_nodes = concat(
[grid_data.lv_data.lv_nodes, building_nodes_df], ignore_index=True
)
grid_data.lv_data.lv_nodes.crs = dave_settings["crs_main"]
# update progress
pbar.update(5)
# --- create lines for building connections
line_buildings = GeoSeries(
list(
map(
lambda x, y: LineString([x, y]),
building_connections["building_centroid"],
building_connections["nearest_point"],
)
),
crs=dave_settings["crs_main"],
)
# calculate line length
line_buildings = line_buildings.set_crs(dave_settings["crs_main"])
line_buildings_3035 = line_buildings.to_crs(dave_settings["crs_meter"])
line_gdf = GeoDataFrame(
{
"geometry": line_buildings,
"line_type": "line_buildings",
"length_km": line_buildings_3035.length / 1000,
"voltage_kv": 0.4,
"voltage_level": 7,
"source": "dave internal",
}
)
# write line informations into grid data
grid_data.lv_data.lv_lines = concat([grid_data.lv_data.lv_lines, line_gdf], ignore_index=True)
# set crs
grid_data.lv_data.lv_lines.crs = dave_settings["crs_main"]
# create line connections to connect lines for buildings and road junctions with each other
line_connections(grid_data)
# add dave name for lv_lines
grid_data.lv_data.lv_lines.reset_index(drop=True, inplace=True)
grid_data.lv_data.lv_lines.insert(
0,
"dave_name",
Series([f"line_7_{x}" for x in grid_data.lv_data.lv_lines.index]),
)
# update progress
pbar.update(5)
# --- create missing road junctions to connect the lines with each other
# get line bus names for each line and add to line data
lv_nodes = grid_data.lv_data.lv_nodes
# get road junctions
road_junctions_origin = grid_data.roads.road_junctions
for _, line in grid_data.lv_data.lv_lines.iterrows():
road_junctions_grid = grid_data.lv_data.lv_nodes[
grid_data.lv_data.lv_nodes.node_type == "road_junction"
]
line_coords_from = line.geometry.coords[:][0]
line_coords_to = line.geometry.coords[:][len(line.geometry.coords[:]) - 1]
from_bus = lv_nodes[lv_nodes.geometry.x == line_coords_from[0]]
if len(from_bus) > 1:
from_bus = from_bus[from_bus.geometry.y == line_coords_from[1]]
to_bus = lv_nodes[lv_nodes.geometry.x == line_coords_to[0]]
if len(to_bus) > 1:
to_bus = to_bus[to_bus.geometry.y == line_coords_to[1]]
if not from_bus.empty:
grid_data.lv_data.lv_lines.at[line.name, "from_bus"] = from_bus.iloc[0].dave_name
else:
# check if there is a suitable road junction in grid data
distance = road_junctions_grid.geometry.apply(
lambda x, line_coords_from=line_coords_from: Point(line_coords_from).distance(x)
)
if not distance.empty and distance.min() < 1e-04:
# road junction node was found
dave_name = road_junctions_grid.loc[distance.idxmin()].dave_name
else:
# no road junction was found, create it from road junction data
distance = road_junctions_origin.geometry.apply(
lambda x, line_coords_from=line_coords_from: Point(line_coords_from).distance(x)
)
if distance.min() < 1e-04:
road_junction_geom = road_junctions_origin.loc[distance.idxmin()]
# create lv_point for relevant road junction
dave_number = int(
grid_data.lv_data.lv_nodes.dave_name.tail(1).iloc[0].replace("node_7_", "")
)
dave_name = "node_7_" + str(dave_number + 1)
junction_point_gdf = GeoDataFrame(
{
"geometry": [road_junction_geom],
"dave_name": dave_name,
"node_type": "road_junction",
"voltage_level": 7,
"voltage_kv": 0.4,
"source": "dave internal",
},
crs=dave_settings["crs_main"],
)
grid_data.lv_data.lv_nodes = concat(
[grid_data.lv_data.lv_nodes, junction_point_gdf],
ignore_index=True,
)
grid_data.lv_data.lv_lines.at[line.name, "from_bus"] = dave_name
grid_data.lv_data.lv_nodes.reset_index(drop=True, inplace=True)
road_junctions_grid = grid_data.lv_data.lv_nodes[
grid_data.lv_data.lv_nodes.node_type == "road_junction"
]
if not to_bus.empty:
grid_data.lv_data.lv_lines.at[line.name, "to_bus"] = to_bus.iloc[0].dave_name
else:
# check if there is a suitable road junction in grid data
distance = road_junctions_grid.geometry.apply(
lambda x, line_coords_to=line_coords_to: Point(line_coords_to).distance(x)
)
if distance.min() < 1e-04:
# road junction node was found
dave_name = road_junctions_grid.loc[distance.idxmin()].dave_name
else:
# no road junction was found, create it from road junction data
distance = road_junctions_origin.geometry.apply(
lambda x, line_coords_to=line_coords_to: Point(line_coords_to).distance(x)
)
if distance.min() < 1e-04:
road_junction_geom = road_junctions_origin.loc[distance.idxmin()]
# create lv_point for relevant road junction
dave_number = int(
grid_data.lv_data.lv_nodes.dave_name.tail(1).iloc[0].replace("node_7_", "")
)
dave_name = "node_7_" + str(dave_number + 1)
junction_point_gdf = GeoDataFrame(
{
"geometry": [road_junction_geom],
"dave_name": dave_name,
"node_type": "road_junction",
"voltage_level": 7,
"voltage_kv": 0.4,
"source": "dave internal",
},
crs=dave_settings["crs_main"],
)
grid_data.lv_data.lv_nodes = concat(
[grid_data.lv_data.lv_nodes, junction_point_gdf],
ignore_index=True,
)
grid_data.lv_data.lv_lines.at[line.name, "to_bus"] = dave_name
grid_data.lv_data.lv_nodes.reset_index(drop=True, inplace=True)
# set crs
grid_data.lv_data.lv_nodes.set_crs(dave_settings["crs_main"], inplace=True)
# update progress
pbar.update(80 / len(grid_data.lv_data.lv_lines))
# close progress bar
pbar.close()