from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import
from builtins import * # NOQA
from future import standard_library
from future.utils import with_metaclass
standard_library.install_aliases()
from abc import ABCMeta
from abc import abstractmethod
from abc import abstractproperty
from cached_property import cached_property
import chainer
from chainer import cuda
from chainer import functions as F
import numpy as np
class ActionValue(with_metaclass(ABCMeta, object)):
    """Abstract interface for Q-function outputs evaluated at fixed states.

    Concrete subclasses expose the greedy action, the maximal Q value and
    the Q value of arbitrary actions. All operations work on whole batches.
    """

    @abstractproperty
    def greedy_actions(self):
        """Return argmax_a Q(s,a)."""
        raise NotImplementedError

    @abstractproperty
    def max(self):
        """Return max_a Q(s,a)."""
        raise NotImplementedError

    @abstractmethod
    def evaluate_actions(self, actions):
        """Return Q(s,a) for the given ``actions``."""
        raise NotImplementedError
class DiscreteActionValue(ActionValue):
    """Q-function output for a discrete action space.

    Args:
        q_values (chainer.Variable): Q values. The constructor asserts the
            type and reads the number of actions from axis 1 of its data.
        q_values_formatter (callable): Applied to the raw Q-value array
            when building ``repr``. Defaults to the identity function.
    """

    def __init__(self, q_values, q_values_formatter=lambda x: x):
        assert isinstance(q_values, chainer.Variable)
        self.q_values = q_values
        self.q_values_formatter = q_values_formatter
        raw = q_values.data
        # Array module (numpy or cupy) matching where the data lives.
        self.xp = cuda.get_array_module(raw)
        self.n_actions = raw.shape[1]

    @cached_property
    def greedy_actions(self):
        """int32 Variable holding the argmax over axis 1 of the Q values."""
        best = self.q_values.data.argmax(axis=1)
        return chainer.Variable(best.astype(np.int32))

    @cached_property
    def max(self):
        """Q values selected at ``greedy_actions``."""
        with chainer.force_backprop_mode():
            best = self.greedy_actions
            return F.select_item(self.q_values, best)

    def sample_epsilon_greedy_actions(self, epsilon):
        """Return a random action with probability ``epsilon``, else greedy.

        Only a batch of size one is accepted.
        """
        n_rows = self.q_values.data.shape[0]
        assert n_rows == 1, \
            "This method doesn't support batch computation"
        if np.random.random() < epsilon:
            random_index = np.random.randint(0, self.n_actions)
            picked = self.xp.asarray([random_index], dtype=np.int32)
            return chainer.Variable(picked)
        return self.greedy_actions

    def evaluate_actions(self, actions):
        """Select the Q value of each row's entry in ``actions``."""
        return F.select_item(self.q_values, actions)

    def compute_advantage(self, actions):
        """Return Q(s, actions) minus the maximal Q value."""
        q_taken = self.evaluate_actions(actions)
        return q_taken - self.max

    def compute_double_advantage(self, actions, argmax_actions):
        """Return Q(s, actions) minus Q(s, argmax_actions)."""
        q_taken = self.evaluate_actions(actions)
        q_reference = self.evaluate_actions(argmax_actions)
        return q_taken - q_reference

    def compute_expectation(self, beta):
        """Sum over axis 1 of the Q values weighted by softmax(beta * Q)."""
        weights = F.softmax(beta * self.q_values)
        return F.sum(weights * self.q_values, axis=1)

    def __repr__(self):
        template = 'DiscreteActionValue greedy_actions:{} q_values:{}'
        return template.format(
            self.greedy_actions.data,
            self.q_values_formatter(self.q_values.data))
