Skip to content

Utility functions

Vector Distances

distribution_distance_l1(dist_1, dist_2)

Normalized L1 distance between distributions (0..1).

L1 max for distributions is 2, so normalize by 2.

Source code in src/utils/distance_functions.py
133
134
135
136
137
138
139
140
141
142
143
144
def distribution_distance_l1(dist_1: FloatArray, dist_2: FloatArray) -> float:
    """Normalized L1 distance between two probability distributions (0..1).

    The maximal L1 distance between two distributions is 2, hence the
    raw sum of absolute differences is divided by 2 to land in [0, 1].
    """
    a = np.asarray(dist_1, dtype=np.float64)
    b = np.asarray(dist_2, dtype=np.float64)
    size = int(a.size)
    if size > 0:
        validate_distribution(a, size)
        validate_distribution(b, size)
    return float(np.abs(a - b).sum() / 2.0)

kendall_tau_on_ranks(rank_arr_1, rank_arr_2, search_pairs, color_vec)

Beware: don't use this for orderings!

This function calculates the Kendall tau distance between two rank vectors. (The Kendall tau rank distance is a metric that counts the number of pairwise disagreements between two orderings. The larger the distance, the more dissimilar the two lists are. Kendall tau distance is also called bubble-sort distance). Rank vectors hold the rank of each option (option = index). Not to be confused with an ordering (or sequence) where the vector holds options and the index is the rank.

Parameters:

Name Type Description Default
rank_arr_1 FloatArray

First Array containing the ranks of each option.

required
rank_arr_2 FloatArray

The second rank array.

required
search_pairs Sequence[tuple[int, int]]

The pairs of indices.

required
color_vec IntArray

(IntArray): The vector of colors (for efficiency).

required

Returns:

Name Type Description
int int

Kendall tau distance.

Source code in src/utils/distance_functions.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
def kendall_tau_on_ranks(rank_arr_1: FloatArray, rank_arr_2: FloatArray,
                         search_pairs: Sequence[tuple[int, int]],
                         color_vec: IntArray) -> int:
    """
    Beware: don't use this for orderings!

    Computes the Kendall tau distance between two *rank vectors*.
    The Kendall tau rank distance is a metric counting the pairwise
    disagreements between two orderings (also called bubble-sort
    distance); the larger it is, the more dissimilar the lists are.
    A rank vector stores the rank of each option (option = index) —
    not to be confused with an ordering (or sequence), where the
    vector holds options and the index is the rank.

    Args:
        rank_arr_1 (FloatArray): First array with the rank of each option.
        rank_arr_2 (FloatArray): The second rank array.
        search_pairs (Sequence[tuple[int, int]]): The pairs of indices.
        color_vec (IntArray): The vector of colors (for efficiency).

    Returns:
        int: Kendall tau distance.
    """
    # Convert both rank vectors into orderings (option ids by rank).
    ordering_1 = ranks_to_ordering(rank_arr_1)
    ordering_2 = ranks_to_ordering(rank_arr_2)
    # Build a relabeling table mapping ordering_1's options onto color_vec.
    relabel = np.empty_like(ordering_1)
    relabel[ordering_1] = color_vec
    # Relabel the second ordering via NumPy advanced indexing; the Kendall
    # tau distance then equals the number of inversions it contains.
    renamed = relabel[ordering_2]
    # Count inversions over the precomputed index pairs.
    return sum(1 for i, j in search_pairs if renamed[i] > renamed[j])

kendall_tau_order(ordering_1, ordering_2, search_pairs)

Normalized Kendall tau distance on orderings (permutations).

Ordering: index = rank, value = option id.

Parameters:

Name Type Description Default
ordering_1 IntArray

First (NumPy) array containing ranked options.

required
ordering_2 IntArray

The second ordering array.

required
search_pairs Sequence[tuple[int, int]]

Index pairs.

required

Returns:

Name Type Description
int float

Normalized kendall tau distance

