
Utility functions

Vector Distances

kendall_tau(ordering_1, ordering_2, search_pairs)

Calculate the normalized Kendall tau distance between two orderings.

The Kendall tau rank distance is a metric that counts the number of pairwise disagreements between two ranking lists. The larger the distance, the more dissimilar the two lists are. Kendall tau distance is also called bubble-sort distance. An ordering holds the option names in the order of their rank (rank=index).

Parameters:

ordering_1 (IntArray): First (NumPy) array containing ranked options. Required.
ordering_2 (IntArray): The second ordering array. Required.
search_pairs (Sequence[tuple[int, int]]): Index pairs. Required.

Returns:

float: Normalized Kendall tau distance.

Source code in src/utils/distance_functions.py
def kendall_tau(ordering_1: IntArray, ordering_2: IntArray,
                search_pairs: Sequence[tuple[int, int]]) -> float:
    """
    Calculate the normalized Kendall tau distance between two orderings.

    The Kendall tau rank distance is a metric that counts the number
    of pairwise disagreements between two ranking lists.
    The larger the distance, the more dissimilar the two lists are.
    Kendall tau distance is also called bubble-sort distance.
    An ordering holds the option names in the order of their rank (rank=index).

    Args:
        ordering_1 (IntArray): First (NumPy) array containing ranked options.
        ordering_2 (IntArray): The second ordering array.
        search_pairs (Sequence[tuple[int,int]]): Index pairs.

    Returns:
        float: Normalized Kendall tau distance.
    """
    # TODO: remove these tests (comment out) on actual simulations to speed up
    n = ordering_1.size
    if n > 0:
        expected_arr = np.arange(n)
        assert (np.array_equal(np.sort(ordering_1), expected_arr)
                and np.array_equal(np.sort(ordering_2), expected_arr)) , \
            f"Error: Sequences {ordering_1}, {ordering_2} aren't comparable."

    # Get the unnormalized Kendall tau distance
    dist = unnormalized_kendall_tau(ordering_1, ordering_2, search_pairs)
    # Maximum possible Kendall tau distance
    max_distance = comb(n, 2)  # This is n choose 2, or n(n-1)/2
    # Normalize the distance
    normalized_distance = dist / max_distance

    return normalized_distance
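
Example (a minimal usage sketch; it assumes the module is importable as src.utils.distance_functions and that search_pairs holds every index pair i < j):

import numpy as np
from itertools import combinations
from src.utils.distance_functions import kendall_tau

ordering_a = np.array([0, 1, 2, 3])
ordering_b = np.array([1, 0, 2, 3])          # options 0 and 1 swap places
pairs = list(combinations(range(4), 2))      # precomputed index pairs
print(kendall_tau(ordering_a, ordering_b, pairs))  # 1 of 6 pairs disagrees -> ~0.167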

kendall_tau_on_ranks(rank_arr_1, rank_arr_2, search_pairs, color_vec)

Beware: don't use this for orderings!

This function calculates the Kendall tau distance between two rank vectors. (The Kendall tau rank distance is a metric that counts the number of pairwise disagreements between two ranking lists. The larger the distance, the more dissimilar the two lists are. Kendall tau distance is also called bubble-sort distance.) Rank vectors hold the rank of each option (option = index). Not to be confused with an ordering (or sequence) where the vector holds options and the index is the rank.

Parameters:

rank_arr_1 (FloatArray): First array containing the ranks of each option. Required.
rank_arr_2 (FloatArray): The second rank array. Required.
search_pairs (Sequence[tuple[int, int]]): The pairs of indices. Required.
color_vec (IntArray): The vector of colors (for efficiency). Required.

Returns:

int: Kendall tau distance.

Source code in src/utils/distance_functions.py
def kendall_tau_on_ranks(rank_arr_1: FloatArray, rank_arr_2: FloatArray,
                         search_pairs: Sequence[tuple[int, int]],
                         color_vec: IntArray) -> int:
    """
    Beware: don't use this for orderings!

    This function calculates the Kendall tau distance between two rank vectors.
    (The Kendall tau rank distance is a metric that counts the number
    of pairwise disagreements between two ranking lists.
    The larger the distance, the more dissimilar the two lists are.
    Kendall tau distance is also called bubble-sort distance).
    Rank vectors hold the rank of each option (option = index).
    Not to be confused with an ordering (or sequence) where the vector
    holds options and the index is the rank.

    Args:
        rank_arr_1 (FloatArray): First Array containing the ranks of each option.
        rank_arr_2 (FloatArray): The second rank array.
        search_pairs (Sequence[tuple[int, int]]): The pairs of indices.
        color_vec (IntArray): The vector of colors (for efficiency).

    Returns:
        int: Kendall tau distance.
    """
    # Get the ordering (option names being 0 to length)
    ordering_1 = np.argsort(rank_arr_1)
    ordering_2 = np.argsort(rank_arr_2)
    # print("Ord1:", list(ordering_1), " Ord2:", list(ordering_2))
    # Create the mapping array
    mapping_array = np.empty_like(ordering_1)  # Empty array with same shape
    mapping_array[ordering_1] = color_vec  # Fill the mapping
    # Use the mapping array to rename elements in ordering_2
    renamed_arr_2 = mapping_array[ordering_2]  # Uses NumPy's advanced indexing
    # print("Ren1:",list(range(len(color_vec))), " Ren2:", list(renamed_arr_2))
    # Count inversions using precomputed pairs
    kendall_distance = 0
    # inversions = []
    for i, j in search_pairs:
        if renamed_arr_2[i] > renamed_arr_2[j]:
            # inversions.append((renamed_arr_2[i], renamed_arr_2[j]))
            kendall_distance += 1
    # print("Inversions:\n", inversions)
    return kendall_distance
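
Example (a minimal sketch; it assumes the import path above and that color_vec is simply np.arange(n), which renames options by their position in the first ordering):

import numpy as np
from itertools import combinations
from src.utils.distance_functions import kendall_tau_on_ranks

ranks_a = np.array([0.0, 1.0, 2.0, 3.0])   # rank of option i is ranks_a[i]
ranks_b = np.array([1.0, 0.0, 2.0, 3.0])   # options 0 and 1 swap ranks
pairs = list(combinations(range(4), 2))
colors = np.arange(4)                      # assumed identity "color" vector
print(kendall_tau_on_ranks(ranks_a, ranks_b, pairs, colors))  # -> 1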

spearman(ordering_1, ordering_2, _search_pairs=None)

This calculates the normalized Spearman distance between two orderings. Spearman's foot rule is a measure of the distance between ranked lists. It is given as the sum of the absolute differences between the ranks of the two orderings (values from 0 to n-1 in any order).

Parameters:

ordering_1 (IntArray): The first (NumPy) array containing the option's ranks. Required.
ordering_2 (IntArray): The second rank array. Required.
_search_pairs: This parameter is intentionally unused. Default: None.

Returns:

float: Spearman distance.

Source code in src/utils/distance_functions.py
def spearman(ordering_1: IntArray, ordering_2: IntArray, _search_pairs=None) -> float:
    """
    This calculates the normalized Spearman distance between two orderings.
    Spearman's foot rule is a measure of the distance between ranked lists.
    It is given as the sum of the absolute differences between the ranks
    of the two orderings (values from 0 to n-1 in any order).

    Args:
        ordering_1: The first (NumPy) array containing the option's ranks.
        ordering_2: The second rank array.
        _search_pairs: This parameter is intentionally unused.

    Returns:
        float: Spearman distance
    """
    # TODO: remove these tests (comment out) on actual simulations to speed up
    n = ordering_1.size
    if n > 0:
        expected_arr = np.arange(n)
        assert (np.array_equal(np.sort(ordering_1), expected_arr)
                and np.array_equal(np.sort(ordering_2), expected_arr)) , \
            f"Error: Sequences {ordering_1}, {ordering_2} aren't comparable."
    distance = np.sum(np.abs(ordering_1 - ordering_2))
    # Normalize
    if n % 2 == 0:  # Even number of elements
        max_dist = n**2 / 2
    else:  # Odd number of elements
        max_dist = (n**2 - 1) / 2  # Maximum footrule distance is floor(n^2 / 2)
    return distance / max_dist
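
Example (a minimal sketch; a complete reversal of an even-length ordering yields the maximum normalized distance of 1.0):

import numpy as np
from src.utils.distance_functions import spearman

ordering_a = np.array([0, 1, 2, 3])
ordering_b = np.array([3, 2, 1, 0])      # complete reversal
print(spearman(ordering_a, ordering_b))  # sum of |differences| = 8, max = 8 -> 1.0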

spearman_distance(rank_arr_1, rank_arr_2)

Beware: don't use this for orderings!

This function calculates the Spearman distance between two rank vectors. Spearman's foot rule is a measure of the distance between ranked lists. It is given as the sum of the absolute differences between the ranks of the two lists. This function is meant to work with numeric values as well. Hence, we only assume the rank values to be comparable (e.g., normalized).

Parameters:

rank_arr_1 (FloatArray): Array containing the ranks of each option. Required.
rank_arr_2 (FloatArray): The second rank array. Required.

Returns:

float: The Spearman distance.

Source code in src/utils/distance_functions.py
def spearman_distance(rank_arr_1: FloatArray, rank_arr_2: FloatArray) -> float:
    """
    Beware: don't use this for orderings!

    This function calculates the Spearman distance between two rank vectors.
    Spearman's foot rule is a measure of the distance between ranked lists.
    It is given as the sum of the absolute differences between the ranks
    of the two lists.
    This function is meant to work with numeric values as well.
    Hence, we only assume the rank values to be comparable (e.g., normalized).

    Args:
        rank_arr_1 (FloatArray): Array containing the ranks of each option
        rank_arr_2 (FloatArray): The second rank array.

    Returns:
        float: The Spearman distance
    """
    # TODO: remove these tests (comment out) on actual simulations
    assert rank_arr_1.size == rank_arr_2.size, \
        "Rank arrays must have the same length"
    if rank_arr_1.size > 0:
        assert (rank_arr_1.min() == rank_arr_2.min()
                and rank_arr_1.max() == rank_arr_2.max()), \
            f"Error: Sequences {rank_arr_1}, {rank_arr_2} aren't comparable."
    return np.sum(np.abs(rank_arr_1 - rank_arr_2))
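
Example (a minimal sketch with normalized rank values; the comparability assertion expects both arrays to share the same minimum and maximum):

import numpy as np
from src.utils.distance_functions import spearman_distance

ranks_a = np.array([0.0, 0.5, 1.0])
ranks_b = np.array([0.5, 0.0, 1.0])
print(spearman_distance(ranks_a, ranks_b))  # |0-0.5| + |0.5-0| + |1-1| = 1.0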

unnormalized_kendall_tau(ordering_1, ordering_2, search_pairs)

This function calculates the Kendall tau distance on two orderings. An ordering holds the option names in the order of their rank (rank=index).

Parameters:

ordering_1 (IntArray): First array containing ranked options. Required.
ordering_2 (IntArray): The second ordering array. Required.
search_pairs (Sequence[tuple[int, int]]): Index pairs (for efficiency). Required.

Returns:

int: The Kendall tau distance.

Source code in src/utils/distance_functions.py
def unnormalized_kendall_tau(ordering_1: IntArray, ordering_2: IntArray,
                             search_pairs: Sequence[tuple[int, int]]) -> int:
    """
    This function calculates the Kendall tau distance on two orderings.
    An ordering holds the option names in the order of their rank (rank=index).

    Args:
        ordering_1 (IntArray): First Array containing ranked options.
        ordering_2 (IntArray): The second ordering array.
        search_pairs (Sequence[tuple[int,int]]): Index pairs (for efficiency).

    Returns:
        int: The Kendall tau distance.
    """
    # Rename the elements to reduce the problem to counting inversions
    mapping = {option: idx for idx, option in enumerate(ordering_1)}
    renamed_arr_2 = np.array([mapping[option] for option in ordering_2])
    # Count inversions using precomputed pairs
    kendall_distance = 0
    for i, j in search_pairs:
        if renamed_arr_2[i] > renamed_arr_2[j]:
            kendall_distance += 1
    return kendall_distance
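
Example (a minimal sketch; swapping two adjacent entries of an ordering produces exactly one pairwise disagreement):

import numpy as np
from itertools import combinations
from src.utils.distance_functions import unnormalized_kendall_tau

ordering_a = np.array([2, 0, 1])
ordering_b = np.array([0, 2, 1])
pairs = list(combinations(range(3), 2))
print(unnormalized_kendall_tau(ordering_a, ordering_b, pairs))  # -> 1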

Simulation Indicators

get_grid_colors(model)

Returns the current grid state as a list of rows. Each row is a list of cell colors. Assumes that the cells were created in row-major order and stored in model.color_cells.

Source code in src/utils/metrics.py
def get_grid_colors(model):
    """
    Returns the current grid state as a list of rows.
    Each row is a list of cell colors. Assumes that the cells were
    created in row-major order and stored in model.color_cells.
    """
    grid = []
    for row in range(model.height):
        start = row * model.width
        end = start + model.width
        # Get the color for each cell in the row.
        row_colors = [model.color_cells[i].color for i in range(start, end)]
        grid.append(row_colors)
    return grid
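
Example (a minimal sketch with a hypothetical 2x3 model stub; any object providing width, height, and color_cells with .color attributes works):

from types import SimpleNamespace
from src.utils.metrics import get_grid_colors

cells = [SimpleNamespace(color=c) for c in
         ["red", "blue", "red", "blue", "red", "blue"]]  # row-major order
model = SimpleNamespace(width=3, height=2, color_cells=cells)
print(get_grid_colors(model))
# [['red', 'blue', 'red'], ['blue', 'red', 'blue']]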

Social Choice

Here we define the social welfare functions that can be used in the simulation. Beware: we deliberately represent the preference relation in the following (unconventional) way. pref_table: a NumPy matrix with one row per agent; the column index is the option number, and the values (each in [0, 1]) are normalized ranking values. The purpose of this is to allow for non-discrete and non-equidistant rankings.
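
For illustration, a preference table in this convention might look as follows (a sketch; the values are disagreement, so lower means more preferred):

import numpy as np

pref_table = np.array([
    [0.0, 0.5, 1.0],   # agent 0: option 0 preferred most, option 2 least
    [1.0, 0.0, 0.5],   # agent 1: option 1 preferred most
    [0.0, 0.5, 1.0],   # agent 2: same preferences as agent 0
])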

approval_voting(pref_table)

This function implements the approval voting social welfare function. Beware: Input is a preference table (values define a ranking, index=option), but the output is a ranking/an ordering (values represent options).

Parameters:

pref_table (np.ndarray): Agent's preferences (disagreement) as matrix. Required.

Returns:

np.ndarray: Resulting preference ranking (beware: not a pref. relation).

Source code in src/utils/social_welfare_functions.py
def approval_voting(pref_table: np.ndarray) -> np.ndarray:
    """
    This function implements the approval voting social welfare function.
    Beware: Input is a preference table (values define a ranking, index=option),
            but the output is a ranking/an ordering (values represent options).

    Args:
        pref_table (np.ndarray): Agent's preferences (disagreement) as matrix.

    Returns:
        np.ndarray: Resulting preference ranking (beware: not a pref. relation).
    """
    # TODO: does this take the meaning of the values into account? value = dist. = disagreement !
    pref_table = imp_prepr_for_approval(pref_table)
    # Count how often each option is approved
    approval_counts = np.sum(pref_table, axis=0)
    # Add noise to break ties TODO check for bias
    eps = 1e-4
    noise = np.random.uniform(-eps, eps, len(approval_counts))
    #option_count_pairs = list(enumerate(approval_counts + noise))
    #option_count_pairs.sort(key=lambda x: x[1], reverse=True)
    #return [pair[0] for pair in option_count_pairs]
    return np.argsort(-(approval_counts + noise))  # TODO: check order (ascending/descending) - np.argsort sorts ascending
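
Example (a minimal sketch; assumes the module is importable as src.utils.social_welfare_functions):

import numpy as np
from src.utils.social_welfare_functions import approval_voting

pref_table = np.array([
    [0.0, 0.5, 1.0],   # approves option 0 (only value below its row threshold)
    [1.0, 0.0, 0.5],   # approves option 1
    [0.0, 0.5, 1.0],   # approves option 0
])
print(approval_voting(pref_table))  # -> array([0, 1, 2]): option 0 has the most approvals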

complete_ranking(ranking, num_options)

This function adds options that are not in the ranking in a random order.

Parameters:

ranking (np.ndarray): Partial ranking of option indices. Required.
num_options (int): The total number of options. Required.

Returns:

np.ndarray: Completed ranking of length num_options.

Source code in src/utils/social_welfare_functions.py
def complete_ranking(ranking: np.ndarray, num_options: int) -> np.ndarray:
    """
    This function adds options that are not in the ranking in a random order.

    Args:
        ranking (np.ndarray): Partial ranking of option indices.
        num_options (int): The total number of options.

    Returns:
        np.ndarray: Completed ranking of length `num_options`.
    """
    all_options = np.arange(num_options)
    mask = np.isin(all_options, ranking, invert=True)
    non_included_options = all_options[mask]
    np.random.shuffle(non_included_options)
    return np.concatenate((ranking, non_included_options))
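
Example (a minimal sketch; the missing options are appended in a random order, so the tail can vary between runs):

import numpy as np
from src.utils.social_welfare_functions import complete_ranking

partial = np.array([2, 0])
print(complete_ranking(partial, 5))
# e.g. array([2, 0, 4, 1, 3]) - options 1, 3, 4 appended in random order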

continuous_score_voting(pref_table)

This function implements a continuous score voting based on disagreement. Beware: Input is a preference table (values define a ranking, index=option), but the output is a ranking/an ordering (values represent options).

Parameters:

pref_table (np.ndarray): Agent's preferences (disagreement) as matrix. Required.

Returns:

np.ndarray: Resulting preference ranking (beware: not a pref. relation).

Source code in src/utils/social_welfare_functions.py
def continuous_score_voting(pref_table: np.ndarray) -> np.ndarray:
    """
    This function implements a continuous score voting based on disagreement.
    Beware: Input is a preference table (values define a ranking, index=option),
            but the output is a ranking/an ordering (values represent options).

    Args:
        pref_table (np.ndarray): Agent's preferences (disagreement) as matrix.

    Returns:
        np.ndarray: Resulting preference ranking (beware: not a pref. relation).
    """
    # TODO: integrate and test
    # Sum up the disagreement for each option
    scores = np.sum(pref_table, axis=0)
    # Add noise to break ties
    eps = 1e-8
    noise = np.random.uniform(-eps, eps, len(scores))
    ranking = np.argsort(-(scores + noise))
    return ranking
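
Example (a minimal sketch; the scores are the column sums of the table, and the returned ordering sorts options by descending score):

import numpy as np
from src.utils.social_welfare_functions import continuous_score_voting

pref_table = np.array([
    [0.0, 0.5, 1.0],
    [0.0, 0.5, 1.0],
])
# Column sums are [0.0, 1.0, 2.0]; argsort of the negated scores puts option 2 first.
print(continuous_score_voting(pref_table))  # -> array([2, 1, 0])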

imp_prepr_for_approval(pref_table)

This is just like preprocessing_for_approval, but more intelligent: it sets a per-agent threshold (the row mean minus the row variance) instead of a fixed value.

Parameters:

pref_table (np.ndarray): Preferences table. Required.

Returns:

np.ndarray: Binary approvals with shape of pref_table.

Source code in src/utils/social_welfare_functions.py
def imp_prepr_for_approval(pref_table: np.ndarray) -> np.ndarray:
    """
    This is just like preprocessing_for_approval, but more intelligent:
    it sets a per-agent threshold (row mean minus row variance).

    Args:
        pref_table (np.ndarray): Preferences table.

    Returns:
        np.ndarray: Binary approvals with shape of `pref_table`.
    """
    # The threshold is set according to the variances
    threshold = np.mean(pref_table, axis=1) - np.var(pref_table, axis=1)
    if TYPE_CHECKING:
        assert isinstance(threshold, np.ndarray)
    return (pref_table < threshold.reshape(-1, 1)).astype(int)
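
Example (a minimal sketch; each row's threshold is its mean minus its variance, so a zero-variance row approves nothing):

import numpy as np
from src.utils.social_welfare_functions import imp_prepr_for_approval

pref_table = np.array([
    [0.0, 0.5, 1.0],   # threshold = 0.5 - 1/6 = 0.333...
    [0.2, 0.2, 0.2],   # threshold = 0.2 - 0.0 = 0.2 (nothing is strictly below it)
])
print(imp_prepr_for_approval(pref_table))
# [[1 0 0]
#  [0 0 0]]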

majority_rule(pref_table)

This function implements the majority rule social welfare function. Beware: Input is a preference table (values define a ranking, index=option), but the output is a ranking/an ordering (values represent options).

Parameters:

pref_table (np.ndarray): Preferences (disagreement values) per agent (rows) per option (cols). Required.

Returns:

np.ndarray: Resulting preference ranking (beware: not a pref. relation).

Source code in src/utils/social_welfare_functions.py
def majority_rule(pref_table: np.ndarray) -> np.ndarray:
    """
    This function implements the majority rule social welfare function.
    Beware: Input is a preference table (values define a ranking, index=option),
            but the output is a ranking/an ordering (values represent options).

    Args:
        pref_table (np.ndarray): Preferences (disagreement values)
            per agent (rows) per option (cols).

    Returns:
        np.ndarray: Resulting preference ranking (beware: not a pref. relation)
    """
    n, m = pref_table.shape  # n agents, m options
    # Break ties if they exist
    pref_table = run_tie_breaking_preparation_for_majority(pref_table)
    # Count how often an option is ranked first (indexes of the min values)
    first_choices = np.argmin(pref_table, axis=1)
    # To avoid bias toward voters of low indices in the counting, we shuffle
    np.random.shuffle(first_choices)  # (crucial when counting shows ties later)
    first_choice_counts = {}
    for choice in first_choices:
        first_choice_counts[choice] = first_choice_counts.get(choice, 0) + 1
    # Get the ranking from the counts
    option_count_pairs = list(first_choice_counts.items())
    option_count_pairs.sort(key=lambda x: x[1], reverse=True)
    ranking = np.array([pair[0] for pair in option_count_pairs])
    # Faster:
    # count_pairs = np.array(option_count_pairs)
    # # Sort the array by the second element in descending order
    # sorted_indices = np.argsort(count_pairs[:, 1])[::-1]
    # count_pairs = count_pairs[sorted_indices]
    # ranking = count_pairs[:, 0].astype(int)
    # Fill up the ranking with the missing options (if any)
    if ranking.shape[0] < m:
        ranking = complete_ranking(ranking, m)
    return ranking
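
Example (a minimal sketch; the lowest value in each row marks that agent's first choice, and options never ranked first are appended by complete_ranking):

import numpy as np
from src.utils.social_welfare_functions import majority_rule

pref_table = np.array([
    [0.0, 0.5, 1.0],   # first choice: option 0
    [0.0, 0.5, 1.0],   # first choice: option 0
    [1.0, 0.0, 0.5],   # first choice: option 1
])
print(majority_rule(pref_table))  # -> array([0, 1, 2]): two first-place votes for option 0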

preprocessing_for_approval(pref_table, threshold=None)

Interpret values below threshold as approval.

This function prepares the preference table for approval voting by interpreting every value below a threshold as an approval. Beware: the values are distances/disagreements, so smaller means less disagreement. The standard threshold is 1/m (m = number of options). The reasoning is that if the preferences are normalized, 1/m keeps the threshold proportional to the number of options. It also ensures that, on average, half of the options will be approved. The actual number of approved options, however, can still vary depending on the specific values in the preference table.

Parameters:

pref_table (np.ndarray): Preferences table. Required.
threshold (float | None): Approval threshold; defaults to 1/m when None. Default: None.

Returns:

np.ndarray: Binary approvals with shape of pref_table.

Source code in src/utils/social_welfare_functions.py
def preprocessing_for_approval(pref_table: np.ndarray,
                               threshold: Optional[float] = None) -> np.ndarray:
    """
    Interpret values below threshold as approval.

    This function prepares the preference table for approval voting
    by interpreting every value below a threshold as an approval.
    Beware: the values are distance/disagreement => smaller = less disagreement
    The standard threshold is 1/m (m = number of options).
    The reasoning is that if the preferences are normalized,
    1/m keeps the threshold proportional to the number of options.
    It also ensures that, on average, half of the options will be approved.
    The actual number of approved options, however,
    can still vary depending on the specific values in the preference table.

    Args:
        pref_table (np.ndarray): Preferences table.
        threshold (float | None): Approval threshold; defaults to 1/m.

    Returns:
        np.ndarray: Binary approvals with shape of `pref_table`.
    """
    if threshold is None:
        threshold = 1 / pref_table.shape[1]
    return (pref_table < threshold).astype(int)
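
Example (a minimal sketch; with three options the default threshold is 1/3, and every value strictly below it counts as an approval):

import numpy as np
from src.utils.social_welfare_functions import preprocessing_for_approval

pref_table = np.array([
    [0.1, 0.4, 0.5],
    [0.5, 0.2, 0.3],
])
print(preprocessing_for_approval(pref_table))
# [[1 0 0]
#  [0 1 1]]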

run_tie_breaking_preparation_for_majority(pref_table, noise_factor=100)

This function prepares the preference table for majority rule so that ties in the voters' preferences are handled, because majority rule usually cannot deal with ties. The tie-breaking is randomized to ensure anonymity and neutrality.

Parameters:

pref_table (np.ndarray): Preferences per agent (rows) per option (cols). Required.
noise_factor (int): Controls noise magnitude. Default: 100.

Returns:

np.ndarray: Table without ties in first choices.

Source code in src/utils/social_welfare_functions.py
def run_tie_breaking_preparation_for_majority(pref_table: np.ndarray,
                                              noise_factor: int = 100
                                              ) -> np.ndarray:
    """
    This function prepares the preference table for majority rule such that
    it handles ties in the voters' preferences,
    because majority rule usually cannot deal with ties.
    The tie breaking is randomized to ensure anonymity and neutrality.

    Args:
        pref_table (np.ndarray): Preferences per agent (rows) per option (cols).
        noise_factor (int): Controls noise magnitude.

    Returns:
        np.ndarray: Table without ties in first choices.
    """
    # Add some random noise to break ties (based on the variances)
    variances = np.var(pref_table, axis=1)
    # If variances are zero, all values are equal, then select a random option
    mask = (variances == 0)
    # Split
    pref_tab_var_zero = pref_table[mask]
    pref_tab_var_non_zero = pref_table[~mask]
    n, m = pref_tab_var_non_zero.shape

    # Set exactly one option to 0 (the first choice) and the rest to 1/(m-1)
    pref_tab_var_zero.fill(1 / (m-1))
    for i in range(pref_tab_var_zero.shape[0]):
        rand_option = np.random.randint(0, m)
        pref_tab_var_zero[i, rand_option] = 0
    # On the non-zero part, add some noise to the values to break ties
    non_zero_variances = variances[~mask]
    # Generate noise based on the variances
    noise_eps = non_zero_variances / noise_factor
    noise = np.random.uniform(-noise_eps[:, np.newaxis],
                              noise_eps[:, np.newaxis], (n, m))
    # `noise_eps[:, np.newaxis]` reshapes noise_eps from shape `(n,)` to (n, 1)
    pref_tab_var_non_zero += noise

    # Put the parts back together
    return np.concatenate((pref_tab_var_non_zero, pref_tab_var_zero))
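
Example (a minimal sketch; the exact output is random, since tied rows get a random first choice and the other rows receive small noise; note that the rows with non-zero variance come first in the result):

import numpy as np
from src.utils.social_welfare_functions import run_tie_breaking_preparation_for_majority

pref_table = np.array([
    [0.2, 0.2, 0.2],   # fully tied row (zero variance)
    [0.0, 0.5, 1.0],   # row with a clear first choice
])
print(run_tie_breaking_preparation_for_majority(pref_table))
# e.g. [[0.0008, 0.4993, 1.0004],   # noisy copy of the clear row
#       [0.5   , 0.    , 0.5   ]]   # tied row: one random option set to 0, rest to 1/(m-1)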