Cryptarchia v2

def phi(f, alpha):
    return 1 - (1-f)**alpha

@dataclass
class Params:
    SLOTS: int
    f: float
    honest_stake: np.array
    adversary_control: float
    window_size: int
    use_deps: bool

    @property
    def N(self):
        return len(self.honest_stake) + 1

    @property
    def stake(self):
        return np.append(self.honest_stake, self.honest_stake.sum() / (1/self.adversary_control - 1))
    
    @property
    def relative_stake(self):
        return self.stake / self.stake.sum()

    def slot_prob(self):
        return phi(self.f, self.relative_stake)

@dataclass
class Block:
    id: int
    slot: int
    refs: list[int]
    deps: list[int]
    leader: int
    adversarial: bool

A note on Phi and parallelization

Instead of increasing the $f$ parameter, we can run $\pi$ concurrent lotteries with a smaller $f$ parameter to slightly reduce bias. The dash-dotted lines show the probability of winning at least once per-slot:

image.png

However, this also increases the number of possible concurrent blocks per slot. A node will have non-zero probability to propose up to $\pi$ blocks per slot, since there are $\pi$ lotteries for each slot per node instead of a single lottery with higher winning probability. I'm wondering how this may impact the weight of the DAG.

DAG with Max Refs

import pulp

def dag_with_max_refs(blocks: list, window_size: int) -> dict[int, list[int]]:
    """
    Returns a dict: block_id -> list of referenced block_ids
    that maximizes the total number of references under these constraints:
    - References go backward in time (from later to earlier blocks)
    - Reference only within `window_size`
    - No transitive references (triangle closure constraint)
    """
    prob = pulp.LpProblem("MaxRefsDAG", pulp.LpMaximize)

    block_ids = [b.id for b in blocks]
    slots = {b.id: b.slot for b in blocks}
    id_set = set(block_ids)

    # Generate all valid candidate pairs (i -> j)
    pairs = [
        (i, j)
        for i in block_ids
        for j in block_ids
        if slots[i] > slots[j] and (slots[i] - slots[j]) <= window_size
    ]

    # Decision variables: x[i][j] = 1 if block i references block j
    x = pulp.LpVariable.dicts("x", pairs, cat="Binary")

    # Objective: maximize total number of references
    prob += pulp.lpSum(x[i, j] for (i, j) in pairs)

    # Triangle constraints: for all i, j, k:
    # if i -> j and j -> k then i must NOT reference k to avoid closure
    # Note that this is a good approximation, but it would allow the adversary simulation to
    # sneak in some extra refs. Since that only could improve our results, this is good enough.
    for i in block_ids:
        for j in block_ids:
            for k in block_ids:
                if (i, j) in x and (j, k) in x and (i, k) in x:
                    prob += x[i, j] + x[j, k] + x[i, k] <= 2

    # Solve the ILP
    solver = pulp.PULP_CBC_CMD(msg=False)
    result = prob.solve(solver)

    if pulp.LpStatus[result] != "Optimal":
        raise RuntimeError("ILP did not find an optimal solution.")

    # Extract result
    ref_graph = {i: [] for i in block_ids}
    for (i, j) in pairs:
        if pulp.value(x[i, j]) > 0.5:
            ref_graph[i].append(j)

    return ref_graph

Class Sim

Init

    def __init__(self, params: Params, network: NetworkParams):
        self.params = params
        self.network = network

        # leaders: fixed-size (N × SLOTS)
        self.leaders = np.zeros((params.N, params.SLOTS), dtype=np.int32)

        # Preallocate capacity for blocks: at most N * SLOTS blocks
        max_blocks = params.N * params.SLOTS
        self.block_slots = np.empty(max_blocks, dtype=np.int32)
        self.block_arrivals = np.empty((params.N, max_blocks), dtype=np.int32)
        self.num_blocks = 0

        self.blocks: list[Block] = []

        # Emit genesis block (id = 0)
        self.emit_block(leader=0, slot=0, refs=[], deps=[])
        # Set arrival times of genesis to 0
        self.block_arrivals[:, 0] = 0

Clone for Attack

def clone_for_attack(self):
        new = object.__new__(Sim)
        new.params = self.params
        new.network = self.network

        # Copy leaders array
        new.leaders = self.leaders.copy()

        # Preallocate same sizes
        max_blocks = self.params.N * self.params.SLOTS
        new.block_slots = np.empty_like(self.block_slots)
        new.block_arrivals = np.empty_like(self.block_arrivals)
        new.num_blocks = self.num_blocks

        # Copy blocks list (shallow copy of each Block)
        new.blocks = [
            Block(
                id=b.id,
                leader=b.leader,
                slot=b.slot,
                refs=b.refs.copy(),
                deps=b.deps.copy(),
                adversarial=b.adversarial
            ) for b in self.blocks
        ]

        # Copy underlying arrays
        new.block_slots[: self.num_blocks] = self.block_slots[: self.num_blocks]
        new.block_arrivals[:, : self.num_blocks] = self.block_arrivals[:, : self.num_blocks]

        return new

Get Seen Blocks in Window for Node

def get_seen_blocks_in_window_for_node(self, node_id: int, current_slot: int, window_size: int) -> list[int]:
        if not (0 <= node_id < self.params.N):
            raise ValueError(f"Invalid node_id: {node_id}. Must be between 0 and {self.params.N - 1}.")
        if window_size <= 0:
            raise ValueError(f"window_size must be positive. Got {window_size}.")
        if self.num_blocks == 0:
            return []

        min_slot = current_slot - window_size + 1
        max_slot = current_slot

        arrivals = self.block_arrivals[node_id, : self.num_blocks]
        slots    = self.block_slots[: self.num_blocks]

        mask = (
            (arrivals >= min_slot) & (arrivals <= max_slot) &
            (slots    >= min_slot) & (slots    <= max_slot)
        )
        return np.nonzero(mask)[0].tolist()

Get All Blocks in Window for Node

def get_all_blocks_in_window_for_node(self, node_id: int, current_slot: int, window_size: int) -> list[int]:
        if not (0 <= node_id < self.params.N):
            raise ValueError(f"Invalid node_id: {node_id}. Must be between 0 and {self.params.N - 1}.")
        if window_size <= 0:
            raise ValueError(f"window_size must be positive. Got {window_size}.")
        if self.num_blocks == 0:
            return []

        min_slot = current_slot - window_size + 1
        max_slot = current_slot

        slots = self.block_slots[: self.num_blocks]
        mask = (slots >= min_slot) & (slots <= max_slot)
        return np.nonzero(mask)[0].tolist()

Get Unreachable Blocks

def get_unreachable_blocks(self, node_id: int, current_slot: int) -> list[int]:
        if not (0 <= node_id < self.params.N):
            raise ValueError(f"Invalid node_id: {node_id}. Must be between 0 and {self.params.N - 1}.")

        arrivals = self.block_arrivals[node_id, : self.num_blocks]
        seen_ids = set(np.nonzero(arrivals <= current_slot)[0].tolist())

        has_incoming = set()
        for b in seen_ids:
            for parent in self.blocks[b].refs:
                if parent in seen_ids:
                    has_incoming.add(parent)

        return [b for b in seen_ids if b not in has_incoming]

Get Max Cardinality Antichain