Department of Mathematics

The Chinese University of Hong Kong

1 Four-Compartment Mouse Markov Chain

# Four-Compartment Mouse Markov Chain


# Description:
#    1. Define transition matrix
#    2. Verify stochastic matrix properties
#    3. Compute distributions over time
#    4. Find probability of compartment 2 after 3 steps
#    5. Compute stationary distribution
#    6. Verify stationary equations
 

import numpy as np
from scipy.linalg import eig

# ============================================================
# Transition Matrix
# ============================================================

P = np.array([
    [0,   2/3, 0,   1/3],
    [2/3, 0,   1/3, 0  ],
    [0,   1/2, 0,   1/2],
    [1/2, 0,   1/2, 0  ]
], dtype=float)

# ============================================================
# Validate Matrix
# ============================================================

def validate_transition_matrix(P):
    """
    Check whether a matrix is a valid transition matrix.
    """

    rowsum = P.sum(axis=1)

    if not np.allclose(rowsum, 1):
        raise ValueError(
            f"Invalid transition matrix. Row sums = {rowsum}"
        )

    if np.any(P < 0):
        raise ValueError(
            "Transition matrix contains negative probabilities."
        )

    print("✓ Transition matrix is valid.\n")

validate_transition_matrix(P)
## ✓ Transition matrix is valid.
# ============================================================
# Initial State
# ============================================================

# Start in compartment 2
pi0 = np.array([0, 1, 0, 0])

print("Initial distribution:")
## Initial distribution:
print(pi0)
## [0 1 0 0]
print()
# ============================================================
# Compute Distributions
# ============================================================

def propagate_distribution(initial_dist, P, n_steps):
    """
    Compute distribution after n_steps.

    π(n) = π(0) P^n
    """

    distributions = [initial_dist]

    current = initial_dist.copy()

    for _ in range(n_steps):
        current = current @ P
        distributions.append(current)

    return distributions

distributions = propagate_distribution(pi0, P, 3)

for step, dist in enumerate(distributions):
    print(f"Distribution at step {step}:")
    print(dist)
    print()
## Distribution at step 0:
## [0 1 0 0]
## 
## Distribution at step 1:
## [0.66666667 0.         0.33333333 0.        ]
## 
## Distribution at step 2:
## [0.         0.61111111 0.         0.38888889]
## 
## Distribution at step 3:
## [0.60185185 0.         0.39814815 0.        ]
# ============================================================
# Probability of state 2 after 3 steps
# ============================================================

pi3 = distributions[3]

prob_state_2 = pi3[1]

print("=" * 60)
## ============================================================
print("Probability mouse is in compartment 2 after 3 steps")
## Probability mouse is in compartment 2 after 3 steps
print("=" * 60)
## ============================================================
print(prob_state_2)
## 0.0
# Expected result:
# 0 because chain alternates partitions

print()
# ============================================================
# Compute P^3 directly
# ============================================================

P3 = np.linalg.matrix_power(P, 3)

print("=" * 60)
## ============================================================
print("P^3")
## P^3
print("=" * 60)
## ============================================================
print(P3)
## [[0.         0.60185185 0.         0.39814815]
##  [0.60185185 0.         0.39814815 0.        ]
##  [0.         0.59722222 0.         0.40277778]
##  [0.59722222 0.         0.40277778 0.        ]]
print()
# ============================================================
# Stationary Distribution
# ============================================================

def stationary_distribution(P):
    """
    Solve πP = π

    Uses eigenvector associated with eigenvalue 1.
    """

    eigenvalues, eigenvectors = eig(P.T)

    idx = np.argmin(np.abs(eigenvalues - 1))

    stationary = np.real(eigenvectors[:, idx])

    stationary = stationary / stationary.sum()

    stationary = stationary.real

    return stationary

pi_star = stationary_distribution(P)

print("=" * 60)
## ============================================================
print("Stationary Distribution")
## Stationary Distribution
print("=" * 60)
## ============================================================
print(pi_star)
## [0.3 0.3 0.2 0.2]
print()
# ============================================================
# Verification
# ============================================================

verification = pi_star @ P