class QuadraticActionValue(ActionValue):
    """Q-function output for continuous action space.

    See: http://arxiv.org/abs/1603.00748

    Define a Q(s,a) with A(s,a) in a quadratic form.

    Q(s,a) = V(s) + A(s,a)
    A(s,a) = -1/2 (u - mu(s))^T P(s) (u - mu(s))

    where u denotes the action being evaluated.

    Args:
        mu (chainer.Variable): mu(s), actions that maximize A(s,a)
        mat (chainer.Variable): P(s), coefficient matrices of A(s,a).
            It must be positive definite.
        v (chainer.Variable): V(s), values of s
        min_action (ndarray or None): minimum action, not batched.
            None (the default) means no lower bound is applied.
        max_action (ndarray or None): maximum action, not batched.
            None (the default) means no upper bound is applied.
    """

    def __init__(self, mu, mat, v, min_action=None, max_action=None):
        self.xp = cuda.get_array_module(mu.data)
        self.mu = mu
        self.mat = mat
        self.v = v
        # Keep a missing bound as None. Converting None with
        # asarray(..., dtype=float32) produces a NaN array, which would make
        # every later ``is None`` / ``is not None`` check see a bound and
        # clip mu against NaN.
        if min_action is None:
            self.min_action = None
        else:
            self.min_action = self.xp.asarray(min_action, dtype=np.float32)
        if max_action is None:
            self.max_action = None
        else:
            self.max_action = self.xp.asarray(max_action, dtype=np.float32)
        self.batch_size = self.mu.data.shape[0]

    @cached_property
    def greedy_actions(self):
        """mu(s), clipped elementwise to whichever bounds were given."""
        with chainer.force_backprop_mode():
            a = self.mu
            if self.min_action is not None:
                # Bounds are not batched, so broadcast them to mu's shape.
                a = F.maximum(
                    self.xp.broadcast_to(self.min_action, a.data.shape), a)
            if self.max_action is not None:
                a = F.minimum(
                    self.xp.broadcast_to(self.max_action, a.data.shape), a)
            return a

    @cached_property
    def max(self):
        """Q value at ``greedy_actions``.

        Without bounds the greedy action is mu itself, where the advantage
        term is zero, so V(s) is returned directly.
        """
        with chainer.force_backprop_mode():
            if self.min_action is None and self.max_action is None:
                return F.reshape(self.v, (self.batch_size,))
            else:
                return self.evaluate_actions(self.greedy_actions)

    def evaluate_actions(self, actions):
        """Return Q(s,a) = V(s) + A(s,a), reshaped to ``(batch_size,)``."""
        u_minus_mu = actions - self.mu
        # (u - mu)^T P (u - mu), computed per batch element.
        a = - 0.5 * \
            F.batch_matmul(F.batch_matmul(
                u_minus_mu, self.mat, transa=True), u_minus_mu)
        return (F.reshape(a, (self.batch_size,)) +
                F.reshape(self.v, (self.batch_size,)))

    def compute_advantage(self, actions):
        """Return Q(s, actions) minus the maximal Q value."""
        return self.evaluate_actions(actions) - self.max

    def compute_double_advantage(self, actions, argmax_actions):
        """Return Q(s, actions) minus Q(s, argmax_actions)."""
        return (self.evaluate_actions(actions) -
                self.evaluate_actions(argmax_actions))

    def __repr__(self):
        return 'QuadraticActionValue greedy_actions:{} v:{}'.format(
            self.greedy_actions.data, self.v.data)
class SingleActionValue(ActionValue):
    """ActionValue that can evaluate only a single action.

    Args:
        evaluator (callable): Called with actions to obtain their Q values.
        maximizer (callable): Called with no arguments to obtain the greedy
            actions. Defaults to None; ``greedy_actions`` and ``max`` call
            it unconditionally.
    """

    def __init__(self, evaluator, maximizer=None):
        self.maximizer = maximizer
        self.evaluator = evaluator

    @cached_property
    def greedy_actions(self):
        """Result of calling ``maximizer``."""
        with chainer.force_backprop_mode():
            best = self.maximizer()
            return best

    @cached_property
    def max(self):
        """Result of calling ``evaluator`` on ``greedy_actions``."""
        with chainer.force_backprop_mode():
            best = self.greedy_actions
            return self.evaluator(best)

    def evaluate_actions(self, actions):
        """Return ``evaluator(actions)``."""
        return self.evaluator(actions)

    def compute_advantage(self, actions):
        """Return Q(s, actions) minus the maximal Q value."""
        q_taken = self.evaluator(actions)
        return q_taken - self.max

    def compute_double_advantage(self, actions, argmax_actions):
        """Return Q(s, actions) minus Q(s, argmax_actions)."""
        q_taken = self.evaluate_actions(actions)
        q_reference = self.evaluate_actions(argmax_actions)
        return q_taken - q_reference

    def __repr__(self):
        return 'SingleActionValue'