# Reinforcement Tosser
Title: Reinforcement Tosser
Subtitle: Beating an Interview Question, now with RL
Date: 2018-10-15 15:05
Category: Puzzle
Tags: reinforcement-learning, puzzle, gym, interview
Author: Varun Nayyar
Status: draft
## Reinforcement Learning
See part 1. This is still in draft.
This is not going to be a treatise on reinforcement learning; for that, you should check out this excellent blog. But deciding how much to bet, given my current multiplier and the number of tosses left, sounds like exactly the kind of problem a simple reinforcement learning approach can handle.
I’m going to use OpenAI’s gym to build the environment, since it provides a standard interface that pre-existing agent code can work with.
# this is the cell/env you copy if you have your own agent to test
from gym import Env
from gym.spaces import Discrete
import random
from math import ceil


class Tosser(Env):
    def __init__(self, numToss=8, initialAmount=100, nA=101):
        """
        Each action is a percentage point of the remaining amount, rounded up.
        State is (tossLeft, currAmnt).
        The episode is done when tossLeft or currAmnt hits 0.
        """
        self.numToss = self.tossLeft = numToss
        self.initialAmount = self.currAmnt = initialAmount
        self.action_space = Discrete(nA)  # 0..100 inclusive!

    def seed(self, seed=None):
        random.seed(seed)

    def reset(self):
        self.tossLeft = self.numToss
        self.currAmnt = self.initialAmount
        return (self.tossLeft, self.currAmnt)

    def amntBet(self, action):
        # convert the discrete action into a fraction of the current amount
        lam = action / (self.action_space.n - 1)
        return ceil(lam * self.currAmnt)

    def done(self):
        return (self.currAmnt == 0) or (self.tossLeft == 0)

    def step(self, action):
        assert self.action_space.contains(action)
        assert not self.done()
        amntBet = self.amntBet(action)
        if random.random() > 0.5:  # heads
            self.currAmnt += 3 * amntBet
            info = "Win"
        else:
            self.currAmnt -= amntBet
            info = "Loss"
        self.tossLeft -= 1
        # reward is the final multiplier, only paid out at the end of the episode
        reward = self.currAmnt / self.initialAmount if self.done() else 0
        return (self.tossLeft, self.currAmnt), reward, self.done(), info
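As a quick sanity check, here's a minimal rollout sketch (mine, not part of the original post) that plays one episode with random actions:

env = Tosser()
env.seed(0)
obs = env.reset()
done = False
while not done:
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    print(f"action={action}, {info}, state={obs}")
# the final reward is the multiplier on the initial amount
print(f"final multiplier: {reward}")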
## Measure to optimize
The standard technique I’m using, a Q-table, tracks something like the mean via the update \(Q(s,a) \leftarrow (1-\alpha)\, Q(s,a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') \right]\). This is effectively a weighted sum of past samples, with more recent samples carrying more weight. However, our returns are heavily skewed, with a small number of very large values, so the estimate is skewed too: we would end up optimizing for something close to the average rather than the median result.
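To make that concrete, here's a tiny sketch (my own, with made-up skewed returns) of how far the mean, which a running-average style update chases in expectation, can sit from the median when the distribution has a long upper tail:

import statistics

# made-up skewed returns: mostly small multipliers, a few very large ones
samples = [0.5] * 70 + [2.0] * 20 + [30.0] * 10
print(statistics.mean(samples))    # 3.75 -- what a running-average update chases (in expectation)
print(statistics.median(samples))  # 0.5  -- the "typical" outcome we actually care about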
As a result, we store the result of every trial in the Q-table and return the median. We use the same discounting, but each state visited in an episode is updated once, after the episode ends. This is slower, since the median can't be maintained in a purely streaming fashion, but a SortedList (from sortedcontainers) keeps the cost manageable.
## support code - this is not well documented and pulled from some prior work in this space
## Will move to a library eventually
import random
from collections import defaultdict

import numpy as np
from sortedcontainers import SortedList


def medianFromSorted(slist):
    # median of an already-sorted list in O(1)
    if len(slist) % 2 == 1:
        return slist[len(slist) // 2]
    else:
        i1 = len(slist) // 2
        i2 = i1 - 1
        return (slist[i1] + slist[i2]) / 2


class QDynamicTable:
    def __init__(self, nA=4, nS=None):
        self.num_actions = nA
        # one sorted list of recorded returns per (state, action) pair
        self.Q = defaultdict(lambda: [SortedList() for _ in range(nA)])

    def get_Q(self, s, a):
        """Q(s, a): get the Q value (median of recorded returns) of the (s, a) pair"""
        if self.Q[s][a]:
            return medianFromSorted(self.Q[s][a])
        return 0

    def get_max(self, s):
        """max_a Q(s, a): get the max of all Q values of state s"""
        return max(self.get_Q(s, a) for a in range(self.num_actions))

    def set_Q(self, s, a, q):
        """Record another sampled return q for the (s, a) pair"""
        self.Q[s][a].add(q)

    def get_max_a(self, s):
        """argmax_a Q(s, a): get the action which has the highest Q in state s"""
        mx = self.get_max(s)
        for a in range(self.num_actions):
            if self.get_Q(s, a) == mx:
                return a
        return random.randint(0, self.num_actions - 1)

    def __str__(self):
        output = []
        for s in self.Q:
            qvals = ["{:07.4f}".format(self.get_Q(s, a) or 0) for a in range(self.num_actions)]
            output.append(str(s) + ": " + str(qvals))
        output.sort()
        return ("QTable (number of actions = " + str(self.num_actions) + ", states = "
                + str(len(output)) + "):\n" + "\n".join(output))


class Epsilon:
    """
    Gratuitous class for the epsilon greedy part of reinforcement learning.
    """

    def __init__(self, start=1.0, end=0.01, update_decrement=0.01):
        self.start = start
        self.end = end
        self.update_decrement = update_decrement
        self._value = self.start
        self.isTraining = True

    def decrement(self, count=1):
        self._value = max(self.end, self._value - self.update_decrement * count)
        return self

    @property
    def value(self):
        if self.isTraining:
            return self._value
        else:
            # never explore at evaluation time -- act greedily
            return 0

    @value.setter
    def value(self, val):
        self._value = val
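A quick usage sketch (mine, not part of the original post) of how these two pieces behave:

qt = QDynamicTable(nA=3)
# record a few discounted returns for state (2, 5), action 1
for r in (0.5, 0.5, 8.0):
    qt.set_Q((2, 5), 1, r)
print(qt.get_Q((2, 5), 1))    # 0.5 -- the median, not the mean, of the stored returns
print(qt.get_max_a((2, 5)))   # 1 -- the only action with any recorded value

eps = Epsilon(start=1.0, end=0.05, update_decrement=0.002)
eps.decrement(100)
print(eps.value)              # 0.8 while training
eps.isTraining = False
print(eps.value)              # 0 -- act greedily at evaluation time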
## Agent
To keep things simple we discretize/quantize the game's current winnings into a finite set of bins. The bin edges were picked by trial and error, bearing in mind that most runs end with a small multiplier while a few develop a very long upper tail.
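As a rough illustration of what the discretization does (my own sketch, reusing the same break-points that getMoneyBins constructs below for an 8-toss game), np.digitize maps a winnings ratio to a bin index, with much finer bins at low ratios:

import numpy as np

moneybins = np.concatenate((
    np.linspace(0, 20, 100, endpoint=False),
    np.linspace(20, 100, 40, endpoint=False),
    np.linspace(100, 250, 30, endpoint=False),
    np.linspace(250, 256, 20),
))
for ratio in (0.5, 1.0, 4.0, 40.0, 500.0):
    # small ratios get their own fine-grained bins; huge ratios collapse into the top bins
    print(ratio, np.digitize(ratio, moneybins).item())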
"""
Agent code here

This uses Q-table learning: we convert the number of tosses left and the amount earned
into a finite set of states. As we see more and more run-throughs, we get a sense of which
actions lead to a higher multiplier.

This is somewhat akin to dynamic programming, but we don't have perfect substructure:
the value of a bet depends on how previous tosses/bets went, and once we've built up a
high multiplier we don't want to keep betting aggressively.
"""

alpha = 0.4   # don't learn too quickly (only used by the commented-out standard update below)
gamma = 0.99  # value future actions high-ish


class Agent:
    def __init__(self, numToss=8, initialAmount=100, nA=11):
        self.numToss = numToss
        self.initialAmount = initialAmount
        self.nA = nA
        self.env = Tosser(numToss, initialAmount, nA)
        self.epsilon = Epsilon(start=1.0, end=0.05, update_decrement=0.002)
        self.moneybins = self.getMoneyBins()
        self.Q = QDynamicTable(nA=nA)

    def getMoneyBins(self):
        # finer bins for low multipliers, where most of the action happens
        bn = (np.linspace(0, 20, 100, endpoint=False),
              np.linspace(20, 100, 40, endpoint=False),
              np.linspace(100, 250, 30, endpoint=False),
              np.linspace(250, max(2 ** self.numToss, 251), 20))
        return np.concatenate(bn)

    def getAction(self, s):
        # epsilon-greedy: explore with probability epsilon, otherwise act greedily
        if self.epsilon.value > random.random():
            return self.env.action_space.sample()
        else:
            return self.Q.get_max_a(s)

    def discretize(self, obs):
        """
        take observations and return a state
        """
        return obs[0], np.digitize(obs[1] / self.initialAmount, self.moneybins).item()

    def train(self, episodes=100, debug=False):
        self.epsilon.isTraining = True
        for i in range(episodes):
            if i % (episodes / 10) == 0:
                print(f"Episode: {i} of {episodes}, eps: {self.epsilon.value}")
            done = False
            s = self.discretize(self.env.reset())
            action_list = []
            if debug: print(f"Episode: {i}, start={s}, eps={self.epsilon.value}")
            while not done:
                a = self.getAction(s)
                action_list.append((s, a))
                s_1, reward, done, info = self.env.step(a)
                if done:
                    # collapse all terminal observations into one state so the Q table stays small
                    s_1 = (0, 0)
                s_1 = self.discretize(s_1)
                # The usual update would be the one below; however, I need to track the median,
                # and a running update like this only approximates the mean (and produces a lot
                # of spurious zeros if applied to the median). Instead, the sampled return is
                # pushed into every visited state once the episode finishes (see after the loop).
                # newq = alpha*(reward + gamma * self.Q.get_max(s_1)) + (1-alpha) * self.Q.get_Q(s, a)
                # self.Q.set_Q(s, a, newq)
                if debug: print(f"action: {a/(self.nA-1) * 100:.2f}, info: {info}, "
                                f"Tosses left: {self.env.tossLeft}, currAmnt={self.env.currAmnt}, "
                                f"state={s_1}, reward={reward}, eps={self.epsilon.value}")
                s = s_1
            gammcnt = gamma
            # walk the (state, action) list in reverse, crediting every state visited this
            # episode with the final reward, discounted by gamma for each step back
            for state, act in reversed(action_list):
                # print(f"updating s:{state}, act: {act}, r: {reward * gammcnt}")
                self.Q.set_Q(state, act, gammcnt * reward)
                gammcnt *= gamma
            # effective decrement of 1e-5 per episode (0.002 * 0.005)
            self.epsilon.decrement(0.005)

    def run(self):
        self.epsilon.isTraining = False
        s = self.discretize(self.env.reset())
        done = False
        while not done:
            action = self.getAction(s)
            s_1_f, reward, done, info = self.env.step(action)
            s_1 = self.discretize(s_1_f)
            # print(f"Done: {done}, Curr State: {s}, Action: {action/(self.nA-1)*100:.0f}%, New State: {s_1_f}")
            s = s_1
        self.env.close()
        return reward
agent = Agent(8)
agent.train(episodes=100000, debug=False)
#print(agent.Q)
Episode: 0 of 100000, eps: 1.0
Episode: 10000 of 100000, eps: 0.9000000000004551
Episode: 20000 of 100000, eps: 0.8000000000009102
Episode: 30000 of 100000, eps: 0.7000000000013653
Episode: 40000 of 100000, eps: 0.6000000000018204
Episode: 50000 of 100000, eps: 0.5000000000022755
Episode: 60000 of 100000, eps: 0.4000000000021755
Episode: 70000 of 100000, eps: 0.3000000000020755
Episode: 80000 of 100000, eps: 0.2000000000019755
Episode: 90000 of 100000, eps: 0.10000000000191017
# gamma = 0.99
nruns = 1000
ressy = np.zeros(nruns)
for i in range(nruns):
    ressy[i] = agent.run()
np.percentile(ressy, [25, 50, 75]), np.mean(ressy)

(array([0.74 , 2.01 , 7.0375]), 5.89594)

# gamma = 0.9 (agent retrained with gamma = 0.9 before this cell)
nruns = 1000
ressy = np.zeros(nruns)
for i in range(nruns):
    ressy[i] = agent.run()
np.percentile(ressy, [25, 50, 75]), np.mean(ressy)

(array([0.76, 2.41, 7.74]), 6.52583)

# gamma = 0.3 (agent retrained with gamma = 0.3 before this cell)
nruns = 1000
ressy = np.zeros(nruns)
for i in range(nruns):
    ressy[i] = agent.run()
np.percentile(ressy, [25, 50, 75]), np.mean(ressy)

(array([0.74, 2.17, 7.32]), 6.2765)
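Before wrapping up, it's worth peeking at what the agent actually learned. A small sketch of mine (not in the original post) that prints the greedy bet fraction for a handful of discretized states:

# peek at the greedy bet (as a % of the current amount) for a few states
for tosses_left in (8, 4, 1):
    for amount in (50, 100, 400):
        s = agent.discretize((tosses_left, amount))
        a = agent.Q.get_max_a(s)
        print(f"tosses left={tosses_left}, amount={amount}: bet {a / (agent.nA - 1) * 100:.0f}%")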
## Conclusion
What I've shown above is a very rudimentary application of reinforcement learning, and I hope to revisit this question with DQN and other deep learning approaches.
Higher gamma values optimize for the upper tail, while lower values optimize for the lower tail. Training with gamma=0.9 gives a median result of 3.33, better than the best possible play with a constant \(\lambda\), which is nice! We could also rerun this with the Q-table's measure returning the 25th or 75th percentile instead of the median, to see what strategies we'd get.
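If I do try that, a drop-in generalisation of medianFromSorted would be enough; something like this hypothetical helper (not part of the code above):

def percentileFromSorted(slist, q=0.5):
    """Nearest-rank q-th quantile of an already sorted list, q in [0, 1]."""
    if not slist:
        return 0
    idx = min(len(slist) - 1, int(q * len(slist)))
    return slist[idx]

# QDynamicTable.get_Q could then call percentileFromSorted(self.Q[s][a], q=0.25)
# for a conservative agent, or q=0.75 for an optimistic one.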