Commit 13f67e67 authored by David Dembinsky

Upload clean code and doc
# Sequential Spatial Transformer Networks for Salient Object Classification
## David Dembinsky, Fatemeh Azimi, Federico Raue, Jörn Hees, Sebastian Palacio and Andreas Dengel
### TU Kaiserslautern, Germany
### German Research Center for Artificial Intelligence (DFKI), Germany
## About
This code was used for the experiments in the paper mentioned above, which introduces the Deep Q Sequential Spatial Transformer Network (DQ-SSTN).
We train a Spatial Transformer Network (the so-called "agent") to improve the performance of a downstream classifier in the context of cluttered saliency: a setup where, besides the main object, there are other, smaller objects that distract the classifier. We train the network using Q-Learning, a Reinforcement Learning algorithm, which allows us to train on non-differentiable feedback.
The DQ-SSTN sequentially selects transformations from a discrete action set and applies them. After a fixed number of iterations, the transformed image is passed to the classifier:
![The DQ-SSTN](doc/img/DQ-SSTN.png "The Sequential Spatial Transformer iteratively selecting transformations to focus on the object of interest.")
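The following sketch outlines this inference loop (illustrative only: `agent`, `classifier`, `images`, and `NUM_STEPS` are placeholders, while `ActionSet` and `apply_action` are the helpers from `src/action_set.py` and `src/auxils.py`):

```python
import torch
from action_set import ActionSet
from auxils import apply_action

acts = ActionSet()
theta = None                             # accumulated affine transformation (homogeneous 3x3)
state = images                           # batch of input images, shape [B, 3, 224, 224]
for _ in range(NUM_STEPS):               # fixed number of transformation steps
    q_values = agent(state)              # one Q-value per discrete action, shape [B, num_actions]
    action_ids = q_values.argmax(dim=1)  # greedy action selection at inference time
    state, theta = apply_action(acts, images, action_ids, theta)
logits = classifier(state)               # classify the final transformed image
```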
We experimented with different reward formulations, taking advantage of the classifier's confidence after each transformation as well as extracting bounding-box information from the dataset. Our experiments indicate that the latter approach is more beneficial when dealing with a complex dataset such as PASCAL VOC. The DQ-SSTN agent learns to select adequate transformations that increase the IoU between the image frame and the main object's bounding box:
![Qualitative results of the transformation process](doc/img/qualitative_results_small.png "Two examples where the DQ-SSTN successfully selects transformations to zoom in on the object of interest.")
## Results
Using the PASCAL VOC dataset, the proposed DQ-SSTN increases the mean IoU between the image frame and the object's bounding box by 11.53 percentage points and the accuracy of the classifier by 1.82 percentage points.

| **Method**          | **Accuracy (%)** | **IoU mean (%)**          |
|---------------------|------------------|---------------------------|
| Baseline classifier | 80.04            | 39.73 (no transformation) |
| Discrete IoU-reward | 81.86            | 51.26                     |
The full paper can be found under **doc/**.
## Structure
**src/**
- `main.py` : Starts the training; different parameters can be changed via the start command
- `action_set.py` : The definition of the affine transformations the DQ-SSTN can choose from
- `auxils.py` : Contains methods used for training (calculate IoU, apply transformation) as well as for logging the run
- `loader.py` : The dataloader for the PASCAL VOC dataset. It assigns a single label (that of the largest object) to each image
- `networks.py` : The DQ-SSTN architecture as well as the classifier can be found here
- `replay_memory.py` : The replay memory used for Q-Learning, storing each single transformation
- **special_experiments/**
  - *The code in this folder has to be moved to the main source directory (`src/`) in order to run!*
  - `main_rewards.py` : Includes the different reward definitions (Section 4.2)
  - `main_iou.py` : Used to manually change the IoU of the images in the dataset for the proof of concept (Section 4.3.1)
  - `main_hardmining.py` & `replay_memory_hardmining.py` : Used for the hard-mining experiments (Section 4.3.2)
  - `analyze_cluttered_salience.py` : Used to investigate the overlap between the main object and other objects (Section 4.4.2)
The training can be started with `python src/main.py with CRes QRes "RL={'ID': $id, $rl }"`, where `$id` is a numeric ID that should be assigned and `$rl` are additional keyword arguments. The best parameters we found are set as default values.
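For example, assuming the additional keywords can be omitted, a run with ID 1 could be started as `python src/main.py with CRes QRes "RL={'ID': 1}"`.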
**res/**
- *Results are saved here. For each experiment, a folder with a running (numeric) ID is created at runtime. After training completes, the results are saved in a folder with the ID specified in the start command (`ID_$id`).*
## Cite
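# =============================================================================
# src/action_set.py
# =============================================================================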
import torch
class ActionSet:
r"""
Stores all possible affine transformations.
Functions:
`get_action_tensor()` Returns action tensor
"""
def __init__(self):
r"""
"""
T_size = 4.
# https://en.wikipedia.org/wiki/Affine_transformation#Image_transformation
# Action set config
# Translates 4 pixels
        self.T = T_size/224 * 2  # Image coordinates are normalized to (-1, 1), so the 224-pixel width maps to a range of |1 - (-1)| = 2
# Zoom in by 0.8
self.S1 = 0.8
# Translation
ac_1_descrip = "Move Left"
ac_1 = torch.FloatTensor(( [1.0, 0.0, self.T],
[0, 1.0, 0.0],
                                   [0.0, 0.0, 1.0])).view(1, 3, 3)  # homogeneous [1, 3, 3]; affine_grid later uses theta[:, 0:2, :]
ac_2_descrip = "Move Right"
ac_2 = torch.FloatTensor(( [1.0, 0.0, (-self.T)],
[0, 1.0, 0.0],
[0.0, 0.0, 1.0])).view(1, 3, 3)
ac_3_descrip = "Move Up"
ac_3 = torch.FloatTensor(( [1.0, 0.0, 0.0],
[0.0, 1.0, self.T],
[0.0, 0.0, 1.0])).view(1, 3, 3)
ac_4_descrip = "Move Down"
ac_4 = torch.FloatTensor(( [1.0, 0.0, 0.0],
[0, 1.0, (-self.T)],
[0.0, 0.0, 1.0])).view(1, 3, 3)
# Scaling
ac_9_descrip = "Scale in xy"
ac_9 = torch.FloatTensor(( [self.S1, 0.0, 0.0],
[0.0, self.S1, 0.0],
[0.0, 0.0, 1.0])).view(1, 3, 3)
ac_9_bb = torch.FloatTensor(( [1/self.S1, 0.0, 0.0],
[0.0, 1/self.S1, 0.0],
[0.0, 0.0, 1.0])).view(1, 3, 3)
# Dummy non-operation (placeholder)
ac_0_descrip = "Identity"
ac_0 = torch.FloatTensor(( [1., 0.0, 0.0],
[0.0, 1., 0.0],
[0.0, 0.0, 1.0])).view(1, 3, 3)
action_list = [ac_0, ac_1, ac_2, ac_3, ac_4, ac_9]
self.action_descrip = [ac_0_descrip, ac_1_descrip, ac_2_descrip, ac_3_descrip, ac_4_descrip, ac_9_descrip]
self.action_tensor = torch.cat(action_list, dim=0)
self.bb_tensor = torch.cat([ac_0, ac_2, ac_1, ac_4, ac_3, ac_9_bb], dim=0)
def get_actions_name(self):
return self.action_descrip
def get_action_tensor(self):
r"""
Returns the action_tensor with shape ([A,3,3])
"""
return self.action_tensor
def get_bb_action_tensor(self):
return self.bb_tensor
def num_actions(self):
return self.action_tensor.shape[0]
if __name__ == "__main__":
acts = ActionSet()
print(acts.get_actions_name())
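    # Illustrative check (a sketch, not part of the original demo): every action is a
    # 3x3 homogeneous matrix, stacked into a single tensor of shape [A, 3, 3].
    print(acts.get_action_tensor().shape)  # torch.Size([6, 3, 3])
    print(acts.num_actions())               # 6: identity, 4 translations, 1 zoom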
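
# =============================================================================
# src/auxils.py
# =============================================================================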
from os import mkdir, remove, rename
from os.path import abspath, isfile, join
from shutil import move, rmtree
from time import sleep
import torch
import torch.nn.functional as F
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def apply_action(acts, originals, action_ids, current_theta):
r"""
    Applies the actions to the images in the batch by calculating the grid and applying it to the input.
Parameters:
`acts`: The wrapper of the actions (from action_set.py)
`originals`: The original image
`action_ids`: The selected actions to apply to the images
    `current_theta`: The current transformation, or None if it is the first step
"""
#b = originals.size(0)
img = originals.to(device)
aTensor = acts.get_action_tensor()
pred_thetas = aTensor[action_ids, :, :]
out_theta = torch.bmm(
pred_thetas,
current_theta) if current_theta is not None else pred_thetas
final_theta = out_theta[:, 0:2, :]
grid = F.affine_grid(final_theta, img.size(), align_corners=False).to(device)
out_img = F.grid_sample(img, grid, align_corners=False)
return (out_img, out_theta)
def apply_multiple_actions(acts, originals, action_ids):
r"""
Applies the actions to the images in the batch by calculating the grid and applying it to the input.
    Instead of applying every action on its own, it calculates the final transformation matrix.
Returns the state before and after the transition
Parameters:
`acts`: The wrapper of the actions (from action_set.py)
`originals`: The original image
`action_ids`: The selected actions to apply to the images as tensor of shape [B,T]
"""
img = originals.to(device)
new_theta = None
aTensor = torch.cat([acts.get_action_tensor(), acts.get_action_tensor()[0].unsqueeze(0)],dim=0) #Adds an additional identity to the end for replay memory
for t in range(action_ids.shape[1]):
old_theta = new_theta
id = action_ids[:,t]
pred_thetas = aTensor[id, :, :]
new_theta = torch.bmm(
pred_thetas,
old_theta) if old_theta is not None else pred_thetas
if old_theta is None:
old_theta = aTensor[-1].unsqueeze(0)
final_theta_old = old_theta[:, 0:2, :]
final_theta_new = new_theta[:, 0:2, :]
grid_old = F.affine_grid(final_theta_old, img.size(), align_corners=False).to(device)
grid_new = F.affine_grid(final_theta_new, img.size(), align_corners=False).to(device)
out_img_old = F.grid_sample(img, grid_old, align_corners=False)
out_img_new = F.grid_sample(img, grid_new, align_corners=False)
return out_img_old, out_img_new
def apply_action_to_bbx(acts, bbox, action_ids, bbox_theta):
aTensor = acts.get_bb_action_tensor()
a = aTensor[action_ids, :, :]
bbox_theta = torch.bmm(bbox_theta,a,) if bbox_theta is not None else a
b = torch.stack([ bbox[:,0:2],bbox[:,2:4],torch.ones([len(bbox),2]) ],dim=2).permute(0,2,1) # creates matrix (x1 x2)|(y1 y2)|(1 1)
b = torch.bmm(bbox_theta,b)
bbox = b[:,0:2].view(-1,4)
    #bbox = torch.clamp(bbox,-1,1) # Clip bbox to image frame
return bbox, bbox_theta
def calculate_IoU(mask, bbox):
r"""
    Calculates the IoU between each viewing mask and bounding box. Returns the IoU as a tensor of shape [N].
    `mask` & `bbox`: Tensors of shape [N,4], ordered as [xmin, xmax, ymin, ymax]"""
mask_size = (mask[:,1]-mask[:,0])*(mask[:,3]-mask[:,2]) #[N]
bb_size = (bbox[:,1]-bbox[:,0])*(bbox[:,3]-bbox[:,2]) #[N]
# Intersection
xmax = torch.stack([bbox[:,1],mask[:,1]],dim=1)
xmin = torch.stack([bbox[:,0],mask[:,0]],dim=1)
ymax = torch.stack([bbox[:,3],mask[:,3]],dim=1)
ymin = torch.stack([bbox[:,2],mask[:,2]],dim=1)
x_overlap = torch.min(xmax,dim=1)[0] - torch.max(xmin,dim=1)[0] # [N]
x_overlap = torch.max( torch.stack( [x_overlap,torch.zeros_like(x_overlap)],dim=1) ,dim=1)[0] #[N]
y_overlap = torch.min(ymax,dim=1)[0] - torch.max(ymin,dim=1)[0] # [N]
y_overlap = torch.max( torch.stack( [y_overlap,torch.zeros_like(y_overlap)],dim=1) ,dim=1)[0] #[N]
intersection = x_overlap * y_overlap #[N]
union = mask_size + bb_size - intersection
return torch.true_divide(intersection,union)
def move_results(path, clean=False):
root = abspath(path)
destin = abspath(join(path, "Observer/"))
try:
mkdir(destin)
except FileExistsError:
pass
for f in ["config.json","cout.txt","metrics.json","run.json"]:
if not clean:
rename(abspath(join(root,f)), abspath(join(destin, f)))
else:
try:
remove(abspath(join(root,f)))
except:
pass
def finalize_directory(path,id):
root = abspath(path)
destin = abspath("./res/ID_{:03.0f}/".format(id))
try:
rmtree(destin,ignore_errors=True)
except:
pass
move(root,destin)
def save_for_table(id,best,test,train, ious):
try:
_PATH = "./res/Table.txt"
_LOCK = abspath("lck")
# Creates temp file as lock. If Lock exists, wait
attempts = 0
while attempts<10000000:
if isfile(_LOCK):
attempts +=1
if attempts > 1000:
return
from random import random
sleep(random())
else:
break
# Create lock
open(_LOCK,"x").close()
with open(abspath(_PATH),"a+") as f:
f.write("{id} & {best0:.2f} ({best1:.0f}) & {test0:.2f} ({test1:.4f}) & {train0:.2f} ({train1:.4f}) {iou_string}\\\\ \n".format(id=id,
best0=best[0],best1=best[1],
test0=test[0],test1=test[1],
train0=train[0],train1=train[1],
iou_string = " " if ious == (-1,-1) else " & {:.4f}/{:.4f}".format(ious[0],ious[1]) ))
remove(_LOCK)
except:
pass
try:
remove(_LOCK)
except:
pass
return
def adjust_iou(img,target,iou_t,transform_mask=None):
r"""Gets batch of imgs and targets and transforms them, so all have an IoU of iou_t
`iou_t` can be single value or torch tensor of shape [N]
`transform_mask` describes if the i-th image should be adapted IoU wise or not (boolean tensor)"""
# Center bbox
bbox = target[:,1:]
if transform_mask is None:
transform_mask = torch.tensor([1 for _ in range(len(bbox))])
transform_mask = transform_mask.type(torch.bool)
bbox_midpoint = (
(bbox[:,1]+bbox[:,0])/2,
(bbox[:,3]+bbox[:,2])/2
) #x,y
move = torch.FloatTensor(( [1., 0.0, 0.0],
[0.0, 1., 0.0],
[0.0, 0.0, 1.])).view(1, 3, 3).repeat(len(bbox),1,1)
move[transform_mask,0,2] = bbox_midpoint[0][transform_mask]
move[transform_mask,1,2] = bbox_midpoint[1][transform_mask]
bbox[transform_mask] = target[transform_mask,1:] - torch.stack([ bbox_midpoint[0],bbox_midpoint[0],bbox_midpoint[1],bbox_midpoint[1] ], dim=1)[transform_mask]
# Zoom in
x_larger_y = (bbox[:,1] > bbox[:,3])
min_scale = torch.tensor([bbox[i,1] if x_larger_y[i] else bbox[i,3] for i in range(len(bbox))])
# Get old IoU
iou_o = calculate_IoU( torch.tensor([[-1,1,-1,1] for _ in range(len(bbox))]), bbox)
# Scaling Factors
s = torch.sqrt(torch.true_divide(iou_o , iou_t))
s_too_small = s < min_scale
s2 = s.true_divide(min_scale) * s #s2 = s/s1
scale = ( # ([N],[N]) as (scale_x,scale_y)
torch.tensor([(min_scale[i] if x_larger_y[i] else s2[i]) if s_too_small[i] else s[i] for i in range(len(s))]),
torch.tensor([(s2[i] if x_larger_y[i] else min_scale[i]) if s_too_small[i] else s[i] for i in range(len(s))])
)
# Calculate new bbox
bbox_s = torch.stack([bbox[:,1]*(1-1/scale[0]), bbox[:,3] *(1-1/scale[1])], dim = 1) # Masks has [N,4], should return [N,2]
bTensor = torch.stack([ bbox_s[:,0],-bbox_s[:,0],bbox_s[:,1],-bbox_s[:,1] ],dim=1 ) # Has shape [N,4]
bbox[transform_mask] = bbox[transform_mask] + bTensor[transform_mask]
# Homogeneous Matrix
    scale_tensor = torch.tensor([ [1, 0.0, 0.0],
[0.0, 1, 0.0],
[0.0, 0.0,1.0]] ).unsqueeze(0).repeat(len(bbox),1,1)
scale_tensor[transform_mask,0,0] = scale[0][transform_mask]
scale_tensor[transform_mask,1,1] = scale[1][transform_mask]
# Random translation
_rand_move = (
torch.rand(len(bbox)) * (1-bbox[:,1])*2 - (1-bbox[:,1]),
torch.rand(len(bbox)) * (1-bbox[:,3])*2 - (1-bbox[:,3])
)
rand_move = torch.FloatTensor(( [1., 0.0, 0.0],
[0.0, 1., 0.0],
[0.0, 0.0, 1.])).view(1, 3, 3).repeat(len(bbox),1,1)
rand_move[transform_mask,0,2] = _rand_move[0][transform_mask]
rand_move[transform_mask,1,2] = _rand_move[1][transform_mask]
bbox[transform_mask] = bbox[transform_mask] - torch.stack([ _rand_move[0],_rand_move[0],_rand_move[1],_rand_move[1] ], dim=1)[transform_mask]
transformations= [
# rand_move,scale_tensor,move
move, scale_tensor, rand_move
]
current_theta = None
for t in transformations:
current_theta = torch.bmm(
t,
current_theta) if current_theta is not None else t
final_theta = current_theta[:, 0:2, :]
grid = F.affine_grid(final_theta, img.size(), align_corners=False).to(device)
out_img = F.grid_sample(img, grid, align_corners=False)
target[:,1:] = bbox
return out_img,target
if __name__ == "__main__":
pass
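    # Minimal smoke test (an illustrative sketch, not part of the original file;
    # assumes action_set.py is importable from the same directory): apply one
    # "Move Left" action to a random batch and compute an example IoU.
    from action_set import ActionSet
    acts = ActionSet()
    imgs = torch.rand(2, 3, 224, 224)
    action_ids = torch.tensor([1, 1])              # action id 1 = "Move Left"
    out_img, theta = apply_action(acts, imgs, action_ids, None)
    print(out_img.shape, theta.shape)              # [2, 3, 224, 224], [2, 3, 3]
    mask = torch.tensor([[-1., 1., -1., 1.]])      # full image frame [xmin, xmax, ymin, ymax]
    bbox = torch.tensor([[-1., 0., -1., 1.]])      # left half of the frame
    print(calculate_IoU(mask, bbox))               # tensor([0.5000])


# =============================================================================
# src/loader.py
# =============================================================================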
import os
import torch
from torch.utils.data import DataLoader, ConcatDataset
from torch.utils.data.dataset import Dataset
import torchvision.transforms as T
from torchvision.datasets import VOCDetection
def get_Loader(train, batch_size, shuffle=True,num_workers=4, drop_last=False):
r"""
    Returns a dataloader for the specified dataset, either for training or testing.
`train`: `True` returns the train set, `False` the test set
    `batch_size`: the number of samples per batch
"""
mode = "train" if train else "val"
dataset_ = get_pascal_dataset(mode)
return DataLoader(dataset_,batch_size,shuffle=shuffle,num_workers=num_workers,drop_last=drop_last)
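# Usage sketch (illustrative only; requires the PASCAL VOC data under the paths
# configured in get_pascal_dataset below):
#   loader = get_Loader(train=True, batch_size=32)
#   imgs, targets = next(iter(loader))
#   # imgs: [32, 3, 224, 224]; targets: [32, 5] = [label, xmin, xmax, ymin, ymax], bbox normalized to [-1, 1]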
# VOC PASCAL
def get_pascal_dataset(mode):
r"""
Uses horizontal flip to double the size of the dataset
"""
IMAGE_SIZE = 224
IDENTITY = T.RandomHorizontalFlip(p=0) # Pseudo-Identity function
    # Mapping from PASCAL VOC class name to integer label (0-19)
    _VOC_CLASSES = ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
                    'bus', 'car', 'cat', 'chair', 'cow',
                    'diningtable', 'dog', 'horse', 'motorbike', 'person',
                    'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor']
    def _voc_get_class_from_name(class_name):
        # One hot encoding might be required
        return _VOC_CLASSES.index(class_name)
def _get_size_from_bb(bndbox):
x = int(bndbox["xmax"]) - int(bndbox["xmin"])
y = int(bndbox["ymax"]) - int(bndbox["ymin"])
return x*y
def _voc_adjust_bb(annotation,bndbox,):
r"""Returns the coordinates of the boundingbox normalized to the interval [-1,1] after reshaping the image to `IMAGE_SIZE`"""
# Get original coordinates
bb_box = [int(bndbox["xmin"]),int(bndbox["xmax"]),int(bndbox["ymin"]),int(bndbox["ymax"])]
# Get image size
h,w = int(annotation["annotation"]["size"]["height"]),int(annotation["annotation"]["size"]["width"])
# Squeeze to [-1,1]
bb_box = [ bb_box[0]/w*2-1, bb_box[1]/w*2-1 , bb_box[2]/h*2-1, bb_box[3]/h*2-1] # /wh = [0,1] ; *2 = [0,2] ; -1 = [-1,1]
return bb_box
def _voc_biggest_object(annotation):
r"""Returns the annotations of the biggest objcet of the given image by comparing the boundingbox sizes. Annotation is allways dict"""
objects = annotation["annotation"]["object"] #list of objects
if not isinstance(objects,list):
objects = [objects]
if len(objects)==1:
# Only one object in image
return objects[0]
sizes = []
for obj in objects:
sizes.append(_get_size_from_bb(obj["bndbox"]))
biggest_object_idx = torch.argmax(torch.tensor(sizes))
return objects[biggest_object_idx]
#########################################################################
class _voc_get_target(object):
def __init__(self,mirror) -> None:
self.mirror = mirror
return
        def __call__(self, target):
            r"""Returns the target as a 5-tensor: [`label`,`xmin`,`xmax`,`ymin`,`ymax`], with the bbox normalized to [-1,1]"""
            annotation = target
object_annotation = _voc_biggest_object(annotation)
object_class = _voc_get_class_from_name(object_annotation["name"])
bbox = _voc_adjust_bb(annotation,object_annotation["bndbox"])
if self.mirror:
bbox[0], bbox[1] = -bbox[1], -bbox[0]
return torch.tensor([object_class] + bbox)
def __repr__(self) -> str:
return self.__class__.__name__ + 'mirror={}'.format(self.mirror)
#########################################################################
# Original size of Train set: 8218, Test set: 8333
pascal_root07 = "//ds/images/PascalVOC07"
pascal_root12= "//ds/images/PascalVOC12"
mirror = [IDENTITY]
if mode == "train":
        mirror.append( T.RandomHorizontalFlip(p=1) )  # add the horizontally mirrored copy only for the train set
dataset_list = []
for m in mirror: # For train, the original and the mirrored dataset are included; for test only the original
transform = T.Compose([
T.Resize([IMAGE_SIZE,IMAGE_SIZE]),
m,
T.ToTensor(),
T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
dataset = VOCDetection
voc07 = dataset(root=pascal_root07,year="2007",download=False,image_set=mode,target_transform = _voc_get_target(m != IDENTITY), transform= transform)
voc12 = dataset(root=pascal_root12,year="2012",download=False,image_set=mode,target_transform = _voc_get_target(m != IDENTITY), transform= transform)
dataset_list.extend( [voc07,voc12] )
return ConcatDataset(dataset_list)
def get_transformed_pascal_dataset(train, batch_size, shuffle=True,num_workers=4, drop_last=False):
r"""Load a previously saved, transformed dataset"""
class transformed_PASCAL(Dataset):
def __init__(self,train = True):
path= "/netscratch/dembinsky/dataset/transformed_PASCAL/"
folder = "train/" if train else "val/"