Source code for return_calculator

#!/usr/bin/env python

"""
Author: Banafsheh Rafiee

Description:
ReturnCalculator samples some time steps from the behavior policy and
computes the return for them.
In order to compute the return for each sample time step, it switches from
the behavior policy to the target policy.
"""

from __future__ import division

from Queue import Queue

import geometry_msgs.msg as geom_msg
import numpy as np
import rospy
from cv_bridge.core import CvBridge
from std_msgs.msg import Bool

import tools
from state_representation import StateConstants, StateManager


class ReturnCalculator:
    def __init__(self,
                 time_scale,
                 gvf,
                 num_features,
                 features_to_use,
                 behavior_policy,
                 target_policy):
        self.features_to_use = set(features_to_use + ['core', 'ir'])
        topics = filter(lambda x: x,
                        [tools.features[f] for f in self.features_to_use])

        # set up dictionary to receive sensor info
        self.recent = {topic: Queue(0) for topic in topics}

        # set up ros
        rospy.init_node('agent', anonymous=True)

        # set up sensor parsers
        for topic in topics:
            rospy.Subscriber(topic,
                             tools.topic_format[topic],
                             self.recent[topic].put)
        self.topics = topics

        rospy.loginfo("Started sensor threads.")

        # smooth out the actions
        self.time_scale = time_scale
        self.r = rospy.Rate(int(1.0 / self.time_scale))

        # agent info
        self.gvf = gvf
        self.target_policy = target_policy
        self.behavior_policy = behavior_policy
        self.state_manager = StateManager(features_to_use)
        self.feature_indices = np.concatenate(
            [StateConstants.indices_in_phi[f] for f in features_to_use])
        self.img_to_cv2 = CvBridge().compressed_imgmsg_to_cv2

        # information for managing the shift between the target and behavior
        # policies
        self.following_mu = 0
        self.following_pi = 1
        self.current_condition = self.following_mu
        self.current_policy = self.behavior_policy
        self.fixed_steps_under_pi = 100
        self.fixed_step_under_mu = 50
        self.mu_max_horizon = 50
        self.steps_under_mu = self.fixed_step_under_mu + np.random.randint(
            self.mu_max_horizon)

        # MSRE information
        self.sample_size = 1000
        self.samples_phi = np.zeros((self.sample_size, num_features))
        self.samples_G = np.zeros(self.sample_size)
        self.cumulant_buffer = np.zeros(self.fixed_steps_under_pi)
        self.gamma_buffer = np.zeros(self.fixed_steps_under_pi)

        # Set up publishers
        action_publisher = rospy.Publisher('action_cmd',
                                           geom_msg.Twist,
                                           queue_size=1)
        termination_publisher = rospy.Publisher('termination',
                                                Bool,
                                                queue_size=1)
        self.publishers = {'action': action_publisher,
                           'termination': termination_publisher}

        rospy.loginfo("Done ReturnCalculator init.")

    def create_state(self):
        # bumper constants from
        # http://docs.ros.org/hydro/api/kobuki_msgs/html/msg/SensorState.html
        bump_codes = [1, 4, 2]

        # initialize data
        additional_features = set(tools.features.keys() + ['charging'])
        sensors = self.features_to_use.union(additional_features)

        # build data (used to make phi)
        data = {sensor: None for sensor in sensors}
        for source in sensors - {'ir'}:
            temp = None
            try:
                while True:
                    temp = self.recent[tools.features[source]].get_nowait()
            except:
                pass
            data[source] = temp
        temp = []
        try:
            while True:
                temp.append(self.recent[tools.features['ir']].get_nowait())
        except:
            pass

        # use only the last 10 values, helpful at the end of an episode when
        # we have accumulated a lot of IR data
        data['ir'] = temp[-10:] if temp else None

        if data['core'] is not None:
            bump = data['core'].bumper
            data['bump'] = map(lambda x: bool(x & bump), bump_codes)
            data['charging'] = bool(data['core'].charger & 2)
        if data['ir'] is not None:
            ir = [[0] * 6] * 3
            # bitwise 'or' of all the ir data in the last time step
            for temp in data['ir']:
                a = [[int(x) for x in format(temp, '#08b')[2:]]
                     for temp in [ord(obs) for obs in temp.data]]
                ir = [[k | l for k, l in zip(i, j)] for i, j in zip(a, ir)]
            data['ir'] = [int(''.join([str(i) for i in ir_temp]), 2)
                          for ir_temp in ir]
        if data['image'] is not None:
            data['image'] = np.asarray(self.img_to_cv2(data['image']))
        if data['odom'] is not None:
            pos = data['odom'].pose.pose.position
            data['odom'] = np.array([pos.x, pos.y])
        if data['imu'] is not None:
            data['imu'] = data['imu'].orientation.z
        if 'bias' in self.features_to_use:
            data['bias'] = True
        # data['weights'] = self.gvfs[0].learner.theta if self.gvfs else None

        phi = self.state_manager.get_phi(**data)

        if 'last_action' in self.features_to_use:
            last_action = np.zeros(self.behavior_policy.action_space.size)
            last_action[self.behavior_policy.last_index] = 1
            phi = np.concatenate([phi, last_action])

        observation = self.state_manager.get_observations(**data)
        # observation['action'] = self.last_action

        return phi, observation

    def take_action(self, action):
        self.publishers['action'].publish(action)

    def update_return_buffers(self, index, observations):
        self.cumulant_buffer[index] = self.gvf.cumulant(observations)
        self.gamma_buffer[index] = self.gvf.gamma(observations)

    def compute_return(self, sample_number):
        G = 0.0
        for i in range(self.fixed_steps_under_pi):
            gamma = 1
            for j in range(i):
                gamma = gamma * self.gamma_buffer[j]
            G = G + gamma * self.cumulant_buffer[i]
        print("----------------------------------")
        print("computed return: ", G)
        print("----------------------------------")
        self.samples_G[sample_number] = G

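    # Illustrative sketch (assumed example values, not used by the node):
    # compute_return weights each cumulant by the product of the gammas that
    # precede it. With hypothetical buffers
    #     cumulants = [1.0, 0.0, 2.0] and gammas = [0.9, 0.9, 0.9],
    # the discounts are [1, 0.9, 0.9 * 0.9] = [1, 0.9, 0.81], so
    #     G = 1 * 1.0 + 0.9 * 0.0 + 0.81 * 2.0 = 2.62.
    # The same value can be obtained with numpy (imported above as np):
    #     c = np.array([1.0, 0.0, 2.0])
    #     g = np.array([0.9, 0.9, 0.9])
    #     discounts = np.concatenate([[1.0], np.cumprod(g[:-1])])
    #     G = np.sum(discounts * c)  # 2.62
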
    def run(self):
        sample_number = 0
        num_steps_followed_mu = 0
        num_steps_followed_pi = 0

        while not rospy.is_shutdown():
            # start_time = time.time()

            # get new state
            phi, observations = self.create_state()

            print("-------------------------------------")
            print("num_steps_followed_mu:", num_steps_followed_mu)
            print("num_steps_followed_pi:", num_steps_followed_pi)
            print("current_condition:", self.current_condition)
            print("fixed_steps_under_pi:", self.fixed_steps_under_pi)
            print("steps_under_mu:", self.steps_under_mu)
            print("sample_number:", sample_number)
            print("-------------------------------------")

            # take action
            self.current_policy.update(phi, observations)
            action = self.current_policy.choose_action()
            self.take_action(action)

            # update cumulant and gamma buffers if following the target policy
            if self.current_condition == self.following_pi:
                self.update_return_buffers(index=num_steps_followed_pi,
                                           observations=observations)
                num_steps_followed_pi += 1
            elif self.current_condition == self.following_mu:
                num_steps_followed_mu += 1

            # figure out which policy should be followed
            if num_steps_followed_pi == self.fixed_steps_under_pi:
                self.current_condition = self.following_mu
                self.current_policy = self.behavior_policy
                self.steps_under_mu = self.fixed_step_under_mu + \
                    np.random.randint(self.mu_max_horizon)
                num_steps_followed_pi = 0

                # compute and store return
                self.compute_return(sample_number)
                sample_number = sample_number + 1
                print("sample_number:", sample_number)

            elif num_steps_followed_mu == self.steps_under_mu:
                self.current_condition = self.following_pi
                self.current_policy = self.target_policy
                num_steps_followed_mu = 0

                # store phi
                print("----------------------------------")
                print("")
                print("")
                print("")
                print("sample phi: ", np.sum(phi[self.feature_indices]))
                print("----------------------------------")
                self.samples_phi[sample_number, :] = phi[self.feature_indices]

            # periodically save the returns and sample states collected so far
            if sample_number % 10 == 0 and num_steps_followed_pi == (
                    self.fixed_steps_under_pi - 1):
                np.savez("actual_return_{}.npz".format(self.gvf.name),
                         _return=self.samples_G[:sample_number],
                         samples=self.samples_phi[:sample_number, :],
                         sample_size=sample_number)

            # terminate once the desired number of samples has been collected
            if sample_number == self.sample_size:
                np.savez("actual_return_{}.npz".format(self.gvf.name),
                         _return=self.samples_G[:sample_number],
                         samples=self.samples_phi[:sample_number, :],
                         sample_size=sample_number)
                self.publishers["termination"].publish(True)
                break

            # sleep until next time step
            self.r.sleep()

def start_return_calculator(time_scale, GVF, num_features, features_to_use,
                            behavior_policy, target_policy):
    try:
        return_calculator = ReturnCalculator(time_scale,
                                             GVF,
                                             num_features,
                                             features_to_use,
                                             behavior_policy,
                                             target_policy)
        return_calculator.run()
    except rospy.ROSInterruptException as detail:
        rospy.loginfo("Handling: {}".format(detail))
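
# Example wiring (hypothetical sketch only -- my_gvf, my_behavior_policy, and
# my_target_policy are placeholders; the actual GVF and policy objects are
# constructed elsewhere in this project):
#
#     if __name__ == '__main__':
#         start_return_calculator(time_scale=0.1,
#                                 GVF=my_gvf,
#                                 num_features=num_features,
#                                 features_to_use=features_to_use,
#                                 behavior_policy=my_behavior_policy,
#                                 target_policy=my_target_policy)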