Source code for state_representation

#!/usr/bin/env python

"""Uses tilecoding to create state.

    Michele Albach, Shibhansh Dohare, David Quail, Parash Rahman, Niko Yasui.
import numpy as np
import rospy
from scipy.misc import comb

from CTiles import tiles
from tools import get_next_pow2, timing

[docs]class StateConstants: """ Constants useful for the tile coding in StateManager. Note: The constants here are just for reference. You may change them however you wish. """ # image tiles NUM_RANDOM_POINTS = 100 CHANNELS = 3 NUM_IMAGE_TILINGS = 4 NUM_IMAGE_INTERVALS = 4 SCALE_RGB = NUM_IMAGE_INTERVALS / 256.0 IMAGE_IHT_SIZE = get_next_pow2( (NUM_IMAGE_INTERVALS + 1) * NUM_IMAGE_TILINGS) PIXEL_FEATURE_LENGTH = CHANNELS * IMAGE_IHT_SIZE TOTAL_IMAGE_FEATURE_LENGTH = NUM_RANDOM_POINTS * PIXEL_FEATURE_LENGTH IMAGE_START_INDEX = 0 # constants relating to image size recieved IMAGE_LI = 480 # rows IMAGE_CO = 640 # columns # IMU tiles NUM_IMU_TILINGS = 4 NUM_IMU_TILES = 4 SCALE_IMU = NUM_IMU_TILES / 2.0 # range is [-1, 1] IMU_IHT_SIZE = get_next_pow2((NUM_IMU_TILES + 1) * NUM_IMU_TILINGS) IMU_START_INDEX = IMAGE_START_INDEX + TOTAL_IMAGE_FEATURE_LENGTH # Odom tiles NUM_ODOM_TILINGS = 1 NUM_ODOM_TILES = 5 SCALE_ODOM = NUM_ODOM_TILES / 2.0 # range is [0, 2] ODOM_IHT_SIZE = get_next_pow2( (NUM_ODOM_TILES + 1) * (NUM_ODOM_TILES + 1) * NUM_ODOM_TILINGS) ODOM_START_INDEX = IMU_START_INDEX + IMU_IHT_SIZE # IR tiles IR_START_INDEX = ODOM_START_INDEX + ODOM_IHT_SIZE # IR_ITH_SIZE = 64*3 # IR_ITH_SIZE = 6*3 IR_ITH_SIZE = 6 # IR_ITH_SIZE = 3 # pixel pairs NUM_PP = comb(NUM_RANDOM_POINTS, 2, exact=True) NUM_PP_TILINGS = 4 NUM_PP_TILES = 4 SCALE_PP = NUM_PP_TILES / 2. # [-1, 1] PP_IHT_SIZE = get_next_pow2((NUM_PP_TILES + 1) * NUM_PP_TILINGS) PP_FEATURE_LENGTH = NUM_PP * PP_IHT_SIZE PP_START_INDEX = IR_START_INDEX + IR_ITH_SIZE # the 1 represents the bias unit, 3 for bump, TOTAL_FEATURE_LENGTH = (TOTAL_IMAGE_FEATURE_LENGTH + IMU_IHT_SIZE + ODOM_IHT_SIZE + IR_ITH_SIZE + 3 + 1 + PP_FEATURE_LENGTH) indices_in_phi = { 'image': np.arange(IMAGE_START_INDEX, IMAGE_START_INDEX + TOTAL_IMAGE_FEATURE_LENGTH), 'cimage': np.arange(IMAGE_START_INDEX, IMAGE_START_INDEX + TOTAL_IMAGE_FEATURE_LENGTH), 'imu': np.arange(IMU_START_INDEX, IMU_START_INDEX + IMU_IHT_SIZE), 'odom': np.arange(ODOM_START_INDEX, ODOM_START_INDEX + ODOM_IHT_SIZE), 'ir': np.arange(IR_START_INDEX, IR_START_INDEX + IR_ITH_SIZE), 'pixel_pairs': np.arange(PP_START_INDEX, PP_START_INDEX + PP_FEATURE_LENGTH), 'bump': np.arange(PP_START_INDEX + PP_FEATURE_LENGTH, PP_START_INDEX + PP_FEATURE_LENGTH + 3), 'bias': np.array([TOTAL_FEATURE_LENGTH - 1]), 'last_action': np.array([], dtype=int), } num_active_features = { "image": NUM_RANDOM_POINTS * CHANNELS * NUM_IMAGE_TILINGS, 'cimage': NUM_RANDOM_POINTS * CHANNELS * NUM_IMAGE_TILINGS, 'imu': NUM_IMU_TILINGS, 'odom': NUM_ODOM_TILINGS, 'ir': 3, 'bump': 3, 'bias': 1, 'last_action': 1, 'pixel_pairs': NUM_PP * NUM_PP_TILINGS }
[docs]class StateManager(object): def __init__(self, features_to_use): """Sets up the hash tables used for each feature encoding. Args: features_to_use: strings representing which sensorimotor information should actually be incorporated into phi (see :py:meth:`~state_representation.StateManager.get_phi`). """ num_img_ihts = StateConstants.NUM_RANDOM_POINTS * \ StateConstants.CHANNELS img_iht_size = StateConstants.IMAGE_IHT_SIZE self.img_ihts = [tiles.CollisionTable(img_iht_size, "safe") for _ in range(num_img_ihts)] self.pp_ihts = [ tiles.CollisionTable(StateConstants.PP_IHT_SIZE, "safe") for _ in range(StateConstants.NUM_PP) ] self.imu_iht = tiles.CollisionTable(StateConstants.IMU_IHT_SIZE, "safe") self.odom_iht = tiles.CollisionTable(StateConstants.ODOM_IHT_SIZE, "safe") # set up mask to chose pixels num_pixels = StateConstants.IMAGE_LI * StateConstants.IMAGE_CO num_chosen = StateConstants.NUM_RANDOM_POINTS self.chosen_indices = np.random.choice(a=num_pixels, size=num_chosen, replace=False) self.pixel_mask = np.zeros(num_pixels, dtype=np.bool) self.pixel_mask[self.chosen_indices] = True self.pixel_mask = self.pixel_mask.reshape(StateConstants.IMAGE_LI, StateConstants.IMAGE_CO) # most recent data self.last_image_raw = np.zeros((StateConstants.IMAGE_LI, StateConstants.IMAGE_CO, 3), dtype=int) self.last_imu_raw = float() self.last_odom_raw = np.zeros(4) self.last_ir_raw = (0, 0, 0) self.last_charging_raw = False self.last_bump_raw = False self.features_to_use = features_to_use
[docs] @timing def get_phi(self, image, bump, ir, imu, odom, bias, weights=None, *args, **kwargs): """Gets the binary tile coding of all the pertinent fields. """ phi = np.zeros(StateConstants.TOTAL_FEATURE_LENGTH, dtype=bool) # phi = np.zeros(2) # self.features_to_use = set() # phi[0] = imu if imu is not None else 0 # phi[1] = 1 def valid_image(img): return img is not None and len(img) > 0 and len(img[0]) > 0 if not valid_image(image): image = self.last_image_raw if 'image' in self.features_to_use: rospy.logwarn("Image is empty.") self.last_image_raw = image if 'image' or 'cimage' in self.features_to_use: rgb_points = image[self.pixel_mask].flatten().astype(float) rgb_points *= StateConstants.SCALE_RGB rgb_inds = np.arange(StateConstants.NUM_RANDOM_POINTS * 3) tile_inds = [tiles.tiles(StateConstants.NUM_IMAGE_TILINGS, self.img_ihts[i], [rgb_points[i]]) for i in rgb_inds] # tile_inds = np.ones((900,4), dtype=int) rgb_inds *= StateConstants.IMAGE_IHT_SIZE indices = (tile_inds + rgb_inds[:, np.newaxis]).ravel() assert np.min(indices) >= StateConstants.IMAGE_START_INDEX assert np.max(indices) <= ( StateConstants.IMAGE_START_INDEX + StateConstants.TOTAL_IMAGE_FEATURE_LENGTH ) phi[indices] = 1 if 'pixel_pairs' in self.features_to_use: # get vector of pixels with each pixel=(Channel1,Channel2,...) num_channels = StateConstants.CHANNELS pixels = image[self.pixel_mask].reshape(-1, num_channels).astype(float) # get vector of L2 norms of above pixels norms = np.linalg.norm(pixels, axis=1) assert norms.size == pixels.shape[0] # find indices to multiply to get upper triangle # of the outer product of the arrays row, col = np.triu_indices(norms.size, 1) # calculate upper triangle of inner product: pixels, pixels dots = np.einsum('ij,ij->i', pixels[row], pixels[col]) # calculate upper triangle of inner product: norms, norms norm_product = np.einsum('i,i->i', norms[row], norms[col]) # find cosine similarity and avoid making nans div_by_zero = norm_product == 0 cos_sim = dots cos_sim[div_by_zero] = 0 cos_sim[~div_by_zero] /= norm_product[~div_by_zero] assert (np.abs(cos_sim) <= 1.1).all() # fix rounding errors cos_sim[cos_sim < -1] = -1 cos_sim[cos_sim > 1] = 1 assert cos_sim.size == StateConstants.NUM_PP cos_sim *= StateConstants.SCALE_PP # get indices form tile coding pp_inds = np.arange(StateConstants.NUM_PP) tile_inds = [tiles.tiles(StateConstants.NUM_PP_TILINGS, self.pp_ihts[i], [cos_sim[i]]) for i in pp_inds] # offset tilecoding for each pixel by the IHT size to map # each pixel to a different set of PP_IHT_SIZE indices pp_inds *= StateConstants.PP_IHT_SIZE indices = (tile_inds + pp_inds[:, np.newaxis]).ravel() # offset all indices to correspond to the Pixel pairs # section of the feature array indices += StateConstants.PP_START_INDEX assert np.min(indices) >= StateConstants.PP_START_INDEX assert np.max(indices) <= ( StateConstants.PP_START_INDEX + StateConstants.PP_FEATURE_LENGTH ) phi[indices] = 1 if imu is None: imu = self.last_imu_raw if 'imu' in self.features_to_use: rospy.logwarn("No imu value.") if 'imu' in self.features_to_use: indices = np.array(tiles.tiles(StateConstants.NUM_IMU_TILINGS, self.imu_iht, [imu * StateConstants.SCALE_IMU])) phi[indices + StateConstants.IMU_START_INDEX] = 1 if odom is None: odom = self.last_odom_raw if 'odom' in self.features_to_use: rospy.logwarn("No odom value.") if 'odom' in self.features_to_use: indices = np.array(tiles.tiles(StateConstants.NUM_ODOM_TILINGS, self.odom_iht, ( odom * StateConstants.SCALE_ODOM).tolist(), [])) phi[indices + StateConstants.ODOM_START_INDEX] = 1 if ir is None: ir = self.last_ir_raw if 'ir' in self.features_to_use: rospy.logwarn("No ir value.") if 'ir' in self.features_to_use and len(ir) >= 3: # indices = np.asarray(ir) # indices += np.array([0,64,128]) ir_1 = [int(x) for x in format(ir[0], '#08b')[2:]] ir_2 = [int(x) for x in format(ir[1], '#08b')[2:]] ir_3 = [int(x) for x in format(ir[2], '#08b')[2:]] value = ir_1 + ir_2 + ir_3 # # if only need the information about the region the robot is ( # left,center,right) # in_right = ir_1[3] | ir_1[0] | ir_2[3] | ir_2[0] | ir_3[3] | # ir_3[0] # in_left = ir_1[5] | ir_1[1] | ir_2[5] | ir_2[1] | ir_3[5] | # ir_3[1] # in_center = ir_1[4] | ir_1[2] | ir_2[4] | ir_2[2] | ir_3[4] | # ir_3[2] # if in_center: # in_left = 0 # in_right = 0 # value = [in_left, in_center, in_right] # if only want to use the data from the center IR of the robot value = ir_2 indices = np.nonzero(value)[0] phi[np.asarray(indices) + StateConstants.IR_START_INDEX] = 1 # bump if bump is None: bump = self.last_bump_raw if 'bump' in self.features_to_use: rospy.logwarn("No bump value") if 'bump' in self.features_to_use: phi[StateConstants.indices_in_phi['bump']] = bump # bias unit if 'bias' in self.features_to_use: phi[StateConstants.indices_in_phi['bias']] = 1 return phi
[docs] def get_observations(self, bump, ir, charging, odom, imu, **kwargs): """A way to access auxiliary state information. """ observations = { 'bump': any(bump) if bump is not None else self.last_bump_raw, 'ir': ir if ir is not None else self.last_ir_raw, 'charging': charging if charging is not None else self.last_charging_raw, 'speed': odom[3] if odom is not None else self.last_odom_raw[3], 'imu': imu if imu is not None else self.last_imu_raw, } return observations