print("π*P:")
## π*P:
print(verification)
## [0.3 0.3 0.2 0.2]
print()
print("Difference:")
## Difference:
print(verification - pi_star)
## [ 1.66533454e-16 -5.55111512e-17 -5.55111512e-17 -8.32667268e-17]
print()
if np.allclose(verification, pi_star):
    print("✓ Stationary distribution verified.")
else:
    print("✗ Verification failed.")
## ✓ Stationary distribution verified.
print()
# ============================================================
# Long-run probability of compartment 2
# ============================================================

print("=" * 60)
## ============================================================
print("Long-run probability of compartment 2")
## Long-run probability of compartment 2
print("=" * 60)
## ============================================================
print(pi_star[1])
## 0.3

2 Two-State Markov Chain Example

# Compute:

# P(X1=2, X4=1, X6=1, X18=1 | X0=1)

# using matrix powers.

# Transition Matrix:

#         Destination
#           1      2
#       ----------------
# 1 |    0.5    0.5
# 2 |    0.3    0.7

 

import numpy as np


# =============================================================
# Transition Matrix
# =============================================================

P = np.array(
    [
        [0.5, 0.5],
        [0.3, 0.7]
    ],
    dtype=float
)


# =============================================================
# Validation Functions
# =============================================================

def validate_transition_matrix(matrix):
    """
    Validate Markov transition matrix.

    Conditions:
    1. Square matrix
    2. No negative probabilities
    3. Each row sums to 1
    """

    rows, cols = matrix.shape

    if rows != cols:
        raise ValueError(
            "Transition matrix must be square."
        )

    if np.any(matrix < 0):
        raise ValueError(
            "Transition matrix contains negative entries."
        )

    row_sums = matrix.sum(axis=1)

    if not np.allclose(row_sums, 1.0):
        raise ValueError(
            f"Rows do not sum to 1.\nRow sums = {row_sums}"
        )

    print("✓ Transition matrix is valid.\n")


# =============================================================
# Compute Stationary Distribution
# =============================================================

def stationary_distribution(matrix):
    """
    Solve:

        πP = π

    using eigenvectors.
    """

    eigenvalues, eigenvectors = np.linalg.eig(
        matrix.T
    )

    idx = np.argmin(
        np.abs(eigenvalues - 1)
    )

    stationary = np.real(
        eigenvectors[:, idx]
    )

    stationary = stationary / stationary.sum()

    return stationary


# =============================================================
# Main Program
# =============================================================

def main():

    print("=" * 60)
    print("TWO-STATE MARKOV CHAIN")
    print("=" * 60)
    print()

    # ---------------------------------------------------------
    # Validate matrix
    # ---------------------------------------------------------
    validate_transition_matrix(P)

    print("Transition Matrix P")
    print(P)
    print()

    # ---------------------------------------------------------
    # Matrix powers
    # ---------------------------------------------------------
    P2 = np.linalg.matrix_power(P, 2)
    P3 = np.linalg.matrix_power(P, 3)
    P12 = np.linalg.matrix_power(P, 12)

    print("=" * 60)
    print("P^2")
    print("=" * 60)
    print(P2)
    print()

    print("=" * 60)
    print("P^3")
    print("=" * 60)
    print(P3)
    print()

    print("=" * 60)
    print("P^12")
    print("=" * 60)
    print(P12)
    print()

    # ---------------------------------------------------------
    # Extract required probabilities
    # ---------------------------------------------------------

    # P(X1=2 | X0=1)
    p12 = P[0, 1]

    # P(X4=1 | X1=2)
    p3_21 = P3[1, 0]

    # P(X6=1 | X4=1)
    p2_11 = P2[0, 0]

    # P(X18=1 | X6=1)
    p12_11 = P12[0, 0]

    print("=" * 60)
    print("Required Entries")
    print("=" * 60)

    print(f"P12      = {p12:.12f}")
    print(f"(P^3)21  = {p3_21:.12f}")
    print(f"(P^2)11  = {p2_11:.12f}")
    print(f"(P^12)11 = {p12_11:.12f}")
    print()

    # ---------------------------------------------------------
    # Final Joint Probability
    # ---------------------------------------------------------

    joint_probability = (
        p12
        * p3_21
        * p2_11
        * p12_11
    )

    print("=" * 60)
    print("FINAL JOINT PROBABILITY")
    print("=" * 60)

    print(
        "P(X1=2, X4=1, X6=1, X18=1 | X0=1)"
    )

    print(f"= {joint_probability:.15f}")
    print()

    # ---------------------------------------------------------
    # Stationary Distribution
    # ---------------------------------------------------------

    pi_star = stationary_distribution(P)

    print("=" * 60)
    print("STATIONARY DISTRIBUTION")
    print("=" * 60)

    print(pi_star)
    print()

    print("Verification πP:")
    print(pi_star @ P)
    print()

    if np.allclose(pi_star @ P, pi_star):
        print("✓ Verified: πP = π")
    else:
        print("✗ Verification failed")

    print()


