Source code for auto_docking_policies

#!/usr/bin/env python

"""Contains the policies needed for auto-docking.

Authors:
    Shibhansh Dohare, Niko Yasui.
"""
from __future__ import division

import numpy as np
import rospy

from policy import Policy


class EGreedy(Policy):
    """An epsilon-greedy policy.

    Args:
        epsilon (float): Proportion of time to take a random action
            (0 gives the greedy policy).
        action_space (numpy array of action): Numpy array containing
            all actions available to any agent.
        value_function: A function used by the Policy to update values
            of pi. This is usually a value function learned by a GVF.
        feature_indices (numpy array of bool): Indices of the feature
            vector corresponding to indices used by the
            :py:obj:`value_function`.
    """

    def __init__(self,
                 epsilon,
                 action_space,
                 value_function,
                 feature_indices,
                 *args,
                 **kwargs):
        self.epsilon = epsilon
        self.value = value_function

        kwargs['action_space'] = action_space
        kwargs['value_function'] = value_function
        kwargs['feature_indices'] = feature_indices
        Policy.__init__(self, *args, **kwargs)
    def update(self, phi, *args, **kwargs):
        """Sets ``pi`` to an epsilon-greedy distribution over the action
        values given by ``value_function``.
        """
        phi = phi[self.feature_indices]

        q_fun = np.vectorize(lambda action: self.value_function(phi, action))
        q_values = q_fun(self.action_space)

        best_q = np.max(q_values)
        max_indices = (q_values == best_q)

        self.pi[max_indices] = (1 - self.epsilon) / max_indices.sum()
        self.pi[~max_indices] = 0
        self.pi += self.epsilon / self.action_space.size
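
# Usage sketch for EGreedy (illustrative only, not part of the original
# module). It assumes the Policy base class stores ``value_function`` and
# exposes ``pi`` as a numpy array over ``action_space``, as the code above
# suggests; the toy value function and feature vector are made up.
#
#     actions = np.array([0, 1, 2])
#     q = lambda phi, action: float(phi.sum() + action)
#     egreedy = EGreedy(epsilon=0.1,
#                       action_space=actions,
#                       value_function=q,
#                       feature_indices=np.array([True, True, False]))
#     egreedy.update(phi=np.array([1.0, 0.5, 3.0]))
#     # egreedy.pi now puts 1 - epsilon + epsilon/3 probability on action 2.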
class AlternatingRotation(Policy):
    """A policy for task-3 of auto-docking.

    Under this policy, the robot rotates in one direction for some time
    and then in the opposite direction for some time, with the goal of
    aligning the robot with the docking station. It is a non-Markov
    policy and should be used as a behaviour policy. It is designed to
    improve exploration compared to Greedy or EGreedy for task-3
    (aligning the robot with the docking station).

    Args:
        action_space (numpy array of action): Numpy array containing
            all actions available to any agent.
    """

    def __init__(self, action_space, *args, **kwargs):
        self.time_steps = 0
        self.num_time_steps = 100

        kwargs['action_space'] = action_space
        Policy.__init__(self, *args, **kwargs)

        self.LEFT = 0
        self.RIGHT = 1
    def update(self, *args, **kwargs):
        """Deterministically sets ``pi`` based on the timestep."""
        if self.time_steps > self.num_time_steps:
            self.last_index = self.LEFT
        else:
            self.last_index = self.RIGHT

        self.pi *= 0
        self.pi[self.last_index] = 1

        self.time_steps += 1
        self.time_steps %= 2 * self.num_time_steps
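
# Usage sketch for AlternatingRotation (illustrative only; ``left_cmd`` and
# ``right_cmd`` are hypothetical action objects).
#
#     rotate = AlternatingRotation(
#         action_space=np.array([left_cmd, right_cmd]))
#     for _ in range(400):
#         rotate.update()
#         # Within each 200-step cycle, ``pi`` places all mass on RIGHT
#         # while time_steps <= 100 and on LEFT afterwards.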
class ForwardIfClear(Policy):
    """An implementation of ForwardIfClear for task-2 of auto-docking.

    Under this policy, the robot moves forward with a high probability
    and turns with a small probability. It also turns if the robot was
    moving forward and encountered a bump. It is a Markov policy and
    should be used as a behaviour policy. It is designed to improve
    exploration compared to Greedy or EGreedy for task-2 (taking the
    robot to the center IR region).
    """

    def __init__(self, action_space, *args, **kwargs):
        # The last action is recorded according to its respective
        # constants; these indices should be in the order of the
        # action space.
        self.FORWARD = 0
        self.TURN_RIGHT = 2
        self.TURN_LEFT = 1

        kwargs['action_space'] = action_space
        Policy.__init__(self, *args, **kwargs)
    def update(self, phi, observation, *args, **kwargs):
        """Updates ``pi`` depending on whether there is a bump or not."""
        self.pi *= 0
        if observation['bump']:
            self.pi[self.TURN_LEFT] += 0.5
            self.pi[self.TURN_RIGHT] += 0.5
        else:
            self.pi += 0.05
            self.pi[self.FORWARD] += 0.9
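
# Usage sketch for ForwardIfClear (illustrative only; the observation dict
# key 'bump' follows the code above, while ``fwd_cmd``, ``left_cmd``, and
# ``right_cmd`` are hypothetical action objects).
#
#     policy = ForwardIfClear(
#         action_space=np.array([fwd_cmd, left_cmd, right_cmd]))
#     policy.update(phi=None, observation={'bump': False})
#     # ``pi`` is weighted heavily towards FORWARD.
#     policy.update(phi=None, observation={'bump': True})
#     # ``pi`` splits evenly between TURN_LEFT and TURN_RIGHT.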
class Switch:
    """Switches between two policies.

    Under this policy, the robot follows an exploring policy for some
    number of time steps and then follows the learned greedy policy for
    some time. It is a non-Markov policy and should be used as a
    behaviour policy. It is designed to check how well the robot
    performs at regular intervals.

    Attributes:
        explorer (policy): Policy to use for exploration.
        exploiter (policy): Policy to use for exploitation.
        num_timesteps_explore (int): Number of timesteps to run each
            policy before switching.
        t (int, not passed): Counter. Switch policies when the counter
            reaches ``num_timesteps_explore``.
    """

    def __init__(self, explorer, exploiter, num_timesteps_explore):
        self.explorer = explorer
        self.exploiter = exploiter
        self.num_timesteps_explore = num_timesteps_explore
        self.t = 0

        self.action_space = self.explorer.action_space
        self.last_index = self.explorer.last_index
    def update(self, *args, **kwargs):
        self.t += 1
        # Wrap the counter at twice the explore length so the explorer
        # and the exploiter each run for ``num_timesteps_explore`` steps
        # before switching back.
        self.t %= 2 * self.num_timesteps_explore

        if self.t > self.num_timesteps_explore:
            self.exploiter.update(*args, **kwargs)
            rospy.loginfo(
                'Greedy policy is the behaviour policy, no learning now')
            to_return = 'target_policy'
            self.last_index = self.exploiter.last_index
        else:
            self.explorer.update(*args, **kwargs)
            rospy.loginfo('Explorer policy is the behaviour policy')
            to_return = 'behavior_policy'
            self.last_index = self.explorer.last_index
        return to_return
    def get_probability(self, *args, **kwargs):
        if self.t > self.num_timesteps_explore:
            prob = self.exploiter.get_probability(*args, **kwargs)
        else:
            prob = self.explorer.get_probability(*args, **kwargs)
        return prob
    def choose_action(self, *args, **kwargs):
        if self.t > self.num_timesteps_explore:
            action = self.exploiter.choose_action(*args, **kwargs)
        else:
            action = self.explorer.choose_action(*args, **kwargs)
        return action
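
# Usage sketch for Switch (illustrative only; ``forward_if_clear`` and
# ``greedy`` stand for already-constructed policies such as ForwardIfClear
# and EGreedy with epsilon=0, and ``phi``/``obs`` for the current feature
# vector and observation).
#
#     switch = Switch(explorer=forward_if_clear,
#                     exploiter=greedy,
#                     num_timesteps_explore=500)
#     which = switch.update(phi=phi, observation=obs)
#     action = switch.choose_action()  # delegates to the active policy
#     # ``which`` is 'behavior_policy' while exploring and 'target_policy'
#     # while exploiting, which the caller can use to gate learning.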