import numpy as np
from dataclasses import dataclass

# Per-slot leader-election probability for a party with relative stake `alpha`,
# given the active-slot coefficient `f`.
def phi(f, alpha):
return 1 - (1-f)**alpha
@dataclass
class Params:
SLOTS: int
f: float
honest_stake: np.ndarray
adversary_control: float
window_size: int
use_deps: bool
@property
def N(self):
return len(self.honest_stake) + 1
@property
def stake(self):
# The adversary's stake is sized so that it holds `adversary_control` of the total:
# a = H * c / (1 - c), where H = honest_stake.sum() and c = adversary_control.
return np.append(self.honest_stake, self.honest_stake.sum() / (1/self.adversary_control - 1))
@property
def relative_stake(self):
return self.stake / self.stake.sum()
def slot_prob(self):
return phi(self.f, self.relative_stake)
@dataclass
class Block:
id: int
slot: int
refs: list[int]
deps: list[int]
leader: int
adversarial: bool
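As a quick sanity check of the stake model, here is how `Params` could be instantiated; the numbers below are arbitrary placeholders, not the settings used in the experiments:

params = Params(
    SLOTS=3600,
    f=0.05,
    honest_stake=np.array([10.0, 20.0, 30.0, 40.0]),
    adversary_control=0.2,   # the adversary holds 20% of total stake
    window_size=10,
    use_deps=False,
)
print(params.N)               # 5 nodes: 4 honest + 1 adversary
print(params.relative_stake)  # sums to 1; the last entry is 0.2
print(params.slot_prob())     # per-node probability of winning a given slot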
Instead of increasing the $f$ parameter, we can run $\pi$ concurrent lotteries per slot, each with a smaller $f$, to slightly reduce the bias. The dash-dotted lines show the probability of winning at least once per slot:

However, this also increases the number of possible concurrent blocks per slot. A node now has a non-zero probability of proposing up to $\pi$ blocks per slot, since it runs $\pi$ lotteries per slot instead of a single lottery with a higher winning probability. I'm wondering how this may impact the weight of the DAG.
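As a rough sanity check (the numbers are made up, and I'm assuming each of the $\pi$ lotteries independently uses the same reduced $f$):

alpha = 0.2                    # hypothetical relative stake of one node
f_single, f_small, n_lotteries = 0.2, 0.05, 4

p_single = phi(f_single, alpha)
# Probability of winning at least one of the n_lotteries concurrent lotteries:
p_any = 1 - (1 - phi(f_small, alpha)) ** n_lotteries
# Expected number of blocks this node proposes in one slot:
expected_blocks = n_lotteries * phi(f_small, alpha)
print(p_single, p_any, expected_blocks)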
import pulp
def dag_with_max_refs(blocks: list[Block], window_size: int) -> dict[int, list[int]]:
"""
Returns a dict: block_id -> list of referenced block_ids
that maximizes the total number of references under these constraints:
- References go backward in time (from later to earlier blocks)
- Reference only within `window_size`
- No transitive references (triangle closure constraint)
"""
prob = pulp.LpProblem("MaxRefsDAG", pulp.LpMaximize)
block_ids = [b.id for b in blocks]
slots = {b.id: b.slot for b in blocks}
id_set = set(block_ids)
# Generate all valid candidate pairs (i -> j)
pairs = [
(i, j)
for i in block_ids
for j in block_ids
if slots[i] > slots[j] and (slots[i] - slots[j]) <= window_size
]
# Decision variables: x[i][j] = 1 if block i references block j
x = pulp.LpVariable.dicts("x", pairs, cat="Binary")
# Objective: maximize total number of references
prob += pulp.lpSum(x[i, j] for (i, j) in pairs)
# Triangle constraints: for all i, j, k,
# if i -> j and j -> k then i must NOT also reference k directly (no transitive shortcut).
# Note that this only forbids length-2 shortcuts, so it is an approximation: the adversary
# simulation could still sneak in some extra refs via longer paths. Since that could only
# improve our results, this is good enough.
for i in block_ids:
for j in block_ids:
for k in block_ids:
if (i, j) in x and (j, k) in x and (i, k) in x:
prob += x[i, j] + x[j, k] + x[i, k] <= 2
# Solve the ILP
solver = pulp.PULP_CBC_CMD(msg=False)
result = prob.solve(solver)
if pulp.LpStatus[result] != "Optimal":
raise RuntimeError("ILP did not find an optimal solution.")
# Extract result
ref_graph = {i: [] for i in block_ids}
for (i, j) in pairs:
if pulp.value(x[i, j]) > 0.5:
ref_graph[i].append(j)
return ref_graph
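For illustration, a tiny hand-built input (the blocks and window size are made up) and what the ILP returns for it:

toy_blocks = [
    Block(id=0, slot=0, refs=[], deps=[], leader=0, adversarial=False),
    Block(id=1, slot=1, refs=[], deps=[], leader=1, adversarial=False),
    Block(id=2, slot=2, refs=[], deps=[], leader=2, adversarial=False),
]
# With window_size=2 every backward pair is a candidate, but the triangle constraint
# forbids choosing 1 -> 0, 2 -> 1 and 2 -> 0 all at once, so at most two refs survive.
print(dag_with_max_refs(toy_blocks, window_size=2))
# e.g. {0: [], 1: [0], 2: [1]} (other optima with two refs are possible)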
def __init__(self, params: Params, network: NetworkParams):
self.params = params
self.network = network
# leaders: fixed-size (N × SLOTS)
self.leaders = np.zeros((params.N, params.SLOTS), dtype=np.int32)
# Preallocate capacity for blocks: at most N * SLOTS blocks
max_blocks = params.N * params.SLOTS
self.block_slots = np.empty(max_blocks, dtype=np.int32)
self.block_arrivals = np.empty((params.N, max_blocks), dtype=np.int32)
self.num_blocks = 0
self.blocks: list[Block] = []
# Emit genesis block (id = 0)
self.emit_block(leader=0, slot=0, refs=[], deps=[])
# Set arrival times of genesis to 0
self.block_arrivals[:, 0] = 0
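# Layout note (as the getters below read these arrays): a block's `id` doubles as its
# index into the preallocated arrays, so `block_slots[b.id]` is the slot in which block
# `b` was produced and `block_arrivals[node, b.id]` is the slot at which `node` first
# receives it (0 for genesis at every node). These entries are presumably filled in by
# `emit_block`, which is not shown here.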
def clone_for_attack(self):
new = object.__new__(Sim)
new.params = self.params
new.network = self.network
# Copy leaders array
new.leaders = self.leaders.copy()
# Preallocate arrays with the same capacity as the original
new.block_slots = np.empty_like(self.block_slots)
new.block_arrivals = np.empty_like(self.block_arrivals)
new.num_blocks = self.num_blocks
# Clone the blocks list, copying each Block (including its refs/deps lists)
new.blocks = [
Block(
id=b.id,
leader=b.leader,
slot=b.slot,
refs=b.refs.copy(),
deps=b.deps.copy(),
adversarial=b.adversarial
) for b in self.blocks
]
# Copy underlying arrays
new.block_slots[: self.num_blocks] = self.block_slots[: self.num_blocks]
new.block_arrivals[:, : self.num_blocks] = self.block_arrivals[:, : self.num_blocks]
return new
def get_seen_blocks_in_window_for_node(self, node_id: int, current_slot: int, window_size: int) -> list[int]:
if not (0 <= node_id < self.params.N):
raise ValueError(f"Invalid node_id: {node_id}. Must be between 0 and {self.params.N - 1}.")
if window_size <= 0:
raise ValueError(f"window_size must be positive. Got {window_size}.")
if self.num_blocks == 0:
return []
min_slot = current_slot - window_size + 1
max_slot = current_slot
arrivals = self.block_arrivals[node_id, : self.num_blocks]
slots = self.block_slots[: self.num_blocks]
mask = (
(arrivals >= min_slot) & (arrivals <= max_slot) &
(slots >= min_slot) & (slots <= max_slot)
)
return np.nonzero(mask)[0].tolist()
def get_all_blocks_in_window_for_node(self, node_id: int, current_slot: int, window_size: int) -> list[int]:
if not (0 <= node_id < self.params.N):
raise ValueError(f"Invalid node_id: {node_id}. Must be between 0 and {self.params.N - 1}.")
if window_size <= 0:
raise ValueError(f"window_size must be positive. Got {window_size}.")
if self.num_blocks == 0:
return []
min_slot = current_slot - window_size + 1
max_slot = current_slot
slots = self.block_slots[: self.num_blocks]
mask = (slots >= min_slot) & (slots <= max_slot)
return np.nonzero(mask)[0].tolist()
def get_unreachable_blocks(self, node_id: int, current_slot: int) -> list[int]:
"""Return the blocks seen by `node_id` up to `current_slot` that no other seen
block references, i.e. the current tips of the DAG from this node's point of view."""
if not (0 <= node_id < self.params.N):
raise ValueError(f"Invalid node_id: {node_id}. Must be between 0 and {self.params.N - 1}.")
arrivals = self.block_arrivals[node_id, : self.num_blocks]
seen_ids = set(np.nonzero(arrivals <= current_slot)[0].tolist())
has_incoming = set()
for b in seen_ids:
for parent in self.blocks[b].refs:
if parent in seen_ids:
has_incoming.add(parent)
return [b for b in seen_ids if b not in has_incoming]
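To make this concrete with toy data (independent of the simulator): if a node has seen blocks 0, 1 and 2 with references 1 -> 0 and 2 -> 1, only block 2 has no incoming reference, so it is the single tip.

seen_refs = {0: [], 1: [0], 2: [1]}   # block_id -> referenced block_ids, all seen
has_incoming = {p for refs in seen_refs.values() for p in refs if p in seen_refs}
print([b for b in seen_refs if b not in has_incoming])  # [2]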