# Source code for dave_core.topology.low_voltage

# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details).
# All rights reserved.
# Copyright (c) 2024-2025 DAVE_core contributors
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.


import functools
import operator

from dask_geopandas import from_geopandas
from geopandas import GeoDataFrame
from geopandas import GeoSeries
from pandas import Series
from pandas import concat
from shapely import union_all
from shapely.geometry import LineString
from shapely.geometry import MultiPoint
from shapely.geometry import Point
from shapely.ops import nearest_points

from dave_core.datapool.oep_request import oep_request
from dave_core.geography.geo_utils import nearest_road_points
from dave_core.progressbar import create_tqdm
from dave_core.settings import dave_settings
from dave_core.toolbox import intersection_with_area
from dave_core.toolbox import related_sub


def connect_grid_nodes(road_course, road_points, start_node, end_node):
    """
    This function builds a line which connects two grid nodes with each other along a road
    course

    INPUT:
        **road_course** (list) - coordinate tuples of the road, in road direction \n
        **road_points** (MultiPoint) - the same road coordinates as a shapely geometry; the
        nearest point found in it is looked up in road_course by its (x, y) tuple \n
        **start_node** (tuple) - coordinates of the node the line starts at \n
        **end_node** (tuple) - coordinates of the node the line ends at

    OUTPUT:
        **line** (LineString) - start node, the road vertices between both nodes, end node
    """
    origin = Point(start_node)
    target = Point(end_node)
    # road vertex closest to each of the two nodes
    _, origin_on_road = nearest_points(origin, road_points)
    _, target_on_road = nearest_points(target, road_points)
    # position of these vertices within the road course
    first = road_course.index((origin_on_road.x, origin_on_road.y))
    last = road_course.index((target_on_road.x, target_on_road.y))
    # direct distance between the two nodes, used as reference for both checks below
    node_gap = origin.distance(target)
    # a vertex which is farther away from the opposite node than the own node is does not lie
    # between the two nodes and is therefore skipped
    if target.distance(origin_on_road) > node_gap:
        first += 1
    if origin.distance(target_on_road) > node_gap:
        last -= 1
    # the slice is empty if no road vertex lies between the nodes (direct connection)
    return LineString([start_node, *road_course[first : last + 1], end_node])


def search_line_connections(road_geometry, all_nodes):
    """
    This function searches all nodes which are located on a road and connects them pairwise
    along the road course

    INPUT:
        **road_geometry** (LineString) - geometry of the considered road \n
        **all_nodes** (GeoSeries) - node points which are candidates for a connection

    OUTPUT:
        **lines** (list) - LineStrings between consecutive nodes, empty if no node is located
        on the road
    """
    road_course = list(road_geometry.coords)
    # orient the road so that its first coordinate tuple is not greater than its last one
    if road_course[-1] < road_course[0]:
        road_course.reverse()
    road_points = MultiPoint(road_course)
    # nodes which are (numerically) located on the road, sorted by their coordinate tuples
    remaining = sorted(
        node.coords[0] for node in all_nodes.geometry if road_geometry.distance(node) < 1e-10
    )
    if not remaining:
        return []
    # chain the nodes by always moving on to the nearest node which was not visited yet,
    # starting with the first node of the sorted list
    ordered = [remaining[0]]
    current_position = 0
    while len(remaining) > 1:
        current = Point(remaining.pop(current_position))
        _, closest = nearest_points(current, MultiPoint(remaining))
        closest_coords = closest.coords[0]
        ordered.append(closest_coords)
        current_position = remaining.index(closest_coords)
    # build a line for every pair of consecutive nodes
    return [
        connect_grid_nodes(road_course, road_points, start_node=node_a, end_node=node_b)
        for node_a, node_b in zip(ordered, ordered[1:])
    ]