# =============================================================
# Run Program
# =============================================================

if __name__ == "__main__":
    main()
## ============================================================
## TWO-STATE MARKOV CHAIN
## ============================================================
## 
## ✓ Transition matrix is valid.
## 
## Transition Matrix P
## [[0.5 0.5]
##  [0.3 0.7]]
## 
## ============================================================
## P^2
## ============================================================
## [[0.4  0.6 ]
##  [0.36 0.64]]
## 
## ============================================================
## P^3
## ============================================================
## [[0.38  0.62 ]
##  [0.372 0.628]]
## 
## ============================================================
## P^12
## ============================================================
## [[0.375 0.625]
##  [0.375 0.625]]
## 
## ============================================================
## Required Entries
## ============================================================
## P12      = 0.500000000000
## (P^3)21  = 0.372000000000
## (P^2)11  = 0.400000000000
## (P^12)11 = 0.375000002560
## 
## ============================================================
## FINAL JOINT PROBABILITY
## ============================================================
## P(X1=2, X4=1, X6=1, X18=1 | X0=1)
## = 0.027900000190464
## 
## ============================================================
## STATIONARY DISTRIBUTION
## ============================================================
## [0.375 0.625]
## 
## Verification πP:
## [0.375 0.625]
## 
## ✓ Verified: πP = π

3 Weather Forecast using Markov Chains

import numpy as np

# -------------------------------
# Step 1: Define the transition matrix
# Rows = current state, Columns = next state
# States: [Cloudy, Sunny, Snowy]
# -------------------------------
transmat = np.array([
    [0.57, 0.22, 0.21],  # Cloudy row
    [0.21, 0.44, 0.10],  # Sunny row
    [0.21, 0.33, 0.33]   # Snowy row
])

# -------------------------------
# Step 2: Define the initial state vector
# November 12, 2020 = Cloudy
# -------------------------------
state0 = np.array([1, 0, 0])  # Cloudy = 1, Sunny = 0, Snowy = 0

# -------------------------------
# Step 3: Forecast function
# state_n = transmat^n * state0
# -------------------------------
def forecast(transmat, state0, n_steps):
    # Raise transition matrix to the nth power
    transmat_n = np.linalg.matrix_power(transmat, n_steps)
    # Multiply by initial state vector
    state_n = transmat_n @ state0
    return transmat_n, state_n

# -------------------------------
# Step 4: Compute forecasts for 1, 2, 3 days ahead
# -------------------------------
for n in range(1, 4):
    transmat_n, state_n = forecast(transmat, state0, n)
    print(f"\nTransition Matrix ^ {n}:\n", transmat_n)
    print(f"State vector at day {n}:\n", state_n)
## 
## Transition Matrix ^ 1:
##  [[0.57 0.22 0.21]
##  [0.21 0.44 0.1 ]
##  [0.21 0.33 0.33]]
## State vector at day 1:
##  [0.57 0.21 0.21]
## 
## Transition Matrix ^ 2:
##  [[0.4152 0.2915 0.211 ]
##  [0.2331 0.2728 0.1211]
##  [0.2583 0.3003 0.186 ]]
## State vector at day 2:
##  [0.4152 0.2331 0.2583]
## 
## Transition Matrix ^ 3:
##  [[0.342189 0.289234 0.185972]
##  [0.215586 0.211277 0.116194]
##  [0.249354 0.250338 0.145653]]
## State vector at day 3:
##  [0.342189 0.215586 0.249354]
# -------------------------------
# Step 5: Interpretation
# -------------------------------
labels = ["Cloudy", "Sunny", "Snowy"]
transmat3, state3 = forecast(transmat, state0, 3)

