Skip to content

main

Main script calling methods to do SOM calculations.

read_config(config_file='config.toml')

Reads configuration file.

Parameters:

Name Type Description Default
config_file str

path to configuration file, defaults to 'config.toml'.

'config.toml'

Returns:

Name Type Description
config dict

configuration settings.

Source code in src/main.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def read_config(config_file: str = 'config.toml'):
    """
    Reads configuration file.

    The given path is used as-is when it points to an existing file. Otherwise
    it is joined onto the directory that contains this script before opening.

    Arguments:
        config_file (str): path to configuration file, defaults to 'config.toml'.

    Returns:
        config (dict): configuration settings.
    """
    print('Reading configuration...')
    try:
        if not os.path.isfile(config_file):
            # fall back to a path relative to this script's directory
            script_dir = os.path.dirname(os.path.realpath(__file__))
            config_file = os.path.join(script_dir, config_file)
        # TOML documents are UTF-8 by specification, so do not rely on the
        # platform default encoding when decoding the file
        with open(config_file, 'r', encoding='utf-8') as f:
            config = toml.load(f)
    except Exception as e:
        # NOTE(review): fail_with_message is presumably terminating here; if it
        # ever returns, 'config' below is unbound - confirm against its definition
        fail_with_message('ERROR! Could not load config file!', e)
    return config

run(config, skip_sim=False)

Main function that loads input data and user configuration, runs simulations and processes results.

Parameters:

Name Type Description Default
config dict

configuration settings.

required
skip_sim bool

toggle to skip SOM calculations and only process results.