def line_connections(grid_data):
    """
    This function creates the line connections between the building lines (Points on the roads)
    and the road junctions

    INPUT:
        **grid_data** (attrdict) - all Informations about the grid. Read are
        lv_data.lv_nodes (nodes of type "grid_connection"), roads.roads and
        roads.road_junctions

    OUTPUT:
        Appends the created lines to grid_data.lv_data.lv_lines, nothing is returned
    """
    # define relevant nodes
    lv_nodes = grid_data.lv_data.lv_nodes
    nearest_building_point = GeoSeries(lv_nodes[lv_nodes.node_type == "grid_connection"].geometry)
    # take the junctions via `.geometry` (as the rest of this module does) so that a plain
    # geometry series is concatenated, no matter if they are stored as GeoSeries or GeoDataFrame
    all_nodes = concat(
        [nearest_building_point, grid_data.roads.road_junctions.geometry]
    ).drop_duplicates()
    # search line connections for each road and collect them in one flat list
    line_connect = GeoSeries(
        [
            line
            for road_geometry in grid_data.roads.roads.geometry
            for line in search_line_connections(road_geometry, all_nodes)
        ],
        crs=dave_settings["crs_main"],
    )
    # calculate line length
    # NOTE(review): the length is measured in the units of "crs_main"; the division by 1000
    # only results in km if this is a metric crs - to be confirmed
    lines_gdf = GeoDataFrame(
        {
            "geometry": line_connect,
            "line_type": "line_connections",
            "length_km": line_connect.length / 1000,
            "voltage_kv": 0.4,
            "voltage_level": 7,
            "source": "dave internal",
        },
        crs=dave_settings["crs_main"],
    )
    grid_data.lv_data.lv_lines = concat([grid_data.lv_data.lv_lines, lines_gdf], ignore_index=True)


# Maximum distance between a line end and a road junction up to which the junction is used as
# bus of the line.
# NOTE(review): the value is compared with distances in the units of the node geometries -
# the intended unit should be confirmed
_JUNCTION_MATCH_DISTANCE = 11


def _find_node_name_at(nodes, node_x, node_y, coords):
    """
    Return the dave name of the first node located exactly at coords, None if there is no one

    node_x / node_y are the precomputed x and y coordinates of nodes (same index as nodes)
    """
    hits = nodes[(node_x == coords[0]) & (node_y == coords[1])]
    return None if hits.empty else hits.iloc[0].dave_name


def _nearest_within(candidates, point, max_distance):
    """
    Return the index label of the candidate geometry nearest to point, None if there are no
    candidates or the nearest one is not closer than max_distance
    """
    if candidates.empty:
        return None
    distance = candidates.geometry.distance(point)
    if distance.min() < max_distance:
        return distance.idxmin()
    return None


def _junction_bus_name(grid_data, road_junctions_origin, coords):
    """
    Return the dave name of the road junction node which belongs to a line end at coords

    A junction node which already exists in grid_data.lv_data.lv_nodes is preferred. Otherwise
    the node is created from road_junctions_origin and appended to the lv nodes. None is
    returned if there is no junction close enough in both data sets.
    """
    line_end = Point(coords)
    lv_nodes = grid_data.lv_data.lv_nodes
    # check if there is a suitable road junction in grid data
    junctions_grid = lv_nodes[lv_nodes.node_type == "road_junction"]
    found = _nearest_within(junctions_grid, line_end, _JUNCTION_MATCH_DISTANCE)
    if found is not None:
        return junctions_grid.loc[found].dave_name
    # no road junction was found, create it from road junction data
    found = _nearest_within(road_junctions_origin, line_end, _JUNCTION_MATCH_DISTANCE)
    if found is None:
        return None
    # `.geometry.loc` works for junctions stored as GeoSeries as well as GeoDataFrame
    road_junction_geom = road_junctions_origin.geometry.loc[found]
    # the new node gets the number following the one of the last existing lv node
    dave_number = int(lv_nodes.dave_name.tail(1).iloc[0].replace("node_7_", ""))
    dave_name = f"node_7_{dave_number + 1}"
    junction_point_gdf = GeoDataFrame(
        {
            "geometry": [road_junction_geom],
            "dave_name": dave_name,
            "node_type": "road_junction",
            "voltage_level": 7,
            "voltage_kv": 0.4,
            "source": "dave internal",
        },
        crs=dave_settings["crs_main"],
    )
    grid_data.lv_data.lv_nodes = concat([lv_nodes, junction_point_gdf], ignore_index=True)
    return dave_name