print("\n--- Forecast for November 15 (3 days ahead) ---")
## 
## --- Forecast for November 15 (3 days ahead) ---
for label, prob in zip(labels, state3):
    print(f"{label}: {prob:.2f} ({prob*100:.0f}%)")
## Cloudy: 0.34 (34%)
## Sunny: 0.22 (22%)
## Snowy: 0.25 (25%)

4 Market Forecast using Markov Chains

# Transition matrix from Table (Bull, Bear, Stagnant)

import numpy as np

# -------------------------------
# Step 1: Define the transition matrix
# Rows = current state, Columns = next state
# States: [Bull, Bear, Stagnant]
# -------------------------------
transmat = np.array([
    [0.900, 0.075, 0.025],  # Bull row
    [0.150, 0.800, 0.050],  # Bear row
    [0.250, 0.250, 0.500]   # Stagnant row
])

# -------------------------------
# Step 2: Define the initial state row vector
# Current week = Bear market
# -------------------------------
state0 = np.array([0, 1, 0])  # Bull=0, Bear=1, Stagnant=0

# -------------------------------
# Step 3: Forecast function
# state_n = state0 * transmat^n
# -------------------------------
def forecast(transmat, state0, n_steps):
    # Raise transition matrix to the nth power
    transmat_n = np.linalg.matrix_power(transmat, n_steps)
    # Multiply row vector by matrix power
    state_n = state0 @ transmat_n
    return transmat_n, state_n

# -------------------------------
# Step 4: Compute forecasts for 1, 5, 52, 99 weeks ahead
# -------------------------------
for n in [1, 5, 52, 99]:
    transmat_n, state_n = forecast(transmat, state0, n)
    print(f"\nTransition Matrix ^ {n}:\n", transmat_n)
    print(f"State vector at week {n}:\n", state_n)
## 
## Transition Matrix ^ 1:
##  [[0.9   0.075 0.025]
##  [0.15  0.8   0.05 ]
##  [0.25  0.25  0.5  ]]
## State vector at week 1:
##  [0.15 0.8  0.05]
## 
## Transition Matrix ^ 5:
##  [[0.70683  0.238305 0.054865]
##  [0.47661  0.450515 0.072875]
##  [0.54865  0.364375 0.086975]]
## State vector at week 5:
##  [0.47661  0.450515 0.072875]
## 
## Transition Matrix ^ 52:
##  [[0.62500006 0.31249994 0.06249999]
##  [0.62499988 0.31250011 0.06250001]
##  [0.62499995 0.31250005 0.0625    ]]
## State vector at week 52:
##  [0.62499988 0.31250011 0.06250001]
## 
## Transition Matrix ^ 99:
##  [[0.625  0.3125 0.0625]
##  [0.625  0.3125 0.0625]
##  [0.625  0.3125 0.0625]]
## State vector at week 99:
##  [0.625  0.3125 0.0625]
# -------------------------------
# Step 5: Compute stationary distribution
# Solve π such that π * transmat = π and sum(π)=1
# -------------------------------
# Transpose for solving linear system: (transmat.T - I)π = 0
A = transmat.T - np.eye(3)
# Add normalization constraint
A = np.vstack([A, np.ones(3)])
b = np.array([0, 0, 0, 1])

# Solve least squares system
pi, _, _, _ = np.linalg.lstsq(A, b, rcond=None)

print("\n--- Stationary Distribution ---")
## 
## --- Stationary Distribution ---
labels = ["Bull", "Bear", "Stagnant"]
for label, prob in zip(labels, pi):
    print(f"{label}: {prob:.2f} ({prob*100:.0f}%)")
## Bull: 0.63 (63%)
## Bear: 0.31 (31%)
## Stagnant: 0.06 (6%)