Source code in src/utils/distance_functions.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def kendall_tau_order(ordering_1: IntArray, ordering_2: IntArray,
                      search_pairs: Sequence[tuple[int, int]]) -> float:
    """Normalized Kendall tau distance on orderings (permutations).

    Ordering: index = rank, value = option id.

    The Kendall tau rank distance counts the number of pairwise
    disagreements between two orderings (a.k.a. bubble-sort distance);
    the larger the distance, the more dissimilar the two lists are.
    The raw distance is normalized by its maximum, comb(n, 2).

    Args:
        ordering_1 (IntArray): First (NumPy) array containing ranked options.
        ordering_2 (IntArray): The second ordering array.
        search_pairs (Sequence[tuple[int,int]]): Index pairs.

    Returns:
        float: Normalized Kendall tau distance in [0, 1].
    """
    ordering_1 = np.asarray(ordering_1, dtype=np.int64)
    ordering_2 = np.asarray(ordering_2, dtype=np.int64)
    n = ordering_1.size
    if n > 0:
        validate_ordering(ordering_1, n)
        validate_ordering(ordering_2, n)
    dist = unnormalized_kendall_tau(ordering_1, ordering_2, search_pairs)
    # comb(n, 2) pairs is the maximum possible number of inversions.
    max_distance = comb(n, 2)
    return dist / max_distance if max_distance > 0 else 0.0

l1_score_distance(score_1, score_2)

L1 distance between score vectors (lower=better).

Source code in src/utils/distance_functions.py
122
123
124
125
126
127
128
129
130
def l1_score_distance(score_1: FloatArray, score_2: FloatArray) -> float:
    """L1 (Manhattan) distance between two score vectors (lower = better)."""
    a = np.asarray(score_1, dtype=np.float64)
    b = np.asarray(score_2, dtype=np.float64)
    length = int(a.size)
    if length > 0:
        validate_score_vector(a, length)
        validate_score_vector(b, length)
    return float(np.abs(a - b).sum())

spearman_distance(rank_arr_1, rank_arr_2)

Beware: don't use this for orderings!

This function calculates the Spearman distance between two rank vectors. Spearman's foot rule is a measure of the distance between ranked lists. It is given as the sum of the absolute differences between the ranks of the two lists. This function is meant to work with numeric values as well. Hence, we only assume the rank values to be comparable (e.g. normalized).

Parameters:

Name Type Description Default
rank_arr_1 FloatArray

Array containing the ranks of each option

required
rank_arr_2 FloatArray

The second rank array.

required

Returns:

Name Type Description
float float

The Spearman distance

Source code in src/utils/distance_functions.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def spearman_distance(rank_arr_1: FloatArray, rank_arr_2: FloatArray) -> float:
    """
    Beware: don't use this for orderings!

    This function calculates the Spearman distance between two rank vectors.
    Spearman's footrule is a measure of the distance between ranked lists.
    It is given as the sum of the absolute differences between the ranks
    of the two lists.
    This function is meant to work with numeric values as well.
    Hence, we only assume the rank values to be comparable (e.g. normalized).

    Args:
        rank_arr_1 (FloatArray): Array containing the ranks of each option.
        rank_arr_2 (FloatArray): The second rank array.

    Returns:
        float: The Spearman distance.
    """
    # NOTE(review): asserts are stripped under `python -O`; kept as asserts
    # here to preserve the existing AssertionError contract for callers.
    assert rank_arr_1.size == rank_arr_2.size, \
        "Rank arrays must have the same length"
    if rank_arr_1.size > 0:
        # Equal extrema is the (cheap) proxy check for comparability.
        assert (rank_arr_1.min() == rank_arr_2.min()
                and rank_arr_1.max() == rank_arr_2.max()), \
            f"Error: Sequences {rank_arr_1}, {rank_arr_2} aren't comparable."
    # Cast to a plain float to honor the annotated return type
    # (np.sum returns a NumPy scalar), consistent with the sibling
    # distance functions in this module.
    return float(np.sum(np.abs(rank_arr_1 - rank_arr_2)))

spearman_footrule_ranks(rank_arr_1, rank_arr_2)

Normalized Spearman footrule on rank vectors.

Rank vector: index = option id, value = rank (0 best).

