Source code for calculate_return_example

from __future__ import division

import multiprocessing as mp

import numpy as np
import rospy
from geometry_msgs.msg import Twist, Vector3

from action_manager import start_action_manager
from gvf import GVF
from policy import Policy
from return_calculator import start_return_calculator
from state_representation import StateConstants


# go forward with probability 0.9 and left with probability 0.1
# class GoForwardWithRandomness(Policy):
#     def __init__(self, forward_percentage, *args, **kwargs):

#         self.forward_percentage = forward_percentage

#         # where the last action is recorded according
#         # to its respective constants
#         self.TURN = 2
#         self.FORWARD = 1
#         self.STOP = 0

#         Policy.__init__(self, *args, **kwargs)

#     def update(self, phi, observation, *args, **kwargs):

#         self.pi = np.zeros(self.action_space.size)

#         if observation['bump']:
#             self.pi[self.TURN] = 1
#         else:
#             self.pi[self.FORWARD] = self.forward_percentage
#             self.pi[self.TURN] = 1 - self.forward_percentage



class GoForwardWithRandomness(Policy):
    def __init__(self, forward_repeat_percentage, turn_repeat_percentage,
                 *args, **kwargs):
        # self.forward_percentage = forward_percentage
        self.forward_repeat_percentage = forward_repeat_percentage
        self.turn_repeat_percentage = turn_repeat_percentage

        # where the last action is recorded according
        # to its respective constants
        self.TURN = 2
        self.FORWARD = 1
        self.STOP = 0

        Policy.__init__(self, *args, **kwargs)

    def update(self, phi, observation, *args, **kwargs):
        self.pi = np.zeros(self.action_space.size)

        if observation['bump']:
            self.pi[self.TURN] = 1
        else:
            if self.last_index == self.FORWARD:
                self.pi[self.FORWARD] = self.forward_repeat_percentage
                self.pi[self.TURN] = 1 - self.forward_repeat_percentage
            else:
                self.pi[self.FORWARD] = 1 - self.turn_repeat_percentage
                self.pi[self.TURN] = self.turn_repeat_percentage


class TurnIfBump(Policy):
    def __init__(self, turn_repeat_percentage, *args, **kwargs):
        # self.forward_percentage = forward_percentage
        self.turn_repeat_percentage = turn_repeat_percentage

        # where the last action is recorded according
        # to its respective constants
        self.TURN = 2
        self.FORWARD = 1
        self.STOP = 0

        Policy.__init__(self, *args, **kwargs)

    def update(self, phi, observation, *args, **kwargs):
        self.pi = np.zeros(self.action_space.size)

        if observation['bump']:
            self.pi[self.TURN] = 1.0
        else:
            self.pi[self.FORWARD] = 1.0
            # if self.last_index == self.FORWARD:
            #     self.pi[self.FORWARD] = 1.0
            # else:
            #     self.pi[self.FORWARD] = 1 - self.turn_repeat_percentage
            #     self.pi[self.TURN] = self.turn_repeat_percentage


# go forward if bump sensor is off
class GoForwardIfNotBump(Policy):
    def __init__(self, *args, **kwargs):
        # where the last action is recorded according
        # to its respective constants
        self.TURN = 2
        self.FORWARD = 1
        self.STOP = 0

        Policy.__init__(self, *args, **kwargs)

    def update(self, phi, observation, *args, **kwargs):
        if observation['bump']:
            chosen_index = self.STOP
        else:
            chosen_index = self.FORWARD

        self.pi = np.zeros(self.action_space.size)
        self.pi[chosen_index] = 1
if __name__ == "__main__": try: time_scale = 0.2 forward_speed = 0.2 turn_speed = 1.0 parameters = {'alpha': 0, 'alpha0': 0, 'lmbda': 0, } features_to_use = ['image', 'bias'] feature_indices = np.concatenate( [StateConstants.indices_in_phi[f] for f in features_to_use]) num_features = feature_indices.size one_if_bump = lambda observations: int(bool(sum(observations["bump"]))) discount_if_bump = lambda observations: 0 if bool( sum(observations["bump"])) else 0.9 # all available actions action_space = np.array([Twist(Vector3(0, 0, 0), Vector3(0, 0, 0)), Twist(Vector3(forward_speed, 0, 0), Vector3(0, 0, 0)), Twist(Vector3(0, 0, 0), Vector3(0, 0, turn_speed))]) distance_to_bump = GVF(cumulant=one_if_bump, gamma=discount_if_bump, target_policy=None, num_features=num_features, learner=None, name='DistanceToBump', logger=rospy.loginfo, feature_indices=feature_indices, **parameters) # behavior_policy = GoForwardWithRandomness( # forward_repeat_percentage=0.9, # turn_repeat_percentage = # 0.5, # action_space=action_space, # # feature_indices=feature_indices) behavior_policy = TurnIfBump(turn_repeat_percentage=0.5, action_space=action_space, feature_indices=feature_indices) target_policy = GoForwardIfNotBump(action_space=action_space, feature_indices=feature_indices) # Run action_manager action_manager_process = mp.Process(target=start_action_manager, name="action_manager", args=()) action_manager_process.start() # Run learning_foregound return_calculator_process = mp.Process(target=start_return_calculator, name="return_calculator", args=(time_scale, distance_to_bump, num_features, features_to_use, behavior_policy, target_policy)) return_calculator_process.start() except rospy.ROSInterruptException as detail: rospy.loginfo("Handling: {}".format(detail)) finally: try: return_calculator_process.join() action_manager_process.join() except NameError: pass