Source code for state_representation

#!/usr/bin/env python

"""Uses tilecoding to create state.

Authors:
    Michele Albach, Shibhansh Dohare, David Quail, Parash Rahman, Niko Yasui.
"""
import numpy as np
import rospy
from scipy.misc import comb

from CTiles import tiles
from tools import get_next_pow2, timing


[docs]class StateConstants: """ Constants useful for the tile coding in StateManager. Note: The constants here are just for reference. You may change them however you wish. """ # image tiles NUM_RANDOM_POINTS = 100 CHANNELS = 3 NUM_IMAGE_TILINGS = 4 NUM_IMAGE_INTERVALS = 4 SCALE_RGB = NUM_IMAGE_INTERVALS / 256.0 IMAGE_IHT_SIZE = get_next_pow2( (NUM_IMAGE_INTERVALS + 1) * NUM_IMAGE_TILINGS) PIXEL_FEATURE_LENGTH = CHANNELS * IMAGE_IHT_SIZE TOTAL_IMAGE_FEATURE_LENGTH = NUM_RANDOM_POINTS * PIXEL_FEATURE_LENGTH IMAGE_START_INDEX = 0 # constants relating to image size recieved IMAGE_LI = 480 # rows IMAGE_CO = 640 # columns # IMU tiles NUM_IMU_TILINGS = 4 NUM_IMU_TILES = 4 SCALE_IMU = NUM_IMU_TILES / 2.0 # range is [-1, 1] IMU_IHT_SIZE = get_next_pow2((NUM_IMU_TILES + 1) * NUM_IMU_TILINGS) IMU_START_INDEX = IMAGE_START_INDEX + TOTAL_IMAGE_FEATURE_LENGTH # Odom tiles NUM_ODOM_TILINGS = 1 NUM_ODOM_TILES = 5 SCALE_ODOM = NUM_ODOM_TILES / 2.0 # range is [0, 2] ODOM_IHT_SIZE = get_next_pow2( (NUM_ODOM_TILES + 1) * (NUM_ODOM_TILES + 1) * NUM_ODOM_TILINGS) ODOM_START_INDEX = IMU_START_INDEX + IMU_IHT_SIZE # IR tiles IR_START_INDEX = ODOM_START_INDEX + ODOM_IHT_SIZE # IR_ITH_SIZE = 64*3 # IR_ITH_SIZE = 6*3 IR_ITH_SIZE = 6 # IR_ITH_SIZE = 3 # pixel pairs NUM_PP = comb(NUM_RANDOM_POINTS, 2, exact=True) NUM_PP_TILINGS = 4 NUM_PP_TILES = 4 SCALE_PP = NUM_PP_TILES / 2. # [-1, 1] PP_IHT_SIZE = get_next_pow2((NUM_PP_TILES + 1) * NUM_PP_TILINGS) PP_FEATURE_LENGTH = NUM_PP * PP_IHT_SIZE PP_START_INDEX = IR_START_INDEX + IR_ITH_SIZE # the 1 represents the bias unit, 3 for bump, TOTAL_FEATURE_LENGTH = (TOTAL_IMAGE_FEATURE_LENGTH + IMU_IHT_SIZE + ODOM_IHT_SIZE + IR_ITH_SIZE + 3 + 1 + PP_FEATURE_LENGTH) indices_in_phi = { 'image': np.arange(IMAGE_START_INDEX, IMAGE_START_INDEX + TOTAL_IMAGE_FEATURE_LENGTH), 'cimage': np.arange(IMAGE_START_INDEX, IMAGE_START_INDEX + TOTAL_IMAGE_FEATURE_LENGTH), 'imu': np.arange(IMU_START_INDEX, IMU_START_INDEX + IMU_IHT_SIZE), 'odom': np.arange(ODOM_START_INDEX, ODOM_START_INDEX + ODOM_IHT_SIZE), 'ir': np.arange(IR_START_INDEX, IR_START_INDEX + IR_ITH_SIZE), 'pixel_pairs': np.arange(PP_START_INDEX, PP_START_INDEX + PP_FEATURE_LENGTH), 'bump': np.arange(PP_START_INDEX + PP_FEATURE_LENGTH, PP_START_INDEX + PP_FEATURE_LENGTH + 3), 'bias': np.array([TOTAL_FEATURE_LENGTH - 1]), 'last_action': np.array([], dtype=int), } num_active_features = { "image": NUM_RANDOM_POINTS * CHANNELS * NUM_IMAGE_TILINGS, 'cimage': NUM_RANDOM_POINTS * CHANNELS * NUM_IMAGE_TILINGS, 'imu': NUM_IMU_TILINGS, 'odom': NUM_ODOM_TILINGS, 'ir': 3, 'bump': 3, 'bias': 1, 'last_action': 1, 'pixel_pairs': NUM_PP * NUM_PP_TILINGS }
[docs]class StateManager(object): def __init__(self, features_to_use): """Sets up the hash tables used for each feature encoding. Args: features_to_use: strings representing which sensorimotor information should actually be incorporated into phi (see :py:meth:`~state_representation.StateManager.get_phi`). """ num_img_ihts = StateConstants.NUM_RANDOM_POINTS * \ StateConstants.CHANNELS img_iht_size = StateConstants.IMAGE_IHT_SIZE self.img_ihts = [tiles.CollisionTable(img_iht_size, "safe") for _ in range(num_img_ihts)] self.pp_ihts = [ tiles.CollisionTable(StateConstants.PP_IHT_SIZE, "safe") for _ in range(StateConstants.NUM_PP) ] self.imu_iht = tiles.CollisionTable(StateConstants.IMU_IHT_SIZE, "safe") self.odom_iht = tiles.CollisionTable(StateConstants.ODOM_IHT_SIZE, "safe") # set up mask to chose pixels num_pixels = StateConstants.IMAGE_LI * StateConstants.IMAGE_CO num_chosen = StateConstants.NUM_RANDOM_POINTS self.chosen_indices = np.random.choice(a=num_pixels, size=num_chosen, replace=False) self.pixel_mask = np.zeros(num_pixels, dtype=np.bool) self.pixel_mask[self.chosen_indices] = True self.pixel_mask = self.pixel_mask.reshape(StateConstants.IMAGE_LI, StateConstants.IMAGE_CO) # most recent data self.last_image_raw = np.zeros((StateConstants.IMAGE_LI, StateConstants.IMAGE_CO, 3), dtype=int) self.last_imu_raw = float() self.last_odom_raw = np.zeros(4) self.last_ir_raw = (0, 0, 0) self.last_charging_raw = False self.last_bump_raw = False self.features_to_use = features_to_use
[docs] @timing def get_phi(self, image, bump, ir, imu, odom, bias, weights=None, *args, **kwargs): """Gets the binary tile coding of all the pertinent fields. """ phi = np.zeros(StateConstants.TOTAL_FEATURE_LENGTH, dtype=bool) # phi = np.zeros(2) # self.features_to_use = set() # phi[0] = imu if imu is not None else 0 # phi[1] = 1 def valid_image(img): return img is not None and len(img) > 0 and len(img[0]) > 0 if not valid_image(image): image = self.last_image_raw if 'image' in self.features_to_use: rospy.logwarn("Image is empty.") self.last_image_raw = image if 'image' or 'cimage' in self.features_to_use: rgb_points = image[self.pixel_mask].flatten().astype(float) rgb_points *= StateConstants.SCALE_RGB rgb_inds = np.arange(StateConstants.NUM_RANDOM_POINTS * 3) tile_inds = [tiles.tiles(StateConstants.NUM_IMAGE_TILINGS, self.img_ihts[i], [rgb_points[i]]) for i in rgb_inds] # tile_inds = np.ones((900,4), dtype=int) rgb_inds *= StateConstants.IMAGE_IHT_SIZE indices = (tile_inds + rgb_inds[:, np.newaxis]).ravel() assert np.min(indices) >= StateConstants.IMAGE_START_INDEX assert np.max(indices) <= ( StateConstants.IMAGE_START_INDEX + StateConstants.TOTAL_IMAGE_FEATURE_LENGTH ) phi[indices] = 1 if 'pixel_pairs' in self.features_to_use: # get vector of pixels with each pixel=(Channel1,Channel2,...) num_channels = StateConstants.CHANNELS pixels = image[self.pixel_mask].reshape(-1, num_channels).astype(float) # get vector of L2 norms of above pixels norms = np.linalg.norm(pixels, axis=1) assert norms.size == pixels.shape[0] # find indices to multiply to get upper triangle # of the outer product of the arrays row, col = np.triu_indices(norms.size, 1) # calculate upper triangle of inner product: pixels, pixels dots = np.einsum('ij,ij->i', pixels[row], pixels[col]) # calculate upper triangle of inner product: norms, norms norm_product = np.einsum('i,i->i', norms[row], norms[col]) # find cosine similarity and avoid making nans div_by_zero = norm_product == 0 cos_sim = dots cos_sim[div_by_zero] = 0 cos_sim[~div_by_zero] /= norm_product[~div_by_zero] assert (np.abs(cos_sim) <= 1.1).all() # fix rounding errors cos_sim[cos_sim < -1] = -1 cos_sim[cos_sim > 1] = 1 assert cos_sim.size == StateConstants.NUM_PP cos_sim *= StateConstants.SCALE_PP # get indices form tile coding pp_inds = np.arange(StateConstants.NUM_PP) tile_inds = [tiles.tiles(StateConstants.NUM_PP_TILINGS, self.pp_ihts[i], [cos_sim[i]]) for i in pp_inds] # offset tilecoding for each pixel by the IHT size to map # each pixel to a different set of PP_IHT_SIZE indices pp_inds *= StateConstants.PP_IHT_SIZE indices = (tile_inds + pp_inds[:, np.newaxis]).ravel() # offset all indices to correspond to the Pixel pairs # section of the feature array indices += StateConstants.PP_START_INDEX assert np.min(indices) >= StateConstants.PP_START_INDEX assert np.max(indices) <= ( StateConstants.PP_START_INDEX + StateConstants.PP_FEATURE_LENGTH ) phi[indices] = 1 if imu is None: imu = self.last_imu_raw if 'imu' in self.features_to_use: rospy.logwarn("No imu value.") if 'imu' in self.features_to_use: indices = np.array(tiles.tiles(StateConstants.NUM_IMU_TILINGS, self.imu_iht, [imu * StateConstants.SCALE_IMU])) phi[indices + StateConstants.IMU_START_INDEX] = 1 if odom is None: odom = self.last_odom_raw if 'odom' in self.features_to_use: rospy.logwarn("No odom value.") if 'odom' in self.features_to_use: indices = np.array(tiles.tiles(StateConstants.NUM_ODOM_TILINGS, self.odom_iht, ( odom * StateConstants.SCALE_ODOM).tolist(), [])) phi[indices + StateConstants.ODOM_START_INDEX] = 1 if ir is None: ir = self.last_ir_raw if 'ir' in self.features_to_use: rospy.logwarn("No ir value.") if 'ir' in self.features_to_use and len(ir) >= 3: # indices = np.asarray(ir) # indices += np.array([0,64,128]) ir_1 = [int(x) for x in format(ir[0], '#08b')[2:]] ir_2 = [int(x) for x in format(ir[1], '#08b')[2:]] ir_3 = [int(x) for x in format(ir[2], '#08b')[2:]] value = ir_1 + ir_2 + ir_3 # # if only need the information about the region the robot is ( # left,center,right) # in_right = ir_1[3] | ir_1[0] | ir_2[3] | ir_2[0] | ir_3[3] | # ir_3[0] # in_left = ir_1[5] | ir_1[1] | ir_2[5] | ir_2[1] | ir_3[5] | # ir_3[1] # in_center = ir_1[4] | ir_1[2] | ir_2[4] | ir_2[2] | ir_3[4] | # ir_3[2] # if in_center: # in_left = 0 # in_right = 0 # value = [in_left, in_center, in_right] # if only want to use the data from the center IR of the robot value = ir_2 indices = np.nonzero(value)[0] phi[np.asarray(indices) + StateConstants.IR_START_INDEX] = 1 # bump if bump is None: bump = self.last_bump_raw if 'bump' in self.features_to_use: rospy.logwarn("No bump value") if 'bump' in self.features_to_use: phi[StateConstants.indices_in_phi['bump']] = bump # bias unit if 'bias' in self.features_to_use: phi[StateConstants.indices_in_phi['bias']] = 1 return phi
[docs] def get_observations(self, bump, ir, charging, odom, imu, **kwargs): """A way to access auxiliary state information. """ observations = { 'bump': any(bump) if bump is not None else self.last_bump_raw, 'ir': ir if ir is not None else self.last_ir_raw, 'charging': charging if charging is not None else self.last_charging_raw, 'speed': odom[3] if odom is not None else self.last_odom_raw[3], 'imu': imu if imu is not None else self.last_imu_raw, } return observations