Source code in src/utils/distance_functions.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def spearman_footrule_ranks(rank_arr_1: FloatArray, rank_arr_2: FloatArray) -> float:
    """Normalized Spearman footrule distance on rank vectors.

    Rank vector: index = option id, value = rank (0 best).
    """
    ranks_a = np.asarray(rank_arr_1)
    ranks_b = np.asarray(rank_arr_2)
    n = int(ranks_a.size)
    if n > 0:
        validate_rank_vector(ranks_a, n)
        validate_rank_vector(ranks_b, n)
    raw = np.sum(np.abs(ranks_a - ranks_b))
    # Maximum footrule distance is n^2/2 for even n and (n^2-1)/2 for
    # odd n, i.e. floor(n^2 / 2) in both cases.
    max_dist = (n * n) // 2
    return raw / max_dist if max_dist > 0 else 0.0

spearman_fr_order(ordering_1, ordering_2, _search_pairs=None)

Normalized Spearman footrule on orderings.

Ordering: index = rank, value = option id.

Source code in src/utils/distance_functions.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def spearman_fr_order(ordering_1: IntArray, ordering_2: IntArray, _search_pairs=None) -> float:
    """Normalized Spearman footrule distance on orderings.

    Ordering: index = rank, value = option id.
    The `_search_pairs` argument is accepted but unused (leading underscore).
    """
    ord_a = np.asarray(ordering_1, dtype=np.int64)
    ord_b = np.asarray(ordering_2, dtype=np.int64)
    size = ord_a.size
    if size > 0:
        validate_ordering(ord_a, size)
        validate_ordering(ord_b, size)
    # Convert to rank vectors and delegate to the rank-based footrule.
    return spearman_footrule_ranks(ordering_to_ranks(ord_a),
                                   ordering_to_ranks(ord_b))

unnormalized_kendall_tau(ordering_1, ordering_2, search_pairs)

This function calculates the Kendall tau distance on two orderings. An ordering holds the option names in the order of their rank (rank=index).

Parameters:

Name Type Description Default
ordering_1 IntArray

First Array containing ranked options.

required
ordering_2 IntArray

The second ordering array.

required
search_pairs Sequence[tuple[int, int]]

Index pairs (for efficiency).

required

Returns:

Type Description
int

The kendall tau distance

Source code in src/utils/distance_functions.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def unnormalized_kendall_tau(ordering_1: IntArray, ordering_2: IntArray,
                             search_pairs: Sequence[tuple[int, int]]) -> int:
    """
    Calculate the Kendall tau distance between two orderings.
    An ordering holds the option names in the order of their rank
    (rank = index).

    Args:
        ordering_1 (IntArray): First array containing ranked options.
        ordering_2 (IntArray): The second ordering array.
        search_pairs (Sequence[tuple[int,int]]): Index pairs (for efficiency).

    Returns:
        The Kendall tau distance.
    """
    # Relabel options by their position in ordering_1; the distance then
    # reduces to counting inversions in the relabeled second ordering.
    position_of = {option: idx for idx, option in enumerate(ordering_1)}
    relabeled = np.array([position_of[option] for option in ordering_2])
    # An inversion is any precomputed pair (i, j) that is out of order.
    return sum(1 for i, j in search_pairs if relabeled[i] > relabeled[j])

Simulation Indicators

get_grid_colors(model)

Return the current grid state as an array of rows (row-major): result[y, x] == color at position (x, y)

Source code in src/utils/metrics.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def get_grid_colors(model):
    """
    Return the current grid state as an array of rows (row-major):
      result[y, x] == color at position (x, y)

    Empty cells (None) are encoded as -1: `np.fromiter` with
    dtype=np.int64 cannot represent None and would raise a TypeError.
    """
    grid = model.grid

    h, w = grid.height, grid.width
    # Read in Mesa coord_iter() order (x-major), then reshape and transpose to (h, w)
    flat = np.fromiter(
        (cell.color if cell is not None else -1 for cell, _pos in grid.coord_iter()),
        dtype=np.int64,
        count=w * h,
    )
    grid_colors_arr = flat.reshape((w, h)).T  # -> shape (h, w) with arr[y, x]
    return grid_colors_arr

gini_index_0_100(values)

Compute the Gini index (0-100) for a 1D sequence of non-negative values.

Edge cases
  • empty / 1 element -> 0
  • all zeros -> 0
