Source code for dave_core.components.loads

# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details). All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.


from math import acos
from math import sin

from geopandas import GeoDataFrame
from numpy import array
from numpy import random
from pandas import concat
from shapely.geometry import LineString
from shapely.geometry import MultiLineString
from shapely.geometry import Polygon
from shapely.ops import polygonize
from shapely.ops import unary_union

from dave_core.datapool.osm_request import query_osm
from dave_core.datapool.read_data import read_federal_states
from dave_core.datapool.read_data import read_household_consumption
from dave_core.datapool.read_data import read_postal
from dave_core.progressbar import create_tqdm
from dave_core.settings import dave_settings
from dave_core.toolbox import intersection_with_area
from dave_core.toolbox import voronoi


[docs] def get_household_power(consumption_data, household_size): """ This function calculates the active and reactive power consumption for a given houshold size based on the consumption data for a year INPUT: **consumption_data** (Dict) - consumption data for germany from dave internal datapool **household_size** (int) - size of the houshold between 1 and 5 person """ # set power factor household_consumption = consumption_data["household_consumptions"][ consumption_data["household_consumptions"]["Personen pro Haushalt"] == household_size ] p_mw = (household_consumption.iloc[0]["Durchschnitt [kwh/a]"] / 1000) / dave_settings[ "h_per_a" ] q_mvar = ( p_mw * sin(acos(dave_settings["cos_phi_residential"])) / dave_settings["cos_phi_residential"] ) return p_mw, q_mvar
[docs] def create_loads(grid_data): """ This function creates loads by osm landuse polygons in the target area an assigne them to a suitable node on the considered voltage level by voronoi analysis """ # set progress bar pbar = create_tqdm(desc="create electrical loads") # define avarage load values residential_load = dave_settings["residential_load"] industrial_load = dave_settings["industrial_load"] commercial_load = dave_settings["commercial_load"] # set power factor for loads cos_phi_residential = dave_settings["cos_phi_residential"] cos_phi_industrial = dave_settings["cos_phi_industrial"] cos_phi_commercial = dave_settings["cos_phi_commercial"] # define power_levels power_levels = grid_data.target_input.power_levels[0] # create loads on grid level 7 (LV) if "lv" in power_levels and not grid_data.lv_data.lv_nodes.empty: # get lv building nodes building_nodes = grid_data.lv_data.lv_nodes[ grid_data.lv_data.lv_nodes.node_type == "building_connection" ] # --- create lv loads for residential federal_states, meta_data = read_federal_states() # add meta data if f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys(): grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data federal_states.rename(columns={"name": "federal state"}, inplace=True) # intersect residential buildings with federal state areas to get the suitable federal state buildings_feds = intersection_with_area( grid_data.buildings.residential, federal_states, remove_columns=False, ) buildings_feds.drop( columns=federal_states.keys().drop("federal state").drop("geometry"), inplace=True, ) # read consumption data consumption_data, meta_data = read_household_consumption() # update progress pbar.update(10) # add meta data if f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys(): grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data # get population for the diffrent areas if grid_data.target_input.iloc[0].typ in [ "postalcode", "town name", "federal state", ]: population_area = grid_data.area else: # --- Case for own shape as input data # calculate proportions of postal area for grid area postals, meta_data = read_postal() # add meta data if f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys(): grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data # filter postal code areas which are within the grid area postal_own_intersection = intersection_with_area( postals, grid_data.area, remove_columns=False ) postal_own_intersection.rename( columns={"population": "population_origin"}, inplace=True ) # filter landuses which are within postal code areas postal_own_landuse = intersection_with_area( grid_data.landuse, postal_own_intersection, remove_columns=False, ) for i, postal in postal_own_intersection.iterrows(): # --- calculate full plz residential area border = ( postals[postals.postalcode == postal.postalcode].iloc[0].geometry.convex_hull ) # Obtain data from OSM plz_residential, meta_data = query_osm( "way", border, recurse="down", tags=['landuse~"residential"'], ) # add meta data if f"{meta_data['Main'].Titel.loc[0]}" not in grid_data.meta_data.keys(): grid_data.meta_data[f"{meta_data['Main'].Titel.loc[0]}"] = meta_data # filter non Linestring objects drop_objects = [ obj.name for j, obj in plz_residential.iterrows() if not isinstance(obj.geometry, LineString) ] plz_residential.drop(drop_objects, inplace=True) plz_residential = unary_union( list(polygonize(plz_residential.geometry)) ) # !!! replace unary union function # calculate plz residential area for grid area plz_own_landuse = postal_own_landuse[ postal_own_landuse.postalcode == postal.postalcode ] plz_own_residential = plz_own_landuse[plz_own_landuse.landuse == "residential"] plz_own_residential = plz_own_residential.geometry.unary_union # calculate population for proportion of postal area pop_own = ( plz_own_residential.area / plz_residential.area ) * postal.population_origin postal_own_intersection.at[i, "population"] = round(pop_own) population_area = postal_own_intersection # update progress pbar.update(10) for _, area in population_area.iterrows(): if area.population != 0: # filter buildings for considered area buildings_area = buildings_feds[buildings_feds.geometry.within(area.geometry)] buildings_idx_first = buildings_area.index.to_list() # this will be reduced later federal_state = buildings_area.iloc[0]["federal state"] household_sizes = consumption_data["household_sizes"] sizes_feds = household_sizes[household_sizes.Bundesland == federal_state].iloc[0] # household weights for considered federal state w_1p = sizes_feds["Anteil 1 Person [%]"] / 100 w_2p = sizes_feds["Anteil 2 Personen [%]"] / 100 w_3p = sizes_feds["Anteil 3 Personen [%]"] / 100 w_4p = sizes_feds["Anteil 4 Personen [%]"] / 100 w_5p = sizes_feds["Anteil 5 Personen und mehr [%]"] / 100 # distribute the whole population over the considered area pop_distribute = area.population # construct random generator rng = random.default_rng() while pop_distribute != 0: if pop_distribute > 5: # select houshold size, weighted randomly household_size = rng.choice( [1, 2, 3, 4, 5], 1, p=[w_1p, w_2p, w_3p, w_4p, w_5p], )[0] else: # selcet the rest of population household_size = pop_distribute # get power values for household p_mw, q_mvar = get_household_power(consumption_data, household_size) # reduce population to distribute pop_distribute -= household_size # --- select building in grid area # first every building needs a load if len(buildings_idx_first) != 0: building_idx = buildings_idx_first[0] buildings_idx_first.remove(building_idx) # distribution for the rest loads makes no matter so selected randomly else: building_idx = rng.choice(buildings_area.index.to_list(), 1)[0] # search for suitable grid node building_boundary = buildings_area.loc[building_idx].geometry if isinstance(building_boundary, LineString): building_geom = Polygon(building_boundary) elif isinstance(building_boundary, MultiLineString): multiline_coords = [] for line in building_boundary.geoms: multiline_coords.extend(list(line.coords)) building_geom = Polygon(multiline_coords) building_node = building_nodes[building_nodes.geometry.within(building_geom)] if not building_node.empty: lv_node = building_node.iloc[0] else: # check the case that the building centroid is outside building boundary building_centroid = building_geom.centroid centroid_distance = building_nodes.geometry.apply( lambda x, building_centroid=building_centroid: building_centroid.distance( x ) ) if centroid_distance.min() < 1e-04: lv_node = building_nodes.loc[centroid_distance.idxmin()] # create residential load load_df = GeoDataFrame( { "bus": lv_node.dave_name, "p_mw": p_mw, "q_mvar": q_mvar, "landuse": "residential", "voltage_level": [7], "geometry": lv_node.geometry, } ) grid_data.components_power.loads = concat( [grid_data.components_power.loads, load_df], ignore_index=True, ) # update progress pbar.update(40 / len(population_area)) # create lv loads for industrial industrial_polygons = grid_data.landuse[grid_data.landuse.landuse == "industrial"] industrial_load_full = industrial_polygons.area_km2.sum() * industrial_load # in MW industrial_buildings = grid_data.buildings.commercial[ grid_data.buildings.commercial.building == "industrial" ] industrial_polygons_sum = unary_union( array(list(polygonize(industrial_buildings.geometry))) ) # !!! replace unary union function industrial_area_full = industrial_polygons_sum.area for _, industrial_poly in industrial_buildings.iterrows(): building_poly = next(iter(polygonize(industrial_poly.geometry))) # check for builing bus for load connection building_point = grid_data.lv_data.lv_nodes[ grid_data.lv_data.lv_nodes.geometry.within(building_poly) ] if not building_point.empty: if p_mw != 0: load_df = GeoDataFrame( { "bus": building_point.iloc[0].dave_name, "p_mw": industrial_load_full * (building_poly.area / industrial_area_full), "q_mvar": p_mw * sin(acos(cos_phi_industrial)) / cos_phi_industrial, "landuse": "industrial", "voltage_level": [7], "geometry": building_point.iloc[0].geometry, } ) grid_data.components_power.loads = concat( [grid_data.components_power.loads, load_df], ignore_index=True, ) # update progress pbar.update(20 / len(industrial_buildings)) # create lv loads for commercial commercial_polygons = grid_data.landuse[grid_data.landuse.landuse == "commercial"] commercial_load_full = commercial_polygons.area_km2.sum() * commercial_load # in MW commercial_buildings = grid_data.buildings.commercial[ grid_data.buildings.commercial.building != "industrial" ] commercial_polygons_sum = unary_union( array(list(polygonize(commercial_buildings.geometry))) ) commercial_area_full = commercial_polygons_sum.area for _, commercial_poly in commercial_buildings.iterrows(): building_poly = next(iter(polygonize(commercial_poly.geometry))) # check for builing bus for load connection building_point = grid_data.lv_data.lv_nodes[ grid_data.lv_data.lv_nodes.geometry.within(building_poly) ] if not building_point.empty: if p_mw != 0: load_df = GeoDataFrame( { "bus": building_point.iloc[0].dave_name, "p_mw": commercial_load_full * (building_poly.area / commercial_area_full), "q_mvar": p_mw * sin(acos(cos_phi_commercial)) / cos_phi_commercial, "landuse": "commercial", "voltage_level": [7], "geometry": building_point.iloc[0].geometry, } ) grid_data.components_power.loads = concat( [grid_data.components_power.loads, load_df], ignore_index=True, ) # update progress pbar.update(19.8 / len(commercial_buildings)) # create loads for non grid level 7 elif any(x in power_levels for x in ["ehv", "hv", "mv"]) and not ( grid_data.components_power.transformers.ehv_hv.empty and grid_data.components_power.transformers.hv_mv.empty and grid_data.components_power.transformers.mv_lv.empty ): # create loads on grid level 6 (MV/LV) if "mv" in power_levels: # In this case the loads are assigned to the nearest mv/lv-transformer voronoi_polygons = voronoi( grid_data.components_power.transformers.mv_lv[["dave_name", "geometry"]] ) trafos = grid_data.components_power.transformers.mv_lv voltage_level = 6 # create loads on grid level 4 (HV/MV) elif "hv" in power_levels: # In this case the loads are assigned to the nearest hv/mv-transformer voronoi_polygons = voronoi( grid_data.components_power.transformers.hv_mv[["dave_name", "geometry"]] ) trafos = grid_data.components_power.transformers.hv_mv voltage_level = 4 # create loads on grid level 2 (EHV/HV) elif "ehv" in power_levels: # In this case the loads are assigned to the nearest ehv/hv-transformer voronoi_polygons = voronoi( grid_data.components_power.transformers.ehv_hv[["dave_name", "geometry"]] ) trafos = grid_data.components_power.transformers.ehv_hv voltage_level = 2 # update progress pbar.update(10) # --- create loads for the lowest considered voltage level # filter landuses which are within the voronoi regions intersection = intersection_with_area( grid_data.landuse, voronoi_polygons, remove_columns=False ) intersection.drop(columns=["area_km2"], inplace=True) # calculate area from intersected polygons intersection_3035 = intersection.to_crs(dave_settings["crs_meter"]) intersection["area_km2"] = intersection_3035.area / 1e06 # --- calculate consumption for the diffrent landuses in every single voronoi polygon # create list of all diffrent connection transformers trafo_names = list(set(intersection.dave_name)) trafo_names.sort() # update progress pbar.update(10) # iterate trough diffrent transformers and calulate the diffrent landuse consumptions for trafo_name in trafo_names: # search trafo bus trafo = trafos[trafos.dave_name == trafo_name].iloc[0] landuse_polygons = intersection[intersection.dave_name == trafo_name] # categorize landuse polygons and add to grid_data for loadtype in ["residential", "industrial", "commercial"]: if loadtype == "residential": residential_polygons = landuse_polygons[ landuse_polygons.landuse == "residential" ] area = residential_polygons.area_km2.sum() p_mw = residential_load * area q_mvar = p_mw * sin(acos(cos_phi_residential)) / cos_phi_residential elif loadtype == "industrial": industrial_polygons = landuse_polygons[landuse_polygons.landuse == "industrial"] area = industrial_polygons.area_km2.sum() p_mw = industrial_load * area q_mvar = p_mw * sin(acos(cos_phi_industrial)) / cos_phi_industrial elif loadtype == "commercial": commercial_polygons = landuse_polygons[ landuse_polygons.landuse.isin(["commercial", "retail"]) ] area = commercial_polygons.area_km2.sum() p_mw = commercial_load * area q_mvar = p_mw * sin(acos(cos_phi_commercial)) / cos_phi_commercial if p_mw != 0: load_df = GeoDataFrame( { "bus": trafo.bus_lv, "p_mw": p_mw, "q_mvar": q_mvar, "landuse": loadtype, "trafo_name": trafo_name, "area_km2": area, "voltage_level": [voltage_level], "geometry": trafo.geometry, } ) grid_data.components_power.loads = concat( [grid_data.components_power.loads, load_df], ignore_index=True, ) # update progress pbar.update(79.8 / len(trafo_names)) # add dave name if not grid_data.components_power.loads.empty: grid_data.components_power.loads.insert( 0, "dave_name", grid_data.components_power.loads.apply( lambda x: f"load_{x.voltage_level}_{x.index}", axis=1 ), ) # close progress bar pbar.close()