Source code for wall_demo_example

"""Runs the demo that learns to avoid walls.

This module defines an agent that learns to avoid walls. It specifies
the agent's learning algorithm, parameters, policies, features, and
actions. The module also interfaces with the :doc:`learning_foreground`
and the :doc:`action_manager` to run the main learning loop and publish
actions, respectively.

All parameters are set in the ``if __name__ == "__main__"`` block.

Authors:
    Michele Albach, Shibhansh Dohare, Banafsheh Rafiee,
    Parash Rahman, Niko Yasui.
"""
from __future__ import division

import multiprocessing as mp
from multiprocessing import Value
import random

import numpy as np
import rospy
from geometry_msgs.msg import Twist, Vector3

from action_manager import start_action_manager
from gtd import GTD
from gvf import GVF
from learning_foreground import start_learning_foreground
from policy import Policy
from state_representation import StateConstants
import tools


class GoForward(Policy):
    """Target Policy. Constant policy that only goes forward.

    Attributes:
        action_space (numpy array of action): Numpy array containing
            all actions available to any agent.
        fwd_action_index (int): Index of ``action_space`` containing
            the forward action.
    """
    def __init__(self, action_space, fwd_action_index, *args, **kwargs):
        kwargs['action_space'] = action_space
        Policy.__init__(self, *args, **kwargs)

        self.pi *= 0.0
        self.pi[fwd_action_index] += 1.0

    def update(self, phi, observation, *args, **kwargs):
        pass
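

# Note (illustrative, not used at runtime): with the two-element
# ``action_space`` built in ``__main__`` below and ``fwd_action_index=0``,
# ``GoForward`` fixes ``pi`` to ``[1.0, 0.0]``, so the target policy always
# selects the forward Twist and its ``update`` is a no-op.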


class PavlovSoftmax(Policy):
    """Behavior Policy. Softmax policy that forces the agent to select
    a "turn" action if the bump sensor is on.

    Attributes:
        time_scale (float): Number of seconds in a learning timestep.
        action_space (numpy array of action): Numpy array containing
            all actions available to any agent.
        value_function: A function used by the policy to update values
            of pi. This is usually a value function learned by a GVF.
        feature_indices (numpy array of bool): Indices of the feature
            vector corresponding to indices used by the
            :py:obj:`value_function`.
    """
    def __init__(self,
                 time_scale,
                 action_space,
                 value_function,
                 feature_indices,
                 *args, **kwargs):
        self.TURN = 1
        self.FORWARD = 0
        self.time_scale = time_scale

        kwargs['action_space'] = action_space
        kwargs['value_function'] = value_function
        kwargs['feature_indices'] = feature_indices
        Policy.__init__(self, *args, **kwargs)

    def update(self, phi, observation, *args, **kwargs):
        """Updates :py:attr:`~policy.Policy.pi`."""
        phi = phi[self.feature_indices]
        p = self.value_function(phi)

        if observation['bump']:
            self.pi *= 0
            self.pi[self.TURN] = 1
        else:
            # Joseph Modayil's constants
            # T = 1 / self.time_scale / 1
            T = 6.0
            k1 = np.log((T - 1) * (self.action_space.size - 1))
            k2 = k1 * 4.0

            # make preferences for each action
            prefs = np.zeros(2)

            def last(index):
                return self.last_index == index

            prefs[self.TURN] = k1 * last(self.TURN)
            prefs[self.FORWARD] = k2 * (0.5 - p) + k1 * last(self.FORWARD)

            self.pi = tools.softmax(prefs)
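

# Worked example (illustrative only; assumes ``tools.softmax`` is the standard
# exponentiate-and-normalise softmax, which is not shown on this page).
# With two actions and T = 6.0:
#
#     k1 = ln((6 - 1) * (2 - 1)) ~= 1.609        k2 = 4 * k1 ~= 6.438
#
# If the previous action was FORWARD, prefs = [k2 * (0.5 - p) + k1, 0.0]
# (ordered [FORWARD, TURN]), so pi = softmax(prefs) gives roughly:
#
#     p = 0.0  ->  pi ~= [0.99, 0.01]   (keep driving forward)
#     p = 0.5  ->  pi ~= [0.83, 0.17]   (start hedging toward a turn)
#     p = 1.0  ->  pi ~= [0.17, 0.83]   (bump predicted; mostly turn)
#
# i.e. the behavior policy trades forward motion for turning as the learned
# bump prediction p grows, and turns deterministically once the bump sensor
# actually fires.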


if __name__ == "__main__":
    try:
        random.seed(20170823)

        # turns the hyperparameter search on and off
        hyperparameter_experiment_mode = False

        action_manager_process = mp.Process(target=start_action_manager,
                                            name="action_manager",
                                            args=())
        action_manager_process.start()

        # robotic parameters
        time_scale = 0.06
        forward_speed = 0.2
        turn_speed = 25. / 9

        # all available actions
        action_space = np.array([Twist(Vector3(forward_speed, 0, 0),
                                       Vector3(0, 0, 0)),
                                 Twist(Vector3(0, 0, 0),
                                       Vector3(0, 0, turn_speed))])

        # learning parameters
        alpha0 = 0.05
        lmbda = 0.9
        discount = 0.97
        features_to_use = {'image', 'bias'}
        print_stats = ['cumulant', 'prediction']

        feature_indices = np.concatenate(
            [StateConstants.indices_in_phi[f] for f in features_to_use])
        num_active_features = sum(
            StateConstants.num_active_features[f] for f in features_to_use)
        num_features = feature_indices.size

        def discount_if_bump(obs):
            return 0 if obs["bump"] else discount

        def one_if_bump(obs):
            return int(obs['bump']) if obs is not None else 0

        dtb_hp = {'alpha': alpha0 / num_active_features,
                  'beta': alpha0 / 1000 / num_active_features,
                  'lmbda': lmbda,
                  'alpha0': alpha0,
                  'num_features': num_features,
                  'feature_indices': feature_indices,
                  }

        # prediction GVF
        dtb_policy = GoForward(action_space=action_space,
                               fwd_action_index=0)
        dtb_learner = GTD(**dtb_hp)
        behavior_policy = PavlovSoftmax(
                action_space=action_space,
                feature_indices=dtb_hp['feature_indices'],
                value_function=dtb_learner.predict,
                time_scale=time_scale)
        distance_to_bump = GVF(cumulant=one_if_bump,
                               gamma=discount_if_bump,
                               target_policy=dtb_policy,
                               learner=dtb_learner,
                               name='DistanceToBump',
                               **dtb_hp)

        # start processes
        cumulant_counter = Value('d', 0)
        foreground_process = mp.Process(target=start_learning_foreground,
                                        name="foreground",
                                        args=(time_scale,
                                              [distance_to_bump],
                                              features_to_use,
                                              behavior_policy,
                                              print_stats,
                                              None,
                                              cumulant_counter))
        foreground_process.start()

    except rospy.ROSInterruptException as detail:
        rospy.loginfo("Handling: {}".format(detail))
    finally:
        try:
            foreground_process.join()
            action_manager_process.join()
        except NameError:
            pass
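

# The GTD learner constructed above (``GTD(**dtb_hp)``) lives in ``gtd.py``
# and is not shown on this page.  Purely for orientation, the sketch below
# shows what an off-policy GTD(lambda) update typically looks like with the
# hyperparameters passed in ``dtb_hp`` (``alpha``, ``beta``, ``lmbda``); the
# exact trace handling in the real ``GTD`` class may differ.
#
#     def gtd_lambda_step(theta, w, e, phi, phi_next, cumulant,
#                         gamma, gamma_next, rho, alpha, beta, lmbda):
#         """One GTD(lambda) update step (illustrative sketch only).
#
#         rho is the importance-sampling ratio
#         pi_target(a|s) / pi_behavior(a|s) for the action taken.
#         """
#         delta = cumulant + gamma_next * theta.dot(phi_next) - theta.dot(phi)
#         e = rho * (gamma * lmbda * e + phi)              # eligibility trace
#         theta += alpha * (delta * e
#                           - gamma_next * (1 - lmbda) * e.dot(w) * phi_next)
#         w += beta * (delta * e - phi.dot(w) * phi)       # auxiliary weights
#         return theta, w, e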