

Markov Chain Basics with a fun problem: "Grad student's dilemma"

Dr. Tirthajyoti Sarkar, Fremont CA 94536

Andrey Markov was a Russian mathematician who studied stochastic processes. He was particularly interested in systems that follow a chain of linked events. In 1906, Markov published interesting results about discrete processes, which he called chains. A Markov chain has a set of states $S=\{s_0,s_1,...,s_m\}$ and a process that can move successively from one state to another. Each move is a single step and is governed by a transition model $T$.

To summarise, a Markov chain is defined by:

$\bf{Set\ of\ possible\ States:}S=\{{s_0,s_1,...,s_m}\}$

$\bf{Initial\ State}: s_0$

$\bf{Transition\ Model}: T(s,s^′)$

A Markov chain relies on the Markov property, which states that given the present, the future is conditionally independent of the past: $P(s_{t+1} \mid s_t, s_{t-1}, \ldots, s_0) = P(s_{t+1} \mid s_t)$.

Let's suppose we have a chain with only two states, $s_0$ and $s_1$, where $s_0$ is the initial state. When the process is in $s_0$, it remains there 90% of the time and moves to $s_1$ the remaining 10% of the time. When the process is in $s_1$, it remains there 50% of the time and moves back to $s_0$ the other 50% of the time.
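Before writing the transition matrix down, this two-state process can be simulated directly. The following is a quick sketch (the state names and `step` helper are illustrative, not part of the original text); solving $\pi = \pi T$ shows the chain spends a long-run fraction $5/6$ of its time in $s_0$, and the simulation agrees:

```python
import random

random.seed(42)

# Simulate the two-state chain described above.
# From s0: stay with probability 0.9, move to s1 with probability 0.1.
# From s1: stay with probability 0.5, move back to s0 with probability 0.5.
def step(state):
    if state == 's0':
        return 's0' if random.random() < 0.9 else 's1'
    return 's1' if random.random() < 0.5 else 's0'

state = 's0'
visits = {'s0': 0, 's1': 0}
for _ in range(100_000):
    visits[state] += 1
    state = step(state)

frac_s0 = visits['s0'] / 100_000
print(frac_s0)  # close to the long-run fraction 5/6 ≈ 0.833
```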

So, the transition matrix can be written as,

$$T=\quad \begin{bmatrix} 0.9 & 0.1 \\ 0.5 & 0.5 \end{bmatrix}$$

The transition matrix is always a square matrix, and since we are dealing with probability distributions, all entries lie between 0 and 1 and each row sums to 1.
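These properties are easy to verify numerically, and repeated multiplication by $T$ shows where the chain ends up in the long run. A small sketch with NumPy (variable names are illustrative):

```python
import numpy as np

# The transition matrix from above
T = np.array([[0.9, 0.1],
              [0.5, 0.5]])

# Every row is a probability distribution, so each row sums to 1
assert np.allclose(T.sum(axis=1), 1.0)

# Starting from s0, the state distribution after n steps is v0 @ T^n
v0 = np.array([1.0, 0.0])
v50 = v0 @ np.linalg.matrix_power(T, 50)
print(v50)  # converges to the stationary distribution [5/6, 1/6]
```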

Here is the state diagram,


Markov Decision Process (MDP)

Markov Decision Processes are processes that follow the Markov property, which states that "the future is independent of the past given the present". MDPs formally describe environments for reinforcement learning, and here we assume that the environment is fully observable.
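The transition model of an MDP can be held in a nested dictionary mapping state → action → outcome distribution, which is the representation the classes below work with. Here is a minimal, hypothetical two-state fragment (names are made up for the demo) showing the shape and the conversion to (probability, result-state) pairs:

```python
# Hypothetical two-state MDP fragment: state -> action -> {next_state: probability}
t_demo = {
    's0': {'stay': {'s0': 1.0},
           'go':   {'s0': 0.2, 's1': 0.8}},
    's1': {'stay': {'s1': 1.0}},
}

def T_pairs(state, action):
    """Return the (probability, result-state) pairs for (state, action)."""
    return [(p, s1) for s1, p in t_demo[state][action].items()]

print(T_pairs('s0', 'go'))  # [(0.2, 's0'), (0.8, 's1')]
```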

import random, time
import numpy as np
from collections import defaultdict
import operator
import matplotlib.pyplot as plt

Class definitions

Base MDP class

class MDP:
    """A Markov Decision Process, defined by an initial state, transition model,
    and reward function. We also keep track of a gamma value, for use by
    algorithms. The transition model is represented somewhat differently from
    the text. Instead of P(s' | s, a) being a probability number for each
    state/state/action triplet, we instead have T(s, a) return a
    list of (p, s') pairs. We also keep track of the possible states,
    terminal states, and actions for each state."""

    def __init__(self, init, actlist, terminals, transitions={}, reward=None, states=None, gamma=.9):
        if not (0 < gamma <= 1):
            raise ValueError("An MDP must have 0 < gamma <= 1")

        if states:
            self.states = states
        else:
            ## collect states from transitions table
            self.states = self.get_states_from_transitions(transitions)
        self.init = init
        if isinstance(actlist, list):
            ## if actlist is a list, all states have the same actions
            self.actlist = actlist
        elif isinstance(actlist, dict):
            ## if actlist is a dict, different actions for each state
            self.actlist = actlist
        self.terminals = terminals
        self.transitions = transitions
        self.gamma = gamma
        if reward:
            self.reward = reward
        else:
            self.reward = {s: 0 for s in self.states}

    def R(self, state):
        """Return a numeric reward for this state."""
        return self.reward[state]

    def T(self, state, action):
        """Transition model. From a state and an action, return a list
        of (probability, result-state) pairs."""
        if self.transitions == {}:
            raise ValueError("Transition model is missing")
        else:
            return self.transitions[state][action]

    def actions(self, state):
        """Set of actions that can be performed in this state. By default, a
        fixed list of actions, except for terminal states. Override this
        method if you need to specialize by state."""
        if state in self.terminals:
            return [None]
        else:
            return self.actlist

    def get_states_from_transitions(self, transitions):
        if isinstance(transitions, dict):
            s1 = set(transitions.keys())
            s2 = set([tr[1] for actions in transitions.values()
                              for effects in actions.values() for tr in effects])
            return s1.union(s2)
        else:
            print('Could not retrieve states from transitions')
            return None

    def check_consistency(self):
        # check that all states in transitions are valid
        assert set(self.states) == self.get_states_from_transitions(self.transitions)
        # check that init is a valid state
        assert self.init in self.states
        # check reward for each state
        assert set(self.reward.keys()) == set(self.states)
        # check that all terminals are valid states
        assert all([t in self.states for t in self.terminals])
        # check that probability distributions for all actions sum to 1
        for s1, actions in self.transitions.items():
            for a in actions.keys():
                s = 0
                for o in actions[a]:
                    s += o[0]
                assert abs(s - 1) < 0.001

A custom MDP class to extend functionality

We will write a CustomMDP class extending the MDP class for the problem at hand.
This class overrides the T method to implement the transition model.

class CustomMDP(MDP):

    def __init__(self, transition_matrix, rewards, terminals, init, gamma=.9):
        # All possible actions.
        actlist = []
        for state in transition_matrix.keys():
            actlist.extend(transition_matrix[state].keys())
        actlist = list(set(actlist))

        MDP.__init__(self, init, actlist, terminals=terminals, gamma=gamma)
        self.t = transition_matrix
        self.reward = rewards
        for state in self.t:
            self.states.add(state)

    def T(self, state, action):
        if action is None:
            return [(0.0, state)]
        else:
            return [(prob, new_state) for new_state, prob in self.t[state][action].items()]

A Simple MDP: Graduate student's dilemma

State dependent reward function

Let us take a toy MDP and solve it using both value iteration and policy iteration. This simple example is adapted from a similar problem by Dr. David Silver, tweaked to fit the limitations of the current functions.

Let's say you're a student attending lectures at a university, and there are three lectures you need to attend on a given day. Attending the first lecture gives you 4 points of reward. After the first lecture, you have a 0.6 probability of continuing into the second one, which yields 6 more points of reward. But with a probability of 0.4, you get distracted, start using Facebook instead, and receive a reward of -1. From then on you really can't let go of Facebook, and there's only a 0.1 probability that you will concentrate back on the lectures.

After the second lecture, you have an equal chance of attending the next lecture or just falling asleep. Falling asleep is the terminal state and yields you no reward, but continuing on to the final lecture gives you a big reward of 10 points. From there on, you have a 40% chance of going to study and reach the terminal state, but a 60% chance of going to the pub with your friends instead. You end up drunk and don't know which lecture to attend, so you go to one of the lectures according to the probabilities given above.
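As a quick sanity check on the narrative, the probability of the "ideal" run (attend all three lectures, then go study) and the reward collected along the way follow directly from the numbers in the story; a small arithmetic sketch:

```python
# Probability of attending all three lectures and then studying (terminal),
# using the probabilities from the story: 0.6, then 0.5, then 0.4
p_ideal = 0.6 * 0.5 * 0.4
# State rewards collected along that trajectory: 4 + 6 + 10 points
r_ideal = 4 + 6 + 10
print(p_ideal, r_ideal)  # p_ideal ≈ 0.12, r_ideal = 20
```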

Definition of transition matrix

We first have to define our Transition Matrix as a nested dictionary to fit the requirements of the MDP class.

t = {
    'leisure': {
        'facebook': {'leisure': 0.9, 'class1': 0.1},
        'quit': {'leisure': 0.1, 'class1': 0.9},
        'study': {},
        'sleep': {},
        'pub': {}
    },
    'class1': {
        'study': {'class2': 0.6, 'leisure': 0.4},
        'facebook': {'class2': 0.4, 'leisure': 0.6},
        'quit': {},
        'sleep': {},
        'pub': {}
    },
    'class2': {
        'study': {'class3': 0.5, 'end': 0.5},
        'sleep': {'end': 0.5, 'class3': 0.5},
        'facebook': {},
        'quit': {},
        'pub': {}
    },
    'class3': {
        'study': {'end': 0.6, 'class1': 0.08, 'class2': 0.16, 'class3': 0.16},
        'pub': {'end': 0.4, 'class1': 0.12, 'class2': 0.24, 'class3': 0.24},
        'facebook': {},
        'quit': {},
        'sleep': {}
    },
    'end': {}
}
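Hand-written transition tables are easy to get wrong, so it is worth checking that every non-empty action distribution sums to 1. A sketch (the helper name and demo dictionary are illustrative, not part of the original notebook):

```python
def check_transition_dict(t, tol=1e-3):
    """Assert that every non-empty action distribution in a nested
    state -> action -> {next_state: prob} dictionary sums to 1."""
    for state, actions in t.items():
        for action, dist in actions.items():
            if dist:  # skip the empty placeholder actions
                total = sum(dist.values())
                assert abs(total - 1) < tol, (state, action, total)
    return True

# Demo on a small fragment in the same format
demo_t = {
    'class1': {'study': {'class2': 0.6, 'leisure': 0.4},
               'quit': {}},
    'end': {}
}
print(check_transition_dict(demo_t))  # True
```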

Defining rewards

We now need to define the reward for each state.

rewards = {
    'class1': 4,
    'class2': 6,
    'class3': 10,
    'leisure': -1,
    'end': 0
}

Terminal state

This MDP has only one terminal state:

terminals = ['end']

Setting initial state to Class 1

init = 'class1'

Create an instance of the custom class

school_mdp = CustomMDP(t, rewards, terminals, init, gamma=.95)

Let's see the actions and rewards of the MDP

{'class1', 'class2', 'class3', 'end', 'leisure'}
['facebook', 'pub', 'quit', 'study', 'sleep']
['facebook', 'pub', 'quit', 'study', 'sleep']
{'class1': 4, 'class2': 6, 'class3': 10, 'leisure': -1, 'end': 0}

Value iteration

def value_iteration(mdp, epsilon=0.001):
    """Solving an MDP by value iteration.
    mdp: The MDP object
    epsilon: Stopping criterion
    """
    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    while True:
        U = U1.copy()
        delta = 0
        for s in mdp.states:
            U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])
                                        for a in mdp.actions(s)])
            delta = max(delta, abs(U1[s] - U[s]))
        if delta < epsilon * (1 - gamma) / gamma:
            return U
def value_iteration_over_time(mdp, iterations=20):
    U_over_time = []
    U1 = {s: 0 for s in mdp.states}
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for _ in range(iterations):
        U = U1.copy()
        for s in mdp.states:
            U1[s] = R(s) + gamma * max([sum([p * U[s1] for (p, s1) in T(s, a)])
                                        for a in mdp.actions(s)])
        U_over_time.append(U)
    return U_over_time
def best_policy(mdp, U):
    """Given an MDP and a utility function U, determine the best policy,
    as a mapping from state to action."""

    pi = {}
    for s in mdp.states:
        pi[s] = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
    return pi
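To see the Bellman backup in isolation, here is a self-contained sketch on the two-state chain from the opening section, with a single action and illustrative rewards (so the max over actions disappears, and the fixed point can be checked against a direct linear solve):

```python
import numpy as np

T = np.array([[0.9, 0.1],   # two-state chain from the opening section
              [0.5, 0.5]])
R = np.array([1.0, 0.0])    # illustrative rewards for s0 and s1
gamma = 0.9

# Value iteration: repeatedly apply the Bellman backup U <- R + gamma * T U
U = np.zeros(2)
for _ in range(500):
    U = R + gamma * (T @ U)

# With one action the fixed point solves the linear system (I - gamma*T) U = R
U_exact = np.linalg.solve(np.eye(2) - gamma * T, R)
print(U, U_exact)  # the two agree to numerical precision
```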

Value iteration on the school MDP

{'class2': 15.792664558035112,
 'leisure': 18.079639654154484,
 'class3': 20.61614864677164,
 'end': 0.0,
 'class1': 20.306571436730533}
[{'class2': 0, 'leisure': 0, 'class3': 0, 'end': 0, 'class1': 0},
 {'class2': 6.0, 'leisure': -1.0, 'class3': 10.0, 'end': 0.0, 'class1': 4.0},
 {'class2': 10.75,
  'leisure': 2.3249999999999997,
  'class3': 14.104,
  'end': 0.0,
  'class1': 7.039999999999999},
 {'class2': 12.699399999999999,
  'leisure': 5.240074999999999,
  'class3': 16.469271999999997,
  'end': 0.0,
  'class1': 11.011},
 {'class2': 13.822904199999998,
  'leisure': 8.912212125,
  'class3': 17.905711216,
  'end': 0.0,
  'class1': 13.2298865},
 {'class2': 14.5052128276,
  'leisure': 11.158213109375,
  'class3': 18.742331375847996,
  'end': 0.0,
  'class1': 15.265696001499999},
 {'class2': 14.902607403527798,
  'leisure': 13.112200326673124,
  'class3': 19.320729422557143,
  'end': 0.0,
  'class1': 16.5080922932945},
 {'class2': 15.177346475714643,
  'leisure': 14.360077941800741,
  'class3': 19.68484331778294,
  'end': 0.0,
  'class1': 17.47712234414663},
 {'class2': 15.350300575946896,
  'leisure': 15.307147008716438,
  'class3': 19.94097122015016,
  'end': 0.0,
  'class1': 18.10791710904163},
 {'class2': 15.471961329571325,
  'leisure': 15.936448094058655,
  'class3': 20.110712519940876,
  'end': 0.0,
  'class1': 18.566387191601976}]

Plotting value updates over time/iterations

def plot_value_update(mdp, iterations=10, plot_kw=None):
    """
    Plot value updates over iterations for a given MDP.
    """
    x = value_iteration_over_time(mdp, iterations=iterations)
    value_states = {k: [] for k in mdp.states}
    for i in x:
        for k, v in i.items():
            value_states[k].append(v)
    plt.figure(figsize=(8, 5))
    plt.title("Evolution of state utilities over iteration", fontsize=18)
    for v in value_states:
        plt.plot(value_states[v])
    plt.legend(list(value_states.keys()), fontsize=14)
    plt.grid(True)
    plt.xlabel("Iterations", fontsize=16)
    plt.ylabel("Utilities of states", fontsize=16)
    plt.show()

Value iteration for various discount factors ($\gamma$)

for i in range(4):
    mdp = CustomMDP(t, rewards, terminals, init, gamma=1-0.2*i)
    plot_value_update(mdp, iterations=10)

Value iteration for two different reward structures

rewards1 = {
    'class1': 4,
    'class2': 6,
    'class3': 10,
    'leisure': -1,
    'end': 0
}

mdp1 = CustomMDP(t, rewards1, terminals, init, gamma=.95)

rewards2 = {
    'class1': 1,
    'class2': 1.5,
    'class3': 2.5,
    'leisure': -4,
    'end': 0
}

mdp2 = CustomMDP(t, rewards2, terminals, init, gamma=.95)

value_iteration(mdp2)
{'class2': 3.717135902670662,
 'leisure': -2.2988059489639867,
 'class3': 4.667674180844613,
 'end': 0.0,
 'class1': 2.245184728702343}

Policy iteration

def expected_utility(a, s, U, mdp):
    """The expected utility of doing a in state s, according to the MDP and U."""
    return sum([p * U[s1] for (p, s1) in mdp.T(s, a)])
def policy_evaluation(pi, U, mdp, k=20):
    """Returns an updated utility mapping U from each state in the MDP to its
    utility, using an approximation (modified policy iteration)."""
    R, T, gamma = mdp.R, mdp.T, mdp.gamma
    for i in range(k):
        for s in mdp.states:
            U[s] = R(s) + gamma * sum([p * U[s1] for (p, s1) in T(s, pi[s])])
    return U
def policy_iteration(mdp, verbose=0):
    """Solves an MDP by policy iteration"""
    U = {s: 0 for s in mdp.states}
    pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
    iter_count = 0
    if verbose:
        print("Initial random choice:", pi)
    while True:
        U = policy_evaluation(pi, U, mdp)
        unchanged = True
        iter_count += 1
        for s in mdp.states:
            a = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
            if a != pi[s]:
                pi[s] = a
                unchanged = False
        if unchanged:
            return (pi, iter_count)
        if verbose:
            print("Policy after iteration {}: {}".format(iter_count, pi))

Policy iteration over the school MDP

({'class2': 'study',
  'leisure': 'quit',
  'class3': 'pub',
  'end': None,
  'class1': 'facebook'},
Initial random choice: {'class2': 'sleep', 'leisure': 'study', 'class3': 'sleep', 'end': None, 'class1': 'pub'}
Policy after iteration 1: {'class2': 'study', 'leisure': 'quit', 'class3': 'pub', 'end': None, 'class1': 'study'}
Policy after iteration 2: {'class2': 'study', 'leisure': 'quit', 'class3': 'pub', 'end': None, 'class1': 'facebook'}
({'class2': 'study',
  'leisure': 'quit',
  'class3': 'pub',
  'end': None,
  'class1': 'facebook'},

Does the result match the one from value iteration? We use the best_policy function to find out.

{'class2': 'study',
 'leisure': 'quit',
 'class3': 'pub',
 'end': None,
 'class1': 'facebook'}

Comparing computation efficiency (time) of value and policy iterations

Clearly, value iteration takes more iterations to reach the same steady state than policy iteration does. But how do their computation times compare? Let's find out.

Running value and policy iteration on the school MDP many times and averaging

def compute_time(mdp, iteration_technique='value', n_run=1000, epsilon=0.01):
    """
    Computes the average time for value or policy iteration for a given MDP.
    n_run: Number of runs to average over, default 1000
    epsilon: Error margin for the value iteration
    """
    if iteration_technique == 'value':
        t1 = time.time()
        for _ in range(n_run):
            value_iteration(mdp, epsilon=epsilon)
        t2 = time.time()
        print("Average value iteration took {} milliseconds".format((t2-t1)*1000/n_run))
    else:
        t1 = time.time()
        for _ in range(n_run):
            policy_iteration(mdp)
        t2 = time.time()
        print("Average policy iteration took {} milliseconds".format((t2-t1)*1000/n_run))
Average value iteration took 1.4509124755859375 milliseconds
Average policy iteration took 0.6311731338500977 milliseconds


Q-learning class

class QLearningAgent:
    """An exploratory Q-learning agent. It avoids having to learn the transition
    model because the Q-value of a state can be related directly to those of
    its neighbors."""

    def __init__(self, mdp, Ne, Rplus, alpha=None):

        self.gamma = mdp.gamma
        self.terminals = mdp.terminals
        self.all_act = mdp.actlist
        self.Ne = Ne  # iteration limit in exploration function
        self.Rplus = Rplus  # large value to assign before iteration limit
        self.Q = defaultdict(float)
        self.Nsa = defaultdict(float)
        self.s = None
        self.a = None
        self.r = None
        self.states = mdp.states
        self.T = mdp.T

        if alpha:
            self.alpha = alpha
        else:
            self.alpha = lambda n: 1. / (1 + n)

    def f(self, u, n):
        """Exploration function. Returns the fixed Rplus until the
        agent has visited the (state, action) pair Ne times."""
        if n < self.Ne:
            return self.Rplus
        else:
            return u

    def actions_in_state(self, state):
        """Return actions possible in the given state.
        Useful for max and argmax."""
        if state in self.terminals:
            return [None]
        else:
            act_list = []
            for a in self.all_act:
                if len(self.T(state, a)) > 0:
                    act_list.append(a)
            return act_list

    def __call__(self, percept):
        s1, r1 = self.update_state(percept)
        Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r
        alpha, gamma, terminals = self.alpha, self.gamma, self.terminals
        actions_in_state = self.actions_in_state

        if s in terminals:
            Q[s, None] = r1
        if s is not None:
            Nsa[s, a] += 1
            Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * max(Q[s1, a1]
                                           for a1 in actions_in_state(s1)) - Q[s, a])
        if s in terminals:
            self.s = self.a = self.r = None
        else:
            self.s, self.r = s1, r1
            self.a = max(actions_in_state(s1), key=lambda a1: self.f(Q[s1, a1], Nsa[s1, a1]))
        return self.a

    def update_state(self, percept):
        """To be overridden in most cases. The default case
        assumes the percept to be of type (state, reward)."""
        return percept
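The heart of the agent is the temporal-difference update in `__call__`. The same rule, isolated with illustrative numbers (the states, reward, and learning rate here are made up for the demo):

```python
from collections import defaultdict

Q = defaultdict(float)        # all Q-values start at 0
alpha, gamma = 0.5, 0.9       # illustrative learning rate and discount

# One Q-learning backup for a hypothetical transition (s, a) -> s1 with reward r
s, a, r, s1 = 'class1', 'study', 4, 'class2'
next_actions = ['study', 'sleep']
Q[s, a] += alpha * (r + gamma * max(Q[s1, a1] for a1 in next_actions) - Q[s, a])
print(Q[s, a])  # 0.5 * (4 + 0.9 * 0 - 0) = 2.0
```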

Trial run

def run_single_trial(agent_program, mdp):
    """Execute trial for given agent_program
    and mdp."""

    def take_single_action(mdp, s, a):
        """Select the outcome of taking action a
        in state s by weighted sampling."""
        x = random.uniform(0, 1)
        cumulative_probability = 0.0
        for probability_state in mdp.T(s, a):
            probability, state = probability_state
            cumulative_probability += probability
            if x < cumulative_probability:
                break
        return state

    current_state = mdp.init
    while True:
        current_reward = mdp.R(current_state)
        percept = (current_state, current_reward)
        next_action = agent_program(percept)
        if next_action is None:
            break
        current_state = take_single_action(mdp, current_state, next_action)

Testing Q-learning

# Define an agent
q_agent = QLearningAgent(school_mdp, Ne=1000, Rplus=2,alpha=lambda n: 60./(59+n))
['facebook', 'quit']
            {('class1', 'facebook'): 4.0,
             ('class1', 'study'): 0.0,
             ('class2', 'study'): 6.0,
             ('class2', 'sleep'): 0.0,
             ('class3', 'pub'): 10.0,
             ('class3', 'study'): 0.0,
             ('end', None): 0.0})
for i in range(200):
    run_single_trial(q_agent, school_mdp)
            {('class1', 'facebook'): 26.974555722658792,
             ('class1', 'study'): 0.0,
             ('class2', 'study'): 19.571766877990868,
             ('class2', 'sleep'): 0.0,
             ('class3', 'pub'): 29.565421585494995,
             ('class3', 'study'): 0.0,
             ('end', None): 12.465810937631858,
             ('leisure', 'facebook'): 1.9553102704690524,
             ('leisure', 'quit'): 26.864903103513296})
def get_U_from_Q(q_agent):
    U = defaultdict(lambda: -100.)  # Large negative value for comparison
    for state_action, value in q_agent.Q.items():
        state, action = state_action
        if U[state] < value:
            U[state] = value
    return U
defaultdict(<function __main__.get_U_from_Q.<locals>.<lambda>()>,
            {'class1': 26.974555722658792,
             'class2': 19.571766877990868,
             'class3': 29.565421585494995,
             'end': 12.465810937631858,
             'leisure': 26.864903103513296})
q_agent = QLearningAgent(school_mdp, Ne=100, Rplus=25,alpha=lambda n: 10/(9+n))
for i in range(100000):
    run_single_trial(q_agent, school_mdp)
defaultdict(<function get_U_from_Q.<locals>.<lambda> at 0x7f634b2bf400>, {'class1': 23.146927628036167, 'leisure': 20.762394158984392, 'class2': 19.203763769205953, 'end': 4.003597268740953, 'class3': 24.022171442235805})
{'class2': 15.792664558035112, 'leisure': 18.079639654154484, 'class3': 20.61614864677164, 'end': 0.0, 'class1': 20.306571436730533}

Function for utility estimation by Q-learning over many iterations

def qlearning_iter(agent_program, mdp, iterations=1000, print_final_utility=True):
    """
    Function for utility estimation by Q-learning over many iterations.
    Returns a history object, i.e. a list of dictionaries, where the utility
    estimate for each iteration is stored.
    """
    qhistory = []
    for i in range(iterations):
        run_single_trial(agent_program, mdp)
        U = get_U_from_Q(agent_program)
        # Store the estimate only once all states have been visited
        if len(U) == len(mdp.states):
            qhistory.append(U)
    if print_final_utility:
        print(U)
    return qhistory

How do the long-term utility estimates from Q-learning compare with value iteration?

def plot_qlearning_vi(hist, vi, plot_n_states=None):
    """
    Compares and plots the Q-learning and value iteration results for the
    utility estimates of an MDP's states.
    hist: A history object from a Q-learning run
    vi: A value iteration estimate for the same MDP
    plot_n_states: Restrict the plotting to n states (randomly chosen)
    """
    utilities = {k: [] for k in list(vi.keys())}
    for h in hist:
        for state in h.keys():
            utilities[state].append(h[state])
    if plot_n_states is None:
        states_to_plot = list(vi.keys())
    else:
        states_to_plot = list(vi.keys())[:plot_n_states]
    for state in states_to_plot:
        plt.figure(figsize=(7, 4))
        plt.plot(utilities[state])
        plt.axhline(y=vi[state], color='red')
        plt.title("Plot of State: {} over Q-learning iterations".format(str(state)), fontsize=16)
        plt.legend(['Q-learning estimates', 'Value iteration estimate'], fontsize=14)
        plt.xlabel("Iterations", fontsize=14)
        plt.ylabel("Utility of the state", fontsize=14)
        plt.show()

Testing the long-term utility learning for the school MDP

# Define the Q-learning agent
q_agent = QLearningAgent(school_mdp, Ne=100, Rplus=2, alpha=lambda n: 100/(99+n))
# Obtain the history by running the Q-learning for many iterations
hist = qlearning_iter(q_agent, school_mdp, print_final_utility=False)
# Get a value iteration estimate using the same MDP
vi = value_iteration(school_mdp, epsilon=0.001)
# Compare the utility estimates from the two methods
plot_qlearning_vi(hist, vi)

for alpha in range(100, 5100, 1000):
    q_agent = QLearningAgent(school_mdp, Ne=10, Rplus=2, alpha=lambda n: alpha/(alpha-1+n))
    # Obtain the history by running the Q-learning for many iterations
    hist = qlearning_iter(q_agent, school_mdp, print_final_utility=False)
    # Get a value iteration estimate using the same MDP
    vi = value_iteration(school_mdp, epsilon=0.001)
    # Compare the utility estimates from the two methods
    plot_qlearning_vi(hist, vi)