# Reinforcement Tosser
  • Title: Reinforcement Tosser

  • Subtitle: Beating an Interview Question, now with RL

  • Date: 2018-10-15 15:05

  • Category: Puzzle

  • Tags: reinforcement-learning, puzzle, gym, interview

  • Author: Varun Nayyar

  • Status: draft

Reinforcement Learning#

See part 1. This is still in draft.

This is not going to be a treatise on reinforcement learning; for that, you should check out this excellent blog. But deciding how much to bet, given the current multiplier and the number of tosses left, sounds like exactly the kind of problem a simple reinforcement learning approach could handle.

I’m going to build the environment with OpenAI’s gym, since it gives a standard interface that pre-existing agent code can work with.

# this is the cell/env you copy if you have your own agent to test

from gym import Env
from gym.spaces import Discrete
import random
from math import ceil

class Tosser(Env):
    def __init__(self, numToss=8, initialAmount=100, nA=101):
        """
        Each action is to be a percentage point of remaining amount, rounded up
        State is numToss, currAmnt
        
        Is done when numToss or currAmnt are 0.
        """
        self.numToss = self.tossLeft = numToss
        self.initialAmount = self.currAmnt = initialAmount
        self.action_space = Discrete(nA)  # actions 0..nA-1; 0..100 inclusive for the default nA=101
    
    def seed(self, seed=None):
        random.seed(seed)

    def reset(self):
        self.tossLeft = self.numToss
        self.currAmnt = self.initialAmount
        return (self.tossLeft, self.currAmnt)
    
    def amntBet(self, action):
        lam = action/(self.action_space.n-1)
        return ceil(lam * self.currAmnt)
    
    def done(self):
        return (self.currAmnt == 0) or (self.tossLeft == 0)
    
    def step(self, action):
        assert self.action_space.contains(action)
        assert not self.done()
        
        amntBet = self.amntBet(action)
        
        if random.random() > 0.5:  # heads
            self.currAmnt += 3 * amntBet
            info = "Win"
        else:
            self.currAmnt -= amntBet
            info = "Loss"

        self.tossLeft -= 1
        
        reward = self.currAmnt/self.initialAmount if self.done() else 0
        return (self.tossLeft, self.currAmnt), reward, self.done(), info            
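
As a quick sanity check (this cell isn't in the original post), we can roll out one episode with uniformly random bets and watch the amount evolve:

env = Tosser()
env.seed(0)
obs = env.reset()
done = False
while not done:
    # bet a random percentage of the current amount
    obs, reward, done, info = env.step(env.action_space.sample())
    print(f"tosses left: {obs[0]}, amount: {obs[1]} ({info})")
print(f"final multiplier: {reward}")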

Measure to optimize#

The standard tabular technique I’m using, Q-learning with a Q-table, updates its estimates with \(Q(s,a) \leftarrow (1-\alpha)\,Q(s,a) + \alpha\,[r + \gamma \max_{a'} Q(s_{new}, a')]\). This is effectively a weighted sum of past samples, with more recent samples weighted more heavily, so it tracks something close to the mean. But our return distribution is heavily skewed, with a long tail of large values pulling the average up, so this update would optimize a mean-like measure rather than the median result we actually care about.
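
To spell this out (an aside that isn't in the original post): writing \(g_k = r_k + \gamma \max_{a'} Q(s_{new,k}, a')\) for the \(k\)-th update target of a given state-action pair, unrolling the update gives \(Q_t = (1-\alpha)^t Q_0 + \alpha \sum_{k=1}^{t} (1-\alpha)^{t-k} g_k\), an exponentially weighted average of the targets. Averages of any kind get dragged upwards by a few very large returns, which is exactly what the median avoids.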

Instead, we store the (discounted) final result of every trial against each state-action pair visited and return the median on lookup. We keep the same discounting by \(\gamma\), but each visited state is updated once, after the episode finishes. This is slower, since the median can't be maintained by a simple streaming update, but keeping the stored returns in a SortedList makes the median lookup cheap.
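
As a tiny illustration (not a cell from the original notebook), a SortedList from sortedcontainers keeps its elements ordered on every insert, so the median is just one or two index lookups:

from sortedcontainers import SortedList

sl = SortedList()
for r in [2.4, 0.1, 7.0, 1.2]:
    sl.add(r)                 # each add keeps the list sorted
print(sl)                     # SortedList([0.1, 1.2, 2.4, 7.0])
print((sl[1] + sl[2]) / 2)    # median of an even-length list: ~1.8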

## support code - this is not well documented and pulled from some prior work in this space
## Will move to a library eventually
import numpy as np
from sortedcontainers import SortedList

def medianFromSorted(slist):
    if len(slist) % 2 == 1:
        return slist[len(slist)//2]
    else:
        i1 = len(slist)//2
        i2 = i1-1
        return (slist[i1] + slist[i2])/2
        

class QDynamicTable:
    
    def __init__(self, nA=4, nS=None):
        from collections import defaultdict
        self.num_actions = nA
        self.Q = defaultdict(lambda: [SortedList() for i in range(nA)])

    def get_Q(self, s, a):
        """Q(s, a): get the Q value of (s, a) pair"""
        if self.Q[s][a]:
            return medianFromSorted(self.Q[s][a])
        return 0

    def get_max(self, s):
        """max Q(s): get the max of all Q value of state s"""
        return max(self.get_Q(s, a) for a in range(self.num_actions))

    def set_Q(self, s, a, q):
        """Record a sampled return q for the (s, a) pair; Q(s, a) is the median of all samples"""
        self.Q[s][a].add(q)

    def get_max_a(self, s):
        """argmax_a Q(s, a): get the action which has the highest Q in state s"""
        mx = self.get_max(s)
        for a in range(self.num_actions):
            if self.get_Q(s, a) == mx:
                return a
        return random.randint(0, self.num_actions - 1)  # fallback; randint is inclusive at both ends
            
    def __str__(self):
        output = []
        for s in self.Q:
            output.append(s.__str__() + ": " + ["{:07.4f}".format(self.get_Q(s, a) or 0) for a in range(self.num_actions)].__str__())
        output.sort()
        return "QTable (number of actions = " + str(self.num_actions) + ", states = " + str(
            len(output)) + "):\n" + "\n".join(output)


    
class Epsilon:
    """
    Gratuitous class for the epsilon greedy part of reinforcement learning.
    """
    def __init__(self, start=1.0, end=0.01, update_decrement=0.01):
        self.start = start
        self.end = end
        self.update_decrement = update_decrement
        self._value = self.start
        self.isTraining = True

    def decrement(self, count=1):
        self._value = max(self.end, self._value - self.update_decrement * count)
        return self

    @property
    def value(self):
        if self.isTraining:
            return self._value
        else:
            # evaluation mode: epsilon 0 means never explore, always act greedily
            return 0

    @value.setter
    def value(self, val):
        self._value = val


    

Agent#

To keep things simple we discretize/quantize the game's current winnings into a finite set of bins. The bin edges were chosen by trial and error, with the realisation that most outcomes cluster at low multipliers while a long tail stretches out to very large ones, so the bins are finest at the low end.
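
As a small illustration (not from the original notebook), np.digitize maps a multiplier onto the index of the bin it falls in; with the bins built the same way getMoneyBins below builds them for numToss=8, resolution is fine at low multipliers and coarse at high ones:

import numpy as np

bins = np.concatenate((np.linspace(0, 20, 100, endpoint=False),
                       np.linspace(20, 100, 40, endpoint=False),
                       np.linspace(100, 250, 30, endpoint=False),
                       np.linspace(250, 256, 20)))
for multiplier in (0.4, 1.0, 3.0, 50.0, 500.0):
    # indices grow quickly below 20, then slowly; anything above 256 shares the last bin
    print(multiplier, np.digitize(multiplier, bins))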

"""
Agent code here

This uses qtable learning, basically we convert number of tosses left and amount earned
into a set of finite states. As we see more and more run throughs, we get a sense of which
actions result in a higher multipler

This is somewhat akin to dynamic programming, but since in this case we don't have perfect
substructure in that the location of our multipler changes depending on how previous tosses/bets
went and once we've developed a high multiplier, we don't want to keep betting in a risky sense
"""

alpha = 0.4  # don't learn too quickly (only used by the standard update, which is commented out below)
gamma = 0.99  # value future actions high-ish

class Agent:
    def __init__(self, numToss=8, initialAmount=100, nA=11):
        self.numToss = numToss
        self.initialAmount = initialAmount
        self.nA = nA
        
        self.env = Tosser(numToss, initialAmount, nA)
        self.epsilon = Epsilon(start=1.0, end=0.05, update_decrement=0.002)
        
        self.moneybins = self.getMoneyBins()
        self.Q = QDynamicTable(nA=nA)

        
    def getMoneyBins(self):
        # finer bins at low multipliers, where most results end up
        bn = (np.linspace(0, 20, 100, endpoint=False),
              np.linspace(20, 100, 40, endpoint=False),
              np.linspace(100, 250, 30, endpoint=False),
              np.linspace(250, max(2 ** self.numToss, 251), 20))
        return np.concatenate(bn)

    def getAction(self, s):
        if self.epsilon.value > random.random():
            return self.env.action_space.sample()
        else:
            return self.Q.get_max_a(s)

    def discretize(self, obs):
        """
        take observations and return a state
        """
        return obs[0], np.digitize(obs[1]/self.initialAmount, self.moneybins).item()

    def train(self, episodes=100, debug=False):
        self.epsilon.isTraining = True
        maxreward = 0
        for i in range(episodes):
            if i % (episodes // 10) == 0:
                print(f"Episode: {i} of {episodes}, eps: {self.epsilon.value}")
            done = False
            s = self.discretize(self.env.reset())
            action_list = []
            if debug: print(f"Epsiode: {i}, start={s}, eps={self.epsilon.value}")
            while not done:
                a = self.getAction(s)
                action_list.append((s, a))
                s_1, reward, done, info = self.env.step(a)
                if done:
                    # don't crowd up the q matrix
                    s_1 = (0,0)

#                 if debug: print(s_1)
                s_1 = self.discretize(s_1)
                # the line below is the usual Q-learning update, but it only approximates the
                # mean, and using it while tracking the median would also introduce a lot of
                # untrue 0's. Instead, the visited states are updated in reverse once the
                # episode ends (see below).
#                 newq = alpha*(reward + gamma * self.Q.get_max(s_1)) + (1-alpha) * self.Q.get_Q(s,a)
                if debug: print(f"action: {a/(self.nA-1) * 100:.2f}%, info: {info}, " \
                                f"tosses left: {self.env.tossLeft}, currAmnt={self.env.currAmnt}, " \
                                f"state={s_1}, reward={reward}, eps={self.epsilon.value}")
#                 self.Q.set_Q(s, a, newq)
                s = s_1
            gammcnt = gamma
            # walk the (state, action) list in reverse, crediting every visited pair with the
            # episode's final reward, discounted by gamma for each step back from the end
            for state, act in reversed(action_list):
#                 print(f"updating s:{state}, act: {act}, r: {reward * gammcnt}")
                self.Q.set_Q(state, act, gammcnt * reward)
                gammcnt *= gamma            
        
            self.epsilon.decrement(0.005)

    def run(self):
        self.epsilon.isTraining = False
        s = self.discretize(self.env.reset())
        steps = 0
        done = False
        while not done:
            action = self.getAction(s)
            s_1_f, reward, done, info = self.env.step(action)
            s_1 = self.discretize(s_1_f)

#             print(f"Done: {done}, Curr State: {s}, Action {['L', 'R'][action]}, New State: {s_1_f}")
            s = s_1

        self.env.close()
        return reward
agent = Agent(8)
agent.train(episodes=100000, debug=False)
#print(agent.Q)
Episode: 0 of 100000, eps: 1.0
Episode: 10000 of 100000, eps: 0.9000000000004551
Episode: 20000 of 100000, eps: 0.8000000000009102
Episode: 30000 of 100000, eps: 0.7000000000013653
Episode: 40000 of 100000, eps: 0.6000000000018204
Episode: 50000 of 100000, eps: 0.5000000000022755
Episode: 60000 of 100000, eps: 0.4000000000021755
Episode: 70000 of 100000, eps: 0.3000000000020755
Episode: 80000 of 100000, eps: 0.2000000000019755
Episode: 90000 of 100000, eps: 0.10000000000191017
# gamma = 0.99
nruns = 1000
ressy = np.zeros(nruns)
for i in range(nruns):
    ressy[i] = agent.run()
    
np.percentile(ressy, [25,50,75]), np.mean(ressy)
(array([0.74  , 2.01  , 7.0375]), 5.89594)
# gamma = 0.9
nruns = 1000
ressy = np.zeros(nruns)
for i in range(nruns):
    ressy[i] = agent.run()
    
np.percentile(ressy, [25,50,75]), np.mean(ressy)
(array([0.76, 2.41, 7.74]), 6.52583)
# gamma = 0.3
nruns = 1000
ressy = np.zeros(nruns)
for i in range(nruns):
    ressy[i] = agent.run()
    
np.percentile(ressy, [25,50,75]), np.mean(ressy)
(array([0.74, 2.17, 7.32]), 6.2765)

Conclusion#

What I've shown above is a very rudimentary application of reinforcement learning, and I hope to revisit this question with DQN and other deep learning approaches.

Higher gamma values optimize for the upper tail, while lower values optimize for the lower tail. Training with gamma=0.9 gives a median result of 3.33, better than the best possible play with a constant \(\lambda\), which is nice! We could run this again with the Q-table returning the 25th and 75th percentiles instead of the median to see what values we'd get.
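
As a minimal sketch of that last idea (hypothetical code, not from the notebook), the median lookup could be swapped for an arbitrary percentile:

def percentileFromSorted(slist, q=0.25):
    # nearest-rank percentile of a SortedList; q=0.5 roughly recovers the median
    idx = min(len(slist) - 1, int(q * len(slist)))
    return slist[idx]

Using this in QDynamicTable.get_Q with q=0.25 should push the agent towards more conservative play, while q=0.75 should reward chasing the upper tail.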