# Source code for dave_core.geography.target_area

# Copyright (c) 2022-2024 by Fraunhofer Institute for Energy Economics and Energy System Technology (IEE)
# Kassel and individual contributors (see AUTHORS file for details). All rights reserved.
# Use of this source code is governed by a BSD-style license that can be found in the LICENSE file.

from dask_geopandas import from_geopandas
from geopandas import GeoDataFrame
from geopandas import read_file
from pandas import DataFrame
from pandas import concat
from shapely.geometry import Polygon, MultiPolygon
from shapely.ops import unary_union

from dave_core.archiv_io import archiv_inventory
from dave_core.datapool.read_data import read_federal_states
from dave_core.datapool.read_data import read_nuts_regions
from dave_core.datapool.read_data import read_postal
from dave_core.geography.osm_data import from_osm
from dave_core.geography.osm_data import road_junctions
from dave_core.io.file_io import from_json_string
from dave_core.progressbar import create_tqdm
from dave_core.settings import dave_settings
from dave_core.toolbox import intersection_with_area


def _target_by_postalcode(grid_data, postalcode):
    """
    Select the postal code areas which form the target area.
    Several postal codes are combined into one target.

    INPUT:
        **grid_data** (attrdict) - grid_data in dave structure, the meta data of the postal \
            data set is stored in ``grid_data.meta_data``
        **postalcode** (list of strings) - requested postal codes or ['ALL'] for every area. \
            Unless 'ALL' is requested the list is sorted in place.

    OUTPUT:
        **target** - the rows of the postal data set which belong to the requested postal codes
    """
    postal, meta_data = read_postal()
    # store the meta data of the postal data set once, keyed by its title
    source_title = f"{meta_data['Main'].Titel.loc[0]}"
    if source_title not in grid_data.meta_data.keys():
        grid_data.meta_data[source_title] = meta_data
    # a single entry "all" (any capitalisation) selects every postal code area
    select_all = len(postalcode) == 1 and postalcode[0].lower() == "all"
    if select_all:
        return postal
    requested_rows = postal.postalcode.isin(postalcode)
    target = postal[requested_rows].reset_index(drop=True)
    # sort the callers list in place, the function does not hand it back
    postalcode.sort()
    return target


def _target_by_own_area(grid_data, own_area):
    """
    Define the target area by an area given by the user and determine the postal codes which
    intersect with it.

    INPUT:
        **grid_data** (attrdict) - grid_data in dave structure, the meta data of the postal \
            data set is stored in ``grid_data.meta_data``
        **own_area** (string / Polygon / MultiPolygon) - path to a ``.shp`` or ``.geojson`` \
            file, any other string is passed to ``from_json_string``, or a shapely geometry

    OUTPUT:
        **target** (GeoDataFrame) - the own area in the main crs, without an "id" column
        **own_postal** (list) - unique postal codes found by intersecting the postal data \
            with the own area

    RAISES:
        **ValueError** - if own_area is neither a string nor a (Multi)Polygon
    """
    if isinstance(own_area, str):
        # compare the file extension case-insensitive so that e.g. "area.SHP" is read as a file
        if own_area.split(".")[-1].lower() in ["shp", "geojson"]:
            target = read_file(own_area)
        else:
            target = from_json_string(own_area)
        # check if the given shape file is empty
        if target.empty:
            print("The given shapefile includes no data")
    elif isinstance(own_area, (Polygon, MultiPolygon)):
        if isinstance(own_area, MultiPolygon):
            # merge the parts of the multipolygon into one geometry
            own_area = unary_union(own_area)
        target = GeoDataFrame(
            {"name": ["own area"], "geometry": [own_area]},
            crs=dave_settings["crs_main"],
        )
    else:
        # without this stop the code below would fail on the undefined variable "target"
        raise ValueError("The given format is unknown")

    # check crs and project to the right one if needed (data without crs is left untouched)
    if (target.crs) and (target.crs != dave_settings["crs_main"]):
        target = target.to_crs(dave_settings["crs_main"])
    if "id" in target.columns:
        target = target.drop(columns=["id"])
    # convert own area into postal code areas for target_input
    postal, meta_data = read_postal()
    # store the meta data of the postal data set once, keyed by its title
    source_title = f"{meta_data['Main'].Titel.loc[0]}"
    if source_title not in grid_data.meta_data.keys():
        grid_data.meta_data[source_title] = meta_data
    # filter postal code areas which intersect with the target area
    postal_intersection = intersection_with_area(postal, target, remove_columns=False)
    # filter duplicated postal codes
    own_postal = postal_intersection["postalcode"].unique().tolist()
    return target, own_postal