def create_lv_topology(grid_data):
    """
    This function creates the low voltage topology (mv/lv substations, lv nodes and lv lines)
    for the target area

    Steps: request the mv/lv substations (if the dataset has none yet), create building and
    grid connection nodes, create the building lines and the lines along the roads and assign
    a from_bus / to_bus node to each line, creating road junction nodes where necessary.

    INPUT:
        **grid_data** (attrdict) - all Informations about the grid

    OUTPUT:
        Writes data in the DaVe dataset
    """
    # set progress bar for lv topology
    pbar = create_tqdm(desc="create low voltage topology")

    # --- create substations
    # create mv/lv substations
    if grid_data.components_power.substations.mv_lv.empty:
        mvlv_substations, meta_data = oep_request(table="ego_dp_mvlv_substation")
        # add meta data
        if meta_data:
            meta_title = f"{meta_data['Main'].Titel.loc[0]}"
            if meta_title not in grid_data.meta_data.keys():
                grid_data.meta_data[meta_title] = meta_data
        mvlv_substations.rename(
            columns={
                "version": "ego_version",
                "mvlv_subst_id": "ego_subst_id",
            },
            inplace=True,
        )
        # change wrong crs from oep
        mvlv_substations.crs = dave_settings["crs_main"]
        # filter trafos which are within the grid area
        mvlv_substations = intersection_with_area(mvlv_substations, grid_data.area)
        if not mvlv_substations.empty:
            mvlv_substations["voltage_level"] = 6
            # add dave name
            mvlv_substations.reset_index(drop=True, inplace=True)
            mvlv_substations.insert(
                0,
                "dave_name",
                Series([f"substation_6_{x}" for x in mvlv_substations.index]),
            )
            # add mv/lv substations to grid data
            grid_data.components_power.substations.mv_lv = concat(
                [
                    grid_data.components_power.substations.mv_lv,
                    mvlv_substations,
                ],
                ignore_index=True,
            )
    else:
        mvlv_substations = grid_data.components_power.substations.mv_lv.copy()
    # update progress
    pbar.update(5)

    # --- create lv nodes
    # shortest way between building centroid and road for relevant buildings (building
    # connections)
    buildings_rel = concat(
        [grid_data.buildings.residential, grid_data.buildings.commercial],
        ignore_index=True,
    )
    centroids = buildings_rel.reset_index(drop=True).centroid
    centroids = centroids.to_crs(dave_settings["crs_main"])
    # keep only roads which are closer than 1.1e-3 to any road junction; this drops roads
    # which are not connected to other roads
    roads = grid_data.roads.roads
    roads_geom_dask = from_geopandas(roads.geometry, npartitions=dave_settings["cpu_number"])
    all_junctions = union_all(grid_data.roads.road_junctions.geometry)
    roads_filter = roads[roads_geom_dask.distance(all_junctions).compute() < 1.1e-3]
    nearest_building_points = nearest_road_points(
        points=centroids,
        roads=roads_filter.geometry,
    )
    building_connections = concat([centroids, nearest_building_points], axis=1)
    building_connections.columns = ["building_centroid", "nearest_point"]
    # delete duplicates in nearest road points
    building_nearest = GeoSeries(building_connections.nearest_point).drop_duplicates()
    # create lv nodes for the buildings and for their connection points on the roads
    building_nodes_df = GeoDataFrame(
        {
            "geometry": building_connections.building_centroid,
            "node_type": "building_connection",
            "voltage_level": 7,
            "voltage_kv": 0.4,
            "source": "dave internal",
        }
    )
    building_nodes_df = concat(
        [
            building_nodes_df,
            GeoDataFrame(
                {
                    "geometry": building_nearest,
                    "node_type": "grid_connection",
                    "voltage_level": 7,
                    "voltage_kv": 0.4,
                    "source": "dave internal",
                }
            ),
        ],
        ignore_index=True,
    )
    # search for the substations where the lv nodes are within
    sub_infos = building_nodes_df.geometry.apply(lambda x: related_sub(x, mvlv_substations))
    building_nodes_df["ego_subst_id"] = sub_infos.apply(lambda x: x[0])
    building_nodes_df["subst_dave_name"] = sub_infos.apply(lambda x: x[1])
    building_nodes_df["subst_name"] = sub_infos.apply(lambda x: x[2])
    # update progress
    pbar.update(5)
    # add dave name
    # NOTE(review): this insert was marked with "!!!" in the original source without any
    # explanation
    building_nodes_df.reset_index(drop=True, inplace=True)
    building_nodes_df.insert(
        0,
        "dave_name",
        Series([f"node_7_{x}" for x in building_nodes_df.index]),
    )
    # add lv nodes to grid data
    grid_data.lv_data.lv_nodes = concat(
        [grid_data.lv_data.lv_nodes, building_nodes_df], ignore_index=True
    )
    grid_data.lv_data.lv_nodes.crs = dave_settings["crs_main"]
    # update progress
    pbar.update(5)

    # --- create lines for building connections
    line_buildings = GeoSeries(
        [
            LineString([centroid, road_point])
            for centroid, road_point in zip(
                building_connections["building_centroid"],
                building_connections["nearest_point"],
            )
        ],
        crs=dave_settings["crs_main"],
    )
    # calculate line length
    # NOTE(review): the length is measured in the units of "crs_main"; the division by 1000
    # only results in km if this is a metric crs - to be confirmed
    line_gdf = GeoDataFrame(
        {
            "geometry": line_buildings,
            "line_type": "line_buildings",
            "length_km": line_buildings.length / 1000,
            "voltage_kv": 0.4,
            "voltage_level": 7,
            "source": "dave internal",
        }
    )
    # write line informations into grid data
    grid_data.lv_data.lv_lines = concat([grid_data.lv_data.lv_lines, line_gdf], ignore_index=True)
    # set crs
    grid_data.lv_data.lv_lines.crs = dave_settings["crs_main"]
    # create line connections to connect lines for buildings and road junctions with each other
    line_connections(grid_data)
    # add dave name for lv_lines
    grid_data.lv_data.lv_lines.reset_index(drop=True, inplace=True)
    grid_data.lv_data.lv_lines.insert(
        0,
        "dave_name",
        Series([f"line_7_{x}" for x in grid_data.lv_data.lv_lines.index]),
    )
    # update progress
    pbar.update(5)

    # --- create missing road junctions to connect the lines with each other
    # get line bus names for each line and add to line data
    # the exact coordinate match only considers the nodes existing at this point; junction
    # nodes created within the loop are found via the distance search instead
    lv_nodes = grid_data.lv_data.lv_nodes
    node_x = lv_nodes.geometry.x
    node_y = lv_nodes.geometry.y
    # get road junctions
    road_junctions_origin = grid_data.roads.road_junctions
    for _, line in grid_data.lv_data.lv_lines.iterrows():
        line_coords = line.geometry.coords[:]
        line_ends = (("from_bus", line_coords[0]), ("to_bus", line_coords[-1]))
        for bus_column, coords in line_ends:
            bus_name = _find_node_name_at(lv_nodes, node_x, node_y, coords)
            if bus_name is None:
                bus_name = _junction_bus_name(grid_data, road_junctions_origin, coords)
            # a line end without any suitable node keeps its bus unset instead of getting the
            # bus name of a previously handled line
            if bus_name is not None:
                grid_data.lv_data.lv_lines.at[line.name, bus_column] = bus_name
            grid_data.lv_data.lv_nodes.reset_index(drop=True, inplace=True)
        # set crs
        grid_data.lv_data.lv_nodes.set_crs(dave_settings["crs_main"], inplace=True)
        # update progress
        pbar.update(80 / len(grid_data.lv_data.lv_lines))
    # close progress bar
    pbar.close()