False
Source code in src/main.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def run(config: dict, skip_sim: bool = False):
    """
    Main function that loads input data and user configuration, 
    runs simulations and processes results.

    Directories used (created if missing):

        - 'log' next to this script: emptied at the start of every run.
        - 'sim_res' next to the export file: old simulation result files are 
          removed unless skip_sim is set.
        - 'output' next to the export file: emptied at the start of every run.

    Arguments:
        config (dict): configuration settings. Keys read directly in this function: 
            'export_path', 'use_random_seed', 'random_seed', 'simulations', 
            'use_parallel_processing', 'create_plots' and 'filter'.
        skip_sim (bool): toggle to skip SOM calculations and only process results.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))

    # create log directory
    # NOTE! Existing logs are deleted before every run (also when skip_sim is set)
    log_dir = os.path.join(script_dir, 'log')
    if os.path.exists(log_dir):
        shutil.rmtree(log_dir)
    os.makedirs(log_dir, exist_ok=True)

    #
    # setup paths
    #

    # main result directory; fall back to a path relative to this script
    # when the parent directory of the configured path does not exist
    export_path = os.path.realpath(config['export_path'])
    if not os.path.isdir(os.path.dirname(export_path)):
        export_path = os.path.join(script_dir, config['export_path'])
    # individual simulation results directory
    sim_res_dir = os.path.join(os.path.dirname(export_path), 'sim_res')
    if os.path.exists(sim_res_dir) and not skip_sim:
        # only remove files that are simulation results: the name must contain
        # 'sim_res' for both extensions (the extension test is grouped explicitly)
        for f in os.listdir(sim_res_dir):
            if f.endswith(('.xlsx', '.pickle')) and 'sim_res' in f:
                os.remove(os.path.join(sim_res_dir, f))
    os.makedirs(sim_res_dir, exist_ok=True)
    # plot directory
    out_dir = os.path.join(os.path.dirname(export_path), 'output')
    if os.path.exists(out_dir):
        shutil.rmtree(out_dir)
    os.makedirs(out_dir, exist_ok=True)

    #
    # run simulations
    #

    # controlled randomness
    # NOTE(review): this seeds NumPy's global state in the current process only;
    # whether pool workers see the same state depends on the multiprocessing
    # start method - confirm reproducibility when parallel processing is enabled
    if config['use_random_seed']:
        print(f'Using random seed: {config["random_seed"]}')
        np.random.seed(config['random_seed'])

    # process survey data and read general input
    print('Loading input data...')
    try:
        input_data = som_app.build_input(config)
    except Exception as e:
        fail_with_message('ERROR! Something went wrong while processing input data! Check traceback.', e)

    # run simulations and do calculations
    print('Running simulations...')
    if not skip_sim:
        n_sims = config['simulations']
        cpu_count = multiprocessing.cpu_count()     # available cpu cores
        with multiprocessing.Manager() as manager:
            # shared progress counter, updated by run_sim under the lock
            progress = manager.Namespace()
            progress.current = 0
            progress.total = n_sims
            lock = manager.Lock()
            display_progress(progress.current / progress.total, text='\tProgress: ')
            if config['use_parallel_processing']:   # parallel processing for faster computations
                # leave two cores free, but never request fewer than one worker
                # (Pool raises ValueError for processes < 1)
                n_workers = max(1, min(cpu_count - 2, n_sims))
                with multiprocessing.Pool(processes=n_workers) as pool:
                    jobs = [
                        (
                            i, 
                            input_data, 
                            config, 
                            os.path.join(sim_res_dir, f'sim_res_{i}.xlsx'), 
                            os.path.join(log_dir, f'log_{i}.txt'), 
                            progress, 
                            lock
                        ) 
                        for i in range(n_sims)
                    ]
                    pool.starmap(run_sim, jobs)
            else:   # single core solution
                for i in range(n_sims):
                    run_sim(
                        i, 
                        input_data, 
                        config, 
                        os.path.join(sim_res_dir, f'sim_res_{i}.xlsx'), 
                        os.path.join(log_dir, f'log_{i}.txt'), 
                        progress, 
                        lock
                    )
            display_progress(progress.current / progress.total, text='\tProgress: ')

    #
    # process results
    #

    print('\nProcessing results...')
    try:
        print('\tCalculating means and errors...')
        res = som_app.build_results(sim_res_dir, input_data)    # get condensed results from the individual simulation runs
        print('\tExporting results to excel...')
        som_app.export_results_to_excel(res, input_data, export_path)   # write results to file for human reading
        if config['create_plots']:
            print('\tProducing plots...')
            som_plots.build_display(res, input_data, out_dir, config['use_parallel_processing'], config['filter'])   # plots various results for visual interpretation
    except Exception as e:
        fail_with_message('ERROR! Something went wrong while processing results! Check traceback.', e)

run_sim(id, input_data, config, out_path, log_path, progress, lock)

Runs a single simulation round.

Parameters:

Name Type Description Default
id int

Simulation round identifier.

required
input_data dict[str, DataFrame]

Input data used for calculations.

required
config dict

User configuration settings.

required
out_path str

Output path for results.

required
log_path str

Output path for log.

required
progress Namespace

multiprocessing.Manager.Namespace:

  • current (int): Current number of finished simulations.
  • total (int): Total number of simulations to calculate.
required
lock Lock

multiprocessing.Manager.Lock, used to manage concurrent processes updating progress.

required

Returns:

Name Type Description
out int

0 (failure) | 1 (success)

Source code in src/main.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def run_sim(id: int, input_data: dict[str, pd.DataFrame], config: dict, out_path: str, log_path: str, progress, lock):
    """
    Runs a single simulation round.

    Status messages are written to the log file at log_path. The resulting data 
    is pickled to out_path with its '.xlsx' extension swapped for '.pickle'. 
    Exceptions raised during the round are reported to the log and turned into 
    a return value of 0 instead of being propagated.

    Arguments:
        id (int): Simulation round identifier.
        input_data (dict[str, DataFrame]): Input data used for calculations.
        config (dict): User configuration settings.
        out_path (str): Output path for results.
        log_path (str): Output path for log.
        progress (Namespace): multiprocessing.Manager.Namespace:

            - current (int): Current number of finished simulations.
            - total (int): Total number of simulations to calculate.

        lock (Lock): multiprocessing.Manager.Lock, used to manage concurrent processes updating progress.

    Returns:
        out (int): 0 (failure) | 1 (success)
    """
    # the context manager closes the log on every exit path
    with open(log_path, 'w') as log:
        try:

            print(f'sim = {id}', file=log)

            # Create links between core components
            # (works on a deep copy so the shared input data is left untouched)
            print('\tBuilding links between Measures, Activities, Pressures and States...', file=log)
            data = som_app.build_links(copy.deepcopy(input_data))

            if config['use_scenario']:
                # Update activity contributions to scenario values
                print('\tApplying activity development scenario...', file=log)
                data['activity_contributions'] = som_app.build_scenario(data, config['scenario'])

            # Create cases
            print('\tBuilding cases...', file=log)
            data = som_app.build_cases(data)

            # Run model
            print('\tCalculating changes in environment...', file=log)
            data = som_app.build_changes(data)

            #
            # export results
            #
            print('\tExporting results...', file=log)
            # export to pickle; swap only the trailing extension so that an
            # 'xlsx' substring elsewhere in the path (e.g. a directory name)
            # is left alone
            xlsx_ext = '.xlsx'
            if out_path.endswith(xlsx_ext):
                pickle_path = out_path[:-len(xlsx_ext)] + '.pickle'
            else:
                pickle_path = out_path
            with open(pickle_path, 'wb') as f:
                pickle.dump(data, f)

            # the read-modify-write on the shared counter must happen under the lock
            with lock:
                progress.current += 1
                display_progress(progress.current / progress.total, text='\tProgress: ')

        except Exception as e:
            fail_with_message('ERROR! Something went wrong during simulation! Check traceback.', e, file=log, do_not_exit=True)
            return 0

    return 1