def _target_by_town_name(grid_data, town_name):
    """
    Select the postal code areas which belong to the requested towns.
    Several towns are combined into one target.

    INPUT:
        **grid_data** (attrdict) - grid_data in dave structure, the meta data of the postal \
            data set is stored in ``grid_data.meta_data``
        **town_name** (list of strings) - requested town names (case-insensitive) or ['ALL'] \
            for every town

    OUTPUT:
        **target** - the rows of the postal data set which belong to the requested towns
        **town_name** (list) - the requested town names, sorted unless 'ALL' was requested

    RAISES:
        **ValueError** - if at least one requested town is not part of the postal data set
    """
    postal, meta_data = read_postal()
    # store the meta data of the postal data set once, keyed by its title
    source_title = f"{meta_data['Main'].Titel.loc[0]}"
    if source_title not in grid_data.meta_data.keys():
        grid_data.meta_data[source_title] = meta_data
    if len(town_name) == 1 and town_name[0].lower() == "all":
        # in this case all towns are chosen (same case as all postalcode areas)
        return postal, town_name
    # bring town names in the same format and filter data
    requested_towns = {town.lower() for town in town_name}
    normalized_postal_town = postal.town.str.lower()
    target = postal[normalized_postal_town.isin(requested_towns)].reset_index(drop=True)
    # compare sets of normalised names, so that a town which is given twice (or with different
    # capitalisation) is not reported as missing
    found_towns = set(target.town.str.lower())
    if found_towns != requested_towns:
        raise ValueError("town name wasn`t found. Please check your input")
    # return a sorted copy instead of reordering the list of the caller
    return target, sorted(town_name)


def _target_by_federal_state(grid_data, federal_state):
    """
    Select the federal states which form the target area and determine the postal codes which
    intersect with them. Several federal states are combined into one target.

    INPUT:
        **grid_data** (attrdict) - grid_data in dave structure, the meta data of the federal \
            state and the postal data set are stored in ``grid_data.meta_data``
        **federal_state** (list of strings) - requested federal state names or ['ALL'] for \
            every federal state

    OUTPUT:
        **target** - the rows of the federal state data set which were requested
        **federal_state** (list) - the requested names, formatted and sorted unless 'ALL' \
            was requested
        **federal_state_postal** (list) - unique postal codes found by intersecting the postal \
            data with the target

    RAISES:
        **ValueError** - if the number of found states differs from the number of requested ones
    """
    states, meta_data = read_federal_states()
    # store the meta data of the federal state data set once, keyed by its title
    states_title = f"{meta_data['Main'].Titel.loc[0]}"
    if states_title not in grid_data.meta_data.keys():
        grid_data.meta_data[states_title] = meta_data
    wants_all = len(federal_state) == 1 and federal_state[0].lower() == "all"
    if wants_all:
        # every federal state is part of the target
        target = states
    else:
        # capitalise each part of a hyphenated name, e.g. "baden-württemberg"
        formatted_states = []
        for state in federal_state:
            capitalized_parts = (part.capitalize() for part in state.split("-"))
            formatted_states.append("-".join(capitalized_parts))
        federal_state = formatted_states
        requested_rows = states["name"].isin(federal_state)
        target = states[requested_rows].reset_index(drop=True)
        if len(federal_state) != len(target):
            raise ValueError("federal state name wasn`t found. Please check your input")
        # sort federal state names
        federal_state.sort()
    # convert federal states into postal code areas for target_input
    postal, meta_data = read_postal()
    # store the meta data of the postal data set once, keyed by its title
    postal_title = f"{meta_data['Main'].Titel.loc[0]}"
    if postal_title not in grid_data.meta_data.keys():
        grid_data.meta_data[postal_title] = meta_data
    # keep the postal code areas which intersect with the target and drop duplicated codes
    postal_intersection = intersection_with_area(postal, target, remove_columns=False)
    federal_state_postal = postal_intersection["postalcode"].unique().tolist()
    return target, federal_state, federal_state_postal