Source code in src/utils/metrics.py
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def gini_index_0_100(values: Union[Sequence[float], np.ndarray, None]) -> int:
    """Compute the Gini index (0-100) for a 1D sequence of non-negative values.

    Edge cases:
      - None / empty / single element -> 0
      - all zeros -> 0
    """
    if values is None:
        return 0
    data = np.asarray(values, dtype=np.float64).ravel()
    count = int(data.size)
    if count <= 1:
        return 0
    mass = float(data.sum())
    if mass <= 0:
        return 0

    # Gini via the sorted-rank formula (values ascending, O(n log n)):
    #   G = (2 * sum(i * x_i)) / (n * sum(x)) - (n + 1) / n
    ordered = np.sort(data)
    ranks = np.arange(1, count + 1, dtype=np.float64)
    gini = (2.0 * float((ranks * ordered).sum())) / (count * mass) \
        - (count + 1.0) / count
    # Clamp against floating-point drift.
    gini = min(max(gini, 0.0), 1.0)
    return int(gini * 100)

Social Choice

Representation contract: - Input: ScoreVector table (pref_table) * rows = agents * columns = options * values = disagreement/oppose scores (lower = better) - Output: Ordering (permutation) with best option first.

This design allows non-discrete and non-equidistant preferences.

Implemented rules (schema B1): - plurality_rule (first-choice plurality after tie-prep) - approval_voting (canonical fixed-threshold approval mapping) - approval_voting_custom (legacy adaptive approval mapping; non-canonical) - utilitarian_rule (minimize total disagreement) - borda_rule (positional scoring derived from per-voter orderings) - schulze_rule (pairwise strongest-path ranking on options) - random_rule (uniform full random ranking, seed-deterministic)

approval_voting(pref_table, *, rng)

Canonical approval voting with fixed threshold mapping.

Parameters:

Name Type Description Default
pref_table ndarray

ScoreVector table (disagreement values). per agent (rows) per option (cols), lower = better.

required
rng Generator

Random number generator.

required

Returns: np.ndarray: Ordering (permutation) of options.

