# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details).
# All rights reserved.
# Copyright (c) 2024-2026 DAVE_core contributors
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.
from dask_geopandas import from_geopandas
from geopandas import GeoDataFrame
from pandas import concat
from shapely import wkb
from shapely.geometry import MultiPoint
from shapely.geometry import Polygon
from shapely.ops import nearest_points
from dave_core.components.substations import create_hv_mv_substations
from dave_core.components.substations import create_mv_lv_substations
from dave_core.datapool.oep_request import oep_request
from dave_core.progressbar import create_tqdm
from dave_core.settings import dave_settings
from dave_core.toolbox import add_dave_name
from dave_core.toolbox import intersection_with_area
def create_transformer(
substation, mv_buses, lv_buses
): # !!! hvmv kann hiermit auch gemacht werden, dafür die Funktion verallgemeinern
mv_buses_mvlv = mv_buses[mv_buses.node_type == "mvlv_substation"]
if (not mv_buses_mvlv.empty) and (substation.ego_subst_id in mv_buses.ego_subst_id.tolist()):
bus_hv = mv_buses[mv_buses.ego_subst_id == substation.ego_subst_id].iloc[0].dave_name
else:
# find closest mv node to the substation
multipoints_mv = MultiPoint(mv_buses.geometry.tolist())
nearest_point = nearest_points(substation.geometry, multipoints_mv)[1]
bus_hv = mv_buses[mv_buses.geometry == nearest_point].iloc[0].dave_name
# get lv bus
if ("ego_subst_id" in lv_buses.keys()) and (
substation.ego_subst_id in lv_buses.ego_subst_id.tolist()
):
bus_lv = lv_buses[lv_buses.ego_subst_id == substation.ego_subst_id].iloc[0].dave_name
else:
# find closest lv node to the substation (only road junctions allowed)
multipoints_lv = MultiPoint(
lv_buses[lv_buses.node_type == "road_junction"].geometry.tolist()
)
nearest_point = nearest_points(substation.geometry, multipoints_lv)[1]
bus_lv = lv_buses[lv_buses.geometry == nearest_point].iloc[0].dave_name
# create transformer
return [
bus_hv,
bus_lv,
dave_settings["mv_voltage"], # voltage_kv_hv
0.4, # voltage_kv_lv
6, # voltage_level
substation.ego_version,
substation.ego_subst_id,
substation.geometry,
]
[docs]
def create_ehv_hv_trafos(grid_data, power_levels, pbar):
"""
This function generates ehv/ehv and ehv/hv transformers
"""
# read transformator data from OEP, filter current exsist ones and rename paramter names
hv_trafos, meta_data = oep_request(table="ego_pf_hv_transformer")
# add meta data
if bool(meta_data) and f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys():
grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data
hv_trafos.rename(
columns={
"version": "ego_version",
"scn_name": "ego_scn_name",
"trafo_id": "ego_trafo_id",
"s_nom": "s_nom_mva",
},
inplace=True,
)
hv_trafos = hv_trafos[hv_trafos.ego_scn_name == "Status Quo"]
# filter transformers which are within the grid area
hv_trafos = intersection_with_area(hv_trafos, grid_data.area)
# update progress
pbar.update(5)
# in case of missing ehv/hv-level, nodes for the transformator must be procured from OEP
if ("ehv" in power_levels and grid_data.hv_data.hv_nodes.empty) or (
"hv" in power_levels and grid_data.ehv_data.ehv_nodes.empty
):
# read ehv/hv node data from OpenEnergyPlatform and adapt names
ehvhv_buses, meta_data = oep_request(table="ego_pf_hv_bus")
# add meta data
if (
bool(meta_data)
and f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys()
):
grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data
ehvhv_buses.rename(
columns={
"version": "ego_version",
"scn_name": "ego_scn_name",
"bus_id": "ego_bus_id",
"v_nom": "voltage_kv",
},
inplace=True,
)
ehvhv_buses = ehvhv_buses[ehvhv_buses.ego_scn_name == "Status Quo"]
# filter buses which are within the grid area
ehvhv_buses = intersection_with_area(ehvhv_buses, grid_data.area)
# update progress
pbar.update(5)
# search for trafo voltage and create missing nodes
for _, trafo in hv_trafos.iterrows():
if "ehv" in power_levels:
ehv_bus0 = grid_data.ehv_data.ehv_nodes[
grid_data.ehv_data.ehv_nodes.ego_bus_id == trafo.bus0
]
ehv_bus1 = grid_data.ehv_data.ehv_nodes[
grid_data.ehv_data.ehv_nodes.ego_bus_id == trafo.bus1
]
if not ehv_bus0.empty:
hv_trafos.at[trafo.name, "voltage_kv_lv"] = grid_data.ehv_data.ehv_nodes.loc[
ehv_bus0.index[0]
].voltage_kv
if not ehv_bus1.empty:
hv_trafos.at[trafo.name, "voltage_kv_hv"] = grid_data.ehv_data.ehv_nodes.loc[
ehv_bus1.index[0]
].voltage_kv
if ("hv" not in power_levels) and (ehv_bus0.empty):
hv_buses = ehvhv_buses[ehvhv_buses.voltage_kv.isin([110])]
hv_bus0 = hv_buses[hv_buses.ego_bus_id == trafo.bus0]
if not hv_bus0.empty:
hv_trafos.at[trafo.name, "voltage_kv_lv"] = hv_buses.loc[
hv_bus0.index[0]
].voltage_kv
# check if node allready exsist, otherwise create them
if grid_data.hv_data.hv_nodes.empty:
hv_bus0["voltage_level"] = 3
hv_bus0["source"] = "OEP"
grid_data.hv_data.hv_nodes = concat(
[grid_data.hv_data.hv_nodes, hv_bus0],
ignore_index=True,
)
elif grid_data.hv_data.hv_nodes[
grid_data.hv_data.hv_nodes.ego_bus_id == trafo.bus0
].empty:
hv_bus0["voltage_level"] = 3
hv_bus0["source"] = "OEP"
grid_data.hv_data.hv_nodes = concat(
[grid_data.hv_data.hv_nodes, hv_bus0],
ignore_index=True,
)
if "hv" in power_levels:
hv_bus0 = grid_data.hv_data.hv_nodes[
grid_data.hv_data.hv_nodes.ego_bus_id == trafo.bus0
]
if not hv_bus0.empty:
hv_trafos.at[trafo.name, "voltage_kv_lv"] = grid_data.hv_data.hv_nodes.loc[
hv_bus0.index[0]
].voltage_kv
if ("ehv" not in power_levels) and (not hv_bus0.empty):
ehv_buses = ehvhv_buses[ehvhv_buses.voltage_kv.isin([380, 220])]
ehv_bus1 = ehv_buses[ehv_buses.ego_bus_id == trafo.bus1]
if not ehv_bus1.empty:
hv_trafos.at[trafo.name, "voltage_kv_hv"] = ehv_buses.loc[
ehv_bus1.index[0]
].voltage_kv
# check if node allready exsist, otherwise create them
if grid_data.ehv_data.ehv_nodes.empty:
ehv_bus1["voltage_level"] = 1
ehv_bus1["source"] = "OEP"
grid_data.ehv_data.ehv_nodes = concat(
[grid_data.ehv_data.ehv_nodes, ehv_bus1],
ignore_index=True,
)
elif grid_data.ehv_data.ehv_nodes[
grid_data.ehv_data.ehv_nodes.ego_bus_id == trafo.bus1
].empty:
ehv_bus1["voltage_level"] = 1
ehv_bus1["source"] = "OEP"
grid_data.ehv_data.ehv_nodes = concat(
[grid_data.ehv_data.ehv_nodes, ehv_bus1],
ignore_index=True,
)
# update progress
pbar.update(10 / len(hv_trafos))
# add dave name for nodes which are created for the transformers
if "dave_name" not in grid_data.hv_data.hv_nodes.keys():
grid_data.hv_data.hv_nodes = add_dave_name(grid_data.hv_data.hv_nodes, "node_3")
if "geometry" in grid_data.hv_data.hv_nodes.keys():
grid_data.hv_data.hv_nodes.set_crs(dave_settings["crs_main"], inplace=True)
if "dave_name" not in grid_data.ehv_data.ehv_nodes.keys():
grid_data.ehv_data.ehv_nodes = add_dave_name(grid_data.ehv_data.ehv_nodes, "node_1")
if "geometry" in grid_data.ehv_data.ehv_nodes.keys():
grid_data.ehv_data.ehv_nodes.set_crs(dave_settings["crs_main"], inplace=True)
# write transformator data in grid data and decied the grid level depending on voltage level
if not hv_trafos.empty:
ehv_buses = grid_data.ehv_data.ehv_nodes
hv_buses = grid_data.hv_data.hv_nodes
if "ehv" in power_levels:
ehv_ehv_trafos = hv_trafos[hv_trafos.voltage_kv_lv.isin([380, 220])]
ehv_ehv_trafos["voltage_level"] = 1
# add dave name for trafo and connection buses
ehv_ehv_trafos.insert(
1,
"bus_hv",
ehv_ehv_trafos.bus1.apply(
lambda x: ehv_buses[ehv_buses.ego_bus_id == x].iloc[0].dave_name
),
)
ehv_ehv_trafos.insert(2, "bus_lv", None)
ehv_ehv_trafos["substation_name"] = None
ehv_ehv_trafos["tso_name"] = None
ehv_ehv_trafos.reset_index(drop=True, inplace=True)
for (
_,
trafo,
) in ehv_ehv_trafos.iterrows(): # TODO: ersetzen durch apply functions
# search for bus dave name and replace ego id
bus0 = ehv_buses[ehv_buses.ego_bus_id == trafo.bus0].iloc[0]
ehv_ehv_trafos.at[trafo.name, "bus_lv"] = bus0.dave_name
ehv_ehv_trafos.at[trafo.name, "bus_hv"] = (
ehv_buses[ehv_buses.ego_bus_id == trafo.bus1].iloc[0].dave_name
)
ehv_ehv_trafos.at[trafo.name, "substation_name"] = bus0.subst_name
if "tso_name" in bus0.keys():
ehv_ehv_trafos.at[trafo.name, "tso_name"] = bus0.tso_name
# drop columns with ego_id
ehv_ehv_trafos.drop(columns=["bus0", "bus1"], inplace=True)
# set crs
ehv_ehv_trafos.set_crs(dave_settings["crs_main"], inplace=True)
# add ehv/ehv trafos to grid data
grid_data.components_power.transformers.ehv_ehv = concat(
[
grid_data.components_power.transformers.ehv_ehv,
ehv_ehv_trafos,
],
ignore_index=True,
)
ehv_ehv_trafos = add_dave_name(ehv_ehv_trafos, "trafo_1")
ehv_hv_trafos = hv_trafos[hv_trafos.voltage_kv_lv == 110]
ehv_hv_trafos["voltage_level"] = 2
# add dave name trafo and connection buses
ehv_hv_trafos.insert(1, "bus_hv", None)
ehv_hv_trafos.insert(2, "bus_lv", None)
ehv_hv_trafos["substation_name"] = None
ehv_hv_trafos["tso_name"] = None
ehv_hv_trafos.reset_index(drop=True, inplace=True)
for (
_,
trafo,
) in ehv_hv_trafos.iterrows(): # TODO: ersetzen durch apply functions
# search for bus dave name and replace ego id
bus0 = hv_buses[hv_buses.ego_bus_id == trafo.bus0].iloc[0]
bus1 = ehv_buses[ehv_buses.ego_bus_id == trafo.bus1].iloc[0]
ehv_hv_trafos.at[trafo.name, "bus_lv"] = bus0.dave_name
ehv_hv_trafos.at[trafo.name, "bus_hv"] = bus1.dave_name
if "subst_name" in bus1.keys():
ehv_hv_trafos.at[trafo.name, "substation_name"] = bus1.subst_name
if "tso_name" in bus0.keys():
ehv_hv_trafos.at[trafo.name, "tso_name"] = bus1.tso_name
ehv_hv_trafos = add_dave_name(ehv_hv_trafos, "trafo_2")
# change column name
ehv_hv_trafos.drop(columns=["bus0", "bus1"], inplace=True)
# set crs
ehv_hv_trafos.set_crs(dave_settings["crs_main"], inplace=True)
# add ehv/ehv trafos to grid data
grid_data.components_power.transformers.ehv_hv = concat(
[grid_data.components_power.transformers.ehv_hv, ehv_hv_trafos],
ignore_index=True,
)
# update progress
pbar.update(10)
def create_mv_nodes(substations, node_type):
mv_nodes = substations.copy()
mv_nodes.rename(
columns={"dave_name": "subst_dave_name"},
inplace=True,
)
# set points for geometry
if "geometry" not in mv_nodes.keys() or any(
mv_nodes.geometry.apply(lambda x: isinstance(x, Polygon))
):
mv_nodes["geometry"] = mv_nodes.point.apply(lambda x: wkb.loads(x, hex=True))
# set crs suitable to points and project to right one
mv_nodes.set_crs(dave_settings["crs_degree"], allow_override=True, inplace=True)
mv_nodes.to_crs(dave_settings["crs_main"], inplace=True)
mv_nodes["node_type"] = node_type
mv_nodes["voltage_level"] = 5
mv_nodes["voltage_kv"] = dave_settings["mv_voltage"]
# add oep as source
mv_nodes["source"] = "OEP"
# add dave name
mv_nodes = add_dave_name(mv_nodes, "node_5")
# set crs
mv_nodes.set_crs(dave_settings["crs_main"], inplace=True)
return mv_nodes
[docs]
def create_hv_mv_trafos(grid_data, pbar):
"""
This function generates hv/mv transformers
"""
# get substation information
substations = create_hv_mv_substations(grid_data)
# check if there are substations in the area otherwise no transformer will be created
if not substations.empty:
substations.drop(
columns=[
"power_type",
"substation",
"frequency",
"ref",
"dbahn",
"status",
"otg_id",
"geom",
],
inplace=True,
)
# update progress
pbar.update(10)
# --- prepare hv nodes for the transformers
# check if the hv nodes already exist, otherwise create them
if grid_data.hv_data.hv_nodes.empty:
# --- in this case the missing hv nodes for the transformator must be procured from OEP
# read hv node data from OpenEnergyPlatform and adapt names
hv_nodes, meta_data = oep_request(table="ego_pf_hv_bus")
# add meta data
if (
bool(meta_data)
and f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys()
):
grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data
hv_nodes.rename(
columns={
"version": "ego_version",
"scn_name": "ego_scn_name",
"bus_id": "ego_bus_id",
"v_nom": "voltage_kv",
},
inplace=True,
)
# filter nodes which are on the hv level, current exsist and within the target area
hv_nodes = hv_nodes[
(hv_nodes.voltage_kv == 110) & (hv_nodes.ego_scn_name == "Status Quo")
]
# filter nodes which are within the grid area
hv_nodes = intersection_with_area(hv_nodes, grid_data.area)
hv_nodes["voltage_level"] = 3
hv_nodes["source"] = "OEP"
hv_nodes.drop(
columns=(["current_type", "v_mag_pu_min", "v_mag_pu_max", "geom"]),
inplace=True,
)
# check for hv nodes within hv/mv substations
substations_keys = substations.keys().tolist()
substations_keys = [
x for x in substations_keys if x not in ["ego_subst_id", "geometry"]
]
substations_reduced = substations.drop(columns=substations_keys)
# filter nodes which are within a substation
hv_nodes = intersection_with_area(hv_nodes, substations_reduced, remove_columns=False)
# add dave name
hv_nodes = add_dave_name(hv_nodes, "node_3")
# set crs
hv_nodes.set_crs(dave_settings["crs_main"], inplace=True)
# add mv nodes to grid data
grid_data.hv_data.hv_nodes = concat(
[grid_data.hv_data.hv_nodes, hv_nodes], ignore_index=True
)
else:
hv_nodes = grid_data.hv_data.hv_nodes
# update progress
pbar.update(10)
# --- prepare mv nodes for the transformers
# check for mv nodes within hv/mv substations if they were created in mv topology function
# Otherwise create missing mv nodes
if grid_data.mv_data.mv_nodes.empty:
# --- in this case the missing mv nodes for the transformers must be created
mv_nodes = create_mv_nodes(substations, "hvmv_substation")
# add mv nodes to grid data
grid_data.mv_data.mv_nodes = concat(
[grid_data.mv_data.mv_nodes, mv_nodes], ignore_index=True
)
else:
mv_nodes = grid_data.mv_data.mv_nodes
# create hv/mv transfromers
for _, sub in substations.iterrows():
# get hv bus
if ("ego_subst_id" in hv_nodes.keys()) and (
sub.ego_subst_id in hv_nodes.ego_subst_id.tolist()
):
bus_hv = hv_nodes[hv_nodes.ego_subst_id == sub.ego_subst_id].iloc[0].dave_name
else:
# find closest hv node to the substation
multipoints_hv = MultiPoint(hv_nodes.geometry.tolist())
nearest_point = nearest_points(sub.geometry.centroid, multipoints_hv)[1]
bus_hv = hv_nodes[hv_nodes.geometry == nearest_point].iloc[0].dave_name
# get lv bus
mv_nodes_hvmv = mv_nodes[mv_nodes.node_type == "hvmv_substation"]
if (not mv_nodes_hvmv.empty) and (
sub.ego_subst_id in mv_nodes_hvmv.ego_subst_id.tolist()
):
bus_lv = (
mv_nodes_hvmv[mv_nodes_hvmv.ego_subst_id == sub.ego_subst_id].iloc[0].dave_name
)
else:
# find closest mv node to the substation
multipoints_hv = MultiPoint(mv_nodes.geometry.tolist())
nearest_point = nearest_points(sub.geometry.centroid, multipoints_hv)[1]
bus_lv = hv_nodes[hv_nodes.geometry == nearest_point].iloc[0].dave_name
# create transformer
trafo_df = GeoDataFrame(
{
"bus_hv": bus_hv,
"bus_lv": bus_lv,
"voltage_kv_hv": [110],
"voltage_kv_lv": [dave_settings["mv_voltage"]],
"voltage_level": [4],
"ego_version": sub.ego_version,
"ego_subst_id": sub.ego_subst_id,
"osm_id": sub.osm_id,
"osm_www": sub.osm_www,
"substation_name": sub.subst_name,
"operator": sub.operator,
"Gemeindeschluessel": sub.Gemeindeschluessel,
"geometry": [sub.geometry.centroid],
},
crs=dave_settings["crs_main"],
)
grid_data.components_power.transformers.hv_mv = concat(
[grid_data.components_power.transformers.hv_mv, trafo_df],
ignore_index=True,
)
# update progress
pbar.update(9.98 / len(substations))
# add dave name
grid_data.components_power.transformers.hv_mv = add_dave_name(
grid_data.components_power.transformers.hv_mv, "trafo_4"
)
else:
# update progress
pbar.update(30)
[docs]
def create_mv_lv_trafos(grid_data, power_levels, pbar):
"""
This function generates mv/lv transformers
"""
# --- get substation information
substations = create_mv_lv_substations(grid_data)
substations.drop(
columns=(
list({"la_id", "geom", "subst_id", "is_dummy", "subst_cnt"} & set(substations.keys()))
),
inplace=True,
)
# update progress
pbar.update(10)
# --- prepare mv nodes for the transformers
# check if the mv nodes already exist, otherwise create them
if grid_data.mv_data.mv_nodes.empty:
# --- in this case the missing mv nodes for the transformator must be created
mv_buses = create_mv_nodes(substations, "mvlv_substation")
# add mv nodes to grid data
grid_data.mv_data.mv_nodes = concat(
[grid_data.mv_data.mv_nodes, mv_buses], ignore_index=True
)
else:
mv_buses = grid_data.mv_data.mv_nodes
# --- prepare lv nodes for the transformers
# check if the lv nodes already exist, otherwise create them
if grid_data.lv_data.lv_nodes.empty:
# --- in this case the missing lv nodes for the transformator must be created
lv_buses = substations.copy()
lv_buses.rename(
columns={"dave_name": "subst_dave_name"},
inplace=True,
)
lv_buses["node_type"] = "mvlv_substation"
lv_buses["voltage_level"] = 7
lv_buses["voltage_kv"] = 0.4
# add oep as source
lv_buses["source"] = "OEP"
# add dave name
lv_buses = add_dave_name(lv_buses, "node_7")
# set crs
lv_buses.set_crs(dave_settings["crs_main"], inplace=True)
# add mv nodes to grid data
grid_data.lv_data.lv_nodes = concat(
[grid_data.lv_data.lv_nodes, lv_buses], ignore_index=True
)
else:
lv_buses = grid_data.lv_data.lv_nodes
# update progress
pbar.update(10)
# create mv/lv transfromers
substations_dask = from_geopandas(substations, npartitions=dave_settings["cpu_number"])
trafo_gdf = GeoDataFrame(
columns=[
"bus_hv",
"bus_lv",
"voltage_kv_hv",
"voltage_kv_lv",
"voltage_level",
"ego_version",
"ego_subst_id",
"geometry",
]
)
trafo_gdf[
[
"bus_hv",
"bus_lv",
"voltage_kv_hv",
"voltage_kv_lv",
"voltage_level",
"ego_version",
"ego_subst_id",
"geometry",
]
] = substations_dask.apply(
lambda x: create_transformer(x, mv_buses, lv_buses),
axis=1,
result_type="expand",
meta=trafo_gdf,
).compute()
trafo_gdf.set_geometry("geometry", inplace=True, crs=dave_settings["crs_main"])
grid_data.components_power.transformers.mv_lv = concat(
[grid_data.components_power.transformers.mv_lv, trafo_gdf],
ignore_index=True,
)
# add a synthetic tranformer on the first grid node if necessary
if grid_data.components_power.transformers.mv_lv.empty:
if "mv" not in power_levels:
buses_lv = grid_data.lv_data.lv_nodes
first_bus = buses_lv[buses_lv.node_type == "road_junction"].iloc[0]
if grid_data.mv_data.mv_nodes.empty:
# if there are no mv nodes, create a new one from data of the first lv bus
dave_name = "node_5_0"
mv_bus_df = GeoDataFrame(
{
"dave_name": dave_name,
"voltage_kv": [dave_settings["mv_voltage"]],
"voltage_level": [5],
"geometry": [first_bus.geometry],
"source": "dave internal",
},
crs=dave_settings["crs_main"],
)
grid_data.mv_data.mv_nodes = concat(
[grid_data.mv_data.mv_nodes, mv_bus_df], ignore_index=True
)
bus_hv = dave_name
# !!! missing an else statement in case there are allready mv nodes (not empty)
bus_lv = first_bus.dave_name
# create transformer
trafo_df = GeoDataFrame(
{
"bus_hv": bus_hv,
"bus_lv": bus_lv,
"voltage_kv_hv": [dave_settings["mv_voltage"]],
"voltage_kv_lv": [0.4],
"voltage_level": [6],
"geometry": [first_bus.geometry],
},
crs=dave_settings["crs_main"],
)
grid_data.components_power.transformers.mv_lv = concat(
[grid_data.components_power.transformers.mv_lv, trafo_df],
ignore_index=True,
)
elif "lv" not in power_levels:
pass
# noch definieren
# add dave name
grid_data.components_power.transformers.mv_lv = add_dave_name(
grid_data.components_power.transformers.mv_lv, "trafo_6"
)
# update progress
pbar.update(10)
def create_transformers(grid_data):
"""
Collect transformers for different voltage levels.
- EHV/EHV and EHV/HV trafos → ego_pf_hv_transformer from OEP
- HV/MV trafos → ego_dp_hvmv_substation from OEP
- MV/LV trafos → ego_dp_mvlv_substation from OEP
"""
pbar = create_tqdm(desc="create transformers")
# --- normalize power_levels ---
raw = grid_data.target_input.power_levels
power_levels = [
str(level).lower() for sub in raw for level in (sub if isinstance(sub, list) else [sub])
]
# --- EHV/HV transformers ---
if any(x in power_levels for x in ["ehv", "hv"]) and not (
grid_data.ehv_data.ehv_nodes.empty and grid_data.hv_data.hv_nodes.empty
):
create_ehv_hv_trafos(grid_data, power_levels, pbar)
else:
pbar.update(30)
# --- HV/MV transformers ---
if (
any(x in power_levels for x in ["hv", "mv"])
and grid_data.components_power.transformers.hv_mv.empty
):
create_hv_mv_trafos(grid_data, pbar)
else:
pbar.update(30)
# --- MV/LV transformers ---
if (
any(x in power_levels for x in ["mv", "lv"])
and grid_data.components_power.transformers.mv_lv.empty
):
create_mv_lv_trafos(grid_data, power_levels, pbar)
else:
pbar.update(30)
# --- finish ---
pbar.update(10)
pbar.close()
# lv_nodes find closest node, hierbei wenn distanz mehr als 50 m dann leitung erstellen auf
# lv ebene. schauen ob bereits ein Knoten existiert (distance <=10E-05?) da die bei mv ebene
# schon erstellt werden, ansonsonsten knoten erstellen an dem trafo und mit dem nächsten
# knoten verbinden + Leitung diese zusätzzliche Leitung auch bei hv/mv trafos mit rein