def _target_by_nuts_region(grid_data, nuts_region):
    """
    Select the NUTS-3 areas which belong to the requested NUTS regions and determine the postal
    codes which intersect with them.

    INPUT:
        **grid_data** (attrdict) - grid_data in dave structure, the meta data of the nuts and \
            the postal data set are stored in ``grid_data.meta_data``
        **nuts_region** (list of strings / tuple) - requested NUTS codes (case-insensitive) or \
            ['ALL'] for every region. A list uses the default year "2016", a tuple must have \
            the form (list of codes, year).

    OUTPUT:
        **target** - the NUTS-3 rows which belong to the requested regions, without duplicates
        **nuts_region_postal** (list) - unique postal codes found by intersecting the postal \
            data with the target

    RAISES:
        **ValueError** - if no region is given or a requested region has no NUTS-3 area
    """
    # check user input
    if isinstance(nuts_region, list):
        nuts_region = (nuts_region, "2016")  # default year
    region_codes, year = nuts_region[0], nuts_region[1]
    # read nuts-3 areas
    nuts, meta_data = read_nuts_regions(year=year)
    nuts_3 = nuts[nuts.LEVL_CODE == 3]
    # store the meta data of the nuts data set once, keyed by its title
    nuts_title = f"{meta_data['Main'].Titel.loc[0]}"
    if nuts_title not in grid_data.meta_data.keys():
        grid_data.meta_data[nuts_title] = meta_data
    if len(region_codes) == 1 and region_codes[0].lower() == "all":
        # in this case all nuts regions are chosen
        target = nuts_3
    else:
        if not region_codes:
            # without any region there is nothing to build the target from
            raise ValueError("nuts region name wasn`t found. Please check your input")
        region_areas = []
        for code in region_codes:
            # NUTS codes are hierarchical, a region consists of the NUTS-3 areas whose id starts
            # with its code. A substring search would e.g. also find "EE" within "DEE0x".
            areas = nuts_3[nuts_3["NUTS_ID"].str.startswith(code.upper())]
            if areas.empty:
                raise ValueError("nuts region name wasn`t found. Please check your input")
            region_areas.append(areas)
        # a single region keeps its original index, several regions get a new one
        target = (
            region_areas[0]
            if len(region_areas) == 1
            else concat(region_areas, ignore_index=True)
        )
    # filter duplicates (no inplace operation, because target can be a slice of the nuts data)
    target = target.drop_duplicates()
    # convert nuts regions into postal code areas for target_input
    postal, meta_data = read_postal()
    # store the meta data of the postal data set once, keyed by its title
    postal_title = f"{meta_data['Main'].Titel.loc[0]}"
    if postal_title not in grid_data.meta_data.keys():
        grid_data.meta_data[postal_title] = meta_data
    # filter postal code areas which intersect with the target area
    postal_intersection = intersection_with_area(postal, target, remove_columns=False)
    # filter duplicated postal codes
    nuts_region_postal = postal_intersection["postalcode"].unique().tolist()
    return target, nuts_region_postal