Source code in src/utils/social_welfare_functions.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
def approval_voting(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """
    Canonical approval voting with fixed threshold mapping.

    Args:
        pref_table (np.ndarray): ScoreVector table (disagreement values).
            per agent (rows) per option (cols), lower = better.
        rng (np.random.Generator): Random number generator.
    Returns:
        np.ndarray: Ordering (permutation) of options.
    """
    if pref_table.ndim != 2:
        raise ValueError("pref_table must be 2D")
    num_options = pref_table.shape[1]
    if num_options <= 0:
        return np.asarray([], dtype=np.int64)

    approvals = preprocessing_for_approval(pref_table, threshold=APPROVAL_THRESHOLD_TAU)
    approval_counts = approvals.sum(axis=0).astype(np.int64)
    # Tie-break policy (lexsort keys, last key is primary):
    # 1) higher approval count wins
    # 2) lower total disagreement wins (content-based, label-neutral)
    # 3) randomized final tie-break (deterministic for fixed seed)
    total_disagreement = pref_table.sum(axis=0).astype(np.float64)
    tiebreak = rng.random(num_options)
    ordering = np.lexsort(
        (tiebreak, total_disagreement, -approval_counts)
    ).astype(np.int64)
    validate_ordering(ordering, num_options)
    return ordering

approval_voting_custom(pref_table, *, rng)

Legacy/custom approval mapping using adaptive per-voter threshold (mean-variance).

Kept for exploratory comparisons; not used as canonical approval in thesis baseline.

Source code in src/utils/social_welfare_functions.py
171
172
173
174
175
176
177
178
179
180
181
def approval_voting_custom(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """
    Legacy/custom approval mapping using adaptive per-voter threshold (mean-variance).

    Kept for exploratory comparisons; not used as canonical approval in thesis baseline.
    """
    # Map scores to binary approvals using the adaptive threshold.
    approvals = imp_prepr_for_approval(pref_table)
    counts = np.sum(approvals, axis=0)
    # Tiny symmetric noise breaks count ties (deterministic per rng seed);
    # negation makes argsort rank most-approved options first.
    eps = 1e-6
    jitter = rng.uniform(-eps, eps, len(counts))
    return np.argsort(-(counts + jitter))

borda_rule(pref_table, *, rng)

Borda count derived from per-voter orderings of the ScoreVector.

We interpret each row as a (possibly tied) preference ordering by sorting scores ascending (lower=better). Then assign Borda points: best gets m-1, next m-2, ... worst gets 0.

Tie-breaking: - per-voter randomized tie-breaking using rng via scores_to_ordering. - final ordering tie-breaks by a secondary random key.

Source code in src/utils/social_welfare_functions.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def borda_rule(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """Borda count derived from per-voter orderings of the ScoreVector.

    Each row is interpreted as a (possibly tied) preference ordering by
    sorting scores ascending (lower = better). Borda points are assigned
    positionally: best gets m-1, next m-2, ..., worst gets 0.

    Tie-breaking:
    - per-voter randomized tie-breaking using `rng` via scores_to_ordering.
    - final ordering tie-breaks by a secondary random key.
    """
    if pref_table.ndim != 2:
        raise ValueError("pref_table must be 2D")
    num_voters, num_options = pref_table.shape
    if num_options <= 0:
        return np.asarray([], dtype=np.int64)

    # points_by_rank[r] is the score awarded to the option placed at rank r.
    points_by_rank = np.arange(num_options - 1, -1, -1, dtype=np.float64)
    totals = np.zeros(num_options, dtype=np.float64)
    for voter in range(num_voters):
        voter_ordering = scores_to_ordering(pref_table[voter], rng=rng)
        # voter_ordering[0] is best -> gets m-1 points, etc.
        totals[voter_ordering] += points_by_rank

    # Higher totals => better; random secondary key resolves exact ties.
    tiebreak = rng.random(num_options)
    ordering = np.lexsort((tiebreak, -totals))
    validate_ordering(ordering, num_options)
    return ordering

complete_ranking(ordering, num_options, *, rng)

This function adds options that are not in the ordering in a random order.

Parameters:

Name Type Description Default
ordering ndarray

Partial ordering (permutation) of option indices.

required
num_options int

The total number of options.

required
rng Generator

Random number generator.

required

Returns: np.ndarray: Completed ordering of length num_options.

Source code in src/utils/social_welfare_functions.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def complete_ranking(
    ordering: np.ndarray,
    num_options: int,
    *,
    rng: np.random.Generator,
) -> np.ndarray:
    """
    Append the options missing from a partial ordering in random order.

    Args:
        ordering (np.ndarray): Partial ordering (permutation) of option indices.
        num_options (int): The total number of options.
        rng (np.random.Generator): Random number generator.
    Returns:
        np.ndarray: Completed ordering of length `num_options`.
    """
    candidates = np.arange(num_options)
    # Options not yet ranked (np.isin with invert=True keeps the missing ones).
    missing = candidates[np.isin(candidates, ordering, invert=True)]
    rng.shuffle(missing)
    return np.concatenate((ordering, missing))

continuous_score_voting(pref_table, *, rng)

Continuous score voting returning an Ordering (permutation) with RNG noise.

Parameters:

Name Type Description Default
pref_table ndarray

ScoreVector table (disagreement values). per agent (rows) per option (cols), lower = better.

required
rng Generator

Random number generator.

required
Source code in src/utils/social_welfare_functions.py
359
360
361
362
363
364
365
366
367
368
369
370
371
def continuous_score_voting(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """Continuous score voting returning an Ordering (permutation) with RNG noise.

    Args:
        pref_table (np.ndarray): ScoreVector table (disagreement values).
            per agent (rows) per option (cols), lower = better.
        rng (np.random.Generator): Random number generator.
    """
    # Total score per option across all agents.
    totals = np.sum(pref_table, axis=0)
    # Tiny noise so exact ties are broken (deterministic per rng seed).
    eps = 1e-8
    jitter = rng.uniform(-eps, eps, len(totals))
    # NOTE(review): argsort of the negated totals puts the option with the
    # *highest* summed score first; if the table truly holds disagreement
    # (lower = better) this ranks worst-first, unlike utilitarian_rule —
    # confirm this direction is intended.
    return np.argsort(-(totals + jitter))

imp_prepr_for_approval(pref_table)

This is just like preprocessing_for_approval, but more intelligent. It sets the threshold depending on the variances.

Parameters:

Name Type Description Default
pref_table ndarray

Preferences table.

required

Returns:

Type Description
ndarray

np.ndarray: Binary approvals with shape of pref_table.

Source code in src/utils/social_welfare_functions.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def imp_prepr_for_approval(pref_table: np.ndarray) -> np.ndarray:
    """
    Variant of preprocessing_for_approval with an adaptive threshold.

    The per-voter threshold is the row mean minus the row variance, so a
    larger spread in a voter's scores lowers that voter's threshold.

    Args:
        pref_table (np.ndarray): Preferences table.

    Returns:
        np.ndarray: Binary approvals with shape of `pref_table`.
    """
    # Per-row threshold: mean minus variance.
    row_threshold = np.mean(pref_table, axis=1) - np.var(pref_table, axis=1)
    approvals = pref_table < row_threshold[:, None]
    return approvals.astype(int)

plurality_rule(pref_table, *, rng)

This function implements the plurality-rule social welfare function.

Parameters:

Name Type Description Default
pref_table ndarray

ScoreVector table (disagreement values) per agent (rows) per option (cols), lower = better.

required
rng Generator

Random number generator.

required

Returns: np.ndarray: Ordering (permutation) of options.

Source code in src/utils/social_welfare_functions.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def plurality_rule(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """
    This function implements the plurality-rule social welfare function.

    Args:
        pref_table (np.ndarray): ScoreVector table (disagreement values)
            per agent (rows) per option (cols), lower = better.
        rng (np.random.Generator): Random number generator.
    Returns:
        np.ndarray: Ordering (permutation) of options.
    """
    if pref_table.ndim != 2:
        raise ValueError("pref_table must be 2D")
    num_voters, num_options = pref_table.shape
    if num_options <= 0:
        return np.asarray([], dtype=np.int64)
    if num_voters <= 0:
        return np.arange(num_options, dtype=np.int64)

    prepared = run_tie_breaking_preparation_for_plurality(pref_table, rng=rng)
    first_choices = np.argmin(prepared, axis=1).astype(np.int64)
    # Legacy tie resolution: shuffle ballot order first, then rely on the
    # stable sort below so equal counts keep their (randomized) first-seen order.
    rng.shuffle(first_choices)
    counts: dict[int, int] = {}
    for ballot in first_choices:
        option = int(ballot)
        counts[option] = counts.get(option, 0) + 1
    ranked_pairs = sorted(counts.items(), key=lambda pair: -pair[1])
    ordering = np.array([option for option, _count in ranked_pairs], dtype=np.int64)
    if ordering.shape[0] < num_options:
        # Options nobody picked first are appended in random order.
        ordering = complete_ranking(ordering, num_options, rng=rng)
    validate_ordering(ordering, num_options)
    return ordering

preprocessing_for_approval(pref_table, threshold=None)

Interpret values below threshold as approval.

This function prepares the preference table for approval voting by interpreting every value below a threshold as an approval. Beware: the values are distance/disagreement => smaller = less disagreement The standard threshold is 1/m (m = number of options). The reasoning is that if the preferences are normalized, 1/m ensures the threshold to be proportionate to the number of options. It also ensures that, on average, half of the options will be approved. The actual number of approved options, however, can still vary depending on the specific values in the preference table.

Parameters:

Name Type Description Default
pref_table ndarray

Preferences table.

required
threshold float | None

Approval threshold; defaults to 1/m.

None

Returns:

Type Description
ndarray

np.ndarray: Binary approvals with shape of pref_table.

Source code in src/utils/social_welfare_functions.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def preprocessing_for_approval(
    pref_table: np.ndarray,
    threshold: float | None = None,
) -> np.ndarray:
    """
    Interpret values strictly below `threshold` as approvals.

    Prepares the preference table for approval voting by mapping every
    value below the threshold to 1 (approve) and everything else to 0.
    Beware: values are distance/disagreement => smaller = less disagreement.
    The default threshold is 1/m (m = number of options): with normalized
    preferences this keeps the threshold proportionate to the number of
    options and, on average, approves about half of them. The actual
    number of approved options still varies with the concrete table values.

    Args:
        pref_table (np.ndarray): Preferences table.
        threshold (float | None): Approval threshold; defaults to 1/m.

    Returns:
        np.ndarray: Binary approvals with shape of `pref_table`.
    """
    cutoff = 1 / pref_table.shape[1] if threshold is None else threshold
    return (pref_table < cutoff).astype(int)

random_rule(pref_table, *, rng)

Uniform random full ranking of options.

Semantics: - Ignores preference magnitudes intentionally. - Returns a full permutation sampled uniformly from all m! rankings. - Deterministic for fixed RNG state/seed.

Source code in src/utils/social_welfare_functions.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
def random_rule(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """Uniform random full ranking of options.

    Semantics:
    - Ignores preference magnitudes intentionally.
    - Returns a full permutation sampled uniformly from all `m!` rankings.
    - Deterministic for fixed RNG state/seed.
    """
    if pref_table.ndim != 2:
        raise ValueError("pref_table must be 2D")
    num_options = pref_table.shape[1]
    if num_options <= 0:
        return np.asarray([], dtype=np.int64)
    # Shuffle the identity permutation in place for a uniform ranking.
    ordering = np.arange(num_options, dtype=np.int64)
    rng.shuffle(ordering)
    validate_ordering(ordering, num_options)
    return ordering

run_tie_breaking_preparation_for_plurality(pref_table, eps=1e-09, *, rng)

Prepare ballots for plurality rule by breaking only first-choice ties.

Parameters:

Name Type Description Default
pref_table ndarray

Preferences per agent (rows) per option (cols).

required
eps float

Tiny jitter amplitude used only on tied minimal entries.

1e-09
rng Generator

Random number generator.

required

Returns: np.ndarray: Table where per-row minimal-score ties are broken.

Source code in src/utils/social_welfare_functions.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def run_tie_breaking_preparation_for_plurality(
    pref_table: np.ndarray,
    eps: float = 1e-9,
    *,
    rng: np.random.Generator,
) -> np.ndarray:
    """
    Prepare ballots for plurality rule by breaking *only first-choice ties*.

    Args:
        pref_table (np.ndarray): Preferences per agent (rows) per option (cols).
        eps (float): Tiny jitter amplitude used only on tied minimal entries.
        rng (np.random.Generator): Random number generator.
    Returns:
        np.ndarray: Table where per-row minimal-score ties are broken.
    """
    prepared = np.array(pref_table, dtype=np.float64, copy=True)
    if prepared.ndim != 2:
        raise ValueError("pref_table must be 2D")
    num_rows, num_cols = prepared.shape
    if num_cols <= 0:
        return prepared

    for row_idx in range(num_rows):
        row = prepared[row_idx]
        # Only entries exactly equal to the row minimum count as tied.
        tied = np.flatnonzero(row == float(np.min(row)))
        if tied.size > 1:
            # Jitter only the tied minima; all other entries get +0.0,
            # which leaves them bit-for-bit unchanged.
            jitter = np.zeros(num_cols, dtype=np.float64)
            jitter[tied] = rng.uniform(-eps, eps, size=tied.size)
            prepared[row_idx] = row + jitter
    return prepared

schulze_rule(pref_table, *, rng)

Schulze method on option candidates using pairwise strongest paths.

Semantics: - Candidates are option ids (columns of pref_table). - Lower score = better. For voter v, candidate i is preferred to j iff pref_table[v, i] < pref_table[v, j] (strict comparison). - Exact score ties are neutral and contribute to neither side.

Complexity: - O(n * m^2 + m^3), with n voters and m candidates. - Guarded by SCHULZE_MAX_OPTIONS for predictable runtime.

Source code in src/utils/social_welfare_functions.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
def schulze_rule(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """Schulze method on option candidates using pairwise strongest paths.

    Semantics:
    - Candidates are option ids (columns of `pref_table`).
    - Lower score = better. For voter `v`, candidate `i` is preferred to `j` iff
      `pref_table[v, i] < pref_table[v, j]` (strict comparison).
    - Exact score ties are neutral and contribute to neither side.

    Complexity:
    - O(n * m^2 + m^3), with n voters and m candidates.
    - Guarded by `SCHULZE_MAX_OPTIONS` for predictable runtime.

    Args:
        pref_table (np.ndarray): ScoreVector table per agent (rows) per
            option (cols), lower = better.
        rng (np.random.Generator): Random number generator (final tie-break).

    Returns:
        np.ndarray: Ordering (permutation) of the m options.

    Raises:
        ValueError: If `pref_table` is not 2D, exceeds `SCHULZE_MAX_OPTIONS`
            columns, or contains non-finite values.
    """
    if pref_table.ndim != 2:
        raise ValueError("pref_table must be 2D")
    n, m = pref_table.shape
    if m <= 0:
        return np.asarray([], dtype=np.int64)
    if n <= 0:
        # No voters: fall back to the identity ordering.
        return np.arange(m, dtype=np.int64)
    if m > SCHULZE_MAX_OPTIONS:
        raise ValueError(
            f"Schulze disabled above max options={SCHULZE_MAX_OPTIONS} (got {m})."
        )
    if not np.all(np.isfinite(pref_table)):
        raise ValueError("pref_table must contain only finite values")

    # Pairwise preference counts: d[i,j] = number of voters preferring i over j.
    d = np.zeros((m, m), dtype=np.int64)
    for v in range(n):
        row = np.asarray(pref_table[v], dtype=np.float64)
        # Strict '<' keeps exact score ties neutral (counted for neither side).
        pref = row[:, None] < row[None, :]
        d += pref.astype(np.int64)

    # Strongest paths initialization: only direct pairwise victories
    # (d[i,j] > d[j,i]) seed a nonzero path strength.
    p = np.zeros((m, m), dtype=np.int64)
    for i in range(m):
        for j in range(m):
            if i != j and d[i, j] > d[j, i]:
                p[i, j] = d[i, j]

    # Floyd-Warshall style strongest-path closure.
    # Outer `i` is the intermediate node:
    #   p[j,k] = max(p[j,k], min(p[j,i], p[i,k]))
    for i in range(m):
        for j in range(m):
            if i == j:
                continue
            pji = p[j, i]
            if pji <= 0:
                # No path from j into i, so routing via i cannot help j.
                continue
            for k in range(m):
                if k == i or k == j:
                    continue
                via = p[i, k]
                if via <= 0:
                    continue
                # A path's strength is the weaker of its two legs.
                cand = pji if pji < via else via
                if cand > p[j, k]:
                    p[j, k] = cand

    # Total-order tie stack for deterministic + unbiased final ranking
    # (lexsort keys, last key is primary):
    # 1) Schulze wins (strongest path beats the reverse path),
    # 2) aggregate path-strength margin,
    # 3) raw pairwise support,
    # 4) random key (deterministic for a fixed seed).
    wins = np.sum(p > p.T, axis=1).astype(np.int64)
    path_margin = np.sum(p - p.T, axis=1).astype(np.int64)
    pairwise_support = np.sum(d, axis=1).astype(np.int64)
    rand = rng.random(m)

    ordering = np.lexsort((rand, -pairwise_support, -path_margin, -wins)).astype(np.int64)
    validate_ordering(ordering, m)
    return ordering

utilitarian_rule(pref_table, *, rng)

Utilitarian (score) rule on disagreement scores.

Semantics: - pref_table entries are disagreement/oppose scores in [0,1], lower=better. - Aggregate by minimizing total disagreement across voters: total[j] = sum_i pref_table[i, j] Lower total => better.

Tie-breaking: - Random but deterministic given rng, using a secondary random key.

Source code in src/utils/social_welfare_functions.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
def utilitarian_rule(pref_table: np.ndarray, *, rng: np.random.Generator) -> np.ndarray:
    """Utilitarian (score) rule on disagreement scores.

    Semantics:
    - pref_table entries are disagreement/oppose scores in [0,1], lower=better.
    - Aggregate by minimizing total disagreement across voters:
        total[j] = sum_i pref_table[i, j]
      Lower total => better.

    Tie-breaking:
    - Random but deterministic given `rng`, using a secondary random key.
    """
    if pref_table.ndim != 2:
        raise ValueError("pref_table must be 2D")
    num_options = pref_table.shape[1]
    if num_options <= 0:
        return np.asarray([], dtype=np.int64)
    # Primary key: ascending totals; secondary key: random tie-break.
    totals = pref_table.sum(axis=0).astype(np.float64)
    tiebreak = rng.random(num_options)
    ordering = np.lexsort((tiebreak, totals))
    validate_ordering(ordering, num_options)
    return ordering