def target_area(
    grid_data,
    power_levels,
    gas_levels,
    postalcode=None,
    town_name=None,
    federal_state=None,
    nuts_region=None,
    own_area=None,
    buffer=0,
    roads=True,
    buildings=True,
    landuse=True,
    railways=True,
    waterways=True,
):
    """
    This function calculates all relevant geographical information for the target area and
    adds it to the grid_data

    INPUT:
        **grid_data** (attrdict) - grid_data as a attrdict in dave structure
        **power_levels** (list) - this parameter defines which power levels should be considered
                                  options: 'ehv','hv','mv','lv', [].
                                  it is possible to choose: one level, multiple levels or 'ALL'
        **gas_levels** (list) - this parameter defines which gas levels should be considered
                                options: 'hp','mp','lp', [].
                                it is possible to choose: one level, multiple levels or 'ALL'

        One of these parameters must be set. If several are set, the first one in the order
        listed here is used:
        **postalcode** (List of strings) - numbers of the target postalcode areas.
                                           it is also possible to choose ['ALL'] for all
                                           postalcode areas in germany
        **town_name** (List of strings) - names of the target towns
                                          it is also possible to choose ['ALL'] for all cities
                                          in germany
        **federal_state** (List of strings) - names of the target federal states
                                              it is also possible to choose ['ALL'] for all
                                              federal states in germany
        **nuts_region** (List of strings) - codes of the target nuts regions
                                            it is also possible to choose ['ALL'] for all nuts
                                            regions in europe
        **own_area** (string) - full path to a shape file which includes own target area
                                (e.g. "C:/Users/name/test/test.shp") or Geodataframe as string

    OPTIONAL:
        **buffer** (float, default 0) - buffer for the target area (not evaluated within this
                                        function)
        **roads** (boolean, default True) - obtain information about roads which are relevant
                                            for the grid model
        **buildings** (boolean, default True) - obtain information about buildings
        **landuse** (boolean, default True) - obtain information about landuses
        **railways** (boolean, default True) - obtain information about railways
        **waterways** (boolean, default True) - obtain information about waterways

    OUTPUT:
        **file_exists** (boolean) - result of the archiv check, always False for an own area
        **file_name** (string) - file name given by the archiv check, "None" for an own area

    RAISES:
        **SyntaxError** - if none of the target parameters is set

    EXAMPLE:
        from dave_core.geography.target_area import target_area
        target_area(grid_data, power_levels=["lv"], gas_levels=[], town_name=["Kassel"])
    """
    # set progress bar
    pbar = create_tqdm(desc="collect geographical data")
    # columns which are part of every target input
    level_columns = {"power_levels": [power_levels], "gas_levels": [gas_levels]}
    # check which input parameter is given
    if postalcode:
        # the helper sorts the postalcode list in place, so the sorted list is stored below
        target = _target_by_postalcode(grid_data, postalcode)
        input_columns = {"typ": "postalcode", "data": [postalcode]}
    elif town_name:
        target, town_name = _target_by_town_name(grid_data, town_name)
        input_columns = {"typ": "town name", "data": [town_name]}
    elif federal_state:
        target, federal_state, federal_state_postal = _target_by_federal_state(
            grid_data, federal_state
        )
        input_columns = {
            "typ": "federal state",
            "federal_states": [federal_state],
            "data": [federal_state_postal],
        }
    elif nuts_region:
        target, nuts_region_postal = _target_by_nuts_region(grid_data, nuts_region)
        input_columns = {
            "typ": "nuts region",
            "nuts_regions": [nuts_region],
            "data": [nuts_region_postal],
        }
    elif own_area:
        target, own_postal = _target_by_own_area(grid_data, own_area)
        input_columns = {"typ": "own area", "data": [own_postal]}
    else:
        raise SyntaxError("target area wasn`t defined")
    grid_data.target_input = DataFrame({**input_columns, **level_columns})
    # write area informations into grid_data
    grid_data.area = concat([grid_data.area, target], ignore_index=True)
    crs_main = dave_settings["crs_main"]
    if grid_data.area.crs is None:
        grid_data.area.set_crs(crs_main, inplace=True)
    elif grid_data.area.crs != crs_main:
        grid_data.area.to_crs(crs_main, inplace=True)
    # check if requested model is already in the archiv (own areas are never looked up)
    if grid_data.target_input.iloc[0].typ != "own area":
        file_exists, file_name = archiv_inventory(grid_data, read_only=True)
    else:
        file_exists, file_name = False, "None"
    # update progress
    pbar.update(float(10))
    if file_exists:
        # nothing has to be collected, close progress bar
        pbar.update(float(90))
        pbar.close()
        return file_exists, file_name
    # create borders for target area, load osm-data and write into grid data
    if town_name:
        diff_targets = target["town"].drop_duplicates()
        # define progress step
        progress_step = 80 / len(diff_targets)
        for diff_target in diff_targets:
            town = target[target.town == diff_target]
            # a town which consists of several areas is merged into one geometry
            target_geom = town.geometry.unary_union if len(town) > 1 else town.iloc[0].geometry
            # Obtain data from OSM
            from_osm(
                grid_data,
                pbar,
                roads,
                buildings,
                landuse,
                railways,
                waterways,
                target_geom=target_geom,
                progress_step=progress_step,
            )
    else:
        area_count = len(target)
        for target_geom in target.geometry:
            # Obtain data from OSM, the progress step is only computed for a non-empty target
            from_osm(
                grid_data,
                pbar,
                roads,
                buildings,
                landuse,
                railways,
                waterways,
                target_geom=target_geom,
                progress_step=80 / area_count,
            )
    # reset index for all osm data
    grid_data.roads.roads.reset_index(drop=True, inplace=True)
    grid_data.landuse.reset_index(drop=True, inplace=True)
    grid_data.buildings.residential.reset_index(drop=True, inplace=True)
    grid_data.buildings.commercial.reset_index(drop=True, inplace=True)
    # find road junctions, the low voltage level has priority over the medium voltage level
    roads_highway_dask = from_geopandas(
        grid_data.roads.roads.highway, npartitions=dave_settings["cpu_number"]
    )
    requested_power_levels = grid_data.target_input.power_levels[0]
    if "lv" in requested_power_levels:
        road_junctions(
            grid_data.roads.roads[roads_highway_dask.isin(dave_settings["roads_lv"]).compute()],
            grid_data,
        )
    elif "mv" in requested_power_levels:
        road_junctions(
            grid_data.roads.roads[roads_highway_dask.isin(dave_settings["roads_mv"]).compute()],
            grid_data,
        )
    # close progress bar
    pbar.update(9.99)
    pbar.close()
    return file_exists, file_name